summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2016-08-30 16:21:04 +0000
committerGerrit Code Review <review@openstack.org>2016-08-30 16:21:04 +0000
commit70c2a484f9dccb7702ec287ca9504771c89845e7 (patch)
tree1b86dc67662142b4a7697216eed7f2fd59379852
parent34ea4f6edeaaee93d6a694ed7959bb223bc2240e (diff)
parenteaa30a8887b33e4b3e7d4240ecef6cec1d5aa108 (diff)
downloadoslo-messaging-70c2a484f9dccb7702ec287ca9504771c89845e7.tar.gz
Merge "Fix consuming from missing queues" into stable/mitaka
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py30
-rw-r--r--oslo_messaging/tests/drivers/test_impl_rabbit.py45
2 files changed, 71 insertions, 4 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index faa49aa..cb25bb0 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -248,7 +248,7 @@ class Consumer(object):
def __init__(self, exchange_name, queue_name, routing_key, type, durable,
exchange_auto_delete, queue_auto_delete, callback,
- nowait=True, rabbit_ha_queues=None, rabbit_queue_ttl=0):
+ nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0):
"""Init the Publisher class with the exchange_name, routing_key,
type, durable auto_delete
"""
@@ -989,11 +989,33 @@ class Connection(object):
if not self.connection.connected:
raise self.connection.recoverable_connection_errors[0]
- if self._new_tags:
+ consume_max_retries = 1
+ while self._new_tags and consume_max_retries >= 0:
for consumer, tag in self._consumers.items():
if tag in self._new_tags:
- consumer.consume(tag=tag)
- self._new_tags.remove(tag)
+ try:
+ consumer.consume(tag=tag)
+ self._new_tags.remove(tag)
+ except self.connection.channel_errors as exc:
+ # NOTE(kbespalov): during the interval between
+ # a queue declaration and consumer declaration
+ # the queue can disappear. In this case
+ # we must redeclare queue and try to re-consume.
+ # More details is here:
+ # bugs.launchpad.net/oslo.messaging/+bug/1581148
+ LOG.debug("Failed to declare consumer: a queue is "
+ "not exists. Trying to create queue...")
+ if exc.code == 404 and consume_max_retries:
+ consumer.declare(self)
+ # NOTE(kbespalov): the broker closes a channel
+ # at any channel error. The py-amqp catches
+ # this situation and re-open a new channel.
+ # So, we must re-declare all consumers again.
+ self._new_tags = set(self._consumers.values())
+ consume_max_retries -= 1
+ break
+ else:
+ raise
poll_timeout = (self._poll_timeout if timeout is None
else min(timeout, self._poll_timeout))
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index bb5a57b..b718cbd 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -309,6 +309,51 @@ class TestRabbitConsume(test_utils.BaseTestCase):
self.assertEqual(0, int(deadline - time.time()))
+ def test_consume_from_missing_queue(self):
+ transport = oslo_messaging.get_transport(self.conf, 'kombu+memory://')
+ self.addCleanup(transport.cleanup)
+ with transport._driver._get_connection(
+ driver_common.PURPOSE_LISTEN) as conn:
+ with mock.patch('kombu.Queue.consume') as consume, mock.patch(
+ 'kombu.Queue.declare') as declare:
+ conn.declare_topic_consumer(exchange_name='test',
+ topic='test',
+ callback=lambda msg: True)
+ import amqp
+ consume.side_effect = [amqp.NotFound, None]
+ conn.connection.connection.recoverable_connection_errors = ()
+ conn.connection.connection.recoverable_channel_errors = ()
+ self.assertEqual(1, declare.call_count)
+ conn.connection.connection.transport.drain_events = mock.Mock()
+ # Ensure that a queue will be re-declared if the consume method
+ # of kombu.Queue raise amqp.NotFound
+ conn.consume()
+ self.assertEqual(2, declare.call_count)
+
+ def test_consume_from_missing_queue_with_io_error_on_redeclaration(self):
+ transport = oslo_messaging.get_transport(self.conf, 'kombu+memory://')
+ self.addCleanup(transport.cleanup)
+ with transport._driver._get_connection(
+ driver_common.PURPOSE_LISTEN) as conn:
+ with mock.patch('kombu.Queue.consume') as consume, mock.patch(
+ 'kombu.Queue.declare') as declare:
+ conn.declare_topic_consumer(exchange_name='test',
+ topic='test',
+ callback=lambda msg: True)
+ import amqp
+ consume.side_effect = [amqp.NotFound, None]
+ declare.side_effect = [IOError, None]
+
+ conn.connection.connection.recoverable_connection_errors = (
+ IOError,)
+ conn.connection.connection.recoverable_channel_errors = ()
+ self.assertEqual(1, declare.call_count)
+ conn.connection.connection.transport.drain_events = mock.Mock()
+ # Ensure that a queue will be re-declared after
+ # 'queue not found' exception despite on connection error.
+ conn.consume()
+ self.assertEqual(3, declare.call_count)
+
def test_connection_ack_have_disconnected_kombu_connection(self):
transport = oslo_messaging.get_transport(self.conf,
'kombu+memory:////')