diff options
author | Wenbin Zhu <wenbin.zhu@mongodb.com> | 2021-09-02 02:08:11 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-10-04 15:49:19 +0000 |
commit | d9dba1a12f4e1257b711c5e7859de7d7680d5bc6 (patch) | |
tree | 5c9ad1fc3623e9dc31bbd3933a1f840ea1071adc | |
parent | 1bd43990be5f16e028cebb5a99f0798803ad6b41 (diff) | |
download | mongo-d9dba1a12f4e1257b711c5e7859de7d7680d5bc6.tar.gz |
SERVER-58988 Avoid sync source selection cycle during primary catchup.
(cherry picked from commit b46acdbba8ec51810b6f402dbe18ed7ea98fd13d)
-rw-r--r-- | src/mongo/db/repl/initial_syncer_test.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher_test.cpp | 56 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator.cpp | 37 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_v1_test.cpp | 95 | ||||
-rw-r--r-- | src/mongo/rpc/metadata/oplog_query_metadata.cpp | 20 | ||||
-rw-r--r-- | src/mongo/rpc/metadata/oplog_query_metadata.h | 12 | ||||
-rw-r--r-- | src/mongo/rpc/metadata/oplog_query_metadata_test.cpp | 16 |
7 files changed, 201 insertions, 38 deletions
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp index 7cf799c5bb0..7e5f136836f 100644 --- a/src/mongo/db/repl/initial_syncer_test.cpp +++ b/src/mongo/db/repl/initial_syncer_test.cpp @@ -521,7 +521,8 @@ RemoteCommandResponse makeCursorResponse(CursorId cursorId, int rbid = 1) { OpTime futureOpTime(Timestamp(1000, 1000), 1000); Date_t futureWallTime = Date_t() + Seconds(futureOpTime.getSecs()); - rpc::OplogQueryMetadata oqMetadata({futureOpTime, futureWallTime}, futureOpTime, rbid, 0, 0); + rpc::OplogQueryMetadata oqMetadata( + {futureOpTime, futureWallTime}, futureOpTime, rbid, 0, 0, ""); BSONObjBuilder bob; { diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp index 80232733873..300508dc6e8 100644 --- a/src/mongo/db/repl/oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/oplog_fetcher_test.cpp @@ -78,7 +78,8 @@ protected: BSONObj makeOplogQueryMetadataObject(OpTime lastAppliedOpTime, int rbid, int primaryIndex, - int syncSourceIndex); + int syncSourceIndex, + std::string syncSourceHost); /** * Tests checkSyncSource result handling. @@ -130,9 +131,14 @@ void OplogFetcherTest::setUp() { BSONObj OplogFetcherTest::makeOplogQueryMetadataObject(OpTime lastAppliedOpTime, int rbid, int primaryIndex, - int syncSourceIndex) { - rpc::OplogQueryMetadata oqMetadata( - {staleOpTime, staleWallTime}, lastAppliedOpTime, rbid, primaryIndex, syncSourceIndex); + int syncSourceIndex, + std::string syncSourceHost) { + rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, + lastAppliedOpTime, + rbid, + primaryIndex, + syncSourceIndex, + syncSourceHost); BSONObjBuilder bob; ASSERT_OK(oqMetadata.writeToMetadata(&bob)); return bob.obj(); @@ -299,7 +305,8 @@ DEATH_TEST_F(OplogFetcherTest, TEST_F(OplogFetcherTest, ValidMetadataWithInResponseShouldBeForwardedToProcessMetadataFn) { rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1); - rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, remoteNewerOpTime, rbid, 2, 2); + rpc::OplogQueryMetadata oqMetadata( + {staleOpTime, staleWallTime}, remoteNewerOpTime, rbid, 2, 2, ""); BSONObjBuilder bob; ASSERT_OK(replMetadata.writeToMetadata(&bob)); ASSERT_OK(oqMetadata.writeToMetadata(&bob)); @@ -319,7 +326,7 @@ TEST_F(OplogFetcherTest, ValidMetadataWithInResponseShouldBeForwardedToProcessMe TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceRollsBack) { rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1); rpc::OplogQueryMetadata oqMetadata( - {staleOpTime, staleWallTime}, remoteNewerOpTime, rbid + 1, 2, 2); + {staleOpTime, staleWallTime}, remoteNewerOpTime, rbid + 1, 2, 2, ""); BSONObjBuilder bob; ASSERT_OK(replMetadata.writeToMetadata(&bob)); ASSERT_OK(oqMetadata.writeToMetadata(&bob)); @@ -337,7 +344,7 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceRollsBack) TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehind) { rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1); - rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, staleOpTime, rbid, 2, 2); + rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, staleOpTime, rbid, 2, 2, ""); BSONObjBuilder bob; ASSERT_OK(replMetadata.writeToMetadata(&bob)); ASSERT_OK(oqMetadata.writeToMetadata(&bob)); @@ -355,7 +362,7 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehind) TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAhead) { rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1); - rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, lastFetched, rbid, 2, 2); + rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, lastFetched, rbid, 2, 2, ""); BSONObjBuilder bob; ASSERT_OK(replMetadata.writeToMetadata(&bob)); ASSERT_OK(oqMetadata.writeToMetadata(&bob)); @@ -374,7 +381,7 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAhead TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehindWithoutRequiringFresherSyncSource) { rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1); - rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, staleOpTime, rbid, 2, 2); + rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, staleOpTime, rbid, 2, 2, ""); BSONObjBuilder bob; ASSERT_OK(replMetadata.writeToMetadata(&bob)); ASSERT_OK(oqMetadata.writeToMetadata(&bob)); @@ -395,7 +402,7 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreProcessedWhenSyncSourceIsCurrentButM // is equal to us. Since that means the metadata is stale and can be ignored, we should accept // this sync source. rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1); - rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, staleOpTime, rbid, 2, 2); + rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, staleOpTime, rbid, 2, 2, ""); BSONObjBuilder bob; ASSERT_OK(replMetadata.writeToMetadata(&bob)); ASSERT_OK(oqMetadata.writeToMetadata(&bob)); @@ -411,7 +418,7 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreProcessedWhenSyncSourceIsCurrentButM TEST_F(OplogFetcherTest, MetadataAndBatchAreProcessedWhenSyncSourceIsNotAheadWithoutRequiringFresherSyncSource) { rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1); - rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, lastFetched, rbid, 2, 2); + rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, lastFetched, rbid, 2, 2, ""); BSONObjBuilder bob; ASSERT_OK(replMetadata.writeToMetadata(&bob)); ASSERT_OK(oqMetadata.writeToMetadata(&bob)); @@ -442,7 +449,8 @@ TEST_F(OplogFetcherTest, TEST_F(OplogFetcherTest, MetadataIsNotProcessedOnBatchThatTriggersRollback) { rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1); - rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, remoteNewerOpTime, rbid, 2, 2); + rpc::OplogQueryMetadata oqMetadata( + {staleOpTime, staleWallTime}, remoteNewerOpTime, rbid, 2, 2, ""); BSONObjBuilder bob; ASSERT_OK(replMetadata.writeToMetadata(&bob)); ASSERT_OK(oqMetadata.writeToMetadata(&bob)); @@ -469,7 +477,7 @@ TEST_F(OplogFetcherTest, EmptyFirstBatchStopsOplogFetcherWithOplogStartMissingEr } TEST_F(OplogFetcherTest, MissingOpTimeInFirstDocumentCausesOplogFetcherToStopWithInvalidBSONError) { - auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); + auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2, ""); ASSERT_EQUALS(ErrorCodes::InvalidBSON, processSingleBatch({concatenate(makeCursorResponse(0, {BSONObj()}), metadataObj), Milliseconds(0)}) @@ -479,7 +487,7 @@ TEST_F(OplogFetcherTest, MissingOpTimeInFirstDocumentCausesOplogFetcherToStopWit TEST_F( OplogFetcherTest, LastOpTimeFetchedDoesNotMatchFirstDocumentCausesOplogFetcherToStopWithOplogStartMissingError) { - auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); + auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2, ""); ASSERT_EQUALS( ErrorCodes::OplogStartMissing, processSingleBatch( @@ -490,7 +498,7 @@ TEST_F( TEST_F(OplogFetcherTest, MissingOpTimeInSecondDocumentOfFirstBatchCausesOplogFetcherToStopWithNoSuchKey) { - auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); + auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2, ""); ASSERT_EQUALS( ErrorCodes::NoSuchKey, processSingleBatch( @@ -504,7 +512,7 @@ TEST_F(OplogFetcherTest, } TEST_F(OplogFetcherTest, TimestampsNotAdvancingInBatchCausesOplogFetcherStopWithOplogOutOfOrder) { - auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); + auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2, ""); ASSERT_EQUALS( ErrorCodes::OplogOutOfOrder, processSingleBatch({concatenate(makeCursorResponse(0, @@ -518,7 +526,7 @@ TEST_F(OplogFetcherTest, TimestampsNotAdvancingInBatchCausesOplogFetcherStopWith } TEST_F(OplogFetcherTest, OplogFetcherShouldExcludeFirstDocumentInFirstBatchWhenEnqueuingDocuments) { - auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); + auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2, ""); auto firstEntry = makeNoopOplogEntry(lastFetched); auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); @@ -592,7 +600,7 @@ TEST_F(OplogFetcherTest, auto firstEntry = makeNoopOplogEntry({{Seconds(123), 0}, lastFetched.getTerm()}); auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); - auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); + auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2, ""); // Only send over the first entry. Save the second for the getMore request. processNetworkResponse( @@ -669,7 +677,7 @@ TEST_F(OplogFetcherTest, auto firstEntry = makeNoopOplogEntry({{Seconds(123), 0}, lastFetched.getTerm()}); auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.getTerm()}); - auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); + auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2, ""); // Only send over the first two entries. Save the third for the getMore request. processNetworkResponse( @@ -705,7 +713,7 @@ TEST_F(OplogFetcherTest, } TEST_F(OplogFetcherTest, OplogFetcherShouldReportErrorsThrownFromCallback) { - auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); + auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2, ""); auto firstEntry = makeNoopOplogEntry(lastFetched); auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); @@ -769,7 +777,8 @@ TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithBothMetadatasStopsTheOplogFetc {{Seconds(20000), 0}, 1}, rbid, 2, - 2); + 2, + ""); testSyncSourceChecking(&replMetadata, &oqMetadata); @@ -795,7 +804,8 @@ TEST_F(OplogFetcherTest, {{Seconds(20000), 0}, 1}, rbid, 2, - -1); + -1, + ""); testSyncSourceChecking(&replMetadata, &oqMetadata); @@ -829,7 +839,7 @@ RemoteCommandRequest OplogFetcherTest::testTwoBatchHandling() { auto firstEntry = makeNoopOplogEntry(lastFetched); auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); - auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); + auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2, ""); processNetworkResponse( {concatenate(makeCursorResponse(cursorId, {firstEntry, secondEntry}), metadataObj), Milliseconds(0)}, diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp index 4ac2eeda7bf..d5ef30ecea8 100644 --- a/src/mongo/db/repl/topology_coordinator.cpp +++ b/src/mongo/db/repl/topology_coordinator.cpp @@ -2664,18 +2664,21 @@ bool TopologyCoordinator::shouldChangeSyncSource( OpTime currentSourceOpTime; int syncSourceIndex = -1; int primaryIndex = -1; + std::string syncSourceHost; if (oqMetadata) { currentSourceOpTime = std::max(oqMetadata->getLastOpApplied(), _memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime()); syncSourceIndex = oqMetadata->getSyncSourceIndex(); primaryIndex = oqMetadata->getPrimaryIndex(); + syncSourceHost = oqMetadata->getSyncSourceHost(); } else { currentSourceOpTime = std::max(replMetadata.getLastOpVisible(), _memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime()); syncSourceIndex = replMetadata.getSyncSourceIndex(); primaryIndex = replMetadata.getPrimaryIndex(); + syncSourceHost = ""; } if (currentSourceOpTime.isNull()) { @@ -2719,6 +2722,37 @@ bool TopologyCoordinator::shouldChangeSyncSource( return true; } + // Change sync source if our sync source is also syncing from us when we are in primary + // catchup mode, forming a sync source selection cycle, and the sync source is not ahead + // of us. This is to prevent a deadlock situation. See SERVER-58988 for details. + // When checking the sync source, we use syncSourceHost if it is set, otherwise fall back + // to use syncSourceIndex. The difference is that syncSourceIndex might not point to the + // node that we think of because it was inferred from the sender node, which could have + // a different config. This is acceptable since we are just choosing a different sync + // source if that happens and reconfigs are rare. + bool isSyncingFromMe = !syncSourceHost.empty() + ? syncSourceHost == _selfMemberData().getHostAndPort().toString() + : syncSourceIndex == _selfIndex; + + if (isSyncingFromMe && _currentPrimaryIndex == _selfIndex && + currentSourceOpTime <= myLastOpTime) { + std::stringstream logMessage; + + logMessage << "Choosing new sync source because we are in primary catchup but our current " + "sync source is also syncing from us but is not ahead of us. " + << "Current sync source: " << currentSource.toString() + << ", my last fetched oplog optime: " << myLastOpTime + << ", latest oplog optime of sync source: " << currentSourceOpTime; + + if (primaryIndex >= 0) { + logMessage << " (" << _rsConfig.getMemberAt(primaryIndex).getHostAndPort() << " is)"; + } else { + logMessage << " (sync source does not know the primary)"; + } + log() << logMessage.str(); + return true; + } + if (MONGO_FAIL_POINT(disableMaxSyncSourceLagSecs)) { log() << "disableMaxSyncSourceLagSecs fail point enabled - not checking the most recent " "OpTime, " @@ -2769,7 +2803,8 @@ rpc::OplogQueryMetadata TopologyCoordinator::prepareOplogQueryMetadata(int rbid) getMyLastAppliedOpTime(), rbid, _currentPrimaryIndex, - _rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress())); + _rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress()), + getSyncSourceAddress().toString()); } void TopologyCoordinator::processReplSetRequestVotes(const ReplSetRequestVotesArgs& args, diff --git a/src/mongo/db/repl/topology_coordinator_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_v1_test.cpp index dc403532871..f6a8adaa65b 100644 --- a/src/mongo/db/repl/topology_coordinator_v1_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_v1_test.cpp @@ -221,9 +221,14 @@ protected: OplogQueryMetadata makeOplogQueryMetadata(OpTime lastAppliedOpTime = OpTime(), int primaryIndex = -1, int syncSourceIndex = -1, + std::string syncSourceHost = "", Date_t lastCommittedWall = Date_t()) { - return OplogQueryMetadata( - {OpTime(), lastCommittedWall}, lastAppliedOpTime, -1, primaryIndex, syncSourceIndex); + return OplogQueryMetadata({OpTime(), lastCommittedWall}, + lastAppliedOpTime, + -1, + primaryIndex, + syncSourceIndex, + syncSourceHost); } HeartbeatResponseAction receiveUpHeartbeat(const HostAndPort& member, @@ -3764,6 +3769,92 @@ TEST_F(HeartbeatResponseTestV1, HostAndPort("host2"), makeReplSetMetadata(newerThanLastOpTimeApplied), boost::none, now())); } +TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceWhenSyncSourceFormsCycleAndWeArePrimary) { + // In this test, the TopologyCoordinator will tell us change our sync source away from "host2" + // when it is not ahead of us and it selects us to be its sync source, forming a sync source + // cycle and we are currently in primary catchup. + setSelfMemberState(MemberState::RS_PRIMARY); + OpTime election = OpTime(); + OpTime syncSourceOpTime = OpTime(Timestamp(400, 0), 0); + + // Set lastOpTimeFetched to be same as the sync source's OpTime. + OpTime lastOpTimeFetched = OpTime(Timestamp(400, 0), 0); + setMyOpTime(lastOpTimeFetched); + + // Show we like host2 while it is not syncing from us. + HeartbeatResponseAction nextAction = receiveUpHeartbeat( + HostAndPort("host2"), "rs0", MemberState::RS_SECONDARY, election, syncSourceOpTime); + ASSERT_NO_ACTION(nextAction.getAction()); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), + makeReplSetMetadata(OpTime() /* visibleOpTime */, false /* isPrimary */), + makeOplogQueryMetadata(syncSourceOpTime, + -1 /* primaryIndex */, + 2 /* syncSourceIndex */, + "host3:27017" /* syncSourceHost */), + now())); + + // Show that we also like host2 while we are not primary. + nextAction = receiveUpHeartbeat( + HostAndPort("host2"), "rs0", MemberState::RS_SECONDARY, election, syncSourceOpTime); + ASSERT_NO_ACTION(nextAction.getAction()); + getTopoCoord().setPrimaryIndex(2); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), + makeReplSetMetadata(OpTime() /* visibleOpTime */, false /* isPrimary */), + // Sync source is also syncing from us. + makeOplogQueryMetadata(syncSourceOpTime, + -1 /* primaryIndex */, + 0 /* syncSourceIndex */, + "host1:27017" /* syncSourceHost */), + now())); + + // Show that we also like host2 while it has some progress beyond our own. + getTopoCoord().setPrimaryIndex(0); + OpTime olderThanSyncSourceOpTime = OpTime(Timestamp(300, 0), 0); + topoCoordSetMyLastAppliedOpTime(olderThanSyncSourceOpTime, now(), true); + nextAction = receiveUpHeartbeat( + HostAndPort("host2"), "rs0", MemberState::RS_SECONDARY, election, syncSourceOpTime); + ASSERT_NO_ACTION(nextAction.getAction()); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), + makeReplSetMetadata(OpTime() /* visibleOpTime */, false /* isPrimary */), + // Sync source is also syncing from us. + makeOplogQueryMetadata(syncSourceOpTime, + -1 /* primaryIndex */, + 0 /* syncSourceIndex */, + "host1:27017" /* syncSourceHost */), + now())); + + // Show that we do not like host2 it forms a sync source selection cycle with us and we + // are primary and it lacks progress beyond our own. + setMyOpTime(lastOpTimeFetched); + nextAction = receiveUpHeartbeat( + HostAndPort("host2"), "rs0", MemberState::RS_SECONDARY, election, syncSourceOpTime); + ASSERT_NO_ACTION(nextAction.getAction()); + ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), + makeReplSetMetadata(OpTime() /* visibleOpTime */, false /* isPrimary */), + // Sync source is also syncing from us. + makeOplogQueryMetadata(syncSourceOpTime, + -1 /* primaryIndex */, + 0 /* syncSourceIndex */, + "host1:27017" /* syncSourceHost */), + now())); + + // Show that we still do not like it when syncSourceHost is not set, but we can rely on + // syncSourceIndex to decide if a sync source selection cycle has been formed. + nextAction = receiveUpHeartbeat( + HostAndPort("host2"), "rs0", MemberState::RS_SECONDARY, election, syncSourceOpTime); + ASSERT_NO_ACTION(nextAction.getAction()); + ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), + makeReplSetMetadata(OpTime() /* visibleOpTime */, false /* isPrimary */), + // Sync source is also syncing from us. + makeOplogQueryMetadata(syncSourceOpTime, -1 /* primaryIndex */, 0 /* syncSourceIndex */), + now())); +} + TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsDown) { // In this test, the TopologyCoordinator should not tell us to change sync sources away from // "host2" and to "host3" despite "host2" being more than maxSyncSourceLagSecs(30) behind diff --git a/src/mongo/rpc/metadata/oplog_query_metadata.cpp b/src/mongo/rpc/metadata/oplog_query_metadata.cpp index 4548928d1ca..d181f5fbf1b 100644 --- a/src/mongo/rpc/metadata/oplog_query_metadata.cpp +++ b/src/mongo/rpc/metadata/oplog_query_metadata.cpp @@ -50,6 +50,7 @@ const char kLastCommittedWallFieldName[] = "lastCommittedWall"; const char kLastOpAppliedFieldName[] = "lastOpApplied"; const char kPrimaryIndexFieldName[] = "primaryIndex"; const char kSyncSourceIndexFieldName[] = "syncSourceIndex"; +const char kSyncSourceHostFieldName[] = "syncSourceHost"; const char kRBIDFieldName[] = "rbid"; } // unnamed namespace @@ -60,12 +61,14 @@ OplogQueryMetadata::OplogQueryMetadata(OpTimeAndWallTime lastOpCommitted, OpTime lastOpApplied, int rbid, int currentPrimaryIndex, - int currentSyncSourceIndex) + int currentSyncSourceIndex, + std::string currentSyncSourceHost) : _lastOpCommitted(std::move(lastOpCommitted)), _lastOpApplied(std::move(lastOpApplied)), _rbid(rbid), _currentPrimaryIndex(currentPrimaryIndex), - _currentSyncSourceIndex(currentSyncSourceIndex) {} + _currentSyncSourceIndex(currentSyncSourceIndex), + _currentSyncSourceHost(currentSyncSourceHost) {} StatusWith<OplogQueryMetadata> OplogQueryMetadata::readFromMetadata(const BSONObj& metadataObj, bool requireWallTime) { @@ -87,6 +90,14 @@ StatusWith<OplogQueryMetadata> OplogQueryMetadata::readFromMetadata(const BSONOb if (!status.isOK()) return status; + std::string syncSourceHost; + status = bsonExtractStringField(oqMetadataObj, kSyncSourceHostFieldName, &syncSourceHost); + // SyncSourceHost might not be set in older versions, checking NoSuchKey error + // for backward compatibility. + // TODO SERVER-59732: Remove the compatibility check once 6.0 is released. + if (!status.isOK() && status.code() != ErrorCodes::NoSuchKey) + return status; + long long rbid; status = bsonExtractIntegerField(oqMetadataObj, kRBIDFieldName, &rbid); if (!status.isOK()) @@ -111,7 +122,8 @@ StatusWith<OplogQueryMetadata> OplogQueryMetadata::readFromMetadata(const BSONOb if (!status.isOK()) return status; - return OplogQueryMetadata(lastOpCommitted, lastOpApplied, rbid, primaryIndex, syncSourceIndex); + return OplogQueryMetadata( + lastOpCommitted, lastOpApplied, rbid, primaryIndex, syncSourceIndex, syncSourceHost); } Status OplogQueryMetadata::writeToMetadata(BSONObjBuilder* builder) const { @@ -122,6 +134,7 @@ Status OplogQueryMetadata::writeToMetadata(BSONObjBuilder* builder) const { oqMetadataBuilder.append(kRBIDFieldName, _rbid); oqMetadataBuilder.append(kPrimaryIndexFieldName, _currentPrimaryIndex); oqMetadataBuilder.append(kSyncSourceIndexFieldName, _currentSyncSourceIndex); + oqMetadataBuilder.append(kSyncSourceHostFieldName, _currentSyncSourceHost); oqMetadataBuilder.doneFast(); return Status::OK(); @@ -132,6 +145,7 @@ std::string OplogQueryMetadata::toString() const { output << "OplogQueryMetadata"; output << " Primary Index: " << _currentPrimaryIndex; output << " Sync Source Index: " << _currentSyncSourceIndex; + output << " Sync Source Host: " << _currentSyncSourceHost; output << " RBID: " << _rbid; output << " Last Op Committed: " << _lastOpCommitted.toString(); output << " Last Op Applied: " << _lastOpApplied.toString(); diff --git a/src/mongo/rpc/metadata/oplog_query_metadata.h b/src/mongo/rpc/metadata/oplog_query_metadata.h index 1f8cc2705be..331cc761fda 100644 --- a/src/mongo/rpc/metadata/oplog_query_metadata.h +++ b/src/mongo/rpc/metadata/oplog_query_metadata.h @@ -57,7 +57,8 @@ public: repl::OpTime lastOpApplied, int rbid, int currentPrimaryIndex, - int currentSyncSourceIndex); + int currentSyncSourceIndex, + std::string currentSyncSourceHost); /** * format: @@ -107,6 +108,14 @@ public: } /** + * Returns the host of the sync source of the sender. + * Returns empty string if it has no sync source. + */ + std::string getSyncSourceHost() const { + return _currentSyncSourceHost; + } + + /** * Returns the current rbid of the sender. */ int getRBID() const { @@ -124,6 +133,7 @@ private: int _rbid = -1; int _currentPrimaryIndex = kNoPrimary; int _currentSyncSourceIndex = -1; + std::string _currentSyncSourceHost; }; } // namespace rpc diff --git a/src/mongo/rpc/metadata/oplog_query_metadata_test.cpp b/src/mongo/rpc/metadata/oplog_query_metadata_test.cpp index 9f07a7775ad..e881b2d20f5 100644 --- a/src/mongo/rpc/metadata/oplog_query_metadata_test.cpp +++ b/src/mongo/rpc/metadata/oplog_query_metadata_test.cpp @@ -43,7 +43,7 @@ TEST(ReplResponseMetadataTest, OplogQueryMetadataRoundtrip) { OpTime opTime1(Timestamp(1234, 100), 5); Date_t committedWall = Date_t() + Seconds(opTime1.getSecs()); OpTime opTime2(Timestamp(7777, 101), 6); - OplogQueryMetadata metadata({opTime1, committedWall}, opTime2, 6, 12, -1); + OplogQueryMetadata metadata({opTime1, committedWall}, opTime2, 6, 12, -1, ""); ASSERT_EQ(opTime1, metadata.getLastOpCommitted().opTime); ASSERT_EQ(committedWall, metadata.getLastOpCommitted().wallTime); @@ -52,12 +52,14 @@ TEST(ReplResponseMetadataTest, OplogQueryMetadataRoundtrip) { BSONObjBuilder builder; metadata.writeToMetadata(&builder).transitional_ignore(); - BSONObj expectedObj(BSON( - kOplogQueryMetadataFieldName << BSON( - "lastOpCommitted" << BSON("ts" << opTime1.getTimestamp() << "t" << opTime1.getTerm()) - << "lastCommittedWall" << committedWall << "lastOpApplied" - << BSON("ts" << opTime2.getTimestamp() << "t" << opTime2.getTerm()) - << "rbid" << 6 << "primaryIndex" << 12 << "syncSourceIndex" << -1))); + BSONObj expectedObj( + BSON(kOplogQueryMetadataFieldName + << BSON("lastOpCommitted" + << BSON("ts" << opTime1.getTimestamp() << "t" << opTime1.getTerm()) + << "lastCommittedWall" << committedWall << "lastOpApplied" + << BSON("ts" << opTime2.getTimestamp() << "t" << opTime2.getTerm()) << "rbid" + << 6 << "primaryIndex" << 12 << "syncSourceIndex" << -1 << "syncSourceHost" + << ""))); BSONObj serializedObj = builder.obj(); ASSERT_BSONOBJ_EQ(expectedObj, serializedObj); |