diff options
author | Joshua Harlow <harlowja@gmail.com> | 2014-08-24 21:11:42 -0700 |
---|---|---|
committer | Joshua Harlow <harlowja@yahoo-inc.com> | 2014-09-04 18:14:11 -0700 |
commit | bfaa109821b3f849ce4f5290d7a8248531aa1042 (patch) | |
tree | a3bc36a73ae9954284bfd137e165acf7f403a556 /taskflow/engines/action_engine/runner.py | |
parent | 7c3332e49bed1d51ca7aea696105e5d3e7087e75 (diff) | |
download | taskflow-bfaa109821b3f849ce4f5290d7a8248531aa1042.tar.gz |
Tweak engine iteration 'close-up shop' runtime path
1. Have the runner yield the final set of failures instead of
raising them, this allows the same yield syntax to be used
for all exit points that the runner run_iter() produces and
now raise failures from the main engine run loop to match this
change.
2. Use a context manager instead of try/finally to start and
stop the action engines task executor (teenie niceness...)
3. When the engine run_iter() is used and the generator that is
returned is closed, instead of breaking from the run loop, which
can leave running tasks incomplete instead continue running and
signal to the runner that the engine has suspended itself. This
ensures that the running atoms are not lost when the generator from
run_iter() is closed (for whatever reason) before finishing.
Also adds a bunch of useful tests that directly test the runner instead
of the indirect testing that we were doing before.
Fixes bug 1361013
Change-Id: I1b598e26f0b3877c8f7004f87bacdb7f5e9c9897
Diffstat (limited to 'taskflow/engines/action_engine/runner.py')
-rw-r--r-- | taskflow/engines/action_engine/runner.py | 20 |
1 files changed, 10 insertions, 10 deletions
diff --git a/taskflow/engines/action_engine/runner.py b/taskflow/engines/action_engine/runner.py index 0120bd6..b48f66a 100644 --- a/taskflow/engines/action_engine/runner.py +++ b/taskflow/engines/action_engine/runner.py @@ -72,12 +72,12 @@ class Runner(object): timeout = _WAITING_TIMEOUT # Prepare flow to be resumed - yield st.RESUMING + yield (st.RESUMING, []) next_nodes = self._completer.resume() next_nodes.update(self._analyzer.get_next_nodes()) # Schedule nodes to be worked on - yield st.SCHEDULING + yield (st.SCHEDULING, []) if self.is_running(): not_done, failures = self._scheduler.schedule(next_nodes) else: @@ -90,7 +90,7 @@ class Runner(object): # preempt those tasks (maybe in the future we will be better able to do # this). while not_done: - yield st.WAITING + yield (st.WAITING, []) # TODO(harlowja): maybe we should start doing 'yield from' this # call sometime in the future, or equivalent that will work in @@ -101,7 +101,7 @@ class Runner(object): # failures). If failures occurred just continue processing what # is running (so that we don't leave it abandoned) but do not # schedule anything new. - yield st.ANALYZING + yield (st.ANALYZING, []) next_nodes = set() for future in done: try: @@ -119,7 +119,7 @@ class Runner(object): else: next_nodes.update(more_nodes) if next_nodes and not failures and self.is_running(): - yield st.SCHEDULING + yield (st.SCHEDULING, []) # Recheck incase someone suspended it. if self.is_running(): more_not_done, failures = self._scheduler.schedule( @@ -127,10 +127,10 @@ class Runner(object): not_done.update(more_not_done) if failures: - misc.Failure.reraise_if_any(failures) - if self._analyzer.get_next_nodes(): - yield st.SUSPENDED + yield (st.FAILURE, failures) + elif self._analyzer.get_next_nodes(): + yield (st.SUSPENDED, []) elif self._analyzer.is_success(): - yield st.SUCCESS + yield (st.SUCCESS, []) else: - yield st.REVERTED + yield (st.REVERTED, []) |