diff options
Diffstat (limited to 'qpid/tests/src/py/qpid_tests/broker_0_10/message.py')
-rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_0_10/message.py | 928 |
1 files changed, 928 insertions, 0 deletions
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/message.py b/qpid/tests/src/py/qpid_tests/broker_0_10/message.py new file mode 100644 index 0000000000..b46c446833 --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/message.py @@ -0,0 +1,928 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.testlib import TestBase010 +from qpid.datatypes import Message, RangedSet +from qpid.session import SessionException + +from qpid.content import Content +from time import sleep + +class MessageTests(TestBase010): + """Tests for 'methods' on the amqp message 'class'""" + + def test_no_local(self): + """ + NOTE: this is a test of a QPID specific feature + + Test that the qpid specific no_local arg is honoured. + """ + session = self.session + #setup, declare two queues one of which excludes delivery of locally sent messages + session.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True) + session.queue_declare(queue="test-queue-1b", exclusive=True, auto_delete=True, arguments={'no-local':'true'}) + #establish two consumers + self.subscribe(destination="local_included", queue="test-queue-1a") + self.subscribe(destination="local_excluded", queue="test-queue-1b") + + #send a message + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1a"), "deliver-me")) + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me")) + + #send a message from another session on the same connection to each queue + session2 = self.conn.session("my-local-session") + session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1a"), "deliver-me-as-well")) + session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me-either")) + + #send a message from a session on another connection to each queue + for q in ["test-queue-1a", "test-queue-1b"]: + session.exchange_bind(queue=q, exchange="amq.fanout", binding_key="my-key") + other = self.connect() + session3 = other.session("my-other-session") + session3.message_transfer(destination="amq.fanout", message=Message("i-am-not-local")) + other.close() + + #check the queues of the two consumers + excluded = session.incoming("local_excluded") + included = session.incoming("local_included") + for b in ["deliver-me", "deliver-me-as-well", "i-am-not-local"]: + msg = included.get(timeout=1) + self.assertEqual(b, msg.body) + msg = excluded.get(timeout=1) + self.assertEqual("i-am-not-local", msg.body) + try: + excluded.get(timeout=1) + self.fail("Received locally published message though no_local=true") + except Empty: None + + def test_no_local_awkward(self): + + """ + NOTE: this is a test of a QPID specific feature + + Check that messages which will be excluded through no-local + processing will not block subsequent deliveries + """ + + session = self.session + #setup: + session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True, arguments={'no-local':'true'}) + #establish consumer which excludes delivery of locally sent messages + self.subscribe(destination="local_excluded", queue="test-queue") + + #send a 'local' message + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "local")) + + #send a non local message + other = self.connect() + session2 = other.session("my-session", 1) + session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue"), "foreign")) + session2.close() + other.close() + + #check that the second message only is delivered + excluded = session.incoming("local_excluded") + msg = excluded.get(timeout=1) + self.assertEqual("foreign", msg.body) + try: + excluded.get(timeout=1) + self.fail("Received extra message") + except Empty: None + #check queue is empty + self.assertEqual(0, session.queue_query(queue="test-queue").message_count) + + def test_no_local_exclusive_subscribe(self): + """ + NOTE: this is a test of a QPID specific feature + + Test that the no_local processing works on queues not declared + as exclusive, but with an exclusive subscription + """ + session = self.session + + #setup, declare two queues one of which excludes delivery of + #locally sent messages but is not declared as exclusive + session.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True) + session.queue_declare(queue="test-queue-1b", auto_delete=True, arguments={'no-local':'true'}) + #establish two consumers + self.subscribe(destination="local_included", queue="test-queue-1a") + self.subscribe(destination="local_excluded", queue="test-queue-1b", exclusive=True) + + #send a message from the same session to each queue + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1a"), "deliver-me")) + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me")) + + #send a message from another session on the same connection to each queue + session2 = self.conn.session("my-session") + session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1a"), "deliver-me-as-well")) + session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me-either")) + + #send a message from a session on another connection to each queue + for q in ["test-queue-1a", "test-queue-1b"]: + session.exchange_bind(queue=q, exchange="amq.fanout", binding_key="my-key") + other = self.connect() + session3 = other.session("my-other-session") + session3.message_transfer(destination="amq.fanout", message=Message("i-am-not-local")) + other.close() + + #check the queues of the two consumers + excluded = session.incoming("local_excluded") + included = session.incoming("local_included") + for b in ["deliver-me", "deliver-me-as-well", "i-am-not-local"]: + msg = included.get(timeout=1) + self.assertEqual(b, msg.body) + msg = excluded.get(timeout=1) + self.assertEqual("i-am-not-local", msg.body) + try: + excluded.get(timeout=1) + self.fail("Received locally published message though no_local=true") + except Empty: None + + + def test_consume_exclusive(self): + """ + Test an exclusive consumer prevents other consumer being created + """ + session = self.session + session.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True) + session.message_subscribe(destination="first", queue="test-queue-2", exclusive=True) + try: + session.message_subscribe(destination="second", queue="test-queue-2") + self.fail("Expected consume request to fail due to previous exclusive consumer") + except SessionException, e: + self.assertEquals(405, e.args[0].error_code) + + def test_consume_exclusive2(self): + """ + Check that an exclusive consumer cannot be created if a consumer already exists: + """ + session = self.session + session.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True) + session.message_subscribe(destination="first", queue="test-queue-2") + try: + session.message_subscribe(destination="second", queue="test-queue-2", exclusive=True) + self.fail("Expected exclusive consume request to fail due to previous consumer") + except SessionException, e: + self.assertEquals(405, e.args[0].error_code) + + def test_consume_queue_not_found(self): + """ + Test error conditions associated with the queue field of the consume method: + """ + session = self.session + try: + #queue specified but doesn't exist: + session.message_subscribe(queue="invalid-queue", destination="a") + self.fail("Expected failure when consuming from non-existent queue") + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) + + def test_consume_queue_not_specified(self): + session = self.session + try: + #queue not specified and none previously declared for channel: + session.message_subscribe(destination="a") + self.fail("Expected failure when consuming from unspecified queue") + except SessionException, e: + self.assertEquals(531, e.args[0].error_code) + + def test_consume_unique_consumers(self): + """ + Ensure unique consumer tags are enforced + """ + session = self.session + #setup, declare a queue: + session.queue_declare(queue="test-queue-3", exclusive=True, auto_delete=True) + + #check that attempts to use duplicate tags are detected and prevented: + session.message_subscribe(destination="first", queue="test-queue-3") + try: + session.message_subscribe(destination="first", queue="test-queue-3") + self.fail("Expected consume request to fail due to non-unique tag") + except SessionException, e: + self.assertEquals(530, e.args[0].error_code) + + def test_cancel(self): + """ + Test compliance of the basic.cancel method + """ + session = self.session + #setup, declare a queue: + session.queue_declare(queue="test-queue-4", exclusive=True, auto_delete=True) + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-4"), "One")) + + session.message_subscribe(destination="my-consumer", queue="test-queue-4") + myqueue = session.incoming("my-consumer") + session.message_flow(destination="my-consumer", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="my-consumer", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + + #should flush here + + #cancel should stop messages being delivered + session.message_cancel(destination="my-consumer") + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-4"), "Two")) + msg = myqueue.get(timeout=1) + self.assertEqual("One", msg.body) + try: + msg = myqueue.get(timeout=1) + self.fail("Got message after cancellation: " + msg) + except Empty: None + + #cancellation of non-existant consumers should be result in 404s + try: + session.message_cancel(destination="my-consumer") + self.fail("Expected 404 for recancellation of subscription.") + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) + + session = self.conn.session("alternate-session", timeout=10) + try: + session.message_cancel(destination="this-never-existed") + self.fail("Expected 404 for cancellation of unknown subscription.") + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) + + + def test_ack(self): + """ + Test basic ack/recover behaviour + """ + session = self.conn.session("alternate-session", timeout=10) + session.queue_declare(queue="test-ack-queue", auto_delete=True) + + session.message_subscribe(queue = "test-ack-queue", destination = "consumer") + session.message_flow(destination="consumer", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="consumer", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + queue = session.incoming("consumer") + + delivery_properties = session.delivery_properties(routing_key="test-ack-queue") + for i in ["One", "Two", "Three", "Four", "Five"]: + session.message_transfer(message=Message(delivery_properties, i)) + + msg1 = queue.get(timeout=1) + msg2 = queue.get(timeout=1) + msg3 = queue.get(timeout=1) + msg4 = queue.get(timeout=1) + msg5 = queue.get(timeout=1) + + self.assertEqual("One", msg1.body) + self.assertEqual("Two", msg2.body) + self.assertEqual("Three", msg3.body) + self.assertEqual("Four", msg4.body) + self.assertEqual("Five", msg5.body) + + session.message_accept(RangedSet(msg1.id, msg2.id, msg4.id))#One, Two and Four + + #subscribe from second session here to ensure queue is not + #auto-deleted when alternate session closes (no need to ack on these): + self.session.message_subscribe(queue = "test-ack-queue", destination = "checker", accept_mode=1) + + #now close the session, and see that the unacked messages are + #then redelivered to another subscriber: + session.close(timeout=10) + + session = self.session + session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + queue = session.incoming("checker") + + msg3b = queue.get(timeout=1) + msg5b = queue.get(timeout=1) + + self.assertEqual("Three", msg3b.body) + self.assertEqual("Five", msg5b.body) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message: " + extra.body) + except Empty: None + + def test_reject(self): + session = self.session + session.queue_declare(queue = "q", exclusive=True, auto_delete=True, alternate_exchange="amq.fanout") + session.queue_declare(queue = "r", exclusive=True, auto_delete=True) + session.exchange_bind(queue = "r", exchange = "amq.fanout") + + session.message_subscribe(queue = "q", destination = "consumer") + session.message_flow(destination="consumer", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="consumer", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "blah, blah")) + msg = session.incoming("consumer").get(timeout = 1) + self.assertEquals(msg.body, "blah, blah") + session.message_reject(RangedSet(msg.id)) + + session.message_subscribe(queue = "r", destination = "checker") + session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + msg = session.incoming("checker").get(timeout = 1) + self.assertEquals(msg.body, "blah, blah") + + def test_credit_flow_messages(self): + """ + Test basic credit based flow control with unit = message + """ + #declare an exclusive queue + session = self.session + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + #create consumer (for now that defaults to infinite credit) + session.message_subscribe(queue = "q", destination = "c") + session.message_set_flow_mode(flow_mode = 0, destination = "c") + #send batch of messages to queue + for i in range(1, 11): + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i)) + + #set message credit to finite amount (less than enough for all messages) + session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c") + #set infinite byte credit + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c") + #check that expected number were received + q = session.incoming("c") + for i in range(1, 6): + self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i) + self.assertEmpty(q) + + #increase credit again and check more are received + for i in range(6, 11): + session.message_flow(unit = session.credit_unit.message, value = 1, destination = "c") + self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i) + self.assertEmpty(q) + + def test_credit_flow_bytes(self): + """ + Test basic credit based flow control with unit = bytes + """ + #declare an exclusive queue + session = self.session + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + #create consumer (for now that defaults to infinite credit) + session.message_subscribe(queue = "q", destination = "c") + session.message_set_flow_mode(flow_mode = 0, destination = "c") + #send batch of messages to queue + for i in range(10): + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh")) + + #each message is currently interpreted as requiring msg_size bytes of credit + msg_size = 19 + + #set byte credit to finite amount (less than enough for all messages) + session.message_flow(unit = session.credit_unit.byte, value = msg_size*5, destination = "c") + #set infinite message credit + session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "c") + #check that expected number were received + q = session.incoming("c") + for i in range(5): + self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh") + self.assertEmpty(q) + + #increase credit again and check more are received + for i in range(5): + session.message_flow(unit = session.credit_unit.byte, value = msg_size, destination = "c") + self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh") + self.assertEmpty(q) + + + def test_window_flow_messages(self): + """ + Test basic window based flow control with unit = message + """ + #declare an exclusive queue + session = self.session + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + #create consumer (for now that defaults to infinite credit) + session.message_subscribe(queue = "q", destination = "c") + session.message_set_flow_mode(flow_mode = 1, destination = "c") + #send batch of messages to queue + for i in range(1, 11): + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i)) + + #set message credit to finite amount (less than enough for all messages) + session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c") + #set infinite byte credit + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c") + #check that expected number were received + q = session.incoming("c") + for i in range(1, 6): + msg = q.get(timeout = 1) + session.receiver._completed.add(msg.id)#TODO: this may be done automatically + self.assertDataEquals(session, msg, "Message %d" % i) + self.assertEmpty(q) + + #acknowledge messages and check more are received + #TODO: there may be a nicer way of doing this + session.channel.session_completed(session.receiver._completed) + + for i in range(6, 11): + self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i) + self.assertEmpty(q) + + + def test_window_flow_bytes(self): + """ + Test basic window based flow control with unit = bytes + """ + #declare an exclusive queue + session = self.session + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + #create consumer (for now that defaults to infinite credit) + session.message_subscribe(queue = "q", destination = "c") + session.message_set_flow_mode(flow_mode = 1, destination = "c") + #send batch of messages to queue + for i in range(10): + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh")) + + #each message is currently interpreted as requiring msg_size bytes of credit + msg_size = 19 + + #set byte credit to finite amount (less than enough for all messages) + session.message_flow(unit = session.credit_unit.byte, value = msg_size*5, destination = "c") + #set infinite message credit + session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "c") + #check that expected number were received + q = session.incoming("c") + msgs = [] + for i in range(5): + msg = q.get(timeout = 1) + msgs.append(msg) + self.assertDataEquals(session, msg, "abcdefgh") + self.assertEmpty(q) + + #ack each message individually and check more are received + for i in range(5): + msg = msgs.pop() + #TODO: there may be a nicer way of doing this + session.receiver._completed.add(msg.id) + session.channel.session_completed(session.receiver._completed) + self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh") + self.assertEmpty(q) + + def test_window_flush_ack_flow(self): + """ + Test basic window based flow control with unit = bytes + """ + #declare an exclusive queue + ssn = self.session + ssn.queue_declare(queue = "q", exclusive=True, auto_delete=True) + #create consumer + ssn.message_subscribe(queue = "q", destination = "c", + accept_mode=ssn.accept_mode.explicit) + ssn.message_set_flow_mode(flow_mode = ssn.flow_mode.window, destination = "c") + + #send message A + ssn.message_transfer(message=Message(ssn.delivery_properties(routing_key="q"), "A")) + + for unit in ssn.credit_unit.VALUES: + ssn.message_flow("c", unit, 0xFFFFFFFFL) + + q = ssn.incoming("c") + msgA = q.get(timeout=10) + + ssn.message_flush(destination="c") + + # XXX + ssn.receiver._completed.add(msgA.id) + ssn.channel.session_completed(ssn.receiver._completed) + ssn.message_accept(RangedSet(msgA.id)) + + for unit in ssn.credit_unit.VALUES: + ssn.message_flow("c", unit, 0xFFFFFFFFL) + + #send message B + ssn.message_transfer(message=Message(ssn.delivery_properties(routing_key="q"), "B")) + + msgB = q.get(timeout=10) + + def test_subscribe_not_acquired(self): + """ + Test the not-acquired modes works as expected for a simple case + """ + session = self.session + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + for i in range(1, 6): + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i)) + + session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1) + session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "a") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") + session.message_subscribe(queue = "q", destination = "b", acquire_mode = 1) + session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "b") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b") + + for i in range(6, 11): + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i)) + + #both subscribers should see all messages + qA = session.incoming("a") + qB = session.incoming("b") + for i in range(1, 11): + for q in [qA, qB]: + msg = q.get(timeout = 1) + self.assertEquals("Message %s" % i, msg.body) + #TODO: tidy up completion + session.receiver._completed.add(msg.id) + + #TODO: tidy up completion + session.channel.session_completed(session.receiver._completed) + #messages should still be on the queue: + self.assertEquals(10, session.queue_query(queue = "q").message_count) + + def test_acquire_with_no_accept_and_credit_flow(self): + """ + Test that messages recieved unacquired, with accept not + required in windowing mode can be acquired. + """ + session = self.session + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me")) + + session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1, accept_mode = 1) + session.message_set_flow_mode(flow_mode = session.flow_mode.credit, destination = "a") + session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "a") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") + msg = session.incoming("a").get(timeout = 1) + self.assertEquals("acquire me", msg.body) + #message should still be on the queue: + self.assertEquals(1, session.queue_query(queue = "q").message_count) + + transfers = RangedSet(msg.id) + response = session.message_acquire(transfers) + #check that we get notification (i.e. message_acquired) + self.assert_(msg.id in response.transfers) + #message should have been removed from the queue: + self.assertEquals(0, session.queue_query(queue = "q").message_count) + + def test_acquire(self): + """ + Test explicit acquire function + """ + session = self.session + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me")) + + session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1) + session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + msg = session.incoming("a").get(timeout = 1) + self.assertEquals("acquire me", msg.body) + #message should still be on the queue: + self.assertEquals(1, session.queue_query(queue = "q").message_count) + + transfers = RangedSet(msg.id) + response = session.message_acquire(transfers) + #check that we get notification (i.e. message_acquired) + self.assert_(msg.id in response.transfers) + #message should have been removed from the queue: + self.assertEquals(0, session.queue_query(queue = "q").message_count) + session.message_accept(transfers) + + + def test_release(self): + """ + Test explicit release function + """ + session = self.session + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "release me")) + + session.message_subscribe(queue = "q", destination = "a") + session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + msg = session.incoming("a").get(timeout = 1) + self.assertEquals("release me", msg.body) + session.message_cancel(destination = "a") + session.message_release(RangedSet(msg.id)) + + #message should not have been removed from the queue: + self.assertEquals(1, session.queue_query(queue = "q").message_count) + + def test_release_ordering(self): + """ + Test order of released messages is as expected + """ + session = self.session + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + for i in range (1, 11): + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "released message %s" % (i))) + + session.message_subscribe(queue = "q", destination = "a") + session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") + queue = session.incoming("a") + first = queue.get(timeout = 1) + for i in range(2, 10): + msg = queue.get(timeout = 1) + self.assertEquals("released message %s" % (i), msg.body) + + last = queue.get(timeout = 1) + self.assertEmpty(queue) + released = RangedSet() + released.add(first.id, last.id) + session.message_release(released) + + #TODO: may want to clean this up... + session.receiver._completed.add(first.id, last.id) + session.channel.session_completed(session.receiver._completed) + + for i in range(1, 11): + self.assertEquals("released message %s" % (i), queue.get(timeout = 1).body) + + def test_ranged_ack(self): + """ + Test acking of messages ranges + """ + session = self.conn.session("alternate-session", timeout=10) + + session.queue_declare(queue = "q", auto_delete=True) + delivery_properties = session.delivery_properties(routing_key="q") + for i in range (1, 11): + session.message_transfer(message=Message(delivery_properties, "message %s" % (i))) + + session.message_subscribe(queue = "q", destination = "a") + session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") + queue = session.incoming("a") + ids = [] + for i in range (1, 11): + msg = queue.get(timeout = 1) + self.assertEquals("message %s" % (i), msg.body) + ids.append(msg.id) + + self.assertEmpty(queue) + + #ack all but the fourth message (command id 2) + accepted = RangedSet() + accepted.add(ids[0], ids[2]) + accepted.add(ids[4], ids[9]) + session.message_accept(accepted) + + #subscribe from second session here to ensure queue is not + #auto-deleted when alternate session closes (no need to ack on these): + self.session.message_subscribe(queue = "q", destination = "checker") + + #now close the session, and see that the unacked messages are + #then redelivered to another subscriber: + session.close(timeout=10) + + session = self.session + session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + queue = session.incoming("checker") + + self.assertEquals("message 4", queue.get(timeout = 1).body) + self.assertEmpty(queue) + + def test_subscribe_not_acquired_2(self): + session = self.session + + #publish some messages + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + for i in range(1, 11): + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i))) + + #consume some of them + session.message_subscribe(queue = "q", destination = "a") + session.message_set_flow_mode(flow_mode = 0, destination = "a") + session.message_flow(unit = session.credit_unit.message, value = 5, destination = "a") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") + + queue = session.incoming("a") + for i in range(1, 6): + msg = queue.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.body) + #complete and accept + session.message_accept(RangedSet(msg.id)) + #TODO: tidy up completion + session.receiver._completed.add(msg.id) + session.channel.session_completed(session.receiver._completed) + self.assertEmpty(queue) + + #now create a not-acquired subscriber + session.message_subscribe(queue = "q", destination = "b", acquire_mode=1) + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b") + + #check it gets those not consumed + queue = session.incoming("b") + session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b") + for i in range(6, 11): + msg = queue.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.body) + session.message_release(RangedSet(msg.id)) + #TODO: tidy up completion + session.receiver._completed.add(msg.id) + session.channel.session_completed(session.receiver._completed) + session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b") + self.assertEmpty(queue) + + #check all 'browsed' messages are still on the queue + self.assertEqual(5, session.queue_query(queue="q").message_count) + + def test_subscribe_not_acquired_3(self): + session = self.session + + #publish some messages + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + for i in range(1, 11): + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i))) + + #create a not-acquired subscriber + session.message_subscribe(queue = "q", destination = "a", acquire_mode=1) + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") + session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") + + #browse through messages + queue = session.incoming("a") + for i in range(1, 11): + msg = queue.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.body) + if (i % 2): + #try to acquire every second message + response = session.message_acquire(RangedSet(msg.id)) + #check that acquire succeeds + self.assert_(msg.id in response.transfers) + session.message_accept(RangedSet(msg.id)) + else: + session.message_release(RangedSet(msg.id)) + session.receiver._completed.add(msg.id) + session.channel.session_completed(session.receiver._completed) + self.assertEmpty(queue) + + #create a second not-acquired subscriber + session.message_subscribe(queue = "q", destination = "b", acquire_mode=1) + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b") + session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b") + #check it gets those not consumed + queue = session.incoming("b") + for i in [2,4,6,8,10]: + msg = queue.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.body) + session.message_release(RangedSet(msg.id)) + session.receiver._completed.add(msg.id) + session.channel.session_completed(session.receiver._completed) + session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b") + self.assertEmpty(queue) + + #check all 'browsed' messages are still on the queue + self.assertEqual(5, session.queue_query(queue="q").message_count) + + def test_release_unacquired(self): + session = self.session + + #create queue + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + + #send message + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "my-message")) + + #create two 'browsers' + session.message_subscribe(queue = "q", destination = "a", acquire_mode=1) + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") + session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") + queueA = session.incoming("a") + + session.message_subscribe(queue = "q", destination = "b", acquire_mode=1) + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b") + session.message_flow(unit = session.credit_unit.message, value = 10, destination = "b") + queueB = session.incoming("b") + + #have each browser release the message + msgA = queueA.get(timeout = 1) + session.message_release(RangedSet(msgA.id)) + + msgB = queueB.get(timeout = 1) + session.message_release(RangedSet(msgB.id)) + + #cancel browsers + session.message_cancel(destination = "a") + session.message_cancel(destination = "b") + + #create consumer + session.message_subscribe(queue = "q", destination = "c") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c") + session.message_flow(unit = session.credit_unit.message, value = 10, destination = "c") + queueC = session.incoming("c") + #consume the message then ack it + msgC = queueC.get(timeout = 1) + session.message_accept(RangedSet(msgC.id)) + #ensure there are no other messages + self.assertEmpty(queueC) + + def test_release_order(self): + session = self.session + + #create queue + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + + #send messages + for i in range(1, 11): + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i))) + + #subscribe: + session.message_subscribe(queue="q", destination="a") + a = session.incoming("a") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") + session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") + + for i in range(1, 11): + msg = a.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.body) + if (i % 2): + #accept all odd messages + session.message_accept(RangedSet(msg.id)) + else: + #release all even messages + session.message_release(RangedSet(msg.id)) + + #browse: + session.message_subscribe(queue="q", destination="b", acquire_mode=1) + b = session.incoming("b") + b.start() + for i in [2, 4, 6, 8, 10]: + msg = b.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.body) + + + def test_empty_body(self): + session = self.session + session.queue_declare(queue="xyz", exclusive=True, auto_delete=True) + props = session.delivery_properties(routing_key="xyz") + session.message_transfer(message=Message(props, "")) + + consumer_tag = "tag1" + session.message_subscribe(queue="xyz", destination=consumer_tag) + session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = consumer_tag) + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = consumer_tag) + queue = session.incoming(consumer_tag) + msg = queue.get(timeout=1) + self.assertEquals("", msg.body) + session.message_accept(RangedSet(msg.id)) + + def test_incoming_start(self): + q = "test_incoming_start" + session = self.session + + session.queue_declare(queue=q, exclusive=True, auto_delete=True) + session.message_subscribe(queue=q, destination="msgs") + messages = session.incoming("msgs") + assert messages.destination == "msgs" + + dp = session.delivery_properties(routing_key=q) + session.message_transfer(message=Message(dp, "test")) + + messages.start() + msg = messages.get() + assert msg.body == "test" + + def test_ttl(self): + q = "test_ttl" + session = self.session + + session.queue_declare(queue=q, exclusive=True, auto_delete=True) + + dp = session.delivery_properties(routing_key=q, ttl=500)#expire in half a second + session.message_transfer(message=Message(dp, "first")) + + dp = session.delivery_properties(routing_key=q, ttl=300000)#expire in fives minutes + session.message_transfer(message=Message(dp, "second")) + + d = "msgs" + session.message_subscribe(queue=q, destination=d) + messages = session.incoming(d) + sleep(1) + session.message_flow(unit = session.credit_unit.message, value=2, destination=d) + session.message_flow(unit = session.credit_unit.byte, value=0xFFFFFFFFL, destination=d) + assert messages.get(timeout=1).body == "second" + self.assertEmpty(messages) + + + def assertDataEquals(self, session, msg, expected): + self.assertEquals(expected, msg.body) + + def assertEmpty(self, queue): + try: + extra = queue.get(timeout=1) + self.fail("Queue not empty, contains: " + extra.body) + except Empty: None + +class SizelessContent(Content): + + def size(self): + return None |