diff options
Diffstat (limited to 'plac/plac_ext.py')
-rw-r--r-- | plac/plac_ext.py | 239 |
1 files changed, 192 insertions, 47 deletions
diff --git a/plac/plac_ext.py b/plac/plac_ext.py index 6678477..a7596c3 100644 --- a/plac/plac_ext.py +++ b/plac/plac_ext.py @@ -220,7 +220,10 @@ class BaseTask(object): self.str, self.etype, self.exc, self.tb = '', None, None, None self.status = 'SUBMITTED' self.outlist = [] - + + def notify(self, msg): + "Notifies the underlying monitor. To be implemented" + def _wrap(self, genobj, stringify_tb=False): """ Wrap the genobj into a generator managing the exceptions, @@ -234,6 +237,7 @@ class BaseTask(object): raise GeneratorExit if value is not None: # add output self.outlist.append(value) + self.notify(str(value)) yield except (GeneratorExit, TerminatedProcess, KeyboardInterrupt): # soft termination @@ -352,13 +356,19 @@ class MPTask(BaseTask): except: # the process died hard return [] - def __init__(self, no, arglist, genobj, mp_manager): + def notify(self, msg): + self.man.send('notify_task %d %r' % (self.no, msg)) + + def __init__(self, no, arglist, genobj, manager): + """ + The monitor has a .send method and a .man multiprocessing.Manager + """ self.no = no self.arglist = arglist self._genobj = self._wrap(genobj, stringify_tb=True) - self.mp_manager = mp_manager - self._outlist = self.mp_manager.list() - self.ns = self.mp_manager.Namespace() + self.man = manager + self._outlist = manager.mp.list() + self.ns = manager.mp.Namespace() self.status = 'SUBMITTED' self.etype, self.exc, self.tb = None, None, None self.str = repr(self) @@ -381,6 +391,7 @@ class MPTask(BaseTask): class HelpSummary(object): "Build the help summary consistently with the cmd module" + @classmethod def make(cls, obj, specialcommands): c = cmd.Cmd(stdout=cls()) @@ -394,10 +405,13 @@ class HelpSummary(object): c.print_topics('threaded commands', sorted(obj.thcommands), 15, 80) return c.stdout + def __init__(self): self._ls = [] + def write(self, s): self._ls.append(s) + def __str__(self): return ''.join(self._ls) @@ -415,7 +429,7 @@ class TaskManager(object): if obj.mpcommands or obj.thcommands: self.specialcommands.update(['.kill', '.list', '.output']) self.helpsummary = HelpSummary.make(obj, self.specialcommands) - self.mp_manager = multiprocessing.Manager() if obj.mpcommands else None + self.man = Manager() if obj.mpcommands else None signal.signal(signal.SIGTERM, terminatedProcess) def close(self): @@ -427,8 +441,8 @@ class TaskManager(object): task.wait() except: # task killed, nothing to wait pass - if self.mp_manager: - self.mp_manager.shutdown() + if self.man: + self.man.stop() def _get_latest(self, taskno=-1, status=None): "Get the latest submitted task from the registry" @@ -519,17 +533,22 @@ class Process(subprocess.Popen): "Start the interpreter specified by the params in a subprocess" def __init__(self, params): - code = '''import plac + signal.signal(signal.SIGPIPE, signal.SIG_DFL) + # to avoid broken pipe messages + code = '''import plac, sys +sys.argv[0] = '<%s>' plac.Interpreter(plac.import_main(*%s)).interact(prompt='i>\\n') -''' % params +''' % (params[0], params) subprocess.Popen.__init__( self, [sys.executable, '-u', '-c', code], stdin=subprocess.PIPE, stdout=subprocess.PIPE) + self.man = multiprocessing.Manager() def close(self): "Close stdin and stdout" self.stdin.close() self.stdout.close() + self.man.shutdown() def recv(self): # char-by-char cannot work "Return the output of the subprocess, line-by-line until the prompt" @@ -546,6 +565,111 @@ plac.Interpreter(plac.import_main(*%s)).interact(prompt='i>\\n') self.stdin.write(line + os.linesep) return self.recv() +class Monitor(object): + """ + Base monitor class with methods add_task/del_task/notify_task + and start/stop/schedule/slave. + """ + commands = 'add_task', 'del_task', 'notify_task' + def __init__(self, name): + self.name = name + def add_task(self, taskno): + pass + def del_task(self, taskno): + pass + def notify_task(self, taskno, msg): + pass + def start(self): + pass + def stop(self): + pass + def schedule(self, seconds, display, arg): + pass + +import Queue + +class SlaveProcess(object): + """ + Spawn a slave process reading from an input queue and displaying + on a monitor object. Methods are start/send/stop. + """ + def __init__(self, mon): + self.mon= mon + self.queue = multiprocessing.Queue() + self.proc = multiprocessing.Process(None, self._run) + + def start(self): + self.proc.start() + + def send(self, line): + self.queue.put(line) + + def stop(self): + self.queue.close() + self.proc.terminate() + + def _sendline(self, i): + "Send a line to the underlying monitor" + try: + line = self.queue.get_nowait() + except Queue.Empty: + pass + else: + i.send(line) + self.mon.schedule(.1, self._sendline, i) + + def _run(self): + with Interpreter(self.mon) as i: + # .schedule() must be invoked inside the with block + self.mon.schedule(.1, self._sendline, i) + self.mon.start() + +class StartStopObject(object): + started = False + def start(self): pass + def stop(self): pass + +class Manager(StartStopObject): + """ + The plac Manager contains a multiprocessing.Manager and a set + of slave monitor processes to which we can send commands. There + is a manager for each interpreter with mpcommands. + """ + def add(self, monitor): + 'Add or replace a monitor in the registry' + slave = SlaveProcess(monitor) + name = slave.name = monitor.name + self.registry[name] = slave + + def delete(self, name): + 'Remove a named monitor from the registry' + del self.registry[name] + + def __init__(self): + self.registry = {} + self.started = False + self.mp = None + + # can be called more than once + def start(self): + if self.mp is None: + self.mp = multiprocessing.Manager() + for slave in self.registry.itervalues(): + slave.start() + self.started = True + + def stop(self): + for slave in self.registry.itervalues(): + slave.stop() + if self.mp: + self.mp.shutdown() + self.mp = None + self.started = False + + def send(self, line): + for slave in self.registry.itervalues(): + slave.send(line) + ########################## plac server ############################## import asyncore, asynchat, socket @@ -621,29 +745,6 @@ class Interpreter(object): A context manager with a .send method and a few utility methods: execute, test and doctest. """ - - @classmethod - def call(cls, factory, arglist=sys.argv[1:], - commentchar='#', split=shlex.split, - stdin=sys.stdin, prompt='i> ', verbose=False): - """ - Call a container factory with the arglist and instantiate an - interpreter object. If there are remaining arguments, send them to the - interpreter, else start an interactive session. - """ - obj = partial_call(factory, arglist) - i = cls(obj, commentchar, split) - if i.obj._args_: - with i: - task = i.send(i.obj._args_) # synchronous - if task.exc: - raise task.etype, task.exc, task.tb - print(task) - elif i.obj._interact_: - i.interact(stdin, prompt, verbose) - else: - i.parser.print_usage() - def __init__(self, obj, commentchar='#', split=shlex.split): self.obj = obj try: @@ -654,17 +755,18 @@ class Interpreter(object): self.split = split self._set_commands(obj) self.tm = TaskManager(obj) + self.man = self.tm.man self.parser = plac_core.parser_from(obj, prog='', add_help=False) if self.commands: self.commands.update(self.tm.specialcommands) - self.parser.addsubcommands(self.tm.specialcommands, self.tm, - title='special commands') + self.parser.addsubcommands( + self.tm.specialcommands, self.tm, title='special commands') if obj.mpcommands: - self.parser.addsubcommands(obj.mpcommands, obj, - title='commands run in external processes') + self.parser.addsubcommands( + obj.mpcommands, obj, title='commands run in external processes') if obj.thcommands: - self.parser.addsubcommands(obj.thcommands, obj, - title='threaded commands') + self.parser.addsubcommands( + obj.thcommands, obj, title='threaded commands') self.parser.error = lambda msg: sys.exit(msg) # patch the parser self._interpreter = None @@ -705,9 +807,14 @@ class Interpreter(object): arglist = line if not arglist: return nulltask + m = self.tm.man # manager + if m and not m.started: + m.start() task = self._interpreter.send(arglist) # nonblocking if not plac_core._match_cmd(arglist[0], self.tm.specialcommands): self.tm.registry[task.no] = task + if m: + m.send('add_task %d' % task.no) return task def send(self, line): @@ -745,7 +852,7 @@ class Interpreter(object): if not plac_core.iterable(result): # atomic result task = SynTask(no, arglist, gen_val(result)) elif cmd in self.obj.mpcommands: - task = MPTask(no, arglist, result, self.tm.mp_manager) + task = MPTask(no, arglist, result, self.tm.man) elif cmd in self.obj.thcommands: task = ThreadedTask(no, arglist, result) else: # blocking task @@ -867,20 +974,62 @@ class Interpreter(object): finally: asyncore.close_all() - def stop_server(self, wait=0.0): + def stop_server(self, after=0.0): "Stops the asyncore server, possibly after a given number of seconds" - threading.Timer(wait, asyncore.socket_map.clear).start() + threading.Timer(after, asyncore.socket_map.clear).start() + + def add_monitor(self, mon): + self.man.add(mon) + + def del_monitor(self, name): + self.man.delete(name) + + @classmethod + def call(cls, factory, arglist=sys.argv[1:], + commentchar='#', split=shlex.split, + stdin=sys.stdin, prompt='i> ', verbose=False): + """ + Call a container factory with the arglist and instantiate an + interpreter object. If there are remaining arguments, send them to the + interpreter, else start an interactive session. + """ + obj = partial_call(factory, arglist) + i = cls(obj, commentchar, split) + if i.obj._args_: + with i: + task = i.send(i.obj._args_) # synchronous + if task.exc: + raise task.etype, task.exc, task.tb + print(task) + elif i.obj._interact_: + i.interact(stdin, prompt, verbose) + else: + i.parser.print_usage() + + @classmethod + def from_gen(cls, genseq, mode='p', monitors=()): + "Returns an interpreter object able to run the tasks in the sequence" + assert mode in 'pt', mode + launcher = _TaskLauncher(genseq, mode) + self = cls(launcher).__enter__() + for mon in monitors: + self.add_monitor(mon) + for i in range(len(launcher.genlist)): + self.submit('rungen %d' % (i + 1)) + return self #################################### runp ##################################### class _TaskLauncher(object): "Helper for runp" + def __init__(self, genseq, mode): if mode == 'p': self.mpcommands = ['rungen'] else: self.thcommands = ['rungen'] self.genlist = list(genseq) + def rungen(self, i): for out in self.genlist[int(i) - 1]: yield out @@ -890,11 +1039,7 @@ def runp(genseq, mode='p', start=True): or 't' (use threads). Return a list of running task objects. If start is False, the tasks are only submitted and not automatically started. """ - assert mode in 'pt', mode - launcher = _TaskLauncher(genseq, mode) - inter = Interpreter(launcher).__enter__() - for i in range(len(launcher.genlist)): - inter.submit('rungen %d' % (i + 1)) + inter = Interpreter.from_gen(genseq, mode) if start: for task in inter.tasks(): task.run() |