summaryrefslogtreecommitdiff
path: root/taskflow/engines/action_engine/executor.py
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@gmail.com>2014-08-23 20:09:10 -0700
committerJoshua Harlow <harlowja@gmail.com>2014-10-18 20:21:48 -0700
commitb014fc7d48969bd6812a11a5a0342c9324108876 (patch)
tree76bf5edab6fd6fbc0751c95caf149625a3db35ea /taskflow/engines/action_engine/executor.py
parentc95a681a9fdee40eefb098b52e98548092a6dbe4 (diff)
downloadtaskflow-b014fc7d48969bd6812a11a5a0342c9324108876.tar.gz
Add a futures type that can unify our future functionality
Move the currently existing green future executor and associated code to a new futures types module so that it can be accessed from this new location (TODO: deprecate the old location and link the old to the new for one release so that we can remove the old link in N + 1 release). This unifies the API that the existing pool (thread or process) future executors and the green thread pool future executor, and the newly added synchronous executor (replacing the previous `make_completed_future` function) provide so there usage is as seamless as possible. Part of blueprint top-level-types Change-Id: Ie5500eaa7f4425edb604b2dd13a15f82909a673b
Diffstat (limited to 'taskflow/engines/action_engine/executor.py')
-rw-r--r--taskflow/engines/action_engine/executor.py17
1 files changed, 9 insertions, 8 deletions
diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py
index 40a671e..83da3b6 100644
--- a/taskflow/engines/action_engine/executor.py
+++ b/taskflow/engines/action_engine/executor.py
@@ -16,10 +16,10 @@
import abc
-from concurrent import futures
import six
from taskflow import task as _task
+from taskflow.types import futures
from taskflow.utils import async_utils
from taskflow.utils import misc
from taskflow.utils import threading_utils
@@ -94,19 +94,20 @@ class TaskExecutorBase(object):
class SerialTaskExecutor(TaskExecutorBase):
"""Execute task one after another."""
+ def __init__(self):
+ self._executor = futures.SynchronousExecutor()
+
def execute_task(self, task, task_uuid, arguments, progress_callback=None):
- return async_utils.make_completed_future(
- _execute_task(task, arguments, progress_callback))
+ return self._executor.submit(_execute_task, task, arguments,
+ progress_callback)
def revert_task(self, task, task_uuid, arguments, result, failures,
progress_callback=None):
- return async_utils.make_completed_future(
- _revert_task(task, arguments, result,
- failures, progress_callback))
+ return self._executor.submit(_revert_task, task, arguments, result,
+ failures, progress_callback)
def wait_for_any(self, fs, timeout=None):
- # NOTE(imelnikov): this executor returns only done futures.
- return (fs, set())
+ return async_utils.wait_for_any(fs, timeout)
class ParallelTaskExecutor(TaskExecutorBase):