diff options
author | Zuul <zuul@review.opendev.org> | 2021-09-13 16:54:59 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2021-09-13 16:54:59 +0000 |
commit | d4f7ea21fc10ec00aaac5ca16bcb5903c3e81bac (patch) | |
tree | 60587944198b0a9799bc65362dc17050f83dc4db /oslo_messaging/_drivers/amqpdriver.py | |
parent | ef0b31f1120ab10d83d1d56030ccd71494b2289a (diff) | |
parent | 129c223307141be1bc767dfca704b06192458d5b (diff) | |
download | oslo-messaging-d4f7ea21fc10ec00aaac5ca16bcb5903c3e81bac.tar.gz |
Merge "use message id cache for RPC listener"
Diffstat (limited to 'oslo_messaging/_drivers/amqpdriver.py')
-rw-r--r-- | oslo_messaging/_drivers/amqpdriver.py | 10 |
1 files changed, 9 insertions, 1 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 589baf5..24fdbc7 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -292,6 +292,7 @@ class ObsoleteReplyQueuesCache(object): class AMQPListener(base.PollStyleListener): + use_cache = False def __init__(self, driver, conn): super(AMQPListener, self).__init__(driver.prefetch_size) @@ -308,7 +309,13 @@ class AMQPListener(base.PollStyleListener): def __call__(self, message): ctxt = rpc_amqp.unpack_context(message) - unique_id = self.msg_id_cache.check_duplicate_message(message) + try: + unique_id = self.msg_id_cache.check_duplicate_message(message) + except rpc_common.DuplicateMessageError: + LOG.exception("ignoring duplicate message %s", ctxt.msg_id) + return + if self.use_cache: + self.msg_id_cache.add(unique_id) if ctxt.msg_id: LOG.debug("received message msg_id: %(msg_id)s reply to " "%(queue)s", {'queue': ctxt.reply_q, @@ -389,6 +396,7 @@ class AMQPListener(base.PollStyleListener): class RpcAMQPListener(AMQPListener): message_cls = AMQPIncomingMessage + use_cache = True def __call__(self, message): # NOTE(kgiusti): In the original RPC implementation the RPC server |