author    Andrew Witten <andrew.witten@mongodb.com>    2022-12-22 23:28:14 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>    2023-01-23 17:01:18 +0000
commit    ede7bdea766fff40678d03a7051b0d15de0fbba6 (patch)
tree      2422031308d3b708c76d9bc58e62cf6bbfdc651f
parent    c585f971a68e707da7420a6a5c19988bb65a6205 (diff)
SERVER-67183 Add parallel fetchers and inserters for chunk migration
(cherry picked from commit 1564f715d16870df7a524ad702aad6be0f2da1f5)
 jstests/concurrency/fsm_workloads/random_moveChunk_update_shard_key.js |   2
 jstests/sharding/move_chunk_concurrent_cloning.js                      | 160
 src/mongo/db/s/SConscript                                              |   3
 src/mongo/db/s/migration_batch_fetcher.cpp                             | 230
 src/mongo/db/s/migration_batch_fetcher.h                               | 168
 src/mongo/db/s/migration_batch_fetcher_test.cpp                        | 269
 src/mongo/db/s/migration_batch_inserter.cpp                            | 200
 src/mongo/db/s/migration_batch_inserter.h                              | 136
 src/mongo/db/s/migration_batch_mock_inserter.h                         |  67
 src/mongo/db/s/migration_chunk_cloner_source.cpp                       |   2
 src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp                | 132
 src/mongo/db/s/migration_chunk_cloner_source_legacy.h                  |  52
 src/mongo/db/s/migration_destination_manager.cpp                       | 139
 src/mongo/db/s/migration_destination_manager.h                         |  24
 src/mongo/db/s/start_chunk_clone_request.cpp                           |  10
 src/mongo/db/s/start_chunk_clone_request.h                             |   6
16 files changed, 1375 insertions(+), 225 deletions(-)
diff --git a/jstests/concurrency/fsm_workloads/random_moveChunk_update_shard_key.js b/jstests/concurrency/fsm_workloads/random_moveChunk_update_shard_key.js
index 1dcf4f203b9..ff7a8364d86 100644
--- a/jstests/concurrency/fsm_workloads/random_moveChunk_update_shard_key.js
+++ b/jstests/concurrency/fsm_workloads/random_moveChunk_update_shard_key.js
@@ -40,7 +40,7 @@ var $config = extendWorkload($config, function($config, $super) {
// So, if the range deleter has not yet cleaned up that document when the chunk is
// moved back to the original shard, the moveChunk may fail as a result of a duplicate
// key error on the recipient.
- err.message.includes("Location51008"));
+ err.message.includes("Location51008") || err.message.includes("Location6718402"));
};
$config.data.runningWithStepdowns =
diff --git a/jstests/sharding/move_chunk_concurrent_cloning.js b/jstests/sharding/move_chunk_concurrent_cloning.js
index 954c2d7cd35..3166f823351 100644
--- a/jstests/sharding/move_chunk_concurrent_cloning.js
+++ b/jstests/sharding/move_chunk_concurrent_cloning.js
@@ -6,91 +6,101 @@
load('./jstests/libs/chunk_manipulation_util.js');
-// For startParallelOps to write its state
-let staticMongod = MongoRunner.runMongod({});
-
-let st = new ShardingTest({shards: 2});
-st.stopBalancer();
-
-const kThreadCount = Math.floor(Math.random() * 31) + 1;
-const kPadding = new Array(1024).join("x");
-
-let testDB = st.s.getDB('test');
-assert.commandWorked(testDB.adminCommand({enableSharding: 'test'}));
-st.ensurePrimaryShard('test', st.shard0.shardName);
-assert.commandWorked(testDB.adminCommand({shardCollection: 'test.user', key: {x: 1}}));
-
-let shardKeyVal = 0;
-const kDocsInBatch = 8 * 1000;
-const kMinCollSize = 128 * 1024 * 1024;
-let approxInsertedSize = 0;
-while (approxInsertedSize < kMinCollSize) {
- var bulk = testDB.user.initializeUnorderedBulkOp();
- for (let docs = 0; docs < kDocsInBatch; docs++) {
- shardKeyVal++;
- bulk.insert({_id: shardKeyVal, x: shardKeyVal, padding: kPadding});
+const runParallelMoveChunk = (numThreads) => {
+ // For startParallelOps to write its state
+ let staticMongod = MongoRunner.runMongod({});
+
+ let st = new ShardingTest({shards: 2});
+ st.stopBalancer();
+
+ const kThreadCount = numThreads;
+ const kPadding = new Array(1024).join("x");
+
+ let testDB = st.s.getDB('test');
+ assert.commandWorked(testDB.adminCommand({enableSharding: 'test'}));
+ st.ensurePrimaryShard('test', st.shard0.shardName);
+ assert.commandWorked(testDB.adminCommand({shardCollection: 'test.user', key: {x: 1}}));
+
+ let shardKeyVal = 0;
+ const kDocsInBatch = 8 * 1000;
+ const kMinCollSize = 128 * 1024 * 1024;
+ let approxInsertedSize = 0;
+ while (approxInsertedSize < kMinCollSize) {
+ var bulk = testDB.user.initializeUnorderedBulkOp();
+ for (let docs = 0; docs < kDocsInBatch; docs++) {
+ shardKeyVal++;
+ bulk.insert({_id: shardKeyVal, x: shardKeyVal, padding: kPadding});
+ }
+ assert.commandWorked(bulk.execute());
+
+ approxInsertedSize = approxInsertedSize + (kDocsInBatch * 1024);
}
- assert.commandWorked(bulk.execute());
- approxInsertedSize = approxInsertedSize + (kDocsInBatch * 1024);
-}
-
-const kInitialLoadFinalKey = shardKeyVal;
-
-print(`Running tests with migrationConcurrency == ${kThreadCount}`);
-st._rs.forEach((replSet) => {
- assert.commandWorked(replSet.test.getPrimary().adminCommand(
- {setParameter: 1, migrationConcurrency: kThreadCount}));
-});
-
-const configCollEntry =
- st.s.getDB('config').getCollection('collections').findOne({_id: 'test.user'});
-let chunks = st.s.getDB('config').chunks.find({uuid: configCollEntry.uuid}).toArray();
-assert.eq(1, chunks.length, tojson(chunks));
-
-let joinMoveChunk =
- moveChunkParallel(staticMongod, st.s0.host, {x: 0}, null, 'test.user', st.shard1.shardName);
-
-// Migration cloning scans by shard key order. Perform some writes against the collection on both
-// the lower and upper ends of the shard key values while migration is happening to exercise
-// xferMods logic.
-const kDeleteIndexOffset = kInitialLoadFinalKey - 3000;
-const kUpdateIndexOffset = kInitialLoadFinalKey - 5000;
-for (let x = 0; x < 1000; x++) {
- assert.commandWorked(testDB.user.remove({x: x}));
- assert.commandWorked(testDB.user.update({x: 4000 + x}, {$set: {updated: true}}));
+ const kInitialLoadFinalKey = shardKeyVal;
+
+ print(`Running tests with migrationConcurrency == ${kThreadCount}`);
+ st._rs.forEach((replSet) => {
+ assert.commandWorked(replSet.test.getPrimary().adminCommand(
+ {setParameter: 1, migrationConcurrency: kThreadCount}));
+ });
+
+ const configCollEntry =
+ st.s.getDB('config').getCollection('collections').findOne({_id: 'test.user'});
+ let chunks = st.s.getDB('config').chunks.find({uuid: configCollEntry.uuid}).toArray();
+ assert.eq(1, chunks.length, tojson(chunks));
+
+ let joinMoveChunk =
+ moveChunkParallel(staticMongod, st.s0.host, {x: 0}, null, 'test.user', st.shard1.shardName);
+
+ // Migration cloning scans by shard key order. Perform some writes against the collection on
+ // both the lower and upper ends of the shard key values while migration is happening to
+ // exercise xferMods logic.
+ const kDeleteIndexOffset = kInitialLoadFinalKey - 3000;
+ const kUpdateIndexOffset = kInitialLoadFinalKey - 5000;
+ for (let x = 0; x < 1000; x++) {
+ assert.commandWorked(testDB.user.remove({x: x}));
+ assert.commandWorked(testDB.user.update({x: 4000 + x}, {$set: {updated: true}}));
+
+ assert.commandWorked(testDB.user.remove({x: kDeleteIndexOffset + x}));
+ assert.commandWorked(
+ testDB.user.update({x: kUpdateIndexOffset + x}, {$set: {updated: true}}));
+
+ let newShardKey = kInitialLoadFinalKey + x + 1;
+ assert.commandWorked(testDB.user.insert({_id: newShardKey, x: newShardKey}));
+ }
- assert.commandWorked(testDB.user.remove({x: kDeleteIndexOffset + x}));
- assert.commandWorked(testDB.user.update({x: kUpdateIndexOffset + x}, {$set: {updated: true}}));
+ joinMoveChunk();
- let newShardKey = kInitialLoadFinalKey + x + 1;
- assert.commandWorked(testDB.user.insert({_id: newShardKey, x: newShardKey}));
-}
+ let shardKeyIdx = 1000; // Index starts at 1k since we deleted the first 1k docs.
+ let cursor = testDB.user.find().sort({x: 1});
-joinMoveChunk();
+ while (cursor.hasNext()) {
+ let next = cursor.next();
+ assert.eq(next.x, shardKeyIdx);
-let shardKeyIdx = 1000; // Index starts at 1k since we deleted the first 1k docs.
-let cursor = testDB.user.find().sort({x: 1});
+ if ((shardKeyIdx >= 4000 && shardKeyIdx < 5000) ||
+ (shardKeyIdx >= kUpdateIndexOffset && shardKeyIdx < (kUpdateIndexOffset + 1000))) {
+ assert.eq(true, next.updated, tojson(next));
+ }
-while (cursor.hasNext()) {
- let next = cursor.next();
- assert.eq(next.x, shardKeyIdx);
+ shardKeyIdx++;
- if ((shardKeyIdx >= 4000 && shardKeyIdx < 5000) ||
- (shardKeyIdx >= kUpdateIndexOffset && shardKeyIdx < (kUpdateIndexOffset + 1000))) {
- assert.eq(true, next.updated, tojson(next));
+ if (shardKeyIdx == kDeleteIndexOffset) {
+ shardKeyIdx += 1000;
+ }
}
- shardKeyIdx++;
+ shardKeyIdx--;
+ assert.eq(shardKeyIdx, kInitialLoadFinalKey + 1000);
- if (shardKeyIdx == kDeleteIndexOffset) {
- shardKeyIdx += 1000;
- }
-}
+ st.stop();
+ MongoRunner.stopMongod(staticMongod);
+};
-shardKeyIdx--;
-assert.eq(shardKeyIdx, kInitialLoadFinalKey + 1000);
+// Run the test 5 times with random concurrency levels.
+for (let i = 1; i <= 5; i++) {
+ runParallelMoveChunk(Math.floor(Math.random() * 31) + 1);
+}
+}
-st.stop();
-MongoRunner.stopMongod(staticMongod);
-})();
+ )();
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 6906cd0c57d..b581f666302 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -68,6 +68,8 @@ env.Library(
'config_server_op_observer.cpp',
'global_index_metrics.cpp',
'metadata_manager.cpp',
+ 'migration_batch_fetcher.cpp',
+ 'migration_batch_inserter.cpp',
'migration_chunk_cloner_source_legacy.cpp',
'migration_chunk_cloner_source.cpp',
'migration_coordinator_document.idl',
@@ -568,6 +570,7 @@ env.CppUnitTest(
'global_index_metrics_test.cpp',
'implicit_collection_creation_test.cpp',
'metadata_manager_test.cpp',
+ 'migration_batch_fetcher_test.cpp',
'migration_chunk_cloner_source_legacy_test.cpp',
'migration_destination_manager_test.cpp',
'migration_session_id_test.cpp',
diff --git a/src/mongo/db/s/migration_batch_fetcher.cpp b/src/mongo/db/s/migration_batch_fetcher.cpp
new file mode 100644
index 00000000000..4611cd91ec7
--- /dev/null
+++ b/src/mongo/db/s/migration_batch_fetcher.cpp
@@ -0,0 +1,230 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
+
+#include "mongo/db/s/migration_batch_fetcher.h"
+
+#include "mongo/logv2/log.h"
+#include "mongo/util/timer.h"
+
+namespace mongo {
+
+template <typename Inserter>
+MigrationBatchFetcher<Inserter>::MigrationBatchFetcher(
+ OperationContext* outerOpCtx,
+ OperationContext* innerOpCtx,
+ NamespaceString nss,
+ MigrationSessionId sessionId,
+ const WriteConcernOptions& writeConcern,
+ const ShardId& fromShardId,
+ const ChunkRange& range,
+ const UUID& migrationId,
+ const UUID& collectionId,
+ std::shared_ptr<MigrationCloningProgressSharedState> migrationProgress,
+ bool parallelFetchingSupported)
+ : _nss{std::move(nss)},
+ _migrationConcurrency{
+ mongo::feature_flags::gConcurrencyInChunkMigration.isEnabledAndIgnoreFCV()
+ ? migrationConcurrency.load()
+ : 1},
+ _sessionId{std::move(sessionId)},
+ _inserterWorkers{[&]() {
+ ThreadPool::Options options;
+ options.poolName = "ChunkMigrationInserters";
+ options.minThreads = _migrationConcurrency;
+ options.maxThreads = _migrationConcurrency;
+ options.onCreateThread = Inserter::onCreateThread;
+ return std::make_unique<ThreadPool>(options);
+ }()},
+ _migrateCloneRequest{_createMigrateCloneRequest()},
+ _outerOpCtx{outerOpCtx},
+ _innerOpCtx{innerOpCtx},
+ _fromShard{uassertStatusOK(
+ Grid::get(_outerOpCtx)->shardRegistry()->getShard(_outerOpCtx, fromShardId))},
+ _migrationProgress{migrationProgress},
+ _range{range},
+ _collectionUuid(collectionId),
+ _migrationId{migrationId},
+ _writeConcern{writeConcern},
+ _isParallelFetchingSupported{parallelFetchingSupported} {
+ _inserterWorkers->startup();
+}
+
+template <typename Inserter>
+BSONObj MigrationBatchFetcher<Inserter>::_fetchBatch(OperationContext* opCtx) {
+ auto commandResponse = uassertStatusOKWithContext(
+ _fromShard->runCommand(opCtx,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ "admin",
+ _migrateCloneRequest,
+ Shard::RetryPolicy::kNoRetry),
+ "_migrateClone failed: ");
+
+ uassertStatusOKWithContext(Shard::CommandResponse::getEffectiveStatus(commandResponse),
+ "_migrateClone failed: ");
+
+ return commandResponse.response;
+}
+
+template <typename Inserter>
+void MigrationBatchFetcher<Inserter>::fetchAndScheduleInsertion() {
+ auto numFetchers = _isParallelFetchingSupported ? _migrationConcurrency : 1;
+ auto fetchersThreadPool = [&]() {
+ ThreadPool::Options options;
+ options.poolName = "ChunkMigrationFetchers";
+ options.minThreads = numFetchers;
+ options.maxThreads = numFetchers;
+ options.onCreateThread = onCreateThread;
+ return std::make_unique<ThreadPool>(options);
+ }();
+ fetchersThreadPool->startup();
+ for (int i = 0; i < numFetchers; ++i) {
+ fetchersThreadPool->schedule([this](Status status) { this->_runFetcher(); });
+ }
+
+ fetchersThreadPool->shutdown();
+ fetchersThreadPool->join();
+}
+
+
+template <typename Inserter>
+void MigrationBatchFetcher<Inserter>::_runFetcher() try {
+ auto executor =
+ Grid::get(_innerOpCtx->getServiceContext())->getExecutorPool()->getFixedExecutor();
+
+ auto applicationOpCtx = CancelableOperationContext(
+ cc().makeOperationContext(), _innerOpCtx->getCancellationToken(), executor);
+
+ auto opCtx = applicationOpCtx.get();
+ auto assertNotAborted = [&]() {
+ {
+ stdx::lock_guard<Client> lk(*_outerOpCtx->getClient());
+ _outerOpCtx->checkForInterrupt();
+ }
+ opCtx->checkForInterrupt();
+ };
+
+ LOGV2_DEBUG(6718405, 0, "Chunk migration data fetch start", "migrationId"_attr = _migrationId);
+ while (true) {
+ Timer totalTimer;
+ BSONObj nextBatch = _fetchBatch(opCtx);
+ assertNotAborted();
+ if (_isEmptyBatch(nextBatch)) {
+ LOGV2_DEBUG(6718404,
+ 0,
+ "Chunk migration initial clone complete",
+ "migrationId"_attr = _migrationId,
+ "duration"_attr = totalTimer.elapsed());
+ break;
+ }
+
+ const auto batchSize = nextBatch.objsize();
+ const auto fetchTime = totalTimer.elapsed();
+ LOGV2_DEBUG(6718416,
+ 0,
+ "Chunk migration initial clone fetch end",
+ "migrationId"_attr = _migrationId,
+ "batchSize"_attr = batchSize,
+ "fetch"_attr = duration_cast<Milliseconds>(fetchTime));
+
+
+ Inserter inserter{_outerOpCtx,
+ _innerOpCtx,
+ nextBatch.getOwned(),
+ _nss,
+ _range,
+ _writeConcern,
+ _collectionUuid,
+ _migrationProgress,
+ _migrationId,
+ _migrationConcurrency};
+
+ _inserterWorkers->schedule([batchSize,
+ fetchTime,
+ totalTimer = std::move(totalTimer),
+ insertTimer = Timer(),
+ migrationId = _migrationId,
+ inserter = std::move(inserter)](Status status) {
+ inserter.run(status);
+
+ const auto checkDivByZero = [](auto divisor, auto expression) {
+ return divisor == 0 ? -1 : expression();
+ };
+ const auto calcThroughput = [&](auto bytes, auto duration) {
+ return checkDivByZero(durationCount<Microseconds>(duration), [&]() {
+ return static_cast<double>(bytes) / durationCount<Microseconds>(duration);
+ });
+ };
+
+ const auto insertTime = insertTimer.elapsed();
+ const auto totalTime = totalTimer.elapsed();
+ const auto batchThroughputMBps = calcThroughput(batchSize, totalTime);
+ const auto insertThroughputMBps = calcThroughput(batchSize, insertTime);
+ const auto fetchThroughputMBps = calcThroughput(batchSize, fetchTime);
+
+ LOGV2_DEBUG(6718417,
+ 1,
+ "Chunk migration initial clone apply batch",
+ "migrationId"_attr = migrationId,
+ "batchSize"_attr = batchSize,
+ "total"_attr = duration_cast<Milliseconds>(totalTime),
+ "totalThroughputMBps"_attr = batchThroughputMBps,
+ "fetch"_attr = duration_cast<Milliseconds>(fetchTime),
+ "fetchThroughputMBps"_attr = fetchThroughputMBps,
+ "insert"_attr = duration_cast<Milliseconds>(insertTime),
+ "insertThroughputMBps"_attr = insertThroughputMBps);
+ });
+ }
+} catch (const DBException& e) {
+ stdx::lock_guard<Client> lk(*_innerOpCtx->getClient());
+ _innerOpCtx->getServiceContext()->killOperation(lk, _innerOpCtx, ErrorCodes::Error(6718400));
+ LOGV2_ERROR(6718413,
+ "Chunk migration failure fetching data",
+ "migrationId"_attr = _migrationId,
+ "failure"_attr = e.toStatus());
+}
+
+template <typename Inserter>
+MigrationBatchFetcher<Inserter>::~MigrationBatchFetcher() {
+ LOGV2(6718401,
+ "Shutting down and joining inserter threads for migration {migrationId}",
+ "migrationId"_attr = _migrationId);
+ _inserterWorkers->shutdown();
+ _inserterWorkers->join();
+ LOGV2(6718415,
+ "Inserter threads for migration {migrationId} joined",
+ "migrationId"_attr = _migrationId);
+}
+
+template class MigrationBatchFetcher<MigrationBatchInserter>;
+
+template class MigrationBatchFetcher<MigrationBatchMockInserter>;
+
+} // namespace mongo
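
Editor's note: the calcThroughput lambda above divides a byte count by a duration in microseconds. That quotient is numerically equal to megabytes per second, since one MB is 10^6 bytes and one second is 10^6 microseconds, which is why the attributes are logged with an MBps suffix. A minimal standalone sketch of the same arithmetic, with illustrative names not taken from the patch:

```cpp
#include <chrono>
#include <cstdint>
#include <iostream>

// bytes / microseconds == MB/s (10^6 bytes per 10^6 microseconds).
// The -1 sentinel mirrors the checkDivByZero guard in the inserter callback.
double calcThroughputMBps(std::int64_t bytes, std::chrono::microseconds duration) {
    const auto us = duration.count();
    return us == 0 ? -1 : static_cast<double>(bytes) / us;
}

int main() {
    // A 16MB batch fetched in 250ms works out to 64 MB/s.
    std::cout << calcThroughputMBps(16'000'000, std::chrono::milliseconds(250))
              << " MB/s\n";
}
```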
diff --git a/src/mongo/db/s/migration_batch_fetcher.h b/src/mongo/db/s/migration_batch_fetcher.h
new file mode 100644
index 00000000000..d402c3a92e6
--- /dev/null
+++ b/src/mongo/db/s/migration_batch_fetcher.h
@@ -0,0 +1,168 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/base/error_extra_info.h"
+#include "mongo/db/client.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/optime.h"
+#include "mongo/db/s/migration_batch_inserter.h"
+#include "mongo/db/s/migration_batch_mock_inserter.h"
+#include "mongo/db/s/migration_session_id.h"
+#include "mongo/s/client/shard.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/shard_id.h"
+#include "mongo/util/cancellation.h"
+#include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/producer_consumer_queue.h"
+
+#pragma once
+
+namespace mongo {
+
+
+// This class is only instantiated on the destination of a chunk migration and
+// has a single purpose: to manage two thread pools, one whose threads perform
+// insertions, and one whose threads run _migrateClone requests (to fetch
+// batches of documents to insert).
+//
+// The constructor creates and starts the inserter thread pool. The destructor shuts down
+// and joins the inserter thread pool.
+//
+// The main work of the class is in method fetchAndScheduleInsertion. That method
+// starts a thread pool for fetchers. Each thread in that thread pool sits in a loop
+// sending out _migrateClone requests, blocking on the response, and scheduling an
+// inserter on the inserter thread pool. That method shuts down and joins the
+// fetcher thread pool once all batches have been fetched.
+//
+// The class is templated on Inserter only to allow a mock inserter to exist.
+// There is only one implementation of inserter currently, which is MigrationBatchInserter.
+//
+// A few things to note:
+// - After fetchAndScheduleInsertion returns, insertions are still being executed (although fetches
+// are not).
+// - Sending out _migrateClone requests in parallel implies the need for synchronization on the
+// source. See the comments in migration_chunk_cloner_source.h for details around
+// that.
+// - The requirement on source side synchronization implies that care must be taken on upgrade.
+// In particular, if the source is running an earlier binary that doesn't have code for
+// source side synchronization, it is unsafe to send _migrateClone requests in parallel.
+// To handle that case, when the source is prepared to service _migrateClone requests in
+// parallel, the field "parallelMigrateCloneSupported" is included in the "_recvChunkStart"
+// command. The inclusion of that field indicates to the destination that it is safe
+// to send _migrateClone requests in parallel. Its exclusion indicates that it is unsafe.
+template <typename Inserter>
+class MigrationBatchFetcher {
+public:
+ MigrationBatchFetcher(OperationContext* outerOpCtx,
+ OperationContext* innerOpCtx,
+ NamespaceString nss,
+ MigrationSessionId sessionId,
+ const WriteConcernOptions& writeConcern,
+ const ShardId& fromShardId,
+ const ChunkRange& range,
+ const UUID& migrationId,
+ const UUID& collectionId,
+ std::shared_ptr<MigrationCloningProgressSharedState> migrationInfo,
+ bool parallelFetchingSupported);
+
+ ~MigrationBatchFetcher();
+
+ // Repeatedly fetches batches (using the _migrateClone request) and schedules
+ // inserter jobs on the thread pool.
+ void fetchAndScheduleInsertion();
+
+ // Get inserter thread pool stats.
+ ThreadPool::Stats getThreadPoolStats() const {
+ return _inserterWorkers->getStats();
+ }
+
+private:
+ NamespaceString _nss;
+
+ // Size of thread pools.
+ int _migrationConcurrency;
+
+ MigrationSessionId _sessionId;
+
+ // Inserter thread pool.
+ std::unique_ptr<ThreadPool> _inserterWorkers;
+
+ BSONObj _migrateCloneRequest;
+
+ OperationContext* _outerOpCtx;
+
+ OperationContext* _innerOpCtx;
+
+ std::shared_ptr<Shard> _fromShard;
+
+ // Shared state by which the progress of the migration is communicated
+ // to the MigrationDestinationManager.
+ std::shared_ptr<MigrationCloningProgressSharedState> _migrationProgress;
+
+ ChunkRange _range;
+
+ UUID _collectionUuid;
+
+ UUID _migrationId;
+
+ WriteConcernOptions _writeConcern;
+
+ // Indicates if source is prepared to service _migrateClone requests in parallel.
+ bool _isParallelFetchingSupported;
+
+ // Given the session id and namespace, creates the migrateCloneRequest.
+ // Should only be called once for the lifetime of the object.
+ BSONObj _createMigrateCloneRequest() const {
+ BSONObjBuilder builder;
+ builder.append("_migrateClone", _nss.ns());
+ _sessionId.append(&builder);
+ return builder.obj();
+ }
+
+ void _runFetcher();
+
+ // Fetches the next batch using a _migrateClone request and returns it. May return an empty
+ // batch.
+ BSONObj _fetchBatch(OperationContext* opCtx);
+
+ static bool _isEmptyBatch(const BSONObj& batch) {
+ return batch.getField("objects").Obj().isEmpty();
+ }
+
+ static void onCreateThread(const std::string& threadName) {
+ Client::initThread(threadName, getGlobalServiceContext(), nullptr);
+ {
+ stdx::lock_guard<Client> lk(cc());
+ cc().setSystemOperationKillableByStepdown(lk);
+ }
+ }
+
+}; // class MigrationBatchFetcher
+
+} // namespace mongo
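
Editor's note: the header comment above describes the whole pipeline: fetcher threads loop on _migrateClone, each handing its batch to the inserter pool, and the fetcher pool is shut down and joined while insertions may still be draining. Below is a minimal standalone model of that shape, assuming plain std::thread plus a hand-rolled job queue in place of mongo's ThreadPool and an atomic counter in place of _migrateClone responses; it is a sketch of the pattern, not the server code:

```cpp
#include <atomic>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Tiny job pool standing in for mongo's ThreadPool: schedule() enqueues work,
// shutdownAndJoin() lets workers drain the queue and then exit.
class SimplePool {
public:
    explicit SimplePool(int n) {
        for (int i = 0; i < n; ++i)
            _threads.emplace_back([this] { _run(); });
    }
    void schedule(std::function<void()> job) {
        { std::lock_guard lk(_m); _jobs.push(std::move(job)); }
        _cv.notify_one();
    }
    void shutdownAndJoin() {
        { std::lock_guard lk(_m); _done = true; }
        _cv.notify_all();
        for (auto& t : _threads) t.join();
    }
private:
    void _run() {
        while (true) {
            std::function<void()> job;
            {
                std::unique_lock lk(_m);
                _cv.wait(lk, [&] { return _done || !_jobs.empty(); });
                if (_jobs.empty()) return;  // shut down and fully drained
                job = std::move(_jobs.front());
                _jobs.pop();
            }
            job();
        }
    }
    std::mutex _m;
    std::condition_variable _cv;
    std::queue<std::function<void()>> _jobs;
    bool _done = false;
    std::vector<std::thread> _threads;
};

int main() {
    std::atomic<int> batches{8};   // stands in for the donor's remaining batches
    std::atomic<int> inserted{0};
    SimplePool inserters(4);       // analogous to _inserterWorkers

    std::vector<std::thread> fetchers;  // analogous to fetchersThreadPool
    for (int i = 0; i < 4; ++i) {
        fetchers.emplace_back([&] {
            while (batches.fetch_sub(1) > 0)             // "fetch" a batch
                inserters.schedule([&] { ++inserted; }); // schedule its insertion
        });
    }
    for (auto& t : fetchers) t.join();  // all batches fetched...
    inserters.shutdownAndJoin();        // ...while insertions may still be draining
    std::cout << "inserted " << inserted << " batches\n";
}
```

Joining the fetchers first while the inserter pool keeps draining mirrors the note above that insertions are still executing after fetchAndScheduleInsertion returns.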
diff --git a/src/mongo/db/s/migration_batch_fetcher_test.cpp b/src/mongo/db/s/migration_batch_fetcher_test.cpp
new file mode 100644
index 00000000000..f86368b321b
--- /dev/null
+++ b/src/mongo/db/s/migration_batch_fetcher_test.cpp
@@ -0,0 +1,269 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
+
+#include "mongo/base/status_with.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/s/migration_batch_fetcher.h"
+#include "mongo/db/s/migration_session_id.h"
+#include "mongo/db/s/shard_server_test_fixture.h"
+#include "mongo/db/write_concern_options.h"
+#include "mongo/dbtests/mock/mock_replica_set.h"
+#include "mongo/executor/cancelable_executor.h"
+#include "mongo/executor/network_interface_mock.h"
+#include "mongo/executor/thread_pool_mock.h"
+#include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
+#include "mongo/idl/server_parameter_test_util.h"
+#include "mongo/logv2/log.h"
+#include "mongo/platform/basic.h"
+#include "mongo/s/catalog/sharding_catalog_client_mock.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/stdx/future.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/duration.h"
+#include "mongo/util/net/hostandport.h"
+#include "mongo/util/uuid.h"
+
+namespace mongo {
+namespace {
+
+using unittest::assertGet;
+
+const ConnectionString kDonorConnStr =
+ ConnectionString::forReplicaSet("Donor",
+ {HostAndPort("DonorHost1:1234"),
+ HostAndPort{"DonorHost2:1234"},
+ HostAndPort{"DonorHost3:1234"}});
+const ConnectionString kRecipientConnStr =
+ ConnectionString::forReplicaSet("Recipient",
+ {HostAndPort("RecipientHost1:1234"),
+ HostAndPort("RecipientHost2:1234"),
+ HostAndPort("RecipientHost3:1234")});
+
+class MigrationBatchFetcherTestFixture : public ShardServerTestFixture {
+
+protected:
+ /**
+ * Sets up the task executor as well as a TopologyListenerMock for each unit test.
+ */
+ void setUp() override {
+ ShardServerTestFixture::setUp();
+
+ {
+ auto donorShard = assertGet(
+ shardRegistry()->getShard(operationContext(), kDonorConnStr.getSetName()));
+ RemoteCommandTargeterMock::get(donorShard->getTargeter())
+ ->setConnectionStringReturnValue(kDonorConnStr);
+ RemoteCommandTargeterMock::get(donorShard->getTargeter())
+ ->setFindHostReturnValue(kDonorConnStr.getServers()[0]);
+ }
+ }
+
+ void tearDown() override {
+ ShardServerTestFixture::tearDown();
+ }
+
+ /**
+ * Instantiates a BSON object in which both "_id" and "X" are set to value.
+ */
+ static BSONObj createDocument(int value) {
+ return BSON("_id" << value << "X" << value);
+ }
+ static BSONObj createEmpty() {
+ return BSONObj{};
+ }
+ /**
+ * Creates a list of documents to clone.
+ */
+ static std::vector<BSONObj> createDocumentsToClone() {
+ return {createDocument(1), createDocument(2), createDocument(3)};
+ }
+
+ /**
+ * Creates a list of documents to clone and converts it to a BSONArray.
+ */
+ static BSONArray createDocumentsToCloneArray() {
+ BSONArrayBuilder arrayBuilder;
+ for (auto& doc : createDocumentsToClone()) {
+ arrayBuilder.append(doc);
+ }
+ return arrayBuilder.arr();
+ }
+ static BSONArray createEmptyCloneArray() {
+ return BSONArrayBuilder().arr();
+ }
+
+ static BSONObj getTerminalBsonObj() {
+ return BSON("Status"
+ << "OK"
+ << "ok" << 1 << "objects" << createEmptyCloneArray());
+ }
+
+ static BSONObj getBatchBsonObj() {
+ return BSON("Status"
+ << "OK"
+ << "ok" << 1 << "objects" << createDocumentsToCloneArray());
+ }
+
+private:
+ OperationContext* _opCtx;
+ ServiceContext* _svcCtx;
+ executor::NetworkInterfaceMock* _net;
+
+ std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient() override {
+ class StaticCatalogClient final : public ShardingCatalogClientMock {
+ public:
+ StaticCatalogClient() = default;
+
+ StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards(
+ OperationContext* opCtx, repl::ReadConcernLevel readConcern) override {
+
+ ShardType donorShard;
+ donorShard.setName(kDonorConnStr.getSetName());
+ donorShard.setHost(kDonorConnStr.toString());
+
+ ShardType recipientShard;
+ recipientShard.setName(kRecipientConnStr.getSetName());
+ recipientShard.setHost(kRecipientConnStr.toString());
+
+ return repl::OpTimeWith<std::vector<ShardType>>({donorShard, recipientShard});
+ }
+ };
+
+ return std::make_unique<StaticCatalogClient>();
+ }
+};
+
+auto getOnMigrateCloneCommandCb(BSONObj ret) {
+ return [ret](const executor::RemoteCommandRequest& request) -> StatusWith<BSONObj> {
+ ASSERT_EQ(request.cmdObj.getField("_migrateClone").String(), "test.foo");
+ return ret;
+ };
+}
+
+TEST_F(MigrationBatchFetcherTestFixture, BasicEmptyFetchingTest) {
+ NamespaceString nss{"test", "foo"};
+ ShardId fromShard{"Donor"};
+ auto msid = MigrationSessionId::generate(fromShard, "Recipient");
+ auto outerOpCtx = operationContext();
+ auto newClient = outerOpCtx->getServiceContext()->makeClient("MigrationCoordinator");
+
+ int concurrency = 30;
+ RAIIServerParameterControllerForTest featureFlagController(
+ "featureFlagConcurrencyInChunkMigration", true);
+ RAIIServerParameterControllerForTest setMigrationConcurrencyParam{"migrationConcurrency",
+ concurrency};
+
+ AlternativeClientRegion acr(newClient);
+ auto executor =
+ Grid::get(outerOpCtx->getServiceContext())->getExecutorPool()->getFixedExecutor();
+ auto newOpCtxPtr = CancelableOperationContext(
+ cc().makeOperationContext(), outerOpCtx->getCancellationToken(), executor);
+ auto opCtx = newOpCtxPtr.get();
+
+ auto fetcher = std::make_unique<MigrationBatchFetcher<MigrationBatchMockInserter>>(
+ outerOpCtx,
+ opCtx,
+ nss,
+ msid,
+ WriteConcernOptions::parse(WriteConcernOptions::Majority).getValue(),
+ fromShard,
+ ChunkRange{BSON("x" << 1), BSON("x" << 2)},
+ UUID::gen(),
+ UUID::gen(),
+ nullptr,
+ true);
+
+ // Start asynchronous task for responding to _migrateClone requests.
+ // Must name the return value of std::async. The destructor of std::future joins the
+ // asynchronous task. (If it were left unnamed, the destructor would run inline, and the test
+ // would hang forever.)
+ auto fut = stdx::async(stdx::launch::async, [&]() {
+ // One terminal response for each thread
+ for (int i = 0; i < concurrency; ++i) {
+ onCommand(getOnMigrateCloneCommandCb(getTerminalBsonObj()));
+ }
+ });
+ fetcher->fetchAndScheduleInsertion();
+}
+
+TEST_F(MigrationBatchFetcherTestFixture, BasicFetching) {
+ NamespaceString nss{"test", "foo"};
+ ShardId fromShard{"Donor"};
+ auto msid = MigrationSessionId::generate(fromShard, "Recipient");
+
+ auto outerOpCtx = operationContext();
+ auto newClient = outerOpCtx->getServiceContext()->makeClient("MigrationCoordinator");
+ AlternativeClientRegion acr(newClient);
+
+ auto executor =
+ Grid::get(outerOpCtx->getServiceContext())->getExecutorPool()->getFixedExecutor();
+ auto newOpCtxPtr = CancelableOperationContext(
+ cc().makeOperationContext(), outerOpCtx->getCancellationToken(), executor);
+ auto opCtx = newOpCtxPtr.get();
+
+
+ int concurrency = 30;
+ RAIIServerParameterControllerForTest featureFlagController(
+ "featureFlagConcurrencyInChunkMigration", true);
+ RAIIServerParameterControllerForTest setMigrationConcurrencyParam{"migrationConcurrency",
+ concurrency};
+
+ auto fetcher = std::make_unique<MigrationBatchFetcher<MigrationBatchMockInserter>>(
+ outerOpCtx,
+ opCtx,
+ nss,
+ msid,
+ WriteConcernOptions::parse(WriteConcernOptions::Majority).getValue(),
+ fromShard,
+ ChunkRange{BSON("x" << 1), BSON("x" << 2)},
+ UUID::gen(),
+ UUID::gen(),
+ nullptr,
+ true);
+
+ auto fut = stdx::async(stdx::launch::async, [&]() {
+ for (int i = 0; i < 8; ++i) {
+ onCommand(getOnMigrateCloneCommandCb(getBatchBsonObj()));
+ }
+ // One terminal response for each thread
+ for (int i = 0; i < concurrency; ++i) {
+ onCommand(getOnMigrateCloneCommandCb(getTerminalBsonObj()));
+ }
+ });
+ fetcher->fetchAndScheduleInsertion();
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/s/migration_batch_inserter.cpp b/src/mongo/db/s/migration_batch_inserter.cpp
new file mode 100644
index 00000000000..37f55947745
--- /dev/null
+++ b/src/mongo/db/s/migration_batch_inserter.cpp
@@ -0,0 +1,200 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kShardingMigration
+
+#include "mongo/db/s/migration_batch_inserter.h"
+
+#include "mongo/db/s/migration_util.h"
+#include "mongo/db/transaction_participant.h"
+#include "mongo/logv2/log.h"
+
+namespace mongo {
+
+namespace {
+
+void checkOutSessionAndVerifyTxnState(OperationContext* opCtx) {
+ MongoDOperationContextSession::checkOut(opCtx);
+ TransactionParticipant::get(opCtx).beginOrContinue(opCtx,
+ {*opCtx->getTxnNumber()},
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */);
+}
+
+template <typename Callable>
+constexpr bool returnsVoid() {
+ return std::is_void_v<std::invoke_result_t<Callable>>;
+}
+
+// Yields the checked-out session before running the given function. If the function runs without
+// throwing, reacquires the session and verifies it is still valid to proceed with the migration.
+template <typename Callable, std::enable_if_t<!returnsVoid<Callable>(), int> = 0>
+auto runWithoutSession(OperationContext* opCtx, Callable callable) {
+ MongoDOperationContextSession::checkIn(opCtx, OperationContextSession::CheckInReason::kYield);
+
+ auto retVal = callable();
+
+ // The below code can throw, so it cannot run in a scope guard.
+ opCtx->checkForInterrupt();
+ checkOutSessionAndVerifyTxnState(opCtx);
+
+ return retVal;
+}
+
+// Same as runWithoutSession above but takes a void function.
+template <typename Callable, std::enable_if_t<returnsVoid<Callable>(), int> = 0>
+void runWithoutSession(OperationContext* opCtx, Callable callable) {
+ MongoDOperationContextSession::checkIn(opCtx, OperationContextSession::CheckInReason::kYield);
+
+ callable();
+
+ // The below code can throw, so it cannot run in a scope guard.
+ opCtx->checkForInterrupt();
+ checkOutSessionAndVerifyTxnState(opCtx);
+}
+} // namespace
+
+
+void MigrationBatchInserter::onCreateThread(const std::string& threadName) {
+ Client::initThread(threadName, getGlobalServiceContext(), nullptr);
+ {
+ stdx::lock_guard<Client> lk(cc());
+ cc().setSystemOperationKillableByStepdown(lk);
+ }
+}
+
+void MigrationBatchInserter::run(Status status) const try {
+ // Run is passed in a non-ok status if this function runs inline.
+ // That happens if we schedule this task on a ThreadPool that is
+ // already shut down. If we were to schedule a task on a shut-down ThreadPool,
+ // then there is a logic error in our code. Therefore, we assert that here.
+
+ invariant(status.isOK());
+ auto arr = _batch["objects"].Obj();
+ if (arr.isEmpty())
+ return;
+
+ auto executor =
+ Grid::get(_innerOpCtx->getServiceContext())->getExecutorPool()->getFixedExecutor();
+
+ auto applicationOpCtx = CancelableOperationContext(
+ cc().makeOperationContext(), _innerOpCtx->getCancellationToken(), executor);
+
+ auto opCtx = applicationOpCtx.get();
+
+ auto assertNotAborted = [&]() {
+ {
+ stdx::lock_guard<Client> lk(*_outerOpCtx->getClient());
+ _outerOpCtx->checkForInterrupt();
+ }
+ opCtx->checkForInterrupt();
+ };
+
+ auto it = arr.begin();
+ while (it != arr.end()) {
+ int batchNumCloned = 0;
+ int batchClonedBytes = 0;
+ const int batchMaxCloned = migrateCloneInsertionBatchSize.load();
+
+ assertNotAborted();
+
+ write_ops::InsertCommandRequest insertOp(_nss);
+ insertOp.getWriteCommandRequestBase().setOrdered(true);
+ insertOp.setDocuments([&] {
+ std::vector<BSONObj> toInsert;
+ while (it != arr.end() && (batchMaxCloned <= 0 || batchNumCloned < batchMaxCloned)) {
+ const auto& doc = *it;
+ BSONObj docToClone = doc.Obj();
+ toInsert.push_back(docToClone);
+ batchNumCloned++;
+ batchClonedBytes += docToClone.objsize();
+ ++it;
+ }
+ return toInsert;
+ }());
+
+ {
+ // Disable the schema validation (during document inserts and updates)
+ // and any internal validation for opCtx for performInserts()
+ DisableDocumentValidation documentValidationDisabler(
+ opCtx,
+ DocumentValidationSettings::kDisableSchemaValidation |
+ DocumentValidationSettings::kDisableInternalValidation);
+ const auto reply =
+ write_ops_exec::performInserts(opCtx, insertOp, OperationSource::kFromMigrate);
+ for (unsigned long i = 0; i < reply.results.size(); ++i) {
+ uassertStatusOKWithContext(
+ reply.results[i],
+ str::stream() << "Insert of " << insertOp.getDocuments()[i] << " failed.");
+ }
+ // Revert to the original DocumentValidationSettings for opCtx
+ }
+
+ migrationutil::persistUpdatedNumOrphans(
+ opCtx, _migrationId, _collectionUuid, batchNumCloned);
+ _migrationProgress->updateMaxOptime(
+ repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp());
+
+ ShardingStatistics::get(opCtx).countDocsClonedOnRecipient.addAndFetch(batchNumCloned);
+ LOGV2(6718408,
+ "Incrementing numCloned count by {batchNumCloned} and numClonedBytes by "
+ "{batchClonedBytes}",
+ "batchNumCloned"_attr = batchNumCloned,
+ "batchClonedBytes"_attr = batchClonedBytes);
+ _migrationProgress->incNumCloned(batchNumCloned);
+ _migrationProgress->incNumBytes(batchClonedBytes);
+
+ if (_writeConcern.needToWaitForOtherNodes() && _threadCount == 1) {
+ runWithoutSession(_outerOpCtx, [&] {
+ repl::ReplicationCoordinator::StatusAndDuration replStatus =
+ repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
+ opCtx,
+ repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
+ _writeConcern);
+ if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
+ LOGV2_WARNING(22011,
+ "secondaryThrottle on, but doc insert timed out; continuing",
+ "migrationId"_attr = _migrationId.toBSON());
+ } else {
+ uassertStatusOK(replStatus.status);
+ }
+ });
+ }
+
+ sleepmillis(migrateCloneInsertionBatchDelayMS.load());
+ }
+} catch (const DBException& e) {
+ stdx::lock_guard<Client> lk(*_innerOpCtx->getClient());
+ _innerOpCtx->getServiceContext()->killOperation(lk, _innerOpCtx, ErrorCodes::Error(6718402));
+ LOGV2(6718407,
+ "Batch application failed: {error}",
+ "Batch application failed",
+ "error"_attr = e.toStatus());
+}
+} // namespace mongo
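
Editor's note: runWithoutSession above is split into two overloads selected via std::enable_if_t because a void return value cannot be stored in a local while the post-call interrupt check and session re-checkout run. A minimal standalone sketch of that dispatch pattern, with yield/restore as placeholders for the session check-in and check-out:

```cpp
#include <iostream>
#include <type_traits>

template <typename Callable>
constexpr bool returnsVoid() {
    return std::is_void_v<std::invoke_result_t<Callable>>;
}

void yield()   { std::cout << "yield session\n"; }
void restore() { std::cout << "reacquire session\n"; }  // may throw in the real code

// Overload for callables that return a value: the result is saved in a local
// so restore() can run (and possibly throw) before we return it.
template <typename Callable, std::enable_if_t<!returnsVoid<Callable>(), int> = 0>
auto runYielded(Callable callable) {
    yield();
    auto retVal = callable();
    restore();
    return retVal;
}

// Overload for void callables: there is no result to save.
template <typename Callable, std::enable_if_t<returnsVoid<Callable>(), int> = 0>
void runYielded(Callable callable) {
    yield();
    callable();
    restore();
}

int main() {
    int x = runYielded([] { return 42; });  // picks the value-returning overload
    runYielded([] {});                      // picks the void overload
    std::cout << x << "\n";
}
```

Since C++17 the two overloads could also be collapsed into one function with if constexpr; keeping separate overloads, as the patch does, makes each return path explicit.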
diff --git a/src/mongo/db/s/migration_batch_inserter.h b/src/mongo/db/s/migration_batch_inserter.h
new file mode 100644
index 00000000000..c3223921602
--- /dev/null
+++ b/src/mongo/db/s/migration_batch_inserter.h
@@ -0,0 +1,136 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "boost/optional/optional.hpp"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/cancelable_operation_context.h"
+#include "mongo/db/catalog/document_validation.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/ops/write_ops_exec.h"
+#include "mongo/db/ops/write_ops_gen.h"
+#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/s/migration_session_id.h"
+#include "mongo/db/s/range_deletion_util.h"
+#include "mongo/db/s/sharding_runtime_d_params_gen.h"
+#include "mongo/db/s/sharding_statistics.h"
+#include "mongo/db/session_catalog_mongod.h"
+#include "mongo/db/write_concern_options.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/grid.h"
+#include "mongo/util/uuid.h"
+
+#pragma once
+
+namespace mongo {
+
+// The purpose of this type is to allow inserters to communicate
+// their progress to the outside world.
+class MigrationCloningProgressSharedState {
+ mutable Mutex _m;
+ repl::OpTime _maxOptime;
+ long long _numCloned = 0;
+ long long _numBytes = 0;
+
+public:
+ void updateMaxOptime(const repl::OpTime& _newOptime) {
+ stdx::lock_guard lk(_m);
+ _maxOptime = std::max(_maxOptime, _newOptime);
+ }
+ repl::OpTime getMaxOptime() const {
+ stdx::lock_guard lk(_m);
+ return _maxOptime;
+ }
+ void incNumCloned(int num) {
+ stdx::lock_guard lk(_m);
+ _numCloned += num;
+ }
+ void incNumBytes(int num) {
+ stdx::lock_guard lk(_m);
+ _numBytes += num;
+ }
+ long long getNumCloned() const {
+ stdx::lock_guard lk(_m);
+ return _numCloned;
+ }
+ long long getNumBytes() const {
+ stdx::lock_guard lk(_m);
+ return _numBytes;
+ }
+};
+
+// This type contains a BSONObj _batch corresponding to a _migrateClone response.
+// The purpose of this type is to perform the insertions for this batch.
+// Those insertions happen in its "run" method. The MigrationBatchFetcher
+// schedules these jobs on a thread pool. This class has no knowledge that it runs
+// on a thread pool. Its sole purpose is to perform insertions and communicate its progress
+// (including the new max opTime).
+class MigrationBatchInserter {
+public:
+ // Do inserts.
+ void run(Status status) const;
+
+ MigrationBatchInserter(OperationContext* outerOpCtx,
+ OperationContext* innerOpCtx,
+ BSONObj batch,
+ const NamespaceString& nss,
+ const ChunkRange& range,
+ const WriteConcernOptions& writeConcern,
+ const UUID& collectionUuid,
+ std::shared_ptr<MigrationCloningProgressSharedState> migrationProgress,
+ const UUID& migrationId,
+ int threadCount)
+ : _outerOpCtx{outerOpCtx},
+ _innerOpCtx{innerOpCtx},
+ _batch{batch},
+ _nss{nss},
+ _range{range},
+ _writeConcern{writeConcern},
+ _collectionUuid{collectionUuid},
+ _migrationProgress{migrationProgress},
+ _migrationId{migrationId},
+ _threadCount{threadCount} {}
+
+ static void onCreateThread(const std::string& threadName);
+
+private:
+ OperationContext* _outerOpCtx;
+ OperationContext* _innerOpCtx;
+ BSONObj _batch;
+ NamespaceString _nss;
+ ChunkRange _range;
+ WriteConcernOptions _writeConcern;
+ UUID _collectionUuid;
+ std::shared_ptr<MigrationCloningProgressSharedState> _migrationProgress;
+ UUID _migrationId;
+ int _threadCount;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/s/migration_batch_mock_inserter.h b/src/mongo/db/s/migration_batch_mock_inserter.h
new file mode 100644
index 00000000000..8b4b766480c
--- /dev/null
+++ b/src/mongo/db/s/migration_batch_mock_inserter.h
@@ -0,0 +1,67 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/s/migration_batch_inserter.h"
+#include "mongo/db/write_concern_options.h"
+#include "mongo/s/catalog/type_chunk.h"
+
+#pragma once
+
+namespace mongo {
+
+class MigrationBatchMockInserter {
+public:
+ void run(Status status) const {
+ // Run is passed in a non-ok status if this function runs inline.
+ // That happens if we schedule this task on a ThreadPool that is
+ // already shut down. We should never do that. Therefore,
+ // we assert that here.
+ invariant(status.isOK());
+ }
+ MigrationBatchMockInserter(OperationContext*,
+ OperationContext*,
+ BSONObj,
+ NamespaceString,
+ ChunkRange,
+ WriteConcernOptions,
+ UUID,
+ std::shared_ptr<MigrationCloningProgressSharedState>,
+ UUID,
+ int) {}
+
+ static void onCreateThread(const std::string& threadName) {}
+
+private:
+ BSONObj _batch;
+};
+
+} // namespace mongo
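
Editor's note: this mock works because MigrationBatchFetcher takes its Inserter as a template parameter rather than through a virtual interface, so a test can substitute a no-op type with the same constructor and run() shape at compile time. A minimal standalone sketch of that injection pattern, with illustrative names:

```cpp
#include <iostream>
#include <string>

struct RealInserter {
    explicit RealInserter(std::string batch) : _batch(std::move(batch)) {}
    void run() const { std::cout << "inserting " << _batch << "\n"; }
    std::string _batch;
};

struct MockInserter {
    explicit MockInserter(std::string) {}
    void run() const {}  // test double: same interface, does nothing
};

// Templated on Inserter, exactly so that tests can swap in the mock;
// there is no virtual dispatch involved.
template <typename Inserter>
class BatchFetcher {
public:
    void fetchAll() {
        for (int i = 0; i < 3; ++i)
            Inserter{"batch " + std::to_string(i)}.run();
    }
};

int main() {
    BatchFetcher<RealInserter>{}.fetchAll();  // production configuration
    BatchFetcher<MockInserter>{}.fetchAll();  // what the unit test instantiates
}
```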
diff --git a/src/mongo/db/s/migration_chunk_cloner_source.cpp b/src/mongo/db/s/migration_chunk_cloner_source.cpp
index 6df6ebadd6f..194e929cb70 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source.cpp
@@ -27,6 +27,8 @@
* it in the license file.
*/
+
+#include "mongo/bson/bsonobj.h"
#include "mongo/platform/basic.h"
#include "mongo/db/s/migration_chunk_cloner_source.h"
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 0aed75b5303..a5079a4041d 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/s/migration_chunk_cloner_source_legacy.h"
#include "mongo/base/status.h"
+#include "mongo/bson/bsonobj.h"
#include "mongo/client/read_preference.h"
#include "mongo/db/catalog/index_catalog.h"
#include "mongo/db/catalog_raii.h"
@@ -711,6 +712,61 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromIndexScan(OperationCon
_jumboChunkCloneState->clonerExec->detachFromOperationContext();
}
+boost::optional<Snapshotted<BSONObj>> MigrationChunkClonerSourceLegacy::_getNextDoc(
+ OperationContext* opCtx, const CollectionPtr& collection) {
+ while (true) {
+ stdx::unique_lock lk(_mutex);
+ invariant(_inProgressReads >= 0);
+ RecordId nextRecordId;
+ Snapshotted<BSONObj> doc;
+
+ _moreDocsCV.wait(lk, [&]() {
+ return _cloneRecordIdsIter != _cloneLocs.end() || !_overflowDocs.empty() ||
+ _inProgressReads == 0;
+ });
+
+ // One of the following must now be true (corresponding to the three if conditions):
+ // 1. There is a document in the overflow set
+ // 2. The iterator has not reached the end of the record id set
+ // 3. The overflow set is empty, the iterator is at the end, and
+ // no threads are holding a document. This condition indicates
+ // that there are no more docs to return for the cloning phase.
+ if (!_overflowDocs.empty()) {
+ doc = std::move(_overflowDocs.front());
+ _overflowDocs.pop_front();
+ ++_inProgressReads;
+ return doc;
+ } else if (_cloneRecordIdsIter != _cloneLocs.end()) {
+ nextRecordId = *_cloneRecordIdsIter;
+ ++_cloneRecordIdsIter;
+ ++_inProgressReads;
+ } else {
+ invariant(_numRecordsCloned + _numRecordsPassedOver == _cloneLocs.size());
+ return boost::none;
+ }
+
+ // In order to saturate the disk, the I/O operation must occur without
+ // holding the mutex.
+ lk.unlock();
+ if (collection->findDoc(opCtx, nextRecordId, &doc))
+ return doc;
+ lk.lock();
+ ++_numRecordsPassedOver;
+
+ // It is possible that this document is no longer in the collection,
+ // in which case, we try again and indicate to other threads that this
+ // thread is not holding a document.
+ --_inProgressReads;
+ _moreDocsCV.notify_one();
+ }
+}
+
+void MigrationChunkClonerSourceLegacy::_insertOverflowDoc(Snapshotted<BSONObj> doc) {
+ stdx::lock_guard lk(_mutex);
+ invariant(_inProgressReads >= 1);
+ _overflowDocs.push_back(std::move(doc));
+}
+
void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationContext* opCtx,
const CollectionPtr& collection,
BSONArrayBuilder* arrBuilder) {
@@ -718,49 +774,56 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationCon
internalQueryExecYieldIterations.load(),
Milliseconds(internalQueryExecYieldPeriodMS.load()));
- stdx::unique_lock<Latch> lk(_mutex);
- auto iter = _cloneLocs.begin();
+ while (auto doc = _getNextDoc(opCtx, collection)) {
+ ON_BLOCK_EXIT([&]() {
+ stdx::lock_guard lk(_mutex);
+ invariant(_inProgressReads > 0);
+ --_inProgressReads;
+ _moreDocsCV.notify_one();
+ });
- for (; iter != _cloneLocs.end(); ++iter) {
// We must always make progress in this method by at least one document because empty
// return indicates there is no more initial clone data.
if (arrBuilder->arrSize() && tracker.intervalHasElapsed()) {
+ _insertOverflowDoc(std::move(*doc));
break;
}
- auto nextRecordId = *iter;
-
- lk.unlock();
- ON_BLOCK_EXIT([&lk] { lk.lock(); });
-
- Snapshotted<BSONObj> doc;
- if (collection->findDoc(opCtx, nextRecordId, &doc)) {
- // Do not send documents that are no longer in the chunk range being moved. This can
- // happen when document shard key value of the document changed after the initial
- // index scan during cloning. This is needed because the destination is very
- // conservative in processing xferMod deletes and won't delete docs that are not in
- // the range of the chunk being migrated.
- if (!isDocInRange(doc.value(),
- _args.getMin().value(),
- _args.getMax().value(),
- _shardKeyPattern)) {
- continue;
+ // Do not send documents that are no longer in the chunk range being moved. This can
+ // happen when document shard key value of the document changed after the initial
+ // index scan during cloning. This is needed because the destination is very
+ // conservative in processing xferMod deletes and won't delete docs that are not in
+ // the range of the chunk being migrated.
+ if (!isDocInRange(
+ doc->value(), _args.getMin().value(), _args.getMax().value(), _shardKeyPattern)) {
+ {
+ stdx::lock_guard lk(_mutex);
+ _numRecordsPassedOver++;
}
+ continue;
+ }
- // Use the builder size instead of accumulating the document sizes directly so
- // that we take into consideration the overhead of BSONArray indices.
- if (arrBuilder->arrSize() &&
- (arrBuilder->len() + doc.value().objsize() + 1024) > BSONObjMaxUserSize) {
-
- break;
- }
+ // Use the builder size instead of accumulating the document sizes directly so
+ // that we take into consideration the overhead of BSONArray indices.
+ if (arrBuilder->arrSize() &&
+ (arrBuilder->len() + doc->value().objsize() + 1024) > BSONObjMaxUserSize) {
+ _insertOverflowDoc(std::move(*doc));
+ break;
+ }
- arrBuilder->append(doc.value());
- ShardingStatistics::get(opCtx).countDocsClonedOnDonor.addAndFetch(1);
+ {
+ stdx::lock_guard lk(_mutex);
+ _numRecordsCloned++;
}
+ arrBuilder->append(doc->value());
+ ShardingStatistics::get(opCtx).countDocsClonedOnDonor.addAndFetch(1);
}
- _cloneLocs.erase(_cloneLocs.begin(), iter);
+ // When we reach here, there are no more documents to return to the destination.
+ // We therefore need to notify any other threads that may be sleeping on the condition
+ // variable that we are done.
+ stdx::lock_guard lk(_mutex);
+ _moreDocsCV.notify_one();
}
uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() {
@@ -803,7 +866,7 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx,
{
// All clone data must have been drained before starting to fetch the incremental changes.
stdx::unique_lock<Latch> lk(_mutex);
- invariant(_cloneLocs.empty());
+ invariant(_cloneRecordIdsIter == _cloneLocs.end());
// The "snapshot" for delete and update list must be taken under a single lock. This is to
// ensure that we will preserve the causal order of writes. Always consume the delete
@@ -1002,6 +1065,7 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC
}
}
}
+ _cloneRecordIdsIter = _cloneLocs.begin();
} catch (DBException& exception) {
exception.addContext("Executor error while scanning for documents belonging to chunk");
throw;
@@ -1099,7 +1163,6 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC
stdx::lock_guard<Latch> sl(_mutex);
- const std::size_t cloneLocsRemaining = _cloneLocs.size();
int64_t untransferredModsSizeBytes = _untransferredDeletesCounter * _averageObjectIdSize +
_untransferredUpsertsCounter * _averageObjectSizeForCloneLocs;
@@ -1121,13 +1184,14 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC
"moveChunk data transfer progress",
"response"_attr = redact(res),
"memoryUsedBytes"_attr = _memoryUsed,
- "docsRemainingToClone"_attr = cloneLocsRemaining,
+ "docsRemainingToClone"_attr =
+ _cloneLocs.size() - _numRecordsCloned - _numRecordsPassedOver,
"untransferredModsSizeBytes"_attr = untransferredModsSizeBytes);
}
if (res["state"].String() == "steady" && sessionCatalogSourceInCatchupPhase &&
estimateUntransferredSessionsSize == 0) {
- if (cloneLocsRemaining != 0 ||
+ if ((_cloneRecordIdsIter != _cloneLocs.end() || !_overflowDocs.empty()) ||
(_jumboChunkCloneState && _forceJumbo &&
PlanExecutor::IS_EOF != _jumboChunkCloneState->clonerState)) {
return {ErrorCodes::OperationIncomplete,
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index 8c15fa7a0cb..081ffc799ba 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -29,8 +29,10 @@
#pragma once
+#include <deque>
#include <list>
#include <memory>
+#include <mutex>
#include <set>
#include "mongo/bson/bsonobj.h"
@@ -261,6 +263,15 @@ private:
Status _storeCurrentLocs(OperationContext* opCtx);
/**
+ * Returns boost::none if there are no more documents to get.
+ * Increments _inProgressReads if and only if the return value is not boost::none.
+ */
+ boost::optional<Snapshotted<BSONObj>> _getNextDoc(OperationContext* opCtx,
+ const CollectionPtr& collection);
+
+ void _insertOverflowDoc(Snapshotted<BSONObj> doc);
+
+ /**
* Adds the OpTime to the list of OpTimes for oplog entries that we should consider migrating as
* part of session migration.
*/
@@ -351,6 +362,47 @@ private:
// List of record ids that needs to be transferred (initial clone)
std::set<RecordId> _cloneLocs;
+ // This iterator is a pointer into the _cloneLocs set. It allows concurrent access to
+ // the _cloneLocs set by allowing threads servicing _migrateClone requests to do the
+ // following:
+ // 1. Acquire the mutex "_mutex" above.
+ // 2. Copy *_cloneRecordIdsIter into their local stack frame.
+ // 3. Increment _cloneRecordIdsIter.
+ // 4. Unlock "_mutex".
+ // 5. Do the I/O to fetch the document corresponding to this RecordId.
+ //
+ // The purpose of this algorithm is to allow different threads to concurrently start I/O
+ // jobs in order to more fully saturate the disk.
+ //
+ // One issue with this algorithm is that only 16MB worth of documents can be returned in
+ // response to a _migrateClone request, but a thread does not know the size of a document
+ // until it has done the I/O. At that point, if the document does not fit in the response
+ // to its _migrateClone request, it must be made available to a different thread servicing
+ // a _migrateClone request. To solve this problem, the thread adds the document to the
+ // _overflowDocs deque below.
+ std::set<RecordId>::iterator _cloneRecordIdsIter;
+
+ // This deque stores all documents that must be sent to the destination, but could not fit
+ // in the response to a particular _migrateClone request.
+ std::deque<Snapshotted<BSONObj>> _overflowDocs;
+
+ // This integer represents how many documents are being "held" by threads servicing
+ // _migrateClone requests. Any document that is "held" by a thread may be added to the
+ // _overflowDocs deque if it doesn't fit in the response to a _migrateClone request.
+ // This integer is necessary because it gives us a condition for determining when all
+ // documents to be sent to the destination have been exhausted.
+ //
+ // If (_cloneRecordIdsIter == _cloneLocs.end() && _overflowDocs.empty() && _inProgressReads
+ // == 0), then all documents have been returned to the destination.
+ decltype(_cloneLocs.size()) _inProgressReads = 0;
+
+ // This condition variable allows us to wait until one of the following holds:
+ // either we're done and the above condition is satisfied, or there is some document
+ // to return.
+ stdx::condition_variable _moreDocsCV;
+ decltype(_cloneLocs.size()) _numRecordsCloned{0};
+ decltype(_cloneLocs.size()) _numRecordsPassedOver{0};
+
// The estimated average object size during the clone phase. Used for buffer size
// pre-allocation (initial clone).
uint64_t _averageObjectSizeForCloneLocs{0};
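
The _insertOverflowDoc helper declared above is the other half of this "holding" protocol. Its body is not shown in this excerpt; a minimal sketch consistent with the comments above, in which the decrement of _inProgressReads and the notify are assumptions about where the hold is released:

    void MigrationChunkClonerSourceLegacy::_insertOverflowDoc(Snapshotted<BSONObj> doc) {
        stdx::lock_guard lk(_mutex);
        invariant(_inProgressReads >= 1);
        // The document moves from "held by this thread" to "parked on the deque",
        // where any thread servicing a _migrateClone request can pick it up.
        _overflowDocs.push_back(std::move(doc));
        --_inProgressReads;
        // A document just became available; wake one thread that may be waiting
        // in _getNextDoc.
        _moreDocsCV.notify_one();
    }
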
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 721fab26d3d..4cbcf6cb260 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -29,6 +29,7 @@
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kShardingMigration
+#include "mongo/db/s/migration_batch_fetcher.h"
#include "mongo/platform/basic.h"
#include "mongo/db/s/migration_destination_manager.h"
@@ -411,8 +412,8 @@ void MigrationDestinationManager::report(BSONObjBuilder& b,
}
BSONObjBuilder bb(b.subobjStart("counts"));
- bb.append("cloned", _numCloned);
- bb.append("clonedBytes", _clonedBytes);
+ bb.append("cloned", _getNumCloned());
+ bb.append("clonedBytes", _getNumBytesCloned());
bb.append("catchup", _numCatchup);
bb.append("steady", _numSteady);
bb.done();
@@ -446,6 +447,8 @@ Status MigrationDestinationManager::start(OperationContext* opCtx,
_lsid = cloneRequest.getLsid();
_txnNumber = cloneRequest.getTxnNumber();
+ _parallelFetchersSupported = cloneRequest.parallelFetchingSupported();
+
_nss = nss;
_fromShard = cloneRequest.getFromShardId();
_fromShardConnString =
@@ -462,8 +465,8 @@ Status MigrationDestinationManager::start(OperationContext* opCtx,
_chunkMarkedPending = false;
- _numCloned = 0;
- _clonedBytes = 0;
+ _migrationCloningProgress = std::make_shared<MigrationCloningProgressSharedState>();
+
_numCatchup = 0;
_numSteady = 0;
@@ -1338,122 +1341,32 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx,
_sessionMigration->start(opCtx->getServiceContext());
- const BSONObj migrateCloneRequest = createMigrateCloneRequest(_nss, *_sessionId);
-
_chunkMarkedPending = true; // no lock needed, only the migrate thread looks.
- auto assertNotAborted = [&](OperationContext* opCtx) {
- opCtx->checkForInterrupt();
- outerOpCtx->checkForInterrupt();
- uassert(50748, "Migration aborted while copying documents", getState() != kAbort);
- };
-
- auto insertBatchFn = [&](OperationContext* opCtx, BSONObj nextBatch) {
- auto arr = nextBatch["objects"].Obj();
- if (arr.isEmpty()) {
- return false;
- }
- auto it = arr.begin();
- while (it != arr.end()) {
- int batchNumCloned = 0;
- int batchClonedBytes = 0;
- const int batchMaxCloned = migrateCloneInsertionBatchSize.load();
-
- assertNotAborted(opCtx);
-
- write_ops::InsertCommandRequest insertOp(_nss);
- insertOp.getWriteCommandRequestBase().setOrdered(true);
- insertOp.setDocuments([&] {
- std::vector<BSONObj> toInsert;
- while (it != arr.end() &&
- (batchMaxCloned <= 0 || batchNumCloned < batchMaxCloned)) {
- const auto& doc = *it;
- BSONObj docToClone = doc.Obj();
- toInsert.push_back(docToClone);
- batchNumCloned++;
- batchClonedBytes += docToClone.objsize();
- ++it;
- }
- return toInsert;
- }());
-
- {
- // Disable the schema validation (during document inserts and updates)
- // and any internal validation for opCtx for performInserts()
- DisableDocumentValidation documentValidationDisabler(
- opCtx,
- DocumentValidationSettings::kDisableSchemaValidation |
- DocumentValidationSettings::kDisableInternalValidation);
- const auto reply = write_ops_exec::performInserts(
- opCtx, insertOp, OperationSource::kFromMigrate);
- for (unsigned long i = 0; i < reply.results.size(); ++i) {
- uassertStatusOKWithContext(reply.results[i],
- str::stream() << "Insert of "
- << insertOp.getDocuments()[i]
- << " failed.");
- }
- // Revert to the original DocumentValidationSettings for opCtx
- }
-
- migrationutil::persistUpdatedNumOrphans(
- opCtx, _migrationId.get(), *_collectionUuid, batchNumCloned);
-
- {
- stdx::lock_guard<Latch> statsLock(_mutex);
- _numCloned += batchNumCloned;
- ShardingStatistics::get(opCtx).countDocsClonedOnRecipient.addAndFetch(
- batchNumCloned);
- _clonedBytes += batchClonedBytes;
- }
- if (_writeConcern.needToWaitForOtherNodes()) {
- runWithoutSession(outerOpCtx, [&] {
- repl::ReplicationCoordinator::StatusAndDuration replStatus =
- repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
- opCtx,
- repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
- _writeConcern);
- if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
- LOGV2_WARNING(
- 22011,
- "secondaryThrottle on, but doc insert timed out; continuing",
- "migrationId"_attr = _migrationId->toBSON());
- } else {
- uassertStatusOK(replStatus.status);
- }
- });
- }
-
- sleepmillis(migrateCloneInsertionBatchDelayMS.load());
- }
- return true;
- };
-
- auto fetchBatchFn = [&](OperationContext* opCtx, BSONObj* nextBatch) {
- auto commandResponse = uassertStatusOKWithContext(
- fromShard->runCommand(opCtx,
- ReadPreferenceSetting(ReadPreference::PrimaryOnly),
- "admin",
- migrateCloneRequest,
- Shard::RetryPolicy::kNoRetry),
- "_migrateClone failed: ");
-
- uassertStatusOKWithContext(
- Shard::CommandResponse::getEffectiveStatus(commandResponse),
- "_migrateClone failed: ");
-
- *nextBatch = commandResponse.response;
- return nextBatch->getField("objects").Obj().isEmpty();
- };
-
- // If running on a replicated system, we'll need to flush the docs we cloned to the
- // secondaries
- lastOpApplied = fetchAndApplyBatch(opCtx, insertBatchFn, fetchBatchFn);
+ {
+ // The destructor of MigrationBatchFetcher is non-trivial; therefore,
+ // this scope has semantic significance.
+ MigrationBatchFetcher<MigrationBatchInserter> fetcher{outerOpCtx,
+ opCtx,
+ _nss,
+ *_sessionId,
+ _writeConcern,
+ _fromShard,
+ range,
+ *_migrationId,
+ *_collectionUuid,
+ _migrationCloningProgress,
+ _parallelFetchersSupported};
+ fetcher.fetchAndScheduleInsertion();
+ }
+ opCtx->checkForInterrupt();
+ lastOpApplied = _migrationCloningProgress->getMaxOptime();
timing->done(4);
migrateThreadHangAtStep4.pauseWhileSet();
if (MONGO_unlikely(failMigrationOnRecipient.shouldFail())) {
- _setStateFail(str::stream() << "failing migration after cloning " << _numCloned
+ _setStateFail(str::stream() << "failing migration after cloning " << _getNumCloned()
<< " docs due to failMigrationOnRecipient failpoint");
return;
}
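
On the recipient, the inline insertBatchFn/fetchBatchFn lambdas above give way to the MigrationBatchFetcher added by this patch (see migration_batch_fetcher.h/.cpp in the file list). In outline, fetchAndScheduleInsertion issues _migrateClone requests from one or more fetcher threads and hands each batch to a MigrationBatchInserter running on a thread pool. A heavily simplified sketch of that shape; the thread count, the _fetchBatch helper, the pool member, and the Inserter construction are all assumptions, not the class's real API:

    template <typename Inserter>
    void MigrationBatchFetcher<Inserter>::fetchAndScheduleInsertion() {
        // Only fan out if the donor advertised parallelMigrateCloneSupported.
        const int numFetchers = _parallelFetchingSupported ? kMigrationFetchers : 1;
        std::vector<stdx::thread> fetchers;
        for (int i = 0; i < numFetchers; ++i) {
            fetchers.emplace_back([&] {
                while (true) {
                    // Runs _migrateClone against the donor shard.
                    BSONObj batch = _fetchBatch(_outerOpCtx);
                    if (batch["objects"].Obj().isEmpty()) {
                        break;  // Empty batch: no more initial clone data.
                    }
                    // Hand the batch to an inserter worker; the inserter performs
                    // the writes and bumps the shared progress state.
                    _inserterPool->schedule([this, batch](Status status) {
                        uassertStatusOK(status);
                        Inserter{_nss, batch, _writeConcern, _migrationProgress}.run();
                    });
                }
            });
        }
        for (auto& fetcher : fetchers) {
            fetcher.join();
        }
        // The non-trivial destructor then drains and joins the inserter pool,
        // which is why the caller gives the fetcher its own scope.
    }
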
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index 925296091cb..74d737f8302 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -29,6 +29,7 @@
#pragma once
+#include <memory>
#include <string>
#include "mongo/base/string_data.h"
@@ -39,6 +40,8 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/replica_set_aware_service.h"
#include "mongo/db/s/active_migrations_registry.h"
+#include "mongo/db/s/collection_sharding_runtime.h"
+#include "mongo/db/s/migration_batch_fetcher.h"
#include "mongo/db/s/migration_recipient_recovery_document_gen.h"
#include "mongo/db/s/migration_session_id.h"
#include "mongo/db/s/session_catalog_migration_destination.h"
@@ -178,6 +181,11 @@ public:
const boost::optional<ChunkManager>& cm,
boost::optional<Timestamp> afterClusterTime);
+
+ bool isParallelFetchingSupported() {
+ return _parallelFetchersSupported;
+ }
+
/**
* Gets the collection uuid and options from fromShardId. If given a chunk manager, will fetch
* the collection options using the database version protocol.
@@ -283,8 +291,22 @@ private:
stdx::thread _migrateThreadHandle;
+ long long _getNumCloned() {
+ return _migrationCloningProgress->getNumCloned();
+ }
+
+ long long _getNumBytesCloned() {
+ return _migrationCloningProgress->getNumBytes();
+ }
+
boost::optional<UUID> _migrationId;
boost::optional<UUID> _collectionUuid;
+
+ // State that is shared among all inserter threads.
+ std::shared_ptr<MigrationCloningProgressSharedState> _migrationCloningProgress;
+
+ bool _parallelFetchersSupported{false};
+
LogicalSessionId _lsid;
TxnNumber _txnNumber{kUninitializedTxnNumber};
NamespaceString _nss;
@@ -304,8 +326,6 @@ private:
// failure we can perform the appropriate cleanup.
bool _chunkMarkedPending{false};
- long long _numCloned{0};
- long long _clonedBytes{0};
long long _numCatchup{0};
long long _numSteady{0};
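
Progress accounting moves out of the manager and into MigrationCloningProgressSharedState so that concurrent inserter threads can update it without contending on the manager's mutex. The class itself is defined alongside the new inserter code rather than in this hunk; below is a sketch of the shape implied by the call sites in this patch (_getNumCloned, _getNumBytesCloned, and getMaxOptime), with the update methods assumed:

    class MigrationCloningProgressSharedState {
        mutable Mutex _m = MONGO_MAKE_LATCH("MigrationCloningProgressSharedState::_m");
        repl::OpTime _maxOptime;
        long long _numCloned = 0;
        long long _numBytes = 0;

    public:
        // Called by each inserter after it commits a batch.
        void updateMaxOptime(const repl::OpTime& optime) {
            stdx::lock_guard lk(_m);
            _maxOptime = std::max(_maxOptime, optime);
        }
        void incNumClonedDocs(long long n) {
            stdx::lock_guard lk(_m);
            _numCloned += n;
        }
        void incNumBytes(long long n) {
            stdx::lock_guard lk(_m);
            _numBytes += n;
        }
        // Read by the manager for reporting and for the post-clone lastOpApplied.
        repl::OpTime getMaxOptime() const {
            stdx::lock_guard lk(_m);
            return _maxOptime;
        }
        long long getNumCloned() const {
            stdx::lock_guard lk(_m);
            return _numCloned;
        }
        long long getNumBytes() const {
            stdx::lock_guard lk(_m);
            return _numBytes;
        }
    };
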
diff --git a/src/mongo/db/s/start_chunk_clone_request.cpp b/src/mongo/db/s/start_chunk_clone_request.cpp
index 86567dce161..6838aedaffc 100644
--- a/src/mongo/db/s/start_chunk_clone_request.cpp
+++ b/src/mongo/db/s/start_chunk_clone_request.cpp
@@ -50,6 +50,7 @@ const char kToShardId[] = "toShardName";
const char kChunkMinKey[] = "min";
const char kChunkMaxKey[] = "max";
const char kShardKeyPattern[] = "shardKeyPattern";
+const char kParallelMigration[] = "parallelMigrateCloneSupported";
} // namespace
@@ -152,6 +153,14 @@ StatusWith<StartChunkCloneRequest> StartChunkCloneRequest::createFromCommand(Nam
}
}
+ {
+ Status status = bsonExtractBooleanFieldWithDefault(
+ obj, kParallelMigration, false, &request._parallelFetchingSupported);
+ if (!status.isOK()) {
+ return status;
+ }
+ }
+
request._migrationId = UUID::parse(obj);
request._lsid =
LogicalSessionId::parse(IDLParserErrorContext("StartChunkCloneRequest"), obj[kLsid].Obj());
@@ -179,6 +188,7 @@ void StartChunkCloneRequest::appendAsCommand(
invariant(fromShardConnectionString.isValid());
builder->append(kRecvChunkStart, nss.ns());
+ builder->append(kParallelMigration, true);
migrationId.appendToBuilder(builder, kMigrationId);
builder->append(kLsid, lsid.toBSON());
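
Note the asymmetry: the donor always appends the flag, while createFromCommand parses it with a default of false, so a recipient running this code interoperates with older donors that never send the field, and an older recipient simply ignores it. Abridged, the _recvChunkStart command now looks like this (all other fields elided, namespace hypothetical):

    {
        _recvChunkStart: "<db>.<collection>",
        parallelMigrateCloneSupported: true,
        lsid: { ... },
        ...
    }
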
diff --git a/src/mongo/db/s/start_chunk_clone_request.h b/src/mongo/db/s/start_chunk_clone_request.h
index c6ecba1f839..8f433afd2f7 100644
--- a/src/mongo/db/s/start_chunk_clone_request.h
+++ b/src/mongo/db/s/start_chunk_clone_request.h
@@ -93,6 +93,10 @@ public:
return _migrationId.is_initialized();
}
+ bool parallelFetchingSupported() const {
+ return _parallelFetchingSupported;
+ }
+
const UUID& getMigrationId() const {
invariant(_migrationId);
return *_migrationId;
@@ -161,6 +165,8 @@ private:
// The parsed secondary throttle options
MigrationSecondaryThrottleOptions _secondaryThrottle;
+
+ bool _parallelFetchingSupported{false};
};
} // namespace mongo