summaryrefslogtreecommitdiff
path: root/taskflow/conductors
diff options
context:
space:
mode:
authorMin Pae <sputnik13@gmail.com>2015-11-04 09:27:25 -0800
committerMin Pae <sputnik13@gmail.com>2015-11-16 12:20:28 -0800
commit6918b8fab0d303bb7596df657f24897bbc67a1fd (patch)
treedfe1fc4432146911ab88bcd41ffa19679773ea9e /taskflow/conductors
parentae9c701f9073941fbe063d2b7854ff6eed5b5fc0 (diff)
downloadtaskflow-6918b8fab0d303bb7596df657f24897bbc67a1fd.tar.gz
Adding notification points for job completion
Adding notifications for job completion, both consumed and abandoned, so that a listener can take some action based on job completion. Change-Id: I826285d4bfccd2406df7b59e53a9b724702ed094
Diffstat (limited to 'taskflow/conductors')
-rw-r--r--taskflow/conductors/backends/impl_executor.py24
1 files changed, 24 insertions, 0 deletions
diff --git a/taskflow/conductors/backends/impl_executor.py b/taskflow/conductors/backends/impl_executor.py
index c47488d..d61f3e0 100644
--- a/taskflow/conductors/backends/impl_executor.py
+++ b/taskflow/conductors/backends/impl_executor.py
@@ -106,6 +106,20 @@ class ExecutorConductor(base.Conductor):
activity defined above, the actual event name that can be registered
to subscribe to will be ``${event}_start`` and ``${event}_end`` where
the ``${event}`` in this pseudo-variable will be one of these events.
+
+ .. deprecated:: 1.23.0
+ Use :py:attr:`~EVENTS_EMITTED`
+ """
+
+ EVENTS_EMITTED = tuple([
+ 'compilation_start', 'compilation_end',
+ 'preparation_start', 'preparation_end',
+ 'validation_start', 'validation_end',
+ 'running_start', 'running_end',
+ 'job_consumed', 'job_abandoned',
+ ])
+ """Events will be emitted for each of the events above. The event is
+ emitted to listeners registered with the conductor.
"""
def __init__(self, name, jobboard,
@@ -217,8 +231,18 @@ class ExecutorConductor(base.Conductor):
try:
if consume:
self._jobboard.consume(job, self._name)
+ self._notifier.notify("job_consumed", {
+ 'job': job,
+ 'conductor': self,
+ 'persistence': self._persistence,
+ })
else:
self._jobboard.abandon(job, self._name)
+ self._notifier.notify("job_abandoned", {
+ 'job': job,
+ 'conductor': self,
+ 'persistence': self._persistence,
+ })
except (excp.JobFailure, excp.NotFound):
if consume:
self._log.warn("Failed job consumption: %s", job,