summaryrefslogtreecommitdiff
path: root/taskflow/conductors
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-07-08 23:52:37 +0000
committerGerrit Code Review <review@openstack.org>2015-07-08 23:52:37 +0000
commitdf5fbe469c4c85c771bc0afcc5026d9790edc942 (patch)
treeaa207f64c0c1e47e95a725884995b2d6b72bd406 /taskflow/conductors
parent10b3826de831e643cc5259051b96307a11f678ac (diff)
parent63c67302488bea87e2e2870dc83e73aacddeaa34 (diff)
downloadtaskflow-df5fbe469c4c85c771bc0afcc5026d9790edc942.tar.gz
Merge "Notify on the individual engine steps"
Diffstat (limited to 'taskflow/conductors')
-rw-r--r--taskflow/conductors/backends/impl_blocking.py28
-rw-r--r--taskflow/conductors/base.py13
2 files changed, 39 insertions, 2 deletions
diff --git a/taskflow/conductors/backends/impl_blocking.py b/taskflow/conductors/backends/impl_blocking.py
index b534632..26a75c8 100644
--- a/taskflow/conductors/backends/impl_blocking.py
+++ b/taskflow/conductors/backends/impl_blocking.py
@@ -57,6 +57,16 @@ class BlockingConductor(base.Conductor):
upon the jobboard capabilities to automatically abandon these jobs.
"""
+ START_FINISH_EVENTS_EMITTED = tuple([
+ 'compilation', 'preparation',
+ 'validation', 'running',
+ ])
+ """Events will be emitted for the start and finish of each engine
+ activity defined above, the actual event name that can be registered
+ to subscribe to will be ``${event}_start`` and ``${event}_end`` where
+ the ``${event}`` in this pseudo-variable will be one of these events.
+ """
+
def __init__(self, name, jobboard,
persistence=None, engine=None,
engine_options=None, wait_timeout=None):
@@ -108,10 +118,24 @@ class BlockingConductor(base.Conductor):
with ExitStack() as stack:
for listener in listeners:
stack.enter_context(listener)
- LOG.debug("Dispatching engine %s for job: %s", engine, job)
+ LOG.debug("Dispatching engine for job '%s'", job)
consume = True
try:
- engine.run()
+ for stage_func, event_name in [(engine.compile, 'compilation'),
+ (engine.prepare, 'preparation'),
+ (engine.validate, 'validation'),
+ (engine.run, 'running')]:
+ self._notifier.notify("%s_start" % event_name, {
+ 'job': job,
+ 'engine': engine,
+ 'conductor': self,
+ })
+ stage_func()
+ self._notifier.notify("%s_end" % event_name, {
+ 'job': job,
+ 'engine': engine,
+ 'conductor': self,
+ })
except excp.WrappedFailure as e:
if all((f.check(*NO_CONSUME_EXCEPTIONS) for f in e)):
consume = False
diff --git a/taskflow/conductors/base.py b/taskflow/conductors/base.py
index 6e46fff..6942423 100644
--- a/taskflow/conductors/base.py
+++ b/taskflow/conductors/base.py
@@ -20,6 +20,7 @@ import six
from taskflow import engines
from taskflow import exceptions as excp
+from taskflow.types import notifier
@six.add_metaclass(abc.ABCMeta)
@@ -45,6 +46,18 @@ class Conductor(object):
self._engine_options = engine_options.copy()
self._persistence = persistence
self._lock = threading.RLock()
+ self._notifier = notifier.Notifier()
+
+ @property
+ def notifier(self):
+ """The conductor actions (or other state changes) notifier.
+
+ NOTE(harlowja): different conductor implementations may emit
+ different events + event details at different times, so refer to your
+ conductor documentation to know exactly what can and what can not be
+ subscribed to.
+ """
+ return self._notifier
def _flow_detail_from_job(self, job):
"""Extracts a flow detail from a job (via some manner).