summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/PriorityQueue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/PriorityQueue.cpp')
-rw-r--r--cpp/src/qpid/broker/PriorityQueue.cpp177
1 files changed, 96 insertions, 81 deletions
diff --git a/cpp/src/qpid/broker/PriorityQueue.cpp b/cpp/src/qpid/broker/PriorityQueue.cpp
index da52675a29..ab5ec7235a 100644
--- a/cpp/src/qpid/broker/PriorityQueue.cpp
+++ b/cpp/src/qpid/broker/PriorityQueue.cpp
@@ -3,13 +3,13 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
+ * regarding copyright ownersip. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,133 +22,124 @@
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueuedMessage.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
#include <cmath>
namespace qpid {
namespace broker {
-PriorityQueue::PriorityQueue(int l) :
- levels(l) {}
+PriorityQueue::PriorityQueue(int l) :
+ levels(l),
+ messages(levels, Deque()),
+ frontLevel(0), haveFront(false), cached(false) {}
-bool PriorityQueue::deleted(const QueuedMessage& message)
-{
- Index::iterator i = messages.find(message.position);
- if (i != messages.end()) {
- //remove from available list if necessary
- if (i->second.status == QueuedMessage::AVAILABLE) {
- Available::iterator j = std::find(available.begin(), available.end(), &i->second);
- if (j != available.end()) available.erase(j);
- }
- //remove from messages map
- messages.erase(i);
- return true;
- } else {
- return false;
- }
+bool PriorityQueue::deleted(const QueuedMessage& qm) {
+ bool deleted = fifo.deleted(qm);
+ if (deleted) erase(qm);
+ return deleted;
}
size_t PriorityQueue::size()
{
- return available.size();
+ return fifo.size();
+}
+
+namespace {
+bool before(QueuedMessage* a, QueuedMessage* b) { return *a < *b; }
}
void PriorityQueue::release(const QueuedMessage& message)
{
- Index::iterator i = messages.find(message.position);
- if (i != messages.end() && i->second.status == QueuedMessage::ACQUIRED) {
- i->second.status = QueuedMessage::AVAILABLE;
- //insert message back into the correct place in available queue, based on priority:
- Available::iterator j = upper_bound(available.begin(), available.end(), &i->second, boost::bind(&PriorityQueue::compare, this, _1, _2));
- available.insert(j, &i->second);
+ QueuedMessage* qm = fifo.releasePtr(message);
+ if (qm) {
+ uint p = getPriorityLevel(message);
+ messages[p].insert(
+ lower_bound(messages[p].begin(), messages[p].end(), qm, before), qm);
+ clearCache();
+ }
+}
+
+
+void PriorityQueue::erase(const QueuedMessage& qm) {
+ size_t i = getPriorityLevel(qm);
+ if (!messages[i].empty()) {
+ long diff = qm.position.getValue() - messages[i].front()->position.getValue();
+ if (diff < 0) return;
+ long maxEnd = std::min(size_t(diff), messages[i].size());
+ QueuedMessage mutableQm = qm; // need non-const qm for lower_bound
+ Deque::iterator l =
+ lower_bound(messages[i].begin(),messages[i].begin()+maxEnd, &mutableQm, before);
+ if (l != messages[i].end() && (*l)->position == qm.position) {
+ messages[i].erase(l);
+ clearCache();
+ return;
+ }
}
}
bool PriorityQueue::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
{
- Index::iterator i = messages.find(position);
- if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
- i->second.status = QueuedMessage::ACQUIRED;
- message = i->second;
- //remove it from available list (could make this faster by using ordering):
- Available::iterator j = std::find(available.begin(), available.end(), &i->second);
- assert(j != available.end());
- available.erase(j);
- return true;
- } else {
- return false;
- }
+ bool acquired = fifo.acquire(position, message);
+ if (acquired) erase(message); // No longer available
+ return acquired;
}
bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message)
{
- Index::iterator i = messages.find(position);
- if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
- message = i->second;
- return true;
- } else {
- return false;
- }
+ return fifo.find(position, message);
}
-bool PriorityQueue::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
+bool PriorityQueue::browse(
+ const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
{
- Index::iterator i = messages.lower_bound(position+1);
- if (i != messages.end() && (i->second.status == QueuedMessage::AVAILABLE || (!unacquired && i->second.status == QueuedMessage::ACQUIRED))) {
- message = i->second;
- return true;
- } else {
- return false;
- }
+ return fifo.browse(position, message, unacquired);
}
bool PriorityQueue::consume(QueuedMessage& message)
{
- if (!available.empty()) {
- QueuedMessage* next = available.front();
- messages[next->position].status = QueuedMessage::ACQUIRED;
- message = *next;
- available.pop_front();
+ if (checkFront()) {
+ QueuedMessage* pm = messages[frontLevel].front();
+ messages[frontLevel].pop_front();
+ clearCache();
+ pm->status = QueuedMessage::ACQUIRED; // Updates FIFO index
+ message = *pm;
return true;
} else {
return false;
}
}
-bool PriorityQueue::compare(const QueuedMessage* a, const QueuedMessage* b) const
+bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/)
{
- int priorityA = getPriorityLevel(*a);
- int priorityB = getPriorityLevel(*b);
- if (priorityA == priorityB) return a->position < b->position;
- else return priorityA > priorityB;
+ QueuedMessage* qmp = fifo.pushPtr(added);
+ messages[getPriorityLevel(added)].push_back(qmp);
+ clearCache();
+ return false; // Adding a message never causes one to be removed for deque
}
-bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/)
-{
- Index::iterator i = messages.insert(Index::value_type(added.position, added)).first;
- i->second.status = QueuedMessage::AVAILABLE;
- //insert message into the correct place in available queue, based on priority:
- Available::iterator j = upper_bound(available.begin(), available.end(), &i->second, boost::bind(&PriorityQueue::compare, this, _1, _2));
- available.insert(j, &i->second);
- return false;//adding a message never causes one to be removed for deque
+void PriorityQueue::updateAcquired(const QueuedMessage& acquired) {
+ fifo.updateAcquired(acquired);
}
void PriorityQueue::foreach(Functor f)
{
- for (Available::iterator i = available.begin(); i != available.end(); ++i) {
- f(**i);
- }
+ fifo.foreach(f);
}
void PriorityQueue::removeIf(Predicate p)
{
- for (Available::iterator i = available.begin(); i != available.end();) {
- if (p(**i)) {
- messages[(*i)->position].status = QueuedMessage::REMOVED;
- i = available.erase(i);
- } else {
- ++i;
+ for (int priority = 0; priority < levels; ++priority) {
+ for (Deque::iterator i = messages[priority].begin(); i != messages[priority].end();) {
+ if (p(**i)) {
+ (*i)->status = QueuedMessage::DELETED; // Updates fifo index
+ i = messages[priority].erase(i);
+ clearCache();
+ } else {
+ ++i;
+ }
}
}
+ fifo.clean();
}
uint PriorityQueue::getPriorityLevel(const QueuedMessage& m) const
@@ -161,6 +152,30 @@ uint PriorityQueue::getPriorityLevel(const QueuedMessage& m) const
return std::min(priority - firstLevel, (uint)levels-1);
}
+void PriorityQueue::clearCache()
+{
+ cached = false;
+}
+
+bool PriorityQueue::findFrontLevel(uint& l, PriorityLevels& m)
+{
+ for (int p = levels-1; p >= 0; --p) {
+ if (!m[p].empty()) {
+ l = p;
+ return true;
+ }
+ }
+ return false;
+}
+
+bool PriorityQueue::checkFront()
+{
+ if (!cached) {
+ haveFront = findFrontLevel(frontLevel, messages);
+ cached = true;
+ }
+ return haveFront;
+}
uint PriorityQueue::getPriority(const QueuedMessage& message)
{