diff options
author | Alan Conway <aconway@apache.org> | 2013-01-23 21:58:03 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2013-01-23 21:58:03 +0000 |
commit | 25c6f2104c02054b05d362f517268c1235bc36a2 (patch) | |
tree | dde0a9005fc1880654b100bd43729ae2ee966c10 /qpid/cpp/src/qpid/ha/Primary.cpp | |
parent | 5705c6575e717d74e6bd2a942b7ee085eb62cffb (diff) | |
download | qpid-python-25c6f2104c02054b05d362f517268c1235bc36a2.tar.gz |
NO-JIRA: HA refactor, re-organise code for clarity and thread safety.
Introduce Role base class. Primary and Backup are now subclasses of Role. Moved
backup/primary specific code from HaBroker to the Backup and Primary roles.
HaBroker always holds a single Role, via a thread-safe RoleHolder. RoleHolder
ensures atomic transition between roles: the old role is deleted before the new
role is created.
Membership is now independently thread safe, breaking the potential deadlock
between HaBroker and the Roles.
Logging improvements and other minor cleanup.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1437771 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/ha/Primary.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 26 |
1 files changed, 20 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 259b043bef..12535399e3 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -82,8 +82,10 @@ class ExpectedBackupTimerTask : public sys::TimerTask { Primary* Primary::instance = 0; Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : - haBroker(hb), logPrefix("Primary: "), active(false) + haBroker(hb), membership(hb.getMembership()), + logPrefix("Primary: "), active(false) { + hb.getMembership().setStatus(RECOVERING); assert(instance == 0); instance = this; // Let queue replicators find us. if (expect.empty()) { @@ -108,11 +110,18 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : hb.getBroker().getTimer().add(timerTask); } + + // Remove backup tag property from outgoing link properties. + framing::FieldTable linkProperties = hb.getBroker().getLinkClientProperties(); + linkProperties.erase(ConnectionObserver::BACKUP_TAG); + hb.getBroker().setLinkClientProperties(linkProperties); + configurationObserver.reset(new PrimaryConfigurationObserver(*this)); haBroker.getBroker().getConfigurationObservers().add(configurationObserver); Mutex::ScopedLock l(lock); // We are now active as a configurationObserver checkReady(l); + // Allow client connections connectionObserver.reset(new PrimaryConnectionObserver(*this)); haBroker.getObserver()->setObserver(connectionObserver, logPrefix); @@ -128,7 +137,7 @@ void Primary::checkReady(Mutex::ScopedLock&) { active = true; Mutex::ScopedUnlock u(lock); // Don't hold lock across callback QPID_LOG(notice, logPrefix << "Finished waiting for backups, primary is active."); - haBroker.activate(); + membership.setStatus(ACTIVE); } } @@ -136,7 +145,7 @@ void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l) { if (i != backups.end() && i->second->reportReady()) { BrokerInfo info = i->second->getBrokerInfo(); info.setStatus(READY); - haBroker.addBroker(info); + membership.add(info); if (expectedBackups.erase(i->second)) { QPID_LOG(info, logPrefix << "Expected backup is ready: " << info); checkReady(l); @@ -164,7 +173,7 @@ void Primary::timeoutExpectedBackups() { // Downgrade the broker's status to CATCHUP // The broker will get this status change when it eventually connects. info.setStatus(CATCHUP); - haBroker.addBroker(info); + membership.add(info); } else ++i; } @@ -243,7 +252,7 @@ void Primary::opened(broker::Connection& connection) { checkReady(i, l); } if (info.getStatus() == JOINING) info.setStatus(CATCHUP); - haBroker.addBroker(info); + membership.add(info); } else QPID_LOG(debug, logPrefix << "Accepted client connection " @@ -260,7 +269,7 @@ void Primary::closed(broker::Connection& connection) { // Checking isConnected() lets us ignore such spurious closes. if (i != backups.end() && i->second->isConnected()) { QPID_LOG(info, logPrefix << "Backup disconnected: " << info); - haBroker.removeBroker(info.getSystemId()); + membership.remove(info.getSystemId()); expectedBackups.erase(i->second); backups.erase(i); checkReady(l); @@ -276,4 +285,9 @@ boost::shared_ptr<QueueGuard> Primary::getGuard(const QueuePtr& q, const BrokerI return i == backups.end() ? boost::shared_ptr<QueueGuard>() : i->second->guard(q); } +Role* Primary::promote() { + QPID_LOG(info, "Ignoring promotion, already primary: " << haBroker.getBrokerInfo()); + return 0; +} + }} // namespace qpid::ha |