summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--oslo/messaging/_drivers/amqpdriver.py8
-rw-r--r--tests/test_rabbit.py22
2 files changed, 22 insertions, 8 deletions
diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py
index 0dacec4..6ab24dc 100644
--- a/oslo/messaging/_drivers/amqpdriver.py
+++ b/oslo/messaging/_drivers/amqpdriver.py
@@ -215,9 +215,9 @@ class ReplyWaiter(object):
try:
while True:
reply, ending = self._poll_connection(msg_id, timeout)
- if reply:
+ if not ending:
final_reply = reply
- elif ending:
+ else:
return final_reply
finally:
self.conn_lock.release()
@@ -232,9 +232,9 @@ class ReplyWaiter(object):
# The first thread got its reply, let's try and take over
# the responsibility for polling
continue
- if reply:
+ if not ending:
final_reply = reply
- elif ending:
+ else:
return final_reply
diff --git a/tests/test_rabbit.py b/tests/test_rabbit.py
index 5cff175..142252c 100644
--- a/tests/test_rabbit.py
+++ b/tests/test_rabbit.py
@@ -114,6 +114,15 @@ class TestSendReceive(test_utils.BaseTestCase):
('with_context', dict(ctxt={'user': 'mark'})),
]
+ _reply = [
+ ('rx_id', dict(rx_id=True, reply=None)),
+ ('none', dict(rx_id=False, reply=None)),
+ ('empty_list', dict(rx_id=False, reply=[])),
+ ('empty_dict', dict(rx_id=False, reply={})),
+ ('false', dict(rx_id=False, reply=False)),
+ ('zero', dict(rx_id=False, reply=0)),
+ ]
+
_failure = [
('success', dict(failure=False)),
('failure', dict(failure=True, expected=False)),
@@ -129,6 +138,7 @@ class TestSendReceive(test_utils.BaseTestCase):
def generate_scenarios(cls):
cls.scenarios = testscenarios.multiply_scenarios(cls._n_senders,
cls._context,
+ cls._reply,
cls._failure,
cls._timeout)
@@ -163,7 +173,7 @@ class TestSendReceive(test_utils.BaseTestCase):
try:
replies.append(driver.send(target,
self.ctxt,
- {'foo': i},
+ {'tx_id': i},
wait_for_reply=True,
timeout=self.timeout))
self.assertFalse(self.failure)
@@ -182,7 +192,7 @@ class TestSendReceive(test_utils.BaseTestCase):
received = listener.poll()
self.assertIsNotNone(received)
self.assertEqual(received.ctxt, self.ctxt)
- self.assertEqual(received.message, {'foo': i})
+ self.assertEqual(received.message, {'tx_id': i})
msgs.append(received)
# reply in reverse, except reply to the first guy second from last
@@ -199,8 +209,10 @@ class TestSendReceive(test_utils.BaseTestCase):
failure = sys.exc_info()
msgs[i].reply(failure=failure,
log_failure=not self.expected)
+ elif self.rx_id:
+ msgs[i].reply({'rx_id': i})
else:
- msgs[i].reply({'bar': msgs[i].message['foo']})
+ msgs[i].reply(self.reply)
senders[i].join()
self.assertEqual(len(replies), len(senders))
@@ -209,8 +221,10 @@ class TestSendReceive(test_utils.BaseTestCase):
self.assertIsInstance(reply, messaging.MessagingTimeout)
elif self.failure:
self.assertIsInstance(reply, ZeroDivisionError)
+ elif self.rx_id:
+ self.assertEqual(reply, {'rx_id': order[i]})
else:
- self.assertEqual(reply, {'bar': order[i]})
+ self.assertEqual(reply, self.reply)
if not self.timeout and self.failure and not self.expected:
self.assertTrue(len(errors) > 0, errors)