diff options
author | Judah Schvimer <judah@mongodb.com> | 2017-04-18 10:06:14 -0400 |
---|---|---|
committer | Judah Schvimer <judah@mongodb.com> | 2017-04-18 10:06:14 -0400 |
commit | da37567fe37e39a52a96dd75c8929fabd096d2cb (patch) | |
tree | ada3cef019fea8f0cb8f622e8a06f35400e0b450 /src/mongo/db/repl/oplog_fetcher.h | |
parent | bae70bcec33c9e45a8adfa54c8e4468b60093d04 (diff) | |
download | mongo-da37567fe37e39a52a96dd75c8929fabd096d2cb.tar.gz |
SERVER-28209 Implement RollbackCommonPointResolver
Diffstat (limited to 'src/mongo/db/repl/oplog_fetcher.h')
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.h | 96 |
1 files changed, 12 insertions, 84 deletions
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h index d2067ab9c3f..052c5bb1017 100644 --- a/src/mongo/db/repl/oplog_fetcher.h +++ b/src/mongo/db/repl/oplog_fetcher.h @@ -29,20 +29,16 @@ #pragma once #include <cstddef> -#include <memory> #include "mongo/base/disallow_copying.h" #include "mongo/base/status_with.h" #include "mongo/bson/timestamp.h" #include "mongo/client/fetcher.h" #include "mongo/db/namespace_string.h" -#include "mongo/db/repl/abstract_async_component.h" +#include "mongo/db/repl/abstract_oplog_fetcher.h" #include "mongo/db/repl/data_replicator_external_state.h" -#include "mongo/db/repl/optime_with.h" #include "mongo/db/repl/repl_set_config.h" -#include "mongo/stdx/condition_variable.h" #include "mongo/stdx/functional.h" -#include "mongo/stdx/mutex.h" #include "mongo/util/fail_point_service.h" namespace mongo { @@ -51,11 +47,6 @@ namespace repl { MONGO_FP_FORWARD_DECLARE(stopReplProducer); /** - * Used to keep track of the optime and hash of the last fetched operation. - */ -using OpTimeWithHash = OpTimeWith<long long>; - -/** * The oplog fetcher, once started, reads operations from a remote oplog using a tailable cursor. * * The initial find command is generated from last fetched optime and hash and may contain the @@ -75,24 +66,17 @@ using OpTimeWithHash = OpTimeWith<long long>; * * When there is an error or when it is not possible to issue another getMore request, calls * "onShutdownCallbackFn" to signal the end of processing. + * + * This class subclasses AbstractOplogFetcher which takes care of scheduling the Fetcher and + * `getMore` commands, and handles restarting on errors. */ -class OplogFetcher : public AbstractAsyncComponent { +class OplogFetcher : public AbstractOplogFetcher { MONGO_DISALLOW_COPYING(OplogFetcher); public: static Seconds kDefaultProtocolZeroAwaitDataTimeout; /** - * Type of function called by the oplog fetcher on shutdown with - * the final oplog fetcher status, last optime fetched and last hash fetched. - * - * The status will be Status::OK() if we have processed the last batch of operations - * from the tailable cursor ("bob" is null in the fetcher callback). - */ - using OnShutdownCallbackFn = - stdx::function<void(const Status& shutdownStatus, const OpTimeWithHash& lastFetched)>; - - /** * Statistics on current batch of operations returned by the fetcher. */ struct DocumentsInfo { @@ -126,9 +110,7 @@ public: Timestamp lastTS); /** - * Initializes fetcher with command to tail remote oplog. - * - * Throws a UserException if validation fails on any of the provided arguments. + * Invariants if validation fails on any of the provided arguments. */ OplogFetcher(executor::TaskExecutor* executor, OpTimeWithHash lastFetched, @@ -144,21 +126,9 @@ public: virtual ~OplogFetcher(); - std::string toString() const; - - /** - * Returns optime and hash of the last oplog entry in the most recent oplog query result. - */ - OpTimeWithHash getLastOpTimeWithHashFetched() const; - // ================== Test support API =================== /** - * Returns command object sent in first remote command. - */ - BSONObj getCommandObject_forTest() const; - - /** * Returns metadata object sent in remote commands. */ BSONObj getMetadataObject_forTest() const; @@ -174,49 +144,19 @@ public: Milliseconds getAwaitDataTimeout_forTest() const; private: - // AbstractAsyncComponent overrides. - Status _doStartup_inlock() noexcept override; - void _doShutdown_inlock() noexcept override; - stdx::mutex* _getMutex() noexcept override; + BSONObj _makeFindCommandObject(const NamespaceString& nss, + OpTime lastOpTimeFetched) const override; - /** - * Schedules fetcher and updates counters. - */ - Status _scheduleFetcher_inlock(); - - /** - * Processes each batch of results from the tailable cursor started by the fetcher on the sync - * source. - * - * Calls "onShutdownCallbackFn" if there is an error or if there are no further results to - * request from the sync source. - */ - void _callback(const Fetcher::QueryResponseStatus& result, BSONObjBuilder* getMoreBob); + BSONObj _makeMetadataObject() const override; /** - * Notifies caller that the oplog fetcher has completed processing operations from - * the remote oplog. + * This function is run by the AbstractOplogFetcher on a successful batch of oplog entries. */ - void _finishCallback(Status status); - void _finishCallback(Status status, OpTimeWithHash opTimeWithHash); + StatusWith<BSONObj> _onSuccessfulBatch(const Fetcher::QueryResponse& queryResponse) override; - /** - * Creates a new instance of the fetcher to tail the remote oplog starting at the given optime. - */ - std::unique_ptr<Fetcher> _makeFetcher(long long currentTerm, OpTime lastFetchedOpTime); - - // Protects member data of this OplogFetcher. - mutable stdx::mutex _mutex; - - mutable stdx::condition_variable _condition; - - const HostAndPort _source; - const NamespaceString _nss; + // The metadata object sent with the Fetcher queries. const BSONObj _metadataObject; - // Maximum number of times to consecutively restart the fetcher on non-cancellation errors. - const std::size_t _maxFetcherRestarts; - // Rollback ID that the sync source is required to have after the first batch. int _requiredRBID; @@ -229,18 +169,6 @@ private: DataReplicatorExternalState* const _dataReplicatorExternalState; const EnqueueDocumentsFn _enqueueDocumentsFn; const Milliseconds _awaitDataTimeout; - OnShutdownCallbackFn _onShutdownCallbackFn; - - // Used to validate start of first batch of results from the remote oplog - // tailing query and to keep track of the last known operation consumed via - // "_enqueueDocumentsFn". - OpTimeWithHash _lastFetched; - - // Fetcher restarts since the last successful oplog query response. - std::size_t _fetcherRestarts = 0; - - std::unique_ptr<Fetcher> _fetcher; - std::unique_ptr<Fetcher> _shuttingDownFetcher; }; } // namespace repl |