diff options
author | Jenkins <jenkins@review.openstack.org> | 2016-01-26 03:12:02 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2016-01-26 03:12:02 +0000 |
commit | f9a2d78bd8dd5d5d0698a89474fab14f60ddd77f (patch) | |
tree | 9d1074a3ebeb3893f29d61c2bd53750d9d1dc89f | |
parent | 179eac734db25cdb62a3fe401b5cd02242dd5319 (diff) | |
parent | 5ce07b2de15cbbd417748edd3fac12a166152aea (diff) | |
download | taskflow-f9a2d78bd8dd5d5d0698a89474fab14f60ddd77f.tar.gz |
Merge "Retrieve the store from flowdetails as well, if it exists"
-rw-r--r-- | doc/source/jobs.rst | 60 | ||||
-rw-r--r-- | taskflow/conductors/base.py | 10 | ||||
-rw-r--r-- | taskflow/tests/unit/test_conductors.py | 137 | ||||
-rw-r--r-- | taskflow/utils/persistence_utils.py | 11 |
4 files changed, 213 insertions, 5 deletions
diff --git a/doc/source/jobs.rst b/doc/source/jobs.rst index 6d0be30..55d343f 100644 --- a/doc/source/jobs.rst +++ b/doc/source/jobs.rst @@ -175,6 +175,66 @@ might look like: time.sleep(coffee_break_time) ... +There are a few ways to provide arguments to the flow. The first option is to +add a ``store`` to the flowdetail object in the +:py:class:`logbook <taskflow.persistence.models.LogBook>`. + +You can also provide a ``store`` in the +:py:class:`job <taskflow.jobs.base.Job>` itself when posting it to the +job board. If both ``store`` values are found, they will be combined, +with the :py:class:`job <taskflow.jobs.base.Job>` ``store`` +overriding the :py:class:`logbook <taskflow.persistence.models.LogBook>` +``store``. + +.. code-block:: python + + import uuid + + from taskflow import engines + from taskflow.persistence import backends as persistence_backends + from taskflow.persistence import models + from taskflow.jobs import backends as job_backends + + + ... + persistence = persistence_backends.fetch({ + "connection': "mysql", + "user": ..., + "password": ..., + }) + board = job_backends.fetch('my-board', { + "board": "zookeeper", + }, persistence=persistence) + + book = models.LogBook('my-book', uuid.uuid4()) + + flow_detail = models.FlowDetail('my-job', uuid.uuid4()) + book.add(flow_detail) + + connection = persistence.get_connection() + connection.save_logbook(book) + + flow_detail.meta['store'] = {'a': 1, 'c': 3} + + job_details = { + "flow_uuid": flow_detail.uuid, + "store": {'a': 2, 'b': 1} + } + + engines.save_factory_details(flow_detail, flow_factory, + factory_args=[], + factory_kwargs={}, + backend=persistence) + + jobboard = get_jobboard(zk_client) + jobboard.connect() + job = jobboard.post('my-job', book=book, details=job_details) + + # the flow global parameters are now the combined store values + # {'a': 2, 'b': 1', 'c': 3} + ... + + Types ===== diff --git a/taskflow/conductors/base.py b/taskflow/conductors/base.py index 6cf8485..1f0da02 100644 --- a/taskflow/conductors/base.py +++ b/taskflow/conductors/base.py @@ -112,10 +112,14 @@ class Conductor(object): def _engine_from_job(self, job): """Extracts an engine from a job (via some manner).""" flow_detail = self._flow_detail_from_job(job) + store = {} + + if flow_detail.meta and 'store' in flow_detail.meta: + store.update(flow_detail.meta['store']) + if job.details and 'store' in job.details: - store = dict(job.details["store"]) - else: - store = {} + store.update(job.details["store"]) + engine = engines.load_from_detail(flow_detail, store=store, engine=self._engine, backend=self._persistence, diff --git a/taskflow/tests/unit/test_conductors.py b/taskflow/tests/unit/test_conductors.py index 9fa46f9..6177f26 100644 --- a/taskflow/tests/unit/test_conductors.py +++ b/taskflow/tests/unit/test_conductors.py @@ -53,6 +53,12 @@ def test_factory(blowup): return f +def test_store_factory(): + f = lf.Flow("test") + f.add(test_utils.TaskMultiArg('task1')) + return f + + def single_factory(): return futurist.ThreadPoolExecutor(max_workers=1) @@ -229,6 +235,137 @@ class ManyConductorTest(testscenarios.TestWithScenarios, self.assertIsNotNone(fd) self.assertEqual(st.REVERTED, fd.state) + def test_missing_store(self): + components = self.make_components() + components.conductor.connect() + consumed_event = threading.Event() + + def on_consume(state, details): + consumed_event.set() + + components.board.notifier.register(base.REMOVAL, on_consume) + with close_many(components.conductor, components.client): + t = threading_utils.daemon_thread(components.conductor.run) + t.start() + lb, fd = pu.temporary_flow_detail(components.persistence) + engines.save_factory_details(fd, test_store_factory, + [], {}, + backend=components.persistence) + components.board.post('poke', lb, + details={'flow_uuid': fd.uuid}) + self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT)) + components.conductor.stop() + self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT)) + self.assertFalse(components.conductor.dispatching) + + persistence = components.persistence + with contextlib.closing(persistence.get_connection()) as conn: + lb = conn.get_logbook(lb.uuid) + fd = lb.find(fd.uuid) + self.assertIsNotNone(fd) + self.assertIsNone(fd.state) + + def test_job_store(self): + components = self.make_components() + components.conductor.connect() + consumed_event = threading.Event() + + def on_consume(state, details): + consumed_event.set() + + store = {'x': True, 'y': False, 'z': None} + + components.board.notifier.register(base.REMOVAL, on_consume) + with close_many(components.conductor, components.client): + t = threading_utils.daemon_thread(components.conductor.run) + t.start() + lb, fd = pu.temporary_flow_detail(components.persistence) + engines.save_factory_details(fd, test_store_factory, + [], {}, + backend=components.persistence) + components.board.post('poke', lb, + details={'flow_uuid': fd.uuid, + 'store': store}) + self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT)) + components.conductor.stop() + self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT)) + self.assertFalse(components.conductor.dispatching) + + persistence = components.persistence + with contextlib.closing(persistence.get_connection()) as conn: + lb = conn.get_logbook(lb.uuid) + fd = lb.find(fd.uuid) + self.assertIsNotNone(fd) + self.assertEqual(st.SUCCESS, fd.state) + + def test_flowdetails_store(self): + components = self.make_components() + components.conductor.connect() + consumed_event = threading.Event() + + def on_consume(state, details): + consumed_event.set() + + store = {'x': True, 'y': False, 'z': None} + + components.board.notifier.register(base.REMOVAL, on_consume) + with close_many(components.conductor, components.client): + t = threading_utils.daemon_thread(components.conductor.run) + t.start() + lb, fd = pu.temporary_flow_detail(components.persistence, + meta={'store': store}) + engines.save_factory_details(fd, test_store_factory, + [], {}, + backend=components.persistence) + components.board.post('poke', lb, + details={'flow_uuid': fd.uuid}) + self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT)) + components.conductor.stop() + self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT)) + self.assertFalse(components.conductor.dispatching) + + persistence = components.persistence + with contextlib.closing(persistence.get_connection()) as conn: + lb = conn.get_logbook(lb.uuid) + fd = lb.find(fd.uuid) + self.assertIsNotNone(fd) + self.assertEqual(st.SUCCESS, fd.state) + + def test_combined_store(self): + components = self.make_components() + components.conductor.connect() + consumed_event = threading.Event() + + def on_consume(state, details): + consumed_event.set() + + flow_store = {'x': True, 'y': False} + job_store = {'z': None} + + components.board.notifier.register(base.REMOVAL, on_consume) + with close_many(components.conductor, components.client): + t = threading_utils.daemon_thread(components.conductor.run) + t.start() + lb, fd = pu.temporary_flow_detail(components.persistence, + meta={'store': flow_store}) + engines.save_factory_details(fd, test_store_factory, + [], {}, + backend=components.persistence) + components.board.post('poke', lb, + details={'flow_uuid': fd.uuid, + 'store': job_store}) + self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT)) + components.conductor.stop() + self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT)) + self.assertFalse(components.conductor.dispatching) + + persistence = components.persistence + with contextlib.closing(persistence.get_connection()) as conn: + lb = conn.get_logbook(lb.uuid) + fd = lb.find(fd.uuid) + self.assertIsNotNone(fd) + self.assertEqual(st.SUCCESS, fd.state) + class NonBlockingExecutorTest(test.TestCase): def test_bad_wait_timeout(self): diff --git a/taskflow/utils/persistence_utils.py b/taskflow/utils/persistence_utils.py index 1d4dc26..acdf4c1 100644 --- a/taskflow/utils/persistence_utils.py +++ b/taskflow/utils/persistence_utils.py @@ -37,7 +37,7 @@ def temporary_log_book(backend=None): return book -def temporary_flow_detail(backend=None): +def temporary_flow_detail(backend=None, meta=None): """Creates a temporary flow detail and logbook in the given backend. Mainly useful for tests and other use cases where a temporary flow detail @@ -45,7 +45,14 @@ def temporary_flow_detail(backend=None): """ flow_id = uuidutils.generate_uuid() book = temporary_log_book(backend) - book.add(models.FlowDetail(name='tmp-flow-detail', uuid=flow_id)) + + flow_detail = models.FlowDetail(name='tmp-flow-detail', uuid=flow_id) + if meta is not None: + if flow_detail.meta is None: + flow_detail.meta = {} + flow_detail.meta.update(meta) + book.add(flow_detail) + if backend is not None: with contextlib.closing(backend.get_connection()) as conn: conn.save_logbook(book) |