summaryrefslogtreecommitdiff
path: root/taskflow/conductors
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2014-06-30 20:26:26 -0700
committerJoshua Harlow <harlowja@yahoo-inc.com>2014-08-21 15:14:35 -0700
commit296e660cd3036db46a58ed3b16c907c769454f05 (patch)
tree5d9cf1cf1a8902409f12184b3c1ded50dcdbac33 /taskflow/conductors
parent73125ee0fd4fa42002e06285b193365c3f70c776 (diff)
downloadtaskflow-296e660cd3036db46a58ed3b16c907c769454f05.tar.gz
Have the dispatch_job function return a future
To make it easier to add in a multi-threaded conductor convert the base dispatch_job function to return a future object. This future object will contain a single result, whether the job should be consumed or abandoned. In the single threaded conductor its dispatch_job function will return a future, after completing the job (in a multi threaded conductor it would not return a future after doing the work). Change-Id: I077334820d36c64e272e93d158e3a0cd0d66a937
Diffstat (limited to 'taskflow/conductors')
-rw-r--r--taskflow/conductors/base.py7
-rw-r--r--taskflow/conductors/single_threaded.py6
2 files changed, 8 insertions, 5 deletions
diff --git a/taskflow/conductors/base.py b/taskflow/conductors/base.py
index e7c9887..c881346 100644
--- a/taskflow/conductors/base.py
+++ b/taskflow/conductors/base.py
@@ -108,9 +108,10 @@ class Conductor(object):
"""Dispatches a claimed job for work completion.
Accepts a single (already claimed) job and causes it to be run in
- an engine. Returns a boolean that signifies whether the job should
- be consumed. The job is consumed upon completion (unless False is
- returned which will signify the job should be abandoned instead).
+ an engine. Returns a future object that represented the work to be
+ completed sometime in the future. The future should return a single
+ boolean from its result() method. This boolean determines whether the
+ job will be consumed (true) or whether it should be abandoned (false).
:param job: A job instance that has already been claimed by the
jobboard.
diff --git a/taskflow/conductors/single_threaded.py b/taskflow/conductors/single_threaded.py
index 5e78e34..23994e7 100644
--- a/taskflow/conductors/single_threaded.py
+++ b/taskflow/conductors/single_threaded.py
@@ -21,6 +21,7 @@ from taskflow.conductors import base
from taskflow import exceptions as excp
from taskflow.listeners import logging as logging_listener
from taskflow.types import timing as tt
+from taskflow.utils import async_utils
from taskflow.utils import lock_utils
LOG = logging.getLogger(__name__)
@@ -116,7 +117,7 @@ class SingleThreadedConductor(base.Conductor):
job, exc_info=True)
else:
LOG.info("Job completed successfully: %s", job)
- return consume
+ return async_utils.make_completed_future(consume)
def run(self):
self._dead.clear()
@@ -136,12 +137,13 @@ class SingleThreadedConductor(base.Conductor):
continue
consume = False
try:
- consume = self._dispatch_job(job)
+ f = self._dispatch_job(job)
except Exception:
LOG.warn("Job dispatching failed: %s", job,
exc_info=True)
else:
dispatched += 1
+ consume = f.result()
try:
if consume:
self._jobboard.consume(job, self._name)