diff options
author | Rachita Dhawan <rachita.dhawan@gmail.com> | 2022-12-16 20:48:30 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-01-19 01:52:53 +0000 |
commit | ed1c0c635535b5ca7deda5ba3671ede2b1e2d59b (patch) | |
tree | f6c9e17e23b1460438d091e1395ed50594f3c086 | |
parent | b5dda11fcdb180aecc11312db8059bd6ef9febec (diff) | |
download | mongo-ed1c0c635535b5ca7deda5ba3671ede2b1e2d59b.tar.gz |
SERVER-72046 Add test for concurrent migration fetching and insertion
(cherry picked from commit ab506f144f44e24addaccbcb755b8d99e7ef29c3)
-rw-r--r-- | jstests/sharding/move_chunk_concurrent_cloning.js | 96 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_runtime_d_params.h | 4 |
2 files changed, 99 insertions, 1 deletions
diff --git a/jstests/sharding/move_chunk_concurrent_cloning.js b/jstests/sharding/move_chunk_concurrent_cloning.js new file mode 100644 index 00000000000..954c2d7cd35 --- /dev/null +++ b/jstests/sharding/move_chunk_concurrent_cloning.js @@ -0,0 +1,96 @@ +/** + * @tags: [featureFlagConcurrencyInChunkMigration] + */ +(function() { +"use strict"; + +load('./jstests/libs/chunk_manipulation_util.js'); + +// For startParallelOps to write its state +let staticMongod = MongoRunner.runMongod({}); + +let st = new ShardingTest({shards: 2}); +st.stopBalancer(); + +const kThreadCount = Math.floor(Math.random() * 31) + 1; +const kPadding = new Array(1024).join("x"); + +let testDB = st.s.getDB('test'); +assert.commandWorked(testDB.adminCommand({enableSharding: 'test'})); +st.ensurePrimaryShard('test', st.shard0.shardName); +assert.commandWorked(testDB.adminCommand({shardCollection: 'test.user', key: {x: 1}})); + +let shardKeyVal = 0; +const kDocsInBatch = 8 * 1000; +const kMinCollSize = 128 * 1024 * 1024; +let approxInsertedSize = 0; +while (approxInsertedSize < kMinCollSize) { + var bulk = testDB.user.initializeUnorderedBulkOp(); + for (let docs = 0; docs < kDocsInBatch; docs++) { + shardKeyVal++; + bulk.insert({_id: shardKeyVal, x: shardKeyVal, padding: kPadding}); + } + assert.commandWorked(bulk.execute()); + + approxInsertedSize = approxInsertedSize + (kDocsInBatch * 1024); +} + +const kInitialLoadFinalKey = shardKeyVal; + +print(`Running tests with migrationConcurrency == ${kThreadCount}`); +st._rs.forEach((replSet) => { + assert.commandWorked(replSet.test.getPrimary().adminCommand( + {setParameter: 1, migrationConcurrency: kThreadCount})); +}); + +const configCollEntry = + st.s.getDB('config').getCollection('collections').findOne({_id: 'test.user'}); +let chunks = st.s.getDB('config').chunks.find({uuid: configCollEntry.uuid}).toArray(); +assert.eq(1, chunks.length, tojson(chunks)); + +let joinMoveChunk = + moveChunkParallel(staticMongod, st.s0.host, {x: 0}, null, 'test.user', st.shard1.shardName); + +// Migration cloning scans by shard key order. Perform some writes against the collection on both +// the lower and upper ends of the shard key values while migration is happening to exercise +// xferMods logic. +const kDeleteIndexOffset = kInitialLoadFinalKey - 3000; +const kUpdateIndexOffset = kInitialLoadFinalKey - 5000; +for (let x = 0; x < 1000; x++) { + assert.commandWorked(testDB.user.remove({x: x})); + assert.commandWorked(testDB.user.update({x: 4000 + x}, {$set: {updated: true}})); + + assert.commandWorked(testDB.user.remove({x: kDeleteIndexOffset + x})); + assert.commandWorked(testDB.user.update({x: kUpdateIndexOffset + x}, {$set: {updated: true}})); + + let newShardKey = kInitialLoadFinalKey + x + 1; + assert.commandWorked(testDB.user.insert({_id: newShardKey, x: newShardKey})); +} + +joinMoveChunk(); + +let shardKeyIdx = 1000; // Index starts at 1k since we deleted the first 1k docs. +let cursor = testDB.user.find().sort({x: 1}); + +while (cursor.hasNext()) { + let next = cursor.next(); + assert.eq(next.x, shardKeyIdx); + + if ((shardKeyIdx >= 4000 && shardKeyIdx < 5000) || + (shardKeyIdx >= kUpdateIndexOffset && shardKeyIdx < (kUpdateIndexOffset + 1000))) { + assert.eq(true, next.updated, tojson(next)); + } + + shardKeyIdx++; + + if (shardKeyIdx == kDeleteIndexOffset) { + shardKeyIdx += 1000; + } +} + +shardKeyIdx--; +assert.eq(shardKeyIdx, kInitialLoadFinalKey + 1000); + +st.stop(); +MongoRunner.stopMongod(staticMongod); +})(); diff --git a/src/mongo/db/s/sharding_runtime_d_params.h b/src/mongo/db/s/sharding_runtime_d_params.h index 190e442a5fb..00dd1fddfed 100644 --- a/src/mongo/db/s/sharding_runtime_d_params.h +++ b/src/mongo/db/s/sharding_runtime_d_params.h @@ -31,6 +31,7 @@ #include "fmt/core.h" #include "mongo/base/status.h" +#include "mongo/db/commands/test_commands_enabled.h" #include "mongo/s/sharding_feature_flags_gen.h" #include "mongo/util/processinfo.h" @@ -43,7 +44,8 @@ inline Status validateMigrationConcurrency(const int& migrationConcurrency) { "concurrency feature flag"}; } int maxConcurrency = ProcessInfo::getNumCores(); - if (migrationConcurrency <= 0 || migrationConcurrency > maxConcurrency) { + if (migrationConcurrency <= 0 || + (migrationConcurrency > maxConcurrency && !getTestCommandsEnabled())) { return Status{ ErrorCodes::InvalidOptions, fmt::format( |