diff options
author | Gabriele <gsantomaggio@suse.com> | 2019-04-04 14:56:25 +0200 |
---|---|---|
committer | Gabriele Santomaggio <g.santomaggio@gmail.com> | 2019-04-23 07:50:24 +0000 |
commit | 6ea1bb5fad84079d07d8ad48d2c2a71aa29cdda4 (patch) | |
tree | 5a3a9f8c59a230f76649b0bef1744bb20e0f136c | |
parent | 50b4451503122fc100fa1d8714d052267a0d5650 (diff) | |
download | oslo-messaging-6ea1bb5fad84079d07d8ad48d2c2a71aa29cdda4.tar.gz |
Retry to declare a queue after internal error5.35.5
Without this commit, the client can lose the messages, because the
client does not handler the 'AMQP internal error 541',
read here [2] for details.
The fix retries to create the queue after a delay.
When the virtual-host is ready the declare does not fail.
This is a rare condiction, please read the bug [1] for details.
Closes-Bug: #1822778
[1] https://bugs.launchpad.net/oslo.messaging/+bug/1822778
[2] https://www.rabbitmq.com/amqp-0-9-1-reference.html
Change-Id: I7ab1f9d21ebb807285bf1422bc14cc6e07dcd32a
(cherry picked from commit 4d2787227b00b973973554f7387e621d2664c0d8)
-rw-r--r-- | oslo_messaging/_drivers/impl_rabbit.py | 23 |
1 files changed, 22 insertions, 1 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index d1ce127..5f0afd5 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -277,7 +277,6 @@ class Consumer(object): self.nowait = nowait self.queue_arguments = _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl) - self.queue = None self._declared_on = None self.exchange = kombu.entity.Exchange( @@ -312,6 +311,28 @@ class Consumer(object): self.queue.declare() else: raise + except kombu.exceptions.ConnectionError as exc: + # NOTE(gsantomaggio): This exception happens when the + # connection is established,but it fails to create the queue. + # Add some delay to avoid too many requests to the server. + # See: https://bugs.launchpad.net/oslo.messaging/+bug/1822778 + # for details. + if exc.code == 541: + interval = 2 + info = {'sleep_time': interval, + 'queue': self.queue_name, + 'err_str': exc + } + LOG.error(_LE('Internal amqp error (541) ' + 'during queue declare,' + 'retrying in %(sleep_time)s seconds. ' + 'Queue: [%(queue)s], ' + 'error message: [%(err_str)s]'), info) + time.sleep(interval) + self.queue.declare() + else: + raise + self._declared_on = conn.channel def consume(self, conn, tag): |