summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeff Forcier <jeff@bitprophet.org>2017-08-28 18:50:48 -0700
committerJeff Forcier <jeff@bitprophet.org>2017-08-28 18:50:48 -0700
commit38cf578bb2c06c600eaf56d4380fcf84ed399a08 (patch)
treec54fdd3f7d8b6d42c63a2139a5d941656d4e6686
parent84d29dd4ea9d957d778207078c7cfed1d4bf9d46 (diff)
downloadparamiko-38cf578bb2c06c600eaf56d4380fcf84ed399a08.tar.gz
Update first few stub tests + required test-server and PublicBlob impl bits
-rw-r--r--paramiko/pkey.py7
-rw-r--r--tests/test_client.py50
2 files changed, 44 insertions, 13 deletions
diff --git a/paramiko/pkey.py b/paramiko/pkey.py
index 948152fa..1683c35c 100644
--- a/paramiko/pkey.py
+++ b/paramiko/pkey.py
@@ -477,3 +477,10 @@ class PublicBlob(object):
return '%s public key/certificate - %s' % (self.key_type, self.comment)
else:
return '%s public key/certificate' % self.key_type
+
+ def __eq__(self, other):
+ # Just piggyback on Message/BytesIO, since both of these should be one.
+ return self and other and self.key_blob == other.key_blob
+
+ def __ne__(self, other):
+ return not self == other
diff --git a/tests/test_client.py b/tests/test_client.py
index cfffa030..da6cb58d 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -35,6 +35,7 @@ import time
from tests.util import test_path
import paramiko
+from paramiko.pkey import PublicBlob
from paramiko.common import PY2
from paramiko.ssh_exception import SSHException, AuthenticationException
@@ -47,10 +48,12 @@ FINGERPRINTS = {
}
-class NullServer (paramiko.ServerInterface):
+class NullServer(paramiko.ServerInterface):
def __init__(self, *args, **kwargs):
# Allow tests to enable/disable specific key types
self.__allowed_keys = kwargs.pop('allowed_keys', [])
+ # And allow them to set a (single...meh) expected public blob (cert)
+ self.__expected_public_blob = kwargs.pop('public_blob', None)
super(NullServer, self).__init__(*args, **kwargs)
def get_allowed_auths(self, username):
@@ -71,12 +74,18 @@ class NullServer (paramiko.ServerInterface):
expected = FINGERPRINTS[key.get_name()]
except KeyError:
return paramiko.AUTH_FAILED
- if (
+ # Base check: allowed auth type & fingerprint matches
+ happy = (
key.get_name() in self.__allowed_keys and
key.get_fingerprint() == expected
+ )
+ # Secondary check: if test wants assertions about cert data
+ if (
+ self.__expected_public_blob is not None and
+ key.public_blob != self.__expected_public_blob
):
- return paramiko.AUTH_SUCCESSFUL
- return paramiko.AUTH_FAILED
+ happy = False
+ return paramiko.AUTH_SUCCESSFUL if happy else paramiko.AUTH_FAILED
def check_channel_request(self, kind, chanid):
return paramiko.OPEN_SUCCEEDED
@@ -117,7 +126,7 @@ class SSHClientTest (unittest.TestCase):
if hasattr(self, attr):
getattr(self, attr).close()
- def _run(self, allowed_keys=None, delay=0):
+ def _run(self, allowed_keys=None, delay=0, public_blob=None):
if allowed_keys is None:
allowed_keys = FINGERPRINTS.keys()
self.socks, addr = self.sockl.accept()
@@ -128,7 +137,7 @@ class SSHClientTest (unittest.TestCase):
keypath = test_path('test_ecdsa_256.key')
host_key = paramiko.ECDSAKey.from_private_key_file(keypath)
self.ts.add_server_key(host_key)
- server = NullServer(allowed_keys=allowed_keys)
+ server = NullServer(allowed_keys=allowed_keys, public_blob=public_blob)
if delay:
time.sleep(delay)
self.ts.start_server(self.event, server)
@@ -140,7 +149,9 @@ class SSHClientTest (unittest.TestCase):
The exception is ``allowed_keys`` which is stripped and handed to the
``NullServer`` used for testing.
"""
- run_kwargs = {'allowed_keys': kwargs.pop('allowed_keys', None)}
+ run_kwargs = {}
+ for key in ('allowed_keys', 'public_blob'):
+ run_kwargs[key] = kwargs.pop(key, None)
# Server setup
threading.Thread(target=self._run, kwargs=run_kwargs).start()
host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
@@ -249,14 +260,27 @@ class SSHClientTest (unittest.TestCase):
)
def test_certs_allowed_as_key_filename_values(self):
- # TODO: connect() with key_filename containing a cert file, loads up
- # both the cert and its implied key. This is new functionality on top
- # of the OpenSSH-compatible stuff.
- assert False
+ # NOTE: giving cert path here, not key path. (Key path test is below.
+ # They're similar except for which path is given; the expected auth and
+ # server-side behavior is 100% identical.)
+ cert_path = test_path('test_rsa.key-cert.pub')
+ self._test_connection(
+ key_filename=cert_path,
+ public_blob=PublicBlob.from_file(cert_path),
+ )
def test_certs_implicitly_loaded_alongside_key_filename_keys(self):
- # TODO: same but with just the key paths, i.e. vanilla OpenSSH behavior
- assert False
+ # NOTE: a regular test_connection() w/ test_rsa.key would incidentally
+ # test this (because test_rsa.key-cert.pub exists) but incidental tests
+ # stink, so NullServer and friends were updated to allow assertions
+ # about the server-side key object's public blob. Thus, we can prove
+ # that a specific cert was found, along with regular authorization
+ # succeeding proving that the overall flow works.
+ key_path = test_path('test_rsa.key')
+ self._test_connection(
+ key_filename=key_path,
+ public_blob=PublicBlob.from_file('{0}-cert.pub'.format(key_path)),
+ )
def test_default_key_locations_trigger_cert_loads_if_found(self):
# TODO: what it says on the tin: ~/.ssh/id_rsa tries to load