diff options
| author | Joshua Harlow <jxharlow@godaddy.com> | 2016-06-10 17:35:01 -0700 |
|---|---|---|
| committer | ChangBo Guo(gcb) <glongwave@gmail.com> | 2017-01-03 06:37:57 +0000 |
| commit | 774d59813417a945fdbe50316786effd1203e8f7 (patch) | |
| tree | fcf0225f05135e955cf1ccaf397b347fc41f26cf /taskflow/engines/action_engine/runtime.py | |
| parent | dc15edc8423ea81120f0007938a66b46f2ab9b0f (diff) | |
| download | taskflow-774d59813417a945fdbe50316786effd1203e8f7.tar.gz | |
Rename engine analyzer to be named selector
This moves out the engine next to run (or revert)
selection process to a single class that just does
this and moves out the common functions the analyzer
class provided to the runtime object (which all
components can access).
This makes it easier to adjust the selection algorithm
in different ways.
Change-Id: I091c69297a7bff60729791d3ca6c3fae14d6eea5
Diffstat (limited to 'taskflow/engines/action_engine/runtime.py')
| -rw-r--r-- | taskflow/engines/action_engine/runtime.py | 50 |
1 files changed, 46 insertions, 4 deletions
diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py index 3d5a207..32ce052 100644 --- a/taskflow/engines/action_engine/runtime.py +++ b/taskflow/engines/action_engine/runtime.py @@ -22,12 +22,12 @@ from futurist import waiters from taskflow import deciders as de from taskflow.engines.action_engine.actions import retry as ra from taskflow.engines.action_engine.actions import task as ta -from taskflow.engines.action_engine import analyzer as an from taskflow.engines.action_engine import builder as bu from taskflow.engines.action_engine import compiler as com from taskflow.engines.action_engine import completer as co from taskflow.engines.action_engine import scheduler as sched from taskflow.engines.action_engine import scopes as sc +from taskflow.engines.action_engine import selector as se from taskflow.engines.action_engine import traversal as tr from taskflow import exceptions as exc from taskflow import logging @@ -163,8 +163,8 @@ class Runtime(object): return self._options @misc.cachedproperty - def analyzer(self): - return an.Analyzer(self) + def selector(self): + return se.Selector(self) @misc.cachedproperty def builder(self): @@ -245,6 +245,48 @@ class Runtime(object): # Various helper methods used by the runtime components; not for public # consumption... + def iterate_retries(self, state=None): + """Iterates retry atoms that match the provided state. + + If no state is provided it will yield back all retry atoms. + """ + if state: + atoms = list(self.iterate_nodes((com.RETRY,))) + atom_states = self._storage.get_atoms_states(atom.name + for atom in atoms) + for atom in atoms: + atom_state, _atom_intention = atom_states[atom.name] + if atom_state == state: + yield atom + else: + for atom in self.iterate_nodes((com.RETRY,)): + yield atom + + def iterate_nodes(self, allowed_kinds): + """Yields back all nodes of specified kinds in the execution graph.""" + graph = self._compilation.execution_graph + for node, node_data in graph.nodes_iter(data=True): + if node_data['kind'] in allowed_kinds: + yield node + + def is_success(self): + """Checks if all atoms in the execution graph are in 'happy' state.""" + atoms = list(self.iterate_nodes(com.ATOMS)) + atom_states = self._storage.get_atoms_states(atom.name + for atom in atoms) + for atom in atoms: + atom_state, _atom_intention = atom_states[atom.name] + if atom_state == st.IGNORE: + continue + if atom_state != st.SUCCESS: + return False + return True + + def find_retry(self, node): + """Returns the retry atom associated to the given node (or none).""" + graph = self._compilation.execution_graph + return graph.node[node].get(com.RETRY) + def reset_atoms(self, atoms, state=st.PENDING, intention=st.EXECUTE): """Resets all the provided atoms to the given state and intention.""" tweaked = [] @@ -261,7 +303,7 @@ class Runtime(object): def reset_all(self, state=st.PENDING, intention=st.EXECUTE): """Resets all atoms to the given state and intention.""" - return self.reset_atoms(self.analyzer.iterate_nodes(com.ATOMS), + return self.reset_atoms(self.iterate_nodes(com.ATOMS), state=state, intention=intention) def reset_subgraph(self, atom, state=st.PENDING, intention=st.EXECUTE): |
