diff options
| author | Joshua Harlow <harlowja@yahoo-inc.com> | 2015-01-16 15:37:44 -0800 |
|---|---|---|
| committer | Joshua Harlow <harlowja@yahoo-inc.com> | 2015-01-20 15:07:56 -0800 |
| commit | cb27080ea3cd5cddd7f91d866f6a9d1214c9e885 (patch) | |
| tree | 52776449e8b1a09e7d1b4afdf7d70dbe0fe76de8 /taskflow/tests/unit/worker_based | |
| parent | eb92706cbabd8aaab176e3f79f995a33b85bb22f (diff) | |
| download | taskflow-cb27080ea3cd5cddd7f91d866f6a9d1214c9e885.tar.gz | |
Increase robustness of WBE producer/consumers
Use the kombu provided ensure() decorator/wrapper along
with sensible default settings to ensure that retries are
attempted when kombu detects recoverable connection or
recoverable channel errors have occurred.
Change-Id: If47f72d02561d0b5d556ac386869a6122c8b871d
Diffstat (limited to 'taskflow/tests/unit/worker_based')
| -rw-r--r-- | taskflow/tests/unit/worker_based/test_creation.py | 9 | ||||
| -rw-r--r-- | taskflow/tests/unit/worker_based/test_proxy.py | 68 |
2 files changed, 51 insertions, 26 deletions
diff --git a/taskflow/tests/unit/worker_based/test_creation.py b/taskflow/tests/unit/worker_based/test_creation.py index 6764926..887498c 100644 --- a/taskflow/tests/unit/worker_based/test_creation.py +++ b/taskflow/tests/unit/worker_based/test_creation.py @@ -49,7 +49,8 @@ class TestWorkerBasedActionEngine(test.MockTestCase): topics=[], transport=None, transport_options=None, - transition_timeout=mock.ANY) + transition_timeout=mock.ANY, + retry_options=None) ] self.assertEqual(self.master_mock.mock_calls, expected_calls) @@ -64,7 +65,8 @@ class TestWorkerBasedActionEngine(test.MockTestCase): transport='memory', transport_options={}, transition_timeout=200, - topics=topics) + topics=topics, + retry_options={}) expected_calls = [ mock.call.executor_class(uuid=eng.storage.flow_uuid, url=broker_url, @@ -72,7 +74,8 @@ class TestWorkerBasedActionEngine(test.MockTestCase): topics=topics, transport='memory', transport_options={}, - transition_timeout=200) + transition_timeout=200, + retry_options={}) ] self.assertEqual(self.master_mock.mock_calls, expected_calls) diff --git a/taskflow/tests/unit/worker_based/test_proxy.py b/taskflow/tests/unit/worker_based/test_proxy.py index a3c7d13..daf9b60 100644 --- a/taskflow/tests/unit/worker_based/test_proxy.py +++ b/taskflow/tests/unit/worker_based/test_proxy.py @@ -43,8 +43,11 @@ class TestProxy(test.MockTestCase): proxy.kombu, 'Producer') # connection mocking + def _ensure(obj, func, *args, **kwargs): + return func self.conn_inst_mock.drain_events.side_effect = [ socket.timeout, socket.timeout, KeyboardInterrupt] + self.conn_inst_mock.ensure = mock.MagicMock(side_effect=_ensure) # connections mocking self.connections_mock = self.patch( @@ -54,11 +57,8 @@ class TestProxy(test.MockTestCase): self.conn_inst_mock # producers mocking - self.producers_mock = self.patch( - "taskflow.engines.worker_based.proxy.kombu.producers", - attach_as='producers') - self.producers_mock.__getitem__().acquire().__enter__.return_value =\ - self.producer_inst_mock + self.conn_inst_mock.Producer.return_value.__enter__ = mock.MagicMock() + self.conn_inst_mock.Producer.return_value.__exit__ = mock.MagicMock() # consumer mocking self.conn_inst_mock.Consumer.return_value.__enter__ = mock.MagicMock() @@ -85,11 +85,38 @@ class TestProxy(test.MockTestCase): mock.call.connection.Consumer(queues=self.queue_inst_mock, callbacks=[mock.ANY]), mock.call.connection.Consumer().__enter__(), + mock.call.connection.ensure(mock.ANY, mock.ANY, + interval_start=mock.ANY, + interval_max=mock.ANY, + max_retries=mock.ANY, + interval_step=mock.ANY, + errback=mock.ANY), ] + calls + [ mock.call.connection.Consumer().__exit__(exc_type, mock.ANY, mock.ANY) ] + def proxy_publish_calls(self, calls, routing_key, exc_type=mock.ANY): + return [ + mock.call.connection.Producer(), + mock.call.connection.Producer().__enter__(), + mock.call.connection.ensure(mock.ANY, mock.ANY, + interval_start=mock.ANY, + interval_max=mock.ANY, + max_retries=mock.ANY, + interval_step=mock.ANY, + errback=mock.ANY), + mock.call.Queue(name=self._queue_name(routing_key), + routing_key=routing_key, + exchange=self.exchange_inst_mock, + durable=False, + auto_delete=True, + channel=None), + ] + calls + [ + mock.call.connection.Producer().__exit__(exc_type, mock.ANY, + mock.ANY) + ] + def proxy(self, reset_master_mock=False, **kwargs): proxy_kwargs = dict(topic=self.topic, exchange_name=self.exchange_name, @@ -133,24 +160,19 @@ class TestProxy(test.MockTestCase): routing_key = 'routing-key' task_uuid = 'task-uuid' - self.proxy(reset_master_mock=True).publish( - msg_mock, routing_key, correlation_id=task_uuid) - - master_mock_calls = [ - mock.call.Queue(name=self._queue_name(routing_key), - exchange=self.exchange_inst_mock, - routing_key=routing_key, - durable=False, - auto_delete=True, - channel=None), - mock.call.producer.publish(body=msg_data, - routing_key=routing_key, - exchange=self.exchange_inst_mock, - correlation_id=task_uuid, - declare=[self.queue_inst_mock], - type=msg_mock.TYPE, - reply_to=None) - ] + p = self.proxy(reset_master_mock=True) + p.publish(msg_mock, routing_key, correlation_id=task_uuid) + + mock_producer = mock.call.connection.Producer() + master_mock_calls = self.proxy_publish_calls([ + mock_producer.__enter__().publish(body=msg_data, + routing_key=routing_key, + exchange=self.exchange_inst_mock, + correlation_id=task_uuid, + declare=[self.queue_inst_mock], + type=msg_mock.TYPE, + reply_to=None) + ], routing_key) self.master_mock.assert_has_calls(master_mock_calls) def test_start(self): |
