summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/s/balancer/balancer.cpp57
-rw-r--r--src/mongo/db/s/balancer/balancer.h56
-rw-r--r--src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp16
-rw-r--r--src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h4
-rw-r--r--src/mongo/db/s/config/configsvr_control_balancer_command.cpp4
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp2
6 files changed, 37 insertions, 102 deletions
diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp
index b4466c22f2c..5421e431176 100644
--- a/src/mongo/db/s/balancer/balancer.cpp
+++ b/src/mongo/db/s/balancer/balancer.cpp
@@ -237,6 +237,7 @@ Balancer::Balancer()
_clusterStats.get(), _random, [this]() {
// On any internal update of the defragmentation policy status, wake up the thread
// consuming the stream of actions
+ _newInfoOnStreamingActions.store(true);
_defragmentationCondVar.notify_all();
})) {}
@@ -311,10 +312,6 @@ void Balancer::joinCurrentRound(OperationContext* opCtx) {
});
}
-Balancer::ScopedPauseBalancerRequest Balancer::requestPause() {
- return ScopedPauseBalancerRequest(this);
-}
-
Status Balancer::rebalanceSingleChunk(OperationContext* opCtx,
const NamespaceString& nss,
const ChunkType& chunk) {
@@ -430,17 +427,15 @@ void Balancer::report(OperationContext* opCtx, BSONObjBuilder* builder) {
void Balancer::_consumeActionStreamLoop() {
Client::initThread("BalancerSecondary");
auto opCtx = cc().makeOperationContext();
+ // This thread never refreshes balancerConfig - instead, it relies on the requests
+ // performed by _mainThread() on each round to eventually see updated information.
+ auto balancerConfig = Grid::get(opCtx.get())->getBalancerConfiguration();
executor::ScopedTaskExecutor executor(
Grid::get(opCtx.get())->getExecutorPool()->getFixedExecutor());
- boost::optional<DefragmentationAction> nextAction(boost::none);
- auto newActionAvailable = [&] {
- nextAction = _defragmentationPolicy->getNextStreamingAction(opCtx.get());
- return nextAction.is_initialized();
- };
-
auto applyActionResponse = [this](const DefragmentationAction& action,
const DefragmentationActionResponse& response) {
+ invariant(_outstandingStreamingOps.addAndFetch(-1) >= 0);
ThreadClient tc("BalancerDefragmentationPolicy::applyActionResponse",
getGlobalServiceContext());
auto opCtx = tc->makeOperationContext();
@@ -461,16 +456,27 @@ void Balancer::_consumeActionStreamLoop() {
}
lastActionTime = Date_t::now();
};
-
+ bool streamDrained = false;
while (true) {
{
stdx::unique_lock<Latch> ul(_mutex);
- _defragmentationCondVar.wait(
- ul, [&] { return _state != kRunning || newActionAvailable(); });
+ _defragmentationCondVar.wait(ul, [&] {
+ auto canConsumeStream = balancerConfig->shouldBalanceForAutoSplit() &&
+ _outstandingStreamingOps.load() <= kMaxOutstandingStreamingOperations;
+ return _state != kRunning ||
+ (canConsumeStream && (!streamDrained || _newInfoOnStreamingActions.load()));
+ });
if (_state != kRunning) {
break;
}
}
+ _newInfoOnStreamingActions.store(false);
+ auto nextAction = _defragmentationPolicy->getNextStreamingAction(opCtx.get());
+ if ((streamDrained = !nextAction.is_initialized())) {
+ continue;
+ }
+
+ _outstandingStreamingOps.fetchAndAdd(1);
stdx::visit(
visit_helper::Overloaded{
[&](MergeInfo&& mergeAction) {
@@ -619,12 +625,15 @@ void Balancer::_mainThread() {
_endRound(opCtx.get(), kBalanceRoundDefaultInterval);
continue;
}
- if (!balancerConfig->shouldBalance() || _stopOrPauseRequested()) {
+ if (!balancerConfig->shouldBalance() || _stopRequested()) {
LOGV2_DEBUG(21859, 1, "Skipping balancing round because balancing is disabled");
_endRound(opCtx.get(), kBalanceRoundDefaultInterval);
continue;
}
+ // The current configuration is allowing the balancer to perform operations.
+ // Unblock the secondary thread if needed.
+ _defragmentationCondVar.notify_all();
{
LOGV2_DEBUG(21860,
1,
@@ -738,27 +747,11 @@ void Balancer::_mainThread() {
LOGV2(21867, "CSRS balancer is now stopped");
}
-void Balancer::_addPauseRequest() {
- stdx::unique_lock<Latch> scopedLock(_mutex);
- ++_numPauseRequests;
-}
-
-void Balancer::_removePauseRequest() {
- stdx::unique_lock<Latch> scopedLock(_mutex);
- invariant(_numPauseRequests > 0);
- --_numPauseRequests;
-}
-
bool Balancer::_stopRequested() {
stdx::lock_guard<Latch> scopedLock(_mutex);
return (_state != kRunning);
}
-bool Balancer::_stopOrPauseRequested() {
- stdx::lock_guard<Latch> scopedLock(_mutex);
- return (_state != kRunning || _numPauseRequests > 0);
-}
-
void Balancer::_beginRound(OperationContext* opCtx) {
stdx::unique_lock<Latch> lock(_mutex);
_inBalancerRound = true;
@@ -903,7 +896,7 @@ int Balancer::_moveChunks(OperationContext* opCtx,
auto catalogClient = Grid::get(opCtx)->catalogClient();
// If the balancer was disabled since we started this round, don't start new chunk moves
- if (_stopOrPauseRequested() || !balancerConfig->shouldBalance()) {
+ if (_stopRequested() || !balancerConfig->shouldBalance()) {
LOGV2_DEBUG(21870, 1, "Skipping balancing round because balancer was stopped");
return 0;
}
@@ -979,7 +972,7 @@ int Balancer::_moveChunks(OperationContext* opCtx,
return numChunksProcessed;
}
-void Balancer::notifyPersistedBalancerSettingsChanged() {
+void Balancer::notifyPersistedBalancerSettingsChanged(OperationContext* opCtx) {
_condVar.notify_all();
}
diff --git a/src/mongo/db/s/balancer/balancer.h b/src/mongo/db/s/balancer/balancer.h
index b3975e8c27d..495919bcc2e 100644
--- a/src/mongo/db/s/balancer/balancer.h
+++ b/src/mongo/db/s/balancer/balancer.h
@@ -63,28 +63,6 @@ class Balancer : public ReplicaSetAwareServiceConfigSvr<Balancer> {
public:
/**
- * Scoped class to manage the pause/resumeBalancer requests cycle.
- * See Balancer::requestPause() for more details.
- */
- class ScopedPauseBalancerRequest {
- public:
- ~ScopedPauseBalancerRequest() {
- _balancer->_removePauseRequest();
- }
-
- private:
- Balancer* _balancer;
-
- ScopedPauseBalancerRequest(Balancer* balancer) : _balancer(balancer) {
- _balancer->_addPauseRequest();
- }
-
- ScopedPauseBalancerRequest(const ScopedPauseBalancerRequest&) = delete;
- ScopedPauseBalancerRequest& operator=(const ScopedPauseBalancerRequest&) = delete;
-
- friend class Balancer;
- };
- /**
* Provide access to the Balancer decoration on ServiceContext.
*/
static Balancer* get(ServiceContext* serviceContext);
@@ -135,14 +113,6 @@ public:
*/
void joinCurrentRound(OperationContext* opCtx);
-
- /**
- * Invoked by any client requiring a temporary suspension of the balancer thread
- * (I.E. the setFCV process). The request is NOT persisted by the balancer in its config
- * document and remains active as long as the returned ScopedPauseRequest doesn't get destroyed.
- */
- ScopedPauseBalancerRequest requestPause();
-
/**
* Blocking call, which requests the balancer to move a single chunk to a more appropriate
* shard, in accordance with the active balancer policy. It is not guaranteed that the chunk
@@ -190,7 +160,7 @@ public:
/**
* Informs the balancer that a setting that affects it changed.
*/
- void notifyPersistedBalancerSettingsChanged();
+ void notifyPersistedBalancerSettingsChanged(OperationContext* opCtx);
/**
* Informs the balancer that the user has requested defragmentation to be stopped on a
@@ -206,6 +176,8 @@ public:
const NamespaceString& nss);
private:
+ static constexpr int kMaxOutstandingStreamingOperations = 50;
+
/**
* Possible runtime states of the balancer. The comments indicate the allowed next state.
*/
@@ -243,21 +215,6 @@ private:
bool _stopRequested();
/**
- * Adds a request to pause the balancer main loop.
- */
- void _addPauseRequest();
-
- /**
- * Removes a previously added request to pause the balancer main loop.
- */
- void _removePauseRequest();
-
- /**
- * Assess whether the balancer has any active pause or stop request.
- */
- bool _stopOrPauseRequested();
-
- /**
* Signals the beginning and end of a balancing round.
*/
void _beginRound(OperationContext* opCtx);
@@ -310,6 +267,10 @@ private:
// thread.
OperationContext* _threadOperationContext{nullptr};
+ AtomicWord<int> _outstandingStreamingOps{0};
+
+ AtomicWord<bool> _newInfoOnStreamingActions{true};
+
// Indicates whether the balancer is currently executing a balancer round
bool _inBalancerRound{false};
@@ -325,9 +286,6 @@ private:
// Number of moved chunks in last round
int _balancedLastTime;
- // Number of active pause balancer requests
- int _numPauseRequests{0};
-
// Source of randomness when metadata needs to be randomized.
BalancerRandomSource _random;
diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp
index 2fcecb77fe3..b7b7e621f95 100644
--- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp
+++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp
@@ -1512,10 +1512,6 @@ MigrateInfoVector BalancerDefragmentationPolicyImpl::selectChunksToMove(
boost::optional<DefragmentationAction> BalancerDefragmentationPolicyImpl::getNextStreamingAction(
OperationContext* opCtx) {
stdx::lock_guard<Latch> lk(_stateMutex);
- if (_concurrentStreamingOps >= kMaxConcurrentOperations) {
- return boost::none;
- }
-
// Visit the defrag state in round robin fashion starting from a random one
auto stateIt = [&] {
auto it = _defragmentationStates.begin();
@@ -1535,7 +1531,6 @@ boost::optional<DefragmentationAction> BalancerDefragmentationPolicyImpl::getNex
auto nextAction =
currentCollectionDefragmentationState->popNextStreamableAction(opCtx);
if (nextAction) {
- ++_concurrentStreamingOps;
return nextAction;
}
++stateIt;
@@ -1594,15 +1589,8 @@ void BalancerDefragmentationPolicyImpl::applyActionResult(
},
action);
- if (!targetState) {
- return;
- }
-
- targetState->applyActionResult(opCtx, action, response);
- bool streamingActionReceived = !stdx::holds_alternative<MigrateInfo>(action);
- if (streamingActionReceived) {
- --_concurrentStreamingOps;
- invariant(_concurrentStreamingOps >= 0);
+ if (targetState) {
+ targetState->applyActionResult(opCtx, action, response);
}
}
_onStateUpdated();
diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h
index 98226085a8d..d18d2cfacfa 100644
--- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h
+++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.h
@@ -98,8 +98,6 @@ public:
const NamespaceString& nss) override;
private:
- static constexpr int kMaxConcurrentOperations = 50;
-
/**
* Advances the defragmentation state of the specified collection to the next actionable phase
* (or sets the related DefragmentationPhase object to nullptr if nothing more can be done).
@@ -139,8 +137,6 @@ private:
Mutex _stateMutex = MONGO_MAKE_LATCH("BalancerChunkMergerImpl::_stateMutex");
- int _concurrentStreamingOps{0};
-
ClusterStatistics* const _clusterStats;
BalancerRandomSource& _random;
diff --git a/src/mongo/db/s/config/configsvr_control_balancer_command.cpp b/src/mongo/db/s/config/configsvr_control_balancer_command.cpp
index 84d97e2ee58..399a8924fe4 100644
--- a/src/mongo/db/s/config/configsvr_control_balancer_command.cpp
+++ b/src/mongo/db/s/config/configsvr_control_balancer_command.cpp
@@ -111,7 +111,7 @@ private:
auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration();
uassertStatusOK(balancerConfig->setBalancerMode(opCtx, BalancerSettingsType::kFull));
uassertStatusOK(balancerConfig->enableAutoSplit(opCtx, true));
- Balancer::get(opCtx)->notifyPersistedBalancerSettingsChanged();
+ Balancer::get(opCtx)->notifyPersistedBalancerSettingsChanged(opCtx);
ShardingLogging::get(opCtx)->logAction(opCtx, "balancer.start", "", BSONObj()).ignore();
}
};
@@ -131,7 +131,7 @@ private:
uassertStatusOK(balancerConfig->setBalancerMode(opCtx, BalancerSettingsType::kOff));
uassertStatusOK(balancerConfig->enableAutoSplit(opCtx, false));
- Balancer::get(opCtx)->notifyPersistedBalancerSettingsChanged();
+ Balancer::get(opCtx)->notifyPersistedBalancerSettingsChanged(opCtx);
Balancer::get(opCtx)->joinCurrentRound(opCtx);
ShardingLogging::get(opCtx)->logAction(opCtx, "balancer.stop", "", BSONObj()).ignore();
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
index e3d7c8c6287..48dd1a66759 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
@@ -652,7 +652,7 @@ void ShardingCatalogManager::configureCollectionBalancing(
nss,
executor);
- Balancer::get(opCtx)->notifyPersistedBalancerSettingsChanged();
+ Balancer::get(opCtx)->notifyPersistedBalancerSettingsChanged(opCtx);
}
void ShardingCatalogManager::renameShardedMetadata(