diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 10 |
1 files changed, 4 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 632e512a32..cda730965d 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -304,14 +304,14 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) { while (true) { + ClusterAcquireScope acquireScope; // Outside the lock Stoppable::Scope consumeScope(consuming); + Mutex::ScopedLock locker(messageLock); if (!consumeScope) { QPID_LOG(trace, "Queue stopped, can't consume: " << name); listeners.addListener(c); return NO_MESSAGES; } - ClusterAcquireScope acquireScope; // Outside the lock - Mutex::ScopedLock locker(messageLock); if (messages->empty()) { QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); listeners.addListener(c); @@ -701,9 +701,6 @@ void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg) // return true if store exists, bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { - // FIXME aconway 2011-09-13: new cluster needs tx/dtx support. - if (!ctxt && broker) broker->getCluster().dequeue(msg); - ScopedUse u(barrier); if (!u.acquired) return false; { @@ -713,6 +710,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) dequeued(msg); } } + if (!ctxt && broker) broker->getCluster().dequeue(msg); // call outside the lock. // This check prevents messages which have been forced persistent on one queue from dequeuing // from another on which no forcing has taken place and thus causing a store error. @@ -1291,7 +1289,7 @@ void Queue::startConsumers() { notifyListener(); } -// Called when all busy threads exitd due to stopConsumers() +// Called when all busy threads exited after stopConsumers() void Queue::consumingStopped() { QPID_LOG(trace, "Stopped consumers on " << getName()); if (broker) broker->getCluster().stopped(*this); |