/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "qpid/broker/PriorityQueue.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueuedMessage.h" #include "qpid/framing/reply_exceptions.h" #include namespace qpid { namespace broker { PriorityQueue::PriorityQueue(int l) : levels(l), messages(levels, Deque()), frontLevel(0), haveFront(false), cached(false) {} bool PriorityQueue::deleted(const QueuedMessage&) { return true; } size_t PriorityQueue::size() { size_t total(0); for (int i = 0; i < levels; ++i) { total += messages[i].size(); } return total; } void PriorityQueue::release(const QueuedMessage& message) { uint p = getPriorityLevel(message); messages[p].insert(lower_bound(messages[p].begin(), messages[p].end(), message), message); clearCache(); } bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message, bool remove) { QueuedMessage comp; comp.position = position; for (int i = 0; i < levels; ++i) { if (!messages[i].empty()) { unsigned long diff = position.getValue() - messages[i].front().position.getValue(); long maxEnd = diff < messages[i].size() ? diff : messages[i].size(); Deque::iterator l = lower_bound(messages[i].begin(),messages[i].begin()+maxEnd,comp); if (l != messages[i].end() && l->position == position) { message = *l; if (remove) { messages[i].erase(l); clearCache(); } return true; } } } return false; } bool PriorityQueue::acquire(const framing::SequenceNumber& position, QueuedMessage& message) { return find(position, message, true); } bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message) { return find(position, message, false); } bool PriorityQueue::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool) { QueuedMessage match; match.position = position+1; Deque::iterator lowest; bool found = false; for (int i = 0; i < levels; ++i) { Deque::iterator m = lower_bound(messages[i].begin(), messages[i].end(), match); if (m != messages[i].end()) { if (m->position == match.position) { message = *m; return true; } else if (!found || m->position < lowest->position) { lowest = m; found = true; } } } if (found) { message = *lowest; } return found; } bool PriorityQueue::consume(QueuedMessage& message) { if (checkFront()) { message = messages[frontLevel].front(); messages[frontLevel].pop_front(); clearCache(); return true; } else { return false; } } bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) { messages[getPriorityLevel(added)].push_back(added); clearCache(); return false;//adding a message never causes one to be removed for deque } void PriorityQueue::foreach(Functor f) { for (int i = 0; i < levels; ++i) { std::for_each(messages[i].begin(), messages[i].end(), f); } } void PriorityQueue::removeIf(Predicate p) { for (int priority = 0; priority < levels; ++priority) { for (Deque::iterator i = messages[priority].begin(); i != messages[priority].end();) { if (p(*i)) { i = messages[priority].erase(i); clearCache(); } else { ++i; } } } } uint PriorityQueue::getPriorityLevel(const QueuedMessage& m) const { uint priority = m.payload->getPriority(); //Use AMQP 0-10 approach to mapping priorities to a fixed level //(see rule priority-level-implementation) const uint firstLevel = 5 - uint(std::min(5.0, std::ceil((double) levels/2.0))); if (priority <= firstLevel) return 0; return std::min(priority - firstLevel, (uint)levels-1); } void PriorityQueue::clearCache() { cached = false; } bool PriorityQueue::findFrontLevel(uint& l, PriorityLevels& m) { for (int p = levels-1; p >= 0; --p) { if (!m[p].empty()) { l = p; return true; } } return false; } bool PriorityQueue::checkFront() { if (!cached) { haveFront = findFrontLevel(frontLevel, messages); cached = true; } return haveFront; } uint PriorityQueue::getPriority(const QueuedMessage& message) { const PriorityQueue* queue = dynamic_cast(&(message.queue->getMessages())); if (queue) return queue->getPriorityLevel(message); else return 0; } }} // namespace qpid::broker