diff options
| author | Joshua Harlow <harlowja@yahoo-inc.com> | 2015-11-16 16:27:42 -0800 |
|---|---|---|
| committer | Joshua Harlow <harlowja@gmail.com> | 2016-01-09 22:42:17 -0800 |
| commit | 8e8156c488dea8ae876b112c30e41e60da4f5be7 (patch) | |
| tree | a4c119dda7d2338bd8c63aad593bbb42d24f3e1f /taskflow/engines/action_engine/runtime.py | |
| parent | f555a35f3081ba492db15d7bda11fbe50f2a8349 (diff) | |
| download | taskflow-8e8156c488dea8ae876b112c30e41e60da4f5be7.tar.gz | |
Allow for alterations in decider 'area of influence'
Christmas came early.
Closes-Bug: #1479466
Change-Id: I931d826690c925f022dbfffe9afb7bf41345b1d0
Diffstat (limited to 'taskflow/engines/action_engine/runtime.py')
| -rw-r--r-- | taskflow/engines/action_engine/runtime.py | 45 |
1 files changed, 31 insertions, 14 deletions
diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py index 41dbd77..f6fc9c1 100644 --- a/taskflow/engines/action_engine/runtime.py +++ b/taskflow/engines/action_engine/runtime.py @@ -19,6 +19,7 @@ import functools from futurist import waiters +from taskflow import deciders as de from taskflow.engines.action_engine.actions import retry as ra from taskflow.engines.action_engine.actions import task as ta from taskflow.engines.action_engine import analyzer as an @@ -27,11 +28,17 @@ from taskflow.engines.action_engine import compiler as com from taskflow.engines.action_engine import completer as co from taskflow.engines.action_engine import scheduler as sched from taskflow.engines.action_engine import scopes as sc +from taskflow.engines.action_engine import traversal as tr from taskflow import exceptions as exc -from taskflow.flow import LINK_DECIDER from taskflow import states as st from taskflow.utils import misc +from taskflow.flow import (LINK_DECIDER, LINK_DECIDER_DEPTH) # noqa + +# Small helper to make the edge decider tuples more easily useable... +_EdgeDecider = collections.namedtuple('_EdgeDecider', + 'from_node,kind,decider,depth') + class Runtime(object): """A aggregate of runtime objects, properties, ... used during execution. @@ -42,7 +49,8 @@ class Runtime(object): """ def __init__(self, compilation, storage, atom_notifier, - task_executor, retry_executor, options=None): + task_executor, retry_executor, + options=None): self._atom_notifier = atom_notifier self._task_executor = task_executor self._retry_executor = retry_executor @@ -51,11 +59,10 @@ class Runtime(object): self._atom_cache = {} self._options = misc.safe_copy_dict(options) - @staticmethod - def _walk_edge_deciders(graph, atom): + def _walk_edge_deciders(self, graph, atom): """Iterates through all nodes, deciders that alter atoms execution.""" # This is basically a reverse breadth first exploration, with - # special logic to further traverse down flow nodes... + # special logic to further traverse down flow nodes as needed... predecessors_iter = graph.predecessors_iter nodes = collections.deque((u_node, atom) for u_node in predecessors_iter(atom)) @@ -63,14 +70,19 @@ class Runtime(object): while nodes: u_node, v_node = nodes.popleft() u_node_kind = graph.node[u_node]['kind'] + u_v_data = graph.adj[u_node][v_node] try: - yield (u_node, u_node_kind, - graph.adj[u_node][v_node][LINK_DECIDER]) + decider = u_v_data[LINK_DECIDER] + decider_depth = u_v_data.get(LINK_DECIDER_DEPTH) + if decider_depth is None: + decider_depth = de.Depth.ALL + yield _EdgeDecider(u_node, u_node_kind, + decider, decider_depth) except KeyError: pass if u_node_kind == com.FLOW and u_node not in visited: - # Avoid re-exploring the same flow if we get to this - # same flow by a different *future* path... + # Avoid re-exploring the same flow if we get to this same + # flow by a different *future* path... visited.add(u_node) # Since we *currently* jump over flow node(s), we need to make # sure that any prior decider that was directed at this flow @@ -108,7 +120,7 @@ class Runtime(object): graph = self._compilation.execution_graph for node, node_data in graph.nodes_iter(data=True): node_kind = node_data['kind'] - if node_kind == com.FLOW: + if node_kind in com.FLOWS: continue elif node_kind in com.ATOMS: check_transition_handler = check_transition_handlers[node_kind] @@ -128,6 +140,10 @@ class Runtime(object): metadata['edge_deciders'] = tuple(deciders_it) metadata['action'] = action self._atom_cache[node.name] = metadata + # TODO(harlowja): optimize the different decider depths to avoid + # repeated full successor searching; this can be done by searching + # for the widest depth of parent(s), and limiting the search of + # children by the that depth. @property def compilation(self): @@ -246,11 +262,12 @@ class Runtime(object): def reset_subgraph(self, atom, state=st.PENDING, intention=st.EXECUTE): """Resets a atoms subgraph to the given state and intention. - The subgraph is contained of all of the atoms successors. + The subgraph is contained of **all** of the atoms successors. """ - return self.reset_atoms( - self.analyzer.iterate_connected_atoms(atom), - state=state, intention=intention) + execution_graph = self._compilation.execution_graph + atoms_it = tr.depth_first_iterate(execution_graph, atom, + tr.Direction.FORWARD) + return self.reset_atoms(atoms_it, state=state, intention=intention) def retry_subflow(self, retry): """Prepares a retrys + its subgraph for execution. |
