diff options
author | Andrew Witten <andrew.witten@mongodb.com> | 2022-12-22 23:28:14 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-01-23 17:01:18 +0000 |
commit | ede7bdea766fff40678d03a7051b0d15de0fbba6 (patch) | |
tree | 2422031308d3b708c76d9bc58e62cf6bbfdc651f | |
parent | c585f971a68e707da7420a6a5c19988bb65a6205 (diff) | |
download | mongo-ede7bdea766fff40678d03a7051b0d15de0fbba6.tar.gz |
SERVER-67183 Add parallel fetchers and inserters for chunk migration
(cherry picked from commit 1564f715d16870df7a524ad702aad6be0f2da1f5)
-rw-r--r-- | jstests/concurrency/fsm_workloads/random_moveChunk_update_shard_key.js | 2 | ||||
-rw-r--r-- | jstests/sharding/move_chunk_concurrent_cloning.js | 160 | ||||
-rw-r--r-- | src/mongo/db/s/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/db/s/migration_batch_fetcher.cpp | 230 | ||||
-rw-r--r-- | src/mongo/db/s/migration_batch_fetcher.h | 168 | ||||
-rw-r--r-- | src/mongo/db/s/migration_batch_fetcher_test.cpp | 269 | ||||
-rw-r--r-- | src/mongo/db/s/migration_batch_inserter.cpp | 200 | ||||
-rw-r--r-- | src/mongo/db/s/migration_batch_inserter.h | 136 | ||||
-rw-r--r-- | src/mongo/db/s/migration_batch_mock_inserter.h | 67 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp | 132 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.h | 52 | ||||
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.cpp | 139 | ||||
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.h | 24 | ||||
-rw-r--r-- | src/mongo/db/s/start_chunk_clone_request.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/s/start_chunk_clone_request.h | 6 |
16 files changed, 1375 insertions, 225 deletions
diff --git a/jstests/concurrency/fsm_workloads/random_moveChunk_update_shard_key.js b/jstests/concurrency/fsm_workloads/random_moveChunk_update_shard_key.js index 1dcf4f203b9..ff7a8364d86 100644 --- a/jstests/concurrency/fsm_workloads/random_moveChunk_update_shard_key.js +++ b/jstests/concurrency/fsm_workloads/random_moveChunk_update_shard_key.js @@ -40,7 +40,7 @@ var $config = extendWorkload($config, function($config, $super) { // So, if the range deleter has not yet cleaned up that document when the chunk is // moved back to the original shard, the moveChunk may fail as a result of a duplicate // key error on the recipient. - err.message.includes("Location51008")); + err.message.includes("Location51008") || err.message.includes("Location6718402")); }; $config.data.runningWithStepdowns = diff --git a/jstests/sharding/move_chunk_concurrent_cloning.js b/jstests/sharding/move_chunk_concurrent_cloning.js index 954c2d7cd35..3166f823351 100644 --- a/jstests/sharding/move_chunk_concurrent_cloning.js +++ b/jstests/sharding/move_chunk_concurrent_cloning.js @@ -6,91 +6,101 @@ load('./jstests/libs/chunk_manipulation_util.js'); -// For startParallelOps to write its state -let staticMongod = MongoRunner.runMongod({}); - -let st = new ShardingTest({shards: 2}); -st.stopBalancer(); - -const kThreadCount = Math.floor(Math.random() * 31) + 1; -const kPadding = new Array(1024).join("x"); - -let testDB = st.s.getDB('test'); -assert.commandWorked(testDB.adminCommand({enableSharding: 'test'})); -st.ensurePrimaryShard('test', st.shard0.shardName); -assert.commandWorked(testDB.adminCommand({shardCollection: 'test.user', key: {x: 1}})); - -let shardKeyVal = 0; -const kDocsInBatch = 8 * 1000; -const kMinCollSize = 128 * 1024 * 1024; -let approxInsertedSize = 0; -while (approxInsertedSize < kMinCollSize) { - var bulk = testDB.user.initializeUnorderedBulkOp(); - for (let docs = 0; docs < kDocsInBatch; docs++) { - shardKeyVal++; - bulk.insert({_id: shardKeyVal, x: shardKeyVal, padding: kPadding}); +const runParallelMoveChunk = (numThreads) => { + // For startParallelOps to write its state + let staticMongod = MongoRunner.runMongod({}); + + let st = new ShardingTest({shards: 2}); + st.stopBalancer(); + + const kThreadCount = numThreads; + const kPadding = new Array(1024).join("x"); + + let testDB = st.s.getDB('test'); + assert.commandWorked(testDB.adminCommand({enableSharding: 'test'})); + st.ensurePrimaryShard('test', st.shard0.shardName); + assert.commandWorked(testDB.adminCommand({shardCollection: 'test.user', key: {x: 1}})); + + let shardKeyVal = 0; + const kDocsInBatch = 8 * 1000; + const kMinCollSize = 128 * 1024 * 1024; + let approxInsertedSize = 0; + while (approxInsertedSize < kMinCollSize) { + var bulk = testDB.user.initializeUnorderedBulkOp(); + for (let docs = 0; docs < kDocsInBatch; docs++) { + shardKeyVal++; + bulk.insert({_id: shardKeyVal, x: shardKeyVal, padding: kPadding}); + } + assert.commandWorked(bulk.execute()); + + approxInsertedSize = approxInsertedSize + (kDocsInBatch * 1024); } - assert.commandWorked(bulk.execute()); - approxInsertedSize = approxInsertedSize + (kDocsInBatch * 1024); -} - -const kInitialLoadFinalKey = shardKeyVal; - -print(`Running tests with migrationConcurrency == ${kThreadCount}`); -st._rs.forEach((replSet) => { - assert.commandWorked(replSet.test.getPrimary().adminCommand( - {setParameter: 1, migrationConcurrency: kThreadCount})); -}); - -const configCollEntry = - st.s.getDB('config').getCollection('collections').findOne({_id: 'test.user'}); -let chunks = st.s.getDB('config').chunks.find({uuid: configCollEntry.uuid}).toArray(); -assert.eq(1, chunks.length, tojson(chunks)); - -let joinMoveChunk = - moveChunkParallel(staticMongod, st.s0.host, {x: 0}, null, 'test.user', st.shard1.shardName); - -// Migration cloning scans by shard key order. Perform some writes against the collection on both -// the lower and upper ends of the shard key values while migration is happening to exercise -// xferMods logic. -const kDeleteIndexOffset = kInitialLoadFinalKey - 3000; -const kUpdateIndexOffset = kInitialLoadFinalKey - 5000; -for (let x = 0; x < 1000; x++) { - assert.commandWorked(testDB.user.remove({x: x})); - assert.commandWorked(testDB.user.update({x: 4000 + x}, {$set: {updated: true}})); + const kInitialLoadFinalKey = shardKeyVal; + + print(`Running tests with migrationConcurrency == ${kThreadCount}`); + st._rs.forEach((replSet) => { + assert.commandWorked(replSet.test.getPrimary().adminCommand( + {setParameter: 1, migrationConcurrency: kThreadCount})); + }); + + const configCollEntry = + st.s.getDB('config').getCollection('collections').findOne({_id: 'test.user'}); + let chunks = st.s.getDB('config').chunks.find({uuid: configCollEntry.uuid}).toArray(); + assert.eq(1, chunks.length, tojson(chunks)); + + let joinMoveChunk = + moveChunkParallel(staticMongod, st.s0.host, {x: 0}, null, 'test.user', st.shard1.shardName); + + // Migration cloning scans by shard key order. Perform some writes against the collection on + // both the lower and upper ends of the shard key values while migration is happening to + // exercise xferMods logic. + const kDeleteIndexOffset = kInitialLoadFinalKey - 3000; + const kUpdateIndexOffset = kInitialLoadFinalKey - 5000; + for (let x = 0; x < 1000; x++) { + assert.commandWorked(testDB.user.remove({x: x})); + assert.commandWorked(testDB.user.update({x: 4000 + x}, {$set: {updated: true}})); + + assert.commandWorked(testDB.user.remove({x: kDeleteIndexOffset + x})); + assert.commandWorked( + testDB.user.update({x: kUpdateIndexOffset + x}, {$set: {updated: true}})); + + let newShardKey = kInitialLoadFinalKey + x + 1; + assert.commandWorked(testDB.user.insert({_id: newShardKey, x: newShardKey})); + } - assert.commandWorked(testDB.user.remove({x: kDeleteIndexOffset + x})); - assert.commandWorked(testDB.user.update({x: kUpdateIndexOffset + x}, {$set: {updated: true}})); + joinMoveChunk(); - let newShardKey = kInitialLoadFinalKey + x + 1; - assert.commandWorked(testDB.user.insert({_id: newShardKey, x: newShardKey})); -} + let shardKeyIdx = 1000; // Index starts at 1k since we deleted the first 1k docs. + let cursor = testDB.user.find().sort({x: 1}); -joinMoveChunk(); + while (cursor.hasNext()) { + let next = cursor.next(); + assert.eq(next.x, shardKeyIdx); -let shardKeyIdx = 1000; // Index starts at 1k since we deleted the first 1k docs. -let cursor = testDB.user.find().sort({x: 1}); + if ((shardKeyIdx >= 4000 && shardKeyIdx < 5000) || + (shardKeyIdx >= kUpdateIndexOffset && shardKeyIdx < (kUpdateIndexOffset + 1000))) { + assert.eq(true, next.updated, tojson(next)); + } -while (cursor.hasNext()) { - let next = cursor.next(); - assert.eq(next.x, shardKeyIdx); + shardKeyIdx++; - if ((shardKeyIdx >= 4000 && shardKeyIdx < 5000) || - (shardKeyIdx >= kUpdateIndexOffset && shardKeyIdx < (kUpdateIndexOffset + 1000))) { - assert.eq(true, next.updated, tojson(next)); + if (shardKeyIdx == kDeleteIndexOffset) { + shardKeyIdx += 1000; + } } - shardKeyIdx++; + shardKeyIdx--; + assert.eq(shardKeyIdx, kInitialLoadFinalKey + 1000); - if (shardKeyIdx == kDeleteIndexOffset) { - shardKeyIdx += 1000; - } -} + st.stop(); + MongoRunner.stopMongod(staticMongod); +}; -shardKeyIdx--; -assert.eq(shardKeyIdx, kInitialLoadFinalKey + 1000); +// Run test 10 times with random concurrency levels. +for (let i = 1; i <= 5; i++) { + runParallelMoveChunk(Math.floor(Math.random() * 31) + 1); +} +} -st.stop(); -MongoRunner.stopMongod(staticMongod); -})(); + )(); diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 6906cd0c57d..b581f666302 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -68,6 +68,8 @@ env.Library( 'config_server_op_observer.cpp', 'global_index_metrics.cpp', 'metadata_manager.cpp', + 'migration_batch_fetcher.cpp', + 'migration_batch_inserter.cpp', 'migration_chunk_cloner_source_legacy.cpp', 'migration_chunk_cloner_source.cpp', 'migration_coordinator_document.idl', @@ -568,6 +570,7 @@ env.CppUnitTest( 'global_index_metrics_test.cpp', 'implicit_collection_creation_test.cpp', 'metadata_manager_test.cpp', + 'migration_batch_fetcher_test.cpp', 'migration_chunk_cloner_source_legacy_test.cpp', 'migration_destination_manager_test.cpp', 'migration_session_id_test.cpp', diff --git a/src/mongo/db/s/migration_batch_fetcher.cpp b/src/mongo/db/s/migration_batch_fetcher.cpp new file mode 100644 index 00000000000..4611cd91ec7 --- /dev/null +++ b/src/mongo/db/s/migration_batch_fetcher.cpp @@ -0,0 +1,230 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding + +#include "mongo/db/s/migration_batch_fetcher.h" + +#include "mongo/logv2/log.h" +#include "mongo/util/timer.h" + +namespace mongo { + +template <typename Inserter> +MigrationBatchFetcher<Inserter>::MigrationBatchFetcher( + OperationContext* outerOpCtx, + OperationContext* innerOpCtx, + NamespaceString nss, + MigrationSessionId sessionId, + const WriteConcernOptions& writeConcern, + const ShardId& fromShardId, + const ChunkRange& range, + const UUID& migrationId, + const UUID& collectionId, + std::shared_ptr<MigrationCloningProgressSharedState> migrationProgress, + bool parallelFetchingSupported) + : _nss{std::move(nss)}, + _migrationConcurrency{ + mongo::feature_flags::gConcurrencyInChunkMigration.isEnabledAndIgnoreFCV() + ? migrationConcurrency.load() + : 1}, + _sessionId{std::move(sessionId)}, + _inserterWorkers{[&]() { + ThreadPool::Options options; + options.poolName = "ChunkMigrationInserters"; + options.minThreads = _migrationConcurrency; + options.maxThreads = _migrationConcurrency; + options.onCreateThread = Inserter::onCreateThread; + return std::make_unique<ThreadPool>(options); + }()}, + _migrateCloneRequest{_createMigrateCloneRequest()}, + _outerOpCtx{outerOpCtx}, + _innerOpCtx{innerOpCtx}, + _fromShard{uassertStatusOK( + Grid::get(_outerOpCtx)->shardRegistry()->getShard(_outerOpCtx, fromShardId))}, + _migrationProgress{migrationProgress}, + _range{range}, + _collectionUuid(collectionId), + _migrationId{migrationId}, + _writeConcern{writeConcern}, + _isParallelFetchingSupported{parallelFetchingSupported} { + _inserterWorkers->startup(); +} + +template <typename Inserter> +BSONObj MigrationBatchFetcher<Inserter>::_fetchBatch(OperationContext* opCtx) { + auto commandResponse = uassertStatusOKWithContext( + _fromShard->runCommand(opCtx, + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + "admin", + _migrateCloneRequest, + Shard::RetryPolicy::kNoRetry), + "_migrateClone failed: "); + + uassertStatusOKWithContext(Shard::CommandResponse::getEffectiveStatus(commandResponse), + "_migrateClone failed: "); + + return commandResponse.response; +} + +template <typename Inserter> +void MigrationBatchFetcher<Inserter>::fetchAndScheduleInsertion() { + auto numFetchers = _isParallelFetchingSupported ? _migrationConcurrency : 1; + auto fetchersThreadPool = [&]() { + ThreadPool::Options options; + options.poolName = "ChunkMigrationFetchers"; + options.minThreads = numFetchers; + options.maxThreads = numFetchers; + options.onCreateThread = onCreateThread; + return std::make_unique<ThreadPool>(options); + }(); + fetchersThreadPool->startup(); + for (int i = 0; i < numFetchers; ++i) { + fetchersThreadPool->schedule([this](Status status) { this->_runFetcher(); }); + } + + fetchersThreadPool->shutdown(); + fetchersThreadPool->join(); +} + + +template <typename Inserter> +void MigrationBatchFetcher<Inserter>::_runFetcher() try { + auto executor = + Grid::get(_innerOpCtx->getServiceContext())->getExecutorPool()->getFixedExecutor(); + + auto applicationOpCtx = CancelableOperationContext( + cc().makeOperationContext(), _innerOpCtx->getCancellationToken(), executor); + + auto opCtx = applicationOpCtx.get(); + auto assertNotAborted = [&]() { + { + stdx::lock_guard<Client> lk(*_outerOpCtx->getClient()); + _outerOpCtx->checkForInterrupt(); + } + opCtx->checkForInterrupt(); + }; + + LOGV2_DEBUG(6718405, 0, "Chunk migration data fetch start", "migrationId"_attr = _migrationId); + while (true) { + Timer totalTimer; + BSONObj nextBatch = _fetchBatch(opCtx); + assertNotAborted(); + if (_isEmptyBatch(nextBatch)) { + LOGV2_DEBUG(6718404, + 0, + "Chunk migration initial clone complete", + "migrationId"_attr = _migrationId, + "duration"_attr = totalTimer.elapsed()); + break; + } + + const auto batchSize = nextBatch.objsize(); + const auto fetchTime = totalTimer.elapsed(); + LOGV2_DEBUG(6718416, + 0, + "Chunk migration initial clone fetch end", + "migrationId"_attr = _migrationId, + "batchSize"_attr = batchSize, + "fetch"_attr = duration_cast<Milliseconds>(fetchTime)); + + + Inserter inserter{_outerOpCtx, + _innerOpCtx, + nextBatch.getOwned(), + _nss, + _range, + _writeConcern, + _collectionUuid, + _migrationProgress, + _migrationId, + _migrationConcurrency}; + + _inserterWorkers->schedule([batchSize, + fetchTime, + totalTimer = std::move(totalTimer), + insertTimer = Timer(), + migrationId = _migrationId, + inserter = std::move(inserter)](Status status) { + inserter.run(status); + + const auto checkDivByZero = [](auto divisor, auto expression) { + return divisor == 0 ? -1 : expression(); + }; + const auto calcThroughput = [&](auto bytes, auto duration) { + return checkDivByZero(durationCount<Microseconds>(duration), [&]() { + return static_cast<double>(bytes) / durationCount<Microseconds>(duration); + }); + }; + + const auto insertTime = insertTimer.elapsed(); + const auto totalTime = totalTimer.elapsed(); + const auto batchThroughputMBps = calcThroughput(batchSize, totalTime); + const auto insertThroughputMBps = calcThroughput(batchSize, insertTime); + const auto fetchThroughputMBps = calcThroughput(batchSize, fetchTime); + + LOGV2_DEBUG(6718417, + 1, + "Chunk migration initial clone apply batch", + "migrationId"_attr = migrationId, + "batchSize"_attr = batchSize, + "total"_attr = duration_cast<Milliseconds>(totalTime), + "totalThroughputMBps"_attr = batchThroughputMBps, + "fetch"_attr = duration_cast<Milliseconds>(fetchTime), + "fetchThroughputMBps"_attr = fetchThroughputMBps, + "insert"_attr = duration_cast<Milliseconds>(insertTime), + "insertThroughputMBps"_attr = insertThroughputMBps); + }); + } +} catch (const DBException& e) { + stdx::lock_guard<Client> lk(*_innerOpCtx->getClient()); + _innerOpCtx->getServiceContext()->killOperation(lk, _innerOpCtx, ErrorCodes::Error(6718400)); + LOGV2_ERROR(6718413, + "Chunk migration failure fetching data", + "migrationId"_attr = _migrationId, + "failure"_attr = e.toStatus()); +} + +template <typename Inserter> +MigrationBatchFetcher<Inserter>::~MigrationBatchFetcher() { + LOGV2(6718401, + "Shutting down and joining inserter threads for migration {migrationId}", + "migrationId"_attr = _migrationId); + _inserterWorkers->shutdown(); + _inserterWorkers->join(); + LOGV2(6718415, + "Inserter threads for migration {migrationId} joined", + "migrationId"_attr = _migrationId); +} + +template class MigrationBatchFetcher<MigrationBatchInserter>; + +template class MigrationBatchFetcher<MigrationBatchMockInserter>; + +} // namespace mongo diff --git a/src/mongo/db/s/migration_batch_fetcher.h b/src/mongo/db/s/migration_batch_fetcher.h new file mode 100644 index 00000000000..d402c3a92e6 --- /dev/null +++ b/src/mongo/db/s/migration_batch_fetcher.h @@ -0,0 +1,168 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/base/error_extra_info.h" +#include "mongo/db/client.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/repl/optime.h" +#include "mongo/db/s/migration_batch_inserter.h" +#include "mongo/db/s/migration_batch_mock_inserter.h" +#include "mongo/db/s/migration_session_id.h" +#include "mongo/s/client/shard.h" +#include "mongo/s/grid.h" +#include "mongo/s/shard_id.h" +#include "mongo/util/cancellation.h" +#include "mongo/util/concurrency/thread_pool.h" +#include "mongo/util/producer_consumer_queue.h" + +#pragma once + +namespace mongo { + + +// This class is only instantiated on the destination of a chunk migration and +// has a single purpose: to manage two thread pools, one +// on which threads perform inserters, and one on which +// threads run _migrateClone requests (to fetch batches of documents to insert). +// +// The constructor creates and starts the inserter thread pool. The destructor shuts down +// and joins the inserter thread pool. +// +// The main work of the class is in method fetchAndScheduleInsertion. That method +// starts a thread pool for fetchers. Each thread in that thread pool sits in a loop +// sending out _migrateClone requests, blocking on the response, and scheduling an +// inserter on the inserter thread pool. This function joins and shuts down the +// fetcher thread pool once all batches have been fetched. +// +// Inserter is templated only to allow a mock inserter to exist. +// There is only one implementation of inserter currently, which is MigrationBatchInserter. +// +// A few things to note: +// - After fetchAndScheduleInsertion returns, insertions are still being executed (although fetches +// are not). +// - Sending out _migrateClone requests in parallel implies the need for synchronization on the +// source. See the comments in migration_chunk_cloner_source.h for details around +// that. +// - The requirement on source side synchronization implies that care must be taken on upgrade. +// In particular, if the source is running an earlier binary that doesn't have code for +// source side synchronization, it is unsafe to send _migrateClone requests in parallel. +// To handle that case, when the source is prepared to service _migrateClone requests in +// parallel, the field "parallelMigrateCloneSupported" is included in the "_recvChunkStart" +// command. The inclusion of that field indicates to the destination that it is safe +// to send _migrateClone requests in parallel. Its exclusion indicates that it is unsafe. +template <typename Inserter> +class MigrationBatchFetcher { +public: + MigrationBatchFetcher(OperationContext* outerOpCtx, + OperationContext* innerOpCtx, + NamespaceString nss, + MigrationSessionId sessionId, + const WriteConcernOptions& writeConcern, + const ShardId& fromShardId, + const ChunkRange& range, + const UUID& migrationId, + const UUID& collectionId, + std::shared_ptr<MigrationCloningProgressSharedState> migrationInfo, + bool parallelFetchingSupported); + + ~MigrationBatchFetcher(); + + // Repeatedly fetch batches (using _migrateClone request) and schedule inserter jobs + // on thread pool. + void fetchAndScheduleInsertion(); + + // Get inserter thread pool stats. + ThreadPool::Stats getThreadPoolStats() const { + return _inserterWorkers->getStats(); + } + +private: + NamespaceString _nss; + + // Size of thread pools. + int _migrationConcurrency; + + MigrationSessionId _sessionId; + + // Inserter thread pool. + std::unique_ptr<ThreadPool> _inserterWorkers; + + BSONObj _migrateCloneRequest; + + OperationContext* _outerOpCtx; + + OperationContext* _innerOpCtx; + + std::shared_ptr<Shard> _fromShard; + + // Shared state, by which the progress of migration is communicated + // to MigrationDestinationManager. + std::shared_ptr<MigrationCloningProgressSharedState> _migrationProgress; + + ChunkRange _range; + + UUID _collectionUuid; + + UUID _migrationId; + + WriteConcernOptions _writeConcern; + + // Indicates if source is prepared to service _migrateClone requests in parallel. + bool _isParallelFetchingSupported; + + // Given session id and namespace, create migrateCloneRequest. + // Only should be created once for the lifetime of the object. + BSONObj _createMigrateCloneRequest() const { + BSONObjBuilder builder; + builder.append("_migrateClone", _nss.ns()); + _sessionId.append(&builder); + return builder.obj(); + } + + void _runFetcher(); + + // Fetches next batch using _migrateClone request and return it. May return an empty batch. + BSONObj _fetchBatch(OperationContext* opCtx); + + static bool _isEmptyBatch(const BSONObj& batch) { + return batch.getField("objects").Obj().isEmpty(); + } + + static void onCreateThread(const std::string& threadName) { + Client::initThread(threadName, getGlobalServiceContext(), nullptr); + { + stdx::lock_guard<Client> lk(cc()); + cc().setSystemOperationKillableByStepdown(lk); + } + } + +}; // namespace mongo + +} // namespace mongo diff --git a/src/mongo/db/s/migration_batch_fetcher_test.cpp b/src/mongo/db/s/migration_batch_fetcher_test.cpp new file mode 100644 index 00000000000..f86368b321b --- /dev/null +++ b/src/mongo/db/s/migration_batch_fetcher_test.cpp @@ -0,0 +1,269 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding + +#include "mongo/base/status_with.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/s/migration_batch_fetcher.h" +#include "mongo/db/s/migration_session_id.h" +#include "mongo/db/s/shard_server_test_fixture.h" +#include "mongo/db/write_concern_options.h" +#include "mongo/dbtests/mock/mock_replica_set.h" +#include "mongo/executor/cancelable_executor.h" +#include "mongo/executor/network_interface_mock.h" +#include "mongo/executor/thread_pool_mock.h" +#include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/executor/thread_pool_task_executor_test_fixture.h" +#include "mongo/idl/server_parameter_test_util.h" +#include "mongo/logv2/log.h" +#include "mongo/platform/basic.h" +#include "mongo/s/catalog/sharding_catalog_client_mock.h" +#include "mongo/s/catalog/type_chunk.h" +#include "mongo/stdx/future.h" +#include "mongo/stdx/thread.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/duration.h" +#include "mongo/util/net/hostandport.h" +#include "mongo/util/uuid.h" + +namespace mongo { +namespace { + +using unittest::assertGet; + +const ConnectionString kDonorConnStr = + ConnectionString::forReplicaSet("Donor", + {HostAndPort("DonorHost1:1234"), + HostAndPort{"DonorHost2:1234"}, + HostAndPort{"DonorHost3:1234"}}); +const ConnectionString kRecipientConnStr = + ConnectionString::forReplicaSet("Recipient", + {HostAndPort("RecipientHost1:1234"), + HostAndPort("RecipientHost2:1234"), + HostAndPort("RecipientHost3:1234")}); + +class MigrationBatchFetcherTestFixture : public ShardServerTestFixture { + +protected: + /** + * Sets up the task executor as well as a TopologyListenerMock for each unit test. + */ + void setUp() override { + ShardServerTestFixture::setUp(); + + { + auto donorShard = assertGet( + shardRegistry()->getShard(operationContext(), kDonorConnStr.getSetName())); + RemoteCommandTargeterMock::get(donorShard->getTargeter()) + ->setConnectionStringReturnValue(kDonorConnStr); + RemoteCommandTargeterMock::get(donorShard->getTargeter()) + ->setFindHostReturnValue(kDonorConnStr.getServers()[0]); + } + } + + void tearDown() override { + ShardServerTestFixture::tearDown(); + } + + /** + * Instantiates a BSON object in which both "_id" and "X" are set to value. + */ + static BSONObj createDocument(int value) { + return BSON("_id" << value << "X" << value); + } + static BSONObj createEmpty() { + return BSONObj{}; + } + /** + * Creates a list of documents to clone. + */ + static std::vector<BSONObj> createDocumentsToClone() { + return {createDocument(1), createDocument(2), createDocument(3)}; + } + + /** + * Creates a list of documents to clone and converts it to a BSONArray. + */ + static BSONArray createDocumentsToCloneArray() { + BSONArrayBuilder arrayBuilder; + for (auto& doc : createDocumentsToClone()) { + arrayBuilder.append(doc); + } + return arrayBuilder.arr(); + } + static BSONArray createEmptyCloneArray() { + return BSONArrayBuilder().arr(); + } + + static BSONObj getTerminalBsonObj() { + return BSON("Status" + << "OK" + << "ok" << 1 << "objects" << createEmptyCloneArray()); + } + + static BSONObj getBatchBsonObj() { + return BSON("Status" + << "OK" + << "ok" << 1 << "objects" << createDocumentsToCloneArray()); + } + +private: + OperationContext* _opCtx; + ServiceContext* _svcCtx; + executor::NetworkInterfaceMock* _net; + + std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient() override { + class StaticCatalogClient final : public ShardingCatalogClientMock { + public: + StaticCatalogClient() = default; + + StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards( + OperationContext* opCtx, repl::ReadConcernLevel readConcern) override { + + ShardType donorShard; + donorShard.setName(kDonorConnStr.getSetName()); + donorShard.setHost(kDonorConnStr.toString()); + + ShardType recipientShard; + recipientShard.setName(kRecipientConnStr.getSetName()); + recipientShard.setHost(kRecipientConnStr.toString()); + + return repl::OpTimeWith<std::vector<ShardType>>({donorShard, recipientShard}); + } + }; + + return std::make_unique<StaticCatalogClient>(); + } +}; + +auto getOnMigrateCloneCommandCb(BSONObj ret) { + return [ret](const executor::RemoteCommandRequest& request) -> StatusWith<BSONObj> { + ASSERT_EQ(request.cmdObj.getField("_migrateClone").String(), "test.foo"); + return ret; + }; +} + +TEST_F(MigrationBatchFetcherTestFixture, BasicEmptyFetchingTest) { + NamespaceString nss{"test", "foo"}; + ShardId fromShard{"Donor"}; + auto msid = MigrationSessionId::generate(fromShard, "Recipient"); + auto outerOpCtx = operationContext(); + auto newClient = outerOpCtx->getServiceContext()->makeClient("MigrationCoordinator"); + + int concurrency = 30; + RAIIServerParameterControllerForTest featureFlagController( + "featureFlagConcurrencyInChunkMigration", true); + RAIIServerParameterControllerForTest setMigrationConcurrencyParam{"migrationConcurrency", + concurrency}; + + AlternativeClientRegion acr(newClient); + auto executor = + Grid::get(outerOpCtx->getServiceContext())->getExecutorPool()->getFixedExecutor(); + auto newOpCtxPtr = CancelableOperationContext( + cc().makeOperationContext(), outerOpCtx->getCancellationToken(), executor); + auto opCtx = newOpCtxPtr.get(); + + auto fetcher = std::make_unique<MigrationBatchFetcher<MigrationBatchMockInserter>>( + outerOpCtx, + opCtx, + nss, + msid, + WriteConcernOptions::parse(WriteConcernOptions::Majority).getValue(), + fromShard, + ChunkRange{BSON("x" << 1), BSON("x" << 2)}, + UUID::gen(), + UUID::gen(), + nullptr, + true); + + // Start asynchronous task for responding to _migrateClone requests. + // Must name the return of value std::async. The destructor of std::future joins the + // asynchrounous task. (If it were left unnamed, the destructor would run inline, and the test + // would hang forever.) + auto fut = stdx::async(stdx::launch::async, [&]() { + // One terminal response for each thread + for (int i = 0; i < concurrency; ++i) { + onCommand(getOnMigrateCloneCommandCb(getTerminalBsonObj())); + } + }); + fetcher->fetchAndScheduleInsertion(); +} + +TEST_F(MigrationBatchFetcherTestFixture, BasicFetching) { + NamespaceString nss{"test", "foo"}; + ShardId fromShard{"Donor"}; + auto msid = MigrationSessionId::generate(fromShard, "Recipient"); + + auto outerOpCtx = operationContext(); + auto newClient = outerOpCtx->getServiceContext()->makeClient("MigrationCoordinator"); + AlternativeClientRegion acr(newClient); + + auto executor = + Grid::get(outerOpCtx->getServiceContext())->getExecutorPool()->getFixedExecutor(); + auto newOpCtxPtr = CancelableOperationContext( + cc().makeOperationContext(), outerOpCtx->getCancellationToken(), executor); + auto opCtx = newOpCtxPtr.get(); + + + int concurrency = 30; + RAIIServerParameterControllerForTest featureFlagController( + "featureFlagConcurrencyInChunkMigration", true); + RAIIServerParameterControllerForTest setMigrationConcurrencyParam{"migrationConcurrency", + concurrency}; + + auto fetcher = std::make_unique<MigrationBatchFetcher<MigrationBatchMockInserter>>( + outerOpCtx, + opCtx, + nss, + msid, + WriteConcernOptions::parse(WriteConcernOptions::Majority).getValue(), + fromShard, + ChunkRange{BSON("x" << 1), BSON("x" << 2)}, + UUID::gen(), + UUID::gen(), + nullptr, + true); + + auto fut = stdx::async(stdx::launch::async, [&]() { + for (int i = 0; i < 8; ++i) { + onCommand(getOnMigrateCloneCommandCb(getBatchBsonObj())); + } + // One terminal response for each thread + for (int i = 0; i < concurrency; ++i) { + onCommand(getOnMigrateCloneCommandCb(getTerminalBsonObj())); + } + }); + fetcher->fetchAndScheduleInsertion(); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/s/migration_batch_inserter.cpp b/src/mongo/db/s/migration_batch_inserter.cpp new file mode 100644 index 00000000000..37f55947745 --- /dev/null +++ b/src/mongo/db/s/migration_batch_inserter.cpp @@ -0,0 +1,200 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kShardingMigration + +#include "mongo/db/s/migration_batch_inserter.h" + +#include "mongo/db/s/migration_util.h" +#include "mongo/db/transaction_participant.h" +#include "mongo/logv2/log.h" + +namespace mongo { + +namespace { + +void checkOutSessionAndVerifyTxnState(OperationContext* opCtx) { + MongoDOperationContextSession::checkOut(opCtx); + TransactionParticipant::get(opCtx).beginOrContinue(opCtx, + {*opCtx->getTxnNumber()}, + boost::none /* autocommit */, + boost::none /* startTransaction */); +} + +template <typename Callable> +constexpr bool returnsVoid() { + return std::is_void_v<std::invoke_result_t<Callable>>; +} + +// Yields the checked out session before running the given function. If the function runs without +// throwing, will reacquire the session and verify it is still valid to proceed with the migration. +template <typename Callable, std::enable_if_t<!returnsVoid<Callable>(), int> = 0> +auto runWithoutSession(OperationContext* opCtx, Callable callable) { + MongoDOperationContextSession::checkIn(opCtx, OperationContextSession::CheckInReason::kYield); + + auto retVal = callable(); + + // The below code can throw, so it cannot run in a scope guard. + opCtx->checkForInterrupt(); + checkOutSessionAndVerifyTxnState(opCtx); + + return retVal; +} + +// Same as runWithoutSession above but takes a void function. +template <typename Callable, std::enable_if_t<returnsVoid<Callable>(), int> = 0> +void runWithoutSession(OperationContext* opCtx, Callable callable) { + MongoDOperationContextSession::checkIn(opCtx, OperationContextSession::CheckInReason::kYield); + + callable(); + + // The below code can throw, so it cannot run in a scope guard. + opCtx->checkForInterrupt(); + checkOutSessionAndVerifyTxnState(opCtx); +} +} // namespace + + +void MigrationBatchInserter::onCreateThread(const std::string& threadName) { + Client::initThread(threadName, getGlobalServiceContext(), nullptr); + { + stdx::lock_guard<Client> lk(cc()); + cc().setSystemOperationKillableByStepdown(lk); + } +} + +void MigrationBatchInserter::run(Status status) const try { + // Run is passed in a non-ok status if this function runs inline. + // That happens if we schedule this task on a ThreadPool that is + // already shutdown. If we were to schedule a task on a shutdown ThreadPool, + // then there is a logic error in our code. Therefore, we assert that here. + + invariant(status.isOK()); + auto arr = _batch["objects"].Obj(); + if (arr.isEmpty()) + return; + + auto executor = + Grid::get(_innerOpCtx->getServiceContext())->getExecutorPool()->getFixedExecutor(); + + auto applicationOpCtx = CancelableOperationContext( + cc().makeOperationContext(), _innerOpCtx->getCancellationToken(), executor); + + auto opCtx = applicationOpCtx.get(); + + auto assertNotAborted = [&]() { + { + stdx::lock_guard<Client> lk(*_outerOpCtx->getClient()); + _outerOpCtx->checkForInterrupt(); + } + opCtx->checkForInterrupt(); + }; + + auto it = arr.begin(); + while (it != arr.end()) { + int batchNumCloned = 0; + int batchClonedBytes = 0; + const int batchMaxCloned = migrateCloneInsertionBatchSize.load(); + + assertNotAborted(); + + write_ops::InsertCommandRequest insertOp(_nss); + insertOp.getWriteCommandRequestBase().setOrdered(true); + insertOp.setDocuments([&] { + std::vector<BSONObj> toInsert; + while (it != arr.end() && (batchMaxCloned <= 0 || batchNumCloned < batchMaxCloned)) { + const auto& doc = *it; + BSONObj docToClone = doc.Obj(); + toInsert.push_back(docToClone); + batchNumCloned++; + batchClonedBytes += docToClone.objsize(); + ++it; + } + return toInsert; + }()); + + { + // Disable the schema validation (during document inserts and updates) + // and any internal validation for opCtx for performInserts() + DisableDocumentValidation documentValidationDisabler( + opCtx, + DocumentValidationSettings::kDisableSchemaValidation | + DocumentValidationSettings::kDisableInternalValidation); + const auto reply = + write_ops_exec::performInserts(opCtx, insertOp, OperationSource::kFromMigrate); + for (unsigned long i = 0; i < reply.results.size(); ++i) { + uassertStatusOKWithContext( + reply.results[i], + str::stream() << "Insert of " << insertOp.getDocuments()[i] << " failed."); + } + // Revert to the original DocumentValidationSettings for opCtx + } + + migrationutil::persistUpdatedNumOrphans( + opCtx, _migrationId, _collectionUuid, batchNumCloned); + _migrationProgress->updateMaxOptime( + repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp()); + + ShardingStatistics::get(opCtx).countDocsClonedOnRecipient.addAndFetch(batchNumCloned); + LOGV2(6718408, + "Incrementing numCloned count by {batchNumCloned} and numClonedBytes by " + "{batchClonedBytes}", + "batchNumCloned"_attr = batchNumCloned, + "batchClonedBytes"_attr = batchClonedBytes); + _migrationProgress->incNumCloned(batchNumCloned); + _migrationProgress->incNumBytes(batchClonedBytes); + + if (_writeConcern.needToWaitForOtherNodes() && _threadCount == 1) { + runWithoutSession(_outerOpCtx, [&] { + repl::ReplicationCoordinator::StatusAndDuration replStatus = + repl::ReplicationCoordinator::get(opCtx)->awaitReplication( + opCtx, + repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(), + _writeConcern); + if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) { + LOGV2_WARNING(22011, + "secondaryThrottle on, but doc insert timed out; continuing", + "migrationId"_attr = _migrationId.toBSON()); + } else { + uassertStatusOK(replStatus.status); + } + }); + } + + sleepmillis(migrateCloneInsertionBatchDelayMS.load()); + } +} catch (const DBException& e) { + stdx::lock_guard<Client> lk(*_innerOpCtx->getClient()); + _innerOpCtx->getServiceContext()->killOperation(lk, _innerOpCtx, ErrorCodes::Error(6718402)); + LOGV2(6718407, + "Batch application failed: {error}", + "Batch application failed", + "error"_attr = e.toStatus()); +} +} // namespace mongo diff --git a/src/mongo/db/s/migration_batch_inserter.h b/src/mongo/db/s/migration_batch_inserter.h new file mode 100644 index 00000000000..c3223921602 --- /dev/null +++ b/src/mongo/db/s/migration_batch_inserter.h @@ -0,0 +1,136 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "boost/optional/optional.hpp" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/cancelable_operation_context.h" +#include "mongo/db/catalog/document_validation.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/ops/write_ops_exec.h" +#include "mongo/db/ops/write_ops_gen.h" +#include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/s/migration_session_id.h" +#include "mongo/db/s/range_deletion_util.h" +#include "mongo/db/s/sharding_runtime_d_params_gen.h" +#include "mongo/db/s/sharding_statistics.h" +#include "mongo/db/session_catalog_mongod.h" +#include "mongo/db/write_concern_options.h" +#include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/grid.h" +#include "mongo/util/uuid.h" + +#pragma once + +namespace mongo { + +// The purpose of this type is to allow inserters to communicate +// their progress to the outside world. +class MigrationCloningProgressSharedState { + mutable Mutex _m; + repl::OpTime _maxOptime; + long long _numCloned = 0; + long long _numBytes = 0; + +public: + void updateMaxOptime(const repl::OpTime& _newOptime) { + stdx::lock_guard lk(_m); + _maxOptime = std::max(_maxOptime, _newOptime); + } + repl::OpTime getMaxOptime() const { + stdx::lock_guard lk(_m); + return _maxOptime; + } + void incNumCloned(int num) { + stdx::lock_guard lk(_m); + _numCloned += num; + } + void incNumBytes(int num) { + stdx::lock_guard lk(_m); + _numBytes += num; + } + long long getNumCloned() const { + stdx::lock_guard lk(_m); + return _numCloned; + } + long long getNumBytes() const { + stdx::lock_guard lk(_m); + return _numBytes; + } +}; + +// This type contains a BSONObj _batch corresponding to a _migrateClone response. +// The purpose of this type is to perform the insertions for this batch. +// Those insertions happen in its "run" method. The MigrationBatchFetcher +// schedules these jobs on a thread pool. This class has no knowledge that it runs +// on a thread pool. It sole purpose is to perform insertions and communicate its progress +// (inluding the new max opTime). +class MigrationBatchInserter { +public: + // Do inserts. + void run(Status status) const; + + MigrationBatchInserter(OperationContext* outerOpCtx, + OperationContext* innerOpCtx, + BSONObj batch, + const NamespaceString& nss, + const ChunkRange& range, + const WriteConcernOptions& writeConcern, + const UUID& collectionUuid, + std::shared_ptr<MigrationCloningProgressSharedState> migrationProgress, + const UUID& migrationId, + int threadCount) + : _outerOpCtx{outerOpCtx}, + _innerOpCtx{innerOpCtx}, + _batch{batch}, + _nss{nss}, + _range{range}, + _writeConcern{writeConcern}, + _collectionUuid{collectionUuid}, + _migrationProgress{migrationProgress}, + _migrationId{migrationId}, + _threadCount{threadCount} {} + + static void onCreateThread(const std::string& threadName); + +private: + OperationContext* _outerOpCtx; + OperationContext* _innerOpCtx; + BSONObj _batch; + NamespaceString _nss; + ChunkRange _range; + WriteConcernOptions _writeConcern; + UUID _collectionUuid; + std::shared_ptr<MigrationCloningProgressSharedState> _migrationProgress; + UUID _migrationId; + int _threadCount; +}; + +} // namespace mongo diff --git a/src/mongo/db/s/migration_batch_mock_inserter.h b/src/mongo/db/s/migration_batch_mock_inserter.h new file mode 100644 index 00000000000..8b4b766480c --- /dev/null +++ b/src/mongo/db/s/migration_batch_mock_inserter.h @@ -0,0 +1,67 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/s/migration_batch_inserter.h" +#include "mongo/db/write_concern_options.h" +#include "mongo/s/catalog/type_chunk.h" + +#pragma once + +namespace mongo { + +class MigrationBatchMockInserter { +public: + void run(Status status) const { + // Run is passed in a non-ok status if this function runs inline. + // That happens if we schedule this task on a ThreadPool that is + // already shutdown. We should never do that. Therefore, + // we assert that here. + invariant(status.isOK()); + } + MigrationBatchMockInserter(OperationContext*, + OperationContext*, + BSONObj, + NamespaceString, + ChunkRange, + WriteConcernOptions, + UUID, + std::shared_ptr<MigrationCloningProgressSharedState>, + UUID, + int) {} + + static void onCreateThread(const std::string& threadName) {} + +private: + BSONObj _batch; +}; + +} // namespace mongo diff --git a/src/mongo/db/s/migration_chunk_cloner_source.cpp b/src/mongo/db/s/migration_chunk_cloner_source.cpp index 6df6ebadd6f..194e929cb70 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source.cpp @@ -27,6 +27,8 @@ * it in the license file. */ + +#include "mongo/bson/bsonobj.h" #include "mongo/platform/basic.h" #include "mongo/db/s/migration_chunk_cloner_source.h" diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 0aed75b5303..a5079a4041d 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -34,6 +34,7 @@ #include "mongo/db/s/migration_chunk_cloner_source_legacy.h" #include "mongo/base/status.h" +#include "mongo/bson/bsonobj.h" #include "mongo/client/read_preference.h" #include "mongo/db/catalog/index_catalog.h" #include "mongo/db/catalog_raii.h" @@ -711,6 +712,61 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromIndexScan(OperationCon _jumboChunkCloneState->clonerExec->detachFromOperationContext(); } +boost::optional<Snapshotted<BSONObj>> MigrationChunkClonerSourceLegacy::_getNextDoc( + OperationContext* opCtx, const CollectionPtr& collection) { + while (true) { + stdx::unique_lock lk(_mutex); + invariant(_inProgressReads >= 0); + RecordId nextRecordId; + Snapshotted<BSONObj> doc; + + _moreDocsCV.wait(lk, [&]() { + return _cloneRecordIdsIter != _cloneLocs.end() || !_overflowDocs.empty() || + _inProgressReads == 0; + }); + + // One of the following must now be true (corresponding to the three if conditions): + // 1. There is a document in the overflow set + // 2. The iterator has not reached the end of the record id set + // 3. The overflow set is empty, the iterator is at the end, and + // no threads are holding a document. This condition indicates + // that there are no more docs to return for the cloning phase. + if (!_overflowDocs.empty()) { + doc = std::move(_overflowDocs.front()); + _overflowDocs.pop_front(); + ++_inProgressReads; + return doc; + } else if (_cloneRecordIdsIter != _cloneLocs.end()) { + nextRecordId = *_cloneRecordIdsIter; + ++_cloneRecordIdsIter; + ++_inProgressReads; + } else { + invariant(_numRecordsCloned + _numRecordsPassedOver == _cloneLocs.size()); + return boost::none; + } + + // In order to saturate the disk, the I/O operation must occur without + // holding the mutex. + lk.unlock(); + if (collection->findDoc(opCtx, nextRecordId, &doc)) + return doc; + lk.lock(); + ++_numRecordsPassedOver; + + // It is possible that this document is no longer in the collection, + // in which case, we try again and indicate to other threads that this + // thread is not holding a document. + --_inProgressReads; + _moreDocsCV.notify_one(); + } +} + +void MigrationChunkClonerSourceLegacy::_insertOverflowDoc(Snapshotted<BSONObj> doc) { + stdx::lock_guard lk(_mutex); + invariant(_inProgressReads >= 1); + _overflowDocs.push_back(std::move(doc)); +} + void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationContext* opCtx, const CollectionPtr& collection, BSONArrayBuilder* arrBuilder) { @@ -718,49 +774,56 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationCon internalQueryExecYieldIterations.load(), Milliseconds(internalQueryExecYieldPeriodMS.load())); - stdx::unique_lock<Latch> lk(_mutex); - auto iter = _cloneLocs.begin(); + while (auto doc = _getNextDoc(opCtx, collection)) { + ON_BLOCK_EXIT([&]() { + stdx::lock_guard lk(_mutex); + invariant(_inProgressReads > 0); + --_inProgressReads; + _moreDocsCV.notify_one(); + }); - for (; iter != _cloneLocs.end(); ++iter) { // We must always make progress in this method by at least one document because empty // return indicates there is no more initial clone data. if (arrBuilder->arrSize() && tracker.intervalHasElapsed()) { + _insertOverflowDoc(std::move(*doc)); break; } - auto nextRecordId = *iter; - - lk.unlock(); - ON_BLOCK_EXIT([&lk] { lk.lock(); }); - - Snapshotted<BSONObj> doc; - if (collection->findDoc(opCtx, nextRecordId, &doc)) { - // Do not send documents that are no longer in the chunk range being moved. This can - // happen when document shard key value of the document changed after the initial - // index scan during cloning. This is needed because the destination is very - // conservative in processing xferMod deletes and won't delete docs that are not in - // the range of the chunk being migrated. - if (!isDocInRange(doc.value(), - _args.getMin().value(), - _args.getMax().value(), - _shardKeyPattern)) { - continue; + // Do not send documents that are no longer in the chunk range being moved. This can + // happen when document shard key value of the document changed after the initial + // index scan during cloning. This is needed because the destination is very + // conservative in processing xferMod deletes and won't delete docs that are not in + // the range of the chunk being migrated. + if (!isDocInRange( + doc->value(), _args.getMin().value(), _args.getMax().value(), _shardKeyPattern)) { + { + stdx::lock_guard lk(_mutex); + _numRecordsPassedOver++; } + continue; + } - // Use the builder size instead of accumulating the document sizes directly so - // that we take into consideration the overhead of BSONArray indices. - if (arrBuilder->arrSize() && - (arrBuilder->len() + doc.value().objsize() + 1024) > BSONObjMaxUserSize) { - - break; - } + // Use the builder size instead of accumulating the document sizes directly so + // that we take into consideration the overhead of BSONArray indices. + if (arrBuilder->arrSize() && + (arrBuilder->len() + doc->value().objsize() + 1024) > BSONObjMaxUserSize) { + _insertOverflowDoc(std::move(*doc)); + break; + } - arrBuilder->append(doc.value()); - ShardingStatistics::get(opCtx).countDocsClonedOnDonor.addAndFetch(1); + { + stdx::lock_guard lk(_mutex); + _numRecordsCloned++; } + arrBuilder->append(doc->value()); + ShardingStatistics::get(opCtx).countDocsClonedOnDonor.addAndFetch(1); } - _cloneLocs.erase(_cloneLocs.begin(), iter); + // When we reach here, there are no more documents to return to the destination. + // We therefore need to notify a other threads that maybe sleeping on the condition + // variable that we are done. + stdx::lock_guard lk(_mutex); + _moreDocsCV.notify_one(); } uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() { @@ -803,7 +866,7 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx, { // All clone data must have been drained before starting to fetch the incremental changes. stdx::unique_lock<Latch> lk(_mutex); - invariant(_cloneLocs.empty()); + invariant(_cloneRecordIdsIter == _cloneLocs.end()); // The "snapshot" for delete and update list must be taken under a single lock. This is to // ensure that we will preserve the causal order of writes. Always consume the delete @@ -1002,6 +1065,7 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC } } } + _cloneRecordIdsIter = _cloneLocs.begin(); } catch (DBException& exception) { exception.addContext("Executor error while scanning for documents belonging to chunk"); throw; @@ -1099,7 +1163,6 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC stdx::lock_guard<Latch> sl(_mutex); - const std::size_t cloneLocsRemaining = _cloneLocs.size(); int64_t untransferredModsSizeBytes = _untransferredDeletesCounter * _averageObjectIdSize + _untransferredUpsertsCounter * _averageObjectSizeForCloneLocs; @@ -1121,13 +1184,14 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC "moveChunk data transfer progress", "response"_attr = redact(res), "memoryUsedBytes"_attr = _memoryUsed, - "docsRemainingToClone"_attr = cloneLocsRemaining, + "docsRemainingToClone"_attr = + _cloneLocs.size() - _numRecordsCloned - _numRecordsPassedOver, "untransferredModsSizeBytes"_attr = untransferredModsSizeBytes); } if (res["state"].String() == "steady" && sessionCatalogSourceInCatchupPhase && estimateUntransferredSessionsSize == 0) { - if (cloneLocsRemaining != 0 || + if ((_cloneRecordIdsIter != _cloneLocs.end() || !_overflowDocs.empty()) || (_jumboChunkCloneState && _forceJumbo && PlanExecutor::IS_EOF != _jumboChunkCloneState->clonerState)) { return {ErrorCodes::OperationIncomplete, diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h index 8c15fa7a0cb..081ffc799ba 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h @@ -29,8 +29,10 @@ #pragma once +#include <deque> #include <list> #include <memory> +#include <mutex> #include <set> #include "mongo/bson/bsonobj.h" @@ -261,6 +263,15 @@ private: Status _storeCurrentLocs(OperationContext* opCtx); /** + * Returns boost::none if there are no more documents to get. + * Increments _inProgressReads if and only if return value is not none. + */ + boost::optional<Snapshotted<BSONObj>> _getNextDoc(OperationContext* opCtx, + const CollectionPtr& collection); + + void _insertOverflowDoc(Snapshotted<BSONObj> doc); + + /** * Adds the OpTime to the list of OpTimes for oplog entries that we should consider migrating as * part of session migration. */ @@ -351,6 +362,47 @@ private: // List of record ids that needs to be transferred (initial clone) std::set<RecordId> _cloneLocs; + // This iterator is a pointer into the _cloneLocs set. It allows concurrent access to + // the _cloneLocs set by allowing threads servicing _migrateClone requests to do the + // following: + // 1. Acquire mutex "_mutex" above. + // 2. Copy *_cloneRecordIdsIter into its local stack frame. + // 3. Increment _cloneRecordIdsIter + // 4. Unlock "_mutex." + // 5. Do the I/O to fetch the document corresponding to this record Id. + // + // The purpose of this algorithm, is to allow different threads to concurrently start I/O jobs + // in order to more fully saturate the disk. + // + // One issue with this algorithm, is that only 16MB worth of documents can be returned in + // response to a _migrateClone request. But, the thread does not know the size of a document + // until it does the I/O. At which point, if the document does not fit in the response to + // _migrateClone request the document must be made available to a different thread servicing a + // _migrateClone request. To solve this problem, the thread adds the document + // to the below _overflowDocs deque. + std::set<RecordId>::iterator _cloneRecordIdsIter; + + // This deque stores all documents that must be sent to the destination, but could not fit + // in the response to a particular _migrateClone request. + std::deque<Snapshotted<BSONObj>> _overflowDocs; + + // This integer represents how many documents are being "held" by threads servicing + // _migrateClone requests. Any document that is "held" by a thread may be added to the + // _overflowDocs deque if it doesn't fit in the response to a _migrateClone request. + // This integer is necessary because it gives us a condition on when all documents to be sent + // to the destination have been exhausted. + // + // If (_cloneRecordIdsIter == _cloneLocs.end() && _overflowDocs.empty() && _inProgressReads + // == 0) then all documents have been returned to the destination. + decltype(_cloneLocs.size()) _inProgressReads = 0; + + // This condition variable allows us to wait on the following condition: + // Either we're done and the above condition is satisfied, or there is some document to + // return. + stdx::condition_variable _moreDocsCV; + decltype(_cloneLocs.size()) _numRecordsCloned{0}; + decltype(_cloneLocs.size()) _numRecordsPassedOver{0}; + // The estimated average object size during the clone phase. Used for buffer size // pre-allocation (initial clone). uint64_t _averageObjectSizeForCloneLocs{0}; diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index 721fab26d3d..4cbcf6cb260 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -29,6 +29,7 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kShardingMigration +#include "mongo/db/s/migration_batch_fetcher.h" #include "mongo/platform/basic.h" #include "mongo/db/s/migration_destination_manager.h" @@ -411,8 +412,8 @@ void MigrationDestinationManager::report(BSONObjBuilder& b, } BSONObjBuilder bb(b.subobjStart("counts")); - bb.append("cloned", _numCloned); - bb.append("clonedBytes", _clonedBytes); + bb.append("cloned", _getNumCloned()); + bb.append("clonedBytes", _getNumBytesCloned()); bb.append("catchup", _numCatchup); bb.append("steady", _numSteady); bb.done(); @@ -446,6 +447,8 @@ Status MigrationDestinationManager::start(OperationContext* opCtx, _lsid = cloneRequest.getLsid(); _txnNumber = cloneRequest.getTxnNumber(); + _parallelFetchersSupported = cloneRequest.parallelFetchingSupported(); + _nss = nss; _fromShard = cloneRequest.getFromShardId(); _fromShardConnString = @@ -462,8 +465,8 @@ Status MigrationDestinationManager::start(OperationContext* opCtx, _chunkMarkedPending = false; - _numCloned = 0; - _clonedBytes = 0; + _migrationCloningProgress = std::make_shared<MigrationCloningProgressSharedState>(); + _numCatchup = 0; _numSteady = 0; @@ -1338,122 +1341,32 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx, _sessionMigration->start(opCtx->getServiceContext()); - const BSONObj migrateCloneRequest = createMigrateCloneRequest(_nss, *_sessionId); - _chunkMarkedPending = true; // no lock needed, only the migrate thread looks. - auto assertNotAborted = [&](OperationContext* opCtx) { - opCtx->checkForInterrupt(); - outerOpCtx->checkForInterrupt(); - uassert(50748, "Migration aborted while copying documents", getState() != kAbort); - }; - - auto insertBatchFn = [&](OperationContext* opCtx, BSONObj nextBatch) { - auto arr = nextBatch["objects"].Obj(); - if (arr.isEmpty()) { - return false; - } - auto it = arr.begin(); - while (it != arr.end()) { - int batchNumCloned = 0; - int batchClonedBytes = 0; - const int batchMaxCloned = migrateCloneInsertionBatchSize.load(); - - assertNotAborted(opCtx); - - write_ops::InsertCommandRequest insertOp(_nss); - insertOp.getWriteCommandRequestBase().setOrdered(true); - insertOp.setDocuments([&] { - std::vector<BSONObj> toInsert; - while (it != arr.end() && - (batchMaxCloned <= 0 || batchNumCloned < batchMaxCloned)) { - const auto& doc = *it; - BSONObj docToClone = doc.Obj(); - toInsert.push_back(docToClone); - batchNumCloned++; - batchClonedBytes += docToClone.objsize(); - ++it; - } - return toInsert; - }()); - - { - // Disable the schema validation (during document inserts and updates) - // and any internal validation for opCtx for performInserts() - DisableDocumentValidation documentValidationDisabler( - opCtx, - DocumentValidationSettings::kDisableSchemaValidation | - DocumentValidationSettings::kDisableInternalValidation); - const auto reply = write_ops_exec::performInserts( - opCtx, insertOp, OperationSource::kFromMigrate); - for (unsigned long i = 0; i < reply.results.size(); ++i) { - uassertStatusOKWithContext(reply.results[i], - str::stream() << "Insert of " - << insertOp.getDocuments()[i] - << " failed."); - } - // Revert to the original DocumentValidationSettings for opCtx - } - - migrationutil::persistUpdatedNumOrphans( - opCtx, _migrationId.get(), *_collectionUuid, batchNumCloned); - - { - stdx::lock_guard<Latch> statsLock(_mutex); - _numCloned += batchNumCloned; - ShardingStatistics::get(opCtx).countDocsClonedOnRecipient.addAndFetch( - batchNumCloned); - _clonedBytes += batchClonedBytes; - } - if (_writeConcern.needToWaitForOtherNodes()) { - runWithoutSession(outerOpCtx, [&] { - repl::ReplicationCoordinator::StatusAndDuration replStatus = - repl::ReplicationCoordinator::get(opCtx)->awaitReplication( - opCtx, - repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(), - _writeConcern); - if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) { - LOGV2_WARNING( - 22011, - "secondaryThrottle on, but doc insert timed out; continuing", - "migrationId"_attr = _migrationId->toBSON()); - } else { - uassertStatusOK(replStatus.status); - } - }); - } - - sleepmillis(migrateCloneInsertionBatchDelayMS.load()); - } - return true; - }; - - auto fetchBatchFn = [&](OperationContext* opCtx, BSONObj* nextBatch) { - auto commandResponse = uassertStatusOKWithContext( - fromShard->runCommand(opCtx, - ReadPreferenceSetting(ReadPreference::PrimaryOnly), - "admin", - migrateCloneRequest, - Shard::RetryPolicy::kNoRetry), - "_migrateClone failed: "); - - uassertStatusOKWithContext( - Shard::CommandResponse::getEffectiveStatus(commandResponse), - "_migrateClone failed: "); - - *nextBatch = commandResponse.response; - return nextBatch->getField("objects").Obj().isEmpty(); - }; - - // If running on a replicated system, we'll need to flush the docs we cloned to the - // secondaries - lastOpApplied = fetchAndApplyBatch(opCtx, insertBatchFn, fetchBatchFn); + { + // Destructor of MigrationBatchFetcher is non-trivial. Therefore, + // this scope has semantic significance. + MigrationBatchFetcher<MigrationBatchInserter> fetcher{outerOpCtx, + opCtx, + _nss, + *_sessionId, + _writeConcern, + _fromShard, + range, + *_migrationId, + *_collectionUuid, + _migrationCloningProgress, + _parallelFetchersSupported}; + fetcher.fetchAndScheduleInsertion(); + } + opCtx->checkForInterrupt(); + lastOpApplied = _migrationCloningProgress->getMaxOptime(); timing->done(4); migrateThreadHangAtStep4.pauseWhileSet(); if (MONGO_unlikely(failMigrationOnRecipient.shouldFail())) { - _setStateFail(str::stream() << "failing migration after cloning " << _numCloned + _setStateFail(str::stream() << "failing migration after cloning " << _getNumCloned() << " docs due to failMigrationOnRecipient failpoint"); return; } diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h index 925296091cb..74d737f8302 100644 --- a/src/mongo/db/s/migration_destination_manager.h +++ b/src/mongo/db/s/migration_destination_manager.h @@ -29,6 +29,7 @@ #pragma once +#include <memory> #include <string> #include "mongo/base/string_data.h" @@ -39,6 +40,8 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/repl/replica_set_aware_service.h" #include "mongo/db/s/active_migrations_registry.h" +#include "mongo/db/s/collection_sharding_runtime.h" +#include "mongo/db/s/migration_batch_fetcher.h" #include "mongo/db/s/migration_recipient_recovery_document_gen.h" #include "mongo/db/s/migration_session_id.h" #include "mongo/db/s/session_catalog_migration_destination.h" @@ -178,6 +181,11 @@ public: const boost::optional<ChunkManager>& cm, boost::optional<Timestamp> afterClusterTime); + + bool isParallelFetchingSupported() { + return _parallelFetchersSupported; + } + /** * Gets the collection uuid and options from fromShardId. If given a chunk manager, will fetch * the collection options using the database version protocol. @@ -283,8 +291,22 @@ private: stdx::thread _migrateThreadHandle; + long long _getNumCloned() { + return _migrationCloningProgress->getNumCloned(); + } + + long long _getNumBytesCloned() { + return _migrationCloningProgress->getNumBytes(); + } + boost::optional<UUID> _migrationId; boost::optional<UUID> _collectionUuid; + + // State that is shared among all inserter threads. + std::shared_ptr<MigrationCloningProgressSharedState> _migrationCloningProgress; + + bool _parallelFetchersSupported; + LogicalSessionId _lsid; TxnNumber _txnNumber{kUninitializedTxnNumber}; NamespaceString _nss; @@ -304,8 +326,6 @@ private: // failure we can perform the appropriate cleanup. bool _chunkMarkedPending{false}; - long long _numCloned{0}; - long long _clonedBytes{0}; long long _numCatchup{0}; long long _numSteady{0}; diff --git a/src/mongo/db/s/start_chunk_clone_request.cpp b/src/mongo/db/s/start_chunk_clone_request.cpp index 86567dce161..6838aedaffc 100644 --- a/src/mongo/db/s/start_chunk_clone_request.cpp +++ b/src/mongo/db/s/start_chunk_clone_request.cpp @@ -50,6 +50,7 @@ const char kToShardId[] = "toShardName"; const char kChunkMinKey[] = "min"; const char kChunkMaxKey[] = "max"; const char kShardKeyPattern[] = "shardKeyPattern"; +const char kParallelMigration[] = "parallelMigrateCloneSupported"; } // namespace @@ -152,6 +153,14 @@ StatusWith<StartChunkCloneRequest> StartChunkCloneRequest::createFromCommand(Nam } } + { + Status status = bsonExtractBooleanFieldWithDefault( + obj, kParallelMigration, false, &request._parallelFetchingSupported); + if (!status.isOK()) { + return status; + } + } + request._migrationId = UUID::parse(obj); request._lsid = LogicalSessionId::parse(IDLParserErrorContext("StartChunkCloneRequest"), obj[kLsid].Obj()); @@ -179,6 +188,7 @@ void StartChunkCloneRequest::appendAsCommand( invariant(fromShardConnectionString.isValid()); builder->append(kRecvChunkStart, nss.ns()); + builder->append(kParallelMigration, true); migrationId.appendToBuilder(builder, kMigrationId); builder->append(kLsid, lsid.toBSON()); diff --git a/src/mongo/db/s/start_chunk_clone_request.h b/src/mongo/db/s/start_chunk_clone_request.h index c6ecba1f839..8f433afd2f7 100644 --- a/src/mongo/db/s/start_chunk_clone_request.h +++ b/src/mongo/db/s/start_chunk_clone_request.h @@ -93,6 +93,10 @@ public: return _migrationId.is_initialized(); } + bool parallelFetchingSupported() const { + return _parallelFetchingSupported; + } + const UUID& getMigrationId() const { invariant(_migrationId); return *_migrationId; @@ -161,6 +165,8 @@ private: // The parsed secondary throttle options MigrationSecondaryThrottleOptions _secondaryThrottle; + + bool _parallelFetchingSupported; }; } // namespace mongo |