summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNikita Kalyanov <nikitakalyanov@gmail.com>2021-07-13 03:31:27 +0300
committerNikita Kalyanov <nikitakalyanov@gmail.com>2021-09-10 11:19:43 +0300
commit129c223307141be1bc767dfca704b06192458d5b (patch)
tree95ea5979ef1ee6336959b81cb9eacefa6ca13850
parentbdcf915e788bb368774e5462ccc15e6f5b7223d7 (diff)
downloadoslo-messaging-129c223307141be1bc767dfca704b06192458d5b.tar.gz
use message id cache for RPC listener
Return back the message id cache feature to RPC listener, it was removed while refactoring in I708c3d6676b974d8daac6817c15f596cdf35817b See attached bug for more info. We should not raise DuplicateMessageError to avoid rejecting the previously ACK'ed message. Closes-Bug: #1935883 Change-Id: Ie237e9e3fdc3fc27b3deb18b94751cdc3afd190e
-rw-r--r--oslo_messaging/_drivers/amqpdriver.py10
-rw-r--r--oslo_messaging/tests/drivers/test_impl_rabbit.py52
2 files changed, 61 insertions, 1 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index 589baf5..24fdbc7 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -292,6 +292,7 @@ class ObsoleteReplyQueuesCache(object):
class AMQPListener(base.PollStyleListener):
+ use_cache = False
def __init__(self, driver, conn):
super(AMQPListener, self).__init__(driver.prefetch_size)
@@ -308,7 +309,13 @@ class AMQPListener(base.PollStyleListener):
def __call__(self, message):
ctxt = rpc_amqp.unpack_context(message)
- unique_id = self.msg_id_cache.check_duplicate_message(message)
+ try:
+ unique_id = self.msg_id_cache.check_duplicate_message(message)
+ except rpc_common.DuplicateMessageError:
+ LOG.exception("ignoring duplicate message %s", ctxt.msg_id)
+ return
+ if self.use_cache:
+ self.msg_id_cache.add(unique_id)
if ctxt.msg_id:
LOG.debug("received message msg_id: %(msg_id)s reply to "
"%(queue)s", {'queue': ctxt.reply_q,
@@ -389,6 +396,7 @@ class AMQPListener(base.PollStyleListener):
class RpcAMQPListener(AMQPListener):
message_cls = AMQPIncomingMessage
+ use_cache = True
def __call__(self, message):
# NOTE(kgiusti): In the original RPC implementation the RPC server
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index f8882f2..8955661 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -1107,3 +1107,55 @@ class TestPollTimeoutLimit(test_utils.BaseTestCase):
{},
{'tx_id': 'test'})
thread.join()
+
+
+class TestMsgIdCache(test_utils.BaseTestCase):
+ @mock.patch('kombu.message.Message.reject')
+ def test_reply_wire_format(self, reject_mock):
+ self.conf.oslo_messaging_rabbit.kombu_compression = None
+
+ transport = oslo_messaging.get_transport(self.conf,
+ 'kombu+memory:////')
+ self.addCleanup(transport.cleanup)
+
+ driver = transport._driver
+
+ target = oslo_messaging.Target(topic='testtopic',
+ server=None,
+ fanout=False)
+
+ listener = driver.listen(target, None, None)._poll_style_listener
+
+ connection, producer = _create_producer(target)
+ self.addCleanup(connection.release)
+
+ msg = {
+ 'oslo.version': '2.0',
+ 'oslo.message': {}
+ }
+
+ msg['oslo.message'].update({
+ '_msg_id': uuid.uuid4().hex,
+ '_unique_id': uuid.uuid4().hex,
+ '_reply_q': 'reply_' + uuid.uuid4().hex,
+ '_timeout': None,
+ })
+
+ msg['oslo.message'] = jsonutils.dumps(msg['oslo.message'])
+
+ producer.publish(msg)
+
+ received = listener.poll()[0]
+ self.assertIsNotNone(received)
+ self.assertEqual({}, received.message)
+
+ # publish the same message a second time
+ producer.publish(msg)
+
+ received = listener.poll(timeout=1)
+
+ # duplicate message is ignored
+ self.assertEqual(len(received), 0)
+
+ # we should not reject duplicate message
+ reject_mock.assert_not_called()