author    Kshitij Gupta <kshitij.gupta@mongodb.com>    2020-08-03 18:48:21 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>    2020-08-05 22:15:25 +0000
commit    262e5a961fa7221bfba5722aeea2db719f2149f5 (patch)
tree      e6ed8473165fab5fa2977ee32837d59c8aae10ab
parent    441f18826c8469f871c1055032cf49be69e6d314 (diff)
SERVER-49510: Add functionality to pick new split points during resharding when zones are not defined.
SERVER-49525: Sample documents to pick new split points for resharding when the new shard key has a hashed field.
-rw-r--r--  src/mongo/db/pipeline/SConscript                                   1
-rw-r--r--  src/mongo/db/pipeline/resharding_initial_split_policy_test.cpp   241
-rw-r--r--  src/mongo/db/s/SConscript                                          1
-rw-r--r--  src/mongo/db/s/config/initial_split_policy.cpp                   134
-rw-r--r--  src/mongo/db/s/config/initial_split_policy.h                      27
5 files changed, 384 insertions, 20 deletions
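
How the new policy works: ReshardingSplitPolicy builds an aggregation that $samples numSplitPoints * samplingRatio documents, $projects them down to the new shard key (applying $toHashedIndexKey to any hashed field), and $sorts them by that key; the constructor then keeps every samplingRatio-th merged document as a split point. As an illustrative sketch (derived by hand from createRawPipeline() below, not captured output), the raw pipeline for the shard key {a: 1, b: "hashed"} with numSplitPoints = 3 and samplingRatio = 10 would serialize to roughly:

    [
      { "$sample": { "size": 30 } },
      { "$project": {
          "a": { "$ifNull": [ "$a", null ] },
          "b": { "$toHashedIndexKey": "$b" },
          "_id": 0
      } },
      { "$sort": { "a": 1, "b": 1 } }
    ]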
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 1929babc330..c9e6c4df264 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -381,6 +381,7 @@ env.CppUnitTest(
'lookup_set_cache_test.cpp',
'pipeline_metadata_tree_test.cpp',
'pipeline_test.cpp',
+ 'resharding_initial_split_policy_test.cpp',
'resume_token_test.cpp',
'semantic_analysis_test.cpp',
'sequential_document_cache_test.cpp',
diff --git a/src/mongo/db/pipeline/resharding_initial_split_policy_test.cpp b/src/mongo/db/pipeline/resharding_initial_split_policy_test.cpp
new file mode 100644
index 00000000000..8b3806f5f53
--- /dev/null
+++ b/src/mongo/db/pipeline/resharding_initial_split_policy_test.cpp
@@ -0,0 +1,241 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/pipeline/sharded_agg_helpers.h"
+#include "mongo/db/s/config/initial_split_policy.h"
+#include "mongo/s/query/sharded_agg_test_fixture.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+using ReshardingSplitPolicyTest = ShardedAggTestFixture;
+
+const ShardId primaryShardId = ShardId("0");
+
+TEST_F(ReshardingSplitPolicyTest, ShardKeyWithNonDottedFieldAndIdIsNotProjectedSucceeds) {
+ auto shardKeyPattern = ShardKeyPattern(BSON("a" << 1));
+
+ auto pipeline =
+ Pipeline::parse(ReshardingSplitPolicy::createRawPipeline(
+ shardKeyPattern, 2 /* samplingRatio */, 1 /* numSplitPoints */),
+ expCtx());
+ auto mockSource =
+ DocumentSourceMock::createForTest({"{_id: 10, a: 15}", "{_id: 3, a: 5}"}, expCtx());
+ pipeline->addInitialSource(mockSource.get());
+
+ // We sample all of the documents since numSplitPoints (1) * samplingRatio (2) = 2 and the
+ // document source holds 2 documents, so we can assert on the exact values returned.
+ auto next = pipeline->getNext();
+ ASSERT_EQUALS(next.get().getField("a").getInt(), 5);
+ ASSERT(next.get().getField("_id").missing());
+ next = pipeline->getNext();
+ ASSERT_EQUALS(next.get().getField("a").getInt(), 15);
+ ASSERT(next.get().getField("_id").missing());
+ ASSERT(!pipeline->getNext());
+}
+
+TEST_F(ReshardingSplitPolicyTest, ShardKeyWithIdFieldIsProjectedSucceeds) {
+ auto shardKeyPattern = ShardKeyPattern(BSON("_id" << 1));
+
+ auto pipeline =
+ Pipeline::parse(ReshardingSplitPolicy::createRawPipeline(
+ shardKeyPattern, 2 /* samplingRatio */, 1 /* numSplitPoints */),
+ expCtx());
+ auto mockSource =
+ DocumentSourceMock::createForTest({"{_id: 10, a: 15}", "{_id: 3, a: 5}"}, expCtx());
+ pipeline->addInitialSource(mockSource.get());
+
+ // We sample all of the documents since numSplitPoints (1) * samplingRatio (2) = 2 and the
+ // document source holds 2 documents, so we can assert on the exact values returned.
+ auto next = pipeline->getNext();
+ ASSERT_EQUALS(next.get().getField("_id").getInt(), 3);
+ ASSERT(next.get().getField("a").missing());
+ next = pipeline->getNext();
+ ASSERT_EQUALS(next.get().getField("_id").getInt(), 10);
+ ASSERT(next.get().getField("a").missing());
+ ASSERT(!pipeline->getNext());
+}
+
+TEST_F(ReshardingSplitPolicyTest, CompoundShardKeyWithNonDottedHashedFieldSucceeds) {
+ auto shardKeyPattern = ShardKeyPattern(BSON("a" << 1 << "b"
+ << "hashed"));
+
+ auto pipeline =
+ Pipeline::parse(ReshardingSplitPolicy::createRawPipeline(
+ shardKeyPattern, 2 /* samplingRatio */, 1 /* numSplitPoints */),
+ expCtx());
+ auto mockSource = DocumentSourceMock::createForTest(
+ {"{x: 1, b: 16, a: 15}", "{x: 2, b: 123, a: 5}"}, expCtx());
+ pipeline->addInitialSource(mockSource.get());
+
+ // We sample all of the documents since numSplitPoints (1) * samplingRatio (2) = 2 and the
+ // document source holds 2 documents, so we can assert on the exact values returned.
+ auto next = pipeline->getNext();
+ ASSERT_EQUALS(next.get().getField("a").getInt(), 5);
+ ASSERT_EQUALS(next.get().getField("b").getLong(), -6548868637522515075);
+ ASSERT(next.get().getField("x").missing());
+ next = pipeline->getNext();
+ ASSERT_EQUALS(next.get().getField("a").getInt(), 15);
+ ASSERT_EQUALS(next.get().getField("b").getLong(), 2598032665634823220);
+ ASSERT(next.get().getField("x").missing());
+ ASSERT(!pipeline->getNext());
+}
+
+TEST_F(ReshardingSplitPolicyTest, CompoundShardKeyWithDottedFieldSucceeds) {
+ auto shardKeyPattern = ShardKeyPattern(BSON("a.b" << 1 << "c" << 1));
+
+ auto pipeline =
+ Pipeline::parse(ReshardingSplitPolicy::createRawPipeline(
+ shardKeyPattern, 2 /* samplingRatio */, 1 /* numSplitPoints */),
+ expCtx());
+ auto mockSource = DocumentSourceMock::createForTest(
+ {"{x: 10, a: {b: 20}, c: 1}", "{x: 3, a: {b: 10}, c: 5}"}, expCtx());
+ pipeline->addInitialSource(mockSource.get());
+
+ // We sample all of the documents since numSplitPoints (1) * samplingRatio (2) = 2 and the
+ // document source holds 2 documents, so we can assert on the exact values returned.
+ auto next = pipeline->getNext();
+ ASSERT_BSONOBJ_EQ(next.get().toBson(), BSON("a" << BSON("b" << 10) << "c" << 5));
+ next = pipeline->getNext();
+ ASSERT_BSONOBJ_EQ(next.get().toBson(), BSON("a" << BSON("b" << 20) << "c" << 1));
+ ASSERT(!pipeline->getNext());
+}
+
+TEST_F(ReshardingSplitPolicyTest, CompoundShardKeyWithDottedHashedFieldSucceeds) {
+ auto shardKeyPattern = ShardKeyPattern(BSON("a.b" << 1 << "c" << 1 << "a.c"
+ << "hashed"));
+
+ auto pipeline =
+ Pipeline::parse(ReshardingSplitPolicy::createRawPipeline(
+ shardKeyPattern, 2 /* samplingRatio */, 1 /* numSplitPoints */),
+ expCtx());
+ auto mockSource = DocumentSourceMock::createForTest(
+ {"{x: 10, a: {b: 20, c: 16}, c: 1}", "{x: 3, a: {b: 10, c: 123}, c: 5}"}, expCtx());
+ pipeline->addInitialSource(mockSource.get());
+
+ // We sample all of the documents since numSplitPoints (1) * samplingRatio (2) = 2 and the
+ // document source holds 2 documents, so we can assert on the exact values returned.
+ auto next = pipeline->getNext();
+ ASSERT_BSONOBJ_EQ(next.get().toBson(),
+ BSON("a" << BSON("b" << 10 << "c" << -6548868637522515075) << "c" << 5));
+ next = pipeline->getNext();
+ ASSERT_BSONOBJ_EQ(next.get().toBson(),
+ BSON("a" << BSON("b" << 20 << "c" << 2598032665634823220) << "c" << 1));
+ ASSERT(!pipeline->getNext());
+}
+
+TEST_F(ReshardingSplitPolicyTest, SamplingSucceeds) {
+ auto shards = setupNShards(2);
+ loadRoutingTableWithTwoChunksAndTwoShards(kTestAggregateNss);
+ // We add a $sortKey field since AsyncResultsMerger expects it in order to merge the batches
+ // from different shards.
+ std::vector<BSONObj> firstShardChunks{
+ BSON("a" << 0 << "$sortKey" << BSON_ARRAY(1)),
+ BSON("a" << 1 << "$sortKey" << BSON_ARRAY(1)),
+ BSON("a" << 2 << "$sortKey" << BSON_ARRAY(2)),
+ BSON("a" << 3 << "$sortKey" << BSON_ARRAY(3)),
+ BSON("a" << 4 << "$sortKey" << BSON_ARRAY(4)),
+ BSON("a" << 5 << "$sortKey" << BSON_ARRAY(5)),
+ BSON("a" << 6 << "$sortKey" << BSON_ARRAY(6)),
+ BSON("a" << 7 << "$sortKey" << BSON_ARRAY(7)),
+ BSON("a" << 8 << "$sortKey" << BSON_ARRAY(8)),
+ BSON("a" << 9 << "$sortKey" << BSON_ARRAY(9)),
+ BSON("a" << 10 << "$sortKey" << BSON_ARRAY(10)),
+ };
+
+ std::vector<BSONObj> secondShardChunks{
+ BSON("a" << 11 << "$sortKey" << BSON_ARRAY(11)),
+ BSON("a" << 12 << "$sortKey" << BSON_ARRAY(12)),
+ BSON("a" << 13 << "$sortKey" << BSON_ARRAY(13)),
+ BSON("a" << 14 << "$sortKey" << BSON_ARRAY(14)),
+ BSON("a" << 15 << "$sortKey" << BSON_ARRAY(15)),
+ BSON("a" << 16 << "$sortKey" << BSON_ARRAY(16)),
+ BSON("a" << 17 << "$sortKey" << BSON_ARRAY(17)),
+ BSON("a" << 18 << "$sortKey" << BSON_ARRAY(18)),
+ BSON("a" << 19 << "$sortKey" << BSON_ARRAY(19)),
+ BSON("a" << 20 << "$sortKey" << BSON_ARRAY(20)),
+ BSON("a" << 21 << "$sortKey" << BSON_ARRAY(21)),
+ BSON("a" << 22 << "$sortKey" << BSON_ARRAY(22)),
+ };
+
+ auto shardKeyPattern = ShardKeyPattern(BSON("a" << 1));
+ std::vector<ShardId> shardIds;
+ for (auto&& shard : shards) {
+ shardIds.push_back(ShardId(shard.getName()));
+ }
+
+ auto future = launchAsync([&] {
+ auto policy = ReshardingSplitPolicy(operationContext(),
+ kTestAggregateNss,
+ shardKeyPattern,
+ 4 /* numInitialChunks */,
+ shardIds,
+ expCtx());
+ const auto chunks = policy
+ .createFirstChunks(operationContext(),
+ shardKeyPattern,
+ {kTestAggregateNss, primaryShardId})
+ .chunks;
+ // We sample all of the documents since numSplitPoints (3) * samplingRatio (10) = 30 and the
+ // document source only holds 23 documents, so we can assert on the exact split points.
+ ASSERT_EQ(chunks.size(), 4);
+ ASSERT_BSONOBJ_EQ(chunks.at(0).getMin(), shardKeyPattern.getKeyPattern().globalMin());
+ ASSERT_BSONOBJ_EQ(chunks.at(0).getMax(), firstShardChunks.at(0).removeField("$sortKey"));
+
+ ASSERT_BSONOBJ_EQ(chunks.at(1).getMin(), firstShardChunks.at(0).removeField("$sortKey"));
+ ASSERT_BSONOBJ_EQ(chunks.at(1).getMax(), firstShardChunks.at(10).removeField("$sortKey"));
+
+ ASSERT_BSONOBJ_EQ(chunks.at(2).getMin(), firstShardChunks.at(10).removeField("$sortKey"));
+ ASSERT_BSONOBJ_EQ(chunks.at(2).getMax(), secondShardChunks.at(9).removeField("$sortKey"));
+
+ ASSERT_BSONOBJ_EQ(chunks.at(3).getMin(), secondShardChunks.at(9).removeField("$sortKey"));
+ ASSERT_BSONOBJ_EQ(chunks.at(3).getMax(), shardKeyPattern.getKeyPattern().globalMax());
+ });
+
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ return CursorResponse(kTestAggregateNss, CursorId{0}, firstShardChunks)
+ .toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ return CursorResponse(kTestAggregateNss, CursorId{0}, secondShardChunks)
+ .toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+
+ future.default_timed_get();
+}
+
+} // namespace
+} // namespace mongo
\ No newline at end of file
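
A note on the hashed-field expectations above: -6548868637522515075 and 2598032665634823220 are the 64-bit hashed index key values of b: 123 and b: 16 (the $sort stage reorders the documents, so the a: 5 document comes out first). Assuming the values follow the standard hashed-index hash (BSONElementHasher with the default seed of 0, which is what $toHashedIndexKey is expected to apply), they could be recomputed with a sketch like:

    // Hedged sketch, assuming "mongo/db/hasher.h" is available: recompute the
    // hashed index key for b: 123 with the default seed, as the test expects.
    #include "mongo/db/hasher.h"

    long long hashed = BSONElementHasher::hash64(BSON("" << 123).firstElement(),
                                                 BSONElementHasher::DEFAULT_HASH_SEED);
    // hashed should equal -6548868637522515075 per the assertions above.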
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index eaad8ca1d68..61309e39100 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -253,6 +253,7 @@ env.Library(
'$BUILD_DIR/mongo/db/vector_clock_mutable',
'$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl',
'$BUILD_DIR/mongo/util/options_parser/options_parser',
+ '$BUILD_DIR/mongo/db/pipeline/sharded_agg_helpers',
],
)
diff --git a/src/mongo/db/s/config/initial_split_policy.cpp b/src/mongo/db/s/config/initial_split_policy.cpp
index e3fb330800c..36c0f1bf15b 100644
--- a/src/mongo/db/s/config/initial_split_policy.cpp
+++ b/src/mongo/db/s/config/initial_split_policy.cpp
@@ -35,6 +35,7 @@
#include "mongo/client/read_preference.h"
#include "mongo/db/logical_clock.h"
+#include "mongo/db/pipeline/sharded_agg_helpers.h"
#include "mongo/s/balancer_configuration.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/grid.h"
@@ -97,6 +98,34 @@ StringMap<std::vector<ShardId>> buildTagsToShardIdsMap(OperationContext* opCtx,
return tagToShardIds;
}
+InitialSplitPolicy::ShardCollectionConfig createChunks(const ShardKeyPattern& shardKeyPattern,
+ SplitPolicyParams params,
+ int numContiguousChunksPerShard,
+ const std::vector<ShardId>& allShardIds,
+ std::vector<BSONObj>& finalSplitPoints,
+ const Timestamp& validAfter) {
+ ChunkVersion version(1, 0, OID::gen());
+ const auto& keyPattern(shardKeyPattern.getKeyPattern());
+
+ std::vector<ChunkType> chunks;
+
+ for (size_t i = 0; i <= finalSplitPoints.size(); i++) {
+ const BSONObj min = (i == 0) ? keyPattern.globalMin() : finalSplitPoints[i - 1];
+ const BSONObj max =
+ (i < finalSplitPoints.size()) ? finalSplitPoints[i] : keyPattern.globalMax();
+
+ // It's possible that there are no split points, or fewer split points than the total
+ // number of shards; ensure that at least one chunk is placed on the primary shard.
+ const ShardId shardId = (i == 0 && finalSplitPoints.size() + 1 < allShardIds.size())
+ ? params.primaryShardId
+ : allShardIds[(i / numContiguousChunksPerShard) % allShardIds.size()];
+
+ appendChunk(params.nss, min, max, &version, validAfter, shardId, &chunks);
+ }
+
+ return {std::move(chunks)};
+}
+
} // namespace
std::vector<BSONObj> InitialSplitPolicy::calculateHashedSplitPoints(
@@ -178,26 +207,12 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::generateShardColle
finalSplitPoints.push_back(splitPoint);
}
- ChunkVersion version(1, 0, OID::gen());
- const auto& keyPattern(shardKeyPattern.getKeyPattern());
-
- std::vector<ChunkType> chunks;
-
- for (size_t i = 0; i <= finalSplitPoints.size(); i++) {
- const BSONObj min = (i == 0) ? keyPattern.globalMin() : finalSplitPoints[i - 1];
- const BSONObj max =
- (i < finalSplitPoints.size()) ? finalSplitPoints[i] : keyPattern.globalMax();
-
- // It's possible there are no split points or fewer split points than total number of
- // shards, and we need to be sure that at least one chunk is placed on the primary shard
- const ShardId shardId = (i == 0 && finalSplitPoints.size() + 1 < allShardIds.size())
- ? databasePrimaryShardId
- : allShardIds[(i / numContiguousChunksPerShard) % allShardIds.size()];
-
- appendChunk(nss, min, max, &version, validAfter, shardId, &chunks);
- }
-
- return {std::move(chunks)};
+ return createChunks(shardKeyPattern,
+ {nss, databasePrimaryShardId},
+ numContiguousChunksPerShard,
+ allShardIds,
+ finalSplitPoints,
+ validAfter);
}
std::unique_ptr<InitialSplitPolicy> InitialSplitPolicy::calculateOptimizationStrategy(
@@ -581,4 +596,83 @@ void PresplitHashedZonesSplitPolicy::_validate(const ShardKeyPattern& shardKeyPa
}
}
+std::vector<BSONObj> ReshardingSplitPolicy::createRawPipeline(const ShardKeyPattern& shardKey,
+ int samplingRatio,
+ int numSplitPoints) {
+
+ std::vector<BSONObj> res;
+ const auto& shardKeyFields = shardKey.getKeyPatternFields();
+
+ BSONObjBuilder projectValBuilder;
+ BSONObjBuilder sortValBuilder;
+
+ for (auto&& fieldRef : shardKeyFields) {
+ // If the current fieldRef is the shard key's hashed field, project its hashed value.
+ if (shardKey.isHashedPattern() &&
+ fieldRef->dottedField().compare(shardKey.getHashedField().fieldNameStringData()) == 0) {
+ projectValBuilder.append(fieldRef->dottedField(),
+ BSON("$toHashedIndexKey"
+ << "$" + fieldRef->dottedField()));
+ } else {
+ projectValBuilder.append(
+ str::stream() << fieldRef->dottedField(),
+ BSON("$ifNull" << BSON_ARRAY("$" + fieldRef->dottedField() << BSONNULL)));
+ }
+
+ sortValBuilder.append(fieldRef->dottedField().toString(), 1);
+ }
+
+ // Do not project _id if it's not part of the shard key.
+ if (!shardKey.hasId()) {
+ projectValBuilder.append("_id", 0);
+ }
+
+ res.push_back(BSON("$sample" << BSON("size" << numSplitPoints * samplingRatio)));
+ res.push_back(BSON("$project" << projectValBuilder.obj()));
+ res.push_back(BSON("$sort" << sortValBuilder.obj()));
+
+ return res;
+}
+
+ReshardingSplitPolicy::ReshardingSplitPolicy(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ShardKeyPattern& shardKey,
+ int numInitialChunks,
+ const std::vector<ShardId>& recipientShardIds,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ int samplingRatio) {
+
+ invariant(!recipientShardIds.empty());
+
+ auto pipelineCursor = sharded_agg_helpers::attachCursorToPipeline(
+ Pipeline::parse(createRawPipeline(shardKey, samplingRatio, numInitialChunks - 1), expCtx)
+ .release(),
+ true);
+
+ auto sampledDocument = pipelineCursor->getNext();
+ size_t idx = 0;
+ while (sampledDocument) {
+ if (idx % samplingRatio == 0) {
+ _splitPoints.push_back(sampledDocument.get().toBson());
+ }
+ sampledDocument = pipelineCursor->getNext();
+ ++idx;
+ }
+
+ _numContiguousChunksPerShard =
+ std::max(numInitialChunks / recipientShardIds.size(), static_cast<size_t>(1));
+ _recipientShardIds = recipientShardIds;
+}
+
+InitialSplitPolicy::ShardCollectionConfig ReshardingSplitPolicy::createFirstChunks(
+ OperationContext* opCtx, const ShardKeyPattern& shardKeyPattern, SplitPolicyParams params) {
+
+ return createChunks(shardKeyPattern,
+ params,
+ _numContiguousChunksPerShard,
+ _recipientShardIds,
+ _splitPoints,
+ LogicalClock::get(opCtx)->getClusterTime().asTimestamp());
+}
+
} // namespace mongo
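
A worked trace of the SamplingSucceeds test against this code: the constructor asks $sample for (numInitialChunks - 1) * samplingRatio = 3 * 10 = 30 documents, the two mocked cursors only return 11 + 12 = 23, and the idx % samplingRatio == 0 filter keeps merged indexes 0, 10 and 20, i.e. split points {a: 0}, {a: 10} and {a: 20}. With 2 recipient shards, _numContiguousChunksPerShard = max(4 / 2, 1) = 2, so createChunks() produces (recipient0/recipient1 stand for the two recipients in registration order):

    i = 0: [MinKey, {a: 0})    -> allShardIds[(0 / 2) % 2] = recipient0
    i = 1: [{a: 0}, {a: 10})   -> allShardIds[(1 / 2) % 2] = recipient0
    i = 2: [{a: 10}, {a: 20})  -> allShardIds[(2 / 2) % 2] = recipient1
    i = 3: [{a: 20}, MaxKey]   -> allShardIds[(3 / 2) % 2] = recipient1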
diff --git a/src/mongo/db/s/config/initial_split_policy.h b/src/mongo/db/s/config/initial_split_policy.h
index 68f1c9253a5..41171b3b552 100644
--- a/src/mongo/db/s/config/initial_split_policy.h
+++ b/src/mongo/db/s/config/initial_split_policy.h
@@ -272,4 +272,31 @@ private:
StringMap<size_t> _numTagsPerShard;
};
+/**
+ * Split point building strategy to be used for resharding when zones are not defined.
+ */
+class ReshardingSplitPolicy : public InitialSplitPolicy {
+public:
+ ReshardingSplitPolicy(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ShardKeyPattern& shardKey,
+ int numInitialChunks,
+ const std::vector<ShardId>& recipientShardIds,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ int samplingRatio = 10);
+ ShardCollectionConfig createFirstChunks(OperationContext* opCtx,
+ const ShardKeyPattern& shardKeyPattern,
+ SplitPolicyParams params);
+ /**
+ * Creates the aggregation pipeline BSON to get documents for sampling from shards.
+ */
+ static std::vector<BSONObj> createRawPipeline(const ShardKeyPattern& shardKey,
+ int samplingRatio,
+ int numSplitPoints);
+
+private:
+ std::vector<BSONObj> _splitPoints;
+ std::vector<ShardId> _recipientShardIds;
+ int _numContiguousChunksPerShard;
+};
} // namespace mongo
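
For reference, a minimal usage sketch of the new policy, mirroring the unit test above (opCtx, expCtx, nss, recipientShardIds and primaryShardId are assumed to come from the caller's context):

    // Sketch only: sample the collection, derive numInitialChunks - 1 split
    // points, and build the initial chunk distribution for resharding.
    auto shardKeyPattern = ShardKeyPattern(BSON("a" << 1));
    ReshardingSplitPolicy policy(
        opCtx, nss, shardKeyPattern, 4 /* numInitialChunks */, recipientShardIds, expCtx);
    auto config = policy.createFirstChunks(opCtx, shardKeyPattern, {nss, primaryShardId});
    // config.chunks now holds the initial chunk ranges for the resharded collection.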