From 7c85133630b56456f45bb53b4f0aeba82b0974f9 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Fri, 25 Jul 2008 13:39:55 +0000 Subject: Only reduce count and size maintained for queue plicy when messages are actually dequeued (i.e. acked). git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-10@679801 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/Queue.cpp | 70 ++++++++++++++++++++----------------- cpp/src/qpid/broker/Queue.h | 5 ++- cpp/src/qpid/broker/QueuePolicy.cpp | 15 ++++++++ cpp/src/qpid/broker/QueuePolicy.h | 2 ++ 4 files changed, 59 insertions(+), 33 deletions(-) diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index becca8dfcf..1de998447e 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -250,7 +250,7 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c) if (c.filter(msg.payload)) { if (c.accept(msg.payload)) { m = msg; - pop(); + messages.pop_front(); return true; } else { //message(s) are available but consumer hasn't got enough credit @@ -262,7 +262,7 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c) if (canExcludeUnwanted()) { //hack for no-local on JMS topics; get rid of this message QPID_LOG(debug, "Excluding message from '" << name << "'"); - pop(); + messages.pop_front(); } else { //leave it for another consumer QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); @@ -385,7 +385,7 @@ QueuedMessage Queue::dequeue(){ if(!messages.empty()){ msg = messages.front(); - pop(); + messages.pop_front(); } return msg; } @@ -394,37 +394,11 @@ uint32_t Queue::purge(){ Mutex::ScopedLock locker(messageLock); int count = messages.size(); while(!messages.empty()) { - QueuedMessage& msg = messages.front(); - if (store && msg.payload->isPersistent()) { - boost::intrusive_ptr pmsg = - boost::static_pointer_cast(msg.payload); - store->dequeue(0, pmsg, *this); - } - pop(); + popAndDequeue(); } return count; } -/** - * Assumes messageLock is held - */ -void Queue::pop(){ - QueuedMessage& msg = messages.front(); - - if (policy.get()) policy->dequeued(msg.payload->contentSize()); - if (mgmtObject.get() != 0){ - Mutex::ScopedLock mutex(mgmtObject->getLock()); - mgmtObject->inc_msgTotalDequeues (); - mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize()); - mgmtObject->dec_msgDepth (); - if (msg.payload->isPersistent ()){ - mgmtObject->inc_msgPersistDequeues (); - mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize()); - } - } - messages.pop_front(); -} - void Queue::push(boost::intrusive_ptr& msg){ Mutex::ScopedLock locker(messageLock); messages.push_back(QueuedMessage(this, msg, ++sequence)); @@ -441,7 +415,7 @@ void Queue::push(boost::intrusive_ptr& msg){ } else { QPID_LOG(error, "Message " << msg << " on " << name << " exceeds the policy for the queue but can't be released from memory as the queue is not durable"); - throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name)); + throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name << " " << *policy)); } } else { if (policyExceeded) { @@ -495,6 +469,10 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr msg) // return true if store exists, bool Queue::dequeue(TransactionContext* ctxt, boost::intrusive_ptr msg) { + { + Mutex::ScopedLock locker(messageLock); + dequeued(msg); + } if (msg->isPersistent() && store) { msg->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue boost::intrusive_ptr pmsg = boost::static_pointer_cast(msg); @@ -505,6 +483,34 @@ bool Queue::dequeue(TransactionContext* ctxt, boost::intrusive_ptr msg) return false; } +/** + * Removes a message from the in-memory delivery queue as well + * dequeing it from the logical (and persistent if applicable) queue + */ +void Queue::popAndDequeue() +{ + boost::intrusive_ptr msg = messages.front().payload; + messages.pop_front(); + dequeue(0, msg); +} + +/** + * Updates policy and management when a message has been dequeued, + * expects messageLock to be held + */ +void Queue::dequeued(boost::intrusive_ptr& msg) +{ + if (policy.get()) policy->dequeued(msg->contentSize()); + if (mgmtObject != 0){ + mgmtObject->inc_msgTotalDequeues (); + mgmtObject->inc_byteTotalDequeues (msg->contentSize()); + if (msg->isPersistent ()){ + mgmtObject->inc_msgPersistDequeues (); + mgmtObject->inc_bytePersistDequeues (msg->contentSize()); + } + } +} + namespace { @@ -554,7 +560,7 @@ void Queue::destroy() DeliverableMessage msg(messages.front().payload); alternateExchange->route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders()); - pop(); + popAndDequeue(); } alternateExchange->decAlternateUsers(); } diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index f56cee0f22..792bed323f 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -88,7 +88,6 @@ namespace qpid { framing::SequenceNumber sequence; management::Queue::shared_ptr mgmtObject; - void pop(); void push(boost::intrusive_ptr& msg); void setPolicy(std::auto_ptr policy); bool seek(QueuedMessage& msg, Consumer& position); @@ -103,6 +102,9 @@ namespace qpid { bool isExcluded(boost::intrusive_ptr& msg); + void dequeued(boost::intrusive_ptr& msg); + void popAndDequeue(); + public: virtual void notifyDurableIOComplete(); typedef boost::shared_ptr shared_ptr; @@ -169,6 +171,7 @@ namespace qpid { * dequeue from store (only done once messages is acknowledged) */ bool dequeue(TransactionContext* ctxt, boost::intrusive_ptr msg); + /** * dequeues from memory only */ diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp index de84362f8f..08838aac79 100644 --- a/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/cpp/src/qpid/broker/QueuePolicy.cpp @@ -71,3 +71,18 @@ const std::string QueuePolicy::maxCountKey("qpid.max_count"); const std::string QueuePolicy::maxSizeKey("qpid.max_size"); uint64_t QueuePolicy::defaultMaxSize(0); +namespace qpid { + namespace broker { + +std::ostream& operator<<(std::ostream& out, const QueuePolicy& p) +{ + if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size; + else out << "size unlimited, current=" << p.size; + out << "; "; + if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count; + else out << "count unlimited, current=" << p.count; + return out; +} + + } +} diff --git a/cpp/src/qpid/broker/QueuePolicy.h b/cpp/src/qpid/broker/QueuePolicy.h index 2135e327a7..4511a63b64 100644 --- a/cpp/src/qpid/broker/QueuePolicy.h +++ b/cpp/src/qpid/broker/QueuePolicy.h @@ -21,6 +21,7 @@ #ifndef _QueuePolicy_ #define _QueuePolicy_ +#include #include "qpid/framing/FieldTable.h" namespace qpid { @@ -50,6 +51,7 @@ namespace qpid { uint64_t getMaxSize() const { return maxSize; } static void setDefaultMaxSize(uint64_t); + friend std::ostream& operator<<(std::ostream&, const QueuePolicy&); }; } } -- cgit v1.2.1