From 129c223307141be1bc767dfca704b06192458d5b Mon Sep 17 00:00:00 2001 From: Nikita Kalyanov Date: Tue, 13 Jul 2021 03:31:27 +0300 Subject: use message id cache for RPC listener Return back the message id cache feature to RPC listener, it was removed while refactoring in I708c3d6676b974d8daac6817c15f596cdf35817b See attached bug for more info. We should not raise DuplicateMessageError to avoid rejecting the previously ACK'ed message. Closes-Bug: #1935883 Change-Id: Ie237e9e3fdc3fc27b3deb18b94751cdc3afd190e --- oslo_messaging/_drivers/amqpdriver.py | 10 ++++- oslo_messaging/tests/drivers/test_impl_rabbit.py | 52 ++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 589baf5..24fdbc7 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -292,6 +292,7 @@ class ObsoleteReplyQueuesCache(object): class AMQPListener(base.PollStyleListener): + use_cache = False def __init__(self, driver, conn): super(AMQPListener, self).__init__(driver.prefetch_size) @@ -308,7 +309,13 @@ class AMQPListener(base.PollStyleListener): def __call__(self, message): ctxt = rpc_amqp.unpack_context(message) - unique_id = self.msg_id_cache.check_duplicate_message(message) + try: + unique_id = self.msg_id_cache.check_duplicate_message(message) + except rpc_common.DuplicateMessageError: + LOG.exception("ignoring duplicate message %s", ctxt.msg_id) + return + if self.use_cache: + self.msg_id_cache.add(unique_id) if ctxt.msg_id: LOG.debug("received message msg_id: %(msg_id)s reply to " "%(queue)s", {'queue': ctxt.reply_q, @@ -389,6 +396,7 @@ class AMQPListener(base.PollStyleListener): class RpcAMQPListener(AMQPListener): message_cls = AMQPIncomingMessage + use_cache = True def __call__(self, message): # NOTE(kgiusti): In the original RPC implementation the RPC server diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index f8882f2..8955661 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -1107,3 +1107,55 @@ class TestPollTimeoutLimit(test_utils.BaseTestCase): {}, {'tx_id': 'test'}) thread.join() + + +class TestMsgIdCache(test_utils.BaseTestCase): + @mock.patch('kombu.message.Message.reject') + def test_reply_wire_format(self, reject_mock): + self.conf.oslo_messaging_rabbit.kombu_compression = None + + transport = oslo_messaging.get_transport(self.conf, + 'kombu+memory:////') + self.addCleanup(transport.cleanup) + + driver = transport._driver + + target = oslo_messaging.Target(topic='testtopic', + server=None, + fanout=False) + + listener = driver.listen(target, None, None)._poll_style_listener + + connection, producer = _create_producer(target) + self.addCleanup(connection.release) + + msg = { + 'oslo.version': '2.0', + 'oslo.message': {} + } + + msg['oslo.message'].update({ + '_msg_id': uuid.uuid4().hex, + '_unique_id': uuid.uuid4().hex, + '_reply_q': 'reply_' + uuid.uuid4().hex, + '_timeout': None, + }) + + msg['oslo.message'] = jsonutils.dumps(msg['oslo.message']) + + producer.publish(msg) + + received = listener.poll()[0] + self.assertIsNotNone(received) + self.assertEqual({}, received.message) + + # publish the same message a second time + producer.publish(msg) + + received = listener.poll(timeout=1) + + # duplicate message is ignored + self.assertEqual(len(received), 0) + + # we should not reject duplicate message + reject_mock.assert_not_called() -- cgit v1.2.1