summaryrefslogtreecommitdiff
path: root/plac/plac_ext.py
diff options
context:
space:
mode:
Diffstat (limited to 'plac/plac_ext.py')
-rw-r--r--plac/plac_ext.py38
1 files changed, 16 insertions, 22 deletions
diff --git a/plac/plac_ext.py b/plac/plac_ext.py
index a7596c3..4429aec 100644
--- a/plac/plac_ext.py
+++ b/plac/plac_ext.py
@@ -357,7 +357,7 @@ class MPTask(BaseTask):
return []
def notify(self, msg):
- self.man.send('notify_task %d %r' % (self.no, msg))
+ self.man.send('notify_listener %d %r' % (self.no, msg))
def __init__(self, no, arglist, genobj, manager):
"""
@@ -567,17 +567,17 @@ plac.Interpreter(plac.import_main(*%s)).interact(prompt='i>\\n')
class Monitor(object):
"""
- Base monitor class with methods add_task/del_task/notify_task
+ Base monitor class with methods add_listener/del_listener/notify_listener
and start/stop/schedule/slave.
"""
- commands = 'add_task', 'del_task', 'notify_task'
+ commands = 'add_listener', 'del_listener', 'notify_listener'
def __init__(self, name):
self.name = name
- def add_task(self, taskno):
+ def add_listener(self, taskno):
pass
- def del_task(self, taskno):
+ def del_listener(self, taskno):
pass
- def notify_task(self, taskno, msg):
+ def notify_listener(self, taskno, msg):
pass
def start(self):
pass
@@ -622,7 +622,7 @@ class SlaveProcess(object):
with Interpreter(self.mon) as i:
# .schedule() must be invoked inside the with block
self.mon.schedule(.1, self._sendline, i)
- self.mon.start()
+ self.mon.run()
class StartStopObject(object):
started = False
@@ -814,7 +814,7 @@ class Interpreter(object):
if not plac_core._match_cmd(arglist[0], self.tm.specialcommands):
self.tm.registry[task.no] = task
if m:
- m.send('add_task %d' % task.no)
+ m.send('add_listener %d' % task.no)
return task
def send(self, line):
@@ -1006,18 +1006,6 @@ class Interpreter(object):
else:
i.parser.print_usage()
- @classmethod
- def from_gen(cls, genseq, mode='p', monitors=()):
- "Returns an interpreter object able to run the tasks in the sequence"
- assert mode in 'pt', mode
- launcher = _TaskLauncher(genseq, mode)
- self = cls(launcher).__enter__()
- for mon in monitors:
- self.add_monitor(mon)
- for i in range(len(launcher.genlist)):
- self.submit('rungen %d' % (i + 1))
- return self
-
#################################### runp #####################################
class _TaskLauncher(object):
@@ -1034,12 +1022,18 @@ class _TaskLauncher(object):
for out in self.genlist[int(i) - 1]:
yield out
-def runp(genseq, mode='p', start=True):
+def runp(genseq, mode='p', monitors=(), start=True):
"""Run a sequence of generators in parallel. Mode can be 'p' (use processes)
or 't' (use threads). Return a list of running task objects. If start is
False, the tasks are only submitted and not automatically started.
"""
- inter = Interpreter.from_gen(genseq, mode)
+ assert mode in 'pt', mode
+ launcher = _TaskLauncher(genseq, mode)
+ inter = Interpreter(launcher).__enter__()
+ for mon in monitors: # must be added before submit
+ inter.add_monitor(mon)
+ for i in range(len(launcher.genlist)):
+ inter.submit('rungen %d' % (i + 1))
if start:
for task in inter.tasks():
task.run()