summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichele Simionato <michele.simionato@gmail.com>2011-01-26 09:29:53 +0100
committerMichele Simionato <michele.simionato@gmail.com>2011-01-26 09:29:53 +0100
commitefbbee39f98003078d1d2c7856e2f2d502be0d21 (patch)
treee286a472a5d5c2e28f34ab3e99b90f6ce8dba3b6
parent88ace13b4700d03ed4437e5e727baff8e4f676bf (diff)
downloadmicheles-efbbee39f98003078d1d2c7856e2f2d502be0d21.tar.gz
Started work on the monitor framework for plac (and TkMonitor)
-rw-r--r--plac/doc/test_runp.py26
-rw-r--r--plac/doc/test_server.py18
-rw-r--r--plac/plac.py4
-rw-r--r--plac/plac_ext.py239
-rw-r--r--plac/setup.py3
5 files changed, 233 insertions, 57 deletions
diff --git a/plac/doc/test_runp.py b/plac/doc/test_runp.py
new file mode 100644
index 0000000..65980f8
--- /dev/null
+++ b/plac/doc/test_runp.py
@@ -0,0 +1,26 @@
+"""
+This test should work on Linux if you have both Tkinter installed.
+"""
+
+from __future__ import with_statement
+import plac, plac_tk, time
+
+def gen(n):
+ for i in range(n):
+ yield str(i)
+ time.sleep(.1)
+
+def test():
+ tasks = plac.runp([gen(3), gen(5), gen(10)])
+ for t in tasks:
+ t.result
+
+def test_tkmonitor():
+ mon = plac_tk.TkMonitor('tkmon')
+ i = plac.Interpreter.from_gen([gen(3), gen(5), gen(10)], 'p', [mon])
+ with i:
+ for t in i.tasks():
+ t.run()
+ for t in i.tasks():
+ t.result
+ i.man.stop()
diff --git a/plac/doc/test_server.py b/plac/doc/test_server.py
index a9904ee..81214ee 100644
--- a/plac/doc/test_server.py
+++ b/plac/doc/test_server.py
@@ -14,22 +14,26 @@ wrong command
showall
''']
-def client_send(commands, port):
+def telnet(commands, port):
time.sleep(.5) # wait a bit for the server to start
po = subprocess.Popen(['telnet', 'localhost', str(port)],
stdin=subprocess.PIPE)
- for cmd in commands.splitlines():
- po.stdin.write(cmd + '\n')
- time.sleep(.1) # wait a bit for the server to answer
+ try:
+ for cmd in commands.splitlines():
+ po.stdin.write(cmd + '\n')
+ time.sleep(.1) # wait a bit for the server to answer
+ finally:
+ po.stdin.close()
def test():
port = random.choice(range(2000, 20000))
clients = []
for cmds in COMMANDS:
- cl = multiprocessing.Process(target=client_send, args=(cmds, port))
+ cl = multiprocessing.Process(target=telnet, args=(cmds, port))
clients.append(cl)
cl.start()
- i.stop_server(wait=1)
- i.start_server(port, timeout=.1)
+
+ i.start_server(port, timeout=.5)
for cl in clients:
cl.join()
+ i.stop_server()
diff --git a/plac/plac.py b/plac/plac.py
index ed6d3a9..8296500 100644
--- a/plac/plac.py
+++ b/plac/plac.py
@@ -27,9 +27,9 @@
See doc/plac.pdf, doc/plac_adv.pdf for the documentation.
"""
-__version__ = '0.7.6'
+__version__ = '0.8.0'
from plac_core import *
if sys.version >= '2.5':
- from plac_ext import Interpreter, import_main, ReadlineInput, stdout, runp
+ from plac_ext import Interpreter, import_main, ReadlineInput, stdout, runp, Manager
diff --git a/plac/plac_ext.py b/plac/plac_ext.py
index 6678477..a7596c3 100644
--- a/plac/plac_ext.py
+++ b/plac/plac_ext.py
@@ -220,7 +220,10 @@ class BaseTask(object):
self.str, self.etype, self.exc, self.tb = '', None, None, None
self.status = 'SUBMITTED'
self.outlist = []
-
+
+ def notify(self, msg):
+ "Notifies the underlying monitor. To be implemented"
+
def _wrap(self, genobj, stringify_tb=False):
"""
Wrap the genobj into a generator managing the exceptions,
@@ -234,6 +237,7 @@ class BaseTask(object):
raise GeneratorExit
if value is not None: # add output
self.outlist.append(value)
+ self.notify(str(value))
yield
except (GeneratorExit, TerminatedProcess, KeyboardInterrupt):
# soft termination
@@ -352,13 +356,19 @@ class MPTask(BaseTask):
except: # the process died hard
return []
- def __init__(self, no, arglist, genobj, mp_manager):
+ def notify(self, msg):
+ self.man.send('notify_task %d %r' % (self.no, msg))
+
+ def __init__(self, no, arglist, genobj, manager):
+ """
+ The monitor has a .send method and a .man multiprocessing.Manager
+ """
self.no = no
self.arglist = arglist
self._genobj = self._wrap(genobj, stringify_tb=True)
- self.mp_manager = mp_manager
- self._outlist = self.mp_manager.list()
- self.ns = self.mp_manager.Namespace()
+ self.man = manager
+ self._outlist = manager.mp.list()
+ self.ns = manager.mp.Namespace()
self.status = 'SUBMITTED'
self.etype, self.exc, self.tb = None, None, None
self.str = repr(self)
@@ -381,6 +391,7 @@ class MPTask(BaseTask):
class HelpSummary(object):
"Build the help summary consistently with the cmd module"
+
@classmethod
def make(cls, obj, specialcommands):
c = cmd.Cmd(stdout=cls())
@@ -394,10 +405,13 @@ class HelpSummary(object):
c.print_topics('threaded commands',
sorted(obj.thcommands), 15, 80)
return c.stdout
+
def __init__(self):
self._ls = []
+
def write(self, s):
self._ls.append(s)
+
def __str__(self):
return ''.join(self._ls)
@@ -415,7 +429,7 @@ class TaskManager(object):
if obj.mpcommands or obj.thcommands:
self.specialcommands.update(['.kill', '.list', '.output'])
self.helpsummary = HelpSummary.make(obj, self.specialcommands)
- self.mp_manager = multiprocessing.Manager() if obj.mpcommands else None
+ self.man = Manager() if obj.mpcommands else None
signal.signal(signal.SIGTERM, terminatedProcess)
def close(self):
@@ -427,8 +441,8 @@ class TaskManager(object):
task.wait()
except: # task killed, nothing to wait
pass
- if self.mp_manager:
- self.mp_manager.shutdown()
+ if self.man:
+ self.man.stop()
def _get_latest(self, taskno=-1, status=None):
"Get the latest submitted task from the registry"
@@ -519,17 +533,22 @@ class Process(subprocess.Popen):
"Start the interpreter specified by the params in a subprocess"
def __init__(self, params):
- code = '''import plac
+ signal.signal(signal.SIGPIPE, signal.SIG_DFL)
+ # to avoid broken pipe messages
+ code = '''import plac, sys
+sys.argv[0] = '<%s>'
plac.Interpreter(plac.import_main(*%s)).interact(prompt='i>\\n')
-''' % params
+''' % (params[0], params)
subprocess.Popen.__init__(
self, [sys.executable, '-u', '-c', code],
stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+ self.man = multiprocessing.Manager()
def close(self):
"Close stdin and stdout"
self.stdin.close()
self.stdout.close()
+ self.man.shutdown()
def recv(self): # char-by-char cannot work
"Return the output of the subprocess, line-by-line until the prompt"
@@ -546,6 +565,111 @@ plac.Interpreter(plac.import_main(*%s)).interact(prompt='i>\\n')
self.stdin.write(line + os.linesep)
return self.recv()
+class Monitor(object):
+ """
+ Base monitor class with methods add_task/del_task/notify_task
+ and start/stop/schedule/slave.
+ """
+ commands = 'add_task', 'del_task', 'notify_task'
+ def __init__(self, name):
+ self.name = name
+ def add_task(self, taskno):
+ pass
+ def del_task(self, taskno):
+ pass
+ def notify_task(self, taskno, msg):
+ pass
+ def start(self):
+ pass
+ def stop(self):
+ pass
+ def schedule(self, seconds, display, arg):
+ pass
+
+import Queue
+
+class SlaveProcess(object):
+ """
+ Spawn a slave process reading from an input queue and displaying
+ on a monitor object. Methods are start/send/stop.
+ """
+ def __init__(self, mon):
+ self.mon= mon
+ self.queue = multiprocessing.Queue()
+ self.proc = multiprocessing.Process(None, self._run)
+
+ def start(self):
+ self.proc.start()
+
+ def send(self, line):
+ self.queue.put(line)
+
+ def stop(self):
+ self.queue.close()
+ self.proc.terminate()
+
+ def _sendline(self, i):
+ "Send a line to the underlying monitor"
+ try:
+ line = self.queue.get_nowait()
+ except Queue.Empty:
+ pass
+ else:
+ i.send(line)
+ self.mon.schedule(.1, self._sendline, i)
+
+ def _run(self):
+ with Interpreter(self.mon) as i:
+ # .schedule() must be invoked inside the with block
+ self.mon.schedule(.1, self._sendline, i)
+ self.mon.start()
+
+class StartStopObject(object):
+ started = False
+ def start(self): pass
+ def stop(self): pass
+
+class Manager(StartStopObject):
+ """
+ The plac Manager contains a multiprocessing.Manager and a set
+ of slave monitor processes to which we can send commands. There
+ is a manager for each interpreter with mpcommands.
+ """
+ def add(self, monitor):
+ 'Add or replace a monitor in the registry'
+ slave = SlaveProcess(monitor)
+ name = slave.name = monitor.name
+ self.registry[name] = slave
+
+ def delete(self, name):
+ 'Remove a named monitor from the registry'
+ del self.registry[name]
+
+ def __init__(self):
+ self.registry = {}
+ self.started = False
+ self.mp = None
+
+ # can be called more than once
+ def start(self):
+ if self.mp is None:
+ self.mp = multiprocessing.Manager()
+ for slave in self.registry.itervalues():
+ slave.start()
+ self.started = True
+
+ def stop(self):
+ for slave in self.registry.itervalues():
+ slave.stop()
+ if self.mp:
+ self.mp.shutdown()
+ self.mp = None
+ self.started = False
+
+ def send(self, line):
+ for slave in self.registry.itervalues():
+ slave.send(line)
+
########################## plac server ##############################
import asyncore, asynchat, socket
@@ -621,29 +745,6 @@ class Interpreter(object):
A context manager with a .send method and a few utility methods:
execute, test and doctest.
"""
-
- @classmethod
- def call(cls, factory, arglist=sys.argv[1:],
- commentchar='#', split=shlex.split,
- stdin=sys.stdin, prompt='i> ', verbose=False):
- """
- Call a container factory with the arglist and instantiate an
- interpreter object. If there are remaining arguments, send them to the
- interpreter, else start an interactive session.
- """
- obj = partial_call(factory, arglist)
- i = cls(obj, commentchar, split)
- if i.obj._args_:
- with i:
- task = i.send(i.obj._args_) # synchronous
- if task.exc:
- raise task.etype, task.exc, task.tb
- print(task)
- elif i.obj._interact_:
- i.interact(stdin, prompt, verbose)
- else:
- i.parser.print_usage()
-
def __init__(self, obj, commentchar='#', split=shlex.split):
self.obj = obj
try:
@@ -654,17 +755,18 @@ class Interpreter(object):
self.split = split
self._set_commands(obj)
self.tm = TaskManager(obj)
+ self.man = self.tm.man
self.parser = plac_core.parser_from(obj, prog='', add_help=False)
if self.commands:
self.commands.update(self.tm.specialcommands)
- self.parser.addsubcommands(self.tm.specialcommands, self.tm,
- title='special commands')
+ self.parser.addsubcommands(
+ self.tm.specialcommands, self.tm, title='special commands')
if obj.mpcommands:
- self.parser.addsubcommands(obj.mpcommands, obj,
- title='commands run in external processes')
+ self.parser.addsubcommands(
+ obj.mpcommands, obj, title='commands run in external processes')
if obj.thcommands:
- self.parser.addsubcommands(obj.thcommands, obj,
- title='threaded commands')
+ self.parser.addsubcommands(
+ obj.thcommands, obj, title='threaded commands')
self.parser.error = lambda msg: sys.exit(msg) # patch the parser
self._interpreter = None
@@ -705,9 +807,14 @@ class Interpreter(object):
arglist = line
if not arglist:
return nulltask
+ m = self.tm.man # manager
+ if m and not m.started:
+ m.start()
task = self._interpreter.send(arglist) # nonblocking
if not plac_core._match_cmd(arglist[0], self.tm.specialcommands):
self.tm.registry[task.no] = task
+ if m:
+ m.send('add_task %d' % task.no)
return task
def send(self, line):
@@ -745,7 +852,7 @@ class Interpreter(object):
if not plac_core.iterable(result): # atomic result
task = SynTask(no, arglist, gen_val(result))
elif cmd in self.obj.mpcommands:
- task = MPTask(no, arglist, result, self.tm.mp_manager)
+ task = MPTask(no, arglist, result, self.tm.man)
elif cmd in self.obj.thcommands:
task = ThreadedTask(no, arglist, result)
else: # blocking task
@@ -867,20 +974,62 @@ class Interpreter(object):
finally:
asyncore.close_all()
- def stop_server(self, wait=0.0):
+ def stop_server(self, after=0.0):
"Stops the asyncore server, possibly after a given number of seconds"
- threading.Timer(wait, asyncore.socket_map.clear).start()
+ threading.Timer(after, asyncore.socket_map.clear).start()
+
+ def add_monitor(self, mon):
+ self.man.add(mon)
+
+ def del_monitor(self, name):
+ self.man.delete(name)
+
+ @classmethod
+ def call(cls, factory, arglist=sys.argv[1:],
+ commentchar='#', split=shlex.split,
+ stdin=sys.stdin, prompt='i> ', verbose=False):
+ """
+ Call a container factory with the arglist and instantiate an
+ interpreter object. If there are remaining arguments, send them to the
+ interpreter, else start an interactive session.
+ """
+ obj = partial_call(factory, arglist)
+ i = cls(obj, commentchar, split)
+ if i.obj._args_:
+ with i:
+ task = i.send(i.obj._args_) # synchronous
+ if task.exc:
+ raise task.etype, task.exc, task.tb
+ print(task)
+ elif i.obj._interact_:
+ i.interact(stdin, prompt, verbose)
+ else:
+ i.parser.print_usage()
+
+ @classmethod
+ def from_gen(cls, genseq, mode='p', monitors=()):
+ "Returns an interpreter object able to run the tasks in the sequence"
+ assert mode in 'pt', mode
+ launcher = _TaskLauncher(genseq, mode)
+ self = cls(launcher).__enter__()
+ for mon in monitors:
+ self.add_monitor(mon)
+ for i in range(len(launcher.genlist)):
+ self.submit('rungen %d' % (i + 1))
+ return self
#################################### runp #####################################
class _TaskLauncher(object):
"Helper for runp"
+
def __init__(self, genseq, mode):
if mode == 'p':
self.mpcommands = ['rungen']
else:
self.thcommands = ['rungen']
self.genlist = list(genseq)
+
def rungen(self, i):
for out in self.genlist[int(i) - 1]:
yield out
@@ -890,11 +1039,7 @@ def runp(genseq, mode='p', start=True):
or 't' (use threads). Return a list of running task objects. If start is
False, the tasks are only submitted and not automatically started.
"""
- assert mode in 'pt', mode
- launcher = _TaskLauncher(genseq, mode)
- inter = Interpreter(launcher).__enter__()
- for i in range(len(launcher.genlist)):
- inter.submit('rungen %d' % (i + 1))
+ inter = Interpreter.from_gen(genseq, mode)
if start:
for task in inter.tasks():
task.run()
diff --git a/plac/setup.py b/plac/setup.py
index aafe43d..006786a 100644
--- a/plac/setup.py
+++ b/plac/setup.py
@@ -32,7 +32,7 @@ if __name__ == '__main__':
author_email='michele.simionato@gmail.com',
url='http://pypi.python.org/pypi/plac',
license="BSD License",
- py_modules = ['plac_core', 'plac_ext', 'plac'],
+ py_modules = ['plac_core', 'plac_ext', 'plac_tk', 'plac'],
scripts = ['plac_runner.py'],
install_requires=require('argparse', 'multiprocessing'),
use_2to3=True,
@@ -44,6 +44,7 @@ if __name__ == '__main__':
'Natural Language :: English',
'Operating System :: OS Independent',
'Programming Language :: Python',
+ 'Programming Language :: Python :: 3',
'Topic :: Software Development :: Libraries',
'Topic :: Utilities'],
zip_safe=False)