diff options
author | Joshua Harlow <harlowja@yahoo-inc.com> | 2014-06-30 20:26:26 -0700 |
---|---|---|
committer | Joshua Harlow <harlowja@yahoo-inc.com> | 2014-08-21 15:14:35 -0700 |
commit | 296e660cd3036db46a58ed3b16c907c769454f05 (patch) | |
tree | 5d9cf1cf1a8902409f12184b3c1ded50dcdbac33 /taskflow/conductors | |
parent | 73125ee0fd4fa42002e06285b193365c3f70c776 (diff) | |
download | taskflow-296e660cd3036db46a58ed3b16c907c769454f05.tar.gz |
Have the dispatch_job function return a future
To make it easier to add in a multi-threaded conductor
convert the base dispatch_job function to return a future
object. This future object will contain a single result,
whether the job should be consumed or abandoned. In the
single threaded conductor its dispatch_job function will
return a future, after completing the job (in a multi
threaded conductor it would not return a future after
doing the work).
Change-Id: I077334820d36c64e272e93d158e3a0cd0d66a937
Diffstat (limited to 'taskflow/conductors')
-rw-r--r-- | taskflow/conductors/base.py | 7 | ||||
-rw-r--r-- | taskflow/conductors/single_threaded.py | 6 |
2 files changed, 8 insertions, 5 deletions
diff --git a/taskflow/conductors/base.py b/taskflow/conductors/base.py index e7c9887..c881346 100644 --- a/taskflow/conductors/base.py +++ b/taskflow/conductors/base.py @@ -108,9 +108,10 @@ class Conductor(object): """Dispatches a claimed job for work completion. Accepts a single (already claimed) job and causes it to be run in - an engine. Returns a boolean that signifies whether the job should - be consumed. The job is consumed upon completion (unless False is - returned which will signify the job should be abandoned instead). + an engine. Returns a future object that represented the work to be + completed sometime in the future. The future should return a single + boolean from its result() method. This boolean determines whether the + job will be consumed (true) or whether it should be abandoned (false). :param job: A job instance that has already been claimed by the jobboard. diff --git a/taskflow/conductors/single_threaded.py b/taskflow/conductors/single_threaded.py index 5e78e34..23994e7 100644 --- a/taskflow/conductors/single_threaded.py +++ b/taskflow/conductors/single_threaded.py @@ -21,6 +21,7 @@ from taskflow.conductors import base from taskflow import exceptions as excp from taskflow.listeners import logging as logging_listener from taskflow.types import timing as tt +from taskflow.utils import async_utils from taskflow.utils import lock_utils LOG = logging.getLogger(__name__) @@ -116,7 +117,7 @@ class SingleThreadedConductor(base.Conductor): job, exc_info=True) else: LOG.info("Job completed successfully: %s", job) - return consume + return async_utils.make_completed_future(consume) def run(self): self._dead.clear() @@ -136,12 +137,13 @@ class SingleThreadedConductor(base.Conductor): continue consume = False try: - consume = self._dispatch_job(job) + f = self._dispatch_job(job) except Exception: LOG.warn("Job dispatching failed: %s", job, exc_info=True) else: dispatched += 1 + consume = f.result() try: if consume: self._jobboard.consume(job, self._name) |