diff options
| author | Joshua Harlow <harlowja@yahoo-inc.com> | 2016-02-04 18:09:24 -0800 |
|---|---|---|
| committer | Joshua Harlow <harlowja@yahoo-inc.com> | 2016-02-05 14:40:25 -0800 |
| commit | cea71f27998cfa911044103fcb8fca79b6989717 (patch) | |
| tree | 57e539cd0f2f7ae517040ef4976cf45ff66def28 /taskflow/tests/unit | |
| parent | 61efc31e965b8defac84aa494095fd3734f739eb (diff) | |
| download | taskflow-cea71f27998cfa911044103fcb8fca79b6989717.tar.gz | |
Fix for WBE sporadic timeout of tasks
This fixes the sporadic of tasks that would happen
under certain circumstances. What happened was that
a new worker notification would be sent to a callback
while at the same time a task submission would come in
and there would be a small race period where the task
would insert itself into the requests cache while the
callback was processing.
So to work around this the whole concept of a requests
cache was revamped and now the WBE executor just maintains
its own local dictionary of ongoing requests and accesses
it safely.
During the on_wait function that is periodically called
by kombu the previous expiry of work happens but now any
requests that are pending are matched to any new workers
that may have appeared.
This avoids the race (and ensures that even if a new
worker is found but a submission is in progress that the
duration until that submission happens will only be until
the next on_wait call happens).
Related-Bug: #1431097
Change-Id: I98b0caeedc77ab2f7214847763ae1eb0433d4a78
Diffstat (limited to 'taskflow/tests/unit')
| -rw-r--r-- | taskflow/tests/unit/worker_based/test_executor.py | 56 | ||||
| -rw-r--r-- | taskflow/tests/unit/worker_based/test_types.py | 51 |
2 files changed, 19 insertions, 88 deletions
diff --git a/taskflow/tests/unit/worker_based/test_executor.py b/taskflow/tests/unit/worker_based/test_executor.py index d01a294..372f48b 100644 --- a/taskflow/tests/unit/worker_based/test_executor.py +++ b/taskflow/tests/unit/worker_based/test_executor.py @@ -17,9 +17,6 @@ import threading import time -from oslo_utils import fixture -from oslo_utils import timeutils - from taskflow.engines.worker_based import executor from taskflow.engines.worker_based import protocol as pr from taskflow import task as task_atom @@ -56,6 +53,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): self.proxy_inst_mock.stop.side_effect = self._fake_proxy_stop self.request_inst_mock.uuid = self.task_uuid self.request_inst_mock.expired = False + self.request_inst_mock.created_on = 0 self.request_inst_mock.task_cls = self.task.name self.message_mock = mock.MagicMock(name='message') self.message_mock.properties = {'correlation_id': self.task_uuid, @@ -96,7 +94,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): def test_on_message_response_state_running(self): response = pr.Response(pr.RUNNING) ex = self.executor() - ex._requests_cache[self.task_uuid] = self.request_inst_mock + ex._ongoing_requests[self.task_uuid] = self.request_inst_mock ex._process_response(response.to_dict(), self.message_mock) expected_calls = [ @@ -109,7 +107,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): event_type=task_atom.EVENT_UPDATE_PROGRESS, details={'progress': 1.0}) ex = self.executor() - ex._requests_cache[self.task_uuid] = self.request_inst_mock + ex._ongoing_requests[self.task_uuid] = self.request_inst_mock ex._process_response(response.to_dict(), self.message_mock) expected_calls = [ @@ -123,10 +121,10 @@ class TestWorkerTaskExecutor(test.MockTestCase): failure_dict = a_failure.to_dict() response = pr.Response(pr.FAILURE, result=failure_dict) ex = self.executor() - ex._requests_cache[self.task_uuid] = self.request_inst_mock + ex._ongoing_requests[self.task_uuid] = self.request_inst_mock ex._process_response(response.to_dict(), self.message_mock) - self.assertEqual(0, len(ex._requests_cache)) + self.assertEqual(0, len(ex._ongoing_requests)) expected_calls = [ mock.call.transition_and_log_error(pr.FAILURE, logger=mock.ANY), mock.call.set_result(result=test_utils.FailureMatcher(a_failure)) @@ -137,7 +135,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): response = pr.Response(pr.SUCCESS, result=self.task_result, event='executed') ex = self.executor() - ex._requests_cache[self.task_uuid] = self.request_inst_mock + ex._ongoing_requests[self.task_uuid] = self.request_inst_mock ex._process_response(response.to_dict(), self.message_mock) expected_calls = [ @@ -149,7 +147,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): def test_on_message_response_unknown_state(self): response = pr.Response(state='<unknown>') ex = self.executor() - ex._requests_cache[self.task_uuid] = self.request_inst_mock + ex._ongoing_requests[self.task_uuid] = self.request_inst_mock ex._process_response(response.to_dict(), self.message_mock) self.assertEqual([], self.request_inst_mock.mock_calls) @@ -158,7 +156,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): self.message_mock.properties['correlation_id'] = '<unknown>' response = pr.Response(pr.RUNNING) ex = self.executor() - ex._requests_cache[self.task_uuid] = self.request_inst_mock + ex._ongoing_requests[self.task_uuid] = self.request_inst_mock ex._process_response(response.to_dict(), self.message_mock) self.assertEqual([], self.request_inst_mock.mock_calls) @@ -167,48 +165,32 @@ class TestWorkerTaskExecutor(test.MockTestCase): self.message_mock.properties = {'type': pr.RESPONSE} response = pr.Response(pr.RUNNING) ex = self.executor() - ex._requests_cache[self.task_uuid] = self.request_inst_mock + ex._ongoing_requests[self.task_uuid] = self.request_inst_mock ex._process_response(response.to_dict(), self.message_mock) self.assertEqual([], self.request_inst_mock.mock_calls) def test_on_wait_task_not_expired(self): ex = self.executor() - ex._requests_cache[self.task_uuid] = self.request_inst_mock + ex._ongoing_requests[self.task_uuid] = self.request_inst_mock - self.assertEqual(1, len(ex._requests_cache)) + self.assertEqual(1, len(ex._ongoing_requests)) ex._on_wait() - self.assertEqual(1, len(ex._requests_cache)) + self.assertEqual(1, len(ex._ongoing_requests)) - def test_on_wait_task_expired(self): - now = timeutils.utcnow() - f = self.useFixture(fixture.TimeFixture(override_time=now)) + @mock.patch('oslo_utils.timeutils.now') + def test_on_wait_task_expired(self, mock_now): + mock_now.side_effect = [0, 120] self.request_inst_mock.expired = True - self.request_inst_mock.created_on = now + self.request_inst_mock.created_on = 0 - f.advance_time_seconds(120) ex = self.executor() - ex._requests_cache[self.task_uuid] = self.request_inst_mock + ex._ongoing_requests[self.task_uuid] = self.request_inst_mock + self.assertEqual(1, len(ex._ongoing_requests)) - self.assertEqual(1, len(ex._requests_cache)) ex._on_wait() - self.assertEqual(0, len(ex._requests_cache)) - - def test_remove_task_non_existent(self): - ex = self.executor() - ex._requests_cache[self.task_uuid] = self.request_inst_mock - - self.assertEqual(1, len(ex._requests_cache)) - del ex._requests_cache[self.task_uuid] - self.assertEqual(0, len(ex._requests_cache)) - - # delete non-existent - try: - del ex._requests_cache[self.task_uuid] - except KeyError: - pass - self.assertEqual(0, len(ex._requests_cache)) + self.assertEqual(0, len(ex._ongoing_requests)) def test_execute_task(self): ex = self.executor() diff --git a/taskflow/tests/unit/worker_based/test_types.py b/taskflow/tests/unit/worker_based/test_types.py index 095bf1e..7ee4847 100644 --- a/taskflow/tests/unit/worker_based/test_types.py +++ b/taskflow/tests/unit/worker_based/test_types.py @@ -16,63 +16,12 @@ from oslo_utils import reflection -from taskflow.engines.worker_based import protocol as pr from taskflow.engines.worker_based import types as worker_types from taskflow import test from taskflow.test import mock from taskflow.tests import utils -class TestRequestCache(test.TestCase): - - def setUp(self): - super(TestRequestCache, self).setUp() - self.task = utils.DummyTask() - self.task_uuid = 'task-uuid' - self.task_action = 'execute' - self.task_args = {'a': 'a'} - self.timeout = 60 - - def request(self, **kwargs): - request_kwargs = dict(task=self.task, - uuid=self.task_uuid, - action=self.task_action, - arguments=self.task_args, - progress_callback=None, - timeout=self.timeout) - request_kwargs.update(kwargs) - return pr.Request(**request_kwargs) - - @mock.patch('oslo_utils.timeutils.now') - def test_requests_cache_expiry(self, now): - # Mock out the calls the underlying objects will soon use to return - # times that we can control more easily... - overrides = [ - 0, - 1, - self.timeout + 1, - ] - now.side_effect = overrides - - cache = worker_types.RequestsCache() - cache[self.task_uuid] = self.request() - cache.cleanup() - self.assertEqual(1, len(cache)) - cache.cleanup() - self.assertEqual(0, len(cache)) - - def test_requests_cache_match(self): - cache = worker_types.RequestsCache() - cache[self.task_uuid] = self.request() - cache['task-uuid-2'] = self.request(task=utils.NastyTask(), - uuid='task-uuid-2') - worker = worker_types.TopicWorker("dummy-topic", [utils.DummyTask], - identity="dummy") - matches = cache.get_waiting_requests(worker) - self.assertEqual(1, len(matches)) - self.assertEqual(2, len(cache)) - - class TestTopicWorker(test.TestCase): def test_topic_worker(self): worker = worker_types.TopicWorker("dummy-topic", |
