summaryrefslogtreecommitdiff
path: root/taskflow/conductors
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@gmail.com>2015-03-13 00:03:35 -0700
committerJoshua Harlow <harlowja@gmail.com>2015-03-13 20:29:05 -0700
commita1f9321c3fda9af6ed4a240091f9c0c4917dc75d (patch)
tree700da99664ddaca03e004d336a7610ff47c6242f /taskflow/conductors
parent384eb9a065641141da20af1942d8c9e3ba518ada (diff)
downloadtaskflow-a1f9321c3fda9af6ed4a240091f9c0c4917dc75d.tar.gz
Ensure we register & deregister conductor listeners
Instead of just registering engine listeners that were returned, make sure we also deregister them when the engine has either finished or failed. This ensures that if a listener has hold of any resources (or other) that it can clean those up and be sure that its deregister call will be made. Change-Id: Ia1420c435156362698702fed2bda11c2a0fef803
Diffstat (limited to 'taskflow/conductors')
-rw-r--r--taskflow/conductors/backends/impl_blocking.py15
-rw-r--r--taskflow/conductors/base.py2
2 files changed, 12 insertions, 5 deletions
diff --git a/taskflow/conductors/backends/impl_blocking.py b/taskflow/conductors/backends/impl_blocking.py
index 1f6a9ee..b53452a 100644
--- a/taskflow/conductors/backends/impl_blocking.py
+++ b/taskflow/conductors/backends/impl_blocking.py
@@ -21,6 +21,7 @@ from taskflow import logging
from taskflow.types import timing as tt
from taskflow.utils import async_utils
from taskflow.utils import deprecation
+from taskflow.utils import misc
from taskflow.utils import threading_utils
LOG = logging.getLogger(__name__)
@@ -88,11 +89,19 @@ class BlockingConductor(base.Conductor):
def dispatching(self):
return not self._dead.is_set()
+ def _listeners_from_job(self, job, engine):
+ listeners = super(BlockingConductor, self)._listeners_from_job(job,
+ engine)
+ listeners.append(logging_listener.LoggingListener(engine, log=LOG))
+ return listeners
+
def _dispatch_job(self, job):
engine = self._engine_from_job(job)
- consume = True
- with logging_listener.LoggingListener(engine, log=LOG):
+ listeners = self._listeners_from_job(job, engine)
+ with misc.ListenerStack(LOG) as stack:
+ stack.register(listeners)
LOG.debug("Dispatching engine %s for job: %s", engine, job)
+ consume = True
try:
engine.run()
except excp.WrappedFailure as e:
@@ -117,7 +126,7 @@ class BlockingConductor(base.Conductor):
job, exc_info=True)
else:
LOG.info("Job completed successfully: %s", job)
- return async_utils.make_completed_future(consume)
+ return async_utils.make_completed_future(consume)
def run(self):
self._dead.clear()
diff --git a/taskflow/conductors/base.py b/taskflow/conductors/base.py
index 33c4441..7a6b8ce 100644
--- a/taskflow/conductors/base.py
+++ b/taskflow/conductors/base.py
@@ -92,8 +92,6 @@ class Conductor(object):
engine=self._engine,
backend=self._persistence,
**self._engine_options)
- for listener in self._listeners_from_job(job, engine):
- listener.register()
return engine
def _listeners_from_job(self, job, engine):