summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRachita Dhawan <rachita.dhawan@gmail.com>2022-12-16 20:48:30 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-01-19 01:52:53 +0000
commited1c0c635535b5ca7deda5ba3671ede2b1e2d59b (patch)
treef6c9e17e23b1460438d091e1395ed50594f3c086
parentb5dda11fcdb180aecc11312db8059bd6ef9febec (diff)
downloadmongo-ed1c0c635535b5ca7deda5ba3671ede2b1e2d59b.tar.gz
SERVER-72046 Add test for concurrent migration fetching and insertion
(cherry picked from commit ab506f144f44e24addaccbcb755b8d99e7ef29c3)
-rw-r--r--jstests/sharding/move_chunk_concurrent_cloning.js96
-rw-r--r--src/mongo/db/s/sharding_runtime_d_params.h4
2 files changed, 99 insertions, 1 deletions
diff --git a/jstests/sharding/move_chunk_concurrent_cloning.js b/jstests/sharding/move_chunk_concurrent_cloning.js
new file mode 100644
index 00000000000..954c2d7cd35
--- /dev/null
+++ b/jstests/sharding/move_chunk_concurrent_cloning.js
@@ -0,0 +1,96 @@
+/**
+ * @tags: [featureFlagConcurrencyInChunkMigration]
+ */
+(function() {
+"use strict";
+
+load('./jstests/libs/chunk_manipulation_util.js');
+
+// For startParallelOps to write its state
+let staticMongod = MongoRunner.runMongod({});
+
+let st = new ShardingTest({shards: 2});
+st.stopBalancer();
+
+const kThreadCount = Math.floor(Math.random() * 31) + 1;
+const kPadding = new Array(1024).join("x");
+
+let testDB = st.s.getDB('test');
+assert.commandWorked(testDB.adminCommand({enableSharding: 'test'}));
+st.ensurePrimaryShard('test', st.shard0.shardName);
+assert.commandWorked(testDB.adminCommand({shardCollection: 'test.user', key: {x: 1}}));
+
+let shardKeyVal = 0;
+const kDocsInBatch = 8 * 1000;
+const kMinCollSize = 128 * 1024 * 1024;
+let approxInsertedSize = 0;
+while (approxInsertedSize < kMinCollSize) {
+ var bulk = testDB.user.initializeUnorderedBulkOp();
+ for (let docs = 0; docs < kDocsInBatch; docs++) {
+ shardKeyVal++;
+ bulk.insert({_id: shardKeyVal, x: shardKeyVal, padding: kPadding});
+ }
+ assert.commandWorked(bulk.execute());
+
+ approxInsertedSize = approxInsertedSize + (kDocsInBatch * 1024);
+}
+
+const kInitialLoadFinalKey = shardKeyVal;
+
+print(`Running tests with migrationConcurrency == ${kThreadCount}`);
+st._rs.forEach((replSet) => {
+ assert.commandWorked(replSet.test.getPrimary().adminCommand(
+ {setParameter: 1, migrationConcurrency: kThreadCount}));
+});
+
+const configCollEntry =
+ st.s.getDB('config').getCollection('collections').findOne({_id: 'test.user'});
+let chunks = st.s.getDB('config').chunks.find({uuid: configCollEntry.uuid}).toArray();
+assert.eq(1, chunks.length, tojson(chunks));
+
+let joinMoveChunk =
+ moveChunkParallel(staticMongod, st.s0.host, {x: 0}, null, 'test.user', st.shard1.shardName);
+
+// Migration cloning scans by shard key order. Perform some writes against the collection on both
+// the lower and upper ends of the shard key values while migration is happening to exercise
+// xferMods logic.
+const kDeleteIndexOffset = kInitialLoadFinalKey - 3000;
+const kUpdateIndexOffset = kInitialLoadFinalKey - 5000;
+for (let x = 0; x < 1000; x++) {
+ assert.commandWorked(testDB.user.remove({x: x}));
+ assert.commandWorked(testDB.user.update({x: 4000 + x}, {$set: {updated: true}}));
+
+ assert.commandWorked(testDB.user.remove({x: kDeleteIndexOffset + x}));
+ assert.commandWorked(testDB.user.update({x: kUpdateIndexOffset + x}, {$set: {updated: true}}));
+
+ let newShardKey = kInitialLoadFinalKey + x + 1;
+ assert.commandWorked(testDB.user.insert({_id: newShardKey, x: newShardKey}));
+}
+
+joinMoveChunk();
+
+let shardKeyIdx = 1000; // Index starts at 1k since we deleted the first 1k docs.
+let cursor = testDB.user.find().sort({x: 1});
+
+while (cursor.hasNext()) {
+ let next = cursor.next();
+ assert.eq(next.x, shardKeyIdx);
+
+ if ((shardKeyIdx >= 4000 && shardKeyIdx < 5000) ||
+ (shardKeyIdx >= kUpdateIndexOffset && shardKeyIdx < (kUpdateIndexOffset + 1000))) {
+ assert.eq(true, next.updated, tojson(next));
+ }
+
+ shardKeyIdx++;
+
+ if (shardKeyIdx == kDeleteIndexOffset) {
+ shardKeyIdx += 1000;
+ }
+}
+
+shardKeyIdx--;
+assert.eq(shardKeyIdx, kInitialLoadFinalKey + 1000);
+
+st.stop();
+MongoRunner.stopMongod(staticMongod);
+})();
diff --git a/src/mongo/db/s/sharding_runtime_d_params.h b/src/mongo/db/s/sharding_runtime_d_params.h
index 190e442a5fb..00dd1fddfed 100644
--- a/src/mongo/db/s/sharding_runtime_d_params.h
+++ b/src/mongo/db/s/sharding_runtime_d_params.h
@@ -31,6 +31,7 @@
#include "fmt/core.h"
#include "mongo/base/status.h"
+#include "mongo/db/commands/test_commands_enabled.h"
#include "mongo/s/sharding_feature_flags_gen.h"
#include "mongo/util/processinfo.h"
@@ -43,7 +44,8 @@ inline Status validateMigrationConcurrency(const int& migrationConcurrency) {
"concurrency feature flag"};
}
int maxConcurrency = ProcessInfo::getNumCores();
- if (migrationConcurrency <= 0 || migrationConcurrency > maxConcurrency) {
+ if (migrationConcurrency <= 0 ||
+ (migrationConcurrency > maxConcurrency && !getTestCommandsEnabled())) {
return Status{
ErrorCodes::InvalidOptions,
fmt::format(