diff options
Diffstat (limited to 'taskflow/engines/action_engine/runtime.py')
-rw-r--r-- | taskflow/engines/action_engine/runtime.py | 50 |
1 files changed, 46 insertions, 4 deletions
diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py index 3d5a207..32ce052 100644 --- a/taskflow/engines/action_engine/runtime.py +++ b/taskflow/engines/action_engine/runtime.py @@ -22,12 +22,12 @@ from futurist import waiters from taskflow import deciders as de from taskflow.engines.action_engine.actions import retry as ra from taskflow.engines.action_engine.actions import task as ta -from taskflow.engines.action_engine import analyzer as an from taskflow.engines.action_engine import builder as bu from taskflow.engines.action_engine import compiler as com from taskflow.engines.action_engine import completer as co from taskflow.engines.action_engine import scheduler as sched from taskflow.engines.action_engine import scopes as sc +from taskflow.engines.action_engine import selector as se from taskflow.engines.action_engine import traversal as tr from taskflow import exceptions as exc from taskflow import logging @@ -163,8 +163,8 @@ class Runtime(object): return self._options @misc.cachedproperty - def analyzer(self): - return an.Analyzer(self) + def selector(self): + return se.Selector(self) @misc.cachedproperty def builder(self): @@ -245,6 +245,48 @@ class Runtime(object): # Various helper methods used by the runtime components; not for public # consumption... + def iterate_retries(self, state=None): + """Iterates retry atoms that match the provided state. + + If no state is provided it will yield back all retry atoms. + """ + if state: + atoms = list(self.iterate_nodes((com.RETRY,))) + atom_states = self._storage.get_atoms_states(atom.name + for atom in atoms) + for atom in atoms: + atom_state, _atom_intention = atom_states[atom.name] + if atom_state == state: + yield atom + else: + for atom in self.iterate_nodes((com.RETRY,)): + yield atom + + def iterate_nodes(self, allowed_kinds): + """Yields back all nodes of specified kinds in the execution graph.""" + graph = self._compilation.execution_graph + for node, node_data in graph.nodes_iter(data=True): + if node_data['kind'] in allowed_kinds: + yield node + + def is_success(self): + """Checks if all atoms in the execution graph are in 'happy' state.""" + atoms = list(self.iterate_nodes(com.ATOMS)) + atom_states = self._storage.get_atoms_states(atom.name + for atom in atoms) + for atom in atoms: + atom_state, _atom_intention = atom_states[atom.name] + if atom_state == st.IGNORE: + continue + if atom_state != st.SUCCESS: + return False + return True + + def find_retry(self, node): + """Returns the retry atom associated to the given node (or none).""" + graph = self._compilation.execution_graph + return graph.node[node].get(com.RETRY) + def reset_atoms(self, atoms, state=st.PENDING, intention=st.EXECUTE): """Resets all the provided atoms to the given state and intention.""" tweaked = [] @@ -261,7 +303,7 @@ class Runtime(object): def reset_all(self, state=st.PENDING, intention=st.EXECUTE): """Resets all atoms to the given state and intention.""" - return self.reset_atoms(self.analyzer.iterate_nodes(com.ATOMS), + return self.reset_atoms(self.iterate_nodes(com.ATOMS), state=state, intention=intention) def reset_subgraph(self, atom, state=st.PENDING, intention=st.EXECUTE): |