summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorJeff Forcier <jeff@bitprophet.org>2014-09-10 12:58:02 -0700
committerJeff Forcier <jeff@bitprophet.org>2014-09-10 12:58:02 -0700
commitbade24d2d5b76c3f38bf0310ea8072184ab95b2a (patch)
tree160c9e52fc9911696247c1fe9e4bfec3c15ab662 /tests
parent0063e64046c732e8c50fc5f54234942feaa313d9 (diff)
parente71f4e59878a636268475f642ed4e98a1b3e375d (diff)
downloadparamiko-bade24d2d5b76c3f38bf0310ea8072184ab95b2a.tar.gz
Merge branch 'master' into 216-int
Conflicts: paramiko/transport.py paramiko/util.py tests/test_client.py
Diffstat (limited to 'tests')
-rw-r--r--tests/stub_sftp.py1
-rw-r--r--tests/test_buffered_pipe.py10
-rw-r--r--tests/test_client.py31
-rw-r--r--tests/test_gssapi.py157
-rw-r--r--tests/test_kex_gss.py133
-rw-r--r--tests/test_packetizer.py46
-rw-r--r--tests/test_pkey.py18
-rw-r--r--tests/test_ssh_gss.py126
-rw-r--r--tests/test_transport.py134
-rw-r--r--tests/test_util.py17
-rw-r--r--tests/util.py10
11 files changed, 606 insertions, 77 deletions
diff --git a/tests/stub_sftp.py b/tests/stub_sftp.py
index 47644433..24380ba1 100644
--- a/tests/stub_sftp.py
+++ b/tests/stub_sftp.py
@@ -21,6 +21,7 @@ A stub SFTP server for loopback SFTP testing.
"""
import os
+import sys
from paramiko import ServerInterface, SFTPServerInterface, SFTPServer, SFTPAttributes, \
SFTPHandle, SFTP_OK, AUTH_SUCCESSFUL, OPEN_SUCCEEDED
from paramiko.common import o666
diff --git a/tests/test_buffered_pipe.py b/tests/test_buffered_pipe.py
index a53081a9..eeb4d0ad 100644
--- a/tests/test_buffered_pipe.py
+++ b/tests/test_buffered_pipe.py
@@ -22,10 +22,10 @@ Some unit tests for BufferedPipe.
import threading
import time
+import unittest
from paramiko.buffered_pipe import BufferedPipe, PipeTimeout
from paramiko import pipe
-
-from tests.util import ParamikoTest
+from paramiko.py3compat import b
def delay_thread(p):
@@ -40,7 +40,7 @@ def close_thread(p):
p.close()
-class BufferedPipeTest(ParamikoTest):
+class BufferedPipeTest(unittest.TestCase):
def test_1_buffered_pipe(self):
p = BufferedPipe()
self.assertTrue(not p.read_ready())
@@ -48,12 +48,12 @@ class BufferedPipeTest(ParamikoTest):
self.assertTrue(p.read_ready())
data = p.read(6)
self.assertEqual(b'hello.', data)
-
+
p.feed('plus/minus')
self.assertEqual(b'plu', p.read(3))
self.assertEqual(b's/m', p.read(3))
self.assertEqual(b'inus', p.read(4))
-
+
p.close()
self.assertTrue(not p.read_ready())
self.assertEqual(b'', p.read(1))
diff --git a/tests/test_client.py b/tests/test_client.py
index b3635272..28d1cb46 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -29,6 +29,7 @@ import unittest
import weakref
import warnings
import os
+import time
from tests.util import test_path
import paramiko
from paramiko.common import PY2, b
@@ -93,7 +94,7 @@ class SSHClientTest (unittest.TestCase):
if hasattr(self, attr):
getattr(self, attr).close()
- def _run(self, allowed_keys=None):
+ def _run(self, allowed_keys=None, delay=0):
if allowed_keys is None:
allowed_keys = FINGERPRINTS.keys()
self.socks, addr = self.sockl.accept()
@@ -101,6 +102,8 @@ class SSHClientTest (unittest.TestCase):
host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
self.ts.add_server_key(host_key)
server = NullServer(allowed_keys=allowed_keys)
+ if delay:
+ time.sleep(delay)
self.ts.start_server(self.event, server)
def _test_connection(self, **kwargs):
@@ -264,6 +267,8 @@ class SSHClientTest (unittest.TestCase):
"""
# Unclear why this is borked on Py3, but it is, and does not seem worth
# pursuing at the moment.
+ # XXX: It's the release of the references to e.g packetizer that fails
+ # in py3...
if not PY2:
return
threading.Thread(target=self._run).start()
@@ -296,7 +301,7 @@ class SSHClientTest (unittest.TestCase):
self.assertTrue(p() is None)
- def test_6_client_can_be_used_as_context_manager(self):
+ def test_client_can_be_used_as_context_manager(self):
"""
verify that an SSHClient can be used a context manager
"""
@@ -317,3 +322,25 @@ class SSHClientTest (unittest.TestCase):
self.assertTrue(self.tc._transport is not None)
self.assertTrue(self.tc._transport is None)
+
+ def test_7_banner_timeout(self):
+ """
+ verify that the SSHClient has a configurable banner timeout.
+ """
+ # Start the thread with a 1 second wait.
+ threading.Thread(target=self._run, kwargs={'delay': 1}).start()
+ host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ public_host_key = paramiko.RSAKey(data=host_key.asbytes())
+
+ self.tc = paramiko.SSHClient()
+ self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key)
+ # Connect with a half second banner timeout.
+ self.assertRaises(
+ paramiko.SSHException,
+ self.tc.connect,
+ self.addr,
+ self.port,
+ username='slowdive',
+ password='pygmalion',
+ banner_timeout=0.5
+ )
diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py
new file mode 100644
index 00000000..0d3df72c
--- /dev/null
+++ b/tests/test_gssapi.py
@@ -0,0 +1,157 @@
+# Copyright (C) 2013-2014 science + computing ag
+# Author: Sebastian Deiss <sebastian.deiss@t-online.de>
+#
+#
+# This file is part of paramiko.
+#
+# Paramiko is free software; you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+
+"""
+Test the used APIs for GSS-API / SSPI authentication
+"""
+
+import unittest
+import socket
+
+
+class GSSAPITest(unittest.TestCase):
+
+ def init(hostname=None, srv_mode=False):
+ global krb5_mech, targ_name, server_mode
+ krb5_mech = "1.2.840.113554.1.2.2"
+ targ_name = hostname
+ server_mode = srv_mode
+
+ init = staticmethod(init)
+
+ def test_1_pyasn1(self):
+ """
+ Test the used methods of pyasn1.
+ """
+ from pyasn1.type.univ import ObjectIdentifier
+ from pyasn1.codec.der import encoder, decoder
+ oid = encoder.encode(ObjectIdentifier(krb5_mech))
+ mech, __ = decoder.decode(oid)
+ self.assertEquals(krb5_mech, mech.__str__())
+
+ def test_2_gssapi_sspi(self):
+ """
+ Test the used methods of python-gssapi or sspi, sspicon from pywin32.
+ """
+ _API = "MIT"
+ try:
+ import gssapi
+ except ImportError:
+ import sspicon
+ import sspi
+ _API = "SSPI"
+
+ c_token = None
+ gss_ctxt_status = False
+ mic_msg = b"G'day Mate!"
+
+ if _API == "MIT":
+ if server_mode:
+ gss_flags = (gssapi.C_PROT_READY_FLAG,
+ gssapi.C_INTEG_FLAG,
+ gssapi.C_MUTUAL_FLAG,
+ gssapi.C_DELEG_FLAG)
+ else:
+ gss_flags = (gssapi.C_PROT_READY_FLAG,
+ gssapi.C_INTEG_FLAG,
+ gssapi.C_DELEG_FLAG)
+ """
+ Initialize a GSS-API context.
+ """
+ ctx = gssapi.Context()
+ ctx.flags = gss_flags
+ krb5_oid = gssapi.OID.mech_from_string(krb5_mech)
+ target_name = gssapi.Name("host@" + targ_name,
+ gssapi.C_NT_HOSTBASED_SERVICE)
+ gss_ctxt = gssapi.InitContext(peer_name=target_name,
+ mech_type=krb5_oid,
+ req_flags=ctx.flags)
+ if server_mode:
+ c_token = gss_ctxt.step(c_token)
+ gss_ctxt_status = gss_ctxt.established
+ self.assertEquals(False, gss_ctxt_status)
+ """
+ Accept a GSS-API context.
+ """
+ gss_srv_ctxt = gssapi.AcceptContext()
+ s_token = gss_srv_ctxt.step(c_token)
+ gss_ctxt_status = gss_srv_ctxt.established
+ self.assertNotEquals(None, s_token)
+ self.assertEquals(True, gss_ctxt_status)
+ """
+ Establish the client context
+ """
+ c_token = gss_ctxt.step(s_token)
+ self.assertEquals(None, c_token)
+ else:
+ while not gss_ctxt.established:
+ c_token = gss_ctxt.step(c_token)
+ self.assertNotEquals(None, c_token)
+ """
+ Build MIC
+ """
+ mic_token = gss_ctxt.get_mic(mic_msg)
+
+ if server_mode:
+ """
+ Check MIC
+ """
+ status = gss_srv_ctxt.verify_mic(mic_msg, mic_token)
+ self.assertEquals(0, status)
+ else:
+ gss_flags = sspicon.ISC_REQ_INTEGRITY |\
+ sspicon.ISC_REQ_MUTUAL_AUTH |\
+ sspicon.ISC_REQ_DELEGATE
+ """
+ Initialize a GSS-API context.
+ """
+ target_name = "host/" + socket.getfqdn(targ_name)
+ gss_ctxt = sspi.ClientAuth("Kerberos",
+ scflags=gss_flags,
+ targetspn=target_name)
+ if server_mode:
+ error, token = gss_ctxt.authorize(c_token)
+ c_token = token[0].Buffer
+ self.assertEquals(0, error)
+ """
+ Accept a GSS-API context.
+ """
+ gss_srv_ctxt = sspi.ServerAuth("Kerberos", spn=target_name)
+ error, token = gss_srv_ctxt.authorize(c_token)
+ s_token = token[0].Buffer
+ """
+ Establish the context.
+ """
+ error, token = gss_ctxt.authorize(s_token)
+ c_token = token[0].Buffer
+ self.assertEquals(None, c_token)
+ self.assertEquals(0, error)
+ """
+ Build MIC
+ """
+ mic_token = gss_ctxt.sign(mic_msg)
+ """
+ Check MIC
+ """
+ gss_srv_ctxt.verify(mic_msg, mic_token)
+ else:
+ error, token = gss_ctxt.authorize(c_token)
+ c_token = token[0].Buffer
+ self.assertNotEquals(0, error)
diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py
new file mode 100644
index 00000000..b5e277b3
--- /dev/null
+++ b/tests/test_kex_gss.py
@@ -0,0 +1,133 @@
+# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com>
+# Copyright (C) 2013-2014 science + computing ag
+# Author: Sebastian Deiss <sebastian.deiss@t-online.de>
+#
+#
+# This file is part of paramiko.
+#
+# Paramiko is free software; you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+
+"""
+Unit Tests for the GSS-API / SSPI SSHv2 Diffie-Hellman Key Exchange and user
+authentication
+"""
+
+
+import socket
+import threading
+import unittest
+
+import paramiko
+
+
+class NullServer (paramiko.ServerInterface):
+
+ def get_allowed_auths(self, username):
+ return 'gssapi-keyex'
+
+ def check_auth_gssapi_keyex(self, username,
+ gss_authenticated=paramiko.AUTH_FAILED,
+ cc_file=None):
+ if gss_authenticated == paramiko.AUTH_SUCCESSFUL:
+ return paramiko.AUTH_SUCCESSFUL
+ return paramiko.AUTH_FAILED
+
+ def enable_auth_gssapi(self):
+ UseGSSAPI = True
+ return UseGSSAPI
+
+ def check_channel_request(self, kind, chanid):
+ return paramiko.OPEN_SUCCEEDED
+
+ def check_channel_exec_request(self, channel, command):
+ if command != 'yes':
+ return False
+ return True
+
+
+class GSSKexTest(unittest.TestCase):
+
+ def init(username, hostname):
+ global krb5_principal, targ_name
+ krb5_principal = username
+ targ_name = hostname
+
+ init = staticmethod(init)
+
+ def setUp(self):
+ self.username = krb5_principal
+ self.hostname = socket.getfqdn(targ_name)
+ self.sockl = socket.socket()
+ self.sockl.bind((targ_name, 0))
+ self.sockl.listen(1)
+ self.addr, self.port = self.sockl.getsockname()
+ self.event = threading.Event()
+ thread = threading.Thread(target=self._run)
+ thread.start()
+
+ def tearDown(self):
+ for attr in "tc ts socks sockl".split():
+ if hasattr(self, attr):
+ getattr(self, attr).close()
+
+ def _run(self):
+ self.socks, addr = self.sockl.accept()
+ self.ts = paramiko.Transport(self.socks, True)
+ host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
+ self.ts.add_server_key(host_key)
+ self.ts.set_gss_host(targ_name)
+ try:
+ self.ts.load_server_moduli()
+ except:
+ print ('(Failed to load moduli -- gex will be unsupported.)')
+ server = NullServer()
+ self.ts.start_server(self.event, server)
+
+ def test_1_gsskex_and_auth(self):
+ """
+ Verify that Paramiko can handle SSHv2 GSS-API / SSPI authenticated
+ Diffie-Hellman Key Exchange and user authentication with the GSS-API
+ context created during key exchange.
+ """
+ host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
+ public_host_key = paramiko.RSAKey(data=host_key.asbytes())
+
+ self.tc = paramiko.SSHClient()
+ self.tc.get_host_keys().add('[%s]:%d' % (self.hostname, self.port),
+ 'ssh-rsa', public_host_key)
+ self.tc.connect(self.hostname, self.port, username=self.username,
+ gss_auth=True, gss_kex=True)
+
+ self.event.wait(1.0)
+ self.assert_(self.event.isSet())
+ self.assert_(self.ts.is_active())
+ self.assertEquals(self.username, self.ts.get_username())
+ self.assertEquals(True, self.ts.is_authenticated())
+
+ stdin, stdout, stderr = self.tc.exec_command('yes')
+ schan = self.ts.accept(1.0)
+
+ schan.send('Hello there.\n')
+ schan.send_stderr('This is on stderr.\n')
+ schan.close()
+
+ self.assertEquals('Hello there.\n', stdout.readline())
+ self.assertEquals('', stdout.readline())
+ self.assertEquals('This is on stderr.\n', stderr.readline())
+ self.assertEquals('', stderr.readline())
+
+ stdin.close()
+ stdout.close()
+ stderr.close()
diff --git a/tests/test_packetizer.py b/tests/test_packetizer.py
index a8c0f973..8faec03c 100644
--- a/tests/test_packetizer.py
+++ b/tests/test_packetizer.py
@@ -74,3 +74,49 @@ class PacketizerTest (unittest.TestCase):
self.assertEqual(100, m.get_int())
self.assertEqual(1, m.get_int())
self.assertEqual(900, m.get_int())
+
+ def test_3_closed(self):
+ rsock = LoopSocket()
+ wsock = LoopSocket()
+ rsock.link(wsock)
+ p = Packetizer(wsock)
+ p.set_log(util.get_logger('paramiko.transport'))
+ p.set_hexdump(True)
+ cipher = AES.new(zero_byte * 16, AES.MODE_CBC, x55 * 16)
+ p.set_outbound_cipher(cipher, 16, sha1, 12, x1f * 20)
+
+ # message has to be at least 16 bytes long, so we'll have at least one
+ # block of data encrypted that contains zero random padding bytes
+ m = Message()
+ m.add_byte(byte_chr(100))
+ m.add_int(100)
+ m.add_int(1)
+ m.add_int(900)
+ wsock.send = lambda x: 0
+ from functools import wraps
+ import errno
+ import os
+ import signal
+
+ class TimeoutError(Exception):
+ pass
+
+ def timeout(seconds=1, error_message=os.strerror(errno.ETIME)):
+ def decorator(func):
+ def _handle_timeout(signum, frame):
+ raise TimeoutError(error_message)
+
+ def wrapper(*args, **kwargs):
+ signal.signal(signal.SIGALRM, _handle_timeout)
+ signal.alarm(seconds)
+ try:
+ result = func(*args, **kwargs)
+ finally:
+ signal.alarm(0)
+ return result
+
+ return wraps(func)(wrapper)
+
+ return decorator
+ send = timeout()(p.send_message)
+ self.assertRaises(EOFError, send, m)
diff --git a/tests/test_pkey.py b/tests/test_pkey.py
index 1468ee27..f673254f 100644
--- a/tests/test_pkey.py
+++ b/tests/test_pkey.py
@@ -21,6 +21,7 @@ Some unit tests for public/private key objects.
"""
import unittest
+import os
from binascii import hexlify
from hashlib import md5
@@ -253,3 +254,20 @@ class KeyTest (unittest.TestCase):
msg.rewind()
pub = ECDSAKey(data=key.asbytes())
self.assertTrue(pub.verify_ssh_sig(b'ice weasels', msg))
+
+ def test_salt_size(self):
+ # Read an existing encrypted private key
+ file_ = test_path('test_rsa_password.key')
+ password = 'television'
+ newfile = file_ + '.new'
+ newpassword = 'radio'
+ key = RSAKey(filename=file_, password=password)
+ # Write out a newly re-encrypted copy with a new password.
+ # When the bug under test exists, this will ValueError.
+ try:
+ key.write_private_key_file(newfile, password=newpassword)
+ # Verify the inner key data still matches (when no ValueError)
+ key2 = RSAKey(filename=newfile, password=newpassword)
+ self.assertEqual(key, key2)
+ finally:
+ os.remove(newfile)
diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py
new file mode 100644
index 00000000..595081b8
--- /dev/null
+++ b/tests/test_ssh_gss.py
@@ -0,0 +1,126 @@
+# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com>
+# Copyright (C) 2013-2014 science + computing ag
+# Author: Sebastian Deiss <sebastian.deiss@t-online.de>
+#
+#
+# This file is part of paramiko.
+#
+# Paramiko is free software; you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+
+"""
+Unit Tests for the GSS-API / SSPI SSHv2 Authentication (gssapi-with-mic)
+"""
+
+import socket
+import threading
+import unittest
+
+import paramiko
+
+
+class NullServer (paramiko.ServerInterface):
+
+ def get_allowed_auths(self, username):
+ return 'gssapi-with-mic'
+
+ def check_auth_gssapi_with_mic(self, username,
+ gss_authenticated=paramiko.AUTH_FAILED,
+ cc_file=None):
+ if gss_authenticated == paramiko.AUTH_SUCCESSFUL:
+ return paramiko.AUTH_SUCCESSFUL
+ return paramiko.AUTH_FAILED
+
+ def enable_auth_gssapi(self):
+ UseGSSAPI = True
+ GSSAPICleanupCredentials = True
+ return UseGSSAPI
+
+ def check_channel_request(self, kind, chanid):
+ return paramiko.OPEN_SUCCEEDED
+
+ def check_channel_exec_request(self, channel, command):
+ if command != 'yes':
+ return False
+ return True
+
+
+class GSSAuthTest(unittest.TestCase):
+
+ def init(username, hostname):
+ global krb5_principal, targ_name
+ krb5_principal = username
+ targ_name = hostname
+
+ init = staticmethod(init)
+
+ def setUp(self):
+ self.username = krb5_principal
+ self.hostname = socket.getfqdn(targ_name)
+ self.sockl = socket.socket()
+ self.sockl.bind((targ_name, 0))
+ self.sockl.listen(1)
+ self.addr, self.port = self.sockl.getsockname()
+ self.event = threading.Event()
+ thread = threading.Thread(target=self._run)
+ thread.start()
+
+ def tearDown(self):
+ for attr in "tc ts socks sockl".split():
+ if hasattr(self, attr):
+ getattr(self, attr).close()
+
+ def _run(self):
+ self.socks, addr = self.sockl.accept()
+ self.ts = paramiko.Transport(self.socks)
+ host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
+ self.ts.add_server_key(host_key)
+ server = NullServer()
+ self.ts.start_server(self.event, server)
+
+ def test_1_gss_auth(self):
+ """
+ Verify that Paramiko can handle SSHv2 GSS-API / SSPI authentication
+ (gssapi-with-mic) in client and server mode.
+ """
+ host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
+ public_host_key = paramiko.RSAKey(data=host_key.asbytes())
+
+ self.tc = paramiko.SSHClient()
+ self.tc.get_host_keys().add('[%s]:%d' % (self.hostname, self.port),
+ 'ssh-rsa', public_host_key)
+ self.tc.connect(self.hostname, self.port, username=self.username,
+ gss_auth=True)
+
+ self.event.wait(1.0)
+ self.assert_(self.event.isSet())
+ self.assert_(self.ts.is_active())
+ self.assertEquals(self.username, self.ts.get_username())
+ self.assertEquals(True, self.ts.is_authenticated())
+
+ stdin, stdout, stderr = self.tc.exec_command('yes')
+ schan = self.ts.accept(1.0)
+
+ schan.send('Hello there.\n')
+ schan.send_stderr('This is on stderr.\n')
+ schan.close()
+
+ self.assertEquals('Hello there.\n', stdout.readline())
+ self.assertEquals('', stdout.readline())
+ self.assertEquals('This is on stderr.\n', stderr.readline())
+ self.assertEquals('', stderr.readline())
+
+ stdin.close()
+ stdout.close()
+ stderr.close()
diff --git a/tests/test_transport.py b/tests/test_transport.py
index ae0f5e01..50b1d86b 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -28,16 +28,19 @@ import socket
import time
import threading
import random
+import unittest
from paramiko import Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, \
SSHException, ChannelException
from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL
from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
-from paramiko.common import MSG_KEXINIT, cMSG_CHANNEL_WINDOW_ADJUST
+from paramiko.common import MSG_KEXINIT, cMSG_CHANNEL_WINDOW_ADJUST, \
+ MIN_PACKET_SIZE, MAX_WINDOW_SIZE, \
+ DEFAULT_WINDOW_SIZE, DEFAULT_MAX_PACKET_SIZE
from paramiko.py3compat import bytes
from paramiko.message import Message
from tests.loop import LoopSocket
-from tests.util import ParamikoTest, test_path
+from tests.util import test_path
LONG_BANNER = """\
@@ -57,7 +60,7 @@ class NullServer (ServerInterface):
paranoid_did_password = False
paranoid_did_public_key = False
paranoid_key = DSSKey.from_private_key_file(test_path('test_dss.key'))
-
+
def get_allowed_auths(self, username):
if username == 'slowdive':
return 'publickey,password'
@@ -80,24 +83,24 @@ class NullServer (ServerInterface):
def check_channel_shell_request(self, channel):
return True
-
+
def check_global_request(self, kind, msg):
self._global_request = kind
return False
-
+
def check_channel_x11_request(self, channel, single_connection, auth_protocol, auth_cookie, screen_number):
self._x11_single_connection = single_connection
self._x11_auth_protocol = auth_protocol
self._x11_auth_cookie = auth_cookie
self._x11_screen_number = screen_number
return True
-
+
def check_port_forward_request(self, addr, port):
self._listen = socket.socket()
self._listen.bind(('127.0.0.1', 0))
self._listen.listen(1)
return self._listen.getsockname()[1]
-
+
def cancel_port_forward_request(self, addr, port):
self._listen.close()
self._listen = None
@@ -107,7 +110,7 @@ class NullServer (ServerInterface):
return OPEN_SUCCEEDED
-class TransportTest(ParamikoTest):
+class TransportTest(unittest.TestCase):
def setUp(self):
self.socks = LoopSocket()
self.sockc = LoopSocket()
@@ -125,12 +128,12 @@ class TransportTest(ParamikoTest):
host_key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
-
+
if client_options is not None:
client_options(self.tc.get_security_options())
if server_options is not None:
server_options(self.ts.get_security_options())
-
+
event = threading.Event()
self.server = NullServer()
self.assertTrue(not event.isSet())
@@ -157,7 +160,7 @@ class TransportTest(ParamikoTest):
self.assertTrue(False)
except TypeError:
pass
-
+
def test_2_compute_key(self):
self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929
self.tc.H = b'\x0C\x83\x07\xCD\xE6\x85\x6F\xF3\x0B\xA9\x36\x84\xEB\x0F\x04\xC2\x52\x0E\x9E\xD3'
@@ -210,7 +213,7 @@ class TransportTest(ParamikoTest):
event.wait(1.0)
self.assertTrue(event.isSet())
self.assertTrue(self.ts.is_active())
-
+
def test_4_special(self):
"""
verify that the client can demand odd handshake settings, and can
@@ -224,7 +227,7 @@ class TransportTest(ParamikoTest):
self.assertEqual('aes256-cbc', self.tc.remote_cipher)
self.assertEqual(12, self.tc.packetizer.get_mac_size_out())
self.assertEqual(12, self.tc.packetizer.get_mac_size_in())
-
+
self.tc.send_ignore(1024)
self.tc.renegotiate_keys()
self.ts.send_ignore(1024)
@@ -238,7 +241,7 @@ class TransportTest(ParamikoTest):
self.tc.set_keepalive(1)
time.sleep(2)
self.assertEqual('keepalive@lag.net', self.server._global_request)
-
+
def test_6_exec_command(self):
"""
verify that exec_command() does something reasonable.
@@ -252,7 +255,7 @@ class TransportTest(ParamikoTest):
self.assertTrue(False)
except SSHException:
pass
-
+
chan = self.tc.open_session()
chan.exec_command('yes')
schan = self.ts.accept(1.0)
@@ -266,7 +269,7 @@ class TransportTest(ParamikoTest):
f = chan.makefile_stderr()
self.assertEqual('This is on stderr.\n', f.readline())
self.assertEqual('', f.readline())
-
+
# now try it with combined stdout/stderr
chan = self.tc.open_session()
chan.exec_command('yes')
@@ -275,7 +278,7 @@ class TransportTest(ParamikoTest):
schan.send_stderr('This is on stderr.\n')
schan.close()
- chan.set_combine_stderr(True)
+ chan.set_combine_stderr(True)
f = chan.makefile()
self.assertEqual('Hello there.\n', f.readline())
self.assertEqual('This is on stderr.\n', f.readline())
@@ -338,7 +341,7 @@ class TransportTest(ParamikoTest):
schan.shutdown_write()
schan.send_exit_status(23)
schan.close()
-
+
f = chan.makefile()
self.assertEqual('Hello there.\n', f.readline())
self.assertEqual('', f.readline())
@@ -360,14 +363,14 @@ class TransportTest(ParamikoTest):
chan.invoke_shell()
schan = self.ts.accept(1.0)
- # nothing should be ready
+ # nothing should be ready
r, w, e = select.select([chan], [], [], 0.1)
self.assertEqual([], r)
self.assertEqual([], w)
self.assertEqual([], e)
-
+
schan.send('hello\n')
-
+
# something should be ready now (give it 1 second to appear)
for i in range(10):
r, w, e = select.select([chan], [], [], 0.1)
@@ -379,7 +382,7 @@ class TransportTest(ParamikoTest):
self.assertEqual([], e)
self.assertEqual(b'hello\n', chan.recv(6))
-
+
# and, should be dead again now
r, w, e = select.select([chan], [], [], 0.1)
self.assertEqual([], r)
@@ -387,7 +390,7 @@ class TransportTest(ParamikoTest):
self.assertEqual([], e)
schan.close()
-
+
# detect eof?
for i in range(10):
r, w, e = select.select([chan], [], [], 0.1)
@@ -398,14 +401,14 @@ class TransportTest(ParamikoTest):
self.assertEqual([], w)
self.assertEqual([], e)
self.assertEqual(bytes(), chan.recv(16))
-
+
# make sure the pipe is still open for now...
p = chan._pipe
self.assertEqual(False, p._closed)
chan.close()
# ...and now is closed.
self.assertEqual(True, p._closed)
-
+
def test_B_renegotiate(self):
"""
verify that a transport can correctly renegotiate mid-stream.
@@ -420,7 +423,7 @@ class TransportTest(ParamikoTest):
for i in range(20):
chan.send('x' * 1024)
chan.close()
-
+
# allow a few seconds for the rekeying to complete
for i in range(50):
if self.tc.H != self.tc.session_id:
@@ -459,28 +462,28 @@ class TransportTest(ParamikoTest):
chan = self.tc.open_session()
chan.exec_command('yes')
schan = self.ts.accept(1.0)
-
+
requested = []
def handler(c, addr_port):
addr, port = addr_port
requested.append((addr, port))
self.tc._queue_incoming_channel(c)
-
+
self.assertEqual(None, getattr(self.server, '_x11_screen_number', None))
cookie = chan.request_x11(0, single_connection=True, handler=handler)
self.assertEqual(0, self.server._x11_screen_number)
self.assertEqual('MIT-MAGIC-COOKIE-1', self.server._x11_auth_protocol)
self.assertEqual(cookie, self.server._x11_auth_cookie)
self.assertEqual(True, self.server._x11_single_connection)
-
+
x11_server = self.ts.open_x11_channel(('localhost', 6093))
x11_client = self.tc.accept()
self.assertEqual('localhost', requested[0][0])
self.assertEqual(6093, requested[0][1])
-
+
x11_server.send('hello')
self.assertEqual(b'hello', x11_client.recv(5))
-
+
x11_server.close()
x11_client.close()
chan.close()
@@ -495,13 +498,13 @@ class TransportTest(ParamikoTest):
chan = self.tc.open_session()
chan.exec_command('yes')
schan = self.ts.accept(1.0)
-
+
requested = []
def handler(c, origin_addr_port, server_addr_port):
requested.append(origin_addr_port)
requested.append(server_addr_port)
self.tc._queue_incoming_channel(c)
-
+
port = self.tc.request_port_forward('127.0.0.1', 0, handler)
self.assertEqual(port, self.server._listen.getsockname()[1])
@@ -510,14 +513,14 @@ class TransportTest(ParamikoTest):
ss, _ = self.server._listen.accept()
sch = self.ts.open_forwarded_tcpip_channel(ss.getsockname(), ss.getpeername())
cch = self.tc.accept()
-
+
sch.send('hello')
self.assertEqual(b'hello', cch.recv(5))
sch.close()
cch.close()
ss.close()
cs.close()
-
+
# now cancel it.
self.tc.cancel_port_forward('127.0.0.1', port)
self.assertTrue(self.server._listen is None)
@@ -531,7 +534,7 @@ class TransportTest(ParamikoTest):
chan = self.tc.open_session()
chan.exec_command('yes')
schan = self.ts.accept(1.0)
-
+
# open a port on the "server" that the client will ask to forward to.
greeting_server = socket.socket()
greeting_server.bind(('127.0.0.1', 0))
@@ -542,13 +545,13 @@ class TransportTest(ParamikoTest):
sch = self.ts.accept(1.0)
cch = socket.socket()
cch.connect(self.server._tcpip_dest)
-
+
ss, _ = greeting_server.accept()
ss.send(b'Hello!\n')
ss.close()
sch.send(cch.recv(8192))
sch.close()
-
+
self.assertEqual(b'Hello!\n', cs.recv(7))
cs.close()
@@ -562,14 +565,14 @@ class TransportTest(ParamikoTest):
chan.invoke_shell()
schan = self.ts.accept(1.0)
- # nothing should be ready
+ # nothing should be ready
r, w, e = select.select([chan], [], [], 0.1)
self.assertEqual([], r)
self.assertEqual([], w)
self.assertEqual([], e)
-
+
schan.send_stderr('hello\n')
-
+
# something should be ready now (give it 1 second to appear)
for i in range(10):
r, w, e = select.select([chan], [], [], 0.1)
@@ -581,7 +584,7 @@ class TransportTest(ParamikoTest):
self.assertEqual([], e)
self.assertEqual(b'hello\n', chan.recv_stderr(6))
-
+
# and, should be dead again now
r, w, e = select.select([chan], [], [], 0.1)
self.assertEqual([], r)
@@ -603,12 +606,13 @@ class TransportTest(ParamikoTest):
self.assertEqual(chan.send_ready(), True)
total = 0
K = '*' * 1024
- while total < 1024 * 1024:
+ limit = 1+(64 * 2 ** 15)
+ while total < limit:
chan.send(K)
total += len(K)
if not chan.send_ready():
break
- self.assertTrue(total < 1024 * 1024)
+ self.assertTrue(total < limit)
schan.close()
chan.close()
@@ -617,10 +621,10 @@ class TransportTest(ParamikoTest):
def test_I_rekey_deadlock(self):
"""
Regression test for deadlock when in-transit messages are received after MSG_KEXINIT is sent
-
+
Note: When this test fails, it may leak threads.
"""
-
+
# Test for an obscure deadlocking bug that can occur if we receive
# certain messages while initiating a key exchange.
#
@@ -637,7 +641,7 @@ class TransportTest(ParamikoTest):
# NeedRekeyException.
# 4. In response to NeedRekeyException, the transport thread sends
# MSG_KEXINIT to the remote host.
- #
+ #
# On the remote host (using any SSH implementation):
# 5. The MSG_CHANNEL_DATA is received, and MSG_CHANNEL_WINDOW_ADJUST is sent.
# 6. The MSG_KEXINIT is received, and a corresponding MSG_KEXINIT is sent.
@@ -672,7 +676,7 @@ class TransportTest(ParamikoTest):
self.done_event = done_event
self.watchdog_event = threading.Event()
self.last = None
-
+
def run(self):
try:
for i in range(1, 1+self.iterations):
@@ -684,7 +688,7 @@ class TransportTest(ParamikoTest):
finally:
self.done_event.set()
self.watchdog_event.set()
-
+
class ReceiveThread(threading.Thread):
def __init__(self, chan, done_event):
threading.Thread.__init__(self, None, None, self.__class__.__name__)
@@ -692,7 +696,7 @@ class TransportTest(ParamikoTest):
self.chan = chan
self.done_event = done_event
self.watchdog_event = threading.Event()
-
+
def run(self):
try:
while not self.done_event.isSet():
@@ -705,10 +709,10 @@ class TransportTest(ParamikoTest):
finally:
self.done_event.set()
self.watchdog_event.set()
-
+
self.setup_test_server()
self.ts.packetizer.REKEY_BYTES = 2048
-
+
chan = self.tc.open_session()
chan.exec_command('yes')
schan = self.ts.accept(1.0)
@@ -730,7 +734,7 @@ class TransportTest(ParamikoTest):
self._send_message(m2)
return _negotiate_keys(self, m)
self.tc._handler_table[MSG_KEXINIT] = _negotiate_keys_wrapper
-
+
# Parameters for the test
iterations = 500 # The deadlock does not happen every time, but it
# should after many iterations.
@@ -742,12 +746,12 @@ class TransportTest(ParamikoTest):
# Start the sending thread
st = SendThread(schan, iterations, done_event)
st.start()
-
+
# Start the receiving thread
rt = ReceiveThread(chan, done_event)
rt.start()
- # Act as a watchdog timer, checking
+ # Act as a watchdog timer, checking
deadlocked = False
while not deadlocked and not done_event.isSet():
for event in (st.watchdog_event, rt.watchdog_event):
@@ -758,7 +762,7 @@ class TransportTest(ParamikoTest):
deadlocked = True
break
event.clear()
-
+
# Tell the threads to stop (if they haven't already stopped). Note
# that if one or more threads are deadlocked, they might hang around
# forever (until the process exits).
@@ -770,3 +774,21 @@ class TransportTest(ParamikoTest):
# Close the channels
schan.close()
chan.close()
+
+ def test_J_sanitze_packet_size(self):
+ """
+ verify that we conform to the rfc of packet and window sizes.
+ """
+ for val, correct in [(32767, MIN_PACKET_SIZE),
+ (None, DEFAULT_MAX_PACKET_SIZE),
+ (2**32, MAX_WINDOW_SIZE)]:
+ self.assertEqual(self.tc._sanitize_packet_size(val), correct)
+
+ def test_K_sanitze_window_size(self):
+ """
+ verify that we conform to the rfc of packet and window sizes.
+ """
+ for val, correct in [(32767, MIN_PACKET_SIZE),
+ (None, DEFAULT_WINDOW_SIZE),
+ (2**32, MAX_WINDOW_SIZE)]:
+ self.assertEqual(self.tc._sanitize_window_size(val), correct)
diff --git a/tests/test_util.py b/tests/test_util.py
index 394ea553..35e15765 100644
--- a/tests/test_util.py
+++ b/tests/test_util.py
@@ -24,13 +24,12 @@ from binascii import hexlify
import errno
import os
from hashlib import sha1
+import unittest
import paramiko.util
from paramiko.util import lookup_ssh_host_config as host_config
from paramiko.py3compat import StringIO, byte_ord
-from tests.util import ParamikoTest
-
test_config_file = """\
Host *
User robey
@@ -41,7 +40,12 @@ Host *.example.com
\tUser bjork
Port=3333
Host *
- \t \t Crazy something dumb
+"""
+
+dont_strip_whitespace_please = "\t \t Crazy something dumb "
+
+test_config_file += dont_strip_whitespace_please
+test_config_file += """
Host spoo.example.com
Crazy something else
"""
@@ -60,7 +64,7 @@ BGQ3GQ/Fc7SX6gkpXkwcZryoi4kNFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNAl8TI0cAsW\
from paramiko import *
-class UtilTest(ParamikoTest):
+class UtilTest(unittest.TestCase):
def test_1_import(self):
"""
verify that all the classes can be imported from paramiko.
@@ -334,6 +338,11 @@ IdentityFile something_%l_using_fqdn
config = paramiko.util.parse_ssh_config(StringIO(test_config))
assert config.lookup('meh') # will die during lookup() if bug regresses
+ def test_clamp_value(self):
+ self.assertEqual(32768, paramiko.util.clamp_value(32767, 32768, 32769))
+ self.assertEqual(32767, paramiko.util.clamp_value(32767, 32765, 32769))
+ self.assertEqual(32769, paramiko.util.clamp_value(32767, 32770, 32769))
+
def test_13_config_dos_crlf_succeeds(self):
config_file = StringIO("host abcqwerty\r\nHostName 127.0.0.1\r\n")
config = paramiko.SSHConfig()
diff --git a/tests/util.py b/tests/util.py
index 66d2696c..b546a7e1 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -1,17 +1,7 @@
import os
-import unittest
root_path = os.path.dirname(os.path.realpath(__file__))
-
-class ParamikoTest(unittest.TestCase):
- # for Python 2.3 and below
- if not hasattr(unittest.TestCase, 'assertTrue'):
- assertTrue = unittest.TestCase.failUnless
- if not hasattr(unittest.TestCase, 'assertFalse'):
- assertFalse = unittest.TestCase.failIf
-
-
def test_path(filename):
return os.path.join(root_path, filename)