summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/oplog_fetcher.h
diff options
context:
space:
mode:
authorJudah Schvimer <judah@mongodb.com>2017-04-18 10:06:14 -0400
committerJudah Schvimer <judah@mongodb.com>2017-04-18 10:06:14 -0400
commitda37567fe37e39a52a96dd75c8929fabd096d2cb (patch)
treeada3cef019fea8f0cb8f622e8a06f35400e0b450 /src/mongo/db/repl/oplog_fetcher.h
parentbae70bcec33c9e45a8adfa54c8e4468b60093d04 (diff)
downloadmongo-da37567fe37e39a52a96dd75c8929fabd096d2cb.tar.gz
SERVER-28209 Implement RollbackCommonPointResolver
Diffstat (limited to 'src/mongo/db/repl/oplog_fetcher.h')
-rw-r--r--src/mongo/db/repl/oplog_fetcher.h96
1 files changed, 12 insertions, 84 deletions
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h
index d2067ab9c3f..052c5bb1017 100644
--- a/src/mongo/db/repl/oplog_fetcher.h
+++ b/src/mongo/db/repl/oplog_fetcher.h
@@ -29,20 +29,16 @@
#pragma once
#include <cstddef>
-#include <memory>
#include "mongo/base/disallow_copying.h"
#include "mongo/base/status_with.h"
#include "mongo/bson/timestamp.h"
#include "mongo/client/fetcher.h"
#include "mongo/db/namespace_string.h"
-#include "mongo/db/repl/abstract_async_component.h"
+#include "mongo/db/repl/abstract_oplog_fetcher.h"
#include "mongo/db/repl/data_replicator_external_state.h"
-#include "mongo/db/repl/optime_with.h"
#include "mongo/db/repl/repl_set_config.h"
-#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/functional.h"
-#include "mongo/stdx/mutex.h"
#include "mongo/util/fail_point_service.h"
namespace mongo {
@@ -51,11 +47,6 @@ namespace repl {
MONGO_FP_FORWARD_DECLARE(stopReplProducer);
/**
- * Used to keep track of the optime and hash of the last fetched operation.
- */
-using OpTimeWithHash = OpTimeWith<long long>;
-
-/**
* The oplog fetcher, once started, reads operations from a remote oplog using a tailable cursor.
*
* The initial find command is generated from last fetched optime and hash and may contain the
@@ -75,24 +66,17 @@ using OpTimeWithHash = OpTimeWith<long long>;
*
* When there is an error or when it is not possible to issue another getMore request, calls
* "onShutdownCallbackFn" to signal the end of processing.
+ *
+ * This class subclasses AbstractOplogFetcher which takes care of scheduling the Fetcher and
+ * `getMore` commands, and handles restarting on errors.
*/
-class OplogFetcher : public AbstractAsyncComponent {
+class OplogFetcher : public AbstractOplogFetcher {
MONGO_DISALLOW_COPYING(OplogFetcher);
public:
static Seconds kDefaultProtocolZeroAwaitDataTimeout;
/**
- * Type of function called by the oplog fetcher on shutdown with
- * the final oplog fetcher status, last optime fetched and last hash fetched.
- *
- * The status will be Status::OK() if we have processed the last batch of operations
- * from the tailable cursor ("bob" is null in the fetcher callback).
- */
- using OnShutdownCallbackFn =
- stdx::function<void(const Status& shutdownStatus, const OpTimeWithHash& lastFetched)>;
-
- /**
* Statistics on current batch of operations returned by the fetcher.
*/
struct DocumentsInfo {
@@ -126,9 +110,7 @@ public:
Timestamp lastTS);
/**
- * Initializes fetcher with command to tail remote oplog.
- *
- * Throws a UserException if validation fails on any of the provided arguments.
+ * Invariants if validation fails on any of the provided arguments.
*/
OplogFetcher(executor::TaskExecutor* executor,
OpTimeWithHash lastFetched,
@@ -144,21 +126,9 @@ public:
virtual ~OplogFetcher();
- std::string toString() const;
-
- /**
- * Returns optime and hash of the last oplog entry in the most recent oplog query result.
- */
- OpTimeWithHash getLastOpTimeWithHashFetched() const;
-
// ================== Test support API ===================
/**
- * Returns command object sent in first remote command.
- */
- BSONObj getCommandObject_forTest() const;
-
- /**
* Returns metadata object sent in remote commands.
*/
BSONObj getMetadataObject_forTest() const;
@@ -174,49 +144,19 @@ public:
Milliseconds getAwaitDataTimeout_forTest() const;
private:
- // AbstractAsyncComponent overrides.
- Status _doStartup_inlock() noexcept override;
- void _doShutdown_inlock() noexcept override;
- stdx::mutex* _getMutex() noexcept override;
+ BSONObj _makeFindCommandObject(const NamespaceString& nss,
+ OpTime lastOpTimeFetched) const override;
- /**
- * Schedules fetcher and updates counters.
- */
- Status _scheduleFetcher_inlock();
-
- /**
- * Processes each batch of results from the tailable cursor started by the fetcher on the sync
- * source.
- *
- * Calls "onShutdownCallbackFn" if there is an error or if there are no further results to
- * request from the sync source.
- */
- void _callback(const Fetcher::QueryResponseStatus& result, BSONObjBuilder* getMoreBob);
+ BSONObj _makeMetadataObject() const override;
/**
- * Notifies caller that the oplog fetcher has completed processing operations from
- * the remote oplog.
+ * This function is run by the AbstractOplogFetcher on a successful batch of oplog entries.
*/
- void _finishCallback(Status status);
- void _finishCallback(Status status, OpTimeWithHash opTimeWithHash);
+ StatusWith<BSONObj> _onSuccessfulBatch(const Fetcher::QueryResponse& queryResponse) override;
- /**
- * Creates a new instance of the fetcher to tail the remote oplog starting at the given optime.
- */
- std::unique_ptr<Fetcher> _makeFetcher(long long currentTerm, OpTime lastFetchedOpTime);
-
- // Protects member data of this OplogFetcher.
- mutable stdx::mutex _mutex;
-
- mutable stdx::condition_variable _condition;
-
- const HostAndPort _source;
- const NamespaceString _nss;
+ // The metadata object sent with the Fetcher queries.
const BSONObj _metadataObject;
- // Maximum number of times to consecutively restart the fetcher on non-cancellation errors.
- const std::size_t _maxFetcherRestarts;
-
// Rollback ID that the sync source is required to have after the first batch.
int _requiredRBID;
@@ -229,18 +169,6 @@ private:
DataReplicatorExternalState* const _dataReplicatorExternalState;
const EnqueueDocumentsFn _enqueueDocumentsFn;
const Milliseconds _awaitDataTimeout;
- OnShutdownCallbackFn _onShutdownCallbackFn;
-
- // Used to validate start of first batch of results from the remote oplog
- // tailing query and to keep track of the last known operation consumed via
- // "_enqueueDocumentsFn".
- OpTimeWithHash _lastFetched;
-
- // Fetcher restarts since the last successful oplog query response.
- std::size_t _fetcherRestarts = 0;
-
- std::unique_ptr<Fetcher> _fetcher;
- std::unique_ptr<Fetcher> _shuttingDownFetcher;
};
} // namespace repl