diff options
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 18 |
1 files changed, 9 insertions, 9 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 830b4215c7..8f7f275fe6 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -167,11 +167,11 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){ void Queue::recoverPrepared(boost::intrusive_ptr<Message>& msg) { - if (policy.get()) policy->recoverEnqueued(msg); // KAG INC COUNTERS + if (policy.get()) policy->recoverEnqueued(msg); } void Queue::recover(boost::intrusive_ptr<Message>& msg){ - if (policy.get()) policy->recoverEnqueued(msg); // KAG INC COUNTERS + if (policy.get()) policy->recoverEnqueued(msg); push(msg, true); if (store){ @@ -352,7 +352,7 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c) //consumer wants the message c->position = msg.position; m = msg; - if (!lastValueQueueNoBrowse) clearLVQIndex(msg); // prevent this msg from being replaced by LVQ + if (!lastValueQueueNoBrowse) clearLVQIndex(msg); if (lastValueQueue) { boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this); if (replacement.get()) m.payload = replacement; @@ -643,7 +643,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName()); } if (policy.get()) { - policy->enqueued(qm); // KAG STORE COPY + policy->enqueued(qm); } if (flowLimit.get()) flowLimit->consume(qm); } @@ -748,7 +748,7 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg Messages dequeues; { Mutex::ScopedLock locker(messageLock); - policy->tryEnqueue(msg); // KAG INC COUNTERS + policy->tryEnqueue(msg); policy->getPendingDequeues(dequeues); } //depending on policy, may have some dequeues that need to performed without holding the lock @@ -786,7 +786,7 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg) { Mutex::ScopedLock locker(messageLock); - if (policy.get()) policy->enqueueAborted(msg); // KAG DEC COUNTERS + if (policy.get()) policy->enqueueAborted(msg); } // return true if store exists, @@ -843,7 +843,7 @@ void Queue::popAndDequeue() */ void Queue::dequeued(const QueuedMessage& msg) { - if (policy.get()) policy->dequeued(msg); // KAG REMOVE COPY, DEC COUNTERS + if (policy.get()) policy->dequeued(msg); if (flowLimit.get()) flowLimit->replenish(msg); mgntDeqStats(msg.payload); if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) { @@ -1181,8 +1181,8 @@ void Queue::enqueued(const QueuedMessage& m) { if (m.payload) { if (policy.get()) { - policy->recoverEnqueued(m.payload); // KAG INC COUNTERS - policy->enqueued(m); // KAG STORE COPY + policy->recoverEnqueued(m.payload); + policy->enqueued(m); } if (flowLimit.get()) flowLimit->consume(m); mgntEnqStats(m.payload); |