summaryrefslogtreecommitdiff
path: root/taskflow/engines/action_engine/runtime.py
diff options
context:
space:
mode:
authorJoshua Harlow <jxharlow@godaddy.com>2016-06-10 17:35:01 -0700
committerChangBo Guo(gcb) <glongwave@gmail.com>2017-01-03 06:37:57 +0000
commit774d59813417a945fdbe50316786effd1203e8f7 (patch)
treefcf0225f05135e955cf1ccaf397b347fc41f26cf /taskflow/engines/action_engine/runtime.py
parentdc15edc8423ea81120f0007938a66b46f2ab9b0f (diff)
downloadtaskflow-774d59813417a945fdbe50316786effd1203e8f7.tar.gz
Rename engine analyzer to be named selector
This moves out the engine next to run (or revert) selection process to a single class that just does this and moves out the common functions the analyzer class provided to the runtime object (which all components can access). This makes it easier to adjust the selection algorithm in different ways. Change-Id: I091c69297a7bff60729791d3ca6c3fae14d6eea5
Diffstat (limited to 'taskflow/engines/action_engine/runtime.py')
-rw-r--r--taskflow/engines/action_engine/runtime.py50
1 files changed, 46 insertions, 4 deletions
diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py
index 3d5a207..32ce052 100644
--- a/taskflow/engines/action_engine/runtime.py
+++ b/taskflow/engines/action_engine/runtime.py
@@ -22,12 +22,12 @@ from futurist import waiters
from taskflow import deciders as de
from taskflow.engines.action_engine.actions import retry as ra
from taskflow.engines.action_engine.actions import task as ta
-from taskflow.engines.action_engine import analyzer as an
from taskflow.engines.action_engine import builder as bu
from taskflow.engines.action_engine import compiler as com
from taskflow.engines.action_engine import completer as co
from taskflow.engines.action_engine import scheduler as sched
from taskflow.engines.action_engine import scopes as sc
+from taskflow.engines.action_engine import selector as se
from taskflow.engines.action_engine import traversal as tr
from taskflow import exceptions as exc
from taskflow import logging
@@ -163,8 +163,8 @@ class Runtime(object):
return self._options
@misc.cachedproperty
- def analyzer(self):
- return an.Analyzer(self)
+ def selector(self):
+ return se.Selector(self)
@misc.cachedproperty
def builder(self):
@@ -245,6 +245,48 @@ class Runtime(object):
# Various helper methods used by the runtime components; not for public
# consumption...
+ def iterate_retries(self, state=None):
+ """Iterates retry atoms that match the provided state.
+
+ If no state is provided it will yield back all retry atoms.
+ """
+ if state:
+ atoms = list(self.iterate_nodes((com.RETRY,)))
+ atom_states = self._storage.get_atoms_states(atom.name
+ for atom in atoms)
+ for atom in atoms:
+ atom_state, _atom_intention = atom_states[atom.name]
+ if atom_state == state:
+ yield atom
+ else:
+ for atom in self.iterate_nodes((com.RETRY,)):
+ yield atom
+
+ def iterate_nodes(self, allowed_kinds):
+ """Yields back all nodes of specified kinds in the execution graph."""
+ graph = self._compilation.execution_graph
+ for node, node_data in graph.nodes_iter(data=True):
+ if node_data['kind'] in allowed_kinds:
+ yield node
+
+ def is_success(self):
+ """Checks if all atoms in the execution graph are in 'happy' state."""
+ atoms = list(self.iterate_nodes(com.ATOMS))
+ atom_states = self._storage.get_atoms_states(atom.name
+ for atom in atoms)
+ for atom in atoms:
+ atom_state, _atom_intention = atom_states[atom.name]
+ if atom_state == st.IGNORE:
+ continue
+ if atom_state != st.SUCCESS:
+ return False
+ return True
+
+ def find_retry(self, node):
+ """Returns the retry atom associated to the given node (or none)."""
+ graph = self._compilation.execution_graph
+ return graph.node[node].get(com.RETRY)
+
def reset_atoms(self, atoms, state=st.PENDING, intention=st.EXECUTE):
"""Resets all the provided atoms to the given state and intention."""
tweaked = []
@@ -261,7 +303,7 @@ class Runtime(object):
def reset_all(self, state=st.PENDING, intention=st.EXECUTE):
"""Resets all atoms to the given state and intention."""
- return self.reset_atoms(self.analyzer.iterate_nodes(com.ATOMS),
+ return self.reset_atoms(self.iterate_nodes(com.ATOMS),
state=state, intention=intention)
def reset_subgraph(self, atom, state=st.PENDING, intention=st.EXECUTE):