summaryrefslogtreecommitdiff
path: root/taskflow/engines/action_engine/runtime.py
diff options
context:
space:
mode:
Diffstat (limited to 'taskflow/engines/action_engine/runtime.py')
-rw-r--r--taskflow/engines/action_engine/runtime.py50
1 files changed, 46 insertions, 4 deletions
diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py
index 3d5a207..32ce052 100644
--- a/taskflow/engines/action_engine/runtime.py
+++ b/taskflow/engines/action_engine/runtime.py
@@ -22,12 +22,12 @@ from futurist import waiters
from taskflow import deciders as de
from taskflow.engines.action_engine.actions import retry as ra
from taskflow.engines.action_engine.actions import task as ta
-from taskflow.engines.action_engine import analyzer as an
from taskflow.engines.action_engine import builder as bu
from taskflow.engines.action_engine import compiler as com
from taskflow.engines.action_engine import completer as co
from taskflow.engines.action_engine import scheduler as sched
from taskflow.engines.action_engine import scopes as sc
+from taskflow.engines.action_engine import selector as se
from taskflow.engines.action_engine import traversal as tr
from taskflow import exceptions as exc
from taskflow import logging
@@ -163,8 +163,8 @@ class Runtime(object):
return self._options
@misc.cachedproperty
- def analyzer(self):
- return an.Analyzer(self)
+ def selector(self):
+ return se.Selector(self)
@misc.cachedproperty
def builder(self):
@@ -245,6 +245,48 @@ class Runtime(object):
# Various helper methods used by the runtime components; not for public
# consumption...
+ def iterate_retries(self, state=None):
+ """Iterates retry atoms that match the provided state.
+
+ If no state is provided it will yield back all retry atoms.
+ """
+ if state:
+ atoms = list(self.iterate_nodes((com.RETRY,)))
+ atom_states = self._storage.get_atoms_states(atom.name
+ for atom in atoms)
+ for atom in atoms:
+ atom_state, _atom_intention = atom_states[atom.name]
+ if atom_state == state:
+ yield atom
+ else:
+ for atom in self.iterate_nodes((com.RETRY,)):
+ yield atom
+
+ def iterate_nodes(self, allowed_kinds):
+ """Yields back all nodes of specified kinds in the execution graph."""
+ graph = self._compilation.execution_graph
+ for node, node_data in graph.nodes_iter(data=True):
+ if node_data['kind'] in allowed_kinds:
+ yield node
+
+ def is_success(self):
+ """Checks if all atoms in the execution graph are in 'happy' state."""
+ atoms = list(self.iterate_nodes(com.ATOMS))
+ atom_states = self._storage.get_atoms_states(atom.name
+ for atom in atoms)
+ for atom in atoms:
+ atom_state, _atom_intention = atom_states[atom.name]
+ if atom_state == st.IGNORE:
+ continue
+ if atom_state != st.SUCCESS:
+ return False
+ return True
+
+ def find_retry(self, node):
+ """Returns the retry atom associated to the given node (or none)."""
+ graph = self._compilation.execution_graph
+ return graph.node[node].get(com.RETRY)
+
def reset_atoms(self, atoms, state=st.PENDING, intention=st.EXECUTE):
"""Resets all the provided atoms to the given state and intention."""
tweaked = []
@@ -261,7 +303,7 @@ class Runtime(object):
def reset_all(self, state=st.PENDING, intention=st.EXECUTE):
"""Resets all atoms to the given state and intention."""
- return self.reset_atoms(self.analyzer.iterate_nodes(com.ATOMS),
+ return self.reset_atoms(self.iterate_nodes(com.ATOMS),
state=state, intention=intention)
def reset_subgraph(self, atom, state=st.PENDING, intention=st.EXECUTE):