diff options
author | Matthew Russotto <matthew.russotto@mongodb.com> | 2020-12-14 13:18:34 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-12-15 22:21:08 +0000 |
commit | 843390351dc7cd2e30df9f823b3b37ccff6ae274 (patch) | |
tree | 410a884e25f961fba74fde4523eeca4975347cf6 /src/mongo/db/repl/oplog_fetcher.h | |
parent | 64be66623e1e13c66c83dc6f31c544d2276b55fd (diff) | |
download | mongo-843390351dc7cd2e30df9f823b3b37ccff6ae274.tar.gz |
SERVER-51798 Pass an options struct into OplogFetcher rather than individual arguments
Diffstat (limited to 'src/mongo/db/repl/oplog_fetcher.h')
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.h | 138 |
1 files changed, 67 insertions, 71 deletions
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h index e7fbe964f5d..d7eaeec7651 100644 --- a/src/mongo/db/repl/oplog_fetcher.h +++ b/src/mongo/db/repl/oplog_fetcher.h @@ -164,25 +164,74 @@ public: const std::size_t _maxRestarts; }; + enum class RequireFresherSyncSource { + kDontRequireFresherSyncSource, + kRequireFresherSyncSource + }; + + struct Config { + Config(OpTime initialLastFetchedIn, + HostAndPort sourceIn, + ReplSetConfig replSetConfigIn, + int requiredRBIDIn, + int batchSizeIn, + RequireFresherSyncSource requireFresherSyncSourceIn = + RequireFresherSyncSource::kRequireFresherSyncSource) + : initialLastFetched(initialLastFetchedIn), + source(sourceIn), + replSetConfig(replSetConfigIn), + requiredRBID(requiredRBIDIn), + batchSize(batchSizeIn), + requireFresherSyncSource(requireFresherSyncSourceIn) {} + // The OpTime, last oplog entry fetched in a previous run, or the optime to start fetching + // from, depending on the startingPoint (below.). If the startingPoint is kSkipFirstDoc, + // this entry will be verified to exist, then discarded. If it is kEnqueueFirstDoc, it will + // be sent to the enqueue function with the first batch. + OpTime initialLastFetched; + + // Sync source to read from. + HostAndPort source; + + ReplSetConfig replSetConfig; + + // Rollback ID that the sync source is required to have after the first batch. If the value + // is uninitialized, the oplog fetcher has not contacted the sync source yet. + int requiredRBID; + + int batchSize; + + // A flag indicating whether we should error if the sync source is not ahead of our initial + // last fetched OpTime on the first batch. Most of the time this should be set to + // kRequireFresherSyncSource, but there are certain special cases where it's acceptable for + // our sync source to have no ops newer than _lastFetched. + RequireFresherSyncSource requireFresherSyncSource; + + // Predicate with additional filtering to be done on oplog entries. + BSONObj queryFilter = BSONObj(); + + // Read concern to use for reading the oplog. Empty read concern means we use a default + // of "afterClusterTime: Timestamp(0,1)". + ReadConcernArgs queryReadConcern = ReadConcernArgs(); + + // Indicates if we want to skip the first document during oplog fetching or not. + StartingPoint startingPoint = StartingPoint::kSkipFirstDoc; + + // Specifies if the oplog fetcher should request a resume token and provide it to + // _enqueueDocumentsFn. + bool requestResumeToken = false; + + std::string name = "oplog fetcher"; + }; + /** * Invariants if validation fails on any of the provided arguments. */ OplogFetcher(executor::TaskExecutor* executor, - OpTime lastFetched, - HostAndPort source, - ReplSetConfig config, std::unique_ptr<OplogFetcherRestartDecision> oplogFetcherRestartDecision, - int requiredRBID, - bool requireFresherSyncSource, DataReplicatorExternalState* dataReplicatorExternalState, EnqueueDocumentsFn enqueueDocumentsFn, OnShutdownCallbackFn onShutdownCallbackFn, - const int batchSize, - StartingPoint startingPoint = StartingPoint::kSkipFirstDoc, - BSONObj filter = BSONObj(), - ReadConcernArgs readConcern = ReadConcernArgs(), - bool requestResumeToken = false, - StringData name = "oplog fetcher"_sd); + Config config); virtual ~OplogFetcher(); @@ -384,15 +433,12 @@ private: // Protects member data of this OplogFetcher. mutable Mutex _mutex = MONGO_MAKE_LATCH("OplogFetcher::_mutex"); - // Sync source to read from. - const HostAndPort _source; - // Namespace of the oplog to read. const NamespaceString _nss = NamespaceString::kRsOplogNamespace; - // Rollback ID that the sync source is required to have after the first batch. If the value is - // uninitialized, the oplog fetcher has not contacted the sync source yet. - int _requiredRBID; + // Rollback ID that the sync source had after the first batch. Initialized from + // the requiredRBID in the OplogFetcher::Config and passed to the onShutdown callback. + int _receivedRBID; // Indicates whether the current batch is the first received via this cursor. bool _firstBatch = true; @@ -427,30 +473,10 @@ private: // shouldContinue function, a new cursor will be created or the oplog fetcher will shut down. std::unique_ptr<DBClientCursor> _cursor; - // A boolean indicating whether we should error if the sync source is not ahead of our initial - // last fetched OpTime on the first batch. Most of the time this should be set to true, - // but there are certain special cases, namely during initial sync, where it's acceptable for - // our sync source to have no ops newer than _lastFetched. - bool _requireFresherSyncSource; - DataReplicatorExternalState* const _dataReplicatorExternalState; const EnqueueDocumentsFn _enqueueDocumentsFn; const Milliseconds _awaitDataTimeout; - const int _batchSize; - - // Indicates if we want to skip the first document during oplog fetching or not. - StartingPoint _startingPoint; - - // Predicate with additional filtering to be done on oplog entries. - BSONObj _queryFilter; - - // Read concern to use for reading the oplog. Empty read concern means we use a default - // of "afterClusterTime: Timestamp(0,1)". - ReadConcernArgs _queryReadConcern; - - // Specifies if the oplog fetcher should request a resume token and provide it to - // _enqueueDocumentsFn. - const bool _requestResumeToken; + Config _config; // Handle to currently scheduled _runQuery task. executor::TaskExecutor::CallbackHandle _runQueryHandle; @@ -463,21 +489,11 @@ public: virtual ~OplogFetcherFactory() = default; virtual std::unique_ptr<OplogFetcher> operator()( executor::TaskExecutor* executor, - OpTime lastFetched, - HostAndPort source, - ReplSetConfig config, std::unique_ptr<OplogFetcher::OplogFetcherRestartDecision> oplogFetcherRestartDecision, - int requiredRBID, - bool requireFresherSyncSource, DataReplicatorExternalState* dataReplicatorExternalState, OplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn, OplogFetcher::OnShutdownCallbackFn onShutdownCallbackFn, - const int batchSize, - OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc, - BSONObj filter = BSONObj(), - ReadConcernArgs readConcern = ReadConcernArgs(), - bool requestResumeToken = false, - StringData name = "oplog fetcher"_sd) const = 0; + OplogFetcher::Config config) const = 0; }; template <class T> @@ -485,37 +501,17 @@ class OplogFetcherFactoryImpl : public OplogFetcherFactory { public: std::unique_ptr<OplogFetcher> operator()( executor::TaskExecutor* executor, - OpTime lastFetched, - HostAndPort source, - ReplSetConfig config, std::unique_ptr<OplogFetcher::OplogFetcherRestartDecision> oplogFetcherRestartDecision, - int requiredRBID, - bool requireFresherSyncSource, DataReplicatorExternalState* dataReplicatorExternalState, OplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn, OplogFetcher::OnShutdownCallbackFn onShutdownCallbackFn, - const int batchSize, - OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc, - BSONObj filter = BSONObj(), - ReadConcernArgs readConcern = ReadConcernArgs(), - bool requestResumeToken = false, - StringData name = "oplog_fetcher"_sd) const final { + OplogFetcher::Config config) const final { return std::make_unique<T>(executor, - lastFetched, - source, - config, std::move(oplogFetcherRestartDecision), - requiredRBID, - requireFresherSyncSource, dataReplicatorExternalState, std::move(enqueueDocumentsFn), std::move(onShutdownCallbackFn), - batchSize, - startingPoint, - std::move(filter), - std::move(readConcern), - requestResumeToken, - name); + std::move(config)); } static std::unique_ptr<OplogFetcherFactory> get() { |