Diffstat (limited to 'src/mongo/db/s/balancer/balancer.cpp')
-rw-r--r--	src/mongo/db/s/balancer/balancer.cpp	127
1 file changed, 64 insertions(+), 63 deletions(-)
diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp
index fd798ecd665..60a765cf5d6 100644
--- a/src/mongo/db/s/balancer/balancer.cpp
+++ b/src/mongo/db/s/balancer/balancer.cpp
@@ -186,12 +186,12 @@ Balancer* Balancer::get(OperationContext* operationContext) {
return get(operationContext->getServiceContext());
}
-void Balancer::initiateBalancer(OperationContext* txn) {
+void Balancer::initiateBalancer(OperationContext* opCtx) {
stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
invariant(_state == kStopped);
_state = kRunning;
- _migrationManager.startRecoveryAndAcquireDistLocks(txn);
+ _migrationManager.startRecoveryAndAcquireDistLocks(opCtx);
invariant(!_thread.joinable());
invariant(!_threadOperationContext);
@@ -240,15 +240,15 @@ void Balancer::waitForBalancerToStop() {
LOG(1) << "Balancer thread terminated";
}
-void Balancer::joinCurrentRound(OperationContext* txn) {
+void Balancer::joinCurrentRound(OperationContext* opCtx) {
stdx::unique_lock<stdx::mutex> scopedLock(_mutex);
const auto numRoundsAtStart = _numBalancerRounds;
_condVar.wait(scopedLock,
[&] { return !_inBalancerRound || _numBalancerRounds != numRoundsAtStart; });
}
-Status Balancer::rebalanceSingleChunk(OperationContext* txn, const ChunkType& chunk) {
- auto migrateStatus = _chunkSelectionPolicy->selectSpecificChunkToMove(txn, chunk);
+Status Balancer::rebalanceSingleChunk(OperationContext* opCtx, const ChunkType& chunk) {
+ auto migrateStatus = _chunkSelectionPolicy->selectSpecificChunkToMove(opCtx, chunk);
if (!migrateStatus.isOK()) {
return migrateStatus.getStatus();
}
@@ -259,37 +259,37 @@ Status Balancer::rebalanceSingleChunk(OperationContext* txn, const ChunkType& ch
return Status::OK();
}
- auto balancerConfig = Grid::get(txn)->getBalancerConfiguration();
- Status refreshStatus = balancerConfig->refreshAndCheck(txn);
+ auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration();
+ Status refreshStatus = balancerConfig->refreshAndCheck(opCtx);
if (!refreshStatus.isOK()) {
return refreshStatus;
}
- return _migrationManager.executeManualMigration(txn,
+ return _migrationManager.executeManualMigration(opCtx,
*migrateInfo,
balancerConfig->getMaxChunkSizeBytes(),
balancerConfig->getSecondaryThrottle(),
balancerConfig->waitForDelete());
}
-Status Balancer::moveSingleChunk(OperationContext* txn,
+Status Balancer::moveSingleChunk(OperationContext* opCtx,
const ChunkType& chunk,
const ShardId& newShardId,
uint64_t maxChunkSizeBytes,
const MigrationSecondaryThrottleOptions& secondaryThrottle,
bool waitForDelete) {
- auto moveAllowedStatus = _chunkSelectionPolicy->checkMoveAllowed(txn, chunk, newShardId);
+ auto moveAllowedStatus = _chunkSelectionPolicy->checkMoveAllowed(opCtx, chunk, newShardId);
if (!moveAllowedStatus.isOK()) {
return moveAllowedStatus;
}
return _migrationManager.executeManualMigration(
- txn, MigrateInfo(newShardId, chunk), maxChunkSizeBytes, secondaryThrottle, waitForDelete);
+ opCtx, MigrateInfo(newShardId, chunk), maxChunkSizeBytes, secondaryThrottle, waitForDelete);
}
-void Balancer::report(OperationContext* txn, BSONObjBuilder* builder) {
- auto balancerConfig = Grid::get(txn)->getBalancerConfiguration();
- balancerConfig->refreshAndCheck(txn);
+void Balancer::report(OperationContext* opCtx, BSONObjBuilder* builder) {
+ auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration();
+ balancerConfig->refreshAndCheck(opCtx);
const auto mode = balancerConfig->getBalancerMode();
@@ -301,27 +301,27 @@ void Balancer::report(OperationContext* txn, BSONObjBuilder* builder) {
void Balancer::_mainThread() {
Client::initThread("Balancer");
- auto txn = cc().makeOperationContext();
- auto shardingContext = Grid::get(txn.get());
+ auto opCtx = cc().makeOperationContext();
+ auto shardingContext = Grid::get(opCtx.get());
log() << "CSRS balancer is starting";
{
stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
- _threadOperationContext = txn.get();
+ _threadOperationContext = opCtx.get();
}
const Seconds kInitBackoffInterval(10);
auto balancerConfig = shardingContext->getBalancerConfiguration();
while (!_stopRequested()) {
- Status refreshStatus = balancerConfig->refreshAndCheck(txn.get());
+ Status refreshStatus = balancerConfig->refreshAndCheck(opCtx.get());
if (!refreshStatus.isOK()) {
warning() << "Balancer settings could not be loaded and will be retried in "
<< durationCount<Seconds>(kInitBackoffInterval) << " seconds"
<< causedBy(refreshStatus);
- _sleepFor(txn.get(), kInitBackoffInterval);
+ _sleepFor(opCtx.get(), kInitBackoffInterval);
continue;
}
@@ -330,8 +330,9 @@ void Balancer::_mainThread() {
log() << "CSRS balancer thread is recovering";
- _migrationManager.finishRecovery(
- txn.get(), balancerConfig->getMaxChunkSizeBytes(), balancerConfig->getSecondaryThrottle());
+ _migrationManager.finishRecovery(opCtx.get(),
+ balancerConfig->getMaxChunkSizeBytes(),
+ balancerConfig->getSecondaryThrottle());
log() << "CSRS balancer thread is recovered";
@@ -339,23 +340,23 @@ void Balancer::_mainThread() {
while (!_stopRequested()) {
BalanceRoundDetails roundDetails;
- _beginRound(txn.get());
+ _beginRound(opCtx.get());
try {
- shardingContext->shardRegistry()->reload(txn.get());
+ shardingContext->shardRegistry()->reload(opCtx.get());
- uassert(13258, "oids broken after resetting!", _checkOIDs(txn.get()));
+ uassert(13258, "oids broken after resetting!", _checkOIDs(opCtx.get()));
- Status refreshStatus = balancerConfig->refreshAndCheck(txn.get());
+ Status refreshStatus = balancerConfig->refreshAndCheck(opCtx.get());
if (!refreshStatus.isOK()) {
warning() << "Skipping balancing round" << causedBy(refreshStatus);
- _endRound(txn.get(), kBalanceRoundDefaultInterval);
+ _endRound(opCtx.get(), kBalanceRoundDefaultInterval);
continue;
}
if (!balancerConfig->shouldBalance()) {
LOG(1) << "Skipping balancing round because balancing is disabled";
- _endRound(txn.get(), kBalanceRoundDefaultInterval);
+ _endRound(opCtx.get(), kBalanceRoundDefaultInterval);
continue;
}
@@ -366,9 +367,9 @@ void Balancer::_mainThread() {
<< balancerConfig->getSecondaryThrottle().toBSON();
OCCASIONALLY warnOnMultiVersion(
- uassertStatusOK(_clusterStats->getStats(txn.get())));
+ uassertStatusOK(_clusterStats->getStats(opCtx.get())));
- Status status = _enforceTagRanges(txn.get());
+ Status status = _enforceTagRanges(opCtx.get());
if (!status.isOK()) {
warning() << "Failed to enforce tag ranges" << causedBy(status);
} else {
@@ -376,25 +377,25 @@ void Balancer::_mainThread() {
}
const auto candidateChunks = uassertStatusOK(
- _chunkSelectionPolicy->selectChunksToMove(txn.get(), _balancedLastTime));
+ _chunkSelectionPolicy->selectChunksToMove(opCtx.get(), _balancedLastTime));
if (candidateChunks.empty()) {
LOG(1) << "no need to move any chunk";
_balancedLastTime = false;
} else {
- _balancedLastTime = _moveChunks(txn.get(), candidateChunks);
+ _balancedLastTime = _moveChunks(opCtx.get(), candidateChunks);
roundDetails.setSucceeded(static_cast<int>(candidateChunks.size()),
_balancedLastTime);
- shardingContext->catalogClient(txn.get())->logAction(
- txn.get(), "balancer.round", "", roundDetails.toBSON());
+ shardingContext->catalogClient(opCtx.get())
+ ->logAction(opCtx.get(), "balancer.round", "", roundDetails.toBSON());
}
LOG(1) << "*** End of balancing round";
}
- _endRound(txn.get(),
+ _endRound(opCtx.get(),
_balancedLastTime ? kShortBalanceRoundInterval
: kBalanceRoundDefaultInterval);
} catch (const std::exception& e) {
@@ -406,11 +407,11 @@ void Balancer::_mainThread() {
// This round failed, tell the world!
roundDetails.setFailed(e.what());
- shardingContext->catalogClient(txn.get())->logAction(
- txn.get(), "balancer.round", "", roundDetails.toBSON());
+ shardingContext->catalogClient(opCtx.get())
+ ->logAction(opCtx.get(), "balancer.round", "", roundDetails.toBSON());
// Sleep a fair amount before retrying because of the error
- _endRound(txn.get(), kBalanceRoundDefaultInterval);
+ _endRound(opCtx.get(), kBalanceRoundDefaultInterval);
}
}
@@ -437,13 +438,13 @@ bool Balancer::_stopRequested() {
return (_state != kRunning);
}
-void Balancer::_beginRound(OperationContext* txn) {
+void Balancer::_beginRound(OperationContext* opCtx) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
_inBalancerRound = true;
_condVar.notify_all();
}
-void Balancer::_endRound(OperationContext* txn, Seconds waitTimeout) {
+void Balancer::_endRound(OperationContext* opCtx, Seconds waitTimeout) {
{
stdx::lock_guard<stdx::mutex> lock(_mutex);
_inBalancerRound = false;
@@ -451,16 +452,16 @@ void Balancer::_endRound(OperationContext* txn, Seconds waitTimeout) {
_condVar.notify_all();
}
- _sleepFor(txn, waitTimeout);
+ _sleepFor(opCtx, waitTimeout);
}
-void Balancer::_sleepFor(OperationContext* txn, Seconds waitTimeout) {
+void Balancer::_sleepFor(OperationContext* opCtx, Seconds waitTimeout) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
_condVar.wait_for(lock, waitTimeout.toSystemDuration(), [&] { return _state != kRunning; });
}
-bool Balancer::_checkOIDs(OperationContext* txn) {
- auto shardingContext = Grid::get(txn);
+bool Balancer::_checkOIDs(OperationContext* opCtx) {
+ auto shardingContext = Grid::get(opCtx);
vector<ShardId> all;
shardingContext->shardRegistry()->getAllShardIds(&all);
@@ -473,14 +474,14 @@ bool Balancer::_checkOIDs(OperationContext* txn) {
return false;
}
- auto shardStatus = shardingContext->shardRegistry()->getShard(txn, shardId);
+ auto shardStatus = shardingContext->shardRegistry()->getShard(opCtx, shardId);
if (!shardStatus.isOK()) {
continue;
}
const auto s = shardStatus.getValue();
auto result = uassertStatusOK(
- s->runCommandWithFixedRetryAttempts(txn,
+ s->runCommandWithFixedRetryAttempts(opCtx,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
"admin",
BSON("features" << 1),
@@ -497,18 +498,18 @@ bool Balancer::_checkOIDs(OperationContext* txn) {
<< " and " << oids[x];
result = uassertStatusOK(s->runCommandWithFixedRetryAttempts(
- txn,
+ opCtx,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
"admin",
BSON("features" << 1 << "oidReset" << 1),
Shard::RetryPolicy::kIdempotent));
uassertStatusOK(result.commandStatus);
- auto otherShardStatus = shardingContext->shardRegistry()->getShard(txn, oids[x]);
+ auto otherShardStatus = shardingContext->shardRegistry()->getShard(opCtx, oids[x]);
if (otherShardStatus.isOK()) {
result = uassertStatusOK(
otherShardStatus.getValue()->runCommandWithFixedRetryAttempts(
- txn,
+ opCtx,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
"admin",
BSON("features" << 1 << "oidReset" << 1),
@@ -526,14 +527,14 @@ bool Balancer::_checkOIDs(OperationContext* txn) {
return true;
}
-Status Balancer::_enforceTagRanges(OperationContext* txn) {
- auto chunksToSplitStatus = _chunkSelectionPolicy->selectChunksToSplit(txn);
+Status Balancer::_enforceTagRanges(OperationContext* opCtx) {
+ auto chunksToSplitStatus = _chunkSelectionPolicy->selectChunksToSplit(opCtx);
if (!chunksToSplitStatus.isOK()) {
return chunksToSplitStatus.getStatus();
}
for (const auto& splitInfo : chunksToSplitStatus.getValue()) {
- auto scopedCMStatus = ScopedChunkManager::refreshAndGet(txn, splitInfo.nss);
+ auto scopedCMStatus = ScopedChunkManager::refreshAndGet(opCtx, splitInfo.nss);
if (!scopedCMStatus.isOK()) {
return scopedCMStatus.getStatus();
}
@@ -541,7 +542,7 @@ Status Balancer::_enforceTagRanges(OperationContext* txn) {
const auto& scopedCM = scopedCMStatus.getValue();
auto splitStatus =
- shardutil::splitChunkAtMultiplePoints(txn,
+ shardutil::splitChunkAtMultiplePoints(opCtx,
splitInfo.shardId,
splitInfo.nss,
scopedCM.cm()->getShardKeyPattern(),
@@ -557,9 +558,9 @@ Status Balancer::_enforceTagRanges(OperationContext* txn) {
return Status::OK();
}
-int Balancer::_moveChunks(OperationContext* txn,
+int Balancer::_moveChunks(OperationContext* opCtx,
const BalancerChunkSelectionPolicy::MigrateInfoVector& candidateChunks) {
- auto balancerConfig = Grid::get(txn)->getBalancerConfiguration();
+ auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration();
// If the balancer was disabled since we started this round, don't start new chunk moves
if (_stopRequested() || !balancerConfig->shouldBalance()) {
@@ -568,7 +569,7 @@ int Balancer::_moveChunks(OperationContext* txn,
}
auto migrationStatuses =
- _migrationManager.executeMigrationsForAutoBalance(txn,
+ _migrationManager.executeMigrationsForAutoBalance(opCtx,
candidateChunks,
balancerConfig->getMaxChunkSizeBytes(),
balancerConfig->getSecondaryThrottle(),
@@ -598,7 +599,7 @@ int Balancer::_moveChunks(OperationContext* txn,
log() << "Performing a split because migration " << redact(requestIt->toString())
<< " failed for size reasons" << causedBy(redact(status));
- _splitOrMarkJumbo(txn, NamespaceString(requestIt->ns), requestIt->minKey);
+ _splitOrMarkJumbo(opCtx, NamespaceString(requestIt->ns), requestIt->minKey);
continue;
}
@@ -609,28 +610,28 @@ int Balancer::_moveChunks(OperationContext* txn,
return numChunksProcessed;
}
-void Balancer::_splitOrMarkJumbo(OperationContext* txn,
+void Balancer::_splitOrMarkJumbo(OperationContext* opCtx,
const NamespaceString& nss,
const BSONObj& minKey) {
- auto scopedCM = uassertStatusOK(ScopedChunkManager::refreshAndGet(txn, nss));
+ auto scopedCM = uassertStatusOK(ScopedChunkManager::refreshAndGet(opCtx, nss));
const auto cm = scopedCM.cm().get();
auto chunk = cm->findIntersectingChunkWithSimpleCollation(minKey);
try {
const auto splitPoints = uassertStatusOK(shardutil::selectChunkSplitPoints(
- txn,
+ opCtx,
chunk->getShardId(),
nss,
cm->getShardKeyPattern(),
ChunkRange(chunk->getMin(), chunk->getMax()),
- Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes(),
+ Grid::get(opCtx)->getBalancerConfiguration()->getMaxChunkSizeBytes(),
boost::none));
uassert(ErrorCodes::CannotSplit, "No split points found", !splitPoints.empty());
uassertStatusOK(
- shardutil::splitChunkAtMultiplePoints(txn,
+ shardutil::splitChunkAtMultiplePoints(opCtx,
chunk->getShardId(),
nss,
cm->getShardKeyPattern(),
@@ -644,8 +645,8 @@ void Balancer::_splitOrMarkJumbo(OperationContext* txn,
const std::string chunkName = ChunkType::genID(nss.ns(), chunk->getMin());
- auto status = Grid::get(txn)->catalogClient(txn)->updateConfigDocument(
- txn,
+ auto status = Grid::get(opCtx)->catalogClient(opCtx)->updateConfigDocument(
+ opCtx,
ChunkType::ConfigNS,
BSON(ChunkType::name(chunkName)),
BSON("$set" << BSON(ChunkType::jumbo(true))),