diff options
author | Min Pae <sputnik13@gmail.com> | 2015-11-04 09:27:25 -0800 |
---|---|---|
committer | Min Pae <sputnik13@gmail.com> | 2015-11-16 12:20:28 -0800 |
commit | 6918b8fab0d303bb7596df657f24897bbc67a1fd (patch) | |
tree | dfe1fc4432146911ab88bcd41ffa19679773ea9e /taskflow/conductors | |
parent | ae9c701f9073941fbe063d2b7854ff6eed5b5fc0 (diff) | |
download | taskflow-6918b8fab0d303bb7596df657f24897bbc67a1fd.tar.gz |
Adding notification points for job completion
Adding notifications for job completion, both consumed and abandoned, so that a
listener can take some action based on job completion.
Change-Id: I826285d4bfccd2406df7b59e53a9b724702ed094
Diffstat (limited to 'taskflow/conductors')
-rw-r--r-- | taskflow/conductors/backends/impl_executor.py | 24 |
1 files changed, 24 insertions, 0 deletions
diff --git a/taskflow/conductors/backends/impl_executor.py b/taskflow/conductors/backends/impl_executor.py index c47488d..d61f3e0 100644 --- a/taskflow/conductors/backends/impl_executor.py +++ b/taskflow/conductors/backends/impl_executor.py @@ -106,6 +106,20 @@ class ExecutorConductor(base.Conductor): activity defined above, the actual event name that can be registered to subscribe to will be ``${event}_start`` and ``${event}_end`` where the ``${event}`` in this pseudo-variable will be one of these events. + + .. deprecated:: 1.23.0 + Use :py:attr:`~EVENTS_EMITTED` + """ + + EVENTS_EMITTED = tuple([ + 'compilation_start', 'compilation_end', + 'preparation_start', 'preparation_end', + 'validation_start', 'validation_end', + 'running_start', 'running_end', + 'job_consumed', 'job_abandoned', + ]) + """Events will be emitted for each of the events above. The event is + emitted to listeners registered with the conductor. """ def __init__(self, name, jobboard, @@ -217,8 +231,18 @@ class ExecutorConductor(base.Conductor): try: if consume: self._jobboard.consume(job, self._name) + self._notifier.notify("job_consumed", { + 'job': job, + 'conductor': self, + 'persistence': self._persistence, + }) else: self._jobboard.abandon(job, self._name) + self._notifier.notify("job_abandoned", { + 'job': job, + 'conductor': self, + 'persistence': self._persistence, + }) except (excp.JobFailure, excp.NotFound): if consume: self._log.warn("Failed job consumption: %s", job, |