summaryrefslogtreecommitdiff
path: root/taskflow/tests/unit/worker_based
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2015-01-16 15:37:44 -0800
committerJoshua Harlow <harlowja@yahoo-inc.com>2015-01-20 15:07:56 -0800
commitcb27080ea3cd5cddd7f91d866f6a9d1214c9e885 (patch)
tree52776449e8b1a09e7d1b4afdf7d70dbe0fe76de8 /taskflow/tests/unit/worker_based
parenteb92706cbabd8aaab176e3f79f995a33b85bb22f (diff)
downloadtaskflow-cb27080ea3cd5cddd7f91d866f6a9d1214c9e885.tar.gz
Increase robustness of WBE producer/consumers
Use the kombu provided ensure() decorator/wrapper along with sensible default settings to ensure that retries are attempted when kombu detects recoverable connection or recoverable channel errors have occurred. Change-Id: If47f72d02561d0b5d556ac386869a6122c8b871d
Diffstat (limited to 'taskflow/tests/unit/worker_based')
-rw-r--r--taskflow/tests/unit/worker_based/test_creation.py9
-rw-r--r--taskflow/tests/unit/worker_based/test_proxy.py68
2 files changed, 51 insertions, 26 deletions
diff --git a/taskflow/tests/unit/worker_based/test_creation.py b/taskflow/tests/unit/worker_based/test_creation.py
index 6764926..887498c 100644
--- a/taskflow/tests/unit/worker_based/test_creation.py
+++ b/taskflow/tests/unit/worker_based/test_creation.py
@@ -49,7 +49,8 @@ class TestWorkerBasedActionEngine(test.MockTestCase):
topics=[],
transport=None,
transport_options=None,
- transition_timeout=mock.ANY)
+ transition_timeout=mock.ANY,
+ retry_options=None)
]
self.assertEqual(self.master_mock.mock_calls, expected_calls)
@@ -64,7 +65,8 @@ class TestWorkerBasedActionEngine(test.MockTestCase):
transport='memory',
transport_options={},
transition_timeout=200,
- topics=topics)
+ topics=topics,
+ retry_options={})
expected_calls = [
mock.call.executor_class(uuid=eng.storage.flow_uuid,
url=broker_url,
@@ -72,7 +74,8 @@ class TestWorkerBasedActionEngine(test.MockTestCase):
topics=topics,
transport='memory',
transport_options={},
- transition_timeout=200)
+ transition_timeout=200,
+ retry_options={})
]
self.assertEqual(self.master_mock.mock_calls, expected_calls)
diff --git a/taskflow/tests/unit/worker_based/test_proxy.py b/taskflow/tests/unit/worker_based/test_proxy.py
index a3c7d13..daf9b60 100644
--- a/taskflow/tests/unit/worker_based/test_proxy.py
+++ b/taskflow/tests/unit/worker_based/test_proxy.py
@@ -43,8 +43,11 @@ class TestProxy(test.MockTestCase):
proxy.kombu, 'Producer')
# connection mocking
+ def _ensure(obj, func, *args, **kwargs):
+ return func
self.conn_inst_mock.drain_events.side_effect = [
socket.timeout, socket.timeout, KeyboardInterrupt]
+ self.conn_inst_mock.ensure = mock.MagicMock(side_effect=_ensure)
# connections mocking
self.connections_mock = self.patch(
@@ -54,11 +57,8 @@ class TestProxy(test.MockTestCase):
self.conn_inst_mock
# producers mocking
- self.producers_mock = self.patch(
- "taskflow.engines.worker_based.proxy.kombu.producers",
- attach_as='producers')
- self.producers_mock.__getitem__().acquire().__enter__.return_value =\
- self.producer_inst_mock
+ self.conn_inst_mock.Producer.return_value.__enter__ = mock.MagicMock()
+ self.conn_inst_mock.Producer.return_value.__exit__ = mock.MagicMock()
# consumer mocking
self.conn_inst_mock.Consumer.return_value.__enter__ = mock.MagicMock()
@@ -85,11 +85,38 @@ class TestProxy(test.MockTestCase):
mock.call.connection.Consumer(queues=self.queue_inst_mock,
callbacks=[mock.ANY]),
mock.call.connection.Consumer().__enter__(),
+ mock.call.connection.ensure(mock.ANY, mock.ANY,
+ interval_start=mock.ANY,
+ interval_max=mock.ANY,
+ max_retries=mock.ANY,
+ interval_step=mock.ANY,
+ errback=mock.ANY),
] + calls + [
mock.call.connection.Consumer().__exit__(exc_type, mock.ANY,
mock.ANY)
]
+ def proxy_publish_calls(self, calls, routing_key, exc_type=mock.ANY):
+ return [
+ mock.call.connection.Producer(),
+ mock.call.connection.Producer().__enter__(),
+ mock.call.connection.ensure(mock.ANY, mock.ANY,
+ interval_start=mock.ANY,
+ interval_max=mock.ANY,
+ max_retries=mock.ANY,
+ interval_step=mock.ANY,
+ errback=mock.ANY),
+ mock.call.Queue(name=self._queue_name(routing_key),
+ routing_key=routing_key,
+ exchange=self.exchange_inst_mock,
+ durable=False,
+ auto_delete=True,
+ channel=None),
+ ] + calls + [
+ mock.call.connection.Producer().__exit__(exc_type, mock.ANY,
+ mock.ANY)
+ ]
+
def proxy(self, reset_master_mock=False, **kwargs):
proxy_kwargs = dict(topic=self.topic,
exchange_name=self.exchange_name,
@@ -133,24 +160,19 @@ class TestProxy(test.MockTestCase):
routing_key = 'routing-key'
task_uuid = 'task-uuid'
- self.proxy(reset_master_mock=True).publish(
- msg_mock, routing_key, correlation_id=task_uuid)
-
- master_mock_calls = [
- mock.call.Queue(name=self._queue_name(routing_key),
- exchange=self.exchange_inst_mock,
- routing_key=routing_key,
- durable=False,
- auto_delete=True,
- channel=None),
- mock.call.producer.publish(body=msg_data,
- routing_key=routing_key,
- exchange=self.exchange_inst_mock,
- correlation_id=task_uuid,
- declare=[self.queue_inst_mock],
- type=msg_mock.TYPE,
- reply_to=None)
- ]
+ p = self.proxy(reset_master_mock=True)
+ p.publish(msg_mock, routing_key, correlation_id=task_uuid)
+
+ mock_producer = mock.call.connection.Producer()
+ master_mock_calls = self.proxy_publish_calls([
+ mock_producer.__enter__().publish(body=msg_data,
+ routing_key=routing_key,
+ exchange=self.exchange_inst_mock,
+ correlation_id=task_uuid,
+ declare=[self.queue_inst_mock],
+ type=msg_mock.TYPE,
+ reply_to=None)
+ ], routing_key)
self.master_mock.assert_has_calls(master_mock_calls)
def test_start(self):