summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Russotto <matthew.russotto@mongodb.com>2022-03-07 23:08:45 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-17 19:24:18 +0000
commit285fe9de60cce759d745f67b55d828bc19e4c482 (patch)
tree29a1e3e0c7b3ee342ca2d0d3a03cac748be50ee3
parentc0b449903ffccd423737c262d3b5afd73fa36ea8 (diff)
downloadmongo-285fe9de60cce759d745f67b55d828bc19e4c482.tar.gz
SERVER-63417 Refactor shouldChangeSyncSource and improve tests for it in toplogy coordinator
(cherry picked from commit bff37c1e83f474ad68a396951c862b290b6f5fa5)
-rw-r--r--src/mongo/db/repl/topology_coordinator.cpp106
-rw-r--r--src/mongo/db/repl/topology_coordinator.h48
-rw-r--r--src/mongo/db/repl/topology_coordinator_v1_test.cpp124
3 files changed, 250 insertions, 28 deletions
diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp
index 0363743ad33..6a871b8ec74 100644
--- a/src/mongo/db/repl/topology_coordinator.cpp
+++ b/src/mongo/db/repl/topology_coordinator.cpp
@@ -3021,19 +3021,64 @@ bool TopologyCoordinator::shouldChangeSyncSource(const HostAndPort& currentSourc
// If the currentSource has the same replication progress as we do and has no source for further
// progress, return true.
+ auto [initialDecision, currentSourceIndex] =
+ _shouldChangeSyncSourceInitialChecks(currentSource);
+ if (initialDecision != ChangeSyncSourceDecision::kMaybe) {
+ return initialDecision == ChangeSyncSourceDecision::kYes;
+ }
+
+ if (!replMetadata.getIsPrimary() &&
+ _shouldChangeSyncSourceDueToNewPrimary(currentSource, currentSourceIndex)) {
+ return true;
+ }
+
+ OpTime currentSourceOpTime =
+ std::max(oqMetadata.getLastOpApplied(),
+ _memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime());
+
+ fassert(4612000, !currentSourceOpTime.isNull());
+
+ int syncSourceIndex = oqMetadata.getSyncSourceIndex();
+ std::string syncSourceHost = oqMetadata.getSyncSourceHost();
+
+ // Change sync source if they are not ahead of us, and don't have a sync source,
+ // unless they are primary.
+ if (_shouldChangeSyncSourceDueToSourceNotAhead(currentSource,
+ syncSourceIndex,
+ replMetadata.getIsPrimary(),
+ currentSourceOpTime,
+ lastOpTimeFetched))
+ return true;
+
+ if (_shouldChangeSyncSourceToBreakCycle(
+ currentSource, syncSourceHost, syncSourceIndex, currentSourceOpTime, lastOpTimeFetched))
+ return true;
+
+ if (_shouldChangeSyncSourceDueToLag(currentSource, currentSourceOpTime, lastOpTimeFetched, now))
+ return true;
+
+ if (_shouldChangeSyncSourceDueToBetterEligibleSource(
+ currentSource, currentSourceIndex, lastOpTimeFetched, now))
+ return true;
+
+ return false;
+}
+
+std::pair<TopologyCoordinator::ChangeSyncSourceDecision, int>
+TopologyCoordinator::_shouldChangeSyncSourceInitialChecks(const HostAndPort& currentSource) const {
if (_selfIndex == -1) {
LOGV2(21828, "Not choosing new sync source because we are not in the config");
- return false;
+ return {ChangeSyncSourceDecision::kNo, -1};
}
- // If the user requested a sync source change, return true.
+ // If the user requested a sync source change, return kYes.
if (_forceSyncSourceIndex != -1) {
LOGV2(21829,
"Choosing new sync source because the user has requested to use "
"{syncSource} as a sync source",
"Choosing new sync source because the user has requested a sync source",
"syncSource"_attr = _rsConfig.getMemberAt(_forceSyncSourceIndex).getHostAndPort());
- return true;
+ return {ChangeSyncSourceDecision::kYes, -1};
}
// While we can allow data replication across config versions, we still do not allow syncing
@@ -3044,27 +3089,22 @@ bool TopologyCoordinator::shouldChangeSyncSource(const HostAndPort& currentSourc
"Choosing new sync source because {currentSyncSource} is not in our config",
"Choosing new sync source because current sync source is not in our config",
"currentSyncSource"_attr = currentSource.toString());
- return true;
+ return {ChangeSyncSourceDecision::kYes, -1};
}
invariant(currentSourceIndex != _selfIndex);
+ return {ChangeSyncSourceDecision::kMaybe, currentSourceIndex};
+}
- OpTime currentSourceOpTime =
- std::max(oqMetadata.getLastOpApplied(),
- _memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime());
-
- fassert(4612000, !currentSourceOpTime.isNull());
-
- int syncSourceIndex = oqMetadata.getSyncSourceIndex();
- std::string syncSourceHost = oqMetadata.getSyncSourceHost();
-
+bool TopologyCoordinator::_shouldChangeSyncSourceDueToNewPrimary(const HostAndPort& currentSource,
+ int currentSourceIndex) const {
// Change sync source if chaining is disabled (without overrides), we are not syncing from the
// primary, and we know who the new primary is. We do not consider chaining disabled if we are
// the primary, since we are in catchup mode.
auto chainingDisabled = !_rsConfig.isChainingAllowed() &&
!enableOverrideClusterChainingSetting.load() && _currentPrimaryIndex != _selfIndex;
auto foundNewPrimary = _currentPrimaryIndex != -1 && _currentPrimaryIndex != currentSourceIndex;
- if (!replMetadata.getIsPrimary() && chainingDisabled && foundNewPrimary) {
+ if (chainingDisabled && foundNewPrimary) {
auto newPrimary = _rsConfig.getMemberAt(_currentPrimaryIndex).getHostAndPort();
LOGV2(3962100,
"Choosing new sync source because chaining is disabled and we are aware of a new "
@@ -3073,21 +3113,34 @@ bool TopologyCoordinator::shouldChangeSyncSource(const HostAndPort& currentSourc
"newPrimary"_attr = newPrimary);
return true;
}
+ return false;
+}
- // Change sync source if they are not ahead of us, and don't have a sync source,
- // unless they are primary.
- if (syncSourceIndex == -1 && currentSourceOpTime <= lastOpTimeFetched &&
- !replMetadata.getIsPrimary()) {
+bool TopologyCoordinator::_shouldChangeSyncSourceDueToSourceNotAhead(
+ const HostAndPort& currentSource,
+ int syncSourceIndex,
+ bool syncSourceIsPrimary,
+ const OpTime& currentSourceOpTime,
+ const OpTime& lastOpTimeFetched) const {
+ if (syncSourceIndex == -1 && currentSourceOpTime <= lastOpTimeFetched && !syncSourceIsPrimary) {
LOGV2(21832,
"Choosing new sync source. Our current sync source is not primary and does "
"not have a sync source, so we require that it is ahead of us",
"syncSource"_attr = currentSource,
"lastOpTimeFetched"_attr = lastOpTimeFetched,
"syncSourceLatestOplogOpTime"_attr = currentSourceOpTime,
- "isPrimary"_attr = replMetadata.getIsPrimary());
+ "isPrimary"_attr = syncSourceIsPrimary);
return true;
}
+ return false;
+}
+bool TopologyCoordinator::_shouldChangeSyncSourceToBreakCycle(
+ const HostAndPort& currentSource,
+ const std::string& syncSourceHost,
+ int syncSourceIndex,
+ const OpTime& currentSourceOpTime,
+ const OpTime& lastOpTimeFetched) const {
// Change sync source if our sync source is also syncing from us when we are in primary
// catchup mode, forming a sync source selection cycle, and the sync source is not ahead
// of us. This is to prevent a deadlock situation. See SERVER-58988 for details.
@@ -3096,7 +3149,7 @@ bool TopologyCoordinator::shouldChangeSyncSource(const HostAndPort& currentSourc
// node that we think of because it was inferred from the sender node, which could have
// a different config. This is acceptable since we are just choosing a different sync
// source if that happens and reconfigs are rare.
- bool isSyncingFromMe = !syncSourceHost.empty()
+ const bool isSyncingFromMe = !syncSourceHost.empty()
? syncSourceHost == _selfMemberData().getHostAndPort().toString()
: syncSourceIndex == _selfIndex;
@@ -3110,7 +3163,13 @@ bool TopologyCoordinator::shouldChangeSyncSource(const HostAndPort& currentSourc
"syncSourceLatestOplogOpTime"_attr = currentSourceOpTime);
return true;
}
+ return false;
+}
+bool TopologyCoordinator::_shouldChangeSyncSourceDueToLag(const HostAndPort& currentSource,
+ const OpTime& currentSourceOpTime,
+ const OpTime& lastOpTimeFetched,
+ Date_t now) const {
if (MONGO_unlikely(disableMaxSyncSourceLagSecs.shouldFail())) {
LOGV2(
21833,
@@ -3148,7 +3207,14 @@ bool TopologyCoordinator::shouldChangeSyncSource(const HostAndPort& currentSourc
}
}
}
+ return false;
+}
+bool TopologyCoordinator::_shouldChangeSyncSourceDueToBetterEligibleSource(
+ const HostAndPort& currentSource,
+ const int currentSourceIndex,
+ const OpTime& lastOpTimeFetched,
+ Date_t now) const {
// Change sync source if our current sync source is not a preferred sync source node choice due
// to non-staleness issues, such as being a non-voter when we are a voter, or being hidden, or
// any of the other conditions checked in _isEligibleSyncSource with firstAttempt=true, and
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index 419a74e61a1..8c515945213 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -916,6 +916,54 @@ private:
const OpTime& lastOpTimeFetched,
ReadPreference readPreference);
+ // Does preliminary checkes to see if a new sync source should be chosen
+ // * Do we have a valid configuration -- if so, we do not change sync source.
+ // * Are we in initial sync -- if so, we do not change sync source.
+ // * Do we have a new forced sync source -- if so, we do change sync source.
+ // Returns decision and current sync source candidate if decision is kMaybe.
+ // (kMaybe indicates to continue with further checks).
+ enum class ChangeSyncSourceDecision { kNo, kYes, kMaybe };
+ std::pair<ChangeSyncSourceDecision, int> _shouldChangeSyncSourceInitialChecks(
+ const HostAndPort& currentSource) const;
+
+ // Returns true if we should choose a new sync source because chaining is disabled
+ // and there is a new primary.
+ bool _shouldChangeSyncSourceDueToNewPrimary(const HostAndPort& currentSource,
+ int currentSourceIndex) const;
+
+ // Change sync source if they are not ahead of us, and don't have a sync source.
+ // Note 'syncSourceIndex' is the index of our sync source's sync source. The 'currentSource'
+ // is our sync source.
+ bool _shouldChangeSyncSourceDueToSourceNotAhead(const HostAndPort& currentSource,
+ int syncSourceIndex,
+ bool syncSourceIsPrimary,
+ const OpTime& currentSourceOpTime,
+ const OpTime& lastOpTimeFetched) const;
+
+ // Change sync source if our sync source is also syncing from us when we are in primary
+ // catchup mode, forming a sync source selection cycle, and the sync source is not ahead
+ // of us.
+ // Note 'syncSourceHost' and 'syncSourceIndex' are the host and index of ourb sync source's
+ // sync source. The 'currentSource' is our sync source.
+ bool _shouldChangeSyncSourceToBreakCycle(const HostAndPort& currentSource,
+ const std::string& syncSourceHost,
+ int syncSourceIndex,
+ const OpTime& currentSourceOpTime,
+ const OpTime& lastOpTimeFetched) const;
+
+ // Returns true if we should choose a new sync source due to our current sync source being
+ // greater than maxSyncSourceLagSeconds and a better source being available.
+ bool _shouldChangeSyncSourceDueToLag(const HostAndPort& currentSource,
+ const OpTime& currentSourceOpTime,
+ const OpTime& lastOpTimeFetched,
+ Date_t now) const;
+
+ // Returns true if we should choose a new sync source because our current sync source does
+ // not match our strict criteria for sync source candidates, but another member does.
+ bool _shouldChangeSyncSourceDueToBetterEligibleSource(const HostAndPort& currentSource,
+ int currentSourceIndex,
+ const OpTime& lastOpTimeFetched,
+ Date_t now) const;
/*
* Clear this node's sync source.
*/
diff --git a/src/mongo/db/repl/topology_coordinator_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_v1_test.cpp
index 83765f61625..3cb30919c87 100644
--- a/src/mongo/db/repl/topology_coordinator_v1_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_v1_test.cpp
@@ -126,6 +126,10 @@ protected:
msgs.begin(), msgs.end(), [&](const auto& s) { return stringContains(s, needle); });
}
+ int64_t countLogLinesWithId(int32_t id) {
+ return countBSONFormatLogLinesIsSubset(BSON("id" << id));
+ }
+
void makeSelfPrimary(const Timestamp& electionTimestamp = Timestamp(0, 0)) {
getTopoCoord().changeMemberState_forTest(MemberState::RS_PRIMARY, electionTimestamp);
getTopoCoord().setCurrentPrimary_forTest(_selfIndex, electionTimestamp);
@@ -3725,6 +3729,7 @@ TEST_F(HeartbeatResponseTestV1,
OpTime staleOpTime = OpTime(Timestamp(4, 0), 0);
// ahead by more than maxSyncSourceLagSecs (30)
OpTime freshOpTime = OpTime(Timestamp(3005, 0), 0);
+ setSelfMemberState(MemberState::RS_SECONDARY);
updateConfig(BSON("_id"
<< "rs0"
@@ -3753,6 +3758,7 @@ TEST_F(HeartbeatResponseTestV1,
now()));
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source"));
+ ASSERT_EQUALS(1, countLogLinesWithId(21834));
}
TEST_F(HeartbeatResponseTestV1,
@@ -3764,6 +3770,7 @@ TEST_F(HeartbeatResponseTestV1,
// Set lastOpTimeFetched to be before the sync source's OpTime.
OpTime lastOpTimeFetched = OpTime(Timestamp(300, 0), 0);
+ setSelfMemberState(MemberState::RS_SECONDARY);
HeartbeatResponseAction nextAction = receiveUpHeartbeat(
HostAndPort("host2"), "rs0", MemberState::RS_PRIMARY, election, syncSourceOpTime);
@@ -3809,6 +3816,8 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceWhenSyncSourceFormsCycleAn
// when it is not ahead of us and it selects us to be its sync source, forming a sync source
// cycle and we are currently in primary catchup.
setSelfMemberState(MemberState::RS_PRIMARY);
+ getTopoCoord().setPrimaryIndex(0);
+
OpTime election = OpTime();
OpTime syncSourceOpTime = OpTime(Timestamp(400, 0), 0);
@@ -3833,6 +3842,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceWhenSyncSourceFormsCycleAn
nextAction = receiveUpHeartbeat(
HostAndPort("host2"), "rs0", MemberState::RS_SECONDARY, election, syncSourceOpTime);
ASSERT_NO_ACTION(nextAction.getAction());
+ setSelfMemberState(MemberState::RS_SECONDARY);
getTopoCoord().setPrimaryIndex(2);
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
HostAndPort("host2"),
@@ -3845,7 +3855,8 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceWhenSyncSourceFormsCycleAn
lastOpTimeFetched,
now()));
- // Show that we also like host2 while it has some progress beyond our own.
+ // Show that we also like host2 while we are primary and it has some progress beyond our own.
+ setSelfMemberState(MemberState::RS_PRIMARY);
getTopoCoord().setPrimaryIndex(0);
OpTime olderThanSyncSourceOpTime = OpTime(Timestamp(300, 0), 0);
nextAction = receiveUpHeartbeat(
@@ -3903,6 +3914,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsDown
OpTime syncSourceOpTime = OpTime(Timestamp(400, 1), 0);
// ahead by more than maxSyncSourceLagSecs (30)
OpTime fresherSyncSourceOpTime = OpTime(Timestamp(3005, 0), 0);
+ setSelfMemberState(MemberState::RS_SECONDARY);
HeartbeatResponseAction nextAction = receiveUpHeartbeat(
HostAndPort("host3"), "rs0", MemberState::RS_SECONDARY, election, fresherSyncSourceOpTime);
@@ -3937,6 +3949,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhileFresherMemberIsDen
OpTime syncSourceOpTime = OpTime(Timestamp(400, 1), 0);
// ahead by more than maxSyncSourceLagSecs (30)
OpTime fresherSyncSourceOpTime = OpTime(Timestamp(3005, 0), 0);
+ setSelfMemberState(MemberState::RS_SECONDARY);
HeartbeatResponseAction nextAction = receiveUpHeartbeat(
HostAndPort("host3"), "rs0", MemberState::RS_SECONDARY, election, fresherSyncSourceOpTime);
@@ -3968,6 +3981,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhileFresherMemberIsDen
now()));
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source"));
+ ASSERT_EQUALS(1, countLogLinesWithId(21834));
}
TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceIfNodeIsFreshByHeartbeatButNotMetadata) {
@@ -4034,8 +4048,12 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceWhenFresherMemberExists) {
OpTime staleOpTime = OpTime(Timestamp(4, 0), 0);
// ahead by more than maxSyncSourceLagSecs (30)
OpTime freshOpTime = OpTime(Timestamp(3005, 0), 0);
+ setSelfMemberState(MemberState::RS_SECONDARY);
HeartbeatResponseAction nextAction = receiveUpHeartbeat(
+ HostAndPort("host2"), "rs0", MemberState::RS_SECONDARY, election, staleOpTime);
+ ASSERT_NO_ACTION(nextAction.getAction());
+ nextAction = receiveUpHeartbeat(
HostAndPort("host3"), "rs0", MemberState::RS_SECONDARY, election, freshOpTime);
ASSERT_NO_ACTION(nextAction.getAction());
@@ -4049,6 +4067,35 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceWhenFresherMemberExists) {
now()));
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source"));
+ ASSERT_EQUALS(1, countLogLinesWithId(21834));
+}
+
+TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceWhenSyncSourceIsDown) {
+ // In this test, the TopologyCoordinator should tell us to change sync sources away from
+ // "host2" and to "host3" since "host2" is down.
+ OpTime election = OpTime();
+ OpTime oldSyncSourceOpTime = OpTime(Timestamp(4, 0), 0);
+ // ahead by less than maxSyncSourceLagSecs (30)
+ OpTime freshOpTime = OpTime(Timestamp(5, 0), 0);
+ setSelfMemberState(MemberState::RS_SECONDARY);
+
+ HeartbeatResponseAction nextAction = receiveDownHeartbeat(HostAndPort("host2"), "rs0");
+ ASSERT_NO_ACTION(nextAction.getAction());
+ nextAction = receiveUpHeartbeat(
+ HostAndPort("host3"), "rs0", MemberState::RS_SECONDARY, election, freshOpTime);
+ ASSERT_NO_ACTION(nextAction.getAction());
+
+ // set up complete, time for actual check
+ startCapturingLogMessages();
+ ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"),
+ makeReplSetMetadata(),
+ makeOplogQueryMetadata(oldSyncSourceOpTime, -1 /* primaryIndex */, 1 /* syncSourceIndex */),
+ oldSyncSourceOpTime,
+ now()));
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source"));
+ ASSERT_EQUALS(1, countLogLinesWithId(5929000));
}
TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceFromStalePrimary) {
@@ -4090,12 +4137,17 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenMemberHasYetToHeart
now()));
}
-TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenMemberNotInConfig) {
+TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceWhenMemberNotInConfig) {
// In this test, the TopologyCoordinator should tell us to change sync sources away from
// "host4" since "host4" is absent from the config of version 10.
ReplSetMetadata replMetadata(0, {OpTime(), Date_t()}, OpTime(), 10, 0, OID(), -1, false);
+
+ startCapturingLogMessages();
ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
HostAndPort("host4"), replMetadata, makeOplogQueryMetadata(), OpTime(), now()));
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source"));
+ ASSERT_EQUALS(1, countLogLinesWithId(21831));
}
TEST_F(HeartbeatResponseTestV1,
@@ -4103,6 +4155,7 @@ TEST_F(HeartbeatResponseTestV1,
// In this test, the TopologyCoordinator should not tell us to change sync sources away from
// "host2" since we are not aware of who the new primary is.
+ setSelfMemberState(MemberState::RS_SECONDARY);
updateConfig(BSON("_id"
<< "rs0"
<< "version" << 5 << "term" << 1 << "members"
@@ -4132,6 +4185,7 @@ TEST_F(HeartbeatResponseTestV1,
// In this test, the TopologyCoordinator should tell us to change sync sources away from
// "host2" since "host3" is the new primary and chaining is disabled.
+ setSelfMemberState(MemberState::RS_SECONDARY);
updateConfig(BSON("_id"
<< "rs0"
<< "version" << 5 << "term" << 1 << "members"
@@ -4145,13 +4199,21 @@ TEST_F(HeartbeatResponseTestV1,
<< BSON("heartbeatTimeoutSecs" << 5 << "chainingAllowed" << false)),
0);
- OpTime election = OpTime(Timestamp(1, 0), 0);
- OpTime staleOpTime = OpTime(Timestamp(4, 0), 0);
- OpTime freshOpTime = OpTime(Timestamp(5, 0), 0);
+ OpTime oldElection = OpTime(Timestamp(1, 1), 1);
+ OpTime curElection = OpTime(Timestamp(4, 2), 2);
+ OpTime staleOpTime = OpTime(Timestamp(4, 1), 1);
+ OpTime freshOpTime = OpTime(Timestamp(5, 1), 2);
- // Set that host3 is the new primary.
+ // Old host should still be up; this is a stale heartbeat.
HeartbeatResponseAction nextAction = receiveUpHeartbeat(
- HostAndPort("host3"), "rs0", MemberState::RS_PRIMARY, election, freshOpTime);
+ HostAndPort("host2"), "rs0", MemberState::RS_PRIMARY, oldElection, staleOpTime);
+ ASSERT_NO_ACTION(nextAction.getAction());
+
+ getTopoCoord().updateTerm(2, now());
+
+ // Set that host3 is the new primary.
+ nextAction = receiveUpHeartbeat(
+ HostAndPort("host3"), "rs0", MemberState::RS_PRIMARY, curElection, freshOpTime);
ASSERT_NO_ACTION(nextAction.getAction());
startCapturingLogMessages();
@@ -4163,6 +4225,52 @@ TEST_F(HeartbeatResponseTestV1,
now()));
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source"));
+ ASSERT_EQUALS(1, countLogLinesWithId(3962100));
+}
+
+TEST_F(HeartbeatResponseTestV1,
+ ShouldNotChangeSyncSourceWhenNotSyncingFromPrimaryChainingDisabledAndTwoPrimaries) {
+ // In this test, the TopologyCoordinator should not tell us to change sync sources away from
+ // "host2" since, though "host3" is the new primary, "host2" also thinks it is primary.
+
+ setSelfMemberState(MemberState::RS_SECONDARY);
+ updateConfig(BSON("_id"
+ << "rs0"
+ << "version" << 5 << "term" << 1 << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "host1:27017")
+ << BSON("_id" << 1 << "host"
+ << "host2:27017")
+ << BSON("_id" << 2 << "host"
+ << "host3:27017"))
+ << "protocolVersion" << 1 << "settings"
+ << BSON("heartbeatTimeoutSecs" << 5 << "chainingAllowed" << false)),
+ 0);
+
+ OpTime oldElection = OpTime(Timestamp(1, 1), 1);
+ OpTime curElection = OpTime(Timestamp(4, 2), 2);
+ OpTime staleOpTime = OpTime(Timestamp(4, 1), 1);
+ OpTime freshOpTime = OpTime(Timestamp(5, 1), 2);
+
+ // Old host should still be up, and thinks it is primary.
+ HeartbeatResponseAction nextAction = receiveUpHeartbeat(
+ HostAndPort("host2"), "rs0", MemberState::RS_PRIMARY, oldElection, staleOpTime);
+ ASSERT_NO_ACTION(nextAction.getAction());
+
+ getTopoCoord().updateTerm(2, now());
+
+ // Set that host3 is the new primary.
+ nextAction = receiveUpHeartbeat(
+ HostAndPort("host3"), "rs0", MemberState::RS_PRIMARY, curElection, freshOpTime);
+ ASSERT_NO_ACTION(nextAction.getAction());
+
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"),
+ // Indicate host2 still thinks it is primary.
+ makeReplSetMetadata(freshOpTime, true /* isPrimary */),
+ makeOplogQueryMetadata(freshOpTime, 1 /* primaryIndex */, -1 /* syncSourceIndex */),
+ staleOpTime, // lastOpTimeFetched so that we are behind host2 and host3
+ now()));
}
TEST_F(HeartbeatResponseTestV1, ShouldntChangeSyncSourceWhenChainingDisabledAndWeArePrimary) {
@@ -4183,7 +4291,7 @@ TEST_F(HeartbeatResponseTestV1, ShouldntChangeSyncSourceWhenChainingDisabledAndW
OpTime freshOpTime = OpTime(Timestamp(5, 0), 0);
// Set that we are primary.
- getTopoCoord().setPrimaryIndex(0);
+ makeSelfPrimary();
ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
HostAndPort("host2"),