From 262e5a961fa7221bfba5722aeea2db719f2149f5 Mon Sep 17 00:00:00 2001
From: Kshitij Gupta
Date: Mon, 3 Aug 2020 18:48:21 +0000
Subject: SERVER-49510: Add functionality to pick new split points during
 resharding when zones not defined. SERVER-49525: Sample documents to pick new
 split points for resharding when new shard key has a hashed field.

---
 src/mongo/db/pipeline/SConscript                   |   1 +
 .../resharding_initial_split_policy_test.cpp       | 241 +++++++++++++++++++++
 src/mongo/db/s/SConscript                          |   1 +
 src/mongo/db/s/config/initial_split_policy.cpp     | 134 ++++++++++--
 src/mongo/db/s/config/initial_split_policy.h       |  27 +++
 5 files changed, 384 insertions(+), 20 deletions(-)
 create mode 100644 src/mongo/db/pipeline/resharding_initial_split_policy_test.cpp

diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 1929babc330..c9e6c4df264 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -381,6 +381,7 @@ env.CppUnitTest(
         'lookup_set_cache_test.cpp',
         'pipeline_metadata_tree_test.cpp',
         'pipeline_test.cpp',
+        'resharding_initial_split_policy_test.cpp',
         'resume_token_test.cpp',
         'semantic_analysis_test.cpp',
         'sequential_document_cache_test.cpp',
diff --git a/src/mongo/db/pipeline/resharding_initial_split_policy_test.cpp b/src/mongo/db/pipeline/resharding_initial_split_policy_test.cpp
new file mode 100644
index 00000000000..8b3806f5f53
--- /dev/null
+++ b/src/mongo/db/pipeline/resharding_initial_split_policy_test.cpp
@@ -0,0 +1,241 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/pipeline/sharded_agg_helpers.h"
+#include "mongo/db/s/config/initial_split_policy.h"
+#include "mongo/s/query/sharded_agg_test_fixture.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+using ReshardingSplitPolicyTest = ShardedAggTestFixture;
+
+const ShardId primaryShardId = ShardId("0");
+
+TEST_F(ReshardingSplitPolicyTest, ShardKeyWithNonDottedFieldAndIdIsNotProjectedSucceeds) {
+    auto shardKeyPattern = ShardKeyPattern(BSON("a" << 1));
+
+    auto pipeline =
+        Pipeline::parse(ReshardingSplitPolicy::createRawPipeline(
+                            shardKeyPattern, 2 /* samplingRatio */, 1 /* numSplitPoints */),
+                        expCtx());
+    auto mockSource =
+        DocumentSourceMock::createForTest({"{_id: 10, a: 15}", "{_id: 3, a: 5}"}, expCtx());
+    pipeline->addInitialSource(mockSource.get());
+
+    // We sample all of the documents since numSplitPoints (1) * samplingRatio (2) = 2 and the
+    // document source has 2 documents. So we can assert on the returned values.
+    auto next = pipeline->getNext();
+    ASSERT_EQUALS(next.get().getField("a").getInt(), 5);
+    ASSERT(next.get().getField("_id").missing());
+    next = pipeline->getNext();
+    ASSERT_EQUALS(next.get().getField("a").getInt(), 15);
+    ASSERT(next.get().getField("_id").missing());
+    ASSERT(!pipeline->getNext());
+}
+
+TEST_F(ReshardingSplitPolicyTest, ShardKeyWithIdFieldIsProjectedSucceeds) {
+    auto shardKeyPattern = ShardKeyPattern(BSON("_id" << 1));
+
+    auto pipeline =
+        Pipeline::parse(ReshardingSplitPolicy::createRawPipeline(
+                            shardKeyPattern, 2 /* samplingRatio */, 1 /* numSplitPoints */),
+                        expCtx());
+    auto mockSource =
+        DocumentSourceMock::createForTest({"{_id: 10, a: 15}", "{_id: 3, a: 5}"}, expCtx());
+    pipeline->addInitialSource(mockSource.get());
+
+    // We sample all of the documents since numSplitPoints (1) * samplingRatio (2) = 2 and the
+    // document source has 2 documents. So we can assert on the returned values.
+    auto next = pipeline->getNext();
+    ASSERT_EQUALS(next.get().getField("_id").getInt(), 3);
+    ASSERT(next.get().getField("a").missing());
+    next = pipeline->getNext();
+    ASSERT_EQUALS(next.get().getField("_id").getInt(), 10);
+    ASSERT(next.get().getField("a").missing());
+    ASSERT(!pipeline->getNext());
+}
+
+TEST_F(ReshardingSplitPolicyTest, CompoundShardKeyWithNonDottedHashedFieldSucceeds) {
+    auto shardKeyPattern = ShardKeyPattern(BSON("a" << 1 << "b"
+                                                    << "hashed"));
+
+    auto pipeline =
+        Pipeline::parse(ReshardingSplitPolicy::createRawPipeline(
+                            shardKeyPattern, 2 /* samplingRatio */, 1 /* numSplitPoints */),
+                        expCtx());
+    auto mockSource = DocumentSourceMock::createForTest(
+        {"{x: 1, b: 16, a: 15}", "{x: 2, b: 123, a: 5}"}, expCtx());
+    pipeline->addInitialSource(mockSource.get());
+
+    // We sample all of the documents since numSplitPoints (1) * samplingRatio (2) = 2 and the
+    // document source has 2 documents. So we can assert on the returned values.
+    auto next = pipeline->getNext();
+    ASSERT_EQUALS(next.get().getField("a").getInt(), 5);
+    ASSERT_EQUALS(next.get().getField("b").getLong(), -6548868637522515075);
+    ASSERT(next.get().getField("x").missing());
+    next = pipeline->getNext();
+    ASSERT_EQUALS(next.get().getField("a").getInt(), 15);
+    ASSERT_EQUALS(next.get().getField("b").getLong(), 2598032665634823220);
+    ASSERT(next.get().getField("x").missing());
+    ASSERT(!pipeline->getNext());
+}
+
+TEST_F(ReshardingSplitPolicyTest, CompoundShardKeyWithDottedFieldSucceeds) {
+    auto shardKeyPattern = ShardKeyPattern(BSON("a.b" << 1 << "c" << 1));
+
+    auto pipeline =
+        Pipeline::parse(ReshardingSplitPolicy::createRawPipeline(
+                            shardKeyPattern, 2 /* samplingRatio */, 1 /* numSplitPoints */),
+                        expCtx());
+    auto mockSource = DocumentSourceMock::createForTest(
+        {"{x: 10, a: {b: 20}, c: 1}", "{x: 3, a: {b: 10}, c: 5}"}, expCtx());
+    pipeline->addInitialSource(mockSource.get());
+
+    // We sample all of the documents since numSplitPoints (1) * samplingRatio (2) = 2 and the
+    // document source has 2 documents. So we can assert on the returned values.
+    auto next = pipeline->getNext();
+    ASSERT_BSONOBJ_EQ(next.get().toBson(), BSON("a" << BSON("b" << 10) << "c" << 5));
+    next = pipeline->getNext();
+    ASSERT_BSONOBJ_EQ(next.get().toBson(), BSON("a" << BSON("b" << 20) << "c" << 1));
+    ASSERT(!pipeline->getNext());
+}
+
+TEST_F(ReshardingSplitPolicyTest, CompoundShardKeyWithDottedHashedFieldSucceeds) {
+    auto shardKeyPattern = ShardKeyPattern(BSON("a.b" << 1 << "c" << 1 << "a.c"
+                                                      << "hashed"));
+
+    auto pipeline =
+        Pipeline::parse(ReshardingSplitPolicy::createRawPipeline(
+                            shardKeyPattern, 2 /* samplingRatio */, 1 /* numSplitPoints */),
+                        expCtx());
+    auto mockSource = DocumentSourceMock::createForTest(
+        {"{x: 10, a: {b: 20, c: 16}, c: 1}", "{x: 3, a: {b: 10, c: 123}, c: 5}"}, expCtx());
+    pipeline->addInitialSource(mockSource.get());
+
+    // We sample all of the documents since numSplitPoints (1) * samplingRatio (2) = 2 and the
+    // document source has 2 documents. So we can assert on the returned values.
+    auto next = pipeline->getNext();
+    ASSERT_BSONOBJ_EQ(next.get().toBson(),
+                      BSON("a" << BSON("b" << 10 << "c" << -6548868637522515075) << "c" << 5));
+    next = pipeline->getNext();
+    ASSERT_BSONOBJ_EQ(next.get().toBson(),
+                      BSON("a" << BSON("b" << 20 << "c" << 2598032665634823220) << "c" << 1));
+    ASSERT(!pipeline->getNext());
+}
+
+TEST_F(ReshardingSplitPolicyTest, SamplingSucceeds) {
+    auto shards = setupNShards(2);
+    loadRoutingTableWithTwoChunksAndTwoShards(kTestAggregateNss);
+    // We add a $sortKey field since AsyncResultsMerger expects it in order to merge the batches
+    // from different shards.
+    std::vector<BSONObj> firstShardChunks{
+        BSON("a" << 0 << "$sortKey" << BSON_ARRAY(0)),
+        BSON("a" << 1 << "$sortKey" << BSON_ARRAY(1)),
+        BSON("a" << 2 << "$sortKey" << BSON_ARRAY(2)),
+        BSON("a" << 3 << "$sortKey" << BSON_ARRAY(3)),
+        BSON("a" << 4 << "$sortKey" << BSON_ARRAY(4)),
+        BSON("a" << 5 << "$sortKey" << BSON_ARRAY(5)),
+        BSON("a" << 6 << "$sortKey" << BSON_ARRAY(6)),
+        BSON("a" << 7 << "$sortKey" << BSON_ARRAY(7)),
+        BSON("a" << 8 << "$sortKey" << BSON_ARRAY(8)),
+        BSON("a" << 9 << "$sortKey" << BSON_ARRAY(9)),
+        BSON("a" << 10 << "$sortKey" << BSON_ARRAY(10)),
+    };
+
+    std::vector<BSONObj> secondShardChunks{
+        BSON("a" << 11 << "$sortKey" << BSON_ARRAY(11)),
+        BSON("a" << 12 << "$sortKey" << BSON_ARRAY(12)),
+        BSON("a" << 13 << "$sortKey" << BSON_ARRAY(13)),
+        BSON("a" << 14 << "$sortKey" << BSON_ARRAY(14)),
+        BSON("a" << 15 << "$sortKey" << BSON_ARRAY(15)),
+        BSON("a" << 16 << "$sortKey" << BSON_ARRAY(16)),
+        BSON("a" << 17 << "$sortKey" << BSON_ARRAY(17)),
+        BSON("a" << 18 << "$sortKey" << BSON_ARRAY(18)),
+        BSON("a" << 19 << "$sortKey" << BSON_ARRAY(19)),
+        BSON("a" << 20 << "$sortKey" << BSON_ARRAY(20)),
+        BSON("a" << 21 << "$sortKey" << BSON_ARRAY(21)),
+        BSON("a" << 22 << "$sortKey" << BSON_ARRAY(22)),
+    };
+
+    auto shardKeyPattern = ShardKeyPattern(BSON("a" << 1));
+    std::vector<ShardId> shardIds;
+    for (auto&& shard : shards) {
+        shardIds.push_back(ShardId(shard.getName()));
+    }
+
+    auto future = launchAsync([&] {
+        auto policy = ReshardingSplitPolicy(operationContext(),
+                                            kTestAggregateNss,
+                                            shardKeyPattern,
+                                            4 /* numInitialChunks */,
+                                            shardIds,
+                                            expCtx());
+        const auto chunks = policy
+                                .createFirstChunks(operationContext(),
+                                                   shardKeyPattern,
+                                                   {kTestAggregateNss, primaryShardId})
+                                .chunks;
+        // We sample all of the documents since numSplitPoints (3) * samplingRatio (10) = 30 and
+        // the document source has 23 documents. So we can assert on the split points.
+        ASSERT_EQ(chunks.size(), 4);
+        ASSERT_BSONOBJ_EQ(chunks.at(0).getMin(), shardKeyPattern.getKeyPattern().globalMin());
+        ASSERT_BSONOBJ_EQ(chunks.at(0).getMax(), firstShardChunks.at(0).removeField("$sortKey"));
+
+        ASSERT_BSONOBJ_EQ(chunks.at(1).getMin(), firstShardChunks.at(0).removeField("$sortKey"));
+        ASSERT_BSONOBJ_EQ(chunks.at(1).getMax(), firstShardChunks.at(10).removeField("$sortKey"));
+
+        ASSERT_BSONOBJ_EQ(chunks.at(2).getMin(), firstShardChunks.at(10).removeField("$sortKey"));
+        ASSERT_BSONOBJ_EQ(chunks.at(2).getMax(), secondShardChunks.at(9).removeField("$sortKey"));
+
+        ASSERT_BSONOBJ_EQ(chunks.at(3).getMin(), secondShardChunks.at(9).removeField("$sortKey"));
+        ASSERT_BSONOBJ_EQ(chunks.at(3).getMax(), shardKeyPattern.getKeyPattern().globalMax());
+    });
+
+    onCommand([&](const executor::RemoteCommandRequest& request) {
+        return CursorResponse(kTestAggregateNss, CursorId{0}, firstShardChunks)
+            .toBSON(CursorResponse::ResponseType::InitialResponse);
+    });
+
+    onCommand([&](const executor::RemoteCommandRequest& request) {
+        return CursorResponse(kTestAggregateNss, CursorId{0}, secondShardChunks)
+            .toBSON(CursorResponse::ResponseType::InitialResponse);
+    });
+
+    future.default_timed_get();
+}
+
+} // namespace
+} // namespace mongo
\ No newline at end of file
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index eaad8ca1d68..61309e39100 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -253,6 +253,7 @@ env.Library(
         '$BUILD_DIR/mongo/db/vector_clock_mutable',
         '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl',
         '$BUILD_DIR/mongo/util/options_parser/options_parser',
+        '$BUILD_DIR/mongo/db/pipeline/sharded_agg_helpers',
     ],
 )
 
diff --git a/src/mongo/db/s/config/initial_split_policy.cpp b/src/mongo/db/s/config/initial_split_policy.cpp
index e3fb330800c..36c0f1bf15b 100644
--- a/src/mongo/db/s/config/initial_split_policy.cpp
+++ b/src/mongo/db/s/config/initial_split_policy.cpp
@@ -35,6 +35,7 @@
 
 #include "mongo/client/read_preference.h"
 #include "mongo/db/logical_clock.h"
+#include "mongo/db/pipeline/sharded_agg_helpers.h"
 #include "mongo/s/balancer_configuration.h"
 #include "mongo/s/catalog/type_shard.h"
 #include "mongo/s/grid.h"
@@ -97,6 +98,34 @@ StringMap<std::vector<ShardId>> buildTagsToShardIdsMap(OperationContext* opCtx,
     return tagToShardIds;
 }
 
+InitialSplitPolicy::ShardCollectionConfig createChunks(const ShardKeyPattern& shardKeyPattern,
+                                                       SplitPolicyParams params,
+                                                       int numContiguousChunksPerShard,
+                                                       const std::vector<ShardId>& allShardIds,
+                                                       std::vector<BSONObj>& finalSplitPoints,
+                                                       const Timestamp& validAfter) {
+    ChunkVersion version(1, 0, OID::gen());
+    const auto& keyPattern(shardKeyPattern.getKeyPattern());
+
+    std::vector<ChunkType> chunks;
+
+    for (size_t i = 0; i <= finalSplitPoints.size(); i++) {
+        const BSONObj min = (i == 0) ? keyPattern.globalMin() : finalSplitPoints[i - 1];
+        const BSONObj max =
+            (i < finalSplitPoints.size()) ? finalSplitPoints[i] : keyPattern.globalMax();
+
+        // It's possible there are no split points or fewer split points than total number of
+        // shards, and we need to be sure that at least one chunk is placed on the primary shard
+        const ShardId shardId = (i == 0 && finalSplitPoints.size() + 1 < allShardIds.size())
+            ? params.primaryShardId
+            : allShardIds[(i / numContiguousChunksPerShard) % allShardIds.size()];
+
+        appendChunk(params.nss, min, max, &version, validAfter, shardId, &chunks);
+    }
+
+    return {std::move(chunks)};
+}
+
 } // namespace
 
 std::vector<BSONObj> InitialSplitPolicy::calculateHashedSplitPoints(
@@ -178,26 +207,12 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::generateShardColle
         finalSplitPoints.push_back(splitPoint);
     }
 
-    ChunkVersion version(1, 0, OID::gen());
-    const auto& keyPattern(shardKeyPattern.getKeyPattern());
-
-    std::vector<ChunkType> chunks;
-
-    for (size_t i = 0; i <= finalSplitPoints.size(); i++) {
-        const BSONObj min = (i == 0) ? keyPattern.globalMin() : finalSplitPoints[i - 1];
-        const BSONObj max =
-            (i < finalSplitPoints.size()) ? finalSplitPoints[i] : keyPattern.globalMax();
-
-        // It's possible there are no split points or fewer split points than total number of
-        // shards, and we need to be sure that at least one chunk is placed on the primary shard
-        const ShardId shardId = (i == 0 && finalSplitPoints.size() + 1 < allShardIds.size())
-            ? databasePrimaryShardId
-            : allShardIds[(i / numContiguousChunksPerShard) % allShardIds.size()];
-
-        appendChunk(nss, min, max, &version, validAfter, shardId, &chunks);
-    }
-
-    return {std::move(chunks)};
+    return createChunks(shardKeyPattern,
+                        {nss, databasePrimaryShardId},
+                        numContiguousChunksPerShard,
+                        allShardIds,
+                        finalSplitPoints,
+                        validAfter);
 }
 
 std::unique_ptr<InitialSplitPolicy> InitialSplitPolicy::calculateOptimizationStrategy(
@@ -581,4 +596,83 @@ void PresplitHashedZonesSplitPolicy::_validate(const ShardKeyPattern& shardKeyPa
     }
 }
 
+std::vector<BSONObj> ReshardingSplitPolicy::createRawPipeline(const ShardKeyPattern& shardKey,
+                                                              int samplingRatio,
+                                                              int numSplitPoints) {
+
+    std::vector<BSONObj> res;
+    const auto& shardKeyFields = shardKey.getKeyPatternFields();
+
+    BSONObjBuilder projectValBuilder;
+    BSONObjBuilder sortValBuilder;
+
+    for (auto&& fieldRef : shardKeyFields) {
+        // If the shard key includes a hashed field and the current fieldRef is the hashed field.
+        if (shardKey.isHashedPattern() &&
+            fieldRef->dottedField().compare(shardKey.getHashedField().fieldNameStringData()) ==
+                0) {
+            projectValBuilder.append(fieldRef->dottedField(),
+                                     BSON("$toHashedIndexKey"
+                                          << "$" + fieldRef->dottedField()));
+        } else {
+            projectValBuilder.append(
+                str::stream() << fieldRef->dottedField(),
+                BSON("$ifNull" << BSON_ARRAY("$" + fieldRef->dottedField() << BSONNULL)));
+        }
+
+        sortValBuilder.append(fieldRef->dottedField().toString(), 1);
+    }
+
+    // Do not project _id if it's not part of the shard key.
+    if (!shardKey.hasId()) {
+        projectValBuilder.append("_id", 0);
+    }
+
+    res.push_back(BSON("$sample" << BSON("size" << numSplitPoints * samplingRatio)));
+    res.push_back(BSON("$project" << projectValBuilder.obj()));
+    res.push_back(BSON("$sort" << sortValBuilder.obj()));
+
+    return res;
+}
+
+ReshardingSplitPolicy::ReshardingSplitPolicy(OperationContext* opCtx,
+                                             const NamespaceString& nss,
+                                             const ShardKeyPattern& shardKey,
+                                             int numInitialChunks,
+                                             const std::vector<ShardId>& recipientShardIds,
+                                             const boost::intrusive_ptr<ExpressionContext>& expCtx,
+                                             int samplingRatio) {
+
+    invariant(!recipientShardIds.empty());
+
+    auto pipelineCursor = sharded_agg_helpers::attachCursorToPipeline(
+        Pipeline::parse(createRawPipeline(shardKey, samplingRatio, numInitialChunks - 1), expCtx)
+            .release(),
+        true);
+
+    auto sampledDocument = pipelineCursor->getNext();
+    size_t idx = 0;
+    while (sampledDocument) {
+        // The sampled documents arrive sorted by the new shard key, so keeping every
+        // samplingRatio-th one yields evenly spaced split points.
+        if (idx % samplingRatio == 0) {
+            _splitPoints.push_back(sampledDocument.get().toBson());
+        }
+        sampledDocument = pipelineCursor->getNext();
+        ++idx;
+    }
+
+    _numContiguousChunksPerShard =
+        std::max(numInitialChunks / recipientShardIds.size(), static_cast<size_t>(1));
+    _recipientShardIds = recipientShardIds;
+}
+
+InitialSplitPolicy::ShardCollectionConfig ReshardingSplitPolicy::createFirstChunks(
+    OperationContext* opCtx, const ShardKeyPattern& shardKeyPattern, SplitPolicyParams params) {
+
+    return createChunks(shardKeyPattern,
+                        params,
+                        _numContiguousChunksPerShard,
+                        _recipientShardIds,
+                        _splitPoints,
+                        LogicalClock::get(opCtx)->getClusterTime().asTimestamp());
+}
+
 } // namespace mongo
diff --git a/src/mongo/db/s/config/initial_split_policy.h b/src/mongo/db/s/config/initial_split_policy.h
index 68f1c9253a5..41171b3b552 100644
--- a/src/mongo/db/s/config/initial_split_policy.h
+++ b/src/mongo/db/s/config/initial_split_policy.h
@@ -272,4 +272,31 @@ private:
     StringMap<size_t> _numTagsPerShard;
 };
 
+/**
+ * Split point building strategy to be used for resharding when zones are not defined.
+ */
+class ReshardingSplitPolicy : public InitialSplitPolicy {
+public:
+    ReshardingSplitPolicy(OperationContext* opCtx,
+                          const NamespaceString& nss,
+                          const ShardKeyPattern& shardKey,
+                          int numInitialChunks,
+                          const std::vector<ShardId>& recipientShardIds,
+                          const boost::intrusive_ptr<ExpressionContext>& expCtx,
+                          int samplingRatio = 10);
+    ShardCollectionConfig createFirstChunks(OperationContext* opCtx,
+                                            const ShardKeyPattern& shardKeyPattern,
+                                            SplitPolicyParams params);
+    /**
+     * Creates the aggregation pipeline BSON to get documents for sampling from shards.
+     */
+    static std::vector<BSONObj> createRawPipeline(const ShardKeyPattern& shardKey,
+                                                  int samplingRatio,
+                                                  int numSplitPoints);
+
+private:
+    std::vector<BSONObj> _splitPoints;
+    std::vector<ShardId> _recipientShardIds;
+    int _numContiguousChunksPerShard;
+};
 } // namespace mongo
--
cgit v1.2.1
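
For illustration only, and not part of the commit: a minimal sketch of the stages that
ReshardingSplitPolicy::createRawPipeline above assembles for a hypothetical compound shard
key {a: 1, b: "hashed"} with the default samplingRatio of 10 and numSplitPoints = 3. The
field names and parameter values are invented for the example; the stage contents follow
the builder calls in the patch.

    std::vector<BSONObj> stages;
    // $sample requests numSplitPoints * samplingRatio = 30 candidate documents.
    stages.push_back(BSON("$sample" << BSON("size" << 30)));
    // Plain shard key fields are normalized with $ifNull so missing values sort as null;
    // the hashed field is replaced by its hashed index key so that split points are picked
    // in hashed-key space. _id is dropped because it is not part of the shard key.
    stages.push_back(
        BSON("$project" << BSON("a" << BSON("$ifNull" << BSON_ARRAY("$a" << BSONNULL)) << "b"
                                    << BSON("$toHashedIndexKey"
                                            << "$b")
                                    << "_id" << 0)));
    // Sorting by the new shard key lets the constructor keep every samplingRatio-th
    // document as a split point.
    stages.push_back(BSON("$sort" << BSON("a" << 1 << "b" << 1)));

Oversampling by samplingRatio and then keeping every samplingRatio-th sorted document
makes the chosen split points less sensitive to sampling noise than requesting only
numSplitPoints documents directly.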