-rw-r--r-- | src/mongo/db/s/config/initial_split_policy.cpp | 168
-rw-r--r-- | src/mongo/db/s/config/initial_split_policy.h | 24
-rw-r--r-- | src/mongo/db/s/config/initial_split_policy_test.cpp | 5
-rw-r--r-- | src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp | 64
-rw-r--r-- | src/mongo/db/s/shardsvr_shard_collection.cpp | 11
-rw-r--r-- | src/mongo/s/catalog/type_shard.cpp | 3
-rw-r--r-- | src/mongo/s/catalog/type_shard.h | 2
-rw-r--r-- | src/mongo/s/catalog/type_tags.cpp | 5
-rw-r--r-- | src/mongo/s/catalog/type_tags.h | 3
9 files changed, 188 insertions, 97 deletions
diff --git a/src/mongo/db/s/config/initial_split_policy.cpp b/src/mongo/db/s/config/initial_split_policy.cpp
index d18663a6872..3654a465313 100644
--- a/src/mongo/db/s/config/initial_split_policy.cpp
+++ b/src/mongo/db/s/config/initial_split_policy.cpp
@@ -45,6 +45,7 @@
 #include "mongo/util/log.h"
 
 namespace mongo {
+namespace {
 
 /*
  * Creates a chunk based on the given arguments, appends it to 'chunks', and
@@ -103,6 +104,8 @@ StringMap<std::vector<ShardId>> getTagToShardIds(OperationContext* opCtx,
     return tagToShardIds;
 }
 
+}  // namespace
+
 void InitialSplitPolicy::calculateHashedSplitPointsForEmptyCollection(
     const ShardKeyPattern& shardKeyPattern,
     bool isEmpty,
@@ -194,9 +197,6 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::generateShardColle
         appendChunk(nss, min, max, &version, validAfter, shardId, &chunks);
     }
 
-    log() << "Created " << chunks.size() << " chunk(s) for: " << nss << " using new epoch "
-          << version.epoch();
-
     return {std::move(chunks)};
 }
 
@@ -207,71 +207,66 @@ InitialSplitPolicy::generateShardCollectionInitialZonedChunks(
     const Timestamp& validAfter,
     const std::vector<TagsType>& tags,
     const StringMap<std::vector<ShardId>>& tagToShards,
-    const std::vector<ShardId>& allShardIds,
-    const bool isEmpty) {
-    invariant(!allShardIds.empty());
+    const std::vector<ShardId>& shardIdsForGaps) {
+    invariant(!shardIdsForGaps.empty());
     invariant(!tags.empty());
 
-    ChunkVersion version(1, 0, OID::gen());
     const auto& keyPattern = shardKeyPattern.getKeyPattern();
-    auto lastChunkMax = keyPattern.globalMin();
-    int indx = 0;
+
+    auto nextShardIdForHole = [&, indx = 0 ]() mutable {
+        return shardIdsForGaps[indx++ % shardIdsForGaps.size()];
+    };
 
     std::vector<ChunkType> chunks;
 
-    if (!isEmpty) {
-        // For a non-empty collection, create one chunk on the primary shard and leave it to the
-        // balancer to do the final zone partitioning/rebalancing.
-        appendChunk(nss,
-                    keyPattern.globalMin(),
-                    keyPattern.globalMax(),
-                    &version,
-                    validAfter,
-                    allShardIds[0],
-                    &chunks);
-    } else {
-        for (const auto& tag : tags) {
-            if (tag.getMinKey().woCompare(lastChunkMax) > 0) {
-                // create a chunk for the hole between zones
-                const ShardId shardId = allShardIds[indx++ % allShardIds.size()];
-                appendChunk(
-                    nss, lastChunkMax, tag.getMinKey(), &version, validAfter, shardId, &chunks);
-            }
-
-            // check that this tag is associated with a shard and if so create a chunk for the zone.
-            const auto it = tagToShards.find(tag.getTag());
-            invariant(it != tagToShards.end());
-            const auto& shardIdsForChunk = it->second;
-            uassert(
-                50973,
-                str::stream()
-                    << "cannot shard collection "
-                    << nss.ns()
-                    << " because it is associated with zone: "
-                    << tag.getTag()
-                    << " which is not associated with a shard. please add this zone to a shard.",
-                !shardIdsForChunk.empty());
+    ChunkVersion version(1, 0, OID::gen());
+    auto lastChunkMax = keyPattern.globalMin();
+    for (const auto& tag : tags) {
+        // Create a chunk for the hole [lastChunkMax, tag.getMinKey)
+        if (tag.getMinKey().woCompare(lastChunkMax) > 0) {
             appendChunk(nss,
+                        lastChunkMax,
                         tag.getMinKey(),
-                        tag.getMaxKey(),
                         &version,
                         validAfter,
-                        shardIdsForChunk[0],
+                        nextShardIdForHole(),
                         &chunks);
-            lastChunkMax = tag.getMaxKey();
         }
 
-        if (lastChunkMax.woCompare(keyPattern.globalMax()) < 0) {
-            // existing zones do not span to $maxKey so create a chunk for that
-            const ShardId shardId = allShardIds[indx++ % allShardIds.size()];
-            appendChunk(
-                nss, lastChunkMax, keyPattern.globalMax(), &version, validAfter, shardId, &chunks);
-        }
+        // Create chunk for the actual tag - [tag.getMinKey, tag.getMaxKey)
+        const auto it = tagToShards.find(tag.getTag());
+        invariant(it != tagToShards.end());
+        const auto& shardIdsForChunk = it->second;
+        uassert(50973,
+                str::stream()
+                    << "Cannot shard collection "
+                    << nss.ns()
+                    << " due to zone "
+                    << tag.getTag()
+                    << " which is not assigned to a shard. Please assign this zone to a shard.",
+                !shardIdsForChunk.empty());
+
+        appendChunk(nss,
+                    tag.getMinKey(),
+                    tag.getMaxKey(),
+                    &version,
+                    validAfter,
+                    shardIdsForChunk[0],
+                    &chunks);
+        lastChunkMax = tag.getMaxKey();
     }
 
-    log() << "Created " << chunks.size() << " chunk(s) for: " << nss << " using new epoch "
-          << version.epoch();
+    // Create a chunk for the hole [lastChunkMax, MaxKey]
+    if (lastChunkMax.woCompare(keyPattern.globalMax()) < 0) {
+        appendChunk(nss,
+                    lastChunkMax,
+                    keyPattern.globalMax(),
+                    &version,
+                    validAfter,
+                    nextShardIdForHole(),
+                    &chunks);
+    }
 
     return {std::move(chunks)};
 }
@@ -289,7 +284,6 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::createFirstChunks(
     const auto& keyPattern = shardKeyPattern.getKeyPattern();
 
     std::vector<BSONObj> finalSplitPoints;
-    std::vector<ShardId> shardIds;
 
     if (splitPoints.empty() && tags.empty()) {
         // If neither split points nor tags were specified use the shard's data distribution to
@@ -311,14 +305,6 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::createFirstChunks(
                 Grid::get(opCtx)->getBalancerConfiguration()->getMaxChunkSizeBytes(),
                 0));
         }
-
-        // If docs already exist for the collection, must use primary shard,
-        // otherwise defer to passed-in distribution option.
-        if (isEmpty && distributeInitialChunks) {
-            Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds);
-        } else {
-            shardIds.push_back(primaryShardId);
-        }
     } else {
         // Make sure points are unique and ordered
         auto orderedPts = SimpleBSONObjComparator::kInstance.makeBSONObjSet();
@@ -330,12 +316,6 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::createFirstChunks(
         for (const auto& splitPoint : orderedPts) {
            finalSplitPoints.push_back(splitPoint);
         }
-
-        if (distributeInitialChunks) {
-            Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds);
-        } else {
-            shardIds.push_back(primaryShardId);
-        }
     }
 
     uassert(ErrorCodes::InvalidOptions,
@@ -344,22 +324,44 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::createFirstChunks(
     const auto validAfter = LogicalClock::get(opCtx)->getClusterTime().asTimestamp();
 
-    auto initialChunks = tags.empty()
-        ? InitialSplitPolicy::generateShardCollectionInitialChunks(nss,
-                                                                   shardKeyPattern,
-                                                                   primaryShardId,
-                                                                   validAfter,
-                                                                   finalSplitPoints,
-                                                                   shardIds,
-                                                                   numContiguousChunksPerShard)
-        : InitialSplitPolicy::generateShardCollectionInitialZonedChunks(
-              nss,
-              shardKeyPattern,
-              validAfter,
-              tags,
-              getTagToShardIds(opCtx, tags),
-              shardIds,
-              isEmpty);
+    // If docs already exist for the collection, must use primary shard, otherwise defer to
+    // passed-in distribution option.
+    std::vector<ShardId> shardIds;
+
+    if (isEmpty && distributeInitialChunks) {
+        Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds);
+    } else {
+        shardIds.push_back(primaryShardId);
+    }
+
+    ShardCollectionConfig initialChunks;
+
+    if (tags.empty()) {
+        initialChunks = generateShardCollectionInitialChunks(nss,
+                                                             shardKeyPattern,
+                                                             primaryShardId,
+                                                             validAfter,
+                                                             finalSplitPoints,
+                                                             shardIds,
+                                                             numContiguousChunksPerShard);
+    } else if (!isEmpty) {
+        // For a non-empty collection, create one chunk on the primary shard and leave it to the
+        // balancer to do the zone split and rebalancing
+        ChunkVersion version(1, 0, OID::gen());
+        appendChunk(nss,
+                    keyPattern.globalMin(),
+                    keyPattern.globalMax(),
+                    &version,
+                    validAfter,
+                    primaryShardId,
+                    &initialChunks.chunks);
+    } else {
+        initialChunks = generateShardCollectionInitialZonedChunks(
+            nss, shardKeyPattern, validAfter, tags, getTagToShardIds(opCtx, tags), shardIds);
+    }
+
+    LOG(0) << "Created " << initialChunks.chunks.size() << " chunk(s) for: " << nss
+           << ", producing collection version " << initialChunks.collVersion();
 
     return initialChunks;
 }
diff --git a/src/mongo/db/s/config/initial_split_policy.h b/src/mongo/db/s/config/initial_split_policy.h
index 67492c24a98..6e43265b3ae 100644
--- a/src/mongo/db/s/config/initial_split_policy.h
+++ b/src/mongo/db/s/config/initial_split_policy.h
@@ -74,9 +74,11 @@ public:
     };
 
     /**
-     * Produces the initial chunks that need to be written for a collection which is being
-     * newly-sharded. The function performs some basic validation of the input parameters, but there
-     * is no checking whether the collection contains any data or not.
+     * Produces the initial chunks that need to be written for an *empty* collection which is being
+     * sharded based on a set of 'splitPoints' and 'numContiguousChunksPerShard'.
+     *
+     * NOTE: The function performs some basic validation of the input parameters, but there is no
+     * checking whether the collection contains any data or not.
      *
      * Chunks are assigned to a shard in a round-robin fashion, numContiguousChunksPerShard (k)
      * chunks at a time. For example, the first k chunks are assigned to the first available shard,
@@ -98,9 +100,16 @@ public:
         const int numContiguousChunksPerShard = 1);
 
     /**
-     * Produces the initial chunks that need to be written for a collection which is being
-     * newly-sharded based on the given tags. Chunks that do not correspond to any pre-defined
-     * zones are assigned to available shards in a round-robin fashion.
+     * Produces the initial chunks that need to be written for an *empty* collection which is being
+     * sharded based on the given 'tags'.
+     *
+     * NOTE: The function performs some basic validation of the input parameters, but there is no
+     * checking whether the collection contains any data or not.
+     *
+     * The contents of 'tags' will be used to create chunks, which correspond to these zones and
+     * chunks will be assigned to shards from 'tagToShards'. If there are any holes in between the
+     * zones (zones are not contiguous), these holes will be assigned to 'shardIdsForGaps' in
+     * round-robin fashion.
      */
     static ShardCollectionConfig generateShardCollectionInitialZonedChunks(
         const NamespaceString& nss,
@@ -108,8 +117,7 @@ public:
         const Timestamp& validAfter,
         const std::vector<TagsType>& tags,
         const StringMap<std::vector<ShardId>>& tagToShards,
-        const std::vector<ShardId>& allShardIds,
-        const bool isEmpty);
+        const std::vector<ShardId>& shardIdsForGaps);
 
     /**
      * Creates the first chunks for a newly sharded collection.
diff --git a/src/mongo/db/s/config/initial_split_policy_test.cpp b/src/mongo/db/s/config/initial_split_policy_test.cpp
index 049f5b16892..7ff8e44fef2 100644
--- a/src/mongo/db/s/config/initial_split_policy_test.cpp
+++ b/src/mongo/db/s/config/initial_split_policy_test.cpp
@@ -281,8 +281,7 @@ public:
             timeStamp(),
             tags,
             makeTagToShards(numShards),
-            makeShardIds(numShards),
-            true);
+            makeShardIds(numShards));
         const std::vector<ChunkType> expectedChunks =
             makeChunks(expectedChunkRanges, expectedShardIds);
         assertChunkVectorsAreEqual(expectedChunks, shardCollectionConfig.chunks);
@@ -432,7 +431,7 @@ TEST_F(GenerateShardCollectionInitialZonedChunksTest, ZoneNotAssociatedWithAnySh
 
     ASSERT_THROWS_CODE(
         InitialSplitPolicy::generateShardCollectionInitialZonedChunks(
-            nss(), shardKeyPattern(), timeStamp(), tags, tagToShards, makeShardIds(1), true),
+            nss(), shardKeyPattern(), timeStamp(), tags, tagToShards, makeShardIds(1)),
         AssertionException,
         50973);
 }
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp
index 4b625547aa6..e8fda03ca89 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp
@@ -41,6 +41,7 @@
 #include "mongo/client/remote_command_targeter_mock.h"
 #include "mongo/db/client.h"
 #include "mongo/db/commands.h"
+#include "mongo/db/s/config/initial_split_policy.h"
 #include "mongo/db/s/config/sharding_catalog_manager.h"
 #include "mongo/executor/network_interface_mock.h"
 #include "mongo/executor/task_executor.h"
@@ -372,5 +373,68 @@ TEST_F(ShardCollectionTest, withInitialData) {
     future.timed_get(kFutureTimeout);
 }
 
+using CreateFirstChunksTest = ShardCollectionTest;
+
+TEST_F(CreateFirstChunksTest, DistributeInitialChunksWithoutTagsIgnoredForNonEmptyCollection) {
+    const ShardKeyPattern kShardKeyPattern{BSON("x" << 1)};
+    const std::vector<ShardType> kShards{ShardType("shard0", "rs0/shard0:123"),
+                                         ShardType("shard1", "rs1/shard1:123"),
+                                         ShardType("shard2", "rs2/shard2:123")};
+
+    const auto connStr = assertGet(ConnectionString::parse(kShards[1].getHost()));
+
+    std::unique_ptr<RemoteCommandTargeterMock> targeter(
+        stdx::make_unique<RemoteCommandTargeterMock>());
+    targeter->setConnectionStringReturnValue(connStr);
+    targeter->setFindHostReturnValue(connStr.getServers()[0]);
+    targeterFactory()->addTargeterToReturn(connStr, std::move(targeter));
+
+    ASSERT_OK(setupShards(kShards));
+    shardRegistry()->reload(operationContext());
+
+    auto future = launchAsync([&] {
+        ON_BLOCK_EXIT([&] { Client::destroy(); });
+        Client::initThreadIfNotAlready("Test");
+        auto opCtx = cc().makeOperationContext();
+        return InitialSplitPolicy::createFirstChunks(opCtx.get(),
+                                                     kNamespace,
+                                                     kShardKeyPattern,
+                                                     ShardId("shard1"),
+                                                     {},
+                                                     {},
+                                                     true,
+                                                     false /* isEmpty */);
+    });
+
+    expectSplitVector(connStr.getServers()[0], kShardKeyPattern, BSON_ARRAY(BSON("x" << 0)));
+
+    const auto& firstChunks = future.timed_get(kFutureTimeout);
+    ASSERT_EQ(2U, firstChunks.chunks.size());
+    ASSERT_EQ(kShards[1].getName(), firstChunks.chunks[0].getShard());
+    ASSERT_EQ(kShards[1].getName(), firstChunks.chunks[1].getShard());
+}
+
+TEST_F(CreateFirstChunksTest, DistributeInitialChunksWithTagsIgnoredForNonEmptyCollection) {
+    const ShardKeyPattern kShardKeyPattern{BSON("x" << 1)};
+    const std::vector<ShardType> kShards{ShardType("shard0", "rs0/shard0:123", {"TestZone"}),
+                                         ShardType("shard1", "rs1/shard1:123", {"TestZone"}),
+                                         ShardType("shard2", "rs2/shard2:123")};
+    ASSERT_OK(setupShards(kShards));
+    shardRegistry()->reload(operationContext());
+
+    const auto firstChunks = InitialSplitPolicy::createFirstChunks(
+        operationContext(),
+        kNamespace,
+        kShardKeyPattern,
+        ShardId("shard1"),
+        {},
+        {TagsType(kNamespace, "TestZone", ChunkRange(BSON("x" << MinKey), BSON("x" << 0)))},
+        true,
+        false /* isEmpty */);
+
+    ASSERT_EQ(1U, firstChunks.chunks.size());
+    ASSERT_EQ(kShards[1].getName(), firstChunks.chunks[0].getShard());
+}
+
 }  // namespace
 }  // namespace mongo
diff --git a/src/mongo/db/s/shardsvr_shard_collection.cpp b/src/mongo/db/s/shardsvr_shard_collection.cpp
index 0e9344eb1db..ab296364b39 100644
--- a/src/mongo/db/s/shardsvr_shard_collection.cpp
+++ b/src/mongo/db/s/shardsvr_shard_collection.cpp
@@ -264,7 +264,7 @@ void createCollectionOrValidateExisting(OperationContext* opCtx,
         BSONObj res;
         auto success = localClient.runCommand("admin", checkShardingIndexCmd.obj(), res);
         uassert(ErrorCodes::OperationFailed, res["errmsg"].str(), success);
-    } else if (localClient.count(nss.ns()) != 0) {
+    } else if (!localClient.findOne(nss.ns(), Query()).isEmpty()) {
         // 4. if no useful index, and collection is non-empty, fail
         uasserted(ErrorCodes::InvalidOptions,
                   "Please create an index that starts with the proposed shard key before "
@@ -664,8 +664,13 @@ public:
             auto shardRegistry = Grid::get(opCtx)->shardRegistry();
             shardRegistry->reload(opCtx);
 
-            DBDirectClient localClient(opCtx);
-            bool isEmpty = (localClient.count(nss.ns()) == 0);
+            const bool isEmpty = [&] {
+                // Use find with predicate instead of count in order to ensure that the count
+                // command doesn't just consult the cached metadata, which may not always be
+                // correct
+                DBDirectClient localClient(opCtx);
+                return localClient.findOne(nss.ns(), Query()).isEmpty();
+            }();
 
             std::vector<ShardId> shardIds;
             shardRegistry->getAllShardIds(opCtx, &shardIds);
diff --git a/src/mongo/s/catalog/type_shard.cpp b/src/mongo/s/catalog/type_shard.cpp
index 2b43b2b2381..1d129f04470 100644
--- a/src/mongo/s/catalog/type_shard.cpp
+++ b/src/mongo/s/catalog/type_shard.cpp
@@ -51,6 +51,9 @@ const BSONField<long long> ShardType::maxSizeMB("maxSize");
 const BSONField<BSONArray> ShardType::tags("tags");
 const BSONField<ShardType::ShardState> ShardType::state("state");
 
+ShardType::ShardType(std::string name, std::string host, std::vector<std::string> tags)
+    : _name(std::move(name)), _host(std::move(host)), _tags(std::move(tags)) {}
+
 StatusWith<ShardType> ShardType::fromBSON(const BSONObj& source) {
     ShardType shard;
 
diff --git a/src/mongo/s/catalog/type_shard.h b/src/mongo/s/catalog/type_shard.h
index a3640f9fe08..13d996b5555 100644
--- a/src/mongo/s/catalog/type_shard.h
+++ b/src/mongo/s/catalog/type_shard.h
@@ -69,6 +69,8 @@ public:
     static const BSONField<BSONArray> tags;
    static const BSONField<ShardState> state;
 
+    ShardType() = default;
+    ShardType(std::string name, std::string host, std::vector<std::string> tags = {});
+
     /**
      * Constructs a new ShardType object from BSON.
diff --git a/src/mongo/s/catalog/type_tags.cpp b/src/mongo/s/catalog/type_tags.cpp
index 4c0a67327f9..a26d9977cf0 100644
--- a/src/mongo/s/catalog/type_tags.cpp
+++ b/src/mongo/s/catalog/type_tags.cpp
@@ -51,6 +51,11 @@ const BSONField<std::string> TagsType::tag("tag");
 const BSONField<BSONObj> TagsType::min("min");
 const BSONField<BSONObj> TagsType::max("max");
 
+TagsType::TagsType(NamespaceString nss, std::string tag, ChunkRange range)
+    : _ns(std::move(nss)),
+      _tag(std::move(tag)),
+      _minKey(range.getMin().getOwned()),
+      _maxKey(range.getMax().getOwned()) {}
 
 StatusWith<TagsType> TagsType::fromBSON(const BSONObj& source) {
     TagsType tags;
diff --git a/src/mongo/s/catalog/type_tags.h b/src/mongo/s/catalog/type_tags.h
index e16f53f5164..d8ed443a488 100644
--- a/src/mongo/s/catalog/type_tags.h
+++ b/src/mongo/s/catalog/type_tags.h
@@ -35,6 +35,7 @@
 
 #include "mongo/db/jsobj.h"
 #include "mongo/db/namespace_string.h"
+#include "mongo/s/catalog/type_chunk.h"
 
 namespace mongo {
 
@@ -60,6 +61,8 @@ public:
     static const BSONField<BSONObj> min;
     static const BSONField<BSONObj> max;
 
+    TagsType() = default;
+    TagsType(NamespaceString nss, std::string tag, ChunkRange range);
+
     /**
      * Constructs a new DatabaseType object from BSON. Validates that all required fields are
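
A minimal, self-contained sketch of the chunk layout that the reworked generateShardCollectionInitialZonedChunks produces for an empty zoned collection: one chunk per zone, plus "gap" chunks for the holes before, between, and after the zones, with the gap chunks handed out round-robin from shardIdsForGaps. The Zone and Chunk structs, the integer key space, and makeZonedChunks below are hypothetical stand-ins for illustration only, not MongoDB's TagsType/ChunkType/BSONObj API.

// Standalone illustration of the zone + gap round-robin layout (assumed stand-in types).
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

struct Zone {
    long min;           // inclusive lower bound of the zone's range
    long max;           // exclusive upper bound of the zone's range
    std::string shard;  // shard the zone is assigned to
};

struct Chunk {
    long min;
    long max;
    std::string shard;
};

// Mirrors the new control flow: walk the zones in key order, fill any hole with a chunk
// assigned via a round-robin counter over 'shardIdsForGaps', then emit the zone chunk itself.
std::vector<Chunk> makeZonedChunks(const std::vector<Zone>& zones,
                                   const std::vector<std::string>& shardIdsForGaps,
                                   long globalMin,
                                   long globalMax) {
    std::size_t indx = 0;
    auto nextShardForHole = [&] { return shardIdsForGaps[indx++ % shardIdsForGaps.size()]; };

    std::vector<Chunk> chunks;
    long lastChunkMax = globalMin;
    for (const auto& zone : zones) {
        if (zone.min > lastChunkMax) {
            // Hole between the previous chunk and this zone
            chunks.push_back({lastChunkMax, zone.min, nextShardForHole()});
        }
        // The zone chunk itself goes to the shard that owns the zone
        chunks.push_back({zone.min, zone.max, zone.shard});
        lastChunkMax = zone.max;
    }
    if (lastChunkMax < globalMax) {
        // Hole between the last zone and the top of the key space
        chunks.push_back({lastChunkMax, globalMax, nextShardForHole()});
    }
    return chunks;
}

int main() {
    // Two non-contiguous zones; the three gaps go round-robin across all three shards.
    const std::vector<Zone> zones{{0, 10, "shardA"}, {20, 30, "shardB"}};
    const std::vector<std::string> gapShards{"shardA", "shardB", "shardC"};

    for (const auto& c : makeZonedChunks(zones, gapShards, -100, 100)) {
        std::cout << "[" << c.min << ", " << c.max << ") -> " << c.shard << "\n";
    }
    // Prints: [-100,0)->shardA (gap), [0,10)->shardA (zone), [10,20)->shardB (gap),
    // [20,30)->shardB (zone), [30,100)->shardC (gap)
}

Running it with two non-contiguous zones over three shards prints five chunks in the hole/zone/hole/zone/hole pattern, which is the same shape the new loop in initial_split_policy.cpp builds; non-empty collections never reach this path because createFirstChunks now creates a single chunk on the primary shard instead.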