summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/migration_batch_fetcher.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/migration_batch_fetcher.cpp')
-rw-r--r--src/mongo/db/s/migration_batch_fetcher.cpp230
1 files changed, 0 insertions, 230 deletions
diff --git a/src/mongo/db/s/migration_batch_fetcher.cpp b/src/mongo/db/s/migration_batch_fetcher.cpp
deleted file mode 100644
index 4611cd91ec7..00000000000
--- a/src/mongo/db/s/migration_batch_fetcher.cpp
+++ /dev/null
@@ -1,230 +0,0 @@
-/**
- * Copyright (C) 2022-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
-
-#include "mongo/db/s/migration_batch_fetcher.h"
-
-#include "mongo/logv2/log.h"
-#include "mongo/util/timer.h"
-
-namespace mongo {
-
-template <typename Inserter>
-MigrationBatchFetcher<Inserter>::MigrationBatchFetcher(
- OperationContext* outerOpCtx,
- OperationContext* innerOpCtx,
- NamespaceString nss,
- MigrationSessionId sessionId,
- const WriteConcernOptions& writeConcern,
- const ShardId& fromShardId,
- const ChunkRange& range,
- const UUID& migrationId,
- const UUID& collectionId,
- std::shared_ptr<MigrationCloningProgressSharedState> migrationProgress,
- bool parallelFetchingSupported)
- : _nss{std::move(nss)},
- _migrationConcurrency{
- mongo::feature_flags::gConcurrencyInChunkMigration.isEnabledAndIgnoreFCV()
- ? migrationConcurrency.load()
- : 1},
- _sessionId{std::move(sessionId)},
- _inserterWorkers{[&]() {
- ThreadPool::Options options;
- options.poolName = "ChunkMigrationInserters";
- options.minThreads = _migrationConcurrency;
- options.maxThreads = _migrationConcurrency;
- options.onCreateThread = Inserter::onCreateThread;
- return std::make_unique<ThreadPool>(options);
- }()},
- _migrateCloneRequest{_createMigrateCloneRequest()},
- _outerOpCtx{outerOpCtx},
- _innerOpCtx{innerOpCtx},
- _fromShard{uassertStatusOK(
- Grid::get(_outerOpCtx)->shardRegistry()->getShard(_outerOpCtx, fromShardId))},
- _migrationProgress{migrationProgress},
- _range{range},
- _collectionUuid(collectionId),
- _migrationId{migrationId},
- _writeConcern{writeConcern},
- _isParallelFetchingSupported{parallelFetchingSupported} {
- _inserterWorkers->startup();
-}
-
-template <typename Inserter>
-BSONObj MigrationBatchFetcher<Inserter>::_fetchBatch(OperationContext* opCtx) {
- auto commandResponse = uassertStatusOKWithContext(
- _fromShard->runCommand(opCtx,
- ReadPreferenceSetting(ReadPreference::PrimaryOnly),
- "admin",
- _migrateCloneRequest,
- Shard::RetryPolicy::kNoRetry),
- "_migrateClone failed: ");
-
- uassertStatusOKWithContext(Shard::CommandResponse::getEffectiveStatus(commandResponse),
- "_migrateClone failed: ");
-
- return commandResponse.response;
-}
-
-template <typename Inserter>
-void MigrationBatchFetcher<Inserter>::fetchAndScheduleInsertion() {
- auto numFetchers = _isParallelFetchingSupported ? _migrationConcurrency : 1;
- auto fetchersThreadPool = [&]() {
- ThreadPool::Options options;
- options.poolName = "ChunkMigrationFetchers";
- options.minThreads = numFetchers;
- options.maxThreads = numFetchers;
- options.onCreateThread = onCreateThread;
- return std::make_unique<ThreadPool>(options);
- }();
- fetchersThreadPool->startup();
- for (int i = 0; i < numFetchers; ++i) {
- fetchersThreadPool->schedule([this](Status status) { this->_runFetcher(); });
- }
-
- fetchersThreadPool->shutdown();
- fetchersThreadPool->join();
-}
-
-
-template <typename Inserter>
-void MigrationBatchFetcher<Inserter>::_runFetcher() try {
- auto executor =
- Grid::get(_innerOpCtx->getServiceContext())->getExecutorPool()->getFixedExecutor();
-
- auto applicationOpCtx = CancelableOperationContext(
- cc().makeOperationContext(), _innerOpCtx->getCancellationToken(), executor);
-
- auto opCtx = applicationOpCtx.get();
- auto assertNotAborted = [&]() {
- {
- stdx::lock_guard<Client> lk(*_outerOpCtx->getClient());
- _outerOpCtx->checkForInterrupt();
- }
- opCtx->checkForInterrupt();
- };
-
- LOGV2_DEBUG(6718405, 0, "Chunk migration data fetch start", "migrationId"_attr = _migrationId);
- while (true) {
- Timer totalTimer;
- BSONObj nextBatch = _fetchBatch(opCtx);
- assertNotAborted();
- if (_isEmptyBatch(nextBatch)) {
- LOGV2_DEBUG(6718404,
- 0,
- "Chunk migration initial clone complete",
- "migrationId"_attr = _migrationId,
- "duration"_attr = totalTimer.elapsed());
- break;
- }
-
- const auto batchSize = nextBatch.objsize();
- const auto fetchTime = totalTimer.elapsed();
- LOGV2_DEBUG(6718416,
- 0,
- "Chunk migration initial clone fetch end",
- "migrationId"_attr = _migrationId,
- "batchSize"_attr = batchSize,
- "fetch"_attr = duration_cast<Milliseconds>(fetchTime));
-
-
- Inserter inserter{_outerOpCtx,
- _innerOpCtx,
- nextBatch.getOwned(),
- _nss,
- _range,
- _writeConcern,
- _collectionUuid,
- _migrationProgress,
- _migrationId,
- _migrationConcurrency};
-
- _inserterWorkers->schedule([batchSize,
- fetchTime,
- totalTimer = std::move(totalTimer),
- insertTimer = Timer(),
- migrationId = _migrationId,
- inserter = std::move(inserter)](Status status) {
- inserter.run(status);
-
- const auto checkDivByZero = [](auto divisor, auto expression) {
- return divisor == 0 ? -1 : expression();
- };
- const auto calcThroughput = [&](auto bytes, auto duration) {
- return checkDivByZero(durationCount<Microseconds>(duration), [&]() {
- return static_cast<double>(bytes) / durationCount<Microseconds>(duration);
- });
- };
-
- const auto insertTime = insertTimer.elapsed();
- const auto totalTime = totalTimer.elapsed();
- const auto batchThroughputMBps = calcThroughput(batchSize, totalTime);
- const auto insertThroughputMBps = calcThroughput(batchSize, insertTime);
- const auto fetchThroughputMBps = calcThroughput(batchSize, fetchTime);
-
- LOGV2_DEBUG(6718417,
- 1,
- "Chunk migration initial clone apply batch",
- "migrationId"_attr = migrationId,
- "batchSize"_attr = batchSize,
- "total"_attr = duration_cast<Milliseconds>(totalTime),
- "totalThroughputMBps"_attr = batchThroughputMBps,
- "fetch"_attr = duration_cast<Milliseconds>(fetchTime),
- "fetchThroughputMBps"_attr = fetchThroughputMBps,
- "insert"_attr = duration_cast<Milliseconds>(insertTime),
- "insertThroughputMBps"_attr = insertThroughputMBps);
- });
- }
-} catch (const DBException& e) {
- stdx::lock_guard<Client> lk(*_innerOpCtx->getClient());
- _innerOpCtx->getServiceContext()->killOperation(lk, _innerOpCtx, ErrorCodes::Error(6718400));
- LOGV2_ERROR(6718413,
- "Chunk migration failure fetching data",
- "migrationId"_attr = _migrationId,
- "failure"_attr = e.toStatus());
-}
-
-template <typename Inserter>
-MigrationBatchFetcher<Inserter>::~MigrationBatchFetcher() {
- LOGV2(6718401,
- "Shutting down and joining inserter threads for migration {migrationId}",
- "migrationId"_attr = _migrationId);
- _inserterWorkers->shutdown();
- _inserterWorkers->join();
- LOGV2(6718415,
- "Inserter threads for migration {migrationId} joined",
- "migrationId"_attr = _migrationId);
-}
-
-template class MigrationBatchFetcher<MigrationBatchInserter>;
-
-template class MigrationBatchFetcher<MigrationBatchMockInserter>;
-
-} // namespace mongo