-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_misc.yml                       |   3
-rw-r--r--  jstests/multiVersion/shard_collection_between_mixed_version_mongods.js    |  62
-rw-r--r--  jstests/sharding/add_and_remove_shard_from_zone.js                        |  40
-rw-r--r--  jstests/sharding/add_shard_to_zone.js                                     |  22
-rw-r--r--  jstests/sharding/remove_shard_from_zone.js                                |  28
-rw-r--r--  jstests/sharding/shard_collection_verify_initial_chunks.js                |  65
-rw-r--r--  src/mongo/db/s/config/configsvr_shard_collection_command.cpp              |  74
-rw-r--r--  src/mongo/db/s/config/initial_split_policy.cpp                            | 117
-rw-r--r--  src/mongo/db/s/config/initial_split_policy.h                              |  13
-rw-r--r--  src/mongo/db/s/config/initial_split_policy_test.cpp                       |   8
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager.h                          |  25
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp  |  25
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp  | 503
-rw-r--r--  src/mongo/db/s/shardsvr_shard_collection.cpp                              |  29
14 files changed, 706 insertions(+), 308 deletions(-)
diff --git a/buildscripts/resmokeconfig/suites/sharding_misc.yml b/buildscripts/resmokeconfig/suites/sharding_misc.yml
index 93551faabbd..ee21cfff768 100644
--- a/buildscripts/resmokeconfig/suites/sharding_misc.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_misc.yml
@@ -347,7 +347,7 @@ selector:
- jstests/sharding/mongos_query_comment.js
- jstests/sharding/kill_op_overflow.js
- jstests/sharding/mongos_local_explain.js
- - jstests/sharding/add_shard_to_zone.js
+ - jstests/sharding/add_and_remove_shard_from_zone.js
- jstests/sharding/authwhere.js
- jstests/sharding/authmr.js
- jstests/sharding/sessions_collection_auto_healing.js
@@ -355,7 +355,6 @@ selector:
- jstests/sharding/read_does_not_create_namespaces.js
- jstests/sharding/ismaster.js
- jstests/sharding/balancer_shell_commands.js
- - jstests/sharding/remove_shard_from_zone.js
- jstests/sharding/forget_mr_temp_ns.js
- jstests/sharding/auth_sharding_cmd_metadata.js
- jstests/sharding/autodiscover_config_rs_from_secondary.js
diff --git a/jstests/multiVersion/shard_collection_between_mixed_version_mongods.js b/jstests/multiVersion/shard_collection_between_mixed_version_mongods.js
new file mode 100644
index 00000000000..cd895bf1916
--- /dev/null
+++ b/jstests/multiVersion/shard_collection_between_mixed_version_mongods.js
@@ -0,0 +1,62 @@
+//
+// Testing shardCollection between 3.6 and latest mongod versions for both config servers and
+// shards.
+//
+
+load("./jstests/multiVersion/libs/verify_versions.js");
+
+(function() {
+ "use strict";
+
+ var options = {
+ shards: [{binVersion: "latest"}, {binVersion: "3.6"}, {binVersion: "3.6"}],
+ mongos: 1,
+ other: {
+ mongosOptions: {binVersion: "latest"},
+ configOptions: {binVersion: "latest"},
+ shardAsReplicaSet: true
+ }
+ };
+
+ var st = new ShardingTest(options);
+ assert.binVersion(st.shard0, "latest");
+ assert.binVersion(st.shard1, "3.6");
+ assert.binVersion(st.shard2, "3.6");
+ assert.binVersion(st.s0, "latest");
+
+ var mongos = st.s0;
+ var admin = mongos.getDB('admin');
+
+ const kDBOnShardWithLatestBinary = "DBWithPrimaryOnLatestBinary";
+ const kNSOnLatestShard = kDBOnShardWithLatestBinary + ".Coll";
+ const kDBOnShardWithOldBinary = "DBWithPrimaryOnOldBinary";
+ const kNSOnOldShard = kDBOnShardWithOldBinary + ".Coll";
+
+ assert.commandWorked(admin.runCommand({enableSharding: kDBOnShardWithLatestBinary}));
+ assert.commandWorked(admin.runCommand({enableSharding: kDBOnShardWithOldBinary}));
+ st.ensurePrimaryShard(kDBOnShardWithLatestBinary, st.shard0.shardName);
+ st.ensurePrimaryShard(kDBOnShardWithOldBinary, st.shard1.shardName);
+
+    // Test that shardCollection succeeds when both the config server and primary shard are
+    // running with the latest binVersion, but the other shards are running with 3.6, which does
+    // not support the new shardCollection protocol.
+ assert.commandWorked(admin.runCommand({shardCollection: kNSOnLatestShard, key: {a: 1}}));
+
+    // Test that shardCollection succeeds when the config server is running with the latest
+    // binVersion, but the primary shard is running with 3.6.
+ assert.commandWorked(admin.runCommand({shardCollection: kNSOnOldShard, key: {a: 1}}));
+
+ mongos.getDB(kDBOnShardWithLatestBinary).Coll.drop();
+ mongos.getDB(kDBOnShardWithOldBinary).Coll.drop();
+
+    // Test that shardCollection with a hashed shard key succeeds when both the config server and
+    // primary shard are running with the latest binVersion, but the other shards are running
+    // with 3.6, which does not support the new shardCollection protocol.
+ assert.commandWorked(admin.runCommand({shardCollection: kNSOnLatestShard, key: {a: "hashed"}}));
+
+    // Test that shardCollection with a hashed shard key succeeds when the config server is
+    // running with the latest binVersion, but the primary shard is running with 3.6.
+ assert.commandWorked(admin.runCommand({shardCollection: kNSOnOldShard, key: {a: "hashed"}}));
+
+ st.stop();
+})();
diff --git a/jstests/sharding/add_and_remove_shard_from_zone.js b/jstests/sharding/add_and_remove_shard_from_zone.js
new file mode 100644
index 00000000000..d4773597259
--- /dev/null
+++ b/jstests/sharding/add_and_remove_shard_from_zone.js
@@ -0,0 +1,40 @@
+/**
+ * Basic integration tests for the addShardToZone and removeShardFromZone commands. More detailed
+ * tests can be found in sharding_catalog_add_shard_to_zone_test.cpp and
+ * sharding_catalog_remove_shard_from_zone_test.cpp.
+ */
+(function() {
+ 'use strict';
+
+ let st = new ShardingTest({shards: 1});
+ let mongos = st.s0;
+
+ let config = mongos.getDB('config');
+ var shardName = st.shard0.shardName;
+
+ // Test adding shard with no zone to a new zone.
+ assert.commandWorked(mongos.adminCommand({addShardToZone: shardName, zone: 'x'}));
+ var shardDoc = config.shards.findOne();
+ assert.eq(['x'], shardDoc.tags);
+
+ // Test adding zone to a shard with existing zones.
+ assert.commandWorked(mongos.adminCommand({addShardToZone: shardName, zone: 'y'}));
+ shardDoc = config.shards.findOne();
+ assert.eq(['x', 'y'], shardDoc.tags);
+
+ // Test removing shard from existing zone.
+ assert.commandWorked(mongos.adminCommand({removeShardFromZone: shardName, zone: 'x'}));
+ shardDoc = config.shards.findOne();
+ assert.eq(['y'], shardDoc.tags);
+
+ // Test removing shard from zone that no longer exists.
+ assert.commandWorked(mongos.adminCommand({removeShardFromZone: shardName, zone: 'x'}));
+ shardDoc = config.shards.findOne();
+ assert.eq(['y'], shardDoc.tags);
+
+    // Test removing the last zone from a shard.
+ assert.commandWorked(mongos.adminCommand({removeShardFromZone: shardName, zone: 'y'}));
+ shardDoc = config.shards.findOne();
+ assert.eq([], shardDoc.tags);
+
+ st.stop();
+})();
diff --git a/jstests/sharding/add_shard_to_zone.js b/jstests/sharding/add_shard_to_zone.js
deleted file mode 100644
index bb142585cc2..00000000000
--- a/jstests/sharding/add_shard_to_zone.js
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Basic integration tests for addShardToZone command. More detailed tests can be found
- * in sharding_catalog_add_shard_to_zone_test.cpp.
- */
-(function() {
- var st = new ShardingTest({shards: 1});
-
- var configDB = st.s.getDB('config');
- var shardName = configDB.shards.findOne()._id;
-
- // Test adding shard with no zone to a new zone.
- assert.commandWorked(st.s.adminCommand({addShardToZone: shardName, zone: 'x'}));
- var shardDoc = configDB.shards.findOne();
- assert.eq(['x'], shardDoc.tags);
-
- // Test adding zone to a shard with existing zones.
- assert.commandWorked(st.s.adminCommand({addShardToZone: shardName, zone: 'y'}));
- shardDoc = configDB.shards.findOne();
- assert.eq(['x', 'y'], shardDoc.tags);
-
- st.stop();
-})();
diff --git a/jstests/sharding/remove_shard_from_zone.js b/jstests/sharding/remove_shard_from_zone.js
deleted file mode 100644
index 273d597d038..00000000000
--- a/jstests/sharding/remove_shard_from_zone.js
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Basic integration tests for removeShardFromZone command. More detailed tests can be found
- * in sharding_catalog_remove_shard_from_zone_test.cpp.
- */
-(function() {
- "use strict";
-
- var st = new ShardingTest({shards: 1});
-
- var configDB = st.s.getDB('config');
- var shardName = st.shard0.shardName;
-
- assert.commandWorked(st.s.adminCommand({addShardToZone: shardName, zone: 'x'}));
- var shardDoc = configDB.shards.findOne();
- assert.eq(['x'], shardDoc.tags);
-
- // Test removing shard from existing zone.
- assert.commandWorked(st.s.adminCommand({removeShardFromZone: shardName, zone: 'x'}));
- shardDoc = configDB.shards.findOne();
- assert.eq([], shardDoc.tags);
-
- // Test removing shard from zone that no longer exists.
- assert.commandWorked(st.s.adminCommand({removeShardFromZone: shardName, zone: 'x'}));
- shardDoc = configDB.shards.findOne();
- assert.eq([], shardDoc.tags);
-
- st.stop();
-})();
diff --git a/jstests/sharding/shard_collection_verify_initial_chunks.js b/jstests/sharding/shard_collection_verify_initial_chunks.js
index 0538bed1b34..e7072132b11 100644
--- a/jstests/sharding/shard_collection_verify_initial_chunks.js
+++ b/jstests/sharding/shard_collection_verify_initial_chunks.js
@@ -1,30 +1,57 @@
-//
-// Verify numInitialChunks can not be set for non hashed key or nonempty collections
-//
-
+/**
+ * Verify that initial chunks are properly created and distributed for various combinations of
+ * shard key and empty/non-empty collections.
+ */
(function() {
'use strict';
- var st = new ShardingTest({mongos: 1, shards: 2});
- var kDbName = 'db';
- var mongos = st.s0;
+ let st = new ShardingTest({mongos: 1, shards: 2});
+ let mongos = st.s0;
- assert.commandWorked(mongos.adminCommand({enableSharding: kDbName}));
+ let config = mongos.getDB("config");
+ let db = mongos.getDB('TestDB');
- assert.commandFailed(mongos.adminCommand(
- {shardCollection: kDbName + '.foo', key: {aKey: 1}, numInitialChunks: 5}));
+ assert.commandWorked(mongos.adminCommand({enableSharding: 'TestDB'}));
+ st.ensurePrimaryShard('TestDB', st.shard1.shardName);
- assert.writeOK(mongos.getDB(kDbName).foo.insert({aKey: 1}));
- assert.commandWorked(mongos.getDB(kDbName).foo.createIndex({aKey: "hashed"}));
- assert.commandFailed(mongos.adminCommand(
- {shardCollection: kDbName + '.foo', key: {aKey: "hashed"}, numInitialChunks: 5}));
+ function checkChunkCounts(collName, chunksOnShard0, chunksOnShard1) {
+ let counts = st.chunkCounts(collName, 'TestDB');
+ assert.eq(chunksOnShard0,
+ counts[st.shard0.shardName],
+ 'Count mismatch on shard0: ' + tojson(counts));
+ assert.eq(chunksOnShard1,
+ counts[st.shard1.shardName],
+ 'Count mismatch on shard1: ' + tojson(counts));
+ }
- assert.writeOK(mongos.getDB(kDbName).foo.remove({}));
+ // Unsupported: Range sharding + numInitialChunks
+ assert.commandFailed(mongos.adminCommand(
+ {shardCollection: 'TestDB.RangeCollEmpty', key: {aKey: 1}, numInitialChunks: 6}));
+
+ // Unsupported: Hashed sharding + numInitialChunks + non-empty collection
+ assert.writeOK(db.HashedCollNotEmpty.insert({aKey: 1}));
+ assert.commandWorked(db.HashedCollNotEmpty.createIndex({aKey: "hashed"}));
+ assert.commandFailed(mongos.adminCommand({
+ shardCollection: 'TestDB.HashedCollNotEmpty',
+ key: {aKey: "hashed"},
+ numInitialChunks: 6
+ }));
+
+ // Supported: Hashed sharding + numInitialChunks + empty collection
+ // Expected: Even chunk distribution
+ assert.commandWorked(db.HashedCollEmpty.createIndex({aKey: "hashed"}));
assert.commandWorked(mongos.adminCommand(
- {shardCollection: kDbName + '.foo', key: {aKey: "hashed"}, numInitialChunks: 5}));
-
- mongos.getDB(kDbName).dropDatabase();
+ {shardCollection: 'TestDB.HashedCollEmpty', key: {aKey: "hashed"}, numInitialChunks: 6}));
+ checkChunkCounts('HashedCollEmpty', 3, 3);
+
+ // Supported: Hashed sharding + numInitialChunks + non-existent collection
+ // Expected: Even chunk distribution
+ assert.commandWorked(mongos.adminCommand({
+ shardCollection: 'TestDB.HashedCollNonExistent',
+ key: {aKey: "hashed"},
+ numInitialChunks: 6
+ }));
+ checkChunkCounts('HashedCollNonExistent', 3, 3);
st.stop();
-
})();
diff --git a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp
index 81b01a612d3..ef3eda30ff1 100644
--- a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp
+++ b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp
@@ -397,22 +397,9 @@ void validateShardKeyAgainstExistingIndexes(OperationContext* opCtx,
*/
void migrateAndFurtherSplitInitialChunks(OperationContext* opCtx,
const NamespaceString& nss,
- int numShards,
const std::vector<ShardId>& shardIds,
- bool isEmpty,
- const ShardKeyPattern& shardKeyPattern,
const std::vector<BSONObj>& finalSplitPoints) {
- auto catalogCache = Grid::get(opCtx)->catalogCache();
-
- if (!shardKeyPattern.isHashedPattern()) {
- // Only initially move chunks when using a hashed shard key.
- return;
- }
-
- if (!isEmpty) {
- // If the collection is not empty, rely on the balancer to migrate the chunks.
- return;
- }
+ const auto catalogCache = Grid::get(opCtx)->catalogCache();
auto routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss));
uassert(ErrorCodes::ConflictingOperationInProgress,
@@ -423,13 +410,18 @@ void migrateAndFurtherSplitInitialChunks(OperationContext* opCtx,
auto chunkManager = routingInfo.cm();
// Move and commit each "big chunk" to a different shard.
- int i = 0;
+ auto nextShardId = [&, indx = 0 ]() mutable {
+ return shardIds[indx++ % shardIds.size()];
+ };
+
for (auto chunk : chunkManager->chunks()) {
- const ShardId& shardId = shardIds[i++ % numShards];
+ const auto shardId = nextShardId();
+
const auto toStatus = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId);
if (!toStatus.isOK()) {
continue;
}
+
const auto to = toStatus.getValue();
// Can't move chunk to shard it's already on
@@ -648,11 +640,9 @@ public:
std::vector<ShardId> shardIds;
shardRegistry->getAllShardIds(opCtx, &shardIds);
- const int numShards = shardIds.size();
-
uassert(ErrorCodes::IllegalOperation,
"cannot shard collections before there are shards",
- numShards > 0);
+ !shardIds.empty());
// Handle collections in the config db separately.
if (nss.db() == NamespaceString::kConfigDb) {
@@ -688,7 +678,8 @@ public:
ON_BLOCK_EXIT([&conn] { conn.done(); });
// Step 1.
- validateAndDeduceFullRequestOptions(opCtx, nss, shardKeyPattern, numShards, conn, &request);
+ validateAndDeduceFullRequestOptions(
+ opCtx, nss, shardKeyPattern, shardIds.size(), conn, &request);
// The collation option should have been set to the collection default collation after being
// validated.
@@ -755,7 +746,23 @@ public:
return true;
}
- bool isEmpty = (conn->count(nss.ns()) == 0);
+        // This check for an empty collection is racy, because it is not guaranteed that documents
+        // will not show up in the collection right after the count below has executed. It is
+        // left here for backwards compatibility with pre-4.0.4 clusters, which do not support
+        // sharding being performed by the primary shard.
+ const bool isEmpty = (conn->count(nss.ns()) == 0);
+
+        // Map/reduce with output to an empty collection assumes it has full control of the
+        // output collection, and it would be an unsupported operation if the collection were
+        // being concurrently written to.
+ const bool fromMapReduce = bool(request.getInitialSplitPoints());
+ if (fromMapReduce) {
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ str::stream() << "Map reduce with sharded output to a new collection found "
+ << nss.ns()
+ << " to be non-empty which is not supported.",
+ isEmpty);
+ }
// Step 3.
validateShardKeyAgainstExistingIndexes(
@@ -772,12 +779,12 @@ public:
std::vector<BSONObj> initialSplitPoints; // there will be at most numShards-1 of these
std::vector<BSONObj> finalSplitPoints; // all of the desired split points
if (request.getInitialSplitPoints()) {
- initialSplitPoints = std::move(*request.getInitialSplitPoints());
+ initialSplitPoints = *request.getInitialSplitPoints();
} else {
InitialSplitPolicy::calculateHashedSplitPointsForEmptyCollection(
shardKeyPattern,
isEmpty,
- numShards,
+ shardIds.size(),
request.getNumInitialChunks(),
&initialSplitPoints,
&finalSplitPoints);
@@ -786,15 +793,7 @@ public:
LOG(0) << "CMD: shardcollection: " << cmdObj;
audit::logShardCollection(
- Client::getCurrent(), nss.ns(), proposedKey, request.getUnique());
-
- // The initial chunks are distributed evenly across shards only if the initial split
- // points
- // were specified in the request, i.e., by mapReduce. Otherwise, all the initial chunks
- // are
- // placed on the primary shard, and may be distributed across shards through migrations
- // (below) if using a hashed shard key.
- const bool distributeInitialChunks = bool(request.getInitialSplitPoints());
+ opCtx->getClient(), nss.ns(), proposedKey, request.getUnique());
// Step 6. Actually shard the collection.
catalogManager->shardCollection(opCtx,
@@ -804,7 +803,7 @@ public:
*request.getCollation(),
request.getUnique(),
initialSplitPoints,
- distributeInitialChunks,
+ fromMapReduce,
primaryShardId);
result << "collectionsharded" << nss.ns();
if (uuid) {
@@ -818,9 +817,12 @@ public:
collDistLock.reset();
dbDistLock.reset();
- // Step 7. Migrate initial chunks to distribute them across shards.
- migrateAndFurtherSplitInitialChunks(
- opCtx, nss, numShards, shardIds, isEmpty, shardKeyPattern, finalSplitPoints);
+        // Step 7. If the collection is empty and uses a hashed shard key, migrate the initial
+        // chunks now so that they are spread evenly across shards from the beginning. Otherwise,
+        // rely on the balancer to distribute them.
+ if (isEmpty && shardKeyPattern.isHashedPattern()) {
+ migrateAndFurtherSplitInitialChunks(opCtx, nss, shardIds, finalSplitPoints);
+ }
return true;
}
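
A minimal standalone sketch of the round-robin assignment pattern behind the new nextShardId lambda above. This is plain C++14 with a hypothetical ShardId alias, not MongoDB's actual types; it only illustrates how the init-capture keeps the rotating index local to the closure:

    #include <cstddef>
    #include <iostream>
    #include <string>
    #include <vector>

    // Hypothetical stand-in for mongo::ShardId, which is a richer type in the real code.
    using ShardId = std::string;

    int main() {
        const std::vector<ShardId> shardIds{"shard0", "shard1", "shard2"};

        // Same idea as nextShardId in the hunk above: an index captured by the
        // closure is advanced on every call and wraps around the shard list.
        auto nextShardId = [&shardIds, indx = std::size_t{0}]() mutable {
            return shardIds[indx++ % shardIds.size()];
        };

        // Each "big chunk" would be handed to the next shard in turn.
        for (int chunk = 0; chunk < 7; ++chunk) {
            std::cout << "chunk " << chunk << " -> " << nextShardId() << "\n";
        }
        return 0;
    }
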
diff --git a/src/mongo/db/s/config/initial_split_policy.cpp b/src/mongo/db/s/config/initial_split_policy.cpp
index 3654a465313..81d0ecb22ed 100644
--- a/src/mongo/db/s/config/initial_split_policy.cpp
+++ b/src/mongo/db/s/config/initial_split_policy.cpp
@@ -278,34 +278,32 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::createFirstChunks(
const ShardId& primaryShardId,
const std::vector<BSONObj>& splitPoints,
const std::vector<TagsType>& tags,
- const bool distributeInitialChunks,
- const bool isEmpty,
- const int numContiguousChunksPerShard) {
- const auto& keyPattern = shardKeyPattern.getKeyPattern();
+ bool isEmpty,
+ int numContiguousChunksPerShard) {
+ uassert(ErrorCodes::InvalidOptions,
+ "Cannot generate initial chunks based on both split points and zones",
+ tags.empty() || splitPoints.empty());
- std::vector<BSONObj> finalSplitPoints;
+ const auto shardRegistry = Grid::get(opCtx)->shardRegistry();
- if (splitPoints.empty() && tags.empty()) {
- // If neither split points nor tags were specified use the shard's data distribution to
- // determine them
- auto primaryShard =
- uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, primaryShardId));
+ const auto& keyPattern = shardKeyPattern.getKeyPattern();
- // Refresh the balancer settings to ensure the chunk size setting, which is sent as part of
- // the splitVector command and affects the number of chunks returned, has been loaded.
- uassertStatusOK(Grid::get(opCtx)->getBalancerConfiguration()->refreshAndCheck(opCtx));
-
- if (!isEmpty) {
- finalSplitPoints = uassertStatusOK(shardutil::selectChunkSplitPoints(
- opCtx,
- primaryShardId,
- nss,
- shardKeyPattern,
- ChunkRange(keyPattern.globalMin(), keyPattern.globalMax()),
- Grid::get(opCtx)->getBalancerConfiguration()->getMaxChunkSizeBytes(),
- 0));
- }
+ const auto validAfter = LogicalClock::get(opCtx)->getClusterTime().asTimestamp();
+
+    // The shards on which the generated chunks are allowed to be placed
+ std::vector<ShardId> shardIds;
+ if (isEmpty) {
+ shardRegistry->getAllShardIdsNoReload(&shardIds);
} else {
+ shardIds.push_back(primaryShardId);
+ }
+
+ ShardCollectionConfig initialChunks;
+
+    // If split points were explicitly provided, use them to generate the initial chunks
+ if (!splitPoints.empty()) {
+ std::vector<BSONObj> finalSplitPoints;
+
// Make sure points are unique and ordered
auto orderedPts = SimpleBSONObjComparator::kInstance.makeBSONObjSet();
@@ -316,37 +314,63 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::createFirstChunks(
for (const auto& splitPoint : orderedPts) {
finalSplitPoints.push_back(splitPoint);
}
- }
-
- uassert(ErrorCodes::InvalidOptions,
- str::stream() << "cannot generate initial chunks based on both split points and tags",
- tags.empty() || finalSplitPoints.empty());
-
- const auto validAfter = LogicalClock::get(opCtx)->getClusterTime().asTimestamp();
-
- // If docs already exist for the collection, must use primary shard, otherwise defer to
- // passed-in distribution option.
- std::vector<ShardId> shardIds;
- if (isEmpty && distributeInitialChunks) {
- Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds);
- } else {
- shardIds.push_back(primaryShardId);
+ initialChunks = generateShardCollectionInitialChunks(nss,
+ shardKeyPattern,
+ primaryShardId,
+ validAfter,
+ finalSplitPoints,
+ shardIds,
+ numContiguousChunksPerShard);
}
+ // If zones are defined, use the zones
+ else if (!tags.empty()) {
+ if (isEmpty) {
+ initialChunks = generateShardCollectionInitialZonedChunks(
+ nss, shardKeyPattern, validAfter, tags, getTagToShardIds(opCtx, tags), shardIds);
+ } else {
+ // For a non-empty collection, create one chunk on the primary shard and leave it to the
+ // balancer to do the zone splitting and placement
+ ChunkVersion version(1, 0, OID::gen());
+ appendChunk(nss,
+ keyPattern.globalMin(),
+ keyPattern.globalMax(),
+ &version,
+ validAfter,
+ primaryShardId,
+ &initialChunks.chunks);
+ }
+ }
+ // If neither split points nor zones are available and the collection is not empty, ask the
+ // shard to select split points based on the data distribution
+ else if (!isEmpty) {
+ auto primaryShard =
+ uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, primaryShardId));
- ShardCollectionConfig initialChunks;
+ // Refresh the balancer settings to ensure the chunk size setting, which is sent as part of
+ // the splitVector command and affects the number of chunks returned, has been loaded.
+ const auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration();
+ uassertStatusOK(balancerConfig->refreshAndCheck(opCtx));
+
+ const auto shardSelectedSplitPoints = uassertStatusOK(shardutil::selectChunkSplitPoints(
+ opCtx,
+ primaryShardId,
+ nss,
+ shardKeyPattern,
+ ChunkRange(keyPattern.globalMin(), keyPattern.globalMax()),
+ balancerConfig->getMaxChunkSizeBytes(),
+ 0));
- if (tags.empty()) {
initialChunks = generateShardCollectionInitialChunks(nss,
shardKeyPattern,
primaryShardId,
validAfter,
- finalSplitPoints,
+ shardSelectedSplitPoints,
shardIds,
numContiguousChunksPerShard);
- } else if (!isEmpty) {
- // For a non-empty collection, create one chunk on the primary shard and leave it to the
- // balancer to do the zone split and rebalancing
+ }
+    // For an empty collection, just create a single chunk
+ else {
ChunkVersion version(1, 0, OID::gen());
appendChunk(nss,
keyPattern.globalMin(),
@@ -355,9 +379,6 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::createFirstChunks(
validAfter,
primaryShardId,
&initialChunks.chunks);
- } else {
- initialChunks = generateShardCollectionInitialZonedChunks(
- nss, shardKeyPattern, validAfter, tags, getTagToShardIds(opCtx, tags), shardIds);
}
LOG(0) << "Created " << initialChunks.chunks.size() << " chunk(s) for: " << nss
diff --git a/src/mongo/db/s/config/initial_split_policy.h b/src/mongo/db/s/config/initial_split_policy.h
index 6e43265b3ae..b7bf33c797a 100644
--- a/src/mongo/db/s/config/initial_split_policy.h
+++ b/src/mongo/db/s/config/initial_split_policy.h
@@ -120,8 +120,12 @@ public:
const std::vector<ShardId>& shardIdsForGaps);
/**
- * Creates the first chunks for a newly sharded collection.
- * Returns the created chunks.
+     * Generates the initial chunks and their placement for a newly sharded collection.
+     *
+     * If the collection 'isEmpty', the chunks will be spread across all shards that are
+     * appropriate based on the zoning rules. Otherwise, they will all be placed on the primary
+     * shard, and the balancer will take care of distributing them later.
*/
static ShardCollectionConfig createFirstChunks(OperationContext* opCtx,
const NamespaceString& nss,
@@ -129,9 +133,8 @@ public:
const ShardId& primaryShardId,
const std::vector<BSONObj>& splitPoints,
const std::vector<TagsType>& tags,
- const bool distributeInitialChunks,
- const bool isEmpty,
- const int numContiguousChunksPerShard = 1);
+ bool isEmpty,
+ int numContiguousChunksPerShard = 1);
/**
* Writes to the config server the first chunks for a newly sharded collection.
diff --git a/src/mongo/db/s/config/initial_split_policy_test.cpp b/src/mongo/db/s/config/initial_split_policy_test.cpp
index 7ff8e44fef2..daa8d0c29f3 100644
--- a/src/mongo/db/s/config/initial_split_policy_test.cpp
+++ b/src/mongo/db/s/config/initial_split_policy_test.cpp
@@ -118,7 +118,7 @@ TEST(CalculateHashedSplitPointsTest, EmptyCollectionChunksEqualToShards) {
checkCalculatedHashedSplitPoints(true, true, 3, 3, &expectedSplitPoints, &expectedSplitPoints);
}
-TEST(CalculateHashedSplitPointsTest, EmptyCollectionHashedWithInitialSplitsReturnsEmptySplits) {
+TEST(CalculateHashedSplitPointsTest, EmptyCollectionHashedWithNoInitialSplitsReturnsEmptySplits) {
const std::vector<BSONObj> expectedSplitPoints;
checkCalculatedHashedSplitPoints(true, true, 2, 1, &expectedSplitPoints, &expectedSplitPoints);
}
@@ -147,7 +147,7 @@ TEST(CalculateHashedSplitPointsTest, NotHashedWithInitialSplitsFails) {
ErrorCodes::InvalidOptions);
}
-class GenerateInitialSplitChunksTest : public unittest::Test {
+class GenerateInitialSplitChunksTestBase : public unittest::Test {
public:
/**
* Returns a vector of ChunkType objects for the given chunk ranges.
@@ -207,7 +207,7 @@ private:
const Timestamp _timeStamp{Date_t::now()};
};
-class GenerateInitialHashedSplitChunksTest : public GenerateInitialSplitChunksTest {
+class GenerateInitialHashedSplitChunksTest : public GenerateInitialSplitChunksTestBase {
public:
const std::vector<BSONObj>& hashedSplitPoints() {
return _splitPoints;
@@ -263,7 +263,7 @@ TEST_F(GenerateInitialHashedSplitChunksTest,
assertChunkVectorsAreEqual(expectedChunks, shardCollectionConfig.chunks);
}
-class GenerateShardCollectionInitialZonedChunksTest : public GenerateInitialSplitChunksTest {
+class GenerateShardCollectionInitialZonedChunksTest : public GenerateInitialSplitChunksTestBase {
public:
/**
* Calls generateShardCollectionInitialZonedChunks according to the given arguments
diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h
index 091a2cd206b..6e4bef00edd 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager.h
+++ b/src/mongo/db/s/config/sharding_catalog_manager.h
@@ -257,19 +257,18 @@ public:
/**
- * Shards a collection. Assumes that the database is enabled for sharding.
+     * Shards the collection with namespace 'nss' and implicitly assumes that the database is enabled
+ * for sharding (i.e., doesn't check whether enableSharding has been called previously).
*
- * @param ns: namespace of collection to shard
- * @param uuid: the collection's UUID. Optional because new in 3.6.
- * @param fieldsAndOrder: shardKey pattern
- * @param defaultCollation: the default collation for the collection, to be written to
- * config.collections. If empty, the collection default collation is simple binary
- * comparison. Note the the shard key collation will always be simple binary comparison,
- * even if the collection default collation is non-simple.
- * @param unique: if true, ensure underlying index enforces a unique constraint.
- * @param initPoints: create chunks based on a set of specified split points.
- * @param initShardIds: If non-empty, specifies the set of shards to assign chunks between.
- * Otherwise all chunks will be assigned to the primary shard for the database.
+ * uuid - the collection's UUID. Optional because new in 3.6.
+ * fieldsAndOrder - shard key pattern to use.
+ * defaultCollation - the default collation for the collection, excluding the shard key. If
+ * empty, defaults to simple binary comparison. Note that the shard key collation will always
+ * be simple binary comparison, even if the collection default collation is non-simple.
+ * unique - if true, ensure underlying index enforces a unique constraint.
+ * initPoints - create chunks based on a set of specified split points.
+ * isFromMapReduce - whether this request comes from map/reduce, in which case the generated
+ * chunks can be spread across shards. Otherwise they will stay on the primary shard.
*/
void shardCollection(OperationContext* opCtx,
const NamespaceString& nss,
@@ -278,7 +277,7 @@ public:
const BSONObj& defaultCollation,
bool unique,
const std::vector<BSONObj>& initPoints,
- const bool distributeInitialChunks,
+ bool isFromMapReduce,
const ShardId& dbPrimaryShardId);
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
index 71106b3a396..9dbbc01b517 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
@@ -366,7 +366,7 @@ void ShardingCatalogManager::shardCollection(OperationContext* opCtx,
const BSONObj& defaultCollation,
bool unique,
const vector<BSONObj>& splitPoints,
- const bool distributeInitialChunks,
+ bool isFromMapReduce,
const ShardId& dbPrimaryShardId) {
const auto catalogClient = Grid::get(opCtx)->catalogClient();
const auto shardRegistry = Grid::get(opCtx)->shardRegistry();
@@ -376,14 +376,23 @@ void ShardingCatalogManager::shardCollection(OperationContext* opCtx,
// Fail if there are partially written chunks from a previous failed shardCollection.
checkForExistingChunks(opCtx, nss);
+ // Prior to 4.0.5, zones cannot be taken into account at collection sharding time, so ignore
+ // them and let the balancer apply them later
+ const std::vector<TagsType> treatAsNoZonesDefined;
+
+    // Map/reduce with output to a sharded collection ignores consistency checks and requires the
+    // initial chunks to be spread across shards unconditionally.
+ const bool treatAsEmpty = isFromMapReduce;
+
// Record start in changelog
{
BSONObjBuilder collectionDetail;
collectionDetail.append("shardKey", fieldsAndOrder.toBSON());
collectionDetail.append("collection", nss.ns());
- if (uuid) {
+ if (uuid)
uuid->appendToBuilder(&collectionDetail, "uuid");
- }
+ collectionDetail.append("empty", treatAsEmpty);
+ collectionDetail.append("fromMapReduce", isFromMapReduce);
collectionDetail.append("primary", primaryShard->toString());
collectionDetail.append("numChunks", static_cast<int>(splitPoints.size() + 1));
uassertStatusOK(catalogClient->logChange(opCtx,
@@ -400,19 +409,13 @@ void ShardingCatalogManager::shardCollection(OperationContext* opCtx,
->makeFromBSON(defaultCollation));
}
- std::vector<TagsType> tags;
- // Since this code runs on the config server, we cannot guarantee that the collection is still
- // empty by the time the metadata is written so always assume we are sharding a non-empty
- // collection.
- bool isEmpty = false;
const auto initialChunks = InitialSplitPolicy::createFirstChunks(opCtx,
nss,
fieldsAndOrder,
dbPrimaryShardId,
splitPoints,
- tags,
- distributeInitialChunks,
- isEmpty);
+ treatAsNoZonesDefined,
+ treatAsEmpty);
InitialSplitPolicy::writeFirstChunksToConfig(opCtx, initialChunks);
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp
index e8fda03ca89..c59ba36187f 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp
@@ -39,25 +39,15 @@
#include "mongo/client/read_preference.h"
#include "mongo/client/remote_command_targeter_factory_mock.h"
#include "mongo/client/remote_command_targeter_mock.h"
-#include "mongo/db/client.h"
-#include "mongo/db/commands.h"
#include "mongo/db/s/config/initial_split_policy.h"
#include "mongo/db/s/config/sharding_catalog_manager.h"
-#include "mongo/executor/network_interface_mock.h"
-#include "mongo/executor/task_executor.h"
#include "mongo/rpc/metadata/tracking_metadata.h"
-#include "mongo/s/catalog/type_changelog.h"
#include "mongo/s/catalog/type_chunk.h"
-#include "mongo/s/catalog/type_collection.h"
-#include "mongo/s/catalog/type_database.h"
-#include "mongo/s/catalog/type_locks.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/config_server_test_fixture.h"
#include "mongo/s/grid.h"
#include "mongo/s/shard_key_pattern.h"
-#include "mongo/stdx/future.h"
-#include "mongo/transport/mock_session.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
#include "mongo/util/time_support.h"
@@ -74,12 +64,8 @@ using std::string;
using std::vector;
using unittest::assertGet;
-const ShardId testPrimaryShard = ShardId("shard0");
-
-const NamespaceString kNamespace("db1.foo");
-
-class ShardCollectionTest : public ConfigServerTestFixture {
-public:
+class ShardCollectionTestBase : public ConfigServerTestFixture {
+protected:
void expectSplitVector(const HostAndPort& shardHost,
const ShardKeyPattern& keyPattern,
const BSONObj& splitPoints) {
@@ -106,13 +92,50 @@ public:
});
}
+ const ShardId testPrimaryShard{"shard0"};
+ const NamespaceString kNamespace{"db1.foo"};
+
private:
const HostAndPort configHost{"configHost1"};
const ConnectionString configCS{ConnectionString::forReplicaSet("configReplSet", {configHost})};
const HostAndPort clientHost{"clientHost1"};
};
-TEST_F(ShardCollectionTest, anotherMongosSharding) {
+
+// Tests which exercise the ShardingCatalogManager::shardCollection logic, which is what the config
+// server uses to shard collections, when the '_shardsvrShardCollection' command is not available
+// (fast initial split optimization)
+class ConfigServerShardCollectionTest : public ShardCollectionTestBase {
+protected:
+ void checkWrittenChunks(const std::vector<ChunkType>& expectedChunks) {
+ const auto grid = Grid::get(operationContext());
+ const auto catalogClient = grid->catalogClient();
+ repl::OpTime unusedOpTime;
+ const auto writtenChunks =
+ assertGet(catalogClient->getChunks(operationContext(),
+ BSON("ns" << kNamespace.ns()),
+ BSON("min" << 1),
+ boost::none,
+ &unusedOpTime,
+ repl::ReadConcernLevel::kLocalReadConcern));
+ ASSERT_EQ(expectedChunks.size(), writtenChunks.size());
+
+ auto itE = expectedChunks.begin();
+ auto itW = writtenChunks.begin();
+ for (; itE != expectedChunks.end(); itE++, itW++) {
+ const auto& expected = *itE;
+ const auto& written = *itW;
+            ASSERT_BSONOBJ_EQ(expected.getMin(), written.getMin());
+            ASSERT_BSONOBJ_EQ(expected.getMax(), written.getMax());
+ ASSERT_EQ(expected.getShard(), written.getShard());
+ }
+ }
+
+ const ShardKeyPattern keyPattern{BSON("_id" << 1)};
+ const BSONObj defaultCollation;
+};
+
+TEST_F(ConfigServerShardCollectionTest, Partially_Written_Chunks_Present) {
ShardType shard;
shard.setName("shard0");
shard.setHost("shardHost");
@@ -130,25 +153,21 @@ TEST_F(ShardCollectionTest, anotherMongosSharding) {
chunk.setMax(BSON("_id" << 5));
ASSERT_OK(setupChunks({chunk}));
- ShardKeyPattern shardKeyPattern(BSON("_id" << 1));
- BSONObj defaultCollation;
-
ASSERT_THROWS_CODE(ShardingCatalogManager::get(operationContext())
->shardCollection(operationContext(),
kNamespace,
boost::none, // UUID
- shardKeyPattern,
+ keyPattern,
defaultCollation,
false,
- vector<BSONObj>{},
- false,
+ {},
+ false, // isFromMapReduce
testPrimaryShard),
AssertionException,
ErrorCodes::ManualInterventionRequired);
}
-TEST_F(ShardCollectionTest, noInitialChunksOrData) {
- // Initial setup
+TEST_F(ConfigServerShardCollectionTest, RangeSharding_ForMapReduce_NoInitialSplitPoints) {
const HostAndPort shardHost{"shardHost"};
ShardType shard;
shard.setName("shard0");
@@ -164,9 +183,6 @@ TEST_F(ShardCollectionTest, noInitialChunksOrData) {
setupDatabase(kNamespace.db().toString(), shard.getName(), true);
- ShardKeyPattern shardKeyPattern(BSON("_id" << 1));
- BSONObj defaultCollation;
-
// Now start actually sharding the collection.
auto future = launchAsync([&] {
ON_BLOCK_EXIT([&] { Client::destroy(); });
@@ -176,17 +192,14 @@ TEST_F(ShardCollectionTest, noInitialChunksOrData) {
->shardCollection(opCtx.get(),
kNamespace,
boost::none, // UUID
- shardKeyPattern,
+ keyPattern,
defaultCollation,
false,
- vector<BSONObj>{},
- false,
+ {}, // No split points
+ true, // isFromMapReduce
testPrimaryShard);
});
- // Respond to the splitVector command sent to the shard to figure out initial split points.
- expectSplitVector(shardHost, shardKeyPattern, BSONObj());
-
// Expect the set shard version for that namespace.
// We do not check for a specific ChunkVersion, because we cannot easily know the OID that was
// generated by shardCollection for the first chunk.
@@ -194,10 +207,15 @@ TEST_F(ShardCollectionTest, noInitialChunksOrData) {
expectSetShardVersion(shardHost, shard, kNamespace, boost::none /* expected ChunkVersion */);
future.timed_get(kFutureTimeout);
+
+ checkWrittenChunks(
+ {ChunkType(kNamespace,
+ {keyPattern.getKeyPattern().globalMin(), keyPattern.getKeyPattern().globalMax()},
+ ChunkVersion::IGNORED(),
+ testPrimaryShard)});
}
-TEST_F(ShardCollectionTest, withInitialChunks) {
- // Initial setup
+TEST_F(ConfigServerShardCollectionTest, RangeSharding_ForMapReduce_WithInitialSplitPoints) {
const HostAndPort shard0Host{"shardHost0"};
const HostAndPort shard1Host{"shardHost1"};
const HostAndPort shard2Host{"shardHost2"};
@@ -234,64 +252,13 @@ TEST_F(ShardCollectionTest, withInitialChunks) {
setupDatabase(kNamespace.db().toString(), shard0.getName(), true);
- ShardKeyPattern keyPattern(BSON("_id" << 1));
-
BSONObj splitPoint0 = BSON("_id" << 1);
BSONObj splitPoint1 = BSON("_id" << 100);
BSONObj splitPoint2 = BSON("_id" << 200);
BSONObj splitPoint3 = BSON("_id" << 300);
- ChunkVersion expectedVersion(1, 0, OID::gen());
-
- ChunkType expectedChunk0;
- expectedChunk0.setNS(kNamespace);
- expectedChunk0.setShard(shard0.getName());
- expectedChunk0.setMin(keyPattern.getKeyPattern().globalMin());
- expectedChunk0.setMax(splitPoint0);
- expectedChunk0.setVersion(expectedVersion);
- expectedVersion.incMinor();
-
- ChunkType expectedChunk1;
- expectedChunk1.setNS(kNamespace);
- expectedChunk1.setShard(shard1.getName());
- expectedChunk1.setMin(splitPoint0);
- expectedChunk1.setMax(splitPoint1);
- expectedChunk1.setVersion(expectedVersion);
- expectedVersion.incMinor();
-
- ChunkType expectedChunk2;
- expectedChunk2.setNS(kNamespace);
- expectedChunk2.setShard(shard2.getName());
- expectedChunk2.setMin(splitPoint1);
- expectedChunk2.setMax(splitPoint2);
- expectedChunk2.setVersion(expectedVersion);
- expectedVersion.incMinor();
-
- ChunkType expectedChunk3;
- expectedChunk3.setNS(kNamespace);
- expectedChunk3.setShard(shard0.getName());
- expectedChunk3.setMin(splitPoint2);
- expectedChunk3.setMax(splitPoint3);
- expectedChunk3.setVersion(expectedVersion);
- expectedVersion.incMinor();
-
- ChunkType expectedChunk4;
- expectedChunk4.setNS(kNamespace);
- expectedChunk4.setShard(shard1.getName());
- expectedChunk4.setMin(splitPoint3);
- expectedChunk4.setMax(keyPattern.getKeyPattern().globalMax());
- expectedChunk4.setVersion(expectedVersion);
-
- vector<ChunkType> expectedChunks{
- expectedChunk0, expectedChunk1, expectedChunk2, expectedChunk3, expectedChunk4};
-
- BSONObj defaultCollation;
-
// Now start actually sharding the collection.
auto future = launchAsync([&] {
- // TODO: can we mock the ShardRegistry to return these?
- set<ShardId> shards{shard0.getName(), shard1.getName(), shard2.getName()};
-
ON_BLOCK_EXIT([&] { Client::destroy(); });
Client::initThreadIfNotAlready("Test");
auto opCtx = cc().makeOperationContext();
@@ -302,8 +269,8 @@ TEST_F(ShardCollectionTest, withInitialChunks) {
keyPattern,
defaultCollation,
true,
- vector<BSONObj>{splitPoint0, splitPoint1, splitPoint2, splitPoint3},
- true,
+ {splitPoint0, splitPoint1, splitPoint2, splitPoint3},
+ true, // isFromMapReduce
testPrimaryShard);
});
@@ -314,10 +281,30 @@ TEST_F(ShardCollectionTest, withInitialChunks) {
expectSetShardVersion(shard0Host, shard0, kNamespace, boost::none /* expected ChunkVersion */);
future.timed_get(kFutureTimeout);
+
+ checkWrittenChunks({ChunkType(kNamespace,
+ ChunkRange{keyPattern.getKeyPattern().globalMin(), splitPoint0},
+ ChunkVersion::IGNORED(),
+ shard0.getName()),
+ ChunkType(kNamespace,
+ ChunkRange{splitPoint0, splitPoint1},
+ ChunkVersion::IGNORED(),
+ shard1.getName()),
+ ChunkType(kNamespace,
+ ChunkRange{splitPoint1, splitPoint2},
+ ChunkVersion::IGNORED(),
+ shard2.getName()),
+ ChunkType(kNamespace,
+ ChunkRange{splitPoint2, splitPoint3},
+ ChunkVersion::IGNORED(),
+ shard0.getName()),
+ ChunkType(kNamespace,
+ ChunkRange{splitPoint3, keyPattern.getKeyPattern().globalMax()},
+ ChunkVersion::IGNORED(),
+ shard1.getName())});
}
-TEST_F(ShardCollectionTest, withInitialData) {
- // Initial setup
+TEST_F(ConfigServerShardCollectionTest, RangeSharding_NoInitialSplitPoints_NoSplitVectorPoints) {
const HostAndPort shardHost{"shardHost"};
ShardType shard;
shard.setName("shard0");
@@ -333,14 +320,56 @@ TEST_F(ShardCollectionTest, withInitialData) {
setupDatabase(kNamespace.db().toString(), shard.getName(), true);
- ShardKeyPattern keyPattern(BSON("_id" << 1));
+ // Now start actually sharding the collection.
+ auto future = launchAsync([&] {
+ ON_BLOCK_EXIT([&] { Client::destroy(); });
+ Client::initThreadIfNotAlready("Test");
+ auto opCtx = cc().makeOperationContext();
+ ShardingCatalogManager::get(operationContext())
+ ->shardCollection(opCtx.get(),
+ kNamespace,
+ boost::none, // UUID
+ keyPattern,
+ defaultCollation,
+ false,
+ {}, // No split points
+ false, // isFromMapReduce
+ testPrimaryShard);
+ });
- BSONObj splitPoint0 = BSON("_id" << 1);
- BSONObj splitPoint1 = BSON("_id" << 100);
- BSONObj splitPoint2 = BSON("_id" << 200);
- BSONObj splitPoint3 = BSON("_id" << 300);
+ // Respond to the splitVector command sent to the shard to figure out initial split points.
+ expectSplitVector(shardHost, keyPattern, BSONObj());
+
+ // Expect the set shard version for that namespace.
+ // We do not check for a specific ChunkVersion, because we cannot easily know the OID that was
+ // generated by shardCollection for the first chunk.
+ // TODO SERVER-29451: add hooks to the mock storage engine to expect reads and writes.
+ expectSetShardVersion(shardHost, shard, kNamespace, boost::none /* expected ChunkVersion */);
+
+ future.timed_get(kFutureTimeout);
+
+ checkWrittenChunks(
+ {ChunkType(kNamespace,
+ {keyPattern.getKeyPattern().globalMin(), keyPattern.getKeyPattern().globalMax()},
+ ChunkVersion::IGNORED(),
+ testPrimaryShard)});
+}
+
+TEST_F(ConfigServerShardCollectionTest, RangeSharding_NoInitialSplitPoints_WithSplitVectorPoints) {
+ const HostAndPort shardHost{"shardHost"};
+ ShardType shard;
+ shard.setName("shard0");
+ shard.setHost(shardHost.toString());
+
+ std::unique_ptr<RemoteCommandTargeterMock> targeter(
+ stdx::make_unique<RemoteCommandTargeterMock>());
+ targeter->setConnectionStringReturnValue(ConnectionString(shardHost));
+ targeter->setFindHostReturnValue(shardHost);
+ targeterFactory()->addTargeterToReturn(ConnectionString(shardHost), std::move(targeter));
- BSONObj defaultCollation;
+ ASSERT_OK(setupShards(vector<ShardType>{shard}));
+
+ setupDatabase(kNamespace.db().toString(), shard.getName(), true);
// Now start actually sharding the collection.
auto future = launchAsync([&] {
@@ -354,11 +383,16 @@ TEST_F(ShardCollectionTest, withInitialData) {
keyPattern,
defaultCollation,
false,
- vector<BSONObj>{},
- false,
+ {}, // No split points
+ false, // isFromMapReduce
testPrimaryShard);
});
+ BSONObj splitPoint0 = BSON("_id" << 1);
+ BSONObj splitPoint1 = BSON("_id" << 100);
+ BSONObj splitPoint2 = BSON("_id" << 200);
+ BSONObj splitPoint3 = BSON("_id" << 300);
+
// Respond to the splitVector command sent to the shard to figure out initial split points.
expectSplitVector(shardHost,
keyPattern,
@@ -371,12 +405,136 @@ TEST_F(ShardCollectionTest, withInitialData) {
expectSetShardVersion(shardHost, shard, kNamespace, boost::none);
future.timed_get(kFutureTimeout);
+
+ checkWrittenChunks({ChunkType(kNamespace,
+ ChunkRange{keyPattern.getKeyPattern().globalMin(), splitPoint0},
+ ChunkVersion::IGNORED(),
+ testPrimaryShard),
+ ChunkType(kNamespace,
+ ChunkRange{splitPoint0, splitPoint1},
+ ChunkVersion::IGNORED(),
+ testPrimaryShard),
+ ChunkType(kNamespace,
+ ChunkRange{splitPoint1, splitPoint2},
+ ChunkVersion::IGNORED(),
+ testPrimaryShard),
+ ChunkType(kNamespace,
+ ChunkRange{splitPoint2, splitPoint3},
+ ChunkVersion::IGNORED(),
+ testPrimaryShard),
+ ChunkType(kNamespace,
+ ChunkRange{splitPoint3, keyPattern.getKeyPattern().globalMax()},
+ ChunkVersion::IGNORED(),
+ testPrimaryShard)});
}
-using CreateFirstChunksTest = ShardCollectionTest;
+TEST_F(ConfigServerShardCollectionTest, RangeSharding_WithInitialSplitPoints_NoSplitVectorPoints) {
+ const HostAndPort shardHost{"shardHost"};
+ ShardType shard;
+ shard.setName("shard0");
+ shard.setHost(shardHost.toString());
+
+ std::unique_ptr<RemoteCommandTargeterMock> targeter(
+ stdx::make_unique<RemoteCommandTargeterMock>());
+ targeter->setConnectionStringReturnValue(ConnectionString(shardHost));
+ targeter->setFindHostReturnValue(shardHost);
+ targeterFactory()->addTargeterToReturn(ConnectionString(shardHost), std::move(targeter));
+
+ ASSERT_OK(setupShards(vector<ShardType>{shard}));
-TEST_F(CreateFirstChunksTest, DistributeInitialChunksWithoutTagsIgnoredForNonEmptyCollection) {
+ setupDatabase(kNamespace.db().toString(), shard.getName(), true);
+
+ BSONObj splitPoint0 = BSON("_id" << 1);
+ BSONObj splitPoint1 = BSON("_id" << 100);
+ BSONObj splitPoint2 = BSON("_id" << 200);
+ BSONObj splitPoint3 = BSON("_id" << 300);
+
+ // Now start actually sharding the collection.
+ auto future = launchAsync([&] {
+ ON_BLOCK_EXIT([&] { Client::destroy(); });
+ Client::initThreadIfNotAlready("Test");
+ auto opCtx = cc().makeOperationContext();
+ ShardingCatalogManager::get(operationContext())
+ ->shardCollection(opCtx.get(),
+ kNamespace,
+ boost::none, // UUID
+ keyPattern,
+ defaultCollation,
+ false,
+ {splitPoint0, splitPoint1, splitPoint2, splitPoint3},
+ false, // isFromMapReduce
+ testPrimaryShard);
+ });
+
+ // Expect the set shard version for that namespace
+ // We do not check for a specific ChunkVersion, because we cannot easily know the OID that was
+ // generated by shardCollection for the first chunk.
+ // TODO SERVER-29451: add hooks to the mock storage engine to expect reads and writes.
+ expectSetShardVersion(shardHost, shard, kNamespace, boost::none);
+
+ future.timed_get(kFutureTimeout);
+
+ checkWrittenChunks({ChunkType(kNamespace,
+ ChunkRange{keyPattern.getKeyPattern().globalMin(), splitPoint0},
+ ChunkVersion::IGNORED(),
+ testPrimaryShard),
+ ChunkType(kNamespace,
+ ChunkRange{splitPoint0, splitPoint1},
+ ChunkVersion::IGNORED(),
+ testPrimaryShard),
+ ChunkType(kNamespace,
+ ChunkRange{splitPoint1, splitPoint2},
+ ChunkVersion::IGNORED(),
+ testPrimaryShard),
+ ChunkType(kNamespace,
+ ChunkRange{splitPoint2, splitPoint3},
+ ChunkVersion::IGNORED(),
+ testPrimaryShard),
+ ChunkType(kNamespace,
+ ChunkRange{splitPoint3, keyPattern.getKeyPattern().globalMax()},
+ ChunkVersion::IGNORED(),
+ testPrimaryShard)});
+}
+
+
+// Direct tests for InitialSplitPolicy::createFirstChunks, which underlies the shard collection
+// logic of both the config server and the shard server
+class CreateFirstChunksTest : public ShardCollectionTestBase {
+protected:
const ShardKeyPattern kShardKeyPattern{BSON("x" << 1)};
+};
+
+TEST_F(CreateFirstChunksTest, Split_Disallowed_With_Both_SplitPoints_And_Zones) {
+ ASSERT_THROWS_CODE(
+ InitialSplitPolicy::createFirstChunks(
+ operationContext(),
+ kNamespace,
+ kShardKeyPattern,
+ ShardId("shard1"),
+ {BSON("x" << 0)},
+ {TagsType(kNamespace,
+ "TestZone",
+ ChunkRange(kShardKeyPattern.getKeyPattern().globalMin(), BSON("x" << 0)))},
+ true /* isEmpty */),
+ AssertionException,
+ ErrorCodes::InvalidOptions);
+
+ ASSERT_THROWS_CODE(
+ InitialSplitPolicy::createFirstChunks(
+ operationContext(),
+ kNamespace,
+ kShardKeyPattern,
+ ShardId("shard1"),
+ {BSON("x" << 0)}, /* No split points */
+ {TagsType(kNamespace,
+ "TestZone",
+ ChunkRange(kShardKeyPattern.getKeyPattern().globalMin(), BSON("x" << 0)))},
+ false /* isEmpty */),
+ AssertionException,
+ ErrorCodes::InvalidOptions);
+}
+
+TEST_F(CreateFirstChunksTest, NonEmptyCollection_SplitPoints_FromSplitVector_ManyChunksToPrimary) {
const std::vector<ShardType> kShards{ShardType("shard0", "rs0/shard0:123"),
ShardType("shard1", "rs1/shard1:123"),
ShardType("shard2", "rs2/shard2:123")};
@@ -400,9 +558,8 @@ TEST_F(CreateFirstChunksTest, DistributeInitialChunksWithoutTagsIgnoredForNonEmp
kNamespace,
kShardKeyPattern,
ShardId("shard1"),
- {},
- {},
- true,
+ {}, /* No split points */
+ {}, /* No zones */
false /* isEmpty */);
});
@@ -414,8 +571,42 @@ TEST_F(CreateFirstChunksTest, DistributeInitialChunksWithoutTagsIgnoredForNonEmp
ASSERT_EQ(kShards[1].getName(), firstChunks.chunks[1].getShard());
}
-TEST_F(CreateFirstChunksTest, DistributeInitialChunksWithTagsIgnoredForNonEmptyCollection) {
- const ShardKeyPattern kShardKeyPattern{BSON("x" << 1)};
+TEST_F(CreateFirstChunksTest, NonEmptyCollection_SplitPoints_FromClient_ManyChunksToPrimary) {
+ const std::vector<ShardType> kShards{ShardType("shard0", "rs0/shard0:123"),
+ ShardType("shard1", "rs1/shard1:123"),
+ ShardType("shard2", "rs2/shard2:123")};
+
+ const auto connStr = assertGet(ConnectionString::parse(kShards[1].getHost()));
+
+ std::unique_ptr<RemoteCommandTargeterMock> targeter(
+ stdx::make_unique<RemoteCommandTargeterMock>());
+ targeter->setConnectionStringReturnValue(connStr);
+ targeter->setFindHostReturnValue(connStr.getServers()[0]);
+ targeterFactory()->addTargeterToReturn(connStr, std::move(targeter));
+
+ ASSERT_OK(setupShards(kShards));
+ shardRegistry()->reload(operationContext());
+
+ auto future = launchAsync([&] {
+ ON_BLOCK_EXIT([&] { Client::destroy(); });
+ Client::initThreadIfNotAlready("Test");
+ auto opCtx = cc().makeOperationContext();
+ return InitialSplitPolicy::createFirstChunks(opCtx.get(),
+ kNamespace,
+ kShardKeyPattern,
+ ShardId("shard1"),
+ {BSON("x" << 0)},
+ {}, /* No zones */
+ false /* isEmpty */);
+ });
+
+ const auto& firstChunks = future.timed_get(kFutureTimeout);
+ ASSERT_EQ(2U, firstChunks.chunks.size());
+ ASSERT_EQ(kShards[1].getName(), firstChunks.chunks[0].getShard());
+ ASSERT_EQ(kShards[1].getName(), firstChunks.chunks[1].getShard());
+}
+
+TEST_F(CreateFirstChunksTest, NonEmptyCollection_WithZones_OneChunkToPrimary) {
const std::vector<ShardType> kShards{ShardType("shard0", "rs0/shard0:123", {"TestZone"}),
ShardType("shard1", "rs1/shard1:123", {"TestZone"}),
ShardType("shard2", "rs2/shard2:123")};
@@ -427,14 +618,108 @@ TEST_F(CreateFirstChunksTest, DistributeInitialChunksWithTagsIgnoredForNonEmptyC
kNamespace,
kShardKeyPattern,
ShardId("shard1"),
- {},
- {TagsType(kNamespace, "TestZone", ChunkRange(BSON("x" << MinKey), BSON("x" << 0)))},
- true,
+ {}, /* No split points */
+ {TagsType(kNamespace,
+ "TestZone",
+ ChunkRange(kShardKeyPattern.getKeyPattern().globalMin(), BSON("x" << 0)))},
false /* isEmpty */);
ASSERT_EQ(1U, firstChunks.chunks.size());
ASSERT_EQ(kShards[1].getName(), firstChunks.chunks[0].getShard());
}
+TEST_F(CreateFirstChunksTest, EmptyCollection_SplitPoints_FromClient_ManyChunksDistributed) {
+ const std::vector<ShardType> kShards{ShardType("shard0", "rs0/shard0:123"),
+ ShardType("shard1", "rs1/shard1:123"),
+ ShardType("shard2", "rs2/shard2:123")};
+
+ const auto connStr = assertGet(ConnectionString::parse(kShards[1].getHost()));
+
+ std::unique_ptr<RemoteCommandTargeterMock> targeter(
+ stdx::make_unique<RemoteCommandTargeterMock>());
+ targeter->setConnectionStringReturnValue(connStr);
+ targeter->setFindHostReturnValue(connStr.getServers()[0]);
+ targeterFactory()->addTargeterToReturn(connStr, std::move(targeter));
+
+ ASSERT_OK(setupShards(kShards));
+ shardRegistry()->reload(operationContext());
+
+ auto future = launchAsync([&] {
+ ON_BLOCK_EXIT([&] { Client::destroy(); });
+ Client::initThreadIfNotAlready("Test");
+ auto opCtx = cc().makeOperationContext();
+ return InitialSplitPolicy::createFirstChunks(opCtx.get(),
+ kNamespace,
+ kShardKeyPattern,
+ ShardId("shard1"),
+ {BSON("x" << 0), BSON("x" << 100)},
+ {}, /* No zones */
+ true /* isEmpty */);
+ });
+
+ const auto& firstChunks = future.timed_get(kFutureTimeout);
+ ASSERT_EQ(3U, firstChunks.chunks.size());
+ ASSERT_EQ(kShards[0].getName(), firstChunks.chunks[0].getShard());
+ ASSERT_EQ(kShards[1].getName(), firstChunks.chunks[1].getShard());
+ ASSERT_EQ(kShards[2].getName(), firstChunks.chunks[2].getShard());
+}
+
+TEST_F(CreateFirstChunksTest, EmptyCollection_NoSplitPoints_OneChunkToPrimary) {
+ const std::vector<ShardType> kShards{ShardType("shard0", "rs0/shard0:123"),
+ ShardType("shard1", "rs1/shard1:123"),
+ ShardType("shard2", "rs2/shard2:123")};
+
+ const auto connStr = assertGet(ConnectionString::parse(kShards[1].getHost()));
+
+ std::unique_ptr<RemoteCommandTargeterMock> targeter(
+ stdx::make_unique<RemoteCommandTargeterMock>());
+ targeter->setConnectionStringReturnValue(connStr);
+ targeter->setFindHostReturnValue(connStr.getServers()[0]);
+ targeterFactory()->addTargeterToReturn(connStr, std::move(targeter));
+
+ ASSERT_OK(setupShards(kShards));
+ shardRegistry()->reload(operationContext());
+
+ auto future = launchAsync([&] {
+ ON_BLOCK_EXIT([&] { Client::destroy(); });
+ Client::initThreadIfNotAlready("Test");
+ auto opCtx = cc().makeOperationContext();
+ return InitialSplitPolicy::createFirstChunks(opCtx.get(),
+ kNamespace,
+ kShardKeyPattern,
+ ShardId("shard1"),
+ {}, /* No split points */
+ {}, /* No zones */
+ true /* isEmpty */);
+ });
+
+ const auto& firstChunks = future.timed_get(kFutureTimeout);
+ ASSERT_EQ(1U, firstChunks.chunks.size());
+ ASSERT_EQ(kShards[1].getName(), firstChunks.chunks[0].getShard());
+}
+
+TEST_F(CreateFirstChunksTest, EmptyCollection_WithZones_ManyChunksOnFirstZoneShard) {
+ const std::vector<ShardType> kShards{ShardType("shard0", "rs0/shard0:123", {"TestZone"}),
+ ShardType("shard1", "rs1/shard1:123", {"TestZone"}),
+ ShardType("shard2", "rs2/shard2:123")};
+ ASSERT_OK(setupShards(kShards));
+ shardRegistry()->reload(operationContext());
+
+ const auto firstChunks = InitialSplitPolicy::createFirstChunks(
+ operationContext(),
+ kNamespace,
+ kShardKeyPattern,
+ ShardId("shard1"),
+ {}, /* No split points */
+ {TagsType(kNamespace,
+ "TestZone",
+ ChunkRange(kShardKeyPattern.getKeyPattern().globalMin(), BSON("x" << 0)))},
+ true /* isEmpty */);
+
+ ASSERT_EQ(2U, firstChunks.chunks.size());
+ ASSERT_EQ(kShards[0].getName(), firstChunks.chunks[0].getShard());
+ ASSERT_EQ(kShards[0].getName(), firstChunks.chunks[1].getShard());
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/s/shardsvr_shard_collection.cpp b/src/mongo/db/s/shardsvr_shard_collection.cpp
index ab296364b39..6f9d335b67f 100644
--- a/src/mongo/db/s/shardsvr_shard_collection.cpp
+++ b/src/mongo/db/s/shardsvr_shard_collection.cpp
@@ -421,7 +421,7 @@ void shardCollection(OperationContext* opCtx,
bool unique,
const std::vector<BSONObj>& splitPoints,
const std::vector<TagsType>& tags,
- const bool fromMapReduce,
+ bool fromMapReduce,
const ShardId& dbPrimaryShardId,
const int numContiguousChunksPerShard,
const bool isEmpty) {
@@ -429,8 +429,6 @@ void shardCollection(OperationContext* opCtx,
const auto shardRegistry = Grid::get(opCtx)->shardRegistry();
const auto primaryShard = uassertStatusOK(shardRegistry->getShard(opCtx, dbPrimaryShardId));
- const bool distributeChunks =
- fromMapReduce || fieldsAndOrder.isHashedPattern() || !tags.empty();
// Fail if there are partially written chunks from a previous failed shardCollection.
checkForExistingChunks(opCtx, nss);
@@ -440,9 +438,10 @@ void shardCollection(OperationContext* opCtx,
BSONObjBuilder collectionDetail;
collectionDetail.append("shardKey", fieldsAndOrder.toBSON());
collectionDetail.append("collection", nss.ns());
- if (uuid) {
+ if (uuid)
uuid->appendToBuilder(&collectionDetail, "uuid");
- }
+ collectionDetail.append("empty", isEmpty);
+ collectionDetail.append("fromMapReduce", fromMapReduce);
collectionDetail.append("primary", primaryShard->toString());
collectionDetail.append("numChunks", static_cast<int>(splitPoints.size() + 1));
uassertStatusOK(catalogClient->logChange(opCtx,
@@ -465,7 +464,6 @@ void shardCollection(OperationContext* opCtx,
dbPrimaryShardId,
splitPoints,
tags,
- distributeChunks,
isEmpty,
numContiguousChunksPerShard);
@@ -680,7 +678,7 @@ public:
std::vector<BSONObj> finalSplitPoints;
if (request.getInitialSplitPoints()) {
- finalSplitPoints = std::move(*request.getInitialSplitPoints());
+ finalSplitPoints = *request.getInitialSplitPoints();
} else if (tags.empty()) {
InitialSplitPolicy::calculateHashedSplitPointsForEmptyCollection(
shardKeyPattern,
@@ -701,12 +699,21 @@ public:
LOG(0) << "CMD: shardcollection: " << cmdObj;
audit::logShardCollection(
- Client::getCurrent(), nss.ns(), proposedKey, request.getUnique());
+ opCtx->getClient(), nss.ns(), proposedKey, request.getUnique());
- // The initial chunks are distributed evenly across shards if the initial split
- // points were specified in the request by mapReduce or if we are using a hashed
- // shard key. Otherwise, all the initial chunks are placed on the primary shard.
+            // Map/reduce with output to an empty collection assumes it has full control of the
+            // output collection, and it would be an unsupported operation if the collection were
+            // being concurrently written to.
const bool fromMapReduce = bool(request.getInitialSplitPoints());
+ if (fromMapReduce) {
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ str::stream()
+ << "Map reduce with sharded output to a new collection found "
+ << nss.ns()
+ << " to be non-empty which is not supported.",
+ isEmpty);
+ }
+
const int numContiguousChunksPerShard = initialSplitPoints.empty()
? 1
: (finalSplitPoints.size() + 1) / (initialSplitPoints.size() + 1);
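
A worked example of the numContiguousChunksPerShard formula above, with assumed values: with 3 shards the hashed pre-split produces 2 initial split points (at most numShards - 1), and numInitialChunks = 12 produces 11 final split points, i.e. 12 chunks in total:

    #include <cassert>
    #include <cstddef>

    int main() {
        // Assumed illustration values, not taken from the patch itself.
        const std::size_t numInitialSplitPoints = 2;  // 3 shards -> at most numShards - 1 points
        const std::size_t numFinalSplitPoints = 11;   // numInitialChunks = 12 -> 11 points

        const std::size_t numContiguousChunksPerShard = numInitialSplitPoints == 0
            ? 1
            : (numFinalSplitPoints + 1) / (numInitialSplitPoints + 1);

        // 12 chunks over 3 shards: 4 contiguous chunks are targeted at each shard.
        assert(numContiguousChunksPerShard == 4);
        return 0;
    }
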