summaryrefslogtreecommitdiff
path: root/taskflow/engines/action_engine/runtime.py
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2015-11-16 16:27:42 -0800
committerJoshua Harlow <harlowja@gmail.com>2016-01-09 22:42:17 -0800
commit8e8156c488dea8ae876b112c30e41e60da4f5be7 (patch)
treea4c119dda7d2338bd8c63aad593bbb42d24f3e1f /taskflow/engines/action_engine/runtime.py
parentf555a35f3081ba492db15d7bda11fbe50f2a8349 (diff)
downloadtaskflow-8e8156c488dea8ae876b112c30e41e60da4f5be7.tar.gz
Allow for alterations in decider 'area of influence'
Christmas came early. Closes-Bug: #1479466 Change-Id: I931d826690c925f022dbfffe9afb7bf41345b1d0
Diffstat (limited to 'taskflow/engines/action_engine/runtime.py')
-rw-r--r--taskflow/engines/action_engine/runtime.py45
1 files changed, 31 insertions, 14 deletions
diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py
index 41dbd77..f6fc9c1 100644
--- a/taskflow/engines/action_engine/runtime.py
+++ b/taskflow/engines/action_engine/runtime.py
@@ -19,6 +19,7 @@ import functools
from futurist import waiters
+from taskflow import deciders as de
from taskflow.engines.action_engine.actions import retry as ra
from taskflow.engines.action_engine.actions import task as ta
from taskflow.engines.action_engine import analyzer as an
@@ -27,11 +28,17 @@ from taskflow.engines.action_engine import compiler as com
from taskflow.engines.action_engine import completer as co
from taskflow.engines.action_engine import scheduler as sched
from taskflow.engines.action_engine import scopes as sc
+from taskflow.engines.action_engine import traversal as tr
from taskflow import exceptions as exc
-from taskflow.flow import LINK_DECIDER
from taskflow import states as st
from taskflow.utils import misc
+from taskflow.flow import (LINK_DECIDER, LINK_DECIDER_DEPTH) # noqa
+
+# Small helper to make the edge decider tuples more easily useable...
+_EdgeDecider = collections.namedtuple('_EdgeDecider',
+ 'from_node,kind,decider,depth')
+
class Runtime(object):
"""A aggregate of runtime objects, properties, ... used during execution.
@@ -42,7 +49,8 @@ class Runtime(object):
"""
def __init__(self, compilation, storage, atom_notifier,
- task_executor, retry_executor, options=None):
+ task_executor, retry_executor,
+ options=None):
self._atom_notifier = atom_notifier
self._task_executor = task_executor
self._retry_executor = retry_executor
@@ -51,11 +59,10 @@ class Runtime(object):
self._atom_cache = {}
self._options = misc.safe_copy_dict(options)
- @staticmethod
- def _walk_edge_deciders(graph, atom):
+ def _walk_edge_deciders(self, graph, atom):
"""Iterates through all nodes, deciders that alter atoms execution."""
# This is basically a reverse breadth first exploration, with
- # special logic to further traverse down flow nodes...
+ # special logic to further traverse down flow nodes as needed...
predecessors_iter = graph.predecessors_iter
nodes = collections.deque((u_node, atom)
for u_node in predecessors_iter(atom))
@@ -63,14 +70,19 @@ class Runtime(object):
while nodes:
u_node, v_node = nodes.popleft()
u_node_kind = graph.node[u_node]['kind']
+ u_v_data = graph.adj[u_node][v_node]
try:
- yield (u_node, u_node_kind,
- graph.adj[u_node][v_node][LINK_DECIDER])
+ decider = u_v_data[LINK_DECIDER]
+ decider_depth = u_v_data.get(LINK_DECIDER_DEPTH)
+ if decider_depth is None:
+ decider_depth = de.Depth.ALL
+ yield _EdgeDecider(u_node, u_node_kind,
+ decider, decider_depth)
except KeyError:
pass
if u_node_kind == com.FLOW and u_node not in visited:
- # Avoid re-exploring the same flow if we get to this
- # same flow by a different *future* path...
+ # Avoid re-exploring the same flow if we get to this same
+ # flow by a different *future* path...
visited.add(u_node)
# Since we *currently* jump over flow node(s), we need to make
# sure that any prior decider that was directed at this flow
@@ -108,7 +120,7 @@ class Runtime(object):
graph = self._compilation.execution_graph
for node, node_data in graph.nodes_iter(data=True):
node_kind = node_data['kind']
- if node_kind == com.FLOW:
+ if node_kind in com.FLOWS:
continue
elif node_kind in com.ATOMS:
check_transition_handler = check_transition_handlers[node_kind]
@@ -128,6 +140,10 @@ class Runtime(object):
metadata['edge_deciders'] = tuple(deciders_it)
metadata['action'] = action
self._atom_cache[node.name] = metadata
+ # TODO(harlowja): optimize the different decider depths to avoid
+ # repeated full successor searching; this can be done by searching
+ # for the widest depth of parent(s), and limiting the search of
+ # children by the that depth.
@property
def compilation(self):
@@ -246,11 +262,12 @@ class Runtime(object):
def reset_subgraph(self, atom, state=st.PENDING, intention=st.EXECUTE):
"""Resets a atoms subgraph to the given state and intention.
- The subgraph is contained of all of the atoms successors.
+ The subgraph is contained of **all** of the atoms successors.
"""
- return self.reset_atoms(
- self.analyzer.iterate_connected_atoms(atom),
- state=state, intention=intention)
+ execution_graph = self._compilation.execution_graph
+ atoms_it = tr.depth_first_iterate(execution_graph, atom,
+ tr.Direction.FORWARD)
+ return self.reset_atoms(atoms_it, state=state, intention=intention)
def retry_subflow(self, retry):
"""Prepares a retrys + its subgraph for execution.