diff options
author | Gordon Sim <gsim@apache.org> | 2010-05-15 17:05:01 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2010-05-15 17:05:01 +0000 |
commit | d745be62c6477f03c5e4734a39c2be49943561c0 (patch) | |
tree | 4307689e41914ae30b7b809c50573db2d0df01af /cpp/src | |
parent | 94c33a9ea95ee56bb9e12018d053bfe68d86927f (diff) | |
download | qpid-python-d745be62c6477f03c5e4734a39c2be49943561c0.tar.gz |
QPID-2588: Prevent queue being destroyed while still in use.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@944683 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/PersistableMessage.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/PersistableQueue.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 47 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 26 | ||||
-rw-r--r-- | cpp/src/tests/IncompleteMessageList.cpp | 2 |
5 files changed, 68 insertions, 13 deletions
diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp index 21fb421168..e5fbb71cbd 100644 --- a/cpp/src/qpid/broker/PersistableMessage.cpp +++ b/cpp/src/qpid/broker/PersistableMessage.cpp @@ -52,8 +52,8 @@ void PersistableMessage::flush() } for (syncList::iterator i = copy.begin(); i != copy.end(); ++i) { PersistableQueue::shared_ptr q(i->lock()); - if (q && q->isValid()) { - store->flush(*q); + if (q) { + q->flush(); } } } diff --git a/cpp/src/qpid/broker/PersistableQueue.h b/cpp/src/qpid/broker/PersistableQueue.h index bcf4268158..655d26bc74 100644 --- a/cpp/src/qpid/broker/PersistableQueue.h +++ b/cpp/src/qpid/broker/PersistableQueue.h @@ -60,7 +60,7 @@ public: }; virtual void setExternalQueueStore(ExternalQueueStore* inst) = 0; - virtual bool isValid() = 0; + virtual void flush() = 0; inline ExternalQueueStore* getExternalQueueStore() const {return externalQueueStore;}; diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 90b0d4cc52..42c678cb68 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -103,7 +103,8 @@ Queue::Queue(const string& _name, bool _autodelete, eventMgr(0), insertSeqNo(0), broker(b), - deleted(false) + deleted(false), + barrier(*this) { if (parent != 0 && broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); @@ -718,14 +719,17 @@ void Queue::setLastNodeFailure() } } + // return true if store exists, bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg, bool suppressPolicyCheck) { + ScopedUse u(barrier); + if (!u.acquired) return false; + if (policy.get() && !suppressPolicyCheck) { Messages dequeues; { Mutex::ScopedLock locker(messageLock); - if (deleted) return false; policy->tryEnqueue(msg); policy->getPendingDequeues(dequeues); } @@ -765,9 +769,12 @@ void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg) // return true if store exists, bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { + ScopedUse u(barrier); + if (!u.acquired) return false; + { Mutex::ScopedLock locker(messageLock); - if (deleted || !isEnqueued(msg)) return false; + if (!isEnqueued(msg)) return false; if (!ctxt) { dequeued(msg); } @@ -893,10 +900,10 @@ void Queue::destroy() popAndDequeue(); } alternateExchange->decAlternateUsers(); - deleted = true; } if (store) { + barrier.destroy(); store->flush(*this); store->destroy(*this); store = 0;//ensure we make no more calls to the store for this queue @@ -1172,8 +1179,34 @@ void Queue::checkNotDeleted() } } -bool Queue::isValid() +void Queue::flush() { - Mutex::ScopedLock locker(messageLock); - return !deleted; + ScopedUse u(barrier); + if (u.acquired && store) store->flush(*this); +} + +Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {} + +bool Queue::UsageBarrier::acquire() +{ + Monitor::ScopedLock l(parent.messageLock); + if (parent.deleted) { + return false; + } else { + ++count; + return true; + } +} + +void Queue::UsageBarrier::release() +{ + Monitor::ScopedLock l(parent.messageLock); + if (--count == 0) parent.messageLock.notifyAll(); +} + +void Queue::UsageBarrier::destroy() +{ + Monitor::ScopedLock l(parent.messageLock); + parent.deleted = true; + while (count) parent.messageLock.wait(); } diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 51b64ba036..cdfa8a1a1f 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -59,6 +59,8 @@ namespace qpid { using std::string; + + /** * The brokers representation of an amqp queue. Messages are * delivered to a queue from where they can be dispatched to @@ -68,6 +70,25 @@ namespace qpid { class Queue : public boost::enable_shared_from_this<Queue>, public PersistableQueue, public management::Manageable { + struct UsageBarrier + { + Queue& parent; + uint count; + + UsageBarrier(Queue&); + bool acquire(); + void release(); + void destroy(); + }; + + struct ScopedUse + { + UsageBarrier& barrier; + const bool acquired; + ScopedUse(UsageBarrier& b) : barrier(b), acquired(barrier.acquire()) {} + ~ScopedUse() { if (acquired) barrier.release(); } + }; + typedef std::deque<QueuedMessage> Messages; typedef std::map<string,boost::intrusive_ptr<Message> > LVQ; enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2}; @@ -90,7 +111,7 @@ namespace qpid { Messages pendingDequeues;//used to avoid dequeuing during recovery LVQ lvq; mutable qpid::sys::Mutex consumerLock; - mutable qpid::sys::Mutex messageLock; + mutable qpid::sys::Monitor messageLock; mutable qpid::sys::Mutex ownershipLock; mutable uint64_t persistenceId; framing::FieldTable settings; @@ -108,6 +129,7 @@ namespace qpid { std::string seqNoKey; Broker* broker; bool deleted; + UsageBarrier barrier; void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false); void setPolicy(std::auto_ptr<QueuePolicy> policy); @@ -342,7 +364,7 @@ namespace qpid { */ void recoverPrepared(boost::intrusive_ptr<Message>& msg); - bool isValid(); + void flush(); }; } } diff --git a/cpp/src/tests/IncompleteMessageList.cpp b/cpp/src/tests/IncompleteMessageList.cpp index 303d83cd66..10782572e5 100644 --- a/cpp/src/tests/IncompleteMessageList.cpp +++ b/cpp/src/tests/IncompleteMessageList.cpp @@ -114,7 +114,7 @@ QPID_AUTO_TEST_CASE(testSyncProcessWithIncomplete) IncompleteMessageList list; SequenceNumber counter(1); MockStore store; - store.queue = Queue::shared_ptr(new Queue("mock-queue")); + store.queue = Queue::shared_ptr(new Queue("mock-queue", false, &store)); //fill up list with messages for (int i = 0; i < 5; i++) { boost::intrusive_ptr<Message> msg(new Message(counter++)); |