summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-07-19 19:16:12 +0000
committerAlan Conway <aconway@apache.org>2012-07-19 19:16:12 +0000
commit6af5d7550a52f3bce03f9517bfbdfefa1a90cbe8 (patch)
tree44d1e5c8e97a689b214381588404acd82230e92d
parent3ea4a64bb9262287c3b1ea3bf0871c92bbe6efd6 (diff)
downloadqpid-python-6af5d7550a52f3bce03f9517bfbdfefa1a90cbe8.tar.gz
QPID-4144 HA broker deadlocks on broker::QueueRegistry lock and ha::Primary lock
Running tests repeatedly, the broker deadlocked with the attached stack trace. The problem call sequences are: 1. QueueRegistry::destroy takes QueuerRegistry lock > ConfigurationObserver::queueDestroy > ha::Primary::queueDestroy takes Primary lock. 2. ConnectionObserver::opened cals Primary::opened lock> RemoteBackup>getQueues().eachQueue This patch breaks the deadlock at both ends: QueueRegistry no longer holds the lock across the observer call and Primary does not hold the lock across eachQueue. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.18@1363486 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/QueueRegistry.cpp21
-rw-r--r--qpid/cpp/src/qpid/broker/QueueRegistry.h18
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp16
-rw-r--r--qpid/cpp/src/qpid/ha/RemoteBackup.cpp23
-rw-r--r--qpid/cpp/src/qpid/ha/RemoteBackup.h18
5 files changed, 44 insertions, 52 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
index 2916d7bb93..1401356444 100644
--- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -79,18 +79,17 @@ QueueRegistry::declare(const string& declareName, bool durable,
return result;
}
-void QueueRegistry::destroyLH (const string& name) {
- QueueMap::iterator i = queues.find(name);
- if (i != queues.end()) {
- Queue::shared_ptr q = i->second;
- queues.erase(i);
- if (broker) broker->getConfigurationObservers().queueDestroy(q);
+void QueueRegistry::destroy(const string& name) {
+ Queue::shared_ptr q;
+ {
+ qpid::sys::RWlock::ScopedWlock locker(lock);
+ QueueMap::iterator i = queues.find(name);
+ if (i != queues.end()) {
+ Queue::shared_ptr q = i->second;
+ queues.erase(i);
+ }
}
-}
-
-void QueueRegistry::destroy (const string& name){
- RWlock::ScopedWlock locker(lock);
- destroyLH (name);
+ if (broker && q) broker->getConfigurationObservers().queueDestroy(q);
}
Queue::shared_ptr QueueRegistry::find(const string& name){
diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.h b/qpid/cpp/src/qpid/broker/QueueRegistry.h
index f724e6b10c..a354513c5f 100644
--- a/qpid/cpp/src/qpid/broker/QueueRegistry.h
+++ b/qpid/cpp/src/qpid/broker/QueueRegistry.h
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -61,7 +61,7 @@ class QueueRegistry {
QPID_BROKER_EXTERN std::pair<boost::shared_ptr<Queue>, bool> declare(
const std::string& name,
bool durable = false,
- bool autodelete = false,
+ bool autodelete = false,
const OwnershipToken* owner = 0,
boost::shared_ptr<Exchange> alternateExchange = boost::shared_ptr<Exchange>(),
const qpid::framing::FieldTable& args = framing::FieldTable(),
@@ -82,9 +82,8 @@ class QueueRegistry {
QPID_BROKER_EXTERN void destroy(const std::string& name);
template <class Test> bool destroyIf(const std::string& name, Test test)
{
- qpid::sys::RWlock::ScopedWlock locker(lock);
if (test()) {
- destroyLH (name);
+ destroy(name);
return true;
} else {
return false;
@@ -127,13 +126,13 @@ class QueueRegistry {
for (QueueMap::const_iterator i = queues.begin(); i != queues.end(); ++i)
f(i->second);
}
-
+
/**
* Change queue mode when cluster size drops to 1 node, expands again
* in practice allows flow queue to disk when last name to be exectuted
*/
void updateQueueClusterState(bool lastNode);
-
+
private:
typedef std::map<std::string, boost::shared_ptr<Queue> > QueueMap;
QueueMap queues;
@@ -144,12 +143,9 @@ private:
management::Manageable* parent;
bool lastNode; //used to set mode on queue declare
Broker* broker;
-
- //destroy impl that assumes lock is already held:
- void destroyLH (const std::string& name);
};
-
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 9f0fee84e9..af6066fc5b 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -90,13 +90,10 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
QPID_LOG(debug, logPrefix << "Promoted, expected backups: " << expect);
for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) {
boost::shared_ptr<RemoteBackup> backup(
- new RemoteBackup(
- *i, haBroker.getBroker(), haBroker.getReplicationTest(),
- true, // Create queue guards immediately for expected backups.
- false // Not yet connected.
- ));
+ new RemoteBackup(*i, haBroker.getReplicationTest(), false));
backups[i->getSystemId()] = backup;
if (!backup->isReady()) expectedBackups.insert(backup);
+ backup->createGuards(hb.getBroker().getQueues());
}
// Set timeout for expected brokers to connect and become ready.
sys::Duration timeout(int64_t(hb.getSettings().backupTimeout*sys::TIME_SEC));
@@ -181,16 +178,13 @@ void Primary::queueDestroy(const QueuePtr& q) {
void Primary::opened(broker::Connection& connection) {
Mutex::ScopedLock l(lock);
BrokerInfo info;
+ boost::shared_ptr<RemoteBackup> backup;
if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
BackupMap::iterator i = backups.find(info.getSystemId());
if (i == backups.end()) {
+ backup.reset(new RemoteBackup(info, haBroker.getReplicationTest(), true));
+ backups[info.getSystemId()] = backup;
QPID_LOG(debug, logPrefix << "New backup connected: " << info);
- backups[info.getSystemId()].reset(
- new RemoteBackup(
- info, haBroker.getBroker(), haBroker.getReplicationTest(),
- false, // Lazy-create guards for new backups, creating now deadlocks
- true // Backup is connected
- ));
}
else {
QPID_LOG(debug, logPrefix << "Known backup connected: " << info);
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
index 2b8a0077f5..810913ba8f 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
@@ -30,14 +30,15 @@ namespace ha {
using sys::Mutex;
-RemoteBackup::RemoteBackup(
- const BrokerInfo& info, broker::Broker& broker, ReplicationTest rt, bool cg, bool con) :
+RemoteBackup::RemoteBackup(const BrokerInfo& info, ReplicationTest rt, bool con) :
logPrefix("Primary remote backup "+info.getLogId()+": "),
- brokerInfo(info), replicationTest(rt),
- createGuards(cg), connected(con)
+ brokerInfo(info), replicationTest(rt), connected(con)
+{}
+
+void RemoteBackup::createGuards(broker::QueueRegistry& queues)
{
QPID_LOG(debug, logPrefix << "Guarding queues for backup broker.");
- broker.getQueues().eachQueue(boost::bind(&RemoteBackup::initialQueue, this, _1));
+ queues.eachQueue(boost::bind(&RemoteBackup::initialQueue, this, _1));
}
RemoteBackup::~RemoteBackup() { cancel(); }
@@ -56,14 +57,12 @@ void RemoteBackup::initialQueue(const QueuePtr& q) {
}
RemoteBackup::GuardPtr RemoteBackup::guard(const QueuePtr& q) {
- if (!createGuards) return RemoteBackup::GuardPtr();
GuardMap::iterator i = guards.find(q);
- if (i == guards.end()) {
- assert(0);
- throw Exception(logPrefix+": Cannot find queue guard: "+q->getName());
+ GuardPtr guard;
+ if (i != guards.end()) {
+ guard = i->second;
+ guards.erase(i);
}
- GuardPtr guard = i->second;
- guards.erase(i);
return guard;
}
@@ -89,7 +88,7 @@ void RemoteBackup::ready(const QueuePtr& q) {
// Called via ConfigurationObserver and from initialQueue
void RemoteBackup::queueCreate(const QueuePtr& q) {
- if (createGuards && replicationTest.isReplicated(ALL, *q))
+ if (replicationTest.isReplicated(ALL, *q))
guards[q].reset(new QueueGuard(*q, brokerInfo));
}
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.h b/qpid/cpp/src/qpid/ha/RemoteBackup.h
index 8ea539b167..3cc18bcd1b 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.h
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.h
@@ -31,8 +31,8 @@
namespace qpid {
namespace broker {
-class Broker;
class Queue;
+class QueueRegistry;
}
namespace ha {
@@ -51,11 +51,15 @@ class RemoteBackup
typedef boost::shared_ptr<QueueGuard> GuardPtr;
typedef boost::shared_ptr<broker::Queue> QueuePtr;
- /** Note: isReady() can be true after construction */
- RemoteBackup(const BrokerInfo& info, broker::Broker&, ReplicationTest rt,
- bool createGuards, bool connected);
+ /** Note: isReady() can be true after construction
+ *@param connected true if the backup is already connected.
+ */
+ RemoteBackup(const BrokerInfo& info, ReplicationTest, bool connected);
~RemoteBackup();
+ /** Create initial guards for all the replicated queues in the registry. */
+ void createGuards(broker::QueueRegistry&);
+
/** Return guard associated with a queue. Used to create ReplicatingSubscription. */
GuardPtr guard(const QueuePtr&);
@@ -85,15 +89,15 @@ class RemoteBackup
typedef std::map<QueuePtr, GuardPtr> GuardMap;
typedef std::set<QueuePtr> QueueSet;
+ /** Add queue to guard as an initial queue */
+ void initialQueue(const QueuePtr&);
+
std::string logPrefix;
BrokerInfo brokerInfo;
ReplicationTest replicationTest;
GuardMap guards;
QueueSet initialQueues;
- bool createGuards;
bool connected;
-
- void initialQueue(const QueuePtr&);
};
}} // namespace qpid::ha