diff options
author | Joshua Harlow <harlowja@gmail.com> | 2015-06-08 22:16:47 -0700 |
---|---|---|
committer | Min Pae <sputnik13@gmail.com> | 2015-07-08 14:32:34 -0700 |
commit | 63c67302488bea87e2e2870dc83e73aacddeaa34 (patch) | |
tree | 25fba8ab22a54948bc62771bb16e26622af9a19f /taskflow/conductors | |
parent | 40d19c7696f1e0b7d75eacbd271974ee9155c019 (diff) | |
download | taskflow-63c67302488bea87e2e2870dc83e73aacddeaa34.tar.gz |
Notify on the individual engine steps
When a conductor is running it is quite useful to be able to how
long each engine step takes. To enable this information being
output, add a notifier to the base conductor and use it in the
blocking conductor to emit events around engine activities. This
makes it possible to track the timing (or other information that
can be gathered from these events) in a non-intrusive manner.
In the `99_bottles.py` demo we also now use this to be able to
easily see what the conductor is actively doing (without having
to enable the more verbose DEBUG level logging).
Change-Id: Ifd8ff38f82fc8135fe5fec4c8e41f0e06f4fdee3
Diffstat (limited to 'taskflow/conductors')
-rw-r--r-- | taskflow/conductors/backends/impl_blocking.py | 28 | ||||
-rw-r--r-- | taskflow/conductors/base.py | 13 |
2 files changed, 39 insertions, 2 deletions
diff --git a/taskflow/conductors/backends/impl_blocking.py b/taskflow/conductors/backends/impl_blocking.py index 945f591..a87eacd 100644 --- a/taskflow/conductors/backends/impl_blocking.py +++ b/taskflow/conductors/backends/impl_blocking.py @@ -56,6 +56,16 @@ class BlockingConductor(base.Conductor): upon the jobboard capabilities to automatically abandon these jobs. """ + START_FINISH_EVENTS_EMITTED = tuple([ + 'compilation', 'preparation', + 'validation', 'running', + ]) + """Events will be emitted for the start and finish of each engine + activity defined above, the actual event name that can be registered + to subscribe to will be ``${event}_start`` and ``${event}_end`` where + the ``${event}`` in this pseudo-variable will be one of these events. + """ + def __init__(self, name, jobboard, persistence=None, engine=None, engine_options=None, wait_timeout=None): @@ -105,10 +115,24 @@ class BlockingConductor(base.Conductor): with ExitStack() as stack: for listener in listeners: stack.enter_context(listener) - LOG.debug("Dispatching engine %s for job: %s", engine, job) + LOG.debug("Dispatching engine for job '%s'", job) consume = True try: - engine.run() + for stage_func, event_name in [(engine.compile, 'compilation'), + (engine.prepare, 'preparation'), + (engine.validate, 'validation'), + (engine.run, 'running')]: + self._notifier.notify("%s_start" % event_name, { + 'job': job, + 'engine': engine, + 'conductor': self, + }) + stage_func() + self._notifier.notify("%s_end" % event_name, { + 'job': job, + 'engine': engine, + 'conductor': self, + }) except excp.WrappedFailure as e: if all((f.check(*NO_CONSUME_EXCEPTIONS) for f in e)): consume = False diff --git a/taskflow/conductors/base.py b/taskflow/conductors/base.py index 6e46fff..6942423 100644 --- a/taskflow/conductors/base.py +++ b/taskflow/conductors/base.py @@ -20,6 +20,7 @@ import six from taskflow import engines from taskflow import exceptions as excp +from taskflow.types import notifier @six.add_metaclass(abc.ABCMeta) @@ -45,6 +46,18 @@ class Conductor(object): self._engine_options = engine_options.copy() self._persistence = persistence self._lock = threading.RLock() + self._notifier = notifier.Notifier() + + @property + def notifier(self): + """The conductor actions (or other state changes) notifier. + + NOTE(harlowja): different conductor implementations may emit + different events + event details at different times, so refer to your + conductor documentation to know exactly what can and what can not be + subscribed to. + """ + return self._notifier def _flow_detail_from_job(self, job): """Extracts a flow detail from a job (via some manner). |