diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.h')
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 27 |
1 files changed, 9 insertions, 18 deletions
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 96c79d1b92..66e4c5fa22 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -26,6 +26,7 @@ #include "qpid/broker/OwnershipToken.h" #include "qpid/broker/Consumer.h" #include "qpid/broker/Message.h" +#include "qpid/broker/Messages.h" #include "qpid/broker/PersistableQueue.h" #include "qpid/broker/QueuePolicy.h" #include "qpid/broker/QueueBindings.h" @@ -85,10 +86,9 @@ class Queue : public boost::enable_shared_from_this<Queue>, ~ScopedUse() { if (acquired) barrier.release(); } }; - typedef std::deque<QueuedMessage> Messages; - typedef std::map<std::string,boost::intrusive_ptr<Message> > LVQ; enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2}; + const std::string name; const bool autodelete; MessageStore* store; @@ -96,16 +96,13 @@ class Queue : public boost::enable_shared_from_this<Queue>, uint32_t consumerCount; OwnershipToken* exclusive; bool noLocal; - bool lastValueQueue; - bool lastValueQueueNoBrowse; bool persistLastNode; bool inLastNodeFailure; std::string traceId; std::vector<std::string> traceExclude; QueueListeners listeners; - Messages messages; - Messages pendingDequeues;//used to avoid dequeuing during recovery - LVQ lvq; + std::auto_ptr<Messages> messages; + std::deque<QueuedMessage> pendingDequeues;//used to avoid dequeuing during recovery mutable qpid::sys::Mutex consumerLock; mutable qpid::sys::Monitor messageLock; mutable qpid::sys::Mutex ownershipLock; @@ -140,11 +137,10 @@ class Queue : public boost::enable_shared_from_this<Queue>, bool isExcluded(boost::intrusive_ptr<Message>& msg); void dequeued(const QueuedMessage& msg); - void popMsg(QueuedMessage& qmsg); + void pop(); void popAndDequeue(); QueuedMessage getFront(); - QueuedMessage& checkLvqReplace(QueuedMessage& msg); - void clearLVQIndex(const QueuedMessage& msg); + void forcePersistent(QueuedMessage& msg); inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg) { @@ -169,7 +165,6 @@ class Queue : public boost::enable_shared_from_this<Queue>, } } - Messages::iterator findAt(framing::SequenceNumber pos); void checkNotDeleted(); public: @@ -320,13 +315,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, /** Apply f to each Message on the queue. */ template <class F> void eachMessage(F f) { sys::Mutex::ScopedLock l(messageLock); - if (lastValueQueue) { - for (Messages::iterator i = messages.begin(); i != messages.end(); ++i) { - f(checkLvqReplace(*i)); - } - } else { - std::for_each(messages.begin(), messages.end(), f); - } + messages->foreach(f); } /** Apply f to each QueueBinding on the queue */ @@ -352,6 +341,8 @@ class Queue : public boost::enable_shared_from_this<Queue>, // For cluster update QueueListeners& getListeners(); + Messages& getMessages(); + const Messages& getMessages() const; /** * Reserve space in policy for an enqueued message that |