summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-10-11 20:54:24 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-10-11 20:54:24 +0000
commitf8536f21056ac76648348df4e3bc543185f59abb (patch)
tree30efa4f6d174f6c3c93709a97adf7c39a3a63ce0
parentc58ae888d6d104bb1120d298bbc934e13d85b250 (diff)
downloadqpid-python-f8536f21056ac76648348df4e3bc543185f59abb.tar.gz
QPID-3543: correctly dequeue messages that are auto-acknowledged.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1182084 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp3
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/message.py33
2 files changed, 35 insertions, 1 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 94d0cc87f7..dda481778d 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -348,7 +348,8 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
parent->record(record);
}
if (acquire && !ackExpected) { // auto acquire && auto accept
- record.accept( 0 /*no ctxt*/ );
+ queue->dequeue(0 /*ctxt*/, msg);
+ record.setEnded();
}
if (mgmtObject) { mgmtObject->inc_delivered(); }
return true;
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/message.py b/qpid/tests/src/py/qpid_tests/broker_0_10/message.py
index 89ba936b05..a2b3cce0d2 100644
--- a/qpid/tests/src/py/qpid_tests/broker_0_10/message.py
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/message.py
@@ -954,6 +954,39 @@ class MessageTests(TestBase010):
self.assertEmpty(messages)
+ def test_auto_ack(self):
+ """
+ Test implicit accept function
+ """
+ self.startQmf()
+ session = self.session
+ session.queue_declare(queue = "auto-ack", exclusive=True, auto_delete=True)
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="auto-ack"), "ackackack"))
+
+ # verify one enqueued message, use both QMF and session query to verify consistency
+ self.assertEqual(1, session.queue_query(queue="auto-ack").message_count)
+ for queue in self.qmf.getObjects(_class="queue"):
+ if queue.name == "auto-ack":
+ break;
+ self.assertEquals("auto-ack", queue.name)
+ self.assertEquals(queue.msgDepth, 1)
+ self.assertEquals(queue.msgTotalEnqueues, 1)
+ self.assertEquals(queue.msgTotalDequeues, 0)
+
+ # implicit acquire and acknowledge
+ session.message_subscribe(queue = "auto-ack", destination = "a", acquire_mode=0, accept_mode=1)
+ session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ msg = session.incoming("a").get(timeout = 1)
+ self.assertEquals("ackackack", msg.body)
+
+ #message should not be on the queue:
+ self.assertEqual(0, session.queue_query(queue="auto-ack").message_count)
+ queue.update()
+ self.assertEquals(queue.msgDepth, 0)
+ self.assertEquals(queue.msgTotalEnqueues, 1)
+ self.assertEquals(queue.msgTotalDequeues, 1)
+
def assertDataEquals(self, session, msg, expected):
self.assertEquals(expected, msg.body)