summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/ci.yml40
-rw-r--r--doc/api/index.rst1
-rw-r--r--doc/api/socket_pexpect.rst20
-rw-r--r--pexpect/_async.py131
-rw-r--r--pexpect/_async_pre_await.py111
-rw-r--r--pexpect/_async_w_await.py118
-rw-r--r--pexpect/fdpexpect.py6
-rw-r--r--pexpect/pxssh.py5
-rw-r--r--pexpect/socket_pexpect.py145
-rw-r--r--requirements-testing.txt1
-rw-r--r--tests/PexpectTestCase.py37
-rwxr-xr-xtests/deprecated_test_filedescriptor.py2
-rwxr-xr-xtests/deprecated_test_run_out_of_pty.py2
-rwxr-xr-xtests/fakessh/ssh2
-rwxr-xr-xtests/test_ansi.py2
-rwxr-xr-xtests/test_command_list_split.py2
-rwxr-xr-xtests/test_constructor.py2
-rwxr-xr-xtests/test_ctrl_chars.py7
-rwxr-xr-xtests/test_destructor.py2
-rwxr-xr-xtests/test_dotall.py2
-rwxr-xr-xtests/test_expect.py10
-rwxr-xr-xtests/test_filedescriptor.py2
-rwxr-xr-xtests/test_interact.py10
-rwxr-xr-xtests/test_isalive.py2
-rwxr-xr-xtests/test_log.py2
-rwxr-xr-xtests/test_misc.py6
-rwxr-xr-xtests/test_missing_command.py2
-rwxr-xr-xtests/test_performance.py2
-rw-r--r--tests/test_popen_spawn.py10
-rw-r--r--tests/test_pxssh.py4
-rwxr-xr-xtests/test_run.py28
-rwxr-xr-xtests/test_screen.py2
-rw-r--r--tests/test_socket.py42
-rw-r--r--tests/test_socket_fd.py64
-rw-r--r--tests/test_socket_pexpect.py72
-rwxr-xr-xtests/test_timeout_pattern.py2
-rw-r--r--tests/test_unicode.py2
-rwxr-xr-xtests/test_winsize.py2
38 files changed, 697 insertions, 205 deletions
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
new file mode 100644
index 0000000..dbfbbea
--- /dev/null
+++ b/.github/workflows/ci.yml
@@ -0,0 +1,40 @@
+name: CI
+
+on:
+ push:
+ branches: [ master ]
+ pull_request:
+ branches: [ master ]
+
+jobs:
+ test:
+ runs-on: ubuntu-22.04
+ strategy:
+ fail-fast: false
+ matrix:
+ python-version: ["3.7", "3.8", "3.9", "3.10", "3.11", "pypy3.9"]
+
+ steps:
+ - uses: actions/checkout@v3
+
+ - name: Set up Python ${{ matrix.python-version }}
+ uses: actions/setup-python@v4
+ with:
+ python-version: ${{ matrix.python-version }}
+
+ - name: Install packages
+ run: |
+ export PYTHONIOENCODING=UTF8
+ pip install coveralls pytest-cov ptyprocess
+
+ - name: Run tests
+ run: |
+ ./tools/display-sighandlers.py
+ ./tools/display-terminalinfo.py
+ py.test --cov pexpect --cov-config .coveragerc
+
+ - name: Check coverage
+ env:
+ GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+ run: |
+ coveralls --service=github
diff --git a/doc/api/index.rst b/doc/api/index.rst
index 5277d1c..747c812 100644
--- a/doc/api/index.rst
+++ b/doc/api/index.rst
@@ -6,6 +6,7 @@ API documentation
pexpect
fdpexpect
+ socket_pexpect
popen_spawn
replwrap
pxssh
diff --git a/doc/api/socket_pexpect.rst b/doc/api/socket_pexpect.rst
new file mode 100644
index 0000000..726a999
--- /dev/null
+++ b/doc/api/socket_pexpect.rst
@@ -0,0 +1,20 @@
+socket_pexpect - use pexpect with a socket
+==========================================
+
+.. automodule:: pexpect.socket_pexpect
+
+SocketSpawn class
+-----------------
+
+.. autoclass:: SocketSpawn
+ :show-inheritance:
+
+ .. automethod:: __init__
+ .. automethod:: isalive
+ .. automethod:: close
+
+ .. method:: expect
+ expect_exact
+ expect_list
+
+ As :class:`pexpect.spawn`.
diff --git a/pexpect/_async.py b/pexpect/_async.py
index dfbfeef..261720c 100644
--- a/pexpect/_async.py
+++ b/pexpect/_async.py
@@ -1,103 +1,28 @@
-import asyncio
-import errno
-import signal
-
-from pexpect import EOF
-
-@asyncio.coroutine
-def expect_async(expecter, timeout=None):
- # First process data that was previously read - if it maches, we don't need
- # async stuff.
- idx = expecter.existing_data()
- if idx is not None:
- return idx
- if not expecter.spawn.async_pw_transport:
- pw = PatternWaiter()
- pw.set_expecter(expecter)
- transport, pw = yield from asyncio.get_event_loop()\
- .connect_read_pipe(lambda: pw, expecter.spawn)
- expecter.spawn.async_pw_transport = pw, transport
- else:
- pw, transport = expecter.spawn.async_pw_transport
- pw.set_expecter(expecter)
- transport.resume_reading()
- try:
- return (yield from asyncio.wait_for(pw.fut, timeout))
- except asyncio.TimeoutError as e:
- transport.pause_reading()
- return expecter.timeout(e)
-
-@asyncio.coroutine
-def repl_run_command_async(repl, cmdlines, timeout=-1):
- res = []
- repl.child.sendline(cmdlines[0])
- for line in cmdlines[1:]:
- yield from repl._expect_prompt(timeout=timeout, async_=True)
- res.append(repl.child.before)
- repl.child.sendline(line)
-
- # Command was fully submitted, now wait for the next prompt
- prompt_idx = yield from repl._expect_prompt(timeout=timeout, async_=True)
- if prompt_idx == 1:
- # We got the continuation prompt - command was incomplete
- repl.child.kill(signal.SIGINT)
- yield from repl._expect_prompt(timeout=1, async_=True)
- raise ValueError("Continuation prompt found - input was incomplete:")
- return u''.join(res + [repl.child.before])
-
-class PatternWaiter(asyncio.Protocol):
- transport = None
-
- def set_expecter(self, expecter):
- self.expecter = expecter
- self.fut = asyncio.Future()
-
- def found(self, result):
- if not self.fut.done():
- self.fut.set_result(result)
- self.transport.pause_reading()
-
- def error(self, exc):
- if not self.fut.done():
- self.fut.set_exception(exc)
- self.transport.pause_reading()
-
- def connection_made(self, transport):
- self.transport = transport
-
- def data_received(self, data):
- spawn = self.expecter.spawn
- s = spawn._decoder.decode(data)
- spawn._log(s, 'read')
-
- if self.fut.done():
- spawn._before.write(s)
- spawn._buffer.write(s)
- return
-
- try:
- index = self.expecter.new_data(s)
- if index is not None:
- # Found a match
- self.found(index)
- except Exception as e:
- self.expecter.errored()
- self.error(e)
-
- def eof_received(self):
- # N.B. If this gets called, async will close the pipe (the spawn object)
- # for us
- try:
- self.expecter.spawn.flag_eof = True
- index = self.expecter.eof()
- except EOF as e:
- self.error(e)
- else:
- self.found(index)
-
- def connection_lost(self, exc):
- if isinstance(exc, OSError) and exc.errno == errno.EIO:
- # We may get here without eof_received being called, e.g on Linux
- self.eof_received()
- elif exc is not None:
- self.error(exc)
+"""Facade that provides coroutines implementation pertinent to running Py version.
+
+Python 3.5 introduced the async def/await syntax keyword.
+With later versions coroutines and methods to get the running asyncio loop are
+being deprecated, not supported anymore.
+
+For Python versions later than 3.6, coroutines and objects that are defined via
+``async def``/``await`` keywords are imported.
+
+Here the code is just imported, to provide the same interface to older code.
+"""
+# pylint: disable=unused-import
+# flake8: noqa: F401
+from sys import version_info as py_version_info
+
+# this assumes async def/await are more stable
+if py_version_info >= (3, 6):
+ from pexpect._async_w_await import (
+ PatternWaiter,
+ expect_async,
+ repl_run_command_async,
+ )
+else:
+ from pexpect._async_pre_await import (
+ PatternWaiter,
+ expect_async,
+ repl_run_command_async,
+ )
diff --git a/pexpect/_async_pre_await.py b/pexpect/_async_pre_await.py
new file mode 100644
index 0000000..81ece1b
--- /dev/null
+++ b/pexpect/_async_pre_await.py
@@ -0,0 +1,111 @@
+"""Implementation of coroutines without using ``async def``/``await`` keywords.
+
+``@asyncio.coroutine`` and ``yield from`` are used here instead.
+"""
+import asyncio
+import errno
+import signal
+
+from pexpect import EOF
+
+
+@asyncio.coroutine
+def expect_async(expecter, timeout=None):
+ # First process data that was previously read - if it maches, we don't need
+ # async stuff.
+ idx = expecter.existing_data()
+ if idx is not None:
+ return idx
+ if not expecter.spawn.async_pw_transport:
+ pw = PatternWaiter()
+ pw.set_expecter(expecter)
+ transport, pw = yield from asyncio.get_event_loop().connect_read_pipe(
+ lambda: pw, expecter.spawn
+ )
+ expecter.spawn.async_pw_transport = pw, transport
+ else:
+ pw, transport = expecter.spawn.async_pw_transport
+ pw.set_expecter(expecter)
+ transport.resume_reading()
+ try:
+ return (yield from asyncio.wait_for(pw.fut, timeout))
+ except asyncio.TimeoutError as e:
+ transport.pause_reading()
+ return expecter.timeout(e)
+
+
+@asyncio.coroutine
+def repl_run_command_async(repl, cmdlines, timeout=-1):
+ res = []
+ repl.child.sendline(cmdlines[0])
+ for line in cmdlines[1:]:
+ yield from repl._expect_prompt(timeout=timeout, async_=True)
+ res.append(repl.child.before)
+ repl.child.sendline(line)
+
+ # Command was fully submitted, now wait for the next prompt
+ prompt_idx = yield from repl._expect_prompt(timeout=timeout, async_=True)
+ if prompt_idx == 1:
+ # We got the continuation prompt - command was incomplete
+ repl.child.kill(signal.SIGINT)
+ yield from repl._expect_prompt(timeout=1, async_=True)
+ raise ValueError("Continuation prompt found - input was incomplete:")
+ return "".join(res + [repl.child.before])
+
+
+class PatternWaiter(asyncio.Protocol):
+ transport = None
+
+ def set_expecter(self, expecter):
+ self.expecter = expecter
+ self.fut = asyncio.Future()
+
+ def found(self, result):
+ if not self.fut.done():
+ self.fut.set_result(result)
+ self.transport.pause_reading()
+
+ def error(self, exc):
+ if not self.fut.done():
+ self.fut.set_exception(exc)
+ self.transport.pause_reading()
+
+ def connection_made(self, transport):
+ self.transport = transport
+
+ def data_received(self, data):
+ spawn = self.expecter.spawn
+ s = spawn._decoder.decode(data)
+ spawn._log(s, "read")
+
+ if self.fut.done():
+ spawn._before.write(s)
+ spawn._buffer.write(s)
+ return
+
+ try:
+ index = self.expecter.new_data(s)
+ if index is not None:
+ # Found a match
+ self.found(index)
+ except Exception as e:
+ self.expecter.errored()
+ self.error(e)
+
+ def eof_received(self):
+ # N.B. If this gets called, async will close the pipe (the spawn object)
+ # for us
+ try:
+ self.expecter.spawn.flag_eof = True
+ index = self.expecter.eof()
+ except EOF as e:
+ self.error(e)
+ else:
+ self.found(index)
+
+ def connection_lost(self, exc):
+ if isinstance(exc, OSError) and exc.errno == errno.EIO:
+ # We may get here without eof_received being called, e.g on Linux
+ self.eof_received()
+ elif exc is not None:
+ self.error(exc)
diff --git a/pexpect/_async_w_await.py b/pexpect/_async_w_await.py
new file mode 100644
index 0000000..59cb1ef
--- /dev/null
+++ b/pexpect/_async_w_await.py
@@ -0,0 +1,118 @@
+"""Implementation of coroutines using ``async def``/``await`` keywords.
+
+These keywords replaced ``@asyncio.coroutine`` and ``yield from`` from
+Python 3.5 onwards.
+"""
+import asyncio
+import errno
+import signal
+from sys import version_info as py_version_info
+
+from pexpect import EOF
+
+if py_version_info >= (3, 7):
+ # get_running_loop, new in 3.7, is preferred to get_event_loop
+ _loop_getter = asyncio.get_running_loop
+else:
+ # Deprecation warning since 3.10
+ _loop_getter = asyncio.get_event_loop
+
+
+async def expect_async(expecter, timeout=None):
+ # First process data that was previously read - if it maches, we don't need
+ # async stuff.
+ idx = expecter.existing_data()
+ if idx is not None:
+ return idx
+ if not expecter.spawn.async_pw_transport:
+ pattern_waiter = PatternWaiter()
+ pattern_waiter.set_expecter(expecter)
+ transport, pattern_waiter = await _loop_getter().connect_read_pipe(
+ lambda: pattern_waiter, expecter.spawn
+ )
+ expecter.spawn.async_pw_transport = pattern_waiter, transport
+ else:
+ pattern_waiter, transport = expecter.spawn.async_pw_transport
+ pattern_waiter.set_expecter(expecter)
+ transport.resume_reading()
+ try:
+ return await asyncio.wait_for(pattern_waiter.fut, timeout)
+ except asyncio.TimeoutError as exc:
+ transport.pause_reading()
+ return expecter.timeout(exc)
+
+
+async def repl_run_command_async(repl, cmdlines, timeout=-1):
+ res = []
+ repl.child.sendline(cmdlines[0])
+ for line in cmdlines[1:]:
+ await repl._expect_prompt(timeout=timeout, async_=True)
+ res.append(repl.child.before)
+ repl.child.sendline(line)
+
+ # Command was fully submitted, now wait for the next prompt
+ prompt_idx = await repl._expect_prompt(timeout=timeout, async_=True)
+ if prompt_idx == 1:
+ # We got the continuation prompt - command was incomplete
+ repl.child.kill(signal.SIGINT)
+ await repl._expect_prompt(timeout=1, async_=True)
+ raise ValueError("Continuation prompt found - input was incomplete:")
+ return "".join(res + [repl.child.before])
+
+
+class PatternWaiter(asyncio.Protocol):
+ transport = None
+
+ def set_expecter(self, expecter):
+ self.expecter = expecter
+ self.fut = asyncio.Future()
+
+ def found(self, result):
+ if not self.fut.done():
+ self.fut.set_result(result)
+ self.transport.pause_reading()
+
+ def error(self, exc):
+ if not self.fut.done():
+ self.fut.set_exception(exc)
+ self.transport.pause_reading()
+
+ def connection_made(self, transport):
+ self.transport = transport
+
+ def data_received(self, data):
+ spawn = self.expecter.spawn
+ s = spawn._decoder.decode(data)
+ spawn._log(s, "read")
+
+ if self.fut.done():
+ spawn._before.write(s)
+ spawn._buffer.write(s)
+ return
+
+ try:
+ index = self.expecter.new_data(s)
+ if index is not None:
+ # Found a match
+ self.found(index)
+ except Exception as exc:
+ self.expecter.errored()
+ self.error(exc)
+
+ def eof_received(self):
+ # N.B. If this gets called, async will close the pipe (the spawn object)
+ # for us
+ try:
+ self.expecter.spawn.flag_eof = True
+ index = self.expecter.eof()
+ except EOF as exc:
+ self.error(exc)
+ else:
+ self.found(index)
+
+ def connection_lost(self, exc):
+ if isinstance(exc, OSError) and exc.errno == errno.EIO:
+ # We may get here without eof_received being called, e.g on Linux
+ self.eof_received()
+ elif exc is not None:
+ self.error(exc)
diff --git a/pexpect/fdpexpect.py b/pexpect/fdpexpect.py
index cddd50e..140bdfe 100644
--- a/pexpect/fdpexpect.py
+++ b/pexpect/fdpexpect.py
@@ -1,7 +1,11 @@
-'''This is like pexpect, but it will work with any file descriptor that you
+'''This is like :mod:`pexpect`, but it will work with any file descriptor that you
pass it. You are responsible for opening and close the file descriptor.
This allows you to use Pexpect with sockets and named pipes (FIFOs).
+.. note::
+ socket.fileno() does not give a readable file descriptor on windows.
+ Use :mod:`pexpect.socket_pexpect` for cross-platform socket support
+
PEXPECT LICENSE
This license is approved by the OSI and FSF as GPL-compatible.
diff --git a/pexpect/pxssh.py b/pexpect/pxssh.py
index 3d53bd9..bfefc7a 100644
--- a/pexpect/pxssh.py
+++ b/pexpect/pxssh.py
@@ -143,8 +143,7 @@ class pxssh (spawn):
# used to set shell command-line prompt to UNIQUE_PROMPT.
self.PROMPT_SET_SH = r"PS1='[PEXPECT]\$ '"
self.PROMPT_SET_CSH = r"set prompt='[PEXPECT]\$ '"
- self.SSH_OPTS = ("-o'RSAAuthentication=no'"
- + " -o 'PubkeyAuthentication=no'")
+ self.SSH_OPTS = (" -o 'PubkeyAuthentication=no'")
# Disabling host key checking, makes you vulnerable to MITM attacks.
# + " -o 'StrictHostKeyChecking=no'"
# + " -o 'UserKnownHostsFile /dev/null' ")
@@ -152,7 +151,7 @@ class pxssh (spawn):
# displaying a GUI password dialog. I have not figured out how to
# disable only SSH_ASKPASS without also disabling X11 forwarding.
# Unsetting SSH_ASKPASS on the remote side doesn't disable it! Annoying!
- #self.SSH_OPTS = "-x -o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'"
+ #self.SSH_OPTS = "-x -o 'PubkeyAuthentication=no'"
self.force_password = False
self.debug_command_string = debug_command_string
diff --git a/pexpect/socket_pexpect.py b/pexpect/socket_pexpect.py
new file mode 100644
index 0000000..cb11ac2
--- /dev/null
+++ b/pexpect/socket_pexpect.py
@@ -0,0 +1,145 @@
+"""This is like :mod:`pexpect`, but it will work with any socket that you
+pass it. You are responsible for opening and closing the socket.
+
+PEXPECT LICENSE
+
+ This license is approved by the OSI and FSF as GPL-compatible.
+ http://opensource.org/licenses/isc-license.txt
+
+ Copyright (c) 2012, Noah Spurrier <noah@noah.org>
+ PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
+ PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
+ COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+"""
+
+import socket
+from contextlib import contextmanager
+
+from .exceptions import TIMEOUT, EOF
+from .spawnbase import SpawnBase
+
+__all__ = ["SocketSpawn"]
+
+
+class SocketSpawn(SpawnBase):
+ """This is like :mod:`pexpect.fdpexpect` but uses the cross-platform python socket api,
+ rather than the unix-specific file descriptor api. Thus, it works with
+ remote connections on both unix and windows."""
+
+ def __init__(
+ self,
+ socket: socket.socket,
+ args=None,
+ timeout=30,
+ maxread=2000,
+ searchwindowsize=None,
+ logfile=None,
+ encoding=None,
+ codec_errors="strict",
+ use_poll=False,
+ ):
+ """This takes an open socket."""
+
+ self.args = None
+ self.command = None
+ SpawnBase.__init__(
+ self,
+ timeout,
+ maxread,
+ searchwindowsize,
+ logfile,
+ encoding=encoding,
+ codec_errors=codec_errors,
+ )
+ self.socket = socket
+ self.child_fd = socket.fileno()
+ self.closed = False
+ self.name = "<socket %s>" % socket
+ self.use_poll = use_poll
+
+ def close(self):
+ """Close the socket.
+
+ Calling this method a second time does nothing, but if the file
+ descriptor was closed elsewhere, :class:`OSError` will be raised.
+ """
+ if self.child_fd == -1:
+ return
+
+ self.flush()
+ self.socket.shutdown(socket.SHUT_RDWR)
+ self.socket.close()
+ self.child_fd = -1
+ self.closed = True
+
+ def isalive(self):
+ """ Alive if the fileno is valid """
+ return self.socket.fileno() >= 0
+
+ def send(self, s) -> int:
+ """Write to socket, return number of bytes written"""
+ s = self._coerce_send_string(s)
+ self._log(s, "send")
+
+ b = self._encoder.encode(s, final=False)
+ self.socket.sendall(b)
+ return len(b)
+
+ def sendline(self, s) -> int:
+ """Write to socket with trailing newline, return number of bytes written"""
+ s = self._coerce_send_string(s)
+ return self.send(s + self.linesep)
+
+ def write(self, s):
+ """Write to socket, return None"""
+ self.send(s)
+
+ def writelines(self, sequence):
+ "Call self.write() for each item in sequence"
+ for s in sequence:
+ self.write(s)
+
+ @contextmanager
+ def _timeout(self, timeout):
+ saved_timeout = self.socket.gettimeout()
+ try:
+ self.socket.settimeout(timeout)
+ yield
+ finally:
+ self.socket.settimeout(saved_timeout)
+
+ def read_nonblocking(self, size=1, timeout=-1):
+ """
+ Read from the file descriptor and return the result as a string.
+
+ The read_nonblocking method of :class:`SpawnBase` assumes that a call
+ to os.read will not block (timeout parameter is ignored). This is not
+ the case for POSIX file-like objects such as sockets and serial ports.
+
+ Use :func:`select.select`, timeout is implemented conditionally for
+ POSIX systems.
+
+ :param int size: Read at most *size* bytes.
+ :param int timeout: Wait timeout seconds for file descriptor to be
+ ready to read. When -1 (default), use self.timeout. When 0, poll.
+ :return: String containing the bytes read
+ """
+ if timeout == -1:
+ timeout = self.timeout
+ try:
+ with self._timeout(timeout):
+ s = self.socket.recv(size)
+ if s == b'':
+ self.flag_eof = True
+ raise EOF("Socket closed")
+ return s
+ except socket.timeout:
+ raise TIMEOUT("Timeout exceeded.")
diff --git a/requirements-testing.txt b/requirements-testing.txt
index 1894122..c47d7b1 100644
--- a/requirements-testing.txt
+++ b/requirements-testing.txt
@@ -2,4 +2,3 @@ pytest
pytest-cov
coverage
coveralls
-pytest-capturelog
diff --git a/tests/PexpectTestCase.py b/tests/PexpectTestCase.py
index 307437e..a762d8f 100644
--- a/tests/PexpectTestCase.py
+++ b/tests/PexpectTestCase.py
@@ -49,22 +49,25 @@ class PexpectTestCase(unittest.TestCase):
print('\n', self.id(), end=' ')
sys.stdout.flush()
- # some build agents will ignore SIGHUP and SIGINT, which python
- # inherits. This causes some of the tests related to terminate()
- # to fail. We set them to the default handlers that they should
- # be, and restore them back to their SIG_IGN value on tearDown.
- #
- # I'm not entirely convinced they need to be restored, only our
- # test runner is affected.
- self.restore_ignored_signals = [
- value for value in (signal.SIGHUP, signal.SIGINT,)
- if signal.getsignal(value) == signal.SIG_IGN]
- if signal.SIGHUP in self.restore_ignored_signals:
- # sighup should be set to default handler
- signal.signal(signal.SIGHUP, signal.SIG_DFL)
- if signal.SIGINT in self.restore_ignored_signals:
- # SIGINT should be set to signal.default_int_handler
- signal.signal(signal.SIGINT, signal.default_int_handler)
+ if sys.platform != 'win32':
+ # some build agents will ignore SIGHUP and SIGINT, which python
+ # inherits. This causes some of the tests related to terminate()
+ # to fail. We set them to the default handlers that they should
+ # be, and restore them back to their SIG_IGN value on tearDown.
+ #
+ # I'm not entirely convinced they need to be restored, only our
+ # test runner is affected.
+ self.restore_ignored_signals = [
+ value for value in (signal.SIGHUP, signal.SIGINT,)
+ if signal.getsignal(value) == signal.SIG_IGN]
+ if signal.SIGHUP in self.restore_ignored_signals:
+ # sighup should be set to default handler
+ signal.signal(signal.SIGHUP, signal.SIG_DFL)
+ if signal.SIGINT in self.restore_ignored_signals:
+ # SIGINT should be set to signal.default_int_handler
+ signal.signal(signal.SIGINT, signal.default_int_handler)
+ else:
+ self.restore_ignored_signals = []
unittest.TestCase.setUp(self)
def tearDown(self):
@@ -97,7 +100,7 @@ class PexpectTestCase(unittest.TestCase):
raise AssertionError("%s was not raised" % excClass)
@contextlib.contextmanager
- def assertRaisesRegexp(self, excClass, pattern):
+ def assertRaisesRegex(self, excClass, pattern):
import re
try:
yield
diff --git a/tests/deprecated_test_filedescriptor.py b/tests/deprecated_test_filedescriptor.py
index 6b0ef3e..cd930cf 100755
--- a/tests/deprecated_test_filedescriptor.py
+++ b/tests/deprecated_test_filedescriptor.py
@@ -72,7 +72,7 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(ExpectTestCase, 'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(ExpectTestCase)
#fout = open('delete_me_1','wb')
#fout.write(the_old_way)
diff --git a/tests/deprecated_test_run_out_of_pty.py b/tests/deprecated_test_run_out_of_pty.py
index 3090147..b34094e 100755
--- a/tests/deprecated_test_run_out_of_pty.py
+++ b/tests/deprecated_test_run_out_of_pty.py
@@ -47,5 +47,5 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(ExpectTestCase,'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(ExpectTestCase)
diff --git a/tests/fakessh/ssh b/tests/fakessh/ssh
index 4a5be1b..74ffe20 100755
--- a/tests/fakessh/ssh
+++ b/tests/fakessh/ssh
@@ -62,7 +62,7 @@ prompt = "$"
while True:
cmd = input(prompt)
if cmd.startswith('PS1='):
- prompt = eval(cmd[4:]).replace('\$', '$')
+ prompt = eval(cmd[4:]).replace(r'\$', '$')
elif cmd == 'ping':
print('pong')
elif cmd.startswith('ls'):
diff --git a/tests/test_ansi.py b/tests/test_ansi.py
index 3d73fe8..a49c663 100755
--- a/tests/test_ansi.py
+++ b/tests/test_ansi.py
@@ -236,5 +236,5 @@ class ansiTestCase (PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(ansiTestCase,'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(ansiTestCase)
diff --git a/tests/test_command_list_split.py b/tests/test_command_list_split.py
index 370f46e..eeaf6c0 100755
--- a/tests/test_command_list_split.py
+++ b/tests/test_command_list_split.py
@@ -37,4 +37,4 @@ class SplitCommandLineTestCase(PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(SplitCommandLineTestCase,'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(SplitCommandLineTestCase)
diff --git a/tests/test_constructor.py b/tests/test_constructor.py
index 98c473a..1b4d717 100755
--- a/tests/test_constructor.py
+++ b/tests/test_constructor.py
@@ -44,5 +44,5 @@ class TestCaseConstructor(PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(TestCaseConstructor,'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(TestCaseConstructor)
diff --git a/tests/test_ctrl_chars.py b/tests/test_ctrl_chars.py
index 032027c..11fb55c 100755
--- a/tests/test_ctrl_chars.py
+++ b/tests/test_ctrl_chars.py
@@ -26,8 +26,9 @@ from . import PexpectTestCase
import time
import sys
-from ptyprocess import ptyprocess
-ptyprocess._make_eof_intr()
+if sys.platform != 'win32':
+ from ptyprocess import ptyprocess
+ ptyprocess._make_eof_intr()
if sys.version_info[0] >= 3:
def byte(i):
@@ -124,5 +125,5 @@ class TestCtrlChars(PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(TestCtrlChars,'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(TestCtrlChars)
diff --git a/tests/test_destructor.py b/tests/test_destructor.py
index d27b6f6..01d89a0 100755
--- a/tests/test_destructor.py
+++ b/tests/test_destructor.py
@@ -80,5 +80,5 @@ class TestCaseDestructor(PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(TestCaseDestructor,'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(TestCaseDestructor)
diff --git a/tests/test_dotall.py b/tests/test_dotall.py
index 68aef3f..44c58c5 100755
--- a/tests/test_dotall.py
+++ b/tests/test_dotall.py
@@ -39,5 +39,5 @@ class TestCaseDotall(PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(TestCaseDotall,'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(TestCaseDotall)
diff --git a/tests/test_expect.py b/tests/test_expect.py
index 5e54d65..c6d72da 100755
--- a/tests/test_expect.py
+++ b/tests/test_expect.py
@@ -643,13 +643,13 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase):
def test_bad_arg(self):
p = pexpect.spawn('cat')
- with self.assertRaisesRegexp(TypeError, '.*must be one of'):
+ with self.assertRaisesRegex(TypeError, '.*must be one of'):
p.expect(1)
- with self.assertRaisesRegexp(TypeError, '.*must be one of'):
+ with self.assertRaisesRegex(TypeError, '.*must be one of'):
p.expect([1, b'2'])
- with self.assertRaisesRegexp(TypeError, '.*must be one of'):
+ with self.assertRaisesRegex(TypeError, '.*must be one of'):
p.expect_exact(1)
- with self.assertRaisesRegexp(TypeError, '.*must be one of'):
+ with self.assertRaisesRegex(TypeError, '.*must be one of'):
p.expect_exact([1, b'2'])
def test_timeout_none(self):
@@ -714,4 +714,4 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(ExpectTestCase, 'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(ExpectTestCase)
diff --git a/tests/test_filedescriptor.py b/tests/test_filedescriptor.py
index d9164e1..3f9d954 100755
--- a/tests/test_filedescriptor.py
+++ b/tests/test_filedescriptor.py
@@ -69,4 +69,4 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(ExpectTestCase, 'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(ExpectTestCase)
diff --git a/tests/test_interact.py b/tests/test_interact.py
index 4afbd18..7205d80 100755
--- a/tests/test_interact.py
+++ b/tests/test_interact.py
@@ -62,8 +62,8 @@ class InteractTestCase (PexpectTestCase.PexpectTestCase):
p.sendcontrol(']')
p.expect('29<STOP>')
p.send('\x00')
- if not os.environ.get('TRAVIS', None):
- # on Travis-CI, we sometimes miss trailing stdout from the
+ if not os.environ.get('CI', None):
+ # on CI platforms, we sometimes miss trailing stdout from the
# chain of child processes, not entirely sure why. So this
# is skipped on such systems.
p.expect('0<STOP>')
@@ -84,8 +84,8 @@ class InteractTestCase (PexpectTestCase.PexpectTestCase):
p.expect('206<STOP>') # [206, 146]
p.expect('146<STOP>')
p.send('\x00')
- if not os.environ.get('TRAVIS', None):
- # on Travis-CI, we sometimes miss trailing stdout from the
+ if not os.environ.get('CI', None):
+ # on CI platforms, we sometimes miss trailing stdout from the
# chain of child processes, not entirely sure why. So this
# is skipped on such systems.
p.expect('0<STOP>')
@@ -97,5 +97,5 @@ class InteractTestCase (PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(InteractTestCase, 'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(InteractTestCase)
diff --git a/tests/test_isalive.py b/tests/test_isalive.py
index 5e3021e..ba2b5d4 100755
--- a/tests/test_isalive.py
+++ b/tests/test_isalive.py
@@ -121,5 +121,5 @@ class IsAliveTestCase(PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(IsAliveTestCase, 'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(IsAliveTestCase)
diff --git a/tests/test_log.py b/tests/test_log.py
index 4ad2256..e2defff 100755
--- a/tests/test_log.py
+++ b/tests/test_log.py
@@ -104,5 +104,5 @@ class TestCaseLog(PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(TestCaseLog,'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(TestCaseLog)
diff --git a/tests/test_misc.py b/tests/test_misc.py
index 7784759..5c81f92 100755
--- a/tests/test_misc.py
+++ b/tests/test_misc.py
@@ -214,7 +214,7 @@ class TestCaseMisc(PexpectTestCase.PexpectTestCase):
# Force an invalid state to test isalive
child.ptyproc.terminated = 0
try:
- with self.assertRaisesRegexp(pexpect.ExceptionPexpect,
+ with self.assertRaisesRegex(pexpect.ExceptionPexpect,
".*" + expect_errmsg):
child.isalive()
finally:
@@ -224,7 +224,7 @@ class TestCaseMisc(PexpectTestCase.PexpectTestCase):
def test_bad_arguments_suggest_fdpsawn(self):
" assert custom exception for spawn(int). "
expect_errmsg = "maybe you want to use fdpexpect.fdspawn"
- with self.assertRaisesRegexp(pexpect.ExceptionPexpect,
+ with self.assertRaisesRegex(pexpect.ExceptionPexpect,
".*" + expect_errmsg):
pexpect.spawn(1)
@@ -370,4 +370,4 @@ class TestCaseMisc(PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(TestCaseMisc,'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(TestCaseMisc)
diff --git a/tests/test_missing_command.py b/tests/test_missing_command.py
index 92e4733..3775632 100755
--- a/tests/test_missing_command.py
+++ b/tests/test_missing_command.py
@@ -34,5 +34,5 @@ class MissingCommandTestCase (PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(MissingCommandTestCase,'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(MissingCommandTestCase)
diff --git a/tests/test_performance.py b/tests/test_performance.py
index d7e2cd6..05027a0 100755
--- a/tests/test_performance.py
+++ b/tests/test_performance.py
@@ -110,4 +110,4 @@ class PerformanceTestCase (PexpectTestCase.PexpectTestCase):
if __name__ == "__main__":
unittest.main()
-suite = unittest.makeSuite(PerformanceTestCase,'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(PerformanceTestCase)
diff --git a/tests/test_popen_spawn.py b/tests/test_popen_spawn.py
index fca7493..6168148 100644
--- a/tests/test_popen_spawn.py
+++ b/tests/test_popen_spawn.py
@@ -110,13 +110,13 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase):
def test_bad_arg(self):
p = PopenSpawn('cat')
- with self.assertRaisesRegexp(TypeError, '.*must be one of'):
+ with self.assertRaisesRegex(TypeError, '.*must be one of'):
p.expect(1)
- with self.assertRaisesRegexp(TypeError, '.*must be one of'):
+ with self.assertRaisesRegex(TypeError, '.*must be one of'):
p.expect([1, b'2'])
- with self.assertRaisesRegexp(TypeError, '.*must be one of'):
+ with self.assertRaisesRegex(TypeError, '.*must be one of'):
p.expect_exact(1)
- with self.assertRaisesRegexp(TypeError, '.*must be one of'):
+ with self.assertRaisesRegex(TypeError, '.*must be one of'):
p.expect_exact([1, b'2'])
def test_timeout_none(self):
@@ -136,4 +136,4 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(ExpectTestCase, 'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(ExpectTestCase)
diff --git a/tests/test_pxssh.py b/tests/test_pxssh.py
index c6ec4e2..ba700c8 100644
--- a/tests/test_pxssh.py
+++ b/tests/test_pxssh.py
@@ -1,10 +1,12 @@
#!/usr/bin/env python
+import sys
import os
import shutil
import tempfile
import unittest
-from pexpect import pxssh
+if sys.platform != 'win32':
+ from pexpect import pxssh
from .PexpectTestCase import PexpectTestCase
class SSHTestBase(PexpectTestCase):
diff --git a/tests/test_run.py b/tests/test_run.py
index f750fb2..15d0c40 100755
--- a/tests/test_run.py
+++ b/tests/test_run.py
@@ -22,7 +22,6 @@ PEXPECT LICENSE
import pexpect
import unittest
import subprocess
-import tempfile
import sys
import os
from . import PexpectTestCase
@@ -53,21 +52,17 @@ def function_events_callback(values):
class RunFuncTestCase(PexpectTestCase.PexpectTestCase):
- runfunc = staticmethod(pexpect.run)
+ if sys.platform != 'win32':
+ runfunc = staticmethod(pexpect.run)
cr = b'\r'
empty = b''
prep_subprocess_out = staticmethod(lambda x: x)
def setUp(self):
- fd, self.rcfile = tempfile.mkstemp()
- os.write(fd, b'PS1=GO: \n')
- os.close(fd)
+ self.runenv = os.environ.copy()
+ self.runenv['PS1'] = 'GO:'
super(RunFuncTestCase, self).setUp()
- def tearDown(self):
- os.unlink(self.rcfile)
- super(RunFuncTestCase, self).tearDown()
-
def test_run_exit(self):
(data, exitstatus) = self.runfunc(sys.executable + ' exit1.py', withexitstatus=1)
assert exitstatus == 1, "Exit status of 'python exit1.py' should be 1."
@@ -106,9 +101,10 @@ class RunFuncTestCase(PexpectTestCase.PexpectTestCase):
]
(data, exitstatus) = pexpect.run(
- 'bash --rcfile {0}'.format(self.rcfile),
+ 'bash --norc',
withexitstatus=True,
events=events,
+ env=self.runenv,
timeout=10)
assert exitstatus == 0
@@ -118,9 +114,10 @@ class RunFuncTestCase(PexpectTestCase.PexpectTestCase):
]
(data, exitstatus) = pexpect.run(
- 'bash --rcfile {0}'.format(self.rcfile),
+ 'bash --norc',
withexitstatus=True,
events=events,
+ env=self.runenv,
timeout=10)
assert exitstatus == 0
@@ -130,18 +127,20 @@ class RunFuncTestCase(PexpectTestCase.PexpectTestCase):
]
(data, exitstatus) = pexpect.run(
- 'bash --rcfile {0}'.format(self.rcfile),
+ 'bash --norc',
withexitstatus=True,
events=events,
+ env=self.runenv,
timeout=10)
assert exitstatus == 0
def test_run_event_typeerror(self):
events = [('GO:', -1)]
with self.assertRaises(TypeError):
- pexpect.run('bash --rcfile {0}'.format(self.rcfile),
+ pexpect.run('bash --norc',
withexitstatus=True,
events=events,
+ env=self.runenv,
timeout=10)
def _method_events_callback(self, values):
@@ -162,7 +161,8 @@ class RunFuncTestCase(PexpectTestCase.PexpectTestCase):
class RunUnicodeFuncTestCase(RunFuncTestCase):
- runfunc = staticmethod(pexpect.runu)
+ if sys.platform != 'win32':
+ runfunc = staticmethod(pexpect.runu)
cr = b'\r'.decode('ascii')
empty = b''.decode('ascii')
prep_subprocess_out = staticmethod(lambda x: x.decode('utf-8', 'replace'))
diff --git a/tests/test_screen.py b/tests/test_screen.py
index 2429e57..9e275bc 100755
--- a/tests/test_screen.py
+++ b/tests/test_screen.py
@@ -282,6 +282,6 @@ class screenTestCase (PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(screenTestCase,'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(screenTestCase)
diff --git a/tests/test_socket.py b/tests/test_socket.py
index a8c8595..b801b00 100644
--- a/tests/test_socket.py
+++ b/tests/test_socket.py
@@ -19,7 +19,7 @@ PEXPECT LICENSE
'''
import pexpect
-from pexpect import fdpexpect
+from pexpect import socket_pexpect
import unittest
from . import PexpectTestCase
import multiprocessing
@@ -133,12 +133,16 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase):
pass
exit(0)
+ def spawn(self, socket, timeout=30, use_poll=False):
+ """override me with other ways of spawning on a socket"""
+ return socket_pexpect.SocketSpawn(socket, timeout=timeout, use_poll=use_poll)
+
def socket_fn(self, timed_out, all_read):
result = 0
try:
sock = socket.socket(self.af, socket.SOCK_STREAM)
sock.connect((self.host, self.port))
- session = fdpexpect.fdspawn(sock, timeout=10)
+ session = self.spawn(sock, timeout=10)
# Get all data from server
session.read_nonblocking(size=4096)
all_read.set()
@@ -152,7 +156,7 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase):
def test_socket(self):
sock = socket.socket(self.af, socket.SOCK_STREAM)
sock.connect((self.host, self.port))
- session = fdpexpect.fdspawn(sock.fileno(), timeout=10)
+ session = self.spawn(sock, timeout=10)
session.expect(self.prompt1)
self.assertEqual(session.before, self.motd)
session.send(self.enter)
@@ -166,7 +170,7 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase):
def test_socket_with_write(self):
sock = socket.socket(self.af, socket.SOCK_STREAM)
sock.connect((self.host, self.port))
- session = fdpexpect.fdspawn(sock.fileno(), timeout=10)
+ session = self.spawn(sock, timeout=10)
session.expect(self.prompt1)
self.assertEqual(session.before, self.motd)
session.write(self.enter)
@@ -177,19 +181,11 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase):
session.expect(pexpect.EOF)
self.assertEqual(session.before, b'')
- def test_not_int(self):
- with self.assertRaises(pexpect.ExceptionPexpect):
- session = fdpexpect.fdspawn('bogus', timeout=10)
-
- def test_not_file_descriptor(self):
- with self.assertRaises(pexpect.ExceptionPexpect):
- session = fdpexpect.fdspawn(-1, timeout=10)
-
def test_timeout(self):
with self.assertRaises(pexpect.TIMEOUT):
sock = socket.socket(self.af, socket.SOCK_STREAM)
sock.connect((self.host, self.port))
- session = fdpexpect.fdspawn(sock, timeout=10)
+ session = self.spawn(sock, timeout=10)
session.expect(b'Bogus response')
def test_interrupt(self):
@@ -223,7 +219,7 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase):
def test_maxread(self):
sock = socket.socket(self.af, socket.SOCK_STREAM)
sock.connect((self.host, self.port))
- session = fdpexpect.fdspawn(sock.fileno(), timeout=10)
+ session = self.spawn(sock, timeout=10)
session.maxread = 1100
session.expect(self.prompt1)
self.assertEqual(session.before, self.motd)
@@ -238,7 +234,7 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase):
def test_fd_isalive(self):
sock = socket.socket(self.af, socket.SOCK_STREAM)
sock.connect((self.host, self.port))
- session = fdpexpect.fdspawn(sock.fileno(), timeout=10)
+ session = self.spawn(sock, timeout=10)
assert session.isalive()
sock.close()
assert not session.isalive(), "Should not be alive after close()"
@@ -246,7 +242,7 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase):
def test_fd_isalive_poll(self):
sock = socket.socket(self.af, socket.SOCK_STREAM)
sock.connect((self.host, self.port))
- session = fdpexpect.fdspawn(sock.fileno(), timeout=10, use_poll=True)
+ session = self.spawn(sock, timeout=10, use_poll=True)
assert session.isalive()
sock.close()
assert not session.isalive(), "Should not be alive after close()"
@@ -254,27 +250,19 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase):
def test_fd_isatty(self):
sock = socket.socket(self.af, socket.SOCK_STREAM)
sock.connect((self.host, self.port))
- session = fdpexpect.fdspawn(sock.fileno(), timeout=10)
+ session = self.spawn(sock, timeout=10)
assert not session.isatty()
session.close()
def test_fd_isatty_poll(self):
sock = socket.socket(self.af, socket.SOCK_STREAM)
sock.connect((self.host, self.port))
- session = fdpexpect.fdspawn(sock.fileno(), timeout=10, use_poll=True)
+ session = self.spawn(sock, timeout=10, use_poll=True)
assert not session.isatty()
session.close()
- def test_fileobj(self):
- sock = socket.socket(self.af, socket.SOCK_STREAM)
- sock.connect((self.host, self.port))
- session = fdpexpect.fdspawn(sock, timeout=10) # Should get the fileno from the socket
- session.expect(self.prompt1)
- session.close()
- assert not session.isalive()
- session.close() # Smoketest - should be able to call this again
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(ExpectTestCase, 'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(ExpectTestCase)
diff --git a/tests/test_socket_fd.py b/tests/test_socket_fd.py
new file mode 100644
index 0000000..5be733c
--- /dev/null
+++ b/tests/test_socket_fd.py
@@ -0,0 +1,64 @@
+#!/usr/bin/env python
+'''
+PEXPECT LICENSE
+
+ This license is approved by the OSI and FSF as GPL-compatible.
+ http://opensource.org/licenses/isc-license.txt
+
+ Copyright (c) 2012, Noah Spurrier <noah@noah.org>
+ PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
+ PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
+ COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+'''
+import pexpect
+from pexpect import fdpexpect
+import unittest
+from . import test_socket
+import multiprocessing
+import os
+import signal
+import socket
+import time
+import errno
+
+
+class SocketServerError(Exception):
+ pass
+
+
+class ExpectTestCase(test_socket.ExpectTestCase):
+ """ duplicate of test_socket, but using fdpexpect rather than socket_expect """
+
+ def spawn(self, socket, timeout=30, use_poll=False):
+ return fdpexpect.fdspawn(socket.fileno(), timeout=timeout, use_poll=use_poll)
+
+ def test_not_int(self):
+ with self.assertRaises(pexpect.ExceptionPexpect):
+ session = fdpexpect.fdspawn('bogus', timeout=10)
+
+ def test_not_file_descriptor(self):
+ with self.assertRaises(pexpect.ExceptionPexpect):
+ session = fdpexpect.fdspawn(-1, timeout=10)
+
+ def test_fileobj(self):
+ sock = socket.socket(self.af, socket.SOCK_STREAM)
+ sock.connect((self.host, self.port))
+ session = fdpexpect.fdspawn(sock, timeout=10) # Should get the fileno from the socket
+ session.expect(self.prompt1)
+ session.close()
+ assert not session.isalive()
+ session.close() # Smoketest - should be able to call this again
+
+
+if __name__ == '__main__':
+ unittest.main()
+
+suite = unittest.TestLoader().loadTestsFromTestCase(ExpectTestCase)
diff --git a/tests/test_socket_pexpect.py b/tests/test_socket_pexpect.py
new file mode 100644
index 0000000..8fbcebf
--- /dev/null
+++ b/tests/test_socket_pexpect.py
@@ -0,0 +1,72 @@
+#!/usr/bin/env python
+'''
+PEXPECT LICENSE
+
+ This license is approved by the OSI and FSF as GPL-compatible.
+ http://opensource.org/licenses/isc-license.txt
+
+ Copyright (c) 2012, Noah Spurrier <noah@noah.org>
+ PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
+ PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
+ COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+'''
+import pexpect
+from pexpect import socket_pexpect
+import unittest
+from . import PexpectTestCase
+import socket
+
+def open_file_socket(filename):
+ read_socket, write_socket = socket.socketpair()
+ with open(filename, "rb") as file:
+ write_socket.sendall(file.read())
+ write_socket.close()
+ return read_socket
+
+class ExpectTestCase(PexpectTestCase.PexpectTestCase):
+ def setUp(self):
+ print(self.id())
+ PexpectTestCase.PexpectTestCase.setUp(self)
+
+ def test_socket (self):
+ socket = open_file_socket('TESTDATA.txt')
+ s = socket_pexpect.SocketSpawn(socket)
+ s.expect(b'This is the end of test data:')
+ s.expect(pexpect.EOF)
+ self.assertEqual(s.before, b' END\n')
+
+ def test_maxread (self):
+ socket = open_file_socket('TESTDATA.txt')
+ s = socket_pexpect.SocketSpawn(socket)
+ s.maxread = 100
+ s.expect('2')
+ s.expect ('This is the end of test data:')
+ s.expect (pexpect.EOF)
+ self.assertEqual(s.before, b' END\n')
+
+ def test_socket_isalive (self):
+ socket = open_file_socket('TESTDATA.txt')
+ s = socket_pexpect.SocketSpawn(socket)
+ assert s.isalive()
+ s.close()
+ assert not s.isalive(), "Should not be alive after close()"
+
+ def test_socket_isatty (self):
+ socket = open_file_socket('TESTDATA.txt')
+ s = socket_pexpect.SocketSpawn(socket)
+ assert not s.isatty()
+ s.close()
+
+
+if __name__ == '__main__':
+ unittest.main()
+
+suite = unittest.TestLoader().loadTestsFromTestCase(ExpectTestCase)
diff --git a/tests/test_timeout_pattern.py b/tests/test_timeout_pattern.py
index 5f610ef..35d4816 100755
--- a/tests/test_timeout_pattern.py
+++ b/tests/test_timeout_pattern.py
@@ -89,4 +89,4 @@ class Exp_TimeoutTestCase(PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(Exp_TimeoutTestCase,'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(Exp_TimeoutTestCase)
diff --git a/tests/test_unicode.py b/tests/test_unicode.py
index 9b5b988..6103167 100644
--- a/tests/test_unicode.py
+++ b/tests/test_unicode.py
@@ -184,4 +184,4 @@ class UnicodeTests(PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(UnicodeTests, 'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(UnicodeTests)
diff --git a/tests/test_winsize.py b/tests/test_winsize.py
index be16773..6fc78ce 100755
--- a/tests/test_winsize.py
+++ b/tests/test_winsize.py
@@ -55,6 +55,6 @@ class TestCaseWinsize(PexpectTestCase.PexpectTestCase):
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(TestCaseWinsize,'test')
+suite = unittest.TestLoader().loadTestsFromTestCase(TestCaseWinsize)