summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-09-21 19:10:52 +0000
committerAlan Conway <aconway@apache.org>2011-09-21 19:10:52 +0000
commitf062bb3306834b22143d9adada4310ed5989928b (patch)
tree750234a89c0481d4d6c0c1b140484ee2b5c90591
parentbca0f185b11399bbab801726393ee5c341b8cf33 (diff)
downloadqpid-python-f062bb3306834b22143d9adada4310ed5989928b.tar.gz
QPID-2920: Fix deadlock in QueueContext/QueueContext
Deadlock between two brokers occurred if a SHARED_OWNER broker sent a resubscribe, then the other broker left, making the remaining broker SOLE_OWNER. Previous logic ignored the SOLE_OWNER -> SOLE_OWNER transition. Fixed several other minor bugs showing up in make check. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1173796 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp3
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp1
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp22
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.h8
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp27
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueReplica.h2
-rw-r--r--qpid/cpp/src/tests/BrokerClusterCalls.cpp17
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp4
8 files changed, 42 insertions, 42 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index cda730965d..298c8b8cd2 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -246,7 +246,8 @@ struct ClusterAcquireScope {
ClusterAcquireScope() {}
~ClusterAcquireScope() {
- if (qmsg.queue) qmsg.queue->getBroker()->getCluster().acquire(qmsg);
+ if (qmsg.queue && qmsg.queue->getBroker())
+ qmsg.queue->getBroker()->getCluster().acquire(qmsg);
}
};
diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
index 4a59d83da0..22b7f8f97c 100644
--- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
@@ -127,7 +127,6 @@ void BrokerContext::requeue(const broker::QueuedMessage& qm) {
// FIXME aconway 2011-06-08: should be be using shared_ptr to q here?
void BrokerContext::create(broker::Queue& q) {
- q.stopConsumers(); // Stop queue initially.
if (tssNoReplicate) return;
assert(!QueueContext::get(q));
boost::intrusive_ptr<QueueContext> context(
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
index fc4f6d7bf8..fa36b9225d 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
@@ -20,8 +20,8 @@
*/
#include "QueueContext.h"
-
#include "Multicaster.h"
+#include "qpid/cluster/types.h"
#include "BrokerContext.h" // for ScopedSuppressReplication
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/framing/ClusterQueueResubscribeBody.h"
@@ -43,6 +43,7 @@ QueueContext::QueueContext(broker::Queue& q, Multicaster& m)
queue(q), mcast(m), consumers(0)
{
q.setClusterContext(boost::intrusive_ptr<QueueContext>(this));
+ q.stopConsumers(); // Stop queue initially.
}
QueueContext::~QueueContext() {}
@@ -52,24 +53,23 @@ bool isOwner(QueueOwnership o) { return o == SOLE_OWNER || o == SHARED_OWNER; }
}
// Called by QueueReplica in CPG deliver thread when state changes.
-void QueueContext::replicaState(QueueOwnership before, QueueOwnership after) {
- assert(before != after);
-
+void QueueContext::replicaState(
+ QueueOwnership before, QueueOwnership after, bool selfDelivered)
+{
// Invariants for ownership:
// UNSUBSCRIBED, SUBSCRIBED <=> timer stopped, queue stopped
// SOLE_OWNER <=> timer stopped, queue started
// SHARED_OWNER <=> timer started, queue started
- sys::Mutex::ScopedLock l(lock);
- if (!isOwner(before) && isOwner(after)) { // Took ownership
+ // Interested in state changes and my own events which lead to
+ // ownership.
+ if ((before != after || selfDelivered) && isOwner(after)) {
+ sys::Mutex::ScopedLock l(lock);
queue.startConsumers();
if (after == SHARED_OWNER) timer.start();
+ else timer.stop();
}
- else if (isOwner(before) && isOwner(after)) {
- // Changed from shared to sole owner or vice versa
- if (after == SOLE_OWNER) timer.stop();
- else timer.start();
- }
+
// If we lost ownership then the queue and timer will already have
// been stopped by timeout()
}
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
index d0e68641d9..8ed54596a8 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
@@ -54,8 +54,12 @@ class QueueContext : public RefCounted {
QueueContext(broker::Queue& q, Multicaster& m);
~QueueContext();
- /** Replica state has changed, called in deliver thread. */
- void replicaState(QueueOwnership before, QueueOwnership after);
+ /** Replica state has changed, called in deliver thread.
+ * @param before replica state before the event.
+ * @param after replica state after the event.
+ * @param self is true if this was a self-delivered event.
+ */
+ void replicaState(QueueOwnership before, QueueOwnership after, bool self);
/** Called when queue is stopped, no threads are dispatching.
* May be called in connection or deliver thread.
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
index ffa6716536..41b76f23a3 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
@@ -53,35 +53,30 @@ std::ostream& operator<<(std::ostream& o, QueueOwnership s) {
void QueueReplica::subscribe(const MemberId& member) {
QueueOwnership before = getState();
subscribers.push_back(member);
- update(before);
+ update(before, member);
}
// FIXME aconway 2011-09-20: need to requeue.
void QueueReplica::unsubscribe(const MemberId& member) {
QueueOwnership before = getState();
MemberQueue::iterator i = std::remove(subscribers.begin(), subscribers.end(), member);
- if (i != subscribers.end()) {
- subscribers.erase(i, subscribers.end());
- update(before);
- }
+ if (i != subscribers.end()) subscribers.erase(i, subscribers.end());
+ update(before, member);
}
void QueueReplica::resubscribe(const MemberId& member) {
- if (member == subscribers.front()) { // FIXME aconway 2011-09-13: should be assert?
- QueueOwnership before = getState();
- subscribers.pop_front();
- subscribers.push_back(member);
- update(before);
- }
+ assert (member == subscribers.front());
+ QueueOwnership before = getState();
+ subscribers.pop_front();
+ subscribers.push_back(member);
+ update(before, member);
}
-void QueueReplica::update(QueueOwnership before) {
+void QueueReplica::update(QueueOwnership before, MemberId member) {
QueueOwnership after = getState();
- if (before != after) {
- QPID_LOG(trace, "cluster queue replica: " << queue->getName() << ": "
+ QPID_LOG(trace, "cluster queue replica: " << queue->getName() << ": "
<< before << "->" << after << " [" << PrintSubscribers(subscribers, self) << "]");
- context->replicaState(before, after);
- }
+ context->replicaState(before, after, member == self);
}
QueueOwnership QueueReplica::getState() const {
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
index 31faf4853a..ee93727ca9 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
@@ -68,7 +68,7 @@ class QueueReplica : public RefCounted
QueueOwnership getState() const;
bool isOwner() const;
bool isSubscriber(const MemberId&) const;
- void update(QueueOwnership before);
+ void update(QueueOwnership before, MemberId from);
friend struct PrintSubscribers;
friend std::ostream& operator<<(std::ostream&, QueueOwnership);
diff --git a/qpid/cpp/src/tests/BrokerClusterCalls.cpp b/qpid/cpp/src/tests/BrokerClusterCalls.cpp
index ca44c2377c..45f4787685 100644
--- a/qpid/cpp/src/tests/BrokerClusterCalls.cpp
+++ b/qpid/cpp/src/tests/BrokerClusterCalls.cpp
@@ -91,10 +91,10 @@ class DummyCluster : public broker::Cluster
virtual void acquire(const broker::QueuedMessage& qm) {
if (!isRouting) recordQm("acquire", qm);
}
- virtual void release(const broker::QueuedMessage& qm) {
- if (!isRouting) recordQm("release", qm);
+ virtual void requeue(const broker::QueuedMessage& qm) {
+ if (!isRouting) recordQm("requeue", qm);
}
- virtual bool dequeue(const broker::QueuedMessage& qm) {
+ virtual void dequeue(const broker::QueuedMessage& qm) {
if (!isRouting) recordQm("dequeue", qm);
}
@@ -190,7 +190,7 @@ QPID_AUTO_TEST_CASE(testSimplePubSub) {
BOOST_CHECK_EQUAL(h.size(), i);
}
-QPID_AUTO_TEST_CASE(testReleaseReject) {
+QPID_AUTO_TEST_CASE(testRequeueReject) {
DummyClusterFixture f;
vector<string>& h = f.dc->history;
@@ -201,14 +201,14 @@ QPID_AUTO_TEST_CASE(testReleaseReject) {
Message m = receiver.fetch(Duration::SECOND);
h.clear();
- // Explicit release
+ // Explicit requeue
f.s.release(m);
f.s.sync();
size_t i = 0;
- BOOST_CHECK_EQUAL(h.at(i++), "release(q, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "requeue(q, 1, a)");
BOOST_CHECK_EQUAL(h.size(), i);
- // Implicit release on closing connection.
+ // Implicit requeue on closing connection.
Connection c("localhost:"+lexical_cast<string>(f.getPort()));
c.open();
Session s = c.createSession();
@@ -218,7 +218,7 @@ QPID_AUTO_TEST_CASE(testReleaseReject) {
i = 0;
c.close();
BOOST_CHECK_EQUAL(h.at(i++), "cancel(q, 1)");
- BOOST_CHECK_EQUAL(h.at(i++), "release(q, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "requeue(q, 1, a)");
BOOST_CHECK_EQUAL(h.size(), i);
// Reject message, goes to alternate exchange.
@@ -376,6 +376,7 @@ QPID_AUTO_TEST_CASE(testRingQueue) {
}
QPID_AUTO_TEST_CASE(testTransactions) {
+ return; // Test disabled till transactions are supported.
DummyClusterFixture f;
vector<string>& h = f.dc->history;
Session ts = f.c.createTransactionalSession();
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index ac02ad38ad..acf13727a7 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -1132,13 +1132,13 @@ QPID_AUTO_TEST_CASE(testStopStart) {
BOOST_CHECK(c->received);
c->reset();
// Stop q, should not receive message
- q->stop();
+ q->stopConsumers();
q->deliver(m);
BOOST_CHECK(!q->dispatch(c));
BOOST_CHECK(!c->received);
BOOST_CHECK(!c->notified);
// Start q, should be notified and delivered
- q->start();
+ q->startConsumers();
q->deliver(m);
BOOST_CHECK(c->notified);
BOOST_CHECK(q->dispatch(c));