summaryrefslogtreecommitdiff
path: root/oslo_messaging/_drivers/amqpdriver.py
diff options
context:
space:
mode:
authorNikita Kalyanov <nikitakalyanov@gmail.com>2021-07-13 03:31:27 +0300
committerNikita Kalyanov <nikitakalyanov@gmail.com>2021-09-10 11:19:43 +0300
commit129c223307141be1bc767dfca704b06192458d5b (patch)
tree95ea5979ef1ee6336959b81cb9eacefa6ca13850 /oslo_messaging/_drivers/amqpdriver.py
parentbdcf915e788bb368774e5462ccc15e6f5b7223d7 (diff)
downloadoslo-messaging-129c223307141be1bc767dfca704b06192458d5b.tar.gz
use message id cache for RPC listener
Return back the message id cache feature to RPC listener, it was removed while refactoring in I708c3d6676b974d8daac6817c15f596cdf35817b See attached bug for more info. We should not raise DuplicateMessageError to avoid rejecting the previously ACK'ed message. Closes-Bug: #1935883 Change-Id: Ie237e9e3fdc3fc27b3deb18b94751cdc3afd190e
Diffstat (limited to 'oslo_messaging/_drivers/amqpdriver.py')
-rw-r--r--oslo_messaging/_drivers/amqpdriver.py10
1 files changed, 9 insertions, 1 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index 589baf5..24fdbc7 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -292,6 +292,7 @@ class ObsoleteReplyQueuesCache(object):
class AMQPListener(base.PollStyleListener):
+ use_cache = False
def __init__(self, driver, conn):
super(AMQPListener, self).__init__(driver.prefetch_size)
@@ -308,7 +309,13 @@ class AMQPListener(base.PollStyleListener):
def __call__(self, message):
ctxt = rpc_amqp.unpack_context(message)
- unique_id = self.msg_id_cache.check_duplicate_message(message)
+ try:
+ unique_id = self.msg_id_cache.check_duplicate_message(message)
+ except rpc_common.DuplicateMessageError:
+ LOG.exception("ignoring duplicate message %s", ctxt.msg_id)
+ return
+ if self.use_cache:
+ self.msg_id_cache.add(unique_id)
if ctxt.msg_id:
LOG.debug("received message msg_id: %(msg_id)s reply to "
"%(queue)s", {'queue': ctxt.reply_q,
@@ -389,6 +396,7 @@ class AMQPListener(base.PollStyleListener):
class RpcAMQPListener(AMQPListener):
message_cls = AMQPIncomingMessage
+ use_cache = True
def __call__(self, message):
# NOTE(kgiusti): In the original RPC implementation the RPC server