diff options
author | Nikita Kalyanov <nikitakalyanov@gmail.com> | 2021-07-13 03:31:27 +0300 |
---|---|---|
committer | Nikita Kalyanov <nikitakalyanov@gmail.com> | 2021-09-10 11:19:43 +0300 |
commit | 129c223307141be1bc767dfca704b06192458d5b (patch) | |
tree | 95ea5979ef1ee6336959b81cb9eacefa6ca13850 /oslo_messaging/_drivers/amqpdriver.py | |
parent | bdcf915e788bb368774e5462ccc15e6f5b7223d7 (diff) | |
download | oslo-messaging-129c223307141be1bc767dfca704b06192458d5b.tar.gz |
use message id cache for RPC listener
Return back the message id cache feature to RPC listener, it was
removed while refactoring in I708c3d6676b974d8daac6817c15f596cdf35817b
See attached bug for more info.
We should not raise DuplicateMessageError to avoid rejecting the
previously ACK'ed message.
Closes-Bug: #1935883
Change-Id: Ie237e9e3fdc3fc27b3deb18b94751cdc3afd190e
Diffstat (limited to 'oslo_messaging/_drivers/amqpdriver.py')
-rw-r--r-- | oslo_messaging/_drivers/amqpdriver.py | 10 |
1 files changed, 9 insertions, 1 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 589baf5..24fdbc7 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -292,6 +292,7 @@ class ObsoleteReplyQueuesCache(object): class AMQPListener(base.PollStyleListener): + use_cache = False def __init__(self, driver, conn): super(AMQPListener, self).__init__(driver.prefetch_size) @@ -308,7 +309,13 @@ class AMQPListener(base.PollStyleListener): def __call__(self, message): ctxt = rpc_amqp.unpack_context(message) - unique_id = self.msg_id_cache.check_duplicate_message(message) + try: + unique_id = self.msg_id_cache.check_duplicate_message(message) + except rpc_common.DuplicateMessageError: + LOG.exception("ignoring duplicate message %s", ctxt.msg_id) + return + if self.use_cache: + self.msg_id_cache.add(unique_id) if ctxt.msg_id: LOG.debug("received message msg_id: %(msg_id)s reply to " "%(queue)s", {'queue': ctxt.reply_q, @@ -389,6 +396,7 @@ class AMQPListener(base.PollStyleListener): class RpcAMQPListener(AMQPListener): message_cls = AMQPIncomingMessage + use_cache = True def __call__(self, message): # NOTE(kgiusti): In the original RPC implementation the RPC server |