diff options
author | Gordon Sim <gsim@apache.org> | 2008-09-21 20:39:40 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-09-21 20:39:40 +0000 |
commit | ecd04f245b9cec7c61b52ac6aa60b9c37fd5749b (patch) | |
tree | 1322127f7a0da1542daff61ee1eb5c56bcecb2a3 | |
parent | b4bd86fd84d60b58eaa7d813780c4415c08deb0a (diff) | |
download | qpid-python-ecd04f245b9cec7c61b52ac6aa60b9c37fd5749b.tar.gz |
Refactoring of queue/queue-policy:
- moved some logic out of Queue.cpp into QueuePolicy.cpp
- moved QueuedMessage definition into its own header file
- added checks for requeue and dequeue
- split QueuePolicy logic into different sub classes
Added ability to request old messages to be discareded to make room for new ones when configured limit has been reached.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@697603 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/Makefile.am | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Consumer.h | 58 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DeliveryRecord.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 68 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueuePolicy.cpp | 183 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueuePolicy.h | 101 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueuedMessage.h | 46 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/tests/QueuePolicyTest.cpp | 147 |
10 files changed, 461 insertions, 155 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 235e0d35cc..797c183f2e 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -464,6 +464,7 @@ nobase_include_HEADERS = \ qpid/broker/PersistableQueue.h \ qpid/broker/Prefetch.h \ qpid/broker/QueueBindings.h \ + qpid/broker/QueuedMessage.h \ qpid/broker/QueuePolicy.h \ qpid/broker/QueueRegistry.h \ qpid/broker/RecoverableConfig.h \ diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h index 18fc3ec763..5de00668b3 100644 --- a/qpid/cpp/src/qpid/broker/Consumer.h +++ b/qpid/cpp/src/qpid/broker/Consumer.h @@ -21,47 +21,33 @@ #ifndef _Consumer_ #define _Consumer_ -namespace qpid { - namespace broker { - class Queue; -}} - #include "Message.h" +#include "QueuedMessage.h" #include "OwnershipToken.h" namespace qpid { - namespace broker { - - struct QueuedMessage - { - boost::intrusive_ptr<Message> payload; - framing::SequenceNumber position; - Queue* queue; - - QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) : - payload(msg), position(sn), queue(q) {} - QueuedMessage(Queue* q) : queue(q) {} - }; - +namespace broker { + +class Queue; + +class Consumer { + const bool acquires; + public: + typedef boost::shared_ptr<Consumer> shared_ptr; + + framing::SequenceNumber position; + + Consumer(bool preAcquires = true) : acquires(preAcquires) {} + bool preAcquires() const { return acquires; } + virtual bool deliver(QueuedMessage& msg) = 0; + virtual void notify() = 0; + virtual bool filter(boost::intrusive_ptr<Message>) { return true; } + virtual bool accept(boost::intrusive_ptr<Message>) { return true; } + virtual OwnershipToken* getSession() = 0; + virtual ~Consumer(){} +}; - class Consumer { - const bool acquires; - public: - typedef boost::shared_ptr<Consumer> shared_ptr; - - framing::SequenceNumber position; - - Consumer(bool preAcquires = true) : acquires(preAcquires) {} - bool preAcquires() const { return acquires; } - virtual bool deliver(QueuedMessage& msg) = 0; - virtual void notify() = 0; - virtual bool filter(boost::intrusive_ptr<Message>) { return true; } - virtual bool accept(boost::intrusive_ptr<Message>) { return true; } - virtual OwnershipToken* getSession() = 0; - virtual ~Consumer(){} - }; - } -} +}} #endif diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp index 530dca99a4..fb950b8a83 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -130,7 +130,7 @@ void DeliveryRecord::complete() void DeliveryRecord::accept(TransactionContext* ctxt) { if (acquired && !ended) { - queue->dequeue(ctxt, msg.payload); + queue->dequeue(ctxt, msg); setEnded(); QPID_LOG(debug, "Accepted " << id); } @@ -138,7 +138,7 @@ void DeliveryRecord::accept(TransactionContext* ctxt) { void DeliveryRecord::dequeue(TransactionContext* ctxt) const{ if (acquired && !ended) { - queue->dequeue(ctxt, msg.payload); + queue->dequeue(ctxt, msg); } } diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index befc5c4eff..8bbccda844 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -186,6 +186,8 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){ } void Queue::requeue(const QueuedMessage& msg){ + if (policy.get() && !policy->isEnqueued(msg)) return; + Listeners copy; { Mutex::ScopedLock locker(messageLock); @@ -415,29 +417,10 @@ void Queue::push(boost::intrusive_ptr<Message>& msg){ Listeners copy; { Mutex::ScopedLock locker(messageLock); - messages.push_back(QueuedMessage(this, msg, ++sequence)); - if (policy.get()) { - policy->enqueued(msg->contentSize()); - if (policy->limitExceeded()) { - if (!policyExceeded) { - policyExceeded = true; - QPID_LOG(info, "Queue size exceeded policy for " << name); - } - if (store) { - QPID_LOG(debug, "Message " << msg << " on " << name << " released from memory"); - msg->releaseContent(store); - } else { - QPID_LOG(error, "Message " << msg << " on " << name - << " exceeds the policy for the queue but can't be released from memory as the queue is not durable"); - throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name << " " << *policy)); - } - } else { - if (policyExceeded) { - policyExceeded = false; - QPID_LOG(info, "Queue size within policy for " << name); - } - } - } + QueuedMessage qm(this, msg, ++sequence); + if (policy.get()) policy->tryEnqueue(qm); + + messages.push_back(qm); listeners.swap(copy); } for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify)); @@ -486,15 +469,16 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) } // return true if store exists, -bool Queue::dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) +bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { + if (policy.get() && !policy->isEnqueued(msg)) return false; { Mutex::ScopedLock locker(messageLock); dequeued(msg); } - if (msg->isPersistent() && store) { - msg->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue - boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg); + if (msg.payload->isPersistent() && store) { + msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue + boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload); store->dequeue(ctxt, pmsg, *this); return true; } @@ -508,7 +492,7 @@ bool Queue::dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) */ void Queue::popAndDequeue() { - boost::intrusive_ptr<Message> msg = messages.front().payload; + QueuedMessage msg = messages.front(); messages.pop_front(); dequeue(0, msg); } @@ -517,15 +501,15 @@ void Queue::popAndDequeue() * Updates policy and management when a message has been dequeued, * expects messageLock to be held */ -void Queue::dequeued(boost::intrusive_ptr<Message>& msg) +void Queue::dequeued(const QueuedMessage& msg) { - if (policy.get()) policy->dequeued(msg->contentSize()); + if (policy.get()) policy->dequeued(msg); if (mgmtObject != 0){ mgmtObject->inc_msgTotalDequeues (); - mgmtObject->inc_byteTotalDequeues (msg->contentSize()); - if (msg->isPersistent ()){ + mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize()); + if (msg.payload->isPersistent ()){ mgmtObject->inc_msgPersistDequeues (); - mgmtObject->inc_bytePersistDequeues (msg->contentSize()); + mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize()); } } } @@ -551,10 +535,7 @@ void Queue::create(const FieldTable& _settings) void Queue::configure(const FieldTable& _settings) { - std::auto_ptr<QueuePolicy> _policy(new QueuePolicy(_settings)); - if (_policy->getMaxCount() || _policy->getMaxSize()) { - setPolicy(_policy); - } + setPolicy(QueuePolicy::createQueuePolicy(_settings)); //set this regardless of owner to allow use of no-local with exclusive consumers also noLocal = _settings.get(qpidNoLocal); QPID_LOG(debug, "Configured queue with no-local=" << noLocal); @@ -720,6 +701,19 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) { } } +bool Queue::releaseMessageContent(const QueuedMessage& m) +{ + if (store) { + QPID_LOG(debug, "Message " << m.position << " on " << name << " released from memory"); + m.payload->releaseContent(store); + return true; + } else { + QPID_LOG(warning, "Message " << m.position << " on " << name + << " cannot be released from memory as the queue is not durable"); + return false; + } +} + ManagementObject* Queue::GetManagementObject (void) const { return (ManagementObject*) mgmtObject; diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index f9f249cda8..8f6ae0b967 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -101,7 +101,7 @@ namespace qpid { bool isExcluded(boost::intrusive_ptr<Message>& msg); - void dequeued(boost::intrusive_ptr<Message>& msg); + void dequeued(const QueuedMessage& msg); void popAndDequeue(); public: @@ -180,7 +180,7 @@ namespace qpid { /** * dequeue from store (only done once messages is acknowledged) */ - bool dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg); + bool dequeue(TransactionContext* ctxt, const QueuedMessage &msg); /** * Gets the next available message @@ -219,6 +219,8 @@ namespace qpid { template <class F> void eachBinding(const F& f) { bindings.eachBinding(f); } + + bool releaseMessageContent(const QueuedMessage&); }; } } diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp index 08838aac79..8aeaaabd55 100644 --- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp @@ -19,39 +19,78 @@ * */ #include "QueuePolicy.h" +#include "Queue.h" #include "qpid/framing/FieldValue.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/log/Statement.h" using namespace qpid::broker; using namespace qpid::framing; -QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize) : - maxCount(_maxCount), maxSize(_maxSize), count(0), size(0) {} - -QueuePolicy::QueuePolicy(const FieldTable& settings) : - maxCount(getInt(settings, maxCountKey, 0)), - maxSize(getInt(settings, maxSizeKey, defaultMaxSize)), count(0), size(0) {} +QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : + maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false) {} void QueuePolicy::enqueued(uint64_t _size) { - if (maxCount) count++; + if (maxCount) ++count; if (maxSize) size += _size; } void QueuePolicy::dequeued(uint64_t _size) { - if (maxCount) count--; + if (maxCount) --count; if (maxSize) size -= _size; } -bool QueuePolicy::limitExceeded() +bool QueuePolicy::checkLimit(const QueuedMessage& m) +{ + bool exceeded = (maxSize && (size.get() + m.payload->contentSize()) > maxSize) || (maxCount && (count.get() + 1) > maxCount); + if (exceeded) { + if (!policyExceeded) { + policyExceeded = true; + QPID_LOG(info, "Queue size exceeded policy for " << m.queue->getName()); + } + } else { + if (policyExceeded) { + policyExceeded = false; + QPID_LOG(info, "Queue size within policy for " << m.queue->getName()); + } + } + return !exceeded; +} + +void QueuePolicy::tryEnqueue(const QueuedMessage& m) +{ + if (checkLimit(m)) { + enqueued(m); + } else { + std::string queue = m.queue ? m.queue->getName() : std::string("unknown queue"); + throw ResourceLimitExceededException( + QPID_MSG("Policy exceeded on " << queue << " by message " << m.position + << " of size " << m.payload->contentSize() << " , policy: " << *this)); + } +} + +void QueuePolicy::enqueued(const QueuedMessage& m) +{ + enqueued(m.payload->contentSize()); +} + +void QueuePolicy::dequeued(const QueuedMessage& m) +{ + dequeued(m.payload->contentSize()); +} + +bool QueuePolicy::isEnqueued(const QueuedMessage&) { - return (maxSize && size > maxSize) || (maxCount && count > maxCount); + return true; } void QueuePolicy::update(FieldTable& settings) { if (maxCount) settings.setInt(maxCountKey, maxCount); - if (maxSize) settings.setInt(maxSizeKey, maxSize); + if (maxSize) settings.setInt(maxSizeKey, maxSize); + settings.setString(typeKey, type); } @@ -62,6 +101,17 @@ int QueuePolicy::getInt(const FieldTable& settings, const std::string& key, int else return defaultValue; } +std::string QueuePolicy::getType(const FieldTable& settings) +{ + FieldTable::ValuePtr v = settings.get(typeKey); + if (v && v->convertsTo<std::string>()) { + std::string t = v->get<std::string>(); + transform(t.begin(), t.end(), t.begin(), tolower); + if (t == REJECT || t == FLOW_TO_DISK || t == RING || t == RING_STRICT) return t; + } + return REJECT; +} + void QueuePolicy::setDefaultMaxSize(uint64_t s) { defaultMaxSize = s; @@ -69,20 +119,123 @@ void QueuePolicy::setDefaultMaxSize(uint64_t s) const std::string QueuePolicy::maxCountKey("qpid.max_count"); const std::string QueuePolicy::maxSizeKey("qpid.max_size"); +const std::string QueuePolicy::typeKey("qpid.policy_type"); +const std::string QueuePolicy::REJECT("reject"); +const std::string QueuePolicy::FLOW_TO_DISK("flow_to_disk"); +const std::string QueuePolicy::RING("ring"); +const std::string QueuePolicy::RING_STRICT("ring_strict"); uint64_t QueuePolicy::defaultMaxSize(0); +FlowToDiskPolicy::FlowToDiskPolicy(uint32_t _maxCount, uint64_t _maxSize) : + QueuePolicy(_maxCount, _maxSize, FLOW_TO_DISK) {} + +bool FlowToDiskPolicy::checkLimit(const QueuedMessage& m) +{ + return QueuePolicy::checkLimit(m) || m.queue->releaseMessageContent(m); +} + +RingQueuePolicy::RingQueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : + QueuePolicy(_maxCount, _maxSize, _type), strict(_type == RING_STRICT) {} + +void RingQueuePolicy::enqueued(const QueuedMessage& m) +{ + QueuePolicy::enqueued(m); + qpid::sys::Mutex::ScopedLock l(lock); + queue.push_back(m); +} + +void RingQueuePolicy::dequeued(const QueuedMessage& m) +{ + qpid::sys::Mutex::ScopedLock l(lock); + QueuePolicy::dequeued(m); + //find and remove m from queue + for (Messages::iterator i = queue.begin(); i != queue.end() && m.position <= i->position; i++) { + if (i->position == m.position) { + queue.erase(i); + break; + } + } +} + +bool RingQueuePolicy::isEnqueued(const QueuedMessage& m) +{ + qpid::sys::Mutex::ScopedLock l(lock); + //for non-strict ring policy, a message can be dequeued before acked; need to detect this + for (Messages::iterator i = queue.begin(); i != queue.end() && m.position <= i->position; i++) { + if (i->position == m.position) { + return true; + } + } + return false; +} + +bool RingQueuePolicy::checkLimit(const QueuedMessage& m) +{ + if (QueuePolicy::checkLimit(m)) return true;//if haven't hit limit, ok to accept + + QueuedMessage oldest; + { + qpid::sys::Mutex::ScopedLock l(lock); + oldest = queue.front(); + } + if (oldest.queue->acquire(oldest) || !strict) { + qpid::sys::Mutex::ScopedLock l(lock); + if (oldest.position == queue.front().position) { + queue.pop_front(); + QPID_LOG(debug, "Ring policy triggered in queue " + << (m.queue ? m.queue->getName() : std::string("unknown queue")) + << ": removed message " << oldest.position << " to make way for " << m.position); + } + return true; + } else { + QPID_LOG(debug, "Ring policy could not be triggered in queue " + << (m.queue ? m.queue->getName() : std::string("unknown queue")) + << ": oldest message (seq-no=" << oldest.position << ") has been delivered but not yet acknowledged or requeued"); + //in strict mode, if oldest message has been delivered (hence + //cannot be acquired) but not yet acked, it should not be + //removed and the attempted enqueue should fail + return false; + } +} + +std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const qpid::framing::FieldTable& settings) +{ + uint32_t maxCount = getInt(settings, maxCountKey, 0); + uint32_t maxSize = getInt(settings, maxSizeKey, defaultMaxSize); + if (maxCount || maxSize) { + return createQueuePolicy(maxCount, maxSize, getType(settings)); + } else { + return std::auto_ptr<QueuePolicy>(); + } +} + +std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type) +{ + if (type == RING || type == RING_STRICT) { + return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(maxCount, maxSize, type)); + } else if (type == FLOW_TO_DISK) { + return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(maxCount, maxSize)); + } else { + return std::auto_ptr<QueuePolicy>(new QueuePolicy(maxCount, maxSize, type)); + } + +} + + namespace qpid { namespace broker { std::ostream& operator<<(std::ostream& out, const QueuePolicy& p) { - if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size; - else out << "size unlimited, current=" << p.size; + if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size.get(); + else out << "size: unlimited"; out << "; "; - if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count; - else out << "count unlimited, current=" << p.count; + if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count.get(); + else out << "count: unlimited"; + out << "; type=" << p.type; return out; } } } + diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.h b/qpid/cpp/src/qpid/broker/QueuePolicy.h index 4511a63b64..d39ce7dc11 100644 --- a/qpid/cpp/src/qpid/broker/QueuePolicy.h +++ b/qpid/cpp/src/qpid/broker/QueuePolicy.h @@ -21,40 +21,85 @@ #ifndef _QueuePolicy_ #define _QueuePolicy_ +#include <deque> #include <iostream> +#include <memory> +#include "QueuedMessage.h" #include "qpid/framing/FieldTable.h" +#include "qpid/sys/AtomicValue.h" +#include "qpid/sys/Mutex.h" namespace qpid { - namespace broker { - class QueuePolicy - { - static const std::string maxCountKey; - static const std::string maxSizeKey; - - static uint64_t defaultMaxSize; +namespace broker { + +class QueuePolicy +{ + static uint64_t defaultMaxSize; - const uint32_t maxCount; - const uint64_t maxSize; - uint32_t count; - uint64_t size; + const uint32_t maxCount; + const uint64_t maxSize; + const std::string type; + qpid::sys::AtomicValue<uint32_t> count; + qpid::sys::AtomicValue<uint64_t> size; + bool policyExceeded; - static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue); - - public: - QueuePolicy(uint32_t maxCount, uint64_t maxSize); - QueuePolicy(const qpid::framing::FieldTable& settings); - void enqueued(uint64_t size); - void dequeued(uint64_t size); - void update(qpid::framing::FieldTable& settings); - bool limitExceeded(); - uint32_t getMaxCount() const { return maxCount; } - uint64_t getMaxSize() const { return maxSize; } - - static void setDefaultMaxSize(uint64_t); - friend std::ostream& operator<<(std::ostream&, const QueuePolicy&); - }; - } -} + static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue); + static std::string getType(const qpid::framing::FieldTable& settings); + + public: + static const std::string maxCountKey; + static const std::string maxSizeKey; + static const std::string typeKey; + static const std::string REJECT; + static const std::string FLOW_TO_DISK; + static const std::string RING; + static const std::string RING_STRICT; + + virtual ~QueuePolicy() {} + void tryEnqueue(const QueuedMessage&); + virtual void dequeued(const QueuedMessage&); + virtual bool isEnqueued(const QueuedMessage&); + virtual bool checkLimit(const QueuedMessage&); + void update(qpid::framing::FieldTable& settings); + uint32_t getMaxCount() const { return maxCount; } + uint64_t getMaxSize() const { return maxSize; } + + static std::auto_ptr<QueuePolicy> createQueuePolicy(const qpid::framing::FieldTable& settings); + static std::auto_ptr<QueuePolicy> createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); + static void setDefaultMaxSize(uint64_t); + friend std::ostream& operator<<(std::ostream&, const QueuePolicy&); + protected: + QueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); + + virtual void enqueued(const QueuedMessage&); + void enqueued(uint64_t size); + void dequeued(uint64_t size); +}; + + +class FlowToDiskPolicy : public QueuePolicy +{ + public: + FlowToDiskPolicy(uint32_t maxCount, uint64_t maxSize); + bool checkLimit(const QueuedMessage&); +}; + +class RingQueuePolicy : public QueuePolicy +{ + public: + RingQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = RING); + void enqueued(const QueuedMessage&); + void dequeued(const QueuedMessage&); + bool isEnqueued(const QueuedMessage&); + bool checkLimit(const QueuedMessage&); + private: + typedef std::deque<QueuedMessage> Messages; + qpid::sys::Mutex lock; + Messages queue; + const bool strict; +}; + +}} #endif diff --git a/qpid/cpp/src/qpid/broker/QueuedMessage.h b/qpid/cpp/src/qpid/broker/QueuedMessage.h new file mode 100644 index 0000000000..82f5073d87 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/QueuedMessage.h @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _QueuedMessage_ +#define _QueuedMessage_ + +#include "Message.h" + +namespace qpid { +namespace broker { + +class Queue; + +struct QueuedMessage +{ + boost::intrusive_ptr<Message> payload; + framing::SequenceNumber position; + Queue* queue; + + QueuedMessage() : queue(0) {} + QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) : + payload(msg), position(sn), queue(q) {} + QueuedMessage(Queue* q) : queue(q) {} +}; + +}} + + +#endif diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 675a6a304c..64bb155c01 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -277,7 +277,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected)); } if (acquire && !ackExpected) { - queue->dequeue(0, msg.payload); + queue->dequeue(0, msg); } return true; } diff --git a/qpid/cpp/src/tests/QueuePolicyTest.cpp b/qpid/cpp/src/tests/QueuePolicyTest.cpp index db88682010..4267047c3f 100644 --- a/qpid/cpp/src/tests/QueuePolicyTest.cpp +++ b/qpid/cpp/src/tests/QueuePolicyTest.cpp @@ -19,63 +19,142 @@ * */ #include "qpid/broker/QueuePolicy.h" +#include "qpid/sys/Time.h" #include "unit_test.h" +#include "MessageUtils.h" +#include "BrokerFixture.h" using namespace qpid::broker; +using namespace qpid::client; using namespace qpid::framing; QPID_AUTO_TEST_SUITE(QueuePolicyTestSuite) +QueuedMessage createMessage(uint32_t size) +{ + QueuedMessage msg; + msg.payload = MessageUtils::createMessage(); + MessageUtils::addContent(msg.payload, std::string (size, 'x')); + return msg; +} + + QPID_AUTO_TEST_CASE(testCount) { - QueuePolicy policy(5, 0); - BOOST_CHECK(!policy.limitExceeded()); - for (int i = 0; i < 5; i++) policy.enqueued(10); - BOOST_CHECK_EQUAL((uint64_t) 0, policy.getMaxSize()); - BOOST_CHECK_EQUAL((uint32_t) 5, policy.getMaxCount()); - BOOST_CHECK(!policy.limitExceeded()); - policy.enqueued(10); - BOOST_CHECK(policy.limitExceeded()); - policy.dequeued(10); - BOOST_CHECK(!policy.limitExceeded()); - policy.enqueued(10); - BOOST_CHECK(policy.limitExceeded()); + std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy(5, 0)); + BOOST_CHECK_EQUAL((uint64_t) 0, policy->getMaxSize()); + BOOST_CHECK_EQUAL((uint32_t) 5, policy->getMaxCount()); + + QueuedMessage msg = createMessage(10); + for (size_t i = 0; i < 5; i++) { + policy->tryEnqueue(msg); + } + try { + policy->tryEnqueue(msg); + BOOST_FAIL("Policy did not fail on enqueuing sixth message"); + } catch (const ResourceLimitExceededException&) {} + + policy->dequeued(msg); + policy->tryEnqueue(msg); + + try { + policy->tryEnqueue(msg); + BOOST_FAIL("Policy did not fail on enqueuing sixth message (after dequeue)"); + } catch (const ResourceLimitExceededException&) {} } QPID_AUTO_TEST_CASE(testSize) { - QueuePolicy policy(0, 50); - for (int i = 0; i < 5; i++) policy.enqueued(10); - BOOST_CHECK(!policy.limitExceeded()); - policy.enqueued(10); - BOOST_CHECK(policy.limitExceeded()); - policy.dequeued(10); - BOOST_CHECK(!policy.limitExceeded()); - policy.enqueued(10); - BOOST_CHECK(policy.limitExceeded()); + std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy(0, 50)); + QueuedMessage msg = createMessage(10); + + for (size_t i = 0; i < 5; i++) { + policy->tryEnqueue(msg); + } + try { + policy->tryEnqueue(msg); + BOOST_FAIL("Policy did not fail on aggregate size exceeding 50. " << *policy); + } catch (const ResourceLimitExceededException&) {} + + policy->dequeued(msg); + policy->tryEnqueue(msg); + + try { + policy->tryEnqueue(msg); + BOOST_FAIL("Policy did not fail on aggregate size exceeding 50 (after dequeue). " << *policy); + } catch (const ResourceLimitExceededException&) {} } QPID_AUTO_TEST_CASE(testBoth) { - QueuePolicy policy(5, 50); - for (int i = 0; i < 5; i++) policy.enqueued(11); - BOOST_CHECK(policy.limitExceeded()); - policy.dequeued(20); - BOOST_CHECK(!policy.limitExceeded());//fails - policy.enqueued(5); - policy.enqueued(10); - BOOST_CHECK(policy.limitExceeded()); + std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy(5, 50)); + try { + QueuedMessage msg = createMessage(51); + policy->tryEnqueue(msg); + BOOST_FAIL("Policy did not fail on single message exceeding 50. " << *policy); + } catch (const ResourceLimitExceededException&) {} + + std::vector<QueuedMessage> messages; + messages.push_back(createMessage(15)); + messages.push_back(createMessage(10)); + messages.push_back(createMessage(11)); + messages.push_back(createMessage(2)); + messages.push_back(createMessage(7)); + for (size_t i = 0; i < messages.size(); i++) { + policy->tryEnqueue(messages[i]); + } + //size = 45 at this point, count = 5 + try { + QueuedMessage msg = createMessage(5); + policy->tryEnqueue(msg); + BOOST_FAIL("Policy did not fail on count exceeding 6. " << *policy); + } catch (const ResourceLimitExceededException&) {} + try { + QueuedMessage msg = createMessage(10); + policy->tryEnqueue(msg); + BOOST_FAIL("Policy did not fail on aggregate size exceeding 50. " << *policy); + } catch (const ResourceLimitExceededException&) {} + + + policy->dequeued(messages[0]); + try { + QueuedMessage msg = createMessage(20); + policy->tryEnqueue(msg); + } catch (const ResourceLimitExceededException&) { + BOOST_FAIL("Policy failed incorrectly after dequeue. " << *policy); + } } QPID_AUTO_TEST_CASE(testSettings) { //test reading and writing the policy from/to field table + std::auto_ptr<QueuePolicy> a(QueuePolicy::createQueuePolicy(101, 303)); FieldTable settings; - QueuePolicy a(101, 303); - a.update(settings); - QueuePolicy b(settings); - BOOST_CHECK_EQUAL(a.getMaxCount(), b.getMaxCount()); - BOOST_CHECK_EQUAL(a.getMaxSize(), b.getMaxSize()); + a->update(settings); + std::auto_ptr<QueuePolicy> b(QueuePolicy::createQueuePolicy(settings)); + BOOST_CHECK_EQUAL(a->getMaxCount(), b->getMaxCount()); + BOOST_CHECK_EQUAL(a->getMaxSize(), b->getMaxSize()); +} + +QPID_AUTO_TEST_CASE(testRingPolicy) +{ + FieldTable args; + std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy(5, 0, QueuePolicy::RING); + policy->update(args); + + ProxySessionFixture f; + std::string q("my-ring-queue"); + f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args); + for (int i = 0; i < 10; i++) { + f.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q)); + } + client::Message msg; + for (int i = 5; i < 10; i++) { + BOOST_CHECK(f.subs.get(msg, q, qpid::sys::TIME_SEC)); + BOOST_CHECK_EQUAL((boost::format("%1%_%2%") % "Message" % (i+1)).str(), msg.getData()); + } + BOOST_CHECK(!f.subs.get(msg, q)); } + QPID_AUTO_TEST_SUITE_END() |