If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #include "mongo/db/s/migration_batch_fetcher.h" #include "mongo/util/timer.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding namespace mongo { template MigrationBatchFetcher::MigrationBatchFetcher( OperationContext* outerOpCtx, OperationContext* innerOpCtx, NamespaceString nss, MigrationSessionId sessionId, const WriteConcernOptions& writeConcern, const ShardId& fromShardId, const ChunkRange& range, const UUID& migrationId, const UUID& collectionId, std::shared_ptr migrationProgress, bool parallelFetchingSupported) : _nss{std::move(nss)}, _migrationConcurrency{ mongo::feature_flags::gConcurrencyInChunkMigration.isEnabledAndIgnoreFCV() ? migrationConcurrency.load() : 1}, _sessionId{std::move(sessionId)}, _inserterWorkers{[&]() { ThreadPool::Options options; options.poolName = "ChunkMigrationInserters"; options.minThreads = _migrationConcurrency; options.maxThreads = _migrationConcurrency; options.onCreateThread = Inserter::onCreateThread; return std::make_unique(options); }()}, _migrateCloneRequest{_createMigrateCloneRequest()}, _outerOpCtx{outerOpCtx}, _innerOpCtx{innerOpCtx}, _fromShard{uassertStatusOK( Grid::get(_outerOpCtx)->shardRegistry()->getShard(_outerOpCtx, fromShardId))}, _migrationProgress{migrationProgress}, _range{range}, _collectionUuid(collectionId), _migrationId{migrationId}, _writeConcern{writeConcern}, _isParallelFetchingSupported{parallelFetchingSupported} { _inserterWorkers->startup(); } template BSONObj MigrationBatchFetcher::_fetchBatch(OperationContext* opCtx) { auto commandResponse = uassertStatusOKWithContext( _fromShard->runCommand(opCtx, ReadPreferenceSetting(ReadPreference::PrimaryOnly), "admin", _migrateCloneRequest, Shard::RetryPolicy::kNoRetry), "_migrateClone failed: "); uassertStatusOKWithContext(Shard::CommandResponse::getEffectiveStatus(commandResponse), "_migrateClone failed: "); return commandResponse.response; } template void MigrationBatchFetcher::fetchAndScheduleInsertion() { auto numFetchers = _isParallelFetchingSupported ? _migrationConcurrency : 1; auto fetchersThreadPool = [&]() { ThreadPool::Options options; options.poolName = "ChunkMigrationFetchers"; options.minThreads = numFetchers; options.maxThreads = numFetchers; options.onCreateThread = onCreateThread; return std::make_unique(options); }(); fetchersThreadPool->startup(); for (int i = 0; i < numFetchers; ++i) { fetchersThreadPool->schedule([this](Status status) { this->_runFetcher(); }); } fetchersThreadPool->shutdown(); fetchersThreadPool->join(); } template void MigrationBatchFetcher::_runFetcher() try { auto executor = Grid::get(_innerOpCtx->getServiceContext())->getExecutorPool()->getFixedExecutor(); auto applicationOpCtx = CancelableOperationContext( cc().makeOperationContext(), _innerOpCtx->getCancellationToken(), executor); auto opCtx = applicationOpCtx.get(); auto assertNotAborted = [&]() { { stdx::lock_guard lk(*_outerOpCtx->getClient()); _outerOpCtx->checkForInterrupt(); } opCtx->checkForInterrupt(); }; LOGV2_DEBUG(6718405, 0, "Chunk migration data fetch start", "migrationId"_attr = _migrationId); while (true) { Timer totalTimer; BSONObj nextBatch = _fetchBatch(opCtx); assertNotAborted(); if (_isEmptyBatch(nextBatch)) { LOGV2_DEBUG(6718404, 0, "Chunk migration initial clone complete", "migrationId"_attr = _migrationId, "duration"_attr = totalTimer.elapsed()); break; } const auto batchSize = nextBatch.objsize(); const auto fetchTime = totalTimer.elapsed(); LOGV2_DEBUG(6718416, 0, "Chunk migration initial clone fetch end", "migrationId"_attr = _migrationId, "batchSize"_attr = batchSize, "fetch"_attr = duration_cast(fetchTime)); Inserter inserter{_outerOpCtx, _innerOpCtx, nextBatch.getOwned(), _nss, _range, _writeConcern, _collectionUuid, _migrationProgress, _migrationId, _migrationConcurrency}; _inserterWorkers->schedule([batchSize, fetchTime, totalTimer = std::move(totalTimer), insertTimer = Timer(), migrationId = _migrationId, inserter = std::move(inserter)](Status status) { inserter.run(status); const auto checkDivByZero = [](auto divisor, auto expression) { return divisor == 0 ? -1 : expression(); }; const auto calcThroughput = [&](auto bytes, auto duration) { return checkDivByZero(durationCount(duration), [&]() { return static_cast(bytes) / durationCount(duration); }); }; const auto insertTime = insertTimer.elapsed(); const auto totalTime = totalTimer.elapsed(); const auto batchThroughputMBps = calcThroughput(batchSize, totalTime); const auto insertThroughputMBps = calcThroughput(batchSize, insertTime); const auto fetchThroughputMBps = calcThroughput(batchSize, fetchTime); LOGV2_DEBUG(6718417, 1, "Chunk migration initial clone apply batch", "migrationId"_attr = migrationId, "batchSize"_attr = batchSize, "total"_attr = duration_cast(totalTime), "totalThroughputMBps"_attr = batchThroughputMBps, "fetch"_attr = duration_cast(fetchTime), "fetchThroughputMBps"_attr = fetchThroughputMBps, "insert"_attr = duration_cast(insertTime), "insertThroughputMBps"_attr = insertThroughputMBps); }); } } catch (const DBException& e) { stdx::lock_guard lk(*_innerOpCtx->getClient()); _innerOpCtx->getServiceContext()->killOperation(lk, _innerOpCtx, ErrorCodes::Error(6718400)); LOGV2_ERROR(6718413, "Chunk migration failure fetching data", "migrationId"_attr = _migrationId, "failure"_attr = e.toStatus()); } template MigrationBatchFetcher::~MigrationBatchFetcher() { LOGV2(6718401, "Shutting down and joining inserter threads for migration {migrationId}", "migrationId"_attr = _migrationId); _inserterWorkers->shutdown(); _inserterWorkers->join(); LOGV2(6718415, "Inserter threads for migration {migrationId} joined", "migrationId"_attr = _migrationId); } template class MigrationBatchFetcher; template class MigrationBatchFetcher; } // namespace mongo