summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-09-21 20:39:40 +0000
committerGordon Sim <gsim@apache.org>2008-09-21 20:39:40 +0000
commitecd04f245b9cec7c61b52ac6aa60b9c37fd5749b (patch)
tree1322127f7a0da1542daff61ee1eb5c56bcecb2a3 /qpid/cpp
parentb4bd86fd84d60b58eaa7d813780c4415c08deb0a (diff)
downloadqpid-python-ecd04f245b9cec7c61b52ac6aa60b9c37fd5749b.tar.gz
Refactoring of queue/queue-policy:
- moved some logic out of Queue.cpp into QueuePolicy.cpp - moved QueuedMessage definition into its own header file - added checks for requeue and dequeue - split QueuePolicy logic into different sub classes Added ability to request old messages to be discareded to make room for new ones when configured limit has been reached. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@697603 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/Makefile.am1
-rw-r--r--qpid/cpp/src/qpid/broker/Consumer.h58
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp68
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h6
-rw-r--r--qpid/cpp/src/qpid/broker/QueuePolicy.cpp183
-rw-r--r--qpid/cpp/src/qpid/broker/QueuePolicy.h101
-rw-r--r--qpid/cpp/src/qpid/broker/QueuedMessage.h46
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp2
-rw-r--r--qpid/cpp/src/tests/QueuePolicyTest.cpp147
10 files changed, 461 insertions, 155 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 235e0d35cc..797c183f2e 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -464,6 +464,7 @@ nobase_include_HEADERS = \
qpid/broker/PersistableQueue.h \
qpid/broker/Prefetch.h \
qpid/broker/QueueBindings.h \
+ qpid/broker/QueuedMessage.h \
qpid/broker/QueuePolicy.h \
qpid/broker/QueueRegistry.h \
qpid/broker/RecoverableConfig.h \
diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h
index 18fc3ec763..5de00668b3 100644
--- a/qpid/cpp/src/qpid/broker/Consumer.h
+++ b/qpid/cpp/src/qpid/broker/Consumer.h
@@ -21,47 +21,33 @@
#ifndef _Consumer_
#define _Consumer_
-namespace qpid {
- namespace broker {
- class Queue;
-}}
-
#include "Message.h"
+#include "QueuedMessage.h"
#include "OwnershipToken.h"
namespace qpid {
- namespace broker {
-
- struct QueuedMessage
- {
- boost::intrusive_ptr<Message> payload;
- framing::SequenceNumber position;
- Queue* queue;
-
- QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) :
- payload(msg), position(sn), queue(q) {}
- QueuedMessage(Queue* q) : queue(q) {}
- };
-
+namespace broker {
+
+class Queue;
+
+class Consumer {
+ const bool acquires;
+ public:
+ typedef boost::shared_ptr<Consumer> shared_ptr;
+
+ framing::SequenceNumber position;
+
+ Consumer(bool preAcquires = true) : acquires(preAcquires) {}
+ bool preAcquires() const { return acquires; }
+ virtual bool deliver(QueuedMessage& msg) = 0;
+ virtual void notify() = 0;
+ virtual bool filter(boost::intrusive_ptr<Message>) { return true; }
+ virtual bool accept(boost::intrusive_ptr<Message>) { return true; }
+ virtual OwnershipToken* getSession() = 0;
+ virtual ~Consumer(){}
+};
- class Consumer {
- const bool acquires;
- public:
- typedef boost::shared_ptr<Consumer> shared_ptr;
-
- framing::SequenceNumber position;
-
- Consumer(bool preAcquires = true) : acquires(preAcquires) {}
- bool preAcquires() const { return acquires; }
- virtual bool deliver(QueuedMessage& msg) = 0;
- virtual void notify() = 0;
- virtual bool filter(boost::intrusive_ptr<Message>) { return true; }
- virtual bool accept(boost::intrusive_ptr<Message>) { return true; }
- virtual OwnershipToken* getSession() = 0;
- virtual ~Consumer(){}
- };
- }
-}
+}}
#endif
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
index 530dca99a4..fb950b8a83 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -130,7 +130,7 @@ void DeliveryRecord::complete()
void DeliveryRecord::accept(TransactionContext* ctxt) {
if (acquired && !ended) {
- queue->dequeue(ctxt, msg.payload);
+ queue->dequeue(ctxt, msg);
setEnded();
QPID_LOG(debug, "Accepted " << id);
}
@@ -138,7 +138,7 @@ void DeliveryRecord::accept(TransactionContext* ctxt) {
void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
if (acquired && !ended) {
- queue->dequeue(ctxt, msg.payload);
+ queue->dequeue(ctxt, msg);
}
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index befc5c4eff..8bbccda844 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -186,6 +186,8 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){
}
void Queue::requeue(const QueuedMessage& msg){
+ if (policy.get() && !policy->isEnqueued(msg)) return;
+
Listeners copy;
{
Mutex::ScopedLock locker(messageLock);
@@ -415,29 +417,10 @@ void Queue::push(boost::intrusive_ptr<Message>& msg){
Listeners copy;
{
Mutex::ScopedLock locker(messageLock);
- messages.push_back(QueuedMessage(this, msg, ++sequence));
- if (policy.get()) {
- policy->enqueued(msg->contentSize());
- if (policy->limitExceeded()) {
- if (!policyExceeded) {
- policyExceeded = true;
- QPID_LOG(info, "Queue size exceeded policy for " << name);
- }
- if (store) {
- QPID_LOG(debug, "Message " << msg << " on " << name << " released from memory");
- msg->releaseContent(store);
- } else {
- QPID_LOG(error, "Message " << msg << " on " << name
- << " exceeds the policy for the queue but can't be released from memory as the queue is not durable");
- throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name << " " << *policy));
- }
- } else {
- if (policyExceeded) {
- policyExceeded = false;
- QPID_LOG(info, "Queue size within policy for " << name);
- }
- }
- }
+ QueuedMessage qm(this, msg, ++sequence);
+ if (policy.get()) policy->tryEnqueue(qm);
+
+ messages.push_back(qm);
listeners.swap(copy);
}
for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
@@ -486,15 +469,16 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
}
// return true if store exists,
-bool Queue::dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
+bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
+ if (policy.get() && !policy->isEnqueued(msg)) return false;
{
Mutex::ScopedLock locker(messageLock);
dequeued(msg);
}
- if (msg->isPersistent() && store) {
- msg->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
- boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
+ if (msg.payload->isPersistent() && store) {
+ msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
+ boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload);
store->dequeue(ctxt, pmsg, *this);
return true;
}
@@ -508,7 +492,7 @@ bool Queue::dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
*/
void Queue::popAndDequeue()
{
- boost::intrusive_ptr<Message> msg = messages.front().payload;
+ QueuedMessage msg = messages.front();
messages.pop_front();
dequeue(0, msg);
}
@@ -517,15 +501,15 @@ void Queue::popAndDequeue()
* Updates policy and management when a message has been dequeued,
* expects messageLock to be held
*/
-void Queue::dequeued(boost::intrusive_ptr<Message>& msg)
+void Queue::dequeued(const QueuedMessage& msg)
{
- if (policy.get()) policy->dequeued(msg->contentSize());
+ if (policy.get()) policy->dequeued(msg);
if (mgmtObject != 0){
mgmtObject->inc_msgTotalDequeues ();
- mgmtObject->inc_byteTotalDequeues (msg->contentSize());
- if (msg->isPersistent ()){
+ mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize());
+ if (msg.payload->isPersistent ()){
mgmtObject->inc_msgPersistDequeues ();
- mgmtObject->inc_bytePersistDequeues (msg->contentSize());
+ mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize());
}
}
}
@@ -551,10 +535,7 @@ void Queue::create(const FieldTable& _settings)
void Queue::configure(const FieldTable& _settings)
{
- std::auto_ptr<QueuePolicy> _policy(new QueuePolicy(_settings));
- if (_policy->getMaxCount() || _policy->getMaxSize()) {
- setPolicy(_policy);
- }
+ setPolicy(QueuePolicy::createQueuePolicy(_settings));
//set this regardless of owner to allow use of no-local with exclusive consumers also
noLocal = _settings.get(qpidNoLocal);
QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
@@ -720,6 +701,19 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
}
}
+bool Queue::releaseMessageContent(const QueuedMessage& m)
+{
+ if (store) {
+ QPID_LOG(debug, "Message " << m.position << " on " << name << " released from memory");
+ m.payload->releaseContent(store);
+ return true;
+ } else {
+ QPID_LOG(warning, "Message " << m.position << " on " << name
+ << " cannot be released from memory as the queue is not durable");
+ return false;
+ }
+}
+
ManagementObject* Queue::GetManagementObject (void) const
{
return (ManagementObject*) mgmtObject;
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index f9f249cda8..8f6ae0b967 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -101,7 +101,7 @@ namespace qpid {
bool isExcluded(boost::intrusive_ptr<Message>& msg);
- void dequeued(boost::intrusive_ptr<Message>& msg);
+ void dequeued(const QueuedMessage& msg);
void popAndDequeue();
public:
@@ -180,7 +180,7 @@ namespace qpid {
/**
* dequeue from store (only done once messages is acknowledged)
*/
- bool dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg);
+ bool dequeue(TransactionContext* ctxt, const QueuedMessage &msg);
/**
* Gets the next available message
@@ -219,6 +219,8 @@ namespace qpid {
template <class F> void eachBinding(const F& f) {
bindings.eachBinding(f);
}
+
+ bool releaseMessageContent(const QueuedMessage&);
};
}
}
diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
index 08838aac79..8aeaaabd55 100644
--- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -19,39 +19,78 @@
*
*/
#include "QueuePolicy.h"
+#include "Queue.h"
#include "qpid/framing/FieldValue.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
using namespace qpid::broker;
using namespace qpid::framing;
-QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize) :
- maxCount(_maxCount), maxSize(_maxSize), count(0), size(0) {}
-
-QueuePolicy::QueuePolicy(const FieldTable& settings) :
- maxCount(getInt(settings, maxCountKey, 0)),
- maxSize(getInt(settings, maxSizeKey, defaultMaxSize)), count(0), size(0) {}
+QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
+ maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false) {}
void QueuePolicy::enqueued(uint64_t _size)
{
- if (maxCount) count++;
+ if (maxCount) ++count;
if (maxSize) size += _size;
}
void QueuePolicy::dequeued(uint64_t _size)
{
- if (maxCount) count--;
+ if (maxCount) --count;
if (maxSize) size -= _size;
}
-bool QueuePolicy::limitExceeded()
+bool QueuePolicy::checkLimit(const QueuedMessage& m)
+{
+ bool exceeded = (maxSize && (size.get() + m.payload->contentSize()) > maxSize) || (maxCount && (count.get() + 1) > maxCount);
+ if (exceeded) {
+ if (!policyExceeded) {
+ policyExceeded = true;
+ QPID_LOG(info, "Queue size exceeded policy for " << m.queue->getName());
+ }
+ } else {
+ if (policyExceeded) {
+ policyExceeded = false;
+ QPID_LOG(info, "Queue size within policy for " << m.queue->getName());
+ }
+ }
+ return !exceeded;
+}
+
+void QueuePolicy::tryEnqueue(const QueuedMessage& m)
+{
+ if (checkLimit(m)) {
+ enqueued(m);
+ } else {
+ std::string queue = m.queue ? m.queue->getName() : std::string("unknown queue");
+ throw ResourceLimitExceededException(
+ QPID_MSG("Policy exceeded on " << queue << " by message " << m.position
+ << " of size " << m.payload->contentSize() << " , policy: " << *this));
+ }
+}
+
+void QueuePolicy::enqueued(const QueuedMessage& m)
+{
+ enqueued(m.payload->contentSize());
+}
+
+void QueuePolicy::dequeued(const QueuedMessage& m)
+{
+ dequeued(m.payload->contentSize());
+}
+
+bool QueuePolicy::isEnqueued(const QueuedMessage&)
{
- return (maxSize && size > maxSize) || (maxCount && count > maxCount);
+ return true;
}
void QueuePolicy::update(FieldTable& settings)
{
if (maxCount) settings.setInt(maxCountKey, maxCount);
- if (maxSize) settings.setInt(maxSizeKey, maxSize);
+ if (maxSize) settings.setInt(maxSizeKey, maxSize);
+ settings.setString(typeKey, type);
}
@@ -62,6 +101,17 @@ int QueuePolicy::getInt(const FieldTable& settings, const std::string& key, int
else return defaultValue;
}
+std::string QueuePolicy::getType(const FieldTable& settings)
+{
+ FieldTable::ValuePtr v = settings.get(typeKey);
+ if (v && v->convertsTo<std::string>()) {
+ std::string t = v->get<std::string>();
+ transform(t.begin(), t.end(), t.begin(), tolower);
+ if (t == REJECT || t == FLOW_TO_DISK || t == RING || t == RING_STRICT) return t;
+ }
+ return REJECT;
+}
+
void QueuePolicy::setDefaultMaxSize(uint64_t s)
{
defaultMaxSize = s;
@@ -69,20 +119,123 @@ void QueuePolicy::setDefaultMaxSize(uint64_t s)
const std::string QueuePolicy::maxCountKey("qpid.max_count");
const std::string QueuePolicy::maxSizeKey("qpid.max_size");
+const std::string QueuePolicy::typeKey("qpid.policy_type");
+const std::string QueuePolicy::REJECT("reject");
+const std::string QueuePolicy::FLOW_TO_DISK("flow_to_disk");
+const std::string QueuePolicy::RING("ring");
+const std::string QueuePolicy::RING_STRICT("ring_strict");
uint64_t QueuePolicy::defaultMaxSize(0);
+FlowToDiskPolicy::FlowToDiskPolicy(uint32_t _maxCount, uint64_t _maxSize) :
+ QueuePolicy(_maxCount, _maxSize, FLOW_TO_DISK) {}
+
+bool FlowToDiskPolicy::checkLimit(const QueuedMessage& m)
+{
+ return QueuePolicy::checkLimit(m) || m.queue->releaseMessageContent(m);
+}
+
+RingQueuePolicy::RingQueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
+ QueuePolicy(_maxCount, _maxSize, _type), strict(_type == RING_STRICT) {}
+
+void RingQueuePolicy::enqueued(const QueuedMessage& m)
+{
+ QueuePolicy::enqueued(m);
+ qpid::sys::Mutex::ScopedLock l(lock);
+ queue.push_back(m);
+}
+
+void RingQueuePolicy::dequeued(const QueuedMessage& m)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ QueuePolicy::dequeued(m);
+ //find and remove m from queue
+ for (Messages::iterator i = queue.begin(); i != queue.end() && m.position <= i->position; i++) {
+ if (i->position == m.position) {
+ queue.erase(i);
+ break;
+ }
+ }
+}
+
+bool RingQueuePolicy::isEnqueued(const QueuedMessage& m)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ //for non-strict ring policy, a message can be dequeued before acked; need to detect this
+ for (Messages::iterator i = queue.begin(); i != queue.end() && m.position <= i->position; i++) {
+ if (i->position == m.position) {
+ return true;
+ }
+ }
+ return false;
+}
+
+bool RingQueuePolicy::checkLimit(const QueuedMessage& m)
+{
+ if (QueuePolicy::checkLimit(m)) return true;//if haven't hit limit, ok to accept
+
+ QueuedMessage oldest;
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ oldest = queue.front();
+ }
+ if (oldest.queue->acquire(oldest) || !strict) {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ if (oldest.position == queue.front().position) {
+ queue.pop_front();
+ QPID_LOG(debug, "Ring policy triggered in queue "
+ << (m.queue ? m.queue->getName() : std::string("unknown queue"))
+ << ": removed message " << oldest.position << " to make way for " << m.position);
+ }
+ return true;
+ } else {
+ QPID_LOG(debug, "Ring policy could not be triggered in queue "
+ << (m.queue ? m.queue->getName() : std::string("unknown queue"))
+ << ": oldest message (seq-no=" << oldest.position << ") has been delivered but not yet acknowledged or requeued");
+ //in strict mode, if oldest message has been delivered (hence
+ //cannot be acquired) but not yet acked, it should not be
+ //removed and the attempted enqueue should fail
+ return false;
+ }
+}
+
+std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const qpid::framing::FieldTable& settings)
+{
+ uint32_t maxCount = getInt(settings, maxCountKey, 0);
+ uint32_t maxSize = getInt(settings, maxSizeKey, defaultMaxSize);
+ if (maxCount || maxSize) {
+ return createQueuePolicy(maxCount, maxSize, getType(settings));
+ } else {
+ return std::auto_ptr<QueuePolicy>();
+ }
+}
+
+std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type)
+{
+ if (type == RING || type == RING_STRICT) {
+ return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(maxCount, maxSize, type));
+ } else if (type == FLOW_TO_DISK) {
+ return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(maxCount, maxSize));
+ } else {
+ return std::auto_ptr<QueuePolicy>(new QueuePolicy(maxCount, maxSize, type));
+ }
+
+}
+
+
namespace qpid {
namespace broker {
std::ostream& operator<<(std::ostream& out, const QueuePolicy& p)
{
- if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size;
- else out << "size unlimited, current=" << p.size;
+ if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size.get();
+ else out << "size: unlimited";
out << "; ";
- if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count;
- else out << "count unlimited, current=" << p.count;
+ if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count.get();
+ else out << "count: unlimited";
+ out << "; type=" << p.type;
return out;
}
}
}
+
diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.h b/qpid/cpp/src/qpid/broker/QueuePolicy.h
index 4511a63b64..d39ce7dc11 100644
--- a/qpid/cpp/src/qpid/broker/QueuePolicy.h
+++ b/qpid/cpp/src/qpid/broker/QueuePolicy.h
@@ -21,40 +21,85 @@
#ifndef _QueuePolicy_
#define _QueuePolicy_
+#include <deque>
#include <iostream>
+#include <memory>
+#include "QueuedMessage.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/sys/AtomicValue.h"
+#include "qpid/sys/Mutex.h"
namespace qpid {
- namespace broker {
- class QueuePolicy
- {
- static const std::string maxCountKey;
- static const std::string maxSizeKey;
-
- static uint64_t defaultMaxSize;
+namespace broker {
+
+class QueuePolicy
+{
+ static uint64_t defaultMaxSize;
- const uint32_t maxCount;
- const uint64_t maxSize;
- uint32_t count;
- uint64_t size;
+ const uint32_t maxCount;
+ const uint64_t maxSize;
+ const std::string type;
+ qpid::sys::AtomicValue<uint32_t> count;
+ qpid::sys::AtomicValue<uint64_t> size;
+ bool policyExceeded;
- static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue);
-
- public:
- QueuePolicy(uint32_t maxCount, uint64_t maxSize);
- QueuePolicy(const qpid::framing::FieldTable& settings);
- void enqueued(uint64_t size);
- void dequeued(uint64_t size);
- void update(qpid::framing::FieldTable& settings);
- bool limitExceeded();
- uint32_t getMaxCount() const { return maxCount; }
- uint64_t getMaxSize() const { return maxSize; }
-
- static void setDefaultMaxSize(uint64_t);
- friend std::ostream& operator<<(std::ostream&, const QueuePolicy&);
- };
- }
-}
+ static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue);
+ static std::string getType(const qpid::framing::FieldTable& settings);
+
+ public:
+ static const std::string maxCountKey;
+ static const std::string maxSizeKey;
+ static const std::string typeKey;
+ static const std::string REJECT;
+ static const std::string FLOW_TO_DISK;
+ static const std::string RING;
+ static const std::string RING_STRICT;
+
+ virtual ~QueuePolicy() {}
+ void tryEnqueue(const QueuedMessage&);
+ virtual void dequeued(const QueuedMessage&);
+ virtual bool isEnqueued(const QueuedMessage&);
+ virtual bool checkLimit(const QueuedMessage&);
+ void update(qpid::framing::FieldTable& settings);
+ uint32_t getMaxCount() const { return maxCount; }
+ uint64_t getMaxSize() const { return maxSize; }
+
+ static std::auto_ptr<QueuePolicy> createQueuePolicy(const qpid::framing::FieldTable& settings);
+ static std::auto_ptr<QueuePolicy> createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
+ static void setDefaultMaxSize(uint64_t);
+ friend std::ostream& operator<<(std::ostream&, const QueuePolicy&);
+ protected:
+ QueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
+
+ virtual void enqueued(const QueuedMessage&);
+ void enqueued(uint64_t size);
+ void dequeued(uint64_t size);
+};
+
+
+class FlowToDiskPolicy : public QueuePolicy
+{
+ public:
+ FlowToDiskPolicy(uint32_t maxCount, uint64_t maxSize);
+ bool checkLimit(const QueuedMessage&);
+};
+
+class RingQueuePolicy : public QueuePolicy
+{
+ public:
+ RingQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = RING);
+ void enqueued(const QueuedMessage&);
+ void dequeued(const QueuedMessage&);
+ bool isEnqueued(const QueuedMessage&);
+ bool checkLimit(const QueuedMessage&);
+ private:
+ typedef std::deque<QueuedMessage> Messages;
+ qpid::sys::Mutex lock;
+ Messages queue;
+ const bool strict;
+};
+
+}}
#endif
diff --git a/qpid/cpp/src/qpid/broker/QueuedMessage.h b/qpid/cpp/src/qpid/broker/QueuedMessage.h
new file mode 100644
index 0000000000..82f5073d87
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/QueuedMessage.h
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _QueuedMessage_
+#define _QueuedMessage_
+
+#include "Message.h"
+
+namespace qpid {
+namespace broker {
+
+class Queue;
+
+struct QueuedMessage
+{
+ boost::intrusive_ptr<Message> payload;
+ framing::SequenceNumber position;
+ Queue* queue;
+
+ QueuedMessage() : queue(0) {}
+ QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) :
+ payload(msg), position(sn), queue(q) {}
+ QueuedMessage(Queue* q) : queue(q) {}
+};
+
+}}
+
+
+#endif
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 675a6a304c..64bb155c01 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -277,7 +277,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected));
}
if (acquire && !ackExpected) {
- queue->dequeue(0, msg.payload);
+ queue->dequeue(0, msg);
}
return true;
}
diff --git a/qpid/cpp/src/tests/QueuePolicyTest.cpp b/qpid/cpp/src/tests/QueuePolicyTest.cpp
index db88682010..4267047c3f 100644
--- a/qpid/cpp/src/tests/QueuePolicyTest.cpp
+++ b/qpid/cpp/src/tests/QueuePolicyTest.cpp
@@ -19,63 +19,142 @@
*
*/
#include "qpid/broker/QueuePolicy.h"
+#include "qpid/sys/Time.h"
#include "unit_test.h"
+#include "MessageUtils.h"
+#include "BrokerFixture.h"
using namespace qpid::broker;
+using namespace qpid::client;
using namespace qpid::framing;
QPID_AUTO_TEST_SUITE(QueuePolicyTestSuite)
+QueuedMessage createMessage(uint32_t size)
+{
+ QueuedMessage msg;
+ msg.payload = MessageUtils::createMessage();
+ MessageUtils::addContent(msg.payload, std::string (size, 'x'));
+ return msg;
+}
+
+
QPID_AUTO_TEST_CASE(testCount)
{
- QueuePolicy policy(5, 0);
- BOOST_CHECK(!policy.limitExceeded());
- for (int i = 0; i < 5; i++) policy.enqueued(10);
- BOOST_CHECK_EQUAL((uint64_t) 0, policy.getMaxSize());
- BOOST_CHECK_EQUAL((uint32_t) 5, policy.getMaxCount());
- BOOST_CHECK(!policy.limitExceeded());
- policy.enqueued(10);
- BOOST_CHECK(policy.limitExceeded());
- policy.dequeued(10);
- BOOST_CHECK(!policy.limitExceeded());
- policy.enqueued(10);
- BOOST_CHECK(policy.limitExceeded());
+ std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy(5, 0));
+ BOOST_CHECK_EQUAL((uint64_t) 0, policy->getMaxSize());
+ BOOST_CHECK_EQUAL((uint32_t) 5, policy->getMaxCount());
+
+ QueuedMessage msg = createMessage(10);
+ for (size_t i = 0; i < 5; i++) {
+ policy->tryEnqueue(msg);
+ }
+ try {
+ policy->tryEnqueue(msg);
+ BOOST_FAIL("Policy did not fail on enqueuing sixth message");
+ } catch (const ResourceLimitExceededException&) {}
+
+ policy->dequeued(msg);
+ policy->tryEnqueue(msg);
+
+ try {
+ policy->tryEnqueue(msg);
+ BOOST_FAIL("Policy did not fail on enqueuing sixth message (after dequeue)");
+ } catch (const ResourceLimitExceededException&) {}
}
QPID_AUTO_TEST_CASE(testSize)
{
- QueuePolicy policy(0, 50);
- for (int i = 0; i < 5; i++) policy.enqueued(10);
- BOOST_CHECK(!policy.limitExceeded());
- policy.enqueued(10);
- BOOST_CHECK(policy.limitExceeded());
- policy.dequeued(10);
- BOOST_CHECK(!policy.limitExceeded());
- policy.enqueued(10);
- BOOST_CHECK(policy.limitExceeded());
+ std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy(0, 50));
+ QueuedMessage msg = createMessage(10);
+
+ for (size_t i = 0; i < 5; i++) {
+ policy->tryEnqueue(msg);
+ }
+ try {
+ policy->tryEnqueue(msg);
+ BOOST_FAIL("Policy did not fail on aggregate size exceeding 50. " << *policy);
+ } catch (const ResourceLimitExceededException&) {}
+
+ policy->dequeued(msg);
+ policy->tryEnqueue(msg);
+
+ try {
+ policy->tryEnqueue(msg);
+ BOOST_FAIL("Policy did not fail on aggregate size exceeding 50 (after dequeue). " << *policy);
+ } catch (const ResourceLimitExceededException&) {}
}
QPID_AUTO_TEST_CASE(testBoth)
{
- QueuePolicy policy(5, 50);
- for (int i = 0; i < 5; i++) policy.enqueued(11);
- BOOST_CHECK(policy.limitExceeded());
- policy.dequeued(20);
- BOOST_CHECK(!policy.limitExceeded());//fails
- policy.enqueued(5);
- policy.enqueued(10);
- BOOST_CHECK(policy.limitExceeded());
+ std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy(5, 50));
+ try {
+ QueuedMessage msg = createMessage(51);
+ policy->tryEnqueue(msg);
+ BOOST_FAIL("Policy did not fail on single message exceeding 50. " << *policy);
+ } catch (const ResourceLimitExceededException&) {}
+
+ std::vector<QueuedMessage> messages;
+ messages.push_back(createMessage(15));
+ messages.push_back(createMessage(10));
+ messages.push_back(createMessage(11));
+ messages.push_back(createMessage(2));
+ messages.push_back(createMessage(7));
+ for (size_t i = 0; i < messages.size(); i++) {
+ policy->tryEnqueue(messages[i]);
+ }
+ //size = 45 at this point, count = 5
+ try {
+ QueuedMessage msg = createMessage(5);
+ policy->tryEnqueue(msg);
+ BOOST_FAIL("Policy did not fail on count exceeding 6. " << *policy);
+ } catch (const ResourceLimitExceededException&) {}
+ try {
+ QueuedMessage msg = createMessage(10);
+ policy->tryEnqueue(msg);
+ BOOST_FAIL("Policy did not fail on aggregate size exceeding 50. " << *policy);
+ } catch (const ResourceLimitExceededException&) {}
+
+
+ policy->dequeued(messages[0]);
+ try {
+ QueuedMessage msg = createMessage(20);
+ policy->tryEnqueue(msg);
+ } catch (const ResourceLimitExceededException&) {
+ BOOST_FAIL("Policy failed incorrectly after dequeue. " << *policy);
+ }
}
QPID_AUTO_TEST_CASE(testSettings)
{
//test reading and writing the policy from/to field table
+ std::auto_ptr<QueuePolicy> a(QueuePolicy::createQueuePolicy(101, 303));
FieldTable settings;
- QueuePolicy a(101, 303);
- a.update(settings);
- QueuePolicy b(settings);
- BOOST_CHECK_EQUAL(a.getMaxCount(), b.getMaxCount());
- BOOST_CHECK_EQUAL(a.getMaxSize(), b.getMaxSize());
+ a->update(settings);
+ std::auto_ptr<QueuePolicy> b(QueuePolicy::createQueuePolicy(settings));
+ BOOST_CHECK_EQUAL(a->getMaxCount(), b->getMaxCount());
+ BOOST_CHECK_EQUAL(a->getMaxSize(), b->getMaxSize());
+}
+
+QPID_AUTO_TEST_CASE(testRingPolicy)
+{
+ FieldTable args;
+ std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy(5, 0, QueuePolicy::RING);
+ policy->update(args);
+
+ ProxySessionFixture f;
+ std::string q("my-ring-queue");
+ f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
+ for (int i = 0; i < 10; i++) {
+ f.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
+ }
+ client::Message msg;
+ for (int i = 5; i < 10; i++) {
+ BOOST_CHECK(f.subs.get(msg, q, qpid::sys::TIME_SEC));
+ BOOST_CHECK_EQUAL((boost::format("%1%_%2%") % "Message" % (i+1)).str(), msg.getData());
+ }
+ BOOST_CHECK(!f.subs.get(msg, q));
}
+
QPID_AUTO_TEST_SUITE_END()