diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2019-01-28 20:55:04 -0500 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2019-01-30 08:34:09 -0500 |
commit | c9a446e51769484fc4aa911046942752c4cb9cdb (patch) | |
tree | e36cc36d47abe3a01c69814247e3ba4e716f64fb | |
parent | 88216373b7f1ba0ee16db972b81010efeee3022a (diff) | |
download | mongo-c9a446e51769484fc4aa911046942752c4cb9cdb.tar.gz |
SERVER-39234 Ensure `shardCollection` initial split works the same between config server and shard primary
(cherry picked from commit 1c61dfa3307c2009dd29c893b8d2b08af6bcf7d6)
14 files changed, 706 insertions, 308 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_misc.yml b/buildscripts/resmokeconfig/suites/sharding_misc.yml index 93551faabbd..ee21cfff768 100644 --- a/buildscripts/resmokeconfig/suites/sharding_misc.yml +++ b/buildscripts/resmokeconfig/suites/sharding_misc.yml @@ -347,7 +347,7 @@ selector: - jstests/sharding/mongos_query_comment.js - jstests/sharding/kill_op_overflow.js - jstests/sharding/mongos_local_explain.js - - jstests/sharding/add_shard_to_zone.js + - jstests/sharding/add_and_remove_shard_from_zone.js - jstests/sharding/authwhere.js - jstests/sharding/authmr.js - jstests/sharding/sessions_collection_auto_healing.js @@ -355,7 +355,6 @@ selector: - jstests/sharding/read_does_not_create_namespaces.js - jstests/sharding/ismaster.js - jstests/sharding/balancer_shell_commands.js - - jstests/sharding/remove_shard_from_zone.js - jstests/sharding/forget_mr_temp_ns.js - jstests/sharding/auth_sharding_cmd_metadata.js - jstests/sharding/autodiscover_config_rs_from_secondary.js diff --git a/jstests/multiVersion/shard_collection_between_mixed_version_mongods.js b/jstests/multiVersion/shard_collection_between_mixed_version_mongods.js new file mode 100644 index 00000000000..cd895bf1916 --- /dev/null +++ b/jstests/multiVersion/shard_collection_between_mixed_version_mongods.js @@ -0,0 +1,62 @@ +// +// Testing shardCollection between 3.6 and latest mongod versions for both config servers and +// shards. 
+// + +load("./jstests/multiVersion/libs/verify_versions.js"); + +(function() { + "use strict"; + + var options = { + shards: [{binVersion: "latest"}, {binVersion: "3.6"}, {binVersion: "3.6"}], + mongos: 1, + other: { + mongosOptions: {binVersion: "latest"}, + configOptions: {binVersion: "latest"}, + shardAsReplicaSet: true + } + }; + + var st = new ShardingTest(options); + assert.binVersion(st.shard0, "latest"); + assert.binVersion(st.shard1, "3.6"); + assert.binVersion(st.shard2, "3.6"); + assert.binVersion(st.s0, "latest"); + + var mongos = st.s0; + var admin = mongos.getDB('admin'); + + const kDBOnShardWithLatestBinary = "DBWithPrimaryOnLatestBinary"; + const kNSOnLatestShard = kDBOnShardWithLatestBinary + ".Coll"; + const kDBOnShardWithOldBinary = "DBWithPrimaryOnOldBinary"; + const kNSOnOldShard = kDBOnShardWithOldBinary + ".Coll"; + + assert.commandWorked(admin.runCommand({enableSharding: kDBOnShardWithLatestBinary})); + assert.commandWorked(admin.runCommand({enableSharding: kDBOnShardWithOldBinary})); + st.ensurePrimaryShard(kDBOnShardWithLatestBinary, st.shard0.shardName); + st.ensurePrimaryShard(kDBOnShardWithOldBinary, st.shard1.shardName); + + // Test that shardCollection succeeds when both the config server and primary shard are + // running with latest binVersion, but other shards are running with 3.6 which does not + // have the new shardCollection protocol. + assert.commandWorked(admin.runCommand({shardCollection: kNSOnLatestShard, key: {a: 1}})); + + // Test that shardCollection succeeds when the config server is running with the latest + // binVersion, but the primary is running with 3.6.
+ assert.commandWorked(admin.runCommand({shardCollection: kNSOnOldShard, key: {a: 1}})); + + mongos.getDB(kDBOnShardWithLatestBinary).Coll.drop(); + mongos.getDB(kDBOnShardWithOldBinary).Coll.drop(); + + // Test that shardCollection with a hashed shard key succeeds when both the config server and + // primary shard are running with latest binVersion, but other shards are running with 3.6 + // which does not have the new shardCollection protocol. + assert.commandWorked(admin.runCommand({shardCollection: kNSOnLatestShard, key: {a: "hashed"}})); + + // Test that shardCollection with a hashed shard key succeeds when the config server is running + // with the latest binVersion, but the primary is running with 3.6. + assert.commandWorked(admin.runCommand({shardCollection: kNSOnOldShard, key: {a: "hashed"}})); + + st.stop(); +})(); diff --git a/jstests/sharding/add_and_remove_shard_from_zone.js b/jstests/sharding/add_and_remove_shard_from_zone.js new file mode 100644 index 00000000000..d4773597259 --- /dev/null +++ b/jstests/sharding/add_and_remove_shard_from_zone.js @@ -0,0 +1,40 @@ +/** + * Basic integration tests for the addShardToZone and removeShardFromZone commands. More detailed + * tests can be found in sharding_catalog_{add_shard_to,remove_shard_from}_zone_test.cpp. + */ +(function() { + 'use strict'; + + let st = new ShardingTest({shards: 1}); + let mongos = st.s0; + + let config = mongos.getDB('config'); + var shardName = st.shard0.shardName; + + // Test adding shard with no zone to a new zone. + assert.commandWorked(mongos.adminCommand({addShardToZone: shardName, zone: 'x'})); + var shardDoc = config.shards.findOne(); + assert.eq(['x'], shardDoc.tags); + + // Test adding zone to a shard with existing zones. + assert.commandWorked(mongos.adminCommand({addShardToZone: shardName, zone: 'y'})); + shardDoc = config.shards.findOne(); + assert.eq(['x', 'y'], shardDoc.tags); + + // Test removing shard from existing zone.
+ assert.commandWorked(mongos.adminCommand({removeShardFromZone: shardName, zone: 'x'})); + shardDoc = config.shards.findOne(); + assert.eq(['y'], shardDoc.tags); + + // Test removing shard from zone that no longer exists. + assert.commandWorked(mongos.adminCommand({removeShardFromZone: shardName, zone: 'x'})); + shardDoc = config.shards.findOne(); + assert.eq(['y'], shardDoc.tags); + + // Test removing the last zone from a shard + assert.commandWorked(mongos.adminCommand({removeShardFromZone: shardName, zone: 'y'})); + shardDoc = config.shards.findOne(); + assert.eq([], shardDoc.tags); + + st.stop(); +})(); diff --git a/jstests/sharding/add_shard_to_zone.js b/jstests/sharding/add_shard_to_zone.js deleted file mode 100644 index bb142585cc2..00000000000 --- a/jstests/sharding/add_shard_to_zone.js +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Basic integration tests for addShardToZone command. More detailed tests can be found - * in sharding_catalog_add_shard_to_zone_test.cpp. - */ -(function() { - var st = new ShardingTest({shards: 1}); - - var configDB = st.s.getDB('config'); - var shardName = configDB.shards.findOne()._id; - - // Test adding shard with no zone to a new zone. - assert.commandWorked(st.s.adminCommand({addShardToZone: shardName, zone: 'x'})); - var shardDoc = configDB.shards.findOne(); - assert.eq(['x'], shardDoc.tags); - - // Test adding zone to a shard with existing zones. - assert.commandWorked(st.s.adminCommand({addShardToZone: shardName, zone: 'y'})); - shardDoc = configDB.shards.findOne(); - assert.eq(['x', 'y'], shardDoc.tags); - - st.stop(); -})(); diff --git a/jstests/sharding/remove_shard_from_zone.js b/jstests/sharding/remove_shard_from_zone.js deleted file mode 100644 index 273d597d038..00000000000 --- a/jstests/sharding/remove_shard_from_zone.js +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Basic integration tests for removeShardFromZone command. More detailed tests can be found - * in sharding_catalog_remove_shard_from_zone_test.cpp. 
- */ -(function() { - "use strict"; - - var st = new ShardingTest({shards: 1}); - - var configDB = st.s.getDB('config'); - var shardName = st.shard0.shardName; - - assert.commandWorked(st.s.adminCommand({addShardToZone: shardName, zone: 'x'})); - var shardDoc = configDB.shards.findOne(); - assert.eq(['x'], shardDoc.tags); - - // Test removing shard from existing zone. - assert.commandWorked(st.s.adminCommand({removeShardFromZone: shardName, zone: 'x'})); - shardDoc = configDB.shards.findOne(); - assert.eq([], shardDoc.tags); - - // Test removing shard from zone that no longer exists. - assert.commandWorked(st.s.adminCommand({removeShardFromZone: shardName, zone: 'x'})); - shardDoc = configDB.shards.findOne(); - assert.eq([], shardDoc.tags); - - st.stop(); -})(); diff --git a/jstests/sharding/shard_collection_verify_initial_chunks.js b/jstests/sharding/shard_collection_verify_initial_chunks.js index 0538bed1b34..e7072132b11 100644 --- a/jstests/sharding/shard_collection_verify_initial_chunks.js +++ b/jstests/sharding/shard_collection_verify_initial_chunks.js @@ -1,30 +1,57 @@ -// -// Verify numInitialChunks can not be set for non hashed key or nonempty collections -// - +/** + * Verify initial chunks are properly created and distributed in various combinations of shard key + * and empty/non-empty collections. 
+ */ (function() { 'use strict'; - var st = new ShardingTest({mongos: 1, shards: 2}); - var kDbName = 'db'; - var mongos = st.s0; + let st = new ShardingTest({mongos: 1, shards: 2}); + let mongos = st.s0; - assert.commandWorked(mongos.adminCommand({enableSharding: kDbName})); + let config = mongos.getDB("config"); + let db = mongos.getDB('TestDB'); - assert.commandFailed(mongos.adminCommand( - {shardCollection: kDbName + '.foo', key: {aKey: 1}, numInitialChunks: 5})); + assert.commandWorked(mongos.adminCommand({enableSharding: 'TestDB'})); + st.ensurePrimaryShard('TestDB', st.shard1.shardName); - assert.writeOK(mongos.getDB(kDbName).foo.insert({aKey: 1})); - assert.commandWorked(mongos.getDB(kDbName).foo.createIndex({aKey: "hashed"})); - assert.commandFailed(mongos.adminCommand( - {shardCollection: kDbName + '.foo', key: {aKey: "hashed"}, numInitialChunks: 5})); + function checkChunkCounts(collName, chunksOnShard0, chunksOnShard1) { + let counts = st.chunkCounts(collName, 'TestDB'); + assert.eq(chunksOnShard0, + counts[st.shard0.shardName], + 'Count mismatch on shard0: ' + tojson(counts)); + assert.eq(chunksOnShard1, + counts[st.shard1.shardName], + 'Count mismatch on shard1: ' + tojson(counts)); + } - assert.writeOK(mongos.getDB(kDbName).foo.remove({})); + // Unsupported: Range sharding + numInitialChunks + assert.commandFailed(mongos.adminCommand( + {shardCollection: 'TestDB.RangeCollEmpty', key: {aKey: 1}, numInitialChunks: 6})); + + // Unsupported: Hashed sharding + numInitialChunks + non-empty collection + assert.writeOK(db.HashedCollNotEmpty.insert({aKey: 1})); + assert.commandWorked(db.HashedCollNotEmpty.createIndex({aKey: "hashed"})); + assert.commandFailed(mongos.adminCommand({ + shardCollection: 'TestDB.HashedCollNotEmpty', + key: {aKey: "hashed"}, + numInitialChunks: 6 + })); + + // Supported: Hashed sharding + numInitialChunks + empty collection + // Expected: Even chunk distribution + assert.commandWorked(db.HashedCollEmpty.createIndex({aKey: 
"hashed"})); assert.commandWorked(mongos.adminCommand( - {shardCollection: kDbName + '.foo', key: {aKey: "hashed"}, numInitialChunks: 5})); - - mongos.getDB(kDbName).dropDatabase(); + {shardCollection: 'TestDB.HashedCollEmpty', key: {aKey: "hashed"}, numInitialChunks: 6})); + checkChunkCounts('HashedCollEmpty', 3, 3); + + // Supported: Hashed sharding + numInitialChunks + non-existent collection + // Expected: Even chunk distribution + assert.commandWorked(mongos.adminCommand({ + shardCollection: 'TestDB.HashedCollNonExistent', + key: {aKey: "hashed"}, + numInitialChunks: 6 + })); + checkChunkCounts('HashedCollNonExistent', 3, 3); st.stop(); - })(); diff --git a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp index 81b01a612d3..ef3eda30ff1 100644 --- a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp +++ b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp @@ -397,22 +397,9 @@ void validateShardKeyAgainstExistingIndexes(OperationContext* opCtx, */ void migrateAndFurtherSplitInitialChunks(OperationContext* opCtx, const NamespaceString& nss, - int numShards, const std::vector<ShardId>& shardIds, - bool isEmpty, - const ShardKeyPattern& shardKeyPattern, const std::vector<BSONObj>& finalSplitPoints) { - auto catalogCache = Grid::get(opCtx)->catalogCache(); - - if (!shardKeyPattern.isHashedPattern()) { - // Only initially move chunks when using a hashed shard key. - return; - } - - if (!isEmpty) { - // If the collection is not empty, rely on the balancer to migrate the chunks. - return; - } + const auto catalogCache = Grid::get(opCtx)->catalogCache(); auto routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss)); uassert(ErrorCodes::ConflictingOperationInProgress, @@ -423,13 +410,18 @@ void migrateAndFurtherSplitInitialChunks(OperationContext* opCtx, auto chunkManager = routingInfo.cm(); // Move and commit each "big chunk" to a different shard. 
- int i = 0; + auto nextShardId = [&, indx = 0 ]() mutable { + return shardIds[indx++ % shardIds.size()]; + }; + for (auto chunk : chunkManager->chunks()) { - const ShardId& shardId = shardIds[i++ % numShards]; + const auto shardId = nextShardId(); + const auto toStatus = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId); if (!toStatus.isOK()) { continue; } + const auto to = toStatus.getValue(); // Can't move chunk to shard it's already on @@ -648,11 +640,9 @@ public: std::vector<ShardId> shardIds; shardRegistry->getAllShardIds(opCtx, &shardIds); - const int numShards = shardIds.size(); - uassert(ErrorCodes::IllegalOperation, "cannot shard collections before there are shards", - numShards > 0); + !shardIds.empty()); // Handle collections in the config db separately. if (nss.db() == NamespaceString::kConfigDb) { @@ -688,7 +678,8 @@ public: ON_BLOCK_EXIT([&conn] { conn.done(); }); // Step 1. - validateAndDeduceFullRequestOptions(opCtx, nss, shardKeyPattern, numShards, conn, &request); + validateAndDeduceFullRequestOptions( + opCtx, nss, shardKeyPattern, shardIds.size(), conn, &request); // The collation option should have been set to the collection default collation after being // validated. @@ -755,7 +746,23 @@ public: return true; } - bool isEmpty = (conn->count(nss.ns()) == 0); + // This check for empty collection is racy, because it is not guaranteed that documents + // will not show up in the collection right after the count below has executed. It is + // left here for backwards compatibility with pre-4.0.4 clusters, which do not support + // sharding being performed by the primary shard.
+ const bool isEmpty = (conn->count(nss.ns()) == 0); + + // Map/reduce with output to an empty collection assumes it has full control of the + // output collection and it would be an unsupported operation if the collection is being + // concurrently written + const bool fromMapReduce = bool(request.getInitialSplitPoints()); + if (fromMapReduce) { + uassert(ErrorCodes::ConflictingOperationInProgress, + str::stream() << "Map reduce with sharded output to a new collection found " + << nss.ns() + << " to be non-empty which is not supported.", + isEmpty); + } // Step 3. validateShardKeyAgainstExistingIndexes( @@ -772,12 +779,12 @@ public: std::vector<BSONObj> initialSplitPoints; // there will be at most numShards-1 of these std::vector<BSONObj> finalSplitPoints; // all of the desired split points if (request.getInitialSplitPoints()) { - initialSplitPoints = std::move(*request.getInitialSplitPoints()); + initialSplitPoints = *request.getInitialSplitPoints(); } else { InitialSplitPolicy::calculateHashedSplitPointsForEmptyCollection( shardKeyPattern, isEmpty, - numShards, + shardIds.size(), request.getNumInitialChunks(), &initialSplitPoints, &finalSplitPoints); @@ -786,15 +793,7 @@ public: LOG(0) << "CMD: shardcollection: " << cmdObj; audit::logShardCollection( - Client::getCurrent(), nss.ns(), proposedKey, request.getUnique()); - - // The initial chunks are distributed evenly across shards only if the initial split - // points - // were specified in the request, i.e., by mapReduce. Otherwise, all the initial chunks - // are - // placed on the primary shard, and may be distributed across shards through migrations - // (below) if using a hashed shard key. - const bool distributeInitialChunks = bool(request.getInitialSplitPoints()); + opCtx->getClient(), nss.ns(), proposedKey, request.getUnique()); // Step 6. Actually shard the collection. 
catalogManager->shardCollection(opCtx, @@ -804,7 +803,7 @@ public: *request.getCollation(), request.getUnique(), initialSplitPoints, - distributeInitialChunks, + fromMapReduce, primaryShardId); result << "collectionsharded" << nss.ns(); if (uuid) { @@ -818,9 +817,12 @@ public: collDistLock.reset(); dbDistLock.reset(); - // Step 7. Migrate initial chunks to distribute them across shards. - migrateAndFurtherSplitInitialChunks( - opCtx, nss, numShards, shardIds, isEmpty, shardKeyPattern, finalSplitPoints); + // Step 7. If the collection is empty and using hashed sharding, migrate initial chunks + // to spread them evenly across shards from the beginning. Otherwise rely on the + // balancer to do it. + if (isEmpty && shardKeyPattern.isHashedPattern()) { + migrateAndFurtherSplitInitialChunks(opCtx, nss, shardIds, finalSplitPoints); + } return true; } diff --git a/src/mongo/db/s/config/initial_split_policy.cpp b/src/mongo/db/s/config/initial_split_policy.cpp index 3654a465313..81d0ecb22ed 100644 --- a/src/mongo/db/s/config/initial_split_policy.cpp +++ b/src/mongo/db/s/config/initial_split_policy.cpp @@ -278,34 +278,32 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::createFirstChunks( const ShardId& primaryShardId, const std::vector<BSONObj>& splitPoints, const std::vector<TagsType>& tags, - const bool distributeInitialChunks, - const bool isEmpty, - const int numContiguousChunksPerShard) { - const auto& keyPattern = shardKeyPattern.getKeyPattern(); + bool isEmpty, + int numContiguousChunksPerShard) { + uassert(ErrorCodes::InvalidOptions, + "Cannot generate initial chunks based on both split points and zones", + tags.empty() || splitPoints.empty()); - std::vector<BSONObj> finalSplitPoints; + const auto shardRegistry = Grid::get(opCtx)->shardRegistry(); - if (splitPoints.empty() && tags.empty()) { - // If neither split points nor tags were specified use the shard's data distribution to - // determine them - auto primaryShard = - 
uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, primaryShardId)); + const auto& keyPattern = shardKeyPattern.getKeyPattern(); - // Refresh the balancer settings to ensure the chunk size setting, which is sent as part of - // the splitVector command and affects the number of chunks returned, has been loaded. - uassertStatusOK(Grid::get(opCtx)->getBalancerConfiguration()->refreshAndCheck(opCtx)); - - if (!isEmpty) { - finalSplitPoints = uassertStatusOK(shardutil::selectChunkSplitPoints( - opCtx, - primaryShardId, - nss, - shardKeyPattern, - ChunkRange(keyPattern.globalMin(), keyPattern.globalMax()), - Grid::get(opCtx)->getBalancerConfiguration()->getMaxChunkSizeBytes(), - 0)); - } + const auto validAfter = LogicalClock::get(opCtx)->getClusterTime().asTimestamp(); + + // On which shards are the generated chunks allowed to be placed + std::vector<ShardId> shardIds; + if (isEmpty) { + shardRegistry->getAllShardIdsNoReload(&shardIds); } else { + shardIds.push_back(primaryShardId); + } + + ShardCollectionConfig initialChunks; + + // If split points are requested, they take precedence over zones + if (!splitPoints.empty()) { + std::vector<BSONObj> finalSplitPoints; + // Make sure points are unique and ordered auto orderedPts = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); @@ -316,37 +314,63 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::createFirstChunks( for (const auto& splitPoint : orderedPts) { finalSplitPoints.push_back(splitPoint); } - } - - uassert(ErrorCodes::InvalidOptions, - str::stream() << "cannot generate initial chunks based on both split points and tags", - tags.empty() || finalSplitPoints.empty()); - - const auto validAfter = LogicalClock::get(opCtx)->getClusterTime().asTimestamp(); - - // If docs already exist for the collection, must use primary shard, otherwise defer to - // passed-in distribution option. 
- std::vector<ShardId> shardIds; - if (isEmpty && distributeInitialChunks) { - Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds); - } else { - shardIds.push_back(primaryShardId); + initialChunks = generateShardCollectionInitialChunks(nss, + shardKeyPattern, + primaryShardId, + validAfter, + finalSplitPoints, + shardIds, + numContiguousChunksPerShard); } + // If zones are defined, use the zones + else if (!tags.empty()) { + if (isEmpty) { + initialChunks = generateShardCollectionInitialZonedChunks( + nss, shardKeyPattern, validAfter, tags, getTagToShardIds(opCtx, tags), shardIds); + } else { + // For a non-empty collection, create one chunk on the primary shard and leave it to the + // balancer to do the zone splitting and placement + ChunkVersion version(1, 0, OID::gen()); + appendChunk(nss, + keyPattern.globalMin(), + keyPattern.globalMax(), + &version, + validAfter, + primaryShardId, + &initialChunks.chunks); + } + } + // If neither split points nor zones are available and the collection is not empty, ask the + // shard to select split points based on the data distribution + else if (!isEmpty) { + auto primaryShard = + uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, primaryShardId)); - ShardCollectionConfig initialChunks; + // Refresh the balancer settings to ensure the chunk size setting, which is sent as part of + // the splitVector command and affects the number of chunks returned, has been loaded. 
+ const auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration(); + uassertStatusOK(balancerConfig->refreshAndCheck(opCtx)); + + const auto shardSelectedSplitPoints = uassertStatusOK(shardutil::selectChunkSplitPoints( + opCtx, + primaryShardId, + nss, + shardKeyPattern, + ChunkRange(keyPattern.globalMin(), keyPattern.globalMax()), + balancerConfig->getMaxChunkSizeBytes(), + 0)); - if (tags.empty()) { initialChunks = generateShardCollectionInitialChunks(nss, shardKeyPattern, primaryShardId, validAfter, - finalSplitPoints, + shardSelectedSplitPoints, shardIds, numContiguousChunksPerShard); - } else if (!isEmpty) { - // For a non-empty collection, create one chunk on the primary shard and leave it to the - // balancer to do the zone split and rebalancing + } + // For empty collection, just create a single chunk + else { ChunkVersion version(1, 0, OID::gen()); appendChunk(nss, keyPattern.globalMin(), @@ -355,9 +379,6 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::createFirstChunks( validAfter, primaryShardId, &initialChunks.chunks); - } else { - initialChunks = generateShardCollectionInitialZonedChunks( - nss, shardKeyPattern, validAfter, tags, getTagToShardIds(opCtx, tags), shardIds); } LOG(0) << "Created " << initialChunks.chunks.size() << " chunk(s) for: " << nss diff --git a/src/mongo/db/s/config/initial_split_policy.h b/src/mongo/db/s/config/initial_split_policy.h index 6e43265b3ae..b7bf33c797a 100644 --- a/src/mongo/db/s/config/initial_split_policy.h +++ b/src/mongo/db/s/config/initial_split_policy.h @@ -120,8 +120,12 @@ public: const std::vector<ShardId>& shardIdsForGaps); /** - * Creates the first chunks for a newly sharded collection. - * Returns the created chunks. + * Generates a list with what are the most optimal first chunks and placement for a newly + * sharded collection. + * + * If the collection 'isEmpty', chunks will be spread across all available (appropriate based on + * zoning rules) shards. 
Otherwise, they will all end up on the primary shard after which the + * balancer will take care of properly distributing them around. */ static ShardCollectionConfig createFirstChunks(OperationContext* opCtx, const NamespaceString& nss, @@ -129,9 +133,8 @@ public: const ShardId& primaryShardId, const std::vector<BSONObj>& splitPoints, const std::vector<TagsType>& tags, - const bool distributeInitialChunks, - const bool isEmpty, - const int numContiguousChunksPerShard = 1); + bool isEmpty, + int numContiguousChunksPerShard = 1); /** * Writes to the config server the first chunks for a newly sharded collection. diff --git a/src/mongo/db/s/config/initial_split_policy_test.cpp b/src/mongo/db/s/config/initial_split_policy_test.cpp index 7ff8e44fef2..daa8d0c29f3 100644 --- a/src/mongo/db/s/config/initial_split_policy_test.cpp +++ b/src/mongo/db/s/config/initial_split_policy_test.cpp @@ -118,7 +118,7 @@ TEST(CalculateHashedSplitPointsTest, EmptyCollectionChunksEqualToShards) { checkCalculatedHashedSplitPoints(true, true, 3, 3, &expectedSplitPoints, &expectedSplitPoints); } -TEST(CalculateHashedSplitPointsTest, EmptyCollectionHashedWithInitialSplitsReturnsEmptySplits) { +TEST(CalculateHashedSplitPointsTest, EmptyCollectionHashedWithNoInitialSplitsReturnsEmptySplits) { const std::vector<BSONObj> expectedSplitPoints; checkCalculatedHashedSplitPoints(true, true, 2, 1, &expectedSplitPoints, &expectedSplitPoints); } @@ -147,7 +147,7 @@ TEST(CalculateHashedSplitPointsTest, NotHashedWithInitialSplitsFails) { ErrorCodes::InvalidOptions); } -class GenerateInitialSplitChunksTest : public unittest::Test { +class GenerateInitialSplitChunksTestBase : public unittest::Test { public: /** * Returns a vector of ChunkType objects for the given chunk ranges. 
@@ -207,7 +207,7 @@ private: const Timestamp _timeStamp{Date_t::now()}; }; -class GenerateInitialHashedSplitChunksTest : public GenerateInitialSplitChunksTest { +class GenerateInitialHashedSplitChunksTest : public GenerateInitialSplitChunksTestBase { public: const std::vector<BSONObj>& hashedSplitPoints() { return _splitPoints; @@ -263,7 +263,7 @@ TEST_F(GenerateInitialHashedSplitChunksTest, assertChunkVectorsAreEqual(expectedChunks, shardCollectionConfig.chunks); } -class GenerateShardCollectionInitialZonedChunksTest : public GenerateInitialSplitChunksTest { +class GenerateShardCollectionInitialZonedChunksTest : public GenerateInitialSplitChunksTestBase { public: /** * Calls generateShardCollectionInitialZonedChunks according to the given arguments diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h index 091a2cd206b..6e4bef00edd 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager.h +++ b/src/mongo/db/s/config/sharding_catalog_manager.h @@ -257,19 +257,18 @@ public: /** - * Shards a collection. Assumes that the database is enabled for sharding. + * Shards collection with namespace 'nss' and implicitly assumes that the database is enabled + * for sharding (i.e., doesn't check whether enableSharding has been called previously). * - * @param ns: namespace of collection to shard - * @param uuid: the collection's UUID. Optional because new in 3.6. - * @param fieldsAndOrder: shardKey pattern - * @param defaultCollation: the default collation for the collection, to be written to - * config.collections. If empty, the collection default collation is simple binary - * comparison. Note the the shard key collation will always be simple binary comparison, - * even if the collection default collation is non-simple. - * @param unique: if true, ensure underlying index enforces a unique constraint. - * @param initPoints: create chunks based on a set of specified split points. 
- * @param initShardIds: If non-empty, specifies the set of shards to assign chunks between. - * Otherwise all chunks will be assigned to the primary shard for the database. + * uuid - the collection's UUID. Optional because new in 3.6. + * fieldsAndOrder - shard key pattern to use. + * defaultCollation - the default collation for the collection, excluding the shard key. If + * empty, defaults to simple binary comparison. Note that the shard key collation will always + * be simple binary comparison, even if the collection default collation is non-simple. + * unique - if true, ensure underlying index enforces a unique constraint. + * initPoints - create chunks based on a set of specified split points. + * isFromMapReduce - whether this request comes from map/reduce, in which case the generated + * chunks can be spread across shards. Otherwise they will stay on the primary shard. */ void shardCollection(OperationContext* opCtx, const NamespaceString& nss, @@ -278,7 +277,7 @@ public: const BSONObj& defaultCollation, bool unique, const std::vector<BSONObj>& initPoints, - const bool distributeInitialChunks, + bool isFromMapReduce, const ShardId& dbPrimaryShardId); diff --git a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp index 71106b3a396..9dbbc01b517 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp @@ -366,7 +366,7 @@ void ShardingCatalogManager::shardCollection(OperationContext* opCtx, const BSONObj& defaultCollation, bool unique, const vector<BSONObj>& splitPoints, - const bool distributeInitialChunks, + bool isFromMapReduce, const ShardId& dbPrimaryShardId) { const auto catalogClient = Grid::get(opCtx)->catalogClient(); const auto shardRegistry = Grid::get(opCtx)->shardRegistry(); @@ -376,14 +376,23 @@ void 
ShardingCatalogManager::shardCollection(OperationContext* opCtx, // Fail if there are partially written chunks from a previous failed shardCollection. checkForExistingChunks(opCtx, nss); + // Prior to 4.0.5, zones cannot be taken into account at collection sharding time, so ignore + // them and let the balancer apply them later + const std::vector<TagsType> treatAsNoZonesDefined; + + // Map/reduce with output to sharded collection ignores consistency checks and requires the + // initial chunks to be spread across shards unconditionally + const bool treatAsEmpty = isFromMapReduce; + // Record start in changelog { BSONObjBuilder collectionDetail; collectionDetail.append("shardKey", fieldsAndOrder.toBSON()); collectionDetail.append("collection", nss.ns()); - if (uuid) { + if (uuid) uuid->appendToBuilder(&collectionDetail, "uuid"); - } + collectionDetail.append("empty", treatAsEmpty); + collectionDetail.append("fromMapReduce", isFromMapReduce); collectionDetail.append("primary", primaryShard->toString()); collectionDetail.append("numChunks", static_cast<int>(splitPoints.size() + 1)); uassertStatusOK(catalogClient->logChange(opCtx, @@ -400,19 +409,13 @@ void ShardingCatalogManager::shardCollection(OperationContext* opCtx, ->makeFromBSON(defaultCollation)); } - std::vector<TagsType> tags; - // Since this code runs on the config server, we cannot guarantee that the collection is still - // empty by the time the metadata is written so always assume we are sharding a non-empty - // collection. 
- bool isEmpty = false; const auto initialChunks = InitialSplitPolicy::createFirstChunks(opCtx, nss, fieldsAndOrder, dbPrimaryShardId, splitPoints, - tags, - distributeInitialChunks, - isEmpty); + treatAsNoZonesDefined, + treatAsEmpty); InitialSplitPolicy::writeFirstChunksToConfig(opCtx, initialChunks); diff --git a/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp index e8fda03ca89..c59ba36187f 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp @@ -39,25 +39,15 @@ #include "mongo/client/read_preference.h" #include "mongo/client/remote_command_targeter_factory_mock.h" #include "mongo/client/remote_command_targeter_mock.h" -#include "mongo/db/client.h" -#include "mongo/db/commands.h" #include "mongo/db/s/config/initial_split_policy.h" #include "mongo/db/s/config/sharding_catalog_manager.h" -#include "mongo/executor/network_interface_mock.h" -#include "mongo/executor/task_executor.h" #include "mongo/rpc/metadata/tracking_metadata.h" -#include "mongo/s/catalog/type_changelog.h" #include "mongo/s/catalog/type_chunk.h" -#include "mongo/s/catalog/type_collection.h" -#include "mongo/s/catalog/type_database.h" -#include "mongo/s/catalog/type_locks.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/config_server_test_fixture.h" #include "mongo/s/grid.h" #include "mongo/s/shard_key_pattern.h" -#include "mongo/stdx/future.h" -#include "mongo/transport/mock_session.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" #include "mongo/util/time_support.h" @@ -74,12 +64,8 @@ using std::string; using std::vector; using unittest::assertGet; -const ShardId testPrimaryShard = ShardId("shard0"); - -const NamespaceString kNamespace("db1.foo"); - -class ShardCollectionTest : public ConfigServerTestFixture { 
-public: +class ShardCollectionTestBase : public ConfigServerTestFixture { +protected: void expectSplitVector(const HostAndPort& shardHost, const ShardKeyPattern& keyPattern, const BSONObj& splitPoints) { @@ -106,13 +92,50 @@ public: }); } + const ShardId testPrimaryShard{"shard0"}; + const NamespaceString kNamespace{"db1.foo"}; + private: const HostAndPort configHost{"configHost1"}; const ConnectionString configCS{ConnectionString::forReplicaSet("configReplSet", {configHost})}; const HostAndPort clientHost{"clientHost1"}; }; -TEST_F(ShardCollectionTest, anotherMongosSharding) { + +// Tests which exercise the ShardingCatalogManager::shardCollection logic, which is what the config +// server uses to shard collections, when the '_shardsvrShardCollection' command is not available +// (fast initial split optimization) +class ConfigServerShardCollectionTest : public ShardCollectionTestBase { +protected: + /* Reads back the chunks stored for kNamespace and asserts that they match expectedChunks pairwise: same count, same min/max bounds and same owning shard. Chunk versions are not compared. */ + void checkWrittenChunks(const std::vector<ChunkType>& expectedChunks) { + const auto grid = Grid::get(operationContext()); + const auto catalogClient = grid->catalogClient(); + repl::OpTime unusedOpTime; + const auto writtenChunks = + assertGet(catalogClient->getChunks(operationContext(), + BSON("ns" << kNamespace.ns()), + BSON("min" << 1), + boost::none, + &unusedOpTime, + repl::ReadConcernLevel::kLocalReadConcern)); + ASSERT_EQ(expectedChunks.size(), writtenChunks.size()); + + auto itE = expectedChunks.begin(); + auto itW = writtenChunks.begin(); + for (; itE != expectedChunks.end(); itE++, itW++) { + const auto& expected = *itE; + const auto& written = *itW; + ASSERT_BSONOBJ_EQ(expected.getMin(), written.getMin()); + ASSERT_BSONOBJ_EQ(expected.getMax(), written.getMax()); + ASSERT_EQ(expected.getShard(), written.getShard()); + } + } + + const ShardKeyPattern keyPattern{BSON("_id" << 1)}; + const BSONObj defaultCollation; +}; + +TEST_F(ConfigServerShardCollectionTest, Partially_Written_Chunks_Present) { ShardType shard; shard.setName("shard0"); 
shard.setHost("shardHost"); @@ -130,25 +153,21 @@ TEST_F(ShardCollectionTest, anotherMongosSharding) { chunk.setMax(BSON("_id" << 5)); ASSERT_OK(setupChunks({chunk})); - ShardKeyPattern shardKeyPattern(BSON("_id" << 1)); - BSONObj defaultCollation; - ASSERT_THROWS_CODE(ShardingCatalogManager::get(operationContext()) ->shardCollection(operationContext(), kNamespace, boost::none, // UUID - shardKeyPattern, + keyPattern, defaultCollation, false, - vector<BSONObj>{}, - false, + {}, + false, // isFromMapReduce testPrimaryShard), AssertionException, ErrorCodes::ManualInterventionRequired); } -TEST_F(ShardCollectionTest, noInitialChunksOrData) { - // Initial setup +TEST_F(ConfigServerShardCollectionTest, RangeSharding_ForMapReduce_NoInitialSplitPoints) { const HostAndPort shardHost{"shardHost"}; ShardType shard; shard.setName("shard0"); @@ -164,9 +183,6 @@ TEST_F(ShardCollectionTest, noInitialChunksOrData) { setupDatabase(kNamespace.db().toString(), shard.getName(), true); - ShardKeyPattern shardKeyPattern(BSON("_id" << 1)); - BSONObj defaultCollation; - // Now start actually sharding the collection. auto future = launchAsync([&] { ON_BLOCK_EXIT([&] { Client::destroy(); }); @@ -176,17 +192,14 @@ TEST_F(ShardCollectionTest, noInitialChunksOrData) { ->shardCollection(opCtx.get(), kNamespace, boost::none, // UUID - shardKeyPattern, + keyPattern, defaultCollation, false, - vector<BSONObj>{}, - false, + {}, // No split points + true, // isFromMapReduce testPrimaryShard); }); - // Respond to the splitVector command sent to the shard to figure out initial split points. - expectSplitVector(shardHost, shardKeyPattern, BSONObj()); - // Expect the set shard version for that namespace. // We do not check for a specific ChunkVersion, because we cannot easily know the OID that was // generated by shardCollection for the first chunk. 
@@ -194,10 +207,15 @@ TEST_F(ShardCollectionTest, noInitialChunksOrData) { expectSetShardVersion(shardHost, shard, kNamespace, boost::none /* expected ChunkVersion */); future.timed_get(kFutureTimeout); + + checkWrittenChunks( + {ChunkType(kNamespace, + {keyPattern.getKeyPattern().globalMin(), keyPattern.getKeyPattern().globalMax()}, + ChunkVersion::IGNORED(), + testPrimaryShard)}); } -TEST_F(ShardCollectionTest, withInitialChunks) { - // Initial setup +TEST_F(ConfigServerShardCollectionTest, RangeSharding_ForMapReduce_WithInitialSplitPoints) { const HostAndPort shard0Host{"shardHost0"}; const HostAndPort shard1Host{"shardHost1"}; const HostAndPort shard2Host{"shardHost2"}; @@ -234,64 +252,13 @@ TEST_F(ShardCollectionTest, withInitialChunks) { setupDatabase(kNamespace.db().toString(), shard0.getName(), true); - ShardKeyPattern keyPattern(BSON("_id" << 1)); - BSONObj splitPoint0 = BSON("_id" << 1); BSONObj splitPoint1 = BSON("_id" << 100); BSONObj splitPoint2 = BSON("_id" << 200); BSONObj splitPoint3 = BSON("_id" << 300); - ChunkVersion expectedVersion(1, 0, OID::gen()); - - ChunkType expectedChunk0; - expectedChunk0.setNS(kNamespace); - expectedChunk0.setShard(shard0.getName()); - expectedChunk0.setMin(keyPattern.getKeyPattern().globalMin()); - expectedChunk0.setMax(splitPoint0); - expectedChunk0.setVersion(expectedVersion); - expectedVersion.incMinor(); - - ChunkType expectedChunk1; - expectedChunk1.setNS(kNamespace); - expectedChunk1.setShard(shard1.getName()); - expectedChunk1.setMin(splitPoint0); - expectedChunk1.setMax(splitPoint1); - expectedChunk1.setVersion(expectedVersion); - expectedVersion.incMinor(); - - ChunkType expectedChunk2; - expectedChunk2.setNS(kNamespace); - expectedChunk2.setShard(shard2.getName()); - expectedChunk2.setMin(splitPoint1); - expectedChunk2.setMax(splitPoint2); - expectedChunk2.setVersion(expectedVersion); - expectedVersion.incMinor(); - - ChunkType expectedChunk3; - expectedChunk3.setNS(kNamespace); - 
expectedChunk3.setShard(shard0.getName()); - expectedChunk3.setMin(splitPoint2); - expectedChunk3.setMax(splitPoint3); - expectedChunk3.setVersion(expectedVersion); - expectedVersion.incMinor(); - - ChunkType expectedChunk4; - expectedChunk4.setNS(kNamespace); - expectedChunk4.setShard(shard1.getName()); - expectedChunk4.setMin(splitPoint3); - expectedChunk4.setMax(keyPattern.getKeyPattern().globalMax()); - expectedChunk4.setVersion(expectedVersion); - - vector<ChunkType> expectedChunks{ - expectedChunk0, expectedChunk1, expectedChunk2, expectedChunk3, expectedChunk4}; - - BSONObj defaultCollation; - // Now start actually sharding the collection. auto future = launchAsync([&] { - // TODO: can we mock the ShardRegistry to return these? - set<ShardId> shards{shard0.getName(), shard1.getName(), shard2.getName()}; - ON_BLOCK_EXIT([&] { Client::destroy(); }); Client::initThreadIfNotAlready("Test"); auto opCtx = cc().makeOperationContext(); @@ -302,8 +269,8 @@ TEST_F(ShardCollectionTest, withInitialChunks) { keyPattern, defaultCollation, true, - vector<BSONObj>{splitPoint0, splitPoint1, splitPoint2, splitPoint3}, - true, + {splitPoint0, splitPoint1, splitPoint2, splitPoint3}, + true, // isFromMapReduce testPrimaryShard); }); @@ -314,10 +281,30 @@ TEST_F(ShardCollectionTest, withInitialChunks) { expectSetShardVersion(shard0Host, shard0, kNamespace, boost::none /* expected ChunkVersion */); future.timed_get(kFutureTimeout); + + checkWrittenChunks({ChunkType(kNamespace, + ChunkRange{keyPattern.getKeyPattern().globalMin(), splitPoint0}, + ChunkVersion::IGNORED(), + shard0.getName()), + ChunkType(kNamespace, + ChunkRange{splitPoint0, splitPoint1}, + ChunkVersion::IGNORED(), + shard1.getName()), + ChunkType(kNamespace, + ChunkRange{splitPoint1, splitPoint2}, + ChunkVersion::IGNORED(), + shard2.getName()), + ChunkType(kNamespace, + ChunkRange{splitPoint2, splitPoint3}, + ChunkVersion::IGNORED(), + shard0.getName()), + ChunkType(kNamespace, + ChunkRange{splitPoint3, 
keyPattern.getKeyPattern().globalMax()}, + ChunkVersion::IGNORED(), + shard1.getName())}); } -TEST_F(ShardCollectionTest, withInitialData) { - // Initial setup +TEST_F(ConfigServerShardCollectionTest, RangeSharding_NoInitialSplitPoints_NoSplitVectorPoints) { const HostAndPort shardHost{"shardHost"}; ShardType shard; shard.setName("shard0"); @@ -333,14 +320,56 @@ TEST_F(ShardCollectionTest, withInitialData) { setupDatabase(kNamespace.db().toString(), shard.getName(), true); - ShardKeyPattern keyPattern(BSON("_id" << 1)); + // Now start actually sharding the collection. + auto future = launchAsync([&] { + ON_BLOCK_EXIT([&] { Client::destroy(); }); + Client::initThreadIfNotAlready("Test"); + auto opCtx = cc().makeOperationContext(); + ShardingCatalogManager::get(operationContext()) + ->shardCollection(opCtx.get(), + kNamespace, + boost::none, // UUID + keyPattern, + defaultCollation, + false, + {}, // No split points + false, // isFromMapReduce + testPrimaryShard); + }); - BSONObj splitPoint0 = BSON("_id" << 1); - BSONObj splitPoint1 = BSON("_id" << 100); - BSONObj splitPoint2 = BSON("_id" << 200); - BSONObj splitPoint3 = BSON("_id" << 300); + // Respond to the splitVector command sent to the shard to figure out initial split points. + expectSplitVector(shardHost, keyPattern, BSONObj()); + + // Expect the set shard version for that namespace. + // We do not check for a specific ChunkVersion, because we cannot easily know the OID that was + // generated by shardCollection for the first chunk. + // TODO SERVER-29451: add hooks to the mock storage engine to expect reads and writes. 
+ expectSetShardVersion(shardHost, shard, kNamespace, boost::none /* expected ChunkVersion */); + + future.timed_get(kFutureTimeout); + + checkWrittenChunks( + {ChunkType(kNamespace, + {keyPattern.getKeyPattern().globalMin(), keyPattern.getKeyPattern().globalMax()}, + ChunkVersion::IGNORED(), + testPrimaryShard)}); +} + +TEST_F(ConfigServerShardCollectionTest, RangeSharding_NoInitialSplitPoints_WithSplitVectorPoints) { + const HostAndPort shardHost{"shardHost"}; + ShardType shard; + shard.setName("shard0"); + shard.setHost(shardHost.toString()); + + std::unique_ptr<RemoteCommandTargeterMock> targeter( + stdx::make_unique<RemoteCommandTargeterMock>()); + targeter->setConnectionStringReturnValue(ConnectionString(shardHost)); + targeter->setFindHostReturnValue(shardHost); + targeterFactory()->addTargeterToReturn(ConnectionString(shardHost), std::move(targeter)); - BSONObj defaultCollation; + ASSERT_OK(setupShards(vector<ShardType>{shard})); + + setupDatabase(kNamespace.db().toString(), shard.getName(), true); // Now start actually sharding the collection. auto future = launchAsync([&] { @@ -354,11 +383,16 @@ TEST_F(ShardCollectionTest, withInitialData) { keyPattern, defaultCollation, false, - vector<BSONObj>{}, - false, + {}, // No split points + false, // isFromMapReduce testPrimaryShard); }); + BSONObj splitPoint0 = BSON("_id" << 1); + BSONObj splitPoint1 = BSON("_id" << 100); + BSONObj splitPoint2 = BSON("_id" << 200); + BSONObj splitPoint3 = BSON("_id" << 300); + // Respond to the splitVector command sent to the shard to figure out initial split points. 
expectSplitVector(shardHost, keyPattern, @@ -371,12 +405,136 @@ TEST_F(ShardCollectionTest, withInitialData) { expectSetShardVersion(shardHost, shard, kNamespace, boost::none); future.timed_get(kFutureTimeout); + + checkWrittenChunks({ChunkType(kNamespace, + ChunkRange{keyPattern.getKeyPattern().globalMin(), splitPoint0}, + ChunkVersion::IGNORED(), + testPrimaryShard), + ChunkType(kNamespace, + ChunkRange{splitPoint0, splitPoint1}, + ChunkVersion::IGNORED(), + testPrimaryShard), + ChunkType(kNamespace, + ChunkRange{splitPoint1, splitPoint2}, + ChunkVersion::IGNORED(), + testPrimaryShard), + ChunkType(kNamespace, + ChunkRange{splitPoint2, splitPoint3}, + ChunkVersion::IGNORED(), + testPrimaryShard), + ChunkType(kNamespace, + ChunkRange{splitPoint3, keyPattern.getKeyPattern().globalMax()}, + ChunkVersion::IGNORED(), + testPrimaryShard)}); } -using CreateFirstChunksTest = ShardCollectionTest; +TEST_F(ConfigServerShardCollectionTest, RangeSharding_WithInitialSplitPoints_NoSplitVectorPoints) { + const HostAndPort shardHost{"shardHost"}; + ShardType shard; + shard.setName("shard0"); + shard.setHost(shardHost.toString()); + + std::unique_ptr<RemoteCommandTargeterMock> targeter( + stdx::make_unique<RemoteCommandTargeterMock>()); + targeter->setConnectionStringReturnValue(ConnectionString(shardHost)); + targeter->setFindHostReturnValue(shardHost); + targeterFactory()->addTargeterToReturn(ConnectionString(shardHost), std::move(targeter)); + + ASSERT_OK(setupShards(vector<ShardType>{shard})); -TEST_F(CreateFirstChunksTest, DistributeInitialChunksWithoutTagsIgnoredForNonEmptyCollection) { + setupDatabase(kNamespace.db().toString(), shard.getName(), true); + + BSONObj splitPoint0 = BSON("_id" << 1); + BSONObj splitPoint1 = BSON("_id" << 100); + BSONObj splitPoint2 = BSON("_id" << 200); + BSONObj splitPoint3 = BSON("_id" << 300); + + // Now start actually sharding the collection. 
+ auto future = launchAsync([&] { + ON_BLOCK_EXIT([&] { Client::destroy(); }); + Client::initThreadIfNotAlready("Test"); + auto opCtx = cc().makeOperationContext(); + ShardingCatalogManager::get(operationContext()) + ->shardCollection(opCtx.get(), + kNamespace, + boost::none, // UUID + keyPattern, + defaultCollation, + false, + {splitPoint0, splitPoint1, splitPoint2, splitPoint3}, + false, // isFromMapReduce + testPrimaryShard); + }); + + // Expect the set shard version for that namespace + // We do not check for a specific ChunkVersion, because we cannot easily know the OID that was + // generated by shardCollection for the first chunk. + // TODO SERVER-29451: add hooks to the mock storage engine to expect reads and writes. + expectSetShardVersion(shardHost, shard, kNamespace, boost::none); + + future.timed_get(kFutureTimeout); + + checkWrittenChunks({ChunkType(kNamespace, + ChunkRange{keyPattern.getKeyPattern().globalMin(), splitPoint0}, + ChunkVersion::IGNORED(), + testPrimaryShard), + ChunkType(kNamespace, + ChunkRange{splitPoint0, splitPoint1}, + ChunkVersion::IGNORED(), + testPrimaryShard), + ChunkType(kNamespace, + ChunkRange{splitPoint1, splitPoint2}, + ChunkVersion::IGNORED(), + testPrimaryShard), + ChunkType(kNamespace, + ChunkRange{splitPoint2, splitPoint3}, + ChunkVersion::IGNORED(), + testPrimaryShard), + ChunkType(kNamespace, + ChunkRange{splitPoint3, keyPattern.getKeyPattern().globalMax()}, + ChunkVersion::IGNORED(), + testPrimaryShard)}); +} + + +// Direct tests for InitialSplitPolicy::createFirstChunks which is the base call for both the config +// server and shard server's shard collection logic +class CreateFirstChunksTest : public ShardCollectionTestBase { +protected: const ShardKeyPattern kShardKeyPattern{BSON("x" << 1)}; +}; + +TEST_F(CreateFirstChunksTest, Split_Disallowed_With_Both_SplitPoints_And_Zones) { + ASSERT_THROWS_CODE( + InitialSplitPolicy::createFirstChunks( + operationContext(), + kNamespace, + kShardKeyPattern, + 
ShardId("shard1"), + {BSON("x" << 0)}, + {TagsType(kNamespace, + "TestZone", + ChunkRange(kShardKeyPattern.getKeyPattern().globalMin(), BSON("x" << 0)))}, + true /* isEmpty */), + AssertionException, + ErrorCodes::InvalidOptions); + + ASSERT_THROWS_CODE( + InitialSplitPolicy::createFirstChunks( + operationContext(), + kNamespace, + kShardKeyPattern, + ShardId("shard1"), + {BSON("x" << 0)}, /* One split point */ + {TagsType(kNamespace, + "TestZone", + ChunkRange(kShardKeyPattern.getKeyPattern().globalMin(), BSON("x" << 0)))}, + false /* isEmpty */), + AssertionException, + ErrorCodes::InvalidOptions); +} + +TEST_F(CreateFirstChunksTest, NonEmptyCollection_SplitPoints_FromSplitVector_ManyChunksToPrimary) { const std::vector<ShardType> kShards{ShardType("shard0", "rs0/shard0:123"), ShardType("shard1", "rs1/shard1:123"), ShardType("shard2", "rs2/shard2:123")}; @@ -400,9 +558,8 @@ TEST_F(CreateFirstChunksTest, DistributeInitialChunksWithoutTagsIgnoredForNonEmp kNamespace, kShardKeyPattern, ShardId("shard1"), - {}, - {}, - true, + {}, /* No split points */ + {}, /* No zones */ false /* isEmpty */); }); @@ -414,8 +571,42 @@ TEST_F(CreateFirstChunksTest, DistributeInitialChunksWithoutTagsIgnoredForNonEmp ASSERT_EQ(kShards[1].getName(), firstChunks.chunks[1].getShard()); } -TEST_F(CreateFirstChunksTest, DistributeInitialChunksWithTagsIgnoredForNonEmptyCollection) { - const ShardKeyPattern kShardKeyPattern{BSON("x" << 1)}; +TEST_F(CreateFirstChunksTest, NonEmptyCollection_SplitPoints_FromClient_ManyChunksToPrimary) { + const std::vector<ShardType> kShards{ShardType("shard0", "rs0/shard0:123"), + ShardType("shard1", "rs1/shard1:123"), + ShardType("shard2", "rs2/shard2:123")}; + + const auto connStr = assertGet(ConnectionString::parse(kShards[1].getHost())); + + std::unique_ptr<RemoteCommandTargeterMock> targeter( + stdx::make_unique<RemoteCommandTargeterMock>()); + targeter->setConnectionStringReturnValue(connStr); + targeter->setFindHostReturnValue(connStr.getServers()[0]); 
+ targeterFactory()->addTargeterToReturn(connStr, std::move(targeter)); + + ASSERT_OK(setupShards(kShards)); + shardRegistry()->reload(operationContext()); + + auto future = launchAsync([&] { + ON_BLOCK_EXIT([&] { Client::destroy(); }); + Client::initThreadIfNotAlready("Test"); + auto opCtx = cc().makeOperationContext(); + return InitialSplitPolicy::createFirstChunks(opCtx.get(), + kNamespace, + kShardKeyPattern, + ShardId("shard1"), + {BSON("x" << 0)}, + {}, /* No zones */ + false /* isEmpty */); + }); + + const auto& firstChunks = future.timed_get(kFutureTimeout); + ASSERT_EQ(2U, firstChunks.chunks.size()); + ASSERT_EQ(kShards[1].getName(), firstChunks.chunks[0].getShard()); + ASSERT_EQ(kShards[1].getName(), firstChunks.chunks[1].getShard()); +} + +TEST_F(CreateFirstChunksTest, NonEmptyCollection_WithZones_OneChunkToPrimary) { const std::vector<ShardType> kShards{ShardType("shard0", "rs0/shard0:123", {"TestZone"}), ShardType("shard1", "rs1/shard1:123", {"TestZone"}), ShardType("shard2", "rs2/shard2:123")}; @@ -427,14 +618,108 @@ TEST_F(CreateFirstChunksTest, DistributeInitialChunksWithTagsIgnoredForNonEmptyC kNamespace, kShardKeyPattern, ShardId("shard1"), - {}, - {TagsType(kNamespace, "TestZone", ChunkRange(BSON("x" << MinKey), BSON("x" << 0)))}, - true, + {}, /* No split points */ + {TagsType(kNamespace, + "TestZone", + ChunkRange(kShardKeyPattern.getKeyPattern().globalMin(), BSON("x" << 0)))}, false /* isEmpty */); ASSERT_EQ(1U, firstChunks.chunks.size()); ASSERT_EQ(kShards[1].getName(), firstChunks.chunks[0].getShard()); } +TEST_F(CreateFirstChunksTest, EmptyCollection_SplitPoints_FromClient_ManyChunksDistributed) { + const std::vector<ShardType> kShards{ShardType("shard0", "rs0/shard0:123"), + ShardType("shard1", "rs1/shard1:123"), + ShardType("shard2", "rs2/shard2:123")}; + + const auto connStr = assertGet(ConnectionString::parse(kShards[1].getHost())); + + std::unique_ptr<RemoteCommandTargeterMock> targeter( + 
stdx::make_unique<RemoteCommandTargeterMock>()); + targeter->setConnectionStringReturnValue(connStr); + targeter->setFindHostReturnValue(connStr.getServers()[0]); + targeterFactory()->addTargeterToReturn(connStr, std::move(targeter)); + + ASSERT_OK(setupShards(kShards)); + shardRegistry()->reload(operationContext()); + + auto future = launchAsync([&] { + ON_BLOCK_EXIT([&] { Client::destroy(); }); + Client::initThreadIfNotAlready("Test"); + auto opCtx = cc().makeOperationContext(); + return InitialSplitPolicy::createFirstChunks(opCtx.get(), + kNamespace, + kShardKeyPattern, + ShardId("shard1"), + {BSON("x" << 0), BSON("x" << 100)}, + {}, /* No zones */ + true /* isEmpty */); + }); + + const auto& firstChunks = future.timed_get(kFutureTimeout); + ASSERT_EQ(3U, firstChunks.chunks.size()); + ASSERT_EQ(kShards[0].getName(), firstChunks.chunks[0].getShard()); + ASSERT_EQ(kShards[1].getName(), firstChunks.chunks[1].getShard()); + ASSERT_EQ(kShards[2].getName(), firstChunks.chunks[2].getShard()); +} + +TEST_F(CreateFirstChunksTest, EmptyCollection_NoSplitPoints_OneChunkToPrimary) { + const std::vector<ShardType> kShards{ShardType("shard0", "rs0/shard0:123"), + ShardType("shard1", "rs1/shard1:123"), + ShardType("shard2", "rs2/shard2:123")}; + + const auto connStr = assertGet(ConnectionString::parse(kShards[1].getHost())); + + std::unique_ptr<RemoteCommandTargeterMock> targeter( + stdx::make_unique<RemoteCommandTargeterMock>()); + targeter->setConnectionStringReturnValue(connStr); + targeter->setFindHostReturnValue(connStr.getServers()[0]); + targeterFactory()->addTargeterToReturn(connStr, std::move(targeter)); + + ASSERT_OK(setupShards(kShards)); + shardRegistry()->reload(operationContext()); + + auto future = launchAsync([&] { + ON_BLOCK_EXIT([&] { Client::destroy(); }); + Client::initThreadIfNotAlready("Test"); + auto opCtx = cc().makeOperationContext(); + return InitialSplitPolicy::createFirstChunks(opCtx.get(), + kNamespace, + kShardKeyPattern, + ShardId("shard1"), + 
{}, /* No split points */ + {}, /* No zones */ + true /* isEmpty */); + }); + + const auto& firstChunks = future.timed_get(kFutureTimeout); + ASSERT_EQ(1U, firstChunks.chunks.size()); + ASSERT_EQ(kShards[1].getName(), firstChunks.chunks[0].getShard()); +} + +TEST_F(CreateFirstChunksTest, EmptyCollection_WithZones_ManyChunksOnFirstZoneShard) { + const std::vector<ShardType> kShards{ShardType("shard0", "rs0/shard0:123", {"TestZone"}), + ShardType("shard1", "rs1/shard1:123", {"TestZone"}), + ShardType("shard2", "rs2/shard2:123")}; + ASSERT_OK(setupShards(kShards)); + shardRegistry()->reload(operationContext()); + + const auto firstChunks = InitialSplitPolicy::createFirstChunks( + operationContext(), + kNamespace, + kShardKeyPattern, + ShardId("shard1"), + {}, /* No split points */ + {TagsType(kNamespace, + "TestZone", + ChunkRange(kShardKeyPattern.getKeyPattern().globalMin(), BSON("x" << 0)))}, + true /* isEmpty */); + + ASSERT_EQ(2U, firstChunks.chunks.size()); + ASSERT_EQ(kShards[0].getName(), firstChunks.chunks[0].getShard()); + ASSERT_EQ(kShards[0].getName(), firstChunks.chunks[1].getShard()); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/s/shardsvr_shard_collection.cpp b/src/mongo/db/s/shardsvr_shard_collection.cpp index ab296364b39..6f9d335b67f 100644 --- a/src/mongo/db/s/shardsvr_shard_collection.cpp +++ b/src/mongo/db/s/shardsvr_shard_collection.cpp @@ -421,7 +421,7 @@ void shardCollection(OperationContext* opCtx, bool unique, const std::vector<BSONObj>& splitPoints, const std::vector<TagsType>& tags, - const bool fromMapReduce, + bool fromMapReduce, const ShardId& dbPrimaryShardId, const int numContiguousChunksPerShard, const bool isEmpty) { @@ -429,8 +429,6 @@ void shardCollection(OperationContext* opCtx, const auto shardRegistry = Grid::get(opCtx)->shardRegistry(); const auto primaryShard = uassertStatusOK(shardRegistry->getShard(opCtx, dbPrimaryShardId)); - const bool distributeChunks = - fromMapReduce || 
fieldsAndOrder.isHashedPattern() || !tags.empty(); // Fail if there are partially written chunks from a previous failed shardCollection. checkForExistingChunks(opCtx, nss); @@ -440,9 +438,10 @@ void shardCollection(OperationContext* opCtx, BSONObjBuilder collectionDetail; collectionDetail.append("shardKey", fieldsAndOrder.toBSON()); collectionDetail.append("collection", nss.ns()); - if (uuid) { + if (uuid) uuid->appendToBuilder(&collectionDetail, "uuid"); - } + collectionDetail.append("empty", isEmpty); + collectionDetail.append("fromMapReduce", fromMapReduce); collectionDetail.append("primary", primaryShard->toString()); collectionDetail.append("numChunks", static_cast<int>(splitPoints.size() + 1)); uassertStatusOK(catalogClient->logChange(opCtx, @@ -465,7 +464,6 @@ void shardCollection(OperationContext* opCtx, dbPrimaryShardId, splitPoints, tags, - distributeChunks, isEmpty, numContiguousChunksPerShard); @@ -680,7 +678,7 @@ public: std::vector<BSONObj> finalSplitPoints; if (request.getInitialSplitPoints()) { - finalSplitPoints = std::move(*request.getInitialSplitPoints()); + finalSplitPoints = *request.getInitialSplitPoints(); } else if (tags.empty()) { InitialSplitPolicy::calculateHashedSplitPointsForEmptyCollection( shardKeyPattern, @@ -701,12 +699,21 @@ public: LOG(0) << "CMD: shardcollection: " << cmdObj; audit::logShardCollection( - Client::getCurrent(), nss.ns(), proposedKey, request.getUnique()); + opCtx->getClient(), nss.ns(), proposedKey, request.getUnique()); - // The initial chunks are distributed evenly across shards if the initial split - // points were specified in the request by mapReduce or if we are using a hashed - // shard key. Otherwise, all the initial chunks are placed on the primary shard. 
+ // Map/reduce with output to an empty collection assumes it has full control of the + // output collection and it would be an unsupported operation if the collection is + // being concurrently written const bool fromMapReduce = bool(request.getInitialSplitPoints()); + if (fromMapReduce) { + uassert(ErrorCodes::ConflictingOperationInProgress, + str::stream() + << "Map reduce with sharded output to a new collection found " + << nss.ns() + << " to be non-empty which is not supported.", + isEmpty); + } + const int numContiguousChunksPerShard = initialSplitPoints.empty() ? 1 : (finalSplitPoints.size() + 1) / (initialSplitPoints.size() + 1); |