summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGreg Hill <greg.hill@rackspace.com>2015-10-28 09:07:58 -0500
committerGreg Hill <greg.hill@rackspace.com>2016-01-25 15:04:34 -0600
commit5ce07b2de15cbbd417748edd3fac12a166152aea (patch)
tree3db2c372c10f5a5b7eaef32d804d428642af7105
parent31764bfb9646e8ede4d0c6ef75888e9f33cb03e4 (diff)
downloadtaskflow-5ce07b2de15cbbd417748edd3fac12a166152aea.tar.gz
Retrieve the store from flowdetails as well, if it exists
Gives users a more permanent way to provide an initial set of arguments to a flow. Change-Id: Ib9c3d60882548120d467a645bbac9be78408bac3 Implements: blueprint flow-details-keep-store
-rw-r--r--doc/source/jobs.rst60
-rw-r--r--taskflow/conductors/base.py10
-rw-r--r--taskflow/tests/unit/test_conductors.py137
-rw-r--r--taskflow/utils/persistence_utils.py11
4 files changed, 213 insertions, 5 deletions
diff --git a/doc/source/jobs.rst b/doc/source/jobs.rst
index 6d0be30..55d343f 100644
--- a/doc/source/jobs.rst
+++ b/doc/source/jobs.rst
@@ -175,6 +175,66 @@ might look like:
time.sleep(coffee_break_time)
...
+There are a few ways to provide arguments to the flow. The first option is to
+add a ``store`` to the flowdetail object in the
+:py:class:`logbook <taskflow.persistence.models.LogBook>`.
+
+You can also provide a ``store`` in the
+:py:class:`job <taskflow.jobs.base.Job>` itself when posting it to the
+job board. If both ``store`` values are found, they will be combined,
+with the :py:class:`job <taskflow.jobs.base.Job>` ``store``
+overriding the :py:class:`logbook <taskflow.persistence.models.LogBook>`
+``store``.
+
+.. code-block:: python
+
+ import uuid
+
+ from taskflow import engines
+ from taskflow.persistence import backends as persistence_backends
+ from taskflow.persistence import models
+ from taskflow.jobs import backends as job_backends
+
+
+ ...
+ persistence = persistence_backends.fetch({
+        "connection": "mysql",
+ "user": ...,
+ "password": ...,
+ })
+ board = job_backends.fetch('my-board', {
+ "board": "zookeeper",
+ }, persistence=persistence)
+
+ book = models.LogBook('my-book', uuid.uuid4())
+
+ flow_detail = models.FlowDetail('my-job', uuid.uuid4())
+ book.add(flow_detail)
+
+ connection = persistence.get_connection()
+ connection.save_logbook(book)
+
+ flow_detail.meta['store'] = {'a': 1, 'c': 3}
+
+ job_details = {
+ "flow_uuid": flow_detail.uuid,
+ "store": {'a': 2, 'b': 1}
+ }
+
+ engines.save_factory_details(flow_detail, flow_factory,
+ factory_args=[],
+ factory_kwargs={},
+ backend=persistence)
+
+    board.connect()
+    job = board.post('my-job', book=book, details=job_details)
+
+ # the flow global parameters are now the combined store values
+    # {'a': 2, 'b': 1, 'c': 3}
+ ...
+
+
Types
=====
diff --git a/taskflow/conductors/base.py b/taskflow/conductors/base.py
index 6cf8485..1f0da02 100644
--- a/taskflow/conductors/base.py
+++ b/taskflow/conductors/base.py
@@ -112,10 +112,14 @@ class Conductor(object):
def _engine_from_job(self, job):
"""Extracts an engine from a job (via some manner)."""
flow_detail = self._flow_detail_from_job(job)
+ store = {}
+
+ if flow_detail.meta and 'store' in flow_detail.meta:
+ store.update(flow_detail.meta['store'])
+
if job.details and 'store' in job.details:
- store = dict(job.details["store"])
- else:
- store = {}
+ store.update(job.details["store"])
+
engine = engines.load_from_detail(flow_detail, store=store,
engine=self._engine,
backend=self._persistence,
diff --git a/taskflow/tests/unit/test_conductors.py b/taskflow/tests/unit/test_conductors.py
index 9fa46f9..6177f26 100644
--- a/taskflow/tests/unit/test_conductors.py
+++ b/taskflow/tests/unit/test_conductors.py
@@ -53,6 +53,12 @@ def test_factory(blowup):
return f
+def test_store_factory():
+ f = lf.Flow("test")
+ f.add(test_utils.TaskMultiArg('task1'))
+ return f
+
+
def single_factory():
return futurist.ThreadPoolExecutor(max_workers=1)
@@ -229,6 +235,137 @@ class ManyConductorTest(testscenarios.TestWithScenarios,
self.assertIsNotNone(fd)
self.assertEqual(st.REVERTED, fd.state)
+ def test_missing_store(self):
+ components = self.make_components()
+ components.conductor.connect()
+ consumed_event = threading.Event()
+
+ def on_consume(state, details):
+ consumed_event.set()
+
+ components.board.notifier.register(base.REMOVAL, on_consume)
+ with close_many(components.conductor, components.client):
+ t = threading_utils.daemon_thread(components.conductor.run)
+ t.start()
+ lb, fd = pu.temporary_flow_detail(components.persistence)
+ engines.save_factory_details(fd, test_store_factory,
+ [], {},
+ backend=components.persistence)
+ components.board.post('poke', lb,
+ details={'flow_uuid': fd.uuid})
+ self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT))
+ components.conductor.stop()
+ self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT))
+ self.assertFalse(components.conductor.dispatching)
+
+ persistence = components.persistence
+ with contextlib.closing(persistence.get_connection()) as conn:
+ lb = conn.get_logbook(lb.uuid)
+ fd = lb.find(fd.uuid)
+ self.assertIsNotNone(fd)
+ self.assertIsNone(fd.state)
+
+ def test_job_store(self):
+ components = self.make_components()
+ components.conductor.connect()
+ consumed_event = threading.Event()
+
+ def on_consume(state, details):
+ consumed_event.set()
+
+ store = {'x': True, 'y': False, 'z': None}
+
+ components.board.notifier.register(base.REMOVAL, on_consume)
+ with close_many(components.conductor, components.client):
+ t = threading_utils.daemon_thread(components.conductor.run)
+ t.start()
+ lb, fd = pu.temporary_flow_detail(components.persistence)
+ engines.save_factory_details(fd, test_store_factory,
+ [], {},
+ backend=components.persistence)
+ components.board.post('poke', lb,
+ details={'flow_uuid': fd.uuid,
+ 'store': store})
+ self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT))
+ components.conductor.stop()
+ self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT))
+ self.assertFalse(components.conductor.dispatching)
+
+ persistence = components.persistence
+ with contextlib.closing(persistence.get_connection()) as conn:
+ lb = conn.get_logbook(lb.uuid)
+ fd = lb.find(fd.uuid)
+ self.assertIsNotNone(fd)
+ self.assertEqual(st.SUCCESS, fd.state)
+
+ def test_flowdetails_store(self):
+ components = self.make_components()
+ components.conductor.connect()
+ consumed_event = threading.Event()
+
+ def on_consume(state, details):
+ consumed_event.set()
+
+ store = {'x': True, 'y': False, 'z': None}
+
+ components.board.notifier.register(base.REMOVAL, on_consume)
+ with close_many(components.conductor, components.client):
+ t = threading_utils.daemon_thread(components.conductor.run)
+ t.start()
+ lb, fd = pu.temporary_flow_detail(components.persistence,
+ meta={'store': store})
+ engines.save_factory_details(fd, test_store_factory,
+ [], {},
+ backend=components.persistence)
+ components.board.post('poke', lb,
+ details={'flow_uuid': fd.uuid})
+ self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT))
+ components.conductor.stop()
+ self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT))
+ self.assertFalse(components.conductor.dispatching)
+
+ persistence = components.persistence
+ with contextlib.closing(persistence.get_connection()) as conn:
+ lb = conn.get_logbook(lb.uuid)
+ fd = lb.find(fd.uuid)
+ self.assertIsNotNone(fd)
+ self.assertEqual(st.SUCCESS, fd.state)
+
+ def test_combined_store(self):
+ components = self.make_components()
+ components.conductor.connect()
+ consumed_event = threading.Event()
+
+ def on_consume(state, details):
+ consumed_event.set()
+
+ flow_store = {'x': True, 'y': False}
+ job_store = {'z': None}
+
+ components.board.notifier.register(base.REMOVAL, on_consume)
+ with close_many(components.conductor, components.client):
+ t = threading_utils.daemon_thread(components.conductor.run)
+ t.start()
+ lb, fd = pu.temporary_flow_detail(components.persistence,
+ meta={'store': flow_store})
+ engines.save_factory_details(fd, test_store_factory,
+ [], {},
+ backend=components.persistence)
+ components.board.post('poke', lb,
+ details={'flow_uuid': fd.uuid,
+ 'store': job_store})
+ self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT))
+ components.conductor.stop()
+ self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT))
+ self.assertFalse(components.conductor.dispatching)
+
+ persistence = components.persistence
+ with contextlib.closing(persistence.get_connection()) as conn:
+ lb = conn.get_logbook(lb.uuid)
+ fd = lb.find(fd.uuid)
+ self.assertIsNotNone(fd)
+ self.assertEqual(st.SUCCESS, fd.state)
+
class NonBlockingExecutorTest(test.TestCase):
def test_bad_wait_timeout(self):
diff --git a/taskflow/utils/persistence_utils.py b/taskflow/utils/persistence_utils.py
index 1d4dc26..acdf4c1 100644
--- a/taskflow/utils/persistence_utils.py
+++ b/taskflow/utils/persistence_utils.py
@@ -37,7 +37,7 @@ def temporary_log_book(backend=None):
return book
-def temporary_flow_detail(backend=None):
+def temporary_flow_detail(backend=None, meta=None):
"""Creates a temporary flow detail and logbook in the given backend.
Mainly useful for tests and other use cases where a temporary flow detail
@@ -45,7 +45,14 @@ def temporary_flow_detail(backend=None):
"""
flow_id = uuidutils.generate_uuid()
book = temporary_log_book(backend)
- book.add(models.FlowDetail(name='tmp-flow-detail', uuid=flow_id))
+
+ flow_detail = models.FlowDetail(name='tmp-flow-detail', uuid=flow_id)
+ if meta is not None:
+ if flow_detail.meta is None:
+ flow_detail.meta = {}
+ flow_detail.meta.update(meta)
+ book.add(flow_detail)
+
if backend is not None:
with contextlib.closing(backend.get_connection()) as conn:
conn.save_logbook(book)