summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-10-13 14:26:06 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-10-13 14:26:06 +0000
commit271b8d6270a15ddef43e3490ae8642378c438715 (patch)
treece9468d22258e7db4a617e19e8638d3a8cfb291e
parent682fc57dff2733aab8a3bd1f0c195bbf25c707c1 (diff)
downloadqpid-python-271b8d6270a15ddef43e3490ae8642378c438715.tar.gz
QPID-3543: augment existing test to check for implicit accept error instead of introducing a new test
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1182874 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--tests/src/py/qpid_tests/broker_0_10/message.py141
1 files changed, 85 insertions, 56 deletions
diff --git a/tests/src/py/qpid_tests/broker_0_10/message.py b/tests/src/py/qpid_tests/broker_0_10/message.py
index a2b3cce0d2..6c864bcd13 100644
--- a/tests/src/py/qpid_tests/broker_0_10/message.py
+++ b/tests/src/py/qpid_tests/broker_0_10/message.py
@@ -262,19 +262,29 @@ class MessageTests(TestBase010):
def test_ack(self):
"""
- Test basic ack/recover behaviour
+ Test basic ack/recover behaviour using a combination of implicit and
+ explicit accept subscriptions.
"""
- session = self.conn.session("alternate-session", timeout=10)
- session.queue_declare(queue="test-ack-queue", auto_delete=True)
-
- session.message_subscribe(queue = "test-ack-queue", destination = "consumer")
- session.message_flow(destination="consumer", unit=session.credit_unit.message, value=0xFFFFFFFFL)
- session.message_flow(destination="consumer", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
- queue = session.incoming("consumer")
+ self.startQmf()
+ session1 = self.conn.session("alternate-session", timeout=10)
+ session1.queue_declare(queue="test-ack-queue", auto_delete=True)
- delivery_properties = session.delivery_properties(routing_key="test-ack-queue")
+ delivery_properties = session1.delivery_properties(routing_key="test-ack-queue")
for i in ["One", "Two", "Three", "Four", "Five"]:
- session.message_transfer(message=Message(delivery_properties, i))
+ session1.message_transfer(message=Message(delivery_properties, i))
+
+ # verify enqueued message count, use both QMF and session query to verify consistency
+ self.assertEqual(5, session1.queue_query(queue="test-ack-queue").message_count)
+ queueObj = self.qmf.getObjects(_class="queue", name="test-ack-queue")[0]
+ self.assertEquals(queueObj.msgDepth, 5)
+ self.assertEquals(queueObj.msgTotalEnqueues, 5)
+ self.assertEquals(queueObj.msgTotalDequeues, 0)
+
+ # subscribe with implied acquire, explicit accept:
+ session1.message_subscribe(queue = "test-ack-queue", destination = "consumer")
+ session1.message_flow(destination="consumer", unit=session1.credit_unit.message, value=0xFFFFFFFFL)
+ session1.message_flow(destination="consumer", unit=session1.credit_unit.byte, value=0xFFFFFFFFL)
+ queue = session1.incoming("consumer")
msg1 = queue.get(timeout=1)
msg2 = queue.get(timeout=1)
@@ -288,20 +298,46 @@ class MessageTests(TestBase010):
self.assertEqual("Four", msg4.body)
self.assertEqual("Five", msg5.body)
- session.message_accept(RangedSet(msg1.id, msg2.id, msg4.id))#One, Two and Four
-
- #subscribe from second session here to ensure queue is not
- #auto-deleted when alternate session closes (no need to ack on these):
- self.session.message_subscribe(queue = "test-ack-queue", destination = "checker", accept_mode=1)
-
- #now close the session, and see that the unacked messages are
+ # messages should not be on the queue:
+ self.assertEqual(0, session1.queue_query(queue="test-ack-queue").message_count)
+ # QMF shows the dequeues as not having happened yet, since they are have
+ # not been accepted
+ queueObj.update()
+ self.assertEquals(queueObj.msgDepth, 5)
+ self.assertEquals(queueObj.msgTotalEnqueues, 5)
+ self.assertEquals(queueObj.msgTotalDequeues, 0)
+
+ session1.message_accept(RangedSet(msg1.id, msg2.id, msg4.id))#One, Two and Four
+
+ # QMF should now reflect the accepted messages as being dequeued
+ self.assertEqual(0, session1.queue_query(queue="test-ack-queue").message_count)
+ queueObj.update()
+ self.assertEquals(queueObj.msgDepth, 2)
+ self.assertEquals(queueObj.msgTotalEnqueues, 5)
+ self.assertEquals(queueObj.msgTotalDequeues, 3)
+
+ #subscribe from second session here to ensure queue is not auto-deleted
+ #when alternate session closes. Use implicit accept mode to test that
+ #we don't need to explicitly accept
+ session2 = self.conn.session("alternate-session-2", timeout=10)
+ session2.message_subscribe(queue = "test-ack-queue", destination = "checker", accept_mode=1)
+
+ #now close the first session, and see that the unaccepted messages are
#then redelivered to another subscriber:
- session.close(timeout=10)
+ session1.close(timeout=10)
- session = self.session
- session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFFL)
- session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
- queue = session.incoming("checker")
+ # check the statistics - the queue_query will show the non-accepted
+ # messages have been released. QMF never considered them dequeued, so
+ # those counts won't change
+ self.assertEqual(2, session2.queue_query(queue="test-ack-queue").message_count)
+ queueObj.update()
+ self.assertEquals(queueObj.msgDepth, 2)
+ self.assertEquals(queueObj.msgTotalEnqueues, 5)
+ self.assertEquals(queueObj.msgTotalDequeues, 3)
+
+ session2.message_flow(destination="checker", unit=session2.credit_unit.message, value=0xFFFFFFFFL)
+ session2.message_flow(destination="checker", unit=session2.credit_unit.byte, value=0xFFFFFFFFL)
+ queue = session2.incoming("checker")
msg3b = queue.get(timeout=1)
msg5b = queue.get(timeout=1)
@@ -314,6 +350,33 @@ class MessageTests(TestBase010):
self.fail("Got unexpected message: " + extra.body)
except Empty: None
+ self.assertEqual(0, session2.queue_query(queue="test-ack-queue").message_count)
+ queueObj.update()
+ self.assertEquals(queueObj.msgDepth, 0)
+ self.assertEquals(queueObj.msgTotalEnqueues, 5)
+ self.assertEquals(queueObj.msgTotalDequeues, 5)
+
+ # Subscribe one last time to keep the queue available, and to verify
+ # that the implied accept worked by verifying no messages have been
+ # returned when session2 is closed.
+ self.session.message_subscribe(queue = "test-ack-queue", destination = "final-checker")
+
+ session2.close(timeout=10)
+
+ # check the statistics - they should not have changed
+ self.assertEqual(0, self.session.queue_query(queue="test-ack-queue").message_count)
+ queueObj.update()
+ self.assertEquals(queueObj.msgDepth, 0)
+ self.assertEquals(queueObj.msgTotalEnqueues, 5)
+ self.assertEquals(queueObj.msgTotalDequeues, 5)
+
+ self.session.message_flow(destination="final-checker", unit=self.session.credit_unit.message, value=0xFFFFFFFFL)
+ self.session.message_flow(destination="final-checker", unit=self.session.credit_unit.byte, value=0xFFFFFFFFL)
+ try:
+ extra = self.session.incoming("final-checker").get(timeout=1)
+ self.fail("Got unexpected message: " + extra.body)
+ except Empty: None
+
def test_reject(self):
session = self.session
session.queue_declare(queue = "q", exclusive=True, auto_delete=True, alternate_exchange="amq.fanout")
@@ -953,40 +1016,6 @@ class MessageTests(TestBase010):
assert messages.get(timeout=1).body == "second"
self.assertEmpty(messages)
-
- def test_auto_ack(self):
- """
- Test implicit accept function
- """
- self.startQmf()
- session = self.session
- session.queue_declare(queue = "auto-ack", exclusive=True, auto_delete=True)
- session.message_transfer(message=Message(session.delivery_properties(routing_key="auto-ack"), "ackackack"))
-
- # verify one enqueued message, use both QMF and session query to verify consistency
- self.assertEqual(1, session.queue_query(queue="auto-ack").message_count)
- for queue in self.qmf.getObjects(_class="queue"):
- if queue.name == "auto-ack":
- break;
- self.assertEquals("auto-ack", queue.name)
- self.assertEquals(queue.msgDepth, 1)
- self.assertEquals(queue.msgTotalEnqueues, 1)
- self.assertEquals(queue.msgTotalDequeues, 0)
-
- # implicit acquire and acknowledge
- session.message_subscribe(queue = "auto-ack", destination = "a", acquire_mode=0, accept_mode=1)
- session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL)
- session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
- msg = session.incoming("a").get(timeout = 1)
- self.assertEquals("ackackack", msg.body)
-
- #message should not be on the queue:
- self.assertEqual(0, session.queue_query(queue="auto-ack").message_count)
- queue.update()
- self.assertEquals(queue.msgDepth, 0)
- self.assertEquals(queue.msgTotalEnqueues, 1)
- self.assertEquals(queue.msgTotalDequeues, 1)
-
def assertDataEquals(self, session, msg, expected):
self.assertEquals(expected, msg.body)