summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-09-21 14:56:20 +0000
committerAlan Conway <aconway@apache.org>2011-09-21 14:56:20 +0000
commitbca0f185b11399bbab801726393ee5c341b8cf33 (patch)
treef7d03493d3ce0253316f8c61798feba7e3518bc2
parent2b72be0347e8cdc7bf85ed14b6490afc811e363e (diff)
downloadqpid-python-bca0f185b11399bbab801726393ee5c341b8cf33.tar.gz
QPID-2920: Fixed race condition around Queue::listeners.
- moved call to cluster dequeue, no deferred dequeue. - removed unused function broker::Cluster::empty git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1173697 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Cluster.h5
-rw-r--r--qpid/cpp/src/qpid/broker/NullCluster.h1
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp13
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/BrokerContext.h1
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/LockedMap.h12
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp8
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp24
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.h5
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp10
-rw-r--r--qpid/cpp/src/tests/BrokerClusterCalls.cpp5
12 files changed, 42 insertions, 54 deletions
diff --git a/qpid/cpp/src/qpid/broker/Cluster.h b/qpid/cpp/src/qpid/broker/Cluster.h
index cb606ba52d..cc1cd862aa 100644
--- a/qpid/cpp/src/qpid/broker/Cluster.h
+++ b/qpid/cpp/src/qpid/broker/Cluster.h
@@ -82,8 +82,9 @@ class Cluster
virtual void consume(Queue&, size_t consumerCount) = 0;
/** A consumer cancels its subscription to a queue */
virtual void cancel(Queue&, size_t consumerCount) = 0;
- /** A queue becomes empty */
- virtual void empty(Queue&) = 0;
+
+ // Queues
+
/** A queue has been stopped */
virtual void stopped(Queue&) = 0;
diff --git a/qpid/cpp/src/qpid/broker/NullCluster.h b/qpid/cpp/src/qpid/broker/NullCluster.h
index 8e82526ef8..2ca9efaad8 100644
--- a/qpid/cpp/src/qpid/broker/NullCluster.h
+++ b/qpid/cpp/src/qpid/broker/NullCluster.h
@@ -52,7 +52,6 @@ class NullCluster : public Cluster
// Queues
virtual void stopped(Queue&) {}
- virtual void empty(Queue&) {}
// Wiring
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 632e512a32..cda730965d 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -304,14 +304,14 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
{
while (true) {
+ ClusterAcquireScope acquireScope; // Outside the lock
Stoppable::Scope consumeScope(consuming);
+ Mutex::ScopedLock locker(messageLock);
if (!consumeScope) {
QPID_LOG(trace, "Queue stopped, can't consume: " << name);
listeners.addListener(c);
return NO_MESSAGES;
}
- ClusterAcquireScope acquireScope; // Outside the lock
- Mutex::ScopedLock locker(messageLock);
if (messages->empty()) {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
listeners.addListener(c);
@@ -701,9 +701,6 @@ void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
// return true if store exists,
bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
- // FIXME aconway 2011-09-13: new cluster needs tx/dtx support.
- if (!ctxt && broker) broker->getCluster().dequeue(msg);
-
ScopedUse u(barrier);
if (!u.acquired) return false;
{
@@ -713,6 +710,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
dequeued(msg);
}
}
+ if (!ctxt && broker) broker->getCluster().dequeue(msg); // call outside the lock.
// This check prevents messages which have been forced persistent on one queue from dequeuing
// from another on which no forcing has taken place and thus causing a store error.
@@ -1291,7 +1289,7 @@ void Queue::startConsumers() {
notifyListener();
}
-// Called when all busy threads exitd due to stopConsumers()
+// Called when all busy threads exited after stopConsumers()
void Queue::consumingStopped() {
QPID_LOG(trace, "Stopped consumers on " << getName());
if (broker) broker->getCluster().stopped(*this);
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 6c9c111dbb..150348590b 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -21,8 +21,6 @@
* under the License.
*
*/
-#include "qpid/log/Statement.h" // FIXME XXX aconway 2011-06-08: remove
-
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/OwnershipToken.h"
#include "qpid/broker/Consumer.h"
diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
index eac9854aa1..4a59d83da0 100644
--- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
@@ -51,7 +51,7 @@ using namespace broker;
namespace {
// noReplicate means the current thread is handling a message
-// received from the cluster so it should not be replciated.
+// received from the cluster so it should not be replicated.
QPID_TSS bool tssNoReplicate = false;
// Routing ID of the message being routed in the current thread.
@@ -111,9 +111,6 @@ void BrokerContext::acquire(const broker::QueuedMessage& qm) {
}
void BrokerContext::dequeue(const broker::QueuedMessage& qm) {
- // FIXME aconway 2011-09-15: should dequeue locally immediately
- // instead of waiting for redeliver. No need for CPG order on
- // dequeues.
if (!tssNoReplicate)
core.mcast(ClusterMessageDequeueBody(
ProtocolVersion(), qm.queue->getName(), qm.position));
@@ -177,25 +174,19 @@ void BrokerContext::unbind(broker::Queue& q, broker::Exchange& ex,
}
// n is the number of consumers including the one just added.
-// FIXME aconway 2011-06-27: rename, conflicting terms. subscribe?
void BrokerContext::consume(broker::Queue& q, size_t n) {
QueueContext::get(q)->consume(n);
}
-// FIXME aconway 2011-09-13: rename unsubscribe?
// n is the number of consumers after the cancel.
void BrokerContext::cancel(broker::Queue& q, size_t n) {
QueueContext::get(q)->cancel(n);
}
-void BrokerContext::empty(broker::Queue& ) {
- // FIXME aconway 2011-06-28: is this needed?
-}
-
void BrokerContext::stopped(broker::Queue& q) {
boost::intrusive_ptr<QueueContext> qc = QueueContext::get(q);
// Don't forward the stopped call if the queue does not yet have a
- // cluster context this when the queue is first created locally.
+ // cluster context - this when the queue is first created locally.
if (qc) qc->stopped();
}
diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
index ea759eb53c..f0444882a1 100644
--- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
+++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
@@ -66,7 +66,6 @@ class BrokerContext : public broker::Cluster
void cancel(broker::Queue&, size_t);
// Queues
- void empty(broker::Queue&);
void stopped(broker::Queue&);
// Wiring
diff --git a/qpid/cpp/src/qpid/cluster/exp/LockedMap.h b/qpid/cpp/src/qpid/cluster/exp/LockedMap.h
index 7294ff767e..cacb9c792c 100644
--- a/qpid/cpp/src/qpid/cluster/exp/LockedMap.h
+++ b/qpid/cpp/src/qpid/cluster/exp/LockedMap.h
@@ -75,6 +75,18 @@ class LockedMap
return map.erase(key);
}
+ /** Remove and return value associated with key, returns Value() if none. */
+ Value pop(const Key& key) {
+ sys::RWlock::ScopedWlock w(lock);
+ Value value;
+ typename Map::iterator i = map.find(key);
+ if (i != map.end()) {
+ value = i->second;
+ map.erase(i);
+ }
+ return value;
+ }
+
private:
typedef std::map<Key, Value> Map;
Map map;
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
index d3c6b763e8..d9fce02d75 100644
--- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
@@ -105,7 +105,8 @@ void MessageHandler::acquire(const std::string& q, uint32_t position) {
assert(qm.payload);
// Save on context for possible requeue if released/rejected.
QueueContext::get(*queue)->acquire(qm);
- // FIXME aconway 2011-09-19: need to record by member-ID to requeue if member leaves.
+ // FIXME aconway 2011-09-19: need to record by member-ID to
+ // requeue if member leaves.
}
}
@@ -118,12 +119,11 @@ void MessageHandler::dequeue(const std::string& q, uint32_t position) {
// complete the ack that initiated the dequeue at this point, see
// BrokerContext::dequeue
+ // My own dequeues were processed in the connection thread before multicasting.
if (sender() != self()) {
boost::shared_ptr<Queue> queue = findQueue(q, "Cluster dequeue failed");
- // Remove fom the unacked list
- QueueContext::get(*queue)->dequeue(position);
+ QueuedMessage qm = QueueContext::get(*queue)->dequeue(position);
BrokerContext::ScopedSuppressReplication ssr;
- QueuedMessage qm = queue->find(position);
if (qm.queue) queue->dequeue(0, qm);
}
}
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
index de7109e131..fc4f6d7bf8 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
@@ -37,8 +37,7 @@ namespace cluster {
// FIXME aconway 2011-09-16: configurable timeout.
QueueContext::QueueContext(broker::Queue& q, Multicaster& m)
- : ownership(UNSUBSCRIBED),
- timer(boost::bind(&QueueContext::timeout, this),
+ : timer(boost::bind(&QueueContext::timeout, this),
q.getBroker()->getTimer(),
100*sys::TIME_MSEC),
queue(q), mcast(m), consumers(0)
@@ -53,7 +52,8 @@ bool isOwner(QueueOwnership o) { return o == SOLE_OWNER || o == SHARED_OWNER; }
}
// Called by QueueReplica in CPG deliver thread when state changes.
-void QueueContext::replicaState(QueueOwnership newOwnership) {
+void QueueContext::replicaState(QueueOwnership before, QueueOwnership after) {
+ assert(before != after);
// Invariants for ownership:
// UNSUBSCRIBED, SUBSCRIBED <=> timer stopped, queue stopped
@@ -61,16 +61,11 @@ void QueueContext::replicaState(QueueOwnership newOwnership) {
// SHARED_OWNER <=> timer started, queue started
sys::Mutex::ScopedLock l(lock);
- QueueOwnership before = ownership;
- QueueOwnership after = newOwnership;
- assert(before != after);
- ownership = newOwnership;
-
if (!isOwner(before) && isOwner(after)) { // Took ownership
queue.startConsumers();
if (after == SHARED_OWNER) timer.start();
}
- else if (isOwner(before) && isOwner(after) && before != after) {
+ else if (isOwner(before) && isOwner(after)) {
// Changed from shared to sole owner or vice versa
if (after == SOLE_OWNER) timer.stop();
else timer.start();
@@ -78,7 +73,8 @@ void QueueContext::replicaState(QueueOwnership newOwnership) {
// If we lost ownership then the queue and timer will already have
// been stopped by timeout()
}
-// FIXME aconway 2011-07-27: Dont spin the token on an empty or idle queue.
+
+// FIXME aconway 2011-07-27: Dont spin the token on an empty queue.
// Called in connection threads when a consumer is added
void QueueContext::consume(size_t n) {
@@ -95,13 +91,12 @@ void QueueContext::cancel(size_t n) {
// When consuming threads are stopped, this->stopped will be called.
if (n == 0) {
timer.stop();
- queue.stopConsumers(); // FIXME aconway 2011-07-28: Ok inside lock?
+ queue.stopConsumers();
}
}
// Called in timer thread.
void QueueContext::timeout() {
- QPID_LOG(debug, "FIXME QueueContext::timeout");
// When all threads have stopped, queue will call stopped()
queue.stopConsumers();
}
@@ -109,7 +104,6 @@ void QueueContext::timeout() {
// Callback set up by queue.stopConsumers() called in connection thread.
// Called when no threads are dispatching from the queue.
void QueueContext::stopped() {
- QPID_LOG(debug, "FIXME QueueContext::stopped");
sys::Mutex::ScopedLock l(lock);
if (consumers == 0)
mcast.mcast(framing::ClusterQueueUnsubscribeBody(
@@ -134,8 +128,8 @@ void QueueContext::acquire(const broker::QueuedMessage& qm) {
unacked.put(qm.position, qm);
}
-void QueueContext::dequeue(uint32_t position) {
- unacked.erase(position);
+broker::QueuedMessage QueueContext::dequeue(uint32_t position) {
+ return unacked.pop(position);
}
boost::intrusive_ptr<QueueContext> QueueContext::get(broker::Queue& q) {
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
index 54bc81b175..d0e68641d9 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
@@ -55,7 +55,7 @@ class QueueContext : public RefCounted {
~QueueContext();
/** Replica state has changed, called in deliver thread. */
- void replicaState(QueueOwnership);
+ void replicaState(QueueOwnership before, QueueOwnership after);
/** Called when queue is stopped, no threads are dispatching.
* May be called in connection or deliver thread.
@@ -85,11 +85,10 @@ class QueueContext : public RefCounted {
void acquire(const broker::QueuedMessage& qm);
/** Called by MesageHandler when a message is dequeued. */
- void dequeue(uint32_t position);
+ broker::QueuedMessage dequeue(uint32_t position);
private:
sys::Mutex lock;
- QueueOwnership ownership;
CountdownTimer timer;
broker::Queue& queue; // FIXME aconway 2011-06-08: should be shared/weak ptr?
Multicaster& mcast;
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
index 013e50a175..ffa6716536 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
@@ -56,6 +56,7 @@ void QueueReplica::subscribe(const MemberId& member) {
update(before);
}
+// FIXME aconway 2011-09-20: need to requeue.
void QueueReplica::unsubscribe(const MemberId& member) {
QueueOwnership before = getState();
MemberQueue::iterator i = std::remove(subscribers.begin(), subscribers.end(), member);
@@ -76,9 +77,11 @@ void QueueReplica::resubscribe(const MemberId& member) {
void QueueReplica::update(QueueOwnership before) {
QueueOwnership after = getState();
- QPID_LOG(trace, "cluster queue replica: " << queue->getName() << ": "
- << before << "->" << after << " [" << PrintSubscribers(subscribers, self) << "]");
- if (before != after) context->replicaState(after);
+ if (before != after) {
+ QPID_LOG(trace, "cluster queue replica: " << queue->getName() << ": "
+ << before << "->" << after << " [" << PrintSubscribers(subscribers, self) << "]");
+ context->replicaState(before, after);
+ }
}
QueueOwnership QueueReplica::getState() const {
@@ -93,7 +96,6 @@ bool QueueReplica::isOwner() const {
}
bool QueueReplica::isSubscriber(const MemberId& member) const {
- // FIXME aconway 2011-06-27: linear search here, is it a performance issue?
return std::find(subscribers.begin(), subscribers.end(), member) != subscribers.end();
}
diff --git a/qpid/cpp/src/tests/BrokerClusterCalls.cpp b/qpid/cpp/src/tests/BrokerClusterCalls.cpp
index db2dd59579..ca44c2377c 100644
--- a/qpid/cpp/src/tests/BrokerClusterCalls.cpp
+++ b/qpid/cpp/src/tests/BrokerClusterCalls.cpp
@@ -108,8 +108,6 @@ class DummyCluster : public broker::Cluster
}
// Queues
- // FIXME aconway 2011-05-18: update test to exercise empty()
- virtual void empty(broker::Queue& q) { recordStr("empty", q.getName()); }
virtual void stopped(broker::Queue& q) { recordStr("stopped", q.getName()); }
// Wiring
@@ -249,9 +247,6 @@ QPID_AUTO_TEST_CASE(testReleaseReject) {
BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, t)");
BOOST_CHECK_EQUAL(h.at(i++), "routed(t)");
BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, t)");
- // FIXME aconway 2011-07-25: empty called once per receiver?
- BOOST_CHECK_EQUAL(h.at(i++), "empty(q)");
- BOOST_CHECK_EQUAL(h.at(i++), "empty(q)");
BOOST_CHECK_EQUAL(h.size(), i);
// Message replaced on LVQ