diff options
Diffstat (limited to 'src/mongo/db/s/config')
-rw-r--r-- | src/mongo/db/s/config/initial_split_policy.cpp | 149 | ||||
-rw-r--r-- | src/mongo/db/s/config/initial_split_policy.h | 39 |
2 files changed, 168 insertions, 20 deletions
diff --git a/src/mongo/db/s/config/initial_split_policy.cpp b/src/mongo/db/s/config/initial_split_policy.cpp
index e3fb330800c..36c0f1bf15b 100644
--- a/src/mongo/db/s/config/initial_split_policy.cpp
+++ b/src/mongo/db/s/config/initial_split_policy.cpp
@@ -35,6 +35,7 @@
 
 #include "mongo/client/read_preference.h"
 #include "mongo/db/logical_clock.h"
+#include "mongo/db/pipeline/sharded_agg_helpers.h"
 #include "mongo/s/balancer_configuration.h"
 #include "mongo/s/catalog/type_shard.h"
 #include "mongo/s/grid.h"
@@ -97,6 +98,40 @@ StringMap<std::vector<ShardId>> buildTagsToShardIdsMap(OperationContext* opCtx,
     return tagToShardIds;
 }
 
+/**
+ * Creates one chunk for each key range delimited by 'finalSplitPoints', with the global min and
+ * max keys closing the first and last range. Shards are assigned by cycling through
+ * 'allShardIds', each one receiving 'numContiguousChunksPerShard' consecutive chunks per turn.
+ * If there are fewer chunks than shards, the first chunk goes to 'params.primaryShardId'.
+ */
+InitialSplitPolicy::ShardCollectionConfig createChunks(const ShardKeyPattern& shardKeyPattern,
+                                                       SplitPolicyParams params,
+                                                       int numContiguousChunksPerShard,
+                                                       const std::vector<ShardId>& allShardIds,
+                                                       const std::vector<BSONObj>& finalSplitPoints,
+                                                       const Timestamp& validAfter) {
+    ChunkVersion version(1, 0, OID::gen());
+    const auto& keyPattern(shardKeyPattern.getKeyPattern());
+
+    std::vector<ChunkType> chunks;
+
+    for (size_t i = 0; i <= finalSplitPoints.size(); i++) {
+        const BSONObj min = (i == 0) ? keyPattern.globalMin() : finalSplitPoints[i - 1];
+        const BSONObj max =
+            (i < finalSplitPoints.size()) ? finalSplitPoints[i] : keyPattern.globalMax();
+
+        // It's possible there are no split points or fewer split points than total number of
+        // shards, and we need to be sure that at least one chunk is placed on the primary shard
+        const ShardId shardId = (i == 0 && finalSplitPoints.size() + 1 < allShardIds.size())
+            ? params.primaryShardId
+            : allShardIds[(i / numContiguousChunksPerShard) % allShardIds.size()];
+
+        appendChunk(params.nss, min, max, &version, validAfter, shardId, &chunks);
+    }
+
+    return {std::move(chunks)};
+}
+
 }  // namespace
 
 std::vector<BSONObj> InitialSplitPolicy::calculateHashedSplitPoints(
@@ -178,26 +213,12 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::generateShardColle
         finalSplitPoints.push_back(splitPoint);
     }
 
-    ChunkVersion version(1, 0, OID::gen());
-    const auto& keyPattern(shardKeyPattern.getKeyPattern());
-
-    std::vector<ChunkType> chunks;
-
-    for (size_t i = 0; i <= finalSplitPoints.size(); i++) {
-        const BSONObj min = (i == 0) ? keyPattern.globalMin() : finalSplitPoints[i - 1];
-        const BSONObj max =
-            (i < finalSplitPoints.size()) ? finalSplitPoints[i] : keyPattern.globalMax();
-
-        // It's possible there are no split points or fewer split points than total number of
-        // shards, and we need to be sure that at least one chunk is placed on the primary shard
-        const ShardId shardId = (i == 0 && finalSplitPoints.size() + 1 < allShardIds.size())
-            ? databasePrimaryShardId
-            : allShardIds[(i / numContiguousChunksPerShard) % allShardIds.size()];
-
-        appendChunk(nss, min, max, &version, validAfter, shardId, &chunks);
-    }
-
-    return {std::move(chunks)};
+    return createChunks(shardKeyPattern,
+                        {nss, databasePrimaryShardId},
+                        numContiguousChunksPerShard,
+                        allShardIds,
+                        finalSplitPoints,
+                        validAfter);
 }
 
 std::unique_ptr<InitialSplitPolicy> InitialSplitPolicy::calculateOptimizationStrategy(
@@ -581,4 +602,92 @@ void PresplitHashedZonesSplitPolicy::_validate(const ShardKeyPattern& shardKeyPa
     }
 }
 
+std::vector<BSONObj> ReshardingSplitPolicy::createRawPipeline(const ShardKeyPattern& shardKey,
+                                                              int samplingRatio,
+                                                              int numSplitPoints) {
+
+    std::vector<BSONObj> res;
+    const auto& shardKeyFields = shardKey.getKeyPatternFields();
+
+    BSONObjBuilder projectValBuilder;
+    BSONObjBuilder sortValBuilder;
+
+    for (auto&& fieldRef : shardKeyFields) {
+        // If the shard key includes a hashed field and current fieldRef is the hashed field.
+        if (shardKey.isHashedPattern() &&
+            fieldRef->dottedField().compare(shardKey.getHashedField().fieldNameStringData()) == 0) {
+            projectValBuilder.append(fieldRef->dottedField(),
+                                     BSON("$toHashedIndexKey"
+                                          << "$" + fieldRef->dottedField()));
+        } else {
+            projectValBuilder.append(
+                str::stream() << fieldRef->dottedField(),
+                BSON("$ifNull" << BSON_ARRAY("$" + fieldRef->dottedField() << BSONNULL)));
+        }
+
+        sortValBuilder.append(fieldRef->dottedField().toString(), 1);
+    }
+
+    // Do not project _id if it's not part of the shard key.
+    if (!shardKey.hasId()) {
+        projectValBuilder.append("_id", 0);
+    }
+
+    res.push_back(BSON("$sample" << BSON("size" << numSplitPoints * samplingRatio)));
+    res.push_back(BSON("$project" << projectValBuilder.obj()));
+    res.push_back(BSON("$sort" << sortValBuilder.obj()));
+
+    return res;
+}
+
+ReshardingSplitPolicy::ReshardingSplitPolicy(OperationContext* opCtx,
+                                             const NamespaceString& nss,
+                                             const ShardKeyPattern& shardKey,
+                                             int numInitialChunks,
+                                             const std::vector<ShardId>& recipientShardIds,
+                                             const boost::intrusive_ptr<ExpressionContext>& expCtx,
+                                             int samplingRatio) {
+
+    invariant(!recipientShardIds.empty());
+    // 'samplingRatio' is the modulus used to thin the samples below, so it must be positive.
+    invariant(samplingRatio > 0);
+
+    auto pipelineCursor = sharded_agg_helpers::attachCursorToPipeline(
+        Pipeline::parse(createRawPipeline(shardKey, samplingRatio, numInitialChunks - 1), expCtx)
+            .release(),
+        true);
+
+    // The pipeline ends with a $sort on the shard key fields; every 'samplingRatio'-th sample of
+    // that sorted stream becomes a split point.
+    auto sampledDocument = pipelineCursor->getNext();
+    size_t idx = 0;
+    while (sampledDocument) {
+        if (idx % samplingRatio == 0) {
+            auto splitPoint = sampledDocument.get().toBson();
+            // Samples arrive sorted, so equal keys are adjacent. Skipping a repeated key avoids
+            // producing a chunk whose min and max bounds are identical.
+            if (_splitPoints.empty() || _splitPoints.back().woCompare(splitPoint) != 0) {
+                _splitPoints.push_back(std::move(splitPoint));
+            }
+        }
+        sampledDocument = pipelineCursor->getNext();
+        ++idx;
+    }
+
+    _numContiguousChunksPerShard =
+        std::max(numInitialChunks / recipientShardIds.size(), static_cast<size_t>(1));
+    _recipientShardIds = recipientShardIds;
+}
+
+InitialSplitPolicy::ShardCollectionConfig ReshardingSplitPolicy::createFirstChunks(
+    OperationContext* opCtx, const ShardKeyPattern& shardKeyPattern, SplitPolicyParams params) {
+
+    return createChunks(shardKeyPattern,
+                        params,
+                        _numContiguousChunksPerShard,
+                        _recipientShardIds,
+                        _splitPoints,
+                        LogicalClock::get(opCtx)->getClusterTime().asTimestamp());
+}
+
 }  // namespace mongo
diff --git a/src/mongo/db/s/config/initial_split_policy.h b/src/mongo/db/s/config/initial_split_policy.h
index 68f1c9253a5..41171b3b552 100644
--- a/src/mongo/db/s/config/initial_split_policy.h
+++ b/src/mongo/db/s/config/initial_split_policy.h
@@ -272,4 +272,43 @@ private:
     StringMap<size_t> _numTagsPerShard;
 };
 
+/**
+ * Split point building strategy to be used for resharding when zones are not defined.
+ */
+class ReshardingSplitPolicy : public InitialSplitPolicy {
+public:
+    /**
+     * Runs the sampling pipeline built by createRawPipeline() through 'expCtx' and keeps every
+     * 'samplingRatio'-th sorted sample as a split point, skipping repeated keys. Requires a
+     * non-empty 'recipientShardIds' and a positive 'samplingRatio'. 'opCtx' and 'nss' are
+     * currently unused.
+     */
+    ReshardingSplitPolicy(OperationContext* opCtx,
+                          const NamespaceString& nss,
+                          const ShardKeyPattern& shardKey,
+                          int numInitialChunks,
+                          const std::vector<ShardId>& recipientShardIds,
+                          const boost::intrusive_ptr<ExpressionContext>& expCtx,
+                          int samplingRatio = 10);
+
+    /**
+     * Builds the chunks delimited by the sampled split points, distributes them over the
+     * recipient shards and stamps them with the current cluster time.
+     */
+    ShardCollectionConfig createFirstChunks(OperationContext* opCtx,
+                                            const ShardKeyPattern& shardKeyPattern,
+                                            SplitPolicyParams params);
+
+    /**
+     * Creates the aggregation pipeline BSON to get documents for sampling from shards.
+     */
+    static std::vector<BSONObj> createRawPipeline(const ShardKeyPattern& shardKey,
+                                                  int samplingRatio,
+                                                  int numSplitPoints);
+
+private:
+    std::vector<BSONObj> _splitPoints;
+    std::vector<ShardId> _recipientShardIds;
+    int _numContiguousChunksPerShard;
+};
 }  // namespace mongo