diff options
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/repl/base_cloner_test_fixture.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/base_cloner_test_fixture.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner.cpp | 417 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner.h | 92 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner_test.cpp | 894 | ||||
-rw-r--r-- | src/mongo/db/repl/database_cloner.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_task_executor.cpp | 10 |
7 files changed, 175 insertions, 1251 deletions
diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp index fa418d338fb..e1a8131fb10 100644 --- a/src/mongo/db/repl/base_cloner_test_fixture.cpp +++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp @@ -76,11 +76,6 @@ BSONObj BaseClonerTest::createCursorResponse(CursorId cursorId, const BSONArray& } // static -BSONObj BaseClonerTest::createFinalCursorResponse(const BSONArray& docs) { - return createCursorResponse(0, docs, "nextBatch"); -} - -// static BSONObj BaseClonerTest::createListCollectionsResponse(CursorId cursorId, const BSONArray& colls, const char* fieldName) { diff --git a/src/mongo/db/repl/base_cloner_test_fixture.h b/src/mongo/db/repl/base_cloner_test_fixture.h index e5615806fc6..fb8ecf22861 100644 --- a/src/mongo/db/repl/base_cloner_test_fixture.h +++ b/src/mongo/db/repl/base_cloner_test_fixture.h @@ -76,8 +76,6 @@ public: static BSONObj createCursorResponse(CursorId cursorId, const BSONArray& docs); - static BSONObj createFinalCursorResponse(const BSONArray& docs); - /** * Creates a listCollections response with given array of index specs. */ diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index 98d658b49f9..13b27d0ed01 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -74,9 +74,9 @@ MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncCollectionFindAttempts, int, 3); // collection 'namespace'. MONGO_FP_DECLARE(initialSyncHangDuringCollectionClone); -// Failpoint which causes initial sync to hang after handling the next batch of results from the -// 'AsyncResultsMerger' for a specific collection. -MONGO_FP_DECLARE(initialSyncHangCollectionClonerAfterHandlingBatchResponse); +// Failpoint which causes initial sync to hang after the initial 'find' command of collection +// cloning, for a specific collection. +MONGO_FP_DECLARE(initialSyncHangCollectionClonerAfterInitialFind); CollectionCloner::CollectionCloner(executor::TaskExecutor* executor, OldThreadPool* dbWorkThreadPool, @@ -85,8 +85,7 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor, const CollectionOptions& options, const CallbackFn& onCompletion, StorageInterface* storageInterface, - const int batchSize, - const int maxNumClonerCursors) + const int batchSize) : _executor(executor), _dbWorkThreadPool(dbWorkThreadPool), _source(source), @@ -123,7 +122,7 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor, executor::RemoteCommandRequest::kNoTimeout, RemoteCommandRetryScheduler::kAllRetriableErrors)), _indexSpecs(), - _documentsToInsert(), + _documents(), _dbWorkTaskRunner(_dbWorkThreadPool), _scheduleDbWorkFn([this](const executor::TaskExecutor::CallbackFn& work) { auto task = [work](OperationContext* opCtx, @@ -139,8 +138,9 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor, kProgressMeterCheckInterval, "documents copied", str::stream() << _sourceNss.toString() << " collection clone progress"), - _collectionCloningBatchSize(batchSize), - _maxNumClonerCursors(maxNumClonerCursors) { + _batchSize(batchSize), + _arm(executor, + stdx::make_unique<ClusterClientCursorParams>(_sourceNss, UserNameIterator()).get()) { // Fetcher throws an exception on null executor. invariant(executor); uassert(ErrorCodes::BadValue, @@ -215,19 +215,15 @@ void CollectionCloner::shutdown() { // Nothing to do if we are already in ShuttingDown or Complete state. return; } + _cancelRemainingWork_inlock(); } void CollectionCloner::_cancelRemainingWork_inlock() { - if (_arm) { - Client::initThreadIfNotAlready(); - auto opCtx = cc().getOperationContext(); - _killArmHandle = _arm->kill(opCtx); - } _countScheduler.shutdown(); _listIndexesFetcher.shutdown(); - if (_establishCollectionCursorsScheduler) { - _establishCollectionCursorsScheduler->shutdown(); + if (_findFetcher) { + _findFetcher->shutdown(); } _dbWorkTaskRunner.cancel(); } @@ -239,9 +235,6 @@ CollectionCloner::Stats CollectionCloner::getStats() const { void CollectionCloner::join() { stdx::unique_lock<stdx::mutex> lk(_mutex); - if (_killArmHandle) { - _executor->waitForEvent(_killArmHandle); - } _condition.wait(lk, [this]() { return !_isActive_inlock(); }); } @@ -257,11 +250,6 @@ void CollectionCloner::setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& sched _scheduleDbWorkFn = scheduleDbWorkFn; } -std::vector<BSONObj> CollectionCloner::getDocumentsToInsert_forTest() { - LockGuard lk(_mutex); - return _documentsToInsert; -} - void CollectionCloner::_countCallback( const executor::TaskExecutor::RemoteCommandCallbackArgs& args) { @@ -403,181 +391,80 @@ void CollectionCloner::_listIndexesCallback(const Fetcher::QueryResponseStatus& } } -void CollectionCloner::_beginCollectionCallback(const executor::TaskExecutor::CallbackArgs& cbd) { - if (!cbd.status.isOK()) { - _finishCallback(cbd.status); - return; - } - if (!_idIndexSpec.isEmpty() && _options.autoIndexId == CollectionOptions::NO) { - warning() - << "Found the _id_ index spec but the collection specified autoIndexId of false on ns:" - << this->_sourceNss; - } +void CollectionCloner::_findCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult, + Fetcher::NextAction* nextAction, + BSONObjBuilder* getMoreBob, + std::shared_ptr<OnCompletionGuard> onCompletionGuard) { + if (!fetchResult.isOK()) { + // Wait for active inserts to complete. + waitForDbWorker(); - auto collectionBulkLoader = _storageInterface->createCollectionForBulkLoading( - _destNss, _options, _idIndexSpec, _indexSpecs); + Status newStatus{fetchResult.getStatus().code(), + str::stream() << "While querying collection '" << _sourceNss.ns() + << "' there was an error '" + << fetchResult.getStatus().reason() + << "'"}; - if (!collectionBulkLoader.isOK()) { - _finishCallback(collectionBulkLoader.getStatus()); + stdx::lock_guard<stdx::mutex> lock(_mutex); + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, newStatus); return; } - _stats.indexes = _indexSpecs.size(); - if (!_idIndexSpec.isEmpty()) { - ++_stats.indexes; - } - - _collLoader = std::move(collectionBulkLoader.getValue()); - - BSONObjBuilder cmdObj; - EstablishCursorsCommand cursorCommand; - // The 'find' command is used when the number of cloning cursors is 1 to ensure - // the correctness of the collection cloning process until 'parallelCollectionScan' - // can be tested more extensively in context of initial sync. - if (_maxNumClonerCursors == 1) { - cmdObj.append("find", _sourceNss.coll()); - cmdObj.append("noCursorTimeout", true); - // Set batchSize to be 0 to establish the cursor without fetching any documents, - // similar to the response format of 'parallelCollectionScan'. - cmdObj.append("batchSize", 0); - cursorCommand = Find; - } else { - cmdObj.append("parallelCollectionScan", _sourceNss.coll()); - cmdObj.append("numCursors", _maxNumClonerCursors); - cursorCommand = ParallelCollScan; + auto batchData(fetchResult.getValue()); + bool lastBatch = *nextAction == Fetcher::NextAction::kNoAction; + if (batchData.documents.size() > 0) { + LockGuard lk(_mutex); + _documents.insert(_documents.end(), batchData.documents.begin(), batchData.documents.end()); + } else if (!batchData.first) { + warning() << "No documents returned in batch; ns: " << _sourceNss + << ", cursorId:" << batchData.cursorId << ", isLastBatch:" << lastBatch; } - Client::initThreadIfNotAlready(); - auto opCtx = cc().getOperationContext(); - - _establishCollectionCursorsScheduler = stdx::make_unique<RemoteCommandRetryScheduler>( - _executor, - RemoteCommandRequest(_source, - _sourceNss.db().toString(), - cmdObj.obj(), - ReadPreferenceSetting::secondaryPreferredMetadata(), - opCtx, - RemoteCommandRequest::kNoTimeout), - stdx::bind(&CollectionCloner::_establishCollectionCursorsCallback, - this, - stdx::placeholders::_1, - cursorCommand), - RemoteCommandRetryScheduler::makeRetryPolicy( - numInitialSyncCollectionFindAttempts.load(), - executor::RemoteCommandRequest::kNoTimeout, - RemoteCommandRetryScheduler::kAllRetriableErrors)); - auto scheduleStatus = _establishCollectionCursorsScheduler->startup(); - LOG(1) << "Attempting to establish cursors with maxNumClonerCursors: " << _maxNumClonerCursors; + auto&& scheduleResult = + _scheduleDbWorkFn(stdx::bind(&CollectionCloner::_insertDocumentsCallback, + this, + stdx::placeholders::_1, + lastBatch, + onCompletionGuard)); + if (!scheduleResult.isOK()) { + Status newStatus{scheduleResult.getStatus().code(), + str::stream() << "While cloning collection '" << _sourceNss.ns() + << "' there was an error '" + << scheduleResult.getStatus().reason() + << "'"}; - if (!scheduleStatus.isOK()) { - _establishCollectionCursorsScheduler.reset(); - _finishCallback(scheduleStatus); + stdx::lock_guard<stdx::mutex> lock(_mutex); + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, newStatus); return; } -} -Status CollectionCloner::_parseCursorResponse(BSONObj response, - std::vector<CursorResponse>* cursors, - EstablishCursorsCommand cursorCommand) { - switch (cursorCommand) { - case Find: { - StatusWith<CursorResponse> findResponse = CursorResponse::parseFromBSON(response); - if (!findResponse.isOK()) { - Status errorStatus{findResponse.getStatus().code(), - str::stream() - << "While parsing the 'find' query against collection '" - << _sourceNss.ns() - << "' there was an error '" - << findResponse.getStatus().reason() - << "'"}; - return errorStatus; - } - cursors->push_back(std::move(findResponse.getValue())); - break; - } - case ParallelCollScan: { - auto cursorElements = _parseParallelCollectionScanResponse(response); - if (!cursorElements.isOK()) { - return cursorElements.getStatus(); - } - std::vector<BSONElement> cursorsArray; - cursorsArray = cursorElements.getValue(); - // Parse each BSONElement into a 'CursorResponse' object. - for (BSONElement cursor : cursorsArray) { - if (!cursor.isABSONObj()) { - Status errorStatus( - ErrorCodes::FailedToParse, - "The 'cursor' field in the list of cursor responses is not a " - "valid BSON Object"); - return errorStatus; - } - const BSONObj cursorObj = cursor.Obj().getOwned(); - StatusWith<CursorResponse> parallelCollScanResponse = - CursorResponse::parseFromBSON(cursorObj); - if (!parallelCollScanResponse.isOK()) { - return parallelCollScanResponse.getStatus(); - } - cursors->push_back(std::move(parallelCollScanResponse.getValue())); + MONGO_FAIL_POINT_BLOCK(initialSyncHangCollectionClonerAfterInitialFind, nssData) { + const BSONObj& data = nssData.getData(); + auto nss = data["nss"].str(); + // Only hang when cloning the specified collection. + if (_destNss.toString() == nss) { + while (MONGO_FAIL_POINT(initialSyncHangCollectionClonerAfterInitialFind) && + !_isShuttingDown()) { + log() << "initialSyncHangCollectionClonerAfterInitialFind fail point enabled for " + << nss << ". Blocking until fail point is disabled."; + mongo::sleepsecs(1); } - break; - } - default: { - Status errorStatus( - ErrorCodes::FailedToParse, - "The command used to establish the collection cloner cursors is not valid."); - return errorStatus; } } - return Status::OK(); -} -void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCallbackArgs& rcbd, - EstablishCursorsCommand cursorCommand) { - if (_state == State::kShuttingDown) { - Status shuttingDownStatus{ErrorCodes::CallbackCanceled, "Cloner shutting down."}; - _finishCallback(shuttingDownStatus); - return; - } - auto response = rcbd.response; - if (!response.isOK()) { - _finishCallback(response.status); - return; - } - Status commandStatus = getStatusFromCommandResult(response.data); - if (!commandStatus.isOK()) { - Status newStatus{commandStatus.code(), - str::stream() << "While querying collection '" << _sourceNss.ns() - << "' there was an error '" - << commandStatus.reason() - << "'"}; - _finishCallback(commandStatus); - return; + if (!lastBatch) { + invariant(getMoreBob); + getMoreBob->append("getMore", batchData.cursorId); + getMoreBob->append("collection", batchData.nss.coll()); + getMoreBob->append("batchSize", _batchSize); } +} - std::vector<CursorResponse> cursorResponses; - Status parseResponseStatus = - _parseCursorResponse(response.data, &cursorResponses, cursorCommand); - if (!parseResponseStatus.isOK()) { - _finishCallback(parseResponseStatus); +void CollectionCloner::_beginCollectionCallback(const executor::TaskExecutor::CallbackArgs& cbd) { + if (!cbd.status.isOK()) { + _finishCallback(cbd.status); return; } - LOG(1) << "Collection cloner running with " << cursorResponses.size() - << " cursors established."; - - // Initialize the 'AsyncResultsMerger'(ARM). - std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors; - for (auto&& cursorResponse : cursorResponses) { - // A placeholder 'ShardId' is used until the ARM is made less sharding specific. - remoteCursors.emplace_back( - ShardId("CollectionClonerSyncSource"), _source, std::move(cursorResponse)); - } - - // An empty list of authenticated users is passed into the cluster parameters - // as user information is not used in the ARM in context of collection cloning. - _clusterClientCursorParams = - stdx::make_unique<ClusterClientCursorParams>(_sourceNss, UserNameIterator()); - _clusterClientCursorParams->remotes = std::move(remoteCursors); - _arm = stdx::make_unique<AsyncResultsMerger>(_executor, _clusterClientCursorParams.get()); // This completion guard invokes _finishCallback on destruction. auto cancelRemainingWorkInLock = [this]() { _cancelRemainingWork_inlock(); }; @@ -589,134 +476,50 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa // that will cause the destructor of the completion guard to run, the destructor must be run // outside the mutex. This is a necessary condition to invoke _finishCallback. stdx::lock_guard<stdx::mutex> lock(_mutex); - Status scheduleStatus = _scheduleNextARMResultsCallback(onCompletionGuard); - if (!scheduleStatus.isOK()) { - onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, scheduleStatus); - return; - } -} - -StatusWith<std::vector<BSONElement>> CollectionCloner::_parseParallelCollectionScanResponse( - BSONObj resp) { - if (!resp.hasField("cursors")) { - return Status(ErrorCodes::CursorNotFound, - "The 'parallelCollectionScan' response does not contain a 'cursors' field."); - } - BSONElement response = resp["cursors"]; - if (response.type() == BSONType::Array) { - return response.Array(); - } else { - return Status( - ErrorCodes::FailedToParse, - "The 'parallelCollectionScan' response is unable to be transformed into an array."); - } -} - -Status CollectionCloner::_bufferNextBatchFromArm() { - while (_arm->ready()) { - auto armResultStatus = _arm->nextReady(); - if (!armResultStatus.getStatus().isOK()) { - return armResultStatus.getStatus(); - } - if (armResultStatus.getValue().isEOF()) { - // We have reached the end of the batch. - break; - } else { - auto queryResult = armResultStatus.getValue().getResult(); - _documentsToInsert.push_back(std::move(*queryResult)); - } + if (!_idIndexSpec.isEmpty() && _options.autoIndexId == CollectionOptions::NO) { + warning() + << "Found the _id_ index spec but the collection specified autoIndexId of false on ns:" + << this->_sourceNss; } - return Status::OK(); -} - -Status CollectionCloner::_scheduleNextARMResultsCallback( - std::shared_ptr<OnCompletionGuard> onCompletionGuard) { - Client::initThreadIfNotAlready(); - auto opCtx = cc().getOperationContext(); - auto nextEvent = _arm->nextEvent(opCtx); - if (!nextEvent.isOK()) { - return nextEvent.getStatus(); - } - auto event = nextEvent.getValue(); - auto handleARMResultsOnNextEvent = - _executor->onEvent(event, - stdx::bind(&CollectionCloner::_handleARMResultsCallback, - this, - stdx::placeholders::_1, - onCompletionGuard)); - return handleARMResultsOnNextEvent.getStatus(); -} - -void CollectionCloner::_handleARMResultsCallback( - const executor::TaskExecutor::CallbackArgs& cbd, - std::shared_ptr<OnCompletionGuard> onCompletionGuard) { - auto setResultAndCancelRemainingWork = [this](std::shared_ptr<OnCompletionGuard> guard, - Status status) { - stdx::lock_guard<stdx::mutex> lock(_mutex); - guard->setResultAndCancelRemainingWork_inlock(lock, status); - return; - }; + auto status = _storageInterface->createCollectionForBulkLoading( + _destNss, _options, _idIndexSpec, _indexSpecs); - if (!cbd.status.isOK()) { - // Wait for active inserts to complete. - waitForDbWorker(); - Status newStatus{cbd.status.code(), - str::stream() << "While querying collection '" << _sourceNss.ns() - << "' there was an error '" - << cbd.status.reason() - << "'"}; - setResultAndCancelRemainingWork(onCompletionGuard, cbd.status); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status.getStatus()); return; } - // Pull the documents from the ARM into a buffer until the entire batch has been processed. - auto nextBatchStatus = _bufferNextBatchFromArm(); - if (!nextBatchStatus.isOK()) { - setResultAndCancelRemainingWork(onCompletionGuard, nextBatchStatus); - return; + _stats.indexes = _indexSpecs.size(); + if (!_idIndexSpec.isEmpty()) { + ++_stats.indexes; } - bool lastBatch = _arm->remotesExhausted(); - auto&& scheduleResult = - _scheduleDbWorkFn(stdx::bind(&CollectionCloner::_insertDocumentsCallback, - this, - stdx::placeholders::_1, - lastBatch, - onCompletionGuard)); - if (!scheduleResult.isOK()) { - Status newStatus{scheduleResult.getStatus().code(), - str::stream() << "While cloning collection '" << _sourceNss.ns() - << "' there was an error '" - << scheduleResult.getStatus().reason() - << "'"}; - setResultAndCancelRemainingWork(onCompletionGuard, scheduleResult.getStatus()); - return; - } + _collLoader = std::move(status.getValue()); - MONGO_FAIL_POINT_BLOCK(initialSyncHangCollectionClonerAfterHandlingBatchResponse, nssData) { - const BSONObj& data = nssData.getData(); - auto nss = data["nss"].str(); - // Only hang when cloning the specified collection. - if (_destNss.toString() == nss) { - while (MONGO_FAIL_POINT(initialSyncHangCollectionClonerAfterHandlingBatchResponse) && - !_isShuttingDown()) { - log() << "initialSyncHangCollectionClonerAfterHandlingBatchResponse fail point " - "enabled for " - << nss << ". Blocking until fail point is disabled."; - mongo::sleepsecs(1); - } - } - } + _findFetcher = stdx::make_unique<Fetcher>( + _executor, + _source, + _sourceNss.db().toString(), + BSON("find" << _sourceNss.coll() << "noCursorTimeout" << true << "batchSize" << _batchSize), + stdx::bind(&CollectionCloner::_findCallback, + this, + stdx::placeholders::_1, + stdx::placeholders::_2, + stdx::placeholders::_3, + onCompletionGuard), + ReadPreferenceSetting::secondaryPreferredMetadata(), + RemoteCommandRequest::kNoTimeout, + RemoteCommandRetryScheduler::makeRetryPolicy( + numInitialSyncCollectionFindAttempts.load(), + executor::RemoteCommandRequest::kNoTimeout, + RemoteCommandRetryScheduler::kAllRetriableErrors)); - // If the remote cursors are not exhausted, schedule this callback again to handle - // the impending cursor response. - if (!lastBatch) { - Status scheduleStatus = _scheduleNextARMResultsCallback(onCompletionGuard); - if (!scheduleStatus.isOK()) { - setResultAndCancelRemainingWork(onCompletionGuard, scheduleStatus); - return; - } + Status scheduleStatus = _findFetcher->schedule(); + if (!scheduleStatus.isOK()) { + _findFetcher.reset(); + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, scheduleStatus); + return; } } @@ -730,21 +533,24 @@ void CollectionCloner::_insertDocumentsCallback( return; } - UniqueLock lk(_mutex); std::vector<BSONObj> docs; - if (_documentsToInsert.size() == 0) { + UniqueLock lk(_mutex); + if (_documents.size() == 0) { warning() << "_insertDocumentsCallback, but no documents to insert for ns:" << _destNss; + if (lastBatch) { onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, Status::OK()); } return; } - _documentsToInsert.swap(docs); + + _documents.swap(docs); _stats.documentsCopied += docs.size(); ++_stats.fetchBatches; _progressMeter.hit(int(docs.size())); invariant(_collLoader); const auto status = _collLoader->insertDocuments(docs.cbegin(), docs.cend()); + if (!status.isOK()) { onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, status); return; @@ -764,10 +570,12 @@ void CollectionCloner::_insertDocumentsCallback( } } - if (lastBatch) { - // Clean up resources once the last batch has been copied over and set the status to OK. - onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, Status::OK()); + if (!lastBatch) { + return; } + + // Done with last batch and time to set result in completion guard to Status::OK(). + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, Status::OK()); } void CollectionCloner::_finishCallback(const Status& status) { @@ -776,6 +584,7 @@ void CollectionCloner::_finishCallback(const Status& status) { // Copy the status so we can change it below if needed. auto finalStatus = status; bool callCollectionLoader = false; + decltype(_onCompletion) onCompletion; { LockGuard lk(_mutex); @@ -786,6 +595,7 @@ void CollectionCloner::_finishCallback(const Status& status) { invariant(_onCompletion); std::swap(_onCompletion, onCompletion); } + if (callCollectionLoader) { if (finalStatus.isOK()) { const auto loaderStatus = _collLoader->commit(); @@ -799,6 +609,7 @@ void CollectionCloner::_finishCallback(const Status& status) { // This will release the resources held by the loader. _collLoader.reset(); } + onCompletion(finalStatus); // This will release the resources held by the callback function object. '_onCompletion' is diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h index 222886b9d72..af029b0f1cd 100644 --- a/src/mongo/db/repl/collection_cloner.h +++ b/src/mongo/db/repl/collection_cloner.h @@ -69,7 +69,6 @@ public: /** * Callback completion guard for CollectionCloner. */ - using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs; using OnCompletionGuard = CallbackCompletionGuard<Status>; struct Stats { @@ -112,8 +111,7 @@ public: const CollectionOptions& options, const CallbackFn& onCompletion, StorageInterface* storageInterface, - const int batchSize, - const int maxNumClonerCursors); + const int batchSize); virtual ~CollectionCloner(); @@ -148,14 +146,6 @@ public: */ void setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& scheduleDbWorkFn); - /** - * Returns the documents currently stored in the '_documents' buffer that is intended - * to be inserted through the collection loader. - * - * For testing only. - */ - std::vector<BSONObj> getDocumentsToInsert_forTest(); - private: bool _isActive_inlock() const; @@ -183,6 +173,14 @@ private: BSONObjBuilder* getMoreBob); /** + * Read collection documents from find result. + */ + void _findCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult, + Fetcher::NextAction* nextAction, + BSONObjBuilder* getMoreBob, + std::shared_ptr<OnCompletionGuard> onCompletionGuard); + + /** * Request storage interface to create collection. * * Called multiple times if there are more than one batch of responses from listIndexes @@ -194,59 +192,13 @@ private: void _beginCollectionCallback(const executor::TaskExecutor::CallbackArgs& callbackData); /** - * The possible command types that can be used to establish the initial cursors on the - * remote collection. - */ - enum EstablishCursorsCommand { Find, ParallelCollScan }; - - /** - * Parses the cursor responses from the 'find' or 'parallelCollectionScan' command - * and passes them into the 'AsyncResultsMerger'. - */ - void _establishCollectionCursorsCallback(const RemoteCommandCallbackArgs& rcbd, - EstablishCursorsCommand cursorCommand); - - /** - * Parses the response from a 'parallelCollectionScan' command into a vector of cursor - * elements. - */ - StatusWith<std::vector<BSONElement>> _parseParallelCollectionScanResponse(BSONObj resp); - - /** - * Takes a cursors buffer and parses the 'parallelCollectionScan' response into cursor - * responses that are pushed onto the buffer. - */ - Status _parseCursorResponse(BSONObj response, - std::vector<CursorResponse>* cursors, - EstablishCursorsCommand cursorCommand); - - /** - * Calls to get the next event from the 'AsyncResultsMerger'. This schedules - * '_handleAsyncResultsCallback' to be run when the event is signaled successfully. - */ - Status _scheduleNextARMResultsCallback(std::shared_ptr<OnCompletionGuard> onCompletionGuard); - - /** - * Runs for each time a new batch of documents can be retrieved from the 'AsyncResultsMerger'. - * Buffers the documents retrieved for insertion and schedules a '_insertDocumentsCallback' - * to insert the contents of the buffer. - */ - void _handleARMResultsCallback(const executor::TaskExecutor::CallbackArgs& cbd, - std::shared_ptr<OnCompletionGuard> onCompletionGuard); - - /** - * Pull all ready results from the ARM into a buffer to be inserted. - */ - Status _bufferNextBatchFromArm(); - - /** - * Called whenever there is a new batch of documents ready from the 'AsyncResultsMerger'. + * Called multiple times if there are more than one batch of documents from the fetcher. * On the last batch, 'lastBatch' will be true. * * Each document returned will be inserted via the storage interfaceRequest storage * interface. */ - void _insertDocumentsCallback(const executor::TaskExecutor::CallbackArgs& cbd, + void _insertDocumentsCallback(const executor::TaskExecutor::CallbackArgs& callbackData, bool lastBatch, std::shared_ptr<OnCompletionGuard> onCompletionGuard); @@ -279,30 +231,18 @@ private: StorageInterface* _storageInterface; // (R) Not owned by us. RemoteCommandRetryScheduler _countScheduler; // (S) Fetcher _listIndexesFetcher; // (S) + std::unique_ptr<Fetcher> _findFetcher; // (M) std::vector<BSONObj> _indexSpecs; // (M) BSONObj _idIndexSpec; // (M) - std::vector<BSONObj> - _documentsToInsert; // (M) Documents read from 'AsyncResultsMerger' to insert. - TaskRunner _dbWorkTaskRunner; // (R) + std::vector<BSONObj> _documents; // (M) Documents read from fetcher to insert. + TaskRunner _dbWorkTaskRunner; // (R) ScheduleDbWorkFn _scheduleDbWorkFn; // (RT) Function for scheduling database work using the executor. Stats _stats; // (M) stats for this instance. ProgressMeter _progressMeter; // (M) progress meter for this instance. - const int _collectionCloningBatchSize; // (R) The size of the batches of documents returned in - // collection cloning. - - // (R) The maximum number of cursors to use in the collection cloning process. - const int _maxNumClonerCursors; - // (M) Component responsible for fetching the documents from the collection cloner cursor(s). - std::unique_ptr<AsyncResultsMerger> _arm; - // (R) The cursor parameters used by the 'AsyncResultsMerger'. - std::unique_ptr<ClusterClientCursorParams> _clusterClientCursorParams; - - // (M) The event handle for the 'kill' event of the 'AsyncResultsMerger'. - executor::TaskExecutor::EventHandle _killArmHandle; + const int _batchSize; - // (M) Scheduler used to establish the initial cursor or set of cursors. - std::unique_ptr<RemoteCommandRetryScheduler> _establishCollectionCursorsScheduler; + AsyncResultsMerger _arm; // State transitions: // PreStart --> Running --> ShuttingDown --> Complete diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp index d3e8966a304..4fb7a7fe208 100644 --- a/src/mongo/db/repl/collection_cloner_test.cpp +++ b/src/mongo/db/repl/collection_cloner_test.cpp @@ -66,12 +66,8 @@ protected: void setUp() override; void tearDown() override; - // A simple arbitrary value to use as the default batch size. - const int defaultBatchSize = 1024; - - // Running initial sync with a single cursor will default to using the 'find' command until - // 'parallelCollectionScan' has more complete testing. - const int defaultNumCloningCursors = 1; + // 16MB max batch size / 12 byte min doc size * 10 (for good measure) = defaultBatchSize to use. + const int defaultBatchSize = (16 * 1024 * 1024) / 12 * 10; CollectionOptions options; std::unique_ptr<CollectionCloner> collectionCloner; @@ -91,8 +87,7 @@ void CollectionClonerTest::setUp() { options, stdx::bind(&CollectionClonerTest::setStatus, this, stdx::placeholders::_1), storageInterface.get(), - defaultBatchSize, - defaultNumCloningCursors); + defaultBatchSize); collectionStats = CollectionMockStats(); storageInterface->createCollectionForBulkFn = [this](const NamespaceString& nss, @@ -119,7 +114,6 @@ BaseCloner* CollectionClonerTest::getCloner() const { return collectionCloner.get(); } - TEST_F(CollectionClonerTest, InvalidConstruction) { executor::TaskExecutor& executor = getExecutor(); auto pool = dbWorkThreadPool.get(); @@ -129,50 +123,29 @@ TEST_F(CollectionClonerTest, InvalidConstruction) { // Null executor -- error from Fetcher, not CollectionCloner. { StorageInterface* si = storageInterface.get(); - ASSERT_THROWS_CODE_AND_WHAT(CollectionCloner(nullptr, - pool, - target, - nss, - options, - cb, - si, - defaultBatchSize, - defaultNumCloningCursors), - UserException, - ErrorCodes::BadValue, - "task executor cannot be null"); + ASSERT_THROWS_CODE_AND_WHAT( + CollectionCloner(nullptr, pool, target, nss, options, cb, si, defaultBatchSize), + UserException, + ErrorCodes::BadValue, + "task executor cannot be null"); } // Null storage interface - ASSERT_THROWS_CODE_AND_WHAT(CollectionCloner(&executor, - pool, - target, - nss, - options, - cb, - nullptr, - defaultBatchSize, - defaultNumCloningCursors), - UserException, - ErrorCodes::BadValue, - "storage interface cannot be null"); + ASSERT_THROWS_CODE_AND_WHAT( + CollectionCloner(&executor, pool, target, nss, options, cb, nullptr, defaultBatchSize), + UserException, + ErrorCodes::BadValue, + "storage interface cannot be null"); // Invalid namespace. { NamespaceString badNss("db."); StorageInterface* si = storageInterface.get(); - ASSERT_THROWS_CODE_AND_WHAT(CollectionCloner(&executor, - pool, - target, - badNss, - options, - cb, - si, - defaultBatchSize, - defaultNumCloningCursors), - UserException, - ErrorCodes::BadValue, - "invalid collection namespace: db."); + ASSERT_THROWS_CODE_AND_WHAT( + CollectionCloner(&executor, pool, target, badNss, options, cb, si, defaultBatchSize), + UserException, + ErrorCodes::BadValue, + "invalid collection namespace: db."); } // Invalid collection options - error from CollectionOptions::validate(), not CollectionCloner. @@ -182,15 +155,8 @@ TEST_F(CollectionClonerTest, InvalidConstruction) { << "not a document"); StorageInterface* si = storageInterface.get(); ASSERT_THROWS_CODE_AND_WHAT( - CollectionCloner(&executor, - pool, - target, - nss, - invalidOptions, - cb, - si, - defaultBatchSize, - defaultNumCloningCursors), + CollectionCloner( + &executor, pool, target, nss, invalidOptions, cb, si, defaultBatchSize), UserException, ErrorCodes::BadValue, "'storageEngine.storageEngine1' has to be an embedded document."); @@ -200,18 +166,11 @@ TEST_F(CollectionClonerTest, InvalidConstruction) { { CollectionCloner::CallbackFn nullCb; StorageInterface* si = storageInterface.get(); - ASSERT_THROWS_CODE_AND_WHAT(CollectionCloner(&executor, - pool, - target, - nss, - options, - nullCb, - si, - defaultBatchSize, - defaultNumCloningCursors), - UserException, - ErrorCodes::BadValue, - "callback function cannot be null"); + ASSERT_THROWS_CODE_AND_WHAT( + CollectionCloner(&executor, pool, target, nss, options, nullCb, si, defaultBatchSize), + UserException, + ErrorCodes::BadValue, + "callback function cannot be null"); } } @@ -357,8 +316,7 @@ TEST_F(CollectionClonerTest, options, stdx::bind(&CollectionClonerTest::setStatus, this, stdx::placeholders::_1), storageInterface.get(), - defaultBatchSize, - defaultNumCloningCursors); + defaultBatchSize); ASSERT_OK(collectionCloner->startup()); @@ -381,8 +339,7 @@ TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) { options, stdx::bind(&CollectionClonerTest::setStatus, this, stdx::placeholders::_1), storageInterface.get(), - defaultBatchSize, - defaultNumCloningCursors)); + defaultBatchSize)); NamespaceString collNss; CollectionOptions collOptions; @@ -413,19 +370,10 @@ TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) { ASSERT_TRUE(collectionCloner->isActive()); ASSERT_TRUE(collectionStats.initCalled); - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - const BSONObj doc = BSON("_id" << 1); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc))); + processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc))); } collectionCloner->join(); ASSERT_EQUALS(1, collectionStats.insertCount); @@ -751,7 +699,7 @@ TEST_F(CollectionClonerTest, FindCommandAfterBeginCollection) { ASSERT_FALSE(net->hasReadyRequests()); } -TEST_F(CollectionClonerTest, EstablishCursorCommandFailed) { +TEST_F(CollectionClonerTest, FindCommandFailed) { ASSERT_OK(collectionCloner->startup()); { @@ -799,15 +747,13 @@ TEST_F(CollectionClonerTest, CollectionClonerResendsFindCommandOnRetriableError) net->runReadyNetworkOperations(); ASSERT_TRUE(collectionCloner->isActive()); - // This check exists to ensure that the command used to establish the cursors is retried, - // regardless of the command format. Therefore, it shouldn't be necessary to have a separate - // similar test case for the 'parallelCollectionScan' command. + // Confirm that CollectionCloner resends the find request. auto noi = net->getNextReadyRequest(); assertRemoteCommandNameEquals("find", noi->getRequest()); net->blackHole(noi); } -TEST_F(CollectionClonerTest, EstablishCursorCommandCanceled) { +TEST_F(CollectionClonerTest, FindCommandCanceled) { ASSERT_OK(collectionCloner->startup()); ASSERT_TRUE(collectionCloner->isActive()); @@ -864,19 +810,10 @@ TEST_F(CollectionClonerTest, InsertDocumentsScheduleDbWorkFailed) { return StatusWith<executor::TaskExecutor::CallbackHandle>(ErrorCodes::UnknownError, ""); }); - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - const BSONObj doc = BSON("_id" << 1); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc))); + processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc))); } ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus().code()); @@ -907,18 +844,9 @@ TEST_F(CollectionClonerTest, InsertDocumentsCallbackCanceled) { return StatusWith<executor::TaskExecutor::CallbackHandle>(handle); }); - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(BSON("_id" << 1)))); + processNetworkResponse(createCursorResponse(0, BSON_ARRAY(BSON("_id" << 1)))); } collectionCloner->join(); ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getStatus().code()); @@ -947,18 +875,9 @@ TEST_F(CollectionClonerTest, InsertDocumentsFailed) { return Status(ErrorCodes::OperationFailed, ""); }; - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(BSON("_id" << 1)))); + processNetworkResponse(createCursorResponse(0, BSON_ARRAY(BSON("_id" << 1)))); } collectionCloner->join(); @@ -983,19 +902,10 @@ TEST_F(CollectionClonerTest, InsertDocumentsSingleBatch) { ASSERT_TRUE(collectionCloner->isActive()); ASSERT_TRUE(collectionStats.initCalled); - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - const BSONObj doc = BSON("_id" << 1); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc))); + processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc))); } collectionCloner->join(); @@ -1023,15 +933,6 @@ TEST_F(CollectionClonerTest, InsertDocumentsMultipleBatches) { ASSERT_TRUE(collectionCloner->isActive()); ASSERT_TRUE(collectionStats.initCalled); - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - const BSONObj doc = BSON("_id" << 1); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); @@ -1049,7 +950,7 @@ TEST_F(CollectionClonerTest, InsertDocumentsMultipleBatches) { const BSONObj doc2 = BSON("_id" << 1); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc2))); + processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc2), "nextBatch")); } collectionCloner->join(); @@ -1077,15 +978,6 @@ TEST_F(CollectionClonerTest, LastBatchContainsNoDocuments) { ASSERT_TRUE(collectionCloner->isActive()); ASSERT_TRUE(collectionStats.initCalled); - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - const BSONObj doc = BSON("_id" << 1); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); @@ -1110,9 +1002,10 @@ TEST_F(CollectionClonerTest, LastBatchContainsNoDocuments) { ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); ASSERT_TRUE(collectionCloner->isActive()); + BSONArray emptyArray; { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(emptyArray)); + processNetworkResponse(createCursorResponse(0, emptyArray, "nextBatch")); } collectionCloner->join(); @@ -1138,15 +1031,6 @@ TEST_F(CollectionClonerTest, MiddleBatchContainsNoDocuments) { ASSERT_TRUE(collectionCloner->isActive()); ASSERT_TRUE(collectionStats.initCalled); - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - const BSONObj doc = BSON("_id" << 1); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); @@ -1159,6 +1043,7 @@ TEST_F(CollectionClonerTest, MiddleBatchContainsNoDocuments) { ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); ASSERT_TRUE(collectionCloner->isActive()); + BSONArray emptyArray; { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); processNetworkResponse(createCursorResponse(1, emptyArray, "nextBatch")); @@ -1173,7 +1058,7 @@ TEST_F(CollectionClonerTest, MiddleBatchContainsNoDocuments) { const BSONObj doc2 = BSON("_id" << 2); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc2))); + processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc2), "nextBatch")); } collectionCloner->join(); @@ -1212,15 +1097,6 @@ TEST_F(CollectionClonerTest, CollectionClonerCannotBeRestartedAfterPreviousFailu ASSERT_TRUE(collectionCloner->isActive()); ASSERT_TRUE(collectionStats.initCalled); - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); processNetworkResponse(createCursorResponse(1, BSON_ARRAY(BSON("_id" << 1)))); @@ -1280,8 +1156,7 @@ TEST_F(CollectionClonerTest, CollectionClonerResetsOnCompletionCallbackFunctionA result = status; }, storageInterface.get(), - defaultBatchSize, - defaultNumCloningCursors); + defaultBatchSize); ASSERT_OK(collectionCloner->startup()); ASSERT_TRUE(collectionCloner->isActive()); @@ -1327,620 +1202,7 @@ TEST_F(CollectionClonerTest, ASSERT_TRUE(collectionCloner->isActive()); ASSERT_TRUE(collectionStats.initCalled); - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - - // At this point, the CollectionCloner has sent the find request to establish the cursor. - // We want to return the first batch of documents for the collection from the network so that - // the CollectionCloner schedules the first _insertDocuments DB task and the getMore request for - // the next batch of documents. - - // Store the scheduled CollectionCloner::_insertDocuments task but do not run it yet. - executor::TaskExecutor::CallbackFn insertDocumentsFn; - collectionCloner->setScheduleDbWorkFn_forTest( - [&](const executor::TaskExecutor::CallbackFn& workFn) { - insertDocumentsFn = workFn; - executor::TaskExecutor::CallbackHandle handle(std::make_shared<MockCallbackState>()); - return StatusWith<executor::TaskExecutor::CallbackHandle>(handle); - }); - ASSERT_FALSE(insertDocumentsFn); - - // Return first batch of collection documents from remote server for the getMore request. - const BSONObj doc = BSON("_id" << 1); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - - assertRemoteCommandNameEquals( - "getMore", net->scheduleSuccessfulResponse(createCursorResponse(1, BSON_ARRAY(doc)))); - net->runReadyNetworkOperations(); - } - - // Confirm that CollectionCloner attempted to schedule _insertDocuments task. - ASSERT_TRUE(insertDocumentsFn); - - // Return an error for the getMore request for the next batch of collection documents. - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - - assertRemoteCommandNameEquals( - "getMore", - net->scheduleErrorResponse(Status(ErrorCodes::OperationFailed, "getMore failed"))); - net->runReadyNetworkOperations(); - } - - // CollectionCloner should still be active because we have not finished processing the - // insertDocuments task. - ASSERT_TRUE(collectionCloner->isActive()); - ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - - // Run the insertDocuments task. The final status of the CollectionCloner should match the first - // error passed to the completion guard (ie. from the failed getMore request). - executor::TaskExecutor::CallbackArgs callbackArgs( - &getExecutor(), {}, Status(ErrorCodes::CallbackCanceled, "")); - insertDocumentsFn(callbackArgs); - - // Reset 'insertDocumentsFn' to release last reference count on completion guard. - insertDocumentsFn = {}; - - // No need to call CollectionCloner::join() because we invoked the _insertDocuments callback - // synchronously. - - ASSERT_FALSE(collectionCloner->isActive()); - ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus()); -} - -class ParallelCollectionClonerTest : public BaseClonerTest { -public: - BaseCloner* getCloner() const override; - -protected: - void setUp() override; - void tearDown() override; - std::vector<BSONObj> generateDocs(std::size_t numDocs); - - // A simple arbitrary value to use as the default batch size. - const int defaultBatchSize = 1024; - - // Running initial sync with a single cursor will default to using the 'find' command until - // 'parallelCollectionScan' has more complete testing. - const int defaultNumCloningCursors = 3; - - CollectionOptions options; - std::unique_ptr<CollectionCloner> collectionCloner; - CollectionMockStats collectionStats; // Used by the _loader. - CollectionBulkLoaderMock* _loader; // Owned by CollectionCloner. -}; - -void ParallelCollectionClonerTest::setUp() { - BaseClonerTest::setUp(); - options = {}; - collectionCloner.reset(nullptr); - collectionCloner = stdx::make_unique<CollectionCloner>( - &getExecutor(), - dbWorkThreadPool.get(), - target, - nss, - options, - stdx::bind(&CollectionClonerTest::setStatus, this, stdx::placeholders::_1), - storageInterface.get(), - defaultBatchSize, - defaultNumCloningCursors); - collectionStats = CollectionMockStats(); - storageInterface->createCollectionForBulkFn = - [this](const NamespaceString& nss, - const CollectionOptions& options, - const BSONObj idIndexSpec, - const std::vector<BSONObj>& secondaryIndexSpecs) { - _loader = new CollectionBulkLoaderMock(&collectionStats); - Status initCollectionBulkLoader = _loader->init(secondaryIndexSpecs); - ASSERT_OK(initCollectionBulkLoader); - - return StatusWith<std::unique_ptr<CollectionBulkLoader>>( - std::unique_ptr<CollectionBulkLoader>(_loader)); - }; -} - -void ParallelCollectionClonerTest::tearDown() { - BaseClonerTest::tearDown(); - // Executor may still invoke collection cloner's callback before shutting down. - collectionCloner.reset(nullptr); - options = {}; -} - -BaseCloner* ParallelCollectionClonerTest::getCloner() const { - return collectionCloner.get(); -} - -std::vector<BSONObj> ParallelCollectionClonerTest::generateDocs(std::size_t numDocs) { - std::vector<BSONObj> docs; - for (unsigned int i = 0; i < numDocs; i++) { - docs.push_back(BSON("_id" << i)); - } - return docs; -} - -TEST_F(ParallelCollectionClonerTest, InsertDocumentsSingleBatchWithMultipleCloningCursors) { - ASSERT_OK(collectionCloner->startup()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); - processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); - } - ASSERT_TRUE(collectionCloner->isActive()); - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - ASSERT_TRUE(collectionStats.initCalled); - - // A single cursor response is returned because there is only a single document to insert. - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse( - BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray)) << "ok" << 1)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - - auto exec = &getExecutor(); - std::vector<BSONObj> docs; - // Record the buffered documents before they are inserted so we can - // validate them. - collectionCloner->setScheduleDbWorkFn_forTest( - [&](const executor::TaskExecutor::CallbackFn& workFn) { - docs = collectionCloner->getDocumentsToInsert_forTest(); - return exec->scheduleWork(workFn); - }); - - const BSONObj doc = BSON("_id" << 1); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc))); - } - - collectionCloner->join(); - - ASSERT_BSONOBJ_EQ(docs[0], doc); - ASSERT_EQUALS(1, collectionStats.insertCount); - ASSERT_TRUE(collectionStats.commitCalled); - - ASSERT_OK(getStatus()); - ASSERT_FALSE(collectionCloner->isActive()); -} - -TEST_F(ParallelCollectionClonerTest, - InsertDocumentsSingleBatchOfMultipleDocumentsWithMultipleCloningCursors) { - ASSERT_OK(collectionCloner->startup()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); - processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); - } - ASSERT_TRUE(collectionCloner->isActive()); - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - ASSERT_TRUE(collectionStats.initCalled); - - // A single cursor response is returned because there is only a single batch of documents to - // insert. - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse( - BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray)) << "ok" << 1)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - - auto exec = &getExecutor(); - std::vector<BSONObj> docs; - // Record the buffered documents before they are inserted so we can - // validate them. - collectionCloner->setScheduleDbWorkFn_forTest( - [&](const executor::TaskExecutor::CallbackFn& workFn) { - docs = collectionCloner->getDocumentsToInsert_forTest(); - return exec->scheduleWork(workFn); - }); - - auto generatedDocs = generateDocs(3); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse( - BSON_ARRAY(generatedDocs[0] << generatedDocs[1] << generatedDocs[2]))); - } - - collectionCloner->join(); - - ASSERT_EQUALS(3U, docs.size()); - for (int i = 0; i < 3; i++) { - ASSERT_BSONOBJ_EQ(docs[i], generatedDocs[i]); - } - ASSERT_EQUALS(3, collectionStats.insertCount); - ASSERT_TRUE(collectionStats.commitCalled); - - ASSERT_OK(getStatus()); - ASSERT_FALSE(collectionCloner->isActive()); -} - -TEST_F(ParallelCollectionClonerTest, InsertDocumentsWithMultipleCursorsOfDifferentNumberOfBatches) { - ASSERT_OK(collectionCloner->startup()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); - processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); - } - ASSERT_TRUE(collectionCloner->isActive()); - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - ASSERT_TRUE(collectionStats.initCalled); - - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray) - << createCursorResponse(2, emptyArray) - << createCursorResponse(3, emptyArray)) - << "ok" - << 1)); - } - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - - auto exec = &getExecutor(); - std::vector<BSONObj> docs; - - // Record the buffered documents before they are inserted so we can - // validate them. - collectionCloner->setScheduleDbWorkFn_forTest( - [&](const executor::TaskExecutor::CallbackFn& workFn) { - auto buffered = collectionCloner->getDocumentsToInsert_forTest(); - docs.insert(docs.end(), buffered.begin(), buffered.end()); - return exec->scheduleWork(workFn); - }); - - int numDocs = 9; - std::vector<BSONObj> generatedDocs = generateDocs(numDocs); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, BSON_ARRAY(generatedDocs[0]), "nextBatch")); - processNetworkResponse(createCursorResponse(2, BSON_ARRAY(generatedDocs[1]), "nextBatch")); - processNetworkResponse(createCursorResponse(3, BSON_ARRAY(generatedDocs[2]), "nextBatch")); - } - - collectionCloner->waitForDbWorker(); - ASSERT_EQUALS(3U, docs.size()); - for (int i = 0; i < 3; i++) { - ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]); - } - ASSERT_EQUALS(3, collectionStats.insertCount); - ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, BSON_ARRAY(generatedDocs[3]), "nextBatch")); - processNetworkResponse(createCursorResponse(2, BSON_ARRAY(generatedDocs[4]), "nextBatch")); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[5]))); - } - - collectionCloner->waitForDbWorker(); - ASSERT_EQUALS(6U, docs.size()); - for (int i = 3; i < 6; i++) { - ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]); - } - ASSERT_EQUALS(6, collectionStats.insertCount); - ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, BSON_ARRAY(generatedDocs[6]), "nextBatch")); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[7]))); - } - - collectionCloner->waitForDbWorker(); - ASSERT_EQUALS(8U, docs.size()); - for (int i = 6; i < 8; i++) { - ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]); - } - ASSERT_EQUALS(8, collectionStats.insertCount); - ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[8]))); - } - - collectionCloner->join(); - ASSERT_EQUALS(9U, docs.size()); - ASSERT_BSONOBJ_EQ(generatedDocs[8], docs[8]); - ASSERT_EQUALS(numDocs, collectionStats.insertCount); - ASSERT_TRUE(collectionStats.commitCalled); - - ASSERT_OK(getStatus()); - ASSERT_FALSE(collectionCloner->isActive()); -} - -TEST_F(ParallelCollectionClonerTest, MiddleBatchContainsNoDocumentsWithMultipleCursors) { - ASSERT_OK(collectionCloner->startup()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); - processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); - } - ASSERT_TRUE(collectionCloner->isActive()); - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - ASSERT_TRUE(collectionStats.initCalled); - - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray) - << createCursorResponse(2, emptyArray) - << createCursorResponse(3, emptyArray)) - << "ok" - << 1)); - } - collectionCloner->waitForDbWorker(); - - auto exec = &getExecutor(); - std::vector<BSONObj> docs; - // Record the buffered documents before they are inserted so we can - // validate them. - collectionCloner->setScheduleDbWorkFn_forTest( - [&](const executor::TaskExecutor::CallbackFn& workFn) { - auto buffered = collectionCloner->getDocumentsToInsert_forTest(); - docs.insert(docs.end(), buffered.begin(), buffered.end()); - return exec->scheduleWork(workFn); - }); - - ASSERT_TRUE(collectionCloner->isActive()); - - int numDocs = 6; - std::vector<BSONObj> generatedDocs = generateDocs(numDocs); - const BSONObj doc = BSON("_id" << 1); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, BSON_ARRAY(generatedDocs[0]))); - processNetworkResponse(createCursorResponse(2, BSON_ARRAY(generatedDocs[1]))); - processNetworkResponse(createCursorResponse(3, BSON_ARRAY(generatedDocs[2]))); - } - - collectionCloner->waitForDbWorker(); - ASSERT_EQUALS(3U, docs.size()); - for (int i = 0; i < 3; i++) { - ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]); - } - ASSERT_EQUALS(3, collectionStats.insertCount); - - ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray, "nextBatch")); - processNetworkResponse(createCursorResponse(2, emptyArray, "nextBatch")); - processNetworkResponse(createCursorResponse(3, emptyArray, "nextBatch")); - } - - collectionCloner->waitForDbWorker(); - ASSERT_EQUALS(3, collectionStats.insertCount); - - ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[3]))); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[4]))); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[5]))); - } - - collectionCloner->join(); - - ASSERT_EQUALS(6U, docs.size()); - for (int i = 3; i < 6; i++) { - ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]); - } - - ASSERT_EQUALS(numDocs, collectionStats.insertCount); - ASSERT_TRUE(collectionStats.commitCalled); - - ASSERT_OK(getStatus()); - ASSERT_FALSE(collectionCloner->isActive()); -} - -TEST_F(ParallelCollectionClonerTest, LastBatchContainsNoDocumentsWithMultipleCursors) { - ASSERT_OK(collectionCloner->startup()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); - processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); - } - ASSERT_TRUE(collectionCloner->isActive()); - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - ASSERT_TRUE(collectionStats.initCalled); - - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray) - << createCursorResponse(2, emptyArray) - << createCursorResponse(3, emptyArray)) - << "ok" - << 1)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - - auto exec = &getExecutor(); - std::vector<BSONObj> docs; - // Record the buffered documents before they are inserted so we can - // validate them. - collectionCloner->setScheduleDbWorkFn_forTest( - [&](const executor::TaskExecutor::CallbackFn& workFn) { - auto buffered = collectionCloner->getDocumentsToInsert_forTest(); - docs.insert(docs.end(), buffered.begin(), buffered.end()); - return exec->scheduleWork(workFn); - }); - - int numDocs = 6; - std::vector<BSONObj> generatedDocs = generateDocs(numDocs); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, BSON_ARRAY(generatedDocs[0]), "nextBatch")); - processNetworkResponse(createCursorResponse(2, BSON_ARRAY(generatedDocs[1]), "nextBatch")); - processNetworkResponse(createCursorResponse(3, BSON_ARRAY(generatedDocs[2]), "nextBatch")); - } - - collectionCloner->waitForDbWorker(); - ASSERT_EQUALS(3U, docs.size()); - for (int i = 0; i < 3; i++) { - ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]); - } - ASSERT_EQUALS(3, collectionStats.insertCount); - - ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, BSON_ARRAY(generatedDocs[3]), "nextBatch")); - processNetworkResponse(createCursorResponse(2, BSON_ARRAY(generatedDocs[4]), "nextBatch")); - processNetworkResponse(createCursorResponse(3, BSON_ARRAY(generatedDocs[5]), "nextBatch")); - } - - collectionCloner->waitForDbWorker(); - ASSERT_EQUALS(6U, docs.size()); - for (int i = 3; i < 6; i++) { - ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]); - } - ASSERT_EQUALS(numDocs, collectionStats.insertCount); - - ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(emptyArray)); - processNetworkResponse(createFinalCursorResponse(emptyArray)); - processNetworkResponse(createFinalCursorResponse(emptyArray)); - } - - collectionCloner->join(); - ASSERT_EQUALS(6, collectionStats.insertCount); - ASSERT_TRUE(collectionStats.commitCalled); - - ASSERT_OK(getStatus()); - ASSERT_FALSE(collectionCloner->isActive()); -} - -TEST_F(ParallelCollectionClonerTest, InsertDocumentsScheduleDbWorkFailedWithMultipleCursors) { - ASSERT_OK(collectionCloner->startup()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); - processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); - } - - collectionCloner->waitForDbWorker(); - - // Replace scheduleDbWork function so that cloner will fail to schedule DB work after - // getting documents. - collectionCloner->setScheduleDbWorkFn_forTest( - [](const executor::TaskExecutor::CallbackFn& workFn) { - return StatusWith<executor::TaskExecutor::CallbackHandle>(ErrorCodes::UnknownError, ""); - }); - - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray) - << createCursorResponse(2, emptyArray) - << createCursorResponse(3, emptyArray)) - << "ok" - << 1)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - - const BSONObj doc = BSON("_id" << 1); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc))); - } - - ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus().code()); - ASSERT_FALSE(collectionCloner->isActive()); -} - -TEST_F(ParallelCollectionClonerTest, - CollectionClonerWaitsForPendingTasksToCompleteBeforeInvokingOnCompletionCallback) { - ASSERT_OK(collectionCloner->startup()); - ASSERT_TRUE(collectionCloner->isActive()); - - auto net = getNet(); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(net); - - assertRemoteCommandNameEquals("count", - net->scheduleSuccessfulResponse(createCountResponse(0))); - net->runReadyNetworkOperations(); - - assertRemoteCommandNameEquals( - "listIndexes", - net->scheduleSuccessfulResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)))); - net->runReadyNetworkOperations(); - } - ASSERT_TRUE(collectionCloner->isActive()); - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - ASSERT_TRUE(collectionStats.initCalled); - - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray) - << createCursorResponse(2, emptyArray) - << createCursorResponse(3, emptyArray)) - << "ok" - << 1)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - - // At this point, the CollectionCloner has sent the find request to establish the cursor. + // At this point, the CollectionCloner has sent the find request for the collection documents. // We want to return the first batch of documents for the collection from the network so that // the CollectionCloner schedules the first _insertDocuments DB task and the getMore request for // the next batch of documents. @@ -1955,13 +1217,13 @@ TEST_F(ParallelCollectionClonerTest, }); ASSERT_FALSE(insertDocumentsFn); - // Return first batch of collection documents from remote server for the getMore request. + // Return first batch of collection documents from remote server for the find request. const BSONObj doc = BSON("_id" << 1); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); assertRemoteCommandNameEquals( - "getMore", net->scheduleSuccessfulResponse(createCursorResponse(1, BSON_ARRAY(doc)))); + "find", net->scheduleSuccessfulResponse(createCursorResponse(1, BSON_ARRAY(doc)))); net->runReadyNetworkOperations(); } @@ -1999,72 +1261,4 @@ TEST_F(ParallelCollectionClonerTest, ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus()); } -TEST_F(ParallelCollectionClonerTest, CollectionClonerCannotBeRestartedAfterPreviousFailure) { - // First cloning attempt - fails while reading documents from source collection. - unittest::log() << "Starting first collection cloning attempt"; - ASSERT_OK(collectionCloner->startup()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); - processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); - } - ASSERT_TRUE(collectionCloner->isActive()); - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - ASSERT_TRUE(collectionStats.initCalled); - - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray) - << createCursorResponse(2, emptyArray) - << createCursorResponse(3, emptyArray)) - << "ok" - << 1)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - int numDocs = 5; - std::vector<BSONObj> docs = generateDocs(numDocs); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, BSON_ARRAY(docs[0]))); - processNetworkResponse(createCursorResponse(2, BSON_ARRAY(docs[1]))); - processNetworkResponse(createCursorResponse(3, BSON_ARRAY(docs[2]))); - } - - collectionCloner->waitForDbWorker(); - ASSERT_EQUALS(3, collectionStats.insertCount); - - // Check that the status hasn't changed from the initial value. - ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(docs[3]))); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(docs[4]))); - processNetworkResponse(ErrorCodes::OperationFailed, - "failed to read remaining documents from source collection"); - } - - collectionCloner->join(); - ASSERT_EQUALS(numDocs, collectionStats.insertCount); - - ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus()); - ASSERT_FALSE(collectionCloner->isActive()); - - // Second cloning attempt - run to completion. - unittest::log() << "Starting second collection cloning attempt - startup() should fail"; - collectionStats = CollectionMockStats(); - setStatus(getDetectableErrorStatus()); - - ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, collectionCloner->startup()); -} - } // namespace diff --git a/src/mongo/db/repl/database_cloner.cpp b/src/mongo/db/repl/database_cloner.cpp index bc7ea109918..0ebaf66e098 100644 --- a/src/mongo/db/repl/database_cloner.cpp +++ b/src/mongo/db/repl/database_cloner.cpp @@ -68,9 +68,6 @@ MONGO_EXPORT_STARTUP_SERVER_PARAMETER(collectionClonerBatchSize, int, defaultBat // The number of attempts for the listCollections commands. MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncListCollectionsAttempts, int, 3); -// The number of cursors to use in the collection cloning process. -MONGO_EXPORT_SERVER_PARAMETER(maxNumInitialSyncCollectionClonerCursors, int, 1); - /** * Default listCollections predicate. */ @@ -367,8 +364,7 @@ void DatabaseCloner::_listCollectionsCallback(const StatusWith<Fetcher::QueryRes stdx::bind( &DatabaseCloner::_collectionClonerCallback, this, stdx::placeholders::_1, nss), _storageInterface, - collectionClonerBatchSize, - maxNumInitialSyncCollectionClonerCursors.load()); + collectionClonerBatchSize); } catch (const UserException& ex) { _finishCallback_inlock(lk, ex.toStatus()); return; diff --git a/src/mongo/db/s/sharding_task_executor.cpp b/src/mongo/db/s/sharding_task_executor.cpp index 0769a42ed40..24fc512459b 100644 --- a/src/mongo/db/s/sharding_task_executor.cpp +++ b/src/mongo/db/s/sharding_task_executor.cpp @@ -38,11 +38,8 @@ #include "mongo/db/logical_time.h" #include "mongo/db/operation_time_tracker.h" #include "mongo/executor/thread_pool_task_executor.h" -#include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/metadata/sharding_metadata.h" -#include "mongo/s/client/shard_registry.h" #include "mongo/s/cluster_last_error_info.h" -#include "mongo/s/grid.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" @@ -119,17 +116,10 @@ StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleRemoteCom const TaskExecutor::RemoteCommandCallbackArgs& args) { ON_BLOCK_EXIT([&cb, &args]() { cb(args); }); - auto shard = grid.shardRegistry()->getShardForHostNoReload(request.target); - if (!shard) { - LOG(1) << "Could not find shard containing host: " << request.target.toString(); - return; - } if (!args.response.isOK()) { - shard->updateReplSetMonitor(request.target, args.response.status); LOG(1) << "Error processing the remote request, not updating operationTime or gLE"; return; } - shard->updateReplSetMonitor(request.target, getStatusFromCommandResult(args.response.data)); // Update the logical clock. invariant(timeTracker); |