summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2016-02-15 05:22:43 +0000
committerGerrit Code Review <review@openstack.org>2016-02-15 05:22:43 +0000
commit17e0f16ce9aa4ff8837416098975be17b3607dba (patch)
tree868a033bc5ff13b8b903f0f8f24a366aff66ae87
parent9da3ceacff761c4695fa363a83cdcca5b690b5f5 (diff)
parent63b380fc7b4a633911ec65e87ca326cf677986ce (diff)
downloadtaskflow-17e0f16ce9aa4ff8837416098975be17b3607dba.tar.gz
Merge "Add WBE worker expiry"1.28.0
-rw-r--r--taskflow/engines/worker_based/engine.py11
-rw-r--r--taskflow/engines/worker_based/executor.py8
-rw-r--r--taskflow/engines/worker_based/protocol.py4
-rw-r--r--taskflow/engines/worker_based/types.py42
-rw-r--r--taskflow/tests/unit/worker_based/test_creation.py9
-rw-r--r--taskflow/tests/unit/worker_based/test_types.py16
6 files changed, 78 insertions, 12 deletions
diff --git a/taskflow/engines/worker_based/engine.py b/taskflow/engines/worker_based/engine.py
index a22a5d9..52a30f6 100644
--- a/taskflow/engines/worker_based/engine.py
+++ b/taskflow/engines/worker_based/engine.py
@@ -44,6 +44,12 @@ class WorkerBasedActionEngine(engine.ActionEngine):
options imply and are expected to be)
:param retry_options: retry specific options
(see: :py:attr:`~.proxy.Proxy.DEFAULT_RETRY_OPTIONS`)
+ :param worker_expiry: numeric value (or negative/zero/None for
+ infinite) that defines the number of seconds to
+ continue to send messages to workers that
+ have **not** responded back to a prior
+ notification/ping request (this defaults
+ to 60 seconds).
"""
def __init__(self, flow, flow_detail, backend, options):
@@ -73,4 +79,7 @@ class WorkerBasedActionEngine(engine.ActionEngine):
transport=options.get('transport'),
transport_options=options.get('transport_options'),
transition_timeout=options.get('transition_timeout',
- pr.REQUEST_TIMEOUT))
+ pr.REQUEST_TIMEOUT),
+ worker_expiry=options.get('worker_expiry',
+ pr.EXPIRES_AFTER),
+ )
diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py
index dfbe5b3..70c448b 100644
--- a/taskflow/engines/worker_based/executor.py
+++ b/taskflow/engines/worker_based/executor.py
@@ -41,7 +41,7 @@ class WorkerTaskExecutor(executor.TaskExecutor):
def __init__(self, uuid, exchange, topics,
transition_timeout=pr.REQUEST_TIMEOUT,
url=None, transport=None, transport_options=None,
- retry_options=None):
+ retry_options=None, worker_expiry=pr.EXPIRES_AFTER):
self._uuid = uuid
self._ongoing_requests = {}
self._ongoing_requests_lock = threading.RLock()
@@ -57,7 +57,8 @@ class WorkerTaskExecutor(executor.TaskExecutor):
# to workers to 'learn' of the tasks they can perform (and requires
# pre-existing knowledge of the topics those workers are on to gather
# and update this information).
- self._finder = wt.ProxyWorkerFinder(uuid, self._proxy, topics)
+ self._finder = wt.ProxyWorkerFinder(uuid, self._proxy, topics,
+ worker_expiry=worker_expiry)
self._proxy.dispatcher.type_handlers.update({
pr.RESPONSE: dispatcher.Handler(self._process_response,
validator=pr.Response.validate),
@@ -181,6 +182,9 @@ class WorkerTaskExecutor(executor.TaskExecutor):
"""This function is called cyclically between draining events."""
# Publish any finding messages (used to locate workers).
self._finder.maybe_publish()
+ # If the finder hasn't heard from workers in a given amount
+ # of time, then those workers are likely dead, so clean them out...
+ self._finder.clean()
# Process any expired requests or requests that have no current
# worker located (publish messages for those if we now do have
# a worker located).
diff --git a/taskflow/engines/worker_based/protocol.py b/taskflow/engines/worker_based/protocol.py
index bd95501..1784ab3 100644
--- a/taskflow/engines/worker_based/protocol.py
+++ b/taskflow/engines/worker_based/protocol.py
@@ -93,6 +93,10 @@ QUEUE_EXPIRE_TIMEOUT = REQUEST_TIMEOUT
# Workers notify period.
NOTIFY_PERIOD = 5
+# When a worker hasn't notified in this many seconds, it will get expired from
+# being used/targeted for further work.
+EXPIRES_AFTER = 60
+
# Message types.
NOTIFY = 'NOTIFY'
REQUEST = 'REQUEST'
diff --git a/taskflow/engines/worker_based/types.py b/taskflow/engines/worker_based/types.py
index bd6df5f..9b660b5 100644
--- a/taskflow/engines/worker_based/types.py
+++ b/taskflow/engines/worker_based/types.py
@@ -44,6 +44,7 @@ class TopicWorker(object):
self.tasks.append(task)
self.topic = topic
self.identity = identity
+ self.last_seen = None
def performs(self, task):
if not isinstance(task, six.string_types):
@@ -80,7 +81,8 @@ class ProxyWorkerFinder(object):
"""Requests and receives responses about workers topic+task details."""
def __init__(self, uuid, proxy, topics,
- beat_periodicity=pr.NOTIFY_PERIOD):
+ beat_periodicity=pr.NOTIFY_PERIOD,
+ worker_expiry=pr.EXPIRES_AFTER):
self._cond = threading.Condition()
self._proxy = proxy
self._topics = topics
@@ -89,8 +91,10 @@ class ProxyWorkerFinder(object):
self._seen_workers = 0
self._messages_processed = 0
self._messages_published = 0
+ self._worker_expiry = worker_expiry
self._watch = timeutils.StopWatch(duration=beat_periodicity)
+ @property
def total_workers(self):
"""Number of workers currently known."""
return len(self._workers)
@@ -109,9 +113,9 @@ class ProxyWorkerFinder(object):
watch = timeutils.StopWatch(duration=timeout)
watch.start()
with self._cond:
- while len(self._workers) < workers:
+ while self.total_workers < workers:
if watch.expired():
- return max(0, workers - len(self._workers))
+ return max(0, workers - self.total_workers)
self._cond.wait(watch.leftover(return_none=True))
return 0
@@ -192,11 +196,41 @@ class ProxyWorkerFinder(object):
response.tasks)
if new_or_updated:
LOG.debug("Updated worker '%s' (%s total workers are"
- " currently known)", worker, len(self._workers))
+ " currently known)", worker, self.total_workers)
self._cond.notify_all()
+ worker.last_seen = timeutils.now()
self._messages_processed += 1
+ def clean(self):
+ """Cleans out any dead/expired/not responding workers.
+
+ Returns how many workers were removed.
+ """
+ if (not self._workers or
+ (self._worker_expiry is None or self._worker_expiry <= 0)):
+ return 0
+ dead_workers = {}
+ with self._cond:
+ now = timeutils.now()
+ for topic, worker in six.iteritems(self._workers):
+ if worker.last_seen is None:
+ continue
+ secs_since_last_seen = max(0, now - worker.last_seen)
+ if secs_since_last_seen >= self._worker_expiry:
+ dead_workers[topic] = (worker, secs_since_last_seen)
+ for topic in six.iterkeys(dead_workers):
+ self._workers.pop(topic)
+ if dead_workers:
+ self._cond.notify_all()
+ if dead_workers and LOG.isEnabledFor(logging.INFO):
+ for worker, secs_since_last_seen in six.itervalues(dead_workers):
+ LOG.info("Removed worker '%s' as it has not responded to"
+ " notification requests in %0.3f seconds",
+ worker, secs_since_last_seen)
+ return len(dead_workers)
+
def reset(self):
+ """Resets finders internal state."""
with self._cond:
self._workers.clear()
self._messages_processed = 0
diff --git a/taskflow/tests/unit/worker_based/test_creation.py b/taskflow/tests/unit/worker_based/test_creation.py
index 5b689de..7d2b75d 100644
--- a/taskflow/tests/unit/worker_based/test_creation.py
+++ b/taskflow/tests/unit/worker_based/test_creation.py
@@ -50,7 +50,8 @@ class TestWorkerBasedActionEngine(test.MockTestCase):
transport=None,
transport_options=None,
transition_timeout=mock.ANY,
- retry_options=None)
+ retry_options=None,
+ worker_expiry=mock.ANY)
]
self.assertEqual(expected_calls, self.master_mock.mock_calls)
@@ -66,7 +67,8 @@ class TestWorkerBasedActionEngine(test.MockTestCase):
transport_options={},
transition_timeout=200,
topics=topics,
- retry_options={})
+ retry_options={},
+ worker_expiry=1)
expected_calls = [
mock.call.executor_class(uuid=eng.storage.flow_uuid,
url=broker_url,
@@ -75,7 +77,8 @@ class TestWorkerBasedActionEngine(test.MockTestCase):
transport='memory',
transport_options={},
transition_timeout=200,
- retry_options={})
+ retry_options={},
+ worker_expiry=1)
]
self.assertEqual(expected_calls, self.master_mock.mock_calls)
diff --git a/taskflow/tests/unit/worker_based/test_types.py b/taskflow/tests/unit/worker_based/test_types.py
index 6809aef..e81aa79 100644
--- a/taskflow/tests/unit/worker_based/test_types.py
+++ b/taskflow/tests/unit/worker_based/test_types.py
@@ -33,12 +33,24 @@ class TestTopicWorker(test.TestCase):
class TestProxyFinder(test.TestCase):
+
+ @mock.patch("oslo_utils.timeutils.now")
+ def test_expiry(self, mock_now):
+ finder = worker_types.ProxyWorkerFinder('me', mock.MagicMock(), [],
+ worker_expiry=60)
+ w, emit = finder._add('dummy-topic', [utils.DummyTask])
+ w.last_seen = 0
+ mock_now.side_effect = [120]
+ gone = finder.clean()
+ self.assertEqual(0, finder.total_workers)
+ self.assertEqual(1, gone)
+
def test_single_topic_worker(self):
finder = worker_types.ProxyWorkerFinder('me', mock.MagicMock(), [])
w, emit = finder._add('dummy-topic', [utils.DummyTask])
self.assertIsNotNone(w)
self.assertTrue(emit)
- self.assertEqual(1, finder.total_workers())
+ self.assertEqual(1, finder.total_workers)
w2 = finder.get_worker_for_task(utils.DummyTask)
self.assertEqual(w.identity, w2.identity)
@@ -60,7 +72,7 @@ class TestProxyFinder(test.TestCase):
added.append(finder._add('dummy-topic', [utils.DummyTask]))
added.append(finder._add('dummy-topic-2', [utils.DummyTask]))
added.append(finder._add('dummy-topic-3', [utils.NastyTask]))
- self.assertEqual(3, finder.total_workers())
+ self.assertEqual(3, finder.total_workers)
w = finder.get_worker_for_task(utils.NastyTask)
self.assertEqual(added[-1][0].identity, w.identity)
w = finder.get_worker_for_task(utils.DummyTask)