summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/PriorityQueue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/PriorityQueue.cpp')
-rw-r--r--cpp/src/qpid/broker/PriorityQueue.cpp256
1 files changed, 147 insertions, 109 deletions
diff --git a/cpp/src/qpid/broker/PriorityQueue.cpp b/cpp/src/qpid/broker/PriorityQueue.cpp
index 9a0fead744..99488ded13 100644
--- a/cpp/src/qpid/broker/PriorityQueue.cpp
+++ b/cpp/src/qpid/broker/PriorityQueue.cpp
@@ -19,24 +19,53 @@
*
*/
#include "qpid/broker/PriorityQueue.h"
+#include "qpid/broker/Message.h"
#include "qpid/broker/Queue.h"
-#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/QueueCursor.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
+#include <algorithm>
#include <cmath>
+#include <boost/bind.hpp>
namespace qpid {
namespace broker {
+namespace {
+class PriorityContext : public CursorContext {
+ public:
+ std::vector<QueueCursor> position;
+ PriorityContext(size_t levels, SubscriptionType type) : position(levels, QueueCursor(type)) {}
+};
+}
+
PriorityQueue::PriorityQueue(int l) :
levels(l),
- messages(levels, Deque()),
- frontLevel(0), haveFront(false), cached(false) {}
+ messages(levels, Deque(boost::bind(&PriorityQueue::priorityPadding, this, _1))),
+ counters(levels, framing::SequenceNumber()),
+ fifo(boost::bind(&PriorityQueue::fifoPadding, this, _1)),
+ frontLevel(0), haveFront(false), cached(false)
+{
+}
-bool PriorityQueue::deleted(const QueuedMessage& qm) {
- bool deleted = fifo.deleted(qm);
- if (deleted) erase(qm);
- return deleted;
+bool PriorityQueue::deleted(const QueueCursor& c)
+{
+ MessagePointer* ptr = fifo.find(c);
+ if (ptr && ptr->holder) {
+ //mark the message as deleted
+ ptr->holder->message.setState(DELETED);
+ //clean the deque for the relevant priority level
+ boost::shared_ptr<PriorityContext> ctxt = boost::dynamic_pointer_cast<PriorityContext>(c.context);
+ messages[ptr->holder->priority].clean();
+ //stop referencing that message holder (it may now have been
+ //deleted)
+ ptr->holder = 0;
+ //clean fifo index
+ fifo.clean();
+ return true;
+ } else {
+ return false;
+ }
}
size_t PriorityQueue::size()
@@ -44,85 +73,69 @@ size_t PriorityQueue::size()
return fifo.size();
}
-namespace {
-bool before(QueuedMessage* a, QueuedMessage* b) { return *a < *b; }
-}
-
-void PriorityQueue::release(const QueuedMessage& message)
-{
- QueuedMessage* qm = fifo.releasePtr(message);
- if (qm) {
- uint p = getPriorityLevel(message);
- messages[p].insert(
- lower_bound(messages[p].begin(), messages[p].end(), qm, before), qm);
- clearCache();
+Message* PriorityQueue::next(QueueCursor& cursor)
+{
+ boost::shared_ptr<PriorityContext> ctxt = boost::dynamic_pointer_cast<PriorityContext>(cursor.context);
+ if (!ctxt) {
+ ctxt = boost::shared_ptr<PriorityContext>(new PriorityContext(levels, CONSUMER));
+ cursor.context = ctxt;
}
-}
-
-
-void PriorityQueue::erase(const QueuedMessage& qm) {
- size_t i = getPriorityLevel(qm);
- if (!messages[i].empty()) {
- long diff = qm.position.getValue() - messages[i].front()->position.getValue();
- if (diff < 0) return;
- long maxEnd = std::min(size_t(diff), messages[i].size());
- QueuedMessage mutableQm = qm; // need non-const qm for lower_bound
- Deque::iterator l =
- lower_bound(messages[i].begin(),messages[i].begin()+maxEnd, &mutableQm, before);
- if (l != messages[i].end() && (*l)->position == qm.position) {
- messages[i].erase(l);
- clearCache();
- return;
+ if (cursor.type == REPLICATOR) {
+ //browse in fifo order
+ MessagePointer* ptr = fifo.next(cursor);
+ return ptr ? &(ptr->holder->message) : 0;
+ } else if (cursor.type == PURGE) {
+ //iterate over message in reverse priority order (i.e. purge lowest priority message first)
+ //ignore any fairshare configuration here as well
+ for (int p = 0; p < levels; ++p) {
+ MessageHolder* holder = messages[p].next(ctxt->position[p]);
+ if (holder) {
+ cursor.setPosition(holder->message.getSequence(), 0);
+ return &(holder->message);
+ }
}
+ return 0;
+ } else {
+ //check each level in turn, in priority order, for any more messages
+ Priority p = firstLevel();
+ do {
+ MessageHolder* holder = messages[p.current].next(ctxt->position[p.current]);
+ if (holder) {
+ cursor.setPosition(holder->message.getSequence(), 0);
+ return &(holder->message);
+ }
+ } while (nextLevel(p));
+ return 0;
}
}
-bool PriorityQueue::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
+Message* PriorityQueue::find(const QueueCursor& cursor)
{
- bool acquired = fifo.acquire(position, message);
- if (acquired) erase(message); // No longer available
- return acquired;
+ return find(cursor.position, 0);
}
-bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message)
+Message* PriorityQueue::find(const framing::SequenceNumber& position, QueueCursor* cursor)
{
- return fifo.find(position, message);
+ MessagePointer* ptr = fifo.find(position, cursor);
+ return ptr ? &(ptr->holder->message) : 0;
}
-bool PriorityQueue::browse(
- const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
+void PriorityQueue::publish(const Message& published)
{
- return fifo.browse(position, message, unacquired);
+ MessageHolder holder;
+ holder.message = published;
+ holder.priority = getPriorityLevel(published);
+ holder.id = ++(counters[holder.priority]);
+ MessagePointer pointer;
+ pointer.holder = &(messages[holder.priority].publish(holder));
+ pointer.id = published.getSequence();
+ fifo.publish(pointer);
}
-bool PriorityQueue::consume(QueuedMessage& message)
-{
- if (checkFront()) {
- QueuedMessage* pm = messages[frontLevel].front();
- messages[frontLevel].pop_front();
- clearCache();
- pm->status = QueuedMessage::ACQUIRED; // Updates FIFO index
- message = *pm;
- return true;
- } else {
- return false;
- }
-}
-
-bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/)
+Message* PriorityQueue::release(const QueueCursor& cursor)
{
- QueuedMessage* qmp = fifo.pushPtr(added);
- messages[getPriorityLevel(added)].push_back(qmp);
- clearCache();
- return false; // Adding a message never causes one to be removed for deque
-}
-
-void PriorityQueue::updateAcquired(const QueuedMessage& acquired) {
- fifo.updateAcquired(acquired);
-}
-
-void PriorityQueue::setPosition(const framing::SequenceNumber& n) {
- fifo.setPosition(n);
+ MessagePointer* ptr = fifo.release(cursor);
+ return ptr ? &(ptr->holder->message) : 0;
}
void PriorityQueue::foreach(Functor f)
@@ -130,62 +143,87 @@ void PriorityQueue::foreach(Functor f)
fifo.foreach(f);
}
-void PriorityQueue::removeIf(Predicate p)
-{
- for (int priority = 0; priority < levels; ++priority) {
- for (Deque::iterator i = messages[priority].begin(); i != messages[priority].end();) {
- if (p(**i)) {
- (*i)->status = QueuedMessage::DELETED; // Updates fifo index
- i = messages[priority].erase(i);
- clearCache();
- } else {
- ++i;
- }
- }
- }
- fifo.clean();
-}
-
-uint PriorityQueue::getPriorityLevel(const QueuedMessage& m) const
+uint PriorityQueue::getPriorityLevel(const Message& m) const
{
- uint priority = m.payload->getPriority();
+ uint priority = m.getPriority();
//Use AMQP 0-10 approach to mapping priorities to a fixed level
//(see rule priority-level-implementation)
const uint firstLevel = 5 - uint(std::min(5.0, std::ceil((double) levels/2.0)));
if (priority <= firstLevel) return 0;
return std::min(priority - firstLevel, (uint)levels-1);
}
+PriorityQueue::MessagePointer PriorityQueue::fifoPadding(qpid::framing::SequenceNumber id)
+{
+ PriorityQueue::MessagePointer pointer;
+ pointer.holder = 0;
+ pointer.id = id;
+ return pointer;
+}
-void PriorityQueue::clearCache()
+PriorityQueue::MessageHolder PriorityQueue::priorityPadding(qpid::framing::SequenceNumber id)
{
- cached = false;
+ PriorityQueue::MessageHolder holder;
+ holder.id = id;
+ holder.message.setState(DELETED);
+ return holder;
}
-bool PriorityQueue::findFrontLevel(uint& l, PriorityLevels& m)
+PriorityQueue::Priority PriorityQueue::firstLevel()
{
- for (int p = levels-1; p >= 0; --p) {
- if (!m[p].empty()) {
- l = p;
- return true;
- }
+ return Priority(levels - 1);
+}
+bool PriorityQueue::nextLevel(Priority& p)
+{
+ if (p.current > 0) {
+ --(p.current);
+ return true;
+ } else {
+ return false;
}
- return false;
}
-bool PriorityQueue::checkFront()
+framing::SequenceNumber PriorityQueue::MessageHolder::getSequence() const
+{
+ return id;
+}
+void PriorityQueue::MessageHolder::setState(MessageState s)
+{
+ message.setState(s);
+}
+MessageState PriorityQueue::MessageHolder::getState() const
+{
+ return message.getState();
+}
+PriorityQueue::MessageHolder::operator Message&()
+{
+ return message;
+}
+framing::SequenceNumber PriorityQueue::MessagePointer::getSequence() const
{
- if (!cached) {
- haveFront = findFrontLevel(frontLevel, messages);
- cached = true;
+ if (holder) {
+ return holder->message.getSequence();
+ } else {
+ //this is used when the instance is merely acting as padding
+ return id;
}
- return haveFront;
}
-
-uint PriorityQueue::getPriority(const QueuedMessage& message)
+void PriorityQueue::MessagePointer::setState(MessageState s)
{
- const PriorityQueue* queue = dynamic_cast<const PriorityQueue*>(&(message.queue->getMessages()));
- if (queue) return queue->getPriorityLevel(message);
- else return 0;
+ if (holder) {
+ holder->message.setState(s);
+ }
+}
+MessageState PriorityQueue::MessagePointer::getState() const
+{
+ if (holder) {
+ return holder->message.getState();
+ } else {
+ return DELETED;
+ }
+}
+PriorityQueue::MessagePointer::operator Message&()
+{
+ assert(holder);
+ return holder->message;
}
-
}} // namespace qpid::broker