diff options
author | Gordon Sim <gsim@apache.org> | 2010-05-15 17:05:01 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2010-05-15 17:05:01 +0000 |
commit | d745be62c6477f03c5e4734a39c2be49943561c0 (patch) | |
tree | 4307689e41914ae30b7b809c50573db2d0df01af /cpp/src/qpid/broker/Queue.cpp | |
parent | 94c33a9ea95ee56bb9e12018d053bfe68d86927f (diff) | |
download | qpid-python-d745be62c6477f03c5e4734a39c2be49943561c0.tar.gz |
QPID-2588: Prevent queue being destroyed while still in use.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@944683 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 47 |
1 files changed, 40 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 90b0d4cc52..42c678cb68 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -103,7 +103,8 @@ Queue::Queue(const string& _name, bool _autodelete, eventMgr(0), insertSeqNo(0), broker(b), - deleted(false) + deleted(false), + barrier(*this) { if (parent != 0 && broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); @@ -718,14 +719,17 @@ void Queue::setLastNodeFailure() } } + // return true if store exists, bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg, bool suppressPolicyCheck) { + ScopedUse u(barrier); + if (!u.acquired) return false; + if (policy.get() && !suppressPolicyCheck) { Messages dequeues; { Mutex::ScopedLock locker(messageLock); - if (deleted) return false; policy->tryEnqueue(msg); policy->getPendingDequeues(dequeues); } @@ -765,9 +769,12 @@ void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg) // return true if store exists, bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { + ScopedUse u(barrier); + if (!u.acquired) return false; + { Mutex::ScopedLock locker(messageLock); - if (deleted || !isEnqueued(msg)) return false; + if (!isEnqueued(msg)) return false; if (!ctxt) { dequeued(msg); } @@ -893,10 +900,10 @@ void Queue::destroy() popAndDequeue(); } alternateExchange->decAlternateUsers(); - deleted = true; } if (store) { + barrier.destroy(); store->flush(*this); store->destroy(*this); store = 0;//ensure we make no more calls to the store for this queue @@ -1172,8 +1179,34 @@ void Queue::checkNotDeleted() } } -bool Queue::isValid() +void Queue::flush() { - Mutex::ScopedLock locker(messageLock); - return !deleted; + ScopedUse u(barrier); + if (u.acquired && store) store->flush(*this); +} + +Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {} + +bool Queue::UsageBarrier::acquire() +{ + Monitor::ScopedLock l(parent.messageLock); + if (parent.deleted) { + return false; + } else { + ++count; + return true; + } +} + +void Queue::UsageBarrier::release() +{ + Monitor::ScopedLock l(parent.messageLock); + if (--count == 0) parent.messageLock.notifyAll(); +} + +void Queue::UsageBarrier::destroy() +{ + Monitor::ScopedLock l(parent.messageLock); + parent.deleted = true; + while (count) parent.messageLock.wait(); } |