author     Silvia Surroca <silvia.surroca@mongodb.com>        2022-11-25 14:23:29 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>   2022-11-25 14:58:37 +0000
commit     a71e6f82149e19001ac75af9cb8bcbd3b35219ac (patch)
tree       cfa24f0e565b8005b6499b2d1969e1ad97e2b3cb
parent     3568ba71bb3d57ce5176b44206ad66b05d29e407 (diff)
SERVER-70973 Balancer should stop iterating collections when there are no more available shards
-rw-r--r--  src/mongo/db/s/balancer/balancer.cpp                              | 19
-rw-r--r--  src/mongo/db/s/balancer/balancer_chunk_selection_policy.h         |  5
-rw-r--r--  src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp  | 39
-rw-r--r--  src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.h    |  6
-rw-r--r--  src/mongo/db/s/balancer/balancer_chunk_selection_policy_test.cpp  | 29
-rw-r--r--  src/mongo/db/s/balancer/balancer_defragmentation_policy.h         |  4
-rw-r--r--  src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp  | 31
-rw-r--r--  src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h    |  4
-rw-r--r--  src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp  | 53
-rw-r--r--  src/mongo/db/s/balancer/balancer_policy.cpp                       | 76
-rw-r--r--  src/mongo/db/s/balancer/balancer_policy.h                         | 20
-rw-r--r--  src/mongo/db/s/balancer/balancer_policy_test.cpp                  | 33
12 files changed, 211 insertions(+), 108 deletions(-)
diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp
index fdb36b34bdf..7432d038afc 100644
--- a/src/mongo/db/s/balancer/balancer.cpp
+++ b/src/mongo/db/s/balancer/balancer.cpp
@@ -775,13 +775,24 @@ void Balancer::_mainThread() {
LOGV2_DEBUG(21861, 1, "Done enforcing tag range boundaries.");
}
- stdx::unordered_set<ShardId> usedShards;
+ const std::vector<ClusterStatistics::ShardStatistics> shardStats =
+ uassertStatusOK(_clusterStats->getStats(opCtx.get()));
+
+ stdx::unordered_set<ShardId> availableShards;
+ std::transform(
+ shardStats.begin(),
+ shardStats.end(),
+ std::inserter(availableShards, availableShards.end()),
+ [](const ClusterStatistics::ShardStatistics& shardStatistics) -> ShardId {
+ return shardStatistics.shardId;
+ });
const auto chunksToDefragment =
- _defragmentationPolicy->selectChunksToMove(opCtx.get(), &usedShards);
+ _defragmentationPolicy->selectChunksToMove(opCtx.get(), &availableShards);
- const auto chunksToRebalance = uassertStatusOK(
- _chunkSelectionPolicy->selectChunksToMove(opCtx.get(), &usedShards));
+ const auto chunksToRebalance =
+ uassertStatusOK(_chunkSelectionPolicy->selectChunksToMove(
+ opCtx.get(), shardStats, &availableShards));
if (chunksToRebalance.empty() && chunksToDefragment.empty()) {
LOGV2_DEBUG(21862, 1, "No need to move any chunk");
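
The balancer's main loop now fetches the cluster statistics once and seeds a set of available shards from them; the defragmentation and chunk-selection policies then erase shards from that set as they reserve donors and recipients. A minimal standalone sketch of the set-building idiom follows (it uses plain std::string ids and a stripped-down ShardStatistics stand-in rather than the real mongo types, so it is illustrative only):

#include <algorithm>
#include <iterator>
#include <string>
#include <unordered_set>
#include <vector>

// Stand-in for ClusterStatistics::ShardStatistics; only the id matters here.
struct ShardStatistics {
    std::string shardId;
};

// Build the set of shards that are still free to participate in a migration.
std::unordered_set<std::string> buildAvailableShards(
    const std::vector<ShardStatistics>& shardStats) {
    std::unordered_set<std::string> availableShards;
    std::transform(shardStats.begin(),
                   shardStats.end(),
                   std::inserter(availableShards, availableShards.end()),
                   [](const ShardStatistics& stat) { return stat.shardId; });
    return availableShards;
}
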
diff --git a/src/mongo/db/s/balancer/balancer_chunk_selection_policy.h b/src/mongo/db/s/balancer/balancer_chunk_selection_policy.h
index 5aed229c202..0c00c3c5730 100644
--- a/src/mongo/db/s/balancer/balancer_chunk_selection_policy.h
+++ b/src/mongo/db/s/balancer/balancer_chunk_selection_policy.h
@@ -74,7 +74,10 @@ public:
* Potentially blocking method, which gives out a set of chunks to be moved.
*/
virtual StatusWith<MigrateInfoVector> selectChunksToMove(
- OperationContext* opCtx, stdx::unordered_set<ShardId>* unavailableShards) = 0;
+ OperationContext* opCtx,
+ const std::vector<ClusterStatistics::ShardStatistics>& shardStats,
+ stdx::unordered_set<ShardId>* availableShards) = 0;
+
/**
* Given a valid namespace returns all the Migrations the balancer would need to perform
diff --git a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp
index b659001db31..bd1c1ddd763 100644
--- a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp
+++ b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp
@@ -403,15 +403,11 @@ StatusWith<SplitInfoVector> BalancerChunkSelectionPolicyImpl::selectChunksToSpli
}
StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::selectChunksToMove(
- OperationContext* opCtx, stdx::unordered_set<ShardId>* usedShards) {
- auto shardStatsStatus = _clusterStats->getStats(opCtx);
- if (!shardStatsStatus.isOK()) {
- return shardStatsStatus.getStatus();
- }
-
- const auto& shardStats = shardStatsStatus.getValue();
+ OperationContext* opCtx,
+ const std::vector<ClusterStatistics::ShardStatistics>& shardStats,
+ stdx::unordered_set<ShardId>* availableShards) {
- if (shardStats.size() < 2) {
+ if (availableShards->size() < 2) {
return MigrateInfoVector{};
}
@@ -428,6 +424,11 @@ StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::selectChunksToMo
std::vector<CollectionType> collBatch;
for (auto collIt = collections.begin(); collIt != collections.end();) {
+
+ if (availableShards->size() < 2) {
+ break;
+ }
+
const auto& coll = *(collIt++);
if (!coll.getAllowBalance() || !coll.getAllowMigrations() || !coll.getPermitMigrations() ||
coll.getDefragmentCollection()) {
@@ -456,6 +457,11 @@ StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::selectChunksToMo
}
for (const auto& collFromBatch : collBatch) {
+
+ if (availableShards->size() < 2) {
+ break;
+ }
+
const auto& nss = collFromBatch.getNss();
boost::optional<CollectionDataSizeInfoForBalancing> optDataSizeInfo;
@@ -464,7 +470,7 @@ StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::selectChunksToMo
}
auto candidatesStatus = _getMigrateCandidatesForCollection(
- opCtx, nss, shardStats, optDataSizeInfo, usedShards);
+ opCtx, nss, shardStats, optDataSizeInfo, availableShards);
if (candidatesStatus == ErrorCodes::NamespaceNotFound) {
// Namespace got dropped before we managed to get to it, so just skip it
continue;
@@ -501,7 +507,14 @@ StatusWith<MigrateInfosWithReason> BalancerChunkSelectionPolicyImpl::selectChunk
// doesn't.
Grid::get(opCtx)->catalogClient()->getCollection(opCtx, nss);
- stdx::unordered_set<ShardId> usedShards;
+ stdx::unordered_set<ShardId> availableShards;
+ std::transform(shardStats.begin(),
+ shardStats.end(),
+ std::inserter(availableShards, availableShards.end()),
+ [](const ClusterStatistics::ShardStatistics& shardStatistics) -> ShardId {
+ return shardStatistics.shardId;
+ });
+
boost::optional<CollectionDataSizeInfoForBalancing> optCollDataSizeInfo;
if (feature_flags::gBalanceAccordingToDataSize.isEnabled(
@@ -510,7 +523,7 @@ StatusWith<MigrateInfosWithReason> BalancerChunkSelectionPolicyImpl::selectChunk
}
auto candidatesStatus = _getMigrateCandidatesForCollection(
- opCtx, nss, shardStats, optCollDataSizeInfo, &usedShards);
+ opCtx, nss, shardStats, optCollDataSizeInfo, &availableShards);
if (!candidatesStatus.isOK()) {
return candidatesStatus.getStatus();
}
@@ -634,7 +647,7 @@ BalancerChunkSelectionPolicyImpl::_getMigrateCandidatesForCollection(
const NamespaceString& nss,
const ShardStatisticsVector& shardStats,
const boost::optional<CollectionDataSizeInfoForBalancing>& collDataSizeInfo,
- stdx::unordered_set<ShardId>* usedShards) {
+ stdx::unordered_set<ShardId>* availableShards) {
auto routingInfoStatus =
Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss);
if (!routingInfoStatus.isOK()) {
@@ -691,7 +704,7 @@ BalancerChunkSelectionPolicyImpl::_getMigrateCandidatesForCollection(
shardStats,
distribution,
collDataSizeInfo,
- usedShards,
+ availableShards,
Grid::get(opCtx)->getBalancerConfiguration()->attemptToBalanceJumboChunks());
}
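
The two added guards implement what the commit title describes: since any migration needs both a donor and a recipient, collection iteration can stop as soon as fewer than two shards remain in availableShards. A hedged sketch of the resulting control flow (Collection, Migration and pickMigrationFor are hypothetical stand-ins, not the real types):

#include <functional>
#include <optional>
#include <string>
#include <unordered_set>
#include <vector>

struct Collection {};
struct Migration {};

std::vector<Migration> selectChunksToMove(
    const std::vector<Collection>& collections,
    std::unordered_set<std::string>* availableShards,
    const std::function<std::optional<Migration>(
        const Collection&, std::unordered_set<std::string>*)>& pickMigrationFor) {
    std::vector<Migration> migrations;
    for (const auto& coll : collections) {
        // A migration needs a donor and a recipient; with fewer than two
        // available shards no further candidate can be produced this round.
        if (availableShards->size() < 2) {
            break;
        }
        if (auto migration = pickMigrationFor(coll, availableShards)) {
            migrations.push_back(std::move(*migration));
        }
    }
    return migrations;
}
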
diff --git a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.h b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.h
index adb3314aa12..41ef536c2d4 100644
--- a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.h
+++ b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.h
@@ -47,7 +47,9 @@ public:
const NamespaceString& ns) override;
StatusWith<MigrateInfoVector> selectChunksToMove(
- OperationContext* opCtx, stdx::unordered_set<ShardId>* usedShards) override;
+ OperationContext* opCtx,
+ const std::vector<ClusterStatistics::ShardStatistics>& shardStats,
+ stdx::unordered_set<ShardId>* availableShards) override;
StatusWith<MigrateInfosWithReason> selectChunksToMove(OperationContext* opCtx,
const NamespaceString& ns) override;
@@ -78,7 +80,7 @@ private:
const NamespaceString& nss,
const ShardStatisticsVector& shardStats,
const boost::optional<CollectionDataSizeInfoForBalancing>& collDataSizeInfo,
- stdx::unordered_set<ShardId>* usedShards);
+ stdx::unordered_set<ShardId>* availableShards);
// Source for obtaining cluster statistics. Not owned and must not be destroyed before the
// policy object is destroyed.
diff --git a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_test.cpp b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_test.cpp
index 8e19a6087e6..93b5af5ae7a 100644
--- a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_test.cpp
+++ b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_test.cpp
@@ -120,6 +120,18 @@ protected:
std::unique_ptr<BalancerChunkSelectionPolicy> _chunkSelectionPolicy;
};
+stdx::unordered_set<ShardId> getAllShardIds(
+ const std::vector<ClusterStatistics::ShardStatistics>& shardStats) {
+ stdx::unordered_set<ShardId> shards;
+ std::transform(shardStats.begin(),
+ shardStats.end(),
+ std::inserter(shards, shards.end()),
+ [](const ClusterStatistics::ShardStatistics& shardStatistics) -> ShardId {
+ return shardStatistics.shardId;
+ });
+ return shards;
+}
+
TEST_F(BalancerChunkSelectionTest, TagRangesOverlap) {
// Set up two shards in the metadata.
ASSERT_OK(catalogClient()->insertConfigDocument(
@@ -212,9 +224,11 @@ TEST_F(BalancerChunkSelectionTest, TagRangeMaxNotAlignedWithChunkMax) {
shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
shardTargeterMock(opCtx.get(), kShardId1)->setFindHostReturnValue(kShardHost1);
- stdx::unordered_set<ShardId> usedShards;
- auto candidateChunksStatus =
- _chunkSelectionPolicy.get()->selectChunksToMove(opCtx.get(), &usedShards);
+ std::vector<ClusterStatistics::ShardStatistics> shardStats =
+ uassertStatusOK(_clusterStats.get()->getStats(opCtx.get()));
+ auto availableShards = getAllShardIds(shardStats);
+ auto candidateChunksStatus = _chunkSelectionPolicy.get()->selectChunksToMove(
+ opCtx.get(), shardStats, &availableShards);
ASSERT_OK(candidateChunksStatus.getStatus());
// The balancer does not bubble up the IllegalOperation error, but it is expected
@@ -321,9 +335,12 @@ TEST_F(BalancerChunkSelectionTest, ShardedTimeseriesCollectionsCanBeBalanced) {
shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
shardTargeterMock(opCtx.get(), kShardId1)->setFindHostReturnValue(kShardHost1);
- stdx::unordered_set<ShardId> usedShards;
- auto candidateChunksStatus =
- _chunkSelectionPolicy.get()->selectChunksToMove(opCtx.get(), &usedShards);
+ std::vector<ClusterStatistics::ShardStatistics> shardStats =
+ uassertStatusOK(_clusterStats.get()->getStats(opCtx.get()));
+ auto availableShards = getAllShardIds(shardStats);
+
+ auto candidateChunksStatus = _chunkSelectionPolicy.get()->selectChunksToMove(
+ opCtx.get(), shardStats, &availableShards);
ASSERT_OK(candidateChunksStatus.getStatus());
ASSERT_EQUALS(1, candidateChunksStatus.getValue().size());
diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy.h b/src/mongo/db/s/balancer/balancer_defragmentation_policy.h
index 3358c6c79df..bfd81c9250f 100644
--- a/src/mongo/db/s/balancer/balancer_defragmentation_policy.h
+++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy.h
@@ -76,11 +76,11 @@ public:
/**
* Pulls the next batch of actionable chunk migration requests, given the current internal state
- * and the passed in list of unavaible shards.
+ * and the passed in list of available shards.
* Every chunk migration request is then expected to be acknowledged by the balancer by issuing
* a call to applyActionResult() (declared in ActionsStreamPolicy)
*/
virtual MigrateInfoVector selectChunksToMove(OperationContext* opCtx,
- stdx::unordered_set<ShardId>* usedShards) = 0;
+ stdx::unordered_set<ShardId>* availableShards) = 0;
};
} // namespace mongo
diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp
index 0f1063d6d8c..952448cd219 100644
--- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp
+++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp
@@ -245,7 +245,7 @@ public:
}
boost::optional<MigrateInfo> popNextMigration(
- OperationContext* opCtx, stdx::unordered_set<ShardId>* usedShards) override {
+ OperationContext* opCtx, stdx::unordered_set<ShardId>* availableShards) override {
return boost::none;
}
@@ -422,9 +422,9 @@ public:
}
boost::optional<MigrateInfo> popNextMigration(
- OperationContext* opCtx, stdx::unordered_set<ShardId>* usedShards) override {
+ OperationContext* opCtx, stdx::unordered_set<ShardId>* availableShards) override {
for (const auto& shardId : _shardProcessingOrder) {
- if (usedShards->count(shardId) != 0) {
+ if (availableShards->count(shardId) == 0) {
// the shard is already busy in a migration
continue;
}
@@ -432,7 +432,7 @@ public:
ChunkRangeInfoIterator nextSmallChunk;
std::list<ChunkRangeInfoIterator> candidateSiblings;
if (!_findNextSmallChunkInShard(
- shardId, *usedShards, &nextSmallChunk, &candidateSiblings)) {
+ shardId, *availableShards, &nextSmallChunk, &candidateSiblings)) {
// there isn't a chunk in this shard that can currently be moved and merged with one
// of its siblings.
continue;
@@ -455,8 +455,8 @@ public:
// ... then build up the migration request, marking the needed resources as busy.
nextSmallChunk->busyInOperation = true;
targetSibling->busyInOperation = true;
- usedShards->insert(nextSmallChunk->shard);
- usedShards->insert(targetSibling->shard);
+ availableShards->erase(nextSmallChunk->shard);
+ availableShards->erase(targetSibling->shard);
auto smallChunkVersion = getShardVersion(opCtx, nextSmallChunk->shard, _nss);
_outstandingMigrations.emplace_back(nextSmallChunk, targetSibling);
return _outstandingMigrations.back().asMigrateInfo(_uuid, _nss, smallChunkVersion);
@@ -880,7 +880,7 @@ private:
// Returns true on success (storing the related info in nextSmallChunk + smallChunkSiblings),
// false otherwise.
bool _findNextSmallChunkInShard(const ShardId& shard,
- const stdx::unordered_set<ShardId>& usedShards,
+ const stdx::unordered_set<ShardId>& availableShards,
ChunkRangeInfoIterator* nextSmallChunk,
std::list<ChunkRangeInfoIterator>* smallChunkSiblings) {
auto matchingShardInfo = _smallChunksByShard.find(shard);
@@ -906,7 +906,7 @@ private:
size_t siblingsDiscardedDueToRangeDeletion = 0;
for (const auto& sibling : candidateSiblings) {
- if (sibling->busyInOperation || usedShards.count(sibling->shard)) {
+ if (sibling->busyInOperation || !availableShards.count(sibling->shard)) {
continue;
}
if ((*candidateIt)->shardsToAvoid.count(sibling->shard)) {
@@ -1080,7 +1080,7 @@ public:
}
boost::optional<MigrateInfo> popNextMigration(
- OperationContext* opCtx, stdx::unordered_set<ShardId>* usedShards) override {
+ OperationContext* opCtx, stdx::unordered_set<ShardId>* availableShards) override {
return boost::none;
}
@@ -1265,7 +1265,7 @@ public:
}
boost::optional<MigrateInfo> popNextMigration(
- OperationContext* opCtx, stdx::unordered_set<ShardId>* usedShards) override {
+ OperationContext* opCtx, stdx::unordered_set<ShardId>* availableShards) override {
return boost::none;
}
@@ -1443,7 +1443,8 @@ BSONObj BalancerDefragmentationPolicyImpl::reportProgressOn(const UUID& uuid) {
}
MigrateInfoVector BalancerDefragmentationPolicyImpl::selectChunksToMove(
- OperationContext* opCtx, stdx::unordered_set<ShardId>* usedShards) {
+ OperationContext* opCtx, stdx::unordered_set<ShardId>* availableShards) {
+
MigrateInfoVector chunksToMove;
{
stdx::lock_guard<Latch> lk(_stateMutex);
@@ -1470,6 +1471,10 @@ MigrateInfoVector BalancerDefragmentationPolicyImpl::selectChunksToMove(
for (auto it = collectionUUIDs.begin(); it != collectionUUIDs.end();) {
const auto& collUUID = *it;
+ if (availableShards->size() == 0) {
+ return chunksToMove;
+ }
+
try {
auto defragStateIt = _defragmentationStates.find(collUUID);
if (defragStateIt == _defragmentationStates.end()) {
@@ -1484,7 +1489,7 @@ MigrateInfoVector BalancerDefragmentationPolicyImpl::selectChunksToMove(
continue;
}
auto actionableMigration =
- collDefragmentationPhase->popNextMigration(opCtx, usedShards);
+ collDefragmentationPhase->popNextMigration(opCtx, availableShards);
if (!actionableMigration.has_value()) {
it = popCollectionUUID(it);
continue;
@@ -1505,7 +1510,7 @@ MigrateInfoVector BalancerDefragmentationPolicyImpl::selectChunksToMove(
}
}
- if (chunksToMove.empty() && usedShards->empty()) {
+ if (chunksToMove.empty()) {
// If the policy cannot produce new migrations even in absence of temporary constraints, it
// is possible that some streaming actions must be processed first. Notify an update of the
// internal state to make it happen.
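
Because the set now tracks available rather than used shards, every membership test in the defragmentation phases is inverted: "shard is busy" becomes absence from availableShards, and reserving a donor/recipient pair erases the shards instead of inserting them. A small self-contained example of the inverted bookkeeping (shard names are made up for illustration):

#include <cassert>
#include <string>
#include <unordered_set>

int main() {
    std::unordered_set<std::string> availableShards{"shard0", "shard1", "shard2"};

    // Reserve a donor/recipient pair: previously usedShards->insert(...),
    // now the shards are erased from the available set.
    availableShards.erase("shard0");
    availableShards.erase("shard1");

    // The old check usedShards->count(id) != 0 becomes availableShards->count(id) == 0.
    assert(availableShards.count("shard0") == 0);  // busy
    assert(availableShards.count("shard2") == 1);  // still free

    // With a single shard left, selectChunksToMove() can return early.
    assert(availableShards.size() < 2);
    return 0;
}
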
diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h
index bc41346ca7f..c5174d9944b 100644
--- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h
+++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h
@@ -53,7 +53,7 @@ public:
OperationContext* opCtx) = 0;
virtual boost::optional<MigrateInfo> popNextMigration(
- OperationContext* opCtx, stdx::unordered_set<ShardId>* usedShards) = 0;
+ OperationContext* opCtx, stdx::unordered_set<ShardId>* availableShards) = 0;
virtual void applyActionResult(OperationContext* opCtx,
const DefragmentationAction& action,
@@ -85,7 +85,7 @@ public:
virtual BSONObj reportProgressOn(const UUID& uuid) override;
MigrateInfoVector selectChunksToMove(OperationContext* opCtx,
- stdx::unordered_set<ShardId>* usedShards) override;
+ stdx::unordered_set<ShardId>* availableShards) override;
StringData getName() const override;
diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp b/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp
index 88bda45c70c..c5b71710c24 100644
--- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp
+++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp
@@ -177,6 +177,19 @@ protected:
ASSERT_FALSE(configDoc.hasField(CollectionType::kDefragmentationPhaseFieldName));
}
};
+
+ stdx::unordered_set<ShardId> getAllShardIds(OperationContext* opCtx) {
+ std::vector<ShardStatistics> shardStats = _clusterStats.getStats(opCtx).getValue();
+ stdx::unordered_set<ShardId> shards;
+ std::transform(shardStats.begin(),
+ shardStats.end(),
+ std::inserter(shards, shards.end()),
+ [](const ClusterStatistics::ShardStatistics& shardStatistics) -> ShardId {
+ return shardStatistics.shardId;
+ });
+
+ return shards;
+ }
};
TEST_F(BalancerDefragmentationPolicyTest, TestGetNextActionIsNotReadyWhenNotDefragmenting) {
@@ -236,9 +249,9 @@ TEST_F(BalancerDefragmentationPolicyTest,
// kMoveAndMergeChunks has no stream actions/migrations to offer, but the condition has to be
// verified through a sequence of two action requests (the first being selectChunksToMove()) for
// the phase to complete.
- stdx::unordered_set<ShardId> usedShards;
+ auto availableShards = getAllShardIds(operationContext());
auto pendingMigrations =
- _defragmentationPolicy.selectChunksToMove(operationContext(), &usedShards);
+ _defragmentationPolicy.selectChunksToMove(operationContext(), &availableShards);
ASSERT_TRUE(pendingMigrations.empty());
verifyExpectedDefragmentationPhaseOndisk(DefragmentationPhaseEnum::kMoveAndMergeChunks);
@@ -607,9 +620,9 @@ TEST_F(BalancerDefragmentationPolicyTest, TestPhaseTwoMissingDataSizeRestartsPha
ASSERT_TRUE(_defragmentationPolicy.isDefragmentingCollection(coll.getUuid()));
verifyExpectedDefragmentationPhaseOndisk(DefragmentationPhaseEnum::kMergeAndMeasureChunks);
// There should be a datasize entry and no migrations
- stdx::unordered_set<ShardId> usedShards;
+ auto availableShards = getAllShardIds(operationContext());
auto pendingMigrations =
- _defragmentationPolicy.selectChunksToMove(operationContext(), &usedShards);
+ _defragmentationPolicy.selectChunksToMove(operationContext(), &availableShards);
ASSERT_EQ(0, pendingMigrations.size());
auto nextAction = _defragmentationPolicy.getNextStreamingAction(operationContext());
ASSERT_TRUE(nextAction.is_initialized());
@@ -639,11 +652,16 @@ TEST_F(BalancerDefragmentationPolicyTest, TestPhaseTwoChunkCanBeMovedAndMergedWi
_clusterStats.setStats(std::move(clusterStats), std::move(collectionStats));
_defragmentationPolicy.startCollectionDefragmentation(operationContext(), coll);
ASSERT_TRUE(_defragmentationPolicy.isDefragmentingCollection(coll.getUuid()));
- stdx::unordered_set<ShardId> usedShards;
+
+
+ auto availableShards = getAllShardIds(operationContext());
+ auto numOfShards = availableShards.size();
auto pendingMigrations =
- _defragmentationPolicy.selectChunksToMove(operationContext(), &usedShards);
+ _defragmentationPolicy.selectChunksToMove(operationContext(), &availableShards);
+ auto numOfUsedShards = numOfShards - availableShards.size();
ASSERT_EQ(1, pendingMigrations.size());
- ASSERT_EQ(2, usedShards.size());
+ ASSERT_EQ(2, numOfUsedShards);
+
auto moveAction = pendingMigrations.back();
// The chunk belonging to the "fullest" shard is expected to be moved - even though it is bigger
// than its sibling.
@@ -658,10 +676,14 @@ TEST_F(BalancerDefragmentationPolicyTest, TestPhaseTwoChunkCanBeMovedAndMergedWi
_defragmentationPolicy.applyActionResult(operationContext(), moveAction, Status::OK());
nextAction = _defragmentationPolicy.getNextStreamingAction(operationContext());
ASSERT_TRUE(nextAction.is_initialized());
- usedShards.clear();
- pendingMigrations = _defragmentationPolicy.selectChunksToMove(operationContext(), &usedShards);
+
+ availableShards = getAllShardIds(operationContext());
+ numOfShards = availableShards.size();
+ pendingMigrations =
+ _defragmentationPolicy.selectChunksToMove(operationContext(), &availableShards);
+ numOfUsedShards = numOfShards - availableShards.size();
ASSERT_TRUE(pendingMigrations.empty());
- ASSERT_EQ(0, usedShards.size());
+ ASSERT_EQ(0, numOfUsedShards);
auto mergeAction = stdx::get<MergeInfo>(*nextAction);
ASSERT_EQ(smallestChunk.getShard(), mergeAction.shardId);
@@ -671,7 +693,8 @@ TEST_F(BalancerDefragmentationPolicyTest, TestPhaseTwoChunkCanBeMovedAndMergedWi
_defragmentationPolicy.applyActionResult(operationContext(), mergeAction, Status::OK());
nextAction = _defragmentationPolicy.getNextStreamingAction(operationContext());
ASSERT_TRUE(nextAction == boost::none);
- pendingMigrations = _defragmentationPolicy.selectChunksToMove(operationContext(), &usedShards);
+ pendingMigrations =
+ _defragmentationPolicy.selectChunksToMove(operationContext(), &availableShards);
ASSERT_TRUE(pendingMigrations.empty());
}
@@ -734,10 +757,12 @@ TEST_F(BalancerDefragmentationPolicyTest,
// Two move operation should be returned within a single invocation, using all the possible
// shards
- stdx::unordered_set<ShardId> usedShards;
+ auto availableShards = getAllShardIds(operationContext());
+ auto numOfShards = availableShards.size();
auto pendingMigrations =
- _defragmentationPolicy.selectChunksToMove(operationContext(), &usedShards);
- ASSERT_EQ(4, usedShards.size());
+ _defragmentationPolicy.selectChunksToMove(operationContext(), &availableShards);
+ auto numOfUsedShards = numOfShards - availableShards.size();
+ ASSERT_EQ(4, numOfUsedShards);
ASSERT_EQ(2, pendingMigrations.size());
}
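
Since the tests no longer receive a usedShards set they can inspect, they derive the number of shards consumed by a call from how much availableShards shrank. A tiny sketch of the arithmetic behind the new assertions (the values are illustrative only):

#include <cassert>
#include <cstddef>
#include <string>
#include <unordered_set>

int main() {
    std::unordered_set<std::string> availableShards{"s0", "s1", "s2", "s3"};
    const std::size_t numOfShards = availableShards.size();

    // Pretend the policy reserved one donor/recipient pair.
    availableShards.erase("s0");
    availableShards.erase("s1");

    const std::size_t numOfUsedShards = numOfShards - availableShards.size();
    assert(numOfUsedShards == 2);  // mirrors ASSERT_EQ(2, numOfUsedShards) above
    return 0;
}
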
diff --git a/src/mongo/db/s/balancer/balancer_policy.cpp b/src/mongo/db/s/balancer/balancer_policy.cpp
index 14f3871a9b6..59a1b5b1197 100644
--- a/src/mongo/db/s/balancer/balancer_policy.cpp
+++ b/src/mongo/db/s/balancer/balancer_policy.cpp
@@ -287,14 +287,14 @@ std::tuple<ShardId, int64_t> BalancerPolicy::_getLeastLoadedReceiverShard(
const DistributionStatus& distribution,
const boost::optional<CollectionDataSizeInfoForBalancing>& collDataSizeInfo,
const string& tag,
- const stdx::unordered_set<ShardId>& excludedShards) {
+ const stdx::unordered_set<ShardId>& availableShards) {
ShardId best;
int64_t currentMin = numeric_limits<int64_t>::max();
const auto shouldBalanceAccordingToDataSize = collDataSizeInfo.has_value();
for (const auto& stat : shardStats) {
- if (excludedShards.count(stat.shardId))
+ if (!availableShards.count(stat.shardId))
continue;
auto status = isShardSuitableReceiver(stat, tag);
@@ -331,14 +331,14 @@ std::tuple<ShardId, int64_t> BalancerPolicy::_getMostOverloadedShard(
const DistributionStatus& distribution,
const boost::optional<CollectionDataSizeInfoForBalancing>& collDataSizeInfo,
const string& chunkTag,
- const stdx::unordered_set<ShardId>& excludedShards) {
+ const stdx::unordered_set<ShardId>& availableShards) {
ShardId worst;
long long currentMax = numeric_limits<long long>::min();
const auto shouldBalanceAccordingToDataSize = collDataSizeInfo.has_value();
for (const auto& stat : shardStats) {
- if (excludedShards.count(stat.shardId))
+ if (!availableShards.count(stat.shardId))
continue;
if (shouldBalanceAccordingToDataSize) {
@@ -435,7 +435,7 @@ MigrateInfosWithReason BalancerPolicy::balance(
const ShardStatisticsVector& shardStats,
const DistributionStatus& distribution,
const boost::optional<CollectionDataSizeInfoForBalancing>& collDataSizeInfo,
- stdx::unordered_set<ShardId>* usedShards,
+ stdx::unordered_set<ShardId>* availableShards,
bool forceJumbo) {
vector<MigrateInfo> migrations;
MigrationReason firstReason = MigrationReason::none;
@@ -459,7 +459,7 @@ MigrateInfosWithReason BalancerPolicy::balance(
if (!stat.isDraining)
continue;
- if (usedShards->count(stat.shardId))
+ if (!availableShards->count(stat.shardId))
continue;
const vector<ChunkType>& chunks = distribution.getChunks(stat.shardId);
@@ -481,7 +481,7 @@ MigrateInfosWithReason BalancerPolicy::balance(
const string tag = distribution.getTagForChunk(chunk);
const auto [to, _] = _getLeastLoadedReceiverShard(
- shardStats, distribution, collDataSizeInfo, tag, *usedShards);
+ shardStats, distribution, collDataSizeInfo, tag, *availableShards);
if (!to.isValid()) {
if (migrations.empty()) {
LOGV2_WARNING(21889,
@@ -500,12 +500,13 @@ MigrateInfosWithReason BalancerPolicy::balance(
if (firstReason == MigrationReason::none) {
firstReason = MigrationReason::drain;
}
- invariant(usedShards->insert(stat.shardId).second);
- invariant(usedShards->insert(to).second);
+ invariant(availableShards->erase(stat.shardId));
+ invariant(availableShards->erase(to));
break;
}
if (migrations.empty()) {
+ availableShards->erase(stat.shardId);
LOGV2_WARNING(21890,
"Unable to find any chunk to move from draining shard "
"{shardId}. numJumboChunks: {numJumboChunks}",
@@ -513,13 +514,17 @@ MigrateInfosWithReason BalancerPolicy::balance(
"shardId"_attr = stat.shardId,
"numJumboChunks"_attr = numJumboChunks);
}
+
+ if (availableShards->size() < 2) {
+ return std::make_pair(std::move(migrations), firstReason);
+ }
}
}
// 2) Check for chunks, which are on the wrong shard and must be moved off of it
if (!distribution.tags().empty()) {
for (const auto& stat : shardStats) {
- if (usedShards->count(stat.shardId))
+ if (!availableShards->count(stat.shardId))
continue;
const vector<ChunkType>& chunks = distribution.getChunks(stat.shardId);
@@ -545,7 +550,7 @@ MigrateInfosWithReason BalancerPolicy::balance(
}
const auto [to, _] = _getLeastLoadedReceiverShard(
- shardStats, distribution, collDataSizeInfo, tag, *usedShards);
+ shardStats, distribution, collDataSizeInfo, tag, *availableShards);
if (!to.isValid()) {
if (migrations.empty()) {
LOGV2_WARNING(21892,
@@ -567,10 +572,14 @@ MigrateInfosWithReason BalancerPolicy::balance(
if (firstReason == MigrationReason::none) {
firstReason = MigrationReason::zoneViolation;
}
- invariant(usedShards->insert(stat.shardId).second);
- invariant(usedShards->insert(to).second);
+ invariant(availableShards->erase(stat.shardId));
+ invariant(availableShards->erase(to));
break;
}
+
+ if (availableShards->size() < 2) {
+ return std::make_pair(std::move(migrations), firstReason);
+ }
}
}
@@ -614,7 +623,7 @@ MigrateInfosWithReason BalancerPolicy::balance(
*collDataSizeInfo,
tag,
&migrations,
- usedShards,
+ availableShards,
forceJumbo ? MoveChunkRequest::ForceJumbo::kForceBalancer
: MoveChunkRequest::ForceJumbo::kDoNotForce);
}
@@ -625,7 +634,7 @@ MigrateInfosWithReason BalancerPolicy::balance(
tag,
totalNumberOfShardsWithTag,
&migrations,
- usedShards,
+ availableShards,
forceJumbo ? MoveChunkRequest::ForceJumbo::kForceBalancer
: MoveChunkRequest::ForceJumbo::kDoNotForce);
};
@@ -646,11 +655,16 @@ boost::optional<MigrateInfo> BalancerPolicy::balanceSingleChunk(
const DistributionStatus& distribution) {
const string tag = distribution.getTagForChunk(chunk);
- const auto [newShardId, _] = _getLeastLoadedReceiverShard(shardStats,
- distribution,
- boost::none /* collDataSizeInfo */,
- tag,
- stdx::unordered_set<ShardId>());
+ stdx::unordered_set<ShardId> availableShards;
+ std::transform(shardStats.begin(),
+ shardStats.end(),
+ std::inserter(availableShards, availableShards.end()),
+ [](const ClusterStatistics::ShardStatistics& shardStatistics) -> ShardId {
+ return shardStatistics.shardId;
+ });
+
+ const auto [newShardId, _] = _getLeastLoadedReceiverShard(
+ shardStats, distribution, boost::none /* collDataSizeInfo */, tag, availableShards);
if (!newShardId.isValid() || newShardId == chunk.getShard()) {
return boost::optional<MigrateInfo>();
}
@@ -664,7 +678,7 @@ bool BalancerPolicy::_singleZoneBalanceBasedOnChunks(const ShardStatisticsVector
const string& tag,
size_t totalNumberOfShardsWithTag,
vector<MigrateInfo>* migrations,
- stdx::unordered_set<ShardId>* usedShards,
+ stdx::unordered_set<ShardId>* availableShards,
MoveChunkRequest::ForceJumbo forceJumbo) {
// Calculate the rounded optimal number of chunks per shard
const size_t totalNumberOfChunksWithTag =
@@ -673,7 +687,7 @@ bool BalancerPolicy::_singleZoneBalanceBasedOnChunks(const ShardStatisticsVector
(size_t)std::roundf(totalNumberOfChunksWithTag / (float)totalNumberOfShardsWithTag);
const auto [from, fromSize] =
- _getMostOverloadedShard(shardStats, distribution, boost::none, tag, *usedShards);
+ _getMostOverloadedShard(shardStats, distribution, boost::none, tag, *availableShards);
if (!from.isValid())
return false;
@@ -684,7 +698,7 @@ bool BalancerPolicy::_singleZoneBalanceBasedOnChunks(const ShardStatisticsVector
return false;
const auto [to, toSize] =
- _getLeastLoadedReceiverShard(shardStats, distribution, boost::none, tag, *usedShards);
+ _getLeastLoadedReceiverShard(shardStats, distribution, boost::none, tag, *availableShards);
if (!to.isValid()) {
if (migrations->empty()) {
LOGV2(21882, "No available shards to take chunks for zone", "zone"_attr = tag);
@@ -734,8 +748,8 @@ bool BalancerPolicy::_singleZoneBalanceBasedOnChunks(const ShardStatisticsVector
}
migrations->emplace_back(to, distribution.nss(), chunk, forceJumbo);
- invariant(usedShards->insert(chunk.getShard()).second);
- invariant(usedShards->insert(to).second);
+ invariant(availableShards->erase(chunk.getShard()));
+ invariant(availableShards->erase(to));
return true;
}
@@ -760,15 +774,15 @@ bool BalancerPolicy::_singleZoneBalanceBasedOnDataSize(
const CollectionDataSizeInfoForBalancing& collDataSizeInfo,
const string& tag,
vector<MigrateInfo>* migrations,
- stdx::unordered_set<ShardId>* usedShards,
+ stdx::unordered_set<ShardId>* availableShards,
MoveChunkRequest::ForceJumbo forceJumbo) {
const auto [from, fromSize] =
- _getMostOverloadedShard(shardStats, distribution, collDataSizeInfo, tag, *usedShards);
+ _getMostOverloadedShard(shardStats, distribution, collDataSizeInfo, tag, *availableShards);
if (!from.isValid())
return false;
- const auto [to, toSize] =
- _getLeastLoadedReceiverShard(shardStats, distribution, collDataSizeInfo, tag, *usedShards);
+ const auto [to, toSize] = _getLeastLoadedReceiverShard(
+ shardStats, distribution, collDataSizeInfo, tag, *availableShards);
if (!to.isValid()) {
if (migrations->empty()) {
LOGV2(6581600, "No available shards to take chunks for zone", "zone"_attr = tag);
@@ -818,8 +832,8 @@ bool BalancerPolicy::_singleZoneBalanceBasedOnDataSize(
chunk.getVersion(),
forceJumbo,
collDataSizeInfo.maxChunkSizeBytes);
- invariant(usedShards->insert(chunk.getShard()).second);
- invariant(usedShards->insert(to).second);
+ invariant(availableShards->erase(chunk.getShard()));
+ invariant(availableShards->erase(to));
return true;
}
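
The invariants are preserved under the change of representation: std::unordered_set::insert returns a pair whose .second is true only when the element was newly inserted, while erase returns the number of elements removed (0 or 1), so invariant(usedShards->insert(id).second) and invariant(availableShards->erase(id)) both assert that the shard had not already been reserved. A short check of the equivalence (illustrative ids, standard library only):

#include <cassert>
#include <string>
#include <unordered_set>

int main() {
    std::unordered_set<std::string> usedShards;
    std::unordered_set<std::string> availableShards{"donor", "recipient"};

    // Old style: insert() reports whether the shard was newly marked as used.
    assert(usedShards.insert("donor").second);    // first reservation succeeds
    assert(!usedShards.insert("donor").second);   // double reservation would trip the invariant

    // New style: erase() returns 1 only if the shard was still available.
    assert(availableShards.erase("donor") == 1);  // first reservation succeeds
    assert(availableShards.erase("donor") == 0);  // double reservation would trip the invariant
    return 0;
}
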
diff --git a/src/mongo/db/s/balancer/balancer_policy.h b/src/mongo/db/s/balancer/balancer_policy.h
index 0a047098615..af9377999a6 100644
--- a/src/mongo/db/s/balancer/balancer_policy.h
+++ b/src/mongo/db/s/balancer/balancer_policy.h
@@ -393,15 +393,15 @@ public:
* any of the shards have chunks, which are sufficiently higher than this number, suggests
* moving chunks to shards, which are under this number.
*
- * The usedShards parameter is in/out and it contains the set of shards, which have already been
- * used for migrations. Used so we don't return multiple conflicting migrations for the same
- * shard.
+ * The availableShards parameter is in/out and it contains the set of shards, which haven't
+ * been used for migrations yet. Used so we don't return multiple conflicting migrations for the
+ * same shard.
*/
static MigrateInfosWithReason balance(
const ShardStatisticsVector& shardStats,
const DistributionStatus& distribution,
const boost::optional<CollectionDataSizeInfoForBalancing>& collDataSizeInfo,
- stdx::unordered_set<ShardId>* usedShards,
+ stdx::unordered_set<ShardId>* availableShards,
bool forceJumbo);
/**
@@ -426,7 +426,7 @@ private:
const DistributionStatus& distribution,
const boost::optional<CollectionDataSizeInfoForBalancing>& collDataSizeInfo,
const std::string& tag,
- const stdx::unordered_set<ShardId>& excludedShards);
+ const stdx::unordered_set<ShardId>& availableShards);
/**
* Only considers shards with the specified tag, all shards in case the tag is empty.
@@ -442,12 +442,12 @@ private:
const DistributionStatus& distribution,
const boost::optional<CollectionDataSizeInfoForBalancing>& collDataSizeInfo,
const std::string& chunkTag,
- const stdx::unordered_set<ShardId>& excludedShards);
+ const stdx::unordered_set<ShardId>& availableShards);
/**
* Selects one chunk for the specified zone (if appropriate) to be moved in order to bring the
* deviation of the shards chunk contents closer to even across all shards in the specified
- * zone. Takes into account and updates the shards, which have already been used for migrations.
+ * zone. Takes into account and updates the shards, which haven't been used for migrations yet.
*
* Returns true if a migration was suggested, false otherwise. This method is intented to be
* called multiple times until all posible migrations for a zone have been selected.
@@ -457,13 +457,13 @@ private:
const std::string& tag,
size_t totalNumberOfShardsWithTag,
std::vector<MigrateInfo>* migrations,
- stdx::unordered_set<ShardId>* usedShards,
+ stdx::unordered_set<ShardId>* availableShards,
MoveChunkRequest::ForceJumbo forceJumbo);
/**
* Selects one range for the specified zone (if appropriate) to be moved in order to bring the
* deviation of the collection data size closer to even across all shards in the specified
- * zone. Takes into account and updates the shards, which have already been used for migrations.
+ * zone. Takes into account and updates the shards, which haven't been used for migrations yet.
*
* Returns true if a migration was suggested, false otherwise. This method is intented to be
* called multiple times until all posible migrations for a zone have been selected.
@@ -474,7 +474,7 @@ private:
const CollectionDataSizeInfoForBalancing& collDataSizeInfo,
const std::string& tag,
std::vector<MigrateInfo>* migrations,
- stdx::unordered_set<ShardId>* usedShards,
+ stdx::unordered_set<ShardId>* availableShards,
MoveChunkRequest::ForceJumbo forceJumbo);
};
diff --git a/src/mongo/db/s/balancer/balancer_policy_test.cpp b/src/mongo/db/s/balancer/balancer_policy_test.cpp
index c46a874ade4..11b6de63464 100644
--- a/src/mongo/db/s/balancer/balancer_policy_test.cpp
+++ b/src/mongo/db/s/balancer/balancer_policy_test.cpp
@@ -111,13 +111,22 @@ std::pair<ShardStatisticsVector, ShardToChunksMap> generateCluster(
return std::make_pair(std::move(shardStats), std::move(chunkMap));
}
+stdx::unordered_set<ShardId> getAllShardIds(const ShardStatisticsVector& shardStats) {
+ stdx::unordered_set<ShardId> shards;
+ std::transform(shardStats.begin(),
+ shardStats.end(),
+ std::inserter(shards, shards.end()),
+ [](const ShardStatistics& shardStatistics) { return shardStatistics.shardId; });
+ return shards;
+}
+
MigrateInfosWithReason balanceChunks(const ShardStatisticsVector& shardStats,
const DistributionStatus& distribution,
bool shouldAggressivelyBalance,
bool forceJumbo) {
- stdx::unordered_set<ShardId> usedShards;
+ auto availableShards = getAllShardIds(shardStats);
return BalancerPolicy::balance(
- shardStats, distribution, boost::none /* collDataSizeInfo */, &usedShards, forceJumbo);
+ shardStats, distribution, boost::none /* collDataSizeInfo */, &availableShards, forceJumbo);
}
TEST(BalancerPolicy, Basic) {
@@ -265,12 +274,13 @@ TEST(BalancerPolicy, ParallelBalancingNotSchedulingOnInUseSourceShardsWithMoveNe
{ShardStatistics(kShardId3, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}});
// Here kShardId0 would have been selected as a donor
- stdx::unordered_set<ShardId> usedShards{kShardId0};
+ auto availableShards = getAllShardIds(cluster.first);
+ availableShards.erase(kShardId0);
const auto [migrations, reason] =
BalancerPolicy::balance(cluster.first,
DistributionStatus(kNamespace, cluster.second),
boost::none /* collDataSizeInfo */,
- &usedShards,
+ &availableShards,
false);
ASSERT_EQ(1U, migrations.size());
@@ -289,12 +299,13 @@ TEST(BalancerPolicy, ParallelBalancingNotSchedulingOnInUseSourceShardsWithMoveNo
{ShardStatistics(kShardId3, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}});
// Here kShardId0 would have been selected as a donor
- stdx::unordered_set<ShardId> usedShards{kShardId0};
+ auto availableShards = getAllShardIds(cluster.first);
+ availableShards.erase(kShardId0);
const auto [migrations, reason] =
BalancerPolicy::balance(cluster.first,
DistributionStatus(kNamespace, cluster.second),
boost::none /* collDataSizeInfo */,
- &usedShards,
+ &availableShards,
false);
ASSERT_EQ(0U, migrations.size());
}
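
To simulate a shard already engaged in a migration, these tests now start from the full availableShards set and erase that shard, where they previously constructed a usedShards set containing it. The two encodings are complements of each other over the set of all shard ids, e.g. (illustrative ids):

#include <cassert>
#include <string>
#include <unordered_set>

int main() {
    const std::unordered_set<std::string> allShards{"shard0", "shard1", "shard2", "shard3"};

    // Old encoding: the busy shard is listed explicitly.
    std::unordered_set<std::string> usedShards{"shard0"};

    // New encoding: start from all shards and remove the busy one.
    std::unordered_set<std::string> availableShards = allShards;
    availableShards.erase("shard0");

    // Every shard is in exactly one of the two sets.
    for (const auto& shard : allShards) {
        assert((usedShards.count(shard) == 1) != (availableShards.count(shard) == 1));
    }
    return 0;
}
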
@@ -307,12 +318,13 @@ TEST(BalancerPolicy, ParallelBalancingNotSchedulingOnInUseDestinationShards) {
{ShardStatistics(kShardId3, kNoMaxSize, 1, false, emptyTagSet, emptyShardVersion), 1}});
// Here kShardId2 would have been selected as a recipient
- stdx::unordered_set<ShardId> usedShards{kShardId2};
+ auto availableShards = getAllShardIds(cluster.first);
+ availableShards.erase(kShardId2);
const auto [migrations, reason] =
BalancerPolicy::balance(cluster.first,
DistributionStatus(kNamespace, cluster.second),
boost::none /* collDataSizeInfo */,
- &usedShards,
+ &availableShards,
false);
ASSERT_EQ(1U, migrations.size());
@@ -667,9 +679,10 @@ TEST(BalancerPolicy, BalancerMostOverLoadShardHasMultipleTagsSkipTagWithShardInU
ASSERT_OK(distribution.addRangeToZone(ZoneRange(BSON("x" << 1), BSON("x" << 3), "b")));
ASSERT_OK(distribution.addRangeToZone(ZoneRange(BSON("x" << 3), BSON("x" << 5), "c")));
- stdx::unordered_set<ShardId> usedShards{kShardId1};
+ auto availableShards = getAllShardIds(cluster.first);
+ availableShards.erase(kShardId1);
const auto [migrations, reason] = BalancerPolicy::balance(
- cluster.first, distribution, boost::none /* collDataSizeInfo */, &usedShards, false);
+ cluster.first, distribution, boost::none /* collDataSizeInfo */, &availableShards, false);
ASSERT_EQ(1U, migrations.size());
ASSERT_EQ(kShardId0, migrations[0].from);