diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Fairshare.cpp | 87 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Fairshare.h | 19 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageDeque.cpp | 20 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageDeque.h | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/PriorityQueue.cpp | 177 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/PriorityQueue.h | 45 |
6 files changed, 202 insertions, 153 deletions
diff --git a/qpid/cpp/src/qpid/broker/Fairshare.cpp b/qpid/cpp/src/qpid/broker/Fairshare.cpp index c30b64c7ae..7cdad1a44f 100644 --- a/qpid/cpp/src/qpid/broker/Fairshare.cpp +++ b/qpid/cpp/src/qpid/broker/Fairshare.cpp @@ -23,7 +23,6 @@ #include "qpid/framing/FieldTable.h" #include "qpid/framing/FieldValue.h" #include "qpid/log/Statement.h" -#include <algorithm> #include <boost/format.hpp> #include <boost/lexical_cast.hpp> #include <boost/assign/list_of.hpp> @@ -33,7 +32,7 @@ namespace broker { Fairshare::Fairshare(size_t levels, uint limit) : PriorityQueue(levels), - limits(levels, limit), counts(levels, 0) {} + limits(levels, limit), priority(levels-1), count(0) {} void Fairshare::setLimit(size_t level, uint limit) @@ -41,63 +40,70 @@ void Fairshare::setLimit(size_t level, uint limit) limits[level] = limit; } +bool Fairshare::limitReached() +{ + uint l = limits[priority]; + return l && ++count > l; +} + +uint Fairshare::currentLevel() +{ + if (limitReached()) { + return nextLevel(); + } else { + return priority; + } +} + +uint Fairshare::nextLevel() +{ + count = 1; + if (priority) --priority; + else priority = levels-1; + return priority; +} + bool Fairshare::isNull() { for (int i = 0; i < levels; i++) if (limits[i]) return false; return true; } -bool Fairshare::getState(qpid::framing::FieldTable& state) const +bool Fairshare::getState(uint& p, uint& c) const { - for (int i = 0; i < levels; i++) { - if (counts[i]) { - std::string key = (boost::format("fairshare-count-%1%") % i).str(); - state.setInt(key, counts[i]); - } - } + p = priority; + c = count; return true; } -bool Fairshare::checkLevel(uint level) +bool Fairshare::setState(uint p, uint c) { - if (!limits[level] || counts[level] < limits[level]) { - counts[level]++; - return true; - } else { - return false; - } + priority = p; + count = c; + return true; } -bool Fairshare::consume(QueuedMessage& message) +bool Fairshare::findFrontLevel(uint& p, PriorityLevels& messages) { - for (Available::iterator i = available.begin(); i != available.end(); ++i) { - QueuedMessage* next = *i; - if (checkLevel(getPriorityLevel(*next))) { - messages[next->position].status = QueuedMessage::ACQUIRED; - message = *next; - available.erase(i); - return true; - } - } - if (!available.empty()) { - std::fill(counts.begin(), counts.end(), 0);//reset counts - return consume(message); - } else { - return false; - } + const uint start = p = currentLevel(); + do { + if (!messages[p].empty()) return true; + } while ((p = nextLevel()) != start); + return false; } -bool Fairshare::getState(const Messages& m, qpid::framing::FieldTable& counts) + +bool Fairshare::getState(const Messages& m, uint& priority, uint& count) { const Fairshare* fairshare = dynamic_cast<const Fairshare*>(&m); - return fairshare && fairshare->getState(counts); + return fairshare && fairshare->getState(priority, count); } -bool Fairshare::setState(Messages& m, const qpid::framing::FieldTable& counts) +bool Fairshare::setState(Messages& m, uint priority, uint count) { Fairshare* fairshare = dynamic_cast<Fairshare*>(&m); - return fairshare && fairshare->setState(counts); + return fairshare && fairshare->setState(priority, count); } int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::vector<std::string>& keys) @@ -130,14 +136,7 @@ int getIntegerSettingForKey(const qpid::framing::FieldTable& settings, const std { return getIntegerSetting(settings, boost::assign::list_of<std::string>(key)); } -bool Fairshare::setState(const qpid::framing::FieldTable& state) -{ - for (int i = 0; i < levels; i++) { - std::string key = (boost::format("fairshare-count-%1%") % i).str(); - counts[i] = state.isSet(key) ? getIntegerSettingForKey(state, key) : 0; - } - return true; -} + int getSetting(const qpid::framing::FieldTable& settings, const std::vector<std::string>& keys, int minvalue, int maxvalue) { return std::max(minvalue,std::min(getIntegerSetting(settings, keys), maxvalue)); diff --git a/qpid/cpp/src/qpid/broker/Fairshare.h b/qpid/cpp/src/qpid/broker/Fairshare.h index dfcbdf280e..1b25721e0c 100644 --- a/qpid/cpp/src/qpid/broker/Fairshare.h +++ b/qpid/cpp/src/qpid/broker/Fairshare.h @@ -22,7 +22,6 @@ * */ #include "qpid/broker/PriorityQueue.h" -#include <vector> namespace qpid { namespace framing { @@ -39,19 +38,23 @@ class Fairshare : public PriorityQueue { public: Fairshare(size_t levels, uint limit); - bool getState(qpid::framing::FieldTable& counts) const; - bool setState(const qpid::framing::FieldTable& counts); + bool getState(uint& priority, uint& count) const; + bool setState(uint priority, uint count); void setLimit(size_t level, uint limit); bool isNull(); - bool consume(QueuedMessage&); static std::auto_ptr<Messages> create(const qpid::framing::FieldTable& settings); - static bool getState(const Messages&, qpid::framing::FieldTable& counts); - static bool setState(Messages&, const qpid::framing::FieldTable& counts); + static bool getState(const Messages&, uint& priority, uint& count); + static bool setState(Messages&, uint priority, uint count); private: std::vector<uint> limits; - std::vector<uint> counts; - bool checkLevel(uint level); + uint priority; + uint count; + + uint currentLevel(); + uint nextLevel(); + bool limitReached(); + bool findFrontLevel(uint& p, PriorityLevels&); }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/MessageDeque.cpp b/qpid/cpp/src/qpid/broker/MessageDeque.cpp index 709d99876b..f70c996975 100644 --- a/qpid/cpp/src/qpid/broker/MessageDeque.cpp +++ b/qpid/cpp/src/qpid/broker/MessageDeque.cpp @@ -21,6 +21,7 @@ #include "qpid/broker/MessageDeque.h" #include "qpid/broker/QueuedMessage.h" #include "qpid/log/Statement.h" +#include "assert.h" namespace qpid { namespace broker { @@ -39,7 +40,7 @@ size_t MessageDeque::index(const framing::SequenceNumber& position) bool MessageDeque::deleted(const QueuedMessage& m) { size_t i = index(m.position); - if (i < messages.size()) { + if (i < messages.size() && messages[i].status != QueuedMessage::DELETED) { messages[i].status = QueuedMessage::DELETED; clean(); return true; @@ -53,7 +54,7 @@ size_t MessageDeque::size() return available; } -void MessageDeque::release(const QueuedMessage& message) +QueuedMessage* MessageDeque::releasePtr(const QueuedMessage& message) { size_t i = index(message.position); if (i < messages.size()) { @@ -62,12 +63,17 @@ void MessageDeque::release(const QueuedMessage& message) if (head > i) head = i; m.status = QueuedMessage::AVAILABLE; ++available; + return &messages[i]; } } else { + assert(0); QPID_LOG(error, "Failed to release message at " << message.position << " " << message.payload->getFrames().getContent() << "; no such message (index=" << i << ", size=" << messages.size() << ")"); } + return 0; } +void MessageDeque::release(const QueuedMessage& message) { releasePtr(message); } + bool MessageDeque::acquire(const framing::SequenceNumber& position, QueuedMessage& message) { if (position < messages.front().position) return false; @@ -129,8 +135,7 @@ QueuedMessage padding(uint32_t pos) { } } // namespace -bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) -{ +QueuedMessage* MessageDeque::pushPtr(const QueuedMessage& added) { //add padding to prevent gaps in sequence, which break the index //calculation (needed for queue replication) while (messages.size() && (added.position - messages.back().position) > 1) @@ -139,7 +144,12 @@ bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed* messages.back().status = QueuedMessage::AVAILABLE; if (head >= messages.size()) head = messages.size() - 1; ++available; - return false;//adding a message never causes one to be removed for deque + return &messages.back(); +} + +bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) { + pushPtr(added); + return false; // adding a message never causes one to be removed for deque } void MessageDeque::updateAcquired(const QueuedMessage& acquired) diff --git a/qpid/cpp/src/qpid/broker/MessageDeque.h b/qpid/cpp/src/qpid/broker/MessageDeque.h index bb5943b09b..9b53716d4e 100644 --- a/qpid/cpp/src/qpid/broker/MessageDeque.h +++ b/qpid/cpp/src/qpid/broker/MessageDeque.h @@ -48,6 +48,12 @@ class MessageDeque : public Messages void foreach(Functor); void removeIf(Predicate); + // For use by other Messages implementations that use MessageDeque as a FIFO index + // and keep pointers to its elements in their own indexing strctures. + void clean(); + QueuedMessage* releasePtr(const QueuedMessage&); + QueuedMessage* pushPtr(const QueuedMessage& added); + private: typedef std::deque<QueuedMessage> Deque; Deque messages; @@ -55,7 +61,6 @@ class MessageDeque : public Messages size_t head; size_t index(const framing::SequenceNumber&); - void clean(); }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp index da52675a29..ab5ec7235a 100644 --- a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp +++ b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp @@ -3,13 +3,13 @@ * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownersip. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,133 +22,124 @@ #include "qpid/broker/Queue.h" #include "qpid/broker/QueuedMessage.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/log/Statement.h" #include <cmath> namespace qpid { namespace broker { -PriorityQueue::PriorityQueue(int l) : - levels(l) {} +PriorityQueue::PriorityQueue(int l) : + levels(l), + messages(levels, Deque()), + frontLevel(0), haveFront(false), cached(false) {} -bool PriorityQueue::deleted(const QueuedMessage& message) -{ - Index::iterator i = messages.find(message.position); - if (i != messages.end()) { - //remove from available list if necessary - if (i->second.status == QueuedMessage::AVAILABLE) { - Available::iterator j = std::find(available.begin(), available.end(), &i->second); - if (j != available.end()) available.erase(j); - } - //remove from messages map - messages.erase(i); - return true; - } else { - return false; - } +bool PriorityQueue::deleted(const QueuedMessage& qm) { + bool deleted = fifo.deleted(qm); + if (deleted) erase(qm); + return deleted; } size_t PriorityQueue::size() { - return available.size(); + return fifo.size(); +} + +namespace { +bool before(QueuedMessage* a, QueuedMessage* b) { return *a < *b; } } void PriorityQueue::release(const QueuedMessage& message) { - Index::iterator i = messages.find(message.position); - if (i != messages.end() && i->second.status == QueuedMessage::ACQUIRED) { - i->second.status = QueuedMessage::AVAILABLE; - //insert message back into the correct place in available queue, based on priority: - Available::iterator j = upper_bound(available.begin(), available.end(), &i->second, boost::bind(&PriorityQueue::compare, this, _1, _2)); - available.insert(j, &i->second); + QueuedMessage* qm = fifo.releasePtr(message); + if (qm) { + uint p = getPriorityLevel(message); + messages[p].insert( + lower_bound(messages[p].begin(), messages[p].end(), qm, before), qm); + clearCache(); + } +} + + +void PriorityQueue::erase(const QueuedMessage& qm) { + size_t i = getPriorityLevel(qm); + if (!messages[i].empty()) { + long diff = qm.position.getValue() - messages[i].front()->position.getValue(); + if (diff < 0) return; + long maxEnd = std::min(size_t(diff), messages[i].size()); + QueuedMessage mutableQm = qm; // need non-const qm for lower_bound + Deque::iterator l = + lower_bound(messages[i].begin(),messages[i].begin()+maxEnd, &mutableQm, before); + if (l != messages[i].end() && (*l)->position == qm.position) { + messages[i].erase(l); + clearCache(); + return; + } } } bool PriorityQueue::acquire(const framing::SequenceNumber& position, QueuedMessage& message) { - Index::iterator i = messages.find(position); - if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) { - i->second.status = QueuedMessage::ACQUIRED; - message = i->second; - //remove it from available list (could make this faster by using ordering): - Available::iterator j = std::find(available.begin(), available.end(), &i->second); - assert(j != available.end()); - available.erase(j); - return true; - } else { - return false; - } + bool acquired = fifo.acquire(position, message); + if (acquired) erase(message); // No longer available + return acquired; } bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message) { - Index::iterator i = messages.find(position); - if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) { - message = i->second; - return true; - } else { - return false; - } + return fifo.find(position, message); } -bool PriorityQueue::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired) +bool PriorityQueue::browse( + const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired) { - Index::iterator i = messages.lower_bound(position+1); - if (i != messages.end() && (i->second.status == QueuedMessage::AVAILABLE || (!unacquired && i->second.status == QueuedMessage::ACQUIRED))) { - message = i->second; - return true; - } else { - return false; - } + return fifo.browse(position, message, unacquired); } bool PriorityQueue::consume(QueuedMessage& message) { - if (!available.empty()) { - QueuedMessage* next = available.front(); - messages[next->position].status = QueuedMessage::ACQUIRED; - message = *next; - available.pop_front(); + if (checkFront()) { + QueuedMessage* pm = messages[frontLevel].front(); + messages[frontLevel].pop_front(); + clearCache(); + pm->status = QueuedMessage::ACQUIRED; // Updates FIFO index + message = *pm; return true; } else { return false; } } -bool PriorityQueue::compare(const QueuedMessage* a, const QueuedMessage* b) const +bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) { - int priorityA = getPriorityLevel(*a); - int priorityB = getPriorityLevel(*b); - if (priorityA == priorityB) return a->position < b->position; - else return priorityA > priorityB; + QueuedMessage* qmp = fifo.pushPtr(added); + messages[getPriorityLevel(added)].push_back(qmp); + clearCache(); + return false; // Adding a message never causes one to be removed for deque } -bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) -{ - Index::iterator i = messages.insert(Index::value_type(added.position, added)).first; - i->second.status = QueuedMessage::AVAILABLE; - //insert message into the correct place in available queue, based on priority: - Available::iterator j = upper_bound(available.begin(), available.end(), &i->second, boost::bind(&PriorityQueue::compare, this, _1, _2)); - available.insert(j, &i->second); - return false;//adding a message never causes one to be removed for deque +void PriorityQueue::updateAcquired(const QueuedMessage& acquired) { + fifo.updateAcquired(acquired); } void PriorityQueue::foreach(Functor f) { - for (Available::iterator i = available.begin(); i != available.end(); ++i) { - f(**i); - } + fifo.foreach(f); } void PriorityQueue::removeIf(Predicate p) { - for (Available::iterator i = available.begin(); i != available.end();) { - if (p(**i)) { - messages[(*i)->position].status = QueuedMessage::REMOVED; - i = available.erase(i); - } else { - ++i; + for (int priority = 0; priority < levels; ++priority) { + for (Deque::iterator i = messages[priority].begin(); i != messages[priority].end();) { + if (p(**i)) { + (*i)->status = QueuedMessage::DELETED; // Updates fifo index + i = messages[priority].erase(i); + clearCache(); + } else { + ++i; + } } } + fifo.clean(); } uint PriorityQueue::getPriorityLevel(const QueuedMessage& m) const @@ -161,6 +152,30 @@ uint PriorityQueue::getPriorityLevel(const QueuedMessage& m) const return std::min(priority - firstLevel, (uint)levels-1); } +void PriorityQueue::clearCache() +{ + cached = false; +} + +bool PriorityQueue::findFrontLevel(uint& l, PriorityLevels& m) +{ + for (int p = levels-1; p >= 0; --p) { + if (!m[p].empty()) { + l = p; + return true; + } + } + return false; +} + +bool PriorityQueue::checkFront() +{ + if (!cached) { + haveFront = findFrontLevel(frontLevel, messages); + cached = true; + } + return haveFront; +} uint PriorityQueue::getPriority(const QueuedMessage& message) { diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.h b/qpid/cpp/src/qpid/broker/PriorityQueue.h index 590cf68003..8628745db1 100644 --- a/qpid/cpp/src/qpid/broker/PriorityQueue.h +++ b/qpid/cpp/src/qpid/broker/PriorityQueue.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,10 +21,10 @@ * under the License. * */ -#include "qpid/broker/Messages.h" +#include "qpid/broker/MessageDeque.h" #include "qpid/sys/IntegerTypes.h" -#include <list> -#include <map> +#include <deque> +#include <vector> namespace qpid { namespace broker { @@ -32,7 +32,10 @@ namespace broker { /** * Basic priority queue with a configurable number of recognised * priority levels. This is implemented as a separate deque per - * priority level. Browsing is FIFO not priority order. + * priority level. + * + * Browsing is FIFO not priority order. There is a MessageDeque + * for fast browsing. */ class PriorityQueue : public Messages { @@ -46,22 +49,36 @@ class PriorityQueue : public Messages bool acquire(const framing::SequenceNumber&, QueuedMessage&); bool find(const framing::SequenceNumber&, QueuedMessage&); bool browse(const framing::SequenceNumber&, QueuedMessage&, bool); - virtual bool consume(QueuedMessage&); + bool consume(QueuedMessage&); bool push(const QueuedMessage& added, QueuedMessage& removed); - + void updateAcquired(const QueuedMessage& acquired); void foreach(Functor); void removeIf(Predicate); + static uint getPriority(const QueuedMessage&); + protected: - typedef std::list<QueuedMessage*> Available; - typedef std::map<framing::SequenceNumber, QueuedMessage> Index; + typedef std::deque<QueuedMessage*> Deque; + typedef std::vector<Deque> PriorityLevels; + virtual bool findFrontLevel(uint& p, PriorityLevels&); const int levels; - Index messages; - Available available; - bool compare(const QueuedMessage* a, const QueuedMessage* b) const; - uint getPriorityLevel(const QueuedMessage& m) const; + private: + /** Available messages separated by priority and sorted in priority order. + * Holds pointers to the QueuedMessages in fifo + */ + PriorityLevels messages; + /** FIFO index of all messsagse (including acquired messages) for fast browsing and indexing */ + MessageDeque fifo; + uint frontLevel; + bool haveFront; + bool cached; + + void erase(const QueuedMessage&); + uint getPriorityLevel(const QueuedMessage&) const; + void clearCache(); + bool checkFront(); }; }} // namespace qpid::broker |