diff options
author | Gordon Sim <gsim@apache.org> | 2009-09-28 12:12:41 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2009-09-28 12:12:41 +0000 |
commit | 9376922f0fce58400c1e9b5b20f6c6f7b279a55b (patch) | |
tree | 52b9422da0032defb0b3b94caef6ec168e264efd /cpp/src | |
parent | 46b7c031c27b7c047d7f2361c4d8287ee1578f05 (diff) | |
download | qpid-python-9376922f0fce58400c1e9b5b20f6c6f7b279a55b.tar.gz |
QPID-2102: Changed QueuePolicy to rely on external locking and require dequeues to be handled by policy user rather.
(r817742 introduced a deadlock in ring queue policy which this checkin fixes)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@819505 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 19 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 12 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueuePolicy.cpp | 15 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueuePolicy.h | 6 | ||||
-rwxr-xr-x | cpp/src/tests/ring_queue_test | 7 |
5 files changed, 22 insertions, 37 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 80794f791f..5bfec0f24e 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -208,11 +208,10 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){ } void Queue::requeue(const QueuedMessage& msg){ - if (!isEnqueued(msg)) return; - QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); + if (!isEnqueued(msg)) return; msg.payload->enqueueComplete(); // mark the message as enqueued messages.push_front(msg); listeners.populate(copy); @@ -603,7 +602,6 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName()); } if (policy.get()) { - Mutex::ScopedUnlock locker(messageLock); policy->enqueued(qm); } } @@ -696,7 +694,14 @@ void Queue::setLastNodeFailure() bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg, bool suppressPolicyCheck) { if (policy.get() && !suppressPolicyCheck) { - policy->tryEnqueue(msg); + Messages dequeues; + { + Mutex::ScopedLock locker(messageLock); + policy->tryEnqueue(msg); + policy->getPendingDequeues(dequeues); + } + //depending on policy, may have some dequeues that need to performed without holding the lock + for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); } if (inLastNodeFailure && persistLastNode){ @@ -1072,10 +1077,4 @@ bool Queue::isEnqueued(const QueuedMessage& msg) return !policy.get() || policy->isEnqueued(msg); } -void Queue::addPendingDequeue(const QueuedMessage& msg) -{ - //assumes lock is held - true at present but rather nasty as this is a public method - pendingDequeues.push_back(msg); -} - QueueListeners& Queue::getListeners() { return listeners; } diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 9ac5e3f5e9..661e46f619 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -334,18 +334,6 @@ namespace qpid { */ void recoveryComplete(); - /** - * This is a hack to avoid deadlocks in durable ring - * queues. It is used for dequeueing messages in response - * to an enqueue while avoid holding lock over call to - * store. - * - * Assumes messageLock is held - true for curent use case - * (QueuePolicy::tryEnqueue()) but rather nasty as this is a public - * method - **/ - void addPendingDequeue(const QueuedMessage &msg); - // For cluster update QueueListeners& getListeners(); }; diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp index 0f1f7f370f..a8aa674c53 100644 --- a/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/cpp/src/qpid/broker/QueuePolicy.cpp @@ -77,7 +77,6 @@ bool QueuePolicy::checkLimit(boost::intrusive_ptr<Message> m) void QueuePolicy::tryEnqueue(boost::intrusive_ptr<Message> m) { - qpid::sys::Mutex::ScopedLock l(lock); if (checkLimit(m)) { enqueued(m->contentSize()); } else { @@ -87,13 +86,11 @@ void QueuePolicy::tryEnqueue(boost::intrusive_ptr<Message> m) void QueuePolicy::recoverEnqueued(boost::intrusive_ptr<Message> m) { - qpid::sys::Mutex::ScopedLock l(lock); enqueued(m->contentSize()); } void QueuePolicy::enqueueAborted(boost::intrusive_ptr<Message> m) { - qpid::sys::Mutex::ScopedLock l(lock); dequeued(m->contentSize()); } @@ -101,7 +98,6 @@ void QueuePolicy::enqueued(const QueuedMessage&) {} void QueuePolicy::dequeued(const QueuedMessage& m) { - qpid::sys::Mutex::ScopedLock l(lock); dequeued(m.payload->contentSize()); } @@ -141,6 +137,7 @@ void QueuePolicy::setDefaultMaxSize(uint64_t s) defaultMaxSize = s; } +void QueuePolicy::getPendingDequeues(Messages&) {} @@ -200,14 +197,12 @@ bool before(const QueuedMessage& a, const QueuedMessage& b) void RingQueuePolicy::enqueued(const QueuedMessage& m) { - qpid::sys::Mutex::ScopedLock l(lock); //need to insert in correct location based on position queue.insert(lower_bound(queue.begin(), queue.end(), m, before), m); } void RingQueuePolicy::dequeued(const QueuedMessage& m) { - qpid::sys::Mutex::ScopedLock l(lock); //find and remove m from queue if (find(m, pendingDequeues, true) || find(m, queue, true)) { //now update count and size @@ -217,7 +212,6 @@ void RingQueuePolicy::dequeued(const QueuedMessage& m) bool RingQueuePolicy::isEnqueued(const QueuedMessage& m) { - qpid::sys::Mutex::ScopedLock l(lock); //for non-strict ring policy, a message can be replaced (and //therefore dequeued) before it is accepted or released by //subscriber; need to detect this @@ -241,8 +235,6 @@ bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m) pendingDequeues.push_back(oldest); QPID_LOG(debug, "Ring policy triggered in " << name << ": removed message " << oldest.position << " to make way for new message"); - qpid::sys::Mutex::ScopedUnlock u(lock); - oldest.queue->dequeue(0, oldest); return true; } else { QPID_LOG(debug, "Ring policy could not be triggered in " << name @@ -254,6 +246,11 @@ bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m) } } +void RingQueuePolicy::getPendingDequeues(Messages& result) +{ + result = pendingDequeues; +} + bool RingQueuePolicy::find(const QueuedMessage& m, Messages& q, bool remove) { for (Messages::iterator i = q.begin(); i != q.end(); i++) { diff --git a/cpp/src/qpid/broker/QueuePolicy.h b/cpp/src/qpid/broker/QueuePolicy.h index 65c52304f2..b2937e94c7 100644 --- a/cpp/src/qpid/broker/QueuePolicy.h +++ b/cpp/src/qpid/broker/QueuePolicy.h @@ -47,6 +47,7 @@ class QueuePolicy static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue); public: + typedef std::deque<QueuedMessage> Messages; static QPID_BROKER_EXTERN const std::string maxCountKey; static QPID_BROKER_EXTERN const std::string maxSizeKey; static QPID_BROKER_EXTERN const std::string typeKey; @@ -68,7 +69,7 @@ class QueuePolicy void encode(framing::Buffer& buffer) const; void decode ( framing::Buffer& buffer ); uint32_t encodedSize() const; - + virtual void getPendingDequeues(Messages& result); static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings); static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); @@ -80,7 +81,6 @@ class QueuePolicy const QueuePolicy&); protected: const std::string name; - qpid::sys::Mutex lock; QueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); @@ -105,8 +105,8 @@ class RingQueuePolicy : public QueuePolicy void dequeued(const QueuedMessage&); bool isEnqueued(const QueuedMessage&); bool checkLimit(boost::intrusive_ptr<Message> msg); + void getPendingDequeues(Messages& result); private: - typedef std::deque<QueuedMessage> Messages; Messages pendingDequeues; Messages queue; const bool strict; diff --git a/cpp/src/tests/ring_queue_test b/cpp/src/tests/ring_queue_test index 5805989d7e..553746eb49 100755 --- a/cpp/src/tests/ring_queue_test +++ b/cpp/src/tests/ring_queue_test @@ -48,7 +48,7 @@ receive() { cleanup() { rm -f sender_${QUEUE_NAME}_* receiver_${QUEUE_NAME}_* - qpid-config $BROKER_URL add queue $QUEUE_NAME + qpid-config $BROKER_URL del queue $QUEUE_NAME --force } log() { @@ -64,10 +64,11 @@ validate() { if [[ $RECEIVERS -eq 0 ]]; then #queue should have $LIMIT messages on it, but need to send an eos also sender --routing-key $QUEUE_NAME --send-eos 1 < /dev/null - if [[ $(receiver --queue $QUEUE_NAME --browse | wc -l) -eq $(( $LIMIT - 1)) ]]; then + received=$(receiver --queue $QUEUE_NAME --browse | wc -l) + if [[ received -eq $(( $LIMIT - 1)) ]]; then log "queue contains $LIMIT messages as expected" else - fail "queue does not contain the expected $LIMIT messages" + fail "queue does not contain the expected $LIMIT messages (received $received)" fi elif [[ $CONCURRENT -eq 0 ]]; then #sum of length of all output files should be equal to $LIMIT - $RECEIVERS (1 eos message each) |