author | Cheahuychou Mao <cheahuychou.mao@mongodb.com> | 2018-07-27 09:56:07 -0400
---|---|---
committer | Cheahuychou Mao <cheahuychou.mao@mongodb.com> | 2018-07-31 17:50:44 -0400
commit | d83b73ea2db96ccbcf5f2a0710f360f88896ab9c (patch) |
tree | 452fd6c857dd2e9398dcfc66fac3dc55d4ff90fb /src |
parent | 78dec3622268ad27bb855eda4c6a4ed345412fd9 (diff) |
download | mongo-d83b73ea2db96ccbcf5f2a0710f360f88896ab9c.tar.gz |
SERVER-14394 Create initial hashed shard key chunks directly on shards
Diffstat (limited to 'src')
10 files changed, 313 insertions, 294 deletions
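The heart of the patch is the new chunk-to-shard assignment in `InitialSplitPolicy::generateShardCollectionInitialChunks`: chunk `i` now goes to `shardIds[(i / numContiguousChunksPerShard) % shardIds.size()]`, so each shard receives `k` contiguous chunks before the round-robin moves on. The following is a minimal standalone sketch of just that indexing scheme, not the server code itself (simplified: plain strings instead of `ShardId`, and the patch's special case that pins the first chunk to the primary shard when there are fewer chunks than shards is omitted):

```cpp
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

// Sketch of the round-robin assignment this commit introduces: chunk i is
// owned by shardIds[(i / k) % shardIds.size()], where k is
// numContiguousChunksPerShard. Illustrative only; the primary-shard special
// case from the real patch is left out for brevity.
std::vector<std::string> assignChunksToShards(std::size_t numChunks,
                                              const std::vector<std::string>& shardIds,
                                              std::size_t k) {
    std::vector<std::string> owners;
    owners.reserve(numChunks);
    for (std::size_t i = 0; i < numChunks; ++i) {
        owners.push_back(shardIds[(i / k) % shardIds.size()]);
    }
    return owners;
}

int main() {
    // 4 chunks, 2 shards, k = 2 -> shard0, shard0, shard1, shard1, matching
    // the expectation in the new unit test
    // SplitPointsNumContiguousChunksPerShardsGreaterThanOne below.
    for (const auto& owner : assignChunksToShards(4, {"shard0", "shard1"}, 2)) {
        std::cout << owner << '\n';
    }
    return 0;
}
```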
diff --git a/src/mongo/db/s/clone_collection_options_from_primary_shard_cmd.cpp b/src/mongo/db/s/clone_collection_options_from_primary_shard_cmd.cpp index e49b7ab3ae9..b51549e5283 100644 --- a/src/mongo/db/s/clone_collection_options_from_primary_shard_cmd.cpp +++ b/src/mongo/db/s/clone_collection_options_from_primary_shard_cmd.cpp @@ -30,20 +30,10 @@ #include "mongo/platform/basic.h" -#include "mongo/db/audit.h" -#include "mongo/db/auth/action_set.h" -#include "mongo/db/auth/action_type.h" #include "mongo/db/auth/authorization_session.h" -#include "mongo/db/auth/privilege.h" -#include "mongo/db/catalog/index_create.h" -#include "mongo/db/catalog_raii.h" #include "mongo/db/commands.h" -#include "mongo/db/db_raii.h" -#include "mongo/db/operation_context.h" #include "mongo/db/s/migration_destination_manager.h" -#include "mongo/db/service_context.h" -#include "mongo/s/client/shard_registry.h" -#include "mongo/s/grid.h" +#include "mongo/db/s/shard_filtering_metadata_refresh.h" #include "mongo/s/request_types/clone_collection_options_from_primary_shard_gen.h" #include "mongo/s/shard_id.h" #include "mongo/util/log.h" @@ -63,11 +53,15 @@ public: auto primaryShardId = ShardId(request().getPrimaryShard().toString()); MigrationDestinationManager::cloneCollectionIndexesAndOptions( opCtx, ns(), primaryShardId); + + // At the time this command is invoked, the config server primary has already written + // the collection's routing metadata, so sync from the config server + forceShardFilteringMetadataRefresh(opCtx, ns()); } private: bool supportsWriteConcern() const override { - return false; + return true; } NamespaceString ns() const override { @@ -93,7 +87,7 @@ public: } bool adminOnly() const override { - return true; + return false; } } CloneCollectionOptionsFromPrimaryShardCmd; diff --git a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp index 458495012b3..581c2459aeb 100644 --- a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp +++ b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp @@ -440,8 +440,8 @@ void validateShardKeyAgainstExistingIndexes(OperationContext* opCtx, /** * Migrates the initial "big chunks" from the primary shard to spread them evenly across the shards. * - * If 'allSplits' is not empty, additionally splits each "big chunk" into smaller chunks using the - * points in 'allSplits.' + * If 'finalSplitPoints' is not empty, additionally splits each "big chunk" into smaller chunks + * using the points in 'finalSplitPoints.' */ void migrateAndFurtherSplitInitialChunks(OperationContext* opCtx, const NamespaceString& nss, @@ -449,7 +449,7 @@ void migrateAndFurtherSplitInitialChunks(OperationContext* opCtx, const std::vector<ShardId>& shardIds, bool isEmpty, const ShardKeyPattern& shardKeyPattern, - const std::vector<BSONObj>& allSplits) { + const std::vector<BSONObj>& finalSplitPoints) { auto catalogCache = Grid::get(opCtx)->catalogCache(); if (!shardKeyPattern.isHashedPattern()) { @@ -505,7 +505,7 @@ void migrateAndFurtherSplitInitialChunks(OperationContext* opCtx, } } - if (allSplits.empty()) { + if (finalSplitPoints.empty()) { return; } @@ -517,14 +517,14 @@ void migrateAndFurtherSplitInitialChunks(OperationContext* opCtx, routingInfo.cm()); chunkManager = routingInfo.cm(); - // Subdivide the big chunks by splitting at each of the points in "allSplits" + // Subdivide the big chunks by splitting at each of the points in "finalSplitPoints" // that we haven't already split by. 
boost::optional<Chunk> currentChunk( - chunkManager->findIntersectingChunkWithSimpleCollation(allSplits[0])); + chunkManager->findIntersectingChunkWithSimpleCollation(finalSplitPoints[0])); std::vector<BSONObj> subSplits; - for (unsigned i = 0; i <= allSplits.size(); i++) { - if (i == allSplits.size() || !currentChunk->containsKey(allSplits[i])) { + for (unsigned i = 0; i <= finalSplitPoints.size(); i++) { + if (i == finalSplitPoints.size() || !currentChunk->containsKey(finalSplitPoints[i])) { if (!subSplits.empty()) { auto splitStatus = shardutil::splitChunkAtMultiplePoints( opCtx, @@ -543,12 +543,12 @@ void migrateAndFurtherSplitInitialChunks(OperationContext* opCtx, subSplits.clear(); } - if (i < allSplits.size()) { + if (i < finalSplitPoints.size()) { currentChunk.emplace( - chunkManager->findIntersectingChunkWithSimpleCollation(allSplits[i])); + chunkManager->findIntersectingChunkWithSimpleCollation(finalSplitPoints[i])); } } else { - BSONObj splitPoint(allSplits[i]); + BSONObj splitPoint(finalSplitPoints[i]); // Do not split on the boundaries if (currentChunk->getMin().woCompare(splitPoint) == 0) { @@ -791,22 +791,12 @@ public: result << "collectionUUID" << *uuid; } - // Make sure the cached metadata for the collection knows that we are now sharded - catalogCache->invalidateShardedCollection(nss); - - // Free the distlocks to allow the splits and migrations below to proceed. - collDistLock.reset(); - dbDistLock.reset(); - lk.unlock(); - - // Step 7. Migrate initial chunks to distribute them across shards. - migrateAndFurtherSplitInitialChunks(opCtx, - nss, - numShards, - shardIds, - isEmpty, - shardKeyPattern, - std::move(shardCollResponse.getAllSplits())); + auto routingInfo = + uassertStatusOK(catalogCache->getCollectionRoutingInfoWithRefresh(opCtx, nss)); + uassert(ErrorCodes::ConflictingOperationInProgress, + "Collection was successfully written as sharded but got dropped before it " + "could be evenly distributed", + routingInfo.cm()); return true; } @@ -823,18 +813,18 @@ public: } // Step 5. - std::vector<BSONObj> initSplits; // there will be at most numShards-1 of these - std::vector<BSONObj> allSplits; // all of the initial desired split points + std::vector<BSONObj> initialSplitPoints; // there will be at most numShards-1 of these + std::vector<BSONObj> finalSplitPoints; // all of the desired split points if (request.getInitialSplitPoints()) { - initSplits = std::move(*request.getInitialSplitPoints()); + initialSplitPoints = std::move(*request.getInitialSplitPoints()); } else { InitialSplitPolicy::calculateHashedSplitPointsForEmptyCollection( shardKeyPattern, isEmpty, numShards, request.getNumInitialChunks(), - &initSplits, - &allSplits); + &initialSplitPoints, + &finalSplitPoints); } LOG(0) << "CMD: shardcollection: " << cmdObj; @@ -854,7 +844,7 @@ public: shardKeyPattern, *request.getCollation(), request.getUnique(), - initSplits, + initialSplitPoints, distributeInitialChunks, primaryShardId); result << "collectionsharded" << nss.ns(); @@ -871,7 +861,7 @@ public: // Step 7. Migrate initial chunks to distribute them across shards. 
migrateAndFurtherSplitInitialChunks( - opCtx, nss, numShards, shardIds, isEmpty, shardKeyPattern, allSplits); + opCtx, nss, numShards, shardIds, isEmpty, shardKeyPattern, finalSplitPoints); return true; } diff --git a/src/mongo/db/s/config/initial_split_policy.cpp b/src/mongo/db/s/config/initial_split_policy.cpp index 879a7b90339..ca16683c2ad 100644 --- a/src/mongo/db/s/config/initial_split_policy.cpp +++ b/src/mongo/db/s/config/initial_split_policy.cpp @@ -32,6 +32,13 @@ #include "mongo/db/s/config/initial_split_policy.h" +#include "mongo/bson/util/bson_extract.h" +#include "mongo/client/read_preference.h" +#include "mongo/db/logical_clock.h" +#include "mongo/s/balancer_configuration.h" +#include "mongo/s/catalog/sharding_catalog_client_impl.h" +#include "mongo/s/grid.h" +#include "mongo/s/shard_util.h" #include "mongo/util/log.h" namespace mongo { @@ -105,7 +112,8 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::generateShardColle const ShardId& databasePrimaryShardId, const Timestamp& validAfter, const std::vector<BSONObj>& splitPoints, - const std::vector<ShardId>& shardIds) { + const std::vector<ShardId>& shardIds, + const int numContiguousChunksPerShard) { invariant(!shardIds.empty()); ChunkVersion version(1, 0, OID::gen()); @@ -125,7 +133,11 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::generateShardColle // It's possible there are no split points or fewer split points than total number of // shards, and we need to be sure that at least one chunk is placed on the primary shard - const ShardId shardId = (i == 0) ? databasePrimaryShardId : shardIds[i % shardIds.size()]; + const ShardId shardId = (i == 0 && splitPoints.size() + 1 < shardIds.size()) + ? databasePrimaryShardId + : shardIds[(i / numContiguousChunksPerShard) % shardIds.size()]; chunks.emplace_back(nss, ChunkRange(min, max), version, shardId); auto& chunk = chunks.back(); @@ -137,4 +149,93 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::generateShardColle return {std::move(chunks)}; } +InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::writeFirstChunksToConfig( + OperationContext* opCtx, + const NamespaceString& nss, + const ShardKeyPattern& shardKeyPattern, + const ShardId& primaryShardId, + const std::vector<BSONObj>& splitPoints, + const bool distributeInitialChunks, + const int numContiguousChunksPerShard) { + const auto& keyPattern = shardKeyPattern.getKeyPattern(); + + std::vector<BSONObj> finalSplitPoints; + std::vector<ShardId> shardIds; + + if (splitPoints.empty()) { + // If no split points were specified, use the shard's data distribution to determine them + auto primaryShard = + uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, primaryShardId)); + + auto result = uassertStatusOK(primaryShard->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryPreferred}, + nss.db().toString(), + BSON("count" << nss.coll()), + Shard::RetryPolicy::kIdempotent)); + + long long numObjects = 0; + uassertStatusOK(result.commandStatus); + uassertStatusOK(bsonExtractIntegerField(result.response, "n", &numObjects)); + + // Refresh the balancer settings to ensure the chunk size setting, which is sent as part of + // the splitVector command and affects the number of chunks returned, has been loaded.
+ uassertStatusOK(Grid::get(opCtx)->getBalancerConfiguration()->refreshAndCheck(opCtx)); + + if (numObjects > 0) { + finalSplitPoints = uassertStatusOK(shardutil::selectChunkSplitPoints( + opCtx, + primaryShardId, + nss, + shardKeyPattern, + ChunkRange(keyPattern.globalMin(), keyPattern.globalMax()), + Grid::get(opCtx)->getBalancerConfiguration()->getMaxChunkSizeBytes(), + 0)); + } + + // If docs already exist for the collection, must use primary shard, + // otherwise defer to passed-in distribution option. + if (numObjects == 0 && distributeInitialChunks) { + Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds); + } else { + shardIds.push_back(primaryShardId); + } + } else { + // Make sure points are unique and ordered + auto orderedPts = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); + + for (const auto& splitPoint : splitPoints) { + orderedPts.insert(splitPoint); + } + + for (const auto& splitPoint : orderedPts) { + finalSplitPoints.push_back(splitPoint); + } + + if (distributeInitialChunks) { + Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds); + } else { + shardIds.push_back(primaryShardId); + } + } + + auto initialChunks = InitialSplitPolicy::generateShardCollectionInitialChunks( + nss, + shardKeyPattern, + primaryShardId, + LogicalClock::get(opCtx)->getClusterTime().asTimestamp(), + finalSplitPoints, + shardIds, + numContiguousChunksPerShard); + for (const auto& chunk : initialChunks.chunks) { + uassertStatusOK(Grid::get(opCtx)->catalogClient()->insertConfigDocument( + opCtx, + ChunkType::ConfigNS, + chunk.toConfigBSON(), + ShardingCatalogClient::kMajorityWriteConcern)); + } + + return initialChunks; +} + } // namespace mongo diff --git a/src/mongo/db/s/config/initial_split_policy.h b/src/mongo/db/s/config/initial_split_policy.h index 3379554e870..521dbf123d6 100644 --- a/src/mongo/db/s/config/initial_split_policy.h +++ b/src/mongo/db/s/config/initial_split_policy.h @@ -68,9 +68,19 @@ public: }; /** - * Produces the the initial chunks that need to be written for a collection which is being + * Produces the initial chunks that need to be written for a collection which is being * newly-sharded. The function performs some basic validation of the input parameters, but there * is no checking whether the collection contains any data or not. + * + * Chunks are assigned to a shard in a round-robin fashion, numContiguousChunksPerShard (k) + * chunks at a time. For example, the first k chunks are assigned to the first available shard, + * and the next k chunks are assigned to the second available shard, and so on. + * numContiguousChunksPerShard should only be > 1 when we do not pre-split the range + * into larger chunks and then split the resulting chunks on the destination shards as in + * configSvrShardCollection; it should equal (the number of final split points + 1) divided + * by (the number of initial split points + 1). It serves to preserve the ordering/contiguousness + * of chunks when they are split by shardSvrShardCollection, so that it yields the exact same + * shard assignments as configSvrShardCollection.
*/ static ShardCollectionConfig generateShardCollectionInitialChunks( const NamespaceString& nss, @@ -78,7 +88,21 @@ public: const ShardId& databasePrimaryShardId, const Timestamp& validAfter, const std::vector<BSONObj>& splitPoints, - const std::vector<ShardId>& shardIds); + const std::vector<ShardId>& shardIds, + const int numContiguousChunksPerShard = 1); + + /** + * Creates and writes to the config server the first chunks for a newly sharded collection. + * Returns the created chunks. + */ + static ShardCollectionConfig writeFirstChunksToConfig( + OperationContext* opCtx, + const NamespaceString& nss, + const ShardKeyPattern& shardKeyPattern, + const ShardId& primaryShardId, + const std::vector<BSONObj>& splitPoints, + const bool distributeInitialChunks, + const int numContiguousChunksPerShard = 1); }; } // namespace mongo diff --git a/src/mongo/db/s/config/initial_split_policy_test.cpp b/src/mongo/db/s/config/initial_split_policy_test.cpp index 7ab068feeac..0c0370a37e5 100644 --- a/src/mongo/db/s/config/initial_split_policy_test.cpp +++ b/src/mongo/db/s/config/initial_split_policy_test.cpp @@ -79,8 +79,8 @@ void checkCalculatedHashedSplitPoints(bool isHashed, numInitialChunks, &initialSplitPoints, &finalSplitPoints); - assertBSONObjVectorsAreEqual(initialSplitPoints, *expectedInitialSplitPoints); - assertBSONObjVectorsAreEqual(finalSplitPoints, *expectedFinalSplitPoints); + assertBSONObjVectorsAreEqual(*expectedInitialSplitPoints, initialSplitPoints); + assertBSONObjVectorsAreEqual(*expectedFinalSplitPoints, finalSplitPoints); } TEST(CalculateHashedSplitPointsTest, EmptyCollectionMoreChunksThanShards) { @@ -128,6 +128,14 @@ TEST(CalculateHashedSplitPointsTest, NotHashedWithInitialSplitsFails) { class GenerateInitialSplitChunksTest : public unittest::Test { public: + const std::vector<BSONObj>& hashedChunkBounds() { + return _hashedChunkBounds; + } + + const std::vector<BSONObj>& hashedSplitPoints() { + return _splitPoints; + } + ChunkType makeChunk(const BSONObj min, const BSONObj max, const ShardId shardId) { ChunkVersion version(1, 0, OID::gen()); ChunkType chunk(_nss, ChunkRange(min, max), version, shardId); @@ -155,6 +163,14 @@ private: const ShardKeyPattern _shardKeyPattern = makeShardKeyPattern(true); const std::vector<ShardId> _shardIds = {ShardId("testShard0"), ShardId("testShard1")}; const Timestamp _timeStamp{Date_t::now()}; + const KeyPattern& keyPattern = shardKeyPattern().getKeyPattern(); + const std::vector<BSONObj> _hashedChunkBounds = {keyPattern.globalMin(), + BSON("x" << -4611686018427387902LL), + BSON("x" << 0), + BSON("x" << 4611686018427387902LL), + keyPattern.globalMax()}; + const std::vector<BSONObj> _splitPoints{_hashedChunkBounds.begin() + 1, + _hashedChunkBounds.end() - 1}; }; TEST_F(GenerateInitialSplitChunksTest, NoSplitPoints) { @@ -170,22 +186,34 @@ TEST_F(GenerateInitialSplitChunksTest, NoSplitPoints) { } TEST_F(GenerateInitialSplitChunksTest, SplitPointsMoreThanAvailableShards) { - const auto& keyPattern = shardKeyPattern().getKeyPattern(); - const std::vector<BSONObj> expectedChunkBounds = {keyPattern.globalMin(), - BSON("x" << -4611686018427387902LL), - BSON("x" << 0), - BSON("x" << 4611686018427387902LL), - keyPattern.globalMax()}; - const std::vector<BSONObj> splitPoints(expectedChunkBounds.begin() + 1, - expectedChunkBounds.end() - 1); const auto shardCollectionConfig = InitialSplitPolicy::generateShardCollectionInitialChunks( - nss(), shardKeyPattern(), shardIds()[0], timeStamp(), splitPoints, shardIds()); + nss(), shardKeyPattern(), 
shardIds()[0], timeStamp(), hashedSplitPoints(), shardIds()); + + ASSERT_EQ(hashedSplitPoints().size() + 1, shardCollectionConfig.chunks.size()); + + // chunks should be distributed in a round-robin manner + const std::vector<ShardId> expectedShardIds = { + ShardId("testShard0"), ShardId("testShard1"), ShardId("testShard0"), ShardId("testShard1")}; + for (unsigned long i = 0; i < hashedChunkBounds().size() - 1; ++i) { + const auto expectedChunk = + makeChunk(hashedChunkBounds()[i], hashedChunkBounds()[i + 1], expectedShardIds[i]); + ASSERT_BSONOBJ_EQ(expectedChunk.toShardBSON().removeField("lastmod"), + shardCollectionConfig.chunks[i].toShardBSON().removeField("lastmod")); + } +} + +TEST_F(GenerateInitialSplitChunksTest, SplitPointsNumContiguousChunksPerShardsGreaterThanOne) { + const auto shardCollectionConfig = InitialSplitPolicy::generateShardCollectionInitialChunks( + nss(), shardKeyPattern(), shardIds()[0], timeStamp(), hashedSplitPoints(), shardIds(), 2); + + ASSERT_EQ(hashedSplitPoints().size() + 1, shardCollectionConfig.chunks.size()); - ASSERT_EQ(splitPoints.size() + 1, shardCollectionConfig.chunks.size()); - for (unsigned long i = 0; i < expectedChunkBounds.size() - 1; ++i) { - // chunks should be distributed in a round-robin manner - const auto expectedChunk = makeChunk( - expectedChunkBounds[i], expectedChunkBounds[i + 1], shardIds()[i % shardIds().size()]); + // chunks should be distributed in a round-robin manner two chunks at a time + const std::vector<ShardId> expectedShardIds = { + ShardId("testShard0"), ShardId("testShard0"), ShardId("testShard1"), ShardId("testShard1")}; + for (unsigned long i = 0; i < hashedChunkBounds().size() - 1; ++i) { + const auto expectedChunk = + makeChunk(hashedChunkBounds()[i], hashedChunkBounds()[i + 1], expectedShardIds[i]); ASSERT_BSONOBJ_EQ(expectedChunk.toShardBSON().removeField("lastmod"), shardCollectionConfig.chunks[i].toShardBSON().removeField("lastmod")); } diff --git a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp index e01d06d11a9..dd1ffdad078 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp @@ -130,96 +130,6 @@ boost::optional<UUID> checkCollectionOptions(OperationContext* opCtx, } // namespace -/** - * Creates and writes to the config server the first chunks for a newly sharded collection. Returns - * the version generated for the collection. 
- */ -InitialSplitPolicy::ShardCollectionConfig createFirstChunks(OperationContext* opCtx, - const NamespaceString& nss, - const ShardKeyPattern& shardKeyPattern, - const ShardId& primaryShardId, - const std::vector<BSONObj>& initPoints, - const bool distributeInitialChunks) { - const auto& keyPattern = shardKeyPattern.getKeyPattern(); - - std::vector<BSONObj> splitPoints; - std::vector<ShardId> shardIds; - - if (initPoints.empty()) { - // If no split points were specified use the shard's data distribution to determine them - auto primaryShard = - uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, primaryShardId)); - - auto result = uassertStatusOK(primaryShard->runCommandWithFixedRetryAttempts( - opCtx, - ReadPreferenceSetting{ReadPreference::PrimaryPreferred}, - nss.db().toString(), - BSON("count" << nss.coll()), - Shard::RetryPolicy::kIdempotent)); - - long long numObjects = 0; - uassertStatusOK(result.commandStatus); - uassertStatusOK(bsonExtractIntegerField(result.response, "n", &numObjects)); - - // Refresh the balancer settings to ensure the chunk size setting, which is sent as part of - // the splitVector command and affects the number of chunks returned, has been loaded. - uassertStatusOK(Grid::get(opCtx)->getBalancerConfiguration()->refreshAndCheck(opCtx)); - - if (numObjects > 0) { - splitPoints = uassertStatusOK(shardutil::selectChunkSplitPoints( - opCtx, - primaryShardId, - nss, - shardKeyPattern, - ChunkRange(keyPattern.globalMin(), keyPattern.globalMax()), - Grid::get(opCtx)->getBalancerConfiguration()->getMaxChunkSizeBytes(), - 0)); - } - - // If docs already exist for the collection, must use primary shard, - // otherwise defer to passed-in distribution option. - if (numObjects == 0 && distributeInitialChunks) { - Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds); - } else { - shardIds.push_back(primaryShardId); - } - } else { - // Make sure points are unique and ordered - auto orderedPts = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); - - for (const auto& initPoint : initPoints) { - orderedPts.insert(initPoint); - } - - for (const auto& initPoint : orderedPts) { - splitPoints.push_back(initPoint); - } - - if (distributeInitialChunks) { - Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds); - } else { - shardIds.push_back(primaryShardId); - } - } - - auto initialChunks = InitialSplitPolicy::generateShardCollectionInitialChunks( - nss, - shardKeyPattern, - primaryShardId, - LogicalClock::get(opCtx)->getClusterTime().asTimestamp(), - splitPoints, - shardIds); - for (const auto& chunk : initialChunks.chunks) { - uassertStatusOK(Grid::get(opCtx)->catalogClient()->insertConfigDocument( - opCtx, - ChunkType::ConfigNS, - chunk.toConfigBSON(), - ShardingCatalogClient::kMajorityWriteConcern)); - } - - return initialChunks; -} - void checkForExistingChunks(OperationContext* opCtx, const NamespaceString& nss) { BSONObjBuilder countBuilder; countBuilder.append("count", ChunkType::ConfigNS.coll()); @@ -455,7 +365,7 @@ void ShardingCatalogManager::shardCollection(OperationContext* opCtx, const ShardKeyPattern& fieldsAndOrder, const BSONObj& defaultCollation, bool unique, - const vector<BSONObj>& initPoints, + const vector<BSONObj>& splitPoints, const bool distributeInitialChunks, const ShardId& dbPrimaryShardId) { const auto catalogClient = Grid::get(opCtx)->catalogClient(); @@ -475,7 +385,7 @@ void ShardingCatalogManager::shardCollection(OperationContext* opCtx, uuid->appendToBuilder(&collectionDetail, "uuid"); } 
collectionDetail.append("primary", primaryShard->toString()); - collectionDetail.append("numChunks", static_cast<int>(initPoints.size() + 1)); + collectionDetail.append("numChunks", static_cast<int>(splitPoints.size() + 1)); uassertStatusOK(catalogClient->logChange(opCtx, "shardCollection.start", nss.ns(), @@ -490,8 +400,8 @@ void ShardingCatalogManager::shardCollection(OperationContext* opCtx, ->makeFromBSON(defaultCollation)); } - const auto initialChunks = createFirstChunks( - opCtx, nss, fieldsAndOrder, dbPrimaryShardId, initPoints, distributeInitialChunks); + const auto initialChunks = InitialSplitPolicy::writeFirstChunksToConfig( + opCtx, nss, fieldsAndOrder, dbPrimaryShardId, splitPoints, distributeInitialChunks); { CollectionType coll; diff --git a/src/mongo/db/s/shardsvr_shard_collection.cpp b/src/mongo/db/s/shardsvr_shard_collection.cpp index 10a9b5e1d8e..ec2528b6835 100644 --- a/src/mongo/db/s/shardsvr_shard_collection.cpp +++ b/src/mongo/db/s/shardsvr_shard_collection.cpp @@ -32,7 +32,6 @@ #include "mongo/bson/simple_bsonelement_comparator.h" #include "mongo/bson/util/bson_extract.h" -#include "mongo/client/connpool.h" #include "mongo/db/audit.h" #include "mongo/db/auth/action_type.h" #include "mongo/db/auth/authorization_session.h" @@ -56,7 +55,7 @@ #include "mongo/s/catalog/type_tags.h" #include "mongo/s/commands/cluster_commands_helpers.h" #include "mongo/s/grid.h" -#include "mongo/s/request_types/set_shard_version_request.h" +#include "mongo/s/request_types/clone_collection_options_from_primary_shard_gen.h" #include "mongo/s/request_types/shard_collection_gen.h" #include "mongo/s/shard_util.h" #include "mongo/util/log.h" @@ -139,11 +138,11 @@ BSONObj makeCreateIndexesCmd(const NamespaceString& nss, * * If the collection is empty and no index on the shard key exists, creates the required index. */ -void validateShardKeyAgainstExistingIndexes(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& proposedKey, - const ShardKeyPattern& shardKeyPattern, - const ShardsvrShardCollection& request) { +void createCollectionOrValidateExisting(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& proposedKey, + const ShardKeyPattern& shardKeyPattern, + const ShardsvrShardCollection& request) { // The proposed shard key must be validated against the set of existing indexes. // In particular, we must ensure the following constraints // @@ -173,7 +172,7 @@ void validateShardKeyAgainstExistingIndexes(OperationContext* opCtx, DBDirectClient localClient(opCtx); std::list<BSONObj> indexes = localClient.getIndexSpecs(nss.ns()); - // 1. Verify consistency with existing unique indexes + // 1. Verify consistency with existing unique indexes for (const auto& idx : indexes) { BSONObj currentKey = idx["key"].embeddedObject(); bool isUnique = idx["unique"].trueValue(); @@ -370,96 +369,6 @@ boost::optional<UUID> getUUIDFromPrimaryShard(OperationContext* opCtx, const Nam return uassertStatusOK(UUID::parse(collectionInfo["uuid"])); } -/** - * Creates and writes to the config server the first chunks for a newly sharded collection. Returns - * the version generated for the collection. 
- */ -InitialSplitPolicy::ShardCollectionConfig createFirstChunks(OperationContext* opCtx, - const NamespaceString& nss, - const ShardKeyPattern& shardKeyPattern, - const ShardId& primaryShardId, - const std::vector<BSONObj>& initPoints, - const bool distributeInitialChunks) { - const auto& keyPattern = shardKeyPattern.getKeyPattern(); - - std::vector<BSONObj> splitPoints; - std::vector<ShardId> shardIds; - - if (initPoints.empty()) { - // If no split points were specified use the shard's data distribution to determine them - auto primaryShard = - uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, primaryShardId)); - - auto result = uassertStatusOK(primaryShard->runCommandWithFixedRetryAttempts( - opCtx, - ReadPreferenceSetting{ReadPreference::PrimaryPreferred}, - nss.db().toString(), - BSON("count" << nss.coll()), - Shard::RetryPolicy::kIdempotent)); - - long long numObjects = 0; - uassertStatusOK(result.commandStatus); - uassertStatusOK(bsonExtractIntegerField(result.response, "n", &numObjects)); - - // Refresh the balancer settings to ensure the chunk size setting, which is sent as part of - // the splitVector command and affects the number of chunks returned, has been loaded. - uassertStatusOK(Grid::get(opCtx)->getBalancerConfiguration()->refreshAndCheck(opCtx)); - - if (numObjects > 0) { - splitPoints = uassertStatusOK(shardutil::selectChunkSplitPoints( - opCtx, - primaryShardId, - nss, - shardKeyPattern, - ChunkRange(keyPattern.globalMin(), keyPattern.globalMax()), - Grid::get(opCtx)->getBalancerConfiguration()->getMaxChunkSizeBytes(), - 0)); - } - - // If docs already exist for the collection, must use primary shard, - // otherwise defer to passed-in distribution option. - if (numObjects == 0 && distributeInitialChunks) { - Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds); - } else { - shardIds.push_back(primaryShardId); - } - } else { - // Make sure points are unique and ordered - auto orderedPts = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); - - for (const auto& initPoint : initPoints) { - orderedPts.insert(initPoint); - } - - for (const auto& initPoint : orderedPts) { - splitPoints.push_back(initPoint); - } - - if (distributeInitialChunks) { - Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds); - } else { - shardIds.push_back(primaryShardId); - } - } - - auto initialChunks = InitialSplitPolicy::generateShardCollectionInitialChunks( - nss, - shardKeyPattern, - primaryShardId, - LogicalClock::get(opCtx)->getClusterTime().asTimestamp(), - splitPoints, - shardIds); - for (const auto& chunk : initialChunks.chunks) { - uassertStatusOK(Grid::get(opCtx)->catalogClient()->insertConfigDocument( - opCtx, - ChunkType::ConfigNS, - chunk.toConfigBSON(), - ShardingCatalogClient::kMajorityWriteConcern)); - } - - return initialChunks; -} - void checkForExistingChunks(OperationContext* opCtx, const NamespaceString& nss) { BSONObjBuilder countBuilder; countBuilder.append("count", ChunkType::ConfigNS.coll()); @@ -501,13 +410,15 @@ void shardCollection(OperationContext* opCtx, const ShardKeyPattern& fieldsAndOrder, const BSONObj& defaultCollation, bool unique, - const std::vector<BSONObj>& initPoints, - const bool distributeInitialChunks, - const ShardId& dbPrimaryShardId) { + const std::vector<BSONObj>& splitPoints, + const bool fromMapReduce, + const ShardId& dbPrimaryShardId, + const int numContiguousChunksPerShard) { const auto catalogClient = Grid::get(opCtx)->catalogClient(); const auto shardRegistry = 
Grid::get(opCtx)->shardRegistry(); const auto primaryShard = uassertStatusOK(shardRegistry->getShard(opCtx, dbPrimaryShardId)); + const bool distributeChunks = fromMapReduce || fieldsAndOrder.isHashedPattern(); // Fail if there are partially written chunks from a previous failed shardCollection. checkForExistingChunks(opCtx, nss); @@ -521,7 +432,7 @@ void shardCollection(OperationContext* opCtx, uuid->appendToBuilder(&collectionDetail, "uuid"); } collectionDetail.append("primary", primaryShard->toString()); - collectionDetail.append("numChunks", static_cast<int>(initPoints.size() + 1)); + collectionDetail.append("numChunks", static_cast<int>(splitPoints.size() + 1)); uassertStatusOK(catalogClient->logChange(opCtx, "shardCollection.start", nss.ns(), @@ -536,8 +447,14 @@ void shardCollection(OperationContext* opCtx, ->makeFromBSON(defaultCollation)); } - const auto initialChunks = createFirstChunks( - opCtx, nss, fieldsAndOrder, dbPrimaryShardId, initPoints, distributeInitialChunks); + const auto initialChunks = + InitialSplitPolicy::writeFirstChunksToConfig(opCtx, + nss, + fieldsAndOrder, + dbPrimaryShardId, + splitPoints, + distributeChunks, + numContiguousChunksPerShard); { CollectionType coll; @@ -556,6 +473,51 @@ void shardCollection(OperationContext* opCtx, forceShardFilteringMetadataRefresh(opCtx, nss); + // Create collections on all shards that will receive chunks. We need to do this after we mark + // the collection as sharded so that the shards will update their metadata correctly. We do not + // want to do this for mapReduce. + if (!fromMapReduce) { + std::vector<AsyncRequestsSender::Request> requests; + for (const auto& chunk : initialChunks.chunks) { + if (chunk.getShard() == dbPrimaryShardId) + continue; + + CloneCollectionOptionsFromPrimaryShard cloneCollectionOptionsFromPrimaryShardRequest( + nss); + cloneCollectionOptionsFromPrimaryShardRequest.setPrimaryShard( + dbPrimaryShardId.toString()); + cloneCollectionOptionsFromPrimaryShardRequest.setDbName(nss.db()); + + requests.emplace_back( + chunk.getShard(), + cloneCollectionOptionsFromPrimaryShardRequest.toBSON( + BSON("writeConcern" << ShardingCatalogClient::kMajorityWriteConcern.toBSON()))); + } + + if (!requests.empty()) { + auto responses = gatherResponses(opCtx, + nss.db(), + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kIdempotent, + requests); + + // If any shards fail to create the collection, fail the entire shardCollection command + // (potentially leaving an incompletely created sharded collection) + for (const auto& response : responses) { + auto shardResponse = uassertStatusOKWithContext( + std::move(response.swResponse), + str::stream() << "Unable to create collection on " << response.shardId); + auto status = getStatusFromCommandResult(shardResponse.data); + uassertStatusOK(status.withContext( + str::stream() << "Unable to create collection on " << response.shardId)); + + auto wcStatus = getWriteConcernStatusFromCommandResult(shardResponse.data); + uassertStatusOK(wcStatus.withContext( + str::stream() << "Unable to create collection on " << response.shardId)); + } + } + } + catalogClient ->logChange(opCtx, "shardCollection.end", @@ -619,9 +581,9 @@ public: auto proposedKey(request.getKey().getOwned()); ShardKeyPattern shardKeyPattern(proposedKey); - validateShardKeyAgainstExistingIndexes(opCtx, nss, proposedKey, shardKeyPattern, request); + createCollectionOrValidateExisting(opCtx, nss, proposedKey, shardKeyPattern, request); - // read zone info + // Read zone info auto configServer =
Grid::get(opCtx)->shardRegistry()->getConfigShard(); auto tagStatus = configServer->exhaustiveFindOnConfig(opCtx, @@ -634,7 +596,7 @@ public: uassertStatusOK(tagStatus); const auto& tagDocList = tagStatus.getValue().docs; - if (tagDocList.size() > 0) { + if (!tagDocList.empty()) { validateShardKeyAgainstExistingZones(opCtx, proposedKey, shardKeyPattern, tagDocList); } @@ -647,6 +609,7 @@ public: Grid::get(opCtx)->shardRegistry()->reload(opCtx); auto shardRegistry = Grid::get(opCtx)->shardRegistry(); + shardRegistry->reload(opCtx); DBDirectClient localClient(opCtx); bool isEmpty = (localClient.count(nss.ns()) == 0); @@ -657,18 +620,19 @@ public: // SERVER-35794 TODO: Use zone info to determine which shards should have chunks placed on // them. - std::vector<BSONObj> initSplits; // there will be at most numShards-1 of these - std::vector<BSONObj> allSplits; // all of the initial desired split points + + std::vector<BSONObj> initialSplitPoints; + std::vector<BSONObj> finalSplitPoints; if (request.getInitialSplitPoints()) { - initSplits = std::move(*request.getInitialSplitPoints()); + finalSplitPoints = std::move(*request.getInitialSplitPoints()); } else { InitialSplitPolicy::calculateHashedSplitPointsForEmptyCollection( shardKeyPattern, isEmpty, numShards, request.getNumInitialChunks(), - &initSplits, - &allSplits); + &initialSplitPoints, + &finalSplitPoints); } result << "collectionsharded" << nss.ns(); @@ -676,19 +640,19 @@ public: result << "collectionUUID" << *uuid; } - result << "allSplits" << allSplits; - critSec.enterCommitPhase(); LOG(0) << "CMD: shardcollection: " << cmdObj; audit::logShardCollection(Client::getCurrent(), nss.ns(), proposedKey, request.getUnique()); - // The initial chunks are distributed evenly across shards only if the initial split - // points were specified in the request, i.e., by mapReduce. Otherwise, all the initial - // chunks are placed on the primary shard, and may be distributed across shards through - // migrations (below) if using a hashed shard key. - const bool distributeInitialChunks = bool(request.getInitialSplitPoints()); + // The initial chunks are distributed evenly across shards if the initial split points were + // specified in the request by mapReduce or if we are using a hashed shard key. Otherwise, + // all the initial chunks are placed on the primary shard. + const bool fromMapReduce = bool(request.getInitialSplitPoints()); + const int numContiguousChunksPerShard = initialSplitPoints.empty() + ? 1 + : (finalSplitPoints.size() + 1) / (initialSplitPoints.size() + 1); // Step 6. Actually shard the collection. shardCollection(opCtx, @@ -697,9 +661,10 @@ public: shardKeyPattern, *request.getCollation(), request.getUnique(), - initSplits, - distributeInitialChunks, - ShardingState::get(opCtx)->getShardName()); + finalSplitPoints, + fromMapReduce, + ShardingState::get(opCtx)->getShardName(), + numContiguousChunksPerShard); return true; } diff --git a/src/mongo/s/commands/cluster_commands_helpers.cpp b/src/mongo/s/commands/cluster_commands_helpers.cpp index 2820cc62d91..86597aea280 100644 --- a/src/mongo/s/commands/cluster_commands_helpers.cpp +++ b/src/mongo/s/commands/cluster_commands_helpers.cpp @@ -151,9 +151,8 @@ std::vector<AsyncRequestsSender::Request> buildVersionedRequestsForTargetedShard return requests; } -/** - * Throws StaleConfigException if any remote returns a stale shardVersion error. 
- */ +} // namespace + std::vector<AsyncRequestsSender::Response> gatherResponses( OperationContext* opCtx, StringData dbName, @@ -232,8 +231,6 @@ std::vector<AsyncRequestsSender::Response> gatherResponses( return responses; } -} // namespace - BSONObj appendShardVersion(BSONObj cmdObj, ChunkVersion version) { BSONObjBuilder cmdWithVersionBob(std::move(cmdObj)); version.appendToCommand(&cmdWithVersionBob); diff --git a/src/mongo/s/commands/cluster_commands_helpers.h b/src/mongo/s/commands/cluster_commands_helpers.h index 4517f951699..bc848435357 100644 --- a/src/mongo/s/commands/cluster_commands_helpers.h +++ b/src/mongo/s/commands/cluster_commands_helpers.h @@ -48,6 +48,20 @@ namespace mongo { void appendWriteConcernErrorToCmdResponse(const ShardId& shardID, const BSONElement& wcErrorElem, BSONObjBuilder& responseBuilder); + +/** + * Dispatches all the specified requests in parallel and waits until all complete, returning a + * vector of the same size and positions as that of 'requests'. + * + * Throws StaleConfigException if any remote returns a stale shardVersion error. + */ +std::vector<AsyncRequestsSender::Response> gatherResponses( + OperationContext* opCtx, + StringData dbName, + const ReadPreferenceSetting& readPref, + Shard::RetryPolicy retryPolicy, + const std::vector<AsyncRequestsSender::Request>& requests); + /** * Returns a copy of 'cmdObj' with 'version' appended. */ diff --git a/src/mongo/s/request_types/shard_collection.idl b/src/mongo/s/request_types/shard_collection.idl index 97e97aa5ba5..1adcd923527 100644 --- a/src/mongo/s/request_types/shard_collection.idl +++ b/src/mongo/s/request_types/shard_collection.idl @@ -104,7 +104,7 @@ structs: type: uuid description: "The UUID of the collection that just got sharded." optional: true - + ShardsvrShardCollection: description: "The internal shardCollection command on a primary shard" strict: false @@ -148,11 +148,7 @@ structs: type: uuid description: "The UUID of the collection that just got sharded." optional: true - allSplits: - type: array<object> - description: "All split points." - optional: false - + ConfigsvrCommitShardCollection: description: "The internal commitShardCollection command on the config server" strict: false
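As a closing note on the arithmetic: shardsvr_shard_collection.cpp computes `numContiguousChunksPerShard` as `(finalSplitPoints.size() + 1) / (initialSplitPoints.size() + 1)`, falling back to 1 when there are no initial split points. Below is a small illustrative sketch of that expression with assumed values; the real vectors come from `calculateHashedSplitPointsForEmptyCollection`, which (per the comments in the diff) produces at most numShards - 1 initial split points and numInitialChunks - 1 final split points.

```cpp
#include <cstddef>
#include <iostream>

// Illustrative reproduction of the expression added in
// shardsvr_shard_collection.cpp; the split-point counts here are assumed
// inputs, not derived from an actual shard key.
std::size_t numContiguousChunksPerShard(std::size_t numInitialSplitPoints,
                                        std::size_t numFinalSplitPoints) {
    // Matches: initialSplitPoints.empty() ? 1
    //          : (finalSplitPoints.size() + 1) / (initialSplitPoints.size() + 1)
    if (numInitialSplitPoints == 0)
        return 1;
    return (numFinalSplitPoints + 1) / (numInitialSplitPoints + 1);
}

int main() {
    // Assumed example: 2 shards -> 1 initial split point ("big chunk" boundary);
    // numInitialChunks = 4 -> 3 final split points; k = (3 + 1) / (1 + 1) = 2,
    // i.e. each shard receives 2 contiguous final chunks.
    std::cout << numContiguousChunksPerShard(1, 3) << '\n';  // prints 2
    return 0;
}
```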