summaryrefslogtreecommitdiff
path: root/tests/test_transport.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_transport.py')
-rw-r--r--tests/test_transport.py198
1 files changed, 19 insertions, 179 deletions
diff --git a/tests/test_transport.py b/tests/test_transport.py
index d7704af6..ee00830a 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -22,8 +22,6 @@ Some unit tests for the ssh2 protocol in Transport.
from binascii import hexlify
-from contextlib import contextmanager
-import pytest
import select
import socket
import time
@@ -35,18 +33,15 @@ from unittest.mock import Mock
from paramiko import (
AuthHandler,
ChannelException,
- DSSKey,
Packetizer,
RSAKey,
SSHException,
AuthenticationException,
IncompatiblePeer,
SecurityOptions,
- ServerInterface,
Transport,
)
-from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL
-from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
+from paramiko import OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
from paramiko.common import (
DEFAULT_MAX_PACKET_SIZE,
DEFAULT_WINDOW_SIZE,
@@ -61,7 +56,18 @@ from paramiko.common import (
)
from paramiko.message import Message
-from ._util import needs_builtin, _support, requires_sha1_signing, slow
+from ._util import (
+ needs_builtin,
+ _support,
+ requires_sha1_signing,
+ slow,
+ server,
+ _disable_sha2,
+ _disable_sha1,
+ _disable_sha2_pubkey,
+ _disable_sha1_pubkey,
+ TestServer as NullServer,
+)
from ._loop import LoopSocket
@@ -78,79 +84,6 @@ Maybe.
"""
-class NullServer(ServerInterface):
- paranoid_did_password = False
- paranoid_did_public_key = False
- paranoid_key = DSSKey.from_private_key_file(_support("dss.key"))
-
- def __init__(self, allowed_keys=None):
- self.allowed_keys = allowed_keys if allowed_keys is not None else []
-
- def get_allowed_auths(self, username):
- if username == "slowdive":
- return "publickey,password"
- return "publickey"
-
- def check_auth_password(self, username, password):
- if (username == "slowdive") and (password == "pygmalion"):
- return AUTH_SUCCESSFUL
- return AUTH_FAILED
-
- def check_auth_publickey(self, username, key):
- if key in self.allowed_keys:
- return AUTH_SUCCESSFUL
- return AUTH_FAILED
-
- def check_channel_request(self, kind, chanid):
- if kind == "bogus":
- return OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
- return OPEN_SUCCEEDED
-
- def check_channel_exec_request(self, channel, command):
- if command != b"yes":
- return False
- return True
-
- def check_channel_shell_request(self, channel):
- return True
-
- def check_global_request(self, kind, msg):
- self._global_request = kind
- # NOTE: for w/e reason, older impl of this returned False always, even
- # tho that's only supposed to occur if the request cannot be served.
- # For now, leaving that the default unless test supplies specific
- # 'acceptable' request kind
- return kind == "acceptable"
-
- def check_channel_x11_request(
- self,
- channel,
- single_connection,
- auth_protocol,
- auth_cookie,
- screen_number,
- ):
- self._x11_single_connection = single_connection
- self._x11_auth_protocol = auth_protocol
- self._x11_auth_cookie = auth_cookie
- self._x11_screen_number = screen_number
- return True
-
- def check_port_forward_request(self, addr, port):
- self._listen = socket.socket()
- self._listen.bind(("127.0.0.1", 0))
- self._listen.listen(1)
- return self._listen.getsockname()[1]
-
- def cancel_port_forward_request(self, addr, port):
- self._listen.close()
- self._listen = None
-
- def check_channel_direct_tcpip_request(self, chanid, origin, destination):
- self._tcpip_dest = destination
- return OPEN_SUCCEEDED
-
-
class TransportTest(unittest.TestCase):
def setUp(self):
self.socks = LoopSocket()
@@ -1190,103 +1123,6 @@ class AlgorithmDisablingTests(unittest.TestCase):
assert "zlib" not in compressions
-@contextmanager
-def server(
- hostkey=None,
- init=None,
- server_init=None,
- client_init=None,
- connect=None,
- pubkeys=None,
- catch_error=False,
- transport_factory=None,
-):
- """
- SSH server contextmanager for testing.
-
- :param hostkey:
- Host key to use for the server; if None, loads
- ``rsa.key``.
- :param init:
- Default `Transport` constructor kwargs to use for both sides.
- :param server_init:
- Extends and/or overrides ``init`` for server transport only.
- :param client_init:
- Extends and/or overrides ``init`` for client transport only.
- :param connect:
- Kwargs to use for ``connect()`` on the client.
- :param pubkeys:
- List of public keys for auth.
- :param catch_error:
- Whether to capture connection errors & yield from contextmanager.
- Necessary for connection_time exception testing.
- :param transport_factory:
- Like the same-named param in SSHClient: which Transport class to use.
- """
- if init is None:
- init = {}
- if server_init is None:
- server_init = {}
- if client_init is None:
- client_init = {}
- if connect is None:
- connect = dict(username="slowdive", password="pygmalion")
- socks = LoopSocket()
- sockc = LoopSocket()
- sockc.link(socks)
- if transport_factory is None:
- transport_factory = Transport
- tc = transport_factory(sockc, **dict(init, **client_init))
- ts = transport_factory(socks, **dict(init, **server_init))
-
- if hostkey is None:
- hostkey = RSAKey.from_private_key_file(_support("rsa.key"))
- ts.add_server_key(hostkey)
- event = threading.Event()
- server = NullServer(allowed_keys=pubkeys)
- assert not event.is_set()
- assert not ts.is_active()
- assert tc.get_username() is None
- assert ts.get_username() is None
- assert not tc.is_authenticated()
- assert not ts.is_authenticated()
-
- err = None
- # Trap errors and yield instead of raising right away; otherwise callers
- # cannot usefully deal with problems at connect time which stem from errors
- # in the server side.
- try:
- ts.start_server(event, server)
- tc.connect(**connect)
-
- event.wait(1.0)
- assert event.is_set()
- assert ts.is_active()
- assert tc.is_active()
-
- except Exception as e:
- if not catch_error:
- raise
- err = e
-
- yield (tc, ts, err) if catch_error else (tc, ts)
-
- tc.close()
- ts.close()
- socks.close()
- sockc.close()
-
-
-_disable_sha2 = dict(
- disabled_algorithms=dict(keys=["rsa-sha2-256", "rsa-sha2-512"])
-)
-_disable_sha1 = dict(disabled_algorithms=dict(keys=["ssh-rsa"]))
-_disable_sha2_pubkey = dict(
- disabled_algorithms=dict(pubkeys=["rsa-sha2-256", "rsa-sha2-512"])
-)
-_disable_sha1_pubkey = dict(disabled_algorithms=dict(pubkeys=["ssh-rsa"]))
-
-
class TestSHA2SignatureKeyExchange(unittest.TestCase):
# NOTE: these all rely on the default server() hostkey being RSA
# NOTE: these rely on both sides being properly implemented re: agreed-upon
@@ -1351,7 +1187,10 @@ class TestSHA2SignatureKeyExchange(unittest.TestCase):
# the entire preferred-hostkeys structure when given an explicit key as
# a client.)
hostkey = RSAKey.from_private_key_file(_support("rsa.key"))
- with server(hostkey=hostkey, connect=dict(hostkey=hostkey)) as (tc, _):
+ connect = dict(
+ hostkey=hostkey, username="slowdive", password="pygmalion"
+ )
+ with server(hostkey=hostkey, connect=connect) as (tc, _):
assert tc.host_key_type == "rsa-sha2-512"
@@ -1442,7 +1281,7 @@ class TestSHA2SignaturePubkeys(unittest.TestCase):
server_init = dict(_disable_sha2_pubkey, server_sig_algs=False)
with server(
pubkeys=[privkey],
- connect=dict(pkey=privkey),
+ connect=dict(username="slowdive", pkey=privkey),
server_init=server_init,
catch_error=True,
) as (tc, ts, err):
@@ -1455,6 +1294,7 @@ class TestSHA2SignaturePubkeys(unittest.TestCase):
privkey = RSAKey.from_private_key_file(_support("rsa.key"))
with server(
pubkeys=[privkey],
+ # TODO: why is this passing without a username?
connect=dict(pkey=privkey),
init=dict(
disabled_algorithms=dict(pubkeys=["ssh-rsa", "rsa-sha2-256"])