summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2016-02-05 12:44:27 -0800
committerJoshua Harlow <harlowja@gmail.com>2016-02-14 11:43:35 -0800
commita70bd8a7e59f52bc20dd4e219c4242b0f15664b4 (patch)
tree54a6eb4a86651812fd907f9cef3518b4f457339d
parent6cff7b27218377838295dffc3d16ad8ee6ddce4e (diff)
downloadtaskflow-a70bd8a7e59f52bc20dd4e219c4242b0f15664b4.tar.gz
Remove need for separate notify thread
Instead of having a periodic notification thread that will drop messages to try to find workers we can just have this same work be done in the periodically called on_wait callback that is already used for expiring and matching workers to new/updated workers. This avoids having one more thread that doesn't do all that much (and activating it during waiting calls will be often enough to achieve its goal in life). Change-Id: If80233d13d914f2ed3665001a27627b78e6ee780
-rw-r--r--taskflow/engines/worker_based/executor.py59
-rw-r--r--taskflow/engines/worker_based/types.py54
-rw-r--r--taskflow/tests/unit/worker_based/test_executor.py5
3 files changed, 69 insertions, 49 deletions
diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py
index d274953..46c3fdf 100644
--- a/taskflow/engines/worker_based/executor.py
+++ b/taskflow/engines/worker_based/executor.py
@@ -17,7 +17,6 @@
import functools
import threading
-from futurist import periodics
from oslo_utils import timeutils
import six
@@ -47,12 +46,7 @@ class WorkerTaskExecutor(executor.TaskExecutor):
self._ongoing_requests = {}
self._ongoing_requests_lock = threading.RLock()
self._transition_timeout = transition_timeout
- type_handlers = {
- pr.RESPONSE: dispatcher.Handler(self._process_response,
- validator=pr.Response.validate),
- }
self._proxy = proxy.Proxy(uuid, exchange,
- type_handlers=type_handlers,
on_wait=self._on_wait, url=url,
transport=transport,
transport_options=transport_options,
@@ -64,16 +58,17 @@ class WorkerTaskExecutor(executor.TaskExecutor):
# pre-existing knowledge of the topics those workers are on to gather
# and update this information).
self._finder = wt.ProxyWorkerFinder(uuid, self._proxy, topics)
- self._helpers = tu.ThreadBundle()
- self._helpers.bind(lambda: tu.daemon_thread(self._proxy.start),
- after_start=lambda t: self._proxy.wait(),
- before_join=lambda t: self._proxy.stop())
- p_worker = periodics.PeriodicWorker.create([self._finder])
- if p_worker:
- self._helpers.bind(lambda: tu.daemon_thread(p_worker.start),
- before_join=lambda t: p_worker.stop(),
- after_join=lambda t: p_worker.reset(),
- before_start=lambda t: p_worker.reset())
+ self._proxy.dispatcher.type_handlers.update({
+ pr.RESPONSE: dispatcher.Handler(self._process_response,
+ validator=pr.Response.validate),
+ pr.NOTIFY: dispatcher.Handler(
+ self._finder.process_response,
+ validator=functools.partial(pr.Notify.validate,
+ response=True)),
+ })
+ # Thread that will run the message dispatching (and periodically
+ # call the on_wait callback to do various things) loop...
+ self._helper = None
self._messages_processed = {
'finder': self._finder.messages_processed,
}
@@ -138,8 +133,9 @@ class WorkerTaskExecutor(executor.TaskExecutor):
return True
return False
- def _on_wait(self):
- """This function is called cyclically between draining events."""
+ def _clean(self):
+ if not self._ongoing_requests:
+ return
with self._ongoing_requests_lock:
ongoing_requests_uuids = set(six.iterkeys(self._ongoing_requests))
waiting_requests = {}
@@ -178,6 +174,15 @@ class WorkerTaskExecutor(executor.TaskExecutor):
self._publish_request(request, worker)
self._messages_processed['finder'] = new_messages_processed
+ def _on_wait(self):
+ """This function is called cyclically between draining events."""
+ # Publish any finding messages (used to locate workers).
+ self._finder.maybe_publish()
+ # Process any expired requests or requests that have no current
+ # worker located (publish messages for those if we now do have
+ # a worker located).
+ self._clean()
+
def _submit_task(self, task, task_uuid, action, arguments,
progress_callback=None, **kwargs):
"""Submit task request to a worker."""
@@ -249,15 +254,23 @@ class WorkerTaskExecutor(executor.TaskExecutor):
timeout=timeout)
def start(self):
- """Starts proxy thread and associated topic notification thread."""
- self._helpers.start()
+ """Starts message processing thread."""
+ if self._helper is not None:
+ raise RuntimeError("Worker executor must be stopped before"
+ " it can be started")
+ self._helper = tu.daemon_thread(self._proxy.start)
+ self._helper.start()
+ self._proxy.wait()
def stop(self):
- """Stops proxy thread and associated topic notification thread."""
- self._helpers.stop()
+ """Stops message processing thread."""
+ if self._helper is not None:
+ self._proxy.stop()
+ self._helper.join()
+ self._helper = None
with self._ongoing_requests_lock:
while self._ongoing_requests:
_request_uuid, request = self._ongoing_requests.popitem()
self._handle_expired_request(request)
- self._finder.clear()
+ self._finder.reset()
self._messages_processed['finder'] = self._finder.messages_processed
diff --git a/taskflow/engines/worker_based/types.py b/taskflow/engines/worker_based/types.py
index c8f11e2..6bc215e 100644
--- a/taskflow/engines/worker_based/types.py
+++ b/taskflow/engines/worker_based/types.py
@@ -15,17 +15,13 @@
# under the License.
import abc
-import functools
-import itertools
import random
import threading
-from futurist import periodics
from oslo_utils import reflection
from oslo_utils import timeutils
import six
-from taskflow.engines.worker_based import dispatcher
from taskflow.engines.worker_based import protocol as pr
from taskflow import logging
from taskflow.utils import kombu_utils as ku
@@ -132,27 +128,21 @@ class WorkerFinder(object):
def get_worker_for_task(self, task):
"""Gets a worker that can perform a given task."""
- def clear(self):
- pass
-
class ProxyWorkerFinder(WorkerFinder):
"""Requests and receives responses about workers topic+task details."""
- def __init__(self, uuid, proxy, topics):
+ def __init__(self, uuid, proxy, topics,
+ beat_periodicity=pr.NOTIFY_PERIOD):
super(ProxyWorkerFinder, self).__init__()
self._proxy = proxy
self._topics = topics
self._workers = {}
self._uuid = uuid
- self._proxy.dispatcher.type_handlers.update({
- pr.NOTIFY: dispatcher.Handler(
- self._process_response,
- validator=functools.partial(pr.Notify.validate,
- response=True)),
- })
- self._counter = itertools.count()
+ self._seen_workers = 0
self._messages_processed = 0
+ self._messages_published = 0
+ self._watch = timeutils.StopWatch(duration=beat_periodicity)
@property
def messages_processed(self):
@@ -160,15 +150,30 @@ class ProxyWorkerFinder(WorkerFinder):
def _next_worker(self, topic, tasks, temporary=False):
if not temporary:
- return TopicWorker(topic, tasks,
- identity=six.next(self._counter))
+ w = TopicWorker(topic, tasks, identity=self._seen_workers)
+ self._seen_workers += 1
+ return w
else:
return TopicWorker(topic, tasks)
- @periodics.periodic(pr.NOTIFY_PERIOD, run_immediately=True)
- def beat(self):
- """Cyclically called to publish notify message to each topic."""
- self._proxy.publish(pr.Notify(), self._topics, reply_to=self._uuid)
+ def maybe_publish(self):
+ """Periodically called to publish notify message to each topic.
+
+ These messages (especially the responses) are how this find learns
+ about workers and what tasks they can perform (so that we can then
+ match workers to tasks to run).
+ """
+ if self._messages_published == 0:
+ self._proxy.publish(pr.Notify(),
+ self._topics, reply_to=self._uuid)
+ self._messages_published += 1
+ self._watch.restart()
+ else:
+ if self._watch.expired():
+ self._proxy.publish(pr.Notify(),
+ self._topics, reply_to=self._uuid)
+ self._messages_published += 1
+ self._watch.restart()
def _total_workers(self):
return len(self._workers)
@@ -191,7 +196,7 @@ class ProxyWorkerFinder(WorkerFinder):
self._workers[topic] = worker
return (worker, True)
- def _process_response(self, data, message):
+ def process_response(self, data, message):
"""Process notify message sent from remote side."""
LOG.debug("Started processing notify response message '%s'",
ku.DelayedPretty(message))
@@ -206,9 +211,12 @@ class ProxyWorkerFinder(WorkerFinder):
self._cond.notify_all()
self._messages_processed += 1
- def clear(self):
+ def reset(self):
with self._cond:
self._workers.clear()
+ self._messages_processed = 0
+ self._messages_published = 0
+ self._seen_workers = 0
self._cond.notify_all()
def get_worker_for_task(self, task):
diff --git a/taskflow/tests/unit/worker_based/test_executor.py b/taskflow/tests/unit/worker_based/test_executor.py
index 372f48b..d81fd78 100644
--- a/taskflow/tests/unit/worker_based/test_executor.py
+++ b/taskflow/tests/unit/worker_based/test_executor.py
@@ -85,8 +85,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
on_wait=ex._on_wait,
url=self.broker_url, transport=mock.ANY,
transport_options=mock.ANY,
- retry_options=mock.ANY,
- type_handlers=mock.ANY),
+ retry_options=mock.ANY),
mock.call.proxy.dispatcher.type_handlers.update(mock.ANY),
]
self.assertEqual(master_mock_calls, self.master_mock.mock_calls)
@@ -284,7 +283,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
self.assertTrue(self.proxy_started_event.wait(test_utils.WAIT_TIMEOUT))
# start executor again
- ex.start()
+ self.assertRaises(RuntimeError, ex.start)
# stop executor
ex.stop()