summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/concurrency/fsm_workloads/random_moveChunk_update_shard_key.js2
-rw-r--r--jstests/sharding/move_chunk_concurrent_cloning.js160
-rw-r--r--src/mongo/db/s/SConscript3
-rw-r--r--src/mongo/db/s/migration_batch_fetcher.cpp230
-rw-r--r--src/mongo/db/s/migration_batch_fetcher.h168
-rw-r--r--src/mongo/db/s/migration_batch_fetcher_test.cpp269
-rw-r--r--src/mongo/db/s/migration_batch_inserter.cpp200
-rw-r--r--src/mongo/db/s/migration_batch_inserter.h136
-rw-r--r--src/mongo/db/s/migration_batch_mock_inserter.h67
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source.cpp2
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp132
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.h52
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp139
-rw-r--r--src/mongo/db/s/migration_destination_manager.h24
-rw-r--r--src/mongo/db/s/start_chunk_clone_request.cpp10
-rw-r--r--src/mongo/db/s/start_chunk_clone_request.h6
16 files changed, 225 insertions, 1375 deletions
diff --git a/jstests/concurrency/fsm_workloads/random_moveChunk_update_shard_key.js b/jstests/concurrency/fsm_workloads/random_moveChunk_update_shard_key.js
index ff7a8364d86..1dcf4f203b9 100644
--- a/jstests/concurrency/fsm_workloads/random_moveChunk_update_shard_key.js
+++ b/jstests/concurrency/fsm_workloads/random_moveChunk_update_shard_key.js
@@ -40,7 +40,7 @@ var $config = extendWorkload($config, function($config, $super) {
// So, if the range deleter has not yet cleaned up that document when the chunk is
// moved back to the original shard, the moveChunk may fail as a result of a duplicate
// key error on the recipient.
- err.message.includes("Location51008") || err.message.includes("Location6718402"));
+ err.message.includes("Location51008"));
};
$config.data.runningWithStepdowns =
diff --git a/jstests/sharding/move_chunk_concurrent_cloning.js b/jstests/sharding/move_chunk_concurrent_cloning.js
index 3166f823351..954c2d7cd35 100644
--- a/jstests/sharding/move_chunk_concurrent_cloning.js
+++ b/jstests/sharding/move_chunk_concurrent_cloning.js
@@ -6,101 +6,91 @@
load('./jstests/libs/chunk_manipulation_util.js');
-const runParallelMoveChunk = (numThreads) => {
- // For startParallelOps to write its state
- let staticMongod = MongoRunner.runMongod({});
-
- let st = new ShardingTest({shards: 2});
- st.stopBalancer();
-
- const kThreadCount = numThreads;
- const kPadding = new Array(1024).join("x");
-
- let testDB = st.s.getDB('test');
- assert.commandWorked(testDB.adminCommand({enableSharding: 'test'}));
- st.ensurePrimaryShard('test', st.shard0.shardName);
- assert.commandWorked(testDB.adminCommand({shardCollection: 'test.user', key: {x: 1}}));
-
- let shardKeyVal = 0;
- const kDocsInBatch = 8 * 1000;
- const kMinCollSize = 128 * 1024 * 1024;
- let approxInsertedSize = 0;
- while (approxInsertedSize < kMinCollSize) {
- var bulk = testDB.user.initializeUnorderedBulkOp();
- for (let docs = 0; docs < kDocsInBatch; docs++) {
- shardKeyVal++;
- bulk.insert({_id: shardKeyVal, x: shardKeyVal, padding: kPadding});
- }
- assert.commandWorked(bulk.execute());
-
- approxInsertedSize = approxInsertedSize + (kDocsInBatch * 1024);
+// For startParallelOps to write its state
+let staticMongod = MongoRunner.runMongod({});
+
+let st = new ShardingTest({shards: 2});
+st.stopBalancer();
+
+const kThreadCount = Math.floor(Math.random() * 31) + 1;
+const kPadding = new Array(1024).join("x");
+
+let testDB = st.s.getDB('test');
+assert.commandWorked(testDB.adminCommand({enableSharding: 'test'}));
+st.ensurePrimaryShard('test', st.shard0.shardName);
+assert.commandWorked(testDB.adminCommand({shardCollection: 'test.user', key: {x: 1}}));
+
+let shardKeyVal = 0;
+const kDocsInBatch = 8 * 1000;
+const kMinCollSize = 128 * 1024 * 1024;
+let approxInsertedSize = 0;
+while (approxInsertedSize < kMinCollSize) {
+ var bulk = testDB.user.initializeUnorderedBulkOp();
+ for (let docs = 0; docs < kDocsInBatch; docs++) {
+ shardKeyVal++;
+ bulk.insert({_id: shardKeyVal, x: shardKeyVal, padding: kPadding});
}
+ assert.commandWorked(bulk.execute());
- const kInitialLoadFinalKey = shardKeyVal;
-
- print(`Running tests with migrationConcurrency == ${kThreadCount}`);
- st._rs.forEach((replSet) => {
- assert.commandWorked(replSet.test.getPrimary().adminCommand(
- {setParameter: 1, migrationConcurrency: kThreadCount}));
- });
-
- const configCollEntry =
- st.s.getDB('config').getCollection('collections').findOne({_id: 'test.user'});
- let chunks = st.s.getDB('config').chunks.find({uuid: configCollEntry.uuid}).toArray();
- assert.eq(1, chunks.length, tojson(chunks));
-
- let joinMoveChunk =
- moveChunkParallel(staticMongod, st.s0.host, {x: 0}, null, 'test.user', st.shard1.shardName);
-
- // Migration cloning scans by shard key order. Perform some writes against the collection on
- // both the lower and upper ends of the shard key values while migration is happening to
- // exercise xferMods logic.
- const kDeleteIndexOffset = kInitialLoadFinalKey - 3000;
- const kUpdateIndexOffset = kInitialLoadFinalKey - 5000;
- for (let x = 0; x < 1000; x++) {
- assert.commandWorked(testDB.user.remove({x: x}));
- assert.commandWorked(testDB.user.update({x: 4000 + x}, {$set: {updated: true}}));
-
- assert.commandWorked(testDB.user.remove({x: kDeleteIndexOffset + x}));
- assert.commandWorked(
- testDB.user.update({x: kUpdateIndexOffset + x}, {$set: {updated: true}}));
-
- let newShardKey = kInitialLoadFinalKey + x + 1;
- assert.commandWorked(testDB.user.insert({_id: newShardKey, x: newShardKey}));
- }
+ approxInsertedSize = approxInsertedSize + (kDocsInBatch * 1024);
+}
- joinMoveChunk();
+const kInitialLoadFinalKey = shardKeyVal;
- let shardKeyIdx = 1000; // Index starts at 1k since we deleted the first 1k docs.
- let cursor = testDB.user.find().sort({x: 1});
+print(`Running tests with migrationConcurrency == ${kThreadCount}`);
+st._rs.forEach((replSet) => {
+ assert.commandWorked(replSet.test.getPrimary().adminCommand(
+ {setParameter: 1, migrationConcurrency: kThreadCount}));
+});
- while (cursor.hasNext()) {
- let next = cursor.next();
- assert.eq(next.x, shardKeyIdx);
+const configCollEntry =
+ st.s.getDB('config').getCollection('collections').findOne({_id: 'test.user'});
+let chunks = st.s.getDB('config').chunks.find({uuid: configCollEntry.uuid}).toArray();
+assert.eq(1, chunks.length, tojson(chunks));
- if ((shardKeyIdx >= 4000 && shardKeyIdx < 5000) ||
- (shardKeyIdx >= kUpdateIndexOffset && shardKeyIdx < (kUpdateIndexOffset + 1000))) {
- assert.eq(true, next.updated, tojson(next));
- }
+let joinMoveChunk =
+ moveChunkParallel(staticMongod, st.s0.host, {x: 0}, null, 'test.user', st.shard1.shardName);
- shardKeyIdx++;
+// Migration cloning scans by shard key order. Perform some writes against the collection on both
+// the lower and upper ends of the shard key values while migration is happening to exercise
+// xferMods logic.
+const kDeleteIndexOffset = kInitialLoadFinalKey - 3000;
+const kUpdateIndexOffset = kInitialLoadFinalKey - 5000;
+for (let x = 0; x < 1000; x++) {
+ assert.commandWorked(testDB.user.remove({x: x}));
+ assert.commandWorked(testDB.user.update({x: 4000 + x}, {$set: {updated: true}}));
- if (shardKeyIdx == kDeleteIndexOffset) {
- shardKeyIdx += 1000;
- }
- }
+ assert.commandWorked(testDB.user.remove({x: kDeleteIndexOffset + x}));
+ assert.commandWorked(testDB.user.update({x: kUpdateIndexOffset + x}, {$set: {updated: true}}));
- shardKeyIdx--;
- assert.eq(shardKeyIdx, kInitialLoadFinalKey + 1000);
+ let newShardKey = kInitialLoadFinalKey + x + 1;
+ assert.commandWorked(testDB.user.insert({_id: newShardKey, x: newShardKey}));
+}
- st.stop();
- MongoRunner.stopMongod(staticMongod);
-};
+joinMoveChunk();
-// Run test 10 times with random concurrency levels.
-for (let i = 1; i <= 5; i++) {
- runParallelMoveChunk(Math.floor(Math.random() * 31) + 1);
-}
+let shardKeyIdx = 1000; // Index starts at 1k since we deleted the first 1k docs.
+let cursor = testDB.user.find().sort({x: 1});
+
+while (cursor.hasNext()) {
+ let next = cursor.next();
+ assert.eq(next.x, shardKeyIdx);
+
+ if ((shardKeyIdx >= 4000 && shardKeyIdx < 5000) ||
+ (shardKeyIdx >= kUpdateIndexOffset && shardKeyIdx < (kUpdateIndexOffset + 1000))) {
+ assert.eq(true, next.updated, tojson(next));
+ }
+
+ shardKeyIdx++;
+
+ if (shardKeyIdx == kDeleteIndexOffset) {
+ shardKeyIdx += 1000;
+ }
}
- )();
+shardKeyIdx--;
+assert.eq(shardKeyIdx, kInitialLoadFinalKey + 1000);
+
+st.stop();
+MongoRunner.stopMongod(staticMongod);
+})();
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index b581f666302..6906cd0c57d 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -68,8 +68,6 @@ env.Library(
'config_server_op_observer.cpp',
'global_index_metrics.cpp',
'metadata_manager.cpp',
- 'migration_batch_fetcher.cpp',
- 'migration_batch_inserter.cpp',
'migration_chunk_cloner_source_legacy.cpp',
'migration_chunk_cloner_source.cpp',
'migration_coordinator_document.idl',
@@ -570,7 +568,6 @@ env.CppUnitTest(
'global_index_metrics_test.cpp',
'implicit_collection_creation_test.cpp',
'metadata_manager_test.cpp',
- 'migration_batch_fetcher_test.cpp',
'migration_chunk_cloner_source_legacy_test.cpp',
'migration_destination_manager_test.cpp',
'migration_session_id_test.cpp',
diff --git a/src/mongo/db/s/migration_batch_fetcher.cpp b/src/mongo/db/s/migration_batch_fetcher.cpp
deleted file mode 100644
index 4611cd91ec7..00000000000
--- a/src/mongo/db/s/migration_batch_fetcher.cpp
+++ /dev/null
@@ -1,230 +0,0 @@
-/**
- * Copyright (C) 2022-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
-
-#include "mongo/db/s/migration_batch_fetcher.h"
-
-#include "mongo/logv2/log.h"
-#include "mongo/util/timer.h"
-
-namespace mongo {
-
-template <typename Inserter>
-MigrationBatchFetcher<Inserter>::MigrationBatchFetcher(
- OperationContext* outerOpCtx,
- OperationContext* innerOpCtx,
- NamespaceString nss,
- MigrationSessionId sessionId,
- const WriteConcernOptions& writeConcern,
- const ShardId& fromShardId,
- const ChunkRange& range,
- const UUID& migrationId,
- const UUID& collectionId,
- std::shared_ptr<MigrationCloningProgressSharedState> migrationProgress,
- bool parallelFetchingSupported)
- : _nss{std::move(nss)},
- _migrationConcurrency{
- mongo::feature_flags::gConcurrencyInChunkMigration.isEnabledAndIgnoreFCV()
- ? migrationConcurrency.load()
- : 1},
- _sessionId{std::move(sessionId)},
- _inserterWorkers{[&]() {
- ThreadPool::Options options;
- options.poolName = "ChunkMigrationInserters";
- options.minThreads = _migrationConcurrency;
- options.maxThreads = _migrationConcurrency;
- options.onCreateThread = Inserter::onCreateThread;
- return std::make_unique<ThreadPool>(options);
- }()},
- _migrateCloneRequest{_createMigrateCloneRequest()},
- _outerOpCtx{outerOpCtx},
- _innerOpCtx{innerOpCtx},
- _fromShard{uassertStatusOK(
- Grid::get(_outerOpCtx)->shardRegistry()->getShard(_outerOpCtx, fromShardId))},
- _migrationProgress{migrationProgress},
- _range{range},
- _collectionUuid(collectionId),
- _migrationId{migrationId},
- _writeConcern{writeConcern},
- _isParallelFetchingSupported{parallelFetchingSupported} {
- _inserterWorkers->startup();
-}
-
-template <typename Inserter>
-BSONObj MigrationBatchFetcher<Inserter>::_fetchBatch(OperationContext* opCtx) {
- auto commandResponse = uassertStatusOKWithContext(
- _fromShard->runCommand(opCtx,
- ReadPreferenceSetting(ReadPreference::PrimaryOnly),
- "admin",
- _migrateCloneRequest,
- Shard::RetryPolicy::kNoRetry),
- "_migrateClone failed: ");
-
- uassertStatusOKWithContext(Shard::CommandResponse::getEffectiveStatus(commandResponse),
- "_migrateClone failed: ");
-
- return commandResponse.response;
-}
-
-template <typename Inserter>
-void MigrationBatchFetcher<Inserter>::fetchAndScheduleInsertion() {
- auto numFetchers = _isParallelFetchingSupported ? _migrationConcurrency : 1;
- auto fetchersThreadPool = [&]() {
- ThreadPool::Options options;
- options.poolName = "ChunkMigrationFetchers";
- options.minThreads = numFetchers;
- options.maxThreads = numFetchers;
- options.onCreateThread = onCreateThread;
- return std::make_unique<ThreadPool>(options);
- }();
- fetchersThreadPool->startup();
- for (int i = 0; i < numFetchers; ++i) {
- fetchersThreadPool->schedule([this](Status status) { this->_runFetcher(); });
- }
-
- fetchersThreadPool->shutdown();
- fetchersThreadPool->join();
-}
-
-
-template <typename Inserter>
-void MigrationBatchFetcher<Inserter>::_runFetcher() try {
- auto executor =
- Grid::get(_innerOpCtx->getServiceContext())->getExecutorPool()->getFixedExecutor();
-
- auto applicationOpCtx = CancelableOperationContext(
- cc().makeOperationContext(), _innerOpCtx->getCancellationToken(), executor);
-
- auto opCtx = applicationOpCtx.get();
- auto assertNotAborted = [&]() {
- {
- stdx::lock_guard<Client> lk(*_outerOpCtx->getClient());
- _outerOpCtx->checkForInterrupt();
- }
- opCtx->checkForInterrupt();
- };
-
- LOGV2_DEBUG(6718405, 0, "Chunk migration data fetch start", "migrationId"_attr = _migrationId);
- while (true) {
- Timer totalTimer;
- BSONObj nextBatch = _fetchBatch(opCtx);
- assertNotAborted();
- if (_isEmptyBatch(nextBatch)) {
- LOGV2_DEBUG(6718404,
- 0,
- "Chunk migration initial clone complete",
- "migrationId"_attr = _migrationId,
- "duration"_attr = totalTimer.elapsed());
- break;
- }
-
- const auto batchSize = nextBatch.objsize();
- const auto fetchTime = totalTimer.elapsed();
- LOGV2_DEBUG(6718416,
- 0,
- "Chunk migration initial clone fetch end",
- "migrationId"_attr = _migrationId,
- "batchSize"_attr = batchSize,
- "fetch"_attr = duration_cast<Milliseconds>(fetchTime));
-
-
- Inserter inserter{_outerOpCtx,
- _innerOpCtx,
- nextBatch.getOwned(),
- _nss,
- _range,
- _writeConcern,
- _collectionUuid,
- _migrationProgress,
- _migrationId,
- _migrationConcurrency};
-
- _inserterWorkers->schedule([batchSize,
- fetchTime,
- totalTimer = std::move(totalTimer),
- insertTimer = Timer(),
- migrationId = _migrationId,
- inserter = std::move(inserter)](Status status) {
- inserter.run(status);
-
- const auto checkDivByZero = [](auto divisor, auto expression) {
- return divisor == 0 ? -1 : expression();
- };
- const auto calcThroughput = [&](auto bytes, auto duration) {
- return checkDivByZero(durationCount<Microseconds>(duration), [&]() {
- return static_cast<double>(bytes) / durationCount<Microseconds>(duration);
- });
- };
-
- const auto insertTime = insertTimer.elapsed();
- const auto totalTime = totalTimer.elapsed();
- const auto batchThroughputMBps = calcThroughput(batchSize, totalTime);
- const auto insertThroughputMBps = calcThroughput(batchSize, insertTime);
- const auto fetchThroughputMBps = calcThroughput(batchSize, fetchTime);
-
- LOGV2_DEBUG(6718417,
- 1,
- "Chunk migration initial clone apply batch",
- "migrationId"_attr = migrationId,
- "batchSize"_attr = batchSize,
- "total"_attr = duration_cast<Milliseconds>(totalTime),
- "totalThroughputMBps"_attr = batchThroughputMBps,
- "fetch"_attr = duration_cast<Milliseconds>(fetchTime),
- "fetchThroughputMBps"_attr = fetchThroughputMBps,
- "insert"_attr = duration_cast<Milliseconds>(insertTime),
- "insertThroughputMBps"_attr = insertThroughputMBps);
- });
- }
-} catch (const DBException& e) {
- stdx::lock_guard<Client> lk(*_innerOpCtx->getClient());
- _innerOpCtx->getServiceContext()->killOperation(lk, _innerOpCtx, ErrorCodes::Error(6718400));
- LOGV2_ERROR(6718413,
- "Chunk migration failure fetching data",
- "migrationId"_attr = _migrationId,
- "failure"_attr = e.toStatus());
-}
-
-template <typename Inserter>
-MigrationBatchFetcher<Inserter>::~MigrationBatchFetcher() {
- LOGV2(6718401,
- "Shutting down and joining inserter threads for migration {migrationId}",
- "migrationId"_attr = _migrationId);
- _inserterWorkers->shutdown();
- _inserterWorkers->join();
- LOGV2(6718415,
- "Inserter threads for migration {migrationId} joined",
- "migrationId"_attr = _migrationId);
-}
-
-template class MigrationBatchFetcher<MigrationBatchInserter>;
-
-template class MigrationBatchFetcher<MigrationBatchMockInserter>;
-
-} // namespace mongo
diff --git a/src/mongo/db/s/migration_batch_fetcher.h b/src/mongo/db/s/migration_batch_fetcher.h
deleted file mode 100644
index d402c3a92e6..00000000000
--- a/src/mongo/db/s/migration_batch_fetcher.h
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Copyright (C) 2022-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/base/error_extra_info.h"
-#include "mongo/db/client.h"
-#include "mongo/db/namespace_string.h"
-#include "mongo/db/operation_context.h"
-#include "mongo/db/repl/optime.h"
-#include "mongo/db/s/migration_batch_inserter.h"
-#include "mongo/db/s/migration_batch_mock_inserter.h"
-#include "mongo/db/s/migration_session_id.h"
-#include "mongo/s/client/shard.h"
-#include "mongo/s/grid.h"
-#include "mongo/s/shard_id.h"
-#include "mongo/util/cancellation.h"
-#include "mongo/util/concurrency/thread_pool.h"
-#include "mongo/util/producer_consumer_queue.h"
-
-#pragma once
-
-namespace mongo {
-
-
-// This class is only instantiated on the destination of a chunk migration and
-// has a single purpose: to manage two thread pools, one
-// on which threads perform inserters, and one on which
-// threads run _migrateClone requests (to fetch batches of documents to insert).
-//
-// The constructor creates and starts the inserter thread pool. The destructor shuts down
-// and joins the inserter thread pool.
-//
-// The main work of the class is in method fetchAndScheduleInsertion. That method
-// starts a thread pool for fetchers. Each thread in that thread pool sits in a loop
-// sending out _migrateClone requests, blocking on the response, and scheduling an
-// inserter on the inserter thread pool. This function joins and shuts down the
-// fetcher thread pool once all batches have been fetched.
-//
-// Inserter is templated only to allow a mock inserter to exist.
-// There is only one implementation of inserter currently, which is MigrationBatchInserter.
-//
-// A few things to note:
-// - After fetchAndScheduleInsertion returns, insertions are still being executed (although fetches
-// are not).
-// - Sending out _migrateClone requests in parallel implies the need for synchronization on the
-// source. See the comments in migration_chunk_cloner_source.h for details around
-// that.
-// - The requirement on source side synchronization implies that care must be taken on upgrade.
-// In particular, if the source is running an earlier binary that doesn't have code for
-// source side synchronization, it is unsafe to send _migrateClone requests in parallel.
-// To handle that case, when the source is prepared to service _migrateClone requests in
-// parallel, the field "parallelMigrateCloneSupported" is included in the "_recvChunkStart"
-// command. The inclusion of that field indicates to the destination that it is safe
-// to send _migrateClone requests in parallel. Its exclusion indicates that it is unsafe.
-template <typename Inserter>
-class MigrationBatchFetcher {
-public:
- MigrationBatchFetcher(OperationContext* outerOpCtx,
- OperationContext* innerOpCtx,
- NamespaceString nss,
- MigrationSessionId sessionId,
- const WriteConcernOptions& writeConcern,
- const ShardId& fromShardId,
- const ChunkRange& range,
- const UUID& migrationId,
- const UUID& collectionId,
- std::shared_ptr<MigrationCloningProgressSharedState> migrationInfo,
- bool parallelFetchingSupported);
-
- ~MigrationBatchFetcher();
-
- // Repeatedly fetch batches (using _migrateClone request) and schedule inserter jobs
- // on thread pool.
- void fetchAndScheduleInsertion();
-
- // Get inserter thread pool stats.
- ThreadPool::Stats getThreadPoolStats() const {
- return _inserterWorkers->getStats();
- }
-
-private:
- NamespaceString _nss;
-
- // Size of thread pools.
- int _migrationConcurrency;
-
- MigrationSessionId _sessionId;
-
- // Inserter thread pool.
- std::unique_ptr<ThreadPool> _inserterWorkers;
-
- BSONObj _migrateCloneRequest;
-
- OperationContext* _outerOpCtx;
-
- OperationContext* _innerOpCtx;
-
- std::shared_ptr<Shard> _fromShard;
-
- // Shared state, by which the progress of migration is communicated
- // to MigrationDestinationManager.
- std::shared_ptr<MigrationCloningProgressSharedState> _migrationProgress;
-
- ChunkRange _range;
-
- UUID _collectionUuid;
-
- UUID _migrationId;
-
- WriteConcernOptions _writeConcern;
-
- // Indicates if source is prepared to service _migrateClone requests in parallel.
- bool _isParallelFetchingSupported;
-
- // Given session id and namespace, create migrateCloneRequest.
- // Only should be created once for the lifetime of the object.
- BSONObj _createMigrateCloneRequest() const {
- BSONObjBuilder builder;
- builder.append("_migrateClone", _nss.ns());
- _sessionId.append(&builder);
- return builder.obj();
- }
-
- void _runFetcher();
-
- // Fetches next batch using _migrateClone request and return it. May return an empty batch.
- BSONObj _fetchBatch(OperationContext* opCtx);
-
- static bool _isEmptyBatch(const BSONObj& batch) {
- return batch.getField("objects").Obj().isEmpty();
- }
-
- static void onCreateThread(const std::string& threadName) {
- Client::initThread(threadName, getGlobalServiceContext(), nullptr);
- {
- stdx::lock_guard<Client> lk(cc());
- cc().setSystemOperationKillableByStepdown(lk);
- }
- }
-
-}; // namespace mongo
-
-} // namespace mongo
diff --git a/src/mongo/db/s/migration_batch_fetcher_test.cpp b/src/mongo/db/s/migration_batch_fetcher_test.cpp
deleted file mode 100644
index f86368b321b..00000000000
--- a/src/mongo/db/s/migration_batch_fetcher_test.cpp
+++ /dev/null
@@ -1,269 +0,0 @@
-/**
- * Copyright (C) 2022-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
-
-#include "mongo/base/status_with.h"
-#include "mongo/bson/bsonobj.h"
-#include "mongo/db/namespace_string.h"
-#include "mongo/db/operation_context.h"
-#include "mongo/db/s/migration_batch_fetcher.h"
-#include "mongo/db/s/migration_session_id.h"
-#include "mongo/db/s/shard_server_test_fixture.h"
-#include "mongo/db/write_concern_options.h"
-#include "mongo/dbtests/mock/mock_replica_set.h"
-#include "mongo/executor/cancelable_executor.h"
-#include "mongo/executor/network_interface_mock.h"
-#include "mongo/executor/thread_pool_mock.h"
-#include "mongo/executor/thread_pool_task_executor.h"
-#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
-#include "mongo/idl/server_parameter_test_util.h"
-#include "mongo/logv2/log.h"
-#include "mongo/platform/basic.h"
-#include "mongo/s/catalog/sharding_catalog_client_mock.h"
-#include "mongo/s/catalog/type_chunk.h"
-#include "mongo/stdx/future.h"
-#include "mongo/stdx/thread.h"
-#include "mongo/unittest/unittest.h"
-#include "mongo/util/duration.h"
-#include "mongo/util/net/hostandport.h"
-#include "mongo/util/uuid.h"
-
-namespace mongo {
-namespace {
-
-using unittest::assertGet;
-
-const ConnectionString kDonorConnStr =
- ConnectionString::forReplicaSet("Donor",
- {HostAndPort("DonorHost1:1234"),
- HostAndPort{"DonorHost2:1234"},
- HostAndPort{"DonorHost3:1234"}});
-const ConnectionString kRecipientConnStr =
- ConnectionString::forReplicaSet("Recipient",
- {HostAndPort("RecipientHost1:1234"),
- HostAndPort("RecipientHost2:1234"),
- HostAndPort("RecipientHost3:1234")});
-
-class MigrationBatchFetcherTestFixture : public ShardServerTestFixture {
-
-protected:
- /**
- * Sets up the task executor as well as a TopologyListenerMock for each unit test.
- */
- void setUp() override {
- ShardServerTestFixture::setUp();
-
- {
- auto donorShard = assertGet(
- shardRegistry()->getShard(operationContext(), kDonorConnStr.getSetName()));
- RemoteCommandTargeterMock::get(donorShard->getTargeter())
- ->setConnectionStringReturnValue(kDonorConnStr);
- RemoteCommandTargeterMock::get(donorShard->getTargeter())
- ->setFindHostReturnValue(kDonorConnStr.getServers()[0]);
- }
- }
-
- void tearDown() override {
- ShardServerTestFixture::tearDown();
- }
-
- /**
- * Instantiates a BSON object in which both "_id" and "X" are set to value.
- */
- static BSONObj createDocument(int value) {
- return BSON("_id" << value << "X" << value);
- }
- static BSONObj createEmpty() {
- return BSONObj{};
- }
- /**
- * Creates a list of documents to clone.
- */
- static std::vector<BSONObj> createDocumentsToClone() {
- return {createDocument(1), createDocument(2), createDocument(3)};
- }
-
- /**
- * Creates a list of documents to clone and converts it to a BSONArray.
- */
- static BSONArray createDocumentsToCloneArray() {
- BSONArrayBuilder arrayBuilder;
- for (auto& doc : createDocumentsToClone()) {
- arrayBuilder.append(doc);
- }
- return arrayBuilder.arr();
- }
- static BSONArray createEmptyCloneArray() {
- return BSONArrayBuilder().arr();
- }
-
- static BSONObj getTerminalBsonObj() {
- return BSON("Status"
- << "OK"
- << "ok" << 1 << "objects" << createEmptyCloneArray());
- }
-
- static BSONObj getBatchBsonObj() {
- return BSON("Status"
- << "OK"
- << "ok" << 1 << "objects" << createDocumentsToCloneArray());
- }
-
-private:
- OperationContext* _opCtx;
- ServiceContext* _svcCtx;
- executor::NetworkInterfaceMock* _net;
-
- std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient() override {
- class StaticCatalogClient final : public ShardingCatalogClientMock {
- public:
- StaticCatalogClient() = default;
-
- StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards(
- OperationContext* opCtx, repl::ReadConcernLevel readConcern) override {
-
- ShardType donorShard;
- donorShard.setName(kDonorConnStr.getSetName());
- donorShard.setHost(kDonorConnStr.toString());
-
- ShardType recipientShard;
- recipientShard.setName(kRecipientConnStr.getSetName());
- recipientShard.setHost(kRecipientConnStr.toString());
-
- return repl::OpTimeWith<std::vector<ShardType>>({donorShard, recipientShard});
- }
- };
-
- return std::make_unique<StaticCatalogClient>();
- }
-};
-
-auto getOnMigrateCloneCommandCb(BSONObj ret) {
- return [ret](const executor::RemoteCommandRequest& request) -> StatusWith<BSONObj> {
- ASSERT_EQ(request.cmdObj.getField("_migrateClone").String(), "test.foo");
- return ret;
- };
-}
-
-TEST_F(MigrationBatchFetcherTestFixture, BasicEmptyFetchingTest) {
- NamespaceString nss{"test", "foo"};
- ShardId fromShard{"Donor"};
- auto msid = MigrationSessionId::generate(fromShard, "Recipient");
- auto outerOpCtx = operationContext();
- auto newClient = outerOpCtx->getServiceContext()->makeClient("MigrationCoordinator");
-
- int concurrency = 30;
- RAIIServerParameterControllerForTest featureFlagController(
- "featureFlagConcurrencyInChunkMigration", true);
- RAIIServerParameterControllerForTest setMigrationConcurrencyParam{"migrationConcurrency",
- concurrency};
-
- AlternativeClientRegion acr(newClient);
- auto executor =
- Grid::get(outerOpCtx->getServiceContext())->getExecutorPool()->getFixedExecutor();
- auto newOpCtxPtr = CancelableOperationContext(
- cc().makeOperationContext(), outerOpCtx->getCancellationToken(), executor);
- auto opCtx = newOpCtxPtr.get();
-
- auto fetcher = std::make_unique<MigrationBatchFetcher<MigrationBatchMockInserter>>(
- outerOpCtx,
- opCtx,
- nss,
- msid,
- WriteConcernOptions::parse(WriteConcernOptions::Majority).getValue(),
- fromShard,
- ChunkRange{BSON("x" << 1), BSON("x" << 2)},
- UUID::gen(),
- UUID::gen(),
- nullptr,
- true);
-
- // Start asynchronous task for responding to _migrateClone requests.
- // Must name the return of value std::async. The destructor of std::future joins the
- // asynchrounous task. (If it were left unnamed, the destructor would run inline, and the test
- // would hang forever.)
- auto fut = stdx::async(stdx::launch::async, [&]() {
- // One terminal response for each thread
- for (int i = 0; i < concurrency; ++i) {
- onCommand(getOnMigrateCloneCommandCb(getTerminalBsonObj()));
- }
- });
- fetcher->fetchAndScheduleInsertion();
-}
-
-TEST_F(MigrationBatchFetcherTestFixture, BasicFetching) {
- NamespaceString nss{"test", "foo"};
- ShardId fromShard{"Donor"};
- auto msid = MigrationSessionId::generate(fromShard, "Recipient");
-
- auto outerOpCtx = operationContext();
- auto newClient = outerOpCtx->getServiceContext()->makeClient("MigrationCoordinator");
- AlternativeClientRegion acr(newClient);
-
- auto executor =
- Grid::get(outerOpCtx->getServiceContext())->getExecutorPool()->getFixedExecutor();
- auto newOpCtxPtr = CancelableOperationContext(
- cc().makeOperationContext(), outerOpCtx->getCancellationToken(), executor);
- auto opCtx = newOpCtxPtr.get();
-
-
- int concurrency = 30;
- RAIIServerParameterControllerForTest featureFlagController(
- "featureFlagConcurrencyInChunkMigration", true);
- RAIIServerParameterControllerForTest setMigrationConcurrencyParam{"migrationConcurrency",
- concurrency};
-
- auto fetcher = std::make_unique<MigrationBatchFetcher<MigrationBatchMockInserter>>(
- outerOpCtx,
- opCtx,
- nss,
- msid,
- WriteConcernOptions::parse(WriteConcernOptions::Majority).getValue(),
- fromShard,
- ChunkRange{BSON("x" << 1), BSON("x" << 2)},
- UUID::gen(),
- UUID::gen(),
- nullptr,
- true);
-
- auto fut = stdx::async(stdx::launch::async, [&]() {
- for (int i = 0; i < 8; ++i) {
- onCommand(getOnMigrateCloneCommandCb(getBatchBsonObj()));
- }
- // One terminal response for each thread
- for (int i = 0; i < concurrency; ++i) {
- onCommand(getOnMigrateCloneCommandCb(getTerminalBsonObj()));
- }
- });
- fetcher->fetchAndScheduleInsertion();
-}
-
-} // namespace
-} // namespace mongo
diff --git a/src/mongo/db/s/migration_batch_inserter.cpp b/src/mongo/db/s/migration_batch_inserter.cpp
deleted file mode 100644
index 37f55947745..00000000000
--- a/src/mongo/db/s/migration_batch_inserter.cpp
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Copyright (C) 2022-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kShardingMigration
-
-#include "mongo/db/s/migration_batch_inserter.h"
-
-#include "mongo/db/s/migration_util.h"
-#include "mongo/db/transaction_participant.h"
-#include "mongo/logv2/log.h"
-
-namespace mongo {
-
-namespace {
-
-void checkOutSessionAndVerifyTxnState(OperationContext* opCtx) {
- MongoDOperationContextSession::checkOut(opCtx);
- TransactionParticipant::get(opCtx).beginOrContinue(opCtx,
- {*opCtx->getTxnNumber()},
- boost::none /* autocommit */,
- boost::none /* startTransaction */);
-}
-
-template <typename Callable>
-constexpr bool returnsVoid() {
- return std::is_void_v<std::invoke_result_t<Callable>>;
-}
-
-// Yields the checked out session before running the given function. If the function runs without
-// throwing, will reacquire the session and verify it is still valid to proceed with the migration.
-template <typename Callable, std::enable_if_t<!returnsVoid<Callable>(), int> = 0>
-auto runWithoutSession(OperationContext* opCtx, Callable callable) {
- MongoDOperationContextSession::checkIn(opCtx, OperationContextSession::CheckInReason::kYield);
-
- auto retVal = callable();
-
- // The below code can throw, so it cannot run in a scope guard.
- opCtx->checkForInterrupt();
- checkOutSessionAndVerifyTxnState(opCtx);
-
- return retVal;
-}
-
-// Same as runWithoutSession above but takes a void function.
-template <typename Callable, std::enable_if_t<returnsVoid<Callable>(), int> = 0>
-void runWithoutSession(OperationContext* opCtx, Callable callable) {
- MongoDOperationContextSession::checkIn(opCtx, OperationContextSession::CheckInReason::kYield);
-
- callable();
-
- // The below code can throw, so it cannot run in a scope guard.
- opCtx->checkForInterrupt();
- checkOutSessionAndVerifyTxnState(opCtx);
-}
-} // namespace
-
-
-void MigrationBatchInserter::onCreateThread(const std::string& threadName) {
- Client::initThread(threadName, getGlobalServiceContext(), nullptr);
- {
- stdx::lock_guard<Client> lk(cc());
- cc().setSystemOperationKillableByStepdown(lk);
- }
-}
-
-void MigrationBatchInserter::run(Status status) const try {
- // Run is passed in a non-ok status if this function runs inline.
- // That happens if we schedule this task on a ThreadPool that is
- // already shutdown. If we were to schedule a task on a shutdown ThreadPool,
- // then there is a logic error in our code. Therefore, we assert that here.
-
- invariant(status.isOK());
- auto arr = _batch["objects"].Obj();
- if (arr.isEmpty())
- return;
-
- auto executor =
- Grid::get(_innerOpCtx->getServiceContext())->getExecutorPool()->getFixedExecutor();
-
- auto applicationOpCtx = CancelableOperationContext(
- cc().makeOperationContext(), _innerOpCtx->getCancellationToken(), executor);
-
- auto opCtx = applicationOpCtx.get();
-
- auto assertNotAborted = [&]() {
- {
- stdx::lock_guard<Client> lk(*_outerOpCtx->getClient());
- _outerOpCtx->checkForInterrupt();
- }
- opCtx->checkForInterrupt();
- };
-
- auto it = arr.begin();
- while (it != arr.end()) {
- int batchNumCloned = 0;
- int batchClonedBytes = 0;
- const int batchMaxCloned = migrateCloneInsertionBatchSize.load();
-
- assertNotAborted();
-
- write_ops::InsertCommandRequest insertOp(_nss);
- insertOp.getWriteCommandRequestBase().setOrdered(true);
- insertOp.setDocuments([&] {
- std::vector<BSONObj> toInsert;
- while (it != arr.end() && (batchMaxCloned <= 0 || batchNumCloned < batchMaxCloned)) {
- const auto& doc = *it;
- BSONObj docToClone = doc.Obj();
- toInsert.push_back(docToClone);
- batchNumCloned++;
- batchClonedBytes += docToClone.objsize();
- ++it;
- }
- return toInsert;
- }());
-
- {
- // Disable the schema validation (during document inserts and updates)
- // and any internal validation for opCtx for performInserts()
- DisableDocumentValidation documentValidationDisabler(
- opCtx,
- DocumentValidationSettings::kDisableSchemaValidation |
- DocumentValidationSettings::kDisableInternalValidation);
- const auto reply =
- write_ops_exec::performInserts(opCtx, insertOp, OperationSource::kFromMigrate);
- for (unsigned long i = 0; i < reply.results.size(); ++i) {
- uassertStatusOKWithContext(
- reply.results[i],
- str::stream() << "Insert of " << insertOp.getDocuments()[i] << " failed.");
- }
- // Revert to the original DocumentValidationSettings for opCtx
- }
-
- migrationutil::persistUpdatedNumOrphans(
- opCtx, _migrationId, _collectionUuid, batchNumCloned);
- _migrationProgress->updateMaxOptime(
- repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp());
-
- ShardingStatistics::get(opCtx).countDocsClonedOnRecipient.addAndFetch(batchNumCloned);
- LOGV2(6718408,
- "Incrementing numCloned count by {batchNumCloned} and numClonedBytes by "
- "{batchClonedBytes}",
- "batchNumCloned"_attr = batchNumCloned,
- "batchClonedBytes"_attr = batchClonedBytes);
- _migrationProgress->incNumCloned(batchNumCloned);
- _migrationProgress->incNumBytes(batchClonedBytes);
-
- if (_writeConcern.needToWaitForOtherNodes() && _threadCount == 1) {
- runWithoutSession(_outerOpCtx, [&] {
- repl::ReplicationCoordinator::StatusAndDuration replStatus =
- repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
- opCtx,
- repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
- _writeConcern);
- if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
- LOGV2_WARNING(22011,
- "secondaryThrottle on, but doc insert timed out; continuing",
- "migrationId"_attr = _migrationId.toBSON());
- } else {
- uassertStatusOK(replStatus.status);
- }
- });
- }
-
- sleepmillis(migrateCloneInsertionBatchDelayMS.load());
- }
-} catch (const DBException& e) {
- stdx::lock_guard<Client> lk(*_innerOpCtx->getClient());
- _innerOpCtx->getServiceContext()->killOperation(lk, _innerOpCtx, ErrorCodes::Error(6718402));
- LOGV2(6718407,
- "Batch application failed: {error}",
- "Batch application failed",
- "error"_attr = e.toStatus());
-}
-} // namespace mongo
diff --git a/src/mongo/db/s/migration_batch_inserter.h b/src/mongo/db/s/migration_batch_inserter.h
deleted file mode 100644
index c3223921602..00000000000
--- a/src/mongo/db/s/migration_batch_inserter.h
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Copyright (C) 2022-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "boost/optional/optional.hpp"
-#include "mongo/bson/bsonobjbuilder.h"
-#include "mongo/db/cancelable_operation_context.h"
-#include "mongo/db/catalog/document_validation.h"
-#include "mongo/db/namespace_string.h"
-#include "mongo/db/operation_context.h"
-#include "mongo/db/ops/write_ops_exec.h"
-#include "mongo/db/ops/write_ops_gen.h"
-#include "mongo/db/repl/repl_client_info.h"
-#include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/db/s/migration_session_id.h"
-#include "mongo/db/s/range_deletion_util.h"
-#include "mongo/db/s/sharding_runtime_d_params_gen.h"
-#include "mongo/db/s/sharding_statistics.h"
-#include "mongo/db/session_catalog_mongod.h"
-#include "mongo/db/write_concern_options.h"
-#include "mongo/s/catalog/type_chunk.h"
-#include "mongo/s/grid.h"
-#include "mongo/util/uuid.h"
-
-#pragma once
-
-namespace mongo {
-
-// The purpose of this type is to allow inserters to communicate
-// their progress to the outside world.
-class MigrationCloningProgressSharedState {
- mutable Mutex _m;
- repl::OpTime _maxOptime;
- long long _numCloned = 0;
- long long _numBytes = 0;
-
-public:
- void updateMaxOptime(const repl::OpTime& _newOptime) {
- stdx::lock_guard lk(_m);
- _maxOptime = std::max(_maxOptime, _newOptime);
- }
- repl::OpTime getMaxOptime() const {
- stdx::lock_guard lk(_m);
- return _maxOptime;
- }
- void incNumCloned(int num) {
- stdx::lock_guard lk(_m);
- _numCloned += num;
- }
- void incNumBytes(int num) {
- stdx::lock_guard lk(_m);
- _numBytes += num;
- }
- long long getNumCloned() const {
- stdx::lock_guard lk(_m);
- return _numCloned;
- }
- long long getNumBytes() const {
- stdx::lock_guard lk(_m);
- return _numBytes;
- }
-};
-
-// This type contains a BSONObj _batch corresponding to a _migrateClone response.
-// The purpose of this type is to perform the insertions for this batch.
-// Those insertions happen in its "run" method. The MigrationBatchFetcher
-// schedules these jobs on a thread pool. This class has no knowledge that it runs
-// on a thread pool. It sole purpose is to perform insertions and communicate its progress
-// (inluding the new max opTime).
-class MigrationBatchInserter {
-public:
- // Do inserts.
- void run(Status status) const;
-
- MigrationBatchInserter(OperationContext* outerOpCtx,
- OperationContext* innerOpCtx,
- BSONObj batch,
- const NamespaceString& nss,
- const ChunkRange& range,
- const WriteConcernOptions& writeConcern,
- const UUID& collectionUuid,
- std::shared_ptr<MigrationCloningProgressSharedState> migrationProgress,
- const UUID& migrationId,
- int threadCount)
- : _outerOpCtx{outerOpCtx},
- _innerOpCtx{innerOpCtx},
- _batch{batch},
- _nss{nss},
- _range{range},
- _writeConcern{writeConcern},
- _collectionUuid{collectionUuid},
- _migrationProgress{migrationProgress},
- _migrationId{migrationId},
- _threadCount{threadCount} {}
-
- static void onCreateThread(const std::string& threadName);
-
-private:
- OperationContext* _outerOpCtx;
- OperationContext* _innerOpCtx;
- BSONObj _batch;
- NamespaceString _nss;
- ChunkRange _range;
- WriteConcernOptions _writeConcern;
- UUID _collectionUuid;
- std::shared_ptr<MigrationCloningProgressSharedState> _migrationProgress;
- UUID _migrationId;
- int _threadCount;
-};
-
-} // namespace mongo
diff --git a/src/mongo/db/s/migration_batch_mock_inserter.h b/src/mongo/db/s/migration_batch_mock_inserter.h
deleted file mode 100644
index 8b4b766480c..00000000000
--- a/src/mongo/db/s/migration_batch_mock_inserter.h
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Copyright (C) 2022-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/bson/bsonobjbuilder.h"
-#include "mongo/db/namespace_string.h"
-#include "mongo/db/operation_context.h"
-#include "mongo/db/s/migration_batch_inserter.h"
-#include "mongo/db/write_concern_options.h"
-#include "mongo/s/catalog/type_chunk.h"
-
-#pragma once
-
-namespace mongo {
-
-class MigrationBatchMockInserter {
-public:
- void run(Status status) const {
- // Run is passed in a non-ok status if this function runs inline.
- // That happens if we schedule this task on a ThreadPool that is
- // already shutdown. We should never do that. Therefore,
- // we assert that here.
- invariant(status.isOK());
- }
- MigrationBatchMockInserter(OperationContext*,
- OperationContext*,
- BSONObj,
- NamespaceString,
- ChunkRange,
- WriteConcernOptions,
- UUID,
- std::shared_ptr<MigrationCloningProgressSharedState>,
- UUID,
- int) {}
-
- static void onCreateThread(const std::string& threadName) {}
-
-private:
- BSONObj _batch;
-};
-
-} // namespace mongo
diff --git a/src/mongo/db/s/migration_chunk_cloner_source.cpp b/src/mongo/db/s/migration_chunk_cloner_source.cpp
index 194e929cb70..6df6ebadd6f 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source.cpp
@@ -27,8 +27,6 @@
* it in the license file.
*/
-
-#include "mongo/bson/bsonobj.h"
#include "mongo/platform/basic.h"
#include "mongo/db/s/migration_chunk_cloner_source.h"
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index d56601cb85b..1f378725369 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -36,7 +36,6 @@
#include <fmt/format.h>
#include "mongo/base/status.h"
-#include "mongo/bson/bsonobj.h"
#include "mongo/client/read_preference.h"
#include "mongo/db/catalog/index_catalog.h"
#include "mongo/db/catalog_raii.h"
@@ -726,61 +725,6 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromIndexScan(OperationCon
_jumboChunkCloneState->clonerExec->detachFromOperationContext();
}
-boost::optional<Snapshotted<BSONObj>> MigrationChunkClonerSourceLegacy::_getNextDoc(
- OperationContext* opCtx, const CollectionPtr& collection) {
- while (true) {
- stdx::unique_lock lk(_mutex);
- invariant(_inProgressReads >= 0);
- RecordId nextRecordId;
- Snapshotted<BSONObj> doc;
-
- _moreDocsCV.wait(lk, [&]() {
- return _cloneRecordIdsIter != _cloneLocs.end() || !_overflowDocs.empty() ||
- _inProgressReads == 0;
- });
-
- // One of the following must now be true (corresponding to the three if conditions):
- // 1. There is a document in the overflow set
- // 2. The iterator has not reached the end of the record id set
- // 3. The overflow set is empty, the iterator is at the end, and
- // no threads are holding a document. This condition indicates
- // that there are no more docs to return for the cloning phase.
- if (!_overflowDocs.empty()) {
- doc = std::move(_overflowDocs.front());
- _overflowDocs.pop_front();
- ++_inProgressReads;
- return doc;
- } else if (_cloneRecordIdsIter != _cloneLocs.end()) {
- nextRecordId = *_cloneRecordIdsIter;
- ++_cloneRecordIdsIter;
- ++_inProgressReads;
- } else {
- invariant(_numRecordsCloned + _numRecordsPassedOver == _cloneLocs.size());
- return boost::none;
- }
-
- // In order to saturate the disk, the I/O operation must occur without
- // holding the mutex.
- lk.unlock();
- if (collection->findDoc(opCtx, nextRecordId, &doc))
- return doc;
- lk.lock();
- ++_numRecordsPassedOver;
-
- // It is possible that this document is no longer in the collection,
- // in which case, we try again and indicate to other threads that this
- // thread is not holding a document.
- --_inProgressReads;
- _moreDocsCV.notify_one();
- }
-}
-
-void MigrationChunkClonerSourceLegacy::_insertOverflowDoc(Snapshotted<BSONObj> doc) {
- stdx::lock_guard lk(_mutex);
- invariant(_inProgressReads >= 1);
- _overflowDocs.push_back(std::move(doc));
-}
-
void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationContext* opCtx,
const CollectionPtr& collection,
BSONArrayBuilder* arrBuilder) {
@@ -788,56 +732,49 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationCon
internalQueryExecYieldIterations.load(),
Milliseconds(internalQueryExecYieldPeriodMS.load()));
- while (auto doc = _getNextDoc(opCtx, collection)) {
- ON_BLOCK_EXIT([&]() {
- stdx::lock_guard lk(_mutex);
- invariant(_inProgressReads > 0);
- --_inProgressReads;
- _moreDocsCV.notify_one();
- });
+ stdx::unique_lock<Latch> lk(_mutex);
+ auto iter = _cloneLocs.begin();
+ for (; iter != _cloneLocs.end(); ++iter) {
// We must always make progress in this method by at least one document because empty
// return indicates there is no more initial clone data.
if (arrBuilder->arrSize() && tracker.intervalHasElapsed()) {
- _insertOverflowDoc(std::move(*doc));
break;
}
- // Do not send documents that are no longer in the chunk range being moved. This can
- // happen when document shard key value of the document changed after the initial
- // index scan during cloning. This is needed because the destination is very
- // conservative in processing xferMod deletes and won't delete docs that are not in
- // the range of the chunk being migrated.
- if (!isDocInRange(
- doc->value(), _args.getMin().value(), _args.getMax().value(), _shardKeyPattern)) {
- {
- stdx::lock_guard lk(_mutex);
- _numRecordsPassedOver++;
+ auto nextRecordId = *iter;
+
+ lk.unlock();
+ ON_BLOCK_EXIT([&lk] { lk.lock(); });
+
+ Snapshotted<BSONObj> doc;
+ if (collection->findDoc(opCtx, nextRecordId, &doc)) {
+ // Do not send documents that are no longer in the chunk range being moved. This can
+ // happen when document shard key value of the document changed after the initial
+ // index scan during cloning. This is needed because the destination is very
+ // conservative in processing xferMod deletes and won't delete docs that are not in
+ // the range of the chunk being migrated.
+ if (!isDocInRange(doc.value(),
+ _args.getMin().value(),
+ _args.getMax().value(),
+ _shardKeyPattern)) {
+ continue;
}
- continue;
- }
- // Use the builder size instead of accumulating the document sizes directly so
- // that we take into consideration the overhead of BSONArray indices.
- if (arrBuilder->arrSize() &&
- (arrBuilder->len() + doc->value().objsize() + 1024) > BSONObjMaxUserSize) {
- _insertOverflowDoc(std::move(*doc));
- break;
- }
+ // Use the builder size instead of accumulating the document sizes directly so
+ // that we take into consideration the overhead of BSONArray indices.
+ if (arrBuilder->arrSize() &&
+ (arrBuilder->len() + doc.value().objsize() + 1024) > BSONObjMaxUserSize) {
- {
- stdx::lock_guard lk(_mutex);
- _numRecordsCloned++;
+ break;
+ }
+
+ arrBuilder->append(doc.value());
+ ShardingStatistics::get(opCtx).countDocsClonedOnDonor.addAndFetch(1);
}
- arrBuilder->append(doc->value());
- ShardingStatistics::get(opCtx).countDocsClonedOnDonor.addAndFetch(1);
}
- // When we reach here, there are no more documents to return to the destination.
- // We therefore need to notify a other threads that maybe sleeping on the condition
- // variable that we are done.
- stdx::lock_guard lk(_mutex);
- _moreDocsCV.notify_one();
+ _cloneLocs.erase(_cloneLocs.begin(), iter);
}
uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() {
@@ -947,7 +884,7 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx,
{
// All clone data must have been drained before starting to fetch the incremental changes.
stdx::unique_lock<Latch> lk(_mutex);
- invariant(_cloneRecordIdsIter == _cloneLocs.end());
+ invariant(_cloneLocs.empty());
// The "snapshot" for delete and update list must be taken under a single lock. This is to
// ensure that we will preserve the causal order of writes. Always consume the delete
@@ -1149,7 +1086,6 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC
}
}
}
- _cloneRecordIdsIter = _cloneLocs.begin();
} catch (DBException& exception) {
exception.addContext("Executor error while scanning for documents belonging to chunk");
throw;
@@ -1247,6 +1183,7 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC
stdx::lock_guard<Latch> sl(_mutex);
+ const std::size_t cloneLocsRemaining = _cloneLocs.size();
int64_t untransferredModsSizeBytes = _untransferredDeletesCounter * _averageObjectIdSize +
(_untransferredUpsertsCounter + _deferredUntransferredOpsCounter) *
_averageObjectSizeForCloneLocs;
@@ -1269,14 +1206,13 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC
"moveChunk data transfer progress",
"response"_attr = redact(res),
"memoryUsedBytes"_attr = _memoryUsed,
- "docsRemainingToClone"_attr =
- _cloneLocs.size() - _numRecordsCloned - _numRecordsPassedOver,
+ "docsRemainingToClone"_attr = cloneLocsRemaining,
"untransferredModsSizeBytes"_attr = untransferredModsSizeBytes);
}
if (res["state"].String() == "steady" && sessionCatalogSourceInCatchupPhase &&
estimateUntransferredSessionsSize == 0) {
- if ((_cloneRecordIdsIter != _cloneLocs.end() || !_overflowDocs.empty()) ||
+ if (cloneLocsRemaining != 0 ||
(_jumboChunkCloneState && _forceJumbo &&
PlanExecutor::IS_EOF != _jumboChunkCloneState->clonerState)) {
return {ErrorCodes::OperationIncomplete,
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index 7a59f6029bc..4336a15d0c0 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -29,10 +29,8 @@
#pragma once
-#include <deque>
#include <list>
#include <memory>
-#include <mutex>
#include <set>
#include "mongo/bson/bsonobj.h"
@@ -265,15 +263,6 @@ private:
Status _storeCurrentLocs(OperationContext* opCtx);
/**
- * Returns boost::none if there are no more documents to get.
- * Increments _inProgressReads if and only if return value is not none.
- */
- boost::optional<Snapshotted<BSONObj>> _getNextDoc(OperationContext* opCtx,
- const CollectionPtr& collection);
-
- void _insertOverflowDoc(Snapshotted<BSONObj> doc);
-
- /**
* Adds the OpTime to the list of OpTimes for oplog entries that we should consider migrating as
* part of session migration.
*/
@@ -381,47 +370,6 @@ private:
// List of record ids that needs to be transferred (initial clone)
std::set<RecordId> _cloneLocs;
- // This iterator is a pointer into the _cloneLocs set. It allows concurrent access to
- // the _cloneLocs set by allowing threads servicing _migrateClone requests to do the
- // following:
- // 1. Acquire mutex "_mutex" above.
- // 2. Copy *_cloneRecordIdsIter into its local stack frame.
- // 3. Increment _cloneRecordIdsIter
- // 4. Unlock "_mutex."
- // 5. Do the I/O to fetch the document corresponding to this record Id.
- //
- // The purpose of this algorithm, is to allow different threads to concurrently start I/O jobs
- // in order to more fully saturate the disk.
- //
- // One issue with this algorithm, is that only 16MB worth of documents can be returned in
- // response to a _migrateClone request. But, the thread does not know the size of a document
- // until it does the I/O. At which point, if the document does not fit in the response to
- // _migrateClone request the document must be made available to a different thread servicing a
- // _migrateClone request. To solve this problem, the thread adds the document
- // to the below _overflowDocs deque.
- std::set<RecordId>::iterator _cloneRecordIdsIter;
-
- // This deque stores all documents that must be sent to the destination, but could not fit
- // in the response to a particular _migrateClone request.
- std::deque<Snapshotted<BSONObj>> _overflowDocs;
-
- // This integer represents how many documents are being "held" by threads servicing
- // _migrateClone requests. Any document that is "held" by a thread may be added to the
- // _overflowDocs deque if it doesn't fit in the response to a _migrateClone request.
- // This integer is necessary because it gives us a condition on when all documents to be sent
- // to the destination have been exhausted.
- //
- // If (_cloneRecordIdsIter == _cloneLocs.end() && _overflowDocs.empty() && _inProgressReads
- // == 0) then all documents have been returned to the destination.
- decltype(_cloneLocs.size()) _inProgressReads = 0;
-
- // This condition variable allows us to wait on the following condition:
- // Either we're done and the above condition is satisfied, or there is some document to
- // return.
- stdx::condition_variable _moreDocsCV;
- decltype(_cloneLocs.size()) _numRecordsCloned{0};
- decltype(_cloneLocs.size()) _numRecordsPassedOver{0};
-
// The estimated average object size during the clone phase. Used for buffer size
// pre-allocation (initial clone).
uint64_t _averageObjectSizeForCloneLocs{0};
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 4cbcf6cb260..721fab26d3d 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -29,7 +29,6 @@
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kShardingMigration
-#include "mongo/db/s/migration_batch_fetcher.h"
#include "mongo/platform/basic.h"
#include "mongo/db/s/migration_destination_manager.h"
@@ -412,8 +411,8 @@ void MigrationDestinationManager::report(BSONObjBuilder& b,
}
BSONObjBuilder bb(b.subobjStart("counts"));
- bb.append("cloned", _getNumCloned());
- bb.append("clonedBytes", _getNumBytesCloned());
+ bb.append("cloned", _numCloned);
+ bb.append("clonedBytes", _clonedBytes);
bb.append("catchup", _numCatchup);
bb.append("steady", _numSteady);
bb.done();
@@ -447,8 +446,6 @@ Status MigrationDestinationManager::start(OperationContext* opCtx,
_lsid = cloneRequest.getLsid();
_txnNumber = cloneRequest.getTxnNumber();
- _parallelFetchersSupported = cloneRequest.parallelFetchingSupported();
-
_nss = nss;
_fromShard = cloneRequest.getFromShardId();
_fromShardConnString =
@@ -465,8 +462,8 @@ Status MigrationDestinationManager::start(OperationContext* opCtx,
_chunkMarkedPending = false;
- _migrationCloningProgress = std::make_shared<MigrationCloningProgressSharedState>();
-
+ _numCloned = 0;
+ _clonedBytes = 0;
_numCatchup = 0;
_numSteady = 0;
@@ -1341,32 +1338,122 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx,
_sessionMigration->start(opCtx->getServiceContext());
+ const BSONObj migrateCloneRequest = createMigrateCloneRequest(_nss, *_sessionId);
+
_chunkMarkedPending = true; // no lock needed, only the migrate thread looks.
- {
- // Destructor of MigrationBatchFetcher is non-trivial. Therefore,
- // this scope has semantic significance.
- MigrationBatchFetcher<MigrationBatchInserter> fetcher{outerOpCtx,
- opCtx,
- _nss,
- *_sessionId,
- _writeConcern,
- _fromShard,
- range,
- *_migrationId,
- *_collectionUuid,
- _migrationCloningProgress,
- _parallelFetchersSupported};
- fetcher.fetchAndScheduleInsertion();
- }
- opCtx->checkForInterrupt();
- lastOpApplied = _migrationCloningProgress->getMaxOptime();
+ auto assertNotAborted = [&](OperationContext* opCtx) {
+ opCtx->checkForInterrupt();
+ outerOpCtx->checkForInterrupt();
+ uassert(50748, "Migration aborted while copying documents", getState() != kAbort);
+ };
+
+ auto insertBatchFn = [&](OperationContext* opCtx, BSONObj nextBatch) {
+ auto arr = nextBatch["objects"].Obj();
+ if (arr.isEmpty()) {
+ return false;
+ }
+ auto it = arr.begin();
+ while (it != arr.end()) {
+ int batchNumCloned = 0;
+ int batchClonedBytes = 0;
+ const int batchMaxCloned = migrateCloneInsertionBatchSize.load();
+
+ assertNotAborted(opCtx);
+
+ write_ops::InsertCommandRequest insertOp(_nss);
+ insertOp.getWriteCommandRequestBase().setOrdered(true);
+ insertOp.setDocuments([&] {
+ std::vector<BSONObj> toInsert;
+ while (it != arr.end() &&
+ (batchMaxCloned <= 0 || batchNumCloned < batchMaxCloned)) {
+ const auto& doc = *it;
+ BSONObj docToClone = doc.Obj();
+ toInsert.push_back(docToClone);
+ batchNumCloned++;
+ batchClonedBytes += docToClone.objsize();
+ ++it;
+ }
+ return toInsert;
+ }());
+
+ {
+ // Disable the schema validation (during document inserts and updates)
+ // and any internal validation for opCtx for performInserts()
+ DisableDocumentValidation documentValidationDisabler(
+ opCtx,
+ DocumentValidationSettings::kDisableSchemaValidation |
+ DocumentValidationSettings::kDisableInternalValidation);
+ const auto reply = write_ops_exec::performInserts(
+ opCtx, insertOp, OperationSource::kFromMigrate);
+ for (unsigned long i = 0; i < reply.results.size(); ++i) {
+ uassertStatusOKWithContext(reply.results[i],
+ str::stream() << "Insert of "
+ << insertOp.getDocuments()[i]
+ << " failed.");
+ }
+ // Revert to the original DocumentValidationSettings for opCtx
+ }
+
+ migrationutil::persistUpdatedNumOrphans(
+ opCtx, _migrationId.get(), *_collectionUuid, batchNumCloned);
+
+ {
+ stdx::lock_guard<Latch> statsLock(_mutex);
+ _numCloned += batchNumCloned;
+ ShardingStatistics::get(opCtx).countDocsClonedOnRecipient.addAndFetch(
+ batchNumCloned);
+ _clonedBytes += batchClonedBytes;
+ }
+ if (_writeConcern.needToWaitForOtherNodes()) {
+ runWithoutSession(outerOpCtx, [&] {
+ repl::ReplicationCoordinator::StatusAndDuration replStatus =
+ repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
+ opCtx,
+ repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
+ _writeConcern);
+ if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
+ LOGV2_WARNING(
+ 22011,
+ "secondaryThrottle on, but doc insert timed out; continuing",
+ "migrationId"_attr = _migrationId->toBSON());
+ } else {
+ uassertStatusOK(replStatus.status);
+ }
+ });
+ }
+
+ sleepmillis(migrateCloneInsertionBatchDelayMS.load());
+ }
+ return true;
+ };
+
+ auto fetchBatchFn = [&](OperationContext* opCtx, BSONObj* nextBatch) {
+ auto commandResponse = uassertStatusOKWithContext(
+ fromShard->runCommand(opCtx,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ "admin",
+ migrateCloneRequest,
+ Shard::RetryPolicy::kNoRetry),
+ "_migrateClone failed: ");
+
+ uassertStatusOKWithContext(
+ Shard::CommandResponse::getEffectiveStatus(commandResponse),
+ "_migrateClone failed: ");
+
+ *nextBatch = commandResponse.response;
+ return nextBatch->getField("objects").Obj().isEmpty();
+ };
+
+ // If running on a replicated system, we'll need to flush the docs we cloned to the
+ // secondaries
+ lastOpApplied = fetchAndApplyBatch(opCtx, insertBatchFn, fetchBatchFn);
timing->done(4);
migrateThreadHangAtStep4.pauseWhileSet();
if (MONGO_unlikely(failMigrationOnRecipient.shouldFail())) {
- _setStateFail(str::stream() << "failing migration after cloning " << _getNumCloned()
+ _setStateFail(str::stream() << "failing migration after cloning " << _numCloned
<< " docs due to failMigrationOnRecipient failpoint");
return;
}
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index 74d737f8302..925296091cb 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -29,7 +29,6 @@
#pragma once
-#include <memory>
#include <string>
#include "mongo/base/string_data.h"
@@ -40,8 +39,6 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/replica_set_aware_service.h"
#include "mongo/db/s/active_migrations_registry.h"
-#include "mongo/db/s/collection_sharding_runtime.h"
-#include "mongo/db/s/migration_batch_fetcher.h"
#include "mongo/db/s/migration_recipient_recovery_document_gen.h"
#include "mongo/db/s/migration_session_id.h"
#include "mongo/db/s/session_catalog_migration_destination.h"
@@ -181,11 +178,6 @@ public:
const boost::optional<ChunkManager>& cm,
boost::optional<Timestamp> afterClusterTime);
-
- bool isParallelFetchingSupported() {
- return _parallelFetchersSupported;
- }
-
/**
* Gets the collection uuid and options from fromShardId. If given a chunk manager, will fetch
* the collection options using the database version protocol.
@@ -291,22 +283,8 @@ private:
stdx::thread _migrateThreadHandle;
- long long _getNumCloned() {
- return _migrationCloningProgress->getNumCloned();
- }
-
- long long _getNumBytesCloned() {
- return _migrationCloningProgress->getNumBytes();
- }
-
boost::optional<UUID> _migrationId;
boost::optional<UUID> _collectionUuid;
-
- // State that is shared among all inserter threads.
- std::shared_ptr<MigrationCloningProgressSharedState> _migrationCloningProgress;
-
- bool _parallelFetchersSupported;
-
LogicalSessionId _lsid;
TxnNumber _txnNumber{kUninitializedTxnNumber};
NamespaceString _nss;
@@ -326,6 +304,8 @@ private:
// failure we can perform the appropriate cleanup.
bool _chunkMarkedPending{false};
+ long long _numCloned{0};
+ long long _clonedBytes{0};
long long _numCatchup{0};
long long _numSteady{0};
diff --git a/src/mongo/db/s/start_chunk_clone_request.cpp b/src/mongo/db/s/start_chunk_clone_request.cpp
index 6838aedaffc..86567dce161 100644
--- a/src/mongo/db/s/start_chunk_clone_request.cpp
+++ b/src/mongo/db/s/start_chunk_clone_request.cpp
@@ -50,7 +50,6 @@ const char kToShardId[] = "toShardName";
const char kChunkMinKey[] = "min";
const char kChunkMaxKey[] = "max";
const char kShardKeyPattern[] = "shardKeyPattern";
-const char kParallelMigration[] = "parallelMigrateCloneSupported";
} // namespace
@@ -153,14 +152,6 @@ StatusWith<StartChunkCloneRequest> StartChunkCloneRequest::createFromCommand(Nam
}
}
- {
- Status status = bsonExtractBooleanFieldWithDefault(
- obj, kParallelMigration, false, &request._parallelFetchingSupported);
- if (!status.isOK()) {
- return status;
- }
- }
-
request._migrationId = UUID::parse(obj);
request._lsid =
LogicalSessionId::parse(IDLParserErrorContext("StartChunkCloneRequest"), obj[kLsid].Obj());
@@ -188,7 +179,6 @@ void StartChunkCloneRequest::appendAsCommand(
invariant(fromShardConnectionString.isValid());
builder->append(kRecvChunkStart, nss.ns());
- builder->append(kParallelMigration, true);
migrationId.appendToBuilder(builder, kMigrationId);
builder->append(kLsid, lsid.toBSON());
diff --git a/src/mongo/db/s/start_chunk_clone_request.h b/src/mongo/db/s/start_chunk_clone_request.h
index 8f433afd2f7..c6ecba1f839 100644
--- a/src/mongo/db/s/start_chunk_clone_request.h
+++ b/src/mongo/db/s/start_chunk_clone_request.h
@@ -93,10 +93,6 @@ public:
return _migrationId.is_initialized();
}
- bool parallelFetchingSupported() const {
- return _parallelFetchingSupported;
- }
-
const UUID& getMigrationId() const {
invariant(_migrationId);
return *_migrationId;
@@ -165,8 +161,6 @@ private:
// The parsed secondary throttle options
MigrationSecondaryThrottleOptions _secondaryThrottle;
-
- bool _parallelFetchingSupported;
};
} // namespace mongo