author    Joshua Harlow <harlowja@yahoo-inc.com>    2014-10-17 19:27:19 -0700
committer Joshua Harlow <harlowja@gmail.com>        2015-04-20 21:48:32 -0700
commit    3bbbcc6842df5e12e10a17914118d9278738c465 (patch)
tree      c887f2edbac0371789ff13462a2a249b55cfe828 /taskflow/examples
parent    f734467ddc3cebca5b88c6df37fbe3743aa04ca0 (diff)
download  taskflow-3bbbcc6842df5e12e10a17914118d9278738c465.tar.gz
Add a conductor running example
Create an example which can be extended into a simplistic review-checkout and
tox-running system, usable to run some set of actions on every review posted.
This could be expanded and connected into a gerrit pipeline to create a
mini-jenkins-like trigger/build/result system.

Part of ongoing blueprint more-examples

Change-Id: I5cf1bf02eeddf897ac7f098f1d73377f262a267b
Diffstat (limited to 'taskflow/examples')
-rw-r--r--  taskflow/examples/tox_conductor.py | 243
1 file changed, 243 insertions(+), 0 deletions(-)
diff --git a/taskflow/examples/tox_conductor.py b/taskflow/examples/tox_conductor.py
new file mode 100644
index 0000000..feff424
--- /dev/null
+++ b/taskflow/examples/tox_conductor.py
@@ -0,0 +1,243 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import contextlib
+import itertools
+import logging
+import os
+import shutil
+import socket
+import sys
+import tempfile
+import threading
+import time
+
+logging.basicConfig(level=logging.ERROR)
+
+top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
+                                       os.pardir,
+                                       os.pardir))
+sys.path.insert(0, top_dir)
+
+from oslo_utils import timeutils
+from oslo_utils import uuidutils
+import six
+from zake import fake_client
+
+from taskflow.conductors import backends as conductors
+from taskflow import engines
+from taskflow.jobs import backends as boards
+from taskflow.patterns import linear_flow
+from taskflow.persistence import backends as persistence
+from taskflow.persistence import logbook
+from taskflow import task
+from taskflow.utils import threading_utils
+
+# INTRO: This example shows how a worker/producer can post desired work (jobs)
+# to a jobboard and how a conductor can consume that work (jobs) from that
+# jobboard and execute those jobs in a reliable & async manner (for example,
+# if the conductor were to crash then the job will be released back onto the
+# jobboard and another conductor can attempt to finish it, from wherever that
+# job last left off).
+#
+# In this example an in-memory jobboard (and in-memory storage) is created and
+# used to simulate how this would be done at a larger scale (it is an
+# example after all).
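+#
+# As a rough sketch, the lifecycle of a single job here looks like (assuming
+# one producer and one conductor):
+#
+#   producer posts job -> jobboard -> conductor claims the job
+#     -> conductor imports the saved flow factory & builds an engine
+#     -> engine runs the flow -> conductor consumes (completes) the job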
+
+# Restrict how long this example runs for...
+RUN_TIME = 5
+REVIEW_CREATION_DELAY = 0.5
+SCAN_DELAY = 0.1
+NAME = "%s_%s" % (socket.getfqdn(), os.getpid())
+
+# This won't really use zookeeper; instead it uses a local version of it,
+# provided by the zake library, which mimics an actual zookeeper cluster
+# using threads and an in-memory data structure.
+JOBBOARD_CONF = {
+    'board': 'zookeeper://localhost?path=/taskflow/tox/jobs',
+}
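+
+# In a real deployment something like the following would be used instead
+# (the hostname and port here are hypothetical):
+#
+# JOBBOARD_CONF = {
+#     'board': 'zookeeper://zk.example.com:2181?path=/taskflow/tox/jobs',
+# }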
+
+
+class RunReview(task.Task):
+    # A dummy task that clones the review and runs tox...
+
+    def _clone_review(self, review, temp_dir):
+        print("Cloning review '%s' into %s" % (review['id'], temp_dir))
+
+    def _run_tox(self, temp_dir):
+        print("Running tox in %s" % temp_dir)
+
+    def execute(self, review, temp_dir):
+        self._clone_review(review, temp_dir)
+        self._run_tox(temp_dir)
+
+
+class MakeTempDir(task.Task):
+    # A task that creates a temporary dir (and destroys it again on
+    # failure, via revert).
+    #
+    # It provides the location of the temporary dir for other tasks to use
+    # as they see fit.
+
+    default_provides = 'temp_dir'
+
+    def execute(self):
+        return tempfile.mkdtemp()
+
+    def revert(self, *args, **kwargs):
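+        # The result previously returned by execute() (the path of the
+        # temporary dir, if it was created) is passed back into revert()
+        # under the REVERT_RESULT kwarg.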
+        temp_dir = kwargs.get(task.REVERT_RESULT)
+        if temp_dir:
+            shutil.rmtree(temp_dir)
+
+
+class CleanResources(task.Task):
+    # A task that cleans up any workflow resources.
+
+    def execute(self, temp_dir):
+        print("Removing %s" % temp_dir)
+        shutil.rmtree(temp_dir)
+
+
+def review_iter():
+    """Makes reviews (never-ending iterator/generator)."""
+    review_id_gen = itertools.count(0)
+    while True:
+        review_id = six.next(review_id_gen)
+        review = {
+            'id': review_id,
+        }
+        yield review
+
+
+# It is important that this factory is defined at the module namespace level,
+# since it must be importable by a conductor dispatching an engine; if it
+# were, for example, a lambda function, it would not be reimportable and the
+# conductor would be unable to reference it when creating the workflow to run.
+def create_review_workflow():
+    """Factory method used to create a review workflow to run."""
+    f = linear_flow.Flow("tester")
+    f.add(
+        MakeTempDir(name="maker"),
+        RunReview(name="runner"),
+        CleanResources(name="cleaner")
+    )
+    return f
+
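+# For experimentation, a flow produced by this factory can also be run
+# directly (no jobboard or conductor involved); a minimal sketch, using the
+# same imports as above:
+#
+#   engines.run(create_review_workflow(),
+#               engine='serial', store={'review': {'id': 0}})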
+
+def generate_reviewer(client, saver, name=NAME):
+    """Creates a review producer thread with the given name prefix."""
+    real_name = "%s_reviewer" % name
+    no_more = threading.Event()
+    jb = boards.fetch(real_name, JOBBOARD_CONF,
+                      client=client, persistence=saver)
+
+    def make_save_book(saver, review_id):
+        # Record what we want to happen (sometime in the future).
+        book = logbook.LogBook("book_%s" % review_id)
+        detail = logbook.FlowDetail("flow_%s" % review_id,
+                                    uuidutils.generate_uuid())
+        book.add(detail)
+        # Associate the factory method we want to be called (in the future)
+        # with the book, so that the conductor will be able to call into
+        # that factory to retrieve the workflow objects that represent the
+        # work.
+        #
+        # These args and kwargs *can* be used to save any specific parameters
+        # into the factory when it is being called to create the workflow
+        # objects (typically used to tell a factory how to create a unique
+        # workflow that represents this review).
+        factory_args = ()
+        factory_kwargs = {}
+        engines.save_factory_details(detail, create_review_workflow,
+                                     factory_args, factory_kwargs)
+        with contextlib.closing(saver.get_connection()) as conn:
+            conn.save_logbook(book)
+        return book
+
+    def run():
+        """Periodically publishes 'fake' reviews to analyze."""
+        jb.connect()
+        review_generator = review_iter()
+        with contextlib.closing(jb):
+            while not no_more.is_set():
+                review = six.next(review_generator)
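+                # The 'store' placed into the job details below seeds the
+                # engine's storage when the job is eventually run, which is
+                # how the 'review' argument of RunReview.execute() gets
+                # satisfied.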
+                details = {
+                    'store': {
+                        'review': review,
+                    },
+                }
+                job_name = "%s_%s" % (real_name, review['id'])
+                print("Posting review '%s'" % review['id'])
+                jb.post(job_name,
+                        book=make_save_book(saver, review['id']),
+                        details=details)
+                time.sleep(REVIEW_CREATION_DELAY)
+
+    # Return the unstarted thread, and a callback that can be used to
+    # shut down that thread (to avoid running forever).
+    return (threading_utils.daemon_thread(target=run), no_more.set)
+
+
+def generate_conductor(client, saver, name=NAME):
+    """Creates a conductor thread with the given name prefix."""
+    real_name = "%s_conductor" % name
+    jb = boards.fetch(name, JOBBOARD_CONF,
+                      client=client, persistence=saver)
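+    # NOTE: a 'blocking' conductor claims and dispatches one job at a time
+    # in the calling thread, while the 'parallel' engine it creates still
+    # runs the tasks of each claimed flow concurrently (threads by default).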
+    conductor = conductors.fetch("blocking", real_name, jb,
+                                 engine='parallel', wait_timeout=SCAN_DELAY)
+
+    def run():
+        jb.connect()
+        with contextlib.closing(jb):
+            conductor.run()
+
+    # Return the unstarted thread, and a callback that can be used to
+    # shut down that thread (to avoid running forever).
+    return (threading_utils.daemon_thread(target=run), conductor.stop)
+
+
+def main():
+    # Need to share the same backend, so that data can be shared...
+    persistence_conf = {
+        'connection': 'memory',
+    }
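+    # (A real deployment would likely use a durable backend here instead;
+    # for example -- hypothetical values -- a sqlalchemy connection string
+    # such as 'connection': 'mysql://user:password@host/taskflow'.)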
+    saver = persistence.fetch(persistence_conf)
+    with contextlib.closing(saver.get_connection()) as conn:
+        # This ensures that the needed backend setup/data directories/schema
+        # upgrades and so on... exist before any attempt is made to use them.
+        conn.upgrade()
+    fc1 = fake_client.FakeClient()
+    # Created this way so that both clients share the same storage location,
+    # which lets the zookeeper features work correctly across clients...
+    fc2 = fake_client.FakeClient(storage=fc1.storage)
+    entities = [
+        generate_reviewer(fc1, saver),
+        generate_conductor(fc2, saver),
+    ]
+    for t, stopper in entities:
+        t.start()
+    try:
+        watch = timeutils.StopWatch(duration=RUN_TIME)
+        watch.start()
+        while not watch.expired():
+            time.sleep(0.1)
+    finally:
+        for t, stopper in reversed(entities):
+            stopper()
+            t.join()
+
+
+if __name__ == '__main__':
+    main()