author     Jason Chan <jason.chan@mongodb.com>  2017-07-21 13:13:18 -0400
committer  Jason Chan <jason.chan@mongodb.com>  2017-08-11 16:25:04 -0400
commit     0d3137df3879e86d92904309e968f25529904639 (patch)
tree       4c235dea6963452a90b69afbe1cc9d9b19464aa7
parent     9ab84fb795502c2a362fe74e4d438a952433d41a (diff)
download   mongo-0d3137df3879e86d92904309e968f25529904639.tar.gz
SERVER-29617 replace fetcher with ARM and add numCursors server parameter
-rw-r--r--  jstests/replsets/initial_sync_capped_index.js      5
-rw-r--r--  src/mongo/db/repl/base_cloner_test_fixture.cpp      5
-rw-r--r--  src/mongo/db/repl/base_cloner_test_fixture.h        2
-rw-r--r--  src/mongo/db/repl/collection_cloner.cpp           417
-rw-r--r--  src/mongo/db/repl/collection_cloner.h              92
-rw-r--r--  src/mongo/db/repl/collection_cloner_test.cpp      894
-rw-r--r--  src/mongo/db/repl/database_cloner.cpp               6
-rw-r--r--  src/mongo/db/s/sharding_task_executor.cpp          10
-rw-r--r--  src/mongo/s/query/async_results_merger.cpp         14
9 files changed, 1255 insertions, 190 deletions
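
Note: the numCursors server parameter named in the commit title is declared outside the hunks shown here; only its consumer, the new maxNumClonerCursors constructor argument, is visible below. Modeled on the MONGO_EXPORT_SERVER_PARAMETER pattern already used in collection_cloner.cpp (see numInitialSyncCollectionFindAttempts), its declaration would plausibly be a one-liner like the following sketch, in which the parameter name and default value are assumptions rather than text from this diff:

    // Hypothetical sketch; the real parameter name and default are not shown in this diff.
    MONGO_EXPORT_SERVER_PARAMETER(maxNumInitialSyncCollectionClonerCursors, int, 1);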
diff --git a/jstests/replsets/initial_sync_capped_index.js b/jstests/replsets/initial_sync_capped_index.js
index 0290e14fdef..2100b6f14bc 100644
--- a/jstests/replsets/initial_sync_capped_index.js
+++ b/jstests/replsets/initial_sync_capped_index.js
@@ -84,7 +84,7 @@
replTest.add();
var secondary = replTest.getSecondary();
- var collectionClonerFailPoint = "initialSyncHangCollectionClonerAfterInitialFind";
+ var collectionClonerFailPoint = "initialSyncHangCollectionClonerAfterHandlingBatchResponse";
// Make the collection cloner pause after its initial 'find' response on the capped collection.
var nss = dbName + "." + cappedCollName;
@@ -98,7 +98,8 @@
jsTestLog("Waiting for the initial 'find' response of capped collection cloner to complete.");
checkLog.contains(
- secondary, "initialSyncHangCollectionClonerAfterInitialFind fail point enabled for " + nss);
+ secondary,
+ "initialSyncHangCollectionClonerAfterHandlingBatchResponse fail point enabled for " + nss);
// Append documents to the capped collection so that the SECONDARY will clone these
// additional documents.
diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp
index e1a8131fb10..fa418d338fb 100644
--- a/src/mongo/db/repl/base_cloner_test_fixture.cpp
+++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp
@@ -76,6 +76,11 @@ BSONObj BaseClonerTest::createCursorResponse(CursorId cursorId, const BSONArray&
}
// static
+BSONObj BaseClonerTest::createFinalCursorResponse(const BSONArray& docs) {
+ return createCursorResponse(0, docs, "nextBatch");
+}
+
+// static
BSONObj BaseClonerTest::createListCollectionsResponse(CursorId cursorId,
const BSONArray& colls,
const char* fieldName) {
diff --git a/src/mongo/db/repl/base_cloner_test_fixture.h b/src/mongo/db/repl/base_cloner_test_fixture.h
index fb8ecf22861..e5615806fc6 100644
--- a/src/mongo/db/repl/base_cloner_test_fixture.h
+++ b/src/mongo/db/repl/base_cloner_test_fixture.h
@@ -76,6 +76,8 @@ public:
static BSONObj createCursorResponse(CursorId cursorId, const BSONArray& docs);
+ static BSONObj createFinalCursorResponse(const BSONArray& docs);
+
/**
* Creates a listCollections response with given array of index specs.
*/
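
As a usage sketch, createFinalCursorResponse simply forwards to createCursorResponse with cursorId 0 and the 'nextBatch' field name, so a test can close a cursor like this (the reply layout shown assumes the fixture emits the standard cursor response format):

    // Minimal sketch of closing a cursor in a test.
    BSONObj resp = BaseClonerTest::createFinalCursorResponse(BSON_ARRAY(BSON("_id" << 1)));
    // resp is expected to be shaped like:
    // { cursor: { id: 0, ns: <nss>, nextBatch: [ { _id: 1 } ] }, ok: 1 }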
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index 13b27d0ed01..98d658b49f9 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -74,9 +74,9 @@ MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncCollectionFindAttempts, int, 3);
// collection 'namespace'.
MONGO_FP_DECLARE(initialSyncHangDuringCollectionClone);
-// Failpoint which causes initial sync to hang after the initial 'find' command of collection
-// cloning, for a specific collection.
-MONGO_FP_DECLARE(initialSyncHangCollectionClonerAfterInitialFind);
+// Failpoint which causes initial sync to hang after handling the next batch of results from the
+// 'AsyncResultsMerger' for a specific collection.
+MONGO_FP_DECLARE(initialSyncHangCollectionClonerAfterHandlingBatchResponse);
CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
OldThreadPool* dbWorkThreadPool,
@@ -85,7 +85,8 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
const CollectionOptions& options,
const CallbackFn& onCompletion,
StorageInterface* storageInterface,
- const int batchSize)
+ const int batchSize,
+ const int maxNumClonerCursors)
: _executor(executor),
_dbWorkThreadPool(dbWorkThreadPool),
_source(source),
@@ -122,7 +123,7 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
executor::RemoteCommandRequest::kNoTimeout,
RemoteCommandRetryScheduler::kAllRetriableErrors)),
_indexSpecs(),
- _documents(),
+ _documentsToInsert(),
_dbWorkTaskRunner(_dbWorkThreadPool),
_scheduleDbWorkFn([this](const executor::TaskExecutor::CallbackFn& work) {
auto task = [work](OperationContext* opCtx,
@@ -138,9 +139,8 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
kProgressMeterCheckInterval,
"documents copied",
str::stream() << _sourceNss.toString() << " collection clone progress"),
- _batchSize(batchSize),
- _arm(executor,
- stdx::make_unique<ClusterClientCursorParams>(_sourceNss, UserNameIterator()).get()) {
+ _collectionCloningBatchSize(batchSize),
+ _maxNumClonerCursors(maxNumClonerCursors) {
// Fetcher throws an exception on null executor.
invariant(executor);
uassert(ErrorCodes::BadValue,
@@ -215,15 +215,19 @@ void CollectionCloner::shutdown() {
// Nothing to do if we are already in ShuttingDown or Complete state.
return;
}
-
_cancelRemainingWork_inlock();
}
void CollectionCloner::_cancelRemainingWork_inlock() {
+ if (_arm) {
+ Client::initThreadIfNotAlready();
+ auto opCtx = cc().getOperationContext();
+ _killArmHandle = _arm->kill(opCtx);
+ }
_countScheduler.shutdown();
_listIndexesFetcher.shutdown();
- if (_findFetcher) {
- _findFetcher->shutdown();
+ if (_establishCollectionCursorsScheduler) {
+ _establishCollectionCursorsScheduler->shutdown();
}
_dbWorkTaskRunner.cancel();
}
@@ -235,6 +239,9 @@ CollectionCloner::Stats CollectionCloner::getStats() const {
void CollectionCloner::join() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
+ if (_killArmHandle) {
+ _executor->waitForEvent(_killArmHandle);
+ }
_condition.wait(lk, [this]() { return !_isActive_inlock(); });
}
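
Taken together, the two hunks above form the shutdown handshake with the ARM: _cancelRemainingWork_inlock() kills the merger and stores the returned event handle, and join() waits on that event before waiting for the cloner to become inactive. A condensed sketch of the sequence (method names as above, error handling elided):

    // Sketch of the shutdown handshake, not the actual implementation:
    // shutdown() -> _cancelRemainingWork_inlock():
    //     _killArmHandle = _arm->kill(opCtx);       // asynchronous; returns an event
    // join():
    //     _executor->waitForEvent(_killArmHandle);  // ARM is fully torn down
    //     _condition.wait(lk, ...);                 // cloner reaches an inactive state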
@@ -250,6 +257,11 @@ void CollectionCloner::setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& sched
_scheduleDbWorkFn = scheduleDbWorkFn;
}
+std::vector<BSONObj> CollectionCloner::getDocumentsToInsert_forTest() {
+ LockGuard lk(_mutex);
+ return _documentsToInsert;
+}
+
void CollectionCloner::_countCallback(
const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
@@ -391,80 +403,181 @@ void CollectionCloner::_listIndexesCallback(const Fetcher::QueryResponseStatus&
}
}
-void CollectionCloner::_findCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult,
- Fetcher::NextAction* nextAction,
- BSONObjBuilder* getMoreBob,
- std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
- if (!fetchResult.isOK()) {
- // Wait for active inserts to complete.
- waitForDbWorker();
+void CollectionCloner::_beginCollectionCallback(const executor::TaskExecutor::CallbackArgs& cbd) {
+ if (!cbd.status.isOK()) {
+ _finishCallback(cbd.status);
+ return;
+ }
+ if (!_idIndexSpec.isEmpty() && _options.autoIndexId == CollectionOptions::NO) {
+ warning()
+ << "Found the _id_ index spec but the collection specified autoIndexId of false on ns:"
+ << this->_sourceNss;
+ }
- Status newStatus{fetchResult.getStatus().code(),
- str::stream() << "While querying collection '" << _sourceNss.ns()
- << "' there was an error '"
- << fetchResult.getStatus().reason()
- << "'"};
+ auto collectionBulkLoader = _storageInterface->createCollectionForBulkLoading(
+ _destNss, _options, _idIndexSpec, _indexSpecs);
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, newStatus);
+ if (!collectionBulkLoader.isOK()) {
+ _finishCallback(collectionBulkLoader.getStatus());
return;
}
- auto batchData(fetchResult.getValue());
- bool lastBatch = *nextAction == Fetcher::NextAction::kNoAction;
- if (batchData.documents.size() > 0) {
- LockGuard lk(_mutex);
- _documents.insert(_documents.end(), batchData.documents.begin(), batchData.documents.end());
- } else if (!batchData.first) {
- warning() << "No documents returned in batch; ns: " << _sourceNss
- << ", cursorId:" << batchData.cursorId << ", isLastBatch:" << lastBatch;
+ _stats.indexes = _indexSpecs.size();
+ if (!_idIndexSpec.isEmpty()) {
+ ++_stats.indexes;
}
- auto&& scheduleResult =
- _scheduleDbWorkFn(stdx::bind(&CollectionCloner::_insertDocumentsCallback,
- this,
- stdx::placeholders::_1,
- lastBatch,
- onCompletionGuard));
- if (!scheduleResult.isOK()) {
- Status newStatus{scheduleResult.getStatus().code(),
- str::stream() << "While cloning collection '" << _sourceNss.ns()
- << "' there was an error '"
- << scheduleResult.getStatus().reason()
- << "'"};
+ _collLoader = std::move(collectionBulkLoader.getValue());
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, newStatus);
+ BSONObjBuilder cmdObj;
+ EstablishCursorsCommand cursorCommand;
+ // The 'find' command is used when the number of cloning cursors is 1 to ensure
+ // the correctness of the collection cloning process until 'parallelCollectionScan'
+ // can be tested more extensively in the context of initial sync.
+ if (_maxNumClonerCursors == 1) {
+ cmdObj.append("find", _sourceNss.coll());
+ cmdObj.append("noCursorTimeout", true);
+ // Set batchSize to be 0 to establish the cursor without fetching any documents,
+ // similar to the response format of 'parallelCollectionScan'.
+ cmdObj.append("batchSize", 0);
+ cursorCommand = Find;
+ } else {
+ cmdObj.append("parallelCollectionScan", _sourceNss.coll());
+ cmdObj.append("numCursors", _maxNumClonerCursors);
+ cursorCommand = ParallelCollScan;
+ }
+
+ Client::initThreadIfNotAlready();
+ auto opCtx = cc().getOperationContext();
+
+ _establishCollectionCursorsScheduler = stdx::make_unique<RemoteCommandRetryScheduler>(
+ _executor,
+ RemoteCommandRequest(_source,
+ _sourceNss.db().toString(),
+ cmdObj.obj(),
+ ReadPreferenceSetting::secondaryPreferredMetadata(),
+ opCtx,
+ RemoteCommandRequest::kNoTimeout),
+ stdx::bind(&CollectionCloner::_establishCollectionCursorsCallback,
+ this,
+ stdx::placeholders::_1,
+ cursorCommand),
+ RemoteCommandRetryScheduler::makeRetryPolicy(
+ numInitialSyncCollectionFindAttempts.load(),
+ executor::RemoteCommandRequest::kNoTimeout,
+ RemoteCommandRetryScheduler::kAllRetriableErrors));
+ auto scheduleStatus = _establishCollectionCursorsScheduler->startup();
+ LOG(1) << "Attempting to establish cursors with maxNumClonerCursors: " << _maxNumClonerCursors;
+
+ if (!scheduleStatus.isOK()) {
+ _establishCollectionCursorsScheduler.reset();
+ _finishCallback(scheduleStatus);
return;
}
+}
- MONGO_FAIL_POINT_BLOCK(initialSyncHangCollectionClonerAfterInitialFind, nssData) {
- const BSONObj& data = nssData.getData();
- auto nss = data["nss"].str();
- // Only hang when cloning the specified collection.
- if (_destNss.toString() == nss) {
- while (MONGO_FAIL_POINT(initialSyncHangCollectionClonerAfterInitialFind) &&
- !_isShuttingDown()) {
- log() << "initialSyncHangCollectionClonerAfterInitialFind fail point enabled for "
- << nss << ". Blocking until fail point is disabled.";
- mongo::sleepsecs(1);
+Status CollectionCloner::_parseCursorResponse(BSONObj response,
+ std::vector<CursorResponse>* cursors,
+ EstablishCursorsCommand cursorCommand) {
+ switch (cursorCommand) {
+ case Find: {
+ StatusWith<CursorResponse> findResponse = CursorResponse::parseFromBSON(response);
+ if (!findResponse.isOK()) {
+ Status errorStatus{findResponse.getStatus().code(),
+ str::stream()
+ << "While parsing the 'find' query against collection '"
+ << _sourceNss.ns()
+ << "' there was an error '"
+ << findResponse.getStatus().reason()
+ << "'"};
+ return errorStatus;
}
+ cursors->push_back(std::move(findResponse.getValue()));
+ break;
+ }
+ case ParallelCollScan: {
+ auto cursorElements = _parseParallelCollectionScanResponse(response);
+ if (!cursorElements.isOK()) {
+ return cursorElements.getStatus();
+ }
+ std::vector<BSONElement> cursorsArray = cursorElements.getValue();
+ // Parse each BSONElement into a 'CursorResponse' object.
+ for (BSONElement cursor : cursorsArray) {
+ if (!cursor.isABSONObj()) {
+ Status errorStatus(
+ ErrorCodes::FailedToParse,
+ "The 'cursor' field in the list of cursor responses is not a "
+ "valid BSON Object");
+ return errorStatus;
+ }
+ const BSONObj cursorObj = cursor.Obj().getOwned();
+ StatusWith<CursorResponse> parallelCollScanResponse =
+ CursorResponse::parseFromBSON(cursorObj);
+ if (!parallelCollScanResponse.isOK()) {
+ return parallelCollScanResponse.getStatus();
+ }
+ cursors->push_back(std::move(parallelCollScanResponse.getValue()));
+ }
+ break;
+ }
+ default: {
+ Status errorStatus(
+ ErrorCodes::FailedToParse,
+ "The command used to establish the collection cloner cursors is not valid.");
+ return errorStatus;
}
}
+ return Status::OK();
+}
- if (!lastBatch) {
- invariant(getMoreBob);
- getMoreBob->append("getMore", batchData.cursorId);
- getMoreBob->append("collection", batchData.nss.coll());
- getMoreBob->append("batchSize", _batchSize);
+void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCallbackArgs& rcbd,
+ EstablishCursorsCommand cursorCommand) {
+ if (_state == State::kShuttingDown) {
+ Status shuttingDownStatus{ErrorCodes::CallbackCanceled, "Cloner shutting down."};
+ _finishCallback(shuttingDownStatus);
+ return;
+ }
+ auto response = rcbd.response;
+ if (!response.isOK()) {
+ _finishCallback(response.status);
+ return;
+ }
+ Status commandStatus = getStatusFromCommandResult(response.data);
+ if (!commandStatus.isOK()) {
+ Status newStatus{commandStatus.code(),
+ str::stream() << "While querying collection '" << _sourceNss.ns()
+ << "' there was an error '"
+ << commandStatus.reason()
+ << "'"};
+ _finishCallback(newStatus);
+ return;
}
-}
-void CollectionCloner::_beginCollectionCallback(const executor::TaskExecutor::CallbackArgs& cbd) {
- if (!cbd.status.isOK()) {
- _finishCallback(cbd.status);
+ std::vector<CursorResponse> cursorResponses;
+ Status parseResponseStatus =
+ _parseCursorResponse(response.data, &cursorResponses, cursorCommand);
+ if (!parseResponseStatus.isOK()) {
+ _finishCallback(parseResponseStatus);
return;
}
+ LOG(1) << "Collection cloner running with " << cursorResponses.size()
+ << " cursors established.";
+
+ // Initialize the 'AsyncResultsMerger' (ARM).
+ std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors;
+ for (auto&& cursorResponse : cursorResponses) {
+ // A placeholder 'ShardId' is used until the ARM is made less sharding specific.
+ remoteCursors.emplace_back(
+ ShardId("CollectionClonerSyncSource"), _source, std::move(cursorResponse));
+ }
+
+ // An empty list of authenticated users is passed into the cluster parameters
+ // because user information is not used by the ARM in the context of collection cloning.
+ _clusterClientCursorParams =
+ stdx::make_unique<ClusterClientCursorParams>(_sourceNss, UserNameIterator());
+ _clusterClientCursorParams->remotes = std::move(remoteCursors);
+ _arm = stdx::make_unique<AsyncResultsMerger>(_executor, _clusterClientCursorParams.get());
// This completion guard invokes _finishCallback on destruction.
auto cancelRemainingWorkInLock = [this]() { _cancelRemainingWork_inlock(); };
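
For reference, the two command shapes built in _beginCollectionCallback above, and the reply form that _parseCursorResponse expects, can be sketched as follows (the collection name and numCursors value are placeholders; the reply layout matches the responses the tests further down feed in):

    // Single-cursor path: 'find' with batchSize 0, so no documents are fetched yet.
    BSONObj findCmd = BSON("find" << "coll" << "noCursorTimeout" << true << "batchSize" << 0);
    // Multi-cursor path: 'parallelCollectionScan' with the requested cursor count.
    BSONObj scanCmd = BSON("parallelCollectionScan" << "coll" << "numCursors" << 3);
    // Expected 'parallelCollectionScan' reply shape:
    // { cursors: [ { cursor: { id: <id>, ns: <ns>, firstBatch: [] }, ok: 1 }, ... ], ok: 1 }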
@@ -476,50 +589,134 @@ void CollectionCloner::_beginCollectionCallback(const executor::TaskExecutor::Ca
// that will cause the destructor of the completion guard to run, the destructor must be run
// outside the mutex. This is a necessary condition to invoke _finishCallback.
stdx::lock_guard<stdx::mutex> lock(_mutex);
- if (!_idIndexSpec.isEmpty() && _options.autoIndexId == CollectionOptions::NO) {
- warning()
- << "Found the _id_ index spec but the collection specified autoIndexId of false on ns:"
- << this->_sourceNss;
+ Status scheduleStatus = _scheduleNextARMResultsCallback(onCompletionGuard);
+ if (!scheduleStatus.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, scheduleStatus);
+ return;
}
+}
- auto status = _storageInterface->createCollectionForBulkLoading(
- _destNss, _options, _idIndexSpec, _indexSpecs);
+StatusWith<std::vector<BSONElement>> CollectionCloner::_parseParallelCollectionScanResponse(
+ BSONObj resp) {
+ if (!resp.hasField("cursors")) {
+ return Status(ErrorCodes::CursorNotFound,
+ "The 'parallelCollectionScan' response does not contain a 'cursors' field.");
+ }
+ BSONElement response = resp["cursors"];
+ if (response.type() == BSONType::Array) {
+ return response.Array();
+ } else {
+ return Status(
+ ErrorCodes::FailedToParse,
+ "The 'parallelCollectionScan' response is unable to be transformed into an array.");
+ }
+}
- if (!status.isOK()) {
- onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status.getStatus());
+Status CollectionCloner::_bufferNextBatchFromArm() {
+ while (_arm->ready()) {
+ auto armResultStatus = _arm->nextReady();
+ if (!armResultStatus.getStatus().isOK()) {
+ return armResultStatus.getStatus();
+ }
+ if (armResultStatus.getValue().isEOF()) {
+ // We have reached the end of the batch.
+ break;
+ } else {
+ auto queryResult = armResultStatus.getValue().getResult();
+ _documentsToInsert.push_back(std::move(*queryResult));
+ }
+ }
+
+ return Status::OK();
+}
+
+Status CollectionCloner::_scheduleNextARMResultsCallback(
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
+ Client::initThreadIfNotAlready();
+ auto opCtx = cc().getOperationContext();
+ auto nextEvent = _arm->nextEvent(opCtx);
+ if (!nextEvent.isOK()) {
+ return nextEvent.getStatus();
+ }
+ auto event = nextEvent.getValue();
+ auto handleARMResultsOnNextEvent =
+ _executor->onEvent(event,
+ stdx::bind(&CollectionCloner::_handleARMResultsCallback,
+ this,
+ stdx::placeholders::_1,
+ onCompletionGuard));
+ return handleARMResultsOnNextEvent.getStatus();
+}
+
+void CollectionCloner::_handleARMResultsCallback(
+ const executor::TaskExecutor::CallbackArgs& cbd,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
+ auto setResultAndCancelRemainingWork = [this](std::shared_ptr<OnCompletionGuard> guard,
+ Status status) {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ guard->setResultAndCancelRemainingWork_inlock(lock, status);
+ return;
+ };
+
+ if (!cbd.status.isOK()) {
+ // Wait for active inserts to complete.
+ waitForDbWorker();
+ Status newStatus{cbd.status.code(),
+ str::stream() << "While querying collection '" << _sourceNss.ns()
+ << "' there was an error '"
+ << cbd.status.reason()
+ << "'"};
+ setResultAndCancelRemainingWork(onCompletionGuard, newStatus);
return;
}
- _stats.indexes = _indexSpecs.size();
- if (!_idIndexSpec.isEmpty()) {
- ++_stats.indexes;
+ // Pull the documents from the ARM into a buffer until the entire batch has been processed.
+ auto nextBatchStatus = _bufferNextBatchFromArm();
+ if (!nextBatchStatus.isOK()) {
+ setResultAndCancelRemainingWork(onCompletionGuard, nextBatchStatus);
+ return;
}
- _collLoader = std::move(status.getValue());
+ bool lastBatch = _arm->remotesExhausted();
+ auto&& scheduleResult =
+ _scheduleDbWorkFn(stdx::bind(&CollectionCloner::_insertDocumentsCallback,
+ this,
+ stdx::placeholders::_1,
+ lastBatch,
+ onCompletionGuard));
+ if (!scheduleResult.isOK()) {
+ Status newStatus{scheduleResult.getStatus().code(),
+ str::stream() << "While cloning collection '" << _sourceNss.ns()
+ << "' there was an error '"
+ << scheduleResult.getStatus().reason()
+ << "'"};
+ setResultAndCancelRemainingWork(onCompletionGuard, newStatus);
+ return;
+ }
- _findFetcher = stdx::make_unique<Fetcher>(
- _executor,
- _source,
- _sourceNss.db().toString(),
- BSON("find" << _sourceNss.coll() << "noCursorTimeout" << true << "batchSize" << _batchSize),
- stdx::bind(&CollectionCloner::_findCallback,
- this,
- stdx::placeholders::_1,
- stdx::placeholders::_2,
- stdx::placeholders::_3,
- onCompletionGuard),
- ReadPreferenceSetting::secondaryPreferredMetadata(),
- RemoteCommandRequest::kNoTimeout,
- RemoteCommandRetryScheduler::makeRetryPolicy(
- numInitialSyncCollectionFindAttempts.load(),
- executor::RemoteCommandRequest::kNoTimeout,
- RemoteCommandRetryScheduler::kAllRetriableErrors));
+ MONGO_FAIL_POINT_BLOCK(initialSyncHangCollectionClonerAfterHandlingBatchResponse, nssData) {
+ const BSONObj& data = nssData.getData();
+ auto nss = data["nss"].str();
+ // Only hang when cloning the specified collection.
+ if (_destNss.toString() == nss) {
+ while (MONGO_FAIL_POINT(initialSyncHangCollectionClonerAfterHandlingBatchResponse) &&
+ !_isShuttingDown()) {
+ log() << "initialSyncHangCollectionClonerAfterHandlingBatchResponse fail point "
+ "enabled for "
+ << nss << ". Blocking until fail point is disabled.";
+ mongo::sleepsecs(1);
+ }
+ }
+ }
- Status scheduleStatus = _findFetcher->schedule();
- if (!scheduleStatus.isOK()) {
- _findFetcher.reset();
- onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, scheduleStatus);
- return;
+ // If the remote cursors are not exhausted, schedule this callback again to handle
+ // the impending cursor response.
+ if (!lastBatch) {
+ Status scheduleStatus = _scheduleNextARMResultsCallback(onCompletionGuard);
+ if (!scheduleStatus.isOK()) {
+ setResultAndCancelRemainingWork(onCompletionGuard, scheduleStatus);
+ return;
+ }
}
}
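
_scheduleNextARMResultsCallback and _handleARMResultsCallback together implement an event-driven consume loop over the ARM. Condensed, the control flow is:

    // Sketch of the loop, not the actual implementation:
    // 1. _arm->nextEvent(opCtx) returns an event that is signaled when results are ready.
    // 2. _executor->onEvent(event, ...) schedules _handleARMResultsCallback on that event.
    // 3. The callback drains ready results into _documentsToInsert via
    //    _bufferNextBatchFromArm(), schedules _insertDocumentsCallback on the DB worker,
    //    and, unless _arm->remotesExhausted(), goes back to step 1.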
@@ -533,24 +730,21 @@ void CollectionCloner::_insertDocumentsCallback(
return;
}
- std::vector<BSONObj> docs;
UniqueLock lk(_mutex);
- if (_documents.size() == 0) {
+ std::vector<BSONObj> docs;
+ if (_documentsToInsert.size() == 0) {
warning() << "_insertDocumentsCallback, but no documents to insert for ns:" << _destNss;
-
if (lastBatch) {
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, Status::OK());
}
return;
}
-
- _documents.swap(docs);
+ _documentsToInsert.swap(docs);
_stats.documentsCopied += docs.size();
++_stats.fetchBatches;
_progressMeter.hit(int(docs.size()));
invariant(_collLoader);
const auto status = _collLoader->insertDocuments(docs.cbegin(), docs.cend());
-
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, status);
return;
@@ -570,12 +764,10 @@ void CollectionCloner::_insertDocumentsCallback(
}
}
- if (!lastBatch) {
- return;
+ if (lastBatch) {
+ // Clean up resources once the last batch has been copied over and set the status to OK.
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, Status::OK());
}
-
- // Done with last batch and time to set result in completion guard to Status::OK().
- onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, Status::OK());
}
void CollectionCloner::_finishCallback(const Status& status) {
@@ -584,7 +776,6 @@ void CollectionCloner::_finishCallback(const Status& status) {
// Copy the status so we can change it below if needed.
auto finalStatus = status;
bool callCollectionLoader = false;
-
decltype(_onCompletion) onCompletion;
{
LockGuard lk(_mutex);
@@ -595,7 +786,6 @@ void CollectionCloner::_finishCallback(const Status& status) {
invariant(_onCompletion);
std::swap(_onCompletion, onCompletion);
}
-
if (callCollectionLoader) {
if (finalStatus.isOK()) {
const auto loaderStatus = _collLoader->commit();
@@ -609,7 +799,6 @@ void CollectionCloner::_finishCallback(const Status& status) {
// This will release the resources held by the loader.
_collLoader.reset();
}
-
onCompletion(finalStatus);
// This will release the resources held by the callback function object. '_onCompletion' is
diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h
index af029b0f1cd..222886b9d72 100644
--- a/src/mongo/db/repl/collection_cloner.h
+++ b/src/mongo/db/repl/collection_cloner.h
@@ -69,6 +69,7 @@ public:
+ using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs;
/**
* Callback completion guard for CollectionCloner.
*/
using OnCompletionGuard = CallbackCompletionGuard<Status>;
struct Stats {
@@ -111,7 +112,8 @@ public:
const CollectionOptions& options,
const CallbackFn& onCompletion,
StorageInterface* storageInterface,
- const int batchSize);
+ const int batchSize,
+ const int maxNumClonerCursors);
virtual ~CollectionCloner();
@@ -146,6 +148,14 @@ public:
*/
void setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& scheduleDbWorkFn);
+ /**
+ * Returns the documents currently stored in the '_documentsToInsert' buffer that are
+ * intended to be inserted through the collection loader.
+ *
+ * For testing only.
+ */
+ std::vector<BSONObj> getDocumentsToInsert_forTest();
+
private:
bool _isActive_inlock() const;
@@ -173,14 +183,6 @@ private:
BSONObjBuilder* getMoreBob);
/**
- * Read collection documents from find result.
- */
- void _findCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult,
- Fetcher::NextAction* nextAction,
- BSONObjBuilder* getMoreBob,
- std::shared_ptr<OnCompletionGuard> onCompletionGuard);
-
- /**
* Request storage interface to create collection.
*
* Called multiple times if there are more than one batch of responses from listIndexes
@@ -192,13 +194,59 @@ private:
void _beginCollectionCallback(const executor::TaskExecutor::CallbackArgs& callbackData);
/**
- * Called multiple times if there are more than one batch of documents from the fetcher.
+ * The possible command types that can be used to establish the initial cursors on the
+ * remote collection.
+ */
+ enum EstablishCursorsCommand { Find, ParallelCollScan };
+
+ /**
+ * Parses the cursor responses from the 'find' or 'parallelCollectionScan' command
+ * and passes them into the 'AsyncResultsMerger'.
+ */
+ void _establishCollectionCursorsCallback(const RemoteCommandCallbackArgs& rcbd,
+ EstablishCursorsCommand cursorCommand);
+
+ /**
+ * Parses the response from a 'parallelCollectionScan' command into a vector of cursor
+ * elements.
+ */
+ StatusWith<std::vector<BSONElement>> _parseParallelCollectionScanResponse(BSONObj resp);
+
+ /**
+ * Takes a cursors buffer and parses the response from the 'find' or 'parallelCollectionScan'
+ * command into cursor responses that are pushed onto the buffer.
+ */
+ Status _parseCursorResponse(BSONObj response,
+ std::vector<CursorResponse>* cursors,
+ EstablishCursorsCommand cursorCommand);
+
+ /**
+ * Calls to get the next event from the 'AsyncResultsMerger'. This schedules
+ * '_handleARMResultsCallback' to be run when the event is signaled successfully.
+ */
+ Status _scheduleNextARMResultsCallback(std::shared_ptr<OnCompletionGuard> onCompletionGuard);
+
+ /**
+ * Runs for each time a new batch of documents can be retrieved from the 'AsyncResultsMerger'.
+ * Buffers the documents retrieved for insertion and schedules a '_insertDocumentsCallback'
+ * to insert the contents of the buffer.
+ */
+ void _handleARMResultsCallback(const executor::TaskExecutor::CallbackArgs& cbd,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard);
+
+ /**
+ * Pull all ready results from the ARM into a buffer to be inserted.
+ */
+ Status _bufferNextBatchFromArm();
+
+ /**
+ * Called whenever there is a new batch of documents ready from the 'AsyncResultsMerger'.
* On the last batch, 'lastBatch' will be true.
*
* Each document returned will be inserted via the storage interface.
*/
- void _insertDocumentsCallback(const executor::TaskExecutor::CallbackArgs& callbackData,
+ void _insertDocumentsCallback(const executor::TaskExecutor::CallbackArgs& cbd,
bool lastBatch,
std::shared_ptr<OnCompletionGuard> onCompletionGuard);
@@ -231,18 +279,30 @@ private:
StorageInterface* _storageInterface; // (R) Not owned by us.
RemoteCommandRetryScheduler _countScheduler; // (S)
Fetcher _listIndexesFetcher; // (S)
- std::unique_ptr<Fetcher> _findFetcher; // (M)
std::vector<BSONObj> _indexSpecs; // (M)
BSONObj _idIndexSpec; // (M)
- std::vector<BSONObj> _documents; // (M) Documents read from fetcher to insert.
- TaskRunner _dbWorkTaskRunner; // (R)
+ std::vector<BSONObj>
+ _documentsToInsert; // (M) Documents read from 'AsyncResultsMerger' to insert.
+ TaskRunner _dbWorkTaskRunner; // (R)
ScheduleDbWorkFn
_scheduleDbWorkFn; // (RT) Function for scheduling database work using the executor.
Stats _stats; // (M) stats for this instance.
ProgressMeter _progressMeter; // (M) progress meter for this instance.
- const int _batchSize;
+ const int _collectionCloningBatchSize; // (R) The size of the batches of documents returned in
+ // collection cloning.
+
+ // (R) The maximum number of cursors to use in the collection cloning process.
+ const int _maxNumClonerCursors;
+ // (M) Component responsible for fetching the documents from the collection cloner cursor(s).
+ std::unique_ptr<AsyncResultsMerger> _arm;
+ // (R) The cursor parameters used by the 'AsyncResultsMerger'.
+ std::unique_ptr<ClusterClientCursorParams> _clusterClientCursorParams;
+
+ // (M) The event handle for the 'kill' event of the 'AsyncResultsMerger'.
+ executor::TaskExecutor::EventHandle _killArmHandle;
- AsyncResultsMerger _arm;
+ // (M) Scheduler used to establish the initial cursor or set of cursors.
+ std::unique_ptr<RemoteCommandRetryScheduler> _establishCollectionCursorsScheduler;
// State transitions:
// PreStart --> Running --> ShuttingDown --> Complete
diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp
index 4fb7a7fe208..d3e8966a304 100644
--- a/src/mongo/db/repl/collection_cloner_test.cpp
+++ b/src/mongo/db/repl/collection_cloner_test.cpp
@@ -66,8 +66,12 @@ protected:
void setUp() override;
void tearDown() override;
- // 16MB max batch size / 12 byte min doc size * 10 (for good measure) = defaultBatchSize to use.
- const int defaultBatchSize = (16 * 1024 * 1024) / 12 * 10;
+ // A simple arbitrary value to use as the default batch size.
+ const int defaultBatchSize = 1024;
+
+ // Running initial sync with a single cursor will default to using the 'find' command until
+ // 'parallelCollectionScan' has more complete testing.
+ const int defaultNumCloningCursors = 1;
CollectionOptions options;
std::unique_ptr<CollectionCloner> collectionCloner;
@@ -87,7 +91,8 @@ void CollectionClonerTest::setUp() {
options,
stdx::bind(&CollectionClonerTest::setStatus, this, stdx::placeholders::_1),
storageInterface.get(),
- defaultBatchSize);
+ defaultBatchSize,
+ defaultNumCloningCursors);
collectionStats = CollectionMockStats();
storageInterface->createCollectionForBulkFn =
[this](const NamespaceString& nss,
@@ -114,6 +119,7 @@ BaseCloner* CollectionClonerTest::getCloner() const {
return collectionCloner.get();
}
+
TEST_F(CollectionClonerTest, InvalidConstruction) {
executor::TaskExecutor& executor = getExecutor();
auto pool = dbWorkThreadPool.get();
@@ -123,29 +129,50 @@ TEST_F(CollectionClonerTest, InvalidConstruction) {
// Null executor -- error from Fetcher, not CollectionCloner.
{
StorageInterface* si = storageInterface.get();
- ASSERT_THROWS_CODE_AND_WHAT(
- CollectionCloner(nullptr, pool, target, nss, options, cb, si, defaultBatchSize),
- UserException,
- ErrorCodes::BadValue,
- "task executor cannot be null");
+ ASSERT_THROWS_CODE_AND_WHAT(CollectionCloner(nullptr,
+ pool,
+ target,
+ nss,
+ options,
+ cb,
+ si,
+ defaultBatchSize,
+ defaultNumCloningCursors),
+ UserException,
+ ErrorCodes::BadValue,
+ "task executor cannot be null");
}
// Null storage interface
- ASSERT_THROWS_CODE_AND_WHAT(
- CollectionCloner(&executor, pool, target, nss, options, cb, nullptr, defaultBatchSize),
- UserException,
- ErrorCodes::BadValue,
- "storage interface cannot be null");
+ ASSERT_THROWS_CODE_AND_WHAT(CollectionCloner(&executor,
+ pool,
+ target,
+ nss,
+ options,
+ cb,
+ nullptr,
+ defaultBatchSize,
+ defaultNumCloningCursors),
+ UserException,
+ ErrorCodes::BadValue,
+ "storage interface cannot be null");
// Invalid namespace.
{
NamespaceString badNss("db.");
StorageInterface* si = storageInterface.get();
- ASSERT_THROWS_CODE_AND_WHAT(
- CollectionCloner(&executor, pool, target, badNss, options, cb, si, defaultBatchSize),
- UserException,
- ErrorCodes::BadValue,
- "invalid collection namespace: db.");
+ ASSERT_THROWS_CODE_AND_WHAT(CollectionCloner(&executor,
+ pool,
+ target,
+ badNss,
+ options,
+ cb,
+ si,
+ defaultBatchSize,
+ defaultNumCloningCursors),
+ UserException,
+ ErrorCodes::BadValue,
+ "invalid collection namespace: db.");
}
// Invalid collection options - error from CollectionOptions::validate(), not CollectionCloner.
@@ -155,8 +182,15 @@ TEST_F(CollectionClonerTest, InvalidConstruction) {
<< "not a document");
StorageInterface* si = storageInterface.get();
ASSERT_THROWS_CODE_AND_WHAT(
- CollectionCloner(
- &executor, pool, target, nss, invalidOptions, cb, si, defaultBatchSize),
+ CollectionCloner(&executor,
+ pool,
+ target,
+ nss,
+ invalidOptions,
+ cb,
+ si,
+ defaultBatchSize,
+ defaultNumCloningCursors),
UserException,
ErrorCodes::BadValue,
"'storageEngine.storageEngine1' has to be an embedded document.");
@@ -166,11 +200,18 @@ TEST_F(CollectionClonerTest, InvalidConstruction) {
{
CollectionCloner::CallbackFn nullCb;
StorageInterface* si = storageInterface.get();
- ASSERT_THROWS_CODE_AND_WHAT(
- CollectionCloner(&executor, pool, target, nss, options, nullCb, si, defaultBatchSize),
- UserException,
- ErrorCodes::BadValue,
- "callback function cannot be null");
+ ASSERT_THROWS_CODE_AND_WHAT(CollectionCloner(&executor,
+ pool,
+ target,
+ nss,
+ options,
+ nullCb,
+ si,
+ defaultBatchSize,
+ defaultNumCloningCursors),
+ UserException,
+ ErrorCodes::BadValue,
+ "callback function cannot be null");
}
}
@@ -316,7 +357,8 @@ TEST_F(CollectionClonerTest,
options,
stdx::bind(&CollectionClonerTest::setStatus, this, stdx::placeholders::_1),
storageInterface.get(),
- defaultBatchSize);
+ defaultBatchSize,
+ defaultNumCloningCursors);
ASSERT_OK(collectionCloner->startup());
@@ -339,7 +381,8 @@ TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) {
options,
stdx::bind(&CollectionClonerTest::setStatus, this, stdx::placeholders::_1),
storageInterface.get(),
- defaultBatchSize));
+ defaultBatchSize,
+ defaultNumCloningCursors));
NamespaceString collNss;
CollectionOptions collOptions;
@@ -370,10 +413,19 @@ TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) {
ASSERT_TRUE(collectionCloner->isActive());
ASSERT_TRUE(collectionStats.initCalled);
+ BSONArray emptyArray;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCursorResponse(1, emptyArray));
+ }
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_TRUE(collectionCloner->isActive());
+
const BSONObj doc = BSON("_id" << 1);
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc)));
+ processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc)));
}
collectionCloner->join();
ASSERT_EQUALS(1, collectionStats.insertCount);
@@ -699,7 +751,7 @@ TEST_F(CollectionClonerTest, FindCommandAfterBeginCollection) {
ASSERT_FALSE(net->hasReadyRequests());
}
-TEST_F(CollectionClonerTest, FindCommandFailed) {
+TEST_F(CollectionClonerTest, EstablishCursorCommandFailed) {
ASSERT_OK(collectionCloner->startup());
{
@@ -747,13 +799,15 @@ TEST_F(CollectionClonerTest, CollectionClonerResendsFindCommandOnRetriableError)
net->runReadyNetworkOperations();
ASSERT_TRUE(collectionCloner->isActive());
- // Confirm that CollectionCloner resends the find request.
+ // This check exists to ensure that the command used to establish the cursors is retried,
+ // regardless of the command format. Therefore, it shouldn't be necessary to have a separate
+ // similar test case for the 'parallelCollectionScan' command.
auto noi = net->getNextReadyRequest();
assertRemoteCommandNameEquals("find", noi->getRequest());
net->blackHole(noi);
}
-TEST_F(CollectionClonerTest, FindCommandCanceled) {
+TEST_F(CollectionClonerTest, EstablishCursorCommandCanceled) {
ASSERT_OK(collectionCloner->startup());
ASSERT_TRUE(collectionCloner->isActive());
@@ -810,10 +864,19 @@ TEST_F(CollectionClonerTest, InsertDocumentsScheduleDbWorkFailed) {
return StatusWith<executor::TaskExecutor::CallbackHandle>(ErrorCodes::UnknownError, "");
});
+ BSONArray emptyArray;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCursorResponse(1, emptyArray));
+ }
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_TRUE(collectionCloner->isActive());
+
const BSONObj doc = BSON("_id" << 1);
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc)));
+ processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc)));
}
ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus().code());
@@ -844,9 +907,18 @@ TEST_F(CollectionClonerTest, InsertDocumentsCallbackCanceled) {
return StatusWith<executor::TaskExecutor::CallbackHandle>(handle);
});
+ BSONArray emptyArray;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCursorResponse(1, emptyArray));
+ }
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_TRUE(collectionCloner->isActive());
+
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(0, BSON_ARRAY(BSON("_id" << 1))));
+ processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(BSON("_id" << 1))));
}
collectionCloner->join();
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getStatus().code());
@@ -875,9 +947,18 @@ TEST_F(CollectionClonerTest, InsertDocumentsFailed) {
return Status(ErrorCodes::OperationFailed, "");
};
+ BSONArray emptyArray;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCursorResponse(1, emptyArray));
+ }
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_TRUE(collectionCloner->isActive());
+
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(0, BSON_ARRAY(BSON("_id" << 1))));
+ processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(BSON("_id" << 1))));
}
collectionCloner->join();
@@ -902,10 +983,19 @@ TEST_F(CollectionClonerTest, InsertDocumentsSingleBatch) {
ASSERT_TRUE(collectionCloner->isActive());
ASSERT_TRUE(collectionStats.initCalled);
+ BSONArray emptyArray;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCursorResponse(1, emptyArray));
+ }
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_TRUE(collectionCloner->isActive());
+
const BSONObj doc = BSON("_id" << 1);
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc)));
+ processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc)));
}
collectionCloner->join();
@@ -933,6 +1023,15 @@ TEST_F(CollectionClonerTest, InsertDocumentsMultipleBatches) {
ASSERT_TRUE(collectionCloner->isActive());
ASSERT_TRUE(collectionStats.initCalled);
+ BSONArray emptyArray;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCursorResponse(1, emptyArray));
+ }
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_TRUE(collectionCloner->isActive());
+
const BSONObj doc = BSON("_id" << 1);
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
@@ -950,7 +1049,7 @@ TEST_F(CollectionClonerTest, InsertDocumentsMultipleBatches) {
const BSONObj doc2 = BSON("_id" << 1);
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc2), "nextBatch"));
+ processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc2)));
}
collectionCloner->join();
@@ -978,6 +1077,15 @@ TEST_F(CollectionClonerTest, LastBatchContainsNoDocuments) {
ASSERT_TRUE(collectionCloner->isActive());
ASSERT_TRUE(collectionStats.initCalled);
+ BSONArray emptyArray;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCursorResponse(1, emptyArray));
+ }
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_TRUE(collectionCloner->isActive());
+
const BSONObj doc = BSON("_id" << 1);
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
@@ -1002,10 +1110,9 @@ TEST_F(CollectionClonerTest, LastBatchContainsNoDocuments) {
ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
ASSERT_TRUE(collectionCloner->isActive());
- BSONArray emptyArray;
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(0, emptyArray, "nextBatch"));
+ processNetworkResponse(createFinalCursorResponse(emptyArray));
}
collectionCloner->join();
@@ -1031,6 +1138,15 @@ TEST_F(CollectionClonerTest, MiddleBatchContainsNoDocuments) {
ASSERT_TRUE(collectionCloner->isActive());
ASSERT_TRUE(collectionStats.initCalled);
+ BSONArray emptyArray;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCursorResponse(1, emptyArray));
+ }
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_TRUE(collectionCloner->isActive());
+
const BSONObj doc = BSON("_id" << 1);
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
@@ -1043,7 +1159,6 @@ TEST_F(CollectionClonerTest, MiddleBatchContainsNoDocuments) {
ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
ASSERT_TRUE(collectionCloner->isActive());
- BSONArray emptyArray;
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
processNetworkResponse(createCursorResponse(1, emptyArray, "nextBatch"));
@@ -1058,7 +1173,7 @@ TEST_F(CollectionClonerTest, MiddleBatchContainsNoDocuments) {
const BSONObj doc2 = BSON("_id" << 2);
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc2), "nextBatch"));
+ processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc2)));
}
collectionCloner->join();
@@ -1097,6 +1212,15 @@ TEST_F(CollectionClonerTest, CollectionClonerCannotBeRestartedAfterPreviousFailu
ASSERT_TRUE(collectionCloner->isActive());
ASSERT_TRUE(collectionStats.initCalled);
+ BSONArray emptyArray;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCursorResponse(1, emptyArray));
+ }
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_TRUE(collectionCloner->isActive());
+
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
processNetworkResponse(createCursorResponse(1, BSON_ARRAY(BSON("_id" << 1))));
@@ -1156,7 +1280,8 @@ TEST_F(CollectionClonerTest, CollectionClonerResetsOnCompletionCallbackFunctionA
result = status;
},
storageInterface.get(),
- defaultBatchSize);
+ defaultBatchSize,
+ defaultNumCloningCursors);
ASSERT_OK(collectionCloner->startup());
ASSERT_TRUE(collectionCloner->isActive());
@@ -1202,7 +1327,620 @@ TEST_F(CollectionClonerTest,
ASSERT_TRUE(collectionCloner->isActive());
ASSERT_TRUE(collectionStats.initCalled);
- // At this point, the CollectionCloner has sent the find request for the collection documents.
+ BSONArray emptyArray;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCursorResponse(1, emptyArray));
+ }
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_TRUE(collectionCloner->isActive());
+
+ // At this point, the CollectionCloner has sent the find request to establish the cursor.
+ // We want to return the first batch of documents for the collection from the network so that
+ // the CollectionCloner schedules the first _insertDocuments DB task and the getMore request for
+ // the next batch of documents.
+
+ // Store the scheduled CollectionCloner::_insertDocuments task but do not run it yet.
+ executor::TaskExecutor::CallbackFn insertDocumentsFn;
+ collectionCloner->setScheduleDbWorkFn_forTest(
+ [&](const executor::TaskExecutor::CallbackFn& workFn) {
+ insertDocumentsFn = workFn;
+ executor::TaskExecutor::CallbackHandle handle(std::make_shared<MockCallbackState>());
+ return StatusWith<executor::TaskExecutor::CallbackHandle>(handle);
+ });
+ ASSERT_FALSE(insertDocumentsFn);
+
+ // Return first batch of collection documents from remote server for the getMore request.
+ const BSONObj doc = BSON("_id" << 1);
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+
+ assertRemoteCommandNameEquals(
+ "getMore", net->scheduleSuccessfulResponse(createCursorResponse(1, BSON_ARRAY(doc))));
+ net->runReadyNetworkOperations();
+ }
+
+ // Confirm that CollectionCloner attempted to schedule _insertDocuments task.
+ ASSERT_TRUE(insertDocumentsFn);
+
+ // Return an error for the getMore request for the next batch of collection documents.
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+
+ assertRemoteCommandNameEquals(
+ "getMore",
+ net->scheduleErrorResponse(Status(ErrorCodes::OperationFailed, "getMore failed")));
+ net->runReadyNetworkOperations();
+ }
+
+ // CollectionCloner should still be active because we have not finished processing the
+ // insertDocuments task.
+ ASSERT_TRUE(collectionCloner->isActive());
+ ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
+
+ // Run the insertDocuments task. The final status of the CollectionCloner should match the first
+ // error passed to the completion guard (i.e., from the failed getMore request).
+ executor::TaskExecutor::CallbackArgs callbackArgs(
+ &getExecutor(), {}, Status(ErrorCodes::CallbackCanceled, ""));
+ insertDocumentsFn(callbackArgs);
+
+ // Reset 'insertDocumentsFn' to release last reference count on completion guard.
+ insertDocumentsFn = {};
+
+ // No need to call CollectionCloner::join() because we invoked the _insertDocuments callback
+ // synchronously.
+
+ ASSERT_FALSE(collectionCloner->isActive());
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus());
+}
+
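+
+// The parallel tests below repeatedly hand-build the 'parallelCollectionScan' style reply.
+// A small hypothetical helper (not part of the fixture) makes the expected shape explicit:
+//
+//     // Hypothetical helper, for illustration only; the tests below inline this pattern.
+//     BSONObj makeParallelScanResponse(const std::vector<BSONObj>& cursorReplies) {
+//         BSONArrayBuilder cursors;
+//         for (const auto& reply : cursorReplies) {
+//             cursors.append(reply);  // each entry is a createCursorResponse(...) object
+//         }
+//         return BSON("cursors" << cursors.arr() << "ok" << 1);
+//     }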
+class ParallelCollectionClonerTest : public BaseClonerTest {
+public:
+ BaseCloner* getCloner() const override;
+
+protected:
+ void setUp() override;
+ void tearDown() override;
+ std::vector<BSONObj> generateDocs(std::size_t numDocs);
+
+ // A simple arbitrary value to use as the default batch size.
+ const int defaultBatchSize = 1024;
+
+ // Running initial sync with more than one cursor uses the 'parallelCollectionScan'
+ // command to establish the cloning cursors.
+ const int defaultNumCloningCursors = 3;
+
+ CollectionOptions options;
+ std::unique_ptr<CollectionCloner> collectionCloner;
+ CollectionMockStats collectionStats; // Used by the _loader.
+ CollectionBulkLoaderMock* _loader; // Owned by CollectionCloner.
+};
+
+void ParallelCollectionClonerTest::setUp() {
+ BaseClonerTest::setUp();
+ options = {};
+ collectionCloner.reset(nullptr);
+ collectionCloner = stdx::make_unique<CollectionCloner>(
+ &getExecutor(),
+ dbWorkThreadPool.get(),
+ target,
+ nss,
+ options,
+ stdx::bind(&CollectionClonerTest::setStatus, this, stdx::placeholders::_1),
+ storageInterface.get(),
+ defaultBatchSize,
+ defaultNumCloningCursors);
+ collectionStats = CollectionMockStats();
+ storageInterface->createCollectionForBulkFn =
+ [this](const NamespaceString& nss,
+ const CollectionOptions& options,
+ const BSONObj idIndexSpec,
+ const std::vector<BSONObj>& secondaryIndexSpecs) {
+ _loader = new CollectionBulkLoaderMock(&collectionStats);
+ Status initCollectionBulkLoader = _loader->init(secondaryIndexSpecs);
+ ASSERT_OK(initCollectionBulkLoader);
+
+ return StatusWith<std::unique_ptr<CollectionBulkLoader>>(
+ std::unique_ptr<CollectionBulkLoader>(_loader));
+ };
+}
+
+void ParallelCollectionClonerTest::tearDown() {
+ BaseClonerTest::tearDown();
+ // Executor may still invoke collection cloner's callback before shutting down.
+ collectionCloner.reset(nullptr);
+ options = {};
+}
+
+BaseCloner* ParallelCollectionClonerTest::getCloner() const {
+ return collectionCloner.get();
+}
+
+std::vector<BSONObj> ParallelCollectionClonerTest::generateDocs(std::size_t numDocs) {
+ std::vector<BSONObj> docs;
+ for (unsigned int i = 0; i < numDocs; i++) {
+ docs.push_back(BSON("_id" << i));
+ }
+ return docs;
+}
+
+TEST_F(ParallelCollectionClonerTest, InsertDocumentsSingleBatchWithMultipleCloningCursors) {
+ ASSERT_OK(collectionCloner->startup());
+ ASSERT_TRUE(collectionCloner->isActive());
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
+ processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
+ }
+ ASSERT_TRUE(collectionCloner->isActive());
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_TRUE(collectionCloner->isActive());
+ ASSERT_TRUE(collectionStats.initCalled);
+
+ // A single cursor response is returned because there is only a single document to insert.
+ BSONArray emptyArray;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(
+ BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray)) << "ok" << 1));
+ }
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_TRUE(collectionCloner->isActive());
+
+ auto exec = &getExecutor();
+ std::vector<BSONObj> docs;
+ // Record the buffered documents before they are inserted so we can
+ // validate them.
+ collectionCloner->setScheduleDbWorkFn_forTest(
+ [&](const executor::TaskExecutor::CallbackFn& workFn) {
+ docs = collectionCloner->getDocumentsToInsert_forTest();
+ return exec->scheduleWork(workFn);
+ });
+
+ const BSONObj doc = BSON("_id" << 1);
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc)));
+ }
+
+ collectionCloner->join();
+
+ ASSERT_BSONOBJ_EQ(docs[0], doc);
+ ASSERT_EQUALS(1, collectionStats.insertCount);
+ ASSERT_TRUE(collectionStats.commitCalled);
+
+ ASSERT_OK(getStatus());
+ ASSERT_FALSE(collectionCloner->isActive());
+}
+
+TEST_F(ParallelCollectionClonerTest,
+ InsertDocumentsSingleBatchOfMultipleDocumentsWithMultipleCloningCursors) {
+ ASSERT_OK(collectionCloner->startup());
+ ASSERT_TRUE(collectionCloner->isActive());
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
+ processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
+ }
+ ASSERT_TRUE(collectionCloner->isActive());
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_TRUE(collectionCloner->isActive());
+ ASSERT_TRUE(collectionStats.initCalled);
+
+ // A single cursor response is returned because there is only a single batch of documents to
+ // insert.
+ BSONArray emptyArray;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(
+ BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray)) << "ok" << 1));
+ }
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_TRUE(collectionCloner->isActive());
+
+ auto exec = &getExecutor();
+ std::vector<BSONObj> docs;
+ // Record the buffered documents before they are inserted so we can
+ // validate them.
+ collectionCloner->setScheduleDbWorkFn_forTest(
+ [&](const executor::TaskExecutor::CallbackFn& workFn) {
+ docs = collectionCloner->getDocumentsToInsert_forTest();
+ return exec->scheduleWork(workFn);
+ });
+
+ auto generatedDocs = generateDocs(3);
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createFinalCursorResponse(
+ BSON_ARRAY(generatedDocs[0] << generatedDocs[1] << generatedDocs[2])));
+ }
+
+ collectionCloner->join();
+
+ ASSERT_EQUALS(3U, docs.size());
+ for (int i = 0; i < 3; i++) {
+ ASSERT_BSONOBJ_EQ(docs[i], generatedDocs[i]);
+ }
+ ASSERT_EQUALS(3, collectionStats.insertCount);
+ ASSERT_TRUE(collectionStats.commitCalled);
+
+ ASSERT_OK(getStatus());
+ ASSERT_FALSE(collectionCloner->isActive());
+}
+
+TEST_F(ParallelCollectionClonerTest, InsertDocumentsWithMultipleCursorsOfDifferentNumberOfBatches) {
+ ASSERT_OK(collectionCloner->startup());
+ ASSERT_TRUE(collectionCloner->isActive());
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
+ processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
+ }
+ ASSERT_TRUE(collectionCloner->isActive());
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_TRUE(collectionCloner->isActive());
+ ASSERT_TRUE(collectionStats.initCalled);
+
+ BSONArray emptyArray;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray)
+ << createCursorResponse(2, emptyArray)
+ << createCursorResponse(3, emptyArray))
+ << "ok"
+ << 1));
+ }
+ collectionCloner->waitForDbWorker();
+ ASSERT_TRUE(collectionCloner->isActive());
+
+ auto exec = &getExecutor();
+ std::vector<BSONObj> docs;
+
+ // Record the buffered documents before they are inserted so we can
+ // validate them.
+ collectionCloner->setScheduleDbWorkFn_forTest(
+ [&](const executor::TaskExecutor::CallbackFn& workFn) {
+ auto buffered = collectionCloner->getDocumentsToInsert_forTest();
+ docs.insert(docs.end(), buffered.begin(), buffered.end());
+ return exec->scheduleWork(workFn);
+ });
+
+ int numDocs = 9;
+ std::vector<BSONObj> generatedDocs = generateDocs(numDocs);
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCursorResponse(1, BSON_ARRAY(generatedDocs[0]), "nextBatch"));
+ processNetworkResponse(createCursorResponse(2, BSON_ARRAY(generatedDocs[1]), "nextBatch"));
+ processNetworkResponse(createCursorResponse(3, BSON_ARRAY(generatedDocs[2]), "nextBatch"));
+ }
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_EQUALS(3U, docs.size());
+ for (int i = 0; i < 3; i++) {
+ ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]);
+ }
+ ASSERT_EQUALS(3, collectionStats.insertCount);
+ ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
+ ASSERT_TRUE(collectionCloner->isActive());
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCursorResponse(1, BSON_ARRAY(generatedDocs[3]), "nextBatch"));
+ processNetworkResponse(createCursorResponse(2, BSON_ARRAY(generatedDocs[4]), "nextBatch"));
+ processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[5])));
+ }
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_EQUALS(6U, docs.size());
+ for (int i = 3; i < 6; i++) {
+ ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]);
+ }
+ ASSERT_EQUALS(6, collectionStats.insertCount);
+ ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
+ ASSERT_TRUE(collectionCloner->isActive());
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCursorResponse(1, BSON_ARRAY(generatedDocs[6]), "nextBatch"));
+ processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[7])));
+ }
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_EQUALS(8U, docs.size());
+ for (int i = 6; i < 8; i++) {
+ ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]);
+ }
+ ASSERT_EQUALS(8, collectionStats.insertCount);
+ ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
+ ASSERT_TRUE(collectionCloner->isActive());
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[8])));
+ }
+
+ collectionCloner->join();
+ ASSERT_EQUALS(9U, docs.size());
+ ASSERT_BSONOBJ_EQ(generatedDocs[8], docs[8]);
+ ASSERT_EQUALS(numDocs, collectionStats.insertCount);
+ ASSERT_TRUE(collectionStats.commitCalled);
+
+ ASSERT_OK(getStatus());
+ ASSERT_FALSE(collectionCloner->isActive());
+}
+
+TEST_F(ParallelCollectionClonerTest, MiddleBatchContainsNoDocumentsWithMultipleCursors) {
+ ASSERT_OK(collectionCloner->startup());
+ ASSERT_TRUE(collectionCloner->isActive());
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
+ processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
+ }
+ ASSERT_TRUE(collectionCloner->isActive());
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_TRUE(collectionCloner->isActive());
+ ASSERT_TRUE(collectionStats.initCalled);
+
+ BSONArray emptyArray;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray)
+ << createCursorResponse(2, emptyArray)
+ << createCursorResponse(3, emptyArray))
+ << "ok"
+ << 1));
+ }
+ collectionCloner->waitForDbWorker();
+
+ auto exec = &getExecutor();
+ std::vector<BSONObj> docs;
+ // Record the buffered documents before they are inserted so we can
+ // validate them.
+ collectionCloner->setScheduleDbWorkFn_forTest(
+ [&](const executor::TaskExecutor::CallbackFn& workFn) {
+ auto buffered = collectionCloner->getDocumentsToInsert_forTest();
+ docs.insert(docs.end(), buffered.begin(), buffered.end());
+ return exec->scheduleWork(workFn);
+ });
+
+ ASSERT_TRUE(collectionCloner->isActive());
+
+ int numDocs = 6;
+ std::vector<BSONObj> generatedDocs = generateDocs(numDocs);
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCursorResponse(1, BSON_ARRAY(generatedDocs[0])));
+ processNetworkResponse(createCursorResponse(2, BSON_ARRAY(generatedDocs[1])));
+ processNetworkResponse(createCursorResponse(3, BSON_ARRAY(generatedDocs[2])));
+ }
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_EQUALS(3U, docs.size());
+ for (int i = 0; i < 3; i++) {
+ ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]);
+ }
+ ASSERT_EQUALS(3, collectionStats.insertCount);
+
+ ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
+ ASSERT_TRUE(collectionCloner->isActive());
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCursorResponse(1, emptyArray, "nextBatch"));
+ processNetworkResponse(createCursorResponse(2, emptyArray, "nextBatch"));
+ processNetworkResponse(createCursorResponse(3, emptyArray, "nextBatch"));
+ }
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_EQUALS(3, collectionStats.insertCount);
+
+ ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
+ ASSERT_TRUE(collectionCloner->isActive());
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[3])));
+ processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[4])));
+ processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[5])));
+ }
+
+ collectionCloner->join();
+
+ ASSERT_EQUALS(6U, docs.size());
+ for (int i = 3; i < 6; i++) {
+ ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]);
+ }
+
+ ASSERT_EQUALS(numDocs, collectionStats.insertCount);
+ ASSERT_TRUE(collectionStats.commitCalled);
+
+ ASSERT_OK(getStatus());
+ ASSERT_FALSE(collectionCloner->isActive());
+}
+
+TEST_F(ParallelCollectionClonerTest, LastBatchContainsNoDocumentsWithMultipleCursors) {
+ ASSERT_OK(collectionCloner->startup());
+ ASSERT_TRUE(collectionCloner->isActive());
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
+ processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
+ }
+ ASSERT_TRUE(collectionCloner->isActive());
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_TRUE(collectionCloner->isActive());
+ ASSERT_TRUE(collectionStats.initCalled);
+
+ BSONArray emptyArray;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray)
+ << createCursorResponse(2, emptyArray)
+ << createCursorResponse(3, emptyArray))
+ << "ok"
+ << 1));
+ }
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_TRUE(collectionCloner->isActive());
+
+ auto exec = &getExecutor();
+ std::vector<BSONObj> docs;
+ // Record the buffered documents before they are inserted so we can
+ // validate them.
+ collectionCloner->setScheduleDbWorkFn_forTest(
+ [&](const executor::TaskExecutor::CallbackFn& workFn) {
+ auto buffered = collectionCloner->getDocumentsToInsert_forTest();
+ docs.insert(docs.end(), buffered.begin(), buffered.end());
+ return exec->scheduleWork(workFn);
+ });
+
+ int numDocs = 6;
+ std::vector<BSONObj> generatedDocs = generateDocs(numDocs);
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCursorResponse(1, BSON_ARRAY(generatedDocs[0]), "nextBatch"));
+ processNetworkResponse(createCursorResponse(2, BSON_ARRAY(generatedDocs[1]), "nextBatch"));
+ processNetworkResponse(createCursorResponse(3, BSON_ARRAY(generatedDocs[2]), "nextBatch"));
+ }
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_EQUALS(3U, docs.size());
+ for (int i = 0; i < 3; i++) {
+ ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]);
+ }
+ ASSERT_EQUALS(3, collectionStats.insertCount);
+
+ ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
+ ASSERT_TRUE(collectionCloner->isActive());
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCursorResponse(1, BSON_ARRAY(generatedDocs[3]), "nextBatch"));
+ processNetworkResponse(createCursorResponse(2, BSON_ARRAY(generatedDocs[4]), "nextBatch"));
+ processNetworkResponse(createCursorResponse(3, BSON_ARRAY(generatedDocs[5]), "nextBatch"));
+ }
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_EQUALS(6U, docs.size());
+ for (int i = 3; i < 6; i++) {
+ ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]);
+ }
+ ASSERT_EQUALS(numDocs, collectionStats.insertCount);
+
+ ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
+ ASSERT_TRUE(collectionCloner->isActive());
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createFinalCursorResponse(emptyArray));
+ processNetworkResponse(createFinalCursorResponse(emptyArray));
+ processNetworkResponse(createFinalCursorResponse(emptyArray));
+ }
+
+ collectionCloner->join();
+    ASSERT_EQUALS(numDocs, collectionStats.insertCount);
+ ASSERT_TRUE(collectionStats.commitCalled);
+
+ ASSERT_OK(getStatus());
+ ASSERT_FALSE(collectionCloner->isActive());
+}
+
+TEST_F(ParallelCollectionClonerTest, InsertDocumentsScheduleDbWorkFailedWithMultipleCursors) {
+ ASSERT_OK(collectionCloner->startup());
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
+ processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
+ }
+
+ collectionCloner->waitForDbWorker();
+
+ // Replace scheduleDbWork function so that cloner will fail to schedule DB work after
+ // getting documents.
+ collectionCloner->setScheduleDbWorkFn_forTest(
+ [](const executor::TaskExecutor::CallbackFn& workFn) {
+ return StatusWith<executor::TaskExecutor::CallbackHandle>(ErrorCodes::UnknownError, "");
+ });
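+ // The injected scheduling error should be reported as the cloner's final
+ // status, which the ErrorCodes::UnknownError assertion below verifies.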
+
+ BSONArray emptyArray;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray)
+ << createCursorResponse(2, emptyArray)
+ << createCursorResponse(3, emptyArray))
+ << "ok"
+ << 1));
+ }
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_TRUE(collectionCloner->isActive());
+
+ const BSONObj doc = BSON("_id" << 1);
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc)));
+ }
+
+ ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus().code());
+ ASSERT_FALSE(collectionCloner->isActive());
+}
+
+TEST_F(ParallelCollectionClonerTest,
+ CollectionClonerWaitsForPendingTasksToCompleteBeforeInvokingOnCompletionCallback) {
+ ASSERT_OK(collectionCloner->startup());
+ ASSERT_TRUE(collectionCloner->isActive());
+
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+ assertRemoteCommandNameEquals("count",
+ net->scheduleSuccessfulResponse(createCountResponse(0)));
+ net->runReadyNetworkOperations();
+
+ assertRemoteCommandNameEquals(
+ "listIndexes",
+ net->scheduleSuccessfulResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))));
+ net->runReadyNetworkOperations();
+ }
+ ASSERT_TRUE(collectionCloner->isActive());
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_TRUE(collectionCloner->isActive());
+ ASSERT_TRUE(collectionStats.initCalled);
+
+ BSONArray emptyArray;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray)
+ << createCursorResponse(2, emptyArray)
+ << createCursorResponse(3, emptyArray))
+ << "ok"
+ << 1));
+ }
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_TRUE(collectionCloner->isActive());
+
+    // At this point, the CollectionCloner has established its cursors on the sync source.
// We want to return the first batch of documents for the collection from the network so that
// the CollectionCloner schedules the first _insertDocuments DB task and the getMore request for
// the next batch of documents.
@@ -1217,13 +1955,13 @@ TEST_F(CollectionClonerTest,
});
ASSERT_FALSE(insertDocumentsFn);
- // Return first batch of collection documents from remote server for the find request.
+ // Return first batch of collection documents from remote server for the getMore request.
const BSONObj doc = BSON("_id" << 1);
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
assertRemoteCommandNameEquals(
- "find", net->scheduleSuccessfulResponse(createCursorResponse(1, BSON_ARRAY(doc))));
+ "getMore", net->scheduleSuccessfulResponse(createCursorResponse(1, BSON_ARRAY(doc))));
net->runReadyNetworkOperations();
}
@@ -1261,4 +1999,72 @@ TEST_F(CollectionClonerTest,
ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus());
}
+TEST_F(ParallelCollectionClonerTest, CollectionClonerCannotBeRestartedAfterPreviousFailure) {
+ // First cloning attempt - fails while reading documents from source collection.
+ unittest::log() << "Starting first collection cloning attempt";
+ ASSERT_OK(collectionCloner->startup());
+ ASSERT_TRUE(collectionCloner->isActive());
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
+ processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
+ }
+ ASSERT_TRUE(collectionCloner->isActive());
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_TRUE(collectionCloner->isActive());
+ ASSERT_TRUE(collectionStats.initCalled);
+
+ BSONArray emptyArray;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray)
+ << createCursorResponse(2, emptyArray)
+ << createCursorResponse(3, emptyArray))
+ << "ok"
+ << 1));
+ }
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_TRUE(collectionCloner->isActive());
+ int numDocs = 5;
+ std::vector<BSONObj> docs = generateDocs(numDocs);
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCursorResponse(1, BSON_ARRAY(docs[0])));
+ processNetworkResponse(createCursorResponse(2, BSON_ARRAY(docs[1])));
+ processNetworkResponse(createCursorResponse(3, BSON_ARRAY(docs[2])));
+ }
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_EQUALS(3, collectionStats.insertCount);
+
+ // Check that the status hasn't changed from the initial value.
+ ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
+ ASSERT_TRUE(collectionCloner->isActive());
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(docs[3])));
+ processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(docs[4])));
+ processNetworkResponse(ErrorCodes::OperationFailed,
+ "failed to read remaining documents from source collection");
+ }
+
+ collectionCloner->join();
+ ASSERT_EQUALS(numDocs, collectionStats.insertCount);
+
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus());
+ ASSERT_FALSE(collectionCloner->isActive());
+
+    // Second cloning attempt - startup() should fail because a cloner cannot be restarted.
+ unittest::log() << "Starting second collection cloning attempt - startup() should fail";
+ collectionStats = CollectionMockStats();
+ setStatus(getDetectableErrorStatus());
+
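+    // The cloner is single-use: once a run has completed, successfully or not,
+    // startup() fails with ShutdownInProgress instead of restarting the clone.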
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, collectionCloner->startup());
+}
+
} // namespace
diff --git a/src/mongo/db/repl/database_cloner.cpp b/src/mongo/db/repl/database_cloner.cpp
index 0ebaf66e098..bc7ea109918 100644
--- a/src/mongo/db/repl/database_cloner.cpp
+++ b/src/mongo/db/repl/database_cloner.cpp
@@ -68,6 +68,9 @@ MONGO_EXPORT_STARTUP_SERVER_PARAMETER(collectionClonerBatchSize, int, defaultBat
// The number of attempts for the listCollections commands.
MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncListCollectionsAttempts, int, 3);
+// The maximum number of cursors to use in the collection cloning process.
+MONGO_EXPORT_SERVER_PARAMETER(maxNumInitialSyncCollectionClonerCursors, int, 1);
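+// As a (non-startup) server parameter this should be settable at runtime, e.g.
+//   db.adminCommand({setParameter: 1, maxNumInitialSyncCollectionClonerCursors: 3})
+// though the value shown here is only an illustration.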
+
/**
* Default listCollections predicate.
*/
@@ -364,7 +367,8 @@ void DatabaseCloner::_listCollectionsCallback(const StatusWith<Fetcher::QueryRes
stdx::bind(
&DatabaseCloner::_collectionClonerCallback, this, stdx::placeholders::_1, nss),
_storageInterface,
- collectionClonerBatchSize);
+ collectionClonerBatchSize,
+ maxNumInitialSyncCollectionClonerCursors.load());
} catch (const UserException& ex) {
_finishCallback_inlock(lk, ex.toStatus());
return;
diff --git a/src/mongo/db/s/sharding_task_executor.cpp b/src/mongo/db/s/sharding_task_executor.cpp
index 24fc512459b..0769a42ed40 100644
--- a/src/mongo/db/s/sharding_task_executor.cpp
+++ b/src/mongo/db/s/sharding_task_executor.cpp
@@ -38,8 +38,11 @@
#include "mongo/db/logical_time.h"
#include "mongo/db/operation_time_tracker.h"
#include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/metadata/sharding_metadata.h"
+#include "mongo/s/client/shard_registry.h"
#include "mongo/s/cluster_last_error_info.h"
+#include "mongo/s/grid.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
@@ -116,10 +119,17 @@ StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleRemoteCom
const TaskExecutor::RemoteCommandCallbackArgs& args) {
ON_BLOCK_EXIT([&cb, &args]() { cb(args); });
+ auto shard = grid.shardRegistry()->getShardForHostNoReload(request.target);
+ if (!shard) {
+ LOG(1) << "Could not find shard containing host: " << request.target.toString();
+ return;
+ }
if (!args.response.isOK()) {
+ shard->updateReplSetMonitor(request.target, args.response.status);
LOG(1) << "Error processing the remote request, not updating operationTime or gLE";
return;
}
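+    // Even a network-OK response can carry a command-level error (e.g. a stepdown),
+    // so extract it with getStatusFromCommandResult and feed it to the monitor too.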
+ shard->updateReplSetMonitor(request.target, getStatusFromCommandResult(args.response.data));
// Update the logical clock.
invariant(timeTracker);
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 037765fd5f9..5a550ae7e24 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -334,7 +334,6 @@ StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent(
// the new event right away to propagate the fact that the previous event had been signaled to
// the new event.
signalCurrentEventIfReady_inlock();
-
return eventToReturn;
}
@@ -398,22 +397,11 @@ void AsyncResultsMerger::handleBatchResponse(
// Early return from this point on signal anyone waiting on an event, if ready() is true.
ScopeGuard signaller = MakeGuard(&AsyncResultsMerger::signalCurrentEventIfReady_inlock, this);
-
StatusWith<CursorResponse> cursorResponseStatus(
cbData.response.isOK() ? parseCursorResponse(cbData.response.data, remote)
: cbData.response.status);
-
if (!cursorResponseStatus.isOK()) {
- auto shard = remote.getShard();
- if (!shard) {
- remote.status = Status(cursorResponseStatus.getStatus().code(),
- str::stream() << "Could not find shard containing host "
- << remote.getTargetHost().toString());
- } else {
- shard->updateReplSetMonitor(remote.getTargetHost(), cursorResponseStatus.getStatus());
- remote.status = cursorResponseStatus.getStatus();
- }
-
+ remote.status = cursorResponseStatus.getStatus();
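+        // ReplSetMonitor bookkeeping for failed hosts now happens centrally in
+        // ShardingTaskExecutor::scheduleRemoteCommand (see above) instead of here.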
// Unreachable host errors are swallowed if the 'allowPartialResults' option is set. We
// remove the unreachable host entirely from consideration by marking it as exhausted.
if (_params->isAllowPartialResults) {