diff options
author | Joshua Harlow <harlowja@yahoo-inc.com> | 2016-02-05 12:44:27 -0800 |
---|---|---|
committer | Joshua Harlow <harlowja@gmail.com> | 2016-02-14 11:43:35 -0800 |
commit | a70bd8a7e59f52bc20dd4e219c4242b0f15664b4 (patch) | |
tree | 54a6eb4a86651812fd907f9cef3518b4f457339d | |
parent | 6cff7b27218377838295dffc3d16ad8ee6ddce4e (diff) | |
download | taskflow-a70bd8a7e59f52bc20dd4e219c4242b0f15664b4.tar.gz |
Remove need for separate notify thread
Instead of having a periodic notification thread that
will drop messages to try to find workers we can just
have this same work be done in the periodically called
on_wait callback that is already used for expiring and
matching workers to new/updated workers.
This avoids having one more thread that doesn't do all
that much (and activating it during waiting calls will
be often enough to achieve its goal in life).
Change-Id: If80233d13d914f2ed3665001a27627b78e6ee780
-rw-r--r-- | taskflow/engines/worker_based/executor.py | 59 | ||||
-rw-r--r-- | taskflow/engines/worker_based/types.py | 54 | ||||
-rw-r--r-- | taskflow/tests/unit/worker_based/test_executor.py | 5 |
3 files changed, 69 insertions, 49 deletions
diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index d274953..46c3fdf 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -17,7 +17,6 @@ import functools import threading -from futurist import periodics from oslo_utils import timeutils import six @@ -47,12 +46,7 @@ class WorkerTaskExecutor(executor.TaskExecutor): self._ongoing_requests = {} self._ongoing_requests_lock = threading.RLock() self._transition_timeout = transition_timeout - type_handlers = { - pr.RESPONSE: dispatcher.Handler(self._process_response, - validator=pr.Response.validate), - } self._proxy = proxy.Proxy(uuid, exchange, - type_handlers=type_handlers, on_wait=self._on_wait, url=url, transport=transport, transport_options=transport_options, @@ -64,16 +58,17 @@ class WorkerTaskExecutor(executor.TaskExecutor): # pre-existing knowledge of the topics those workers are on to gather # and update this information). self._finder = wt.ProxyWorkerFinder(uuid, self._proxy, topics) - self._helpers = tu.ThreadBundle() - self._helpers.bind(lambda: tu.daemon_thread(self._proxy.start), - after_start=lambda t: self._proxy.wait(), - before_join=lambda t: self._proxy.stop()) - p_worker = periodics.PeriodicWorker.create([self._finder]) - if p_worker: - self._helpers.bind(lambda: tu.daemon_thread(p_worker.start), - before_join=lambda t: p_worker.stop(), - after_join=lambda t: p_worker.reset(), - before_start=lambda t: p_worker.reset()) + self._proxy.dispatcher.type_handlers.update({ + pr.RESPONSE: dispatcher.Handler(self._process_response, + validator=pr.Response.validate), + pr.NOTIFY: dispatcher.Handler( + self._finder.process_response, + validator=functools.partial(pr.Notify.validate, + response=True)), + }) + # Thread that will run the message dispatching (and periodically + # call the on_wait callback to do various things) loop... + self._helper = None self._messages_processed = { 'finder': self._finder.messages_processed, } @@ -138,8 +133,9 @@ class WorkerTaskExecutor(executor.TaskExecutor): return True return False - def _on_wait(self): - """This function is called cyclically between draining events.""" + def _clean(self): + if not self._ongoing_requests: + return with self._ongoing_requests_lock: ongoing_requests_uuids = set(six.iterkeys(self._ongoing_requests)) waiting_requests = {} @@ -178,6 +174,15 @@ class WorkerTaskExecutor(executor.TaskExecutor): self._publish_request(request, worker) self._messages_processed['finder'] = new_messages_processed + def _on_wait(self): + """This function is called cyclically between draining events.""" + # Publish any finding messages (used to locate workers). + self._finder.maybe_publish() + # Process any expired requests or requests that have no current + # worker located (publish messages for those if we now do have + # a worker located). + self._clean() + def _submit_task(self, task, task_uuid, action, arguments, progress_callback=None, **kwargs): """Submit task request to a worker.""" @@ -249,15 +254,23 @@ class WorkerTaskExecutor(executor.TaskExecutor): timeout=timeout) def start(self): - """Starts proxy thread and associated topic notification thread.""" - self._helpers.start() + """Starts message processing thread.""" + if self._helper is not None: + raise RuntimeError("Worker executor must be stopped before" + " it can be started") + self._helper = tu.daemon_thread(self._proxy.start) + self._helper.start() + self._proxy.wait() def stop(self): - """Stops proxy thread and associated topic notification thread.""" - self._helpers.stop() + """Stops message processing thread.""" + if self._helper is not None: + self._proxy.stop() + self._helper.join() + self._helper = None with self._ongoing_requests_lock: while self._ongoing_requests: _request_uuid, request = self._ongoing_requests.popitem() self._handle_expired_request(request) - self._finder.clear() + self._finder.reset() self._messages_processed['finder'] = self._finder.messages_processed diff --git a/taskflow/engines/worker_based/types.py b/taskflow/engines/worker_based/types.py index c8f11e2..6bc215e 100644 --- a/taskflow/engines/worker_based/types.py +++ b/taskflow/engines/worker_based/types.py @@ -15,17 +15,13 @@ # under the License. import abc -import functools -import itertools import random import threading -from futurist import periodics from oslo_utils import reflection from oslo_utils import timeutils import six -from taskflow.engines.worker_based import dispatcher from taskflow.engines.worker_based import protocol as pr from taskflow import logging from taskflow.utils import kombu_utils as ku @@ -132,27 +128,21 @@ class WorkerFinder(object): def get_worker_for_task(self, task): """Gets a worker that can perform a given task.""" - def clear(self): - pass - class ProxyWorkerFinder(WorkerFinder): """Requests and receives responses about workers topic+task details.""" - def __init__(self, uuid, proxy, topics): + def __init__(self, uuid, proxy, topics, + beat_periodicity=pr.NOTIFY_PERIOD): super(ProxyWorkerFinder, self).__init__() self._proxy = proxy self._topics = topics self._workers = {} self._uuid = uuid - self._proxy.dispatcher.type_handlers.update({ - pr.NOTIFY: dispatcher.Handler( - self._process_response, - validator=functools.partial(pr.Notify.validate, - response=True)), - }) - self._counter = itertools.count() + self._seen_workers = 0 self._messages_processed = 0 + self._messages_published = 0 + self._watch = timeutils.StopWatch(duration=beat_periodicity) @property def messages_processed(self): @@ -160,15 +150,30 @@ class ProxyWorkerFinder(WorkerFinder): def _next_worker(self, topic, tasks, temporary=False): if not temporary: - return TopicWorker(topic, tasks, - identity=six.next(self._counter)) + w = TopicWorker(topic, tasks, identity=self._seen_workers) + self._seen_workers += 1 + return w else: return TopicWorker(topic, tasks) - @periodics.periodic(pr.NOTIFY_PERIOD, run_immediately=True) - def beat(self): - """Cyclically called to publish notify message to each topic.""" - self._proxy.publish(pr.Notify(), self._topics, reply_to=self._uuid) + def maybe_publish(self): + """Periodically called to publish notify message to each topic. + + These messages (especially the responses) are how this find learns + about workers and what tasks they can perform (so that we can then + match workers to tasks to run). + """ + if self._messages_published == 0: + self._proxy.publish(pr.Notify(), + self._topics, reply_to=self._uuid) + self._messages_published += 1 + self._watch.restart() + else: + if self._watch.expired(): + self._proxy.publish(pr.Notify(), + self._topics, reply_to=self._uuid) + self._messages_published += 1 + self._watch.restart() def _total_workers(self): return len(self._workers) @@ -191,7 +196,7 @@ class ProxyWorkerFinder(WorkerFinder): self._workers[topic] = worker return (worker, True) - def _process_response(self, data, message): + def process_response(self, data, message): """Process notify message sent from remote side.""" LOG.debug("Started processing notify response message '%s'", ku.DelayedPretty(message)) @@ -206,9 +211,12 @@ class ProxyWorkerFinder(WorkerFinder): self._cond.notify_all() self._messages_processed += 1 - def clear(self): + def reset(self): with self._cond: self._workers.clear() + self._messages_processed = 0 + self._messages_published = 0 + self._seen_workers = 0 self._cond.notify_all() def get_worker_for_task(self, task): diff --git a/taskflow/tests/unit/worker_based/test_executor.py b/taskflow/tests/unit/worker_based/test_executor.py index 372f48b..d81fd78 100644 --- a/taskflow/tests/unit/worker_based/test_executor.py +++ b/taskflow/tests/unit/worker_based/test_executor.py @@ -85,8 +85,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): on_wait=ex._on_wait, url=self.broker_url, transport=mock.ANY, transport_options=mock.ANY, - retry_options=mock.ANY, - type_handlers=mock.ANY), + retry_options=mock.ANY), mock.call.proxy.dispatcher.type_handlers.update(mock.ANY), ] self.assertEqual(master_mock_calls, self.master_mock.mock_calls) @@ -284,7 +283,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): self.assertTrue(self.proxy_started_event.wait(test_utils.WAIT_TIMEOUT)) # start executor again - ex.start() + self.assertRaises(RuntimeError, ex.start) # stop executor ex.stop() |