diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/PriorityQueue.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/PriorityQueue.cpp | 234 |
1 files changed, 234 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp new file mode 100644 index 0000000000..5e60fe5cce --- /dev/null +++ b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp @@ -0,0 +1,234 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownersip. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/PriorityQueue.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/QueueCursor.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/log/Statement.h" +#include <algorithm> +#include <cmath> +#include <boost/bind.hpp> + +namespace qpid { +namespace broker { +namespace { +class PriorityContext : public CursorContext { + public: + std::vector<QueueCursor> position; + PriorityContext(size_t levels, SubscriptionType type) : position(levels, QueueCursor(type)) {} +}; +} + + +PriorityQueue::PriorityQueue(int l) : + levels(l), + messages(levels, Deque(boost::bind(&PriorityQueue::priorityPadding, this, _1))), + counters(levels, framing::SequenceNumber()), + fifo(boost::bind(&PriorityQueue::fifoPadding, this, _1)), + frontLevel(0), haveFront(false), cached(false) +{ +} + +bool PriorityQueue::deleted(const QueueCursor& c) +{ + MessagePointer* ptr = fifo.find(c); + if (ptr && ptr->holder) { + //mark the message as deleted + ptr->holder->message.setState(DELETED); + //clean the deque for the relevant priority level + boost::shared_ptr<PriorityContext> ctxt = boost::dynamic_pointer_cast<PriorityContext>(c.context); + messages[ptr->holder->priority].clean(); + //stop referencing that message holder (it may now have been + //deleted) + ptr->holder = 0; + //clean fifo index + fifo.clean(); + return true; + } else { + return false; + } +} + +size_t PriorityQueue::size() +{ + return fifo.size(); +} + +Message* PriorityQueue::next(QueueCursor& cursor) +{ + boost::shared_ptr<PriorityContext> ctxt = boost::dynamic_pointer_cast<PriorityContext>(cursor.context); + if (!ctxt) { + ctxt = boost::shared_ptr<PriorityContext>(new PriorityContext(levels, CONSUMER)); + cursor.context = ctxt; + } + if (cursor.type == REPLICATOR) { + //browse in fifo order + MessagePointer* ptr = fifo.next(cursor); + return ptr ? &(ptr->holder->message) : 0; + } else if (cursor.type == PURGE) { + //iterate over message in reverse priority order (i.e. purge lowest priority message first) + //ignore any fairshare configuration here as well + for (int p = 0; p < levels; ++p) { + MessageHolder* holder = messages[p].next(ctxt->position[p]); + if (holder) { + cursor.setPosition(holder->message.getSequence(), 0); + return &(holder->message); + } + } + return 0; + } else { + //check each level in turn, in priority order, for any more messages + Priority p = firstLevel(); + do { + MessageHolder* holder = messages[p.current].next(ctxt->position[p.current]); + if (holder) { + cursor.setPosition(holder->message.getSequence(), 0); + return &(holder->message); + } + } while (nextLevel(p)); + return 0; + } +} + +Message* PriorityQueue::find(const QueueCursor& cursor) +{ + return find(cursor.position, 0); +} + +Message* PriorityQueue::find(const framing::SequenceNumber& position, QueueCursor* cursor) +{ + MessagePointer* ptr = fifo.find(position, cursor); + return ptr ? &(ptr->holder->message) : 0; +} + +void PriorityQueue::publish(const Message& published) +{ + MessageHolder holder; + holder.message = published; + holder.priority = getPriorityLevel(published); + holder.id = ++(counters[holder.priority]); + MessagePointer pointer; + pointer.holder = &(messages[holder.priority].publish(holder)); + pointer.id = published.getSequence(); + fifo.publish(pointer); +} + +Message* PriorityQueue::release(const QueueCursor& cursor) +{ + MessagePointer* ptr = fifo.release(cursor); + if (ptr) { + messages[ptr->holder->priority].resetCursors(); + return &(ptr->holder->message); + } else { + return 0; + } +} + +void PriorityQueue::foreach(Functor f) +{ + fifo.foreach(f); +} + +uint PriorityQueue::getPriorityLevel(const Message& m) const +{ + uint priority = m.getPriority(); + //Use AMQP 0-10 approach to mapping priorities to a fixed level + //(see rule priority-level-implementation) + const uint firstLevel = 5 - uint(std::min(5.0, std::ceil((double) levels/2.0))); + if (priority <= firstLevel) return 0; + return std::min(priority - firstLevel, (uint)levels-1); +} +PriorityQueue::MessagePointer PriorityQueue::fifoPadding(qpid::framing::SequenceNumber id) +{ + PriorityQueue::MessagePointer pointer; + pointer.holder = 0; + pointer.id = id; + return pointer; +} + +PriorityQueue::MessageHolder PriorityQueue::priorityPadding(qpid::framing::SequenceNumber id) +{ + PriorityQueue::MessageHolder holder; + holder.id = id; + holder.message.setState(DELETED); + return holder; +} + +PriorityQueue::Priority PriorityQueue::firstLevel() +{ + return Priority(levels - 1); +} +bool PriorityQueue::nextLevel(Priority& p) +{ + if (p.current > 0) { + --(p.current); + return true; + } else { + return false; + } +} + +framing::SequenceNumber PriorityQueue::MessageHolder::getSequence() const +{ + return id; +} +void PriorityQueue::MessageHolder::setState(MessageState s) +{ + message.setState(s); +} +MessageState PriorityQueue::MessageHolder::getState() const +{ + return message.getState(); +} +PriorityQueue::MessageHolder::operator Message&() +{ + return message; +} +framing::SequenceNumber PriorityQueue::MessagePointer::getSequence() const +{ + if (holder) { + return holder->message.getSequence(); + } else { + //this is used when the instance is merely acting as padding + return id; + } +} +void PriorityQueue::MessagePointer::setState(MessageState s) +{ + if (holder) { + holder->message.setState(s); + } +} +MessageState PriorityQueue::MessagePointer::getState() const +{ + if (holder) { + return holder->message.getState(); + } else { + return DELETED; + } +} +PriorityQueue::MessagePointer::operator Message&() +{ + assert(holder); + return holder->message; +} +}} // namespace qpid::broker |