summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-10-11 20:54:24 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-10-11 20:54:24 +0000
commit42af48b0ecaf5d5dd62ef85400f913a85b9b00e4 (patch)
tree06142f455d736e8706b9eb0988a8c9993ce3e7f0
parentec94396bd9c3e5e05376dbbc0254d3030e0b1728 (diff)
downloadqpid-python-42af48b0ecaf5d5dd62ef85400f913a85b9b00e4.tar.gz
QPID-3543: correctly dequeue messages that are auto-acknowledged.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1182084 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp3
-rw-r--r--tests/src/py/qpid_tests/broker_0_10/message.py33
2 files changed, 35 insertions, 1 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 94d0cc87f7..dda481778d 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -348,7 +348,8 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
parent->record(record);
}
if (acquire && !ackExpected) { // auto acquire && auto accept
- record.accept( 0 /*no ctxt*/ );
+ queue->dequeue(0 /*ctxt*/, msg);
+ record.setEnded();
}
if (mgmtObject) { mgmtObject->inc_delivered(); }
return true;
diff --git a/tests/src/py/qpid_tests/broker_0_10/message.py b/tests/src/py/qpid_tests/broker_0_10/message.py
index 89ba936b05..a2b3cce0d2 100644
--- a/tests/src/py/qpid_tests/broker_0_10/message.py
+++ b/tests/src/py/qpid_tests/broker_0_10/message.py
@@ -954,6 +954,39 @@ class MessageTests(TestBase010):
self.assertEmpty(messages)
+ def test_auto_ack(self):
+ """
+ Test implicit accept function
+ """
+ self.startQmf()
+ session = self.session
+ session.queue_declare(queue = "auto-ack", exclusive=True, auto_delete=True)
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="auto-ack"), "ackackack"))
+
+ # verify one enqueued message, use both QMF and session query to verify consistency
+ self.assertEqual(1, session.queue_query(queue="auto-ack").message_count)
+ for queue in self.qmf.getObjects(_class="queue"):
+ if queue.name == "auto-ack":
+ break;
+ self.assertEquals("auto-ack", queue.name)
+ self.assertEquals(queue.msgDepth, 1)
+ self.assertEquals(queue.msgTotalEnqueues, 1)
+ self.assertEquals(queue.msgTotalDequeues, 0)
+
+ # implicit acquire and acknowledge
+ session.message_subscribe(queue = "auto-ack", destination = "a", acquire_mode=0, accept_mode=1)
+ session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ msg = session.incoming("a").get(timeout = 1)
+ self.assertEquals("ackackack", msg.body)
+
+ #message should not be on the queue:
+ self.assertEqual(0, session.queue_query(queue="auto-ack").message_count)
+ queue.update()
+ self.assertEquals(queue.msgDepth, 0)
+ self.assertEquals(queue.msgTotalEnqueues, 1)
+ self.assertEquals(queue.msgTotalDequeues, 1)
+
def assertDataEquals(self, session, msg, expected):
self.assertEquals(expected, msg.body)