summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.h')
-rw-r--r--cpp/src/qpid/broker/Queue.h27
1 files changed, 9 insertions, 18 deletions
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 96c79d1b92..66e4c5fa22 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -26,6 +26,7 @@
#include "qpid/broker/OwnershipToken.h"
#include "qpid/broker/Consumer.h"
#include "qpid/broker/Message.h"
+#include "qpid/broker/Messages.h"
#include "qpid/broker/PersistableQueue.h"
#include "qpid/broker/QueuePolicy.h"
#include "qpid/broker/QueueBindings.h"
@@ -85,10 +86,9 @@ class Queue : public boost::enable_shared_from_this<Queue>,
~ScopedUse() { if (acquired) barrier.release(); }
};
- typedef std::deque<QueuedMessage> Messages;
- typedef std::map<std::string,boost::intrusive_ptr<Message> > LVQ;
enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
+
const std::string name;
const bool autodelete;
MessageStore* store;
@@ -96,16 +96,13 @@ class Queue : public boost::enable_shared_from_this<Queue>,
uint32_t consumerCount;
OwnershipToken* exclusive;
bool noLocal;
- bool lastValueQueue;
- bool lastValueQueueNoBrowse;
bool persistLastNode;
bool inLastNodeFailure;
std::string traceId;
std::vector<std::string> traceExclude;
QueueListeners listeners;
- Messages messages;
- Messages pendingDequeues;//used to avoid dequeuing during recovery
- LVQ lvq;
+ std::auto_ptr<Messages> messages;
+ std::deque<QueuedMessage> pendingDequeues;//used to avoid dequeuing during recovery
mutable qpid::sys::Mutex consumerLock;
mutable qpid::sys::Monitor messageLock;
mutable qpid::sys::Mutex ownershipLock;
@@ -140,11 +137,10 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bool isExcluded(boost::intrusive_ptr<Message>& msg);
void dequeued(const QueuedMessage& msg);
- void popMsg(QueuedMessage& qmsg);
+ void pop();
void popAndDequeue();
QueuedMessage getFront();
- QueuedMessage& checkLvqReplace(QueuedMessage& msg);
- void clearLVQIndex(const QueuedMessage& msg);
+ void forcePersistent(QueuedMessage& msg);
inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg)
{
@@ -169,7 +165,6 @@ class Queue : public boost::enable_shared_from_this<Queue>,
}
}
- Messages::iterator findAt(framing::SequenceNumber pos);
void checkNotDeleted();
public:
@@ -320,13 +315,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
/** Apply f to each Message on the queue. */
template <class F> void eachMessage(F f) {
sys::Mutex::ScopedLock l(messageLock);
- if (lastValueQueue) {
- for (Messages::iterator i = messages.begin(); i != messages.end(); ++i) {
- f(checkLvqReplace(*i));
- }
- } else {
- std::for_each(messages.begin(), messages.end(), f);
- }
+ messages->foreach(f);
}
/** Apply f to each QueueBinding on the queue */
@@ -352,6 +341,8 @@ class Queue : public boost::enable_shared_from_this<Queue>,
// For cluster update
QueueListeners& getListeners();
+ Messages& getMessages();
+ const Messages& getMessages() const;
/**
* Reserve space in policy for an enqueued message that