diff options
| author | Joshua Harlow <harlowja@yahoo-inc.com> | 2015-11-16 16:27:42 -0800 |
|---|---|---|
| committer | Joshua Harlow <harlowja@gmail.com> | 2016-01-09 22:42:17 -0800 |
| commit | 8e8156c488dea8ae876b112c30e41e60da4f5be7 (patch) | |
| tree | a4c119dda7d2338bd8c63aad593bbb42d24f3e1f /taskflow/tests | |
| parent | f555a35f3081ba492db15d7bda11fbe50f2a8349 (diff) | |
| download | taskflow-8e8156c488dea8ae876b112c30e41e60da4f5be7.tar.gz | |
Allow for alterations in decider 'area of influence'
Christmas came early.
Closes-Bug: #1479466
Change-Id: I931d826690c925f022dbfffe9afb7bf41345b1d0
Diffstat (limited to 'taskflow/tests')
| -rw-r--r-- | taskflow/tests/unit/action_engine/test_compile.py | 529 | ||||
| -rw-r--r-- | taskflow/tests/unit/patterns/test_graph_flow.py | 10 | ||||
| -rw-r--r-- | taskflow/tests/unit/test_deciders.py | 58 | ||||
| -rw-r--r-- | taskflow/tests/unit/test_engines.py | 126 | ||||
| -rw-r--r-- | taskflow/tests/unit/test_utils.py | 18 |
5 files changed, 478 insertions, 263 deletions
diff --git a/taskflow/tests/unit/action_engine/test_compile.py b/taskflow/tests/unit/action_engine/test_compile.py index 6ccf358..757bde7 100644 --- a/taskflow/tests/unit/action_engine/test_compile.py +++ b/taskflow/tests/unit/action_engine/test_compile.py @@ -25,12 +25,27 @@ from taskflow import test from taskflow.tests import utils as test_utils +def _replicate_graph_with_names(compilation): + # Turn a graph of nodes into a graph of names only so that + # testing can use those names instead of having to use the exact + # node objects themselves (which is problematic for any end nodes that + # are added into the graph *dynamically*, and are not there in the + # original/source flow). + g = compilation.execution_graph + n_g = g.__class__(name=g.name) + for node, node_data in g.nodes_iter(data=True): + n_g.add_node(node.name, attr_dict=node_data) + for u, v, u_v_data in g.edges_iter(data=True): + n_g.add_edge(u.name, v.name, attr_dict=u_v_data) + return n_g + + class PatternCompileTest(test.TestCase): def test_task(self): task = test_utils.DummyTask(name='a') - compilation = compiler.PatternCompiler(task).compile() - g = compilation.execution_graph - self.assertEqual([task], list(g.nodes())) + g = _replicate_graph_with_names( + compiler.PatternCompiler(task).compile()) + self.assertEqual(['a'], list(g.nodes())) self.assertEqual([], list(g.edges())) def test_retry(self): @@ -54,19 +69,20 @@ class PatternCompileTest(test.TestCase): inner_flo.add(d) flo.add(inner_flo) - compilation = compiler.PatternCompiler(flo).compile() - g = compilation.execution_graph - self.assertEqual(6, len(g)) + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) + self.assertEqual(8, len(g)) order = g.topological_sort() - self.assertEqual([flo, a, b, c, inner_flo, d], order) - self.assertTrue(g.has_edge(c, inner_flo)) - self.assertTrue(g.has_edge(inner_flo, d)) + self.assertEqual(['test', 'a', 'b', 'c', + "sub-test", 'd', "sub-test[$]", + 'test[$]'], order) + self.assertTrue(g.has_edge('c', "sub-test")) + self.assertTrue(g.has_edge("sub-test", 'd')) self.assertEqual({'invariant': True}, - g.get_edge_data(inner_flo, d)) - - self.assertEqual([d], list(g.no_successors_iter())) - self.assertEqual([flo], list(g.no_predecessors_iter())) + g.get_edge_data("sub-test", 'd')) + self.assertEqual(['test[$]'], list(g.no_successors_iter())) + self.assertEqual(['test'], list(g.no_predecessors_iter())) def test_invalid(self): a, b, c = test_utils.make_many(3) @@ -80,19 +96,21 @@ class PatternCompileTest(test.TestCase): a, b, c, d = test_utils.make_many(4) flo = uf.Flow("test") flo.add(a, b, c, d) - compilation = compiler.PatternCompiler(flo).compile() - g = compilation.execution_graph - self.assertEqual(5, len(g)) + + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) + self.assertEqual(6, len(g)) self.assertItemsEqual(g.edges(), [ - (flo, a), - (flo, b), - (flo, c), - (flo, d), + ('test', 'a'), + ('test', 'b'), + ('test', 'c'), + ('test', 'd'), + ('a', 'test[$]'), + ('b', 'test[$]'), + ('c', 'test[$]'), + ('d', 'test[$]'), ]) - self.assertEqual(set([a, b, c, d]), - set(g.no_successors_iter())) - self.assertEqual(set([flo]), - set(g.no_predecessors_iter())) + self.assertEqual(set(['test']), set(g.no_predecessors_iter())) def test_linear_nested(self): a, b, c, d = test_utils.make_many(4) @@ -102,22 +120,22 @@ class PatternCompileTest(test.TestCase): inner_flo.add(c, d) flo.add(inner_flo) - compilation = compiler.PatternCompiler(flo).compile() - graph = compilation.execution_graph - self.assertEqual(6, len(graph)) + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) + self.assertEqual(8, len(g)) - lb = graph.subgraph([a, b]) - self.assertFalse(lb.has_edge(b, a)) - self.assertTrue(lb.has_edge(a, b)) - self.assertEqual({'invariant': True}, graph.get_edge_data(a, b)) + sub_g = g.subgraph(['a', 'b']) + self.assertFalse(sub_g.has_edge('b', 'a')) + self.assertTrue(sub_g.has_edge('a', 'b')) + self.assertEqual({'invariant': True}, sub_g.get_edge_data("a", "b")) - ub = graph.subgraph([c, d]) - self.assertEqual(0, ub.number_of_edges()) + sub_g = g.subgraph(['c', 'd']) + self.assertEqual(0, sub_g.number_of_edges()) # This ensures that c and d do not start executing until after b. - self.assertTrue(graph.has_edge(b, inner_flo)) - self.assertTrue(graph.has_edge(inner_flo, c)) - self.assertTrue(graph.has_edge(inner_flo, d)) + self.assertTrue(g.has_edge('b', 'test2')) + self.assertTrue(g.has_edge('test2', 'c')) + self.assertTrue(g.has_edge('test2', 'd')) def test_unordered_nested(self): a, b, c, d = test_utils.make_many(4) @@ -127,15 +145,19 @@ class PatternCompileTest(test.TestCase): flo2.add(c, d) flo.add(flo2) - compilation = compiler.PatternCompiler(flo).compile() - g = compilation.execution_graph - self.assertEqual(6, len(g)) + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) + self.assertEqual(8, len(g)) self.assertItemsEqual(g.edges(), [ - (flo, a), - (flo, b), - (flo, flo2), - (flo2, c), - (c, d) + ('test', 'a'), + ('test', 'b'), + ('test', 'test2'), + ('test2', 'c'), + ('c', 'd'), + ('d', 'test2[$]'), + ('test2[$]', 'test[$]'), + ('a', 'test[$]'), + ('b', 'test[$]'), ]) def test_unordered_nested_in_linear(self): @@ -143,27 +165,27 @@ class PatternCompileTest(test.TestCase): inner_flo = uf.Flow('ut').add(b, c) flo = lf.Flow('lt').add(a, inner_flo, d) - compilation = compiler.PatternCompiler(flo).compile() - g = compilation.execution_graph - self.assertEqual(6, len(g)) + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) + self.assertEqual(8, len(g)) self.assertItemsEqual(g.edges(), [ - (flo, a), - (a, inner_flo), - (inner_flo, b), - (inner_flo, c), - (b, d), - (c, d), + ('lt', 'a'), + ('a', 'ut'), + ('ut', 'b'), + ('ut', 'c'), + ('b', 'ut[$]'), + ('c', 'ut[$]'), + ('ut[$]', 'd'), + ('d', 'lt[$]'), ]) def test_graph(self): a, b, c, d = test_utils.make_many(4) flo = gf.Flow("test") flo.add(a, b, c, d) - compilation = compiler.PatternCompiler(flo).compile() - g = compilation.execution_graph - self.assertEqual(5, len(g)) - self.assertEqual(4, g.number_of_edges()) + self.assertEqual(6, len(compilation.execution_graph)) + self.assertEqual(8, compilation.execution_graph.number_of_edges()) def test_graph_nested(self): a, b, c, d, e, f, g = test_utils.make_many(7) @@ -174,19 +196,26 @@ class PatternCompileTest(test.TestCase): flo2.add(e, f, g) flo.add(flo2) - compilation = compiler.PatternCompiler(flo).compile() - graph = compilation.execution_graph - self.assertEqual(9, len(graph)) - self.assertItemsEqual(graph.edges(), [ - (flo, a), - (flo, b), - (flo, c), - (flo, d), - (flo, flo2), - - (flo2, e), - (e, f), - (f, g), + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) + self.assertEqual(11, len(g)) + self.assertItemsEqual(g.edges(), [ + ('test', 'a'), + ('test', 'b'), + ('test', 'c'), + ('test', 'd'), + ('a', 'test[$]'), + ('b', 'test[$]'), + ('c', 'test[$]'), + ('d', 'test[$]'), + + ('test', 'test2'), + ('test2', 'e'), + ('e', 'f'), + ('f', 'g'), + + ('g', 'test2[$]'), + ('test2[$]', 'test[$]'), ]) def test_graph_nested_graph(self): @@ -198,19 +227,29 @@ class PatternCompileTest(test.TestCase): flo2.add(e, f, g) flo.add(flo2) - compilation = compiler.PatternCompiler(flo).compile() - graph = compilation.execution_graph - self.assertEqual(9, len(graph)) - self.assertItemsEqual(graph.edges(), [ - (flo, a), - (flo, b), - (flo, c), - (flo, d), - (flo, flo2), - - (flo2, e), - (flo2, f), - (flo2, g), + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) + self.assertEqual(11, len(g)) + self.assertItemsEqual(g.edges(), [ + ('test', 'a'), + ('test', 'b'), + ('test', 'c'), + ('test', 'd'), + ('test', 'test2'), + + ('test2', 'e'), + ('test2', 'f'), + ('test2', 'g'), + + ('e', 'test2[$]'), + ('f', 'test2[$]'), + ('g', 'test2[$]'), + + ('test2[$]', 'test[$]'), + ('a', 'test[$]'), + ('b', 'test[$]'), + ('c', 'test[$]'), + ('d', 'test[$]'), ]) def test_graph_links(self): @@ -221,33 +260,34 @@ class PatternCompileTest(test.TestCase): flo.link(b, c) flo.link(c, d) - compilation = compiler.PatternCompiler(flo).compile() - g = compilation.execution_graph - self.assertEqual(5, len(g)) + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) + self.assertEqual(6, len(g)) self.assertItemsEqual(g.edges(data=True), [ - (flo, a, {'invariant': True}), - - (a, b, {'manual': True}), - (b, c, {'manual': True}), - (c, d, {'manual': True}), + ('test', 'a', {'invariant': True}), + ('a', 'b', {'manual': True}), + ('b', 'c', {'manual': True}), + ('c', 'd', {'manual': True}), + ('d', 'test[$]', {'invariant': True}), ]) - self.assertItemsEqual([flo], g.no_predecessors_iter()) - self.assertItemsEqual([d], g.no_successors_iter()) + self.assertItemsEqual(['test'], g.no_predecessors_iter()) + self.assertItemsEqual(['test[$]'], g.no_successors_iter()) def test_graph_dependencies(self): a = test_utils.ProvidesRequiresTask('a', provides=['x'], requires=[]) b = test_utils.ProvidesRequiresTask('b', provides=[], requires=['x']) flo = gf.Flow("test").add(a, b) - compilation = compiler.PatternCompiler(flo).compile() - g = compilation.execution_graph - self.assertEqual(3, len(g)) + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) + self.assertEqual(4, len(g)) self.assertItemsEqual(g.edges(data=True), [ - (flo, a, {'invariant': True}), - (a, b, {'reasons': set(['x'])}) + ('test', 'a', {'invariant': True}), + ('a', 'b', {'reasons': set(['x'])}), + ('b', 'test[$]', {'invariant': True}), ]) - self.assertItemsEqual([flo], g.no_predecessors_iter()) - self.assertItemsEqual([b], g.no_successors_iter()) + self.assertItemsEqual(['test'], g.no_predecessors_iter()) + self.assertItemsEqual(['test[$]'], g.no_successors_iter()) def test_graph_nested_requires(self): a = test_utils.ProvidesRequiresTask('a', provides=['x'], requires=[]) @@ -256,17 +296,19 @@ class PatternCompileTest(test.TestCase): inner_flo = lf.Flow("test2").add(b, c) flo = gf.Flow("test").add(a, inner_flo) - compilation = compiler.PatternCompiler(flo).compile() - graph = compilation.execution_graph - self.assertEqual(5, len(graph)) - self.assertItemsEqual(graph.edges(data=True), [ - (flo, a, {'invariant': True}), - (inner_flo, b, {'invariant': True}), - (a, inner_flo, {'reasons': set(['x'])}), - (b, c, {'invariant': True}), + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) + self.assertEqual(7, len(g)) + self.assertItemsEqual(g.edges(data=True), [ + ('test', 'a', {'invariant': True}), + ('test2', 'b', {'invariant': True}), + ('a', 'test2', {'reasons': set(['x'])}), + ('b', 'c', {'invariant': True}), + ('c', 'test2[$]', {'invariant': True}), + ('test2[$]', 'test[$]', {'invariant': True}), ]) - self.assertItemsEqual([flo], graph.no_predecessors_iter()) - self.assertItemsEqual([c], graph.no_successors_iter()) + self.assertItemsEqual(['test'], list(g.no_predecessors_iter())) + self.assertItemsEqual(['test[$]'], list(g.no_successors_iter())) def test_graph_nested_provides(self): a = test_utils.ProvidesRequiresTask('a', provides=[], requires=['x']) @@ -275,18 +317,22 @@ class PatternCompileTest(test.TestCase): inner_flo = lf.Flow("test2").add(b, c) flo = gf.Flow("test").add(a, inner_flo) - compilation = compiler.PatternCompiler(flo).compile() - graph = compilation.execution_graph - self.assertEqual(5, len(graph)) - self.assertItemsEqual(graph.edges(data=True), [ - (flo, inner_flo, {'invariant': True}), - - (inner_flo, b, {'invariant': True}), - (b, c, {'invariant': True}), - (c, a, {'reasons': set(['x'])}), + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) + self.assertEqual(7, len(g)) + self.assertItemsEqual(g.edges(data=True), [ + ('test', 'test2', {'invariant': True}), + ('a', 'test[$]', {'invariant': True}), + + # The 'x' requirement is produced out of test2... + ('test2[$]', 'a', {'reasons': set(['x'])}), + + ('test2', 'b', {'invariant': True}), + ('b', 'c', {'invariant': True}), + ('c', 'test2[$]', {'invariant': True}), ]) - self.assertItemsEqual([flo], graph.no_predecessors_iter()) - self.assertItemsEqual([a], graph.no_successors_iter()) + self.assertItemsEqual(['test'], g.no_predecessors_iter()) + self.assertItemsEqual(['test[$]'], g.no_successors_iter()) def test_empty_flow_in_linear_flow(self): flo = lf.Flow('lf') @@ -295,12 +341,14 @@ class PatternCompileTest(test.TestCase): empty_flo = gf.Flow("empty") flo.add(a, empty_flo, b) - compilation = compiler.PatternCompiler(flo).compile() - graph = compilation.execution_graph - self.assertItemsEqual(graph.edges(), [ - (flo, a), - (a, empty_flo), - (empty_flo, b), + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) + self.assertItemsEqual(g.edges(), [ + ("lf", "a"), + ("a", "empty"), + ("empty", "empty[$]"), + ("empty[$]", "b"), + ("b", "lf[$]"), ]) def test_many_empty_in_graph_flow(self): @@ -331,22 +379,24 @@ class PatternCompileTest(test.TestCase): flo.link(a, d) flo.link(c, d) - compilation = compiler.PatternCompiler(flo).compile() - graph = compilation.execution_graph + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) - self.assertTrue(graph.has_edge(flo, a)) + self.assertTrue(g.has_edge('root', 'a')) + self.assertTrue(g.has_edge('root', 'b')) + self.assertTrue(g.has_edge('root', 'c')) - self.assertTrue(graph.has_edge(flo, b)) - self.assertTrue(graph.has_edge(b_0, b_1)) - self.assertTrue(graph.has_edge(b_1, b_2)) - self.assertTrue(graph.has_edge(b_2, b_3)) + self.assertTrue(g.has_edge('b.0', 'b.1')) + self.assertTrue(g.has_edge('b.1[$]', 'b.2')) + self.assertTrue(g.has_edge('b.2[$]', 'b.3')) - self.assertTrue(graph.has_edge(flo, c)) - self.assertTrue(graph.has_edge(c_0, c_1)) - self.assertTrue(graph.has_edge(c_1, c_2)) + self.assertTrue(g.has_edge('c.0[$]', 'c.1')) + self.assertTrue(g.has_edge('c.1[$]', 'c.2')) - self.assertTrue(graph.has_edge(b_3, d)) - self.assertEqual(12, len(graph)) + self.assertTrue(g.has_edge('a', 'd')) + self.assertTrue(g.has_edge('b[$]', 'd')) + self.assertTrue(g.has_edge('c[$]', 'd')) + self.assertEqual(20, len(g)) def test_empty_flow_in_nested_flow(self): flow = lf.Flow('lf') @@ -360,13 +410,13 @@ class PatternCompileTest(test.TestCase): flow2.add(c, empty_flow, d) flow.add(a, flow2, b) - compilation = compiler.PatternCompiler(flow).compile() - g = compilation.execution_graph - - for source, target in [(flow, a), (a, flow2), - (flow2, c), (c, empty_flow), - (empty_flow, d), (d, b)]: - self.assertTrue(g.has_edge(source, target)) + g = _replicate_graph_with_names( + compiler.PatternCompiler(flow).compile()) + for u, v in [('lf', 'a'), ('a', 'lf-2'), + ('lf-2', 'c'), ('c', 'empty'), + ('empty[$]', 'd'), ('d', 'lf-2[$]'), + ('lf-2[$]', 'b'), ('b', 'lf[$]')]: + self.assertTrue(g.has_edge(u, v)) def test_empty_flow_in_graph_flow(self): flow = lf.Flow('lf') @@ -379,7 +429,14 @@ class PatternCompileTest(test.TestCase): g = compilation.execution_graph self.assertTrue(g.has_edge(flow, a)) self.assertTrue(g.has_edge(a, empty_flow)) - self.assertTrue(g.has_edge(empty_flow, b)) + + empty_flow_successors = g.successors(empty_flow) + self.assertEqual(1, len(empty_flow_successors)) + empty_flow_terminal = empty_flow_successors[0] + self.assertIs(empty_flow, empty_flow_terminal.flow) + self.assertEqual(compiler.FLOW_END, + g.node[empty_flow_terminal]['kind']) + self.assertTrue(g.has_edge(empty_flow_terminal, b)) def test_empty_flow_in_graph_flow_linkage(self): flow = gf.Flow('lf') @@ -417,146 +474,154 @@ class PatternCompileTest(test.TestCase): def test_retry_in_linear_flow(self): flo = lf.Flow("test", retry.AlwaysRevert("c")) compilation = compiler.PatternCompiler(flo).compile() - g = compilation.execution_graph - self.assertEqual(2, len(g)) - self.assertEqual(1, g.number_of_edges()) + self.assertEqual(3, len(compilation.execution_graph)) + self.assertEqual(2, compilation.execution_graph.number_of_edges()) def test_retry_in_unordered_flow(self): flo = uf.Flow("test", retry.AlwaysRevert("c")) compilation = compiler.PatternCompiler(flo).compile() - g = compilation.execution_graph - self.assertEqual(2, len(g)) - self.assertEqual(1, g.number_of_edges()) + self.assertEqual(3, len(compilation.execution_graph)) + self.assertEqual(2, compilation.execution_graph.number_of_edges()) def test_retry_in_graph_flow(self): flo = gf.Flow("test", retry.AlwaysRevert("c")) compilation = compiler.PatternCompiler(flo).compile() g = compilation.execution_graph - self.assertEqual(2, len(g)) - self.assertEqual(1, g.number_of_edges()) + self.assertEqual(3, len(g)) + self.assertEqual(2, g.number_of_edges()) def test_retry_in_nested_flows(self): c1 = retry.AlwaysRevert("c1") c2 = retry.AlwaysRevert("c2") inner_flo = lf.Flow("test2", c2) flo = lf.Flow("test", c1).add(inner_flo) - compilation = compiler.PatternCompiler(flo).compile() - g = compilation.execution_graph - self.assertEqual(4, len(g)) + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) + self.assertEqual(6, len(g)) self.assertItemsEqual(g.edges(data=True), [ - (flo, c1, {'invariant': True}), - (c1, inner_flo, {'invariant': True, 'retry': True}), - (inner_flo, c2, {'invariant': True}), + ('test', 'c1', {'invariant': True}), + ('c1', 'test2', {'invariant': True, 'retry': True}), + ('test2', 'c2', {'invariant': True}), + ('c2', 'test2[$]', {'invariant': True}), + ('test2[$]', 'test[$]', {'invariant': True}), ]) - self.assertIs(c1, g.node[c2]['retry']) - self.assertItemsEqual([flo], g.no_predecessors_iter()) - self.assertItemsEqual([c2], g.no_successors_iter()) + self.assertIs(c1, g.node['c2']['retry']) + self.assertItemsEqual(['test'], list(g.no_predecessors_iter())) + self.assertItemsEqual(['test[$]'], list(g.no_successors_iter())) def test_retry_in_linear_flow_with_tasks(self): c = retry.AlwaysRevert("c") a, b = test_utils.make_many(2) flo = lf.Flow("test", c).add(a, b) - compilation = compiler.PatternCompiler(flo).compile() - g = compilation.execution_graph - self.assertEqual(4, len(g)) + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) + self.assertEqual(5, len(g)) self.assertItemsEqual(g.edges(data=True), [ - (flo, c, {'invariant': True}), - (a, b, {'invariant': True}), - (c, a, {'invariant': True, 'retry': True}) + ('test', 'c', {'invariant': True}), + ('a', 'b', {'invariant': True}), + ('c', 'a', {'invariant': True, 'retry': True}), + ('b', 'test[$]', {'invariant': True}), ]) - self.assertItemsEqual([flo], g.no_predecessors_iter()) - self.assertItemsEqual([b], g.no_successors_iter()) - self.assertIs(c, g.node[a]['retry']) - self.assertIs(c, g.node[b]['retry']) + self.assertItemsEqual(['test'], g.no_predecessors_iter()) + self.assertItemsEqual(['test[$]'], g.no_successors_iter()) + self.assertIs(c, g.node['a']['retry']) + self.assertIs(c, g.node['b']['retry']) def test_retry_in_unordered_flow_with_tasks(self): c = retry.AlwaysRevert("c") a, b = test_utils.make_many(2) flo = uf.Flow("test", c).add(a, b) - compilation = compiler.PatternCompiler(flo).compile() - g = compilation.execution_graph - self.assertEqual(4, len(g)) + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) + self.assertEqual(5, len(g)) self.assertItemsEqual(g.edges(data=True), [ - (flo, c, {'invariant': True}), - (c, a, {'invariant': True, 'retry': True}), - (c, b, {'invariant': True, 'retry': True}), + ('test', 'c', {'invariant': True}), + ('c', 'a', {'invariant': True, 'retry': True}), + ('c', 'b', {'invariant': True, 'retry': True}), + ('b', 'test[$]', {'invariant': True}), + ('a', 'test[$]', {'invariant': True}), ]) - self.assertItemsEqual([flo], g.no_predecessors_iter()) - self.assertItemsEqual([a, b], g.no_successors_iter()) - self.assertIs(c, g.node[a]['retry']) - self.assertIs(c, g.node[b]['retry']) + self.assertItemsEqual(['test'], list(g.no_predecessors_iter())) + self.assertItemsEqual(['test[$]'], list(g.no_successors_iter())) + self.assertIs(c, g.node['a']['retry']) + self.assertIs(c, g.node['b']['retry']) def test_retry_in_graph_flow_with_tasks(self): - r = retry.AlwaysRevert("cp") + r = retry.AlwaysRevert("r") a, b, c = test_utils.make_many(3) flo = gf.Flow("test", r).add(a, b, c).link(b, c) - compilation = compiler.PatternCompiler(flo).compile() - g = compilation.execution_graph - self.assertEqual(5, len(g)) + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) self.assertItemsEqual(g.edges(data=True), [ - (flo, r, {'invariant': True}), - (r, a, {'invariant': True, 'retry': True}), - (r, b, {'invariant': True, 'retry': True}), - (b, c, {'manual': True}) + ('test', 'r', {'invariant': True}), + ('r', 'a', {'invariant': True, 'retry': True}), + ('r', 'b', {'invariant': True, 'retry': True}), + ('b', 'c', {'manual': True}), + ('a', 'test[$]', {'invariant': True}), + ('c', 'test[$]', {'invariant': True}), ]) - self.assertItemsEqual([flo], g.no_predecessors_iter()) - self.assertItemsEqual([a, c], g.no_successors_iter()) - self.assertIs(r, g.node[a]['retry']) - self.assertIs(r, g.node[b]['retry']) - self.assertIs(r, g.node[c]['retry']) + self.assertItemsEqual(['test'], g.no_predecessors_iter()) + self.assertItemsEqual(['test[$]'], g.no_successors_iter()) + self.assertIs(r, g.node['a']['retry']) + self.assertIs(r, g.node['b']['retry']) + self.assertIs(r, g.node['c']['retry']) def test_retries_hierarchy(self): - c1 = retry.AlwaysRevert("cp1") - c2 = retry.AlwaysRevert("cp2") + c1 = retry.AlwaysRevert("c1") + c2 = retry.AlwaysRevert("c2") a, b, c, d = test_utils.make_many(4) - inner_flo = lf.Flow("test", c2).add(b, c) + inner_flo = lf.Flow("test2", c2).add(b, c) flo = lf.Flow("test", c1).add(a, inner_flo, d) - compilation = compiler.PatternCompiler(flo).compile() - g = compilation.execution_graph - self.assertEqual(8, len(g)) + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) + self.assertEqual(10, len(g)) self.assertItemsEqual(g.edges(data=True), [ - (flo, c1, {'invariant': True}), - (c1, a, {'invariant': True, 'retry': True}), - (a, inner_flo, {'invariant': True}), - (inner_flo, c2, {'invariant': True}), - (c2, b, {'invariant': True, 'retry': True}), - (b, c, {'invariant': True}), - (c, d, {'invariant': True}), + ('test', 'c1', {'invariant': True}), + ('c1', 'a', {'invariant': True, 'retry': True}), + ('a', 'test2', {'invariant': True}), + ('test2', 'c2', {'invariant': True}), + ('c2', 'b', {'invariant': True, 'retry': True}), + ('b', 'c', {'invariant': True}), + ('c', 'test2[$]', {'invariant': True}), + ('test2[$]', 'd', {'invariant': True}), + ('d', 'test[$]', {'invariant': True}), ]) - self.assertIs(c1, g.node[a]['retry']) - self.assertIs(c1, g.node[d]['retry']) - self.assertIs(c2, g.node[b]['retry']) - self.assertIs(c2, g.node[c]['retry']) - self.assertIs(c1, g.node[c2]['retry']) - self.assertIs(None, g.node[c1].get('retry')) + self.assertIs(c1, g.node['a']['retry']) + self.assertIs(c1, g.node['d']['retry']) + self.assertIs(c2, g.node['b']['retry']) + self.assertIs(c2, g.node['c']['retry']) + self.assertIs(c1, g.node['c2']['retry']) + self.assertIs(None, g.node['c1'].get('retry')) def test_retry_subflows_hierarchy(self): - c1 = retry.AlwaysRevert("cp1") + c1 = retry.AlwaysRevert("c1") a, b, c, d = test_utils.make_many(4) - inner_flo = lf.Flow("test").add(b, c) + inner_flo = lf.Flow("test2").add(b, c) flo = lf.Flow("test", c1).add(a, inner_flo, d) - compilation = compiler.PatternCompiler(flo).compile() - g = compilation.execution_graph - self.assertEqual(7, len(g)) + g = _replicate_graph_with_names( + compiler.PatternCompiler(flo).compile()) + self.assertEqual(9, len(g)) self.assertItemsEqual(g.edges(data=True), [ - (flo, c1, {'invariant': True}), - (c1, a, {'invariant': True, 'retry': True}), - (a, inner_flo, {'invariant': True}), - (inner_flo, b, {'invariant': True}), - (b, c, {'invariant': True}), - (c, d, {'invariant': True}), + ('test', 'c1', {'invariant': True}), + ('c1', 'a', {'invariant': True, 'retry': True}), + ('a', 'test2', {'invariant': True}), + ('test2', 'b', {'invariant': True}), + ('b', 'c', {'invariant': True}), + ('c', 'test2[$]', {'invariant': True}), + ('test2[$]', 'd', {'invariant': True}), + ('d', 'test[$]', {'invariant': True}), ]) - self.assertIs(c1, g.node[a]['retry']) - self.assertIs(c1, g.node[d]['retry']) - self.assertIs(c1, g.node[b]['retry']) - self.assertIs(c1, g.node[c]['retry']) - self.assertIs(None, g.node[c1].get('retry')) + self.assertIs(c1, g.node['a']['retry']) + self.assertIs(c1, g.node['d']['retry']) + self.assertIs(c1, g.node['b']['retry']) + self.assertIs(c1, g.node['c']['retry']) + self.assertIs(None, g.node['c1'].get('retry')) diff --git a/taskflow/tests/unit/patterns/test_graph_flow.py b/taskflow/tests/unit/patterns/test_graph_flow.py index 7197dc7..429f68f 100644 --- a/taskflow/tests/unit/patterns/test_graph_flow.py +++ b/taskflow/tests/unit/patterns/test_graph_flow.py @@ -26,6 +26,16 @@ def _task(name, provides=None, requires=None): class GraphFlowTest(test.TestCase): + def test_invalid_decider_depth(self): + g_1 = utils.ProgressingTask(name='g-1') + g_2 = utils.ProgressingTask(name='g-2') + for not_a_depth in ['not-a-depth', object(), 2, 3.4, False]: + flow = gf.Flow('g') + flow.add(g_1, g_2) + self.assertRaises((ValueError, TypeError), + flow.link, g_1, g_2, + decider=lambda history: False, + decider_depth=not_a_depth) def test_graph_flow_stringy(self): f = gf.Flow('test') diff --git a/taskflow/tests/unit/test_deciders.py b/taskflow/tests/unit/test_deciders.py new file mode 100644 index 0000000..8bfc154 --- /dev/null +++ b/taskflow/tests/unit/test_deciders.py @@ -0,0 +1,58 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from taskflow import deciders +from taskflow import test + + +class TestDeciders(test.TestCase): + def test_translate(self): + for val in ['all', 'ALL', 'aLL', deciders.Depth.ALL]: + self.assertEqual(deciders.Depth.ALL, + deciders.Depth.translate(val)) + for val in ['atom', 'ATOM', 'atOM', deciders.Depth.ATOM]: + self.assertEqual(deciders.Depth.ATOM, + deciders.Depth.translate(val)) + for val in ['neighbors', 'Neighbors', + 'NEIGHBORS', deciders.Depth.NEIGHBORS]: + self.assertEqual(deciders.Depth.NEIGHBORS, + deciders.Depth.translate(val)) + for val in ['flow', 'FLOW', 'flOW', deciders.Depth.FLOW]: + self.assertEqual(deciders.Depth.FLOW, + deciders.Depth.translate(val)) + + def test_bad_translate(self): + self.assertRaises(TypeError, deciders.Depth.translate, 3) + self.assertRaises(TypeError, deciders.Depth.translate, object()) + self.assertRaises(ValueError, deciders.Depth.translate, "stuff") + + def test_pick_widest(self): + choices = [deciders.Depth.ATOM, deciders.Depth.FLOW] + self.assertEqual(deciders.Depth.FLOW, deciders.pick_widest(choices)) + choices = [deciders.Depth.ATOM, deciders.Depth.FLOW, + deciders.Depth.ALL] + self.assertEqual(deciders.Depth.ALL, deciders.pick_widest(choices)) + choices = [deciders.Depth.ATOM, deciders.Depth.FLOW, + deciders.Depth.ALL, deciders.Depth.NEIGHBORS] + self.assertEqual(deciders.Depth.ALL, deciders.pick_widest(choices)) + choices = [deciders.Depth.ATOM, deciders.Depth.NEIGHBORS] + self.assertEqual(deciders.Depth.NEIGHBORS, + deciders.pick_widest(choices)) + + def test_bad_pick_widest(self): + self.assertRaises(ValueError, deciders.pick_widest, []) + self.assertRaises(ValueError, deciders.pick_widest, ["a"]) + self.assertRaises(ValueError, deciders.pick_widest, set(['b'])) diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index 288afb1..cb59a3e 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -669,6 +669,94 @@ class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase): self.assertNotIn('task2.t REVERTED(None)', capturer.values) +class EngineDeciderDepthTest(utils.EngineTestBase): + + def test_run_graph_flow_decider_various_depths(self): + sub_flow_1 = gf.Flow('g_1') + g_1_1 = utils.ProgressingTask(name='g_1-1') + sub_flow_1.add(g_1_1) + g_1 = utils.ProgressingTask(name='g-1') + g_2 = utils.ProgressingTask(name='g-2') + g_3 = utils.ProgressingTask(name='g-3') + g_4 = utils.ProgressingTask(name='g-4') + for a_depth, ran_how_many in [('all', 1), + ('atom', 4), + ('flow', 2), + ('neighbors', 3)]: + flow = gf.Flow('g') + flow.add(g_1, g_2, sub_flow_1, g_3, g_4) + flow.link(g_1, g_2, + decider=lambda history: False, + decider_depth=a_depth) + flow.link(g_2, sub_flow_1) + flow.link(g_2, g_3) + flow.link(g_3, g_4) + flow.link(g_1, sub_flow_1, + decider=lambda history: True, + decider_depth=a_depth) + e = self._make_engine(flow) + with utils.CaptureListener(e, capture_flow=False) as capturer: + e.run() + ran_tasks = 0 + for outcome in capturer.values: + if outcome.endswith("RUNNING"): + ran_tasks += 1 + self.assertEqual(ran_how_many, ran_tasks) + + def test_run_graph_flow_decider_jump_over_atom(self): + flow = gf.Flow('g') + a = utils.AddOneSameProvidesRequires("a", inject={'value': 0}) + b = utils.AddOneSameProvidesRequires("b") + c = utils.AddOneSameProvidesRequires("c") + flow.add(a, b, c, resolve_requires=False) + flow.link(a, b, decider=lambda history: False, + decider_depth='atom') + flow.link(b, c) + e = self._make_engine(flow) + e.run() + self.assertEqual(2, e.storage.get('c')) + self.assertEqual(states.IGNORE, e.storage.get_atom_state('b')) + + def test_run_graph_flow_decider_jump_over_bad_atom(self): + flow = gf.Flow('g') + a = utils.NoopTask("a") + b = utils.FailingTask("b") + c = utils.NoopTask("c") + flow.add(a, b, c) + flow.link(a, b, decider=lambda history: False, + decider_depth='atom') + flow.link(b, c) + e = self._make_engine(flow) + e.run() + + def test_run_graph_flow_decider_revert(self): + flow = gf.Flow('g') + a = utils.NoopTask("a") + b = utils.NoopTask("b") + c = utils.FailingTask("c") + flow.add(a, b, c) + flow.link(a, b, decider=lambda history: False, + decider_depth='atom') + flow.link(b, c) + e = self._make_engine(flow) + with utils.CaptureListener(e, capture_flow=False) as capturer: + # Wrapped failure here for WBE engine, make this better in + # the future, perhaps via a custom testtools matcher?? + self.assertRaises((RuntimeError, exc.WrappedFailure), e.run) + expected = [ + 'a.t RUNNING', + 'a.t SUCCESS(None)', + 'b.t IGNORE', + 'c.t RUNNING', + 'c.t FAILURE(Failure: RuntimeError: Woot!)', + 'c.t REVERTING', + 'c.t REVERTED(None)', + 'a.t REVERTING', + 'a.t REVERTED(None)', + ] + self.assertEqual(expected, capturer.values) + + class EngineGraphFlowTest(utils.EngineTestBase): def test_run_empty_graph_flow(self): @@ -1043,9 +1131,10 @@ class EngineGraphConditionalFlowTest(utils.EngineTestBase): 'task4.t IGNORE', ] self.assertEqual(expected, capturer.values) - self.assertEqual(1, len(histories)) - self.assertIn('task1', histories[0]) - self.assertIn('task2', histories[0]) + self.assertEqual(2, len(histories)) + for i in range(0, 2): + self.assertIn('task1', histories[i]) + self.assertIn('task2', histories[i]) def test_graph_flow_conditional(self): flow = gf.Flow('root') @@ -1249,14 +1338,15 @@ class SerialEngineTest(EngineTaskTest, EngineResetTests, EngineGraphConditionalFlowTest, EngineCheckingTaskTest, + EngineDeciderDepthTest, test.TestCase): def _make_engine(self, flow, - flow_detail=None, store=None): + flow_detail=None, store=None, **kwargs): return taskflow.engines.load(flow, flow_detail=flow_detail, engine='serial', backend=self.backend, - store=store) + store=store, **kwargs) def test_correct_load(self): engine = self._make_engine(utils.TaskNoRequiresNoReturns) @@ -1278,11 +1368,13 @@ class ParallelEngineWithThreadsTest(EngineTaskTest, EngineMissingDepsTest, EngineGraphConditionalFlowTest, EngineCheckingTaskTest, + EngineDeciderDepthTest, test.TestCase): _EXECUTOR_WORKERS = 2 def _make_engine(self, flow, - flow_detail=None, executor=None, store=None): + flow_detail=None, executor=None, store=None, + **kwargs): if executor is None: executor = 'threads' return taskflow.engines.load(flow, flow_detail=flow_detail, @@ -1290,7 +1382,8 @@ class ParallelEngineWithThreadsTest(EngineTaskTest, executor=executor, engine='parallel', store=store, - max_workers=self._EXECUTOR_WORKERS) + max_workers=self._EXECUTOR_WORKERS, + **kwargs) def test_correct_load(self): engine = self._make_engine(utils.TaskNoRequiresNoReturns) @@ -1319,17 +1412,19 @@ class ParallelEngineWithEventletTest(EngineTaskTest, EngineMissingDepsTest, EngineGraphConditionalFlowTest, EngineCheckingTaskTest, + EngineDeciderDepthTest, test.TestCase): def _make_engine(self, flow, - flow_detail=None, executor=None, store=None): + flow_detail=None, executor=None, store=None, + **kwargs): if executor is None: executor = futurist.GreenThreadPoolExecutor() self.addCleanup(executor.shutdown) return taskflow.engines.load(flow, flow_detail=flow_detail, backend=self.backend, engine='parallel', executor=executor, - store=store) + store=store, **kwargs) class ParallelEngineWithProcessTest(EngineTaskTest, @@ -1342,6 +1437,7 @@ class ParallelEngineWithProcessTest(EngineTaskTest, EngineResetTests, EngineMissingDepsTest, EngineGraphConditionalFlowTest, + EngineDeciderDepthTest, test.TestCase): _EXECUTOR_WORKERS = 2 @@ -1350,7 +1446,8 @@ class ParallelEngineWithProcessTest(EngineTaskTest, self.assertIsInstance(engine, eng.ParallelActionEngine) def _make_engine(self, flow, - flow_detail=None, executor=None, store=None): + flow_detail=None, executor=None, store=None, + **kwargs): if executor is None: executor = 'processes' return taskflow.engines.load(flow, flow_detail=flow_detail, @@ -1358,7 +1455,8 @@ class ParallelEngineWithProcessTest(EngineTaskTest, engine='parallel', executor=executor, store=store, - max_workers=self._EXECUTOR_WORKERS) + max_workers=self._EXECUTOR_WORKERS, + **kwargs) class WorkerBasedEngineTest(EngineTaskTest, @@ -1371,6 +1469,7 @@ class WorkerBasedEngineTest(EngineTaskTest, EngineResetTests, EngineMissingDepsTest, EngineGraphConditionalFlowTest, + EngineDeciderDepthTest, test.TestCase): def setUp(self): super(WorkerBasedEngineTest, self).setUp() @@ -1415,10 +1514,11 @@ class WorkerBasedEngineTest(EngineTaskTest, super(WorkerBasedEngineTest, self).tearDown() def _make_engine(self, flow, - flow_detail=None, store=None): + flow_detail=None, store=None, **kwargs): + kwargs.update(self.engine_conf) return taskflow.engines.load(flow, flow_detail=flow_detail, backend=self.backend, - store=store, **self.engine_conf) + store=store, **kwargs) def test_correct_load(self): engine = self._make_engine(utils.TaskNoRequiresNoReturns) diff --git a/taskflow/tests/unit/test_utils.py b/taskflow/tests/unit/test_utils.py index cf43500..effc54b 100644 --- a/taskflow/tests/unit/test_utils.py +++ b/taskflow/tests/unit/test_utils.py @@ -244,24 +244,6 @@ class TestCountdownIter(test.TestCase): self.assertRaises(ValueError, six.next, it) -class TestLookFor(test.TestCase): - def test_no_matches(self): - hay = [9, 10, 11] - self.assertEqual([], misc.look_for(hay, [1, 2, 3])) - - def test_match_order(self): - hay = [6, 5, 4, 3, 2, 1] - priors = [] - for i in range(0, 6): - priors.append(i + 1) - matches = misc.look_for(hay, priors) - self.assertGreater(0, len(matches)) - self.assertIsSuperAndSubsequence(hay, matches) - hay = [10, 1, 15, 3, 5, 8, 44] - self.assertEqual([1, 15], misc.look_for(hay, [15, 1])) - self.assertEqual([10, 44], misc.look_for(hay, [44, 10])) - - class TestMergeUri(test.TestCase): def test_merge(self): url = "http://www.yahoo.com/?a=b&c=d" |
