1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
import os, cmd, config
from ms.file_utils import ifiles
from ms.concurrency import Popen
from operator import attrgetter
import sys, re
def strip_number(song, rx=re.compile(r"\s*\d\d?\. ")):
return rx.sub("", os.path.basename(song))
class TestProcess(object):
def __init__(self, number, name, popencmd):
self.name = name
self.number = number
self.popencmd = popencmd
def start(self):
self.proc = Popen(self.popencmd)
def stop(self):
if self.is_running():
self.proc.kill()
def is_running(self):
return hasattr(self, "proc") and self.proc.is_running()
def popencmd(song):
if sys.platform == "win32":
return "mplay32", "/play", "/close", song
else:
return "xterm", "-geometry", "70x14", "-e", "mpg123", song
def str2int(args):
for arg in args.split():
try:
i = int(arg)
except ValueError:
print "Warning: argument %s is not an integer, ignored" % arg
else:
yield i
class Tester(cmd.Cmd):
use_rawinput = False
prompt = ">>" + chr(0)
def preloop(self):
self.songs = list(ifiles(config.MUSICDIR,
lambda f: f.endswith(".mp3")))
self.tests = dict(
[i, TestProcess(i, strip_number(song), popencmd(song))]
for i, song in enumerate(self.songs))
self.started_tests = set()
self.do_show()
def running_tests(self):
for test in self.started_tests:
if test.is_running():
yield test
def run_test(self, testnumber):
if testnumber in self.tests:
test = self.tests[testnumber]
if test.is_running():
print "Test #%s is already running!" % test.number
else:
test.start()
self.started_tests.add(test)
print "test #%s started" % test.number
else:
print "There is no test #%s" % testnumber
def stop_test(self, testnumber):
if testnumber in (test.number for test in self.running_tests()):
self.tests[testnumber].stop()
print "Test #%s stopped." % testnumber
else:
print "There is no process #%s running" % testnumber
def do_show(self, arg=""):
if arg == "runnable":
self.show_runnable()
elif arg == "already_run":
self.show_already_run()
elif arg is "":
self.show_already_run()
self.show_runnable()
else:
print "Unknown argument %s" % arg
def show_runnable(self):
print "Runnable tests:"
if set(self.tests.itervalues()) != self.started_tests:
for test in self.tests.itervalues():
if not test in self.started_tests:
print test.number, test.name
else: # all tests have been started
print "None"
def show_already_run(self):
print "Already run tests:"
if not self.started_tests:
print "None"
else:
for test in sorted(self.started_tests, key=attrgetter("number")):
print test.number, test.name
def do_run(self, args):
run = map(self.run_test, str2int(args))
if not run: map(self.run_test, range(len(self.tests))) # run all
def do_stop(self, args):
stop = map(self.stop_test, str2int(args))
if not stop: self.postloop() # stop all
def do_quit(self, arg):
print "exiting ..."
return True
def postloop(self):
for test in self.running_tests():
self.stop_test(test.number)
if __name__ == "__main__":
Tester().cmdloop()
|