Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl_elect.cpp')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_elect.cpp | 397
1 files changed, 199 insertions, 198 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect.cpp
index 35f5fdf9f9d..d298decf65f 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect.cpp
@@ -42,54 +42,55 @@ namespace mongo {
 namespace repl {
 
 namespace {
-    class LoseElectionGuard {
-        MONGO_DISALLOW_COPYING(LoseElectionGuard);
-    public:
-        LoseElectionGuard(
-                TopologyCoordinator* topCoord,
-                ReplicationExecutor* executor,
-                std::unique_ptr<FreshnessChecker>* freshnessChecker,
-                std::unique_ptr<ElectCmdRunner>* electCmdRunner,
-                ReplicationExecutor::EventHandle* electionFinishedEvent)
-            : _topCoord(topCoord),
-              _executor(executor),
-              _freshnessChecker(freshnessChecker),
-              _electCmdRunner(electCmdRunner),
-              _electionFinishedEvent(electionFinishedEvent),
-              _dismissed(false) {
+class LoseElectionGuard {
+    MONGO_DISALLOW_COPYING(LoseElectionGuard);
+
+public:
+    LoseElectionGuard(TopologyCoordinator* topCoord,
+                      ReplicationExecutor* executor,
+                      std::unique_ptr<FreshnessChecker>* freshnessChecker,
+                      std::unique_ptr<ElectCmdRunner>* electCmdRunner,
+                      ReplicationExecutor::EventHandle* electionFinishedEvent)
+        : _topCoord(topCoord),
+          _executor(executor),
+          _freshnessChecker(freshnessChecker),
+          _electCmdRunner(electCmdRunner),
+          _electionFinishedEvent(electionFinishedEvent),
+          _dismissed(false) {}
+
+    ~LoseElectionGuard() {
+        if (_dismissed) {
+            return;
         }
-
-        ~LoseElectionGuard() {
-            if (_dismissed) {
-                return;
-            }
-            _topCoord->processLoseElection();
-            _freshnessChecker->reset(NULL);
-            _electCmdRunner->reset(NULL);
-            if (_electionFinishedEvent->isValid()) {
-                _executor->signalEvent(*_electionFinishedEvent);
-            }
+        _topCoord->processLoseElection();
+        _freshnessChecker->reset(NULL);
+        _electCmdRunner->reset(NULL);
+        if (_electionFinishedEvent->isValid()) {
+            _executor->signalEvent(*_electionFinishedEvent);
         }
+    }
 
-        void dismiss() { _dismissed = true; }
+    void dismiss() {
+        _dismissed = true;
+    }
 
-    private:
-        TopologyCoordinator* const _topCoord;
-        ReplicationExecutor* const _executor;
-        std::unique_ptr<FreshnessChecker>* const _freshnessChecker;
-        std::unique_ptr<ElectCmdRunner>* const _electCmdRunner;
-        const ReplicationExecutor::EventHandle* _electionFinishedEvent;
-        bool _dismissed;
-    };
+private:
+    TopologyCoordinator* const _topCoord;
+    ReplicationExecutor* const _executor;
+    std::unique_ptr<FreshnessChecker>* const _freshnessChecker;
+    std::unique_ptr<ElectCmdRunner>* const _electCmdRunner;
+    const ReplicationExecutor::EventHandle* _electionFinishedEvent;
+    bool _dismissed;
+};
 
 } // namespace
 
-    void ReplicationCoordinatorImpl::_startElectSelf() {
-        invariant(!_freshnessChecker);
-        invariant(!_electCmdRunner);
+void ReplicationCoordinatorImpl::_startElectSelf() {
+    invariant(!_freshnessChecker);
+    invariant(!_electCmdRunner);
 
-        stdx::unique_lock<stdx::mutex> lk(_mutex);
-        switch (_rsConfigState) {
+    stdx::unique_lock<stdx::mutex> lk(_mutex);
+    switch (_rsConfigState) {
         case kConfigSteady:
             break;
         case kConfigInitiating:
@@ -100,183 +101,183 @@ namespace {
             _topCoord->processLoseElection();
             return;
         default:
-            severe() << "Entered replica set election code while in illegal config state " <<
-                int(_rsConfigState);
+            severe() << "Entered replica set election code while in illegal config state "
+                     << int(_rsConfigState);
             fassertFailed(18913);
-        }
+    }
 
-        log() << "Standing for election";
-        const StatusWith<ReplicationExecutor::EventHandle> finishEvh = _replExecutor.makeEvent();
-        if (finishEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
-            return;
-        }
-        fassert(18680, finishEvh.getStatus());
-        _electionFinishedEvent = finishEvh.getValue();
-        LoseElectionGuard lossGuard(_topCoord.get(),
-                                    &_replExecutor,
-                                    &_freshnessChecker,
-                                    &_electCmdRunner,
-                                    &_electionFinishedEvent);
-
-
-        invariant(_rsConfig.getMemberAt(_selfIndex).isElectable());
-        OpTime lastOpTimeApplied(_getMyLastOptime_inlock());
-
-        if (lastOpTimeApplied.isNull()) {
-            log() << "not trying to elect self, "
-                "do not yet have a complete set of data from any point in time";
-            return;
-        }
-
-        _freshnessChecker.reset(new FreshnessChecker);
-
-        // This is necessary because the freshnessChecker may call directly into winning an
-        // election, if there are no other MaybeUp nodes. Winning an election attempts to lock
-        // _mutex again.
-        lk.unlock();
-
-        StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _freshnessChecker->start(
-                &_replExecutor,
-                lastOpTimeApplied.getTimestamp(),
-                _rsConfig,
-                _selfIndex,
-                _topCoord->getMaybeUpHostAndPorts(),
-                stdx::bind(&ReplicationCoordinatorImpl::_onFreshnessCheckComplete, this));
-        if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
-            return;
-        }
-        fassert(18681, nextPhaseEvh.getStatus());
-        lossGuard.dismiss();
-    }
+    log() << "Standing for election";
+    const StatusWith<ReplicationExecutor::EventHandle> finishEvh = _replExecutor.makeEvent();
+    if (finishEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
+        return;
+    }
+    fassert(18680, finishEvh.getStatus());
+    _electionFinishedEvent = finishEvh.getValue();
+    LoseElectionGuard lossGuard(_topCoord.get(),
+                                &_replExecutor,
+                                &_freshnessChecker,
+                                &_electCmdRunner,
+                                &_electionFinishedEvent);
+
+
+    invariant(_rsConfig.getMemberAt(_selfIndex).isElectable());
+    OpTime lastOpTimeApplied(_getMyLastOptime_inlock());
+
+    if (lastOpTimeApplied.isNull()) {
+        log() << "not trying to elect self, "
+                 "do not yet have a complete set of data from any point in time";
+        return;
+    }
+    _freshnessChecker.reset(new FreshnessChecker);
+
+    // This is necessary because the freshnessChecker may call directly into winning an
+    // election, if there are no other MaybeUp nodes. Winning an election attempts to lock
+    // _mutex again.
+    lk.unlock();
+
+    StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _freshnessChecker->start(
+        &_replExecutor,
+        lastOpTimeApplied.getTimestamp(),
+        _rsConfig,
+        _selfIndex,
+        _topCoord->getMaybeUpHostAndPorts(),
+        stdx::bind(&ReplicationCoordinatorImpl::_onFreshnessCheckComplete, this));
+    if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
+        return;
+    }
+    fassert(18681, nextPhaseEvh.getStatus());
+    lossGuard.dismiss();
+}
 
-    void ReplicationCoordinatorImpl::_onFreshnessCheckComplete() {
-        invariant(_freshnessChecker);
-        invariant(!_electCmdRunner);
-        LoseElectionGuard lossGuard(_topCoord.get(),
-                                    &_replExecutor,
-                                    &_freshnessChecker,
-                                    &_electCmdRunner,
-                                    &_electionFinishedEvent);
-
-        if (_freshnessChecker->isCanceled()) {
-            LOG(2) << "Election canceled during freshness check phase";
-            return;
-        }
-
-        const Date_t now(_replExecutor.now());
-        const FreshnessChecker::ElectionAbortReason abortReason =
-            _freshnessChecker->shouldAbortElection();
-
-        // need to not sleep after last time sleeping,
-        switch (abortReason) {
-            case FreshnessChecker::None:
-                break;
-            case FreshnessChecker::FreshnessTie:
-                if ((_selfIndex != 0) && !_sleptLastElection) {
-                    const auto ms = Milliseconds(_replExecutor.nextRandomInt64(1000) + 50);
-                    const Date_t nextCandidateTime = now + ms;
-                    log() << "possible election tie; sleeping " << ms.count() << "ms until " <<
-                        dateToISOStringLocal(nextCandidateTime);
-                    _topCoord->setElectionSleepUntil(nextCandidateTime);
-                    _replExecutor.scheduleWorkAt(
-                        nextCandidateTime,
-                        stdx::bind(&ReplicationCoordinatorImpl::_recoverFromElectionTie,
-                                   this,
-                                   stdx::placeholders::_1));
-                    _sleptLastElection = true;
-                    return;
-                }
-                _sleptLastElection = false;
-                break;
-            case FreshnessChecker::FresherNodeFound:
-                log() << "not electing self, we are not freshest";
-                return;
-            case FreshnessChecker::QuorumUnreachable:
-                log() << "not electing self, we could not contact enough voting members";
-                return;
-            default:
-                log() << "not electing self due to election abort message :"
-                    << static_cast<int>(abortReason);
-                return;
-        }
+void ReplicationCoordinatorImpl::_onFreshnessCheckComplete() {
+    invariant(_freshnessChecker);
+    invariant(!_electCmdRunner);
+    LoseElectionGuard lossGuard(_topCoord.get(),
+                                &_replExecutor,
+                                &_freshnessChecker,
+                                &_electCmdRunner,
+                                &_electionFinishedEvent);
+
+    if (_freshnessChecker->isCanceled()) {
+        LOG(2) << "Election canceled during freshness check phase";
+        return;
+    }
+
+    const Date_t now(_replExecutor.now());
+    const FreshnessChecker::ElectionAbortReason abortReason =
+        _freshnessChecker->shouldAbortElection();
+
+    // need to not sleep after last time sleeping,
+    switch (abortReason) {
+        case FreshnessChecker::None:
+            break;
+        case FreshnessChecker::FreshnessTie:
+            if ((_selfIndex != 0) && !_sleptLastElection) {
+                const auto ms = Milliseconds(_replExecutor.nextRandomInt64(1000) + 50);
+                const Date_t nextCandidateTime = now + ms;
+                log() << "possible election tie; sleeping " << ms.count() << "ms until "
+                      << dateToISOStringLocal(nextCandidateTime);
+                _topCoord->setElectionSleepUntil(nextCandidateTime);
+                _replExecutor.scheduleWorkAt(
+                    nextCandidateTime,
+                    stdx::bind(&ReplicationCoordinatorImpl::_recoverFromElectionTie,
+                               this,
+                               stdx::placeholders::_1));
+                _sleptLastElection = true;
+                return;
+            }
+            _sleptLastElection = false;
+            break;
+        case FreshnessChecker::FresherNodeFound:
+            log() << "not electing self, we are not freshest";
+            return;
+        case FreshnessChecker::QuorumUnreachable:
+            log() << "not electing self, we could not contact enough voting members";
+            return;
+        default:
+            log() << "not electing self due to election abort message :"
+                  << static_cast<int>(abortReason);
+            return;
+    }
 
-        log() << "running for election";
-        // Secure our vote for ourself first
-        if (!_topCoord->voteForMyself(now)) {
-            return;
-        }
+    log() << "running for election";
+    // Secure our vote for ourself first
+    if (!_topCoord->voteForMyself(now)) {
+        return;
+    }
 
-        _electCmdRunner.reset(new ElectCmdRunner);
-        StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _electCmdRunner->start(
-                &_replExecutor,
-                _rsConfig,
-                _selfIndex,
-                _topCoord->getMaybeUpHostAndPorts(),
-                stdx::bind(&ReplicationCoordinatorImpl::_onElectCmdRunnerComplete, this));
-        if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
-            return;
-        }
-        fassert(18685, nextPhaseEvh.getStatus());
-        lossGuard.dismiss();
-    }
+    _electCmdRunner.reset(new ElectCmdRunner);
+    StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _electCmdRunner->start(
+        &_replExecutor,
+        _rsConfig,
+        _selfIndex,
+        _topCoord->getMaybeUpHostAndPorts(),
+        stdx::bind(&ReplicationCoordinatorImpl::_onElectCmdRunnerComplete, this));
+    if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
+        return;
+    }
+    fassert(18685, nextPhaseEvh.getStatus());
+    lossGuard.dismiss();
+}
 
-    void ReplicationCoordinatorImpl::_onElectCmdRunnerComplete() {
-        LoseElectionGuard lossGuard(_topCoord.get(),
-                                    &_replExecutor,
-                                    &_freshnessChecker,
-                                    &_electCmdRunner,
-                                    &_electionFinishedEvent);
-
-        invariant(_freshnessChecker);
-        invariant(_electCmdRunner);
-        if (_electCmdRunner->isCanceled()) {
-            LOG(2) << "Election canceled during elect self phase";
-            return;
-        }
-
-        const int receivedVotes = _electCmdRunner->getReceivedVotes();
-
-        if (receivedVotes < _rsConfig.getMajorityVoteCount()) {
-            log() << "couldn't elect self, only received " << receivedVotes <<
-                " votes, but needed at least " << _rsConfig.getMajorityVoteCount();
-            // Suppress ourselves from standing for election again, giving other nodes a chance
-            // to win their elections.
-            const auto ms = Milliseconds(_replExecutor.nextRandomInt64(1000) + 50);
-            const Date_t now(_replExecutor.now());
-            const Date_t nextCandidateTime = now + ms;
-            log() << "waiting until " << nextCandidateTime << " before standing for election again";
-            _topCoord->setElectionSleepUntil(nextCandidateTime);
-            _replExecutor.scheduleWorkAt(
-                nextCandidateTime,
-                stdx::bind(&ReplicationCoordinatorImpl::_recoverFromElectionTie,
-                           this,
-                           stdx::placeholders::_1));
-            return;
-        }
-
-        if (_rsConfig.getConfigVersion() != _freshnessChecker->getOriginalConfigVersion()) {
-            log() << "config version changed during our election, ignoring result";
-            return;
-        }
-
-        log() << "election succeeded, assuming primary role";
-
-        lossGuard.dismiss();
-        _freshnessChecker.reset(NULL);
-        _electCmdRunner.reset(NULL);
-        _performPostMemberStateUpdateAction(kActionWinElection);
-        _replExecutor.signalEvent(_electionFinishedEvent);
-    }
+void ReplicationCoordinatorImpl::_onElectCmdRunnerComplete() {
+    LoseElectionGuard lossGuard(_topCoord.get(),
+                                &_replExecutor,
+                                &_freshnessChecker,
+                                &_electCmdRunner,
+                                &_electionFinishedEvent);
+
+    invariant(_freshnessChecker);
+    invariant(_electCmdRunner);
+    if (_electCmdRunner->isCanceled()) {
+        LOG(2) << "Election canceled during elect self phase";
+        return;
+    }
+
+    const int receivedVotes = _electCmdRunner->getReceivedVotes();
+
+    if (receivedVotes < _rsConfig.getMajorityVoteCount()) {
+        log() << "couldn't elect self, only received " << receivedVotes
+              << " votes, but needed at least " << _rsConfig.getMajorityVoteCount();
+        // Suppress ourselves from standing for election again, giving other nodes a chance
+        // to win their elections.
+        const auto ms = Milliseconds(_replExecutor.nextRandomInt64(1000) + 50);
+        const Date_t now(_replExecutor.now());
+        const Date_t nextCandidateTime = now + ms;
+        log() << "waiting until " << nextCandidateTime << " before standing for election again";
+        _topCoord->setElectionSleepUntil(nextCandidateTime);
+        _replExecutor.scheduleWorkAt(
+            nextCandidateTime,
+            stdx::bind(&ReplicationCoordinatorImpl::_recoverFromElectionTie,
+                       this,
+                       stdx::placeholders::_1));
+        return;
+    }
 
-    void ReplicationCoordinatorImpl::_recoverFromElectionTie(
-        const ReplicationExecutor::CallbackArgs& cbData) {
-        if (!cbData.status.isOK()) {
-            return;
-        }
-        if (_topCoord->checkShouldStandForElection(_replExecutor.now(), getMyLastOptime())) {
-            _startElectSelf();
-        }
+    if (_rsConfig.getConfigVersion() != _freshnessChecker->getOriginalConfigVersion()) {
+        log() << "config version changed during our election, ignoring result";
+        return;
+    }
+
+    log() << "election succeeded, assuming primary role";
+
+    lossGuard.dismiss();
+    _freshnessChecker.reset(NULL);
+    _electCmdRunner.reset(NULL);
+    _performPostMemberStateUpdateAction(kActionWinElection);
+    _replExecutor.signalEvent(_electionFinishedEvent);
+}
+
+void ReplicationCoordinatorImpl::_recoverFromElectionTie(
+    const ReplicationExecutor::CallbackArgs& cbData) {
+    if (!cbData.status.isOK()) {
+        return;
     }
+    if (_topCoord->checkShouldStandForElection(_replExecutor.now(), getMyLastOptime())) {
+        _startElectSelf();
+    }
+}
 
 } // namespace repl
 } // namespace mongo
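For readers skimming the diff: the change appears to be formatting-only (indentation moves from the old 4-space-inside-namespace style to flush-left namespaces with 4-space function bodies); the before and after code is otherwise identical. The one piece of machinery worth understanding is LoseElectionGuard, a dismissable RAII guard: its destructor runs the lose-election cleanup (processLoseElection(), resetting the FreshnessChecker and ElectCmdRunner, signaling _electionFinishedEvent) on every early-return path, and each election phase arms a fresh guard and calls dismiss() only once the next phase has been scheduled successfully. Below is a minimal standalone sketch of that dismiss-on-success idiom; ScopeGuard, tryPhase, and runElectionAttempt are hypothetical names for illustration, not MongoDB APIs.

#include <functional>
#include <iostream>
#include <utility>

// Simplified, hypothetical stand-in for LoseElectionGuard: runs a cleanup
// callback on scope exit unless the success path calls dismiss() first.
class ScopeGuard {
public:
    explicit ScopeGuard(std::function<void()> onExit) : _onExit(std::move(onExit)) {}

    ScopeGuard(const ScopeGuard&) = delete;
    ScopeGuard& operator=(const ScopeGuard&) = delete;

    ~ScopeGuard() {
        if (!_dismissed && _onExit) {
            _onExit();  // the "lose election" cleanup path
        }
    }

    void dismiss() {
        _dismissed = true;  // success path: suppress the cleanup
    }

private:
    std::function<void()> _onExit;
    bool _dismissed = false;
};

// Hypothetical election phase, standing in for the freshness check or the
// elect-command round.
bool tryPhase(bool ok) {
    return ok;
}

void runElectionAttempt(bool phase1Ok, bool phase2Ok) {
    // Every early return below "loses" and triggers the guard, mirroring how
    // _startElectSelf and the phase callbacks bail out with plain returns.
    ScopeGuard lossGuard([] { std::cout << "lost election, cleaning up\n"; });

    if (!tryPhase(phase1Ok)) {
        return;  // guard fires
    }
    if (!tryPhase(phase2Ok)) {
        return;  // guard fires
    }

    lossGuard.dismiss();  // next phase armed successfully; keep state alive
    std::cout << "election attempt succeeded\n";
}

int main() {
    runElectionAttempt(true, false);  // prints: lost election, cleaning up
    runElectionAttempt(true, true);   // prints: election attempt succeeded
    return 0;
}

This shape is why _onFreshnessCheckComplete and _onElectCmdRunnerComplete can return at any abort point (cancelation, stale config version, lost vote) and still leave the coordinator consistent: the guard, not the individual return sites, owns the cleanup and the signaling of _electionFinishedEvent.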