From 8ddfeab4644aa38082b3cd03cfeef1dd6c65d35a Mon Sep 17 00:00:00 2001
From: William Schultz <william.schultz@mongodb.com>
Date: Fri, 11 Aug 2017 17:09:56 -0400
Subject: Revert "SERVER-29617 replace fetcher with ARM and add numCursors server parameter"

This reverts commit 0d3137df3879e86d92904309e968f25529904639.
---
 src/mongo/db/repl/collection_cloner.cpp | 417 +++++++++-----------------------
 1 file changed, 114 insertions(+), 303 deletions(-)

diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index 98d658b49f9..13b27d0ed01 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -74,9 +74,9 @@ MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncCollectionFindAttempts, int, 3);
 // collection 'namespace'.
 MONGO_FP_DECLARE(initialSyncHangDuringCollectionClone);
 
-// Failpoint which causes initial sync to hang after handling the next batch of results from the
-// 'AsyncResultsMerger' for a specific collection.
-MONGO_FP_DECLARE(initialSyncHangCollectionClonerAfterHandlingBatchResponse);
+// Failpoint which causes initial sync to hang after the initial 'find' command of collection
+// cloning, for a specific collection.
+MONGO_FP_DECLARE(initialSyncHangCollectionClonerAfterInitialFind);
 
 CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
                                    OldThreadPool* dbWorkThreadPool,
@@ -85,8 +85,7 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
                                    const CollectionOptions& options,
                                    const CallbackFn& onCompletion,
                                    StorageInterface* storageInterface,
-                                   const int batchSize,
-                                   const int maxNumClonerCursors)
+                                   const int batchSize)
     : _executor(executor),
       _dbWorkThreadPool(dbWorkThreadPool),
       _source(source),
@@ -123,7 +122,7 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
           executor::RemoteCommandRequest::kNoTimeout,
           RemoteCommandRetryScheduler::kAllRetriableErrors)),
       _indexSpecs(),
-      _documentsToInsert(),
+      _documents(),
       _dbWorkTaskRunner(_dbWorkThreadPool),
       _scheduleDbWorkFn([this](const executor::TaskExecutor::CallbackFn& work) {
           auto task = [work](OperationContext* opCtx,
@@ -139,8 +138,9 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
                      kProgressMeterCheckInterval,
                      "documents copied",
                      str::stream() << _sourceNss.toString() << " collection clone progress"),
-      _collectionCloningBatchSize(batchSize),
-      _maxNumClonerCursors(maxNumClonerCursors) {
+      _batchSize(batchSize),
+      _arm(executor,
+           stdx::make_unique<ClusterClientCursorParams>(_sourceNss, UserNameIterator()).get()) {
     // Fetcher throws an exception on null executor.
     invariant(executor);
     uassert(ErrorCodes::BadValue,
@@ -215,19 +215,15 @@ void CollectionCloner::shutdown() {
             // Nothing to do if we are already in ShuttingDown or Complete state.
             return;
     }
+    _cancelRemainingWork_inlock();
 }
 
 void CollectionCloner::_cancelRemainingWork_inlock() {
-    if (_arm) {
-        Client::initThreadIfNotAlready();
-        auto opCtx = cc().getOperationContext();
-        _killArmHandle = _arm->kill(opCtx);
-    }
     _countScheduler.shutdown();
     _listIndexesFetcher.shutdown();
-    if (_establishCollectionCursorsScheduler) {
-        _establishCollectionCursorsScheduler->shutdown();
+    if (_findFetcher) {
+        _findFetcher->shutdown();
     }
     _dbWorkTaskRunner.cancel();
 }
@@ -239,9 +235,6 @@ CollectionCloner::Stats CollectionCloner::getStats() const {
 
 void CollectionCloner::join() {
     stdx::unique_lock<stdx::mutex> lk(_mutex);
-    if (_killArmHandle) {
-        _executor->waitForEvent(_killArmHandle);
-    }
     _condition.wait(lk, [this]() { return !_isActive_inlock(); });
 }
 
@@ -257,11 +250,6 @@ void CollectionCloner::setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& sched
     _scheduleDbWorkFn = scheduleDbWorkFn;
 }
 
-std::vector<BSONObj> CollectionCloner::getDocumentsToInsert_forTest() {
-    LockGuard lk(_mutex);
-    return _documentsToInsert;
-}
-
 void CollectionCloner::_countCallback(
     const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
 
@@ -403,181 +391,80 @@ void CollectionCloner::_listIndexesCallback(const Fetcher::QueryResponseStatus&
     }
 }
 
-void CollectionCloner::_beginCollectionCallback(const executor::TaskExecutor::CallbackArgs& cbd) {
-    if (!cbd.status.isOK()) {
-        _finishCallback(cbd.status);
-        return;
-    }
-    if (!_idIndexSpec.isEmpty() && _options.autoIndexId == CollectionOptions::NO) {
-        warning()
-            << "Found the _id_ index spec but the collection specified autoIndexId of false on ns:"
-            << this->_sourceNss;
-    }
+void CollectionCloner::_findCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult,
+                                     Fetcher::NextAction* nextAction,
+                                     BSONObjBuilder* getMoreBob,
+                                     std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
+    if (!fetchResult.isOK()) {
+        // Wait for active inserts to complete.
+        waitForDbWorker();
 
-    auto collectionBulkLoader = _storageInterface->createCollectionForBulkLoading(
-        _destNss, _options, _idIndexSpec, _indexSpecs);
+        Status newStatus{fetchResult.getStatus().code(),
+                         str::stream() << "While querying collection '" << _sourceNss.ns()
+                                       << "' there was an error '"
+                                       << fetchResult.getStatus().reason()
+                                       << "'"};
 
-    if (!collectionBulkLoader.isOK()) {
-        _finishCallback(collectionBulkLoader.getStatus());
+        stdx::lock_guard<stdx::mutex> lock(_mutex);
+        onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, newStatus);
         return;
     }
 
-    _stats.indexes = _indexSpecs.size();
-    if (!_idIndexSpec.isEmpty()) {
-        ++_stats.indexes;
-    }
-
-    _collLoader = std::move(collectionBulkLoader.getValue());
-
-    BSONObjBuilder cmdObj;
-    EstablishCursorsCommand cursorCommand;
-    // The 'find' command is used when the number of cloning cursors is 1 to ensure
-    // the correctness of the collection cloning process until 'parallelCollectionScan'
-    // can be tested more extensively in context of initial sync.
-    if (_maxNumClonerCursors == 1) {
-        cmdObj.append("find", _sourceNss.coll());
-        cmdObj.append("noCursorTimeout", true);
-        // Set batchSize to be 0 to establish the cursor without fetching any documents,
-        // similar to the response format of 'parallelCollectionScan'.
- cmdObj.append("batchSize", 0); - cursorCommand = Find; - } else { - cmdObj.append("parallelCollectionScan", _sourceNss.coll()); - cmdObj.append("numCursors", _maxNumClonerCursors); - cursorCommand = ParallelCollScan; + auto batchData(fetchResult.getValue()); + bool lastBatch = *nextAction == Fetcher::NextAction::kNoAction; + if (batchData.documents.size() > 0) { + LockGuard lk(_mutex); + _documents.insert(_documents.end(), batchData.documents.begin(), batchData.documents.end()); + } else if (!batchData.first) { + warning() << "No documents returned in batch; ns: " << _sourceNss + << ", cursorId:" << batchData.cursorId << ", isLastBatch:" << lastBatch; } - Client::initThreadIfNotAlready(); - auto opCtx = cc().getOperationContext(); - - _establishCollectionCursorsScheduler = stdx::make_unique( - _executor, - RemoteCommandRequest(_source, - _sourceNss.db().toString(), - cmdObj.obj(), - ReadPreferenceSetting::secondaryPreferredMetadata(), - opCtx, - RemoteCommandRequest::kNoTimeout), - stdx::bind(&CollectionCloner::_establishCollectionCursorsCallback, - this, - stdx::placeholders::_1, - cursorCommand), - RemoteCommandRetryScheduler::makeRetryPolicy( - numInitialSyncCollectionFindAttempts.load(), - executor::RemoteCommandRequest::kNoTimeout, - RemoteCommandRetryScheduler::kAllRetriableErrors)); - auto scheduleStatus = _establishCollectionCursorsScheduler->startup(); - LOG(1) << "Attempting to establish cursors with maxNumClonerCursors: " << _maxNumClonerCursors; + auto&& scheduleResult = + _scheduleDbWorkFn(stdx::bind(&CollectionCloner::_insertDocumentsCallback, + this, + stdx::placeholders::_1, + lastBatch, + onCompletionGuard)); + if (!scheduleResult.isOK()) { + Status newStatus{scheduleResult.getStatus().code(), + str::stream() << "While cloning collection '" << _sourceNss.ns() + << "' there was an error '" + << scheduleResult.getStatus().reason() + << "'"}; - if (!scheduleStatus.isOK()) { - _establishCollectionCursorsScheduler.reset(); - _finishCallback(scheduleStatus); + stdx::lock_guard lock(_mutex); + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, newStatus); return; } -} -Status CollectionCloner::_parseCursorResponse(BSONObj response, - std::vector* cursors, - EstablishCursorsCommand cursorCommand) { - switch (cursorCommand) { - case Find: { - StatusWith findResponse = CursorResponse::parseFromBSON(response); - if (!findResponse.isOK()) { - Status errorStatus{findResponse.getStatus().code(), - str::stream() - << "While parsing the 'find' query against collection '" - << _sourceNss.ns() - << "' there was an error '" - << findResponse.getStatus().reason() - << "'"}; - return errorStatus; - } - cursors->push_back(std::move(findResponse.getValue())); - break; - } - case ParallelCollScan: { - auto cursorElements = _parseParallelCollectionScanResponse(response); - if (!cursorElements.isOK()) { - return cursorElements.getStatus(); - } - std::vector cursorsArray; - cursorsArray = cursorElements.getValue(); - // Parse each BSONElement into a 'CursorResponse' object. 
-            for (BSONElement cursor : cursorsArray) {
-                if (!cursor.isABSONObj()) {
-                    Status errorStatus(
-                        ErrorCodes::FailedToParse,
-                        "The 'cursor' field in the list of cursor responses is not a "
-                        "valid BSON Object");
-                    return errorStatus;
-                }
-                const BSONObj cursorObj = cursor.Obj().getOwned();
-                StatusWith<CursorResponse> parallelCollScanResponse =
-                    CursorResponse::parseFromBSON(cursorObj);
-                if (!parallelCollScanResponse.isOK()) {
-                    return parallelCollScanResponse.getStatus();
-                }
-                cursors->push_back(std::move(parallelCollScanResponse.getValue()));
+    MONGO_FAIL_POINT_BLOCK(initialSyncHangCollectionClonerAfterInitialFind, nssData) {
+        const BSONObj& data = nssData.getData();
+        auto nss = data["nss"].str();
+        // Only hang when cloning the specified collection.
+        if (_destNss.toString() == nss) {
+            while (MONGO_FAIL_POINT(initialSyncHangCollectionClonerAfterInitialFind) &&
+                   !_isShuttingDown()) {
+                log() << "initialSyncHangCollectionClonerAfterInitialFind fail point enabled for "
+                      << nss << ". Blocking until fail point is disabled.";
+                mongo::sleepsecs(1);
             }
-            break;
-        }
-        default: {
-            Status errorStatus(
-                ErrorCodes::FailedToParse,
-                "The command used to establish the collection cloner cursors is not valid.");
-            return errorStatus;
         }
     }
-    return Status::OK();
-}
-
-void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCallbackArgs& rcbd,
-                                                           EstablishCursorsCommand cursorCommand) {
-    if (_state == State::kShuttingDown) {
-        Status shuttingDownStatus{ErrorCodes::CallbackCanceled, "Cloner shutting down."};
-        _finishCallback(shuttingDownStatus);
-        return;
-    }
-    auto response = rcbd.response;
-    if (!response.isOK()) {
-        _finishCallback(response.status);
-        return;
-    }
-    Status commandStatus = getStatusFromCommandResult(response.data);
-    if (!commandStatus.isOK()) {
-        Status newStatus{commandStatus.code(),
-                         str::stream() << "While querying collection '" << _sourceNss.ns()
                                       << "' there was an error '"
-                                       << commandStatus.reason()
-                                       << "'"};
-        _finishCallback(commandStatus);
-        return;
+    if (!lastBatch) {
+        invariant(getMoreBob);
+        getMoreBob->append("getMore", batchData.cursorId);
+        getMoreBob->append("collection", batchData.nss.coll());
+        getMoreBob->append("batchSize", _batchSize);
     }
+}
 
-    std::vector<CursorResponse> cursorResponses;
-    Status parseResponseStatus =
-        _parseCursorResponse(response.data, &cursorResponses, cursorCommand);
-    if (!parseResponseStatus.isOK()) {
-        _finishCallback(parseResponseStatus);
+void CollectionCloner::_beginCollectionCallback(const executor::TaskExecutor::CallbackArgs& cbd) {
+    if (!cbd.status.isOK()) {
+        _finishCallback(cbd.status);
         return;
     }
-    LOG(1) << "Collection cloner running with " << cursorResponses.size()
-           << " cursors established.";
-
-    // Initialize the 'AsyncResultsMerger'(ARM).
-    std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors;
-    for (auto&& cursorResponse : cursorResponses) {
-        // A placeholder 'ShardId' is used until the ARM is made less sharding specific.
-        remoteCursors.emplace_back(
-            ShardId("CollectionClonerSyncSource"), _source, std::move(cursorResponse));
-    }
-
-    // An empty list of authenticated users is passed into the cluster parameters
-    // as user information is not used in the ARM in context of collection cloning.
-    _clusterClientCursorParams =
-        stdx::make_unique<ClusterClientCursorParams>(_sourceNss, UserNameIterator());
-    _clusterClientCursorParams->remotes = std::move(remoteCursors);
-    _arm = stdx::make_unique<AsyncResultsMerger>(_executor, _clusterClientCursorParams.get());
 
     // This completion guard invokes _finishCallback on destruction.
     auto cancelRemainingWorkInLock = [this]() { _cancelRemainingWork_inlock(); };
@@ -589,134 +476,50 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa
     // that will cause the destructor of the completion guard to run, the destructor must be run
     // outside the mutex. This is a necessary condition to invoke _finishCallback.
     stdx::lock_guard<stdx::mutex> lock(_mutex);
-    Status scheduleStatus = _scheduleNextARMResultsCallback(onCompletionGuard);
-    if (!scheduleStatus.isOK()) {
-        onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, scheduleStatus);
-        return;
-    }
-}
-
-StatusWith<std::vector<BSONElement>> CollectionCloner::_parseParallelCollectionScanResponse(
-    BSONObj resp) {
-    if (!resp.hasField("cursors")) {
-        return Status(ErrorCodes::CursorNotFound,
-                      "The 'parallelCollectionScan' response does not contain a 'cursors' field.");
-    }
-    BSONElement response = resp["cursors"];
-    if (response.type() == BSONType::Array) {
-        return response.Array();
-    } else {
-        return Status(
-            ErrorCodes::FailedToParse,
-            "The 'parallelCollectionScan' response is unable to be transformed into an array.");
-    }
-}
-
-Status CollectionCloner::_bufferNextBatchFromArm() {
-    while (_arm->ready()) {
-        auto armResultStatus = _arm->nextReady();
-        if (!armResultStatus.getStatus().isOK()) {
-            return armResultStatus.getStatus();
-        }
-        if (armResultStatus.getValue().isEOF()) {
-            // We have reached the end of the batch.
-            break;
-        } else {
-            auto queryResult = armResultStatus.getValue().getResult();
-            _documentsToInsert.push_back(std::move(*queryResult));
-        }
+    if (!_idIndexSpec.isEmpty() && _options.autoIndexId == CollectionOptions::NO) {
+        warning()
+            << "Found the _id_ index spec but the collection specified autoIndexId of false on ns:"
+            << this->_sourceNss;
     }
-    return Status::OK();
-}
-
-Status CollectionCloner::_scheduleNextARMResultsCallback(
-    std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
-    Client::initThreadIfNotAlready();
-    auto opCtx = cc().getOperationContext();
-    auto nextEvent = _arm->nextEvent(opCtx);
-    if (!nextEvent.isOK()) {
-        return nextEvent.getStatus();
-    }
-    auto event = nextEvent.getValue();
-    auto handleARMResultsOnNextEvent =
-        _executor->onEvent(event,
-                           stdx::bind(&CollectionCloner::_handleARMResultsCallback,
-                                      this,
-                                      stdx::placeholders::_1,
-                                      onCompletionGuard));
-    return handleARMResultsOnNextEvent.getStatus();
-}
-
-void CollectionCloner::_handleARMResultsCallback(
-    const executor::TaskExecutor::CallbackArgs& cbd,
-    std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
-    auto setResultAndCancelRemainingWork = [this](std::shared_ptr<OnCompletionGuard> guard,
-                                                  Status status) {
-        stdx::lock_guard<stdx::mutex> lock(_mutex);
-        guard->setResultAndCancelRemainingWork_inlock(lock, status);
-        return;
-    };
+    auto status = _storageInterface->createCollectionForBulkLoading(
+        _destNss, _options, _idIndexSpec, _indexSpecs);
 
-    if (!cbd.status.isOK()) {
-        // Wait for active inserts to complete.
-        waitForDbWorker();
-        Status newStatus{cbd.status.code(),
-                         str::stream() << "While querying collection '" << _sourceNss.ns()
-                                       << "' there was an error '"
-                                       << cbd.status.reason()
-                                       << "'"};
-        setResultAndCancelRemainingWork(onCompletionGuard, cbd.status);
+    if (!status.isOK()) {
+        onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status.getStatus());
        return;
     }
 
-    // Pull the documents from the ARM into a buffer until the entire batch has been processed.
-    auto nextBatchStatus = _bufferNextBatchFromArm();
-    if (!nextBatchStatus.isOK()) {
-        setResultAndCancelRemainingWork(onCompletionGuard, nextBatchStatus);
-        return;
+    _stats.indexes = _indexSpecs.size();
+    if (!_idIndexSpec.isEmpty()) {
+        ++_stats.indexes;
     }
 
-    bool lastBatch = _arm->remotesExhausted();
-    auto&& scheduleResult =
-        _scheduleDbWorkFn(stdx::bind(&CollectionCloner::_insertDocumentsCallback,
-                                     this,
-                                     stdx::placeholders::_1,
-                                     lastBatch,
-                                     onCompletionGuard));
-    if (!scheduleResult.isOK()) {
-        Status newStatus{scheduleResult.getStatus().code(),
-                         str::stream() << "While cloning collection '" << _sourceNss.ns()
-                                       << "' there was an error '"
-                                       << scheduleResult.getStatus().reason()
-                                       << "'"};
-        setResultAndCancelRemainingWork(onCompletionGuard, scheduleResult.getStatus());
-        return;
-    }
+    _collLoader = std::move(status.getValue());
 
-    MONGO_FAIL_POINT_BLOCK(initialSyncHangCollectionClonerAfterHandlingBatchResponse, nssData) {
-        const BSONObj& data = nssData.getData();
-        auto nss = data["nss"].str();
-        // Only hang when cloning the specified collection.
-        if (_destNss.toString() == nss) {
-            while (MONGO_FAIL_POINT(initialSyncHangCollectionClonerAfterHandlingBatchResponse) &&
-                   !_isShuttingDown()) {
-                log() << "initialSyncHangCollectionClonerAfterHandlingBatchResponse fail point "
-                         "enabled for "
-                      << nss << ". Blocking until fail point is disabled.";
-                mongo::sleepsecs(1);
-            }
-        }
-    }
+    _findFetcher = stdx::make_unique<Fetcher>(
+        _executor,
+        _source,
+        _sourceNss.db().toString(),
+        BSON("find" << _sourceNss.coll() << "noCursorTimeout" << true << "batchSize" << _batchSize),
+        stdx::bind(&CollectionCloner::_findCallback,
+                   this,
+                   stdx::placeholders::_1,
+                   stdx::placeholders::_2,
+                   stdx::placeholders::_3,
+                   onCompletionGuard),
+        ReadPreferenceSetting::secondaryPreferredMetadata(),
+        RemoteCommandRequest::kNoTimeout,
+        RemoteCommandRetryScheduler::makeRetryPolicy(
+            numInitialSyncCollectionFindAttempts.load(),
+            executor::RemoteCommandRequest::kNoTimeout,
+            RemoteCommandRetryScheduler::kAllRetriableErrors));
 
-    // If the remote cursors are not exhausted, schedule this callback again to handle
-    // the impending cursor response.
-    if (!lastBatch) {
-        Status scheduleStatus = _scheduleNextARMResultsCallback(onCompletionGuard);
-        if (!scheduleStatus.isOK()) {
-            setResultAndCancelRemainingWork(onCompletionGuard, scheduleStatus);
-            return;
-        }
+    Status scheduleStatus = _findFetcher->schedule();
+    if (!scheduleStatus.isOK()) {
+        _findFetcher.reset();
+        onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, scheduleStatus);
+        return;
     }
 }
 
@@ -730,21 +533,24 @@ void CollectionCloner::_insertDocumentsCallback(
         return;
     }
 
-    UniqueLock lk(_mutex);
     std::vector<BSONObj> docs;
-    if (_documentsToInsert.size() == 0) {
+    UniqueLock lk(_mutex);
+    if (_documents.size() == 0) {
         warning() << "_insertDocumentsCallback, but no documents to insert for ns:" << _destNss;
+
         if (lastBatch) {
             onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, Status::OK());
         }
         return;
     }
-    _documentsToInsert.swap(docs);
+
+    _documents.swap(docs);
     _stats.documentsCopied += docs.size();
     ++_stats.fetchBatches;
     _progressMeter.hit(int(docs.size()));
     invariant(_collLoader);
 
     const auto status = _collLoader->insertDocuments(docs.cbegin(), docs.cend());
+
     if (!status.isOK()) {
         onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, status);
         return;
     }
@@ -764,10 +570,12 @@ void CollectionCloner::_insertDocumentsCallback(
         }
     }
 
-    if (lastBatch) {
-        // Clean up resources once the last batch has been copied over and set the status to OK.
-        onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, Status::OK());
+    if (!lastBatch) {
+        return;
     }
+
+    // Done with last batch and time to set result in completion guard to Status::OK().
+    onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, Status::OK());
 }
 
 void CollectionCloner::_finishCallback(const Status& status) {
@@ -776,6 +584,7 @@ void CollectionCloner::_finishCallback(const Status& status) {
     // Copy the status so we can change it below if needed.
     auto finalStatus = status;
     bool callCollectionLoader = false;
+
     decltype(_onCompletion) onCompletion;
     {
         LockGuard lk(_mutex);
@@ -786,6 +595,7 @@ void CollectionCloner::_finishCallback(const Status& status) {
         invariant(_onCompletion);
         std::swap(_onCompletion, onCompletion);
     }
+
     if (callCollectionLoader) {
         if (finalStatus.isOK()) {
             const auto loaderStatus = _collLoader->commit();
@@ -799,6 +609,7 @@ void CollectionCloner::_finishCallback(const Status& status) {
         // This will release the resources held by the loader.
         _collLoader.reset();
     }
+
     onCompletion(finalStatus);
 
     // This will release the resources held by the callback function object. '_onCompletion' is
-- 
cgit v1.2.1
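
Note on the restored control flow: the revert replaces the AsyncResultsMerger event loop with a single Fetcher whose callback is invoked once per batch. In `_findCallback`, appending `getMore`, `collection`, and `batchSize` to `getMoreBob` re-arms the fetcher for the next batch; leaving the builder untouched (the `lastBatch` case, when `nextAction` is `kNoAction`) ends the cursor exchange. The sketch below is a minimal, self-contained illustration of that re-arm pattern; `Batch`, `GetMoreRequest`, and `onBatch` are hypothetical stand-ins invented for this example, not MongoDB types.

    #include <iostream>
    #include <vector>

    // Stand-in for Fetcher::QueryResponse: one batch from an open cursor.
    struct Batch {
        long long cursorId;          // 0 means the server closed the cursor
        std::vector<int> documents;  // stands in for the BSONObj batch
    };

    // Stand-in for the getMoreBob builder: armed == "a getMore was requested".
    struct GetMoreRequest {
        bool armed = false;
        long long cursorId = 0;
    };

    // Mirrors the shape of the restored _findCallback: buffer the batch for the
    // inserter, then re-arm a getMore unless this was the last batch.
    void onBatch(const Batch& batch, GetMoreRequest* getMore, std::vector<int>* buffer) {
        const bool lastBatch = (batch.cursorId == 0);
        buffer->insert(buffer->end(), batch.documents.begin(), batch.documents.end());
        if (!lastBatch) {
            getMore->armed = true;  // analogous to filling in getMoreBob
            getMore->cursorId = batch.cursorId;
        }
    }

    int main() {
        std::vector<int> buffer;
        // Two batches on an open cursor (id 42), then a final batch (id 0).
        for (const Batch& b : {Batch{42, {1, 2}}, Batch{42, {3}}, Batch{0, {4}}}) {
            GetMoreRequest getMore;
            onBatch(b, &getMore, &buffer);
            std::cout << "buffered " << buffer.size() << " docs, getMore armed: "
                      << std::boolalpha << getMore.armed << '\n';
        }
    }

The same loop explains the error paths in the diff: a failed batch or a failed insert schedule sets the result on the completion guard instead of re-arming, and `_cancelRemainingWork_inlock()` only has to shut down `_findFetcher` because the fetcher is the sole driver of the loop.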