summaryrefslogtreecommitdiff
path: root/test/test_dropbear.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_dropbear.py')
-rw-r--r--test/test_dropbear.py109
1 files changed, 109 insertions, 0 deletions
diff --git a/test/test_dropbear.py b/test/test_dropbear.py
new file mode 100644
index 0000000..30665cd
--- /dev/null
+++ b/test/test_dropbear.py
@@ -0,0 +1,109 @@
+import subprocess
+import os
+import pty
+import tempfile
+import logging
+import time
+import socketserver
+import threading
+import queue
+
+import pytest
+
+LOCALADDR="127.0.5.5"
+
+@pytest.fixture(scope="module")
+def dropbear(request):
+ opt = request.config.option
+ if opt.remote:
+ yield None
+ return
+
+ args = [opt.dropbear,
+ "-p", LOCALADDR, # bind locally only
+ "-r", opt.hostkey,
+ "-p", opt.port,
+ "-F", "-E",
+ ]
+ p = subprocess.Popen(args, stderr=subprocess.PIPE, text=True)
+ # Wait until it has started listening
+ for l in p.stderr:
+ if "Not backgrounding" in l:
+ break
+ # Check it's still running
+ assert p.poll() is None
+ # Ready
+ yield p
+ p.terminate()
+
+def dbclient(request, *args, **kwargs):
+ opt = request.config.option
+ host = opt.remote or LOCALADDR
+ base_args = [opt.dbclient, "-y", host, "-p", opt.port]
+ if opt.user:
+ full_args.extend(['-l', opt.user])
+ full_args = base_args + list(args)
+ bg = kwargs.get("background")
+ if "background" in kwargs:
+ del kwargs["background"]
+ if bg:
+ return subprocess.Popen(full_args, **kwargs)
+ else:
+ # wait for response
+ return subprocess.run(full_args, **kwargs)
+
+class HandleTcp(socketserver.ThreadingMixIn, socketserver.TCPServer):
+ """ Listens for a single incoming request, sends a response if given,
+ and returns the inbound data.
+ Reponse can be a queue object, in which case each item in the queue will
+ be sent as a response, until it receives a None item.
+ """
+ def __init__(self, port, timeout, response=None):
+ super().__init__(('localhost', port), self.Handler)
+ self.port = port
+ self.timeout = timeout
+ self.response = response
+ self.sink = None
+
+ class Handler(socketserver.StreamRequestHandler):
+ def handle(self):
+ if isinstance(self.server.response, queue.Queue):
+ while True:
+ i = self.server.response.get()
+ if i is None:
+ break
+ self.wfile.write(i)
+ elif self.server.response:
+ self.wfile.write(self.server.response)
+ assert self.server.sink is None, ">1 request sent to handler"
+ self.server.sink = self.rfile.read()
+
+ def __enter__(self):
+ self.server_thread = threading.Thread(target=self.serve_forever)
+ self.server_thread.daemon = True
+ self.server_thread.start()
+ return self
+
+ def __exit__(self, *exc_stuff):
+ self.shutdown()
+ self.server_thread.join()
+
+ def inbound(self):
+ """ Returns the data sent to the socket """
+ return self.sink
+
+def readall_socket(sock):
+ b = []
+ while True:
+ i = sock.recv(4096)
+ if not i:
+ break
+ b.append(i)
+ return b''.join(b)
+
+# returns a str
+def random_alnum(size):
+ r = os.urandom(500 + size*5)
+ return bytes(i for i in r if bytes((i,)).isalnum())[:size].decode()
+
+