diff options
Diffstat (limited to 'oslo_messaging/_drivers/amqpdriver.py')
-rw-r--r-- | oslo_messaging/_drivers/amqpdriver.py | 10 |
1 files changed, 9 insertions, 1 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 589baf5..24fdbc7 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -292,6 +292,7 @@ class ObsoleteReplyQueuesCache(object): class AMQPListener(base.PollStyleListener): + use_cache = False def __init__(self, driver, conn): super(AMQPListener, self).__init__(driver.prefetch_size) @@ -308,7 +309,13 @@ class AMQPListener(base.PollStyleListener): def __call__(self, message): ctxt = rpc_amqp.unpack_context(message) - unique_id = self.msg_id_cache.check_duplicate_message(message) + try: + unique_id = self.msg_id_cache.check_duplicate_message(message) + except rpc_common.DuplicateMessageError: + LOG.exception("ignoring duplicate message %s", ctxt.msg_id) + return + if self.use_cache: + self.msg_id_cache.add(unique_id) if ctxt.msg_id: LOG.debug("received message msg_id: %(msg_id)s reply to " "%(queue)s", {'queue': ctxt.reply_q, @@ -389,6 +396,7 @@ class AMQPListener(base.PollStyleListener): class RpcAMQPListener(AMQPListener): message_cls = AMQPIncomingMessage + use_cache = True def __call__(self, message): # NOTE(kgiusti): In the original RPC implementation the RPC server |