summaryrefslogtreecommitdiff
path: root/python/tests_0-9/message.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/tests_0-9/message.py')
-rw-r--r--python/tests_0-9/message.py651
1 files changed, 651 insertions, 0 deletions
diff --git a/python/tests_0-9/message.py b/python/tests_0-9/message.py
new file mode 100644
index 0000000000..8da9978792
--- /dev/null
+++ b/python/tests_0-9/message.py
@@ -0,0 +1,651 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+from qpid.reference import Reference, ReferenceId
+
+class MessageTests(TestBase):
+ """Tests for 'methods' on the amqp message 'class'"""
+
+ def test_consume_no_local(self):
+ """
+ Test that the no_local flag is honoured in the consume method
+ """
+ channel = self.channel
+ #setup, declare two queues:
+ channel.queue_declare(queue="test-queue-1a", exclusive=True)
+ channel.queue_declare(queue="test-queue-1b", exclusive=True)
+ #establish two consumers one of which excludes delivery of locally sent messages
+ channel.message_consume(destination="local_included", queue="test-queue-1a")
+ channel.message_consume(destination="local_excluded", queue="test-queue-1b", no_local=True)
+
+ #send a message
+ channel.message_transfer(routing_key="test-queue-1a", body="consume_no_local")
+ channel.message_transfer(routing_key="test-queue-1b", body="consume_no_local")
+
+ #check the queues of the two consumers
+ excluded = self.client.queue("local_excluded")
+ included = self.client.queue("local_included")
+ msg = included.get(timeout=1)
+ self.assertEqual("consume_no_local", msg.body)
+ try:
+ excluded.get(timeout=1)
+ self.fail("Received locally published message though no_local=true")
+ except Empty: None
+
+
+ def test_consume_exclusive(self):
+ """
+ Test that the exclusive flag is honoured in the consume method
+ """
+ channel = self.channel
+ #setup, declare a queue:
+ channel.queue_declare(queue="test-queue-2", exclusive=True)
+
+ #check that an exclusive consumer prevents other consumer being created:
+ channel.message_consume(destination="first", queue="test-queue-2", exclusive=True)
+ try:
+ channel.message_consume(destination="second", queue="test-queue-2")
+ self.fail("Expected consume request to fail due to previous exclusive consumer")
+ except Closed, e:
+ self.assertChannelException(403, e.args[0])
+
+ #open new channel and cleanup last consumer:
+ channel = self.client.channel(2)
+ channel.channel_open()
+
+ #check that an exclusive consumer cannot be created if a consumer already exists:
+ channel.message_consume(destination="first", queue="test-queue-2")
+ try:
+ channel.message_consume(destination="second", queue="test-queue-2", exclusive=True)
+ self.fail("Expected exclusive consume request to fail due to previous consumer")
+ except Closed, e:
+ self.assertChannelException(403, e.args[0])
+
+ def test_consume_queue_errors(self):
+ """
+ Test error conditions associated with the queue field of the consume method:
+ """
+ channel = self.channel
+ try:
+ #queue specified but doesn't exist:
+ channel.message_consume(queue="invalid-queue")
+ self.fail("Expected failure when consuming from non-existent queue")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+ channel = self.client.channel(2)
+ channel.channel_open()
+ try:
+ #queue not specified and none previously declared for channel:
+ channel.message_consume(queue="")
+ self.fail("Expected failure when consuming from unspecified queue")
+ except Closed, e:
+ self.assertConnectionException(530, e.args[0])
+
+ def test_consume_unique_consumers(self):
+ """
+ Ensure unique consumer tags are enforced
+ """
+ channel = self.channel
+ #setup, declare a queue:
+ channel.queue_declare(queue="test-queue-3", exclusive=True)
+
+ #check that attempts to use duplicate tags are detected and prevented:
+ channel.message_consume(destination="first", queue="test-queue-3")
+ try:
+ channel.message_consume(destination="first", queue="test-queue-3")
+ self.fail("Expected consume request to fail due to non-unique tag")
+ except Closed, e:
+ self.assertConnectionException(530, e.args[0])
+
+ def test_cancel(self):
+ """
+ Test compliance of the basic.cancel method
+ """
+ channel = self.channel
+ #setup, declare a queue:
+ channel.queue_declare(queue="test-queue-4", exclusive=True)
+ channel.message_consume(destination="my-consumer", queue="test-queue-4")
+ channel.message_transfer(routing_key="test-queue-4", body="One")
+
+ #cancel should stop messages being delivered
+ channel.message_cancel(destination="my-consumer")
+ channel.message_transfer(routing_key="test-queue-4", body="Two")
+ myqueue = self.client.queue("my-consumer")
+ msg = myqueue.get(timeout=1)
+ self.assertEqual("One", msg.body)
+ try:
+ msg = myqueue.get(timeout=1)
+ self.fail("Got message after cancellation: " + msg)
+ except Empty: None
+
+ #cancellation of non-existant consumers should be handled without error
+ channel.message_cancel(destination="my-consumer")
+ channel.message_cancel(destination="this-never-existed")
+
+
+ def test_ack(self):
+ """
+ Test basic ack/recover behaviour
+ """
+ channel = self.channel
+ channel.queue_declare(queue="test-ack-queue", exclusive=True)
+
+ channel.message_consume(queue="test-ack-queue", destination="consumer_tag", no_ack=False)
+ queue = self.client.queue("consumer_tag")
+
+ channel.message_transfer(routing_key="test-ack-queue", body="One")
+ channel.message_transfer(routing_key="test-ack-queue", body="Two")
+ channel.message_transfer(routing_key="test-ack-queue", body="Three")
+ channel.message_transfer(routing_key="test-ack-queue", body="Four")
+ channel.message_transfer(routing_key="test-ack-queue", body="Five")
+
+ msg1 = queue.get(timeout=1)
+ msg2 = queue.get(timeout=1)
+ msg3 = queue.get(timeout=1)
+ msg4 = queue.get(timeout=1)
+ msg5 = queue.get(timeout=1)
+
+ self.assertEqual("One", msg1.body)
+ self.assertEqual("Two", msg2.body)
+ self.assertEqual("Three", msg3.body)
+ self.assertEqual("Four", msg4.body)
+ self.assertEqual("Five", msg5.body)
+
+ msg1.ok(batchoffset=1)#One and Two
+ msg4.ok()
+
+ channel.message_recover(requeue=False)
+
+ msg3b = queue.get(timeout=1)
+ msg5b = queue.get(timeout=1)
+
+ self.assertEqual("Three", msg3b.body)
+ self.assertEqual("Five", msg5b.body)
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.body)
+ except Empty: None
+
+ def test_recover_requeue(self):
+ """
+ Test requeing on recovery
+ """
+ channel = self.channel
+ channel.queue_declare(queue="test-requeue", exclusive=True)
+
+ channel.message_consume(queue="test-requeue", destination="consumer_tag", no_ack=False)
+ queue = self.client.queue("consumer_tag")
+
+ channel.message_transfer(routing_key="test-requeue", body="One")
+ channel.message_transfer(routing_key="test-requeue", body="Two")
+ channel.message_transfer(routing_key="test-requeue", body="Three")
+ channel.message_transfer(routing_key="test-requeue", body="Four")
+ channel.message_transfer(routing_key="test-requeue", body="Five")
+
+ msg1 = queue.get(timeout=1)
+ msg2 = queue.get(timeout=1)
+ msg3 = queue.get(timeout=1)
+ msg4 = queue.get(timeout=1)
+ msg5 = queue.get(timeout=1)
+
+ self.assertEqual("One", msg1.body)
+ self.assertEqual("Two", msg2.body)
+ self.assertEqual("Three", msg3.body)
+ self.assertEqual("Four", msg4.body)
+ self.assertEqual("Five", msg5.body)
+
+ msg1.ok(batchoffset=1) #One and Two
+ msg4.ok() #Four
+
+ channel.message_cancel(destination="consumer_tag")
+ channel.message_consume(queue="test-requeue", destination="consumer_tag")
+ queue2 = self.client.queue("consumer_tag")
+
+ channel.message_recover(requeue=True)
+
+ msg3b = queue2.get(timeout=1)
+ msg5b = queue2.get(timeout=1)
+
+ self.assertEqual("Three", msg3b.body)
+ self.assertEqual("Five", msg5b.body)
+
+ self.assertEqual(True, msg3b.redelivered)
+ self.assertEqual(True, msg5b.redelivered)
+
+ try:
+ extra = queue2.get(timeout=1)
+ self.fail("Got unexpected message in second queue: " + extra.body)
+ except Empty: None
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in original queue: " + extra.body)
+ except Empty: None
+
+
+ def test_qos_prefetch_count(self):
+ """
+ Test that the prefetch count specified is honoured
+ """
+ #setup: declare queue and subscribe
+ channel = self.channel
+ channel.queue_declare(queue="test-prefetch-count", exclusive=True)
+ subscription = channel.message_consume(queue="test-prefetch-count", destination="consumer_tag", no_ack=False)
+ queue = self.client.queue("consumer_tag")
+
+ #set prefetch to 5:
+ channel.message_qos(prefetch_count=5)
+
+ #publish 10 messages:
+ for i in range(1, 11):
+ channel.message_transfer(routing_key="test-prefetch-count", body="Message %d" % i)
+
+ #only 5 messages should have been delivered:
+ for i in range(1, 6):
+ msg = queue.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.body)
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected 6th message in original queue: " + extra.body)
+ except Empty: None
+
+ #ack messages and check that the next set arrive ok:
+ #todo: once batching is implmented, send a single response for all messages
+ msg.ok(batchoffset=-4)#1-5
+
+ for i in range(6, 11):
+ msg = queue.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.body)
+
+ msg.ok(batchoffset=-4)#6-10
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected 11th message in original queue: " + extra.body)
+ except Empty: None
+
+
+
+ def test_qos_prefetch_size(self):
+ """
+ Test that the prefetch size specified is honoured
+ """
+ #setup: declare queue and subscribe
+ channel = self.channel
+ channel.queue_declare(queue="test-prefetch-size", exclusive=True)
+ subscription = channel.message_consume(queue="test-prefetch-size", destination="consumer_tag", no_ack=False)
+ queue = self.client.queue("consumer_tag")
+
+ #set prefetch to 50 bytes (each message is 9 or 10 bytes):
+ channel.message_qos(prefetch_size=50)
+
+ #publish 10 messages:
+ for i in range(1, 11):
+ channel.message_transfer(routing_key="test-prefetch-size", body="Message %d" % i)
+
+ #only 5 messages should have been delivered (i.e. 45 bytes worth):
+ for i in range(1, 6):
+ msg = queue.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.body)
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected 6th message in original queue: " + extra.body)
+ except Empty: None
+
+ #ack messages and check that the next set arrive ok:
+ msg.ok(batchoffset=-4)#1-5
+
+ for i in range(6, 11):
+ msg = queue.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.body)
+
+ msg.ok(batchoffset=-4)#6-10
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected 11th message in original queue: " + extra.body)
+ except Empty: None
+
+ #make sure that a single oversized message still gets delivered
+ large = "abcdefghijklmnopqrstuvwxyz"
+ large = large + "-" + large;
+ channel.message_transfer(routing_key="test-prefetch-size", body=large)
+ msg = queue.get(timeout=1)
+ self.assertEqual(large, msg.body)
+
+ def test_get(self):
+ """
+ Test message_get method
+ """
+ channel = self.channel
+ channel.queue_declare(queue="test-get", exclusive=True)
+
+ #publish some messages (no_ack=True)
+ for i in range(1, 11):
+ channel.message_transfer(routing_key="test-get", body="Message %d" % i)
+
+ #use message_get to read back the messages, and check that we get an empty at the end
+ for i in range(1, 11):
+ tag = "queue %d" % i
+ reply = channel.message_get(no_ack=True, queue="test-get", destination=tag)
+ self.assertEqual(reply.method.klass.name, "message")
+ self.assertEqual(reply.method.name, "ok")
+ self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body)
+
+ reply = channel.message_get(no_ack=True, queue="test-get")
+ self.assertEqual(reply.method.klass.name, "message")
+ self.assertEqual(reply.method.name, "empty")
+
+ #repeat for no_ack=False
+ for i in range(11, 21):
+ channel.message_transfer(routing_key="test-get", body="Message %d" % i)
+
+ for i in range(11, 21):
+ tag = "queue %d" % i
+ reply = channel.message_get(no_ack=False, queue="test-get", destination=tag)
+ self.assertEqual(reply.method.klass.name, "message")
+ self.assertEqual(reply.method.name, "ok")
+ msg = self.client.queue(tag).get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.body)
+
+ if (i==13):
+ msg.ok(batchoffset=-2)#11, 12 & 13
+ if(i in [15, 17, 19]):
+ msg.ok()
+
+ reply = channel.message_get(no_ack=True, queue="test-get")
+ self.assertEqual(reply.method.klass.name, "message")
+ self.assertEqual(reply.method.name, "empty")
+
+ #recover(requeue=True)
+ channel.message_recover(requeue=True)
+
+ #get the unacked messages again (14, 16, 18, 20)
+ for i in [14, 16, 18, 20]:
+ tag = "queue %d" % i
+ reply = channel.message_get(no_ack=False, queue="test-get", destination=tag)
+ self.assertEqual(reply.method.klass.name, "message")
+ self.assertEqual(reply.method.name, "ok")
+ msg = self.client.queue(tag).get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.body)
+ msg.ok()
+ #channel.message_ack(delivery_tag=reply.delivery_tag)
+
+ reply = channel.message_get(no_ack=True, queue="test-get")
+ self.assertEqual(reply.method.klass.name, "message")
+ self.assertEqual(reply.method.name, "empty")
+
+ channel.message_recover(requeue=True)
+
+ reply = channel.message_get(no_ack=True, queue="test-get")
+ self.assertEqual(reply.method.klass.name, "message")
+ self.assertEqual(reply.method.name, "empty")
+
+ def test_reference_simple(self):
+ """
+ Test basic ability to handle references
+ """
+ channel = self.channel
+ channel.queue_declare(queue="ref_queue", exclusive=True)
+ channel.message_consume(queue="ref_queue", destination="c1")
+ queue = self.client.queue("c1")
+
+ refId = "myref"
+ channel.message_open(reference=refId)
+ channel.message_append(reference=refId, bytes="abcd")
+ channel.synchronous = False
+ ack = channel.message_transfer(routing_key="ref_queue", body=ReferenceId(refId))
+ channel.synchronous = True
+
+ channel.message_append(reference=refId, bytes="efgh")
+ channel.message_append(reference=refId, bytes="ijkl")
+ channel.message_close(reference=refId)
+
+ #first, wait for the ok for the transfer
+ ack.get_response(timeout=1)
+
+ self.assertDataEquals(channel, queue.get(timeout=1), "abcdefghijkl")
+
+
+ def test_reference_large(self):
+ """
+ Test basic ability to handle references whose content exceeds max frame size
+ """
+ channel = self.channel
+ self.queue_declare(queue="ref_queue")
+
+ #generate a big data string (> max frame size of consumer):
+ data = "0123456789"
+ for i in range(0, 10):
+ data += data
+ #send it inline
+ channel.synchronous = False
+ ack = channel.message_transfer(routing_key="ref_queue", body=data)
+ channel.synchronous = True
+ #first, wait for the ok for the transfer
+ ack.get_response(timeout=1)
+
+ #create a new connection for consumer, with specific max frame size (< data)
+ other = self.connect(tune_params={"channel_max":10, "frame_max":5120, "heartbeat":0})
+ ch2 = other.channel(1)
+ ch2.channel_open()
+ ch2.message_consume(queue="ref_queue", destination="c1")
+ queue = other.queue("c1")
+
+ msg = queue.get(timeout=1)
+ self.assertTrue(isinstance(msg.body, ReferenceId))
+ self.assertTrue(msg.reference)
+ self.assertEquals(data, msg.reference.get_complete())
+
+ def test_reference_completion(self):
+ """
+ Test that reference transfer are not deemed complete until
+ closed (therefore are not acked or routed until that point)
+ """
+ channel = self.channel
+ channel.queue_declare(queue="ref_queue", exclusive=True)
+ channel.message_consume(queue="ref_queue", destination="c1")
+ queue = self.client.queue("c1")
+
+ refId = "myref"
+ channel.message_open(reference=refId)
+ channel.message_append(reference=refId, bytes="abcd")
+ channel.synchronous = False
+ ack = channel.message_transfer(routing_key="ref_queue", body=ReferenceId(refId))
+ channel.synchronous = True
+
+ try:
+ msg = queue.get(timeout=1)
+ self.fail("Got unexpected message on queue: " + msg)
+ except Empty: None
+
+ self.assertTrue(not ack.is_complete())
+
+ channel.message_close(reference=refId)
+
+ #first, wait for the ok for the transfer
+ ack.get_response(timeout=1)
+
+ self.assertDataEquals(channel, queue.get(timeout=1), "abcd")
+
+ def test_reference_multi_transfer(self):
+ """
+ Test that multiple transfer requests for the same reference are
+ correctly handled.
+ """
+ channel = self.channel
+ #declare and consume from two queues
+ channel.queue_declare(queue="q-one", exclusive=True)
+ channel.queue_declare(queue="q-two", exclusive=True)
+ channel.message_consume(queue="q-one", destination="q-one")
+ channel.message_consume(queue="q-two", destination="q-two")
+ queue1 = self.client.queue("q-one")
+ queue2 = self.client.queue("q-two")
+
+ #transfer a single ref to both queues (in separate commands)
+ channel.message_open(reference="my-ref")
+ channel.synchronous = False
+ ack1 = channel.message_transfer(routing_key="q-one", body=ReferenceId("my-ref"))
+ channel.message_append(reference="my-ref", bytes="my data")
+ ack2 = channel.message_transfer(routing_key="q-two", body=ReferenceId("my-ref"))
+ channel.synchronous = True
+ channel.message_close(reference="my-ref")
+
+ #check that both queues have the message
+ self.assertDataEquals(channel, queue1.get(timeout=1), "my data")
+ self.assertDataEquals(channel, queue2.get(timeout=1), "my data")
+ self.assertEmpty(queue1)
+ self.assertEmpty(queue2)
+
+ #transfer a single ref to the same queue twice (in separate commands)
+ channel.message_open(reference="my-ref")
+ channel.synchronous = False
+ ack1 = channel.message_transfer(routing_key="q-one", message_id="abc", body=ReferenceId("my-ref"))
+ channel.message_append(reference="my-ref", bytes="second message")
+ ack2 = channel.message_transfer(routing_key="q-one", message_id="xyz", body=ReferenceId("my-ref"))
+ channel.synchronous = True
+ channel.message_close(reference="my-ref")
+
+ msg1 = queue1.get(timeout=1)
+ msg2 = queue1.get(timeout=1)
+ #order is undefined
+ if msg1.message_id == "abc":
+ self.assertEquals(msg2.message_id, "xyz")
+ else:
+ self.assertEquals(msg1.message_id, "xyz")
+ self.assertEquals(msg2.message_id, "abc")
+
+ #would be legal for the incoming messages to be transfered
+ #inline or by reference in any combination
+
+ if isinstance(msg1.body, ReferenceId):
+ self.assertEquals("second message", msg1.reference.get_complete())
+ if isinstance(msg2.body, ReferenceId):
+ if msg1.body != msg2.body:
+ self.assertEquals("second message", msg2.reference.get_complete())
+ #else ok, as same ref as msg1
+ else:
+ self.assertEquals("second message", msg1.body)
+ if isinstance(msg2.body, ReferenceId):
+ self.assertEquals("second message", msg2.reference.get_complete())
+ else:
+ self.assertEquals("second message", msg2.body)
+
+ self.assertEmpty(queue1)
+
+ def test_reference_unopened_on_append_error(self):
+ channel = self.channel
+ try:
+ channel.message_append(reference="unopened")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_reference_unopened_on_close_error(self):
+ channel = self.channel
+ try:
+ channel.message_close(reference="unopened")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_reference_unopened_on_transfer_error(self):
+ channel = self.channel
+ try:
+ channel.message_transfer(body=ReferenceId("unopened"))
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_reference_already_opened_error(self):
+ channel = self.channel
+ channel.message_open(reference="a")
+ try:
+ channel.message_open(reference="a")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_empty_reference(self):
+ channel = self.channel
+ channel.queue_declare(queue="ref_queue", exclusive=True)
+ channel.message_consume(queue="ref_queue", destination="c1")
+ queue = self.client.queue("c1")
+
+ refId = "myref"
+ channel.message_open(reference=refId)
+ channel.synchronous = False
+ ack = channel.message_transfer(routing_key="ref_queue", message_id="empty-msg", body=ReferenceId(refId))
+ channel.synchronous = True
+ channel.message_close(reference=refId)
+
+ #first, wait for the ok for the transfer
+ ack.get_response(timeout=1)
+
+ msg = queue.get(timeout=1)
+ self.assertEquals(msg.message_id, "empty-msg")
+ self.assertDataEquals(channel, msg, "")
+
+ def test_reject(self):
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True)
+
+ channel.message_consume(queue = "q", destination = "consumer")
+ channel.message_transfer(routing_key = "q", body="blah, blah")
+ msg = self.client.queue("consumer").get(timeout = 1)
+ self.assertEquals(msg.body, "blah, blah")
+ channel.message_cancel(destination = "consumer")
+ msg.reject()
+
+ channel.message_consume(queue = "q", destination = "checker")
+ msg = self.client.queue("checker").get(timeout = 1)
+ self.assertEquals(msg.body, "blah, blah")
+
+ def test_checkpoint(self):
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True)
+
+ channel.message_open(reference="my-ref")
+ channel.message_append(reference="my-ref", bytes="abcdefgh")
+ channel.message_append(reference="my-ref", bytes="ijklmnop")
+ channel.message_checkpoint(reference="my-ref", identifier="my-checkpoint")
+ channel.channel_close()
+
+ channel = self.client.channel(2)
+ channel.channel_open()
+ channel.message_consume(queue = "q", destination = "consumer")
+ offset = channel.message_resume(reference="my-ref", identifier="my-checkpoint").value
+ self.assertTrue(offset<=16)
+ channel.message_append(reference="my-ref", bytes="qrstuvwxyz")
+ channel.synchronous = False
+ channel.message_transfer(routing_key="q-one", message_id="abcd", body=ReferenceId("my-ref"))
+ channel.synchronous = True
+ channel.message_close(reference="my-ref")
+
+ self.assertDataEquals(channel, self.client.queue("consumer").get(timeout = 1), "abcdefghijklmnopqrstuvwxyz")
+ self.assertEmpty(self.client.queue("consumer"))
+
+
+ def assertDataEquals(self, channel, msg, expected):
+ if isinstance(msg.body, ReferenceId):
+ data = msg.reference.get_complete()
+ else:
+ data = msg.body
+ self.assertEquals(expected, data)