summaryrefslogtreecommitdiff
path: root/plac/plac_ext.py
diff options
context:
space:
mode:
Diffstat (limited to 'plac/plac_ext.py')
-rw-r--r--plac/plac_ext.py1063
1 files changed, 0 insertions, 1063 deletions
diff --git a/plac/plac_ext.py b/plac/plac_ext.py
deleted file mode 100644
index 35106ae..0000000
--- a/plac/plac_ext.py
+++ /dev/null
@@ -1,1063 +0,0 @@
-# this module requires Python 2.5+
-from __future__ import with_statement
-from contextlib import contextmanager
-from operator import attrgetter
-from gettext import gettext as _
-import imp, inspect, os, sys, cmd, shlex, subprocess
-import itertools, traceback, multiprocessing, signal, threading
-import plac_core
-
-############################# generic utils ################################
-
-@contextmanager
-def stdout(fileobj):
- "usage: with stdout(file('out.txt', 'a')): do_something()"
- orig_stdout = sys.stdout
- sys.stdout = fileobj
- try:
- yield
- finally:
- sys.stdout = orig_stdout
-
-def write(x):
- "Write str(x) on stdout and flush, no newline added"
- sys.stdout.write(str(x))
- sys.stdout.flush()
-
-def gen_val(value):
- "Return a generator object with a single element"
- yield value
-
-def gen_exc(etype, exc, tb):
- "Return a generator object raising an exception"
- raise etype, exc, tb
- yield
-
-def less(text):
- "Send a text to less via a pipe"
- # -c clear the screen before starting less
- po = subprocess.Popen(['less', '-c'], stdin=subprocess.PIPE)
- try:
- po.stdin.write(text)
- except IOError:
- pass
- po.stdin.close()
- po.wait()
-
-use_less = (sys.platform != 'win32') # unices
-
-class TerminatedProcess(Exception):
- pass
-
-def terminatedProcess(signum, frame):
- raise TerminatedProcess
-
-########################### readline support #############################
-
-def read_line(stdin, prompt=''):
- "Read a line from stdin, using readline when possible"
- if isinstance(stdin, ReadlineInput):
- return stdin.readline(prompt)
- else:
- write(prompt)
- return stdin.readline()
-
-def read_long_line(stdin, terminator):
- """
- Read multiple lines from stdin until the terminator character is found, then
- yield a single space-separated long line.
- """
- while True:
- lines = []
- while True:
- line = stdin.readline() # ends with \n
- if not line: # EOF
- return
- line = line.strip()
- if not line:
- continue
- elif line[-1] == terminator:
- lines.append(line[:-1])
- break
- else:
- lines.append(line)
- yield ' '.join(lines)
-
-class ReadlineInput(object):
- """
- An iterable with a .readline method reading from stdin.
- """
- def __init__(self, completions, case_sensitive=True, histfile=None):
- self.completions = completions
- self.case_sensitive = case_sensitive
- self.histfile = histfile
- if not case_sensitive:
- self.completions = map(str.upper, completions)
- import readline
- self.rl = readline
- readline.parse_and_bind("tab: complete")
- readline.set_completer(self.complete)
-
- def __enter__(self):
- self.old_completer = self.rl.get_completer()
- try:
- if self.histfile:
- self.rl.read_history_file(self.histfile)
- except IOError: # the first time
- pass
- return self
-
- def __exit__(self, etype, exc, tb):
- self.rl.set_completer(self.old_completer)
- if self.histfile:
- self.rl.write_history_file(self.histfile)
-
- def complete(self, kw, state):
- # state is 0, 1, 2, ... and increases by hitting TAB
- if not self.case_sensitive:
- kw = kw.upper()
- try:
- return [k for k in self.completions if k.startswith(kw)][state]
- except IndexError: # no completions
- return # exit
-
- def readline(self, prompt=''):
- try:
- return raw_input(prompt) + '\n'
- except EOFError:
- return ''
-
- def __iter__(self):
- return iter(self.readline, '')
-
-################### help functionality in plac interpreters ###################
-
-class HelpSummary(object):
- "Build the help summary consistently with the cmd module"
-
- @classmethod
- def add(cls, obj, specialcommands):
- p = plac_core.parser_from(obj)
- c = cmd.Cmd(stdout=cls())
- c.stdout.write('\n')
- c.print_topics('special commands',
- sorted(specialcommands), 15, 80)
- c.print_topics('custom commands',
- sorted(obj.syncommands), 15, 80)
- c.print_topics('commands run in external processes',
- sorted(obj.mpcommands), 15, 80)
- c.print_topics('threaded commands',
- sorted(obj.thcommands), 15, 80)
- p.helpsummary = unicode(c.stdout)
-
- def __init__(self):
- self._ls = []
-
- def write(self, s):
- self._ls.append(s)
-
- def __str__(self):
- return ''.join(self._ls)
-
-def format_help(self):
- "Attached to plac_core.ArgumentParser for plac interpreters"
- try:
- return self.helpsummary
- except AttributeError:
- return super(plac_core.ArgumentParser, self).format_help()
-plac_core.ArgumentParser.format_help = format_help
-
-def default_help(obj, cmd=None):
- "An utility for implementing the help functionality in plac interpreters"
- parser = plac_core.parser_from(obj)
- if cmd is None:
- yield parser.format_help()
- subp = parser.subparsers._name_parser_map.get(cmd)
- if subp is None:
- yield _('Unknown command %s' % cmd)
- else:
- yield subp.format_help()
-
-########################### import management ################################
-
-try:
- PLACDIRS = os.environ.get('PLACPATH', '.').split(':')
-except:
- raise ValueError(_('Ill-formed PLACPATH: got %PLACPATHs') % os.environ)
-
-def partial_call(factory, arglist):
- "Call a container factory with the arglist and return a plac object"
- a = plac_core.parser_from(factory).argspec
- if a.defaults or a.varargs or a.varkw:
- raise TypeError('Interpreter.call must be invoked on '
- 'factories with required arguments only')
- required_args = ', '.join(a.args)
- if required_args:
- required_args += ',' # trailing comma
- code = '''def makeobj(interact, %s *args):
- obj = factory(%s)
- obj._interact_ = interact
- obj._args_ = args
- return obj\n'''% (required_args, required_args)
- dic = dict(factory=factory)
- exec code in dic
- makeobj = dic['makeobj']
- makeobj.add_help = False
- if inspect.isclass(factory):
- makeobj.__annotations__ = getattr(
- factory.__init__, '__annotations__', {})
- else:
- makeobj.__annotations__ = getattr(
- factory, '__annotations__', {})
- makeobj.__annotations__['interact'] = (
- 'start interactive interpreter', 'flag', 'i')
- return plac_core.call(makeobj, arglist)
-
-def import_main(path, *args, **pconf):
- """
- An utility to import the main function of a plac tool. It also
- works with command container factories.
- """
- if ':' in path: # importing a factory
- path, factory_name = path.split(':')
- else: # importing the main function
- factory_name = None
- if not os.path.isabs(path): # relative path, look at PLACDIRS
- for placdir in PLACDIRS:
- fullpath = os.path.join(placdir, path)
- if os.path.exists(fullpath):
- break
- else: # no break
- raise ImportError(_('Cannot find %s' % path))
- else:
- fullpath = path
- name, ext = os.path.splitext(os.path.basename(fullpath))
- module = imp.load_module(name, open(fullpath), fullpath, (ext, 'U', 1))
- if factory_name:
- tool = partial_call(getattr(module, factory_name), args)
- else:
- tool = module.main
- # set the parser configuration
- plac_core.parser_from(tool, **pconf)
- return tool
-
-############################## Task classes ##############################
-
-# base class not instantiated directly
-class BaseTask(object):
- """
- A task is a wrapper over a generator object with signature
- Task(no, arglist, genobj), attributes
- .no
- .arglist
- .outlist
- .str
- .etype
- .exc
- .tb
- .status
- and methods .run and .kill.
- """
- STATES = ('SUBMITTED', 'RUNNING', 'TOBEKILLED', 'KILLED', 'FINISHED',
- 'ABORTED')
-
- def __init__(self, no, arglist, genobj):
- self.no = no
- self.arglist = arglist
- self._genobj = self._wrap(genobj)
- self.str, self.etype, self.exc, self.tb = '', None, None, None
- self.status = 'SUBMITTED'
- self.outlist = []
-
- def notify(self, msg):
- "Notifies the underlying monitor. To be implemented"
-
- def _wrap(self, genobj, stringify_tb=False):
- """
- Wrap the genobj into a generator managing the exceptions,
- populating the .outlist, setting the .status and yielding None.
- stringify_tb must be True if the traceback must be sent to a process.
- """
- self.status = 'RUNNING'
- try:
- for value in genobj:
- if self.status == 'TOBEKILLED': # exit from the loop
- raise GeneratorExit
- if value is not None: # add output
- self.outlist.append(value)
- self.notify(unicode(value))
- yield
- except (GeneratorExit, TerminatedProcess, KeyboardInterrupt):
- # soft termination
- self.status = 'KILLED'
- except: # unexpected exception
- self.etype, self.exc, tb = sys.exc_info()
- self.tb = ''.join(traceback.format_tb(tb)) if stringify_tb else tb
- self.status = 'ABORTED'
- else: # regular exit
- self.status = 'FINISHED'
- try:
- self.str = '\n'.join(map(unicode, self.outlist))
- except IndexError:
- self.str = 'no result'
-
- def run(self):
- "Run the inner generator"
- for none in self._genobj:
- pass
-
- def kill(self):
- "Set a TOBEKILLED status"
- self.status = 'TOBEKILLED'
-
- def wait(self):
- "Wait for the task to finish: to be overridden"
-
- @property
- def traceback(self):
- "Return the traceback as a (possibly empty) string"
- if self.tb is None:
- return ''
- elif isinstance(self.tb, basestring):
- return self.tb
- else:
- return ''.join(traceback.format_tb(self.tb))
-
- @property
- def result(self):
- self.wait()
- if self.exc:
- raise self.etype, self.exc, self.tb or None
- if not self.outlist:
- return None
- return self.outlist[-1]
-
- def __repr__(self):
- "String representation containing class name, number, arglist, status"
- return '<%s %d [%s] %s>' % (
- self.__class__.__name__, self.no,
- ' '.join(self.arglist), self.status)
-
-nulltask = BaseTask(0, [], ('skip' for dummy in (1,)))
-
-########################## synchronous tasks ###############################
-
-class SynTask(BaseTask):
- """
- Synchronous task running in the interpreter loop and displaying its
- output as soon as available.
- """
- def __str__(self):
- "Return the output string or the error message"
- if self.etype: # there was an error
- return '%s: %s' % (self.etype.__name__, self.exc)
- else:
- return '\n'.join(map(unicode, self.outlist))
-
-class ThreadedTask(BaseTask):
- """
- A task running in a separated thread.
- """
- def __init__(self, no, arglist, genobj):
- BaseTask.__init__(self, no, arglist, genobj)
- self.thread = threading.Thread(target=super(ThreadedTask, self).run)
-
- def run(self):
- "Run the task into a thread"
- self.thread.start()
-
- def wait(self):
- "Block until the thread ends"
- self.thread.join()
-
-######################### multiprocessing tasks ##########################
-
-def sharedattr(name, on_error):
- "Return a property to be attached to an MPTask"
- def get(self):
- try:
- return getattr(self.ns, name)
- except: # the process was killed or died hard
- return on_error
- def set(self, value):
- try:
- setattr(self.ns, name, value)
- except: # the process was killed or died hard
- pass
- return property(get, set)
-
-class MPTask(BaseTask):
- """
- A task running as an external process. The current implementation
- only works on Unix-like systems, where multiprocessing use forks.
- """
- str = sharedattr('str', '')
- etype = sharedattr('etype', None)
- exc = sharedattr('exc', None)
- tb = sharedattr('tb', None)
- status = sharedattr('status', 'ABORTED')
-
- @property
- def outlist(self):
- try:
- return self._outlist
- except: # the process died hard
- return []
-
- def notify(self, msg):
- self.man.send('notify_listener %d %r' % (self.no, msg))
-
- def __init__(self, no, arglist, genobj, manager):
- """
- The monitor has a .send method and a .man multiprocessing.Manager
- """
- self.no = no
- self.arglist = arglist
- self._genobj = self._wrap(genobj, stringify_tb=True)
- self.man = manager
- self._outlist = manager.mp.list()
- self.ns = manager.mp.Namespace()
- self.status = 'SUBMITTED'
- self.etype, self.exc, self.tb = None, None, None
- self.str = repr(self)
- self.proc = multiprocessing.Process(target=super(MPTask, self).run)
-
- def run(self):
- "Run the task into an external process"
- self.proc.start()
-
- def wait(self):
- "Block until the external process ends or is killed"
- self.proc.join()
-
- def kill(self):
- """Kill the process with a SIGTERM inducing a TerminatedProcess
- exception in the children"""
- self.proc.terminate()
-
-######################### Task Manager #######################
-
-class TaskManager(object):
- """
- Store the given commands into a task registry. Provides methods to
- manage the submitted tasks.
- """
- cmdprefix = '.'
- specialcommands = set(['.last_tb'])
-
- def __init__(self, obj):
- self.obj = obj
- self.registry = {} # {taskno : task}
- if obj.mpcommands or obj.thcommands:
- self.specialcommands.update(['.kill', '.list', '.output'])
- self.parser = plac_core.parser_from(obj)
- HelpSummary.add(obj, self.specialcommands)
- self.man = Manager() if obj.mpcommands else None
- signal.signal(signal.SIGTERM, terminatedProcess)
-
- def close(self):
- "Kill all the running tasks"
- for task in self.registry.itervalues():
- try:
- if task.status == 'RUNNING':
- task.kill()
- task.wait()
- except: # task killed, nothing to wait
- pass
- if self.man:
- self.man.stop()
-
- def _get_latest(self, taskno=-1, status=None):
- "Get the latest submitted task from the registry"
- assert taskno < 0, 'You must pass a negative number'
- if status:
- tasks = [t for t in self.registry.itervalues()
- if t.status == status]
- else:
- tasks = [t for t in self.registry.itervalues()]
- tasks.sort(key=attrgetter('no'))
- if len(tasks) >= abs(taskno):
- return tasks[taskno]
-
- ########################### special commands #########################
-
- @plac_core.annotations(
- taskno=('task to kill', 'positional', None, int))
- def kill(self, taskno=-1):
- 'kill the given task (-1 to kill the latest running task)'
- if taskno < 0:
- task = self._get_latest(taskno, status='RUNNING')
- if task is None:
- yield 'Nothing to kill'
- return
- elif not taskno in self.registry:
- yield 'Unknown task %d' % taskno
- return
- else:
- task = self.registry[taskno]
- if task.status in ('ABORTED', 'KILLED', 'FINISHED'):
- yield 'Already finished %s' % task
- return
- task.kill()
- yield task
-
- @plac_core.annotations(
- status=('', 'positional', None, str, BaseTask.STATES))
- def list(self, status='RUNNING'):
- 'list tasks with a given status'
- for task in self.registry.values():
- if task.status == status:
- yield task
-
- @plac_core.annotations(
- taskno=('task number', 'positional', None, int))
- def output(self, taskno=-1, fname=None):
- 'show the output of a given task (and optionally save it to a file)'
- if taskno < 0:
- task = self._get_latest(taskno)
- if task is None:
- yield 'Nothing to show'
- return
- elif taskno not in self.registry:
- yield 'Unknown task %d' % taskno
- return
- else:
- task = self.registry[taskno]
- outstr = '\n'.join(map(unicode, task.outlist))
- if fname:
- open(fname, 'w').write(outstr)
- yield 'saved output of %d into %s' % (taskno, fname); return
- yield task
- if len(task.outlist) > 20 and use_less:
- less(outstr)
- else:
- yield outstr
-
- @plac_core.annotations(
- taskno=('task number', 'positional', None, int))
- def last_tb(self, taskno=-1):
- "show the traceback of a given task, if any"
- task = self._get_latest(taskno)
- if task:
- yield task.traceback
- else:
- yield 'Nothing to show'
-
-########################### SyncProcess ##############################
-
-class Process(subprocess.Popen):
- "Start the interpreter specified by the params in a subprocess"
-
- def __init__(self, params):
- signal.signal(signal.SIGPIPE, signal.SIG_DFL)
- # to avoid broken pipe messages
- code = '''import plac, sys
-sys.argv[0] = '<%s>'
-plac.Interpreter(plac.import_main(*%s)).interact(prompt='i>\\n')
-''' % (params[0], params)
- subprocess.Popen.__init__(
- self, [sys.executable, '-u', '-c', code],
- stdin=subprocess.PIPE, stdout=subprocess.PIPE)
- self.man = multiprocessing.Manager()
-
- def close(self):
- "Close stdin and stdout"
- self.stdin.close()
- self.stdout.close()
- self.man.shutdown()
-
- def recv(self): # char-by-char cannot work
- "Return the output of the subprocess, line-by-line until the prompt"
- lines = []
- while True:
- lines.append(self.stdout.readline())
- if lines[-1] == 'i>\n':
- out = ''.join(lines)
- return out[:-1] + ' ' # remove last newline
-
- def send(self, line):
- """Send a line (adding a newline) to the underlying subprocess
- and wait for the answer"""
- self.stdin.write(line + os.linesep)
- return self.recv()
-
-class Monitor(object):
- """
- Base monitor class with methods add_listener/del_listener/notify_listener
- and start/stop/schedule/slave.
- """
- commands = 'add_listener', 'del_listener', 'notify_listener'
- def __init__(self, name):
- self.name = name
- def add_listener(self, taskno):
- pass
- def del_listener(self, taskno):
- pass
- def notify_listener(self, taskno, msg):
- pass
- def start(self):
- pass
- def stop(self):
- pass
- def schedule(self, seconds, display, arg):
- pass
-
-import Queue
-
-class SlaveProcess(object):
- """
- Spawn a slave process reading from an input queue and displaying
- on a monitor object. Methods are start/send/stop.
- """
- def __init__(self, mon):
- self.mon= mon
- self.queue = multiprocessing.Queue()
- self.proc = multiprocessing.Process(None, self._run)
-
- def start(self):
- self.proc.start()
-
- def send(self, line):
- self.queue.put(line)
-
- def stop(self):
- self.queue.close()
- self.proc.terminate()
-
- def _sendline(self, i):
- "Send a line to the underlying monitor"
- try:
- line = self.queue.get_nowait()
- except Queue.Empty:
- pass
- else:
- i.send(line)
- self.mon.schedule(.1, self._sendline, i)
-
- def _run(self):
- with Interpreter(self.mon) as i:
- # .schedule() must be invoked inside the with block
- self.mon.schedule(.1, self._sendline, i)
- self.mon.run()
-
-class StartStopObject(object):
- started = False
- def start(self): pass
- def stop(self): pass
-
-class Manager(StartStopObject):
- """
- The plac Manager contains a multiprocessing.Manager and a set
- of slave monitor processes to which we can send commands. There
- is a manager for each interpreter with mpcommands.
- """
- def add(self, monitor):
- 'Add or replace a monitor in the registry'
- slave = SlaveProcess(monitor)
- name = slave.name = monitor.name
- self.registry[name] = slave
-
- def delete(self, name):
- 'Remove a named monitor from the registry'
- del self.registry[name]
-
- def __init__(self):
- self.registry = {}
- self.started = False
- self.mp = None
-
- # can be called more than once
- def start(self):
- if self.mp is None:
- self.mp = multiprocessing.Manager()
- for slave in self.registry.itervalues():
- slave.start()
- self.started = True
-
- def stop(self):
- for slave in self.registry.itervalues():
- slave.stop()
- if self.mp:
- self.mp.shutdown()
- self.mp = None
- self.started = False
-
- def send(self, line):
- for slave in self.registry.itervalues():
- slave.send(line)
-
-########################## plac server ##############################
-
-import asyncore, asynchat, socket
-
-class _AsynHandler(asynchat.async_chat):
- "asynchat handler starting a new interpreter loop for each connection"
-
- terminator = '\r\n' # the standard one for telnet
- prompt = 'i> '
-
- def __init__(self, socket, interpreter):
- asynchat.async_chat.__init__(self, socket)
- self.set_terminator(self.terminator)
- self.i = interpreter
- self.i.__enter__()
- self.data = []
- self.write(self.prompt)
-
- def write(self, data, *args):
- "Push a string back to the client"
- if args:
- data %= args
- if data.endswith('\n') and not data.endswith(self.terminator):
- data = data[:-1] + self.terminator # fix newlines
- self.push(data)
-
- def collect_incoming_data(self, data):
- "Collect one character at the time"
- self.data.append(data)
-
- def found_terminator(self):
- "Put in the queue the line received from the client"
- line = ''.join(self.data)
- self.log('Received line %r from %s' % (line, self.addr))
- if line == 'EOF':
- self.i.__exit__(None, None, None)
- self.handle_close()
- else:
- task = self.i.submit(line)
- task.run() # synchronous or not
- if task.etype: # manage exception
- error = '%s: %s\nReceived: %s' % (
- task.etype.__name__, task.exc, ' '.join(task.arglist))
- self.log_info(task.traceback + error) # on the server
- self.write(error + self.terminator) # back to the client
- else: # no exception
- self.write(task.str + self.terminator)
- self.data = []
- self.write(self.prompt)
-
-class _AsynServer(asyncore.dispatcher):
- "asyncore-based server spawning AsynHandlers"
-
- def __init__(self, interpreter, newhandler, port, listen=5):
- self.interpreter = interpreter
- self.newhandler = newhandler
- self.port = port
- asyncore.dispatcher.__init__(self)
- self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
- self.bind(('', port))
- self.listen(listen)
-
- def handle_accept(self):
- clientsock, clientaddr = self.accept()
- self.log('Connected from %s' % str(clientaddr))
- i = self.interpreter.__class__(self.interpreter.obj) # new interpreter
- self.newhandler(clientsock, i) # spawn a new handler
-
-########################### the Interpreter #############################
-
-class Interpreter(object):
- """
- A context manager with a .send method and a few utility methods:
- execute, test and doctest.
- """
- def __init__(self, obj, commentchar='#', split=shlex.split):
- self.obj = obj
- try:
- self.name = obj.__module__
- except AttributeError:
- self.name = 'plac'
- self.commentchar = commentchar
- self.split = split
- self._set_commands(obj)
- self.tm = TaskManager(obj)
- self.man = self.tm.man
- self.parser = plac_core.parser_from(obj, prog='')
- if self.commands:
- self.commands.update(self.tm.specialcommands)
- self.parser.addsubcommands(
- self.tm.specialcommands, self.tm, title='special commands')
- if obj.mpcommands:
- self.parser.addsubcommands(
- obj.mpcommands, obj, title='commands run in external processes')
- if obj.thcommands:
- self.parser.addsubcommands(
- obj.thcommands, obj, title='threaded commands')
- self.parser.error = lambda msg: sys.exit(msg) # patch the parser
- self._interpreter = None
-
- def _set_commands(self, obj):
- "Make sure obj has the right command attributes as Python sets"
- for attrname in ('commands', 'syncommands', 'mpcommands', 'thcommands'):
- try:
- sequence = getattr(obj, attrname)
- except AttributeError:
- sequence = []
- if not isinstance(sequence, set):
- sequence = set(sequence)
- setattr(obj, attrname, sequence)
- obj.syncommands.update(obj.commands)
- self.commands = obj.commands
- self.commands.update(obj.syncommands)
- self.commands.update(obj.mpcommands)
- self.commands.update(obj.thcommands)
-
- def __enter__(self):
- "Start the inner interpreter loop"
- self._interpreter = self._make_interpreter()
- self._interpreter.send(None)
- return self
-
- def __exit__(self, exctype, exc, tb):
- "Close the inner interpreter and the task manager"
- self.close(exctype, exc, tb)
-
- def submit(self, line):
- "Send a line to the underlying interpreter and return a task object"
- if self._interpreter is None:
- raise RuntimeError(_('%r not initialized: probably you forgot to '
- 'use the with statement') % self)
- if isinstance(line, basestring):
- arglist = self.split(line, self.commentchar)
- else: # expects a list of strings
- arglist = line
- if not arglist:
- return nulltask
- m = self.tm.man # manager
- if m and not m.started:
- m.start()
- task = self._interpreter.send(arglist) # nonblocking
- if not plac_core._match_cmd(arglist[0], self.tm.specialcommands):
- self.tm.registry[task.no] = task
- if m:
- m.send('add_listener %d' % task.no)
- return task
-
- def send(self, line):
- "Send a line to the underlying interpreter and return the finished task"
- task = self.submit(line)
- BaseTask.run(task) # blocking
- return task
-
- def tasks(self):
- "The full lists of the submitted tasks"
- return self.tm.registry.values()
-
- def close(self, exctype=None, exc=None, tb=None):
- "Can be called to close the interpreter prematurely"
- self.tm.close()
- if exctype is not None:
- self._interpreter.throw(exctype, exc, tb)
- else:
- self._interpreter.close()
-
- def _make_interpreter(self):
- "The interpreter main loop, from lists of arguments to task objects"
- enter = getattr(self.obj, '__enter__', lambda : None)
- exit = getattr(self.obj, '__exit__', lambda et, ex, tb: None)
- enter()
- task = None
- try:
- for no in itertools.count(1):
- arglist = yield task
- try:
- cmd, result = self.parser.consume(arglist)
- except SystemExit: # for invalid commands
- task = SynTask(no, arglist, iter([]))
- continue
- except: # anything else
- task = SynTask(no, arglist, gen_exc(*sys.exc_info()))
- continue
- if not plac_core.iterable(result): # atomic result
- task = SynTask(no, arglist, gen_val(result))
- elif cmd in self.obj.mpcommands:
- task = MPTask(no, arglist, result, self.tm.man)
- elif cmd in self.obj.thcommands:
- task = ThreadedTask(no, arglist, result)
- else: # blocking task
- task = SynTask(no, arglist, result)
- except GeneratorExit: # regular exit
- exit(None, None, None)
- except: # exceptional exit
- exit(*sys.exc_info())
- raise
-
- def check(self, given_input, expected_output):
- "Make sure you get the expected_output from the given_input"
- output = self.send(given_input).str # blocking
- ok = (output == expected_output)
- if not ok:
- # the message here is not internationalized on purpose
- msg = 'input: %s\noutput: %s\nexpected: %s' % (
- given_input, output, expected_output)
- raise AssertionError(msg)
-
- def _parse_doctest(self, lineiter):
- "Returns the lines of input, the lines of output, and the line number"
- lines = [line.strip() for line in lineiter]
- inputs = []
- positions = []
- for i, line in enumerate(lines):
- if line.startswith('i> '):
- inputs.append(line[3:])
- positions.append(i)
- positions.append(len(lines) + 1) # last position
- outputs = []
- for i, start in enumerate(positions[:-1]):
- end = positions[i + 1]
- outputs.append('\n'.join(lines[start+1:end]))
- return zip(inputs, outputs, positions)
-
- def doctest(self, lineiter, verbose=False):
- """
- Parse a text containing doctests in a context and tests of all them.
- Raise an error even if a single doctest if broken. Use this for
- sequential tests which are logically grouped.
- """
- with self:
- for input, output, no in self._parse_doctest(lineiter):
- if verbose:
- write('i> %s\n' % input)
- write('-> %s\n' % output)
- task = self.send(input) # blocking
- if not str(task) == output:
- msg = 'line %d: input: %s\noutput: %s\nexpected: %s\n' % (
- no + 1, input, task, output)
- write(msg)
- if task.exc:
- raise task.etype, task.exc, task.tb
-
- def execute(self, lineiter, verbose=False):
- "Execute a lineiter of commands in a context and print the output"
- with self:
- for line in lineiter:
- if verbose:
- write('i> ' + line)
- task = self.send(line) # finished task
- if task.etype: # there was an error
- raise task.etype, task.exc, task.tb
- write('%s\n' % task.str)
-
- def multiline(self, stdin=sys.stdin, terminator=';', verbose=False):
- "The multiline mode is especially suited for usage with emacs"
- with self:
- for line in read_long_line(stdin, terminator):
- task = self.submit(line)
- task.run()
- write('%s\n' % task.str)
- if verbose and task.traceback:
- write(task.traceback)
-
- def interact(self, stdin=sys.stdin, prompt='i> ', verbose=False):
- "Starts an interactive command loop reading commands from the consolle"
- try:
- import readline
- readline_present = True
- except ImportError:
- readline_present = False
- if stdin is sys.stdin and readline_present: # use readline
- histfile = os.path.expanduser('~/.%s.history' % self.name)
- completions = list(self.commands) + ['help']
- self.stdin = ReadlineInput(completions, histfile=histfile)
- else:
- self.stdin = stdin
- self.prompt = prompt
- self.verbose = verbose
- intro = self.obj.__doc__ or ''
- write(intro + '\n')
- with self:
- if self.stdin is sys.stdin: # do not close stdin automatically
- self._manage_input()
- else:
- with self.stdin: # close stdin automatically
- self._manage_input()
-
- def _manage_input(self):
- "Convert input lines into task which are then executed"
- for line in iter(lambda : read_line(self.stdin, self.prompt), ''):
- line = line.strip()
- if not line:
- continue
- task = self.submit(line)
- task.run() # synchronous or not
- write(str(task) + '\n')
- if self.verbose and task.etype:
- write(task.traceback)
-
- def start_server(self, port=2199, **kw):
- """Starts an asyncore server reading commands for clients and opening
- a new interpreter for each connection."""
- _AsynServer(self, _AsynHandler, port) # register the server
- try:
- asyncore.loop(**kw)
- except (KeyboardInterrupt, TerminatedProcess):
- pass
- finally:
- asyncore.close_all()
-
- def add_monitor(self, mon):
- self.man.add(mon)
-
- def del_monitor(self, name):
- self.man.delete(name)
-
- @classmethod
- def call(cls, factory, arglist=sys.argv[1:],
- commentchar='#', split=shlex.split,
- stdin=sys.stdin, prompt='i> ', verbose=False):
- """
- Call a container factory with the arglist and instantiate an
- interpreter object. If there are remaining arguments, send them to the
- interpreter, else start an interactive session.
- """
- obj = partial_call(factory, arglist)
- if not hasattr(obj, 'help'):
- # help is recognized as an alias for --help
- aliases = dict(help='--help')
- plac_core.parser_from(obj).alias = lambda a: aliases.get(a, a)
- i = cls(obj, commentchar, split)
- if i.obj._args_:
- with i:
- task = i.send(i.obj._args_) # synchronous
- if task.exc:
- raise task.etype, task.exc, task.tb
- out = str(task)
- if out:
- print(out)
- elif i.obj._interact_:
- i.interact(stdin, prompt, verbose)
- else:
- i.parser.print_usage()
-
-#################################### runp #####################################
-
-class _TaskLauncher(object):
- "Helper for runp"
-
- def __init__(self, genseq, mode):
- if mode == 'p':
- self.mpcommands = ['rungen']
- else:
- self.thcommands = ['rungen']
- self.genlist = list(genseq)
-
- def rungen(self, i):
- for out in self.genlist[int(i) - 1]:
- yield out
-
-def runp(genseq, mode='p', monitors=(), start=True):
- """Run a sequence of generators in parallel. Mode can be 'p' (use processes)
- or 't' (use threads). Return a list of running task objects. If start is
- False, the tasks are only submitted and not automatically started.
- """
- assert mode in 'pt', mode
- launcher = _TaskLauncher(genseq, mode)
- inter = Interpreter(launcher).__enter__()
- for mon in monitors: # must be added before submit
- inter.add_monitor(mon)
- for i in range(len(launcher.genlist)):
- inter.submit('rungen %d' % (i + 1))
- if start:
- for task in inter.tasks():
- task.run()
- return inter.tasks()