summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeff Forcier <jeff@bitprophet.org>2023-04-27 18:00:16 -0400
committerJeff Forcier <jeff@bitprophet.org>2023-05-05 12:27:20 -0400
commit162213fa1a4551bd955134c97ca5276a5f29e907 (patch)
tree5a70c153853fa2114c7f67523cb59db63ecfc5d8
parent9ece9fcc8d8e5d22de0a65fcc44374a53c31dfdb (diff)
downloadparamiko-162213fa1a4551bd955134c97ca5276a5f29e907.tar.gz
Migrate rest of main keys and update suite to be more pytest-relaxed compat
Main branch as of today: 350 passed, 21 skipped, 52 deselected, 3 warnings in 11.10s This branch as of this commit: 361 passed, 21 skipped, 52 deselected, 3 warnings in 10.51s Of those 11 "new" tests, 8 are ones I wrote (tests/pkey.py). Hard to figure out what the other 3 are given pytest-relaxed's output is very different from regular verbose pytest. oops.
-rw-r--r--paramiko/pkey.py54
-rw-r--r--pytest.ini5
-rw-r--r--tests/_loop.py (renamed from tests/loop.py)0
-rw-r--r--tests/_stub_sftp.py (renamed from tests/stub_sftp.py)0
-rw-r--r--tests/_support/ecdsa-256.key (renamed from tests/test_ecdsa_256.key)0
-rw-r--r--tests/_support/ed25519.key (renamed from tests/test_ed25519.key)0
-rw-r--r--tests/_support/rsa.key (renamed from tests/test_rsa.key)0
-rw-r--r--tests/_util.py (renamed from tests/util.py)0
-rw-r--r--tests/conftest.py63
-rw-r--r--tests/pkey.py13
-rw-r--r--tests/test_auth.py6
-rw-r--r--tests/test_client.py39
-rw-r--r--tests/test_config.py2
-rw-r--r--tests/test_file.py2
-rw-r--r--tests/test_gssapi.py2
-rw-r--r--tests/test_kex_gss.py6
-rw-r--r--tests/test_packetizer.py2
-rw-r--r--tests/test_pkey.py35
-rw-r--r--tests/test_sftp.py4
-rw-r--r--tests/test_sftp_big.py2
-rw-r--r--tests/test_ssh_gss.py8
-rw-r--r--tests/test_transport.py34
22 files changed, 199 insertions, 78 deletions
diff --git a/paramiko/pkey.py b/paramiko/pkey.py
index 91a33bed..98bd82cd 100644
--- a/paramiko/pkey.py
+++ b/paramiko/pkey.py
@@ -33,6 +33,7 @@ import bcrypt
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.ciphers import algorithms, modes, Cipher
+from cryptography.hazmat.primitives import asymmetric
from paramiko import util
from paramiko.util import u, b
@@ -64,10 +65,13 @@ class UnknownKeyType(Exception):
An unknown public/private key algorithm was attempted to be read.
"""
- def __init__(self, key_type, key_bytes):
+ def __init__(self, key_type=None, key_bytes=None):
self.key_type = key_type
self.key_bytes = key_bytes
+ def __str__(self):
+ return f"UnknownKeyType(type={self.key_type!r}, bytes=<{len(self.key_bytes)}>)" # noqa
+
class PKey:
"""
@@ -106,6 +110,53 @@ class PKey:
END_TAG = re.compile(r"^-{5}END (RSA|DSA|EC|OPENSSH) PRIVATE KEY-{5}\s*$")
@staticmethod
+ def from_path(path, passphrase=None):
+ """
+ Attempt to instantiate appropriate key subclass from given file path.
+
+ :param Path path: The path to load.
+
+ .. versionadded:: 3.2
+ """
+ # TODO: make sure sphinx is reading Path right in param list...
+ from paramiko import DSSKey, RSAKey, Ed25519Key, ECDSAKey
+
+ data = path.read_bytes()
+ # Like OpenSSH, try modern/OpenSSH-specific key load first
+ try:
+ loaded = serialization.load_ssh_private_key(
+ data=data, password=passphrase
+ )
+ # Then fall back to assuming legacy PEM type
+ except ValueError:
+ loaded = serialization.load_pem_private_key(
+ data=data, password=passphrase
+ )
+ # TODO Python 3.10: match statement? (NOTE: we cannot use a dict
+ # because the results from the loader are literal backend, eg openssl,
+ # private classes, so isinstance tests work but exact 'x class is y'
+ # tests will not work)
+ # TODO: leverage already-parsed/mathed obj to avoid duplicate cpu
+ # cycles? seemingly requires most of our key subclasses to be rewritten
+ # to be cryptography-object-forward. this is still likely faster than
+ # the old SSHClient code that just tried instantiating every class!
+ key_class = None
+ if isinstance(loaded, asymmetric.dsa.DSAPrivateKey):
+ key_class = DSSKey
+ elif isinstance(loaded, asymmetric.rsa.RSAPrivateKey):
+ key_class = RSAKey
+ elif isinstance(loaded, asymmetric.ed25519.Ed25519PrivateKey):
+ key_class = Ed25519Key
+ elif isinstance(loaded, asymmetric.ec.EllipticCurvePrivateKey):
+ key_class = ECDSAKey
+ else:
+ raise UnknownKeyType(
+ key_bytes=data, key_type=loaded.__class__.__name__
+ )
+ with path.open() as fd:
+ return key_class.from_private_key(fd, password=passphrase)
+
+ @staticmethod
def from_type_string(key_type, key_bytes):
"""
Given type `str` & raw `bytes`, return a `PKey` subclass instance.
@@ -131,6 +182,7 @@ class PKey:
for key_class in key_classes:
if key_type in key_class.identifiers():
+ # TODO: needs to passthru things like passphrase
return key_class(data=key_bytes)
raise UnknownKeyType(key_type=key_type, key_bytes=key_bytes)
diff --git a/pytest.ini b/pytest.ini
index 62fef863..209df545 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -1,6 +1,5 @@
[pytest]
-# We use pytest-relaxed just for its utils at the moment, so disable it at the
-# plugin level until we adapt test organization to really use it.
-addopts = -p no:relaxed
+testpaths = tests
+python_files = *
# Loop on failure
looponfailroots = tests paramiko
diff --git a/tests/loop.py b/tests/_loop.py
index a3740013..a3740013 100644
--- a/tests/loop.py
+++ b/tests/_loop.py
diff --git a/tests/stub_sftp.py b/tests/_stub_sftp.py
index 0c0372e9..0c0372e9 100644
--- a/tests/stub_sftp.py
+++ b/tests/_stub_sftp.py
diff --git a/tests/test_ecdsa_256.key b/tests/_support/ecdsa-256.key
index 42d44734..42d44734 100644
--- a/tests/test_ecdsa_256.key
+++ b/tests/_support/ecdsa-256.key
diff --git a/tests/test_ed25519.key b/tests/_support/ed25519.key
index eb9f94c2..eb9f94c2 100644
--- a/tests/test_ed25519.key
+++ b/tests/_support/ed25519.key
diff --git a/tests/test_rsa.key b/tests/_support/rsa.key
index f50e9c53..f50e9c53 100644
--- a/tests/test_rsa.key
+++ b/tests/_support/rsa.key
diff --git a/tests/util.py b/tests/_util.py
index 2f1c5ac2..2f1c5ac2 100644
--- a/tests/util.py
+++ b/tests/_util.py
diff --git a/tests/conftest.py b/tests/conftest.py
index b28d2a17..beef87c2 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -2,13 +2,24 @@ import logging
import os
import shutil
import threading
+from pathlib import Path
+
+from invoke.vendor.lexicon import Lexicon
import pytest
-from paramiko import RSAKey, SFTPServer, SFTP, Transport
+from paramiko import (
+ SFTPServer,
+ SFTP,
+ Transport,
+ DSSKey,
+ RSAKey,
+ Ed25519Key,
+ ECDSAKey,
+)
-from .loop import LoopSocket
-from .stub_sftp import StubServer, StubSFTPServer
-from .util import _support
+from ._loop import LoopSocket
+from ._stub_sftp import StubServer, StubSFTPServer
+from ._util import _support
from icecream import ic, install as install_ic
@@ -71,7 +82,7 @@ def sftp_server():
tc = Transport(sockc)
ts = Transport(socks)
# Auth
- host_key = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ host_key = RSAKey.from_private_key_file(_support("rsa.key"))
ts.add_server_key(host_key)
# Server setup
event = threading.Event()
@@ -103,3 +114,45 @@ def sftp(sftp_server):
yield client
# Clean up - as in make_sftp_folder, we assume local-only exec for now.
shutil.rmtree(client.FOLDER, ignore_errors=True)
+
+
+key_data = [
+ ["ssh-rsa", RSAKey, "SHA256:OhNL391d/beeFnxxg18AwWVYTAHww+D4djEE7Co0Yng"],
+ ["ssh-dss", DSSKey, "SHA256:uHwwykG099f4M4kfzvFpKCTino0/P03DRbAidpAmPm0"],
+ [
+ "ssh-ed25519",
+ Ed25519Key,
+ "SHA256:J6VESFdD3xSChn8y9PzWzeF+1tl892mOy2TqkMLO4ow",
+ ],
+ [
+ "ecdsa-sha2-nistp256",
+ ECDSAKey,
+ "SHA256:BrQG04oNKUETjKCeL4ifkARASg3yxS/pUHl3wWM26Yg",
+ ],
+]
+for datum in key_data:
+ short = datum[0].replace("ssh-", "").replace("sha2-nistp", "")
+ datum.insert(0, short)
+
+
+@pytest.fixture(scope="session", params=key_data, ids=lambda x: x[0])
+def key(request):
+ """
+ Yield an object for each known type of key, with attributes:
+
+ - ``short_type``: short identifier, eg ``rsa`` or ``ecdsa-256``
+ - ``full_type``: the "message style" key identifier, eg ``ssh-rsa``, or
+ ``ecdsa-sha2-nistp256``.
+ - ``path``: a pathlib Path object to the fixture key file
+ - ``pkey``: an instantiated PKey subclass object
+ - ``fingerprint``: the expected fingerprint of said key
+ """
+ short_type, key_type, key_class, fingerprint = request.param
+ bag = Lexicon()
+ bag.short_type = short_type
+ bag.full_type = key_type
+ bag.path = Path(_support(f"{short_type}.key"))
+ with bag.path.open() as fd:
+ bag.pkey = key_class.from_private_key(fd)
+ bag.fingerprint = fingerprint
+ yield bag
diff --git a/tests/pkey.py b/tests/pkey.py
new file mode 100644
index 00000000..b1cba825
--- /dev/null
+++ b/tests/pkey.py
@@ -0,0 +1,13 @@
+from paramiko import PKey
+
+
+class PKey_:
+ class from_type_string:
+ def loads_from_type_and_bytes(self, key):
+ obj = PKey.from_type_string(key.full_type, key.pkey.asbytes())
+ assert obj == key.pkey
+
+ class from_path:
+ def loads_from_file_path(self, key):
+ obj = PKey.from_path(key.path)
+ assert obj == key.pkey
diff --git a/tests/test_auth.py b/tests/test_auth.py
index 592e589f..02df8c12 100644
--- a/tests/test_auth.py
+++ b/tests/test_auth.py
@@ -37,8 +37,8 @@ from paramiko import (
from paramiko import AUTH_FAILED, AUTH_PARTIALLY_SUCCESSFUL, AUTH_SUCCESSFUL
from paramiko.util import u
-from .loop import LoopSocket
-from .util import _support, slow
+from ._loop import LoopSocket
+from ._util import _support, slow
_pwd = u("\u2022")
@@ -129,7 +129,7 @@ class AuthTest(unittest.TestCase):
self.sockc.close()
def start_server(self):
- host_key = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ host_key = RSAKey.from_private_key_file(_support("rsa.key"))
self.public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
self.event = threading.Event()
diff --git a/tests/test_client.py b/tests/test_client.py
index 62c92b35..564cda00 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -41,7 +41,7 @@ from paramiko import SSHClient
from paramiko.pkey import PublicBlob
from paramiko.ssh_exception import SSHException, AuthenticationException
-from .util import _support, requires_sha1_signing, slow
+from ._util import _support, requires_sha1_signing, slow
requires_gss_auth = unittest.skipUnless(
@@ -171,10 +171,10 @@ class ClientTest(unittest.TestCase):
self.ts = paramiko.Transport(self.socks)
if server_name is not None:
self.ts.local_version = server_name
- keypath = _support("test_rsa.key")
+ keypath = _support("rsa.key")
host_key = paramiko.RSAKey.from_private_key_file(keypath)
self.ts.add_server_key(host_key)
- keypath = _support("test_ecdsa_256.key")
+ keypath = _support("ecdsa-256.key")
host_key = paramiko.ECDSAKey.from_private_key_file(keypath)
self.ts.add_server_key(host_key)
server = NullServer(allowed_keys=allowed_keys, public_blob=public_blob)
@@ -195,7 +195,7 @@ class ClientTest(unittest.TestCase):
# Server setup
threading.Thread(target=self._run, kwargs=run_kwargs).start()
host_key = paramiko.RSAKey.from_private_key_file(
- _support("test_rsa.key")
+ _support("rsa.key")
)
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
@@ -263,18 +263,18 @@ class SSHClientTest(ClientTest):
"""
verify that SSHClient works with an RSA key.
"""
- self._test_connection(key_filename=_support("test_rsa.key"))
+ self._test_connection(key_filename=_support("rsa.key"))
@requires_sha1_signing
def test_client_ecdsa(self):
"""
verify that SSHClient works with an ECDSA key.
"""
- self._test_connection(key_filename=_support("test_ecdsa_256.key"))
+ self._test_connection(key_filename=_support("ecdsa-256.key"))
@requires_sha1_signing
def test_client_ed25519(self):
- self._test_connection(key_filename=_support("test_ed25519.key"))
+ self._test_connection(key_filename=_support("ed25519.key"))
@requires_sha1_signing
def test_multiple_key_files(self):
@@ -289,16 +289,17 @@ class SSHClientTest(ClientTest):
}
# Various combos of attempted & valid keys
# TODO: try every possible combo using itertools functions
+ # TODO: use new key(s) fixture(s)
for attempt, accept in (
(["rsa", "dss"], ["dss"]), # Original test #3
(["dss", "rsa"], ["dss"]), # Ordering matters sometimes, sadly
- (["dss", "rsa", "ecdsa_256"], ["dss"]), # Try ECDSA but fail
- (["rsa", "ecdsa_256"], ["ecdsa"]), # ECDSA success
+ (["dss", "rsa", "ecdsa-256"], ["dss"]), # Try ECDSA but fail
+ (["rsa", "ecdsa-256"], ["ecdsa"]), # ECDSA success
):
try:
self._test_connection(
key_filename=[
- _support("test_{}.key".format(x)) for x in attempt
+ _support("{}.key".format(x)) for x in attempt
],
allowed_keys=[types_[x] for x in accept],
)
@@ -318,7 +319,7 @@ class SSHClientTest(ClientTest):
self.assertRaises(
SSHException,
self._test_connection,
- key_filename=[_support("test_rsa.key")],
+ key_filename=[_support("rsa.key")],
allowed_keys=["ecdsa-sha2-nistp256"],
)
@@ -338,7 +339,7 @@ class SSHClientTest(ClientTest):
@requires_sha1_signing
def test_certs_implicitly_loaded_alongside_key_filename_keys(self):
- # NOTE: a regular test_connection() w/ test_rsa.key would incidentally
+ # NOTE: a regular test_connection() w/ rsa.key would incidentally
# test this (because test_xxx.key-cert.pub exists) but incidental tests
# stink, so NullServer and friends were updated to allow assertions
# about the server-side key object's public blob. Thus, we can prove
@@ -391,7 +392,7 @@ class SSHClientTest(ClientTest):
"""
threading.Thread(target=self._run).start()
hostname = f"[{self.addr}]:{self.port}"
- key_file = _support("test_ecdsa_256.key")
+ key_file = _support("ecdsa-256.key")
public_host_key = paramiko.ECDSAKey.from_private_key_file(key_file)
self.tc = SSHClient()
@@ -415,7 +416,7 @@ class SSHClientTest(ClientTest):
warnings.filterwarnings("ignore", "tempnam.*")
host_key = paramiko.RSAKey.from_private_key_file(
- _support("test_rsa.key")
+ _support("rsa.key")
)
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
fd, localname = mkstemp()
@@ -517,7 +518,7 @@ class SSHClientTest(ClientTest):
# Start the thread with a 1 second wait.
threading.Thread(target=self._run, kwargs={"delay": 1}).start()
host_key = paramiko.RSAKey.from_private_key_file(
- _support("test_rsa.key")
+ _support("rsa.key")
)
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
@@ -593,7 +594,7 @@ class SSHClientTest(ClientTest):
"""
Failed gssapi-keyex doesn't prevent subsequent key from succeeding
"""
- kwargs = dict(gss_kex=True, key_filename=[_support("test_rsa.key")])
+ kwargs = dict(gss_kex=True, key_filename=[_support("rsa.key")])
self._test_connection(**kwargs)
@requires_gss_auth
@@ -601,7 +602,7 @@ class SSHClientTest(ClientTest):
"""
Failed gssapi-with-mic doesn't prevent subsequent key from succeeding
"""
- kwargs = dict(gss_auth=True, key_filename=[_support("test_rsa.key")])
+ kwargs = dict(gss_auth=True, key_filename=[_support("rsa.key")])
self._test_connection(**kwargs)
def test_reject_policy(self):
@@ -683,11 +684,11 @@ class SSHClientTest(ClientTest):
self._client_host_key_bad(host_key)
def test_host_key_negotiation_3(self):
- self._client_host_key_good(paramiko.ECDSAKey, "test_ecdsa_256.key")
+ self._client_host_key_good(paramiko.ECDSAKey, "ecdsa-256.key")
@requires_sha1_signing
def test_host_key_negotiation_4(self):
- self._client_host_key_good(paramiko.RSAKey, "test_rsa.key")
+ self._client_host_key_good(paramiko.RSAKey, "rsa.key")
def _setup_for_env(self):
threading.Thread(target=self._run).start()
diff --git a/tests/test_config.py b/tests/test_config.py
index a2c60a32..fcb120b6 100644
--- a/tests/test_config.py
+++ b/tests/test_config.py
@@ -19,7 +19,7 @@ from paramiko import (
ConfigParseError,
)
-from .util import _config
+from ._util import _config
@fixture
diff --git a/tests/test_file.py b/tests/test_file.py
index 456c0388..9344495b 100644
--- a/tests/test_file.py
+++ b/tests/test_file.py
@@ -26,7 +26,7 @@ from io import BytesIO
from paramiko.common import linefeed_byte, crlf, cr_byte
from paramiko.file import BufferedFile
-from .util import needs_builtin
+from ._util import needs_builtin
class LoopbackFile(BufferedFile):
diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py
index 671f1ba0..da62fd97 100644
--- a/tests/test_gssapi.py
+++ b/tests/test_gssapi.py
@@ -24,7 +24,7 @@ Test the used APIs for GSS-API / SSPI authentication
import socket
-from .util import needs_gssapi, KerberosTestCase, update_env
+from ._util import needs_gssapi, KerberosTestCase, update_env
#
# NOTE: KerberosTestCase skips all tests if it was unable to import k5test
diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py
index d4868f4a..c33f4c68 100644
--- a/tests/test_kex_gss.py
+++ b/tests/test_kex_gss.py
@@ -31,7 +31,7 @@ import unittest
import paramiko
-from .util import needs_gssapi, KerberosTestCase, update_env
+from ._util import needs_gssapi, KerberosTestCase, update_env
class NullServer(paramiko.ServerInterface):
@@ -80,7 +80,7 @@ class GSSKexTest(KerberosTestCase):
def _run(self):
self.socks, addr = self.sockl.accept()
self.ts = paramiko.Transport(self.socks, gss_kex=True)
- host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key")
+ host_key = paramiko.RSAKey.from_private_key_file("tests/rsa.key")
self.ts.add_server_key(host_key)
self.ts.set_gss_host(self.realm.hostname)
try:
@@ -96,7 +96,7 @@ class GSSKexTest(KerberosTestCase):
Diffie-Hellman Key Exchange and user authentication with the GSS-API
context created during key exchange.
"""
- host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key")
+ host_key = paramiko.RSAKey.from_private_key_file("tests/rsa.key")
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
self.tc = paramiko.SSHClient()
diff --git a/tests/test_packetizer.py b/tests/test_packetizer.py
index d4dd58ad..aee21c21 100644
--- a/tests/test_packetizer.py
+++ b/tests/test_packetizer.py
@@ -30,7 +30,7 @@ from cryptography.hazmat.primitives.ciphers import algorithms, Cipher, modes
from paramiko import Message, Packetizer, util
from paramiko.common import byte_chr, zero_byte
-from .loop import LoopSocket
+from ._loop import LoopSocket
x55 = byte_chr(0x55)
diff --git a/tests/test_pkey.py b/tests/test_pkey.py
index 5dfaaff7..c5b20f91 100644
--- a/tests/test_pkey.py
+++ b/tests/test_pkey.py
@@ -45,7 +45,7 @@ from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateNumbers
from unittest.mock import patch, Mock
import pytest
-from .util import _support, is_low_entropy, requires_sha1_signing
+from ._util import _support, is_low_entropy, requires_sha1_signing
# from openssh's ssh-keygen
@@ -161,7 +161,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(exp, key)
def test_load_rsa(self):
- key = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ key = RSAKey.from_private_key_file(_support("rsa.key"))
self.assertEqual("ssh-rsa", key.get_name())
exp_rsa = b(FINGER_RSA.split()[1].replace(":", ""))
my_rsa = hexlify(key.get_fingerprint())
@@ -184,7 +184,7 @@ class KeyTest(unittest.TestCase):
) as loader:
loader.side_effect = exception
with pytest.raises(SSHException, match=str(exception)):
- RSAKey.from_private_key_file(_support("test_rsa.key"))
+ RSAKey.from_private_key_file(_support("rsa.key"))
def test_loading_empty_keys_errors_usefully(self):
# #1599 - raise SSHException instead of IndexError
@@ -231,7 +231,7 @@ class KeyTest(unittest.TestCase):
def test_compare_rsa(self):
# verify that the private & public keys compare equal
- key = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ key = RSAKey.from_private_key_file(_support("rsa.key"))
self.assertEqual(key, key)
pub = RSAKey(data=key.asbytes())
self.assertTrue(key.can_sign())
@@ -248,7 +248,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(key, pub)
def _sign_and_verify_rsa(self, algorithm, saved_sig):
- key = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ key = RSAKey.from_private_key_file(_support("rsa.key"))
msg = key.sign_ssh_data(b"ice weasels", algorithm)
assert isinstance(msg, Message)
msg.rewind()
@@ -329,7 +329,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(key.get_name(), "ecdsa-sha2-nistp521")
def test_load_ecdsa_256(self):
- key = ECDSAKey.from_private_key_file(_support("test_ecdsa_256.key"))
+ key = ECDSAKey.from_private_key_file(_support("ecdsa-256.key"))
self.assertEqual("ecdsa-sha2-nistp256", key.get_name())
exp_ecdsa = b(FINGER_ECDSA_256.split()[1].replace(":", ""))
my_ecdsa = hexlify(key.get_fingerprint())
@@ -357,7 +357,7 @@ class KeyTest(unittest.TestCase):
def test_compare_ecdsa_256(self):
# verify that the private & public keys compare equal
- key = ECDSAKey.from_private_key_file(_support("test_ecdsa_256.key"))
+ key = ECDSAKey.from_private_key_file(_support("ecdsa-256.key"))
self.assertEqual(key, key)
pub = ECDSAKey(data=key.asbytes())
self.assertTrue(key.can_sign())
@@ -366,7 +366,7 @@ class KeyTest(unittest.TestCase):
def test_sign_ecdsa_256(self):
# verify that the rsa private key can sign and verify
- key = ECDSAKey.from_private_key_file(_support("test_ecdsa_256.key"))
+ key = ECDSAKey.from_private_key_file(_support("ecdsa-256.key"))
msg = key.sign_ssh_data(b"ice weasels")
self.assertTrue(type(msg) is Message)
msg.rewind()
@@ -408,7 +408,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(384, key.get_bits())
def test_load_ecdsa_transmutes_crypto_exceptions(self):
- path = _support("test_ecdsa_256.key")
+ path = _support("ecdsa-256.key")
# TODO: nix unittest for pytest
for exception in (TypeError("onoz"), UnsupportedAlgorithm("oops")):
with patch(
@@ -569,12 +569,12 @@ class KeyTest(unittest.TestCase):
RSAKey.from_private_key_file(_support("test_rsa_openssh_nopad.key"))
def test_stringification(self):
- key = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ key = RSAKey.from_private_key_file(_support("rsa.key"))
comparable = TEST_KEY_BYTESTR
self.assertEqual(str(key), comparable)
def test_ed25519(self):
- key1 = Ed25519Key.from_private_key_file(_support("test_ed25519.key"))
+ key1 = Ed25519Key.from_private_key_file(_support("ed25519.key"))
key2 = Ed25519Key.from_private_key_file(
_support("test_ed25519_password.key"), b"abc123"
)
@@ -594,7 +594,7 @@ class KeyTest(unittest.TestCase):
def test_ed25519_compare(self):
# verify that the private & public keys compare equal
- key = Ed25519Key.from_private_key_file(_support("test_ed25519.key"))
+ key = Ed25519Key.from_private_key_file(_support("ed25519.key"))
self.assertEqual(key, key)
pub = Ed25519Key(data=key.asbytes())
self.assertTrue(key.can_sign())
@@ -616,12 +616,13 @@ class KeyTest(unittest.TestCase):
)
assert original != generated
+ # TODO: use keys fixture
def keys(self):
for key_class, filename in [
- (RSAKey, "test_rsa.key"),
+ (RSAKey, "rsa.key"),
(DSSKey, "dss.key"),
- (ECDSAKey, "test_ecdsa_256.key"),
- (Ed25519Key, "test_ed25519.key"),
+ (ECDSAKey, "ecdsa-256.key"),
+ (Ed25519Key, "ed25519.key"),
]:
key1 = key_class.from_private_key_file(_support(filename))
key2 = key_class.from_private_key_file(_support(filename))
@@ -643,6 +644,7 @@ class KeyTest(unittest.TestCase):
for key1, key2 in self.keys():
assert hash(key1) == hash(key2)
+ # TODO: use keys fixture
def test_new_fingerprint(self):
# Assumes the RSA, DSS, ECDSA, Ed25519 order seen in 'def keys'.
fingerprints = [x.fingerprint for x, _ in self.keys()]
@@ -653,6 +655,7 @@ class KeyTest(unittest.TestCase):
"SHA256:J6VESFdD3xSChn8y9PzWzeF+1tl892mOy2TqkMLO4ow",
]
+ # TODO: use keys fixture
def test_algorithm_property(self):
# Assumes the RSA, DSS, ECDSA, Ed25519 order seen in 'def keys'.
algorithms = [x.algorithm_name for x, _ in self.keys()]
@@ -669,7 +672,7 @@ class KeyTest(unittest.TestCase):
# No exception -> it's good. Meh.
def test_ed25519_load_from_file_obj(self):
- with open(_support("test_ed25519.key")) as pkey_fileobj:
+ with open(_support("ed25519.key")) as pkey_fileobj:
key = Ed25519Key.from_private_key(pkey_fileobj)
self.assertEqual(key, key)
self.assertTrue(key.can_sign())
diff --git a/tests/test_sftp.py b/tests/test_sftp.py
index be123de4..7fd274bc 100644
--- a/tests/test_sftp.py
+++ b/tests/test_sftp.py
@@ -38,8 +38,8 @@ from paramiko.sftp_attr import SFTPAttributes
from paramiko.util import b, u
from tests import requireNonAsciiLocale
-from .util import needs_builtin
-from .util import slow
+from ._util import needs_builtin
+from ._util import slow
ARTICLE = """
diff --git a/tests/test_sftp_big.py b/tests/test_sftp_big.py
index 5192f657..acfe71e3 100644
--- a/tests/test_sftp_big.py
+++ b/tests/test_sftp_big.py
@@ -30,7 +30,7 @@ import time
from paramiko.common import o660
-from .util import slow
+from ._util import slow
@slow
diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py
index a8175ccb..27976a8d 100644
--- a/tests/test_ssh_gss.py
+++ b/tests/test_ssh_gss.py
@@ -28,7 +28,7 @@ import threading
import paramiko
-from .util import _support, needs_gssapi, KerberosTestCase, update_env
+from ._util import _support, needs_gssapi, KerberosTestCase, update_env
from .test_client import FINGERPRINTS
@@ -89,7 +89,7 @@ class GSSAuthTest(KerberosTestCase):
def _run(self):
self.socks, addr = self.sockl.accept()
self.ts = paramiko.Transport(self.socks)
- host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key")
+ host_key = paramiko.RSAKey.from_private_key_file("tests/rsa.key")
self.ts.add_server_key(host_key)
server = NullServer()
self.ts.start_server(self.event, server)
@@ -100,7 +100,7 @@ class GSSAuthTest(KerberosTestCase):
The exception is ... no exception yet
"""
- host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key")
+ host_key = paramiko.RSAKey.from_private_key_file("tests/rsa.key")
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
self.tc = paramiko.SSHClient()
@@ -154,7 +154,7 @@ class GSSAuthTest(KerberosTestCase):
"this_host_does_not_exists_and_causes_a_GSSAPI-exception"
)
self._test_connection(
- key_filename=[_support("test_rsa.key")],
+ key_filename=[_support("rsa.key")],
allow_agent=False,
look_for_keys=False,
)
diff --git a/tests/test_transport.py b/tests/test_transport.py
index a6b15ee1..d8ac8a4b 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -60,8 +60,8 @@ from paramiko.common import (
)
from paramiko.message import Message
-from .util import needs_builtin, _support, requires_sha1_signing, slow
-from .loop import LoopSocket
+from ._util import needs_builtin, _support, requires_sha1_signing, slow
+from ._loop import LoopSocket
LONG_BANNER = """\
@@ -168,7 +168,7 @@ class TransportTest(unittest.TestCase):
def setup_test_server(
self, client_options=None, server_options=None, connect_kwargs=None
):
- host_key = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ host_key = RSAKey.from_private_key_file(_support("rsa.key"))
public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
@@ -234,7 +234,7 @@ class TransportTest(unittest.TestCase):
loopback sockets. this is hardly "simple" but it's simpler than the
later tests. :)
"""
- host_key = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ host_key = RSAKey.from_private_key_file(_support("rsa.key"))
public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
event = threading.Event()
@@ -260,7 +260,7 @@ class TransportTest(unittest.TestCase):
"""
verify that a long banner doesn't mess up the handshake.
"""
- host_key = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ host_key = RSAKey.from_private_key_file(_support("rsa.key"))
public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
event = threading.Event()
@@ -910,7 +910,7 @@ class TransportTest(unittest.TestCase):
# be fine. Even tho it's a bit squicky.
self.tc.packetizer = SlowPacketizer(self.tc.sock)
# Continue with regular test red tape.
- host_key = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ host_key = RSAKey.from_private_key_file(_support("rsa.key"))
public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
event = threading.Event()
@@ -1204,7 +1204,7 @@ def server(
:param hostkey:
Host key to use for the server; if None, loads
- ``test_rsa.key``.
+ ``rsa.key``.
:param init:
Default `Transport` constructor kwargs to use for both sides.
:param server_init:
@@ -1234,7 +1234,7 @@ def server(
ts = Transport(socks, **dict(init, **server_init))
if hostkey is None:
- hostkey = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ hostkey = RSAKey.from_private_key_file(_support("rsa.key"))
ts.add_server_key(hostkey)
event = threading.Event()
server = NullServer(allowed_keys=pubkeys)
@@ -1344,7 +1344,7 @@ class TestSHA2SignatureKeyExchange(unittest.TestCase):
# (This is a regression test vs previous implementation which overwrote
# the entire preferred-hostkeys structure when given an explicit key as
# a client.)
- hostkey = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ hostkey = RSAKey.from_private_key_file(_support("rsa.key"))
with server(hostkey=hostkey, connect=dict(hostkey=hostkey)) as (tc, _):
assert tc.host_key_type == "rsa-sha2-512"
@@ -1359,7 +1359,7 @@ class TestExtInfo(unittest.TestCase):
}
def test_client_uses_server_sig_algs_for_pubkey_auth(self):
- privkey = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ privkey = RSAKey.from_private_key_file(_support("rsa.key"))
with server(
pubkeys=[privkey],
connect=dict(pkey=privkey),
@@ -1376,7 +1376,7 @@ class TestExtInfo(unittest.TestCase):
# with this module anyways...
class TestSHA2SignaturePubkeys(unittest.TestCase):
def test_pubkey_auth_honors_disabled_algorithms(self):
- privkey = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ privkey = RSAKey.from_private_key_file(_support("rsa.key"))
with server(
pubkeys=[privkey],
connect=dict(pkey=privkey),
@@ -1391,7 +1391,7 @@ class TestSHA2SignaturePubkeys(unittest.TestCase):
assert "no RSA pubkey algorithms" in str(err)
def test_client_sha2_disabled_server_sha1_disabled_no_match(self):
- privkey = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ privkey = RSAKey.from_private_key_file(_support("rsa.key"))
with server(
pubkeys=[privkey],
connect=dict(pkey=privkey),
@@ -1402,7 +1402,7 @@ class TestSHA2SignaturePubkeys(unittest.TestCase):
assert isinstance(err, AuthenticationException)
def test_client_sha1_disabled_server_sha2_disabled_no_match(self):
- privkey = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ privkey = RSAKey.from_private_key_file(_support("rsa.key"))
with server(
pubkeys=[privkey],
connect=dict(pkey=privkey),
@@ -1414,7 +1414,7 @@ class TestSHA2SignaturePubkeys(unittest.TestCase):
@requires_sha1_signing
def test_ssh_rsa_still_used_when_sha2_disabled(self):
- privkey = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ privkey = RSAKey.from_private_key_file(_support("rsa.key"))
# NOTE: this works because key obj comparison uses public bytes
# TODO: would be nice for PKey to grow a legit "give me another obj of
# same class but just the public bits" using asbytes()
@@ -1424,7 +1424,7 @@ class TestSHA2SignaturePubkeys(unittest.TestCase):
assert tc.is_authenticated()
def test_sha2_512(self):
- privkey = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ privkey = RSAKey.from_private_key_file(_support("rsa.key"))
with server(
pubkeys=[privkey],
connect=dict(pkey=privkey),
@@ -1436,7 +1436,7 @@ class TestSHA2SignaturePubkeys(unittest.TestCase):
assert tc._agreed_pubkey_algorithm == "rsa-sha2-512"
def test_sha2_256(self):
- privkey = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ privkey = RSAKey.from_private_key_file(_support("rsa.key"))
with server(
pubkeys=[privkey],
connect=dict(pkey=privkey),
@@ -1448,7 +1448,7 @@ class TestSHA2SignaturePubkeys(unittest.TestCase):
assert tc._agreed_pubkey_algorithm == "rsa-sha2-256"
def test_sha2_256_when_client_only_enables_256(self):
- privkey = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ privkey = RSAKey.from_private_key_file(_support("rsa.key"))
with server(
pubkeys=[privkey],
connect=dict(pkey=privkey),