summaryrefslogtreecommitdiff
path: root/taskflow/tests/unit
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-07-09 21:25:17 +0000
committerGerrit Code Review <review@openstack.org>2015-07-09 21:25:17 +0000
commit87c12603eb2455802c9baa53caa31fb12e11279e (patch)
tree8122352f887628c21207e3ae30c5f3b0749a1317 /taskflow/tests/unit
parentdf5fbe469c4c85c771bc0afcc5026d9790edc942 (diff)
parent2b827e1e363a90e4037172ccb0d57ac0873497fb (diff)
downloadtaskflow-87c12603eb2455802c9baa53caa31fb12e11279e.tar.gz
Merge "Add support for conditional execution"
Diffstat (limited to 'taskflow/tests/unit')
-rw-r--r--taskflow/tests/unit/test_engines.py127
1 files changed, 127 insertions, 0 deletions
diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py
index 4e38dfa..ed073e6 100644
--- a/taskflow/tests/unit/test_engines.py
+++ b/taskflow/tests/unit/test_engines.py
@@ -15,7 +15,9 @@
# under the License.
import contextlib
+import functools
+import six
import testtools
import taskflow.engines
@@ -772,6 +774,126 @@ class EngineMissingDepsTest(utils.EngineTestBase):
self.assertIsNotNone(c_e.cause)
+class EngineGraphConditionalFlowTest(utils.EngineTestBase):
+
+ def test_graph_flow_conditional(self):
+ flow = gf.Flow('root')
+
+ task1 = utils.ProgressingTask(name='task1')
+ task2 = utils.ProgressingTask(name='task2')
+ task2_2 = utils.ProgressingTask(name='task2_2')
+ task3 = utils.ProgressingTask(name='task3')
+
+ flow.add(task1, task2, task2_2, task3)
+ flow.link(task1, task2, decider=lambda history: False)
+ flow.link(task2, task2_2)
+ flow.link(task1, task3, decider=lambda history: True)
+
+ engine = self._make_engine(flow)
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ engine.run()
+
+ expected = set([
+ 'task1.t RUNNING',
+ 'task1.t SUCCESS(5)',
+
+ 'task2.t IGNORE',
+ 'task2_2.t IGNORE',
+
+ 'task3.t RUNNING',
+ 'task3.t SUCCESS(5)',
+ ])
+ self.assertEqual(expected, set(capturer.values))
+
+ def test_graph_flow_diamond_ignored(self):
+ flow = gf.Flow('root')
+
+ task1 = utils.ProgressingTask(name='task1')
+ task2 = utils.ProgressingTask(name='task2')
+ task3 = utils.ProgressingTask(name='task3')
+ task4 = utils.ProgressingTask(name='task4')
+
+ flow.add(task1, task2, task3, task4)
+ flow.link(task1, task2)
+ flow.link(task2, task4, decider=lambda history: False)
+ flow.link(task1, task3)
+ flow.link(task3, task4, decider=lambda history: True)
+
+ engine = self._make_engine(flow)
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ engine.run()
+
+ expected = set([
+ 'task1.t RUNNING',
+ 'task1.t SUCCESS(5)',
+
+ 'task2.t RUNNING',
+ 'task2.t SUCCESS(5)',
+
+ 'task3.t RUNNING',
+ 'task3.t SUCCESS(5)',
+
+ 'task4.t IGNORE',
+ ])
+ self.assertEqual(expected, set(capturer.values))
+ self.assertEqual(states.IGNORE,
+ engine.storage.get_atom_state('task4'))
+ self.assertEqual(states.IGNORE,
+ engine.storage.get_atom_intention('task4'))
+
+ def test_graph_flow_conditional_history(self):
+
+ def even_odd_decider(history, allowed):
+ total = sum(six.itervalues(history))
+ if total == allowed:
+ return True
+ return False
+
+ flow = gf.Flow('root')
+
+ task1 = utils.TaskMultiArgOneReturn(name='task1')
+ task2 = utils.ProgressingTask(name='task2')
+ task2_2 = utils.ProgressingTask(name='task2_2')
+ task3 = utils.ProgressingTask(name='task3')
+ task3_3 = utils.ProgressingTask(name='task3_3')
+
+ flow.add(task1, task2, task2_2, task3, task3_3)
+ flow.link(task1, task2,
+ decider=functools.partial(even_odd_decider, allowed=2))
+ flow.link(task2, task2_2)
+
+ flow.link(task1, task3,
+ decider=functools.partial(even_odd_decider, allowed=1))
+ flow.link(task3, task3_3)
+
+ engine = self._make_engine(flow)
+ engine.storage.inject({'x': 0, 'y': 1, 'z': 1})
+
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ engine.run()
+
+ expected = set([
+ 'task1.t RUNNING', 'task1.t SUCCESS(2)',
+ 'task3.t IGNORE', 'task3_3.t IGNORE',
+ 'task2.t RUNNING', 'task2.t SUCCESS(5)',
+ 'task2_2.t RUNNING', 'task2_2.t SUCCESS(5)',
+ ])
+ self.assertEqual(expected, set(capturer.values))
+
+ engine = self._make_engine(flow)
+ engine.storage.inject({'x': 0, 'y': 0, 'z': 1})
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ engine.run()
+
+ expected = set([
+ 'task1.t RUNNING', 'task1.t SUCCESS(1)',
+ 'task2.t IGNORE', 'task2_2.t IGNORE',
+ 'task3.t RUNNING', 'task3.t SUCCESS(5)',
+ 'task3_3.t RUNNING', 'task3_3.t SUCCESS(5)',
+ ])
+ self.assertEqual(expected, set(capturer.values))
+
+
class EngineCheckingTaskTest(utils.EngineTestBase):
# FIXME: this test uses a inner class that workers/process engines can't
# get to, so we need to do something better to make this test useful for
@@ -805,6 +927,7 @@ class SerialEngineTest(EngineTaskTest,
EngineOptionalRequirementsTest,
EngineGraphFlowTest,
EngineMissingDepsTest,
+ EngineGraphConditionalFlowTest,
EngineCheckingTaskTest,
test.TestCase):
def _make_engine(self, flow,
@@ -832,6 +955,7 @@ class ParallelEngineWithThreadsTest(EngineTaskTest,
EngineOptionalRequirementsTest,
EngineGraphFlowTest,
EngineMissingDepsTest,
+ EngineGraphConditionalFlowTest,
EngineCheckingTaskTest,
test.TestCase):
_EXECUTOR_WORKERS = 2
@@ -871,6 +995,7 @@ class ParallelEngineWithEventletTest(EngineTaskTest,
EngineOptionalRequirementsTest,
EngineGraphFlowTest,
EngineMissingDepsTest,
+ EngineGraphConditionalFlowTest,
EngineCheckingTaskTest,
test.TestCase):
@@ -893,6 +1018,7 @@ class ParallelEngineWithProcessTest(EngineTaskTest,
EngineOptionalRequirementsTest,
EngineGraphFlowTest,
EngineMissingDepsTest,
+ EngineGraphConditionalFlowTest,
test.TestCase):
_EXECUTOR_WORKERS = 2
@@ -920,6 +1046,7 @@ class WorkerBasedEngineTest(EngineTaskTest,
EngineOptionalRequirementsTest,
EngineGraphFlowTest,
EngineMissingDepsTest,
+ EngineGraphConditionalFlowTest,
test.TestCase):
def setUp(self):
super(WorkerBasedEngineTest, self).setUp()