diff options
author | Alan Conway <aconway@apache.org> | 2012-07-19 19:16:12 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-07-19 19:16:12 +0000 |
commit | 6af5d7550a52f3bce03f9517bfbdfefa1a90cbe8 (patch) | |
tree | 44d1e5c8e97a689b214381588404acd82230e92d | |
parent | 3ea4a64bb9262287c3b1ea3bf0871c92bbe6efd6 (diff) | |
download | qpid-python-6af5d7550a52f3bce03f9517bfbdfefa1a90cbe8.tar.gz |
QPID-4144 HA broker deadlocks on broker::QueueRegistry lock and ha::Primary lock
Running tests repeatedly, the broker deadlocked with the attached stack trace.
The problem call sequences are:
1. QueueRegistry::destroy takes QueueRegistry lock > ConfigurationObserver::queueDestroy > ha::Primary::queueDestroy takes Primary lock.
2. ConnectionObserver::opened calls Primary::opened, which takes Primary lock > RemoteBackup > getQueues().eachQueue
This patch breaks the deadlock at both ends: QueueRegistry no longer holds the lock across the observer call and Primary does not hold the lock across eachQueue.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.18@1363486 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueRegistry.cpp | 21 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueRegistry.h | 18 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 16 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/RemoteBackup.cpp | 23 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/RemoteBackup.h | 18 |
5 files changed, 44 insertions, 52 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp index 2916d7bb93..1401356444 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp @@ -79,18 +79,17 @@ QueueRegistry::declare(const string& declareName, bool durable, return result; } -void QueueRegistry::destroyLH (const string& name) { - QueueMap::iterator i = queues.find(name); - if (i != queues.end()) { - Queue::shared_ptr q = i->second; - queues.erase(i); - if (broker) broker->getConfigurationObservers().queueDestroy(q); +void QueueRegistry::destroy(const string& name) { + Queue::shared_ptr q; + { + qpid::sys::RWlock::ScopedWlock locker(lock); + QueueMap::iterator i = queues.find(name); + if (i != queues.end()) { + Queue::shared_ptr q = i->second; + queues.erase(i); + } } -} - -void QueueRegistry::destroy (const string& name){ - RWlock::ScopedWlock locker(lock); - destroyLH (name); + if (broker && q) broker->getConfigurationObservers().queueDestroy(q); } Queue::shared_ptr QueueRegistry::find(const string& name){ diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.h b/qpid/cpp/src/qpid/broker/QueueRegistry.h index f724e6b10c..a354513c5f 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.h +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -61,7 +61,7 @@ class QueueRegistry { QPID_BROKER_EXTERN std::pair<boost::shared_ptr<Queue>, bool> declare( const std::string& name, bool durable = false, - bool autodelete = false, + bool autodelete = false, const OwnershipToken* owner = 0, boost::shared_ptr<Exchange> alternateExchange = boost::shared_ptr<Exchange>(), const qpid::framing::FieldTable& args = framing::FieldTable(), @@ -82,9 +82,8 @@ class QueueRegistry { QPID_BROKER_EXTERN void destroy(const std::string& name); template <class Test> bool destroyIf(const std::string& name, Test test) { - qpid::sys::RWlock::ScopedWlock locker(lock); if (test()) { - destroyLH (name); + destroy(name); return true; } else { return false; @@ -127,13 +126,13 @@ class QueueRegistry { for (QueueMap::const_iterator i = queues.begin(); i != queues.end(); ++i) f(i->second); } - + /** * Change queue mode when cluster size drops to 1 node, expands again * in practice allows flow queue to disk when last name to be exectuted */ void updateQueueClusterState(bool lastNode); - + private: typedef std::map<std::string, boost::shared_ptr<Queue> > QueueMap; QueueMap queues; @@ -144,12 +143,9 @@ private: management::Manageable* parent; bool lastNode; //used to set mode on queue declare Broker* broker; - - //destroy impl that assumes lock is already held: - void destroyLH (const std::string& name); }; - + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 9f0fee84e9..af6066fc5b 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -90,13 +90,10 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : QPID_LOG(debug, logPrefix << "Promoted, expected 
backups: " << expect); for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) { boost::shared_ptr<RemoteBackup> backup( - new RemoteBackup( - *i, haBroker.getBroker(), haBroker.getReplicationTest(), - true, // Create queue guards immediately for expected backups. - false // Not yet connected. - )); + new RemoteBackup(*i, haBroker.getReplicationTest(), false)); backups[i->getSystemId()] = backup; if (!backup->isReady()) expectedBackups.insert(backup); + backup->createGuards(hb.getBroker().getQueues()); } // Set timeout for expected brokers to connect and become ready. sys::Duration timeout(int64_t(hb.getSettings().backupTimeout*sys::TIME_SEC)); @@ -181,16 +178,13 @@ void Primary::queueDestroy(const QueuePtr& q) { void Primary::opened(broker::Connection& connection) { Mutex::ScopedLock l(lock); BrokerInfo info; + boost::shared_ptr<RemoteBackup> backup; if (ha::ConnectionObserver::getBrokerInfo(connection, info)) { BackupMap::iterator i = backups.find(info.getSystemId()); if (i == backups.end()) { + backup.reset(new RemoteBackup(info, haBroker.getReplicationTest(), true)); + backups[info.getSystemId()] = backup; QPID_LOG(debug, logPrefix << "New backup connected: " << info); - backups[info.getSystemId()].reset( - new RemoteBackup( - info, haBroker.getBroker(), haBroker.getReplicationTest(), - false, // Lazy-create guards for new backups, creating now deadlocks - true // Backup is connected - )); } else { QPID_LOG(debug, logPrefix << "Known backup connected: " << info); diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp index 2b8a0077f5..810913ba8f 100644 --- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp +++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp @@ -30,14 +30,15 @@ namespace ha { using sys::Mutex; -RemoteBackup::RemoteBackup( - const BrokerInfo& info, broker::Broker& broker, ReplicationTest rt, bool cg, bool con) : +RemoteBackup::RemoteBackup(const BrokerInfo& info, ReplicationTest rt, bool con) : 
logPrefix("Primary remote backup "+info.getLogId()+": "), - brokerInfo(info), replicationTest(rt), - createGuards(cg), connected(con) + brokerInfo(info), replicationTest(rt), connected(con) +{} + +void RemoteBackup::createGuards(broker::QueueRegistry& queues) { QPID_LOG(debug, logPrefix << "Guarding queues for backup broker."); - broker.getQueues().eachQueue(boost::bind(&RemoteBackup::initialQueue, this, _1)); + queues.eachQueue(boost::bind(&RemoteBackup::initialQueue, this, _1)); } RemoteBackup::~RemoteBackup() { cancel(); } @@ -56,14 +57,12 @@ void RemoteBackup::initialQueue(const QueuePtr& q) { } RemoteBackup::GuardPtr RemoteBackup::guard(const QueuePtr& q) { - if (!createGuards) return RemoteBackup::GuardPtr(); GuardMap::iterator i = guards.find(q); - if (i == guards.end()) { - assert(0); - throw Exception(logPrefix+": Cannot find queue guard: "+q->getName()); + GuardPtr guard; + if (i != guards.end()) { + guard = i->second; + guards.erase(i); } - GuardPtr guard = i->second; - guards.erase(i); return guard; } @@ -89,7 +88,7 @@ void RemoteBackup::ready(const QueuePtr& q) { // Called via ConfigurationObserver and from initialQueue void RemoteBackup::queueCreate(const QueuePtr& q) { - if (createGuards && replicationTest.isReplicated(ALL, *q)) + if (replicationTest.isReplicated(ALL, *q)) guards[q].reset(new QueueGuard(*q, brokerInfo)); } diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.h b/qpid/cpp/src/qpid/ha/RemoteBackup.h index 8ea539b167..3cc18bcd1b 100644 --- a/qpid/cpp/src/qpid/ha/RemoteBackup.h +++ b/qpid/cpp/src/qpid/ha/RemoteBackup.h @@ -31,8 +31,8 @@ namespace qpid { namespace broker { -class Broker; class Queue; +class QueueRegistry; } namespace ha { @@ -51,11 +51,15 @@ class RemoteBackup typedef boost::shared_ptr<QueueGuard> GuardPtr; typedef boost::shared_ptr<broker::Queue> QueuePtr; - /** Note: isReady() can be true after construction */ - RemoteBackup(const BrokerInfo& info, broker::Broker&, ReplicationTest rt, - bool createGuards, bool connected); + 
/** Note: isReady() can be true after construction + *@param connected true if the backup is already connected. + */ + RemoteBackup(const BrokerInfo& info, ReplicationTest, bool connected); ~RemoteBackup(); + /** Create initial guards for all the replicated queues in the registry. */ + void createGuards(broker::QueueRegistry&); + /** Return guard associated with a queue. Used to create ReplicatingSubscription. */ GuardPtr guard(const QueuePtr&); @@ -85,15 +89,15 @@ class RemoteBackup typedef std::map<QueuePtr, GuardPtr> GuardMap; typedef std::set<QueuePtr> QueueSet; + /** Add queue to guard as an initial queue */ + void initialQueue(const QueuePtr&); + std::string logPrefix; BrokerInfo brokerInfo; ReplicationTest replicationTest; GuardMap guards; QueueSet initialQueues; - bool createGuards; bool connected; - - void initialQueue(const QueuePtr&); }; }} // namespace qpid::ha |