diff options
author | Lingzhi Deng <lingzhi.deng@mongodb.com> | 2020-02-19 12:54:25 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-02-19 18:42:35 +0000 |
commit | d2c07e12a87325dca3265e1b078045cbcf909044 (patch) | |
tree | fb39d92159580db0caa0f7a8945410941f2d1bc8 /src | |
parent | 2f318f6bc8a136d9273b5cc1973f590af374bae0 (diff) | |
download | mongo-d2c07e12a87325dca3265e1b078045cbcf909044.tar.gz |
SERVER-45574: Replace OplogFetcher with NewOplogFetcher
- Delete old OplogFetcher implementation and unit tests
- Rename NewOplogFetcher as OplogFetcher
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/SConscript | 39 | ||||
-rw-r--r-- | src/mongo/db/repl/abstract_oplog_fetcher.cpp | 383 | ||||
-rw-r--r-- | src/mongo/db/repl/abstract_oplog_fetcher.h | 280 | ||||
-rw-r--r-- | src/mongo/db/repl/abstract_oplog_fetcher_test.cpp | 566 | ||||
-rw-r--r-- | src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp | 142 | ||||
-rw-r--r-- | src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.h | 100 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.h | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/initial_syncer.cpp | 74 | ||||
-rw-r--r-- | src/mongo/db/repl/initial_syncer.h | 28 | ||||
-rw-r--r-- | src/mongo/db/repl/initial_syncer_test.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.cpp | 419 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.h | 242 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher_mock.cpp | 32 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher_mock.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher_test.cpp | 1253 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_server_parameters.idl | 5 |
17 files changed, 305 insertions, 3290 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 0a01cd2f57d..99c78289a7f 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -844,45 +844,12 @@ env.Library( ) env.Library( - target='abstract_oplog_fetcher_test_fixture', - source=[ - 'abstract_oplog_fetcher_test_fixture.cpp', - ], - LIBDEPS=[ - 'abstract_oplog_fetcher', - 'oplog_entry', - '$BUILD_DIR/mongo/unittest/task_executor_proxy', - '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', - ], -) - -env.Library( - target='abstract_oplog_fetcher', - source=[ - 'abstract_oplog_fetcher.cpp', - ], - LIBDEPS=[ - 'abstract_async_component', - '$BUILD_DIR/mongo/base', - '$BUILD_DIR/mongo/client/fetcher', - '$BUILD_DIR/mongo/db/namespace_string', - '$BUILD_DIR/mongo/db/stats/counters', - '$BUILD_DIR/mongo/db/stats/timer_stats', - '$BUILD_DIR/mongo/executor/task_executor_interface', - ], - LIBDEPS_PRIVATE=[ - 'repl_server_parameters', - '$BUILD_DIR/mongo/db/commands/server_status_core', - ], -) - -env.Library( target='oplog_fetcher', source=[ 'oplog_fetcher.cpp', ], LIBDEPS=[ - 'abstract_oplog_fetcher', + 'abstract_async_component', 'repl_coordinator_interface', 'replica_set_messages', '$BUILD_DIR/mongo/base', @@ -890,6 +857,7 @@ env.Library( '$BUILD_DIR/mongo/db/namespace_string', '$BUILD_DIR/mongo/db/stats/counters', '$BUILD_DIR/mongo/db/stats/timer_stats', + '$BUILD_DIR/mongo/executor/task_executor_interface', ], LIBDEPS_PRIVATE=[ 'repl_server_parameters', @@ -1216,7 +1184,6 @@ env.CppUnitTest( target='db_repl_test', source=[ 'abstract_async_component_test.cpp', - 'abstract_oplog_fetcher_test.cpp', 'apply_ops_test.cpp', 'check_quorum_for_config_change_test.cpp', 'drop_pending_collection_reaper_test.cpp', @@ -1291,8 +1258,6 @@ env.CppUnitTest( '$BUILD_DIR/mongo/util/clock_source_mock', '$BUILD_DIR/mongo/util/concurrency/thread_pool', 'abstract_async_component', - 'abstract_oplog_fetcher', - 'abstract_oplog_fetcher_test_fixture', 'data_replicator_external_state_mock', 'drop_pending_collection_reaper', 'idempotency_test_fixture', diff --git a/src/mongo/db/repl/abstract_oplog_fetcher.cpp b/src/mongo/db/repl/abstract_oplog_fetcher.cpp deleted file mode 100644 index ffff15f063f..00000000000 --- a/src/mongo/db/repl/abstract_oplog_fetcher.cpp +++ /dev/null @@ -1,383 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication - -#include "mongo/platform/basic.h" - -#include "mongo/db/repl/abstract_oplog_fetcher.h" - -#include <memory> - -#include "mongo/base/counter.h" -#include "mongo/bson/util/bson_extract.h" -#include "mongo/db/commands/server_status_metric.h" -#include "mongo/db/jsobj.h" -#include "mongo/db/repl/repl_server_parameters_gen.h" -#include "mongo/logv2/log.h" -#include "mongo/platform/mutex.h" -#include "mongo/util/assert_util.h" -#include "mongo/util/log.h" - -namespace mongo { -namespace repl { - -// This failpoint is shared with oplog_fetcher. -MONGO_FAIL_POINT_DEFINE(hangBeforeStartingOplogFetcher) - -namespace { -// Default `maxTimeMS` timeout for `getMore`s. -const Milliseconds kDefaultOplogGetMoreMaxMS{5000}; - -} // namespace - -AbstractOplogFetcher::AbstractOplogFetcher(executor::TaskExecutor* executor, - OpTime lastFetched, - HostAndPort source, - NamespaceString nss, - std::size_t maxFetcherRestarts, - OnShutdownCallbackFn onShutdownCallbackFn, - const std::string& componentName) - : AbstractOplogFetcher(executor, - lastFetched, - source, - nss, - std::make_unique<OplogFetcherRestartDecisionDefault>(maxFetcherRestarts), - onShutdownCallbackFn, - componentName) { - invariant(!_lastFetched.isNull()); - invariant(onShutdownCallbackFn); -} - -AbstractOplogFetcher::AbstractOplogFetcher( - executor::TaskExecutor* executor, - OpTime lastFetched, - HostAndPort source, - NamespaceString nss, - std::unique_ptr<OplogFetcherRestartDecision> oplogFetcherRestartDecision, - OnShutdownCallbackFn onShutdownCallbackFn, - const std::string& componentName) - : AbstractAsyncComponent(executor, componentName), - _source(source), - _nss(nss), - _oplogFetcherRestartDecision(std::move(oplogFetcherRestartDecision)), - _onShutdownCallbackFn(onShutdownCallbackFn), - _lastFetched(lastFetched) { - invariant(!_lastFetched.isNull()); - invariant(onShutdownCallbackFn); -} - - -Milliseconds AbstractOplogFetcher::_getInitialFindMaxTime() const { - return Milliseconds(oplogInitialFindMaxSeconds.load() * 1000); -} - -Milliseconds AbstractOplogFetcher::_getRetriedFindMaxTime() const { - return Milliseconds(oplogRetriedFindMaxSeconds.load() * 1000); -} - -Milliseconds AbstractOplogFetcher::_getGetMoreMaxTime() const { - return kDefaultOplogGetMoreMaxMS; -} - -Milliseconds AbstractOplogFetcher::_getNetworkTimeoutBuffer() const { - return Milliseconds(oplogNetworkTimeoutBufferSeconds.load() * 1000); -} - -std::string AbstractOplogFetcher::toString() const { - stdx::lock_guard<Latch> lock(_mutex); - str::stream msg; - msg << _getComponentName() << " -" - << " last optime fetched: " << _lastFetched.toString(); - // The fetcher is created a startup, not at construction, so we must check if it exists. - if (_fetcher) { - msg << " fetcher: " << _fetcher->getDiagnosticString(); - } - return msg; -} - -void AbstractOplogFetcher::_makeAndScheduleFetcherCallback( - const executor::TaskExecutor::CallbackArgs& args) { - Status responseStatus = _checkForShutdownAndConvertStatus(args, "error scheduling fetcher"); - if (!responseStatus.isOK()) { - _finishCallback(responseStatus); - return; - } - - BSONObj findCommandObj = - _makeFindCommandObject(_nss, _getLastOpTimeFetched(), _getInitialFindMaxTime()); - BSONObj metadataObj = _makeMetadataObject(); - - Status scheduleStatus = Status::OK(); - { - stdx::lock_guard<Latch> lock(_mutex); - _fetcher = _makeFetcher(findCommandObj, metadataObj, _getInitialFindMaxTime()); - scheduleStatus = _scheduleFetcher_inlock(); - } - if (!scheduleStatus.isOK()) { - _finishCallback(scheduleStatus); - return; - } -} - -Status AbstractOplogFetcher::_doStartup_inlock() noexcept { - return _scheduleWorkAndSaveHandle_inlock( - [this](const executor::TaskExecutor::CallbackArgs& args) { - // Tests use this failpoint to prevent the oplog fetcher from starting. If those - // tests fail and the oplog fetcher is canceled, we want to continue so we see - // a test failure quickly instead of a test timeout eventually. - while (hangBeforeStartingOplogFetcher.shouldFail() && !args.myHandle.isCanceled()) { - sleepmillis(100); - } - _makeAndScheduleFetcherCallback(args); - }, - &_makeAndScheduleFetcherHandle, - "_makeAndScheduleFetcherCallback"); -} - -void AbstractOplogFetcher::_doShutdown_inlock() noexcept { - _cancelHandle_inlock(_makeAndScheduleFetcherHandle); - if (_fetcher) { - _fetcher->shutdown(); - } -} - -Mutex* AbstractOplogFetcher::_getMutex() noexcept { - return &_mutex; -} - -Status AbstractOplogFetcher::_scheduleFetcher_inlock() { - return _fetcher->schedule(); -} - -OpTime AbstractOplogFetcher::getLastOpTimeFetched_forTest() const { - return _getLastOpTimeFetched(); -} - -OpTime AbstractOplogFetcher::_getLastOpTimeFetched() const { - stdx::lock_guard<Latch> lock(_mutex); - return _lastFetched; -} - -BSONObj AbstractOplogFetcher::getCommandObject_forTest() const { - stdx::lock_guard<Latch> lock(_mutex); - return _fetcher->getCommandObject(); -} - -BSONObj AbstractOplogFetcher::getFindQuery_forTest() const { - return _makeFindCommandObject(_nss, _getLastOpTimeFetched(), _getInitialFindMaxTime()); -} - -HostAndPort AbstractOplogFetcher::_getSource() const { - return _source; -} - -NamespaceString AbstractOplogFetcher::_getNamespace() const { - return _nss; -} - -void AbstractOplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, - BSONObjBuilder* getMoreBob) { - Status responseStatus = - _checkForShutdownAndConvertStatus(result.getStatus(), "error in fetcher batch callback"); - if (ErrorCodes::CallbackCanceled == responseStatus) { - LOGV2_DEBUG(21032, - 1, - "{getComponentName} oplog query cancelled to {getSource}: {responseStatus}", - "getComponentName"_attr = _getComponentName(), - "getSource"_attr = _getSource(), - "responseStatus"_attr = redact(responseStatus)); - _finishCallback(responseStatus); - return; - } - // If target cut connections between connecting and querying (for - // example, because it stepped down) we might not have a cursor. - if (!responseStatus.isOK()) { - BSONObj findCommandObj = - _makeFindCommandObject(_nss, _getLastOpTimeFetched(), _getRetriedFindMaxTime()); - BSONObj metadataObj = _makeMetadataObject(); - { - if (_oplogFetcherRestartDecision->shouldContinue(this, responseStatus)) { - stdx::lock_guard<Latch> lock(_mutex); - // Destroying current instance in _shuttingDownFetcher will possibly block. - _shuttingDownFetcher.reset(); - // Move the old fetcher into the shutting down instance. - _shuttingDownFetcher.swap(_fetcher); - // Create and start fetcher with current term and new starting optime, and use the - // retry 'find' timeout. - _fetcher = _makeFetcher(findCommandObj, metadataObj, _getRetriedFindMaxTime()); - - auto scheduleStatus = _scheduleFetcher_inlock(); - if (scheduleStatus.isOK()) { - LOGV2(21033, - "Scheduled new oplog query {fetcher}", - "fetcher"_attr = _fetcher->toString()); - return; - } - LOGV2_ERROR(21037, - "Error scheduling new oplog query: {scheduleStatus}. Returning current " - "oplog query error: {responseStatus}", - "scheduleStatus"_attr = redact(scheduleStatus), - "responseStatus"_attr = redact(responseStatus)); - } - } - _finishCallback(responseStatus); - return; - } - - // Reset fetcher restart counter on successful response. - { - stdx::lock_guard<Latch> lock(_mutex); - invariant(_isActive_inlock()); - _oplogFetcherRestartDecision->fetchSuccessful(this); - } - - if (_isShuttingDown()) { - _finishCallback( - Status(ErrorCodes::CallbackCanceled, _getComponentName() + " shutting down")); - return; - } - - // At this point we have a successful batch and can call the subclass's _onSuccessfulBatch. - const auto& queryResponse = result.getValue(); - auto batchResult = _onSuccessfulBatch(queryResponse); - if (!batchResult.isOK()) { - // The stopReplProducer fail point expects this to return successfully. If another fail - // point wants this to return unsuccessfully, it should use a different error code. - if (batchResult.getStatus() == ErrorCodes::FailPointEnabled) { - _finishCallback(Status::OK()); - return; - } - _finishCallback(batchResult.getStatus()); - return; - } - - // No more data. Stop processing and return Status::OK. - if (!getMoreBob) { - _finishCallback(Status::OK()); - return; - } - - // We have now processed the batch and should move forward our view of _lastFetched. Note that - // the _lastFetched value will not be updated until the _onSuccessfulBatch function is - // completed. - const auto& documents = queryResponse.documents; - if (documents.size() > 0) { - auto lastDocRes = OpTime::parseFromOplogEntry(documents.back()); - if (!lastDocRes.isOK()) { - _finishCallback(lastDocRes.getStatus()); - return; - } - auto lastDoc = lastDocRes.getValue(); - LOGV2_DEBUG(21034, - 3, - "{getComponentName} setting last fetched optime ahead after batch: {lastDoc}", - "getComponentName"_attr = _getComponentName(), - "lastDoc"_attr = lastDoc); - - stdx::lock_guard<Latch> lock(_mutex); - _lastFetched = lastDoc; - } - - // Check for shutdown to save an unnecessary `getMore` request. - if (_isShuttingDown()) { - _finishCallback( - Status(ErrorCodes::CallbackCanceled, _getComponentName() + " shutting down")); - return; - } - - // The _onSuccessfulBatch function returns the `getMore` command we want to send. - getMoreBob->appendElements(batchResult.getValue()); -} - -void AbstractOplogFetcher::_finishCallback(Status status) { - invariant(isActive()); - - _onShutdownCallbackFn(status); - - decltype(_onShutdownCallbackFn) onShutdownCallbackFn; - decltype(_oplogFetcherRestartDecision) oplogFetcherRestartDecision; - stdx::lock_guard<Latch> lock(_mutex); - _transitionToComplete_inlock(); - - // Release any resources that might be held by the '_onShutdownCallbackFn' function object. - // The function object will be destroyed outside the lock since the temporary variable - // 'onShutdownCallbackFn' is declared before 'lock'. - invariant(_onShutdownCallbackFn); - std::swap(_onShutdownCallbackFn, onShutdownCallbackFn); - - // Release any resources held by the OplogFetcherRestartDecision - invariant(_oplogFetcherRestartDecision); - std::swap(_oplogFetcherRestartDecision, oplogFetcherRestartDecision); -} - -std::unique_ptr<Fetcher> AbstractOplogFetcher::_makeFetcher(const BSONObj& findCommandObj, - const BSONObj& metadataObj, - Milliseconds findMaxTime) { - return std::make_unique<Fetcher>( - _getExecutor(), - _source, - _nss.db().toString(), - findCommandObj, - [this](const StatusWith<Fetcher::QueryResponse>& resp, - Fetcher::NextAction*, - BSONObjBuilder* builder) { return _callback(resp, builder); }, - metadataObj, - findMaxTime + _getNetworkTimeoutBuffer(), - _getGetMoreMaxTime() + _getNetworkTimeoutBuffer()); -} - -bool AbstractOplogFetcher::OplogFetcherRestartDecisionDefault::shouldContinue( - AbstractOplogFetcher* fetcher, Status status) { - if (_fetcherRestarts == _maxFetcherRestarts) { - LOGV2(21035, - "Error returned from oplog query (no more query restarts left): {status}", - "status"_attr = redact(status)); - return false; - } - LOGV2( - 21036, - "Restarting oplog query due to error: {status}. Last fetched optime: " - "{fetcher_getLastOpTimeFetched}. Restarts remaining: {maxFetcherRestarts_fetcherRestarts}", - "status"_attr = redact(status), - "fetcher_getLastOpTimeFetched"_attr = fetcher->_getLastOpTimeFetched(), - "maxFetcherRestarts_fetcherRestarts"_attr = (_maxFetcherRestarts - _fetcherRestarts)); - _fetcherRestarts++; - return true; -} - -void AbstractOplogFetcher::OplogFetcherRestartDecisionDefault::fetchSuccessful( - AbstractOplogFetcher* fetcher) { - _fetcherRestarts = 0; -}; - -AbstractOplogFetcher::OplogFetcherRestartDecision::~OplogFetcherRestartDecision(){}; - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/abstract_oplog_fetcher.h b/src/mongo/db/repl/abstract_oplog_fetcher.h deleted file mode 100644 index f588472bc18..00000000000 --- a/src/mongo/db/repl/abstract_oplog_fetcher.h +++ /dev/null @@ -1,280 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <functional> - -#include "mongo/base/status_with.h" -#include "mongo/client/fetcher.h" -#include "mongo/db/namespace_string.h" -#include "mongo/db/repl/abstract_async_component.h" -#include "mongo/db/repl/optime_with.h" -#include "mongo/platform/mutex.h" - -namespace mongo { -namespace repl { - -/** - * This class represents an abstract base class for replication components that try to read from - * remote oplogs. An abstract oplog fetcher is an abstract async component. It owns a Fetcher - * that fetches operations from a remote oplog and restarts from the last fetched oplog entry on - * error. - * - * The `find` command and metadata are provided by oplog fetchers that subclass the abstract oplog - * fetcher. Subclasses also provide a callback to run on successful batches. - */ -class AbstractOplogFetcher : public AbstractAsyncComponent { - AbstractOplogFetcher(const AbstractOplogFetcher&) = delete; - AbstractOplogFetcher& operator=(const AbstractOplogFetcher&) = delete; - -public: - /** - * Type of function called by the abstract oplog fetcher on shutdown with - * the final abstract oplog fetcher status. - * - * The status will be Status::OK() if we have processed the last batch of operations - * from the cursor ("bob" is null in the fetcher callback). - * - * This function will be called 0 times if startup() fails and at most once after startup() - * returns success. - */ - using OnShutdownCallbackFn = std::function<void(const Status& shutdownStatus)>; - - class OplogFetcherRestartDecision; - - /** - * Invariants if validation fails on any of the provided arguments. - */ - AbstractOplogFetcher(executor::TaskExecutor* executor, - OpTime lastFetched, - HostAndPort source, - NamespaceString nss, - std::size_t maxFetcherRestarts, - OnShutdownCallbackFn onShutdownCallbackFn, - const std::string& componentName); - - AbstractOplogFetcher(executor::TaskExecutor* executor, - OpTime lastFetched, - HostAndPort source, - NamespaceString nss, - std::unique_ptr<OplogFetcherRestartDecision> oplogFetcherRestartDecision, - OnShutdownCallbackFn onShutdownCallbackFn, - const std::string& componentName); - - virtual ~AbstractOplogFetcher() = default; - - std::string toString() const; - - // ================== Test support API =================== - - /** - * Returns the command object sent in first remote command. Since the Fetcher is not created - * until startup, this cannot be used until the Fetcher is guaranteed to exist. - */ - BSONObj getCommandObject_forTest() const; - - /** - * Returns the `find` query provided to the Fetcher. Since the Fetcher is not created until - * startup, this can be used for logging the `find` query before startup. - */ - BSONObj getFindQuery_forTest() const; - - /** - * Returns the OpTime of the last oplog entry fetched and processed. - */ - OpTime getLastOpTimeFetched_forTest() const; - - class OplogFetcherRestartDecision { - public: - OplogFetcherRestartDecision(){}; - - virtual ~OplogFetcherRestartDecision() = 0; - - virtual bool shouldContinue(AbstractOplogFetcher* fetcher, Status status) = 0; - - virtual void fetchSuccessful(AbstractOplogFetcher* fetcher) = 0; - }; - - class OplogFetcherRestartDecisionDefault : public OplogFetcherRestartDecision { - public: - OplogFetcherRestartDecisionDefault(std::size_t maxFetcherRestarts) - : _maxFetcherRestarts(maxFetcherRestarts){}; - - bool shouldContinue(AbstractOplogFetcher* fetcher, Status status) final; - - void fetchSuccessful(AbstractOplogFetcher* fetcher) final; - - ~OplogFetcherRestartDecisionDefault(){}; - - private: - AbstractOplogFetcher* _abstractOplogFetcher; - - // Fetcher restarts since the last successful oplog query response. - std::size_t _fetcherRestarts = 0; - - const std::size_t _maxFetcherRestarts; - }; - - -protected: - /** - * Returns how long the `find` command should wait before timing out. - */ - virtual Milliseconds _getInitialFindMaxTime() const; - - /** - * Returns how long the `find` command should wait before timing out, if we are retrying the - * 'find' due to an error. This timeout should be considerably smaller than our initial oplog - * find time, since a communication failure with an upstream node may indicate it is - * unreachable. - */ - virtual Milliseconds _getRetriedFindMaxTime() const; - - /** - * Returns how long the `getMore` command should wait before timing out. - */ - virtual Milliseconds _getGetMoreMaxTime() const; - - /** - * Returns the amount of time to add to the `find` and `getMore` timeouts to calculate the - * network timeout for the requests. - */ - virtual Milliseconds _getNetworkTimeoutBuffer() const; - - /** - * Returns the sync source from which this oplog fetcher is fetching. - */ - HostAndPort _getSource() const; - - /** - * Returns the namespace from which this oplog fetcher is fetching. - */ - NamespaceString _getNamespace() const; - - /** - * Returns the OpTime of the last oplog entry fetched and processed. - */ - virtual OpTime _getLastOpTimeFetched() const; - - // =============== AbstractAsyncComponent overrides ================ - - /** - * Initializes and schedules a Fetcher with a `find` command specified by the subclass. - */ - virtual Status _doStartup_inlock() noexcept override; - - /** - * Shuts down the Fetcher. - */ - virtual void _doShutdown_inlock() noexcept override; - -private: - Mutex* _getMutex() noexcept override; - - /** - * This function must be overriden by subclass oplog fetchers to specify what `find` command - * to issue to the sync source. The subclass is provided with the last OpTime fetched so that - * it can begin its Fetcher from the middle of the oplog. - */ - virtual BSONObj _makeFindCommandObject(const NamespaceString& nss, - OpTime lastOpTimeFetched, - Milliseconds findMaxTime) const = 0; - - /** - * This function must be overriden by subclass oplog fetchers to specify what metadata object - * to send with the `find` command. - */ - virtual BSONObj _makeMetadataObject() const = 0; - - /** - * Function called by the abstract oplog fetcher when it gets a successful batch from - * the sync source. - * - * On success, returns the BSONObj of the `getMore` command that should be sent back to the - * sync source. On failure returns a status that will be passed to the _finishCallback. - */ - virtual StatusWith<BSONObj> _onSuccessfulBatch(const Fetcher::QueryResponse& queryResponse) = 0; - - /** - * This function creates a Fetcher with the given `find` command and metadata. - */ - std::unique_ptr<Fetcher> _makeFetcher(const BSONObj& findCommandObj, - const BSONObj& metadataObj, - Milliseconds findTimeout); - /** - * Callback used to make a Fetcher, and then save and schedule it in a lock. - */ - void _makeAndScheduleFetcherCallback(const executor::TaskExecutor::CallbackArgs& args); - - /** - * Schedules fetcher and updates counters. - */ - Status _scheduleFetcher_inlock(); - - /** - * Processes each batch of results from the cursor started by the Fetcher on the sync source. - * - * Calls "_finishCallback" if there is an error or if there are no further results to - * request from the sync source. - */ - void _callback(const Fetcher::QueryResponseStatus& result, BSONObjBuilder* getMoreBob); - - /** - * Notifies caller that the oplog fetcher has completed processing operations from - * the remote oplog using the "_onShutdownCallbackFn". - */ - void _finishCallback(Status status); - - // Sync source to read from. - const HostAndPort _source; - - // Namespace of the oplog to read. - const NamespaceString _nss; - - std::unique_ptr<OplogFetcherRestartDecision> _oplogFetcherRestartDecision; - - // Protects member data of this AbstractOplogFetcher. - mutable Mutex _mutex = MONGO_MAKE_LATCH("AbstractOplogFetcher::_mutex"); - - // Function to call when the oplog fetcher shuts down. - OnShutdownCallbackFn _onShutdownCallbackFn; - - // Used to keep track of the last oplog entry read and processed from the sync source. - OpTime _lastFetched; - - std::unique_ptr<Fetcher> _fetcher; - std::unique_ptr<Fetcher> _shuttingDownFetcher; - - // Handle to currently scheduled _makeAndScheduleFetcherCallback task. - executor::TaskExecutor::CallbackHandle _makeAndScheduleFetcherHandle; -}; - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/abstract_oplog_fetcher_test.cpp b/src/mongo/db/repl/abstract_oplog_fetcher_test.cpp deleted file mode 100644 index 5fe87c8530c..00000000000 --- a/src/mongo/db/repl/abstract_oplog_fetcher_test.cpp +++ /dev/null @@ -1,566 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault - -#include "mongo/platform/basic.h" - -#include <memory> - -#include "mongo/db/repl/abstract_oplog_fetcher.h" -#include "mongo/db/repl/abstract_oplog_fetcher_test_fixture.h" -#include "mongo/db/repl/oplog_entry.h" -#include "mongo/db/repl/task_executor_mock.h" -#include "mongo/logv2/log.h" -#include "mongo/unittest/task_executor_proxy.h" -#include "mongo/unittest/unittest.h" -#include "mongo/util/log.h" -#include "mongo/util/scopeguard.h" - -namespace { - -using namespace mongo; -using namespace mongo::repl; - -using executor::RemoteCommandRequest; -using executor::RemoteCommandResponse; -using NetworkGuard = executor::NetworkInterfaceMock::InNetworkGuard; - -HostAndPort source("localhost:12345"); -NamespaceString nss("local.oplog.rs"); - -// For testing. Should match the value used in the AbstractOplogFetcher. -const Milliseconds kNetworkTimeoutBufferMS{5000}; - -/** - * This class is the minimal implementation of an oplog fetcher. It has the simplest `find` command - * possible, no metadata, and the _onSuccessfulBatch function simply returns a `getMore` command - * on the fetcher's cursor. - */ -class MockOplogFetcher : public AbstractOplogFetcher { -public: - explicit MockOplogFetcher(executor::TaskExecutor* executor, - OpTime lastFetched, - HostAndPort source, - NamespaceString nss, - std::size_t maxFetcherRestarts, - OnShutdownCallbackFn onShutdownCallbackFn); - - void setInitialFindMaxTime(Milliseconds findMaxTime) { - _initialFindMaxTime = findMaxTime; - } - - void setRetriedFindMaxTime(Milliseconds findMaxTime) { - _retriedFindMaxTime = findMaxTime; - } - -private: - BSONObj _makeFindCommandObject(const NamespaceString& nss, - OpTime lastOpTimeFetched, - Milliseconds findMaxTime) const override; - BSONObj _makeMetadataObject() const override; - - StatusWith<BSONObj> _onSuccessfulBatch(const Fetcher::QueryResponse& queryResponse) override; - - Milliseconds _getInitialFindMaxTime() const override; - - Milliseconds _getRetriedFindMaxTime() const override; - - Milliseconds _initialFindMaxTime{60000}; - Milliseconds _retriedFindMaxTime{2000}; -}; - -MockOplogFetcher::MockOplogFetcher(executor::TaskExecutor* executor, - OpTime lastFetched, - HostAndPort source, - NamespaceString nss, - std::size_t maxFetcherRestarts, - OnShutdownCallbackFn onShutdownCallbackFn) - : AbstractOplogFetcher(executor, - lastFetched, - source, - nss, - maxFetcherRestarts, - onShutdownCallbackFn, - "mock oplog fetcher") {} - -Milliseconds MockOplogFetcher::_getInitialFindMaxTime() const { - return _initialFindMaxTime; -} - -Milliseconds MockOplogFetcher::_getRetriedFindMaxTime() const { - return _retriedFindMaxTime; -} - -BSONObj MockOplogFetcher::_makeFindCommandObject(const NamespaceString& nss, - OpTime lastOpTimeFetched, - Milliseconds findMaxTime) const { - BSONObjBuilder cmdBob; - cmdBob.append("find", nss.coll()); - cmdBob.append("filter", BSON("ts" << BSON("$gte" << lastOpTimeFetched.getTimestamp()))); - cmdBob.append("maxTimeMS", durationCount<Milliseconds>(findMaxTime)); - return cmdBob.obj(); -} - -BSONObj MockOplogFetcher::_makeMetadataObject() const { - return BSONObj(); -} - -StatusWith<BSONObj> MockOplogFetcher::_onSuccessfulBatch( - const Fetcher::QueryResponse& queryResponse) { - BSONObjBuilder cmdBob; - cmdBob.append("getMore", queryResponse.cursorId); - cmdBob.append("collection", _getNamespace().coll()); - return cmdBob.obj(); -} - -TEST_F(AbstractOplogFetcherTest, ShuttingExecutorDownShouldPreventOplogFetcherFromStarting) { - getExecutor().shutdown(); - - MockOplogFetcher oplogFetcher(&getExecutor(), lastFetched, source, nss, 0, [](Status) {}); - - // Last optime fetched should match values passed to constructor. - ASSERT_EQUALS(lastFetched, oplogFetcher.getLastOpTimeFetched_forTest()); - - ASSERT_FALSE(oplogFetcher.isActive()); - ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, oplogFetcher.startup()); - ASSERT_FALSE(oplogFetcher.isActive()); - - // Last optime fetched should not change. - ASSERT_EQUALS(lastFetched, oplogFetcher.getLastOpTimeFetched_forTest()); -} - -TEST_F(AbstractOplogFetcherTest, StartupReturnsOperationFailedIfExecutorFailsToScheduleFetcher) { - ShutdownState shutdownState; - - TaskExecutorMock taskExecutorMock(&getExecutor()); - taskExecutorMock.shouldFailScheduleWorkRequest = []() { return true; }; - - MockOplogFetcher oplogFetcher( - &taskExecutorMock, lastFetched, source, nss, 0, std::ref(shutdownState)); - - ASSERT_EQUALS(ErrorCodes::OperationFailed, oplogFetcher.startup()); -} - -TEST_F(AbstractOplogFetcherTest, OplogFetcherReturnsOperationFailedIfExecutorFailsToScheduleFind) { - ShutdownState shutdownState; - - TaskExecutorMock taskExecutorMock(&getExecutor()); - taskExecutorMock.shouldFailScheduleRemoteCommandRequest = - [](const executor::RemoteCommandRequestOnAny&) { return true; }; - - MockOplogFetcher oplogFetcher( - &taskExecutorMock, lastFetched, source, nss, 0, std::ref(shutdownState)); - - ASSERT_FALSE(oplogFetcher.isActive()); - ASSERT_OK(oplogFetcher.startup()); - - // It is racy to check OplogFetcher::isActive() immediately after calling startup() because - // OplogFetcher schedules the remote command on a different thread from the caller of startup(). - - oplogFetcher.join(); - - ASSERT_EQUALS(ErrorCodes::OperationFailed, shutdownState.getStatus()); -} - -TEST_F(AbstractOplogFetcherTest, ShuttingExecutorDownAfterStartupStopsTheOplogFetcher) { - ShutdownState shutdownState; - - TaskExecutorMock taskExecutorMock(&getExecutor()); - taskExecutorMock.shouldDeferScheduleWorkRequestByOneSecond = []() { return true; }; - - MockOplogFetcher oplogFetcher( - &taskExecutorMock, lastFetched, source, nss, 0, std::ref(shutdownState)); - - ASSERT_FALSE(oplogFetcher.isActive()); - ASSERT_OK(oplogFetcher.startup()); - ASSERT_TRUE(oplogFetcher.isActive()); - - getExecutor().shutdown(); - - oplogFetcher.join(); - - ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus()); -} - -TEST_F(AbstractOplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownAfterStartup) { - ShutdownState shutdownState; - - TaskExecutorMock taskExecutorMock(&getExecutor()); - taskExecutorMock.shouldDeferScheduleWorkRequestByOneSecond = []() { return true; }; - - MockOplogFetcher oplogFetcher( - &taskExecutorMock, lastFetched, source, nss, 0, std::ref(shutdownState)); - - ASSERT_FALSE(oplogFetcher.isActive()); - ASSERT_OK(oplogFetcher.startup()); - ASSERT_TRUE(oplogFetcher.isActive()); - - oplogFetcher.shutdown(); - - oplogFetcher.join(); - - ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus()); -} - -Timestamp _getTimestamp(const BSONObj& oplogEntry) { - return OplogEntry(oplogEntry).getOpTime().getTimestamp(); -} - -OpTime _getOpTime(const BSONObj& oplogEntry) { - return OplogEntry(oplogEntry).getOpTime(); -} - -std::vector<BSONObj> _generateOplogEntries(std::size_t size) { - std::vector<BSONObj> ops(size); - for (std::size_t i = 0; i < size; ++i) { - ops[i] = AbstractOplogFetcherTest::makeNoopOplogEntry(Seconds(100 + int(i))); - } - return ops; -} - -void _assertFindCommandTimestampEquals(const Timestamp& timestamp, - const RemoteCommandRequest& request) { - executor::TaskExecutorTest::assertRemoteCommandNameEquals("find", request); - ASSERT_EQUALS(timestamp, request.cmdObj["filter"].Obj()["ts"].Obj()["$gte"].timestamp()); -} - -void _assertFindCommandTimestampEquals(const BSONObj& oplogEntry, - const RemoteCommandRequest& request) { - _assertFindCommandTimestampEquals(_getTimestamp(oplogEntry), request); -} - -TEST_F(AbstractOplogFetcherTest, - OplogFetcherCreatesNewFetcherOnCallbackErrorDuringGetMoreNumberOne) { - auto ops = _generateOplogEntries(5U); - std::size_t maxFetcherRestarts = 1U; - auto shutdownState = std::make_unique<ShutdownState>(); - MockOplogFetcher oplogFetcher(&getExecutor(), - _getOpTime(ops[0]), - source, - nss, - maxFetcherRestarts, - std::ref(*shutdownState)); - - ON_BLOCK_EXIT([this] { getExecutor().shutdown(); }); - - ASSERT_OK(oplogFetcher.startup()); - - // Send first batch from FIND. - _assertFindCommandTimestampEquals( - ops[0], processNetworkResponse({makeCursorResponse(1, {ops[0], ops[1], ops[2]})}, true)); - - // Send error during GETMORE. - processNetworkResponse({ErrorCodes::CursorNotFound, "cursor not found"}, true); - - // Send first batch from FIND, and Check that it started from the end of the last FIND response. - // Check that the optimes match for the query and last oplog entry. - _assertFindCommandTimestampEquals( - ops[2], processNetworkResponse({makeCursorResponse(0, {ops[2], ops[3], ops[4]})}, false)); - - // Done. - oplogFetcher.join(); - ASSERT_OK(shutdownState->getStatus()); -} - -TEST_F(AbstractOplogFetcherTest, OplogFetcherStopsRestartingFetcherIfRestartLimitIsReached) { - auto ops = _generateOplogEntries(3U); - std::size_t maxFetcherRestarts = 2U; - auto shutdownState = std::make_unique<ShutdownState>(); - MockOplogFetcher oplogFetcher(&getExecutor(), - _getOpTime(ops[0]), - source, - nss, - maxFetcherRestarts, - std::ref(*shutdownState)); - - ON_BLOCK_EXIT([this] { getExecutor().shutdown(); }); - - ASSERT_OK(oplogFetcher.startup()); - - LOGV2(21038, "processing find request from first fetcher"); - - _assertFindCommandTimestampEquals( - ops[0], processNetworkResponse({makeCursorResponse(1, {ops[0], ops[1], ops[2]})}, true)); - - LOGV2(21039, "sending error response to getMore request from first fetcher"); - assertRemoteCommandNameEquals( - "getMore", processNetworkResponse({ErrorCodes::CappedPositionLost, "fail 1"}, true)); - - LOGV2(21040, "sending error response to find request from second fetcher"); - _assertFindCommandTimestampEquals( - ops[2], processNetworkResponse({ErrorCodes::IllegalOperation, "fail 2"}, true)); - - LOGV2(21041, "sending error response to find request from third fetcher"); - _assertFindCommandTimestampEquals( - ops[2], processNetworkResponse({ErrorCodes::OperationFailed, "fail 3"}, false)); - - oplogFetcher.join(); - ASSERT_EQUALS(ErrorCodes::OperationFailed, shutdownState->getStatus()); -} - -TEST_F(AbstractOplogFetcherTest, OplogFetcherResetsRestartCounterOnSuccessfulFetcherResponse) { - auto ops = _generateOplogEntries(5U); - std::size_t maxFetcherRestarts = 2U; - auto shutdownState = std::make_unique<ShutdownState>(); - MockOplogFetcher oplogFetcher(&getExecutor(), - _getOpTime(ops[0]), - source, - nss, - maxFetcherRestarts, - std::ref(*shutdownState)); - ON_BLOCK_EXIT([this] { getExecutor().shutdown(); }); - - ASSERT_OK(oplogFetcher.startup()); - - LOGV2(21042, "processing find request from first fetcher"); - - _assertFindCommandTimestampEquals( - ops[0], processNetworkResponse({makeCursorResponse(1, {ops[0], ops[1], ops[2]})}, true)); - - LOGV2(21043, "sending error response to getMore request from first fetcher"); - assertRemoteCommandNameEquals( - "getMore", processNetworkResponse({ErrorCodes::CappedPositionLost, "fail 1"}, true)); - - LOGV2(21044, "processing find request from second fetcher"); - _assertFindCommandTimestampEquals( - ops[2], processNetworkResponse({makeCursorResponse(1, {ops[2], ops[3], ops[4]})}, true)); - - LOGV2(21045, "sending error response to getMore request from second fetcher"); - assertRemoteCommandNameEquals( - "getMore", processNetworkResponse({ErrorCodes::IllegalOperation, "fail 2"}, true)); - - LOGV2(21046, "sending error response to find request from third fetcher"); - _assertFindCommandTimestampEquals( - ops[4], processNetworkResponse({ErrorCodes::InternalError, "fail 3"}, true)); - - LOGV2(21047, "sending error response to find request from fourth fetcher"); - _assertFindCommandTimestampEquals( - ops[4], processNetworkResponse({ErrorCodes::OperationFailed, "fail 4"}, false)); - - oplogFetcher.join(); - ASSERT_EQUALS(ErrorCodes::OperationFailed, shutdownState->getStatus()); -} - -class TaskExecutorWithFailureInScheduleRemoteCommand : public unittest::TaskExecutorProxy { -public: - using ShouldFailRequestFn = std::function<bool(const executor::RemoteCommandRequest&)>; - - TaskExecutorWithFailureInScheduleRemoteCommand(executor::TaskExecutor* executor, - ShouldFailRequestFn shouldFailRequest) - : unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {} - - StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request, - const RemoteCommandCallbackFn& cb, - const BatonHandle& baton = nullptr) override { - if (_shouldFailRequest(request)) { - return Status(ErrorCodes::OperationFailed, "failed to schedule remote command"); - } - return getExecutor()->scheduleRemoteCommand(request, cb); - } - -private: - ShouldFailRequestFn _shouldFailRequest; -}; - -TEST_F(AbstractOplogFetcherTest, - OplogFetcherAbortsWithOriginalResponseErrorOnFailureToScheduleNewFetcher) { - auto ops = _generateOplogEntries(3U); - std::size_t maxFetcherRestarts = 2U; - auto shutdownState = std::make_unique<ShutdownState>(); - bool shouldFailSchedule = false; - TaskExecutorWithFailureInScheduleRemoteCommand _executorProxy( - &getExecutor(), [&shouldFailSchedule](const executor::RemoteCommandRequest& request) { - return shouldFailSchedule; - }); - MockOplogFetcher oplogFetcher(&_executorProxy, - _getOpTime(ops[0]), - source, - nss, - maxFetcherRestarts, - std::ref(*shutdownState)); - ON_BLOCK_EXIT([this] { getExecutor().shutdown(); }); - - ASSERT_OK(oplogFetcher.startup()); - ASSERT_TRUE(oplogFetcher.isActive()); - - LOGV2(21048, "processing find request from first fetcher"); - - _assertFindCommandTimestampEquals( - ops[0], processNetworkResponse({makeCursorResponse(1, {ops[0], ops[1], ops[2]})}, true)); - - LOGV2(21049, "sending error response to getMore request from first fetcher"); - shouldFailSchedule = true; - assertRemoteCommandNameEquals( - "getMore", processNetworkResponse({ErrorCodes::CappedPositionLost, "dead cursor"}, false)); - - oplogFetcher.join(); - // Status in shutdown callback should match error for dead cursor instead of error from failed - // schedule request. - ASSERT_EQUALS(ErrorCodes::CappedPositionLost, shutdownState->getStatus()); -} - -TEST_F(AbstractOplogFetcherTest, OplogFetcherTimesOutCorrectlyOnInitialFindRequests) { - auto ops = _generateOplogEntries(2U); - std::size_t maxFetcherRestarts = 0U; - auto shutdownState = std::make_unique<ShutdownState>(); - MockOplogFetcher oplogFetcher(&getExecutor(), - _getOpTime(ops[0]), - source, - nss, - maxFetcherRestarts, - std::ref(*shutdownState)); - - // Set a finite network timeout for the initial find request. - auto initialFindMaxTime = Milliseconds(10000); - oplogFetcher.setInitialFindMaxTime(initialFindMaxTime); - - ON_BLOCK_EXIT([this] { getExecutor().shutdown(); }); - - ASSERT_OK(oplogFetcher.startup()); - ASSERT_TRUE(oplogFetcher.isActive()); - - auto net = getNet(); - - // Schedule a response at a time that would exceed the initial find request network timeout. - net->enterNetwork(); - auto when = net->now() + initialFindMaxTime + kNetworkTimeoutBufferMS + Milliseconds(10); - auto noi = getNet()->getNextReadyRequest(); - RemoteCommandResponse response = {{makeCursorResponse(1, {ops[0], ops[1]})}, Milliseconds(0)}; - auto request = net->scheduleSuccessfulResponse(noi, when, response); - net->runUntil(when); - net->runReadyNetworkOperations(); - net->exitNetwork(); - - oplogFetcher.join(); - - // The fetcher should have shut down after its last request timed out. - ASSERT_TRUE(ErrorCodes::isExceededTimeLimitError(shutdownState->getStatus().code())); -} - -TEST_F(AbstractOplogFetcherTest, OplogFetcherTimesOutCorrectlyOnRetriedFindRequests) { - auto ops = _generateOplogEntries(2U); - std::size_t maxFetcherRestarts = 1U; - auto shutdownState = std::make_unique<ShutdownState>(); - MockOplogFetcher oplogFetcher(&getExecutor(), - _getOpTime(ops[0]), - source, - nss, - maxFetcherRestarts, - std::ref(*shutdownState)); - - // Set finite network timeouts for the initial and retried find requests. - auto initialFindMaxTime = Milliseconds(10000); - auto retriedFindMaxTime = Milliseconds(1000); - oplogFetcher.setInitialFindMaxTime(initialFindMaxTime); - oplogFetcher.setRetriedFindMaxTime(retriedFindMaxTime); - - ON_BLOCK_EXIT([this] { getExecutor().shutdown(); }); - - ASSERT_OK(oplogFetcher.startup()); - ASSERT_TRUE(oplogFetcher.isActive()); - - auto net = getNet(); - - // Schedule a response at a time that would exceed the initial find request network timeout. - net->enterNetwork(); - auto when = net->now() + initialFindMaxTime + kNetworkTimeoutBufferMS + Milliseconds(10); - auto noi = getNet()->getNextReadyRequest(); - RemoteCommandResponse response = {{makeCursorResponse(1, {ops[0], ops[1]})}, Milliseconds(0)}; - auto request = net->scheduleSuccessfulResponse(noi, when, response); - net->runUntil(when); - net->runReadyNetworkOperations(); - net->exitNetwork(); - - // Schedule a response at a time that would exceed the retried find request network timeout. - net->enterNetwork(); - when = net->now() + retriedFindMaxTime + kNetworkTimeoutBufferMS + Milliseconds(10); - noi = getNet()->getNextReadyRequest(); - response = {{makeCursorResponse(1, {ops[0], ops[1]})}, Milliseconds(0)}; - request = net->scheduleSuccessfulResponse(noi, when, response); - net->runUntil(when); - net->runReadyNetworkOperations(); - net->exitNetwork(); - - oplogFetcher.join(); - - // The fetcher should have shut down after its last request timed out. - ASSERT_TRUE(ErrorCodes::isExceededTimeLimitError(shutdownState->getStatus().code())); -} - -bool sharedCallbackStateDestroyed = false; -class SharedCallbackState { - SharedCallbackState(const SharedCallbackState&) = delete; - SharedCallbackState& operator=(const SharedCallbackState&) = delete; - -public: - SharedCallbackState() {} - ~SharedCallbackState() { - sharedCallbackStateDestroyed = true; - } -}; - -TEST_F(AbstractOplogFetcherTest, OplogFetcherResetsOnShutdownCallbackFunctionOnCompletion) { - auto sharedCallbackData = std::make_shared<SharedCallbackState>(); - auto callbackInvoked = false; - auto status = getDetectableErrorStatus(); - - MockOplogFetcher oplogFetcher( - &getExecutor(), - lastFetched, - source, - nss, - 0, - [&callbackInvoked, sharedCallbackData, &status](const Status& shutdownStatus) { - status = shutdownStatus, callbackInvoked = true; - }); - ON_BLOCK_EXIT([this] { getExecutor().shutdown(); }); - - ASSERT_FALSE(oplogFetcher.isActive()); - ASSERT_OK(oplogFetcher.startup()); - ASSERT_TRUE(oplogFetcher.isActive()); - - sharedCallbackData.reset(); - ASSERT_FALSE(sharedCallbackStateDestroyed); - - processNetworkResponse({ErrorCodes::OperationFailed, "oplog tailing query failed"}, false); - - oplogFetcher.join(); - - ASSERT_EQUALS(ErrorCodes::OperationFailed, status); - - // Oplog fetcher should reset 'OplogFetcher::_onShutdownCallbackFn' after running callback - // function before becoming inactive. - // This ensures that we release resources associated with - // 'OplogFetcher::_onShutdownCallbackFn'. - ASSERT_TRUE(callbackInvoked); - ASSERT_TRUE(sharedCallbackStateDestroyed); -} - -} // namespace diff --git a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp b/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp deleted file mode 100644 index 68d8e861ddd..00000000000 --- a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp +++ /dev/null @@ -1,142 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault - -#include "mongo/platform/basic.h" - -#include "mongo/db/repl/abstract_oplog_fetcher_test_fixture.h" - -#include "mongo/db/repl/oplog_entry.h" -#include "mongo/executor/thread_pool_task_executor_test_fixture.h" -#include "mongo/logv2/log.h" -#include "mongo/unittest/unittest.h" -#include "mongo/util/log.h" - -namespace mongo { -namespace repl { - -namespace { - -/** - * Creates an OplogEntry using given field values. - */ -repl::OplogEntry makeOplogEntry(repl::OpTime opTime, - repl::OpTypeEnum opType, - NamespaceString nss, - BSONObj object) { - return repl::OplogEntry(opTime, // optime - boost::none, // hash - opType, // opType - nss, // namespace - boost::none, // uuid - boost::none, // fromMigrate - repl::OplogEntry::kOplogVersion, // version - object, // o - boost::none, // o2 - {}, // sessionInfo - boost::none, // upsert - Date_t(), // wall clock time - boost::none, // statement id - boost::none, // optime of previous write within same transaction - boost::none, // pre-image optime - boost::none); // post-image optime -} - -} // namespace - -ShutdownState::ShutdownState() = default; - -Status ShutdownState::getStatus() const { - return _status; -} - -void ShutdownState::operator()(const Status& status) { - _status = status; -} - -BSONObj AbstractOplogFetcherTest::makeNoopOplogEntry(OpTime opTime) { - return makeOplogEntry(opTime, OpTypeEnum::kNoop, NamespaceString("test.t"), BSONObj()).toBSON(); -} - -BSONObj AbstractOplogFetcherTest::makeNoopOplogEntry(Seconds seconds) { - return makeNoopOplogEntry({{seconds, 0}, 1LL}); -} - -BSONObj AbstractOplogFetcherTest::makeCursorResponse(CursorId cursorId, - Fetcher::Documents oplogEntries, - bool isFirstBatch, - const NamespaceString& nss) { - BSONObjBuilder bob; - { - BSONObjBuilder cursorBob(bob.subobjStart("cursor")); - cursorBob.append("id", cursorId); - cursorBob.append("ns", nss.toString()); - { - BSONArrayBuilder batchBob( - cursorBob.subarrayStart(isFirstBatch ? "firstBatch" : "nextBatch")); - for (auto oplogEntry : oplogEntries) { - batchBob.append(oplogEntry); - } - } - } - bob.append("ok", 1); - return bob.obj(); -} - -void AbstractOplogFetcherTest::setUp() { - executor::ThreadPoolExecutorTest::setUp(); - launchExecutorThread(); - - lastFetched = {{123, 0}, 1}; - lastFetchedWall = Date_t() + Seconds(lastFetched.getSecs()); -} - -executor::RemoteCommandRequest AbstractOplogFetcherTest::processNetworkResponse( - executor::RemoteCommandResponse response, bool expectReadyRequestsAfterProcessing) { - - auto net = getNet(); - executor::NetworkInterfaceMock::InNetworkGuard guard(net); - LOGV2(21050, "scheduling response."); - auto request = net->scheduleSuccessfulResponse(response); - LOGV2(21051, "running network ops."); - net->runReadyNetworkOperations(); - LOGV2(21052, "checking for more requests"); - ASSERT_EQUALS(expectReadyRequestsAfterProcessing, net->hasReadyRequests()); - LOGV2(21053, "returning consumed request"); - return request; -} - -executor::RemoteCommandRequest AbstractOplogFetcherTest::processNetworkResponse( - BSONObj obj, bool expectReadyRequestsAfterProcessing) { - return processNetworkResponse({obj, Milliseconds(0)}, expectReadyRequestsAfterProcessing); -} - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.h b/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.h deleted file mode 100644 index 7349689bb32..00000000000 --- a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.h +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ -#pragma once - -#include "mongo/db/repl/abstract_oplog_fetcher.h" -#include "mongo/executor/thread_pool_task_executor_test_fixture.h" -#include "mongo/unittest/unittest.h" - -namespace mongo { -namespace repl { - -/** - * This class represents the state at shutdown of an abstract oplog fetcher. - */ -class ShutdownState { - ShutdownState(const ShutdownState&) = delete; - ShutdownState& operator=(const ShutdownState&) = delete; - -public: - ShutdownState(); - - /** - * Returns the status at shutdown. - */ - Status getStatus() const; - - /** - * Use this for oplog fetcher shutdown callback. - */ - void operator()(const Status& status); - -private: - Status _status = executor::TaskExecutorTest::getDetectableErrorStatus(); -}; - -/** - * This class contains many of the functions used by all oplog fetcher test suites. - */ -class AbstractOplogFetcherTest : public executor::ThreadPoolExecutorTest { -public: - /** - * Static functions for creating noop oplog entries. - */ - static BSONObj makeNoopOplogEntry(OpTime opTime); - static BSONObj makeNoopOplogEntry(Seconds seconds); - - /** - * A static function for creating the response to a cursor. If it's the last batch, the - * cursorId provided should be 0. - */ - static BSONObj makeCursorResponse( - CursorId cursorId, - Fetcher::Documents oplogEntries, - bool isFirstBatch = true, - const NamespaceString& nss = NamespaceString("local.oplog.rs")); - -protected: - void setUp() override; - - /** - * Schedules network response and instructs network interface to process response. - * Returns remote command request in network request. - */ - executor::RemoteCommandRequest processNetworkResponse( - executor::RemoteCommandResponse response, bool expectReadyRequestsAfterProcessing = false); - executor::RemoteCommandRequest processNetworkResponse( - BSONObj obj, bool expectReadyRequestsAfterProcessing = false); - - // The last OpTime fetched by the oplog fetcher. - OpTime lastFetched; - Date_t lastFetchedWall; -}; -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 9e70d60cf4a..d1c6d22120f 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -487,7 +487,7 @@ void BackgroundSync::_produce() { Status fetcherReturnStatus = Status::OK(); DataReplicatorExternalStateBackgroundSync dataReplicatorExternalState( _replCoord, _replicationCoordinatorExternalState, this); - NewOplogFetcher* oplogFetcher; + OplogFetcher* oplogFetcher; try { auto onOplogFetcherShutdownCallbackFn = [&fetcherReturnStatus](const Status& status) { fetcherReturnStatus = status; @@ -496,12 +496,12 @@ void BackgroundSync::_produce() { // replication coordinator. auto numRestarts = _replicationCoordinatorExternalState->getOplogFetcherSteadyStateMaxFetcherRestarts(); - auto oplogFetcherPtr = std::make_unique<NewOplogFetcher>( + auto oplogFetcherPtr = std::make_unique<OplogFetcher>( _replicationCoordinatorExternalState->getTaskExecutor(), lastOpTimeFetched, source, _replCoord->getConfig(), - std::make_unique<NewOplogFetcher::OplogFetcherRestartDecisionDefault>(numRestarts), + std::make_unique<OplogFetcher::OplogFetcherRestartDecisionDefault>(numRestarts), syncSourceResp.rbid, true /* requireFresherSyncSource */, &dataReplicatorExternalState, @@ -589,9 +589,9 @@ void BackgroundSync::_produce() { } } -Status BackgroundSync::_enqueueDocuments(NewOplogFetcher::Documents::const_iterator begin, - NewOplogFetcher::Documents::const_iterator end, - const NewOplogFetcher::DocumentsInfo& info) { +Status BackgroundSync::_enqueueDocuments(OplogFetcher::Documents::const_iterator begin, + OplogFetcher::Documents::const_iterator end, + const OplogFetcher::DocumentsInfo& info) { // If this is the first batch of operations returned from the query, "toApplyDocumentCount" will // be one fewer than "networkDocumentCount" because the first document (which was applied // previously) is skipped. diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index f678d91384b..2e96b1ae434 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -166,9 +166,9 @@ private: * * requiredRBID is reset to empty after the first call. */ - Status _enqueueDocuments(NewOplogFetcher::Documents::const_iterator begin, - NewOplogFetcher::Documents::const_iterator end, - const NewOplogFetcher::DocumentsInfo& info); + Status _enqueueDocuments(OplogFetcher::Documents::const_iterator begin, + OplogFetcher::Documents::const_iterator end, + const OplogFetcher::DocumentsInfo& info); /** * Executes a rollback. @@ -262,7 +262,7 @@ private: std::unique_ptr<SyncSourceResolver> _syncSourceResolver; // (M) // Current oplog fetcher tailing the oplog on the sync source. - std::unique_ptr<NewOplogFetcher> _oplogFetcher; + std::unique_ptr<OplogFetcher> _oplogFetcher; // Current rollback process. If this component is active, we are currently reverting local // operations in the local oplog in order to bring this server to a consistent state relative diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp index c3197ff7982..a2aa7ec87a3 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -206,32 +206,32 @@ InitialSyncer::InitialSyncer( _onCompletion(onCompletion), _createClientFn( [] { return std::make_unique<DBClientConnection>(true /* autoReconnect */); }), - _createOplogFetcherFn([](executor::TaskExecutor* executor, - OpTime lastFetched, - HostAndPort source, - ReplSetConfig config, - std::unique_ptr<NewOplogFetcher::OplogFetcherRestartDecision> - oplogFetcherRestartDecision, - int requiredRBID, - bool requireFresherSyncSource, - DataReplicatorExternalState* dataReplicatorExternalState, - NewOplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn, - NewOplogFetcher::OnShutdownCallbackFn onShutdownCallbackFn, - const int batchSize, - NewOplogFetcher::StartingPoint startingPoint) { - return std::make_unique<NewOplogFetcher>(executor, - lastFetched, - source, - config, - std::move(oplogFetcherRestartDecision), - requiredRBID, - requireFresherSyncSource, - dataReplicatorExternalState, - std::move(enqueueDocumentsFn), - std::move(onShutdownCallbackFn), - batchSize, - startingPoint); - }) { + _createOplogFetcherFn( + [](executor::TaskExecutor* executor, + OpTime lastFetched, + HostAndPort source, + ReplSetConfig config, + std::unique_ptr<OplogFetcher::OplogFetcherRestartDecision> oplogFetcherRestartDecision, + int requiredRBID, + bool requireFresherSyncSource, + DataReplicatorExternalState* dataReplicatorExternalState, + OplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn, + OplogFetcher::OnShutdownCallbackFn onShutdownCallbackFn, + const int batchSize, + OplogFetcher::StartingPoint startingPoint) { + return std::make_unique<OplogFetcher>(executor, + lastFetched, + source, + config, + std::move(oplogFetcherRestartDecision), + requiredRBID, + requireFresherSyncSource, + dataReplicatorExternalState, + std::move(enqueueDocumentsFn), + std::move(onShutdownCallbackFn), + batchSize, + startingPoint); + }) { uassert(ErrorCodes::BadValue, "task executor cannot be null", _exec); uassert(ErrorCodes::BadValue, "invalid storage interface", _storage); uassert(ErrorCodes::BadValue, "invalid replication process", _replicationProcess); @@ -463,7 +463,7 @@ void InitialSyncer::setCreateOplogFetcherFn_forTest( _createOplogFetcherFn = createOplogFetcherFn; } -NewOplogFetcher* InitialSyncer::getOplogFetcher_forTest() const { +OplogFetcher* InitialSyncer::getOplogFetcher_forTest() const { // Wait up to 10 seconds. for (auto i = 0; i < 100; i++) { { @@ -1123,14 +1123,14 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse> _rollbackChecker->getBaseRBID(), false /* requireFresherSyncSource */, _dataReplicatorExternalState.get(), - [=](NewOplogFetcher::Documents::const_iterator first, - NewOplogFetcher::Documents::const_iterator last, - const NewOplogFetcher::DocumentsInfo& info) { + [=](OplogFetcher::Documents::const_iterator first, + OplogFetcher::Documents::const_iterator last, + const OplogFetcher::DocumentsInfo& info) { return _enqueueDocuments(first, last, info); }, [=](const Status& s) { _oplogFetcherCallback(s, onCompletionGuard); }, initialSyncOplogFetcherBatchSize, - NewOplogFetcher::StartingPoint::kEnqueueFirstDoc); + OplogFetcher::StartingPoint::kEnqueueFirstDoc); LOGV2_DEBUG(21178, 2, @@ -1970,9 +1970,9 @@ StatusWith<HostAndPort> InitialSyncer::_chooseSyncSource_inlock() { return syncSource; } -Status InitialSyncer::_enqueueDocuments(NewOplogFetcher::Documents::const_iterator begin, - NewOplogFetcher::Documents::const_iterator end, - const NewOplogFetcher::DocumentsInfo& info) { +Status InitialSyncer::_enqueueDocuments(OplogFetcher::Documents::const_iterator begin, + OplogFetcher::Documents::const_iterator end, + const OplogFetcher::DocumentsInfo& info) { if (info.toApplyDocumentCount == 0) { return Status::OK(); } @@ -2047,8 +2047,8 @@ void InitialSyncer::InitialSyncAttemptInfo::append(BSONObjBuilder* builder) cons builder->append("totalTimeUnreachableMillis", totalTimeUnreachableMillis); } -bool InitialSyncer::OplogFetcherRestartDecisionInitialSyncer::shouldContinue( - NewOplogFetcher* fetcher, Status status) { +bool InitialSyncer::OplogFetcherRestartDecisionInitialSyncer::shouldContinue(OplogFetcher* fetcher, + Status status) { if (ErrorCodes::isRetriableError(status)) { stdx::lock_guard<InitialSyncSharedData> lk(*_sharedData); return _sharedData->shouldRetryOperation(lk, &_retryingOperation); @@ -2060,7 +2060,7 @@ bool InitialSyncer::OplogFetcherRestartDecisionInitialSyncer::shouldContinue( } void InitialSyncer::OplogFetcherRestartDecisionInitialSyncer::fetchSuccessful( - NewOplogFetcher* fetcher) { + OplogFetcher* fetcher) { _retryingOperation = boost::none; _defaultDecision.fetchSuccessful(fetcher); } diff --git a/src/mongo/db/repl/initial_syncer.h b/src/mongo/db/repl/initial_syncer.h index 43d21bdafc3..6d72031d9bb 100644 --- a/src/mongo/db/repl/initial_syncer.h +++ b/src/mongo/db/repl/initial_syncer.h @@ -155,19 +155,19 @@ public: /** * Type of function to create an OplogFetcher. */ - using CreateOplogFetcherFn = std::function<std::unique_ptr<NewOplogFetcher>( + using CreateOplogFetcherFn = std::function<std::unique_ptr<OplogFetcher>( executor::TaskExecutor* executor, OpTime lastFetched, HostAndPort source, ReplSetConfig config, - std::unique_ptr<NewOplogFetcher::OplogFetcherRestartDecision> oplogFetcherRestartDecision, + std::unique_ptr<OplogFetcher::OplogFetcherRestartDecision> oplogFetcherRestartDecision, int requiredRBID, bool requireFresherSyncSource, DataReplicatorExternalState* dataReplicatorExternalState, - NewOplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn, - NewOplogFetcher::OnShutdownCallbackFn onShutdownCallbackFn, + OplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn, + OplogFetcher::OnShutdownCallbackFn onShutdownCallbackFn, const int batchSize, - NewOplogFetcher::StartingPoint startingPoint)>; + OplogFetcher::StartingPoint startingPoint)>; struct InitialSyncAttemptInfo { int durationMillis; @@ -183,22 +183,22 @@ public: }; class OplogFetcherRestartDecisionInitialSyncer - : public NewOplogFetcher::OplogFetcherRestartDecision { + : public OplogFetcher::OplogFetcherRestartDecision { public: OplogFetcherRestartDecisionInitialSyncer(InitialSyncSharedData* sharedData, std::size_t maxFetcherRestarts) : _sharedData(sharedData), _defaultDecision(maxFetcherRestarts){}; - bool shouldContinue(NewOplogFetcher* fetcher, Status status) final; + bool shouldContinue(OplogFetcher* fetcher, Status status) final; - void fetchSuccessful(NewOplogFetcher* fetcher) final; + void fetchSuccessful(OplogFetcher* fetcher) final; private: InitialSyncSharedData* _sharedData; // We delegate to the default strategy when it's a non-network error. - NewOplogFetcher::OplogFetcherRestartDecisionDefault _defaultDecision; + OplogFetcher::OplogFetcherRestartDecisionDefault _defaultDecision; // The operation, if any, currently being retried because of a network error. InitialSyncSharedData::RetryableOperation _retryingOperation; @@ -280,7 +280,7 @@ public: * * For testing only. */ - NewOplogFetcher* getOplogFetcher_forTest() const; + OplogFetcher* getOplogFetcher_forTest() const; /** * @@ -591,9 +591,9 @@ private: * Returns a status even though it always returns OK, to conform the interface OplogFetcher * expects for the EnqueueDocumentsFn. */ - Status _enqueueDocuments(NewOplogFetcher::Documents::const_iterator begin, - NewOplogFetcher::Documents::const_iterator end, - const NewOplogFetcher::DocumentsInfo& info); + Status _enqueueDocuments(OplogFetcher::Documents::const_iterator begin, + OplogFetcher::Documents::const_iterator end, + const OplogFetcher::DocumentsInfo& info); void _appendInitialSyncProgressMinimal_inlock(BSONObjBuilder* bob) const; BSONObj _getInitialSyncProgress_inlock() const; @@ -743,7 +743,7 @@ private: InitialSyncSharedData::RetryableOperation _retryingOperation; // (M) std::unique_ptr<InitialSyncState> _initialSyncState; // (M) - std::unique_ptr<NewOplogFetcher> _oplogFetcher; // (S) + std::unique_ptr<OplogFetcher> _oplogFetcher; // (S) std::unique_ptr<Fetcher> _beginFetchingOpTimeFetcher; // (S) std::unique_ptr<Fetcher> _lastOplogEntryFetcher; // (S) std::unique_ptr<Fetcher> _fCVFetcher; // (S) diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp index 3fd72c2aef4..39ed893a7bd 100644 --- a/src/mongo/db/repl/initial_syncer_test.cpp +++ b/src/mongo/db/repl/initial_syncer_test.cpp @@ -432,16 +432,16 @@ protected: OpTime lastFetched, HostAndPort source, ReplSetConfig config, - std::unique_ptr<NewOplogFetcher::OplogFetcherRestartDecision> + std::unique_ptr<OplogFetcher::OplogFetcherRestartDecision> oplogFetcherRestartDecision, int requiredRBID, bool requireFresherSyncSource, DataReplicatorExternalState* dataReplicatorExternalState, - NewOplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn, - NewOplogFetcher::OnShutdownCallbackFn onShutdownCallbackFn, + OplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn, + OplogFetcher::OnShutdownCallbackFn onShutdownCallbackFn, const int batchSize, - NewOplogFetcher::StartingPoint startingPoint) { - return std::unique_ptr<NewOplogFetcher>( + OplogFetcher::StartingPoint startingPoint) { + return std::unique_ptr<OplogFetcher>( new OplogFetcherMock(executor, lastFetched, source, diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index 8d6ce4ac44d..94c212ce950 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -51,19 +51,15 @@ namespace mongo { namespace repl { -Seconds OplogFetcher::kDefaultProtocolZeroAwaitDataTimeout(2); - MONGO_FAIL_POINT_DEFINE(stopReplProducer); MONGO_FAIL_POINT_DEFINE(stopReplProducerOnDocument); MONGO_FAIL_POINT_DEFINE(setSmallOplogGetMoreMaxTimeMS); MONGO_FAIL_POINT_DEFINE(logAfterOplogFetcherConnCreated); MONGO_FAIL_POINT_DEFINE(hangAfterOplogFetcherCallbackScheduled); +MONGO_FAIL_POINT_DEFINE(hangBeforeStartingOplogFetcher); MONGO_FAIL_POINT_DEFINE(hangBeforeOplogFetcherRetries); MONGO_FAIL_POINT_DEFINE(hangBeforeProcessingSuccessfulBatch); -// TODO SERVER-45574: Define the failpoint in this file instead. -extern FailPoint hangBeforeStartingOplogFetcher; - namespace { // The number and time spent reading batches off the network @@ -280,6 +276,7 @@ StatusWith<boost::optional<rpc::OplogQueryMetadata>> parseOplogQueryMetadata( } } // namespace + StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments( const Fetcher::Documents& documents, bool first, @@ -333,46 +330,13 @@ StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments( auto alreadyAppliedDocument = documents.cbegin(); info.toApplyDocumentBytes -= alreadyAppliedDocument->objsize(); } - return info; -} - -OplogFetcher::OplogFetcher(executor::TaskExecutor* executor, - OpTime lastFetched, - HostAndPort source, - NamespaceString nss, - ReplSetConfig config, - std::size_t maxFetcherRestarts, - int requiredRBID, - bool requireFresherSyncSource, - DataReplicatorExternalState* dataReplicatorExternalState, - EnqueueDocumentsFn enqueueDocumentsFn, - OnShutdownCallbackFn onShutdownCallbackFn, - const int batchSize, - StartingPoint startingPoint) - : AbstractOplogFetcher(executor, - lastFetched, - source, - nss, - maxFetcherRestarts, - onShutdownCallbackFn, - "oplog fetcher"), - _metadataObject(makeMetadataObject()), - _requiredRBID(requiredRBID), - _requireFresherSyncSource(requireFresherSyncSource), - _dataReplicatorExternalState(dataReplicatorExternalState), - _enqueueDocumentsFn(enqueueDocumentsFn), - _awaitDataTimeout(calculateAwaitDataTimeout(config)), - _batchSize(batchSize), - _startingPoint(startingPoint) { - invariant(config.isInitialized()); - invariant(enqueueDocumentsFn); + return info; } OplogFetcher::OplogFetcher(executor::TaskExecutor* executor, OpTime lastFetched, HostAndPort source, - NamespaceString nss, ReplSetConfig config, std::unique_ptr<OplogFetcherRestartDecision> oplogFetcherRestartDecision, int requiredRBID, @@ -382,324 +346,6 @@ OplogFetcher::OplogFetcher(executor::TaskExecutor* executor, OnShutdownCallbackFn onShutdownCallbackFn, const int batchSize, StartingPoint startingPoint) - : AbstractOplogFetcher(executor, - lastFetched, - source, - nss, - std::move(oplogFetcherRestartDecision), - onShutdownCallbackFn, - "oplog fetcher"), - _metadataObject(makeMetadataObject()), - _requiredRBID(requiredRBID), - _requireFresherSyncSource(requireFresherSyncSource), - _dataReplicatorExternalState(dataReplicatorExternalState), - _enqueueDocumentsFn(enqueueDocumentsFn), - _awaitDataTimeout(calculateAwaitDataTimeout(config)), - _batchSize(batchSize), - _startingPoint(startingPoint) { - - invariant(config.isInitialized()); - invariant(enqueueDocumentsFn); -} - - -OplogFetcher::~OplogFetcher() { - shutdown(); - join(); -} - -BSONObj OplogFetcher::_makeFindCommandObject(const NamespaceString& nss, - OpTime lastOpTimeFetched, - Milliseconds findMaxTime) const { - auto lastCommittedWithCurrentTerm = - _dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime(); - auto term = lastCommittedWithCurrentTerm.value; - BSONObjBuilder cmdBob; - cmdBob.append("find", nss.coll()); - cmdBob.append("filter", BSON("ts" << BSON("$gte" << lastOpTimeFetched.getTimestamp()))); - cmdBob.append("tailable", true); - cmdBob.append("oplogReplay", true); - cmdBob.append("awaitData", true); - cmdBob.append("maxTimeMS", durationCount<Milliseconds>(findMaxTime)); - cmdBob.append("batchSize", _batchSize); - - if (term != OpTime::kUninitializedTerm) { - cmdBob.append("term", term); - } - - // This ensures that the sync source waits for all earlier oplog writes to be visible. - // Since Timestamp(0, 0) isn't allowed, Timestamp(0, 1) is the minimal we can use. - cmdBob.append("readConcern", BSON("afterClusterTime" << Timestamp(0, 1))); - - return cmdBob.obj(); -} - -BSONObj OplogFetcher::_makeMetadataObject() const { - return _metadataObject; -} - -BSONObj OplogFetcher::getMetadataObject_forTest() const { - return _metadataObject; -} - -Milliseconds OplogFetcher::getAwaitDataTimeout_forTest() const { - return _getGetMoreMaxTime(); -} - -Milliseconds OplogFetcher::_getGetMoreMaxTime() const { - if (MONGO_unlikely(setSmallOplogGetMoreMaxTimeMS.shouldFail())) { - return Milliseconds(50); - } - - return _awaitDataTimeout; -} - -StatusWith<BSONObj> OplogFetcher::_onSuccessfulBatch(const Fetcher::QueryResponse& queryResponse) { - - // Stop fetching and return on fail point. - // This fail point makes the oplog fetcher ignore the downloaded batch of operations and not - // error out. The FailPointEnabled error will be caught by the AbstractOplogFetcher. - if (MONGO_unlikely(stopReplProducer.shouldFail())) { - return Status(ErrorCodes::FailPointEnabled, "stopReplProducer fail point is enabled"); - } - - // Stop fetching and return when we reach a particular document. This failpoint should be used - // with the setParameter bgSyncOplogFetcherBatchSize=1, so that documents are fetched one at a - // time. - { - Status status = Status::OK(); - stopReplProducerOnDocument.executeIf( - [&](auto&&) { - status = {ErrorCodes::FailPointEnabled, - "stopReplProducerOnDocument fail point is enabled."}; - LOGV2(21264, "{status_reason}", "status_reason"_attr = status.reason()); - }, - [&](const BSONObj& data) { - auto opCtx = cc().makeOperationContext(); - boost::intrusive_ptr<ExpressionContext> expCtx( - new ExpressionContext(opCtx.get(), nullptr, _getNamespace())); - Matcher m(data["document"].Obj(), expCtx); - return !queryResponse.documents.empty() && - m.matches(queryResponse.documents.front()["o"].Obj()); - }); - if (!status.isOK()) - return status; - } - - const auto& documents = queryResponse.documents; - auto firstDocToApply = documents.cbegin(); - - if (!documents.empty()) { - LOGV2_DEBUG(21265, - 2, - "oplog fetcher read {documents_size} operations from remote oplog starting at " - "{documents_front_ts} and ending at {documents_back_ts}", - "documents_size"_attr = documents.size(), - "documents_front_ts"_attr = documents.front()["ts"], - "documents_back_ts"_attr = documents.back()["ts"]); - } else { - LOGV2_DEBUG(21266, 2, "oplog fetcher read 0 operations from remote oplog"); - } - - auto oqMetadataResult = parseOplogQueryMetadata(queryResponse); - if (!oqMetadataResult.isOK()) { - LOGV2_ERROR(21276, - "invalid oplog query metadata from sync source {getSource}: " - "{oqMetadataResult_getStatus}: {queryResponse_otherFields_metadata}", - "getSource"_attr = _getSource(), - "oqMetadataResult_getStatus"_attr = oqMetadataResult.getStatus(), - "queryResponse_otherFields_metadata"_attr = queryResponse.otherFields.metadata); - return oqMetadataResult.getStatus(); - } - auto oqMetadata = oqMetadataResult.getValue(); - - // This lastFetched value is the last OpTime from the previous batch. - auto lastFetched = _getLastOpTimeFetched(); - - // Check start of remote oplog and, if necessary, stop fetcher to execute rollback. - if (queryResponse.first) { - auto remoteRBID = oqMetadata ? boost::make_optional(oqMetadata->getRBID()) : boost::none; - auto remoteLastApplied = - oqMetadata ? boost::make_optional(oqMetadata->getLastOpApplied()) : boost::none; - auto status = checkRemoteOplogStart(documents, - lastFetched, - remoteLastApplied, - _requiredRBID, - remoteRBID, - _requireFresherSyncSource); - if (!status.isOK()) { - // Stop oplog fetcher and execute rollback if necessary. - return status; - } - - LOGV2_DEBUG(21267, - 1, - "oplog fetcher successfully fetched from {getSource}", - "getSource"_attr = _getSource()); - - // We do not always enqueue the first document. We elect to skip it for the following - // reasons: - // 1. This is the first batch and no rollback is needed. Callers specify - // StartingPoint::kSkipFirstDoc when they want this behavior. - // 2. We have already enqueued that document in a previous attempt. We can get into - // this situation if we had a batch with StartingPoint::kEnqueueFirstDoc that failed - // right after that first document was enqueued. In such a scenario, we would not - // have advanced the lastFetched opTime, so we skip past that document to avoid - // duplicating it. - - if (_startingPoint == StartingPoint::kSkipFirstDoc) { - firstDocToApply++; - } - } - - auto validateResult = OplogFetcher::validateDocuments( - documents, queryResponse.first, lastFetched.getTimestamp(), _startingPoint); - if (!validateResult.isOK()) { - return validateResult.getStatus(); - } - auto info = validateResult.getValue(); - - // Process replset metadata. It is important that this happen after we've validated the - // first batch, so we don't progress our knowledge of the commit point from a - // response that triggers a rollback. - rpc::ReplSetMetadata replSetMetadata; - bool receivedReplMetadata = - queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName); - if (receivedReplMetadata) { - const auto& metadataObj = queryResponse.otherFields.metadata; - auto metadataResult = rpc::ReplSetMetadata::readFromMetadata(metadataObj); - if (!metadataResult.isOK()) { - LOGV2_ERROR(21277, - "invalid replication metadata from sync source {getSource}: " - "{metadataResult_getStatus}: {metadataObj}", - "getSource"_attr = _getSource(), - "metadataResult_getStatus"_attr = metadataResult.getStatus(), - "metadataObj"_attr = metadataObj); - return metadataResult.getStatus(); - } - replSetMetadata = metadataResult.getValue(); - - // We will only ever have OplogQueryMetadata if we have ReplSetMetadata, so it is safe - // to call processMetadata() in this if block. - invariant(oqMetadata); - _dataReplicatorExternalState->processMetadata(replSetMetadata, *oqMetadata); - } - - // Increment stats. We read all of the docs in the query. - opsReadStats.increment(info.networkDocumentCount); - networkByteStats.increment(info.networkDocumentBytes); - - // Record time for each batch. - getmoreReplStats.recordMillis(durationCount<Milliseconds>(queryResponse.elapsedMillis)); - - auto status = _enqueueDocumentsFn(firstDocToApply, documents.cend(), info); - if (!status.isOK()) { - return status; - } - - // Start skipping the first doc after at least one doc has been enqueued in the lifetime - // of this fetcher. - _startingPoint = StartingPoint::kSkipFirstDoc; - - if (_dataReplicatorExternalState->shouldStopFetching( - _getSource(), replSetMetadata, oqMetadata)) { - str::stream errMsg; - errMsg << "sync source " << _getSource().toString(); - errMsg << " (config version: " << replSetMetadata.getConfigVersion(); - // If OplogQueryMetadata was provided, its values were used to determine if we should - // stop fetching from this sync source. - if (oqMetadata) { - errMsg << "; last applied optime: " << oqMetadata->getLastOpApplied().toString(); - errMsg << "; sync source index: " << oqMetadata->getSyncSourceIndex(); - errMsg << "; primary index: " << oqMetadata->getPrimaryIndex(); - } else { - errMsg << "; last visible optime: " << replSetMetadata.getLastOpVisible().toString(); - errMsg << "; sync source index: " << replSetMetadata.getSyncSourceIndex(); - errMsg << "; primary index: " << replSetMetadata.getPrimaryIndex(); - } - errMsg << ") is no longer valid"; - return Status(ErrorCodes::InvalidSyncSource, errMsg); - } - - auto lastCommittedWithCurrentTerm = - _dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime(); - return makeGetMoreCommandObject(queryResponse.nss, - queryResponse.cursorId, - lastCommittedWithCurrentTerm, - _getGetMoreMaxTime(), - _batchSize); -} - -StatusWith<NewOplogFetcher::DocumentsInfo> NewOplogFetcher::validateDocuments( - const Fetcher::Documents& documents, - bool first, - Timestamp lastTS, - StartingPoint startingPoint) { - if (first && documents.empty()) { - return Status(ErrorCodes::OplogStartMissing, - str::stream() << "The first batch of oplog entries is empty, but expected at " - "least 1 document matching ts: " - << lastTS.toString()); - } - - DocumentsInfo info; - // The count of the bytes of the documents read off the network. - info.networkDocumentBytes = 0; - info.networkDocumentCount = 0; - for (auto&& doc : documents) { - info.networkDocumentBytes += doc.objsize(); - ++info.networkDocumentCount; - - // If this is the first response (to the $gte query) then we already applied the first doc. - if (first && info.networkDocumentCount == 1U) { - continue; - } - - auto docOpTime = OpTime::parseFromOplogEntry(doc); - if (!docOpTime.isOK()) { - return docOpTime.getStatus(); - } - info.lastDocument = docOpTime.getValue(); - - // Check to see if the oplog entry goes back in time for this document. - const auto docTS = info.lastDocument.getTimestamp(); - if (lastTS >= docTS) { - return Status(ErrorCodes::OplogOutOfOrder, - str::stream() << "Out of order entries in oplog. lastTS: " - << lastTS.toString() << " outOfOrderTS:" << docTS.toString() - << " in batch with " << info.networkDocumentCount - << "docs; first-batch:" << first << ", doc:" << doc); - } - lastTS = docTS; - } - - // These numbers are for the documents we will apply. - info.toApplyDocumentCount = documents.size(); - info.toApplyDocumentBytes = info.networkDocumentBytes; - if (first && startingPoint == StartingPoint::kSkipFirstDoc) { - // The count is one less since the first document found was already applied ($gte $ts query) - // and we will not apply it again. - --info.toApplyDocumentCount; - auto alreadyAppliedDocument = documents.cbegin(); - info.toApplyDocumentBytes -= alreadyAppliedDocument->objsize(); - } - - return info; -} - -NewOplogFetcher::NewOplogFetcher( - executor::TaskExecutor* executor, - OpTime lastFetched, - HostAndPort source, - ReplSetConfig config, - std::unique_ptr<OplogFetcherRestartDecision> oplogFetcherRestartDecision, - int requiredRBID, - bool requireFresherSyncSource, - DataReplicatorExternalState* dataReplicatorExternalState, - EnqueueDocumentsFn enqueueDocumentsFn, - OnShutdownCallbackFn onShutdownCallbackFn, - const int batchSize, - StartingPoint startingPoint) : AbstractAsyncComponent(executor, "oplog fetcher"), _source(source), _requiredRBID(requiredRBID), @@ -721,12 +367,12 @@ NewOplogFetcher::NewOplogFetcher( invariant(enqueueDocumentsFn); } -NewOplogFetcher::~NewOplogFetcher() { +OplogFetcher::~OplogFetcher() { shutdown(); join(); } -Status NewOplogFetcher::_doStartup_inlock() noexcept { +Status OplogFetcher::_doStartup_inlock() noexcept { return _scheduleWorkAndSaveHandle_inlock( [this](const executor::TaskExecutor::CallbackArgs& args) { // Tests use this failpoint to prevent the oplog fetcher from starting. If those @@ -741,7 +387,7 @@ Status NewOplogFetcher::_doStartup_inlock() noexcept { "_runQuery"); } -void NewOplogFetcher::_doShutdown_inlock() noexcept { +void OplogFetcher::_doShutdown_inlock() noexcept { _cancelHandle_inlock(_runQueryHandle); if (_conn) { @@ -749,11 +395,11 @@ void NewOplogFetcher::_doShutdown_inlock() noexcept { } } -Mutex* NewOplogFetcher::_getMutex() noexcept { +Mutex* OplogFetcher::_getMutex() noexcept { return &_mutex; } -std::string NewOplogFetcher::toString() { +std::string OplogFetcher::toString() { stdx::lock_guard lock(_mutex); str::stream output; output << "OplogFetcher -"; @@ -769,37 +415,37 @@ std::string NewOplogFetcher::toString() { return output; } -OpTime NewOplogFetcher::getLastOpTimeFetched_forTest() const { +OpTime OplogFetcher::getLastOpTimeFetched_forTest() const { return _getLastOpTimeFetched(); } -BSONObj NewOplogFetcher::getFindQuery_forTest(long long findTimeout) const { +BSONObj OplogFetcher::getFindQuery_forTest(long long findTimeout) const { return _makeFindQuery(findTimeout); } -Milliseconds NewOplogFetcher::getAwaitDataTimeout_forTest() const { +Milliseconds OplogFetcher::getAwaitDataTimeout_forTest() const { return _awaitDataTimeout; } -void NewOplogFetcher::setCreateClientFn_forTest(const CreateClientFn& createClientFn) { +void OplogFetcher::setCreateClientFn_forTest(const CreateClientFn& createClientFn) { stdx::lock_guard lock(_mutex); _createClientFn = createClientFn; } -DBClientConnection* NewOplogFetcher::getDBClientConnection_forTest() const { +DBClientConnection* OplogFetcher::getDBClientConnection_forTest() const { stdx::lock_guard lock(_mutex); return _conn.get(); } -Milliseconds NewOplogFetcher::getInitialFindMaxTime_forTest() const { +Milliseconds OplogFetcher::getInitialFindMaxTime_forTest() const { return _getInitialFindMaxTime(); } -Milliseconds NewOplogFetcher::getRetriedFindMaxTime_forTest() const { +Milliseconds OplogFetcher::getRetriedFindMaxTime_forTest() const { return _getRetriedFindMaxTime(); } -void NewOplogFetcher::_setSocketTimeout(long long timeout) { +void OplogFetcher::_setSocketTimeout(long long timeout) { stdx::lock_guard<Latch> lock(_mutex); invariant(_conn); // setSoTimeout takes a double representing the number of seconds for send and receive @@ -808,20 +454,20 @@ void NewOplogFetcher::_setSocketTimeout(long long timeout) { _conn->setSoTimeout(timeout / 1000.0 + oplogNetworkTimeoutBufferSeconds.load()); } -OpTime NewOplogFetcher::_getLastOpTimeFetched() const { +OpTime OplogFetcher::_getLastOpTimeFetched() const { stdx::lock_guard<Latch> lock(_mutex); return _lastFetched; } -Milliseconds NewOplogFetcher::_getInitialFindMaxTime() const { +Milliseconds OplogFetcher::_getInitialFindMaxTime() const { return Milliseconds(oplogInitialFindMaxSeconds.load() * 1000); } -Milliseconds NewOplogFetcher::_getRetriedFindMaxTime() const { +Milliseconds OplogFetcher::_getRetriedFindMaxTime() const { return Milliseconds(oplogRetriedFindMaxSeconds.load() * 1000); } -void NewOplogFetcher::_finishCallback(Status status) { +void OplogFetcher::_finishCallback(Status status) { invariant(isActive()); _onShutdownCallbackFn(status); @@ -842,7 +488,7 @@ void NewOplogFetcher::_finishCallback(Status status) { std::swap(_oplogFetcherRestartDecision, oplogFetcherRestartDecision); } -void NewOplogFetcher::_runQuery(const executor::TaskExecutor::CallbackArgs& callbackData) noexcept { +void OplogFetcher::_runQuery(const executor::TaskExecutor::CallbackArgs& callbackData) noexcept { Status responseStatus = _checkForShutdownAndConvertStatus(callbackData, "error running oplog fetcher"); if (!responseStatus.isOK()) { @@ -926,7 +572,7 @@ void NewOplogFetcher::_runQuery(const executor::TaskExecutor::CallbackArgs& call } } -Status NewOplogFetcher::_connect() { +Status OplogFetcher::_connect() { Status connectStatus = Status::OK(); do { if (_isShuttingDown()) { @@ -962,7 +608,7 @@ Status NewOplogFetcher::_connect() { return connectStatus; } -void NewOplogFetcher::_setMetadataWriterAndReader() { +void OplogFetcher::_setMetadataWriterAndReader() { invariant(_conn); _conn->setRequestMetadataWriter([this](OperationContext* opCtx, BSONObjBuilder* metadataBob) { @@ -979,7 +625,7 @@ void NewOplogFetcher::_setMetadataWriterAndReader() { }); } -BSONObj NewOplogFetcher::_makeFindQuery(long long findTimeout) const { +BSONObj OplogFetcher::_makeFindQuery(long long findTimeout) const { BSONObjBuilder queryBob; auto lastOpTimeFetched = _getLastOpTimeFetched(); @@ -1004,7 +650,7 @@ BSONObj NewOplogFetcher::_makeFindQuery(long long findTimeout) const { return queryBob.obj(); } -void NewOplogFetcher::_createNewCursor(bool initialFind) { +void OplogFetcher::_createNewCursor(bool initialFind) { invariant(_conn); // Set the socket timeout to the 'find' timeout plus a network buffer. @@ -1028,7 +674,7 @@ void NewOplogFetcher::_createNewCursor(bool initialFind) { readersCreatedStats.increment(); } -StatusWith<NewOplogFetcher::Documents> NewOplogFetcher::_getNextBatch() { +StatusWith<OplogFetcher::Documents> OplogFetcher::_getNextBatch() { Documents batch; try { Timer timer; @@ -1089,7 +735,7 @@ StatusWith<NewOplogFetcher::Documents> NewOplogFetcher::_getNextBatch() { return batch; } -Status NewOplogFetcher::_onSuccessfulBatch(const Documents& documents) { +Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) { hangBeforeProcessingSuccessfulBatch.pauseWhileSet(); if (_isShuttingDown()) { @@ -1195,7 +841,7 @@ Status NewOplogFetcher::_onSuccessfulBatch(const Documents& documents) { } } - auto validateResult = NewOplogFetcher::validateDocuments( + auto validateResult = OplogFetcher::validateDocuments( documents, _firstBatch, lastFetched.getTimestamp(), _startingPoint); if (!validateResult.isOK()) { return validateResult.getStatus(); @@ -1281,8 +927,8 @@ Status NewOplogFetcher::_onSuccessfulBatch(const Documents& documents) { return Status::OK(); } -bool NewOplogFetcher::OplogFetcherRestartDecisionDefault::shouldContinue(NewOplogFetcher* fetcher, - Status status) { +bool OplogFetcher::OplogFetcherRestartDecisionDefault::shouldContinue(OplogFetcher* fetcher, + Status status) { if (_numRestarts == _maxRestarts) { LOGV2(21274, "Error returned from oplog query (no more query restarts left): {status}", @@ -1299,12 +945,11 @@ bool NewOplogFetcher::OplogFetcherRestartDecisionDefault::shouldContinue(NewOplo return true; } -void NewOplogFetcher::OplogFetcherRestartDecisionDefault::fetchSuccessful( - NewOplogFetcher* fetcher) { +void OplogFetcher::OplogFetcherRestartDecisionDefault::fetchSuccessful(OplogFetcher* fetcher) { _numRestarts = 0; }; -NewOplogFetcher::OplogFetcherRestartDecision::~OplogFetcherRestartDecision(){}; +OplogFetcher::OplogFetcherRestartDecision::~OplogFetcherRestartDecision(){}; } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h index 69af79c6c6f..c24b4309227 100644 --- a/src/mongo/db/repl/oplog_fetcher.h +++ b/src/mongo/db/repl/oplog_fetcher.h @@ -38,7 +38,7 @@ #include "mongo/client/dbclient_cursor.h" #include "mongo/client/fetcher.h" #include "mongo/db/namespace_string.h" -#include "mongo/db/repl/abstract_oplog_fetcher.h" +#include "mongo/db/repl/abstract_async_component.h" #include "mongo/db/repl/data_replicator_external_state.h" #include "mongo/db/repl/repl_set_config.h" #include "mongo/util/fail_point.h" @@ -49,165 +49,6 @@ namespace repl { extern FailPoint stopReplProducer; /** - * The oplog fetcher, once started, reads operations from a remote oplog using a tailable cursor. - * - * The initial find command is generated from last fetched optime and may contain the current term - * depending on the replica set config provided. - * - * Forwards metadata in each find/getMore response to the data replicator external state. - * - * Performs additional validation on first batch of operations returned from the query to ensure we - * are able to continue from our last known fetched operation. - * - * Validates each batch of operations. - * - * Pushes operations from each batch of operations onto a buffer using the "enqueueDocumentsFn" - * function. - * - * Issues a getMore command after successfully processing each batch of operations. - * - * When there is an error or when it is not possible to issue another getMore request, calls - * "onShutdownCallbackFn" to signal the end of processing. - * - * This class subclasses AbstractOplogFetcher which takes care of scheduling the Fetcher and - * `getMore` commands, and handles restarting on errors. - */ -class OplogFetcher : public AbstractOplogFetcher { - OplogFetcher(const OplogFetcher&) = delete; - OplogFetcher& operator=(const OplogFetcher&) = delete; - -public: - static Seconds kDefaultProtocolZeroAwaitDataTimeout; - - /** - * Statistics on current batch of operations returned by the fetcher. - */ - struct DocumentsInfo { - size_t networkDocumentCount = 0; - size_t networkDocumentBytes = 0; - size_t toApplyDocumentCount = 0; - size_t toApplyDocumentBytes = 0; - OpTime lastDocument = OpTime(); - }; - - /** - * An enum that indicates if we want to skip the first document during oplog fetching or not. - * Currently, the only time we don't want to skip the first document is during initial sync - * if the sync source has a valid oldest active transaction optime, as we need to include - * the corresponding oplog entry when applying. - */ - enum class StartingPoint { kSkipFirstDoc, kEnqueueFirstDoc }; - - /** - * Type of function that accepts a pair of iterators into a range of operations - * within the current batch of results and copies the operations into - * a buffer to be consumed by the next stage of the replication process. - * - * Additional information on the operations is provided in a DocumentsInfo - * struct. - */ - using EnqueueDocumentsFn = std::function<Status(Fetcher::Documents::const_iterator begin, - Fetcher::Documents::const_iterator end, - const DocumentsInfo& info)>; - - /** - * Validates documents in current batch of results returned from tailing the remote oplog. - * 'first' should be set to true if this set of documents is the first batch returned from the - * query. - * On success, returns statistics on operations. - */ - static StatusWith<DocumentsInfo> validateDocuments( - const Fetcher::Documents& documents, - bool first, - Timestamp lastTS, - StartingPoint startingPoint = StartingPoint::kSkipFirstDoc); - - /** - * Invariants if validation fails on any of the provided arguments. - */ - OplogFetcher(executor::TaskExecutor* executor, - OpTime lastFetched, - HostAndPort source, - NamespaceString nss, - ReplSetConfig config, - std::size_t maxFetcherRestarts, - int requiredRBID, - bool requireFresherSyncSource, - DataReplicatorExternalState* dataReplicatorExternalState, - EnqueueDocumentsFn enqueueDocumentsFn, - OnShutdownCallbackFn onShutdownCallbackFn, - const int batchSize, - StartingPoint startingPoint = StartingPoint::kSkipFirstDoc); - - OplogFetcher(executor::TaskExecutor* executor, - OpTime lastFetched, - HostAndPort source, - NamespaceString nss, - ReplSetConfig config, - std::unique_ptr<OplogFetcherRestartDecision> oplogFetcherRestartDecision, - int requiredRBID, - bool requireFresherSyncSource, - DataReplicatorExternalState* dataReplicatorExternalState, - EnqueueDocumentsFn enqueueDocumentsFn, - OnShutdownCallbackFn onShutdownCallbackFn, - const int batchSize, - StartingPoint startingPoint = StartingPoint::kSkipFirstDoc); - - virtual ~OplogFetcher(); - - // ================== Test support API =================== - - /** - * Returns metadata object sent in remote commands. - */ - BSONObj getMetadataObject_forTest() const; - - /** - * Returns timeout for remote commands to complete. - */ - Milliseconds getRemoteCommandTimeout_forTest() const; - - /** - * Returns the await data timeout used for the "maxTimeMS" field in getMore command requests. - */ - Milliseconds getAwaitDataTimeout_forTest() const; - -private: - BSONObj _makeFindCommandObject(const NamespaceString& nss, - OpTime lastOpTimeFetched, - Milliseconds findMaxTime) const override; - - BSONObj _makeMetadataObject() const override; - - Milliseconds _getGetMoreMaxTime() const override; - - /** - * This function is run by the AbstractOplogFetcher on a successful batch of oplog entries. - */ - StatusWith<BSONObj> _onSuccessfulBatch(const Fetcher::QueryResponse& queryResponse) override; - - // The metadata object sent with the Fetcher queries. - const BSONObj _metadataObject; - - // Rollback ID that the sync source is required to have after the first batch. - int _requiredRBID; - - // A boolean indicating whether we should error if the sync source is not ahead of our initial - // last fetched OpTime on the first batch. Most of the time this should be set to true, - // but there are certain special cases, namely during initial sync, where it's acceptable for - // our sync source to have no ops newer than _lastFetched. - bool _requireFresherSyncSource; - - DataReplicatorExternalState* const _dataReplicatorExternalState; - const EnqueueDocumentsFn _enqueueDocumentsFn; - const Milliseconds _awaitDataTimeout; - const int _batchSize; - - // Indicates if we want to skip the first document during oplog fetching or not. - StartingPoint _startingPoint; -}; - -/** * The oplog fetcher, once started, reads operations from a remote oplog using a tailable, * awaitData, exhaust cursor. * @@ -231,47 +72,10 @@ private: * "onShutdownCallbackFn" to signal the end of processing. * * An oplog fetcher is an abstract async component, which takes care of startup and shutdown logic. - * - * TODO SERVER-45574: edit or remove this flowchart when the NewOplogFetcher is implemented. - * - * NewOplogFetcher flowchart: - * - * _runQuery() - * | - * | - * +---------+ - * | - * | - * V - * _createNewCursor() - * | - * | - * +<--------------------------+ - * | ^ - * | | - * _getNextBatch() | - * | | | - * | | | - * (unsuccessful batch | | (successful batch) | - * or error) | | | - * | V | - * | _onSuccessfulBatch() | - * | | | - * | | | - * | | | - * V | | - * _createNewCursor() | | - * | | | - * | | | - * +---V---+ | - * | | - * | | - * +-------------------------->+ - * */ -class NewOplogFetcher : public AbstractAsyncComponent { - NewOplogFetcher(const OplogFetcher&) = delete; - NewOplogFetcher& operator=(const OplogFetcher&) = delete; +class OplogFetcher : public AbstractAsyncComponent { + OplogFetcher(const OplogFetcher&) = delete; + OplogFetcher& operator=(const OplogFetcher&) = delete; public: /** @@ -330,27 +134,25 @@ public: * Defines which situations the oplog fetcher will restart after encountering an error. * Called when getting the next batch failed for some reason. */ - virtual bool shouldContinue(NewOplogFetcher* fetcher, Status status) = 0; + virtual bool shouldContinue(OplogFetcher* fetcher, Status status) = 0; /** * Called when a batch was successfully fetched to reset any state needed to track restarts. */ - virtual void fetchSuccessful(NewOplogFetcher* fetcher) = 0; + virtual void fetchSuccessful(OplogFetcher* fetcher) = 0; }; class OplogFetcherRestartDecisionDefault : public OplogFetcherRestartDecision { public: OplogFetcherRestartDecisionDefault(std::size_t maxRestarts) : _maxRestarts(maxRestarts){}; - bool shouldContinue(NewOplogFetcher* fetcher, Status status) final; + bool shouldContinue(OplogFetcher* fetcher, Status status) final; - void fetchSuccessful(NewOplogFetcher* fetcher) final; + void fetchSuccessful(OplogFetcher* fetcher) final; ~OplogFetcherRestartDecisionDefault(){}; private: - NewOplogFetcher* _newOplogFetcher; - // Restarts since the last successful oplog query response. std::size_t _numRestarts = 0; @@ -360,20 +162,20 @@ public: /** * Invariants if validation fails on any of the provided arguments. */ - NewOplogFetcher(executor::TaskExecutor* executor, - OpTime lastFetched, - HostAndPort source, - ReplSetConfig config, - std::unique_ptr<OplogFetcherRestartDecision> oplogFetcherRestartDecision, - int requiredRBID, - bool requireFresherSyncSource, - DataReplicatorExternalState* dataReplicatorExternalState, - EnqueueDocumentsFn enqueueDocumentsFn, - OnShutdownCallbackFn onShutdownCallbackFn, - const int batchSize, - StartingPoint startingPoint = StartingPoint::kSkipFirstDoc); - - virtual ~NewOplogFetcher(); + OplogFetcher(executor::TaskExecutor* executor, + OpTime lastFetched, + HostAndPort source, + ReplSetConfig config, + std::unique_ptr<OplogFetcherRestartDecision> oplogFetcherRestartDecision, + int requiredRBID, + bool requireFresherSyncSource, + DataReplicatorExternalState* dataReplicatorExternalState, + EnqueueDocumentsFn enqueueDocumentsFn, + OnShutdownCallbackFn onShutdownCallbackFn, + const int batchSize, + StartingPoint startingPoint = StartingPoint::kSkipFirstDoc); + + virtual ~OplogFetcher(); /** * Validates documents in current batch of results returned from tailing the remote oplog. diff --git a/src/mongo/db/repl/oplog_fetcher_mock.cpp b/src/mongo/db/repl/oplog_fetcher_mock.cpp index 8b69d109661..f375b6c6eb0 100644 --- a/src/mongo/db/repl/oplog_fetcher_mock.cpp +++ b/src/mongo/db/repl/oplog_fetcher_mock.cpp @@ -50,21 +50,21 @@ OplogFetcherMock::OplogFetcherMock( OnShutdownCallbackFn onShutdownCallbackFn, const int batchSize, StartingPoint startingPoint) - : NewOplogFetcher(executor, - lastFetched, - std::move(source), - std::move(config), - // Pass a dummy OplogFetcherRestartDecision to the base OplogFetcher. - std::make_unique<OplogFetcherRestartDecisionDefault>(0), - requiredRBID, - requireFresherSyncSource, - dataReplicatorExternalState, - // Pass a dummy EnqueueDocumentsFn to the base OplogFetcher. - [](const auto& a1, const auto& a2, const auto& a3) { return Status::OK(); }, - // Pass a dummy OnShutdownCallbackFn to the base OplogFetcher. - [](const auto& a) {}, - batchSize, - startingPoint), + : OplogFetcher(executor, + lastFetched, + std::move(source), + std::move(config), + // Pass a dummy OplogFetcherRestartDecision to the base OplogFetcher. + std::make_unique<OplogFetcherRestartDecisionDefault>(0), + requiredRBID, + requireFresherSyncSource, + dataReplicatorExternalState, + // Pass a dummy EnqueueDocumentsFn to the base OplogFetcher. + [](const auto& a1, const auto& a2, const auto& a3) { return Status::OK(); }, + // Pass a dummy OnShutdownCallbackFn to the base OplogFetcher. + [](const auto& a) {}, + batchSize, + startingPoint), _oplogFetcherRestartDecision(std::move(oplogFetcherRestartDecision)), _onShutdownCallbackFn(std::move(onShutdownCallbackFn)), _enqueueDocumentsFn(std::move(enqueueDocumentsFn)), @@ -88,7 +88,7 @@ void OplogFetcherMock::receiveBatch(CursorId cursorId, Fetcher::Documents docume _oplogFetcherRestartDecision->fetchSuccessful(this); } - auto validateResult = NewOplogFetcher::validateDocuments( + auto validateResult = OplogFetcher::validateDocuments( documents, _first, _getLastOpTimeFetched().getTimestamp(), _startingPoint); // Set _first to false after receiving the first batch. diff --git a/src/mongo/db/repl/oplog_fetcher_mock.h b/src/mongo/db/repl/oplog_fetcher_mock.h index cf2350e2b41..02a367b2ea9 100644 --- a/src/mongo/db/repl/oplog_fetcher_mock.h +++ b/src/mongo/db/repl/oplog_fetcher_mock.h @@ -33,7 +33,7 @@ namespace mongo { namespace repl { -class OplogFetcherMock : public NewOplogFetcher { +class OplogFetcherMock : public OplogFetcher { public: explicit OplogFetcherMock( executor::TaskExecutor* executor, diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp index a3f9b5cb242..144de2076bd 100644 --- a/src/mongo/db/repl/oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/oplog_fetcher_test.cpp @@ -27,22 +27,19 @@ * it in the license file. */ -#include "mongo/platform/basic.h" - #include <memory> -#include "mongo/db/repl/abstract_oplog_fetcher_test_fixture.h" #include "mongo/db/repl/data_replicator_external_state_mock.h" #include "mongo/db/repl/oplog_fetcher.h" #include "mongo/db/repl/repl_server_parameters_gen.h" #include "mongo/db/repl/task_executor_mock.h" #include "mongo/db/service_context_test_fixture.h" #include "mongo/dbtests/mock/mock_dbclient_connection.h" +#include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/rpc/metadata.h" #include "mongo/rpc/metadata/oplog_query_metadata.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/unittest/death_test.h" -#include "mongo/unittest/ensure_fcv.h" #include "mongo/unittest/task_executor_proxy.h" #include "mongo/unittest/unittest.h" #include "mongo/util/fail_point.h" @@ -54,94 +51,6 @@ using namespace mongo; using namespace mongo::repl; using namespace unittest; -using executor::RemoteCommandRequest; -using executor::RemoteCommandResponse; -using NetworkGuard = executor::NetworkInterfaceMock::InNetworkGuard; - -class OplogFetcherTest : public AbstractOplogFetcherTest { -protected: - void setUp() override; - - /** - * Starts an oplog fetcher. Processes a single batch of results from - * the oplog query and shuts down. - * Returns shutdown state. - */ - - // 16MB max batch size / 12 byte min doc size * 10 (for good measure) = defaultBatchSize to use. - const int defaultBatchSize = (16 * 1024 * 1024) / 12 * 10; - - std::unique_ptr<ShutdownState> processSingleBatch(executor::RemoteCommandResponse response, - bool requireFresherSyncSource = true); - std::unique_ptr<ShutdownState> processSingleBatch(BSONObj obj, - bool requireFresherSyncSource = true); - - /** - * Makes an OplogQueryMetadata object with the given fields and a stale committed OpTime. - */ - BSONObj makeOplogQueryMetadataObject(OpTime lastAppliedOpTime, - int rbid, - int primaryIndex, - int syncSourceIndex); - - /** - * Tests checkSyncSource result handling. - */ - void testSyncSourceChecking(rpc::ReplSetMetadata* replMetadata, - rpc::OplogQueryMetadata* oqMetadata); - - /** - * Tests handling of two batches of operations returned from query. - * Returns getMore request. - */ - RemoteCommandRequest testTwoBatchHandling(); - - OpTime remoteNewerOpTime; - OpTime staleOpTime; - Date_t staleWallTime; - int rbid; - - std::unique_ptr<DataReplicatorExternalStateMock> dataReplicatorExternalState; - - Fetcher::Documents lastEnqueuedDocuments; - OplogFetcher::DocumentsInfo lastEnqueuedDocumentsInfo; - OplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn; - - std::unique_ptr<OplogFetcher> makeOplogFetcher(ReplSetConfig config); -}; - -void OplogFetcherTest::setUp() { - AbstractOplogFetcherTest::setUp(); - - remoteNewerOpTime = {{124, 1}, 2}; - staleOpTime = {{1, 1}, 0}; - staleWallTime = Date_t() + Seconds(staleOpTime.getSecs()); - rbid = 2; - - dataReplicatorExternalState = std::make_unique<DataReplicatorExternalStateMock>(); - dataReplicatorExternalState->currentTerm = lastFetched.getTerm(); - dataReplicatorExternalState->lastCommittedOpTime = {{9999, 0}, lastFetched.getTerm()}; - - enqueueDocumentsFn = [this](Fetcher::Documents::const_iterator begin, - Fetcher::Documents::const_iterator end, - const OplogFetcher::DocumentsInfo& info) -> Status { - lastEnqueuedDocuments = {begin, end}; - lastEnqueuedDocumentsInfo = info; - return Status::OK(); - }; -} - -BSONObj OplogFetcherTest::makeOplogQueryMetadataObject(OpTime lastAppliedOpTime, - int rbid, - int primaryIndex, - int syncSourceIndex) { - rpc::OplogQueryMetadata oqMetadata( - {staleOpTime, staleWallTime}, lastAppliedOpTime, rbid, primaryIndex, syncSourceIndex); - BSONObjBuilder bob; - ASSERT_OK(oqMetadata.writeToMetadata(&bob)); - return bob.obj(); -} - HostAndPort source("localhost:12345"); NamespaceString nss("local.oplog.rs"); @@ -166,869 +75,12 @@ ReplSetConfig _createConfig() { return config; } -std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch(RemoteCommandResponse response, - bool requireFresherSyncSource) { - auto shutdownState = std::make_unique<ShutdownState>(); - - OplogFetcher oplogFetcher(&getExecutor(), - lastFetched, - source, - nss, - _createConfig(), - 0, - rbid, - requireFresherSyncSource, - dataReplicatorExternalState.get(), - enqueueDocumentsFn, - std::ref(*shutdownState), - defaultBatchSize); - - ASSERT_FALSE(oplogFetcher.isActive()); - ASSERT_OK(oplogFetcher.startup()); - ASSERT_TRUE(oplogFetcher.isActive()); - - auto request = processNetworkResponse(response); - - ASSERT_BSONOBJ_EQ(oplogFetcher.getCommandObject_forTest(), request.cmdObj); - ASSERT_BSONOBJ_EQ(oplogFetcher.getMetadataObject_forTest(), request.metadata); - - oplogFetcher.shutdown(); - oplogFetcher.join(); - - return shutdownState; -} - -std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch(BSONObj obj, - bool requireFresherSyncSource) { - return processSingleBatch({obj, Milliseconds(0)}, requireFresherSyncSource); -} - -void _checkDefaultCommandObjectFields(BSONObj cmdObj) { - ASSERT_EQUALS(std::string("find"), cmdObj.firstElementFieldName()); - ASSERT_TRUE(cmdObj.getBoolField("tailable")); - ASSERT_TRUE(cmdObj.getBoolField("oplogReplay")); - ASSERT_TRUE(cmdObj.getBoolField("awaitData")); - ASSERT_EQUALS(60000, cmdObj.getIntField("maxTimeMS")); -} - -std::unique_ptr<OplogFetcher> OplogFetcherTest::makeOplogFetcher(ReplSetConfig config) { - return std::make_unique<OplogFetcher>(&getExecutor(), - lastFetched, - source, - nss, - config, - 0, - -1, - true, - dataReplicatorExternalState.get(), - enqueueDocumentsFn, - [](Status) {}, - defaultBatchSize); -} - BSONObj concatenate(BSONObj a, const BSONObj& b) { auto bob = BSONObjBuilder(std::move(a)); bob.appendElements(b); return bob.obj(); } -TEST_F( - OplogFetcherTest, - FindQueryContainsTermAndStartTimestampIfGetCurrentTermAndLastCommittedOpTimeReturnsValidTerm) { - auto cmdObj = makeOplogFetcher(_createConfig())->getFindQuery_forTest(); - ASSERT_EQUALS(mongo::BSONType::Object, cmdObj["filter"].type()); - ASSERT_BSONOBJ_EQ(BSON("ts" << BSON("$gte" << lastFetched.getTimestamp())), - cmdObj["filter"].Obj()); - ASSERT_EQUALS(dataReplicatorExternalState->currentTerm, cmdObj["term"].numberLong()); - _checkDefaultCommandObjectFields(cmdObj); -} - -TEST_F(OplogFetcherTest, - FindQueryDoesNotContainTermIfGetCurrentTermAndLastCommittedOpTimeReturnsUninitializedTerm) { - dataReplicatorExternalState->currentTerm = OpTime::kUninitializedTerm; - auto cmdObj = makeOplogFetcher(_createConfig())->getFindQuery_forTest(); - ASSERT_EQUALS(mongo::BSONType::Object, cmdObj["filter"].type()); - ASSERT_BSONOBJ_EQ(BSON("ts" << BSON("$gte" << lastFetched.getTimestamp())), - cmdObj["filter"].Obj()); - ASSERT_FALSE(cmdObj.hasField("term")); - _checkDefaultCommandObjectFields(cmdObj); -} - -TEST_F(OplogFetcherTest, MetadataObjectContainsMetadataFieldsUnderProtocolVersion1) { - auto metadataObj = makeOplogFetcher(_createConfig())->getMetadataObject_forTest(); - ASSERT_EQUALS(3, metadataObj.nFields()); - ASSERT_EQUALS(1, metadataObj[rpc::kReplSetMetadataFieldName].numberInt()); - ASSERT_EQUALS(1, metadataObj[rpc::kOplogQueryMetadataFieldName].numberInt()); -} - -TEST_F(OplogFetcherTest, AwaitDataTimeoutShouldEqualHalfElectionTimeoutUnderProtocolVersion1) { - auto config = _createConfig(); - auto timeout = makeOplogFetcher(config)->getAwaitDataTimeout_forTest(); - ASSERT_EQUALS(config.getElectionTimeoutPeriod() / 2, timeout); -} - -TEST_F(OplogFetcherTest, InvalidReplSetMetadataInResponseStopsTheOplogFetcher) { - auto shutdownState = - processSingleBatch({concatenate(makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), - BSON(rpc::kReplSetMetadataFieldName - << BSON("invalid_repl_metadata_field" << 1))), - Milliseconds(0)}); - - ASSERT_EQUALS(ErrorCodes::NoSuchKey, shutdownState->getStatus()); -} - -TEST_F(OplogFetcherTest, InvalidOplogQueryMetadataInResponseStopsTheOplogFetcher) { - auto shutdownState = - processSingleBatch({concatenate(makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), - BSON(rpc::kOplogQueryMetadataFieldName - << BSON("invalid_oq_metadata_field" << 1))), - Milliseconds(0)}); - - ASSERT_EQUALS(ErrorCodes::NoSuchKey, shutdownState->getStatus()); -} - -DEATH_TEST_F(OplogFetcherTest, - ValidMetadataInResponseWithoutOplogMetadataInvariants, - "Invariant failure oqMetadata") { - rpc::ReplSetMetadata metadata( - 1, {lastFetched, lastFetchedWall}, lastFetched, 1, OID::gen(), 2, 2); - BSONObjBuilder bob; - ASSERT_OK(metadata.writeToMetadata(&bob)); - auto metadataObj = bob.obj(); - - processSingleBatch( - {concatenate(makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), metadataObj), - Milliseconds(0)}); -} - -TEST_F(OplogFetcherTest, ValidMetadataWithInResponseShouldBeForwardedToProcessMetadataFn) { - rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1); - rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, remoteNewerOpTime, rbid, 2, 2); - BSONObjBuilder bob; - ASSERT_OK(replMetadata.writeToMetadata(&bob)); - ASSERT_OK(oqMetadata.writeToMetadata(&bob)); - auto metadataObj = bob.obj(); - ASSERT_OK( - processSingleBatch( - {concatenate(makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), metadataObj), - Milliseconds(0)}) - ->getStatus()); - ASSERT_TRUE(dataReplicatorExternalState->metadataWasProcessed); - ASSERT_EQUALS(replMetadata.getPrimaryIndex(), - dataReplicatorExternalState->replMetadataProcessed.getPrimaryIndex()); - ASSERT_EQUALS(oqMetadata.getPrimaryIndex(), - dataReplicatorExternalState->oqMetadataProcessed.getPrimaryIndex()); -} - -TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceRollsBack) { - rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1); - rpc::OplogQueryMetadata oqMetadata( - {staleOpTime, staleWallTime}, remoteNewerOpTime, rbid + 1, 2, 2); - BSONObjBuilder bob; - ASSERT_OK(replMetadata.writeToMetadata(&bob)); - ASSERT_OK(oqMetadata.writeToMetadata(&bob)); - auto metadataObj = bob.obj(); - - ASSERT_EQUALS( - ErrorCodes::InvalidSyncSource, - processSingleBatch( - {concatenate(makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), metadataObj), - Milliseconds(0)}) - ->getStatus()); - ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); - ASSERT(lastEnqueuedDocuments.empty()); -} - -TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehind) { - rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1); - rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, staleOpTime, rbid, 2, 2); - BSONObjBuilder bob; - ASSERT_OK(replMetadata.writeToMetadata(&bob)); - ASSERT_OK(oqMetadata.writeToMetadata(&bob)); - auto metadataObj = bob.obj(); - - ASSERT_EQUALS( - ErrorCodes::InvalidSyncSource, - processSingleBatch( - {concatenate(makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), metadataObj), - Milliseconds(0)}) - ->getStatus()); - ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); - ASSERT(lastEnqueuedDocuments.empty()); -} - -TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAhead) { - rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1); - rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, lastFetched, rbid, 2, 2); - BSONObjBuilder bob; - ASSERT_OK(replMetadata.writeToMetadata(&bob)); - ASSERT_OK(oqMetadata.writeToMetadata(&bob)); - auto metadataObj = bob.obj(); - - ASSERT_EQUALS( - ErrorCodes::InvalidSyncSource, - processSingleBatch( - {concatenate(makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), metadataObj), - Milliseconds(0)}) - ->getStatus()); - ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); - ASSERT(lastEnqueuedDocuments.empty()); -} - -TEST_F(OplogFetcherTest, - MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehindWithoutRequiringFresherSyncSource) { - rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1); - rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, staleOpTime, rbid, 2, 2); - BSONObjBuilder bob; - ASSERT_OK(replMetadata.writeToMetadata(&bob)); - ASSERT_OK(oqMetadata.writeToMetadata(&bob)); - auto metadataObj = bob.obj(); - - auto entry = makeNoopOplogEntry(staleOpTime); - ASSERT_EQUALS( - ErrorCodes::InvalidSyncSource, - processSingleBatch( - {concatenate(makeCursorResponse(0, {entry}), metadataObj), Milliseconds(0)}, false) - ->getStatus()); - ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); - ASSERT(lastEnqueuedDocuments.empty()); -} - -TEST_F(OplogFetcherTest, MetadataAndBatchAreProcessedWhenSyncSourceIsCurrentButMetadataIsStale) { - // This tests the case where the sync source metadata is behind us but we get a document which - // is equal to us. Since that means the metadata is stale and can be ignored, we should accept - // this sync source. - rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1); - rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, staleOpTime, rbid, 2, 2); - BSONObjBuilder bob; - ASSERT_OK(replMetadata.writeToMetadata(&bob)); - ASSERT_OK(oqMetadata.writeToMetadata(&bob)); - auto metadataObj = bob.obj(); - - auto entry = makeNoopOplogEntry(lastFetched); - auto shutdownState = processSingleBatch( - {concatenate(makeCursorResponse(0, {entry}), metadataObj), Milliseconds(0)}, false); - ASSERT_OK(shutdownState->getStatus()); - ASSERT(dataReplicatorExternalState->metadataWasProcessed); -} - -TEST_F(OplogFetcherTest, - MetadataAndBatchAreProcessedWhenSyncSourceIsNotAheadWithoutRequiringFresherSyncSource) { - rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1); - rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, lastFetched, rbid, 2, 2); - BSONObjBuilder bob; - ASSERT_OK(replMetadata.writeToMetadata(&bob)); - ASSERT_OK(oqMetadata.writeToMetadata(&bob)); - auto metadataObj = bob.obj(); - - auto entry = makeNoopOplogEntry(lastFetched); - auto shutdownState = processSingleBatch( - {concatenate(makeCursorResponse(0, {entry}), metadataObj), Milliseconds(0)}, false); - ASSERT_OK(shutdownState->getStatus()); - ASSERT(dataReplicatorExternalState->metadataWasProcessed); -} - -TEST_F(OplogFetcherTest, - MetadataWithoutOplogQueryMetadataIsNotProcessedOnBatchThatTriggersRollback) { - rpc::ReplSetMetadata metadata( - 1, {lastFetched, lastFetchedWall}, lastFetched, 1, OID::gen(), 2, 2); - BSONObjBuilder bob; - ASSERT_OK(metadata.writeToMetadata(&bob)); - auto metadataObj = bob.obj(); - ASSERT_EQUALS( - ErrorCodes::OplogStartMissing, - processSingleBatch( - {concatenate(makeCursorResponse(0, {makeNoopOplogEntry(Seconds(456))}), metadataObj), - Milliseconds(0)}) - ->getStatus()); - ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); -} - -TEST_F(OplogFetcherTest, MetadataIsNotProcessedOnBatchThatTriggersRollback) { - rpc::ReplSetMetadata replMetadata(1, {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1); - rpc::OplogQueryMetadata oqMetadata({staleOpTime, staleWallTime}, remoteNewerOpTime, rbid, 2, 2); - BSONObjBuilder bob; - ASSERT_OK(replMetadata.writeToMetadata(&bob)); - ASSERT_OK(oqMetadata.writeToMetadata(&bob)); - auto metadataObj = bob.obj(); - ASSERT_EQUALS( - ErrorCodes::OplogStartMissing, - processSingleBatch( - {concatenate(makeCursorResponse(0, {makeNoopOplogEntry(Seconds(456))}), metadataObj), - Milliseconds(0)}) - ->getStatus()); - ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); -} - -TEST_F(OplogFetcherTest, EmptyMetadataIsNotProcessed) { - ASSERT_OK(processSingleBatch( - {makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), Milliseconds(0)}) - ->getStatus()); - ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); -} - -TEST_F(OplogFetcherTest, EmptyFirstBatchStopsOplogFetcherWithOplogStartMissingError) { - ASSERT_EQUALS(ErrorCodes::OplogStartMissing, - processSingleBatch(makeCursorResponse(0, {}))->getStatus()); -} - -TEST_F(OplogFetcherTest, MissingOpTimeInFirstDocumentCausesOplogFetcherToStopWithInvalidBSONError) { - auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); - ASSERT_EQUALS(ErrorCodes::InvalidBSON, - processSingleBatch({concatenate(makeCursorResponse(0, {BSONObj()}), metadataObj), - Milliseconds(0)}) - ->getStatus()); -} - -TEST_F( - OplogFetcherTest, - LastOpTimeFetchedDoesNotMatchFirstDocumentCausesOplogFetcherToStopWithOplogStartMissingError) { - auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); - ASSERT_EQUALS( - ErrorCodes::OplogStartMissing, - processSingleBatch( - {concatenate(makeCursorResponse(0, {makeNoopOplogEntry(Seconds(456))}), metadataObj), - Milliseconds(0)}) - ->getStatus()); -} - -TEST_F(OplogFetcherTest, - MissingOpTimeInSecondDocumentOfFirstBatchCausesOplogFetcherToStopWithNoSuchKey) { - auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); - ASSERT_EQUALS( - ErrorCodes::NoSuchKey, - processSingleBatch( - {concatenate(makeCursorResponse(0, - {makeNoopOplogEntry(lastFetched), - BSON("o" << BSON("msg" - << "oplog entry without optime"))}), - metadataObj), - Milliseconds(0)}) - ->getStatus()); -} - -TEST_F(OplogFetcherTest, TimestampsNotAdvancingInBatchCausesOplogFetcherStopWithOplogOutOfOrder) { - auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); - ASSERT_EQUALS( - ErrorCodes::OplogOutOfOrder, - processSingleBatch({concatenate(makeCursorResponse(0, - {makeNoopOplogEntry(lastFetched), - makeNoopOplogEntry(Seconds(1000)), - makeNoopOplogEntry(Seconds(2000)), - makeNoopOplogEntry(Seconds(1500))}), - metadataObj), - Milliseconds(0)}) - ->getStatus()); -} - -TEST_F(OplogFetcherTest, OplogFetcherShouldExcludeFirstDocumentInFirstBatchWhenEnqueuingDocuments) { - auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); - - auto firstEntry = makeNoopOplogEntry(lastFetched); - auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); - auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.getTerm()}); - Fetcher::Documents documents{firstEntry, secondEntry, thirdEntry}; - - auto shutdownState = processSingleBatch( - {concatenate(makeCursorResponse(0, documents), metadataObj), Milliseconds(0)}); - - ASSERT_EQUALS(2U, lastEnqueuedDocuments.size()); - ASSERT_BSONOBJ_EQ(secondEntry, lastEnqueuedDocuments[0]); - ASSERT_BSONOBJ_EQ(thirdEntry, lastEnqueuedDocuments[1]); - - ASSERT_EQUALS(3U, lastEnqueuedDocumentsInfo.networkDocumentCount); - ASSERT_EQUALS(size_t(firstEntry.objsize() + secondEntry.objsize() + thirdEntry.objsize()), - lastEnqueuedDocumentsInfo.networkDocumentBytes); - - ASSERT_EQUALS(2U, lastEnqueuedDocumentsInfo.toApplyDocumentCount); - ASSERT_EQUALS(size_t(secondEntry.objsize() + thirdEntry.objsize()), - lastEnqueuedDocumentsInfo.toApplyDocumentBytes); - - ASSERT_EQUALS(unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry)), - lastEnqueuedDocumentsInfo.lastDocument); - - // The last fetched optime should be updated after pushing the operations into the - // buffer and reflected in the shutdown callback arguments. - ASSERT_OK(shutdownState->getStatus()); -} - -TEST_F(OplogFetcherTest, - OplogFetcherShouldNotDuplicateFirstDocWithEnqueueFirstDocOnErrorAfterFirstDoc) { - - // This function verifies that every oplog entry is only enqueued once. - OpTime lastEnqueuedOpTime = OpTime(); - enqueueDocumentsFn = [&lastEnqueuedOpTime](Fetcher::Documents::const_iterator begin, - Fetcher::Documents::const_iterator end, - const OplogFetcher::DocumentsInfo&) -> Status { - auto count = 0; - auto toEnqueueOpTime = OpTime(); - - for (auto i = begin; i != end; ++i) { - count++; - - toEnqueueOpTime = OplogEntry(*i).getOpTime(); - ASSERT_GREATER_THAN(toEnqueueOpTime, lastEnqueuedOpTime); - lastEnqueuedOpTime = toEnqueueOpTime; - } - - ASSERT_EQ(1, count); - return Status::OK(); - }; - - auto shutdownState = std::make_unique<ShutdownState>(); - OplogFetcher oplogFetcher(&getExecutor(), - lastFetched, - source, - nss, - _createConfig(), - 1 /* maxFetcherRestarts */, - rbid, - false /* requireFresherSyncSource */, - dataReplicatorExternalState.get(), - enqueueDocumentsFn, - std::ref(*shutdownState), - defaultBatchSize, - OplogFetcher::StartingPoint::kEnqueueFirstDoc); - - ASSERT_FALSE(oplogFetcher.isActive()); - ASSERT_OK(oplogFetcher.startup()); - ASSERT_TRUE(oplogFetcher.isActive()); - - auto firstEntry = makeNoopOplogEntry({{Seconds(123), 0}, lastFetched.getTerm()}); - auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); - auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); - - // Only send over the first entry. Save the second for the getMore request. - processNetworkResponse( - {concatenate(makeCursorResponse(22L, {firstEntry}), metadataObj), Milliseconds(0)}, true); - - // Simulate an error right before receiving the second entry. - auto request = processNetworkResponse(RemoteCommandResponse(ErrorCodes::QueryPlanKilled, - "Simulating failure for test.", - Milliseconds(0)), - true); - ASSERT_EQUALS(std::string("getMore"), request.cmdObj.firstElementFieldName()); - - // Resend all data for the retry. The enqueueDocumentsFn will check to make sure that - // the first entry was not enqueued twice. - request = processNetworkResponse( - {concatenate(makeCursorResponse(0, {firstEntry, secondEntry}), metadataObj), - Milliseconds(0)}, - false); - - ASSERT_EQUALS(std::string("find"), request.cmdObj.firstElementFieldName()); - ASSERT_EQUALS("oplog.rs", request.cmdObj["find"].String()); - - ASSERT(request.cmdObj["filter"].ok()); - ASSERT(request.cmdObj["filter"]["ts"].ok()); - ASSERT(request.cmdObj["filter"]["ts"]["$gte"].ok()); - ASSERT_EQUALS(firstEntry["ts"].timestamp(), request.cmdObj["filter"]["ts"]["$gte"].timestamp()); - - oplogFetcher.join(); - ASSERT_OK(shutdownState->getStatus()); -} - -TEST_F(OplogFetcherTest, - OplogFetcherShouldNotDuplicateFirstDocWithEnqueueFirstDocOnErrorAfterSecondDoc) { - - // This function verifies that every oplog entry is only enqueued once. - OpTime lastEnqueuedOpTime = OpTime(); - enqueueDocumentsFn = [&lastEnqueuedOpTime](Fetcher::Documents::const_iterator begin, - Fetcher::Documents::const_iterator end, - const OplogFetcher::DocumentsInfo&) -> Status { - auto count = 0; - auto toEnqueueOpTime = OpTime(); - - for (auto i = begin; i != end; ++i) { - count++; - - toEnqueueOpTime = OplogEntry(*i).getOpTime(); - ASSERT_GREATER_THAN(toEnqueueOpTime, lastEnqueuedOpTime); - lastEnqueuedOpTime = toEnqueueOpTime; - } - - ASSERT_NOT_GREATER_THAN(count, 2); - return Status::OK(); - }; - - auto shutdownState = std::make_unique<ShutdownState>(); - OplogFetcher oplogFetcher(&getExecutor(), - lastFetched, - source, - nss, - _createConfig(), - 1 /* maxFetcherRestarts */, - rbid, - false /* requireFresherSyncSource */, - dataReplicatorExternalState.get(), - enqueueDocumentsFn, - std::ref(*shutdownState), - defaultBatchSize, - OplogFetcher::StartingPoint::kEnqueueFirstDoc); - - ASSERT_FALSE(oplogFetcher.isActive()); - ASSERT_OK(oplogFetcher.startup()); - ASSERT_TRUE(oplogFetcher.isActive()); - - auto firstEntry = makeNoopOplogEntry({{Seconds(123), 0}, lastFetched.getTerm()}); - auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); - auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.getTerm()}); - auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); - - // Only send over the first two entries. Save the third for the getMore request. - processNetworkResponse( - {concatenate(makeCursorResponse(22L, {firstEntry, secondEntry}), metadataObj), - Milliseconds(0)}, - true); - - // Simulate an error right before receiving the third entry. - auto request = processNetworkResponse(RemoteCommandResponse(ErrorCodes::QueryPlanKilled, - "Simulating failure for test.", - Milliseconds(0)), - true); - ASSERT_EQUALS(std::string("getMore"), request.cmdObj.firstElementFieldName()); - - // Resend all data for the retry. The enqueueDocumentsFn will check to make sure that - // the first entry was not enqueued twice. - request = processNetworkResponse( - {concatenate(makeCursorResponse(0, {secondEntry, thirdEntry}), metadataObj), - Milliseconds(0)}, - false); - - ASSERT_EQUALS(std::string("find"), request.cmdObj.firstElementFieldName()); - ASSERT_EQUALS("oplog.rs", request.cmdObj["find"].String()); - - ASSERT(request.cmdObj["filter"].ok()); - ASSERT(request.cmdObj["filter"]["ts"].ok()); - ASSERT(request.cmdObj["filter"]["ts"]["$gte"].ok()); - ASSERT_EQUALS(secondEntry["ts"].timestamp(), - request.cmdObj["filter"]["ts"]["$gte"].timestamp()); - - oplogFetcher.join(); - ASSERT_OK(shutdownState->getStatus()); -} - -TEST_F(OplogFetcherTest, OplogFetcherShouldReportErrorsThrownFromCallback) { - auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); - - auto firstEntry = makeNoopOplogEntry(lastFetched); - auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); - auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.getTerm()}); - Fetcher::Documents documents{firstEntry, secondEntry, thirdEntry}; - - enqueueDocumentsFn = [](Fetcher::Documents::const_iterator, - Fetcher::Documents::const_iterator, - const OplogFetcher::DocumentsInfo&) -> Status { - return Status(ErrorCodes::InternalError, "my custom error"); - }; - - auto shutdownState = processSingleBatch( - {concatenate(makeCursorResponse(0, documents), metadataObj), Milliseconds(0)}); - ASSERT_EQ(shutdownState->getStatus(), Status(ErrorCodes::InternalError, "my custom error")); -} - -void OplogFetcherTest::testSyncSourceChecking(rpc::ReplSetMetadata* replMetadata, - rpc::OplogQueryMetadata* oqMetadata) { - auto firstEntry = makeNoopOplogEntry(lastFetched); - auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); - auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.getTerm()}); - Fetcher::Documents documents{firstEntry, secondEntry, thirdEntry}; - - BSONObjBuilder bob; - if (replMetadata) { - ASSERT_OK(replMetadata->writeToMetadata(&bob)); - } - if (oqMetadata) { - ASSERT_OK(oqMetadata->writeToMetadata(&bob)); - } - BSONObj metadataObj = bob.obj(); - - dataReplicatorExternalState->shouldStopFetchingResult = true; - - auto shutdownState = processSingleBatch( - {concatenate(makeCursorResponse(0, documents), metadataObj), Milliseconds(0)}); - - // Sync source checking happens after we have successfully pushed the operations into - // the buffer for the next replication phase (eg. applier). - // The last fetched optime should be reflected in the shutdown callback arguments. - ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, shutdownState->getStatus()); -} - -TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithoutMetadataStopsTheOplogFetcher) { - testSyncSourceChecking(nullptr, nullptr); - - // Sync source optime and "hasSyncSource" are not available if the response does not - // contain metadata. - ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked); - ASSERT_EQUALS(OpTime(), dataReplicatorExternalState->syncSourceLastOpTime); - ASSERT_FALSE(dataReplicatorExternalState->syncSourceHasSyncSource); -} - -TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithBothMetadatasStopsTheOplogFetcher) { - rpc::ReplSetMetadata replMetadata( - lastFetched.getTerm(), {OpTime(), Date_t()}, OpTime(), 1, OID::gen(), -1, -1); - OpTime committedOpTime = {{Seconds(10000), 0}, 1}; - rpc::OplogQueryMetadata oqMetadata( - {committedOpTime, Date_t() + Seconds(committedOpTime.getSecs())}, - {{Seconds(20000), 0}, 1}, - rbid, - 2, - 2); - - testSyncSourceChecking(&replMetadata, &oqMetadata); - - // Sync source optime and "hasSyncSource" can be set if the respone contains metadata. - ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked); - ASSERT_EQUALS(oqMetadata.getLastOpApplied(), dataReplicatorExternalState->syncSourceLastOpTime); - ASSERT_TRUE(dataReplicatorExternalState->syncSourceHasSyncSource); -} - -TEST_F(OplogFetcherTest, - FailedSyncSourceCheckWithSyncSourceHavingNoSyncSourceStopsTheOplogFetcher) { - OpTime committedOpTime = {{Seconds(10000), 0}, 1}; - rpc::ReplSetMetadata replMetadata( - lastFetched.getTerm(), - {committedOpTime, Date_t() + Seconds(committedOpTime.getSecs())}, - {{Seconds(20000), 0}, 1}, - 1, - OID::gen(), - 2, - 2); - rpc::OplogQueryMetadata oqMetadata( - {committedOpTime, Date_t() + Seconds(committedOpTime.getSecs())}, - {{Seconds(20000), 0}, 1}, - rbid, - 2, - -1); - - testSyncSourceChecking(&replMetadata, &oqMetadata); - - // Sync source "hasSyncSource" is derived from metadata. - ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked); - ASSERT_EQUALS(oqMetadata.getLastOpApplied(), dataReplicatorExternalState->syncSourceLastOpTime); - ASSERT_FALSE(dataReplicatorExternalState->syncSourceHasSyncSource); -} - -RemoteCommandRequest OplogFetcherTest::testTwoBatchHandling() { - ShutdownState shutdownState; - - OplogFetcher oplogFetcher(&getExecutor(), - lastFetched, - source, - nss, - _createConfig(), - 0, - rbid, - true, - dataReplicatorExternalState.get(), - enqueueDocumentsFn, - std::ref(shutdownState), - defaultBatchSize); - ASSERT_EQUALS(OplogFetcher::State::kPreStart, oplogFetcher.getState_forTest()); - - ASSERT_OK(oplogFetcher.startup()); - ASSERT_EQUALS(OplogFetcher::State::kRunning, oplogFetcher.getState_forTest()); - - CursorId cursorId = 22LL; - auto firstEntry = makeNoopOplogEntry(lastFetched); - auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); - - auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); - processNetworkResponse( - {concatenate(makeCursorResponse(cursorId, {firstEntry, secondEntry}), metadataObj), - Milliseconds(0)}, - true); - - ASSERT_EQUALS(1U, lastEnqueuedDocuments.size()); - ASSERT_BSONOBJ_EQ(secondEntry, lastEnqueuedDocuments[0]); - - // Set cursor ID to 0 in getMore response to indicate no more data available. - auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.getTerm()}); - auto fourthEntry = makeNoopOplogEntry({{Seconds(1200), 0}, lastFetched.getTerm()}); - auto request = processNetworkResponse(makeCursorResponse(0, {thirdEntry, fourthEntry}, false)); - - ASSERT_EQUALS(std::string("getMore"), request.cmdObj.firstElementFieldName()); - ASSERT_EQUALS(nss.coll(), request.cmdObj["collection"].String()); - ASSERT_EQUALS(int(durationCount<Milliseconds>(oplogFetcher.getAwaitDataTimeout_forTest())), - request.cmdObj.getIntField("maxTimeMS")); - - ASSERT_EQUALS(2U, lastEnqueuedDocuments.size()); - ASSERT_BSONOBJ_EQ(thirdEntry, lastEnqueuedDocuments[0]); - ASSERT_BSONOBJ_EQ(fourthEntry, lastEnqueuedDocuments[1]); - - oplogFetcher.join(); - ASSERT_EQUALS(OplogFetcher::State::kComplete, oplogFetcher.getState_forTest()); - - ASSERT_OK(shutdownState.getStatus()); - - return request; -} - -TEST_F( - OplogFetcherTest, - NoDataAvailableAfterFirstTwoBatchesShouldCauseTheOplogFetcherToShutDownWithSuccessfulStatus) { - auto request = testTwoBatchHandling(); - ASSERT_EQUALS(dataReplicatorExternalState->currentTerm, request.cmdObj["term"].numberLong()); - ASSERT_EQUALS(dataReplicatorExternalState->lastCommittedOpTime, - unittest::assertGet(OpTime::parseFromOplogEntry( - request.cmdObj["lastKnownCommittedOpTime"].Obj()))); -} - -TEST_F(OplogFetcherTest, ValidateDocumentsReturnsNoSuchKeyIfTimestampIsNotFoundInAnyDocument) { - auto firstEntry = makeNoopOplogEntry(Seconds(123)); - auto secondEntry = BSON("o" << BSON("msg" - << "oplog entry without optime")); - - ASSERT_EQUALS(ErrorCodes::NoSuchKey, - OplogFetcher::validateDocuments( - {firstEntry, secondEntry}, - true, - unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp()) - .getStatus()); -} - -TEST_F( - OplogFetcherTest, - ValidateDocumentsReturnsOutOfOrderIfTimestampInFirstEntryIsEqualToLastTimestampAndNotProcessingFirstBatch) { - auto firstEntry = makeNoopOplogEntry(Seconds(123)); - auto secondEntry = makeNoopOplogEntry(Seconds(456)); - - ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder, - OplogFetcher::validateDocuments( - {firstEntry, secondEntry}, - false, - unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp()) - .getStatus()); -} - -TEST_F(OplogFetcherTest, ValidateDocumentsReturnsOutOfOrderIfTimestampInSecondEntryIsBeforeFirst) { - auto firstEntry = makeNoopOplogEntry(Seconds(456)); - auto secondEntry = makeNoopOplogEntry(Seconds(123)); - - ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder, - OplogFetcher::validateDocuments( - {firstEntry, secondEntry}, - true, - unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp()) - .getStatus()); -} - -TEST_F(OplogFetcherTest, ValidateDocumentsReturnsOutOfOrderIfTimestampInThirdEntryIsBeforeSecond) { - auto firstEntry = makeNoopOplogEntry(Seconds(123)); - auto secondEntry = makeNoopOplogEntry(Seconds(789)); - auto thirdEntry = makeNoopOplogEntry(Seconds(456)); - - ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder, - OplogFetcher::validateDocuments( - {firstEntry, secondEntry, thirdEntry}, - true, - unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp()) - .getStatus()); -} - -TEST_F( - OplogFetcherTest, - ValidateDocumentsExcludesFirstDocumentInApplyCountAndBytesIfProcessingFirstBatchAndSkipFirstDoc) { - auto firstEntry = makeNoopOplogEntry(Seconds(123)); - auto secondEntry = makeNoopOplogEntry(Seconds(456)); - auto thirdEntry = makeNoopOplogEntry(Seconds(789)); - - auto info = unittest::assertGet(OplogFetcher::validateDocuments( - {firstEntry, secondEntry, thirdEntry}, - true, - unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp(), - mongo::repl::OplogFetcher::StartingPoint::kSkipFirstDoc)); - - ASSERT_EQUALS(3U, info.networkDocumentCount); - ASSERT_EQUALS(2U, info.toApplyDocumentCount); - ASSERT_EQUALS(size_t(firstEntry.objsize() + secondEntry.objsize() + thirdEntry.objsize()), - info.networkDocumentBytes); - ASSERT_EQUALS(size_t(secondEntry.objsize() + thirdEntry.objsize()), info.toApplyDocumentBytes); - - ASSERT_EQUALS(unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry)), info.lastDocument); -} - -TEST_F( - OplogFetcherTest, - ValidateDocumentsIncludesFirstDocumentInApplyCountAndBytesIfProcessingFirstBatchAndEnqueueFirstDoc) { - auto firstEntry = makeNoopOplogEntry(Seconds(123)); - auto secondEntry = makeNoopOplogEntry(Seconds(456)); - auto thirdEntry = makeNoopOplogEntry(Seconds(789)); - - auto info = unittest::assertGet(OplogFetcher::validateDocuments( - {firstEntry, secondEntry, thirdEntry}, - true, - unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp(), - mongo::repl::OplogFetcher::StartingPoint::kEnqueueFirstDoc)); - - ASSERT_EQUALS(3U, info.networkDocumentCount); - ASSERT_EQUALS(3U, info.toApplyDocumentCount); - ASSERT_EQUALS(size_t(firstEntry.objsize() + secondEntry.objsize() + thirdEntry.objsize()), - info.networkDocumentBytes); - ASSERT_EQUALS(size_t(firstEntry.objsize() + secondEntry.objsize() + thirdEntry.objsize()), - info.toApplyDocumentBytes); - - ASSERT_EQUALS(unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry)), info.lastDocument); -} - -TEST_F(OplogFetcherTest, - ValidateDocumentsIncludesFirstDocumentInApplyCountAndBytesIfNotProcessingFirstBatch) { - auto firstEntry = makeNoopOplogEntry(Seconds(123)); - auto secondEntry = makeNoopOplogEntry(Seconds(456)); - auto thirdEntry = makeNoopOplogEntry(Seconds(789)); - - auto info = unittest::assertGet(OplogFetcher::validateDocuments( - {firstEntry, secondEntry, thirdEntry}, false, Timestamp(Seconds(100), 0))); - - ASSERT_EQUALS(3U, info.networkDocumentCount); - ASSERT_EQUALS(size_t(firstEntry.objsize() + secondEntry.objsize() + thirdEntry.objsize()), - info.networkDocumentBytes); - - ASSERT_EQUALS(info.networkDocumentCount, info.toApplyDocumentCount); - ASSERT_EQUALS(info.networkDocumentBytes, info.toApplyDocumentBytes); - - ASSERT_EQUALS(unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry)), info.lastDocument); -} - -TEST_F(OplogFetcherTest, - ValidateDocumentsReturnsDefaultLastDocumentOpTimeWhenThereAreNoDocumentsToApply) { - auto firstEntry = makeNoopOplogEntry(Seconds(123)); - - auto info = unittest::assertGet(OplogFetcher::validateDocuments( - {firstEntry}, - true, - unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp())); - - ASSERT_EQUALS(1U, info.networkDocumentCount); - ASSERT_EQUALS(size_t(firstEntry.objsize()), info.networkDocumentBytes); - - ASSERT_EQUALS(0U, info.toApplyDocumentCount); - ASSERT_EQUALS(0U, info.toApplyDocumentBytes); - - ASSERT_EQUALS(OpTime(), info.lastDocument); -} - -TEST_F(OplogFetcherTest, - ValidateDocumentsReturnsOplogStartMissingWhenThereAreNoDocumentsWhenProcessingFirstBatch) { - ASSERT_EQUALS( - ErrorCodes::OplogStartMissing, - OplogFetcher::validateDocuments({}, true, Timestamp(Seconds(100), 0)).getStatus()); -} - -TEST_F(OplogFetcherTest, - ValidateDocumentsReturnsDefaultInfoWhenThereAreNoDocumentsWhenNotProcessingFirstBatch) { - auto info = - unittest::assertGet(OplogFetcher::validateDocuments({}, false, Timestamp(Seconds(100), 0))); - - ASSERT_EQUALS(0U, info.networkDocumentCount); - ASSERT_EQUALS(0U, info.networkDocumentBytes); - - ASSERT_EQUALS(0U, info.toApplyDocumentCount); - ASSERT_EQUALS(0U, info.toApplyDocumentBytes); - - ASSERT_EQUALS(OpTime(), info.lastDocument); -} - BSONObj makeNoopOplogEntry(OpTime opTime) { auto oplogEntry = repl::OplogEntry(opTime, // optime @@ -1067,14 +119,14 @@ BSONObj makeOplogBatchMetadata(boost::optional<const rpc::ReplSetMetadata&> repl } Message makeFirstBatch(CursorId cursorId, - const NewOplogFetcher::Documents& oplogEntries, + const OplogFetcher::Documents& oplogEntries, const BSONObj& metadata) { return MockDBClientConnection::mockFindResponse( NamespaceString::kRsOplogNamespace, cursorId, oplogEntries, metadata); } Message makeSubsequentBatch(CursorId cursorId, - const NewOplogFetcher::Documents& oplogEntries, + const OplogFetcher::Documents& oplogEntries, const BSONObj& metadata, bool moreToCome) { return MockDBClientConnection::mockGetMoreResponse( @@ -1182,8 +234,39 @@ void simulateNetworkDisconnect(DBClientConnection* conn) { mockConn->shutdown(); } -class NewOplogFetcherTest : public executor::ThreadPoolExecutorTest, - public ScopedGlobalServiceContextForTest { +class ShutdownState { + ShutdownState(const ShutdownState&) = delete; + ShutdownState& operator=(const ShutdownState&) = delete; + +public: + ShutdownState(); + + /** + * Returns the status at shutdown. + */ + Status getStatus() const; + + /** + * Use this for oplog fetcher shutdown callback. + */ + void operator()(const Status& status); + +private: + Status _status = executor::TaskExecutorTest::getDetectableErrorStatus(); +}; + +ShutdownState::ShutdownState() = default; + +Status ShutdownState::getStatus() const { + return _status; +} + +void ShutdownState::operator()(const Status& status) { + _status = status; +} + +class OplogFetcherTest : public executor::ThreadPoolExecutorTest, + public ScopedGlobalServiceContextForTest { protected: static const OpTime remoteNewerOpTime; static const OpTime staleOpTime; @@ -1200,20 +283,18 @@ protected: void setUp() override; - std::unique_ptr<NewOplogFetcher> makeOplogFetcher(); - std::unique_ptr<NewOplogFetcher> makeOplogFetcherWithDifferentExecutor( + std::unique_ptr<OplogFetcher> makeOplogFetcher(); + std::unique_ptr<OplogFetcher> makeOplogFetcherWithDifferentExecutor( executor::TaskExecutor* executor, - NewOplogFetcher::OnShutdownCallbackFn fn, + OplogFetcher::OnShutdownCallbackFn fn, int numRestarts = 0, bool requireFresherSyncSource = true, - NewOplogFetcher::StartingPoint startingPoint = - NewOplogFetcher::StartingPoint::kSkipFirstDoc); - std::unique_ptr<NewOplogFetcher> getOplogFetcherAfterConnectionCreated( - NewOplogFetcher::OnShutdownCallbackFn fn, + OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc); + std::unique_ptr<OplogFetcher> getOplogFetcherAfterConnectionCreated( + OplogFetcher::OnShutdownCallbackFn fn, int numRestarts = 0, bool requireFresherSyncSource = true, - NewOplogFetcher::StartingPoint startingPoint = - NewOplogFetcher::StartingPoint::kSkipFirstDoc); + OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc); std::unique_ptr<ShutdownState> processSingleBatch(const Message& response, bool shouldShutdown = false, @@ -1226,13 +307,13 @@ protected: void testSyncSourceChecking(boost::optional<const rpc::ReplSetMetadata&> replMetadata, boost::optional<const rpc::OplogQueryMetadata&> oqMetadata); - void validateLastBatch(bool skipFirstDoc, NewOplogFetcher::Documents docs, OpTime lastFetched); + void validateLastBatch(bool skipFirstDoc, OplogFetcher::Documents docs, OpTime lastFetched); std::unique_ptr<DataReplicatorExternalStateMock> dataReplicatorExternalState; - NewOplogFetcher::Documents lastEnqueuedDocuments; - NewOplogFetcher::DocumentsInfo lastEnqueuedDocumentsInfo; - NewOplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn; + OplogFetcher::Documents lastEnqueuedDocuments; + OplogFetcher::DocumentsInfo lastEnqueuedDocumentsInfo; + OplogFetcher::EnqueueDocumentsFn enqueueDocumentsFn; // The last OpTime fetched by the oplog fetcher. OpTime lastFetched; @@ -1240,20 +321,20 @@ protected: std::unique_ptr<MockRemoteDBServer> _mockServer; }; -const int NewOplogFetcherTest::rbid; -const OpTime NewOplogFetcherTest::remoteNewerOpTime = OpTime(Timestamp(1000, 1), 2); -const rpc::OplogQueryMetadata NewOplogFetcherTest::oqMetadata = rpc::OplogQueryMetadata( +const int OplogFetcherTest::rbid; +const OpTime OplogFetcherTest::remoteNewerOpTime = OpTime(Timestamp(1000, 1), 2); +const rpc::OplogQueryMetadata OplogFetcherTest::oqMetadata = rpc::OplogQueryMetadata( {staleOpTime, staleWallTime}, remoteNewerOpTime, rbid, primaryIndex, syncSourceIndex); -const OpTime NewOplogFetcherTest::staleOpTime = OpTime(Timestamp(1, 1), 0); -const Date_t NewOplogFetcherTest::staleWallTime = Date_t() + Seconds(staleOpTime.getSecs()); -const rpc::OplogQueryMetadata NewOplogFetcherTest::staleOqMetadata = rpc::OplogQueryMetadata( +const OpTime OplogFetcherTest::staleOpTime = OpTime(Timestamp(1, 1), 0); +const Date_t OplogFetcherTest::staleWallTime = Date_t() + Seconds(staleOpTime.getSecs()); +const rpc::OplogQueryMetadata OplogFetcherTest::staleOqMetadata = rpc::OplogQueryMetadata( {staleOpTime, staleWallTime}, staleOpTime, rbid, primaryIndex, syncSourceIndex); -const rpc::ReplSetMetadata NewOplogFetcherTest::replSetMetadata = +const rpc::ReplSetMetadata OplogFetcherTest::replSetMetadata = rpc::ReplSetMetadata(1, OpTimeAndWallTime(), OpTime(), 1, OID(), primaryIndex, syncSourceIndex); -void NewOplogFetcherTest::setUp() { +void OplogFetcherTest::setUp() { executor::ThreadPoolExecutorTest::setUp(); launchExecutorThread(); @@ -1263,9 +344,9 @@ void NewOplogFetcherTest::setUp() { dataReplicatorExternalState->currentTerm = lastFetched.getTerm(); dataReplicatorExternalState->lastCommittedOpTime = {{9999, 0}, lastFetched.getTerm()}; - enqueueDocumentsFn = [this](NewOplogFetcher::Documents::const_iterator begin, - NewOplogFetcher::Documents::const_iterator end, - const NewOplogFetcher::DocumentsInfo& info) -> Status { + enqueueDocumentsFn = [this](OplogFetcher::Documents::const_iterator begin, + OplogFetcher::Documents::const_iterator end, + const OplogFetcher::DocumentsInfo& info) -> Status { lastEnqueuedDocuments = {begin, end}; lastEnqueuedDocumentsInfo = info; return Status::OK(); @@ -1279,15 +360,15 @@ void NewOplogFetcherTest::setUp() { oplogFetcherUsesExhaust = true; } -std::unique_ptr<NewOplogFetcher> NewOplogFetcherTest::makeOplogFetcher() { +std::unique_ptr<OplogFetcher> OplogFetcherTest::makeOplogFetcher() { return makeOplogFetcherWithDifferentExecutor(&getExecutor(), [](Status) {}); } -std::unique_ptr<NewOplogFetcher> NewOplogFetcherTest::getOplogFetcherAfterConnectionCreated( - NewOplogFetcher::OnShutdownCallbackFn fn, +std::unique_ptr<OplogFetcher> OplogFetcherTest::getOplogFetcherAfterConnectionCreated( + OplogFetcher::OnShutdownCallbackFn fn, int numRestarts, bool requireFresherSyncSource, - NewOplogFetcher::StartingPoint startingPoint) { + OplogFetcher::StartingPoint startingPoint) { auto oplogFetcher = makeOplogFetcherWithDifferentExecutor( &getExecutor(), fn, numRestarts, requireFresherSyncSource, startingPoint); @@ -1305,18 +386,18 @@ std::unique_ptr<NewOplogFetcher> NewOplogFetcherTest::getOplogFetcherAfterConnec return oplogFetcher; } -std::unique_ptr<NewOplogFetcher> NewOplogFetcherTest::makeOplogFetcherWithDifferentExecutor( +std::unique_ptr<OplogFetcher> OplogFetcherTest::makeOplogFetcherWithDifferentExecutor( executor::TaskExecutor* executor, - NewOplogFetcher::OnShutdownCallbackFn fn, + OplogFetcher::OnShutdownCallbackFn fn, int numRestarts, bool requireFresherSyncSource, - NewOplogFetcher::StartingPoint startingPoint) { - auto oplogFetcher = std::make_unique<NewOplogFetcher>( + OplogFetcher::StartingPoint startingPoint) { + auto oplogFetcher = std::make_unique<OplogFetcher>( executor, lastFetched, source, _createConfig(), - std::make_unique<NewOplogFetcher::OplogFetcherRestartDecisionDefault>(numRestarts), + std::make_unique<OplogFetcher::OplogFetcherRestartDecisionDefault>(numRestarts), rbid, requireFresherSyncSource, dataReplicatorExternalState.get(), @@ -1332,11 +413,10 @@ std::unique_ptr<NewOplogFetcher> NewOplogFetcherTest::makeOplogFetcherWithDiffer return oplogFetcher; } -std::unique_ptr<ShutdownState> NewOplogFetcherTest::processSingleBatch( - const Message& response, - bool shouldShutdown, - bool requireFresherSyncSource, - bool lastFetchedShouldAdvance) { +std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch(const Message& response, + bool shouldShutdown, + bool requireFresherSyncSource, + bool lastFetchedShouldAdvance) { auto shutdownState = std::make_unique<ShutdownState>(); // Create an oplog fetcher with no retries. @@ -1366,7 +446,7 @@ std::unique_ptr<ShutdownState> NewOplogFetcherTest::processSingleBatch( return shutdownState; } -void NewOplogFetcherTest::testSyncSourceChecking( +void OplogFetcherTest::testSyncSourceChecking( boost::optional<const rpc::ReplSetMetadata&> replMetadata, boost::optional<const rpc::OplogQueryMetadata&> oqMetadata) { auto firstEntry = makeNoopOplogEntry(lastFetched); @@ -1383,9 +463,9 @@ void NewOplogFetcherTest::testSyncSourceChecking( ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, shutdownState->getStatus()); } -void NewOplogFetcherTest::validateLastBatch(bool skipFirstDoc, - NewOplogFetcher::Documents docs, - OpTime lastFetched) { +void OplogFetcherTest::validateLastBatch(bool skipFirstDoc, + OplogFetcher::Documents docs, + OpTime lastFetched) { auto docs_iter = docs.begin(); auto enqueue_iter = lastEnqueuedDocuments.begin(); @@ -1403,7 +483,7 @@ void NewOplogFetcherTest::validateLastBatch(bool skipFirstDoc, ASSERT_EQUALS(docs.back()["ts"].timestamp(), lastFetched.getTimestamp()); } -TEST_F(NewOplogFetcherTest, ShuttingExecutorDownShouldPreventOplogFetcherFromStarting) { +TEST_F(OplogFetcherTest, ShuttingExecutorDownShouldPreventOplogFetcherFromStarting) { getExecutor().shutdown(); auto oplogFetcher = makeOplogFetcher(); @@ -1419,7 +499,7 @@ TEST_F(NewOplogFetcherTest, ShuttingExecutorDownShouldPreventOplogFetcherFromSta ASSERT_EQUALS(lastFetched, oplogFetcher->getLastOpTimeFetched_forTest()); } -TEST_F(NewOplogFetcherTest, OplogFetcherReturnsOperationFailedIfExecutorFailsToScheduleRunQuery) { +TEST_F(OplogFetcherTest, OplogFetcherReturnsOperationFailedIfExecutorFailsToScheduleRunQuery) { TaskExecutorMock taskExecutorMock(&getExecutor()); taskExecutorMock.shouldFailScheduleWorkRequest = []() { return true; }; @@ -1438,7 +518,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherReturnsOperationFailedIfExecutorFailsToS ASSERT_EQUALS(lastFetched, oplogFetcher->getLastOpTimeFetched_forTest()); } -TEST_F(NewOplogFetcherTest, ShuttingExecutorDownAfterStartupButBeforeRunQueryScheduled) { +TEST_F(OplogFetcherTest, ShuttingExecutorDownAfterStartupButBeforeRunQueryScheduled) { ShutdownState shutdownState; // Defer scheduling work so that the executor's shutdown happens before startup's work is @@ -1460,7 +540,7 @@ TEST_F(NewOplogFetcherTest, ShuttingExecutorDownAfterStartupButBeforeRunQuerySch ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus()); } -TEST_F(NewOplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownBeforeRunQueryScheduled) { +TEST_F(OplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownBeforeRunQueryScheduled) { ShutdownState shutdownState; // Defer scheduling work so that the oplog fetcher's shutdown happens before startup's work is @@ -1482,7 +562,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownBeforeR ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus()); } -TEST_F(NewOplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownAfterRunQueryScheduled) { +TEST_F(OplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownAfterRunQueryScheduled) { // Tests shutting down after _runQuery is scheduled (but not while blocked on the network). ShutdownState shutdownState; @@ -1507,7 +587,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownAfterRu ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus()); } -TEST_F(NewOplogFetcherTest, +TEST_F(OplogFetcherTest, OplogFetcherReturnsHostUnreachableIfShutdownAfterRunQueryScheduledWhileBlockedOnCall) { // Tests that shutting down while the connection is blocked on call successfully shuts down the // connection as well. @@ -1531,7 +611,7 @@ TEST_F(NewOplogFetcherTest, ASSERT_EQUALS(ErrorCodes::HostUnreachable, shutdownState.getStatus()); } -TEST_F(NewOplogFetcherTest, +TEST_F(OplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownAfterGettingBatchBeforeProcessing) { // Tests shutting down after getting the first batch, but before enqueuing it. @@ -1596,7 +676,7 @@ public: } }; -TEST_F(NewOplogFetcherTest, OplogFetcherResetsOnShutdownCallbackFnOnCompletion) { +TEST_F(OplogFetcherTest, OplogFetcherResetsOnShutdownCallbackFnOnCompletion) { auto sharedCallbackData = std::make_shared<SharedCallbackState>(); auto callbackInvoked = false; auto status = getDetectableErrorStatus(); @@ -1630,7 +710,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherResetsOnShutdownCallbackFnOnCompletion) ASSERT_TRUE(sharedCallbackStateDestroyedSoon()); } -TEST_F(NewOplogFetcherTest, +TEST_F(OplogFetcherTest, FindQueryContainsTermIfGetCurrentTermAndLastCommittedOpTimeReturnsValidTerm) { // Test that the correct maxTimeMS is set if this is the initial 'find' query. auto oplogFetcher = makeOplogFetcher(); @@ -1649,7 +729,7 @@ TEST_F(NewOplogFetcherTest, ASSERT_EQUALS(dataReplicatorExternalState->currentTerm, queryObj["term"].numberLong()); } -TEST_F(NewOplogFetcherTest, +TEST_F(OplogFetcherTest, FindQueryDoesNotContainTermIfGetCurrentTermAndLastCommittedOpTimeReturnsUninitializedTerm) { dataReplicatorExternalState->currentTerm = OpTime::kUninitializedTerm; auto oplogFetcher = makeOplogFetcher(); @@ -1671,7 +751,7 @@ TEST_F(NewOplogFetcherTest, } TEST_F( - NewOplogFetcherTest, + OplogFetcherTest, GetMoreQueryDoesNotContainTermIfGetCurrentTermAndLastCommittedOpTimeReturnsUninitializedTerm) { dataReplicatorExternalState->currentTerm = OpTime::kUninitializedTerm; @@ -1705,13 +785,13 @@ TEST_F( ASSERT_OK(shutdownState.getStatus()); } -TEST_F(NewOplogFetcherTest, AwaitDataTimeoutShouldEqualHalfElectionTimeout) { +TEST_F(OplogFetcherTest, AwaitDataTimeoutShouldEqualHalfElectionTimeout) { auto config = _createConfig(); auto timeout = makeOplogFetcher()->getAwaitDataTimeout_forTest(); ASSERT_EQUALS(config.getElectionTimeoutPeriod() / 2, timeout); } -TEST_F(NewOplogFetcherTest, AwaitDataTimeoutSmallerWhenFailPointSet) { +TEST_F(OplogFetcherTest, AwaitDataTimeoutSmallerWhenFailPointSet) { auto failPoint = globalFailPointRegistry().find("setSmallOplogGetMoreMaxTimeMS"); failPoint->setMode(FailPoint::alwaysOn); auto timeout = makeOplogFetcher()->getAwaitDataTimeout_forTest(); @@ -1719,7 +799,7 @@ TEST_F(NewOplogFetcherTest, AwaitDataTimeoutSmallerWhenFailPointSet) { failPoint->setMode(FailPoint::off); } -TEST_F(NewOplogFetcherTest, InvalidReplSetMetadataInResponseStopsTheOplogFetcher) { +TEST_F(OplogFetcherTest, InvalidReplSetMetadataInResponseStopsTheOplogFetcher) { CursorId cursorId = 22LL; auto entry = makeNoopOplogEntry(lastFetched); auto metadataObj = @@ -1728,7 +808,7 @@ TEST_F(NewOplogFetcherTest, InvalidReplSetMetadataInResponseStopsTheOplogFetcher processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj))->getStatus()); } -TEST_F(NewOplogFetcherTest, InvalidOplogQueryMetadataInResponseStopsTheOplogFetcher) { +TEST_F(OplogFetcherTest, InvalidOplogQueryMetadataInResponseStopsTheOplogFetcher) { CursorId cursorId = 22LL; auto entry = makeNoopOplogEntry(lastFetched); auto metadataObj = @@ -1737,7 +817,7 @@ TEST_F(NewOplogFetcherTest, InvalidOplogQueryMetadataInResponseStopsTheOplogFetc processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj))->getStatus()); } -DEATH_TEST_F(NewOplogFetcherTest, +DEATH_TEST_F(OplogFetcherTest, ValidMetadataInResponseWithoutOplogMetadataInvariants, "Invariant failure oqMetadata") { CursorId cursorId = 22LL; @@ -1747,7 +827,7 @@ DEATH_TEST_F(NewOplogFetcherTest, processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj)); } -TEST_F(NewOplogFetcherTest, ValidMetadataWithInResponseShouldBeForwardedToProcessMetadataFn) { +TEST_F(OplogFetcherTest, ValidMetadataWithInResponseShouldBeForwardedToProcessMetadataFn) { CursorId cursorId = 0LL; auto entry = makeNoopOplogEntry(lastFetched); auto metadataObj = makeOplogBatchMetadata(replSetMetadata, oqMetadata); @@ -1760,7 +840,7 @@ TEST_F(NewOplogFetcherTest, ValidMetadataWithInResponseShouldBeForwardedToProces dataReplicatorExternalState->oqMetadataProcessed.getPrimaryIndex()); } -TEST_F(NewOplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceRollsBack) { +TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceRollsBack) { CursorId cursorId = 22LL; auto entry = makeNoopOplogEntry(lastFetched); @@ -1775,7 +855,7 @@ TEST_F(NewOplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceRollsBa ASSERT(lastEnqueuedDocuments.empty()); } -TEST_F(NewOplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehind) { +TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehind) { CursorId cursorId = 22LL; auto entry = makeNoopOplogEntry(staleOpTime); auto metadataObj = makeOplogBatchMetadata(replSetMetadata, staleOqMetadata); @@ -1787,7 +867,7 @@ TEST_F(NewOplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehin ASSERT(lastEnqueuedDocuments.empty()); } -TEST_F(NewOplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAhead) { +TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAhead) { CursorId cursorId = 22LL; auto entry = makeNoopOplogEntry(lastFetched); @@ -1802,7 +882,7 @@ TEST_F(NewOplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAh ASSERT(lastEnqueuedDocuments.empty()); } -TEST_F(NewOplogFetcherTest, +TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehindWithoutRequiringFresherSyncSource) { CursorId cursorId = 22LL; auto entry = makeNoopOplogEntry(staleOpTime); @@ -1818,7 +898,7 @@ TEST_F(NewOplogFetcherTest, ASSERT(lastEnqueuedDocuments.empty()); } -TEST_F(NewOplogFetcherTest, MetadataAndBatchAreProcessedWhenSyncSourceIsCurrentButMetadataIsStale) { +TEST_F(OplogFetcherTest, MetadataAndBatchAreProcessedWhenSyncSourceIsCurrentButMetadataIsStale) { // This tests the case where the sync source metadata is behind us but we get a document which // is equal to us. Since that means the metadata is stale and can be ignored, we should accept // this sync source. @@ -1834,7 +914,7 @@ TEST_F(NewOplogFetcherTest, MetadataAndBatchAreProcessedWhenSyncSourceIsCurrentB ASSERT(dataReplicatorExternalState->metadataWasProcessed); } -TEST_F(NewOplogFetcherTest, +TEST_F(OplogFetcherTest, MetadataAndBatchAreProcessedWhenSyncSourceIsNotAheadWithoutRequiringFresherSyncSource) { CursorId cursorId = 0LL; rpc::OplogQueryMetadata oplogQueryMetadata( @@ -1849,7 +929,7 @@ TEST_F(NewOplogFetcherTest, ASSERT(dataReplicatorExternalState->metadataWasProcessed); } -TEST_F(NewOplogFetcherTest, +TEST_F(OplogFetcherTest, MetadataWithoutOplogQueryMetadataIsNotProcessedOnBatchThatTriggersRollback) { CursorId cursorId = 22LL; auto metadataObj = makeOplogBatchMetadata(replSetMetadata, boost::none); @@ -1860,7 +940,7 @@ TEST_F(NewOplogFetcherTest, ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); } -TEST_F(NewOplogFetcherTest, MetadataIsNotProcessedOnBatchThatTriggersRollback) { +TEST_F(OplogFetcherTest, MetadataIsNotProcessedOnBatchThatTriggersRollback) { CursorId cursorId = 22LL; auto metadataObj = makeOplogBatchMetadata(replSetMetadata, oqMetadata); auto entry = makeNoopOplogEntry(Seconds(456)); @@ -1871,7 +951,7 @@ TEST_F(NewOplogFetcherTest, MetadataIsNotProcessedOnBatchThatTriggersRollback) { ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); } -TEST_F(NewOplogFetcherTest, EmptyMetadataIsNotProcessed) { +TEST_F(OplogFetcherTest, EmptyMetadataIsNotProcessed) { CursorId cursorId = 0LL; auto entry = makeNoopOplogEntry(lastFetched); @@ -1879,11 +959,11 @@ TEST_F(NewOplogFetcherTest, EmptyMetadataIsNotProcessed) { ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); } -TEST_F(NewOplogFetcherTest, FailingInitialCreateNewCursorNoRetriesShutsDownOplogFetcher) { +TEST_F(OplogFetcherTest, FailingInitialCreateNewCursorNoRetriesShutsDownOplogFetcher) { ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, processSingleBatch(Message())->getStatus()); } -TEST_F(NewOplogFetcherTest, FailingInitialCreateNewCursorWithRetriesShutsDownOplogFetcher) { +TEST_F(OplogFetcherTest, FailingInitialCreateNewCursorWithRetriesShutsDownOplogFetcher) { ShutdownState shutdownState; // Create an oplog fetcher with one retry. @@ -1900,7 +980,7 @@ TEST_F(NewOplogFetcherTest, FailingInitialCreateNewCursorWithRetriesShutsDownOpl ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, shutdownState.getStatus()); } -TEST_F(NewOplogFetcherTest, +TEST_F(OplogFetcherTest, NetworkExceptionDuringInitialCreateNewCursorWithRetriesShutsDownOplogFetcher) { ShutdownState shutdownState; @@ -1923,7 +1003,7 @@ TEST_F(NewOplogFetcherTest, ASSERT_EQUALS(ErrorCodes::NetworkTimeout, shutdownState.getStatus()); } -TEST_F(NewOplogFetcherTest, DontRecreateNewCursorAfterFailedBatchNoRetries) { +TEST_F(OplogFetcherTest, DontRecreateNewCursorAfterFailedBatchNoRetries) { ShutdownState shutdownState; // Create an oplog fetcher without any retries. @@ -1961,7 +1041,7 @@ TEST_F(NewOplogFetcherTest, DontRecreateNewCursorAfterFailedBatchNoRetries) { ASSERT_EQUALS(ErrorCodes::NetworkTimeout, shutdownState.getStatus()); } -TEST_F(NewOplogFetcherTest, FailCreateNewCursorAfterFailedBatchRetriesShutsDownOplogFetcher) { +TEST_F(OplogFetcherTest, FailCreateNewCursorAfterFailedBatchRetriesShutsDownOplogFetcher) { ShutdownState shutdownState; // Create an oplog fetcher with one retry. @@ -2002,7 +1082,7 @@ TEST_F(NewOplogFetcherTest, FailCreateNewCursorAfterFailedBatchRetriesShutsDownO ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, shutdownState.getStatus()); } -TEST_F(NewOplogFetcherTest, SuccessfullyRecreateCursorAfterFailedBatch) { +TEST_F(OplogFetcherTest, SuccessfullyRecreateCursorAfterFailedBatch) { // This tests that the oplog fetcher successfully can recreate a cursor after it failed to get // a batch and makes sure the recreated cursor behaves like an exhaust cursor. This will also // check that the socket timeouts are set as expected. The steps are: @@ -2157,7 +1237,7 @@ TEST_F(NewOplogFetcherTest, SuccessfullyRecreateCursorAfterFailedBatch) { ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus()); } -TEST_F(NewOplogFetcherTest, SuccessfulBatchResetsNumRestarts) { +TEST_F(OplogFetcherTest, SuccessfulBatchResetsNumRestarts) { // This tests that the OplogFetcherRestartDecision resets its counter when the oplog fetcher // successfully gets the next batch. The steps are: // 1. Start the oplog fetcher. @@ -2243,7 +1323,7 @@ TEST_F(NewOplogFetcherTest, SuccessfulBatchResetsNumRestarts) { ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus()); } -TEST_F(NewOplogFetcherTest, OplogFetcherWorksWithoutExhaust) { +TEST_F(OplogFetcherTest, OplogFetcherWorksWithoutExhaust) { // Test that the oplog fetcher works if the 'oplogFetcherUsesExhaust' server parameter is set to // false. @@ -2327,7 +1407,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherWorksWithoutExhaust) { ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus()); } -TEST_F(NewOplogFetcherTest, CursorIsDeadShutsDownOplogFetcherWithSuccessfulStatus) { +TEST_F(OplogFetcherTest, CursorIsDeadShutsDownOplogFetcherWithSuccessfulStatus) { ShutdownState shutdownState; // Create an oplog fetcher with one retry. @@ -2359,14 +1439,13 @@ TEST_F(NewOplogFetcherTest, CursorIsDeadShutsDownOplogFetcherWithSuccessfulStatu ASSERT_OK(shutdownState.getStatus()); } -TEST_F(NewOplogFetcherTest, EmptyFirstBatchStopsOplogFetcherWithOplogStartMissingError) { +TEST_F(OplogFetcherTest, EmptyFirstBatchStopsOplogFetcherWithOplogStartMissingError) { CursorId cursorId = 22LL; ASSERT_EQUALS(ErrorCodes::OplogStartMissing, processSingleBatch(makeFirstBatch(cursorId, {}, {}))->getStatus()); } -TEST_F(NewOplogFetcherTest, - MissingOpTimeInFirstDocumentCausesOplogFetcherToStopWithInvalidBSONError) { +TEST_F(OplogFetcherTest, MissingOpTimeInFirstDocumentCausesOplogFetcherToStopWithInvalidBSONError) { CursorId cursorId = 22LL; auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata); ASSERT_EQUALS( @@ -2375,7 +1454,7 @@ TEST_F(NewOplogFetcherTest, } TEST_F( - NewOplogFetcherTest, + OplogFetcherTest, LastOpTimeFetchedDoesNotMatchFirstDocumentCausesOplogFetcherToStopWithOplogStartMissingError) { CursorId cursorId = 22LL; auto entry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); @@ -2384,7 +1463,7 @@ TEST_F( processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj))->getStatus()); } -TEST_F(NewOplogFetcherTest, +TEST_F(OplogFetcherTest, MissingOpTimeInSecondDocumentOfFirstBatchCausesOplogFetcherToStopWithNoSuchKey) { CursorId cursorId = 22LL; auto firstEntry = makeNoopOplogEntry(lastFetched); @@ -2399,8 +1478,7 @@ TEST_F(NewOplogFetcherTest, ->getStatus()); } -TEST_F(NewOplogFetcherTest, - TimestampsNotAdvancingInBatchCausesOplogFetcherStopWithOplogOutOfOrder) { +TEST_F(OplogFetcherTest, TimestampsNotAdvancingInBatchCausesOplogFetcherStopWithOplogOutOfOrder) { CursorId cursorId = 22LL; auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata); ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder, @@ -2413,8 +1491,7 @@ TEST_F(NewOplogFetcherTest, ->getStatus()); } -TEST_F(NewOplogFetcherTest, - OplogFetcherShouldExcludeFirstDocumentInFirstBatchWhenEnqueuingDocuments) { +TEST_F(OplogFetcherTest, OplogFetcherShouldExcludeFirstDocumentInFirstBatchWhenEnqueuingDocuments) { CursorId cursorId = 22LL; auto firstEntry = makeNoopOplogEntry(lastFetched); auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); @@ -2445,14 +1522,14 @@ TEST_F(NewOplogFetcherTest, ASSERT_EQUALS(ErrorCodes::HostUnreachable, shutdownState->getStatus()); } -TEST_F(NewOplogFetcherTest, +TEST_F(OplogFetcherTest, OplogFetcherShouldNotDuplicateFirstDocWithEnqueueFirstDocOnErrorAfterFirstDoc) { // This function verifies that every oplog entry is only enqueued once. OpTime lastEnqueuedOpTime = OpTime(); - enqueueDocumentsFn = [&lastEnqueuedOpTime](NewOplogFetcher::Documents::const_iterator begin, - NewOplogFetcher::Documents::const_iterator end, - const NewOplogFetcher::DocumentsInfo&) -> Status { + enqueueDocumentsFn = [&lastEnqueuedOpTime](OplogFetcher::Documents::const_iterator begin, + OplogFetcher::Documents::const_iterator end, + const OplogFetcher::DocumentsInfo&) -> Status { auto count = 0; auto toEnqueueOpTime = OpTime(); @@ -2475,7 +1552,7 @@ TEST_F(NewOplogFetcherTest, getOplogFetcherAfterConnectionCreated(std::ref(*shutdownState), 1, true /* requireFresherSyncSource */, - NewOplogFetcher::StartingPoint::kEnqueueFirstDoc); + OplogFetcher::StartingPoint::kEnqueueFirstDoc); CursorId cursorId = 22LL; auto firstEntry = makeNoopOplogEntry({{Seconds(123), 0}, lastFetched.getTerm()}); @@ -2519,14 +1596,14 @@ TEST_F(NewOplogFetcherTest, ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState->getStatus()); } -TEST_F(NewOplogFetcherTest, +TEST_F(OplogFetcherTest, OplogFetcherShouldNotDuplicateFirstDocWithEnqueueFirstDocOnErrorAfterSecondDoc) { // This function verifies that every oplog entry is only enqueued once. OpTime lastEnqueuedOpTime = OpTime(); - enqueueDocumentsFn = [&lastEnqueuedOpTime](NewOplogFetcher::Documents::const_iterator begin, - NewOplogFetcher::Documents::const_iterator end, - const NewOplogFetcher::DocumentsInfo&) -> Status { + enqueueDocumentsFn = [&lastEnqueuedOpTime](OplogFetcher::Documents::const_iterator begin, + OplogFetcher::Documents::const_iterator end, + const OplogFetcher::DocumentsInfo&) -> Status { auto count = 0; auto toEnqueueOpTime = OpTime(); @@ -2549,7 +1626,7 @@ TEST_F(NewOplogFetcherTest, getOplogFetcherAfterConnectionCreated(std::ref(*shutdownState), 1, true /* requireFresherSyncSource */, - NewOplogFetcher::StartingPoint::kEnqueueFirstDoc); + OplogFetcher::StartingPoint::kEnqueueFirstDoc); CursorId cursorId = 22LL; auto firstEntry = makeNoopOplogEntry({{Seconds(123), 0}, lastFetched.getTerm()}); @@ -2594,15 +1671,15 @@ TEST_F(NewOplogFetcherTest, ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState->getStatus()); } -TEST_F(NewOplogFetcherTest, OplogFetcherShouldReportErrorsThrownFromEnqueueDocumentsFn) { +TEST_F(OplogFetcherTest, OplogFetcherShouldReportErrorsThrownFromEnqueueDocumentsFn) { CursorId cursorId = 22LL; auto firstEntry = makeNoopOplogEntry(lastFetched); auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata); - enqueueDocumentsFn = [](NewOplogFetcher::Documents::const_iterator, - NewOplogFetcher::Documents::const_iterator, - const NewOplogFetcher::DocumentsInfo&) -> Status { + enqueueDocumentsFn = [](OplogFetcher::Documents::const_iterator, + OplogFetcher::Documents::const_iterator, + const OplogFetcher::DocumentsInfo&) -> Status { return Status(ErrorCodes::InternalError, "my custom error"); }; @@ -2611,7 +1688,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherShouldReportErrorsThrownFromEnqueueDocum ASSERT_EQ(Status(ErrorCodes::InternalError, "my custom error"), shutdownState->getStatus()); } -TEST_F(NewOplogFetcherTest, FailedSyncSourceCheckWithoutMetadataStopsTheOplogFetcher) { +TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithoutMetadataStopsTheOplogFetcher) { testSyncSourceChecking(boost::none, boost::none); // Sync source optime and "hasSyncSource" are not available if the response does not @@ -2621,7 +1698,7 @@ TEST_F(NewOplogFetcherTest, FailedSyncSourceCheckWithoutMetadataStopsTheOplogFet ASSERT_FALSE(dataReplicatorExternalState->syncSourceHasSyncSource); } -TEST_F(NewOplogFetcherTest, FailedSyncSourceCheckWithBothMetadatasStopsTheOplogFetcher) { +TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithBothMetadatasStopsTheOplogFetcher) { testSyncSourceChecking(replSetMetadata, oqMetadata); // Sync source optime and "hasSyncSource" can be set if the respone contains metadata. @@ -2630,7 +1707,7 @@ TEST_F(NewOplogFetcherTest, FailedSyncSourceCheckWithBothMetadatasStopsTheOplogF ASSERT_TRUE(dataReplicatorExternalState->syncSourceHasSyncSource); } -TEST_F(NewOplogFetcherTest, +TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithSyncSourceHavingNoSyncSourceStopsTheOplogFetcher) { rpc::OplogQueryMetadata oplogQueryMetadata( {staleOpTime, staleWallTime}, remoteNewerOpTime, rbid, primaryIndex, -1); @@ -2643,13 +1720,13 @@ TEST_F(NewOplogFetcherTest, ASSERT_FALSE(dataReplicatorExternalState->syncSourceHasSyncSource); } -TEST_F(NewOplogFetcherTest, ValidateDocumentsReturnsNoSuchKeyIfTimestampIsNotFoundInAnyDocument) { +TEST_F(OplogFetcherTest, ValidateDocumentsReturnsNoSuchKeyIfTimestampIsNotFoundInAnyDocument) { auto firstEntry = makeNoopOplogEntry(Seconds(123)); auto secondEntry = BSON("o" << BSON("msg" << "oplog entry without optime")); ASSERT_EQUALS(ErrorCodes::NoSuchKey, - NewOplogFetcher::validateDocuments( + OplogFetcher::validateDocuments( {firstEntry, secondEntry}, true, unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp()) @@ -2657,40 +1734,38 @@ TEST_F(NewOplogFetcherTest, ValidateDocumentsReturnsNoSuchKeyIfTimestampIsNotFou } TEST_F( - NewOplogFetcherTest, + OplogFetcherTest, ValidateDocumentsReturnsOutOfOrderIfTimestampInFirstEntryIsEqualToLastTimestampAndNotProcessingFirstBatch) { auto firstEntry = makeNoopOplogEntry(Seconds(123)); auto secondEntry = makeNoopOplogEntry(Seconds(456)); ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder, - NewOplogFetcher::validateDocuments( + OplogFetcher::validateDocuments( {firstEntry, secondEntry}, false, unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp()) .getStatus()); } -TEST_F(NewOplogFetcherTest, - ValidateDocumentsReturnsOutOfOrderIfTimestampInSecondEntryIsBeforeFirst) { +TEST_F(OplogFetcherTest, ValidateDocumentsReturnsOutOfOrderIfTimestampInSecondEntryIsBeforeFirst) { auto firstEntry = makeNoopOplogEntry(Seconds(456)); auto secondEntry = makeNoopOplogEntry(Seconds(123)); ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder, - NewOplogFetcher::validateDocuments( + OplogFetcher::validateDocuments( {firstEntry, secondEntry}, true, unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp()) .getStatus()); } -TEST_F(NewOplogFetcherTest, - ValidateDocumentsReturnsOutOfOrderIfTimestampInThirdEntryIsBeforeSecond) { +TEST_F(OplogFetcherTest, ValidateDocumentsReturnsOutOfOrderIfTimestampInThirdEntryIsBeforeSecond) { auto firstEntry = makeNoopOplogEntry(Seconds(123)); auto secondEntry = makeNoopOplogEntry(Seconds(789)); auto thirdEntry = makeNoopOplogEntry(Seconds(456)); ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder, - NewOplogFetcher::validateDocuments( + OplogFetcher::validateDocuments( {firstEntry, secondEntry, thirdEntry}, true, unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp()) @@ -2698,17 +1773,17 @@ TEST_F(NewOplogFetcherTest, } TEST_F( - NewOplogFetcherTest, + OplogFetcherTest, ValidateDocumentsExcludesFirstDocumentInApplyCountAndBytesIfProcessingFirstBatchAndSkipFirstDoc) { auto firstEntry = makeNoopOplogEntry(Seconds(123)); auto secondEntry = makeNoopOplogEntry(Seconds(456)); auto thirdEntry = makeNoopOplogEntry(Seconds(789)); - auto info = unittest::assertGet(NewOplogFetcher::validateDocuments( + auto info = unittest::assertGet(OplogFetcher::validateDocuments( {firstEntry, secondEntry, thirdEntry}, true, unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp(), - mongo::repl::NewOplogFetcher::StartingPoint::kSkipFirstDoc)); + mongo::repl::OplogFetcher::StartingPoint::kSkipFirstDoc)); ASSERT_EQUALS(3U, info.networkDocumentCount); ASSERT_EQUALS(2U, info.toApplyDocumentCount); @@ -2720,17 +1795,17 @@ TEST_F( } TEST_F( - NewOplogFetcherTest, + OplogFetcherTest, ValidateDocumentsIncludesFirstDocumentInApplyCountAndBytesIfProcessingFirstBatchAndEnqueueFirstDoc) { auto firstEntry = makeNoopOplogEntry(Seconds(123)); auto secondEntry = makeNoopOplogEntry(Seconds(456)); auto thirdEntry = makeNoopOplogEntry(Seconds(789)); - auto info = unittest::assertGet(NewOplogFetcher::validateDocuments( + auto info = unittest::assertGet(OplogFetcher::validateDocuments( {firstEntry, secondEntry, thirdEntry}, true, unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp(), - mongo::repl::NewOplogFetcher::StartingPoint::kEnqueueFirstDoc)); + mongo::repl::OplogFetcher::StartingPoint::kEnqueueFirstDoc)); ASSERT_EQUALS(3U, info.networkDocumentCount); ASSERT_EQUALS(3U, info.toApplyDocumentCount); @@ -2742,13 +1817,13 @@ TEST_F( ASSERT_EQUALS(unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry)), info.lastDocument); } -TEST_F(NewOplogFetcherTest, +TEST_F(OplogFetcherTest, ValidateDocumentsIncludesFirstDocumentInApplyCountAndBytesIfNotProcessingFirstBatch) { auto firstEntry = makeNoopOplogEntry(Seconds(123)); auto secondEntry = makeNoopOplogEntry(Seconds(456)); auto thirdEntry = makeNoopOplogEntry(Seconds(789)); - auto info = unittest::assertGet(NewOplogFetcher::validateDocuments( + auto info = unittest::assertGet(OplogFetcher::validateDocuments( {firstEntry, secondEntry, thirdEntry}, false, Timestamp(Seconds(100), 0))); ASSERT_EQUALS(3U, info.networkDocumentCount); @@ -2761,11 +1836,11 @@ TEST_F(NewOplogFetcherTest, ASSERT_EQUALS(unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry)), info.lastDocument); } -TEST_F(NewOplogFetcherTest, +TEST_F(OplogFetcherTest, ValidateDocumentsReturnsDefaultLastDocumentOpTimeWhenThereAreNoDocumentsToApply) { auto firstEntry = makeNoopOplogEntry(Seconds(123)); - auto info = unittest::assertGet(NewOplogFetcher::validateDocuments( + auto info = unittest::assertGet(OplogFetcher::validateDocuments( {firstEntry}, true, unittest::assertGet(OpTime::parseFromOplogEntry(firstEntry)).getTimestamp())); @@ -2779,17 +1854,17 @@ TEST_F(NewOplogFetcherTest, ASSERT_EQUALS(OpTime(), info.lastDocument); } -TEST_F(NewOplogFetcherTest, +TEST_F(OplogFetcherTest, ValidateDocumentsReturnsOplogStartMissingWhenThereAreNoDocumentsWhenProcessingFirstBatch) { ASSERT_EQUALS( ErrorCodes::OplogStartMissing, - NewOplogFetcher::validateDocuments({}, true, Timestamp(Seconds(100), 0)).getStatus()); + OplogFetcher::validateDocuments({}, true, Timestamp(Seconds(100), 0)).getStatus()); } -TEST_F(NewOplogFetcherTest, +TEST_F(OplogFetcherTest, ValidateDocumentsReturnsDefaultInfoWhenThereAreNoDocumentsWhenNotProcessingFirstBatch) { - auto info = unittest::assertGet( - NewOplogFetcher::validateDocuments({}, false, Timestamp(Seconds(100), 0))); + auto info = + unittest::assertGet(OplogFetcher::validateDocuments({}, false, Timestamp(Seconds(100), 0))); ASSERT_EQUALS(0U, info.networkDocumentCount); ASSERT_EQUALS(0U, info.networkDocumentBytes); @@ -2800,7 +1875,7 @@ TEST_F(NewOplogFetcherTest, ASSERT_EQUALS(OpTime(), info.lastDocument); } -TEST_F(NewOplogFetcherTest, OplogFetcherReturnsHostUnreachableOnConnectionFailures) { +TEST_F(OplogFetcherTest, OplogFetcherReturnsHostUnreachableOnConnectionFailures) { // Test that OplogFetcher fails to establish initial connection, retrying HostUnreachable. ShutdownState shutdownState; @@ -2816,7 +1891,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherReturnsHostUnreachableOnConnectionFailur ASSERT_EQUALS(ErrorCodes::HostUnreachable, shutdownState.getStatus()); } -TEST_F(NewOplogFetcherTest, OplogFetcherRetriesConnectionButFails) { +TEST_F(OplogFetcherTest, OplogFetcherRetriesConnectionButFails) { // Test that OplogFetcher tries but fails after failing the initial connection, retrying // HostUnreachable. ShutdownState shutdownState; @@ -2834,7 +1909,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherRetriesConnectionButFails) { ASSERT_EQUALS(ErrorCodes::HostUnreachable, shutdownState.getStatus()); } -TEST_F(NewOplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownBeforeReconnect) { +TEST_F(OplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownBeforeReconnect) { // Test that OplogFetcher returns CallbackCanceled error if it is shut down after failing the // initial connection but before it retries the connection. ShutdownState shutdownState; @@ -2864,7 +1939,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownBeforeR ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus()); } -TEST_F(NewOplogFetcherTest, OplogFetcherResetsNumRestartsOnSuccessfulConnection) { +TEST_F(OplogFetcherTest, OplogFetcherResetsNumRestartsOnSuccessfulConnection) { // Test that OplogFetcher resets the number of restarts after a successful connection on a // retry. ShutdownState shutdownState; @@ -2909,7 +1984,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherResetsNumRestartsOnSuccessfulConnection) ASSERT_OK(shutdownState.getStatus()); } -TEST_F(NewOplogFetcherTest, OplogFetcherCanAutoReconnect) { +TEST_F(OplogFetcherTest, OplogFetcherCanAutoReconnect) { // Test that the OplogFetcher can autoreconnect after a broken connection. ShutdownState shutdownState; @@ -2932,7 +2007,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherCanAutoReconnect) { ASSERT_OK(shutdownState.getStatus()); } -TEST_F(NewOplogFetcherTest, OplogFetcherAutoReconnectsButFails) { +TEST_F(OplogFetcherTest, OplogFetcherAutoReconnectsButFails) { // Test that the OplogFetcher fails an autoreconnect after a broken connection. ShutdownState shutdownState; @@ -2961,7 +2036,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherAutoReconnectsButFails) { ASSERT_EQUALS(ErrorCodes::HostUnreachable, shutdownState.getStatus()); } -TEST_F(NewOplogFetcherTest, DisconnectsOnErrorsDuringExhaustStream) { +TEST_F(OplogFetcherTest, DisconnectsOnErrorsDuringExhaustStream) { // Test that the connection disconnects if we get errors after successfully receiving a batch // from the exhaust stream. ShutdownState shutdownState; @@ -3010,7 +2085,7 @@ TEST_F(NewOplogFetcherTest, DisconnectsOnErrorsDuringExhaustStream) { ASSERT_OK(shutdownState.getStatus()); } -TEST_F(NewOplogFetcherTest, GetMoreEmptyBatch) { +TEST_F(OplogFetcherTest, GetMoreEmptyBatch) { ShutdownState shutdownState; // Create an oplog fetcher without any retries. diff --git a/src/mongo/db/repl/repl_server_parameters.idl b/src/mongo/db/repl/repl_server_parameters.idl index bdb90237d0b..2ac6a02f4b2 100644 --- a/src/mongo/db/repl/repl_server_parameters.idl +++ b/src/mongo/db/repl/repl_server_parameters.idl @@ -77,8 +77,7 @@ server_parameters: default: expr: (16 * 1024 * 1024) / 12 * 10 - # TODO SERVER-45574: change to oplog_fetcher.cpp - # From abstract_oplog_fetcher.cpp + # From oplog_fetcher.cpp oplogInitialFindMaxSeconds: description: >- Number of seconds for the `maxTimeMS` on the initial `find` command. @@ -181,7 +180,7 @@ server_parameters: # From collection_bulk_loader_impl.cpp collectionBulkLoaderBatchSizeInBytes: description: >- - Limit for the number of bytes of data inserted per storage transaction + Limit for the number of bytes of data inserted per storage transaction (WriteUnitOfWork) by collectionBulkLoader during initial sync collection cloning set_at: startup cpp_vartype: int |