summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/repl/bgsync.cpp11
-rw-r--r--src/mongo/db/repl/initial_syncer.cpp14
-rw-r--r--src/mongo/db/repl/oplog_fetcher.cpp94
-rw-r--r--src/mongo/db/repl/oplog_fetcher.h138
-rw-r--r--src/mongo/db/repl/oplog_fetcher_mock.cpp28
-rw-r--r--src/mongo/db/repl/oplog_fetcher_mock.h12
-rw-r--r--src/mongo/db/repl/oplog_fetcher_test.cpp22
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service.cpp25
8 files changed, 153 insertions, 191 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index f3d8332a441..f11d19ffdd5 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -529,18 +529,17 @@ void BackgroundSync::_produce() {
_replicationCoordinatorExternalState->getOplogFetcherSteadyStateMaxFetcherRestarts();
auto oplogFetcherPtr = std::make_unique<OplogFetcher>(
_replicationCoordinatorExternalState->getTaskExecutor(),
- lastOpTimeFetched,
- source,
- _replCoord->getConfig(),
std::make_unique<OplogFetcher::OplogFetcherRestartDecisionDefault>(numRestarts),
- syncSourceResp.rbid,
- true /* requireFresherSyncSource */,
&dataReplicatorExternalState,
[this](const auto& a1, const auto& a2, const auto& a3) {
return this->_enqueueDocuments(a1, a2, a3);
},
onOplogFetcherShutdownCallbackFn,
- bgSyncOplogFetcherBatchSize);
+ OplogFetcher::Config(lastOpTimeFetched,
+ source,
+ _replCoord->getConfig(),
+ syncSourceResp.rbid,
+ bgSyncOplogFetcherBatchSize));
stdx::lock_guard<Latch> lock(_mutex);
if (_state != ProducerState::Running) {
return;
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index daa709848f7..0928e30a1f3 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -1177,15 +1177,18 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>
}
const auto& config = configResult.getValue();
- _oplogFetcher = (*_createOplogFetcherFn)(
- *_attemptExec,
+ OplogFetcher::Config oplogFetcherConfig(
beginFetchingOpTime,
_syncSource,
config,
+ _rollbackChecker->getBaseRBID(),
+ initialSyncOplogFetcherBatchSize,
+ OplogFetcher::RequireFresherSyncSource::kDontRequireFresherSyncSource);
+ oplogFetcherConfig.startingPoint = OplogFetcher::StartingPoint::kEnqueueFirstDoc;
+ _oplogFetcher = (*_createOplogFetcherFn)(
+ *_attemptExec,
std::make_unique<OplogFetcherRestartDecisionInitialSyncer>(
_sharedData.get(), _opts.oplogFetcherMaxFetcherRestarts),
- _rollbackChecker->getBaseRBID(),
- false /* requireFresherSyncSource */,
_dataReplicatorExternalState.get(),
[=](OplogFetcher::Documents::const_iterator first,
OplogFetcher::Documents::const_iterator last,
@@ -1193,8 +1196,7 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>
return _enqueueDocuments(first, last, info);
},
[=](const Status& s, int rbid) { _oplogFetcherCallback(s, onCompletionGuard); },
- initialSyncOplogFetcherBatchSize,
- OplogFetcher::StartingPoint::kEnqueueFirstDoc);
+ std::move(oplogFetcherConfig));
LOGV2_DEBUG(21178,
2,
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index 153bb71cb9c..3eb5a13583a 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -176,39 +176,23 @@ StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments(
}
OplogFetcher::OplogFetcher(executor::TaskExecutor* executor,
- OpTime lastFetched,
- HostAndPort source,
- ReplSetConfig config,
std::unique_ptr<OplogFetcherRestartDecision> oplogFetcherRestartDecision,
- int requiredRBID,
- bool requireFresherSyncSource,
DataReplicatorExternalState* dataReplicatorExternalState,
EnqueueDocumentsFn enqueueDocumentsFn,
OnShutdownCallbackFn onShutdownCallbackFn,
- const int batchSize,
- StartingPoint startingPoint,
- BSONObj filter,
- ReadConcernArgs readConcern,
- bool requestResumeToken,
- StringData name)
- : AbstractAsyncComponent(executor, name.toString()),
- _source(source),
- _requiredRBID(requiredRBID),
+ Config config)
+ : AbstractAsyncComponent(executor, config.name),
+ _receivedRBID(config.requiredRBID),
_oplogFetcherRestartDecision(std::move(oplogFetcherRestartDecision)),
_onShutdownCallbackFn(onShutdownCallbackFn),
- _lastFetched(lastFetched),
+ _lastFetched(config.initialLastFetched),
_createClientFn(
[] { return std::make_unique<DBClientConnection>(true /* autoReconnect */); }),
- _requireFresherSyncSource(requireFresherSyncSource),
_dataReplicatorExternalState(dataReplicatorExternalState),
_enqueueDocumentsFn(enqueueDocumentsFn),
- _awaitDataTimeout(calculateAwaitDataTimeout(config)),
- _batchSize(batchSize),
- _startingPoint(startingPoint),
- _queryFilter(filter),
- _queryReadConcern(readConcern),
- _requestResumeToken(requestResumeToken) {
- invariant(config.isInitialized());
+ _awaitDataTimeout(calculateAwaitDataTimeout(config.replSetConfig)),
+ _config(std::move(config)) {
+ invariant(_config.replSetConfig.isInitialized());
invariant(!_lastFetched.isNull());
invariant(onShutdownCallbackFn);
invariant(enqueueDocumentsFn);
@@ -257,7 +241,7 @@ std::string OplogFetcher::toString() {
str::stream output;
output << "OplogFetcher -";
output << " last optime fetched: " << _lastFetched.toString();
- output << " source: " << _source.toString();
+ output << " source: " << _config.source.toString();
output << " namespace: " << _nss.toString();
output << " active: " << _isActive_inlock();
output << " shutting down?:" << _isShuttingDown_inlock();
@@ -328,7 +312,7 @@ void OplogFetcher::_finishCallback(Status status) {
str::stream() << "Got error: \"" << status.toString()
<< "\" while oplog fetcher is shutting down");
}
- _onShutdownCallbackFn(status, _requiredRBID);
+ _onShutdownCallbackFn(status, _receivedRBID);
decltype(_onShutdownCallbackFn) onShutdownCallbackFn;
decltype(_oplogFetcherRestartDecision) oplogFetcherRestartDecision;
@@ -452,12 +436,12 @@ Status OplogFetcher::_connect() {
"error"_attr = connectStatus);
_conn->checkConnection();
} else {
- uassertStatusOK(_conn->connect(_source, "OplogFetcher"));
+ uassertStatusOK(_conn->connect(_config.source, "OplogFetcher"));
}
uassertStatusOK(replAuthenticate(_conn.get())
.withContext(str::stream()
<< "OplogFetcher failed to authenticate to "
- << _source));
+ << _config.source));
// Reset any state needed to track restarts on successful connection.
_oplogFetcherRestartDecision->fetchSuccessful(this);
return Status::OK();
@@ -505,14 +489,15 @@ BSONObj OplogFetcher::_makeFindQuery(long long findTimeout) const {
BSONObjBuilder filterBob(queryBob.subobjStart("query"));
filterBob.append("ts", BSON("$gte" << lastOpTimeFetched.getTimestamp()));
// Handle caller-provided filter.
- if (!_queryFilter.isEmpty()) {
+ if (!_config.queryFilter.isEmpty()) {
filterBob.append(
- "$or", BSON_ARRAY(_queryFilter << BSON("ts" << lastOpTimeFetched.getTimestamp())));
+ "$or",
+ BSON_ARRAY(_config.queryFilter << BSON("ts" << lastOpTimeFetched.getTimestamp())));
}
filterBob.done();
queryBob.append("$maxTimeMS", findTimeout);
- if (_requestResumeToken) {
+ if (_config.requestResumeToken) {
queryBob.append("$hint", BSON("$natural" << 1));
queryBob.append("$_requestResumeToken", true);
}
@@ -524,7 +509,7 @@ BSONObj OplogFetcher::_makeFindQuery(long long findTimeout) const {
queryBob.append("term", term);
}
- if (_queryReadConcern.isEmpty()) {
+ if (_config.queryReadConcern.isEmpty()) {
// This ensures that the sync source waits for all earlier oplog writes to be visible.
// Since Timestamp(0, 0) isn't allowed, Timestamp(0, 1) is the minimal we can use.
queryBob.append("readConcern",
@@ -533,7 +518,7 @@ BSONObj OplogFetcher::_makeFindQuery(long long findTimeout) const {
<< "afterClusterTime" << Timestamp(0, 1)));
} else {
// Caller-provided read concern.
- queryBob.appendElements(_queryReadConcern.toBSON());
+ queryBob.appendElements(_config.queryReadConcern.toBSON());
}
return queryBob.obj();
@@ -556,7 +541,7 @@ void OplogFetcher::_createNewCursor(bool initialFind) {
nullptr /* fieldsToReturn */,
QueryOption_CursorTailable | QueryOption_AwaitData |
(oplogFetcherUsesExhaust ? QueryOption_Exhaust : 0),
- _batchSize);
+ _config.batchSize);
_firstBatch = true;
@@ -576,8 +561,8 @@ StatusWith<OplogFetcher::Documents> OplogFetcher::_getNextBatch() {
if (!_cursor->init()) {
_cursor.reset();
return {ErrorCodes::InvalidSyncSource,
- str::stream()
- << "Oplog fetcher could not create cursor on source: " << _source};
+ str::stream() << "Oplog fetcher could not create cursor on source: "
+ << _config.source};
}
// This will also set maxTimeMS on the generated getMore command.
@@ -591,8 +576,8 @@ StatusWith<OplogFetcher::Documents> OplogFetcher::_getNextBatch() {
// Due to a bug in DBClientCursor, it actually uses batchSize 2 if the given batchSize
// is 1 for the find command. So if the given batchSize is 1, we need to set it
// explicitly for getMores.
- if (_batchSize == 1) {
- _cursor->setBatchSize(_batchSize);
+ if (_config.batchSize == 1) {
+ _cursor->setBatchSize(_config.batchSize);
}
} else {
auto lastCommittedWithCurrentTerm =
@@ -690,7 +675,7 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) {
"invalid oplog query metadata from sync source {syncSource}: "
"{error}: {metadata}",
"Invalid oplog query metadata from sync source",
- "syncSource"_attr = _source,
+ "syncSource"_attr = _config.source,
"error"_attr = oqMetadataResult.getStatus(),
"metadata"_attr = _metadataObj);
return oqMetadataResult.getStatus();
@@ -709,7 +694,7 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) {
1,
"oplog fetcher successfully fetched from {syncSource}",
"Oplog fetcher successfully fetched from sync source",
- "syncSource"_attr = _source);
+ "syncSource"_attr = _config.source);
// We do not always enqueue the first document. We elect to skip it for the following
// reasons:
@@ -724,13 +709,13 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) {
// happens on the first batch when we always accept a document with the previous
// fetched timestamp.
- if (_startingPoint == StartingPoint::kSkipFirstDoc) {
+ if (_config.startingPoint == StartingPoint::kSkipFirstDoc) {
firstDocToApply++;
- } else if (!_queryFilter.isEmpty()) {
+ } else if (!_config.queryFilter.isEmpty()) {
auto opCtx = cc().makeOperationContext();
auto expCtx =
make_intrusive<ExpressionContext>(opCtx.get(), nullptr /* collator */, _nss);
- Matcher m(_queryFilter, expCtx);
+ Matcher m(_config.queryFilter, expCtx);
if (!m.matches(*firstDocToApply))
firstDocToApply++;
}
@@ -740,7 +725,7 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) {
auto previousOpTimeFetched = _getLastOpTimeFetched();
auto validateResult = OplogFetcher::validateDocuments(
- documents, _firstBatch, previousOpTimeFetched.getTimestamp(), _startingPoint);
+ documents, _firstBatch, previousOpTimeFetched.getTimestamp(), _config.startingPoint);
if (!validateResult.isOK()) {
return validateResult.getStatus();
}
@@ -757,7 +742,7 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) {
"invalid replication metadata from sync source {syncSource}: "
"{error}: {metadata}",
"Invalid replication metadata from sync source",
- "syncSource"_attr = _source,
+ "syncSource"_attr = _config.source,
"error"_attr = metadataResult.getStatus(),
"metadata"_attr = _metadataObj);
return metadataResult.getStatus();
@@ -766,9 +751,9 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) {
// Determine if we should stop syncing from our current sync source.
auto changeSyncSourceAction = _dataReplicatorExternalState->shouldStopFetching(
- _source, replSetMetadata, oqMetadata, previousOpTimeFetched, lastDocOpTime);
+ _config.source, replSetMetadata, oqMetadata, previousOpTimeFetched, lastDocOpTime);
str::stream errMsg;
- errMsg << "sync source " << _source.toString();
+ errMsg << "sync source " << _config.source.toString();
errMsg << " (config version: " << replSetMetadata.getConfigVersion();
errMsg << "; last applied optime: " << oqMetadata.getLastOpApplied().toString();
errMsg << "; sync source index: " << oqMetadata.getSyncSourceIndex();
@@ -815,7 +800,7 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) {
// Start skipping the first doc after at least one doc has been enqueued in the lifetime
// of this fetcher.
- _startingPoint = StartingPoint::kSkipFirstDoc;
+ _config.startingPoint = StartingPoint::kSkipFirstDoc;
// We have now processed the batch. We should only move forward our view of _lastFetched if the
// batch was not empty.
@@ -843,15 +828,15 @@ Status OplogFetcher::_checkRemoteOplogStart(const OplogFetcher::Documents& docum
// upstream node hasn't rolled back since that could cause it to not have our required minValid
// point. The cursor will be killed if the upstream node rolls back so we don't need to keep
// checking once the cursor is established. If we do not use rollback-via-refetch, this check is
- // not necessary, and _requiredRBID will be set to kUninitializedRollbackId in that case.
- if (_requiredRBID != ReplicationProcess::kUninitializedRollbackId &&
- remoteRBID != _requiredRBID) {
+ // not necessary, and _config.requiredRBID will be set to kUninitializedRollbackId in that case.
+ if (_config.requiredRBID != ReplicationProcess::kUninitializedRollbackId &&
+ remoteRBID != _config.requiredRBID) {
return Status(ErrorCodes::InvalidSyncSource,
"Upstream node rolled back after choosing it as a sync source. Choosing "
"new sync source.");
}
- // Set _requiredRBID to remoteRBID so that it can be returned when the oplog fetcher shuts down.
- _requiredRBID = remoteRBID;
+ // Set _receivedRBID to remoteRBID so that it can be returned when the oplog fetcher shuts down.
+ _receivedRBID = remoteRBID;
// Sometimes our remoteLastOpApplied may be stale; if we received a document with an
// opTime later than remoteLastApplied, we can assume the remote is at least up to that
@@ -867,7 +852,7 @@ Status OplogFetcher::_checkRemoteOplogStart(const OplogFetcher::Documents& docum
// The sync source could be behind us if it rolled back after we selected it. We could have
// failed to detect the rollback if it occurred between sync source selection (when we check the
- // candidate is ahead of us) and sync source resolution (when we got '_requiredRBID'). If the
+ // candidate is ahead of us) and sync source resolution (when we got '_receivedRBID'). If the
// sync source is now behind us, choose a new sync source to prevent going into rollback.
if (remoteLastOpApplied < lastFetched) {
return Status(ErrorCodes::InvalidSyncSource,
@@ -885,7 +870,8 @@ Status OplogFetcher::_checkRemoteOplogStart(const OplogFetcher::Documents& docum
// problematic to check this condition for initial sync, since the 'lastFetched' OpTime will
// almost always equal the 'remoteLastApplied', since we fetch the sync source's last applied
// OpTime to determine where to start our OplogFetcher.
- if (_requireFresherSyncSource && remoteLastOpApplied <= lastFetched) {
+ if (_config.requireFresherSyncSource == RequireFresherSyncSource::kRequireFresherSyncSource &&
+ remoteLastOpApplied <= lastFetched) {
return Status(ErrorCodes::InvalidSyncSource,
"Sync source must be ahead of me. My last fetched oplog optime: {}, latest "
"oplog optime of sync source: {}"_format(lastFetched.toString(),
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h
index e7fbe964f5d..d7eaeec7651 100644
--- a/src/mongo/db/repl/oplog_fetcher.h
+++ b/src/mongo/db/repl/oplog_fetcher.h
@@ -164,25 +164,74 @@ public:
const std::size_t _maxRestarts;
};
+ enum class RequireFresherSyncSource {
+ kDontRequireFresherSyncSource,
+ kRequireFresherSyncSource
+ };
+
+ struct Config {
+ Config(OpTime initialLastFetchedIn,
+ HostAndPort sourceIn,
+ ReplSetConfig replSetConfigIn,
+ int requiredRBIDIn,
+ int batchSizeIn,
+ RequireFresherSyncSource requireFresherSyncSourceIn =
+ RequireFresherSyncSource::kRequireFresherSyncSource)
+ : initialLastFetched(initialLastFetchedIn),
+ source(sourceIn),
+ replSetConfig(replSetConfigIn),
+ requiredRBID(requiredRBIDIn),
+ batchSize(batchSizeIn),
+ requireFresherSyncSource(requireFresherSyncSourceIn) {}
+ // The OpTime, last oplog entry fetched in a previous run, or the optime to start fetching
+ // from, depending on the startingPoint (below.). If the startingPoint is kSkipFirstDoc,
+ // this entry will be verified to exist, then discarded. If it is kEnqueueFirstDoc, it will
+ // be sent to the enqueue function with the first batch.
+ OpTime initialLastFetched;
+
+ // Sync source to read from.
+ HostAndPort source;
+
+ ReplSetConfig replSetConfig;
+
+ // Rollback ID that the sync source is required to have after the first batch. If the value
+ // is uninitialized, the oplog fetcher has not contacted the sync source yet.
+ int requiredRBID;
+
+ int batchSize;
+
+ // A flag indicating whether we should error if the sync source is not ahead of our initial
+ // last fetched OpTime on the first batch. Most of the time this should be set to
+ // kRequireFresherSyncSource, but there are certain special cases where it's acceptable for
+ // our sync source to have no ops newer than _lastFetched.
+ RequireFresherSyncSource requireFresherSyncSource;
+
+ // Predicate with additional filtering to be done on oplog entries.
+ BSONObj queryFilter = BSONObj();
+
+ // Read concern to use for reading the oplog. Empty read concern means we use a default
+ // of "afterClusterTime: Timestamp(0,1)".
+ ReadConcernArgs queryReadConcern = ReadConcernArgs();
+
+ // Indicates if we want to skip the first document during oplog fetching or not.
+ StartingPoint startingPoint = StartingPoint::kSkipFirstDoc;
+
+ // Specifies if the oplog fetcher should request a resume token and provide it to
+ // _enqueueDocumentsFn.
+ bool requestResumeToken = false;
+
+ std::string name = "oplog fetcher";
+ };
+
/**
* Invariants if validation fails on any of the provided arguments.
*/
OplogFetcher(executor::TaskExecutor* executor,
- OpTime lastFetched,
- HostAndPort source,
- ReplSetConfig config,
std::unique_ptr<OplogFetcherRestartDecision> oplogFetcherRestartDecision,
- int requiredRBID,
- bool requireFresherSyncSource,
DataReplicatorExternalState* dataReplicatorExternalState,
EnqueueDocumentsFn enqueueDocumentsFn,
OnShutdownCallbackFn onShutdownCallbackFn,
- const int batchSize,
- StartingPoint startingPoint = StartingPoint::kSkipFirstDoc,
- BSONObj filter = BSONObj(),
- ReadConcernArgs readConcern = ReadConcernArgs(),
- bool requestResumeToken = false,
- StringData name = "oplog fetcher"_sd);
+ Config config);
virtual ~OplogFetcher();
@@ -384,15 +433,12 @@ private:
// Protects member data of this OplogFetcher.
mutable Mutex _mutex = MONGO_MAKE_LATCH("OplogFetcher::_mutex");
- // Sync source to read from.
- const HostAndPort _source;
-
// Namespace of the oplog to read.
const NamespaceString _nss = NamespaceString::kRsOplogNamespace;
- // Rollback ID that the sync source is required to have after the first batch. If the value is
- // uninitialized, the oplog fetcher has not contacted the sync source yet.
- int _requiredRBID;
+ // Rollback ID that the sync source had after the first batch. Initialized from
+ // the requiredRBID in the OplogFetcher::Config and passed to the onShutdown callback.
+ int _receivedRBID;
// Indicates whether the current batch is the first received via this cursor.
bool _firstBatch = true;
@@ -427,30 +473,10 @@ private:
// shouldContinue function, a new cursor will be created or the oplog fetcher will shut down.
std::unique_ptr<DBClientCursor> _cursor;
- // A boolean indicating whether we should error if the sync source is not ahead of our initial
- // last fetched OpTime on the first batch. Most of the time this should be set to true,
- // but there are certain special cases, namely during initial sync, where it's acceptable for
- // our sync source to have no ops newer than _lastFetched.
- bool _requireFresherSyncSource;
-
DataReplicatorExternalState* const _dataReplicatorExternalState;
const EnqueueDocumentsFn _enqueueDocumentsFn;
const Milliseconds _awaitDataTimeout;
- const int _batchSize;
-
- // Indicates if we want to skip the first document during oplog fetching or not.
- StartingPoint _startingPoint;
-
- // Predicate with additional filtering to be done on oplog entries.
- BSONObj _queryFilter;
-
- // Read concern to use for reading the oplog. Empty read concern means we use a default
- // of "afterClusterTime: Timestamp(0,1)".
- ReadConcernArgs _queryReadConcern;
-
- // Specifies if the oplog fetcher should request a resume token and provide it to
- // _enqueueDocumentsFn.
- const bool _requestResumeToken;
+ Config _config;
// Handle to currently scheduled _runQuery task.
executor::TaskExecutor::CallbackHandle _runQueryHandle;
@@ -463,21 +489,11 @@ public:
virtual ~OplogFetcherFactory() = default;
virtual std::unique_ptr<OplogFetcher> operator()(
executor::TaskExecutor* executor,
- OpTime lastFetched,
- HostAndPort source,
- ReplSetConfig config,
std::unique_ptr<OplogFetcher::OplogFetcherRestartDecision> oplogFetcherRestartDecision,
- int requiredRBID,
- bool requireFresherSyncSource,
DataReplicatorExternalState* dataReplicatorExternalState,
OplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn,
OplogFetcher::OnShutdownCallbackFn onShutdownCallbackFn,
- const int batchSize,
- OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc,
- BSONObj filter = BSONObj(),
- ReadConcernArgs readConcern = ReadConcernArgs(),
- bool requestResumeToken = false,
- StringData name = "oplog fetcher"_sd) const = 0;
+ OplogFetcher::Config config) const = 0;
};
template <class T>
@@ -485,37 +501,17 @@ class OplogFetcherFactoryImpl : public OplogFetcherFactory {
public:
std::unique_ptr<OplogFetcher> operator()(
executor::TaskExecutor* executor,
- OpTime lastFetched,
- HostAndPort source,
- ReplSetConfig config,
std::unique_ptr<OplogFetcher::OplogFetcherRestartDecision> oplogFetcherRestartDecision,
- int requiredRBID,
- bool requireFresherSyncSource,
DataReplicatorExternalState* dataReplicatorExternalState,
OplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn,
OplogFetcher::OnShutdownCallbackFn onShutdownCallbackFn,
- const int batchSize,
- OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc,
- BSONObj filter = BSONObj(),
- ReadConcernArgs readConcern = ReadConcernArgs(),
- bool requestResumeToken = false,
- StringData name = "oplog_fetcher"_sd) const final {
+ OplogFetcher::Config config) const final {
return std::make_unique<T>(executor,
- lastFetched,
- source,
- config,
std::move(oplogFetcherRestartDecision),
- requiredRBID,
- requireFresherSyncSource,
dataReplicatorExternalState,
std::move(enqueueDocumentsFn),
std::move(onShutdownCallbackFn),
- batchSize,
- startingPoint,
- std::move(filter),
- std::move(readConcern),
- requestResumeToken,
- name);
+ std::move(config));
}
static std::unique_ptr<OplogFetcherFactory> get() {
diff --git a/src/mongo/db/repl/oplog_fetcher_mock.cpp b/src/mongo/db/repl/oplog_fetcher_mock.cpp
index 79ac007a896..afa02f75931 100644
--- a/src/mongo/db/repl/oplog_fetcher_mock.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_mock.cpp
@@ -38,45 +38,25 @@ namespace mongo {
namespace repl {
OplogFetcherMock::OplogFetcherMock(
executor::TaskExecutor* executor,
- OpTime lastFetched,
- HostAndPort source,
- ReplSetConfig config,
std::unique_ptr<OplogFetcherRestartDecision> oplogFetcherRestartDecision,
- int requiredRBID,
- bool requireFresherSyncSource,
DataReplicatorExternalState* dataReplicatorExternalState,
EnqueueDocumentsFn enqueueDocumentsFn,
OnShutdownCallbackFn onShutdownCallbackFn,
- const int batchSize,
- StartingPoint startingPoint,
- BSONObj filter,
- ReadConcernArgs readConcern,
- bool requestResumeToken,
- StringData name)
+ Config config)
: OplogFetcher(executor,
- lastFetched,
- std::move(source),
- std::move(config),
// Pass a dummy OplogFetcherRestartDecision to the base OplogFetcher.
std::make_unique<OplogFetcherRestartDecisionDefault>(0),
- requiredRBID,
- requireFresherSyncSource,
dataReplicatorExternalState,
// Pass a dummy EnqueueDocumentsFn to the base OplogFetcher.
[](const auto& a1, const auto& a2, const auto& a3) { return Status::OK(); },
// Pass a dummy OnShutdownCallbackFn to the base OplogFetcher.
[](const auto& a, const int b) {},
- batchSize,
- startingPoint,
- filter,
- readConcern,
- requestResumeToken,
- name),
+ config),
_oplogFetcherRestartDecision(std::move(oplogFetcherRestartDecision)),
_onShutdownCallbackFn(std::move(onShutdownCallbackFn)),
_enqueueDocumentsFn(std::move(enqueueDocumentsFn)),
- _startingPoint(startingPoint),
- _lastFetched(lastFetched) {}
+ _startingPoint(config.startingPoint),
+ _lastFetched(config.initialLastFetched) {}
OplogFetcherMock::~OplogFetcherMock() {
shutdown();
diff --git a/src/mongo/db/repl/oplog_fetcher_mock.h b/src/mongo/db/repl/oplog_fetcher_mock.h
index 24a9a4f997f..80dd2bbf742 100644
--- a/src/mongo/db/repl/oplog_fetcher_mock.h
+++ b/src/mongo/db/repl/oplog_fetcher_mock.h
@@ -38,21 +38,11 @@ class OplogFetcherMock : public OplogFetcher {
public:
explicit OplogFetcherMock(
executor::TaskExecutor* executor,
- OpTime lastFetched,
- HostAndPort source,
- ReplSetConfig config,
std::unique_ptr<OplogFetcherRestartDecision> oplogFetcherRestartDecision,
- int requiredRBID,
- bool requireFresherSyncSource,
DataReplicatorExternalState* dataReplicatorExternalState,
EnqueueDocumentsFn enqueueDocumentsFn,
OnShutdownCallbackFn onShutdownCallbackFn,
- const int batchSize,
- StartingPoint startingPoint = StartingPoint::kSkipFirstDoc,
- BSONObj filter = BSONObj(),
- ReadConcernArgs readConcern = ReadConcernArgs(),
- bool requestResumeToken = false,
- StringData name = "oplog fetcher"_sd);
+ Config config);
virtual ~OplogFetcherMock();
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index 33159b26e43..e4a43936c08 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -458,22 +458,26 @@ std::unique_ptr<OplogFetcher> OplogFetcherTest::makeOplogFetcherWithDifferentExe
BSONObj filter,
ReadConcernArgs readConcern,
bool requestResumeToken) {
- auto oplogFetcher = std::make_unique<OplogFetcher>(
- executor,
+ OplogFetcher::Config oplogFetcherConfig(
lastFetched,
source,
_createConfig(),
- std::make_unique<OplogFetcher::OplogFetcherRestartDecisionDefault>(numRestarts),
requiredRBID,
- requireFresherSyncSource,
+ defaultBatchSize,
+ requireFresherSyncSource
+ ? OplogFetcher::RequireFresherSyncSource::kRequireFresherSyncSource
+ : OplogFetcher::RequireFresherSyncSource::kDontRequireFresherSyncSource);
+ oplogFetcherConfig.startingPoint = startingPoint;
+ oplogFetcherConfig.queryFilter = filter;
+ oplogFetcherConfig.queryReadConcern = readConcern;
+ oplogFetcherConfig.requestResumeToken = requestResumeToken;
+ auto oplogFetcher = std::make_unique<OplogFetcher>(
+ executor,
+ std::make_unique<OplogFetcher::OplogFetcherRestartDecisionDefault>(numRestarts),
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
fn,
- defaultBatchSize,
- startingPoint,
- filter,
- readConcern,
- requestResumeToken);
+ std::move(oplogFetcherConfig));
oplogFetcher->setCreateClientFn_forTest([this]() {
const auto autoReconnect = true;
return std::unique_ptr<DBClientConnection>(
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 30309958a6a..93bb1bc2c90 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -514,18 +514,28 @@ void TenantMigrationRecipientService::Instance::_startOplogFetcher() {
StorageInterface::get(opCtx.get()), oplogBufferNs, options);
_donorOplogBuffer->startup(opCtx.get());
_dataReplicatorExternalState = std::make_unique<DataReplicatorExternalStateTenantMigration>();
- _donorOplogFetcher = (*_createOplogFetcherFn)(
- (**_scopedExecutor).get(),
+ OplogFetcher::Config oplogFetcherConfig(
*_stateDoc.getStartFetchingDonorOpTime(),
_oplogFetcherClient->getServerHostAndPort(),
// The config is only used for setting the awaitData timeout; the defaults are fine.
ReplSetConfig::parse(BSON("_id"
<< "dummy"
<< "version" << 1 << "members" << BSONObj())),
- std::make_unique<OplogFetcherRestartDecisionTenantMigration>(),
// We do not need to check the rollback ID.
ReplicationProcess::kUninitializedRollbackId,
- false /* requireFresherSyncSource */,
+ tenantMigrationOplogFetcherBatchSize,
+ OplogFetcher::RequireFresherSyncSource::kDontRequireFresherSyncSource);
+ oplogFetcherConfig.queryFilter = _getOplogFetcherFilter();
+ oplogFetcherConfig.queryReadConcern =
+ ReadConcernArgs(repl::ReadConcernLevel::kMajorityReadConcern);
+ oplogFetcherConfig.requestResumeToken = true;
+ oplogFetcherConfig.name =
+ "TenantOplogFetcher_" + getTenantId() + "_" + getMigrationUUID().toString();
+ oplogFetcherConfig.startingPoint = OplogFetcher::StartingPoint::kEnqueueFirstDoc;
+
+ _donorOplogFetcher = (*_createOplogFetcherFn)(
+ (**_scopedExecutor).get(),
+ std::make_unique<OplogFetcherRestartDecisionTenantMigration>(),
_dataReplicatorExternalState.get(),
[this, self = shared_from_this()](OplogFetcher::Documents::const_iterator first,
OplogFetcher::Documents::const_iterator last,
@@ -533,12 +543,7 @@ void TenantMigrationRecipientService::Instance::_startOplogFetcher() {
return _enqueueDocuments(first, last, info);
},
[this, self = shared_from_this()](const Status& s, int rbid) { _oplogFetcherCallback(s); },
- tenantMigrationOplogFetcherBatchSize,
- OplogFetcher::StartingPoint::kEnqueueFirstDoc,
- _getOplogFetcherFilter(),
- ReadConcernArgs(repl::ReadConcernLevel::kMajorityReadConcern),
- true /* requestResumeToken */,
- "TenantOplogFetcher_" + getTenantId() + "_" + getMigrationUUID().toString());
+ std::move(oplogFetcherConfig));
_donorOplogFetcher->setConnection(std::move(_oplogFetcherClient));
uassertStatusOK(_donorOplogFetcher->startup());
}