diff options
author | Alan Conway <aconway@apache.org> | 2012-07-19 19:16:29 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-07-19 19:16:29 +0000 |
commit | 7036b9768a8c71bb723b12decae826cf1daae95d (patch) | |
tree | c7437ece9999554f973a521596edc3f2356dcbdd | |
parent | e20bed55215d15ed453e7a19de73fb28bf1aca84 (diff) | |
download | qpid-python-7036b9768a8c71bb723b12decae826cf1daae95d.tar.gz |
QPID-4148: HA Not setting initial queues for new RemoteBackups
Fix bug introduced by r1362584:
"QPID-4144 HA broker deadlocks on broker::QueueRegistry lock and ha::Primary lock"
Stopped setting initial queues on new (i.e. not expected) RemoteBackups.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.18@1363488 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 15 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/RemoteBackup.cpp | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/RemoteBackup.h | 8 |
3 files changed, 21 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index f3df5b2263..2a3eb86b64 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -93,7 +93,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : new RemoteBackup(*i, haBroker.getReplicationTest(), false)); backups[i->getSystemId()] = backup; if (!backup->isReady()) expectedBackups.insert(backup); - backup->createGuards(hb.getBroker().getQueues()); + backup->setInitialQueues(hb.getBroker().getQueues(), true); // Create guards } // Set timeout for expected brokers to connect and become ready. sys::Duration timeout(int64_t(hb.getSettings().backupTimeout*sys::TIME_SEC)); @@ -187,13 +187,18 @@ void Primary::queueDestroy(const QueuePtr& q) { } void Primary::opened(broker::Connection& connection) { - Mutex::ScopedLock l(lock); BrokerInfo info; - boost::shared_ptr<RemoteBackup> backup; if (ha::ConnectionObserver::getBrokerInfo(connection, info)) { + Mutex::ScopedLock l(lock); BackupMap::iterator i = backups.find(info.getSystemId()); if (i == backups.end()) { - backup.reset(new RemoteBackup(info, haBroker.getReplicationTest(), true)); + boost::shared_ptr<RemoteBackup> backup( + new RemoteBackup(info, haBroker.getReplicationTest(), true)); + { + // Avoid deadlock with queue registry lock. + Mutex::ScopedUnlock u(lock); + backup->setInitialQueues(haBroker.getBroker().getQueues(), false); + } backups[info.getSystemId()] = backup; QPID_LOG(debug, logPrefix << "New backup connected: " << info); } @@ -207,7 +212,7 @@ void Primary::opened(broker::Connection& connection) { } else QPID_LOG(debug, logPrefix << "Accepted client connection " - << connection.getMgmtId()) + << connection.getMgmtId()); } void Primary::closed(broker::Connection& connection) { diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp index 54383750fd..1b4ab6f69a 100644 --- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp +++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp @@ -36,10 +36,10 @@ RemoteBackup::RemoteBackup(const BrokerInfo& info, ReplicationTest rt, bool con) brokerInfo(info), replicationTest(rt), connected(con) {} -void RemoteBackup::createGuards(broker::QueueRegistry& queues) +void RemoteBackup::setInitialQueues(broker::QueueRegistry& queues, bool createGuards) { - QPID_LOG(debug, logPrefix << "Guarding queues for backup broker."); - queues.eachQueue(boost::bind(&RemoteBackup::initialQueue, this, _1)); + QPID_LOG(debug, logPrefix << "Setting initial queues" << (createGuards ? " and guards" : "")); + queues.eachQueue(boost::bind(&RemoteBackup::initialQueue, this, _1, createGuards)); } RemoteBackup::~RemoteBackup() { cancel(); } @@ -54,10 +54,10 @@ bool RemoteBackup::isReady() { return connected && initialQueues.empty(); } -void RemoteBackup::initialQueue(const QueuePtr& q) { +void RemoteBackup::initialQueue(const QueuePtr& q, bool createGuard) { if (replicationTest.isReplicated(ALL, *q)) { initialQueues.insert(q); - queueCreate(q); + if (createGuard) guards[q].reset(new QueueGuard(*q, brokerInfo)); } } @@ -93,7 +93,7 @@ void RemoteBackup::ready(const QueuePtr& q) { if (isReady()) QPID_LOG(debug, logPrefix << "All queues ready"); } -// Called via ConfigurationObserver and from initialQueue +// Called via ConfigurationObserver::queueCreate and from initialQueue void RemoteBackup::queueCreate(const QueuePtr& q) { if (replicationTest.isReplicated(ALL, *q)) guards[q].reset(new QueueGuard(*q, brokerInfo)); diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.h b/qpid/cpp/src/qpid/ha/RemoteBackup.h index 3cc18bcd1b..99e5a24494 100644 --- a/qpid/cpp/src/qpid/ha/RemoteBackup.h +++ b/qpid/cpp/src/qpid/ha/RemoteBackup.h @@ -57,8 +57,10 @@ class RemoteBackup RemoteBackup(const BrokerInfo& info, ReplicationTest, bool connected); ~RemoteBackup(); - /** Create initial guards for all the replicated queues in the registry. */ - void createGuards(broker::QueueRegistry&); + /** Set the initial queues for all queues in the registry. + *@createGuards if true create guards also, if false guards will be created on demand. + */ + void setInitialQueues(broker::QueueRegistry&, bool createGuards); /** Return guard associated with a queue. Used to create ReplicatingSubscription. */ GuardPtr guard(const QueuePtr&); @@ -90,7 +92,7 @@ class RemoteBackup typedef std::set<QueuePtr> QueueSet; /** Add queue to guard as an initial queue */ - void initialQueue(const QueuePtr&); + void initialQueue(const QueuePtr&, bool createGuard); std::string logPrefix; BrokerInfo brokerInfo; |