/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
#include "qpid/broker/PriorityQueue.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueCursor.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
#include <algorithm>
#include <cmath>
#include <boost/bind.hpp>

namespace qpid {
namespace broker {
namespace {
/**
 * Cursor context attached to a QueueCursor by PriorityQueue::next().
 * Holds one QueueCursor per priority level, each created with the
 * subscription type of the owning cursor.
 */
class PriorityContext : public CursorContext
{
  public:
    std::vector<QueueCursor> position;
    PriorityContext(size_t levels, SubscriptionType type) : position(levels, QueueCursor(type)) {}
};
}

/**
 * Construct a queue with l priority levels.
 *
 * One Deque and one sequence counter are created per level; the
 * per-level deques obtain padding entries from priorityPadding() and
 * the fifo index obtains them from fifoPadding().
 */
PriorityQueue::PriorityQueue(int l) :
    levels(l),
    messages(levels, Deque(boost::bind(&PriorityQueue::priorityPadding, this, _1))),
    counters(levels, framing::SequenceNumber()),
    fifo(boost::bind(&PriorityQueue::fifoPadding, this, _1)),
    frontLevel(0), haveFront(false), cached(false)
{
}

/**
 * Mark the message referenced by the cursor as deleted.
 *
 * @return true if the fifo index holds an entry for the cursor that
 * still references a message holder, false otherwise.
 */
bool PriorityQueue::deleted(const QueueCursor& c)
{
    MessagePointer* ptr = fifo.find(c);
    if (ptr && ptr->holder) {
        //mark the message as deleted
        ptr->holder->message.setState(DELETED);
        //clean the deque for the relevant priority level
        messages[ptr->holder->priority].clean();
        //stop referencing that message holder (it may now have been
        //deleted)
        ptr->holder = 0;
        //clean fifo index
        fifo.clean();
        return true;
    } else {
        return false;
    }
}

/** Size as reported by the fifo index. */
size_t PriorityQueue::size()
{
    return fifo.size();
}

/**
 * Advance the cursor and return the message it now refers to, or 0
 * when there is none.
 *
 * A PriorityContext is installed on the cursor if it does not already
 * carry one. The iteration order depends on the cursor type:
 *  - REPLICATOR: whatever order fifo.next() yields;
 *  - PURGE: level 0 first, then upwards to levels-1;
 *  - otherwise: level levels-1 first, then downwards to level 0.
 */
Message* PriorityQueue::next(QueueCursor& cursor)
{
    boost::shared_ptr<PriorityContext> ctxt = boost::dynamic_pointer_cast<PriorityContext>(cursor.context);
    if (!ctxt) {
        ctxt = boost::shared_ptr<PriorityContext>(new PriorityContext(levels, cursor.type));
        cursor.context = ctxt;
    }
    if (cursor.type == REPLICATOR) {
        //browse in fifo order
        MessagePointer* ptr = fifo.next(cursor);
        return ptr ? &(ptr->holder->message) : 0;
    } else if (cursor.type == PURGE) {
        //iterate over message in reverse priority order (i.e. purge lowest priority message first)
        //ignore any fairshare configuration here as well
        for (int p = 0; p < levels; ++p) {
            MessageHolder* holder = messages[p].next(ctxt->position[p]);
            if (holder) {
                cursor.setPosition(holder->message.getSequence(), 0);
                return &(holder->message);
            }
        }
        return 0;
    } else {
        //check each level in turn, in priority order, for any more messages
        Priority p = firstLevel();
        do {
            MessageHolder* holder = messages[p.current].next(ctxt->position[p.current]);
            if (holder) {
                cursor.setPosition(holder->message.getSequence(), 0);
                return &(holder->message);
            }
        } while (nextLevel(p));
        return 0;
    }
}

/** Look up the message at the cursor's position; see the overload below. */
Message* PriorityQueue::find(const QueueCursor& cursor)
{
    return find(cursor.position, 0);
}

/**
 * Look up a message by sequence number through the fifo index.
 *
 * @param position sequence number to search for.
 * @param cursor passed through unchanged to fifo.find(); may be 0.
 * @return the message, or 0 if fifo.find() returns no entry.
 */
Message* PriorityQueue::find(const framing::SequenceNumber& position, QueueCursor* cursor)
{
    MessagePointer* ptr = fifo.find(position, cursor);
    return ptr ? &(ptr->holder->message) : 0;
}

/**
 * Add a message to the queue.
 *
 * The message is copied into a holder that is published on the deque
 * for its priority level, with an id taken from that level's counter.
 * A pointer to the stored holder is then published on the fifo index
 * under the message's own sequence number.
 */
void PriorityQueue::publish(const Message& published)
{
    MessageHolder holder;
    holder.message = published;
    holder.priority = getPriorityLevel(published);
    holder.id = ++(counters[holder.priority]);
    MessagePointer pointer;
    pointer.holder = &(messages[holder.priority].publish(holder));
    pointer.id = published.getSequence();
    fifo.publish(pointer);
}

/**
 * Release the message referenced by the cursor via the fifo index.
 *
 * On success the cursors of the deque for the message's priority
 * level are reset.
 *
 * @return the released message, or 0 if fifo.release() returns no entry.
 */
Message* PriorityQueue::release(const QueueCursor& cursor)
{
    MessagePointer* ptr = fifo.release(cursor);
    if (ptr) {
        messages[ptr->holder->priority].resetCursors();
        return &(ptr->holder->message);
    } else {
        return 0;
    }
}

/** Apply f through the fifo index. */
void PriorityQueue::foreach(Functor f)
{
    fifo.foreach(f);
}

/**
 * Map a message's priority onto a level index in [0, levels-1].
 *
 * Priorities at or below the computed first level map to level 0;
 * higher priorities map to (priority - firstLevel), capped at
 * levels-1.
 */
uint PriorityQueue::getPriorityLevel(const Message& m) const
{
    uint priority = m.getPriority();
    //Use AMQP 0-10 approach to mapping priorities to a fixed level
    //(see rule priority-level-implementation)
    const uint firstLevel = 5 - uint(std::min(5.0, std::ceil(static_cast<double>(levels)/2.0)));
    if (priority <= firstLevel) return 0;
    return std::min(priority - firstLevel, static_cast<uint>(levels)-1);
}

/** Build a fifo padding entry: no holder, carrying only the given id. */
PriorityQueue::MessagePointer PriorityQueue::fifoPadding(qpid::framing::SequenceNumber id)
{
    PriorityQueue::MessagePointer pointer;
    pointer.holder = 0;
    pointer.id = id;
    return pointer;
}

/** Build a per-level padding entry: the given id with its message state set to DELETED. */
PriorityQueue::MessageHolder PriorityQueue::priorityPadding(qpid::framing::SequenceNumber id)
{
    PriorityQueue::MessageHolder holder;
    holder.id = id;
    holder.message.setState(DELETED);
    return holder;
}

/** Starting point for priority-ordered iteration: level levels-1. */
PriorityQueue::Priority PriorityQueue::firstLevel()
{
    return Priority(levels - 1);
}

/**
 * Step p down to the next lower level.
 *
 * @return true if p was moved, false if p was already at level 0.
 */
bool PriorityQueue::nextLevel(Priority& p)
{
    if (p.current > 0) {
        --(p.current);
        return true;
    } else {
        return false;
    }
}

/** Id of this holder (assigned from the per-level counter in publish()). */
framing::SequenceNumber PriorityQueue::MessageHolder::getSequence() const
{
    return id;
}

/** Set the state of the held message. */
void PriorityQueue::MessageHolder::setState(MessageState s)
{
    message.setState(s);
}

/** State of the held message. */
MessageState PriorityQueue::MessageHolder::getState() const
{
    return message.getState();
}

/** Access the held message. */
PriorityQueue::MessageHolder::operator Message&()
{
    return message;
}

/** Sequence number of the referenced message, or the stored id when there is no holder. */
framing::SequenceNumber PriorityQueue::MessagePointer::getSequence() const
{
    if (holder) {
        return holder->message.getSequence();
    } else {
        //this is used when the instance is merely acting as padding
        return id;
    }
}

/** Set the state of the referenced message; does nothing when there is no holder. */
void PriorityQueue::MessagePointer::setState(MessageState s)
{
    if (holder) {
        holder->message.setState(s);
    }
}

/** State of the referenced message, or DELETED when there is no holder. */
MessageState PriorityQueue::MessagePointer::getState() const
{
    if (holder) {
        return holder->message.getState();
    } else {
        return DELETED;
    }
}

/** Access the referenced message; asserts that a holder is present. */
PriorityQueue::MessagePointer::operator Message&()
{
    assert(holder);
    return holder->message;
}

}} // namespace qpid::broker