summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2021-09-13 16:54:59 +0000
committerGerrit Code Review <review@openstack.org>2021-09-13 16:54:59 +0000
commitd4f7ea21fc10ec00aaac5ca16bcb5903c3e81bac (patch)
tree60587944198b0a9799bc65362dc17050f83dc4db
parentef0b31f1120ab10d83d1d56030ccd71494b2289a (diff)
parent129c223307141be1bc767dfca704b06192458d5b (diff)
downloadoslo-messaging-d4f7ea21fc10ec00aaac5ca16bcb5903c3e81bac.tar.gz
Merge "use message id cache for RPC listener"
-rw-r--r--oslo_messaging/_drivers/amqpdriver.py10
-rw-r--r--oslo_messaging/tests/drivers/test_impl_rabbit.py52
2 files changed, 61 insertions, 1 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index 589baf5..24fdbc7 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -292,6 +292,7 @@ class ObsoleteReplyQueuesCache(object):
class AMQPListener(base.PollStyleListener):
+ use_cache = False
def __init__(self, driver, conn):
super(AMQPListener, self).__init__(driver.prefetch_size)
@@ -308,7 +309,13 @@ class AMQPListener(base.PollStyleListener):
def __call__(self, message):
ctxt = rpc_amqp.unpack_context(message)
- unique_id = self.msg_id_cache.check_duplicate_message(message)
+ try:
+ unique_id = self.msg_id_cache.check_duplicate_message(message)
+ except rpc_common.DuplicateMessageError:
+ LOG.exception("ignoring duplicate message %s", ctxt.msg_id)
+ return
+ if self.use_cache:
+ self.msg_id_cache.add(unique_id)
if ctxt.msg_id:
LOG.debug("received message msg_id: %(msg_id)s reply to "
"%(queue)s", {'queue': ctxt.reply_q,
@@ -389,6 +396,7 @@ class AMQPListener(base.PollStyleListener):
class RpcAMQPListener(AMQPListener):
message_cls = AMQPIncomingMessage
+ use_cache = True
def __call__(self, message):
# NOTE(kgiusti): In the original RPC implementation the RPC server
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index f8882f2..8955661 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -1107,3 +1107,55 @@ class TestPollTimeoutLimit(test_utils.BaseTestCase):
{},
{'tx_id': 'test'})
thread.join()
+
+
+class TestMsgIdCache(test_utils.BaseTestCase):
+ @mock.patch('kombu.message.Message.reject')
+ def test_reply_wire_format(self, reject_mock):
+ self.conf.oslo_messaging_rabbit.kombu_compression = None
+
+ transport = oslo_messaging.get_transport(self.conf,
+ 'kombu+memory:////')
+ self.addCleanup(transport.cleanup)
+
+ driver = transport._driver
+
+ target = oslo_messaging.Target(topic='testtopic',
+ server=None,
+ fanout=False)
+
+ listener = driver.listen(target, None, None)._poll_style_listener
+
+ connection, producer = _create_producer(target)
+ self.addCleanup(connection.release)
+
+ msg = {
+ 'oslo.version': '2.0',
+ 'oslo.message': {}
+ }
+
+ msg['oslo.message'].update({
+ '_msg_id': uuid.uuid4().hex,
+ '_unique_id': uuid.uuid4().hex,
+ '_reply_q': 'reply_' + uuid.uuid4().hex,
+ '_timeout': None,
+ })
+
+ msg['oslo.message'] = jsonutils.dumps(msg['oslo.message'])
+
+ producer.publish(msg)
+
+ received = listener.poll()[0]
+ self.assertIsNotNone(received)
+ self.assertEqual({}, received.message)
+
+ # publish the same message a second time
+ producer.publish(msg)
+
+ received = listener.poll(timeout=1)
+
+ # duplicate message is ignored
+ self.assertEqual(len(received), 0)
+
+ # we should not reject duplicate message
+ reject_mock.assert_not_called()