diff options
author | Hervé Beraud <hberaud@redhat.com> | 2021-01-08 15:54:54 +0100 |
---|---|---|
committer | Hervé Beraud <hberaud@redhat.com> | 2021-03-15 13:17:17 +0100 |
commit | 82281a0d4e6abffa93175f30b2cbd7acca3cf9fb (patch) | |
tree | 3cb7aa4562a8af5feee54bb52374c4d1839c23ef | |
parent | a0aa3a88a370c33baf733fff39de8722e4c7e3e3 (diff) | |
download | oslo-messaging-10.2.4.tar.gz |
Currently, setting the '[oslo_messaging] direct_mandatory_flag' config
option to 'True' (the default) will result in a 'MessageUndeliverable'
exception being raised when sending a reply if a RabbitMQ queue is
missing [1]. It was the responsibility of the application to handle
this exception, however, many applications are not doing so. This has
resulted in a number of bug reports.
Start handling this error condition, using a retry loop to attempt to
resend the message and work around any temporary glitches. Since
attempting to send a reply will no longer raise an exception,
there is little benefit in retaining the '[oslo_messaging]
direct_mandatory_flag' config option: users setting this to False will
simply not benefit from the retry logic and improved logging added
here. This option is already deprecated though and will be fully
removed in a future release.
[1] https://www.rabbitmq.com/channels.html
Change-Id: Id5cddbefbe24ef100f1cc522f44430df77d217cb
Closes-Bug: #1905965
(cherry picked from commit 4937949dffecdf8863a7876e5a6b0b18e811c3ac)
(cherry picked from commit 391ce7fc69adc8d713d8ca64e76d901eb3e65df1)
(cherry picked from commit 36b5e06c92337cd04d1c8ae4840dbc224faeaa9f)
-rw-r--r-- | doc/source/admin/rabbit.rst | 3 | ||||
-rw-r--r-- | oslo_messaging/_drivers/amqpdriver.py | 68 | ||||
-rw-r--r-- | oslo_messaging/_drivers/impl_rabbit.py | 3 | ||||
-rw-r--r-- | releasenotes/notes/handle-missing-queue-553a803f94976be7.yaml | 5 |
4 files changed, 58 insertions, 21 deletions
diff --git a/doc/source/admin/rabbit.rst b/doc/source/admin/rabbit.rst index 72c438c..687bc42 100644 --- a/doc/source/admin/rabbit.rst +++ b/doc/source/admin/rabbit.rst @@ -66,7 +66,8 @@ flag is used`_. through the *Connection* class. With mandatory flag RabbitMQ raises a callback if the message is not routed to -any queue. +any queue. This callback will be used to loop for a timeout and let's a chance +to sender to recover. .. _Exchange is a AMQP mechanism: https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchanges .. _queues: https://www.rabbitmq.com/queues.html diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 18bcd24..d4658cd 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -143,39 +143,67 @@ class AMQPIncomingMessage(base.RpcIncomingMessage): while True: try: with self.listener.driver._get_connection( - rpc_common.PURPOSE_SEND) as conn: + rpc_common.PURPOSE_SEND, + ) as conn: self._send_reply(conn, reply, failure) + return - except rpc_amqp.AMQPDestinationNotFound: - if timer.check_return() > 0: - LOG.debug(("The reply %(msg_id)s cannot be sent " - "%(reply_q)s reply queue doesn't exist, " - "retrying..."), { - 'msg_id': self.msg_id, - 'reply_q': self.reply_q}) - time.sleep(0.25) - else: + except oslo_messaging.MessageUndeliverable: + # queue not found + if timer.check_return() <= 0: self._obsolete_reply_queues.add(self.reply_q, self.msg_id) - infos = { + LOG.error( + 'The reply %(msg_id)s failed to send after ' + '%(duration)d seconds due to a missing queue ' + '(%(reply_q)s). Abandoning...', { + 'msg_id': self.msg_id, + 'duration': duration, + 'reply_q': self.reply_q}) + return + + LOG.debug( + 'The reply %(msg_id)s could not be sent due to a missing ' + 'queue (%(reply_q)s). 
Retrying...', { 'msg_id': self.msg_id, - 'reply_q': self.reply_q, - 'duration': duration - } - LOG.info("The reply %(msg_id)s cannot be sent " - "%(reply_q)s reply queue don't exist after " - "%(duration)s sec abandoning...", infos) + 'reply_q': self.reply_q}) + time.sleep(0.25) + except rpc_amqp.AMQPDestinationNotFound as exc: + # exchange not found/down + if timer.check_return() <= 0: + self._obsolete_reply_queues.add(self.reply_q, self.msg_id) + LOG.error( + 'The reply %(msg_id)s failed to send after ' + '%(duration)d seconds due to a broker issue ' + '(%(exc)s). Abandoning...', { + 'msg_id': self.msg_id, + 'duration': duration, + 'exc': exc}) return + LOG.debug( + 'The reply %(msg_id)s could not be sent due to a broker ' + 'issue (%(exc)s). Retrying...', { + 'msg_id': self.msg_id, + 'exc': exc}) + time.sleep(0.25) + def heartbeat(self): # generate a keep alive for RPC call monitoring with self.listener.driver._get_connection( - rpc_common.PURPOSE_SEND) as conn: + rpc_common.PURPOSE_SEND, + ) as conn: try: self._send_reply(conn, None, None, ending=False) + except oslo_messaging.MessageUndeliverable: + # internal exception that indicates queue gone - + # broker unreachable. + raise MessageDeliveryFailure( + "Heartbeat send failed. Missing queue") except rpc_amqp.AMQPDestinationNotFound: - # internal exception that indicates queue/exchange gone - + # internal exception that indicates exchange gone - # broker unreachable. - raise MessageDeliveryFailure("Heartbeat send failed") + raise MessageDeliveryFailure( + "Heartbeat send failed. Missing exchange") # NOTE(sileht): Those have already be ack in RpcListener IO thread # We keep them as noop until all drivers do the same diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 504fb71..6b89f89 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -176,6 +176,8 @@ rabbit_opts = [ 'flag for direct send. 
The direct send is used as reply, ' 'so the MessageUndeliverable exception is raised ' 'in case the client queue does not exist.' + 'MessageUndeliverable exception will be used to loop for a ' + 'timeout to lets a chance to sender to recover.' 'This flag is deprecated and it will not be possible to ' 'deactivate this functionality anymore'), cfg.BoolOpt('enable_cancel_on_failover', @@ -517,6 +519,7 @@ class Connection(object): # if it was already monkey patched by eventlet/greenlet. global threading threading = stdlib_threading + self.direct_mandatory_flag = driver_conf.direct_mandatory_flag if self.ssl: diff --git a/releasenotes/notes/handle-missing-queue-553a803f94976be7.yaml b/releasenotes/notes/handle-missing-queue-553a803f94976be7.yaml new file mode 100644 index 0000000..0407e62 --- /dev/null +++ b/releasenotes/notes/handle-missing-queue-553a803f94976be7.yaml @@ -0,0 +1,5 @@ +--- +features: + - | + Adding retry strategy based on the mandatory flag. Missing exchanges and + queues are now identified separately for logging purposes. |