diff options
author | Doug Hellmann <doug@doughellmann.com> | 2015-04-28 12:18:34 +0000 |
---|---|---|
committer | Victor Sergeyev <vsergeyev@mirantis.com> | 2015-07-29 11:17:57 +0300 |
commit | c90525bfead1f495df86c5c5d795d25abad2e1d9 (patch) | |
tree | 8d81ad551288a3c37a1e6f8c71235820ffac43ec /tests/drivers/test_impl_rabbit.py | |
parent | 99b24b3888233656779a5baec1011554174604d7 (diff) | |
download | oslo-messaging-c90525bfead1f495df86c5c5d795d25abad2e1d9.tar.gz |
Remove oslo namespace package
Blueprint remove-namespace-packages
Cherry-picked from: 03265410e0294e176d18dd42b57268a8056eb8fc
Change-Id: Ibaba19ef10b4902c4f4f9fbdf7078e66b75f2035
Diffstat (limited to 'tests/drivers/test_impl_rabbit.py')
-rw-r--r-- | tests/drivers/test_impl_rabbit.py | 758 |
1 files changed, 0 insertions, 758 deletions
diff --git a/tests/drivers/test_impl_rabbit.py b/tests/drivers/test_impl_rabbit.py deleted file mode 100644 index b2da4a8..0000000 --- a/tests/drivers/test_impl_rabbit.py +++ /dev/null @@ -1,758 +0,0 @@ -# Copyright 2013 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import datetime -import sys -import threading -import time -import uuid - -import fixtures -import kombu -from oslotest import mockpatch -import testscenarios - -from oslo import messaging -from oslo_config import cfg -from oslo_messaging._drivers import amqp -from oslo_messaging._drivers import amqpdriver -from oslo_messaging._drivers import common as driver_common -from oslo_messaging._drivers import impl_rabbit as rabbit_driver -from oslo_messaging.tests import utils as test_utils -from oslo_serialization import jsonutils -from six.moves import mock - -load_tests = testscenarios.load_tests_apply_scenarios - - -class TestDeprecatedRabbitDriverLoad(test_utils.BaseTestCase): - - def setUp(self): - super(TestDeprecatedRabbitDriverLoad, self).setUp( - conf=cfg.ConfigOpts()) - self.messaging_conf.transport_driver = 'rabbit' - self.config(fake_rabbit=True, group="oslo_messaging_rabbit") - - def test_driver_load(self): - self.config(heartbeat_timeout_threshold=0, - group='oslo_messaging_rabbit') - transport = messaging.get_transport(self.conf) - self.addCleanup(transport.cleanup) - driver = transport._driver - url = driver._get_connection()._url - - self.assertIsInstance(driver, rabbit_driver.RabbitDriver) - self.assertEqual('memory:////', url) - - -class TestRabbitDriverLoad(test_utils.BaseTestCase): - - scenarios = [ - ('rabbit', dict(transport_driver='rabbit', - url='amqp://guest:guest@localhost:5672//')), - ('kombu', dict(transport_driver='kombu', - url='amqp://guest:guest@localhost:5672//')), - ('rabbit+memory', dict(transport_driver='kombu+memory', - url='memory:///')) - ] - - @mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.ensure') - @mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.reset') - def test_driver_load(self, fake_ensure, fake_reset): - self.config(heartbeat_timeout_threshold=0, - group='oslo_messaging_rabbit') - self.messaging_conf.transport_driver = self.transport_driver - transport = messaging.get_transport(self.conf) - self.addCleanup(transport.cleanup) - driver = transport._driver - url = driver._get_connection()._url - - self.assertIsInstance(driver, rabbit_driver.RabbitDriver) - self.assertEqual(self.url, url) - - -class TestRabbitConsume(test_utils.BaseTestCase): - - def test_consume_timeout(self): - transport = messaging.get_transport(self.conf, 'kombu+memory:////') - self.addCleanup(transport.cleanup) - deadline = time.time() + 6 - with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn: - self.assertRaises(driver_common.Timeout, - conn.consume, timeout=3) - - # kombu memory transport doesn't really raise error - # so just simulate a real driver behavior - conn.connection.connection.recoverable_channel_errors = (IOError,) - conn.declare_fanout_consumer("notif.info", lambda msg: True) - with mock.patch('kombu.connection.Connection.drain_events', - side_effect=IOError): - self.assertRaises(driver_common.Timeout, - conn.consume, timeout=3) - - self.assertEqual(0, int(deadline - time.time())) - - -class TestRabbitTransportURL(test_utils.BaseTestCase): - - scenarios = [ - ('none', dict(url=None, - expected=["amqp://guest:guest@localhost:5672//"])), - ('memory', dict(url='kombu+memory:////', - expected=["memory:///"])), - ('empty', - dict(url='rabbit:///', - expected=['amqp://guest:guest@localhost:5672/'])), - ('localhost', - dict(url='rabbit://localhost/', - expected=['amqp://:@localhost:5672/'])), - ('virtual_host', - dict(url='rabbit:///vhost', - expected=['amqp://guest:guest@localhost:5672/vhost'])), - ('no_creds', - dict(url='rabbit://host/virtual_host', - expected=['amqp://:@host:5672/virtual_host'])), - ('no_port', - dict(url='rabbit://user:password@host/virtual_host', - expected=['amqp://user:password@host:5672/virtual_host'])), - ('full_url', - dict(url='rabbit://user:password@host:10/virtual_host', - expected=['amqp://user:password@host:10/virtual_host'])), - ('full_two_url', - dict(url='rabbit://user:password@host:10,' - 'user2:password2@host2:12/virtual_host', - expected=["amqp://user:password@host:10/virtual_host", - "amqp://user2:password2@host2:12/virtual_host"] - )), - ('qpid', - dict(url='kombu+qpid://user:password@host:10/virtual_host', - expected=['qpid://user:password@host:10/virtual_host'])), - ('rabbit', - dict(url='kombu+rabbit://user:password@host:10/virtual_host', - expected=['amqp://user:password@host:10/virtual_host'])), - ('rabbit_ipv6', - dict(url='kombu+rabbit://u:p@[fd00:beef:dead:55::133]:10/vhost', - skip_py26='python 2.6 has broken urlparse for ipv6', - expected=['amqp://u:p@[fd00:beef:dead:55::133]:10/vhost'])), - ('rabbit_ipv4', - dict(url='kombu+rabbit://user:password@10.20.30.40:10/vhost', - expected=['amqp://user:password@10.20.30.40:10/vhost'])), - ] - - def setUp(self): - super(TestRabbitTransportURL, self).setUp() - self.config(heartbeat_timeout_threshold=0, - group='oslo_messaging_rabbit') - self.messaging_conf.transport_driver = 'rabbit' - - @mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.ensure') - @mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.reset') - def test_transport_url(self, fake_ensure_connection, fake_reset): - if hasattr(self, 'skip_py26') and sys.version_info < (2, 7): - self.skipTest(self.skip_py26) - - transport = messaging.get_transport(self.conf, self.url) - self.addCleanup(transport.cleanup) - driver = transport._driver - - # NOTE(sileht): some kombu transport can depend on library that - # we don't want to depend yet, because selecting the transport - # is experimental, only amqp is supported - # for example kombu+qpid depends of qpid-tools - # so, mock the connection.info to skip call to qpid-tools - with mock.patch('kombu.connection.Connection.info'): - urls = driver._get_connection()._url.split(";") - self.assertEqual(sorted(self.expected), sorted(urls)) - - -class TestSendReceive(test_utils.BaseTestCase): - - _n_senders = [ - ('single_sender', dict(n_senders=1)), - ('multiple_senders', dict(n_senders=10)), - ] - - _context = [ - ('empty_context', dict(ctxt={})), - ('with_context', dict(ctxt={'user': 'mark'})), - ] - - _reply = [ - ('rx_id', dict(rx_id=True, reply=None)), - ('none', dict(rx_id=False, reply=None)), - ('empty_list', dict(rx_id=False, reply=[])), - ('empty_dict', dict(rx_id=False, reply={})), - ('false', dict(rx_id=False, reply=False)), - ('zero', dict(rx_id=False, reply=0)), - ] - - _failure = [ - ('success', dict(failure=False)), - ('failure', dict(failure=True, expected=False)), - ('expected_failure', dict(failure=True, expected=True)), - ] - - _timeout = [ - ('no_timeout', dict(timeout=None)), - ('timeout', dict(timeout=0.01)), # FIXME(markmc): timeout=0 is broken? - ] - - @classmethod - def generate_scenarios(cls): - cls.scenarios = testscenarios.multiply_scenarios(cls._n_senders, - cls._context, - cls._reply, - cls._failure, - cls._timeout) - - def test_send_receive(self): - self.config(heartbeat_timeout_threshold=0, - group='oslo_messaging_rabbit') - transport = messaging.get_transport(self.conf, 'kombu+memory:////') - self.addCleanup(transport.cleanup) - - driver = transport._driver - - target = messaging.Target(topic='testtopic') - - listener = driver.listen(target) - - senders = [] - replies = [] - msgs = [] - errors = [] - - def stub_error(msg, *a, **kw): - if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]): - a = a[0] - errors.append(str(msg) % a) - - self.stubs.Set(driver_common.LOG, 'error', stub_error) - - def send_and_wait_for_reply(i): - try: - replies.append(driver.send(target, - self.ctxt, - {'tx_id': i}, - wait_for_reply=True, - timeout=self.timeout)) - self.assertFalse(self.failure) - self.assertIsNone(self.timeout) - except (ZeroDivisionError, messaging.MessagingTimeout) as e: - replies.append(e) - self.assertTrue(self.failure or self.timeout is not None) - - while len(senders) < self.n_senders: - senders.append(threading.Thread(target=send_and_wait_for_reply, - args=(len(senders), ))) - - for i in range(len(senders)): - senders[i].start() - - received = listener.poll() - self.assertIsNotNone(received) - self.assertEqual(self.ctxt, received.ctxt) - self.assertEqual({'tx_id': i}, received.message) - msgs.append(received) - - # reply in reverse, except reply to the first guy second from last - order = list(range(len(senders) - 1, -1, -1)) - if len(order) > 1: - order[-1], order[-2] = order[-2], order[-1] - - for i in order: - if self.timeout is None: - if self.failure: - try: - raise ZeroDivisionError - except Exception: - failure = sys.exc_info() - - # NOTE(noelbk) confirm that Publisher exchanges - # are always declared with passive=True - outer_self = self - test_exchange_was_called = [False] - old_init = kombu.entity.Exchange.__init__ - - def new_init(self, *args, **kwargs): - test_exchange_was_called[0] = True - outer_self.assertTrue(kwargs['passive']) - old_init(self, *args, **kwargs) - kombu.entity.Exchange.__init__ = new_init - - try: - msgs[i].reply(failure=failure, - log_failure=not self.expected) - finally: - kombu.entity.Exchange.__init__ = old_init - - self.assertTrue(test_exchange_was_called[0]) - - elif self.rx_id: - msgs[i].reply({'rx_id': i}) - else: - msgs[i].reply(self.reply) - senders[i].join() - - self.assertEqual(len(senders), len(replies)) - for i, reply in enumerate(replies): - if self.timeout is not None: - self.assertIsInstance(reply, messaging.MessagingTimeout) - elif self.failure: - self.assertIsInstance(reply, ZeroDivisionError) - elif self.rx_id: - self.assertEqual({'rx_id': order[i]}, reply) - else: - self.assertEqual(self.reply, reply) - - if not self.timeout and self.failure and not self.expected: - self.assertTrue(len(errors) > 0, errors) - else: - self.assertEqual(0, len(errors), errors) - - -TestSendReceive.generate_scenarios() - - -class TestPollAsync(test_utils.BaseTestCase): - - def test_poll_timeout(self): - transport = messaging.get_transport(self.conf, 'kombu+memory:////') - self.addCleanup(transport.cleanup) - driver = transport._driver - target = messaging.Target(topic='testtopic') - listener = driver.listen(target) - received = listener.poll(timeout=0.050) - self.assertIsNone(received) - - -class TestRacyWaitForReply(test_utils.BaseTestCase): - - def test_send_receive(self): - transport = messaging.get_transport(self.conf, 'kombu+memory:////') - self.addCleanup(transport.cleanup) - - driver = transport._driver - - target = messaging.Target(topic='testtopic') - - listener = driver.listen(target) - - senders = [] - replies = [] - msgs = [] - - wait_conditions = [] - orig_reply_waiter = amqpdriver.ReplyWaiter.wait - - def reply_waiter(self, msg_id, timeout): - if wait_conditions: - cond = wait_conditions.pop() - with cond: - cond.notify() - with cond: - cond.wait() - return orig_reply_waiter(self, msg_id, timeout) - - self.stubs.Set(amqpdriver.ReplyWaiter, 'wait', reply_waiter) - - def send_and_wait_for_reply(i, wait_for_reply): - replies.append(driver.send(target, - {}, - {'tx_id': i}, - wait_for_reply=wait_for_reply, - timeout=None)) - - while len(senders) < 2: - t = threading.Thread(target=send_and_wait_for_reply, - args=(len(senders), True)) - t.daemon = True - senders.append(t) - - # test the case then msg_id is not set - t = threading.Thread(target=send_and_wait_for_reply, - args=(len(senders), False)) - t.daemon = True - senders.append(t) - - # Start the first guy, receive his message, but delay his polling - notify_condition = threading.Condition() - wait_conditions.append(notify_condition) - with notify_condition: - senders[0].start() - notify_condition.wait() - - msgs.append(listener.poll()) - self.assertEqual({'tx_id': 0}, msgs[-1].message) - - # Start the second guy, receive his message - senders[1].start() - - msgs.append(listener.poll()) - self.assertEqual({'tx_id': 1}, msgs[-1].message) - - # Reply to both in order, making the second thread queue - # the reply meant for the first thread - msgs[0].reply({'rx_id': 0}) - msgs[1].reply({'rx_id': 1}) - - # Wait for the second thread to finish - senders[1].join() - - # Start the 3rd guy, receive his message - senders[2].start() - - msgs.append(listener.poll()) - self.assertEqual({'tx_id': 2}, msgs[-1].message) - - # Verify the _send_reply was not invoked by driver: - with mock.patch.object(msgs[2], '_send_reply') as method: - msgs[2].reply({'rx_id': 2}) - self.assertEqual(method.call_count, 0) - - # Wait for the 3rd thread to finish - senders[2].join() - - # Let the first thread continue - with notify_condition: - notify_condition.notify() - - # Wait for the first thread to finish - senders[0].join() - - # Verify replies were received out of order - self.assertEqual(len(senders), len(replies)) - self.assertEqual({'rx_id': 1}, replies[0]) - self.assertIsNone(replies[1]) - self.assertEqual({'rx_id': 0}, replies[2]) - - -def _declare_queue(target): - connection = kombu.connection.BrokerConnection(transport='memory') - - # Kludge to speed up tests. - connection.transport.polling_interval = 0.0 - - connection.connect() - channel = connection.channel() - - # work around 'memory' transport bug in 1.1.3 - channel._new_queue('ae.undeliver') - - if target.fanout: - exchange = kombu.entity.Exchange(name=target.topic + '_fanout', - type='fanout', - durable=False, - auto_delete=True) - queue = kombu.entity.Queue(name=target.topic + '_fanout_12345', - channel=channel, - exchange=exchange, - routing_key=target.topic) - if target.server: - exchange = kombu.entity.Exchange(name='openstack', - type='topic', - durable=False, - auto_delete=False) - topic = '%s.%s' % (target.topic, target.server) - queue = kombu.entity.Queue(name=topic, - channel=channel, - exchange=exchange, - routing_key=topic) - else: - exchange = kombu.entity.Exchange(name='openstack', - type='topic', - durable=False, - auto_delete=False) - queue = kombu.entity.Queue(name=target.topic, - channel=channel, - exchange=exchange, - routing_key=target.topic) - - queue.declare() - - return connection, channel, queue - - -class TestRequestWireFormat(test_utils.BaseTestCase): - - _target = [ - ('topic_target', - dict(topic='testtopic', server=None, fanout=False)), - ('server_target', - dict(topic='testtopic', server='testserver', fanout=False)), - # NOTE(markmc): https://github.com/celery/kombu/issues/195 - ('fanout_target', - dict(topic='testtopic', server=None, fanout=True, - skip_msg='Requires kombu>2.5.12 to fix kombu issue #195')), - ] - - _msg = [ - ('empty_msg', - dict(msg={}, expected={})), - ('primitive_msg', - dict(msg={'foo': 'bar'}, expected={'foo': 'bar'})), - ('complex_msg', - dict(msg={'a': {'b': datetime.datetime(1920, 2, 3, 4, 5, 6, 7)}}, - expected={'a': {'b': '1920-02-03T04:05:06.000007'}})), - ] - - _context = [ - ('empty_ctxt', dict(ctxt={}, expected_ctxt={})), - ('user_project_ctxt', - dict(ctxt={'user': 'mark', 'project': 'snarkybunch'}, - expected_ctxt={'_context_user': 'mark', - '_context_project': 'snarkybunch'})), - ] - - @classmethod - def generate_scenarios(cls): - cls.scenarios = testscenarios.multiply_scenarios(cls._msg, - cls._context, - cls._target) - - def setUp(self): - super(TestRequestWireFormat, self).setUp() - self.uuids = [] - self.orig_uuid4 = uuid.uuid4 - self.useFixture(fixtures.MonkeyPatch('uuid.uuid4', self.mock_uuid4)) - - def mock_uuid4(self): - self.uuids.append(self.orig_uuid4()) - return self.uuids[-1] - - def test_request_wire_format(self): - if hasattr(self, 'skip_msg'): - self.skipTest(self.skip_msg) - - transport = messaging.get_transport(self.conf, 'kombu+memory:////') - self.addCleanup(transport.cleanup) - - driver = transport._driver - - target = messaging.Target(topic=self.topic, - server=self.server, - fanout=self.fanout) - - connection, channel, queue = _declare_queue(target) - self.addCleanup(connection.release) - - driver.send(target, self.ctxt, self.msg) - - msgs = [] - - def callback(msg): - msg = channel.message_to_python(msg) - msg.ack() - msgs.append(msg.payload) - - queue.consume(callback=callback, - consumer_tag='1', - nowait=False) - - connection.drain_events() - - self.assertEqual(1, len(msgs)) - self.assertIn('oslo.message', msgs[0]) - - received = msgs[0] - received['oslo.message'] = jsonutils.loads(received['oslo.message']) - - # FIXME(markmc): add _msg_id and _reply_q check - expected_msg = { - '_unique_id': self.uuids[0].hex, - } - expected_msg.update(self.expected) - expected_msg.update(self.expected_ctxt) - - expected = { - 'oslo.version': '2.0', - 'oslo.message': expected_msg, - } - - self.assertEqual(expected, received) - - -TestRequestWireFormat.generate_scenarios() - - -def _create_producer(target): - connection = kombu.connection.BrokerConnection(transport='memory') - - # Kludge to speed up tests. - connection.transport.polling_interval = 0.0 - - connection.connect() - channel = connection.channel() - - # work around 'memory' transport bug in 1.1.3 - channel._new_queue('ae.undeliver') - - if target.fanout: - exchange = kombu.entity.Exchange(name=target.topic + '_fanout', - type='fanout', - durable=False, - auto_delete=True) - producer = kombu.messaging.Producer(exchange=exchange, - channel=channel, - routing_key=target.topic) - elif target.server: - exchange = kombu.entity.Exchange(name='openstack', - type='topic', - durable=False, - auto_delete=False) - topic = '%s.%s' % (target.topic, target.server) - producer = kombu.messaging.Producer(exchange=exchange, - channel=channel, - routing_key=topic) - else: - exchange = kombu.entity.Exchange(name='openstack', - type='topic', - durable=False, - auto_delete=False) - producer = kombu.messaging.Producer(exchange=exchange, - channel=channel, - routing_key=target.topic) - - return connection, producer - - -class TestReplyWireFormat(test_utils.BaseTestCase): - - _target = [ - ('topic_target', - dict(topic='testtopic', server=None, fanout=False)), - ('server_target', - dict(topic='testtopic', server='testserver', fanout=False)), - # NOTE(markmc): https://github.com/celery/kombu/issues/195 - ('fanout_target', - dict(topic='testtopic', server=None, fanout=True, - skip_msg='Requires kombu>2.5.12 to fix kombu issue #195')), - ] - - _msg = [ - ('empty_msg', - dict(msg={}, expected={})), - ('primitive_msg', - dict(msg={'foo': 'bar'}, expected={'foo': 'bar'})), - ('complex_msg', - dict(msg={'a': {'b': '1920-02-03T04:05:06.000007'}}, - expected={'a': {'b': '1920-02-03T04:05:06.000007'}})), - ] - - _context = [ - ('empty_ctxt', dict(ctxt={}, expected_ctxt={})), - ('user_project_ctxt', - dict(ctxt={'_context_user': 'mark', - '_context_project': 'snarkybunch'}, - expected_ctxt={'user': 'mark', 'project': 'snarkybunch'})), - ] - - @classmethod - def generate_scenarios(cls): - cls.scenarios = testscenarios.multiply_scenarios(cls._msg, - cls._context, - cls._target) - - def test_reply_wire_format(self): - if hasattr(self, 'skip_msg'): - self.skipTest(self.skip_msg) - - transport = messaging.get_transport(self.conf, 'kombu+memory:////') - self.addCleanup(transport.cleanup) - - driver = transport._driver - - target = messaging.Target(topic=self.topic, - server=self.server, - fanout=self.fanout) - - listener = driver.listen(target) - - connection, producer = _create_producer(target) - self.addCleanup(connection.release) - - msg = { - 'oslo.version': '2.0', - 'oslo.message': {} - } - - msg['oslo.message'].update(self.msg) - msg['oslo.message'].update(self.ctxt) - - msg['oslo.message'].update({ - '_msg_id': uuid.uuid4().hex, - '_unique_id': uuid.uuid4().hex, - '_reply_q': 'reply_' + uuid.uuid4().hex, - }) - - msg['oslo.message'] = jsonutils.dumps(msg['oslo.message']) - - producer.publish(msg) - - received = listener.poll() - self.assertIsNotNone(received) - self.assertEqual(self.expected_ctxt, received.ctxt) - self.assertEqual(self.expected, received.message) - - -TestReplyWireFormat.generate_scenarios() - - -class RpcKombuHATestCase(test_utils.BaseTestCase): - - def setUp(self): - super(RpcKombuHATestCase, self).setUp() - self.brokers = ['host1', 'host2', 'host3', 'host4', 'host5'] - self.config(rabbit_hosts=self.brokers, - rabbit_retry_interval=0.01, - rabbit_retry_backoff=0.01, - kombu_reconnect_delay=0, - group="oslo_messaging_rabbit") - - self.kombu_connect = mock.Mock() - self.useFixture(mockpatch.Patch( - 'kombu.connection.Connection.connect', - side_effect=self.kombu_connect)) - self.useFixture(mockpatch.Patch( - 'kombu.connection.Connection.channel')) - - # starting from the first broker in the list - url = messaging.TransportURL.parse(self.conf, None) - self.connection = rabbit_driver.Connection(self.conf, url, - amqp.PURPOSE_SEND) - self.addCleanup(self.connection.close) - - def test_ensure_four_retry(self): - mock_callback = mock.Mock(side_effect=IOError) - self.assertRaises(messaging.MessageDeliveryFailure, - self.connection.ensure, mock_callback, - retry=4) - self.assertEqual(5, self.kombu_connect.call_count) - self.assertEqual(6, mock_callback.call_count) - - def test_ensure_one_retry(self): - mock_callback = mock.Mock(side_effect=IOError) - self.assertRaises(messaging.MessageDeliveryFailure, - self.connection.ensure, mock_callback, - retry=1) - self.assertEqual(2, self.kombu_connect.call_count) - self.assertEqual(3, mock_callback.call_count) - - def test_ensure_no_retry(self): - mock_callback = mock.Mock(side_effect=IOError) - self.assertRaises(messaging.MessageDeliveryFailure, - self.connection.ensure, mock_callback, - retry=0) - self.assertEqual(1, self.kombu_connect.call_count) - self.assertEqual(2, mock_callback.call_count) |