diff options
author | Alan Conway <aconway@apache.org> | 2013-04-26 17:28:26 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2013-04-26 17:28:26 +0000 |
commit | fd4e9e7748e89830f5fa96a83fdc054d7aba5380 (patch) | |
tree | 437375b544abaf1d8e50620a2ece18971092051f | |
parent | fe25b989382136eedd8a5869858e08c65dd2bf93 (diff) | |
download | qpid-python-fd4e9e7748e89830f5fa96a83fdc054d7aba5380.tar.gz |
QPID-4780: Bug 889552 - HA broker deadlock after loss of primary broker.
Lock ordering deadlock found by inspection of code and stack trace:
- thread 1: Link::ioThreadProcessing(Link::lock) -> QueueReplicator::initializeBridge(QueueReplicator::lock)
- thread 2: QueueReplicator::destroy(QueueReplicator::lock) -> Bridge::destroy(Link::lock)
This patch breaks the deadlock by removing the locking around Bridge::destroy in QueueReplicator::destroy.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1476305 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 55 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.h | 3 |
3 files changed, 37 insertions, 29 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 076bcac63f..f461a2f0e0 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -71,7 +71,6 @@ using namespace broker; namespace { const string QPID_CONFIGURATION_REPLICATOR("qpid.broker-replicator"); - const string CLASS_NAME("_class_name"); const string EVENT("_event"); const string OBJECT_NAME("_object_name"); @@ -291,9 +290,9 @@ BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& link(l), initialized(false), alternates(hb.getBroker().getExchanges()), - connection(0) + connection(0), + connectionObserver(new ConnectionObserver(*this)) { - connectionObserver.reset(new ConnectionObserver(*this)); broker.getConnectionObservers().add(connectionObserver); framing::FieldTable args = getArgs(); args.setString(QPID_REPLICATE, printable(NONE).str()); @@ -761,8 +760,6 @@ boost::shared_ptr<QueueReplicator> BrokerReplicator::startQueueReplicator( if (replicationTest.getLevel(*queue) == ALL) { boost::shared_ptr<QueueReplicator> qr( new QueueReplicator(haBroker, queue, link)); - if (!exchanges.registerExchange(qr)) - throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName())); qr->activate(); return qr; } @@ -879,6 +876,7 @@ namespace { } } +// Called by ConnectionObserver::disconnected, disconnected from the network side. 
void BrokerReplicator::disconnected() { QPID_LOG(info, logPrefix << "Disconnected from " << primary); connection = 0; diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index dece9dd045..3580c49826 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -109,7 +109,7 @@ QueueReplicator::QueueReplicator(HaBroker& hb, haBroker(hb), logPrefix("Backup queue "+q->getName()+": "), queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false), - settings(hb.getSettings()) + settings(hb.getSettings()), destroyed(false) { args.setString(QPID_REPLICATE, printable(NONE).str()); Uuid uuid(true); @@ -119,10 +119,17 @@ QueueReplicator::QueueReplicator(HaBroker& hb, setArgs(args); } -// This must be separate from the constructor so we can call shared_from_this. +// This must be called immediately after the constructor. +// It has to be separate so we can call shared_from_this(). void QueueReplicator::activate() { Mutex::ScopedLock l(lock); if (!queue) return; // Already destroyed + + // Enable callback to route() + if (!getBroker()->getExchanges().registerExchange(shared_from_this())) + throw Exception(QPID_MSG("Duplicate queue replicator " << getName())); + + // Enable callback to initializeBridge std::pair<Bridge::shared_ptr, bool> result = queue->getBroker()->getLinks().declare( bridgeName, @@ -145,29 +152,37 @@ void QueueReplicator::activate() { bridge = result.first; bridge->setErrorListener( boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix))); - boost::shared_ptr<QueueObserver> observer(new QueueObserver(shared_from_this())); - queue->addObserver(observer); + + // Enable callback to destroy() + queue->addObserver( + boost::shared_ptr<QueueObserver>(new QueueObserver(shared_from_this()))); } QueueReplicator::~QueueReplicator() {} +// Called from Queue::destroyed() void QueueReplicator::destroy() { - // Called from Queue::destroyed() - Mutex::ScopedLock l(lock); - 
if (!bridge) return; - QPID_LOG(debug, logPrefix << "Destroyed."); - bridge->close(); - // Need to drop shared pointers to avoid pointer cycles keeping this in memory. - queue.reset(); - link.reset(); - bridge.reset(); - getBroker()->getExchanges().destroy(getName()); + boost::shared_ptr<Bridge> bridge2; // To call outside of lock + { + Mutex::ScopedLock l(lock); + if (destroyed) return; + destroyed = true; + QPID_LOG(debug, logPrefix << "Destroyed."); + // Need to drop shared pointers to avoid pointer cycles keeping this in memory. + queue.reset(); + link.reset(); + bridge.reset(); + getBroker()->getExchanges().destroy(getName()); + bridge2 = bridge; + } + if (bridge2) bridge2->close(); // Outside of lock, avoid deadlock. } // Called in a broker connection thread when the bridge is created. +// Note: called with the Link lock held. void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { Mutex::ScopedLock l(lock); - if (!queue) return; // Already destroyed + if (destroyed) return; // Already destroyed AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); FieldTable arguments; @@ -207,13 +222,7 @@ template <class T> T decodeContent(Message& m) { } void QueueReplicator::dequeue(SequenceNumber n, Mutex::ScopedLock&) { - boost::shared_ptr<Queue> q; - { - Mutex::ScopedLock l(lock); - if (!queue) return; // Already destroyed - q = queue; - } - // Thread safe: only calls thread safe Queue functions. + if (destroyed) return; queue->dequeueMessageAt(n); } @@ -234,7 +243,7 @@ void QueueReplicator::route(Deliverable& msg) try { const std::string& key = msg.getMessage().getRoutingKey(); Mutex::ScopedLock l(lock); - if (!queue) return; // Already destroyed + if (destroyed) return; if (!isEventKey(key)) { msg.deliverTo(queue); // We are on a backup so the queue is not modified except via this. 
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index 757605a23a..7f0fa52480 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -72,7 +72,7 @@ class QueueReplicator : public broker::Exchange, ~QueueReplicator(); - void activate(); // Call after ctor + void activate(); // Must be called immediately after constructor. std::string getType() const; bool bind(boost::shared_ptr<broker::Queue @@ -105,6 +105,7 @@ class QueueReplicator : public broker::Exchange, BrokerInfo brokerInfo; bool subscribed; const Settings& settings; + bool destroyed; }; }} // namespace qpid::ha |