diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-10-13 14:26:06 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-10-13 14:26:06 +0000 |
commit | 927b8dfc86563f97e2ea7498f4ff8439d1c3387a (patch) | |
tree | c724e35f40f1899378af08f78775353f3cbd3e30 | |
parent | 8cd5af1b470ce9db501c65b2eec2f42a56a75490 (diff) | |
download | qpid-python-927b8dfc86563f97e2ea7498f4ff8439d1c3387a.tar.gz |
QPID-3543: augment existing test to check for implicit accept error instead of introducing a new test
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1182874 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_0_10/message.py | 141 |
1 files changed, 85 insertions, 56 deletions
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/message.py b/qpid/tests/src/py/qpid_tests/broker_0_10/message.py index a2b3cce0d2..6c864bcd13 100644 --- a/qpid/tests/src/py/qpid_tests/broker_0_10/message.py +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/message.py @@ -262,19 +262,29 @@ class MessageTests(TestBase010): def test_ack(self): """ - Test basic ack/recover behaviour + Test basic ack/recover behaviour using a combination of implicit and + explicit accept subscriptions. """ - session = self.conn.session("alternate-session", timeout=10) - session.queue_declare(queue="test-ack-queue", auto_delete=True) - - session.message_subscribe(queue = "test-ack-queue", destination = "consumer") - session.message_flow(destination="consumer", unit=session.credit_unit.message, value=0xFFFFFFFFL) - session.message_flow(destination="consumer", unit=session.credit_unit.byte, value=0xFFFFFFFFL) - queue = session.incoming("consumer") + self.startQmf() + session1 = self.conn.session("alternate-session", timeout=10) + session1.queue_declare(queue="test-ack-queue", auto_delete=True) - delivery_properties = session.delivery_properties(routing_key="test-ack-queue") + delivery_properties = session1.delivery_properties(routing_key="test-ack-queue") for i in ["One", "Two", "Three", "Four", "Five"]: - session.message_transfer(message=Message(delivery_properties, i)) + session1.message_transfer(message=Message(delivery_properties, i)) + + # verify enqueued message count, use both QMF and session query to verify consistency + self.assertEqual(5, session1.queue_query(queue="test-ack-queue").message_count) + queueObj = self.qmf.getObjects(_class="queue", name="test-ack-queue")[0] + self.assertEquals(queueObj.msgDepth, 5) + self.assertEquals(queueObj.msgTotalEnqueues, 5) + self.assertEquals(queueObj.msgTotalDequeues, 0) + + # subscribe with implied acquire, explicit accept: + session1.message_subscribe(queue = "test-ack-queue", destination = "consumer") + session1.message_flow(destination="consumer", unit=session1.credit_unit.message, value=0xFFFFFFFFL) + session1.message_flow(destination="consumer", unit=session1.credit_unit.byte, value=0xFFFFFFFFL) + queue = session1.incoming("consumer") msg1 = queue.get(timeout=1) msg2 = queue.get(timeout=1) @@ -288,20 +298,46 @@ class MessageTests(TestBase010): self.assertEqual("Four", msg4.body) self.assertEqual("Five", msg5.body) - session.message_accept(RangedSet(msg1.id, msg2.id, msg4.id))#One, Two and Four - - #subscribe from second session here to ensure queue is not - #auto-deleted when alternate session closes (no need to ack on these): - self.session.message_subscribe(queue = "test-ack-queue", destination = "checker", accept_mode=1) - - #now close the session, and see that the unacked messages are + # messages should not be on the queue: + self.assertEqual(0, session1.queue_query(queue="test-ack-queue").message_count) + # QMF shows the dequeues as not having happened yet, since they are have + # not been accepted + queueObj.update() + self.assertEquals(queueObj.msgDepth, 5) + self.assertEquals(queueObj.msgTotalEnqueues, 5) + self.assertEquals(queueObj.msgTotalDequeues, 0) + + session1.message_accept(RangedSet(msg1.id, msg2.id, msg4.id))#One, Two and Four + + # QMF should now reflect the accepted messages as being dequeued + self.assertEqual(0, session1.queue_query(queue="test-ack-queue").message_count) + queueObj.update() + self.assertEquals(queueObj.msgDepth, 2) + self.assertEquals(queueObj.msgTotalEnqueues, 5) + self.assertEquals(queueObj.msgTotalDequeues, 3) + + #subscribe from second session here to ensure queue is not auto-deleted + #when alternate session closes. Use implicit accept mode to test that + #we don't need to explicitly accept + session2 = self.conn.session("alternate-session-2", timeout=10) + session2.message_subscribe(queue = "test-ack-queue", destination = "checker", accept_mode=1) + + #now close the first session, and see that the unaccepted messages are #then redelivered to another subscriber: - session.close(timeout=10) + session1.close(timeout=10) - session = self.session - session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFFL) - session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFFL) - queue = session.incoming("checker") + # check the statistics - the queue_query will show the non-accepted + # messages have been released. QMF never considered them dequeued, so + # those counts won't change + self.assertEqual(2, session2.queue_query(queue="test-ack-queue").message_count) + queueObj.update() + self.assertEquals(queueObj.msgDepth, 2) + self.assertEquals(queueObj.msgTotalEnqueues, 5) + self.assertEquals(queueObj.msgTotalDequeues, 3) + + session2.message_flow(destination="checker", unit=session2.credit_unit.message, value=0xFFFFFFFFL) + session2.message_flow(destination="checker", unit=session2.credit_unit.byte, value=0xFFFFFFFFL) + queue = session2.incoming("checker") msg3b = queue.get(timeout=1) msg5b = queue.get(timeout=1) @@ -314,6 +350,33 @@ class MessageTests(TestBase010): self.fail("Got unexpected message: " + extra.body) except Empty: None + self.assertEqual(0, session2.queue_query(queue="test-ack-queue").message_count) + queueObj.update() + self.assertEquals(queueObj.msgDepth, 0) + self.assertEquals(queueObj.msgTotalEnqueues, 5) + self.assertEquals(queueObj.msgTotalDequeues, 5) + + # Subscribe one last time to keep the queue available, and to verify + # that the implied accept worked by verifying no messages have been + # returned when session2 is closed. + self.session.message_subscribe(queue = "test-ack-queue", destination = "final-checker") + + session2.close(timeout=10) + + # check the statistics - they should not have changed + self.assertEqual(0, self.session.queue_query(queue="test-ack-queue").message_count) + queueObj.update() + self.assertEquals(queueObj.msgDepth, 0) + self.assertEquals(queueObj.msgTotalEnqueues, 5) + self.assertEquals(queueObj.msgTotalDequeues, 5) + + self.session.message_flow(destination="final-checker", unit=self.session.credit_unit.message, value=0xFFFFFFFFL) + self.session.message_flow(destination="final-checker", unit=self.session.credit_unit.byte, value=0xFFFFFFFFL) + try: + extra = self.session.incoming("final-checker").get(timeout=1) + self.fail("Got unexpected message: " + extra.body) + except Empty: None + def test_reject(self): session = self.session session.queue_declare(queue = "q", exclusive=True, auto_delete=True, alternate_exchange="amq.fanout") @@ -953,40 +1016,6 @@ class MessageTests(TestBase010): assert messages.get(timeout=1).body == "second" self.assertEmpty(messages) - - def test_auto_ack(self): - """ - Test implicit accept function - """ - self.startQmf() - session = self.session - session.queue_declare(queue = "auto-ack", exclusive=True, auto_delete=True) - session.message_transfer(message=Message(session.delivery_properties(routing_key="auto-ack"), "ackackack")) - - # verify one enqueued message, use both QMF and session query to verify consistency - self.assertEqual(1, session.queue_query(queue="auto-ack").message_count) - for queue in self.qmf.getObjects(_class="queue"): - if queue.name == "auto-ack": - break; - self.assertEquals("auto-ack", queue.name) - self.assertEquals(queue.msgDepth, 1) - self.assertEquals(queue.msgTotalEnqueues, 1) - self.assertEquals(queue.msgTotalDequeues, 0) - - # implicit acquire and acknowledge - session.message_subscribe(queue = "auto-ack", destination = "a", acquire_mode=0, accept_mode=1) - session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL) - session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL) - msg = session.incoming("a").get(timeout = 1) - self.assertEquals("ackackack", msg.body) - - #message should not be on the queue: - self.assertEqual(0, session.queue_query(queue="auto-ack").message_count) - queue.update() - self.assertEquals(queue.msgDepth, 0) - self.assertEquals(queue.msgTotalEnqueues, 1) - self.assertEquals(queue.msgTotalDequeues, 1) - def assertDataEquals(self, session, msg, expected): self.assertEquals(expected, msg.body) |