summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-04-26 17:28:26 +0000
committerAlan Conway <aconway@apache.org>2013-04-26 17:28:26 +0000
commitfd4e9e7748e89830f5fa96a83fdc054d7aba5380 (patch)
tree437375b544abaf1d8e50620a2ece18971092051f
parentfe25b989382136eedd8a5869858e08c65dd2bf93 (diff)
downloadqpid-python-fd4e9e7748e89830f5fa96a83fdc054d7aba5380.tar.gz
QPID-4780: Bug 889552 - HA broker deadlock after loss of primary broker.
Lock ordering deadlock found by inspection of code and stack trace: - thread 1: Link::ioThreadProcessing(Link:lock)-> QueueReplicator::initializeBridge(QueueReplicator::lock) - thread 2: QueueReplicator::destroy(QueueReplicator::lock)-> Bridge::destroy(Link::lock) This patch breaks the lock by removing locking around Bridge::destroy in QueueReplicator::destroy. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1476305 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp8
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp55
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h3
3 files changed, 37 insertions, 29 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 076bcac63f..f461a2f0e0 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -71,7 +71,6 @@ using namespace broker;
namespace {
const string QPID_CONFIGURATION_REPLICATOR("qpid.broker-replicator");
-
const string CLASS_NAME("_class_name");
const string EVENT("_event");
const string OBJECT_NAME("_object_name");
@@ -291,9 +290,9 @@ BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>&
link(l),
initialized(false),
alternates(hb.getBroker().getExchanges()),
- connection(0)
+ connection(0),
+ connectionObserver(new ConnectionObserver(*this))
{
- connectionObserver.reset(new ConnectionObserver(*this));
broker.getConnectionObservers().add(connectionObserver);
framing::FieldTable args = getArgs();
args.setString(QPID_REPLICATE, printable(NONE).str());
@@ -761,8 +760,6 @@ boost::shared_ptr<QueueReplicator> BrokerReplicator::startQueueReplicator(
if (replicationTest.getLevel(*queue) == ALL) {
boost::shared_ptr<QueueReplicator> qr(
new QueueReplicator(haBroker, queue, link));
- if (!exchanges.registerExchange(qr))
- throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName()));
qr->activate();
return qr;
}
@@ -879,6 +876,7 @@ namespace {
}
}
+// Called by ConnectionObserver::disconnected, disconnected from the network side.
void BrokerReplicator::disconnected() {
QPID_LOG(info, logPrefix << "Disconnected from " << primary);
connection = 0;
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index dece9dd045..3580c49826 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -109,7 +109,7 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
haBroker(hb),
logPrefix("Backup queue "+q->getName()+": "),
queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false),
- settings(hb.getSettings())
+ settings(hb.getSettings()), destroyed(false)
{
args.setString(QPID_REPLICATE, printable(NONE).str());
Uuid uuid(true);
@@ -119,10 +119,17 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
setArgs(args);
}
-// This must be separate from the constructor so we can call shared_from_this.
+// This must be called immediately after the constructor.
+// It has to be separate so we can call shared_from_this().
void QueueReplicator::activate() {
Mutex::ScopedLock l(lock);
if (!queue) return; // Already destroyed
+
+ // Enable callback to route()
+ if (!getBroker()->getExchanges().registerExchange(shared_from_this()))
+ throw Exception(QPID_MSG("Duplicate queue replicator " << getName()));
+
+ // Enable callback to initializeBridge
std::pair<Bridge::shared_ptr, bool> result =
queue->getBroker()->getLinks().declare(
bridgeName,
@@ -145,29 +152,37 @@ void QueueReplicator::activate() {
bridge = result.first;
bridge->setErrorListener(
boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix)));
- boost::shared_ptr<QueueObserver> observer(new QueueObserver(shared_from_this()));
- queue->addObserver(observer);
+
+ // Enable callback to destroy()
+ queue->addObserver(
+ boost::shared_ptr<QueueObserver>(new QueueObserver(shared_from_this())));
}
QueueReplicator::~QueueReplicator() {}
+// Called from Queue::destroyed()
void QueueReplicator::destroy() {
- // Called from Queue::destroyed()
- Mutex::ScopedLock l(lock);
- if (!bridge) return;
- QPID_LOG(debug, logPrefix << "Destroyed.");
- bridge->close();
- // Need to drop shared pointers to avoid pointer cycles keeping this in memory.
- queue.reset();
- link.reset();
- bridge.reset();
- getBroker()->getExchanges().destroy(getName());
+ boost::shared_ptr<Bridge> bridge2; // To call outside of lock
+ {
+ Mutex::ScopedLock l(lock);
+ if (destroyed) return;
+ destroyed = true;
+ QPID_LOG(debug, logPrefix << "Destroyed.");
+ // Need to drop shared pointers to avoid pointer cycles keeping this in memory.
+ queue.reset();
+ link.reset();
+ bridge.reset();
+ getBroker()->getExchanges().destroy(getName());
+ bridge2 = bridge;
+ }
+ if (bridge2) bridge2->close(); // Outside of lock, avoid deadlock.
}
// Called in a broker connection thread when the bridge is created.
+// Note: called with the Link lock held.
void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
Mutex::ScopedLock l(lock);
- if (!queue) return; // Already destroyed
+ if (destroyed) return; // Already destroyed
AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
FieldTable arguments;
@@ -207,13 +222,7 @@ template <class T> T decodeContent(Message& m) {
}
void QueueReplicator::dequeue(SequenceNumber n, Mutex::ScopedLock&) {
- boost::shared_ptr<Queue> q;
- {
- Mutex::ScopedLock l(lock);
- if (!queue) return; // Already destroyed
- q = queue;
- }
- // Thread safe: only calls thread safe Queue functions.
+ if (destroyed) return;
queue->dequeueMessageAt(n);
}
@@ -234,7 +243,7 @@ void QueueReplicator::route(Deliverable& msg)
try {
const std::string& key = msg.getMessage().getRoutingKey();
Mutex::ScopedLock l(lock);
- if (!queue) return; // Already destroyed
+ if (destroyed) return;
if (!isEventKey(key)) {
msg.deliverTo(queue);
// We are on a backup so the queue is not modified except via this.
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index 757605a23a..7f0fa52480 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -72,7 +72,7 @@ class QueueReplicator : public broker::Exchange,
~QueueReplicator();
- void activate(); // Call after ctor
+ void activate(); // Must be called immediately after constructor.
std::string getType() const;
bool bind(boost::shared_ptr<broker::Queue
@@ -105,6 +105,7 @@ class QueueReplicator : public broker::Exchange,
BrokerInfo brokerInfo;
bool subscribed;
const Settings& settings;
+ bool destroyed;
};
}} // namespace qpid::ha