diff options
6 files changed, 37 insertions, 102 deletions
diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp index b4466c22f2c..5421e431176 100644 --- a/src/mongo/db/s/balancer/balancer.cpp +++ b/src/mongo/db/s/balancer/balancer.cpp @@ -237,6 +237,7 @@ Balancer::Balancer() _clusterStats.get(), _random, [this]() { // On any internal update of the defragmentation policy status, wake up the thread // consuming the stream of actions + _newInfoOnStreamingActions.store(true); _defragmentationCondVar.notify_all(); })) {} @@ -311,10 +312,6 @@ void Balancer::joinCurrentRound(OperationContext* opCtx) { }); } -Balancer::ScopedPauseBalancerRequest Balancer::requestPause() { - return ScopedPauseBalancerRequest(this); -} - Status Balancer::rebalanceSingleChunk(OperationContext* opCtx, const NamespaceString& nss, const ChunkType& chunk) { @@ -430,17 +427,15 @@ void Balancer::report(OperationContext* opCtx, BSONObjBuilder* builder) { void Balancer::_consumeActionStreamLoop() { Client::initThread("BalancerSecondary"); auto opCtx = cc().makeOperationContext(); + // This thread never refreshes balancerConfig - instead, it relies on the requests + // performed by _mainThread() on each round to eventually see updated information. + auto balancerConfig = Grid::get(opCtx.get())->getBalancerConfiguration(); executor::ScopedTaskExecutor executor( Grid::get(opCtx.get())->getExecutorPool()->getFixedExecutor()); - boost::optional<DefragmentationAction> nextAction(boost::none); - auto newActionAvailable = [&] { - nextAction = _defragmentationPolicy->getNextStreamingAction(opCtx.get()); - return nextAction.is_initialized(); - }; - auto applyActionResponse = [this](const DefragmentationAction& action, const DefragmentationActionResponse& response) { + invariant(_outstandingStreamingOps.addAndFetch(-1) >= 0); ThreadClient tc("BalancerDefragmentationPolicy::applyActionResponse", getGlobalServiceContext()); auto opCtx = tc->makeOperationContext(); @@ -461,16 +456,27 @@ void Balancer::_consumeActionStreamLoop() { } lastActionTime = Date_t::now(); }; - + bool streamDrained = false; while (true) { { stdx::unique_lock<Latch> ul(_mutex); - _defragmentationCondVar.wait( - ul, [&] { return _state != kRunning || newActionAvailable(); }); + _defragmentationCondVar.wait(ul, [&] { + auto canConsumeStream = balancerConfig->shouldBalanceForAutoSplit() && + _outstandingStreamingOps.load() <= kMaxOutstandingStreamingOperations; + return _state != kRunning || + (canConsumeStream && (!streamDrained || _newInfoOnStreamingActions.load())); + }); if (_state != kRunning) { break; } } + _newInfoOnStreamingActions.store(false); + auto nextAction = _defragmentationPolicy->getNextStreamingAction(opCtx.get()); + if ((streamDrained = !nextAction.is_initialized())) { + continue; + } + + _outstandingStreamingOps.fetchAndAdd(1); stdx::visit( visit_helper::Overloaded{ [&](MergeInfo&& mergeAction) { @@ -619,12 +625,15 @@ void Balancer::_mainThread() { _endRound(opCtx.get(), kBalanceRoundDefaultInterval); continue; } - if (!balancerConfig->shouldBalance() || _stopOrPauseRequested()) { + if (!balancerConfig->shouldBalance() || _stopRequested()) { LOGV2_DEBUG(21859, 1, "Skipping balancing round because balancing is disabled"); _endRound(opCtx.get(), kBalanceRoundDefaultInterval); continue; } + // The current configuration is allowing the balancer to perform operations. + // Unblock the secondary thread if needed. + _defragmentationCondVar.notify_all(); { LOGV2_DEBUG(21860, 1, @@ -738,27 +747,11 @@ void Balancer::_mainThread() { LOGV2(21867, "CSRS balancer is now stopped"); } -void Balancer::_addPauseRequest() { - stdx::unique_lock<Latch> scopedLock(_mutex); - ++_numPauseRequests; -} - -void Balancer::_removePauseRequest() { - stdx::unique_lock<Latch> scopedLock(_mutex); - invariant(_numPauseRequests > 0); - --_numPauseRequests; -} - bool Balancer::_stopRequested() { stdx::lock_guard<Latch> scopedLock(_mutex); return (_state != kRunning); } -bool Balancer::_stopOrPauseRequested() { - stdx::lock_guard<Latch> scopedLock(_mutex); - return (_state != kRunning || _numPauseRequests > 0); -} - void Balancer::_beginRound(OperationContext* opCtx) { stdx::unique_lock<Latch> lock(_mutex); _inBalancerRound = true; @@ -903,7 +896,7 @@ int Balancer::_moveChunks(OperationContext* opCtx, auto catalogClient = Grid::get(opCtx)->catalogClient(); // If the balancer was disabled since we started this round, don't start new chunk moves - if (_stopOrPauseRequested() || !balancerConfig->shouldBalance()) { + if (_stopRequested() || !balancerConfig->shouldBalance()) { LOGV2_DEBUG(21870, 1, "Skipping balancing round because balancer was stopped"); return 0; } @@ -979,7 +972,7 @@ int Balancer::_moveChunks(OperationContext* opCtx, return numChunksProcessed; } -void Balancer::notifyPersistedBalancerSettingsChanged() { +void Balancer::notifyPersistedBalancerSettingsChanged(OperationContext* opCtx) { _condVar.notify_all(); } diff --git a/src/mongo/db/s/balancer/balancer.h b/src/mongo/db/s/balancer/balancer.h index b3975e8c27d..495919bcc2e 100644 --- a/src/mongo/db/s/balancer/balancer.h +++ b/src/mongo/db/s/balancer/balancer.h @@ -63,28 +63,6 @@ class Balancer : public ReplicaSetAwareServiceConfigSvr<Balancer> { public: /** - * Scoped class to manage the pause/resumeBalancer requests cycle. - * See Balancer::requestPause() for more details. - */ - class ScopedPauseBalancerRequest { - public: - ~ScopedPauseBalancerRequest() { - _balancer->_removePauseRequest(); - } - - private: - Balancer* _balancer; - - ScopedPauseBalancerRequest(Balancer* balancer) : _balancer(balancer) { - _balancer->_addPauseRequest(); - } - - ScopedPauseBalancerRequest(const ScopedPauseBalancerRequest&) = delete; - ScopedPauseBalancerRequest& operator=(const ScopedPauseBalancerRequest&) = delete; - - friend class Balancer; - }; - /** * Provide access to the Balancer decoration on ServiceContext. */ static Balancer* get(ServiceContext* serviceContext); @@ -135,14 +113,6 @@ public: */ void joinCurrentRound(OperationContext* opCtx); - - /** - * Invoked by any client requiring a temporary suspension of the balancer thread - * (I.E. the setFCV process). The request is NOT persisted by the balancer in its config - * document and remains active as long as the returned ScopedPauseRequest doesn't get destroyed. - */ - ScopedPauseBalancerRequest requestPause(); - /** * Blocking call, which requests the balancer to move a single chunk to a more appropriate * shard, in accordance with the active balancer policy. It is not guaranteed that the chunk @@ -190,7 +160,7 @@ public: /** * Informs the balancer that a setting that affects it changed. */ - void notifyPersistedBalancerSettingsChanged(); + void notifyPersistedBalancerSettingsChanged(OperationContext* opCtx); /** * Informs the balancer that the user has requested defragmentation to be stopped on a @@ -206,6 +176,8 @@ public: const NamespaceString& nss); private: + static constexpr int kMaxOutstandingStreamingOperations = 50; + /** * Possible runtime states of the balancer. The comments indicate the allowed next state. */ @@ -243,21 +215,6 @@ private: bool _stopRequested(); /** - * Adds a request to pause the balancer main loop. - */ - void _addPauseRequest(); - - /** - * Removes a previously added request to pause the balancer main loop. - */ - void _removePauseRequest(); - - /** - * Assess whether the balancer has any active pause or stop request. - */ - bool _stopOrPauseRequested(); - - /** * Signals the beginning and end of a balancing round. */ void _beginRound(OperationContext* opCtx); @@ -310,6 +267,10 @@ private: // thread. OperationContext* _threadOperationContext{nullptr}; + AtomicWord<int> _outstandingStreamingOps{0}; + + AtomicWord<bool> _newInfoOnStreamingActions{true}; + // Indicates whether the balancer is currently executing a balancer round bool _inBalancerRound{false}; @@ -325,9 +286,6 @@ private: // Number of moved chunks in last round int _balancedLastTime; - // Number of active pause balancer requests - int _numPauseRequests{0}; - // Source of randomness when metadata needs to be randomized. BalancerRandomSource _random; diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp index 2fcecb77fe3..b7b7e621f95 100644 --- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp +++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp @@ -1512,10 +1512,6 @@ MigrateInfoVector BalancerDefragmentationPolicyImpl::selectChunksToMove( boost::optional<DefragmentationAction> BalancerDefragmentationPolicyImpl::getNextStreamingAction( OperationContext* opCtx) { stdx::lock_guard<Latch> lk(_stateMutex); - if (_concurrentStreamingOps >= kMaxConcurrentOperations) { - return boost::none; - } - // Visit the defrag state in round robin fashion starting from a random one auto stateIt = [&] { auto it = _defragmentationStates.begin(); @@ -1535,7 +1531,6 @@ boost::optional<DefragmentationAction> BalancerDefragmentationPolicyImpl::getNex auto nextAction = currentCollectionDefragmentationState->popNextStreamableAction(opCtx); if (nextAction) { - ++_concurrentStreamingOps; return nextAction; } ++stateIt; @@ -1594,15 +1589,8 @@ void BalancerDefragmentationPolicyImpl::applyActionResult( }, action); - if (!targetState) { - return; - } - - targetState->applyActionResult(opCtx, action, response); - bool streamingActionReceived = !stdx::holds_alternative<MigrateInfo>(action); - if (streamingActionReceived) { - --_concurrentStreamingOps; - invariant(_concurrentStreamingOps >= 0); + if (targetState) { + targetState->applyActionResult(opCtx, action, response); } } _onStateUpdated(); diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h index 98226085a8d..d18d2cfacfa 100644 --- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h +++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h @@ -98,8 +98,6 @@ public: const NamespaceString& nss) override; private: - static constexpr int kMaxConcurrentOperations = 50; - /** * Advances the defragmentation state of the specified collection to the next actionable phase * (or sets the related DefragmentationPhase object to nullptr if nothing more can be done). @@ -139,8 +137,6 @@ private: Mutex _stateMutex = MONGO_MAKE_LATCH("BalancerChunkMergerImpl::_stateMutex"); - int _concurrentStreamingOps{0}; - ClusterStatistics* const _clusterStats; BalancerRandomSource& _random; diff --git a/src/mongo/db/s/config/configsvr_control_balancer_command.cpp b/src/mongo/db/s/config/configsvr_control_balancer_command.cpp index 84d97e2ee58..399a8924fe4 100644 --- a/src/mongo/db/s/config/configsvr_control_balancer_command.cpp +++ b/src/mongo/db/s/config/configsvr_control_balancer_command.cpp @@ -111,7 +111,7 @@ private: auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration(); uassertStatusOK(balancerConfig->setBalancerMode(opCtx, BalancerSettingsType::kFull)); uassertStatusOK(balancerConfig->enableAutoSplit(opCtx, true)); - Balancer::get(opCtx)->notifyPersistedBalancerSettingsChanged(); + Balancer::get(opCtx)->notifyPersistedBalancerSettingsChanged(opCtx); ShardingLogging::get(opCtx)->logAction(opCtx, "balancer.start", "", BSONObj()).ignore(); } }; @@ -131,7 +131,7 @@ private: uassertStatusOK(balancerConfig->setBalancerMode(opCtx, BalancerSettingsType::kOff)); uassertStatusOK(balancerConfig->enableAutoSplit(opCtx, false)); - Balancer::get(opCtx)->notifyPersistedBalancerSettingsChanged(); + Balancer::get(opCtx)->notifyPersistedBalancerSettingsChanged(opCtx); Balancer::get(opCtx)->joinCurrentRound(opCtx); ShardingLogging::get(opCtx)->logAction(opCtx, "balancer.stop", "", BSONObj()).ignore(); diff --git a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp index e3d7c8c6287..48dd1a66759 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp @@ -652,7 +652,7 @@ void ShardingCatalogManager::configureCollectionBalancing( nss, executor); - Balancer::get(opCtx)->notifyPersistedBalancerSettingsChanged(); + Balancer::get(opCtx)->notifyPersistedBalancerSettingsChanged(opCtx); } void ShardingCatalogManager::renameShardedMetadata( |