summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-07-25 13:39:55 +0000
committerGordon Sim <gsim@apache.org>2008-07-25 13:39:55 +0000
commit7c85133630b56456f45bb53b4f0aeba82b0974f9 (patch)
tree6f62a757014f20955306a421f55db80ac1c684e5
parentd8963856c5eff8523a92aa23a3664a52fa530a02 (diff)
downloadqpid-python-7c85133630b56456f45bb53b4f0aeba82b0974f9.tar.gz
Only reduce count and size maintained for queue plicy when messages are actually dequeued (i.e. acked).
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-10@679801 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/broker/Queue.cpp70
-rw-r--r--cpp/src/qpid/broker/Queue.h5
-rw-r--r--cpp/src/qpid/broker/QueuePolicy.cpp15
-rw-r--r--cpp/src/qpid/broker/QueuePolicy.h2
4 files changed, 59 insertions, 33 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index becca8dfcf..1de998447e 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -250,7 +250,7 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c)
if (c.filter(msg.payload)) {
if (c.accept(msg.payload)) {
m = msg;
- pop();
+ messages.pop_front();
return true;
} else {
//message(s) are available but consumer hasn't got enough credit
@@ -262,7 +262,7 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c)
if (canExcludeUnwanted()) {
//hack for no-local on JMS topics; get rid of this message
QPID_LOG(debug, "Excluding message from '" << name << "'");
- pop();
+ messages.pop_front();
} else {
//leave it for another consumer
QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
@@ -385,7 +385,7 @@ QueuedMessage Queue::dequeue(){
if(!messages.empty()){
msg = messages.front();
- pop();
+ messages.pop_front();
}
return msg;
}
@@ -394,37 +394,11 @@ uint32_t Queue::purge(){
Mutex::ScopedLock locker(messageLock);
int count = messages.size();
while(!messages.empty()) {
- QueuedMessage& msg = messages.front();
- if (store && msg.payload->isPersistent()) {
- boost::intrusive_ptr<PersistableMessage> pmsg =
- boost::static_pointer_cast<PersistableMessage>(msg.payload);
- store->dequeue(0, pmsg, *this);
- }
- pop();
+ popAndDequeue();
}
return count;
}
-/**
- * Assumes messageLock is held
- */
-void Queue::pop(){
- QueuedMessage& msg = messages.front();
-
- if (policy.get()) policy->dequeued(msg.payload->contentSize());
- if (mgmtObject.get() != 0){
- Mutex::ScopedLock mutex(mgmtObject->getLock());
- mgmtObject->inc_msgTotalDequeues ();
- mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize());
- mgmtObject->dec_msgDepth ();
- if (msg.payload->isPersistent ()){
- mgmtObject->inc_msgPersistDequeues ();
- mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize());
- }
- }
- messages.pop_front();
-}
-
void Queue::push(boost::intrusive_ptr<Message>& msg){
Mutex::ScopedLock locker(messageLock);
messages.push_back(QueuedMessage(this, msg, ++sequence));
@@ -441,7 +415,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg){
} else {
QPID_LOG(error, "Message " << msg << " on " << name
<< " exceeds the policy for the queue but can't be released from memory as the queue is not durable");
- throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name));
+ throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name << " " << *policy));
}
} else {
if (policyExceeded) {
@@ -495,6 +469,10 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
// return true if store exists,
bool Queue::dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
{
+ {
+ Mutex::ScopedLock locker(messageLock);
+ dequeued(msg);
+ }
if (msg->isPersistent() && store) {
msg->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
@@ -505,6 +483,34 @@ bool Queue::dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
return false;
}
+/**
+ * Removes a message from the in-memory delivery queue as well
+ * dequeing it from the logical (and persistent if applicable) queue
+ */
+void Queue::popAndDequeue()
+{
+ boost::intrusive_ptr<Message> msg = messages.front().payload;
+ messages.pop_front();
+ dequeue(0, msg);
+}
+
+/**
+ * Updates policy and management when a message has been dequeued,
+ * expects messageLock to be held
+ */
+void Queue::dequeued(boost::intrusive_ptr<Message>& msg)
+{
+ if (policy.get()) policy->dequeued(msg->contentSize());
+ if (mgmtObject != 0){
+ mgmtObject->inc_msgTotalDequeues ();
+ mgmtObject->inc_byteTotalDequeues (msg->contentSize());
+ if (msg->isPersistent ()){
+ mgmtObject->inc_msgPersistDequeues ();
+ mgmtObject->inc_bytePersistDequeues (msg->contentSize());
+ }
+ }
+}
+
namespace
{
@@ -554,7 +560,7 @@ void Queue::destroy()
DeliverableMessage msg(messages.front().payload);
alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
msg.getMessage().getApplicationHeaders());
- pop();
+ popAndDequeue();
}
alternateExchange->decAlternateUsers();
}
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index f56cee0f22..792bed323f 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -88,7 +88,6 @@ namespace qpid {
framing::SequenceNumber sequence;
management::Queue::shared_ptr mgmtObject;
- void pop();
void push(boost::intrusive_ptr<Message>& msg);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
bool seek(QueuedMessage& msg, Consumer& position);
@@ -103,6 +102,9 @@ namespace qpid {
bool isExcluded(boost::intrusive_ptr<Message>& msg);
+ void dequeued(boost::intrusive_ptr<Message>& msg);
+ void popAndDequeue();
+
public:
virtual void notifyDurableIOComplete();
typedef boost::shared_ptr<Queue> shared_ptr;
@@ -169,6 +171,7 @@ namespace qpid {
* dequeue from store (only done once messages is acknowledged)
*/
bool dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg);
+
/**
* dequeues from memory only
*/
diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp
index de84362f8f..08838aac79 100644
--- a/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -71,3 +71,18 @@ const std::string QueuePolicy::maxCountKey("qpid.max_count");
const std::string QueuePolicy::maxSizeKey("qpid.max_size");
uint64_t QueuePolicy::defaultMaxSize(0);
+namespace qpid {
+ namespace broker {
+
+std::ostream& operator<<(std::ostream& out, const QueuePolicy& p)
+{
+ if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size;
+ else out << "size unlimited, current=" << p.size;
+ out << "; ";
+ if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count;
+ else out << "count unlimited, current=" << p.count;
+ return out;
+}
+
+ }
+}
diff --git a/cpp/src/qpid/broker/QueuePolicy.h b/cpp/src/qpid/broker/QueuePolicy.h
index 2135e327a7..4511a63b64 100644
--- a/cpp/src/qpid/broker/QueuePolicy.h
+++ b/cpp/src/qpid/broker/QueuePolicy.h
@@ -21,6 +21,7 @@
#ifndef _QueuePolicy_
#define _QueuePolicy_
+#include <iostream>
#include "qpid/framing/FieldTable.h"
namespace qpid {
@@ -50,6 +51,7 @@ namespace qpid {
uint64_t getMaxSize() const { return maxSize; }
static void setDefaultMaxSize(uint64_t);
+ friend std::ostream& operator<<(std::ostream&, const QueuePolicy&);
};
}
}