summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Wenbin Zhu <wenbin.zhu@mongodb.com> 2021-09-02 02:08:11 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2021-10-04 15:49:19 +0000
commit d9dba1a12f4e1257b711c5e7859de7d7680d5bc6 (patch)
tree 5c9ad1fc3623e9dc31bbd3933a1f840ea1071adc
parent 1bd43990be5f16e028cebb5a99f0798803ad6b41 (diff)
download mongo-d9dba1a12f4e1257b711c5e7859de7d7680d5bc6.tar.gz
SERVER-58988 Avoid sync source selection cycle during primary catchup.
(cherry picked from commit b46acdbba8ec51810b6f402dbe18ed7ea98fd13d)
-rw-r--r-- src/mongo/db/repl/initial_syncer_test.cpp 3
-rw-r--r-- src/mongo/db/repl/oplog_fetcher_test.cpp 56
-rw-r--r-- src/mongo/db/repl/topology_coordinator.cpp 37
-rw-r--r-- src/mongo/db/repl/topology_coordinator_v1_test.cpp 95
-rw-r--r-- src/mongo/rpc/metadata/oplog_query_metadata.cpp 20
-rw-r--r-- src/mongo/rpc/metadata/oplog_query_metadata.h 12
-rw-r--r-- src/mongo/rpc/metadata/oplog_query_metadata_test.cpp 16
7 files changed, 201 insertions, 38 deletions
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 7cf799c5bb0..7e5f136836f 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -521,7 +521,8 @@ RemoteCommandResponse makeCursorResponse(CursorId cursorId,
int rbid = 1) {
OpTime futureOpTime(Timestamp(1000, 1000), 1000);
Date_t futureWallTime = Date_t() + Seconds(futureOpTime.getSecs());
- rpc::OplogQueryMetadata oqMetadata({futureOpTime, futureWallTime}, futureOpTime, rbid, 0, 0);
+ rpc::OplogQueryMetadata oqMetadata(
+ {futureOpTime, futureWallTime}, futureOpTime, rbid, 0, 0, "");
BSONObjBuilder bob;
{
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index 80232733873..300508dc6e8 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -78,7 +78,8 @@ protected:
BSONObj makeOplogQueryMetadataObject(OpTime lastAppliedOpTime,
int rbid,
int primaryIndex,
- int syncSourceIndex);
+ int syncSourceIndex,
+ std::string syncSourceHost);
/**
* Tests checkSyncSource result handling.
@@ -130,9 +131,14 @@ void OplogFetcherTest::setUp() {
BSONObj OplogFetcherTest::makeOplogQueryMetadataObject(OpTime lastAppliedOpTime,
int rbid,
int primaryIndex,
- int syncSourceIndex) {
- rpc::OplogQueryMetadata oqMetadata(
- {staleOpTime, staleWallTime}, lastAppliedOpTime, rbid, primaryIndex, syncSourceIndex);
+ int syncSourceIndex,
+ std::string syncSourceHost) {
+ rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime},
+ lastAppliedOpTime,
+ rbid,
+ primaryIndex,
+ syncSourceIndex,
+ syncSourceHost);
BSONObjBuilder bob;
ASSERT_OK(oqMetadata.writeToMetadata(&bob));
return bob.obj();
@@ -299,7 +305,8 @@ DEATH_TEST_F(OplogFetcherTest,
TEST_F(OplogFetcherTest, ValidMetadataWithInResponseShouldBeForwardedToProcessMetadataFn) {
rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1);
- rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, remoteNewerOpTime, rbid, 2, 2);
+ rpc::OplogQueryMetadata oqMetadata(
+ {staleOpTime, staleWallTime}, remoteNewerOpTime, rbid, 2, 2, "");
BSONObjBuilder bob;
ASSERT_OK(replMetadata.writeToMetadata(&bob));
ASSERT_OK(oqMetadata.writeToMetadata(&bob));
@@ -319,7 +326,7 @@ TEST_F(OplogFetcherTest, ValidMetadataWithInResponseShouldBeForwardedToProcessMe
TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceRollsBack) {
rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1);
rpc::OplogQueryMetadata oqMetadata(
- {staleOpTime, staleWallTime}, remoteNewerOpTime, rbid + 1, 2, 2);
+ {staleOpTime, staleWallTime}, remoteNewerOpTime, rbid + 1, 2, 2, "");
BSONObjBuilder bob;
ASSERT_OK(replMetadata.writeToMetadata(&bob));
ASSERT_OK(oqMetadata.writeToMetadata(&bob));
@@ -337,7 +344,7 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceRollsBack)
TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehind) {
rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1);
- rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, staleOpTime, rbid, 2, 2);
+ rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, staleOpTime, rbid, 2, 2, "");
BSONObjBuilder bob;
ASSERT_OK(replMetadata.writeToMetadata(&bob));
ASSERT_OK(oqMetadata.writeToMetadata(&bob));
@@ -355,7 +362,7 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehind)
TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAhead) {
rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1);
- rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, lastFetched, rbid, 2, 2);
+ rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, lastFetched, rbid, 2, 2, "");
BSONObjBuilder bob;
ASSERT_OK(replMetadata.writeToMetadata(&bob));
ASSERT_OK(oqMetadata.writeToMetadata(&bob));
@@ -374,7 +381,7 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAhead
TEST_F(OplogFetcherTest,
MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehindWithoutRequiringFresherSyncSource) {
rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1);
- rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, staleOpTime, rbid, 2, 2);
+ rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, staleOpTime, rbid, 2, 2, "");
BSONObjBuilder bob;
ASSERT_OK(replMetadata.writeToMetadata(&bob));
ASSERT_OK(oqMetadata.writeToMetadata(&bob));
@@ -395,7 +402,7 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreProcessedWhenSyncSourceIsCurrentButM
// is equal to us. Since that means the metadata is stale and can be ignored, we should accept
// this sync source.
rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1);
- rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, staleOpTime, rbid, 2, 2);
+ rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, staleOpTime, rbid, 2, 2, "");
BSONObjBuilder bob;
ASSERT_OK(replMetadata.writeToMetadata(&bob));
ASSERT_OK(oqMetadata.writeToMetadata(&bob));
@@ -411,7 +418,7 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreProcessedWhenSyncSourceIsCurrentButM
TEST_F(OplogFetcherTest,
MetadataAndBatchAreProcessedWhenSyncSourceIsNotAheadWithoutRequiringFresherSyncSource) {
rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1);
- rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, lastFetched, rbid, 2, 2);
+ rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, lastFetched, rbid, 2, 2, "");
BSONObjBuilder bob;
ASSERT_OK(replMetadata.writeToMetadata(&bob));
ASSERT_OK(oqMetadata.writeToMetadata(&bob));
@@ -442,7 +449,8 @@ TEST_F(OplogFetcherTest,
TEST_F(OplogFetcherTest, MetadataIsNotProcessedOnBatchThatTriggersRollback) {
rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1);
- rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, remoteNewerOpTime, rbid, 2, 2);
+ rpc::OplogQueryMetadata oqMetadata(
+ {staleOpTime, staleWallTime}, remoteNewerOpTime, rbid, 2, 2, "");
BSONObjBuilder bob;
ASSERT_OK(replMetadata.writeToMetadata(&bob));
ASSERT_OK(oqMetadata.writeToMetadata(&bob));
@@ -469,7 +477,7 @@ TEST_F(OplogFetcherTest, EmptyFirstBatchStopsOplogFetcherWithOplogStartMissingEr
}
TEST_F(OplogFetcherTest, MissingOpTimeInFirstDocumentCausesOplogFetcherToStopWithInvalidBSONError) {
- auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
+ auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2, "");
ASSERT_EQUALS(ErrorCodes::InvalidBSON,
processSingleBatch({concatenate(makeCursorResponse(0, {BSONObj()}), metadataObj),
Milliseconds(0)})
@@ -479,7 +487,7 @@ TEST_F(OplogFetcherTest, MissingOpTimeInFirstDocumentCausesOplogFetcherToStopWit
TEST_F(
OplogFetcherTest,
LastOpTimeFetchedDoesNotMatchFirstDocumentCausesOplogFetcherToStopWithOplogStartMissingError) {
- auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
+ auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2, "");
ASSERT_EQUALS(
ErrorCodes::OplogStartMissing,
processSingleBatch(
@@ -490,7 +498,7 @@ TEST_F(
TEST_F(OplogFetcherTest,
MissingOpTimeInSecondDocumentOfFirstBatchCausesOplogFetcherToStopWithNoSuchKey) {
- auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
+ auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2, "");
ASSERT_EQUALS(
ErrorCodes::NoSuchKey,
processSingleBatch(
@@ -504,7 +512,7 @@ TEST_F(OplogFetcherTest,
}
TEST_F(OplogFetcherTest, TimestampsNotAdvancingInBatchCausesOplogFetcherStopWithOplogOutOfOrder) {
- auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
+ auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2, "");
ASSERT_EQUALS(
ErrorCodes::OplogOutOfOrder,
processSingleBatch({concatenate(makeCursorResponse(0,
@@ -518,7 +526,7 @@ TEST_F(OplogFetcherTest, TimestampsNotAdvancingInBatchCausesOplogFetcherStopWith
}
TEST_F(OplogFetcherTest, OplogFetcherShouldExcludeFirstDocumentInFirstBatchWhenEnqueuingDocuments) {
- auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
+ auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2, "");
auto firstEntry = makeNoopOplogEntry(lastFetched);
auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
@@ -592,7 +600,7 @@ TEST_F(OplogFetcherTest,
auto firstEntry = makeNoopOplogEntry({{Seconds(123), 0}, lastFetched.getTerm()});
auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
- auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
+ auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2, "");
// Only send over the first entry. Save the second for the getMore request.
processNetworkResponse(
@@ -669,7 +677,7 @@ TEST_F(OplogFetcherTest,
auto firstEntry = makeNoopOplogEntry({{Seconds(123), 0}, lastFetched.getTerm()});
auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.getTerm()});
- auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
+ auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2, "");
// Only send over the first two entries. Save the third for the getMore request.
processNetworkResponse(
@@ -705,7 +713,7 @@ TEST_F(OplogFetcherTest,
}
TEST_F(OplogFetcherTest, OplogFetcherShouldReportErrorsThrownFromCallback) {
- auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
+ auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2, "");
auto firstEntry = makeNoopOplogEntry(lastFetched);
auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
@@ -769,7 +777,8 @@ TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithBothMetadatasStopsTheOplogFetc
{{Seconds(20000), 0}, 1},
rbid,
2,
- 2);
+ 2,
+ "");
testSyncSourceChecking(&replMetadata, &oqMetadata);
@@ -795,7 +804,8 @@ TEST_F(OplogFetcherTest,
{{Seconds(20000), 0}, 1},
rbid,
2,
- -1);
+ -1,
+ "");
testSyncSourceChecking(&replMetadata, &oqMetadata);
@@ -829,7 +839,7 @@ RemoteCommandRequest OplogFetcherTest::testTwoBatchHandling() {
auto firstEntry = makeNoopOplogEntry(lastFetched);
auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
- auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
+ auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2, "");
processNetworkResponse(
{concatenate(makeCursorResponse(cursorId, {firstEntry, secondEntry}), metadataObj),
Milliseconds(0)},
diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp
index 4ac2eeda7bf..d5ef30ecea8 100644
--- a/src/mongo/db/repl/topology_coordinator.cpp
+++ b/src/mongo/db/repl/topology_coordinator.cpp
@@ -2664,18 +2664,21 @@ bool TopologyCoordinator::shouldChangeSyncSource(
OpTime currentSourceOpTime;
int syncSourceIndex = -1;
int primaryIndex = -1;
+ std::string syncSourceHost;
if (oqMetadata) {
currentSourceOpTime =
std::max(oqMetadata->getLastOpApplied(),
_memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime());
syncSourceIndex = oqMetadata->getSyncSourceIndex();
primaryIndex = oqMetadata->getPrimaryIndex();
+ syncSourceHost = oqMetadata->getSyncSourceHost();
} else {
currentSourceOpTime =
std::max(replMetadata.getLastOpVisible(),
_memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime());
syncSourceIndex = replMetadata.getSyncSourceIndex();
primaryIndex = replMetadata.getPrimaryIndex();
+ syncSourceHost = "";
}
if (currentSourceOpTime.isNull()) {
@@ -2719,6 +2722,37 @@ bool TopologyCoordinator::shouldChangeSyncSource(
return true;
}
+ // Change sync source if our sync source is also syncing from us when we are in primary
+ // catchup mode, forming a sync source selection cycle, and the sync source is not ahead
+ // of us. This is to prevent a deadlock situation. See SERVER-58988 for details.
+ // When checking the sync source, we use syncSourceHost if it is set, otherwise fall back
+ // to use syncSourceIndex. The difference is that syncSourceIndex might not point to the
+ // node that we think of because it was inferred from the sender node, which could have
+ // a different config. This is acceptable since we are just choosing a different sync
+ // source if that happens and reconfigs are rare.
+ bool isSyncingFromMe = !syncSourceHost.empty()
+ ? syncSourceHost == _selfMemberData().getHostAndPort().toString()
+ : syncSourceIndex == _selfIndex;
+
+ if (isSyncingFromMe && _currentPrimaryIndex == _selfIndex &&
+ currentSourceOpTime <= myLastOpTime) {
+ std::stringstream logMessage;
+
+ logMessage << "Choosing new sync source because we are in primary catchup but our current "
+ "sync source is also syncing from us but is not ahead of us. "
+ << "Current sync source: " << currentSource.toString()
+ << ", my last fetched oplog optime: " << myLastOpTime
+ << ", latest oplog optime of sync source: " << currentSourceOpTime;
+
+ if (primaryIndex >= 0) {
+ logMessage << " (" << _rsConfig.getMemberAt(primaryIndex).getHostAndPort() << " is)";
+ } else {
+ logMessage << " (sync source does not know the primary)";
+ }
+ log() << logMessage.str();
+ return true;
+ }
+
if (MONGO_FAIL_POINT(disableMaxSyncSourceLagSecs)) {
log() << "disableMaxSyncSourceLagSecs fail point enabled - not checking the most recent "
"OpTime, "
@@ -2769,7 +2803,8 @@ rpc::OplogQueryMetadata TopologyCoordinator::prepareOplogQueryMetadata(int rbid)
getMyLastAppliedOpTime(),
rbid,
_currentPrimaryIndex,
- _rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress()));
+ _rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress()),
+ getSyncSourceAddress().toString());
}
void TopologyCoordinator::processReplSetRequestVotes(const ReplSetRequestVotesArgs& args,
diff --git a/src/mongo/db/repl/topology_coordinator_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_v1_test.cpp
index dc403532871..f6a8adaa65b 100644
--- a/src/mongo/db/repl/topology_coordinator_v1_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_v1_test.cpp
@@ -221,9 +221,14 @@ protected:
OplogQueryMetadata makeOplogQueryMetadata(OpTime lastAppliedOpTime = OpTime(),
int primaryIndex = -1,
int syncSourceIndex = -1,
+ std::string syncSourceHost = "",
Date_t lastCommittedWall = Date_t()) {
- return OplogQueryMetadata(
- {OpTime(), lastCommittedWall}, lastAppliedOpTime, -1, primaryIndex, syncSourceIndex);
+ return OplogQueryMetadata({OpTime(), lastCommittedWall},
+ lastAppliedOpTime,
+ -1,
+ primaryIndex,
+ syncSourceIndex,
+ syncSourceHost);
}
HeartbeatResponseAction receiveUpHeartbeat(const HostAndPort& member,
@@ -3764,6 +3769,92 @@ TEST_F(HeartbeatResponseTestV1,
HostAndPort("host2"), makeReplSetMetadata(newerThanLastOpTimeApplied), boost::none, now()));
}
+TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceWhenSyncSourceFormsCycleAndWeArePrimary) {
+ // In this test, the TopologyCoordinator will tell us to change our sync source away from
+ // "host2" when it is not ahead of us and it selects us to be its sync source, forming a sync
+ // source cycle while we are currently in primary catchup.
+ setSelfMemberState(MemberState::RS_PRIMARY);
+ OpTime election = OpTime();
+ OpTime syncSourceOpTime = OpTime(Timestamp(400, 0), 0);
+
+ // Set lastOpTimeFetched to be same as the sync source's OpTime.
+ OpTime lastOpTimeFetched = OpTime(Timestamp(400, 0), 0);
+ setMyOpTime(lastOpTimeFetched);
+
+ // Show we like host2 while it is not syncing from us.
+ HeartbeatResponseAction nextAction = receiveUpHeartbeat(
+ HostAndPort("host2"), "rs0", MemberState::RS_SECONDARY, election, syncSourceOpTime);
+ ASSERT_NO_ACTION(nextAction.getAction());
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"),
+ makeReplSetMetadata(OpTime() /* visibleOpTime */, false /* isPrimary */),
+ makeOplogQueryMetadata(syncSourceOpTime,
+ -1 /* primaryIndex */,
+ 2 /* syncSourceIndex */,
+ "host3:27017" /* syncSourceHost */),
+ now()));
+
+ // Show that we also like host2 while we are not primary.
+ nextAction = receiveUpHeartbeat(
+ HostAndPort("host2"), "rs0", MemberState::RS_SECONDARY, election, syncSourceOpTime);
+ ASSERT_NO_ACTION(nextAction.getAction());
+ getTopoCoord().setPrimaryIndex(2);
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"),
+ makeReplSetMetadata(OpTime() /* visibleOpTime */, false /* isPrimary */),
+ // Sync source is also syncing from us.
+ makeOplogQueryMetadata(syncSourceOpTime,
+ -1 /* primaryIndex */,
+ 0 /* syncSourceIndex */,
+ "host1:27017" /* syncSourceHost */),
+ now()));
+
+ // Show that we also like host2 while it has some progress beyond our own.
+ getTopoCoord().setPrimaryIndex(0);
+ OpTime olderThanSyncSourceOpTime = OpTime(Timestamp(300, 0), 0);
+ topoCoordSetMyLastAppliedOpTime(olderThanSyncSourceOpTime, now(), true);
+ nextAction = receiveUpHeartbeat(
+ HostAndPort("host2"), "rs0", MemberState::RS_SECONDARY, election, syncSourceOpTime);
+ ASSERT_NO_ACTION(nextAction.getAction());
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"),
+ makeReplSetMetadata(OpTime() /* visibleOpTime */, false /* isPrimary */),
+ // Sync source is also syncing from us.
+ makeOplogQueryMetadata(syncSourceOpTime,
+ -1 /* primaryIndex */,
+ 0 /* syncSourceIndex */,
+ "host1:27017" /* syncSourceHost */),
+ now()));
+
+ // Show that we do not like host2 when it forms a sync source selection cycle with us, we
+ // are primary, and it lacks progress beyond our own.
+ setMyOpTime(lastOpTimeFetched);
+ nextAction = receiveUpHeartbeat(
+ HostAndPort("host2"), "rs0", MemberState::RS_SECONDARY, election, syncSourceOpTime);
+ ASSERT_NO_ACTION(nextAction.getAction());
+ ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"),
+ makeReplSetMetadata(OpTime() /* visibleOpTime */, false /* isPrimary */),
+ // Sync source is also syncing from us.
+ makeOplogQueryMetadata(syncSourceOpTime,
+ -1 /* primaryIndex */,
+ 0 /* syncSourceIndex */,
+ "host1:27017" /* syncSourceHost */),
+ now()));
+
+ // Show that we still do not like it when syncSourceHost is not set, but we can rely on
+ // syncSourceIndex to decide if a sync source selection cycle has been formed.
+ nextAction = receiveUpHeartbeat(
+ HostAndPort("host2"), "rs0", MemberState::RS_SECONDARY, election, syncSourceOpTime);
+ ASSERT_NO_ACTION(nextAction.getAction());
+ ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(
+ HostAndPort("host2"),
+ makeReplSetMetadata(OpTime() /* visibleOpTime */, false /* isPrimary */),
+ // Sync source is also syncing from us.
+ makeOplogQueryMetadata(syncSourceOpTime, -1 /* primaryIndex */, 0 /* syncSourceIndex */),
+ now()));
+}
+
TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsDown) {
// In this test, the TopologyCoordinator should not tell us to change sync sources away from
// "host2" and to "host3" despite "host2" being more than maxSyncSourceLagSecs(30) behind
diff --git a/src/mongo/rpc/metadata/oplog_query_metadata.cpp b/src/mongo/rpc/metadata/oplog_query_metadata.cpp
index 4548928d1ca..d181f5fbf1b 100644
--- a/src/mongo/rpc/metadata/oplog_query_metadata.cpp
+++ b/src/mongo/rpc/metadata/oplog_query_metadata.cpp
@@ -50,6 +50,7 @@ const char kLastCommittedWallFieldName[] = "lastCommittedWall";
const char kLastOpAppliedFieldName[] = "lastOpApplied";
const char kPrimaryIndexFieldName[] = "primaryIndex";
const char kSyncSourceIndexFieldName[] = "syncSourceIndex";
+const char kSyncSourceHostFieldName[] = "syncSourceHost";
const char kRBIDFieldName[] = "rbid";
} // unnamed namespace
@@ -60,12 +61,14 @@ OplogQueryMetadata::OplogQueryMetadata(OpTimeAndWallTime lastOpCommitted,
OpTime lastOpApplied,
int rbid,
int currentPrimaryIndex,
- int currentSyncSourceIndex)
+ int currentSyncSourceIndex,
+ std::string currentSyncSourceHost)
: _lastOpCommitted(std::move(lastOpCommitted)),
_lastOpApplied(std::move(lastOpApplied)),
_rbid(rbid),
_currentPrimaryIndex(currentPrimaryIndex),
- _currentSyncSourceIndex(currentSyncSourceIndex) {}
+ _currentSyncSourceIndex(currentSyncSourceIndex),
+ _currentSyncSourceHost(currentSyncSourceHost) {}
StatusWith<OplogQueryMetadata> OplogQueryMetadata::readFromMetadata(const BSONObj& metadataObj,
bool requireWallTime) {
@@ -87,6 +90,14 @@ StatusWith<OplogQueryMetadata> OplogQueryMetadata::readFromMetadata(const BSONOb
if (!status.isOK())
return status;
+ std::string syncSourceHost;
+ status = bsonExtractStringField(oqMetadataObj, kSyncSourceHostFieldName, &syncSourceHost);
+ // SyncSourceHost might not be set in older versions, checking NoSuchKey error
+ // for backward compatibility.
+ // TODO SERVER-59732: Remove the compatibility check once 6.0 is released.
+ if (!status.isOK() && status.code() != ErrorCodes::NoSuchKey)
+ return status;
+
long long rbid;
status = bsonExtractIntegerField(oqMetadataObj, kRBIDFieldName, &rbid);
if (!status.isOK())
@@ -111,7 +122,8 @@ StatusWith<OplogQueryMetadata> OplogQueryMetadata::readFromMetadata(const BSONOb
if (!status.isOK())
return status;
- return OplogQueryMetadata(lastOpCommitted, lastOpApplied, rbid, primaryIndex, syncSourceIndex);
+ return OplogQueryMetadata(
+ lastOpCommitted, lastOpApplied, rbid, primaryIndex, syncSourceIndex, syncSourceHost);
}
Status OplogQueryMetadata::writeToMetadata(BSONObjBuilder* builder) const {
@@ -122,6 +134,7 @@ Status OplogQueryMetadata::writeToMetadata(BSONObjBuilder* builder) const {
oqMetadataBuilder.append(kRBIDFieldName, _rbid);
oqMetadataBuilder.append(kPrimaryIndexFieldName, _currentPrimaryIndex);
oqMetadataBuilder.append(kSyncSourceIndexFieldName, _currentSyncSourceIndex);
+ oqMetadataBuilder.append(kSyncSourceHostFieldName, _currentSyncSourceHost);
oqMetadataBuilder.doneFast();
return Status::OK();
@@ -132,6 +145,7 @@ std::string OplogQueryMetadata::toString() const {
output << "OplogQueryMetadata";
output << " Primary Index: " << _currentPrimaryIndex;
output << " Sync Source Index: " << _currentSyncSourceIndex;
+ output << " Sync Source Host: " << _currentSyncSourceHost;
output << " RBID: " << _rbid;
output << " Last Op Committed: " << _lastOpCommitted.toString();
output << " Last Op Applied: " << _lastOpApplied.toString();
diff --git a/src/mongo/rpc/metadata/oplog_query_metadata.h b/src/mongo/rpc/metadata/oplog_query_metadata.h
index 1f8cc2705be..331cc761fda 100644
--- a/src/mongo/rpc/metadata/oplog_query_metadata.h
+++ b/src/mongo/rpc/metadata/oplog_query_metadata.h
@@ -57,7 +57,8 @@ public:
repl::OpTime lastOpApplied,
int rbid,
int currentPrimaryIndex,
- int currentSyncSourceIndex);
+ int currentSyncSourceIndex,
+ std::string currentSyncSourceHost);
/**
* format:
@@ -107,6 +108,14 @@ public:
}
/**
+ * Returns the host of the sync source of the sender.
+ * Returns empty string if it has no sync source.
+ */
+ std::string getSyncSourceHost() const {
+ return _currentSyncSourceHost;
+ }
+
+ /**
* Returns the current rbid of the sender.
*/
int getRBID() const {
@@ -124,6 +133,7 @@ private:
int _rbid = -1;
int _currentPrimaryIndex = kNoPrimary;
int _currentSyncSourceIndex = -1;
+ std::string _currentSyncSourceHost;
};
} // namespace rpc
diff --git a/src/mongo/rpc/metadata/oplog_query_metadata_test.cpp b/src/mongo/rpc/metadata/oplog_query_metadata_test.cpp
index 9f07a7775ad..e881b2d20f5 100644
--- a/src/mongo/rpc/metadata/oplog_query_metadata_test.cpp
+++ b/src/mongo/rpc/metadata/oplog_query_metadata_test.cpp
@@ -43,7 +43,7 @@ TEST(ReplResponseMetadataTest, OplogQueryMetadataRoundtrip) {
OpTime opTime1(Timestamp(1234, 100), 5);
Date_t committedWall = Date_t() + Seconds(opTime1.getSecs());
OpTime opTime2(Timestamp(7777, 101), 6);
- OplogQueryMetadata metadata({opTime1, committedWall}, opTime2, 6, 12, -1);
+ OplogQueryMetadata metadata({opTime1, committedWall}, opTime2, 6, 12, -1, "");
ASSERT_EQ(opTime1, metadata.getLastOpCommitted().opTime);
ASSERT_EQ(committedWall, metadata.getLastOpCommitted().wallTime);
@@ -52,12 +52,14 @@ TEST(ReplResponseMetadataTest, OplogQueryMetadataRoundtrip) {
BSONObjBuilder builder;
metadata.writeToMetadata(&builder).transitional_ignore();
- BSONObj expectedObj(BSON(
- kOplogQueryMetadataFieldName << BSON(
- "lastOpCommitted" << BSON("ts" << opTime1.getTimestamp() << "t" << opTime1.getTerm())
- << "lastCommittedWall" << committedWall << "lastOpApplied"
- << BSON("ts" << opTime2.getTimestamp() << "t" << opTime2.getTerm())
- << "rbid" << 6 << "primaryIndex" << 12 << "syncSourceIndex" << -1)));
+ BSONObj expectedObj(
+ BSON(kOplogQueryMetadataFieldName
+ << BSON("lastOpCommitted"
+ << BSON("ts" << opTime1.getTimestamp() << "t" << opTime1.getTerm())
+ << "lastCommittedWall" << committedWall << "lastOpApplied"
+ << BSON("ts" << opTime2.getTimestamp() << "t" << opTime2.getTerm()) << "rbid"
+ << 6 << "primaryIndex" << 12 << "syncSourceIndex" << -1 << "syncSourceHost"
+ << "")));
BSONObj serializedObj = builder.obj();
ASSERT_BSONOBJ_EQ(expectedObj, serializedObj);