summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/ci.yml40
-rw-r--r--doc/api/index.rst1
-rw-r--r--doc/api/socket_pexpect.rst20
-rwxr-xr-xexamples/terminal_emulation.py120
-rw-r--r--pexpect/__init__.py6
-rw-r--r--pexpect/_async.py131
-rw-r--r--pexpect/_async_pre_await.py111
-rw-r--r--pexpect/_async_w_await.py118
-rw-r--r--pexpect/fdpexpect.py6
-rw-r--r--pexpect/pxssh.py5
-rw-r--r--pexpect/socket_pexpect.py145
-rw-r--r--pexpect/spawnbase.py11
-rw-r--r--requirements-testing.txt1
-rw-r--r--tests/PexpectTestCase.py35
-rwxr-xr-xtests/deprecated_test_filedescriptor.py2
-rwxr-xr-xtests/deprecated_test_run_out_of_pty.py2
-rwxr-xr-xtests/fakessh/ssh2
-rwxr-xr-xtests/test_ansi.py2
-rwxr-xr-xtests/test_command_list_split.py2
-rwxr-xr-xtests/test_constructor.py2
-rwxr-xr-xtests/test_ctrl_chars.py7
-rwxr-xr-xtests/test_destructor.py2
-rwxr-xr-xtests/test_dotall.py2
-rwxr-xr-xtests/test_expect.py76
-rwxr-xr-xtests/test_filedescriptor.py2
-rwxr-xr-xtests/test_interact.py10
-rwxr-xr-xtests/test_isalive.py2
-rwxr-xr-xtests/test_log.py2
-rwxr-xr-xtests/test_misc.py2
-rwxr-xr-xtests/test_missing_command.py2
-rwxr-xr-xtests/test_performance.py2
-rw-r--r--tests/test_popen_spawn.py2
-rw-r--r--tests/test_pxssh.py4
-rw-r--r--tests/test_replwrap.py2
-rwxr-xr-xtests/test_run.py28
-rwxr-xr-xtests/test_screen.py2
-rw-r--r--tests/test_socket.py42
-rw-r--r--tests/test_socket_fd.py64
-rw-r--r--tests/test_socket_pexpect.py72
-rwxr-xr-xtests/test_timeout_pattern.py2
-rw-r--r--tests/test_unicode.py2
-rwxr-xr-xtests/test_winsize.py2
42 files changed, 898 insertions, 195 deletions
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
new file mode 100644
index 0000000..dbfbbea
--- /dev/null
+++ b/.github/workflows/ci.yml
@@ -0,0 +1,40 @@
+name: CI
+
+on:
+ push:
+ branches: [ master ]
+ pull_request:
+ branches: [ master ]
+
+jobs:
+ test:
+ runs-on: ubuntu-22.04
+ strategy:
+ fail-fast: false
+ matrix:
+ python-version: ["3.7", "3.8", "3.9", "3.10", "3.11", "pypy3.9"]
+
+ steps:
+ - uses: actions/checkout@v3
+
+ - name: Set up Python ${{ matrix.python-version }}
+ uses: actions/setup-python@v4
+ with:
+ python-version: ${{ matrix.python-version }}
+
+ - name: Install packages
+ run: |
+ export PYTHONIOENCODING=UTF8
+ pip install coveralls pytest-cov ptyprocess
+
+ - name: Run tests
+ run: |
+ ./tools/display-sighandlers.py
+ ./tools/display-terminalinfo.py
+ py.test --cov pexpect --cov-config .coveragerc
+
+ - name: Check coverage
+ env:
+ GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+ run: |
+ coveralls --service=github
diff --git a/doc/api/index.rst b/doc/api/index.rst
index 5277d1c..747c812 100644
--- a/doc/api/index.rst
+++ b/doc/api/index.rst
@@ -6,6 +6,7 @@ API documentation
pexpect
fdpexpect
+ socket_pexpect
popen_spawn
replwrap
pxssh
diff --git a/doc/api/socket_pexpect.rst b/doc/api/socket_pexpect.rst
new file mode 100644
index 0000000..726a999
--- /dev/null
+++ b/doc/api/socket_pexpect.rst
@@ -0,0 +1,20 @@
+socket_pexpect - use pexpect with a socket
+==========================================
+
+.. automodule:: pexpect.socket_pexpect
+
+SocketSpawn class
+-----------------
+
+.. autoclass:: SocketSpawn
+ :show-inheritance:
+
+ .. automethod:: __init__
+ .. automethod:: isalive
+ .. automethod:: close
+
+ .. method:: expect
+ expect_exact
+ expect_list
+
+ As :class:`pexpect.spawn`.
diff --git a/examples/terminal_emulation.py b/examples/terminal_emulation.py
new file mode 100755
index 0000000..802f9e8
--- /dev/null
+++ b/examples/terminal_emulation.py
@@ -0,0 +1,120 @@
+#!/usr/bin/env python
+
+'''These examples show how to integrate pexpect with pyte, an ANSI terminal
+emulator.
+
+These examples were taken from:
+https://byexamples.github.io/byexample
+
+We will execute three commands:
+ - an 'echo' of a colored message to show how the ANSI colors can be removed.
+ - an 'echo' of a very large message to show how pyte emulates the terminal
+ geometry
+ - a 'less' of a very small file to show how pyte handles not only
+ the terminal geometry but also how interprets ANSI commands that control
+ the position of the cursor.
+
+See also https://github.com/pexpect/pexpect/issues/587
+
+PEXPECT LICENSE
+
+ This license is approved by the OSI and FSF as GPL-compatible.
+ http://opensource.org/licenses/isc-license.txt
+
+ Copyright (c) 2012, Noah Spurrier <noah@noah.org>
+ PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
+ PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
+ COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+'''
+
+from __future__ import print_function
+from __future__ import absolute_import
+from __future__ import unicode_literals
+
+import pexpect
+import pyte
+import os
+
+# The geometry of the terminal. Typically this is 24x80
+# but we are going to us set a much smaller terminal
+# to show how to change the default.
+ROWS, COLS = 10, 40
+
+# We create the Screen with the correct geometry and
+# a Stream to process the output coming from pexpect.
+screen = pyte.Screen(COLS, ROWS)
+stream = pyte.Stream(screen)
+
+# Spawn a process using pexpect.spawn as usual
+# with a particularity: it sets the geometry of the terminal
+# using the environment variables *and* using the 'dimensions'
+# parameter of pexpect.spawn.
+# This is needed because no all the program honors the geometry
+# set by pexpect or by the env vars.
+def spawn_process(cmd):
+ env = os.environ.copy()
+ env.update({'LINES': str(ROWS), 'COLUMNS': str(COLS)})
+
+ return pexpect.spawn(cmd, echo=False, encoding='utf-8', dimensions=(ROWS, COLS), env=env)
+
+# Send the raw output to pyte.Stream and get the emulated output
+# from pyte.Screen.
+# In each call we *reset* the display so we don't get the same
+# emulated output twice.
+#
+# Pyte emulates the whole terminal so it will return us ROWS rows
+# of each COLS columns each one completed with spaces.
+#
+# Optionally we strip the whitespace on the right and any empty line
+def emulate_ansi_terminal(raw_output, clean=True):
+ stream.feed(raw_output)
+
+ lines = screen.display
+ screen.reset()
+
+ if clean:
+ lines = (line.rstrip() for line in lines)
+ lines = (line for line in lines if line)
+
+ return '\n'.join(lines)
+
+def pprint(out):
+ print("-" * COLS)
+ print(out)
+ print("-" * COLS)
+
+print("\nFirst example: echo a message with ANSI color sequences.")
+child = spawn_process(r'echo -e "\033[31mThis message should not be in red\033[0m"')
+child.expect(pexpect.EOF)
+out = emulate_ansi_terminal(child.before)
+
+print("This should *not* print any escape sequence,",
+ "those were emulated and discarded by pyte.\n")
+pprint(out)
+
+print("\nSecond example: echo a very large message.")
+msg = ("aaaabbbb" * 8)
+child = spawn_process('echo "%s"' % msg)
+child.expect(pexpect.EOF)
+out = emulate_ansi_terminal(child.before)
+
+print("This should print the message in *two* lines because we",
+ "configured a terminal very small and the message will",
+ "not fit in one line.\n")
+pprint(out)
+
+
+print("\nThird example: run the less program.")
+child = spawn_process('''bash -c "head -n7 '%s' | less"''' % __file__)
+child.expect(pexpect.TIMEOUT, timeout=5)
+out = emulate_ansi_terminal(child.before, clean=False)
+
+pprint(out)
diff --git a/pexpect/__init__.py b/pexpect/__init__.py
index b3a64b6..1c6bdb7 100644
--- a/pexpect/__init__.py
+++ b/pexpect/__init__.py
@@ -29,6 +29,12 @@ For example::
child.expect('Password:')
child.sendline(mypassword)
+Context manager can be used for the spawn() function::
+
+ with pexpect.spawn('scp foo user@example.com:.') as child:
+ child.expect('Password:')
+ child.sendline(mypassword)
+
This works even for commands that ask for passwords or other input outside of
the normal stdio streams. For example, ssh reads input directly from the TTY
device which bypasses stdin.
diff --git a/pexpect/_async.py b/pexpect/_async.py
index dfbfeef..261720c 100644
--- a/pexpect/_async.py
+++ b/pexpect/_async.py
@@ -1,103 +1,28 @@
-import asyncio
-import errno
-import signal
-
-from pexpect import EOF
-
-@asyncio.coroutine
-def expect_async(expecter, timeout=None):
- # First process data that was previously read - if it maches, we don't need
- # async stuff.
- idx = expecter.existing_data()
- if idx is not None:
- return idx
- if not expecter.spawn.async_pw_transport:
- pw = PatternWaiter()
- pw.set_expecter(expecter)
- transport, pw = yield from asyncio.get_event_loop()\
- .connect_read_pipe(lambda: pw, expecter.spawn)
- expecter.spawn.async_pw_transport = pw, transport
- else:
- pw, transport = expecter.spawn.async_pw_transport
- pw.set_expecter(expecter)
- transport.resume_reading()
- try:
- return (yield from asyncio.wait_for(pw.fut, timeout))
- except asyncio.TimeoutError as e:
- transport.pause_reading()
- return expecter.timeout(e)
-
-@asyncio.coroutine
-def repl_run_command_async(repl, cmdlines, timeout=-1):
- res = []
- repl.child.sendline(cmdlines[0])
- for line in cmdlines[1:]:
- yield from repl._expect_prompt(timeout=timeout, async_=True)
- res.append(repl.child.before)
- repl.child.sendline(line)
-
- # Command was fully submitted, now wait for the next prompt
- prompt_idx = yield from repl._expect_prompt(timeout=timeout, async_=True)
- if prompt_idx == 1:
- # We got the continuation prompt - command was incomplete
- repl.child.kill(signal.SIGINT)
- yield from repl._expect_prompt(timeout=1, async_=True)
- raise ValueError("Continuation prompt found - input was incomplete:")
- return u''.join(res + [repl.child.before])
-
-class PatternWaiter(asyncio.Protocol):
- transport = None
-
- def set_expecter(self, expecter):
- self.expecter = expecter
- self.fut = asyncio.Future()
-
- def found(self, result):
- if not self.fut.done():
- self.fut.set_result(result)
- self.transport.pause_reading()
-
- def error(self, exc):
- if not self.fut.done():
- self.fut.set_exception(exc)
- self.transport.pause_reading()
-
- def connection_made(self, transport):
- self.transport = transport
-
- def data_received(self, data):
- spawn = self.expecter.spawn
- s = spawn._decoder.decode(data)
- spawn._log(s, 'read')
-
- if self.fut.done():
- spawn._before.write(s)
- spawn._buffer.write(s)
- return
-
- try:
- index = self.expecter.new_data(s)
- if index is not None:
- # Found a match
- self.found(index)
- except Exception as e:
- self.expecter.errored()
- self.error(e)
-
- def eof_received(self):
- # N.B. If this gets called, async will close the pipe (the spawn object)
- # for us
- try:
- self.expecter.spawn.flag_eof = True
- index = self.expecter.eof()
- except EOF as e:
- self.error(e)
- else:
- self.found(index)
-
- def connection_lost(self, exc):
- if isinstance(exc, OSError) and exc.errno == errno.EIO:
- # We may get here without eof_received being called, e.g on Linux
- self.eof_received()
- elif exc is not None:
- self.error(exc)
+"""Facade that provides coroutines implementation pertinent to running Py version.
+
+Python 3.5 introduced the async def/await syntax keyword.
+With later versions coroutines and methods to get the running asyncio loop are
+being deprecated, not supported anymore.
+
+For Python versions later than 3.6, coroutines and objects that are defined via
+``async def``/``await`` keywords are imported.
+
+Here the code is just imported, to provide the same interface to older code.
+"""
+# pylint: disable=unused-import
+# flake8: noqa: F401
+from sys import version_info as py_version_info
+
+# this assumes async def/await are more stable
+if py_version_info >= (3, 6):
+ from pexpect._async_w_await import (
+ PatternWaiter,
+ expect_async,
+ repl_run_command_async,
+ )
+else:
+ from pexpect._async_pre_await import (
+ PatternWaiter,
+ expect_async,
+ repl_run_command_async,
+ )
diff --git a/pexpect/_async_pre_await.py b/pexpect/_async_pre_await.py
new file mode 100644
index 0000000..81ece1b
--- /dev/null
+++ b/pexpect/_async_pre_await.py
@@ -0,0 +1,111 @@
+"""Implementation of coroutines without using ``async def``/``await`` keywords.
+
+``@asyncio.coroutine`` and ``yield from`` are used here instead.
+"""
+import asyncio
+import errno
+import signal
+
+from pexpect import EOF
+
+
+@asyncio.coroutine
+def expect_async(expecter, timeout=None):
+ # First process data that was previously read - if it maches, we don't need
+ # async stuff.
+ idx = expecter.existing_data()
+ if idx is not None:
+ return idx
+ if not expecter.spawn.async_pw_transport:
+ pw = PatternWaiter()
+ pw.set_expecter(expecter)
+ transport, pw = yield from asyncio.get_event_loop().connect_read_pipe(
+ lambda: pw, expecter.spawn
+ )
+ expecter.spawn.async_pw_transport = pw, transport
+ else:
+ pw, transport = expecter.spawn.async_pw_transport
+ pw.set_expecter(expecter)
+ transport.resume_reading()
+ try:
+ return (yield from asyncio.wait_for(pw.fut, timeout))
+ except asyncio.TimeoutError as e:
+ transport.pause_reading()
+ return expecter.timeout(e)
+
+
+@asyncio.coroutine
+def repl_run_command_async(repl, cmdlines, timeout=-1):
+ res = []
+ repl.child.sendline(cmdlines[0])
+ for line in cmdlines[1:]:
+ yield from repl._expect_prompt(timeout=timeout, async_=True)
+ res.append(repl.child.before)
+ repl.child.sendline(line)
+
+ # Command was fully submitted, now wait for the next prompt
+ prompt_idx = yield from repl._expect_prompt(timeout=timeout, async_=True)
+ if prompt_idx == 1:
+ # We got the continuation prompt - command was incomplete
+ repl.child.kill(signal.SIGINT)
+ yield from repl._expect_prompt(timeout=1, async_=True)
+ raise ValueError("Continuation prompt found - input was incomplete:")
+ return "".join(res + [repl.child.before])
+
+
+class PatternWaiter(asyncio.Protocol):
+ transport = None
+
+ def set_expecter(self, expecter):
+ self.expecter = expecter
+ self.fut = asyncio.Future()
+
+ def found(self, result):
+ if not self.fut.done():
+ self.fut.set_result(result)
+ self.transport.pause_reading()
+
+ def error(self, exc):
+ if not self.fut.done():
+ self.fut.set_exception(exc)
+ self.transport.pause_reading()
+
+ def connection_made(self, transport):
+ self.transport = transport
+
+ def data_received(self, data):
+ spawn = self.expecter.spawn
+ s = spawn._decoder.decode(data)
+ spawn._log(s, "read")
+
+ if self.fut.done():
+ spawn._before.write(s)
+ spawn._buffer.write(s)
+ return
+
+ try:
+ index = self.expecter.new_data(s)
+ if index is not None:
+ # Found a match
+ self.found(index)
+ except Exception as e:
+ self.expecter.errored()
+ self.error(e)
+
+ def eof_received(self):
+ # N.B. If this gets called, async will close the pipe (the spawn object)
+ # for us
+ try:
+ self.expecter.spawn.flag_eof = True
+ index = self.expecter.eof()
+ except EOF as e:
+ self.error(e)
+ else:
+ self.found(index)
+
+ def connection_lost(self, exc):
+ if isinstance(exc, OSError) and exc.errno == errno.EIO:
+ # We may get here without eof_received being called, e.g on Linux
+ self.eof_received()
+ elif exc is not None:
+ self.error(exc)
diff --git a/pexpect/_async_w_await.py b/pexpect/_async_w_await.py
new file mode 100644
index 0000000..59cb1ef
--- /dev/null
+++ b/pexpect/_async_w_await.py
@@ -0,0 +1,118 @@
+"""Implementation of coroutines using ``async def``/``await`` keywords.
+
+These keywords replaced ``@asyncio.coroutine`` and ``yield from`` from
+Python 3.5 onwards.
+"""
+import asyncio
+import errno
+import signal
+from sys import version_info as py_version_info
+
+from pexpect import EOF
+
+if py_version_info >= (3, 7):
+ # get_running_loop, new in 3.7, is preferred to get_event_loop
+ _loop_getter = asyncio.get_running_loop
+else:
+ # Deprecation warning since 3.10
+ _loop_getter = asyncio.get_event_loop
+
+
+async def expect_async(expecter, timeout=None):
+ # First process data that was previously read - if it maches, we don't need
+ # async stuff.
+ idx = expecter.existing_data()
+ if idx is not None:
+ return idx
+ if not expecter.spawn.async_pw_transport:
+ pattern_waiter = PatternWaiter()
+ pattern_waiter.set_expecter(expecter)
+ transport, pattern_waiter = await _loop_getter().connect_read_pipe(
+ lambda: pattern_waiter, expecter.spawn
+ )
+ expecter.spawn.async_pw_transport = pattern_waiter, transport
+ else:
+ pattern_waiter, transport = expecter.spawn.async_pw_transport
+ pattern_waiter.set_expecter(expecter)
+ transport.resume_reading()
+ try:
+ return await asyncio.wait_for(pattern_waiter.fut, timeout)
+ except asyncio.TimeoutError as exc:
+ transport.pause_reading()
+ return expecter.timeout(exc)
+
+
+async def repl_run_command_async(repl, cmdlines, timeout=-1):
+ res = []
+ repl.child.sendline(cmdlines[0])
+ for line in cmdlines[1:]:
+ await repl._expect_prompt(timeout=timeout, async_=True)
+ res.append(repl.child.before)
+ repl.child.sendline(line)
+
+ # Command was fully submitted, now wait for the next prompt
+ prompt_idx = await repl._expect_prompt(timeout=timeout, async_=True)
+ if prompt_idx == 1:
+ # We got the continuation prompt - command was incomplete
+ repl.child.kill(signal.SIGINT)
+ await repl._expect_prompt(timeout=1, async_=True)
+ raise ValueError("Continuation prompt found - input was incomplete:")
+ return "".join(res + [repl.child.before])
+
+
+class PatternWaiter(asyncio.Protocol):
+ transport = None
+
+ def set_expecter(self, expecter):
+ self.expecter = expecter
+ self.fut = asyncio.Future()
+
+ def found(self, result):
+ if not self.fut.done():
+ self.fut.set_result(result)
+ self.transport.pause_reading()
+
+ def error(self, exc):
+ if not self.fut.done():
+ self.fut.set_exception(exc)
+ self.transport.pause_reading()
+
+ def connection_made(self, transport):
+ self.transport = transport
+
+ def data_received(self, data):
+ spawn = self.expecter.spawn
+ s = spawn._decoder.decode(data)
+ spawn._log(s, "read")
+
+ if self.fut.done():
+ spawn._before.write(s)
+ spawn._buffer.write(s)
+ return
+
+ try:
+ index = self.expecter.new_data(s)
+ if index is not None:
+ # Found a match
+ self.found(index)
+ except Exception as exc:
+ self.expecter.errored()
+ self.error(exc)
+
+ def eof_received(self):
+ # N.B. If this gets called, async will close the pipe (the spawn object)
+ # for us
+ try:
+ self.expecter.spawn.flag_eof = True
+ index = self.expecter.eof()
+ except EOF as exc:
+ self.error(exc)
+ else:
+ self.found(index)
+
+ def connection_lost(self, exc):
+ if isinstance(exc, OSError) and exc.errno == errno.EIO:
+ # We may get here without eof_received being called, e.g on Linux
+ self.eof_received()
+ elif exc is not None:
+ self.error(exc)
diff --git a/pexpect/fdpexpect.py b/pexpect/fdpexpect.py
index cddd50e..140bdfe 100644
--- a/pexpect/fdpexpect.py
+++ b/pexpect/fdpexpect.py
@@ -1,7 +1,11 @@
-'''This is like pexpect, but it will work with any file descriptor that you
+'''This is like :mod:`pexpect`, but it will work with any file descriptor that you
pass it. You are responsible for opening and close the file descriptor.
This allows you to use Pexpect with sockets and named pipes (FIFOs).
+.. note::
+ socket.fileno() does not give a readable file descriptor on windows.
+ Use :mod:`pexpect.socket_pexpect` for cross-platform socket support
+
PEXPECT LICENSE
This license is approved by the OSI and FSF as GPL-compatible.
diff --git a/pexpect/pxssh.py b/pexpect/pxssh.py
index 3d53bd9..bfefc7a 100644
--- a/pexpect/pxssh.py
+++ b/pexpect/pxssh.py
@@ -143,8 +143,7 @@ class pxssh (spawn):
# used to set shell command-line prompt to UNIQUE_PROMPT.
self.PROMPT_SET_SH = r"PS1='[PEXPECT]\$ '"
self.PROMPT_SET_CSH = r"set prompt='[PEXPECT]\$ '"
- self.SSH_OPTS = ("-o'RSAAuthentication=no'"
- + " -o 'PubkeyAuthentication=no'")
+ self.SSH_OPTS = (" -o 'PubkeyAuthentication=no'")
# Disabling host key checking, makes you vulnerable to MITM attacks.
# + " -o 'StrictHostKeyChecking=no'"
# + " -o 'UserKnownHostsFile /dev/null' ")
@@ -152,7 +151,7 @@ class pxssh (spawn):
# displaying a GUI password dialog. I have not figured out how to
# disable only SSH_ASKPASS without also disabling X11 forwarding.
# Unsetting SSH_ASKPASS on the remote side doesn't disable it! Annoying!
- #self.SSH_OPTS = "-x -o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'"
+ #self.SSH_OPTS = "-x -o 'PubkeyAuthentication=no'"
self.force_password = False
self.debug_command_string = debug_command_string
diff --git a/pexpect/socket_pexpect.py b/pexpect/socket_pexpect.py
new file mode 100644
index 0000000..cb11ac2
--- /dev/null
+++ b/pexpect/socket_pexpect.py
@@ -0,0 +1,145 @@
+"""This is like :mod:`pexpect`, but it will work with any socket that you
+pass it. You are responsible for opening and closing the socket.
+
+PEXPECT LICENSE
+
+ This license is approved by the OSI and FSF as GPL-compatible.
+ http://opensource.org/licenses/isc-license.txt
+
+ Copyright (c) 2012, Noah Spurrier <noah@noah.org>
+ PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
+ PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
+ COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+"""
+
+import socket
+from contextlib import contextmanager
+
+from .exceptions import TIMEOUT, EOF
+from .spawnbase import SpawnBase
+
+__all__ = ["SocketSpawn"]
+
+
+class SocketSpawn(SpawnBase):
+ """This is like :mod:`pexpect.fdpexpect` but uses the cross-platform python socket api,
+ rather than the unix-specific file descriptor api. Thus, it works with
+ remote connections on both unix and windows."""
+
+ def __init__(
+ self,
+ socket: socket.socket,
+ args=None,
+ timeout=30,
+ maxread=2000,
+ searchwindowsize=None,
+ logfile=None,
+ encoding=None,
+ codec_errors="strict",
+ use_poll=False,
+ ):
+ """This takes an open socket."""
+
+ self.args = None
+ self.command = None
+ SpawnBase.__init__(
+ self,
+ timeout,
+ maxread,
+ searchwindowsize,
+ logfile,
+ encoding=encoding,
+ codec_errors=codec_errors,
+ )
+ self.socket = socket
+ self.child_fd = socket.fileno()
+ self.closed = False
+ self.name = "<socket %s>" % socket
+ self.use_poll = use_poll
+
+ def close(self):
+ """Close the socket.
+
+ Calling this method a second time does nothing, but if the file
+ descriptor was closed elsewhere, :class:`OSError` will be raised.
+ """
+ if self.child_fd == -1:
+ return
+
+ self.flush()
+ self.socket.shutdown(socket.SHUT_RDWR)
+ self.socket.close()
+ self.child_fd = -1
+ self.closed = True
+
+ def isalive(self):
+ """ Alive if the fileno is valid """
+ return self.socket.fileno() >= 0
+
+ def send(self, s) -> int:
+ """Write to socket, return number of bytes written"""
+ s = self._coerce_send_string(s)
+ self._log(s, "send")
+
+ b = self._encoder.encode(s, final=False)
+ self.socket.sendall(b)
+ return len(b)
+
+ def sendline(self, s) -> int:
+ """Write to socket with trailing newline, return number of bytes written"""
+ s = self._coerce_send_string(s)
+ return self.send(s + self.linesep)
+
+ def write(self, s):
+ """Write to socket, return None"""
+ self.send(s)
+
+ def writelines(self, sequence):
+ "Call self.write() for each item in sequence"
+ for s in sequence:
+ self.write(s)
+
+ @contextmanager
+ def _timeout(self, timeout):
+ saved_timeout = self.socket.gettimeout()
+ try:
+ self.socket.settimeout(timeout)
+ yield
+ finally:
+ self.socket.settimeout(saved_timeout)
+
+ def read_nonblocking(self, size=1, timeout=-1):
+ """
+ Read from the file descriptor and return the result as a string.
+
+ The read_nonblocking method of :class:`SpawnBase` assumes that a call
+ to os.read will not block (timeout parameter is ignored). This is not
+ the case for POSIX file-like objects such as sockets and serial ports.
+
+ Use :func:`select.select`, timeout is implemented conditionally for
+ POSIX systems.
+
+ :param int size: Read at most *size* bytes.
+ :param int timeout: Wait timeout seconds for file descriptor to be
+ ready to read. When -1 (default), use self.timeout. When 0, poll.
+ :return: String containing the bytes read
+ """
+ if timeout == -1:
+ timeout = self.timeout
+ try:
+ with self._timeout(timeout):
+ s = self.socket.recv(size)
+ if s == b'':
+ self.flag_eof = True
+ raise EOF("Socket closed")
+ return s
+ except socket.timeout:
+ raise TIMEOUT("Timeout exceeded.")
diff --git a/pexpect/spawnbase.py b/pexpect/spawnbase.py
index aacb63a..abf8071 100644
--- a/pexpect/spawnbase.py
+++ b/pexpect/spawnbase.py
@@ -141,6 +141,16 @@ class SpawnBase(object):
return s.encode('ascii')
return s
+ # In bytes mode, regex patterns should also be of bytes type
+ def _coerce_expect_re(self, r):
+ p = r.pattern
+ if self.encoding is None and not isinstance(p, bytes):
+ return re.compile(p.encode('utf-8'))
+ # And vice-versa
+ elif self.encoding is not None and isinstance(p, bytes):
+ return re.compile(p.decode('utf-8'))
+ return r
+
def _coerce_send_string(self, s):
if self.encoding is None and not isinstance(s, bytes):
return s.encode('utf-8')
@@ -235,6 +245,7 @@ class SpawnBase(object):
elif p is TIMEOUT:
compiled_pattern_list.append(TIMEOUT)
elif isinstance(p, type(re.compile(''))):
+ p = self._coerce_expect_re(p)
compiled_pattern_list.append(p)
else:
self._pattern_type_err(p)
diff --git a/requirements-testing.txt b/requirements-testing.txt
index 1894122..c47d7b1 100644
--- a/requirements-testing.txt
+++ b/requirements-testing.txt
@@ -2,4 +2,3 @@ pytest
pytest-cov
coverage
coveralls
-pytest-capturelog
diff --git a/tests/PexpectTestCase.py b/tests/PexpectTestCase.py
index 5d7a168..a762d8f 100644
--- a/tests/PexpectTestCase.py
+++ b/tests/PexpectTestCase.py
@@ -49,22 +49,25 @@ class PexpectTestCase(unittest.TestCase):
print('\n', self.id(), end=' ')
sys.stdout.flush()
- # some build agents will ignore SIGHUP and SIGINT, which python
- # inherits. This causes some of the tests related to terminate()
- # to fail. We set them to the default handlers that they should
- # be, and restore them back to their SIG_IGN value on tearDown.
- #
- # I'm not entirely convinced they need to be restored, only our
- # test runner is affected.
- self.restore_ignored_signals = [
- value for value in (signal.SIGHUP, signal.SIGINT,)
- if signal.getsignal(value) == signal.SIG_IGN]
- if signal.SIGHUP in self.restore_ignored_signals:
- # sighup should be set to default handler
- signal.signal(signal.SIGHUP, signal.SIG_DFL)
- if signal.SIGINT in self.restore_ignored_signals:
- # SIGINT should be set to signal.default_int_handler
- signal.signal(signal.SIGINT, signal.default_int_handler)
+ if sys.platform != 'win32':
+ # some build agents will ignore SIGHUP and SIGINT, which python
+ # inherits. This causes some of the tests related to terminate()
+ # to fail. We set them to the default handlers that they should
+ # be, and restore them back to their SIG_IGN value on tearDown.
+ #
+ # I'm not entirely convinced they need to be restored, only our
+ # test runner is affected.
+ self.restore_ignored_signals = [
+ value for value in (signal.SIGHUP, signal.SIGINT,)
+ if signal.getsignal(value) == signal.SIG_IGN]
+ if signal.SIGHUP in self.restore_ignored_signals:
+ # sighup should be set to default handler
+ signal.signal(signal.SIGHUP, signal.SIG_DFL)
+ if signal.SIGINT in self.restore_ignored_signals:
+ # SIGINT should be set to signal.default_int_handler
+ signal.signal(signal.SIGINT, signal.default_int_handler)
+ else:
+ self.restore_ignored_signals = []
unittest.TestCase.setUp(self)
def tearDown(self):
diff --git a/tests/deprecated_test_filedescriptor.py b/tests/deprecated_test_filedescriptor.py
index 6b0ef3e..cd930cf 100755
--- a/tests/deprecated_test_filedescriptor.py
+++ b/tests/deprecated_test_filedescriptor.py
@@ -72,7 +72,7 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(ExpectTestCase, 'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(ExpectTestCase)
#fout = open('delete_me_1','wb')
#fout.write(the_old_way)
diff --git a/tests/deprecated_test_run_out_of_pty.py b/tests/deprecated_test_run_out_of_pty.py
index 3090147..b34094e 100755
--- a/tests/deprecated_test_run_out_of_pty.py
+++ b/tests/deprecated_test_run_out_of_pty.py
@@ -47,5 +47,5 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(ExpectTestCase,'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(ExpectTestCase)
diff --git a/tests/fakessh/ssh b/tests/fakessh/ssh
index 4a5be1b..74ffe20 100755
--- a/tests/fakessh/ssh
+++ b/tests/fakessh/ssh
@@ -62,7 +62,7 @@ prompt = "$"
while True:
cmd = input(prompt)
if cmd.startswith('PS1='):
- prompt = eval(cmd[4:]).replace('\$', '$')
+ prompt = eval(cmd[4:]).replace(r'\$', '$')
elif cmd == 'ping':
print('pong')
elif cmd.startswith('ls'):
diff --git a/tests/test_ansi.py b/tests/test_ansi.py
index 3d73fe8..a49c663 100755
--- a/tests/test_ansi.py
+++ b/tests/test_ansi.py
@@ -236,5 +236,5 @@ class ansiTestCase (PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(ansiTestCase,'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(ansiTestCase)
diff --git a/tests/test_command_list_split.py b/tests/test_command_list_split.py
index 370f46e..eeaf6c0 100755
--- a/tests/test_command_list_split.py
+++ b/tests/test_command_list_split.py
@@ -37,4 +37,4 @@ class SplitCommandLineTestCase(PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(SplitCommandLineTestCase,'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(SplitCommandLineTestCase)
diff --git a/tests/test_constructor.py b/tests/test_constructor.py
index 98c473a..1b4d717 100755
--- a/tests/test_constructor.py
+++ b/tests/test_constructor.py
@@ -44,5 +44,5 @@ class TestCaseConstructor(PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(TestCaseConstructor,'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(TestCaseConstructor)
diff --git a/tests/test_ctrl_chars.py b/tests/test_ctrl_chars.py
index 032027c..11fb55c 100755
--- a/tests/test_ctrl_chars.py
+++ b/tests/test_ctrl_chars.py
@@ -26,8 +26,9 @@ from . import PexpectTestCase
import time
import sys
-from ptyprocess import ptyprocess
-ptyprocess._make_eof_intr()
+if sys.platform != 'win32':
+ from ptyprocess import ptyprocess
+ ptyprocess._make_eof_intr()
if sys.version_info[0] >= 3:
def byte(i):
@@ -124,5 +125,5 @@ class TestCtrlChars(PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(TestCtrlChars,'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(TestCtrlChars)
diff --git a/tests/test_destructor.py b/tests/test_destructor.py
index d27b6f6..01d89a0 100755
--- a/tests/test_destructor.py
+++ b/tests/test_destructor.py
@@ -80,5 +80,5 @@ class TestCaseDestructor(PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(TestCaseDestructor,'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(TestCaseDestructor)
diff --git a/tests/test_dotall.py b/tests/test_dotall.py
index 68aef3f..44c58c5 100755
--- a/tests/test_dotall.py
+++ b/tests/test_dotall.py
@@ -39,5 +39,5 @@ class TestCaseDotall(PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(TestCaseDotall,'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(TestCaseDotall)
diff --git a/tests/test_expect.py b/tests/test_expect.py
index 5553d28..c16e055 100755
--- a/tests/test_expect.py
+++ b/tests/test_expect.py
@@ -25,11 +25,14 @@ import time
import signal
import sys
import os
+import re
import pexpect
from . import PexpectTestCase
from .utils import no_coverage_env
+PY3 = bool(sys.version_info.major >= 3)
+
# Many of these test cases blindly assume that sequential directory
# listings of the /bin directory will yield the same results.
# This may not be true, but seems adequate for testing now.
@@ -101,6 +104,77 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase):
p.sendeof ()
p.expect (pexpect.EOF)
+ def _select_types(self, encoding=None):
+ if encoding is None:
+ if PY3:
+ expect_string = 'String'
+ expected_type = bytes
+ else:
+ expect_string = u'String'
+ expected_type = str
+ else:
+ if PY3:
+ expect_string = b'String'
+ expected_type = str
+ else:
+ expect_string = 'String'
+ expected_type = unicode
+ return re.compile(expect_string), expected_type
+
+ def test_coerce_expect_re_enc_none (self):
+ '''This test that compiled regex patterns will always be bytes type
+ when spawn objects have no encoding or encoding=None
+ '''
+ r, expected_type = self._select_types()
+ p = pexpect.spawn('true')
+ c = pexpect.spawnbase.SpawnBase._coerce_expect_re(p, r)
+ self.assertIsInstance(c.pattern, expected_type)
+ p.expect (pexpect.EOF)
+
+ def test_coerce_expect_re_enc_ascii (self):
+ '''This test that compiled regex patterns won't ever be bytes type
+ when spawn objects have ascii encoding
+ '''
+ r, expected_type = self._select_types('ascii')
+ p = pexpect.spawn('true', encoding='ascii')
+ c = pexpect.spawnbase.SpawnBase._coerce_expect_re(p, r)
+ self.assertIsInstance(c.pattern, expected_type)
+ p.expect (pexpect.EOF)
+
+ def test_coerce_expect_re_enc_utf8 (self):
+ '''This test that compiled regex patterns won't ever be bytes type
+ when spawn objects have utf-8 encoding
+ '''
+ r, expected_type = self._select_types('utf-8')
+ p = pexpect.spawn('true', encoding='utf-8')
+ c = pexpect.spawnbase.SpawnBase._coerce_expect_re(p, r)
+ self.assertIsInstance(c.pattern, expected_type)
+ p.expect (pexpect.EOF)
+
+ def test_expect_regex_enc_none (self):
+ '''This test that bytes mode spawn objects (encoding=None)
+ parses correctly regex patterns compiled from non-bytes type objects
+ '''
+ p = pexpect.spawn('cat', echo=False, timeout=5)
+ p.sendline ('We are the Knights who say "Ni!"')
+ index = p.expect ([re.compile('We are the Knights who say "Ni!"'),
+ pexpect.EOF, pexpect.TIMEOUT])
+ self.assertEqual(index, 0)
+ p.sendeof ()
+ p.expect_exact (pexpect.EOF)
+
+ def test_expect_regex_enc_utf8 (self):
+ '''This test that non-bytes mode spawn objects (encoding='utf-8')
+ parses correctly regex patterns compiled from bytes type objects
+ '''
+ p = pexpect.spawn('cat', echo=False, timeout=5, encoding='utf-8')
+ p.sendline ('We are the Knights who say "Ni!"')
+ index = p.expect ([re.compile(b'We are the Knights who say "Ni!"'),
+ pexpect.EOF, pexpect.TIMEOUT])
+ self.assertEqual(index, 0)
+ p.sendeof ()
+ p.expect_exact (pexpect.EOF)
+
def test_expect_order (self):
'''This tests that patterns are matched in the same order as given in the pattern_list.
@@ -640,4 +714,4 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(ExpectTestCase, 'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(ExpectTestCase)
diff --git a/tests/test_filedescriptor.py b/tests/test_filedescriptor.py
index d9164e1..3f9d954 100755
--- a/tests/test_filedescriptor.py
+++ b/tests/test_filedescriptor.py
@@ -69,4 +69,4 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(ExpectTestCase, 'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(ExpectTestCase)
diff --git a/tests/test_interact.py b/tests/test_interact.py
index 4afbd18..7205d80 100755
--- a/tests/test_interact.py
+++ b/tests/test_interact.py
@@ -62,8 +62,8 @@ class InteractTestCase (PexpectTestCase.PexpectTestCase):
p.sendcontrol(']')
p.expect('29<STOP>')
p.send('\x00')
- if not os.environ.get('TRAVIS', None):
- # on Travis-CI, we sometimes miss trailing stdout from the
+ if not os.environ.get('CI', None):
+ # on CI platforms, we sometimes miss trailing stdout from the
# chain of child processes, not entirely sure why. So this
# is skipped on such systems.
p.expect('0<STOP>')
@@ -84,8 +84,8 @@ class InteractTestCase (PexpectTestCase.PexpectTestCase):
p.expect('206<STOP>') # [206, 146]
p.expect('146<STOP>')
p.send('\x00')
- if not os.environ.get('TRAVIS', None):
- # on Travis-CI, we sometimes miss trailing stdout from the
+ if not os.environ.get('CI', None):
+ # on CI platforms, we sometimes miss trailing stdout from the
# chain of child processes, not entirely sure why. So this
# is skipped on such systems.
p.expect('0<STOP>')
@@ -97,5 +97,5 @@ class InteractTestCase (PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(InteractTestCase, 'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(InteractTestCase)
diff --git a/tests/test_isalive.py b/tests/test_isalive.py
index 5e3021e..ba2b5d4 100755
--- a/tests/test_isalive.py
+++ b/tests/test_isalive.py
@@ -121,5 +121,5 @@ class IsAliveTestCase(PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(IsAliveTestCase, 'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(IsAliveTestCase)
diff --git a/tests/test_log.py b/tests/test_log.py
index 4ad2256..e2defff 100755
--- a/tests/test_log.py
+++ b/tests/test_log.py
@@ -104,5 +104,5 @@ class TestCaseLog(PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(TestCaseLog,'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(TestCaseLog)
diff --git a/tests/test_misc.py b/tests/test_misc.py
index 3dc2b6e..5c81f92 100755
--- a/tests/test_misc.py
+++ b/tests/test_misc.py
@@ -370,4 +370,4 @@ class TestCaseMisc(PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(TestCaseMisc,'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(TestCaseMisc)
diff --git a/tests/test_missing_command.py b/tests/test_missing_command.py
index 92e4733..3775632 100755
--- a/tests/test_missing_command.py
+++ b/tests/test_missing_command.py
@@ -34,5 +34,5 @@ class MissingCommandTestCase (PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(MissingCommandTestCase,'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(MissingCommandTestCase)
diff --git a/tests/test_performance.py b/tests/test_performance.py
index d7e2cd6..05027a0 100755
--- a/tests/test_performance.py
+++ b/tests/test_performance.py
@@ -110,4 +110,4 @@ class PerformanceTestCase (PexpectTestCase.PexpectTestCase):
if __name__ == "__main__":
unittest.main()
-suite = unittest.makeSuite(PerformanceTestCase,'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(PerformanceTestCase)
diff --git a/tests/test_popen_spawn.py b/tests/test_popen_spawn.py
index 10d7032..6168148 100644
--- a/tests/test_popen_spawn.py
+++ b/tests/test_popen_spawn.py
@@ -136,4 +136,4 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(ExpectTestCase, 'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(ExpectTestCase)
diff --git a/tests/test_pxssh.py b/tests/test_pxssh.py
index c6ec4e2..ba700c8 100644
--- a/tests/test_pxssh.py
+++ b/tests/test_pxssh.py
@@ -1,10 +1,12 @@
#!/usr/bin/env python
+import sys
import os
import shutil
import tempfile
import unittest
-from pexpect import pxssh
+if sys.platform != 'win32':
+ from pexpect import pxssh
from .PexpectTestCase import PexpectTestCase
class SSHTestBase(PexpectTestCase):
diff --git a/tests/test_replwrap.py b/tests/test_replwrap.py
index 73c82e8..9e95112 100644
--- a/tests/test_replwrap.py
+++ b/tests/test_replwrap.py
@@ -39,7 +39,7 @@ class REPLWrapTestCase(unittest.TestCase):
" PAGER is set to cat, to prevent timeout in ``man sleep``. "
bash = replwrap.bash()
res = bash.run_command('man sleep', timeout=5)
- assert 'SLEEP' in res, res
+ assert 'SLEEP' in res.upper(), res
def test_bash_env(self):
"""env, which displays PS1=..., should not mess up finding the prompt.
diff --git a/tests/test_run.py b/tests/test_run.py
index f750fb2..15d0c40 100755
--- a/tests/test_run.py
+++ b/tests/test_run.py
@@ -22,7 +22,6 @@ PEXPECT LICENSE
import pexpect
import unittest
import subprocess
-import tempfile
import sys
import os
from . import PexpectTestCase
@@ -53,21 +52,17 @@ def function_events_callback(values):
class RunFuncTestCase(PexpectTestCase.PexpectTestCase):
- runfunc = staticmethod(pexpect.run)
+ if sys.platform != 'win32':
+ runfunc = staticmethod(pexpect.run)
cr = b'\r'
empty = b''
prep_subprocess_out = staticmethod(lambda x: x)
def setUp(self):
- fd, self.rcfile = tempfile.mkstemp()
- os.write(fd, b'PS1=GO: \n')
- os.close(fd)
+ self.runenv = os.environ.copy()
+ self.runenv['PS1'] = 'GO:'
super(RunFuncTestCase, self).setUp()
- def tearDown(self):
- os.unlink(self.rcfile)
- super(RunFuncTestCase, self).tearDown()
-
def test_run_exit(self):
(data, exitstatus) = self.runfunc(sys.executable + ' exit1.py', withexitstatus=1)
assert exitstatus == 1, "Exit status of 'python exit1.py' should be 1."
@@ -106,9 +101,10 @@ class RunFuncTestCase(PexpectTestCase.PexpectTestCase):
]
(data, exitstatus) = pexpect.run(
- 'bash --rcfile {0}'.format(self.rcfile),
+ 'bash --norc',
withexitstatus=True,
events=events,
+ env=self.runenv,
timeout=10)
assert exitstatus == 0
@@ -118,9 +114,10 @@ class RunFuncTestCase(PexpectTestCase.PexpectTestCase):
]
(data, exitstatus) = pexpect.run(
- 'bash --rcfile {0}'.format(self.rcfile),
+ 'bash --norc',
withexitstatus=True,
events=events,
+ env=self.runenv,
timeout=10)
assert exitstatus == 0
@@ -130,18 +127,20 @@ class RunFuncTestCase(PexpectTestCase.PexpectTestCase):
]
(data, exitstatus) = pexpect.run(
- 'bash --rcfile {0}'.format(self.rcfile),
+ 'bash --norc',
withexitstatus=True,
events=events,
+ env=self.runenv,
timeout=10)
assert exitstatus == 0
def test_run_event_typeerror(self):
events = [('GO:', -1)]
with self.assertRaises(TypeError):
- pexpect.run('bash --rcfile {0}'.format(self.rcfile),
+ pexpect.run('bash --norc',
withexitstatus=True,
events=events,
+ env=self.runenv,
timeout=10)
def _method_events_callback(self, values):
@@ -162,7 +161,8 @@ class RunFuncTestCase(PexpectTestCase.PexpectTestCase):
class RunUnicodeFuncTestCase(RunFuncTestCase):
- runfunc = staticmethod(pexpect.runu)
+ if sys.platform != 'win32':
+ runfunc = staticmethod(pexpect.runu)
cr = b'\r'.decode('ascii')
empty = b''.decode('ascii')
prep_subprocess_out = staticmethod(lambda x: x.decode('utf-8', 'replace'))
diff --git a/tests/test_screen.py b/tests/test_screen.py
index 2429e57..9e275bc 100755
--- a/tests/test_screen.py
+++ b/tests/test_screen.py
@@ -282,6 +282,6 @@ class screenTestCase (PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(screenTestCase,'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(screenTestCase)
diff --git a/tests/test_socket.py b/tests/test_socket.py
index a8c8595..b801b00 100644
--- a/tests/test_socket.py
+++ b/tests/test_socket.py
@@ -19,7 +19,7 @@ PEXPECT LICENSE
'''
import pexpect
-from pexpect import fdpexpect
+from pexpect import socket_pexpect
import unittest
from . import PexpectTestCase
import multiprocessing
@@ -133,12 +133,16 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase):
pass
exit(0)
+ def spawn(self, socket, timeout=30, use_poll=False):
+ """override me with other ways of spawning on a socket"""
+ return socket_pexpect.SocketSpawn(socket, timeout=timeout, use_poll=use_poll)
+
def socket_fn(self, timed_out, all_read):
result = 0
try:
sock = socket.socket(self.af, socket.SOCK_STREAM)
sock.connect((self.host, self.port))
- session = fdpexpect.fdspawn(sock, timeout=10)
+ session = self.spawn(sock, timeout=10)
# Get all data from server
session.read_nonblocking(size=4096)
all_read.set()
@@ -152,7 +156,7 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase):
def test_socket(self):
sock = socket.socket(self.af, socket.SOCK_STREAM)
sock.connect((self.host, self.port))
- session = fdpexpect.fdspawn(sock.fileno(), timeout=10)
+ session = self.spawn(sock, timeout=10)
session.expect(self.prompt1)
self.assertEqual(session.before, self.motd)
session.send(self.enter)
@@ -166,7 +170,7 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase):
def test_socket_with_write(self):
sock = socket.socket(self.af, socket.SOCK_STREAM)
sock.connect((self.host, self.port))
- session = fdpexpect.fdspawn(sock.fileno(), timeout=10)
+ session = self.spawn(sock, timeout=10)
session.expect(self.prompt1)
self.assertEqual(session.before, self.motd)
session.write(self.enter)
@@ -177,19 +181,11 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase):
session.expect(pexpect.EOF)
self.assertEqual(session.before, b'')
- def test_not_int(self):
- with self.assertRaises(pexpect.ExceptionPexpect):
- session = fdpexpect.fdspawn('bogus', timeout=10)
-
- def test_not_file_descriptor(self):
- with self.assertRaises(pexpect.ExceptionPexpect):
- session = fdpexpect.fdspawn(-1, timeout=10)
-
def test_timeout(self):
with self.assertRaises(pexpect.TIMEOUT):
sock = socket.socket(self.af, socket.SOCK_STREAM)
sock.connect((self.host, self.port))
- session = fdpexpect.fdspawn(sock, timeout=10)
+ session = self.spawn(sock, timeout=10)
session.expect(b'Bogus response')
def test_interrupt(self):
@@ -223,7 +219,7 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase):
def test_maxread(self):
sock = socket.socket(self.af, socket.SOCK_STREAM)
sock.connect((self.host, self.port))
- session = fdpexpect.fdspawn(sock.fileno(), timeout=10)
+ session = self.spawn(sock, timeout=10)
session.maxread = 1100
session.expect(self.prompt1)
self.assertEqual(session.before, self.motd)
@@ -238,7 +234,7 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase):
def test_fd_isalive(self):
sock = socket.socket(self.af, socket.SOCK_STREAM)
sock.connect((self.host, self.port))
- session = fdpexpect.fdspawn(sock.fileno(), timeout=10)
+ session = self.spawn(sock, timeout=10)
assert session.isalive()
sock.close()
assert not session.isalive(), "Should not be alive after close()"
@@ -246,7 +242,7 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase):
def test_fd_isalive_poll(self):
sock = socket.socket(self.af, socket.SOCK_STREAM)
sock.connect((self.host, self.port))
- session = fdpexpect.fdspawn(sock.fileno(), timeout=10, use_poll=True)
+ session = self.spawn(sock, timeout=10, use_poll=True)
assert session.isalive()
sock.close()
assert not session.isalive(), "Should not be alive after close()"
@@ -254,27 +250,19 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase):
def test_fd_isatty(self):
sock = socket.socket(self.af, socket.SOCK_STREAM)
sock.connect((self.host, self.port))
- session = fdpexpect.fdspawn(sock.fileno(), timeout=10)
+ session = self.spawn(sock, timeout=10)
assert not session.isatty()
session.close()
def test_fd_isatty_poll(self):
sock = socket.socket(self.af, socket.SOCK_STREAM)
sock.connect((self.host, self.port))
- session = fdpexpect.fdspawn(sock.fileno(), timeout=10, use_poll=True)
+ session = self.spawn(sock, timeout=10, use_poll=True)
assert not session.isatty()
session.close()
- def test_fileobj(self):
- sock = socket.socket(self.af, socket.SOCK_STREAM)
- sock.connect((self.host, self.port))
- session = fdpexpect.fdspawn(sock, timeout=10) # Should get the fileno from the socket
- session.expect(self.prompt1)
- session.close()
- assert not session.isalive()
- session.close() # Smoketest - should be able to call this again
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(ExpectTestCase, 'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(ExpectTestCase)
diff --git a/tests/test_socket_fd.py b/tests/test_socket_fd.py
new file mode 100644
index 0000000..5be733c
--- /dev/null
+++ b/tests/test_socket_fd.py
@@ -0,0 +1,64 @@
+#!/usr/bin/env python
+'''
+PEXPECT LICENSE
+
+ This license is approved by the OSI and FSF as GPL-compatible.
+ http://opensource.org/licenses/isc-license.txt
+
+ Copyright (c) 2012, Noah Spurrier <noah@noah.org>
+ PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
+ PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
+ COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+'''
+import pexpect
+from pexpect import fdpexpect
+import unittest
+from . import test_socket
+import multiprocessing
+import os
+import signal
+import socket
+import time
+import errno
+
+
+class SocketServerError(Exception):
+ pass
+
+
+class ExpectTestCase(test_socket.ExpectTestCase):
+ """ duplicate of test_socket, but using fdpexpect rather than socket_expect """
+
+ def spawn(self, socket, timeout=30, use_poll=False):
+ return fdpexpect.fdspawn(socket.fileno(), timeout=timeout, use_poll=use_poll)
+
+ def test_not_int(self):
+ with self.assertRaises(pexpect.ExceptionPexpect):
+ session = fdpexpect.fdspawn('bogus', timeout=10)
+
+ def test_not_file_descriptor(self):
+ with self.assertRaises(pexpect.ExceptionPexpect):
+ session = fdpexpect.fdspawn(-1, timeout=10)
+
+ def test_fileobj(self):
+ sock = socket.socket(self.af, socket.SOCK_STREAM)
+ sock.connect((self.host, self.port))
+ session = fdpexpect.fdspawn(sock, timeout=10) # Should get the fileno from the socket
+ session.expect(self.prompt1)
+ session.close()
+ assert not session.isalive()
+ session.close() # Smoketest - should be able to call this again
+
+
+if __name__ == '__main__':
+ unittest.main()
+
+suite = unittest.TestLoader().loadTestsFromTestCase(ExpectTestCase)
diff --git a/tests/test_socket_pexpect.py b/tests/test_socket_pexpect.py
new file mode 100644
index 0000000..8fbcebf
--- /dev/null
+++ b/tests/test_socket_pexpect.py
@@ -0,0 +1,72 @@
+#!/usr/bin/env python
+'''
+PEXPECT LICENSE
+
+ This license is approved by the OSI and FSF as GPL-compatible.
+ http://opensource.org/licenses/isc-license.txt
+
+ Copyright (c) 2012, Noah Spurrier <noah@noah.org>
+ PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
+ PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
+ COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+'''
+import pexpect
+from pexpect import socket_pexpect
+import unittest
+from . import PexpectTestCase
+import socket
+
+def open_file_socket(filename):
+ read_socket, write_socket = socket.socketpair()
+ with open(filename, "rb") as file:
+ write_socket.sendall(file.read())
+ write_socket.close()
+ return read_socket
+
+class ExpectTestCase(PexpectTestCase.PexpectTestCase):
+ def setUp(self):
+ print(self.id())
+ PexpectTestCase.PexpectTestCase.setUp(self)
+
+ def test_socket (self):
+ socket = open_file_socket('TESTDATA.txt')
+ s = socket_pexpect.SocketSpawn(socket)
+ s.expect(b'This is the end of test data:')
+ s.expect(pexpect.EOF)
+ self.assertEqual(s.before, b' END\n')
+
+ def test_maxread (self):
+ socket = open_file_socket('TESTDATA.txt')
+ s = socket_pexpect.SocketSpawn(socket)
+ s.maxread = 100
+ s.expect('2')
+ s.expect ('This is the end of test data:')
+ s.expect (pexpect.EOF)
+ self.assertEqual(s.before, b' END\n')
+
+ def test_socket_isalive (self):
+ socket = open_file_socket('TESTDATA.txt')
+ s = socket_pexpect.SocketSpawn(socket)
+ assert s.isalive()
+ s.close()
+ assert not s.isalive(), "Should not be alive after close()"
+
+ def test_socket_isatty (self):
+ socket = open_file_socket('TESTDATA.txt')
+ s = socket_pexpect.SocketSpawn(socket)
+ assert not s.isatty()
+ s.close()
+
+
+if __name__ == '__main__':
+ unittest.main()
+
+suite = unittest.TestLoader().loadTestsFromTestCase(ExpectTestCase)
diff --git a/tests/test_timeout_pattern.py b/tests/test_timeout_pattern.py
index 5f610ef..35d4816 100755
--- a/tests/test_timeout_pattern.py
+++ b/tests/test_timeout_pattern.py
@@ -89,4 +89,4 @@ class Exp_TimeoutTestCase(PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(Exp_TimeoutTestCase,'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(Exp_TimeoutTestCase)
diff --git a/tests/test_unicode.py b/tests/test_unicode.py
index 9b5b988..6103167 100644
--- a/tests/test_unicode.py
+++ b/tests/test_unicode.py
@@ -184,4 +184,4 @@ class UnicodeTests(PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(UnicodeTests, 'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(UnicodeTests)
diff --git a/tests/test_winsize.py b/tests/test_winsize.py
index be16773..6fc78ce 100755
--- a/tests/test_winsize.py
+++ b/tests/test_winsize.py
@@ -55,6 +55,6 @@ class TestCaseWinsize(PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(TestCaseWinsize,'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(TestCaseWinsize)