summaryrefslogtreecommitdiff
path: root/cpp/lib
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib')
-rw-r--r--cpp/lib/broker/BrokerQueue.cpp9
-rw-r--r--cpp/lib/broker/QueuePolicy.cpp24
-rw-r--r--cpp/lib/broker/QueuePolicy.h9
3 files changed, 20 insertions, 22 deletions
diff --git a/cpp/lib/broker/BrokerQueue.cpp b/cpp/lib/broker/BrokerQueue.cpp
index a8c5343ca3..bfea1918a4 100644
--- a/cpp/lib/broker/BrokerQueue.cpp
+++ b/cpp/lib/broker/BrokerQueue.cpp
@@ -161,14 +161,19 @@ u_int32_t Queue::purge(){
}
void Queue::pop(){
- if (policy.get()) policy->dequeued(messages.front(), store);
+ if (policy.get()) policy->dequeued(messages.front()->contentSize());
messages.pop();
}
void Queue::push(Message::shared_ptr& msg){
queueing = true;
messages.push(msg);
- if (policy.get()) policy->enqueued(messages.front(), store);
+ if (policy.get()) {
+ policy->enqueued(msg->contentSize());
+ if (policy->limitExceeded()) {
+ msg->releaseContent(store);
+ }
+ }
}
u_int32_t Queue::getMessageCount() const{
diff --git a/cpp/lib/broker/QueuePolicy.cpp b/cpp/lib/broker/QueuePolicy.cpp
index 055d415226..e13fd62fc6 100644
--- a/cpp/lib/broker/QueuePolicy.cpp
+++ b/cpp/lib/broker/QueuePolicy.cpp
@@ -24,33 +24,27 @@ using namespace qpid::broker;
using namespace qpid::framing;
QueuePolicy::QueuePolicy(u_int32_t _maxCount, u_int64_t _maxSize) :
- maxCount(_maxCount), maxSize(_maxSize) {}
+ maxCount(_maxCount), maxSize(_maxSize), count(0), size(0) {}
QueuePolicy::QueuePolicy(const FieldTable& settings) :
maxCount(getInt(settings, maxCountKey, 0)),
- maxSize(getInt(settings, maxSizeKey, 0)) {}
+ maxSize(getInt(settings, maxSizeKey, 0)), count(0), size(0) {}
-void QueuePolicy::enqueued(Message::shared_ptr& msg, MessageStore* store)
+void QueuePolicy::enqueued(u_int64_t _size)
{
- if (checkCount(msg) || checkSize(msg)) {
- msg->releaseContent(store);
- }
+ if (maxCount) count++;
+ if (maxSize) size += _size;
}
-void QueuePolicy::dequeued(Message::shared_ptr& msg, MessageStore* /*store*/)
+void QueuePolicy::dequeued(u_int64_t _size)
{
if (maxCount) count--;
- if (maxSize) size -= msg->contentSize();
-}
-
-bool QueuePolicy::checkCount(Message::shared_ptr& /*msg*/)
-{
- return maxCount && ++count > maxCount;
+ if (maxSize) size -= _size;
}
-bool QueuePolicy::checkSize(Message::shared_ptr& msg)
+bool QueuePolicy::limitExceeded()
{
- return maxSize && (size += msg->contentSize()) > maxSize;
+ return (maxSize && size > maxSize) || (maxCount && count > maxCount);
}
void QueuePolicy::update(FieldTable& settings)
diff --git a/cpp/lib/broker/QueuePolicy.h b/cpp/lib/broker/QueuePolicy.h
index c31e9ec968..597cfe7ce8 100644
--- a/cpp/lib/broker/QueuePolicy.h
+++ b/cpp/lib/broker/QueuePolicy.h
@@ -21,7 +21,6 @@
#ifndef _QueuePolicy_
#define _QueuePolicy_
-#include <BrokerMessage.h>
#include <FieldTable.h>
namespace qpid {
@@ -37,14 +36,14 @@ namespace qpid {
u_int64_t size;
static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue);
- bool checkCount(Message::shared_ptr& msg);
- bool checkSize(Message::shared_ptr& msg);
+
public:
QueuePolicy(u_int32_t maxCount, u_int64_t maxSize);
QueuePolicy(const qpid::framing::FieldTable& settings);
- void enqueued(Message::shared_ptr& msg, MessageStore* store);
- void dequeued(Message::shared_ptr& msg, MessageStore* store);
+ void enqueued(u_int64_t size);
+ void dequeued(u_int64_t size);
void update(qpid::framing::FieldTable& settings);
+ bool limitExceeded();
u_int32_t getMaxCount() const { return maxCount; }
u_int64_t getMaxSize() const { return maxSize; }
};