summaryrefslogtreecommitdiff
path: root/taskflow/engines/action_engine/runner.py
diff options
context:
space:
mode:
Diffstat (limited to 'taskflow/engines/action_engine/runner.py')
-rw-r--r--taskflow/engines/action_engine/runner.py20
1 files changed, 10 insertions, 10 deletions
diff --git a/taskflow/engines/action_engine/runner.py b/taskflow/engines/action_engine/runner.py
index 0120bd6..b48f66a 100644
--- a/taskflow/engines/action_engine/runner.py
+++ b/taskflow/engines/action_engine/runner.py
@@ -72,12 +72,12 @@ class Runner(object):
timeout = _WAITING_TIMEOUT
# Prepare flow to be resumed
- yield st.RESUMING
+ yield (st.RESUMING, [])
next_nodes = self._completer.resume()
next_nodes.update(self._analyzer.get_next_nodes())
# Schedule nodes to be worked on
- yield st.SCHEDULING
+ yield (st.SCHEDULING, [])
if self.is_running():
not_done, failures = self._scheduler.schedule(next_nodes)
else:
@@ -90,7 +90,7 @@ class Runner(object):
# preempt those tasks (maybe in the future we will be better able to do
# this).
while not_done:
- yield st.WAITING
+ yield (st.WAITING, [])
# TODO(harlowja): maybe we should start doing 'yield from' this
# call sometime in the future, or equivalent that will work in
@@ -101,7 +101,7 @@ class Runner(object):
# failures). If failures occurred just continue processing what
# is running (so that we don't leave it abandoned) but do not
# schedule anything new.
- yield st.ANALYZING
+ yield (st.ANALYZING, [])
next_nodes = set()
for future in done:
try:
@@ -119,7 +119,7 @@ class Runner(object):
else:
next_nodes.update(more_nodes)
if next_nodes and not failures and self.is_running():
- yield st.SCHEDULING
+ yield (st.SCHEDULING, [])
# Recheck incase someone suspended it.
if self.is_running():
more_not_done, failures = self._scheduler.schedule(
@@ -127,10 +127,10 @@ class Runner(object):
not_done.update(more_not_done)
if failures:
- misc.Failure.reraise_if_any(failures)
- if self._analyzer.get_next_nodes():
- yield st.SUSPENDED
+ yield (st.FAILURE, failures)
+ elif self._analyzer.get_next_nodes():
+ yield (st.SUSPENDED, [])
elif self._analyzer.is_success():
- yield st.SUCCESS
+ yield (st.SUCCESS, [])
else:
- yield st.REVERTED
+ yield (st.REVERTED, [])