summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/PriorityQueue.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/PriorityQueue.h')
-rw-r--r--cpp/src/qpid/broker/PriorityQueue.h72
1 files changed, 47 insertions, 25 deletions
diff --git a/cpp/src/qpid/broker/PriorityQueue.h b/cpp/src/qpid/broker/PriorityQueue.h
index 301367358b..16432bfb54 100644
--- a/cpp/src/qpid/broker/PriorityQueue.h
+++ b/cpp/src/qpid/broker/PriorityQueue.h
@@ -22,6 +22,7 @@
*
*/
#include "qpid/broker/MessageDeque.h"
+#include "qpid/broker/IndexedDeque.h"
#include "qpid/sys/IntegerTypes.h"
#include <deque>
#include <vector>
@@ -44,42 +45,63 @@ class PriorityQueue : public Messages
virtual ~PriorityQueue() {}
size_t size();
- bool deleted(const QueuedMessage&);
- void release(const QueuedMessage&);
- bool acquire(const framing::SequenceNumber&, QueuedMessage&);
- bool find(const framing::SequenceNumber&, QueuedMessage&);
- bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
- bool consume(QueuedMessage&);
- bool push(const QueuedMessage& added, QueuedMessage& removed);
- void updateAcquired(const QueuedMessage& acquired);
- void setPosition(const framing::SequenceNumber&);
- void foreach(Functor);
- void removeIf(Predicate);
-
- static uint getPriority(const QueuedMessage&);
+ bool deleted(const QueueCursor&);
+ void publish(const Message& added);
+ Message* next(QueueCursor&);
+ Message* release(const QueueCursor& cursor);
+ Message* find(const QueueCursor&);
+ Message* find(const framing::SequenceNumber&, QueueCursor*);
+ void foreach(Functor);
+ static uint getPriority(const Message&);
protected:
- typedef std::deque<QueuedMessage*> Deque;
- typedef std::vector<Deque> PriorityLevels;
- virtual bool findFrontLevel(uint& p, PriorityLevels&);
-
const int levels;
+ struct Priority
+ {
+ const int start;
+ int current;
+ Priority(int s) : start(s), current(start) {}
+ };
+ virtual Priority firstLevel();
+ virtual bool nextLevel(Priority& );
private:
- /** Available messages separated by priority and sorted in priority order.
- * Holds pointers to the QueuedMessages in fifo
+ struct MessageHolder
+ {
+ Message message;
+ int priority;
+ framing::SequenceNumber id;
+ framing::SequenceNumber getSequence() const;
+ void setState(MessageState);
+ MessageState getState() const;
+ operator Message&();
+ };
+ struct MessagePointer
+ {
+ MessageHolder* holder;
+ framing::SequenceNumber id;//used only for padding
+ framing::SequenceNumber getSequence() const;
+ void setState(MessageState);
+ MessageState getState() const;
+ operator Message&();
+ };
+ typedef IndexedDeque<MessageHolder> Deque;
+ typedef std::vector<Deque> PriorityLevels;
+ typedef std::vector<framing::SequenceNumber> Counters;
+
+ /** Holds pointers to messages (stored in the fifo index) separated by priority.
*/
PriorityLevels messages;
- /** FIFO index of all messsagse (including acquired messages) for fast browsing and indexing */
- MessageDeque fifo;
+ Counters counters;
+ /** FIFO index of messages for fast browsing and indexing */
+ IndexedDeque<MessagePointer> fifo;
uint frontLevel;
bool haveFront;
bool cached;
- void erase(const QueuedMessage&);
- uint getPriorityLevel(const QueuedMessage&) const;
- void clearCache();
- bool checkFront();
+ uint getPriorityLevel(const Message&) const;
+ MessageHolder priorityPadding(qpid::framing::SequenceNumber);
+ MessagePointer fifoPadding(qpid::framing::SequenceNumber);
};
}} // namespace qpid::broker