summary refs log tree commit diff
path: root/src/mongo/db/s/config/initial_split_policy.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/config/initial_split_policy.cpp')
-rw-r--r-- src/mongo/db/s/config/initial_split_policy.cpp | 134
1 files changed, 114 insertions, 20 deletions
diff --git a/src/mongo/db/s/config/initial_split_policy.cpp b/src/mongo/db/s/config/initial_split_policy.cpp
index e3fb330800c..36c0f1bf15b 100644
--- a/src/mongo/db/s/config/initial_split_policy.cpp
+++ b/src/mongo/db/s/config/initial_split_policy.cpp
@@ -35,6 +35,7 @@
#include "mongo/client/read_preference.h"
#include "mongo/db/logical_clock.h"
+#include "mongo/db/pipeline/sharded_agg_helpers.h"
#include "mongo/s/balancer_configuration.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/grid.h"
@@ -97,6 +98,34 @@ StringMap<std::vector<ShardId>> buildTagsToShardIdsMap(OperationContext* opCtx,
return tagToShardIds;
}
+InitialSplitPolicy::ShardCollectionConfig createChunks(const ShardKeyPattern& shardKeyPattern,
+ SplitPolicyParams params,
+ int numContiguousChunksPerShard,
+ const std::vector<ShardId>& allShardIds,
+ std::vector<BSONObj>& finalSplitPoints,
+ const Timestamp& validAfter) {
+ ChunkVersion version(1, 0, OID::gen());
+ const auto& keyPattern(shardKeyPattern.getKeyPattern());
+
+ std::vector<ChunkType> chunks;
+
+ for (size_t i = 0; i <= finalSplitPoints.size(); i++) {
+ const BSONObj min = (i == 0) ? keyPattern.globalMin() : finalSplitPoints[i - 1];
+ const BSONObj max =
+ (i < finalSplitPoints.size()) ? finalSplitPoints[i] : keyPattern.globalMax();
+
+ // It's possible there are no split points or fewer split points than total number of
+ // shards, and we need to be sure that at least one chunk is placed on the primary shard
+ const ShardId shardId = (i == 0 && finalSplitPoints.size() + 1 < allShardIds.size())
+ ? params.primaryShardId
+ : allShardIds[(i / numContiguousChunksPerShard) % allShardIds.size()];
+
+ appendChunk(params.nss, min, max, &version, validAfter, shardId, &chunks);
+ }
+
+ return {std::move(chunks)};
+}
+
} // namespace
std::vector<BSONObj> InitialSplitPolicy::calculateHashedSplitPoints(
@@ -178,26 +207,12 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::generateShardColle
finalSplitPoints.push_back(splitPoint);
}
- ChunkVersion version(1, 0, OID::gen());
- const auto& keyPattern(shardKeyPattern.getKeyPattern());
-
- std::vector<ChunkType> chunks;
-
- for (size_t i = 0; i <= finalSplitPoints.size(); i++) {
- const BSONObj min = (i == 0) ? keyPattern.globalMin() : finalSplitPoints[i - 1];
- const BSONObj max =
- (i < finalSplitPoints.size()) ? finalSplitPoints[i] : keyPattern.globalMax();
-
- // It's possible there are no split points or fewer split points than total number of
- // shards, and we need to be sure that at least one chunk is placed on the primary shard
- const ShardId shardId = (i == 0 && finalSplitPoints.size() + 1 < allShardIds.size())
- ? databasePrimaryShardId
- : allShardIds[(i / numContiguousChunksPerShard) % allShardIds.size()];
-
- appendChunk(nss, min, max, &version, validAfter, shardId, &chunks);
- }
-
- return {std::move(chunks)};
+ return createChunks(shardKeyPattern,
+ {nss, databasePrimaryShardId},
+ numContiguousChunksPerShard,
+ allShardIds,
+ finalSplitPoints,
+ validAfter);
}
std::unique_ptr<InitialSplitPolicy> InitialSplitPolicy::calculateOptimizationStrategy(
@@ -581,4 +596,83 @@ void PresplitHashedZonesSplitPolicy::_validate(const ShardKeyPattern& shardKeyPa
}
}
+std::vector<BSONObj> ReshardingSplitPolicy::createRawPipeline(const ShardKeyPattern& shardKey,
+ int samplingRatio,
+ int numSplitPoints) {
+
+ std::vector<BSONObj> res;
+ const auto& shardKeyFields = shardKey.getKeyPatternFields();
+
+ BSONObjBuilder projectValBuilder;
+ BSONObjBuilder sortValBuilder;
+
+ for (auto&& fieldRef : shardKeyFields) {
+ // If the shard key includes a hashed field and current fieldRef is the hashed field.
+ if (shardKey.isHashedPattern() &&
+ fieldRef->dottedField().compare(shardKey.getHashedField().fieldNameStringData()) == 0) {
+ projectValBuilder.append(fieldRef->dottedField(),
+ BSON("$toHashedIndexKey"
+ << "$" + fieldRef->dottedField()));
+ } else {
+ projectValBuilder.append(
+ str::stream() << fieldRef->dottedField(),
+ BSON("$ifNull" << BSON_ARRAY("$" + fieldRef->dottedField() << BSONNULL)));
+ }
+
+ sortValBuilder.append(fieldRef->dottedField().toString(), 1);
+ }
+
+ // Do not project _id if it's not part of the shard key.
+ if (!shardKey.hasId()) {
+ projectValBuilder.append("_id", 0);
+ }
+
+ res.push_back(BSON("$sample" << BSON("size" << numSplitPoints * samplingRatio)));
+ res.push_back(BSON("$project" << projectValBuilder.obj()));
+ res.push_back(BSON("$sort" << sortValBuilder.obj()));
+
+ return res;
+}
+
+ReshardingSplitPolicy::ReshardingSplitPolicy(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ShardKeyPattern& shardKey,
+ int numInitialChunks,
+ const std::vector<ShardId>& recipientShardIds,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ int samplingRatio) {
+
+ invariant(!recipientShardIds.empty());
+
+ auto pipelineCursor = sharded_agg_helpers::attachCursorToPipeline(
+ Pipeline::parse(createRawPipeline(shardKey, samplingRatio, numInitialChunks - 1), expCtx)
+ .release(),
+ true);
+
+ auto sampledDocument = pipelineCursor->getNext();
+ size_t idx = 0;
+ while (sampledDocument) {
+ if (idx % samplingRatio == 0) {
+ _splitPoints.push_back(sampledDocument.get().toBson());
+ }
+ sampledDocument = pipelineCursor->getNext();
+ ++idx;
+ }
+
+ _numContiguousChunksPerShard =
+ std::max(numInitialChunks / recipientShardIds.size(), static_cast<size_t>(1));
+ _recipientShardIds = recipientShardIds;
+}
+
+InitialSplitPolicy::ShardCollectionConfig ReshardingSplitPolicy::createFirstChunks(
+ OperationContext* opCtx, const ShardKeyPattern& shardKeyPattern, SplitPolicyParams params) {
+
+ return createChunks(shardKeyPattern,
+ params,
+ _numContiguousChunksPerShard,
+ _recipientShardIds,
+ _splitPoints,
+ LogicalClock::get(opCtx)->getClusterTime().asTimestamp());
+}
+
} // namespace mongo