summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexei Kornienko <alexei.kornienko@gmail.com>2014-07-24 01:48:27 +0300
committerBogdan Dobrelya <bdobrelia@mirantis.com>2014-08-19 13:30:40 +0300
commit7381ccdc2afb71761b31f766be15ee21944d3650 (patch)
tree2ae264f1e28aa0a7b8d598e66cdb942950fd0c71
parent1401fd1da8bd0d7adb32513a22cd168d26377278 (diff)
downloadoslo-messaging-7381ccdc2afb71761b31f766be15ee21944d3650.tar.gz
Should not send replies for cast messages
Cast messages will not contain a msg_id and we're currently sending messages to the default excahgne with empty msg_id. In order to fix it we should not send replies if there is no msg_id in message. Change-Id: I5b1142029d2c718c3929cf6cf1f6e958b95a5c96 Closes-bug: #1355058
-rw-r--r--oslo/messaging/_drivers/amqpdriver.py4
-rw-r--r--tests/drivers/test_impl_rabbit.py29
2 files changed, 29 insertions, 4 deletions
diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py
index c5b2378..f9f1b06 100644
--- a/oslo/messaging/_drivers/amqpdriver.py
+++ b/oslo/messaging/_drivers/amqpdriver.py
@@ -64,6 +64,10 @@ class AMQPIncomingMessage(base.IncomingMessage):
conn.direct_send(self.msg_id, rpc_common.serialize_msg(msg))
def reply(self, reply=None, failure=None, log_failure=True):
+ if not self.msg_id:
+ # NOTE(Alexei_987) not sending reply, if msg_id is empty
+ # because reply should not be expected by caller side
+ return
with self.listener.driver._get_connection() as conn:
self._send_reply(conn, reply, failure, log_failure=log_failure)
self._send_reply(conn, ending=True)
diff --git a/tests/drivers/test_impl_rabbit.py b/tests/drivers/test_impl_rabbit.py
index 05b2ad9..198252c 100644
--- a/tests/drivers/test_impl_rabbit.py
+++ b/tests/drivers/test_impl_rabbit.py
@@ -317,19 +317,25 @@ class TestRacyWaitForReply(test_utils.BaseTestCase):
self.stubs.Set(amqpdriver.ReplyWaiter, 'wait', reply_waiter)
- def send_and_wait_for_reply(i):
+ def send_and_wait_for_reply(i, wait_for_reply):
replies.append(driver.send(target,
{},
{'tx_id': i},
- wait_for_reply=True,
+ wait_for_reply=wait_for_reply,
timeout=None))
while len(senders) < 2:
t = threading.Thread(target=send_and_wait_for_reply,
- args=(len(senders), ))
+ args=(len(senders), True))
t.daemon = True
senders.append(t)
+ # test the case then msg_id is not set
+ t = threading.Thread(target=send_and_wait_for_reply,
+ args=(len(senders), False))
+ t.daemon = True
+ senders.append(t)
+
# Start the first guy, receive his message, but delay his polling
notify_condition = threading.Condition()
wait_conditions.append(notify_condition)
@@ -354,6 +360,20 @@ class TestRacyWaitForReply(test_utils.BaseTestCase):
# Wait for the second thread to finish
senders[1].join()
+ # Start the 3rd guy, receive his message
+ senders[2].start()
+
+ msgs.append(listener.poll())
+ self.assertEqual({'tx_id': 2}, msgs[-1].message)
+
+ # Verify the _send_reply was not invoked by driver:
+ with mock.patch.object(msgs[2], '_send_reply') as method:
+ msgs[2].reply({'rx_id': 2})
+ self.assertEqual(method.call_count, 0)
+
+ # Wait for the 3rd thread to finish
+ senders[2].join()
+
# Let the first thread continue
with notify_condition:
notify_condition.notify()
@@ -364,7 +384,8 @@ class TestRacyWaitForReply(test_utils.BaseTestCase):
# Verify replies were received out of order
self.assertEqual(len(senders), len(replies))
self.assertEqual({'rx_id': 1}, replies[0])
- self.assertEqual({'rx_id': 0}, replies[1])
+ self.assertIsNone(replies[1])
+ self.assertEqual({'rx_id': 0}, replies[2])
def _declare_queue(target):