summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeff Forcier <jeff@bitprophet.org>2017-11-22 12:31:07 -0800
committerJeff Forcier <jeff@bitprophet.org>2017-12-31 16:10:47 -0500
commit057a4cfa8a0bc9518cf4d2bff0b4798dff6be1a0 (patch)
tree06faad48c75743b659756a806a19989270a65175
parent8df20798dda78d8ab1e500c26a7f39b802cec4c4 (diff)
downloadparamiko-057a4cfa8a0bc9518cf4d2bff0b4798dff6be1a0.tar.gz
Refactor some presumably earlier-copypasta'd test server classes
-rw-r--r--tests/_util.py138
-rw-r--r--tests/test_auth.py89
-rw-r--r--tests/test_client.py66
3 files changed, 142 insertions, 151 deletions
diff --git a/tests/_util.py b/tests/_util.py
index 4ca02374..d69f855a 100644
--- a/tests/_util.py
+++ b/tests/_util.py
@@ -1,8 +1,18 @@
from os.path import dirname, realpath, join
+from time import sleep
import pytest
-from paramiko.py3compat import builtins
+from paramiko import (
+ AUTH_FAILED,
+ AUTH_PARTIALLY_SUCCESSFUL,
+ AUTH_SUCCESSFUL,
+ OPEN_SUCCEEDED,
+ DSSKey,
+ InteractiveQuery,
+ ServerInterface,
+)
+from paramiko.py3compat import builtins, u
def _support(filename):
@@ -25,3 +35,129 @@ def needs_builtin(name):
slow = pytest.mark.slow
+
+
+_pwd = u('\u2022')
+
+
+FINGERPRINTS = {
+ 'ssh-dss': b'\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c', # noqa
+ 'ssh-rsa': b'\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5', # noqa
+ 'ecdsa-sha2-nistp256': b'\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60', # noqa
+ 'ssh-ed25519': b'\xb3\xd5"\xaa\xf9u^\xe8\xcd\x0e\xea\x02\xb9)\xa2\x80',
+}
+
+
+class NullServer (ServerInterface):
+ paranoid_did_password = False
+ paranoid_did_public_key = False
+ paranoid_key = DSSKey.from_private_key_file(_support('test_dss.key'))
+
+ def __init__(self, *args, **kwargs):
+ # Allow tests to enable/disable specific key types
+ self.__allowed_keys = kwargs.pop('allowed_keys', [])
+ # And allow them to set a (single...meh) expected public blob (cert)
+ self.__expected_public_blob = kwargs.pop('public_blob', None)
+ super(NullServer, self).__init__(*args, **kwargs)
+
+ def get_allowed_auths(self, username):
+ if username == 'slowdive':
+ return 'publickey,password'
+ if username == 'paranoid':
+ if (
+ not self.paranoid_did_password and
+ not self.paranoid_did_public_key
+ ):
+ return 'publickey,password'
+ elif self.paranoid_did_password:
+ return 'publickey'
+ else:
+ return 'password'
+ if username == 'commie':
+ return 'keyboard-interactive'
+ if username == 'utf8':
+ return 'password'
+ if username == 'non-utf8':
+ return 'password'
+ return 'publickey'
+
+ def check_auth_password(self, username, password):
+ if (username == 'slowdive') and (password == 'pygmalion'):
+ return AUTH_SUCCESSFUL
+ if (username == 'paranoid') and (password == 'paranoid'):
+ # 2-part auth (even openssh doesn't support this)
+ self.paranoid_did_password = True
+ if self.paranoid_did_public_key:
+ return AUTH_SUCCESSFUL
+ return AUTH_PARTIALLY_SUCCESSFUL
+ if (username == 'utf8') and (password == _pwd):
+ return AUTH_SUCCESSFUL
+ if (username == 'non-utf8') and (password == '\xff'):
+ return AUTH_SUCCESSFUL
+ if username == 'bad-server':
+ raise Exception("Ack!")
+ if (username == 'slowdive') and (password == 'unresponsive-server'):
+ sleep(5)
+ return AUTH_SUCCESSFUL
+ return AUTH_FAILED
+
+ def check_auth_publickey(self, username, key):
+ # NOTE: this is for an existing multipart auth test.
+ if (username == 'paranoid') and (key == self.paranoid_key):
+ # 2-part auth
+ self.paranoid_did_public_key = True
+ if self.paranoid_did_password:
+ return AUTH_SUCCESSFUL
+ return AUTH_PARTIALLY_SUCCESSFUL
+ # NOTE: these bits below are mostly used by client tests or
+ # straightforward key tests.
+ try:
+ expected = FINGERPRINTS[key.get_name()]
+ except KeyError:
+ return AUTH_FAILED
+ # Base check: allowed auth type & fingerprint matches
+ happy = (
+ key.get_name() in self.__allowed_keys and
+ key.get_fingerprint() == expected
+ )
+ # Secondary check: if test wants assertions about cert data
+ if (
+ self.__expected_public_blob is not None and
+ key.public_blob != self.__expected_public_blob
+ ):
+ happy = False
+ return AUTH_SUCCESSFUL if happy else AUTH_FAILED
+
+ def check_auth_interactive(self, username, submethods):
+ if username == 'commie':
+ self.username = username
+ return InteractiveQuery(
+ 'password',
+ 'Please enter a password.',
+ ('Password', False),
+ )
+ return AUTH_FAILED
+
+ def check_auth_interactive_response(self, responses):
+ if self.username == 'commie':
+ if (len(responses) == 1) and (responses[0] == 'cat'):
+ return AUTH_SUCCESSFUL
+ return AUTH_FAILED
+
+ def check_channel_request(self, kind, chanid):
+ return OPEN_SUCCEEDED
+
+ def check_channel_exec_request(self, channel, command):
+ if command != b'yes':
+ return False
+ return True
+
+ def check_channel_env_request(self, channel, name, value):
+ if name == 'INVALID_ENV':
+ return False
+
+ if not hasattr(channel, 'env'):
+ setattr(channel, 'env', {})
+
+ channel.env[name] = value
+ return True
diff --git a/tests/test_auth.py b/tests/test_auth.py
index 0b90634b..74719ac7 100644
--- a/tests/test_auth.py
+++ b/tests/test_auth.py
@@ -23,95 +23,14 @@ Some unit tests for authenticating over a Transport.
import sys
import threading
import unittest
-from time import sleep
from paramiko import (
- Transport, ServerInterface, RSAKey, DSSKey, BadAuthenticationType,
- InteractiveQuery, AuthenticationException,
+ Transport, RSAKey, DSSKey, BadAuthenticationType,
+ AuthenticationException,
)
-from paramiko import AUTH_FAILED, AUTH_PARTIALLY_SUCCESSFUL, AUTH_SUCCESSFUL
-from paramiko.py3compat import u
from ._loop import LoopSocket
-from ._util import _support, slow
-
-
-# TODO: see what other tests in other modules might want to move in here
-
-
-_pwd = u('\u2022')
-
-
-class NullServer (ServerInterface):
- paranoid_did_password = False
- paranoid_did_public_key = False
- paranoid_key = DSSKey.from_private_key_file(_support('test_dss.key'))
-
- def get_allowed_auths(self, username):
- if username == 'slowdive':
- return 'publickey,password'
- if username == 'paranoid':
- if (
- not self.paranoid_did_password and
- not self.paranoid_did_public_key
- ):
- return 'publickey,password'
- elif self.paranoid_did_password:
- return 'publickey'
- else:
- return 'password'
- if username == 'commie':
- return 'keyboard-interactive'
- if username == 'utf8':
- return 'password'
- if username == 'non-utf8':
- return 'password'
- return 'publickey'
-
- def check_auth_password(self, username, password):
- if (username == 'slowdive') and (password == 'pygmalion'):
- return AUTH_SUCCESSFUL
- if (username == 'paranoid') and (password == 'paranoid'):
- # 2-part auth (even openssh doesn't support this)
- self.paranoid_did_password = True
- if self.paranoid_did_public_key:
- return AUTH_SUCCESSFUL
- return AUTH_PARTIALLY_SUCCESSFUL
- if (username == 'utf8') and (password == _pwd):
- return AUTH_SUCCESSFUL
- if (username == 'non-utf8') and (password == '\xff'):
- return AUTH_SUCCESSFUL
- if username == 'bad-server':
- raise Exception("Ack!")
- if username == 'unresponsive-server':
- sleep(5)
- return AUTH_SUCCESSFUL
- return AUTH_FAILED
-
- def check_auth_publickey(self, username, key):
- if (username == 'paranoid') and (key == self.paranoid_key):
- # 2-part auth
- self.paranoid_did_public_key = True
- if self.paranoid_did_password:
- return AUTH_SUCCESSFUL
- return AUTH_PARTIALLY_SUCCESSFUL
- return AUTH_FAILED
-
- def check_auth_interactive(self, username, submethods):
- if username == 'commie':
- self.username = username
- return InteractiveQuery(
- 'password',
- 'Please enter a password.',
- ('Password', False),
- )
- return AUTH_FAILED
-
- def check_auth_interactive_response(self, responses):
- if self.username == 'commie':
- if (len(responses) == 1) and (responses[0] == 'cat'):
- return AUTH_SUCCESSFUL
- return AUTH_FAILED
+from ._util import _support, slow, NullServer, _pwd
class TestAuth(unittest.TestCase):
@@ -188,7 +107,7 @@ class TestEdgeCaseFailures(TestAuth):
self.start_server()
self.tc.connect()
try:
- self.tc.auth_password('unresponsive-server', 'hello')
+ self.tc.auth_password('slowdive', 'unresponsive-server')
except:
etype, evalue, etb = sys.exc_info()
self.assertTrue(issubclass(etype, AuthenticationException))
diff --git a/tests/test_client.py b/tests/test_client.py
index 75da3b85..a86b37ca 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -40,77 +40,13 @@ from paramiko.pkey import PublicBlob
from paramiko.common import PY2
from paramiko.ssh_exception import SSHException, AuthenticationException
-from ._util import _support, slow
+from ._util import _support, slow, NullServer, FINGERPRINTS
requires_gss_auth = unittest.skipUnless(
paramiko.GSS_AUTH_AVAILABLE, "GSS auth not available"
)
-FINGERPRINTS = {
- 'ssh-dss': b'\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c',
- 'ssh-rsa': b'\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5',
- 'ecdsa-sha2-nistp256': b'\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60',
- 'ssh-ed25519': b'\xb3\xd5"\xaa\xf9u^\xe8\xcd\x0e\xea\x02\xb9)\xa2\x80',
-}
-
-
-class NullServer(paramiko.ServerInterface):
- def __init__(self, *args, **kwargs):
- # Allow tests to enable/disable specific key types
- self.__allowed_keys = kwargs.pop('allowed_keys', [])
- # And allow them to set a (single...meh) expected public blob (cert)
- self.__expected_public_blob = kwargs.pop('public_blob', None)
- super(NullServer, self).__init__(*args, **kwargs)
-
- def get_allowed_auths(self, username):
- if username == 'slowdive':
- return 'publickey,password'
- return 'publickey'
-
- def check_auth_password(self, username, password):
- if (username == 'slowdive') and (password == 'pygmalion'):
- return paramiko.AUTH_SUCCESSFUL
- if (username == 'slowdive') and (password == 'unresponsive-server'):
- time.sleep(5)
- return paramiko.AUTH_SUCCESSFUL
- return paramiko.AUTH_FAILED
-
- def check_auth_publickey(self, username, key):
- try:
- expected = FINGERPRINTS[key.get_name()]
- except KeyError:
- return paramiko.AUTH_FAILED
- # Base check: allowed auth type & fingerprint matches
- happy = (
- key.get_name() in self.__allowed_keys and
- key.get_fingerprint() == expected
- )
- # Secondary check: if test wants assertions about cert data
- if (
- self.__expected_public_blob is not None and
- key.public_blob != self.__expected_public_blob
- ):
- happy = False
- return paramiko.AUTH_SUCCESSFUL if happy else paramiko.AUTH_FAILED
-
- def check_channel_request(self, kind, chanid):
- return paramiko.OPEN_SUCCEEDED
-
- def check_channel_exec_request(self, channel, command):
- if command != b'yes':
- return False
- return True
-
- def check_channel_env_request(self, channel, name, value):
- if name == 'INVALID_ENV':
- return False
-
- if not hasattr(channel, 'env'):
- setattr(channel, 'env', {})
-
- channel.env[name] = value
- return True
class ClientTest(unittest.TestCase):