diff options
Diffstat (limited to 'cpp/src/qpid/broker/PriorityQueue.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/PriorityQueue.cpp | 177 |
1 files changed, 96 insertions, 81 deletions
diff --git a/cpp/src/qpid/broker/PriorityQueue.cpp b/cpp/src/qpid/broker/PriorityQueue.cpp index da52675a29..ab5ec7235a 100644 --- a/cpp/src/qpid/broker/PriorityQueue.cpp +++ b/cpp/src/qpid/broker/PriorityQueue.cpp @@ -3,13 +3,13 @@ * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownersip. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,133 +22,124 @@ #include "qpid/broker/Queue.h" #include "qpid/broker/QueuedMessage.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/log/Statement.h" #include <cmath> namespace qpid { namespace broker { -PriorityQueue::PriorityQueue(int l) : - levels(l) {} +PriorityQueue::PriorityQueue(int l) : + levels(l), + messages(levels, Deque()), + frontLevel(0), haveFront(false), cached(false) {} -bool PriorityQueue::deleted(const QueuedMessage& message) -{ - Index::iterator i = messages.find(message.position); - if (i != messages.end()) { - //remove from available list if necessary - if (i->second.status == QueuedMessage::AVAILABLE) { - Available::iterator j = std::find(available.begin(), available.end(), &i->second); - if (j != available.end()) available.erase(j); - } - //remove from messages map - messages.erase(i); - return true; - } else { - return false; - } +bool PriorityQueue::deleted(const QueuedMessage& qm) { + bool deleted = fifo.deleted(qm); + if (deleted) erase(qm); + return deleted; } size_t PriorityQueue::size() { - return available.size(); + return fifo.size(); +} + +namespace { +bool before(QueuedMessage* a, QueuedMessage* b) { return *a < *b; } } void PriorityQueue::release(const QueuedMessage& message) { - Index::iterator i = messages.find(message.position); - if (i != messages.end() && i->second.status == QueuedMessage::ACQUIRED) { - i->second.status = QueuedMessage::AVAILABLE; - //insert message back into the correct place in available queue, based on priority: - Available::iterator j = upper_bound(available.begin(), available.end(), &i->second, boost::bind(&PriorityQueue::compare, this, _1, _2)); - available.insert(j, &i->second); + QueuedMessage* qm = fifo.releasePtr(message); + if (qm) { + uint p = getPriorityLevel(message); + messages[p].insert( + lower_bound(messages[p].begin(), messages[p].end(), qm, before), qm); + clearCache(); + } +} + + +void PriorityQueue::erase(const QueuedMessage& qm) { + size_t i = getPriorityLevel(qm); + if (!messages[i].empty()) { + long diff = qm.position.getValue() - messages[i].front()->position.getValue(); + if (diff < 0) return; + long maxEnd = std::min(size_t(diff), messages[i].size()); + QueuedMessage mutableQm = qm; // need non-const qm for lower_bound + Deque::iterator l = + lower_bound(messages[i].begin(),messages[i].begin()+maxEnd, &mutableQm, before); + if (l != messages[i].end() && (*l)->position == qm.position) { + messages[i].erase(l); + clearCache(); + return; + } } } bool PriorityQueue::acquire(const framing::SequenceNumber& position, QueuedMessage& message) { - Index::iterator i = messages.find(position); - if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) { - i->second.status = QueuedMessage::ACQUIRED; - message = i->second; - //remove it from available list (could make this faster by using ordering): - Available::iterator j = std::find(available.begin(), available.end(), &i->second); - assert(j != available.end()); - available.erase(j); - return true; - } else { - return false; - } + bool acquired = fifo.acquire(position, message); + if (acquired) erase(message); // No longer available + return acquired; } bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message) { - Index::iterator i = messages.find(position); - if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) { - message = i->second; - return true; - } else { - return false; - } + return fifo.find(position, message); } -bool PriorityQueue::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired) +bool PriorityQueue::browse( + const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired) { - Index::iterator i = messages.lower_bound(position+1); - if (i != messages.end() && (i->second.status == QueuedMessage::AVAILABLE || (!unacquired && i->second.status == QueuedMessage::ACQUIRED))) { - message = i->second; - return true; - } else { - return false; - } + return fifo.browse(position, message, unacquired); } bool PriorityQueue::consume(QueuedMessage& message) { - if (!available.empty()) { - QueuedMessage* next = available.front(); - messages[next->position].status = QueuedMessage::ACQUIRED; - message = *next; - available.pop_front(); + if (checkFront()) { + QueuedMessage* pm = messages[frontLevel].front(); + messages[frontLevel].pop_front(); + clearCache(); + pm->status = QueuedMessage::ACQUIRED; // Updates FIFO index + message = *pm; return true; } else { return false; } } -bool PriorityQueue::compare(const QueuedMessage* a, const QueuedMessage* b) const +bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) { - int priorityA = getPriorityLevel(*a); - int priorityB = getPriorityLevel(*b); - if (priorityA == priorityB) return a->position < b->position; - else return priorityA > priorityB; + QueuedMessage* qmp = fifo.pushPtr(added); + messages[getPriorityLevel(added)].push_back(qmp); + clearCache(); + return false; // Adding a message never causes one to be removed for deque } -bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) -{ - Index::iterator i = messages.insert(Index::value_type(added.position, added)).first; - i->second.status = QueuedMessage::AVAILABLE; - //insert message into the correct place in available queue, based on priority: - Available::iterator j = upper_bound(available.begin(), available.end(), &i->second, boost::bind(&PriorityQueue::compare, this, _1, _2)); - available.insert(j, &i->second); - return false;//adding a message never causes one to be removed for deque +void PriorityQueue::updateAcquired(const QueuedMessage& acquired) { + fifo.updateAcquired(acquired); } void PriorityQueue::foreach(Functor f) { - for (Available::iterator i = available.begin(); i != available.end(); ++i) { - f(**i); - } + fifo.foreach(f); } void PriorityQueue::removeIf(Predicate p) { - for (Available::iterator i = available.begin(); i != available.end();) { - if (p(**i)) { - messages[(*i)->position].status = QueuedMessage::REMOVED; - i = available.erase(i); - } else { - ++i; + for (int priority = 0; priority < levels; ++priority) { + for (Deque::iterator i = messages[priority].begin(); i != messages[priority].end();) { + if (p(**i)) { + (*i)->status = QueuedMessage::DELETED; // Updates fifo index + i = messages[priority].erase(i); + clearCache(); + } else { + ++i; + } } } + fifo.clean(); } uint PriorityQueue::getPriorityLevel(const QueuedMessage& m) const @@ -161,6 +152,30 @@ uint PriorityQueue::getPriorityLevel(const QueuedMessage& m) const return std::min(priority - firstLevel, (uint)levels-1); } +void PriorityQueue::clearCache() +{ + cached = false; +} + +bool PriorityQueue::findFrontLevel(uint& l, PriorityLevels& m) +{ + for (int p = levels-1; p >= 0; --p) { + if (!m[p].empty()) { + l = p; + return true; + } + } + return false; +} + +bool PriorityQueue::checkFront() +{ + if (!cached) { + haveFront = findFrontLevel(frontLevel, messages); + cached = true; + } + return haveFront; +} uint PriorityQueue::getPriority(const QueuedMessage& message) { |
