diff options
author | Jeff Forcier <jeff@bitprophet.org> | 2017-11-22 12:31:07 -0800 |
---|---|---|
committer | Jeff Forcier <jeff@bitprophet.org> | 2017-12-31 16:10:47 -0500 |
commit | 057a4cfa8a0bc9518cf4d2bff0b4798dff6be1a0 (patch) | |
tree | 06faad48c75743b659756a806a19989270a65175 | |
parent | 8df20798dda78d8ab1e500c26a7f39b802cec4c4 (diff) | |
download | paramiko-057a4cfa8a0bc9518cf4d2bff0b4798dff6be1a0.tar.gz |
Refactor some presumably earlier-copypasta'd test server classes
-rw-r--r-- | tests/_util.py | 138 | ||||
-rw-r--r-- | tests/test_auth.py | 89 | ||||
-rw-r--r-- | tests/test_client.py | 66 |
3 files changed, 142 insertions, 151 deletions
diff --git a/tests/_util.py b/tests/_util.py index 4ca02374..d69f855a 100644 --- a/tests/_util.py +++ b/tests/_util.py @@ -1,8 +1,18 @@ from os.path import dirname, realpath, join +from time import sleep import pytest -from paramiko.py3compat import builtins +from paramiko import ( + AUTH_FAILED, + AUTH_PARTIALLY_SUCCESSFUL, + AUTH_SUCCESSFUL, + OPEN_SUCCEEDED, + DSSKey, + InteractiveQuery, + ServerInterface, +) +from paramiko.py3compat import builtins, u def _support(filename): @@ -25,3 +35,129 @@ def needs_builtin(name): slow = pytest.mark.slow + + +_pwd = u('\u2022') + + +FINGERPRINTS = { + 'ssh-dss': b'\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c', # noqa + 'ssh-rsa': b'\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5', # noqa + 'ecdsa-sha2-nistp256': b'\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60', # noqa + 'ssh-ed25519': b'\xb3\xd5"\xaa\xf9u^\xe8\xcd\x0e\xea\x02\xb9)\xa2\x80', +} + + +class NullServer (ServerInterface): + paranoid_did_password = False + paranoid_did_public_key = False + paranoid_key = DSSKey.from_private_key_file(_support('test_dss.key')) + + def __init__(self, *args, **kwargs): + # Allow tests to enable/disable specific key types + self.__allowed_keys = kwargs.pop('allowed_keys', []) + # And allow them to set a (single...meh) expected public blob (cert) + self.__expected_public_blob = kwargs.pop('public_blob', None) + super(NullServer, self).__init__(*args, **kwargs) + + def get_allowed_auths(self, username): + if username == 'slowdive': + return 'publickey,password' + if username == 'paranoid': + if ( + not self.paranoid_did_password and + not self.paranoid_did_public_key + ): + return 'publickey,password' + elif self.paranoid_did_password: + return 'publickey' + else: + return 'password' + if username == 'commie': + return 'keyboard-interactive' + if username == 'utf8': + return 'password' + if username == 'non-utf8': + return 'password' + return 'publickey' + + def check_auth_password(self, username, password): + if (username == 'slowdive') and (password == 'pygmalion'): + return AUTH_SUCCESSFUL + if (username == 'paranoid') and (password == 'paranoid'): + # 2-part auth (even openssh doesn't support this) + self.paranoid_did_password = True + if self.paranoid_did_public_key: + return AUTH_SUCCESSFUL + return AUTH_PARTIALLY_SUCCESSFUL + if (username == 'utf8') and (password == _pwd): + return AUTH_SUCCESSFUL + if (username == 'non-utf8') and (password == '\xff'): + return AUTH_SUCCESSFUL + if username == 'bad-server': + raise Exception("Ack!") + if (username == 'slowdive') and (password == 'unresponsive-server'): + sleep(5) + return AUTH_SUCCESSFUL + return AUTH_FAILED + + def check_auth_publickey(self, username, key): + # NOTE: this is for an existing multipart auth test. + if (username == 'paranoid') and (key == self.paranoid_key): + # 2-part auth + self.paranoid_did_public_key = True + if self.paranoid_did_password: + return AUTH_SUCCESSFUL + return AUTH_PARTIALLY_SUCCESSFUL + # NOTE: these bits below are mostly used by client tests or + # straightforward key tests. + try: + expected = FINGERPRINTS[key.get_name()] + except KeyError: + return AUTH_FAILED + # Base check: allowed auth type & fingerprint matches + happy = ( + key.get_name() in self.__allowed_keys and + key.get_fingerprint() == expected + ) + # Secondary check: if test wants assertions about cert data + if ( + self.__expected_public_blob is not None and + key.public_blob != self.__expected_public_blob + ): + happy = False + return AUTH_SUCCESSFUL if happy else AUTH_FAILED + + def check_auth_interactive(self, username, submethods): + if username == 'commie': + self.username = username + return InteractiveQuery( + 'password', + 'Please enter a password.', + ('Password', False), + ) + return AUTH_FAILED + + def check_auth_interactive_response(self, responses): + if self.username == 'commie': + if (len(responses) == 1) and (responses[0] == 'cat'): + return AUTH_SUCCESSFUL + return AUTH_FAILED + + def check_channel_request(self, kind, chanid): + return OPEN_SUCCEEDED + + def check_channel_exec_request(self, channel, command): + if command != b'yes': + return False + return True + + def check_channel_env_request(self, channel, name, value): + if name == 'INVALID_ENV': + return False + + if not hasattr(channel, 'env'): + setattr(channel, 'env', {}) + + channel.env[name] = value + return True diff --git a/tests/test_auth.py b/tests/test_auth.py index 0b90634b..74719ac7 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -23,95 +23,14 @@ Some unit tests for authenticating over a Transport. import sys import threading import unittest -from time import sleep from paramiko import ( - Transport, ServerInterface, RSAKey, DSSKey, BadAuthenticationType, - InteractiveQuery, AuthenticationException, + Transport, RSAKey, DSSKey, BadAuthenticationType, + AuthenticationException, ) -from paramiko import AUTH_FAILED, AUTH_PARTIALLY_SUCCESSFUL, AUTH_SUCCESSFUL -from paramiko.py3compat import u from ._loop import LoopSocket -from ._util import _support, slow - - -# TODO: see what other tests in other modules might want to move in here - - -_pwd = u('\u2022') - - -class NullServer (ServerInterface): - paranoid_did_password = False - paranoid_did_public_key = False - paranoid_key = DSSKey.from_private_key_file(_support('test_dss.key')) - - def get_allowed_auths(self, username): - if username == 'slowdive': - return 'publickey,password' - if username == 'paranoid': - if ( - not self.paranoid_did_password and - not self.paranoid_did_public_key - ): - return 'publickey,password' - elif self.paranoid_did_password: - return 'publickey' - else: - return 'password' - if username == 'commie': - return 'keyboard-interactive' - if username == 'utf8': - return 'password' - if username == 'non-utf8': - return 'password' - return 'publickey' - - def check_auth_password(self, username, password): - if (username == 'slowdive') and (password == 'pygmalion'): - return AUTH_SUCCESSFUL - if (username == 'paranoid') and (password == 'paranoid'): - # 2-part auth (even openssh doesn't support this) - self.paranoid_did_password = True - if self.paranoid_did_public_key: - return AUTH_SUCCESSFUL - return AUTH_PARTIALLY_SUCCESSFUL - if (username == 'utf8') and (password == _pwd): - return AUTH_SUCCESSFUL - if (username == 'non-utf8') and (password == '\xff'): - return AUTH_SUCCESSFUL - if username == 'bad-server': - raise Exception("Ack!") - if username == 'unresponsive-server': - sleep(5) - return AUTH_SUCCESSFUL - return AUTH_FAILED - - def check_auth_publickey(self, username, key): - if (username == 'paranoid') and (key == self.paranoid_key): - # 2-part auth - self.paranoid_did_public_key = True - if self.paranoid_did_password: - return AUTH_SUCCESSFUL - return AUTH_PARTIALLY_SUCCESSFUL - return AUTH_FAILED - - def check_auth_interactive(self, username, submethods): - if username == 'commie': - self.username = username - return InteractiveQuery( - 'password', - 'Please enter a password.', - ('Password', False), - ) - return AUTH_FAILED - - def check_auth_interactive_response(self, responses): - if self.username == 'commie': - if (len(responses) == 1) and (responses[0] == 'cat'): - return AUTH_SUCCESSFUL - return AUTH_FAILED +from ._util import _support, slow, NullServer, _pwd class TestAuth(unittest.TestCase): @@ -188,7 +107,7 @@ class TestEdgeCaseFailures(TestAuth): self.start_server() self.tc.connect() try: - self.tc.auth_password('unresponsive-server', 'hello') + self.tc.auth_password('slowdive', 'unresponsive-server') except: etype, evalue, etb = sys.exc_info() self.assertTrue(issubclass(etype, AuthenticationException)) diff --git a/tests/test_client.py b/tests/test_client.py index 75da3b85..a86b37ca 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -40,77 +40,13 @@ from paramiko.pkey import PublicBlob from paramiko.common import PY2 from paramiko.ssh_exception import SSHException, AuthenticationException -from ._util import _support, slow +from ._util import _support, slow, NullServer, FINGERPRINTS requires_gss_auth = unittest.skipUnless( paramiko.GSS_AUTH_AVAILABLE, "GSS auth not available" ) -FINGERPRINTS = { - 'ssh-dss': b'\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c', - 'ssh-rsa': b'\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5', - 'ecdsa-sha2-nistp256': b'\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60', - 'ssh-ed25519': b'\xb3\xd5"\xaa\xf9u^\xe8\xcd\x0e\xea\x02\xb9)\xa2\x80', -} - - -class NullServer(paramiko.ServerInterface): - def __init__(self, *args, **kwargs): - # Allow tests to enable/disable specific key types - self.__allowed_keys = kwargs.pop('allowed_keys', []) - # And allow them to set a (single...meh) expected public blob (cert) - self.__expected_public_blob = kwargs.pop('public_blob', None) - super(NullServer, self).__init__(*args, **kwargs) - - def get_allowed_auths(self, username): - if username == 'slowdive': - return 'publickey,password' - return 'publickey' - - def check_auth_password(self, username, password): - if (username == 'slowdive') and (password == 'pygmalion'): - return paramiko.AUTH_SUCCESSFUL - if (username == 'slowdive') and (password == 'unresponsive-server'): - time.sleep(5) - return paramiko.AUTH_SUCCESSFUL - return paramiko.AUTH_FAILED - - def check_auth_publickey(self, username, key): - try: - expected = FINGERPRINTS[key.get_name()] - except KeyError: - return paramiko.AUTH_FAILED - # Base check: allowed auth type & fingerprint matches - happy = ( - key.get_name() in self.__allowed_keys and - key.get_fingerprint() == expected - ) - # Secondary check: if test wants assertions about cert data - if ( - self.__expected_public_blob is not None and - key.public_blob != self.__expected_public_blob - ): - happy = False - return paramiko.AUTH_SUCCESSFUL if happy else paramiko.AUTH_FAILED - - def check_channel_request(self, kind, chanid): - return paramiko.OPEN_SUCCEEDED - - def check_channel_exec_request(self, channel, command): - if command != b'yes': - return False - return True - - def check_channel_env_request(self, channel, name, value): - if name == 'INVALID_ENV': - return False - - if not hasattr(channel, 'env'): - setattr(channel, 'env', {}) - - channel.env[name] = value - return True class ClientTest(unittest.TestCase): |