summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorLingzhi Deng <lingzhi.deng@mongodb.com>2020-02-19 12:54:25 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-02-19 18:42:35 +0000
commitd2c07e12a87325dca3265e1b078045cbcf909044 (patch)
treefb39d92159580db0caa0f7a8945410941f2d1bc8 /src
parent2f318f6bc8a136d9273b5cc1973f590af374bae0 (diff)
downloadmongo-d2c07e12a87325dca3265e1b078045cbcf909044.tar.gz
SERVER-45574: Replace OplogFetcher with NewOplogFetcher
- Delete old OplogFetcher implementation and unit tests - Rename NewOplogFetcher as OplogFetcher
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/SConscript39
-rw-r--r--src/mongo/db/repl/abstract_oplog_fetcher.cpp383
-rw-r--r--src/mongo/db/repl/abstract_oplog_fetcher.h280
-rw-r--r--src/mongo/db/repl/abstract_oplog_fetcher_test.cpp566
-rw-r--r--src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp142
-rw-r--r--src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.h100
-rw-r--r--src/mongo/db/repl/bgsync.cpp12
-rw-r--r--src/mongo/db/repl/bgsync.h8
-rw-r--r--src/mongo/db/repl/initial_syncer.cpp74
-rw-r--r--src/mongo/db/repl/initial_syncer.h28
-rw-r--r--src/mongo/db/repl/initial_syncer_test.cpp10
-rw-r--r--src/mongo/db/repl/oplog_fetcher.cpp419
-rw-r--r--src/mongo/db/repl/oplog_fetcher.h242
-rw-r--r--src/mongo/db/repl/oplog_fetcher_mock.cpp32
-rw-r--r--src/mongo/db/repl/oplog_fetcher_mock.h2
-rw-r--r--src/mongo/db/repl/oplog_fetcher_test.cpp1253
-rw-r--r--src/mongo/db/repl/repl_server_parameters.idl5
17 files changed, 305 insertions, 3290 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 0a01cd2f57d..99c78289a7f 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -844,45 +844,12 @@ env.Library(
)
env.Library(
- target='abstract_oplog_fetcher_test_fixture',
- source=[
- 'abstract_oplog_fetcher_test_fixture.cpp',
- ],
- LIBDEPS=[
- 'abstract_oplog_fetcher',
- 'oplog_entry',
- '$BUILD_DIR/mongo/unittest/task_executor_proxy',
- '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
- ],
-)
-
-env.Library(
- target='abstract_oplog_fetcher',
- source=[
- 'abstract_oplog_fetcher.cpp',
- ],
- LIBDEPS=[
- 'abstract_async_component',
- '$BUILD_DIR/mongo/base',
- '$BUILD_DIR/mongo/client/fetcher',
- '$BUILD_DIR/mongo/db/namespace_string',
- '$BUILD_DIR/mongo/db/stats/counters',
- '$BUILD_DIR/mongo/db/stats/timer_stats',
- '$BUILD_DIR/mongo/executor/task_executor_interface',
- ],
- LIBDEPS_PRIVATE=[
- 'repl_server_parameters',
- '$BUILD_DIR/mongo/db/commands/server_status_core',
- ],
-)
-
-env.Library(
target='oplog_fetcher',
source=[
'oplog_fetcher.cpp',
],
LIBDEPS=[
- 'abstract_oplog_fetcher',
+ 'abstract_async_component',
'repl_coordinator_interface',
'replica_set_messages',
'$BUILD_DIR/mongo/base',
@@ -890,6 +857,7 @@ env.Library(
'$BUILD_DIR/mongo/db/namespace_string',
'$BUILD_DIR/mongo/db/stats/counters',
'$BUILD_DIR/mongo/db/stats/timer_stats',
+ '$BUILD_DIR/mongo/executor/task_executor_interface',
],
LIBDEPS_PRIVATE=[
'repl_server_parameters',
@@ -1216,7 +1184,6 @@ env.CppUnitTest(
target='db_repl_test',
source=[
'abstract_async_component_test.cpp',
- 'abstract_oplog_fetcher_test.cpp',
'apply_ops_test.cpp',
'check_quorum_for_config_change_test.cpp',
'drop_pending_collection_reaper_test.cpp',
@@ -1291,8 +1258,6 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/util/clock_source_mock',
'$BUILD_DIR/mongo/util/concurrency/thread_pool',
'abstract_async_component',
- 'abstract_oplog_fetcher',
- 'abstract_oplog_fetcher_test_fixture',
'data_replicator_external_state_mock',
'drop_pending_collection_reaper',
'idempotency_test_fixture',
diff --git a/src/mongo/db/repl/abstract_oplog_fetcher.cpp b/src/mongo/db/repl/abstract_oplog_fetcher.cpp
deleted file mode 100644
index ffff15f063f..00000000000
--- a/src/mongo/db/repl/abstract_oplog_fetcher.cpp
+++ /dev/null
@@ -1,383 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/repl/abstract_oplog_fetcher.h"
-
-#include <memory>
-
-#include "mongo/base/counter.h"
-#include "mongo/bson/util/bson_extract.h"
-#include "mongo/db/commands/server_status_metric.h"
-#include "mongo/db/jsobj.h"
-#include "mongo/db/repl/repl_server_parameters_gen.h"
-#include "mongo/logv2/log.h"
-#include "mongo/platform/mutex.h"
-#include "mongo/util/assert_util.h"
-#include "mongo/util/log.h"
-
-namespace mongo {
-namespace repl {
-
-// This failpoint is shared with oplog_fetcher.
-MONGO_FAIL_POINT_DEFINE(hangBeforeStartingOplogFetcher)
-
-namespace {
-// Default `maxTimeMS` timeout for `getMore`s.
-const Milliseconds kDefaultOplogGetMoreMaxMS{5000};
-
-} // namespace
-
-AbstractOplogFetcher::AbstractOplogFetcher(executor::TaskExecutor* executor,
- OpTime lastFetched,
- HostAndPort source,
- NamespaceString nss,
- std::size_t maxFetcherRestarts,
- OnShutdownCallbackFn onShutdownCallbackFn,
- const std::string& componentName)
- : AbstractOplogFetcher(executor,
- lastFetched,
- source,
- nss,
- std::make_unique<OplogFetcherRestartDecisionDefault>(maxFetcherRestarts),
- onShutdownCallbackFn,
- componentName) {
- invariant(!_lastFetched.isNull());
- invariant(onShutdownCallbackFn);
-}
-
-AbstractOplogFetcher::AbstractOplogFetcher(
- executor::TaskExecutor* executor,
- OpTime lastFetched,
- HostAndPort source,
- NamespaceString nss,
- std::unique_ptr<OplogFetcherRestartDecision> oplogFetcherRestartDecision,
- OnShutdownCallbackFn onShutdownCallbackFn,
- const std::string& componentName)
- : AbstractAsyncComponent(executor, componentName),
- _source(source),
- _nss(nss),
- _oplogFetcherRestartDecision(std::move(oplogFetcherRestartDecision)),
- _onShutdownCallbackFn(onShutdownCallbackFn),
- _lastFetched(lastFetched) {
- invariant(!_lastFetched.isNull());
- invariant(onShutdownCallbackFn);
-}
-
-
-Milliseconds AbstractOplogFetcher::_getInitialFindMaxTime() const {
- return Milliseconds(oplogInitialFindMaxSeconds.load() * 1000);
-}
-
-Milliseconds AbstractOplogFetcher::_getRetriedFindMaxTime() const {
- return Milliseconds(oplogRetriedFindMaxSeconds.load() * 1000);
-}
-
-Milliseconds AbstractOplogFetcher::_getGetMoreMaxTime() const {
- return kDefaultOplogGetMoreMaxMS;
-}
-
-Milliseconds AbstractOplogFetcher::_getNetworkTimeoutBuffer() const {
- return Milliseconds(oplogNetworkTimeoutBufferSeconds.load() * 1000);
-}
-
-std::string AbstractOplogFetcher::toString() const {
- stdx::lock_guard<Latch> lock(_mutex);
- str::stream msg;
- msg << _getComponentName() << " -"
- << " last optime fetched: " << _lastFetched.toString();
- // The fetcher is created a startup, not at construction, so we must check if it exists.
- if (_fetcher) {
- msg << " fetcher: " << _fetcher->getDiagnosticString();
- }
- return msg;
-}
-
-void AbstractOplogFetcher::_makeAndScheduleFetcherCallback(
- const executor::TaskExecutor::CallbackArgs& args) {
- Status responseStatus = _checkForShutdownAndConvertStatus(args, "error scheduling fetcher");
- if (!responseStatus.isOK()) {
- _finishCallback(responseStatus);
- return;
- }
-
- BSONObj findCommandObj =
- _makeFindCommandObject(_nss, _getLastOpTimeFetched(), _getInitialFindMaxTime());
- BSONObj metadataObj = _makeMetadataObject();
-
- Status scheduleStatus = Status::OK();
- {
- stdx::lock_guard<Latch> lock(_mutex);
- _fetcher = _makeFetcher(findCommandObj, metadataObj, _getInitialFindMaxTime());
- scheduleStatus = _scheduleFetcher_inlock();
- }
- if (!scheduleStatus.isOK()) {
- _finishCallback(scheduleStatus);
- return;
- }
-}
-
-Status AbstractOplogFetcher::_doStartup_inlock() noexcept {
- return _scheduleWorkAndSaveHandle_inlock(
- [this](const executor::TaskExecutor::CallbackArgs& args) {
- // Tests use this failpoint to prevent the oplog fetcher from starting. If those
- // tests fail and the oplog fetcher is canceled, we want to continue so we see
- // a test failure quickly instead of a test timeout eventually.
- while (hangBeforeStartingOplogFetcher.shouldFail() && !args.myHandle.isCanceled()) {
- sleepmillis(100);
- }
- _makeAndScheduleFetcherCallback(args);
- },
- &_makeAndScheduleFetcherHandle,
- "_makeAndScheduleFetcherCallback");
-}
-
-void AbstractOplogFetcher::_doShutdown_inlock() noexcept {
- _cancelHandle_inlock(_makeAndScheduleFetcherHandle);
- if (_fetcher) {
- _fetcher->shutdown();
- }
-}
-
-Mutex* AbstractOplogFetcher::_getMutex() noexcept {
- return &_mutex;
-}
-
-Status AbstractOplogFetcher::_scheduleFetcher_inlock() {
- return _fetcher->schedule();
-}
-
-OpTime AbstractOplogFetcher::getLastOpTimeFetched_forTest() const {
- return _getLastOpTimeFetched();
-}
-
-OpTime AbstractOplogFetcher::_getLastOpTimeFetched() const {
- stdx::lock_guard<Latch> lock(_mutex);
- return _lastFetched;
-}
-
-BSONObj AbstractOplogFetcher::getCommandObject_forTest() const {
- stdx::lock_guard<Latch> lock(_mutex);
- return _fetcher->getCommandObject();
-}
-
-BSONObj AbstractOplogFetcher::getFindQuery_forTest() const {
- return _makeFindCommandObject(_nss, _getLastOpTimeFetched(), _getInitialFindMaxTime());
-}
-
-HostAndPort AbstractOplogFetcher::_getSource() const {
- return _source;
-}
-
-NamespaceString AbstractOplogFetcher::_getNamespace() const {
- return _nss;
-}
-
-void AbstractOplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
- BSONObjBuilder* getMoreBob) {
- Status responseStatus =
- _checkForShutdownAndConvertStatus(result.getStatus(), "error in fetcher batch callback");
- if (ErrorCodes::CallbackCanceled == responseStatus) {
- LOGV2_DEBUG(21032,
- 1,
- "{getComponentName} oplog query cancelled to {getSource}: {responseStatus}",
- "getComponentName"_attr = _getComponentName(),
- "getSource"_attr = _getSource(),
- "responseStatus"_attr = redact(responseStatus));
- _finishCallback(responseStatus);
- return;
- }
- // If target cut connections between connecting and querying (for
- // example, because it stepped down) we might not have a cursor.
- if (!responseStatus.isOK()) {
- BSONObj findCommandObj =
- _makeFindCommandObject(_nss, _getLastOpTimeFetched(), _getRetriedFindMaxTime());
- BSONObj metadataObj = _makeMetadataObject();
- {
- if (_oplogFetcherRestartDecision->shouldContinue(this, responseStatus)) {
- stdx::lock_guard<Latch> lock(_mutex);
- // Destroying current instance in _shuttingDownFetcher will possibly block.
- _shuttingDownFetcher.reset();
- // Move the old fetcher into the shutting down instance.
- _shuttingDownFetcher.swap(_fetcher);
- // Create and start fetcher with current term and new starting optime, and use the
- // retry 'find' timeout.
- _fetcher = _makeFetcher(findCommandObj, metadataObj, _getRetriedFindMaxTime());
-
- auto scheduleStatus = _scheduleFetcher_inlock();
- if (scheduleStatus.isOK()) {
- LOGV2(21033,
- "Scheduled new oplog query {fetcher}",
- "fetcher"_attr = _fetcher->toString());
- return;
- }
- LOGV2_ERROR(21037,
- "Error scheduling new oplog query: {scheduleStatus}. Returning current "
- "oplog query error: {responseStatus}",
- "scheduleStatus"_attr = redact(scheduleStatus),
- "responseStatus"_attr = redact(responseStatus));
- }
- }
- _finishCallback(responseStatus);
- return;
- }
-
- // Reset fetcher restart counter on successful response.
- {
- stdx::lock_guard<Latch> lock(_mutex);
- invariant(_isActive_inlock());
- _oplogFetcherRestartDecision->fetchSuccessful(this);
- }
-
- if (_isShuttingDown()) {
- _finishCallback(
- Status(ErrorCodes::CallbackCanceled, _getComponentName() + " shutting down"));
- return;
- }
-
- // At this point we have a successful batch and can call the subclass's _onSuccessfulBatch.
- const auto& queryResponse = result.getValue();
- auto batchResult = _onSuccessfulBatch(queryResponse);
- if (!batchResult.isOK()) {
- // The stopReplProducer fail point expects this to return successfully. If another fail
- // point wants this to return unsuccessfully, it should use a different error code.
- if (batchResult.getStatus() == ErrorCodes::FailPointEnabled) {
- _finishCallback(Status::OK());
- return;
- }
- _finishCallback(batchResult.getStatus());
- return;
- }
-
- // No more data. Stop processing and return Status::OK.
- if (!getMoreBob) {
- _finishCallback(Status::OK());
- return;
- }
-
- // We have now processed the batch and should move forward our view of _lastFetched. Note that
- // the _lastFetched value will not be updated until the _onSuccessfulBatch function is
- // completed.
- const auto& documents = queryResponse.documents;
- if (documents.size() > 0) {
- auto lastDocRes = OpTime::parseFromOplogEntry(documents.back());
- if (!lastDocRes.isOK()) {
- _finishCallback(lastDocRes.getStatus());
- return;
- }
- auto lastDoc = lastDocRes.getValue();
- LOGV2_DEBUG(21034,
- 3,
- "{getComponentName} setting last fetched optime ahead after batch: {lastDoc}",
- "getComponentName"_attr = _getComponentName(),
- "lastDoc"_attr = lastDoc);
-
- stdx::lock_guard<Latch> lock(_mutex);
- _lastFetched = lastDoc;
- }
-
- // Check for shutdown to save an unnecessary `getMore` request.
- if (_isShuttingDown()) {
- _finishCallback(
- Status(ErrorCodes::CallbackCanceled, _getComponentName() + " shutting down"));
- return;
- }
-
- // The _onSuccessfulBatch function returns the `getMore` command we want to send.
- getMoreBob->appendElements(batchResult.getValue());
-}
-
-void AbstractOplogFetcher::_finishCallback(Status status) {
- invariant(isActive());
-
- _onShutdownCallbackFn(status);
-
- decltype(_onShutdownCallbackFn) onShutdownCallbackFn;
- decltype(_oplogFetcherRestartDecision) oplogFetcherRestartDecision;
- stdx::lock_guard<Latch> lock(_mutex);
- _transitionToComplete_inlock();
-
- // Release any resources that might be held by the '_onShutdownCallbackFn' function object.
- // The function object will be destroyed outside the lock since the temporary variable
- // 'onShutdownCallbackFn' is declared before 'lock'.
- invariant(_onShutdownCallbackFn);
- std::swap(_onShutdownCallbackFn, onShutdownCallbackFn);
-
- // Release any resources held by the OplogFetcherRestartDecision
- invariant(_oplogFetcherRestartDecision);
- std::swap(_oplogFetcherRestartDecision, oplogFetcherRestartDecision);
-}
-
-std::unique_ptr<Fetcher> AbstractOplogFetcher::_makeFetcher(const BSONObj& findCommandObj,
- const BSONObj& metadataObj,
- Milliseconds findMaxTime) {
- return std::make_unique<Fetcher>(
- _getExecutor(),
- _source,
- _nss.db().toString(),
- findCommandObj,
- [this](const StatusWith<Fetcher::QueryResponse>& resp,
- Fetcher::NextAction*,
- BSONObjBuilder* builder) { return _callback(resp, builder); },
- metadataObj,
- findMaxTime + _getNetworkTimeoutBuffer(),
- _getGetMoreMaxTime() + _getNetworkTimeoutBuffer());
-}
-
-bool AbstractOplogFetcher::OplogFetcherRestartDecisionDefault::shouldContinue(
- AbstractOplogFetcher* fetcher, Status status) {
- if (_fetcherRestarts == _maxFetcherRestarts) {
- LOGV2(21035,
- "Error returned from oplog query (no more query restarts left): {status}",
- "status"_attr = redact(status));
- return false;
- }
- LOGV2(
- 21036,
- "Restarting oplog query due to error: {status}. Last fetched optime: "
- "{fetcher_getLastOpTimeFetched}. Restarts remaining: {maxFetcherRestarts_fetcherRestarts}",
- "status"_attr = redact(status),
- "fetcher_getLastOpTimeFetched"_attr = fetcher->_getLastOpTimeFetched(),
- "maxFetcherRestarts_fetcherRestarts"_attr = (_maxFetcherRestarts - _fetcherRestarts));
- _fetcherRestarts++;
- return true;
-}
-
-void AbstractOplogFetcher::OplogFetcherRestartDecisionDefault::fetchSuccessful(
- AbstractOplogFetcher* fetcher) {
- _fetcherRestarts = 0;
-};
-
-AbstractOplogFetcher::OplogFetcherRestartDecision::~OplogFetcherRestartDecision(){};
-
-} // namespace repl
-} // namespace mongo
diff --git a/src/mongo/db/repl/abstract_oplog_fetcher.h b/src/mongo/db/repl/abstract_oplog_fetcher.h
deleted file mode 100644
index f588472bc18..00000000000
--- a/src/mongo/db/repl/abstract_oplog_fetcher.h
+++ /dev/null
@@ -1,280 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <functional>
-
-#include "mongo/base/status_with.h"
-#include "mongo/client/fetcher.h"
-#include "mongo/db/namespace_string.h"
-#include "mongo/db/repl/abstract_async_component.h"
-#include "mongo/db/repl/optime_with.h"
-#include "mongo/platform/mutex.h"
-
-namespace mongo {
-namespace repl {
-
-/**
- * This class represents an abstract base class for replication components that try to read from
- * remote oplogs. An abstract oplog fetcher is an abstract async component. It owns a Fetcher
- * that fetches operations from a remote oplog and restarts from the last fetched oplog entry on
- * error.
- *
- * The `find` command and metadata are provided by oplog fetchers that subclass the abstract oplog
- * fetcher. Subclasses also provide a callback to run on successful batches.
- */
-class AbstractOplogFetcher : public AbstractAsyncComponent {
- AbstractOplogFetcher(const AbstractOplogFetcher&) = delete;
- AbstractOplogFetcher& operator=(const AbstractOplogFetcher&) = delete;
-
-public:
- /**
- * Type of function called by the abstract oplog fetcher on shutdown with
- * the final abstract oplog fetcher status.
- *
- * The status will be Status::OK() if we have processed the last batch of operations
- * from the cursor ("bob" is null in the fetcher callback).
- *
- * This function will be called 0 times if startup() fails and at most once after startup()
- * returns success.
- */
- using OnShutdownCallbackFn = std::function<void(const Status& shutdownStatus)>;
-
- class OplogFetcherRestartDecision;
-
- /**
- * Invariants if validation fails on any of the provided arguments.
- */
- AbstractOplogFetcher(executor::TaskExecutor* executor,
- OpTime lastFetched,
- HostAndPort source,
- NamespaceString nss,
- std::size_t maxFetcherRestarts,
- OnShutdownCallbackFn onShutdownCallbackFn,
- const std::string& componentName);
-
- AbstractOplogFetcher(executor::TaskExecutor* executor,
- OpTime lastFetched,
- HostAndPort source,
- NamespaceString nss,
- std::unique_ptr<OplogFetcherRestartDecision> oplogFetcherRestartDecision,
- OnShutdownCallbackFn onShutdownCallbackFn,
- const std::string& componentName);
-
- virtual ~AbstractOplogFetcher() = default;
-
- std::string toString() const;
-
- // ================== Test support API ===================
-
- /**
- * Returns the command object sent in first remote command. Since the Fetcher is not created
- * until startup, this cannot be used until the Fetcher is guaranteed to exist.
- */
- BSONObj getCommandObject_forTest() const;
-
- /**
- * Returns the `find` query provided to the Fetcher. Since the Fetcher is not created until
- * startup, this can be used for logging the `find` query before startup.
- */
- BSONObj getFindQuery_forTest() const;
-
- /**
- * Returns the OpTime of the last oplog entry fetched and processed.
- */
- OpTime getLastOpTimeFetched_forTest() const;
-
- class OplogFetcherRestartDecision {
- public:
- OplogFetcherRestartDecision(){};
-
- virtual ~OplogFetcherRestartDecision() = 0;
-
- virtual bool shouldContinue(AbstractOplogFetcher* fetcher, Status status) = 0;
-
- virtual void fetchSuccessful(AbstractOplogFetcher* fetcher) = 0;
- };
-
- class OplogFetcherRestartDecisionDefault : public OplogFetcherRestartDecision {
- public:
- OplogFetcherRestartDecisionDefault(std::size_t maxFetcherRestarts)
- : _maxFetcherRestarts(maxFetcherRestarts){};
-
- bool shouldContinue(AbstractOplogFetcher* fetcher, Status status) final;
-
- void fetchSuccessful(AbstractOplogFetcher* fetcher) final;
-
- ~OplogFetcherRestartDecisionDefault(){};
-
- private:
- AbstractOplogFetcher* _abstractOplogFetcher;
-
- // Fetcher restarts since the last successful oplog query response.
- std::size_t _fetcherRestarts = 0;
-
- const std::size_t _maxFetcherRestarts;
- };
-
-
-protected:
- /**
- * Returns how long the `find` command should wait before timing out.
- */
- virtual Milliseconds _getInitialFindMaxTime() const;
-
- /**
- * Returns how long the `find` command should wait before timing out, if we are retrying the
- * 'find' due to an error. This timeout should be considerably smaller than our initial oplog
- * find time, since a communication failure with an upstream node may indicate it is
- * unreachable.
- */
- virtual Milliseconds _getRetriedFindMaxTime() const;
-
- /**
- * Returns how long the `getMore` command should wait before timing out.
- */
- virtual Milliseconds _getGetMoreMaxTime() const;
-
- /**
- * Returns the amount of time to add to the `find` and `getMore` timeouts to calculate the
- * network timeout for the requests.
- */
- virtual Milliseconds _getNetworkTimeoutBuffer() const;
-
- /**
- * Returns the sync source from which this oplog fetcher is fetching.
- */
- HostAndPort _getSource() const;
-
- /**
- * Returns the namespace from which this oplog fetcher is fetching.
- */
- NamespaceString _getNamespace() const;
-
- /**
- * Returns the OpTime of the last oplog entry fetched and processed.
- */
- virtual OpTime _getLastOpTimeFetched() const;
-
- // =============== AbstractAsyncComponent overrides ================
-
- /**
- * Initializes and schedules a Fetcher with a `find` command specified by the subclass.
- */
- virtual Status _doStartup_inlock() noexcept override;
-
- /**
- * Shuts down the Fetcher.
- */
- virtual void _doShutdown_inlock() noexcept override;
-
-private:
- Mutex* _getMutex() noexcept override;
-
- /**
- * This function must be overriden by subclass oplog fetchers to specify what `find` command
- * to issue to the sync source. The subclass is provided with the last OpTime fetched so that
- * it can begin its Fetcher from the middle of the oplog.
- */
- virtual BSONObj _makeFindCommandObject(const NamespaceString& nss,
- OpTime lastOpTimeFetched,
- Milliseconds findMaxTime) const = 0;
-
- /**
- * This function must be overriden by subclass oplog fetchers to specify what metadata object
- * to send with the `find` command.
- */
- virtual BSONObj _makeMetadataObject() const = 0;
-
- /**
- * Function called by the abstract oplog fetcher when it gets a successful batch from
- * the sync source.
- *
- * On success, returns the BSONObj of the `getMore` command that should be sent back to the
- * sync source. On failure returns a status that will be passed to the _finishCallback.
- */
- virtual StatusWith<BSONObj> _onSuccessfulBatch(const Fetcher::QueryResponse& queryResponse) = 0;
-
- /**
- * This function creates a Fetcher with the given `find` command and metadata.
- */
- std::unique_ptr<Fetcher> _makeFetcher(const BSONObj& findCommandObj,
- const BSONObj& metadataObj,
- Milliseconds findTimeout);
- /**
- * Callback used to make a Fetcher, and then save and schedule it in a lock.
- */
- void _makeAndScheduleFetcherCallback(const executor::TaskExecutor::CallbackArgs& args);
-
- /**
- * Schedules fetcher and updates counters.
- */
- Status _scheduleFetcher_inlock();
-
- /**
- * Processes each batch of results from the cursor started by the Fetcher on the sync source.
- *
- * Calls "_finishCallback" if there is an error or if there are no further results to
- * request from the sync source.
- */
- void _callback(const Fetcher::QueryResponseStatus& result, BSONObjBuilder* getMoreBob);
-
- /**
- * Notifies caller that the oplog fetcher has completed processing operations from
- * the remote oplog using the "_onShutdownCallbackFn".
- */
- void _finishCallback(Status status);
-
- // Sync source to read from.
- const HostAndPort _source;
-
- // Namespace of the oplog to read.
- const NamespaceString _nss;
-
- std::unique_ptr<OplogFetcherRestartDecision> _oplogFetcherRestartDecision;
-
- // Protects member data of this AbstractOplogFetcher.
- mutable Mutex _mutex = MONGO_MAKE_LATCH("AbstractOplogFetcher::_mutex");
-
- // Function to call when the oplog fetcher shuts down.
- OnShutdownCallbackFn _onShutdownCallbackFn;
-
- // Used to keep track of the last oplog entry read and processed from the sync source.
- OpTime _lastFetched;
-
- std::unique_ptr<Fetcher> _fetcher;
- std::unique_ptr<Fetcher> _shuttingDownFetcher;
-
- // Handle to currently scheduled _makeAndScheduleFetcherCallback task.
- executor::TaskExecutor::CallbackHandle _makeAndScheduleFetcherHandle;
-};
-
-} // namespace repl
-} // namespace mongo
diff --git a/src/mongo/db/repl/abstract_oplog_fetcher_test.cpp b/src/mongo/db/repl/abstract_oplog_fetcher_test.cpp
deleted file mode 100644
index 5fe87c8530c..00000000000
--- a/src/mongo/db/repl/abstract_oplog_fetcher_test.cpp
+++ /dev/null
@@ -1,566 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
-
-#include "mongo/platform/basic.h"
-
-#include <memory>
-
-#include "mongo/db/repl/abstract_oplog_fetcher.h"
-#include "mongo/db/repl/abstract_oplog_fetcher_test_fixture.h"
-#include "mongo/db/repl/oplog_entry.h"
-#include "mongo/db/repl/task_executor_mock.h"
-#include "mongo/logv2/log.h"
-#include "mongo/unittest/task_executor_proxy.h"
-#include "mongo/unittest/unittest.h"
-#include "mongo/util/log.h"
-#include "mongo/util/scopeguard.h"
-
-namespace {
-
-using namespace mongo;
-using namespace mongo::repl;
-
-using executor::RemoteCommandRequest;
-using executor::RemoteCommandResponse;
-using NetworkGuard = executor::NetworkInterfaceMock::InNetworkGuard;
-
-HostAndPort source("localhost:12345");
-NamespaceString nss("local.oplog.rs");
-
-// For testing. Should match the value used in the AbstractOplogFetcher.
-const Milliseconds kNetworkTimeoutBufferMS{5000};
-
-/**
- * This class is the minimal implementation of an oplog fetcher. It has the simplest `find` command
- * possible, no metadata, and the _onSuccessfulBatch function simply returns a `getMore` command
- * on the fetcher's cursor.
- */
-class MockOplogFetcher : public AbstractOplogFetcher {
-public:
- explicit MockOplogFetcher(executor::TaskExecutor* executor,
- OpTime lastFetched,
- HostAndPort source,
- NamespaceString nss,
- std::size_t maxFetcherRestarts,
- OnShutdownCallbackFn onShutdownCallbackFn);
-
- void setInitialFindMaxTime(Milliseconds findMaxTime) {
- _initialFindMaxTime = findMaxTime;
- }
-
- void setRetriedFindMaxTime(Milliseconds findMaxTime) {
- _retriedFindMaxTime = findMaxTime;
- }
-
-private:
- BSONObj _makeFindCommandObject(const NamespaceString& nss,
- OpTime lastOpTimeFetched,
- Milliseconds findMaxTime) const override;
- BSONObj _makeMetadataObject() const override;
-
- StatusWith<BSONObj> _onSuccessfulBatch(const Fetcher::QueryResponse& queryResponse) override;
-
- Milliseconds _getInitialFindMaxTime() const override;
-
- Milliseconds _getRetriedFindMaxTime() const override;
-
- Milliseconds _initialFindMaxTime{60000};
- Milliseconds _retriedFindMaxTime{2000};
-};
-
-MockOplogFetcher::MockOplogFetcher(executor::TaskExecutor* executor,
- OpTime lastFetched,
- HostAndPort source,
- NamespaceString nss,
- std::size_t maxFetcherRestarts,
- OnShutdownCallbackFn onShutdownCallbackFn)
- : AbstractOplogFetcher(executor,
- lastFetched,
- source,
- nss,
- maxFetcherRestarts,
- onShutdownCallbackFn,
- "mock oplog fetcher") {}
-
-Milliseconds MockOplogFetcher::_getInitialFindMaxTime() const {
- return _initialFindMaxTime;
-}
-
-Milliseconds MockOplogFetcher::_getRetriedFindMaxTime() const {
- return _retriedFindMaxTime;
-}
-
-BSONObj MockOplogFetcher::_makeFindCommandObject(const NamespaceString& nss,
- OpTime lastOpTimeFetched,
- Milliseconds findMaxTime) const {
- BSONObjBuilder cmdBob;
- cmdBob.append("find", nss.coll());
- cmdBob.append("filter", BSON("ts" << BSON("$gte" << lastOpTimeFetched.getTimestamp())));
- cmdBob.append("maxTimeMS", durationCount<Milliseconds>(findMaxTime));
- return cmdBob.obj();
-}
-
-BSONObj MockOplogFetcher::_makeMetadataObject() const {
- return BSONObj();
-}
-
-StatusWith<BSONObj> MockOplogFetcher::_onSuccessfulBatch(
- const Fetcher::QueryResponse& queryResponse) {
- BSONObjBuilder cmdBob;
- cmdBob.append("getMore", queryResponse.cursorId);
- cmdBob.append("collection", _getNamespace().coll());
- return cmdBob.obj();
-}
-
-TEST_F(AbstractOplogFetcherTest, ShuttingExecutorDownShouldPreventOplogFetcherFromStarting) {
- getExecutor().shutdown();
-
- MockOplogFetcher oplogFetcher(&getExecutor(), lastFetched, source, nss, 0, [](Status) {});
-
- // Last optime fetched should match values passed to constructor.
- ASSERT_EQUALS(lastFetched, oplogFetcher.getLastOpTimeFetched_forTest());
-
- ASSERT_FALSE(oplogFetcher.isActive());
- ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, oplogFetcher.startup());
- ASSERT_FALSE(oplogFetcher.isActive());
-
- // Last optime fetched should not change.
- ASSERT_EQUALS(lastFetched, oplogFetcher.getLastOpTimeFetched_forTest());
-}
-
-TEST_F(AbstractOplogFetcherTest, StartupReturnsOperationFailedIfExecutorFailsToScheduleFetcher) {
- ShutdownState shutdownState;
-
- TaskExecutorMock taskExecutorMock(&getExecutor());
- taskExecutorMock.shouldFailScheduleWorkRequest = []() { return true; };
-
- MockOplogFetcher oplogFetcher(
- &taskExecutorMock, lastFetched, source, nss, 0, std::ref(shutdownState));
-
- ASSERT_EQUALS(ErrorCodes::OperationFailed, oplogFetcher.startup());
-}
-
-TEST_F(AbstractOplogFetcherTest, OplogFetcherReturnsOperationFailedIfExecutorFailsToScheduleFind) {
- ShutdownState shutdownState;
-
- TaskExecutorMock taskExecutorMock(&getExecutor());
- taskExecutorMock.shouldFailScheduleRemoteCommandRequest =
- [](const executor::RemoteCommandRequestOnAny&) { return true; };
-
- MockOplogFetcher oplogFetcher(
- &taskExecutorMock, lastFetched, source, nss, 0, std::ref(shutdownState));
-
- ASSERT_FALSE(oplogFetcher.isActive());
- ASSERT_OK(oplogFetcher.startup());
-
- // It is racy to check OplogFetcher::isActive() immediately after calling startup() because
- // OplogFetcher schedules the remote command on a different thread from the caller of startup().
-
- oplogFetcher.join();
-
- ASSERT_EQUALS(ErrorCodes::OperationFailed, shutdownState.getStatus());
-}
-
-TEST_F(AbstractOplogFetcherTest, ShuttingExecutorDownAfterStartupStopsTheOplogFetcher) {
- ShutdownState shutdownState;
-
- TaskExecutorMock taskExecutorMock(&getExecutor());
- taskExecutorMock.shouldDeferScheduleWorkRequestByOneSecond = []() { return true; };
-
- MockOplogFetcher oplogFetcher(
- &taskExecutorMock, lastFetched, source, nss, 0, std::ref(shutdownState));
-
- ASSERT_FALSE(oplogFetcher.isActive());
- ASSERT_OK(oplogFetcher.startup());
- ASSERT_TRUE(oplogFetcher.isActive());
-
- getExecutor().shutdown();
-
- oplogFetcher.join();
-
- ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus());
-}
-
-TEST_F(AbstractOplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownAfterStartup) {
- ShutdownState shutdownState;
-
- TaskExecutorMock taskExecutorMock(&getExecutor());
- taskExecutorMock.shouldDeferScheduleWorkRequestByOneSecond = []() { return true; };
-
- MockOplogFetcher oplogFetcher(
- &taskExecutorMock, lastFetched, source, nss, 0, std::ref(shutdownState));
-
- ASSERT_FALSE(oplogFetcher.isActive());
- ASSERT_OK(oplogFetcher.startup());
- ASSERT_TRUE(oplogFetcher.isActive());
-
- oplogFetcher.shutdown();
-
- oplogFetcher.join();
-
- ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus());
-}
-
-Timestamp _getTimestamp(const BSONObj& oplogEntry) {
- return OplogEntry(oplogEntry).getOpTime().getTimestamp();
-}
-
-OpTime _getOpTime(const BSONObj& oplogEntry) {
- return OplogEntry(oplogEntry).getOpTime();
-}
-
-std::vector<BSONObj> _generateOplogEntries(std::size_t size) {
- std::vector<BSONObj> ops(size);
- for (std::size_t i = 0; i < size; ++i) {
- ops[i] = AbstractOplogFetcherTest::makeNoopOplogEntry(Seconds(100 + int(i)));
- }
- return ops;
-}
-
-void _assertFindCommandTimestampEquals(const Timestamp& timestamp,
- const RemoteCommandRequest& request) {
- executor::TaskExecutorTest::assertRemoteCommandNameEquals("find", request);
- ASSERT_EQUALS(timestamp, request.cmdObj["filter"].Obj()["ts"].Obj()["$gte"].timestamp());
-}
-
-void _assertFindCommandTimestampEquals(const BSONObj& oplogEntry,
- const RemoteCommandRequest& request) {
- _assertFindCommandTimestampEquals(_getTimestamp(oplogEntry), request);
-}
-
-TEST_F(AbstractOplogFetcherTest,
- OplogFetcherCreatesNewFetcherOnCallbackErrorDuringGetMoreNumberOne) {
- auto ops = _generateOplogEntries(5U);
- std::size_t maxFetcherRestarts = 1U;
- auto shutdownState = std::make_unique<ShutdownState>();
- MockOplogFetcher oplogFetcher(&getExecutor(),
- _getOpTime(ops[0]),
- source,
- nss,
- maxFetcherRestarts,
- std::ref(*shutdownState));
-
- ON_BLOCK_EXIT([this] { getExecutor().shutdown(); });
-
- ASSERT_OK(oplogFetcher.startup());
-
- // Send first batch from FIND.
- _assertFindCommandTimestampEquals(
- ops[0], processNetworkResponse({makeCursorResponse(1, {ops[0], ops[1], ops[2]})}, true));
-
- // Send error during GETMORE.
- processNetworkResponse({ErrorCodes::CursorNotFound, "cursor not found"}, true);
-
- // Send first batch from FIND, and Check that it started from the end of the last FIND response.
- // Check that the optimes match for the query and last oplog entry.
- _assertFindCommandTimestampEquals(
- ops[2], processNetworkResponse({makeCursorResponse(0, {ops[2], ops[3], ops[4]})}, false));
-
- // Done.
- oplogFetcher.join();
- ASSERT_OK(shutdownState->getStatus());
-}
-
-TEST_F(AbstractOplogFetcherTest, OplogFetcherStopsRestartingFetcherIfRestartLimitIsReached) {
- auto ops = _generateOplogEntries(3U);
- std::size_t maxFetcherRestarts = 2U;
- auto shutdownState = std::make_unique<ShutdownState>();
- MockOplogFetcher oplogFetcher(&getExecutor(),
- _getOpTime(ops[0]),
- source,
- nss,
- maxFetcherRestarts,
- std::ref(*shutdownState));
-
- ON_BLOCK_EXIT([this] { getExecutor().shutdown(); });
-
- ASSERT_OK(oplogFetcher.startup());
-
- LOGV2(21038, "processing find request from first fetcher");
-
- _assertFindCommandTimestampEquals(
- ops[0], processNetworkResponse({makeCursorResponse(1, {ops[0], ops[1], ops[2]})}, true));
-
- LOGV2(21039, "sending error response to getMore request from first fetcher");
- assertRemoteCommandNameEquals(
- "getMore", processNetworkResponse({ErrorCodes::CappedPositionLost, "fail 1"}, true));
-
- LOGV2(21040, "sending error response to find request from second fetcher");
- _assertFindCommandTimestampEquals(
- ops[2], processNetworkResponse({ErrorCodes::IllegalOperation, "fail 2"}, true));
-
- LOGV2(21041, "sending error response to find request from third fetcher");
- _assertFindCommandTimestampEquals(
- ops[2], processNetworkResponse({ErrorCodes::OperationFailed, "fail 3"}, false));
-
- oplogFetcher.join();
- ASSERT_EQUALS(ErrorCodes::OperationFailed, shutdownState->getStatus());
-}
-
-TEST_F(AbstractOplogFetcherTest, OplogFetcherResetsRestartCounterOnSuccessfulFetcherResponse) {
- auto ops = _generateOplogEntries(5U);
- std::size_t maxFetcherRestarts = 2U;
- auto shutdownState = std::make_unique<ShutdownState>();
- MockOplogFetcher oplogFetcher(&getExecutor(),
- _getOpTime(ops[0]),
- source,
- nss,
- maxFetcherRestarts,
- std::ref(*shutdownState));
- ON_BLOCK_EXIT([this] { getExecutor().shutdown(); });
-
- ASSERT_OK(oplogFetcher.startup());
-
- LOGV2(21042, "processing find request from first fetcher");
-
- _assertFindCommandTimestampEquals(
- ops[0], processNetworkResponse({makeCursorResponse(1, {ops[0], ops[1], ops[2]})}, true));
-
- LOGV2(21043, "sending error response to getMore request from first fetcher");
- assertRemoteCommandNameEquals(
- "getMore", processNetworkResponse({ErrorCodes::CappedPositionLost, "fail 1"}, true));
-
- LOGV2(21044, "processing find request from second fetcher");
- _assertFindCommandTimestampEquals(
- ops[2], processNetworkResponse({makeCursorResponse(1, {ops[2], ops[3], ops[4]})}, true));
-
- LOGV2(21045, "sending error response to getMore request from second fetcher");
- assertRemoteCommandNameEquals(
- "getMore", processNetworkResponse({ErrorCodes::IllegalOperation, "fail 2"}, true));
-
- LOGV2(21046, "sending error response to find request from third fetcher");
- _assertFindCommandTimestampEquals(
- ops[4], processNetworkResponse({ErrorCodes::InternalError, "fail 3"}, true));
-
- LOGV2(21047, "sending error response to find request from fourth fetcher");
- _assertFindCommandTimestampEquals(
- ops[4], processNetworkResponse({ErrorCodes::OperationFailed, "fail 4"}, false));
-
- oplogFetcher.join();
- ASSERT_EQUALS(ErrorCodes::OperationFailed, shutdownState->getStatus());
-}
-
-class TaskExecutorWithFailureInScheduleRemoteCommand : public unittest::TaskExecutorProxy {
-public:
- using ShouldFailRequestFn = std::function<bool(const executor::RemoteCommandRequest&)>;
-
- TaskExecutorWithFailureInScheduleRemoteCommand(executor::TaskExecutor* executor,
- ShouldFailRequestFn shouldFailRequest)
- : unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {}
-
- StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request,
- const RemoteCommandCallbackFn& cb,
- const BatonHandle& baton = nullptr) override {
- if (_shouldFailRequest(request)) {
- return Status(ErrorCodes::OperationFailed, "failed to schedule remote command");
- }
- return getExecutor()->scheduleRemoteCommand(request, cb);
- }
-
-private:
- ShouldFailRequestFn _shouldFailRequest;
-};
-
-TEST_F(AbstractOplogFetcherTest,
- OplogFetcherAbortsWithOriginalResponseErrorOnFailureToScheduleNewFetcher) {
- auto ops = _generateOplogEntries(3U);
- std::size_t maxFetcherRestarts = 2U;
- auto shutdownState = std::make_unique<ShutdownState>();
- bool shouldFailSchedule = false;
- TaskExecutorWithFailureInScheduleRemoteCommand _executorProxy(
- &getExecutor(), [&shouldFailSchedule](const executor::RemoteCommandRequest& request) {
- return shouldFailSchedule;
- });
- MockOplogFetcher oplogFetcher(&_executorProxy,
- _getOpTime(ops[0]),
- source,
- nss,
- maxFetcherRestarts,
- std::ref(*shutdownState));
- ON_BLOCK_EXIT([this] { getExecutor().shutdown(); });
-
- ASSERT_OK(oplogFetcher.startup());
- ASSERT_TRUE(oplogFetcher.isActive());
-
- LOGV2(21048, "processing find request from first fetcher");
-
- _assertFindCommandTimestampEquals(
- ops[0], processNetworkResponse({makeCursorResponse(1, {ops[0], ops[1], ops[2]})}, true));
-
- LOGV2(21049, "sending error response to getMore request from first fetcher");
- shouldFailSchedule = true;
- assertRemoteCommandNameEquals(
- "getMore", processNetworkResponse({ErrorCodes::CappedPositionLost, "dead cursor"}, false));
-
- oplogFetcher.join();
- // Status in shutdown callback should match error for dead cursor instead of error from failed
- // schedule request.
- ASSERT_EQUALS(ErrorCodes::CappedPositionLost, shutdownState->getStatus());
-}
-
-TEST_F(AbstractOplogFetcherTest, OplogFetcherTimesOutCorrectlyOnInitialFindRequests) {
- auto ops = _generateOplogEntries(2U);
- std::size_t maxFetcherRestarts = 0U;
- auto shutdownState = std::make_unique<ShutdownState>();
- MockOplogFetcher oplogFetcher(&getExecutor(),
- _getOpTime(ops[0]),
- source,
- nss,
- maxFetcherRestarts,
- std::ref(*shutdownState));
-
- // Set a finite network timeout for the initial find request.
- auto initialFindMaxTime = Milliseconds(10000);
- oplogFetcher.setInitialFindMaxTime(initialFindMaxTime);
-
- ON_BLOCK_EXIT([this] { getExecutor().shutdown(); });
-
- ASSERT_OK(oplogFetcher.startup());
- ASSERT_TRUE(oplogFetcher.isActive());
-
- auto net = getNet();
-
- // Schedule a response at a time that would exceed the initial find request network timeout.
- net->enterNetwork();
- auto when = net->now() + initialFindMaxTime + kNetworkTimeoutBufferMS + Milliseconds(10);
- auto noi = getNet()->getNextReadyRequest();
- RemoteCommandResponse response = {{makeCursorResponse(1, {ops[0], ops[1]})}, Milliseconds(0)};
- auto request = net->scheduleSuccessfulResponse(noi, when, response);
- net->runUntil(when);
- net->runReadyNetworkOperations();
- net->exitNetwork();
-
- oplogFetcher.join();
-
- // The fetcher should have shut down after its last request timed out.
- ASSERT_TRUE(ErrorCodes::isExceededTimeLimitError(shutdownState->getStatus().code()));
-}
-
-TEST_F(AbstractOplogFetcherTest, OplogFetcherTimesOutCorrectlyOnRetriedFindRequests) {
- auto ops = _generateOplogEntries(2U);
- std::size_t maxFetcherRestarts = 1U;
- auto shutdownState = std::make_unique<ShutdownState>();
- MockOplogFetcher oplogFetcher(&getExecutor(),
- _getOpTime(ops[0]),
- source,
- nss,
- maxFetcherRestarts,
- std::ref(*shutdownState));
-
- // Set finite network timeouts for the initial and retried find requests.
- auto initialFindMaxTime = Milliseconds(10000);
- auto retriedFindMaxTime = Milliseconds(1000);
- oplogFetcher.setInitialFindMaxTime(initialFindMaxTime);
- oplogFetcher.setRetriedFindMaxTime(retriedFindMaxTime);
-
- ON_BLOCK_EXIT([this] { getExecutor().shutdown(); });
-
- ASSERT_OK(oplogFetcher.startup());
- ASSERT_TRUE(oplogFetcher.isActive());
-
- auto net = getNet();
-
- // Schedule a response at a time that would exceed the initial find request network timeout.
- net->enterNetwork();
- auto when = net->now() + initialFindMaxTime + kNetworkTimeoutBufferMS + Milliseconds(10);
- auto noi = getNet()->getNextReadyRequest();
- RemoteCommandResponse response = {{makeCursorResponse(1, {ops[0], ops[1]})}, Milliseconds(0)};
- auto request = net->scheduleSuccessfulResponse(noi, when, response);
- net->runUntil(when);
- net->runReadyNetworkOperations();
- net->exitNetwork();
-
- // Schedule a response at a time that would exceed the retried find request network timeout.
- net->enterNetwork();
- when = net->now() + retriedFindMaxTime + kNetworkTimeoutBufferMS + Milliseconds(10);
- noi = getNet()->getNextReadyRequest();
- response = {{makeCursorResponse(1, {ops[0], ops[1]})}, Milliseconds(0)};
- request = net->scheduleSuccessfulResponse(noi, when, response);
- net->runUntil(when);
- net->runReadyNetworkOperations();
- net->exitNetwork();
-
- oplogFetcher.join();
-
- // The fetcher should have shut down after its last request timed out.
- ASSERT_TRUE(ErrorCodes::isExceededTimeLimitError(shutdownState->getStatus().code()));
-}
-
-bool sharedCallbackStateDestroyed = false;
-class SharedCallbackState {
- SharedCallbackState(const SharedCallbackState&) = delete;
- SharedCallbackState& operator=(const SharedCallbackState&) = delete;
-
-public:
- SharedCallbackState() {}
- ~SharedCallbackState() {
- sharedCallbackStateDestroyed = true;
- }
-};
-
-TEST_F(AbstractOplogFetcherTest, OplogFetcherResetsOnShutdownCallbackFunctionOnCompletion) {
- auto sharedCallbackData = std::make_shared<SharedCallbackState>();
- auto callbackInvoked = false;
- auto status = getDetectableErrorStatus();
-
- MockOplogFetcher oplogFetcher(
- &getExecutor(),
- lastFetched,
- source,
- nss,
- 0,
- [&callbackInvoked, sharedCallbackData, &status](const Status& shutdownStatus) {
- status = shutdownStatus, callbackInvoked = true;
- });
- ON_BLOCK_EXIT([this] { getExecutor().shutdown(); });
-
- ASSERT_FALSE(oplogFetcher.isActive());
- ASSERT_OK(oplogFetcher.startup());
- ASSERT_TRUE(oplogFetcher.isActive());
-
- sharedCallbackData.reset();
- ASSERT_FALSE(sharedCallbackStateDestroyed);
-
- processNetworkResponse({ErrorCodes::OperationFailed, "oplog tailing query failed"}, false);
-
- oplogFetcher.join();
-
- ASSERT_EQUALS(ErrorCodes::OperationFailed, status);
-
- // Oplog fetcher should reset 'OplogFetcher::_onShutdownCallbackFn' after running callback
- // function before becoming inactive.
- // This ensures that we release resources associated with
- // 'OplogFetcher::_onShutdownCallbackFn'.
- ASSERT_TRUE(callbackInvoked);
- ASSERT_TRUE(sharedCallbackStateDestroyed);
-}
-
-} // namespace
diff --git a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp b/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp
deleted file mode 100644
index 68d8e861ddd..00000000000
--- a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/repl/abstract_oplog_fetcher_test_fixture.h"
-
-#include "mongo/db/repl/oplog_entry.h"
-#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
-#include "mongo/logv2/log.h"
-#include "mongo/unittest/unittest.h"
-#include "mongo/util/log.h"
-
-namespace mongo {
-namespace repl {
-
-namespace {
-
-/**
- * Creates an OplogEntry using given field values.
- */
-repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
- repl::OpTypeEnum opType,
- NamespaceString nss,
- BSONObj object) {
- return repl::OplogEntry(opTime, // optime
- boost::none, // hash
- opType, // opType
- nss, // namespace
- boost::none, // uuid
- boost::none, // fromMigrate
- repl::OplogEntry::kOplogVersion, // version
- object, // o
- boost::none, // o2
- {}, // sessionInfo
- boost::none, // upsert
- Date_t(), // wall clock time
- boost::none, // statement id
- boost::none, // optime of previous write within same transaction
- boost::none, // pre-image optime
- boost::none); // post-image optime
-}
-
-} // namespace
-
-ShutdownState::ShutdownState() = default;
-
-Status ShutdownState::getStatus() const {
- return _status;
-}
-
-void ShutdownState::operator()(const Status& status) {
- _status = status;
-}
-
-BSONObj AbstractOplogFetcherTest::makeNoopOplogEntry(OpTime opTime) {
- return makeOplogEntry(opTime, OpTypeEnum::kNoop, NamespaceString("test.t"), BSONObj()).toBSON();
-}
-
-BSONObj AbstractOplogFetcherTest::makeNoopOplogEntry(Seconds seconds) {
- return makeNoopOplogEntry({{seconds, 0}, 1LL});
-}
-
-BSONObj AbstractOplogFetcherTest::makeCursorResponse(CursorId cursorId,
- Fetcher::Documents oplogEntries,
- bool isFirstBatch,
- const NamespaceString& nss) {
- BSONObjBuilder bob;
- {
- BSONObjBuilder cursorBob(bob.subobjStart("cursor"));
- cursorBob.append("id", cursorId);
- cursorBob.append("ns", nss.toString());
- {
- BSONArrayBuilder batchBob(
- cursorBob.subarrayStart(isFirstBatch ? "firstBatch" : "nextBatch"));
- for (auto oplogEntry : oplogEntries) {
- batchBob.append(oplogEntry);
- }
- }
- }
- bob.append("ok", 1);
- return bob.obj();
-}
-
-void AbstractOplogFetcherTest::setUp() {
- executor::ThreadPoolExecutorTest::setUp();
- launchExecutorThread();
-
- lastFetched = {{123, 0}, 1};
- lastFetchedWall = Date_t() + Seconds(lastFetched.getSecs());
-}
-
-executor::RemoteCommandRequest AbstractOplogFetcherTest::processNetworkResponse(
- executor::RemoteCommandResponse response, bool expectReadyRequestsAfterProcessing) {
-
- auto net = getNet();
- executor::NetworkInterfaceMock::InNetworkGuard guard(net);
- LOGV2(21050, "scheduling response.");
- auto request = net->scheduleSuccessfulResponse(response);
- LOGV2(21051, "running network ops.");
- net->runReadyNetworkOperations();
- LOGV2(21052, "checking for more requests");
- ASSERT_EQUALS(expectReadyRequestsAfterProcessing, net->hasReadyRequests());
- LOGV2(21053, "returning consumed request");
- return request;
-}
-
-executor::RemoteCommandRequest AbstractOplogFetcherTest::processNetworkResponse(
- BSONObj obj, bool expectReadyRequestsAfterProcessing) {
- return processNetworkResponse({obj, Milliseconds(0)}, expectReadyRequestsAfterProcessing);
-}
-
-} // namespace repl
-} // namespace mongo
diff --git a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.h b/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.h
deleted file mode 100644
index 7349689bb32..00000000000
--- a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.h
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-#pragma once
-
-#include "mongo/db/repl/abstract_oplog_fetcher.h"
-#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
-#include "mongo/unittest/unittest.h"
-
-namespace mongo {
-namespace repl {
-
-/**
- * This class represents the state at shutdown of an abstract oplog fetcher.
- */
-class ShutdownState {
- ShutdownState(const ShutdownState&) = delete;
- ShutdownState& operator=(const ShutdownState&) = delete;
-
-public:
- ShutdownState();
-
- /**
- * Returns the status at shutdown.
- */
- Status getStatus() const;
-
- /**
- * Use this for oplog fetcher shutdown callback.
- */
- void operator()(const Status& status);
-
-private:
- Status _status = executor::TaskExecutorTest::getDetectableErrorStatus();
-};
-
-/**
- * This class contains many of the functions used by all oplog fetcher test suites.
- */
-class AbstractOplogFetcherTest : public executor::ThreadPoolExecutorTest {
-public:
- /**
- * Static functions for creating noop oplog entries.
- */
- static BSONObj makeNoopOplogEntry(OpTime opTime);
- static BSONObj makeNoopOplogEntry(Seconds seconds);
-
- /**
- * A static function for creating the response to a cursor. If it's the last batch, the
- * cursorId provided should be 0.
- */
- static BSONObj makeCursorResponse(
- CursorId cursorId,
- Fetcher::Documents oplogEntries,
- bool isFirstBatch = true,
- const NamespaceString& nss = NamespaceString("local.oplog.rs"));
-
-protected:
- void setUp() override;
-
- /**
- * Schedules network response and instructs network interface to process response.
- * Returns remote command request in network request.
- */
- executor::RemoteCommandRequest processNetworkResponse(
- executor::RemoteCommandResponse response, bool expectReadyRequestsAfterProcessing = false);
- executor::RemoteCommandRequest processNetworkResponse(
- BSONObj obj, bool expectReadyRequestsAfterProcessing = false);
-
- // The last OpTime fetched by the oplog fetcher.
- OpTime lastFetched;
- Date_t lastFetchedWall;
-};
-} // namespace repl
-} // namespace mongo
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 9e70d60cf4a..d1c6d22120f 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -487,7 +487,7 @@ void BackgroundSync::_produce() {
Status fetcherReturnStatus = Status::OK();
DataReplicatorExternalStateBackgroundSync dataReplicatorExternalState(
_replCoord, _replicationCoordinatorExternalState, this);
- NewOplogFetcher* oplogFetcher;
+ OplogFetcher* oplogFetcher;
try {
auto onOplogFetcherShutdownCallbackFn = [&fetcherReturnStatus](const Status& status) {
fetcherReturnStatus = status;
@@ -496,12 +496,12 @@ void BackgroundSync::_produce() {
// replication coordinator.
auto numRestarts =
_replicationCoordinatorExternalState->getOplogFetcherSteadyStateMaxFetcherRestarts();
- auto oplogFetcherPtr = std::make_unique<NewOplogFetcher>(
+ auto oplogFetcherPtr = std::make_unique<OplogFetcher>(
_replicationCoordinatorExternalState->getTaskExecutor(),
lastOpTimeFetched,
source,
_replCoord->getConfig(),
- std::make_unique<NewOplogFetcher::OplogFetcherRestartDecisionDefault>(numRestarts),
+ std::make_unique<OplogFetcher::OplogFetcherRestartDecisionDefault>(numRestarts),
syncSourceResp.rbid,
true /* requireFresherSyncSource */,
&dataReplicatorExternalState,
@@ -589,9 +589,9 @@ void BackgroundSync::_produce() {
}
}
-Status BackgroundSync::_enqueueDocuments(NewOplogFetcher::Documents::const_iterator begin,
- NewOplogFetcher::Documents::const_iterator end,
- const NewOplogFetcher::DocumentsInfo& info) {
+Status BackgroundSync::_enqueueDocuments(OplogFetcher::Documents::const_iterator begin,
+ OplogFetcher::Documents::const_iterator end,
+ const OplogFetcher::DocumentsInfo& info) {
// If this is the first batch of operations returned from the query, "toApplyDocumentCount" will
// be one fewer than "networkDocumentCount" because the first document (which was applied
// previously) is skipped.
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index f678d91384b..2e96b1ae434 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -166,9 +166,9 @@ private:
*
* requiredRBID is reset to empty after the first call.
*/
- Status _enqueueDocuments(NewOplogFetcher::Documents::const_iterator begin,
- NewOplogFetcher::Documents::const_iterator end,
- const NewOplogFetcher::DocumentsInfo& info);
+ Status _enqueueDocuments(OplogFetcher::Documents::const_iterator begin,
+ OplogFetcher::Documents::const_iterator end,
+ const OplogFetcher::DocumentsInfo& info);
/**
* Executes a rollback.
@@ -262,7 +262,7 @@ private:
std::unique_ptr<SyncSourceResolver> _syncSourceResolver; // (M)
// Current oplog fetcher tailing the oplog on the sync source.
- std::unique_ptr<NewOplogFetcher> _oplogFetcher;
+ std::unique_ptr<OplogFetcher> _oplogFetcher;
// Current rollback process. If this component is active, we are currently reverting local
// operations in the local oplog in order to bring this server to a consistent state relative
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index c3197ff7982..a2aa7ec87a3 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -206,32 +206,32 @@ InitialSyncer::InitialSyncer(
_onCompletion(onCompletion),
_createClientFn(
[] { return std::make_unique<DBClientConnection>(true /* autoReconnect */); }),
- _createOplogFetcherFn([](executor::TaskExecutor* executor,
- OpTime lastFetched,
- HostAndPort source,
- ReplSetConfig config,
- std::unique_ptr<NewOplogFetcher::OplogFetcherRestartDecision>
- oplogFetcherRestartDecision,
- int requiredRBID,
- bool requireFresherSyncSource,
- DataReplicatorExternalState* dataReplicatorExternalState,
- NewOplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn,
- NewOplogFetcher::OnShutdownCallbackFn onShutdownCallbackFn,
- const int batchSize,
- NewOplogFetcher::StartingPoint startingPoint) {
- return std::make_unique<NewOplogFetcher>(executor,
- lastFetched,
- source,
- config,
- std::move(oplogFetcherRestartDecision),
- requiredRBID,
- requireFresherSyncSource,
- dataReplicatorExternalState,
- std::move(enqueueDocumentsFn),
- std::move(onShutdownCallbackFn),
- batchSize,
- startingPoint);
- }) {
+ _createOplogFetcherFn(
+ [](executor::TaskExecutor* executor,
+ OpTime lastFetched,
+ HostAndPort source,
+ ReplSetConfig config,
+ std::unique_ptr<OplogFetcher::OplogFetcherRestartDecision> oplogFetcherRestartDecision,
+ int requiredRBID,
+ bool requireFresherSyncSource,
+ DataReplicatorExternalState* dataReplicatorExternalState,
+ OplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn,
+ OplogFetcher::OnShutdownCallbackFn onShutdownCallbackFn,
+ const int batchSize,
+ OplogFetcher::StartingPoint startingPoint) {
+ return std::make_unique<OplogFetcher>(executor,
+ lastFetched,
+ source,
+ config,
+ std::move(oplogFetcherRestartDecision),
+ requiredRBID,
+ requireFresherSyncSource,
+ dataReplicatorExternalState,
+ std::move(enqueueDocumentsFn),
+ std::move(onShutdownCallbackFn),
+ batchSize,
+ startingPoint);
+ }) {
uassert(ErrorCodes::BadValue, "task executor cannot be null", _exec);
uassert(ErrorCodes::BadValue, "invalid storage interface", _storage);
uassert(ErrorCodes::BadValue, "invalid replication process", _replicationProcess);
@@ -463,7 +463,7 @@ void InitialSyncer::setCreateOplogFetcherFn_forTest(
_createOplogFetcherFn = createOplogFetcherFn;
}
-NewOplogFetcher* InitialSyncer::getOplogFetcher_forTest() const {
+OplogFetcher* InitialSyncer::getOplogFetcher_forTest() const {
// Wait up to 10 seconds.
for (auto i = 0; i < 100; i++) {
{
@@ -1123,14 +1123,14 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>
_rollbackChecker->getBaseRBID(),
false /* requireFresherSyncSource */,
_dataReplicatorExternalState.get(),
- [=](NewOplogFetcher::Documents::const_iterator first,
- NewOplogFetcher::Documents::const_iterator last,
- const NewOplogFetcher::DocumentsInfo& info) {
+ [=](OplogFetcher::Documents::const_iterator first,
+ OplogFetcher::Documents::const_iterator last,
+ const OplogFetcher::DocumentsInfo& info) {
return _enqueueDocuments(first, last, info);
},
[=](const Status& s) { _oplogFetcherCallback(s, onCompletionGuard); },
initialSyncOplogFetcherBatchSize,
- NewOplogFetcher::StartingPoint::kEnqueueFirstDoc);
+ OplogFetcher::StartingPoint::kEnqueueFirstDoc);
LOGV2_DEBUG(21178,
2,
@@ -1970,9 +1970,9 @@ StatusWith<HostAndPort> InitialSyncer::_chooseSyncSource_inlock() {
return syncSource;
}
-Status InitialSyncer::_enqueueDocuments(NewOplogFetcher::Documents::const_iterator begin,
- NewOplogFetcher::Documents::const_iterator end,
- const NewOplogFetcher::DocumentsInfo& info) {
+Status InitialSyncer::_enqueueDocuments(OplogFetcher::Documents::const_iterator begin,
+ OplogFetcher::Documents::const_iterator end,
+ const OplogFetcher::DocumentsInfo& info) {
if (info.toApplyDocumentCount == 0) {
return Status::OK();
}
@@ -2047,8 +2047,8 @@ void InitialSyncer::InitialSyncAttemptInfo::append(BSONObjBuilder* builder) cons
builder->append("totalTimeUnreachableMillis", totalTimeUnreachableMillis);
}
-bool InitialSyncer::OplogFetcherRestartDecisionInitialSyncer::shouldContinue(
- NewOplogFetcher* fetcher, Status status) {
+bool InitialSyncer::OplogFetcherRestartDecisionInitialSyncer::shouldContinue(OplogFetcher* fetcher,
+ Status status) {
if (ErrorCodes::isRetriableError(status)) {
stdx::lock_guard<InitialSyncSharedData> lk(*_sharedData);
return _sharedData->shouldRetryOperation(lk, &_retryingOperation);
@@ -2060,7 +2060,7 @@ bool InitialSyncer::OplogFetcherRestartDecisionInitialSyncer::shouldContinue(
}
void InitialSyncer::OplogFetcherRestartDecisionInitialSyncer::fetchSuccessful(
- NewOplogFetcher* fetcher) {
+ OplogFetcher* fetcher) {
_retryingOperation = boost::none;
_defaultDecision.fetchSuccessful(fetcher);
}
diff --git a/src/mongo/db/repl/initial_syncer.h b/src/mongo/db/repl/initial_syncer.h
index 43d21bdafc3..6d72031d9bb 100644
--- a/src/mongo/db/repl/initial_syncer.h
+++ b/src/mongo/db/repl/initial_syncer.h
@@ -155,19 +155,19 @@ public:
/**
* Type of function to create an OplogFetcher.
*/
- using CreateOplogFetcherFn = std::function<std::unique_ptr<NewOplogFetcher>(
+ using CreateOplogFetcherFn = std::function<std::unique_ptr<OplogFetcher>(
executor::TaskExecutor* executor,
OpTime lastFetched,
HostAndPort source,
ReplSetConfig config,
- std::unique_ptr<NewOplogFetcher::OplogFetcherRestartDecision> oplogFetcherRestartDecision,
+ std::unique_ptr<OplogFetcher::OplogFetcherRestartDecision> oplogFetcherRestartDecision,
int requiredRBID,
bool requireFresherSyncSource,
DataReplicatorExternalState* dataReplicatorExternalState,
- NewOplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn,
- NewOplogFetcher::OnShutdownCallbackFn onShutdownCallbackFn,
+ OplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn,
+ OplogFetcher::OnShutdownCallbackFn onShutdownCallbackFn,
const int batchSize,
- NewOplogFetcher::StartingPoint startingPoint)>;
+ OplogFetcher::StartingPoint startingPoint)>;
struct InitialSyncAttemptInfo {
int durationMillis;
@@ -183,22 +183,22 @@ public:
};
class OplogFetcherRestartDecisionInitialSyncer
- : public NewOplogFetcher::OplogFetcherRestartDecision {
+ : public OplogFetcher::OplogFetcherRestartDecision {
public:
OplogFetcherRestartDecisionInitialSyncer(InitialSyncSharedData* sharedData,
std::size_t maxFetcherRestarts)
: _sharedData(sharedData), _defaultDecision(maxFetcherRestarts){};
- bool shouldContinue(NewOplogFetcher* fetcher, Status status) final;
+ bool shouldContinue(OplogFetcher* fetcher, Status status) final;
- void fetchSuccessful(NewOplogFetcher* fetcher) final;
+ void fetchSuccessful(OplogFetcher* fetcher) final;
private:
InitialSyncSharedData* _sharedData;
// We delegate to the default strategy when it's a non-network error.
- NewOplogFetcher::OplogFetcherRestartDecisionDefault _defaultDecision;
+ OplogFetcher::OplogFetcherRestartDecisionDefault _defaultDecision;
// The operation, if any, currently being retried because of a network error.
InitialSyncSharedData::RetryableOperation _retryingOperation;
@@ -280,7 +280,7 @@ public:
*
* For testing only.
*/
- NewOplogFetcher* getOplogFetcher_forTest() const;
+ OplogFetcher* getOplogFetcher_forTest() const;
/**
*
@@ -591,9 +591,9 @@ private:
* Returns a status even though it always returns OK, to conform the interface OplogFetcher
* expects for the EnqueueDocumentsFn.
*/
- Status _enqueueDocuments(NewOplogFetcher::Documents::const_iterator begin,
- NewOplogFetcher::Documents::const_iterator end,
- const NewOplogFetcher::DocumentsInfo& info);
+ Status _enqueueDocuments(OplogFetcher::Documents::const_iterator begin,
+ OplogFetcher::Documents::const_iterator end,
+ const OplogFetcher::DocumentsInfo& info);
void _appendInitialSyncProgressMinimal_inlock(BSONObjBuilder* bob) const;
BSONObj _getInitialSyncProgress_inlock() const;
@@ -743,7 +743,7 @@ private:
InitialSyncSharedData::RetryableOperation _retryingOperation; // (M)
std::unique_ptr<InitialSyncState> _initialSyncState; // (M)
- std::unique_ptr<NewOplogFetcher> _oplogFetcher; // (S)
+ std::unique_ptr<OplogFetcher> _oplogFetcher; // (S)
std::unique_ptr<Fetcher> _beginFetchingOpTimeFetcher; // (S)
std::unique_ptr<Fetcher> _lastOplogEntryFetcher; // (S)
std::unique_ptr<Fetcher> _fCVFetcher; // (S)
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 3fd72c2aef4..39ed893a7bd 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -432,16 +432,16 @@ protected:
OpTime lastFetched,
HostAndPort source,
ReplSetConfig config,
- std::unique_ptr<NewOplogFetcher::OplogFetcherRestartDecision>
+ std::unique_ptr<OplogFetcher::OplogFetcherRestartDecision>
oplogFetcherRestartDecision,
int requiredRBID,
bool requireFresherSyncSource,
DataReplicatorExternalState* dataReplicatorExternalState,
- NewOplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn,
- NewOplogFetcher::OnShutdownCallbackFn onShutdownCallbackFn,
+ OplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn,
+ OplogFetcher::OnShutdownCallbackFn onShutdownCallbackFn,
const int batchSize,
- NewOplogFetcher::StartingPoint startingPoint) {
- return std::unique_ptr<NewOplogFetcher>(
+ OplogFetcher::StartingPoint startingPoint) {
+ return std::unique_ptr<OplogFetcher>(
new OplogFetcherMock(executor,
lastFetched,
source,
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index 8d6ce4ac44d..94c212ce950 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -51,19 +51,15 @@
namespace mongo {
namespace repl {
-Seconds OplogFetcher::kDefaultProtocolZeroAwaitDataTimeout(2);
-
MONGO_FAIL_POINT_DEFINE(stopReplProducer);
MONGO_FAIL_POINT_DEFINE(stopReplProducerOnDocument);
MONGO_FAIL_POINT_DEFINE(setSmallOplogGetMoreMaxTimeMS);
MONGO_FAIL_POINT_DEFINE(logAfterOplogFetcherConnCreated);
MONGO_FAIL_POINT_DEFINE(hangAfterOplogFetcherCallbackScheduled);
+MONGO_FAIL_POINT_DEFINE(hangBeforeStartingOplogFetcher);
MONGO_FAIL_POINT_DEFINE(hangBeforeOplogFetcherRetries);
MONGO_FAIL_POINT_DEFINE(hangBeforeProcessingSuccessfulBatch);
-// TODO SERVER-45574: Define the failpoint in this file instead.
-extern FailPoint hangBeforeStartingOplogFetcher;
-
namespace {
// The number and time spent reading batches off the network
@@ -280,6 +276,7 @@ StatusWith<boost::optional<rpc::OplogQueryMetadata>> parseOplogQueryMetadata(
}
} // namespace
+
StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments(
const Fetcher::Documents& documents,
bool first,
@@ -333,46 +330,13 @@ StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments(
auto alreadyAppliedDocument = documents.cbegin();
info.toApplyDocumentBytes -= alreadyAppliedDocument->objsize();
}
- return info;
-}
-
-OplogFetcher::OplogFetcher(executor::TaskExecutor* executor,
- OpTime lastFetched,
- HostAndPort source,
- NamespaceString nss,
- ReplSetConfig config,
- std::size_t maxFetcherRestarts,
- int requiredRBID,
- bool requireFresherSyncSource,
- DataReplicatorExternalState* dataReplicatorExternalState,
- EnqueueDocumentsFn enqueueDocumentsFn,
- OnShutdownCallbackFn onShutdownCallbackFn,
- const int batchSize,
- StartingPoint startingPoint)
- : AbstractOplogFetcher(executor,
- lastFetched,
- source,
- nss,
- maxFetcherRestarts,
- onShutdownCallbackFn,
- "oplog fetcher"),
- _metadataObject(makeMetadataObject()),
- _requiredRBID(requiredRBID),
- _requireFresherSyncSource(requireFresherSyncSource),
- _dataReplicatorExternalState(dataReplicatorExternalState),
- _enqueueDocumentsFn(enqueueDocumentsFn),
- _awaitDataTimeout(calculateAwaitDataTimeout(config)),
- _batchSize(batchSize),
- _startingPoint(startingPoint) {
- invariant(config.isInitialized());
- invariant(enqueueDocumentsFn);
+ return info;
}
OplogFetcher::OplogFetcher(executor::TaskExecutor* executor,
OpTime lastFetched,
HostAndPort source,
- NamespaceString nss,
ReplSetConfig config,
std::unique_ptr<OplogFetcherRestartDecision> oplogFetcherRestartDecision,
int requiredRBID,
@@ -382,324 +346,6 @@ OplogFetcher::OplogFetcher(executor::TaskExecutor* executor,
OnShutdownCallbackFn onShutdownCallbackFn,
const int batchSize,
StartingPoint startingPoint)
- : AbstractOplogFetcher(executor,
- lastFetched,
- source,
- nss,
- std::move(oplogFetcherRestartDecision),
- onShutdownCallbackFn,
- "oplog fetcher"),
- _metadataObject(makeMetadataObject()),
- _requiredRBID(requiredRBID),
- _requireFresherSyncSource(requireFresherSyncSource),
- _dataReplicatorExternalState(dataReplicatorExternalState),
- _enqueueDocumentsFn(enqueueDocumentsFn),
- _awaitDataTimeout(calculateAwaitDataTimeout(config)),
- _batchSize(batchSize),
- _startingPoint(startingPoint) {
-
- invariant(config.isInitialized());
- invariant(enqueueDocumentsFn);
-}
-
-
-OplogFetcher::~OplogFetcher() {
- shutdown();
- join();
-}
-
-BSONObj OplogFetcher::_makeFindCommandObject(const NamespaceString& nss,
- OpTime lastOpTimeFetched,
- Milliseconds findMaxTime) const {
- auto lastCommittedWithCurrentTerm =
- _dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime();
- auto term = lastCommittedWithCurrentTerm.value;
- BSONObjBuilder cmdBob;
- cmdBob.append("find", nss.coll());
- cmdBob.append("filter", BSON("ts" << BSON("$gte" << lastOpTimeFetched.getTimestamp())));
- cmdBob.append("tailable", true);
- cmdBob.append("oplogReplay", true);
- cmdBob.append("awaitData", true);
- cmdBob.append("maxTimeMS", durationCount<Milliseconds>(findMaxTime));
- cmdBob.append("batchSize", _batchSize);
-
- if (term != OpTime::kUninitializedTerm) {
- cmdBob.append("term", term);
- }
-
- // This ensures that the sync source waits for all earlier oplog writes to be visible.
- // Since Timestamp(0, 0) isn't allowed, Timestamp(0, 1) is the minimal we can use.
- cmdBob.append("readConcern", BSON("afterClusterTime" << Timestamp(0, 1)));
-
- return cmdBob.obj();
-}
-
-BSONObj OplogFetcher::_makeMetadataObject() const {
- return _metadataObject;
-}
-
-BSONObj OplogFetcher::getMetadataObject_forTest() const {
- return _metadataObject;
-}
-
-Milliseconds OplogFetcher::getAwaitDataTimeout_forTest() const {
- return _getGetMoreMaxTime();
-}
-
-Milliseconds OplogFetcher::_getGetMoreMaxTime() const {
- if (MONGO_unlikely(setSmallOplogGetMoreMaxTimeMS.shouldFail())) {
- return Milliseconds(50);
- }
-
- return _awaitDataTimeout;
-}
-
-StatusWith<BSONObj> OplogFetcher::_onSuccessfulBatch(const Fetcher::QueryResponse& queryResponse) {
-
- // Stop fetching and return on fail point.
- // This fail point makes the oplog fetcher ignore the downloaded batch of operations and not
- // error out. The FailPointEnabled error will be caught by the AbstractOplogFetcher.
- if (MONGO_unlikely(stopReplProducer.shouldFail())) {
- return Status(ErrorCodes::FailPointEnabled, "stopReplProducer fail point is enabled");
- }
-
- // Stop fetching and return when we reach a particular document. This failpoint should be used
- // with the setParameter bgSyncOplogFetcherBatchSize=1, so that documents are fetched one at a
- // time.
- {
- Status status = Status::OK();
- stopReplProducerOnDocument.executeIf(
- [&](auto&&) {
- status = {ErrorCodes::FailPointEnabled,
- "stopReplProducerOnDocument fail point is enabled."};
- LOGV2(21264, "{status_reason}", "status_reason"_attr = status.reason());
- },
- [&](const BSONObj& data) {
- auto opCtx = cc().makeOperationContext();
- boost::intrusive_ptr<ExpressionContext> expCtx(
- new ExpressionContext(opCtx.get(), nullptr, _getNamespace()));
- Matcher m(data["document"].Obj(), expCtx);
- return !queryResponse.documents.empty() &&
- m.matches(queryResponse.documents.front()["o"].Obj());
- });
- if (!status.isOK())
- return status;
- }
-
- const auto& documents = queryResponse.documents;
- auto firstDocToApply = documents.cbegin();
-
- if (!documents.empty()) {
- LOGV2_DEBUG(21265,
- 2,
- "oplog fetcher read {documents_size} operations from remote oplog starting at "
- "{documents_front_ts} and ending at {documents_back_ts}",
- "documents_size"_attr = documents.size(),
- "documents_front_ts"_attr = documents.front()["ts"],
- "documents_back_ts"_attr = documents.back()["ts"]);
- } else {
- LOGV2_DEBUG(21266, 2, "oplog fetcher read 0 operations from remote oplog");
- }
-
- auto oqMetadataResult = parseOplogQueryMetadata(queryResponse);
- if (!oqMetadataResult.isOK()) {
- LOGV2_ERROR(21276,
- "invalid oplog query metadata from sync source {getSource}: "
- "{oqMetadataResult_getStatus}: {queryResponse_otherFields_metadata}",
- "getSource"_attr = _getSource(),
- "oqMetadataResult_getStatus"_attr = oqMetadataResult.getStatus(),
- "queryResponse_otherFields_metadata"_attr = queryResponse.otherFields.metadata);
- return oqMetadataResult.getStatus();
- }
- auto oqMetadata = oqMetadataResult.getValue();
-
- // This lastFetched value is the last OpTime from the previous batch.
- auto lastFetched = _getLastOpTimeFetched();
-
- // Check start of remote oplog and, if necessary, stop fetcher to execute rollback.
- if (queryResponse.first) {
- auto remoteRBID = oqMetadata ? boost::make_optional(oqMetadata->getRBID()) : boost::none;
- auto remoteLastApplied =
- oqMetadata ? boost::make_optional(oqMetadata->getLastOpApplied()) : boost::none;
- auto status = checkRemoteOplogStart(documents,
- lastFetched,
- remoteLastApplied,
- _requiredRBID,
- remoteRBID,
- _requireFresherSyncSource);
- if (!status.isOK()) {
- // Stop oplog fetcher and execute rollback if necessary.
- return status;
- }
-
- LOGV2_DEBUG(21267,
- 1,
- "oplog fetcher successfully fetched from {getSource}",
- "getSource"_attr = _getSource());
-
- // We do not always enqueue the first document. We elect to skip it for the following
- // reasons:
- // 1. This is the first batch and no rollback is needed. Callers specify
- // StartingPoint::kSkipFirstDoc when they want this behavior.
- // 2. We have already enqueued that document in a previous attempt. We can get into
- // this situation if we had a batch with StartingPoint::kEnqueueFirstDoc that failed
- // right after that first document was enqueued. In such a scenario, we would not
- // have advanced the lastFetched opTime, so we skip past that document to avoid
- // duplicating it.
-
- if (_startingPoint == StartingPoint::kSkipFirstDoc) {
- firstDocToApply++;
- }
- }
-
- auto validateResult = OplogFetcher::validateDocuments(
- documents, queryResponse.first, lastFetched.getTimestamp(), _startingPoint);
- if (!validateResult.isOK()) {
- return validateResult.getStatus();
- }
- auto info = validateResult.getValue();
-
- // Process replset metadata. It is important that this happen after we've validated the
- // first batch, so we don't progress our knowledge of the commit point from a
- // response that triggers a rollback.
- rpc::ReplSetMetadata replSetMetadata;
- bool receivedReplMetadata =
- queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName);
- if (receivedReplMetadata) {
- const auto& metadataObj = queryResponse.otherFields.metadata;
- auto metadataResult = rpc::ReplSetMetadata::readFromMetadata(metadataObj);
- if (!metadataResult.isOK()) {
- LOGV2_ERROR(21277,
- "invalid replication metadata from sync source {getSource}: "
- "{metadataResult_getStatus}: {metadataObj}",
- "getSource"_attr = _getSource(),
- "metadataResult_getStatus"_attr = metadataResult.getStatus(),
- "metadataObj"_attr = metadataObj);
- return metadataResult.getStatus();
- }
- replSetMetadata = metadataResult.getValue();
-
- // We will only ever have OplogQueryMetadata if we have ReplSetMetadata, so it is safe
- // to call processMetadata() in this if block.
- invariant(oqMetadata);
- _dataReplicatorExternalState->processMetadata(replSetMetadata, *oqMetadata);
- }
-
- // Increment stats. We read all of the docs in the query.
- opsReadStats.increment(info.networkDocumentCount);
- networkByteStats.increment(info.networkDocumentBytes);
-
- // Record time for each batch.
- getmoreReplStats.recordMillis(durationCount<Milliseconds>(queryResponse.elapsedMillis));
-
- auto status = _enqueueDocumentsFn(firstDocToApply, documents.cend(), info);
- if (!status.isOK()) {
- return status;
- }
-
- // Start skipping the first doc after at least one doc has been enqueued in the lifetime
- // of this fetcher.
- _startingPoint = StartingPoint::kSkipFirstDoc;
-
- if (_dataReplicatorExternalState->shouldStopFetching(
- _getSource(), replSetMetadata, oqMetadata)) {
- str::stream errMsg;
- errMsg << "sync source " << _getSource().toString();
- errMsg << " (config version: " << replSetMetadata.getConfigVersion();
- // If OplogQueryMetadata was provided, its values were used to determine if we should
- // stop fetching from this sync source.
- if (oqMetadata) {
- errMsg << "; last applied optime: " << oqMetadata->getLastOpApplied().toString();
- errMsg << "; sync source index: " << oqMetadata->getSyncSourceIndex();
- errMsg << "; primary index: " << oqMetadata->getPrimaryIndex();
- } else {
- errMsg << "; last visible optime: " << replSetMetadata.getLastOpVisible().toString();
- errMsg << "; sync source index: " << replSetMetadata.getSyncSourceIndex();
- errMsg << "; primary index: " << replSetMetadata.getPrimaryIndex();
- }
- errMsg << ") is no longer valid";
- return Status(ErrorCodes::InvalidSyncSource, errMsg);
- }
-
- auto lastCommittedWithCurrentTerm =
- _dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime();
- return makeGetMoreCommandObject(queryResponse.nss,
- queryResponse.cursorId,
- lastCommittedWithCurrentTerm,
- _getGetMoreMaxTime(),
- _batchSize);
-}
-
-StatusWith<NewOplogFetcher::DocumentsInfo> NewOplogFetcher::validateDocuments(
- const Fetcher::Documents& documents,
- bool first,
- Timestamp lastTS,
- StartingPoint startingPoint) {
- if (first && documents.empty()) {
- return Status(ErrorCodes::OplogStartMissing,
- str::stream() << "The first batch of oplog entries is empty, but expected at "
- "least 1 document matching ts: "
- << lastTS.toString());
- }
-
- DocumentsInfo info;
- // The count of the bytes of the documents read off the network.
- info.networkDocumentBytes = 0;
- info.networkDocumentCount = 0;
- for (auto&& doc : documents) {
- info.networkDocumentBytes += doc.objsize();
- ++info.networkDocumentCount;
-
- // If this is the first response (to the $gte query) then we already applied the first doc.
- if (first && info.networkDocumentCount == 1U) {
- continue;
- }
-
- auto docOpTime = OpTime::parseFromOplogEntry(doc);
- if (!docOpTime.isOK()) {
- return docOpTime.getStatus();
- }
- info.lastDocument = docOpTime.getValue();
-
- // Check to see if the oplog entry goes back in time for this document.
- const auto docTS = info.lastDocument.getTimestamp();
- if (lastTS >= docTS) {
- return Status(ErrorCodes::OplogOutOfOrder,
- str::stream() << "Out of order entries in oplog. lastTS: "
- << lastTS.toString() << " outOfOrderTS:" << docTS.toString()
- << " in batch with " << info.networkDocumentCount
- << "docs; first-batch:" << first << ", doc:" << doc);
- }
- lastTS = docTS;
- }
-
- // These numbers are for the documents we will apply.
- info.toApplyDocumentCount = documents.size();
- info.toApplyDocumentBytes = info.networkDocumentBytes;
- if (first && startingPoint == StartingPoint::kSkipFirstDoc) {
- // The count is one less since the first document found was already applied ($gte $ts query)
- // and we will not apply it again.
- --info.toApplyDocumentCount;
- auto alreadyAppliedDocument = documents.cbegin();
- info.toApplyDocumentBytes -= alreadyAppliedDocument->objsize();
- }
-
- return info;
-}
-
-NewOplogFetcher::NewOplogFetcher(
- executor::TaskExecutor* executor,
- OpTime lastFetched,
- HostAndPort source,
- ReplSetConfig config,
- std::unique_ptr<OplogFetcherRestartDecision> oplogFetcherRestartDecision,
- int requiredRBID,
- bool requireFresherSyncSource,
- DataReplicatorExternalState* dataReplicatorExternalState,
- EnqueueDocumentsFn enqueueDocumentsFn,
- OnShutdownCallbackFn onShutdownCallbackFn,
- const int batchSize,
- StartingPoint startingPoint)
: AbstractAsyncComponent(executor, "oplog fetcher"),
_source(source),
_requiredRBID(requiredRBID),
@@ -721,12 +367,12 @@ NewOplogFetcher::NewOplogFetcher(
invariant(enqueueDocumentsFn);
}
-NewOplogFetcher::~NewOplogFetcher() {
+OplogFetcher::~OplogFetcher() {
shutdown();
join();
}
-Status NewOplogFetcher::_doStartup_inlock() noexcept {
+Status OplogFetcher::_doStartup_inlock() noexcept {
return _scheduleWorkAndSaveHandle_inlock(
[this](const executor::TaskExecutor::CallbackArgs& args) {
// Tests use this failpoint to prevent the oplog fetcher from starting. If those
@@ -741,7 +387,7 @@ Status NewOplogFetcher::_doStartup_inlock() noexcept {
"_runQuery");
}
-void NewOplogFetcher::_doShutdown_inlock() noexcept {
+void OplogFetcher::_doShutdown_inlock() noexcept {
_cancelHandle_inlock(_runQueryHandle);
if (_conn) {
@@ -749,11 +395,11 @@ void NewOplogFetcher::_doShutdown_inlock() noexcept {
}
}
-Mutex* NewOplogFetcher::_getMutex() noexcept {
+Mutex* OplogFetcher::_getMutex() noexcept {
return &_mutex;
}
-std::string NewOplogFetcher::toString() {
+std::string OplogFetcher::toString() {
stdx::lock_guard lock(_mutex);
str::stream output;
output << "OplogFetcher -";
@@ -769,37 +415,37 @@ std::string NewOplogFetcher::toString() {
return output;
}
-OpTime NewOplogFetcher::getLastOpTimeFetched_forTest() const {
+OpTime OplogFetcher::getLastOpTimeFetched_forTest() const {
return _getLastOpTimeFetched();
}
-BSONObj NewOplogFetcher::getFindQuery_forTest(long long findTimeout) const {
+BSONObj OplogFetcher::getFindQuery_forTest(long long findTimeout) const {
return _makeFindQuery(findTimeout);
}
-Milliseconds NewOplogFetcher::getAwaitDataTimeout_forTest() const {
+Milliseconds OplogFetcher::getAwaitDataTimeout_forTest() const {
return _awaitDataTimeout;
}
-void NewOplogFetcher::setCreateClientFn_forTest(const CreateClientFn& createClientFn) {
+void OplogFetcher::setCreateClientFn_forTest(const CreateClientFn& createClientFn) {
stdx::lock_guard lock(_mutex);
_createClientFn = createClientFn;
}
-DBClientConnection* NewOplogFetcher::getDBClientConnection_forTest() const {
+DBClientConnection* OplogFetcher::getDBClientConnection_forTest() const {
stdx::lock_guard lock(_mutex);
return _conn.get();
}
-Milliseconds NewOplogFetcher::getInitialFindMaxTime_forTest() const {
+Milliseconds OplogFetcher::getInitialFindMaxTime_forTest() const {
return _getInitialFindMaxTime();
}
-Milliseconds NewOplogFetcher::getRetriedFindMaxTime_forTest() const {
+Milliseconds OplogFetcher::getRetriedFindMaxTime_forTest() const {
return _getRetriedFindMaxTime();
}
-void NewOplogFetcher::_setSocketTimeout(long long timeout) {
+void OplogFetcher::_setSocketTimeout(long long timeout) {
stdx::lock_guard<Latch> lock(_mutex);
invariant(_conn);
// setSoTimeout takes a double representing the number of seconds for send and receive
@@ -808,20 +454,20 @@ void NewOplogFetcher::_setSocketTimeout(long long timeout) {
_conn->setSoTimeout(timeout / 1000.0 + oplogNetworkTimeoutBufferSeconds.load());
}
-OpTime NewOplogFetcher::_getLastOpTimeFetched() const {
+OpTime OplogFetcher::_getLastOpTimeFetched() const {
stdx::lock_guard<Latch> lock(_mutex);
return _lastFetched;
}
-Milliseconds NewOplogFetcher::_getInitialFindMaxTime() const {
+Milliseconds OplogFetcher::_getInitialFindMaxTime() const {
return Milliseconds(oplogInitialFindMaxSeconds.load() * 1000);
}
-Milliseconds NewOplogFetcher::_getRetriedFindMaxTime() const {
+Milliseconds OplogFetcher::_getRetriedFindMaxTime() const {
return Milliseconds(oplogRetriedFindMaxSeconds.load() * 1000);
}
-void NewOplogFetcher::_finishCallback(Status status) {
+void OplogFetcher::_finishCallback(Status status) {
invariant(isActive());
_onShutdownCallbackFn(status);
@@ -842,7 +488,7 @@ void NewOplogFetcher::_finishCallback(Status status) {
std::swap(_oplogFetcherRestartDecision, oplogFetcherRestartDecision);
}
-void NewOplogFetcher::_runQuery(const executor::TaskExecutor::CallbackArgs& callbackData) noexcept {
+void OplogFetcher::_runQuery(const executor::TaskExecutor::CallbackArgs& callbackData) noexcept {
Status responseStatus =
_checkForShutdownAndConvertStatus(callbackData, "error running oplog fetcher");
if (!responseStatus.isOK()) {
@@ -926,7 +572,7 @@ void NewOplogFetcher::_runQuery(const executor::TaskExecutor::CallbackArgs& call
}
}
-Status NewOplogFetcher::_connect() {
+Status OplogFetcher::_connect() {
Status connectStatus = Status::OK();
do {
if (_isShuttingDown()) {
@@ -962,7 +608,7 @@ Status NewOplogFetcher::_connect() {
return connectStatus;
}
-void NewOplogFetcher::_setMetadataWriterAndReader() {
+void OplogFetcher::_setMetadataWriterAndReader() {
invariant(_conn);
_conn->setRequestMetadataWriter([this](OperationContext* opCtx, BSONObjBuilder* metadataBob) {
@@ -979,7 +625,7 @@ void NewOplogFetcher::_setMetadataWriterAndReader() {
});
}
-BSONObj NewOplogFetcher::_makeFindQuery(long long findTimeout) const {
+BSONObj OplogFetcher::_makeFindQuery(long long findTimeout) const {
BSONObjBuilder queryBob;
auto lastOpTimeFetched = _getLastOpTimeFetched();
@@ -1004,7 +650,7 @@ BSONObj NewOplogFetcher::_makeFindQuery(long long findTimeout) const {
return queryBob.obj();
}
-void NewOplogFetcher::_createNewCursor(bool initialFind) {
+void OplogFetcher::_createNewCursor(bool initialFind) {
invariant(_conn);
// Set the socket timeout to the 'find' timeout plus a network buffer.
@@ -1028,7 +674,7 @@ void NewOplogFetcher::_createNewCursor(bool initialFind) {
readersCreatedStats.increment();
}
-StatusWith<NewOplogFetcher::Documents> NewOplogFetcher::_getNextBatch() {
+StatusWith<OplogFetcher::Documents> OplogFetcher::_getNextBatch() {
Documents batch;
try {
Timer timer;
@@ -1089,7 +735,7 @@ StatusWith<NewOplogFetcher::Documents> NewOplogFetcher::_getNextBatch() {
return batch;
}
-Status NewOplogFetcher::_onSuccessfulBatch(const Documents& documents) {
+Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) {
hangBeforeProcessingSuccessfulBatch.pauseWhileSet();
if (_isShuttingDown()) {
@@ -1195,7 +841,7 @@ Status NewOplogFetcher::_onSuccessfulBatch(const Documents& documents) {
}
}
- auto validateResult = NewOplogFetcher::validateDocuments(
+ auto validateResult = OplogFetcher::validateDocuments(
documents, _firstBatch, lastFetched.getTimestamp(), _startingPoint);
if (!validateResult.isOK()) {
return validateResult.getStatus();
@@ -1281,8 +927,8 @@ Status NewOplogFetcher::_onSuccessfulBatch(const Documents& documents) {
return Status::OK();
}
-bool NewOplogFetcher::OplogFetcherRestartDecisionDefault::shouldContinue(NewOplogFetcher* fetcher,
- Status status) {
+bool OplogFetcher::OplogFetcherRestartDecisionDefault::shouldContinue(OplogFetcher* fetcher,
+ Status status) {
if (_numRestarts == _maxRestarts) {
LOGV2(21274,
"Error returned from oplog query (no more query restarts left): {status}",
@@ -1299,12 +945,11 @@ bool NewOplogFetcher::OplogFetcherRestartDecisionDefault::shouldContinue(NewOplo
return true;
}
-void NewOplogFetcher::OplogFetcherRestartDecisionDefault::fetchSuccessful(
- NewOplogFetcher* fetcher) {
+void OplogFetcher::OplogFetcherRestartDecisionDefault::fetchSuccessful(OplogFetcher* fetcher) {
_numRestarts = 0;
};
-NewOplogFetcher::OplogFetcherRestartDecision::~OplogFetcherRestartDecision(){};
+OplogFetcher::OplogFetcherRestartDecision::~OplogFetcherRestartDecision(){};
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h
index 69af79c6c6f..c24b4309227 100644
--- a/src/mongo/db/repl/oplog_fetcher.h
+++ b/src/mongo/db/repl/oplog_fetcher.h
@@ -38,7 +38,7 @@
#include "mongo/client/dbclient_cursor.h"
#include "mongo/client/fetcher.h"
#include "mongo/db/namespace_string.h"
-#include "mongo/db/repl/abstract_oplog_fetcher.h"
+#include "mongo/db/repl/abstract_async_component.h"
#include "mongo/db/repl/data_replicator_external_state.h"
#include "mongo/db/repl/repl_set_config.h"
#include "mongo/util/fail_point.h"
@@ -49,165 +49,6 @@ namespace repl {
extern FailPoint stopReplProducer;
/**
- * The oplog fetcher, once started, reads operations from a remote oplog using a tailable cursor.
- *
- * The initial find command is generated from last fetched optime and may contain the current term
- * depending on the replica set config provided.
- *
- * Forwards metadata in each find/getMore response to the data replicator external state.
- *
- * Performs additional validation on first batch of operations returned from the query to ensure we
- * are able to continue from our last known fetched operation.
- *
- * Validates each batch of operations.
- *
- * Pushes operations from each batch of operations onto a buffer using the "enqueueDocumentsFn"
- * function.
- *
- * Issues a getMore command after successfully processing each batch of operations.
- *
- * When there is an error or when it is not possible to issue another getMore request, calls
- * "onShutdownCallbackFn" to signal the end of processing.
- *
- * This class subclasses AbstractOplogFetcher which takes care of scheduling the Fetcher and
- * `getMore` commands, and handles restarting on errors.
- */
-class OplogFetcher : public AbstractOplogFetcher {
- OplogFetcher(const OplogFetcher&) = delete;
- OplogFetcher& operator=(const OplogFetcher&) = delete;
-
-public:
- static Seconds kDefaultProtocolZeroAwaitDataTimeout;
-
- /**
- * Statistics on current batch of operations returned by the fetcher.
- */
- struct DocumentsInfo {
- size_t networkDocumentCount = 0;
- size_t networkDocumentBytes = 0;
- size_t toApplyDocumentCount = 0;
- size_t toApplyDocumentBytes = 0;
- OpTime lastDocument = OpTime();
- };
-
- /**
- * An enum that indicates if we want to skip the first document during oplog fetching or not.
- * Currently, the only time we don't want to skip the first document is during initial sync
- * if the sync source has a valid oldest active transaction optime, as we need to include
- * the corresponding oplog entry when applying.
- */
- enum class StartingPoint { kSkipFirstDoc, kEnqueueFirstDoc };
-
- /**
- * Type of function that accepts a pair of iterators into a range of operations
- * within the current batch of results and copies the operations into
- * a buffer to be consumed by the next stage of the replication process.
- *
- * Additional information on the operations is provided in a DocumentsInfo
- * struct.
- */
- using EnqueueDocumentsFn = std::function<Status(Fetcher::Documents::const_iterator begin,
- Fetcher::Documents::const_iterator end,
- const DocumentsInfo& info)>;
-
- /**
- * Validates documents in current batch of results returned from tailing the remote oplog.
- * 'first' should be set to true if this set of documents is the first batch returned from the
- * query.
- * On success, returns statistics on operations.
- */
- static StatusWith<DocumentsInfo> validateDocuments(
- const Fetcher::Documents& documents,
- bool first,
- Timestamp lastTS,
- StartingPoint startingPoint = StartingPoint::kSkipFirstDoc);
-
- /**
- * Invariants if validation fails on any of the provided arguments.
- */
- OplogFetcher(executor::TaskExecutor* executor,
- OpTime lastFetched,
- HostAndPort source,
- NamespaceString nss,
- ReplSetConfig config,
- std::size_t maxFetcherRestarts,
- int requiredRBID,
- bool requireFresherSyncSource,
- DataReplicatorExternalState* dataReplicatorExternalState,
- EnqueueDocumentsFn enqueueDocumentsFn,
- OnShutdownCallbackFn onShutdownCallbackFn,
- const int batchSize,
- StartingPoint startingPoint = StartingPoint::kSkipFirstDoc);
-
- OplogFetcher(executor::TaskExecutor* executor,
- OpTime lastFetched,
- HostAndPort source,
- NamespaceString nss,
- ReplSetConfig config,
- std::unique_ptr<OplogFetcherRestartDecision> oplogFetcherRestartDecision,
- int requiredRBID,
- bool requireFresherSyncSource,
- DataReplicatorExternalState* dataReplicatorExternalState,
- EnqueueDocumentsFn enqueueDocumentsFn,
- OnShutdownCallbackFn onShutdownCallbackFn,
- const int batchSize,
- StartingPoint startingPoint = StartingPoint::kSkipFirstDoc);
-
- virtual ~OplogFetcher();
-
- // ================== Test support API ===================
-
- /**
- * Returns metadata object sent in remote commands.
- */
- BSONObj getMetadataObject_forTest() const;
-
- /**
- * Returns timeout for remote commands to complete.
- */
- Milliseconds getRemoteCommandTimeout_forTest() const;
-
- /**
- * Returns the await data timeout used for the "maxTimeMS" field in getMore command requests.
- */
- Milliseconds getAwaitDataTimeout_forTest() const;
-
-private:
- BSONObj _makeFindCommandObject(const NamespaceString& nss,
- OpTime lastOpTimeFetched,
- Milliseconds findMaxTime) const override;
-
- BSONObj _makeMetadataObject() const override;
-
- Milliseconds _getGetMoreMaxTime() const override;
-
- /**
- * This function is run by the AbstractOplogFetcher on a successful batch of oplog entries.
- */
- StatusWith<BSONObj> _onSuccessfulBatch(const Fetcher::QueryResponse& queryResponse) override;
-
- // The metadata object sent with the Fetcher queries.
- const BSONObj _metadataObject;
-
- // Rollback ID that the sync source is required to have after the first batch.
- int _requiredRBID;
-
- // A boolean indicating whether we should error if the sync source is not ahead of our initial
- // last fetched OpTime on the first batch. Most of the time this should be set to true,
- // but there are certain special cases, namely during initial sync, where it's acceptable for
- // our sync source to have no ops newer than _lastFetched.
- bool _requireFresherSyncSource;
-
- DataReplicatorExternalState* const _dataReplicatorExternalState;
- const EnqueueDocumentsFn _enqueueDocumentsFn;
- const Milliseconds _awaitDataTimeout;
- const int _batchSize;
-
- // Indicates if we want to skip the first document during oplog fetching or not.
- StartingPoint _startingPoint;
-};
-
-/**
* The oplog fetcher, once started, reads operations from a remote oplog using a tailable,
* awaitData, exhaust cursor.
*
@@ -231,47 +72,10 @@ private:
* "onShutdownCallbackFn" to signal the end of processing.
*
* An oplog fetcher is an abstract async component, which takes care of startup and shutdown logic.
- *
- * TODO SERVER-45574: edit or remove this flowchart when the NewOplogFetcher is implemented.
- *
- * NewOplogFetcher flowchart:
- *
- * _runQuery()
- * |
- * |
- * +---------+
- * |
- * |
- * V
- * _createNewCursor()
- * |
- * |
- * +<--------------------------+
- * | ^
- * | |
- * _getNextBatch() |
- * | | |
- * | | |
- * (unsuccessful batch | | (successful batch) |
- * or error) | | |
- * | V |
- * | _onSuccessfulBatch() |
- * | | |
- * | | |
- * | | |
- * V | |
- * _createNewCursor() | |
- * | | |
- * | | |
- * +---V---+ |
- * | |
- * | |
- * +-------------------------->+
- *
*/
-class NewOplogFetcher : public AbstractAsyncComponent {
- NewOplogFetcher(const OplogFetcher&) = delete;
- NewOplogFetcher& operator=(const OplogFetcher&) = delete;
+class OplogFetcher : public AbstractAsyncComponent {
+ OplogFetcher(const OplogFetcher&) = delete;
+ OplogFetcher& operator=(const OplogFetcher&) = delete;
public:
/**
@@ -330,27 +134,25 @@ public:
* Defines which situations the oplog fetcher will restart after encountering an error.
* Called when getting the next batch failed for some reason.
*/
- virtual bool shouldContinue(NewOplogFetcher* fetcher, Status status) = 0;
+ virtual bool shouldContinue(OplogFetcher* fetcher, Status status) = 0;
/**
* Called when a batch was successfully fetched to reset any state needed to track restarts.
*/
- virtual void fetchSuccessful(NewOplogFetcher* fetcher) = 0;
+ virtual void fetchSuccessful(OplogFetcher* fetcher) = 0;
};
class OplogFetcherRestartDecisionDefault : public OplogFetcherRestartDecision {
public:
OplogFetcherRestartDecisionDefault(std::size_t maxRestarts) : _maxRestarts(maxRestarts){};
- bool shouldContinue(NewOplogFetcher* fetcher, Status status) final;
+ bool shouldContinue(OplogFetcher* fetcher, Status status) final;
- void fetchSuccessful(NewOplogFetcher* fetcher) final;
+ void fetchSuccessful(OplogFetcher* fetcher) final;
~OplogFetcherRestartDecisionDefault(){};
private:
- NewOplogFetcher* _newOplogFetcher;
-
// Restarts since the last successful oplog query response.
std::size_t _numRestarts = 0;
@@ -360,20 +162,20 @@ public:
/**
* Invariants if validation fails on any of the provided arguments.
*/
- NewOplogFetcher(executor::TaskExecutor* executor,
- OpTime lastFetched,
- HostAndPort source,
- ReplSetConfig config,
- std::unique_ptr<OplogFetcherRestartDecision> oplogFetcherRestartDecision,
- int requiredRBID,
- bool requireFresherSyncSource,
- DataReplicatorExternalState* dataReplicatorExternalState,
- EnqueueDocumentsFn enqueueDocumentsFn,
- OnShutdownCallbackFn onShutdownCallbackFn,
- const int batchSize,
- StartingPoint startingPoint = StartingPoint::kSkipFirstDoc);
-
- virtual ~NewOplogFetcher();
+ OplogFetcher(executor::TaskExecutor* executor,
+ OpTime lastFetched,
+ HostAndPort source,
+ ReplSetConfig config,
+ std::unique_ptr<OplogFetcherRestartDecision> oplogFetcherRestartDecision,
+ int requiredRBID,
+ bool requireFresherSyncSource,
+ DataReplicatorExternalState* dataReplicatorExternalState,
+ EnqueueDocumentsFn enqueueDocumentsFn,
+ OnShutdownCallbackFn onShutdownCallbackFn,
+ const int batchSize,
+ StartingPoint startingPoint = StartingPoint::kSkipFirstDoc);
+
+ virtual ~OplogFetcher();
/**
* Validates documents in current batch of results returned from tailing the remote oplog.
diff --git a/src/mongo/db/repl/oplog_fetcher_mock.cpp b/src/mongo/db/repl/oplog_fetcher_mock.cpp
index 8b69d109661..f375b6c6eb0 100644
--- a/src/mongo/db/repl/oplog_fetcher_mock.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_mock.cpp
@@ -50,21 +50,21 @@ OplogFetcherMock::OplogFetcherMock(
OnShutdownCallbackFn onShutdownCallbackFn,
const int batchSize,
StartingPoint startingPoint)
- : NewOplogFetcher(executor,
- lastFetched,
- std::move(source),
- std::move(config),
- // Pass a dummy OplogFetcherRestartDecision to the base OplogFetcher.
- std::make_unique<OplogFetcherRestartDecisionDefault>(0),
- requiredRBID,
- requireFresherSyncSource,
- dataReplicatorExternalState,
- // Pass a dummy EnqueueDocumentsFn to the base OplogFetcher.
- [](const auto& a1, const auto& a2, const auto& a3) { return Status::OK(); },
- // Pass a dummy OnShutdownCallbackFn to the base OplogFetcher.
- [](const auto& a) {},
- batchSize,
- startingPoint),
+ : OplogFetcher(executor,
+ lastFetched,
+ std::move(source),
+ std::move(config),
+ // Pass a dummy OplogFetcherRestartDecision to the base OplogFetcher.
+ std::make_unique<OplogFetcherRestartDecisionDefault>(0),
+ requiredRBID,
+ requireFresherSyncSource,
+ dataReplicatorExternalState,
+ // Pass a dummy EnqueueDocumentsFn to the base OplogFetcher.
+ [](const auto& a1, const auto& a2, const auto& a3) { return Status::OK(); },
+ // Pass a dummy OnShutdownCallbackFn to the base OplogFetcher.
+ [](const auto& a) {},
+ batchSize,
+ startingPoint),
_oplogFetcherRestartDecision(std::move(oplogFetcherRestartDecision)),
_onShutdownCallbackFn(std::move(onShutdownCallbackFn)),
_enqueueDocumentsFn(std::move(enqueueDocumentsFn)),
@@ -88,7 +88,7 @@ void OplogFetcherMock::receiveBatch(CursorId cursorId, Fetcher::Documents docume
_oplogFetcherRestartDecision->fetchSuccessful(this);
}
- auto validateResult = NewOplogFetcher::validateDocuments(
+ auto validateResult = OplogFetcher::validateDocuments(
documents, _first, _getLastOpTimeFetched().getTimestamp(), _startingPoint);
// Set _first to false after receiving the first batch.
diff --git a/src/mongo/db/repl/oplog_fetcher_mock.h b/src/mongo/db/repl/oplog_fetcher_mock.h
index cf2350e2b41..02a367b2ea9 100644
--- a/src/mongo/db/repl/oplog_fetcher_mock.h
+++ b/src/mongo/db/repl/oplog_fetcher_mock.h
@@ -33,7 +33,7 @@
namespace mongo {
namespace repl {
-class OplogFetcherMock : public NewOplogFetcher {
+class OplogFetcherMock : public OplogFetcher {
public:
explicit OplogFetcherMock(
executor::TaskExecutor* executor,
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index a3f9b5cb242..144de2076bd 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -27,22 +27,19 @@
* it in the license file.
*/
-#include "mongo/platform/basic.h"
-
#include <memory>
-#include "mongo/db/repl/abstract_oplog_fetcher_test_fixture.h"
#include "mongo/db/repl/data_replicator_external_state_mock.h"
#include "mongo/db/repl/oplog_fetcher.h"
#include "mongo/db/repl/repl_server_parameters_gen.h"
#include "mongo/db/repl/task_executor_mock.h"
#include "mongo/db/service_context_test_fixture.h"
#include "mongo/dbtests/mock/mock_dbclient_connection.h"
+#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/rpc/metadata.h"
#include "mongo/rpc/metadata/oplog_query_metadata.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/unittest/death_test.h"
-#include "mongo/unittest/ensure_fcv.h"
#include "mongo/unittest/task_executor_proxy.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/fail_point.h"
@@ -54,94 +51,6 @@ using namespace mongo;
using namespace mongo::repl;
using namespace unittest;
-using executor::RemoteCommandRequest;
-using executor::RemoteCommandResponse;
-using NetworkGuard = executor::NetworkInterfaceMock::InNetworkGuard;
-
-class OplogFetcherTest : public AbstractOplogFetcherTest {
-protected:
- void setUp() override;
-
- /**
- * Starts an oplog fetcher. Processes a single batch of results from
- * the oplog query and shuts down.
- * Returns shutdown state.
- */
-
- // 16MB max batch size / 12 byte min doc size * 10 (for good measure) = defaultBatchSize to use.
- const int defaultBatchSize = (16 * 1024 * 1024) / 12 * 10;
-
- std::unique_ptr<ShutdownState> processSingleBatch(executor::RemoteCommandResponse response,
- bool requireFresherSyncSource = true);
- std::unique_ptr<ShutdownState> processSingleBatch(BSONObj obj,
- bool requireFresherSyncSource = true);
-
- /**
- * Makes an OplogQueryMetadata object with the given fields and a stale committed OpTime.
- */
- BSONObj makeOplogQueryMetadataObject(OpTime lastAppliedOpTime,
- int rbid,
- int primaryIndex,
- int syncSourceIndex);
-
- /**
- * Tests checkSyncSource result handling.
- */
- void testSyncSourceChecking(rpc::ReplSetMetadata* replMetadata,
- rpc::OplogQueryMetadata* oqMetadata);
-
- /**
- * Tests handling of two batches of operations returned from query.
- * Returns getMore request.
- */
- RemoteCommandRequest testTwoBatchHandling();
-
- OpTime remoteNewerOpTime;
- OpTime staleOpTime;
- Date_t staleWallTime;
- int rbid;
-
- std::unique_ptr<DataReplicatorExternalStateMock> dataReplicatorExternalState;
-
- Fetcher::Documents lastEnqueuedDocuments;
- OplogFetcher::DocumentsInfo lastEnqueuedDocumentsInfo;
- OplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn;
-
- std::unique_ptr<OplogFetcher> makeOplogFetcher(ReplSetConfig config);
-};
-
-void OplogFetcherTest::setUp() {
- AbstractOplogFetcherTest::setUp();
-
- remoteNewerOpTime = {{124, 1}, 2};
- staleOpTime = {{1, 1}, 0};
- staleWallTime = Date_t() + Seconds(staleOpTime.getSecs());
- rbid = 2;
-
- dataReplicatorExternalState = std::make_unique<DataReplicatorExternalStateMock>();
- dataReplicatorExternalState->currentTerm = lastFetched.getTerm();
- dataReplicatorExternalState->lastCommittedOpTime = {{9999, 0}, lastFetched.getTerm()};
-
- enqueueDocumentsFn = [this](Fetcher::Documents::const_iterator begin,
- Fetcher::Documents::const_iterator end,
- const OplogFetcher::DocumentsInfo& info) -> Status {
- lastEnqueuedDocuments = {begin, end};
- lastEnqueuedDocumentsInfo = info;
- return Status::OK();
- };
-}
-
-BSONObj OplogFetcherTest::makeOplogQueryMetadataObject(OpTime lastAppliedOpTime,
- int rbid,
- int primaryIndex,
- int syncSourceIndex) {
- rpc::OplogQueryMetadata oqMetadata(
- {staleOpTime, staleWallTime}, lastAppliedOpTime, rbid, primaryIndex, syncSourceIndex);
- BSONObjBuilder bob;
- ASSERT_OK(oqMetadata.writeToMetadata(&bob));
- return bob.obj();
-}
-
HostAndPort source("localhost:12345");
NamespaceString nss("local.oplog.rs");
@@ -166,869 +75,12 @@ ReplSetConfig _createConfig() {
return config;
}
-std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch(RemoteCommandResponse response,
- bool requireFresherSyncSource) {
- auto shutdownState = std::make_unique<ShutdownState>();
-
- OplogFetcher oplogFetcher(&getExecutor(),
- lastFetched,
- source,
- nss,
- _createConfig(),
- 0,
- rbid,
- requireFresherSyncSource,
- dataReplicatorExternalState.get(),
- enqueueDocumentsFn,
- std::ref(*shutdownState),
- defaultBatchSize);
-
- ASSERT_FALSE(oplogFetcher.isActive());
- ASSERT_OK(oplogFetcher.startup());
- ASSERT_TRUE(oplogFetcher.isActive());
-
- auto request = processNetworkResponse(response);
-
- ASSERT_BSONOBJ_EQ(oplogFetcher.getCommandObject_forTest(), request.cmdObj);
- ASSERT_BSONOBJ_EQ(oplogFetcher.getMetadataObject_forTest(), request.metadata);
-
- oplogFetcher.shutdown();
- oplogFetcher.join();
-
- return shutdownState;
-}
-
-std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch(BSONObj obj,
- bool requireFresherSyncSource) {
- return processSingleBatch({obj, Milliseconds(0)}, requireFresherSyncSource);
-}
-
-void _checkDefaultCommandObjectFields(BSONObj cmdObj) {
- ASSERT_EQUALS(std::string("find"), cmdObj.firstElementFieldName());
- ASSERT_TRUE(cmdObj.getBoolField("tailable"));
- ASSERT_TRUE(cmdObj.getBoolField("oplogReplay"));
- ASSERT_TRUE(cmdObj.getBoolField("awaitData"));
- ASSERT_EQUALS(60000, cmdObj.getIntField("maxTimeMS"));
-}
-
-std::unique_ptr<OplogFetcher> OplogFetcherTest::makeOplogFetcher(ReplSetConfig config) {
- return std::make_unique<OplogFetcher>(&getExecutor(),
- lastFetched,
- source,
- nss,
- config,
- 0,
- -1,
- true,
- dataReplicatorExternalState.get(),
- enqueueDocumentsFn,
- [](Status) {},
- defaultBatchSize);
-}
-
BSONObj concatenate(BSONObj a, const BSONObj& b) {
auto bob = BSONObjBuilder(std::move(a));
bob.appendElements(b);
return bob.obj();
}
-TEST_F(
- OplogFetcherTest,
- FindQueryContainsTermAndStartTimestampIfGetCurrentTermAndLastCommittedOpTimeReturnsValidTerm) {
- auto cmdObj = makeOplogFetcher(_createConfig())->getFindQuery_forTest();
- ASSERT_EQUALS(mongo::BSONType::Object, cmdObj["filter"].type());
- ASSERT_BSONOBJ_EQ(BSON("ts" << BSON("$gte" << lastFetched.getTimestamp())),
- cmdObj["filter"].Obj());
- ASSERT_EQUALS(dataReplicatorExternalState->currentTerm, cmdObj["term"].numberLong());
- _checkDefaultCommandObjectFields(cmdObj);
-}
-
-TEST_F(OplogFetcherTest,
- FindQueryDoesNotContainTermIfGetCurrentTermAndLastCommittedOpTimeReturnsUninitializedTerm) {
- dataReplicatorExternalState->currentTerm = OpTime::kUninitializedTerm;
- auto cmdObj = makeOplogFetcher(_createConfig())->getFindQuery_forTest();
- ASSERT_EQUALS(mongo::BSONType::Object, cmdObj["filter"].type());
- ASSERT_BSONOBJ_EQ(BSON("ts" << BSON("$gte" << lastFetched.getTimestamp())),
- cmdObj["filter"].Obj());
- ASSERT_FALSE(cmdObj.hasField("term"));
- _checkDefaultCommandObjectFields(cmdObj);
-}
-
-TEST_F(OplogFetcherTest, MetadataObjectContainsMetadataFieldsUnderProtocolVersion1) {
- auto metadataObj = makeOplogFetcher(_createConfig())->getMetadataObject_forTest();
- ASSERT_EQUALS(3, metadataObj.nFields());
- ASSERT_EQUALS(1, metadataObj[rpc::kReplSetMetadataFieldName].numberInt());
- ASSERT_EQUALS(1, metadataObj[rpc::kOplogQueryMetadataFieldName].numberInt());
-}
-
-TEST_F(OplogFetcherTest, AwaitDataTimeoutShouldEqualHalfElectionTimeoutUnderProtocolVersion1) {
- auto config = _createConfig();
- auto timeout = makeOplogFetcher(config)->getAwaitDataTimeout_forTest();
- ASSERT_EQUALS(config.getElectionTimeoutPeriod() / 2, timeout);
-}
-
-TEST_F(OplogFetcherTest, InvalidReplSetMetadataInResponseStopsTheOplogFetcher) {
- auto shutdownState =
- processSingleBatch({concatenate(makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}),
- BSON(rpc::kReplSetMetadataFieldName
- << BSON("invalid_repl_metadata_field" << 1))),
- Milliseconds(0)});
-
- ASSERT_EQUALS(ErrorCodes::NoSuchKey, shutdownState->getStatus());
-}
-
-TEST_F(OplogFetcherTest, InvalidOplogQueryMetadataInResponseStopsTheOplogFetcher) {
- auto shutdownState =
- processSingleBatch({concatenate(makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}),
- BSON(rpc::kOplogQueryMetadataFieldName
- << BSON("invalid_oq_metadata_field" << 1))),
- Milliseconds(0)});
-
- ASSERT_EQUALS(ErrorCodes::NoSuchKey, shutdownState->getStatus());
-}
-
-DEATH_TEST_F(OplogFetcherTest,
- ValidMetadataInResponseWithoutOplogMetadataInvariants,
- "Invariant failure oqMetadata") {
- rpc::ReplSetMetadata metadata(
- 1, {lastFetched, lastFetchedWall}, lastFetched, 1, OID::gen(), 2, 2);
- BSONObjBuilder bob;
- ASSERT_OK(metadata.writeToMetadata(&bob));
- auto metadataObj = bob.obj();
-
- processSingleBatch(
- {concatenate(makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), metadataObj),
- Milliseconds(0)});
-}
-
-TEST_F(OplogFetcherTest, ValidMetadataWithInResponseShouldBeForwardedToProcessMetadataFn) {
- rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1);
- rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, remoteNewerOpTime, rbid, 2, 2);
- BSONObjBuilder bob;
- ASSERT_OK(replMetadata.writeToMetadata(&bob));
- ASSERT_OK(oqMetadata.writeToMetadata(&bob));
- auto metadataObj = bob.obj();
- ASSERT_OK(
- processSingleBatch(
- {concatenate(makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), metadataObj),
- Milliseconds(0)})
- ->getStatus());
- ASSERT_TRUE(dataReplicatorExternalState->metadataWasProcessed);
- ASSERT_EQUALS(replMetadata.getPrimaryIndex(),
- dataReplicatorExternalState->replMetadataProcessed.getPrimaryIndex());
- ASSERT_EQUALS(oqMetadata.getPrimaryIndex(),
- dataReplicatorExternalState->oqMetadataProcessed.getPrimaryIndex());
-}
-
-TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceRollsBack) {
- rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1);
- rpc::OplogQueryMetadata oqMetadata(
- {staleOpTime, staleWallTime}, remoteNewerOpTime, rbid + 1, 2, 2);
- BSONObjBuilder bob;
- ASSERT_OK(replMetadata.writeToMetadata(&bob));
- ASSERT_OK(oqMetadata.writeToMetadata(&bob));
- auto metadataObj = bob.obj();
-
- ASSERT_EQUALS(
- ErrorCodes::InvalidSyncSource,
- processSingleBatch(
- {concatenate(makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), metadataObj),
- Milliseconds(0)})
- ->getStatus());
- ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
- ASSERT(lastEnqueuedDocuments.empty());
-}
-
-TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehind) {
- rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1);
- rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, staleOpTime, rbid, 2, 2);
- BSONObjBuilder bob;
- ASSERT_OK(replMetadata.writeToMetadata(&bob));
- ASSERT_OK(oqMetadata.writeToMetadata(&bob));
- auto metadataObj = bob.obj();
-
- ASSERT_EQUALS(
- ErrorCodes::InvalidSyncSource,
- processSingleBatch(
- {concatenate(makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), metadataObj),
- Milliseconds(0)})
- ->getStatus());
- ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
- ASSERT(lastEnqueuedDocuments.empty());
-}
-
-TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAhead) {
- rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1);
- rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, lastFetched, rbid, 2, 2);
- BSONObjBuilder bob;
- ASSERT_OK(replMetadata.writeToMetadata(&bob));
- ASSERT_OK(oqMetadata.writeToMetadata(&bob));
- auto metadataObj = bob.obj();
-
- ASSERT_EQUALS(
- ErrorCodes::InvalidSyncSource,
- processSingleBatch(
- {concatenate(makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), metadataObj),
- Milliseconds(0)})
- ->getStatus());
- ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
- ASSERT(lastEnqueuedDocuments.empty());
-}
-
-TEST_F(OplogFetcherTest,
- MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehindWithoutRequiringFresherSyncSource) {
- rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1);
- rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, staleOpTime, rbid, 2, 2);
- BSONObjBuilder bob;
- ASSERT_OK(replMetadata.writeToMetadata(&bob));
- ASSERT_OK(oqMetadata.writeToMetadata(&bob));
- auto metadataObj = bob.obj();
-
- auto entry = makeNoopOplogEntry(staleOpTime);
- ASSERT_EQUALS(
- ErrorCodes::InvalidSyncSource,
- processSingleBatch(
- {concatenate(makeCursorResponse(0, {entry}), metadataObj), Milliseconds(0)}, false)
- ->getStatus());
- ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
- ASSERT(lastEnqueuedDocuments.empty());
-}
-
-TEST_F(OplogFetcherTest, MetadataAndBatchAreProcessedWhenSyncSourceIsCurrentButMetadataIsStale) {
- // This tests the case where the sync source metadata is behind us but we get a document which
- // is equal to us. Since that means the metadata is stale and can be ignored, we should accept
- // this sync source.
- rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1);
- rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, staleOpTime, rbid, 2, 2);
- BSONObjBuilder bob;
- ASSERT_OK(replMetadata.writeToMetadata(&bob));
- ASSERT_OK(oqMetadata.writeToMetadata(&bob));
- auto metadataObj = bob.obj();
-
- auto entry = makeNoopOplogEntry(lastFetched);
- auto shutdownState = processSingleBatch(
- {concatenate(makeCursorResponse(0, {entry}), metadataObj), Milliseconds(0)}, false);
- ASSERT_OK(shutdownState->getStatus());
- ASSERT(dataReplicatorExternalState->metadataWasProcessed);
-}
-
-TEST_F(OplogFetcherTest,
- MetadataAndBatchAreProcessedWhenSyncSourceIsNotAheadWithoutRequiringFresherSyncSource) {
- rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1);
- rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, lastFetched, rbid, 2, 2);
- BSONObjBuilder bob;
- ASSERT_OK(replMetadata.writeToMetadata(&bob));
- ASSERT_OK(oqMetadata.writeToMetadata(&bob));
- auto metadataObj = bob.obj();
-
- auto entry = makeNoopOplogEntry(lastFetched);
- auto shutdownState = processSingleBatch(
- {concatenate(makeCursorResponse(0, {entry}), metadataObj), Milliseconds(0)}, false);
- ASSERT_OK(shutdownState->getStatus());
- ASSERT(dataReplicatorExternalState->metadataWasProcessed);
-}
-
-TEST_F(OplogFetcherTest,
- MetadataWithoutOplogQueryMetadataIsNotProcessedOnBatchThatTriggersRollback) {
- rpc::ReplSetMetadata metadata(
- 1, {lastFetched, lastFetchedWall}, lastFetched, 1, OID::gen(), 2, 2);
- BSONObjBuilder bob;
- ASSERT_OK(metadata.writeToMetadata(&bob));
- auto metadataObj = bob.obj();
- ASSERT_EQUALS(
- ErrorCodes::OplogStartMissing,
- processSingleBatch(
- {concatenate(makeCursorResponse(0, {makeNoopOplogEntry(Seconds(456))}), metadataObj),
- Milliseconds(0)})
- ->getStatus());
- ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
-}
-
-TEST_F(OplogFetcherTest, MetadataIsNotProcessedOnBatchThatTriggersRollback) {
- rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1);
- rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, remoteNewerOpTime, rbid, 2, 2);
- BSONObjBuilder bob;
- ASSERT_OK(replMetadata.writeToMetadata(&bob));
- ASSERT_OK(oqMetadata.writeToMetadata(&bob));
- auto metadataObj = bob.obj();
- ASSERT_EQUALS(
- ErrorCodes::OplogStartMissing,
- processSingleBatch(
- {concatenate(makeCursorResponse(0, {makeNoopOplogEntry(Seconds(456))}), metadataObj),
- Milliseconds(0)})
- ->getStatus());
- ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
-}
-
-TEST_F(OplogFetcherTest, EmptyMetadataIsNotProcessed) {
- ASSERT_OK(processSingleBatch(
- {makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), Milliseconds(0)})
- ->getStatus());
- ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
-}
-
-TEST_F(OplogFetcherTest, EmptyFirstBatchStopsOplogFetcherWithOplogStartMissingError) {
- ASSERT_EQUALS(ErrorCodes::OplogStartMissing,
- processSingleBatch(makeCursorResponse(0, {}))->getStatus());
-}
-
-TEST_F(OplogFetcherTest, MissingOpTimeInFirstDocumentCausesOplogFetcherToStopWithInvalidBSONError) {
- auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
- ASSERT_EQUALS(ErrorCodes::InvalidBSON,
- processSingleBatch({concatenate(makeCursorResponse(0, {BSONObj()}), metadataObj),
- Milliseconds(0)})
- ->getStatus());
-}
-
-TEST_F(
- OplogFetcherTest,
- LastOpTimeFetchedDoesNotMatchFirstDocumentCausesOplogFetcherToStopWithOplogStartMissingError) {
- auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
- ASSERT_EQUALS(
- ErrorCodes::OplogStartMissing,
- processSingleBatch(
- {concatenate(makeCursorResponse(0, {makeNoopOplogEntry(Seconds(456))}), metadataObj),
- Milliseconds(0)})
- ->getStatus());
-}
-
-TEST_F(OplogFetcherTest,
- MissingOpTimeInSecondDocumentOfFirstBatchCausesOplogFetcherToStopWithNoSuchKey) {
- auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
- ASSERT_EQUALS(
- ErrorCodes::NoSuchKey,
- processSingleBatch(
- {concatenate(makeCursorResponse(0,
- {makeNoopOplogEntry(lastFetched),
- BSON("o" << BSON("msg"
- << "oplog entry without optime"))}),
- metadataObj),
- Milliseconds(0)})
- ->getStatus());
-}
-
-TEST_F(OplogFetcherTest, TimestampsNotAdvancingInBatchCausesOplogFetcherStopWithOplogOutOfOrder) {
- auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
- ASSERT_EQUALS(
- ErrorCodes::OplogOutOfOrder,
- processSingleBatch({concatenate(makeCursorResponse(0,
- {makeNoopOplogEntry(lastFetched),
- makeNoopOplogEntry(Seconds(1000)),
- makeNoopOplogEntry(Seconds(2000)),
- makeNoopOplogEntry(Seconds(1500))}),
- metadataObj),
- Milliseconds(0)})
- ->getStatus());
-}
-
-TEST_F(OplogFetcherTest, OplogFetcherShouldExcludeFirstDocumentInFirstBatchWhenEnqueuingDocuments) {
- auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
-
- auto firstEntry = makeNoopOplogEntry(lastFetched);
- auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
- auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.getTerm()});
- Fetcher::Documents documents{firstEntry, secondEntry, thirdEntry};
-
- auto shutdownState = processSingleBatch(
- {concatenate(makeCursorResponse(0, documents), metadataObj), Milliseconds(0)});
-
- ASSERT_EQUALS(2U, lastEnqueuedDocuments.size());
- ASSERT_BSONOBJ_EQ(secondEntry, lastEnqueuedDocuments[0]);
- ASSERT_BSONOBJ_EQ(thirdEntry, lastEnqueuedDocuments[1]);
-
- ASSERT_EQUALS(3U, lastEnqueuedDocumentsInfo.networkDocumentCount);
- ASSERT_EQUALS(size_t(firstEntry.objsize() + secondEntry.objsize() + thirdEntry.objsize()),
- lastEnqueuedDocumentsInfo.networkDocumentBytes);
-
- ASSERT_EQUALS(2U, lastEnqueuedDocumentsInfo.toApplyDocumentCount);
- ASSERT_EQUALS(size_t(secondEntry.objsize() + thirdEntry.objsize()),
- lastEnqueuedDocumentsInfo.toApplyDocumentBytes);
-
- ASSERT_EQUALS(unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry)),
- lastEnqueuedDocumentsInfo.lastDocument);
-
- // The last fetched optime should be updated after pushing the operations into the
- // buffer and reflected in the shutdown callback arguments.
- ASSERT_OK(shutdownState->getStatus());
-}
-
-TEST_F(OplogFetcherTest,
- OplogFetcherShouldNotDuplicateFirstDocWithEnqueueFirstDocOnErrorAfterFirstDoc) {
-
- // This function verifies that every oplog entry is only enqueued once.
- OpTime lastEnqueuedOpTime = OpTime();
- enqueueDocumentsFn = [&lastEnqueuedOpTime](Fetcher::Documents::const_iterator begin,
- Fetcher::Documents::const_iterator end,
- const OplogFetcher::DocumentsInfo&) -> Status {
- auto count = 0;
- auto toEnqueueOpTime = OpTime();
-
- for (auto i = begin; i != end; ++i) {
- count++;
-
- toEnqueueOpTime = OplogEntry(*i).getOpTime();
- ASSERT_GREATER_THAN(toEnqueueOpTime, lastEnqueuedOpTime);
- lastEnqueuedOpTime = toEnqueueOpTime;
- }
-
- ASSERT_EQ(1, count);
- return Status::OK();
- };
-
- auto shutdownState = std::make_unique<ShutdownState>();
- OplogFetcher oplogFetcher(&getExecutor(),
- lastFetched,
- source,
- nss,
- _createConfig(),
- 1 /* maxFetcherRestarts */,
- rbid,
- false /* requireFresherSyncSource */,
- dataReplicatorExternalState.get(),
- enqueueDocumentsFn,
- std::ref(*shutdownState),
- defaultBatchSize,
- OplogFetcher::StartingPoint::kEnqueueFirstDoc);
-
- ASSERT_FALSE(oplogFetcher.isActive());
- ASSERT_OK(oplogFetcher.startup());
- ASSERT_TRUE(oplogFetcher.isActive());
-
- auto firstEntry = makeNoopOplogEntry({{Seconds(123), 0}, lastFetched.getTerm()});
- auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
- auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
-
- // Only send over the first entry. Save the second for the getMore request.
- processNetworkResponse(
- {concatenate(makeCursorResponse(22L, {firstEntry}), metadataObj), Milliseconds(0)}, true);
-
- // Simulate an error right before receiving the second entry.
- auto request = processNetworkResponse(RemoteCommandResponse(ErrorCodes::QueryPlanKilled,
- "Simulating failure for test.",
- Milliseconds(0)),
- true);
- ASSERT_EQUALS(std::string("getMore"), request.cmdObj.firstElementFieldName());
-
- // Resend all data for the retry. The enqueueDocumentsFn will check to make sure that
- // the first entry was not enqueued twice.
- request = processNetworkResponse(
- {concatenate(makeCursorResponse(0, {firstEntry, secondEntry}), metadataObj),
- Milliseconds(0)},
- false);
-
- ASSERT_EQUALS(std::string("find"), request.cmdObj.firstElementFieldName());
- ASSERT_EQUALS("oplog.rs", request.cmdObj["find"].String());
-
- ASSERT(request.cmdObj["filter"].ok());
- ASSERT(request.cmdObj["filter"]["ts"].ok());
- ASSERT(request.cmdObj["filter"]["ts"]["$gte"].ok());
- ASSERT_EQUALS(firstEntry["ts"].timestamp(), request.cmdObj["filter"]["ts"]["$gte"].timestamp());
-
- oplogFetcher.join();
- ASSERT_OK(shutdownState->getStatus());
-}
-
-TEST_F(OplogFetcherTest,
- OplogFetcherShouldNotDuplicateFirstDocWithEnqueueFirstDocOnErrorAfterSecondDoc) {
-
- // This function verifies that every oplog entry is only enqueued once.
- OpTime lastEnqueuedOpTime = OpTime();
- enqueueDocumentsFn = [&lastEnqueuedOpTime](Fetcher::Documents::const_iterator begin,
- Fetcher::Documents::const_iterator end,
- const OplogFetcher::DocumentsInfo&) -> Status {
- auto count = 0;
- auto toEnqueueOpTime = OpTime();
-
- for (auto i = begin; i != end; ++i) {
- count++;
-
- toEnqueueOpTime = OplogEntry(*i).getOpTime();
- ASSERT_GREATER_THAN(toEnqueueOpTime, lastEnqueuedOpTime);
- lastEnqueuedOpTime = toEnqueueOpTime;
- }
-
- ASSERT_NOT_GREATER_THAN(count, 2);
- return Status::OK();
- };
-
- auto shutdownState = std::make_unique<ShutdownState>();
- OplogFetcher oplogFetcher(&getExecutor(),
- lastFetched,
- source,
- nss,
- _createConfig(),
- 1 /* maxFetcherRestarts */,
- rbid,
- false /* requireFresherSyncSource */,
- dataReplicatorExternalState.get(),
- enqueueDocumentsFn,
- std::ref(*shutdownState),
- defaultBatchSize,
- OplogFetcher::StartingPoint::kEnqueueFirstDoc);
-
- ASSERT_FALSE(oplogFetcher.isActive());
- ASSERT_OK(oplogFetcher.startup());
- ASSERT_TRUE(oplogFetcher.isActive());
-
- auto firstEntry = makeNoopOplogEntry({{Seconds(123), 0}, lastFetched.getTerm()});
- auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
- auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.getTerm()});
- auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
-
- // Only send over the first two entries. Save the third for the getMore request.
- processNetworkResponse(
- {concatenate(makeCursorResponse(22L, {firstEntry, secondEntry}), metadataObj),
- Milliseconds(0)},
- true);
-
- // Simulate an error right before receiving the third entry.
- auto request = processNetworkResponse(RemoteCommandResponse(ErrorCodes::QueryPlanKilled,
- "Simulating failure for test.",
- Milliseconds(0)),
- true);
- ASSERT_EQUALS(std::string("getMore"), request.cmdObj.firstElementFieldName());
-
- // Resend all data for the retry. The enqueueDocumentsFn will check to make sure that
- // the first entry was not enqueued twice.
- request = processNetworkResponse(
- {concatenate(makeCursorResponse(0, {secondEntry, thirdEntry}), metadataObj),
- Milliseconds(0)},
- false);
-
- ASSERT_EQUALS(std::string("find"), request.cmdObj.firstElementFieldName());
- ASSERT_EQUALS("oplog.rs", request.cmdObj["find"].String());
-
- ASSERT(request.cmdObj["filter"].ok());
- ASSERT(request.cmdObj["filter"]["ts"].ok());
- ASSERT(request.cmdObj["filter"]["ts"]["$gte"].ok());
- ASSERT_EQUALS(secondEntry["ts"].timestamp(),
- request.cmdObj["filter"]["ts"]["$gte"].timestamp());
-
- oplogFetcher.join();
- ASSERT_OK(shutdownState->getStatus());
-}
-
-TEST_F(OplogFetcherTest, OplogFetcherShouldReportErrorsThrownFromCallback) {
- auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
-
- auto firstEntry = makeNoopOplogEntry(lastFetched);
- auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
- auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.getTerm()});
- Fetcher::Documents documents{firstEntry, secondEntry, thirdEntry};
-
- enqueueDocumentsFn = [](Fetcher::Documents::const_iterator,
- Fetcher::Documents::const_iterator,
- const OplogFetcher::DocumentsInfo&) -> Status {
- return Status(ErrorCodes::InternalError, "my custom error");
- };
-
- auto shutdownState = processSingleBatch(
- {concatenate(makeCursorResponse(0, documents), metadataObj), Milliseconds(0)});
- ASSERT_EQ(shutdownState->getStatus(), Status(ErrorCodes::InternalError, "my custom error"));
-}
-
-void OplogFetcherTest::testSyncSourceChecking(rpc::ReplSetMetadata* replMetadata,
- rpc::OplogQueryMetadata* oqMetadata) {
- auto firstEntry = makeNoopOplogEntry(lastFetched);
- auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
- auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.getTerm()});
- Fetcher::Documents documents{firstEntry, secondEntry, thirdEntry};
-
- BSONObjBuilder bob;
- if (replMetadata) {
- ASSERT_OK(replMetadata->writeToMetadata(&bob));
- }
- if (oqMetadata) {
- ASSERT_OK(oqMetadata->writeToMetadata(&bob));
- }
- BSONObj metadataObj = bob.obj();
-
- dataReplicatorExternalState->shouldStopFetchingResult = true;
-
- auto shutdownState = processSingleBatch(
- {concatenate(makeCursorResponse(0, documents), metadataObj), Milliseconds(0)});
-
- // Sync source checking happens after we have successfully pushed the operations into
- // the buffer for the next replication phase (eg. applier).
- // The last fetched optime should be reflected in the shutdown callback arguments.
- ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, shutdownState->getStatus());
-}
-
-TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithoutMetadataStopsTheOplogFetcher) {
- testSyncSourceChecking(nullptr, nullptr);
-
- // Sync source optime and "hasSyncSource" are not available if the response does not
- // contain metadata.
- ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked);
- ASSERT_EQUALS(OpTime(), dataReplicatorExternalState->syncSourceLastOpTime);
- ASSERT_FALSE(dataReplicatorExternalState->syncSourceHasSyncSource);
-}
-
-TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithBothMetadatasStopsTheOplogFetcher) {
- rpc::ReplSetMetadata replMetadata(
- lastFetched.getTerm(), {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1);
- OpTime committedOpTime = {{Seconds(10000), 0}, 1};
- rpc::OplogQueryMetadata oqMetadata(
- {committedOpTime, Date_t() + Seconds(committedOpTime.getSecs())},
- {{Seconds(20000), 0}, 1},
- rbid,
- 2,
- 2);
-
- testSyncSourceChecking(&replMetadata, &oqMetadata);
-
- // Sync source optime and "hasSyncSource" can be set if the respone contains metadata.
- ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked);
- ASSERT_EQUALS(oqMetadata.getLastOpApplied(), dataReplicatorExternalState->syncSourceLastOpTime);
- ASSERT_TRUE(dataReplicatorExternalState->syncSourceHasSyncSource);
-}
-
-TEST_F(OplogFetcherTest,
- FailedSyncSourceCheckWithSyncSourceHavingNoSyncSourceStopsTheOplogFetcher) {
- OpTime committedOpTime = {{Seconds(10000), 0}, 1};
- rpc::ReplSetMetadata replMetadata(
- lastFetched.getTerm(),
- {committedOpTime, Date_t() + Seconds(committedOpTime.getSecs())},
- {{Seconds(20000), 0}, 1},
- 1,
- OID::gen(),
- 2,
- 2);
- rpc::OplogQueryMetadata oqMetadata(
- {committedOpTime, Date_t() + Seconds(committedOpTime.getSecs())},
- {{Seconds(20000), 0}, 1},
- rbid,
- 2,
- -1);
-
- testSyncSourceChecking(&replMetadata, &oqMetadata);
-
- // Sync source "hasSyncSource" is derived from metadata.
- ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked);
- ASSERT_EQUALS(oqMetadata.getLastOpApplied(), dataReplicatorExternalState->syncSourceLastOpTime);
- ASSERT_FALSE(dataReplicatorExternalState->syncSourceHasSyncSource);
-}
-
-RemoteCommandRequest OplogFetcherTest::testTwoBatchHandling() {
- ShutdownState shutdownState;
-
- OplogFetcher oplogFetcher(&getExecutor(),
- lastFetched,
- source,
- nss,
- _createConfig(),
- 0,
- rbid,
- true,
- dataReplicatorExternalState.get(),
- enqueueDocumentsFn,
- std::ref(shutdownState),
- defaultBatchSize);
- ASSERT_EQUALS(OplogFetcher::State::kPreStart, oplogFetcher.getState_forTest());
-
- ASSERT_OK(oplogFetcher.startup());
- ASSERT_EQUALS(OplogFetcher::State::kRunning, oplogFetcher.getState_forTest());
-
- CursorId cursorId = 22LL;
- auto firstEntry = makeNoopOplogEntry(lastFetched);
- auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
-
- auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
- processNetworkResponse(
- {concatenate(makeCursorResponse(cursorId, {firstEntry, secondEntry}), metadataObj),
- Milliseconds(0)},
- true);
-
- ASSERT_EQUALS(1U, lastEnqueuedDocuments.size());
- ASSERT_BSONOBJ_EQ(secondEntry, lastEnqueuedDocuments[0]);
-
- // Set cursor ID to 0 in getMore response to indicate no more data available.
- auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.getTerm()});
- auto fourthEntry = makeNoopOplogEntry({{Seconds(1200), 0}, lastFetched.getTerm()});
- auto request = processNetworkResponse(makeCursorResponse(0, {thirdEntry, fourthEntry}, false));
-
- ASSERT_EQUALS(std::string("getMore"), request.cmdObj.firstElementFieldName());
- ASSERT_EQUALS(nss.coll(), request.cmdObj["collection"].String());
- ASSERT_EQUALS(int(durationCount<Milliseconds>(oplogFetcher.getAwaitDataTimeout_forTest())),
- request.cmdObj.getIntField("maxTimeMS"));
-
- ASSERT_EQUALS(2U, lastEnqueuedDocuments.size());
- ASSERT_BSONOBJ_EQ(thirdEntry, lastEnqueuedDocuments[0]);
- ASSERT_BSONOBJ_EQ(fourthEntry, lastEnqueuedDocuments[1]);
-
- oplogFetcher.join();
- ASSERT_EQUALS(OplogFetcher::State::kComplete, oplogFetcher.getState_forTest());
-
- ASSERT_OK(shutdownState.getStatus());
-
- return request;
-}
-
-TEST_F(
- OplogFetcherTest,
- NoDataAvailableAfterFirstTwoBatchesShouldCauseTheOplogFetcherToShutDownWithSuccessfulStatus) {
- auto request = testTwoBatchHandling();
- ASSERT_EQUALS(dataReplicatorExternalState->currentTerm, request.cmdObj["term"].numberLong());
- ASSERT_EQUALS(dataReplicatorExternalState->lastCommittedOpTime,
- unittest::assertGet(OpTime::parseFromOplogEntry(
- request.cmdObj["lastKnownCommittedOpTime"].Obj())));
-}
-
-TEST_F(OplogFetcherTest, ValidateDocumentsReturnsNoSuchKeyIfTimestampIsNotFoundInAnyDocument) {
- auto firstEntry = makeNoopOplogEntry(Seconds(123));
- auto secondEntry = BSON("o" << BSON("msg"
- << "oplog entry without optime"));
-
- ASSERT_EQUALS(ErrorCodes::NoSuchKey,
- OplogFetcher::validateDocuments(
- {firstEntry, secondEntry},
- true,
- unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp())
- .getStatus());
-}
-
-TEST_F(
- OplogFetcherTest,
- ValidateDocumentsReturnsOutOfOrderIfTimestampInFirstEntryIsEqualToLastTimestampAndNotProcessingFirstBatch) {
- auto firstEntry = makeNoopOplogEntry(Seconds(123));
- auto secondEntry = makeNoopOplogEntry(Seconds(456));
-
- ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder,
- OplogFetcher::validateDocuments(
- {firstEntry, secondEntry},
- false,
- unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp())
- .getStatus());
-}
-
-TEST_F(OplogFetcherTest, ValidateDocumentsReturnsOutOfOrderIfTimestampInSecondEntryIsBeforeFirst) {
- auto firstEntry = makeNoopOplogEntry(Seconds(456));
- auto secondEntry = makeNoopOplogEntry(Seconds(123));
-
- ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder,
- OplogFetcher::validateDocuments(
- {firstEntry, secondEntry},
- true,
- unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp())
- .getStatus());
-}
-
-TEST_F(OplogFetcherTest, ValidateDocumentsReturnsOutOfOrderIfTimestampInThirdEntryIsBeforeSecond) {
- auto firstEntry = makeNoopOplogEntry(Seconds(123));
- auto secondEntry = makeNoopOplogEntry(Seconds(789));
- auto thirdEntry = makeNoopOplogEntry(Seconds(456));
-
- ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder,
- OplogFetcher::validateDocuments(
- {firstEntry, secondEntry, thirdEntry},
- true,
- unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp())
- .getStatus());
-}
-
-TEST_F(
- OplogFetcherTest,
- ValidateDocumentsExcludesFirstDocumentInApplyCountAndBytesIfProcessingFirstBatchAndSkipFirstDoc) {
- auto firstEntry = makeNoopOplogEntry(Seconds(123));
- auto secondEntry = makeNoopOplogEntry(Seconds(456));
- auto thirdEntry = makeNoopOplogEntry(Seconds(789));
-
- auto info = unittest::assertGet(OplogFetcher::validateDocuments(
- {firstEntry, secondEntry, thirdEntry},
- true,
- unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp(),
- mongo::repl::OplogFetcher::StartingPoint::kSkipFirstDoc));
-
- ASSERT_EQUALS(3U, info.networkDocumentCount);
- ASSERT_EQUALS(2U, info.toApplyDocumentCount);
- ASSERT_EQUALS(size_t(firstEntry.objsize() + secondEntry.objsize() + thirdEntry.objsize()),
- info.networkDocumentBytes);
- ASSERT_EQUALS(size_t(secondEntry.objsize() + thirdEntry.objsize()), info.toApplyDocumentBytes);
-
- ASSERT_EQUALS(unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry)), info.lastDocument);
-}
-
-TEST_F(
- OplogFetcherTest,
- ValidateDocumentsIncludesFirstDocumentInApplyCountAndBytesIfProcessingFirstBatchAndEnqueueFirstDoc) {
- auto firstEntry = makeNoopOplogEntry(Seconds(123));
- auto secondEntry = makeNoopOplogEntry(Seconds(456));
- auto thirdEntry = makeNoopOplogEntry(Seconds(789));
-
- auto info = unittest::assertGet(OplogFetcher::validateDocuments(
- {firstEntry, secondEntry, thirdEntry},
- true,
- unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp(),
- mongo::repl::OplogFetcher::StartingPoint::kEnqueueFirstDoc));
-
- ASSERT_EQUALS(3U, info.networkDocumentCount);
- ASSERT_EQUALS(3U, info.toApplyDocumentCount);
- ASSERT_EQUALS(size_t(firstEntry.objsize() + secondEntry.objsize() + thirdEntry.objsize()),
- info.networkDocumentBytes);
- ASSERT_EQUALS(size_t(firstEntry.objsize() + secondEntry.objsize() + thirdEntry.objsize()),
- info.toApplyDocumentBytes);
-
- ASSERT_EQUALS(unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry)), info.lastDocument);
-}
-
-TEST_F(OplogFetcherTest,
- ValidateDocumentsIncludesFirstDocumentInApplyCountAndBytesIfNotProcessingFirstBatch) {
- auto firstEntry = makeNoopOplogEntry(Seconds(123));
- auto secondEntry = makeNoopOplogEntry(Seconds(456));
- auto thirdEntry = makeNoopOplogEntry(Seconds(789));
-
- auto info = unittest::assertGet(OplogFetcher::validateDocuments(
- {firstEntry, secondEntry, thirdEntry}, false, Timestamp(Seconds(100), 0)));
-
- ASSERT_EQUALS(3U, info.networkDocumentCount);
- ASSERT_EQUALS(size_t(firstEntry.objsize() + secondEntry.objsize() + thirdEntry.objsize()),
- info.networkDocumentBytes);
-
- ASSERT_EQUALS(info.networkDocumentCount, info.toApplyDocumentCount);
- ASSERT_EQUALS(info.networkDocumentBytes, info.toApplyDocumentBytes);
-
- ASSERT_EQUALS(unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry)), info.lastDocument);
-}
-
-TEST_F(OplogFetcherTest,
- ValidateDocumentsReturnsDefaultLastDocumentOpTimeWhenThereAreNoDocumentsToApply) {
- auto firstEntry = makeNoopOplogEntry(Seconds(123));
-
- auto info = unittest::assertGet(OplogFetcher::validateDocuments(
- {firstEntry},
- true,
- unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp()));
-
- ASSERT_EQUALS(1U, info.networkDocumentCount);
- ASSERT_EQUALS(size_t(firstEntry.objsize()), info.networkDocumentBytes);
-
- ASSERT_EQUALS(0U, info.toApplyDocumentCount);
- ASSERT_EQUALS(0U, info.toApplyDocumentBytes);
-
- ASSERT_EQUALS(OpTime(), info.lastDocument);
-}
-
-TEST_F(OplogFetcherTest,
- ValidateDocumentsReturnsOplogStartMissingWhenThereAreNoDocumentsWhenProcessingFirstBatch) {
- ASSERT_EQUALS(
- ErrorCodes::OplogStartMissing,
- OplogFetcher::validateDocuments({}, true, Timestamp(Seconds(100), 0)).getStatus());
-}
-
-TEST_F(OplogFetcherTest,
- ValidateDocumentsReturnsDefaultInfoWhenThereAreNoDocumentsWhenNotProcessingFirstBatch) {
- auto info =
- unittest::assertGet(OplogFetcher::validateDocuments({}, false, Timestamp(Seconds(100), 0)));
-
- ASSERT_EQUALS(0U, info.networkDocumentCount);
- ASSERT_EQUALS(0U, info.networkDocumentBytes);
-
- ASSERT_EQUALS(0U, info.toApplyDocumentCount);
- ASSERT_EQUALS(0U, info.toApplyDocumentBytes);
-
- ASSERT_EQUALS(OpTime(), info.lastDocument);
-}
-
BSONObj makeNoopOplogEntry(OpTime opTime) {
auto oplogEntry =
repl::OplogEntry(opTime, // optime
@@ -1067,14 +119,14 @@ BSONObj makeOplogBatchMetadata(boost::optional<const rpc::ReplSetMetadata&> repl
}
Message makeFirstBatch(CursorId cursorId,
- const NewOplogFetcher::Documents& oplogEntries,
+ const OplogFetcher::Documents& oplogEntries,
const BSONObj& metadata) {
return MockDBClientConnection::mockFindResponse(
NamespaceString::kRsOplogNamespace, cursorId, oplogEntries, metadata);
}
Message makeSubsequentBatch(CursorId cursorId,
- const NewOplogFetcher::Documents& oplogEntries,
+ const OplogFetcher::Documents& oplogEntries,
const BSONObj& metadata,
bool moreToCome) {
return MockDBClientConnection::mockGetMoreResponse(
@@ -1182,8 +234,39 @@ void simulateNetworkDisconnect(DBClientConnection* conn) {
mockConn->shutdown();
}
-class NewOplogFetcherTest : public executor::ThreadPoolExecutorTest,
- public ScopedGlobalServiceContextForTest {
+class ShutdownState {
+ ShutdownState(const ShutdownState&) = delete;
+ ShutdownState& operator=(const ShutdownState&) = delete;
+
+public:
+ ShutdownState();
+
+ /**
+ * Returns the status at shutdown.
+ */
+ Status getStatus() const;
+
+ /**
+ * Use this for oplog fetcher shutdown callback.
+ */
+ void operator()(const Status& status);
+
+private:
+ Status _status = executor::TaskExecutorTest::getDetectableErrorStatus();
+};
+
+ShutdownState::ShutdownState() = default;
+
+Status ShutdownState::getStatus() const {
+ return _status;
+}
+
+void ShutdownState::operator()(const Status& status) {
+ _status = status;
+}
+
+class OplogFetcherTest : public executor::ThreadPoolExecutorTest,
+ public ScopedGlobalServiceContextForTest {
protected:
static const OpTime remoteNewerOpTime;
static const OpTime staleOpTime;
@@ -1200,20 +283,18 @@ protected:
void setUp() override;
- std::unique_ptr<NewOplogFetcher> makeOplogFetcher();
- std::unique_ptr<NewOplogFetcher> makeOplogFetcherWithDifferentExecutor(
+ std::unique_ptr<OplogFetcher> makeOplogFetcher();
+ std::unique_ptr<OplogFetcher> makeOplogFetcherWithDifferentExecutor(
executor::TaskExecutor* executor,
- NewOplogFetcher::OnShutdownCallbackFn fn,
+ OplogFetcher::OnShutdownCallbackFn fn,
int numRestarts = 0,
bool requireFresherSyncSource = true,
- NewOplogFetcher::StartingPoint startingPoint =
- NewOplogFetcher::StartingPoint::kSkipFirstDoc);
- std::unique_ptr<NewOplogFetcher> getOplogFetcherAfterConnectionCreated(
- NewOplogFetcher::OnShutdownCallbackFn fn,
+ OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc);
+ std::unique_ptr<OplogFetcher> getOplogFetcherAfterConnectionCreated(
+ OplogFetcher::OnShutdownCallbackFn fn,
int numRestarts = 0,
bool requireFresherSyncSource = true,
- NewOplogFetcher::StartingPoint startingPoint =
- NewOplogFetcher::StartingPoint::kSkipFirstDoc);
+ OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc);
std::unique_ptr<ShutdownState> processSingleBatch(const Message& response,
bool shouldShutdown = false,
@@ -1226,13 +307,13 @@ protected:
void testSyncSourceChecking(boost::optional<const rpc::ReplSetMetadata&> replMetadata,
boost::optional<const rpc::OplogQueryMetadata&> oqMetadata);
- void validateLastBatch(bool skipFirstDoc, NewOplogFetcher::Documents docs, OpTime lastFetched);
+ void validateLastBatch(bool skipFirstDoc, OplogFetcher::Documents docs, OpTime lastFetched);
std::unique_ptr<DataReplicatorExternalStateMock> dataReplicatorExternalState;
- NewOplogFetcher::Documents lastEnqueuedDocuments;
- NewOplogFetcher::DocumentsInfo lastEnqueuedDocumentsInfo;
- NewOplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn;
+ OplogFetcher::Documents lastEnqueuedDocuments;
+ OplogFetcher::DocumentsInfo lastEnqueuedDocumentsInfo;
+ OplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn;
// The last OpTime fetched by the oplog fetcher.
OpTime lastFetched;
@@ -1240,20 +321,20 @@ protected:
std::unique_ptr<MockRemoteDBServer> _mockServer;
};
-const int NewOplogFetcherTest::rbid;
-const OpTime NewOplogFetcherTest::remoteNewerOpTime = OpTime(Timestamp(1000, 1), 2);
-const rpc::OplogQueryMetadata NewOplogFetcherTest::oqMetadata = rpc::OplogQueryMetadata(
+const int OplogFetcherTest::rbid;
+const OpTime OplogFetcherTest::remoteNewerOpTime = OpTime(Timestamp(1000, 1), 2);
+const rpc::OplogQueryMetadata OplogFetcherTest::oqMetadata = rpc::OplogQueryMetadata(
{staleOpTime, staleWallTime}, remoteNewerOpTime, rbid, primaryIndex, syncSourceIndex);
-const OpTime NewOplogFetcherTest::staleOpTime = OpTime(Timestamp(1, 1), 0);
-const Date_t NewOplogFetcherTest::staleWallTime = Date_t() + Seconds(staleOpTime.getSecs());
-const rpc::OplogQueryMetadata NewOplogFetcherTest::staleOqMetadata = rpc::OplogQueryMetadata(
+const OpTime OplogFetcherTest::staleOpTime = OpTime(Timestamp(1, 1), 0);
+const Date_t OplogFetcherTest::staleWallTime = Date_t() + Seconds(staleOpTime.getSecs());
+const rpc::OplogQueryMetadata OplogFetcherTest::staleOqMetadata = rpc::OplogQueryMetadata(
{staleOpTime, staleWallTime}, staleOpTime, rbid, primaryIndex, syncSourceIndex);
-const rpc::ReplSetMetadata NewOplogFetcherTest::replSetMetadata =
+const rpc::ReplSetMetadata OplogFetcherTest::replSetMetadata =
rpc::ReplSetMetadata(1, OpTimeAndWallTime(), OpTime(), 1, OID(), primaryIndex, syncSourceIndex);
-void NewOplogFetcherTest::setUp() {
+void OplogFetcherTest::setUp() {
executor::ThreadPoolExecutorTest::setUp();
launchExecutorThread();
@@ -1263,9 +344,9 @@ void NewOplogFetcherTest::setUp() {
dataReplicatorExternalState->currentTerm = lastFetched.getTerm();
dataReplicatorExternalState->lastCommittedOpTime = {{9999, 0}, lastFetched.getTerm()};
- enqueueDocumentsFn = [this](NewOplogFetcher::Documents::const_iterator begin,
- NewOplogFetcher::Documents::const_iterator end,
- const NewOplogFetcher::DocumentsInfo& info) -> Status {
+ enqueueDocumentsFn = [this](OplogFetcher::Documents::const_iterator begin,
+ OplogFetcher::Documents::const_iterator end,
+ const OplogFetcher::DocumentsInfo& info) -> Status {
lastEnqueuedDocuments = {begin, end};
lastEnqueuedDocumentsInfo = info;
return Status::OK();
@@ -1279,15 +360,15 @@ void NewOplogFetcherTest::setUp() {
oplogFetcherUsesExhaust = true;
}
-std::unique_ptr<NewOplogFetcher> NewOplogFetcherTest::makeOplogFetcher() {
+std::unique_ptr<OplogFetcher> OplogFetcherTest::makeOplogFetcher() {
return makeOplogFetcherWithDifferentExecutor(&getExecutor(), [](Status) {});
}
-std::unique_ptr<NewOplogFetcher> NewOplogFetcherTest::getOplogFetcherAfterConnectionCreated(
- NewOplogFetcher::OnShutdownCallbackFn fn,
+std::unique_ptr<OplogFetcher> OplogFetcherTest::getOplogFetcherAfterConnectionCreated(
+ OplogFetcher::OnShutdownCallbackFn fn,
int numRestarts,
bool requireFresherSyncSource,
- NewOplogFetcher::StartingPoint startingPoint) {
+ OplogFetcher::StartingPoint startingPoint) {
auto oplogFetcher = makeOplogFetcherWithDifferentExecutor(
&getExecutor(), fn, numRestarts, requireFresherSyncSource, startingPoint);
@@ -1305,18 +386,18 @@ std::unique_ptr<NewOplogFetcher> NewOplogFetcherTest::getOplogFetcherAfterConnec
return oplogFetcher;
}
-std::unique_ptr<NewOplogFetcher> NewOplogFetcherTest::makeOplogFetcherWithDifferentExecutor(
+std::unique_ptr<OplogFetcher> OplogFetcherTest::makeOplogFetcherWithDifferentExecutor(
executor::TaskExecutor* executor,
- NewOplogFetcher::OnShutdownCallbackFn fn,
+ OplogFetcher::OnShutdownCallbackFn fn,
int numRestarts,
bool requireFresherSyncSource,
- NewOplogFetcher::StartingPoint startingPoint) {
- auto oplogFetcher = std::make_unique<NewOplogFetcher>(
+ OplogFetcher::StartingPoint startingPoint) {
+ auto oplogFetcher = std::make_unique<OplogFetcher>(
executor,
lastFetched,
source,
_createConfig(),
- std::make_unique<NewOplogFetcher::OplogFetcherRestartDecisionDefault>(numRestarts),
+ std::make_unique<OplogFetcher::OplogFetcherRestartDecisionDefault>(numRestarts),
rbid,
requireFresherSyncSource,
dataReplicatorExternalState.get(),
@@ -1332,11 +413,10 @@ std::unique_ptr<NewOplogFetcher> NewOplogFetcherTest::makeOplogFetcherWithDiffer
return oplogFetcher;
}
-std::unique_ptr<ShutdownState> NewOplogFetcherTest::processSingleBatch(
- const Message& response,
- bool shouldShutdown,
- bool requireFresherSyncSource,
- bool lastFetchedShouldAdvance) {
+std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch(const Message& response,
+ bool shouldShutdown,
+ bool requireFresherSyncSource,
+ bool lastFetchedShouldAdvance) {
auto shutdownState = std::make_unique<ShutdownState>();
// Create an oplog fetcher with no retries.
@@ -1366,7 +446,7 @@ std::unique_ptr<ShutdownState> NewOplogFetcherTest::processSingleBatch(
return shutdownState;
}
-void NewOplogFetcherTest::testSyncSourceChecking(
+void OplogFetcherTest::testSyncSourceChecking(
boost::optional<const rpc::ReplSetMetadata&> replMetadata,
boost::optional<const rpc::OplogQueryMetadata&> oqMetadata) {
auto firstEntry = makeNoopOplogEntry(lastFetched);
@@ -1383,9 +463,9 @@ void NewOplogFetcherTest::testSyncSourceChecking(
ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, shutdownState->getStatus());
}
-void NewOplogFetcherTest::validateLastBatch(bool skipFirstDoc,
- NewOplogFetcher::Documents docs,
- OpTime lastFetched) {
+void OplogFetcherTest::validateLastBatch(bool skipFirstDoc,
+ OplogFetcher::Documents docs,
+ OpTime lastFetched) {
auto docs_iter = docs.begin();
auto enqueue_iter = lastEnqueuedDocuments.begin();
@@ -1403,7 +483,7 @@ void NewOplogFetcherTest::validateLastBatch(bool skipFirstDoc,
ASSERT_EQUALS(docs.back()["ts"].timestamp(), lastFetched.getTimestamp());
}
-TEST_F(NewOplogFetcherTest, ShuttingExecutorDownShouldPreventOplogFetcherFromStarting) {
+TEST_F(OplogFetcherTest, ShuttingExecutorDownShouldPreventOplogFetcherFromStarting) {
getExecutor().shutdown();
auto oplogFetcher = makeOplogFetcher();
@@ -1419,7 +499,7 @@ TEST_F(NewOplogFetcherTest, ShuttingExecutorDownShouldPreventOplogFetcherFromSta
ASSERT_EQUALS(lastFetched, oplogFetcher->getLastOpTimeFetched_forTest());
}
-TEST_F(NewOplogFetcherTest, OplogFetcherReturnsOperationFailedIfExecutorFailsToScheduleRunQuery) {
+TEST_F(OplogFetcherTest, OplogFetcherReturnsOperationFailedIfExecutorFailsToScheduleRunQuery) {
TaskExecutorMock taskExecutorMock(&getExecutor());
taskExecutorMock.shouldFailScheduleWorkRequest = []() { return true; };
@@ -1438,7 +518,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherReturnsOperationFailedIfExecutorFailsToS
ASSERT_EQUALS(lastFetched, oplogFetcher->getLastOpTimeFetched_forTest());
}
-TEST_F(NewOplogFetcherTest, ShuttingExecutorDownAfterStartupButBeforeRunQueryScheduled) {
+TEST_F(OplogFetcherTest, ShuttingExecutorDownAfterStartupButBeforeRunQueryScheduled) {
ShutdownState shutdownState;
// Defer scheduling work so that the executor's shutdown happens before startup's work is
@@ -1460,7 +540,7 @@ TEST_F(NewOplogFetcherTest, ShuttingExecutorDownAfterStartupButBeforeRunQuerySch
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus());
}
-TEST_F(NewOplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownBeforeRunQueryScheduled) {
+TEST_F(OplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownBeforeRunQueryScheduled) {
ShutdownState shutdownState;
// Defer scheduling work so that the oplog fetcher's shutdown happens before startup's work is
@@ -1482,7 +562,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownBeforeR
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus());
}
-TEST_F(NewOplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownAfterRunQueryScheduled) {
+TEST_F(OplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownAfterRunQueryScheduled) {
// Tests shutting down after _runQuery is scheduled (but not while blocked on the network).
ShutdownState shutdownState;
@@ -1507,7 +587,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownAfterRu
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus());
}
-TEST_F(NewOplogFetcherTest,
+TEST_F(OplogFetcherTest,
OplogFetcherReturnsHostUnreachableIfShutdownAfterRunQueryScheduledWhileBlockedOnCall) {
// Tests that shutting down while the connection is blocked on call successfully shuts down the
// connection as well.
@@ -1531,7 +611,7 @@ TEST_F(NewOplogFetcherTest,
ASSERT_EQUALS(ErrorCodes::HostUnreachable, shutdownState.getStatus());
}
-TEST_F(NewOplogFetcherTest,
+TEST_F(OplogFetcherTest,
OplogFetcherReturnsCallbackCanceledIfShutdownAfterGettingBatchBeforeProcessing) {
// Tests shutting down after getting the first batch, but before enqueuing it.
@@ -1596,7 +676,7 @@ public:
}
};
-TEST_F(NewOplogFetcherTest, OplogFetcherResetsOnShutdownCallbackFnOnCompletion) {
+TEST_F(OplogFetcherTest, OplogFetcherResetsOnShutdownCallbackFnOnCompletion) {
auto sharedCallbackData = std::make_shared<SharedCallbackState>();
auto callbackInvoked = false;
auto status = getDetectableErrorStatus();
@@ -1630,7 +710,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherResetsOnShutdownCallbackFnOnCompletion)
ASSERT_TRUE(sharedCallbackStateDestroyedSoon());
}
-TEST_F(NewOplogFetcherTest,
+TEST_F(OplogFetcherTest,
FindQueryContainsTermIfGetCurrentTermAndLastCommittedOpTimeReturnsValidTerm) {
// Test that the correct maxTimeMS is set if this is the initial 'find' query.
auto oplogFetcher = makeOplogFetcher();
@@ -1649,7 +729,7 @@ TEST_F(NewOplogFetcherTest,
ASSERT_EQUALS(dataReplicatorExternalState->currentTerm, queryObj["term"].numberLong());
}
-TEST_F(NewOplogFetcherTest,
+TEST_F(OplogFetcherTest,
FindQueryDoesNotContainTermIfGetCurrentTermAndLastCommittedOpTimeReturnsUninitializedTerm) {
dataReplicatorExternalState->currentTerm = OpTime::kUninitializedTerm;
auto oplogFetcher = makeOplogFetcher();
@@ -1671,7 +751,7 @@ TEST_F(NewOplogFetcherTest,
}
TEST_F(
- NewOplogFetcherTest,
+ OplogFetcherTest,
GetMoreQueryDoesNotContainTermIfGetCurrentTermAndLastCommittedOpTimeReturnsUninitializedTerm) {
dataReplicatorExternalState->currentTerm = OpTime::kUninitializedTerm;
@@ -1705,13 +785,13 @@ TEST_F(
ASSERT_OK(shutdownState.getStatus());
}
-TEST_F(NewOplogFetcherTest, AwaitDataTimeoutShouldEqualHalfElectionTimeout) {
+TEST_F(OplogFetcherTest, AwaitDataTimeoutShouldEqualHalfElectionTimeout) {
auto config = _createConfig();
auto timeout = makeOplogFetcher()->getAwaitDataTimeout_forTest();
ASSERT_EQUALS(config.getElectionTimeoutPeriod() / 2, timeout);
}
-TEST_F(NewOplogFetcherTest, AwaitDataTimeoutSmallerWhenFailPointSet) {
+TEST_F(OplogFetcherTest, AwaitDataTimeoutSmallerWhenFailPointSet) {
auto failPoint = globalFailPointRegistry().find("setSmallOplogGetMoreMaxTimeMS");
failPoint->setMode(FailPoint::alwaysOn);
auto timeout = makeOplogFetcher()->getAwaitDataTimeout_forTest();
@@ -1719,7 +799,7 @@ TEST_F(NewOplogFetcherTest, AwaitDataTimeoutSmallerWhenFailPointSet) {
failPoint->setMode(FailPoint::off);
}
-TEST_F(NewOplogFetcherTest, InvalidReplSetMetadataInResponseStopsTheOplogFetcher) {
+TEST_F(OplogFetcherTest, InvalidReplSetMetadataInResponseStopsTheOplogFetcher) {
CursorId cursorId = 22LL;
auto entry = makeNoopOplogEntry(lastFetched);
auto metadataObj =
@@ -1728,7 +808,7 @@ TEST_F(NewOplogFetcherTest, InvalidReplSetMetadataInResponseStopsTheOplogFetcher
processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj))->getStatus());
}
-TEST_F(NewOplogFetcherTest, InvalidOplogQueryMetadataInResponseStopsTheOplogFetcher) {
+TEST_F(OplogFetcherTest, InvalidOplogQueryMetadataInResponseStopsTheOplogFetcher) {
CursorId cursorId = 22LL;
auto entry = makeNoopOplogEntry(lastFetched);
auto metadataObj =
@@ -1737,7 +817,7 @@ TEST_F(NewOplogFetcherTest, InvalidOplogQueryMetadataInResponseStopsTheOplogFetc
processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj))->getStatus());
}
-DEATH_TEST_F(NewOplogFetcherTest,
+DEATH_TEST_F(OplogFetcherTest,
ValidMetadataInResponseWithoutOplogMetadataInvariants,
"Invariant failure oqMetadata") {
CursorId cursorId = 22LL;
@@ -1747,7 +827,7 @@ DEATH_TEST_F(NewOplogFetcherTest,
processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj));
}
-TEST_F(NewOplogFetcherTest, ValidMetadataWithInResponseShouldBeForwardedToProcessMetadataFn) {
+TEST_F(OplogFetcherTest, ValidMetadataWithInResponseShouldBeForwardedToProcessMetadataFn) {
CursorId cursorId = 0LL;
auto entry = makeNoopOplogEntry(lastFetched);
auto metadataObj = makeOplogBatchMetadata(replSetMetadata, oqMetadata);
@@ -1760,7 +840,7 @@ TEST_F(NewOplogFetcherTest, ValidMetadataWithInResponseShouldBeForwardedToProces
dataReplicatorExternalState->oqMetadataProcessed.getPrimaryIndex());
}
-TEST_F(NewOplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceRollsBack) {
+TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceRollsBack) {
CursorId cursorId = 22LL;
auto entry = makeNoopOplogEntry(lastFetched);
@@ -1775,7 +855,7 @@ TEST_F(NewOplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceRollsBa
ASSERT(lastEnqueuedDocuments.empty());
}
-TEST_F(NewOplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehind) {
+TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehind) {
CursorId cursorId = 22LL;
auto entry = makeNoopOplogEntry(staleOpTime);
auto metadataObj = makeOplogBatchMetadata(replSetMetadata, staleOqMetadata);
@@ -1787,7 +867,7 @@ TEST_F(NewOplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehin
ASSERT(lastEnqueuedDocuments.empty());
}
-TEST_F(NewOplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAhead) {
+TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAhead) {
CursorId cursorId = 22LL;
auto entry = makeNoopOplogEntry(lastFetched);
@@ -1802,7 +882,7 @@ TEST_F(NewOplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAh
ASSERT(lastEnqueuedDocuments.empty());
}
-TEST_F(NewOplogFetcherTest,
+TEST_F(OplogFetcherTest,
MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehindWithoutRequiringFresherSyncSource) {
CursorId cursorId = 22LL;
auto entry = makeNoopOplogEntry(staleOpTime);
@@ -1818,7 +898,7 @@ TEST_F(NewOplogFetcherTest,
ASSERT(lastEnqueuedDocuments.empty());
}
-TEST_F(NewOplogFetcherTest, MetadataAndBatchAreProcessedWhenSyncSourceIsCurrentButMetadataIsStale) {
+TEST_F(OplogFetcherTest, MetadataAndBatchAreProcessedWhenSyncSourceIsCurrentButMetadataIsStale) {
// This tests the case where the sync source metadata is behind us but we get a document which
// is equal to us. Since that means the metadata is stale and can be ignored, we should accept
// this sync source.
@@ -1834,7 +914,7 @@ TEST_F(NewOplogFetcherTest, MetadataAndBatchAreProcessedWhenSyncSourceIsCurrentB
ASSERT(dataReplicatorExternalState->metadataWasProcessed);
}
-TEST_F(NewOplogFetcherTest,
+TEST_F(OplogFetcherTest,
MetadataAndBatchAreProcessedWhenSyncSourceIsNotAheadWithoutRequiringFresherSyncSource) {
CursorId cursorId = 0LL;
rpc::OplogQueryMetadata oplogQueryMetadata(
@@ -1849,7 +929,7 @@ TEST_F(NewOplogFetcherTest,
ASSERT(dataReplicatorExternalState->metadataWasProcessed);
}
-TEST_F(NewOplogFetcherTest,
+TEST_F(OplogFetcherTest,
MetadataWithoutOplogQueryMetadataIsNotProcessedOnBatchThatTriggersRollback) {
CursorId cursorId = 22LL;
auto metadataObj = makeOplogBatchMetadata(replSetMetadata, boost::none);
@@ -1860,7 +940,7 @@ TEST_F(NewOplogFetcherTest,
ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
}
-TEST_F(NewOplogFetcherTest, MetadataIsNotProcessedOnBatchThatTriggersRollback) {
+TEST_F(OplogFetcherTest, MetadataIsNotProcessedOnBatchThatTriggersRollback) {
CursorId cursorId = 22LL;
auto metadataObj = makeOplogBatchMetadata(replSetMetadata, oqMetadata);
auto entry = makeNoopOplogEntry(Seconds(456));
@@ -1871,7 +951,7 @@ TEST_F(NewOplogFetcherTest, MetadataIsNotProcessedOnBatchThatTriggersRollback) {
ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
}
-TEST_F(NewOplogFetcherTest, EmptyMetadataIsNotProcessed) {
+TEST_F(OplogFetcherTest, EmptyMetadataIsNotProcessed) {
CursorId cursorId = 0LL;
auto entry = makeNoopOplogEntry(lastFetched);
@@ -1879,11 +959,11 @@ TEST_F(NewOplogFetcherTest, EmptyMetadataIsNotProcessed) {
ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
}
-TEST_F(NewOplogFetcherTest, FailingInitialCreateNewCursorNoRetriesShutsDownOplogFetcher) {
+TEST_F(OplogFetcherTest, FailingInitialCreateNewCursorNoRetriesShutsDownOplogFetcher) {
ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, processSingleBatch(Message())->getStatus());
}
-TEST_F(NewOplogFetcherTest, FailingInitialCreateNewCursorWithRetriesShutsDownOplogFetcher) {
+TEST_F(OplogFetcherTest, FailingInitialCreateNewCursorWithRetriesShutsDownOplogFetcher) {
ShutdownState shutdownState;
// Create an oplog fetcher with one retry.
@@ -1900,7 +980,7 @@ TEST_F(NewOplogFetcherTest, FailingInitialCreateNewCursorWithRetriesShutsDownOpl
ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, shutdownState.getStatus());
}
-TEST_F(NewOplogFetcherTest,
+TEST_F(OplogFetcherTest,
NetworkExceptionDuringInitialCreateNewCursorWithRetriesShutsDownOplogFetcher) {
ShutdownState shutdownState;
@@ -1923,7 +1003,7 @@ TEST_F(NewOplogFetcherTest,
ASSERT_EQUALS(ErrorCodes::NetworkTimeout, shutdownState.getStatus());
}
-TEST_F(NewOplogFetcherTest, DontRecreateNewCursorAfterFailedBatchNoRetries) {
+TEST_F(OplogFetcherTest, DontRecreateNewCursorAfterFailedBatchNoRetries) {
ShutdownState shutdownState;
// Create an oplog fetcher without any retries.
@@ -1961,7 +1041,7 @@ TEST_F(NewOplogFetcherTest, DontRecreateNewCursorAfterFailedBatchNoRetries) {
ASSERT_EQUALS(ErrorCodes::NetworkTimeout, shutdownState.getStatus());
}
-TEST_F(NewOplogFetcherTest, FailCreateNewCursorAfterFailedBatchRetriesShutsDownOplogFetcher) {
+TEST_F(OplogFetcherTest, FailCreateNewCursorAfterFailedBatchRetriesShutsDownOplogFetcher) {
ShutdownState shutdownState;
// Create an oplog fetcher with one retry.
@@ -2002,7 +1082,7 @@ TEST_F(NewOplogFetcherTest, FailCreateNewCursorAfterFailedBatchRetriesShutsDownO
ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, shutdownState.getStatus());
}
-TEST_F(NewOplogFetcherTest, SuccessfullyRecreateCursorAfterFailedBatch) {
+TEST_F(OplogFetcherTest, SuccessfullyRecreateCursorAfterFailedBatch) {
// This tests that the oplog fetcher successfully can recreate a cursor after it failed to get
// a batch and makes sure the recreated cursor behaves like an exhaust cursor. This will also
// check that the socket timeouts are set as expected. The steps are:
@@ -2157,7 +1237,7 @@ TEST_F(NewOplogFetcherTest, SuccessfullyRecreateCursorAfterFailedBatch) {
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus());
}
-TEST_F(NewOplogFetcherTest, SuccessfulBatchResetsNumRestarts) {
+TEST_F(OplogFetcherTest, SuccessfulBatchResetsNumRestarts) {
// This tests that the OplogFetcherRestartDecision resets its counter when the oplog fetcher
// successfully gets the next batch. The steps are:
// 1. Start the oplog fetcher.
@@ -2243,7 +1323,7 @@ TEST_F(NewOplogFetcherTest, SuccessfulBatchResetsNumRestarts) {
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus());
}
-TEST_F(NewOplogFetcherTest, OplogFetcherWorksWithoutExhaust) {
+TEST_F(OplogFetcherTest, OplogFetcherWorksWithoutExhaust) {
// Test that the oplog fetcher works if the 'oplogFetcherUsesExhaust' server parameter is set to
// false.
@@ -2327,7 +1407,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherWorksWithoutExhaust) {
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus());
}
-TEST_F(NewOplogFetcherTest, CursorIsDeadShutsDownOplogFetcherWithSuccessfulStatus) {
+TEST_F(OplogFetcherTest, CursorIsDeadShutsDownOplogFetcherWithSuccessfulStatus) {
ShutdownState shutdownState;
// Create an oplog fetcher with one retry.
@@ -2359,14 +1439,13 @@ TEST_F(NewOplogFetcherTest, CursorIsDeadShutsDownOplogFetcherWithSuccessfulStatu
ASSERT_OK(shutdownState.getStatus());
}
-TEST_F(NewOplogFetcherTest, EmptyFirstBatchStopsOplogFetcherWithOplogStartMissingError) {
+TEST_F(OplogFetcherTest, EmptyFirstBatchStopsOplogFetcherWithOplogStartMissingError) {
CursorId cursorId = 22LL;
ASSERT_EQUALS(ErrorCodes::OplogStartMissing,
processSingleBatch(makeFirstBatch(cursorId, {}, {}))->getStatus());
}
-TEST_F(NewOplogFetcherTest,
- MissingOpTimeInFirstDocumentCausesOplogFetcherToStopWithInvalidBSONError) {
+TEST_F(OplogFetcherTest, MissingOpTimeInFirstDocumentCausesOplogFetcherToStopWithInvalidBSONError) {
CursorId cursorId = 22LL;
auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata);
ASSERT_EQUALS(
@@ -2375,7 +1454,7 @@ TEST_F(NewOplogFetcherTest,
}
TEST_F(
- NewOplogFetcherTest,
+ OplogFetcherTest,
LastOpTimeFetchedDoesNotMatchFirstDocumentCausesOplogFetcherToStopWithOplogStartMissingError) {
CursorId cursorId = 22LL;
auto entry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
@@ -2384,7 +1463,7 @@ TEST_F(
processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj))->getStatus());
}
-TEST_F(NewOplogFetcherTest,
+TEST_F(OplogFetcherTest,
MissingOpTimeInSecondDocumentOfFirstBatchCausesOplogFetcherToStopWithNoSuchKey) {
CursorId cursorId = 22LL;
auto firstEntry = makeNoopOplogEntry(lastFetched);
@@ -2399,8 +1478,7 @@ TEST_F(NewOplogFetcherTest,
->getStatus());
}
-TEST_F(NewOplogFetcherTest,
- TimestampsNotAdvancingInBatchCausesOplogFetcherStopWithOplogOutOfOrder) {
+TEST_F(OplogFetcherTest, TimestampsNotAdvancingInBatchCausesOplogFetcherStopWithOplogOutOfOrder) {
CursorId cursorId = 22LL;
auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata);
ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder,
@@ -2413,8 +1491,7 @@ TEST_F(NewOplogFetcherTest,
->getStatus());
}
-TEST_F(NewOplogFetcherTest,
- OplogFetcherShouldExcludeFirstDocumentInFirstBatchWhenEnqueuingDocuments) {
+TEST_F(OplogFetcherTest, OplogFetcherShouldExcludeFirstDocumentInFirstBatchWhenEnqueuingDocuments) {
CursorId cursorId = 22LL;
auto firstEntry = makeNoopOplogEntry(lastFetched);
auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
@@ -2445,14 +1522,14 @@ TEST_F(NewOplogFetcherTest,
ASSERT_EQUALS(ErrorCodes::HostUnreachable, shutdownState->getStatus());
}
-TEST_F(NewOplogFetcherTest,
+TEST_F(OplogFetcherTest,
OplogFetcherShouldNotDuplicateFirstDocWithEnqueueFirstDocOnErrorAfterFirstDoc) {
// This function verifies that every oplog entry is only enqueued once.
OpTime lastEnqueuedOpTime = OpTime();
- enqueueDocumentsFn = [&lastEnqueuedOpTime](NewOplogFetcher::Documents::const_iterator begin,
- NewOplogFetcher::Documents::const_iterator end,
- const NewOplogFetcher::DocumentsInfo&) -> Status {
+ enqueueDocumentsFn = [&lastEnqueuedOpTime](OplogFetcher::Documents::const_iterator begin,
+ OplogFetcher::Documents::const_iterator end,
+ const OplogFetcher::DocumentsInfo&) -> Status {
auto count = 0;
auto toEnqueueOpTime = OpTime();
@@ -2475,7 +1552,7 @@ TEST_F(NewOplogFetcherTest,
getOplogFetcherAfterConnectionCreated(std::ref(*shutdownState),
1,
true /* requireFresherSyncSource */,
- NewOplogFetcher::StartingPoint::kEnqueueFirstDoc);
+ OplogFetcher::StartingPoint::kEnqueueFirstDoc);
CursorId cursorId = 22LL;
auto firstEntry = makeNoopOplogEntry({{Seconds(123), 0}, lastFetched.getTerm()});
@@ -2519,14 +1596,14 @@ TEST_F(NewOplogFetcherTest,
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState->getStatus());
}
-TEST_F(NewOplogFetcherTest,
+TEST_F(OplogFetcherTest,
OplogFetcherShouldNotDuplicateFirstDocWithEnqueueFirstDocOnErrorAfterSecondDoc) {
// This function verifies that every oplog entry is only enqueued once.
OpTime lastEnqueuedOpTime = OpTime();
- enqueueDocumentsFn = [&lastEnqueuedOpTime](NewOplogFetcher::Documents::const_iterator begin,
- NewOplogFetcher::Documents::const_iterator end,
- const NewOplogFetcher::DocumentsInfo&) -> Status {
+ enqueueDocumentsFn = [&lastEnqueuedOpTime](OplogFetcher::Documents::const_iterator begin,
+ OplogFetcher::Documents::const_iterator end,
+ const OplogFetcher::DocumentsInfo&) -> Status {
auto count = 0;
auto toEnqueueOpTime = OpTime();
@@ -2549,7 +1626,7 @@ TEST_F(NewOplogFetcherTest,
getOplogFetcherAfterConnectionCreated(std::ref(*shutdownState),
1,
true /* requireFresherSyncSource */,
- NewOplogFetcher::StartingPoint::kEnqueueFirstDoc);
+ OplogFetcher::StartingPoint::kEnqueueFirstDoc);
CursorId cursorId = 22LL;
auto firstEntry = makeNoopOplogEntry({{Seconds(123), 0}, lastFetched.getTerm()});
@@ -2594,15 +1671,15 @@ TEST_F(NewOplogFetcherTest,
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState->getStatus());
}
-TEST_F(NewOplogFetcherTest, OplogFetcherShouldReportErrorsThrownFromEnqueueDocumentsFn) {
+TEST_F(OplogFetcherTest, OplogFetcherShouldReportErrorsThrownFromEnqueueDocumentsFn) {
CursorId cursorId = 22LL;
auto firstEntry = makeNoopOplogEntry(lastFetched);
auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata);
- enqueueDocumentsFn = [](NewOplogFetcher::Documents::const_iterator,
- NewOplogFetcher::Documents::const_iterator,
- const NewOplogFetcher::DocumentsInfo&) -> Status {
+ enqueueDocumentsFn = [](OplogFetcher::Documents::const_iterator,
+ OplogFetcher::Documents::const_iterator,
+ const OplogFetcher::DocumentsInfo&) -> Status {
return Status(ErrorCodes::InternalError, "my custom error");
};
@@ -2611,7 +1688,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherShouldReportErrorsThrownFromEnqueueDocum
ASSERT_EQ(Status(ErrorCodes::InternalError, "my custom error"), shutdownState->getStatus());
}
-TEST_F(NewOplogFetcherTest, FailedSyncSourceCheckWithoutMetadataStopsTheOplogFetcher) {
+TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithoutMetadataStopsTheOplogFetcher) {
testSyncSourceChecking(boost::none, boost::none);
// Sync source optime and "hasSyncSource" are not available if the response does not
@@ -2621,7 +1698,7 @@ TEST_F(NewOplogFetcherTest, FailedSyncSourceCheckWithoutMetadataStopsTheOplogFet
ASSERT_FALSE(dataReplicatorExternalState->syncSourceHasSyncSource);
}
-TEST_F(NewOplogFetcherTest, FailedSyncSourceCheckWithBothMetadatasStopsTheOplogFetcher) {
+TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithBothMetadatasStopsTheOplogFetcher) {
testSyncSourceChecking(replSetMetadata, oqMetadata);
// Sync source optime and "hasSyncSource" can be set if the respone contains metadata.
@@ -2630,7 +1707,7 @@ TEST_F(NewOplogFetcherTest, FailedSyncSourceCheckWithBothMetadatasStopsTheOplogF
ASSERT_TRUE(dataReplicatorExternalState->syncSourceHasSyncSource);
}
-TEST_F(NewOplogFetcherTest,
+TEST_F(OplogFetcherTest,
FailedSyncSourceCheckWithSyncSourceHavingNoSyncSourceStopsTheOplogFetcher) {
rpc::OplogQueryMetadata oplogQueryMetadata(
{staleOpTime, staleWallTime}, remoteNewerOpTime, rbid, primaryIndex, -1);
@@ -2643,13 +1720,13 @@ TEST_F(NewOplogFetcherTest,
ASSERT_FALSE(dataReplicatorExternalState->syncSourceHasSyncSource);
}
-TEST_F(NewOplogFetcherTest, ValidateDocumentsReturnsNoSuchKeyIfTimestampIsNotFoundInAnyDocument) {
+TEST_F(OplogFetcherTest, ValidateDocumentsReturnsNoSuchKeyIfTimestampIsNotFoundInAnyDocument) {
auto firstEntry = makeNoopOplogEntry(Seconds(123));
auto secondEntry = BSON("o" << BSON("msg"
<< "oplog entry without optime"));
ASSERT_EQUALS(ErrorCodes::NoSuchKey,
- NewOplogFetcher::validateDocuments(
+ OplogFetcher::validateDocuments(
{firstEntry, secondEntry},
true,
unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp())
@@ -2657,40 +1734,38 @@ TEST_F(NewOplogFetcherTest, ValidateDocumentsReturnsNoSuchKeyIfTimestampIsNotFou
}
TEST_F(
- NewOplogFetcherTest,
+ OplogFetcherTest,
ValidateDocumentsReturnsOutOfOrderIfTimestampInFirstEntryIsEqualToLastTimestampAndNotProcessingFirstBatch) {
auto firstEntry = makeNoopOplogEntry(Seconds(123));
auto secondEntry = makeNoopOplogEntry(Seconds(456));
ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder,
- NewOplogFetcher::validateDocuments(
+ OplogFetcher::validateDocuments(
{firstEntry, secondEntry},
false,
unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp())
.getStatus());
}
-TEST_F(NewOplogFetcherTest,
- ValidateDocumentsReturnsOutOfOrderIfTimestampInSecondEntryIsBeforeFirst) {
+TEST_F(OplogFetcherTest, ValidateDocumentsReturnsOutOfOrderIfTimestampInSecondEntryIsBeforeFirst) {
auto firstEntry = makeNoopOplogEntry(Seconds(456));
auto secondEntry = makeNoopOplogEntry(Seconds(123));
ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder,
- NewOplogFetcher::validateDocuments(
+ OplogFetcher::validateDocuments(
{firstEntry, secondEntry},
true,
unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp())
.getStatus());
}
-TEST_F(NewOplogFetcherTest,
- ValidateDocumentsReturnsOutOfOrderIfTimestampInThirdEntryIsBeforeSecond) {
+TEST_F(OplogFetcherTest, ValidateDocumentsReturnsOutOfOrderIfTimestampInThirdEntryIsBeforeSecond) {
auto firstEntry = makeNoopOplogEntry(Seconds(123));
auto secondEntry = makeNoopOplogEntry(Seconds(789));
auto thirdEntry = makeNoopOplogEntry(Seconds(456));
ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder,
- NewOplogFetcher::validateDocuments(
+ OplogFetcher::validateDocuments(
{firstEntry, secondEntry, thirdEntry},
true,
unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp())
@@ -2698,17 +1773,17 @@ TEST_F(NewOplogFetcherTest,
}
TEST_F(
- NewOplogFetcherTest,
+ OplogFetcherTest,
ValidateDocumentsExcludesFirstDocumentInApplyCountAndBytesIfProcessingFirstBatchAndSkipFirstDoc) {
auto firstEntry = makeNoopOplogEntry(Seconds(123));
auto secondEntry = makeNoopOplogEntry(Seconds(456));
auto thirdEntry = makeNoopOplogEntry(Seconds(789));
- auto info = unittest::assertGet(NewOplogFetcher::validateDocuments(
+ auto info = unittest::assertGet(OplogFetcher::validateDocuments(
{firstEntry, secondEntry, thirdEntry},
true,
unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp(),
- mongo::repl::NewOplogFetcher::StartingPoint::kSkipFirstDoc));
+ mongo::repl::OplogFetcher::StartingPoint::kSkipFirstDoc));
ASSERT_EQUALS(3U, info.networkDocumentCount);
ASSERT_EQUALS(2U, info.toApplyDocumentCount);
@@ -2720,17 +1795,17 @@ TEST_F(
}
TEST_F(
- NewOplogFetcherTest,
+ OplogFetcherTest,
ValidateDocumentsIncludesFirstDocumentInApplyCountAndBytesIfProcessingFirstBatchAndEnqueueFirstDoc) {
auto firstEntry = makeNoopOplogEntry(Seconds(123));
auto secondEntry = makeNoopOplogEntry(Seconds(456));
auto thirdEntry = makeNoopOplogEntry(Seconds(789));
- auto info = unittest::assertGet(NewOplogFetcher::validateDocuments(
+ auto info = unittest::assertGet(OplogFetcher::validateDocuments(
{firstEntry, secondEntry, thirdEntry},
true,
unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp(),
- mongo::repl::NewOplogFetcher::StartingPoint::kEnqueueFirstDoc));
+ mongo::repl::OplogFetcher::StartingPoint::kEnqueueFirstDoc));
ASSERT_EQUALS(3U, info.networkDocumentCount);
ASSERT_EQUALS(3U, info.toApplyDocumentCount);
@@ -2742,13 +1817,13 @@ TEST_F(
ASSERT_EQUALS(unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry)), info.lastDocument);
}
-TEST_F(NewOplogFetcherTest,
+TEST_F(OplogFetcherTest,
ValidateDocumentsIncludesFirstDocumentInApplyCountAndBytesIfNotProcessingFirstBatch) {
auto firstEntry = makeNoopOplogEntry(Seconds(123));
auto secondEntry = makeNoopOplogEntry(Seconds(456));
auto thirdEntry = makeNoopOplogEntry(Seconds(789));
- auto info = unittest::assertGet(NewOplogFetcher::validateDocuments(
+ auto info = unittest::assertGet(OplogFetcher::validateDocuments(
{firstEntry, secondEntry, thirdEntry}, false, Timestamp(Seconds(100), 0)));
ASSERT_EQUALS(3U, info.networkDocumentCount);
@@ -2761,11 +1836,11 @@ TEST_F(NewOplogFetcherTest,
ASSERT_EQUALS(unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry)), info.lastDocument);
}
-TEST_F(NewOplogFetcherTest,
+TEST_F(OplogFetcherTest,
ValidateDocumentsReturnsDefaultLastDocumentOpTimeWhenThereAreNoDocumentsToApply) {
auto firstEntry = makeNoopOplogEntry(Seconds(123));
- auto info = unittest::assertGet(NewOplogFetcher::validateDocuments(
+ auto info = unittest::assertGet(OplogFetcher::validateDocuments(
{firstEntry},
true,
unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp()));
@@ -2779,17 +1854,17 @@ TEST_F(NewOplogFetcherTest,
ASSERT_EQUALS(OpTime(), info.lastDocument);
}
-TEST_F(NewOplogFetcherTest,
+TEST_F(OplogFetcherTest,
ValidateDocumentsReturnsOplogStartMissingWhenThereAreNoDocumentsWhenProcessingFirstBatch) {
ASSERT_EQUALS(
ErrorCodes::OplogStartMissing,
- NewOplogFetcher::validateDocuments({}, true, Timestamp(Seconds(100), 0)).getStatus());
+ OplogFetcher::validateDocuments({}, true, Timestamp(Seconds(100), 0)).getStatus());
}
-TEST_F(NewOplogFetcherTest,
+TEST_F(OplogFetcherTest,
ValidateDocumentsReturnsDefaultInfoWhenThereAreNoDocumentsWhenNotProcessingFirstBatch) {
- auto info = unittest::assertGet(
- NewOplogFetcher::validateDocuments({}, false, Timestamp(Seconds(100), 0)));
+ auto info =
+ unittest::assertGet(OplogFetcher::validateDocuments({}, false, Timestamp(Seconds(100), 0)));
ASSERT_EQUALS(0U, info.networkDocumentCount);
ASSERT_EQUALS(0U, info.networkDocumentBytes);
@@ -2800,7 +1875,7 @@ TEST_F(NewOplogFetcherTest,
ASSERT_EQUALS(OpTime(), info.lastDocument);
}
-TEST_F(NewOplogFetcherTest, OplogFetcherReturnsHostUnreachableOnConnectionFailures) {
+TEST_F(OplogFetcherTest, OplogFetcherReturnsHostUnreachableOnConnectionFailures) {
// Test that OplogFetcher fails to establish initial connection, retrying HostUnreachable.
ShutdownState shutdownState;
@@ -2816,7 +1891,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherReturnsHostUnreachableOnConnectionFailur
ASSERT_EQUALS(ErrorCodes::HostUnreachable, shutdownState.getStatus());
}
-TEST_F(NewOplogFetcherTest, OplogFetcherRetriesConnectionButFails) {
+TEST_F(OplogFetcherTest, OplogFetcherRetriesConnectionButFails) {
// Test that OplogFetcher tries but fails after failing the initial connection, retrying
// HostUnreachable.
ShutdownState shutdownState;
@@ -2834,7 +1909,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherRetriesConnectionButFails) {
ASSERT_EQUALS(ErrorCodes::HostUnreachable, shutdownState.getStatus());
}
-TEST_F(NewOplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownBeforeReconnect) {
+TEST_F(OplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownBeforeReconnect) {
// Test that OplogFetcher returns CallbackCanceled error if it is shut down after failing the
// initial connection but before it retries the connection.
ShutdownState shutdownState;
@@ -2864,7 +1939,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownBeforeR
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus());
}
-TEST_F(NewOplogFetcherTest, OplogFetcherResetsNumRestartsOnSuccessfulConnection) {
+TEST_F(OplogFetcherTest, OplogFetcherResetsNumRestartsOnSuccessfulConnection) {
// Test that OplogFetcher resets the number of restarts after a successful connection on a
// retry.
ShutdownState shutdownState;
@@ -2909,7 +1984,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherResetsNumRestartsOnSuccessfulConnection)
ASSERT_OK(shutdownState.getStatus());
}
-TEST_F(NewOplogFetcherTest, OplogFetcherCanAutoReconnect) {
+TEST_F(OplogFetcherTest, OplogFetcherCanAutoReconnect) {
// Test that the OplogFetcher can autoreconnect after a broken connection.
ShutdownState shutdownState;
@@ -2932,7 +2007,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherCanAutoReconnect) {
ASSERT_OK(shutdownState.getStatus());
}
-TEST_F(NewOplogFetcherTest, OplogFetcherAutoReconnectsButFails) {
+TEST_F(OplogFetcherTest, OplogFetcherAutoReconnectsButFails) {
// Test that the OplogFetcher fails an autoreconnect after a broken connection.
ShutdownState shutdownState;
@@ -2961,7 +2036,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherAutoReconnectsButFails) {
ASSERT_EQUALS(ErrorCodes::HostUnreachable, shutdownState.getStatus());
}
-TEST_F(NewOplogFetcherTest, DisconnectsOnErrorsDuringExhaustStream) {
+TEST_F(OplogFetcherTest, DisconnectsOnErrorsDuringExhaustStream) {
// Test that the connection disconnects if we get errors after successfully receiving a batch
// from the exhaust stream.
ShutdownState shutdownState;
@@ -3010,7 +2085,7 @@ TEST_F(NewOplogFetcherTest, DisconnectsOnErrorsDuringExhaustStream) {
ASSERT_OK(shutdownState.getStatus());
}
-TEST_F(NewOplogFetcherTest, GetMoreEmptyBatch) {
+TEST_F(OplogFetcherTest, GetMoreEmptyBatch) {
ShutdownState shutdownState;
// Create an oplog fetcher without any retries.
diff --git a/src/mongo/db/repl/repl_server_parameters.idl b/src/mongo/db/repl/repl_server_parameters.idl
index bdb90237d0b..2ac6a02f4b2 100644
--- a/src/mongo/db/repl/repl_server_parameters.idl
+++ b/src/mongo/db/repl/repl_server_parameters.idl
@@ -77,8 +77,7 @@ server_parameters:
default:
expr: (16 * 1024 * 1024) / 12 * 10
- # TODO SERVER-45574: change to oplog_fetcher.cpp
- # From abstract_oplog_fetcher.cpp
+ # From oplog_fetcher.cpp
oplogInitialFindMaxSeconds:
description: >-
Number of seconds for the `maxTimeMS` on the initial `find` command.
@@ -181,7 +180,7 @@ server_parameters:
# From collection_bulk_loader_impl.cpp
collectionBulkLoaderBatchSizeInBytes:
description: >-
- Limit for the number of bytes of data inserted per storage transaction
+ Limit for the number of bytes of data inserted per storage transaction
(WriteUnitOfWork) by collectionBulkLoader during initial sync collection cloning
set_at: startup
cpp_vartype: int