diff options
-rw-r--r-- | python/tests/message.py | 82 |
1 files changed, 63 insertions, 19 deletions
diff --git a/python/tests/message.py b/python/tests/message.py index 0cb758506c..916a9825bd 100644 --- a/python/tests/message.py +++ b/python/tests/message.py @@ -267,7 +267,7 @@ class MessageTests(TestBase): for i in range(1, 6): msg = queue.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msgs.add(msg) + msgs.append(msg) try: extra = queue.get(timeout=1) self.fail("Got unexpected 6th message in original queue: " + extra.body) @@ -277,16 +277,16 @@ class MessageTests(TestBase): #todo: once batching is implmented, send a single response for all messages for msg in msgs: msg.ok() - msgs.clear() + del msgs for i in range(6, 11): msg = queue.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msgs.add(msg) + msgs.append(msg) for msg in msgs: msg.ok() - msgs.clear() + del msgs try: extra = queue.get(timeout=1) @@ -317,7 +317,8 @@ class MessageTests(TestBase): for i in range(1, 6): msg = queue.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msgs.add(msg) + print "Got Message %d" % i + msgs.append(msg) try: extra = queue.get(timeout=1) @@ -327,16 +328,16 @@ class MessageTests(TestBase): #ack messages and check that the next set arrive ok: for msg in msgs: msg.ok() - msgs.clear() + del msgs for i in range(6, 11): msg = queue.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msgs.add(msg) + msgs.append(msg) for msg in msgs: msg.ok() - msgs.clear() + del msgs try: extra = queue.get(timeout=1) @@ -363,12 +364,13 @@ class MessageTests(TestBase): #use message_get to read back the messages, and check that we get an empty at the end for i in range(1, 11): - reply = channel.message_get(no_ack=True) + tag = "queue %d" % i + reply = channel.message_get(no_ack=True, queue="test-get", destination=tag) self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "ok") - self.assertEqual("Message %d" % i, reply.body) + self.assertEqual(reply.method.name, "ok") + self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body) - reply = channel.message_get(no_ack=True) + reply = channel.message_get(no_ack=True, queue="test-get") self.assertEqual(reply.method.klass.name, "message") self.assertEqual(reply.method.name, "get-empty") @@ -377,10 +379,11 @@ class MessageTests(TestBase): channel.message_transfer(routing_key="test-get", body="Message %d" % i) for i in range(11, 21): - reply = channel.message_get(no_ack=False) + tag = "queue %d" % i + reply = channel.message_get(no_ack=False, queue="test-get", destination=tag) self.assertEqual(reply.method.klass.name, "message") self.assertEqual(reply.method.name, "ok") - self.assertEqual("Message %d" % i, reply.body) + self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body) reply.ok() #todo: when batching is available, test ack multiple @@ -389,7 +392,7 @@ class MessageTests(TestBase): #if(i in [15, 17, 19]): # channel.message_ack(delivery_tag=reply.delivery_tag) - reply = channel.message_get(no_ack=True) + reply = channel.message_get(no_ack=True, queue="test-get") self.assertEqual(reply.method.klass.name, "message") self.assertEqual(reply.method.name, "get-empty") @@ -398,20 +401,21 @@ class MessageTests(TestBase): #get the unacked messages again (14, 16, 18, 20) for i in [14, 16, 18, 20]: - reply = channel.message_get(no_ack=False) + tag = "queue %d" % i + reply = channel.message_get(no_ack=False, queue="test-get", destination=tag) self.assertEqual(reply.method.klass.name, "message") self.assertEqual(reply.method.name, "ok") - self.assertEqual("Message %d" % i, reply.body) + self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body) reply.ok() #channel.message_ack(delivery_tag=reply.delivery_tag) - reply = channel.message_get(no_ack=True) + reply = channel.message_get(no_ack=True, queue="test-get") self.assertEqual(reply.method.klass.name, "message") self.assertEqual(reply.method.name, "get-empty") channel.message_recover(requeue=True) - reply = channel.message_get(no_ack=True) + reply = channel.message_get(no_ack=True, queue="test-get") self.assertEqual(reply.method.klass.name, "message") self.assertEqual(reply.method.name, "get-empty") @@ -477,3 +481,43 @@ class MessageTests(TestBase): msg = queue.get(timeout=1) self.assertTrue(isinstance(msg, Reference)) self.assertEquals(data, msg.get_complete()) + + def test_reference_completion(self): + """ + Test that reference transfer are not deemed complete until + closed (therefore are not acked or routed until that point) + """ + channel = self.channel + channel.queue_declare(queue="ref_queue", exclusive=True) + channel.message_consume(queue="ref_queue", destination="c1") + queue = self.client.queue("c1") + + refId = "myref" + channel.message_open(reference=refId) + channel.message_append(reference=refId, bytes="abcd") + channel.synchronous = False + ack = channel.message_transfer(routing_key="ref_queue", body=ReferenceId(refId)) + channel.synchronous = True + + try: + msg = queue.get(timeout=1) + self.fail("Got unexpected message on queue: " + msg) + except Empty: None + + self.assertTrue(not ack.is_complete()) + + channel.message_close(reference=refId) + + #first, wait for the ok for the transfer + ack.get_response(timeout=1) + + msg = queue.get(timeout=1) + if isinstance(msg, Reference): + #should we force broker to deliver as reference by frame + #size limit? or test that separately? for compliance, + #allowing either seems best for now... + data = msg.get_complete() + else: + data = msg.body + self.assertEquals("abcdefghijkl", data) + |