diff options
author | Jenkins <jenkins@review.openstack.org> | 2015-07-08 23:52:37 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2015-07-08 23:52:37 +0000 |
commit | df5fbe469c4c85c771bc0afcc5026d9790edc942 (patch) | |
tree | aa207f64c0c1e47e95a725884995b2d6b72bd406 /taskflow/conductors | |
parent | 10b3826de831e643cc5259051b96307a11f678ac (diff) | |
parent | 63c67302488bea87e2e2870dc83e73aacddeaa34 (diff) | |
download | taskflow-df5fbe469c4c85c771bc0afcc5026d9790edc942.tar.gz |
Merge "Notify on the individual engine steps"
Diffstat (limited to 'taskflow/conductors')
-rw-r--r-- | taskflow/conductors/backends/impl_blocking.py | 28 | ||||
-rw-r--r-- | taskflow/conductors/base.py | 13 |
2 files changed, 39 insertions, 2 deletions
diff --git a/taskflow/conductors/backends/impl_blocking.py b/taskflow/conductors/backends/impl_blocking.py index b534632..26a75c8 100644 --- a/taskflow/conductors/backends/impl_blocking.py +++ b/taskflow/conductors/backends/impl_blocking.py @@ -57,6 +57,16 @@ class BlockingConductor(base.Conductor): upon the jobboard capabilities to automatically abandon these jobs. """ + START_FINISH_EVENTS_EMITTED = tuple([ + 'compilation', 'preparation', + 'validation', 'running', + ]) + """Events will be emitted for the start and finish of each engine + activity defined above, the actual event name that can be registered + to subscribe to will be ``${event}_start`` and ``${event}_end`` where + the ``${event}`` in this pseudo-variable will be one of these events. + """ + def __init__(self, name, jobboard, persistence=None, engine=None, engine_options=None, wait_timeout=None): @@ -108,10 +118,24 @@ class BlockingConductor(base.Conductor): with ExitStack() as stack: for listener in listeners: stack.enter_context(listener) - LOG.debug("Dispatching engine %s for job: %s", engine, job) + LOG.debug("Dispatching engine for job '%s'", job) consume = True try: - engine.run() + for stage_func, event_name in [(engine.compile, 'compilation'), + (engine.prepare, 'preparation'), + (engine.validate, 'validation'), + (engine.run, 'running')]: + self._notifier.notify("%s_start" % event_name, { + 'job': job, + 'engine': engine, + 'conductor': self, + }) + stage_func() + self._notifier.notify("%s_end" % event_name, { + 'job': job, + 'engine': engine, + 'conductor': self, + }) except excp.WrappedFailure as e: if all((f.check(*NO_CONSUME_EXCEPTIONS) for f in e)): consume = False diff --git a/taskflow/conductors/base.py b/taskflow/conductors/base.py index 6e46fff..6942423 100644 --- a/taskflow/conductors/base.py +++ b/taskflow/conductors/base.py @@ -20,6 +20,7 @@ import six from taskflow import engines from taskflow import exceptions as excp +from taskflow.types import notifier @six.add_metaclass(abc.ABCMeta) @@ -45,6 +46,18 @@ class Conductor(object): self._engine_options = engine_options.copy() self._persistence = persistence self._lock = threading.RLock() + self._notifier = notifier.Notifier() + + @property + def notifier(self): + """The conductor actions (or other state changes) notifier. + + NOTE(harlowja): different conductor implementations may emit + different events + event details at different times, so refer to your + conductor documentation to know exactly what can and what can not be + subscribed to. + """ + return self._notifier def _flow_detail_from_job(self, job): """Extracts a flow detail from a job (via some manner). |