summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl
diff options
context:
space:
mode:
authorDaniel Alabi <alabidan@gmail.com>2015-06-09 17:43:09 -0400
committerDaniel Alabi <alabidan@gmail.com>2015-06-11 16:51:13 -0400
commitdfea887c3b0eab7fec881ef9bb6d300566ab669f (patch)
tree82f8b71e84af5eda8273eaf448ea3f9324442df7 /src/mongo/db/repl
parent0d403de0d525237ea3fa2aee63117080ca357591 (diff)
downloadmongo-dfea887c3b0eab7fec881ef9bb6d300566ab669f.tar.gz
SERVER-18901 Move Fetcher and QueryFetcher to mongo/client/
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--src/mongo/db/repl/SConscript26
-rw-r--r--src/mongo/db/repl/collection_cloner.h2
-rw-r--r--src/mongo/db/repl/data_replicator.cpp80
-rw-r--r--src/mongo/db/repl/data_replicator.h5
-rw-r--r--src/mongo/db/repl/data_replicator_test.cpp2
-rw-r--r--src/mongo/db/repl/database_cloner.h2
-rw-r--r--src/mongo/db/repl/fetcher.cpp301
-rw-r--r--src/mongo/db/repl/fetcher.h189
-rw-r--r--src/mongo/db/repl/fetcher_test.cpp627
9 files changed, 10 insertions, 1224 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index d899720f731..98fa6f5caad 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -423,19 +423,6 @@ env.Library(
],
)
-env.Library(
- target='fetcher',
- source=[
- 'fetcher.cpp',
- ],
- LIBDEPS=[
- 'replication_executor',
- '$BUILD_DIR/mongo/logger/logger',
- '$BUILD_DIR/mongo/db/namespace_string',
- '$BUILD_DIR/mongo/rpc/command_status',
- ],
-)
-
env.CppUnitTest(
target='reporter_test',
source='reporter_test.cpp',
@@ -445,15 +432,6 @@ env.CppUnitTest(
],
)
-env.CppUnitTest(
- target='fetcher_test',
- source='fetcher_test.cpp',
- LIBDEPS=[
- 'fetcher',
- 'replication_executor_test_fixture',
- ],
-)
-
env.Library(
target='base_cloner_test_fixture',
source=[
@@ -470,9 +448,9 @@ env.Library(
'collection_cloner.cpp',
],
LIBDEPS=[
- 'fetcher',
'replication_executor',
'$BUILD_DIR/mongo/db/catalog/collection_options',
+ '$BUILD_DIR/mongo/client/fetcher',
'$BUILD_DIR/mongo/logger/logger',
],
)
@@ -611,8 +589,8 @@ env.Library(
'applier',
'collection_cloner',
'database_cloner',
- 'fetcher',
'repl_coordinator_interface',
+ '$BUILD_DIR/mongo/client/fetcher',
],
)
diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h
index 37dc718195b..19ff6850163 100644
--- a/src/mongo/db/repl/collection_cloner.h
+++ b/src/mongo/db/repl/collection_cloner.h
@@ -35,10 +35,10 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/base/status.h"
#include "mongo/bson/bsonobj.h"
+#include "mongo/client/fetcher.h"
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/base_cloner.h"
-#include "mongo/db/repl/fetcher.h"
#include "mongo/db/repl/replication_executor.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/condition_variable.h"
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index 8e71942ec54..d0caa1eebd6 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -37,7 +37,9 @@
#include <thread>
#include "mongo/base/status.h"
+#include "mongo/client/query_fetcher.h"
#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/jsobj.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/collection_cloner.h"
#include "mongo/db/repl/database_cloner.h"
@@ -92,43 +94,6 @@ namespace {
} // namespace
/**
- * Follows the fetcher pattern for a find+getmore
- */
- class QueryFetcher {
- MONGO_DISALLOW_COPYING(QueryFetcher);
- public:
- using CallbackFn = stdx::function<void (const BatchDataStatus&, NextAction*)>;
-
- QueryFetcher(ReplicationExecutor* exec,
- const HostAndPort& source,
- const NamespaceString& nss,
- const BSONObj& cmdBSON,
- const QueryFetcher::CallbackFn& onBatchAvailable);
- virtual ~QueryFetcher() = default;
-
- bool isActive() const { return _fetcher.isActive(); }
- Status schedule() { return _fetcher.schedule(); }
- void cancel() { return _fetcher.cancel(); }
- void wait() { if (_fetcher.isActive()) _fetcher.wait(); }
- std::string toString() const;
-
- protected:
- void _onFetchCallback(const BatchDataStatus& fetchResult,
- Fetcher::NextAction* nextAction,
- BSONObjBuilder* getMoreBob);
-
- virtual void _delegateCallback(const BatchDataStatus& fetchResult,
- NextAction* nextAction) {
- _work(fetchResult, nextAction);
- };
-
- ReplicationExecutor* _exec;
- Fetcher _fetcher;
- int _responses;
- const QueryFetcher::CallbackFn _work;
- };
-
- /**
* Follows the fetcher pattern for a find+getmore on an oplog
* Returns additional errors if the start oplog entry cannot be found.
*/
@@ -156,47 +121,6 @@ namespace {
const Timestamp _startTS;
};
- // QueryFetcher
- QueryFetcher::QueryFetcher(ReplicationExecutor* exec,
- const HostAndPort& src,
- const NamespaceString& nss,
- const BSONObj& cmdBSON,
- const CallbackFn& work)
- : _exec(exec),
- _fetcher(exec,
- src,
- nss.db().toString(),
- cmdBSON,
- stdx::bind(&QueryFetcher::_onFetchCallback,
- this,
- stdx::placeholders::_1,
- stdx::placeholders::_2,
- stdx::placeholders::_3)),
- _responses(0),
- _work(work) {
- }
-
- void QueryFetcher::_onFetchCallback(const BatchDataStatus& fetchResult,
- Fetcher::NextAction* nextAction,
- BSONObjBuilder* getMoreBob) {
- ++_responses;
-
- _delegateCallback(fetchResult, nextAction);
- // The fetcher will continue to call with kGetMore until an error or the last batch.
- if (fetchResult.isOK() && *nextAction == NextAction::kGetMore) {
- const auto batchData(fetchResult.getValue());
- invariant(getMoreBob);
- getMoreBob->append("getMore", batchData.cursorId);
- getMoreBob->append("collection", batchData.nss.coll());
- }
- }
-
- std::string QueryFetcher::toString() const {
- return str::stream() << "QueryFetcher -"
- << " responses: " << _responses
- << " fetcher: " << _fetcher.getDiagnosticString();
- }
-
// OplogFetcher
OplogFetcher::OplogFetcher(ReplicationExecutor* exec,
const Timestamp& startTS,
diff --git a/src/mongo/db/repl/data_replicator.h b/src/mongo/db/repl/data_replicator.h
index 59e242dfc0d..a01e0c61b19 100644
--- a/src/mongo/db/repl/data_replicator.h
+++ b/src/mongo/db/repl/data_replicator.h
@@ -41,7 +41,6 @@
#include "mongo/db/repl/applier.h"
#include "mongo/db/repl/collection_cloner.h"
#include "mongo/db/repl/database_cloner.h"
-#include "mongo/db/repl/fetcher.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_executor.h"
#include "mongo/db/repl/reporter.h"
@@ -49,6 +48,9 @@
#include "mongo/util/queue.h"
namespace mongo {
+
+class QueryFetcher;
+
namespace repl {
using Operations = Applier::Operations;
@@ -65,7 +67,6 @@ using Response = RemoteCommandResponse;
using TimestampStatus = StatusWith<Timestamp>;
using UniqueLock = stdx::unique_lock<stdx::mutex>;
-class QueryFetcher;
class OplogFetcher;
struct InitialSyncState;
diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp
index 854829ef113..f10fb2514fa 100644
--- a/src/mongo/db/repl/data_replicator_test.cpp
+++ b/src/mongo/db/repl/data_replicator_test.cpp
@@ -32,10 +32,10 @@
#include <memory>
+#include "mongo/client/fetcher.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/base_cloner_test_fixture.h"
#include "mongo/db/repl/data_replicator.h"
-#include "mongo/db/repl/fetcher.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/replica_set_config.h"
diff --git a/src/mongo/db/repl/database_cloner.h b/src/mongo/db/repl/database_cloner.h
index 447d7ce07fd..071b33d0e96 100644
--- a/src/mongo/db/repl/database_cloner.h
+++ b/src/mongo/db/repl/database_cloner.h
@@ -35,10 +35,10 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/base/status.h"
#include "mongo/bson/bsonobj.h"
+#include "mongo/client/fetcher.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/collection_cloner.h"
#include "mongo/db/repl/base_cloner.h"
-#include "mongo/db/repl/fetcher.h"
#include "mongo/db/repl/replication_executor.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
diff --git a/src/mongo/db/repl/fetcher.cpp b/src/mongo/db/repl/fetcher.cpp
deleted file mode 100644
index cfe8402057d..00000000000
--- a/src/mongo/db/repl/fetcher.cpp
+++ /dev/null
@@ -1,301 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/repl/fetcher.h"
-
-#include "mongo/db/jsobj.h"
-#include "mongo/db/namespace_string.h"
-#include "mongo/rpc/get_status_from_command_result.h"
-#include "mongo/util/assert_util.h"
-#include "mongo/util/mongoutils/str.h"
-
-namespace mongo {
-namespace repl {
-
-namespace {
-
- const char* kCursorFieldName = "cursor";
- const char* kCursorIdFieldName = "id";
- const char* kNamespaceFieldName = "ns";
-
- const char* kFirstBatchFieldName = "firstBatch";
- const char* kNextBatchFieldName = "nextBatch";
-
- /**
- * Parses cursor response in command result for cursor ID, namespace and documents.
- * 'batchFieldName' will be 'firstBatch' for the initial remote command invocation and
- * 'nextBatch' for getMore.
- */
- Status parseCursorResponse(const BSONObj& obj,
- const std::string& batchFieldName,
- Fetcher::BatchData* batchData) {
- invariant(batchFieldName == kFirstBatchFieldName || batchFieldName == kNextBatchFieldName);
- invariant(batchData);
-
- BSONElement cursorElement = obj.getField(kCursorFieldName);
- if (cursorElement.eoo()) {
- return Status(ErrorCodes::FailedToParse, str::stream() <<
- "cursor response must contain '" << kCursorFieldName <<
- "' field: " << obj);
- }
- if (!cursorElement.isABSONObj()) {
- return Status(ErrorCodes::FailedToParse, str::stream() <<
- "'" << kCursorFieldName << "' field must be an object: " << obj);
- }
- BSONObj cursorObj = cursorElement.Obj();
-
- BSONElement cursorIdElement = cursorObj.getField(kCursorIdFieldName);
- if (cursorIdElement.eoo()) {
- return Status(ErrorCodes::FailedToParse, str::stream() <<
- "cursor response must contain '" << kCursorFieldName << "." <<
- kCursorIdFieldName << "' field: " << obj);
- }
- if (!(cursorIdElement.type() == mongo::NumberLong ||
- cursorIdElement.type() == mongo::NumberInt)) {
- return Status(ErrorCodes::FailedToParse, str::stream() <<
- "'" << kCursorFieldName << "." << kCursorIdFieldName <<
- "' field must be a integral number of type 'int' or 'long' but was a '"
- << typeName(cursorIdElement.type()) << "': " << obj);
- }
- batchData->cursorId = cursorIdElement.numberLong();
-
- BSONElement namespaceElement = cursorObj.getField(kNamespaceFieldName);
- if (namespaceElement.eoo()) {
- return Status(ErrorCodes::FailedToParse, str::stream() <<
- "cursor response must contain " <<
- "'" << kCursorFieldName << "." << kNamespaceFieldName << "' field: " <<
- obj);
- }
- if (namespaceElement.type() != mongo::String) {
- return Status(ErrorCodes::FailedToParse, str::stream() <<
- "'" << kCursorFieldName << "." << kNamespaceFieldName <<
- "' field must be a string: " << obj);
- }
- NamespaceString tempNss(namespaceElement.valuestrsafe());
- if (!tempNss.isValid()) {
- return Status(ErrorCodes::BadValue, str::stream() <<
- "'" << kCursorFieldName << "." << kNamespaceFieldName <<
- "' contains an invalid namespace: " << obj);
- }
- batchData->nss = tempNss;
-
- BSONElement batchElement = cursorObj.getField(batchFieldName);
- if (batchElement.eoo()) {
- return Status(ErrorCodes::FailedToParse, str::stream() <<
- "cursor response must contain '" << kCursorFieldName << "." <<
- batchFieldName << "' field: " << obj);
- }
- if (!batchElement.isABSONObj()) {
- return Status(ErrorCodes::FailedToParse, str::stream() <<
- "'" << kCursorFieldName << "." << batchFieldName <<
- "' field must be an array: " << obj);
- }
- BSONObj batchObj = batchElement.Obj();
- for (auto itemElement : batchObj) {
- if (!itemElement.isABSONObj()) {
- return Status(ErrorCodes::FailedToParse, str::stream() <<
- "found non-object " << itemElement << " in " <<
- "'" << kCursorFieldName << "." << batchFieldName << "' field: " <<
- obj);
- }
- batchData->documents.push_back(itemElement.Obj().getOwned());
- }
-
- return Status::OK();
- }
-
- Status parseReplResponse(const BSONObj& obj) {
- return Status::OK();
- }
-
-} // namespace
-
- Fetcher::BatchData::BatchData(CursorId theCursorId,
- const NamespaceString& theNss,
- Documents theDocuments)
- : cursorId(theCursorId),
- nss(theNss),
- documents(theDocuments) { }
-
- Fetcher::Fetcher(executor::TaskExecutor* executor,
- const HostAndPort& source,
- const std::string& dbname,
- const BSONObj& findCmdObj,
- const CallbackFn& work)
- : _executor(executor),
- _source(source),
- _dbname(dbname),
- _cmdObj(findCmdObj.getOwned()),
- _work(work),
- _active(false),
- _remoteCommandCallbackHandle() {
-
- uassert(ErrorCodes::BadValue, "null replication executor", executor);
- uassert(ErrorCodes::BadValue, "database name cannot be empty", !dbname.empty());
- uassert(ErrorCodes::BadValue, "command object cannot be empty", !findCmdObj.isEmpty());
- uassert(ErrorCodes::BadValue, "callback function cannot be null", work);
- }
-
- Fetcher::~Fetcher() {
- DESTRUCTOR_GUARD(
- cancel();
- wait();
- );
- }
-
- std::string Fetcher::getDiagnosticString() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- str::stream output;
- output << "Fetcher";
- output << " executor: " << _executor->getDiagnosticString();
- output << " source: " << _source.toString();
- output << " database: " << _dbname;
- output << " query: " << _cmdObj;
- output << " active: " << _active;
- return output;
- }
-
- bool Fetcher::isActive() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _active;
- }
-
- Status Fetcher::schedule() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_active) {
- return Status(ErrorCodes::IllegalOperation, "fetcher already scheduled");
- }
- return _schedule_inlock(_cmdObj, kFirstBatchFieldName);
- }
-
- void Fetcher::cancel() {
- executor::TaskExecutor::CallbackHandle remoteCommandCallbackHandle;
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- if (!_active) {
- return;
- }
-
- remoteCommandCallbackHandle = _remoteCommandCallbackHandle;
- }
-
- invariant(remoteCommandCallbackHandle.isValid());
- _executor->cancel(remoteCommandCallbackHandle);
- }
-
- void Fetcher::wait() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- _condition.wait(lk, [this]() { return !_active; });
- }
-
- Status Fetcher::_schedule_inlock(const BSONObj& cmdObj, const char* batchFieldName) {
- StatusWith<executor::TaskExecutor::CallbackHandle> scheduleResult =
- _executor->scheduleRemoteCommand(
- RemoteCommandRequest(_source, _dbname, cmdObj),
- stdx::bind(&Fetcher::_callback, this, stdx::placeholders::_1, batchFieldName));
-
- if (!scheduleResult.isOK()) {
- return scheduleResult.getStatus();
- }
-
- _active = true;
- _remoteCommandCallbackHandle = scheduleResult.getValue();
- return Status::OK();
- }
-
- void Fetcher::_callback(const executor::TaskExecutor::RemoteCommandCallbackArgs& rcbd,
- const char* batchFieldName) {
-
- if (!rcbd.response.isOK()) {
- _work(StatusWith<Fetcher::BatchData>(rcbd.response.getStatus()), nullptr, nullptr);
- _finishCallback();
- return;
- }
-
- const BSONObj& queryResponseObj = rcbd.response.getValue().data;
- Status status = getStatusFromCommandResult(queryResponseObj);
- if (!status.isOK()) {
- _work(StatusWith<Fetcher::BatchData>(status), nullptr, nullptr);
- _finishCallback();
- return;
- }
-
- status = parseReplResponse(queryResponseObj);
- if (!status.isOK()) {
- _work(StatusWith<Fetcher::BatchData>(status), nullptr, nullptr);
- _finishCallback();
- return;
- }
-
- BatchData batchData;
- status = parseCursorResponse(queryResponseObj, batchFieldName, &batchData);
- if (!status.isOK()) {
- _work(StatusWith<Fetcher::BatchData>(status), nullptr, nullptr);
- _finishCallback();
- return;
- }
-
- NextAction nextAction = NextAction::kNoAction;
-
- if (batchData.cursorId) {
- nextAction = NextAction::kGetMore;
- }
-
- BSONObjBuilder bob;
- _work(StatusWith<BatchData>(batchData), &nextAction, &bob);
-
- // Callback function _work may modify nextAction to request the fetcher
- // not to schedule a getMore command.
- if (!batchData.cursorId || nextAction != NextAction::kGetMore) {
- _finishCallback();
- return;
- }
-
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- status = _schedule_inlock(bob.obj(), kNextBatchFieldName);
- }
- if (!status.isOK()) {
- nextAction = NextAction::kNoAction;
- _work(StatusWith<Fetcher::BatchData>(status), nullptr, nullptr);
- _finishCallback();
- return;
- }
- }
-
- void Fetcher::_finishCallback() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _active = false;
- _condition.notify_all();
- }
-
-} // namespace repl
-} // namespace mongo
diff --git a/src/mongo/db/repl/fetcher.h b/src/mongo/db/repl/fetcher.h
deleted file mode 100644
index 15ea466981a..00000000000
--- a/src/mongo/db/repl/fetcher.h
+++ /dev/null
@@ -1,189 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <string>
-#include <vector>
-
-#include "mongo/base/disallow_copying.h"
-#include "mongo/base/status.h"
-#include "mongo/base/status_with.h"
-#include "mongo/bson/bsonobj.h"
-#include "mongo/db/clientcursor.h"
-#include "mongo/db/namespace_string.h"
-#include "mongo/executor/task_executor.h"
-#include "mongo/stdx/functional.h"
-#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/mutex.h"
-#include "mongo/util/net/hostandport.h"
-
-namespace mongo {
-namespace repl {
-
- class Fetcher {
- MONGO_DISALLOW_COPYING(Fetcher);
- public:
-
- /**
- * Container for BSON documents extracted from cursor results.
- */
- typedef std::vector<BSONObj> Documents;
-
- /**
- * Documents in current batch with cursor ID and associated namespace name.
- * If cursor ID is zero, there are no additional batches.
- */
- struct BatchData {
- BatchData() = default;
- BatchData(CursorId theCursorId, const NamespaceString& theNss, Documents theDocuments);
- CursorId cursorId = 0;
- NamespaceString nss;
- Documents documents;
- };
-
- /**
- * Represents next steps of fetcher.
- */
- enum class NextAction : int {
- kInvalid=0,
- kNoAction=1,
- kGetMore=2
- };
-
- /**
- * Type of a fetcher callback function.
- */
- typedef stdx::function<void (const StatusWith<BatchData>&,
- NextAction*,
- BSONObjBuilder*)> CallbackFn;
-
- /**
- * Creates Fetcher task but does not schedule it to be run by the executor.
- *
- * First remote command to be run by the executor will be 'cmdObj'. The results
- * of 'cmdObj' must contain a cursor response object.
- * See Commands::appendCursorResponseObject.
- *
- * Callback function 'work' will be called 1 or more times after a successful
- * schedule() call depending on the results of the remote command.
- *
- * Depending on the cursor ID in the initial cursor response object, the fetcher may run
- * subsequent getMore commands on the remote server in order to obtain a complete
- * set of results.
- *
- * Failed remote commands will also cause 'work' to be invoked with the
- * error details provided by the remote server. On failure, the fetcher will stop
- * sending getMore requests to the remote server.
- *
- * If the fetcher is canceled (either by calling cancel() or shutting down the executor),
- * 'work' will not be invoked.
- *
- * Fetcher uses the NextAction argument to inform client via callback if a getMore command
- * will be scheduled to be run by the executor to retrieve additional results.
- * Also, note that the NextAction is both an input and output argument to allow
- * the client to suggest a different action for the fetcher to take post-callback.
- *
- * The callback function 'work' is not allowed to call into the Fetcher instance. This
- * behavior is undefined and may result in a deadlock.
- */
- Fetcher(executor::TaskExecutor* executor,
- const HostAndPort& source,
- const std::string& dbname,
- const BSONObj& cmdObj,
- const CallbackFn& work);
-
- virtual ~Fetcher();
-
- /**
- * Returns diagnostic information.
- */
- std::string getDiagnosticString() const;
-
- /**
- * Returns true if a remote command has been scheduled (but not completed)
- * with the executor.
- */
- bool isActive() const;
-
- /**
- * Schedules 'cmdObj' to be run on the remote server.
- */
- Status schedule();
-
- /**
- * Cancels remote command request.
- * Returns immediately if fetcher is not active.
- */
- void cancel();
-
- /**
- * Waits for remote command requests to complete.
- * Returns immediately if fetcher is not active.
- */
- void wait();
-
- private:
-
- /**
- * Schedules remote command to be run by the executor
- */
- Status _schedule_inlock(const BSONObj& cmdObj, const char* batchFieldName);
-
- /**
- * Callback for remote command.
- */
- void _callback(const executor::TaskExecutor::RemoteCommandCallbackArgs& rcbd,
- const char* batchFieldName);
-
- /**
- * Sets fetcher state to inactive and notifies waiters.
- */
- void _finishCallback();
-
- // Not owned by us.
- executor::TaskExecutor* _executor;
-
- HostAndPort _source;
- std::string _dbname;
- BSONObj _cmdObj;
- CallbackFn _work;
-
- // Protects member data of this Fetcher.
- mutable stdx::mutex _mutex;
-
- mutable stdx::condition_variable _condition;
-
- // _active is true when Fetcher is scheduled to be run by the executor.
- bool _active;
- // Callback handle to the scheduled remote command.
- executor::TaskExecutor::CallbackHandle _remoteCommandCallbackHandle;
- };
-
-} // namespace repl
-} // namespace mongo
diff --git a/src/mongo/db/repl/fetcher_test.cpp b/src/mongo/db/repl/fetcher_test.cpp
deleted file mode 100644
index 4672dd9759d..00000000000
--- a/src/mongo/db/repl/fetcher_test.cpp
+++ /dev/null
@@ -1,627 +0,0 @@
-/**
- * Copyright 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include <memory>
-
-#include "mongo/db/jsobj.h"
-#include "mongo/db/repl/fetcher.h"
-#include "mongo/db/repl/replication_executor.h"
-#include "mongo/db/repl/replication_executor_test_fixture.h"
-#include "mongo/executor/network_interface_mock.h"
-
-#include "mongo/unittest/unittest.h"
-
-namespace {
-
- using namespace mongo;
- using namespace mongo::repl;
- using executor::NetworkInterfaceMock;
-
- const HostAndPort target("localhost", -1);
- const BSONObj findCmdObj = BSON("find" << "coll");
-
- class FetcherTest : public ReplicationExecutorTest {
- public:
- static Status getDetectableErrorStatus();
- FetcherTest();
- void setUp() override;
- void tearDown() override;
- void clear();
- void scheduleNetworkResponse(const BSONObj& obj);
- void scheduleNetworkResponse(ErrorCodes::Error code, const std::string& reason);
- void processNetworkResponse(const BSONObj& obj);
- void processNetworkResponse(ErrorCodes::Error code, const std::string& reason);
- void finishProcessingNetworkResponse();
-
- protected:
- Status status;
- CursorId cursorId;
- Fetcher::Documents documents;
- Fetcher::NextAction nextAction;
- Fetcher::NextAction newNextAction;
- std::unique_ptr<Fetcher> fetcher;
- // Called at end of _callback
- Fetcher::CallbackFn callbackHook;
-
- private:
- void _callback(const StatusWith<Fetcher::BatchData>& result,
- Fetcher::NextAction* nextAction,
- BSONObjBuilder* getMoreBob);
- };
-
- Status FetcherTest::getDetectableErrorStatus() {
- return Status(ErrorCodes::InternalError, "Not mutated");
- }
-
- FetcherTest::FetcherTest()
- : status(getDetectableErrorStatus()),
- cursorId(-1),
- nextAction(Fetcher::NextAction::kInvalid) { }
-
- void FetcherTest::setUp() {
- ReplicationExecutorTest::setUp();
- clear();
- fetcher.reset(new Fetcher(
- &getExecutor(), target, "db", findCmdObj,
- stdx::bind(&FetcherTest::_callback, this,
- stdx::placeholders::_1, stdx::placeholders::_2, stdx::placeholders::_3)));
- launchExecutorThread();
- }
-
- void FetcherTest::tearDown() {
- ReplicationExecutorTest::tearDown();
- // Executor may still invoke fetcher's callback before shutting down.
- fetcher.reset();
- }
-
- void FetcherTest::clear() {
- status = getDetectableErrorStatus();
- cursorId = -1;
- documents.clear();
- nextAction = Fetcher::NextAction::kInvalid;
- }
-
- void FetcherTest::scheduleNetworkResponse(const BSONObj& obj) {
- NetworkInterfaceMock* net = getNet();
- ASSERT_TRUE(net->hasReadyRequests());
- Milliseconds millis(0);
- RemoteCommandResponse response(obj, millis);
- ReplicationExecutor::ResponseStatus responseStatus(response);
- net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus);
- }
-
- void FetcherTest::scheduleNetworkResponse(ErrorCodes::Error code, const std::string& reason) {
- NetworkInterfaceMock* net = getNet();
- ASSERT_TRUE(net->hasReadyRequests());
- ReplicationExecutor::ResponseStatus responseStatus(code, reason);
- net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus);
- }
-
- void FetcherTest::processNetworkResponse(const BSONObj& obj) {
- scheduleNetworkResponse(obj);
- finishProcessingNetworkResponse();
- }
-
- void FetcherTest::processNetworkResponse(ErrorCodes::Error code,
- const std::string& reason) {
- scheduleNetworkResponse(code, reason);
- finishProcessingNetworkResponse();
- }
-
- void FetcherTest::finishProcessingNetworkResponse() {
- clear();
- ASSERT_TRUE(fetcher->isActive());
- getNet()->runReadyNetworkOperations();
- ASSERT_FALSE(getNet()->hasReadyRequests());
- ASSERT_FALSE(fetcher->isActive());
- }
-
- void FetcherTest::_callback(const StatusWith<Fetcher::BatchData>& result,
- Fetcher::NextAction* nextActionFromFetcher,
- BSONObjBuilder* getMoreBob) {
- status = result.getStatus();
- if (result.isOK()) {
- const Fetcher::BatchData& batchData = result.getValue();
- cursorId = batchData.cursorId;
- documents = batchData.documents;
- }
-
- if (callbackHook) {
- callbackHook(result, nextActionFromFetcher, getMoreBob);
- }
-
- if (nextActionFromFetcher) {
- nextAction = *nextActionFromFetcher;
- }
- }
-
- void unusedFetcherCallback(const StatusWith<Fetcher::BatchData>& fetchResult,
- Fetcher::NextAction* nextAction,
- BSONObjBuilder* getMoreBob) {
- FAIL("should not reach here");
- }
-
- TEST_F(FetcherTest, InvalidConstruction) {
- ReplicationExecutor& executor = getExecutor();
-
- // Null executor.
- ASSERT_THROWS(Fetcher(nullptr, target, "db", findCmdObj, unusedFetcherCallback),
- UserException);
-
- // Empty database name.
- ASSERT_THROWS(Fetcher(&executor, target, "", findCmdObj, unusedFetcherCallback),
- UserException);
-
- // Empty command object.
- ASSERT_THROWS(Fetcher(&executor, target, "db", BSONObj(), unusedFetcherCallback),
- UserException);
-
- // Callback function cannot be null.
- ASSERT_THROWS(Fetcher(&executor, target, "db", findCmdObj, Fetcher::CallbackFn()),
- UserException);
- }
-
- // Command object can refer to any command that returns a cursor. This
- // includes listIndexes and listCollections.
- TEST_F(FetcherTest, NonFindCommand) {
- ReplicationExecutor& executor = getExecutor();
-
- Fetcher(&executor, target, "db", BSON("listIndexes" << "coll"), unusedFetcherCallback);
- Fetcher(&executor, target, "db", BSON("listCollections" << 1), unusedFetcherCallback);
- Fetcher(&executor, target, "db", BSON("a" << 1), unusedFetcherCallback);
- }
-
- TEST_F(FetcherTest, GetDiagnosticString) {
- Fetcher fetcher(&getExecutor(), target, "db", findCmdObj, unusedFetcherCallback);
- ASSERT_FALSE(fetcher.getDiagnosticString().empty());
- }
-
- TEST_F(FetcherTest, IsActiveAfterSchedule) {
- ASSERT_FALSE(fetcher->isActive());
- ASSERT_OK(fetcher->schedule());
- ASSERT_TRUE(fetcher->isActive());
- }
-
- TEST_F(FetcherTest, ScheduleWhenActive) {
- ASSERT_OK(fetcher->schedule());
- ASSERT_TRUE(fetcher->isActive());
- ASSERT_NOT_OK(fetcher->schedule());
- }
-
- TEST_F(FetcherTest, CancelWithoutSchedule) {
- ASSERT_FALSE(fetcher->isActive());
- fetcher->cancel();
- }
-
- TEST_F(FetcherTest, WaitWithoutSchedule) {
- ASSERT_FALSE(fetcher->isActive());
- fetcher->wait();
- }
-
- TEST_F(FetcherTest, ShutdownBeforeSchedule) {
- getExecutor().shutdown();
- ASSERT_NOT_OK(fetcher->schedule());
- ASSERT_FALSE(fetcher->isActive());
- }
-
- TEST_F(FetcherTest, ScheduleAndCancel) {
- ASSERT_OK(fetcher->schedule());
- scheduleNetworkResponse(BSON("ok" << 1));
-
- fetcher->cancel();
- finishProcessingNetworkResponse();
-
- ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status.code());
- }
-
- TEST_F(FetcherTest, ScheduleButShutdown) {
- ASSERT_OK(fetcher->schedule());
- scheduleNetworkResponse(BSON("ok" << 1));
-
- getExecutor().shutdown();
- // Network interface should not deliver mock response to callback.
- finishProcessingNetworkResponse();
-
- ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status.code());
- }
-
- TEST_F(FetcherTest, FindCommandFailed1) {
- ASSERT_OK(fetcher->schedule());
- processNetworkResponse(ErrorCodes::BadValue, "bad hint");
- ASSERT_EQUALS(ErrorCodes::BadValue, status.code());
- ASSERT_EQUALS("bad hint", status.reason());
- }
-
- TEST_F(FetcherTest, FindCommandFailed2) {
- ASSERT_OK(fetcher->schedule());
- processNetworkResponse(BSON("ok" << 0 <<
- "errmsg" << "bad hint" <<
- "code" << int(ErrorCodes::BadValue)));
- ASSERT_EQUALS(ErrorCodes::BadValue, status.code());
- ASSERT_EQUALS("bad hint", status.reason());
- }
-
- TEST_F(FetcherTest, CursorFieldMissing) {
- ASSERT_OK(fetcher->schedule());
- processNetworkResponse(BSON("ok" << 1));
- ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code());
- ASSERT_STRING_CONTAINS(status.reason(), "must contain 'cursor' field");
- }
-
- TEST_F(FetcherTest, CursorNotAnObject) {
- ASSERT_OK(fetcher->schedule());
- processNetworkResponse(BSON("cursor" << 123 << "ok" << 1));
- ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code());
- ASSERT_STRING_CONTAINS(status.reason(), "'cursor' field must be an object");
- }
-
- TEST_F(FetcherTest, CursorIdFieldMissing) {
- ASSERT_OK(fetcher->schedule());
- processNetworkResponse(BSON("cursor" << BSON("ns" << "db.coll" <<
- "firstBatch" << BSONArray()) <<
- "ok" << 1));
- ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code());
- ASSERT_STRING_CONTAINS(status.reason(), "must contain 'cursor.id' field");
- }
-
- TEST_F(FetcherTest, CursorIdNotLongNumber) {
- ASSERT_OK(fetcher->schedule());
- processNetworkResponse(BSON("cursor" << BSON("id" << 123.1 <<
- "ns" << "db.coll" <<
- "firstBatch" << BSONArray()) <<
- "ok" << 1));
- ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code());
- ASSERT_STRING_CONTAINS(status.reason(),
- "'cursor.id' field must be");
- ASSERT_EQ((int)Fetcher::NextAction::kInvalid, (int)nextAction);
- }
-
- TEST_F(FetcherTest, NamespaceFieldMissing) {
- ASSERT_OK(fetcher->schedule());
- processNetworkResponse(BSON("cursor" << BSON("id" << 123LL <<
- "firstBatch" << BSONArray()) <<
- "ok" << 1));
- ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code());
- ASSERT_STRING_CONTAINS(status.reason(), "must contain 'cursor.ns' field");
- }
-
- TEST_F(FetcherTest, NamespaceNotAString) {
- ASSERT_OK(fetcher->schedule());
- processNetworkResponse(BSON("cursor" << BSON("id" << 123LL <<
- "ns" << 123 <<
- "firstBatch" << BSONArray()) <<
- "ok" << 1));
- ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code());
- ASSERT_STRING_CONTAINS(status.reason(), "'cursor.ns' field must be a string");
- }
-
- TEST_F(FetcherTest, NamespaceEmpty) {
- ASSERT_OK(fetcher->schedule());
- processNetworkResponse(BSON("cursor" << BSON("id" << 123LL <<
- "ns" << "" <<
- "firstBatch" << BSONArray()) <<
- "ok" << 1));
- ASSERT_EQUALS(ErrorCodes::BadValue, status.code());
- ASSERT_STRING_CONTAINS(status.reason(), "'cursor.ns' contains an invalid namespace");
- }
-
- TEST_F(FetcherTest, NamespaceMissingCollectionName) {
- ASSERT_OK(fetcher->schedule());
- processNetworkResponse(BSON("cursor" << BSON("id" << 123LL <<
- "ns" << "db." <<
- "firstBatch" << BSONArray()) <<
- "ok" << 1));
- ASSERT_EQUALS(ErrorCodes::BadValue, status.code());
- ASSERT_STRING_CONTAINS(status.reason(), "'cursor.ns' contains an invalid namespace");
- }
-
- TEST_F(FetcherTest, FirstBatchFieldMissing) {
- ASSERT_OK(fetcher->schedule());
- processNetworkResponse(BSON("cursor" << BSON("id" << 0LL <<
- "ns" << "db.coll") <<
- "ok" << 1));
- ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code());
- ASSERT_STRING_CONTAINS(status.reason(), "must contain 'cursor.firstBatch' field");
- }
-
- TEST_F(FetcherTest, FirstBatchNotAnArray) {
- ASSERT_OK(fetcher->schedule());
- processNetworkResponse(BSON("cursor" << BSON("id" << 0LL <<
- "ns" << "db.coll" <<
- "firstBatch" << 123) <<
- "ok" << 1));
- ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code());
- ASSERT_STRING_CONTAINS(status.reason(), "'cursor.firstBatch' field must be an array");
- }
-
- TEST_F(FetcherTest, FirstBatchArrayContainsNonObject) {
- ASSERT_OK(fetcher->schedule());
- processNetworkResponse(BSON("cursor" << BSON("id" << 0LL <<
- "ns" << "db.coll" <<
- "firstBatch" << BSON_ARRAY(8)) <<
- "ok" << 1));
- ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code());
- ASSERT_STRING_CONTAINS(status.reason(), "found non-object");
- ASSERT_STRING_CONTAINS(status.reason(), "in 'cursor.firstBatch' field");
- }
-
- TEST_F(FetcherTest, FirstBatchEmptyArray) {
- ASSERT_OK(fetcher->schedule());
- processNetworkResponse(BSON("cursor" << BSON("id" << 0LL <<
- "ns" << "db.coll" <<
- "firstBatch" << BSONArray()) <<
- "ok" << 1));
- ASSERT_OK(status);
- ASSERT_TRUE(documents.empty());
- }
-
- TEST_F(FetcherTest, FetchOneDocument) {
- ASSERT_OK(fetcher->schedule());
- const BSONObj doc = BSON("_id" << 1);
- processNetworkResponse(BSON("cursor" << BSON("id" << 0LL <<
- "ns" << "db.coll" <<
- "firstBatch" << BSON_ARRAY(doc)) <<
- "ok" << 1));
- ASSERT_OK(status);
- ASSERT_EQUALS(0, cursorId);
- ASSERT_EQUALS(1U, documents.size());
- ASSERT_EQUALS(doc, documents.front());
- }
-
- TEST_F(FetcherTest, SetNextActionToContinueWhenNextBatchIsNotAvailable) {
- ASSERT_OK(fetcher->schedule());
- const BSONObj doc = BSON("_id" << 1);
- callbackHook = [](const StatusWith<Fetcher::BatchData>& fetchResult,
- Fetcher::NextAction* nextAction,
- BSONObjBuilder* getMoreBob) {
- ASSERT_OK(fetchResult.getStatus());
- Fetcher::BatchData batchData{fetchResult.getValue()};
-
- ASSERT(nextAction);
- *nextAction = Fetcher::NextAction::kGetMore;
- ASSERT(getMoreBob);
- getMoreBob->append("getMore", batchData.cursorId);
- getMoreBob->append("collection", batchData.nss.coll());
- };
- processNetworkResponse(BSON("cursor" << BSON("id" << 0LL <<
- "ns" << "db.coll" <<
- "firstBatch" << BSON_ARRAY(doc)) <<
- "ok" << 1));
- ASSERT_OK(status);
- ASSERT_EQUALS(0, cursorId);
- ASSERT_EQUALS(1U, documents.size());
- ASSERT_EQUALS(doc, documents.front());
- }
-
- TEST_F(FetcherTest, FetchMultipleBatches) {
- ASSERT_OK(fetcher->schedule());
- const BSONObj doc = BSON("_id" << 1);
- scheduleNetworkResponse(BSON("cursor" << BSON("id" << 1LL <<
- "ns" << "db.coll" <<
- "firstBatch" << BSON_ARRAY(doc)) <<
- "ok" << 1));
- getNet()->runReadyNetworkOperations();
- ASSERT_OK(status);
- ASSERT_EQUALS(1U, documents.size());
- ASSERT_EQUALS(doc, documents.front());
- ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
- ASSERT_TRUE(fetcher->isActive());
-
- ASSERT_TRUE(getNet()->hasReadyRequests());
- const BSONObj doc2 = BSON("_id" << 2);
- scheduleNetworkResponse(BSON("cursor" << BSON("id" << 1LL <<
- "ns" << "db.coll" <<
- "nextBatch" << BSON_ARRAY(doc2)) <<
- "ok" << 1));
- getNet()->runReadyNetworkOperations();
- ASSERT_OK(status);
- ASSERT_EQUALS(1U, documents.size());
- ASSERT_EQUALS(doc2, documents.front());
- ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
- ASSERT_TRUE(fetcher->isActive());
-
- ASSERT_TRUE(getNet()->hasReadyRequests());
- const BSONObj doc3 = BSON("_id" << 3);
- scheduleNetworkResponse(BSON("cursor" << BSON("id" << 0LL <<
- "ns" << "db.coll" <<
- "nextBatch" << BSON_ARRAY(doc3)) <<
- "ok" << 1));
- getNet()->runReadyNetworkOperations();
- ASSERT_OK(status);
- ASSERT_EQUALS(1U, documents.size());
- ASSERT_EQUALS(doc3, documents.front());
- ASSERT_TRUE(Fetcher::NextAction::kNoAction == nextAction);
- ASSERT_FALSE(fetcher->isActive());
-
- ASSERT_FALSE(getNet()->hasReadyRequests());
- }
-
- TEST_F(FetcherTest, ScheduleGetMoreAndCancel) {
- ASSERT_OK(fetcher->schedule());
- const BSONObj doc = BSON("_id" << 1);
- scheduleNetworkResponse(BSON("cursor" << BSON("id" << 1LL <<
- "ns" << "db.coll" <<
- "firstBatch" << BSON_ARRAY(doc)) <<
- "ok" << 1));
- getNet()->runReadyNetworkOperations();
- ASSERT_OK(status);
- ASSERT_EQUALS(1U, documents.size());
- ASSERT_EQUALS(doc, documents.front());
- ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
- ASSERT_TRUE(fetcher->isActive());
-
- ASSERT_TRUE(getNet()->hasReadyRequests());
- const BSONObj doc2 = BSON("_id" << 2);
- scheduleNetworkResponse(BSON("cursor" << BSON("id" << 1LL <<
- "ns" << "db.coll" <<
- "nextBatch" << BSON_ARRAY(doc2)) <<
- "ok" << 1));
- getNet()->runReadyNetworkOperations();
- ASSERT_OK(status);
- ASSERT_EQUALS(1U, documents.size());
- ASSERT_EQUALS(doc2, documents.front());
- ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
- ASSERT_TRUE(fetcher->isActive());
-
- fetcher->cancel();
- finishProcessingNetworkResponse();
- ASSERT_NOT_OK(status);
- }
-
- TEST_F(FetcherTest, ScheduleGetMoreButShutdown) {
- ASSERT_OK(fetcher->schedule());
- const BSONObj doc = BSON("_id" << 1);
- scheduleNetworkResponse(BSON("cursor" << BSON("id" << 1LL <<
- "ns" << "db.coll" <<
- "firstBatch" << BSON_ARRAY(doc)) <<
- "ok" << 1));
- getNet()->runReadyNetworkOperations();
- ASSERT_OK(status);
- ASSERT_EQUALS(1U, documents.size());
- ASSERT_EQUALS(doc, documents.front());
- ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
- ASSERT_TRUE(fetcher->isActive());
-
- ASSERT_TRUE(getNet()->hasReadyRequests());
- const BSONObj doc2 = BSON("_id" << 2);
- scheduleNetworkResponse(BSON("cursor" << BSON("id" << 1LL <<
- "ns" << "db.coll" <<
- "nextBatch" << BSON_ARRAY(doc2)) <<
- "ok" << 1));
- getNet()->runReadyNetworkOperations();
- ASSERT_OK(status);
- ASSERT_EQUALS(1U, documents.size());
- ASSERT_EQUALS(doc2, documents.front());
- ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
- ASSERT_TRUE(fetcher->isActive());
-
- getExecutor().shutdown();
- finishProcessingNetworkResponse();
- ASSERT_NOT_OK(status);
- }
-
- void setNextActionToNoAction(const StatusWith<Fetcher::BatchData>& fetchResult,
- Fetcher::NextAction* nextAction,
- BSONObjBuilder* getMoreBob) {
- *nextAction = Fetcher::NextAction::kNoAction;
- }
-
- TEST_F(FetcherTest, UpdateNextActionAfterSecondBatch) {
- ASSERT_OK(fetcher->schedule());
- const BSONObj doc = BSON("_id" << 1);
- scheduleNetworkResponse(BSON("cursor" << BSON("id" << 1LL <<
- "ns" << "db.coll" <<
- "firstBatch" << BSON_ARRAY(doc)) <<
- "ok" << 1));
- getNet()->runReadyNetworkOperations();
- ASSERT_OK(status);
- ASSERT_EQUALS(1U, documents.size());
- ASSERT_EQUALS(doc, documents.front());
- ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
- ASSERT_TRUE(fetcher->isActive());
-
- ASSERT_TRUE(getNet()->hasReadyRequests());
- const BSONObj doc2 = BSON("_id" << 2);
- scheduleNetworkResponse(BSON("cursor" << BSON("id" << 1LL <<
- "ns" << "db.coll" <<
- "nextBatch" << BSON_ARRAY(doc2)) <<
- "ok" << 1));
-
- callbackHook = setNextActionToNoAction;
-
- getNet()->runReadyNetworkOperations();
- ASSERT_OK(status);
- ASSERT_EQUALS(1U, documents.size());
- ASSERT_EQUALS(doc2, documents.front());
- ASSERT_TRUE(Fetcher::NextAction::kNoAction == nextAction);
- ASSERT_FALSE(fetcher->isActive());
- }
-
- /**
- * This will be invoked twice before the fetcher returns control to the replication executor.
- */
- void shutdownDuringSecondBatch(const StatusWith<Fetcher::BatchData>& fetchResult,
- Fetcher::NextAction* nextAction,
- BSONObjBuilder* getMoreBob,
- const BSONObj& doc2,
- ReplicationExecutor* executor, bool* isShutdownCalled) {
- if (*isShutdownCalled) {
- return;
- }
-
- // First time during second batch
- ASSERT_OK(fetchResult.getStatus());
- Fetcher::BatchData batchData{fetchResult.getValue()};
- ASSERT_EQUALS(1U, batchData.documents.size());
- ASSERT_EQUALS(doc2, batchData.documents.front());
- ASSERT_TRUE(Fetcher::NextAction::kGetMore == *nextAction);
- ASSERT(getMoreBob);
- getMoreBob->append("getMore", batchData.cursorId);
- getMoreBob->append("collection", batchData.nss.coll());
-
- executor->shutdown();
- *isShutdownCalled = true;
- }
-
- TEST_F(FetcherTest, ShutdownDuringSecondBatch) {
- ASSERT_OK(fetcher->schedule());
- const BSONObj doc = BSON("_id" << 1);
- scheduleNetworkResponse(BSON("cursor" << BSON("id" << 1LL <<
- "ns" << "db.coll" <<
- "firstBatch" << BSON_ARRAY(doc)) <<
- "ok" << 1));
- getNet()->runReadyNetworkOperations();
- ASSERT_OK(status);
- ASSERT_EQUALS(1U, documents.size());
- ASSERT_EQUALS(doc, documents.front());
- ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
- ASSERT_TRUE(fetcher->isActive());
-
- ASSERT_TRUE(getNet()->hasReadyRequests());
- const BSONObj doc2 = BSON("_id" << 2);
- scheduleNetworkResponse(BSON("cursor" << BSON("id" << 1LL <<
- "ns" << "db.coll" <<
- "nextBatch" << BSON_ARRAY(doc2)) <<
- "ok" << 1));
-
- bool isShutdownCalled = false;
- callbackHook = stdx::bind(shutdownDuringSecondBatch,
- stdx::placeholders::_1,
- stdx::placeholders::_2,
- stdx::placeholders::_3,
- doc2,
- &getExecutor(), &isShutdownCalled);
-
- getNet()->runReadyNetworkOperations();
- ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.code());
- ASSERT_FALSE(fetcher->isActive());
- }
-
-} // namespace