From b24711832e66d887c4d4879cc167c3ba403e120b Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Mon, 4 Apr 2011 14:25:25 +0000 Subject: QPID-3174: remove unnecessary enqueueComplete() calls (merge of r1087868 and r1088539) git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.10@1088634 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/Queue.cpp | 6 +++--- qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp | 1 - qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp | 1 - qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 1 - qpid/cpp/src/qpid/broker/TxPublish.cpp | 9 +-------- 5 files changed, 4 insertions(+), 14 deletions(-) diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 7a266fb4e3..8efa8be3dc 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -183,7 +183,6 @@ void Queue::recover(boost::intrusive_ptr& msg){ // setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure msg->addToSyncList(shared_from_this(), store); } - msg->enqueueComplete(); // mark the message as enqueued if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) { //content has not been loaded, need to ensure that lazy loading mode is set: @@ -210,7 +209,6 @@ void Queue::requeue(const QueuedMessage& msg){ { Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return; - msg.payload->enqueueComplete(); // mark the message as enqueued messages->reinsert(msg); listeners.populate(copy); @@ -632,7 +630,9 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr& msg } if ((msg->isPersistent() || msg->checkContentReleasable()) && store) { - msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue + // mark the message as being enqueued - the store MUST CALL msg->enqueueComplete() + // when it considers the message stored. + msg->enqueueAsync(shared_from_this(), store); boost::intrusive_ptr pmsg = boost::static_pointer_cast(msg); store->enqueue(ctxt, pmsg, *this); return true; diff --git a/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp b/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp index 38cb8043c9..cd6735328f 100644 --- a/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp +++ b/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp @@ -43,7 +43,6 @@ void RecoveredDequeue::commit() throw() void RecoveredDequeue::rollback() throw() { - msg->enqueueComplete(); queue->process(msg); } diff --git a/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp b/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp index 6263c63e3d..6d2eaee6c4 100644 --- a/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp +++ b/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp @@ -36,7 +36,6 @@ bool RecoveredEnqueue::prepare(TransactionContext*) throw(){ } void RecoveredEnqueue::commit() throw(){ - msg->enqueueComplete(); queue->process(msg); } diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 6f8b125b8b..d08409695e 100644 --- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -252,7 +252,6 @@ void RecoverableMessageImpl::dequeue(DtxBuffer::shared_ptr buffer, Queue::shared void RecoverableMessageImpl::enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue) { - msg->enqueueComplete(); // recoved nmessage to enqueued in store already buffer->enlist(TxOp::shared_ptr(new RecoveredEnqueue(queue, msg))); } diff --git a/qpid/cpp/src/qpid/broker/TxPublish.cpp b/qpid/cpp/src/qpid/broker/TxPublish.cpp index 36a451e62c..9c2cf4a467 100644 --- a/qpid/cpp/src/qpid/broker/TxPublish.cpp +++ b/qpid/cpp/src/qpid/broker/TxPublish.cpp @@ -90,14 +90,7 @@ void TxPublish::deliverTo(const boost::shared_ptr& queue){ void TxPublish::prepare(TransactionContext* ctxt, const boost::shared_ptr queue) { - if (!queue->enqueue(ctxt, msg)){ - /** - * if not store then mark message for ack and deleivery once - * commit happens, as async IO will never set it when no store - * exists - */ - msg->enqueueComplete(); - } + queue->enqueue(ctxt, msg); } TxPublish::Commit::Commit(intrusive_ptr& _msg) : msg(_msg){} -- cgit v1.2.1