blob: a46535da80907b8a0cd32ddc5c16d857b7cee843 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
import multiprocessing, subprocess, time, random
import plac
from ishelve2 import ShelveInterface
# Module-level plac interpreter built around a ShelveInterface created with
# configfile=None; test() below starts and stops its server.
i = plac.Interpreter(ShelveInterface(configfile=None))

# One command script per simulated telnet client: each entry holds
# newline-terminated commands that are sent one line at a time.
# NOTE(review): 'wrong command' is presumably deliberately invalid, to
# exercise the server's error path -- confirm.
COMMANDS = [
    'help\n'
    'set a 1\n',
    'set b 1\n'
    'wrong command\n'
    'showall\n',
]
def telnet(commands, port):
    """Feed a script of commands to the server through a ``telnet`` client.

    Spawns ``telnet localhost <port>`` as a child process and writes each
    line of ``commands`` to its standard input, pausing briefly after every
    line. The child's stdin is always closed and the child is always reaped,
    even when a write fails.

    commands: a string holding one command per line.
    port: the TCP port handed to telnet (converted with ``str``).
    """
    time.sleep(.5)  # wait a bit for the server to start
    po = subprocess.Popen(['telnet', 'localhost', str(port)],
                          stdin=subprocess.PIPE)
    try:
        for cmd in commands.splitlines():
            line = cmd + '\n'
            if not isinstance(line, bytes):
                # The pipe is binary: text has to be encoded before writing.
                line = line.encode('utf-8')
            po.stdin.write(line)
            # Push the command out now; a buffered pipe would otherwise hold
            # it until close() and defeat the per-command pause below.
            po.stdin.flush()
            time.sleep(.1)  # wait a bit for the server to answer
    finally:
        try:
            po.stdin.close()
        finally:
            # Reap the telnet child so it is not left behind un-waited.
            po.wait()
def test():
    """Run the server against one telnet client process per command script.

    Picks a random port in the range 2000-19999, launches a separate process
    running ``telnet`` for every entry of ``COMMANDS``, then starts the
    interpreter's server on that port with ``timeout=.5``. Once
    ``start_server`` returns, the client processes are joined and the server
    is stopped.
    """
    port = random.randrange(2000, 20000)
    clients = [
        multiprocessing.Process(target=telnet, args=(script, port))
        for script in COMMANDS
    ]
    # Clients are launched before the server; telnet() sleeps first so the
    # server has time to come up.
    for client in clients:
        client.start()
    i.start_server(port, timeout=.5)
    for client in clients:
        client.join()
    i.stop_server()
|