diff options
author | Jenkins <jenkins@review.openstack.org> | 2016-02-15 05:22:43 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2016-02-15 05:22:43 +0000 |
commit | 17e0f16ce9aa4ff8837416098975be17b3607dba (patch) | |
tree | 868a033bc5ff13b8b903f0f8f24a366aff66ae87 | |
parent | 9da3ceacff761c4695fa363a83cdcca5b690b5f5 (diff) | |
parent | 63b380fc7b4a633911ec65e87ca326cf677986ce (diff) | |
download | taskflow-17e0f16ce9aa4ff8837416098975be17b3607dba.tar.gz |
Merge "Add WBE worker expiry"1.28.0
-rw-r--r-- | taskflow/engines/worker_based/engine.py | 11 | ||||
-rw-r--r-- | taskflow/engines/worker_based/executor.py | 8 | ||||
-rw-r--r-- | taskflow/engines/worker_based/protocol.py | 4 | ||||
-rw-r--r-- | taskflow/engines/worker_based/types.py | 42 | ||||
-rw-r--r-- | taskflow/tests/unit/worker_based/test_creation.py | 9 | ||||
-rw-r--r-- | taskflow/tests/unit/worker_based/test_types.py | 16 |
6 files changed, 78 insertions, 12 deletions
diff --git a/taskflow/engines/worker_based/engine.py b/taskflow/engines/worker_based/engine.py index a22a5d9..52a30f6 100644 --- a/taskflow/engines/worker_based/engine.py +++ b/taskflow/engines/worker_based/engine.py @@ -44,6 +44,12 @@ class WorkerBasedActionEngine(engine.ActionEngine): options imply and are expected to be) :param retry_options: retry specific options (see: :py:attr:`~.proxy.Proxy.DEFAULT_RETRY_OPTIONS`) + :param worker_expiry: numeric value (or negative/zero/None for + infinite) that defines the number of seconds to + continue to send messages to workers that + have **not** responded back to a prior + notification/ping request (this defaults + to 60 seconds). """ def __init__(self, flow, flow_detail, backend, options): @@ -73,4 +79,7 @@ class WorkerBasedActionEngine(engine.ActionEngine): transport=options.get('transport'), transport_options=options.get('transport_options'), transition_timeout=options.get('transition_timeout', - pr.REQUEST_TIMEOUT)) + pr.REQUEST_TIMEOUT), + worker_expiry=options.get('worker_expiry', + pr.EXPIRES_AFTER), + ) diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index dfbe5b3..70c448b 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -41,7 +41,7 @@ class WorkerTaskExecutor(executor.TaskExecutor): def __init__(self, uuid, exchange, topics, transition_timeout=pr.REQUEST_TIMEOUT, url=None, transport=None, transport_options=None, - retry_options=None): + retry_options=None, worker_expiry=pr.EXPIRES_AFTER): self._uuid = uuid self._ongoing_requests = {} self._ongoing_requests_lock = threading.RLock() @@ -57,7 +57,8 @@ class WorkerTaskExecutor(executor.TaskExecutor): # to workers to 'learn' of the tasks they can perform (and requires # pre-existing knowledge of the topics those workers are on to gather # and update this information). - self._finder = wt.ProxyWorkerFinder(uuid, self._proxy, topics) + self._finder = wt.ProxyWorkerFinder(uuid, self._proxy, topics, + worker_expiry=worker_expiry) self._proxy.dispatcher.type_handlers.update({ pr.RESPONSE: dispatcher.Handler(self._process_response, validator=pr.Response.validate), @@ -181,6 +182,9 @@ class WorkerTaskExecutor(executor.TaskExecutor): """This function is called cyclically between draining events.""" # Publish any finding messages (used to locate workers). self._finder.maybe_publish() + # If the finder hasn't heard from workers in a given amount + # of time, then those workers are likely dead, so clean them out... + self._finder.clean() # Process any expired requests or requests that have no current # worker located (publish messages for those if we now do have # a worker located). diff --git a/taskflow/engines/worker_based/protocol.py b/taskflow/engines/worker_based/protocol.py index bd95501..1784ab3 100644 --- a/taskflow/engines/worker_based/protocol.py +++ b/taskflow/engines/worker_based/protocol.py @@ -93,6 +93,10 @@ QUEUE_EXPIRE_TIMEOUT = REQUEST_TIMEOUT # Workers notify period. NOTIFY_PERIOD = 5 +# When a worker hasn't notified in this many seconds, it will get expired from +# being used/targeted for further work. +EXPIRES_AFTER = 60 + # Message types. NOTIFY = 'NOTIFY' REQUEST = 'REQUEST' diff --git a/taskflow/engines/worker_based/types.py b/taskflow/engines/worker_based/types.py index bd6df5f..9b660b5 100644 --- a/taskflow/engines/worker_based/types.py +++ b/taskflow/engines/worker_based/types.py @@ -44,6 +44,7 @@ class TopicWorker(object): self.tasks.append(task) self.topic = topic self.identity = identity + self.last_seen = None def performs(self, task): if not isinstance(task, six.string_types): @@ -80,7 +81,8 @@ class ProxyWorkerFinder(object): """Requests and receives responses about workers topic+task details.""" def __init__(self, uuid, proxy, topics, - beat_periodicity=pr.NOTIFY_PERIOD): + beat_periodicity=pr.NOTIFY_PERIOD, + worker_expiry=pr.EXPIRES_AFTER): self._cond = threading.Condition() self._proxy = proxy self._topics = topics @@ -89,8 +91,10 @@ class ProxyWorkerFinder(object): self._seen_workers = 0 self._messages_processed = 0 self._messages_published = 0 + self._worker_expiry = worker_expiry self._watch = timeutils.StopWatch(duration=beat_periodicity) + @property def total_workers(self): """Number of workers currently known.""" return len(self._workers) @@ -109,9 +113,9 @@ class ProxyWorkerFinder(object): watch = timeutils.StopWatch(duration=timeout) watch.start() with self._cond: - while len(self._workers) < workers: + while self.total_workers < workers: if watch.expired(): - return max(0, workers - len(self._workers)) + return max(0, workers - self.total_workers) self._cond.wait(watch.leftover(return_none=True)) return 0 @@ -192,11 +196,41 @@ class ProxyWorkerFinder(object): response.tasks) if new_or_updated: LOG.debug("Updated worker '%s' (%s total workers are" - " currently known)", worker, len(self._workers)) + " currently known)", worker, self.total_workers) self._cond.notify_all() + worker.last_seen = timeutils.now() self._messages_processed += 1 + def clean(self): + """Cleans out any dead/expired/not responding workers. + + Returns how many workers were removed. + """ + if (not self._workers or + (self._worker_expiry is None or self._worker_expiry <= 0)): + return 0 + dead_workers = {} + with self._cond: + now = timeutils.now() + for topic, worker in six.iteritems(self._workers): + if worker.last_seen is None: + continue + secs_since_last_seen = max(0, now - worker.last_seen) + if secs_since_last_seen >= self._worker_expiry: + dead_workers[topic] = (worker, secs_since_last_seen) + for topic in six.iterkeys(dead_workers): + self._workers.pop(topic) + if dead_workers: + self._cond.notify_all() + if dead_workers and LOG.isEnabledFor(logging.INFO): + for worker, secs_since_last_seen in six.itervalues(dead_workers): + LOG.info("Removed worker '%s' as it has not responded to" + " notification requests in %0.3f seconds", + worker, secs_since_last_seen) + return len(dead_workers) + def reset(self): + """Resets finders internal state.""" with self._cond: self._workers.clear() self._messages_processed = 0 diff --git a/taskflow/tests/unit/worker_based/test_creation.py b/taskflow/tests/unit/worker_based/test_creation.py index 5b689de..7d2b75d 100644 --- a/taskflow/tests/unit/worker_based/test_creation.py +++ b/taskflow/tests/unit/worker_based/test_creation.py @@ -50,7 +50,8 @@ class TestWorkerBasedActionEngine(test.MockTestCase): transport=None, transport_options=None, transition_timeout=mock.ANY, - retry_options=None) + retry_options=None, + worker_expiry=mock.ANY) ] self.assertEqual(expected_calls, self.master_mock.mock_calls) @@ -66,7 +67,8 @@ class TestWorkerBasedActionEngine(test.MockTestCase): transport_options={}, transition_timeout=200, topics=topics, - retry_options={}) + retry_options={}, + worker_expiry=1) expected_calls = [ mock.call.executor_class(uuid=eng.storage.flow_uuid, url=broker_url, @@ -75,7 +77,8 @@ class TestWorkerBasedActionEngine(test.MockTestCase): transport='memory', transport_options={}, transition_timeout=200, - retry_options={}) + retry_options={}, + worker_expiry=1) ] self.assertEqual(expected_calls, self.master_mock.mock_calls) diff --git a/taskflow/tests/unit/worker_based/test_types.py b/taskflow/tests/unit/worker_based/test_types.py index 6809aef..e81aa79 100644 --- a/taskflow/tests/unit/worker_based/test_types.py +++ b/taskflow/tests/unit/worker_based/test_types.py @@ -33,12 +33,24 @@ class TestTopicWorker(test.TestCase): class TestProxyFinder(test.TestCase): + + @mock.patch("oslo_utils.timeutils.now") + def test_expiry(self, mock_now): + finder = worker_types.ProxyWorkerFinder('me', mock.MagicMock(), [], + worker_expiry=60) + w, emit = finder._add('dummy-topic', [utils.DummyTask]) + w.last_seen = 0 + mock_now.side_effect = [120] + gone = finder.clean() + self.assertEqual(0, finder.total_workers) + self.assertEqual(1, gone) + def test_single_topic_worker(self): finder = worker_types.ProxyWorkerFinder('me', mock.MagicMock(), []) w, emit = finder._add('dummy-topic', [utils.DummyTask]) self.assertIsNotNone(w) self.assertTrue(emit) - self.assertEqual(1, finder.total_workers()) + self.assertEqual(1, finder.total_workers) w2 = finder.get_worker_for_task(utils.DummyTask) self.assertEqual(w.identity, w2.identity) @@ -60,7 +72,7 @@ class TestProxyFinder(test.TestCase): added.append(finder._add('dummy-topic', [utils.DummyTask])) added.append(finder._add('dummy-topic-2', [utils.DummyTask])) added.append(finder._add('dummy-topic-3', [utils.NastyTask])) - self.assertEqual(3, finder.total_workers()) + self.assertEqual(3, finder.total_workers) w = finder.get_worker_for_task(utils.NastyTask) self.assertEqual(added[-1][0].identity, w.identity) w = finder.get_worker_for_task(utils.DummyTask) |