Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl_elect.cpp')
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_elect.cpp  397
1 file changed, 199 insertions, 198 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect.cpp
index 35f5fdf9f9d..d298decf65f 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect.cpp
@@ -42,54 +42,55 @@ namespace mongo {
namespace repl {
namespace {
- class LoseElectionGuard {
- MONGO_DISALLOW_COPYING(LoseElectionGuard);
- public:
- LoseElectionGuard(
- TopologyCoordinator* topCoord,
- ReplicationExecutor* executor,
- std::unique_ptr<FreshnessChecker>* freshnessChecker,
- std::unique_ptr<ElectCmdRunner>* electCmdRunner,
- ReplicationExecutor::EventHandle* electionFinishedEvent)
- : _topCoord(topCoord),
- _executor(executor),
- _freshnessChecker(freshnessChecker),
- _electCmdRunner(electCmdRunner),
- _electionFinishedEvent(electionFinishedEvent),
- _dismissed(false) {
+class LoseElectionGuard {
+ MONGO_DISALLOW_COPYING(LoseElectionGuard);
+
+public:
+ LoseElectionGuard(TopologyCoordinator* topCoord,
+ ReplicationExecutor* executor,
+ std::unique_ptr<FreshnessChecker>* freshnessChecker,
+ std::unique_ptr<ElectCmdRunner>* electCmdRunner,
+ ReplicationExecutor::EventHandle* electionFinishedEvent)
+ : _topCoord(topCoord),
+ _executor(executor),
+ _freshnessChecker(freshnessChecker),
+ _electCmdRunner(electCmdRunner),
+ _electionFinishedEvent(electionFinishedEvent),
+ _dismissed(false) {}
+
+ ~LoseElectionGuard() {
+ if (_dismissed) {
+ return;
}
-
- ~LoseElectionGuard() {
- if (_dismissed) {
- return;
- }
- _topCoord->processLoseElection();
- _freshnessChecker->reset(NULL);
- _electCmdRunner->reset(NULL);
- if (_electionFinishedEvent->isValid()) {
- _executor->signalEvent(*_electionFinishedEvent);
- }
+ _topCoord->processLoseElection();
+ _freshnessChecker->reset(NULL);
+ _electCmdRunner->reset(NULL);
+ if (_electionFinishedEvent->isValid()) {
+ _executor->signalEvent(*_electionFinishedEvent);
}
+ }
- void dismiss() { _dismissed = true; }
+ void dismiss() {
+ _dismissed = true;
+ }
- private:
- TopologyCoordinator* const _topCoord;
- ReplicationExecutor* const _executor;
- std::unique_ptr<FreshnessChecker>* const _freshnessChecker;
- std::unique_ptr<ElectCmdRunner>* const _electCmdRunner;
- const ReplicationExecutor::EventHandle* _electionFinishedEvent;
- bool _dismissed;
- };
+private:
+ TopologyCoordinator* const _topCoord;
+ ReplicationExecutor* const _executor;
+ std::unique_ptr<FreshnessChecker>* const _freshnessChecker;
+ std::unique_ptr<ElectCmdRunner>* const _electCmdRunner;
+ const ReplicationExecutor::EventHandle* _electionFinishedEvent;
+ bool _dismissed;
+};
} // namespace
- void ReplicationCoordinatorImpl::_startElectSelf() {
- invariant(!_freshnessChecker);
- invariant(!_electCmdRunner);
+void ReplicationCoordinatorImpl::_startElectSelf() {
+ invariant(!_freshnessChecker);
+ invariant(!_electCmdRunner);
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- switch (_rsConfigState) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ switch (_rsConfigState) {
case kConfigSteady:
break;
case kConfigInitiating:
@@ -100,183 +101,183 @@ namespace {
_topCoord->processLoseElection();
return;
default:
- severe() << "Entered replica set election code while in illegal config state " <<
- int(_rsConfigState);
+ severe() << "Entered replica set election code while in illegal config state "
+ << int(_rsConfigState);
fassertFailed(18913);
- }
+ }
- log() << "Standing for election";
- const StatusWith<ReplicationExecutor::EventHandle> finishEvh = _replExecutor.makeEvent();
- if (finishEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return;
- }
- fassert(18680, finishEvh.getStatus());
- _electionFinishedEvent = finishEvh.getValue();
- LoseElectionGuard lossGuard(_topCoord.get(),
- &_replExecutor,
- &_freshnessChecker,
- &_electCmdRunner,
- &_electionFinishedEvent);
+ log() << "Standing for election";
+ const StatusWith<ReplicationExecutor::EventHandle> finishEvh = _replExecutor.makeEvent();
+ if (finishEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
+ return;
+ }
+ fassert(18680, finishEvh.getStatus());
+ _electionFinishedEvent = finishEvh.getValue();
+ LoseElectionGuard lossGuard(_topCoord.get(),
+ &_replExecutor,
+ &_freshnessChecker,
+ &_electCmdRunner,
+ &_electionFinishedEvent);
+
+
+ invariant(_rsConfig.getMemberAt(_selfIndex).isElectable());
+ OpTime lastOpTimeApplied(_getMyLastOptime_inlock());
+
+ if (lastOpTimeApplied.isNull()) {
+ log() << "not trying to elect self, "
+ "do not yet have a complete set of data from any point in time";
+ return;
+ }
+ _freshnessChecker.reset(new FreshnessChecker);
+
+ // This is necessary because the freshnessChecker may call directly into winning an
+ // election, if there are no other MaybeUp nodes. Winning an election attempts to lock
+ // _mutex again.
+ lk.unlock();
+
+ StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _freshnessChecker->start(
+ &_replExecutor,
+ lastOpTimeApplied.getTimestamp(),
+ _rsConfig,
+ _selfIndex,
+ _topCoord->getMaybeUpHostAndPorts(),
+ stdx::bind(&ReplicationCoordinatorImpl::_onFreshnessCheckComplete, this));
+ if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
+ return;
+ }
+ fassert(18681, nextPhaseEvh.getStatus());
+ lossGuard.dismiss();
+}
+
+void ReplicationCoordinatorImpl::_onFreshnessCheckComplete() {
+ invariant(_freshnessChecker);
+ invariant(!_electCmdRunner);
+ LoseElectionGuard lossGuard(_topCoord.get(),
+ &_replExecutor,
+ &_freshnessChecker,
+ &_electCmdRunner,
+ &_electionFinishedEvent);
+
+ if (_freshnessChecker->isCanceled()) {
+ LOG(2) << "Election canceled during freshness check phase";
+ return;
+ }
- invariant(_rsConfig.getMemberAt(_selfIndex).isElectable());
- OpTime lastOpTimeApplied(_getMyLastOptime_inlock());
+ const Date_t now(_replExecutor.now());
+ const FreshnessChecker::ElectionAbortReason abortReason =
+ _freshnessChecker->shouldAbortElection();
- if (lastOpTimeApplied.isNull()) {
- log() << "not trying to elect self, "
- "do not yet have a complete set of data from any point in time";
+ // need to not sleep after last time sleeping,
+ switch (abortReason) {
+ case FreshnessChecker::None:
+ break;
+ case FreshnessChecker::FreshnessTie:
+ if ((_selfIndex != 0) && !_sleptLastElection) {
+ const auto ms = Milliseconds(_replExecutor.nextRandomInt64(1000) + 50);
+ const Date_t nextCandidateTime = now + ms;
+ log() << "possible election tie; sleeping " << ms.count() << "ms until "
+ << dateToISOStringLocal(nextCandidateTime);
+ _topCoord->setElectionSleepUntil(nextCandidateTime);
+ _replExecutor.scheduleWorkAt(
+ nextCandidateTime,
+ stdx::bind(&ReplicationCoordinatorImpl::_recoverFromElectionTie,
+ this,
+ stdx::placeholders::_1));
+ _sleptLastElection = true;
+ return;
+ }
+ _sleptLastElection = false;
+ break;
+ case FreshnessChecker::FresherNodeFound:
+ log() << "not electing self, we are not freshest";
return;
- }
-
- _freshnessChecker.reset(new FreshnessChecker);
-
- // This is necessary because the freshnessChecker may call directly into winning an
- // election, if there are no other MaybeUp nodes. Winning an election attempts to lock
- // _mutex again.
- lk.unlock();
-
- StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _freshnessChecker->start(
- &_replExecutor,
- lastOpTimeApplied.getTimestamp(),
- _rsConfig,
- _selfIndex,
- _topCoord->getMaybeUpHostAndPorts(),
- stdx::bind(&ReplicationCoordinatorImpl::_onFreshnessCheckComplete, this));
- if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
+ case FreshnessChecker::QuorumUnreachable:
+ log() << "not electing self, we could not contact enough voting members";
+ return;
+ default:
+ log() << "not electing self due to election abort message :"
+ << static_cast<int>(abortReason);
return;
- }
- fassert(18681, nextPhaseEvh.getStatus());
- lossGuard.dismiss();
}
- void ReplicationCoordinatorImpl::_onFreshnessCheckComplete() {
- invariant(_freshnessChecker);
- invariant(!_electCmdRunner);
- LoseElectionGuard lossGuard(_topCoord.get(),
- &_replExecutor,
- &_freshnessChecker,
- &_electCmdRunner,
- &_electionFinishedEvent);
-
- if (_freshnessChecker->isCanceled()) {
- LOG(2) << "Election canceled during freshness check phase";
- return;
- }
+ log() << "running for election";
+ // Secure our vote for ourself first
+ if (!_topCoord->voteForMyself(now)) {
+ return;
+ }
- const Date_t now(_replExecutor.now());
- const FreshnessChecker::ElectionAbortReason abortReason =
- _freshnessChecker->shouldAbortElection();
-
- // need to not sleep after last time sleeping,
- switch (abortReason) {
- case FreshnessChecker::None:
- break;
- case FreshnessChecker::FreshnessTie:
- if ((_selfIndex != 0) && !_sleptLastElection) {
- const auto ms = Milliseconds(_replExecutor.nextRandomInt64(1000) + 50);
- const Date_t nextCandidateTime = now + ms;
- log() << "possible election tie; sleeping " << ms.count() << "ms until " <<
- dateToISOStringLocal(nextCandidateTime);
- _topCoord->setElectionSleepUntil(nextCandidateTime);
- _replExecutor.scheduleWorkAt(
- nextCandidateTime,
- stdx::bind(&ReplicationCoordinatorImpl::_recoverFromElectionTie,
- this,
- stdx::placeholders::_1));
- _sleptLastElection = true;
- return;
- }
- _sleptLastElection = false;
- break;
- case FreshnessChecker::FresherNodeFound:
- log() << "not electing self, we are not freshest";
- return;
- case FreshnessChecker::QuorumUnreachable:
- log() << "not electing self, we could not contact enough voting members";
- return;
- default:
- log() << "not electing self due to election abort message :"
- << static_cast<int>(abortReason);
- return;
- }
+ _electCmdRunner.reset(new ElectCmdRunner);
+ StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _electCmdRunner->start(
+ &_replExecutor,
+ _rsConfig,
+ _selfIndex,
+ _topCoord->getMaybeUpHostAndPorts(),
+ stdx::bind(&ReplicationCoordinatorImpl::_onElectCmdRunnerComplete, this));
+ if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
+ return;
+ }
+ fassert(18685, nextPhaseEvh.getStatus());
+ lossGuard.dismiss();
+}
+
+void ReplicationCoordinatorImpl::_onElectCmdRunnerComplete() {
+ LoseElectionGuard lossGuard(_topCoord.get(),
+ &_replExecutor,
+ &_freshnessChecker,
+ &_electCmdRunner,
+ &_electionFinishedEvent);
+
+ invariant(_freshnessChecker);
+ invariant(_electCmdRunner);
+ if (_electCmdRunner->isCanceled()) {
+ LOG(2) << "Election canceled during elect self phase";
+ return;
+ }
- log() << "running for election";
- // Secure our vote for ourself first
- if (!_topCoord->voteForMyself(now)) {
- return;
- }
+ const int receivedVotes = _electCmdRunner->getReceivedVotes();
- _electCmdRunner.reset(new ElectCmdRunner);
- StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _electCmdRunner->start(
- &_replExecutor,
- _rsConfig,
- _selfIndex,
- _topCoord->getMaybeUpHostAndPorts(),
- stdx::bind(&ReplicationCoordinatorImpl::_onElectCmdRunnerComplete, this));
- if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return;
- }
- fassert(18685, nextPhaseEvh.getStatus());
- lossGuard.dismiss();
+ if (receivedVotes < _rsConfig.getMajorityVoteCount()) {
+ log() << "couldn't elect self, only received " << receivedVotes
+ << " votes, but needed at least " << _rsConfig.getMajorityVoteCount();
+ // Suppress ourselves from standing for election again, giving other nodes a chance
+ // to win their elections.
+ const auto ms = Milliseconds(_replExecutor.nextRandomInt64(1000) + 50);
+ const Date_t now(_replExecutor.now());
+ const Date_t nextCandidateTime = now + ms;
+ log() << "waiting until " << nextCandidateTime << " before standing for election again";
+ _topCoord->setElectionSleepUntil(nextCandidateTime);
+ _replExecutor.scheduleWorkAt(
+ nextCandidateTime,
+ stdx::bind(&ReplicationCoordinatorImpl::_recoverFromElectionTie,
+ this,
+ stdx::placeholders::_1));
+ return;
}
- void ReplicationCoordinatorImpl::_onElectCmdRunnerComplete() {
- LoseElectionGuard lossGuard(_topCoord.get(),
- &_replExecutor,
- &_freshnessChecker,
- &_electCmdRunner,
- &_electionFinishedEvent);
-
- invariant(_freshnessChecker);
- invariant(_electCmdRunner);
- if (_electCmdRunner->isCanceled()) {
- LOG(2) << "Election canceled during elect self phase";
- return;
- }
+ if (_rsConfig.getConfigVersion() != _freshnessChecker->getOriginalConfigVersion()) {
+ log() << "config version changed during our election, ignoring result";
+ return;
+ }
- const int receivedVotes = _electCmdRunner->getReceivedVotes();
-
- if (receivedVotes < _rsConfig.getMajorityVoteCount()) {
- log() << "couldn't elect self, only received " << receivedVotes <<
- " votes, but needed at least " << _rsConfig.getMajorityVoteCount();
- // Suppress ourselves from standing for election again, giving other nodes a chance
- // to win their elections.
- const auto ms = Milliseconds(_replExecutor.nextRandomInt64(1000) + 50);
- const Date_t now(_replExecutor.now());
- const Date_t nextCandidateTime = now + ms;
- log() << "waiting until " << nextCandidateTime << " before standing for election again";
- _topCoord->setElectionSleepUntil(nextCandidateTime);
- _replExecutor.scheduleWorkAt(
- nextCandidateTime,
- stdx::bind(&ReplicationCoordinatorImpl::_recoverFromElectionTie,
- this,
- stdx::placeholders::_1));
- return;
- }
+ log() << "election succeeded, assuming primary role";
- if (_rsConfig.getConfigVersion() != _freshnessChecker->getOriginalConfigVersion()) {
- log() << "config version changed during our election, ignoring result";
- return;
- }
-
- log() << "election succeeded, assuming primary role";
+ lossGuard.dismiss();
+ _freshnessChecker.reset(NULL);
+ _electCmdRunner.reset(NULL);
+ _performPostMemberStateUpdateAction(kActionWinElection);
+ _replExecutor.signalEvent(_electionFinishedEvent);
+}
- lossGuard.dismiss();
- _freshnessChecker.reset(NULL);
- _electCmdRunner.reset(NULL);
- _performPostMemberStateUpdateAction(kActionWinElection);
- _replExecutor.signalEvent(_electionFinishedEvent);
+void ReplicationCoordinatorImpl::_recoverFromElectionTie(
+ const ReplicationExecutor::CallbackArgs& cbData) {
+ if (!cbData.status.isOK()) {
+ return;
}
-
- void ReplicationCoordinatorImpl::_recoverFromElectionTie(
- const ReplicationExecutor::CallbackArgs& cbData) {
- if (!cbData.status.isOK()) {
- return;
- }
- if (_topCoord->checkShouldStandForElection(_replExecutor.now(), getMyLastOptime())) {
- _startElectSelf();
- }
+ if (_topCoord->checkShouldStandForElection(_replExecutor.now(), getMyLastOptime())) {
+ _startElectSelf();
}
+}
} // namespace repl
} // namespace mongo
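
The one class this whitespace-only reformat touches, LoseElectionGuard, is an RAII dismiss-guard: unless dismiss() is called after an election phase hands off successfully, its destructor records the lost election and signals the election-finished event. A minimal standalone sketch of that idiom, using a generic callback and placeholder names rather than the real TopologyCoordinator/ReplicationExecutor/FreshnessChecker interfaces, might look like this:

#include <functional>
#include <utility>

// Dismiss-guard idiom: run a cleanup action on scope exit unless dismiss()
// was called, mirroring how LoseElectionGuard treats every early return as
// a lost election.
class LossGuard {
public:
    explicit LossGuard(std::function<void()> onLoss) : _onLoss(std::move(onLoss)) {}
    LossGuard(const LossGuard&) = delete;
    LossGuard& operator=(const LossGuard&) = delete;

    ~LossGuard() {
        if (!_dismissed) {
            _onLoss();  // e.g. record the loss and signal anyone waiting on the election
        }
    }

    void dismiss() {
        _dismissed = true;
    }

private:
    std::function<void()> _onLoss;
    bool _dismissed = false;
};

// Usage pattern analogous to _startElectSelf()/_onFreshnessCheckComplete():
// any early return fires the guard; scheduling the next phase dismisses it,
// so that phase's own guard takes over responsibility for cleanup.
void runPhase(bool nextPhaseScheduled) {
    LossGuard lossGuard([] { /* cleanup on a lost election */ });
    if (!nextPhaseScheduled) {
        return;  // guard destructor performs the loss cleanup
    }
    lossGuard.dismiss();
}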