summaryrefslogtreecommitdiff
path: root/python/tests/message.py
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-03-19 20:02:29 +0000
committerAlan Conway <aconway@apache.org>2007-03-19 20:02:29 +0000
commitf934d8a2d5d91d802fcc791a8492bd92abafc327 (patch)
tree7c5b7375724054348150b8ab714f0f3f79733f12 /python/tests/message.py
parentce22dfa716f7b2727ead7693cac014b278421b0c (diff)
downloadqpid-python-f934d8a2d5d91d802fcc791a8492bd92abafc327.tar.gz
Merged revisions 504601-504602,504604-504609,504611-504702,504704-504707,504709-504849 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9 ........ r504601 | gsim | 2007-02-07 11:09:16 -0500 (Wed, 07 Feb 2007) | 3 lines Added list of valid responses to method descriptions where appropriate. ........ r504700 | aconway | 2007-02-07 16:30:32 -0500 (Wed, 07 Feb 2007) | 2 lines Use self.queue_open to ensure deletion of queue - was clashing with message.py tests. ........ r504849 | gsim | 2007-02-08 05:14:50 -0500 (Thu, 08 Feb 2007) | 3 lines Fixes to qos and get tests. Added test for correct completion of references. ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@520073 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/tests/message.py')
-rw-r--r--python/tests/message.py86
1 files changed, 65 insertions, 21 deletions
diff --git a/python/tests/message.py b/python/tests/message.py
index 10d0f51448..916a9825bd 100644
--- a/python/tests/message.py
+++ b/python/tests/message.py
@@ -267,7 +267,7 @@ class MessageTests(TestBase):
for i in range(1, 6):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msgs.add(msg)
+ msgs.append(msg)
try:
extra = queue.get(timeout=1)
self.fail("Got unexpected 6th message in original queue: " + extra.body)
@@ -277,16 +277,16 @@ class MessageTests(TestBase):
#todo: once batching is implmented, send a single response for all messages
for msg in msgs:
msg.ok()
- msgs.clear()
+ del msgs
for i in range(6, 11):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msgs.add(msg)
+ msgs.append(msg)
for msg in msgs:
msg.ok()
- msgs.clear()
+ del msgs
try:
extra = queue.get(timeout=1)
@@ -317,7 +317,8 @@ class MessageTests(TestBase):
for i in range(1, 6):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msgs.add(msg)
+ print "Got Message %d" % i
+ msgs.append(msg)
try:
extra = queue.get(timeout=1)
@@ -327,16 +328,16 @@ class MessageTests(TestBase):
#ack messages and check that the next set arrive ok:
for msg in msgs:
msg.ok()
- msgs.clear()
+ del msgs
for i in range(6, 11):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msgs.add(msg)
+ msgs.append(msg)
for msg in msgs:
msg.ok()
- msgs.clear()
+ del msgs
try:
extra = queue.get(timeout=1)
@@ -363,12 +364,13 @@ class MessageTests(TestBase):
#use message_get to read back the messages, and check that we get an empty at the end
for i in range(1, 11):
- reply = channel.message_get(no_ack=True)
+ tag = "queue %d" % i
+ reply = channel.message_get(no_ack=True, queue="test-get", destination=tag)
self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "get-ok")
- self.assertEqual("Message %d" % i, reply.body)
+ self.assertEqual(reply.method.name, "ok")
+ self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body)
- reply = channel.message_get(no_ack=True)
+ reply = channel.message_get(no_ack=True, queue="test-get")
self.assertEqual(reply.method.klass.name, "message")
self.assertEqual(reply.method.name, "get-empty")
@@ -377,10 +379,11 @@ class MessageTests(TestBase):
channel.message_transfer(routing_key="test-get", body="Message %d" % i)
for i in range(11, 21):
- reply = channel.message_get(no_ack=False)
+ tag = "queue %d" % i
+ reply = channel.message_get(no_ack=False, queue="test-get", destination=tag)
self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "get-ok")
- self.assertEqual("Message %d" % i, reply.body)
+ self.assertEqual(reply.method.name, "ok")
+ self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body)
reply.ok()
#todo: when batching is available, test ack multiple
@@ -389,7 +392,7 @@ class MessageTests(TestBase):
#if(i in [15, 17, 19]):
# channel.message_ack(delivery_tag=reply.delivery_tag)
- reply = channel.message_get(no_ack=True)
+ reply = channel.message_get(no_ack=True, queue="test-get")
self.assertEqual(reply.method.klass.name, "message")
self.assertEqual(reply.method.name, "get-empty")
@@ -398,20 +401,21 @@ class MessageTests(TestBase):
#get the unacked messages again (14, 16, 18, 20)
for i in [14, 16, 18, 20]:
- reply = channel.message_get(no_ack=False)
+ tag = "queue %d" % i
+ reply = channel.message_get(no_ack=False, queue="test-get", destination=tag)
self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "get-ok")
- self.assertEqual("Message %d" % i, reply.body)
+ self.assertEqual(reply.method.name, "ok")
+ self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body)
reply.ok()
#channel.message_ack(delivery_tag=reply.delivery_tag)
- reply = channel.message_get(no_ack=True)
+ reply = channel.message_get(no_ack=True, queue="test-get")
self.assertEqual(reply.method.klass.name, "message")
self.assertEqual(reply.method.name, "get-empty")
channel.message_recover(requeue=True)
- reply = channel.message_get(no_ack=True)
+ reply = channel.message_get(no_ack=True, queue="test-get")
self.assertEqual(reply.method.klass.name, "message")
self.assertEqual(reply.method.name, "get-empty")
@@ -477,3 +481,43 @@ class MessageTests(TestBase):
msg = queue.get(timeout=1)
self.assertTrue(isinstance(msg, Reference))
self.assertEquals(data, msg.get_complete())
+
+ def test_reference_completion(self):
+ """
+ Test that reference transfer are not deemed complete until
+ closed (therefore are not acked or routed until that point)
+ """
+ channel = self.channel
+ channel.queue_declare(queue="ref_queue", exclusive=True)
+ channel.message_consume(queue="ref_queue", destination="c1")
+ queue = self.client.queue("c1")
+
+ refId = "myref"
+ channel.message_open(reference=refId)
+ channel.message_append(reference=refId, bytes="abcd")
+ channel.synchronous = False
+ ack = channel.message_transfer(routing_key="ref_queue", body=ReferenceId(refId))
+ channel.synchronous = True
+
+ try:
+ msg = queue.get(timeout=1)
+ self.fail("Got unexpected message on queue: " + msg)
+ except Empty: None
+
+ self.assertTrue(not ack.is_complete())
+
+ channel.message_close(reference=refId)
+
+ #first, wait for the ok for the transfer
+ ack.get_response(timeout=1)
+
+ msg = queue.get(timeout=1)
+ if isinstance(msg, Reference):
+ #should we force broker to deliver as reference by frame
+ #size limit? or test that separately? for compliance,
+ #allowing either seems best for now...
+ data = msg.get_complete()
+ else:
+ data = msg.body
+ self.assertEquals("abcdefghijkl", data)
+