diff options
author | Silvia Surroca <silvia.surroca@mongodb.com> | 2022-11-25 14:23:29 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-11-25 14:58:37 +0000 |
commit | a71e6f82149e19001ac75af9cb8bcbd3b35219ac (patch) | |
tree | cfa24f0e565b8005b6499b2d1969e1ad97e2b3cb | |
parent | 3568ba71bb3d57ce5176b44206ad66b05d29e407 (diff) | |
download | mongo-a71e6f82149e19001ac75af9cb8bcbd3b35219ac.tar.gz |
SERVER-70973 Balancer should stop iterating collections when there are no more available shards
12 files changed, 211 insertions, 108 deletions
diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp index fdb36b34bdf..7432d038afc 100644 --- a/src/mongo/db/s/balancer/balancer.cpp +++ b/src/mongo/db/s/balancer/balancer.cpp @@ -775,13 +775,24 @@ void Balancer::_mainThread() { LOGV2_DEBUG(21861, 1, "Done enforcing tag range boundaries."); } - stdx::unordered_set<ShardId> usedShards; + const std::vector<ClusterStatistics::ShardStatistics> shardStats = + uassertStatusOK(_clusterStats->getStats(opCtx.get())); + + stdx::unordered_set<ShardId> availableShards; + std::transform( + shardStats.begin(), + shardStats.end(), + std::inserter(availableShards, availableShards.end()), + [](const ClusterStatistics::ShardStatistics& shardStatistics) -> ShardId { + return shardStatistics.shardId; + }); const auto chunksToDefragment = - _defragmentationPolicy->selectChunksToMove(opCtx.get(), &usedShards); + _defragmentationPolicy->selectChunksToMove(opCtx.get(), &availableShards); - const auto chunksToRebalance = uassertStatusOK( - _chunkSelectionPolicy->selectChunksToMove(opCtx.get(), &usedShards)); + const auto chunksToRebalance = + uassertStatusOK(_chunkSelectionPolicy->selectChunksToMove( + opCtx.get(), shardStats, &availableShards)); if (chunksToRebalance.empty() && chunksToDefragment.empty()) { LOGV2_DEBUG(21862, 1, "No need to move any chunk"); diff --git a/src/mongo/db/s/balancer/balancer_chunk_selection_policy.h b/src/mongo/db/s/balancer/balancer_chunk_selection_policy.h index 5aed229c202..0c00c3c5730 100644 --- a/src/mongo/db/s/balancer/balancer_chunk_selection_policy.h +++ b/src/mongo/db/s/balancer/balancer_chunk_selection_policy.h @@ -74,7 +74,10 @@ public: * Potentially blocking method, which gives out a set of chunks to be moved. */ virtual StatusWith<MigrateInfoVector> selectChunksToMove( - OperationContext* opCtx, stdx::unordered_set<ShardId>* unavailableShards) = 0; + OperationContext* opCtx, + const std::vector<ClusterStatistics::ShardStatistics>& shardStats, + stdx::unordered_set<ShardId>* availableShards) = 0; + /** * Given a valid namespace returns all the Migrations the balancer would need to perform diff --git a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp index b659001db31..bd1c1ddd763 100644 --- a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp +++ b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp @@ -403,15 +403,11 @@ StatusWith<SplitInfoVector> BalancerChunkSelectionPolicyImpl::selectChunksToSpli } StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::selectChunksToMove( - OperationContext* opCtx, stdx::unordered_set<ShardId>* usedShards) { - auto shardStatsStatus = _clusterStats->getStats(opCtx); - if (!shardStatsStatus.isOK()) { - return shardStatsStatus.getStatus(); - } - - const auto& shardStats = shardStatsStatus.getValue(); + OperationContext* opCtx, + const std::vector<ClusterStatistics::ShardStatistics>& shardStats, + stdx::unordered_set<ShardId>* availableShards) { - if (shardStats.size() < 2) { + if (availableShards->size() < 2) { return MigrateInfoVector{}; } @@ -428,6 +424,11 @@ StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::selectChunksToMo std::vector<CollectionType> collBatch; for (auto collIt = collections.begin(); collIt != collections.end();) { + + if (availableShards->size() < 2) { + break; + } + const auto& coll = *(collIt++); if (!coll.getAllowBalance() || !coll.getAllowMigrations() || !coll.getPermitMigrations() || coll.getDefragmentCollection()) { @@ -456,6 +457,11 @@ StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::selectChunksToMo } for (const auto& collFromBatch : collBatch) { + + if (availableShards->size() < 2) { + break; + } + const auto& nss = collFromBatch.getNss(); boost::optional<CollectionDataSizeInfoForBalancing> optDataSizeInfo; @@ -464,7 +470,7 @@ StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::selectChunksToMo } auto candidatesStatus = _getMigrateCandidatesForCollection( - opCtx, nss, shardStats, optDataSizeInfo, usedShards); + opCtx, nss, shardStats, optDataSizeInfo, availableShards); if (candidatesStatus == ErrorCodes::NamespaceNotFound) { // Namespace got dropped before we managed to get to it, so just skip it continue; @@ -501,7 +507,14 @@ StatusWith<MigrateInfosWithReason> BalancerChunkSelectionPolicyImpl::selectChunk // doesn't. Grid::get(opCtx)->catalogClient()->getCollection(opCtx, nss); - stdx::unordered_set<ShardId> usedShards; + stdx::unordered_set<ShardId> availableShards; + std::transform(shardStats.begin(), + shardStats.end(), + std::inserter(availableShards, availableShards.end()), + [](const ClusterStatistics::ShardStatistics& shardStatistics) -> ShardId { + return shardStatistics.shardId; + }); + boost::optional<CollectionDataSizeInfoForBalancing> optCollDataSizeInfo; if (feature_flags::gBalanceAccordingToDataSize.isEnabled( @@ -510,7 +523,7 @@ StatusWith<MigrateInfosWithReason> BalancerChunkSelectionPolicyImpl::selectChunk } auto candidatesStatus = _getMigrateCandidatesForCollection( - opCtx, nss, shardStats, optCollDataSizeInfo, &usedShards); + opCtx, nss, shardStats, optCollDataSizeInfo, &availableShards); if (!candidatesStatus.isOK()) { return candidatesStatus.getStatus(); } @@ -634,7 +647,7 @@ BalancerChunkSelectionPolicyImpl::_getMigrateCandidatesForCollection( const NamespaceString& nss, const ShardStatisticsVector& shardStats, const boost::optional<CollectionDataSizeInfoForBalancing>& collDataSizeInfo, - stdx::unordered_set<ShardId>* usedShards) { + stdx::unordered_set<ShardId>* availableShards) { auto routingInfoStatus = Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss); if (!routingInfoStatus.isOK()) { @@ -691,7 +704,7 @@ BalancerChunkSelectionPolicyImpl::_getMigrateCandidatesForCollection( shardStats, distribution, collDataSizeInfo, - usedShards, + availableShards, Grid::get(opCtx)->getBalancerConfiguration()->attemptToBalanceJumboChunks()); } diff --git a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.h b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.h index adb3314aa12..41ef536c2d4 100644 --- a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.h +++ b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.h @@ -47,7 +47,9 @@ public: const NamespaceString& ns) override; StatusWith<MigrateInfoVector> selectChunksToMove( - OperationContext* opCtx, stdx::unordered_set<ShardId>* usedShards) override; + OperationContext* opCtx, + const std::vector<ClusterStatistics::ShardStatistics>& shardStats, + stdx::unordered_set<ShardId>* availableShards) override; StatusWith<MigrateInfosWithReason> selectChunksToMove(OperationContext* opCtx, const NamespaceString& ns) override; @@ -78,7 +80,7 @@ private: const NamespaceString& nss, const ShardStatisticsVector& shardStats, const boost::optional<CollectionDataSizeInfoForBalancing>& collDataSizeInfo, - stdx::unordered_set<ShardId>* usedShards); + stdx::unordered_set<ShardId>* availableShards); // Source for obtaining cluster statistics. Not owned and must not be destroyed before the // policy object is destroyed. diff --git a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_test.cpp b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_test.cpp index 8e19a6087e6..93b5af5ae7a 100644 --- a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_test.cpp +++ b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_test.cpp @@ -120,6 +120,18 @@ protected: std::unique_ptr<BalancerChunkSelectionPolicy> _chunkSelectionPolicy; }; +stdx::unordered_set<ShardId> getAllShardIds( + const std::vector<ClusterStatistics::ShardStatistics>& shardStats) { + stdx::unordered_set<ShardId> shards; + std::transform(shardStats.begin(), + shardStats.end(), + std::inserter(shards, shards.end()), + [](const ClusterStatistics::ShardStatistics& shardStaticstics) -> ShardId { + return shardStaticstics.shardId; + }); + return shards; +} + TEST_F(BalancerChunkSelectionTest, TagRangesOverlap) { // Set up two shards in the metadata. ASSERT_OK(catalogClient()->insertConfigDocument( @@ -212,9 +224,11 @@ TEST_F(BalancerChunkSelectionTest, TagRangeMaxNotAlignedWithChunkMax) { shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0); shardTargeterMock(opCtx.get(), kShardId1)->setFindHostReturnValue(kShardHost1); - stdx::unordered_set<ShardId> usedShards; - auto candidateChunksStatus = - _chunkSelectionPolicy.get()->selectChunksToMove(opCtx.get(), &usedShards); + std::vector<ClusterStatistics::ShardStatistics> shardStats = + uassertStatusOK(_clusterStats.get()->getStats(opCtx.get())); + auto availableShards = getAllShardIds(shardStats); + auto candidateChunksStatus = _chunkSelectionPolicy.get()->selectChunksToMove( + opCtx.get(), shardStats, &availableShards); ASSERT_OK(candidateChunksStatus.getStatus()); // The balancer does not bubble up the IllegalOperation error, but it is expected @@ -321,9 +335,12 @@ TEST_F(BalancerChunkSelectionTest, ShardedTimeseriesCollectionsCanBeBalanced) { shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0); shardTargeterMock(opCtx.get(), kShardId1)->setFindHostReturnValue(kShardHost1); - stdx::unordered_set<ShardId> usedShards; - auto candidateChunksStatus = - _chunkSelectionPolicy.get()->selectChunksToMove(opCtx.get(), &usedShards); + std::vector<ClusterStatistics::ShardStatistics> shardStats = + uassertStatusOK(_clusterStats.get()->getStats(opCtx.get())); + auto availableShards = getAllShardIds(shardStats); + + auto candidateChunksStatus = _chunkSelectionPolicy.get()->selectChunksToMove( + opCtx.get(), shardStats, &availableShards); ASSERT_OK(candidateChunksStatus.getStatus()); ASSERT_EQUALS(1, candidateChunksStatus.getValue().size()); diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy.h b/src/mongo/db/s/balancer/balancer_defragmentation_policy.h index 3358c6c79df..bfd81c9250f 100644 --- a/src/mongo/db/s/balancer/balancer_defragmentation_policy.h +++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy.h @@ -76,11 +76,11 @@ public: /** * Pulls the next batch of actionable chunk migration requests, given the current internal state - * and the passed in list of unavaible shards. + * and the passed in list of available shards. * Every chunk migration request is then expected to be acknowledged by the balancer by issuing * a call to applyActionResult() (declared in ActionsStreamPolicy) */ virtual MigrateInfoVector selectChunksToMove(OperationContext* opCtx, - stdx::unordered_set<ShardId>* usedShards) = 0; + stdx::unordered_set<ShardId>* availableShards) = 0; }; } // namespace mongo diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp index 0f1063d6d8c..952448cd219 100644 --- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp +++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp @@ -245,7 +245,7 @@ public: } boost::optional<MigrateInfo> popNextMigration( - OperationContext* opCtx, stdx::unordered_set<ShardId>* usedShards) override { + OperationContext* opCtx, stdx::unordered_set<ShardId>* availableShards) override { return boost::none; } @@ -422,9 +422,9 @@ public: } boost::optional<MigrateInfo> popNextMigration( - OperationContext* opCtx, stdx::unordered_set<ShardId>* usedShards) override { + OperationContext* opCtx, stdx::unordered_set<ShardId>* availableShards) override { for (const auto& shardId : _shardProcessingOrder) { - if (usedShards->count(shardId) != 0) { + if (availableShards->count(shardId) == 0) { // the shard is already busy in a migration continue; } @@ -432,7 +432,7 @@ public: ChunkRangeInfoIterator nextSmallChunk; std::list<ChunkRangeInfoIterator> candidateSiblings; if (!_findNextSmallChunkInShard( - shardId, *usedShards, &nextSmallChunk, &candidateSiblings)) { + shardId, *availableShards, &nextSmallChunk, &candidateSiblings)) { // there isn't a chunk in this shard that can currently be moved and merged with one // of its siblings. continue; @@ -455,8 +455,8 @@ public: // ... then build up the migration request, marking the needed resources as busy. nextSmallChunk->busyInOperation = true; targetSibling->busyInOperation = true; - usedShards->insert(nextSmallChunk->shard); - usedShards->insert(targetSibling->shard); + availableShards->erase(nextSmallChunk->shard); + availableShards->erase(targetSibling->shard); auto smallChunkVersion = getShardVersion(opCtx, nextSmallChunk->shard, _nss); _outstandingMigrations.emplace_back(nextSmallChunk, targetSibling); return _outstandingMigrations.back().asMigrateInfo(_uuid, _nss, smallChunkVersion); @@ -880,7 +880,7 @@ private: // Returns true on success (storing the related info in nextSmallChunk + smallChunkSiblings), // false otherwise. bool _findNextSmallChunkInShard(const ShardId& shard, - const stdx::unordered_set<ShardId>& usedShards, + const stdx::unordered_set<ShardId>& availableShards, ChunkRangeInfoIterator* nextSmallChunk, std::list<ChunkRangeInfoIterator>* smallChunkSiblings) { auto matchingShardInfo = _smallChunksByShard.find(shard); @@ -906,7 +906,7 @@ private: size_t siblingsDiscardedDueToRangeDeletion = 0; for (const auto& sibling : candidateSiblings) { - if (sibling->busyInOperation || usedShards.count(sibling->shard)) { + if (sibling->busyInOperation || !availableShards.count(sibling->shard)) { continue; } if ((*candidateIt)->shardsToAvoid.count(sibling->shard)) { @@ -1080,7 +1080,7 @@ public: } boost::optional<MigrateInfo> popNextMigration( - OperationContext* opCtx, stdx::unordered_set<ShardId>* usedShards) override { + OperationContext* opCtx, stdx::unordered_set<ShardId>* availableShards) override { return boost::none; } @@ -1265,7 +1265,7 @@ public: } boost::optional<MigrateInfo> popNextMigration( - OperationContext* opCtx, stdx::unordered_set<ShardId>* usedShards) override { + OperationContext* opCtx, stdx::unordered_set<ShardId>* availableShards) override { return boost::none; } @@ -1443,7 +1443,8 @@ BSONObj BalancerDefragmentationPolicyImpl::reportProgressOn(const UUID& uuid) { } MigrateInfoVector BalancerDefragmentationPolicyImpl::selectChunksToMove( - OperationContext* opCtx, stdx::unordered_set<ShardId>* usedShards) { + OperationContext* opCtx, stdx::unordered_set<ShardId>* availableShards) { + MigrateInfoVector chunksToMove; { stdx::lock_guard<Latch> lk(_stateMutex); @@ -1470,6 +1471,10 @@ MigrateInfoVector BalancerDefragmentationPolicyImpl::selectChunksToMove( for (auto it = collectionUUIDs.begin(); it != collectionUUIDs.end();) { const auto& collUUID = *it; + if (availableShards->size() == 0) { + return chunksToMove; + } + try { auto defragStateIt = _defragmentationStates.find(collUUID); if (defragStateIt == _defragmentationStates.end()) { @@ -1484,7 +1489,7 @@ MigrateInfoVector BalancerDefragmentationPolicyImpl::selectChunksToMove( continue; } auto actionableMigration = - collDefragmentationPhase->popNextMigration(opCtx, usedShards); + collDefragmentationPhase->popNextMigration(opCtx, availableShards); if (!actionableMigration.has_value()) { it = popCollectionUUID(it); continue; @@ -1505,7 +1510,7 @@ MigrateInfoVector BalancerDefragmentationPolicyImpl::selectChunksToMove( } } - if (chunksToMove.empty() && usedShards->empty()) { + if (chunksToMove.empty()) { // If the policy cannot produce new migrations even in absence of temporary constraints, it // is possible that some streaming actions must be processed first. Notify an update of the // internal state to make it happen. diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h index bc41346ca7f..c5174d9944b 100644 --- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h +++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h @@ -53,7 +53,7 @@ public: OperationContext* opCtx) = 0; virtual boost::optional<MigrateInfo> popNextMigration( - OperationContext* opCtx, stdx::unordered_set<ShardId>* usedShards) = 0; + OperationContext* opCtx, stdx::unordered_set<ShardId>* availableShards) = 0; virtual void applyActionResult(OperationContext* opCtx, const DefragmentationAction& action, @@ -85,7 +85,7 @@ public: virtual BSONObj reportProgressOn(const UUID& uuid) override; MigrateInfoVector selectChunksToMove(OperationContext* opCtx, - stdx::unordered_set<ShardId>* usedShards) override; + stdx::unordered_set<ShardId>* availableShards) override; StringData getName() const override; diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp b/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp index 88bda45c70c..c5b71710c24 100644 --- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp +++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp @@ -177,6 +177,19 @@ protected: ASSERT_FALSE(configDoc.hasField(CollectionType::kDefragmentationPhaseFieldName)); } }; + + stdx::unordered_set<ShardId> getAllShardIds(OperationContext* opCtx) { + std::vector<ShardStatistics> shardStats = _clusterStats.getStats(opCtx).getValue(); + stdx::unordered_set<ShardId> shards; + std::transform(shardStats.begin(), + shardStats.end(), + std::inserter(shards, shards.end()), + [](const ClusterStatistics::ShardStatistics& shardStatistics) -> ShardId { + return shardStatistics.shardId; + }); + + return shards; + } }; TEST_F(BalancerDefragmentationPolicyTest, TestGetNextActionIsNotReadyWhenNotDefragmenting) { @@ -236,9 +249,9 @@ TEST_F(BalancerDefragmentationPolicyTest, // kMoveAndMergeChunks has no stream actions/migrations to offer, but the condition has to be // verified through a sequence of two action requests (the first being selectChunksToMove()) for // the phase to complete. - stdx::unordered_set<ShardId> usedShards; + auto availableShards = getAllShardIds(operationContext()); auto pendingMigrations = - _defragmentationPolicy.selectChunksToMove(operationContext(), &usedShards); + _defragmentationPolicy.selectChunksToMove(operationContext(), &availableShards); ASSERT_TRUE(pendingMigrations.empty()); verifyExpectedDefragmentationPhaseOndisk(DefragmentationPhaseEnum::kMoveAndMergeChunks); @@ -607,9 +620,9 @@ TEST_F(BalancerDefragmentationPolicyTest, TestPhaseTwoMissingDataSizeRestartsPha ASSERT_TRUE(_defragmentationPolicy.isDefragmentingCollection(coll.getUuid())); verifyExpectedDefragmentationPhaseOndisk(DefragmentationPhaseEnum::kMergeAndMeasureChunks); // There should be a datasize entry and no migrations - stdx::unordered_set<ShardId> usedShards; + auto availableShards = getAllShardIds(operationContext()); auto pendingMigrations = - _defragmentationPolicy.selectChunksToMove(operationContext(), &usedShards); + _defragmentationPolicy.selectChunksToMove(operationContext(), &availableShards); ASSERT_EQ(0, pendingMigrations.size()); auto nextAction = _defragmentationPolicy.getNextStreamingAction(operationContext()); ASSERT_TRUE(nextAction.is_initialized()); @@ -639,11 +652,16 @@ TEST_F(BalancerDefragmentationPolicyTest, TestPhaseTwoChunkCanBeMovedAndMergedWi _clusterStats.setStats(std::move(clusterStats), std::move(collectionStats)); _defragmentationPolicy.startCollectionDefragmentation(operationContext(), coll); ASSERT_TRUE(_defragmentationPolicy.isDefragmentingCollection(coll.getUuid())); - stdx::unordered_set<ShardId> usedShards; + + + auto availableShards = getAllShardIds(operationContext()); + auto numOfShards = availableShards.size(); auto pendingMigrations = - _defragmentationPolicy.selectChunksToMove(operationContext(), &usedShards); + _defragmentationPolicy.selectChunksToMove(operationContext(), &availableShards); + auto numOfUsedShards = numOfShards - availableShards.size(); ASSERT_EQ(1, pendingMigrations.size()); - ASSERT_EQ(2, usedShards.size()); + ASSERT_EQ(2, numOfUsedShards); + auto moveAction = pendingMigrations.back(); // The chunk belonging to the "fullest" shard is expected to be moved - even though it is bigger // than its sibling. @@ -658,10 +676,14 @@ TEST_F(BalancerDefragmentationPolicyTest, TestPhaseTwoChunkCanBeMovedAndMergedWi _defragmentationPolicy.applyActionResult(operationContext(), moveAction, Status::OK()); nextAction = _defragmentationPolicy.getNextStreamingAction(operationContext()); ASSERT_TRUE(nextAction.is_initialized()); - usedShards.clear(); - pendingMigrations = _defragmentationPolicy.selectChunksToMove(operationContext(), &usedShards); + + availableShards = getAllShardIds(operationContext()); + numOfShards = availableShards.size(); + pendingMigrations = + _defragmentationPolicy.selectChunksToMove(operationContext(), &availableShards); + numOfUsedShards = numOfShards - availableShards.size(); ASSERT_TRUE(pendingMigrations.empty()); - ASSERT_EQ(0, usedShards.size()); + ASSERT_EQ(0, numOfUsedShards); auto mergeAction = stdx::get<MergeInfo>(*nextAction); ASSERT_EQ(smallestChunk.getShard(), mergeAction.shardId); @@ -671,7 +693,8 @@ TEST_F(BalancerDefragmentationPolicyTest, TestPhaseTwoChunkCanBeMovedAndMergedWi _defragmentationPolicy.applyActionResult(operationContext(), mergeAction, Status::OK()); nextAction = _defragmentationPolicy.getNextStreamingAction(operationContext()); ASSERT_TRUE(nextAction == boost::none); - pendingMigrations = _defragmentationPolicy.selectChunksToMove(operationContext(), &usedShards); + pendingMigrations = + _defragmentationPolicy.selectChunksToMove(operationContext(), &availableShards); ASSERT_TRUE(pendingMigrations.empty()); } @@ -734,10 +757,12 @@ TEST_F(BalancerDefragmentationPolicyTest, // Two move operation should be returned within a single invocation, using all the possible // shards - stdx::unordered_set<ShardId> usedShards; + auto availableShards = getAllShardIds(operationContext()); + auto numOfShards = availableShards.size(); auto pendingMigrations = - _defragmentationPolicy.selectChunksToMove(operationContext(), &usedShards); - ASSERT_EQ(4, usedShards.size()); + _defragmentationPolicy.selectChunksToMove(operationContext(), &availableShards); + auto numOfUsedShards = numOfShards - availableShards.size(); + ASSERT_EQ(4, numOfUsedShards); ASSERT_EQ(2, pendingMigrations.size()); } diff --git a/src/mongo/db/s/balancer/balancer_policy.cpp b/src/mongo/db/s/balancer/balancer_policy.cpp index 14f3871a9b6..59a1b5b1197 100644 --- a/src/mongo/db/s/balancer/balancer_policy.cpp +++ b/src/mongo/db/s/balancer/balancer_policy.cpp @@ -287,14 +287,14 @@ std::tuple<ShardId, int64_t> BalancerPolicy::_getLeastLoadedReceiverShard( const DistributionStatus& distribution, const boost::optional<CollectionDataSizeInfoForBalancing>& collDataSizeInfo, const string& tag, - const stdx::unordered_set<ShardId>& excludedShards) { + const stdx::unordered_set<ShardId>& availableShards) { ShardId best; int64_t currentMin = numeric_limits<int64_t>::max(); const auto shouldBalanceAccordingToDataSize = collDataSizeInfo.has_value(); for (const auto& stat : shardStats) { - if (excludedShards.count(stat.shardId)) + if (!availableShards.count(stat.shardId)) continue; auto status = isShardSuitableReceiver(stat, tag); @@ -331,14 +331,14 @@ std::tuple<ShardId, int64_t> BalancerPolicy::_getMostOverloadedShard( const DistributionStatus& distribution, const boost::optional<CollectionDataSizeInfoForBalancing>& collDataSizeInfo, const string& chunkTag, - const stdx::unordered_set<ShardId>& excludedShards) { + const stdx::unordered_set<ShardId>& availableShards) { ShardId worst; long long currentMax = numeric_limits<long long>::min(); const auto shouldBalanceAccordingToDataSize = collDataSizeInfo.has_value(); for (const auto& stat : shardStats) { - if (excludedShards.count(stat.shardId)) + if (!availableShards.count(stat.shardId)) continue; if (shouldBalanceAccordingToDataSize) { @@ -435,7 +435,7 @@ MigrateInfosWithReason BalancerPolicy::balance( const ShardStatisticsVector& shardStats, const DistributionStatus& distribution, const boost::optional<CollectionDataSizeInfoForBalancing>& collDataSizeInfo, - stdx::unordered_set<ShardId>* usedShards, + stdx::unordered_set<ShardId>* availableShards, bool forceJumbo) { vector<MigrateInfo> migrations; MigrationReason firstReason = MigrationReason::none; @@ -459,7 +459,7 @@ MigrateInfosWithReason BalancerPolicy::balance( if (!stat.isDraining) continue; - if (usedShards->count(stat.shardId)) + if (!availableShards->count(stat.shardId)) continue; const vector<ChunkType>& chunks = distribution.getChunks(stat.shardId); @@ -481,7 +481,7 @@ MigrateInfosWithReason BalancerPolicy::balance( const string tag = distribution.getTagForChunk(chunk); const auto [to, _] = _getLeastLoadedReceiverShard( - shardStats, distribution, collDataSizeInfo, tag, *usedShards); + shardStats, distribution, collDataSizeInfo, tag, *availableShards); if (!to.isValid()) { if (migrations.empty()) { LOGV2_WARNING(21889, @@ -500,12 +500,13 @@ MigrateInfosWithReason BalancerPolicy::balance( if (firstReason == MigrationReason::none) { firstReason = MigrationReason::drain; } - invariant(usedShards->insert(stat.shardId).second); - invariant(usedShards->insert(to).second); + invariant(availableShards->erase(stat.shardId)); + invariant(availableShards->erase(to)); break; } if (migrations.empty()) { + availableShards->erase(stat.shardId); LOGV2_WARNING(21890, "Unable to find any chunk to move from draining shard " "{shardId}. numJumboChunks: {numJumboChunks}", @@ -513,13 +514,17 @@ MigrateInfosWithReason BalancerPolicy::balance( "shardId"_attr = stat.shardId, "numJumboChunks"_attr = numJumboChunks); } + + if (availableShards->size() < 2) { + return std::make_pair(std::move(migrations), firstReason); + } } } // 2) Check for chunks, which are on the wrong shard and must be moved off of it if (!distribution.tags().empty()) { for (const auto& stat : shardStats) { - if (usedShards->count(stat.shardId)) + if (!availableShards->count(stat.shardId)) continue; const vector<ChunkType>& chunks = distribution.getChunks(stat.shardId); @@ -545,7 +550,7 @@ MigrateInfosWithReason BalancerPolicy::balance( } const auto [to, _] = _getLeastLoadedReceiverShard( - shardStats, distribution, collDataSizeInfo, tag, *usedShards); + shardStats, distribution, collDataSizeInfo, tag, *availableShards); if (!to.isValid()) { if (migrations.empty()) { LOGV2_WARNING(21892, @@ -567,10 +572,14 @@ MigrateInfosWithReason BalancerPolicy::balance( if (firstReason == MigrationReason::none) { firstReason = MigrationReason::zoneViolation; } - invariant(usedShards->insert(stat.shardId).second); - invariant(usedShards->insert(to).second); + invariant(availableShards->erase(stat.shardId)); + invariant(availableShards->erase(to)); break; } + + if (availableShards->size() < 2) { + return std::make_pair(std::move(migrations), firstReason); + } } } @@ -614,7 +623,7 @@ MigrateInfosWithReason BalancerPolicy::balance( *collDataSizeInfo, tag, &migrations, - usedShards, + availableShards, forceJumbo ? MoveChunkRequest::ForceJumbo::kForceBalancer : MoveChunkRequest::ForceJumbo::kDoNotForce); } @@ -625,7 +634,7 @@ MigrateInfosWithReason BalancerPolicy::balance( tag, totalNumberOfShardsWithTag, &migrations, - usedShards, + availableShards, forceJumbo ? MoveChunkRequest::ForceJumbo::kForceBalancer : MoveChunkRequest::ForceJumbo::kDoNotForce); }; @@ -646,11 +655,16 @@ boost::optional<MigrateInfo> BalancerPolicy::balanceSingleChunk( const DistributionStatus& distribution) { const string tag = distribution.getTagForChunk(chunk); - const auto [newShardId, _] = _getLeastLoadedReceiverShard(shardStats, - distribution, - boost::none /* collDataSizeInfo */, - tag, - stdx::unordered_set<ShardId>()); + stdx::unordered_set<ShardId> availableShards; + std::transform(shardStats.begin(), + shardStats.end(), + std::inserter(availableShards, availableShards.end()), + [](const ClusterStatistics::ShardStatistics& shardStatistics) -> ShardId { + return shardStatistics.shardId; + }); + + const auto [newShardId, _] = _getLeastLoadedReceiverShard( + shardStats, distribution, boost::none /* collDataSizeInfo */, tag, availableShards); if (!newShardId.isValid() || newShardId == chunk.getShard()) { return boost::optional<MigrateInfo>(); } @@ -664,7 +678,7 @@ bool BalancerPolicy::_singleZoneBalanceBasedOnChunks(const ShardStatisticsVector const string& tag, size_t totalNumberOfShardsWithTag, vector<MigrateInfo>* migrations, - stdx::unordered_set<ShardId>* usedShards, + stdx::unordered_set<ShardId>* availableShards, MoveChunkRequest::ForceJumbo forceJumbo) { // Calculate the rounded optimal number of chunks per shard const size_t totalNumberOfChunksWithTag = @@ -673,7 +687,7 @@ bool BalancerPolicy::_singleZoneBalanceBasedOnChunks(const ShardStatisticsVector (size_t)std::roundf(totalNumberOfChunksWithTag / (float)totalNumberOfShardsWithTag); const auto [from, fromSize] = - _getMostOverloadedShard(shardStats, distribution, boost::none, tag, *usedShards); + _getMostOverloadedShard(shardStats, distribution, boost::none, tag, *availableShards); if (!from.isValid()) return false; @@ -684,7 +698,7 @@ bool BalancerPolicy::_singleZoneBalanceBasedOnChunks(const ShardStatisticsVector return false; const auto [to, toSize] = - _getLeastLoadedReceiverShard(shardStats, distribution, boost::none, tag, *usedShards); + _getLeastLoadedReceiverShard(shardStats, distribution, boost::none, tag, *availableShards); if (!to.isValid()) { if (migrations->empty()) { LOGV2(21882, "No available shards to take chunks for zone", "zone"_attr = tag); @@ -734,8 +748,8 @@ bool BalancerPolicy::_singleZoneBalanceBasedOnChunks(const ShardStatisticsVector } migrations->emplace_back(to, distribution.nss(), chunk, forceJumbo); - invariant(usedShards->insert(chunk.getShard()).second); - invariant(usedShards->insert(to).second); + invariant(availableShards->erase(chunk.getShard())); + invariant(availableShards->erase(to)); return true; } @@ -760,15 +774,15 @@ bool BalancerPolicy::_singleZoneBalanceBasedOnDataSize( const CollectionDataSizeInfoForBalancing& collDataSizeInfo, const string& tag, vector<MigrateInfo>* migrations, - stdx::unordered_set<ShardId>* usedShards, + stdx::unordered_set<ShardId>* availableShards, MoveChunkRequest::ForceJumbo forceJumbo) { const auto [from, fromSize] = - _getMostOverloadedShard(shardStats, distribution, collDataSizeInfo, tag, *usedShards); + _getMostOverloadedShard(shardStats, distribution, collDataSizeInfo, tag, *availableShards); if (!from.isValid()) return false; - const auto [to, toSize] = - _getLeastLoadedReceiverShard(shardStats, distribution, collDataSizeInfo, tag, *usedShards); + const auto [to, toSize] = _getLeastLoadedReceiverShard( + shardStats, distribution, collDataSizeInfo, tag, *availableShards); if (!to.isValid()) { if (migrations->empty()) { LOGV2(6581600, "No available shards to take chunks for zone", "zone"_attr = tag); @@ -818,8 +832,8 @@ bool BalancerPolicy::_singleZoneBalanceBasedOnDataSize( chunk.getVersion(), forceJumbo, collDataSizeInfo.maxChunkSizeBytes); - invariant(usedShards->insert(chunk.getShard()).second); - invariant(usedShards->insert(to).second); + invariant(availableShards->erase(chunk.getShard())); + invariant(availableShards->erase(to)); return true; } diff --git a/src/mongo/db/s/balancer/balancer_policy.h b/src/mongo/db/s/balancer/balancer_policy.h index 0a047098615..af9377999a6 100644 --- a/src/mongo/db/s/balancer/balancer_policy.h +++ b/src/mongo/db/s/balancer/balancer_policy.h @@ -393,15 +393,15 @@ public: * any of the shards have chunks, which are sufficiently higher than this number, suggests * moving chunks to shards, which are under this number. * - * The usedShards parameter is in/out and it contains the set of shards, which have already been - * used for migrations. Used so we don't return multiple conflicting migrations for the same - * shard. + * The availableShards parameter is in/out and it contains the set of shards, which haven't + * been used for migrations yet. Used so we don't return multiple conflicting migrations for the + * same shard. */ static MigrateInfosWithReason balance( const ShardStatisticsVector& shardStats, const DistributionStatus& distribution, const boost::optional<CollectionDataSizeInfoForBalancing>& collDataSizeInfo, - stdx::unordered_set<ShardId>* usedShards, + stdx::unordered_set<ShardId>* availableShards, bool forceJumbo); /** @@ -426,7 +426,7 @@ private: const DistributionStatus& distribution, const boost::optional<CollectionDataSizeInfoForBalancing>& collDataSizeInfo, const std::string& tag, - const stdx::unordered_set<ShardId>& excludedShards); + const stdx::unordered_set<ShardId>& availableShards); /** * Only considers shards with the specified tag, all shards in case the tag is empty. @@ -442,12 +442,12 @@ private: const DistributionStatus& distribution, const boost::optional<CollectionDataSizeInfoForBalancing>& collDataSizeInfo, const std::string& chunkTag, - const stdx::unordered_set<ShardId>& excludedShards); + const stdx::unordered_set<ShardId>& availableShards); /** * Selects one chunk for the specified zone (if appropriate) to be moved in order to bring the * deviation of the shards chunk contents closer to even across all shards in the specified - * zone. Takes into account and updates the shards, which have already been used for migrations. + * zone. Takes into account and updates the shards, which haven't been used for migrations yet. * * Returns true if a migration was suggested, false otherwise. This method is intented to be * called multiple times until all posible migrations for a zone have been selected. @@ -457,13 +457,13 @@ private: const std::string& tag, size_t totalNumberOfShardsWithTag, std::vector<MigrateInfo>* migrations, - stdx::unordered_set<ShardId>* usedShards, + stdx::unordered_set<ShardId>* availableShards, MoveChunkRequest::ForceJumbo forceJumbo); /** * Selects one range for the specified zone (if appropriate) to be moved in order to bring the * deviation of the collection data size closer to even across all shards in the specified - * zone. Takes into account and updates the shards, which have already been used for migrations. + * zone. Takes into account and updates the shards, which haven't been used for migrations yet. * * Returns true if a migration was suggested, false otherwise. This method is intented to be * called multiple times until all posible migrations for a zone have been selected. @@ -474,7 +474,7 @@ private: const CollectionDataSizeInfoForBalancing& collDataSizeInfo, const std::string& tag, std::vector<MigrateInfo>* migrations, - stdx::unordered_set<ShardId>* usedShards, + stdx::unordered_set<ShardId>* availableShards, MoveChunkRequest::ForceJumbo forceJumbo); }; diff --git a/src/mongo/db/s/balancer/balancer_policy_test.cpp b/src/mongo/db/s/balancer/balancer_policy_test.cpp index c46a874ade4..11b6de63464 100644 --- a/src/mongo/db/s/balancer/balancer_policy_test.cpp +++ b/src/mongo/db/s/balancer/balancer_policy_test.cpp @@ -111,13 +111,22 @@ std::pair<ShardStatisticsVector, ShardToChunksMap> generateCluster( return std::make_pair(std::move(shardStats), std::move(chunkMap)); } +stdx::unordered_set<ShardId> getAllShardIds(const ShardStatisticsVector& shardStats) { + stdx::unordered_set<ShardId> shards; + std::transform(shardStats.begin(), + shardStats.end(), + std::inserter(shards, shards.end()), + [](const ShardStatistics& shardStatistics) { return shardStatistics.shardId; }); + return shards; +} + MigrateInfosWithReason balanceChunks(const ShardStatisticsVector& shardStats, const DistributionStatus& distribution, bool shouldAggressivelyBalance, bool forceJumbo) { - stdx::unordered_set<ShardId> usedShards; + auto availableShards = getAllShardIds(shardStats); return BalancerPolicy::balance( - shardStats, distribution, boost::none /* collDataSizeInfo */, &usedShards, forceJumbo); + shardStats, distribution, boost::none /* collDataSizeInfo */, &availableShards, forceJumbo); } TEST(BalancerPolicy, Basic) { @@ -265,12 +274,13 @@ TEST(BalancerPolicy, ParallelBalancingNotSchedulingOnInUseSourceShardsWithMoveNe {ShardStatistics(kShardId3, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}}); // Here kShardId0 would have been selected as a donor - stdx::unordered_set<ShardId> usedShards{kShardId0}; + auto availableShards = getAllShardIds(cluster.first); + availableShards.erase(kShardId0); const auto [migrations, reason] = BalancerPolicy::balance(cluster.first, DistributionStatus(kNamespace, cluster.second), boost::none /* collDataSizeInfo */, - &usedShards, + &availableShards, false); ASSERT_EQ(1U, migrations.size()); @@ -289,12 +299,13 @@ TEST(BalancerPolicy, ParallelBalancingNotSchedulingOnInUseSourceShardsWithMoveNo {ShardStatistics(kShardId3, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}}); // Here kShardId0 would have been selected as a donor - stdx::unordered_set<ShardId> usedShards{kShardId0}; + auto availableShards = getAllShardIds(cluster.first); + availableShards.erase(kShardId0); const auto [migrations, reason] = BalancerPolicy::balance(cluster.first, DistributionStatus(kNamespace, cluster.second), boost::none /* collDataSizeInfo */, - &usedShards, + &availableShards, false); ASSERT_EQ(0U, migrations.size()); } @@ -307,12 +318,13 @@ TEST(BalancerPolicy, ParallelBalancingNotSchedulingOnInUseDestinationShards) { {ShardStatistics(kShardId3, kNoMaxSize, 1, false, emptyTagSet, emptyShardVersion), 1}}); // Here kShardId2 would have been selected as a recipient - stdx::unordered_set<ShardId> usedShards{kShardId2}; + auto availableShards = getAllShardIds(cluster.first); + availableShards.erase(kShardId2); const auto [migrations, reason] = BalancerPolicy::balance(cluster.first, DistributionStatus(kNamespace, cluster.second), boost::none /* collDataSizeInfo */, - &usedShards, + &availableShards, false); ASSERT_EQ(1U, migrations.size()); @@ -667,9 +679,10 @@ TEST(BalancerPolicy, BalancerMostOverLoadShardHasMultipleTagsSkipTagWithShardInU ASSERT_OK(distribution.addRangeToZone(ZoneRange(BSON("x" << 1), BSON("x" << 3), "b"))); ASSERT_OK(distribution.addRangeToZone(ZoneRange(BSON("x" << 3), BSON("x" << 5), "c"))); - stdx::unordered_set<ShardId> usedShards{kShardId1}; + auto availableShards = getAllShardIds(cluster.first); + availableShards.erase(kShardId1); const auto [migrations, reason] = BalancerPolicy::balance( - cluster.first, distribution, boost::none /* collDataSizeInfo */, &usedShards, false); + cluster.first, distribution, boost::none /* collDataSizeInfo */, &availableShards, false); ASSERT_EQ(1U, migrations.size()); ASSERT_EQ(kShardId0, migrations[0].from); |