summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- src/mongo/db/repl/topology_coordinator.cpp 188
-rw-r--r-- src/mongo/db/repl/topology_coordinator.h 11
-rw-r--r-- src/mongo/db/repl/topology_coordinator_v1_test.cpp 40
3 files changed, 143 insertions, 96 deletions
diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp
index e38cd18f126..86c58d4281d 100644
--- a/src/mongo/db/repl/topology_coordinator.cpp
+++ b/src/mongo/db/repl/topology_coordinator.cpp
@@ -895,7 +895,6 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse(
Milliseconds networkRoundTripTime,
const HostAndPort& target,
const StatusWith<ReplSetHeartbeatResponse>& hbResponse) {
- const MemberState originalState = getMemberState();
PingStats& hbStats = _pings[target];
invariant(hbStats.getLastHeartbeatStartDate() != Date_t());
const bool isUnauthorized = (hbResponse.getStatus().code() == ErrorCodes::Unauthorized) ||
@@ -943,6 +942,9 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse(
"error"_attr = hbResponse.getStatus());
}
+ HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction();
+ nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
+
if (hbResponse.isOK() && hbResponse.getValue().hasConfig()) {
// -2 is for uninitialized config.
const ConfigVersionAndTerm currentConfigVersionAndTerm = _rsConfig.isInitialized()
@@ -950,9 +952,16 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse(
: ConfigVersionAndTerm(-2, OpTime::kUninitializedTerm);
const ReplSetConfig& newConfig = hbResponse.getValue().getConfig();
if (newConfig.getConfigVersionAndTerm() > currentConfigVersionAndTerm) {
- HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeReconfigAction();
+ nextAction = HeartbeatResponseAction::makeReconfigAction();
nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
- return nextAction;
+
+ // TODO(SERVER-48178) Only continue processing heartbeat in primary state to avoid
+ // concurrent reconfig and rollback.
+ if (_role != Role::kLeader) {
+ return nextAction;
+ }
+
+ // Continue processing heartbeat responses even if we decide to install a new config.
} else {
// Could be we got the newer version before we got the response, or the
// target erroneously sent us one, even though it isn't newer.
@@ -982,8 +991,6 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse(
// Check if the heartbeat target is in our config. If it isn't, there's nothing left to do,
// so return early.
if (!_rsConfig.isInitialized()) {
- HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction();
- nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
return nextAction;
}
// If we're not in the config, we don't need to respond to heartbeats.
@@ -995,8 +1002,6 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse(
"Could not find ourself in current config so ignoring heartbeat",
"target"_attr = target,
"currentConfig"_attr = _rsConfig.toBSON());
- HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction();
- nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
return nextAction;
}
const int memberIndex = _rsConfig.findMemberIndexByHostAndPort(target);
@@ -1008,8 +1013,6 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse(
"Could not find target in current config so ignoring",
"target"_attr = target,
"currentConfig"_attr = _rsConfig.toBSON());
- HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction();
- nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
return nextAction;
}
@@ -1047,8 +1050,14 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse(
advancedOpTimeOrUpdatedConfig = hbData.setUpValues(now, std::move(hbr));
}
- HeartbeatResponseAction nextAction;
- nextAction = _updatePrimaryFromHBDataV1(memberIndex, originalState, now);
+ _updatePrimaryFromHBDataV1(now);
+
+ // If we've decided to install a newer config, we don't need to consider takeovers.
+ if (nextAction.getAction() == HeartbeatResponseAction::Reconfig) {
+ return nextAction;
+ }
+
+ nextAction = _shouldTakeOverPrimary(memberIndex);
nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
nextAction.setAdvancedOpTimeOrUpdatedConfig(advancedOpTimeOrUpdatedConfig);
@@ -1386,23 +1395,14 @@ MemberData* TopologyCoordinator::_findMemberDataByMemberId(const int memberId) {
return nullptr;
}
-HeartbeatResponseAction TopologyCoordinator::_updatePrimaryFromHBDataV1(
- int updatedConfigIndex, const MemberState& originalState, Date_t now) {
- //
+void TopologyCoordinator::_updatePrimaryFromHBDataV1(Date_t now) {
// Updates the local notion of which remote node, if any is primary.
- // Start the priority takeover process if we are eligible.
- //
-
- invariant(updatedConfigIndex != _selfIndex);
+ invariant(_selfIndex != -1);
- // If we are missing from the config, do not participate in primary maintenance or election.
- if (_selfIndex == -1) {
- return HeartbeatResponseAction::makeNoAction();
- }
// If we are the primary, there must be no other primary, otherwise its higher term would
// have already made us step down.
if (_currentPrimaryIndex == _selfIndex) {
- return HeartbeatResponseAction::makeNoAction();
+ return;
}
// Scan the member list's heartbeat data for who is primary, and update _currentPrimaryIndex.
@@ -1416,84 +1416,90 @@ HeartbeatResponseAction TopologyCoordinator::_updatePrimaryFromHBDataV1(
}
}
_currentPrimaryIndex = primaryIndex;
- if (_currentPrimaryIndex == -1) {
- return HeartbeatResponseAction::makeNoAction();
- }
// Clear last heartbeat message on ourselves.
setMyHeartbeatMessage(now, "");
+}
+HeartbeatResponseAction TopologyCoordinator::_shouldTakeOverPrimary(int updatedConfigIndex) {
// Decides whether this node should schedule a takeover of the current primary and returns the
// matching action: a priority takeover action, a catchup takeover action, or no action.
//
// updatedConfigIndex is compared against the current primary's index below; the caller in
// processHeartbeatResponse() passes the config index of the member whose heartbeat response
// was just processed.
//
// Takeover when the replset is stable.
//
// Take over the primary only if the remote primary is in the latest term I know.
// This is done only when we get a heartbeat response from the primary.
// Otherwise, there must be an outstanding election, which may succeed or not, but
// the remote primary will become aware of that election eventually and step down.
- if (_memberData.at(primaryIndex).getTerm() == _term && updatedConfigIndex == primaryIndex) {
-
- // Don't schedule catchup takeover if catchup takeover or primary catchup is disabled.
- bool catchupTakeoverDisabled =
- ReplSetConfig::kCatchUpDisabled == _rsConfig.getCatchUpTimeoutPeriod() ||
- ReplSetConfig::kCatchUpTakeoverDisabled == _rsConfig.getCatchUpTakeoverDelay();
-
- bool scheduleCatchupTakeover = false;
- bool schedulePriorityTakeover = false;
-
- if (!catchupTakeoverDisabled &&
- (_memberData.at(primaryIndex).getLastAppliedOpTime() <
- _memberData.at(_selfIndex).getLastAppliedOpTime())) {
- LOGV2_FOR_ELECTION(23975,
- 2,
- "I can take over the primary due to fresher data",
- "primaryIndex"_attr = primaryIndex,
- "primaryTerm"_attr = _memberData.at(primaryIndex).getTerm(),
- "primaryOpTime"_attr =
- _memberData.at(primaryIndex).getLastAppliedOpTime(),
- "myOpTime"_attr = _memberData.at(_selfIndex).getLastAppliedOpTime(),
- "replicaSetStatus"_attr = _getReplSetStatusString());
-
- scheduleCatchupTakeover = true;
- }
-
- if (_rsConfig.getMemberAt(primaryIndex).getPriority() <
- _rsConfig.getMemberAt(_selfIndex).getPriority()) {
- LOGV2_FOR_ELECTION(23977,
- 2,
- "I can take over the primary due to higher priority",
- "primaryIndex"_attr = primaryIndex,
- "primaryTerm"_attr = _memberData.at(primaryIndex).getTerm(),
- "replicaSetStatus"_attr = _getReplSetStatusString());
-
- schedulePriorityTakeover = true;
- }
-
- // Calculate rank of current node. A rank of 0 indicates that it has the highest priority.
- auto currentNodePriority = _rsConfig.getMemberAt(_selfIndex).getPriority();
-
- // Schedule a priority takeover early only if we know that the current node has the highest
- // priority in the replica set, has a higher priority than the primary, and is the most
- // up to date node.
- // Otherwise, prefer to schedule a catchup takeover over a priority takeover
- if (scheduleCatchupTakeover && schedulePriorityTakeover &&
- _rsConfig.calculatePriorityRank(currentNodePriority) == 0) {
- LOGV2_FOR_ELECTION(
- 23979,
- 2,
- "I can take over the primary because I have a higher priority, the highest "
- "priority in the replica set, and fresher data. Current primary index: "
- "{primaryIndex} in term {primaryTerm}",
- "I can take over the primary because I have a higher priority, the highest "
- "priority in the replica set, and fresher data",
- "primaryIndex"_attr = primaryIndex,
- "primaryTerm"_attr = _memberData.at(primaryIndex).getTerm());
- return HeartbeatResponseAction::makePriorityTakeoverAction();
- }
- if (scheduleCatchupTakeover) {
- return HeartbeatResponseAction::makeCatchupTakeoverAction();
- }
- if (schedulePriorityTakeover) {
- return HeartbeatResponseAction::makePriorityTakeoverAction();
- }
+
+ if (_currentPrimaryIndex == -1) {  // presumably -1 means no primary is currently known
+ return HeartbeatResponseAction::makeNoAction();
+ }
+
+ auto primaryIndex = _currentPrimaryIndex;
+ if (_memberData.at(primaryIndex).getTerm() != _term || updatedConfigIndex != primaryIndex) {  // guard-clause form of the removed nested condition
+ return HeartbeatResponseAction::makeNoAction();
+ }
+
+ // Don't schedule catchup takeover if catchup takeover or primary catchup is disabled.
+ bool catchupTakeoverDisabled =
+ ReplSetConfig::kCatchUpDisabled == _rsConfig.getCatchUpTimeoutPeriod() ||
+ ReplSetConfig::kCatchUpTakeoverDisabled == _rsConfig.getCatchUpTakeoverDelay();
+
+ bool scheduleCatchupTakeover = false;
+ bool schedulePriorityTakeover = false;
+
+ if (!catchupTakeoverDisabled &&
+ (_memberData.at(primaryIndex).getLastAppliedOpTime() <
+ _memberData.at(_selfIndex).getLastAppliedOpTime())) {  // our last applied OpTime is ahead of the primary's
+ LOGV2_FOR_ELECTION(23975,
+ 2,
+ "I can take over the primary due to fresher data",
+ "primaryIndex"_attr = primaryIndex,
+ "primaryTerm"_attr = _memberData.at(primaryIndex).getTerm(),
+ "primaryOpTime"_attr =
+ _memberData.at(primaryIndex).getLastAppliedOpTime(),
+ "myOpTime"_attr = _memberData.at(_selfIndex).getLastAppliedOpTime(),
+ "replicaSetStatus"_attr = _getReplSetStatusString());
+
+ scheduleCatchupTakeover = true;
+ }
+
+ if (_rsConfig.getMemberAt(primaryIndex).getPriority() <
+ _rsConfig.getMemberAt(_selfIndex).getPriority()) {  // our configured priority exceeds the primary's
+ LOGV2_FOR_ELECTION(23977,
+ 2,
+ "I can take over the primary due to higher priority",
+ "primaryIndex"_attr = primaryIndex,
+ "primaryTerm"_attr = _memberData.at(primaryIndex).getTerm(),
+ "replicaSetStatus"_attr = _getReplSetStatusString());
+
+ schedulePriorityTakeover = true;
+ }
+
+ // Calculate rank of current node. A rank of 0 indicates that it has the highest priority.
+ auto currentNodePriority = _rsConfig.getMemberAt(_selfIndex).getPriority();
+
+ // Schedule a priority takeover early only if we know that the current node has the highest
+ // priority in the replica set, has a higher priority than the primary, and is the most
+ // up to date node.
+ // Otherwise, prefer to schedule a catchup takeover over a priority takeover.
+ if (scheduleCatchupTakeover && schedulePriorityTakeover &&
+ _rsConfig.calculatePriorityRank(currentNodePriority) == 0) {
+ LOGV2_FOR_ELECTION(
+ 23979,
+ 2,
+ "I can take over the primary because I have a higher priority, the highest "
+ "priority in the replica set, and fresher data. Current primary index: "
+ "{primaryIndex} in term {primaryTerm}",
+ "I can take over the primary because I have a higher priority, the highest "
+ "priority in the replica set, and fresher data",
+ "primaryIndex"_attr = primaryIndex,
+ "primaryTerm"_attr = _memberData.at(primaryIndex).getTerm());
+ return HeartbeatResponseAction::makePriorityTakeoverAction();
+ }
+ if (scheduleCatchupTakeover) {  // checked before priority takeover, so catchup wins when both apply
+ return HeartbeatResponseAction::makeCatchupTakeoverAction();
+ }
+ if (schedulePriorityTakeover) {
+ return HeartbeatResponseAction::makePriorityTakeoverAction();
}
return HeartbeatResponseAction::makeNoAction();
}
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index 797f1beecb5..56ec2fd881b 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -975,14 +975,15 @@ private:
MemberData* _findMemberDataByMemberId(const int memberId);
/**
- * Performs updating "_currentPrimaryIndex" for processHeartbeatResponse(), and determines if an
- * election or stepdown should commence.
+ * Performs updating "_currentPrimaryIndex" for processHeartbeatResponse().
*/
- HeartbeatResponseAction _updatePrimaryFromHBDataV1(int updatedConfigIndex,
- const MemberState& originalState,
- Date_t now);
+ void _updatePrimaryFromHBDataV1(Date_t now);
/**
+ * Determine if the node should run PriorityTakeover or CatchupTakeover.
+ */
+ HeartbeatResponseAction _shouldTakeOverPrimary(int updatedConfigIndex);
+ /**
* Updates _memberData based on the newConfig, ensuring that every member in the newConfig
* has an entry in _memberData. If any nodes in the newConfig are also present in
* _currentConfig, copies their heartbeat info into the corresponding entry in the updated
diff --git a/src/mongo/db/repl/topology_coordinator_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_v1_test.cpp
index fc370fbee11..fa7ffbd01a8 100644
--- a/src/mongo/db/repl/topology_coordinator_v1_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_v1_test.cpp
@@ -4356,6 +4356,46 @@ TEST_F(HeartbeatResponseReconfigTestV1, NodeAcceptsConfigIfVersionInHeartbeatRes
ASSERT_EQ(action.getAction(), HeartbeatResponseAction::Reconfig);
}
+TEST_F(HeartbeatResponseReconfigTestV1, PrimaryAcceptsConfigAfterUpdatingHeartbeatData) {
+ // Checks that a node in primary state that receives a heartbeat response carrying a newer config returns a Reconfig action and still records the remote's applied OpTime. Become primary but don't completeTransitionToPrimary.
+ Timestamp electionTimestamp(2, 0);
+ getTopoCoord().changeMemberState_forTest(MemberState::RS_PRIMARY, electionTimestamp);
+ getTopoCoord().setCurrentPrimary_forTest(getSelfIndex(), electionTimestamp);
+ OpTime dummyOpTime(electionTimestamp, getTopoCoord().getTerm());
+ setMyOpTime(dummyOpTime);
+
+ long long version = initConfigVersion + 1;  // one version above initConfigVersion
+ long long term = initConfigTerm;  // config term is left unchanged
+ auto config = ReplSetConfig::parse(makeRSConfigWithVersionAndTerm(version, term));
+
+ auto remote = HostAndPort("host2");
+ auto remoteId = getCurrentConfig().findMemberByHostAndPort(remote)->getId();
+ auto oldOpTime = getTopoCoord().latestKnownOpTimeSinceHeartbeatRestartPerMember().at(remoteId);
+ ASSERT_FALSE(oldOpTime);  // precondition: no OpTime is known for the remote yet
+
+ // Construct a higher OpTime than dummyOpTime's timestamp (2, 0) for the remote to report.
+ OpTime newOpTime{{20, 1}, term};
+ ReplSetHeartbeatResponse hb;
+ hb.initialize(BSON("ok" << 1 << "v" << 1 << "state" << MemberState::RS_SECONDARY), 1).ignore();
+ hb.setConfig(config);  // the response carries the newer config itself
+ hb.setConfigVersion(version);
+ hb.setConfigTerm(term);
+ hb.setAppliedOpTimeAndWallTime({newOpTime, now()});
+ StatusWith<ReplSetHeartbeatResponse> hbResponse(hb);
+
+ getTopoCoord().prepareHeartbeatRequestV1(now(), "rs0", remote);
+ now() += Milliseconds(1);
+ auto action =
+ getTopoCoord().processHeartbeatResponse(now(), Milliseconds(1), remote, hbResponse);
+ ASSERT_EQ(action.getAction(), HeartbeatResponseAction::Reconfig);
+
+ // Check that heartbeat response updated the heartbeat data even on reconfig, i.e. the remote's OpTime is now recorded and equals newOpTime.
+ auto actualOpTime =
+ getTopoCoord().latestKnownOpTimeSinceHeartbeatRestartPerMember().at(remoteId);
+ ASSERT(actualOpTime);
+ ASSERT_EQUALS(*actualOpTime, newOpTime);
+}
+
TEST_F(HeartbeatResponseReconfigTestV1, NodeAcceptsConfigIfTermInHeartbeatResponseIsNewer) {
long long version = initConfigVersion;
long long term = initConfigTerm + 1;