diff options
-rw-r--r-- | cpp/src/qpid/broker/BrokerQueue.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 1 | ||||
-rw-r--r-- | cpp/src/tests/TxPublishTest.cpp | 14 |
3 files changed, 18 insertions, 5 deletions
diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp index e2b12ef316..3ae2ce8de3 100644 --- a/cpp/src/qpid/broker/BrokerQueue.cpp +++ b/cpp/src/qpid/broker/BrokerQueue.cpp @@ -87,6 +87,7 @@ void Queue::deliver(Message::shared_ptr& msg){ void Queue::recover(Message::shared_ptr& msg){ push(msg); + msg->enqueueComplete(); // mark the message as enqueued if (store && msg->expectedContentSize() != msg->encodedContentSize()) { //content has not been loaded, need to ensure that lazy loading mode is set: //TODO: find a nicer way to do this @@ -189,10 +190,13 @@ Message::shared_ptr Queue::dequeue(){ Message::shared_ptr msg; if(!messages.empty()){ msg = messages.front(); - if (msg->isEnqueueComplete()) + if (msg->isEnqueueComplete()){ pop(); + return msg; + } } - return msg; + Message::shared_ptr msg_empty; + return msg_empty; } uint32_t Queue::purge(){ diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 2daf3b2d0a..954c50faee 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -194,6 +194,7 @@ void RecoverableMessageImpl::dequeue(DtxBuffer::shared_ptr buffer, Queue::shared void RecoverableMessageImpl::enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue) { + msg->enqueueComplete(); // recoved nmessage to enqueued in store already buffer->enlist(TxOp::shared_ptr(new RecoveredEnqueue(queue, msg))); } diff --git a/cpp/src/tests/TxPublishTest.cpp b/cpp/src/tests/TxPublishTest.cpp index 3391be5128..d009dd9112 100644 --- a/cpp/src/tests/TxPublishTest.cpp +++ b/cpp/src/tests/TxPublishTest.cpp @@ -44,7 +44,8 @@ class TxPublishTest : public CppUnit::TestCase void enqueue(TransactionContext*, PersistableMessage& msg, const PersistableQueue& queue) { - enqueued.push_back(msg_queue_pair(queue.getName(), &msg)); + msg.enqueueComplete(); + enqueued.push_back(msg_queue_pair(queue.getName(), &msg)); } //dont care about any of the other methods: @@ -61,7 +62,7 @@ class TxPublishTest : public CppUnit::TestCase TestMessageStore store; Queue::shared_ptr queue1; Queue::shared_ptr queue2; - Message::shared_ptr const msg; + Message::shared_ptr msg; TxPublish op; public: @@ -88,14 +89,21 @@ public: CPPUNIT_ASSERT_EQUAL((PersistableMessage*) msg.get(), store.enqueued[0].second); CPPUNIT_ASSERT_EQUAL(string("queue2"), store.enqueued[1].first); CPPUNIT_ASSERT_EQUAL((PersistableMessage*) msg.get(), store.enqueued[1].second); + CPPUNIT_ASSERT_EQUAL( true, ((PersistableMessage*) msg.get())->isEnqueueComplete()); + + } void testCommit() { //ensure messages are delivered to queue + op.prepare(0); op.commit(); CPPUNIT_ASSERT_EQUAL((uint32_t) 1, queue1->getMessageCount()); - CPPUNIT_ASSERT_EQUAL(msg, queue1->dequeue()); + Message::shared_ptr msg_dequeue = queue1->dequeue(); + + CPPUNIT_ASSERT_EQUAL( true, ((PersistableMessage*) msg_dequeue.get())->isEnqueueComplete()); + CPPUNIT_ASSERT_EQUAL(msg, msg_dequeue); CPPUNIT_ASSERT_EQUAL((uint32_t) 1, queue2->getMessageCount()); CPPUNIT_ASSERT_EQUAL(msg, queue2->dequeue()); |