diff options
author | Jason Chan <jason.chan@mongodb.com> | 2020-06-25 21:02:18 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-07-13 21:45:43 +0000 |
commit | c31cac2855eb84c9bc824852773265c8e0b60a4e (patch) | |
tree | 30c871c1ad20f3c7a404b34adf3a3e89bbe08088 | |
parent | b62fbbebc64f1ab105db49a1cc410af6d878add1 (diff) | |
download | mongo-c31cac2855eb84c9bc824852773265c8e0b60a4e.tar.gz |
SERVER-48256 Add ElectionState object to manage vote requester logic
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 38 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.h | 187 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp | 186 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp | 121 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/vote_requester.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/vote_requester.h | 1 |
7 files changed, 335 insertions, 213 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 76649706d7f..6f293809254 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -1082,7 +1082,7 @@ Status ReplicationCoordinatorImpl::_setFollowerMode(OperationContext* opCtx, "Cannot set follower mode when node is currently the leader"); } - if (auto electionFinishedEvent = _cancelElectionIfNeeded_inlock()) { + if (auto electionFinishedEvent = _cancelElectionIfNeeded(lk)) { // We were a candidate, which means _topCoord believed us to be in state RS_SECONDARY, and // we know that newState != RS_SECONDARY because we would have returned early, above if // the old and new state were equal. So, try again after the election is over to @@ -3543,7 +3543,7 @@ void ReplicationCoordinatorImpl::_finishReplSetReconfig(OperationContext* opCtx, executor::TaskExecutor::EventHandle electionFinishedEvent; { stdx::lock_guard<Latch> lk(_mutex); - electionFinishedEvent = _cancelElectionIfNeeded_inlock(); + electionFinishedEvent = _cancelElectionIfNeeded(lk); } // If there is an election in-progress, there can be at most one. No new election can happen as @@ -5548,14 +5548,26 @@ void ReplicationCoordinatorImpl::_dropAllSnapshots_inlock() { } void ReplicationCoordinatorImpl::waitForElectionFinish_forTest() { - if (_electionFinishedEvent.isValid()) { - _replExecutor->waitForEvent(_electionFinishedEvent); + EventHandle finishedEvent; + { + stdx::lock_guard lk(_mutex); + if (_electionState) { + finishedEvent = _electionState->getElectionFinishedEvent(lk); + } + } + if (finishedEvent.isValid()) { + _replExecutor->waitForEvent(finishedEvent); } } void ReplicationCoordinatorImpl::waitForElectionDryRunFinish_forTest() { - if (_electionDryRunFinishedEvent.isValid()) { - _replExecutor->waitForEvent(_electionDryRunFinishedEvent); + EventHandle finishedEvent; + if (_electionState) { + stdx::lock_guard lk(_mutex); + finishedEvent = _electionState->getElectionDryRunFinishedEvent(lk); + } + if (finishedEvent.isValid()) { + _replExecutor->waitForEvent(finishedEvent); } } @@ -5621,7 +5633,10 @@ Status ReplicationCoordinatorImpl::stepUpIfEligible(bool skipDryRun) { EventHandle finishEvent; { stdx::lock_guard<Latch> lk(_mutex); - finishEvent = _electionFinishedEvent; + // A null _electionState indicates that the election has already completed. + if (_electionState) { + finishEvent = _electionState->getElectionFinishedEvent(lk); + } } if (finishEvent.isValid()) { _replExecutor->waitForEvent(finishEvent); @@ -5639,13 +5654,14 @@ Status ReplicationCoordinatorImpl::stepUpIfEligible(bool skipDryRun) { return Status::OK(); } -executor::TaskExecutor::EventHandle ReplicationCoordinatorImpl::_cancelElectionIfNeeded_inlock() { +executor::TaskExecutor::EventHandle ReplicationCoordinatorImpl::_cancelElectionIfNeeded( + WithLock lk) { if (_topCoord->getRole() != TopologyCoordinator::Role::kCandidate) { return {}; } - invariant(_voteRequester); - _voteRequester->cancel(); - return _electionFinishedEvent; + invariant(_electionState); + _electionState->cancel(lk); + return _electionState->getElectionFinishedEvent(lk); } int64_t ReplicationCoordinatorImpl::_nextRandomInt64_inlock(int64_t limit) { diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 71b485b8379..e73d5398acf 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -47,6 +47,7 @@ #include "mongo/db/repl/sync_source_resolver.h" #include "mongo/db/repl/topology_coordinator.h" #include "mongo/db/repl/update_position_args.h" +#include "mongo/db/repl/vote_requester.h" #include "mongo/executor/task_executor.h" #include "mongo/platform/atomic_word.h" #include "mongo/platform/random.h" @@ -82,7 +83,6 @@ class ReplSetConfig; class SyncSourceFeedback; class StorageInterface; class TopologyCoordinator; -class VoteRequester; class ReplicationCoordinatorImpl : public ReplicationCoordinator { ReplicationCoordinatorImpl(const ReplicationCoordinatorImpl&) = delete; @@ -466,13 +466,13 @@ public: long long term, TopologyCoordinator::UpdateTermResult* updateResult); /** - * If called after _startElectSelfV1_inlock(), blocks until all asynchronous + * If called after ElectionState::start(), blocks until all asynchronous * activities associated with election complete. */ void waitForElectionFinish_forTest(); /** - * If called after _startElectSelfV1_inlock(), blocks until all asynchronous + * If called after ElectionState::start(), blocks until all asynchronous * activities associated with election dry run complete, including writing * last vote and scheduling the real election. */ @@ -484,6 +484,12 @@ public: */ void waitForStepDownAttempt_forTest(); + /** + * Cancels all future processing work of the VoteRequester and sets the election state to + * kCanceled. + */ + void cancelElection_forTest(); + private: using CallbackFn = executor::TaskExecutor::CallbackFn; @@ -496,9 +502,6 @@ private: using SharedPromiseOfIsMasterResponse = SharedPromise<std::shared_ptr<const IsMasterResponse>>; - class LoseElectionGuardV1; - class LoseElectionDryRunGuardV1; - /** * Configuration states for a replica set node. * @@ -728,6 +731,112 @@ private: long _numCatchUpOps = 0; }; + class ElectionState { + public: + ElectionState(ReplicationCoordinatorImpl* repl) + : _repl(repl), + _topCoord(repl->_topCoord.get()), + _replExecutor(repl->_replExecutor.get()) {} + + /** + * Begins an attempt to elect this node. + * Called after an incoming heartbeat changes this node's view of the set such that it + * believes it can be elected PRIMARY. + * For proper concurrency, start methods must be called while holding _mutex. + * + * For V1 (raft) style elections the election path is: + * _processDryRunResult() (may skip) + * _startRealElection() + * _writeLastVoteForMyElection() + * _requestVotesForRealElection() + * _onVoteRequestComplete() + */ + void start(WithLock lk, StartElectionReasonEnum reason); + + // Returns the election finished event. + executor::TaskExecutor::EventHandle getElectionFinishedEvent(WithLock); + + // Returns the election dry run finished event. + executor::TaskExecutor::EventHandle getElectionDryRunFinishedEvent(WithLock); + + // Notifies the VoteRequester to cancel further processing. Sets the election state to + // canceled. + void cancel(WithLock lk); + + private: + class LoseElectionGuardV1; + class LoseElectionDryRunGuardV1; + + /** + * Returns the election result from the VoteRequester. + */ + VoteRequester::Result _getElectionResult(WithLock) const; + + /** + * Starts the VoteRequester and requests votes from known members of the replica set. + */ + StatusWith<executor::TaskExecutor::EventHandle> _startVoteRequester( + WithLock, long long term, bool dryRun, OpTime lastAppliedOpTime, int primaryIndex); + + /** + * Starts VoteRequester to run the real election when last vote write has completed. + */ + void _requestVotesForRealElection(WithLock lk, + long long newTerm, + StartElectionReasonEnum reason); + + /** + * Callback called when the dryRun VoteRequester has completed; checks the results and + * decides whether to conduct a proper election. + * "originalTerm" was the term during which the dry run began, if the term has since + * changed, do not run for election. + */ + void _processDryRunResult(long long originalTerm, StartElectionReasonEnum reason); + + /** + * Begins executing a real election. This is called either a successful dry run, or when the + * dry run was skipped (which may be specified for a ReplSetStepUp). + */ + void _startRealElection(WithLock lk, + long long originalTerm, + StartElectionReasonEnum reason); + + /** + * Writes the last vote in persistent storage after completing dry run successfully. + * This job will be scheduled to run in DB worker threads. + */ + void _writeLastVoteForMyElection(LastVote lastVote, + const executor::TaskExecutor::CallbackArgs& cbData, + StartElectionReasonEnum reason); + + /** + * Callback called when the VoteRequester has completed; checks the results and + * decides whether to change state to primary and alert other nodes of our primary-ness. + * "originalTerm" was the term during which the election began, if the term has since + * changed, do not step up as primary. + */ + void _onVoteRequestComplete(long long originalTerm, StartElectionReasonEnum reason); + + // Not owned. + ReplicationCoordinatorImpl* _repl; + // The VoteRequester used to start and gather results from the election voting process. + std::unique_ptr<VoteRequester> _voteRequester; + // Flag that indicates whether the election has been canceled. + bool _isCanceled = false; + // Event that the election code will signal when the in-progress election completes. + executor::TaskExecutor::EventHandle _electionFinishedEvent; + + // Event that the election code will signal when the in-progress election dry run completes, + // which includes writing the last vote and scheduling the real election. + executor::TaskExecutor::EventHandle _electionDryRunFinishedEvent; + + // Pointer to the TopologyCoordinator owned by ReplicationCoordinator. + TopologyCoordinator* _topCoord; + + // Pointer to the executor owned by ReplicationCoordinator. + executor::TaskExecutor* _replExecutor; + }; + // Inner class to manage the concurrency of _canAcceptNonLocalWrites and _canServeNonLocalReads. class ReadWriteAbility { public: @@ -1079,57 +1188,6 @@ private: void _onFollowerModeStateChange(); /** - * Begins an attempt to elect this node. - * Called after an incoming heartbeat changes this node's view of the set such that it - * believes it can be elected PRIMARY. - * For proper concurrency, start methods must be called while holding _mutex. - * - * For V1 (raft) style elections the election path is: - * _startElectSelfIfEligibleV1() - * _processDryRunResult() (may skip) - * _startRealElection_inlock() - * _writeLastVoteForMyElection() - * _startVoteRequester_inlock() - * _onVoteRequestComplete() - */ - void _startElectSelfV1_inlock(StartElectionReasonEnum reason); - - /** - * Callback called when the dryRun VoteRequester has completed; checks the results and - * decides whether to conduct a proper election. - * "originalTerm" was the term during which the dry run began, if the term has since - * changed, do not run for election. - */ - void _processDryRunResult(long long originalTerm, StartElectionReasonEnum reason); - - /** - * Begins executing a real election. This is called either a successful dry run, or when the - * dry run was skipped (which may be specified for a ReplSetStepUp). - */ - void _startRealElection_inlock(long long originalTerm, StartElectionReasonEnum reason); - - /** - * Writes the last vote in persistent storage after completing dry run successfully. - * This job will be scheduled to run in DB worker threads. - */ - void _writeLastVoteForMyElection(LastVote lastVote, - const executor::TaskExecutor::CallbackArgs& cbData, - StartElectionReasonEnum reason); - - /** - * Starts VoteRequester to run the real election when last vote write has completed. - */ - void _startVoteRequester_inlock(long long newTerm, StartElectionReasonEnum reason); - - /** - * Callback called when the VoteRequester has completed; checks the results and - * decides whether to change state to primary and alert other nodes of our primary-ness. - * "originalTerm" was the term during which the election began, if the term has since - * changed, do not step up as primary. - */ - void _onVoteRequestComplete(long long originalTerm, StartElectionReasonEnum reason); - - /** * Removes 'host' from the sync source blacklist. If 'host' isn't found, it's simply * ignored and no error is thrown. * @@ -1402,7 +1460,7 @@ private: * canceled election completes. If there is no running election, returns an invalid event * handle. */ - executor::TaskExecutor::EventHandle _cancelElectionIfNeeded_inlock(); + executor::TaskExecutor::EventHandle _cancelElectionIfNeeded(WithLock); /** * Waits until the lastApplied opTime is at least the 'targetOpTime'. @@ -1551,15 +1609,6 @@ private: // This member's index position in the current config. int _selfIndex; // (M) - std::unique_ptr<VoteRequester> _voteRequester; // (M) - - // Event that the election code will signal when the in-progress election completes. - executor::TaskExecutor::EventHandle _electionFinishedEvent; // (M) - - // Event that the election code will signal when the in-progress election dry run completes, - // which includes writing the last vote and scheduling the real election. - executor::TaskExecutor::EventHandle _electionDryRunFinishedEvent; // (M) - // Whether we slept last time we attempted an election but possibly tied with other nodes. bool _sleptLastElection; // (M) @@ -1639,6 +1688,10 @@ private: // that the node is currently in catchup mode. std::unique_ptr<CatchupState> _catchupState; // (X) + // The election state that includes logic to start and return information from the election + // voting process. + std::unique_ptr<ElectionState> _electionState; // (M) + // Atomic-synchronized copy of Topology Coordinator's _term, for use by the public getTerm() // function. // This variable must be written immediately after _term, and thus its value can lag. diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp index e0c14764474..68c619af21e 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp @@ -39,12 +39,15 @@ #include "mongo/db/repl/vote_requester.h" #include "mongo/logv2/log.h" #include "mongo/platform/mutex.h" +#include "mongo/util/fail_point.h" #include "mongo/util/scopeguard.h" namespace mongo { namespace repl { -class ReplicationCoordinatorImpl::LoseElectionGuardV1 { +MONGO_FAIL_POINT_DEFINE(hangInWritingLastVoteForDryRun); + +class ReplicationCoordinatorImpl::ElectionState::LoseElectionGuardV1 { LoseElectionGuardV1(const LoseElectionGuardV1&) = delete; LoseElectionGuardV1& operator=(const LoseElectionGuardV1&) = delete; @@ -57,14 +60,14 @@ public: } LOGV2(21434, "Lost election", "isDryRun"_attr = _isDryRun); _replCoord->_topCoord->processLoseElection(); - _replCoord->_voteRequester.reset(nullptr); - if (_isDryRun && _replCoord->_electionDryRunFinishedEvent.isValid()) { - _replCoord->_replExecutor->signalEvent(_replCoord->_electionDryRunFinishedEvent); + const auto electionState = _replCoord->_electionState.get(); + if (_isDryRun && electionState->_electionDryRunFinishedEvent.isValid()) { + _replCoord->_replExecutor->signalEvent(electionState->_electionDryRunFinishedEvent); } - if (_replCoord->_electionFinishedEvent.isValid()) { - _replCoord->_replExecutor->signalEvent(_replCoord->_electionFinishedEvent); + if (electionState->_electionFinishedEvent.isValid()) { + _replCoord->_replExecutor->signalEvent(electionState->_electionFinishedEvent); } - + _replCoord->_electionState = nullptr; // Clear the node's election candidate metrics if it loses either the dry-run or actual // election, since it will not become primary. ReplicationMetrics::get(getGlobalServiceContext()).clearElectionCandidateMetrics(); @@ -80,7 +83,8 @@ protected: bool _dismissed = false; }; -class ReplicationCoordinatorImpl::LoseElectionDryRunGuardV1 : public LoseElectionGuardV1 { +class ReplicationCoordinatorImpl::ElectionState::LoseElectionDryRunGuardV1 + : public LoseElectionGuardV1 { LoseElectionDryRunGuardV1(const LoseElectionDryRunGuardV1&) = delete; LoseElectionDryRunGuardV1& operator=(const LoseElectionDryRunGuardV1&) = delete; @@ -91,44 +95,79 @@ public: } }; -void ReplicationCoordinatorImpl::_startElectSelfV1_inlock(StartElectionReasonEnum reason) { - invariant(!_voteRequester); +void ReplicationCoordinatorImpl::cancelElection_forTest() { + stdx::lock_guard<Latch> lk(_mutex); + invariant(_electionState); + _electionState->cancel(lk); +} + +StatusWith<executor::TaskExecutor::EventHandle> +ReplicationCoordinatorImpl::ElectionState::_startVoteRequester( + WithLock lk, long long term, bool dryRun, OpTime lastAppliedOpTime, int primaryIndex) { + _voteRequester.reset(new VoteRequester); + return _voteRequester->start(_replExecutor, + _repl->_rsConfig, + _repl->_selfIndex, + term, + dryRun, + lastAppliedOpTime, + primaryIndex); +} + +VoteRequester::Result ReplicationCoordinatorImpl::ElectionState::_getElectionResult( + WithLock lk) const { + if (_isCanceled) { + return VoteRequester::Result::kCancelled; + } + return _voteRequester->getResult(); +} + +executor::TaskExecutor::EventHandle +ReplicationCoordinatorImpl::ElectionState::getElectionFinishedEvent(WithLock) { + return _electionFinishedEvent; +} + +executor::TaskExecutor::EventHandle +ReplicationCoordinatorImpl::ElectionState::getElectionDryRunFinishedEvent(WithLock) { + return _electionDryRunFinishedEvent; +} + +void ReplicationCoordinatorImpl::ElectionState::cancel(WithLock) { + _isCanceled = true; + _voteRequester->cancel(); +} - switch (_rsConfigState) { +void ReplicationCoordinatorImpl::ElectionState::start(WithLock lk, StartElectionReasonEnum reason) { + LoseElectionDryRunGuardV1 lossGuard(_repl); + switch (_repl->_rsConfigState) { case kConfigSteady: break; case kConfigInitiating: case kConfigReconfiguring: case kConfigHBReconfiguring: LOGV2_DEBUG(21435, 2, "Not standing for election; processing a configuration change"); - // Transition out of candidate role. - _topCoord->processLoseElection(); return; default: LOGV2_FATAL(28641, "Entered replica set election code while in illegal config state " "{rsConfigState}", "Entered replica set election code while in illegal config state", - "rsConfigState"_attr = int(_rsConfigState)); + "rsConfigState"_attr = int(_repl->_rsConfigState)); } - auto finishedEvent = _makeEvent(); + auto finishedEvent = _repl->_makeEvent(); if (!finishedEvent) { return; } _electionFinishedEvent = finishedEvent; - - auto dryRunFinishedEvent = _makeEvent(); + auto dryRunFinishedEvent = _repl->_makeEvent(); if (!dryRunFinishedEvent) { return; } _electionDryRunFinishedEvent = dryRunFinishedEvent; - LoseElectionDryRunGuardV1 lossGuard(this); - - - invariant(_rsConfig.getMemberAt(_selfIndex).isElectable()); - const auto lastAppliedOpTime = _getMyLastAppliedOpTime_inlock(); + invariant(_repl->_rsConfig.getMemberAt(_repl->_selfIndex).isElectable()); + const auto lastAppliedOpTime = _repl->_getMyLastAppliedOpTime_inlock(); if (lastAppliedOpTime == OpTime()) { LOGV2(21436, @@ -146,7 +185,7 @@ void ReplicationCoordinatorImpl::_startElectSelfV1_inlock(StartElectionReasonEnu "skipping dry run and running for election in term {newTerm}", "Skipping dry run and running for election", "newTerm"_attr = newTerm); - _startRealElection_inlock(newTerm, reason); + _startRealElection(lk, newTerm, reason); lossGuard.dismiss(); return; } @@ -156,20 +195,17 @@ void ReplicationCoordinatorImpl::_startElectSelfV1_inlock(StartElectionReasonEnu "conducting a dry run election to see if we could be elected. current term: {currentTerm}", "Conducting a dry run election to see if we could be elected", "currentTerm"_attr = term); - _voteRequester.reset(new VoteRequester); // Only set primaryIndex if the primary's vote is required during the dry run. if (reason == StartElectionReasonEnum::kCatchupTakeover) { primaryIndex = _topCoord->getCurrentPrimaryIndex(); } StatusWith<executor::TaskExecutor::EventHandle> nextPhaseEvh = - _voteRequester->start(_replExecutor.get(), - _rsConfig, - _selfIndex, - term, - true, // dry run - lastAppliedOpTime, - primaryIndex); + _startVoteRequester(lk, + term, + true, // dry run + lastAppliedOpTime, + primaryIndex); if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) { return; } @@ -183,12 +219,11 @@ void ReplicationCoordinatorImpl::_startElectSelfV1_inlock(StartElectionReasonEnu lossGuard.dismiss(); } -void ReplicationCoordinatorImpl::_processDryRunResult(long long originalTerm, - StartElectionReasonEnum reason) { - stdx::lock_guard<Latch> lk(_mutex); - LoseElectionDryRunGuardV1 lossGuard(this); - - invariant(_voteRequester); +void ReplicationCoordinatorImpl::ElectionState::_processDryRunResult( + long long originalTerm, StartElectionReasonEnum reason) { + stdx::lock_guard<Latch> lk(_repl->_mutex); + LoseElectionDryRunGuardV1 lossGuard(_repl); + invariant(_voteRequester != nullptr); if (_topCoord->getTerm() != originalTerm) { LOGV2(21439, @@ -200,7 +235,7 @@ void ReplicationCoordinatorImpl::_processDryRunResult(long long originalTerm, return; } - const auto endResult = _voteRequester->getResult(); + const auto endResult = _getElectionResult(lk); switch (endResult) { case VoteRequester::Result::kInsufficientVotes: LOGV2(21440, "Not running for primary, we received insufficient votes"); @@ -225,25 +260,28 @@ void ReplicationCoordinatorImpl::_processDryRunResult(long long originalTerm, "Dry election run succeeded, running for election", "newTerm"_attr = newTerm); - _startRealElection_inlock(newTerm, reason); + _startRealElection(lk, newTerm, reason); lossGuard.dismiss(); } -void ReplicationCoordinatorImpl::_startRealElection_inlock(long long newTerm, - StartElectionReasonEnum reason) { +void ReplicationCoordinatorImpl::ElectionState::_startRealElection(WithLock lk, + long long newTerm, + StartElectionReasonEnum reason) { + const auto& rsConfig = _repl->_rsConfig; + const auto selfIndex = _repl->_selfIndex; const Date_t now = _replExecutor->now(); const OpTime lastCommittedOpTime = _topCoord->getLastCommittedOpTime(); const OpTime lastSeenOpTime = _topCoord->latestKnownOpTime(); - const int numVotesNeeded = _rsConfig.getMajorityVoteCount(); - const double priorityAtElection = _rsConfig.getMemberAt(_selfIndex).getPriority(); - const Milliseconds electionTimeoutMillis = _rsConfig.getElectionTimeoutPeriod(); + const int numVotesNeeded = rsConfig.getMajorityVoteCount(); + const double priorityAtElection = rsConfig.getMemberAt(selfIndex).getPriority(); + const Milliseconds electionTimeoutMillis = rsConfig.getElectionTimeoutPeriod(); const int priorPrimaryIndex = _topCoord->getCurrentPrimaryIndex(); const boost::optional<int> priorPrimaryMemberId = (priorPrimaryIndex == -1) ? boost::none - : boost::make_optional(_rsConfig.getMemberAt(priorPrimaryIndex).getId().getData()); + : boost::make_optional(rsConfig.getMemberAt(priorPrimaryIndex).getId().getData()); - ReplicationMetrics::get(getServiceContext()) + ReplicationMetrics::get(_repl->getServiceContext()) .setElectionCandidateMetrics(reason, now, newTerm, @@ -253,12 +291,13 @@ void ReplicationCoordinatorImpl::_startRealElection_inlock(long long newTerm, priorityAtElection, electionTimeoutMillis, priorPrimaryMemberId); - ReplicationMetrics::get(getServiceContext()).incrementNumElectionsCalledForReason(reason); + ReplicationMetrics::get(_repl->getServiceContext()) + .incrementNumElectionsCalledForReason(reason); - LoseElectionDryRunGuardV1 lossGuard(this); + LoseElectionDryRunGuardV1 lossGuard(_repl); TopologyCoordinator::UpdateTermResult updateTermResult; - _updateTerm_inlock(newTerm, &updateTermResult); + _repl->_updateTerm_inlock(newTerm, &updateTermResult); // This is the only valid result from this term update. If we are here, then we are not a // primary, so a stepdown is not possible. We have also not yet learned of a higher term from // someone else: seeing an update in the topology coordinator mid-election requires releasing @@ -268,7 +307,7 @@ void ReplicationCoordinatorImpl::_startRealElection_inlock(long long newTerm, _topCoord->voteForMyselfV1(); // Store the vote in persistent storage. - LastVote lastVote{newTerm, _selfIndex}; + LastVote lastVote{newTerm, selfIndex}; auto cbStatus = _replExecutor->scheduleWork( [this, lastVote, reason](const executor::TaskExecutor::CallbackArgs& cbData) { @@ -281,7 +320,7 @@ void ReplicationCoordinatorImpl::_startRealElection_inlock(long long newTerm, lossGuard.dismiss(); } -void ReplicationCoordinatorImpl::_writeLastVoteForMyElection( +void ReplicationCoordinatorImpl::ElectionState::_writeLastVoteForMyElection( LastVote lastVote, const executor::TaskExecutor::CallbackArgs& cbData, StartElectionReasonEnum reason) { @@ -296,11 +335,15 @@ void ReplicationCoordinatorImpl::_writeLastVoteForMyElection( auto opCtx = cc().makeOperationContext(); // Any writes that occur as part of an election should not be subject to Flow Control. opCtx->setShouldParticipateInFlowControl(false); - return _externalState->storeLocalLastVoteDocument(opCtx.get(), lastVote); + return _repl->_externalState->storeLocalLastVoteDocument(opCtx.get(), lastVote); }(); - stdx::lock_guard<Latch> lk(_mutex); - LoseElectionDryRunGuardV1 lossGuard(this); + if (MONGO_unlikely(hangInWritingLastVoteForDryRun.shouldFail())) { + LOGV2(4825601, "Hang due to hangInWritingLastVoteForDryRun failpoint"); + hangInWritingLastVoteForDryRun.pauseWhileSet(); + } + stdx::lock_guard<Latch> lk(_repl->_mutex); + LoseElectionDryRunGuardV1 lossGuard(_repl); if (status == ErrorCodes::CallbackCanceled) { return; } @@ -323,19 +366,19 @@ void ReplicationCoordinatorImpl::_writeLastVoteForMyElection( "currentTerm"_attr = _topCoord->getTerm()); return; } - _startVoteRequester_inlock(lastVote.getTerm(), reason); + + _requestVotesForRealElection(lk, lastVote.getTerm(), reason); _replExecutor->signalEvent(_electionDryRunFinishedEvent); lossGuard.dismiss(); } -void ReplicationCoordinatorImpl::_startVoteRequester_inlock(long long newTerm, - StartElectionReasonEnum reason) { - const auto lastAppliedOpTime = _getMyLastAppliedOpTime_inlock(); +void ReplicationCoordinatorImpl::ElectionState::_requestVotesForRealElection( + WithLock lk, long long newTerm, StartElectionReasonEnum reason) { + const auto lastAppliedOpTime = _repl->_getMyLastAppliedOpTime_inlock(); - _voteRequester.reset(new VoteRequester); - StatusWith<executor::TaskExecutor::EventHandle> nextPhaseEvh = _voteRequester->start( - _replExecutor.get(), _rsConfig, _selfIndex, newTerm, false, lastAppliedOpTime, -1); + StatusWith<executor::TaskExecutor::EventHandle> nextPhaseEvh = + _startVoteRequester(lk, newTerm, false, lastAppliedOpTime, -1); if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) { return; } @@ -350,12 +393,11 @@ void ReplicationCoordinatorImpl::_startVoteRequester_inlock(long long newTerm, MONGO_FAIL_POINT_DEFINE(electionHangsBeforeUpdateMemberState); -void ReplicationCoordinatorImpl::_onVoteRequestComplete(long long newTerm, - StartElectionReasonEnum reason) { - stdx::lock_guard<Latch> lk(_mutex); - LoseElectionGuardV1 lossGuard(this); - - invariant(_voteRequester); +void ReplicationCoordinatorImpl::ElectionState::_onVoteRequestComplete( + long long newTerm, StartElectionReasonEnum reason) { + stdx::lock_guard<Latch> lk(_repl->_mutex); + LoseElectionGuardV1 lossGuard(_repl); + invariant(_voteRequester != nullptr); if (_topCoord->getTerm() != newTerm) { LOGV2(21447, @@ -367,7 +409,7 @@ void ReplicationCoordinatorImpl::_onVoteRequestComplete(long long newTerm, return; } - const VoteRequester::Result endResult = _voteRequester->getResult(); + const VoteRequester::Result endResult = _getElectionResult(lk); invariant(endResult != VoteRequester::Result::kPrimaryRespondedNo); switch (endResult) { @@ -385,7 +427,7 @@ void ReplicationCoordinatorImpl::_onVoteRequestComplete(long long newTerm, "election succeeded, assuming primary role in term {term}", "Election succeeded, assuming primary role", "term"_attr = _topCoord->getTerm()); - ReplicationMetrics::get(getServiceContext()) + ReplicationMetrics::get(_repl->getServiceContext()) .incrementNumElectionsSuccessfulForReason(reason); break; case VoteRequester::Result::kPrimaryRespondedNo: @@ -399,9 +441,7 @@ void ReplicationCoordinatorImpl::_onVoteRequestComplete(long long newTerm, Date_t now = _replExecutor->now(); _topCoord->resetMemberTimeouts(now, _voteRequester->getResponders()); - _voteRequester.reset(); auto electionFinishedEvent = _electionFinishedEvent; - electionHangsBeforeUpdateMemberState.execute([&](const BSONObj& customWait) { auto waitForMillis = Milliseconds(customWait["waitForMillis"].numberInt()); LOGV2(21451, @@ -413,9 +453,11 @@ void ReplicationCoordinatorImpl::_onVoteRequestComplete(long long newTerm, sleepFor(waitForMillis); }); - _postWonElectionUpdateMemberState(lk); + _repl->_postWonElectionUpdateMemberState(lk); _replExecutor->signalEvent(electionFinishedEvent); lossGuard.dismiss(); + + _repl->_electionState = nullptr; } } // namespace repl diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp index eafb5c1ad69..360cfb72bb8 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp @@ -160,7 +160,7 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) { } net->exitNetwork(); - // _startElectSelfV1_inlock is called when election timeout expires, so election + // ElectionState::start is called when election timeout expires, so election // finished event has been set. getReplCoord()->waitForElectionFinish_forTest(); @@ -2243,6 +2243,72 @@ TEST_F(ReplCoordTest, NodeCancelsElectionUponReceivingANewConfigDuringVotePhase) ASSERT(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole()); } +TEST_F(ReplCoordTest, NodeCancelsElectionWhenWritingLastVoteInDryRun) { + // Start up and become electable. + assertStartSuccess(BSON("_id" + << "mySet" + << "version" << 2 << "protocolVersion" << 1 << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 3 << "host" + << "node3:12345") + << BSON("_id" << 2 << "host" + << "node2:12345")) + << "settings" << BSON("heartbeatIntervalMillis" << 100)), + HostAndPort("node1", 12345)); + ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t() + Seconds(100)); + simulateEnoughHeartbeatsForAllNodesUp(); + // Set a failpoint to hang after checking the results for the dry run but before we initiate the + // vote request for the real election. + const auto hangInWritingLastVoteForDryRun = + globalFailPointRegistry().find("hangInWritingLastVoteForDryRun"); + const auto timesEnteredFailPoint = hangInWritingLastVoteForDryRun->setMode(FailPoint::alwaysOn); + stdx::thread electionThread([&] { simulateSuccessfulDryRun(); }); + // Wait to hit the failpoint. + hangInWritingLastVoteForDryRun->waitForTimesEntered(timesEnteredFailPoint + 1); + ASSERT(TopologyCoordinator::Role::kCandidate == getTopoCoord().getRole()); + + // Cancel the election after the dry-run has already completed but before we create a new + // VoteRequester. + startCapturingLogMessages(); + getReplCoord()->cancelElection_forTest(); + hangInWritingLastVoteForDryRun->setMode(FailPoint::off, 0); + electionThread.join(); + + // Finish the election. We will receive the requested votes, but the election should still be + // canceled. + NetworkInterfaceMock* net = getNet(); + net->enterNetwork(); + while (net->hasReadyRequests()) { + const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); + const RemoteCommandRequest& request = noi->getRequest(); + LOGV2(4825602, + "{request_target} processing {request_cmdObj}", + "request_target"_attr = request.target.toString(), + "request_cmdObj"_attr = request.cmdObj); + if (request.cmdObj.firstElement().fieldNameStringData() != "replSetRequestVotes") { + net->blackHole(noi); + } else { + net->scheduleResponse( + noi, + net->now(), + makeResponseStatus( + BSON("ok" << 1 << "term" << 1 << "voteGranted" << true << "reason" + << "The election should be canceled even if I give you votes"))); + } + net->runReadyNetworkOperations(); + } + net->exitNetwork(); + + getReplCoord()->waitForElectionFinish_forTest(); + stopCapturingLogMessages(); + ASSERT_EQUALS( + 1, countTextFormatLogLinesContaining("Not becoming primary, election has been cancelled")); + ASSERT(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole()); +} + class PrimaryCatchUpTest : public ReplCoordTest { protected: using NetworkOpIter = NetworkInterfaceMock::NetworkOperationIterator; @@ -3004,59 +3070,6 @@ TEST_F(PrimaryCatchUpTest, CatchUpFailsDueToPrimaryStepDown) { .getNumCatchUpsFailedWithReplSetAbortPrimaryCatchUpCmd_forTesting()); } -class VoteRequesterRunnerTest : public ReplCoordTest { -protected: - void testVoteRequesterCancellation(bool dryRun) { - auto configObj = configWithMembers( - 2, 1, BSON_ARRAY(member(1, "node1:12345") << member(2, "node2:12345"))); - assertStartSuccess(configObj, {"node1", 12345}); - // Clean up existing heartbeat requests on startup. - replyToReceivedHeartbeatV1(); - - auto config = ReplSetConfig::parse(configObj); - auto selfIndex = 0; - auto newTerm = 2; - OpTime lastApplied{Timestamp(1, 1), 1}; - VoteRequester voteRequester; - auto endEvh = voteRequester.start( - getReplExec(), config, selfIndex, newTerm, dryRun, lastApplied, -1 /* primaryIndex */); - ASSERT_OK(endEvh.getStatus()); - - // Process a vote request. - enterNetwork(); - auto noi = getNet()->getNextReadyRequest(); - auto& request = noi->getRequest(); - LOGV2( - 214650, "processing", "target"_attr = request.target, "request"_attr = request.cmdObj); - ASSERT_EQ(request.cmdObj.firstElement().fieldNameStringData(), "replSetRequestVotes"); - - ReplSetRequestVotesResponse response; - response.setVoteGranted(true); - response.setTerm(newTerm); - auto responseObj = (BSONObjBuilder(response.toBSON()) << "ok" << 1).obj(); - getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(responseObj)); - getNet()->runReadyNetworkOperations(); - exitNetwork(); - - // Election succeeds. - ASSERT(voteRequester.getResult() == VoteRequester::Result::kSuccessfullyElected); - - voteRequester.cancel(); - ASSERT(voteRequester.getResult() == VoteRequester::Result::kCancelled); - - // The event should be signaled, so this returns immediately. - getReplExec()->waitForEvent(endEvh.getValue()); - } -}; - -TEST_F(VoteRequesterRunnerTest, DryRunCancel) { - testVoteRequesterCancellation(true); -} - -TEST_F(VoteRequesterRunnerTest, Cancel) { - testVoteRequesterCancellation(false); -} - } // namespace } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 3c3fa15769b..c4ebaad4ce5 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -742,7 +742,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( // "kConfigReconfiguring" which prevents new elections from happening. { stdx::lock_guard<Latch> lk(_mutex); - if (auto electionFinishedEvent = _cancelElectionIfNeeded_inlock()) { + if (auto electionFinishedEvent = _cancelElectionIfNeeded(lk)) { LOGV2_FOR_HEARTBEATS(4615629, 0, "Waiting for election to complete before finishing reconfig to " @@ -1069,7 +1069,7 @@ void ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1(StartElectionReason _startElectSelfIfEligibleV1(lock, reason); } -void ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1(WithLock, +void ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1(WithLock lk, StartElectionReasonEnum reason) { // If it is not a single node replica set, no need to start an election after stepdown timeout. if (reason == StartElectionReasonEnum::kSingleNodePromptElection && @@ -1173,7 +1173,10 @@ void ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1(WithLock, MONGO_UNREACHABLE; } - _startElectSelfV1_inlock(reason); + invariant(!_electionState); + + _electionState = std::make_unique<ElectionState>(this); + _electionState->start(lk, reason); } } // namespace repl diff --git a/src/mongo/db/repl/vote_requester.cpp b/src/mongo/db/repl/vote_requester.cpp index 82c62e650b7..368ef902e14 100644 --- a/src/mongo/db/repl/vote_requester.cpp +++ b/src/mongo/db/repl/vote_requester.cpp @@ -196,7 +196,7 @@ stdx::unordered_set<HostAndPort> VoteRequester::Algorithm::getResponders() const return _responders; } -VoteRequester::VoteRequester() : _isCanceled(false) {} +VoteRequester::VoteRequester() {} VoteRequester::~VoteRequester() {} StatusWith<executor::TaskExecutor::EventHandle> VoteRequester::start( @@ -214,14 +214,10 @@ StatusWith<executor::TaskExecutor::EventHandle> VoteRequester::start( } void VoteRequester::cancel() { - _isCanceled = true; _runner->cancel(); } VoteRequester::Result VoteRequester::getResult() const { - if (_isCanceled) - return Result::kCancelled; - return _algorithm->getResult(); } diff --git a/src/mongo/db/repl/vote_requester.h b/src/mongo/db/repl/vote_requester.h index 84eaea45c23..3962e4dec77 100644 --- a/src/mongo/db/repl/vote_requester.h +++ b/src/mongo/db/repl/vote_requester.h @@ -133,7 +133,6 @@ public: private: std::shared_ptr<Algorithm> _algorithm; std::unique_ptr<ScatterGatherRunner> _runner; - bool _isCanceled = false; }; } // namespace repl |