summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRed_M <1468433+Red-M@users.noreply.github.com>2019-07-07 20:36:49 +1000
committerGitHub <noreply@github.com>2019-07-07 20:36:49 +1000
commit59e8768f8a9bbd213bc2fc5e063feb7115f13404 (patch)
tree0d22d0188859ff345ae040e45be8ac49c746fd9f
parent01f2acca9ac4a8ebcfe440812d25a72f5be3e892 (diff)
parent121ca37789bd22cadf2e6f7cf1ee78c5a458f217 (diff)
downloadpexpect-59e8768f8a9bbd213bc2fc5e063feb7115f13404.tar.gz
Merge pull request #8 from pexpect/master
updates
-rw-r--r--.travis.yml5
-rw-r--r--doc/api/pxssh.rst8
-rw-r--r--doc/commonissues.rst8
-rw-r--r--doc/conf.py2
-rw-r--r--doc/history.rst18
-rw-r--r--doc/overview.rst5
-rw-r--r--pexpect/_async.py26
-rw-r--r--pexpect/expect.py2
-rw-r--r--pexpect/pty_spawn.py116
-rw-r--r--pexpect/pxssh.py76
-rw-r--r--pexpect/replwrap.py14
-rwxr-xr-xtests/fakessh/ssh46
-rw-r--r--tests/test_async.py29
-rwxr-xr-xtests/test_expect.py2
-rw-r--r--tests/test_pxssh.py156
-rw-r--r--tests/test_replwrap.py13
16 files changed, 437 insertions, 89 deletions
diff --git a/.travis.yml b/.travis.yml
index 40d9622..217c443 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -2,7 +2,6 @@ language: python
python:
- 2.7
- - 3.3
- 3.4
- 3.5
- 3.6
@@ -10,6 +9,10 @@ python:
- nightly
matrix:
+ include:
+ - python: 3.7
+ dist: xenial # required for Python 3.7 (travis-ci/travis-ci#9069)
+ sudo: required # required for Python 3.7 (travis-ci/travis-ci#9069)
allow_failures:
# PyPy on Travis is currently incompatible with Cryptography.
- python: pypy
diff --git a/doc/api/pxssh.rst b/doc/api/pxssh.rst
index b947f4b..c9c80c6 100644
--- a/doc/api/pxssh.rst
+++ b/doc/api/pxssh.rst
@@ -1,6 +1,14 @@
pxssh - control an SSH session
==============================
+.. note::
+
+ *pxssh* is a screen-scraping wrapper around the SSH command on your system.
+ In many cases, you should consider using
+ `Paramiko <https://github.com/paramiko/paramiko>`_ instead.
+ Paramiko is a Python module which speaks the SSH protocol directly, so it
+ doesn't have the extra complexity of running a local subprocess.
+
.. automodule:: pexpect.pxssh
.. autoclass:: ExceptionPxssh
diff --git a/doc/commonissues.rst b/doc/commonissues.rst
index f60085e..c74ea7a 100644
--- a/doc/commonissues.rst
+++ b/doc/commonissues.rst
@@ -41,10 +41,10 @@ back to the TTY. I would call this an SSH bug.
Pexpect now automatically adds a short delay before sending data to a child
process. This more closely mimics what happens in the usual human-to-app
-interaction. The delay can be tuned with the ``delaybeforesend`` attribute of the
-spawn class. In general, this fixes the problem for everyone and so this should
-not be an issue for most users. For some applications you might with to turn it
-off::
+interaction. The delay can be tuned with the ``delaybeforesend`` attribute of
+objects of the spawn class. In general, this fixes the problem for everyone and so
+this should not be an issue for most users. For some applications you might with
+to turn it off::
child = pexpect.spawn ("ssh user@example.com")
child.delaybeforesend = None
diff --git a/doc/conf.py b/doc/conf.py
index a734147..166300e 100644
--- a/doc/conf.py
+++ b/doc/conf.py
@@ -52,7 +52,7 @@ copyright = u'2013, Noah Spurrier and contributors'
# built documents.
#
# The short X.Y version.
-version = '4.6'
+version = '4.7'
# The full version, including alpha/beta/rc tags.
release = version
diff --git a/doc/history.rst b/doc/history.rst
index 41a5918..b3b0ff1 100644
--- a/doc/history.rst
+++ b/doc/history.rst
@@ -4,6 +4,24 @@ History
Releases
--------
+Version 4.7
+```````````
+
+* The :meth:`.pxssh.login` method now no longer requires a username if an ssh
+ config is provided and will raise an error if neither are provided.
+ (:ghpull:`562`).
+* The :meth:`.pxssh.login` method now supports providing your own ``ssh``
+ command via the ``cmd`` parameter.
+ (:ghpull:`528`) (:ghpull:`563`).
+* :class:`.pxssh` now supports the ``use_poll`` parameter which is passed into :meth:`.pexpect.spawn`
+ (:ghpull:`542`).
+* Minor bug fix with ``ssh_config``.
+ (:ghpull:`498`).
+* :meth:`.replwrap.run_command` now has async support via an ``async_`` parameter.
+ (:ghpull:`501`).
+* :meth:`.pexpect.spawn` will now read additional bytes if able up to a buffer limit.
+ (:ghpull:`304`).
+
Version 4.6
```````````
diff --git a/doc/overview.rst b/doc/overview.rst
index e52809c..aeb8887 100644
--- a/doc/overview.rst
+++ b/doc/overview.rst
@@ -207,7 +207,10 @@ It is also useful to log the child's input and out to a file or the screen. The
following will turn on logging and send output to stdout (the screen)::
child = pexpect.spawn(foo)
- child.logfile = sys.stdout
+ child.logfile = sys.stdout.buffer
+
+The `sys.stdout.buffer` object is available since Python 3. With Python 2, one
+has to assign just `sys.stdout` instead.
Exceptions
----------
diff --git a/pexpect/_async.py b/pexpect/_async.py
index bdd515b..ca2044e 100644
--- a/pexpect/_async.py
+++ b/pexpect/_async.py
@@ -1,5 +1,6 @@
import asyncio
import errno
+import signal
from pexpect import EOF
@@ -29,6 +30,23 @@ def expect_async(expecter, timeout=None):
transport.pause_reading()
return expecter.timeout(e)
+@asyncio.coroutine
+def repl_run_command_async(repl, cmdlines, timeout=-1):
+ res = []
+ repl.child.sendline(cmdlines[0])
+ for line in cmdlines[1:]:
+ yield from repl._expect_prompt(timeout=timeout, async_=True)
+ res.append(repl.child.before)
+ repl.child.sendline(line)
+
+ # Command was fully submitted, now wait for the next prompt
+ prompt_idx = yield from repl._expect_prompt(timeout=timeout, async_=True)
+ if prompt_idx == 1:
+ # We got the continuation prompt - command was incomplete
+ repl.child.kill(signal.SIGINT)
+ yield from repl._expect_prompt(timeout=1, async_=True)
+ raise ValueError("Continuation prompt found - input was incomplete:")
+ return u''.join(res + [repl.child.before])
class PatternWaiter(asyncio.Protocol):
transport = None
@@ -41,7 +59,7 @@ class PatternWaiter(asyncio.Protocol):
if not self.fut.done():
self.fut.set_result(result)
self.transport.pause_reading()
-
+
def error(self, exc):
if not self.fut.done():
self.fut.set_exception(exc)
@@ -49,7 +67,7 @@ class PatternWaiter(asyncio.Protocol):
def connection_made(self, transport):
self.transport = transport
-
+
def data_received(self, data):
spawn = self.expecter.spawn
s = spawn._decoder.decode(data)
@@ -67,7 +85,7 @@ class PatternWaiter(asyncio.Protocol):
except Exception as e:
self.expecter.errored()
self.error(e)
-
+
def eof_received(self):
# N.B. If this gets called, async will close the pipe (the spawn object)
# for us
@@ -78,7 +96,7 @@ class PatternWaiter(asyncio.Protocol):
self.error(e)
else:
self.found(index)
-
+
def connection_lost(self, exc):
if isinstance(exc, OSError) and exc.errno == errno.EIO:
# We may get here without eof_received being called, e.g on Linux
diff --git a/pexpect/expect.py b/pexpect/expect.py
index 1c0275b..db376d5 100644
--- a/pexpect/expect.py
+++ b/pexpect/expect.py
@@ -244,7 +244,7 @@ class searcher_re(object):
self.eof_index = -1
self.timeout_index = -1
self._searches = []
- for n, s in zip(list(range(len(patterns))), patterns):
+ for n, s in enumerate(patterns):
if s is EOF:
self.eof_index = n
continue
diff --git a/pexpect/pty_spawn.py b/pexpect/pty_spawn.py
index e0e2b54..58d57f7 100644
--- a/pexpect/pty_spawn.py
+++ b/pexpect/pty_spawn.py
@@ -430,61 +430,83 @@ class spawn(SpawnBase):
available right away then one character will be returned immediately.
It will not wait for 30 seconds for another 99 characters to come in.
- This is a wrapper around os.read(). It uses select.select() to
- implement the timeout. '''
+ On the other hand, if there are bytes available to read immediately,
+ all those bytes will be read (up to the buffer size). So, if the
+ buffer size is 1 megabyte and there is 1 megabyte of data available
+ to read, the buffer will be filled, regardless of timeout.
+
+ This is a wrapper around os.read(). It uses select.select() or
+ select.poll() to implement the timeout. '''
if self.closed:
raise ValueError('I/O operation on closed file.')
+ if self.use_poll:
+ def select(timeout):
+ return poll_ignore_interrupts([self.child_fd], timeout)
+ else:
+ def select(timeout):
+ return select_ignore_interrupts([self.child_fd], [], [], timeout)[0]
+
+ # If there is data available to read right now, read as much as
+ # we can. We do this to increase performance if there are a lot
+ # of bytes to be read. This also avoids calling isalive() too
+ # often. See also:
+ # * https://github.com/pexpect/pexpect/pull/304
+ # * http://trac.sagemath.org/ticket/10295
+ if select(0):
+ try:
+ incoming = super(spawn, self).read_nonblocking(size)
+ except EOF:
+ # Maybe the child is dead: update some attributes in that case
+ self.isalive()
+ raise
+ while len(incoming) < size and select(0):
+ try:
+ incoming += super(spawn, self).read_nonblocking(size - len(incoming))
+ except EOF:
+ # Maybe the child is dead: update some attributes in that case
+ self.isalive()
+ # Don't raise EOF, just return what we read so far.
+ return incoming
+ return incoming
+
if timeout == -1:
timeout = self.timeout
- # Note that some systems such as Solaris do not give an EOF when
- # the child dies. In fact, you can still try to read
- # from the child_fd -- it will block forever or until TIMEOUT.
- # For this case, I test isalive() before doing any reading.
- # If isalive() is false, then I pretend that this is the same as EOF.
if not self.isalive():
- # timeout of 0 means "poll"
- if self.use_poll:
- r = poll_ignore_interrupts([self.child_fd], timeout)
- else:
- r, w, e = select_ignore_interrupts([self.child_fd], [], [], 0)
- if not r:
- self.flag_eof = True
- raise EOF('End Of File (EOF). Braindead platform.')
+ # The process is dead, but there may or may not be data
+ # available to read. Note that some systems such as Solaris
+ # do not give an EOF when the child dies. In fact, you can
+ # still try to read from the child_fd -- it will block
+ # forever or until TIMEOUT. For that reason, it's important
+ # to do this check before calling select() with timeout.
+ if select(0):
+ return super(spawn, self).read_nonblocking(size)
+ self.flag_eof = True
+ raise EOF('End Of File (EOF). Braindead platform.')
elif self.__irix_hack:
# Irix takes a long time before it realizes a child was terminated.
+ # Make sure that the timeout is at least 2 seconds.
# FIXME So does this mean Irix systems are forced to always have
# FIXME a 2 second delay when calling read_nonblocking? That sucks.
- if self.use_poll:
- r = poll_ignore_interrupts([self.child_fd], timeout)
- else:
- r, w, e = select_ignore_interrupts([self.child_fd], [], [], 2)
- if not r and not self.isalive():
- self.flag_eof = True
- raise EOF('End Of File (EOF). Slow platform.')
- if self.use_poll:
- r = poll_ignore_interrupts([self.child_fd], timeout)
- else:
- r, w, e = select_ignore_interrupts(
- [self.child_fd], [], [], timeout
- )
-
- if not r:
- if not self.isalive():
- # Some platforms, such as Irix, will claim that their
- # processes are alive; timeout on the select; and
- # then finally admit that they are not alive.
- self.flag_eof = True
- raise EOF('End of File (EOF). Very slow platform.')
- else:
- raise TIMEOUT('Timeout exceeded.')
+ if timeout is not None and timeout < 2:
+ timeout = 2
- if self.child_fd in r:
+ # Because of the select(0) check above, we know that no data
+ # is available right now. But if a non-zero timeout is given
+ # (possibly timeout=None), we call select() with a timeout.
+ if (timeout != 0) and select(timeout):
return super(spawn, self).read_nonblocking(size)
- raise ExceptionPexpect('Reached an unexpected state.') # pragma: no cover
+ if not self.isalive():
+ # Some platforms, such as Irix, will claim that their
+ # processes are alive; timeout on the select; and
+ # then finally admit that they are not alive.
+ self.flag_eof = True
+ raise EOF('End of File (EOF). Very slow platform.')
+ else:
+ raise TIMEOUT('Timeout exceeded.')
def write(self, s):
'''This is similar to send() except that there is no return value.
@@ -730,10 +752,14 @@ class spawn(SpawnBase):
child process in interact mode is duplicated to the given log.
You may pass in optional input and output filter functions. These
- functions should take a string and return a string. The output_filter
- will be passed all the output from the child process. The input_filter
- will be passed all the keyboard input from the user. The input_filter
- is run BEFORE the check for the escape_character.
+ functions should take bytes array and return bytes array too. Even
+ with ``encoding='utf-8'`` support, meth:`interact` will always pass
+ input_filter and output_filter bytes. You may need to wrap your
+ function to decode and encode back to UTF-8.
+
+ The output_filter will be passed all the output from the child process.
+ The input_filter will be passed all the keyboard input from the user.
+ The input_filter is run BEFORE the check for the escape_character.
Note that if you change the window size of the parent the SIGWINCH
signal will not be passed through to the child. If you want the child
@@ -753,7 +779,7 @@ class spawn(SpawnBase):
signal.signal(signal.SIGWINCH, sigwinch_passthrough)
p.interact()
'''
-
+
# Flush the buffer.
self.write_to_stdout(self.buffer)
self.stdout.flush()
diff --git a/pexpect/pxssh.py b/pexpect/pxssh.py
index f230e04..3d53bd9 100644
--- a/pexpect/pxssh.py
+++ b/pexpect/pxssh.py
@@ -109,7 +109,7 @@ class pxssh (spawn):
username = raw_input('username: ')
password = getpass.getpass('password: ')
s.login (hostname, username, password)
-
+
`debug_command_string` is only for the test suite to confirm that the string
generated for SSH is correct, using this will not allow you to do
anything other than get a string back from `pxssh.pxssh.login()`.
@@ -118,12 +118,12 @@ class pxssh (spawn):
def __init__ (self, timeout=30, maxread=2000, searchwindowsize=None,
logfile=None, cwd=None, env=None, ignore_sighup=True, echo=True,
options={}, encoding=None, codec_errors='strict',
- debug_command_string=False):
+ debug_command_string=False, use_poll=False):
spawn.__init__(self, None, timeout=timeout, maxread=maxread,
searchwindowsize=searchwindowsize, logfile=logfile,
cwd=cwd, env=env, ignore_sighup=ignore_sighup, echo=echo,
- encoding=encoding, codec_errors=codec_errors)
+ encoding=encoding, codec_errors=codec_errors, use_poll=use_poll)
self.name = '<pxssh>'
@@ -154,7 +154,7 @@ class pxssh (spawn):
# Unsetting SSH_ASKPASS on the remote side doesn't disable it! Annoying!
#self.SSH_OPTS = "-x -o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'"
self.force_password = False
-
+
self.debug_command_string = debug_command_string
# User defined SSH options, eg,
@@ -220,7 +220,7 @@ class pxssh (spawn):
can take 12 seconds. Low latency connections are more likely to fail
with a low sync_multiplier. Best case sync time gets worse with a
high sync multiplier (500 ms with default). '''
-
+
# All of these timing pace values are magic.
# I came up with these based on what seemed reliable for
# connecting to a heavily loaded machine I have.
@@ -253,20 +253,19 @@ class pxssh (spawn):
### TODO: This is getting messy and I'm pretty sure this isn't perfect.
### TODO: I need to draw a flow chart for this.
### TODO: Unit tests for SSH tunnels, remote SSH command exec, disabling original prompt sync
- def login (self, server, username, password='', terminal_type='ansi',
+ def login (self, server, username=None, password='', terminal_type='ansi',
original_prompt=r"[#$]", login_timeout=10, port=None,
auto_prompt_reset=True, ssh_key=None, quiet=True,
sync_multiplier=1, check_local_ip=True,
password_regex=r'(?i)(?:password:)|(?:passphrase for key)',
ssh_tunnels={}, spawn_local_ssh=True,
- sync_original_prompt=True, ssh_config=None):
+ sync_original_prompt=True, ssh_config=None, cmd='ssh'):
'''This logs the user into the given server.
- It uses
- 'original_prompt' to try to find the prompt right after login. When it
- finds the prompt it immediately tries to reset the prompt to something
- more easily matched. The default 'original_prompt' is very optimistic
- and is easily fooled. It's more reliable to try to match the original
+ It uses 'original_prompt' to try to find the prompt right after login.
+ When it finds the prompt it immediately tries to reset the prompt to
+ something more easily matched. The default 'original_prompt' is very
+ optimistic and is easily fooled. It's more reliable to try to match the original
prompt as exactly as possible to prevent false matches by server
strings such as the "Message Of The Day". On many systems you can
disable the MOTD on the remote server by creating a zero-length file
@@ -284,27 +283,31 @@ class pxssh (spawn):
uses a unique prompt in the :meth:`prompt` method. If the original prompt is
not reset then this will disable the :meth:`prompt` method unless you
manually set the :attr:`PROMPT` attribute.
-
+
Set ``password_regex`` if there is a MOTD message with `password` in it.
Changing this is like playing in traffic, don't (p)expect it to match straight
away.
-
+
If you require to connect to another SSH server from the your original SSH
connection set ``spawn_local_ssh`` to `False` and this will use your current
session to do so. Setting this option to `False` and not having an active session
will trigger an error.
-
+
Set ``ssh_key`` to a file path to an SSH private key to use that SSH key
for the session authentication.
Set ``ssh_key`` to `True` to force passing the current SSH authentication socket
to the desired ``hostname``.
-
+
Set ``ssh_config`` to a file path string of an SSH client config file to pass that
file to the client to handle itself. You may set any options you wish in here, however
doing so will require you to post extra information that you may not want to if you
run into issues.
+
+ Alter the ``cmd`` to change the ssh client used, or to prepend it with network
+ namespaces. For example ```cmd="ip netns exec vlan2 ssh"``` to execute the ssh in
+ network namespace named ```vlan```.
'''
-
+
session_regex_array = ["(?i)are you sure you want to continue connecting", original_prompt, password_regex, "(?i)permission denied", "(?i)terminal type", TIMEOUT]
session_init_regex_array = []
session_init_regex_array.extend(session_regex_array)
@@ -331,7 +334,7 @@ class pxssh (spawn):
if spawn_local_ssh and not os.path.isfile(ssh_key):
raise ExceptionPxssh('private ssh key does not exist or is not a file.')
ssh_options = ssh_options + ' -i %s' % (ssh_key)
-
+
# SSH tunnels, make sure you know what you're putting into the lists
# under each heading. Do not expect these to open 100% of the time,
# The port you're requesting might be bound.
@@ -354,7 +357,42 @@ class pxssh (spawn):
if spawn_local_ssh==False:
tunnel = quote(str(tunnel))
ssh_options = ssh_options + ' -' + cmd_type + ' ' + str(tunnel)
- cmd = "ssh %s -l %s %s" % (ssh_options, username, server)
+
+ if username is not None:
+ ssh_options = ssh_options + ' -l ' + username
+ elif ssh_config is None:
+ raise TypeError('login() needs either a username or an ssh_config')
+ else: # make sure ssh_config has an entry for the server with a username
+ with open(ssh_config, 'rt') as f:
+ lines = [l.strip() for l in f.readlines()]
+
+ server_regex = r'^Host\s+%s\s*$' % server
+ user_regex = r'^User\s+\w+\s*$'
+ config_has_server = False
+ server_has_username = False
+ for line in lines:
+ if not config_has_server and re.match(server_regex, line, re.IGNORECASE):
+ config_has_server = True
+ elif config_has_server and 'hostname' in line.lower():
+ pass
+ elif config_has_server and 'host' in line.lower():
+ server_has_username = False # insurance
+ break # we have left the relevant section
+ elif config_has_server and re.match(user_regex, line, re.IGNORECASE):
+ server_has_username = True
+ break
+
+ if lines:
+ del line
+
+ del lines
+
+ if not config_has_server:
+ raise TypeError('login() ssh_config has no Host entry for %s' % server)
+ elif not server_has_username:
+ raise TypeError('login() ssh_config has no user entry for %s' % server)
+
+ cmd += " %s %s" % (ssh_options, server)
if self.debug_command_string:
return(cmd)
diff --git a/pexpect/replwrap.py b/pexpect/replwrap.py
index ed0e657..c930f1e 100644
--- a/pexpect/replwrap.py
+++ b/pexpect/replwrap.py
@@ -61,11 +61,11 @@ class REPLWrapper(object):
self.child.expect(orig_prompt)
self.child.sendline(prompt_change)
- def _expect_prompt(self, timeout=-1):
+ def _expect_prompt(self, timeout=-1, async_=False):
return self.child.expect_exact([self.prompt, self.continuation_prompt],
- timeout=timeout)
+ timeout=timeout, async_=async_)
- def run_command(self, command, timeout=-1):
+ def run_command(self, command, timeout=-1, async_=False):
"""Send a command to the REPL, wait for and return output.
:param str command: The command to send. Trailing newlines are not needed.
@@ -75,6 +75,10 @@ class REPLWrapper(object):
:param int timeout: How long to wait for the next prompt. -1 means the
default from the :class:`pexpect.spawn` object (default 30 seconds).
None means to wait indefinitely.
+ :param bool async_: On Python 3.4, or Python 3.3 with asyncio
+ installed, passing ``async_=True`` will make this return an
+ :mod:`asyncio` Future, which you can yield from to get the same
+ result that this method would normally give directly.
"""
# Split up multiline commands and feed them in bit-by-bit
cmdlines = command.splitlines()
@@ -84,6 +88,10 @@ class REPLWrapper(object):
if not cmdlines:
raise ValueError("No command was given")
+ if async_:
+ from ._async import repl_run_command_async
+ return repl_run_command_async(self, cmdlines, timeout)
+
res = []
self.child.sendline(cmdlines[0])
for line in cmdlines[1:]:
diff --git a/tests/fakessh/ssh b/tests/fakessh/ssh
index d3259e4..4a5be1b 100755
--- a/tests/fakessh/ssh
+++ b/tests/fakessh/ssh
@@ -3,13 +3,52 @@ from __future__ import print_function
import getpass
import sys
+import getopt
PY3 = (sys.version_info[0] >= 3)
if not PY3:
input = raw_input
-server = sys.argv[-1]
-if server == 'noserver':
- print('No route to host')
+ssh_usage = "usage: ssh [-2qV] [-c cipher_spec] [-l login_name]\r\n" \
+ + " hostname"
+
+cipher_valid_list = ['aes128-ctr', 'aes192-ctr', 'aes256-ctr', 'arcfour256', 'arcfour128', \
+ 'aes128-cbc','3des-cbc','blowfish-cbc','cast128-cbc','aes192-cbc', \
+ 'aes256-cbc','arcfour']
+
+try:
+ server = sys.argv[-1]
+ if server == 'noserver':
+ print('No route to host')
+ sys.exit(1)
+
+ elif len(sys.argv) < 2:
+ print(ssh_usage)
+ sys.exit(1)
+
+ cipher = ''
+ cipher_list = []
+ fullCmdArguments = sys.argv
+ argumentList = fullCmdArguments[1:]
+ unixOptions = "2qVc:l"
+ arguments, values = getopt.getopt(argumentList, unixOptions)
+ for currentArgument, currentValue in arguments:
+ if currentArgument in ("-2"):
+ pass
+ elif currentArgument in ("-V"):
+ print("Mock SSH client version 0.2")
+ sys.exit(1)
+ elif currentArgument in ("-c"):
+ cipher = currentValue
+ cipher_list = cipher.split(",")
+ for cipher_item in cipher_list:
+ if cipher_item not in cipher_valid_list:
+ print("Unknown cipher type '" + str(cipher_item) + "'")
+ sys.exit(1)
+
+
+except Exception as e:
+ print(ssh_usage)
+ print('error = ' + str(e))
sys.exit(1)
print("Mock SSH client for tests. Do not enter real security info.")
@@ -31,4 +70,5 @@ while True:
elif cmd == 'echo $?':
print(0)
elif cmd in ('exit', 'logout'):
+ print('Closed connection')
break
diff --git a/tests/test_async.py b/tests/test_async.py
index 1cc3236..991890c 100644
--- a/tests/test_async.py
+++ b/tests/test_async.py
@@ -8,6 +8,7 @@ import sys
import unittest
import pexpect
+from pexpect import replwrap
from .PexpectTestCase import PexpectTestCase
def run(coro):
@@ -27,7 +28,7 @@ class AsyncTests(PexpectTestCase):
coro = p.expect('foo', timeout=1, async_=True)
with self.assertRaises(pexpect.TIMEOUT):
run(coro)
-
+
p = pexpect.spawn('cat')
coro = p.expect(['foo', pexpect.TIMEOUT], timeout=1, async_=True)
assert run(coro) == 1
@@ -68,3 +69,29 @@ class AsyncTests(PexpectTestCase):
assert run(p.expect_exact(u'1', async_=True)) == 0
assert p.expect_exact(u'2') == 0
assert run(p.expect_exact(u'3', async_=True)) == 0
+
+ def test_async_replwrap(self):
+ bash = replwrap.bash()
+ coro = bash.run_command("time", async_=True)
+ res = run(coro)
+ assert 'real' in res, res
+
+ def test_async_replwrap_multiline(self):
+ bash = replwrap.bash()
+ coro = bash.run_command("echo '1 2\n3 4'", async_=True)
+ res = run(coro)
+ self.assertEqual(res.strip().splitlines(), ['1 2', '3 4'])
+
+ # Should raise ValueError if input is incomplete
+ coro = bash.run_command("echo '5 6", async_=True)
+ try:
+ run(coro)
+ except ValueError:
+ pass
+ else:
+ assert False, "Didn't raise ValueError for incomplete input"
+
+ # Check that the REPL was reset (SIGINT) after the incomplete input
+ coro = bash.run_command("echo '1 2\n3 4'", async_=True)
+ res = run(coro)
+ self.assertEqual(res.strip().splitlines(), ['1 2', '3 4'])
diff --git a/tests/test_expect.py b/tests/test_expect.py
index 795518a..2c74744 100755
--- a/tests/test_expect.py
+++ b/tests/test_expect.py
@@ -411,7 +411,7 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase):
def test_before_across_chunks(self):
# https://github.com/pexpect/pexpect/issues/478
child = pexpect.spawn(
- '''/bin/bash -c "openssl rand -base64 {} | head -500 | nl --number-format=rz --number-width=5 2>&1 ; echo 'PATTERN!!!'"'''.format(1024 * 1024 * 2),
+ '''/bin/bash -c "openssl rand -base64 {} 2>/dev/null | head -500 | nl --number-format=rz --number-width=5 2>&1 ; echo 'PATTERN!!!'"'''.format(1024 * 1024 * 2),
searchwindowsize=128
)
child.expect(['PATTERN'])
diff --git a/tests/test_pxssh.py b/tests/test_pxssh.py
index 5f82302..0d49b23 100644
--- a/tests/test_pxssh.py
+++ b/tests/test_pxssh.py
@@ -87,11 +87,82 @@ class PxsshTestCase(SSHTestBase):
def test_ssh_config_passing_string(self):
ssh = pxssh.pxssh(debug_command_string=True)
- (temp_file,config_path) = tempfile.mkstemp()
+ temp_file = tempfile.NamedTemporaryFile()
+ config_path = temp_file.name
string = ssh.login('server', 'me', password='s3cret', spawn_local_ssh=False, ssh_config=config_path)
if not '-F '+config_path in string:
assert False, 'String generated from SSH config passing is incorrect.'
+ def test_username_or_ssh_config(self):
+ try:
+ ssh = pxssh.pxssh(debug_command_string=True)
+ temp_file = tempfile.NamedTemporaryFile()
+ config_path = temp_file.name
+ string = ssh.login('server')
+ raise AssertionError('Should have failed due to missing username and missing ssh_config.')
+ except TypeError:
+ pass
+
+ def test_ssh_config_user(self):
+ ssh = pxssh.pxssh(debug_command_string=True)
+ temp_file = tempfile.NamedTemporaryFile()
+ config_path = temp_file.name
+ temp_file.write(b'HosT server\n'
+ b'UsEr me\n'
+ b'hOSt not-server\n')
+ temp_file.seek(0)
+ string = ssh.login('server', ssh_config=config_path)
+
+ def test_ssh_config_no_username_empty_config(self):
+ ssh = pxssh.pxssh(debug_command_string=True)
+ temp_file = tempfile.NamedTemporaryFile()
+ config_path = temp_file.name
+ try:
+ string = ssh.login('server', ssh_config=config_path)
+ raise AssertionError('Should have failed due to no Host.')
+ except TypeError:
+ pass
+
+ def test_ssh_config_wrong_Host(self):
+ ssh = pxssh.pxssh(debug_command_string=True)
+ temp_file = tempfile.NamedTemporaryFile()
+ config_path = temp_file.name
+ temp_file.write(b'Host not-server\n'
+ b'Host also-not-server\n')
+ temp_file.seek(0)
+ try:
+ string = ssh.login('server', ssh_config=config_path)
+ raise AssertionError('Should have failed due to no matching Host.')
+ except TypeError:
+ pass
+
+ def test_ssh_config_no_user(self):
+ ssh = pxssh.pxssh(debug_command_string=True)
+ temp_file = tempfile.NamedTemporaryFile()
+ config_path = temp_file.name
+ temp_file.write(b'Host server\n'
+ b'Host not-server\n')
+ temp_file.seek(0)
+ try:
+ string = ssh.login('server', ssh_config=config_path)
+ raise AssertionError('Should have failed due to no user.')
+ except TypeError:
+ pass
+
+ def test_ssh_config_empty_user(self):
+ ssh = pxssh.pxssh(debug_command_string=True)
+ temp_file = tempfile.NamedTemporaryFile()
+ config_path = temp_file.name
+ temp_file.write(b'Host server\n'
+ b'user \n'
+ b'Host not-server\n')
+ temp_file.seek(0)
+ try:
+ string = ssh.login('server', ssh_config=config_path)
+ raise AssertionError('Should have failed due to empty user.')
+ except TypeError:
+ pass
+
def test_ssh_key_string(self):
ssh = pxssh.pxssh(debug_command_string=True)
confirmation_strings = 0
@@ -105,7 +176,8 @@ class PxsshTestCase(SSHTestBase):
assert False, 'String generated from forcing the SSH agent sock is incorrect.'
confirmation_strings = 0
- (temp_file,ssh_key) = tempfile.mkstemp()
+ temp_file = tempfile.NamedTemporaryFile()
+ ssh_key = temp_file.name
confirmation_array = [' -i '+ssh_key]
string = ssh.login('server', 'me', password='s3cret', ssh_key=ssh_key)
for confirmation in confirmation_array:
@@ -115,6 +187,86 @@ class PxsshTestCase(SSHTestBase):
if confirmation_strings!=len(confirmation_array):
assert False, 'String generated from adding an SSH key is incorrect.'
+ def test_custom_ssh_cmd_debug(self):
+ ssh = pxssh.pxssh(debug_command_string=True)
+ cipher_string = '-c aes128-ctr,aes192-ctr,aes256-ctr,arcfour256,arcfour128,' \
+ + 'aes128-cbc,3des-cbc,blowfish-cbc,cast128-cbc,aes192-cbc,' \
+ + 'aes256-cbc,arcfour'
+ confirmation_strings = 0
+ confirmation_array = [cipher_string, '-2']
+ string = ssh.login('server', 'me', password='s3cret', cmd='ssh ' + cipher_string + ' -2')
+ for confirmation in confirmation_array:
+ if confirmation in string:
+ confirmation_strings+=1
+
+ if confirmation_strings!=len(confirmation_array):
+ assert False, 'String generated for custom ssh client command is incorrect.'
+
+ def test_custom_ssh_cmd_debug(self):
+ ssh = pxssh.pxssh(debug_command_string=True)
+ cipher_string = '-c aes128-ctr,aes192-ctr,aes256-ctr,arcfour256,arcfour128,' \
+ + 'aes128-cbc,3des-cbc,blowfish-cbc,cast128-cbc,aes192-cbc,' \
+ + 'aes256-cbc,arcfour'
+ confirmation_strings = 0
+ confirmation_array = [cipher_string, '-2']
+ string = ssh.login('server', 'me', password='s3cret', cmd='ssh ' + cipher_string + ' -2')
+ for confirmation in confirmation_array:
+ if confirmation in string:
+ confirmation_strings+=1
+
+ if confirmation_strings!=len(confirmation_array):
+ assert False, 'String generated for custom ssh client command is incorrect.'
+
+ def test_failed_custom_ssh_cmd_debug(self):
+ ssh = pxssh.pxssh(debug_command_string=True)
+ cipher_string = '-c invalid_cipher'
+ confirmation_strings = 0
+ confirmation_array = [cipher_string, '-2']
+ string = ssh.login('server', 'me', password='s3cret', cmd='ssh ' + cipher_string + ' -2')
+ for confirmation in confirmation_array:
+ if confirmation in string:
+ confirmation_strings+=1
+
+ if confirmation_strings!=len(confirmation_array):
+ assert False, 'String generated for custom ssh client command is incorrect.'
+
+ def test_custom_ssh_cmd(self):
+ try:
+ ssh = pxssh.pxssh()
+ cipher_string = '-c aes128-ctr,aes192-ctr,aes256-ctr,arcfour256,arcfour128,' \
+ + 'aes128-cbc,3des-cbc,blowfish-cbc,cast128-cbc,aes192-cbc,' \
+ + 'aes256-cbc,arcfour'
+ result = ssh.login('server', 'me', password='s3cret', cmd='ssh ' + cipher_string + ' -2')
+
+ ssh.PROMPT = r'Closed connection'
+ ssh.sendline('exit')
+ ssh.prompt(timeout=5)
+ string = str(ssh.before) + str(ssh.after)
+
+ if 'Closed connection' not in string:
+ assert False, 'should have logged into Mock SSH client and exited'
+ except pxssh.ExceptionPxssh as e:
+ assert False, 'should not have raised exception, pxssh.ExceptionPxssh'
+ else:
+ pass
+
+ def test_failed_custom_ssh_cmd(self):
+ try:
+ ssh = pxssh.pxssh()
+ cipher_string = '-c invalid_cipher'
+ result = ssh.login('server', 'me', password='s3cret', cmd='ssh ' + cipher_string + ' -2')
+
+ ssh.PROMPT = r'Closed connection'
+ ssh.sendline('exit')
+ ssh.prompt(timeout=5)
+ string = str(ssh.before) + str(ssh.after)
+
+ if 'Closed connection' not in string:
+ assert False, 'should not have completed logging into Mock SSH client and exited'
+ except pxssh.ExceptionPxssh as e:
+ pass
+ else:
+ assert False, 'should have raised exception, pxssh.ExceptionPxssh'
if __name__ == '__main__':
unittest.main()
diff --git a/tests/test_replwrap.py b/tests/test_replwrap.py
index d0fe8e2..06ca07b 100644
--- a/tests/test_replwrap.py
+++ b/tests/test_replwrap.py
@@ -24,8 +24,15 @@ class REPLWrapTestCase(unittest.TestCase):
def test_bash(self):
bash = replwrap.bash()
- res = bash.run_command("time")
- assert 'real' in res, res
+ res = bash.run_command("alias")
+ assert 'alias' in res, res
+
+ try:
+ bash.run_command('')
+ except ValueError:
+ pass
+ else:
+ assert False, "Didn't raise ValueError for empty input"
def test_pager_as_cat(self):
" PAGER is set to cat, to prevent timeout in ``man sleep``. "
@@ -78,7 +85,7 @@ class REPLWrapTestCase(unittest.TestCase):
self.assertEqual(res.strip().splitlines(), ['1 2', '3 4'])
def test_existing_spawn(self):
- child = pexpect.spawn("bash", timeout=5, echo=False, encoding='utf-8')
+ child = pexpect.spawn("bash", timeout=5, encoding='utf-8')
repl = replwrap.REPLWrapper(child, re.compile('[$#]'),
"PS1='{0}' PS2='{1}' "
"PROMPT_COMMAND=''")