summaryrefslogtreecommitdiff
path: root/python/tests/message.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/tests/message.py')
-rw-r--r--python/tests/message.py214
1 files changed, 175 insertions, 39 deletions
diff --git a/python/tests/message.py b/python/tests/message.py
index 916a9825bd..d5f5d4dbc2 100644
--- a/python/tests/message.py
+++ b/python/tests/message.py
@@ -171,8 +171,7 @@ class MessageTests(TestBase):
self.assertEqual("Four", msg4.body)
self.assertEqual("Five", msg5.body)
- msg1.ok()
- msg2.ok()
+ msg1.ok(batchoffset=1)#One and Two
msg4.ok()
channel.message_recover(requeue=False)
@@ -216,9 +215,8 @@ class MessageTests(TestBase):
self.assertEqual("Four", msg4.body)
self.assertEqual("Five", msg5.body)
- msg1.ok() #One
- msg2.ok() #Two
- msg4.ok() #Two
+ msg1.ok(batchoffset=1) #One and Two
+ msg4.ok() #Four
channel.message_cancel(destination="consumer_tag")
channel.message_consume(queue="test-requeue", destination="consumer_tag")
@@ -263,11 +261,9 @@ class MessageTests(TestBase):
channel.message_transfer(routing_key="test-prefetch-count", body="Message %d" % i)
#only 5 messages should have been delivered:
- msgs = []
for i in range(1, 6):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msgs.append(msg)
try:
extra = queue.get(timeout=1)
self.fail("Got unexpected 6th message in original queue: " + extra.body)
@@ -275,18 +271,13 @@ class MessageTests(TestBase):
#ack messages and check that the next set arrive ok:
#todo: once batching is implmented, send a single response for all messages
- for msg in msgs:
- msg.ok()
- del msgs
+ msg.ok(batchoffset=-4)#1-5
for i in range(6, 11):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msgs.append(msg)
- for msg in msgs:
- msg.ok()
- del msgs
+ msg.ok(batchoffset=-4)#6-10
try:
extra = queue.get(timeout=1)
@@ -313,12 +304,9 @@ class MessageTests(TestBase):
channel.message_transfer(routing_key="test-prefetch-size", body="Message %d" % i)
#only 5 messages should have been delivered (i.e. 45 bytes worth):
- msgs = []
for i in range(1, 6):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- print "Got Message %d" % i
- msgs.append(msg)
try:
extra = queue.get(timeout=1)
@@ -326,18 +314,13 @@ class MessageTests(TestBase):
except Empty: None
#ack messages and check that the next set arrive ok:
- for msg in msgs:
- msg.ok()
- del msgs
+ msg.ok(batchoffset=-4)#1-5
for i in range(6, 11):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msgs.append(msg)
- for msg in msgs:
- msg.ok()
- del msgs
+ msg.ok(batchoffset=-4)#6-10
try:
extra = queue.get(timeout=1)
@@ -383,14 +366,13 @@ class MessageTests(TestBase):
reply = channel.message_get(no_ack=False, queue="test-get", destination=tag)
self.assertEqual(reply.method.klass.name, "message")
self.assertEqual(reply.method.name, "ok")
- self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body)
- reply.ok()
-
- #todo: when batching is available, test ack multiple
- #if(i == 13):
- # channel.message_ack(delivery_tag=reply.delivery_tag, multiple=True)
- #if(i in [15, 17, 19]):
- # channel.message_ack(delivery_tag=reply.delivery_tag)
+ msg = self.client.queue(tag).get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.body)
+
+ if (i==13):
+ msg.ok(batchoffset=-2)#11, 12 & 13
+ if(i in [15, 17, 19]):
+ msg.ok()
reply = channel.message_get(no_ack=True, queue="test-get")
self.assertEqual(reply.method.klass.name, "message")
@@ -479,8 +461,9 @@ class MessageTests(TestBase):
queue = other.queue("c1")
msg = queue.get(timeout=1)
- self.assertTrue(isinstance(msg, Reference))
- self.assertEquals(data, msg.get_complete())
+ self.assertTrue(isinstance(msg.body, ReferenceId))
+ self.assertTrue(msg.reference)
+ self.assertEquals(data, msg.reference.get_complete())
def test_reference_completion(self):
"""
@@ -511,12 +494,165 @@ class MessageTests(TestBase):
#first, wait for the ok for the transfer
ack.get_response(timeout=1)
+ self.assertDataEquals(channel, queue.get(timeout=1), "abcd")
+
+ def test_reference_multi_transfer(self):
+ """
+ Test that multiple transfer requests for the same reference are
+ correctly handled.
+ """
+ channel = self.channel
+ #declare and consume from two queues
+ channel.queue_declare(queue="q-one", exclusive=True)
+ channel.queue_declare(queue="q-two", exclusive=True)
+ channel.message_consume(queue="q-one", destination="q-one")
+ channel.message_consume(queue="q-two", destination="q-two")
+ queue1 = self.client.queue("q-one")
+ queue2 = self.client.queue("q-two")
+
+ #transfer a single ref to both queues (in separate commands)
+ channel.message_open(reference="my-ref")
+ channel.synchronous = False
+ ack1 = channel.message_transfer(routing_key="q-one", body=ReferenceId("my-ref"))
+ channel.message_append(reference="my-ref", bytes="my data")
+ ack2 = channel.message_transfer(routing_key="q-two", body=ReferenceId("my-ref"))
+ channel.synchronous = True
+ channel.message_close(reference="my-ref")
+
+ #check that both queues have the message
+ self.assertDataEquals(channel, queue1.get(timeout=1), "my data")
+ self.assertDataEquals(channel, queue2.get(timeout=1), "my data")
+ self.assertEmpty(queue1)
+ self.assertEmpty(queue2)
+
+ #transfer a single ref to the same queue twice (in separate commands)
+ channel.message_open(reference="my-ref")
+ channel.synchronous = False
+ ack1 = channel.message_transfer(routing_key="q-one", message_id="abc", body=ReferenceId("my-ref"))
+ channel.message_append(reference="my-ref", bytes="second message")
+ ack2 = channel.message_transfer(routing_key="q-one", message_id="xyz", body=ReferenceId("my-ref"))
+ channel.synchronous = True
+ channel.message_close(reference="my-ref")
+
+ msg1 = queue1.get(timeout=1)
+ msg2 = queue1.get(timeout=1)
+ #order is undefined
+ if msg1.message_id == "abc":
+ self.assertEquals(msg2.message_id, "xyz")
+ else:
+ self.assertEquals(msg1.message_id, "xyz")
+ self.assertEquals(msg2.message_id, "abc")
+
+ #would be legal for the incoming messages to be transfered
+ #inline or by reference in any combination
+
+ if isinstance(msg1.body, ReferenceId):
+ self.assertEquals("second message", msg1.reference.get_complete())
+ if isinstance(msg2.body, ReferenceId):
+ if msg1.body != msg2.body:
+ self.assertEquals("second message", msg2.reference.get_complete())
+ #else ok, as same ref as msg1
+ else:
+ self.assertEquals("second message", msg1.body)
+ if isinstance(msg2.body, ReferenceId):
+ self.assertEquals("second message", msg2.reference.get_complete())
+ else:
+ self.assertEquals("second message", msg2.body)
+
+ self.assertEmpty(queue1)
+
+ def test_reference_unopened_on_append_error(self):
+ channel = self.channel
+ try:
+ channel.message_append(reference="unopened")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_reference_unopened_on_close_error(self):
+ channel = self.channel
+ try:
+ channel.message_close(reference="unopened")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_reference_unopened_on_transfer_error(self):
+ channel = self.channel
+ try:
+ channel.message_transfer(body=ReferenceId("unopened"))
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_reference_already_opened_error(self):
+ channel = self.channel
+ channel.message_open(reference="a")
+ try:
+ channel.message_open(reference="a")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_empty_reference(self):
+ channel = self.channel
+ channel.queue_declare(queue="ref_queue", exclusive=True)
+ channel.message_consume(queue="ref_queue", destination="c1")
+ queue = self.client.queue("c1")
+
+ refId = "myref"
+ channel.message_open(reference=refId)
+ channel.synchronous = False
+ ack = channel.message_transfer(routing_key="ref_queue", message_id="empty-msg", body=ReferenceId(refId))
+ channel.synchronous = True
+ channel.message_close(reference=refId)
+
+ #first, wait for the ok for the transfer
+ ack.get_response(timeout=1)
+
msg = queue.get(timeout=1)
- if isinstance(msg, Reference):
- #should we force broker to deliver as reference by frame
- #size limit? or test that separately? for compliance,
- #allowing either seems best for now...
- data = msg.get_complete()
+ self.assertEquals(msg.message_id, "empty-msg")
+ self.assertDataEquals(channel, msg, "")
+
+ def test_reject(self):
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True)
+
+ channel.message_consume(queue = "q", destination = "consumer")
+ channel.message_transfer(routing_key = "q", body="blah, blah")
+ msg = self.client.queue("consumer").get(timeout = 1)
+ self.assertEquals(msg.body, "blah, blah")
+ channel.message_cancel(destination = "consumer")
+ msg.reject()
+
+ channel.message_consume(queue = "q", destination = "checker")
+ msg = self.client.queue("checker").get(timeout = 1)
+ self.assertEquals(msg.body, "blah, blah")
+
+ def test_checkpoint(self):
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True)
+
+ channel.message_open(reference="my-ref")
+ channel.message_append(reference="my-ref", bytes="abcdefgh")
+ channel.message_append(reference="my-ref", bytes="ijklmnop")
+ channel.message_checkpoint(reference="my-ref", identifier="my-checkpoint")
+ channel.channel_close()
+
+ channel = self.client.channel(2)
+ channel.channel_open()
+ channel.message_consume(queue = "q", destination = "consumer")
+ offset = channel.message_resume(reference="my-ref", identifier="my-checkpoint").value
+ self.assertEquals(offset, 16)
+ channel.message_append(reference="my-ref", bytes="qrstuvwxyz")
+ channel.synchronous = False
+ channel.message_transfer(routing_key="q-one", message_id="abcd", body=ReferenceId("my-ref"))
+ channel.synchronous = True
+ channel.message_close(reference="my-ref")
+
+ self.assertDataEquals(channel, self.client.queue("consumer").get(timeout = 1), "abcdefghijklmnopqrstuvwxyz")
+ self.assertEmpty(self.client.queue("consumer"))
+
+
+ def assertDataEquals(self, channel, msg, expected):
+ if isinstance(msg.body, ReferenceId):
+ data = msg.reference.get_complete()
else:
data = msg.body
self.assertEquals("abcdefghijkl", data)