diff options
author | Alan Conway <aconway@apache.org> | 2011-09-21 14:56:20 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-09-21 14:56:20 +0000 |
commit | bca0f185b11399bbab801726393ee5c341b8cf33 (patch) | |
tree | f7d03493d3ce0253316f8c61798feba7e3518bc2 | |
parent | 2b72be0347e8cdc7bf85ed14b6490afc811e363e (diff) | |
download | qpid-python-bca0f185b11399bbab801726393ee5c341b8cf33.tar.gz |
QPID-2920: Fixed race condition around Queue::listeners.
- moved call to cluster dequeue, no deferred dequeue.
- removed unused function broker::Cluster::empty
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1173697 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Cluster.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/NullCluster.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/BrokerContext.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/LockedMap.h | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp | 24 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueContext.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp | 10 | ||||
-rw-r--r-- | qpid/cpp/src/tests/BrokerClusterCalls.cpp | 5 |
12 files changed, 42 insertions, 54 deletions
diff --git a/qpid/cpp/src/qpid/broker/Cluster.h b/qpid/cpp/src/qpid/broker/Cluster.h index cb606ba52d..cc1cd862aa 100644 --- a/qpid/cpp/src/qpid/broker/Cluster.h +++ b/qpid/cpp/src/qpid/broker/Cluster.h @@ -82,8 +82,9 @@ class Cluster virtual void consume(Queue&, size_t consumerCount) = 0; /** A consumer cancels its subscription to a queue */ virtual void cancel(Queue&, size_t consumerCount) = 0; - /** A queue becomes empty */ - virtual void empty(Queue&) = 0; + + // Queues + /** A queue has been stopped */ virtual void stopped(Queue&) = 0; diff --git a/qpid/cpp/src/qpid/broker/NullCluster.h b/qpid/cpp/src/qpid/broker/NullCluster.h index 8e82526ef8..2ca9efaad8 100644 --- a/qpid/cpp/src/qpid/broker/NullCluster.h +++ b/qpid/cpp/src/qpid/broker/NullCluster.h @@ -52,7 +52,6 @@ class NullCluster : public Cluster // Queues virtual void stopped(Queue&) {} - virtual void empty(Queue&) {} // Wiring diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 632e512a32..cda730965d 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -304,14 +304,14 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) { while (true) { + ClusterAcquireScope acquireScope; // Outside the lock Stoppable::Scope consumeScope(consuming); + Mutex::ScopedLock locker(messageLock); if (!consumeScope) { QPID_LOG(trace, "Queue stopped, can't consume: " << name); listeners.addListener(c); return NO_MESSAGES; } - ClusterAcquireScope acquireScope; // Outside the lock - Mutex::ScopedLock locker(messageLock); if (messages->empty()) { QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); listeners.addListener(c); @@ -701,9 +701,6 @@ void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg) // return true if store exists, bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { - // FIXME aconway 2011-09-13: new cluster needs tx/dtx support. - if (!ctxt && broker) broker->getCluster().dequeue(msg); - ScopedUse u(barrier); if (!u.acquired) return false; { @@ -713,6 +710,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) dequeued(msg); } } + if (!ctxt && broker) broker->getCluster().dequeue(msg); // call outside the lock. // This check prevents messages which have been forced persistent on one queue from dequeuing // from another on which no forcing has taken place and thus causing a store error. @@ -1291,7 +1289,7 @@ void Queue::startConsumers() { notifyListener(); } -// Called when all busy threads exitd due to stopConsumers() +// Called when all busy threads exited after stopConsumers() void Queue::consumingStopped() { QPID_LOG(trace, "Stopped consumers on " << getName()); if (broker) broker->getCluster().stopped(*this); diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 6c9c111dbb..150348590b 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -21,8 +21,6 @@ * under the License. * */ -#include "qpid/log/Statement.h" // FIXME XXX aconway 2011-06-08: remove - #include "qpid/broker/BrokerImportExport.h" #include "qpid/broker/OwnershipToken.h" #include "qpid/broker/Consumer.h" diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp index eac9854aa1..4a59d83da0 100644 --- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp @@ -51,7 +51,7 @@ using namespace broker; namespace { // noReplicate means the current thread is handling a message -// received from the cluster so it should not be replciated. +// received from the cluster so it should not be replicated. QPID_TSS bool tssNoReplicate = false; // Routing ID of the message being routed in the current thread. @@ -111,9 +111,6 @@ void BrokerContext::acquire(const broker::QueuedMessage& qm) { } void BrokerContext::dequeue(const broker::QueuedMessage& qm) { - // FIXME aconway 2011-09-15: should dequeue locally immediately - // instead of waiting for redeliver. No need for CPG order on - // dequeues. if (!tssNoReplicate) core.mcast(ClusterMessageDequeueBody( ProtocolVersion(), qm.queue->getName(), qm.position)); @@ -177,25 +174,19 @@ void BrokerContext::unbind(broker::Queue& q, broker::Exchange& ex, } // n is the number of consumers including the one just added. -// FIXME aconway 2011-06-27: rename, conflicting terms. subscribe? void BrokerContext::consume(broker::Queue& q, size_t n) { QueueContext::get(q)->consume(n); } -// FIXME aconway 2011-09-13: rename unsubscribe? // n is the number of consumers after the cancel. void BrokerContext::cancel(broker::Queue& q, size_t n) { QueueContext::get(q)->cancel(n); } -void BrokerContext::empty(broker::Queue& ) { - // FIXME aconway 2011-06-28: is this needed? -} - void BrokerContext::stopped(broker::Queue& q) { boost::intrusive_ptr<QueueContext> qc = QueueContext::get(q); // Don't forward the stopped call if the queue does not yet have a - // cluster context this when the queue is first created locally. + // cluster context - this when the queue is first created locally. if (qc) qc->stopped(); } diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h index ea759eb53c..f0444882a1 100644 --- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h +++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h @@ -66,7 +66,6 @@ class BrokerContext : public broker::Cluster void cancel(broker::Queue&, size_t); // Queues - void empty(broker::Queue&); void stopped(broker::Queue&); // Wiring diff --git a/qpid/cpp/src/qpid/cluster/exp/LockedMap.h b/qpid/cpp/src/qpid/cluster/exp/LockedMap.h index 7294ff767e..cacb9c792c 100644 --- a/qpid/cpp/src/qpid/cluster/exp/LockedMap.h +++ b/qpid/cpp/src/qpid/cluster/exp/LockedMap.h @@ -75,6 +75,18 @@ class LockedMap return map.erase(key); } + /** Remove and return value associated with key, returns Value() if none. */ + Value pop(const Key& key) { + sys::RWlock::ScopedWlock w(lock); + Value value; + typename Map::iterator i = map.find(key); + if (i != map.end()) { + value = i->second; + map.erase(i); + } + return value; + } + private: typedef std::map<Key, Value> Map; Map map; diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp index d3c6b763e8..d9fce02d75 100644 --- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp @@ -105,7 +105,8 @@ void MessageHandler::acquire(const std::string& q, uint32_t position) { assert(qm.payload); // Save on context for possible requeue if released/rejected. QueueContext::get(*queue)->acquire(qm); - // FIXME aconway 2011-09-19: need to record by member-ID to requeue if member leaves. + // FIXME aconway 2011-09-19: need to record by member-ID to + // requeue if member leaves. } } @@ -118,12 +119,11 @@ void MessageHandler::dequeue(const std::string& q, uint32_t position) { // complete the ack that initiated the dequeue at this point, see // BrokerContext::dequeue + // My own dequeues were processed in the connection thread before multicasting. if (sender() != self()) { boost::shared_ptr<Queue> queue = findQueue(q, "Cluster dequeue failed"); - // Remove fom the unacked list - QueueContext::get(*queue)->dequeue(position); + QueuedMessage qm = QueueContext::get(*queue)->dequeue(position); BrokerContext::ScopedSuppressReplication ssr; - QueuedMessage qm = queue->find(position); if (qm.queue) queue->dequeue(0, qm); } } diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp index de7109e131..fc4f6d7bf8 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp @@ -37,8 +37,7 @@ namespace cluster { // FIXME aconway 2011-09-16: configurable timeout. QueueContext::QueueContext(broker::Queue& q, Multicaster& m) - : ownership(UNSUBSCRIBED), - timer(boost::bind(&QueueContext::timeout, this), + : timer(boost::bind(&QueueContext::timeout, this), q.getBroker()->getTimer(), 100*sys::TIME_MSEC), queue(q), mcast(m), consumers(0) @@ -53,7 +52,8 @@ bool isOwner(QueueOwnership o) { return o == SOLE_OWNER || o == SHARED_OWNER; } } // Called by QueueReplica in CPG deliver thread when state changes. -void QueueContext::replicaState(QueueOwnership newOwnership) { +void QueueContext::replicaState(QueueOwnership before, QueueOwnership after) { + assert(before != after); // Invariants for ownership: // UNSUBSCRIBED, SUBSCRIBED <=> timer stopped, queue stopped @@ -61,16 +61,11 @@ void QueueContext::replicaState(QueueOwnership newOwnership) { // SHARED_OWNER <=> timer started, queue started sys::Mutex::ScopedLock l(lock); - QueueOwnership before = ownership; - QueueOwnership after = newOwnership; - assert(before != after); - ownership = newOwnership; - if (!isOwner(before) && isOwner(after)) { // Took ownership queue.startConsumers(); if (after == SHARED_OWNER) timer.start(); } - else if (isOwner(before) && isOwner(after) && before != after) { + else if (isOwner(before) && isOwner(after)) { // Changed from shared to sole owner or vice versa if (after == SOLE_OWNER) timer.stop(); else timer.start(); @@ -78,7 +73,8 @@ void QueueContext::replicaState(QueueOwnership newOwnership) { // If we lost ownership then the queue and timer will already have // been stopped by timeout() } -// FIXME aconway 2011-07-27: Dont spin the token on an empty or idle queue. + +// FIXME aconway 2011-07-27: Dont spin the token on an empty queue. // Called in connection threads when a consumer is added void QueueContext::consume(size_t n) { @@ -95,13 +91,12 @@ void QueueContext::cancel(size_t n) { // When consuming threads are stopped, this->stopped will be called. if (n == 0) { timer.stop(); - queue.stopConsumers(); // FIXME aconway 2011-07-28: Ok inside lock? + queue.stopConsumers(); } } // Called in timer thread. void QueueContext::timeout() { - QPID_LOG(debug, "FIXME QueueContext::timeout"); // When all threads have stopped, queue will call stopped() queue.stopConsumers(); } @@ -109,7 +104,6 @@ void QueueContext::timeout() { // Callback set up by queue.stopConsumers() called in connection thread. // Called when no threads are dispatching from the queue. void QueueContext::stopped() { - QPID_LOG(debug, "FIXME QueueContext::stopped"); sys::Mutex::ScopedLock l(lock); if (consumers == 0) mcast.mcast(framing::ClusterQueueUnsubscribeBody( @@ -134,8 +128,8 @@ void QueueContext::acquire(const broker::QueuedMessage& qm) { unacked.put(qm.position, qm); } -void QueueContext::dequeue(uint32_t position) { - unacked.erase(position); +broker::QueuedMessage QueueContext::dequeue(uint32_t position) { + return unacked.pop(position); } boost::intrusive_ptr<QueueContext> QueueContext::get(broker::Queue& q) { diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h index 54bc81b175..d0e68641d9 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h @@ -55,7 +55,7 @@ class QueueContext : public RefCounted { ~QueueContext(); /** Replica state has changed, called in deliver thread. */ - void replicaState(QueueOwnership); + void replicaState(QueueOwnership before, QueueOwnership after); /** Called when queue is stopped, no threads are dispatching. * May be called in connection or deliver thread. @@ -85,11 +85,10 @@ class QueueContext : public RefCounted { void acquire(const broker::QueuedMessage& qm); /** Called by MesageHandler when a message is dequeued. */ - void dequeue(uint32_t position); + broker::QueuedMessage dequeue(uint32_t position); private: sys::Mutex lock; - QueueOwnership ownership; CountdownTimer timer; broker::Queue& queue; // FIXME aconway 2011-06-08: should be shared/weak ptr? Multicaster& mcast; diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp index 013e50a175..ffa6716536 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp @@ -56,6 +56,7 @@ void QueueReplica::subscribe(const MemberId& member) { update(before); } +// FIXME aconway 2011-09-20: need to requeue. void QueueReplica::unsubscribe(const MemberId& member) { QueueOwnership before = getState(); MemberQueue::iterator i = std::remove(subscribers.begin(), subscribers.end(), member); @@ -76,9 +77,11 @@ void QueueReplica::resubscribe(const MemberId& member) { void QueueReplica::update(QueueOwnership before) { QueueOwnership after = getState(); - QPID_LOG(trace, "cluster queue replica: " << queue->getName() << ": " - << before << "->" << after << " [" << PrintSubscribers(subscribers, self) << "]"); - if (before != after) context->replicaState(after); + if (before != after) { + QPID_LOG(trace, "cluster queue replica: " << queue->getName() << ": " + << before << "->" << after << " [" << PrintSubscribers(subscribers, self) << "]"); + context->replicaState(before, after); + } } QueueOwnership QueueReplica::getState() const { @@ -93,7 +96,6 @@ bool QueueReplica::isOwner() const { } bool QueueReplica::isSubscriber(const MemberId& member) const { - // FIXME aconway 2011-06-27: linear search here, is it a performance issue? return std::find(subscribers.begin(), subscribers.end(), member) != subscribers.end(); } diff --git a/qpid/cpp/src/tests/BrokerClusterCalls.cpp b/qpid/cpp/src/tests/BrokerClusterCalls.cpp index db2dd59579..ca44c2377c 100644 --- a/qpid/cpp/src/tests/BrokerClusterCalls.cpp +++ b/qpid/cpp/src/tests/BrokerClusterCalls.cpp @@ -108,8 +108,6 @@ class DummyCluster : public broker::Cluster } // Queues - // FIXME aconway 2011-05-18: update test to exercise empty() - virtual void empty(broker::Queue& q) { recordStr("empty", q.getName()); } virtual void stopped(broker::Queue& q) { recordStr("stopped", q.getName()); } // Wiring @@ -249,9 +247,6 @@ QPID_AUTO_TEST_CASE(testReleaseReject) { BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, t)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(t)"); BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, t)"); - // FIXME aconway 2011-07-25: empty called once per receiver? - BOOST_CHECK_EQUAL(h.at(i++), "empty(q)"); - BOOST_CHECK_EQUAL(h.at(i++), "empty(q)"); BOOST_CHECK_EQUAL(h.size(), i); // Message replaced on LVQ |