summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGabriele <gsantomaggio@suse.com>2019-04-04 14:56:25 +0200
committerGabriele Santomaggio <g.santomaggio@gmail.com>2019-04-23 07:50:24 +0000
commit6ea1bb5fad84079d07d8ad48d2c2a71aa29cdda4 (patch)
tree5a3a9f8c59a230f76649b0bef1744bb20e0f136c
parent50b4451503122fc100fa1d8714d052267a0d5650 (diff)
downloadoslo-messaging-6ea1bb5fad84079d07d8ad48d2c2a71aa29cdda4.tar.gz
Retry to declare a queue after internal error5.35.5
Without this commit, the client can lose the messages, because the client does not handler the 'AMQP internal error 541', read here [2] for details. The fix retries to create the queue after a delay. When the virtual-host is ready the declare does not fail. This is a rare condiction, please read the bug [1] for details. Closes-Bug: #1822778 [1] https://bugs.launchpad.net/oslo.messaging/+bug/1822778 [2] https://www.rabbitmq.com/amqp-0-9-1-reference.html Change-Id: I7ab1f9d21ebb807285bf1422bc14cc6e07dcd32a (cherry picked from commit 4d2787227b00b973973554f7387e621d2664c0d8)
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py23
1 files changed, 22 insertions, 1 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index d1ce127..5f0afd5 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -277,7 +277,6 @@ class Consumer(object):
self.nowait = nowait
self.queue_arguments = _get_queue_arguments(rabbit_ha_queues,
rabbit_queue_ttl)
-
self.queue = None
self._declared_on = None
self.exchange = kombu.entity.Exchange(
@@ -312,6 +311,28 @@ class Consumer(object):
self.queue.declare()
else:
raise
+ except kombu.exceptions.ConnectionError as exc:
+ # NOTE(gsantomaggio): This exception happens when the
+ # connection is established,but it fails to create the queue.
+ # Add some delay to avoid too many requests to the server.
+ # See: https://bugs.launchpad.net/oslo.messaging/+bug/1822778
+ # for details.
+ if exc.code == 541:
+ interval = 2
+ info = {'sleep_time': interval,
+ 'queue': self.queue_name,
+ 'err_str': exc
+ }
+ LOG.error(_LE('Internal amqp error (541) '
+ 'during queue declare,'
+ 'retrying in %(sleep_time)s seconds. '
+ 'Queue: [%(queue)s], '
+ 'error message: [%(err_str)s]'), info)
+ time.sleep(interval)
+ self.queue.declare()
+ else:
+ raise
+
self._declared_on = conn.channel
def consume(self, conn, tag):