diff options
author | Daniel Alabi <alabidan@gmail.com> | 2015-06-09 17:43:09 -0400 |
---|---|---|
committer | Daniel Alabi <alabidan@gmail.com> | 2015-06-11 16:51:13 -0400 |
commit | dfea887c3b0eab7fec881ef9bb6d300566ab669f (patch) | |
tree | 82f8b71e84af5eda8273eaf448ea3f9324442df7 /src/mongo/db/repl | |
parent | 0d403de0d525237ea3fa2aee63117080ca357591 (diff) | |
download | mongo-dfea887c3b0eab7fec881ef9bb6d300566ab669f.tar.gz |
SERVER-18901 Move Fetcher and QueryFetcher to mongo/client/
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r-- | src/mongo/db/repl/SConscript | 26 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator.cpp | 80 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator.h | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator_test.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/database_cloner.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/fetcher.cpp | 301 | ||||
-rw-r--r-- | src/mongo/db/repl/fetcher.h | 189 | ||||
-rw-r--r-- | src/mongo/db/repl/fetcher_test.cpp | 627 |
9 files changed, 10 insertions, 1224 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index d899720f731..98fa6f5caad 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -423,19 +423,6 @@ env.Library( ], ) -env.Library( - target='fetcher', - source=[ - 'fetcher.cpp', - ], - LIBDEPS=[ - 'replication_executor', - '$BUILD_DIR/mongo/logger/logger', - '$BUILD_DIR/mongo/db/namespace_string', - '$BUILD_DIR/mongo/rpc/command_status', - ], -) - env.CppUnitTest( target='reporter_test', source='reporter_test.cpp', @@ -445,15 +432,6 @@ env.CppUnitTest( ], ) -env.CppUnitTest( - target='fetcher_test', - source='fetcher_test.cpp', - LIBDEPS=[ - 'fetcher', - 'replication_executor_test_fixture', - ], -) - env.Library( target='base_cloner_test_fixture', source=[ @@ -470,9 +448,9 @@ env.Library( 'collection_cloner.cpp', ], LIBDEPS=[ - 'fetcher', 'replication_executor', '$BUILD_DIR/mongo/db/catalog/collection_options', + '$BUILD_DIR/mongo/client/fetcher', '$BUILD_DIR/mongo/logger/logger', ], ) @@ -611,8 +589,8 @@ env.Library( 'applier', 'collection_cloner', 'database_cloner', - 'fetcher', 'repl_coordinator_interface', + '$BUILD_DIR/mongo/client/fetcher', ], ) diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h index 37dc718195b..19ff6850163 100644 --- a/src/mongo/db/repl/collection_cloner.h +++ b/src/mongo/db/repl/collection_cloner.h @@ -35,10 +35,10 @@ #include "mongo/base/disallow_copying.h" #include "mongo/base/status.h" #include "mongo/bson/bsonobj.h" +#include "mongo/client/fetcher.h" #include "mongo/db/catalog/collection_options.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/base_cloner.h" -#include "mongo/db/repl/fetcher.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/condition_variable.h" diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp index 8e71942ec54..d0caa1eebd6 100644 --- a/src/mongo/db/repl/data_replicator.cpp +++ b/src/mongo/db/repl/data_replicator.cpp @@ -37,7 +37,9 @@ #include <thread> #include "mongo/base/status.h" +#include "mongo/client/query_fetcher.h" #include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/jsobj.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/collection_cloner.h" #include "mongo/db/repl/database_cloner.h" @@ -92,43 +94,6 @@ namespace { } // namespace /** - * Follows the fetcher pattern for a find+getmore - */ - class QueryFetcher { - MONGO_DISALLOW_COPYING(QueryFetcher); - public: - using CallbackFn = stdx::function<void (const BatchDataStatus&, NextAction*)>; - - QueryFetcher(ReplicationExecutor* exec, - const HostAndPort& source, - const NamespaceString& nss, - const BSONObj& cmdBSON, - const QueryFetcher::CallbackFn& onBatchAvailable); - virtual ~QueryFetcher() = default; - - bool isActive() const { return _fetcher.isActive(); } - Status schedule() { return _fetcher.schedule(); } - void cancel() { return _fetcher.cancel(); } - void wait() { if (_fetcher.isActive()) _fetcher.wait(); } - std::string toString() const; - - protected: - void _onFetchCallback(const BatchDataStatus& fetchResult, - Fetcher::NextAction* nextAction, - BSONObjBuilder* getMoreBob); - - virtual void _delegateCallback(const BatchDataStatus& fetchResult, - NextAction* nextAction) { - _work(fetchResult, nextAction); - }; - - ReplicationExecutor* _exec; - Fetcher _fetcher; - int _responses; - const QueryFetcher::CallbackFn _work; - }; - - /** * Follows the fetcher pattern for a find+getmore on an oplog * Returns additional errors if the start oplog entry cannot be found. */ @@ -156,47 +121,6 @@ namespace { const Timestamp _startTS; }; - // QueryFetcher - QueryFetcher::QueryFetcher(ReplicationExecutor* exec, - const HostAndPort& src, - const NamespaceString& nss, - const BSONObj& cmdBSON, - const CallbackFn& work) - : _exec(exec), - _fetcher(exec, - src, - nss.db().toString(), - cmdBSON, - stdx::bind(&QueryFetcher::_onFetchCallback, - this, - stdx::placeholders::_1, - stdx::placeholders::_2, - stdx::placeholders::_3)), - _responses(0), - _work(work) { - } - - void QueryFetcher::_onFetchCallback(const BatchDataStatus& fetchResult, - Fetcher::NextAction* nextAction, - BSONObjBuilder* getMoreBob) { - ++_responses; - - _delegateCallback(fetchResult, nextAction); - // The fetcher will continue to call with kGetMore until an error or the last batch. - if (fetchResult.isOK() && *nextAction == NextAction::kGetMore) { - const auto batchData(fetchResult.getValue()); - invariant(getMoreBob); - getMoreBob->append("getMore", batchData.cursorId); - getMoreBob->append("collection", batchData.nss.coll()); - } - } - - std::string QueryFetcher::toString() const { - return str::stream() << "QueryFetcher -" - << " responses: " << _responses - << " fetcher: " << _fetcher.getDiagnosticString(); - } - // OplogFetcher OplogFetcher::OplogFetcher(ReplicationExecutor* exec, const Timestamp& startTS, diff --git a/src/mongo/db/repl/data_replicator.h b/src/mongo/db/repl/data_replicator.h index 59e242dfc0d..a01e0c61b19 100644 --- a/src/mongo/db/repl/data_replicator.h +++ b/src/mongo/db/repl/data_replicator.h @@ -41,7 +41,6 @@ #include "mongo/db/repl/applier.h" #include "mongo/db/repl/collection_cloner.h" #include "mongo/db/repl/database_cloner.h" -#include "mongo/db/repl/fetcher.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/reporter.h" @@ -49,6 +48,9 @@ #include "mongo/util/queue.h" namespace mongo { + +class QueryFetcher; + namespace repl { using Operations = Applier::Operations; @@ -65,7 +67,6 @@ using Response = RemoteCommandResponse; using TimestampStatus = StatusWith<Timestamp>; using UniqueLock = stdx::unique_lock<stdx::mutex>; -class QueryFetcher; class OplogFetcher; struct InitialSyncState; diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp index 854829ef113..f10fb2514fa 100644 --- a/src/mongo/db/repl/data_replicator_test.cpp +++ b/src/mongo/db/repl/data_replicator_test.cpp @@ -32,10 +32,10 @@ #include <memory> +#include "mongo/client/fetcher.h" #include "mongo/db/jsobj.h" #include "mongo/db/repl/base_cloner_test_fixture.h" #include "mongo/db/repl/data_replicator.h" -#include "mongo/db/repl/fetcher.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator_impl.h" #include "mongo/db/repl/replica_set_config.h" diff --git a/src/mongo/db/repl/database_cloner.h b/src/mongo/db/repl/database_cloner.h index 447d7ce07fd..071b33d0e96 100644 --- a/src/mongo/db/repl/database_cloner.h +++ b/src/mongo/db/repl/database_cloner.h @@ -35,10 +35,10 @@ #include "mongo/base/disallow_copying.h" #include "mongo/base/status.h" #include "mongo/bson/bsonobj.h" +#include "mongo/client/fetcher.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/collection_cloner.h" #include "mongo/db/repl/base_cloner.h" -#include "mongo/db/repl/fetcher.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" diff --git a/src/mongo/db/repl/fetcher.cpp b/src/mongo/db/repl/fetcher.cpp deleted file mode 100644 index cfe8402057d..00000000000 --- a/src/mongo/db/repl/fetcher.cpp +++ /dev/null @@ -1,301 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/db/repl/fetcher.h" - -#include "mongo/db/jsobj.h" -#include "mongo/db/namespace_string.h" -#include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/util/assert_util.h" -#include "mongo/util/mongoutils/str.h" - -namespace mongo { -namespace repl { - -namespace { - - const char* kCursorFieldName = "cursor"; - const char* kCursorIdFieldName = "id"; - const char* kNamespaceFieldName = "ns"; - - const char* kFirstBatchFieldName = "firstBatch"; - const char* kNextBatchFieldName = "nextBatch"; - - /** - * Parses cursor response in command result for cursor ID, namespace and documents. - * 'batchFieldName' will be 'firstBatch' for the initial remote command invocation and - * 'nextBatch' for getMore. - */ - Status parseCursorResponse(const BSONObj& obj, - const std::string& batchFieldName, - Fetcher::BatchData* batchData) { - invariant(batchFieldName == kFirstBatchFieldName || batchFieldName == kNextBatchFieldName); - invariant(batchData); - - BSONElement cursorElement = obj.getField(kCursorFieldName); - if (cursorElement.eoo()) { - return Status(ErrorCodes::FailedToParse, str::stream() << - "cursor response must contain '" << kCursorFieldName << - "' field: " << obj); - } - if (!cursorElement.isABSONObj()) { - return Status(ErrorCodes::FailedToParse, str::stream() << - "'" << kCursorFieldName << "' field must be an object: " << obj); - } - BSONObj cursorObj = cursorElement.Obj(); - - BSONElement cursorIdElement = cursorObj.getField(kCursorIdFieldName); - if (cursorIdElement.eoo()) { - return Status(ErrorCodes::FailedToParse, str::stream() << - "cursor response must contain '" << kCursorFieldName << "." << - kCursorIdFieldName << "' field: " << obj); - } - if (!(cursorIdElement.type() == mongo::NumberLong || - cursorIdElement.type() == mongo::NumberInt)) { - return Status(ErrorCodes::FailedToParse, str::stream() << - "'" << kCursorFieldName << "." << kCursorIdFieldName << - "' field must be a integral number of type 'int' or 'long' but was a '" - << typeName(cursorIdElement.type()) << "': " << obj); - } - batchData->cursorId = cursorIdElement.numberLong(); - - BSONElement namespaceElement = cursorObj.getField(kNamespaceFieldName); - if (namespaceElement.eoo()) { - return Status(ErrorCodes::FailedToParse, str::stream() << - "cursor response must contain " << - "'" << kCursorFieldName << "." << kNamespaceFieldName << "' field: " << - obj); - } - if (namespaceElement.type() != mongo::String) { - return Status(ErrorCodes::FailedToParse, str::stream() << - "'" << kCursorFieldName << "." << kNamespaceFieldName << - "' field must be a string: " << obj); - } - NamespaceString tempNss(namespaceElement.valuestrsafe()); - if (!tempNss.isValid()) { - return Status(ErrorCodes::BadValue, str::stream() << - "'" << kCursorFieldName << "." << kNamespaceFieldName << - "' contains an invalid namespace: " << obj); - } - batchData->nss = tempNss; - - BSONElement batchElement = cursorObj.getField(batchFieldName); - if (batchElement.eoo()) { - return Status(ErrorCodes::FailedToParse, str::stream() << - "cursor response must contain '" << kCursorFieldName << "." << - batchFieldName << "' field: " << obj); - } - if (!batchElement.isABSONObj()) { - return Status(ErrorCodes::FailedToParse, str::stream() << - "'" << kCursorFieldName << "." << batchFieldName << - "' field must be an array: " << obj); - } - BSONObj batchObj = batchElement.Obj(); - for (auto itemElement : batchObj) { - if (!itemElement.isABSONObj()) { - return Status(ErrorCodes::FailedToParse, str::stream() << - "found non-object " << itemElement << " in " << - "'" << kCursorFieldName << "." << batchFieldName << "' field: " << - obj); - } - batchData->documents.push_back(itemElement.Obj().getOwned()); - } - - return Status::OK(); - } - - Status parseReplResponse(const BSONObj& obj) { - return Status::OK(); - } - -} // namespace - - Fetcher::BatchData::BatchData(CursorId theCursorId, - const NamespaceString& theNss, - Documents theDocuments) - : cursorId(theCursorId), - nss(theNss), - documents(theDocuments) { } - - Fetcher::Fetcher(executor::TaskExecutor* executor, - const HostAndPort& source, - const std::string& dbname, - const BSONObj& findCmdObj, - const CallbackFn& work) - : _executor(executor), - _source(source), - _dbname(dbname), - _cmdObj(findCmdObj.getOwned()), - _work(work), - _active(false), - _remoteCommandCallbackHandle() { - - uassert(ErrorCodes::BadValue, "null replication executor", executor); - uassert(ErrorCodes::BadValue, "database name cannot be empty", !dbname.empty()); - uassert(ErrorCodes::BadValue, "command object cannot be empty", !findCmdObj.isEmpty()); - uassert(ErrorCodes::BadValue, "callback function cannot be null", work); - } - - Fetcher::~Fetcher() { - DESTRUCTOR_GUARD( - cancel(); - wait(); - ); - } - - std::string Fetcher::getDiagnosticString() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); - str::stream output; - output << "Fetcher"; - output << " executor: " << _executor->getDiagnosticString(); - output << " source: " << _source.toString(); - output << " database: " << _dbname; - output << " query: " << _cmdObj; - output << " active: " << _active; - return output; - } - - bool Fetcher::isActive() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return _active; - } - - Status Fetcher::schedule() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_active) { - return Status(ErrorCodes::IllegalOperation, "fetcher already scheduled"); - } - return _schedule_inlock(_cmdObj, kFirstBatchFieldName); - } - - void Fetcher::cancel() { - executor::TaskExecutor::CallbackHandle remoteCommandCallbackHandle; - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - - if (!_active) { - return; - } - - remoteCommandCallbackHandle = _remoteCommandCallbackHandle; - } - - invariant(remoteCommandCallbackHandle.isValid()); - _executor->cancel(remoteCommandCallbackHandle); - } - - void Fetcher::wait() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - _condition.wait(lk, [this]() { return !_active; }); - } - - Status Fetcher::_schedule_inlock(const BSONObj& cmdObj, const char* batchFieldName) { - StatusWith<executor::TaskExecutor::CallbackHandle> scheduleResult = - _executor->scheduleRemoteCommand( - RemoteCommandRequest(_source, _dbname, cmdObj), - stdx::bind(&Fetcher::_callback, this, stdx::placeholders::_1, batchFieldName)); - - if (!scheduleResult.isOK()) { - return scheduleResult.getStatus(); - } - - _active = true; - _remoteCommandCallbackHandle = scheduleResult.getValue(); - return Status::OK(); - } - - void Fetcher::_callback(const executor::TaskExecutor::RemoteCommandCallbackArgs& rcbd, - const char* batchFieldName) { - - if (!rcbd.response.isOK()) { - _work(StatusWith<Fetcher::BatchData>(rcbd.response.getStatus()), nullptr, nullptr); - _finishCallback(); - return; - } - - const BSONObj& queryResponseObj = rcbd.response.getValue().data; - Status status = getStatusFromCommandResult(queryResponseObj); - if (!status.isOK()) { - _work(StatusWith<Fetcher::BatchData>(status), nullptr, nullptr); - _finishCallback(); - return; - } - - status = parseReplResponse(queryResponseObj); - if (!status.isOK()) { - _work(StatusWith<Fetcher::BatchData>(status), nullptr, nullptr); - _finishCallback(); - return; - } - - BatchData batchData; - status = parseCursorResponse(queryResponseObj, batchFieldName, &batchData); - if (!status.isOK()) { - _work(StatusWith<Fetcher::BatchData>(status), nullptr, nullptr); - _finishCallback(); - return; - } - - NextAction nextAction = NextAction::kNoAction; - - if (batchData.cursorId) { - nextAction = NextAction::kGetMore; - } - - BSONObjBuilder bob; - _work(StatusWith<BatchData>(batchData), &nextAction, &bob); - - // Callback function _work may modify nextAction to request the fetcher - // not to schedule a getMore command. - if (!batchData.cursorId || nextAction != NextAction::kGetMore) { - _finishCallback(); - return; - } - - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - status = _schedule_inlock(bob.obj(), kNextBatchFieldName); - } - if (!status.isOK()) { - nextAction = NextAction::kNoAction; - _work(StatusWith<Fetcher::BatchData>(status), nullptr, nullptr); - _finishCallback(); - return; - } - } - - void Fetcher::_finishCallback() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - _active = false; - _condition.notify_all(); - } - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/fetcher.h b/src/mongo/db/repl/fetcher.h deleted file mode 100644 index 15ea466981a..00000000000 --- a/src/mongo/db/repl/fetcher.h +++ /dev/null @@ -1,189 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <string> -#include <vector> - -#include "mongo/base/disallow_copying.h" -#include "mongo/base/status.h" -#include "mongo/base/status_with.h" -#include "mongo/bson/bsonobj.h" -#include "mongo/db/clientcursor.h" -#include "mongo/db/namespace_string.h" -#include "mongo/executor/task_executor.h" -#include "mongo/stdx/functional.h" -#include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/mutex.h" -#include "mongo/util/net/hostandport.h" - -namespace mongo { -namespace repl { - - class Fetcher { - MONGO_DISALLOW_COPYING(Fetcher); - public: - - /** - * Container for BSON documents extracted from cursor results. - */ - typedef std::vector<BSONObj> Documents; - - /** - * Documents in current batch with cursor ID and associated namespace name. - * If cursor ID is zero, there are no additional batches. - */ - struct BatchData { - BatchData() = default; - BatchData(CursorId theCursorId, const NamespaceString& theNss, Documents theDocuments); - CursorId cursorId = 0; - NamespaceString nss; - Documents documents; - }; - - /** - * Represents next steps of fetcher. - */ - enum class NextAction : int { - kInvalid=0, - kNoAction=1, - kGetMore=2 - }; - - /** - * Type of a fetcher callback function. - */ - typedef stdx::function<void (const StatusWith<BatchData>&, - NextAction*, - BSONObjBuilder*)> CallbackFn; - - /** - * Creates Fetcher task but does not schedule it to be run by the executor. - * - * First remote command to be run by the executor will be 'cmdObj'. The results - * of 'cmdObj' must contain a cursor response object. - * See Commands::appendCursorResponseObject. - * - * Callback function 'work' will be called 1 or more times after a successful - * schedule() call depending on the results of the remote command. - * - * Depending on the cursor ID in the initial cursor response object, the fetcher may run - * subsequent getMore commands on the remote server in order to obtain a complete - * set of results. - * - * Failed remote commands will also cause 'work' to be invoked with the - * error details provided by the remote server. On failure, the fetcher will stop - * sending getMore requests to the remote server. - * - * If the fetcher is canceled (either by calling cancel() or shutting down the executor), - * 'work' will not be invoked. - * - * Fetcher uses the NextAction argument to inform client via callback if a getMore command - * will be scheduled to be run by the executor to retrieve additional results. - * Also, note that the NextAction is both an input and output argument to allow - * the client to suggest a different action for the fetcher to take post-callback. - * - * The callback function 'work' is not allowed to call into the Fetcher instance. This - * behavior is undefined and may result in a deadlock. - */ - Fetcher(executor::TaskExecutor* executor, - const HostAndPort& source, - const std::string& dbname, - const BSONObj& cmdObj, - const CallbackFn& work); - - virtual ~Fetcher(); - - /** - * Returns diagnostic information. - */ - std::string getDiagnosticString() const; - - /** - * Returns true if a remote command has been scheduled (but not completed) - * with the executor. - */ - bool isActive() const; - - /** - * Schedules 'cmdObj' to be run on the remote server. - */ - Status schedule(); - - /** - * Cancels remote command request. - * Returns immediately if fetcher is not active. - */ - void cancel(); - - /** - * Waits for remote command requests to complete. - * Returns immediately if fetcher is not active. - */ - void wait(); - - private: - - /** - * Schedules remote command to be run by the executor - */ - Status _schedule_inlock(const BSONObj& cmdObj, const char* batchFieldName); - - /** - * Callback for remote command. - */ - void _callback(const executor::TaskExecutor::RemoteCommandCallbackArgs& rcbd, - const char* batchFieldName); - - /** - * Sets fetcher state to inactive and notifies waiters. - */ - void _finishCallback(); - - // Not owned by us. - executor::TaskExecutor* _executor; - - HostAndPort _source; - std::string _dbname; - BSONObj _cmdObj; - CallbackFn _work; - - // Protects member data of this Fetcher. - mutable stdx::mutex _mutex; - - mutable stdx::condition_variable _condition; - - // _active is true when Fetcher is scheduled to be run by the executor. - bool _active; - // Callback handle to the scheduled remote command. - executor::TaskExecutor::CallbackHandle _remoteCommandCallbackHandle; - }; - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/fetcher_test.cpp b/src/mongo/db/repl/fetcher_test.cpp deleted file mode 100644 index 4672dd9759d..00000000000 --- a/src/mongo/db/repl/fetcher_test.cpp +++ /dev/null @@ -1,627 +0,0 @@ -/** - * Copyright 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include <memory> - -#include "mongo/db/jsobj.h" -#include "mongo/db/repl/fetcher.h" -#include "mongo/db/repl/replication_executor.h" -#include "mongo/db/repl/replication_executor_test_fixture.h" -#include "mongo/executor/network_interface_mock.h" - -#include "mongo/unittest/unittest.h" - -namespace { - - using namespace mongo; - using namespace mongo::repl; - using executor::NetworkInterfaceMock; - - const HostAndPort target("localhost", -1); - const BSONObj findCmdObj = BSON("find" << "coll"); - - class FetcherTest : public ReplicationExecutorTest { - public: - static Status getDetectableErrorStatus(); - FetcherTest(); - void setUp() override; - void tearDown() override; - void clear(); - void scheduleNetworkResponse(const BSONObj& obj); - void scheduleNetworkResponse(ErrorCodes::Error code, const std::string& reason); - void processNetworkResponse(const BSONObj& obj); - void processNetworkResponse(ErrorCodes::Error code, const std::string& reason); - void finishProcessingNetworkResponse(); - - protected: - Status status; - CursorId cursorId; - Fetcher::Documents documents; - Fetcher::NextAction nextAction; - Fetcher::NextAction newNextAction; - std::unique_ptr<Fetcher> fetcher; - // Called at end of _callback - Fetcher::CallbackFn callbackHook; - - private: - void _callback(const StatusWith<Fetcher::BatchData>& result, - Fetcher::NextAction* nextAction, - BSONObjBuilder* getMoreBob); - }; - - Status FetcherTest::getDetectableErrorStatus() { - return Status(ErrorCodes::InternalError, "Not mutated"); - } - - FetcherTest::FetcherTest() - : status(getDetectableErrorStatus()), - cursorId(-1), - nextAction(Fetcher::NextAction::kInvalid) { } - - void FetcherTest::setUp() { - ReplicationExecutorTest::setUp(); - clear(); - fetcher.reset(new Fetcher( - &getExecutor(), target, "db", findCmdObj, - stdx::bind(&FetcherTest::_callback, this, - stdx::placeholders::_1, stdx::placeholders::_2, stdx::placeholders::_3))); - launchExecutorThread(); - } - - void FetcherTest::tearDown() { - ReplicationExecutorTest::tearDown(); - // Executor may still invoke fetcher's callback before shutting down. - fetcher.reset(); - } - - void FetcherTest::clear() { - status = getDetectableErrorStatus(); - cursorId = -1; - documents.clear(); - nextAction = Fetcher::NextAction::kInvalid; - } - - void FetcherTest::scheduleNetworkResponse(const BSONObj& obj) { - NetworkInterfaceMock* net = getNet(); - ASSERT_TRUE(net->hasReadyRequests()); - Milliseconds millis(0); - RemoteCommandResponse response(obj, millis); - ReplicationExecutor::ResponseStatus responseStatus(response); - net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus); - } - - void FetcherTest::scheduleNetworkResponse(ErrorCodes::Error code, const std::string& reason) { - NetworkInterfaceMock* net = getNet(); - ASSERT_TRUE(net->hasReadyRequests()); - ReplicationExecutor::ResponseStatus responseStatus(code, reason); - net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus); - } - - void FetcherTest::processNetworkResponse(const BSONObj& obj) { - scheduleNetworkResponse(obj); - finishProcessingNetworkResponse(); - } - - void FetcherTest::processNetworkResponse(ErrorCodes::Error code, - const std::string& reason) { - scheduleNetworkResponse(code, reason); - finishProcessingNetworkResponse(); - } - - void FetcherTest::finishProcessingNetworkResponse() { - clear(); - ASSERT_TRUE(fetcher->isActive()); - getNet()->runReadyNetworkOperations(); - ASSERT_FALSE(getNet()->hasReadyRequests()); - ASSERT_FALSE(fetcher->isActive()); - } - - void FetcherTest::_callback(const StatusWith<Fetcher::BatchData>& result, - Fetcher::NextAction* nextActionFromFetcher, - BSONObjBuilder* getMoreBob) { - status = result.getStatus(); - if (result.isOK()) { - const Fetcher::BatchData& batchData = result.getValue(); - cursorId = batchData.cursorId; - documents = batchData.documents; - } - - if (callbackHook) { - callbackHook(result, nextActionFromFetcher, getMoreBob); - } - - if (nextActionFromFetcher) { - nextAction = *nextActionFromFetcher; - } - } - - void unusedFetcherCallback(const StatusWith<Fetcher::BatchData>& fetchResult, - Fetcher::NextAction* nextAction, - BSONObjBuilder* getMoreBob) { - FAIL("should not reach here"); - } - - TEST_F(FetcherTest, InvalidConstruction) { - ReplicationExecutor& executor = getExecutor(); - - // Null executor. - ASSERT_THROWS(Fetcher(nullptr, target, "db", findCmdObj, unusedFetcherCallback), - UserException); - - // Empty database name. - ASSERT_THROWS(Fetcher(&executor, target, "", findCmdObj, unusedFetcherCallback), - UserException); - - // Empty command object. - ASSERT_THROWS(Fetcher(&executor, target, "db", BSONObj(), unusedFetcherCallback), - UserException); - - // Callback function cannot be null. - ASSERT_THROWS(Fetcher(&executor, target, "db", findCmdObj, Fetcher::CallbackFn()), - UserException); - } - - // Command object can refer to any command that returns a cursor. This - // includes listIndexes and listCollections. - TEST_F(FetcherTest, NonFindCommand) { - ReplicationExecutor& executor = getExecutor(); - - Fetcher(&executor, target, "db", BSON("listIndexes" << "coll"), unusedFetcherCallback); - Fetcher(&executor, target, "db", BSON("listCollections" << 1), unusedFetcherCallback); - Fetcher(&executor, target, "db", BSON("a" << 1), unusedFetcherCallback); - } - - TEST_F(FetcherTest, GetDiagnosticString) { - Fetcher fetcher(&getExecutor(), target, "db", findCmdObj, unusedFetcherCallback); - ASSERT_FALSE(fetcher.getDiagnosticString().empty()); - } - - TEST_F(FetcherTest, IsActiveAfterSchedule) { - ASSERT_FALSE(fetcher->isActive()); - ASSERT_OK(fetcher->schedule()); - ASSERT_TRUE(fetcher->isActive()); - } - - TEST_F(FetcherTest, ScheduleWhenActive) { - ASSERT_OK(fetcher->schedule()); - ASSERT_TRUE(fetcher->isActive()); - ASSERT_NOT_OK(fetcher->schedule()); - } - - TEST_F(FetcherTest, CancelWithoutSchedule) { - ASSERT_FALSE(fetcher->isActive()); - fetcher->cancel(); - } - - TEST_F(FetcherTest, WaitWithoutSchedule) { - ASSERT_FALSE(fetcher->isActive()); - fetcher->wait(); - } - - TEST_F(FetcherTest, ShutdownBeforeSchedule) { - getExecutor().shutdown(); - ASSERT_NOT_OK(fetcher->schedule()); - ASSERT_FALSE(fetcher->isActive()); - } - - TEST_F(FetcherTest, ScheduleAndCancel) { - ASSERT_OK(fetcher->schedule()); - scheduleNetworkResponse(BSON("ok" << 1)); - - fetcher->cancel(); - finishProcessingNetworkResponse(); - - ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status.code()); - } - - TEST_F(FetcherTest, ScheduleButShutdown) { - ASSERT_OK(fetcher->schedule()); - scheduleNetworkResponse(BSON("ok" << 1)); - - getExecutor().shutdown(); - // Network interface should not deliver mock response to callback. - finishProcessingNetworkResponse(); - - ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status.code()); - } - - TEST_F(FetcherTest, FindCommandFailed1) { - ASSERT_OK(fetcher->schedule()); - processNetworkResponse(ErrorCodes::BadValue, "bad hint"); - ASSERT_EQUALS(ErrorCodes::BadValue, status.code()); - ASSERT_EQUALS("bad hint", status.reason()); - } - - TEST_F(FetcherTest, FindCommandFailed2) { - ASSERT_OK(fetcher->schedule()); - processNetworkResponse(BSON("ok" << 0 << - "errmsg" << "bad hint" << - "code" << int(ErrorCodes::BadValue))); - ASSERT_EQUALS(ErrorCodes::BadValue, status.code()); - ASSERT_EQUALS("bad hint", status.reason()); - } - - TEST_F(FetcherTest, CursorFieldMissing) { - ASSERT_OK(fetcher->schedule()); - processNetworkResponse(BSON("ok" << 1)); - ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code()); - ASSERT_STRING_CONTAINS(status.reason(), "must contain 'cursor' field"); - } - - TEST_F(FetcherTest, CursorNotAnObject) { - ASSERT_OK(fetcher->schedule()); - processNetworkResponse(BSON("cursor" << 123 << "ok" << 1)); - ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code()); - ASSERT_STRING_CONTAINS(status.reason(), "'cursor' field must be an object"); - } - - TEST_F(FetcherTest, CursorIdFieldMissing) { - ASSERT_OK(fetcher->schedule()); - processNetworkResponse(BSON("cursor" << BSON("ns" << "db.coll" << - "firstBatch" << BSONArray()) << - "ok" << 1)); - ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code()); - ASSERT_STRING_CONTAINS(status.reason(), "must contain 'cursor.id' field"); - } - - TEST_F(FetcherTest, CursorIdNotLongNumber) { - ASSERT_OK(fetcher->schedule()); - processNetworkResponse(BSON("cursor" << BSON("id" << 123.1 << - "ns" << "db.coll" << - "firstBatch" << BSONArray()) << - "ok" << 1)); - ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code()); - ASSERT_STRING_CONTAINS(status.reason(), - "'cursor.id' field must be"); - ASSERT_EQ((int)Fetcher::NextAction::kInvalid, (int)nextAction); - } - - TEST_F(FetcherTest, NamespaceFieldMissing) { - ASSERT_OK(fetcher->schedule()); - processNetworkResponse(BSON("cursor" << BSON("id" << 123LL << - "firstBatch" << BSONArray()) << - "ok" << 1)); - ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code()); - ASSERT_STRING_CONTAINS(status.reason(), "must contain 'cursor.ns' field"); - } - - TEST_F(FetcherTest, NamespaceNotAString) { - ASSERT_OK(fetcher->schedule()); - processNetworkResponse(BSON("cursor" << BSON("id" << 123LL << - "ns" << 123 << - "firstBatch" << BSONArray()) << - "ok" << 1)); - ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code()); - ASSERT_STRING_CONTAINS(status.reason(), "'cursor.ns' field must be a string"); - } - - TEST_F(FetcherTest, NamespaceEmpty) { - ASSERT_OK(fetcher->schedule()); - processNetworkResponse(BSON("cursor" << BSON("id" << 123LL << - "ns" << "" << - "firstBatch" << BSONArray()) << - "ok" << 1)); - ASSERT_EQUALS(ErrorCodes::BadValue, status.code()); - ASSERT_STRING_CONTAINS(status.reason(), "'cursor.ns' contains an invalid namespace"); - } - - TEST_F(FetcherTest, NamespaceMissingCollectionName) { - ASSERT_OK(fetcher->schedule()); - processNetworkResponse(BSON("cursor" << BSON("id" << 123LL << - "ns" << "db." << - "firstBatch" << BSONArray()) << - "ok" << 1)); - ASSERT_EQUALS(ErrorCodes::BadValue, status.code()); - ASSERT_STRING_CONTAINS(status.reason(), "'cursor.ns' contains an invalid namespace"); - } - - TEST_F(FetcherTest, FirstBatchFieldMissing) { - ASSERT_OK(fetcher->schedule()); - processNetworkResponse(BSON("cursor" << BSON("id" << 0LL << - "ns" << "db.coll") << - "ok" << 1)); - ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code()); - ASSERT_STRING_CONTAINS(status.reason(), "must contain 'cursor.firstBatch' field"); - } - - TEST_F(FetcherTest, FirstBatchNotAnArray) { - ASSERT_OK(fetcher->schedule()); - processNetworkResponse(BSON("cursor" << BSON("id" << 0LL << - "ns" << "db.coll" << - "firstBatch" << 123) << - "ok" << 1)); - ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code()); - ASSERT_STRING_CONTAINS(status.reason(), "'cursor.firstBatch' field must be an array"); - } - - TEST_F(FetcherTest, FirstBatchArrayContainsNonObject) { - ASSERT_OK(fetcher->schedule()); - processNetworkResponse(BSON("cursor" << BSON("id" << 0LL << - "ns" << "db.coll" << - "firstBatch" << BSON_ARRAY(8)) << - "ok" << 1)); - ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code()); - ASSERT_STRING_CONTAINS(status.reason(), "found non-object"); - ASSERT_STRING_CONTAINS(status.reason(), "in 'cursor.firstBatch' field"); - } - - TEST_F(FetcherTest, FirstBatchEmptyArray) { - ASSERT_OK(fetcher->schedule()); - processNetworkResponse(BSON("cursor" << BSON("id" << 0LL << - "ns" << "db.coll" << - "firstBatch" << BSONArray()) << - "ok" << 1)); - ASSERT_OK(status); - ASSERT_TRUE(documents.empty()); - } - - TEST_F(FetcherTest, FetchOneDocument) { - ASSERT_OK(fetcher->schedule()); - const BSONObj doc = BSON("_id" << 1); - processNetworkResponse(BSON("cursor" << BSON("id" << 0LL << - "ns" << "db.coll" << - "firstBatch" << BSON_ARRAY(doc)) << - "ok" << 1)); - ASSERT_OK(status); - ASSERT_EQUALS(0, cursorId); - ASSERT_EQUALS(1U, documents.size()); - ASSERT_EQUALS(doc, documents.front()); - } - - TEST_F(FetcherTest, SetNextActionToContinueWhenNextBatchIsNotAvailable) { - ASSERT_OK(fetcher->schedule()); - const BSONObj doc = BSON("_id" << 1); - callbackHook = [](const StatusWith<Fetcher::BatchData>& fetchResult, - Fetcher::NextAction* nextAction, - BSONObjBuilder* getMoreBob) { - ASSERT_OK(fetchResult.getStatus()); - Fetcher::BatchData batchData{fetchResult.getValue()}; - - ASSERT(nextAction); - *nextAction = Fetcher::NextAction::kGetMore; - ASSERT(getMoreBob); - getMoreBob->append("getMore", batchData.cursorId); - getMoreBob->append("collection", batchData.nss.coll()); - }; - processNetworkResponse(BSON("cursor" << BSON("id" << 0LL << - "ns" << "db.coll" << - "firstBatch" << BSON_ARRAY(doc)) << - "ok" << 1)); - ASSERT_OK(status); - ASSERT_EQUALS(0, cursorId); - ASSERT_EQUALS(1U, documents.size()); - ASSERT_EQUALS(doc, documents.front()); - } - - TEST_F(FetcherTest, FetchMultipleBatches) { - ASSERT_OK(fetcher->schedule()); - const BSONObj doc = BSON("_id" << 1); - scheduleNetworkResponse(BSON("cursor" << BSON("id" << 1LL << - "ns" << "db.coll" << - "firstBatch" << BSON_ARRAY(doc)) << - "ok" << 1)); - getNet()->runReadyNetworkOperations(); - ASSERT_OK(status); - ASSERT_EQUALS(1U, documents.size()); - ASSERT_EQUALS(doc, documents.front()); - ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction); - ASSERT_TRUE(fetcher->isActive()); - - ASSERT_TRUE(getNet()->hasReadyRequests()); - const BSONObj doc2 = BSON("_id" << 2); - scheduleNetworkResponse(BSON("cursor" << BSON("id" << 1LL << - "ns" << "db.coll" << - "nextBatch" << BSON_ARRAY(doc2)) << - "ok" << 1)); - getNet()->runReadyNetworkOperations(); - ASSERT_OK(status); - ASSERT_EQUALS(1U, documents.size()); - ASSERT_EQUALS(doc2, documents.front()); - ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction); - ASSERT_TRUE(fetcher->isActive()); - - ASSERT_TRUE(getNet()->hasReadyRequests()); - const BSONObj doc3 = BSON("_id" << 3); - scheduleNetworkResponse(BSON("cursor" << BSON("id" << 0LL << - "ns" << "db.coll" << - "nextBatch" << BSON_ARRAY(doc3)) << - "ok" << 1)); - getNet()->runReadyNetworkOperations(); - ASSERT_OK(status); - ASSERT_EQUALS(1U, documents.size()); - ASSERT_EQUALS(doc3, documents.front()); - ASSERT_TRUE(Fetcher::NextAction::kNoAction == nextAction); - ASSERT_FALSE(fetcher->isActive()); - - ASSERT_FALSE(getNet()->hasReadyRequests()); - } - - TEST_F(FetcherTest, ScheduleGetMoreAndCancel) { - ASSERT_OK(fetcher->schedule()); - const BSONObj doc = BSON("_id" << 1); - scheduleNetworkResponse(BSON("cursor" << BSON("id" << 1LL << - "ns" << "db.coll" << - "firstBatch" << BSON_ARRAY(doc)) << - "ok" << 1)); - getNet()->runReadyNetworkOperations(); - ASSERT_OK(status); - ASSERT_EQUALS(1U, documents.size()); - ASSERT_EQUALS(doc, documents.front()); - ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction); - ASSERT_TRUE(fetcher->isActive()); - - ASSERT_TRUE(getNet()->hasReadyRequests()); - const BSONObj doc2 = BSON("_id" << 2); - scheduleNetworkResponse(BSON("cursor" << BSON("id" << 1LL << - "ns" << "db.coll" << - "nextBatch" << BSON_ARRAY(doc2)) << - "ok" << 1)); - getNet()->runReadyNetworkOperations(); - ASSERT_OK(status); - ASSERT_EQUALS(1U, documents.size()); - ASSERT_EQUALS(doc2, documents.front()); - ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction); - ASSERT_TRUE(fetcher->isActive()); - - fetcher->cancel(); - finishProcessingNetworkResponse(); - ASSERT_NOT_OK(status); - } - - TEST_F(FetcherTest, ScheduleGetMoreButShutdown) { - ASSERT_OK(fetcher->schedule()); - const BSONObj doc = BSON("_id" << 1); - scheduleNetworkResponse(BSON("cursor" << BSON("id" << 1LL << - "ns" << "db.coll" << - "firstBatch" << BSON_ARRAY(doc)) << - "ok" << 1)); - getNet()->runReadyNetworkOperations(); - ASSERT_OK(status); - ASSERT_EQUALS(1U, documents.size()); - ASSERT_EQUALS(doc, documents.front()); - ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction); - ASSERT_TRUE(fetcher->isActive()); - - ASSERT_TRUE(getNet()->hasReadyRequests()); - const BSONObj doc2 = BSON("_id" << 2); - scheduleNetworkResponse(BSON("cursor" << BSON("id" << 1LL << - "ns" << "db.coll" << - "nextBatch" << BSON_ARRAY(doc2)) << - "ok" << 1)); - getNet()->runReadyNetworkOperations(); - ASSERT_OK(status); - ASSERT_EQUALS(1U, documents.size()); - ASSERT_EQUALS(doc2, documents.front()); - ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction); - ASSERT_TRUE(fetcher->isActive()); - - getExecutor().shutdown(); - finishProcessingNetworkResponse(); - ASSERT_NOT_OK(status); - } - - void setNextActionToNoAction(const StatusWith<Fetcher::BatchData>& fetchResult, - Fetcher::NextAction* nextAction, - BSONObjBuilder* getMoreBob) { - *nextAction = Fetcher::NextAction::kNoAction; - } - - TEST_F(FetcherTest, UpdateNextActionAfterSecondBatch) { - ASSERT_OK(fetcher->schedule()); - const BSONObj doc = BSON("_id" << 1); - scheduleNetworkResponse(BSON("cursor" << BSON("id" << 1LL << - "ns" << "db.coll" << - "firstBatch" << BSON_ARRAY(doc)) << - "ok" << 1)); - getNet()->runReadyNetworkOperations(); - ASSERT_OK(status); - ASSERT_EQUALS(1U, documents.size()); - ASSERT_EQUALS(doc, documents.front()); - ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction); - ASSERT_TRUE(fetcher->isActive()); - - ASSERT_TRUE(getNet()->hasReadyRequests()); - const BSONObj doc2 = BSON("_id" << 2); - scheduleNetworkResponse(BSON("cursor" << BSON("id" << 1LL << - "ns" << "db.coll" << - "nextBatch" << BSON_ARRAY(doc2)) << - "ok" << 1)); - - callbackHook = setNextActionToNoAction; - - getNet()->runReadyNetworkOperations(); - ASSERT_OK(status); - ASSERT_EQUALS(1U, documents.size()); - ASSERT_EQUALS(doc2, documents.front()); - ASSERT_TRUE(Fetcher::NextAction::kNoAction == nextAction); - ASSERT_FALSE(fetcher->isActive()); - } - - /** - * This will be invoked twice before the fetcher returns control to the replication executor. - */ - void shutdownDuringSecondBatch(const StatusWith<Fetcher::BatchData>& fetchResult, - Fetcher::NextAction* nextAction, - BSONObjBuilder* getMoreBob, - const BSONObj& doc2, - ReplicationExecutor* executor, bool* isShutdownCalled) { - if (*isShutdownCalled) { - return; - } - - // First time during second batch - ASSERT_OK(fetchResult.getStatus()); - Fetcher::BatchData batchData{fetchResult.getValue()}; - ASSERT_EQUALS(1U, batchData.documents.size()); - ASSERT_EQUALS(doc2, batchData.documents.front()); - ASSERT_TRUE(Fetcher::NextAction::kGetMore == *nextAction); - ASSERT(getMoreBob); - getMoreBob->append("getMore", batchData.cursorId); - getMoreBob->append("collection", batchData.nss.coll()); - - executor->shutdown(); - *isShutdownCalled = true; - } - - TEST_F(FetcherTest, ShutdownDuringSecondBatch) { - ASSERT_OK(fetcher->schedule()); - const BSONObj doc = BSON("_id" << 1); - scheduleNetworkResponse(BSON("cursor" << BSON("id" << 1LL << - "ns" << "db.coll" << - "firstBatch" << BSON_ARRAY(doc)) << - "ok" << 1)); - getNet()->runReadyNetworkOperations(); - ASSERT_OK(status); - ASSERT_EQUALS(1U, documents.size()); - ASSERT_EQUALS(doc, documents.front()); - ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction); - ASSERT_TRUE(fetcher->isActive()); - - ASSERT_TRUE(getNet()->hasReadyRequests()); - const BSONObj doc2 = BSON("_id" << 2); - scheduleNetworkResponse(BSON("cursor" << BSON("id" << 1LL << - "ns" << "db.coll" << - "nextBatch" << BSON_ARRAY(doc2)) << - "ok" << 1)); - - bool isShutdownCalled = false; - callbackHook = stdx::bind(shutdownDuringSecondBatch, - stdx::placeholders::_1, - stdx::placeholders::_2, - stdx::placeholders::_3, - doc2, - &getExecutor(), &isShutdownCalled); - - getNet()->runReadyNetworkOperations(); - ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.code()); - ASSERT_FALSE(fetcher->isActive()); - } - -} // namespace |