diff options
Diffstat (limited to 'M4-RCs/qpid/python/tests_0-10/message.py')
-rw-r--r-- | M4-RCs/qpid/python/tests_0-10/message.py | 847 |
1 files changed, 0 insertions, 847 deletions
diff --git a/M4-RCs/qpid/python/tests_0-10/message.py b/M4-RCs/qpid/python/tests_0-10/message.py deleted file mode 100644 index cbcef5602f..0000000000 --- a/M4-RCs/qpid/python/tests_0-10/message.py +++ /dev/null @@ -1,847 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.client import Client, Closed -from qpid.queue import Empty -from qpid.testlib import TestBase010 -from qpid.datatypes import Message, RangedSet -from qpid.session import SessionException - -from qpid.content import Content -from time import sleep - -class MessageTests(TestBase010): - """Tests for 'methods' on the amqp message 'class'""" - - def test_no_local(self): - """ - NOTE: this is a test of a QPID specific feature - - Test that the qpid specific no_local arg is honoured. - """ - session = self.session - #setup, declare two queues one of which excludes delivery of locally sent messages - session.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True) - session.queue_declare(queue="test-queue-1b", exclusive=True, auto_delete=True, arguments={'no-local':'true'}) - #establish two consumers - self.subscribe(destination="local_included", queue="test-queue-1a") - self.subscribe(destination="local_excluded", queue="test-queue-1b") - - #send a message - session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1a"), "deliver-me")) - session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me")) - - #send a message from another session on the same connection to each queue - session2 = self.conn.session("my-local-session") - session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1a"), "deliver-me-as-well")) - session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me-either")) - - #send a message from a session on another connection to each queue - for q in ["test-queue-1a", "test-queue-1b"]: - session.exchange_bind(queue=q, exchange="amq.fanout", binding_key="my-key") - other = self.connect() - session3 = other.session("my-other-session") - session3.message_transfer(destination="amq.fanout", message=Message("i-am-not-local")) - other.close() - - #check the queues of the two consumers - excluded = session.incoming("local_excluded") - included = session.incoming("local_included") - for b in ["deliver-me", "deliver-me-as-well", "i-am-not-local"]: - msg = included.get(timeout=1) - self.assertEqual(b, msg.body) - msg = excluded.get(timeout=1) - self.assertEqual("i-am-not-local", msg.body) - try: - excluded.get(timeout=1) - self.fail("Received locally published message though no_local=true") - except Empty: None - - def test_no_local_awkward(self): - - """ - NOTE: this is a test of a QPID specific feature - - Check that messages which will be excluded through no-local - processing will not block subsequent deliveries - """ - - session = self.session - #setup: - session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True, arguments={'no-local':'true'}) - #establish consumer which excludes delivery of locally sent messages - self.subscribe(destination="local_excluded", queue="test-queue") - - #send a 'local' message - session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "local")) - - #send a non local message - other = self.connect() - session2 = other.session("my-session", 1) - session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue"), "foreign")) - session2.close() - other.close() - - #check that the second message only is delivered - excluded = session.incoming("local_excluded") - msg = excluded.get(timeout=1) - self.assertEqual("foreign", msg.body) - try: - excluded.get(timeout=1) - self.fail("Received extra message") - except Empty: None - #check queue is empty - self.assertEqual(0, session.queue_query(queue="test-queue").message_count) - - def test_no_local_exclusive_subscribe(self): - """ - NOTE: this is a test of a QPID specific feature - - Test that the no_local processing works on queues not declared - as exclusive, but with an exclusive subscription - """ - session = self.session - - #setup, declare two queues one of which excludes delivery of - #locally sent messages but is not declared as exclusive - session.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True) - session.queue_declare(queue="test-queue-1b", auto_delete=True, arguments={'no-local':'true'}) - #establish two consumers - self.subscribe(destination="local_included", queue="test-queue-1a") - self.subscribe(destination="local_excluded", queue="test-queue-1b", exclusive=True) - - #send a message from the same session to each queue - session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1a"), "deliver-me")) - session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me")) - - #send a message from another session on the same connection to each queue - session2 = self.conn.session("my-session") - session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1a"), "deliver-me-as-well")) - session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me-either")) - - #send a message from a session on another connection to each queue - for q in ["test-queue-1a", "test-queue-1b"]: - session.exchange_bind(queue=q, exchange="amq.fanout", binding_key="my-key") - other = self.connect() - session3 = other.session("my-other-session") - session3.message_transfer(destination="amq.fanout", message=Message("i-am-not-local")) - other.close() - - #check the queues of the two consumers - excluded = session.incoming("local_excluded") - included = session.incoming("local_included") - for b in ["deliver-me", "deliver-me-as-well", "i-am-not-local"]: - msg = included.get(timeout=1) - self.assertEqual(b, msg.body) - msg = excluded.get(timeout=1) - self.assertEqual("i-am-not-local", msg.body) - try: - excluded.get(timeout=1) - self.fail("Received locally published message though no_local=true") - except Empty: None - - - def test_consume_exclusive(self): - """ - Test an exclusive consumer prevents other consumer being created - """ - session = self.session - session.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True) - session.message_subscribe(destination="first", queue="test-queue-2", exclusive=True) - try: - session.message_subscribe(destination="second", queue="test-queue-2") - self.fail("Expected consume request to fail due to previous exclusive consumer") - except SessionException, e: - self.assertEquals(405, e.args[0].error_code) - - def test_consume_exclusive2(self): - """ - Check that an exclusive consumer cannot be created if a consumer already exists: - """ - session = self.session - session.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True) - session.message_subscribe(destination="first", queue="test-queue-2") - try: - session.message_subscribe(destination="second", queue="test-queue-2", exclusive=True) - self.fail("Expected exclusive consume request to fail due to previous consumer") - except SessionException, e: - self.assertEquals(405, e.args[0].error_code) - - def test_consume_queue_not_found(self): - """ - Test error conditions associated with the queue field of the consume method: - """ - session = self.session - try: - #queue specified but doesn't exist: - session.message_subscribe(queue="invalid-queue", destination="a") - self.fail("Expected failure when consuming from non-existent queue") - except SessionException, e: - self.assertEquals(404, e.args[0].error_code) - - def test_consume_queue_not_specified(self): - session = self.session - try: - #queue not specified and none previously declared for channel: - session.message_subscribe(destination="a") - self.fail("Expected failure when consuming from unspecified queue") - except SessionException, e: - self.assertEquals(531, e.args[0].error_code) - - def test_consume_unique_consumers(self): - """ - Ensure unique consumer tags are enforced - """ - session = self.session - #setup, declare a queue: - session.queue_declare(queue="test-queue-3", exclusive=True, auto_delete=True) - - #check that attempts to use duplicate tags are detected and prevented: - session.message_subscribe(destination="first", queue="test-queue-3") - try: - session.message_subscribe(destination="first", queue="test-queue-3") - self.fail("Expected consume request to fail due to non-unique tag") - except SessionException, e: - self.assertEquals(530, e.args[0].error_code) - - def test_cancel(self): - """ - Test compliance of the basic.cancel method - """ - session = self.session - #setup, declare a queue: - session.queue_declare(queue="test-queue-4", exclusive=True, auto_delete=True) - session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-4"), "One")) - - session.message_subscribe(destination="my-consumer", queue="test-queue-4") - myqueue = session.incoming("my-consumer") - session.message_flow(destination="my-consumer", unit=session.credit_unit.message, value=0xFFFFFFFF) - session.message_flow(destination="my-consumer", unit=session.credit_unit.byte, value=0xFFFFFFFF) - - #should flush here - - #cancel should stop messages being delivered - session.message_cancel(destination="my-consumer") - session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-4"), "Two")) - msg = myqueue.get(timeout=1) - self.assertEqual("One", msg.body) - try: - msg = myqueue.get(timeout=1) - self.fail("Got message after cancellation: " + msg) - except Empty: None - - #cancellation of non-existant consumers should be handled without error - session.message_cancel(destination="my-consumer") - session.message_cancel(destination="this-never-existed") - - - def test_ack(self): - """ - Test basic ack/recover behaviour - """ - session = self.conn.session("alternate-session", timeout=10) - session.queue_declare(queue="test-ack-queue", auto_delete=True) - - session.message_subscribe(queue = "test-ack-queue", destination = "consumer") - session.message_flow(destination="consumer", unit=session.credit_unit.message, value=0xFFFFFFFF) - session.message_flow(destination="consumer", unit=session.credit_unit.byte, value=0xFFFFFFFF) - queue = session.incoming("consumer") - - delivery_properties = session.delivery_properties(routing_key="test-ack-queue") - for i in ["One", "Two", "Three", "Four", "Five"]: - session.message_transfer(message=Message(delivery_properties, i)) - - msg1 = queue.get(timeout=1) - msg2 = queue.get(timeout=1) - msg3 = queue.get(timeout=1) - msg4 = queue.get(timeout=1) - msg5 = queue.get(timeout=1) - - self.assertEqual("One", msg1.body) - self.assertEqual("Two", msg2.body) - self.assertEqual("Three", msg3.body) - self.assertEqual("Four", msg4.body) - self.assertEqual("Five", msg5.body) - - session.message_accept(RangedSet(msg1.id, msg2.id, msg4.id))#One, Two and Four - - #subscribe from second session here to ensure queue is not - #auto-deleted when alternate session closes (no need to ack on these): - self.session.message_subscribe(queue = "test-ack-queue", destination = "checker", accept_mode=1) - - #now close the session, and see that the unacked messages are - #then redelivered to another subscriber: - session.close(timeout=10) - - session = self.session - session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFF) - session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFF) - queue = session.incoming("checker") - - msg3b = queue.get(timeout=1) - msg5b = queue.get(timeout=1) - - self.assertEqual("Three", msg3b.body) - self.assertEqual("Five", msg5b.body) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) - except Empty: None - - def test_reject(self): - session = self.session - session.queue_declare(queue = "q", exclusive=True, auto_delete=True, alternate_exchange="amq.fanout") - session.queue_declare(queue = "r", exclusive=True, auto_delete=True) - session.exchange_bind(queue = "r", exchange = "amq.fanout") - - session.message_subscribe(queue = "q", destination = "consumer") - session.message_flow(destination="consumer", unit=session.credit_unit.message, value=0xFFFFFFFF) - session.message_flow(destination="consumer", unit=session.credit_unit.byte, value=0xFFFFFFFF) - session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "blah, blah")) - msg = session.incoming("consumer").get(timeout = 1) - self.assertEquals(msg.body, "blah, blah") - session.message_reject(RangedSet(msg.id)) - - session.message_subscribe(queue = "r", destination = "checker") - session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFF) - session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFF) - msg = session.incoming("checker").get(timeout = 1) - self.assertEquals(msg.body, "blah, blah") - - def test_credit_flow_messages(self): - """ - Test basic credit based flow control with unit = message - """ - #declare an exclusive queue - session = self.session - session.queue_declare(queue = "q", exclusive=True, auto_delete=True) - #create consumer (for now that defaults to infinite credit) - session.message_subscribe(queue = "q", destination = "c") - session.message_set_flow_mode(flow_mode = 0, destination = "c") - #send batch of messages to queue - for i in range(1, 11): - session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i)) - - #set message credit to finite amount (less than enough for all messages) - session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c") - #set infinite byte credit - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "c") - #check that expected number were received - q = session.incoming("c") - for i in range(1, 6): - self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i) - self.assertEmpty(q) - - #increase credit again and check more are received - for i in range(6, 11): - session.message_flow(unit = session.credit_unit.message, value = 1, destination = "c") - self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i) - self.assertEmpty(q) - - def test_credit_flow_bytes(self): - """ - Test basic credit based flow control with unit = bytes - """ - #declare an exclusive queue - session = self.session - session.queue_declare(queue = "q", exclusive=True, auto_delete=True) - #create consumer (for now that defaults to infinite credit) - session.message_subscribe(queue = "q", destination = "c") - session.message_set_flow_mode(flow_mode = 0, destination = "c") - #send batch of messages to queue - for i in range(10): - session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh")) - - #each message is currently interpreted as requiring msg_size bytes of credit - msg_size = 19 - - #set byte credit to finite amount (less than enough for all messages) - session.message_flow(unit = session.credit_unit.byte, value = msg_size*5, destination = "c") - #set infinite message credit - session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFF, destination = "c") - #check that expected number were received - q = session.incoming("c") - for i in range(5): - self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh") - self.assertEmpty(q) - - #increase credit again and check more are received - for i in range(5): - session.message_flow(unit = session.credit_unit.byte, value = msg_size, destination = "c") - self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh") - self.assertEmpty(q) - - - def test_window_flow_messages(self): - """ - Test basic window based flow control with unit = message - """ - #declare an exclusive queue - session = self.session - session.queue_declare(queue = "q", exclusive=True, auto_delete=True) - #create consumer (for now that defaults to infinite credit) - session.message_subscribe(queue = "q", destination = "c") - session.message_set_flow_mode(flow_mode = 1, destination = "c") - #send batch of messages to queue - for i in range(1, 11): - session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i)) - - #set message credit to finite amount (less than enough for all messages) - session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c") - #set infinite byte credit - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "c") - #check that expected number were received - q = session.incoming("c") - for i in range(1, 6): - msg = q.get(timeout = 1) - session.receiver._completed.add(msg.id)#TODO: this may be done automatically - self.assertDataEquals(session, msg, "Message %d" % i) - self.assertEmpty(q) - - #acknowledge messages and check more are received - #TODO: there may be a nicer way of doing this - session.channel.session_completed(session.receiver._completed) - - for i in range(6, 11): - self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i) - self.assertEmpty(q) - - - def test_window_flow_bytes(self): - """ - Test basic window based flow control with unit = bytes - """ - #declare an exclusive queue - session = self.session - session.queue_declare(queue = "q", exclusive=True, auto_delete=True) - #create consumer (for now that defaults to infinite credit) - session.message_subscribe(queue = "q", destination = "c") - session.message_set_flow_mode(flow_mode = 1, destination = "c") - #send batch of messages to queue - for i in range(10): - session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh")) - - #each message is currently interpreted as requiring msg_size bytes of credit - msg_size = 19 - - #set byte credit to finite amount (less than enough for all messages) - session.message_flow(unit = session.credit_unit.byte, value = msg_size*5, destination = "c") - #set infinite message credit - session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFF, destination = "c") - #check that expected number were received - q = session.incoming("c") - msgs = [] - for i in range(5): - msg = q.get(timeout = 1) - msgs.append(msg) - self.assertDataEquals(session, msg, "abcdefgh") - self.assertEmpty(q) - - #ack each message individually and check more are received - for i in range(5): - msg = msgs.pop() - #TODO: there may be a nicer way of doing this - session.receiver._completed.add(msg.id) - session.channel.session_completed(session.receiver._completed) - self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh") - self.assertEmpty(q) - - def test_subscribe_not_acquired(self): - """ - Test the not-acquired modes works as expected for a simple case - """ - session = self.session - session.queue_declare(queue = "q", exclusive=True, auto_delete=True) - for i in range(1, 6): - session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i)) - - session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1) - session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFF, destination = "a") - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "a") - session.message_subscribe(queue = "q", destination = "b", acquire_mode = 1) - session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFF, destination = "b") - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "b") - - for i in range(6, 11): - session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i)) - - #both subscribers should see all messages - qA = session.incoming("a") - qB = session.incoming("b") - for i in range(1, 11): - for q in [qA, qB]: - msg = q.get(timeout = 1) - self.assertEquals("Message %s" % i, msg.body) - #TODO: tidy up completion - session.receiver._completed.add(msg.id) - - #TODO: tidy up completion - session.channel.session_completed(session.receiver._completed) - #messages should still be on the queue: - self.assertEquals(10, session.queue_query(queue = "q").message_count) - - def test_acquire_with_no_accept_and_credit_flow(self): - """ - Test that messages recieved unacquired, with accept not - required in windowing mode can be acquired. - """ - session = self.session - session.queue_declare(queue = "q", exclusive=True, auto_delete=True) - - session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me")) - - session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1, accept_mode = 1) - session.message_set_flow_mode(flow_mode = session.flow_mode.credit, destination = "a") - session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFF, destination = "a") - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "a") - msg = session.incoming("a").get(timeout = 1) - self.assertEquals("acquire me", msg.body) - #message should still be on the queue: - self.assertEquals(1, session.queue_query(queue = "q").message_count) - - transfers = RangedSet(msg.id) - response = session.message_acquire(transfers) - #check that we get notification (i.e. message_acquired) - self.assert_(msg.id in response.transfers) - #message should have been removed from the queue: - self.assertEquals(0, session.queue_query(queue = "q").message_count) - - def test_acquire(self): - """ - Test explicit acquire function - """ - session = self.session - session.queue_declare(queue = "q", exclusive=True, auto_delete=True) - - session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me")) - - session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1) - session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFF) - session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFF) - msg = session.incoming("a").get(timeout = 1) - self.assertEquals("acquire me", msg.body) - #message should still be on the queue: - self.assertEquals(1, session.queue_query(queue = "q").message_count) - - transfers = RangedSet(msg.id) - response = session.message_acquire(transfers) - #check that we get notification (i.e. message_acquired) - self.assert_(msg.id in response.transfers) - #message should have been removed from the queue: - self.assertEquals(0, session.queue_query(queue = "q").message_count) - session.message_accept(transfers) - - - def test_release(self): - """ - Test explicit release function - """ - session = self.session - session.queue_declare(queue = "q", exclusive=True, auto_delete=True) - - session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "release me")) - - session.message_subscribe(queue = "q", destination = "a") - session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFF) - session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFF) - msg = session.incoming("a").get(timeout = 1) - self.assertEquals("release me", msg.body) - session.message_cancel(destination = "a") - session.message_release(RangedSet(msg.id)) - - #message should not have been removed from the queue: - self.assertEquals(1, session.queue_query(queue = "q").message_count) - - def test_release_ordering(self): - """ - Test order of released messages is as expected - """ - session = self.session - session.queue_declare(queue = "q", exclusive=True, auto_delete=True) - for i in range (1, 11): - session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "released message %s" % (i))) - - session.message_subscribe(queue = "q", destination = "a") - session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "a") - queue = session.incoming("a") - first = queue.get(timeout = 1) - for i in range(2, 10): - msg = queue.get(timeout = 1) - self.assertEquals("released message %s" % (i), msg.body) - - last = queue.get(timeout = 1) - self.assertEmpty(queue) - released = RangedSet() - released.add(first.id, last.id) - session.message_release(released) - - #TODO: may want to clean this up... - session.receiver._completed.add(first.id, last.id) - session.channel.session_completed(session.receiver._completed) - - for i in range(1, 11): - self.assertEquals("released message %s" % (i), queue.get(timeout = 1).body) - - def test_ranged_ack(self): - """ - Test acking of messages ranges - """ - session = self.conn.session("alternate-session", timeout=10) - - session.queue_declare(queue = "q", auto_delete=True) - delivery_properties = session.delivery_properties(routing_key="q") - for i in range (1, 11): - session.message_transfer(message=Message(delivery_properties, "message %s" % (i))) - - session.message_subscribe(queue = "q", destination = "a") - session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "a") - queue = session.incoming("a") - ids = [] - for i in range (1, 11): - msg = queue.get(timeout = 1) - self.assertEquals("message %s" % (i), msg.body) - ids.append(msg.id) - - self.assertEmpty(queue) - - #ack all but the fourth message (command id 2) - accepted = RangedSet() - accepted.add(ids[0], ids[2]) - accepted.add(ids[4], ids[9]) - session.message_accept(accepted) - - #subscribe from second session here to ensure queue is not - #auto-deleted when alternate session closes (no need to ack on these): - self.session.message_subscribe(queue = "q", destination = "checker") - - #now close the session, and see that the unacked messages are - #then redelivered to another subscriber: - session.close(timeout=10) - - session = self.session - session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFF) - session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFF) - queue = session.incoming("checker") - - self.assertEquals("message 4", queue.get(timeout = 1).body) - self.assertEmpty(queue) - - def test_subscribe_not_acquired_2(self): - session = self.session - - #publish some messages - session.queue_declare(queue = "q", exclusive=True, auto_delete=True) - for i in range(1, 11): - session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i))) - - #consume some of them - session.message_subscribe(queue = "q", destination = "a") - session.message_set_flow_mode(flow_mode = 0, destination = "a") - session.message_flow(unit = session.credit_unit.message, value = 5, destination = "a") - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "a") - - queue = session.incoming("a") - for i in range(1, 6): - msg = queue.get(timeout = 1) - self.assertEquals("message-%d" % (i), msg.body) - #complete and accept - session.message_accept(RangedSet(msg.id)) - #TODO: tidy up completion - session.receiver._completed.add(msg.id) - session.channel.session_completed(session.receiver._completed) - self.assertEmpty(queue) - - #now create a not-acquired subscriber - session.message_subscribe(queue = "q", destination = "b", acquire_mode=1) - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "b") - - #check it gets those not consumed - queue = session.incoming("b") - session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b") - for i in range(6, 11): - msg = queue.get(timeout = 1) - self.assertEquals("message-%d" % (i), msg.body) - session.message_release(RangedSet(msg.id)) - #TODO: tidy up completion - session.receiver._completed.add(msg.id) - session.channel.session_completed(session.receiver._completed) - session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b") - self.assertEmpty(queue) - - #check all 'browsed' messages are still on the queue - self.assertEqual(5, session.queue_query(queue="q").message_count) - - def test_subscribe_not_acquired_3(self): - session = self.session - - #publish some messages - session.queue_declare(queue = "q", exclusive=True, auto_delete=True) - for i in range(1, 11): - session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i))) - - #create a not-acquired subscriber - session.message_subscribe(queue = "q", destination = "a", acquire_mode=1) - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "a") - session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") - - #browse through messages - queue = session.incoming("a") - for i in range(1, 11): - msg = queue.get(timeout = 1) - self.assertEquals("message-%d" % (i), msg.body) - if (i % 2): - #try to acquire every second message - response = session.message_acquire(RangedSet(msg.id)) - #check that acquire succeeds - self.assert_(msg.id in response.transfers) - session.message_accept(RangedSet(msg.id)) - else: - session.message_release(RangedSet(msg.id)) - session.receiver._completed.add(msg.id) - session.channel.session_completed(session.receiver._completed) - self.assertEmpty(queue) - - #create a second not-acquired subscriber - session.message_subscribe(queue = "q", destination = "b", acquire_mode=1) - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "b") - session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b") - #check it gets those not consumed - queue = session.incoming("b") - for i in [2,4,6,8,10]: - msg = queue.get(timeout = 1) - self.assertEquals("message-%d" % (i), msg.body) - session.message_release(RangedSet(msg.id)) - session.receiver._completed.add(msg.id) - session.channel.session_completed(session.receiver._completed) - session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b") - self.assertEmpty(queue) - - #check all 'browsed' messages are still on the queue - self.assertEqual(5, session.queue_query(queue="q").message_count) - - def test_release_unacquired(self): - session = self.session - - #create queue - session.queue_declare(queue = "q", exclusive=True, auto_delete=True) - - #send message - session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "my-message")) - - #create two 'browsers' - session.message_subscribe(queue = "q", destination = "a", acquire_mode=1) - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "a") - session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") - queueA = session.incoming("a") - - session.message_subscribe(queue = "q", destination = "b", acquire_mode=1) - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "b") - session.message_flow(unit = session.credit_unit.message, value = 10, destination = "b") - queueB = session.incoming("b") - - #have each browser release the message - msgA = queueA.get(timeout = 1) - session.message_release(RangedSet(msgA.id)) - - msgB = queueB.get(timeout = 1) - session.message_release(RangedSet(msgB.id)) - - #cancel browsers - session.message_cancel(destination = "a") - session.message_cancel(destination = "b") - - #create consumer - session.message_subscribe(queue = "q", destination = "c") - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "c") - session.message_flow(unit = session.credit_unit.message, value = 10, destination = "c") - queueC = session.incoming("c") - #consume the message then ack it - msgC = queueC.get(timeout = 1) - session.message_accept(RangedSet(msgC.id)) - #ensure there are no other messages - self.assertEmpty(queueC) - - def test_empty_body(self): - session = self.session - session.queue_declare(queue="xyz", exclusive=True, auto_delete=True) - props = session.delivery_properties(routing_key="xyz") - session.message_transfer(message=Message(props, "")) - - consumer_tag = "tag1" - session.message_subscribe(queue="xyz", destination=consumer_tag) - session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFF, destination = consumer_tag) - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = consumer_tag) - queue = session.incoming(consumer_tag) - msg = queue.get(timeout=1) - self.assertEquals("", msg.body) - session.message_accept(RangedSet(msg.id)) - - def test_incoming_start(self): - q = "test_incoming_start" - session = self.session - - session.queue_declare(queue=q, exclusive=True, auto_delete=True) - session.message_subscribe(queue=q, destination="msgs") - messages = session.incoming("msgs") - assert messages.destination == "msgs" - - dp = session.delivery_properties(routing_key=q) - session.message_transfer(message=Message(dp, "test")) - - messages.start() - msg = messages.get() - assert msg.body == "test" - - def test_ttl(self): - q = "test_ttl" - session = self.session - - session.queue_declare(queue=q, exclusive=True, auto_delete=True) - - dp = session.delivery_properties(routing_key=q, ttl=500)#expire in half a second - session.message_transfer(message=Message(dp, "first")) - - dp = session.delivery_properties(routing_key=q, ttl=300000)#expire in fives minutes - session.message_transfer(message=Message(dp, "second")) - - d = "msgs" - session.message_subscribe(queue=q, destination=d) - messages = session.incoming(d) - sleep(1) - session.message_flow(unit = session.credit_unit.message, value=2, destination=d) - session.message_flow(unit = session.credit_unit.byte, value=0xFFFFFFFF, destination=d) - assert messages.get(timeout=1).body == "second" - self.assertEmpty(messages) - - - def assertDataEquals(self, session, msg, expected): - self.assertEquals(expected, msg.body) - - def assertEmpty(self, queue): - try: - extra = queue.get(timeout=1) - self.fail("Queue not empty, contains: " + extra.body) - except Empty: None - -class SizelessContent(Content): - - def size(self): - return None |