summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp10
1 files changed, 4 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 632e512a32..cda730965d 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -304,14 +304,14 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
{
while (true) {
+ ClusterAcquireScope acquireScope; // Outside the lock
Stoppable::Scope consumeScope(consuming);
+ Mutex::ScopedLock locker(messageLock);
if (!consumeScope) {
QPID_LOG(trace, "Queue stopped, can't consume: " << name);
listeners.addListener(c);
return NO_MESSAGES;
}
- ClusterAcquireScope acquireScope; // Outside the lock
- Mutex::ScopedLock locker(messageLock);
if (messages->empty()) {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
listeners.addListener(c);
@@ -701,9 +701,6 @@ void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
// return true if store exists,
bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
- // FIXME aconway 2011-09-13: new cluster needs tx/dtx support.
- if (!ctxt && broker) broker->getCluster().dequeue(msg);
-
ScopedUse u(barrier);
if (!u.acquired) return false;
{
@@ -713,6 +710,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
dequeued(msg);
}
}
+ if (!ctxt && broker) broker->getCluster().dequeue(msg); // call outside the lock.
// This check prevents messages which have been forced persistent on one queue from dequeuing
// from another on which no forcing has taken place and thus causing a store error.
@@ -1291,7 +1289,7 @@ void Queue::startConsumers() {
notifyListener();
}
-// Called when all busy threads exitd due to stopConsumers()
+// Called when all busy threads exited after stopConsumers()
void Queue::consumingStopped() {
QPID_LOG(trace, "Stopped consumers on " << getName());
if (broker) broker->getCluster().stopped(*this);