summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorMatt Johnston <matt@codeconstruct.com.au>2021-10-18 14:22:37 +0800
committerMatt Johnston <matt@codeconstruct.com.au>2021-10-18 14:22:37 +0800
commit32d498cec3bd943bf01b83fe8ac90efe08663b96 (patch)
tree3793506765b640f887d96d61307b0eefe8389b36 /test
parent4ee9231b778744baf4855cf5e1b03f8ca441a179 (diff)
downloaddropbear-32d498cec3bd943bf01b83fe8ac90efe08663b96.tar.gz
Add first channel tests
These initial tests are checking various edge cases of channel handling that have cropped up over the years.
Diffstat (limited to 'test')
-rw-r--r--test/conftest.py18
-rw-r--r--test/requirements.txt8
-rw-r--r--test/test_channels.py134
-rw-r--r--test/test_dropbear.py109
4 files changed, 269 insertions, 0 deletions
diff --git a/test/conftest.py b/test/conftest.py
new file mode 100644
index 0000000..a8728f1
--- /dev/null
+++ b/test/conftest.py
@@ -0,0 +1,18 @@
+def pytest_addoption(parser):
+ parser.addoption("--port", type=str, help="default is 2244 local, 22 remote")
+ parser.addoption("--dbclient", type=str, default="../dbclient")
+ parser.addoption("--dropbear", type=str, default="../dropbear")
+ parser.addoption("--hostkey", type=str, help="required unless --remote")
+ parser.addoption("--remote", type=str, help="remote host")
+ parser.addoption("--user", type=str, help="optional username")
+
+def pytest_configure(config):
+ opt = config.option
+ if not opt.hostkey and not opt.remote:
+ raise Exception("--hostkey must be given")
+ if not opt.port:
+ if opt.remote:
+ opt.port = "22"
+ else:
+ opt.port = "2244"
+
diff --git a/test/requirements.txt b/test/requirements.txt
new file mode 100644
index 0000000..36f6f91
--- /dev/null
+++ b/test/requirements.txt
@@ -0,0 +1,8 @@
+attrs==21.2.0
+iniconfig==1.1.1
+packaging==21.0
+pluggy==1.0.0
+py==1.10.0
+pyparsing==2.4.7
+pytest==6.2.5
+toml==0.10.2
diff --git a/test/test_channels.py b/test/test_channels.py
new file mode 100644
index 0000000..9c493ad
--- /dev/null
+++ b/test/test_channels.py
@@ -0,0 +1,134 @@
+from test_dropbear import *
+import signal
+import queue
+import socket
+
+# Tests for various edge cases of SSH channels and connection service
+
+def test_exitcode(request, dropbear):
+ r = dbclient(request, "exit 44")
+ assert r.returncode == 44
+
+@pytest.mark.xfail(reason="Not yet implemented", strict=True)
+def test_signal(request, dropbear):
+ r = dbclient(request, "kill -FPE $$")
+ assert r.returncode == -signal.SIGFPE
+
+@pytest.mark.parametrize("size", [0, 1, 2, 100, 5000, 200_000])
+def test_roundtrip(request, dropbear, size):
+ dat = os.urandom(size)
+ r = dbclient(request, "cat", input=dat, capture_output=True)
+ r.check_returncode()
+ assert r.stdout == dat
+
+@pytest.mark.parametrize("size", [0, 1, 2, 100, 20001, 41234])
+def test_read_pty(request, dropbear, size):
+ # testcase for
+ # https://bugs.openwrt.org/index.php?do=details&task_id=1814
+ # https://github.com/mkj/dropbear/pull/85
+ # From Yousong Zhou
+ # Fixed Oct 2021
+ #
+ #$ ssh -t my.router cat /tmp/bigfile | wc
+ #Connection to my.router closed.
+ # 0 1 14335 <- should be 20001
+
+ # Write the file. No newlines etc which could confuse ptys
+ dat = random_alnum(size)
+ r = dbclient(request, "tmpf=`mktemp`; echo $tmpf; cat > $tmpf", input=dat, capture_output=True, text=True)
+ tmpf = r.stdout.rstrip()
+ r.check_returncode()
+ # Read with a pty, this is what is being tested.
+ # Timing/buffering is subtle, we seem to need to cat a file from disk to hit it.
+ m, s = pty.openpty()
+ r = dbclient(request, "-t", f"cat {tmpf}; rm {tmpf}", stdin=s, capture_output=True)
+ r.check_returncode()
+ assert r.stdout.decode() == dat
+
+@pytest.mark.parametrize("fd", [1, 2])
+def test_bg_sleep(request, fd, dropbear):
+ # https://lists.ucc.asn.au/pipermail/dropbear/2006q1/000362.html
+ # Rob Landley "Is this a bug?" 24 Mar 2006
+ # dbclient user@system "sleep 10& echo hello"
+ #
+ # It should return right after printing hello, but it doesn't. It waits until
+ # the child process exits.
+
+ # failure is TimeoutExpired
+ redir = "" if fd == 1 else " >&2 "
+ r = dbclient(request, f"sleep 10& echo hello {redir}",
+ capture_output=True, timeout=2, text=True)
+ r.check_returncode()
+ st = r.stdout if fd == 1 else r.stderr
+
+ if fd == 2 and 'accepted unconditionally' in st:
+ # ignore hostkey warning, a bit of a hack
+ assert st.endswith("\n\nhello\n")
+ else:
+ assert st.rstrip() == "hello"
+
+
+def test_idle(request, dropbear):
+ # Idle test, -I 1 should make it return before the 2 second timeout
+ r = dbclient(request, "-I", "1", "echo zong; sleep 10",
+ capture_output=True, timeout=2, text=True)
+ r.check_returncode()
+ assert r.stdout.rstrip() == "zong"
+
+@pytest.mark.parametrize("size", [1, 4000, 40000])
+def test_netcat(request, dropbear, size):
+ opt = request.config.option
+ if opt.remote:
+ pytest.xfail("don't know netcat address for remote")
+
+ dat1 = os.urandom(size)
+ dat2 = os.urandom(size)
+ with HandleTcp(3344, 1, dat2) as tcp:
+ r = dbclient(request, "-B", "localhost:3344", input=dat1, capture_output=True)
+ r.check_returncode()
+ assert r.stdout == dat2
+ assert tcp.inbound() == dat1
+
+@pytest.mark.parametrize("size", [1, 4000, 40000])
+@pytest.mark.parametrize("fwd_flag", "LR")
+def test_tcpflushout(request, dropbear, size, fwd_flag):
+ """ Tests that an opened TCP connection prevent a SSH session from being closed
+ until that TCP connection has finished transferring
+ """
+ opt = request.config.option
+ if opt.remote:
+ pytest.xfail("don't know address for remote")
+
+ dat1 = os.urandom(size)
+ dat2 = os.urandom(size)
+ q = queue.Queue()
+ with HandleTcp(3344, timeout=1, response=q) as tcp:
+
+ r = dbclient(request, f"-{fwd_flag}", "7788:localhost:3344", "sleep 0.1; echo -n done",
+ text=True, background=True, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
+ # time to let the listener start
+ time.sleep(0.1)
+ # open a tcp connection
+ c = socket.create_connection(("localhost", 7788))
+
+ # wait for the shell to finish. sleep a bit longer in case it exits.
+ assert r.stdout.read(4) == "done"
+ time.sleep(0.1)
+
+ # now the shell has finished, we can write on the tcp socket
+ c.sendall(dat2)
+ c.shutdown(socket.SHUT_WR)
+ q.put(dat1)
+
+ # return a tcp response
+ q.put(None)
+ # check hasn't exited
+ assert r.poll() == None
+
+ # read the response
+ assert readall_socket(c) == dat1
+ c.close()
+ assert tcp.inbound() == dat2
+ # check has exited, allow time for dbclient to exit
+ time.sleep(0.1)
+ assert r.poll() == 0
diff --git a/test/test_dropbear.py b/test/test_dropbear.py
new file mode 100644
index 0000000..30665cd
--- /dev/null
+++ b/test/test_dropbear.py
@@ -0,0 +1,109 @@
+import subprocess
+import os
+import pty
+import tempfile
+import logging
+import time
+import socketserver
+import threading
+import queue
+
+import pytest
+
+LOCALADDR="127.0.5.5"
+
+@pytest.fixture(scope="module")
+def dropbear(request):
+ opt = request.config.option
+ if opt.remote:
+ yield None
+ return
+
+ args = [opt.dropbear,
+ "-p", LOCALADDR, # bind locally only
+ "-r", opt.hostkey,
+ "-p", opt.port,
+ "-F", "-E",
+ ]
+ p = subprocess.Popen(args, stderr=subprocess.PIPE, text=True)
+ # Wait until it has started listening
+ for l in p.stderr:
+ if "Not backgrounding" in l:
+ break
+ # Check it's still running
+ assert p.poll() is None
+ # Ready
+ yield p
+ p.terminate()
+
+def dbclient(request, *args, **kwargs):
+ opt = request.config.option
+ host = opt.remote or LOCALADDR
+ base_args = [opt.dbclient, "-y", host, "-p", opt.port]
+ if opt.user:
+ full_args.extend(['-l', opt.user])
+ full_args = base_args + list(args)
+ bg = kwargs.get("background")
+ if "background" in kwargs:
+ del kwargs["background"]
+ if bg:
+ return subprocess.Popen(full_args, **kwargs)
+ else:
+ # wait for response
+ return subprocess.run(full_args, **kwargs)
+
+class HandleTcp(socketserver.ThreadingMixIn, socketserver.TCPServer):
+ """ Listens for a single incoming request, sends a response if given,
+ and returns the inbound data.
+ Reponse can be a queue object, in which case each item in the queue will
+ be sent as a response, until it receives a None item.
+ """
+ def __init__(self, port, timeout, response=None):
+ super().__init__(('localhost', port), self.Handler)
+ self.port = port
+ self.timeout = timeout
+ self.response = response
+ self.sink = None
+
+ class Handler(socketserver.StreamRequestHandler):
+ def handle(self):
+ if isinstance(self.server.response, queue.Queue):
+ while True:
+ i = self.server.response.get()
+ if i is None:
+ break
+ self.wfile.write(i)
+ elif self.server.response:
+ self.wfile.write(self.server.response)
+ assert self.server.sink is None, ">1 request sent to handler"
+ self.server.sink = self.rfile.read()
+
+ def __enter__(self):
+ self.server_thread = threading.Thread(target=self.serve_forever)
+ self.server_thread.daemon = True
+ self.server_thread.start()
+ return self
+
+ def __exit__(self, *exc_stuff):
+ self.shutdown()
+ self.server_thread.join()
+
+ def inbound(self):
+ """ Returns the data sent to the socket """
+ return self.sink
+
+def readall_socket(sock):
+ b = []
+ while True:
+ i = sock.recv(4096)
+ if not i:
+ break
+ b.append(i)
+ return b''.join(b)
+
+# returns a str
+def random_alnum(size):
+ r = os.urandom(500 + size*5)
+ return bytes(i for i in r if bytes((i,)).isalnum())[:size].decode()
+
+