diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp | 27 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp | 110 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator.h | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/vote_requester.cpp | 32 | ||||
-rw-r--r-- | src/mongo/db/repl/vote_requester.h | 18 | ||||
-rw-r--r-- | src/mongo/db/repl/vote_requester_test.cpp | 83 |
10 files changed, 263 insertions, 26 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 84f5cb58031..5ca9f6cabf5 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -2504,7 +2504,7 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction( case kActionStartSingleNodeElection: // In protocol version 1, single node replset will run an election instead of // kActionWinElection as in protocol version 0. - _startElectSelfV1(); + _startElectSelfV1(TopologyCoordinator::StartElectionReason::kElectionTimeout); break; default: severe() << "Unknown post member state update action " << static_cast<int>(action); diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 27d698feb69..cb30bd1164b 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -848,8 +848,8 @@ private: * _onVoteRequestComplete() */ void _startElectSelf_inlock(); - void _startElectSelfV1_inlock(); - void _startElectSelfV1(); + void _startElectSelfV1_inlock(TopologyCoordinator::StartElectionReason reason); + void _startElectSelfV1(TopologyCoordinator::StartElectionReason reason); /** * Callback called when the FreshnessChecker has completed; checks the results and diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp index 394878e52ec..1050da2461e 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp @@ -85,12 +85,14 @@ public: }; -void ReplicationCoordinatorImpl::_startElectSelfV1() { +void ReplicationCoordinatorImpl::_startElectSelfV1( + TopologyCoordinator::StartElectionReason reason) { stdx::lock_guard<stdx::mutex> lk(_mutex); - _startElectSelfV1_inlock(); + _startElectSelfV1_inlock(reason); } -void 
ReplicationCoordinatorImpl::_startElectSelfV1_inlock() { +void ReplicationCoordinatorImpl::_startElectSelfV1_inlock( + TopologyCoordinator::StartElectionReason reason) { invariant(!_voteRequester); invariant(!_freshnessChecker); @@ -138,13 +140,20 @@ void ReplicationCoordinatorImpl::_startElectSelfV1_inlock() { _voteRequester.reset(new VoteRequester); long long term = _topCoord->getTerm(); + int primaryIndex = -1; + + // Only set primaryIndex if the primary's vote is required during the dry run. + if (reason == TopologyCoordinator::StartElectionReason::kCatchupTakeover) { + primaryIndex = _topCoord->getCurrentPrimaryIndex(); + } StatusWith<executor::TaskExecutor::EventHandle> nextPhaseEvh = _voteRequester->start(_replExecutor.get(), _rsConfig, _selfIndex, _topCoord->getTerm(), true, // dry run - lastOpTime); + lastOpTime, + primaryIndex); if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) { return; } @@ -175,6 +184,9 @@ void ReplicationCoordinatorImpl::_onDryRunComplete(long long originalTerm) { } else if (endResult == VoteRequester::Result::kStaleTerm) { log() << "not running for primary, we have been superceded already"; return; + } else if (endResult == VoteRequester::Result::kPrimaryRespondedNo) { + log() << "not running for primary, the current primary responded no in the dry run"; + return; } else if (endResult != VoteRequester::Result::kSuccessfullyElected) { log() << "not running for primary, we received an unexpected problem"; return; @@ -241,7 +253,7 @@ void ReplicationCoordinatorImpl::_startVoteRequester_inlock(long long newTerm) { _voteRequester.reset(new VoteRequester); StatusWith<executor::TaskExecutor::EventHandle> nextPhaseEvh = _voteRequester->start( - _replExecutor.get(), _rsConfig, _selfIndex, _topCoord->getTerm(), false, lastOpTime); + _replExecutor.get(), _rsConfig, _selfIndex, _topCoord->getTerm(), false, lastOpTime, -1); if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) { return; } @@ -264,6 +276,7 @@ void 
ReplicationCoordinatorImpl::_onVoteRequestComplete(long long originalTerm) } const VoteRequester::Result endResult = _voteRequester->getResult(); + invariant(endResult != VoteRequester::Result::kPrimaryRespondedNo); switch (endResult) { case VoteRequester::Result::kInsufficientVotes: @@ -275,6 +288,10 @@ void ReplicationCoordinatorImpl::_onVoteRequestComplete(long long originalTerm) case VoteRequester::Result::kSuccessfullyElected: log() << "election succeeded, assuming primary role in term " << _topCoord->getTerm(); break; + case VoteRequester::Result::kPrimaryRespondedNo: + // This is impossible because we would only require the primary's + // vote during a dry run. + invariant(false); } // Mark all nodes that responded to our vote request as up to avoid immediately diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp index ca7ef2f5d86..3b6f9500378 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp @@ -1372,6 +1372,116 @@ TEST_F(TakeoverTest, SuccessfulCatchupTakeover) { lastVoteExpected); } +TEST_F(TakeoverTest, CatchupTakeoverDryRunFailsPrimarySaysNo) { + startCapturingLogMessages(); + BSONObj configObj = BSON("_id" + << "mySet" + << "version" + << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345") + << BSON("_id" << 3 << "host" + << "node3:12345") + << BSON("_id" << 4 << "host" + << "node4:12345") + << BSON("_id" << 5 << "host" + << "node5:12345")) + << "protocolVersion" + << 1); + assertStartSuccess(configObj, HostAndPort("node1", 12345)); + ReplSetConfig config = assertMakeRSConfig(configObj); + HostAndPort primaryHostAndPort("node2", 12345); + + auto replCoord = getReplCoord(); + auto now = getNet()->now(); + + OperationContextNoop opCtx; + OpTime currentOptime(Timestamp(100, 5000), 0); + 
OpTime behindOptime(Timestamp(100, 4000), 0); + + replCoord->setMyLastAppliedOpTime(currentOptime); + replCoord->setMyLastDurableOpTime(currentOptime); + + // Update the term so that the current term is ahead of the term of + // the last applied op time. This means that the primary is still in + // catchup mode since it hasn't written anything this term. + ASSERT_EQUALS(ErrorCodes::StaleTerm, replCoord->updateTerm(&opCtx, replCoord->getTerm() + 1)); + + // Make sure we're secondary and that no takeover has been scheduled. + ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); + ASSERT_FALSE(replCoord->getCatchupTakeover_forTest()); + + // Mock a first round of heartbeat responses. + now = respondToHeartbeatsUntil(config, now, primaryHostAndPort, behindOptime); + + // Make sure that the catchup takeover has actually been scheduled and at the + // correct time. + ASSERT(replCoord->getCatchupTakeover_forTest()); + auto catchupTakeoverTime = replCoord->getCatchupTakeover_forTest().get(); + Milliseconds catchupTakeoverDelay = catchupTakeoverTime - now; + ASSERT_EQUALS(config.getCatchUpTakeoverDelay(), catchupTakeoverDelay); + + // The catchup takeover will be scheduled at a time later than one election + // timeout after our initial heartbeat responses, so mock a few rounds of + // heartbeat responses to prevent a normal election timeout. + now = respondToHeartbeatsUntil( + config, catchupTakeoverTime, HostAndPort("node2", 12345), behindOptime); + + ASSERT_EQUALS(1, countLogLinesContaining("Starting an election for a catchup takeover")); + + // Simulate a dry run where the primary has caught up and is now ahead of the + // node trying to do the catchup takeover. All the secondary nodes respond + // first so that it tests that we require the primary vote even when we've + // received a majority of the votes. Then the primary responds no to the vote + // request and as a result the dry run fails. 
+ int voteRequests = 0; + int votesExpected = config.getNumMembers() - 1; + NetworkInterfaceMock* net = getNet(); + net->enterNetwork(); + NetworkInterfaceMock::NetworkOperationIterator noi_primary; + Date_t until = net->now() + Seconds(1); + while (voteRequests < votesExpected) { + unittest::log() << "request: " << voteRequests << " expected: " << votesExpected; + const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); + const RemoteCommandRequest& request = noi->getRequest(); + log() << request.target.toString() << " processing " << request.cmdObj; + if (request.cmdObj.firstElement().fieldNameStringData() != "replSetRequestVotes") { + net->blackHole(noi); + } else { + bool voteGranted = request.target != primaryHostAndPort; + net->scheduleResponse( + noi, + until, + makeResponseStatus(BSON("ok" << 1 << "term" << 1 << "voteGranted" << voteGranted + << "reason" + << ""))); + voteRequests++; + } + net->runReadyNetworkOperations(); + } + + while (net->now() < until) { + net->runUntil(until); + if (net->hasReadyRequests()) { + const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); + net->blackHole(noi); + } + } + net->exitNetwork(); + + getReplCoord()->waitForElectionDryRunFinish_forTest(); + stopCapturingLogMessages(); + + // Make sure an election wasn't called for and that we are still secondary. 
+ ASSERT_EQUALS(1, + countLogLinesContaining( + "not running for primary, the current primary responded no in the dry run")); + ASSERT(replCoord->getMemberState().secondary()); +} + TEST_F(TakeoverTest, PrimaryCatchesUpBeforeCatchupTakeover) { BSONObj configObj = BSON("_id" << "mySet" diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 35199798605..34988a9bfac 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -791,7 +791,7 @@ void ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1( const auto status = _topCoord->becomeCandidateIfElectable(_replExecutor->now(), reason); if (!status.isOK()) { switch (reason) { - case TopologyCoordinator::TopologyCoordinator::StartElectionReason::kElectionTimeout: + case TopologyCoordinator::StartElectionReason::kElectionTimeout: log() << "Not starting an election, since we are not electable due to: " << status.reason(); break; @@ -827,7 +827,7 @@ void ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1( break; } - _startElectSelfV1_inlock(); + _startElectSelfV1_inlock(reason); } } // namespace repl diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index 3c2df705d93..36d558957e7 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -587,6 +587,11 @@ public: */ virtual void setPrimaryIndex(long long primaryIndex) = 0; + /** + * Returns the current primary index. 
+     */
+    virtual int getCurrentPrimaryIndex() const = 0;
+
     enum StartElectionReason {
         kElectionTimeout,
         kPriorityTakeover,
diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h
index 5114f012be2..1cc6f30bb13 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.h
+++ b/src/mongo/db/repl/topology_coordinator_impl.h
@@ -228,6 +228,7 @@ public:
     virtual void voteForMyselfV1();
     virtual void prepareForStepDown();
     virtual void setPrimaryIndex(long long primaryIndex);
+    virtual int getCurrentPrimaryIndex() const;
     virtual bool haveNumNodesReachedOpTime(const OpTime& opTime, int numNodes, bool durablyWritten);
     virtual bool haveTaggedNodesReachedOpTime(const OpTime& opTime,
                                               const ReplSetTagPattern& tagPattern,
@@ -278,9 +279,6 @@ public:
     // Returns _electionId. Only used in unittests.
     OID getElectionId() const;
 
-    // Returns _currentPrimaryIndex. Only used in unittests.
-    int getCurrentPrimaryIndex() const;
-
 private:
     enum UnelectableReason {
         None = 0,
diff --git a/src/mongo/db/repl/vote_requester.cpp b/src/mongo/db/repl/vote_requester.cpp
index bb26c524e3a..3f9af9f2e0e 100644
--- a/src/mongo/db/repl/vote_requester.cpp
+++ b/src/mongo/db/repl/vote_requester.cpp
@@ -48,7 +48,8 @@ VoteRequester::Algorithm::Algorithm(const ReplSetConfig& rsConfig,
                                     long long candidateIndex,
                                     long long term,
                                     bool dryRun,
-                                    OpTime lastDurableOpTime)
+                                    OpTime lastDurableOpTime,
+                                    int primaryIndex)
     : _rsConfig(rsConfig),
       _candidateIndex(candidateIndex),
       _term(term),
@@ -60,6 +61,9 @@ VoteRequester::Algorithm::Algorithm(const ReplSetConfig& rsConfig,
         if (member->isVoter() && index != candidateIndex) {
             _targets.push_back(member->getHostAndPort());
         }
+        if (index == primaryIndex) {
+            _primaryHost = member->getHostAndPort();
+        }
         index++;
     }
 }
@@ -98,6 +102,11 @@ void VoteRequester::Algorithm::processResponse(const RemoteCommandRequest& reque
         return;
     }
     _responders.insert(request.target);
+
+    // Default the primary to No (flipped to Yes below); an unset _primaryHost matches no target.
+    if (_primaryHost == request.target) {
+        _primaryVote = PrimaryVote::No;
+    }
     ReplSetRequestVotesResponse voteResponse;
     const auto status = voteResponse.initialize(response.data);
     if (!status.isOK()) {
@@ -106,6 +115,9 @@ void VoteRequester::Algorithm::processResponse(const RemoteCommandRequest& reque
 
     if (voteResponse.getVoteGranted()) {
         logLine << "received a yes vote from " << request.target;
+        if (_primaryHost == request.target) {
+            _primaryVote = PrimaryVote::Yes;
+        }
         _votes++;
     } else {
         logLine << "received a no vote from " << request.target << " with reason \""
@@ -119,13 +131,23 @@ void VoteRequester::Algorithm::processResponse(const RemoteCommandRequest& reque
 }
 
 bool VoteRequester::Algorithm::hasReceivedSufficientResponses() const {
-    return _staleTerm || _votes == _rsConfig.getMajorityVoteCount() ||
+    if (_primaryHost && _primaryVote == PrimaryVote::No) {
+        return true;
+    }
+
+    if (_primaryHost && _primaryVote == PrimaryVote::Pending) {
+        return false;
+    }
+
+    return _staleTerm || _votes >= _rsConfig.getMajorityVoteCount() ||
         _responsesProcessed == static_cast<int>(_targets.size());
 }
 
 VoteRequester::Result VoteRequester::Algorithm::getResult() const {
     if (_staleTerm) {
         return Result::kStaleTerm;
+    } else if (_primaryHost && _primaryVote != PrimaryVote::Yes) {
+        return Result::kPrimaryRespondedNo;
     } else if (_votes >= _rsConfig.getMajorityVoteCount()) {
         return Result::kSuccessfullyElected;
     } else {
@@ -146,8 +168,10 @@ StatusWith<executor::TaskExecutor::EventHandle> VoteRequester::start(
     long long candidateIndex,
     long long term,
     bool dryRun,
-    OpTime lastDurableOpTime) {
-    _algorithm.reset(new Algorithm(rsConfig, candidateIndex, term, dryRun, lastDurableOpTime));
+    OpTime lastDurableOpTime,
+    int primaryIndex) {
+    _algorithm.reset(
+        new Algorithm(rsConfig, candidateIndex, term, dryRun, lastDurableOpTime, primaryIndex));
     _runner.reset(new ScatterGatherRunner(_algorithm.get(), executor));
     return _runner->start();
 }
diff --git
a/src/mongo/db/repl/vote_requester.h b/src/mongo/db/repl/vote_requester.h index a64802eba16..d3d33851379 100644 --- a/src/mongo/db/repl/vote_requester.h +++ b/src/mongo/db/repl/vote_requester.h @@ -50,11 +50,7 @@ class VoteRequester { MONGO_DISALLOW_COPYING(VoteRequester); public: - enum class Result { - kSuccessfullyElected, - kStaleTerm, - kInsufficientVotes, - }; + enum class Result { kSuccessfullyElected, kStaleTerm, kInsufficientVotes, kPrimaryRespondedNo }; class Algorithm : public ScatterGatherAlgorithm { public: @@ -62,7 +58,8 @@ public: long long candidateIndex, long long term, bool dryRun, - OpTime lastDurableOpTime); + OpTime lastDurableOpTime, + int primaryIndex); virtual ~Algorithm(); virtual std::vector<executor::RemoteCommandRequest> getRequests() const; virtual void processResponse(const executor::RemoteCommandRequest& request, @@ -82,6 +79,8 @@ public: unordered_set<HostAndPort> getResponders() const; private: + enum class PrimaryVote { Pending, Yes, No }; + const ReplSetConfig _rsConfig; const long long _candidateIndex; const long long _term; @@ -92,6 +91,8 @@ public: bool _staleTerm = false; long long _responsesProcessed = 0; long long _votes = 1; + boost::optional<HostAndPort> _primaryHost; + PrimaryVote _primaryVote = PrimaryVote::Pending; }; VoteRequester(); @@ -100,6 +101,8 @@ public: /** * Begins the process of sending replSetRequestVotes commands to all non-DOWN nodes * in currentConfig, in attempt to receive sufficient votes to win the election. + * If primaryIndex is not -1, then it means that the primary's vote is required + * to win the election. * * evh can be used to schedule a callback when the process is complete. * If this function returns Status::OK(), evh is then guaranteed to be signaled. @@ -109,7 +112,8 @@ public: long long candidateIndex, long long term, bool dryRun, - OpTime lastDurableOpTime); + OpTime lastDurableOpTime, + int primaryIndex); /** * Informs the VoteRequester to cancel further processing. 
diff --git a/src/mongo/db/repl/vote_requester_test.cpp b/src/mongo/db/repl/vote_requester_test.cpp index e3edf89fe6c..04142a340a3 100644 --- a/src/mongo/db/repl/vote_requester_test.cpp +++ b/src/mongo/db/repl/vote_requester_test.cpp @@ -89,7 +89,8 @@ public: candidateId, term, false, // not a dryRun - lastOplogEntry)); + lastOplogEntry, + -1)); } virtual void tearDown() { @@ -218,7 +219,42 @@ public: candidateId, term, true, // dryRun - lastOplogEntry)); + lastOplogEntry, + -1)); + } +}; + +class VoteRequesterCatchupTakeoverDryRunTest : public VoteRequesterTest { +public: + virtual void setUp() { + ReplSetConfig config; + ASSERT_OK(config.initialize(BSON("_id" + << "rs0" + << "version" + << 2 + << "members" + << BSON_ARRAY(BSON("_id" << 0 << "host" + << "host0") + << BSON("_id" << 1 << "host" + << "host1") + << BSON("_id" << 2 << "host" + << "host2") + << BSON("_id" << 3 << "host" + << "host3") + << BSON("_id" << 4 << "host" + << "host4"))))); + ASSERT_OK(config.validate()); + long long candidateId = 0; + long long term = 2; + int primaryIndex = 1; + OpTime lastOplogEntry = OpTime(Timestamp(999, 0), 1); + + _requester.reset(new VoteRequester::Algorithm(config, + candidateId, + term, + true, // dryRun + lastOplogEntry, + primaryIndex)); } }; @@ -418,6 +454,49 @@ TEST_F(VoteRequesterDryRunTest, NotEnoughVotesLoseElection) { stopCapturingLogMessages(); } +TEST_F(VoteRequesterCatchupTakeoverDryRunTest, CatchupTakeoverPrimarySaysYesWinElection) { + ASSERT_FALSE(hasReceivedSufficientResponses()); + processResponse(requestFrom("host1"), votedYes()); + processResponse(requestFrom("host2"), votedYes()); + ASSERT_TRUE(hasReceivedSufficientResponses()); + ASSERT(VoteRequester::Result::kSuccessfullyElected == getResult()); + ASSERT_EQUALS(2, getNumResponders()); +} + +TEST_F(VoteRequesterCatchupTakeoverDryRunTest, CatchupTakeoverPrimarySaysYesButNotEnoughVotes) { + ASSERT_FALSE(hasReceivedSufficientResponses()); + processResponse(requestFrom("host1"), votedYes()); + 
ASSERT(VoteRequester::Result::kInsufficientVotes == getResult()); + processResponse(requestFrom("host2"), votedNoBecauseLastOpTimeIsGreater()); + processResponse(requestFrom("host3"), votedNoBecauseLastOpTimeIsGreater()); + processResponse(requestFrom("host4"), votedNoBecauseLastOpTimeIsGreater()); + ASSERT_TRUE(hasReceivedSufficientResponses()); + ASSERT(VoteRequester::Result::kInsufficientVotes == getResult()); + ASSERT_EQUALS(4, getNumResponders()); +} + +TEST_F(VoteRequesterCatchupTakeoverDryRunTest, CatchupTakeoverPrimarySaysNoLoseElection) { + startCapturingLogMessages(); + ASSERT_FALSE(hasReceivedSufficientResponses()); + processResponse(requestFrom("host2"), votedYes()); + processResponse(requestFrom("host3"), votedYes()); + + // This covers the case that the Vote Requester is cancelled partway through + // the dry run before the primary responded. + ASSERT(VoteRequester::Result::kPrimaryRespondedNo == getResult()); + + // It also tests that even if a majority of yes votes have already been received, + // it still needs to wait for a yes response from the primary. + ASSERT_FALSE(hasReceivedSufficientResponses()); + + processResponse(requestFrom("host1"), votedNoBecauseLastOpTimeIsGreater()); + ASSERT_EQUALS(1, countLogLinesContaining("received a no vote from host1:27017")); + ASSERT_TRUE(hasReceivedSufficientResponses()); + ASSERT(VoteRequester::Result::kPrimaryRespondedNo == getResult()); + ASSERT_EQUALS(3, getNumResponders()); + stopCapturingLogMessages(); +} + } // namespace } // namespace repl } // namespace mongo |