diff options
author | Paolo Polato <paolo.polato@mongodb.com> | 2022-12-28 17:28:30 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-12-28 17:58:19 +0000 |
commit | d0d9e95c07df276f4fa3cef07982803d689da5c7 (patch) | |
tree | 8a8d616e4289bb787ecb16ad6b88715458e972fc /src | |
parent | 73b7a22328c766e1effbc28ecb02228dba4e1cf8 (diff) | |
download | mongo-d0d9e95c07df276f4fa3cef07982803d689da5c7.tar.gz |
SERVER-71771 Defragmenter should cap chunk size estimation
Diffstat (limited to 'src')
10 files changed, 144 insertions, 82 deletions
diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp index d7e3839b533..7f37b3a38ae 100644 --- a/src/mongo/db/s/balancer/balancer.cpp +++ b/src/mongo/db/s/balancer/balancer.cpp @@ -586,7 +586,8 @@ void Balancer::_consumeActionStreamLoop() { dataSizeAction.chunkRange, dataSizeAction.version, dataSizeAction.keyPattern, - dataSizeAction.estimatedValue) + dataSizeAction.estimatedValue, + dataSizeAction.maxSize) .thenRunOn(*executor) .onCompletion([this, selectedStream, diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler.h b/src/mongo/db/s/balancer/balancer_commands_scheduler.h index 2c75d93e357..d04126c2270 100644 --- a/src/mongo/db/s/balancer/balancer_commands_scheduler.h +++ b/src/mongo/db/s/balancer/balancer_commands_scheduler.h @@ -147,7 +147,8 @@ public: const ChunkRange& chunkRange, const ShardVersion& version, const KeyPattern& keyPattern, - bool estimatedValue) = 0; + bool estimatedValue, + int64_t maxSize) = 0; virtual SemiFuture<void> requestMoveRange(OperationContext* opCtx, const ShardsvrMoveRange& request, diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp index 2e32bdd6808..d6ca29d85e6 100644 --- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp +++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp @@ -184,6 +184,7 @@ const std::string DataSizeCommandInfo::kKeyPattern = "keyPattern"; const std::string DataSizeCommandInfo::kMinValue = "min"; const std::string DataSizeCommandInfo::kMaxValue = "max"; const std::string DataSizeCommandInfo::kEstimatedValue = "estimate"; +const std::string DataSizeCommandInfo::kMaxSizeValue = "maxSize"; const std::string SplitChunkCommandInfo::kCommandName = "splitChunk"; const std::string SplitChunkCommandInfo::kShardName = "from"; @@ -359,13 +360,15 @@ SemiFuture<DataSizeResponse> BalancerCommandsSchedulerImpl::requestDataSize( const ChunkRange& chunkRange, const ShardVersion& version, const KeyPattern& keyPattern, - bool estimatedValue) { + bool estimatedValue, + int64_t maxSize) { auto commandInfo = std::make_shared<DataSizeCommandInfo>(nss, shardId, keyPattern.toBSON(), chunkRange.getMin(), chunkRange.getMax(), estimatedValue, + maxSize, version); return _buildAndEnqueueNewRequest(opCtx, std::move(commandInfo)) @@ -377,7 +380,8 @@ SemiFuture<DataSizeResponse> BalancerCommandsSchedulerImpl::requestDataSize( } long long sizeBytes = remoteResponse.data["size"].number(); long long numObjects = remoteResponse.data["numObjects"].number(); - return DataSizeResponse(sizeBytes, numObjects); + bool maxSizeReached = remoteResponse.data["maxReached"].trueValue(); + return DataSizeResponse(sizeBytes, numObjects, maxSizeReached); }) .semi(); } diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h index 0d36e2d5959..f68d8d4bbd2 100644 --- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h +++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h @@ -337,12 +337,14 @@ public: const BSONObj& lowerBoundKey, const BSONObj& upperBoundKey, bool estimatedValue, + int64_t maxSize, const ShardVersion& version) : CommandInfo(shardId, nss, boost::none), _shardKeyPattern(shardKeyPattern), _lowerBoundKey(lowerBoundKey), _upperBoundKey(upperBoundKey), _estimatedValue(estimatedValue), + _maxSize(maxSize), _version(version) {} BSONObj serialise() const override { @@ -351,7 +353,8 @@ public: .append(kKeyPattern, _shardKeyPattern) .append(kMinValue, _lowerBoundKey) .append(kMaxValue, _upperBoundKey) - .append(kEstimatedValue, _estimatedValue); + .append(kEstimatedValue, _estimatedValue) + .append(kMaxSizeValue, _maxSize); _version.serialize(ShardVersion::kShardVersionField, &commandBuilder); @@ -363,6 +366,7 @@ private: BSONObj _lowerBoundKey; BSONObj _upperBoundKey; bool _estimatedValue; + int64_t _maxSize; ShardVersion _version; static const std::string kCommandName; @@ -370,6 +374,7 @@ private: static const std::string kMinValue; static const std::string kMaxValue; static const std::string kEstimatedValue; + static const std::string kMaxSizeValue; }; class SplitChunkCommandInfo : public CommandInfo { @@ -579,7 +584,8 @@ public: const ChunkRange& chunkRange, const ShardVersion& version, const KeyPattern& keyPattern, - bool estimatedValue) override; + bool estimatedValue, + int64_t maxSize) override; private: enum class SchedulerState { Recovering, Running, Stopping, Stopped }; diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp index b9c5827d83a..2c096547669 100644 --- a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp +++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp @@ -308,7 +308,8 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulRequestChunkDataSizeCommand) { chunk.getRange(), ShardVersion(chunk.getVersion(), boost::optional<CollectionIndexes>(boost::none)), KeyPattern(BSON("x" << 1)), - false /* issuedByRemoteUser */); + false /* issuedByRemoteUser */, + (kDefaultMaxChunkSizeBytes / 100) * 25 /* maxSize */); auto swReceivedDataSize = futureResponse.getNoThrow(); ASSERT_OK(swReceivedDataSize.getStatus()); auto receivedDataSize = swReceivedDataSize.getValue(); diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp index c7d43d723df..f8e0372814a 100644 --- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp +++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp @@ -169,6 +169,11 @@ public: auto collectionChunks = getCollectionChunks(opCtx, coll); const auto collectionZones = getCollectionZones(opCtx, coll); + // Calculate small chunk threshold to limit dataSize commands + const auto maxChunkSizeBytes = getCollectionMaxChunkSizeBytes(opCtx, coll); + const int64_t smallChunkSizeThreshold = + (maxChunkSizeBytes / 100) * kSmallChunkSizeThresholdPctg; + stdx::unordered_map<ShardId, PendingActions> pendingActionsByShards; // Find ranges of chunks; for single-chunk ranges, request DataSize; for multi-range, issue // merge @@ -195,6 +200,7 @@ public: new MergeAndMeasureChunksPhase(coll.getNss(), coll.getUuid(), coll.getKeyPattern().toBSON(), + smallChunkSizeThreshold, std::move(pendingActionsByShards))); } @@ -220,14 +226,15 @@ public: if (pendingActions.rangesWithoutDataSize.size() > pendingActions.rangesToMerge.size()) { const auto& rangeToMeasure = pendingActions.rangesWithoutDataSize.back(); - nextAction = - boost::optional<DefragmentationAction>(DataSizeInfo(shardId, - _nss, - _uuid, - rangeToMeasure, - shardVersion, - _shardKey, - true /* estimate */)); + nextAction = boost::optional<DefragmentationAction>( + DataSizeInfo(shardId, + _nss, + _uuid, + rangeToMeasure, + shardVersion, + _shardKey, + true /* estimate */, + _smallChunkSizeThresholdBytes /* maxSize */)); pendingActions.rangesWithoutDataSize.pop_back(); } else if (!pendingActions.rangesToMerge.empty()) { const auto& rangeToMerge = pendingActions.rangesToMerge.back(); @@ -266,64 +273,70 @@ public: return; } stdx::visit( - OverloadedVisitor{[&](const MergeInfo& mergeAction) { - auto& mergeResponse = stdx::get<Status>(response); - auto& shardingPendingActions = - _pendingActionsByShards[mergeAction.shardId]; - handleActionResult( - opCtx, - _nss, - _uuid, - getType(), - mergeResponse, - [&]() { - shardingPendingActions.rangesWithoutDataSize.emplace_back( - mergeAction.chunkRange); - }, - [&]() { - shardingPendingActions.rangesToMerge.emplace_back( - mergeAction.chunkRange); - }, - [&]() { _abort(getType()); }); - }, - [&](const DataSizeInfo& dataSizeAction) { - auto& dataSizeResponse = - stdx::get<StatusWith<DataSizeResponse>>(response); - handleActionResult( - opCtx, - _nss, - _uuid, - getType(), - dataSizeResponse.getStatus(), - [&]() { - ChunkType chunk(dataSizeAction.uuid, - dataSizeAction.chunkRange, - dataSizeAction.version.placementVersion(), - dataSizeAction.shardId); - auto catalogManager = ShardingCatalogManager::get(opCtx); - catalogManager->setChunkEstimatedSize( - opCtx, - chunk, - dataSizeResponse.getValue().sizeBytes, - ShardingCatalogClient::kMajorityWriteConcern); - }, - [&]() { - auto& shardingPendingActions = - _pendingActionsByShards[dataSizeAction.shardId]; - shardingPendingActions.rangesWithoutDataSize.emplace_back( - dataSizeAction.chunkRange); - }, - [&]() { _abort(getType()); }); - }, - [&](const AutoSplitVectorInfo& _) { - uasserted(ErrorCodes::BadValue, "Unexpected action type"); - }, - [&](const SplitInfoWithKeyPattern& _) { - uasserted(ErrorCodes::BadValue, "Unexpected action type"); - }, - [&](const MigrateInfo& _) { - uasserted(ErrorCodes::BadValue, "Unexpected action type"); - }}, + OverloadedVisitor{ + [&](const MergeInfo& mergeAction) { + auto& mergeResponse = stdx::get<Status>(response); + auto& shardingPendingActions = _pendingActionsByShards[mergeAction.shardId]; + handleActionResult( + opCtx, + _nss, + _uuid, + getType(), + mergeResponse, + [&]() { + shardingPendingActions.rangesWithoutDataSize.emplace_back( + mergeAction.chunkRange); + }, + [&]() { + shardingPendingActions.rangesToMerge.emplace_back( + mergeAction.chunkRange); + }, + [&]() { _abort(getType()); }); + }, + [&](const DataSizeInfo& dataSizeAction) { + auto& dataSizeResponse = stdx::get<StatusWith<DataSizeResponse>>(response); + handleActionResult( + opCtx, + _nss, + _uuid, + getType(), + dataSizeResponse.getStatus(), + [&]() { + ChunkType chunk(dataSizeAction.uuid, + dataSizeAction.chunkRange, + dataSizeAction.version.placementVersion(), + dataSizeAction.shardId); + auto catalogManager = ShardingCatalogManager::get(opCtx); + // Max out the chunk size if it has has been estimated as bigger + // than _smallChunkSizeThresholdBytes; this will exlude the + // chunk from the list of candidates considered by + // MoveAndMergeChunksPhase + auto estimatedSize = dataSizeResponse.getValue().maxSizeReached + ? std::numeric_limits<int64_t>::max() + : dataSizeResponse.getValue().sizeBytes; + catalogManager->setChunkEstimatedSize( + opCtx, + chunk, + estimatedSize, + ShardingCatalogClient::kMajorityWriteConcern); + }, + [&]() { + auto& shardingPendingActions = + _pendingActionsByShards[dataSizeAction.shardId]; + shardingPendingActions.rangesWithoutDataSize.emplace_back( + dataSizeAction.chunkRange); + }, + [&]() { _abort(getType()); }); + }, + [&](const AutoSplitVectorInfo& _) { + uasserted(ErrorCodes::BadValue, "Unexpected action type"); + }, + [&](const SplitInfoWithKeyPattern& _) { + uasserted(ErrorCodes::BadValue, "Unexpected action type"); + }, + [&](const MigrateInfo& _) { + uasserted(ErrorCodes::BadValue, "Unexpected action type"); + }}, action); } @@ -357,10 +370,12 @@ private: const NamespaceString& nss, const UUID& uuid, const BSONObj& shardKey, + const int64_t smallChunkSizeThresholdBytes, stdx::unordered_map<ShardId, PendingActions>&& pendingActionsByShards) : _nss(nss), _uuid(uuid), _shardKey(shardKey), + _smallChunkSizeThresholdBytes(smallChunkSizeThresholdBytes), _pendingActionsByShards(std::move(pendingActionsByShards)) {} void _abort(const DefragmentationPhaseEnum nextPhase) { @@ -372,6 +387,7 @@ private: const NamespaceString _nss; const UUID _uuid; const BSONObj _shardKey; + const int64_t _smallChunkSizeThresholdBytes; stdx::unordered_map<ShardId, PendingActions> _pendingActionsByShards; boost::optional<ShardId> _shardToProcess; size_t _outstandingActions{0}; @@ -765,8 +781,6 @@ private: bool _isChunkToMergeLeftSibling; }; - static constexpr uint64_t kSmallChunkSizeThresholdPctg = 25; - const NamespaceString _nss; const UUID _uuid; diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h index 06300c67809..dfa284aca13 100644 --- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h +++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h @@ -64,6 +64,9 @@ public: virtual bool isComplete() const = 0; virtual void userAbort() = 0; + +protected: + static constexpr uint64_t kSmallChunkSizeThresholdPctg = 25; }; class BalancerDefragmentationPolicyImpl : public BalancerDefragmentationPolicy { diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp b/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp index 389066255a9..0ffb8b256d2 100644 --- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp +++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp @@ -264,7 +264,7 @@ TEST_F(BalancerDefragmentationPolicyTest, ASSERT_TRUE(nextAction.has_value()); DataSizeInfo dataSizeAction = stdx::get<DataSizeInfo>(*nextAction); - auto resp = StatusWith(DataSizeResponse(2000, 4)); + auto resp = StatusWith(DataSizeResponse(2000, 4, false)); _defragmentationPolicy.applyActionResult(operationContext(), dataSizeAction, resp); // 1. The outcome of the data size has been stored in the expected document... @@ -281,6 +281,33 @@ TEST_F(BalancerDefragmentationPolicyTest, verifyExpectedDefragmentationPhaseOndisk(DefragmentationPhaseEnum::kMoveAndMergeChunks); } +TEST_F(BalancerDefragmentationPolicyTest, + TestPhaseOneDataSizeResponsesWithMaxSizeReachedCausesChunkToBeSkippedByPhaseTwo) { + auto coll = setupCollectionWithPhase({makeConfigChunkEntry()}); + setDefaultClusterStats(); + _defragmentationPolicy.startCollectionDefragmentation(operationContext(), coll); + auto nextAction = _defragmentationPolicy.getNextStreamingAction(operationContext()); + ASSERT_TRUE(nextAction.has_value()); + DataSizeInfo dataSizeAction = stdx::get<DataSizeInfo>(*nextAction); + + auto resp = StatusWith(DataSizeResponse(2000, 4, true)); + _defragmentationPolicy.applyActionResult(operationContext(), dataSizeAction, resp); + + // 1. The outcome of the data size has been stored in the expected document... + auto chunkQuery = BSON(ChunkType::collectionUUID() + << kUuid << ChunkType::min(kKeyAtMin) << ChunkType::max(kKeyAtMax)); + auto configChunkDoc = + findOneOnConfigCollection(operationContext(), ChunkType::ConfigNS, chunkQuery).getValue(); + ASSERT_EQ(configChunkDoc.getField("estimatedDataSizeBytes").safeNumberLong(), + std::numeric_limits<int64_t>::max()); + + // No new action is expected - and the algorithm should converge + nextAction = _defragmentationPolicy.getNextStreamingAction(operationContext()); + ASSERT_TRUE(nextAction == boost::none); + ASSERT_FALSE(_defragmentationPolicy.isDefragmentingCollection(coll.getUuid())); + verifyExpectedDefragmentationPhaseOndisk(boost::none); +} + TEST_F(BalancerDefragmentationPolicyTest, TestRetriableFailedDataSizeActionGetsReissued) { auto coll = setupCollectionWithPhase({makeConfigChunkEntry()}); _defragmentationPolicy.startCollectionDefragmentation(operationContext(), coll); @@ -315,7 +342,7 @@ TEST_F(BalancerDefragmentationPolicyTest, TestRemoveCollectionEndsDefragmentatio auto nextAction = _defragmentationPolicy.getNextStreamingAction(operationContext()); DataSizeInfo dataSizeAction = stdx::get<DataSizeInfo>(*nextAction); - auto resp = StatusWith(DataSizeResponse(2000, 4)); + auto resp = StatusWith(DataSizeResponse(2000, 4, false)); _defragmentationPolicy.applyActionResult(operationContext(), dataSizeAction, resp); // Remove collection entry from config.collections diff --git a/src/mongo/db/s/balancer/balancer_policy.cpp b/src/mongo/db/s/balancer/balancer_policy.cpp index 71d42629e52..7677a0e0d23 100644 --- a/src/mongo/db/s/balancer/balancer_policy.cpp +++ b/src/mongo/db/s/balancer/balancer_policy.cpp @@ -976,13 +976,15 @@ DataSizeInfo::DataSizeInfo(const ShardId& shardId, const ChunkRange& chunkRange, const ShardVersion& version, const KeyPattern& keyPattern, - bool estimatedValue) + bool estimatedValue, + int64_t maxSize) : shardId(shardId), nss(nss), uuid(uuid), chunkRange(chunkRange), version(version), keyPattern(keyPattern), - estimatedValue(estimatedValue) {} + estimatedValue(estimatedValue), + maxSize(maxSize) {} } // namespace mongo diff --git a/src/mongo/db/s/balancer/balancer_policy.h b/src/mongo/db/s/balancer/balancer_policy.h index 8436a2a77b8..706d2cdb351 100644 --- a/src/mongo/db/s/balancer/balancer_policy.h +++ b/src/mongo/db/s/balancer/balancer_policy.h @@ -187,7 +187,8 @@ struct DataSizeInfo { const ChunkRange& chunkRange, const ShardVersion& version, const KeyPattern& keyPattern, - bool estimatedValue); + bool estimatedValue, + int64_t maxSize); ShardId shardId; NamespaceString nss; @@ -198,14 +199,16 @@ struct DataSizeInfo { ShardVersion version; KeyPattern keyPattern; bool estimatedValue; + int64_t maxSize; }; struct DataSizeResponse { - DataSizeResponse(long long sizeBytes, long long numObjects) - : sizeBytes(sizeBytes), numObjects(numObjects) {} + DataSizeResponse(long long sizeBytes, long long numObjects, bool maxSizeReached) + : sizeBytes(sizeBytes), numObjects(numObjects), maxSizeReached(maxSizeReached) {} long long sizeBytes; long long numObjects; + bool maxSizeReached; }; typedef stdx:: |