summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2011-02-10 10:12:41 +0000
committerGordon Sim <gsim@apache.org>2011-02-10 10:12:41 +0000
commit731d6c4b13ed7ae5941a4b0f969be357f3d7e831 (patch)
tree5fc47c2ce19bbc0872356ef9c5f5ef073752f2cb /cpp/src/qpid/broker/Queue.cpp
parent8ead4c97b75e508a877e8d446a5bef096e606d84 (diff)
downloadqpid-python-731d6c4b13ed7ae5941a4b0f969be357f3d7e831.tar.gz
QPID-529: Priority queue implementation
QPID-2104: LVQ enhancement These both required some refactoring of the Queue class to allow cleaner implementation of different types of behaviour. The in-memory storage of messages is now abstracted out behind an interface specified by qpid::broker::Messages which qpid::broker::Queue uses. Different implementations of that are available for the standard FIFO queue, priority queues and LVQ (I have also separated out the 'legacy' implementation of LVQ from the new version driven by QPID-2104). git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1069322 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp318
1 files changed, 99 insertions, 219 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index e59857462c..43d1a2b27c 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -23,7 +23,11 @@
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueEvents.h"
#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Fairshare.h"
#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/broker/LegacyLVQ.h"
+#include "qpid/broker/MessageDeque.h"
+#include "qpid/broker/MessageMap.h"
#include "qpid/broker/MessageStore.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/QueueRegistry.h"
@@ -66,6 +70,7 @@ const std::string qpidMaxCount("qpid.max_count");
const std::string qpidNoLocal("no-local");
const std::string qpidTraceIdentity("qpid.trace.id");
const std::string qpidTraceExclude("qpid.trace.exclude");
+const std::string qpidLastValueQueueKey("qpid.last_value_queue_key");
const std::string qpidLastValueQueue("qpid.last_value_queue");
const std::string qpidLastValueQueueNoBrowse("qpid.last_value_queue_no_browse");
const std::string qpidPersistLastNode("qpid.persist_last_node");
@@ -92,10 +97,9 @@ Queue::Queue(const string& _name, bool _autodelete,
consumerCount(0),
exclusive(0),
noLocal(false),
- lastValueQueue(false),
- lastValueQueueNoBrowse(false),
persistLastNode(false),
inLastNodeFailure(false),
+ messages(new MessageDeque()),
persistenceId(0),
policyExceeded(false),
mgmtObject(0),
@@ -212,7 +216,7 @@ void Queue::requeue(const QueuedMessage& msg){
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return;
msg.payload->enqueueComplete(); // mark the message as enqueued
- messages.insert(lower_bound(messages.begin(), messages.end(), msg), msg);
+ messages->reinsert(msg);
listeners.populate(copy);
// for persistLastNode - don't force a message twice to disk, but force it if no force before
@@ -227,57 +231,23 @@ void Queue::requeue(const QueuedMessage& msg){
copy.notify();
}
-void Queue::clearLVQIndex(const QueuedMessage& msg){
- assertClusterSafe();
- const framing::FieldTable* ft = msg.payload ? msg.payload->getApplicationHeaders() : 0;
- if (lastValueQueue && ft){
- string key = ft->getAsString(qpidVQMatchProperty);
- lvq.erase(key);
- }
-}
-
bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
{
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
QPID_LOG(debug, "Attempting to acquire message at " << position);
-
- Messages::iterator i = findAt(position);
- if (i != messages.end() ) {
- message = *i;
- if (lastValueQueue) {
- clearLVQIndex(*i);
- }
- QPID_LOG(debug,
- "Acquired message at " << i->position << " from " << name);
- messages.erase(i);
+ if (messages->remove(position, message)) {
+ QPID_LOG(debug, "Acquired message at " << position << " from " << name);
return true;
- }
- QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
- return false;
+ } else {
+ QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
+ return false;
+ }
}
bool Queue::acquire(const QueuedMessage& msg) {
- Mutex::ScopedLock locker(messageLock);
- assertClusterSafe();
-
- QPID_LOG(debug, "attempting to acquire " << msg.position);
- Messages::iterator i = findAt(msg.position);
- if ((i != messages.end() && i->position == msg.position) && // note that in some cases payload not be set
- (!lastValueQueue ||
- (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0
- ) {
-
- clearLVQIndex(msg);
- QPID_LOG(debug,
- "Match found, acquire succeeded: " <<
- i->position << " == " << msg.position);
- messages.erase(i);
- return true;
- }
-
- QPID_LOG(debug, "Acquire failed for " << msg.position);
- return false;
+ QueuedMessage copy = msg;
+ return acquireMessageAt(msg.position, copy);
}
void Queue::notifyListener()
@@ -286,7 +256,7 @@ void Queue::notifyListener()
QueueListeners::NotificationSet set;
{
Mutex::ScopedLock locker(messageLock);
- if (messages.size()) {
+ if (messages->size()) {
listeners.populate(set);
}
}
@@ -315,12 +285,12 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
{
while (true) {
Mutex::ScopedLock locker(messageLock);
- if (messages.empty()) {
+ if (messages->empty()) {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
listeners.addListener(c);
return NO_MESSAGES;
} else {
- QueuedMessage msg = getFront();
+ QueuedMessage msg = messages->front();
if (msg.payload->hasExpired()) {
QPID_LOG(debug, "Message expired from queue '" << name << "'");
popAndDequeue();
@@ -330,7 +300,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
if (c->filter(msg.payload)) {
if (c->accept(msg.payload)) {
m = msg;
- popMsg(msg);
+ pop();
return CONSUMED;
} else {
//message(s) are available but consumer hasn't got enough credit
@@ -356,11 +326,6 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
//consumer wants the message
c->position = msg.position;
m = msg;
- if (!lastValueQueueNoBrowse) clearLVQIndex(msg);
- if (lastValueQueue) {
- boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
- if (replacement.get()) m.payload = replacement;
- }
return true;
} else {
//browser hasn't got enough credit for the message
@@ -382,7 +347,7 @@ void Queue::removeListener(Consumer::shared_ptr c)
{
Mutex::ScopedLock locker(messageLock);
listeners.removeListener(c);
- if (messages.size()) {
+ if (messages->size()) {
listeners.populate(set);
}
}
@@ -403,52 +368,20 @@ bool Queue::dispatch(Consumer::shared_ptr c)
// Find the next message
bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
Mutex::ScopedLock locker(messageLock);
- if (!messages.empty() && messages.back().position > c->position) {
- if (c->position < getFront().position) {
- msg = getFront();
- return true;
- } else {
- Messages::iterator pos = findAt(c->position);
- if (pos != messages.end() && pos+1 != messages.end()) {
- msg = *(pos+1);
- return true;
- }
- }
+ if (messages->next(c->position, msg)) {
+ return true;
+ } else {
+ listeners.addListener(c);
+ return false;
}
- listeners.addListener(c);
- return false;
}
-Queue::Messages::iterator Queue::findAt(SequenceNumber pos) {
-
- if(!messages.empty()){
- QueuedMessage compM;
- compM.position = pos;
- unsigned long diff = pos.getValue() - messages.front().position.getValue();
- long maxEnd = diff < messages.size()? diff : messages.size();
-
- Messages::iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
- if (i!= messages.end() && i->position == pos)
- return i;
- }
- return messages.end(); // no match found.
-}
-
-
QueuedMessage Queue::find(SequenceNumber pos) const {
Mutex::ScopedLock locker(messageLock);
- if(!messages.empty()){
- QueuedMessage compM;
- compM.position = pos;
- unsigned long diff = pos.getValue() - messages.front().position.getValue();
- long maxEnd = diff < messages.size()? diff : messages.size();
-
- Messages::const_iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
- if (i != messages.end())
- return *i;
- }
- return QueuedMessage();
+ QueuedMessage msg;
+ messages->find(pos, msg);
+ return msg;
}
void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
@@ -482,12 +415,18 @@ void Queue::cancel(Consumer::shared_ptr c){
QueuedMessage Queue::get(){
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg(this);
+ messages->pop(msg);
+ return msg;
+}
- if(!messages.empty()){
- msg = getFront();
- popMsg(msg);
+bool collect_if_expired(std::deque<QueuedMessage>& expired, QueuedMessage& message)
+{
+ if (message.payload->hasExpired()) {
+ expired.push_back(message);
+ return true;
+ } else {
+ return false;
}
- return msg;
}
void Queue::purgeExpired()
@@ -496,37 +435,11 @@ void Queue::purgeExpired()
//bother explicitly expiring if the rate of dequeues since last
//attempt is less than one per second.
- //Note: This method is currently called periodically on the timer
- //thread. In a clustered broker this means that the purging does
- //not occur on the cluster event dispatch thread and consequently
- //that is not totally ordered w.r.t other events (including
- //publication of messages). However the cluster does ensure that
- //the actual expiration of messages (as distinct from the removing
- //of those expired messages from the queue) *is* consistently
- //ordered w.r.t. cluster events. This means that delivery of
- //messages is in general consistent across the cluster inspite of
- //any non-determinism in the triggering of a purge. However at
- //present purging a last value queue could potentially cause
- //inconsistencies in the cluster (as the order w.r.t publications
- //can affect the order in which messages appear in the
- //queue). Consequently periodic purging of an LVQ is not enabled
- //(expired messages will be removed on delivery and consolidated
- //by key as part of normal LVQ operation).
-
- if (dequeueTracker.sampleRatePerSecond() < 1 && !lastValueQueue) {
- Messages expired;
+ if (dequeueTracker.sampleRatePerSecond() < 1) {
+ std::deque<QueuedMessage> expired;
{
Mutex::ScopedLock locker(messageLock);
- for (Messages::iterator i = messages.begin(); i != messages.end();) {
- //Re-introduce management of LVQ-specific state here
- //if purging is renabled for that case (see note above)
- if (i->payload->hasExpired()) {
- expired.push_back(*i);
- i = messages.erase(i);
- } else {
- ++i;
- }
- }
+ messages->removeIf(boost::bind(&collect_if_expired, expired, _1));
}
for_each(expired.begin(), expired.end(), bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
@@ -552,13 +465,13 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange>
uint32_t count = 0;
// Either purge them all or just the some (purge_count) while the queue isn't empty.
- while((!purge_request || purge_count--) && !messages.empty()) {
+ while((!purge_request || purge_count--) && !messages->empty()) {
if (dest.get()) {
//
// If there is a destination exchange, stage the messages onto a reroute queue
// so they don't wind up getting purged more than once.
//
- DeliverableMessage msg(getFront().payload);
+ DeliverableMessage msg(messages->front().payload);
rerouteQueue.push_back(msg);
}
popAndDequeue();
@@ -584,64 +497,37 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
uint32_t move_count = qty; // only comes into play if qty >0
uint32_t count = 0; // count how many were moved for returning
- while((!qty || move_count--) && !messages.empty()) {
- QueuedMessage qmsg = getFront();
+ while((!qty || move_count--) && !messages->empty()) {
+ QueuedMessage qmsg = messages->front();
boost::intrusive_ptr<Message> msg = qmsg.payload;
destq->deliver(msg); // deliver message to the destination queue
- popMsg(qmsg);
+ pop();
dequeue(0, qmsg);
count++;
}
return count;
}
-void Queue::popMsg(QueuedMessage& qmsg)
+void Queue::pop()
{
assertClusterSafe();
- const framing::FieldTable* ft = qmsg.payload->getApplicationHeaders();
- if (lastValueQueue && ft){
- string key = ft->getAsString(qpidVQMatchProperty);
- lvq.erase(key);
- }
- messages.pop_front();
+ messages->pop();
++dequeueTracker;
}
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
assertClusterSafe();
QueueListeners::NotificationSet copy;
+ QueuedMessage removed;
+ bool dequeueRequired = false;
{
Mutex::ScopedLock locker(messageLock);
QueuedMessage qm(this, msg, ++sequence);
if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
- LVQ::iterator i;
- const framing::FieldTable* ft = msg->getApplicationHeaders();
- if (lastValueQueue && ft){
- string key = ft->getAsString(qpidVQMatchProperty);
-
- i = lvq.find(key);
- if (i == lvq.end() || (broker && broker->isClusterUpdatee())) {
- messages.push_back(qm);
- listeners.populate(copy);
- lvq[key] = msg;
- }else {
- boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this);
- if (!old) old = i->second;
- i->second->setReplacementMessage(msg,this);
- if (isRecovery) {
- //can't issue new requests for the store until
- //recovery is complete
- pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position));
- } else {
- Mutex::ScopedUnlock u(messageLock);
- dequeue(0, QueuedMessage(qm.queue, old, qm.position));
- }
- }
- }else {
- messages.push_back(qm);
- listeners.populate(copy);
- }
+ dequeueRequired = messages->push(qm, removed);
+ listeners.populate(copy);
+
if (eventMode) {
if (eventMgr) eventMgr->enqueued(qm);
else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName());
@@ -651,32 +537,20 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
}
}
copy.notify();
-}
-
-QueuedMessage Queue::getFront()
-{
- QueuedMessage msg = messages.front();
- if (lastValueQueue) {
- boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
- if (replacement.get()) msg.payload = replacement;
+ if (dequeueRequired) {
+ if (isRecovery) {
+ //can't issue new requests for the store until
+ //recovery is complete
+ pendingDequeues.push_back(removed);
+ } else {
+ dequeue(0, removed);
+ }
}
- return msg;
}
-QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg)
+void isEnqueueComplete(uint32_t* result, const QueuedMessage& message)
{
- boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
- if (replacement.get()) {
- const framing::FieldTable* ft = replacement->getApplicationHeaders();
- if (ft) {
- string key = ft->getAsString(qpidVQMatchProperty);
- if (lvq.find(key) != lvq.end()){
- lvq[key] = replacement;
- }
- }
- msg.payload = replacement;
- }
- return msg;
+ if (message.payload->isEnqueueComplete()) (*result)++;
}
/** function only provided for unit tests, or code not in critical message path */
@@ -684,20 +558,14 @@ uint32_t Queue::getEnqueueCompleteMessageCount() const
{
Mutex::ScopedLock locker(messageLock);
uint32_t count = 0;
- for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) {
- //NOTE: don't need to use checkLvqReplace() here as it
- //is only relevant for LVQ which does not support persistence
- //so the enqueueComplete check has no effect
- if ( i->payload->isEnqueueComplete() ) count ++;
- }
-
+ messages->foreach(boost::bind(&isEnqueueComplete, &count, _1));
return count;
}
uint32_t Queue::getMessageCount() const
{
Mutex::ScopedLock locker(messageLock);
- return messages.size();
+ return messages->size();
}
uint32_t Queue::getConsumerCount() const
@@ -717,21 +585,22 @@ void Queue::clearLastNodeFailure()
inLastNodeFailure = false;
}
+void Queue::forcePersistent(QueuedMessage& message)
+{
+ if(!message.payload->isStoredOnQueue(shared_from_this())) {
+ message.payload->forcePersistent();
+ if (message.payload->isForcedPersistent() ){
+ enqueue(0, message.payload);
+ }
+ }
+}
+
void Queue::setLastNodeFailure()
{
if (persistLastNode){
Mutex::ScopedLock locker(messageLock);
try {
- for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) {
- if (lastValueQueue) checkLvqReplace(*i);
- // don't force a message twice to disk.
- if(!i->payload->isStoredOnQueue(shared_from_this())) {
- i->payload->forcePersistent();
- if (i->payload->isForcedPersistent() ){
- enqueue(0, i->payload);
- }
- }
- }
+ messages->foreach(boost::bind(&Queue::forcePersistent, this, _1));
} catch (const std::exception& e) {
// Could not go into last node standing (for example journal not large enough)
QPID_LOG(error, "Unable to fail to last node standing for queue: " << name << " : " << e.what());
@@ -748,7 +617,7 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg
if (!u.acquired) return false;
if (policy.get() && !suppressPolicyCheck) {
- Messages dequeues;
+ std::deque<QueuedMessage> dequeues;
{
Mutex::ScopedLock locker(messageLock);
policy->tryEnqueue(msg);
@@ -835,8 +704,8 @@ void Queue::dequeueCommitted(const QueuedMessage& msg)
*/
void Queue::popAndDequeue()
{
- QueuedMessage msg = getFront();
- popMsg(msg);
+ QueuedMessage msg = messages->front();
+ pop();
dequeue(0, msg);
}
@@ -885,13 +754,22 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
noLocal = _settings.get(qpidNoLocal);
QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal);
- lastValueQueue= _settings.get(qpidLastValueQueue);
- if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue for: " << getName());
-
- lastValueQueueNoBrowse = _settings.get(qpidLastValueQueueNoBrowse);
- if (lastValueQueueNoBrowse){
- QPID_LOG(debug, "Configured queue as Last Value Queue No Browse for: " << getName());
- lastValueQueue = lastValueQueueNoBrowse;
+ std::string lvqKey = _settings.getAsString(qpidLastValueQueueKey);
+ if (lvqKey.size()) {
+ QPID_LOG(debug, "Configured queue " << getName() << " as Last Value Queue with key " << lvqKey);
+ messages = std::auto_ptr<Messages>(new MessageMap(lvqKey));
+ } else if (_settings.get(qpidLastValueQueueNoBrowse)) {
+ QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue with 'no-browse' on");
+ messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker);
+ } else if (_settings.get(qpidLastValueQueue)) {
+ QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue");
+ messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker);
+ } else {
+ std::auto_ptr<Messages> m = Fairshare::create(_settings);
+ if (m.get()) {
+ messages = m;
+ QPID_LOG(debug, "Configured queue " << getName() << " as priority queue.");
+ }
}
persistLastNode= _settings.get(qpidPersistLastNode);
@@ -919,8 +797,8 @@ void Queue::destroy()
{
if (alternateExchange.get()) {
Mutex::ScopedLock locker(messageLock);
- while(!messages.empty()){
- DeliverableMessage msg(getFront().payload);
+ while(!messages->empty()){
+ DeliverableMessage msg(messages->front().payload);
alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
msg.getMessage().getApplicationHeaders());
popAndDequeue();
@@ -1198,6 +1076,8 @@ bool Queue::isEnqueued(const QueuedMessage& msg)
}
QueueListeners& Queue::getListeners() { return listeners; }
+Messages& Queue::getMessages() { return *messages; }
+const Messages& Queue::getMessages() const { return *messages; }
void Queue::checkNotDeleted()
{