summaryrefslogtreecommitdiff
path: root/taskflow/conductors
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@gmail.com>2015-06-08 22:16:47 -0700
committerMin Pae <sputnik13@gmail.com>2015-07-08 14:32:34 -0700
commit63c67302488bea87e2e2870dc83e73aacddeaa34 (patch)
tree25fba8ab22a54948bc62771bb16e26622af9a19f /taskflow/conductors
parent40d19c7696f1e0b7d75eacbd271974ee9155c019 (diff)
downloadtaskflow-63c67302488bea87e2e2870dc83e73aacddeaa34.tar.gz
Notify on the individual engine steps
When a conductor is running it is quite useful to be able to how long each engine step takes. To enable this information being output, add a notifier to the base conductor and use it in the blocking conductor to emit events around engine activities. This makes it possible to track the timing (or other information that can be gathered from these events) in a non-intrusive manner. In the `99_bottles.py` demo we also now use this to be able to easily see what the conductor is actively doing (without having to enable the more verbose DEBUG level logging). Change-Id: Ifd8ff38f82fc8135fe5fec4c8e41f0e06f4fdee3
Diffstat (limited to 'taskflow/conductors')
-rw-r--r--taskflow/conductors/backends/impl_blocking.py28
-rw-r--r--taskflow/conductors/base.py13
2 files changed, 39 insertions, 2 deletions
diff --git a/taskflow/conductors/backends/impl_blocking.py b/taskflow/conductors/backends/impl_blocking.py
index 945f591..a87eacd 100644
--- a/taskflow/conductors/backends/impl_blocking.py
+++ b/taskflow/conductors/backends/impl_blocking.py
@@ -56,6 +56,16 @@ class BlockingConductor(base.Conductor):
upon the jobboard capabilities to automatically abandon these jobs.
"""
+ START_FINISH_EVENTS_EMITTED = tuple([
+ 'compilation', 'preparation',
+ 'validation', 'running',
+ ])
+ """Events will be emitted for the start and finish of each engine
+ activity defined above, the actual event name that can be registered
+ to subscribe to will be ``${event}_start`` and ``${event}_end`` where
+ the ``${event}`` in this pseudo-variable will be one of these events.
+ """
+
def __init__(self, name, jobboard,
persistence=None, engine=None,
engine_options=None, wait_timeout=None):
@@ -105,10 +115,24 @@ class BlockingConductor(base.Conductor):
with ExitStack() as stack:
for listener in listeners:
stack.enter_context(listener)
- LOG.debug("Dispatching engine %s for job: %s", engine, job)
+ LOG.debug("Dispatching engine for job '%s'", job)
consume = True
try:
- engine.run()
+ for stage_func, event_name in [(engine.compile, 'compilation'),
+ (engine.prepare, 'preparation'),
+ (engine.validate, 'validation'),
+ (engine.run, 'running')]:
+ self._notifier.notify("%s_start" % event_name, {
+ 'job': job,
+ 'engine': engine,
+ 'conductor': self,
+ })
+ stage_func()
+ self._notifier.notify("%s_end" % event_name, {
+ 'job': job,
+ 'engine': engine,
+ 'conductor': self,
+ })
except excp.WrappedFailure as e:
if all((f.check(*NO_CONSUME_EXCEPTIONS) for f in e)):
consume = False
diff --git a/taskflow/conductors/base.py b/taskflow/conductors/base.py
index 6e46fff..6942423 100644
--- a/taskflow/conductors/base.py
+++ b/taskflow/conductors/base.py
@@ -20,6 +20,7 @@ import six
from taskflow import engines
from taskflow import exceptions as excp
+from taskflow.types import notifier
@six.add_metaclass(abc.ABCMeta)
@@ -45,6 +46,18 @@ class Conductor(object):
self._engine_options = engine_options.copy()
self._persistence = persistence
self._lock = threading.RLock()
+ self._notifier = notifier.Notifier()
+
+ @property
+ def notifier(self):
+ """The conductor actions (or other state changes) notifier.
+
+ NOTE(harlowja): different conductor implementations may emit
+ different events + event details at different times, so refer to your
+ conductor documentation to know exactly what can and what can not be
+ subscribed to.
+ """
+ return self._notifier
def _flow_detail_from_job(self, job):
"""Extracts a flow detail from a job (via some manner).