diff options
Diffstat (limited to 'src/mongo/db/s/balancer/balancer.cpp')
-rw-r--r-- | src/mongo/db/s/balancer/balancer.cpp | 127 |
1 files changed, 64 insertions, 63 deletions
diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp index fd798ecd665..60a765cf5d6 100644 --- a/src/mongo/db/s/balancer/balancer.cpp +++ b/src/mongo/db/s/balancer/balancer.cpp @@ -186,12 +186,12 @@ Balancer* Balancer::get(OperationContext* operationContext) { return get(operationContext->getServiceContext()); } -void Balancer::initiateBalancer(OperationContext* txn) { +void Balancer::initiateBalancer(OperationContext* opCtx) { stdx::lock_guard<stdx::mutex> scopedLock(_mutex); invariant(_state == kStopped); _state = kRunning; - _migrationManager.startRecoveryAndAcquireDistLocks(txn); + _migrationManager.startRecoveryAndAcquireDistLocks(opCtx); invariant(!_thread.joinable()); invariant(!_threadOperationContext); @@ -240,15 +240,15 @@ void Balancer::waitForBalancerToStop() { LOG(1) << "Balancer thread terminated"; } -void Balancer::joinCurrentRound(OperationContext* txn) { +void Balancer::joinCurrentRound(OperationContext* opCtx) { stdx::unique_lock<stdx::mutex> scopedLock(_mutex); const auto numRoundsAtStart = _numBalancerRounds; _condVar.wait(scopedLock, [&] { return !_inBalancerRound || _numBalancerRounds != numRoundsAtStart; }); } -Status Balancer::rebalanceSingleChunk(OperationContext* txn, const ChunkType& chunk) { - auto migrateStatus = _chunkSelectionPolicy->selectSpecificChunkToMove(txn, chunk); +Status Balancer::rebalanceSingleChunk(OperationContext* opCtx, const ChunkType& chunk) { + auto migrateStatus = _chunkSelectionPolicy->selectSpecificChunkToMove(opCtx, chunk); if (!migrateStatus.isOK()) { return migrateStatus.getStatus(); } @@ -259,37 +259,37 @@ Status Balancer::rebalanceSingleChunk(OperationContext* txn, const ChunkType& ch return Status::OK(); } - auto balancerConfig = Grid::get(txn)->getBalancerConfiguration(); - Status refreshStatus = balancerConfig->refreshAndCheck(txn); + auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration(); + Status refreshStatus = balancerConfig->refreshAndCheck(opCtx); if (!refreshStatus.isOK()) { return refreshStatus; } - return _migrationManager.executeManualMigration(txn, + return _migrationManager.executeManualMigration(opCtx, *migrateInfo, balancerConfig->getMaxChunkSizeBytes(), balancerConfig->getSecondaryThrottle(), balancerConfig->waitForDelete()); } -Status Balancer::moveSingleChunk(OperationContext* txn, +Status Balancer::moveSingleChunk(OperationContext* opCtx, const ChunkType& chunk, const ShardId& newShardId, uint64_t maxChunkSizeBytes, const MigrationSecondaryThrottleOptions& secondaryThrottle, bool waitForDelete) { - auto moveAllowedStatus = _chunkSelectionPolicy->checkMoveAllowed(txn, chunk, newShardId); + auto moveAllowedStatus = _chunkSelectionPolicy->checkMoveAllowed(opCtx, chunk, newShardId); if (!moveAllowedStatus.isOK()) { return moveAllowedStatus; } return _migrationManager.executeManualMigration( - txn, MigrateInfo(newShardId, chunk), maxChunkSizeBytes, secondaryThrottle, waitForDelete); + opCtx, MigrateInfo(newShardId, chunk), maxChunkSizeBytes, secondaryThrottle, waitForDelete); } -void Balancer::report(OperationContext* txn, BSONObjBuilder* builder) { - auto balancerConfig = Grid::get(txn)->getBalancerConfiguration(); - balancerConfig->refreshAndCheck(txn); +void Balancer::report(OperationContext* opCtx, BSONObjBuilder* builder) { + auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration(); + balancerConfig->refreshAndCheck(opCtx); const auto mode = balancerConfig->getBalancerMode(); @@ -301,27 +301,27 @@ void Balancer::report(OperationContext* txn, BSONObjBuilder* builder) { void Balancer::_mainThread() { Client::initThread("Balancer"); - auto txn = cc().makeOperationContext(); - auto shardingContext = Grid::get(txn.get()); + auto opCtx = cc().makeOperationContext(); + auto shardingContext = Grid::get(opCtx.get()); log() << "CSRS balancer is starting"; { stdx::lock_guard<stdx::mutex> scopedLock(_mutex); - _threadOperationContext = txn.get(); + _threadOperationContext = opCtx.get(); } const Seconds kInitBackoffInterval(10); auto balancerConfig = shardingContext->getBalancerConfiguration(); while (!_stopRequested()) { - Status refreshStatus = balancerConfig->refreshAndCheck(txn.get()); + Status refreshStatus = balancerConfig->refreshAndCheck(opCtx.get()); if (!refreshStatus.isOK()) { warning() << "Balancer settings could not be loaded and will be retried in " << durationCount<Seconds>(kInitBackoffInterval) << " seconds" << causedBy(refreshStatus); - _sleepFor(txn.get(), kInitBackoffInterval); + _sleepFor(opCtx.get(), kInitBackoffInterval); continue; } @@ -330,8 +330,9 @@ void Balancer::_mainThread() { log() << "CSRS balancer thread is recovering"; - _migrationManager.finishRecovery( - txn.get(), balancerConfig->getMaxChunkSizeBytes(), balancerConfig->getSecondaryThrottle()); + _migrationManager.finishRecovery(opCtx.get(), + balancerConfig->getMaxChunkSizeBytes(), + balancerConfig->getSecondaryThrottle()); log() << "CSRS balancer thread is recovered"; @@ -339,23 +340,23 @@ void Balancer::_mainThread() { while (!_stopRequested()) { BalanceRoundDetails roundDetails; - _beginRound(txn.get()); + _beginRound(opCtx.get()); try { - shardingContext->shardRegistry()->reload(txn.get()); + shardingContext->shardRegistry()->reload(opCtx.get()); - uassert(13258, "oids broken after resetting!", _checkOIDs(txn.get())); + uassert(13258, "oids broken after resetting!", _checkOIDs(opCtx.get())); - Status refreshStatus = balancerConfig->refreshAndCheck(txn.get()); + Status refreshStatus = balancerConfig->refreshAndCheck(opCtx.get()); if (!refreshStatus.isOK()) { warning() << "Skipping balancing round" << causedBy(refreshStatus); - _endRound(txn.get(), kBalanceRoundDefaultInterval); + _endRound(opCtx.get(), kBalanceRoundDefaultInterval); continue; } if (!balancerConfig->shouldBalance()) { LOG(1) << "Skipping balancing round because balancing is disabled"; - _endRound(txn.get(), kBalanceRoundDefaultInterval); + _endRound(opCtx.get(), kBalanceRoundDefaultInterval); continue; } @@ -366,9 +367,9 @@ void Balancer::_mainThread() { << balancerConfig->getSecondaryThrottle().toBSON(); OCCASIONALLY warnOnMultiVersion( - uassertStatusOK(_clusterStats->getStats(txn.get()))); + uassertStatusOK(_clusterStats->getStats(opCtx.get()))); - Status status = _enforceTagRanges(txn.get()); + Status status = _enforceTagRanges(opCtx.get()); if (!status.isOK()) { warning() << "Failed to enforce tag ranges" << causedBy(status); } else { @@ -376,25 +377,25 @@ void Balancer::_mainThread() { } const auto candidateChunks = uassertStatusOK( - _chunkSelectionPolicy->selectChunksToMove(txn.get(), _balancedLastTime)); + _chunkSelectionPolicy->selectChunksToMove(opCtx.get(), _balancedLastTime)); if (candidateChunks.empty()) { LOG(1) << "no need to move any chunk"; _balancedLastTime = false; } else { - _balancedLastTime = _moveChunks(txn.get(), candidateChunks); + _balancedLastTime = _moveChunks(opCtx.get(), candidateChunks); roundDetails.setSucceeded(static_cast<int>(candidateChunks.size()), _balancedLastTime); - shardingContext->catalogClient(txn.get())->logAction( - txn.get(), "balancer.round", "", roundDetails.toBSON()); + shardingContext->catalogClient(opCtx.get()) + ->logAction(opCtx.get(), "balancer.round", "", roundDetails.toBSON()); } LOG(1) << "*** End of balancing round"; } - _endRound(txn.get(), + _endRound(opCtx.get(), _balancedLastTime ? kShortBalanceRoundInterval : kBalanceRoundDefaultInterval); } catch (const std::exception& e) { @@ -406,11 +407,11 @@ void Balancer::_mainThread() { // This round failed, tell the world! roundDetails.setFailed(e.what()); - shardingContext->catalogClient(txn.get())->logAction( - txn.get(), "balancer.round", "", roundDetails.toBSON()); + shardingContext->catalogClient(opCtx.get()) + ->logAction(opCtx.get(), "balancer.round", "", roundDetails.toBSON()); // Sleep a fair amount before retrying because of the error - _endRound(txn.get(), kBalanceRoundDefaultInterval); + _endRound(opCtx.get(), kBalanceRoundDefaultInterval); } } @@ -437,13 +438,13 @@ bool Balancer::_stopRequested() { return (_state != kRunning); } -void Balancer::_beginRound(OperationContext* txn) { +void Balancer::_beginRound(OperationContext* opCtx) { stdx::unique_lock<stdx::mutex> lock(_mutex); _inBalancerRound = true; _condVar.notify_all(); } -void Balancer::_endRound(OperationContext* txn, Seconds waitTimeout) { +void Balancer::_endRound(OperationContext* opCtx, Seconds waitTimeout) { { stdx::lock_guard<stdx::mutex> lock(_mutex); _inBalancerRound = false; @@ -451,16 +452,16 @@ void Balancer::_endRound(OperationContext* txn, Seconds waitTimeout) { _condVar.notify_all(); } - _sleepFor(txn, waitTimeout); + _sleepFor(opCtx, waitTimeout); } -void Balancer::_sleepFor(OperationContext* txn, Seconds waitTimeout) { +void Balancer::_sleepFor(OperationContext* opCtx, Seconds waitTimeout) { stdx::unique_lock<stdx::mutex> lock(_mutex); _condVar.wait_for(lock, waitTimeout.toSystemDuration(), [&] { return _state != kRunning; }); } -bool Balancer::_checkOIDs(OperationContext* txn) { - auto shardingContext = Grid::get(txn); +bool Balancer::_checkOIDs(OperationContext* opCtx) { + auto shardingContext = Grid::get(opCtx); vector<ShardId> all; shardingContext->shardRegistry()->getAllShardIds(&all); @@ -473,14 +474,14 @@ bool Balancer::_checkOIDs(OperationContext* txn) { return false; } - auto shardStatus = shardingContext->shardRegistry()->getShard(txn, shardId); + auto shardStatus = shardingContext->shardRegistry()->getShard(opCtx, shardId); if (!shardStatus.isOK()) { continue; } const auto s = shardStatus.getValue(); auto result = uassertStatusOK( - s->runCommandWithFixedRetryAttempts(txn, + s->runCommandWithFixedRetryAttempts(opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, "admin", BSON("features" << 1), @@ -497,18 +498,18 @@ bool Balancer::_checkOIDs(OperationContext* txn) { << " and " << oids[x]; result = uassertStatusOK(s->runCommandWithFixedRetryAttempts( - txn, + opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, "admin", BSON("features" << 1 << "oidReset" << 1), Shard::RetryPolicy::kIdempotent)); uassertStatusOK(result.commandStatus); - auto otherShardStatus = shardingContext->shardRegistry()->getShard(txn, oids[x]); + auto otherShardStatus = shardingContext->shardRegistry()->getShard(opCtx, oids[x]); if (otherShardStatus.isOK()) { result = uassertStatusOK( otherShardStatus.getValue()->runCommandWithFixedRetryAttempts( - txn, + opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, "admin", BSON("features" << 1 << "oidReset" << 1), @@ -526,14 +527,14 @@ bool Balancer::_checkOIDs(OperationContext* txn) { return true; } -Status Balancer::_enforceTagRanges(OperationContext* txn) { - auto chunksToSplitStatus = _chunkSelectionPolicy->selectChunksToSplit(txn); +Status Balancer::_enforceTagRanges(OperationContext* opCtx) { + auto chunksToSplitStatus = _chunkSelectionPolicy->selectChunksToSplit(opCtx); if (!chunksToSplitStatus.isOK()) { return chunksToSplitStatus.getStatus(); } for (const auto& splitInfo : chunksToSplitStatus.getValue()) { - auto scopedCMStatus = ScopedChunkManager::refreshAndGet(txn, splitInfo.nss); + auto scopedCMStatus = ScopedChunkManager::refreshAndGet(opCtx, splitInfo.nss); if (!scopedCMStatus.isOK()) { return scopedCMStatus.getStatus(); } @@ -541,7 +542,7 @@ Status Balancer::_enforceTagRanges(OperationContext* txn) { const auto& scopedCM = scopedCMStatus.getValue(); auto splitStatus = - shardutil::splitChunkAtMultiplePoints(txn, + shardutil::splitChunkAtMultiplePoints(opCtx, splitInfo.shardId, splitInfo.nss, scopedCM.cm()->getShardKeyPattern(), @@ -557,9 +558,9 @@ Status Balancer::_enforceTagRanges(OperationContext* txn) { return Status::OK(); } -int Balancer::_moveChunks(OperationContext* txn, +int Balancer::_moveChunks(OperationContext* opCtx, const BalancerChunkSelectionPolicy::MigrateInfoVector& candidateChunks) { - auto balancerConfig = Grid::get(txn)->getBalancerConfiguration(); + auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration(); // If the balancer was disabled since we started this round, don't start new chunk moves if (_stopRequested() || !balancerConfig->shouldBalance()) { @@ -568,7 +569,7 @@ int Balancer::_moveChunks(OperationContext* txn, } auto migrationStatuses = - _migrationManager.executeMigrationsForAutoBalance(txn, + _migrationManager.executeMigrationsForAutoBalance(opCtx, candidateChunks, balancerConfig->getMaxChunkSizeBytes(), balancerConfig->getSecondaryThrottle(), @@ -598,7 +599,7 @@ int Balancer::_moveChunks(OperationContext* txn, log() << "Performing a split because migration " << redact(requestIt->toString()) << " failed for size reasons" << causedBy(redact(status)); - _splitOrMarkJumbo(txn, NamespaceString(requestIt->ns), requestIt->minKey); + _splitOrMarkJumbo(opCtx, NamespaceString(requestIt->ns), requestIt->minKey); continue; } @@ -609,28 +610,28 @@ int Balancer::_moveChunks(OperationContext* txn, return numChunksProcessed; } -void Balancer::_splitOrMarkJumbo(OperationContext* txn, +void Balancer::_splitOrMarkJumbo(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& minKey) { - auto scopedCM = uassertStatusOK(ScopedChunkManager::refreshAndGet(txn, nss)); + auto scopedCM = uassertStatusOK(ScopedChunkManager::refreshAndGet(opCtx, nss)); const auto cm = scopedCM.cm().get(); auto chunk = cm->findIntersectingChunkWithSimpleCollation(minKey); try { const auto splitPoints = uassertStatusOK(shardutil::selectChunkSplitPoints( - txn, + opCtx, chunk->getShardId(), nss, cm->getShardKeyPattern(), ChunkRange(chunk->getMin(), chunk->getMax()), - Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes(), + Grid::get(opCtx)->getBalancerConfiguration()->getMaxChunkSizeBytes(), boost::none)); uassert(ErrorCodes::CannotSplit, "No split points found", !splitPoints.empty()); uassertStatusOK( - shardutil::splitChunkAtMultiplePoints(txn, + shardutil::splitChunkAtMultiplePoints(opCtx, chunk->getShardId(), nss, cm->getShardKeyPattern(), @@ -644,8 +645,8 @@ void Balancer::_splitOrMarkJumbo(OperationContext* txn, const std::string chunkName = ChunkType::genID(nss.ns(), chunk->getMin()); - auto status = Grid::get(txn)->catalogClient(txn)->updateConfigDocument( - txn, + auto status = Grid::get(opCtx)->catalogClient(opCtx)->updateConfigDocument( + opCtx, ChunkType::ConfigNS, BSON(ChunkType::name(chunkName)), BSON("$set" << BSON(ChunkType::jumbo(true))), |