summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-07-19 19:16:29 +0000
committerAlan Conway <aconway@apache.org>2012-07-19 19:16:29 +0000
commit7036b9768a8c71bb723b12decae826cf1daae95d (patch)
treec7437ece9999554f973a521596edc3f2356dcbdd
parente20bed55215d15ed453e7a19de73fb28bf1aca84 (diff)
downloadqpid-python-7036b9768a8c71bb723b12decae826cf1daae95d.tar.gz
QPID-4148: HA Not setting initial queues for new RemoteBackups
Fix bug introduced by r1362584: "QPID-4144 HA broker deadlocks on broker::QueueRegistry lock and ha::Primary lock" Stopped setting initial queues on new (i.e. not expected) RemoteBackups. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.18@1363488 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp15
-rw-r--r--qpid/cpp/src/qpid/ha/RemoteBackup.cpp12
-rw-r--r--qpid/cpp/src/qpid/ha/RemoteBackup.h8
3 files changed, 21 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index f3df5b2263..2a3eb86b64 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -93,7 +93,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
new RemoteBackup(*i, haBroker.getReplicationTest(), false));
backups[i->getSystemId()] = backup;
if (!backup->isReady()) expectedBackups.insert(backup);
- backup->createGuards(hb.getBroker().getQueues());
+ backup->setInitialQueues(hb.getBroker().getQueues(), true); // Create guards
}
// Set timeout for expected brokers to connect and become ready.
sys::Duration timeout(int64_t(hb.getSettings().backupTimeout*sys::TIME_SEC));
@@ -187,13 +187,18 @@ void Primary::queueDestroy(const QueuePtr& q) {
}
void Primary::opened(broker::Connection& connection) {
- Mutex::ScopedLock l(lock);
BrokerInfo info;
- boost::shared_ptr<RemoteBackup> backup;
if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
+ Mutex::ScopedLock l(lock);
BackupMap::iterator i = backups.find(info.getSystemId());
if (i == backups.end()) {
- backup.reset(new RemoteBackup(info, haBroker.getReplicationTest(), true));
+ boost::shared_ptr<RemoteBackup> backup(
+ new RemoteBackup(info, haBroker.getReplicationTest(), true));
+ {
+ // Avoid deadlock with queue registry lock.
+ Mutex::ScopedUnlock u(lock);
+ backup->setInitialQueues(haBroker.getBroker().getQueues(), false);
+ }
backups[info.getSystemId()] = backup;
QPID_LOG(debug, logPrefix << "New backup connected: " << info);
}
@@ -207,7 +212,7 @@ void Primary::opened(broker::Connection& connection) {
}
else
QPID_LOG(debug, logPrefix << "Accepted client connection "
- << connection.getMgmtId())
+ << connection.getMgmtId());
}
void Primary::closed(broker::Connection& connection) {
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
index 54383750fd..1b4ab6f69a 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
@@ -36,10 +36,10 @@ RemoteBackup::RemoteBackup(const BrokerInfo& info, ReplicationTest rt, bool con)
brokerInfo(info), replicationTest(rt), connected(con)
{}
-void RemoteBackup::createGuards(broker::QueueRegistry& queues)
+void RemoteBackup::setInitialQueues(broker::QueueRegistry& queues, bool createGuards)
{
- QPID_LOG(debug, logPrefix << "Guarding queues for backup broker.");
- queues.eachQueue(boost::bind(&RemoteBackup::initialQueue, this, _1));
+ QPID_LOG(debug, logPrefix << "Setting initial queues" << (createGuards ? " and guards" : ""));
+ queues.eachQueue(boost::bind(&RemoteBackup::initialQueue, this, _1, createGuards));
}
RemoteBackup::~RemoteBackup() { cancel(); }
@@ -54,10 +54,10 @@ bool RemoteBackup::isReady() {
return connected && initialQueues.empty();
}
-void RemoteBackup::initialQueue(const QueuePtr& q) {
+void RemoteBackup::initialQueue(const QueuePtr& q, bool createGuard) {
if (replicationTest.isReplicated(ALL, *q)) {
initialQueues.insert(q);
- queueCreate(q);
+ if (createGuard) guards[q].reset(new QueueGuard(*q, brokerInfo));
}
}
@@ -93,7 +93,7 @@ void RemoteBackup::ready(const QueuePtr& q) {
if (isReady()) QPID_LOG(debug, logPrefix << "All queues ready");
}
-// Called via ConfigurationObserver and from initialQueue
+// Called via ConfigurationObserver::queueCreate and from initialQueue
void RemoteBackup::queueCreate(const QueuePtr& q) {
if (replicationTest.isReplicated(ALL, *q))
guards[q].reset(new QueueGuard(*q, brokerInfo));
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.h b/qpid/cpp/src/qpid/ha/RemoteBackup.h
index 3cc18bcd1b..99e5a24494 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.h
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.h
@@ -57,8 +57,10 @@ class RemoteBackup
RemoteBackup(const BrokerInfo& info, ReplicationTest, bool connected);
~RemoteBackup();
- /** Create initial guards for all the replicated queues in the registry. */
- void createGuards(broker::QueueRegistry&);
+ /** Set the initial queues for all queues in the registry.
+ *@createGuards if true create guards also, if false guards will be created on demand.
+ */
+ void setInitialQueues(broker::QueueRegistry&, bool createGuards);
/** Return guard associated with a queue. Used to create ReplicatingSubscription. */
GuardPtr guard(const QueuePtr&);
@@ -90,7 +92,7 @@ class RemoteBackup
typedef std::set<QueuePtr> QueueSet;
/** Add queue to guard as an initial queue */
- void initialQueue(const QueuePtr&);
+ void initialQueue(const QueuePtr&, bool createGuard);
std::string logPrefix;
BrokerInfo brokerInfo;