diff options
| author | Jenkins <jenkins@review.openstack.org> | 2015-07-09 21:25:17 +0000 |
|---|---|---|
| committer | Gerrit Code Review <review@openstack.org> | 2015-07-09 21:25:17 +0000 |
| commit | 87c12603eb2455802c9baa53caa31fb12e11279e (patch) | |
| tree | 8122352f887628c21207e3ae30c5f3b0749a1317 /taskflow/tests/unit | |
| parent | df5fbe469c4c85c771bc0afcc5026d9790edc942 (diff) | |
| parent | 2b827e1e363a90e4037172ccb0d57ac0873497fb (diff) | |
| download | taskflow-87c12603eb2455802c9baa53caa31fb12e11279e.tar.gz | |
Merge "Add support for conditional execution"
Diffstat (limited to 'taskflow/tests/unit')
| -rw-r--r-- | taskflow/tests/unit/test_engines.py | 127 |
1 files changed, 127 insertions, 0 deletions
diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index 4e38dfa..ed073e6 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -15,7 +15,9 @@ # under the License. import contextlib +import functools +import six import testtools import taskflow.engines @@ -772,6 +774,126 @@ class EngineMissingDepsTest(utils.EngineTestBase): self.assertIsNotNone(c_e.cause) +class EngineGraphConditionalFlowTest(utils.EngineTestBase): + + def test_graph_flow_conditional(self): + flow = gf.Flow('root') + + task1 = utils.ProgressingTask(name='task1') + task2 = utils.ProgressingTask(name='task2') + task2_2 = utils.ProgressingTask(name='task2_2') + task3 = utils.ProgressingTask(name='task3') + + flow.add(task1, task2, task2_2, task3) + flow.link(task1, task2, decider=lambda history: False) + flow.link(task2, task2_2) + flow.link(task1, task3, decider=lambda history: True) + + engine = self._make_engine(flow) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + + expected = set([ + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + + 'task2.t IGNORE', + 'task2_2.t IGNORE', + + 'task3.t RUNNING', + 'task3.t SUCCESS(5)', + ]) + self.assertEqual(expected, set(capturer.values)) + + def test_graph_flow_diamond_ignored(self): + flow = gf.Flow('root') + + task1 = utils.ProgressingTask(name='task1') + task2 = utils.ProgressingTask(name='task2') + task3 = utils.ProgressingTask(name='task3') + task4 = utils.ProgressingTask(name='task4') + + flow.add(task1, task2, task3, task4) + flow.link(task1, task2) + flow.link(task2, task4, decider=lambda history: False) + flow.link(task1, task3) + flow.link(task3, task4, decider=lambda history: True) + + engine = self._make_engine(flow) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + + expected = set([ + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + + 'task2.t RUNNING', + 'task2.t SUCCESS(5)', + + 'task3.t RUNNING', + 'task3.t SUCCESS(5)', + + 'task4.t IGNORE', + ]) + self.assertEqual(expected, set(capturer.values)) + self.assertEqual(states.IGNORE, + engine.storage.get_atom_state('task4')) + self.assertEqual(states.IGNORE, + engine.storage.get_atom_intention('task4')) + + def test_graph_flow_conditional_history(self): + + def even_odd_decider(history, allowed): + total = sum(six.itervalues(history)) + if total == allowed: + return True + return False + + flow = gf.Flow('root') + + task1 = utils.TaskMultiArgOneReturn(name='task1') + task2 = utils.ProgressingTask(name='task2') + task2_2 = utils.ProgressingTask(name='task2_2') + task3 = utils.ProgressingTask(name='task3') + task3_3 = utils.ProgressingTask(name='task3_3') + + flow.add(task1, task2, task2_2, task3, task3_3) + flow.link(task1, task2, + decider=functools.partial(even_odd_decider, allowed=2)) + flow.link(task2, task2_2) + + flow.link(task1, task3, + decider=functools.partial(even_odd_decider, allowed=1)) + flow.link(task3, task3_3) + + engine = self._make_engine(flow) + engine.storage.inject({'x': 0, 'y': 1, 'z': 1}) + + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + + expected = set([ + 'task1.t RUNNING', 'task1.t SUCCESS(2)', + 'task3.t IGNORE', 'task3_3.t IGNORE', + 'task2.t RUNNING', 'task2.t SUCCESS(5)', + 'task2_2.t RUNNING', 'task2_2.t SUCCESS(5)', + ]) + self.assertEqual(expected, set(capturer.values)) + + engine = self._make_engine(flow) + engine.storage.inject({'x': 0, 'y': 0, 'z': 1}) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + + expected = set([ + 'task1.t RUNNING', 'task1.t SUCCESS(1)', + 'task2.t IGNORE', 'task2_2.t IGNORE', + 'task3.t RUNNING', 'task3.t SUCCESS(5)', + 'task3_3.t RUNNING', 'task3_3.t SUCCESS(5)', + ]) + self.assertEqual(expected, set(capturer.values)) + + class EngineCheckingTaskTest(utils.EngineTestBase): # FIXME: this test uses a inner class that workers/process engines can't # get to, so we need to do something better to make this test useful for @@ -805,6 +927,7 @@ class SerialEngineTest(EngineTaskTest, EngineOptionalRequirementsTest, EngineGraphFlowTest, EngineMissingDepsTest, + EngineGraphConditionalFlowTest, EngineCheckingTaskTest, test.TestCase): def _make_engine(self, flow, @@ -832,6 +955,7 @@ class ParallelEngineWithThreadsTest(EngineTaskTest, EngineOptionalRequirementsTest, EngineGraphFlowTest, EngineMissingDepsTest, + EngineGraphConditionalFlowTest, EngineCheckingTaskTest, test.TestCase): _EXECUTOR_WORKERS = 2 @@ -871,6 +995,7 @@ class ParallelEngineWithEventletTest(EngineTaskTest, EngineOptionalRequirementsTest, EngineGraphFlowTest, EngineMissingDepsTest, + EngineGraphConditionalFlowTest, EngineCheckingTaskTest, test.TestCase): @@ -893,6 +1018,7 @@ class ParallelEngineWithProcessTest(EngineTaskTest, EngineOptionalRequirementsTest, EngineGraphFlowTest, EngineMissingDepsTest, + EngineGraphConditionalFlowTest, test.TestCase): _EXECUTOR_WORKERS = 2 @@ -920,6 +1046,7 @@ class WorkerBasedEngineTest(EngineTaskTest, EngineOptionalRequirementsTest, EngineGraphFlowTest, EngineMissingDepsTest, + EngineGraphConditionalFlowTest, test.TestCase): def setUp(self): super(WorkerBasedEngineTest, self).setUp() |
