diff options
-rw-r--r-- | src/mongo/db/repl/topology_coordinator.cpp | 188 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator.h | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_v1_test.cpp | 40 |
3 files changed, 143 insertions, 96 deletions
diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp index e38cd18f126..86c58d4281d 100644 --- a/src/mongo/db/repl/topology_coordinator.cpp +++ b/src/mongo/db/repl/topology_coordinator.cpp @@ -895,7 +895,6 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse( Milliseconds networkRoundTripTime, const HostAndPort& target, const StatusWith<ReplSetHeartbeatResponse>& hbResponse) { - const MemberState originalState = getMemberState(); PingStats& hbStats = _pings[target]; invariant(hbStats.getLastHeartbeatStartDate() != Date_t()); const bool isUnauthorized = (hbResponse.getStatus().code() == ErrorCodes::Unauthorized) || @@ -943,6 +942,9 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse( "error"_attr = hbResponse.getStatus()); } + HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction(); + nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate); + if (hbResponse.isOK() && hbResponse.getValue().hasConfig()) { // -2 is for uninitialized config. const ConfigVersionAndTerm currentConfigVersionAndTerm = _rsConfig.isInitialized() @@ -950,9 +952,16 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse( : ConfigVersionAndTerm(-2, OpTime::kUninitializedTerm); const ReplSetConfig& newConfig = hbResponse.getValue().getConfig(); if (newConfig.getConfigVersionAndTerm() > currentConfigVersionAndTerm) { - HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeReconfigAction(); + nextAction = HeartbeatResponseAction::makeReconfigAction(); nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate); - return nextAction; + + // TODO(SERVER-48178) Only continue processing heartbeat in primary state to avoid + // concurrent reconfig and rollback. + if (_role != Role::kLeader) { + return nextAction; + } + + // Continue processing heartbeat responses even if we decide to install a new config. } else { // Could be we got the newer version before we got the response, or the // target erroneously sent us one, even though it isn't newer. @@ -982,8 +991,6 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse( // Check if the heartbeat target is in our config. If it isn't, there's nothing left to do, // so return early. if (!_rsConfig.isInitialized()) { - HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction(); - nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate); return nextAction; } // If we're not in the config, we don't need to respond to heartbeats. @@ -995,8 +1002,6 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse( "Could not find ourself in current config so ignoring heartbeat", "target"_attr = target, "currentConfig"_attr = _rsConfig.toBSON()); - HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction(); - nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate); return nextAction; } const int memberIndex = _rsConfig.findMemberIndexByHostAndPort(target); @@ -1008,8 +1013,6 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse( "Could not find target in current config so ignoring", "target"_attr = target, "currentConfig"_attr = _rsConfig.toBSON()); - HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction(); - nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate); return nextAction; } @@ -1047,8 +1050,14 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse( advancedOpTimeOrUpdatedConfig = hbData.setUpValues(now, std::move(hbr)); } - HeartbeatResponseAction nextAction; - nextAction = _updatePrimaryFromHBDataV1(memberIndex, originalState, now); + _updatePrimaryFromHBDataV1(now); + + // If we've decided to install a newer config, we don't need to consider takeovers. + if (nextAction.getAction() == HeartbeatResponseAction::Reconfig) { + return nextAction; + } + + nextAction = _shouldTakeOverPrimary(memberIndex); nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate); nextAction.setAdvancedOpTimeOrUpdatedConfig(advancedOpTimeOrUpdatedConfig); @@ -1386,23 +1395,14 @@ MemberData* TopologyCoordinator::_findMemberDataByMemberId(const int memberId) { return nullptr; } -HeartbeatResponseAction TopologyCoordinator::_updatePrimaryFromHBDataV1( - int updatedConfigIndex, const MemberState& originalState, Date_t now) { - // +void TopologyCoordinator::_updatePrimaryFromHBDataV1(Date_t now) { // Updates the local notion of which remote node, if any is primary. - // Start the priority takeover process if we are eligible. - // - - invariant(updatedConfigIndex != _selfIndex); + invariant(_selfIndex != -1); - // If we are missing from the config, do not participate in primary maintenance or election. - if (_selfIndex == -1) { - return HeartbeatResponseAction::makeNoAction(); - } // If we are the primary, there must be no other primary, otherwise its higher term would // have already made us step down. if (_currentPrimaryIndex == _selfIndex) { - return HeartbeatResponseAction::makeNoAction(); + return; } // Scan the member list's heartbeat data for who is primary, and update _currentPrimaryIndex. @@ -1416,84 +1416,90 @@ HeartbeatResponseAction TopologyCoordinator::_updatePrimaryFromHBDataV1( } } _currentPrimaryIndex = primaryIndex; - if (_currentPrimaryIndex == -1) { - return HeartbeatResponseAction::makeNoAction(); - } // Clear last heartbeat message on ourselves. setMyHeartbeatMessage(now, ""); +} +HeartbeatResponseAction TopologyCoordinator::_shouldTakeOverPrimary(int updatedConfigIndex) { // Takeover when the replset is stable. // // Take over the primary only if the remote primary is in the latest term I know. // This is done only when we get a heartbeat response from the primary. // Otherwise, there must be an outstanding election, which may succeed or not, but // the remote primary will become aware of that election eventually and step down. - if (_memberData.at(primaryIndex).getTerm() == _term && updatedConfigIndex == primaryIndex) { - - // Don't schedule catchup takeover if catchup takeover or primary catchup is disabled. - bool catchupTakeoverDisabled = - ReplSetConfig::kCatchUpDisabled == _rsConfig.getCatchUpTimeoutPeriod() || - ReplSetConfig::kCatchUpTakeoverDisabled == _rsConfig.getCatchUpTakeoverDelay(); - - bool scheduleCatchupTakeover = false; - bool schedulePriorityTakeover = false; - - if (!catchupTakeoverDisabled && - (_memberData.at(primaryIndex).getLastAppliedOpTime() < - _memberData.at(_selfIndex).getLastAppliedOpTime())) { - LOGV2_FOR_ELECTION(23975, - 2, - "I can take over the primary due to fresher data", - "primaryIndex"_attr = primaryIndex, - "primaryTerm"_attr = _memberData.at(primaryIndex).getTerm(), - "primaryOpTime"_attr = - _memberData.at(primaryIndex).getLastAppliedOpTime(), - "myOpTime"_attr = _memberData.at(_selfIndex).getLastAppliedOpTime(), - "replicaSetStatus"_attr = _getReplSetStatusString()); - - scheduleCatchupTakeover = true; - } - - if (_rsConfig.getMemberAt(primaryIndex).getPriority() < - _rsConfig.getMemberAt(_selfIndex).getPriority()) { - LOGV2_FOR_ELECTION(23977, - 2, - "I can take over the primary due to higher priority", - "primaryIndex"_attr = primaryIndex, - "primaryTerm"_attr = _memberData.at(primaryIndex).getTerm(), - "replicaSetStatus"_attr = _getReplSetStatusString()); - - schedulePriorityTakeover = true; - } - - // Calculate rank of current node. A rank of 0 indicates that it has the highest priority. - auto currentNodePriority = _rsConfig.getMemberAt(_selfIndex).getPriority(); - - // Schedule a priority takeover early only if we know that the current node has the highest - // priority in the replica set, has a higher priority than the primary, and is the most - // up to date node. - // Otherwise, prefer to schedule a catchup takeover over a priority takeover - if (scheduleCatchupTakeover && schedulePriorityTakeover && - _rsConfig.calculatePriorityRank(currentNodePriority) == 0) { - LOGV2_FOR_ELECTION( - 23979, - 2, - "I can take over the primary because I have a higher priority, the highest " - "priority in the replica set, and fresher data. Current primary index: " - "{primaryIndex} in term {primaryTerm}", - "I can take over the primary because I have a higher priority, the highest " - "priority in the replica set, and fresher data", - "primaryIndex"_attr = primaryIndex, - "primaryTerm"_attr = _memberData.at(primaryIndex).getTerm()); - return HeartbeatResponseAction::makePriorityTakeoverAction(); - } - if (scheduleCatchupTakeover) { - return HeartbeatResponseAction::makeCatchupTakeoverAction(); - } - if (schedulePriorityTakeover) { - return HeartbeatResponseAction::makePriorityTakeoverAction(); - } + + if (_currentPrimaryIndex == -1) { + return HeartbeatResponseAction::makeNoAction(); + } + + auto primaryIndex = _currentPrimaryIndex; + if (_memberData.at(primaryIndex).getTerm() != _term || updatedConfigIndex != primaryIndex) { + return HeartbeatResponseAction::makeNoAction(); + } + + // Don't schedule catchup takeover if catchup takeover or primary catchup is disabled. + bool catchupTakeoverDisabled = + ReplSetConfig::kCatchUpDisabled == _rsConfig.getCatchUpTimeoutPeriod() || + ReplSetConfig::kCatchUpTakeoverDisabled == _rsConfig.getCatchUpTakeoverDelay(); + + bool scheduleCatchupTakeover = false; + bool schedulePriorityTakeover = false; + + if (!catchupTakeoverDisabled && + (_memberData.at(primaryIndex).getLastAppliedOpTime() < + _memberData.at(_selfIndex).getLastAppliedOpTime())) { + LOGV2_FOR_ELECTION(23975, + 2, + "I can take over the primary due to fresher data", + "primaryIndex"_attr = primaryIndex, + "primaryTerm"_attr = _memberData.at(primaryIndex).getTerm(), + "primaryOpTime"_attr = + _memberData.at(primaryIndex).getLastAppliedOpTime(), + "myOpTime"_attr = _memberData.at(_selfIndex).getLastAppliedOpTime(), + "replicaSetStatus"_attr = _getReplSetStatusString()); + + scheduleCatchupTakeover = true; + } + + if (_rsConfig.getMemberAt(primaryIndex).getPriority() < + _rsConfig.getMemberAt(_selfIndex).getPriority()) { + LOGV2_FOR_ELECTION(23977, + 2, + "I can take over the primary due to higher priority", + "primaryIndex"_attr = primaryIndex, + "primaryTerm"_attr = _memberData.at(primaryIndex).getTerm(), + "replicaSetStatus"_attr = _getReplSetStatusString()); + + schedulePriorityTakeover = true; + } + + // Calculate rank of current node. A rank of 0 indicates that it has the highest priority. + auto currentNodePriority = _rsConfig.getMemberAt(_selfIndex).getPriority(); + + // Schedule a priority takeover early only if we know that the current node has the highest + // priority in the replica set, has a higher priority than the primary, and is the most + // up to date node. + // Otherwise, prefer to schedule a catchup takeover over a priority takeover + if (scheduleCatchupTakeover && schedulePriorityTakeover && + _rsConfig.calculatePriorityRank(currentNodePriority) == 0) { + LOGV2_FOR_ELECTION( + 23979, + 2, + "I can take over the primary because I have a higher priority, the highest " + "priority in the replica set, and fresher data. Current primary index: " + "{primaryIndex} in term {primaryTerm}", + "I can take over the primary because I have a higher priority, the highest " + "priority in the replica set, and fresher data", + "primaryIndex"_attr = primaryIndex, + "primaryTerm"_attr = _memberData.at(primaryIndex).getTerm()); + return HeartbeatResponseAction::makePriorityTakeoverAction(); + } + if (scheduleCatchupTakeover) { + return HeartbeatResponseAction::makeCatchupTakeoverAction(); + } + if (schedulePriorityTakeover) { + return HeartbeatResponseAction::makePriorityTakeoverAction(); } return HeartbeatResponseAction::makeNoAction(); } diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index 797f1beecb5..56ec2fd881b 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -975,14 +975,15 @@ private: MemberData* _findMemberDataByMemberId(const int memberId); /** - * Performs updating "_currentPrimaryIndex" for processHeartbeatResponse(), and determines if an - * election or stepdown should commence. + * Performs updating "_currentPrimaryIndex" for processHeartbeatResponse(). */ - HeartbeatResponseAction _updatePrimaryFromHBDataV1(int updatedConfigIndex, - const MemberState& originalState, - Date_t now); + void _updatePrimaryFromHBDataV1(Date_t now); /** + * Determine if the node should run PriorityTakeover or CatchupTakeover. + */ + HeartbeatResponseAction _shouldTakeOverPrimary(int updatedConfigIndex); + /** * Updates _memberData based on the newConfig, ensuring that every member in the newConfig * has an entry in _memberData. If any nodes in the newConfig are also present in * _currentConfig, copies their heartbeat info into the corresponding entry in the updated diff --git a/src/mongo/db/repl/topology_coordinator_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_v1_test.cpp index fc370fbee11..fa7ffbd01a8 100644 --- a/src/mongo/db/repl/topology_coordinator_v1_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_v1_test.cpp @@ -4356,6 +4356,46 @@ TEST_F(HeartbeatResponseReconfigTestV1, NodeAcceptsConfigIfVersionInHeartbeatRes ASSERT_EQ(action.getAction(), HeartbeatResponseAction::Reconfig); } +TEST_F(HeartbeatResponseReconfigTestV1, PrimaryAcceptsConfigAfterUpdatingHeartbeatData) { + // Become primary but don't completeTransitionToPrimary. + Timestamp electionTimestamp(2, 0); + getTopoCoord().changeMemberState_forTest(MemberState::RS_PRIMARY, electionTimestamp); + getTopoCoord().setCurrentPrimary_forTest(getSelfIndex(), electionTimestamp); + OpTime dummyOpTime(electionTimestamp, getTopoCoord().getTerm()); + setMyOpTime(dummyOpTime); + + long long version = initConfigVersion + 1; + long long term = initConfigTerm; + auto config = ReplSetConfig::parse(makeRSConfigWithVersionAndTerm(version, term)); + + auto remote = HostAndPort("host2"); + auto remoteId = getCurrentConfig().findMemberByHostAndPort(remote)->getId(); + auto oldOpTime = getTopoCoord().latestKnownOpTimeSinceHeartbeatRestartPerMember().at(remoteId); + ASSERT_FALSE(oldOpTime); + + // Construct a higher OpTime. + OpTime newOpTime{{20, 1}, term}; + ReplSetHeartbeatResponse hb; + hb.initialize(BSON("ok" << 1 << "v" << 1 << "state" << MemberState::RS_SECONDARY), 1).ignore(); + hb.setConfig(config); + hb.setConfigVersion(version); + hb.setConfigTerm(term); + hb.setAppliedOpTimeAndWallTime({newOpTime, now()}); + StatusWith<ReplSetHeartbeatResponse> hbResponse(hb); + + getTopoCoord().prepareHeartbeatRequestV1(now(), "rs0", remote); + now() += Milliseconds(1); + auto action = + getTopoCoord().processHeartbeatResponse(now(), Milliseconds(1), remote, hbResponse); + ASSERT_EQ(action.getAction(), HeartbeatResponseAction::Reconfig); + + // Check that heartbeat response updated the heartbeat data even on reconfig. + auto actualOpTime = + getTopoCoord().latestKnownOpTimeSinceHeartbeatRestartPerMember().at(remoteId); + ASSERT(actualOpTime); + ASSERT_EQUALS(*actualOpTime, newOpTime); +} + TEST_F(HeartbeatResponseReconfigTestV1, NodeAcceptsConfigIfTermInHeartbeatResponseIsNewer) { long long version = initConfigVersion; long long term = initConfigTerm + 1; |