summaryrefslogtreecommitdiff
path: root/taskflow/tests
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2015-11-16 16:27:42 -0800
committerJoshua Harlow <harlowja@gmail.com>2016-01-09 22:42:17 -0800
commit8e8156c488dea8ae876b112c30e41e60da4f5be7 (patch)
treea4c119dda7d2338bd8c63aad593bbb42d24f3e1f /taskflow/tests
parentf555a35f3081ba492db15d7bda11fbe50f2a8349 (diff)
downloadtaskflow-8e8156c488dea8ae876b112c30e41e60da4f5be7.tar.gz
Allow for alterations in decider 'area of influence'
Christmas came early. Closes-Bug: #1479466 Change-Id: I931d826690c925f022dbfffe9afb7bf41345b1d0
Diffstat (limited to 'taskflow/tests')
-rw-r--r--taskflow/tests/unit/action_engine/test_compile.py529
-rw-r--r--taskflow/tests/unit/patterns/test_graph_flow.py10
-rw-r--r--taskflow/tests/unit/test_deciders.py58
-rw-r--r--taskflow/tests/unit/test_engines.py126
-rw-r--r--taskflow/tests/unit/test_utils.py18
5 files changed, 478 insertions, 263 deletions
diff --git a/taskflow/tests/unit/action_engine/test_compile.py b/taskflow/tests/unit/action_engine/test_compile.py
index 6ccf358..757bde7 100644
--- a/taskflow/tests/unit/action_engine/test_compile.py
+++ b/taskflow/tests/unit/action_engine/test_compile.py
@@ -25,12 +25,27 @@ from taskflow import test
from taskflow.tests import utils as test_utils
+def _replicate_graph_with_names(compilation):
+ # Turn a graph of nodes into a graph of names only so that
+ # testing can use those names instead of having to use the exact
+ # node objects themselves (which is problematic for any end nodes that
+ # are added into the graph *dynamically*, and are not there in the
+ # original/source flow).
+ g = compilation.execution_graph
+ n_g = g.__class__(name=g.name)
+ for node, node_data in g.nodes_iter(data=True):
+ n_g.add_node(node.name, attr_dict=node_data)
+ for u, v, u_v_data in g.edges_iter(data=True):
+ n_g.add_edge(u.name, v.name, attr_dict=u_v_data)
+ return n_g
+
+
class PatternCompileTest(test.TestCase):
def test_task(self):
task = test_utils.DummyTask(name='a')
- compilation = compiler.PatternCompiler(task).compile()
- g = compilation.execution_graph
- self.assertEqual([task], list(g.nodes()))
+ g = _replicate_graph_with_names(
+ compiler.PatternCompiler(task).compile())
+ self.assertEqual(['a'], list(g.nodes()))
self.assertEqual([], list(g.edges()))
def test_retry(self):
@@ -54,19 +69,20 @@ class PatternCompileTest(test.TestCase):
inner_flo.add(d)
flo.add(inner_flo)
- compilation = compiler.PatternCompiler(flo).compile()
- g = compilation.execution_graph
- self.assertEqual(6, len(g))
+ g = _replicate_graph_with_names(
+ compiler.PatternCompiler(flo).compile())
+ self.assertEqual(8, len(g))
order = g.topological_sort()
- self.assertEqual([flo, a, b, c, inner_flo, d], order)
- self.assertTrue(g.has_edge(c, inner_flo))
- self.assertTrue(g.has_edge(inner_flo, d))
+ self.assertEqual(['test', 'a', 'b', 'c',
+ "sub-test", 'd', "sub-test[$]",
+ 'test[$]'], order)
+ self.assertTrue(g.has_edge('c', "sub-test"))
+ self.assertTrue(g.has_edge("sub-test", 'd'))
self.assertEqual({'invariant': True},
- g.get_edge_data(inner_flo, d))
-
- self.assertEqual([d], list(g.no_successors_iter()))
- self.assertEqual([flo], list(g.no_predecessors_iter()))
+ g.get_edge_data("sub-test", 'd'))
+ self.assertEqual(['test[$]'], list(g.no_successors_iter()))
+ self.assertEqual(['test'], list(g.no_predecessors_iter()))
def test_invalid(self):
a, b, c = test_utils.make_many(3)
@@ -80,19 +96,21 @@ class PatternCompileTest(test.TestCase):
a, b, c, d = test_utils.make_many(4)
flo = uf.Flow("test")
flo.add(a, b, c, d)
- compilation = compiler.PatternCompiler(flo).compile()
- g = compilation.execution_graph
- self.assertEqual(5, len(g))
+
+ g = _replicate_graph_with_names(
+ compiler.PatternCompiler(flo).compile())
+ self.assertEqual(6, len(g))
self.assertItemsEqual(g.edges(), [
- (flo, a),
- (flo, b),
- (flo, c),
- (flo, d),
+ ('test', 'a'),
+ ('test', 'b'),
+ ('test', 'c'),
+ ('test', 'd'),
+ ('a', 'test[$]'),
+ ('b', 'test[$]'),
+ ('c', 'test[$]'),
+ ('d', 'test[$]'),
])
- self.assertEqual(set([a, b, c, d]),
- set(g.no_successors_iter()))
- self.assertEqual(set([flo]),
- set(g.no_predecessors_iter()))
+ self.assertEqual(set(['test']), set(g.no_predecessors_iter()))
def test_linear_nested(self):
a, b, c, d = test_utils.make_many(4)
@@ -102,22 +120,22 @@ class PatternCompileTest(test.TestCase):
inner_flo.add(c, d)
flo.add(inner_flo)
- compilation = compiler.PatternCompiler(flo).compile()
- graph = compilation.execution_graph
- self.assertEqual(6, len(graph))
+ g = _replicate_graph_with_names(
+ compiler.PatternCompiler(flo).compile())
+ self.assertEqual(8, len(g))
- lb = graph.subgraph([a, b])
- self.assertFalse(lb.has_edge(b, a))
- self.assertTrue(lb.has_edge(a, b))
- self.assertEqual({'invariant': True}, graph.get_edge_data(a, b))
+ sub_g = g.subgraph(['a', 'b'])
+ self.assertFalse(sub_g.has_edge('b', 'a'))
+ self.assertTrue(sub_g.has_edge('a', 'b'))
+ self.assertEqual({'invariant': True}, sub_g.get_edge_data("a", "b"))
- ub = graph.subgraph([c, d])
- self.assertEqual(0, ub.number_of_edges())
+ sub_g = g.subgraph(['c', 'd'])
+ self.assertEqual(0, sub_g.number_of_edges())
# This ensures that c and d do not start executing until after b.
- self.assertTrue(graph.has_edge(b, inner_flo))
- self.assertTrue(graph.has_edge(inner_flo, c))
- self.assertTrue(graph.has_edge(inner_flo, d))
+ self.assertTrue(g.has_edge('b', 'test2'))
+ self.assertTrue(g.has_edge('test2', 'c'))
+ self.assertTrue(g.has_edge('test2', 'd'))
def test_unordered_nested(self):
a, b, c, d = test_utils.make_many(4)
@@ -127,15 +145,19 @@ class PatternCompileTest(test.TestCase):
flo2.add(c, d)
flo.add(flo2)
- compilation = compiler.PatternCompiler(flo).compile()
- g = compilation.execution_graph
- self.assertEqual(6, len(g))
+ g = _replicate_graph_with_names(
+ compiler.PatternCompiler(flo).compile())
+ self.assertEqual(8, len(g))
self.assertItemsEqual(g.edges(), [
- (flo, a),
- (flo, b),
- (flo, flo2),
- (flo2, c),
- (c, d)
+ ('test', 'a'),
+ ('test', 'b'),
+ ('test', 'test2'),
+ ('test2', 'c'),
+ ('c', 'd'),
+ ('d', 'test2[$]'),
+ ('test2[$]', 'test[$]'),
+ ('a', 'test[$]'),
+ ('b', 'test[$]'),
])
def test_unordered_nested_in_linear(self):
@@ -143,27 +165,27 @@ class PatternCompileTest(test.TestCase):
inner_flo = uf.Flow('ut').add(b, c)
flo = lf.Flow('lt').add(a, inner_flo, d)
- compilation = compiler.PatternCompiler(flo).compile()
- g = compilation.execution_graph
- self.assertEqual(6, len(g))
+ g = _replicate_graph_with_names(
+ compiler.PatternCompiler(flo).compile())
+ self.assertEqual(8, len(g))
self.assertItemsEqual(g.edges(), [
- (flo, a),
- (a, inner_flo),
- (inner_flo, b),
- (inner_flo, c),
- (b, d),
- (c, d),
+ ('lt', 'a'),
+ ('a', 'ut'),
+ ('ut', 'b'),
+ ('ut', 'c'),
+ ('b', 'ut[$]'),
+ ('c', 'ut[$]'),
+ ('ut[$]', 'd'),
+ ('d', 'lt[$]'),
])
def test_graph(self):
a, b, c, d = test_utils.make_many(4)
flo = gf.Flow("test")
flo.add(a, b, c, d)
-
compilation = compiler.PatternCompiler(flo).compile()
- g = compilation.execution_graph
- self.assertEqual(5, len(g))
- self.assertEqual(4, g.number_of_edges())
+ self.assertEqual(6, len(compilation.execution_graph))
+ self.assertEqual(8, compilation.execution_graph.number_of_edges())
def test_graph_nested(self):
a, b, c, d, e, f, g = test_utils.make_many(7)
@@ -174,19 +196,26 @@ class PatternCompileTest(test.TestCase):
flo2.add(e, f, g)
flo.add(flo2)
- compilation = compiler.PatternCompiler(flo).compile()
- graph = compilation.execution_graph
- self.assertEqual(9, len(graph))
- self.assertItemsEqual(graph.edges(), [
- (flo, a),
- (flo, b),
- (flo, c),
- (flo, d),
- (flo, flo2),
-
- (flo2, e),
- (e, f),
- (f, g),
+ g = _replicate_graph_with_names(
+ compiler.PatternCompiler(flo).compile())
+ self.assertEqual(11, len(g))
+ self.assertItemsEqual(g.edges(), [
+ ('test', 'a'),
+ ('test', 'b'),
+ ('test', 'c'),
+ ('test', 'd'),
+ ('a', 'test[$]'),
+ ('b', 'test[$]'),
+ ('c', 'test[$]'),
+ ('d', 'test[$]'),
+
+ ('test', 'test2'),
+ ('test2', 'e'),
+ ('e', 'f'),
+ ('f', 'g'),
+
+ ('g', 'test2[$]'),
+ ('test2[$]', 'test[$]'),
])
def test_graph_nested_graph(self):
@@ -198,19 +227,29 @@ class PatternCompileTest(test.TestCase):
flo2.add(e, f, g)
flo.add(flo2)
- compilation = compiler.PatternCompiler(flo).compile()
- graph = compilation.execution_graph
- self.assertEqual(9, len(graph))
- self.assertItemsEqual(graph.edges(), [
- (flo, a),
- (flo, b),
- (flo, c),
- (flo, d),
- (flo, flo2),
-
- (flo2, e),
- (flo2, f),
- (flo2, g),
+ g = _replicate_graph_with_names(
+ compiler.PatternCompiler(flo).compile())
+ self.assertEqual(11, len(g))
+ self.assertItemsEqual(g.edges(), [
+ ('test', 'a'),
+ ('test', 'b'),
+ ('test', 'c'),
+ ('test', 'd'),
+ ('test', 'test2'),
+
+ ('test2', 'e'),
+ ('test2', 'f'),
+ ('test2', 'g'),
+
+ ('e', 'test2[$]'),
+ ('f', 'test2[$]'),
+ ('g', 'test2[$]'),
+
+ ('test2[$]', 'test[$]'),
+ ('a', 'test[$]'),
+ ('b', 'test[$]'),
+ ('c', 'test[$]'),
+ ('d', 'test[$]'),
])
def test_graph_links(self):
@@ -221,33 +260,34 @@ class PatternCompileTest(test.TestCase):
flo.link(b, c)
flo.link(c, d)
- compilation = compiler.PatternCompiler(flo).compile()
- g = compilation.execution_graph
- self.assertEqual(5, len(g))
+ g = _replicate_graph_with_names(
+ compiler.PatternCompiler(flo).compile())
+ self.assertEqual(6, len(g))
self.assertItemsEqual(g.edges(data=True), [
- (flo, a, {'invariant': True}),
-
- (a, b, {'manual': True}),
- (b, c, {'manual': True}),
- (c, d, {'manual': True}),
+ ('test', 'a', {'invariant': True}),
+ ('a', 'b', {'manual': True}),
+ ('b', 'c', {'manual': True}),
+ ('c', 'd', {'manual': True}),
+ ('d', 'test[$]', {'invariant': True}),
])
- self.assertItemsEqual([flo], g.no_predecessors_iter())
- self.assertItemsEqual([d], g.no_successors_iter())
+ self.assertItemsEqual(['test'], g.no_predecessors_iter())
+ self.assertItemsEqual(['test[$]'], g.no_successors_iter())
def test_graph_dependencies(self):
a = test_utils.ProvidesRequiresTask('a', provides=['x'], requires=[])
b = test_utils.ProvidesRequiresTask('b', provides=[], requires=['x'])
flo = gf.Flow("test").add(a, b)
- compilation = compiler.PatternCompiler(flo).compile()
- g = compilation.execution_graph
- self.assertEqual(3, len(g))
+ g = _replicate_graph_with_names(
+ compiler.PatternCompiler(flo).compile())
+ self.assertEqual(4, len(g))
self.assertItemsEqual(g.edges(data=True), [
- (flo, a, {'invariant': True}),
- (a, b, {'reasons': set(['x'])})
+ ('test', 'a', {'invariant': True}),
+ ('a', 'b', {'reasons': set(['x'])}),
+ ('b', 'test[$]', {'invariant': True}),
])
- self.assertItemsEqual([flo], g.no_predecessors_iter())
- self.assertItemsEqual([b], g.no_successors_iter())
+ self.assertItemsEqual(['test'], g.no_predecessors_iter())
+ self.assertItemsEqual(['test[$]'], g.no_successors_iter())
def test_graph_nested_requires(self):
a = test_utils.ProvidesRequiresTask('a', provides=['x'], requires=[])
@@ -256,17 +296,19 @@ class PatternCompileTest(test.TestCase):
inner_flo = lf.Flow("test2").add(b, c)
flo = gf.Flow("test").add(a, inner_flo)
- compilation = compiler.PatternCompiler(flo).compile()
- graph = compilation.execution_graph
- self.assertEqual(5, len(graph))
- self.assertItemsEqual(graph.edges(data=True), [
- (flo, a, {'invariant': True}),
- (inner_flo, b, {'invariant': True}),
- (a, inner_flo, {'reasons': set(['x'])}),
- (b, c, {'invariant': True}),
+ g = _replicate_graph_with_names(
+ compiler.PatternCompiler(flo).compile())
+ self.assertEqual(7, len(g))
+ self.assertItemsEqual(g.edges(data=True), [
+ ('test', 'a', {'invariant': True}),
+ ('test2', 'b', {'invariant': True}),
+ ('a', 'test2', {'reasons': set(['x'])}),
+ ('b', 'c', {'invariant': True}),
+ ('c', 'test2[$]', {'invariant': True}),
+ ('test2[$]', 'test[$]', {'invariant': True}),
])
- self.assertItemsEqual([flo], graph.no_predecessors_iter())
- self.assertItemsEqual([c], graph.no_successors_iter())
+ self.assertItemsEqual(['test'], list(g.no_predecessors_iter()))
+ self.assertItemsEqual(['test[$]'], list(g.no_successors_iter()))
def test_graph_nested_provides(self):
a = test_utils.ProvidesRequiresTask('a', provides=[], requires=['x'])
@@ -275,18 +317,22 @@ class PatternCompileTest(test.TestCase):
inner_flo = lf.Flow("test2").add(b, c)
flo = gf.Flow("test").add(a, inner_flo)
- compilation = compiler.PatternCompiler(flo).compile()
- graph = compilation.execution_graph
- self.assertEqual(5, len(graph))
- self.assertItemsEqual(graph.edges(data=True), [
- (flo, inner_flo, {'invariant': True}),
-
- (inner_flo, b, {'invariant': True}),
- (b, c, {'invariant': True}),
- (c, a, {'reasons': set(['x'])}),
+ g = _replicate_graph_with_names(
+ compiler.PatternCompiler(flo).compile())
+ self.assertEqual(7, len(g))
+ self.assertItemsEqual(g.edges(data=True), [
+ ('test', 'test2', {'invariant': True}),
+ ('a', 'test[$]', {'invariant': True}),
+
+ # The 'x' requirement is produced out of test2...
+ ('test2[$]', 'a', {'reasons': set(['x'])}),
+
+ ('test2', 'b', {'invariant': True}),
+ ('b', 'c', {'invariant': True}),
+ ('c', 'test2[$]', {'invariant': True}),
])
- self.assertItemsEqual([flo], graph.no_predecessors_iter())
- self.assertItemsEqual([a], graph.no_successors_iter())
+ self.assertItemsEqual(['test'], g.no_predecessors_iter())
+ self.assertItemsEqual(['test[$]'], g.no_successors_iter())
def test_empty_flow_in_linear_flow(self):
flo = lf.Flow('lf')
@@ -295,12 +341,14 @@ class PatternCompileTest(test.TestCase):
empty_flo = gf.Flow("empty")
flo.add(a, empty_flo, b)
- compilation = compiler.PatternCompiler(flo).compile()
- graph = compilation.execution_graph
- self.assertItemsEqual(graph.edges(), [
- (flo, a),
- (a, empty_flo),
- (empty_flo, b),
+ g = _replicate_graph_with_names(
+ compiler.PatternCompiler(flo).compile())
+ self.assertItemsEqual(g.edges(), [
+ ("lf", "a"),
+ ("a", "empty"),
+ ("empty", "empty[$]"),
+ ("empty[$]", "b"),
+ ("b", "lf[$]"),
])
def test_many_empty_in_graph_flow(self):
@@ -331,22 +379,24 @@ class PatternCompileTest(test.TestCase):
flo.link(a, d)
flo.link(c, d)
- compilation = compiler.PatternCompiler(flo).compile()
- graph = compilation.execution_graph
+ g = _replicate_graph_with_names(
+ compiler.PatternCompiler(flo).compile())
- self.assertTrue(graph.has_edge(flo, a))
+ self.assertTrue(g.has_edge('root', 'a'))
+ self.assertTrue(g.has_edge('root', 'b'))
+ self.assertTrue(g.has_edge('root', 'c'))
- self.assertTrue(graph.has_edge(flo, b))
- self.assertTrue(graph.has_edge(b_0, b_1))
- self.assertTrue(graph.has_edge(b_1, b_2))
- self.assertTrue(graph.has_edge(b_2, b_3))
+ self.assertTrue(g.has_edge('b.0', 'b.1'))
+ self.assertTrue(g.has_edge('b.1[$]', 'b.2'))
+ self.assertTrue(g.has_edge('b.2[$]', 'b.3'))
- self.assertTrue(graph.has_edge(flo, c))
- self.assertTrue(graph.has_edge(c_0, c_1))
- self.assertTrue(graph.has_edge(c_1, c_2))
+ self.assertTrue(g.has_edge('c.0[$]', 'c.1'))
+ self.assertTrue(g.has_edge('c.1[$]', 'c.2'))
- self.assertTrue(graph.has_edge(b_3, d))
- self.assertEqual(12, len(graph))
+ self.assertTrue(g.has_edge('a', 'd'))
+ self.assertTrue(g.has_edge('b[$]', 'd'))
+ self.assertTrue(g.has_edge('c[$]', 'd'))
+ self.assertEqual(20, len(g))
def test_empty_flow_in_nested_flow(self):
flow = lf.Flow('lf')
@@ -360,13 +410,13 @@ class PatternCompileTest(test.TestCase):
flow2.add(c, empty_flow, d)
flow.add(a, flow2, b)
- compilation = compiler.PatternCompiler(flow).compile()
- g = compilation.execution_graph
-
- for source, target in [(flow, a), (a, flow2),
- (flow2, c), (c, empty_flow),
- (empty_flow, d), (d, b)]:
- self.assertTrue(g.has_edge(source, target))
+ g = _replicate_graph_with_names(
+ compiler.PatternCompiler(flow).compile())
+ for u, v in [('lf', 'a'), ('a', 'lf-2'),
+ ('lf-2', 'c'), ('c', 'empty'),
+ ('empty[$]', 'd'), ('d', 'lf-2[$]'),
+ ('lf-2[$]', 'b'), ('b', 'lf[$]')]:
+ self.assertTrue(g.has_edge(u, v))
def test_empty_flow_in_graph_flow(self):
flow = lf.Flow('lf')
@@ -379,7 +429,14 @@ class PatternCompileTest(test.TestCase):
g = compilation.execution_graph
self.assertTrue(g.has_edge(flow, a))
self.assertTrue(g.has_edge(a, empty_flow))
- self.assertTrue(g.has_edge(empty_flow, b))
+
+ empty_flow_successors = g.successors(empty_flow)
+ self.assertEqual(1, len(empty_flow_successors))
+ empty_flow_terminal = empty_flow_successors[0]
+ self.assertIs(empty_flow, empty_flow_terminal.flow)
+ self.assertEqual(compiler.FLOW_END,
+ g.node[empty_flow_terminal]['kind'])
+ self.assertTrue(g.has_edge(empty_flow_terminal, b))
def test_empty_flow_in_graph_flow_linkage(self):
flow = gf.Flow('lf')
@@ -417,146 +474,154 @@ class PatternCompileTest(test.TestCase):
def test_retry_in_linear_flow(self):
flo = lf.Flow("test", retry.AlwaysRevert("c"))
compilation = compiler.PatternCompiler(flo).compile()
- g = compilation.execution_graph
- self.assertEqual(2, len(g))
- self.assertEqual(1, g.number_of_edges())
+ self.assertEqual(3, len(compilation.execution_graph))
+ self.assertEqual(2, compilation.execution_graph.number_of_edges())
def test_retry_in_unordered_flow(self):
flo = uf.Flow("test", retry.AlwaysRevert("c"))
compilation = compiler.PatternCompiler(flo).compile()
- g = compilation.execution_graph
- self.assertEqual(2, len(g))
- self.assertEqual(1, g.number_of_edges())
+ self.assertEqual(3, len(compilation.execution_graph))
+ self.assertEqual(2, compilation.execution_graph.number_of_edges())
def test_retry_in_graph_flow(self):
flo = gf.Flow("test", retry.AlwaysRevert("c"))
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(2, len(g))
- self.assertEqual(1, g.number_of_edges())
+ self.assertEqual(3, len(g))
+ self.assertEqual(2, g.number_of_edges())
def test_retry_in_nested_flows(self):
c1 = retry.AlwaysRevert("c1")
c2 = retry.AlwaysRevert("c2")
inner_flo = lf.Flow("test2", c2)
flo = lf.Flow("test", c1).add(inner_flo)
- compilation = compiler.PatternCompiler(flo).compile()
- g = compilation.execution_graph
- self.assertEqual(4, len(g))
+ g = _replicate_graph_with_names(
+ compiler.PatternCompiler(flo).compile())
+ self.assertEqual(6, len(g))
self.assertItemsEqual(g.edges(data=True), [
- (flo, c1, {'invariant': True}),
- (c1, inner_flo, {'invariant': True, 'retry': True}),
- (inner_flo, c2, {'invariant': True}),
+ ('test', 'c1', {'invariant': True}),
+ ('c1', 'test2', {'invariant': True, 'retry': True}),
+ ('test2', 'c2', {'invariant': True}),
+ ('c2', 'test2[$]', {'invariant': True}),
+ ('test2[$]', 'test[$]', {'invariant': True}),
])
- self.assertIs(c1, g.node[c2]['retry'])
- self.assertItemsEqual([flo], g.no_predecessors_iter())
- self.assertItemsEqual([c2], g.no_successors_iter())
+ self.assertIs(c1, g.node['c2']['retry'])
+ self.assertItemsEqual(['test'], list(g.no_predecessors_iter()))
+ self.assertItemsEqual(['test[$]'], list(g.no_successors_iter()))
def test_retry_in_linear_flow_with_tasks(self):
c = retry.AlwaysRevert("c")
a, b = test_utils.make_many(2)
flo = lf.Flow("test", c).add(a, b)
- compilation = compiler.PatternCompiler(flo).compile()
- g = compilation.execution_graph
- self.assertEqual(4, len(g))
+ g = _replicate_graph_with_names(
+ compiler.PatternCompiler(flo).compile())
+ self.assertEqual(5, len(g))
self.assertItemsEqual(g.edges(data=True), [
- (flo, c, {'invariant': True}),
- (a, b, {'invariant': True}),
- (c, a, {'invariant': True, 'retry': True})
+ ('test', 'c', {'invariant': True}),
+ ('a', 'b', {'invariant': True}),
+ ('c', 'a', {'invariant': True, 'retry': True}),
+ ('b', 'test[$]', {'invariant': True}),
])
- self.assertItemsEqual([flo], g.no_predecessors_iter())
- self.assertItemsEqual([b], g.no_successors_iter())
- self.assertIs(c, g.node[a]['retry'])
- self.assertIs(c, g.node[b]['retry'])
+ self.assertItemsEqual(['test'], g.no_predecessors_iter())
+ self.assertItemsEqual(['test[$]'], g.no_successors_iter())
+ self.assertIs(c, g.node['a']['retry'])
+ self.assertIs(c, g.node['b']['retry'])
def test_retry_in_unordered_flow_with_tasks(self):
c = retry.AlwaysRevert("c")
a, b = test_utils.make_many(2)
flo = uf.Flow("test", c).add(a, b)
- compilation = compiler.PatternCompiler(flo).compile()
- g = compilation.execution_graph
- self.assertEqual(4, len(g))
+ g = _replicate_graph_with_names(
+ compiler.PatternCompiler(flo).compile())
+ self.assertEqual(5, len(g))
self.assertItemsEqual(g.edges(data=True), [
- (flo, c, {'invariant': True}),
- (c, a, {'invariant': True, 'retry': True}),
- (c, b, {'invariant': True, 'retry': True}),
+ ('test', 'c', {'invariant': True}),
+ ('c', 'a', {'invariant': True, 'retry': True}),
+ ('c', 'b', {'invariant': True, 'retry': True}),
+ ('b', 'test[$]', {'invariant': True}),
+ ('a', 'test[$]', {'invariant': True}),
])
- self.assertItemsEqual([flo], g.no_predecessors_iter())
- self.assertItemsEqual([a, b], g.no_successors_iter())
- self.assertIs(c, g.node[a]['retry'])
- self.assertIs(c, g.node[b]['retry'])
+ self.assertItemsEqual(['test'], list(g.no_predecessors_iter()))
+ self.assertItemsEqual(['test[$]'], list(g.no_successors_iter()))
+ self.assertIs(c, g.node['a']['retry'])
+ self.assertIs(c, g.node['b']['retry'])
def test_retry_in_graph_flow_with_tasks(self):
- r = retry.AlwaysRevert("cp")
+ r = retry.AlwaysRevert("r")
a, b, c = test_utils.make_many(3)
flo = gf.Flow("test", r).add(a, b, c).link(b, c)
- compilation = compiler.PatternCompiler(flo).compile()
- g = compilation.execution_graph
- self.assertEqual(5, len(g))
+ g = _replicate_graph_with_names(
+ compiler.PatternCompiler(flo).compile())
self.assertItemsEqual(g.edges(data=True), [
- (flo, r, {'invariant': True}),
- (r, a, {'invariant': True, 'retry': True}),
- (r, b, {'invariant': True, 'retry': True}),
- (b, c, {'manual': True})
+ ('test', 'r', {'invariant': True}),
+ ('r', 'a', {'invariant': True, 'retry': True}),
+ ('r', 'b', {'invariant': True, 'retry': True}),
+ ('b', 'c', {'manual': True}),
+ ('a', 'test[$]', {'invariant': True}),
+ ('c', 'test[$]', {'invariant': True}),
])
- self.assertItemsEqual([flo], g.no_predecessors_iter())
- self.assertItemsEqual([a, c], g.no_successors_iter())
- self.assertIs(r, g.node[a]['retry'])
- self.assertIs(r, g.node[b]['retry'])
- self.assertIs(r, g.node[c]['retry'])
+ self.assertItemsEqual(['test'], g.no_predecessors_iter())
+ self.assertItemsEqual(['test[$]'], g.no_successors_iter())
+ self.assertIs(r, g.node['a']['retry'])
+ self.assertIs(r, g.node['b']['retry'])
+ self.assertIs(r, g.node['c']['retry'])
def test_retries_hierarchy(self):
- c1 = retry.AlwaysRevert("cp1")
- c2 = retry.AlwaysRevert("cp2")
+ c1 = retry.AlwaysRevert("c1")
+ c2 = retry.AlwaysRevert("c2")
a, b, c, d = test_utils.make_many(4)
- inner_flo = lf.Flow("test", c2).add(b, c)
+ inner_flo = lf.Flow("test2", c2).add(b, c)
flo = lf.Flow("test", c1).add(a, inner_flo, d)
- compilation = compiler.PatternCompiler(flo).compile()
- g = compilation.execution_graph
- self.assertEqual(8, len(g))
+ g = _replicate_graph_with_names(
+ compiler.PatternCompiler(flo).compile())
+ self.assertEqual(10, len(g))
self.assertItemsEqual(g.edges(data=True), [
- (flo, c1, {'invariant': True}),
- (c1, a, {'invariant': True, 'retry': True}),
- (a, inner_flo, {'invariant': True}),
- (inner_flo, c2, {'invariant': True}),
- (c2, b, {'invariant': True, 'retry': True}),
- (b, c, {'invariant': True}),
- (c, d, {'invariant': True}),
+ ('test', 'c1', {'invariant': True}),
+ ('c1', 'a', {'invariant': True, 'retry': True}),
+ ('a', 'test2', {'invariant': True}),
+ ('test2', 'c2', {'invariant': True}),
+ ('c2', 'b', {'invariant': True, 'retry': True}),
+ ('b', 'c', {'invariant': True}),
+ ('c', 'test2[$]', {'invariant': True}),
+ ('test2[$]', 'd', {'invariant': True}),
+ ('d', 'test[$]', {'invariant': True}),
])
- self.assertIs(c1, g.node[a]['retry'])
- self.assertIs(c1, g.node[d]['retry'])
- self.assertIs(c2, g.node[b]['retry'])
- self.assertIs(c2, g.node[c]['retry'])
- self.assertIs(c1, g.node[c2]['retry'])
- self.assertIs(None, g.node[c1].get('retry'))
+ self.assertIs(c1, g.node['a']['retry'])
+ self.assertIs(c1, g.node['d']['retry'])
+ self.assertIs(c2, g.node['b']['retry'])
+ self.assertIs(c2, g.node['c']['retry'])
+ self.assertIs(c1, g.node['c2']['retry'])
+ self.assertIs(None, g.node['c1'].get('retry'))
def test_retry_subflows_hierarchy(self):
- c1 = retry.AlwaysRevert("cp1")
+ c1 = retry.AlwaysRevert("c1")
a, b, c, d = test_utils.make_many(4)
- inner_flo = lf.Flow("test").add(b, c)
+ inner_flo = lf.Flow("test2").add(b, c)
flo = lf.Flow("test", c1).add(a, inner_flo, d)
- compilation = compiler.PatternCompiler(flo).compile()
- g = compilation.execution_graph
- self.assertEqual(7, len(g))
+ g = _replicate_graph_with_names(
+ compiler.PatternCompiler(flo).compile())
+ self.assertEqual(9, len(g))
self.assertItemsEqual(g.edges(data=True), [
- (flo, c1, {'invariant': True}),
- (c1, a, {'invariant': True, 'retry': True}),
- (a, inner_flo, {'invariant': True}),
- (inner_flo, b, {'invariant': True}),
- (b, c, {'invariant': True}),
- (c, d, {'invariant': True}),
+ ('test', 'c1', {'invariant': True}),
+ ('c1', 'a', {'invariant': True, 'retry': True}),
+ ('a', 'test2', {'invariant': True}),
+ ('test2', 'b', {'invariant': True}),
+ ('b', 'c', {'invariant': True}),
+ ('c', 'test2[$]', {'invariant': True}),
+ ('test2[$]', 'd', {'invariant': True}),
+ ('d', 'test[$]', {'invariant': True}),
])
- self.assertIs(c1, g.node[a]['retry'])
- self.assertIs(c1, g.node[d]['retry'])
- self.assertIs(c1, g.node[b]['retry'])
- self.assertIs(c1, g.node[c]['retry'])
- self.assertIs(None, g.node[c1].get('retry'))
+ self.assertIs(c1, g.node['a']['retry'])
+ self.assertIs(c1, g.node['d']['retry'])
+ self.assertIs(c1, g.node['b']['retry'])
+ self.assertIs(c1, g.node['c']['retry'])
+ self.assertIs(None, g.node['c1'].get('retry'))
diff --git a/taskflow/tests/unit/patterns/test_graph_flow.py b/taskflow/tests/unit/patterns/test_graph_flow.py
index 7197dc7..429f68f 100644
--- a/taskflow/tests/unit/patterns/test_graph_flow.py
+++ b/taskflow/tests/unit/patterns/test_graph_flow.py
@@ -26,6 +26,16 @@ def _task(name, provides=None, requires=None):
class GraphFlowTest(test.TestCase):
+ def test_invalid_decider_depth(self):
+ g_1 = utils.ProgressingTask(name='g-1')
+ g_2 = utils.ProgressingTask(name='g-2')
+ for not_a_depth in ['not-a-depth', object(), 2, 3.4, False]:
+ flow = gf.Flow('g')
+ flow.add(g_1, g_2)
+ self.assertRaises((ValueError, TypeError),
+ flow.link, g_1, g_2,
+ decider=lambda history: False,
+ decider_depth=not_a_depth)
def test_graph_flow_stringy(self):
f = gf.Flow('test')
diff --git a/taskflow/tests/unit/test_deciders.py b/taskflow/tests/unit/test_deciders.py
new file mode 100644
index 0000000..8bfc154
--- /dev/null
+++ b/taskflow/tests/unit/test_deciders.py
@@ -0,0 +1,58 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from taskflow import deciders
+from taskflow import test
+
+
+class TestDeciders(test.TestCase):
+ def test_translate(self):
+ for val in ['all', 'ALL', 'aLL', deciders.Depth.ALL]:
+ self.assertEqual(deciders.Depth.ALL,
+ deciders.Depth.translate(val))
+ for val in ['atom', 'ATOM', 'atOM', deciders.Depth.ATOM]:
+ self.assertEqual(deciders.Depth.ATOM,
+ deciders.Depth.translate(val))
+ for val in ['neighbors', 'Neighbors',
+ 'NEIGHBORS', deciders.Depth.NEIGHBORS]:
+ self.assertEqual(deciders.Depth.NEIGHBORS,
+ deciders.Depth.translate(val))
+ for val in ['flow', 'FLOW', 'flOW', deciders.Depth.FLOW]:
+ self.assertEqual(deciders.Depth.FLOW,
+ deciders.Depth.translate(val))
+
+ def test_bad_translate(self):
+ self.assertRaises(TypeError, deciders.Depth.translate, 3)
+ self.assertRaises(TypeError, deciders.Depth.translate, object())
+ self.assertRaises(ValueError, deciders.Depth.translate, "stuff")
+
+ def test_pick_widest(self):
+ choices = [deciders.Depth.ATOM, deciders.Depth.FLOW]
+ self.assertEqual(deciders.Depth.FLOW, deciders.pick_widest(choices))
+ choices = [deciders.Depth.ATOM, deciders.Depth.FLOW,
+ deciders.Depth.ALL]
+ self.assertEqual(deciders.Depth.ALL, deciders.pick_widest(choices))
+ choices = [deciders.Depth.ATOM, deciders.Depth.FLOW,
+ deciders.Depth.ALL, deciders.Depth.NEIGHBORS]
+ self.assertEqual(deciders.Depth.ALL, deciders.pick_widest(choices))
+ choices = [deciders.Depth.ATOM, deciders.Depth.NEIGHBORS]
+ self.assertEqual(deciders.Depth.NEIGHBORS,
+ deciders.pick_widest(choices))
+
+ def test_bad_pick_widest(self):
+ self.assertRaises(ValueError, deciders.pick_widest, [])
+ self.assertRaises(ValueError, deciders.pick_widest, ["a"])
+ self.assertRaises(ValueError, deciders.pick_widest, set(['b']))
diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py
index 288afb1..cb59a3e 100644
--- a/taskflow/tests/unit/test_engines.py
+++ b/taskflow/tests/unit/test_engines.py
@@ -669,6 +669,94 @@ class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase):
self.assertNotIn('task2.t REVERTED(None)', capturer.values)
+class EngineDeciderDepthTest(utils.EngineTestBase):
+
+ def test_run_graph_flow_decider_various_depths(self):
+ sub_flow_1 = gf.Flow('g_1')
+ g_1_1 = utils.ProgressingTask(name='g_1-1')
+ sub_flow_1.add(g_1_1)
+ g_1 = utils.ProgressingTask(name='g-1')
+ g_2 = utils.ProgressingTask(name='g-2')
+ g_3 = utils.ProgressingTask(name='g-3')
+ g_4 = utils.ProgressingTask(name='g-4')
+ for a_depth, ran_how_many in [('all', 1),
+ ('atom', 4),
+ ('flow', 2),
+ ('neighbors', 3)]:
+ flow = gf.Flow('g')
+ flow.add(g_1, g_2, sub_flow_1, g_3, g_4)
+ flow.link(g_1, g_2,
+ decider=lambda history: False,
+ decider_depth=a_depth)
+ flow.link(g_2, sub_flow_1)
+ flow.link(g_2, g_3)
+ flow.link(g_3, g_4)
+ flow.link(g_1, sub_flow_1,
+ decider=lambda history: True,
+ decider_depth=a_depth)
+ e = self._make_engine(flow)
+ with utils.CaptureListener(e, capture_flow=False) as capturer:
+ e.run()
+ ran_tasks = 0
+ for outcome in capturer.values:
+ if outcome.endswith("RUNNING"):
+ ran_tasks += 1
+ self.assertEqual(ran_how_many, ran_tasks)
+
+ def test_run_graph_flow_decider_jump_over_atom(self):
+ flow = gf.Flow('g')
+ a = utils.AddOneSameProvidesRequires("a", inject={'value': 0})
+ b = utils.AddOneSameProvidesRequires("b")
+ c = utils.AddOneSameProvidesRequires("c")
+ flow.add(a, b, c, resolve_requires=False)
+ flow.link(a, b, decider=lambda history: False,
+ decider_depth='atom')
+ flow.link(b, c)
+ e = self._make_engine(flow)
+ e.run()
+ self.assertEqual(2, e.storage.get('c'))
+ self.assertEqual(states.IGNORE, e.storage.get_atom_state('b'))
+
+ def test_run_graph_flow_decider_jump_over_bad_atom(self):
+ flow = gf.Flow('g')
+ a = utils.NoopTask("a")
+ b = utils.FailingTask("b")
+ c = utils.NoopTask("c")
+ flow.add(a, b, c)
+ flow.link(a, b, decider=lambda history: False,
+ decider_depth='atom')
+ flow.link(b, c)
+ e = self._make_engine(flow)
+ e.run()
+
+ def test_run_graph_flow_decider_revert(self):
+ flow = gf.Flow('g')
+ a = utils.NoopTask("a")
+ b = utils.NoopTask("b")
+ c = utils.FailingTask("c")
+ flow.add(a, b, c)
+ flow.link(a, b, decider=lambda history: False,
+ decider_depth='atom')
+ flow.link(b, c)
+ e = self._make_engine(flow)
+ with utils.CaptureListener(e, capture_flow=False) as capturer:
+ # Wrapped failure here for WBE engine, make this better in
+ # the future, perhaps via a custom testtools matcher??
+ self.assertRaises((RuntimeError, exc.WrappedFailure), e.run)
+ expected = [
+ 'a.t RUNNING',
+ 'a.t SUCCESS(None)',
+ 'b.t IGNORE',
+ 'c.t RUNNING',
+ 'c.t FAILURE(Failure: RuntimeError: Woot!)',
+ 'c.t REVERTING',
+ 'c.t REVERTED(None)',
+ 'a.t REVERTING',
+ 'a.t REVERTED(None)',
+ ]
+ self.assertEqual(expected, capturer.values)
+
+
class EngineGraphFlowTest(utils.EngineTestBase):
def test_run_empty_graph_flow(self):
@@ -1043,9 +1131,10 @@ class EngineGraphConditionalFlowTest(utils.EngineTestBase):
'task4.t IGNORE',
]
self.assertEqual(expected, capturer.values)
- self.assertEqual(1, len(histories))
- self.assertIn('task1', histories[0])
- self.assertIn('task2', histories[0])
+ self.assertEqual(2, len(histories))
+ for i in range(0, 2):
+ self.assertIn('task1', histories[i])
+ self.assertIn('task2', histories[i])
def test_graph_flow_conditional(self):
flow = gf.Flow('root')
@@ -1249,14 +1338,15 @@ class SerialEngineTest(EngineTaskTest,
EngineResetTests,
EngineGraphConditionalFlowTest,
EngineCheckingTaskTest,
+ EngineDeciderDepthTest,
test.TestCase):
def _make_engine(self, flow,
- flow_detail=None, store=None):
+ flow_detail=None, store=None, **kwargs):
return taskflow.engines.load(flow,
flow_detail=flow_detail,
engine='serial',
backend=self.backend,
- store=store)
+ store=store, **kwargs)
def test_correct_load(self):
engine = self._make_engine(utils.TaskNoRequiresNoReturns)
@@ -1278,11 +1368,13 @@ class ParallelEngineWithThreadsTest(EngineTaskTest,
EngineMissingDepsTest,
EngineGraphConditionalFlowTest,
EngineCheckingTaskTest,
+ EngineDeciderDepthTest,
test.TestCase):
_EXECUTOR_WORKERS = 2
def _make_engine(self, flow,
- flow_detail=None, executor=None, store=None):
+ flow_detail=None, executor=None, store=None,
+ **kwargs):
if executor is None:
executor = 'threads'
return taskflow.engines.load(flow, flow_detail=flow_detail,
@@ -1290,7 +1382,8 @@ class ParallelEngineWithThreadsTest(EngineTaskTest,
executor=executor,
engine='parallel',
store=store,
- max_workers=self._EXECUTOR_WORKERS)
+ max_workers=self._EXECUTOR_WORKERS,
+ **kwargs)
def test_correct_load(self):
engine = self._make_engine(utils.TaskNoRequiresNoReturns)
@@ -1319,17 +1412,19 @@ class ParallelEngineWithEventletTest(EngineTaskTest,
EngineMissingDepsTest,
EngineGraphConditionalFlowTest,
EngineCheckingTaskTest,
+ EngineDeciderDepthTest,
test.TestCase):
def _make_engine(self, flow,
- flow_detail=None, executor=None, store=None):
+ flow_detail=None, executor=None, store=None,
+ **kwargs):
if executor is None:
executor = futurist.GreenThreadPoolExecutor()
self.addCleanup(executor.shutdown)
return taskflow.engines.load(flow, flow_detail=flow_detail,
backend=self.backend, engine='parallel',
executor=executor,
- store=store)
+ store=store, **kwargs)
class ParallelEngineWithProcessTest(EngineTaskTest,
@@ -1342,6 +1437,7 @@ class ParallelEngineWithProcessTest(EngineTaskTest,
EngineResetTests,
EngineMissingDepsTest,
EngineGraphConditionalFlowTest,
+ EngineDeciderDepthTest,
test.TestCase):
_EXECUTOR_WORKERS = 2
@@ -1350,7 +1446,8 @@ class ParallelEngineWithProcessTest(EngineTaskTest,
self.assertIsInstance(engine, eng.ParallelActionEngine)
def _make_engine(self, flow,
- flow_detail=None, executor=None, store=None):
+ flow_detail=None, executor=None, store=None,
+ **kwargs):
if executor is None:
executor = 'processes'
return taskflow.engines.load(flow, flow_detail=flow_detail,
@@ -1358,7 +1455,8 @@ class ParallelEngineWithProcessTest(EngineTaskTest,
engine='parallel',
executor=executor,
store=store,
- max_workers=self._EXECUTOR_WORKERS)
+ max_workers=self._EXECUTOR_WORKERS,
+ **kwargs)
class WorkerBasedEngineTest(EngineTaskTest,
@@ -1371,6 +1469,7 @@ class WorkerBasedEngineTest(EngineTaskTest,
EngineResetTests,
EngineMissingDepsTest,
EngineGraphConditionalFlowTest,
+ EngineDeciderDepthTest,
test.TestCase):
def setUp(self):
super(WorkerBasedEngineTest, self).setUp()
@@ -1415,10 +1514,11 @@ class WorkerBasedEngineTest(EngineTaskTest,
super(WorkerBasedEngineTest, self).tearDown()
def _make_engine(self, flow,
- flow_detail=None, store=None):
+ flow_detail=None, store=None, **kwargs):
+ kwargs.update(self.engine_conf)
return taskflow.engines.load(flow, flow_detail=flow_detail,
backend=self.backend,
- store=store, **self.engine_conf)
+ store=store, **kwargs)
def test_correct_load(self):
engine = self._make_engine(utils.TaskNoRequiresNoReturns)
diff --git a/taskflow/tests/unit/test_utils.py b/taskflow/tests/unit/test_utils.py
index cf43500..effc54b 100644
--- a/taskflow/tests/unit/test_utils.py
+++ b/taskflow/tests/unit/test_utils.py
@@ -244,24 +244,6 @@ class TestCountdownIter(test.TestCase):
self.assertRaises(ValueError, six.next, it)
-class TestLookFor(test.TestCase):
- def test_no_matches(self):
- hay = [9, 10, 11]
- self.assertEqual([], misc.look_for(hay, [1, 2, 3]))
-
- def test_match_order(self):
- hay = [6, 5, 4, 3, 2, 1]
- priors = []
- for i in range(0, 6):
- priors.append(i + 1)
- matches = misc.look_for(hay, priors)
- self.assertGreater(0, len(matches))
- self.assertIsSuperAndSubsequence(hay, matches)
- hay = [10, 1, 15, 3, 5, 8, 44]
- self.assertEqual([1, 15], misc.look_for(hay, [15, 1]))
- self.assertEqual([10, 44], misc.look_for(hay, [44, 10]))
-
-
class TestMergeUri(test.TestCase):
def test_merge(self):
url = "http://www.yahoo.com/?a=b&c=d"