diff options
Diffstat (limited to 'src/mongo/db/s/migration_destination_manager.cpp')
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.cpp | 139 |
1 files changed, 113 insertions, 26 deletions
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index 4cbcf6cb260..721fab26d3d 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -29,7 +29,6 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kShardingMigration -#include "mongo/db/s/migration_batch_fetcher.h" #include "mongo/platform/basic.h" #include "mongo/db/s/migration_destination_manager.h" @@ -412,8 +411,8 @@ void MigrationDestinationManager::report(BSONObjBuilder& b, } BSONObjBuilder bb(b.subobjStart("counts")); - bb.append("cloned", _getNumCloned()); - bb.append("clonedBytes", _getNumBytesCloned()); + bb.append("cloned", _numCloned); + bb.append("clonedBytes", _clonedBytes); bb.append("catchup", _numCatchup); bb.append("steady", _numSteady); bb.done(); @@ -447,8 +446,6 @@ Status MigrationDestinationManager::start(OperationContext* opCtx, _lsid = cloneRequest.getLsid(); _txnNumber = cloneRequest.getTxnNumber(); - _parallelFetchersSupported = cloneRequest.parallelFetchingSupported(); - _nss = nss; _fromShard = cloneRequest.getFromShardId(); _fromShardConnString = @@ -465,8 +462,8 @@ Status MigrationDestinationManager::start(OperationContext* opCtx, _chunkMarkedPending = false; - _migrationCloningProgress = std::make_shared<MigrationCloningProgressSharedState>(); - + _numCloned = 0; + _clonedBytes = 0; _numCatchup = 0; _numSteady = 0; @@ -1341,32 +1338,122 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx, _sessionMigration->start(opCtx->getServiceContext()); + const BSONObj migrateCloneRequest = createMigrateCloneRequest(_nss, *_sessionId); + _chunkMarkedPending = true; // no lock needed, only the migrate thread looks. - { - // Destructor of MigrationBatchFetcher is non-trivial. Therefore, - // this scope has semantic significance. - MigrationBatchFetcher<MigrationBatchInserter> fetcher{outerOpCtx, - opCtx, - _nss, - *_sessionId, - _writeConcern, - _fromShard, - range, - *_migrationId, - *_collectionUuid, - _migrationCloningProgress, - _parallelFetchersSupported}; - fetcher.fetchAndScheduleInsertion(); - } - opCtx->checkForInterrupt(); - lastOpApplied = _migrationCloningProgress->getMaxOptime(); + auto assertNotAborted = [&](OperationContext* opCtx) { + opCtx->checkForInterrupt(); + outerOpCtx->checkForInterrupt(); + uassert(50748, "Migration aborted while copying documents", getState() != kAbort); + }; + + auto insertBatchFn = [&](OperationContext* opCtx, BSONObj nextBatch) { + auto arr = nextBatch["objects"].Obj(); + if (arr.isEmpty()) { + return false; + } + auto it = arr.begin(); + while (it != arr.end()) { + int batchNumCloned = 0; + int batchClonedBytes = 0; + const int batchMaxCloned = migrateCloneInsertionBatchSize.load(); + + assertNotAborted(opCtx); + + write_ops::InsertCommandRequest insertOp(_nss); + insertOp.getWriteCommandRequestBase().setOrdered(true); + insertOp.setDocuments([&] { + std::vector<BSONObj> toInsert; + while (it != arr.end() && + (batchMaxCloned <= 0 || batchNumCloned < batchMaxCloned)) { + const auto& doc = *it; + BSONObj docToClone = doc.Obj(); + toInsert.push_back(docToClone); + batchNumCloned++; + batchClonedBytes += docToClone.objsize(); + ++it; + } + return toInsert; + }()); + + { + // Disable the schema validation (during document inserts and updates) + // and any internal validation for opCtx for performInserts() + DisableDocumentValidation documentValidationDisabler( + opCtx, + DocumentValidationSettings::kDisableSchemaValidation | + DocumentValidationSettings::kDisableInternalValidation); + const auto reply = write_ops_exec::performInserts( + opCtx, insertOp, OperationSource::kFromMigrate); + for (unsigned long i = 0; i < reply.results.size(); ++i) { + uassertStatusOKWithContext(reply.results[i], + str::stream() << "Insert of " + << insertOp.getDocuments()[i] + << " failed."); + } + // Revert to the original DocumentValidationSettings for opCtx + } + + migrationutil::persistUpdatedNumOrphans( + opCtx, _migrationId.get(), *_collectionUuid, batchNumCloned); + + { + stdx::lock_guard<Latch> statsLock(_mutex); + _numCloned += batchNumCloned; + ShardingStatistics::get(opCtx).countDocsClonedOnRecipient.addAndFetch( + batchNumCloned); + _clonedBytes += batchClonedBytes; + } + if (_writeConcern.needToWaitForOtherNodes()) { + runWithoutSession(outerOpCtx, [&] { + repl::ReplicationCoordinator::StatusAndDuration replStatus = + repl::ReplicationCoordinator::get(opCtx)->awaitReplication( + opCtx, + repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(), + _writeConcern); + if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) { + LOGV2_WARNING( + 22011, + "secondaryThrottle on, but doc insert timed out; continuing", + "migrationId"_attr = _migrationId->toBSON()); + } else { + uassertStatusOK(replStatus.status); + } + }); + } + + sleepmillis(migrateCloneInsertionBatchDelayMS.load()); + } + return true; + }; + + auto fetchBatchFn = [&](OperationContext* opCtx, BSONObj* nextBatch) { + auto commandResponse = uassertStatusOKWithContext( + fromShard->runCommand(opCtx, + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + "admin", + migrateCloneRequest, + Shard::RetryPolicy::kNoRetry), + "_migrateClone failed: "); + + uassertStatusOKWithContext( + Shard::CommandResponse::getEffectiveStatus(commandResponse), + "_migrateClone failed: "); + + *nextBatch = commandResponse.response; + return nextBatch->getField("objects").Obj().isEmpty(); + }; + + // If running on a replicated system, we'll need to flush the docs we cloned to the + // secondaries + lastOpApplied = fetchAndApplyBatch(opCtx, insertBatchFn, fetchBatchFn); timing->done(4); migrateThreadHangAtStep4.pauseWhileSet(); if (MONGO_unlikely(failMigrationOnRecipient.shouldFail())) { - _setStateFail(str::stream() << "failing migration after cloning " << _getNumCloned() + _setStateFail(str::stream() << "failing migration after cloning " << _numCloned << " docs due to failMigrationOnRecipient failpoint"); return; } |