summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/qpid/broker/BrokerQueue.cpp8
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.cpp1
-rw-r--r--cpp/src/tests/TxPublishTest.cpp14
3 files changed, 18 insertions, 5 deletions
diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp
index e2b12ef316..3ae2ce8de3 100644
--- a/cpp/src/qpid/broker/BrokerQueue.cpp
+++ b/cpp/src/qpid/broker/BrokerQueue.cpp
@@ -87,6 +87,7 @@ void Queue::deliver(Message::shared_ptr& msg){
void Queue::recover(Message::shared_ptr& msg){
push(msg);
+ msg->enqueueComplete(); // mark the message as enqueued
if (store && msg->expectedContentSize() != msg->encodedContentSize()) {
//content has not been loaded, need to ensure that lazy loading mode is set:
//TODO: find a nicer way to do this
@@ -189,10 +190,13 @@ Message::shared_ptr Queue::dequeue(){
Message::shared_ptr msg;
if(!messages.empty()){
msg = messages.front();
- if (msg->isEnqueueComplete())
+ if (msg->isEnqueueComplete()){
pop();
+ return msg;
+ }
}
- return msg;
+ Message::shared_ptr msg_empty;
+ return msg_empty;
}
uint32_t Queue::purge(){
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index 2daf3b2d0a..954c50faee 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -194,6 +194,7 @@ void RecoverableMessageImpl::dequeue(DtxBuffer::shared_ptr buffer, Queue::shared
void RecoverableMessageImpl::enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue)
{
+ msg->enqueueComplete(); // recoved nmessage to enqueued in store already
buffer->enlist(TxOp::shared_ptr(new RecoveredEnqueue(queue, msg)));
}
diff --git a/cpp/src/tests/TxPublishTest.cpp b/cpp/src/tests/TxPublishTest.cpp
index 3391be5128..d009dd9112 100644
--- a/cpp/src/tests/TxPublishTest.cpp
+++ b/cpp/src/tests/TxPublishTest.cpp
@@ -44,7 +44,8 @@ class TxPublishTest : public CppUnit::TestCase
void enqueue(TransactionContext*, PersistableMessage& msg, const PersistableQueue& queue)
{
- enqueued.push_back(msg_queue_pair(queue.getName(), &msg));
+ msg.enqueueComplete();
+ enqueued.push_back(msg_queue_pair(queue.getName(), &msg));
}
//dont care about any of the other methods:
@@ -61,7 +62,7 @@ class TxPublishTest : public CppUnit::TestCase
TestMessageStore store;
Queue::shared_ptr queue1;
Queue::shared_ptr queue2;
- Message::shared_ptr const msg;
+ Message::shared_ptr msg;
TxPublish op;
public:
@@ -88,14 +89,21 @@ public:
CPPUNIT_ASSERT_EQUAL((PersistableMessage*) msg.get(), store.enqueued[0].second);
CPPUNIT_ASSERT_EQUAL(string("queue2"), store.enqueued[1].first);
CPPUNIT_ASSERT_EQUAL((PersistableMessage*) msg.get(), store.enqueued[1].second);
+ CPPUNIT_ASSERT_EQUAL( true, ((PersistableMessage*) msg.get())->isEnqueueComplete());
+
+
}
void testCommit()
{
//ensure messages are delivered to queue
+ op.prepare(0);
op.commit();
CPPUNIT_ASSERT_EQUAL((uint32_t) 1, queue1->getMessageCount());
- CPPUNIT_ASSERT_EQUAL(msg, queue1->dequeue());
+ Message::shared_ptr msg_dequeue = queue1->dequeue();
+
+ CPPUNIT_ASSERT_EQUAL( true, ((PersistableMessage*) msg_dequeue.get())->isEnqueueComplete());
+ CPPUNIT_ASSERT_EQUAL(msg, msg_dequeue);
CPPUNIT_ASSERT_EQUAL((uint32_t) 1, queue2->getMessageCount());
CPPUNIT_ASSERT_EQUAL(msg, queue2->dequeue());