diff options
Diffstat (limited to 'cpp/src/qpid/ha/Primary.cpp')
| -rw-r--r-- | cpp/src/qpid/ha/Primary.cpp | 26 |
1 files changed, 20 insertions, 6 deletions
diff --git a/cpp/src/qpid/ha/Primary.cpp b/cpp/src/qpid/ha/Primary.cpp index 259b043bef..12535399e3 100644 --- a/cpp/src/qpid/ha/Primary.cpp +++ b/cpp/src/qpid/ha/Primary.cpp @@ -82,8 +82,10 @@ class ExpectedBackupTimerTask : public sys::TimerTask { Primary* Primary::instance = 0; Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : - haBroker(hb), logPrefix("Primary: "), active(false) + haBroker(hb), membership(hb.getMembership()), + logPrefix("Primary: "), active(false) { + hb.getMembership().setStatus(RECOVERING); assert(instance == 0); instance = this; // Let queue replicators find us. if (expect.empty()) { @@ -108,11 +110,18 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : hb.getBroker().getTimer().add(timerTask); } + + // Remove backup tag property from outgoing link properties. + framing::FieldTable linkProperties = hb.getBroker().getLinkClientProperties(); + linkProperties.erase(ConnectionObserver::BACKUP_TAG); + hb.getBroker().setLinkClientProperties(linkProperties); + configurationObserver.reset(new PrimaryConfigurationObserver(*this)); haBroker.getBroker().getConfigurationObservers().add(configurationObserver); Mutex::ScopedLock l(lock); // We are now active as a configurationObserver checkReady(l); + // Allow client connections connectionObserver.reset(new PrimaryConnectionObserver(*this)); haBroker.getObserver()->setObserver(connectionObserver, logPrefix); @@ -128,7 +137,7 @@ void Primary::checkReady(Mutex::ScopedLock&) { active = true; Mutex::ScopedUnlock u(lock); // Don't hold lock across callback QPID_LOG(notice, logPrefix << "Finished waiting for backups, primary is active."); - haBroker.activate(); + membership.setStatus(ACTIVE); } } @@ -136,7 +145,7 @@ void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l) { if (i != backups.end() && i->second->reportReady()) { BrokerInfo info = i->second->getBrokerInfo(); info.setStatus(READY); - haBroker.addBroker(info); + membership.add(info); if (expectedBackups.erase(i->second)) { QPID_LOG(info, logPrefix << "Expected backup is ready: " << info); checkReady(l); @@ -164,7 +173,7 @@ void Primary::timeoutExpectedBackups() { // Downgrade the broker's status to CATCHUP // The broker will get this status change when it eventually connects. info.setStatus(CATCHUP); - haBroker.addBroker(info); + membership.add(info); } else ++i; } @@ -243,7 +252,7 @@ void Primary::opened(broker::Connection& connection) { checkReady(i, l); } if (info.getStatus() == JOINING) info.setStatus(CATCHUP); - haBroker.addBroker(info); + membership.add(info); } else QPID_LOG(debug, logPrefix << "Accepted client connection " @@ -260,7 +269,7 @@ void Primary::closed(broker::Connection& connection) { // Checking isConnected() lets us ignore such spurious closes. if (i != backups.end() && i->second->isConnected()) { QPID_LOG(info, logPrefix << "Backup disconnected: " << info); - haBroker.removeBroker(info.getSystemId()); + membership.remove(info.getSystemId()); expectedBackups.erase(i->second); backups.erase(i); checkReady(l); @@ -276,4 +285,9 @@ boost::shared_ptr<QueueGuard> Primary::getGuard(const QueuePtr& q, const BrokerI return i == backups.end() ? boost::shared_ptr<QueueGuard>() : i->second->guard(q); } +Role* Primary::promote() { + QPID_LOG(info, "Ignoring promotion, already primary: " << haBroker.getBrokerInfo()); + return 0; +} + }} // namespace qpid::ha |
