diff options
44 files changed, 688 insertions, 293 deletions
@@ -13,7 +13,7 @@ Don Libes' Expect. Pexpect allows your script to spawn a child application and control it as if a human were typing commands. Pexpect can be used for automating interactive applications such as ssh, ftp, -passwd, telnet, etc. It can be used to a automate setup scripts for duplicating +passwd, telnet, etc. It can be used to automate setup scripts for duplicating software package installations on different servers. It can be used for automated software testing. Pexpect is in the spirit of Don Libes' Expect, but Pexpect is pure Python. diff --git a/doc/api/pxssh.rst b/doc/api/pxssh.rst index c9c80c6..e82d950 100644 --- a/doc/api/pxssh.rst +++ b/doc/api/pxssh.rst @@ -5,9 +5,16 @@ pxssh - control an SSH session *pxssh* is a screen-scraping wrapper around the SSH command on your system. In many cases, you should consider using - `Paramiko <https://github.com/paramiko/paramiko>`_ instead. + `Paramiko <https://github.com/paramiko/paramiko>`_ or + `RedExpect <https://github.com/Red-M/RedExpect>`_ instead. Paramiko is a Python module which speaks the SSH protocol directly, so it doesn't have the extra complexity of running a local subprocess. + RedExpect is very similar to pxssh except that it reads and writes directly + into an SSH session all done via Python with all the SSH protocol in C, + additionally it is written for communicating to SSH servers that are not just + Linux machines. Meaning that it is extremely fast in comparison to Paramiko + and already has the familiar expect API. In most cases RedExpect and pxssh + code should be fairly interchangeable. .. automodule:: pexpect.pxssh @@ -27,7 +34,7 @@ pxssh class .. attribute:: force_password - If this is set to True, public key authentication is disabled, forcing the + If this is set to ``True``, public key authentication is disabled, forcing the server to ask for a password. Note that the sysadmin can disable password logins, in which case this won't work. diff --git a/doc/commonissues.rst b/doc/commonissues.rst index f60085e..c74ea7a 100644 --- a/doc/commonissues.rst +++ b/doc/commonissues.rst @@ -41,10 +41,10 @@ back to the TTY. I would call this an SSH bug. Pexpect now automatically adds a short delay before sending data to a child process. This more closely mimics what happens in the usual human-to-app -interaction. The delay can be tuned with the ``delaybeforesend`` attribute of the -spawn class. In general, this fixes the problem for everyone and so this should -not be an issue for most users. For some applications you might with to turn it -off:: +interaction. The delay can be tuned with the ``delaybeforesend`` attribute of +objects of the spawn class. In general, this fixes the problem for everyone and so +this should not be an issue for most users. For some applications you might with +to turn it off:: child = pexpect.spawn ("ssh user@example.com") child.delaybeforesend = None diff --git a/doc/conf.py b/doc/conf.py index a734147..c7c9589 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -16,7 +16,7 @@ import sys, os # If extensions (or modules to document with autodoc) are in another directory, # add these directories to sys.path here. If the directory is relative to the # documentation root, use os.path.abspath to make it absolute, like shown here. -sys.path.insert(0, os.path.abspath('sphinxext')) +#sys.path.insert(0, os.path.abspath('sphinxext')) # -- General configuration ----------------------------------------------------- @@ -26,7 +26,8 @@ sys.path.insert(0, os.path.abspath('sphinxext')) # Add any Sphinx extension module names here, as strings. They can be extensions # coming with Sphinx (named 'sphinx.ext.*') or your custom ones. extensions = ['sphinx.ext.autodoc', 'sphinx.ext.intersphinx', - 'sphinx.ext.viewcode', 'github', # for easy GitHub links + 'sphinx.ext.viewcode', + 'sphinxcontrib_github_alt', # for easy GitHub links ] github_project_url = "https://github.com/pexpect/pexpect" @@ -52,7 +53,7 @@ copyright = u'2013, Noah Spurrier and contributors' # built documents. # # The short X.Y version. -version = '4.6' +version = '4.8' # The full version, including alpha/beta/rc tags. release = version diff --git a/doc/examples.rst b/doc/examples.rst index 6338b5c..be48f6a 100644 --- a/doc/examples.rst +++ b/doc/examples.rst @@ -48,7 +48,7 @@ Examples. `python.py <https://github.com/pexpect/pexpect/blob/master/examples/python.py>`_ This starts the python interpreter and prints the greeting message - backwards. It then gives the user iteractive control of Python. It's + backwards. It then gives the user interactive control of Python. It's pretty useless! `ssh_tunnel.py <https://github.com/pexpect/pexpect/blob/master/examples/ssh_tunnel.py>`_ diff --git a/doc/history.rst b/doc/history.rst index 41a5918..68b5af1 100644 --- a/doc/history.rst +++ b/doc/history.rst @@ -4,13 +4,47 @@ History Releases -------- +Version 4.8 +``````````` + +* Returned behavior of searchwindowsize to that in 4.3 and earlier (searches + are only done within the search window) (:ghpull:`579`). +* Fixed a bug truncating ``before`` attribute after a timeout (:ghpull:`579`). +* Fixed a bug where a search could be less than ``searchwindowsize`` if it + was increased between calls (:ghpull:`579`). +* Minor test cleanups to improve portability (:ghpull:`580`) (:ghpull:`581`) + (:ghpull:`582`) (:ghpull:`583`) (:ghpull:`584`) (:ghpull:`585`). +* Disable chaining of timeout and EOF exceptions (:ghpull:`606`). +* Allow traceback included snippet length to be configured via + ``str_last_chars`` rather than always 100 (:ghpull:`598`). +* Python 3 warning added to interact.py (:ghpull:`537`). +* Several doc updates. + +Version 4.7 +``````````` + +* The :meth:`.pxssh.login` method now no longer requires a username if an ssh + config is provided and will raise an error if neither are provided. + (:ghpull:`562`). +* The :meth:`.pxssh.login` method now supports providing your own ``ssh`` + command via the ``cmd`` parameter. + (:ghpull:`528`) (:ghpull:`563`). +* :class:`.pxssh` now supports the ``use_poll`` parameter which is passed into :meth:`.pexpect.spawn` + (:ghpull:`542`). +* Minor bug fix with ``ssh_config``. + (:ghpull:`498`). +* :meth:`.replwrap.run_command` now has async support via an ``async_`` parameter. + (:ghpull:`501`). +* :meth:`.pexpect.spawn` will now read additional bytes if able up to a buffer limit. + (:ghpull:`304`). + Version 4.6 ``````````` * The :meth:`.pxssh.login` method now supports an ``ssh_config`` parameter, which can be used to specify a file path to an SSH config file (:ghpull:`490`). -* Improved compatability for the ``crlf`` parameter of :class:`~.PopenSpawn` +* Improved compatibility for the ``crlf`` parameter of :class:`~.PopenSpawn` (:ghpull:`493`) * Fixed an issue in read timeout handling when using :class:`~.spawn` and :class:`~.fdspawn` with the ``use_poll`` parameter (:ghpull:`492`). diff --git a/doc/index.rst b/doc/index.rst index 0bcf862..83bb7f2 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -16,7 +16,7 @@ spawn a child application and control it as if a human were typing commands. Pexpect can be used for automating interactive applications such as -ssh, ftp, passwd, telnet, etc. It can be used to a automate setup +ssh, ftp, passwd, telnet, etc. It can be used to automate setup scripts for duplicating software package installations on different servers. It can be used for automated software testing. Pexpect is in the spirit of Don Libes' Expect, but Pexpect is pure Python. Unlike diff --git a/doc/overview.rst b/doc/overview.rst index aeb8887..8e47ed0 100644 --- a/doc/overview.rst +++ b/doc/overview.rst @@ -162,7 +162,7 @@ to perform a regular expression match on a stream. Regular expressions need to look ahead. With a stream it is hard to look ahead because the process generating the stream may not be finished. There is no way to know if the process has paused momentarily or is finished and waiting for you. Pexpect must -implicitly always do a NON greedy match (minimal) at the end of a input. +implicitly always do a NON greedy match (minimal) at the end of input. Pexpect compiles all regular expressions with the :data:`re.DOTALL` flag. With the :data:`~re.DOTALL` flag, a ``"."`` will match a newline. @@ -264,6 +264,6 @@ When run by ``PopenSpawn``, they may behave differently. .. seealso:: - `winpexpect <https://pypi.python.org/pypi/winpexpect>`__ and `wexpect <https://gist.github.com/anthonyeden/8488763>`__ - Two unmaintained pexpect-like modules for Windows, which work with a - hidden console. + `wexpect <https://pypi.org/project/wexpect/>`__ is an + alternative for Windows, which works with a hidden console. + diff --git a/doc/requirements.txt b/doc/requirements.txt index 57ebb2d..b2c914e 100644 --- a/doc/requirements.txt +++ b/doc/requirements.txt @@ -1 +1,3 @@ ptyprocess +sphinx +sphinxcontrib_github_alt diff --git a/doc/sphinxext/github.py b/doc/sphinxext/github.py deleted file mode 100644 index 519e146..0000000 --- a/doc/sphinxext/github.py +++ /dev/null @@ -1,155 +0,0 @@ -"""Define text roles for GitHub - -* ghissue - Issue -* ghpull - Pull Request -* ghuser - User - -Adapted from bitbucket example here: -https://bitbucket.org/birkenfeld/sphinx-contrib/src/tip/bitbucket/sphinxcontrib/bitbucket.py - -Authors -------- - -* Doug Hellmann -* Min RK -""" -# -# Original Copyright (c) 2010 Doug Hellmann. All rights reserved. -# - -from docutils import nodes, utils -from docutils.parsers.rst.roles import set_classes - -def make_link_node(rawtext, app, type, slug, options): - """Create a link to a github resource. - - :param rawtext: Text being replaced with link node. - :param app: Sphinx application context - :param type: Link type (issues, changeset, etc.) - :param slug: ID of the thing to link to - :param options: Options dictionary passed to role func. - """ - - try: - base = app.config.github_project_url - if not base: - raise AttributeError - if not base.endswith('/'): - base += '/' - except AttributeError as err: - raise ValueError('github_project_url configuration value is not set (%s)' % str(err)) - - ref = base + type + '/' + slug + '/' - set_classes(options) - prefix = "#" - if type == 'pull': - prefix = "PR " + prefix - node = nodes.reference(rawtext, prefix + utils.unescape(slug), refuri=ref, - **options) - return node - -def ghissue_role(name, rawtext, text, lineno, inliner, options={}, content=[]): - """Link to a GitHub issue. - - Returns 2 part tuple containing list of nodes to insert into the - document and a list of system messages. Both are allowed to be - empty. - - :param name: The role name used in the document. - :param rawtext: The entire markup snippet, with role. - :param text: The text marked with the role. - :param lineno: The line number where rawtext appears in the input. - :param inliner: The inliner instance that called us. - :param options: Directive options for customization. - :param content: The directive content for customization. - """ - - try: - issue_num = int(text) - if issue_num <= 0: - raise ValueError - except ValueError: - msg = inliner.reporter.error( - 'GitHub issue number must be a number greater than or equal to 1; ' - '"%s" is invalid.' % text, line=lineno) - prb = inliner.problematic(rawtext, rawtext, msg) - return [prb], [msg] - app = inliner.document.settings.env.app - #app.info('issue %r' % text) - if 'pull' in name.lower(): - category = 'pull' - elif 'issue' in name.lower(): - category = 'issues' - else: - msg = inliner.reporter.error( - 'GitHub roles include "ghpull" and "ghissue", ' - '"%s" is invalid.' % name, line=lineno) - prb = inliner.problematic(rawtext, rawtext, msg) - return [prb], [msg] - node = make_link_node(rawtext, app, category, str(issue_num), options) - return [node], [] - -def ghuser_role(name, rawtext, text, lineno, inliner, options={}, content=[]): - """Link to a GitHub user. - - Returns 2 part tuple containing list of nodes to insert into the - document and a list of system messages. Both are allowed to be - empty. - - :param name: The role name used in the document. - :param rawtext: The entire markup snippet, with role. - :param text: The text marked with the role. - :param lineno: The line number where rawtext appears in the input. - :param inliner: The inliner instance that called us. - :param options: Directive options for customization. - :param content: The directive content for customization. - """ - app = inliner.document.settings.env.app - #app.info('user link %r' % text) - ref = 'https://www.github.com/' + text - node = nodes.reference(rawtext, text, refuri=ref, **options) - return [node], [] - -def ghcommit_role(name, rawtext, text, lineno, inliner, options={}, content=[]): - """Link to a GitHub commit. - - Returns 2 part tuple containing list of nodes to insert into the - document and a list of system messages. Both are allowed to be - empty. - - :param name: The role name used in the document. - :param rawtext: The entire markup snippet, with role. - :param text: The text marked with the role. - :param lineno: The line number where rawtext appears in the input. - :param inliner: The inliner instance that called us. - :param options: Directive options for customization. - :param content: The directive content for customization. - """ - app = inliner.document.settings.env.app - #app.info('user link %r' % text) - try: - base = app.config.github_project_url - if not base: - raise AttributeError - if not base.endswith('/'): - base += '/' - except AttributeError as err: - raise ValueError('github_project_url configuration value is not set (%s)' % str(err)) - - ref = base + text - node = nodes.reference(rawtext, text[:6], refuri=ref, **options) - return [node], [] - - -def setup(app): - """Install the plugin. - - :param app: Sphinx application context. - """ - app.info('Initializing GitHub plugin') - app.add_role('ghissue', ghissue_role) - app.add_role('ghpull', ghissue_role) - app.add_role('ghuser', ghuser_role) - app.add_role('ghcommit', ghcommit_role) - app.add_config_value('github_project_url', None, 'env') - return diff --git a/examples/astat.py b/examples/astat.py index abba1be..50937a3 100755 --- a/examples/astat.py +++ b/examples/astat.py @@ -38,7 +38,7 @@ import os import sys import getopt import getpass -import pxssh +from pexpect import pxssh try: diff --git a/examples/chess.py b/examples/chess.py index f97a3a9..a15fd94 100755 --- a/examples/chess.py +++ b/examples/chess.py @@ -27,7 +27,7 @@ from __future__ import print_function from __future__ import absolute_import import pexpect -import ANSI +from pexpect import ANSI REGEX_MOVE = r'(?:[a-z]|\x1b\[C)(?:[0-9]|\x1b\[C)(?:[a-z]|\x1b\[C)(?:[0-9]|\x1b\[C)' REGEX_MOVE_PART = r'(?:[0-9]|\x1b\[C)(?:[a-z]|\x1b\[C)(?:[0-9]|\x1b\[C)' diff --git a/examples/chess2.py b/examples/chess2.py index b92509e..f8a7c24 100755 --- a/examples/chess2.py +++ b/examples/chess2.py @@ -27,7 +27,7 @@ from __future__ import print_function from __future__ import absolute_import import pexpect -import ANSI +from pexpect import ANSI import sys import time diff --git a/examples/chess3.py b/examples/chess3.py index 2c087b0..e3e6200 100755 --- a/examples/chess3.py +++ b/examples/chess3.py @@ -27,7 +27,7 @@ from __future__ import print_function from __future__ import absolute_import import pexpect -import ANSI +from pexpect import ANSI REGEX_MOVE = r'(?:[a-z]|\x1b\[C)(?:[0-9]|\x1b\[C)(?:[a-z]|\x1b\[C)(?:[0-9]|\x1b\[C)' REGEX_MOVE_PART = r'(?:[0-9]|\x1b\[C)(?:[a-z]|\x1b\[C)(?:[0-9]|\x1b\[C)' diff --git a/examples/hive.py b/examples/hive.py index 1b7bcbf..0d34b03 100755 --- a/examples/hive.py +++ b/examples/hive.py @@ -94,7 +94,7 @@ import readline import atexit try: import pexpect - import pxssh + from pexpect import pxssh except ImportError: sys.stderr.write("You do not have 'pexpect' installed.\n") sys.stderr.write("On Ubuntu you need the 'python-pexpect' package.\n") @@ -436,7 +436,7 @@ def resync (hive, hive_names, timeout=2, max_attempts=5): def parse_host_connect_string (hcs): '''This parses a host connection string in the form - username:password@hostname:port. All fields are options expcet hostname. A + username:password@hostname:port. All fields are optional except hostname. A dictionary is returned with all four keys. Keys that were not included are set to empty strings ''. Note that if your password has the '@' character then you must backslash escape it. ''' diff --git a/examples/terminal_emulation.py b/examples/terminal_emulation.py new file mode 100755 index 0000000..802f9e8 --- /dev/null +++ b/examples/terminal_emulation.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python + +'''These examples show how to integrate pexpect with pyte, an ANSI terminal +emulator. + +These examples were taken from: +https://byexamples.github.io/byexample + +We will execute three commands: + - an 'echo' of a colored message to show how the ANSI colors can be removed. + - an 'echo' of a very large message to show how pyte emulates the terminal + geometry + - a 'less' of a very small file to show how pyte handles not only + the terminal geometry but also how interprets ANSI commands that control + the position of the cursor. + +See also https://github.com/pexpect/pexpect/issues/587 + +PEXPECT LICENSE + + This license is approved by the OSI and FSF as GPL-compatible. + http://opensource.org/licenses/isc-license.txt + + Copyright (c) 2012, Noah Spurrier <noah@noah.org> + PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY + PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE + COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES. + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +''' + +from __future__ import print_function +from __future__ import absolute_import +from __future__ import unicode_literals + +import pexpect +import pyte +import os + +# The geometry of the terminal. Typically this is 24x80 +# but we are going to us set a much smaller terminal +# to show how to change the default. +ROWS, COLS = 10, 40 + +# We create the Screen with the correct geometry and +# a Stream to process the output coming from pexpect. +screen = pyte.Screen(COLS, ROWS) +stream = pyte.Stream(screen) + +# Spawn a process using pexpect.spawn as usual +# with a particularity: it sets the geometry of the terminal +# using the environment variables *and* using the 'dimensions' +# parameter of pexpect.spawn. +# This is needed because no all the program honors the geometry +# set by pexpect or by the env vars. +def spawn_process(cmd): + env = os.environ.copy() + env.update({'LINES': str(ROWS), 'COLUMNS': str(COLS)}) + + return pexpect.spawn(cmd, echo=False, encoding='utf-8', dimensions=(ROWS, COLS), env=env) + +# Send the raw output to pyte.Stream and get the emulated output +# from pyte.Screen. +# In each call we *reset* the display so we don't get the same +# emulated output twice. +# +# Pyte emulates the whole terminal so it will return us ROWS rows +# of each COLS columns each one completed with spaces. +# +# Optionally we strip the whitespace on the right and any empty line +def emulate_ansi_terminal(raw_output, clean=True): + stream.feed(raw_output) + + lines = screen.display + screen.reset() + + if clean: + lines = (line.rstrip() for line in lines) + lines = (line for line in lines if line) + + return '\n'.join(lines) + +def pprint(out): + print("-" * COLS) + print(out) + print("-" * COLS) + +print("\nFirst example: echo a message with ANSI color sequences.") +child = spawn_process(r'echo -e "\033[31mThis message should not be in red\033[0m"') +child.expect(pexpect.EOF) +out = emulate_ansi_terminal(child.before) + +print("This should *not* print any escape sequence,", + "those were emulated and discarded by pyte.\n") +pprint(out) + +print("\nSecond example: echo a very large message.") +msg = ("aaaabbbb" * 8) +child = spawn_process('echo "%s"' % msg) +child.expect(pexpect.EOF) +out = emulate_ansi_terminal(child.before) + +print("This should print the message in *two* lines because we", + "configured a terminal very small and the message will", + "not fit in one line.\n") +pprint(out) + + +print("\nThird example: run the less program.") +child = spawn_process('''bash -c "head -n7 '%s' | less"''' % __file__) +child.expect(pexpect.TIMEOUT, timeout=5) +out = emulate_ansi_terminal(child.before, clean=False) + +pprint(out) diff --git a/examples/topip.py b/examples/topip.py index 64dac30..9cf3824 100755 --- a/examples/topip.py +++ b/examples/topip.py @@ -70,7 +70,7 @@ from __future__ import unicode_literals # See http://pexpect.sourceforge.net/ import pexpect -import pxssh +from pexpect import pxssh import os import sys import time diff --git a/pexpect/__init__.py b/pexpect/__init__.py index 2a18d19..b3a64b6 100644 --- a/pexpect/__init__.py +++ b/pexpect/__init__.py @@ -1,6 +1,6 @@ '''Pexpect is a Python module for spawning child applications and controlling them automatically. Pexpect can be used for automating interactive applications -such as ssh, ftp, passwd, telnet, etc. It can be used to a automate setup +such as ssh, ftp, passwd, telnet, etc. It can be used to automate setup scripts for duplicating software package installations on different servers. It can be used for automated software testing. Pexpect is in the spirit of Don Libes' Expect, but Pexpect is pure Python. Other Expect-like modules for Python @@ -75,7 +75,7 @@ if sys.platform != 'win32': from .pty_spawn import spawn, spawnu from .run import run, runu -__version__ = '4.6.0' +__version__ = '4.8.0' __revision__ = '' __all__ = ['ExceptionPexpect', 'EOF', 'TIMEOUT', 'spawn', 'spawnu', 'run', 'runu', 'which', 'split_command_line', '__version__', '__revision__'] diff --git a/pexpect/_async.py b/pexpect/_async.py index ca2044e..dfbfeef 100644 --- a/pexpect/_async.py +++ b/pexpect/_async.py @@ -8,10 +8,7 @@ from pexpect import EOF def expect_async(expecter, timeout=None): # First process data that was previously read - if it maches, we don't need # async stuff. - previously_read = expecter.spawn.buffer - expecter.spawn._buffer = expecter.spawn.buffer_type() - expecter.spawn._before = expecter.spawn.buffer_type() - idx = expecter.new_data(previously_read) + idx = expecter.existing_data() if idx is not None: return idx if not expecter.spawn.async_pw_transport: @@ -74,6 +71,7 @@ class PatternWaiter(asyncio.Protocol): spawn._log(s, 'read') if self.fut.done(): + spawn._before.write(s) spawn._buffer.write(s) return diff --git a/pexpect/expect.py b/pexpect/expect.py index db376d5..d3409db 100644 --- a/pexpect/expect.py +++ b/pexpect/expect.py @@ -6,45 +6,101 @@ class Expecter(object): def __init__(self, spawn, searcher, searchwindowsize=-1): self.spawn = spawn self.searcher = searcher + # A value of -1 means to use the figure from spawn, which should + # be None or a positive number. if searchwindowsize == -1: searchwindowsize = spawn.searchwindowsize self.searchwindowsize = searchwindowsize + self.lookback = None + if hasattr(searcher, 'longest_string'): + self.lookback = searcher.longest_string - def new_data(self, data): + def do_search(self, window, freshlen): spawn = self.spawn searcher = self.searcher - - pos = spawn._buffer.tell() - spawn._buffer.write(data) - spawn._before.write(data) - - # determine which chunk of data to search; if a windowsize is - # specified, this is the *new* data + the preceding <windowsize> bytes - if self.searchwindowsize: - spawn._buffer.seek(max(0, pos - self.searchwindowsize)) - window = spawn._buffer.read(self.searchwindowsize + len(data)) - else: - # otherwise, search the whole buffer (really slow for large datasets) - window = spawn.buffer - index = searcher.search(window, len(data)) + if freshlen > len(window): + freshlen = len(window) + index = searcher.search(window, freshlen, self.searchwindowsize) if index >= 0: spawn._buffer = spawn.buffer_type() spawn._buffer.write(window[searcher.end:]) - spawn.before = spawn._before.getvalue()[0:-(len(window) - searcher.start)] + spawn.before = spawn._before.getvalue()[ + 0:-(len(window) - searcher.start)] spawn._before = spawn.buffer_type() - spawn.after = window[searcher.start: searcher.end] + spawn._before.write(window[searcher.end:]) + spawn.after = window[searcher.start:searcher.end] spawn.match = searcher.match spawn.match_index = index # Found a match return index - elif self.searchwindowsize: - spawn._buffer = spawn.buffer_type() - spawn._buffer.write(window) + elif self.searchwindowsize or self.lookback: + maintain = self.searchwindowsize or self.lookback + if spawn._buffer.tell() > maintain: + spawn._buffer = spawn.buffer_type() + spawn._buffer.write(window[-maintain:]) + + def existing_data(self): + # First call from a new call to expect_loop or expect_async. + # self.searchwindowsize may have changed. + # Treat all data as fresh. + spawn = self.spawn + before_len = spawn._before.tell() + buf_len = spawn._buffer.tell() + freshlen = before_len + if before_len > buf_len: + if not self.searchwindowsize: + spawn._buffer = spawn.buffer_type() + window = spawn._before.getvalue() + spawn._buffer.write(window) + elif buf_len < self.searchwindowsize: + spawn._buffer = spawn.buffer_type() + spawn._before.seek( + max(0, before_len - self.searchwindowsize)) + window = spawn._before.read() + spawn._buffer.write(window) + else: + spawn._buffer.seek(max(0, buf_len - self.searchwindowsize)) + window = spawn._buffer.read() + else: + if self.searchwindowsize: + spawn._buffer.seek(max(0, buf_len - self.searchwindowsize)) + window = spawn._buffer.read() + else: + window = spawn._buffer.getvalue() + return self.do_search(window, freshlen) + + def new_data(self, data): + # A subsequent call, after a call to existing_data. + spawn = self.spawn + freshlen = len(data) + spawn._before.write(data) + if not self.searchwindowsize: + if self.lookback: + # search lookback + new data. + old_len = spawn._buffer.tell() + spawn._buffer.write(data) + spawn._buffer.seek(max(0, old_len - self.lookback)) + window = spawn._buffer.read() + else: + # copy the whole buffer (really slow for large datasets). + spawn._buffer.write(data) + window = spawn.buffer + else: + if len(data) >= self.searchwindowsize or not spawn._buffer.tell(): + window = data[-self.searchwindowsize:] + spawn._buffer = spawn.buffer_type() + spawn._buffer.write(window[-self.searchwindowsize:]) + else: + spawn._buffer.write(data) + new_len = spawn._buffer.tell() + spawn._buffer.seek(max(0, new_len - self.searchwindowsize)) + window = spawn._buffer.read() + return self.do_search(window, freshlen) def eof(self, err=None): spawn = self.spawn - spawn.before = spawn.buffer + spawn.before = spawn._before.getvalue() spawn._buffer = spawn.buffer_type() spawn._before = spawn.buffer_type() spawn.after = EOF @@ -60,12 +116,15 @@ class Expecter(object): msg += '\nsearcher: %s' % self.searcher if err is not None: msg = str(err) + '\n' + msg - raise EOF(msg) - + + exc = EOF(msg) + exc.__cause__ = None # in Python 3.x we can use "raise exc from None" + raise exc + def timeout(self, err=None): spawn = self.spawn - spawn.before = spawn.buffer + spawn.before = spawn._before.getvalue() spawn.after = TIMEOUT index = self.searcher.timeout_index if index >= 0: @@ -79,15 +138,18 @@ class Expecter(object): msg += '\nsearcher: %s' % self.searcher if err is not None: msg = str(err) + '\n' + msg - raise TIMEOUT(msg) + + exc = TIMEOUT(msg) + exc.__cause__ = None # in Python 3.x we can use "raise exc from None" + raise exc def errored(self): spawn = self.spawn - spawn.before = spawn.buffer + spawn.before = spawn._before.getvalue() spawn.after = None spawn.match = None spawn.match_index = None - + def expect_loop(self, timeout=-1): """Blocking expect""" spawn = self.spawn @@ -96,14 +158,10 @@ class Expecter(object): end_time = time.time() + timeout try: - incoming = spawn.buffer - spawn._buffer = spawn.buffer_type() - spawn._before = spawn.buffer_type() + idx = self.existing_data() + if idx is not None: + return idx while True: - idx = self.new_data(incoming) - # Keep reading until exception or return. - if idx is not None: - return idx # No match at this point if (timeout is not None) and (timeout < 0): return self.timeout() @@ -111,6 +169,10 @@ class Expecter(object): incoming = spawn.read_nonblocking(spawn.maxread, timeout) if self.spawn.delayafterread is not None: time.sleep(self.spawn.delayafterread) + idx = self.new_data(incoming) + # Keep reading until exception or return. + if idx is not None: + return idx if timeout is not None: timeout = end_time - time.time() except EOF as e: @@ -148,6 +210,7 @@ class searcher_string(object): self.eof_index = -1 self.timeout_index = -1 self._strings = [] + self.longest_string = 0 for n, s in enumerate(strings): if s is EOF: self.eof_index = n @@ -156,6 +219,8 @@ class searcher_string(object): self.timeout_index = n continue self._strings.append((n, s)) + if len(s) > self.longest_string: + self.longest_string = len(s) def __str__(self): '''This returns a human-readable string that represents the state of diff --git a/pexpect/popen_spawn.py b/pexpect/popen_spawn.py index 4bb58cf..e6bdf07 100644 --- a/pexpect/popen_spawn.py +++ b/pexpect/popen_spawn.py @@ -57,7 +57,7 @@ class PopenSpawn(SpawnBase): self._read_queue = Queue() self._read_thread = threading.Thread(target=self._read_incoming) - self._read_thread.setDaemon(True) + self._read_thread.daemon = True self._read_thread.start() _read_reached_eof = False diff --git a/pexpect/pty_spawn.py b/pexpect/pty_spawn.py index 691c2c6..8e28ca7 100644 --- a/pexpect/pty_spawn.py +++ b/pexpect/pty_spawn.py @@ -191,6 +191,7 @@ class spawn(SpawnBase): self.STDIN_FILENO = pty.STDIN_FILENO self.STDOUT_FILENO = pty.STDOUT_FILENO self.STDERR_FILENO = pty.STDERR_FILENO + self.str_last_chars = 100 self.cwd = cwd self.env = env self.echo = echo @@ -212,8 +213,8 @@ class spawn(SpawnBase): s.append(repr(self)) s.append('command: ' + str(self.command)) s.append('args: %r' % (self.args,)) - s.append('buffer (last 100 chars): %r' % self.buffer[-100:]) - s.append('before (last 100 chars): %r' % self.before[-100:] if self.before else '') + s.append('buffer (last %s chars): %r' % (self.str_last_chars,self.buffer[-self.str_last_chars:])) + s.append('before (last %s chars): %r' % (self.str_last_chars,self.before[-self.str_last_chars:] if self.before else '')) s.append('after: %r' % (self.after,)) s.append('match: %r' % (self.match,)) s.append('match_index: ' + str(self.match_index)) @@ -752,10 +753,14 @@ class spawn(SpawnBase): child process in interact mode is duplicated to the given log. You may pass in optional input and output filter functions. These - functions should take a string and return a string. The output_filter - will be passed all the output from the child process. The input_filter - will be passed all the keyboard input from the user. The input_filter - is run BEFORE the check for the escape_character. + functions should take bytes array and return bytes array too. Even + with ``encoding='utf-8'`` support, meth:`interact` will always pass + input_filter and output_filter bytes. You may need to wrap your + function to decode and encode back to UTF-8. + + The output_filter will be passed all the output from the child process. + The input_filter will be passed all the keyboard input from the user. + The input_filter is run BEFORE the check for the escape_character. Note that if you change the window size of the parent the SIGWINCH signal will not be passed through to the child. If you want the child diff --git a/pexpect/pxssh.py b/pexpect/pxssh.py index 49d20cc..3d53bd9 100644 --- a/pexpect/pxssh.py +++ b/pexpect/pxssh.py @@ -109,7 +109,7 @@ class pxssh (spawn): username = raw_input('username: ') password = getpass.getpass('password: ') s.login (hostname, username, password) - + `debug_command_string` is only for the test suite to confirm that the string generated for SSH is correct, using this will not allow you to do anything other than get a string back from `pxssh.pxssh.login()`. @@ -154,7 +154,7 @@ class pxssh (spawn): # Unsetting SSH_ASKPASS on the remote side doesn't disable it! Annoying! #self.SSH_OPTS = "-x -o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'" self.force_password = False - + self.debug_command_string = debug_command_string # User defined SSH options, eg, @@ -220,7 +220,7 @@ class pxssh (spawn): can take 12 seconds. Low latency connections are more likely to fail with a low sync_multiplier. Best case sync time gets worse with a high sync multiplier (500 ms with default). ''' - + # All of these timing pace values are magic. # I came up with these based on what seemed reliable for # connecting to a heavily loaded machine I have. @@ -253,20 +253,19 @@ class pxssh (spawn): ### TODO: This is getting messy and I'm pretty sure this isn't perfect. ### TODO: I need to draw a flow chart for this. ### TODO: Unit tests for SSH tunnels, remote SSH command exec, disabling original prompt sync - def login (self, server, username, password='', terminal_type='ansi', + def login (self, server, username=None, password='', terminal_type='ansi', original_prompt=r"[#$]", login_timeout=10, port=None, auto_prompt_reset=True, ssh_key=None, quiet=True, sync_multiplier=1, check_local_ip=True, password_regex=r'(?i)(?:password:)|(?:passphrase for key)', ssh_tunnels={}, spawn_local_ssh=True, - sync_original_prompt=True, ssh_config=None): + sync_original_prompt=True, ssh_config=None, cmd='ssh'): '''This logs the user into the given server. - It uses - 'original_prompt' to try to find the prompt right after login. When it - finds the prompt it immediately tries to reset the prompt to something - more easily matched. The default 'original_prompt' is very optimistic - and is easily fooled. It's more reliable to try to match the original + It uses 'original_prompt' to try to find the prompt right after login. + When it finds the prompt it immediately tries to reset the prompt to + something more easily matched. The default 'original_prompt' is very + optimistic and is easily fooled. It's more reliable to try to match the original prompt as exactly as possible to prevent false matches by server strings such as the "Message Of The Day". On many systems you can disable the MOTD on the remote server by creating a zero-length file @@ -284,27 +283,31 @@ class pxssh (spawn): uses a unique prompt in the :meth:`prompt` method. If the original prompt is not reset then this will disable the :meth:`prompt` method unless you manually set the :attr:`PROMPT` attribute. - + Set ``password_regex`` if there is a MOTD message with `password` in it. Changing this is like playing in traffic, don't (p)expect it to match straight away. - + If you require to connect to another SSH server from the your original SSH connection set ``spawn_local_ssh`` to `False` and this will use your current session to do so. Setting this option to `False` and not having an active session will trigger an error. - + Set ``ssh_key`` to a file path to an SSH private key to use that SSH key for the session authentication. Set ``ssh_key`` to `True` to force passing the current SSH authentication socket to the desired ``hostname``. - + Set ``ssh_config`` to a file path string of an SSH client config file to pass that file to the client to handle itself. You may set any options you wish in here, however doing so will require you to post extra information that you may not want to if you run into issues. + + Alter the ``cmd`` to change the ssh client used, or to prepend it with network + namespaces. For example ```cmd="ip netns exec vlan2 ssh"``` to execute the ssh in + network namespace named ```vlan```. ''' - + session_regex_array = ["(?i)are you sure you want to continue connecting", original_prompt, password_regex, "(?i)permission denied", "(?i)terminal type", TIMEOUT] session_init_regex_array = [] session_init_regex_array.extend(session_regex_array) @@ -331,7 +334,7 @@ class pxssh (spawn): if spawn_local_ssh and not os.path.isfile(ssh_key): raise ExceptionPxssh('private ssh key does not exist or is not a file.') ssh_options = ssh_options + ' -i %s' % (ssh_key) - + # SSH tunnels, make sure you know what you're putting into the lists # under each heading. Do not expect these to open 100% of the time, # The port you're requesting might be bound. @@ -354,7 +357,42 @@ class pxssh (spawn): if spawn_local_ssh==False: tunnel = quote(str(tunnel)) ssh_options = ssh_options + ' -' + cmd_type + ' ' + str(tunnel) - cmd = "ssh %s -l %s %s" % (ssh_options, username, server) + + if username is not None: + ssh_options = ssh_options + ' -l ' + username + elif ssh_config is None: + raise TypeError('login() needs either a username or an ssh_config') + else: # make sure ssh_config has an entry for the server with a username + with open(ssh_config, 'rt') as f: + lines = [l.strip() for l in f.readlines()] + + server_regex = r'^Host\s+%s\s*$' % server + user_regex = r'^User\s+\w+\s*$' + config_has_server = False + server_has_username = False + for line in lines: + if not config_has_server and re.match(server_regex, line, re.IGNORECASE): + config_has_server = True + elif config_has_server and 'hostname' in line.lower(): + pass + elif config_has_server and 'host' in line.lower(): + server_has_username = False # insurance + break # we have left the relevant section + elif config_has_server and re.match(user_regex, line, re.IGNORECASE): + server_has_username = True + break + + if lines: + del line + + del lines + + if not config_has_server: + raise TypeError('login() ssh_config has no Host entry for %s' % server) + elif not server_has_username: + raise TypeError('login() ssh_config has no user entry for %s' % server) + + cmd += " %s %s" % (ssh_options, server) if self.debug_command_string: return(cmd) diff --git a/pexpect/replwrap.py b/pexpect/replwrap.py index c930f1e..6c34ce4 100644 --- a/pexpect/replwrap.py +++ b/pexpect/replwrap.py @@ -108,7 +108,7 @@ class REPLWrapper(object): + command) return u''.join(res + [self.child.before]) -def python(command="python"): +def python(command=sys.executable): """Start a Python shell and return a :class:`REPLWrapper` object.""" return REPLWrapper(command, u">>> ", u"import sys; sys.ps1={0!r}; sys.ps2={1!r}") diff --git a/pexpect/run.py b/pexpect/run.py index d9dfe76..5695ab7 100644 --- a/pexpect/run.py +++ b/pexpect/run.py @@ -66,8 +66,8 @@ def run(command, timeout=30, withexitstatus=False, events=None, The 'events' argument should be either a dictionary or a tuple list that contains patterns and responses. Whenever one of the patterns is seen in the command output, run() will send the associated response string. - So, run() in the above example can be also written as: - + So, run() in the above example can be also written as:: + run("mencoder dvd://1 -o video.avi -oac copy -ovc copy", events=[(TIMEOUT,print_ticks)], timeout=5) diff --git a/pexpect/screen.py b/pexpect/screen.py index 5ab45b9..79f95c4 100644 --- a/pexpect/screen.py +++ b/pexpect/screen.py @@ -90,7 +90,7 @@ class screen: self.encoding = encoding self.encoding_errors = encoding_errors if encoding is not None: - self.decoder = codecs.getincrementaldecoder(encoding)(encoding_errors) + self.decoder = codecs.getincrementaldecoder(encoding)(encoding_errors) else: self.decoder = None self.cur_r = 1 diff --git a/pexpect/spawnbase.py b/pexpect/spawnbase.py index ee96cfa..abf8071 100644 --- a/pexpect/spawnbase.py +++ b/pexpect/spawnbase.py @@ -120,6 +120,9 @@ class SpawnBase(object): self.async_pw_transport = None # This is the read buffer. See maxread. self._buffer = self.buffer_type() + # The buffer may be trimmed for efficiency reasons. This is the + # untrimmed buffer, used to create the before attribute. + self._before = self.buffer_type() def _log(self, s, direction): if self.logfile is not None: @@ -160,7 +163,7 @@ class SpawnBase(object): self._buffer = self.buffer_type() self._buffer.write(value) - # This property is provided for backwards compatability (self.buffer used + # This property is provided for backwards compatibility (self.buffer used # to be a string/bytes object) buffer = property(_get_buffer, _set_buffer) @@ -23,7 +23,7 @@ Don Libes' Expect. Pexpect allows your script to spawn a child application and control it as if a human were typing commands. Pexpect can be used for automating interactive applications such as ssh, ftp, -passwd, telnet, etc. It can be used to a automate setup scripts for duplicating +passwd, telnet, etc. It can be used to automate setup scripts for duplicating software package installations on different servers. It can be used for automated software testing. Pexpect is in the spirit of Don Libes' Expect, but Pexpect is pure Python. @@ -43,6 +43,12 @@ setup(name='pexpect', author='Noah Spurrier; Thomas Kluyver; Jeff Quast', author_email='noah@noah.org, thomas@kluyver.me.uk, contact@jeffquast.com', url='https://pexpect.readthedocs.io/', + project_urls={ + "Bug Tracker": "https://github.com/pexpect/pexpect/issues", + "Documentation": "https://pexpect.readthedocs.io/", + "Source Code": "https://github.com/pexpect/pexpect", + "History": "https://pexpect.readthedocs.io/en/stable/history.html", + }, license='ISC license', platforms='UNIX', classifiers = [ diff --git a/tests/depricated_test_filedescriptor.py b/tests/deprecated_test_filedescriptor.py index 6b0ef3e..6b0ef3e 100755 --- a/tests/depricated_test_filedescriptor.py +++ b/tests/deprecated_test_filedescriptor.py diff --git a/tests/test_run_out_of_pty.py b/tests/deprecated_test_run_out_of_pty.py index 3090147..3090147 100755 --- a/tests/test_run_out_of_pty.py +++ b/tests/deprecated_test_run_out_of_pty.py diff --git a/tests/fakessh/ssh b/tests/fakessh/ssh index d3259e4..4a5be1b 100755 --- a/tests/fakessh/ssh +++ b/tests/fakessh/ssh @@ -3,13 +3,52 @@ from __future__ import print_function import getpass import sys +import getopt PY3 = (sys.version_info[0] >= 3) if not PY3: input = raw_input -server = sys.argv[-1] -if server == 'noserver': - print('No route to host') +ssh_usage = "usage: ssh [-2qV] [-c cipher_spec] [-l login_name]\r\n" \ + + " hostname" + +cipher_valid_list = ['aes128-ctr', 'aes192-ctr', 'aes256-ctr', 'arcfour256', 'arcfour128', \ + 'aes128-cbc','3des-cbc','blowfish-cbc','cast128-cbc','aes192-cbc', \ + 'aes256-cbc','arcfour'] + +try: + server = sys.argv[-1] + if server == 'noserver': + print('No route to host') + sys.exit(1) + + elif len(sys.argv) < 2: + print(ssh_usage) + sys.exit(1) + + cipher = '' + cipher_list = [] + fullCmdArguments = sys.argv + argumentList = fullCmdArguments[1:] + unixOptions = "2qVc:l" + arguments, values = getopt.getopt(argumentList, unixOptions) + for currentArgument, currentValue in arguments: + if currentArgument in ("-2"): + pass + elif currentArgument in ("-V"): + print("Mock SSH client version 0.2") + sys.exit(1) + elif currentArgument in ("-c"): + cipher = currentValue + cipher_list = cipher.split(",") + for cipher_item in cipher_list: + if cipher_item not in cipher_valid_list: + print("Unknown cipher type '" + str(cipher_item) + "'") + sys.exit(1) + + +except Exception as e: + print(ssh_usage) + print('error = ' + str(e)) sys.exit(1) print("Mock SSH client for tests. Do not enter real security info.") @@ -31,4 +70,5 @@ while True: elif cmd == 'echo $?': print(0) elif cmd in ('exit', 'logout'): + print('Closed connection') break diff --git a/tests/test_ansi.py b/tests/test_ansi.py index a9d445e..3d73fe8 100755 --- a/tests/test_ansi.py +++ b/tests/test_ansi.py @@ -21,7 +21,10 @@ PEXPECT LICENSE from pexpect import ANSI import unittest from . import PexpectTestCase +import os +import shutil import sys +import tempfile PY3 = (sys.version_info[0] >= 3) @@ -120,8 +123,17 @@ class ansiTestCase (PexpectTestCase.PexpectTestCase): s = ANSI.ANSI (24,80) with open('torturet.vt') as f: sample_text = f.read() - for c in sample_text: - s.process (c) + # This causes ANSI.py's DoLog to write in the cwd. Make sure we're in a + # writeable directory. + d = tempfile.mkdtemp() + old_cwd = os.getcwd() + os.chdir(d) + try: + for c in sample_text: + s.process (c) + finally: + os.chdir(old_cwd) + shutil.rmtree(d) assert s.pretty() == torture_target, 'processed: \n' + s.pretty() + '\nexpected:\n' + torture_target def test_tetris (self): diff --git a/tests/test_async.py b/tests/test_async.py index 991890c..466d56f 100644 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -47,19 +47,19 @@ class AsyncTests(PexpectTestCase): run(coro) def test_expect_exact(self): - p = pexpect.spawn('%s list100.py' % sys.executable) + p = pexpect.spawn('%s list100.py' % self.PYTHONBIN) assert run(p.expect_exact(b'5', async_=True)) == 0 assert run(p.expect_exact(['wpeok', b'11'], async_=True)) == 1 assert run(p.expect_exact([b'foo', pexpect.EOF], async_=True)) == 1 def test_async_utf8(self): - p = pexpect.spawn('%s list100.py' % sys.executable, encoding='utf8') + p = pexpect.spawn('%s list100.py' % self.PYTHONBIN, encoding='utf8') assert run(p.expect_exact(u'5', async_=True)) == 0 assert run(p.expect_exact([u'wpeok', u'11'], async_=True)) == 1 assert run(p.expect_exact([u'foo', pexpect.EOF], async_=True)) == 1 def test_async_and_gc(self): - p = pexpect.spawn('%s sleep_for.py 1' % sys.executable, encoding='utf8') + p = pexpect.spawn('%s sleep_for.py 1' % self.PYTHONBIN, encoding='utf8') assert run(p.expect_exact(u'READY', async_=True)) == 0 gc.collect() assert run(p.expect_exact(u'END', async_=True)) == 0 diff --git a/tests/test_ctrl_chars.py b/tests/test_ctrl_chars.py index 10d03db..032027c 100755 --- a/tests/test_ctrl_chars.py +++ b/tests/test_ctrl_chars.py @@ -36,11 +36,14 @@ else: byte = chr class TestCtrlChars(PexpectTestCase.PexpectTestCase): + def setUp(self): + super(TestCtrlChars, self).setUp() + self.getch_cmd = self.PYTHONBIN + ' getch.py' def test_control_chars(self): '''This tests that we can send all 256 8-bit characters to a child process.''' - child = pexpect.spawn('python getch.py', echo=False, timeout=5) + child = pexpect.spawn(self.getch_cmd, echo=False, timeout=5) child.expect('READY') for i in range(1, 256): child.send(byte(i)) @@ -54,7 +57,7 @@ class TestCtrlChars(PexpectTestCase.PexpectTestCase): assert child.exitstatus == 0 def test_sendintr (self): - child = pexpect.spawn('python getch.py', echo=False, timeout=5) + child = pexpect.spawn(self.getch_cmd, echo=False, timeout=5) child.expect('READY') child.sendintr() child.expect(str(ord(ptyprocess._INTR)) + '<STOP>') @@ -66,7 +69,7 @@ class TestCtrlChars(PexpectTestCase.PexpectTestCase): assert child.exitstatus == 0 def test_sendeof(self): - child = pexpect.spawn('python getch.py', echo=False, timeout=5) + child = pexpect.spawn(self.getch_cmd, echo=False, timeout=5) child.expect('READY') child.sendeof() child.expect(str(ord(ptyprocess._EOF)) + '<STOP>') @@ -80,14 +83,14 @@ class TestCtrlChars(PexpectTestCase.PexpectTestCase): def test_bad_sendcontrol_chars (self): '''This tests that sendcontrol will return 0 for an unknown char. ''' - child = pexpect.spawn('python getch.py', echo=False, timeout=5) + child = pexpect.spawn(self.getch_cmd, echo=False, timeout=5) child.expect('READY') assert 0 == child.sendcontrol('1') def test_sendcontrol(self): '''This tests that we can send all special control codes by name. ''' - child = pexpect.spawn('python getch.py', echo=False, timeout=5) + child = pexpect.spawn(self.getch_cmd, echo=False, timeout=5) child.expect('READY') for ctrl in 'abcdefghijklmnopqrstuvwxyz': assert child.sendcontrol(ctrl) == 1 diff --git a/tests/test_expect.py b/tests/test_expect.py index 31a4592..5e54d65 100755 --- a/tests/test_expect.py +++ b/tests/test_expect.py @@ -41,7 +41,7 @@ PY3 = bool(sys.version_info.major >= 3) FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)]) def hex_dump(src, length=16): result=[] - for i in xrange(0, len(src), length): + for i in range(0, len(src), length): s = src[i:i+length] hexa = ' '.join(["%02X"%ord(x) for x in s]) printable = s.translate(FILTER) @@ -525,6 +525,47 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase): p.expect = p.expect_exact self._before_after(p) + def test_before_after_timeout(self): + '''Tests that timeouts do not truncate before, a bug in 4.4-4.7.''' + child = pexpect.spawn('cat', echo=False) + child.sendline('BEGIN') + for i in range(100): + child.sendline('foo' * 100) + e = child.expect([b'xyzzy', pexpect.TIMEOUT], + searchwindowsize=10, timeout=0.001) + self.assertEqual(e, 1) + child.sendline('xyzzy') + e = child.expect([b'xyzzy', pexpect.TIMEOUT], + searchwindowsize=10, timeout=30) + self.assertEqual(e, 0) + self.assertEqual(child.before[0:5], b'BEGIN') + child.sendeof() + child.expect(pexpect.EOF) + + def test_increasing_searchwindowsize(self): + '''Tests that the search window can be expanded, a bug in 4.4-4.7.''' + child = pexpect.spawn('cat', echo=False) + child.sendline('BEGIN') + for i in range(100): + child.sendline('foo' * 100) + e = child.expect([b'xyzzy', pexpect.TIMEOUT], + searchwindowsize=10, timeout=0.5) + self.assertEqual(e, 1) + e = child.expect([b'BEGIN', pexpect.TIMEOUT], + searchwindowsize=10, timeout=0.5) + self.assertEqual(e, 1) + e = child.expect([b'BEGIN', pexpect.TIMEOUT], + searchwindowsize=40000, timeout=30.0) + self.assertEqual(e, 0) + child.sendeof() + child.expect(pexpect.EOF) + + def test_searchwindowsize(self): + '''Tests that we don't match outside the window, a bug in 4.4-4.7.''' + p = pexpect.spawn('echo foobarbazbop') + e = p.expect([b'bar', b'bop'], searchwindowsize=6) + self.assertEqual(e, 1) + def _ordering(self, p): p.timeout = 20 p.expect(b'>>> ') diff --git a/tests/test_interact.py b/tests/test_interact.py index 865353b..4afbd18 100755 --- a/tests/test_interact.py +++ b/tests/test_interact.py @@ -41,7 +41,7 @@ class InteractTestCase (PexpectTestCase.PexpectTestCase): else: env['PYTHONPATH'] = self.project_dir - self.interact_py = ('{sys.executable} interact.py'.format(sys=sys)) + self.interact_py = ('{self.PYTHONBIN} interact.py'.format(self=self)) def test_interact_escape(self): " Ensure `escape_character' value exits interactive mode. " diff --git a/tests/test_isalive.py b/tests/test_isalive.py index cd79d09..5e3021e 100755 --- a/tests/test_isalive.py +++ b/tests/test_isalive.py @@ -57,7 +57,7 @@ class IsAliveTestCase(PexpectTestCase.PexpectTestCase): '''Test calling wait with a process terminated by a signal.''' if not hasattr(signal, 'SIGALRM'): return 'SKIP' - p = pexpect.spawn(sys.executable, ['alarm_die.py']) + p = pexpect.spawn(self.PYTHONBIN, ['alarm_die.py']) p.wait() assert p.exitstatus is None self.assertEqual(p.signalstatus, signal.SIGALRM) @@ -99,7 +99,7 @@ class IsAliveTestCase(PexpectTestCase.PexpectTestCase): assert not p.isalive() def test_forced_terminate(self): - p = pexpect.spawn(sys.executable, ['needs_kill.py']) + p = pexpect.spawn(self.PYTHONBIN, ['needs_kill.py']) p.expect('READY') assert p.terminate(force=True) == True p.expect(pexpect.EOF) diff --git a/tests/test_misc.py b/tests/test_misc.py index 6052b6a..7784759 100755 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -171,7 +171,7 @@ class TestCaseMisc(PexpectTestCase.PexpectTestCase): def test_with(self): "spawn can be used as a context manager" - with pexpect.spawn(sys.executable + ' echo_w_prompt.py') as p: + with pexpect.spawn(self.PYTHONBIN + ' echo_w_prompt.py') as p: p.expect('<in >') p.sendline(b'alpha') p.expect(b'<out>alpha') @@ -187,7 +187,7 @@ class TestCaseMisc(PexpectTestCase.PexpectTestCase): def test_sighup(self): " validate argument `ignore_sighup=True` and `ignore_sighup=False`. " - getch = sys.executable + ' getch.py' + getch = self.PYTHONBIN + ' getch.py' child = pexpect.spawn(getch, ignore_sighup=True) child.expect('READY') child.kill(signal.SIGHUP) diff --git a/tests/test_performance.py b/tests/test_performance.py index 63778af..d7e2cd6 100755 --- a/tests/test_performance.py +++ b/tests/test_performance.py @@ -45,7 +45,7 @@ class PerformanceTestCase (PexpectTestCase.PexpectTestCase): return 'for n in range(1, %d+1): print(n)' % n def plain_range(self, n): - e = pexpect.spawn('python', timeout=100) + e = pexpect.spawn(sys.executable, timeout=100) self.assertEqual(e.expect(b'>>>'), 0) e.sendline(self._iter_n(n)) self.assertEqual(e.expect(br'\.{3}'), 0) @@ -53,7 +53,7 @@ class PerformanceTestCase (PexpectTestCase.PexpectTestCase): self.assertEqual(e.expect([b'inquisition', '%d' % n]), 1) def window_range(self, n): - e = pexpect.spawn('python', timeout=100) + e = pexpect.spawn(sys.executable, timeout=100) self.assertEqual(e.expect(b'>>>'), 0) e.sendline(self._iter_n(n)) self.assertEqual(e.expect(r'\.{3}'), 0) @@ -61,7 +61,7 @@ class PerformanceTestCase (PexpectTestCase.PexpectTestCase): self.assertEqual(e.expect([b'inquisition', '%d' % n], searchwindowsize=20), 1) def exact_range(self, n): - e = pexpect.spawn('python', timeout=100) + e = pexpect.spawn(sys.executable, timeout=100) self.assertEqual(e.expect_exact([b'>>>']), 0) e.sendline(self._iter_n(n)) self.assertEqual(e.expect_exact([b'...']), 0) @@ -69,7 +69,7 @@ class PerformanceTestCase (PexpectTestCase.PexpectTestCase): self.assertEqual(e.expect_exact([b'inquisition', '%d' % n],timeout=520), 1) def ewin_range(self, n): - e = pexpect.spawn('python', timeout=100) + e = pexpect.spawn(sys.executable, timeout=100) self.assertEqual(e.expect_exact([b'>>>']), 0) e.sendline(self._iter_n(n)) self.assertEqual(e.expect_exact([b'...']), 0) @@ -77,7 +77,7 @@ class PerformanceTestCase (PexpectTestCase.PexpectTestCase): self.assertEqual(e.expect_exact([b'inquisition', '%d' % n], searchwindowsize=20), 1) def faster_range(self, n): - e = pexpect.spawn('python', timeout=100) + e = pexpect.spawn(sys.executable, timeout=100) self.assertEqual(e.expect(b'>>>'), 0) e.sendline(('list(range(1, %d+1))' % n).encode('ascii')) self.assertEqual(e.expect([b'inquisition', '%d' % n]), 1) diff --git a/tests/test_pxssh.py b/tests/test_pxssh.py index 5f82302..c6ec4e2 100644 --- a/tests/test_pxssh.py +++ b/tests/test_pxssh.py @@ -1,18 +1,24 @@ #!/usr/bin/env python import os +import shutil import tempfile import unittest from pexpect import pxssh +from .PexpectTestCase import PexpectTestCase -class SSHTestBase(unittest.TestCase): +class SSHTestBase(PexpectTestCase): def setUp(self): + super(SSHTestBase, self).setUp() + self.tempdir = tempfile.mkdtemp() self.orig_path = os.environ.get('PATH') + os.symlink(self.PYTHONBIN, os.path.join(self.tempdir, 'python')) fakessh_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'fakessh')) - os.environ['PATH'] = fakessh_dir + \ + os.environ['PATH'] = self.tempdir + os.pathsep + fakessh_dir + \ ((os.pathsep + self.orig_path) if self.orig_path else '') def tearDown(self): + shutil.rmtree(self.tempdir) if self.orig_path: os.environ['PATH'] = self.orig_path else: @@ -87,11 +93,82 @@ class PxsshTestCase(SSHTestBase): def test_ssh_config_passing_string(self): ssh = pxssh.pxssh(debug_command_string=True) - (temp_file,config_path) = tempfile.mkstemp() + temp_file = tempfile.NamedTemporaryFile() + config_path = temp_file.name string = ssh.login('server', 'me', password='s3cret', spawn_local_ssh=False, ssh_config=config_path) if not '-F '+config_path in string: assert False, 'String generated from SSH config passing is incorrect.' + def test_username_or_ssh_config(self): + try: + ssh = pxssh.pxssh(debug_command_string=True) + temp_file = tempfile.NamedTemporaryFile() + config_path = temp_file.name + string = ssh.login('server') + raise AssertionError('Should have failed due to missing username and missing ssh_config.') + except TypeError: + pass + + def test_ssh_config_user(self): + ssh = pxssh.pxssh(debug_command_string=True) + temp_file = tempfile.NamedTemporaryFile() + config_path = temp_file.name + temp_file.write(b'HosT server\n' + b'UsEr me\n' + b'hOSt not-server\n') + temp_file.seek(0) + string = ssh.login('server', ssh_config=config_path) + + def test_ssh_config_no_username_empty_config(self): + ssh = pxssh.pxssh(debug_command_string=True) + temp_file = tempfile.NamedTemporaryFile() + config_path = temp_file.name + try: + string = ssh.login('server', ssh_config=config_path) + raise AssertionError('Should have failed due to no Host.') + except TypeError: + pass + + def test_ssh_config_wrong_Host(self): + ssh = pxssh.pxssh(debug_command_string=True) + temp_file = tempfile.NamedTemporaryFile() + config_path = temp_file.name + temp_file.write(b'Host not-server\n' + b'Host also-not-server\n') + temp_file.seek(0) + try: + string = ssh.login('server', ssh_config=config_path) + raise AssertionError('Should have failed due to no matching Host.') + except TypeError: + pass + + def test_ssh_config_no_user(self): + ssh = pxssh.pxssh(debug_command_string=True) + temp_file = tempfile.NamedTemporaryFile() + config_path = temp_file.name + temp_file.write(b'Host server\n' + b'Host not-server\n') + temp_file.seek(0) + try: + string = ssh.login('server', ssh_config=config_path) + raise AssertionError('Should have failed due to no user.') + except TypeError: + pass + + def test_ssh_config_empty_user(self): + ssh = pxssh.pxssh(debug_command_string=True) + temp_file = tempfile.NamedTemporaryFile() + config_path = temp_file.name + temp_file.write(b'Host server\n' + b'user \n' + b'Host not-server\n') + temp_file.seek(0) + try: + string = ssh.login('server', ssh_config=config_path) + raise AssertionError('Should have failed due to empty user.') + except TypeError: + pass + def test_ssh_key_string(self): ssh = pxssh.pxssh(debug_command_string=True) confirmation_strings = 0 @@ -105,7 +182,8 @@ class PxsshTestCase(SSHTestBase): assert False, 'String generated from forcing the SSH agent sock is incorrect.' confirmation_strings = 0 - (temp_file,ssh_key) = tempfile.mkstemp() + temp_file = tempfile.NamedTemporaryFile() + ssh_key = temp_file.name confirmation_array = [' -i '+ssh_key] string = ssh.login('server', 'me', password='s3cret', ssh_key=ssh_key) for confirmation in confirmation_array: @@ -115,6 +193,86 @@ class PxsshTestCase(SSHTestBase): if confirmation_strings!=len(confirmation_array): assert False, 'String generated from adding an SSH key is incorrect.' + def test_custom_ssh_cmd_debug(self): + ssh = pxssh.pxssh(debug_command_string=True) + cipher_string = '-c aes128-ctr,aes192-ctr,aes256-ctr,arcfour256,arcfour128,' \ + + 'aes128-cbc,3des-cbc,blowfish-cbc,cast128-cbc,aes192-cbc,' \ + + 'aes256-cbc,arcfour' + confirmation_strings = 0 + confirmation_array = [cipher_string, '-2'] + string = ssh.login('server', 'me', password='s3cret', cmd='ssh ' + cipher_string + ' -2') + for confirmation in confirmation_array: + if confirmation in string: + confirmation_strings+=1 + + if confirmation_strings!=len(confirmation_array): + assert False, 'String generated for custom ssh client command is incorrect.' + + def test_custom_ssh_cmd_debug(self): + ssh = pxssh.pxssh(debug_command_string=True) + cipher_string = '-c aes128-ctr,aes192-ctr,aes256-ctr,arcfour256,arcfour128,' \ + + 'aes128-cbc,3des-cbc,blowfish-cbc,cast128-cbc,aes192-cbc,' \ + + 'aes256-cbc,arcfour' + confirmation_strings = 0 + confirmation_array = [cipher_string, '-2'] + string = ssh.login('server', 'me', password='s3cret', cmd='ssh ' + cipher_string + ' -2') + for confirmation in confirmation_array: + if confirmation in string: + confirmation_strings+=1 + + if confirmation_strings!=len(confirmation_array): + assert False, 'String generated for custom ssh client command is incorrect.' + + def test_failed_custom_ssh_cmd_debug(self): + ssh = pxssh.pxssh(debug_command_string=True) + cipher_string = '-c invalid_cipher' + confirmation_strings = 0 + confirmation_array = [cipher_string, '-2'] + string = ssh.login('server', 'me', password='s3cret', cmd='ssh ' + cipher_string + ' -2') + for confirmation in confirmation_array: + if confirmation in string: + confirmation_strings+=1 + + if confirmation_strings!=len(confirmation_array): + assert False, 'String generated for custom ssh client command is incorrect.' + + def test_custom_ssh_cmd(self): + try: + ssh = pxssh.pxssh() + cipher_string = '-c aes128-ctr,aes192-ctr,aes256-ctr,arcfour256,arcfour128,' \ + + 'aes128-cbc,3des-cbc,blowfish-cbc,cast128-cbc,aes192-cbc,' \ + + 'aes256-cbc,arcfour' + result = ssh.login('server', 'me', password='s3cret', cmd='ssh ' + cipher_string + ' -2') + + ssh.PROMPT = r'Closed connection' + ssh.sendline('exit') + ssh.prompt(timeout=5) + string = str(ssh.before) + str(ssh.after) + + if 'Closed connection' not in string: + assert False, 'should have logged into Mock SSH client and exited' + except pxssh.ExceptionPxssh as e: + assert False, 'should not have raised exception, pxssh.ExceptionPxssh' + else: + pass + + def test_failed_custom_ssh_cmd(self): + try: + ssh = pxssh.pxssh() + cipher_string = '-c invalid_cipher' + result = ssh.login('server', 'me', password='s3cret', cmd='ssh ' + cipher_string + ' -2') + + ssh.PROMPT = r'Closed connection' + ssh.sendline('exit') + ssh.prompt(timeout=5) + string = str(ssh.before) + str(ssh.after) + + if 'Closed connection' not in string: + assert False, 'should not have completed logging into Mock SSH client and exited' + except pxssh.ExceptionPxssh as e: + pass + else: + assert False, 'should have raised exception, pxssh.ExceptionPxssh' if __name__ == '__main__': unittest.main() diff --git a/tests/test_replwrap.py b/tests/test_replwrap.py index 06ca07b..e29080d 100644 --- a/tests/test_replwrap.py +++ b/tests/test_replwrap.py @@ -2,6 +2,7 @@ import platform import unittest import re import os +import sys import pexpect from pexpect import replwrap @@ -38,12 +39,13 @@ class REPLWrapTestCase(unittest.TestCase): " PAGER is set to cat, to prevent timeout in ``man sleep``. " bash = replwrap.bash() res = bash.run_command('man sleep', timeout=5) - assert 'SLEEP' in res, res + assert 'SLEEP' in res.upper(), res def test_bash_env(self): """env, which displays PS1=..., should not mess up finding the prompt. """ bash = replwrap.bash() + res = bash.run_command("export PS1") res = bash.run_command("env") self.assertIn('PS1', res) res = bash.run_command("echo $HOME") @@ -108,7 +110,7 @@ class REPLWrapTestCase(unittest.TestCase): if platform.python_implementation() == 'PyPy': raise unittest.SkipTest(skip_pypy) - child = pexpect.spawn('python', echo=False, timeout=5, encoding='utf-8') + child = pexpect.spawn(sys.executable, echo=False, timeout=5, encoding='utf-8') # prompt_change=None should mean no prompt change py = replwrap.REPLWrapper(child, u">>> ", prompt_change=None, continuation_prompt=u"... ") diff --git a/tests/test_run.py b/tests/test_run.py index 1b3c92f..f750fb2 100755 --- a/tests/test_run.py +++ b/tests/test_run.py @@ -69,7 +69,7 @@ class RunFuncTestCase(PexpectTestCase.PexpectTestCase): super(RunFuncTestCase, self).tearDown() def test_run_exit(self): - (data, exitstatus) = self.runfunc('python exit1.py', withexitstatus=1) + (data, exitstatus) = self.runfunc(sys.executable + ' exit1.py', withexitstatus=1) assert exitstatus == 1, "Exit status of 'python exit1.py' should be 1." def test_run(self): @@ -181,7 +181,7 @@ class RunUnicodeFuncTestCase(RunFuncTestCase): else: return True # Stop the child process - output = pexpect.runu(sys.executable + ' echo_w_prompt.py', + output = pexpect.runu(self.PYTHONBIN + ' echo_w_prompt.py', env={'PYTHONIOENCODING': 'utf-8'}, events={pattern: callback}) assert isinstance(output, unicode_type), type(output) diff --git a/tests/test_socket.py b/tests/test_socket.py index 21648f4..a8c8595 100644 --- a/tests/test_socket.py +++ b/tests/test_socket.py @@ -39,7 +39,17 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase): def setUp(self): print(self.id()) PexpectTestCase.PexpectTestCase.setUp(self) + self.af = socket.AF_INET self.host = '127.0.0.1' + try: + socket.socket(socket.AF_INET, socket.SOCK_STREAM) + except socket.error: + try: + socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + self.af = socket.AF_INET6 + self.host = '::1' + except socket.error: + pass self.port = 49152 + 10000 self.motd = b"""\ ------------------------------------------------------------------------------ @@ -92,7 +102,7 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase): def socket_server(self, server_up): sock = None try: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock = socket.socket(self.af, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((self.host, self.port)) sock.listen(5) @@ -126,7 +136,7 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase): def socket_fn(self, timed_out, all_read): result = 0 try: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock = socket.socket(self.af, socket.SOCK_STREAM) sock.connect((self.host, self.port)) session = fdpexpect.fdspawn(sock, timeout=10) # Get all data from server @@ -140,7 +150,7 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase): exit(result) def test_socket(self): - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock = socket.socket(self.af, socket.SOCK_STREAM) sock.connect((self.host, self.port)) session = fdpexpect.fdspawn(sock.fileno(), timeout=10) session.expect(self.prompt1) @@ -154,7 +164,7 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase): self.assertEqual(session.before, b'') def test_socket_with_write(self): - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock = socket.socket(self.af, socket.SOCK_STREAM) sock.connect((self.host, self.port)) session = fdpexpect.fdspawn(sock.fileno(), timeout=10) session.expect(self.prompt1) @@ -177,7 +187,7 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase): def test_timeout(self): with self.assertRaises(pexpect.TIMEOUT): - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock = socket.socket(self.af, socket.SOCK_STREAM) sock.connect((self.host, self.port)) session = fdpexpect.fdspawn(sock, timeout=10) session.expect(b'Bogus response') @@ -211,7 +221,7 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase): self.assertEqual(test_proc.exitcode, errno.ETIMEDOUT) def test_maxread(self): - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock = socket.socket(self.af, socket.SOCK_STREAM) sock.connect((self.host, self.port)) session = fdpexpect.fdspawn(sock.fileno(), timeout=10) session.maxread = 1100 @@ -226,7 +236,7 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase): self.assertEqual(session.before, b'') def test_fd_isalive(self): - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock = socket.socket(self.af, socket.SOCK_STREAM) sock.connect((self.host, self.port)) session = fdpexpect.fdspawn(sock.fileno(), timeout=10) assert session.isalive() @@ -234,7 +244,7 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase): assert not session.isalive(), "Should not be alive after close()" def test_fd_isalive_poll(self): - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock = socket.socket(self.af, socket.SOCK_STREAM) sock.connect((self.host, self.port)) session = fdpexpect.fdspawn(sock.fileno(), timeout=10, use_poll=True) assert session.isalive() @@ -242,21 +252,21 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase): assert not session.isalive(), "Should not be alive after close()" def test_fd_isatty(self): - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock = socket.socket(self.af, socket.SOCK_STREAM) sock.connect((self.host, self.port)) session = fdpexpect.fdspawn(sock.fileno(), timeout=10) assert not session.isatty() session.close() def test_fd_isatty_poll(self): - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock = socket.socket(self.af, socket.SOCK_STREAM) sock.connect((self.host, self.port)) session = fdpexpect.fdspawn(sock.fileno(), timeout=10, use_poll=True) assert not session.isatty() session.close() def test_fileobj(self): - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock = socket.socket(self.af, socket.SOCK_STREAM) sock.connect((self.host, self.port)) session = fdpexpect.fdspawn(sock, timeout=10) # Should get the fileno from the socket session.expect(self.prompt1) diff --git a/tests/test_which.py b/tests/test_which.py index 15a8944..e1b19a8 100644 --- a/tests/test_which.py +++ b/tests/test_which.py @@ -2,6 +2,7 @@ import subprocess import tempfile import shutil +import sys import errno import os @@ -41,8 +42,12 @@ class TestCaseWhich(PexpectTestCase.PexpectTestCase): " which() finds an executable in $PATH and returns its abspath. " bin_dir = tempfile.mkdtemp() + if sys.getfilesystemencoding() in ('ascii', 'ANSI_X3.4-1968'): + prefix = 'ascii-' + else: + prefix = u'ǝpoɔıun-' temp_obj = tempfile.NamedTemporaryFile( - suffix=u'.sh', prefix=u'ǝpoɔıun-', + suffix=u'.sh', prefix=prefix, dir=bin_dir, delete=False) bin_path = temp_obj.name fname = os.path.basename(temp_obj.name) |