diff options
| author | Joshua Harlow <harlowja@gmail.com> | 2014-08-23 20:09:10 -0700 |
|---|---|---|
| committer | Joshua Harlow <harlowja@gmail.com> | 2014-10-18 20:21:48 -0700 |
| commit | b014fc7d48969bd6812a11a5a0342c9324108876 (patch) | |
| tree | 76bf5edab6fd6fbc0751c95caf149625a3db35ea /taskflow/engines/action_engine/executor.py | |
| parent | c95a681a9fdee40eefb098b52e98548092a6dbe4 (diff) | |
| download | taskflow-b014fc7d48969bd6812a11a5a0342c9324108876.tar.gz | |
Add a futures type that can unify our future functionality
Move the currently existing green future executor and associated
code to a new futures types module so that it can be accessed from
this new location (TODO: deprecate the old location and link the
old to the new for one release so that we can remove the old link
in N + 1 release).
This unifies the API that the existing pool (thread or process) future
executors and the green thread pool future executor, and the newly added
synchronous executor (replacing the previous `make_completed_future`
function) provide so there usage is as seamless as possible.
Part of blueprint top-level-types
Change-Id: Ie5500eaa7f4425edb604b2dd13a15f82909a673b
Diffstat (limited to 'taskflow/engines/action_engine/executor.py')
| -rw-r--r-- | taskflow/engines/action_engine/executor.py | 17 |
1 files changed, 9 insertions, 8 deletions
diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py index 40a671e..83da3b6 100644 --- a/taskflow/engines/action_engine/executor.py +++ b/taskflow/engines/action_engine/executor.py @@ -16,10 +16,10 @@ import abc -from concurrent import futures import six from taskflow import task as _task +from taskflow.types import futures from taskflow.utils import async_utils from taskflow.utils import misc from taskflow.utils import threading_utils @@ -94,19 +94,20 @@ class TaskExecutorBase(object): class SerialTaskExecutor(TaskExecutorBase): """Execute task one after another.""" + def __init__(self): + self._executor = futures.SynchronousExecutor() + def execute_task(self, task, task_uuid, arguments, progress_callback=None): - return async_utils.make_completed_future( - _execute_task(task, arguments, progress_callback)) + return self._executor.submit(_execute_task, task, arguments, + progress_callback) def revert_task(self, task, task_uuid, arguments, result, failures, progress_callback=None): - return async_utils.make_completed_future( - _revert_task(task, arguments, result, - failures, progress_callback)) + return self._executor.submit(_revert_task, task, arguments, result, + failures, progress_callback) def wait_for_any(self, fs, timeout=None): - # NOTE(imelnikov): this executor returns only done futures. - return (fs, set()) + return async_utils.wait_for_any(fs, timeout) class ParallelTaskExecutor(TaskExecutorBase): |
