summaryrefslogtreecommitdiff
path: root/taskflow/tests/unit
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2016-02-04 18:09:24 -0800
committerJoshua Harlow <harlowja@yahoo-inc.com>2016-02-05 14:40:25 -0800
commitcea71f27998cfa911044103fcb8fca79b6989717 (patch)
tree57e539cd0f2f7ae517040ef4976cf45ff66def28 /taskflow/tests/unit
parent61efc31e965b8defac84aa494095fd3734f739eb (diff)
downloadtaskflow-cea71f27998cfa911044103fcb8fca79b6989717.tar.gz
Fix for WBE sporadic timeout of tasks
This fixes the sporadic of tasks that would happen under certain circumstances. What happened was that a new worker notification would be sent to a callback while at the same time a task submission would come in and there would be a small race period where the task would insert itself into the requests cache while the callback was processing. So to work around this the whole concept of a requests cache was revamped and now the WBE executor just maintains its own local dictionary of ongoing requests and accesses it safely. During the on_wait function that is periodically called by kombu the previous expiry of work happens but now any requests that are pending are matched to any new workers that may have appeared. This avoids the race (and ensures that even if a new worker is found but a submission is in progress that the duration until that submission happens will only be until the next on_wait call happens). Related-Bug: #1431097 Change-Id: I98b0caeedc77ab2f7214847763ae1eb0433d4a78
Diffstat (limited to 'taskflow/tests/unit')
-rw-r--r--taskflow/tests/unit/worker_based/test_executor.py56
-rw-r--r--taskflow/tests/unit/worker_based/test_types.py51
2 files changed, 19 insertions, 88 deletions
diff --git a/taskflow/tests/unit/worker_based/test_executor.py b/taskflow/tests/unit/worker_based/test_executor.py
index d01a294..372f48b 100644
--- a/taskflow/tests/unit/worker_based/test_executor.py
+++ b/taskflow/tests/unit/worker_based/test_executor.py
@@ -17,9 +17,6 @@
import threading
import time
-from oslo_utils import fixture
-from oslo_utils import timeutils
-
from taskflow.engines.worker_based import executor
from taskflow.engines.worker_based import protocol as pr
from taskflow import task as task_atom
@@ -56,6 +53,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
self.proxy_inst_mock.stop.side_effect = self._fake_proxy_stop
self.request_inst_mock.uuid = self.task_uuid
self.request_inst_mock.expired = False
+ self.request_inst_mock.created_on = 0
self.request_inst_mock.task_cls = self.task.name
self.message_mock = mock.MagicMock(name='message')
self.message_mock.properties = {'correlation_id': self.task_uuid,
@@ -96,7 +94,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
def test_on_message_response_state_running(self):
response = pr.Response(pr.RUNNING)
ex = self.executor()
- ex._requests_cache[self.task_uuid] = self.request_inst_mock
+ ex._ongoing_requests[self.task_uuid] = self.request_inst_mock
ex._process_response(response.to_dict(), self.message_mock)
expected_calls = [
@@ -109,7 +107,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
event_type=task_atom.EVENT_UPDATE_PROGRESS,
details={'progress': 1.0})
ex = self.executor()
- ex._requests_cache[self.task_uuid] = self.request_inst_mock
+ ex._ongoing_requests[self.task_uuid] = self.request_inst_mock
ex._process_response(response.to_dict(), self.message_mock)
expected_calls = [
@@ -123,10 +121,10 @@ class TestWorkerTaskExecutor(test.MockTestCase):
failure_dict = a_failure.to_dict()
response = pr.Response(pr.FAILURE, result=failure_dict)
ex = self.executor()
- ex._requests_cache[self.task_uuid] = self.request_inst_mock
+ ex._ongoing_requests[self.task_uuid] = self.request_inst_mock
ex._process_response(response.to_dict(), self.message_mock)
- self.assertEqual(0, len(ex._requests_cache))
+ self.assertEqual(0, len(ex._ongoing_requests))
expected_calls = [
mock.call.transition_and_log_error(pr.FAILURE, logger=mock.ANY),
mock.call.set_result(result=test_utils.FailureMatcher(a_failure))
@@ -137,7 +135,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
response = pr.Response(pr.SUCCESS, result=self.task_result,
event='executed')
ex = self.executor()
- ex._requests_cache[self.task_uuid] = self.request_inst_mock
+ ex._ongoing_requests[self.task_uuid] = self.request_inst_mock
ex._process_response(response.to_dict(), self.message_mock)
expected_calls = [
@@ -149,7 +147,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
def test_on_message_response_unknown_state(self):
response = pr.Response(state='<unknown>')
ex = self.executor()
- ex._requests_cache[self.task_uuid] = self.request_inst_mock
+ ex._ongoing_requests[self.task_uuid] = self.request_inst_mock
ex._process_response(response.to_dict(), self.message_mock)
self.assertEqual([], self.request_inst_mock.mock_calls)
@@ -158,7 +156,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
self.message_mock.properties['correlation_id'] = '<unknown>'
response = pr.Response(pr.RUNNING)
ex = self.executor()
- ex._requests_cache[self.task_uuid] = self.request_inst_mock
+ ex._ongoing_requests[self.task_uuid] = self.request_inst_mock
ex._process_response(response.to_dict(), self.message_mock)
self.assertEqual([], self.request_inst_mock.mock_calls)
@@ -167,48 +165,32 @@ class TestWorkerTaskExecutor(test.MockTestCase):
self.message_mock.properties = {'type': pr.RESPONSE}
response = pr.Response(pr.RUNNING)
ex = self.executor()
- ex._requests_cache[self.task_uuid] = self.request_inst_mock
+ ex._ongoing_requests[self.task_uuid] = self.request_inst_mock
ex._process_response(response.to_dict(), self.message_mock)
self.assertEqual([], self.request_inst_mock.mock_calls)
def test_on_wait_task_not_expired(self):
ex = self.executor()
- ex._requests_cache[self.task_uuid] = self.request_inst_mock
+ ex._ongoing_requests[self.task_uuid] = self.request_inst_mock
- self.assertEqual(1, len(ex._requests_cache))
+ self.assertEqual(1, len(ex._ongoing_requests))
ex._on_wait()
- self.assertEqual(1, len(ex._requests_cache))
+ self.assertEqual(1, len(ex._ongoing_requests))
- def test_on_wait_task_expired(self):
- now = timeutils.utcnow()
- f = self.useFixture(fixture.TimeFixture(override_time=now))
+ @mock.patch('oslo_utils.timeutils.now')
+ def test_on_wait_task_expired(self, mock_now):
+ mock_now.side_effect = [0, 120]
self.request_inst_mock.expired = True
- self.request_inst_mock.created_on = now
+ self.request_inst_mock.created_on = 0
- f.advance_time_seconds(120)
ex = self.executor()
- ex._requests_cache[self.task_uuid] = self.request_inst_mock
+ ex._ongoing_requests[self.task_uuid] = self.request_inst_mock
+ self.assertEqual(1, len(ex._ongoing_requests))
- self.assertEqual(1, len(ex._requests_cache))
ex._on_wait()
- self.assertEqual(0, len(ex._requests_cache))
-
- def test_remove_task_non_existent(self):
- ex = self.executor()
- ex._requests_cache[self.task_uuid] = self.request_inst_mock
-
- self.assertEqual(1, len(ex._requests_cache))
- del ex._requests_cache[self.task_uuid]
- self.assertEqual(0, len(ex._requests_cache))
-
- # delete non-existent
- try:
- del ex._requests_cache[self.task_uuid]
- except KeyError:
- pass
- self.assertEqual(0, len(ex._requests_cache))
+ self.assertEqual(0, len(ex._ongoing_requests))
def test_execute_task(self):
ex = self.executor()
diff --git a/taskflow/tests/unit/worker_based/test_types.py b/taskflow/tests/unit/worker_based/test_types.py
index 095bf1e..7ee4847 100644
--- a/taskflow/tests/unit/worker_based/test_types.py
+++ b/taskflow/tests/unit/worker_based/test_types.py
@@ -16,63 +16,12 @@
from oslo_utils import reflection
-from taskflow.engines.worker_based import protocol as pr
from taskflow.engines.worker_based import types as worker_types
from taskflow import test
from taskflow.test import mock
from taskflow.tests import utils
-class TestRequestCache(test.TestCase):
-
- def setUp(self):
- super(TestRequestCache, self).setUp()
- self.task = utils.DummyTask()
- self.task_uuid = 'task-uuid'
- self.task_action = 'execute'
- self.task_args = {'a': 'a'}
- self.timeout = 60
-
- def request(self, **kwargs):
- request_kwargs = dict(task=self.task,
- uuid=self.task_uuid,
- action=self.task_action,
- arguments=self.task_args,
- progress_callback=None,
- timeout=self.timeout)
- request_kwargs.update(kwargs)
- return pr.Request(**request_kwargs)
-
- @mock.patch('oslo_utils.timeutils.now')
- def test_requests_cache_expiry(self, now):
- # Mock out the calls the underlying objects will soon use to return
- # times that we can control more easily...
- overrides = [
- 0,
- 1,
- self.timeout + 1,
- ]
- now.side_effect = overrides
-
- cache = worker_types.RequestsCache()
- cache[self.task_uuid] = self.request()
- cache.cleanup()
- self.assertEqual(1, len(cache))
- cache.cleanup()
- self.assertEqual(0, len(cache))
-
- def test_requests_cache_match(self):
- cache = worker_types.RequestsCache()
- cache[self.task_uuid] = self.request()
- cache['task-uuid-2'] = self.request(task=utils.NastyTask(),
- uuid='task-uuid-2')
- worker = worker_types.TopicWorker("dummy-topic", [utils.DummyTask],
- identity="dummy")
- matches = cache.get_waiting_requests(worker)
- self.assertEqual(1, len(matches))
- self.assertEqual(2, len(cache))
-
-
class TestTopicWorker(test.TestCase):
def test_topic_worker(self):
worker = worker_types.TopicWorker("dummy-topic",