summaryrefslogtreecommitdiff
path: root/test/test_dropbear.py
blob: 05ea95341508290d2107b94d76b6c8bf55456ecc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import subprocess
import os
import pty
import tempfile
import logging
import time
import socketserver
import threading
import queue

import pytest

# Loopback-range address handed to dropbear's -p option and used as the
# default host for dbclient when no remote host is configured.
LOCALADDR = "127.0.5.5"

@pytest.fixture(scope="module")
def dropbear(request):
	""" Module-scoped fixture that runs a local dropbear server.

	Yields None when the "remote" option is set (no local server is started).
	Otherwise starts dropbear in the foreground with stderr captured, waits
	for its "Not backgrounding" message, and yields the subprocess.Popen
	object. On teardown the server is terminated and its remaining stderr
	output is printed.
	"""
	opt = request.config.option
	if opt.remote:
		yield None
		return

	args = [opt.dropbear,
		# NOTE(review): -p is given twice (address, then port). Confirm that
		# dropbear treats a bare address as a bind address rather than a port.
		"-p", LOCALADDR, # bind locally only
		"-r", opt.hostkey,
		"-p", opt.port,
		"-F", "-E",
		]
	p = subprocess.Popen(args, stderr=subprocess.PIPE, text=True)
	# Wait until it has started listening. any() stops reading at the first
	# matching line, leaving later output in the pipe for the teardown flush.
	ready = any("Not backgrounding" in line for line in p.stderr)
	if not ready:
		# stderr hit EOF without the readiness message: make sure the
		# process is gone and reaped before failing the fixture.
		p.kill()
		p.wait()
	assert ready, "dropbear exited before it started listening"
	# Check it's still running
	assert p.poll() is None, "dropbear exited right after starting"
	# Ready
	yield p
	p.terminate()
	print("Terminated dropbear. Flushing output:")
	for line in p.stderr:
		print(line.rstrip())
	# Reap the child and release the pipe so nothing leaks past the module.
	p.wait()
	p.stderr.close()
	print("Done")

def dbclient(request, *args, **kwargs):
	""" Runs dbclient against the configured host.

	The command line is: the dbclient program, "-y", the host (the "remote"
	option, or LOCALADDR when that is unset), "-p" and the port, then
	"-l <user>" when a user is configured, followed by *args.

	Keyword arguments are passed through to subprocess, except "background":
	when it is true a subprocess.Popen is returned immediately; otherwise
	subprocess.run() is used (default timeout of 10 seconds) and its
	CompletedProcess is returned.
	"""
	opt = request.config.option
	host = opt.remote or LOCALADDR
	base_args = [opt.dbclient, "-y", host, "-p", opt.port]
	if opt.user:
		# Must extend base_args: full_args does not exist yet at this point.
		base_args.extend(['-l', opt.user])
	full_args = base_args + list(args)
	# "background" is our own flag and must not reach subprocess.
	bg = kwargs.pop("background", None)
	if bg:
		return subprocess.Popen(full_args, **kwargs)
	kwargs.setdefault("timeout", 10)
	# wait for response
	return subprocess.run(full_args, **kwargs)

class HandleTcp(socketserver.ThreadingMixIn, socketserver.TCPServer):
	""" Listens for a single incoming request, sends a response if given,
	and returns the inbound data.
	Response can be a queue object, in which case each item in the queue will
	be sent as a response, until it receives a None item.

	Used as a context manager: entering starts serve_forever() in a daemon
	thread, exiting stops it and closes the listening socket.
	"""

	# server_close() should only close the listening socket, not wait for
	# handler threads that may still be blocked reading from a client.
	block_on_close = False

	def __init__(self, port, timeout, response=None):
		""" Binds to ('localhost', port).

		timeout is stored as the server's "timeout" attribute.
		response is bytes to send, a queue.Queue of bytes terminated by a
		None item, or None to send nothing.
		"""
		super().__init__(('localhost', port), self.Handler)
		self.port = port
		self.timeout = timeout
		self.response = response
		# Data received from the client; set once by the handler.
		self.sink = None

	class Handler(socketserver.StreamRequestHandler):
		def handle(self):
			""" Sends the configured response, then stores everything read
			from the connection (until EOF) in the server's sink. """
			if isinstance(self.server.response, queue.Queue):
				while True:
					i = self.server.response.get()
					if i is None:
						break
					self.wfile.write(i)
			elif self.server.response:
				self.wfile.write(self.server.response)
			assert self.server.sink is None, ">1 request sent to handler"
			self.server.sink = self.rfile.read()

	def __enter__(self):
		self.server_thread = threading.Thread(target=self.serve_forever)
		self.server_thread.daemon = True
		self.server_thread.start()
		return self

	def __exit__(self, *exc_stuff):
		self.shutdown()
		self.server_thread.join()
		# Overriding __exit__ replaces the base class's own cleanup, so the
		# listening socket has to be closed explicitly here.
		self.server_close()

	def inbound(self):
		""" Returns the data sent to the socket """
		return self.sink

def readall_socket(sock):
	""" Reads from sock in 4096-byte recv() calls until an empty result is
	returned, and returns everything received as one bytes object. """
	def _chunks():
		# Yield each non-empty recv() result; stop at the first empty one.
		while True:
			data = sock.recv(4096)
			if not data:
				return
			yield data
	return b''.join(_chunks())

# returns a str
def random_alnum(size):
	""" Returns a str of up to size ASCII alphanumeric characters, taken
	from 500 + size*5 bytes of os.urandom() with all other bytes dropped. """
	raw = os.urandom(500 + size*5)
	kept = bytearray()
	for byte in raw:
		if bytes((byte,)).isalnum():
			kept.append(byte)
	return kept[:size].decode()