path: root/taskflow/engines/action_engine/runner.py
author     Joshua Harlow <harlowja@gmail.com>      2014-08-24 21:11:42 -0700
committer  Joshua Harlow <harlowja@yahoo-inc.com>  2014-09-04 18:14:11 -0700
commit     bfaa109821b3f849ce4f5290d7a8248531aa1042 (patch)
tree       a3bc36a73ae9954284bfd137e165acf7f403a556 /taskflow/engines/action_engine/runner.py
parent     7c3332e49bed1d51ca7aea696105e5d3e7087e75 (diff)
download   taskflow-bfaa109821b3f849ce4f5290d7a8248531aa1042.tar.gz
Tweak engine iteration 'close-up shop' runtime path
1. Have the runner yield the final set of failures instead of raising them. This lets the same yield syntax be used for every exit point that run_iter() produces; the failures are now raised from the main engine run loop instead, to match this change.

2. Use a context manager instead of a try/finally to start and stop the action engine's task executor (a small niceness; a sketch of the pattern follows below).

3. When run_iter() is used and the generator it returns is closed, continue running instead of breaking out of the run loop (which can leave running tasks incomplete) and signal to the runner that the engine has suspended itself. This ensures the running atoms are not lost when the generator from run_iter() is closed (for whatever reason) before finishing.

Also adds a set of useful tests that exercise the runner directly instead of the indirect testing that was done before.

Fixes bug 1361013

Change-Id: I1b598e26f0b3877c8f7004f87bacdb7f5e9c9897
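For illustration, a minimal sketch of the context-manager pattern described in point 2. The helper name _start_stop and the start()/stop() executor methods are assumptions for this sketch and are not taken from this diff (which only touches runner.py):

    import contextlib


    @contextlib.contextmanager
    def _start_stop(task_executor):
        # Start the executor on entry and always stop it on exit,
        # replacing an explicit try/finally around the engine run loop.
        task_executor.start()
        try:
            yield task_executor
        finally:
            task_executor.stop()


    # Hypothetical usage inside the engine's run path:
    #
    #     with _start_stop(self._task_executor):
    #         for state, failures in runner.run_iter():
    #             ...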
Diffstat (limited to 'taskflow/engines/action_engine/runner.py')
-rw-r--r--   taskflow/engines/action_engine/runner.py   20
1 file changed, 10 insertions(+), 10 deletions(-)
diff --git a/taskflow/engines/action_engine/runner.py b/taskflow/engines/action_engine/runner.py
index 0120bd6..b48f66a 100644
--- a/taskflow/engines/action_engine/runner.py
+++ b/taskflow/engines/action_engine/runner.py
@@ -72,12 +72,12 @@ class Runner(object):
             timeout = _WAITING_TIMEOUT
 
         # Prepare flow to be resumed
-        yield st.RESUMING
+        yield (st.RESUMING, [])
         next_nodes = self._completer.resume()
         next_nodes.update(self._analyzer.get_next_nodes())
 
         # Schedule nodes to be worked on
-        yield st.SCHEDULING
+        yield (st.SCHEDULING, [])
         if self.is_running():
             not_done, failures = self._scheduler.schedule(next_nodes)
         else:
@@ -90,7 +90,7 @@
         # preempt those tasks (maybe in the future we will be better able to do
         # this).
         while not_done:
-            yield st.WAITING
+            yield (st.WAITING, [])
 
             # TODO(harlowja): maybe we should start doing 'yield from' this
             # call sometime in the future, or equivalent that will work in
@@ -101,7 +101,7 @@
             # failures). If failures occurred just continue processing what
             # is running (so that we don't leave it abandoned) but do not
             # schedule anything new.
-            yield st.ANALYZING
+            yield (st.ANALYZING, [])
             next_nodes = set()
             for future in done:
                 try:
@@ -119,7 +119,7 @@
                 else:
                     next_nodes.update(more_nodes)
             if next_nodes and not failures and self.is_running():
-                yield st.SCHEDULING
+                yield (st.SCHEDULING, [])
                 # Recheck incase someone suspended it.
                 if self.is_running():
                     more_not_done, failures = self._scheduler.schedule(
@@ -127,10 +127,10 @@
                     not_done.update(more_not_done)
 
         if failures:
-            misc.Failure.reraise_if_any(failures)
-        if self._analyzer.get_next_nodes():
-            yield st.SUSPENDED
+            yield (st.FAILURE, failures)
+        elif self._analyzer.get_next_nodes():
+            yield (st.SUSPENDED, [])
         elif self._analyzer.is_success():
-            yield st.SUCCESS
+            yield (st.SUCCESS, [])
         else:
-            yield st.REVERTED
+            yield (st.REVERTED, [])
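To show how the new (state, failures) tuples from run_iter() might be consumed, here is a rough sketch of an engine-side run loop that re-raises the accumulated failures itself, matching points 1 and 3 of the commit message. The run() wrapper and the import paths are assumptions; only runner.run_iter(), the st.* states, and misc.Failure.reraise_if_any() appear in the diff above:

    from taskflow import states as st    # assumed import path
    from taskflow.utils import misc      # assumed import path


    def run(runner):
        # Drive the runner to completion; every iteration now yields a
        # (state, failures) tuple instead of a bare state, so it is the
        # caller (the engine's main run loop) that decides when to raise.
        last_state = None
        for state, failures in runner.run_iter():
            last_state = state
            if failures:
                # The runner no longer raises on its own; re-raise here so
                # the failure surfaces from the engine rather than from the
                # runner mid-iteration.
                misc.Failure.reraise_if_any(failures)
        return last_state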