author     Cheahuychou Mao <cheahuychou.mao@mongodb.com>    2018-07-27 09:56:07 -0400
committer  Cheahuychou Mao <cheahuychou.mao@mongodb.com>    2018-08-22 10:49:30 -0400
commit     e9226c9e1e1f0f4990279f207c41f78289e7c2ea (patch)
tree       27cdc3f2f3ddf595cbdc12f630c05781779c901c /src/mongo
parent     6e85d023feb6e87ba476108f4f6f149e4bd2449d (diff)
download   mongo-e9226c9e1e1f0f4990279f207c41f78289e7c2ea.tar.gz
SERVER-36102 Create initial chunks on appropriate shards for zoned sharding
(cherry picked from commit e444d98f411566bcd983e7ca2eccfda1bc14fe5a)
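The placement rule this change introduces (generateShardCollectionInitialZonedChunks in the diff below) can be summarized with a small, self-contained sketch. The std-based types and the generateZonedChunks name are illustrative stand-ins for the server's ChunkType, TagsType, and ShardId, not the actual implementation:

```cpp
// Illustrative sketch of the zoned placement rule added in initial_split_policy.cpp
// (std types stand in for ChunkType, TagsType and ShardId; not the server code itself).
#include <cstddef>
#include <map>
#include <string>
#include <vector>

struct Range { long min, max; };                     // stand-in for a chunk/zone key range
struct Chunk { Range range; std::string shardId; };  // stand-in for ChunkType
struct Zone  { Range range; std::string name; };     // stand-in for TagsType

// Zones are assumed sorted and non-overlapping. Ranges not covered by any zone are
// assigned round-robin across all shards; each zone's chunk goes to the first shard
// that carries that zone's tag.
std::vector<Chunk> generateZonedChunks(
    const std::vector<Zone>& zones,
    const std::map<std::string, std::vector<std::string>>& tagToShards,
    const std::vector<std::string>& allShards,
    long globalMin,
    long globalMax) {
    std::vector<Chunk> chunks;
    long lastMax = globalMin;
    std::size_t rr = 0;  // round-robin cursor for the "hole" chunks
    for (const auto& zone : zones) {
        if (zone.range.min > lastMax) {  // hole between the previous zone and this one
            chunks.push_back({{lastMax, zone.range.min}, allShards[rr++ % allShards.size()]});
        }
        chunks.push_back({zone.range, tagToShards.at(zone.name).front()});
        lastMax = zone.range.max;
    }
    if (lastMax < globalMax) {  // the zones do not reach the global max key
        chunks.push_back({{lastMax, globalMax}, allShards[rr++ % allShards.size()]});
    }
    return chunks;
}
```

Gaps not covered by any zone are spread round-robin across all shards, while each zone's chunk lands on the first shard carrying that zone's tag; this is the behavior the new unit tests below exercise.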
Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/db/s/config/initial_split_policy.cpp                            | 172
-rw-r--r--  src/mongo/db/s/config/initial_split_policy.h                              |  18
-rw-r--r--  src/mongo/db/s/config/initial_split_policy_test.cpp                       | 311
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp |   3
-rw-r--r--  src/mongo/db/s/shardsvr_shard_collection.cpp                              | 102
5 files changed, 485 insertions(+), 121 deletions(-)
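For collections without zones, the patch keeps the existing split-point path, where numContiguousChunksPerShard controls how many adjacent chunks each shard receives before assignment moves to the next shard. A rough stand-alone sketch of that rule, with illustrative names rather than the server's types:

```cpp
// Illustrative sketch of the round-robin rule used by generateShardCollectionInitialChunks
// when no zones are defined (names and types are stand-ins, not the server's).
#include <cstddef>
#include <string>
#include <vector>

std::vector<std::string> assignChunksRoundRobin(std::size_t numChunks,
                                                const std::vector<std::string>& shardIds,
                                                const std::string& primaryShardId,
                                                std::size_t numContiguousChunksPerShard) {
    std::vector<std::string> owners;
    for (std::size_t i = 0; i < numChunks; ++i) {
        // With fewer chunks than shards, the first chunk stays on the primary shard;
        // otherwise chunk i goes to shardIds[(i / numContiguousChunksPerShard) % n].
        const bool placeOnPrimary = (i == 0 && numChunks < shardIds.size());
        owners.push_back(placeOnPrimary
                             ? primaryShardId
                             : shardIds[(i / numContiguousChunksPerShard) % shardIds.size()]);
    }
    return owners;
}
```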
diff --git a/src/mongo/db/s/config/initial_split_policy.cpp b/src/mongo/db/s/config/initial_split_policy.cpp index 2e642c73d56..1ce7a2297c7 100644 --- a/src/mongo/db/s/config/initial_split_policy.cpp +++ b/src/mongo/db/s/config/initial_split_policy.cpp @@ -38,12 +38,79 @@ #include "mongo/db/server_options.h" #include "mongo/s/balancer_configuration.h" #include "mongo/s/catalog/sharding_catalog_client_impl.h" +#include "mongo/s/catalog/type_shard.h" #include "mongo/s/grid.h" #include "mongo/s/shard_util.h" #include "mongo/util/log.h" namespace mongo { +/* + * Creates a chunk based on the given arguments, appends it to 'chunks', and + * increments the given chunk version + */ +void appendChunk(const NamespaceString& nss, + const BSONObj& min, + const BSONObj& max, + ChunkVersion* version, + const Timestamp& validAfter, + const ShardId& shardId, + std::vector<ChunkType>* chunks) { + chunks->emplace_back(nss, ChunkRange(min, max), *version, shardId); + if (serverGlobalParams.featureCompatibility.getVersion() >= + ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo40) { + auto& chunk = chunks->back(); + chunk.setHistory({ChunkHistory(validAfter, shardId)}); + } + version->incMinor(); +} + +/* + * Returns a map mapping each tag name to a vector of shard ids with that tag name + */ +StringMap<std::vector<ShardId>> getTagToShardIds(OperationContext* opCtx, + const std::vector<TagsType>& tags) { + StringMap<std::vector<ShardId>> tagToShardIds; + if (tags.empty()) { + return tagToShardIds; + } + + // get all docs in config.shards + auto configServer = Grid::get(opCtx)->shardRegistry()->getConfigShard(); + auto findShardsStatus = + configServer->exhaustiveFindOnConfig(opCtx, + ReadPreferenceSetting(ReadPreference::Nearest), + repl::ReadConcernLevel::kMajorityReadConcern, + ShardType::ConfigNS, + BSONObj(), + BSONObj(), + 0); + uassertStatusOK(findShardsStatus); + uassert(ErrorCodes::InternalError, + str::stream() << "cannot find any shard documents", + !findShardsStatus.getValue().docs.empty()); + + for (const auto& tag : tags) { + tagToShardIds[tag.getTag()] = {}; + } + + const auto& shardDocList = findShardsStatus.getValue().docs; + + for (const auto& shardDoc : shardDocList) { + auto shardParseStatus = ShardType::fromBSON(shardDoc); + uassertStatusOK(shardParseStatus); + auto parsedShard = shardParseStatus.getValue(); + for (const auto& tag : parsedShard.getTags()) { + auto it = tagToShardIds.find(tag); + if (it != tagToShardIds.end()) { + it->second.push_back(parsedShard.getName()); + } + } + } + + return tagToShardIds; +} + void InitialSplitPolicy::calculateHashedSplitPointsForEmptyCollection( const ShardKeyPattern& shardKeyPattern, bool isEmpty, @@ -113,17 +180,11 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::generateShardColle const ShardId& databasePrimaryShardId, const Timestamp& validAfter, const std::vector<BSONObj>& splitPoints, - const std::vector<ShardId>& shardIds, + const std::vector<ShardId>& allShardIds, const int numContiguousChunksPerShard) { - invariant(!shardIds.empty()); + invariant(!allShardIds.empty()); ChunkVersion version(1, 0, OID::gen()); - - const size_t numChunksToCreate = splitPoints.size() + 1; - - log() << "Going to create " << numChunksToCreate << " chunk(s) for: " << nss - << " using new epoch " << version.epoch(); - const auto& keyPattern(shardKeyPattern.getKeyPattern()); std::vector<ChunkType> chunks; @@ -134,21 +195,65 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::generateShardColle // It's possible there are no 
split points or fewer split points than total number of // shards, and we need to be sure that at least one chunk is placed on the primary shard - const ShardId shardId = (i == 0 && splitPoints.size() + 1 < shardIds.size()) + const ShardId shardId = (i == 0 && splitPoints.size() + 1 < allShardIds.size()) ? databasePrimaryShardId - : shardIds[(i / numContiguousChunksPerShard) % shardIds.size()]; + : allShardIds[(i / numContiguousChunksPerShard) % allShardIds.size()]; - // const ShardId shardId = shardIds[(i / numContiguousChunksPerShard) % shardIds.size()]; + appendChunk(nss, min, max, &version, validAfter, shardId, &chunks); + } - chunks.emplace_back(nss, ChunkRange(min, max), version, shardId); - if (serverGlobalParams.featureCompatibility.getVersion() >= - ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo40) { - auto& chunk = chunks.back(); - chunk.setHistory({ChunkHistory(validAfter, shardId)}); - } + log() << "Created " << chunks.size() << " chunk(s) for: " << nss << " using new epoch " + << version.epoch(); + + return {std::move(chunks)}; +} + +InitialSplitPolicy::ShardCollectionConfig +InitialSplitPolicy::generateShardCollectionInitialZonedChunks( + const NamespaceString& nss, + const ShardKeyPattern& shardKeyPattern, + const Timestamp& validAfter, + const std::vector<TagsType>& tags, + const StringMap<std::vector<ShardId>>& tagToShards, + const std::vector<ShardId>& allShardIds) { + invariant(!allShardIds.empty()); + + uassert(ErrorCodes::InvalidOptions, + str::stream() << "cannot find zone split points because no zone docs were found", + !tags.empty()); - version.incMinor(); + ChunkVersion version(1, 0, OID::gen()); + const auto& keyPattern = shardKeyPattern.getKeyPattern(); + auto lastChunkMax = keyPattern.globalMin(); + int indx = 0; + + std::vector<ChunkType> chunks; + + for (const auto& tag : tags) { + if (tag.getMinKey().woCompare(lastChunkMax) > 0) { + // create a chunk for the hole between zones + const ShardId shardId = allShardIds[indx++ % allShardIds.size()]; + appendChunk(nss, lastChunkMax, tag.getMinKey(), &version, validAfter, shardId, &chunks); + } + // create a chunk for the zone + appendChunk(nss, + tag.getMinKey(), + tag.getMaxKey(), + &version, + validAfter, + tagToShards.find(tag.getTag())->second[0], + &chunks); + lastChunkMax = tag.getMaxKey(); } + if (lastChunkMax.woCompare(keyPattern.globalMax()) < 0) { + // existing zones do not span to $maxKey so create a chunk for that + const ShardId shardId = allShardIds[indx++ % allShardIds.size()]; + appendChunk( + nss, lastChunkMax, keyPattern.globalMax(), &version, validAfter, shardId, &chunks); + } + + log() << "Created " << chunks.size() << " chunk(s) for: " << nss << " using new epoch " + << version.epoch(); return {std::move(chunks)}; } @@ -159,6 +264,7 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::writeFirstChunksTo const ShardKeyPattern& shardKeyPattern, const ShardId& primaryShardId, const std::vector<BSONObj>& splitPoints, + const std::vector<TagsType>& tags, const bool distributeInitialChunks, const int numContiguousChunksPerShard) { const auto& keyPattern = shardKeyPattern.getKeyPattern(); @@ -166,8 +272,9 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::writeFirstChunksTo std::vector<BSONObj> finalSplitPoints; std::vector<ShardId> shardIds; - if (splitPoints.empty()) { - // If no split points were specified use the shard's data distribution to determine them + if (splitPoints.empty() && tags.empty()) { + // If neither split points nor tags were specified use 
the shard's data distribution to + // determine them auto primaryShard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, primaryShardId)); @@ -223,14 +330,23 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::writeFirstChunksTo } } - auto initialChunks = InitialSplitPolicy::generateShardCollectionInitialChunks( - nss, - shardKeyPattern, - primaryShardId, - LogicalClock::get(opCtx)->getClusterTime().asTimestamp(), - finalSplitPoints, - shardIds, - numContiguousChunksPerShard); + const auto tagToShards = getTagToShardIds(opCtx, tags); + const Timestamp& validAfter = LogicalClock::get(opCtx)->getClusterTime().asTimestamp(); + + uassert(ErrorCodes::InternalError, + str::stream() << "cannot generate initial chunks based on both split points and tags", + tags.empty() || finalSplitPoints.empty()); + + auto initialChunks = tags.empty() + ? InitialSplitPolicy::generateShardCollectionInitialChunks(nss, + shardKeyPattern, + primaryShardId, + validAfter, + finalSplitPoints, + shardIds, + numContiguousChunksPerShard) + : InitialSplitPolicy::generateShardCollectionInitialZonedChunks( + nss, shardKeyPattern, validAfter, tags, tagToShards, shardIds); for (const auto& chunk : initialChunks.chunks) { uassertStatusOK(Grid::get(opCtx)->catalogClient()->insertConfigDocument( opCtx, diff --git a/src/mongo/db/s/config/initial_split_policy.h b/src/mongo/db/s/config/initial_split_policy.h index 521dbf123d6..64af117472f 100644 --- a/src/mongo/db/s/config/initial_split_policy.h +++ b/src/mongo/db/s/config/initial_split_policy.h @@ -33,6 +33,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/namespace_string.h" #include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/catalog/type_tags.h" #include "mongo/s/shard_id.h" #include "mongo/s/shard_key_pattern.h" @@ -88,10 +89,23 @@ public: const ShardId& databasePrimaryShardId, const Timestamp& validAfter, const std::vector<BSONObj>& splitPoints, - const std::vector<ShardId>& shardIds, + const std::vector<ShardId>& allShardIds, const int numContiguousChunksPerShard = 1); /** + * Produces the initial chunks that need to be written for a collection which is being + * newly-sharded based on the given tags. Chunks that do not correspond to any pre-defined + * zones are assigned to available shards in a round-robin fashion. + */ + static ShardCollectionConfig generateShardCollectionInitialZonedChunks( + const NamespaceString& nss, + const ShardKeyPattern& shardKeyPattern, + const Timestamp& validAfter, + const std::vector<TagsType>& tags, + const StringMap<std::vector<ShardId>>& tagToShards, + const std::vector<ShardId>& allShardIds); + + /** * Creates and writes to the config server the first chunks for a newly sharded collection. * Returns the created chunks. 
*/ @@ -101,8 +115,8 @@ public: const ShardKeyPattern& shardKeyPattern, const ShardId& primaryShardId, const std::vector<BSONObj>& splitPoints, + const std::vector<TagsType>& tags, const bool distributeInitialChunks, const int numContiguousChunksPerShard = 1); }; - } // namespace mongo diff --git a/src/mongo/db/s/config/initial_split_policy_test.cpp b/src/mongo/db/s/config/initial_split_policy_test.cpp index 0c0370a37e5..ee593935a2f 100644 --- a/src/mongo/db/s/config/initial_split_policy_test.cpp +++ b/src/mongo/db/s/config/initial_split_policy_test.cpp @@ -31,6 +31,9 @@ #include "mongo/platform/basic.h" #include "mongo/db/s/config/initial_split_policy.h" +#include "mongo/s/catalog/type_shard.h" +#include "mongo/s/catalog/type_tags.h" +#include "mongo/s/config_server_test_fixture.h" #include "mongo/unittest/unittest.h" #include "mongo/util/log.h" @@ -51,6 +54,20 @@ void assertBSONObjVectorsAreEqual(const std::vector<BSONObj>& expected, } /** + * Asserts that the given vectors of ChunkType objects are equal + */ +void assertChunkVectorsAreEqual(const std::vector<ChunkType>& expected, + const std::vector<ChunkType>& actual) { + ASSERT_EQ(expected.size(), actual.size()); + for (auto expectedIt = expected.begin(), actualIt = actual.begin(); + expectedIt != expected.end() && actualIt != actual.end(); + ++expectedIt, ++actualIt) { + ASSERT_BSONOBJ_EQ((*expectedIt).toShardBSON().removeField("lastmod"), + (*actualIt).toShardBSON().removeField("lastmod")); + } +} + +/** * Returns a test hashed shard key pattern if isHashed is true. * Otherwise, returns a regular shard key pattern. */ @@ -128,20 +145,37 @@ TEST(CalculateHashedSplitPointsTest, NotHashedWithInitialSplitsFails) { class GenerateInitialSplitChunksTest : public unittest::Test { public: - const std::vector<BSONObj>& hashedChunkBounds() { - return _hashedChunkBounds; - } + /** + * Returns a vector of ChunkType objects for the given chunk ranges. + * shardIds[i] is the id of shard for the chunk for chunkRanges[i]. + * Checks that chunkRanges and shardIds have the same length. 
+ */ + const std::vector<ChunkType> makeChunks(const std::vector<ChunkRange> chunkRanges, + const std::vector<ShardId> shardIds) { + ASSERT_EQ(chunkRanges.size(), shardIds.size()); + std::vector<ChunkType> chunks; - const std::vector<BSONObj>& hashedSplitPoints() { - return _splitPoints; + for (unsigned long i = 0; i < chunkRanges.size(); ++i) { + ChunkVersion version(1, 0, OID::gen()); + ChunkType chunk(_nss, chunkRanges[i], version, shardIds[i]); + chunk.setHistory({ChunkHistory(_timeStamp, shardIds[i])}); + chunks.push_back(chunk); + } + return chunks; } - ChunkType makeChunk(const BSONObj min, const BSONObj max, const ShardId shardId) { - ChunkVersion version(1, 0, OID::gen()); - ChunkType chunk(_nss, ChunkRange(min, max), version, shardId); - chunk.setHistory({ChunkHistory(_timeStamp, shardId)}); - return chunk; + /** + * Returns a vector of numShards shard ids with shard names + * prefixed by _shardName + */ + const std::vector<ShardId> makeShardIds(const int numShards) { + std::vector<ShardId> shardIds; + for (int i = 0; i < numShards; i++) { + shardIds.push_back(shardId(std::to_string(i))); + } + return shardIds; } + const NamespaceString nss() { return _nss; } @@ -150,8 +184,12 @@ public: return _shardKeyPattern; } - const std::vector<ShardId> shardIds() { - return _shardIds; + const KeyPattern& keyPattern() { + return _shardKeyPattern.getKeyPattern(); + } + + const ShardId shardId(std::string shardNum) { + return ShardId(_shardName + shardNum); } const Timestamp timeStamp() { @@ -161,62 +199,233 @@ public: private: const NamespaceString _nss{"test.foo"}; const ShardKeyPattern _shardKeyPattern = makeShardKeyPattern(true); - const std::vector<ShardId> _shardIds = {ShardId("testShard0"), ShardId("testShard1")}; + const std::string _shardName = "testShard"; const Timestamp _timeStamp{Date_t::now()}; - const KeyPattern& keyPattern = shardKeyPattern().getKeyPattern(); - const std::vector<BSONObj> _hashedChunkBounds = {keyPattern.globalMin(), - BSON("x" << -4611686018427387902LL), - BSON("x" << 0), - BSON("x" << 4611686018427387902LL), - keyPattern.globalMax()}; - const std::vector<BSONObj> _splitPoints{_hashedChunkBounds.begin() + 1, - _hashedChunkBounds.end() - 1}; }; -TEST_F(GenerateInitialSplitChunksTest, NoSplitPoints) { +class GenerateInitialHashedSplitChunksTest : public GenerateInitialSplitChunksTest { +public: + const std::vector<BSONObj>& hashedSplitPoints() { + return _splitPoints; + } + + const std::vector<ChunkRange>& hashedChunkRanges() { + return _chunkRanges; + } + +private: + const std::vector<BSONObj> _splitPoints{ + BSON("x" << -4611686018427387902LL), BSON("x" << 0), BSON("x" << 4611686018427387902LL)}; + const std::vector<ChunkRange> _chunkRanges = { + ChunkRange(keyPattern().globalMin(), BSON("x" << -4611686018427387902LL)), + ChunkRange(BSON("x" << -4611686018427387902LL), BSON("x" << 0)), + ChunkRange(BSON("x" << 0), BSON("x" << 4611686018427387902LL)), + ChunkRange(BSON("x" << 4611686018427387902LL), keyPattern().globalMax()), + }; +}; + +TEST_F(GenerateInitialHashedSplitChunksTest, NoSplitPoints) { const std::vector<BSONObj> splitPoints; + const std::vector<ShardId> shardIds = makeShardIds(2); const auto shardCollectionConfig = InitialSplitPolicy::generateShardCollectionInitialChunks( - nss(), shardKeyPattern(), shardIds()[0], timeStamp(), splitPoints, shardIds()); + nss(), shardKeyPattern(), shardIds[0], timeStamp(), splitPoints, shardIds); - const auto& keyPattern = shardKeyPattern().getKeyPattern(); - const auto expectedChunk = - 
makeChunk(keyPattern.globalMin(), keyPattern.globalMax(), shardIds()[0]); - ASSERT_EQ(1U, shardCollectionConfig.chunks.size()); - ASSERT_BSONOBJ_EQ(expectedChunk.toShardBSON(), shardCollectionConfig.chunks[0].toShardBSON()); + // there should only be one chunk + const auto expectedChunks = makeChunks( + {ChunkRange(keyPattern().globalMin(), keyPattern().globalMax())}, {shardId("0")}); + assertChunkVectorsAreEqual(expectedChunks, shardCollectionConfig.chunks); } -TEST_F(GenerateInitialSplitChunksTest, SplitPointsMoreThanAvailableShards) { +TEST_F(GenerateInitialHashedSplitChunksTest, SplitPointsMoreThanAvailableShards) { + const std::vector<ShardId> shardIds = makeShardIds(2); const auto shardCollectionConfig = InitialSplitPolicy::generateShardCollectionInitialChunks( - nss(), shardKeyPattern(), shardIds()[0], timeStamp(), hashedSplitPoints(), shardIds()); + nss(), shardKeyPattern(), shardIds[0], timeStamp(), hashedSplitPoints(), shardIds); - ASSERT_EQ(hashedSplitPoints().size() + 1, shardCollectionConfig.chunks.size()); + // // chunks should be distributed in a round-robin manner + const std::vector<ChunkType> expectedChunks = + makeChunks(hashedChunkRanges(), {shardId("0"), shardId("1"), shardId("0"), shardId("1")}); + assertChunkVectorsAreEqual(expectedChunks, shardCollectionConfig.chunks); +} - // chunks should be distributed in a round-robin manner - const std::vector<ShardId> expectedShardIds = { - ShardId("testShard0"), ShardId("testShard1"), ShardId("testShard0"), ShardId("testShard1")}; - for (unsigned long i = 0; i < hashedChunkBounds().size() - 1; ++i) { - const auto expectedChunk = - makeChunk(hashedChunkBounds()[i], hashedChunkBounds()[i + 1], expectedShardIds[i]); - ASSERT_BSONOBJ_EQ(expectedChunk.toShardBSON().removeField("lastmod"), - shardCollectionConfig.chunks[i].toShardBSON().removeField("lastmod")); +TEST_F(GenerateInitialHashedSplitChunksTest, + SplitPointsNumContiguousChunksPerShardsGreaterThanOne) { + const std::vector<ShardId> shardIds = makeShardIds(2); + const auto shardCollectionConfig = InitialSplitPolicy::generateShardCollectionInitialChunks( + nss(), shardKeyPattern(), shardIds[0], timeStamp(), hashedSplitPoints(), shardIds, 2); + + // chunks should be distributed in a round-robin manner two chunks at a time + const std::vector<ChunkType> expectedChunks = + makeChunks(hashedChunkRanges(), {shardId("0"), shardId("0"), shardId("1"), shardId("1")}); + assertChunkVectorsAreEqual(expectedChunks, shardCollectionConfig.chunks); +} + +class GenerateShardCollectionInitialZonedChunksTest : public GenerateInitialSplitChunksTest { +public: + /** + * Calls generateShardCollectionInitialZonedChunks according to the given arguments + * and asserts that returned chunks match with the chunks created using expectedChunkRanges + * and expectedShardIds. 
+ */ + void checkGeneratedInitialZoneChunks(const std::vector<TagsType>& tags, + const int numShards, + const std::vector<ChunkRange>& expectedChunkRanges, + const std::vector<ShardId>& expectedShardIds) { + const auto shardCollectionConfig = + InitialSplitPolicy::generateShardCollectionInitialZonedChunks( + nss(), + shardKeyPattern(), + timeStamp(), + tags, + makeTagToShards(numShards), + makeShardIds(numShards)); + const std::vector<ChunkType> expectedChunks = + makeChunks(expectedChunkRanges, expectedShardIds); + assertChunkVectorsAreEqual(expectedChunks, shardCollectionConfig.chunks); + } + + const std::string shardKey() { + return _shardKey; + } + + const std::string zoneName(std::string zoneNum) { + return _zoneName + zoneNum; + } + + TagsType makeTag(const ChunkRange range, std::string zoneName) { + BSONObjBuilder tagDocBuilder; + tagDocBuilder.append("_id", + BSON(TagsType::ns(nss().ns()) << TagsType::min(range.getMin()))); + tagDocBuilder.append(TagsType::ns(), nss().ns()); + tagDocBuilder.append(TagsType::min(), range.getMin()); + tagDocBuilder.append(TagsType::max(), range.getMax()); + tagDocBuilder.append(TagsType::tag(), zoneName); + const auto parseStatus = TagsType::fromBSON(tagDocBuilder.obj()); + uassertStatusOK(parseStatus); + return parseStatus.getValue(); } + + /** + * Returns a map of size numTags mapping _zoneName\d to _shardName\d + */ + StringMap<std::vector<ShardId>> makeTagToShards(const int numTags) { + StringMap<std::vector<ShardId>> tagToShards; + for (int i = 0; i < numTags; i++) { + tagToShards[zoneName(std::to_string(i))] = {shardId(std::to_string(i))}; + } + return tagToShards; + } + +private: + const ShardKeyPattern _shardKeyPattern = makeShardKeyPattern(true); + const std::string _zoneName = "zoneName"; + const std::string _shardKey = "x"; +}; + +TEST_F(GenerateShardCollectionInitialZonedChunksTest, PredefinedZonesSpanFromMinToMax) { + const std::vector<ChunkRange> expectedChunkRanges = { + ChunkRange(keyPattern().globalMin(), keyPattern().globalMax()), // corresponds to a zone + }; + const std::vector<TagsType> tags = {makeTag(expectedChunkRanges[0], zoneName("0"))}; + const std::vector<ShardId> expectedShardIds = {shardId("0")}; + checkGeneratedInitialZoneChunks(tags, 1, expectedChunkRanges, expectedShardIds); } -TEST_F(GenerateInitialSplitChunksTest, SplitPointsNumContiguousChunksPerShardsGreaterThanOne) { - const auto shardCollectionConfig = InitialSplitPolicy::generateShardCollectionInitialChunks( - nss(), shardKeyPattern(), shardIds()[0], timeStamp(), hashedSplitPoints(), shardIds(), 2); +TEST_F(GenerateShardCollectionInitialZonedChunksTest, PredefinedZonesSpanDoNotSpanFromMinToMax) { + const std::vector<ChunkRange> expectedChunkRanges = { + ChunkRange(keyPattern().globalMin(), BSON(shardKey() << 0)), + ChunkRange(BSON(shardKey() << 0), BSON(shardKey() << 10)), // corresponds to a zone + ChunkRange(BSON(shardKey() << 10), keyPattern().globalMax()), + }; + const std::vector<TagsType> tags = {makeTag(expectedChunkRanges[1], zoneName("0"))}; + const std::vector<ShardId> expectedShardIds = {shardId("0"), shardId("0"), shardId("1")}; + checkGeneratedInitialZoneChunks(tags, 2, expectedChunkRanges, expectedShardIds); +} - ASSERT_EQ(hashedSplitPoints().size() + 1, shardCollectionConfig.chunks.size()); +TEST_F(GenerateShardCollectionInitialZonedChunksTest, PredefinedZonesContainGlobalMin) { + const std::vector<ChunkRange> expectedChunkRanges = { + ChunkRange(keyPattern().globalMin(), BSON(shardKey() << 0)), // corresponds to a zone + 
ChunkRange(BSON(shardKey() << 0), keyPattern().globalMax()), + }; + const std::vector<TagsType> tags = {makeTag(expectedChunkRanges[0], zoneName("0"))}; + const std::vector<ShardId> expectedShardIds = {shardId("0"), shardId("0")}; + checkGeneratedInitialZoneChunks(tags, 2, expectedChunkRanges, expectedShardIds); +} - // chunks should be distributed in a round-robin manner two chunks at a time +TEST_F(GenerateShardCollectionInitialZonedChunksTest, PredefinedZonesContainGlobalMax) { + const std::vector<ChunkRange> expectedChunkRanges = { + ChunkRange(keyPattern().globalMin(), BSON(shardKey() << 0)), + ChunkRange(BSON(shardKey() << 0), keyPattern().globalMax()), // corresponds to a zone + }; + const std::vector<TagsType> tags = {makeTag(expectedChunkRanges[1], zoneName("0"))}; + const std::vector<ShardId> expectedShardIds = {shardId("0"), shardId("0")}; + checkGeneratedInitialZoneChunks(tags, 2, expectedChunkRanges, expectedShardIds); +} + +TEST_F(GenerateShardCollectionInitialZonedChunksTest, PredefinedZonesContainGlobalMinAndMax) { + const std::vector<ChunkRange> expectedChunkRanges = { + ChunkRange(keyPattern().globalMin(), BSON(shardKey() << 0)), // corresponds to a zone + ChunkRange(BSON(shardKey() << 0), BSON(shardKey() << 10)), + ChunkRange(BSON(shardKey() << 10), keyPattern().globalMax()), // corresponds to a zone + }; + const std::vector<TagsType> tags = {makeTag(expectedChunkRanges[0], zoneName("0")), + makeTag(expectedChunkRanges[2], zoneName("1"))}; + const std::vector<ShardId> expectedShardIds = {shardId("0"), shardId("0"), shardId("1")}; + checkGeneratedInitialZoneChunks(tags, 2, expectedChunkRanges, expectedShardIds); +} + +TEST_F(GenerateShardCollectionInitialZonedChunksTest, PredefinedZonesContiguous) { + const std::vector<ChunkRange> expectedChunkRanges = { + ChunkRange(keyPattern().globalMin(), BSON(shardKey() << 0)), + ChunkRange(BSON(shardKey() << 0), BSON(shardKey() << 10)), // corresponds to a zone + ChunkRange(BSON(shardKey() << 10), BSON(shardKey() << 20)), // corresponds to a zone + ChunkRange(BSON(shardKey() << 20), keyPattern().globalMax()), + }; + const std::vector<TagsType> tags = {makeTag(expectedChunkRanges[1], zoneName("1")), + makeTag(expectedChunkRanges[2], zoneName("0"))}; const std::vector<ShardId> expectedShardIds = { - ShardId("testShard0"), ShardId("testShard0"), ShardId("testShard1"), ShardId("testShard1")}; - for (unsigned long i = 0; i < hashedChunkBounds().size() - 1; ++i) { - const auto expectedChunk = - makeChunk(hashedChunkBounds()[i], hashedChunkBounds()[i + 1], expectedShardIds[i]); - ASSERT_BSONOBJ_EQ(expectedChunk.toShardBSON().removeField("lastmod"), - shardCollectionConfig.chunks[i].toShardBSON().removeField("lastmod")); - } + shardId("0"), shardId("1"), shardId("0"), shardId("1")}; + checkGeneratedInitialZoneChunks(tags, 2, expectedChunkRanges, expectedShardIds); +} + +TEST_F(GenerateShardCollectionInitialZonedChunksTest, PredefinedZonesNotContiguous) { + const std::vector<ChunkRange> expectedChunkRanges = { + ChunkRange(keyPattern().globalMin(), BSON(shardKey() << 0)), + ChunkRange(BSON(shardKey() << 0), BSON(shardKey() << 10)), // corresponds to a zone + ChunkRange(BSON(shardKey() << 10), BSON(shardKey() << 20)), + ChunkRange(BSON(shardKey() << 20), BSON(shardKey() << 30)), // corresponds to a zone + ChunkRange(BSON(shardKey() << 30), keyPattern().globalMax()), + }; + const std::vector<TagsType> tags = {makeTag(expectedChunkRanges[1], zoneName("0")), + makeTag(expectedChunkRanges[3], zoneName("1"))}; + const std::vector<ShardId> 
expectedShardIds = { + shardId("0"), shardId("0"), shardId("1"), shardId("1"), shardId("2")}; + checkGeneratedInitialZoneChunks(tags, 3, expectedChunkRanges, expectedShardIds); +} + +TEST_F(GenerateShardCollectionInitialZonedChunksTest, NumRemainingChunksGreaterThanNumShards) { + const std::vector<ChunkRange> expectedChunkRanges = { + ChunkRange(keyPattern().globalMin(), BSON(shardKey() << 0)), + ChunkRange(BSON(shardKey() << 0), BSON(shardKey() << 10)), // corresponds to a zone + ChunkRange(BSON(shardKey() << 10), BSON(shardKey() << 20)), + ChunkRange(BSON(shardKey() << 20), BSON(shardKey() << 30)), + ChunkRange(BSON(shardKey() << 30), keyPattern().globalMax()), + }; + const std::vector<TagsType> tags = {makeTag(expectedChunkRanges[1], zoneName("0")), + makeTag(expectedChunkRanges[3], zoneName("1"))}; + // shard assignment should wrap around to the first shard + const std::vector<ShardId> expectedShardIds = { + shardId("0"), shardId("0"), shardId("1"), shardId("1"), shardId("0")}; + checkGeneratedInitialZoneChunks(tags, 2, expectedChunkRanges, expectedShardIds); +} + +TEST_F(GenerateShardCollectionInitialZonedChunksTest, EmptyTagsShouldFail) { + const std::vector<ChunkRange> expectedChunkRanges; + const std::vector<TagsType> tags; + const std::vector<ShardId> expectedShardIds; + ASSERT_THROWS_CODE( + checkGeneratedInitialZoneChunks(tags, 1, expectedChunkRanges, expectedShardIds), + AssertionException, + ErrorCodes::InvalidOptions); } } // namespace diff --git a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp index dd1ffdad078..d0b5d303608 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp @@ -400,8 +400,9 @@ void ShardingCatalogManager::shardCollection(OperationContext* opCtx, ->makeFromBSON(defaultCollation)); } + std::vector<TagsType> tags; const auto initialChunks = InitialSplitPolicy::writeFirstChunksToConfig( - opCtx, nss, fieldsAndOrder, dbPrimaryShardId, splitPoints, distributeInitialChunks); + opCtx, nss, fieldsAndOrder, dbPrimaryShardId, splitPoints, tags, distributeInitialChunks); { CollectionType coll; diff --git a/src/mongo/db/s/shardsvr_shard_collection.cpp b/src/mongo/db/s/shardsvr_shard_collection.cpp index c93ef0cb29c..a35be56bd72 100644 --- a/src/mongo/db/s/shardsvr_shard_collection.cpp +++ b/src/mongo/db/s/shardsvr_shard_collection.cpp @@ -274,30 +274,19 @@ void createCollectionOrValidateExisting(OperationContext* opCtx, void validateShardKeyAgainstExistingZones(OperationContext* opCtx, const BSONObj& proposedKey, const ShardKeyPattern& shardKeyPattern, - const std::vector<BSONObj>& tagDocList) { - for (const auto& tagDoc : tagDocList) { - auto tagParseStatus = TagsType::fromBSON(tagDoc); - uassertStatusOK(tagParseStatus); - const auto& parsedTagDoc = tagParseStatus.getValue(); - uassert(ErrorCodes::InvalidOptions, - str::stream() << "the min and max of the existing zone " << parsedTagDoc.getMinKey() - << " -->> " - << parsedTagDoc.getMaxKey() - << " have non-matching number of keys", - parsedTagDoc.getMinKey().nFields() == parsedTagDoc.getMaxKey().nFields()); - - BSONObjIterator tagMinFields(parsedTagDoc.getMinKey()); - BSONObjIterator tagMaxFields(parsedTagDoc.getMaxKey()); + const std::vector<TagsType>& tags) { + for (const auto& tag : tags) { + BSONObjIterator tagMinFields(tag.getMinKey()); + BSONObjIterator tagMaxFields(tag.getMaxKey()); BSONObjIterator 
proposedFields(proposedKey); while (tagMinFields.more() && proposedFields.more()) { BSONElement tagMinKeyElement = tagMinFields.next(); BSONElement tagMaxKeyElement = tagMaxFields.next(); uassert(ErrorCodes::InvalidOptions, - str::stream() << "the min and max of the existing zone " - << parsedTagDoc.getMinKey() + str::stream() << "the min and max of the existing zone " << tag.getMinKey() << " -->> " - << parsedTagDoc.getMaxKey() + << tag.getMaxKey() << " have non-matching keys", str::equals(tagMinKeyElement.fieldName(), tagMaxKeyElement.fieldName())); @@ -309,9 +298,9 @@ void validateShardKeyAgainstExistingZones(OperationContext* opCtx, uassert(ErrorCodes::InvalidOptions, str::stream() << "the proposed shard key " << proposedKey.toString() << " does not match with the shard key of the existing zone " - << parsedTagDoc.getMinKey() + << tag.getMinKey() << " -->> " - << parsedTagDoc.getMaxKey(), + << tag.getMaxKey(), match); if (ShardKeyPattern::isHashedPatternEl(proposedKeyElement) && @@ -320,9 +309,9 @@ void validateShardKeyAgainstExistingZones(OperationContext* opCtx, str::stream() << "cannot do hash sharding with the proposed key " << proposedKey.toString() << " because there exists a zone " - << parsedTagDoc.getMinKey() + << tag.getMinKey() << " -->> " - << parsedTagDoc.getMaxKey() + << tag.getMaxKey() << " whose boundaries are not " "of type NumberLong"); } @@ -411,6 +400,7 @@ void shardCollection(OperationContext* opCtx, const BSONObj& defaultCollation, bool unique, const std::vector<BSONObj>& splitPoints, + const std::vector<TagsType>& tags, const bool fromMapReduce, const ShardId& dbPrimaryShardId, const int numContiguousChunksPerShard) { @@ -418,7 +408,8 @@ void shardCollection(OperationContext* opCtx, const auto shardRegistry = Grid::get(opCtx)->shardRegistry(); const auto primaryShard = uassertStatusOK(shardRegistry->getShard(opCtx, dbPrimaryShardId)); - const bool distributeChunks = fromMapReduce || fieldsAndOrder.isHashedPattern(); + const bool distributeChunks = + fromMapReduce || fieldsAndOrder.isHashedPattern() || !tags.empty(); // Fail if there are partially written chunks from a previous failed shardCollection. 
checkForExistingChunks(opCtx, nss); @@ -453,6 +444,7 @@ void shardCollection(OperationContext* opCtx, fieldsAndOrder, dbPrimaryShardId, splitPoints, + tags, distributeChunks, numContiguousChunksPerShard); @@ -527,6 +519,43 @@ void shardCollection(OperationContext* opCtx, .ignore(); } +std::vector<TagsType> getExistingTags(OperationContext* opCtx, const NamespaceString& nss) { + auto configServer = Grid::get(opCtx)->shardRegistry()->getConfigShard(); + auto tagStatus = + configServer->exhaustiveFindOnConfig(opCtx, + kConfigReadSelector, + repl::ReadConcernLevel::kMajorityReadConcern, + TagsType::ConfigNS, + BSON(TagsType::ns(nss.ns())), + BSONObj(), + 0); + uassertStatusOK(tagStatus); + + const auto& tagDocList = tagStatus.getValue().docs; + std::vector<TagsType> tags; + for (const auto& tagDoc : tagDocList) { + auto tagParseStatus = TagsType::fromBSON(tagDoc); + uassertStatusOK(tagParseStatus); + const auto& parsedTag = tagParseStatus.getValue(); + uassert(ErrorCodes::InvalidOptions, + str::stream() << "the min and max of the existing zone " << parsedTag.getMinKey() + << " -->> " + << parsedTag.getMaxKey() + << " have non-matching number of keys", + parsedTag.getMinKey().nFields() == parsedTag.getMaxKey().nFields()); + + const auto& rangeMin = parsedTag.getMinKey(); + const auto& rangeMax = parsedTag.getMaxKey(); + uassert(ErrorCodes::InvalidOptions, + str::stream() << "zone " << rangeMin << " -->> " << rangeMax + << " has min greater than max", + rangeMin.woCompare(rangeMax) < 0); + + tags.push_back(parsedTag); + } + return tags; +} + /** * Internal sharding command run on primary shard server to shard a collection. */ @@ -584,20 +613,10 @@ public: createCollectionOrValidateExisting(opCtx, nss, proposedKey, shardKeyPattern, request); // Read zone info - auto configServer = Grid::get(opCtx)->shardRegistry()->getConfigShard(); - auto tagStatus = - configServer->exhaustiveFindOnConfig(opCtx, - kConfigReadSelector, - repl::ReadConcernLevel::kMajorityReadConcern, - TagsType::ConfigNS, - BSON(TagsType::ns(nss.ns())), - BSONObj(), - 0); - uassertStatusOK(tagStatus); - const auto& tagDocList = tagStatus.getValue().docs; - - if (!tagDocList.empty()) { - validateShardKeyAgainstExistingZones(opCtx, proposedKey, shardKeyPattern, tagDocList); + auto tags = getExistingTags(opCtx, nss); + + if (!tags.empty()) { + validateShardKeyAgainstExistingZones(opCtx, proposedKey, shardKeyPattern, tags); } boost::optional<UUID> uuid; @@ -618,13 +637,17 @@ public: shardRegistry->getAllShardIds(opCtx, &shardIds); const int numShards = shardIds.size(); - // SERVER-35794 TODO: Use zone info to determine which shards should have chunks placed on - // them. - std::vector<BSONObj> initialSplitPoints; std::vector<BSONObj> finalSplitPoints; + if (request.getInitialSplitPoints()) { finalSplitPoints = std::move(*request.getInitialSplitPoints()); + } else if (!tags.empty()) { + // no need to find split points since we will create chunks based on + // the existing zones + uassert(ErrorCodes::InvalidOptions, + str::stream() << "found existing zones but the collection is not empty", + isEmpty); } else { InitialSplitPolicy::calculateHashedSplitPointsForEmptyCollection( shardKeyPattern, @@ -662,6 +685,7 @@ public: *request.getCollation(), request.getUnique(), finalSplitPoints, + tags, fromMapReduce, ShardingState::get(opCtx)->getShardName(), numContiguousChunksPerShard); |
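Finally, shardsvr_shard_collection.cpp now reads and validates zone documents up front via the new getExistingTags helper before any chunks are generated. Roughly, its per-zone checks reduce to the following sketch (stand-in types; the real code parses TagsType from documents read out of config.tags):

```cpp
// Rough sketch of the zone-document checks performed by getExistingTags
// (illustrative stand-in types, not the server's BSON-based implementation).
#include <stdexcept>
#include <string>
#include <vector>

struct ZoneRange {
    std::string tag;
    std::vector<long> minKey;  // one value per shard key field
    std::vector<long> maxKey;
};

void validateZoneRange(const ZoneRange& zone) {
    // min and max must describe the same number of shard key fields
    if (zone.minKey.size() != zone.maxKey.size())
        throw std::invalid_argument(
            "the min and max of the existing zone have a non-matching number of keys");
    // min must sort strictly before max (BSONObj::woCompare in the real code)
    if (!(zone.minKey < zone.maxKey))
        throw std::invalid_argument("zone has min greater than max");
}
```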