diff options
Diffstat (limited to 'python/tests_0-9/message.py')
-rw-r--r-- | python/tests_0-9/message.py | 651 |
1 files changed, 651 insertions, 0 deletions
diff --git a/python/tests_0-9/message.py b/python/tests_0-9/message.py new file mode 100644 index 0000000000..8da9978792 --- /dev/null +++ b/python/tests_0-9/message.py @@ -0,0 +1,651 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import testrunner, TestBase +from qpid.reference import Reference, ReferenceId + +class MessageTests(TestBase): + """Tests for 'methods' on the amqp message 'class'""" + + def test_consume_no_local(self): + """ + Test that the no_local flag is honoured in the consume method + """ + channel = self.channel + #setup, declare two queues: + channel.queue_declare(queue="test-queue-1a", exclusive=True) + channel.queue_declare(queue="test-queue-1b", exclusive=True) + #establish two consumers one of which excludes delivery of locally sent messages + channel.message_consume(destination="local_included", queue="test-queue-1a") + channel.message_consume(destination="local_excluded", queue="test-queue-1b", no_local=True) + + #send a message + channel.message_transfer(routing_key="test-queue-1a", body="consume_no_local") + channel.message_transfer(routing_key="test-queue-1b", body="consume_no_local") + + #check the queues of the two consumers + excluded = self.client.queue("local_excluded") + included = self.client.queue("local_included") + msg = included.get(timeout=1) + self.assertEqual("consume_no_local", msg.body) + try: + excluded.get(timeout=1) + self.fail("Received locally published message though no_local=true") + except Empty: None + + + def test_consume_exclusive(self): + """ + Test that the exclusive flag is honoured in the consume method + """ + channel = self.channel + #setup, declare a queue: + channel.queue_declare(queue="test-queue-2", exclusive=True) + + #check that an exclusive consumer prevents other consumer being created: + channel.message_consume(destination="first", queue="test-queue-2", exclusive=True) + try: + channel.message_consume(destination="second", queue="test-queue-2") + self.fail("Expected consume request to fail due to previous exclusive consumer") + except Closed, e: + self.assertChannelException(403, e.args[0]) + + #open new channel and cleanup last consumer: + channel = self.client.channel(2) + channel.channel_open() + + #check that an exclusive consumer cannot be created if a consumer already exists: + channel.message_consume(destination="first", queue="test-queue-2") + try: + channel.message_consume(destination="second", queue="test-queue-2", exclusive=True) + self.fail("Expected exclusive consume request to fail due to previous consumer") + except Closed, e: + self.assertChannelException(403, e.args[0]) + + def test_consume_queue_errors(self): + """ + Test error conditions associated with the queue field of the consume method: + """ + channel = self.channel + try: + #queue specified but doesn't exist: + channel.message_consume(queue="invalid-queue") + self.fail("Expected failure when consuming from non-existent queue") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + channel = self.client.channel(2) + channel.channel_open() + try: + #queue not specified and none previously declared for channel: + channel.message_consume(queue="") + self.fail("Expected failure when consuming from unspecified queue") + except Closed, e: + self.assertConnectionException(530, e.args[0]) + + def test_consume_unique_consumers(self): + """ + Ensure unique consumer tags are enforced + """ + channel = self.channel + #setup, declare a queue: + channel.queue_declare(queue="test-queue-3", exclusive=True) + + #check that attempts to use duplicate tags are detected and prevented: + channel.message_consume(destination="first", queue="test-queue-3") + try: + channel.message_consume(destination="first", queue="test-queue-3") + self.fail("Expected consume request to fail due to non-unique tag") + except Closed, e: + self.assertConnectionException(530, e.args[0]) + + def test_cancel(self): + """ + Test compliance of the basic.cancel method + """ + channel = self.channel + #setup, declare a queue: + channel.queue_declare(queue="test-queue-4", exclusive=True) + channel.message_consume(destination="my-consumer", queue="test-queue-4") + channel.message_transfer(routing_key="test-queue-4", body="One") + + #cancel should stop messages being delivered + channel.message_cancel(destination="my-consumer") + channel.message_transfer(routing_key="test-queue-4", body="Two") + myqueue = self.client.queue("my-consumer") + msg = myqueue.get(timeout=1) + self.assertEqual("One", msg.body) + try: + msg = myqueue.get(timeout=1) + self.fail("Got message after cancellation: " + msg) + except Empty: None + + #cancellation of non-existant consumers should be handled without error + channel.message_cancel(destination="my-consumer") + channel.message_cancel(destination="this-never-existed") + + + def test_ack(self): + """ + Test basic ack/recover behaviour + """ + channel = self.channel + channel.queue_declare(queue="test-ack-queue", exclusive=True) + + channel.message_consume(queue="test-ack-queue", destination="consumer_tag", no_ack=False) + queue = self.client.queue("consumer_tag") + + channel.message_transfer(routing_key="test-ack-queue", body="One") + channel.message_transfer(routing_key="test-ack-queue", body="Two") + channel.message_transfer(routing_key="test-ack-queue", body="Three") + channel.message_transfer(routing_key="test-ack-queue", body="Four") + channel.message_transfer(routing_key="test-ack-queue", body="Five") + + msg1 = queue.get(timeout=1) + msg2 = queue.get(timeout=1) + msg3 = queue.get(timeout=1) + msg4 = queue.get(timeout=1) + msg5 = queue.get(timeout=1) + + self.assertEqual("One", msg1.body) + self.assertEqual("Two", msg2.body) + self.assertEqual("Three", msg3.body) + self.assertEqual("Four", msg4.body) + self.assertEqual("Five", msg5.body) + + msg1.ok(batchoffset=1)#One and Two + msg4.ok() + + channel.message_recover(requeue=False) + + msg3b = queue.get(timeout=1) + msg5b = queue.get(timeout=1) + + self.assertEqual("Three", msg3b.body) + self.assertEqual("Five", msg5b.body) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message: " + extra.body) + except Empty: None + + def test_recover_requeue(self): + """ + Test requeing on recovery + """ + channel = self.channel + channel.queue_declare(queue="test-requeue", exclusive=True) + + channel.message_consume(queue="test-requeue", destination="consumer_tag", no_ack=False) + queue = self.client.queue("consumer_tag") + + channel.message_transfer(routing_key="test-requeue", body="One") + channel.message_transfer(routing_key="test-requeue", body="Two") + channel.message_transfer(routing_key="test-requeue", body="Three") + channel.message_transfer(routing_key="test-requeue", body="Four") + channel.message_transfer(routing_key="test-requeue", body="Five") + + msg1 = queue.get(timeout=1) + msg2 = queue.get(timeout=1) + msg3 = queue.get(timeout=1) + msg4 = queue.get(timeout=1) + msg5 = queue.get(timeout=1) + + self.assertEqual("One", msg1.body) + self.assertEqual("Two", msg2.body) + self.assertEqual("Three", msg3.body) + self.assertEqual("Four", msg4.body) + self.assertEqual("Five", msg5.body) + + msg1.ok(batchoffset=1) #One and Two + msg4.ok() #Four + + channel.message_cancel(destination="consumer_tag") + channel.message_consume(queue="test-requeue", destination="consumer_tag") + queue2 = self.client.queue("consumer_tag") + + channel.message_recover(requeue=True) + + msg3b = queue2.get(timeout=1) + msg5b = queue2.get(timeout=1) + + self.assertEqual("Three", msg3b.body) + self.assertEqual("Five", msg5b.body) + + self.assertEqual(True, msg3b.redelivered) + self.assertEqual(True, msg5b.redelivered) + + try: + extra = queue2.get(timeout=1) + self.fail("Got unexpected message in second queue: " + extra.body) + except Empty: None + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in original queue: " + extra.body) + except Empty: None + + + def test_qos_prefetch_count(self): + """ + Test that the prefetch count specified is honoured + """ + #setup: declare queue and subscribe + channel = self.channel + channel.queue_declare(queue="test-prefetch-count", exclusive=True) + subscription = channel.message_consume(queue="test-prefetch-count", destination="consumer_tag", no_ack=False) + queue = self.client.queue("consumer_tag") + + #set prefetch to 5: + channel.message_qos(prefetch_count=5) + + #publish 10 messages: + for i in range(1, 11): + channel.message_transfer(routing_key="test-prefetch-count", body="Message %d" % i) + + #only 5 messages should have been delivered: + for i in range(1, 6): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 6th message in original queue: " + extra.body) + except Empty: None + + #ack messages and check that the next set arrive ok: + #todo: once batching is implmented, send a single response for all messages + msg.ok(batchoffset=-4)#1-5 + + for i in range(6, 11): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.body) + + msg.ok(batchoffset=-4)#6-10 + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 11th message in original queue: " + extra.body) + except Empty: None + + + + def test_qos_prefetch_size(self): + """ + Test that the prefetch size specified is honoured + """ + #setup: declare queue and subscribe + channel = self.channel + channel.queue_declare(queue="test-prefetch-size", exclusive=True) + subscription = channel.message_consume(queue="test-prefetch-size", destination="consumer_tag", no_ack=False) + queue = self.client.queue("consumer_tag") + + #set prefetch to 50 bytes (each message is 9 or 10 bytes): + channel.message_qos(prefetch_size=50) + + #publish 10 messages: + for i in range(1, 11): + channel.message_transfer(routing_key="test-prefetch-size", body="Message %d" % i) + + #only 5 messages should have been delivered (i.e. 45 bytes worth): + for i in range(1, 6): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.body) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 6th message in original queue: " + extra.body) + except Empty: None + + #ack messages and check that the next set arrive ok: + msg.ok(batchoffset=-4)#1-5 + + for i in range(6, 11): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.body) + + msg.ok(batchoffset=-4)#6-10 + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 11th message in original queue: " + extra.body) + except Empty: None + + #make sure that a single oversized message still gets delivered + large = "abcdefghijklmnopqrstuvwxyz" + large = large + "-" + large; + channel.message_transfer(routing_key="test-prefetch-size", body=large) + msg = queue.get(timeout=1) + self.assertEqual(large, msg.body) + + def test_get(self): + """ + Test message_get method + """ + channel = self.channel + channel.queue_declare(queue="test-get", exclusive=True) + + #publish some messages (no_ack=True) + for i in range(1, 11): + channel.message_transfer(routing_key="test-get", body="Message %d" % i) + + #use message_get to read back the messages, and check that we get an empty at the end + for i in range(1, 11): + tag = "queue %d" % i + reply = channel.message_get(no_ack=True, queue="test-get", destination=tag) + self.assertEqual(reply.method.klass.name, "message") + self.assertEqual(reply.method.name, "ok") + self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body) + + reply = channel.message_get(no_ack=True, queue="test-get") + self.assertEqual(reply.method.klass.name, "message") + self.assertEqual(reply.method.name, "empty") + + #repeat for no_ack=False + for i in range(11, 21): + channel.message_transfer(routing_key="test-get", body="Message %d" % i) + + for i in range(11, 21): + tag = "queue %d" % i + reply = channel.message_get(no_ack=False, queue="test-get", destination=tag) + self.assertEqual(reply.method.klass.name, "message") + self.assertEqual(reply.method.name, "ok") + msg = self.client.queue(tag).get(timeout=1) + self.assertEqual("Message %d" % i, msg.body) + + if (i==13): + msg.ok(batchoffset=-2)#11, 12 & 13 + if(i in [15, 17, 19]): + msg.ok() + + reply = channel.message_get(no_ack=True, queue="test-get") + self.assertEqual(reply.method.klass.name, "message") + self.assertEqual(reply.method.name, "empty") + + #recover(requeue=True) + channel.message_recover(requeue=True) + + #get the unacked messages again (14, 16, 18, 20) + for i in [14, 16, 18, 20]: + tag = "queue %d" % i + reply = channel.message_get(no_ack=False, queue="test-get", destination=tag) + self.assertEqual(reply.method.klass.name, "message") + self.assertEqual(reply.method.name, "ok") + msg = self.client.queue(tag).get(timeout=1) + self.assertEqual("Message %d" % i, msg.body) + msg.ok() + #channel.message_ack(delivery_tag=reply.delivery_tag) + + reply = channel.message_get(no_ack=True, queue="test-get") + self.assertEqual(reply.method.klass.name, "message") + self.assertEqual(reply.method.name, "empty") + + channel.message_recover(requeue=True) + + reply = channel.message_get(no_ack=True, queue="test-get") + self.assertEqual(reply.method.klass.name, "message") + self.assertEqual(reply.method.name, "empty") + + def test_reference_simple(self): + """ + Test basic ability to handle references + """ + channel = self.channel + channel.queue_declare(queue="ref_queue", exclusive=True) + channel.message_consume(queue="ref_queue", destination="c1") + queue = self.client.queue("c1") + + refId = "myref" + channel.message_open(reference=refId) + channel.message_append(reference=refId, bytes="abcd") + channel.synchronous = False + ack = channel.message_transfer(routing_key="ref_queue", body=ReferenceId(refId)) + channel.synchronous = True + + channel.message_append(reference=refId, bytes="efgh") + channel.message_append(reference=refId, bytes="ijkl") + channel.message_close(reference=refId) + + #first, wait for the ok for the transfer + ack.get_response(timeout=1) + + self.assertDataEquals(channel, queue.get(timeout=1), "abcdefghijkl") + + + def test_reference_large(self): + """ + Test basic ability to handle references whose content exceeds max frame size + """ + channel = self.channel + self.queue_declare(queue="ref_queue") + + #generate a big data string (> max frame size of consumer): + data = "0123456789" + for i in range(0, 10): + data += data + #send it inline + channel.synchronous = False + ack = channel.message_transfer(routing_key="ref_queue", body=data) + channel.synchronous = True + #first, wait for the ok for the transfer + ack.get_response(timeout=1) + + #create a new connection for consumer, with specific max frame size (< data) + other = self.connect(tune_params={"channel_max":10, "frame_max":5120, "heartbeat":0}) + ch2 = other.channel(1) + ch2.channel_open() + ch2.message_consume(queue="ref_queue", destination="c1") + queue = other.queue("c1") + + msg = queue.get(timeout=1) + self.assertTrue(isinstance(msg.body, ReferenceId)) + self.assertTrue(msg.reference) + self.assertEquals(data, msg.reference.get_complete()) + + def test_reference_completion(self): + """ + Test that reference transfer are not deemed complete until + closed (therefore are not acked or routed until that point) + """ + channel = self.channel + channel.queue_declare(queue="ref_queue", exclusive=True) + channel.message_consume(queue="ref_queue", destination="c1") + queue = self.client.queue("c1") + + refId = "myref" + channel.message_open(reference=refId) + channel.message_append(reference=refId, bytes="abcd") + channel.synchronous = False + ack = channel.message_transfer(routing_key="ref_queue", body=ReferenceId(refId)) + channel.synchronous = True + + try: + msg = queue.get(timeout=1) + self.fail("Got unexpected message on queue: " + msg) + except Empty: None + + self.assertTrue(not ack.is_complete()) + + channel.message_close(reference=refId) + + #first, wait for the ok for the transfer + ack.get_response(timeout=1) + + self.assertDataEquals(channel, queue.get(timeout=1), "abcd") + + def test_reference_multi_transfer(self): + """ + Test that multiple transfer requests for the same reference are + correctly handled. + """ + channel = self.channel + #declare and consume from two queues + channel.queue_declare(queue="q-one", exclusive=True) + channel.queue_declare(queue="q-two", exclusive=True) + channel.message_consume(queue="q-one", destination="q-one") + channel.message_consume(queue="q-two", destination="q-two") + queue1 = self.client.queue("q-one") + queue2 = self.client.queue("q-two") + + #transfer a single ref to both queues (in separate commands) + channel.message_open(reference="my-ref") + channel.synchronous = False + ack1 = channel.message_transfer(routing_key="q-one", body=ReferenceId("my-ref")) + channel.message_append(reference="my-ref", bytes="my data") + ack2 = channel.message_transfer(routing_key="q-two", body=ReferenceId("my-ref")) + channel.synchronous = True + channel.message_close(reference="my-ref") + + #check that both queues have the message + self.assertDataEquals(channel, queue1.get(timeout=1), "my data") + self.assertDataEquals(channel, queue2.get(timeout=1), "my data") + self.assertEmpty(queue1) + self.assertEmpty(queue2) + + #transfer a single ref to the same queue twice (in separate commands) + channel.message_open(reference="my-ref") + channel.synchronous = False + ack1 = channel.message_transfer(routing_key="q-one", message_id="abc", body=ReferenceId("my-ref")) + channel.message_append(reference="my-ref", bytes="second message") + ack2 = channel.message_transfer(routing_key="q-one", message_id="xyz", body=ReferenceId("my-ref")) + channel.synchronous = True + channel.message_close(reference="my-ref") + + msg1 = queue1.get(timeout=1) + msg2 = queue1.get(timeout=1) + #order is undefined + if msg1.message_id == "abc": + self.assertEquals(msg2.message_id, "xyz") + else: + self.assertEquals(msg1.message_id, "xyz") + self.assertEquals(msg2.message_id, "abc") + + #would be legal for the incoming messages to be transfered + #inline or by reference in any combination + + if isinstance(msg1.body, ReferenceId): + self.assertEquals("second message", msg1.reference.get_complete()) + if isinstance(msg2.body, ReferenceId): + if msg1.body != msg2.body: + self.assertEquals("second message", msg2.reference.get_complete()) + #else ok, as same ref as msg1 + else: + self.assertEquals("second message", msg1.body) + if isinstance(msg2.body, ReferenceId): + self.assertEquals("second message", msg2.reference.get_complete()) + else: + self.assertEquals("second message", msg2.body) + + self.assertEmpty(queue1) + + def test_reference_unopened_on_append_error(self): + channel = self.channel + try: + channel.message_append(reference="unopened") + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + def test_reference_unopened_on_close_error(self): + channel = self.channel + try: + channel.message_close(reference="unopened") + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + def test_reference_unopened_on_transfer_error(self): + channel = self.channel + try: + channel.message_transfer(body=ReferenceId("unopened")) + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + def test_reference_already_opened_error(self): + channel = self.channel + channel.message_open(reference="a") + try: + channel.message_open(reference="a") + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + def test_empty_reference(self): + channel = self.channel + channel.queue_declare(queue="ref_queue", exclusive=True) + channel.message_consume(queue="ref_queue", destination="c1") + queue = self.client.queue("c1") + + refId = "myref" + channel.message_open(reference=refId) + channel.synchronous = False + ack = channel.message_transfer(routing_key="ref_queue", message_id="empty-msg", body=ReferenceId(refId)) + channel.synchronous = True + channel.message_close(reference=refId) + + #first, wait for the ok for the transfer + ack.get_response(timeout=1) + + msg = queue.get(timeout=1) + self.assertEquals(msg.message_id, "empty-msg") + self.assertDataEquals(channel, msg, "") + + def test_reject(self): + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True) + + channel.message_consume(queue = "q", destination = "consumer") + channel.message_transfer(routing_key = "q", body="blah, blah") + msg = self.client.queue("consumer").get(timeout = 1) + self.assertEquals(msg.body, "blah, blah") + channel.message_cancel(destination = "consumer") + msg.reject() + + channel.message_consume(queue = "q", destination = "checker") + msg = self.client.queue("checker").get(timeout = 1) + self.assertEquals(msg.body, "blah, blah") + + def test_checkpoint(self): + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True) + + channel.message_open(reference="my-ref") + channel.message_append(reference="my-ref", bytes="abcdefgh") + channel.message_append(reference="my-ref", bytes="ijklmnop") + channel.message_checkpoint(reference="my-ref", identifier="my-checkpoint") + channel.channel_close() + + channel = self.client.channel(2) + channel.channel_open() + channel.message_consume(queue = "q", destination = "consumer") + offset = channel.message_resume(reference="my-ref", identifier="my-checkpoint").value + self.assertTrue(offset<=16) + channel.message_append(reference="my-ref", bytes="qrstuvwxyz") + channel.synchronous = False + channel.message_transfer(routing_key="q-one", message_id="abcd", body=ReferenceId("my-ref")) + channel.synchronous = True + channel.message_close(reference="my-ref") + + self.assertDataEquals(channel, self.client.queue("consumer").get(timeout = 1), "abcdefghijklmnopqrstuvwxyz") + self.assertEmpty(self.client.queue("consumer")) + + + def assertDataEquals(self, channel, msg, expected): + if isinstance(msg.body, ReferenceId): + data = msg.reference.get_complete() + else: + data = msg.body + self.assertEquals(expected, data) |