From efbbee39f98003078d1d2c7856e2f2d502be0d21 Mon Sep 17 00:00:00 2001 From: Michele Simionato Date: Wed, 26 Jan 2011 09:29:53 +0100 Subject: Started work on the monitor framework for plac (and TkMonitor) --- plac/doc/test_runp.py | 26 ++++++ plac/doc/test_server.py | 18 ++-- plac/plac.py | 4 +- plac/plac_ext.py | 239 ++++++++++++++++++++++++++++++++++++++---------- plac/setup.py | 3 +- 5 files changed, 233 insertions(+), 57 deletions(-) create mode 100644 plac/doc/test_runp.py diff --git a/plac/doc/test_runp.py b/plac/doc/test_runp.py new file mode 100644 index 0000000..65980f8 --- /dev/null +++ b/plac/doc/test_runp.py @@ -0,0 +1,26 @@ +""" +This test should work on Linux if you have both Tkinter installed. +""" + +from __future__ import with_statement +import plac, plac_tk, time + +def gen(n): + for i in range(n): + yield str(i) + time.sleep(.1) + +def test(): + tasks = plac.runp([gen(3), gen(5), gen(10)]) + for t in tasks: + t.result + +def test_tkmonitor(): + mon = plac_tk.TkMonitor('tkmon') + i = plac.Interpreter.from_gen([gen(3), gen(5), gen(10)], 'p', [mon]) + with i: + for t in i.tasks(): + t.run() + for t in i.tasks(): + t.result + i.man.stop() diff --git a/plac/doc/test_server.py b/plac/doc/test_server.py index a9904ee..81214ee 100644 --- a/plac/doc/test_server.py +++ b/plac/doc/test_server.py @@ -14,22 +14,26 @@ wrong command showall '''] -def client_send(commands, port): +def telnet(commands, port): time.sleep(.5) # wait a bit for the server to start po = subprocess.Popen(['telnet', 'localhost', str(port)], stdin=subprocess.PIPE) - for cmd in commands.splitlines(): - po.stdin.write(cmd + '\n') - time.sleep(.1) # wait a bit for the server to answer + try: + for cmd in commands.splitlines(): + po.stdin.write(cmd + '\n') + time.sleep(.1) # wait a bit for the server to answer + finally: + po.stdin.close() def test(): port = random.choice(range(2000, 20000)) clients = [] for cmds in COMMANDS: - cl = multiprocessing.Process(target=client_send, args=(cmds, port)) + cl = multiprocessing.Process(target=telnet, args=(cmds, port)) clients.append(cl) cl.start() - i.stop_server(wait=1) - i.start_server(port, timeout=.1) + + i.start_server(port, timeout=.5) for cl in clients: cl.join() + i.stop_server() diff --git a/plac/plac.py b/plac/plac.py index ed6d3a9..8296500 100644 --- a/plac/plac.py +++ b/plac/plac.py @@ -27,9 +27,9 @@ See doc/plac.pdf, doc/plac_adv.pdf for the documentation. """ -__version__ = '0.7.6' +__version__ = '0.8.0' from plac_core import * if sys.version >= '2.5': - from plac_ext import Interpreter, import_main, ReadlineInput, stdout, runp + from plac_ext import Interpreter, import_main, ReadlineInput, stdout, runp, Manager diff --git a/plac/plac_ext.py b/plac/plac_ext.py index 6678477..a7596c3 100644 --- a/plac/plac_ext.py +++ b/plac/plac_ext.py @@ -220,7 +220,10 @@ class BaseTask(object): self.str, self.etype, self.exc, self.tb = '', None, None, None self.status = 'SUBMITTED' self.outlist = [] - + + def notify(self, msg): + "Notifies the underlying monitor. To be implemented" + def _wrap(self, genobj, stringify_tb=False): """ Wrap the genobj into a generator managing the exceptions, @@ -234,6 +237,7 @@ class BaseTask(object): raise GeneratorExit if value is not None: # add output self.outlist.append(value) + self.notify(str(value)) yield except (GeneratorExit, TerminatedProcess, KeyboardInterrupt): # soft termination @@ -352,13 +356,19 @@ class MPTask(BaseTask): except: # the process died hard return [] - def __init__(self, no, arglist, genobj, mp_manager): + def notify(self, msg): + self.man.send('notify_task %d %r' % (self.no, msg)) + + def __init__(self, no, arglist, genobj, manager): + """ + The monitor has a .send method and a .man multiprocessing.Manager + """ self.no = no self.arglist = arglist self._genobj = self._wrap(genobj, stringify_tb=True) - self.mp_manager = mp_manager - self._outlist = self.mp_manager.list() - self.ns = self.mp_manager.Namespace() + self.man = manager + self._outlist = manager.mp.list() + self.ns = manager.mp.Namespace() self.status = 'SUBMITTED' self.etype, self.exc, self.tb = None, None, None self.str = repr(self) @@ -381,6 +391,7 @@ class MPTask(BaseTask): class HelpSummary(object): "Build the help summary consistently with the cmd module" + @classmethod def make(cls, obj, specialcommands): c = cmd.Cmd(stdout=cls()) @@ -394,10 +405,13 @@ class HelpSummary(object): c.print_topics('threaded commands', sorted(obj.thcommands), 15, 80) return c.stdout + def __init__(self): self._ls = [] + def write(self, s): self._ls.append(s) + def __str__(self): return ''.join(self._ls) @@ -415,7 +429,7 @@ class TaskManager(object): if obj.mpcommands or obj.thcommands: self.specialcommands.update(['.kill', '.list', '.output']) self.helpsummary = HelpSummary.make(obj, self.specialcommands) - self.mp_manager = multiprocessing.Manager() if obj.mpcommands else None + self.man = Manager() if obj.mpcommands else None signal.signal(signal.SIGTERM, terminatedProcess) def close(self): @@ -427,8 +441,8 @@ class TaskManager(object): task.wait() except: # task killed, nothing to wait pass - if self.mp_manager: - self.mp_manager.shutdown() + if self.man: + self.man.stop() def _get_latest(self, taskno=-1, status=None): "Get the latest submitted task from the registry" @@ -519,17 +533,22 @@ class Process(subprocess.Popen): "Start the interpreter specified by the params in a subprocess" def __init__(self, params): - code = '''import plac + signal.signal(signal.SIGPIPE, signal.SIG_DFL) + # to avoid broken pipe messages + code = '''import plac, sys +sys.argv[0] = '<%s>' plac.Interpreter(plac.import_main(*%s)).interact(prompt='i>\\n') -''' % params +''' % (params[0], params) subprocess.Popen.__init__( self, [sys.executable, '-u', '-c', code], stdin=subprocess.PIPE, stdout=subprocess.PIPE) + self.man = multiprocessing.Manager() def close(self): "Close stdin and stdout" self.stdin.close() self.stdout.close() + self.man.shutdown() def recv(self): # char-by-char cannot work "Return the output of the subprocess, line-by-line until the prompt" @@ -546,6 +565,111 @@ plac.Interpreter(plac.import_main(*%s)).interact(prompt='i>\\n') self.stdin.write(line + os.linesep) return self.recv() +class Monitor(object): + """ + Base monitor class with methods add_task/del_task/notify_task + and start/stop/schedule/slave. + """ + commands = 'add_task', 'del_task', 'notify_task' + def __init__(self, name): + self.name = name + def add_task(self, taskno): + pass + def del_task(self, taskno): + pass + def notify_task(self, taskno, msg): + pass + def start(self): + pass + def stop(self): + pass + def schedule(self, seconds, display, arg): + pass + +import Queue + +class SlaveProcess(object): + """ + Spawn a slave process reading from an input queue and displaying + on a monitor object. Methods are start/send/stop. + """ + def __init__(self, mon): + self.mon= mon + self.queue = multiprocessing.Queue() + self.proc = multiprocessing.Process(None, self._run) + + def start(self): + self.proc.start() + + def send(self, line): + self.queue.put(line) + + def stop(self): + self.queue.close() + self.proc.terminate() + + def _sendline(self, i): + "Send a line to the underlying monitor" + try: + line = self.queue.get_nowait() + except Queue.Empty: + pass + else: + i.send(line) + self.mon.schedule(.1, self._sendline, i) + + def _run(self): + with Interpreter(self.mon) as i: + # .schedule() must be invoked inside the with block + self.mon.schedule(.1, self._sendline, i) + self.mon.start() + +class StartStopObject(object): + started = False + def start(self): pass + def stop(self): pass + +class Manager(StartStopObject): + """ + The plac Manager contains a multiprocessing.Manager and a set + of slave monitor processes to which we can send commands. There + is a manager for each interpreter with mpcommands. + """ + def add(self, monitor): + 'Add or replace a monitor in the registry' + slave = SlaveProcess(monitor) + name = slave.name = monitor.name + self.registry[name] = slave + + def delete(self, name): + 'Remove a named monitor from the registry' + del self.registry[name] + + def __init__(self): + self.registry = {} + self.started = False + self.mp = None + + # can be called more than once + def start(self): + if self.mp is None: + self.mp = multiprocessing.Manager() + for slave in self.registry.itervalues(): + slave.start() + self.started = True + + def stop(self): + for slave in self.registry.itervalues(): + slave.stop() + if self.mp: + self.mp.shutdown() + self.mp = None + self.started = False + + def send(self, line): + for slave in self.registry.itervalues(): + slave.send(line) + ########################## plac server ############################## import asyncore, asynchat, socket @@ -621,29 +745,6 @@ class Interpreter(object): A context manager with a .send method and a few utility methods: execute, test and doctest. """ - - @classmethod - def call(cls, factory, arglist=sys.argv[1:], - commentchar='#', split=shlex.split, - stdin=sys.stdin, prompt='i> ', verbose=False): - """ - Call a container factory with the arglist and instantiate an - interpreter object. If there are remaining arguments, send them to the - interpreter, else start an interactive session. - """ - obj = partial_call(factory, arglist) - i = cls(obj, commentchar, split) - if i.obj._args_: - with i: - task = i.send(i.obj._args_) # synchronous - if task.exc: - raise task.etype, task.exc, task.tb - print(task) - elif i.obj._interact_: - i.interact(stdin, prompt, verbose) - else: - i.parser.print_usage() - def __init__(self, obj, commentchar='#', split=shlex.split): self.obj = obj try: @@ -654,17 +755,18 @@ class Interpreter(object): self.split = split self._set_commands(obj) self.tm = TaskManager(obj) + self.man = self.tm.man self.parser = plac_core.parser_from(obj, prog='', add_help=False) if self.commands: self.commands.update(self.tm.specialcommands) - self.parser.addsubcommands(self.tm.specialcommands, self.tm, - title='special commands') + self.parser.addsubcommands( + self.tm.specialcommands, self.tm, title='special commands') if obj.mpcommands: - self.parser.addsubcommands(obj.mpcommands, obj, - title='commands run in external processes') + self.parser.addsubcommands( + obj.mpcommands, obj, title='commands run in external processes') if obj.thcommands: - self.parser.addsubcommands(obj.thcommands, obj, - title='threaded commands') + self.parser.addsubcommands( + obj.thcommands, obj, title='threaded commands') self.parser.error = lambda msg: sys.exit(msg) # patch the parser self._interpreter = None @@ -705,9 +807,14 @@ class Interpreter(object): arglist = line if not arglist: return nulltask + m = self.tm.man # manager + if m and not m.started: + m.start() task = self._interpreter.send(arglist) # nonblocking if not plac_core._match_cmd(arglist[0], self.tm.specialcommands): self.tm.registry[task.no] = task + if m: + m.send('add_task %d' % task.no) return task def send(self, line): @@ -745,7 +852,7 @@ class Interpreter(object): if not plac_core.iterable(result): # atomic result task = SynTask(no, arglist, gen_val(result)) elif cmd in self.obj.mpcommands: - task = MPTask(no, arglist, result, self.tm.mp_manager) + task = MPTask(no, arglist, result, self.tm.man) elif cmd in self.obj.thcommands: task = ThreadedTask(no, arglist, result) else: # blocking task @@ -867,20 +974,62 @@ class Interpreter(object): finally: asyncore.close_all() - def stop_server(self, wait=0.0): + def stop_server(self, after=0.0): "Stops the asyncore server, possibly after a given number of seconds" - threading.Timer(wait, asyncore.socket_map.clear).start() + threading.Timer(after, asyncore.socket_map.clear).start() + + def add_monitor(self, mon): + self.man.add(mon) + + def del_monitor(self, name): + self.man.delete(name) + + @classmethod + def call(cls, factory, arglist=sys.argv[1:], + commentchar='#', split=shlex.split, + stdin=sys.stdin, prompt='i> ', verbose=False): + """ + Call a container factory with the arglist and instantiate an + interpreter object. If there are remaining arguments, send them to the + interpreter, else start an interactive session. + """ + obj = partial_call(factory, arglist) + i = cls(obj, commentchar, split) + if i.obj._args_: + with i: + task = i.send(i.obj._args_) # synchronous + if task.exc: + raise task.etype, task.exc, task.tb + print(task) + elif i.obj._interact_: + i.interact(stdin, prompt, verbose) + else: + i.parser.print_usage() + + @classmethod + def from_gen(cls, genseq, mode='p', monitors=()): + "Returns an interpreter object able to run the tasks in the sequence" + assert mode in 'pt', mode + launcher = _TaskLauncher(genseq, mode) + self = cls(launcher).__enter__() + for mon in monitors: + self.add_monitor(mon) + for i in range(len(launcher.genlist)): + self.submit('rungen %d' % (i + 1)) + return self #################################### runp ##################################### class _TaskLauncher(object): "Helper for runp" + def __init__(self, genseq, mode): if mode == 'p': self.mpcommands = ['rungen'] else: self.thcommands = ['rungen'] self.genlist = list(genseq) + def rungen(self, i): for out in self.genlist[int(i) - 1]: yield out @@ -890,11 +1039,7 @@ def runp(genseq, mode='p', start=True): or 't' (use threads). Return a list of running task objects. If start is False, the tasks are only submitted and not automatically started. """ - assert mode in 'pt', mode - launcher = _TaskLauncher(genseq, mode) - inter = Interpreter(launcher).__enter__() - for i in range(len(launcher.genlist)): - inter.submit('rungen %d' % (i + 1)) + inter = Interpreter.from_gen(genseq, mode) if start: for task in inter.tasks(): task.run() diff --git a/plac/setup.py b/plac/setup.py index aafe43d..006786a 100644 --- a/plac/setup.py +++ b/plac/setup.py @@ -32,7 +32,7 @@ if __name__ == '__main__': author_email='michele.simionato@gmail.com', url='http://pypi.python.org/pypi/plac', license="BSD License", - py_modules = ['plac_core', 'plac_ext', 'plac'], + py_modules = ['plac_core', 'plac_ext', 'plac_tk', 'plac'], scripts = ['plac_runner.py'], install_requires=require('argparse', 'multiprocessing'), use_2to3=True, @@ -44,6 +44,7 @@ if __name__ == '__main__': 'Natural Language :: English', 'Operating System :: OS Independent', 'Programming Language :: Python', + 'Programming Language :: Python :: 3', 'Topic :: Software Development :: Libraries', 'Topic :: Utilities'], zip_safe=False) -- cgit v1.2.1