summaryrefslogtreecommitdiff
path: root/oslo_messaging/_drivers/amqpdriver.py
diff options
context:
space:
mode:
Diffstat (limited to 'oslo_messaging/_drivers/amqpdriver.py')
-rw-r--r--oslo_messaging/_drivers/amqpdriver.py10
1 files changed, 9 insertions, 1 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index 589baf5..24fdbc7 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -292,6 +292,7 @@ class ObsoleteReplyQueuesCache(object):
class AMQPListener(base.PollStyleListener):
+ use_cache = False
def __init__(self, driver, conn):
super(AMQPListener, self).__init__(driver.prefetch_size)
@@ -308,7 +309,13 @@ class AMQPListener(base.PollStyleListener):
def __call__(self, message):
ctxt = rpc_amqp.unpack_context(message)
- unique_id = self.msg_id_cache.check_duplicate_message(message)
+ try:
+ unique_id = self.msg_id_cache.check_duplicate_message(message)
+ except rpc_common.DuplicateMessageError:
+ LOG.exception("ignoring duplicate message %s", ctxt.msg_id)
+ return
+ if self.use_cache:
+ self.msg_id_cache.add(unique_id)
if ctxt.msg_id:
LOG.debug("received message msg_id: %(msg_id)s reply to "
"%(queue)s", {'queue': ctxt.reply_q,
@@ -389,6 +396,7 @@ class AMQPListener(base.PollStyleListener):
class RpcAMQPListener(AMQPListener):
message_cls = AMQPIncomingMessage
+ use_cache = True
def __call__(self, message):
# NOTE(kgiusti): In the original RPC implementation the RPC server