diff options
Diffstat (limited to 'src/mongo/db')
22 files changed, 187 insertions, 121 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index b1bdb3eff66..77daa595256 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -110,10 +110,11 @@ public: ReplicationCoordinator* replicationCoordinator, ReplicationCoordinatorExternalState* replicationCoordinatorExternalState, BackgroundSync* bgsync); - bool shouldStopFetching(const HostAndPort& source, - const rpc::ReplSetMetadata& replMetadata, - const rpc::OplogQueryMetadata& oqMetadata, - const OpTime& lastOpTimeFetched) override; + ChangeSyncSourceAction shouldStopFetching(const HostAndPort& source, + const rpc::ReplSetMetadata& replMetadata, + const rpc::OplogQueryMetadata& oqMetadata, + const OpTime& previousOpTimeFetched, + const OpTime& lastOpTimeFetched) override; private: BackgroundSync* _bgsync; @@ -126,17 +127,18 @@ DataReplicatorExternalStateBackgroundSync::DataReplicatorExternalStateBackground : DataReplicatorExternalStateImpl(replicationCoordinator, replicationCoordinatorExternalState), _bgsync(bgsync) {} -bool DataReplicatorExternalStateBackgroundSync::shouldStopFetching( +ChangeSyncSourceAction DataReplicatorExternalStateBackgroundSync::shouldStopFetching( const HostAndPort& source, const rpc::ReplSetMetadata& replMetadata, const rpc::OplogQueryMetadata& oqMetadata, + const OpTime& previousOpTimeFetched, const OpTime& lastOpTimeFetched) { if (_bgsync->shouldStopFetching()) { - return true; + return ChangeSyncSourceAction::kStopSyncingAndEnqueueLastBatch; } return DataReplicatorExternalStateImpl::shouldStopFetching( - source, replMetadata, oqMetadata, lastOpTimeFetched); + source, replMetadata, oqMetadata, previousOpTimeFetched, lastOpTimeFetched); } size_t getSize(const BSONObj& o) { diff --git a/src/mongo/db/repl/data_replicator_external_state.h b/src/mongo/db/repl/data_replicator_external_state.h index f1906c50b3c..2d2e56df3d1 100644 --- a/src/mongo/db/repl/data_replicator_external_state.h +++ b/src/mongo/db/repl/data_replicator_external_state.h @@ -37,6 +37,7 @@ #include "mongo/db/repl/optime_with.h" #include "mongo/db/repl/repl_set_config.h" #include "mongo/db/repl/replication_consistency_markers.h" +#include "mongo/db/repl/sync_source_selector.h" #include "mongo/rpc/metadata/oplog_query_metadata.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/util/concurrency/thread_pool.h" @@ -95,10 +96,11 @@ public: * sync source (from metadata); and whether this sync source has a sync source (also from * metadata). */ - virtual bool shouldStopFetching(const HostAndPort& source, - const rpc::ReplSetMetadata& replMetadata, - const rpc::OplogQueryMetadata& oqMetadata, - const OpTime& lastOpTimeFetched) = 0; + virtual ChangeSyncSourceAction shouldStopFetching(const HostAndPort& source, + const rpc::ReplSetMetadata& replMetadata, + const rpc::OplogQueryMetadata& oqMetadata, + const OpTime& previousOpTimeFetched, + const OpTime& lastOpTimeFetched) = 0; /** * This function creates an oplog buffer of the type specified at server startup. diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp index 385883939fd..6849dd52418 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp @@ -91,13 +91,16 @@ void DataReplicatorExternalStateImpl::processMetadata(const rpc::ReplSetMetadata } } -bool DataReplicatorExternalStateImpl::shouldStopFetching(const HostAndPort& source, - const rpc::ReplSetMetadata& replMetadata, - const rpc::OplogQueryMetadata& oqMetadata, - const OpTime& lastOpTimeFetched) { +ChangeSyncSourceAction DataReplicatorExternalStateImpl::shouldStopFetching( + const HostAndPort& source, + const rpc::ReplSetMetadata& replMetadata, + const rpc::OplogQueryMetadata& oqMetadata, + const OpTime& previousOpTimeFetched, + const OpTime& lastOpTimeFetched) { // Re-evaluate quality of sync target. - if (_replicationCoordinator->shouldChangeSyncSource( - source, replMetadata, oqMetadata, lastOpTimeFetched)) { + auto changeSyncSourceAction = _replicationCoordinator->shouldChangeSyncSource( + source, replMetadata, oqMetadata, previousOpTimeFetched, lastOpTimeFetched); + if (changeSyncSourceAction != ChangeSyncSourceAction::kContinueSyncing) { LOGV2(21150, "Canceling oplog query due to OplogQueryMetadata. We have to choose a new " "sync source. Current source: {syncSource}, OpTime {lastAppliedOpTime}, " @@ -107,10 +110,8 @@ bool DataReplicatorExternalStateImpl::shouldStopFetching(const HostAndPort& sour "syncSource"_attr = source, "lastAppliedOpTime"_attr = oqMetadata.getLastOpApplied(), "syncSourceIndex"_attr = oqMetadata.getSyncSourceIndex()); - - return true; } - return false; + return changeSyncSourceAction; } std::unique_ptr<OplogBuffer> DataReplicatorExternalStateImpl::makeInitialSyncOplogBuffer( diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.h b/src/mongo/db/repl/data_replicator_external_state_impl.h index 353b9b532e6..3741fbae7c4 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.h +++ b/src/mongo/db/repl/data_replicator_external_state_impl.h @@ -54,10 +54,11 @@ public: void processMetadata(const rpc::ReplSetMetadata& replMetadata, rpc::OplogQueryMetadata oqMetadata) override; - bool shouldStopFetching(const HostAndPort& source, - const rpc::ReplSetMetadata& replMetadata, - const rpc::OplogQueryMetadata& oqMetadata, - const OpTime& lastOpTimeFetched) override; + ChangeSyncSourceAction shouldStopFetching(const HostAndPort& source, + const rpc::ReplSetMetadata& replMetadata, + const rpc::OplogQueryMetadata& oqMetadata, + const OpTime& previousOpTimeFetched, + const OpTime& lastOpTimeFetched) override; std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer(OperationContext* opCtx) const override; diff --git a/src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp b/src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp index 11cbe65f911..f26f2a1dd0a 100644 --- a/src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp @@ -40,16 +40,18 @@ DataReplicatorExternalStateInitialSync::DataReplicatorExternalStateInitialSync( : DataReplicatorExternalStateImpl(replicationCoordinator, replicationCoordinatorExternalState) { } -bool DataReplicatorExternalStateInitialSync::shouldStopFetching(const HostAndPort&, - const rpc::ReplSetMetadata&, - const rpc::OplogQueryMetadata&, - const OpTime& lastOpTimeFetched) { +ChangeSyncSourceAction DataReplicatorExternalStateInitialSync::shouldStopFetching( + const HostAndPort&, + const rpc::ReplSetMetadata&, + const rpc::OplogQueryMetadata&, + const OpTime& previousOpTimeFetched, + const OpTime& lastOpTimeFetched) { // Since initial sync does not allow for sync source changes, it should not check if there are // better sync sources. If there is a problem on the sync source, it will manifest itself in the // cloning phase as well, and cause a failure there. - return false; + return ChangeSyncSourceAction::kContinueSyncing; } } // namespace repl diff --git a/src/mongo/db/repl/data_replicator_external_state_initial_sync.h b/src/mongo/db/repl/data_replicator_external_state_initial_sync.h index e656f0002da..c3c8195928c 100644 --- a/src/mongo/db/repl/data_replicator_external_state_initial_sync.h +++ b/src/mongo/db/repl/data_replicator_external_state_initial_sync.h @@ -44,10 +44,11 @@ public: ReplicationCoordinator* replicationCoordinator, ReplicationCoordinatorExternalState* replicationCoordinatorExternalState); - bool shouldStopFetching(const HostAndPort& source, - const rpc::ReplSetMetadata& replMetadata, - const rpc::OplogQueryMetadata& oqMetadata, - const OpTime& lastOpTimeFetched) override; + ChangeSyncSourceAction shouldStopFetching(const HostAndPort& source, + const rpc::ReplSetMetadata& replMetadata, + const rpc::OplogQueryMetadata& oqMetadata, + const OpTime& previousOpTimeFetched, + const OpTime& lastOpTimeFetched) override; }; } // namespace repl diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp index 0bc385a6708..305c4e1abd2 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp @@ -89,10 +89,12 @@ void DataReplicatorExternalStateMock::processMetadata(const rpc::ReplSetMetadata metadataWasProcessed = true; } -bool DataReplicatorExternalStateMock::shouldStopFetching(const HostAndPort& source, - const rpc::ReplSetMetadata& replMetadata, - const rpc::OplogQueryMetadata& oqMetadata, - const OpTime& lastOpTimeFetched) { +ChangeSyncSourceAction DataReplicatorExternalStateMock::shouldStopFetching( + const HostAndPort& source, + const rpc::ReplSetMetadata& replMetadata, + const rpc::OplogQueryMetadata& oqMetadata, + const OpTime& previousOpTimeFetched, + const OpTime& lastOpTimeFetched) { lastSyncSourceChecked = source; syncSourceLastOpTime = oqMetadata.getLastOpApplied(); syncSourceHasSyncSource = oqMetadata.getSyncSourceIndex() != -1; diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.h b/src/mongo/db/repl/data_replicator_external_state_mock.h index ce53a542eb1..a9155c30b9e 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.h +++ b/src/mongo/db/repl/data_replicator_external_state_mock.h @@ -51,10 +51,11 @@ public: void processMetadata(const rpc::ReplSetMetadata& metadata, rpc::OplogQueryMetadata oqMetadata) override; - bool shouldStopFetching(const HostAndPort& source, - const rpc::ReplSetMetadata& replMetadata, - const rpc::OplogQueryMetadata& oqMetadata, - const OpTime& lastOpTimeFetched) override; + ChangeSyncSourceAction shouldStopFetching(const HostAndPort& source, + const rpc::ReplSetMetadata& replMetadata, + const rpc::OplogQueryMetadata& oqMetadata, + const OpTime& previousOpTimeFetched, + const OpTime& lastOpTimeFetched) override; std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer(OperationContext* opCtx) const override; @@ -86,7 +87,7 @@ public: bool syncSourceHasSyncSource = false; // Returned by shouldStopFetching. - bool shouldStopFetchingResult = false; + ChangeSyncSourceAction shouldStopFetchingResult = ChangeSyncSourceAction::kContinueSyncing; // Override to change applyOplogBatch behavior. using ApplyOplogBatchFn = std::function<StatusWith<OpTime>( diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp index 9ea641dfc5a..8adea565621 100644 --- a/src/mongo/db/repl/initial_syncer_test.cpp +++ b/src/mongo/db/repl/initial_syncer_test.cpp @@ -159,12 +159,13 @@ public: void blacklistSyncSource(const HostAndPort& host, Date_t until) override { _syncSourceSelector->blacklistSyncSource(host, until); } - bool shouldChangeSyncSource(const HostAndPort& currentSource, - const rpc::ReplSetMetadata& replMetadata, - const rpc::OplogQueryMetadata& oqMetadata, - const OpTime& lastOpTimeFetched) override { + ChangeSyncSourceAction shouldChangeSyncSource(const HostAndPort& currentSource, + const rpc::ReplSetMetadata& replMetadata, + const rpc::OplogQueryMetadata& oqMetadata, + const OpTime& previousOpTimeFetched, + const OpTime& lastOpTimeFetched) override { return _syncSourceSelector->shouldChangeSyncSource( - currentSource, replMetadata, oqMetadata, lastOpTimeFetched); + currentSource, replMetadata, oqMetadata, previousOpTimeFetched, lastOpTimeFetched); } void scheduleNetworkResponse(std::string cmdName, const BSONObj& obj) { diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index ebc0e522a11..7bdd2af9b90 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -680,14 +680,16 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) { } // This lastFetched value is the last OpTime from the previous batch. - auto lastFetched = _getLastOpTimeFetched(); + auto previousOpTimeFetched = _getLastOpTimeFetched(); auto validateResult = OplogFetcher::validateDocuments( - documents, _firstBatch, lastFetched.getTimestamp(), _startingPoint); + documents, _firstBatch, previousOpTimeFetched.getTimestamp(), _startingPoint); if (!validateResult.isOK()) { return validateResult.getStatus(); } auto info = validateResult.getValue(); + // If the batch is empty, set 'lastDocOpTime' to the lastFetched from the previous batch. + auto lastDocOpTime = info.lastDocument.isNull() ? previousOpTimeFetched : info.lastDocument; // Process replset metadata. It is important that this happen after we've validated the // first batch, so we don't progress our knowledge of the commit point from a @@ -704,6 +706,24 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) { return metadataResult.getStatus(); } auto replSetMetadata = metadataResult.getValue(); + + // Determine if we should stop syncing from our current sync source. + auto changeSyncSourceAction = _dataReplicatorExternalState->shouldStopFetching( + _source, replSetMetadata, oqMetadata, previousOpTimeFetched, lastDocOpTime); + str::stream errMsg; + errMsg << "sync source " << _source.toString(); + errMsg << " (config version: " << replSetMetadata.getConfigVersion(); + errMsg << "; last applied optime: " << oqMetadata.getLastOpApplied().toString(); + errMsg << "; sync source index: " << oqMetadata.getSyncSourceIndex(); + errMsg << "; has primary index: " << oqMetadata.hasPrimaryIndex(); + errMsg << ") is no longer valid"; + errMsg << " previous batch last fetched optime: " << previousOpTimeFetched.toString(); + errMsg << " current batch last fetched optime: " << lastDocOpTime.toString(); + + if (changeSyncSourceAction == ChangeSyncSourceAction::kStopSyncingAndDropLastBatch) { + return Status(ErrorCodes::InvalidSyncSource, errMsg); + } + _dataReplicatorExternalState->processMetadata(replSetMetadata, oqMetadata); // Increment stats. We read all of the docs in the query. @@ -717,6 +737,10 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) { return status; } + if (changeSyncSourceAction == ChangeSyncSourceAction::kStopSyncingAndEnqueueLastBatch) { + return Status(ErrorCodes::InvalidSyncSource, errMsg); + } + if (MONGO_unlikely(hangOplogFetcherBeforeAdvancingLastFetched.shouldFail())) { hangOplogFetcherBeforeAdvancingLastFetched.pauseWhileSet(); } @@ -725,14 +749,9 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) { // of this fetcher. _startingPoint = StartingPoint::kSkipFirstDoc; - // We have now processed the batch and should move forward our view of _lastFetched. - if (documents.size() > 0) { - auto lastDocOpTimeRes = OpTime::parseFromOplogEntry(documents.back()); - if (!lastDocOpTimeRes.isOK()) { - return lastDocOpTimeRes.getStatus(); - } - - auto lastDocOpTime = lastDocOpTimeRes.getValue(); + // We have now processed the batch. We should only move forward our view of _lastFetched if the + // batch was not empty. + if (lastDocOpTime != previousOpTimeFetched) { LOGV2_DEBUG(21273, 3, "Oplog fetcher setting last fetched optime ahead after batch: {lastDocOpTime}", @@ -743,22 +762,6 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) { _lastFetched = lastDocOpTime; } - // Get the last fetched optime from the most recent batch. - lastFetched = _getLastOpTimeFetched(); - - if (_dataReplicatorExternalState->shouldStopFetching( - _source, replSetMetadata, oqMetadata, lastFetched)) { - str::stream errMsg; - errMsg << "sync source " << _source.toString(); - errMsg << " (config version: " << replSetMetadata.getConfigVersion(); - errMsg << "; last applied optime: " << oqMetadata.getLastOpApplied().toString(); - errMsg << "; sync source index: " << oqMetadata.getSyncSourceIndex(); - errMsg << "; has primary index: " << oqMetadata.hasPrimaryIndex(); - errMsg << ") is no longer valid"; - errMsg << "last fetched optime: " << lastFetched.toString(); - return Status(ErrorCodes::InvalidSyncSource, errMsg); - } - _firstBatch = false; return Status::OK(); } diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp index 88086deaeeb..2359c1ba250 100644 --- a/src/mongo/db/repl/oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/oplog_fetcher_test.cpp @@ -323,7 +323,9 @@ protected: * Tests checkSyncSource result handling. */ void testSyncSourceChecking(const rpc::ReplSetMetadata& replMetadata, - const rpc::OplogQueryMetadata& oqMetadata); + const rpc::OplogQueryMetadata& oqMetadata, + ChangeSyncSourceAction changeSyncSourceAction = + ChangeSyncSourceAction::kStopSyncingAndEnqueueLastBatch); void validateLastBatch(bool skipFirstDoc, OplogFetcher::Documents docs, OpTime lastFetched); @@ -478,14 +480,15 @@ std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch(const Messag } void OplogFetcherTest::testSyncSourceChecking(const rpc::ReplSetMetadata& replMetadata, - const rpc::OplogQueryMetadata& oqMetadata) { + const rpc::OplogQueryMetadata& oqMetadata, + ChangeSyncSourceAction changeSyncSourceAction) { auto firstEntry = makeNoopOplogEntry(lastFetched); auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.getTerm()}); auto metadataObj = makeOplogBatchMetadata(replMetadata, oqMetadata); - dataReplicatorExternalState->shouldStopFetchingResult = true; + dataReplicatorExternalState->shouldStopFetchingResult = changeSyncSourceAction; auto shutdownState = processSingleBatch(makeFirstBatch(0, {firstEntry, secondEntry, thirdEntry}, metadataObj), @@ -1832,6 +1835,10 @@ TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithBothMetadatasStopsTheOplogFetc ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked); ASSERT_EQUALS(oqMetadata.getLastOpApplied(), dataReplicatorExternalState->syncSourceLastOpTime); ASSERT_TRUE(dataReplicatorExternalState->syncSourceHasSyncSource); + + // We should have enqueued the last batch if the 'shouldStopFetching' check returns + // kStopSyncingAndEnqueueLastBatch. + ASSERT_FALSE(lastEnqueuedDocuments.empty()); } TEST_F(OplogFetcherTest, @@ -1845,6 +1852,19 @@ TEST_F(OplogFetcherTest, ASSERT_EQUALS(oplogQueryMetadata.getLastOpApplied(), dataReplicatorExternalState->syncSourceLastOpTime); ASSERT_FALSE(dataReplicatorExternalState->syncSourceHasSyncSource); + + // We should have enqueued the last batch if the 'shouldStopFetching' check returns + // kStopSyncingAndEnqueueLastBatch. + ASSERT_FALSE(lastEnqueuedDocuments.empty()); +} + +TEST_F(OplogFetcherTest, FailedSyncSourceCheckReturnsStopSyncingAndDropBatch) { + testSyncSourceChecking( + replSetMetadata, oqMetadata, ChangeSyncSourceAction::kStopSyncingAndDropLastBatch); + + // If the 'shouldStopFetching' check returns kStopSyncingAndDropLastBatch, we should not enqueue + // any documents. + ASSERT_TRUE(lastEnqueuedDocuments.empty()); } TEST_F(OplogFetcherTest, ValidateDocumentsReturnsNoSuchKeyIfTimestampIsNotFoundInAnyDocument) { diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index c058325362e..ea39a6edcba 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -4803,20 +4803,30 @@ void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* opC _reportUpstream_inlock(std::move(lock)); } -bool ReplicationCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource, - const rpc::ReplSetMetadata& replMetadata, - const rpc::OplogQueryMetadata& oqMetadata, - const OpTime& lastOpTimeFetched) { +ChangeSyncSourceAction ReplicationCoordinatorImpl::shouldChangeSyncSource( + const HostAndPort& currentSource, + const rpc::ReplSetMetadata& replMetadata, + const rpc::OplogQueryMetadata& oqMetadata, + const OpTime& previousOpTimeFetched, + const OpTime& lastOpTimeFetched) { stdx::lock_guard<Latch> lock(_mutex); const auto now = _replExecutor->now(); + if (_topCoord->shouldChangeSyncSource( currentSource, replMetadata, oqMetadata, lastOpTimeFetched, now)) { - return true; + return ChangeSyncSourceAction::kStopSyncingAndEnqueueLastBatch; } const auto readPreference = _getSyncSourceReadPreference(lock); - return _topCoord->shouldChangeSyncSourceDueToPingTime( - currentSource, _getMemberState_inlock(), lastOpTimeFetched, now, readPreference); + if (_topCoord->shouldChangeSyncSourceDueToPingTime( + currentSource, _getMemberState_inlock(), previousOpTimeFetched, now, readPreference)) { + // We should drop the last batch if we find a significantly closer node. This is to + // avoid advancing our 'lastFetched', which makes it more likely that we will be able to + // choose the closer node as our sync source. + return ChangeSyncSourceAction::kStopSyncingAndDropLastBatch; + } + + return ChangeSyncSourceAction::kContinueSyncing; } void ReplicationCoordinatorImpl::_updateLastCommittedOpTimeAndWallTime(WithLock lk) { diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index cd9d487b4bc..354304e2a5f 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -283,10 +283,11 @@ public: virtual void resetLastOpTimesFromOplog(OperationContext* opCtx, DataConsistency consistency) override; - virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, - const rpc::ReplSetMetadata& replMetadata, - const rpc::OplogQueryMetadata& oqMetadata, - const OpTime& lastOpTimeFetched) override; + virtual ChangeSyncSourceAction shouldChangeSyncSource(const HostAndPort& currentSource, + const rpc::ReplSetMetadata& replMetadata, + const rpc::OplogQueryMetadata& oqMetadata, + const OpTime& previousOpTimeFetched, + const OpTime& lastOpTimeFetched) override; virtual OpTime getLastCommittedOpTime() const override; virtual OpTimeAndWallTime getLastCommittedOpTimeAndWallTime() const override; diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index dd7ca0f8e07..a531cb41da2 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -501,10 +501,12 @@ bool ReplicationCoordinatorMock::lastOpTimesWereReset() const { return _resetLastOpTimesCalled; } -bool ReplicationCoordinatorMock::shouldChangeSyncSource(const HostAndPort& currentSource, - const rpc::ReplSetMetadata& replMetadata, - const rpc::OplogQueryMetadata& oqMetadata, - const OpTime& lastOpTimeFetched) { +ChangeSyncSourceAction ReplicationCoordinatorMock::shouldChangeSyncSource( + const HostAndPort& currentSource, + const rpc::ReplSetMetadata& replMetadata, + const rpc::OplogQueryMetadata& oqMetadata, + const OpTime& previousOpTimeFetched, + const OpTime& lastOpTimeFetched) { MONGO_UNREACHABLE; } diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index a79fc32478d..cafb302372a 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -247,10 +247,11 @@ public: bool lastOpTimesWereReset() const; - virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, - const rpc::ReplSetMetadata& replMetadata, - const rpc::OplogQueryMetadata& oqMetadata, - const OpTime& lastOpTimeFetched); + virtual ChangeSyncSourceAction shouldChangeSyncSource(const HostAndPort& currentSource, + const rpc::ReplSetMetadata& replMetadata, + const rpc::OplogQueryMetadata& oqMetadata, + const OpTime& previousOpTimeFetched, + const OpTime& lastOpTimeFetched); virtual OpTime getLastCommittedOpTime() const; diff --git a/src/mongo/db/repl/replication_coordinator_noop.cpp b/src/mongo/db/repl/replication_coordinator_noop.cpp index d6d2232bcc4..a85689d66e3 100644 --- a/src/mongo/db/repl/replication_coordinator_noop.cpp +++ b/src/mongo/db/repl/replication_coordinator_noop.cpp @@ -396,10 +396,12 @@ void ReplicationCoordinatorNoOp::resetLastOpTimesFromOplog(OperationContext*, Da MONGO_UNREACHABLE; } -bool ReplicationCoordinatorNoOp::shouldChangeSyncSource(const HostAndPort&, - const rpc::ReplSetMetadata&, - const rpc::OplogQueryMetadata&, - const OpTime&) { +ChangeSyncSourceAction ReplicationCoordinatorNoOp::shouldChangeSyncSource( + const HostAndPort&, + const rpc::ReplSetMetadata&, + const rpc::OplogQueryMetadata&, + const OpTime&, + const OpTime&) { MONGO_UNREACHABLE; } diff --git a/src/mongo/db/repl/replication_coordinator_noop.h b/src/mongo/db/repl/replication_coordinator_noop.h index a5c96eec3b0..cef32c185e2 100644 --- a/src/mongo/db/repl/replication_coordinator_noop.h +++ b/src/mongo/db/repl/replication_coordinator_noop.h @@ -212,10 +212,11 @@ public: void resetLastOpTimesFromOplog(OperationContext*, DataConsistency) final; - bool shouldChangeSyncSource(const HostAndPort&, - const rpc::ReplSetMetadata&, - const rpc::OplogQueryMetadata&, - const OpTime&) final; + ChangeSyncSourceAction shouldChangeSyncSource(const HostAndPort&, + const rpc::ReplSetMetadata&, + const rpc::OplogQueryMetadata&, + const OpTime&, + const OpTime&) final; OpTime getLastCommittedOpTime() const final; diff --git a/src/mongo/db/repl/sync_source_selector.h b/src/mongo/db/repl/sync_source_selector.h index 2bed6b70249..98646426da4 100644 --- a/src/mongo/db/repl/sync_source_selector.h +++ b/src/mongo/db/repl/sync_source_selector.h @@ -48,6 +48,12 @@ namespace repl { class OpTime; struct SyncSourceResolverResponse; +enum class ChangeSyncSourceAction { + kContinueSyncing, + kStopSyncingAndDropLastBatch, + kStopSyncingAndEnqueueLastBatch +}; + /** * Manage list of viable and blocked sync sources that we can replicate from. */ @@ -84,10 +90,11 @@ public: * * "now" is used to skip over currently blacklisted sync sources. */ - virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, - const rpc::ReplSetMetadata& replMetadata, - const rpc::OplogQueryMetadata& oqMetadata, - const OpTime& lastOpTimeFetched) = 0; + virtual ChangeSyncSourceAction shouldChangeSyncSource(const HostAndPort& currentSource, + const rpc::ReplSetMetadata& replMetadata, + const rpc::OplogQueryMetadata& oqMetadata, + const OpTime& previousOpTimeFetched, + const OpTime& lastOpTimeFetched) = 0; }; } // namespace repl diff --git a/src/mongo/db/repl/sync_source_selector_mock.cpp b/src/mongo/db/repl/sync_source_selector_mock.cpp index e77e5e3e29a..d684c957ef7 100644 --- a/src/mongo/db/repl/sync_source_selector_mock.cpp +++ b/src/mongo/db/repl/sync_source_selector_mock.cpp @@ -56,11 +56,13 @@ void SyncSourceSelectorMock::setChooseNewSyncSourceHook_forTest( _chooseNewSyncSourceHook = hook; } -bool SyncSourceSelectorMock::shouldChangeSyncSource(const HostAndPort&, - const rpc::ReplSetMetadata&, - const rpc::OplogQueryMetadata& oqMetadata, - const OpTime& lastOpTimeFetched) { - return false; +ChangeSyncSourceAction SyncSourceSelectorMock::shouldChangeSyncSource( + const HostAndPort&, + const rpc::ReplSetMetadata&, + const rpc::OplogQueryMetadata& oqMetadata, + const OpTime& previousOpTimeFetched, + const OpTime& lastOpTimeFetched) { + return ChangeSyncSourceAction::kContinueSyncing; } void SyncSourceSelectorMock::setChooseNewSyncSourceResult_forTest(const HostAndPort& syncSource) { diff --git a/src/mongo/db/repl/sync_source_selector_mock.h b/src/mongo/db/repl/sync_source_selector_mock.h index fd295cf4590..102a16452c8 100644 --- a/src/mongo/db/repl/sync_source_selector_mock.h +++ b/src/mongo/db/repl/sync_source_selector_mock.h @@ -53,10 +53,11 @@ public: void clearSyncSourceBlacklist() override; HostAndPort chooseNewSyncSource(const OpTime& ot) override; void blacklistSyncSource(const HostAndPort& host, Date_t until) override; - bool shouldChangeSyncSource(const HostAndPort&, - const rpc::ReplSetMetadata&, - const rpc::OplogQueryMetadata& oqMetadata, - const OpTime& lastOpTimeFetched) override; + ChangeSyncSourceAction shouldChangeSyncSource(const HostAndPort&, + const rpc::ReplSetMetadata&, + const rpc::OplogQueryMetadata& oqMetadata, + const OpTime& previousOpTimeFetched, + const OpTime& lastOpTimeFetched) override; /** * Sets a function that will be run every time chooseNewSyncSource() is called. diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp index 8146e848f0a..8bc7cf36a2c 100644 --- a/src/mongo/db/repl/topology_coordinator.cpp +++ b/src/mongo/db/repl/topology_coordinator.cpp @@ -3016,7 +3016,7 @@ bool TopologyCoordinator::shouldChangeSyncSource(const HostAndPort& currentSourc "Choosing new sync source. Our current sync source is not primary and does " "not have a sync source, so we require that it is ahead of us", "syncSource"_attr = currentSource, - "lastFetchedOpTime"_attr = lastOpTimeFetched, + "lastOpTimeFetched"_attr = lastOpTimeFetched, "syncSourceLatestOplogOpTime"_attr = currentSourceOpTime, "isPrimary"_attr = replMetadata.getIsPrimary()); return true; @@ -3069,7 +3069,7 @@ bool TopologyCoordinator::shouldChangeSyncSource(const HostAndPort& currentSourc bool TopologyCoordinator::shouldChangeSyncSourceDueToPingTime(const HostAndPort& currentSource, const MemberState& memberState, - const OpTime& lastOpTimeFetched, + const OpTime& previousOpTimeFetched, Date_t now, const ReadPreference readPreference) { // If we find an eligible sync source that is significantly closer than our current sync source, @@ -3146,8 +3146,11 @@ bool TopologyCoordinator::shouldChangeSyncSourceDueToPingTime(const HostAndPort& continue; } - if (_isEligibleSyncSource( - candidateIndex, now, lastOpTimeFetched, readPreference, true /* firstAttempt */)) { + if (_isEligibleSyncSource(candidateIndex, + now, + previousOpTimeFetched, + readPreference, + true /* firstAttempt */)) { LOGV2(4744901, "Choosing new sync source because we have found another potential sync " "source that is significantly closer than our current sync source", diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index b2675ebad17..3810c0e1010 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -277,7 +277,7 @@ public: */ bool shouldChangeSyncSourceDueToPingTime(const HostAndPort& currentSource, const MemberState& memberState, - const OpTime& lastOpTimeFetched, + const OpTime& previousOpTimeFetched, Date_t now, const ReadPreference readPreference); |