diff options
Diffstat (limited to 'plac/plac_ext.py')
-rw-r--r-- | plac/plac_ext.py | 38 |
1 files changed, 16 insertions, 22 deletions
diff --git a/plac/plac_ext.py b/plac/plac_ext.py index a7596c3..4429aec 100644 --- a/plac/plac_ext.py +++ b/plac/plac_ext.py @@ -357,7 +357,7 @@ class MPTask(BaseTask): return [] def notify(self, msg): - self.man.send('notify_task %d %r' % (self.no, msg)) + self.man.send('notify_listener %d %r' % (self.no, msg)) def __init__(self, no, arglist, genobj, manager): """ @@ -567,17 +567,17 @@ plac.Interpreter(plac.import_main(*%s)).interact(prompt='i>\\n') class Monitor(object): """ - Base monitor class with methods add_task/del_task/notify_task + Base monitor class with methods add_listener/del_listener/notify_listener and start/stop/schedule/slave. """ - commands = 'add_task', 'del_task', 'notify_task' + commands = 'add_listener', 'del_listener', 'notify_listener' def __init__(self, name): self.name = name - def add_task(self, taskno): + def add_listener(self, taskno): pass - def del_task(self, taskno): + def del_listener(self, taskno): pass - def notify_task(self, taskno, msg): + def notify_listener(self, taskno, msg): pass def start(self): pass @@ -622,7 +622,7 @@ class SlaveProcess(object): with Interpreter(self.mon) as i: # .schedule() must be invoked inside the with block self.mon.schedule(.1, self._sendline, i) - self.mon.start() + self.mon.run() class StartStopObject(object): started = False @@ -814,7 +814,7 @@ class Interpreter(object): if not plac_core._match_cmd(arglist[0], self.tm.specialcommands): self.tm.registry[task.no] = task if m: - m.send('add_task %d' % task.no) + m.send('add_listener %d' % task.no) return task def send(self, line): @@ -1006,18 +1006,6 @@ class Interpreter(object): else: i.parser.print_usage() - @classmethod - def from_gen(cls, genseq, mode='p', monitors=()): - "Returns an interpreter object able to run the tasks in the sequence" - assert mode in 'pt', mode - launcher = _TaskLauncher(genseq, mode) - self = cls(launcher).__enter__() - for mon in monitors: - self.add_monitor(mon) - for i in range(len(launcher.genlist)): - self.submit('rungen %d' % (i + 1)) - return self - #################################### runp ##################################### class _TaskLauncher(object): @@ -1034,12 +1022,18 @@ class _TaskLauncher(object): for out in self.genlist[int(i) - 1]: yield out -def runp(genseq, mode='p', start=True): +def runp(genseq, mode='p', monitors=(), start=True): """Run a sequence of generators in parallel. Mode can be 'p' (use processes) or 't' (use threads). Return a list of running task objects. If start is False, the tasks are only submitted and not automatically started. """ - inter = Interpreter.from_gen(genseq, mode) + assert mode in 'pt', mode + launcher = _TaskLauncher(genseq, mode) + inter = Interpreter(launcher).__enter__() + for mon in monitors: # must be added before submit + inter.add_monitor(mon) + for i in range(len(launcher.genlist)): + inter.submit('rungen %d' % (i + 1)) if start: for task in inter.tasks(): task.run() |