diff options
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/initial_syncer.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.cpp | 94 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.h | 138 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher_mock.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher_mock.h | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher_test.cpp | 22 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service.cpp | 25 |
8 files changed, 153 insertions, 191 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index f3d8332a441..f11d19ffdd5 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -529,18 +529,17 @@ void BackgroundSync::_produce() { _replicationCoordinatorExternalState->getOplogFetcherSteadyStateMaxFetcherRestarts(); auto oplogFetcherPtr = std::make_unique<OplogFetcher>( _replicationCoordinatorExternalState->getTaskExecutor(), - lastOpTimeFetched, - source, - _replCoord->getConfig(), std::make_unique<OplogFetcher::OplogFetcherRestartDecisionDefault>(numRestarts), - syncSourceResp.rbid, - true /* requireFresherSyncSource */, &dataReplicatorExternalState, [this](const auto& a1, const auto& a2, const auto& a3) { return this->_enqueueDocuments(a1, a2, a3); }, onOplogFetcherShutdownCallbackFn, - bgSyncOplogFetcherBatchSize); + OplogFetcher::Config(lastOpTimeFetched, + source, + _replCoord->getConfig(), + syncSourceResp.rbid, + bgSyncOplogFetcherBatchSize)); stdx::lock_guard<Latch> lock(_mutex); if (_state != ProducerState::Running) { return; diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp index daa709848f7..0928e30a1f3 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -1177,15 +1177,18 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse> } const auto& config = configResult.getValue(); - _oplogFetcher = (*_createOplogFetcherFn)( - *_attemptExec, + OplogFetcher::Config oplogFetcherConfig( beginFetchingOpTime, _syncSource, config, + _rollbackChecker->getBaseRBID(), + initialSyncOplogFetcherBatchSize, + OplogFetcher::RequireFresherSyncSource::kDontRequireFresherSyncSource); + oplogFetcherConfig.startingPoint = OplogFetcher::StartingPoint::kEnqueueFirstDoc; + _oplogFetcher = (*_createOplogFetcherFn)( + *_attemptExec, std::make_unique<OplogFetcherRestartDecisionInitialSyncer>( _sharedData.get(), _opts.oplogFetcherMaxFetcherRestarts), - _rollbackChecker->getBaseRBID(), - false /* requireFresherSyncSource */, _dataReplicatorExternalState.get(), [=](OplogFetcher::Documents::const_iterator first, OplogFetcher::Documents::const_iterator last, @@ -1193,8 +1196,7 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse> return _enqueueDocuments(first, last, info); }, [=](const Status& s, int rbid) { _oplogFetcherCallback(s, onCompletionGuard); }, - initialSyncOplogFetcherBatchSize, - OplogFetcher::StartingPoint::kEnqueueFirstDoc); + std::move(oplogFetcherConfig)); LOGV2_DEBUG(21178, 2, diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index 153bb71cb9c..3eb5a13583a 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -176,39 +176,23 @@ StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments( } OplogFetcher::OplogFetcher(executor::TaskExecutor* executor, - OpTime lastFetched, - HostAndPort source, - ReplSetConfig config, std::unique_ptr<OplogFetcherRestartDecision> oplogFetcherRestartDecision, - int requiredRBID, - bool requireFresherSyncSource, DataReplicatorExternalState* dataReplicatorExternalState, EnqueueDocumentsFn enqueueDocumentsFn, OnShutdownCallbackFn onShutdownCallbackFn, - const int batchSize, - StartingPoint startingPoint, - BSONObj filter, - ReadConcernArgs readConcern, - bool requestResumeToken, - StringData name) - : AbstractAsyncComponent(executor, name.toString()), - _source(source), - _requiredRBID(requiredRBID), + Config config) + : AbstractAsyncComponent(executor, config.name), + _receivedRBID(config.requiredRBID), _oplogFetcherRestartDecision(std::move(oplogFetcherRestartDecision)), _onShutdownCallbackFn(onShutdownCallbackFn), - _lastFetched(lastFetched), + _lastFetched(config.initialLastFetched), _createClientFn( [] { return std::make_unique<DBClientConnection>(true /* autoReconnect */); }), - _requireFresherSyncSource(requireFresherSyncSource), _dataReplicatorExternalState(dataReplicatorExternalState), _enqueueDocumentsFn(enqueueDocumentsFn), - _awaitDataTimeout(calculateAwaitDataTimeout(config)), - _batchSize(batchSize), - _startingPoint(startingPoint), - _queryFilter(filter), - _queryReadConcern(readConcern), - _requestResumeToken(requestResumeToken) { - invariant(config.isInitialized()); + _awaitDataTimeout(calculateAwaitDataTimeout(config.replSetConfig)), + _config(std::move(config)) { + invariant(_config.replSetConfig.isInitialized()); invariant(!_lastFetched.isNull()); invariant(onShutdownCallbackFn); invariant(enqueueDocumentsFn); @@ -257,7 +241,7 @@ std::string OplogFetcher::toString() { str::stream output; output << "OplogFetcher -"; output << " last optime fetched: " << _lastFetched.toString(); - output << " source: " << _source.toString(); + output << " source: " << _config.source.toString(); output << " namespace: " << _nss.toString(); output << " active: " << _isActive_inlock(); output << " shutting down?:" << _isShuttingDown_inlock(); @@ -328,7 +312,7 @@ void OplogFetcher::_finishCallback(Status status) { str::stream() << "Got error: \"" << status.toString() << "\" while oplog fetcher is shutting down"); } - _onShutdownCallbackFn(status, _requiredRBID); + _onShutdownCallbackFn(status, _receivedRBID); decltype(_onShutdownCallbackFn) onShutdownCallbackFn; decltype(_oplogFetcherRestartDecision) oplogFetcherRestartDecision; @@ -452,12 +436,12 @@ Status OplogFetcher::_connect() { "error"_attr = connectStatus); _conn->checkConnection(); } else { - uassertStatusOK(_conn->connect(_source, "OplogFetcher")); + uassertStatusOK(_conn->connect(_config.source, "OplogFetcher")); } uassertStatusOK(replAuthenticate(_conn.get()) .withContext(str::stream() << "OplogFetcher failed to authenticate to " - << _source)); + << _config.source)); // Reset any state needed to track restarts on successful connection. _oplogFetcherRestartDecision->fetchSuccessful(this); return Status::OK(); @@ -505,14 +489,15 @@ BSONObj OplogFetcher::_makeFindQuery(long long findTimeout) const { BSONObjBuilder filterBob(queryBob.subobjStart("query")); filterBob.append("ts", BSON("$gte" << lastOpTimeFetched.getTimestamp())); // Handle caller-provided filter. - if (!_queryFilter.isEmpty()) { + if (!_config.queryFilter.isEmpty()) { filterBob.append( - "$or", BSON_ARRAY(_queryFilter << BSON("ts" << lastOpTimeFetched.getTimestamp()))); + "$or", + BSON_ARRAY(_config.queryFilter << BSON("ts" << lastOpTimeFetched.getTimestamp()))); } filterBob.done(); queryBob.append("$maxTimeMS", findTimeout); - if (_requestResumeToken) { + if (_config.requestResumeToken) { queryBob.append("$hint", BSON("$natural" << 1)); queryBob.append("$_requestResumeToken", true); } @@ -524,7 +509,7 @@ BSONObj OplogFetcher::_makeFindQuery(long long findTimeout) const { queryBob.append("term", term); } - if (_queryReadConcern.isEmpty()) { + if (_config.queryReadConcern.isEmpty()) { // This ensures that the sync source waits for all earlier oplog writes to be visible. // Since Timestamp(0, 0) isn't allowed, Timestamp(0, 1) is the minimal we can use. queryBob.append("readConcern", @@ -533,7 +518,7 @@ BSONObj OplogFetcher::_makeFindQuery(long long findTimeout) const { << "afterClusterTime" << Timestamp(0, 1))); } else { // Caller-provided read concern. - queryBob.appendElements(_queryReadConcern.toBSON()); + queryBob.appendElements(_config.queryReadConcern.toBSON()); } return queryBob.obj(); @@ -556,7 +541,7 @@ void OplogFetcher::_createNewCursor(bool initialFind) { nullptr /* fieldsToReturn */, QueryOption_CursorTailable | QueryOption_AwaitData | (oplogFetcherUsesExhaust ? QueryOption_Exhaust : 0), - _batchSize); + _config.batchSize); _firstBatch = true; @@ -576,8 +561,8 @@ StatusWith<OplogFetcher::Documents> OplogFetcher::_getNextBatch() { if (!_cursor->init()) { _cursor.reset(); return {ErrorCodes::InvalidSyncSource, - str::stream() - << "Oplog fetcher could not create cursor on source: " << _source}; + str::stream() << "Oplog fetcher could not create cursor on source: " + << _config.source}; } // This will also set maxTimeMS on the generated getMore command. @@ -591,8 +576,8 @@ StatusWith<OplogFetcher::Documents> OplogFetcher::_getNextBatch() { // Due to a bug in DBClientCursor, it actually uses batchSize 2 if the given batchSize // is 1 for the find command. So if the given batchSize is 1, we need to set it // explicitly for getMores. - if (_batchSize == 1) { - _cursor->setBatchSize(_batchSize); + if (_config.batchSize == 1) { + _cursor->setBatchSize(_config.batchSize); } } else { auto lastCommittedWithCurrentTerm = @@ -690,7 +675,7 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) { "invalid oplog query metadata from sync source {syncSource}: " "{error}: {metadata}", "Invalid oplog query metadata from sync source", - "syncSource"_attr = _source, + "syncSource"_attr = _config.source, "error"_attr = oqMetadataResult.getStatus(), "metadata"_attr = _metadataObj); return oqMetadataResult.getStatus(); @@ -709,7 +694,7 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) { 1, "oplog fetcher successfully fetched from {syncSource}", "Oplog fetcher successfully fetched from sync source", - "syncSource"_attr = _source); + "syncSource"_attr = _config.source); // We do not always enqueue the first document. We elect to skip it for the following // reasons: @@ -724,13 +709,13 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) { // happens on the first batch when we always accept a document with the previous // fetched timestamp. - if (_startingPoint == StartingPoint::kSkipFirstDoc) { + if (_config.startingPoint == StartingPoint::kSkipFirstDoc) { firstDocToApply++; - } else if (!_queryFilter.isEmpty()) { + } else if (!_config.queryFilter.isEmpty()) { auto opCtx = cc().makeOperationContext(); auto expCtx = make_intrusive<ExpressionContext>(opCtx.get(), nullptr /* collator */, _nss); - Matcher m(_queryFilter, expCtx); + Matcher m(_config.queryFilter, expCtx); if (!m.matches(*firstDocToApply)) firstDocToApply++; } @@ -740,7 +725,7 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) { auto previousOpTimeFetched = _getLastOpTimeFetched(); auto validateResult = OplogFetcher::validateDocuments( - documents, _firstBatch, previousOpTimeFetched.getTimestamp(), _startingPoint); + documents, _firstBatch, previousOpTimeFetched.getTimestamp(), _config.startingPoint); if (!validateResult.isOK()) { return validateResult.getStatus(); } @@ -757,7 +742,7 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) { "invalid replication metadata from sync source {syncSource}: " "{error}: {metadata}", "Invalid replication metadata from sync source", - "syncSource"_attr = _source, + "syncSource"_attr = _config.source, "error"_attr = metadataResult.getStatus(), "metadata"_attr = _metadataObj); return metadataResult.getStatus(); @@ -766,9 +751,9 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) { // Determine if we should stop syncing from our current sync source. auto changeSyncSourceAction = _dataReplicatorExternalState->shouldStopFetching( - _source, replSetMetadata, oqMetadata, previousOpTimeFetched, lastDocOpTime); + _config.source, replSetMetadata, oqMetadata, previousOpTimeFetched, lastDocOpTime); str::stream errMsg; - errMsg << "sync source " << _source.toString(); + errMsg << "sync source " << _config.source.toString(); errMsg << " (config version: " << replSetMetadata.getConfigVersion(); errMsg << "; last applied optime: " << oqMetadata.getLastOpApplied().toString(); errMsg << "; sync source index: " << oqMetadata.getSyncSourceIndex(); @@ -815,7 +800,7 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) { // Start skipping the first doc after at least one doc has been enqueued in the lifetime // of this fetcher. - _startingPoint = StartingPoint::kSkipFirstDoc; + _config.startingPoint = StartingPoint::kSkipFirstDoc; // We have now processed the batch. We should only move forward our view of _lastFetched if the // batch was not empty. @@ -843,15 +828,15 @@ Status OplogFetcher::_checkRemoteOplogStart(const OplogFetcher::Documents& docum // upstream node hasn't rolled back since that could cause it to not have our required minValid // point. The cursor will be killed if the upstream node rolls back so we don't need to keep // checking once the cursor is established. If we do not use rollback-via-refetch, this check is - // not necessary, and _requiredRBID will be set to kUninitializedRollbackId in that case. - if (_requiredRBID != ReplicationProcess::kUninitializedRollbackId && - remoteRBID != _requiredRBID) { + // not necessary, and _config.requiredRBID will be set to kUninitializedRollbackId in that case. + if (_config.requiredRBID != ReplicationProcess::kUninitializedRollbackId && + remoteRBID != _config.requiredRBID) { return Status(ErrorCodes::InvalidSyncSource, "Upstream node rolled back after choosing it as a sync source. Choosing " "new sync source."); } - // Set _requiredRBID to remoteRBID so that it can be returned when the oplog fetcher shuts down. - _requiredRBID = remoteRBID; + // Set _receivedRBID to remoteRBID so that it can be returned when the oplog fetcher shuts down. + _receivedRBID = remoteRBID; // Sometimes our remoteLastOpApplied may be stale; if we received a document with an // opTime later than remoteLastApplied, we can assume the remote is at least up to that @@ -867,7 +852,7 @@ Status OplogFetcher::_checkRemoteOplogStart(const OplogFetcher::Documents& docum // The sync source could be behind us if it rolled back after we selected it. We could have // failed to detect the rollback if it occurred between sync source selection (when we check the - // candidate is ahead of us) and sync source resolution (when we got '_requiredRBID'). If the + // candidate is ahead of us) and sync source resolution (when we got '_receivedRBID'). If the // sync source is now behind us, choose a new sync source to prevent going into rollback. if (remoteLastOpApplied < lastFetched) { return Status(ErrorCodes::InvalidSyncSource, @@ -885,7 +870,8 @@ Status OplogFetcher::_checkRemoteOplogStart(const OplogFetcher::Documents& docum // problematic to check this condition for initial sync, since the 'lastFetched' OpTime will // almost always equal the 'remoteLastApplied', since we fetch the sync source's last applied // OpTime to determine where to start our OplogFetcher. - if (_requireFresherSyncSource && remoteLastOpApplied <= lastFetched) { + if (_config.requireFresherSyncSource == RequireFresherSyncSource::kRequireFresherSyncSource && + remoteLastOpApplied <= lastFetched) { return Status(ErrorCodes::InvalidSyncSource, "Sync source must be ahead of me. My last fetched oplog optime: {}, latest " "oplog optime of sync source: {}"_format(lastFetched.toString(), diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h index e7fbe964f5d..d7eaeec7651 100644 --- a/src/mongo/db/repl/oplog_fetcher.h +++ b/src/mongo/db/repl/oplog_fetcher.h @@ -164,25 +164,74 @@ public: const std::size_t _maxRestarts; }; + enum class RequireFresherSyncSource { + kDontRequireFresherSyncSource, + kRequireFresherSyncSource + }; + + struct Config { + Config(OpTime initialLastFetchedIn, + HostAndPort sourceIn, + ReplSetConfig replSetConfigIn, + int requiredRBIDIn, + int batchSizeIn, + RequireFresherSyncSource requireFresherSyncSourceIn = + RequireFresherSyncSource::kRequireFresherSyncSource) + : initialLastFetched(initialLastFetchedIn), + source(sourceIn), + replSetConfig(replSetConfigIn), + requiredRBID(requiredRBIDIn), + batchSize(batchSizeIn), + requireFresherSyncSource(requireFresherSyncSourceIn) {} + // The OpTime, last oplog entry fetched in a previous run, or the optime to start fetching + // from, depending on the startingPoint (below.). If the startingPoint is kSkipFirstDoc, + // this entry will be verified to exist, then discarded. If it is kEnqueueFirstDoc, it will + // be sent to the enqueue function with the first batch. + OpTime initialLastFetched; + + // Sync source to read from. + HostAndPort source; + + ReplSetConfig replSetConfig; + + // Rollback ID that the sync source is required to have after the first batch. If the value + // is uninitialized, the oplog fetcher has not contacted the sync source yet. + int requiredRBID; + + int batchSize; + + // A flag indicating whether we should error if the sync source is not ahead of our initial + // last fetched OpTime on the first batch. Most of the time this should be set to + // kRequireFresherSyncSource, but there are certain special cases where it's acceptable for + // our sync source to have no ops newer than _lastFetched. + RequireFresherSyncSource requireFresherSyncSource; + + // Predicate with additional filtering to be done on oplog entries. + BSONObj queryFilter = BSONObj(); + + // Read concern to use for reading the oplog. Empty read concern means we use a default + // of "afterClusterTime: Timestamp(0,1)". + ReadConcernArgs queryReadConcern = ReadConcernArgs(); + + // Indicates if we want to skip the first document during oplog fetching or not. + StartingPoint startingPoint = StartingPoint::kSkipFirstDoc; + + // Specifies if the oplog fetcher should request a resume token and provide it to + // _enqueueDocumentsFn. + bool requestResumeToken = false; + + std::string name = "oplog fetcher"; + }; + /** * Invariants if validation fails on any of the provided arguments. */ OplogFetcher(executor::TaskExecutor* executor, - OpTime lastFetched, - HostAndPort source, - ReplSetConfig config, std::unique_ptr<OplogFetcherRestartDecision> oplogFetcherRestartDecision, - int requiredRBID, - bool requireFresherSyncSource, DataReplicatorExternalState* dataReplicatorExternalState, EnqueueDocumentsFn enqueueDocumentsFn, OnShutdownCallbackFn onShutdownCallbackFn, - const int batchSize, - StartingPoint startingPoint = StartingPoint::kSkipFirstDoc, - BSONObj filter = BSONObj(), - ReadConcernArgs readConcern = ReadConcernArgs(), - bool requestResumeToken = false, - StringData name = "oplog fetcher"_sd); + Config config); virtual ~OplogFetcher(); @@ -384,15 +433,12 @@ private: // Protects member data of this OplogFetcher. mutable Mutex _mutex = MONGO_MAKE_LATCH("OplogFetcher::_mutex"); - // Sync source to read from. - const HostAndPort _source; - // Namespace of the oplog to read. const NamespaceString _nss = NamespaceString::kRsOplogNamespace; - // Rollback ID that the sync source is required to have after the first batch. If the value is - // uninitialized, the oplog fetcher has not contacted the sync source yet. - int _requiredRBID; + // Rollback ID that the sync source had after the first batch. Initialized from + // the requiredRBID in the OplogFetcher::Config and passed to the onShutdown callback. + int _receivedRBID; // Indicates whether the current batch is the first received via this cursor. bool _firstBatch = true; @@ -427,30 +473,10 @@ private: // shouldContinue function, a new cursor will be created or the oplog fetcher will shut down. std::unique_ptr<DBClientCursor> _cursor; - // A boolean indicating whether we should error if the sync source is not ahead of our initial - // last fetched OpTime on the first batch. Most of the time this should be set to true, - // but there are certain special cases, namely during initial sync, where it's acceptable for - // our sync source to have no ops newer than _lastFetched. - bool _requireFresherSyncSource; - DataReplicatorExternalState* const _dataReplicatorExternalState; const EnqueueDocumentsFn _enqueueDocumentsFn; const Milliseconds _awaitDataTimeout; - const int _batchSize; - - // Indicates if we want to skip the first document during oplog fetching or not. - StartingPoint _startingPoint; - - // Predicate with additional filtering to be done on oplog entries. - BSONObj _queryFilter; - - // Read concern to use for reading the oplog. Empty read concern means we use a default - // of "afterClusterTime: Timestamp(0,1)". - ReadConcernArgs _queryReadConcern; - - // Specifies if the oplog fetcher should request a resume token and provide it to - // _enqueueDocumentsFn. - const bool _requestResumeToken; + Config _config; // Handle to currently scheduled _runQuery task. executor::TaskExecutor::CallbackHandle _runQueryHandle; @@ -463,21 +489,11 @@ public: virtual ~OplogFetcherFactory() = default; virtual std::unique_ptr<OplogFetcher> operator()( executor::TaskExecutor* executor, - OpTime lastFetched, - HostAndPort source, - ReplSetConfig config, std::unique_ptr<OplogFetcher::OplogFetcherRestartDecision> oplogFetcherRestartDecision, - int requiredRBID, - bool requireFresherSyncSource, DataReplicatorExternalState* dataReplicatorExternalState, OplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn, OplogFetcher::OnShutdownCallbackFn onShutdownCallbackFn, - const int batchSize, - OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc, - BSONObj filter = BSONObj(), - ReadConcernArgs readConcern = ReadConcernArgs(), - bool requestResumeToken = false, - StringData name = "oplog fetcher"_sd) const = 0; + OplogFetcher::Config config) const = 0; }; template <class T> @@ -485,37 +501,17 @@ class OplogFetcherFactoryImpl : public OplogFetcherFactory { public: std::unique_ptr<OplogFetcher> operator()( executor::TaskExecutor* executor, - OpTime lastFetched, - HostAndPort source, - ReplSetConfig config, std::unique_ptr<OplogFetcher::OplogFetcherRestartDecision> oplogFetcherRestartDecision, - int requiredRBID, - bool requireFresherSyncSource, DataReplicatorExternalState* dataReplicatorExternalState, OplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn, OplogFetcher::OnShutdownCallbackFn onShutdownCallbackFn, - const int batchSize, - OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc, - BSONObj filter = BSONObj(), - ReadConcernArgs readConcern = ReadConcernArgs(), - bool requestResumeToken = false, - StringData name = "oplog_fetcher"_sd) const final { + OplogFetcher::Config config) const final { return std::make_unique<T>(executor, - lastFetched, - source, - config, std::move(oplogFetcherRestartDecision), - requiredRBID, - requireFresherSyncSource, dataReplicatorExternalState, std::move(enqueueDocumentsFn), std::move(onShutdownCallbackFn), - batchSize, - startingPoint, - std::move(filter), - std::move(readConcern), - requestResumeToken, - name); + std::move(config)); } static std::unique_ptr<OplogFetcherFactory> get() { diff --git a/src/mongo/db/repl/oplog_fetcher_mock.cpp b/src/mongo/db/repl/oplog_fetcher_mock.cpp index 79ac007a896..afa02f75931 100644 --- a/src/mongo/db/repl/oplog_fetcher_mock.cpp +++ b/src/mongo/db/repl/oplog_fetcher_mock.cpp @@ -38,45 +38,25 @@ namespace mongo { namespace repl { OplogFetcherMock::OplogFetcherMock( executor::TaskExecutor* executor, - OpTime lastFetched, - HostAndPort source, - ReplSetConfig config, std::unique_ptr<OplogFetcherRestartDecision> oplogFetcherRestartDecision, - int requiredRBID, - bool requireFresherSyncSource, DataReplicatorExternalState* dataReplicatorExternalState, EnqueueDocumentsFn enqueueDocumentsFn, OnShutdownCallbackFn onShutdownCallbackFn, - const int batchSize, - StartingPoint startingPoint, - BSONObj filter, - ReadConcernArgs readConcern, - bool requestResumeToken, - StringData name) + Config config) : OplogFetcher(executor, - lastFetched, - std::move(source), - std::move(config), // Pass a dummy OplogFetcherRestartDecision to the base OplogFetcher. std::make_unique<OplogFetcherRestartDecisionDefault>(0), - requiredRBID, - requireFresherSyncSource, dataReplicatorExternalState, // Pass a dummy EnqueueDocumentsFn to the base OplogFetcher. [](const auto& a1, const auto& a2, const auto& a3) { return Status::OK(); }, // Pass a dummy OnShutdownCallbackFn to the base OplogFetcher. [](const auto& a, const int b) {}, - batchSize, - startingPoint, - filter, - readConcern, - requestResumeToken, - name), + config), _oplogFetcherRestartDecision(std::move(oplogFetcherRestartDecision)), _onShutdownCallbackFn(std::move(onShutdownCallbackFn)), _enqueueDocumentsFn(std::move(enqueueDocumentsFn)), - _startingPoint(startingPoint), - _lastFetched(lastFetched) {} + _startingPoint(config.startingPoint), + _lastFetched(config.initialLastFetched) {} OplogFetcherMock::~OplogFetcherMock() { shutdown(); diff --git a/src/mongo/db/repl/oplog_fetcher_mock.h b/src/mongo/db/repl/oplog_fetcher_mock.h index 24a9a4f997f..80dd2bbf742 100644 --- a/src/mongo/db/repl/oplog_fetcher_mock.h +++ b/src/mongo/db/repl/oplog_fetcher_mock.h @@ -38,21 +38,11 @@ class OplogFetcherMock : public OplogFetcher { public: explicit OplogFetcherMock( executor::TaskExecutor* executor, - OpTime lastFetched, - HostAndPort source, - ReplSetConfig config, std::unique_ptr<OplogFetcherRestartDecision> oplogFetcherRestartDecision, - int requiredRBID, - bool requireFresherSyncSource, DataReplicatorExternalState* dataReplicatorExternalState, EnqueueDocumentsFn enqueueDocumentsFn, OnShutdownCallbackFn onShutdownCallbackFn, - const int batchSize, - StartingPoint startingPoint = StartingPoint::kSkipFirstDoc, - BSONObj filter = BSONObj(), - ReadConcernArgs readConcern = ReadConcernArgs(), - bool requestResumeToken = false, - StringData name = "oplog fetcher"_sd); + Config config); virtual ~OplogFetcherMock(); diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp index 33159b26e43..e4a43936c08 100644 --- a/src/mongo/db/repl/oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/oplog_fetcher_test.cpp @@ -458,22 +458,26 @@ std::unique_ptr<OplogFetcher> OplogFetcherTest::makeOplogFetcherWithDifferentExe BSONObj filter, ReadConcernArgs readConcern, bool requestResumeToken) { - auto oplogFetcher = std::make_unique<OplogFetcher>( - executor, + OplogFetcher::Config oplogFetcherConfig( lastFetched, source, _createConfig(), - std::make_unique<OplogFetcher::OplogFetcherRestartDecisionDefault>(numRestarts), requiredRBID, - requireFresherSyncSource, + defaultBatchSize, + requireFresherSyncSource + ? OplogFetcher::RequireFresherSyncSource::kRequireFresherSyncSource + : OplogFetcher::RequireFresherSyncSource::kDontRequireFresherSyncSource); + oplogFetcherConfig.startingPoint = startingPoint; + oplogFetcherConfig.queryFilter = filter; + oplogFetcherConfig.queryReadConcern = readConcern; + oplogFetcherConfig.requestResumeToken = requestResumeToken; + auto oplogFetcher = std::make_unique<OplogFetcher>( + executor, + std::make_unique<OplogFetcher::OplogFetcherRestartDecisionDefault>(numRestarts), dataReplicatorExternalState.get(), enqueueDocumentsFn, fn, - defaultBatchSize, - startingPoint, - filter, - readConcern, - requestResumeToken); + std::move(oplogFetcherConfig)); oplogFetcher->setCreateClientFn_forTest([this]() { const auto autoReconnect = true; return std::unique_ptr<DBClientConnection>( diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index 30309958a6a..93bb1bc2c90 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -514,18 +514,28 @@ void TenantMigrationRecipientService::Instance::_startOplogFetcher() { StorageInterface::get(opCtx.get()), oplogBufferNs, options); _donorOplogBuffer->startup(opCtx.get()); _dataReplicatorExternalState = std::make_unique<DataReplicatorExternalStateTenantMigration>(); - _donorOplogFetcher = (*_createOplogFetcherFn)( - (**_scopedExecutor).get(), + OplogFetcher::Config oplogFetcherConfig( *_stateDoc.getStartFetchingDonorOpTime(), _oplogFetcherClient->getServerHostAndPort(), // The config is only used for setting the awaitData timeout; the defaults are fine. ReplSetConfig::parse(BSON("_id" << "dummy" << "version" << 1 << "members" << BSONObj())), - std::make_unique<OplogFetcherRestartDecisionTenantMigration>(), // We do not need to check the rollback ID. ReplicationProcess::kUninitializedRollbackId, - false /* requireFresherSyncSource */, + tenantMigrationOplogFetcherBatchSize, + OplogFetcher::RequireFresherSyncSource::kDontRequireFresherSyncSource); + oplogFetcherConfig.queryFilter = _getOplogFetcherFilter(); + oplogFetcherConfig.queryReadConcern = + ReadConcernArgs(repl::ReadConcernLevel::kMajorityReadConcern); + oplogFetcherConfig.requestResumeToken = true; + oplogFetcherConfig.name = + "TenantOplogFetcher_" + getTenantId() + "_" + getMigrationUUID().toString(); + oplogFetcherConfig.startingPoint = OplogFetcher::StartingPoint::kEnqueueFirstDoc; + + _donorOplogFetcher = (*_createOplogFetcherFn)( + (**_scopedExecutor).get(), + std::make_unique<OplogFetcherRestartDecisionTenantMigration>(), _dataReplicatorExternalState.get(), [this, self = shared_from_this()](OplogFetcher::Documents::const_iterator first, OplogFetcher::Documents::const_iterator last, @@ -533,12 +543,7 @@ void TenantMigrationRecipientService::Instance::_startOplogFetcher() { return _enqueueDocuments(first, last, info); }, [this, self = shared_from_this()](const Status& s, int rbid) { _oplogFetcherCallback(s); }, - tenantMigrationOplogFetcherBatchSize, - OplogFetcher::StartingPoint::kEnqueueFirstDoc, - _getOplogFetcherFilter(), - ReadConcernArgs(repl::ReadConcernLevel::kMajorityReadConcern), - true /* requestResumeToken */, - "TenantOplogFetcher_" + getTenantId() + "_" + getMigrationUUID().toString()); + std::move(oplogFetcherConfig)); _donorOplogFetcher->setConnection(std::move(_oplogFetcherClient)); uassertStatusOK(_donorOplogFetcher->startup()); } |