diff options
author | Nikita Kalyanov <nikitakalyanov@gmail.com> | 2021-07-13 03:31:27 +0300 |
---|---|---|
committer | Nikita Kalyanov <nikitakalyanov@gmail.com> | 2021-09-10 11:19:43 +0300 |
commit | 129c223307141be1bc767dfca704b06192458d5b (patch) | |
tree | 95ea5979ef1ee6336959b81cb9eacefa6ca13850 | |
parent | bdcf915e788bb368774e5462ccc15e6f5b7223d7 (diff) | |
download | oslo-messaging-129c223307141be1bc767dfca704b06192458d5b.tar.gz |
use message id cache for RPC listener
Return back the message id cache feature to RPC listener, it was
removed while refactoring in I708c3d6676b974d8daac6817c15f596cdf35817b
See attached bug for more info.
We should not raise DuplicateMessageError to avoid rejecting the
previously ACK'ed message.
Closes-Bug: #1935883
Change-Id: Ie237e9e3fdc3fc27b3deb18b94751cdc3afd190e
-rw-r--r-- | oslo_messaging/_drivers/amqpdriver.py | 10 | ||||
-rw-r--r-- | oslo_messaging/tests/drivers/test_impl_rabbit.py | 52 |
2 files changed, 61 insertions, 1 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 589baf5..24fdbc7 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -292,6 +292,7 @@ class ObsoleteReplyQueuesCache(object): class AMQPListener(base.PollStyleListener): + use_cache = False def __init__(self, driver, conn): super(AMQPListener, self).__init__(driver.prefetch_size) @@ -308,7 +309,13 @@ class AMQPListener(base.PollStyleListener): def __call__(self, message): ctxt = rpc_amqp.unpack_context(message) - unique_id = self.msg_id_cache.check_duplicate_message(message) + try: + unique_id = self.msg_id_cache.check_duplicate_message(message) + except rpc_common.DuplicateMessageError: + LOG.exception("ignoring duplicate message %s", ctxt.msg_id) + return + if self.use_cache: + self.msg_id_cache.add(unique_id) if ctxt.msg_id: LOG.debug("received message msg_id: %(msg_id)s reply to " "%(queue)s", {'queue': ctxt.reply_q, @@ -389,6 +396,7 @@ class AMQPListener(base.PollStyleListener): class RpcAMQPListener(AMQPListener): message_cls = AMQPIncomingMessage + use_cache = True def __call__(self, message): # NOTE(kgiusti): In the original RPC implementation the RPC server diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index f8882f2..8955661 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -1107,3 +1107,55 @@ class TestPollTimeoutLimit(test_utils.BaseTestCase): {}, {'tx_id': 'test'}) thread.join() + + +class TestMsgIdCache(test_utils.BaseTestCase): + @mock.patch('kombu.message.Message.reject') + def test_reply_wire_format(self, reject_mock): + self.conf.oslo_messaging_rabbit.kombu_compression = None + + transport = oslo_messaging.get_transport(self.conf, + 'kombu+memory:////') + self.addCleanup(transport.cleanup) + + driver = transport._driver + + target = oslo_messaging.Target(topic='testtopic', + server=None, + fanout=False) + + listener = driver.listen(target, None, None)._poll_style_listener + + connection, producer = _create_producer(target) + self.addCleanup(connection.release) + + msg = { + 'oslo.version': '2.0', + 'oslo.message': {} + } + + msg['oslo.message'].update({ + '_msg_id': uuid.uuid4().hex, + '_unique_id': uuid.uuid4().hex, + '_reply_q': 'reply_' + uuid.uuid4().hex, + '_timeout': None, + }) + + msg['oslo.message'] = jsonutils.dumps(msg['oslo.message']) + + producer.publish(msg) + + received = listener.poll()[0] + self.assertIsNotNone(received) + self.assertEqual({}, received.message) + + # publish the same message a second time + producer.publish(msg) + + received = listener.poll(timeout=1) + + # duplicate message is ignored + self.assertEqual(len(received), 0) + + # we should not reject duplicate message + reject_mock.assert_not_called() |