diff options
author | Jenkins <jenkins@review.openstack.org> | 2016-08-30 16:21:04 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2016-08-30 16:21:04 +0000 |
commit | 70c2a484f9dccb7702ec287ca9504771c89845e7 (patch) | |
tree | 1b86dc67662142b4a7697216eed7f2fd59379852 | |
parent | 34ea4f6edeaaee93d6a694ed7959bb223bc2240e (diff) | |
parent | eaa30a8887b33e4b3e7d4240ecef6cec1d5aa108 (diff) | |
download | oslo-messaging-70c2a484f9dccb7702ec287ca9504771c89845e7.tar.gz |
Merge "Fix consuming from missing queues" into stable/mitaka
-rw-r--r-- | oslo_messaging/_drivers/impl_rabbit.py | 30 | ||||
-rw-r--r-- | oslo_messaging/tests/drivers/test_impl_rabbit.py | 45 |
2 files changed, 71 insertions, 4 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index faa49aa..cb25bb0 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -248,7 +248,7 @@ class Consumer(object): def __init__(self, exchange_name, queue_name, routing_key, type, durable, exchange_auto_delete, queue_auto_delete, callback, - nowait=True, rabbit_ha_queues=None, rabbit_queue_ttl=0): + nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0): """Init the Publisher class with the exchange_name, routing_key, type, durable auto_delete """ @@ -989,11 +989,33 @@ class Connection(object): if not self.connection.connected: raise self.connection.recoverable_connection_errors[0] - if self._new_tags: + consume_max_retries = 1 + while self._new_tags and consume_max_retries >= 0: for consumer, tag in self._consumers.items(): if tag in self._new_tags: - consumer.consume(tag=tag) - self._new_tags.remove(tag) + try: + consumer.consume(tag=tag) + self._new_tags.remove(tag) + except self.connection.channel_errors as exc: + # NOTE(kbespalov): during the interval between + # a queue declaration and consumer declaration + # the queue can disappear. In this case + # we must redeclare queue and try to re-consume. + # More details is here: + # bugs.launchpad.net/oslo.messaging/+bug/1581148 + LOG.debug("Failed to declare consumer: a queue is " + "not exists. Trying to create queue...") + if exc.code == 404 and consume_max_retries: + consumer.declare(self) + # NOTE(kbespalov): the broker closes a channel + # at any channel error. The py-amqp catches + # this situation and re-open a new channel. + # So, we must re-declare all consumers again. + self._new_tags = set(self._consumers.values()) + consume_max_retries -= 1 + break + else: + raise poll_timeout = (self._poll_timeout if timeout is None else min(timeout, self._poll_timeout)) diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index bb5a57b..b718cbd 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -309,6 +309,51 @@ class TestRabbitConsume(test_utils.BaseTestCase): self.assertEqual(0, int(deadline - time.time())) + def test_consume_from_missing_queue(self): + transport = oslo_messaging.get_transport(self.conf, 'kombu+memory://') + self.addCleanup(transport.cleanup) + with transport._driver._get_connection( + driver_common.PURPOSE_LISTEN) as conn: + with mock.patch('kombu.Queue.consume') as consume, mock.patch( + 'kombu.Queue.declare') as declare: + conn.declare_topic_consumer(exchange_name='test', + topic='test', + callback=lambda msg: True) + import amqp + consume.side_effect = [amqp.NotFound, None] + conn.connection.connection.recoverable_connection_errors = () + conn.connection.connection.recoverable_channel_errors = () + self.assertEqual(1, declare.call_count) + conn.connection.connection.transport.drain_events = mock.Mock() + # Ensure that a queue will be re-declared if the consume method + # of kombu.Queue raise amqp.NotFound + conn.consume() + self.assertEqual(2, declare.call_count) + + def test_consume_from_missing_queue_with_io_error_on_redeclaration(self): + transport = oslo_messaging.get_transport(self.conf, 'kombu+memory://') + self.addCleanup(transport.cleanup) + with transport._driver._get_connection( + driver_common.PURPOSE_LISTEN) as conn: + with mock.patch('kombu.Queue.consume') as consume, mock.patch( + 'kombu.Queue.declare') as declare: + conn.declare_topic_consumer(exchange_name='test', + topic='test', + callback=lambda msg: True) + import amqp + consume.side_effect = [amqp.NotFound, None] + declare.side_effect = [IOError, None] + + conn.connection.connection.recoverable_connection_errors = ( + IOError,) + conn.connection.connection.recoverable_channel_errors = () + self.assertEqual(1, declare.call_count) + conn.connection.connection.transport.drain_events = mock.Mock() + # Ensure that a queue will be re-declared after + # 'queue not found' exception despite on connection error. + conn.consume() + self.assertEqual(3, declare.call_count) + def test_connection_ack_have_disconnected_kombu_connection(self): transport = oslo_messaging.get_transport(self.conf, 'kombu+memory:////') |