author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>  2016-08-08 17:47:33 -0400
committer  Kaloian Manassiev <kaloian.manassiev@mongodb.com>  2016-08-12 11:59:59 -0400
commit     0c6bbea55ee45db0813043b470246fc62501a38c (patch)
tree       b16f142b0ab05e297d42a9f99b3e76b6da280f07
parent     e0bd06f4aab2995bac09e7ef0b25ba47ad2cb97e (diff)
download   mongo-0c6bbea55ee45db0813043b470246fc62501a38c.tar.gz
SERVER-25202 Support parallel manual chunk migrations
This change switches the MigrationManager to use the UsedResourcesMap class for acquiring the collection distributed lock on the first migration attempt and releasing it after the last one completes. This allows it to support parallel manual migrations.
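
The mechanism described above amounts to reference-counting the collection distributed lock across concurrent migrations of the same collection. The sketch below illustrates that idea only; the class and member names (DistLockRefCounter, _lockedCollections) are invented for the example, and the real implementation is in the src/mongo/s/balancer/migration_manager.cpp hunks further down in this diff.

```cpp
#include <map>
#include <mutex>
#include <string>

// Illustrative sketch only (names are not from the patch): the first migration scheduled
// for a namespace acquires the collection distributed lock, subsequent concurrent
// migrations just bump a counter, and the last one to finish releases the lock.
class DistLockRefCounter {
public:
    // Returns true if the caller must actually acquire the distributed lock for 'ns'.
    bool onMigrationScheduled(const std::string& ns) {
        std::lock_guard<std::mutex> lk(_mutex);
        return ++_lockedCollections[ns] == 1;
    }

    // Returns true if the caller must release the distributed lock for 'ns'. Must be
    // paired with a prior onMigrationScheduled() call for the same namespace.
    bool onMigrationCompleted(const std::string& ns) {
        std::lock_guard<std::mutex> lk(_mutex);
        auto it = _lockedCollections.find(ns);
        if (it != _lockedCollections.end() && --it->second == 0) {
            _lockedCollections.erase(it);
            return true;
        }
        return false;
    }

private:
    std::mutex _mutex;
    std::map<std::string, int> _lockedCollections;  // namespace -> number of active migrations
};
```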
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_auth.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_auth_audit.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml | 3
-rw-r--r--  jstests/sharding/empty_doc_results.js | 114
-rw-r--r--  jstests/sharding/migration_ignore_interrupts.js | 39
-rw-r--r--  jstests/sharding/movechunk_parallel.js | 60
-rw-r--r--  jstests/sharding/regex_targeting.js | 502
-rw-r--r--  src/mongo/db/namespace_string-inl.h | 34
-rw-r--r--  src/mongo/db/namespace_string.h | 30
-rw-r--r--  src/mongo/db/namespace_string_test.cpp | 29
-rw-r--r--  src/mongo/s/balancer/balancer.cpp | 56
-rw-r--r--  src/mongo/s/balancer/balancer.h | 4
-rw-r--r--  src/mongo/s/balancer/migration_manager.cpp | 563
-rw-r--r--  src/mongo/s/balancer/migration_manager.h | 237
-rw-r--r--  src/mongo/s/balancer/migration_manager_test.cpp | 114
-rw-r--r--  src/mongo/util/concurrency/notification.h | 16
16 files changed, 897 insertions, 906 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_auth.yml b/buildscripts/resmokeconfig/suites/sharding_auth.yml
index fed68c5bb9f..83a63e2bd2e 100644
--- a/buildscripts/resmokeconfig/suites/sharding_auth.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_auth.yml
@@ -19,6 +19,7 @@ selector:
- jstests/sharding/migration_with_source_ops.js # SERVER-21713
- jstests/sharding/migration_sets_fromMigrate_flag.js # SERVER-21713
- jstests/sharding/migration_ignore_interrupts.js # SERVER-21713
+ - jstests/sharding/movechunk_parallel.js # SERVER-21713
# TODO: Enable when SERVER-22672 is complete
- jstests/sharding/printShardingStatus.js
diff --git a/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml b/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml
index a48b51abc96..3a85af28f14 100644
--- a/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml
@@ -19,6 +19,7 @@ selector:
- jstests/sharding/migration_with_source_ops.js # SERVER-21713
- jstests/sharding/migration_sets_fromMigrate_flag.js # SERVER-21713
- jstests/sharding/migration_ignore_interrupts.js # SERVER-21713
+ - jstests/sharding/movechunk_parallel.js # SERVER-21713
# TODO: Enable when SERVER-22672 is complete
- jstests/sharding/printShardingStatus.js
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index 1eb35b63fc0..fdbaedebda1 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -53,12 +53,13 @@ selector:
- jstests/sharding/collation_targeting_inherited.js
# Behavior change to addShard
- jstests/sharding/addshard_idempotent.js
-
# Uses a failpoint that does not exist in v3.2, so the test is incompatible with a v3.2 shard.
- jstests/sharding/cleanup_orphaned_cmd_during_movechunk.js
- jstests/sharding/cleanup_orphaned_cmd_during_movechunk_hashed.js
- jstests/sharding/migration_failure.js
- jstests/sharding/migration_with_source_ops.js
+ # Parallel migrations are not supported with 3.2 shards
+ - jstests/sharding/movechunk_parallel.js
executor:
js_test:
diff --git a/jstests/sharding/empty_doc_results.js b/jstests/sharding/empty_doc_results.js
index 2038a27c538..8f75d65eb7d 100644
--- a/jstests/sharding/empty_doc_results.js
+++ b/jstests/sharding/empty_doc_results.js
@@ -1,62 +1,60 @@
-//
// Verifies that mongos correctly handles empty documents when all fields are projected out
-//
-
-var options = {mongosOptions: {binVersion: ""}, shardOptions: {binVersion: ""}};
-
-var st = new ShardingTest({shards: 2, other: options});
-
-var mongos = st.s0;
-var coll = mongos.getCollection("foo.bar");
-var admin = mongos.getDB("admin");
-var shards = mongos.getDB("config").shards.find().toArray();
-
-assert.commandWorked(admin.runCommand({enableSharding: coll.getDB().getName()}));
-printjson(admin.runCommand({movePrimary: coll.getDB().getName(), to: shards[0]._id}));
-assert.commandWorked(admin.runCommand({shardCollection: coll.getFullName(), key: {_id: 1}}));
-
-assert.commandWorked(admin.runCommand({split: coll.getFullName(), middle: {_id: 0}}));
-assert.commandWorked(
- admin.runCommand({moveChunk: coll.getFullName(), find: {_id: 0}, to: shards[1]._id}));
-
-st.printShardingStatus();
-
-// Insert 100 documents, half of which have an extra field
-for (var i = -50; i < 50; i++) {
- var doc = {};
- if (i >= 0)
- doc.positiveId = true;
- assert.writeOK(coll.insert(doc));
-}
-
-//
-//
-// Ensure projecting out all fields still returns the same number of documents
-assert.eq(100, coll.find({}).itcount());
-assert.eq(100, coll.find({}).sort({positiveId: 1}).itcount());
-assert.eq(100, coll.find({}, {_id: 0, positiveId: 0}).itcount());
-// Can't remove sort key from projection (SERVER-11877) but some documents will still be empty
-assert.eq(100, coll.find({}, {_id: 0}).sort({positiveId: 1}).itcount());
-
-//
-//
-// Ensure projecting out all fields still returns the same ordering of documents
-var assertLast50Positive = function(sortedDocs) {
- assert.eq(100, sortedDocs.length);
- var positiveCount = 0;
- for (var i = 0; i < sortedDocs.length; ++i) {
- if (sortedDocs[i].positiveId) {
- positiveCount++;
- } else {
- // Make sure only the last set of documents have "positiveId" set
- assert.eq(positiveCount, 0);
- }
+(function() {
+ 'use strict';
+
+ var st = new ShardingTest({shards: 2});
+
+ var mongos = st.s0;
+ var coll = mongos.getCollection("foo.bar");
+ var admin = mongos.getDB("admin");
+ var shards = mongos.getDB("config").shards.find().toArray();
+
+ assert.commandWorked(admin.runCommand({enableSharding: coll.getDB().getName()}));
+ printjson(admin.runCommand({movePrimary: coll.getDB().getName(), to: shards[0]._id}));
+ assert.commandWorked(admin.runCommand({shardCollection: coll.getFullName(), key: {_id: 1}}));
+
+ assert.commandWorked(admin.runCommand({split: coll.getFullName(), middle: {_id: 0}}));
+ assert.commandWorked(
+ admin.runCommand({moveChunk: coll.getFullName(), find: {_id: 0}, to: shards[1]._id}));
+
+ st.printShardingStatus();
+
+ // Insert 100 documents, half of which have an extra field
+ for (var i = -50; i < 50; i++) {
+ var doc = {};
+ if (i >= 0)
+ doc.positiveId = true;
+ assert.writeOK(coll.insert(doc));
}
- assert.eq(positiveCount, 50);
-};
-assertLast50Positive(coll.find({}).sort({positiveId: 1}).toArray());
-assertLast50Positive(coll.find({}, {_id: 0}).sort({positiveId: 1}).toArray());
+ //
+ //
+ // Ensure projecting out all fields still returns the same number of documents
+ assert.eq(100, coll.find({}).itcount());
+ assert.eq(100, coll.find({}).sort({positiveId: 1}).itcount());
+ assert.eq(100, coll.find({}, {_id: 0, positiveId: 0}).itcount());
+ // Can't remove sort key from projection (SERVER-11877) but some documents will still be empty
+ assert.eq(100, coll.find({}, {_id: 0}).sort({positiveId: 1}).itcount());
+
+ //
+ //
+ // Ensure projecting out all fields still returns the same ordering of documents
+ var assertLast50Positive = function(sortedDocs) {
+ assert.eq(100, sortedDocs.length);
+ var positiveCount = 0;
+ for (var i = 0; i < sortedDocs.length; ++i) {
+ if (sortedDocs[i].positiveId) {
+ positiveCount++;
+ } else {
+ // Make sure only the last set of documents have "positiveId" set
+ assert.eq(positiveCount, 0);
+ }
+ }
+ assert.eq(positiveCount, 50);
+ };
+
+ assertLast50Positive(coll.find({}).sort({positiveId: 1}).toArray());
+ assertLast50Positive(coll.find({}, {_id: 0}).sort({positiveId: 1}).toArray());
-jsTest.log("DONE!");
-st.stop();
\ No newline at end of file
+ st.stop();
+})();
diff --git a/jstests/sharding/migration_ignore_interrupts.js b/jstests/sharding/migration_ignore_interrupts.js
index 020771c2275..43b3628ca59 100644
--- a/jstests/sharding/migration_ignore_interrupts.js
+++ b/jstests/sharding/migration_ignore_interrupts.js
@@ -17,8 +17,7 @@ load('./jstests/libs/chunk_manipulation_util.js');
// Shard1:
// Shard2:
- var staticMongod1 = MongoRunner.runMongod({}); // For startParallelOps.
- var staticMongod2 = MongoRunner.runMongod({}); // For startParallelOps.
+ var staticMongod = MongoRunner.runMongod({}); // For startParallelOps.
var st = new ShardingTest({shards: 3});
@@ -75,8 +74,8 @@ load('./jstests/libs/chunk_manipulation_util.js');
// Start a migration between shard0 and shard1 on coll1 and then pause it
pauseMigrateAtStep(shard1, migrateStepNames.deletedPriorDataInRange);
- var joinMoveChunk1 = moveChunkParallel(
- staticMongod1, st.s0.host, {a: 0}, null, coll1.getFullName(), st.shard1.shardName);
+ var joinMoveChunk = moveChunkParallel(
+ staticMongod, st.s0.host, {a: 0}, null, coll1.getFullName(), st.shard1.shardName);
waitForMigrateStep(shard1, migrateStepNames.deletedPriorDataInRange);
assert.commandFailed(
@@ -92,7 +91,7 @@ load('./jstests/libs/chunk_manipulation_util.js');
// Finish migration
unpauseMigrateAtStep(shard1, migrateStepNames.deletedPriorDataInRange);
assert.doesNotThrow(function() {
- joinMoveChunk1();
+ joinMoveChunk();
});
assert.eq(0, shard0Coll1.count());
assert.eq(1, shard1Coll1.count());
@@ -126,8 +125,8 @@ load('./jstests/libs/chunk_manipulation_util.js');
// Start a migration between shard0 and shard1 on coll1, pause in steady state before commit
pauseMoveChunkAtStep(shard0, moveChunkStepNames.reachedSteadyState);
- joinMoveChunk1 = moveChunkParallel(
- staticMongod1, st.s0.host, {a: 0}, null, coll1.getFullName(), st.shard1.shardName);
+ joinMoveChunk = moveChunkParallel(
+ staticMongod, st.s0.host, {a: 0}, null, coll1.getFullName(), st.shard1.shardName);
waitForMoveChunkStep(shard0, moveChunkStepNames.reachedSteadyState);
jsTest.log('Sending false commit command....');
@@ -142,7 +141,7 @@ load('./jstests/libs/chunk_manipulation_util.js');
// Finish migration
unpauseMoveChunkAtStep(shard0, moveChunkStepNames.reachedSteadyState);
assert.doesNotThrow(function() {
- joinMoveChunk1();
+ joinMoveChunk();
});
assert.eq(1, shard0Coll1.count());
assert.eq(1, shard1Coll1.count());
@@ -174,8 +173,8 @@ load('./jstests/libs/chunk_manipulation_util.js');
// check
pauseMigrateAtStep(shard1, migrateStepNames.deletedPriorDataInRange);
pauseMoveChunkAtStep(shard0, moveChunkStepNames.startedMoveChunk);
- joinMoveChunk1 = moveChunkParallel(
- staticMongod1, st.s0.host, {a: 0}, null, coll1.getFullName(), st.shard1.shardName);
+ joinMoveChunk = moveChunkParallel(
+ staticMongod, st.s0.host, {a: 0}, null, coll1.getFullName(), st.shard1.shardName);
waitForMigrateStep(shard1, migrateStepNames.deletedPriorDataInRange);
// Abort migration on donor side, recipient is unaware
@@ -192,13 +191,13 @@ load('./jstests/libs/chunk_manipulation_util.js');
"Failed to abort migration, current running ops: " + tojson(inProgressOps));
unpauseMoveChunkAtStep(shard0, moveChunkStepNames.startedMoveChunk);
assert.throws(function() {
- joinMoveChunk1();
+ joinMoveChunk();
});
// Start coll2 migration to shard2, pause recipient after delete step
pauseMigrateAtStep(shard2, migrateStepNames.deletedPriorDataInRange);
- var joinMoveChunk2 = moveChunkParallel(
- staticMongod2, st.s0.host, {a: 0}, null, coll2.getFullName(), st.shard2.shardName);
+ joinMoveChunk = moveChunkParallel(
+ staticMongod, st.s0.host, {a: 0}, null, coll2.getFullName(), st.shard2.shardName);
waitForMigrateStep(shard2, migrateStepNames.deletedPriorDataInRange);
jsTest.log('Releasing coll1 migration recipient, whose clone command should fail....');
@@ -209,7 +208,7 @@ load('./jstests/libs/chunk_manipulation_util.js');
jsTest.log('Finishing coll2 migration, which should succeed....');
unpauseMigrateAtStep(shard2, migrateStepNames.deletedPriorDataInRange);
assert.doesNotThrow(function() {
- joinMoveChunk2();
+ joinMoveChunk();
});
assert.eq(1,
shard0Coll2.count(),
@@ -241,8 +240,8 @@ load('./jstests/libs/chunk_manipulation_util.js');
// Start coll1 migration to shard1: pause recipient after cloning, donor before interrupt check
pauseMigrateAtStep(shard1, migrateStepNames.cloned);
pauseMoveChunkAtStep(shard0, moveChunkStepNames.startedMoveChunk);
- joinMoveChunk1 = moveChunkParallel(
- staticMongod1, st.s0.host, {a: 0}, null, coll1.getFullName(), st.shard1.shardName);
+ joinMoveChunk = moveChunkParallel(
+ staticMongod, st.s0.host, {a: 0}, null, coll1.getFullName(), st.shard1.shardName);
waitForMigrateStep(shard1, migrateStepNames.cloned);
// Abort migration on donor side, recipient is unaware
@@ -259,13 +258,13 @@ load('./jstests/libs/chunk_manipulation_util.js');
"Failed to abort migration, current running ops: " + tojson(inProgressOps));
unpauseMoveChunkAtStep(shard0, moveChunkStepNames.startedMoveChunk);
assert.throws(function() {
- joinMoveChunk1();
+ joinMoveChunk();
});
// Start coll2 migration to shard2, pause recipient after cloning step
pauseMigrateAtStep(shard2, migrateStepNames.cloned);
- var joinMoveChunk2 = moveChunkParallel(
- staticMongod2, st.s0.host, {a: 0}, null, coll2.getFullName(), st.shard2.shardName);
+ joinMoveChunk = moveChunkParallel(
+ staticMongod, st.s0.host, {a: 0}, null, coll2.getFullName(), st.shard2.shardName);
waitForMigrateStep(shard2, migrateStepNames.cloned);
// Populate donor (shard0) xfermods log.
@@ -284,7 +283,7 @@ load('./jstests/libs/chunk_manipulation_util.js');
jsTest.log('Finishing coll2 migration, which should succeed....');
unpauseMigrateAtStep(shard2, migrateStepNames.cloned);
assert.doesNotThrow(function() {
- joinMoveChunk2();
+ joinMoveChunk();
});
assert.eq(1,
shard0Coll2.count(),
diff --git a/jstests/sharding/movechunk_parallel.js b/jstests/sharding/movechunk_parallel.js
new file mode 100644
index 00000000000..81e39839fe7
--- /dev/null
+++ b/jstests/sharding/movechunk_parallel.js
@@ -0,0 +1,60 @@
+// Ensures that two manual moveChunk commands for the same collection will proceed in parallel so
+// long as they do not touch the same shards
+
+load('./jstests/libs/chunk_manipulation_util.js');
+
+(function() {
+ 'use strict';
+
+ // For startParallelOps to write its state
+ var staticMongod = MongoRunner.runMongod({});
+
+ var st = new ShardingTest({shards: 4});
+
+ assert.commandWorked(st.s0.adminCommand({enableSharding: 'TestDB'}));
+ st.ensurePrimaryShard('TestDB', st.shard0.shardName);
+ assert.commandWorked(st.s0.adminCommand({shardCollection: 'TestDB.TestColl', key: {Key: 1}}));
+
+ var coll = st.s0.getDB('TestDB').TestColl;
+
+ // Create 4 chunks initially
+ assert.writeOK(coll.insert({Key: 1, Value: 'Test value 1'}));
+ assert.writeOK(coll.insert({Key: 10, Value: 'Test value 10'}));
+ assert.writeOK(coll.insert({Key: 20, Value: 'Test value 20'}));
+ assert.writeOK(coll.insert({Key: 30, Value: 'Test value 30'}));
+
+ assert.commandWorked(st.splitAt('TestDB.TestColl', {Key: 10}));
+ assert.commandWorked(st.splitAt('TestDB.TestColl', {Key: 20}));
+ assert.commandWorked(st.splitAt('TestDB.TestColl', {Key: 30}));
+
+ // Move two of the chunks to shard0001 so we have option to do parallel balancing
+ assert.commandWorked(st.moveChunk('TestDB.TestColl', {Key: 20}, st.shard1.shardName));
+ assert.commandWorked(st.moveChunk('TestDB.TestColl', {Key: 30}, st.shard1.shardName));
+
+ assert.eq(2, st.s0.getDB('config').chunks.find({shard: st.shard0.shardName}).itcount());
+ assert.eq(2, st.s0.getDB('config').chunks.find({shard: st.shard1.shardName}).itcount());
+
+ // Pause migrations at shards 2 and 3
+ pauseMigrateAtStep(st.shard2, migrateStepNames.deletedPriorDataInRange);
+ pauseMigrateAtStep(st.shard3, migrateStepNames.deletedPriorDataInRange);
+
+ // Both move chunk operations should proceed
+ var joinMoveChunk1 = moveChunkParallel(
+ staticMongod, st.s0.host, {Key: 10}, null, 'TestDB.TestColl', st.shard2.shardName);
+ var joinMoveChunk2 = moveChunkParallel(
+ staticMongod, st.s0.host, {Key: 30}, null, 'TestDB.TestColl', st.shard3.shardName);
+
+ waitForMigrateStep(st.shard2, migrateStepNames.deletedPriorDataInRange);
+ waitForMigrateStep(st.shard3, migrateStepNames.deletedPriorDataInRange);
+
+ unpauseMigrateAtStep(st.shard2, migrateStepNames.deletedPriorDataInRange);
+ unpauseMigrateAtStep(st.shard3, migrateStepNames.deletedPriorDataInRange);
+
+ joinMoveChunk1();
+ joinMoveChunk2();
+
+ assert.eq(1, st.s0.getDB('config').chunks.find({shard: st.shard0.shardName}).itcount());
+ assert.eq(1, st.s0.getDB('config').chunks.find({shard: st.shard1.shardName}).itcount());
+ assert.eq(1, st.s0.getDB('config').chunks.find({shard: st.shard2.shardName}).itcount());
+ assert.eq(1, st.s0.getDB('config').chunks.find({shard: st.shard3.shardName}).itcount());
+})();
diff --git a/jstests/sharding/regex_targeting.js b/jstests/sharding/regex_targeting.js
index 14eaf0b53de..2a8ca1ad7d5 100644
--- a/jstests/sharding/regex_targeting.js
+++ b/jstests/sharding/regex_targeting.js
@@ -1,252 +1,252 @@
-//
// This checks to make sure that sharded regex queries behave the same as unsharded regex queries
-//
-
-var options = {
- mongosOptions: {binVersion: ""},
- shardOptions: {binVersion: ""},
- configOptions: {binVersion: ""}
-};
-
-var st = new ShardingTest({shards: 2, other: options});
-st.stopBalancer();
-
-var mongos = st.s0;
-var admin = mongos.getDB("admin");
-var shards = mongos.getDB("config").shards.find().toArray();
-
-//
-// Set up multiple collections to target with regex shard keys on two shards
-//
-
-var coll = mongos.getCollection("foo.bar");
-var collSharded = mongos.getCollection("foo.barSharded");
-var collCompound = mongos.getCollection("foo.barCompound");
-var collNested = mongos.getCollection("foo.barNested");
-var collHashed = mongos.getCollection("foo.barHashed");
-
-assert.commandWorked(admin.runCommand({enableSharding: coll.getDB().toString()}));
-st.ensurePrimaryShard(coll.getDB().toString(), shards[0]._id);
-
-//
-// Split the collection so that "abcde-0" and "abcde-1" go on different shards when possible
-//
-
-assert.commandWorked(admin.runCommand({shardCollection: collSharded.toString(), key: {a: 1}}));
-assert.commandWorked(admin.runCommand({split: collSharded.toString(), middle: {a: "abcde-1"}}));
-assert.commandWorked(admin.runCommand(
- {moveChunk: collSharded.toString(), find: {a: 0}, to: shards[1]._id, _waitForDelete: true}));
-
-assert.commandWorked(
- admin.runCommand({shardCollection: collCompound.toString(), key: {a: 1, b: 1}}));
-assert.commandWorked(
- admin.runCommand({split: collCompound.toString(), middle: {a: "abcde-1", b: 0}}));
-assert.commandWorked(admin.runCommand({
- moveChunk: collCompound.toString(),
- find: {a: 0, b: 0},
- to: shards[1]._id,
- _waitForDelete: true
-}));
-
-assert.commandWorked(admin.runCommand({shardCollection: collNested.toString(), key: {'a.b': 1}}));
-assert.commandWorked(admin.runCommand({split: collNested.toString(), middle: {'a.b': "abcde-1"}}));
-assert.commandWorked(admin.runCommand({
- moveChunk: collNested.toString(),
- find: {a: {b: 0}},
- to: shards[1]._id,
- _waitForDelete: true
-}));
-
-assert.commandWorked(
- admin.runCommand({shardCollection: collHashed.toString(), key: {hash: "hashed"}}));
-
-st.printShardingStatus();
-
-//
-//
-// Cannot insert regex _id
-assert.writeError(coll.insert({_id: /regex value/}));
-assert.writeError(collSharded.insert({_id: /regex value/, a: 0}));
-assert.writeError(collCompound.insert({_id: /regex value/, a: 0, b: 0}));
-assert.writeError(collNested.insert({_id: /regex value/, a: {b: 0}}));
-assert.writeError(collHashed.insert({_id: /regex value/, hash: 0}));
-
-//
-//
-// (For now) we can insert a regex shard key
-assert.writeOK(collSharded.insert({a: /regex value/}));
-assert.writeOK(collCompound.insert({a: /regex value/, b: "other value"}));
-assert.writeOK(collNested.insert({a: {b: /regex value/}}));
-assert.writeOK(collHashed.insert({hash: /regex value/}));
-
-//
-//
-// Query by regex should hit all matching keys, across all shards if applicable
-coll.remove({});
-assert.writeOK(coll.insert({a: "abcde-0"}));
-assert.writeOK(coll.insert({a: "abcde-1"}));
-assert.writeOK(coll.insert({a: /abcde.*/}));
-assert.eq(coll.find().itcount(), coll.find({a: /abcde.*/}).itcount());
-
-collSharded.remove({});
-assert.writeOK(collSharded.insert({a: "abcde-0"}));
-assert.writeOK(collSharded.insert({a: "abcde-1"}));
-assert.writeOK(collSharded.insert({a: /abcde.*/}));
-assert.eq(collSharded.find().itcount(), collSharded.find({a: /abcde.*/}).itcount());
-
-collCompound.remove({});
-assert.writeOK(collCompound.insert({a: "abcde-0", b: 0}));
-assert.writeOK(collCompound.insert({a: "abcde-1", b: 0}));
-assert.writeOK(collCompound.insert({a: /abcde.*/, b: 0}));
-assert.eq(collCompound.find().itcount(), collCompound.find({a: /abcde.*/}).itcount());
-
-collNested.remove({});
-assert.writeOK(collNested.insert({a: {b: "abcde-0"}}));
-assert.writeOK(collNested.insert({a: {b: "abcde-1"}}));
-assert.writeOK(collNested.insert({a: {b: /abcde.*/}}));
-assert.eq(collNested.find().itcount(), collNested.find({'a.b': /abcde.*/}).itcount());
-
-collHashed.remove({});
-while (st.shard0.getCollection(collHashed.toString()).count() == 0 ||
- st.shard1.getCollection(collHashed.toString()).count() == 0) {
- assert.writeOK(collHashed.insert({hash: "abcde-" + ObjectId().toString()}));
-}
-assert.writeOK(collHashed.insert({hash: /abcde.*/}));
-assert.eq(collHashed.find().itcount(), collHashed.find({hash: /abcde.*/}).itcount());
-
-//
-//
-// Update by regex should hit all matching keys, across all shards if applicable
-coll.remove({});
-assert.writeOK(coll.insert({a: "abcde-0"}));
-assert.writeOK(coll.insert({a: "abcde-1"}));
-assert.writeOK(coll.insert({a: /abcde.*/}));
-assert.writeOK(coll.update({a: /abcde.*/}, {$set: {updated: true}}, {multi: true}));
-assert.eq(coll.find().itcount(), coll.find({updated: true}).itcount());
-
-collSharded.remove({});
-assert.writeOK(collSharded.insert({a: "abcde-0"}));
-assert.writeOK(collSharded.insert({a: "abcde-1"}));
-assert.writeOK(collSharded.insert({a: /abcde.*/}));
-assert.writeOK(collSharded.update({a: /abcde.*/}, {$set: {updated: true}}, {multi: true}));
-assert.eq(collSharded.find().itcount(), collSharded.find({updated: true}).itcount());
-
-collCompound.remove({});
-assert.writeOK(collCompound.insert({a: "abcde-0", b: 0}));
-assert.writeOK(collCompound.insert({a: "abcde-1", b: 0}));
-assert.writeOK(collCompound.insert({a: /abcde.*/, b: 0}));
-assert.writeOK(collCompound.update({a: /abcde.*/}, {$set: {updated: true}}, {multi: true}));
-assert.eq(collCompound.find().itcount(), collCompound.find({updated: true}).itcount());
-
-collNested.remove({});
-assert.writeOK(collNested.insert({a: {b: "abcde-0"}}));
-assert.writeOK(collNested.insert({a: {b: "abcde-1"}}));
-assert.writeOK(collNested.insert({a: {b: /abcde.*/}}));
-assert.writeOK(collNested.update({'a.b': /abcde.*/}, {$set: {updated: true}}, {multi: true}));
-assert.eq(collNested.find().itcount(), collNested.find({updated: true}).itcount());
-
-collHashed.remove({});
-while (st.shard0.getCollection(collHashed.toString()).count() == 0 ||
- st.shard1.getCollection(collHashed.toString()).count() == 0) {
- assert.writeOK(collHashed.insert({hash: "abcde-" + ObjectId().toString()}));
-}
-assert.writeOK(collHashed.insert({hash: /abcde.*/}));
-assert.writeOK(collHashed.update({hash: /abcde.*/}, {$set: {updated: true}}, {multi: true}));
-assert.eq(collHashed.find().itcount(), collHashed.find({updated: true}).itcount());
-
-//
-//
-// Upsert with op-style regex should fail on sharded collections
-// Query clause is targeted, and regex in query clause is ambiguous
-collSharded.remove({});
-collCompound.remove({});
-collNested.remove({});
-assert.writeError(collSharded.update({a: /abcde.*/}, {$set: {a: /abcde.*/}}, {upsert: true}));
-assert.writeError(
- collCompound.update({a: /abcde.*/}, {$set: {a: /abcde.*/, b: 1}}, {upsert: true}));
-// Exact regex in query never equality
-assert.writeError(
- collNested.update({'a.b': /abcde.*/}, {$set: {'a.b': /abcde.*/}}, {upsert: true}));
-// Even nested regexes are not extracted in queries
-assert.writeError(
- collNested.update({a: {b: /abcde.*/}}, {$set: {'a.b': /abcde.*/}}, {upsert: true}));
-assert.writeError(collNested.update({c: 1}, {$set: {'a.b': /abcde.*/}}, {upsert: true}));
-
-//
-//
-// Upsert by replacement-style regex should succeed on sharded collections
-// Replacement clause is targeted, and regex is unambiguously a value
-collSharded.remove({});
-collCompound.remove({});
-collNested.remove({});
-assert.writeOK(collSharded.update({a: /abcde.*/}, {a: /abcde.*/}, {upsert: true}));
-assert.writeOK(collCompound.update({a: /abcde.*/}, {a: /abcde.*/, b: 1}, {upsert: true}));
-assert.writeOK(collNested.update({'a.b': /abcde.*/}, {a: {b: /abcde.*/}}, {upsert: true}));
-assert.writeOK(collNested.update({a: {b: /abcde.*/}}, {a: {b: /abcde.*/}}, {upsert: true}));
-assert.writeOK(collNested.update({c: 1}, {a: {b: /abcde.*/}}, {upsert: true}));
-
-//
-//
-// Remove by regex should hit all matching keys, across all shards if applicable
-coll.remove({});
-assert.writeOK(coll.insert({a: "abcde-0"}));
-assert.writeOK(coll.insert({a: "abcde-1"}));
-assert.writeOK(coll.insert({a: /abcde.*/}));
-assert.writeOK(coll.remove({a: /abcde.*/}));
-assert.eq(0, coll.find({}).itcount());
-
-collSharded.remove({});
-assert.writeOK(collSharded.insert({a: "abcde-0"}));
-assert.writeOK(collSharded.insert({a: "abcde-1"}));
-assert.writeOK(collSharded.insert({a: /abcde.*/}));
-assert.writeOK(collSharded.remove({a: /abcde.*/}));
-assert.eq(0, collSharded.find({}).itcount());
-
-collCompound.remove({});
-assert.writeOK(collCompound.insert({a: "abcde-0", b: 0}));
-assert.writeOK(collCompound.insert({a: "abcde-1", b: 0}));
-assert.writeOK(collCompound.insert({a: /abcde.*/, b: 0}));
-assert.writeOK(collCompound.remove({a: /abcde.*/}));
-assert.eq(0, collCompound.find({}).itcount());
-
-collNested.remove({});
-assert.writeOK(collNested.insert({a: {b: "abcde-0"}}));
-assert.writeOK(collNested.insert({a: {b: "abcde-1"}}));
-assert.writeOK(collNested.insert({a: {b: /abcde.*/}}));
-assert.writeOK(collNested.remove({'a.b': /abcde.*/}));
-assert.eq(0, collNested.find({}).itcount());
-
-collHashed.remove({});
-while (st.shard0.getCollection(collHashed.toString()).count() == 0 ||
- st.shard1.getCollection(collHashed.toString()).count() == 0) {
- assert.writeOK(collHashed.insert({hash: "abcde-" + ObjectId().toString()}));
-}
-assert.writeOK(collHashed.insert({hash: /abcde.*/}));
-assert.writeOK(collHashed.remove({hash: /abcde.*/}));
-assert.eq(0, collHashed.find({}).itcount());
-
-//
-//
-// Query/Update/Remove by nested regex is different depending on how the nested regex is specified
-coll.remove({});
-assert.writeOK(coll.insert({a: {b: "abcde-0"}}));
-assert.writeOK(coll.insert({a: {b: "abcde-1"}}));
-assert.writeOK(coll.insert({a: {b: /abcde.*/}}));
-assert.eq(1, coll.find({a: {b: /abcde.*/}}).itcount());
-assert.writeOK(coll.update({a: {b: /abcde.*/}}, {$set: {updated: true}}, {multi: true}));
-assert.eq(1, coll.find({updated: true}).itcount());
-assert.writeOK(coll.remove({a: {b: /abcde.*/}}));
-assert.eq(2, coll.find().itcount());
-
-collNested.remove({});
-assert.writeOK(collNested.insert({a: {b: "abcde-0"}}));
-assert.writeOK(collNested.insert({a: {b: "abcde-1"}}));
-assert.writeOK(collNested.insert({a: {b: /abcde.*/}}));
-assert.eq(1, collNested.find({a: {b: /abcde.*/}}).itcount());
-assert.writeOK(collNested.update({a: {b: /abcde.*/}}, {$set: {updated: true}}, {multi: true}));
-assert.eq(1, collNested.find({updated: true}).itcount());
-assert.writeOK(collNested.remove({a: {b: /abcde.*/}}));
-assert.eq(2, collNested.find().itcount());
-
-jsTest.log("DONE!");
-st.stop();
+(function() {
+ 'use strict';
+
+ var st = new ShardingTest({shards: 2});
+
+ var mongos = st.s0;
+ var admin = mongos.getDB("admin");
+ var shards = mongos.getDB("config").shards.find().toArray();
+
+ //
+ // Set up multiple collections to target with regex shard keys on two shards
+ //
+
+ var coll = mongos.getCollection("foo.bar");
+ var collSharded = mongos.getCollection("foo.barSharded");
+ var collCompound = mongos.getCollection("foo.barCompound");
+ var collNested = mongos.getCollection("foo.barNested");
+ var collHashed = mongos.getCollection("foo.barHashed");
+
+ assert.commandWorked(admin.runCommand({enableSharding: coll.getDB().toString()}));
+ st.ensurePrimaryShard(coll.getDB().toString(), shards[0]._id);
+
+ //
+ // Split the collection so that "abcde-0" and "abcde-1" go on different shards when possible
+ //
+
+ assert.commandWorked(admin.runCommand({shardCollection: collSharded.toString(), key: {a: 1}}));
+ assert.commandWorked(admin.runCommand({split: collSharded.toString(), middle: {a: "abcde-1"}}));
+ assert.commandWorked(admin.runCommand({
+ moveChunk: collSharded.toString(),
+ find: {a: 0},
+ to: shards[1]._id,
+ _waitForDelete: true
+ }));
+
+ assert.commandWorked(
+ admin.runCommand({shardCollection: collCompound.toString(), key: {a: 1, b: 1}}));
+ assert.commandWorked(
+ admin.runCommand({split: collCompound.toString(), middle: {a: "abcde-1", b: 0}}));
+ assert.commandWorked(admin.runCommand({
+ moveChunk: collCompound.toString(),
+ find: {a: 0, b: 0},
+ to: shards[1]._id,
+ _waitForDelete: true
+ }));
+
+ assert.commandWorked(
+ admin.runCommand({shardCollection: collNested.toString(), key: {'a.b': 1}}));
+ assert.commandWorked(
+ admin.runCommand({split: collNested.toString(), middle: {'a.b': "abcde-1"}}));
+ assert.commandWorked(admin.runCommand({
+ moveChunk: collNested.toString(),
+ find: {a: {b: 0}},
+ to: shards[1]._id,
+ _waitForDelete: true
+ }));
+
+ assert.commandWorked(
+ admin.runCommand({shardCollection: collHashed.toString(), key: {hash: "hashed"}}));
+
+ st.printShardingStatus();
+
+ //
+ //
+ // Cannot insert regex _id
+ assert.writeError(coll.insert({_id: /regex value/}));
+ assert.writeError(collSharded.insert({_id: /regex value/, a: 0}));
+ assert.writeError(collCompound.insert({_id: /regex value/, a: 0, b: 0}));
+ assert.writeError(collNested.insert({_id: /regex value/, a: {b: 0}}));
+ assert.writeError(collHashed.insert({_id: /regex value/, hash: 0}));
+
+ //
+ //
+ // (For now) we can insert a regex shard key
+ assert.writeOK(collSharded.insert({a: /regex value/}));
+ assert.writeOK(collCompound.insert({a: /regex value/, b: "other value"}));
+ assert.writeOK(collNested.insert({a: {b: /regex value/}}));
+ assert.writeOK(collHashed.insert({hash: /regex value/}));
+
+ //
+ //
+ // Query by regex should hit all matching keys, across all shards if applicable
+ coll.remove({});
+ assert.writeOK(coll.insert({a: "abcde-0"}));
+ assert.writeOK(coll.insert({a: "abcde-1"}));
+ assert.writeOK(coll.insert({a: /abcde.*/}));
+ assert.eq(coll.find().itcount(), coll.find({a: /abcde.*/}).itcount());
+
+ collSharded.remove({});
+ assert.writeOK(collSharded.insert({a: "abcde-0"}));
+ assert.writeOK(collSharded.insert({a: "abcde-1"}));
+ assert.writeOK(collSharded.insert({a: /abcde.*/}));
+ assert.eq(collSharded.find().itcount(), collSharded.find({a: /abcde.*/}).itcount());
+
+ collCompound.remove({});
+ assert.writeOK(collCompound.insert({a: "abcde-0", b: 0}));
+ assert.writeOK(collCompound.insert({a: "abcde-1", b: 0}));
+ assert.writeOK(collCompound.insert({a: /abcde.*/, b: 0}));
+ assert.eq(collCompound.find().itcount(), collCompound.find({a: /abcde.*/}).itcount());
+
+ collNested.remove({});
+ assert.writeOK(collNested.insert({a: {b: "abcde-0"}}));
+ assert.writeOK(collNested.insert({a: {b: "abcde-1"}}));
+ assert.writeOK(collNested.insert({a: {b: /abcde.*/}}));
+ assert.eq(collNested.find().itcount(), collNested.find({'a.b': /abcde.*/}).itcount());
+
+ collHashed.remove({});
+ while (st.shard0.getCollection(collHashed.toString()).count() == 0 ||
+ st.shard1.getCollection(collHashed.toString()).count() == 0) {
+ assert.writeOK(collHashed.insert({hash: "abcde-" + ObjectId().toString()}));
+ }
+ assert.writeOK(collHashed.insert({hash: /abcde.*/}));
+ assert.eq(collHashed.find().itcount(), collHashed.find({hash: /abcde.*/}).itcount());
+
+ //
+ //
+ // Update by regex should hit all matching keys, across all shards if applicable
+ coll.remove({});
+ assert.writeOK(coll.insert({a: "abcde-0"}));
+ assert.writeOK(coll.insert({a: "abcde-1"}));
+ assert.writeOK(coll.insert({a: /abcde.*/}));
+ assert.writeOK(coll.update({a: /abcde.*/}, {$set: {updated: true}}, {multi: true}));
+ assert.eq(coll.find().itcount(), coll.find({updated: true}).itcount());
+
+ collSharded.remove({});
+ assert.writeOK(collSharded.insert({a: "abcde-0"}));
+ assert.writeOK(collSharded.insert({a: "abcde-1"}));
+ assert.writeOK(collSharded.insert({a: /abcde.*/}));
+ assert.writeOK(collSharded.update({a: /abcde.*/}, {$set: {updated: true}}, {multi: true}));
+ assert.eq(collSharded.find().itcount(), collSharded.find({updated: true}).itcount());
+
+ collCompound.remove({});
+ assert.writeOK(collCompound.insert({a: "abcde-0", b: 0}));
+ assert.writeOK(collCompound.insert({a: "abcde-1", b: 0}));
+ assert.writeOK(collCompound.insert({a: /abcde.*/, b: 0}));
+ assert.writeOK(collCompound.update({a: /abcde.*/}, {$set: {updated: true}}, {multi: true}));
+ assert.eq(collCompound.find().itcount(), collCompound.find({updated: true}).itcount());
+
+ collNested.remove({});
+ assert.writeOK(collNested.insert({a: {b: "abcde-0"}}));
+ assert.writeOK(collNested.insert({a: {b: "abcde-1"}}));
+ assert.writeOK(collNested.insert({a: {b: /abcde.*/}}));
+ assert.writeOK(collNested.update({'a.b': /abcde.*/}, {$set: {updated: true}}, {multi: true}));
+ assert.eq(collNested.find().itcount(), collNested.find({updated: true}).itcount());
+
+ collHashed.remove({});
+ while (st.shard0.getCollection(collHashed.toString()).count() == 0 ||
+ st.shard1.getCollection(collHashed.toString()).count() == 0) {
+ assert.writeOK(collHashed.insert({hash: "abcde-" + ObjectId().toString()}));
+ }
+ assert.writeOK(collHashed.insert({hash: /abcde.*/}));
+ assert.writeOK(collHashed.update({hash: /abcde.*/}, {$set: {updated: true}}, {multi: true}));
+ assert.eq(collHashed.find().itcount(), collHashed.find({updated: true}).itcount());
+
+ //
+ //
+ // Upsert with op-style regex should fail on sharded collections
+ // Query clause is targeted, and regex in query clause is ambiguous
+ collSharded.remove({});
+ collCompound.remove({});
+ collNested.remove({});
+ assert.writeError(collSharded.update({a: /abcde.*/}, {$set: {a: /abcde.*/}}, {upsert: true}));
+ assert.writeError(
+ collCompound.update({a: /abcde.*/}, {$set: {a: /abcde.*/, b: 1}}, {upsert: true}));
+ // Exact regex in query never equality
+ assert.writeError(
+ collNested.update({'a.b': /abcde.*/}, {$set: {'a.b': /abcde.*/}}, {upsert: true}));
+ // Even nested regexes are not extracted in queries
+ assert.writeError(
+ collNested.update({a: {b: /abcde.*/}}, {$set: {'a.b': /abcde.*/}}, {upsert: true}));
+ assert.writeError(collNested.update({c: 1}, {$set: {'a.b': /abcde.*/}}, {upsert: true}));
+
+ //
+ //
+ // Upsert by replacement-style regex should succeed on sharded collections
+ // Replacement clause is targeted, and regex is unambiguously a value
+ collSharded.remove({});
+ collCompound.remove({});
+ collNested.remove({});
+ assert.writeOK(collSharded.update({a: /abcde.*/}, {a: /abcde.*/}, {upsert: true}));
+ assert.writeOK(collCompound.update({a: /abcde.*/}, {a: /abcde.*/, b: 1}, {upsert: true}));
+ assert.writeOK(collNested.update({'a.b': /abcde.*/}, {a: {b: /abcde.*/}}, {upsert: true}));
+ assert.writeOK(collNested.update({a: {b: /abcde.*/}}, {a: {b: /abcde.*/}}, {upsert: true}));
+ assert.writeOK(collNested.update({c: 1}, {a: {b: /abcde.*/}}, {upsert: true}));
+
+ //
+ //
+ // Remove by regex should hit all matching keys, across all shards if applicable
+ coll.remove({});
+ assert.writeOK(coll.insert({a: "abcde-0"}));
+ assert.writeOK(coll.insert({a: "abcde-1"}));
+ assert.writeOK(coll.insert({a: /abcde.*/}));
+ assert.writeOK(coll.remove({a: /abcde.*/}));
+ assert.eq(0, coll.find({}).itcount());
+
+ collSharded.remove({});
+ assert.writeOK(collSharded.insert({a: "abcde-0"}));
+ assert.writeOK(collSharded.insert({a: "abcde-1"}));
+ assert.writeOK(collSharded.insert({a: /abcde.*/}));
+ assert.writeOK(collSharded.remove({a: /abcde.*/}));
+ assert.eq(0, collSharded.find({}).itcount());
+
+ collCompound.remove({});
+ assert.writeOK(collCompound.insert({a: "abcde-0", b: 0}));
+ assert.writeOK(collCompound.insert({a: "abcde-1", b: 0}));
+ assert.writeOK(collCompound.insert({a: /abcde.*/, b: 0}));
+ assert.writeOK(collCompound.remove({a: /abcde.*/}));
+ assert.eq(0, collCompound.find({}).itcount());
+
+ collNested.remove({});
+ assert.writeOK(collNested.insert({a: {b: "abcde-0"}}));
+ assert.writeOK(collNested.insert({a: {b: "abcde-1"}}));
+ assert.writeOK(collNested.insert({a: {b: /abcde.*/}}));
+ assert.writeOK(collNested.remove({'a.b': /abcde.*/}));
+ assert.eq(0, collNested.find({}).itcount());
+
+ collHashed.remove({});
+ while (st.shard0.getCollection(collHashed.toString()).count() == 0 ||
+ st.shard1.getCollection(collHashed.toString()).count() == 0) {
+ assert.writeOK(collHashed.insert({hash: "abcde-" + ObjectId().toString()}));
+ }
+ assert.writeOK(collHashed.insert({hash: /abcde.*/}));
+ assert.writeOK(collHashed.remove({hash: /abcde.*/}));
+ assert.eq(0, collHashed.find({}).itcount());
+
+ //
+ //
+ // Query/Update/Remove by nested regex is different depending on how the nested regex is
+ // specified
+ coll.remove({});
+ assert.writeOK(coll.insert({a: {b: "abcde-0"}}));
+ assert.writeOK(coll.insert({a: {b: "abcde-1"}}));
+ assert.writeOK(coll.insert({a: {b: /abcde.*/}}));
+ assert.eq(1, coll.find({a: {b: /abcde.*/}}).itcount());
+ assert.writeOK(coll.update({a: {b: /abcde.*/}}, {$set: {updated: true}}, {multi: true}));
+ assert.eq(1, coll.find({updated: true}).itcount());
+ assert.writeOK(coll.remove({a: {b: /abcde.*/}}));
+ assert.eq(2, coll.find().itcount());
+
+ collNested.remove({});
+ assert.writeOK(collNested.insert({a: {b: "abcde-0"}}));
+ assert.writeOK(collNested.insert({a: {b: "abcde-1"}}));
+ assert.writeOK(collNested.insert({a: {b: /abcde.*/}}));
+ assert.eq(1, collNested.find({a: {b: /abcde.*/}}).itcount());
+ assert.writeOK(collNested.update({a: {b: /abcde.*/}}, {$set: {updated: true}}, {multi: true}));
+ assert.eq(1, collNested.find({updated: true}).itcount());
+ assert.writeOK(collNested.remove({a: {b: /abcde.*/}}));
+ assert.eq(2, collNested.find().itcount());
+
+ st.stop();
+})();
diff --git a/src/mongo/db/namespace_string-inl.h b/src/mongo/db/namespace_string-inl.h
index 05387e4791a..569eddaeb57 100644
--- a/src/mongo/db/namespace_string-inl.h
+++ b/src/mongo/db/namespace_string-inl.h
@@ -155,37 +155,6 @@ inline int nsDBHash(const std::string& ns) {
return hash;
}
-inline bool nsDBEquals(const std::string& a, const std::string& b) {
- for (size_t i = 0; i < a.size(); i++) {
- if (a[i] == '.') {
- // b has to either be done or a '.'
-
- if (b.size() == i)
- return true;
-
- if (b[i] == '.')
- return true;
-
- return false;
- }
-
- // a is another character
- if (b.size() == i)
- return false;
-
- if (b[i] != a[i])
- return false;
- }
-
- // a is done
- // make sure b is done
- if (b.size() == a.size() || b[a.size()] == '.')
- return true;
-
- return false;
-}
-
-/* future : this doesn't need to be an inline. */
inline std::string NamespaceString::getSisterNS(StringData local) const {
verify(local.size() && local[0] != '.');
return db().toString() + "." + local.toString();
@@ -198,4 +167,5 @@ inline std::string NamespaceString::getSystemIndexesCollection() const {
inline std::string NamespaceString::getCommandNS() const {
return db().toString() + ".$cmd";
}
-}
+
+} // namespace mongo
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index bfab52bb92e..00ef04c4249 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -35,6 +35,7 @@
#include <string>
#include "mongo/base/string_data.h"
+#include "mongo/platform/hash_namespace.h"
#include "mongo/util/assert_util.h"
namespace mongo {
@@ -368,31 +369,20 @@ inline bool nsIsDbOnly(StringData ns) {
}
/**
- * NamespaceDBHash and NamespaceDBEquals allow you to do something like
- * unordered_map<std::string,int,NamespaceDBHash,NamespaceDBEquals>
- * and use the full namespace for the string
- * but comparisons are done only on the db piece
- */
-
-/**
* this can change, do not store on disk
*/
int nsDBHash(const std::string& ns);
-bool nsDBEquals(const std::string& a, const std::string& b);
+} // namespace mongo
-struct NamespaceDBHash {
- int operator()(const std::string& ns) const {
- return nsDBHash(ns);
- }
-};
+#include "mongo/db/namespace_string-inl.h"
-struct NamespaceDBEquals {
- bool operator()(const std::string& a, const std::string& b) const {
- return nsDBEquals(a, b);
+MONGO_HASH_NAMESPACE_START
+template <>
+struct hash<mongo::NamespaceString> {
+ size_t operator()(const mongo::NamespaceString& nss) const {
+ mongo::NamespaceString::Hasher hasher;
+ return hasher(nss);
}
};
-
-} // namespace mongo
-
-#include "mongo/db/namespace_string-inl.h"
+MONGO_HASH_NAMESPACE_END
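
The header change above drops the db-only NamespaceDBHash/NamespaceDBEquals helpers and instead specializes std::hash for NamespaceString (delegating to NamespaceString::Hasher), so standard unordered containers can key on a full namespace. A minimal illustrative use follows; the MigrationCounters alias and noteMigrationStarted helper are hypothetical and not part of the patch.

```cpp
#include <unordered_map>

#include "mongo/db/namespace_string.h"

// Hypothetical example: with the std::hash<mongo::NamespaceString> specialization in
// place, a namespace can be used directly as an unordered_map key, e.g. to track how
// many migrations are in flight for each collection.
using MigrationCounters = std::unordered_map<mongo::NamespaceString, int>;

void noteMigrationStarted(MigrationCounters& counters, const mongo::NamespaceString& nss) {
    ++counters[nss];  // default-constructs the counter to 0 the first time 'nss' is seen
}
```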
diff --git a/src/mongo/db/namespace_string_test.cpp b/src/mongo/db/namespace_string_test.cpp
index 501c6d14e2d..db44e5b3148 100644
--- a/src/mongo/db/namespace_string_test.cpp
+++ b/src/mongo/db/namespace_string_test.cpp
@@ -202,35 +202,6 @@ TEST(NamespaceStringTest, DBHash) {
ASSERT_NOT_EQUALS(nsDBHash("foo.d"), nsDBHash("food"));
}
-#define testEqualsBothWays(X, Y) \
- ASSERT_TRUE(nsDBEquals((X), (Y))); \
- ASSERT_TRUE(nsDBEquals((Y), (X)));
-#define testNotEqualsBothWays(X, Y) \
- ASSERT_FALSE(nsDBEquals((X), (Y))); \
- ASSERT_FALSE(nsDBEquals((Y), (X)));
-
-TEST(NamespaceStringTest, DBEquals) {
- testEqualsBothWays("foo", "foo");
- testEqualsBothWays("foo", "foo.a");
- testEqualsBothWays("foo.a", "foo.a");
- testEqualsBothWays("foo.a", "foo.b");
-
- testEqualsBothWays("", "");
- testEqualsBothWays("", ".");
- testEqualsBothWays("", ".x");
-
- testNotEqualsBothWays("foo", "bar");
- testNotEqualsBothWays("foo", "food");
- testNotEqualsBothWays("foo.", "food");
-
- testNotEqualsBothWays("", "x");
- testNotEqualsBothWays("", "x.");
- testNotEqualsBothWays("", "x.y");
- testNotEqualsBothWays(".", "x");
- testNotEqualsBothWays(".", "x.");
- testNotEqualsBothWays(".", "x.y");
-}
-
TEST(NamespaceStringTest, nsToDatabase1) {
ASSERT_EQUALS("foo", nsToDatabaseSubstring("foo.bar"));
ASSERT_EQUALS("foo", nsToDatabaseSubstring("foo"));
diff --git a/src/mongo/s/balancer/balancer.cpp b/src/mongo/s/balancer/balancer.cpp
index 870e7e92d7d..b21213d7f0c 100644
--- a/src/mongo/s/balancer/balancer.cpp
+++ b/src/mongo/s/balancer/balancer.cpp
@@ -44,7 +44,6 @@
#include "mongo/s/balancer/balancer_chunk_selection_policy_impl.h"
#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/balancer/cluster_statistics_impl.h"
-#include "mongo/s/balancer/migration_manager.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/client/shard.h"
@@ -238,15 +237,13 @@ Status Balancer::rebalanceSingleChunk(OperationContext* txn, const ChunkType& ch
return refreshStatus;
}
- MigrationManager migrationManager;
- auto migrationStatuses = migrationManager.scheduleMigrations(
- txn,
- {MigrationManager::MigrationRequest(std::move(*migrateInfo),
- balancerConfig->getMaxChunkSizeBytes(),
- balancerConfig->getSecondaryThrottle(),
- balancerConfig->waitForDelete())});
-
- invariant(migrationStatuses.size() == 1);
+ // Wait for the migration to complete
+ Status migrationStatus =
+ _migrationManager.scheduleManualMigration(txn,
+ *migrateInfo,
+ balancerConfig->getMaxChunkSizeBytes(),
+ balancerConfig->getSecondaryThrottle(),
+ balancerConfig->waitForDelete());
auto scopedCMStatus = ScopedChunkManager::getExisting(txn, NamespaceString(chunk.getNS()));
if (!scopedCMStatus.isOK()) {
@@ -257,7 +254,7 @@ Status Balancer::rebalanceSingleChunk(OperationContext* txn, const ChunkType& ch
ChunkManager* const cm = scopedCM.cm();
cm->reload(txn);
- return migrationStatuses.begin()->second;
+ return migrationStatus;
}
Status Balancer::moveSingleChunk(OperationContext* txn,
@@ -271,15 +268,13 @@ Status Balancer::moveSingleChunk(OperationContext* txn,
return moveAllowedStatus;
}
- MigrationManager migrationManager;
- auto migrationStatuses = migrationManager.scheduleMigrations(
- txn,
- {MigrationManager::MigrationRequest(MigrateInfo(chunk.getNS(), newShardId, chunk),
- maxChunkSizeBytes,
- secondaryThrottle,
- waitForDelete)});
-
- invariant(migrationStatuses.size() == 1);
+ // Wait for the migration to complete
+ Status migrationStatus =
+ _migrationManager.scheduleManualMigration(txn,
+ MigrateInfo(chunk.getNS(), newShardId, chunk),
+ maxChunkSizeBytes,
+ secondaryThrottle,
+ waitForDelete);
auto scopedCMStatus = ScopedChunkManager::getExisting(txn, NamespaceString(chunk.getNS()));
if (!scopedCMStatus.isOK()) {
@@ -290,7 +285,7 @@ Status Balancer::moveSingleChunk(OperationContext* txn,
ChunkManager* const cm = scopedCM.cm();
cm->reload(txn);
- return migrationStatuses.begin()->second;
+ return migrationStatus;
}
void Balancer::report(OperationContext* txn, BSONObjBuilder* builder) {
@@ -555,22 +550,15 @@ int Balancer::_moveChunks(OperationContext* txn,
return 0;
}
- // Schedule all migrations in parallel
- MigrationManager migrationManager;
-
- MigrationManager::MigrationRequestVector migrationRequests;
-
- for (const auto& migrateInfo : candidateChunks) {
- migrationRequests.emplace_back(migrateInfo,
- balancerConfig->getMaxChunkSizeBytes(),
- balancerConfig->getSecondaryThrottle(),
- balancerConfig->waitForDelete());
- }
+ auto migrationStatuses =
+ _migrationManager.executeMigrationsForAutoBalance(txn,
+ candidateChunks,
+ balancerConfig->getMaxChunkSizeBytes(),
+ balancerConfig->getSecondaryThrottle(),
+ balancerConfig->waitForDelete());
int numChunksProcessed = 0;
- auto migrationStatuses = migrationManager.scheduleMigrations(txn, std::move(migrationRequests));
-
for (const auto& migrationStatusEntry : migrationStatuses) {
const Status& status = migrationStatusEntry.second;
if (status.isOK()) {
diff --git a/src/mongo/s/balancer/balancer.h b/src/mongo/s/balancer/balancer.h
index 9a8da493fa2..4c7dfe6b66b 100644
--- a/src/mongo/s/balancer/balancer.h
+++ b/src/mongo/s/balancer/balancer.h
@@ -30,6 +30,7 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/s/balancer/balancer_chunk_selection_policy.h"
+#include "mongo/s/balancer/migration_manager.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
@@ -216,6 +217,9 @@ private:
// Balancer policy. Depends on the cluster statistics instance above so it should be created
// after it and destroyed before it.
std::unique_ptr<BalancerChunkSelectionPolicy> _chunkSelectionPolicy;
+
+ // Migration manager used to schedule and manage migrations
+ MigrationManager _migrationManager;
};
} // namespace mongo
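
With the MigrationManager now a long-lived member of the Balancer, the balancer.cpp hunks above boil down to two entry points: scheduleManualMigration(), which blocks on a single migration, and executeMigrationsForAutoBalance(), which runs a batch in parallel. The snippet below is a hypothetical helper (not part of the patch) showing the blocking call shape; it only uses calls and headers that appear in the surrounding diff, and additional mongo headers (e.g. for MigrateInfo) may be needed in practice.

```cpp
#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/balancer/migration_manager.h"

namespace mongo {

// Hypothetical helper, not part of the patch: mirrors how rebalanceSingleChunk() and
// moveSingleChunk() above delegate a single blocking migration to the MigrationManager.
Status runSingleMigrationBlocking(OperationContext* txn,
                                  MigrationManager* migrationManager,
                                  const MigrateInfo& migrateInfo,
                                  const BalancerConfiguration* balancerConfig) {
    // scheduleManualMigration() returns only once the migration has completed (or failed)
    // and yields its final status.
    return migrationManager->scheduleManualMigration(txn,
                                                     migrateInfo,
                                                     balancerConfig->getMaxChunkSizeBytes(),
                                                     balancerConfig->getSecondaryThrottle(),
                                                     balancerConfig->waitForDelete());
}

}  // namespace mongo
```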
diff --git a/src/mongo/s/balancer/migration_manager.cpp b/src/mongo/s/balancer/migration_manager.cpp
index f4c9dfe03ca..d015d3c6a44 100644
--- a/src/mongo/s/balancer/migration_manager.cpp
+++ b/src/mongo/s/balancer/migration_manager.cpp
@@ -36,349 +36,364 @@
#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/remote_command_targeter.h"
-#include "mongo/db/operation_context.h"
-#include "mongo/executor/task_executor.h"
+#include "mongo/db/client.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/move_chunk_request.h"
#include "mongo/s/sharding_raii.h"
-#include "mongo/util/assert_util.h"
-#include "mongo/util/log.h"
+#include "mongo/util/net/hostandport.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
-namespace {
-
using executor::RemoteCommandRequest;
using executor::RemoteCommandResponse;
+using std::shared_ptr;
+using std::vector;
+using str::stream;
+
+namespace {
const char kChunkTooBig[] = "chunkTooBig";
-} // namespace
+/**
+ * Parses the specified asynchronous command response and converts it to status to use as outcome of
+ * an asynchronous migration command. In particular it is necessary in order to preserve backwards
+ * compatibility with 3.2 and earlier, where the move chunk command instead of returning a
+ * ChunkTooBig status includes an extra field in the response.
+ */
+Status extractMigrationStatusFromRemoteCommandResponse(const RemoteCommandResponse& response) {
+ if (!response.isOK()) {
+ return response.status;
+ }
-MigrationManager::MigrationRequest::MigrationRequest(
- MigrateInfo inMigrateInfo,
- uint64_t inMaxChunkSizeBytes,
- MigrationSecondaryThrottleOptions inSecondaryThrottle,
- bool inWaitForDelete)
- : migrateInfo(std::move(inMigrateInfo)),
- maxChunkSizeBytes(inMaxChunkSizeBytes),
- secondaryThrottle(std::move(inSecondaryThrottle)),
- waitForDelete(inWaitForDelete) {}
-
-MigrationManager::Migration::Migration(MigrationRequest migrationRequest)
- : chunkInfo(std::move(migrationRequest)) {}
-
-void MigrationManager::Migration::setCallbackHandle(
- executor::TaskExecutor::CallbackHandle callbackHandle) {
- invariant(!moveChunkCallbackHandle);
- moveChunkCallbackHandle = std::move(callbackHandle);
-}
+ Status commandStatus = getStatusFromCommandResult(response.data);
-void MigrationManager::Migration::clearCallbackHandle() {
- moveChunkCallbackHandle = boost::none;
+ if (!commandStatus.isOK()) {
+ bool chunkTooBig = false;
+ bsonExtractBooleanFieldWithDefault(response.data, kChunkTooBig, false, &chunkTooBig);
+ if (chunkTooBig) {
+ commandStatus = {ErrorCodes::ChunkTooBig, commandStatus.reason()};
+ }
+ }
+
+ return commandStatus;
}
-MigrationManager::DistLockTracker::DistLockTracker(
- boost::optional<DistLockManager::ScopedDistLock> distlock)
- : distributedLock(std::move(distlock)) {
- if (distlock) {
- migrationCounter = 1;
- } else {
- migrationCounter = 0;
+/**
+ * Blocking call to acquire the distributed collection lock for the specified namespace.
+ */
+StatusWith<DistLockHandle> acquireDistLock(OperationContext* txn, const NamespaceString& nss) {
+ const std::string whyMessage(stream() << "Migrating chunk(s) in collection " << nss.ns());
+
+ auto statusWithDistLockHandle =
+ Grid::get(txn)->catalogClient(txn)->getDistLockManager()->lockWithSessionID(
+ txn, nss.ns(), whyMessage, OID::gen(), DistLockManager::kSingleLockAttemptTimeout);
+
+ if (!statusWithDistLockHandle.isOK()) {
+ // If we get LockBusy while trying to acquire the collection distributed lock, this implies
+ // that a concurrent collection operation is running either on a 3.2 shard or on mongos.
+ // Convert it to ConflictingOperationInProgress to better indicate the error.
+ //
+ // In addition, the code which re-schedules parallel migrations serially for 3.2 shard
+ // compatibility uses the LockBusy code as a hint to do the reschedule.
+ const ErrorCodes::Error code = (statusWithDistLockHandle == ErrorCodes::LockBusy
+ ? ErrorCodes::ConflictingOperationInProgress
+ : statusWithDistLockHandle.getStatus().code());
+
+ return {code,
+ stream() << "Could not acquire collection lock for " << nss.ns()
+ << " to migrate chunks, due to "
+ << statusWithDistLockHandle.getStatus().reason()};
}
+
+ return std::move(statusWithDistLockHandle.getValue());
}
+} // namespace
+
MigrationManager::MigrationManager() = default;
MigrationManager::~MigrationManager() {
// The migration manager must be completely quiesced at destruction time
- invariant(_activeMigrations.empty());
- invariant(_rescheduledMigrations.empty());
- invariant(_distributedLocks.empty());
+ invariant(_activeMigrationsWithoutDistLock.empty());
}
-MigrationStatuses MigrationManager::scheduleMigrations(OperationContext* txn,
- MigrationRequestVector candidateMigrations) {
- invariant(_activeMigrations.empty());
+MigrationStatuses MigrationManager::executeMigrationsForAutoBalance(
+ OperationContext* txn,
+ const vector<MigrateInfo>& migrateInfos,
+ uint64_t maxChunkSizeBytes,
+ const MigrationSecondaryThrottleOptions& secondaryThrottle,
+ bool waitForDelete) {
+
+ vector<std::pair<shared_ptr<Notification<Status>>, MigrateInfo>> responses;
+
+ for (const auto& migrateInfo : migrateInfos) {
+ responses.emplace_back(_schedule(txn,
+ migrateInfo,
+ false, // Config server takes the collection dist lock
+ maxChunkSizeBytes,
+ secondaryThrottle,
+ waitForDelete),
+ migrateInfo);
+ }
MigrationStatuses migrationStatuses;
- for (auto& migrationRequest : candidateMigrations) {
- _activeMigrations.emplace_back(std::move(migrationRequest));
+ vector<MigrateInfo> rescheduledMigrations;
+
+ // Wait for all the scheduled migrations to complete and note the ones, which failed with a
+ // LockBusy error code. These need to be executed serially, without the distributed lock being
+ // held by the config server for backwards compatibility with 3.2 shards.
+ for (auto& response : responses) {
+ auto notification = std::move(response.first);
+ auto migrateInfo = std::move(response.second);
+
+ Status responseStatus = notification->get();
+
+ if (responseStatus == ErrorCodes::LockBusy) {
+ rescheduledMigrations.emplace_back(std::move(migrateInfo));
+ } else {
+ migrationStatuses.emplace(migrateInfo.getName(), std::move(responseStatus));
+ }
+ }
+
+ // Schedule all 3.2 compatibility migrations sequentially
+ for (const auto& migrateInfo : rescheduledMigrations) {
+ Status responseStatus = _schedule(txn,
+ migrateInfo,
+ true, // Shard takes the collection dist lock
+ maxChunkSizeBytes,
+ secondaryThrottle,
+ waitForDelete)
+ ->get();
+
+ migrationStatuses.emplace(migrateInfo.getName(), std::move(responseStatus));
}
- _executeMigrations(txn, &migrationStatuses);
+ invariant(migrationStatuses.size() == migrateInfos.size());
return migrationStatuses;
}
-void MigrationManager::_executeMigrations(OperationContext* txn,
- MigrationStatuses* migrationStatuses) {
- for (auto& migration : _activeMigrations) {
- const NamespaceString nss(migration.chunkInfo.migrateInfo.ns);
-
- const auto& migrateInfo = migration.chunkInfo.migrateInfo;
-
- auto scopedCMStatus = ScopedChunkManager::getExisting(txn, nss);
- if (!scopedCMStatus.isOK()) {
- // Unable to find the ChunkManager for "nss" for whatever reason; abandon this
- // migration and proceed to the next.
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- migrationStatuses->emplace(migrateInfo.getName(),
- std::move(scopedCMStatus.getStatus()));
- continue;
- }
+Status MigrationManager::scheduleManualMigration(
+ OperationContext* txn,
+ const MigrateInfo& migrateInfo,
+ uint64_t maxChunkSizeBytes,
+ const MigrationSecondaryThrottleOptions& secondaryThrottle,
+ bool waitForDelete) {
+ return _schedule(txn,
+ migrateInfo,
+ false, // Config server takes the collection dist lock
+ maxChunkSizeBytes,
+ secondaryThrottle,
+ waitForDelete)
+ ->get();
+}
- ChunkManager* const chunkManager = scopedCMStatus.getValue().cm();
-
- auto chunk =
- chunkManager->findIntersectingChunkWithSimpleCollation(txn, migrateInfo.minKey);
- invariant(chunk);
-
- // If the chunk is not found exactly as requested, the caller must have stale data
- if (chunk->getMin() != migrateInfo.minKey || chunk->getMax() != migrateInfo.maxKey) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- migrationStatuses->emplace(
- migrateInfo.getName(),
- Status(ErrorCodes::IncompatibleShardingMetadata,
- str::stream()
- << "Chunk "
- << ChunkRange(migrateInfo.minKey, migrateInfo.maxKey).toString()
- << " does not exist."));
- continue;
- }
+shared_ptr<Notification<Status>> MigrationManager::_schedule(
+ OperationContext* txn,
+ const MigrateInfo& migrateInfo,
+ bool shardTakesCollectionDistLock,
+ uint64_t maxChunkSizeBytes,
+ const MigrationSecondaryThrottleOptions& secondaryThrottle,
+ bool waitForDelete) {
+ const NamespaceString nss(migrateInfo.ns);
+
+ // Sanity checks that the chunk being migrated is actually valid. These will be repeated at the
+ // shard as well, but doing them here saves an extra network call, which might otherwise fail.
+ auto statusWithScopedChunkManager = ScopedChunkManager::getExisting(txn, nss);
+ if (!statusWithScopedChunkManager.isOK()) {
+ return std::make_shared<Notification<Status>>(
+ std::move(statusWithScopedChunkManager.getStatus()));
+ }
- // If chunk is already on the correct shard, just treat the operation as success
- if (chunk->getShardId() == migrateInfo.to) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- migrationStatuses->emplace(migrateInfo.getName(), Status::OK());
- continue;
- }
+ ChunkManager* const chunkManager = statusWithScopedChunkManager.getValue().cm();
- {
- // No need to lock the mutex. Only this function and _takeDistLockForAMigration
- // manipulate "_distributedLocks". No need to protect serial actions.
- if (!_takeDistLockForAMigration(txn, migration, migrationStatuses)) {
- // If there is a lock conflict between the balancer and the shard, or a shard and a
- // shard, the migration has been rescheduled. Otherwise an attempt to take the lock
- // failed for whatever reason and this migration is being abandoned.
- continue;
- }
- }
+ auto chunk = chunkManager->findIntersectingChunkWithSimpleCollation(txn, migrateInfo.minKey);
+ invariant(chunk);
- const MigrationRequest& migrationRequest = migration.chunkInfo;
-
- BSONObjBuilder builder;
- MoveChunkRequest::appendAsCommand(
- &builder,
- nss,
- chunkManager->getVersion(),
- Grid::get(txn)->shardRegistry()->getConfigServerConnectionString(),
- migrationRequest.migrateInfo.from,
- migrationRequest.migrateInfo.to,
- ChunkRange(chunk->getMin(), chunk->getMax()),
- migrationRequest.maxChunkSizeBytes,
- migrationRequest.secondaryThrottle,
- migrationRequest.waitForDelete,
- migration.oldShard ? true : false); // takeDistLock flag.
-
- BSONObj moveChunkRequestObj = builder.obj();
-
- const auto recipientShard =
- grid.shardRegistry()->getShard(txn, migration.chunkInfo.migrateInfo.from);
- const auto host = recipientShard->getTargeter()->findHost(
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- RemoteCommandTargeter::selectFindHostMaxWaitTime(txn));
- if (!host.isOK()) {
- // Unable to find a target shard for whatever reason; abandon this migration and proceed
- // to the next.
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- migrationStatuses->insert(MigrationStatuses::value_type(
- migration.chunkInfo.migrateInfo.getName(), std::move(host.getStatus())));
- continue;
- }
+ // If the chunk is not found exactly as requested, the caller must have stale data
+ if (chunk->getMin() != migrateInfo.minKey || chunk->getMax() != migrateInfo.maxKey) {
+ return std::make_shared<Notification<Status>>(Status(
+ ErrorCodes::IncompatibleShardingMetadata,
+ stream() << "Chunk " << ChunkRange(migrateInfo.minKey, migrateInfo.maxKey).toString()
+ << " does not exist."));
+ }
- RemoteCommandRequest remoteRequest(host.getValue(), "admin", moveChunkRequestObj, txn);
-
- executor::TaskExecutor* executor = Grid::get(txn)->getExecutorPool()->getFixedExecutor();
-
- StatusWith<executor::TaskExecutor::CallbackHandle> callbackHandleWithStatus =
- executor->scheduleRemoteCommand(remoteRequest,
- stdx::bind(&MigrationManager::_checkMigrationCallback,
- this,
- stdx::placeholders::_1,
- txn,
- &migration,
- migrationStatuses));
-
- if (!callbackHandleWithStatus.isOK()) {
- // Scheduling the migration moveChunk failed.
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- migrationStatuses->insert(
- MigrationStatuses::value_type(migration.chunkInfo.migrateInfo.getName(),
- std::move(callbackHandleWithStatus.getStatus())));
- continue;
- }
+ // If chunk is already on the correct shard, just treat the operation as success
+ if (chunk->getShardId() == migrateInfo.to) {
+ return std::make_shared<Notification<Status>>(Status::OK());
+ }
+
+ const auto sourceShard = Grid::get(txn)->shardRegistry()->getShard(txn, migrateInfo.from);
+ auto hostStatus = sourceShard->getTargeter()->findHost(
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ RemoteCommandTargeter::selectFindHostMaxWaitTime(txn));
+ if (!hostStatus.isOK()) {
+ return std::make_shared<Notification<Status>>(std::move(hostStatus.getStatus()));
+ }
- // The moveChunk command was successfully scheduled. Store the callback handle so that the
- // command's return can be waited for later.
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- migration.setCallbackHandle(std::move(callbackHandleWithStatus.getValue()));
+ BSONObjBuilder builder;
+ MoveChunkRequest::appendAsCommand(
+ &builder,
+ nss,
+ chunkManager->getVersion(),
+ Grid::get(txn)->shardRegistry()->getConfigServerConnectionString(),
+ migrateInfo.from,
+ migrateInfo.to,
+ ChunkRange(migrateInfo.minKey, migrateInfo.maxKey),
+ maxChunkSizeBytes,
+ secondaryThrottle,
+ waitForDelete,
+ shardTakesCollectionDistLock);
+
+ Migration migration(nss, builder.obj());
+
+ auto retVal = migration.completionNotification;
+
+ if (shardTakesCollectionDistLock) {
+ _scheduleWithoutDistLock(txn, hostStatus.getValue(), std::move(migration));
+ } else {
+ _scheduleWithDistLock(txn, hostStatus.getValue(), std::move(migration));
}
- _waitForMigrations(txn);
- // At this point, there are no parallel running threads so it is safe not to lock the mutex.
+ return retVal;
+}
- // All the migrations have returned, release all of the distributed locks that are no longer
- // being used.
- _distributedLocks.clear();
+void MigrationManager::_scheduleWithDistLock(OperationContext* txn,
+ const HostAndPort& targetHost,
+ Migration migration) {
+ const NamespaceString nss(migration.nss);
- // If there are rescheduled migrations, move them to active and run the function again.
- if (!_rescheduledMigrations.empty()) {
- // Clear all the callback handles of the rescheduled migrations.
- for (auto& migration : _rescheduledMigrations) {
- migration.clearCallbackHandle();
+ executor::TaskExecutor* const executor = Grid::get(txn)->getExecutorPool()->getFixedExecutor();
+
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+ auto it = _activeMigrationsWithDistLock.find(nss);
+ if (it == _activeMigrationsWithDistLock.end()) {
+ // Acquire the collection distributed lock (blocking call)
+ auto distLockHandleStatus = acquireDistLock(txn, nss);
+ if (!distLockHandleStatus.isOK()) {
+ migration.completionNotification->set(distLockHandleStatus.getStatus());
+ return;
}
- _activeMigrations = std::move(_rescheduledMigrations);
- _rescheduledMigrations.clear();
- _executeMigrations(txn, migrationStatuses);
- } else {
- _activeMigrations.clear();
+ it = _activeMigrationsWithDistLock
+ .insert(std::make_pair(
+ nss, CollectionMigrationsState(std::move(distLockHandleStatus.getValue()))))
+ .first;
}
-}
-void MigrationManager::_checkMigrationCallback(
- const executor::TaskExecutor::RemoteCommandCallbackArgs& callbackArgs,
- OperationContext* txn,
- Migration* migration,
- MigrationStatuses* migrationStatuses) {
- const auto& remoteCommandResponse = callbackArgs.response;
-
- if (!remoteCommandResponse.isOK()) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- migrationStatuses->insert(MigrationStatuses::value_type(
- migration->chunkInfo.migrateInfo.getName(), std::move(remoteCommandResponse.status)));
- return;
- }
+ auto collectionMigrationState = &it->second;
+
+ // Add ourselves to the list of migrations on this collection so that completeMigration can be
+ // called both from the remote command callback and when scheduleRemoteCommand itself fails.
+ auto itMigration = collectionMigrationState->addMigration(std::move(migration));
+
+ const RemoteCommandRequest remoteRequest(
+ targetHost, NamespaceString::kAdminDb.toString(), itMigration->moveChunkCmdObj, txn);
+
+ StatusWith<executor::TaskExecutor::CallbackHandle> callbackHandleWithStatus =
+ executor->scheduleRemoteCommand(
+ remoteRequest,
+ [this, collectionMigrationState, itMigration](
+ const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
+ Client::initThread(getThreadName().c_str());
+ ON_BLOCK_EXIT([&] { Client::destroy(); });
+ auto txn = cc().makeOperationContext();
- Status commandStatus = getStatusFromCommandResult(remoteCommandResponse.data);
+ const NamespaceString nss(itMigration->nss);
- if (commandStatus == ErrorCodes::LockBusy && !migration->oldShard) {
- migration->oldShard = true;
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _rescheduleMigration(*migration);
+ if (collectionMigrationState->completeMigration(
+ itMigration,
+ extractMigrationStatusFromRemoteCommandResponse(args.response))) {
+ Grid::get(txn.get())->catalogClient(txn.get())->getDistLockManager()->unlock(
+ txn.get(), collectionMigrationState->getDistLockHandle());
+ _activeMigrationsWithDistLock.erase(nss);
+ }
+ });
+
+ if (callbackHandleWithStatus.isOK()) {
+ itMigration->callbackHandle = std::move(callbackHandleWithStatus.getValue());
return;
}
- // This extra parsing below is in order to preserve backwards compatibility with 3.2 and
- // earlier, where the move chunk command instead of returning a ChunkTooBig status includes an
- // extra field in the response.
- if (!commandStatus.isOK()) {
- bool chunkTooBig = false;
- bsonExtractBooleanFieldWithDefault(
- remoteCommandResponse.data, kChunkTooBig, false, &chunkTooBig);
- if (chunkTooBig) {
- commandStatus = {ErrorCodes::ChunkTooBig, commandStatus.reason()};
- }
+ if (collectionMigrationState->completeMigration(
+ itMigration, std::move(callbackHandleWithStatus.getStatus()))) {
+ Grid::get(txn)->catalogClient(txn)->getDistLockManager()->unlock(
+ txn, collectionMigrationState->getDistLockHandle());
+ _activeMigrationsWithDistLock.erase(nss);
}
-
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- migrationStatuses->insert(MigrationStatuses::value_type(
- migration->chunkInfo.migrateInfo.getName(), std::move(commandStatus)));
}
-void MigrationManager::_waitForMigrations(OperationContext* txn) const {
- executor::TaskExecutor* executor = Grid::get(txn)->getExecutorPool()->getFixedExecutor();
- for (const auto& migration : _activeMigrations) {
- // Block until the command is carried out.
- if (migration.moveChunkCallbackHandle) {
- executor->wait(migration.moveChunkCallbackHandle.get());
- }
+void MigrationManager::_scheduleWithoutDistLock(OperationContext* txn,
+ const HostAndPort& targetHost,
+ Migration migration) {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+ auto itMigration = _activeMigrationsWithoutDistLock.emplace(
+ _activeMigrationsWithoutDistLock.begin(), std::move(migration));
+
+ executor::TaskExecutor* const executor = Grid::get(txn)->getExecutorPool()->getFixedExecutor();
+
+ const RemoteCommandRequest remoteRequest(
+ targetHost, NamespaceString::kAdminDb.toString(), itMigration->moveChunkCmdObj, txn);
+
+ StatusWith<executor::TaskExecutor::CallbackHandle> callbackHandleWithStatus =
+ executor->scheduleRemoteCommand(
+ remoteRequest,
+ [this, itMigration](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ itMigration->completionNotification->set(
+ extractMigrationStatusFromRemoteCommandResponse(args.response));
+ _activeMigrationsWithoutDistLock.erase(itMigration);
+ });
+
+ if (!callbackHandleWithStatus.isOK()) {
+ itMigration->completionNotification->set(std::move(callbackHandleWithStatus.getStatus()));
+ _activeMigrationsWithoutDistLock.erase(itMigration);
+ return;
}
+
+ itMigration->callbackHandle = std::move(callbackHandleWithStatus.getValue());
}
-void MigrationManager::_rescheduleMigration(const Migration& migration) {
- _rescheduledMigrations.push_back(migration);
+MigrationManager::Migration::Migration(NamespaceString inNss, BSONObj inMoveChunkCmdObj)
+ : nss(std::move(inNss)),
+ moveChunkCmdObj(std::move(inMoveChunkCmdObj)),
+ completionNotification(std::make_shared<Notification<Status>>()) {}
+
+MigrationManager::Migration::~Migration() {
+ invariant(completionNotification);
}
-bool MigrationManager::_takeDistLockForAMigration(OperationContext* txn,
- const Migration& migration,
- MigrationStatuses* migrationStatuses) {
- auto it = _distributedLocks.find(migration.chunkInfo.migrateInfo.ns);
-
- if (it == _distributedLocks.end()) {
- // Neither the balancer nor the shard has the distributed collection lock for "ns".
- if (migration.oldShard) {
- DistLockTracker distLockTracker(boost::none);
- _distributedLocks.insert(std::map<std::string, DistLockTracker>::value_type(
- migration.chunkInfo.migrateInfo.ns, std::move(distLockTracker)));
- } else {
- auto distlock = _getDistLock(txn, migration);
- if (!distlock.isOK()) {
- // Abandon the migration so the balancer doesn't reschedule endlessly if whatever is
- // preventing the distlock from being acquired doesn't go away.
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- migrationStatuses->insert(MigrationStatuses::value_type(
- migration.chunkInfo.migrateInfo.getName(), std::move(distlock.getStatus())));
- return false;
- }
- DistLockTracker distLockTracker(std::move(distlock.getValue()));
- _distributedLocks.insert(std::map<std::string, DistLockTracker>::value_type(
- migration.chunkInfo.migrateInfo.ns, std::move(distLockTracker)));
- }
- } else {
- DistLockTracker* distLockTracker = &(it->second);
- if (!distLockTracker->distributedLock) {
- // Lock conflict. A shard holds the lock for a different migration.
- invariant(distLockTracker->migrationCounter == 0 && !distLockTracker->distributedLock);
- _rescheduleMigration(migration);
- return false;
- } else {
- invariant(distLockTracker->distributedLock && distLockTracker->migrationCounter > 0);
- if (migration.oldShard) {
- // Lock conflict. The balancer holds the lock, so the shard cannot take it yet.
- _rescheduleMigration(migration);
- return false;
- } else {
- ++(distLockTracker->migrationCounter);
- }
- }
- }
+MigrationManager::CollectionMigrationsState::CollectionMigrationsState(
+ DistLockHandle distLockHandle)
+ : _distLockHandle(std::move(distLockHandle)) {}
- return true;
+MigrationManager::CollectionMigrationsState::~CollectionMigrationsState() {
+ invariant(_migrations.empty());
}
-StatusWith<DistLockManager::ScopedDistLock> MigrationManager::_getDistLock(
- OperationContext* txn, const Migration& migration) {
- const std::string whyMessage(str::stream() << "migrating chunk "
- << ChunkRange(migration.chunkInfo.migrateInfo.minKey,
- migration.chunkInfo.migrateInfo.maxKey)
- .toString()
- << " in "
- << migration.chunkInfo.migrateInfo.ns);
-
- StatusWith<DistLockManager::ScopedDistLock> distLockStatus =
- Grid::get(txn)->catalogClient(txn)->distLock(
- txn, migration.chunkInfo.migrateInfo.ns, whyMessage);
-
- if (!distLockStatus.isOK()) {
- const std::string msg = str::stream()
- << "Could not acquire collection lock for " << migration.chunkInfo.migrateInfo.ns
- << " to migrate chunk " << redact(ChunkRange(migration.chunkInfo.migrateInfo.minKey,
- migration.chunkInfo.migrateInfo.maxKey)
- .toString())
- << " due to " << distLockStatus.getStatus().toString();
- warning() << msg;
- return {distLockStatus.getStatus().code(), msg};
- }
+MigrationManager::MigrationsList::iterator
+MigrationManager::CollectionMigrationsState::addMigration(Migration migration) {
+ return _migrations.emplace(_migrations.begin(), std::move(migration));
+}
+
+bool MigrationManager::CollectionMigrationsState::completeMigration(MigrationsList::iterator it,
+ Status status) {
+ it->completionNotification->set(status);
+
+ _migrations.erase(it);
- return std::move(distLockStatus.getValue());
+ return _migrations.empty();
}
} // namespace mongo
diff --git a/src/mongo/s/balancer/migration_manager.h b/src/mongo/s/balancer/migration_manager.h
index 76b31cbdd30..6a89fbd3a5a 100644
--- a/src/mongo/s/balancer/migration_manager.h
+++ b/src/mongo/s/balancer/migration_manager.h
@@ -28,17 +28,27 @@
#pragma once
+#include <list>
+#include <map>
+#include <unordered_map>
+#include <vector>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/namespace_string.h"
#include "mongo/executor/task_executor.h"
#include "mongo/s/balancer/balancer_policy.h"
#include "mongo/s/catalog/dist_lock_manager.h"
#include "mongo/s/migration_secondary_throttle_options.h"
#include "mongo/stdx/mutex.h"
+#include "mongo/util/concurrency/notification.h"
namespace mongo {
class OperationContext;
class Status;
-class MigrationSecondaryThrottleOptions;
+template <typename T>
+class StatusWith;
// Uniquely identifies a migration, regardless of shard and version.
typedef std::string MigrationIdentifier;
@@ -57,153 +67,156 @@ public:
~MigrationManager();
/**
- * Encapsulates a migration request along with its parameters
+ * A blocking method that attempts to schedule all the migrations specified in
+ * "migrateInfos" and waits for them to complete. Takes the distributed lock for each
+ * collection with a chunk being migrated.
+ *
+ * If any of the migrations scheduled in parallel fails with a LockBusy error reported from
+ * the shard, it is retried serially without the config server holding the distributed lock.
+ *
+ * Returns a map of migration Status objects to indicate the success/failure of each migration.
*/
- struct MigrationRequest {
- public:
- MigrationRequest(MigrateInfo migrateInfo,
- uint64_t maxChunkSizeBytes,
- MigrationSecondaryThrottleOptions secondaryThrottle,
- bool waitForDelete);
-
- MigrateInfo migrateInfo;
- uint64_t maxChunkSizeBytes;
- MigrationSecondaryThrottleOptions secondaryThrottle;
- bool waitForDelete;
- };
-
- using MigrationRequestVector = std::vector<MigrationRequest>;
+ MigrationStatuses executeMigrationsForAutoBalance(
+ OperationContext* txn,
+ const std::vector<MigrateInfo>& migrateInfos,
+ uint64_t maxChunkSizeBytes,
+ const MigrationSecondaryThrottleOptions& secondaryThrottle,
+ bool waitForDelete);
/**
- * A blocking method that attempts to schedule all the migrations specified in
- * "candidateMigrations". Takes the distributed lock for each collection with a chunk being
+ * A blocking method that attempts to schedule the migration specified in "migrateInfo" and
+ * waits for it to complete. Takes the distributed lock for the namespace which is being
* migrated.
*
- * Returns a map of migration Status objects to indicate the success/failure of each migration.
+ * Returns the status of the migration.
*/
- MigrationStatuses scheduleMigrations(OperationContext* txn,
- MigrationRequestVector candidateMigrations);
+ Status scheduleManualMigration(OperationContext* txn,
+ const MigrateInfo& migrateInfo,
+ uint64_t maxChunkSizeBytes,
+ const MigrationSecondaryThrottleOptions& secondaryThrottle,
+ bool waitForDelete);
private:
/**
- * Holds the data associated with an ongoing migration. Stores a callback handle for the
- * moveChunk command when one is scheduled. Also holds a flag that indicates the source shard is
- * v3.2 and must take the distributed lock itself.
+ * Tracks the execution state of a single migration.
*/
struct Migration {
- explicit Migration(MigrationRequest migrationRequest);
+ Migration(NamespaceString nss, BSONObj moveChunkCmdObj);
+ ~Migration();
- void setCallbackHandle(executor::TaskExecutor::CallbackHandle callbackHandle);
- void clearCallbackHandle();
+ // Namespace to which this migration applies
+ NamespaceString nss;
- // Migration request
- MigrationRequest chunkInfo;
+ // Command object representing the migration
+ BSONObj moveChunkCmdObj;
- // Callback handle for the active moveChunk request. If no migration is active for the chunk
- // specified in "chunkInfo", this won't be set.
- boost::optional<executor::TaskExecutor::CallbackHandle> moveChunkCallbackHandle;
+ // Callback handle for the migration network request. If the migration has not yet been sent
+ // on the network, this value is not set.
+ boost::optional<executor::TaskExecutor::CallbackHandle> callbackHandle;
- // Indicates that the first moveChunk request failed with LockBusy. The second attempt must
- // be made without the balancer holding the collection distlock. This is necessary for
- // compatibility with a v3.2 shard, which expects to take the distlock itself.
- bool oldShard{false};
+ // Notification which will be signaled when the migration completes
+ std::shared_ptr<Notification<Status>> completionNotification;
};
+ // Type used to store the list of active migrations. A std::list is used because its iterators
+ // are not invalidated when other entries are removed around them, which allows O(1) removal.
+ using MigrationsList = std::list<Migration>;
+
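
A tiny standalone illustration (not part of the patch) of the iterator-stability property the comment above relies on:

std::list<int> entries{1, 2, 3};
auto second = std::next(entries.begin());  // iterator to the element '2'
entries.erase(entries.begin());            // erasing another element does not invalidate it
entries.erase(second);                     // still valid; erasure through an iterator is O(1)
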
/**
- * Manages and maintains a collection distlock, which should normally be held by the balancer,
- * but in the case of a migration with a v3.2 source shard the balancer must release it in order
- * to allow the shard to acquire it.
+ * Contains the runtime state for a single collection. This class does not have concurrency
+ * control of its own and relies on the migration manager's mutex.
*/
- struct DistLockTracker {
- DistLockTracker(boost::optional<DistLockManager::ScopedDistLock> distlock);
-
- // Holds the distributed lock, if the balancer should hold it for the migration. If this is
- // empty, then a shard has the distlock.
- boost::optional<DistLockManager::ScopedDistLock> distributedLock;
-
- // The number of migrations that are currently using the balancer held distributed lock.
- int migrationCounter;
+ class CollectionMigrationsState {
+ public:
+ CollectionMigrationsState(DistLockHandle distLockHandle);
+ ~CollectionMigrationsState();
+
+ /**
+ * Registers a new migration with this state tracker. Must be followed by a call to
+ * completeMigration with the returned iterator.
+ */
+ MigrationsList::iterator addMigration(Migration migration);
+
+ /**
+ * Must be called exactly once, as a follow-up to an addMigration call, with the iterator
+ * returned from it. Removes the specified migration entry from the migrations list and sets
+ * its notification status.
+ *
+ * Returns true if this was the last active migration for the collection, in which case it is
+ * the caller's responsibility to release the collection distributed lock and remove this
+ * object from the owning map.
+ */
+ bool completeMigration(MigrationsList::iterator it, Status status);
+
+ /**
+ * Retrieves the dist lock handle corresponding to the dist lock held for this collection.
+ */
+ const DistLockHandle& getDistLockHandle() const {
+ return _distLockHandle;
+ }
+
+ private:
+ // Dist lock handle, which should be released at destruction time
+ DistLockHandle _distLockHandle;
+
+ // Contains a set of migrations which are currently active for this namespace.
+ MigrationsList _migrations;
};
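
Condensed sketch of the intended calling pattern for this tracker, assuming the manager's _mutex is held and that 'distLockManager' stands in for the catalog client's DistLockManager:

auto it = collectionState->addMigration(std::move(migration));
// ... later, once the Status 'outcome' for that migration is known ...
if (collectionState->completeMigration(it, outcome)) {
    // That was the last active migration on the collection: release the distributed lock
    // and remove the entry from the owning map.
    distLockManager->unlock(txn, collectionState->getDistLockHandle());
    _activeMigrationsWithDistLock.erase(nss);
}
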
- /**
- * Blocking function that schedules all the migrations prepared in "_activeMigrations" and then
- * waits for them all to complete. This is also where the distributed locks are taken. Some
- * migrations may be rescheduled for a recursive call of this function if there are distributed
- * lock conflicts. A lock conflict can occur when:
- * 1) The source shard of a migration is v3.2 and expects to take the lock itself and the
- * balancer already holds it for a different migration.
- * 2) A v3.2 shard already has the distlock, so it isn't free for either the balancer to
- * take or another v3.2 shard.
- * All lock conflicts are resolved by waiting for all of the scheduled migrations to complete,
- * at which point all the locks are safely released.
- *
- * All the moveChunk command Status results are placed in "migrationStatuses".
- */
- void _executeMigrations(OperationContext* txn,
- std::map<MigrationIdentifier, Status>* migrationStatuses);
+ using CollectionMigrationsStateMap =
+ std::unordered_map<NamespaceString, CollectionMigrationsState>;
/**
- * Callback function that checks a remote command response for errors. If there is a LockBusy
- * error, the first time this happens the shard starting the migration is assumed to be v3.2 and
- * is marked such and rescheduled. On other errors, the migration is abandoned. Places all of
- * the Status results from the moveChunk commands in "migrationStatuses".
+ * Optionally takes the collection distributed lock and schedules a chunk migration with the
+ * specified parameters. May block while acquiring the distributed lock. If the lock acquisition
+ * succeeds (or is not required), schedules the migration request and returns a notification
+ * which can be used to obtain the outcome of the operation.
+ *
+ * The 'shardTakesCollectionDistLock' parameter controls whether the distributed lock is
+ * acquired by the migration manager or by the shard executing the migration request.
*/
- void _checkMigrationCallback(
- const executor::TaskExecutor::RemoteCommandCallbackArgs& callbackArgs,
+ std::shared_ptr<Notification<Status>> _schedule(
OperationContext* txn,
- Migration* migration,
- std::map<MigrationIdentifier, Status>* migrationStatuses);
-
- /**
- * Goes through the callback handles in "_activeMigrations" and waits for the moveChunk commands
- * to return.
- */
- void _waitForMigrations(OperationContext* txn) const;
+ const MigrateInfo& migrateInfo,
+ bool shardTakesCollectionDistLock,
+ uint64_t maxChunkSizeBytes,
+ const MigrationSecondaryThrottleOptions& secondaryThrottle,
+ bool waitForDelete);
/**
- * Adds "migration" to "_rescheduledMigrations" vector.
- */
- void _rescheduleMigration(const Migration& migration);
-
- /**
- * Attempts to take a distlock for collection "ns", if appropriate. It may
- * 1) Take the distlock for the balancer and initialize the counter to 1.
- * 2) Increment the counter on the distlock that the balancer already holds.
- * 3) Initialize the counter to 0 to indicate a migration with a v3.2 shard, where the shard
- * will take the distlock.
+ * Acquires the collection distributed lock for the specified namespace and, if it succeeds,
+ * schedules the migration.
*
- * If none of these actions are possible because of a lock conflict (shard can't take the lock
- * if the balancer already holds it, or vice versa) or if the lock is unavailable, returns
- * false to indicate that the migration cannot proceed right now. If the lock could not be
- * taken because of a lock conflict as described, then the migration is rescheduled; otherwise
- * it is abandoned.
- *
- * If an attempt to acquire the distributed lock fails and the migration is abandoned, the error
- * Status is placed in "migrationStatuses".
+ * The distributed lock is acquired before scheduling the first migration for the collection and
+ * is only released when all active migrations on the collection have finished.
*/
- bool _takeDistLockForAMigration(OperationContext* txn,
- const Migration& migration,
- MigrationStatuses* migrationStatuses);
+ void _scheduleWithDistLock(OperationContext* txn,
+ const HostAndPort& targetHost,
+ Migration migration);
/**
- * Attempts to acquire the distributed collection lock necessary required for "migration".
+ * Immediately schedules the specified migration without attempting to acquire the collection
+ * distributed lock or checking that it is not being held.
+ *
+ * This method is only used for retrying migrations that have failed with LockBusy errors
+ * returned by the shard, which only happens with legacy 3.2 shards that take the collection
+ * distributed lock themselves.
*/
- StatusWith<DistLockManager::ScopedDistLock> _getDistLock(OperationContext* txn,
- const Migration& migration);
+ void _scheduleWithoutDistLock(OperationContext* txn,
+ const HostAndPort& targetHost,
+ Migration migration);
- // Protects class variables when migrations run in parallel.
+ // Protects the class state below
stdx::mutex _mutex;
- // Holds information about each ongoing migration.
- std::vector<Migration> _activeMigrations;
-
- // Temporary container for migrations that must be rescheduled. After all of the
- // _activeMigrations are finished, this variable is used to reset _activeMigrations before
- // executing migrations again.
- std::vector<Migration> _rescheduledMigrations;
+ // Holds information about each collection's distributed lock and active migrations via a
+ // CollectionMigrationsState object.
+ CollectionMigrationsStateMap _activeMigrationsWithDistLock;
- // Manages the distributed locks and whether the balancer or shard holds them.
- std::map<std::string, DistLockTracker> _distributedLocks;
+ // Holds information about migrations which have been scheduled without the collection
+ // distributed lock acquired (i.e., the shard is asked to acquire it).
+ MigrationsList _activeMigrationsWithoutDistLock;
};
} // namespace mongo
diff --git a/src/mongo/s/balancer/migration_manager_test.cpp b/src/mongo/s/balancer/migration_manager_test.cpp
index 1bf8311ea89..0c8ee51ebf7 100644
--- a/src/mongo/s/balancer/migration_manager_test.cpp
+++ b/src/mongo/s/balancer/migration_manager_test.cpp
@@ -47,8 +47,7 @@ namespace {
using executor::RemoteCommandRequest;
using executor::RemoteCommandResponse;
-using MigrationRequest = MigrationManager::MigrationRequest;
-using MigrationRequestVector = MigrationManager::MigrationRequestVector;
+using std::vector;
const auto kShardId0 = ShardId("shard0");
const auto kShardId1 = ShardId("shard1");
@@ -252,11 +251,8 @@ TEST_F(MigrationManagerTest, OneCollectionTwoMigrations) {
setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
// Going to request that these two chunks get migrated.
- const MigrationRequestVector migrationRequests{
- MigrationRequest(
- MigrateInfo(chunk1.getNS(), kShardId1, chunk1), 0, kDefaultSecondaryThrottle, false),
- MigrationRequest(
- MigrateInfo(chunk2.getNS(), kShardId3, chunk2), 0, kDefaultSecondaryThrottle, false)};
+ const std::vector<MigrateInfo> migrationRequests{{chunk1.getNS(), kShardId1, chunk1},
+ {chunk2.getNS(), kShardId3, chunk2}};
auto future = launchAsync([this, migrationRequests] {
Client::initThreadIfNotAlready("Test");
@@ -267,11 +263,11 @@ TEST_F(MigrationManagerTest, OneCollectionTwoMigrations) {
shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
shardTargeterMock(txn.get(), kShardId2)->setFindHostReturnValue(kShardHost2);
- MigrationStatuses migrationStatuses =
- _migrationManager->scheduleMigrations(txn.get(), migrationRequests);
+ MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
+ txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
for (const auto& migrateInfo : migrationRequests) {
- ASSERT_OK(migrationStatuses.at(migrateInfo.migrateInfo.getName()));
+ ASSERT_OK(migrationStatuses.at(migrateInfo.getName()));
}
});
@@ -315,23 +311,10 @@ TEST_F(MigrationManagerTest, TwoCollectionsTwoMigrationsEach) {
setUpChunk(collName2, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version2);
// Going to request that these four chunks get migrated.
- const MigrationRequestVector migrationRequests{
- MigrationRequest(MigrateInfo(chunk1coll1.getNS(), kShardId1, chunk1coll1),
- 0,
- kDefaultSecondaryThrottle,
- false),
- MigrationRequest(MigrateInfo(chunk2coll1.getNS(), kShardId3, chunk2coll1),
- 0,
- kDefaultSecondaryThrottle,
- false),
- MigrationRequest(MigrateInfo(chunk1coll2.getNS(), kShardId1, chunk1coll2),
- 0,
- kDefaultSecondaryThrottle,
- false),
- MigrationRequest(MigrateInfo(chunk2coll2.getNS(), kShardId3, chunk2coll2),
- 0,
- kDefaultSecondaryThrottle,
- false)};
+ const std::vector<MigrateInfo> migrationRequests{{chunk1coll1.getNS(), kShardId1, chunk1coll1},
+ {chunk2coll1.getNS(), kShardId3, chunk2coll1},
+ {chunk1coll2.getNS(), kShardId1, chunk1coll2},
+ {chunk2coll2.getNS(), kShardId3, chunk2coll2}};
auto future = launchAsync([this, migrationRequests] {
Client::initThreadIfNotAlready("Test");
@@ -342,11 +325,11 @@ TEST_F(MigrationManagerTest, TwoCollectionsTwoMigrationsEach) {
shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
shardTargeterMock(txn.get(), kShardId2)->setFindHostReturnValue(kShardHost2);
- MigrationStatuses migrationStatuses =
- _migrationManager->scheduleMigrations(txn.get(), migrationRequests);
+ MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
+ txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
for (const auto& migrateInfo : migrationRequests) {
- ASSERT_OK(migrationStatuses.at(migrateInfo.migrateInfo.getName()));
+ ASSERT_OK(migrationStatuses.at(migrateInfo.getName()));
}
});
@@ -386,11 +369,8 @@ TEST_F(MigrationManagerTest, SameCollectionOldShardMigration) {
setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
// Going to request that these two chunks get migrated.
- const MigrationRequestVector migrationRequests{
- MigrationRequest(
- MigrateInfo(chunk1.getNS(), kShardId1, chunk1), 0, kDefaultSecondaryThrottle, false),
- MigrationRequest(
- MigrateInfo(chunk2.getNS(), kShardId3, chunk2), 0, kDefaultSecondaryThrottle, false)};
+ const std::vector<MigrateInfo> migrationRequests{{chunk1.getNS(), kShardId1, chunk1},
+ {chunk2.getNS(), kShardId3, chunk2}};
auto future = launchAsync([this, migrationRequests] {
Client::initThreadIfNotAlready("Test");
@@ -401,11 +381,11 @@ TEST_F(MigrationManagerTest, SameCollectionOldShardMigration) {
shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
shardTargeterMock(txn.get(), kShardId2)->setFindHostReturnValue(kShardHost2);
- MigrationStatuses migrationStatuses =
- _migrationManager->scheduleMigrations(txn.get(), migrationRequests);
+ MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
+ txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
for (const auto& migrateInfo : migrationRequests) {
- ASSERT_OK(migrationStatuses.at(migrateInfo.migrateInfo.getName()));
+ ASSERT_OK(migrationStatuses.at(migrateInfo.getName()));
}
});
@@ -443,8 +423,7 @@ TEST_F(MigrationManagerTest, SameOldShardFailsToAcquireDistributedLockTwice) {
setUpChunk(collName, kKeyPattern.globalMin(), kKeyPattern.globalMax(), kShardId0, version);
// Going to request that this chunk get migrated.
- const MigrationRequestVector migrationRequests{MigrationRequest(
- MigrateInfo(chunk1.getNS(), kShardId1, chunk1), 0, kDefaultSecondaryThrottle, false)};
+ const std::vector<MigrateInfo> migrationRequests{{chunk1.getNS(), kShardId1, chunk1}};
auto future = launchAsync([this, migrationRequests] {
Client::initThreadIfNotAlready("Test");
@@ -454,12 +433,11 @@ TEST_F(MigrationManagerTest, SameOldShardFailsToAcquireDistributedLockTwice) {
// Set up a dummy host for the source shard.
shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
- MigrationStatuses migrationStatuses =
- _migrationManager->scheduleMigrations(txn.get(), migrationRequests);
+ MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
+ txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
for (const auto& migrateInfo : migrationRequests) {
- ASSERT_EQ(ErrorCodes::LockBusy,
- migrationStatuses.at(migrateInfo.migrateInfo.getName()));
+ ASSERT_EQ(ErrorCodes::LockBusy, migrationStatuses.at(migrateInfo.getName()));
}
});
@@ -504,11 +482,8 @@ TEST_F(MigrationManagerTest, SameCollectionTwoOldShardMigrations) {
setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
// Going to request that these two chunks get migrated.
- const MigrationRequestVector migrationRequests{
- MigrationRequest(
- MigrateInfo(chunk1.getNS(), kShardId1, chunk1), 0, kDefaultSecondaryThrottle, false),
- MigrationRequest(
- MigrateInfo(chunk2.getNS(), kShardId3, chunk2), 0, kDefaultSecondaryThrottle, false)};
+ const std::vector<MigrateInfo> migrationRequests{{chunk1.getNS(), kShardId1, chunk1},
+ {chunk2.getNS(), kShardId3, chunk2}};
auto future = launchAsync([this, migrationRequests] {
Client::initThreadIfNotAlready("Test");
@@ -519,11 +494,11 @@ TEST_F(MigrationManagerTest, SameCollectionTwoOldShardMigrations) {
shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
shardTargeterMock(txn.get(), kShardId2)->setFindHostReturnValue(kShardHost2);
- MigrationStatuses migrationStatuses =
- _migrationManager->scheduleMigrations(txn.get(), migrationRequests);
+ MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
+ txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
for (const auto& migrateInfo : migrationRequests) {
- ASSERT_OK(migrationStatuses.at(migrateInfo.migrateInfo.getName()));
+ ASSERT_OK(migrationStatuses.at(migrateInfo.getName()));
}
});
@@ -571,22 +546,23 @@ TEST_F(MigrationManagerTest, FailToAcquireDistributedLock) {
setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
// Going to request that these two chunks get migrated.
- const MigrationRequestVector migrationRequests{
- MigrationRequest(
- MigrateInfo(chunk1.getNS(), kShardId1, chunk1), 0, kDefaultSecondaryThrottle, false),
- MigrationRequest(
- MigrateInfo(chunk2.getNS(), kShardId3, chunk2), 0, kDefaultSecondaryThrottle, false)};
+ const std::vector<MigrateInfo> migrationRequests{{chunk1.getNS(), kShardId1, chunk1},
+ {chunk2.getNS(), kShardId3, chunk2}};
+
+ shardTargeterMock(operationContext(), kShardId0)->setFindHostReturnValue(kShardHost0);
+ shardTargeterMock(operationContext(), kShardId2)->setFindHostReturnValue(kShardHost2);
// Take the distributed lock for the collection before scheduling via the MigrationManager.
const std::string whyMessage("FailToAcquireDistributedLock unit-test taking distributed lock");
DistLockManager::ScopedDistLock distLockStatus = unittest::assertGet(
catalogClient()->distLock(operationContext(), chunk1.getNS(), whyMessage));
- MigrationStatuses migrationStatuses =
- _migrationManager->scheduleMigrations(operationContext(), migrationRequests);
+ MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
+ operationContext(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
for (const auto& migrateInfo : migrationRequests) {
- ASSERT_EQ(ErrorCodes::LockBusy, migrationStatuses.at(migrateInfo.migrateInfo.getName()));
+ ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress,
+ migrationStatuses.at(migrateInfo.getName()));
}
}
@@ -615,11 +591,8 @@ TEST_F(MigrationManagerTest, SourceShardNotFound) {
setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
// Going to request that these two chunks get migrated.
- const MigrationRequestVector migrationRequests{
- MigrationRequest(
- MigrateInfo(chunk1.getNS(), kShardId1, chunk1), 0, kDefaultSecondaryThrottle, false),
- MigrationRequest(
- MigrateInfo(chunk2.getNS(), kShardId3, chunk2), 0, kDefaultSecondaryThrottle, false)};
+ const std::vector<MigrateInfo> migrationRequests{{chunk1.getNS(), kShardId1, chunk1},
+ {chunk2.getNS(), kShardId3, chunk2}};
auto future = launchAsync([this, chunk1, chunk2, migrationRequests] {
Client::initThreadIfNotAlready("Test");
@@ -632,8 +605,8 @@ TEST_F(MigrationManagerTest, SourceShardNotFound) {
->setFindHostReturnValue(
Status(ErrorCodes::ReplicaSetNotFound, "SourceShardNotFound generated error."));
- MigrationStatuses migrationStatuses =
- _migrationManager->scheduleMigrations(txn.get(), migrationRequests);
+ MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
+ txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
ASSERT_OK(migrationStatuses.at(chunk1.getName()));
ASSERT_EQ(ErrorCodes::ReplicaSetNotFound, migrationStatuses.at(chunk2.getName()));
@@ -664,8 +637,7 @@ TEST_F(MigrationManagerTest, JumboChunkResponseBackwardsCompatibility) {
setUpChunk(collName, kKeyPattern.globalMin(), kKeyPattern.globalMax(), kShardId0, version);
// Going to request that this chunk gets migrated.
- const MigrationRequestVector migrationRequests{MigrationRequest(
- MigrateInfo(chunk1.getNS(), kShardId1, chunk1), 0, kDefaultSecondaryThrottle, false)};
+ const std::vector<MigrateInfo> migrationRequests{{chunk1.getNS(), kShardId1, chunk1}};
auto future = launchAsync([this, chunk1, migrationRequests] {
Client::initThreadIfNotAlready("Test");
@@ -675,8 +647,8 @@ TEST_F(MigrationManagerTest, JumboChunkResponseBackwardsCompatibility) {
// up a dummy host for kShardHost0.
shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
- MigrationStatuses migrationStatuses =
- _migrationManager->scheduleMigrations(txn.get(), migrationRequests);
+ MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
+ txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
ASSERT_EQ(ErrorCodes::ChunkTooBig, migrationStatuses.at(chunk1.getName()));
});
diff --git a/src/mongo/util/concurrency/notification.h b/src/mongo/util/concurrency/notification.h
index b34871ad3f4..538012cb257 100644
--- a/src/mongo/util/concurrency/notification.h
+++ b/src/mongo/util/concurrency/notification.h
@@ -46,11 +46,19 @@ class OperationContext;
template <class T>
class Notification {
public:
+ Notification() = default;
+
+ /**
+ * Creates a notification object that has already been set. Calls to any of the getters will
+ * return immediately.
+ */
+ explicit Notification(T value) : _value(value) {}
+
/**
* Returns true if the notification has been set (i.e., the call to get/waitFor would not
* block).
*/
- explicit operator bool() const {
+ explicit operator bool() {
stdx::unique_lock<stdx::mutex> lock(_mutex);
return !!_value;
}
@@ -100,8 +108,8 @@ public:
}
private:
- mutable stdx::mutex _mutex;
- mutable stdx::condition_variable _condVar;
+ stdx::mutex _mutex;
+ stdx::condition_variable _condVar;
// Protected by mutex and only moves from not-set to set once
boost::optional<T> _value{boost::none};
@@ -110,7 +118,7 @@ private:
template <>
class Notification<void> {
public:
- explicit operator bool() const {
+ explicit operator bool() {
return _notification.operator bool();
}
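
A short illustration (not from this patch) of why the pre-set constructor is useful: scheduling failures can be reported through the same Notification<Status> channel as real completions, so callers wait on it uniformly.

auto failed = std::make_shared<Notification<Status>>(
    Status(ErrorCodes::IncompatibleShardingMetadata, "Chunk does not exist"));
invariant(*failed);              // already set, so operator bool() returns true
Status outcome = failed->get();  // returns immediately without blocking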