summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPaolo Polato <paolo.polato@mongodb.com>2022-12-28 17:28:30 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-12-28 17:58:19 +0000
commitd0d9e95c07df276f4fa3cef07982803d689da5c7 (patch)
tree8a8d616e4289bb787ecb16ad6b88715458e972fc /src
parent73b7a22328c766e1effbc28ecb02228dba4e1cf8 (diff)
downloadmongo-d0d9e95c07df276f4fa3cef07982803d689da5c7.tar.gz
SERVER-71771 Defragmenter should cap chunk size estimation
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/s/balancer/balancer.cpp3
-rw-r--r--src/mongo/db/s/balancer/balancer_commands_scheduler.h3
-rw-r--r--src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp8
-rw-r--r--src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h10
-rw-r--r--src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp3
-rw-r--r--src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp150
-rw-r--r--src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h3
-rw-r--r--src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp31
-rw-r--r--src/mongo/db/s/balancer/balancer_policy.cpp6
-rw-r--r--src/mongo/db/s/balancer/balancer_policy.h9
10 files changed, 144 insertions, 82 deletions
diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp
index d7e3839b533..7f37b3a38ae 100644
--- a/src/mongo/db/s/balancer/balancer.cpp
+++ b/src/mongo/db/s/balancer/balancer.cpp
@@ -586,7 +586,8 @@ void Balancer::_consumeActionStreamLoop() {
dataSizeAction.chunkRange,
dataSizeAction.version,
dataSizeAction.keyPattern,
- dataSizeAction.estimatedValue)
+ dataSizeAction.estimatedValue,
+ dataSizeAction.maxSize)
.thenRunOn(*executor)
.onCompletion([this,
selectedStream,
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler.h b/src/mongo/db/s/balancer/balancer_commands_scheduler.h
index 2c75d93e357..d04126c2270 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler.h
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler.h
@@ -147,7 +147,8 @@ public:
const ChunkRange& chunkRange,
const ShardVersion& version,
const KeyPattern& keyPattern,
- bool estimatedValue) = 0;
+ bool estimatedValue,
+ int64_t maxSize) = 0;
virtual SemiFuture<void> requestMoveRange(OperationContext* opCtx,
const ShardsvrMoveRange& request,
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
index 2e32bdd6808..d6ca29d85e6 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
@@ -184,6 +184,7 @@ const std::string DataSizeCommandInfo::kKeyPattern = "keyPattern";
const std::string DataSizeCommandInfo::kMinValue = "min";
const std::string DataSizeCommandInfo::kMaxValue = "max";
const std::string DataSizeCommandInfo::kEstimatedValue = "estimate";
+const std::string DataSizeCommandInfo::kMaxSizeValue = "maxSize";
const std::string SplitChunkCommandInfo::kCommandName = "splitChunk";
const std::string SplitChunkCommandInfo::kShardName = "from";
@@ -359,13 +360,15 @@ SemiFuture<DataSizeResponse> BalancerCommandsSchedulerImpl::requestDataSize(
const ChunkRange& chunkRange,
const ShardVersion& version,
const KeyPattern& keyPattern,
- bool estimatedValue) {
+ bool estimatedValue,
+ int64_t maxSize) {
auto commandInfo = std::make_shared<DataSizeCommandInfo>(nss,
shardId,
keyPattern.toBSON(),
chunkRange.getMin(),
chunkRange.getMax(),
estimatedValue,
+ maxSize,
version);
return _buildAndEnqueueNewRequest(opCtx, std::move(commandInfo))
@@ -377,7 +380,8 @@ SemiFuture<DataSizeResponse> BalancerCommandsSchedulerImpl::requestDataSize(
}
long long sizeBytes = remoteResponse.data["size"].number();
long long numObjects = remoteResponse.data["numObjects"].number();
- return DataSizeResponse(sizeBytes, numObjects);
+ bool maxSizeReached = remoteResponse.data["maxReached"].trueValue();
+ return DataSizeResponse(sizeBytes, numObjects, maxSizeReached);
})
.semi();
}
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h
index 0d36e2d5959..f68d8d4bbd2 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h
@@ -337,12 +337,14 @@ public:
const BSONObj& lowerBoundKey,
const BSONObj& upperBoundKey,
bool estimatedValue,
+ int64_t maxSize,
const ShardVersion& version)
: CommandInfo(shardId, nss, boost::none),
_shardKeyPattern(shardKeyPattern),
_lowerBoundKey(lowerBoundKey),
_upperBoundKey(upperBoundKey),
_estimatedValue(estimatedValue),
+ _maxSize(maxSize),
_version(version) {}
BSONObj serialise() const override {
@@ -351,7 +353,8 @@ public:
.append(kKeyPattern, _shardKeyPattern)
.append(kMinValue, _lowerBoundKey)
.append(kMaxValue, _upperBoundKey)
- .append(kEstimatedValue, _estimatedValue);
+ .append(kEstimatedValue, _estimatedValue)
+ .append(kMaxSizeValue, _maxSize);
_version.serialize(ShardVersion::kShardVersionField, &commandBuilder);
@@ -363,6 +366,7 @@ private:
BSONObj _lowerBoundKey;
BSONObj _upperBoundKey;
bool _estimatedValue;
+ int64_t _maxSize;
ShardVersion _version;
static const std::string kCommandName;
@@ -370,6 +374,7 @@ private:
static const std::string kMinValue;
static const std::string kMaxValue;
static const std::string kEstimatedValue;
+ static const std::string kMaxSizeValue;
};
class SplitChunkCommandInfo : public CommandInfo {
@@ -579,7 +584,8 @@ public:
const ChunkRange& chunkRange,
const ShardVersion& version,
const KeyPattern& keyPattern,
- bool estimatedValue) override;
+ bool estimatedValue,
+ int64_t maxSize) override;
private:
enum class SchedulerState { Recovering, Running, Stopping, Stopped };
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
index b9c5827d83a..2c096547669 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
@@ -308,7 +308,8 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulRequestChunkDataSizeCommand) {
chunk.getRange(),
ShardVersion(chunk.getVersion(), boost::optional<CollectionIndexes>(boost::none)),
KeyPattern(BSON("x" << 1)),
- false /* issuedByRemoteUser */);
+ false /* estimatedValue */,
+ (kDefaultMaxChunkSizeBytes / 100) * 25 /* maxSize */);
auto swReceivedDataSize = futureResponse.getNoThrow();
ASSERT_OK(swReceivedDataSize.getStatus());
auto receivedDataSize = swReceivedDataSize.getValue();
diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp
index c7d43d723df..f8e0372814a 100644
--- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp
+++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp
@@ -169,6 +169,11 @@ public:
auto collectionChunks = getCollectionChunks(opCtx, coll);
const auto collectionZones = getCollectionZones(opCtx, coll);
+ // Calculate small chunk threshold to limit dataSize commands
+ const auto maxChunkSizeBytes = getCollectionMaxChunkSizeBytes(opCtx, coll);
+ const int64_t smallChunkSizeThreshold =
+ (maxChunkSizeBytes / 100) * kSmallChunkSizeThresholdPctg;
+
stdx::unordered_map<ShardId, PendingActions> pendingActionsByShards;
// Find ranges of chunks; for single-chunk ranges, request DataSize; for multi-range, issue
// merge
@@ -195,6 +200,7 @@ public:
new MergeAndMeasureChunksPhase(coll.getNss(),
coll.getUuid(),
coll.getKeyPattern().toBSON(),
+ smallChunkSizeThreshold,
std::move(pendingActionsByShards)));
}
@@ -220,14 +226,15 @@ public:
if (pendingActions.rangesWithoutDataSize.size() > pendingActions.rangesToMerge.size()) {
const auto& rangeToMeasure = pendingActions.rangesWithoutDataSize.back();
- nextAction =
- boost::optional<DefragmentationAction>(DataSizeInfo(shardId,
- _nss,
- _uuid,
- rangeToMeasure,
- shardVersion,
- _shardKey,
- true /* estimate */));
+ nextAction = boost::optional<DefragmentationAction>(
+ DataSizeInfo(shardId,
+ _nss,
+ _uuid,
+ rangeToMeasure,
+ shardVersion,
+ _shardKey,
+ true /* estimate */,
+ _smallChunkSizeThresholdBytes /* maxSize */));
pendingActions.rangesWithoutDataSize.pop_back();
} else if (!pendingActions.rangesToMerge.empty()) {
const auto& rangeToMerge = pendingActions.rangesToMerge.back();
@@ -266,64 +273,70 @@ public:
return;
}
stdx::visit(
- OverloadedVisitor{[&](const MergeInfo& mergeAction) {
- auto& mergeResponse = stdx::get<Status>(response);
- auto& shardingPendingActions =
- _pendingActionsByShards[mergeAction.shardId];
- handleActionResult(
- opCtx,
- _nss,
- _uuid,
- getType(),
- mergeResponse,
- [&]() {
- shardingPendingActions.rangesWithoutDataSize.emplace_back(
- mergeAction.chunkRange);
- },
- [&]() {
- shardingPendingActions.rangesToMerge.emplace_back(
- mergeAction.chunkRange);
- },
- [&]() { _abort(getType()); });
- },
- [&](const DataSizeInfo& dataSizeAction) {
- auto& dataSizeResponse =
- stdx::get<StatusWith<DataSizeResponse>>(response);
- handleActionResult(
- opCtx,
- _nss,
- _uuid,
- getType(),
- dataSizeResponse.getStatus(),
- [&]() {
- ChunkType chunk(dataSizeAction.uuid,
- dataSizeAction.chunkRange,
- dataSizeAction.version.placementVersion(),
- dataSizeAction.shardId);
- auto catalogManager = ShardingCatalogManager::get(opCtx);
- catalogManager->setChunkEstimatedSize(
- opCtx,
- chunk,
- dataSizeResponse.getValue().sizeBytes,
- ShardingCatalogClient::kMajorityWriteConcern);
- },
- [&]() {
- auto& shardingPendingActions =
- _pendingActionsByShards[dataSizeAction.shardId];
- shardingPendingActions.rangesWithoutDataSize.emplace_back(
- dataSizeAction.chunkRange);
- },
- [&]() { _abort(getType()); });
- },
- [&](const AutoSplitVectorInfo& _) {
- uasserted(ErrorCodes::BadValue, "Unexpected action type");
- },
- [&](const SplitInfoWithKeyPattern& _) {
- uasserted(ErrorCodes::BadValue, "Unexpected action type");
- },
- [&](const MigrateInfo& _) {
- uasserted(ErrorCodes::BadValue, "Unexpected action type");
- }},
+ OverloadedVisitor{
+ [&](const MergeInfo& mergeAction) {
+ auto& mergeResponse = stdx::get<Status>(response);
+ auto& shardingPendingActions = _pendingActionsByShards[mergeAction.shardId];
+ handleActionResult(
+ opCtx,
+ _nss,
+ _uuid,
+ getType(),
+ mergeResponse,
+ [&]() {
+ shardingPendingActions.rangesWithoutDataSize.emplace_back(
+ mergeAction.chunkRange);
+ },
+ [&]() {
+ shardingPendingActions.rangesToMerge.emplace_back(
+ mergeAction.chunkRange);
+ },
+ [&]() { _abort(getType()); });
+ },
+ [&](const DataSizeInfo& dataSizeAction) {
+ auto& dataSizeResponse = stdx::get<StatusWith<DataSizeResponse>>(response);
+ handleActionResult(
+ opCtx,
+ _nss,
+ _uuid,
+ getType(),
+ dataSizeResponse.getStatus(),
+ [&]() {
+ ChunkType chunk(dataSizeAction.uuid,
+ dataSizeAction.chunkRange,
+ dataSizeAction.version.placementVersion(),
+ dataSizeAction.shardId);
+ auto catalogManager = ShardingCatalogManager::get(opCtx);
+                               // Max out the chunk size if it has been estimated as bigger
+                               // than _smallChunkSizeThresholdBytes; this will exclude the
+                               // chunk from the list of candidates considered by
+                               // MoveAndMergeChunksPhase
+ auto estimatedSize = dataSizeResponse.getValue().maxSizeReached
+ ? std::numeric_limits<int64_t>::max()
+ : dataSizeResponse.getValue().sizeBytes;
+ catalogManager->setChunkEstimatedSize(
+ opCtx,
+ chunk,
+ estimatedSize,
+ ShardingCatalogClient::kMajorityWriteConcern);
+ },
+ [&]() {
+ auto& shardingPendingActions =
+ _pendingActionsByShards[dataSizeAction.shardId];
+ shardingPendingActions.rangesWithoutDataSize.emplace_back(
+ dataSizeAction.chunkRange);
+ },
+ [&]() { _abort(getType()); });
+ },
+ [&](const AutoSplitVectorInfo& _) {
+ uasserted(ErrorCodes::BadValue, "Unexpected action type");
+ },
+ [&](const SplitInfoWithKeyPattern& _) {
+ uasserted(ErrorCodes::BadValue, "Unexpected action type");
+ },
+ [&](const MigrateInfo& _) {
+ uasserted(ErrorCodes::BadValue, "Unexpected action type");
+ }},
action);
}
@@ -357,10 +370,12 @@ private:
const NamespaceString& nss,
const UUID& uuid,
const BSONObj& shardKey,
+ const int64_t smallChunkSizeThresholdBytes,
stdx::unordered_map<ShardId, PendingActions>&& pendingActionsByShards)
: _nss(nss),
_uuid(uuid),
_shardKey(shardKey),
+ _smallChunkSizeThresholdBytes(smallChunkSizeThresholdBytes),
_pendingActionsByShards(std::move(pendingActionsByShards)) {}
void _abort(const DefragmentationPhaseEnum nextPhase) {
@@ -372,6 +387,7 @@ private:
const NamespaceString _nss;
const UUID _uuid;
const BSONObj _shardKey;
+ const int64_t _smallChunkSizeThresholdBytes;
stdx::unordered_map<ShardId, PendingActions> _pendingActionsByShards;
boost::optional<ShardId> _shardToProcess;
size_t _outstandingActions{0};
@@ -765,8 +781,6 @@ private:
bool _isChunkToMergeLeftSibling;
};
- static constexpr uint64_t kSmallChunkSizeThresholdPctg = 25;
-
const NamespaceString _nss;
const UUID _uuid;
diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h
index 06300c67809..dfa284aca13 100644
--- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h
+++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h
@@ -64,6 +64,9 @@ public:
virtual bool isComplete() const = 0;
virtual void userAbort() = 0;
+
+protected:
+ static constexpr uint64_t kSmallChunkSizeThresholdPctg = 25;
};
class BalancerDefragmentationPolicyImpl : public BalancerDefragmentationPolicy {
diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp b/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp
index 389066255a9..0ffb8b256d2 100644
--- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp
+++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp
@@ -264,7 +264,7 @@ TEST_F(BalancerDefragmentationPolicyTest,
ASSERT_TRUE(nextAction.has_value());
DataSizeInfo dataSizeAction = stdx::get<DataSizeInfo>(*nextAction);
- auto resp = StatusWith(DataSizeResponse(2000, 4));
+ auto resp = StatusWith(DataSizeResponse(2000, 4, false));
_defragmentationPolicy.applyActionResult(operationContext(), dataSizeAction, resp);
// 1. The outcome of the data size has been stored in the expected document...
@@ -281,6 +281,33 @@ TEST_F(BalancerDefragmentationPolicyTest,
verifyExpectedDefragmentationPhaseOndisk(DefragmentationPhaseEnum::kMoveAndMergeChunks);
}
+TEST_F(BalancerDefragmentationPolicyTest,
+ TestPhaseOneDataSizeResponsesWithMaxSizeReachedCausesChunkToBeSkippedByPhaseTwo) {
+ auto coll = setupCollectionWithPhase({makeConfigChunkEntry()});
+ setDefaultClusterStats();
+ _defragmentationPolicy.startCollectionDefragmentation(operationContext(), coll);
+ auto nextAction = _defragmentationPolicy.getNextStreamingAction(operationContext());
+ ASSERT_TRUE(nextAction.has_value());
+ DataSizeInfo dataSizeAction = stdx::get<DataSizeInfo>(*nextAction);
+
+ auto resp = StatusWith(DataSizeResponse(2000, 4, true));
+ _defragmentationPolicy.applyActionResult(operationContext(), dataSizeAction, resp);
+
+ // 1. The outcome of the data size has been stored in the expected document...
+ auto chunkQuery = BSON(ChunkType::collectionUUID()
+ << kUuid << ChunkType::min(kKeyAtMin) << ChunkType::max(kKeyAtMax));
+ auto configChunkDoc =
+ findOneOnConfigCollection(operationContext(), ChunkType::ConfigNS, chunkQuery).getValue();
+ ASSERT_EQ(configChunkDoc.getField("estimatedDataSizeBytes").safeNumberLong(),
+ std::numeric_limits<int64_t>::max());
+
+ // No new action is expected - and the algorithm should converge
+ nextAction = _defragmentationPolicy.getNextStreamingAction(operationContext());
+ ASSERT_TRUE(nextAction == boost::none);
+ ASSERT_FALSE(_defragmentationPolicy.isDefragmentingCollection(coll.getUuid()));
+ verifyExpectedDefragmentationPhaseOndisk(boost::none);
+}
+
TEST_F(BalancerDefragmentationPolicyTest, TestRetriableFailedDataSizeActionGetsReissued) {
auto coll = setupCollectionWithPhase({makeConfigChunkEntry()});
_defragmentationPolicy.startCollectionDefragmentation(operationContext(), coll);
@@ -315,7 +342,7 @@ TEST_F(BalancerDefragmentationPolicyTest, TestRemoveCollectionEndsDefragmentatio
auto nextAction = _defragmentationPolicy.getNextStreamingAction(operationContext());
DataSizeInfo dataSizeAction = stdx::get<DataSizeInfo>(*nextAction);
- auto resp = StatusWith(DataSizeResponse(2000, 4));
+ auto resp = StatusWith(DataSizeResponse(2000, 4, false));
_defragmentationPolicy.applyActionResult(operationContext(), dataSizeAction, resp);
// Remove collection entry from config.collections
diff --git a/src/mongo/db/s/balancer/balancer_policy.cpp b/src/mongo/db/s/balancer/balancer_policy.cpp
index 71d42629e52..7677a0e0d23 100644
--- a/src/mongo/db/s/balancer/balancer_policy.cpp
+++ b/src/mongo/db/s/balancer/balancer_policy.cpp
@@ -976,13 +976,15 @@ DataSizeInfo::DataSizeInfo(const ShardId& shardId,
const ChunkRange& chunkRange,
const ShardVersion& version,
const KeyPattern& keyPattern,
- bool estimatedValue)
+ bool estimatedValue,
+ int64_t maxSize)
: shardId(shardId),
nss(nss),
uuid(uuid),
chunkRange(chunkRange),
version(version),
keyPattern(keyPattern),
- estimatedValue(estimatedValue) {}
+ estimatedValue(estimatedValue),
+ maxSize(maxSize) {}
} // namespace mongo
diff --git a/src/mongo/db/s/balancer/balancer_policy.h b/src/mongo/db/s/balancer/balancer_policy.h
index 8436a2a77b8..706d2cdb351 100644
--- a/src/mongo/db/s/balancer/balancer_policy.h
+++ b/src/mongo/db/s/balancer/balancer_policy.h
@@ -187,7 +187,8 @@ struct DataSizeInfo {
const ChunkRange& chunkRange,
const ShardVersion& version,
const KeyPattern& keyPattern,
- bool estimatedValue);
+ bool estimatedValue,
+ int64_t maxSize);
ShardId shardId;
NamespaceString nss;
@@ -198,14 +199,16 @@ struct DataSizeInfo {
ShardVersion version;
KeyPattern keyPattern;
bool estimatedValue;
+ int64_t maxSize;
};
struct DataSizeResponse {
- DataSizeResponse(long long sizeBytes, long long numObjects)
- : sizeBytes(sizeBytes), numObjects(numObjects) {}
+ DataSizeResponse(long long sizeBytes, long long numObjects, bool maxSizeReached)
+ : sizeBytes(sizeBytes), numObjects(numObjects), maxSizeReached(maxSizeReached) {}
long long sizeBytes;
long long numObjects;
+ bool maxSizeReached;
};
typedef stdx::