summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/Primary.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-01-23 21:58:03 +0000
committerAlan Conway <aconway@apache.org>2013-01-23 21:58:03 +0000
commit25c6f2104c02054b05d362f517268c1235bc36a2 (patch)
treedde0a9005fc1880654b100bd43729ae2ee966c10 /qpid/cpp/src/qpid/ha/Primary.cpp
parent5705c6575e717d74e6bd2a942b7ee085eb62cffb (diff)
downloadqpid-python-25c6f2104c02054b05d362f517268c1235bc36a2.tar.gz
NO-JIRA: HA refactor, re-organise code for clarity and thread safety.
Introduce Role base class. Primary and Backup are now subclasses of Role. Moved backup/primary specific code from HaBroker to the Backup and Primary roles. HaBroker always holds a single Role, via a thread-safe RoleHolder. RoleHolder ensures atomic transition between roles: the old role is deleted before the new role is created. Membership is now independently thread safe, breaking the potential deadlock between HaBroker and the Roles. Logging improvements and other minor cleanup. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1437771 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/ha/Primary.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp26
1 files changed, 20 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 259b043bef..12535399e3 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -82,8 +82,10 @@ class ExpectedBackupTimerTask : public sys::TimerTask {
Primary* Primary::instance = 0;
Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
- haBroker(hb), logPrefix("Primary: "), active(false)
+ haBroker(hb), membership(hb.getMembership()),
+ logPrefix("Primary: "), active(false)
{
+ hb.getMembership().setStatus(RECOVERING);
assert(instance == 0);
instance = this; // Let queue replicators find us.
if (expect.empty()) {
@@ -108,11 +110,18 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
hb.getBroker().getTimer().add(timerTask);
}
+
+ // Remove backup tag property from outgoing link properties.
+ framing::FieldTable linkProperties = hb.getBroker().getLinkClientProperties();
+ linkProperties.erase(ConnectionObserver::BACKUP_TAG);
+ hb.getBroker().setLinkClientProperties(linkProperties);
+
configurationObserver.reset(new PrimaryConfigurationObserver(*this));
haBroker.getBroker().getConfigurationObservers().add(configurationObserver);
Mutex::ScopedLock l(lock); // We are now active as a configurationObserver
checkReady(l);
+
// Allow client connections
connectionObserver.reset(new PrimaryConnectionObserver(*this));
haBroker.getObserver()->setObserver(connectionObserver, logPrefix);
@@ -128,7 +137,7 @@ void Primary::checkReady(Mutex::ScopedLock&) {
active = true;
Mutex::ScopedUnlock u(lock); // Don't hold lock across callback
QPID_LOG(notice, logPrefix << "Finished waiting for backups, primary is active.");
- haBroker.activate();
+ membership.setStatus(ACTIVE);
}
}
@@ -136,7 +145,7 @@ void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l) {
if (i != backups.end() && i->second->reportReady()) {
BrokerInfo info = i->second->getBrokerInfo();
info.setStatus(READY);
- haBroker.addBroker(info);
+ membership.add(info);
if (expectedBackups.erase(i->second)) {
QPID_LOG(info, logPrefix << "Expected backup is ready: " << info);
checkReady(l);
@@ -164,7 +173,7 @@ void Primary::timeoutExpectedBackups() {
// Downgrade the broker's status to CATCHUP
// The broker will get this status change when it eventually connects.
info.setStatus(CATCHUP);
- haBroker.addBroker(info);
+ membership.add(info);
}
else ++i;
}
@@ -243,7 +252,7 @@ void Primary::opened(broker::Connection& connection) {
checkReady(i, l);
}
if (info.getStatus() == JOINING) info.setStatus(CATCHUP);
- haBroker.addBroker(info);
+ membership.add(info);
}
else
QPID_LOG(debug, logPrefix << "Accepted client connection "
@@ -260,7 +269,7 @@ void Primary::closed(broker::Connection& connection) {
// Checking isConnected() lets us ignore such spurious closes.
if (i != backups.end() && i->second->isConnected()) {
QPID_LOG(info, logPrefix << "Backup disconnected: " << info);
- haBroker.removeBroker(info.getSystemId());
+ membership.remove(info.getSystemId());
expectedBackups.erase(i->second);
backups.erase(i);
checkReady(l);
@@ -276,4 +285,9 @@ boost::shared_ptr<QueueGuard> Primary::getGuard(const QueuePtr& q, const BrokerI
return i == backups.end() ? boost::shared_ptr<QueueGuard>() : i->second->guard(q);
}
+Role* Primary::promote() {
+ QPID_LOG(info, "Ignoring promotion, already primary: " << haBroker.getBrokerInfo());
+ return 0;
+}
+
}} // namespace qpid::ha