summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoshua Harlow <jxharlow@godaddy.com>2016-06-10 17:35:01 -0700
committerChangBo Guo(gcb) <glongwave@gmail.com>2017-01-03 06:37:57 +0000
commit774d59813417a945fdbe50316786effd1203e8f7 (patch)
treefcf0225f05135e955cf1ccaf397b347fc41f26cf
parentdc15edc8423ea81120f0007938a66b46f2ab9b0f (diff)
downloadtaskflow-774d59813417a945fdbe50316786effd1203e8f7.tar.gz
Rename engine analyzer to be named selector
This moves out the engine next to run (or revert) selection process to a single class that just does this and moves out the common functions the analyzer class provided to the runtime object (which all components can access). This makes it easier to adjust the selection algorithm in different ways. Change-Id: I091c69297a7bff60729791d3ca6c3fae14d6eea5
-rw-r--r--doc/source/engines.rst6
-rw-r--r--taskflow/engines/action_engine/builder.py6
-rw-r--r--taskflow/engines/action_engine/completer.py10
-rw-r--r--taskflow/engines/action_engine/engine.py6
-rw-r--r--taskflow/engines/action_engine/runtime.py50
-rw-r--r--taskflow/engines/action_engine/selector.py (renamed from taskflow/engines/action_engine/analyzer.py)62
6 files changed, 70 insertions, 70 deletions
diff --git a/doc/source/engines.rst b/doc/source/engines.rst
index bfabe06..aef0c0d 100644
--- a/doc/source/engines.rst
+++ b/doc/source/engines.rst
@@ -284,7 +284,7 @@ analyzing the current state of the task; which is determined by looking at the
state in the task detail object for that task and analyzing edges of the graph
for things like retry atom which can influence what a tasks intention should be
(this is aided by the usage of the
-:py:class:`~taskflow.engines.action_engine.analyzer.Analyzer` helper
+:py:class:`~taskflow.engines.action_engine.selector.Selector` helper
object which was designed to provide helper methods for this analysis). Once
these intentions are determined and associated with each task (the intention is
also stored in the :py:class:`~taskflow.persistence.models.AtomDetail` object)
@@ -299,7 +299,7 @@ This stage selects which atoms are eligible to run by using a
:py:class:`~taskflow.engines.action_engine.scheduler.Scheduler` implementation
(the default implementation looks at their intention, checking if predecessor
atoms have ran and so-on, using a
-:py:class:`~taskflow.engines.action_engine.analyzer.Analyzer` helper
+:py:class:`~taskflow.engines.action_engine.selector.Selector` helper
object as needed) and submits those atoms to a previously provided compatible
`executor`_ for asynchronous execution. This
:py:class:`~taskflow.engines.action_engine.scheduler.Scheduler` will return a
@@ -444,7 +444,6 @@ Components
other locations **without** notice (and without the typical deprecation
cycle).
-.. automodule:: taskflow.engines.action_engine.analyzer
.. automodule:: taskflow.engines.action_engine.builder
.. automodule:: taskflow.engines.action_engine.compiler
.. automodule:: taskflow.engines.action_engine.completer
@@ -453,6 +452,7 @@ Components
.. automodule:: taskflow.engines.action_engine.process_executor
.. automodule:: taskflow.engines.action_engine.runtime
.. automodule:: taskflow.engines.action_engine.scheduler
+.. automodule:: taskflow.engines.action_engine.selector
.. autoclass:: taskflow.engines.action_engine.scopes.ScopeWalker
:special-members: __iter__
.. automodule:: taskflow.engines.action_engine.traversal
diff --git a/taskflow/engines/action_engine/builder.py b/taskflow/engines/action_engine/builder.py
index a5746af..0666307 100644
--- a/taskflow/engines/action_engine/builder.py
+++ b/taskflow/engines/action_engine/builder.py
@@ -112,7 +112,7 @@ class MachineBuilder(object):
def __init__(self, runtime, waiter):
self._runtime = weakref.proxy(runtime)
- self._analyzer = runtime.analyzer
+ self._selector = runtime.selector
self._completer = runtime.completer
self._scheduler = runtime.scheduler
self._storage = runtime.storage
@@ -150,7 +150,7 @@ class MachineBuilder(object):
def iter_next_atoms(atom=None, apply_deciders=True):
# Yields and filters and tweaks the next atoms to run...
- maybe_atoms_it = self._analyzer.iter_next_atoms(atom=atom)
+ maybe_atoms_it = self._selector.iter_next_atoms(atom=atom)
for atom, late_decider in maybe_atoms_it:
if apply_deciders:
proceed = late_decider.check_and_affect(self._runtime)
@@ -188,7 +188,7 @@ class MachineBuilder(object):
" since (at least) %s atoms have been left in an"
" unfinished state", leftover_atoms)
return SUSPENDED
- elif self._analyzer.is_success():
+ elif self._runtime.is_success():
return SUCCESS
else:
return REVERTED
diff --git a/taskflow/engines/action_engine/completer.py b/taskflow/engines/action_engine/completer.py
index 3d37664..59a2dbf 100644
--- a/taskflow/engines/action_engine/completer.py
+++ b/taskflow/engines/action_engine/completer.py
@@ -76,11 +76,10 @@ class RevertAll(Strategy):
def __init__(self, runtime):
super(RevertAll, self).__init__(runtime)
- self._analyzer = runtime.analyzer
def apply(self):
return self._runtime.reset_atoms(
- self._analyzer.iterate_nodes(co.ATOMS),
+ self._runtime.iterate_nodes(co.ATOMS),
state=None, intention=st.REVERT)
@@ -106,7 +105,6 @@ class Completer(object):
def __init__(self, runtime):
self._runtime = weakref.proxy(runtime)
- self._analyzer = runtime.analyzer
self._storage = runtime.storage
self._undefined_resolver = RevertAll(self._runtime)
self._defer_reverts = strutils.bool_from_string(
@@ -125,7 +123,7 @@ class Completer(object):
atoms that were previously not finished (due to a RUNNING or REVERTING
attempt not previously finishing).
"""
- atoms = list(self._analyzer.iterate_nodes(co.ATOMS))
+ atoms = list(self._runtime.iterate_nodes(co.ATOMS))
atom_states = self._storage.get_atoms_states(atom.name
for atom in atoms)
if self._resolve:
@@ -134,7 +132,7 @@ class Completer(object):
if atom_state == st.FAILURE:
self._process_atom_failure(
atom, self._storage.get(atom.name))
- for retry in self._analyzer.iterate_retries(st.RETRYING):
+ for retry in self._runtime.iterate_retries(st.RETRYING):
retry_affected_atoms_it = self._runtime.retry_subflow(retry)
for atom, state, intention in retry_affected_atoms_it:
if state:
@@ -173,7 +171,7 @@ class Completer(object):
def _determine_resolution(self, atom, failure):
"""Determines which resolution strategy to activate/apply."""
- retry = self._analyzer.find_retry(atom)
+ retry = self._runtime.find_retry(atom)
if retry is not None:
# Ask retry controller what to do in case of failure.
handler = self._runtime.fetch_action(retry)
diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py
index 8b0b99f..a1d1fc0 100644
--- a/taskflow/engines/action_engine/engine.py
+++ b/taskflow/engines/action_engine/engine.py
@@ -375,8 +375,8 @@ class ActionEngine(base.Engine):
def _ensure_storage(self):
"""Ensure all contained atoms exist in the storage unit."""
self.storage.ensure_atoms(
- self._runtime.analyzer.iterate_nodes(compiler.ATOMS))
- for atom in self._runtime.analyzer.iterate_nodes(compiler.ATOMS):
+ self._runtime.iterate_nodes(compiler.ATOMS))
+ for atom in self._runtime.iterate_nodes(compiler.ATOMS):
if atom.inject:
self.storage.inject_atom_args(atom.name, atom.inject,
transient=self._inject_transient)
@@ -402,7 +402,7 @@ class ActionEngine(base.Engine):
last_cause = None
last_node = None
missing_nodes = 0
- for atom in self._runtime.analyzer.iterate_nodes(compiler.ATOMS):
+ for atom in self._runtime.iterate_nodes(compiler.ATOMS):
exec_missing = self.storage.fetch_unsatisfied_args(
atom.name, atom.rebind, optional_args=atom.optional)
revert_missing = self.storage.fetch_unsatisfied_args(
diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py
index 3d5a207..32ce052 100644
--- a/taskflow/engines/action_engine/runtime.py
+++ b/taskflow/engines/action_engine/runtime.py
@@ -22,12 +22,12 @@ from futurist import waiters
from taskflow import deciders as de
from taskflow.engines.action_engine.actions import retry as ra
from taskflow.engines.action_engine.actions import task as ta
-from taskflow.engines.action_engine import analyzer as an
from taskflow.engines.action_engine import builder as bu
from taskflow.engines.action_engine import compiler as com
from taskflow.engines.action_engine import completer as co
from taskflow.engines.action_engine import scheduler as sched
from taskflow.engines.action_engine import scopes as sc
+from taskflow.engines.action_engine import selector as se
from taskflow.engines.action_engine import traversal as tr
from taskflow import exceptions as exc
from taskflow import logging
@@ -163,8 +163,8 @@ class Runtime(object):
return self._options
@misc.cachedproperty
- def analyzer(self):
- return an.Analyzer(self)
+ def selector(self):
+ return se.Selector(self)
@misc.cachedproperty
def builder(self):
@@ -245,6 +245,48 @@ class Runtime(object):
# Various helper methods used by the runtime components; not for public
# consumption...
+ def iterate_retries(self, state=None):
+ """Iterates retry atoms that match the provided state.
+
+ If no state is provided it will yield back all retry atoms.
+ """
+ if state:
+ atoms = list(self.iterate_nodes((com.RETRY,)))
+ atom_states = self._storage.get_atoms_states(atom.name
+ for atom in atoms)
+ for atom in atoms:
+ atom_state, _atom_intention = atom_states[atom.name]
+ if atom_state == state:
+ yield atom
+ else:
+ for atom in self.iterate_nodes((com.RETRY,)):
+ yield atom
+
+ def iterate_nodes(self, allowed_kinds):
+ """Yields back all nodes of specified kinds in the execution graph."""
+ graph = self._compilation.execution_graph
+ for node, node_data in graph.nodes_iter(data=True):
+ if node_data['kind'] in allowed_kinds:
+ yield node
+
+ def is_success(self):
+ """Checks if all atoms in the execution graph are in 'happy' state."""
+ atoms = list(self.iterate_nodes(com.ATOMS))
+ atom_states = self._storage.get_atoms_states(atom.name
+ for atom in atoms)
+ for atom in atoms:
+ atom_state, _atom_intention = atom_states[atom.name]
+ if atom_state == st.IGNORE:
+ continue
+ if atom_state != st.SUCCESS:
+ return False
+ return True
+
+ def find_retry(self, node):
+ """Returns the retry atom associated to the given node (or none)."""
+ graph = self._compilation.execution_graph
+ return graph.node[node].get(com.RETRY)
+
def reset_atoms(self, atoms, state=st.PENDING, intention=st.EXECUTE):
"""Resets all the provided atoms to the given state and intention."""
tweaked = []
@@ -261,7 +303,7 @@ class Runtime(object):
def reset_all(self, state=st.PENDING, intention=st.EXECUTE):
"""Resets all atoms to the given state and intention."""
- return self.reset_atoms(self.analyzer.iterate_nodes(com.ATOMS),
+ return self.reset_atoms(self.iterate_nodes(com.ATOMS),
state=state, intention=intention)
def reset_subgraph(self, atom, state=st.PENDING, intention=st.EXECUTE):
diff --git a/taskflow/engines/action_engine/analyzer.py b/taskflow/engines/action_engine/selector.py
index 3504f08..162e368 100644
--- a/taskflow/engines/action_engine/analyzer.py
+++ b/taskflow/engines/action_engine/selector.py
@@ -27,8 +27,8 @@ from taskflow.utils import iter_utils
LOG = logging.getLogger(__name__)
-class Analyzer(object):
- """Analyzes a compilation and aids in execution processes.
+class Selector(object):
+ """Selector that uses a compilation and aids in execution processes.
Its primary purpose is to get the next atoms for execution or reversion
by utilizing the compilations underlying structures (graphs, nodes and
@@ -45,8 +45,8 @@ class Analyzer(object):
def iter_next_atoms(self, atom=None):
"""Iterate next atoms to run (originating from atom or all atoms)."""
if atom is None:
- return iter_utils.unique_seen((self.browse_atoms_for_execute(),
- self.browse_atoms_for_revert()),
+ return iter_utils.unique_seen((self._browse_atoms_for_execute(),
+ self._browse_atoms_for_revert()),
seen_selector=operator.itemgetter(0))
state = self._storage.get_atom_state(atom.name)
intention = self._storage.get_atom_intention(atom.name)
@@ -56,17 +56,17 @@ class Analyzer(object):
(atom, deciders.NoOpDecider()),
])
elif intention == st.EXECUTE:
- return self.browse_atoms_for_execute(atom=atom)
+ return self._browse_atoms_for_execute(atom=atom)
else:
return iter([])
elif state == st.REVERTED:
- return self.browse_atoms_for_revert(atom=atom)
+ return self._browse_atoms_for_revert(atom=atom)
elif state == st.FAILURE:
- return self.browse_atoms_for_revert()
+ return self._browse_atoms_for_revert()
else:
return iter([])
- def browse_atoms_for_execute(self, atom=None):
+ def _browse_atoms_for_execute(self, atom=None):
"""Browse next atoms to execute.
This returns a iterator of atoms that *may* be ready to be
@@ -74,7 +74,7 @@ class Analyzer(object):
of that atom, otherwise it will examine the whole graph.
"""
if atom is None:
- atom_it = self.iterate_nodes(co.ATOMS)
+ atom_it = self._runtime.iterate_nodes(co.ATOMS)
else:
# NOTE(harlowja): the reason this uses breadth first is so that
# when deciders are applied that those deciders can be applied
@@ -90,7 +90,7 @@ class Analyzer(object):
if is_ready:
yield (atom, late_decider)
- def browse_atoms_for_revert(self, atom=None):
+ def _browse_atoms_for_revert(self, atom=None):
"""Browse next atoms to revert.
This returns a iterator of atoms that *may* be ready to be be
@@ -99,7 +99,7 @@ class Analyzer(object):
graph.
"""
if atom is None:
- atom_it = self.iterate_nodes(co.ATOMS)
+ atom_it = self._runtime.iterate_nodes(co.ATOMS)
else:
atom_it = traversal.breadth_first_iterate(
self._execution_graph, atom, traversal.Direction.BACKWARD,
@@ -226,43 +226,3 @@ class Analyzer(object):
return self._get_maybe_ready(atom, st.REVERTING, [st.REVERT, st.RETRY],
connected_fetcher, ready_checker,
decider_fetcher, for_what='revert')
-
- def iterate_retries(self, state=None):
- """Iterates retry atoms that match the provided state.
-
- If no state is provided it will yield back all retry atoms.
- """
- if state:
- atoms = list(self.iterate_nodes((co.RETRY,)))
- atom_states = self._storage.get_atoms_states(atom.name
- for atom in atoms)
- for atom in atoms:
- atom_state, _atom_intention = atom_states[atom.name]
- if atom_state == state:
- yield atom
- else:
- for atom in self.iterate_nodes((co.RETRY,)):
- yield atom
-
- def iterate_nodes(self, allowed_kinds):
- """Yields back all nodes of specified kinds in the execution graph."""
- for node, node_data in self._execution_graph.nodes_iter(data=True):
- if node_data['kind'] in allowed_kinds:
- yield node
-
- def find_retry(self, node):
- """Returns the retry atom associated to the given node (or none)."""
- return self._execution_graph.node[node].get(co.RETRY)
-
- def is_success(self):
- """Checks if all atoms in the execution graph are in 'happy' state."""
- atoms = list(self.iterate_nodes(co.ATOMS))
- atom_states = self._storage.get_atoms_states(atom.name
- for atom in atoms)
- for atom in atoms:
- atom_state, _atom_intention = atom_states[atom.name]
- if atom_state == st.IGNORE:
- continue
- if atom_state != st.SUCCESS:
- return False
- return True