diff options
Diffstat (limited to 'cpp/src/qpid/broker/PriorityQueue.h')
-rw-r--r-- | cpp/src/qpid/broker/PriorityQueue.h | 72 |
1 files changed, 47 insertions, 25 deletions
diff --git a/cpp/src/qpid/broker/PriorityQueue.h b/cpp/src/qpid/broker/PriorityQueue.h index 301367358b..16432bfb54 100644 --- a/cpp/src/qpid/broker/PriorityQueue.h +++ b/cpp/src/qpid/broker/PriorityQueue.h @@ -22,6 +22,7 @@ * */ #include "qpid/broker/MessageDeque.h" +#include "qpid/broker/IndexedDeque.h" #include "qpid/sys/IntegerTypes.h" #include <deque> #include <vector> @@ -44,42 +45,63 @@ class PriorityQueue : public Messages virtual ~PriorityQueue() {} size_t size(); - bool deleted(const QueuedMessage&); - void release(const QueuedMessage&); - bool acquire(const framing::SequenceNumber&, QueuedMessage&); - bool find(const framing::SequenceNumber&, QueuedMessage&); - bool browse(const framing::SequenceNumber&, QueuedMessage&, bool); - bool consume(QueuedMessage&); - bool push(const QueuedMessage& added, QueuedMessage& removed); - void updateAcquired(const QueuedMessage& acquired); - void setPosition(const framing::SequenceNumber&); - void foreach(Functor); - void removeIf(Predicate); - - static uint getPriority(const QueuedMessage&); + bool deleted(const QueueCursor&); + void publish(const Message& added); + Message* next(QueueCursor&); + Message* release(const QueueCursor& cursor); + Message* find(const QueueCursor&); + Message* find(const framing::SequenceNumber&, QueueCursor*); + void foreach(Functor); + static uint getPriority(const Message&); protected: - typedef std::deque<QueuedMessage*> Deque; - typedef std::vector<Deque> PriorityLevels; - virtual bool findFrontLevel(uint& p, PriorityLevels&); - const int levels; + struct Priority + { + const int start; + int current; + Priority(int s) : start(s), current(start) {} + }; + virtual Priority firstLevel(); + virtual bool nextLevel(Priority& ); private: - /** Available messages separated by priority and sorted in priority order. - * Holds pointers to the QueuedMessages in fifo + struct MessageHolder + { + Message message; + int priority; + framing::SequenceNumber id; + framing::SequenceNumber getSequence() const; + void setState(MessageState); + MessageState getState() const; + operator Message&(); + }; + struct MessagePointer + { + MessageHolder* holder; + framing::SequenceNumber id;//used only for padding + framing::SequenceNumber getSequence() const; + void setState(MessageState); + MessageState getState() const; + operator Message&(); + }; + typedef IndexedDeque<MessageHolder> Deque; + typedef std::vector<Deque> PriorityLevels; + typedef std::vector<framing::SequenceNumber> Counters; + + /** Holds pointers to messages (stored in the fifo index) separated by priority. */ PriorityLevels messages; - /** FIFO index of all messsagse (including acquired messages) for fast browsing and indexing */ - MessageDeque fifo; + Counters counters; + /** FIFO index of messages for fast browsing and indexing */ + IndexedDeque<MessagePointer> fifo; uint frontLevel; bool haveFront; bool cached; - void erase(const QueuedMessage&); - uint getPriorityLevel(const QueuedMessage&) const; - void clearCache(); - bool checkFront(); + uint getPriorityLevel(const Message&) const; + MessageHolder priorityPadding(qpid::framing::SequenceNumber); + MessagePointer fifoPadding(qpid::framing::SequenceNumber); }; }} // namespace qpid::broker |