summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/PriorityQueue.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/PriorityQueue.cpp234
1 files changed, 234 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
new file mode 100644
index 0000000000..5e60fe5cce
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
@@ -0,0 +1,234 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownersip. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/PriorityQueue.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueCursor.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
+#include <algorithm>
+#include <cmath>
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace broker {
+namespace {
+class PriorityContext : public CursorContext {
+ public:
+ std::vector<QueueCursor> position;
+ PriorityContext(size_t levels, SubscriptionType type) : position(levels, QueueCursor(type)) {}
+};
+}
+
+
+PriorityQueue::PriorityQueue(int l) :
+ levels(l),
+ messages(levels, Deque(boost::bind(&PriorityQueue::priorityPadding, this, _1))),
+ counters(levels, framing::SequenceNumber()),
+ fifo(boost::bind(&PriorityQueue::fifoPadding, this, _1)),
+ frontLevel(0), haveFront(false), cached(false)
+{
+}
+
+bool PriorityQueue::deleted(const QueueCursor& c)
+{
+ MessagePointer* ptr = fifo.find(c);
+ if (ptr && ptr->holder) {
+ //mark the message as deleted
+ ptr->holder->message.setState(DELETED);
+ //clean the deque for the relevant priority level
+ boost::shared_ptr<PriorityContext> ctxt = boost::dynamic_pointer_cast<PriorityContext>(c.context);
+ messages[ptr->holder->priority].clean();
+ //stop referencing that message holder (it may now have been
+ //deleted)
+ ptr->holder = 0;
+ //clean fifo index
+ fifo.clean();
+ return true;
+ } else {
+ return false;
+ }
+}
+
+size_t PriorityQueue::size()
+{
+ return fifo.size();
+}
+
+Message* PriorityQueue::next(QueueCursor& cursor)
+{
+ boost::shared_ptr<PriorityContext> ctxt = boost::dynamic_pointer_cast<PriorityContext>(cursor.context);
+ if (!ctxt) {
+ ctxt = boost::shared_ptr<PriorityContext>(new PriorityContext(levels, CONSUMER));
+ cursor.context = ctxt;
+ }
+ if (cursor.type == REPLICATOR) {
+ //browse in fifo order
+ MessagePointer* ptr = fifo.next(cursor);
+ return ptr ? &(ptr->holder->message) : 0;
+ } else if (cursor.type == PURGE) {
+ //iterate over message in reverse priority order (i.e. purge lowest priority message first)
+ //ignore any fairshare configuration here as well
+ for (int p = 0; p < levels; ++p) {
+ MessageHolder* holder = messages[p].next(ctxt->position[p]);
+ if (holder) {
+ cursor.setPosition(holder->message.getSequence(), 0);
+ return &(holder->message);
+ }
+ }
+ return 0;
+ } else {
+ //check each level in turn, in priority order, for any more messages
+ Priority p = firstLevel();
+ do {
+ MessageHolder* holder = messages[p.current].next(ctxt->position[p.current]);
+ if (holder) {
+ cursor.setPosition(holder->message.getSequence(), 0);
+ return &(holder->message);
+ }
+ } while (nextLevel(p));
+ return 0;
+ }
+}
+
+Message* PriorityQueue::find(const QueueCursor& cursor)
+{
+ return find(cursor.position, 0);
+}
+
+Message* PriorityQueue::find(const framing::SequenceNumber& position, QueueCursor* cursor)
+{
+ MessagePointer* ptr = fifo.find(position, cursor);
+ return ptr ? &(ptr->holder->message) : 0;
+}
+
+void PriorityQueue::publish(const Message& published)
+{
+ MessageHolder holder;
+ holder.message = published;
+ holder.priority = getPriorityLevel(published);
+ holder.id = ++(counters[holder.priority]);
+ MessagePointer pointer;
+ pointer.holder = &(messages[holder.priority].publish(holder));
+ pointer.id = published.getSequence();
+ fifo.publish(pointer);
+}
+
+Message* PriorityQueue::release(const QueueCursor& cursor)
+{
+ MessagePointer* ptr = fifo.release(cursor);
+ if (ptr) {
+ messages[ptr->holder->priority].resetCursors();
+ return &(ptr->holder->message);
+ } else {
+ return 0;
+ }
+}
+
+void PriorityQueue::foreach(Functor f)
+{
+ fifo.foreach(f);
+}
+
+uint PriorityQueue::getPriorityLevel(const Message& m) const
+{
+ uint priority = m.getPriority();
+ //Use AMQP 0-10 approach to mapping priorities to a fixed level
+ //(see rule priority-level-implementation)
+ const uint firstLevel = 5 - uint(std::min(5.0, std::ceil((double) levels/2.0)));
+ if (priority <= firstLevel) return 0;
+ return std::min(priority - firstLevel, (uint)levels-1);
+}
+PriorityQueue::MessagePointer PriorityQueue::fifoPadding(qpid::framing::SequenceNumber id)
+{
+ PriorityQueue::MessagePointer pointer;
+ pointer.holder = 0;
+ pointer.id = id;
+ return pointer;
+}
+
+PriorityQueue::MessageHolder PriorityQueue::priorityPadding(qpid::framing::SequenceNumber id)
+{
+ PriorityQueue::MessageHolder holder;
+ holder.id = id;
+ holder.message.setState(DELETED);
+ return holder;
+}
+
+PriorityQueue::Priority PriorityQueue::firstLevel()
+{
+ return Priority(levels - 1);
+}
+bool PriorityQueue::nextLevel(Priority& p)
+{
+ if (p.current > 0) {
+ --(p.current);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+framing::SequenceNumber PriorityQueue::MessageHolder::getSequence() const
+{
+ return id;
+}
+void PriorityQueue::MessageHolder::setState(MessageState s)
+{
+ message.setState(s);
+}
+MessageState PriorityQueue::MessageHolder::getState() const
+{
+ return message.getState();
+}
+PriorityQueue::MessageHolder::operator Message&()
+{
+ return message;
+}
+framing::SequenceNumber PriorityQueue::MessagePointer::getSequence() const
+{
+ if (holder) {
+ return holder->message.getSequence();
+ } else {
+ //this is used when the instance is merely acting as padding
+ return id;
+ }
+}
+void PriorityQueue::MessagePointer::setState(MessageState s)
+{
+ if (holder) {
+ holder->message.setState(s);
+ }
+}
+MessageState PriorityQueue::MessagePointer::getState() const
+{
+ if (holder) {
+ return holder->message.getState();
+ } else {
+ return DELETED;
+ }
+}
+PriorityQueue::MessagePointer::operator Message&()
+{
+ assert(holder);
+ return holder->message;
+}
+}} // namespace qpid::broker