diff options
26 files changed, 427 insertions, 42 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index dc8ad23ed71..8217e38090a 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -114,7 +114,10 @@ public: const rpc::ReplSetMetadata& replMetadata, const rpc::OplogQueryMetadata& oqMetadata, const OpTime& previousOpTimeFetched, - const OpTime& lastOpTimeFetched) override; + const OpTime& lastOpTimeFetched) const final; + + ChangeSyncSourceAction shouldStopFetchingOnError(const HostAndPort& source, + const OpTime& lastOpTimeFetched) const final; private: BackgroundSync* _bgsync; @@ -132,7 +135,7 @@ ChangeSyncSourceAction DataReplicatorExternalStateBackgroundSync::shouldStopFetc const rpc::ReplSetMetadata& replMetadata, const rpc::OplogQueryMetadata& oqMetadata, const OpTime& previousOpTimeFetched, - const OpTime& lastOpTimeFetched) { + const OpTime& lastOpTimeFetched) const { if (_bgsync->shouldStopFetching()) { return ChangeSyncSourceAction::kStopSyncingAndEnqueueLastBatch; } @@ -141,6 +144,15 @@ ChangeSyncSourceAction DataReplicatorExternalStateBackgroundSync::shouldStopFetc source, replMetadata, oqMetadata, previousOpTimeFetched, lastOpTimeFetched); } +ChangeSyncSourceAction DataReplicatorExternalStateBackgroundSync::shouldStopFetchingOnError( + const HostAndPort& source, const OpTime& lastOpTimeFetched) const { + if (_bgsync->shouldStopFetching()) { + return ChangeSyncSourceAction::kStopSyncingAndDropLastBatchIfPresent; + } + + return DataReplicatorExternalStateImpl::shouldStopFetchingOnError(source, lastOpTimeFetched); +} + size_t getSize(const BSONObj& o) { // SERVER-9808 Avoid Fortify complaint about implicit signed->unsigned conversion return static_cast<size_t>(o.objsize()); diff --git a/src/mongo/db/repl/data_replicator_external_state.h b/src/mongo/db/repl/data_replicator_external_state.h index 5649195b807..87826b0f199 100644 --- a/src/mongo/db/repl/data_replicator_external_state.h +++ b/src/mongo/db/repl/data_replicator_external_state.h @@ -101,7 
+101,14 @@ public: const rpc::ReplSetMetadata& replMetadata, const rpc::OplogQueryMetadata& oqMetadata, const OpTime& previousOpTimeFetched, - const OpTime& lastOpTimeFetched) = 0; + const OpTime& lastOpTimeFetched) const = 0; + + /** + * Evaluates quality of sync source. This is intended to be called on error when no + * current metadata is available. + */ + virtual ChangeSyncSourceAction shouldStopFetchingOnError( + const HostAndPort& source, const OpTime& lastOpTimeFetched) const = 0; /** * This function creates an oplog buffer of the type specified at server startup. diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp index 2621917a862..00c924ff1ea 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp @@ -100,7 +100,7 @@ ChangeSyncSourceAction DataReplicatorExternalStateImpl::shouldStopFetching( const rpc::ReplSetMetadata& replMetadata, const rpc::OplogQueryMetadata& oqMetadata, const OpTime& previousOpTimeFetched, - const OpTime& lastOpTimeFetched) { + const OpTime& lastOpTimeFetched) const { // Re-evaluate quality of sync target. auto changeSyncSourceAction = _replicationCoordinator->shouldChangeSyncSource( source, replMetadata, oqMetadata, previousOpTimeFetched, lastOpTimeFetched); @@ -118,6 +118,18 @@ ChangeSyncSourceAction DataReplicatorExternalStateImpl::shouldStopFetching( return changeSyncSourceAction; } +ChangeSyncSourceAction DataReplicatorExternalStateImpl::shouldStopFetchingOnError( + const HostAndPort& source, const OpTime& lastOpTimeFetched) const { + auto changeSyncSourceAction = + _replicationCoordinator->shouldChangeSyncSourceOnError(source, lastOpTimeFetched); + if (changeSyncSourceAction != ChangeSyncSourceAction::kContinueSyncing) { + LOGV2(6341701, + "Canceling oplog query on fetch error. 
We have to choose a new sync source", + "syncSource"_attr = source); + } + return changeSyncSourceAction; +} + std::unique_ptr<OplogBuffer> DataReplicatorExternalStateImpl::makeInitialSyncOplogBuffer( OperationContext* opCtx) const { if (initialSyncOplogBuffer == kCollectionOplogBufferName) { diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.h b/src/mongo/db/repl/data_replicator_external_state_impl.h index 080483955b4..c408c484dc9 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.h +++ b/src/mongo/db/repl/data_replicator_external_state_impl.h @@ -59,7 +59,10 @@ public: const rpc::ReplSetMetadata& replMetadata, const rpc::OplogQueryMetadata& oqMetadata, const OpTime& previousOpTimeFetched, - const OpTime& lastOpTimeFetched) override; + const OpTime& lastOpTimeFetched) const override; + + ChangeSyncSourceAction shouldStopFetchingOnError( + const HostAndPort& source, const OpTime& lastOpTimeFetched) const override; std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer(OperationContext* opCtx) const override; diff --git a/src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp b/src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp index f26f2a1dd0a..abb8f3bebd8 100644 --- a/src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp @@ -45,7 +45,7 @@ ChangeSyncSourceAction DataReplicatorExternalStateInitialSync::shouldStopFetchin const rpc::ReplSetMetadata&, const rpc::OplogQueryMetadata&, const OpTime& previousOpTimeFetched, - const OpTime& lastOpTimeFetched) { + const OpTime& lastOpTimeFetched) const { // Since initial sync does not allow for sync source changes, it should not check if there are // better sync sources. 
If there is a problem on the sync source, it will manifest itself in the @@ -54,5 +54,11 @@ ChangeSyncSourceAction DataReplicatorExternalStateInitialSync::shouldStopFetchin return ChangeSyncSourceAction::kContinueSyncing; } +ChangeSyncSourceAction DataReplicatorExternalStateInitialSync::shouldStopFetchingOnError( + const HostAndPort&, const OpTime& lastOpTimeFetched) const { + + return ChangeSyncSourceAction::kContinueSyncing; +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/data_replicator_external_state_initial_sync.h b/src/mongo/db/repl/data_replicator_external_state_initial_sync.h index c3c8195928c..55316e3da16 100644 --- a/src/mongo/db/repl/data_replicator_external_state_initial_sync.h +++ b/src/mongo/db/repl/data_replicator_external_state_initial_sync.h @@ -48,7 +48,10 @@ public: const rpc::ReplSetMetadata& replMetadata, const rpc::OplogQueryMetadata& oqMetadata, const OpTime& previousOpTimeFetched, - const OpTime& lastOpTimeFetched) override; + const OpTime& lastOpTimeFetched) const override; + + ChangeSyncSourceAction shouldStopFetchingOnError( + const HostAndPort& source, const OpTime& lastOpTimeFetched) const override; }; } // namespace repl diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp index dddc2e4f9ba..ddcfc701ca6 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp @@ -98,13 +98,19 @@ ChangeSyncSourceAction DataReplicatorExternalStateMock::shouldStopFetching( const rpc::ReplSetMetadata& replMetadata, const rpc::OplogQueryMetadata& oqMetadata, const OpTime& previousOpTimeFetched, - const OpTime& lastOpTimeFetched) { + const OpTime& lastOpTimeFetched) const { lastSyncSourceChecked = source; syncSourceLastOpTime = oqMetadata.getLastOpApplied(); syncSourceHasSyncSource = oqMetadata.getSyncSourceIndex() != -1; return shouldStopFetchingResult; } 
+ChangeSyncSourceAction DataReplicatorExternalStateMock::shouldStopFetchingOnError( + const HostAndPort& source, const OpTime& lastOpTimeFetched) const { + lastSyncSourceChecked = source; + return shouldStopFetchingResult; +} + std::unique_ptr<OplogBuffer> DataReplicatorExternalStateMock::makeInitialSyncOplogBuffer( OperationContext* opCtx) const { return std::make_unique<OplogBufferBlockingQueue>(); diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.h b/src/mongo/db/repl/data_replicator_external_state_mock.h index 789e6a63ed3..535ee513102 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.h +++ b/src/mongo/db/repl/data_replicator_external_state_mock.h @@ -56,7 +56,10 @@ public: const rpc::ReplSetMetadata& replMetadata, const rpc::OplogQueryMetadata& oqMetadata, const OpTime& previousOpTimeFetched, - const OpTime& lastOpTimeFetched) override; + const OpTime& lastOpTimeFetched) const override; + + ChangeSyncSourceAction shouldStopFetchingOnError( + const HostAndPort& source, const OpTime& lastOpTimeFetched) const override; std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer(OperationContext* opCtx) const override; @@ -89,9 +92,9 @@ public: bool metadataWasProcessed = false; // Set by shouldStopFetching. - HostAndPort lastSyncSourceChecked; - OpTime syncSourceLastOpTime; - bool syncSourceHasSyncSource = false; + mutable HostAndPort lastSyncSourceChecked; + mutable OpTime syncSourceLastOpTime; + mutable bool syncSourceHasSyncSource = false; // Returned by shouldStopFetching. 
ChangeSyncSourceAction shouldStopFetchingResult = ChangeSyncSourceAction::kContinueSyncing; diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp index 9420e749159..7956564d23a 100644 --- a/src/mongo/db/repl/initial_syncer_test.cpp +++ b/src/mongo/db/repl/initial_syncer_test.cpp @@ -178,11 +178,16 @@ public: const rpc::ReplSetMetadata& replMetadata, const rpc::OplogQueryMetadata& oqMetadata, const OpTime& previousOpTimeFetched, - const OpTime& lastOpTimeFetched) override { + const OpTime& lastOpTimeFetched) const override { return _syncSourceSelector->shouldChangeSyncSource( currentSource, replMetadata, oqMetadata, previousOpTimeFetched, lastOpTimeFetched); } + ChangeSyncSourceAction shouldChangeSyncSourceOnError( + const HostAndPort& currentSource, const OpTime& lastOpTimeFetched) const override { + return _syncSourceSelector->shouldChangeSyncSourceOnError(currentSource, lastOpTimeFetched); + } + void scheduleNetworkResponse(std::string cmdName, const BSONObj& obj) { NetworkInterfaceMock* net = getNet(); if (!net->hasReadyRequests()) { diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index e05f286fb05..816b5f19fa6 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -400,11 +400,17 @@ void OplogFetcher::_runQuery(const executor::TaskExecutor::CallbackArgs& callbac auto batchResult = _getNextBatch(); if (!batchResult.isOK()) { auto brStatus = batchResult.getStatus(); + // Determine if we should stop syncing from our current sync source. If we're going + // to change sync sources anyway, do it immediately rather than checking if we can + // retry the error. + const bool stopFetching = _dataReplicatorExternalState->shouldStopFetchingOnError( + _config.source, _getLastOpTimeFetched()) != + ChangeSyncSourceAction::kContinueSyncing; // Recreate a cursor if we have enough retries left. 
// If we are a TenantOplogFetcher, we never retry as we will always restart the // TenantMigrationRecipient state machine on failure. So instead, we just fail and exit. - if (_oplogFetcherRestartDecision->shouldContinue(this, brStatus) && + if (!stopFetching && _oplogFetcherRestartDecision->shouldContinue(this, brStatus) && !_config.forTenantMigration) { hangBeforeOplogFetcherRetries.pauseWhileSet(); _cursor.reset(); @@ -492,6 +498,9 @@ Status OplogFetcher::_connect() { } }(); } while (!connectStatus.isOK() && + _dataReplicatorExternalState->shouldStopFetchingOnError(_config.source, + _getLastOpTimeFetched()) == + ChangeSyncSourceAction::kContinueSyncing && _oplogFetcherRestartDecision->shouldContinue(this, connectStatus)); return connectStatus; @@ -888,7 +897,7 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) { errMsg << " previous batch last fetched optime: " << previousOpTimeFetched.toString(); errMsg << " current batch last fetched optime: " << lastDocOpTime.toString(); - if (changeSyncSourceAction == ChangeSyncSourceAction::kStopSyncingAndDropLastBatch) { + if (changeSyncSourceAction == ChangeSyncSourceAction::kStopSyncingAndDropLastBatchIfPresent) { return Status(ErrorCodes::InvalidSyncSource, errMsg); } diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp index eca60c5d2a3..96003452793 100644 --- a/src/mongo/db/repl/oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/oplog_fetcher_test.cpp @@ -1236,6 +1236,48 @@ TEST_F(OplogFetcherTest, DontRecreateNewCursorAfterFailedBatchNoRetries) { ASSERT_EQUALS(ErrorCodes::NetworkTimeout, shutdownState.getStatus()); } +TEST_F(OplogFetcherTest, DontRecreateNewCursorAfterFailedBatchWhenSyncSourceChangeIsExpected) { + ShutdownState shutdownState; + + // Create an oplog fetcher with one retry. 
+ auto oplogFetcher = getOplogFetcherAfterConnectionCreated(std::ref(shutdownState), 1); + + CursorId cursorId = 22LL; + auto firstEntry = makeNoopOplogEntry(lastFetched); + auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); + auto metadataObj = makeOplogBatchMetadata(replSetMetadata, oqMetadata); + auto firstBatch = {firstEntry, secondEntry}; + + // Update lastFetched before it is updated by getting the next batch. + lastFetched = oplogFetcher->getLastOpTimeFetched_forTest(); + + // Creating the cursor will succeed. + auto m = processSingleRequestResponse(oplogFetcher->getDBClientConnection_forTest(), + makeFirstBatch(cursorId, firstBatch, metadataObj), + true); + + validateFindCommand( + m, lastFetched, durationCount<Milliseconds>(oplogFetcher->getInitialFindMaxTime_forTest())); + + // Check that the first batch was successfully processed. + validateLastBatch( + true /* skipFirstDoc */, firstBatch, oplogFetcher->getLastOpTimeFetched_forTest()); + + // Mock a result that tells us to stop syncing. + dataReplicatorExternalState->shouldStopFetchingResult = + ChangeSyncSourceAction::kStopSyncingAndDropLastBatchIfPresent; + + // This will cause the oplog fetcher to fail while getting the next batch. Since we're expecting + // a sync source change, the oplog fetcher will shut down. 
+ processSingleRequestResponse( + oplogFetcher->getDBClientConnection_forTest(), + mongo::Status{mongo::ErrorCodes::NetworkTimeout, "Fake socket timeout"}); + + oplogFetcher->join(); + + ASSERT_EQUALS(ErrorCodes::NetworkTimeout, shutdownState.getStatus()); +} + TEST_F(OplogFetcherTest, FailCreateNewCursorAfterFailedBatchRetriesShutsDownOplogFetcher) { ShutdownState shutdownState; @@ -2012,10 +2054,10 @@ TEST_F(OplogFetcherTest, TEST_F(OplogFetcherTest, FailedSyncSourceCheckReturnsStopSyncingAndDropBatch) { testSyncSourceChecking( - replSetMetadata, oqMetadata, ChangeSyncSourceAction::kStopSyncingAndDropLastBatch); + replSetMetadata, oqMetadata, ChangeSyncSourceAction::kStopSyncingAndDropLastBatchIfPresent); - // If the 'shouldStopFetching' check returns kStopSyncingAndDropLastBatch, we should not enqueue - // any documents. + // If the 'shouldStopFetching' check returns kStopSyncingAndDropLastBatchIfPresent, we should + // not enqueue any documents. ASSERT_TRUE(lastEnqueuedDocuments.empty()); } @@ -2199,14 +2241,50 @@ TEST_F(OplogFetcherTest, OplogFetcherRetriesConnectionButFails) { // Shutdown the mock remote server before the OplogFetcher tries to connect. _mockServer->shutdown(); + startCapturingLogMessages(); // Create an OplogFetcher with 1 retry attempt. This will also ensure that _runQuery was // scheduled before returning. auto oplogFetcher = getOplogFetcherAfterConnectionCreated(std::ref(shutdownState), 1); oplogFetcher->join(); + stopCapturingLogMessages(); // This is the error code for connection failures. 
ASSERT_EQUALS(ErrorCodes::HostUnreachable, shutdownState.getStatus()); + + // We should see one retry + ASSERT_EQUALS(1, countBSONFormatLogLinesIsSubset(BSON("id" << 21274))); + + // We should see one failure to retry + ASSERT_EQUALS(1, countBSONFormatLogLinesIsSubset(BSON("id" << 21275))); +} + +TEST_F(OplogFetcherTest, OplogFetcherDoesNotRetryConnectionWhenSyncSourceChangeIsExpected) { + // Test that OplogFetcher does not retry after failing the initial connection if we've gotten + // a better sync source in the meantime. + ShutdownState shutdownState; + + // Mock a result that tells us to stop syncing. + dataReplicatorExternalState->shouldStopFetchingResult = + ChangeSyncSourceAction::kStopSyncingAndDropLastBatchIfPresent; + + // Shutdown the mock remote server before the OplogFetcher tries to connect. + _mockServer->shutdown(); + + startCapturingLogMessages(); + // Create an OplogFetcher with 1 retry attempt. This will also ensure that _runQuery was + // scheduled before returning. + auto oplogFetcher = getOplogFetcherAfterConnectionCreated(std::ref(shutdownState), 1); + + oplogFetcher->join(); + stopCapturingLogMessages(); + + // This is the error code for connection failures. + ASSERT_EQUALS(ErrorCodes::HostUnreachable, shutdownState.getStatus()); + + // We should not see retry attempts.
+ ASSERT_EQUALS(0, countBSONFormatLogLinesIsSubset(BSON("id" << 21274))); + ASSERT_EQUALS(0, countBSONFormatLogLinesIsSubset(BSON("id" << 21275))); } TEST_F(OplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownBeforeReconnect) { diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index a17f698b8ab..da5fbd375e9 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -5066,7 +5066,7 @@ bool ReplicationCoordinatorImpl::isReplEnabled() const { return getReplicationMode() != modeNone; } -const ReadPreference ReplicationCoordinatorImpl::_getSyncSourceReadPreference(WithLock) { +const ReadPreference ReplicationCoordinatorImpl::_getSyncSourceReadPreference(WithLock) const { // Always allow chaining while in catchup and drain mode. auto memberState = _getMemberState_inlock(); ReadPreference readPreference = ReadPreference::Nearest; @@ -5176,7 +5176,7 @@ ChangeSyncSourceAction ReplicationCoordinatorImpl::shouldChangeSyncSource( const rpc::ReplSetMetadata& replMetadata, const rpc::OplogQueryMetadata& oqMetadata, const OpTime& previousOpTimeFetched, - const OpTime& lastOpTimeFetched) { + const OpTime& lastOpTimeFetched) const { stdx::lock_guard<Latch> lock(_mutex); const auto now = _replExecutor->now(); @@ -5191,7 +5191,25 @@ ChangeSyncSourceAction ReplicationCoordinatorImpl::shouldChangeSyncSource( // We should drop the last batch if we find a significantly closer node. This is to // avoid advancing our 'lastFetched', which makes it more likely that we will be able to // choose the closer node as our sync source. 
- return ChangeSyncSourceAction::kStopSyncingAndDropLastBatch; + return ChangeSyncSourceAction::kStopSyncingAndDropLastBatchIfPresent; + } + + return ChangeSyncSourceAction::kContinueSyncing; +} + +ChangeSyncSourceAction ReplicationCoordinatorImpl::shouldChangeSyncSourceOnError( + const HostAndPort& currentSource, const OpTime& lastOpTimeFetched) const { + stdx::lock_guard<Latch> lock(_mutex); + const auto now = _replExecutor->now(); + + if (_topCoord->shouldChangeSyncSourceOnError(currentSource, lastOpTimeFetched, now)) { + return ChangeSyncSourceAction::kStopSyncingAndDropLastBatchIfPresent; + } + + const auto readPreference = _getSyncSourceReadPreference(lock); + if (_topCoord->shouldChangeSyncSourceDueToPingTime( + currentSource, _getMemberState_inlock(), lastOpTimeFetched, now, readPreference)) { + return ChangeSyncSourceAction::kStopSyncingAndDropLastBatchIfPresent; } return ChangeSyncSourceAction::kContinueSyncing; diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index e213e4d18d2..8affe603f30 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -316,11 +316,15 @@ public: virtual void resetLastOpTimesFromOplog(OperationContext* opCtx) override; - virtual ChangeSyncSourceAction shouldChangeSyncSource(const HostAndPort& currentSource, - const rpc::ReplSetMetadata& replMetadata, - const rpc::OplogQueryMetadata& oqMetadata, - const OpTime& previousOpTimeFetched, - const OpTime& lastOpTimeFetched) override; + virtual ChangeSyncSourceAction shouldChangeSyncSource( + const HostAndPort& currentSource, + const rpc::ReplSetMetadata& replMetadata, + const rpc::OplogQueryMetadata& oqMetadata, + const OpTime& previousOpTimeFetched, + const OpTime& lastOpTimeFetched) const override; + + virtual ChangeSyncSourceAction shouldChangeSyncSourceOnError( + const HostAndPort& currentSource, const OpTime& lastOpTimeFetched) const override; virtual 
OpTime getLastCommittedOpTime() const override; virtual OpTimeAndWallTime getLastCommittedOpTimeAndWallTime() const override; @@ -1543,7 +1547,7 @@ private: /* * Calculates and returns the read preference for the node. */ - const ReadPreference _getSyncSourceReadPreference(WithLock); + const ReadPreference _getSyncSourceReadPreference(WithLock) const; /* * Performs the replica set reconfig procedure. Certain consensus safety checks are omitted when diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index cc5b6a58388..0dfcd6e66e1 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -588,7 +588,12 @@ ChangeSyncSourceAction ReplicationCoordinatorMock::shouldChangeSyncSource( const rpc::ReplSetMetadata& replMetadata, const rpc::OplogQueryMetadata& oqMetadata, const OpTime& previousOpTimeFetched, - const OpTime& lastOpTimeFetched) { + const OpTime& lastOpTimeFetched) const { + MONGO_UNREACHABLE; +} + +ChangeSyncSourceAction ReplicationCoordinatorMock::shouldChangeSyncSourceOnError( + const HostAndPort& currentSource, const OpTime& lastOpTimeFetched) const { MONGO_UNREACHABLE; } diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 9e69b080931..a03227ce33e 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -287,7 +287,10 @@ public: const rpc::ReplSetMetadata& replMetadata, const rpc::OplogQueryMetadata& oqMetadata, const OpTime& previousOpTimeFetched, - const OpTime& lastOpTimeFetched); + const OpTime& lastOpTimeFetched) const; + + virtual ChangeSyncSourceAction shouldChangeSyncSourceOnError( + const HostAndPort& currentSource, const OpTime& lastOpTimeFetched) const; virtual OpTime getLastCommittedOpTime() const; diff --git a/src/mongo/db/repl/replication_coordinator_noop.cpp 
b/src/mongo/db/repl/replication_coordinator_noop.cpp index e2d0994e89d..9ca8ffe1d9d 100644 --- a/src/mongo/db/repl/replication_coordinator_noop.cpp +++ b/src/mongo/db/repl/replication_coordinator_noop.cpp @@ -459,7 +459,12 @@ ChangeSyncSourceAction ReplicationCoordinatorNoOp::shouldChangeSyncSource( const rpc::ReplSetMetadata&, const rpc::OplogQueryMetadata&, const OpTime&, - const OpTime&) { + const OpTime&) const { + MONGO_UNREACHABLE; +} + +ChangeSyncSourceAction ReplicationCoordinatorNoOp::shouldChangeSyncSourceOnError( + const HostAndPort&, const OpTime&) const { MONGO_UNREACHABLE; } diff --git a/src/mongo/db/repl/replication_coordinator_noop.h b/src/mongo/db/repl/replication_coordinator_noop.h index c355ce73e37..635ac26d371 100644 --- a/src/mongo/db/repl/replication_coordinator_noop.h +++ b/src/mongo/db/repl/replication_coordinator_noop.h @@ -246,7 +246,10 @@ public: const rpc::ReplSetMetadata&, const rpc::OplogQueryMetadata&, const OpTime&, - const OpTime&) final; + const OpTime&) const final; + + ChangeSyncSourceAction shouldChangeSyncSourceOnError(const HostAndPort&, + const OpTime&) const final; OpTime getLastCommittedOpTime() const final; diff --git a/src/mongo/db/repl/sync_source_selector.h b/src/mongo/db/repl/sync_source_selector.h index bf96d335dfe..10590d6d5be 100644 --- a/src/mongo/db/repl/sync_source_selector.h +++ b/src/mongo/db/repl/sync_source_selector.h @@ -50,7 +50,7 @@ struct SyncSourceResolverResponse; enum class ChangeSyncSourceAction { kContinueSyncing, - kStopSyncingAndDropLastBatch, + kStopSyncingAndDropLastBatchIfPresent, kStopSyncingAndEnqueueLastBatch }; @@ -90,11 +90,20 @@ public: * * "now" is used to skip over currently denylisted sync sources. 
*/ - virtual ChangeSyncSourceAction shouldChangeSyncSource(const HostAndPort& currentSource, - const rpc::ReplSetMetadata& replMetadata, - const rpc::OplogQueryMetadata& oqMetadata, - const OpTime& previousOpTimeFetched, - const OpTime& lastOpTimeFetched) = 0; + virtual ChangeSyncSourceAction shouldChangeSyncSource( + const HostAndPort& currentSource, + const rpc::ReplSetMetadata& replMetadata, + const rpc::OplogQueryMetadata& oqMetadata, + const OpTime& previousOpTimeFetched, + const OpTime& lastOpTimeFetched) const = 0; + + /* + * Determines if a new sync source should be chosen when an error occurs during fetching, + * without attempting retries on the same sync source. + * Because metadata is not available, checks are a subset of those in shouldChangeSyncSource. + */ + virtual ChangeSyncSourceAction shouldChangeSyncSourceOnError( + const HostAndPort& currentSource, const OpTime& lastOpTimeFetched) const = 0; }; } // namespace repl diff --git a/src/mongo/db/repl/sync_source_selector_mock.cpp b/src/mongo/db/repl/sync_source_selector_mock.cpp index 51e0b5a2c0f..2368064e4e8 100644 --- a/src/mongo/db/repl/sync_source_selector_mock.cpp +++ b/src/mongo/db/repl/sync_source_selector_mock.cpp @@ -61,7 +61,12 @@ ChangeSyncSourceAction SyncSourceSelectorMock::shouldChangeSyncSource( const rpc::ReplSetMetadata&, const rpc::OplogQueryMetadata& oqMetadata, const OpTime& previousOpTimeFetched, - const OpTime& lastOpTimeFetched) { + const OpTime& lastOpTimeFetched) const { + return ChangeSyncSourceAction::kContinueSyncing; +} + +ChangeSyncSourceAction SyncSourceSelectorMock::shouldChangeSyncSourceOnError( + const HostAndPort&, const OpTime& lastOpTimeFetched) const { return ChangeSyncSourceAction::kContinueSyncing; } diff --git a/src/mongo/db/repl/sync_source_selector_mock.h b/src/mongo/db/repl/sync_source_selector_mock.h index dde3ccaa457..8253831dd59 100644 --- a/src/mongo/db/repl/sync_source_selector_mock.h +++ b/src/mongo/db/repl/sync_source_selector_mock.h @@ -57,7 +57,10
@@ public: const rpc::ReplSetMetadata&, const rpc::OplogQueryMetadata& oqMetadata, const OpTime& previousOpTimeFetched, - const OpTime& lastOpTimeFetched) override; + const OpTime& lastOpTimeFetched) const override; + + ChangeSyncSourceAction shouldChangeSyncSourceOnError( + const HostAndPort&, const OpTime& lastOpTimeFetched) const override; /** * Sets a function that will be run every time chooseNewSyncSource() is called. diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index 4d983b57979..6cf88435d70 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -219,7 +219,13 @@ public: const rpc::ReplSetMetadata& replMetadata, const rpc::OplogQueryMetadata& oqMetadata, const OpTime& previousOpTimeFetched, - const OpTime& lastOpTimeFetched) final { + const OpTime& lastOpTimeFetched) const final { + return ChangeSyncSourceAction::kContinueSyncing; + } + + // Tenant migration does not re-evaluate sync source on error. + ChangeSyncSourceAction shouldStopFetchingOnError(const HostAndPort& source, + const OpTime& lastOpTimeFetched) const final { return ChangeSyncSourceAction::kContinueSyncing; } diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp index e873f0dd6c0..47a043e093e 100644 --- a/src/mongo/db/repl/topology_coordinator.cpp +++ b/src/mongo/db/repl/topology_coordinator.cpp @@ -3078,6 +3078,32 @@ bool TopologyCoordinator::shouldChangeSyncSource(const HostAndPort& currentSourc return false; } +bool TopologyCoordinator::shouldChangeSyncSourceOnError(const HostAndPort& currentSource, + const OpTime& lastOpTimeFetched, + Date_t now) const { + // We change sync source on error if + // 1) A forced sync source change has been requested. + // 2) Chaining is disabled and a new primary has been detected. + // 3) A more eligible node exists. 
Note this covers the case where our current sync source is + // down. + + auto [initialDecision, currentSourceIndex] = + _shouldChangeSyncSourceInitialChecks(currentSource); + if (initialDecision != ChangeSyncSourceDecision::kMaybe) { + return initialDecision == ChangeSyncSourceDecision::kYes; + } + + if (_shouldChangeSyncSourceDueToNewPrimary(currentSource, currentSourceIndex)) { + return true; + } + + if (_shouldChangeSyncSourceDueToBetterEligibleSource( + currentSource, currentSourceIndex, lastOpTimeFetched, now)) + return true; + + return false; +} + std::pair<TopologyCoordinator::ChangeSyncSourceDecision, int> TopologyCoordinator::_shouldChangeSyncSourceInitialChecks(const HostAndPort& currentSource) const { if (_selfIndex == -1) { diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index 0bf275af1cc..3d3084ddf76 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -272,6 +272,14 @@ public: Date_t now) const; /** + * Determines if a new sync source should be chosen when an error occurs. In this case + * we do not have current metadata from the sync source and so can only do a subset of + * the checks we do when we get a response. + */ + bool shouldChangeSyncSourceOnError(const HostAndPort& currentSource, + const OpTime& lastOpTimeFetched, + Date_t now) const; + /** * Returns true if we find an eligible sync source that is significantly closer than our current * sync source. 
*/ diff --git a/src/mongo/db/repl/topology_coordinator_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_v1_test.cpp index 924fbf01768..3898035b08c 100644 --- a/src/mongo/db/repl/topology_coordinator_v1_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_v1_test.cpp @@ -4101,6 +4101,30 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceWhenSyncSourceIsDown) { ASSERT_EQUALS(1, countLogLinesWithId(5929000)); } +TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceOnErrorWhenSyncSourceIsDown) { + // In this test, the TopologyCoordinator should tell us to change sync sources away from + // "host2" and to "host3" since "host2" is down. + OpTime election = OpTime(); + OpTime oldSyncSourceOpTime = OpTime(Timestamp(4, 0), 0); + // ahead by less than maxSyncSourceLagSecs (30) + OpTime freshOpTime = OpTime(Timestamp(5, 0), 0); + setSelfMemberState(MemberState::RS_SECONDARY); + + HeartbeatResponseAction nextAction = receiveDownHeartbeat(HostAndPort("host2"), "rs0"); + ASSERT_NO_ACTION(nextAction.getAction()); + nextAction = receiveUpHeartbeat( + HostAndPort("host3"), "rs0", MemberState::RS_SECONDARY, election, freshOpTime); + ASSERT_NO_ACTION(nextAction.getAction()); + + // set up complete, time for actual check + startCapturingLogMessages(); + ASSERT_TRUE(getTopoCoord().shouldChangeSyncSourceOnError( + HostAndPort("host2"), oldSyncSourceOpTime, now())); + stopCapturingLogMessages(); + ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source")); + ASSERT_EQUALS(1, countLogLinesWithId(5929000)); +} + TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceFromStalePrimary) { // In this test, the TopologyCoordinator should still sync to the primary, "host2", although // "host3" is fresher. 
@@ -4153,6 +4177,19 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceWhenMemberNotInConfig) { ASSERT_EQUALS(1, countLogLinesWithId(21831)); } +TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceOnErrorWhenMemberNotInConfig) { + // In this test, the TopologyCoordinator should tell us to change sync sources away from + // "host4" since "host4" is absent from the config of version 10. + ReplSetMetadata replMetadata(0, {OpTime(), Date_t()}, OpTime(), 10, 0, OID(), -1, false); + + startCapturingLogMessages(); + ASSERT_TRUE( + getTopoCoord().shouldChangeSyncSourceOnError(HostAndPort("host4"), OpTime(), now())); + stopCapturingLogMessages(); + ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source")); + ASSERT_EQUALS(1, countLogLinesWithId(21831)); +} + TEST_F(HeartbeatResponseTestV1, ShouldntChangeSyncSourceWhenNotSyncingFromPrimaryAndChainingDisabledButNoNewPrimary) { // In this test, the TopologyCoordinator should not tell us to change sync sources away from @@ -4232,6 +4269,80 @@ TEST_F(HeartbeatResponseTestV1, } TEST_F(HeartbeatResponseTestV1, + ShouldntChangeSyncSourceOnErrorWhenNotSyncingFromPrimaryAndChainingDisabledButNoNewPrimary) { + // In this test, the TopologyCoordinator should not tell us to change sync sources away from + // "host2" since we are not aware of who the new primary is. 
+ + setSelfMemberState(MemberState::RS_SECONDARY); + updateConfig(BSON("_id" + << "rs0" + << "version" << 5 << "term" << 1 << "members" + << BSON_ARRAY(BSON("_id" << 0 << "host" + << "host1:27017") + << BSON("_id" << 1 << "host" + << "host2:27017") + << BSON("_id" << 2 << "host" + << "host3:27017")) + << "protocolVersion" << 1 << "settings" + << BSON("heartbeatTimeoutSecs" << 5 << "chainingAllowed" << false)), + 0); + + OpTime staleOpTime = OpTime(Timestamp(4, 0), 0); + OpTime freshOpTime = OpTime(Timestamp(5, 0), 0); + + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSourceOnError( + HostAndPort("host2"), + staleOpTime, // lastOpTimeFetched so that we are behind host2 + now())); +} + +TEST_F(HeartbeatResponseTestV1, + ShouldChangeSyncSourceOnErrorWhenNotSyncingFromPrimaryChainingDisabledAndFoundNewPrimary) { + // In this test, the TopologyCoordinator should tell us to change sync sources away from + // "host2" since "host3" is the new primary and chaining is disabled. + + setSelfMemberState(MemberState::RS_SECONDARY); + updateConfig(BSON("_id" + << "rs0" + << "version" << 5 << "term" << 1 << "members" + << BSON_ARRAY(BSON("_id" << 0 << "host" + << "host1:27017") + << BSON("_id" << 1 << "host" + << "host2:27017") + << BSON("_id" << 2 << "host" + << "host3:27017")) + << "protocolVersion" << 1 << "settings" + << BSON("heartbeatTimeoutSecs" << 5 << "chainingAllowed" << false)), + 0); + + OpTime oldElection = OpTime(Timestamp(1, 1), 1); + OpTime curElection = OpTime(Timestamp(4, 2), 2); + OpTime staleOpTime = OpTime(Timestamp(4, 1), 1); + OpTime freshOpTime = OpTime(Timestamp(5, 1), 2); + + // Old host should still be up; this is a stale heartbeat. + HeartbeatResponseAction nextAction = receiveUpHeartbeat( + HostAndPort("host2"), "rs0", MemberState::RS_PRIMARY, oldElection, staleOpTime); + ASSERT_NO_ACTION(nextAction.getAction()); + + getTopoCoord().updateTerm(2, now()); + + // Set that host3 is the new primary. 
+ nextAction = receiveUpHeartbeat( + HostAndPort("host3"), "rs0", MemberState::RS_PRIMARY, curElection, freshOpTime); + ASSERT_NO_ACTION(nextAction.getAction()); + + startCapturingLogMessages(); + ASSERT(getTopoCoord().shouldChangeSyncSourceOnError( + HostAndPort("host2"), + staleOpTime, // lastOpTimeFetched so that we are behind host2 and host3 + now())); + stopCapturingLogMessages(); + ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source")); + ASSERT_EQUALS(1, countLogLinesWithId(3962100)); +} + +TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenNotSyncingFromPrimaryChainingDisabledAndTwoPrimaries) { // In this test, the TopologyCoordinator should not tell us to change sync sources away from // "host2" since, though "host3" is the new primary, "host2" also thinks it is primary. @@ -4304,6 +4415,33 @@ TEST_F(HeartbeatResponseTestV1, ShouldntChangeSyncSourceWhenChainingDisabledAndW now())); } +TEST_F(HeartbeatResponseTestV1, + ShouldntChangeSyncSourceOnErrorWhenChainingDisabledAndWeArePrimary) { + updateConfig(BSON("_id" + << "rs0" + << "version" << 5 << "term" << 1 << "members" + << BSON_ARRAY(BSON("_id" << 0 << "host" + << "host1:27017") + << BSON("_id" << 1 << "host" + << "host2:27017") + << BSON("_id" << 2 << "host" + << "host3:27017")) + << "protocolVersion" << 1 << "settings" + << BSON("heartbeatTimeoutSecs" << 5 << "chainingAllowed" << false)), + 0); + + OpTime staleOpTime = OpTime(Timestamp(4, 0), 0); + OpTime freshOpTime = OpTime(Timestamp(5, 0), 0); + + // Set that we are primary. 
+ makeSelfPrimary(); + + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSourceOnError( + HostAndPort("host2"), + staleOpTime, // lastOpTimeFetched so that we are behind host2 + now())); +} + class ReevalSyncSourceTest : public TopoCoordTest { public: virtual void setUp() { diff --git a/src/mongo/embedded/replication_coordinator_embedded.cpp b/src/mongo/embedded/replication_coordinator_embedded.cpp index b178e9d1972..53e95a0739c 100644 --- a/src/mongo/embedded/replication_coordinator_embedded.cpp +++ b/src/mongo/embedded/replication_coordinator_embedded.cpp @@ -485,7 +485,12 @@ ChangeSyncSourceAction ReplicationCoordinatorEmbedded::shouldChangeSyncSource( const rpc::ReplSetMetadata&, const rpc::OplogQueryMetadata&, const OpTime&, - const OpTime&) { + const OpTime&) const { + UASSERT_NOT_IMPLEMENTED; +} + +ChangeSyncSourceAction ReplicationCoordinatorEmbedded::shouldChangeSyncSourceOnError( + const HostAndPort&, const OpTime&) const { UASSERT_NOT_IMPLEMENTED; } diff --git a/src/mongo/embedded/replication_coordinator_embedded.h b/src/mongo/embedded/replication_coordinator_embedded.h index 9a6bad99f28..b6c5bfe1c62 100644 --- a/src/mongo/embedded/replication_coordinator_embedded.h +++ b/src/mongo/embedded/replication_coordinator_embedded.h @@ -255,7 +255,10 @@ public: const rpc::ReplSetMetadata&, const rpc::OplogQueryMetadata&, const repl::OpTime&, - const repl::OpTime&) override; + const repl::OpTime&) const override; + + repl::ChangeSyncSourceAction shouldChangeSyncSourceOnError(const HostAndPort&, + const repl::OpTime&) const override; repl::OpTime getLastCommittedOpTime() const override; |