summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/oplog_fetcher.h
diff options
context:
space:
mode:
authorMatthew Russotto <matthew.russotto@mongodb.com>2020-12-14 13:18:34 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-12-15 22:21:08 +0000
commit843390351dc7cd2e30df9f823b3b37ccff6ae274 (patch)
tree410a884e25f961fba74fde4523eeca4975347cf6 /src/mongo/db/repl/oplog_fetcher.h
parent64be66623e1e13c66c83dc6f31c544d2276b55fd (diff)
downloadmongo-843390351dc7cd2e30df9f823b3b37ccff6ae274.tar.gz
SERVER-51798 Pass an options struct into OplogFetcher rather than individual arguments
Diffstat (limited to 'src/mongo/db/repl/oplog_fetcher.h')
-rw-r--r--src/mongo/db/repl/oplog_fetcher.h138
1 files changed, 67 insertions, 71 deletions
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h
index e7fbe964f5d..d7eaeec7651 100644
--- a/src/mongo/db/repl/oplog_fetcher.h
+++ b/src/mongo/db/repl/oplog_fetcher.h
@@ -164,25 +164,74 @@ public:
const std::size_t _maxRestarts;
};
+ enum class RequireFresherSyncSource {
+ kDontRequireFresherSyncSource,
+ kRequireFresherSyncSource
+ };
+
+ struct Config {
+ Config(OpTime initialLastFetchedIn,
+ HostAndPort sourceIn,
+ ReplSetConfig replSetConfigIn,
+ int requiredRBIDIn,
+ int batchSizeIn,
+ RequireFresherSyncSource requireFresherSyncSourceIn =
+ RequireFresherSyncSource::kRequireFresherSyncSource)
+ : initialLastFetched(initialLastFetchedIn),
+ source(sourceIn),
+ replSetConfig(replSetConfigIn),
+ requiredRBID(requiredRBIDIn),
+ batchSize(batchSizeIn),
+ requireFresherSyncSource(requireFresherSyncSourceIn) {}
+ // The OpTime, last oplog entry fetched in a previous run, or the optime to start fetching
+ // from, depending on the startingPoint (below.). If the startingPoint is kSkipFirstDoc,
+ // this entry will be verified to exist, then discarded. If it is kEnqueueFirstDoc, it will
+ // be sent to the enqueue function with the first batch.
+ OpTime initialLastFetched;
+
+ // Sync source to read from.
+ HostAndPort source;
+
+ ReplSetConfig replSetConfig;
+
+ // Rollback ID that the sync source is required to have after the first batch. If the value
+ // is uninitialized, the oplog fetcher has not contacted the sync source yet.
+ int requiredRBID;
+
+ int batchSize;
+
+ // A flag indicating whether we should error if the sync source is not ahead of our initial
+ // last fetched OpTime on the first batch. Most of the time this should be set to
+ // kRequireFresherSyncSource, but there are certain special cases where it's acceptable for
+ // our sync source to have no ops newer than _lastFetched.
+ RequireFresherSyncSource requireFresherSyncSource;
+
+ // Predicate with additional filtering to be done on oplog entries.
+ BSONObj queryFilter = BSONObj();
+
+ // Read concern to use for reading the oplog. Empty read concern means we use a default
+ // of "afterClusterTime: Timestamp(0,1)".
+ ReadConcernArgs queryReadConcern = ReadConcernArgs();
+
+ // Indicates if we want to skip the first document during oplog fetching or not.
+ StartingPoint startingPoint = StartingPoint::kSkipFirstDoc;
+
+ // Specifies if the oplog fetcher should request a resume token and provide it to
+ // _enqueueDocumentsFn.
+ bool requestResumeToken = false;
+
+ std::string name = "oplog fetcher";
+ };
+
/**
* Invariants if validation fails on any of the provided arguments.
*/
OplogFetcher(executor::TaskExecutor* executor,
- OpTime lastFetched,
- HostAndPort source,
- ReplSetConfig config,
std::unique_ptr<OplogFetcherRestartDecision> oplogFetcherRestartDecision,
- int requiredRBID,
- bool requireFresherSyncSource,
DataReplicatorExternalState* dataReplicatorExternalState,
EnqueueDocumentsFn enqueueDocumentsFn,
OnShutdownCallbackFn onShutdownCallbackFn,
- const int batchSize,
- StartingPoint startingPoint = StartingPoint::kSkipFirstDoc,
- BSONObj filter = BSONObj(),
- ReadConcernArgs readConcern = ReadConcernArgs(),
- bool requestResumeToken = false,
- StringData name = "oplog fetcher"_sd);
+ Config config);
virtual ~OplogFetcher();
@@ -384,15 +433,12 @@ private:
// Protects member data of this OplogFetcher.
mutable Mutex _mutex = MONGO_MAKE_LATCH("OplogFetcher::_mutex");
- // Sync source to read from.
- const HostAndPort _source;
-
// Namespace of the oplog to read.
const NamespaceString _nss = NamespaceString::kRsOplogNamespace;
- // Rollback ID that the sync source is required to have after the first batch. If the value is
- // uninitialized, the oplog fetcher has not contacted the sync source yet.
- int _requiredRBID;
+ // Rollback ID that the sync source had after the first batch. Initialized from
+ // the requiredRBID in the OplogFetcher::Config and passed to the onShutdown callback.
+ int _receivedRBID;
// Indicates whether the current batch is the first received via this cursor.
bool _firstBatch = true;
@@ -427,30 +473,10 @@ private:
// shouldContinue function, a new cursor will be created or the oplog fetcher will shut down.
std::unique_ptr<DBClientCursor> _cursor;
- // A boolean indicating whether we should error if the sync source is not ahead of our initial
- // last fetched OpTime on the first batch. Most of the time this should be set to true,
- // but there are certain special cases, namely during initial sync, where it's acceptable for
- // our sync source to have no ops newer than _lastFetched.
- bool _requireFresherSyncSource;
-
DataReplicatorExternalState* const _dataReplicatorExternalState;
const EnqueueDocumentsFn _enqueueDocumentsFn;
const Milliseconds _awaitDataTimeout;
- const int _batchSize;
-
- // Indicates if we want to skip the first document during oplog fetching or not.
- StartingPoint _startingPoint;
-
- // Predicate with additional filtering to be done on oplog entries.
- BSONObj _queryFilter;
-
- // Read concern to use for reading the oplog. Empty read concern means we use a default
- // of "afterClusterTime: Timestamp(0,1)".
- ReadConcernArgs _queryReadConcern;
-
- // Specifies if the oplog fetcher should request a resume token and provide it to
- // _enqueueDocumentsFn.
- const bool _requestResumeToken;
+ Config _config;
// Handle to currently scheduled _runQuery task.
executor::TaskExecutor::CallbackHandle _runQueryHandle;
@@ -463,21 +489,11 @@ public:
virtual ~OplogFetcherFactory() = default;
virtual std::unique_ptr<OplogFetcher> operator()(
executor::TaskExecutor* executor,
- OpTime lastFetched,
- HostAndPort source,
- ReplSetConfig config,
std::unique_ptr<OplogFetcher::OplogFetcherRestartDecision> oplogFetcherRestartDecision,
- int requiredRBID,
- bool requireFresherSyncSource,
DataReplicatorExternalState* dataReplicatorExternalState,
OplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn,
OplogFetcher::OnShutdownCallbackFn onShutdownCallbackFn,
- const int batchSize,
- OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc,
- BSONObj filter = BSONObj(),
- ReadConcernArgs readConcern = ReadConcernArgs(),
- bool requestResumeToken = false,
- StringData name = "oplog fetcher"_sd) const = 0;
+ OplogFetcher::Config config) const = 0;
};
template <class T>
@@ -485,37 +501,17 @@ class OplogFetcherFactoryImpl : public OplogFetcherFactory {
public:
std::unique_ptr<OplogFetcher> operator()(
executor::TaskExecutor* executor,
- OpTime lastFetched,
- HostAndPort source,
- ReplSetConfig config,
std::unique_ptr<OplogFetcher::OplogFetcherRestartDecision> oplogFetcherRestartDecision,
- int requiredRBID,
- bool requireFresherSyncSource,
DataReplicatorExternalState* dataReplicatorExternalState,
OplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn,
OplogFetcher::OnShutdownCallbackFn onShutdownCallbackFn,
- const int batchSize,
- OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc,
- BSONObj filter = BSONObj(),
- ReadConcernArgs readConcern = ReadConcernArgs(),
- bool requestResumeToken = false,
- StringData name = "oplog_fetcher"_sd) const final {
+ OplogFetcher::Config config) const final {
return std::make_unique<T>(executor,
- lastFetched,
- source,
- config,
std::move(oplogFetcherRestartDecision),
- requiredRBID,
- requireFresherSyncSource,
dataReplicatorExternalState,
std::move(enqueueDocumentsFn),
std::move(onShutdownCallbackFn),
- batchSize,
- startingPoint,
- std::move(filter),
- std::move(readConcern),
- requestResumeToken,
- name);
+ std::move(config));
}
static std::unique_ptr<OplogFetcherFactory> get() {