author | Jason Chan <jason.chan@mongodb.com> | 2017-07-21 13:13:18 -0400
committer | Jason Chan <jason.chan@mongodb.com> | 2017-08-11 16:25:04 -0400
commit | 0d3137df3879e86d92904309e968f25529904639 (patch)
tree | 4c235dea6963452a90b69afbe1cc9d9b19464aa7
parent | 9ab84fb795502c2a362fe74e4d438a952433d41a (diff)
download | mongo-0d3137df3879e86d92904309e968f25529904639.tar.gz
SERVER-29617 replace fetcher with ARM and add numCursors server parameter
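In brief: this commit replaces the single `find`-based `Fetcher` in `CollectionCloner` with the `AsyncResultsMerger` (ARM) and adds a server parameter controlling how many cloner cursors are used. When the new `maxNumClonerCursors` constructor argument is 1, the cloner still establishes its cursor with a plain `find`; otherwise it issues `parallelCollectionScan` with `numCursors` and feeds the resulting cursors to the ARM. The following is a condensed excerpt of that selection logic from the `collection_cloner.cpp` hunk below, with error handling and scheduling elided; it is a reading aid, not the full implementation:

```cpp
// Condensed from CollectionCloner::_beginCollectionCallback in the diff below.
BSONObjBuilder cmdObj;
EstablishCursorsCommand cursorCommand;
if (_maxNumClonerCursors == 1) {
    // 'find' remains the single-cursor path until 'parallelCollectionScan'
    // is tested more extensively in the context of initial sync.
    cmdObj.append("find", _sourceNss.coll());
    cmdObj.append("noCursorTimeout", true);
    // batchSize 0 establishes the cursor without fetching any documents,
    // mirroring the response shape of 'parallelCollectionScan'.
    cmdObj.append("batchSize", 0);
    cursorCommand = Find;
} else {
    cmdObj.append("parallelCollectionScan", _sourceNss.coll());
    cmdObj.append("numCursors", _maxNumClonerCursors);
    cursorCommand = ParallelCollScan;
}
```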
-rw-r--r-- | jstests/replsets/initial_sync_capped_index.js | 5
-rw-r--r-- | src/mongo/db/repl/base_cloner_test_fixture.cpp | 5
-rw-r--r-- | src/mongo/db/repl/base_cloner_test_fixture.h | 2
-rw-r--r-- | src/mongo/db/repl/collection_cloner.cpp | 417
-rw-r--r-- | src/mongo/db/repl/collection_cloner.h | 92
-rw-r--r-- | src/mongo/db/repl/collection_cloner_test.cpp | 894
-rw-r--r-- | src/mongo/db/repl/database_cloner.cpp | 6
-rw-r--r-- | src/mongo/db/s/sharding_task_executor.cpp | 10
-rw-r--r-- | src/mongo/s/query/async_results_merger.cpp | 14
9 files changed, 1255 insertions, 190 deletions
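Once established, the cursor responses are wrapped as remote cursors and handed to the ARM. Each time the ARM's next-event fires, the cloner drains every ready result into its `_documentsToInsert` buffer and schedules an insert task, repeating until `remotesExhausted()` reports the last batch. The draining loop, condensed from the `_bufferNextBatchFromArm` hunk in the diff below:

```cpp
// Condensed from CollectionCloner::_bufferNextBatchFromArm in the diff below.
Status CollectionCloner::_bufferNextBatchFromArm() {
    while (_arm->ready()) {
        auto armResultStatus = _arm->nextReady();
        if (!armResultStatus.getStatus().isOK()) {
            return armResultStatus.getStatus();
        }
        if (armResultStatus.getValue().isEOF()) {
            // End of the current batch; wait for the next ARM event.
            break;
        }
        auto queryResult = armResultStatus.getValue().getResult();
        _documentsToInsert.push_back(std::move(*queryResult));
    }
    return Status::OK();
}
```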
diff --git a/jstests/replsets/initial_sync_capped_index.js b/jstests/replsets/initial_sync_capped_index.js index 0290e14fdef..2100b6f14bc 100644 --- a/jstests/replsets/initial_sync_capped_index.js +++ b/jstests/replsets/initial_sync_capped_index.js @@ -84,7 +84,7 @@ replTest.add(); var secondary = replTest.getSecondary(); - var collectionClonerFailPoint = "initialSyncHangCollectionClonerAfterInitialFind"; + var collectionClonerFailPoint = "initialSyncHangCollectionClonerAfterHandlingBatchResponse"; // Make the collection cloner pause after its initial 'find' response on the capped collection. var nss = dbName + "." + cappedCollName; @@ -98,7 +98,8 @@ jsTestLog("Waiting for the initial 'find' response of capped collection cloner to complete."); checkLog.contains( - secondary, "initialSyncHangCollectionClonerAfterInitialFind fail point enabled for " + nss); + secondary, + "initialSyncHangCollectionClonerAfterHandlingBatchResponse fail point enabled for " + nss); // Append documents to the capped collection so that the SECONDARY will clone these // additional documents. diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp index e1a8131fb10..fa418d338fb 100644 --- a/src/mongo/db/repl/base_cloner_test_fixture.cpp +++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp @@ -76,6 +76,11 @@ BSONObj BaseClonerTest::createCursorResponse(CursorId cursorId, const BSONArray& } // static +BSONObj BaseClonerTest::createFinalCursorResponse(const BSONArray& docs) { + return createCursorResponse(0, docs, "nextBatch"); +} + +// static BSONObj BaseClonerTest::createListCollectionsResponse(CursorId cursorId, const BSONArray& colls, const char* fieldName) { diff --git a/src/mongo/db/repl/base_cloner_test_fixture.h b/src/mongo/db/repl/base_cloner_test_fixture.h index fb8ecf22861..e5615806fc6 100644 --- a/src/mongo/db/repl/base_cloner_test_fixture.h +++ b/src/mongo/db/repl/base_cloner_test_fixture.h @@ -76,6 +76,8 @@ public: static BSONObj createCursorResponse(CursorId cursorId, const BSONArray& docs); + static BSONObj createFinalCursorResponse(const BSONArray& docs); + /** * Creates a listCollections response with given array of index specs. */ diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index 13b27d0ed01..98d658b49f9 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -74,9 +74,9 @@ MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncCollectionFindAttempts, int, 3); // collection 'namespace'. MONGO_FP_DECLARE(initialSyncHangDuringCollectionClone); -// Failpoint which causes initial sync to hang after the initial 'find' command of collection -// cloning, for a specific collection. -MONGO_FP_DECLARE(initialSyncHangCollectionClonerAfterInitialFind); +// Failpoint which causes initial sync to hang after handling the next batch of results from the +// 'AsyncResultsMerger' for a specific collection. 
+MONGO_FP_DECLARE(initialSyncHangCollectionClonerAfterHandlingBatchResponse); CollectionCloner::CollectionCloner(executor::TaskExecutor* executor, OldThreadPool* dbWorkThreadPool, @@ -85,7 +85,8 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor, const CollectionOptions& options, const CallbackFn& onCompletion, StorageInterface* storageInterface, - const int batchSize) + const int batchSize, + const int maxNumClonerCursors) : _executor(executor), _dbWorkThreadPool(dbWorkThreadPool), _source(source), @@ -122,7 +123,7 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor, executor::RemoteCommandRequest::kNoTimeout, RemoteCommandRetryScheduler::kAllRetriableErrors)), _indexSpecs(), - _documents(), + _documentsToInsert(), _dbWorkTaskRunner(_dbWorkThreadPool), _scheduleDbWorkFn([this](const executor::TaskExecutor::CallbackFn& work) { auto task = [work](OperationContext* opCtx, @@ -138,9 +139,8 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor, kProgressMeterCheckInterval, "documents copied", str::stream() << _sourceNss.toString() << " collection clone progress"), - _batchSize(batchSize), - _arm(executor, - stdx::make_unique<ClusterClientCursorParams>(_sourceNss, UserNameIterator()).get()) { + _collectionCloningBatchSize(batchSize), + _maxNumClonerCursors(maxNumClonerCursors) { // Fetcher throws an exception on null executor. invariant(executor); uassert(ErrorCodes::BadValue, @@ -215,15 +215,19 @@ void CollectionCloner::shutdown() { // Nothing to do if we are already in ShuttingDown or Complete state. return; } - _cancelRemainingWork_inlock(); } void CollectionCloner::_cancelRemainingWork_inlock() { + if (_arm) { + Client::initThreadIfNotAlready(); + auto opCtx = cc().getOperationContext(); + _killArmHandle = _arm->kill(opCtx); + } _countScheduler.shutdown(); _listIndexesFetcher.shutdown(); - if (_findFetcher) { - _findFetcher->shutdown(); + if (_establishCollectionCursorsScheduler) { + _establishCollectionCursorsScheduler->shutdown(); } _dbWorkTaskRunner.cancel(); } @@ -235,6 +239,9 @@ CollectionCloner::Stats CollectionCloner::getStats() const { void CollectionCloner::join() { stdx::unique_lock<stdx::mutex> lk(_mutex); + if (_killArmHandle) { + _executor->waitForEvent(_killArmHandle); + } _condition.wait(lk, [this]() { return !_isActive_inlock(); }); } @@ -250,6 +257,11 @@ void CollectionCloner::setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& sched _scheduleDbWorkFn = scheduleDbWorkFn; } +std::vector<BSONObj> CollectionCloner::getDocumentsToInsert_forTest() { + LockGuard lk(_mutex); + return _documentsToInsert; +} + void CollectionCloner::_countCallback( const executor::TaskExecutor::RemoteCommandCallbackArgs& args) { @@ -391,80 +403,181 @@ void CollectionCloner::_listIndexesCallback(const Fetcher::QueryResponseStatus& } } -void CollectionCloner::_findCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult, - Fetcher::NextAction* nextAction, - BSONObjBuilder* getMoreBob, - std::shared_ptr<OnCompletionGuard> onCompletionGuard) { - if (!fetchResult.isOK()) { - // Wait for active inserts to complete. 
- waitForDbWorker(); +void CollectionCloner::_beginCollectionCallback(const executor::TaskExecutor::CallbackArgs& cbd) { + if (!cbd.status.isOK()) { + _finishCallback(cbd.status); + return; + } + if (!_idIndexSpec.isEmpty() && _options.autoIndexId == CollectionOptions::NO) { + warning() + << "Found the _id_ index spec but the collection specified autoIndexId of false on ns:" + << this->_sourceNss; + } - Status newStatus{fetchResult.getStatus().code(), - str::stream() << "While querying collection '" << _sourceNss.ns() - << "' there was an error '" - << fetchResult.getStatus().reason() - << "'"}; + auto collectionBulkLoader = _storageInterface->createCollectionForBulkLoading( + _destNss, _options, _idIndexSpec, _indexSpecs); - stdx::lock_guard<stdx::mutex> lock(_mutex); - onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, newStatus); + if (!collectionBulkLoader.isOK()) { + _finishCallback(collectionBulkLoader.getStatus()); return; } - auto batchData(fetchResult.getValue()); - bool lastBatch = *nextAction == Fetcher::NextAction::kNoAction; - if (batchData.documents.size() > 0) { - LockGuard lk(_mutex); - _documents.insert(_documents.end(), batchData.documents.begin(), batchData.documents.end()); - } else if (!batchData.first) { - warning() << "No documents returned in batch; ns: " << _sourceNss - << ", cursorId:" << batchData.cursorId << ", isLastBatch:" << lastBatch; + _stats.indexes = _indexSpecs.size(); + if (!_idIndexSpec.isEmpty()) { + ++_stats.indexes; } - auto&& scheduleResult = - _scheduleDbWorkFn(stdx::bind(&CollectionCloner::_insertDocumentsCallback, - this, - stdx::placeholders::_1, - lastBatch, - onCompletionGuard)); - if (!scheduleResult.isOK()) { - Status newStatus{scheduleResult.getStatus().code(), - str::stream() << "While cloning collection '" << _sourceNss.ns() - << "' there was an error '" - << scheduleResult.getStatus().reason() - << "'"}; + _collLoader = std::move(collectionBulkLoader.getValue()); - stdx::lock_guard<stdx::mutex> lock(_mutex); - onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, newStatus); + BSONObjBuilder cmdObj; + EstablishCursorsCommand cursorCommand; + // The 'find' command is used when the number of cloning cursors is 1 to ensure + // the correctness of the collection cloning process until 'parallelCollectionScan' + // can be tested more extensively in context of initial sync. + if (_maxNumClonerCursors == 1) { + cmdObj.append("find", _sourceNss.coll()); + cmdObj.append("noCursorTimeout", true); + // Set batchSize to be 0 to establish the cursor without fetching any documents, + // similar to the response format of 'parallelCollectionScan'. 
+ cmdObj.append("batchSize", 0); + cursorCommand = Find; + } else { + cmdObj.append("parallelCollectionScan", _sourceNss.coll()); + cmdObj.append("numCursors", _maxNumClonerCursors); + cursorCommand = ParallelCollScan; + } + + Client::initThreadIfNotAlready(); + auto opCtx = cc().getOperationContext(); + + _establishCollectionCursorsScheduler = stdx::make_unique<RemoteCommandRetryScheduler>( + _executor, + RemoteCommandRequest(_source, + _sourceNss.db().toString(), + cmdObj.obj(), + ReadPreferenceSetting::secondaryPreferredMetadata(), + opCtx, + RemoteCommandRequest::kNoTimeout), + stdx::bind(&CollectionCloner::_establishCollectionCursorsCallback, + this, + stdx::placeholders::_1, + cursorCommand), + RemoteCommandRetryScheduler::makeRetryPolicy( + numInitialSyncCollectionFindAttempts.load(), + executor::RemoteCommandRequest::kNoTimeout, + RemoteCommandRetryScheduler::kAllRetriableErrors)); + auto scheduleStatus = _establishCollectionCursorsScheduler->startup(); + LOG(1) << "Attempting to establish cursors with maxNumClonerCursors: " << _maxNumClonerCursors; + + if (!scheduleStatus.isOK()) { + _establishCollectionCursorsScheduler.reset(); + _finishCallback(scheduleStatus); return; } +} - MONGO_FAIL_POINT_BLOCK(initialSyncHangCollectionClonerAfterInitialFind, nssData) { - const BSONObj& data = nssData.getData(); - auto nss = data["nss"].str(); - // Only hang when cloning the specified collection. - if (_destNss.toString() == nss) { - while (MONGO_FAIL_POINT(initialSyncHangCollectionClonerAfterInitialFind) && - !_isShuttingDown()) { - log() << "initialSyncHangCollectionClonerAfterInitialFind fail point enabled for " - << nss << ". Blocking until fail point is disabled."; - mongo::sleepsecs(1); +Status CollectionCloner::_parseCursorResponse(BSONObj response, + std::vector<CursorResponse>* cursors, + EstablishCursorsCommand cursorCommand) { + switch (cursorCommand) { + case Find: { + StatusWith<CursorResponse> findResponse = CursorResponse::parseFromBSON(response); + if (!findResponse.isOK()) { + Status errorStatus{findResponse.getStatus().code(), + str::stream() + << "While parsing the 'find' query against collection '" + << _sourceNss.ns() + << "' there was an error '" + << findResponse.getStatus().reason() + << "'"}; + return errorStatus; } + cursors->push_back(std::move(findResponse.getValue())); + break; + } + case ParallelCollScan: { + auto cursorElements = _parseParallelCollectionScanResponse(response); + if (!cursorElements.isOK()) { + return cursorElements.getStatus(); + } + std::vector<BSONElement> cursorsArray; + cursorsArray = cursorElements.getValue(); + // Parse each BSONElement into a 'CursorResponse' object. 
+ for (BSONElement cursor : cursorsArray) { + if (!cursor.isABSONObj()) { + Status errorStatus( + ErrorCodes::FailedToParse, + "The 'cursor' field in the list of cursor responses is not a " + "valid BSON Object"); + return errorStatus; + } + const BSONObj cursorObj = cursor.Obj().getOwned(); + StatusWith<CursorResponse> parallelCollScanResponse = + CursorResponse::parseFromBSON(cursorObj); + if (!parallelCollScanResponse.isOK()) { + return parallelCollScanResponse.getStatus(); + } + cursors->push_back(std::move(parallelCollScanResponse.getValue())); + } + break; + } + default: { + Status errorStatus( + ErrorCodes::FailedToParse, + "The command used to establish the collection cloner cursors is not valid."); + return errorStatus; } } + return Status::OK(); +} - if (!lastBatch) { - invariant(getMoreBob); - getMoreBob->append("getMore", batchData.cursorId); - getMoreBob->append("collection", batchData.nss.coll()); - getMoreBob->append("batchSize", _batchSize); +void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCallbackArgs& rcbd, + EstablishCursorsCommand cursorCommand) { + if (_state == State::kShuttingDown) { + Status shuttingDownStatus{ErrorCodes::CallbackCanceled, "Cloner shutting down."}; + _finishCallback(shuttingDownStatus); + return; + } + auto response = rcbd.response; + if (!response.isOK()) { + _finishCallback(response.status); + return; + } + Status commandStatus = getStatusFromCommandResult(response.data); + if (!commandStatus.isOK()) { + Status newStatus{commandStatus.code(), + str::stream() << "While querying collection '" << _sourceNss.ns() + << "' there was an error '" + << commandStatus.reason() + << "'"}; + _finishCallback(commandStatus); + return; } -} -void CollectionCloner::_beginCollectionCallback(const executor::TaskExecutor::CallbackArgs& cbd) { - if (!cbd.status.isOK()) { - _finishCallback(cbd.status); + std::vector<CursorResponse> cursorResponses; + Status parseResponseStatus = + _parseCursorResponse(response.data, &cursorResponses, cursorCommand); + if (!parseResponseStatus.isOK()) { + _finishCallback(parseResponseStatus); return; } + LOG(1) << "Collection cloner running with " << cursorResponses.size() + << " cursors established."; + + // Initialize the 'AsyncResultsMerger'(ARM). + std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors; + for (auto&& cursorResponse : cursorResponses) { + // A placeholder 'ShardId' is used until the ARM is made less sharding specific. + remoteCursors.emplace_back( + ShardId("CollectionClonerSyncSource"), _source, std::move(cursorResponse)); + } + + // An empty list of authenticated users is passed into the cluster parameters + // as user information is not used in the ARM in context of collection cloning. + _clusterClientCursorParams = + stdx::make_unique<ClusterClientCursorParams>(_sourceNss, UserNameIterator()); + _clusterClientCursorParams->remotes = std::move(remoteCursors); + _arm = stdx::make_unique<AsyncResultsMerger>(_executor, _clusterClientCursorParams.get()); // This completion guard invokes _finishCallback on destruction. auto cancelRemainingWorkInLock = [this]() { _cancelRemainingWork_inlock(); }; @@ -476,50 +589,134 @@ void CollectionCloner::_beginCollectionCallback(const executor::TaskExecutor::Ca // that will cause the destructor of the completion guard to run, the destructor must be run // outside the mutex. This is a necessary condition to invoke _finishCallback. 
stdx::lock_guard<stdx::mutex> lock(_mutex); - if (!_idIndexSpec.isEmpty() && _options.autoIndexId == CollectionOptions::NO) { - warning() - << "Found the _id_ index spec but the collection specified autoIndexId of false on ns:" - << this->_sourceNss; + Status scheduleStatus = _scheduleNextARMResultsCallback(onCompletionGuard); + if (!scheduleStatus.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, scheduleStatus); + return; } +} - auto status = _storageInterface->createCollectionForBulkLoading( - _destNss, _options, _idIndexSpec, _indexSpecs); +StatusWith<std::vector<BSONElement>> CollectionCloner::_parseParallelCollectionScanResponse( + BSONObj resp) { + if (!resp.hasField("cursors")) { + return Status(ErrorCodes::CursorNotFound, + "The 'parallelCollectionScan' response does not contain a 'cursors' field."); + } + BSONElement response = resp["cursors"]; + if (response.type() == BSONType::Array) { + return response.Array(); + } else { + return Status( + ErrorCodes::FailedToParse, + "The 'parallelCollectionScan' response is unable to be transformed into an array."); + } +} - if (!status.isOK()) { - onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status.getStatus()); +Status CollectionCloner::_bufferNextBatchFromArm() { + while (_arm->ready()) { + auto armResultStatus = _arm->nextReady(); + if (!armResultStatus.getStatus().isOK()) { + return armResultStatus.getStatus(); + } + if (armResultStatus.getValue().isEOF()) { + // We have reached the end of the batch. + break; + } else { + auto queryResult = armResultStatus.getValue().getResult(); + _documentsToInsert.push_back(std::move(*queryResult)); + } + } + + return Status::OK(); +} + +Status CollectionCloner::_scheduleNextARMResultsCallback( + std::shared_ptr<OnCompletionGuard> onCompletionGuard) { + Client::initThreadIfNotAlready(); + auto opCtx = cc().getOperationContext(); + auto nextEvent = _arm->nextEvent(opCtx); + if (!nextEvent.isOK()) { + return nextEvent.getStatus(); + } + auto event = nextEvent.getValue(); + auto handleARMResultsOnNextEvent = + _executor->onEvent(event, + stdx::bind(&CollectionCloner::_handleARMResultsCallback, + this, + stdx::placeholders::_1, + onCompletionGuard)); + return handleARMResultsOnNextEvent.getStatus(); +} + +void CollectionCloner::_handleARMResultsCallback( + const executor::TaskExecutor::CallbackArgs& cbd, + std::shared_ptr<OnCompletionGuard> onCompletionGuard) { + auto setResultAndCancelRemainingWork = [this](std::shared_ptr<OnCompletionGuard> guard, + Status status) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + guard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + }; + + if (!cbd.status.isOK()) { + // Wait for active inserts to complete. + waitForDbWorker(); + Status newStatus{cbd.status.code(), + str::stream() << "While querying collection '" << _sourceNss.ns() + << "' there was an error '" + << cbd.status.reason() + << "'"}; + setResultAndCancelRemainingWork(onCompletionGuard, cbd.status); return; } - _stats.indexes = _indexSpecs.size(); - if (!_idIndexSpec.isEmpty()) { - ++_stats.indexes; + // Pull the documents from the ARM into a buffer until the entire batch has been processed. 
+ auto nextBatchStatus = _bufferNextBatchFromArm(); + if (!nextBatchStatus.isOK()) { + setResultAndCancelRemainingWork(onCompletionGuard, nextBatchStatus); + return; } - _collLoader = std::move(status.getValue()); + bool lastBatch = _arm->remotesExhausted(); + auto&& scheduleResult = + _scheduleDbWorkFn(stdx::bind(&CollectionCloner::_insertDocumentsCallback, + this, + stdx::placeholders::_1, + lastBatch, + onCompletionGuard)); + if (!scheduleResult.isOK()) { + Status newStatus{scheduleResult.getStatus().code(), + str::stream() << "While cloning collection '" << _sourceNss.ns() + << "' there was an error '" + << scheduleResult.getStatus().reason() + << "'"}; + setResultAndCancelRemainingWork(onCompletionGuard, scheduleResult.getStatus()); + return; + } - _findFetcher = stdx::make_unique<Fetcher>( - _executor, - _source, - _sourceNss.db().toString(), - BSON("find" << _sourceNss.coll() << "noCursorTimeout" << true << "batchSize" << _batchSize), - stdx::bind(&CollectionCloner::_findCallback, - this, - stdx::placeholders::_1, - stdx::placeholders::_2, - stdx::placeholders::_3, - onCompletionGuard), - ReadPreferenceSetting::secondaryPreferredMetadata(), - RemoteCommandRequest::kNoTimeout, - RemoteCommandRetryScheduler::makeRetryPolicy( - numInitialSyncCollectionFindAttempts.load(), - executor::RemoteCommandRequest::kNoTimeout, - RemoteCommandRetryScheduler::kAllRetriableErrors)); + MONGO_FAIL_POINT_BLOCK(initialSyncHangCollectionClonerAfterHandlingBatchResponse, nssData) { + const BSONObj& data = nssData.getData(); + auto nss = data["nss"].str(); + // Only hang when cloning the specified collection. + if (_destNss.toString() == nss) { + while (MONGO_FAIL_POINT(initialSyncHangCollectionClonerAfterHandlingBatchResponse) && + !_isShuttingDown()) { + log() << "initialSyncHangCollectionClonerAfterHandlingBatchResponse fail point " + "enabled for " + << nss << ". Blocking until fail point is disabled."; + mongo::sleepsecs(1); + } + } + } - Status scheduleStatus = _findFetcher->schedule(); - if (!scheduleStatus.isOK()) { - _findFetcher.reset(); - onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, scheduleStatus); - return; + // If the remote cursors are not exhausted, schedule this callback again to handle + // the impending cursor response. + if (!lastBatch) { + Status scheduleStatus = _scheduleNextARMResultsCallback(onCompletionGuard); + if (!scheduleStatus.isOK()) { + setResultAndCancelRemainingWork(onCompletionGuard, scheduleStatus); + return; + } } } @@ -533,24 +730,21 @@ void CollectionCloner::_insertDocumentsCallback( return; } - std::vector<BSONObj> docs; UniqueLock lk(_mutex); - if (_documents.size() == 0) { + std::vector<BSONObj> docs; + if (_documentsToInsert.size() == 0) { warning() << "_insertDocumentsCallback, but no documents to insert for ns:" << _destNss; - if (lastBatch) { onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, Status::OK()); } return; } - - _documents.swap(docs); + _documentsToInsert.swap(docs); _stats.documentsCopied += docs.size(); ++_stats.fetchBatches; _progressMeter.hit(int(docs.size())); invariant(_collLoader); const auto status = _collLoader->insertDocuments(docs.cbegin(), docs.cend()); - if (!status.isOK()) { onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, status); return; @@ -570,12 +764,10 @@ void CollectionCloner::_insertDocumentsCallback( } } - if (!lastBatch) { - return; + if (lastBatch) { + // Clean up resources once the last batch has been copied over and set the status to OK. 
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, Status::OK()); } - - // Done with last batch and time to set result in completion guard to Status::OK(). - onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, Status::OK()); } void CollectionCloner::_finishCallback(const Status& status) { @@ -584,7 +776,6 @@ void CollectionCloner::_finishCallback(const Status& status) { // Copy the status so we can change it below if needed. auto finalStatus = status; bool callCollectionLoader = false; - decltype(_onCompletion) onCompletion; { LockGuard lk(_mutex); @@ -595,7 +786,6 @@ void CollectionCloner::_finishCallback(const Status& status) { invariant(_onCompletion); std::swap(_onCompletion, onCompletion); } - if (callCollectionLoader) { if (finalStatus.isOK()) { const auto loaderStatus = _collLoader->commit(); @@ -609,7 +799,6 @@ void CollectionCloner::_finishCallback(const Status& status) { // This will release the resources held by the loader. _collLoader.reset(); } - onCompletion(finalStatus); // This will release the resources held by the callback function object. '_onCompletion' is diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h index af029b0f1cd..222886b9d72 100644 --- a/src/mongo/db/repl/collection_cloner.h +++ b/src/mongo/db/repl/collection_cloner.h @@ -69,6 +69,7 @@ public: /** * Callback completion guard for CollectionCloner. */ + using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs; using OnCompletionGuard = CallbackCompletionGuard<Status>; struct Stats { @@ -111,7 +112,8 @@ public: const CollectionOptions& options, const CallbackFn& onCompletion, StorageInterface* storageInterface, - const int batchSize); + const int batchSize, + const int maxNumClonerCursors); virtual ~CollectionCloner(); @@ -146,6 +148,14 @@ public: */ void setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& scheduleDbWorkFn); + /** + * Returns the documents currently stored in the '_documents' buffer that is intended + * to be inserted through the collection loader. + * + * For testing only. + */ + std::vector<BSONObj> getDocumentsToInsert_forTest(); + private: bool _isActive_inlock() const; @@ -173,14 +183,6 @@ private: BSONObjBuilder* getMoreBob); /** - * Read collection documents from find result. - */ - void _findCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult, - Fetcher::NextAction* nextAction, - BSONObjBuilder* getMoreBob, - std::shared_ptr<OnCompletionGuard> onCompletionGuard); - - /** * Request storage interface to create collection. * * Called multiple times if there are more than one batch of responses from listIndexes @@ -192,13 +194,59 @@ private: void _beginCollectionCallback(const executor::TaskExecutor::CallbackArgs& callbackData); /** - * Called multiple times if there are more than one batch of documents from the fetcher. + * The possible command types that can be used to establish the initial cursors on the + * remote collection. + */ + enum EstablishCursorsCommand { Find, ParallelCollScan }; + + /** + * Parses the cursor responses from the 'find' or 'parallelCollectionScan' command + * and passes them into the 'AsyncResultsMerger'. + */ + void _establishCollectionCursorsCallback(const RemoteCommandCallbackArgs& rcbd, + EstablishCursorsCommand cursorCommand); + + /** + * Parses the response from a 'parallelCollectionScan' command into a vector of cursor + * elements. 
+ */ + StatusWith<std::vector<BSONElement>> _parseParallelCollectionScanResponse(BSONObj resp); + + /** + * Takes a cursors buffer and parses the 'parallelCollectionScan' response into cursor + * responses that are pushed onto the buffer. + */ + Status _parseCursorResponse(BSONObj response, + std::vector<CursorResponse>* cursors, + EstablishCursorsCommand cursorCommand); + + /** + * Calls to get the next event from the 'AsyncResultsMerger'. This schedules + * '_handleAsyncResultsCallback' to be run when the event is signaled successfully. + */ + Status _scheduleNextARMResultsCallback(std::shared_ptr<OnCompletionGuard> onCompletionGuard); + + /** + * Runs for each time a new batch of documents can be retrieved from the 'AsyncResultsMerger'. + * Buffers the documents retrieved for insertion and schedules a '_insertDocumentsCallback' + * to insert the contents of the buffer. + */ + void _handleARMResultsCallback(const executor::TaskExecutor::CallbackArgs& cbd, + std::shared_ptr<OnCompletionGuard> onCompletionGuard); + + /** + * Pull all ready results from the ARM into a buffer to be inserted. + */ + Status _bufferNextBatchFromArm(); + + /** + * Called whenever there is a new batch of documents ready from the 'AsyncResultsMerger'. * On the last batch, 'lastBatch' will be true. * * Each document returned will be inserted via the storage interfaceRequest storage * interface. */ - void _insertDocumentsCallback(const executor::TaskExecutor::CallbackArgs& callbackData, + void _insertDocumentsCallback(const executor::TaskExecutor::CallbackArgs& cbd, bool lastBatch, std::shared_ptr<OnCompletionGuard> onCompletionGuard); @@ -231,18 +279,30 @@ private: StorageInterface* _storageInterface; // (R) Not owned by us. RemoteCommandRetryScheduler _countScheduler; // (S) Fetcher _listIndexesFetcher; // (S) - std::unique_ptr<Fetcher> _findFetcher; // (M) std::vector<BSONObj> _indexSpecs; // (M) BSONObj _idIndexSpec; // (M) - std::vector<BSONObj> _documents; // (M) Documents read from fetcher to insert. - TaskRunner _dbWorkTaskRunner; // (R) + std::vector<BSONObj> + _documentsToInsert; // (M) Documents read from 'AsyncResultsMerger' to insert. + TaskRunner _dbWorkTaskRunner; // (R) ScheduleDbWorkFn _scheduleDbWorkFn; // (RT) Function for scheduling database work using the executor. Stats _stats; // (M) stats for this instance. ProgressMeter _progressMeter; // (M) progress meter for this instance. - const int _batchSize; + const int _collectionCloningBatchSize; // (R) The size of the batches of documents returned in + // collection cloning. + + // (R) The maximum number of cursors to use in the collection cloning process. + const int _maxNumClonerCursors; + // (M) Component responsible for fetching the documents from the collection cloner cursor(s). + std::unique_ptr<AsyncResultsMerger> _arm; + // (R) The cursor parameters used by the 'AsyncResultsMerger'. + std::unique_ptr<ClusterClientCursorParams> _clusterClientCursorParams; + + // (M) The event handle for the 'kill' event of the 'AsyncResultsMerger'. + executor::TaskExecutor::EventHandle _killArmHandle; - AsyncResultsMerger _arm; + // (M) Scheduler used to establish the initial cursor or set of cursors. 
+ std::unique_ptr<RemoteCommandRetryScheduler> _establishCollectionCursorsScheduler; // State transitions: // PreStart --> Running --> ShuttingDown --> Complete diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp index 4fb7a7fe208..d3e8966a304 100644 --- a/src/mongo/db/repl/collection_cloner_test.cpp +++ b/src/mongo/db/repl/collection_cloner_test.cpp @@ -66,8 +66,12 @@ protected: void setUp() override; void tearDown() override; - // 16MB max batch size / 12 byte min doc size * 10 (for good measure) = defaultBatchSize to use. - const int defaultBatchSize = (16 * 1024 * 1024) / 12 * 10; + // A simple arbitrary value to use as the default batch size. + const int defaultBatchSize = 1024; + + // Running initial sync with a single cursor will default to using the 'find' command until + // 'parallelCollectionScan' has more complete testing. + const int defaultNumCloningCursors = 1; CollectionOptions options; std::unique_ptr<CollectionCloner> collectionCloner; @@ -87,7 +91,8 @@ void CollectionClonerTest::setUp() { options, stdx::bind(&CollectionClonerTest::setStatus, this, stdx::placeholders::_1), storageInterface.get(), - defaultBatchSize); + defaultBatchSize, + defaultNumCloningCursors); collectionStats = CollectionMockStats(); storageInterface->createCollectionForBulkFn = [this](const NamespaceString& nss, @@ -114,6 +119,7 @@ BaseCloner* CollectionClonerTest::getCloner() const { return collectionCloner.get(); } + TEST_F(CollectionClonerTest, InvalidConstruction) { executor::TaskExecutor& executor = getExecutor(); auto pool = dbWorkThreadPool.get(); @@ -123,29 +129,50 @@ TEST_F(CollectionClonerTest, InvalidConstruction) { // Null executor -- error from Fetcher, not CollectionCloner. { StorageInterface* si = storageInterface.get(); - ASSERT_THROWS_CODE_AND_WHAT( - CollectionCloner(nullptr, pool, target, nss, options, cb, si, defaultBatchSize), - UserException, - ErrorCodes::BadValue, - "task executor cannot be null"); + ASSERT_THROWS_CODE_AND_WHAT(CollectionCloner(nullptr, + pool, + target, + nss, + options, + cb, + si, + defaultBatchSize, + defaultNumCloningCursors), + UserException, + ErrorCodes::BadValue, + "task executor cannot be null"); } // Null storage interface - ASSERT_THROWS_CODE_AND_WHAT( - CollectionCloner(&executor, pool, target, nss, options, cb, nullptr, defaultBatchSize), - UserException, - ErrorCodes::BadValue, - "storage interface cannot be null"); + ASSERT_THROWS_CODE_AND_WHAT(CollectionCloner(&executor, + pool, + target, + nss, + options, + cb, + nullptr, + defaultBatchSize, + defaultNumCloningCursors), + UserException, + ErrorCodes::BadValue, + "storage interface cannot be null"); // Invalid namespace. { NamespaceString badNss("db."); StorageInterface* si = storageInterface.get(); - ASSERT_THROWS_CODE_AND_WHAT( - CollectionCloner(&executor, pool, target, badNss, options, cb, si, defaultBatchSize), - UserException, - ErrorCodes::BadValue, - "invalid collection namespace: db."); + ASSERT_THROWS_CODE_AND_WHAT(CollectionCloner(&executor, + pool, + target, + badNss, + options, + cb, + si, + defaultBatchSize, + defaultNumCloningCursors), + UserException, + ErrorCodes::BadValue, + "invalid collection namespace: db."); } // Invalid collection options - error from CollectionOptions::validate(), not CollectionCloner. 
@@ -155,8 +182,15 @@ TEST_F(CollectionClonerTest, InvalidConstruction) { << "not a document"); StorageInterface* si = storageInterface.get(); ASSERT_THROWS_CODE_AND_WHAT( - CollectionCloner( - &executor, pool, target, nss, invalidOptions, cb, si, defaultBatchSize), + CollectionCloner(&executor, + pool, + target, + nss, + invalidOptions, + cb, + si, + defaultBatchSize, + defaultNumCloningCursors), UserException, ErrorCodes::BadValue, "'storageEngine.storageEngine1' has to be an embedded document."); @@ -166,11 +200,18 @@ TEST_F(CollectionClonerTest, InvalidConstruction) { { CollectionCloner::CallbackFn nullCb; StorageInterface* si = storageInterface.get(); - ASSERT_THROWS_CODE_AND_WHAT( - CollectionCloner(&executor, pool, target, nss, options, nullCb, si, defaultBatchSize), - UserException, - ErrorCodes::BadValue, - "callback function cannot be null"); + ASSERT_THROWS_CODE_AND_WHAT(CollectionCloner(&executor, + pool, + target, + nss, + options, + nullCb, + si, + defaultBatchSize, + defaultNumCloningCursors), + UserException, + ErrorCodes::BadValue, + "callback function cannot be null"); } } @@ -316,7 +357,8 @@ TEST_F(CollectionClonerTest, options, stdx::bind(&CollectionClonerTest::setStatus, this, stdx::placeholders::_1), storageInterface.get(), - defaultBatchSize); + defaultBatchSize, + defaultNumCloningCursors); ASSERT_OK(collectionCloner->startup()); @@ -339,7 +381,8 @@ TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) { options, stdx::bind(&CollectionClonerTest::setStatus, this, stdx::placeholders::_1), storageInterface.get(), - defaultBatchSize)); + defaultBatchSize, + defaultNumCloningCursors)); NamespaceString collNss; CollectionOptions collOptions; @@ -370,10 +413,19 @@ TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) { ASSERT_TRUE(collectionCloner->isActive()); ASSERT_TRUE(collectionStats.initCalled); + BSONArray emptyArray; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCursorResponse(1, emptyArray)); + } + + collectionCloner->waitForDbWorker(); + ASSERT_TRUE(collectionCloner->isActive()); + const BSONObj doc = BSON("_id" << 1); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc))); + processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc))); } collectionCloner->join(); ASSERT_EQUALS(1, collectionStats.insertCount); @@ -699,7 +751,7 @@ TEST_F(CollectionClonerTest, FindCommandAfterBeginCollection) { ASSERT_FALSE(net->hasReadyRequests()); } -TEST_F(CollectionClonerTest, FindCommandFailed) { +TEST_F(CollectionClonerTest, EstablishCursorCommandFailed) { ASSERT_OK(collectionCloner->startup()); { @@ -747,13 +799,15 @@ TEST_F(CollectionClonerTest, CollectionClonerResendsFindCommandOnRetriableError) net->runReadyNetworkOperations(); ASSERT_TRUE(collectionCloner->isActive()); - // Confirm that CollectionCloner resends the find request. + // This check exists to ensure that the command used to establish the cursors is retried, + // regardless of the command format. Therefore, it shouldn't be necessary to have a separate + // similar test case for the 'parallelCollectionScan' command. 
auto noi = net->getNextReadyRequest(); assertRemoteCommandNameEquals("find", noi->getRequest()); net->blackHole(noi); } -TEST_F(CollectionClonerTest, FindCommandCanceled) { +TEST_F(CollectionClonerTest, EstablishCursorCommandCanceled) { ASSERT_OK(collectionCloner->startup()); ASSERT_TRUE(collectionCloner->isActive()); @@ -810,10 +864,19 @@ TEST_F(CollectionClonerTest, InsertDocumentsScheduleDbWorkFailed) { return StatusWith<executor::TaskExecutor::CallbackHandle>(ErrorCodes::UnknownError, ""); }); + BSONArray emptyArray; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCursorResponse(1, emptyArray)); + } + + collectionCloner->waitForDbWorker(); + ASSERT_TRUE(collectionCloner->isActive()); + const BSONObj doc = BSON("_id" << 1); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc))); + processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc))); } ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus().code()); @@ -844,9 +907,18 @@ TEST_F(CollectionClonerTest, InsertDocumentsCallbackCanceled) { return StatusWith<executor::TaskExecutor::CallbackHandle>(handle); }); + BSONArray emptyArray; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCursorResponse(1, emptyArray)); + } + + collectionCloner->waitForDbWorker(); + ASSERT_TRUE(collectionCloner->isActive()); + { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(0, BSON_ARRAY(BSON("_id" << 1)))); + processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(BSON("_id" << 1)))); } collectionCloner->join(); ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getStatus().code()); @@ -875,9 +947,18 @@ TEST_F(CollectionClonerTest, InsertDocumentsFailed) { return Status(ErrorCodes::OperationFailed, ""); }; + BSONArray emptyArray; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCursorResponse(1, emptyArray)); + } + + collectionCloner->waitForDbWorker(); + ASSERT_TRUE(collectionCloner->isActive()); + { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(0, BSON_ARRAY(BSON("_id" << 1)))); + processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(BSON("_id" << 1)))); } collectionCloner->join(); @@ -902,10 +983,19 @@ TEST_F(CollectionClonerTest, InsertDocumentsSingleBatch) { ASSERT_TRUE(collectionCloner->isActive()); ASSERT_TRUE(collectionStats.initCalled); + BSONArray emptyArray; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCursorResponse(1, emptyArray)); + } + + collectionCloner->waitForDbWorker(); + ASSERT_TRUE(collectionCloner->isActive()); + const BSONObj doc = BSON("_id" << 1); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc))); + processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc))); } collectionCloner->join(); @@ -933,6 +1023,15 @@ TEST_F(CollectionClonerTest, InsertDocumentsMultipleBatches) { ASSERT_TRUE(collectionCloner->isActive()); ASSERT_TRUE(collectionStats.initCalled); + BSONArray emptyArray; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCursorResponse(1, emptyArray)); + } + + collectionCloner->waitForDbWorker(); + ASSERT_TRUE(collectionCloner->isActive()); + const BSONObj doc = BSON("_id" << 
1); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); @@ -950,7 +1049,7 @@ TEST_F(CollectionClonerTest, InsertDocumentsMultipleBatches) { const BSONObj doc2 = BSON("_id" << 1); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc2), "nextBatch")); + processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc2))); } collectionCloner->join(); @@ -978,6 +1077,15 @@ TEST_F(CollectionClonerTest, LastBatchContainsNoDocuments) { ASSERT_TRUE(collectionCloner->isActive()); ASSERT_TRUE(collectionStats.initCalled); + BSONArray emptyArray; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCursorResponse(1, emptyArray)); + } + + collectionCloner->waitForDbWorker(); + ASSERT_TRUE(collectionCloner->isActive()); + const BSONObj doc = BSON("_id" << 1); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); @@ -1002,10 +1110,9 @@ TEST_F(CollectionClonerTest, LastBatchContainsNoDocuments) { ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); ASSERT_TRUE(collectionCloner->isActive()); - BSONArray emptyArray; { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(0, emptyArray, "nextBatch")); + processNetworkResponse(createFinalCursorResponse(emptyArray)); } collectionCloner->join(); @@ -1031,6 +1138,15 @@ TEST_F(CollectionClonerTest, MiddleBatchContainsNoDocuments) { ASSERT_TRUE(collectionCloner->isActive()); ASSERT_TRUE(collectionStats.initCalled); + BSONArray emptyArray; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCursorResponse(1, emptyArray)); + } + + collectionCloner->waitForDbWorker(); + ASSERT_TRUE(collectionCloner->isActive()); + const BSONObj doc = BSON("_id" << 1); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); @@ -1043,7 +1159,6 @@ TEST_F(CollectionClonerTest, MiddleBatchContainsNoDocuments) { ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); ASSERT_TRUE(collectionCloner->isActive()); - BSONArray emptyArray; { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); processNetworkResponse(createCursorResponse(1, emptyArray, "nextBatch")); @@ -1058,7 +1173,7 @@ TEST_F(CollectionClonerTest, MiddleBatchContainsNoDocuments) { const BSONObj doc2 = BSON("_id" << 2); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc2), "nextBatch")); + processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc2))); } collectionCloner->join(); @@ -1097,6 +1212,15 @@ TEST_F(CollectionClonerTest, CollectionClonerCannotBeRestartedAfterPreviousFailu ASSERT_TRUE(collectionCloner->isActive()); ASSERT_TRUE(collectionStats.initCalled); + BSONArray emptyArray; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCursorResponse(1, emptyArray)); + } + + collectionCloner->waitForDbWorker(); + ASSERT_TRUE(collectionCloner->isActive()); + { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); processNetworkResponse(createCursorResponse(1, BSON_ARRAY(BSON("_id" << 1)))); @@ -1156,7 +1280,8 @@ TEST_F(CollectionClonerTest, CollectionClonerResetsOnCompletionCallbackFunctionA result = status; }, storageInterface.get(), - defaultBatchSize); + defaultBatchSize, + defaultNumCloningCursors); ASSERT_OK(collectionCloner->startup()); ASSERT_TRUE(collectionCloner->isActive()); @@ -1202,7 +1327,620 @@ 
TEST_F(CollectionClonerTest, ASSERT_TRUE(collectionCloner->isActive()); ASSERT_TRUE(collectionStats.initCalled); - // At this point, the CollectionCloner has sent the find request for the collection documents. + BSONArray emptyArray; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCursorResponse(1, emptyArray)); + } + + collectionCloner->waitForDbWorker(); + ASSERT_TRUE(collectionCloner->isActive()); + + // At this point, the CollectionCloner has sent the find request to establish the cursor. + // We want to return the first batch of documents for the collection from the network so that + // the CollectionCloner schedules the first _insertDocuments DB task and the getMore request for + // the next batch of documents. + + // Store the scheduled CollectionCloner::_insertDocuments task but do not run it yet. + executor::TaskExecutor::CallbackFn insertDocumentsFn; + collectionCloner->setScheduleDbWorkFn_forTest( + [&](const executor::TaskExecutor::CallbackFn& workFn) { + insertDocumentsFn = workFn; + executor::TaskExecutor::CallbackHandle handle(std::make_shared<MockCallbackState>()); + return StatusWith<executor::TaskExecutor::CallbackHandle>(handle); + }); + ASSERT_FALSE(insertDocumentsFn); + + // Return first batch of collection documents from remote server for the getMore request. + const BSONObj doc = BSON("_id" << 1); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + + assertRemoteCommandNameEquals( + "getMore", net->scheduleSuccessfulResponse(createCursorResponse(1, BSON_ARRAY(doc)))); + net->runReadyNetworkOperations(); + } + + // Confirm that CollectionCloner attempted to schedule _insertDocuments task. + ASSERT_TRUE(insertDocumentsFn); + + // Return an error for the getMore request for the next batch of collection documents. + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + + assertRemoteCommandNameEquals( + "getMore", + net->scheduleErrorResponse(Status(ErrorCodes::OperationFailed, "getMore failed"))); + net->runReadyNetworkOperations(); + } + + // CollectionCloner should still be active because we have not finished processing the + // insertDocuments task. + ASSERT_TRUE(collectionCloner->isActive()); + ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); + + // Run the insertDocuments task. The final status of the CollectionCloner should match the first + // error passed to the completion guard (ie. from the failed getMore request). + executor::TaskExecutor::CallbackArgs callbackArgs( + &getExecutor(), {}, Status(ErrorCodes::CallbackCanceled, "")); + insertDocumentsFn(callbackArgs); + + // Reset 'insertDocumentsFn' to release last reference count on completion guard. + insertDocumentsFn = {}; + + // No need to call CollectionCloner::join() because we invoked the _insertDocuments callback + // synchronously. + + ASSERT_FALSE(collectionCloner->isActive()); + ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus()); +} + +class ParallelCollectionClonerTest : public BaseClonerTest { +public: + BaseCloner* getCloner() const override; + +protected: + void setUp() override; + void tearDown() override; + std::vector<BSONObj> generateDocs(std::size_t numDocs); + + // A simple arbitrary value to use as the default batch size. + const int defaultBatchSize = 1024; + + // Running initial sync with a single cursor will default to using the 'find' command until + // 'parallelCollectionScan' has more complete testing. 
+ const int defaultNumCloningCursors = 3; + + CollectionOptions options; + std::unique_ptr<CollectionCloner> collectionCloner; + CollectionMockStats collectionStats; // Used by the _loader. + CollectionBulkLoaderMock* _loader; // Owned by CollectionCloner. +}; + +void ParallelCollectionClonerTest::setUp() { + BaseClonerTest::setUp(); + options = {}; + collectionCloner.reset(nullptr); + collectionCloner = stdx::make_unique<CollectionCloner>( + &getExecutor(), + dbWorkThreadPool.get(), + target, + nss, + options, + stdx::bind(&CollectionClonerTest::setStatus, this, stdx::placeholders::_1), + storageInterface.get(), + defaultBatchSize, + defaultNumCloningCursors); + collectionStats = CollectionMockStats(); + storageInterface->createCollectionForBulkFn = + [this](const NamespaceString& nss, + const CollectionOptions& options, + const BSONObj idIndexSpec, + const std::vector<BSONObj>& secondaryIndexSpecs) { + _loader = new CollectionBulkLoaderMock(&collectionStats); + Status initCollectionBulkLoader = _loader->init(secondaryIndexSpecs); + ASSERT_OK(initCollectionBulkLoader); + + return StatusWith<std::unique_ptr<CollectionBulkLoader>>( + std::unique_ptr<CollectionBulkLoader>(_loader)); + }; +} + +void ParallelCollectionClonerTest::tearDown() { + BaseClonerTest::tearDown(); + // Executor may still invoke collection cloner's callback before shutting down. + collectionCloner.reset(nullptr); + options = {}; +} + +BaseCloner* ParallelCollectionClonerTest::getCloner() const { + return collectionCloner.get(); +} + +std::vector<BSONObj> ParallelCollectionClonerTest::generateDocs(std::size_t numDocs) { + std::vector<BSONObj> docs; + for (unsigned int i = 0; i < numDocs; i++) { + docs.push_back(BSON("_id" << i)); + } + return docs; +} + +TEST_F(ParallelCollectionClonerTest, InsertDocumentsSingleBatchWithMultipleCloningCursors) { + ASSERT_OK(collectionCloner->startup()); + ASSERT_TRUE(collectionCloner->isActive()); + + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); + processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); + } + ASSERT_TRUE(collectionCloner->isActive()); + + collectionCloner->waitForDbWorker(); + ASSERT_TRUE(collectionCloner->isActive()); + ASSERT_TRUE(collectionStats.initCalled); + + // A single cursor response is returned because there is only a single document to insert. + BSONArray emptyArray; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse( + BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray)) << "ok" << 1)); + } + + collectionCloner->waitForDbWorker(); + ASSERT_TRUE(collectionCloner->isActive()); + + auto exec = &getExecutor(); + std::vector<BSONObj> docs; + // Record the buffered documents before they are inserted so we can + // validate them. 
+ collectionCloner->setScheduleDbWorkFn_forTest( + [&](const executor::TaskExecutor::CallbackFn& workFn) { + docs = collectionCloner->getDocumentsToInsert_forTest(); + return exec->scheduleWork(workFn); + }); + + const BSONObj doc = BSON("_id" << 1); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc))); + } + + collectionCloner->join(); + + ASSERT_BSONOBJ_EQ(docs[0], doc); + ASSERT_EQUALS(1, collectionStats.insertCount); + ASSERT_TRUE(collectionStats.commitCalled); + + ASSERT_OK(getStatus()); + ASSERT_FALSE(collectionCloner->isActive()); +} + +TEST_F(ParallelCollectionClonerTest, + InsertDocumentsSingleBatchOfMultipleDocumentsWithMultipleCloningCursors) { + ASSERT_OK(collectionCloner->startup()); + ASSERT_TRUE(collectionCloner->isActive()); + + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); + processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); + } + ASSERT_TRUE(collectionCloner->isActive()); + + collectionCloner->waitForDbWorker(); + ASSERT_TRUE(collectionCloner->isActive()); + ASSERT_TRUE(collectionStats.initCalled); + + // A single cursor response is returned because there is only a single batch of documents to + // insert. + BSONArray emptyArray; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse( + BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray)) << "ok" << 1)); + } + + collectionCloner->waitForDbWorker(); + ASSERT_TRUE(collectionCloner->isActive()); + + auto exec = &getExecutor(); + std::vector<BSONObj> docs; + // Record the buffered documents before they are inserted so we can + // validate them. + collectionCloner->setScheduleDbWorkFn_forTest( + [&](const executor::TaskExecutor::CallbackFn& workFn) { + docs = collectionCloner->getDocumentsToInsert_forTest(); + return exec->scheduleWork(workFn); + }); + + auto generatedDocs = generateDocs(3); + + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createFinalCursorResponse( + BSON_ARRAY(generatedDocs[0] << generatedDocs[1] << generatedDocs[2]))); + } + + collectionCloner->join(); + + ASSERT_EQUALS(3U, docs.size()); + for (int i = 0; i < 3; i++) { + ASSERT_BSONOBJ_EQ(docs[i], generatedDocs[i]); + } + ASSERT_EQUALS(3, collectionStats.insertCount); + ASSERT_TRUE(collectionStats.commitCalled); + + ASSERT_OK(getStatus()); + ASSERT_FALSE(collectionCloner->isActive()); +} + +TEST_F(ParallelCollectionClonerTest, InsertDocumentsWithMultipleCursorsOfDifferentNumberOfBatches) { + ASSERT_OK(collectionCloner->startup()); + ASSERT_TRUE(collectionCloner->isActive()); + + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); + processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); + } + ASSERT_TRUE(collectionCloner->isActive()); + + collectionCloner->waitForDbWorker(); + ASSERT_TRUE(collectionCloner->isActive()); + ASSERT_TRUE(collectionStats.initCalled); + + BSONArray emptyArray; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray) + << createCursorResponse(2, emptyArray) + << createCursorResponse(3, emptyArray)) + << "ok" + << 1)); + } + collectionCloner->waitForDbWorker(); + ASSERT_TRUE(collectionCloner->isActive()); + + auto exec = &getExecutor(); + std::vector<BSONObj> 
docs; + + // Record the buffered documents before they are inserted so we can + // validate them. + collectionCloner->setScheduleDbWorkFn_forTest( + [&](const executor::TaskExecutor::CallbackFn& workFn) { + auto buffered = collectionCloner->getDocumentsToInsert_forTest(); + docs.insert(docs.end(), buffered.begin(), buffered.end()); + return exec->scheduleWork(workFn); + }); + + int numDocs = 9; + std::vector<BSONObj> generatedDocs = generateDocs(numDocs); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCursorResponse(1, BSON_ARRAY(generatedDocs[0]), "nextBatch")); + processNetworkResponse(createCursorResponse(2, BSON_ARRAY(generatedDocs[1]), "nextBatch")); + processNetworkResponse(createCursorResponse(3, BSON_ARRAY(generatedDocs[2]), "nextBatch")); + } + + collectionCloner->waitForDbWorker(); + ASSERT_EQUALS(3U, docs.size()); + for (int i = 0; i < 3; i++) { + ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]); + } + ASSERT_EQUALS(3, collectionStats.insertCount); + ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); + ASSERT_TRUE(collectionCloner->isActive()); + + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCursorResponse(1, BSON_ARRAY(generatedDocs[3]), "nextBatch")); + processNetworkResponse(createCursorResponse(2, BSON_ARRAY(generatedDocs[4]), "nextBatch")); + processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[5]))); + } + + collectionCloner->waitForDbWorker(); + ASSERT_EQUALS(6U, docs.size()); + for (int i = 3; i < 6; i++) { + ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]); + } + ASSERT_EQUALS(6, collectionStats.insertCount); + ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); + ASSERT_TRUE(collectionCloner->isActive()); + + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCursorResponse(1, BSON_ARRAY(generatedDocs[6]), "nextBatch")); + processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[7]))); + } + + collectionCloner->waitForDbWorker(); + ASSERT_EQUALS(8U, docs.size()); + for (int i = 6; i < 8; i++) { + ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]); + } + ASSERT_EQUALS(8, collectionStats.insertCount); + ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); + ASSERT_TRUE(collectionCloner->isActive()); + + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[8]))); + } + + collectionCloner->join(); + ASSERT_EQUALS(9U, docs.size()); + ASSERT_BSONOBJ_EQ(generatedDocs[8], docs[8]); + ASSERT_EQUALS(numDocs, collectionStats.insertCount); + ASSERT_TRUE(collectionStats.commitCalled); + + ASSERT_OK(getStatus()); + ASSERT_FALSE(collectionCloner->isActive()); +} + +TEST_F(ParallelCollectionClonerTest, MiddleBatchContainsNoDocumentsWithMultipleCursors) { + ASSERT_OK(collectionCloner->startup()); + ASSERT_TRUE(collectionCloner->isActive()); + + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); + processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); + } + ASSERT_TRUE(collectionCloner->isActive()); + + collectionCloner->waitForDbWorker(); + ASSERT_TRUE(collectionCloner->isActive()); + ASSERT_TRUE(collectionStats.initCalled); + + BSONArray emptyArray; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray) + << 
createCursorResponse(2, emptyArray) + << createCursorResponse(3, emptyArray)) + << "ok" + << 1)); + } + collectionCloner->waitForDbWorker(); + + auto exec = &getExecutor(); + std::vector<BSONObj> docs; + // Record the buffered documents before they are inserted so we can + // validate them. + collectionCloner->setScheduleDbWorkFn_forTest( + [&](const executor::TaskExecutor::CallbackFn& workFn) { + auto buffered = collectionCloner->getDocumentsToInsert_forTest(); + docs.insert(docs.end(), buffered.begin(), buffered.end()); + return exec->scheduleWork(workFn); + }); + + ASSERT_TRUE(collectionCloner->isActive()); + + int numDocs = 6; + std::vector<BSONObj> generatedDocs = generateDocs(numDocs); + const BSONObj doc = BSON("_id" << 1); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCursorResponse(1, BSON_ARRAY(generatedDocs[0]))); + processNetworkResponse(createCursorResponse(2, BSON_ARRAY(generatedDocs[1]))); + processNetworkResponse(createCursorResponse(3, BSON_ARRAY(generatedDocs[2]))); + } + + collectionCloner->waitForDbWorker(); + ASSERT_EQUALS(3U, docs.size()); + for (int i = 0; i < 3; i++) { + ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]); + } + ASSERT_EQUALS(3, collectionStats.insertCount); + + ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); + ASSERT_TRUE(collectionCloner->isActive()); + + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCursorResponse(1, emptyArray, "nextBatch")); + processNetworkResponse(createCursorResponse(2, emptyArray, "nextBatch")); + processNetworkResponse(createCursorResponse(3, emptyArray, "nextBatch")); + } + + collectionCloner->waitForDbWorker(); + ASSERT_EQUALS(3, collectionStats.insertCount); + + ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); + ASSERT_TRUE(collectionCloner->isActive()); + + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[3]))); + processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[4]))); + processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[5]))); + } + + collectionCloner->join(); + + ASSERT_EQUALS(6U, docs.size()); + for (int i = 3; i < 6; i++) { + ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]); + } + + ASSERT_EQUALS(numDocs, collectionStats.insertCount); + ASSERT_TRUE(collectionStats.commitCalled); + + ASSERT_OK(getStatus()); + ASSERT_FALSE(collectionCloner->isActive()); +} + +TEST_F(ParallelCollectionClonerTest, LastBatchContainsNoDocumentsWithMultipleCursors) { + ASSERT_OK(collectionCloner->startup()); + ASSERT_TRUE(collectionCloner->isActive()); + + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); + processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); + } + ASSERT_TRUE(collectionCloner->isActive()); + + collectionCloner->waitForDbWorker(); + ASSERT_TRUE(collectionCloner->isActive()); + ASSERT_TRUE(collectionStats.initCalled); + + BSONArray emptyArray; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray) + << createCursorResponse(2, emptyArray) + << createCursorResponse(3, emptyArray)) + << "ok" + << 1)); + } + + collectionCloner->waitForDbWorker(); + ASSERT_TRUE(collectionCloner->isActive()); + + auto exec = &getExecutor(); + std::vector<BSONObj> docs; + // Record the buffered 
+    // Record the buffered documents before they are inserted so we can
+    // validate them.
+    collectionCloner->setScheduleDbWorkFn_forTest(
+        [&](const executor::TaskExecutor::CallbackFn& workFn) {
+            auto buffered = collectionCloner->getDocumentsToInsert_forTest();
+            docs.insert(docs.end(), buffered.begin(), buffered.end());
+            return exec->scheduleWork(workFn);
+        });
+
+    int numDocs = 6;
+    std::vector<BSONObj> generatedDocs = generateDocs(numDocs);
+    {
+        executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+        processNetworkResponse(createCursorResponse(1, BSON_ARRAY(generatedDocs[0]), "nextBatch"));
+        processNetworkResponse(createCursorResponse(2, BSON_ARRAY(generatedDocs[1]), "nextBatch"));
+        processNetworkResponse(createCursorResponse(3, BSON_ARRAY(generatedDocs[2]), "nextBatch"));
+    }
+
+    collectionCloner->waitForDbWorker();
+    ASSERT_EQUALS(3U, docs.size());
+    for (int i = 0; i < 3; i++) {
+        ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]);
+    }
+    ASSERT_EQUALS(3, collectionStats.insertCount);
+
+    ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
+    ASSERT_TRUE(collectionCloner->isActive());
+
+    {
+        executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+        processNetworkResponse(createCursorResponse(1, BSON_ARRAY(generatedDocs[3]), "nextBatch"));
+        processNetworkResponse(createCursorResponse(2, BSON_ARRAY(generatedDocs[4]), "nextBatch"));
+        processNetworkResponse(createCursorResponse(3, BSON_ARRAY(generatedDocs[5]), "nextBatch"));
+    }
+
+    collectionCloner->waitForDbWorker();
+    ASSERT_EQUALS(6U, docs.size());
+    for (int i = 3; i < 6; i++) {
+        ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]);
+    }
+    ASSERT_EQUALS(numDocs, collectionStats.insertCount);
+
+    ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
+    ASSERT_TRUE(collectionCloner->isActive());
+
+    {
+        executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+        processNetworkResponse(createFinalCursorResponse(emptyArray));
+        processNetworkResponse(createFinalCursorResponse(emptyArray));
+        processNetworkResponse(createFinalCursorResponse(emptyArray));
+    }
+
+    collectionCloner->join();
+    ASSERT_EQUALS(6, collectionStats.insertCount);
+    ASSERT_TRUE(collectionStats.commitCalled);
+
+    ASSERT_OK(getStatus());
+    ASSERT_FALSE(collectionCloner->isActive());
+}
+
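In the batches mocked above, createFinalCursorResponse stands in for the last getMore reply from each cursor: a nextBatch response whose cursor id is 0, which is how MongoDB cursors signal exhaustion. Roughly, in shell syntax (the namespace and document are placeholders):

    // Sketch of a final getMore reply; id 0 means the server-side cursor is done.
    var finalReply = {cursor: {id: NumberLong(0), ns: "db.coll", nextBatch: [{_id: 6}]}, ok: 1};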
+TEST_F(ParallelCollectionClonerTest, InsertDocumentsScheduleDbWorkFailedWithMultipleCursors) {
+    ASSERT_OK(collectionCloner->startup());
+
+    {
+        executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+        processNetworkResponse(createCountResponse(0));
+        processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
+    }
+
+    collectionCloner->waitForDbWorker();
+
+    // Replace scheduleDbWork function so that cloner will fail to schedule DB work after
+    // getting documents.
+    collectionCloner->setScheduleDbWorkFn_forTest(
+        [](const executor::TaskExecutor::CallbackFn& workFn) {
+            return StatusWith<executor::TaskExecutor::CallbackHandle>(ErrorCodes::UnknownError,
+                                                                      "");
+        });
+
+    BSONArray emptyArray;
+    {
+        executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+        processNetworkResponse(BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray)
+                                                            << createCursorResponse(2, emptyArray)
+                                                            << createCursorResponse(3, emptyArray))
+                                              << "ok"
+                                              << 1));
+    }
+
+    collectionCloner->waitForDbWorker();
+    ASSERT_TRUE(collectionCloner->isActive());
+
+    const BSONObj doc = BSON("_id" << 1);
+    {
+        executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+        processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc)));
+    }
+
+    ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus().code());
+    ASSERT_FALSE(collectionCloner->isActive());
+}
+
+TEST_F(ParallelCollectionClonerTest,
+       CollectionClonerWaitsForPendingTasksToCompleteBeforeInvokingOnCompletionCallback) {
+    ASSERT_OK(collectionCloner->startup());
+    ASSERT_TRUE(collectionCloner->isActive());
+
+    auto net = getNet();
+    {
+        executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+
+        assertRemoteCommandNameEquals("count",
+                                      net->scheduleSuccessfulResponse(createCountResponse(0)));
+        net->runReadyNetworkOperations();
+
+        assertRemoteCommandNameEquals(
+            "listIndexes",
+            net->scheduleSuccessfulResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))));
+        net->runReadyNetworkOperations();
+    }
+    ASSERT_TRUE(collectionCloner->isActive());
+
+    collectionCloner->waitForDbWorker();
+    ASSERT_TRUE(collectionCloner->isActive());
+    ASSERT_TRUE(collectionStats.initCalled);
+
+    BSONArray emptyArray;
+    {
+        executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+        processNetworkResponse(BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray)
+                                                            << createCursorResponse(2, emptyArray)
+                                                            << createCursorResponse(3, emptyArray))
+                                              << "ok"
+                                              << 1));
+    }
+
+    collectionCloner->waitForDbWorker();
+    ASSERT_TRUE(collectionCloner->isActive());
+
+    // At this point, the CollectionCloner has sent the find request to establish the cursor.
// We want to return the first batch of documents for the collection from the network so that
// the CollectionCloner schedules the first _insertDocuments DB task and the getMore request for
// the next batch of documents.
@@ -1217,13 +1955,13 @@ TEST_F(CollectionClonerTest,
    });
    ASSERT_FALSE(insertDocumentsFn);

-    // Return first batch of collection documents from remote server for the find request.
+    // Return first batch of collection documents from remote server for the getMore request.
    const BSONObj doc = BSON("_id" << 1);
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
        assertRemoteCommandNameEquals(
-            "find", net->scheduleSuccessfulResponse(createCursorResponse(1, BSON_ARRAY(doc))));
+            "getMore", net->scheduleSuccessfulResponse(createCursorResponse(1, BSON_ARRAY(doc))));
        net->runReadyNetworkOperations();
    }
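The hunk above tracks the cloner's switch to the AsyncResultsMerger: cursors are now established up front, so the first batch of documents arrives in response to a getMore rather than the initial find, and the test's command-name assertion changes accordingly. For reference, a getMore request has roughly this shape (the cursor id, collection name, and batch size are placeholders):

    // Sketch of the getMore command the cloner now issues for each batch.
    var getMoreCmd = {getMore: NumberLong(1), collection: "coll", batchSize: 2};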
@@ -1261,4 +1999,72 @@ TEST_F(CollectionClonerTest,
    ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus());
}

+TEST_F(ParallelCollectionClonerTest, CollectionClonerCannotBeRestartedAfterPreviousFailure) {
+    // First cloning attempt - fails while reading documents from source collection.
+    unittest::log() << "Starting first collection cloning attempt";
+    ASSERT_OK(collectionCloner->startup());
+    ASSERT_TRUE(collectionCloner->isActive());
+
+    {
+        executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+        processNetworkResponse(createCountResponse(0));
+        processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
+    }
+    ASSERT_TRUE(collectionCloner->isActive());
+
+    collectionCloner->waitForDbWorker();
+    ASSERT_TRUE(collectionCloner->isActive());
+    ASSERT_TRUE(collectionStats.initCalled);
+
+    BSONArray emptyArray;
+    {
+        executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+        processNetworkResponse(BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray)
+                                                            << createCursorResponse(2, emptyArray)
+                                                            << createCursorResponse(3, emptyArray))
+                                              << "ok"
+                                              << 1));
+    }
+
+    collectionCloner->waitForDbWorker();
+    ASSERT_TRUE(collectionCloner->isActive());
+    int numDocs = 5;
+    std::vector<BSONObj> docs = generateDocs(numDocs);
+
+    {
+        executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+        processNetworkResponse(createCursorResponse(1, BSON_ARRAY(docs[0])));
+        processNetworkResponse(createCursorResponse(2, BSON_ARRAY(docs[1])));
+        processNetworkResponse(createCursorResponse(3, BSON_ARRAY(docs[2])));
+    }
+
+    collectionCloner->waitForDbWorker();
+    ASSERT_EQUALS(3, collectionStats.insertCount);
+
+    // Check that the status hasn't changed from the initial value.
+    ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
+    ASSERT_TRUE(collectionCloner->isActive());
+
+    {
+        executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+        processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(docs[3])));
+        processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(docs[4])));
+        processNetworkResponse(ErrorCodes::OperationFailed,
+                               "failed to read remaining documents from source collection");
+    }
+
+    collectionCloner->join();
+    ASSERT_EQUALS(numDocs, collectionStats.insertCount);
+
+    ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus());
+    ASSERT_FALSE(collectionCloner->isActive());
+
+    // Second cloning attempt - startup() should fail because a failed cloner cannot be restarted.
+    unittest::log() << "Starting second collection cloning attempt - startup() should fail";
+    collectionStats = CollectionMockStats();
+    setStatus(getDetectableErrorStatus());
+
+    ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, collectionCloner->startup());
+}
+
} // namespace
diff --git a/src/mongo/db/repl/database_cloner.cpp b/src/mongo/db/repl/database_cloner.cpp
index 0ebaf66e098..bc7ea109918 100644
--- a/src/mongo/db/repl/database_cloner.cpp
+++ b/src/mongo/db/repl/database_cloner.cpp
@@ -68,6 +68,9 @@ MONGO_EXPORT_STARTUP_SERVER_PARAMETER(collectionClonerBatchSize, int, defaultBat
// The number of attempts for the listCollections commands.
MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncListCollectionsAttempts, int, 3);

+// The number of cursors to use in the collection cloning process.
+MONGO_EXPORT_SERVER_PARAMETER(maxNumInitialSyncCollectionClonerCursors, int, 1);
+
/**
 * Default listCollections predicate.
 */
@@ -364,7 +367,8 @@ void DatabaseCloner::_listCollectionsCallback(const StatusWith<Fetcher::QueryRes
            stdx::bind(
                &DatabaseCloner::_collectionClonerCallback, this, stdx::placeholders::_1, nss),
            _storageInterface,
-            collectionClonerBatchSize);
+            collectionClonerBatchSize,
+            maxNumInitialSyncCollectionClonerCursors.load());
    } catch (const UserException& ex) {
        _finishCallback_inlock(lk, ex.toStatus());
        return;
diff --git a/src/mongo/db/s/sharding_task_executor.cpp b/src/mongo/db/s/sharding_task_executor.cpp
index 24fc512459b..0769a42ed40 100644
--- a/src/mongo/db/s/sharding_task_executor.cpp
+++ b/src/mongo/db/s/sharding_task_executor.cpp
@@ -38,8 +38,11 @@
#include "mongo/db/logical_time.h"
#include "mongo/db/operation_time_tracker.h"
#include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/metadata/sharding_metadata.h"
+#include "mongo/s/client/shard_registry.h"
#include "mongo/s/cluster_last_error_info.h"
+#include "mongo/s/grid.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
@@ -116,10 +119,17 @@ StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleRemoteCom
        const TaskExecutor::RemoteCommandCallbackArgs& args) {
        ON_BLOCK_EXIT([&cb, &args]() { cb(args); });

+        auto shard = grid.shardRegistry()->getShardForHostNoReload(request.target);
+        if (!shard) {
+            LOG(1) << "Could not find shard containing host: " << request.target.toString();
+            return;
+        }
        if (!args.response.isOK()) {
+            shard->updateReplSetMonitor(request.target, args.response.status);
            LOG(1) << "Error processing the remote request, not updating operationTime or gLE";
            return;
        }
+        shard->updateReplSetMonitor(request.target, getStatusFromCommandResult(args.response.data));

        // Update the logical clock.
        invariant(timeTracker);
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 037765fd5f9..5a550ae7e24 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -334,7 +334,6 @@ StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent(
    // the new event right away to propagate the fact that the previous event had been signaled to
    // the new event.
    signalCurrentEventIfReady_inlock();
-
    return eventToReturn;
}
@@ -398,22 +397,11 @@ void AsyncResultsMerger::handleBatchResponse(
    // Early return from this point on signal anyone waiting on an event, if ready() is true.
    ScopeGuard signaller = MakeGuard(&AsyncResultsMerger::signalCurrentEventIfReady_inlock, this);
-
    StatusWith<CursorResponse> cursorResponseStatus(
        cbData.response.isOK() ? parseCursorResponse(cbData.response.data, remote)
                               : cbData.response.status);
-
-    if (!cursorResponseStatus.isOK()) {
-        auto shard = remote.getShard();
-        if (!shard) {
-            remote.status = Status(cursorResponseStatus.getStatus().code(),
-                                   str::stream() << "Could not find shard containing host "
-                                                 << remote.getTargetHost().toString());
-        } else {
-            shard->updateReplSetMonitor(remote.getTargetHost(), cursorResponseStatus.getStatus());
-            remote.status = cursorResponseStatus.getStatus();
-        }
-
+    remote.status = cursorResponseStatus.getStatus();
    // Unreachable host errors are swallowed if the 'allowPartialResults' option is set. We
    // remove the unreachable host entirely from consideration by marking it as exhausted.
    if (_params->isAllowPartialResults) {
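Taken together, the last three hunks move replica-set-monitor bookkeeping out of AsyncResultsMerger::handleBatchResponse and into ShardingTaskExecutor's response callback, which now reports both transport errors and command-level errors (via getStatusFromCommandResult) for the targeted host. The new maxNumInitialSyncCollectionClonerCursors parameter defaults to 1, so collection cloning keeps its single-cursor behavior unless a deployment opts in. A jstest could exercise the parallel path along these lines (a sketch only; the topology, document count, and the value 3 are illustrative, not part of this commit):

    // Start a replica set whose nodes allow up to 3 collection cloner cursors,
    // then add a member so it performs initial sync with parallel cursors.
    var rst = new ReplSetTest({
        nodes: 1,
        nodeOptions: {setParameter: {maxNumInitialSyncCollectionClonerCursors: 3}}
    });
    rst.startSet();
    rst.initiate();
    var coll = rst.getPrimary().getDB("test").coll;
    for (var i = 0; i < 1000; i++) {
        assert.writeOK(coll.insert({_id: i}));
    }
    rst.add();        // The new member clones 'test.coll' during initial sync.
    rst.reInitiate();
    rst.awaitSecondaryNodes();
    rst.stopSet();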