diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 47 |
1 files changed, 40 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 90b0d4cc52..42c678cb68 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -103,7 +103,8 @@ Queue::Queue(const string& _name, bool _autodelete, eventMgr(0), insertSeqNo(0), broker(b), - deleted(false) + deleted(false), + barrier(*this) { if (parent != 0 && broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); @@ -718,14 +719,17 @@ void Queue::setLastNodeFailure() } } + // return true if store exists, bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg, bool suppressPolicyCheck) { + ScopedUse u(barrier); + if (!u.acquired) return false; + if (policy.get() && !suppressPolicyCheck) { Messages dequeues; { Mutex::ScopedLock locker(messageLock); - if (deleted) return false; policy->tryEnqueue(msg); policy->getPendingDequeues(dequeues); } @@ -765,9 +769,12 @@ void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg) // return true if store exists, bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { + ScopedUse u(barrier); + if (!u.acquired) return false; + { Mutex::ScopedLock locker(messageLock); - if (deleted || !isEnqueued(msg)) return false; + if (!isEnqueued(msg)) return false; if (!ctxt) { dequeued(msg); } @@ -893,10 +900,10 @@ void Queue::destroy() popAndDequeue(); } alternateExchange->decAlternateUsers(); - deleted = true; } if (store) { + barrier.destroy(); store->flush(*this); store->destroy(*this); store = 0;//ensure we make no more calls to the store for this queue @@ -1172,8 +1179,34 @@ void Queue::checkNotDeleted() } } -bool Queue::isValid() +void Queue::flush() { - Mutex::ScopedLock locker(messageLock); - return !deleted; + ScopedUse u(barrier); + if (u.acquired && store) store->flush(*this); +} + +Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {} + +bool Queue::UsageBarrier::acquire() +{ + Monitor::ScopedLock l(parent.messageLock); + if (parent.deleted) { + return false; + } else { + ++count; + return true; + } +} + +void Queue::UsageBarrier::release() +{ + Monitor::ScopedLock l(parent.messageLock); + if (--count == 0) parent.messageLock.notifyAll(); +} + +void Queue::UsageBarrier::destroy() +{ + Monitor::ScopedLock l(parent.messageLock); + parent.deleted = true; + while (count) parent.messageLock.wait(); } |