diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-10-11 20:54:24 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-10-11 20:54:24 +0000 |
commit | 42af48b0ecaf5d5dd62ef85400f913a85b9b00e4 (patch) | |
tree | 06142f455d736e8706b9eb0988a8c9993ce3e7f0 | |
parent | ec94396bd9c3e5e05376dbbc0254d3030e0b1728 (diff) | |
download | qpid-python-42af48b0ecaf5d5dd62ef85400f913a85b9b00e4.tar.gz |
QPID-3543: correctly dequeue messages that are auto-acknowledged.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1182084 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 3 | ||||
-rw-r--r-- | tests/src/py/qpid_tests/broker_0_10/message.py | 33 |
2 files changed, 35 insertions, 1 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 94d0cc87f7..dda481778d 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -348,7 +348,8 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) parent->record(record); } if (acquire && !ackExpected) { // auto acquire && auto accept - record.accept( 0 /*no ctxt*/ ); + queue->dequeue(0 /*ctxt*/, msg); + record.setEnded(); } if (mgmtObject) { mgmtObject->inc_delivered(); } return true; diff --git a/tests/src/py/qpid_tests/broker_0_10/message.py b/tests/src/py/qpid_tests/broker_0_10/message.py index 89ba936b05..a2b3cce0d2 100644 --- a/tests/src/py/qpid_tests/broker_0_10/message.py +++ b/tests/src/py/qpid_tests/broker_0_10/message.py @@ -954,6 +954,39 @@ class MessageTests(TestBase010): self.assertEmpty(messages) + def test_auto_ack(self): + """ + Test implicit accept function + """ + self.startQmf() + session = self.session + session.queue_declare(queue = "auto-ack", exclusive=True, auto_delete=True) + session.message_transfer(message=Message(session.delivery_properties(routing_key="auto-ack"), "ackackack")) + + # verify one enqueued message, use both QMF and session query to verify consistency + self.assertEqual(1, session.queue_query(queue="auto-ack").message_count) + for queue in self.qmf.getObjects(_class="queue"): + if queue.name == "auto-ack": + break; + self.assertEquals("auto-ack", queue.name) + self.assertEquals(queue.msgDepth, 1) + self.assertEquals(queue.msgTotalEnqueues, 1) + self.assertEquals(queue.msgTotalDequeues, 0) + + # implicit acquire and acknowledge + session.message_subscribe(queue = "auto-ack", destination = "a", acquire_mode=0, accept_mode=1) + session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + msg = session.incoming("a").get(timeout = 1) + self.assertEquals("ackackack", msg.body) + + #message should not be on the queue: + self.assertEqual(0, session.queue_query(queue="auto-ack").message_count) + queue.update() + self.assertEquals(queue.msgDepth, 0) + self.assertEquals(queue.msgTotalEnqueues, 1) + self.assertEquals(queue.msgTotalDequeues, 1) + def assertDataEquals(self, session, msg, expected): self.assertEquals(expected, msg.body) |