author    Joshua Harlow <harlowja@yahoo-inc.com>    2015-09-04 13:14:25 -0700
committer Joshua Harlow <harlowja@gmail.com>        2015-10-01 22:38:30 -0700
commit    79d25e69e8300db5debdfd717ffd80f91c246c10 (patch)
tree      134e9764a021dc190a1b158fb27f222d9304908e
parent    ba4704cd18ab6d799a2de59bdf0feab9b5430a30 (diff)
download  taskflow-79d25e69e8300db5debdfd717ffd80f91c246c10.tar.gz
Simplify flow action engine compilation (tag: 1.22.0)
Instead of the added complexity of discarding flow nodes, we can simplify
the compilation process by retaining them and jumping over them in later
iteration and in graph and tree runtime usage. This change moves toward a
model that does just that, which also makes it easier in the future to use
the newly added flow graph nodes for meaningful things (for example, as a
point at which to change which flow_detail is used).

Change-Id: Icb1695f4b995a0392f940837514774768f222db4
-rw-r--r--  taskflow/engines/action_engine/analyzer.py         | 160
-rw-r--r--  taskflow/engines/action_engine/builder.py          |  55
-rw-r--r--  taskflow/engines/action_engine/compiler.py         | 261
-rw-r--r--  taskflow/engines/action_engine/completer.py        |  38
-rw-r--r--  taskflow/engines/action_engine/engine.py           |  28
-rw-r--r--  taskflow/engines/action_engine/runtime.py          |  78
-rw-r--r--  taskflow/engines/action_engine/scopes.py           |  39
-rw-r--r--  taskflow/formatters.py                             |  21
-rw-r--r--  taskflow/tests/unit/action_engine/test_builder.py  |  13
-rw-r--r--  taskflow/tests/unit/action_engine/test_compile.py  | 344
-rw-r--r--  taskflow/types/graph.py                            |   7
-rw-r--r--  taskflow/utils/iter_utils.py                       |  13
12 files changed, 533 insertions(+), 524 deletions(-)
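Before the per-file diffs, a minimal sketch (not taskflow code) of the node-tagging scheme this commit introduces: flow nodes stay in the execution graph, tagged with a ``kind`` attribute and a ``noop`` marker, so runtime iteration can filter them out or jump over them. The graph API assumed here is networkx 1.x, which is what this code base used at the time.

    import networkx as nx

    graph = nx.DiGraph(name='example')
    graph.add_node('my-flow', kind='flow', noop=True)  # retained, but inert
    graph.add_node('task-a', kind='task')
    graph.add_node('task-b', kind='task')
    graph.add_edge('my-flow', 'task-a', invariant=True)
    graph.add_edge('task-a', 'task-b', invariant=True)

    ATOMS = ('task', 'retry')
    atoms = [n for n, d in graph.nodes_iter(data=True) if d['kind'] in ATOMS]
    print(sorted(atoms))  # ['task-a', 'task-b'] -- the flow node drops out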
diff --git a/taskflow/engines/action_engine/analyzer.py b/taskflow/engines/action_engine/analyzer.py
index 77f7df3..bdde897 100644
--- a/taskflow/engines/action_engine/analyzer.py
+++ b/taskflow/engines/action_engine/analyzer.py
@@ -18,10 +18,31 @@ import abc
import itertools
import weakref
-from networkx.algorithms import traversal
import six
+from taskflow.engines.action_engine import compiler as co
from taskflow import states as st
+from taskflow.utils import iter_utils
+
+
+def _depth_first_iterate(graph, connected_to_functors, initial_nodes_iter):
+ """Iterates connected nodes in execution graph (from starting set).
+
+ Jumps over nodes with a ``noop`` attribute (does not yield them back).
+ """
+ stack = list(initial_nodes_iter)
+ while stack:
+ node = stack.pop()
+ node_attrs = graph.node[node]
+ if not node_attrs.get('noop'):
+ yield node
+ try:
+ node_kind = node_attrs['kind']
+ connected_to_functor = connected_to_functors[node_kind]
+ except KeyError:
+ pass
+ else:
+ stack.extend(connected_to_functor(node))
@six.add_metaclass(abc.ABCMeta)
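Hypothetical usage of the ``_depth_first_iterate`` helper added above (a sketch only; the graph construction mirrors the ``kind``/``noop`` tagging this commit introduces, again assuming the networkx 1.x API):

    import networkx as nx

    g = nx.DiGraph()
    g.add_node('outer', kind='flow', noop=True)
    g.add_node('inner', kind='flow', noop=True)
    g.add_node('t1', kind='task')
    g.add_edge('outer', 'inner')
    g.add_edge('inner', 't1')

    # Only flow nodes get a connected-to functor, so only they are expanded.
    functors = {'flow': g.successors_iter}
    found = list(_depth_first_iterate(g, functors, g.successors_iter('outer')))
    print(found)  # ['t1'] -- 'inner' is marked noop and jumped over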
@@ -74,8 +95,8 @@ class IgnoreDecider(Decider):
state to ``IGNORE`` so that they are ignored in future runtime
activities.
"""
- successors_iter = runtime.analyzer.iterate_subgraph(self._atom)
- runtime.reset_nodes(itertools.chain([self._atom], successors_iter),
+ successors_iter = runtime.analyzer.iterate_connected_atoms(self._atom)
+ runtime.reset_atoms(itertools.chain([self._atom], successors_iter),
state=st.IGNORE, intention=st.IGNORE)
@@ -105,66 +126,67 @@ class Analyzer(object):
self._storage = runtime.storage
self._execution_graph = runtime.compilation.execution_graph
- def get_next_nodes(self, node=None):
- """Get next nodes to run (originating from node or all nodes)."""
- if node is None:
- execute = self.browse_nodes_for_execute()
- revert = self.browse_nodes_for_revert()
- return execute + revert
- state = self.get_state(node)
- intention = self._storage.get_atom_intention(node.name)
+ def iter_next_atoms(self, atom=None):
+ """Iterate next atoms to run (originating from atom or all atoms)."""
+ if atom is None:
+ return iter_utils.unique_seen(self.browse_atoms_for_execute(),
+ self.browse_atoms_for_revert())
+ state = self.get_state(atom)
+ intention = self._storage.get_atom_intention(atom.name)
if state == st.SUCCESS:
if intention == st.REVERT:
- return [
- (node, NoOpDecider()),
- ]
+ return iter([
+ (atom, NoOpDecider()),
+ ])
elif intention == st.EXECUTE:
- return self.browse_nodes_for_execute(node)
+ return self.browse_atoms_for_execute(atom=atom)
else:
- return []
+ return iter([])
elif state == st.REVERTED:
- return self.browse_nodes_for_revert(node)
+ return self.browse_atoms_for_revert(atom=atom)
elif state == st.FAILURE:
- return self.browse_nodes_for_revert()
+ return self.browse_atoms_for_revert()
else:
- return []
+ return iter([])
- def browse_nodes_for_execute(self, node=None):
- """Browse next nodes to execute.
+ def browse_atoms_for_execute(self, atom=None):
+ """Browse next atoms to execute.
- This returns a collection of nodes that *may* be ready to be
- executed, if given a specific node it will only examine the successors
- of that node, otherwise it will examine the whole graph.
+ This returns an iterator of atoms that *may* be ready to be
+ executed; if given a specific atom it will only examine the successors
+ of that atom, otherwise it will examine the whole graph.
"""
- if node is not None:
- nodes = self._execution_graph.successors(node)
+ if atom is None:
+ atom_it = self.iterate_nodes(co.ATOMS)
else:
- nodes = self._execution_graph.nodes_iter()
- ready_nodes = []
- for node in nodes:
- is_ready, late_decider = self._get_maybe_ready_for_execute(node)
+ successors_iter = self._execution_graph.successors_iter
+ atom_it = _depth_first_iterate(self._execution_graph,
+ {co.FLOW: successors_iter},
+ successors_iter(atom))
+ for atom in atom_it:
+ is_ready, late_decider = self._get_maybe_ready_for_execute(atom)
if is_ready:
- ready_nodes.append((node, late_decider))
- return ready_nodes
+ yield (atom, late_decider)
- def browse_nodes_for_revert(self, node=None):
- """Browse next nodes to revert.
+ def browse_atoms_for_revert(self, atom=None):
+ """Browse next atoms to revert.
- This returns a collection of nodes that *may* be ready to be be
- reverted, if given a specific node it will only examine the
- predecessors of that node, otherwise it will examine the whole
+ This returns an iterator of atoms that *may* be ready to be
+ reverted; if given a specific atom it will only examine the
+ predecessors of that atom, otherwise it will examine the whole
graph.
"""
- if node is not None:
- nodes = self._execution_graph.predecessors(node)
+ if atom is None:
+ atom_it = self.iterate_nodes(co.ATOMS)
else:
- nodes = self._execution_graph.nodes_iter()
- ready_nodes = []
- for node in nodes:
- is_ready, late_decider = self._get_maybe_ready_for_revert(node)
+ predecessors_iter = self._execution_graph.predecessors_iter
+ atom_it = _depth_first_iterate(self._execution_graph,
+ {co.FLOW: predecessors_iter},
+ predecessors_iter(atom))
+ for atom in atom_it:
+ is_ready, late_decider = self._get_maybe_ready_for_revert(atom)
if is_ready:
- ready_nodes.append((node, late_decider))
- return ready_nodes
+ yield (atom, late_decider)
def _get_maybe_ready(self, atom, transition_to, allowed_intentions,
connected_fetcher, connected_checker,
@@ -187,59 +209,71 @@ class Analyzer(object):
def _get_maybe_ready_for_execute(self, atom):
"""Returns if an atom is *likely* ready to be executed."""
-
def decider_fetcher(atom):
edge_deciders = self._runtime.fetch_edge_deciders(atom)
if edge_deciders:
return IgnoreDecider(atom, edge_deciders)
else:
return NoOpDecider()
-
+ predecessors_iter = self._execution_graph.predecessors_iter
+ connected_fetcher = lambda atom: \
+ _depth_first_iterate(self._execution_graph,
+ {co.FLOW: predecessors_iter},
+ predecessors_iter(atom))
connected_checker = lambda connected_iter: \
all(state == st.SUCCESS and intention == st.EXECUTE
for state, intention in connected_iter)
- connected_fetcher = self._execution_graph.predecessors_iter
return self._get_maybe_ready(atom, st.RUNNING, [st.EXECUTE],
connected_fetcher, connected_checker,
decider_fetcher)
def _get_maybe_ready_for_revert(self, atom):
"""Returns if an atom is *likely* ready to be reverted."""
+ successors_iter = self._execution_graph.successors_iter
+ connected_fetcher = lambda atom: \
+ _depth_first_iterate(self._execution_graph,
+ {co.FLOW: successors_iter},
+ successors_iter(atom))
connected_checker = lambda connected_iter: \
all(state in (st.PENDING, st.REVERTED)
for state, _intention in connected_iter)
decider_fetcher = lambda atom: NoOpDecider()
- connected_fetcher = self._execution_graph.successors_iter
return self._get_maybe_ready(atom, st.REVERTING, [st.REVERT, st.RETRY],
connected_fetcher, connected_checker,
decider_fetcher)
- def iterate_subgraph(self, atom):
- """Iterates a subgraph connected to given atom."""
- for _src, dst in traversal.dfs_edges(self._execution_graph, atom):
- yield dst
+ def iterate_connected_atoms(self, atom):
+ """Iterates **all** successor atoms connected to given atom."""
+ successors_iter = self._execution_graph.successors_iter
+ return _depth_first_iterate(
+ self._execution_graph, {
+ co.FLOW: successors_iter,
+ co.TASK: successors_iter,
+ co.RETRY: successors_iter,
+ }, successors_iter(atom))
def iterate_retries(self, state=None):
"""Iterates retry atoms that match the provided state.
If no state is provided it will yield back all retry atoms.
"""
- for atom in self._runtime.fetch_atoms_by_kind('retry'):
+ for atom in self.iterate_nodes((co.RETRY,)):
if not state or self.get_state(atom) == state:
yield atom
- def iterate_all_nodes(self):
- """Yields back all nodes in the execution graph."""
- for node in self._execution_graph.nodes_iter():
- yield node
+ def iterate_nodes(self, allowed_kinds):
+ """Yields back all nodes of specified kinds in the execution graph."""
+ for node, node_data in self._execution_graph.nodes_iter(data=True):
+ if node_data['kind'] in allowed_kinds:
+ yield node
- def find_atom_retry(self, atom):
- """Returns the retry atom associated to the given atom (or none)."""
- return self._execution_graph.node[atom].get('retry')
+ def find_retry(self, node):
+ """Returns the retry atom associated to the given node (or none)."""
+ return self._execution_graph.node[node].get(co.RETRY)
def is_success(self):
- """Checks if all nodes in the execution graph are in 'happy' state."""
- for atom in self.iterate_all_nodes():
+ """Checks if all atoms in the execution graph are in 'happy' state."""
+ for atom in self.iterate_nodes(co.ATOMS):
atom_state = self.get_state(atom)
if atom_state == st.IGNORE:
continue
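A condensed, self-contained sketch of the kind-based iteration the analyzer now relies on; the function names and state strings below are illustrative stand-ins, not taskflow API:

    def iterate_nodes(graph, allowed_kinds):
        # Yield only nodes whose 'kind' metadata is in allowed_kinds.
        for node, node_data in graph.nodes_iter(data=True):
            if node_data['kind'] in allowed_kinds:
                yield node

    def is_success(graph, get_state, atom_kinds=('task', 'retry')):
        # 'Happy' means every atom reached SUCCESS (IGNOREd atoms excluded).
        return all(get_state(atom) in ('SUCCESS', 'IGNORE')
                   for atom in iterate_nodes(graph, atom_kinds))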
diff --git a/taskflow/engines/action_engine/builder.py b/taskflow/engines/action_engine/builder.py
index 034e64a..cdf3646 100644
--- a/taskflow/engines/action_engine/builder.py
+++ b/taskflow/engines/action_engine/builder.py
@@ -49,7 +49,7 @@ class MachineMemory(object):
"""State machine memory."""
def __init__(self):
- self.next_nodes = set()
+ self.next_up = set()
self.not_done = set()
self.failures = []
self.done = set()
@@ -115,24 +115,25 @@ class MachineBuilder(object):
# Checks if the storage says the flow is still runnable...
return self._storage.get_flow_state() == st.RUNNING
- def iter_next_nodes(target_node=None, apply_deciders=True):
- # Yields and filters and tweaks the next nodes to execute...
- maybe_nodes = self._analyzer.get_next_nodes(node=target_node)
- for node, late_decider in maybe_nodes:
+ def iter_next_atoms(atom=None, apply_deciders=True):
+ # Yields, filters and tweaks the next atoms to run...
+ maybe_atoms_it = self._analyzer.iter_next_atoms(atom=atom)
+ for atom, late_decider in maybe_atoms_it:
if apply_deciders:
proceed = late_decider.check_and_affect(self._runtime)
if proceed:
- yield node
+ yield atom
else:
- yield node
+ yield atom
def resume(old_state, new_state, event):
# This reaction function just updates the state machines memory
# to include any nodes that need to be executed (from a previous
# attempt, which may be empty if never ran before) and any nodes
# that are now ready to be ran.
- memory.next_nodes.update(self._completer.resume())
- memory.next_nodes.update(iter_next_nodes())
+ memory.next_up.update(
+ iter_utils.unique_seen(self._completer.resume(),
+ iter_next_atoms()))
return SCHEDULE
def game_over(old_state, new_state, event):
@@ -142,17 +143,17 @@ class MachineBuilder(object):
# it is *always* called before the final state is entered.
if memory.failures:
return FAILED
- leftover_nodes = iter_utils.count(
+ leftover_atoms = iter_utils.count(
# Avoid activating the deciders, since at this point
# the engine is finishing and there will be no further
# work done anyway...
- iter_next_nodes(apply_deciders=False))
- if leftover_nodes:
+ iter_next_atoms(apply_deciders=False))
+ if leftover_atoms:
# Ok we didn't finish (either reverting or executing...) so
# that means we must have been stopped at some point...
LOG.blather("Suspension determined to have been reacted to"
- " since (at least) %s nodes have been left in an"
- " unfinished state", leftover_nodes)
+ " since (at least) %s atoms have been left in an"
+ " unfinished state", leftover_atoms)
return SUSPENDED
elif self._analyzer.is_success():
return SUCCESS
@@ -165,13 +166,13 @@ class MachineBuilder(object):
# if the user of this engine has requested the engine/storage
# that holds this information to stop or suspend); handles failures
# that occur during this process safely...
- if is_runnable() and memory.next_nodes:
- not_done, failures = do_schedule(memory.next_nodes)
+ if is_runnable() and memory.next_up:
+ not_done, failures = do_schedule(memory.next_up)
if not_done:
memory.not_done.update(not_done)
if failures:
memory.failures.extend(failures)
- memory.next_nodes.intersection_update(not_done)
+ memory.next_up.intersection_update(not_done)
return WAIT
def wait(old_state, new_state, event):
@@ -190,13 +191,13 @@ class MachineBuilder(object):
# out what nodes are now ready to be ran (and then triggering those
# nodes to be scheduled in the future); handles failures that
# occur during this process safely...
- next_nodes = set()
+ next_up = set()
while memory.done:
fut = memory.done.pop()
- node = fut.atom
+ atom = fut.atom
try:
event, result = fut.result()
- retain = do_complete(node, event, result)
+ retain = do_complete(atom, event, result)
if isinstance(result, failure.Failure):
if retain:
memory.failures.append(result)
@@ -208,24 +209,24 @@ class MachineBuilder(object):
# is not enabled, which would suck...)
if LOG.isEnabledFor(logging.DEBUG):
intention = self._storage.get_atom_intention(
- node.name)
+ atom.name)
LOG.debug("Discarding failure '%s' (in"
" response to event '%s') under"
" completion units request during"
- " completion of node '%s' (intention"
+ " completion of atom '%s' (intention"
" is to %s)", result, event,
- node, intention)
+ atom, intention)
except Exception:
memory.failures.append(failure.Failure())
else:
try:
- more_nodes = set(iter_next_nodes(target_node=node))
+ more_work = set(iter_next_atoms(atom=atom))
except Exception:
memory.failures.append(failure.Failure())
else:
- next_nodes.update(more_nodes)
- if is_runnable() and next_nodes and not memory.failures:
- memory.next_nodes.update(next_nodes)
+ next_up.update(more_work)
+ if is_runnable() and next_up and not memory.failures:
+ memory.next_up.update(next_up)
return SCHEDULE
elif memory.not_done:
return WAIT
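``iter_utils.unique_seen`` (used by ``resume`` and ``iter_next_atoms`` above) merges several iterators while dropping already-seen items; a minimal stand-in for what it presumably does:

    import itertools

    def unique_seen(*iterables):
        # Chain the given iterables, yielding each item at most once.
        seen = set()
        for item in itertools.chain(*iterables):
            if item not in seen:
                seen.add(item)
                yield item

    # e.g. merging resumed atoms with freshly ready ones, duplicate-free:
    #   memory.next_up.update(unique_seen(resumed_atoms, ready_atoms))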
diff --git a/taskflow/engines/action_engine/compiler.py b/taskflow/engines/action_engine/compiler.py
index 50ce4eb..0d3e288 100644
--- a/taskflow/engines/action_engine/compiler.py
+++ b/taskflow/engines/action_engine/compiler.py
@@ -14,10 +14,10 @@
# License for the specific language governing permissions and limitations
# under the License.
-import collections
import threading
import fasteners
+import six
from taskflow import exceptions as exc
from taskflow import flow
@@ -28,18 +28,35 @@ from taskflow.types import tree as tr
from taskflow.utils import iter_utils
from taskflow.utils import misc
+from taskflow.flow import (LINK_INVARIANT, LINK_RETRY) # noqa
+
LOG = logging.getLogger(__name__)
-_RETRY_EDGE_DATA = {
- flow.LINK_RETRY: True,
-}
-_EDGE_INVARIANTS = (flow.LINK_INVARIANT, flow.LINK_MANUAL, flow.LINK_RETRY)
-_EDGE_REASONS = flow.LINK_REASONS
+# Constants attached to node attributes in the execution graph (and tree
+# node metadata). They are provided both here and on the compilation
+# class (so that users will not have to import this file to access
+# them); the module-level constants remain so that internal code can
+# more easily access them...
+TASK = 'task'
+RETRY = 'retry'
+FLOW = 'flow'
+
+# Quite often used together, so make a tuple everyone can share...
+ATOMS = (TASK, RETRY)
class Compilation(object):
"""The result of a compilers compile() is this *immutable* object."""
+ #: Task nodes will have a ``kind`` attribute/metadata key with this value.
+ TASK = TASK
+
+ #: Retry nodes will have a ``kind`` attribute/metadata key with this value.
+ RETRY = RETRY
+
+ #: Flow nodes will have a ``kind`` attribute/metadata key with this value.
+ FLOW = FLOW
+
def __init__(self, execution_graph, hierarchy):
self._execution_graph = execution_graph
self._hierarchy = hierarchy
@@ -55,6 +72,12 @@ class Compilation(object):
return self._hierarchy
+def _overlap_occurence_detector(to_graph, from_graph):
+ """Returns how many nodes in 'from' graph are in 'to' graph (if any)."""
+ return iter_utils.count(node for node in from_graph.nodes_iter()
+ if node in to_graph)
+
+
def _add_update_edges(graph, nodes_from, nodes_to, attr_dict=None):
"""Adds/updates edges from nodes to other nodes in the specified graph.
@@ -79,118 +102,7 @@ def _add_update_edges(graph, nodes_from, nodes_to, attr_dict=None):
graph.add_edge(u, v, attr_dict=attr_dict.copy())
-class Linker(object):
- """Compiler helper that adds pattern(s) constraints onto a graph."""
-
- @staticmethod
- def _is_not_empty(graph):
- # Returns true if the given graph is *not* empty...
- return graph.number_of_nodes() > 0
-
- @staticmethod
- def _find_first_decomposed(node, priors,
- decomposed_members, decomposed_filter):
- # How this works; traverse backwards and find only the predecessor
- # items that are actually connected to this entity, and avoid any
- # linkage that is not directly connected. This is guaranteed to be
- # valid since we always iter_links() over predecessors before
- # successors in all currently known patterns; a queue is used here
- # since it is possible for a node to have 2+ different predecessors so
- # we must search back through all of them in a reverse BFS order...
- #
- # Returns the first decomposed graph of those nodes (including the
- # passed in node) that passes the provided filter
- # function (returns none if none match).
- frontier = collections.deque([node])
- # NOTE(harowja): None is in this initial set since the first prior in
- # the priors list has None as its predecessor (which we don't want to
- # look for a decomposed member of).
- visited = set([None])
- while frontier:
- node = frontier.popleft()
- if node in visited:
- continue
- node_graph = decomposed_members[node]
- if decomposed_filter(node_graph):
- return node_graph
- visited.add(node)
- # TODO(harlowja): optimize this more to avoid searching through
- # things already searched...
- for (u, v) in reversed(priors):
- if node == v:
- # Queue its predecessor to be searched in the future...
- frontier.append(u)
- else:
- return None
-
- def apply_constraints(self, graph, flow, decomposed_members):
- # This list is used to track the links that have been previously
- # iterated over, so that when we are trying to find a entry to
- # connect to that we iterate backwards through this list, finding
- # connected nodes to the current target (lets call it v) and find
- # the first (u_n, or u_n - 1, u_n - 2...) that was decomposed into
- # a non-empty graph. We also retain all predecessors of v so that we
- # can correctly locate u_n - 1 if u_n turns out to have decomposed into
- # an empty graph (and so on).
- priors = []
- # NOTE(harlowja): u, v are flows/tasks (also graph terminology since
- # we are compiling things down into a flattened graph), the meaning
- # of this link iteration via iter_links() is that u -> v (with the
- # provided dictionary attributes, if any).
- for (u, v, attr_dict) in flow.iter_links():
- if not priors:
- priors.append((None, u))
- v_g = decomposed_members[v]
- if not v_g.number_of_nodes():
- priors.append((u, v))
- continue
- invariant = any(attr_dict.get(k) for k in _EDGE_INVARIANTS)
- if not invariant:
- # This is a symbol *only* dependency, connect
- # corresponding providers and consumers to allow the consumer
- # to be executed immediately after the provider finishes (this
- # is an optimization for these types of dependencies...)
- u_g = decomposed_members[u]
- if not u_g.number_of_nodes():
- # This must always exist, but incase it somehow doesn't...
- raise exc.CompilationFailure(
- "Non-invariant link being created from '%s' ->"
- " '%s' even though the target '%s' was found to be"
- " decomposed into an empty graph" % (v, u, u))
- for u in u_g.nodes_iter():
- for v in v_g.nodes_iter():
- # This is using the intersection() method vs the &
- # operator since the latter doesn't work with frozen
- # sets (when used in combination with ordered sets).
- #
- # If this is not done the following happens...
- #
- # TypeError: unsupported operand type(s)
- # for &: 'frozenset' and 'OrderedSet'
- depends_on = u.provides.intersection(v.requires)
- if depends_on:
- edge_attrs = {
- _EDGE_REASONS: frozenset(depends_on),
- }
- _add_update_edges(graph,
- [u], [v],
- attr_dict=edge_attrs)
- else:
- # Connect nodes with no predecessors in v to nodes with no
- # successors in the *first* non-empty predecessor of v (thus
- # maintaining the edge dependency).
- match = self._find_first_decomposed(u, priors,
- decomposed_members,
- self._is_not_empty)
- if match is not None:
- _add_update_edges(graph,
- match.no_successors_iter(),
- list(v_g.no_predecessors_iter()),
- attr_dict=attr_dict)
- priors.append((u, v))
-
-
-class _TaskCompiler(object):
+class TaskCompiler(object):
"""Non-recursive compiler of tasks."""
@staticmethod
@@ -199,71 +111,67 @@ class _TaskCompiler(object):
def compile(self, task, parent=None):
graph = gr.DiGraph(name=task.name)
- graph.add_node(task)
- node = tr.Node(task)
+ graph.add_node(task, kind=TASK)
+ node = tr.Node(task, kind=TASK)
if parent is not None:
parent.add(node)
return graph, node
-class _FlowCompiler(object):
+class FlowCompiler(object):
"""Recursive compiler of flows."""
@staticmethod
def handles(obj):
return isinstance(obj, flow.Flow)
- def __init__(self, deep_compiler_func, linker):
+ def __init__(self, deep_compiler_func):
self._deep_compiler_func = deep_compiler_func
- self._linker = linker
- def _connect_retry(self, retry, graph):
- graph.add_node(retry)
-
- # All nodes that have no predecessors should depend on this retry.
- nodes_to = [n for n in graph.no_predecessors_iter() if n is not retry]
- if nodes_to:
- _add_update_edges(graph, [retry], nodes_to,
- attr_dict=_RETRY_EDGE_DATA)
-
- # Add association for each node of graph that has no existing retry.
- for n in graph.nodes_iter():
- if n is not retry and flow.LINK_RETRY not in graph.node[n]:
- graph.node[n][flow.LINK_RETRY] = retry
-
- @staticmethod
- def _occurence_detector(to_graph, from_graph):
- return iter_utils.count(node for node in from_graph.nodes_iter()
- if node in to_graph)
-
- def _decompose_flow(self, flow, parent=None):
- """Decomposes a flow into a graph, tree node + decomposed subgraphs."""
+ def compile(self, flow, parent=None):
+ """Decomposes a flow into a graph and scope tree hierarchy."""
graph = gr.DiGraph(name=flow.name)
- node = tr.Node(flow)
+ graph.add_node(flow, kind=FLOW, noop=True)
+ tree_node = tr.Node(flow, kind=FLOW, noop=True)
if parent is not None:
- parent.add(node)
+ parent.add(tree_node)
if flow.retry is not None:
- node.add(tr.Node(flow.retry))
- decomposed_members = {}
- for item in flow:
- subgraph, _subnode = self._deep_compiler_func(item, parent=node)
- decomposed_members[item] = subgraph
- if subgraph.number_of_nodes():
- graph = gr.merge_graphs(
- graph, subgraph,
- # We can specialize this to be simpler than the default
- # algorithm which creates overhead that we don't
- # need for our purposes...
- overlap_detector=self._occurence_detector)
- return graph, node, decomposed_members
-
- def compile(self, flow, parent=None):
- graph, node, decomposed_members = self._decompose_flow(flow,
- parent=parent)
- self._linker.apply_constraints(graph, flow, decomposed_members)
+ tree_node.add(tr.Node(flow.retry, kind=RETRY))
+ decomposed = dict(
+ (child, self._deep_compiler_func(child, parent=tree_node)[0])
+ for child in flow)
+ decomposed_graphs = list(six.itervalues(decomposed))
+ graph = gr.merge_graphs(graph, *decomposed_graphs,
+ overlap_detector=_overlap_occurence_detector)
+ for u, v, attr_dict in flow.iter_links():
+ u_graph = decomposed[u]
+ v_graph = decomposed[v]
+ _add_update_edges(graph, u_graph.no_successors_iter(),
+ list(v_graph.no_predecessors_iter()),
+ attr_dict=attr_dict)
if flow.retry is not None:
- self._connect_retry(flow.retry, graph)
- return graph, node
+ graph.add_node(flow.retry, kind=RETRY)
+ _add_update_edges(graph, [flow], [flow.retry],
+ attr_dict={LINK_INVARIANT: True})
+ for node in graph.nodes_iter():
+ if node is not flow.retry and node is not flow:
+ graph.node[node].setdefault(RETRY, flow.retry)
+ from_nodes = [flow.retry]
+ connected_attr_dict = {LINK_INVARIANT: True, LINK_RETRY: True}
+ else:
+ from_nodes = [flow]
+ connected_attr_dict = {LINK_INVARIANT: True}
+ connected_to = [
+ node for node in graph.no_predecessors_iter() if node is not flow
+ ]
+ if connected_to:
+ # Ensure all nodes in this graph that have no
+ # predecessors depend on this flow (or this flow's retry) so that
+ # we can depend on the flow being traversed before its
+ # children (even though at the current time it will be skipped).
+ _add_update_edges(graph, from_nodes, connected_to,
+ attr_dict=connected_attr_dict)
+ return graph, tree_node
class PatternCompiler(object):
@@ -288,8 +196,8 @@ class PatternCompiler(object):
the recursion (now with a decomposed mapping from contained patterns or
atoms to their corresponding subgraph) we have to then connect the
subgraphs (and the atom(s) therein) that were decomposed for a pattern
- correctly into a new graph (using a :py:class:`.Linker` object to ensure
- the pattern mandated constraints are retained) and then return to the
+ correctly into a new graph and then ensure the pattern mandated
+ constraints are retained. Finally we then return to the
caller (and they will do the same thing up until the root node, which by
that point one graph is created with all contained atoms in the
pattern/nested patterns mandated ordering).
@@ -364,14 +272,10 @@ class PatternCompiler(object):
def __init__(self, root, freeze=True):
self._root = root
self._history = set()
- self._linker = Linker()
self._freeze = freeze
self._lock = threading.Lock()
self._compilation = None
- self._matchers = [
- _FlowCompiler(self._compile, self._linker),
- _TaskCompiler(),
- ]
+ self._matchers = (FlowCompiler(self._compile), TaskCompiler())
self._level = 0
def _compile(self, item, parent=None):
@@ -418,12 +322,17 @@ class PatternCompiler(object):
def _post_compile(self, graph, node):
"""Called after the compilation of the root finishes successfully."""
- dup_names = misc.get_duplicate_keys(graph.nodes_iter(),
- key=lambda node: node.name)
+ dup_names = misc.get_duplicate_keys(
+ (node for node, node_attrs in graph.nodes_iter(data=True)
+ if node_attrs['kind'] in ATOMS),
+ key=lambda node: node.name)
if dup_names:
raise exc.Duplicate(
"Atoms with duplicate names found: %s" % (sorted(dup_names)))
- if graph.number_of_nodes() == 0:
+ atoms = iter_utils.count(
+ node for node, node_attrs in graph.nodes_iter(data=True)
+ if node_attrs['kind'] in ATOMS)
+ if atoms == 0:
raise exc.Empty("Root container '%s' (%s) is empty"
% (self._root, type(self._root)))
self._history.clear()
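A hedged sketch of the reworked post-compile checks: only nodes whose ``kind`` marks them as atoms count toward duplicate-name detection and the emptiness check, since flow nodes now remain in the graph. It assumes atom objects carrying a ``name`` attribute and the networkx 1.x ``nodes_iter``:

    import collections

    def post_compile_check(graph, atom_kinds=('task', 'retry')):
        atoms = [n for n, d in graph.nodes_iter(data=True)
                 if d['kind'] in atom_kinds]
        counts = collections.Counter(atom.name for atom in atoms)
        dup_names = sorted(name for name, count in counts.items()
                           if count > 1)
        if dup_names:
            raise ValueError("Atoms with duplicate names found: %s"
                             % dup_names)
        if not atoms:
            raise ValueError("Root container is empty")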
diff --git a/taskflow/engines/action_engine/completer.py b/taskflow/engines/action_engine/completer.py
index 0ab727a..e3ab54d 100644
--- a/taskflow/engines/action_engine/completer.py
+++ b/taskflow/engines/action_engine/completer.py
@@ -20,6 +20,7 @@ import weakref
from oslo_utils import reflection
import six
+from taskflow.engines.action_engine import compiler as co
from taskflow.engines.action_engine import executor as ex
from taskflow import logging
from taskflow import retry as retry_atom
@@ -62,7 +63,7 @@ class RevertAndRetry(Strategy):
self._retry = retry
def apply(self):
- tweaked = self._runtime.reset_nodes([self._retry], state=None,
+ tweaked = self._runtime.reset_atoms([self._retry], state=None,
intention=st.RETRY)
tweaked.extend(self._runtime.reset_subgraph(self._retry, state=None,
intention=st.REVERT))
@@ -79,8 +80,9 @@ class RevertAll(Strategy):
self._analyzer = runtime.analyzer
def apply(self):
- return self._runtime.reset_nodes(self._analyzer.iterate_all_nodes(),
- state=None, intention=st.REVERT)
+ return self._runtime.reset_atoms(
+ self._analyzer.iterate_nodes(co.ATOMS),
+ state=None, intention=st.REVERT)
class Revert(Strategy):
@@ -93,7 +95,7 @@ class Revert(Strategy):
self._atom = atom
def apply(self):
- tweaked = self._runtime.reset_nodes([self._atom], state=None,
+ tweaked = self._runtime.reset_atoms([self._atom], state=None,
intention=st.REVERT)
tweaked.extend(self._runtime.reset_subgraph(self._atom, state=None,
intention=st.REVERT))
@@ -126,26 +128,26 @@ class Completer(object):
self._retry_action.complete_reversion(retry, result)
def resume(self):
- """Resumes nodes in the contained graph.
+ """Resumes atoms in the contained graph.
- This is done to allow any previously completed or failed nodes to
- be analyzed, there results processed and any potential nodes affected
+ This is done to allow any previously completed or failed atoms to
+ be analyzed, their results processed and any potential atoms affected
to be adjusted as needed.
- This should return a set of nodes which should be the initial set of
- nodes that were previously not finished (due to a RUNNING or REVERTING
+ This should return a set of atoms which should be the initial set of
+ atoms that were previously not finished (due to a RUNNING or REVERTING
attempt not previously finishing).
"""
- for node in self._analyzer.iterate_all_nodes():
- if self._analyzer.get_state(node) == st.FAILURE:
- self._process_atom_failure(node, self._storage.get(node.name))
+ for atom in self._analyzer.iterate_nodes(co.ATOMS):
+ if self._analyzer.get_state(atom) == st.FAILURE:
+ self._process_atom_failure(atom, self._storage.get(atom.name))
for retry in self._analyzer.iterate_retries(st.RETRYING):
self._runtime.retry_subflow(retry)
- unfinished_nodes = set()
- for node in self._analyzer.iterate_all_nodes():
- if self._analyzer.get_state(node) in (st.RUNNING, st.REVERTING):
- unfinished_nodes.add(node)
- return unfinished_nodes
+ unfinished_atoms = set()
+ for atom in self._analyzer.iterate_nodes(co.ATOMS):
+ if self._analyzer.get_state(atom) in (st.RUNNING, st.REVERTING):
+ unfinished_atoms.add(atom)
+ return unfinished_atoms
def complete(self, node, event, result):
"""Performs post-execution completion of a node.
@@ -167,7 +169,7 @@ class Completer(object):
def _determine_resolution(self, atom, failure):
"""Determines which resolution strategy to activate/apply."""
- retry = self._analyzer.find_atom_retry(atom)
+ retry = self._analyzer.find_retry(atom)
if retry is not None:
# Ask retry controller what to do in case of failure.
strategy = self._retry_action.on_failure(retry, atom, failure)
diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py
index cc6b1ac..74e150c 100644
--- a/taskflow/engines/action_engine/engine.py
+++ b/taskflow/engines/action_engine/engine.py
@@ -241,11 +241,10 @@ class ActionEngine(base.Engine):
transient = strutils.bool_from_string(
self._options.get('inject_transient', True))
self.storage.ensure_atoms(
- self._compilation.execution_graph.nodes_iter())
- for node in self._compilation.execution_graph.nodes_iter():
- if node.inject:
- self.storage.inject_atom_args(node.name,
- node.inject,
+ self._runtime.analyzer.iterate_nodes(compiler.ATOMS))
+ for atom in self._runtime.analyzer.iterate_nodes(compiler.ATOMS):
+ if atom.inject:
+ self.storage.inject_atom_args(atom.name, atom.inject,
transient=transient)
@fasteners.locked
@@ -255,8 +254,8 @@ class ActionEngine(base.Engine):
# flow/task provided or storage provided, if there are still missing
# dependencies then this flow will fail at runtime (which we can avoid
# by failing at validation time).
- execution_graph = self._compilation.execution_graph
if LOG.isEnabledFor(logging.BLATHER):
+ execution_graph = self._compilation.execution_graph
LOG.blather("Validating scoping and argument visibility for"
" execution graph with %s nodes and %s edges with"
" density %0.3f", execution_graph.number_of_nodes(),
@@ -269,18 +268,17 @@ class ActionEngine(base.Engine):
last_cause = None
last_node = None
missing_nodes = 0
- fetch_func = self.storage.fetch_unsatisfied_args
- for node in execution_graph.nodes_iter():
- node_missing = fetch_func(node.name, node.rebind,
- optional_args=node.optional)
- if node_missing:
- cause = exc.MissingDependencies(node,
- sorted(node_missing),
+ for atom in self._runtime.analyzer.iterate_nodes(compiler.ATOMS):
+ atom_missing = self.storage.fetch_unsatisfied_args(
+ atom.name, atom.rebind, optional_args=atom.optional)
+ if atom_missing:
+ cause = exc.MissingDependencies(atom,
+ sorted(atom_missing),
cause=last_cause)
last_cause = cause
- last_node = node
+ last_node = atom
missing_nodes += 1
- missing.update(node_missing)
+ missing.update(atom_missing)
if missing:
# For when a task is provided (instead of a flow) and that
# task is the only item in the graph and its missing deps, avoid
diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py
index d97ba96..6780e93 100644
--- a/taskflow/engines/action_engine/runtime.py
+++ b/taskflow/engines/action_engine/runtime.py
@@ -22,12 +22,13 @@ from taskflow.engines.action_engine.actions import retry as ra
from taskflow.engines.action_engine.actions import task as ta
from taskflow.engines.action_engine import analyzer as an
from taskflow.engines.action_engine import builder as bu
+from taskflow.engines.action_engine import compiler as com
from taskflow.engines.action_engine import completer as co
from taskflow.engines.action_engine import scheduler as sched
from taskflow.engines.action_engine import scopes as sc
-from taskflow import flow
+from taskflow import exceptions as exc
+from taskflow.flow import LINK_DECIDER
from taskflow import states as st
-from taskflow import task
from taskflow.utils import misc
@@ -47,7 +48,6 @@ class Runtime(object):
self._storage = storage
self._compilation = compilation
self._atom_cache = {}
- self._atoms_by_kind = {}
def compile(self):
"""Compiles & caches frequently used execution helper objects.
@@ -59,47 +59,47 @@ class Runtime(object):
specific scheduler and so-on).
"""
change_state_handlers = {
- 'task': functools.partial(self.task_action.change_state,
- progress=0.0),
- 'retry': self.retry_action.change_state,
+ com.TASK: functools.partial(self.task_action.change_state,
+ progress=0.0),
+ com.RETRY: self.retry_action.change_state,
}
schedulers = {
- 'retry': self.retry_scheduler,
- 'task': self.task_scheduler,
+ com.RETRY: self.retry_scheduler,
+ com.TASK: self.task_scheduler,
}
- execution_graph = self._compilation.execution_graph
- all_retry_atoms = []
- all_task_atoms = []
- for atom in self.analyzer.iterate_all_nodes():
- metadata = {}
- walker = sc.ScopeWalker(self.compilation, atom, names_only=True)
- if isinstance(atom, task.BaseTask):
- check_transition_handler = st.check_task_transition
- change_state_handler = change_state_handlers['task']
- scheduler = schedulers['task']
- all_task_atoms.append(atom)
+ check_transition_handlers = {
+ com.TASK: st.check_task_transition,
+ com.RETRY: st.check_retry_transition,
+ }
+ graph = self._compilation.execution_graph
+ for node, node_data in graph.nodes_iter(data=True):
+ node_kind = node_data['kind']
+ if node_kind == com.FLOW:
+ continue
+ elif node_kind in com.ATOMS:
+ check_transition_handler = check_transition_handlers[node_kind]
+ change_state_handler = change_state_handlers[node_kind]
+ scheduler = schedulers[node_kind]
else:
- check_transition_handler = st.check_retry_transition
- change_state_handler = change_state_handlers['retry']
- scheduler = schedulers['retry']
- all_retry_atoms.append(atom)
+ raise exc.CompilationFailure("Unknown node kind '%s'"
+ " encountered" % node_kind)
+ metadata = {}
+ walker = sc.ScopeWalker(self.compilation, node, names_only=True)
edge_deciders = {}
- for previous_atom in execution_graph.predecessors(atom):
+ for prev_node in graph.predecessors_iter(node):
# If there is any link function that says if this connection
# is able to run (or should not) ensure we retain it and use
# it later as needed.
- u_v_data = execution_graph.adj[previous_atom][atom]
- u_v_decider = u_v_data.get(flow.LINK_DECIDER)
+ u_v_data = graph.adj[prev_node][node]
+ u_v_decider = u_v_data.get(LINK_DECIDER)
if u_v_decider is not None:
- edge_deciders[previous_atom.name] = u_v_decider
+ edge_deciders[prev_node.name] = u_v_decider
metadata['scope_walker'] = walker
metadata['check_transition_handler'] = check_transition_handler
metadata['change_state_handler'] = change_state_handler
metadata['scheduler'] = scheduler
metadata['edge_deciders'] = edge_deciders
- self._atom_cache[atom.name] = metadata
- self._atoms_by_kind['retry'] = all_retry_atoms
- self._atoms_by_kind['task'] = all_task_atoms
+ self._atom_cache[node.name] = metadata
@property
def compilation(self):
@@ -162,15 +162,6 @@ class Runtime(object):
metadata = self._atom_cache[atom.name]
return metadata['edge_deciders']
- def fetch_atoms_by_kind(self, kind):
- """Fetches all the atoms of a given kind.
-
- NOTE(harlowja): Currently only ``task`` or ``retry`` are valid
- kinds of atoms (requesting other kinds will just
- return empty lists).
- """
- return self._atoms_by_kind.get(kind, [])
-
def fetch_scheduler(self, atom):
"""Fetches the cached specific scheduler for the given atom."""
# This does not check if the name exists (since this is only used
@@ -197,7 +188,7 @@ class Runtime(object):
# Various helper methods used by the runtime components; not for public
# consumption...
- def reset_nodes(self, atoms, state=st.PENDING, intention=st.EXECUTE):
+ def reset_atoms(self, atoms, state=st.PENDING, intention=st.EXECUTE):
"""Resets all the provided atoms to the given state and intention."""
tweaked = []
for atom in atoms:
@@ -213,7 +204,7 @@ class Runtime(object):
def reset_all(self, state=st.PENDING, intention=st.EXECUTE):
"""Resets all atoms to the given state and intention."""
- return self.reset_nodes(self.analyzer.iterate_all_nodes(),
+ return self.reset_atoms(self.analyzer.iterate_nodes(com.ATOMS),
state=state, intention=intention)
def reset_subgraph(self, atom, state=st.PENDING, intention=st.EXECUTE):
@@ -221,8 +212,9 @@ class Runtime(object):
The subgraph consists of all of the atom's successors.
"""
- return self.reset_nodes(self.analyzer.iterate_subgraph(atom),
- state=state, intention=intention)
+ return self.reset_atoms(
+ self.analyzer.iterate_connected_atoms(atom),
+ state=state, intention=intention)
def retry_subflow(self, retry):
"""Prepares a retrys + its subgraph for execution.
diff --git a/taskflow/engines/action_engine/scopes.py b/taskflow/engines/action_engine/scopes.py
index 5fd7ee6..1d309d8 100644
--- a/taskflow/engines/action_engine/scopes.py
+++ b/taskflow/engines/action_engine/scopes.py
@@ -14,14 +14,14 @@
# License for the specific language governing permissions and limitations
# under the License.
-from taskflow import atom as atom_type
-from taskflow import flow as flow_type
+from taskflow.engines.action_engine import compiler as co
from taskflow import logging
LOG = logging.getLogger(__name__)
-def _extract_atoms_iter(node, idx=-1):
+def _depth_first_reverse_iterate(node, idx=-1):
+ """Iterates connected (in reverse) nodes in tree (from starting node)."""
# Always go left to right, since right to left is the pattern order
# and we want to go backwards and not forwards through that ordering...
if idx == -1:
@@ -29,15 +29,17 @@ def _extract_atoms_iter(node, idx=-1):
else:
children_iter = reversed(node[0:idx])
for child in children_iter:
- if isinstance(child.item, flow_type.Flow):
- for atom in _extract_atoms_iter(child):
+ child_kind = child.metadata['kind']
+ if child_kind == co.FLOW:
+ # Jump through these...
+ #
+ # TODO(harlowja): make this non-recursive and remove this
+ # style of doing this when
+ # https://review.openstack.org/#/c/205731/ merges...
+ for atom in _depth_first_reverse_iterate(child):
yield atom
- elif isinstance(child.item, atom_type.Atom):
- yield child.item
else:
- raise TypeError(
- "Unknown extraction item '%s' (%s)" % (child.item,
- type(child.item)))
+ yield child.item
class ScopeWalker(object):
@@ -57,13 +59,10 @@ class ScopeWalker(object):
" hierarchy" % atom)
self._level_cache = {}
self._atom = atom
- self._graph = compilation.execution_graph
+ self._execution_graph = compilation.execution_graph
self._names_only = names_only
self._predecessors = None
- #: Function that extracts the *associated* atoms of a given tree node.
- _extract_atoms_iter = staticmethod(_extract_atoms_iter)
-
def __iter__(self):
"""Iterates over the visible scopes.
@@ -99,10 +98,14 @@ class ScopeWalker(object):
nodes (aka we have reached the top of the tree) or we run out of
predecessors.
"""
+ graph = self._execution_graph
if self._predecessors is None:
- pred_iter = self._graph.bfs_predecessors_iter(self._atom)
- self._predecessors = set(pred_iter)
- predecessors = self._predecessors.copy()
+ predecessors = set(
+ node for node in graph.bfs_predecessors_iter(self._atom)
+ if graph.node[node]['kind'] in co.ATOMS)
+ self._predecessors = predecessors.copy()
+ else:
+ predecessors = self._predecessors.copy()
last = self._node
for lvl, parent in enumerate(self._node.path_iter(include_self=False)):
if not predecessors:
@@ -114,7 +117,7 @@ class ScopeWalker(object):
except KeyError:
visible = []
removals = set()
- for atom in self._extract_atoms_iter(parent, idx=last_idx):
+ for atom in _depth_first_reverse_iterate(parent, idx=last_idx):
if atom in predecessors:
predecessors.remove(atom)
removals.add(atom)
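A toy tree showing ``_depth_first_reverse_iterate`` in action; the Node class below is a minimal stand-in for taskflow.types.tree.Node (supporting just ``item``, ``metadata``, slicing and reversal):

    class Node(object):
        def __init__(self, item, kind):
            self.item = item
            self.metadata = {'kind': kind}
            self._children = []

        def add(self, child):
            self._children.append(child)
            return child

        def __getitem__(self, idx):
            return self._children[idx]

        def __reversed__(self):
            return reversed(self._children)

    root = Node('root-flow', 'flow')
    root.add(Node('t1', 'task'))
    inner = root.add(Node('inner-flow', 'flow'))
    inner.add(Node('t2', 'task'))
    print(list(_depth_first_reverse_iterate(root)))
    # ['t2', 't1'] -- right to left, with the inner flow descended through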
diff --git a/taskflow/formatters.py b/taskflow/formatters.py
index 33fb708..41b409c 100644
--- a/taskflow/formatters.py
+++ b/taskflow/formatters.py
@@ -16,6 +16,7 @@
import functools
+from taskflow.engines.action_engine import compiler
from taskflow import exceptions as exc
from taskflow import states
from taskflow.types import tree
@@ -45,7 +46,8 @@ def _fetch_predecessor_tree(graph, atom):
while stack:
parent, node = stack.pop()
for pred_node in graph.predecessors_iter(node):
- child = tree.Node(pred_node)
+ child = tree.Node(pred_node,
+ **graph.node[pred_node])
parent.add(child)
stack.append((child, pred_node))
seen.add(pred_node)
@@ -62,8 +64,13 @@ class FailureFormatter(object):
def __init__(self, engine, hide_inputs_outputs_of=()):
self._hide_inputs_outputs_of = hide_inputs_outputs_of
self._engine = engine
+ self._formatter_funcs = {
+ compiler.FLOW: self._format_flow,
+ }
+ for kind in compiler.ATOMS:
+ self._formatter_funcs[kind] = self._format_atom
- def _format_node(self, storage, cache, node):
+ def _format_atom(self, storage, cache, node):
"""Formats a single tree node (atom) into a string version."""
atom = node.item
atom_name = atom.name
@@ -101,6 +108,16 @@ class FailureFormatter(object):
else:
return "Atom '%s'" % (atom_name)
+ def _format_flow(self, storage, cache, node):
+ """Formats a single tree node (flow) into a string version."""
+ flow = node.item
+ return flow.name
+
+ def _format_node(self, storage, cache, node):
+ """Formats a single tree node into a string version."""
+ formatter_func = self._formatter_funcs[node.metadata['kind']]
+ return formatter_func(storage, cache, node)
+
def format(self, fail, atom_matcher):
"""Returns a (exc_info, details) tuple about the failure.
diff --git a/taskflow/tests/unit/action_engine/test_builder.py b/taskflow/tests/unit/action_engine/test_builder.py
index b406744..08877f8 100644
--- a/taskflow/tests/unit/action_engine/test_builder.py
+++ b/taskflow/tests/unit/action_engine/test_builder.py
@@ -37,18 +37,19 @@ class BuildersTest(test.TestCase):
compilation = compiler.PatternCompiler(flow).compile()
flow_detail = pu.create_flow_detail(flow)
store = storage.Storage(flow_detail)
- # This ensures the tasks exist in storage...
- for task in compilation.execution_graph:
- store.ensure_atom(task)
+ nodes_iter = compilation.execution_graph.nodes_iter(data=True)
+ for node, node_attrs in nodes_iter:
+ if node_attrs['kind'] in ('task', 'retry'):
+ store.ensure_atom(node)
if initial_state:
store.set_flow_state(initial_state)
- task_notifier = notifier.Notifier()
+ atom_notifier = notifier.Notifier()
task_executor = executor.SerialTaskExecutor()
retry_executor = executor.SerialRetryExecutor()
task_executor.start()
self.addCleanup(task_executor.stop)
r = runtime.Runtime(compilation, store,
- task_notifier, task_executor,
+ atom_notifier, task_executor,
retry_executor)
r.compile()
return r
@@ -305,6 +306,6 @@ class BuildersTest(test.TestCase):
self.assertEqual(1, occurrences.get((builder.GAME_OVER, st.SUCCESS)))
self.assertEqual(1, occurrences.get((builder.UNDEFINED, st.RESUMING)))
- self.assertEqual(0, len(memory.next_nodes))
+ self.assertEqual(0, len(memory.next_up))
self.assertEqual(0, len(memory.not_done))
self.assertEqual(0, len(memory.failures))
diff --git a/taskflow/tests/unit/action_engine/test_compile.py b/taskflow/tests/unit/action_engine/test_compile.py
index 884cd8d..b676e0e 100644
--- a/taskflow/tests/unit/action_engine/test_compile.py
+++ b/taskflow/tests/unit/action_engine/test_compile.py
@@ -49,21 +49,22 @@ class PatternCompileTest(test.TestCase):
a, b, c, d = test_utils.make_many(4)
flo = lf.Flow("test")
flo.add(a, b, c)
- sflo = lf.Flow("sub-test")
- sflo.add(d)
- flo.add(sflo)
+ inner_flo = lf.Flow("sub-test")
+ inner_flo.add(d)
+ flo.add(inner_flo)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(4, len(g))
+ self.assertEqual(6, len(g))
order = g.topological_sort()
- self.assertEqual([a, b, c, d], order)
- self.assertTrue(g.has_edge(c, d))
- self.assertEqual(g.get_edge_data(c, d), {'invariant': True})
+ self.assertEqual([flo, a, b, c, inner_flo, d], order)
+ self.assertTrue(g.has_edge(c, inner_flo))
+ self.assertTrue(g.has_edge(inner_flo, d))
+ self.assertEqual(g.get_edge_data(inner_flo, d), {'invariant': True})
self.assertEqual([d], list(g.no_successors_iter()))
- self.assertEqual([a], list(g.no_predecessors_iter()))
+ self.assertEqual([flo], list(g.no_predecessors_iter()))
def test_invalid(self):
a, b, c = test_utils.make_many(3)
@@ -79,36 +80,42 @@ class PatternCompileTest(test.TestCase):
flo.add(a, b, c, d)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(4, len(g))
- self.assertEqual(0, g.number_of_edges())
+ self.assertEqual(5, len(g))
+ self.assertItemsEqual(g.edges(), [
+ (flo, a),
+ (flo, b),
+ (flo, c),
+ (flo, d),
+ ])
self.assertEqual(set([a, b, c, d]),
set(g.no_successors_iter()))
- self.assertEqual(set([a, b, c, d]),
+ self.assertEqual(set([flo]),
set(g.no_predecessors_iter()))
def test_linear_nested(self):
a, b, c, d = test_utils.make_many(4)
flo = lf.Flow("test")
flo.add(a, b)
- flo2 = uf.Flow("test2")
- flo2.add(c, d)
- flo.add(flo2)
+ inner_flo = uf.Flow("test2")
+ inner_flo.add(c, d)
+ flo.add(inner_flo)
compilation = compiler.PatternCompiler(flo).compile()
- g = compilation.execution_graph
- self.assertEqual(4, len(g))
+ graph = compilation.execution_graph
+ self.assertEqual(6, len(graph))
- lb = g.subgraph([a, b])
+ lb = graph.subgraph([a, b])
self.assertFalse(lb.has_edge(b, a))
self.assertTrue(lb.has_edge(a, b))
- self.assertEqual(g.get_edge_data(a, b), {'invariant': True})
+ self.assertEqual(graph.get_edge_data(a, b), {'invariant': True})
- ub = g.subgraph([c, d])
+ ub = graph.subgraph([c, d])
self.assertEqual(0, ub.number_of_edges())
# This ensures that c and d do not start executing until after b.
- self.assertTrue(g.has_edge(b, c))
- self.assertTrue(g.has_edge(b, d))
+ self.assertTrue(graph.has_edge(b, inner_flo))
+ self.assertTrue(graph.has_edge(inner_flo, c))
+ self.assertTrue(graph.has_edge(inner_flo, d))
def test_unordered_nested(self):
a, b, c, d = test_utils.make_many(4)
@@ -120,34 +127,30 @@ class PatternCompileTest(test.TestCase):
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(4, len(g))
- for n in [a, b]:
- self.assertFalse(g.has_edge(n, c))
- self.assertFalse(g.has_edge(n, d))
- self.assertFalse(g.has_edge(d, c))
- self.assertTrue(g.has_edge(c, d))
- self.assertEqual(g.get_edge_data(c, d), {'invariant': True})
-
- ub = g.subgraph([a, b])
- self.assertEqual(0, ub.number_of_edges())
- lb = g.subgraph([c, d])
- self.assertEqual(1, lb.number_of_edges())
+ self.assertEqual(6, len(g))
+ self.assertItemsEqual(g.edges(), [
+ (flo, a),
+ (flo, b),
+ (flo, flo2),
+ (flo2, c),
+ (c, d)
+ ])
def test_unordered_nested_in_linear(self):
a, b, c, d = test_utils.make_many(4)
- flo = lf.Flow('lt').add(
- a,
- uf.Flow('ut').add(b, c),
- d)
+ inner_flo = uf.Flow('ut').add(b, c)
+ flo = lf.Flow('lt').add(a, inner_flo, d)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(4, len(g))
+ self.assertEqual(6, len(g))
self.assertItemsEqual(g.edges(), [
- (a, b),
- (a, c),
+ (flo, a),
+ (a, inner_flo),
+ (inner_flo, b),
+ (inner_flo, c),
(b, d),
- (c, d)
+ (c, d),
])
def test_graph(self):
@@ -157,8 +160,8 @@ class PatternCompileTest(test.TestCase):
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(4, len(g))
- self.assertEqual(0, g.number_of_edges())
+ self.assertEqual(5, len(g))
+ self.assertEqual(4, g.number_of_edges())
def test_graph_nested(self):
a, b, c, d, e, f, g = test_utils.make_many(7)
@@ -171,10 +174,17 @@ class PatternCompileTest(test.TestCase):
compilation = compiler.PatternCompiler(flo).compile()
graph = compilation.execution_graph
- self.assertEqual(7, len(graph))
- self.assertItemsEqual(graph.edges(data=True), [
- (e, f, {'invariant': True}),
- (f, g, {'invariant': True})
+ self.assertEqual(9, len(graph))
+ self.assertItemsEqual(graph.edges(), [
+ (flo, a),
+ (flo, b),
+ (flo, c),
+ (flo, d),
+ (flo, flo2),
+
+ (flo2, e),
+ (e, f),
+ (f, g),
])
def test_graph_nested_graph(self):
@@ -187,9 +197,19 @@ class PatternCompileTest(test.TestCase):
flo.add(flo2)
compilation = compiler.PatternCompiler(flo).compile()
- g = compilation.execution_graph
- self.assertEqual(7, len(g))
- self.assertEqual(0, g.number_of_edges())
+ graph = compilation.execution_graph
+ self.assertEqual(9, len(graph))
+ self.assertItemsEqual(graph.edges(), [
+ (flo, a),
+ (flo, b),
+ (flo, c),
+ (flo, d),
+ (flo, flo2),
+
+ (flo2, e),
+ (flo2, f),
+ (flo2, g),
+ ])
def test_graph_links(self):
a, b, c, d = test_utils.make_many(4)
@@ -201,13 +221,15 @@ class PatternCompileTest(test.TestCase):
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(4, len(g))
+ self.assertEqual(5, len(g))
self.assertItemsEqual(g.edges(data=True), [
+ (flo, a, {'invariant': True}),
+
(a, b, {'manual': True}),
(b, c, {'manual': True}),
(c, d, {'manual': True}),
])
- self.assertItemsEqual([a], g.no_predecessors_iter())
+ self.assertItemsEqual([flo], g.no_predecessors_iter())
self.assertItemsEqual([d], g.no_successors_iter())
def test_graph_dependencies(self):
@@ -217,96 +239,112 @@ class PatternCompileTest(test.TestCase):
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(2, len(g))
+ self.assertEqual(3, len(g))
self.assertItemsEqual(g.edges(data=True), [
+ (flo, a, {'invariant': True}),
(a, b, {'reasons': set(['x'])})
])
- self.assertItemsEqual([a], g.no_predecessors_iter())
+ self.assertItemsEqual([flo], g.no_predecessors_iter())
self.assertItemsEqual([b], g.no_successors_iter())
def test_graph_nested_requires(self):
a = test_utils.ProvidesRequiresTask('a', provides=['x'], requires=[])
b = test_utils.ProvidesRequiresTask('b', provides=[], requires=[])
c = test_utils.ProvidesRequiresTask('c', provides=[], requires=['x'])
- flo = gf.Flow("test").add(
- a,
- lf.Flow("test2").add(b, c)
- )
+ inner_flo = lf.Flow("test2").add(b, c)
+ flo = gf.Flow("test").add(a, inner_flo)
compilation = compiler.PatternCompiler(flo).compile()
- g = compilation.execution_graph
- self.assertEqual(3, len(g))
- self.assertItemsEqual(g.edges(data=True), [
- (a, c, {'reasons': set(['x'])}),
- (b, c, {'invariant': True})
+ graph = compilation.execution_graph
+ self.assertEqual(5, len(graph))
+ self.assertItemsEqual(graph.edges(data=True), [
+ (flo, a, {'invariant': True}),
+ (inner_flo, b, {'invariant': True}),
+ (a, inner_flo, {'reasons': set(['x'])}),
+ (b, c, {'invariant': True}),
])
- self.assertItemsEqual([a, b], g.no_predecessors_iter())
- self.assertItemsEqual([c], g.no_successors_iter())
+ self.assertItemsEqual([flo], graph.no_predecessors_iter())
+ self.assertItemsEqual([c], graph.no_successors_iter())
def test_graph_nested_provides(self):
a = test_utils.ProvidesRequiresTask('a', provides=[], requires=['x'])
b = test_utils.ProvidesRequiresTask('b', provides=['x'], requires=[])
c = test_utils.ProvidesRequiresTask('c', provides=[], requires=[])
- flo = gf.Flow("test").add(
- a,
- lf.Flow("test2").add(b, c)
- )
+ inner_flo = lf.Flow("test2").add(b, c)
+ flo = gf.Flow("test").add(a, inner_flo)
compilation = compiler.PatternCompiler(flo).compile()
- g = compilation.execution_graph
- self.assertEqual(3, len(g))
- self.assertItemsEqual(g.edges(data=True), [
+ graph = compilation.execution_graph
+ self.assertEqual(5, len(graph))
+ self.assertItemsEqual(graph.edges(data=True), [
+ (flo, inner_flo, {'invariant': True}),
+
+ (inner_flo, b, {'invariant': True}),
(b, c, {'invariant': True}),
- (b, a, {'reasons': set(['x'])})
+ (c, a, {'reasons': set(['x'])}),
])
- self.assertItemsEqual([b], g.no_predecessors_iter())
- self.assertItemsEqual([a, c], g.no_successors_iter())
+ self.assertItemsEqual([flo], graph.no_predecessors_iter())
+ self.assertItemsEqual([a], graph.no_successors_iter())
def test_empty_flow_in_linear_flow(self):
- flow = lf.Flow('lf')
+ flo = lf.Flow('lf')
a = test_utils.ProvidesRequiresTask('a', provides=[], requires=[])
b = test_utils.ProvidesRequiresTask('b', provides=[], requires=[])
- empty_flow = gf.Flow("empty")
- flow.add(a, empty_flow, b)
+ empty_flo = gf.Flow("empty")
+ flo.add(a, empty_flo, b)
- compilation = compiler.PatternCompiler(flow).compile()
- g = compilation.execution_graph
- self.assertItemsEqual(g.edges(data=True), [
- (a, b, {'invariant': True}),
+ compilation = compiler.PatternCompiler(flo).compile()
+ graph = compilation.execution_graph
+ self.assertItemsEqual(graph.edges(), [
+ (flo, a),
+ (a, empty_flo),
+ (empty_flo, b),
])
def test_many_empty_in_graph_flow(self):
- flow = gf.Flow('root')
+ flo = gf.Flow('root')
a = test_utils.ProvidesRequiresTask('a', provides=[], requires=[])
- flow.add(a)
+ flo.add(a)
b = lf.Flow('b')
b_0 = test_utils.ProvidesRequiresTask('b.0', provides=[], requires=[])
+ b_1 = lf.Flow('b.1')
+ b_2 = lf.Flow('b.2')
b_3 = test_utils.ProvidesRequiresTask('b.3', provides=[], requires=[])
- b.add(
- b_0,
- lf.Flow('b.1'), lf.Flow('b.2'),
- b_3,
- )
- flow.add(b)
+ b.add(b_0, b_1, b_2, b_3)
+ flo.add(b)
c = lf.Flow('c')
- c.add(lf.Flow('c.0'), lf.Flow('c.1'), lf.Flow('c.2'))
- flow.add(c)
+ c_0 = lf.Flow('c.0')
+ c_1 = lf.Flow('c.1')
+ c_2 = lf.Flow('c.2')
+ c.add(c_0, c_1, c_2)
+ flo.add(c)
d = test_utils.ProvidesRequiresTask('d', provides=[], requires=[])
- flow.add(d)
+ flo.add(d)
- flow.link(b, d)
- flow.link(a, d)
- flow.link(c, d)
+ flo.link(b, d)
+ flo.link(a, d)
+ flo.link(c, d)
- compilation = compiler.PatternCompiler(flow).compile()
- g = compilation.execution_graph
- self.assertTrue(g.has_edge(b_0, b_3))
- self.assertTrue(g.has_edge(b_3, d))
- self.assertEqual(4, len(g))
+ compilation = compiler.PatternCompiler(flo).compile()
+ graph = compilation.execution_graph
+
+ self.assertTrue(graph.has_edge(flo, a))
+
+ self.assertTrue(graph.has_edge(flo, b))
+ self.assertTrue(graph.has_edge(b_0, b_1))
+ self.assertTrue(graph.has_edge(b_1, b_2))
+ self.assertTrue(graph.has_edge(b_2, b_3))
+
+ self.assertTrue(graph.has_edge(flo, c))
+ self.assertTrue(graph.has_edge(c_0, c_1))
+ self.assertTrue(graph.has_edge(c_1, c_2))
+
+ self.assertTrue(graph.has_edge(b_3, d))
+ self.assertEqual(12, len(graph))
def test_empty_flow_in_nested_flow(self):
flow = lf.Flow('lf')
@@ -323,9 +361,10 @@ class PatternCompileTest(test.TestCase):
compilation = compiler.PatternCompiler(flow).compile()
g = compilation.execution_graph
- self.assertTrue(g.has_edge(a, c))
- self.assertTrue(g.has_edge(c, d))
- self.assertTrue(g.has_edge(d, b))
+        for source, target in [(flow, a), (a, flow2),
+                               (flow2, c), (c, empty_flow),
+                               (empty_flow, d), (d, b)]:
+            self.assertTrue(g.has_edge(source, target))
def test_empty_flow_in_graph_flow(self):
flow = lf.Flow('lf')
@@ -336,19 +375,9 @@ class PatternCompileTest(test.TestCase):
compilation = compiler.PatternCompiler(flow).compile()
g = compilation.execution_graph
- self.assertTrue(g.has_edge(a, b))
-
- def test_empty_flow_in_graph_flow_empty_linkage(self):
- flow = gf.Flow('lf')
- a = test_utils.ProvidesRequiresTask('a', provides=[], requires=[])
- b = test_utils.ProvidesRequiresTask('b', provides=[], requires=[])
- empty_flow = lf.Flow("empty")
- flow.add(a, empty_flow, b)
- flow.link(empty_flow, b)
-
- compilation = compiler.PatternCompiler(flow).compile()
- g = compilation.execution_graph
- self.assertEqual(0, len(g.edges()))
+ self.assertTrue(g.has_edge(flow, a))
+ self.assertTrue(g.has_edge(a, empty_flow))
+ self.assertTrue(g.has_edge(empty_flow, b))
def test_empty_flow_in_graph_flow_linkage(self):
flow = gf.Flow('lf')
@@ -360,8 +389,9 @@ class PatternCompileTest(test.TestCase):
compilation = compiler.PatternCompiler(flow).compile()
g = compilation.execution_graph
- self.assertEqual(1, len(g.edges()))
self.assertTrue(g.has_edge(a, b))
+ self.assertTrue(g.has_edge(flow, a))
+ self.assertTrue(g.has_edge(flow, empty_flow))
def test_checks_for_dups(self):
flo = gf.Flow("test").add(
@@ -384,36 +414,39 @@ class PatternCompileTest(test.TestCase):
flo = lf.Flow("test", retry.AlwaysRevert("c"))
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(1, len(g))
- self.assertEqual(0, g.number_of_edges())
+ self.assertEqual(2, len(g))
+ self.assertEqual(1, g.number_of_edges())
def test_retry_in_unordered_flow(self):
flo = uf.Flow("test", retry.AlwaysRevert("c"))
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(1, len(g))
- self.assertEqual(0, g.number_of_edges())
+ self.assertEqual(2, len(g))
+ self.assertEqual(1, g.number_of_edges())
def test_retry_in_graph_flow(self):
flo = gf.Flow("test", retry.AlwaysRevert("c"))
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(1, len(g))
- self.assertEqual(0, g.number_of_edges())
+ self.assertEqual(2, len(g))
+ self.assertEqual(1, g.number_of_edges())
def test_retry_in_nested_flows(self):
c1 = retry.AlwaysRevert("c1")
c2 = retry.AlwaysRevert("c2")
- flo = lf.Flow("test", c1).add(lf.Flow("test2", c2))
+ inner_flo = lf.Flow("test2", c2)
+ flo = lf.Flow("test", c1).add(inner_flo)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(2, len(g))
+ self.assertEqual(4, len(g))
self.assertItemsEqual(g.edges(data=True), [
- (c1, c2, {'retry': True})
+ (flo, c1, {'invariant': True}),
+ (c1, inner_flo, {'invariant': True, 'retry': True}),
+ (inner_flo, c2, {'invariant': True}),
])
self.assertIs(c1, g.node[c2]['retry'])
- self.assertItemsEqual([c1], g.no_predecessors_iter())
+ self.assertItemsEqual([flo], g.no_predecessors_iter())
self.assertItemsEqual([c2], g.no_successors_iter())
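
Editor's note: atoms governed by a retry controller keep a 'retry' back-reference
in their node metadata, and controller edges now carry both 'invariant' and 'retry'
flags instead of 'retry' alone. A small compile-only sketch of reading that
annotation (illustrative task class; public retry API assumed):

    from taskflow.engines.action_engine import compiler
    from taskflow.patterns import linear_flow as lf
    from taskflow import retry
    from taskflow import task


    class Noop(task.Task):
        def execute(self):
            pass


    controller = retry.AlwaysRevert('controller')
    a, b = Noop('a'), Noop('b')
    flo = lf.Flow('demo', controller).add(a, b)

    graph = compiler.PatternCompiler(flo).compile().execution_graph
    # Every governed atom points back at its controller.
    assert graph.node[a]['retry'] is controller
    assert graph.node[b]['retry'] is controller
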
def test_retry_in_linear_flow_with_tasks(self):
@@ -423,13 +456,14 @@ class PatternCompileTest(test.TestCase):
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(3, len(g))
+ self.assertEqual(4, len(g))
self.assertItemsEqual(g.edges(data=True), [
+ (flo, c, {'invariant': True}),
(a, b, {'invariant': True}),
- (c, a, {'retry': True})
+ (c, a, {'invariant': True, 'retry': True})
])
- self.assertItemsEqual([c], g.no_predecessors_iter())
+ self.assertItemsEqual([flo], g.no_predecessors_iter())
self.assertItemsEqual([b], g.no_successors_iter())
self.assertIs(c, g.node[a]['retry'])
self.assertIs(c, g.node[b]['retry'])
@@ -441,13 +475,14 @@ class PatternCompileTest(test.TestCase):
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(3, len(g))
+ self.assertEqual(4, len(g))
self.assertItemsEqual(g.edges(data=True), [
- (c, a, {'retry': True}),
- (c, b, {'retry': True})
+ (flo, c, {'invariant': True}),
+ (c, a, {'invariant': True, 'retry': True}),
+ (c, b, {'invariant': True, 'retry': True}),
])
- self.assertItemsEqual([c], g.no_predecessors_iter())
+ self.assertItemsEqual([flo], g.no_predecessors_iter())
self.assertItemsEqual([a, b], g.no_successors_iter())
self.assertIs(c, g.node[a]['retry'])
self.assertIs(c, g.node[b]['retry'])
@@ -458,15 +493,16 @@ class PatternCompileTest(test.TestCase):
flo = gf.Flow("test", r).add(a, b, c).link(b, c)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(4, len(g))
+ self.assertEqual(5, len(g))
self.assertItemsEqual(g.edges(data=True), [
- (r, a, {'retry': True}),
- (r, b, {'retry': True}),
+ (flo, r, {'invariant': True}),
+ (r, a, {'invariant': True, 'retry': True}),
+ (r, b, {'invariant': True, 'retry': True}),
(b, c, {'manual': True})
])
- self.assertItemsEqual([r], g.no_predecessors_iter())
+ self.assertItemsEqual([flo], g.no_predecessors_iter())
self.assertItemsEqual([a, c], g.no_successors_iter())
self.assertIs(r, g.node[a]['retry'])
self.assertIs(r, g.node[b]['retry'])
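
Editor's note: explicit link() calls between atoms in a graph flow are still
distinguished from inferred dependencies by a 'manual' edge attribute, which the
compiler preserves alongside the invariant/retry flags. A brief sketch
(illustrative names again):

    from taskflow.engines.action_engine import compiler
    from taskflow.patterns import graph_flow as gf
    from taskflow import task


    class Noop(task.Task):
        def execute(self):
            pass


    a, b = Noop('a'), Noop('b')
    flo = gf.Flow('demo').add(a, b).link(a, b)

    graph = compiler.PatternCompiler(flo).compile().execution_graph
    # link() produces an edge flagged as manual rather than inferred.
    assert graph.get_edge_data(a, b).get('manual')
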
@@ -476,18 +512,18 @@ class PatternCompileTest(test.TestCase):
c1 = retry.AlwaysRevert("cp1")
c2 = retry.AlwaysRevert("cp2")
a, b, c, d = test_utils.make_many(4)
- flo = lf.Flow("test", c1).add(
- a,
- lf.Flow("test", c2).add(b, c),
- d)
+ inner_flo = lf.Flow("test", c2).add(b, c)
+ flo = lf.Flow("test", c1).add(a, inner_flo, d)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(6, len(g))
+ self.assertEqual(8, len(g))
self.assertItemsEqual(g.edges(data=True), [
- (c1, a, {'retry': True}),
- (a, c2, {'invariant': True}),
- (c2, b, {'retry': True}),
+ (flo, c1, {'invariant': True}),
+ (c1, a, {'invariant': True, 'retry': True}),
+ (a, inner_flo, {'invariant': True}),
+ (inner_flo, c2, {'invariant': True}),
+ (c2, b, {'invariant': True, 'retry': True}),
(b, c, {'invariant': True}),
(c, d, {'invariant': True}),
])
@@ -501,17 +537,17 @@ class PatternCompileTest(test.TestCase):
def test_retry_subflows_hierarchy(self):
c1 = retry.AlwaysRevert("cp1")
a, b, c, d = test_utils.make_many(4)
- flo = lf.Flow("test", c1).add(
- a,
- lf.Flow("test").add(b, c),
- d)
+ inner_flo = lf.Flow("test").add(b, c)
+ flo = lf.Flow("test", c1).add(a, inner_flo, d)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(5, len(g))
+ self.assertEqual(7, len(g))
self.assertItemsEqual(g.edges(data=True), [
- (c1, a, {'retry': True}),
- (a, b, {'invariant': True}),
+ (flo, c1, {'invariant': True}),
+ (c1, a, {'invariant': True, 'retry': True}),
+ (a, inner_flo, {'invariant': True}),
+ (inner_flo, b, {'invariant': True}),
(b, c, {'invariant': True}),
(c, d, {'invariant': True}),
])
diff --git a/taskflow/types/graph.py b/taskflow/types/graph.py
index 7462c9b..349dc09 100644
--- a/taskflow/types/graph.py
+++ b/taskflow/types/graph.py
@@ -28,8 +28,11 @@ def _common_format(g, edge_notation):
lines.append("Frozen: %s" % nx.is_frozen(g))
lines.append("Density: %0.3f" % nx.density(g))
lines.append("Nodes: %s" % g.number_of_nodes())
- for n in g.nodes_iter():
- lines.append(" - %s" % n)
+ for n, n_data in g.nodes_iter(data=True):
+ if n_data:
+ lines.append(" - %s (%s)" % (n, n_data))
+ else:
+ lines.append(" - %s" % n)
lines.append("Edges: %s" % g.number_of_edges())
for (u, v, e_data) in g.edges_iter(data=True):
if e_data:
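
Editor's note: with node data included, the textual graph dump now surfaces
compiler annotations (for example the 'retry' back-references asserted in the
tests above) next to each node. A rough illustration of the resulting shape using
a plain networkx graph (output paraphrased, not captured from taskflow):

    import networkx as nx

    g = nx.DiGraph()
    g.add_node('task-a', retry='controller-1')
    g.add_node('task-b')

    for n, n_data in g.nodes(data=True):
        if n_data:
            print(" - %s (%s)" % (n, n_data))
        else:
            print(" - %s" % n)
    # Prints something like:
    #  - task-a ({'retry': 'controller-1'})
    #  - task-b
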
diff --git a/taskflow/utils/iter_utils.py b/taskflow/utils/iter_utils.py
index 68810e8..a96e9cf 100644
--- a/taskflow/utils/iter_utils.py
+++ b/taskflow/utils/iter_utils.py
@@ -16,12 +16,25 @@
# License for the specific language governing permissions and limitations
# under the License.
+import itertools
+
def count(it):
    """Returns how many values are in the iterator (depletes the iterator)."""
    return sum(1 for _value in it)
+def unique_seen(it, *its):
+    """Yields unique values from iterator(s) (and retains order)."""
+    seen = set()
+    for value in itertools.chain(it, *its):
+        if value not in seen:
+            yield value
+            seen.add(value)
+
+
def find_first_match(it, matcher, not_found_value=None):
    """Searches iterator for first value that matcher callback returns true."""
    for value in it:
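
Editor's note: the new unique_seen helper deduplicates across one or more
iterators while preserving first-seen order, unlike an unordered set() pass. A
quick usage sketch (assuming the helper is importable as added above):

    from taskflow.utils import iter_utils

    # First-seen order is preserved across both inputs; repeats are dropped.
    values = list(iter_utils.unique_seen([3, 1, 3], [2, 1, 4]))
    assert values == [3, 1, 2, 4]
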