/**
* Copyright (C) 2016 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
#include "mongo/platform/basic.h"
#include "mongo/s/balancer/balancer.h"
#include <algorithm>
#include "mongo/base/status_with.h"
#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/read_preference.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/db/client.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/query/lite_parsed_query.h"
#include "mongo/s/balancer/balancer_chunk_selection_policy_impl.h"
#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/balancer/cluster_statistics_impl.h"
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/catalog/catalog_manager.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/move_chunk_request.h"
#include "mongo/s/shard_util.h"
#include "mongo/s/sharding_raii.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/exit.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/represent_as.h"
#include "mongo/util/timer.h"
#include "mongo/util/version.h"
namespace mongo {
using std::map;
using std::string;
using std::vector;
namespace {
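// How long the balancer sleeps between rounds: the short interval is used after a round which
// actually moved chunks, the default interval otherwise.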
const Seconds kBalanceRoundDefaultInterval(10);
const Seconds kShortBalanceRoundInterval(1);
const char kChunkTooBig[] = "chunkTooBig";
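// ServiceContext decoration which holds the per-process Balancer instance.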
const auto getBalancer = ServiceContext::declareDecoration<Balancer>();
/**
* Utility class to generate timing and statistics for a single balancer round.
*/
class BalanceRoundDetails {
public:
BalanceRoundDetails() : _executionTimer() {}
void setSucceeded(int candidateChunks, int chunksMoved) {
invariant(!_errMsg);
_candidateChunks = candidateChunks;
_chunksMoved = chunksMoved;
}
void setFailed(const string& errMsg) {
_errMsg = errMsg;
}
BSONObj toBSON() const {
BSONObjBuilder builder;
builder.append("executionTimeMillis", _executionTimer.millis());
builder.append("errorOccured", _errMsg.is_initialized());
if (_errMsg) {
builder.append("errmsg", *_errMsg);
} else {
builder.append("candidateChunks", _candidateChunks);
builder.append("chunksMoved", _chunksMoved);
}
return builder.obj();
}
private:
const Timer _executionTimer;
// Set only on success
int _candidateChunks{0};
int _chunksMoved{0};
// Set only on failure
    boost::optional<std::string> _errMsg;
};
/**
* Occasionally prints a log message with shard versions if the versions are not the same
* in the cluster.
*/
void warnOnMultiVersion(const vector<ClusterStatistics::ShardStatistics>& clusterStats) {
bool isMultiVersion = false;
for (const auto& stat : clusterStats) {
if (!isSameMajorVersion(stat.mongoVersion.c_str())) {
isMultiVersion = true;
break;
}
}
// If we're all the same version, don't message
if (!isMultiVersion)
return;
StringBuilder sb;
sb << "Multi version cluster detected. Local version: " << versionString
<< ", shard versions: ";
for (const auto& stat : clusterStats) {
sb << stat.shardId << " is at " << stat.mongoVersion << "; ";
}
warning() << sb.str();
}
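/**
 * If the operation has a deadline, appends the remaining time as a maxTimeMS argument to the
 * command being built so that the receiving node enforces the same deadline.
 */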
void appendOperationDeadlineIfSet(OperationContext* txn, BSONObjBuilder* cmdBuilder) {
if (!txn->hasDeadline()) {
return;
}
    // Treat a remaining max time less than 1ms as 1ms, since anything smaller is treated as infinity
// or an error on the receiving node.
const auto remainingMicros = std::max(txn->getRemainingMaxTimeMicros(), Microseconds{1000});
    const auto maxTimeMsArg = representAs<int32_t>(durationCount<Milliseconds>(remainingMicros));
// We know that remainingMicros > 1000us, so if maxTimeMsArg is not engaged, it is because
// remainingMicros was too big to represent as a 32-bit signed integer number of
// milliseconds. In that case, we omit a maxTimeMs argument on the command, implying "no max
// time".
if (!maxTimeMsArg) {
return;
}
cmdBuilder->append(LiteParsedQuery::cmdOptionMaxTimeMS, *maxTimeMsArg);
}
/**
* Blocking method, which requests a single chunk migration to run.
*/
Status executeSingleMigration(OperationContext* txn,
const MigrateInfo& migrateInfo,
uint64_t maxChunkSizeBytes,
const MigrationSecondaryThrottleOptions& secondaryThrottle,
bool waitForDelete) {
const NamespaceString nss(migrateInfo.ns);
auto scopedCMStatus = ScopedChunkManager::getExisting(txn, nss);
if (!scopedCMStatus.isOK()) {
return scopedCMStatus.getStatus();
}
ChunkManager* const cm = scopedCMStatus.getValue().cm();
auto c = cm->findIntersectingChunk(txn, migrateInfo.minKey);
BSONObjBuilder builder;
MoveChunkRequest::appendAsCommand(
&builder,
nss,
cm->getVersion(),
Grid::get(txn)->shardRegistry()->getConfigServerConnectionString(),
migrateInfo.from,
migrateInfo.to,
ChunkRange(c->getMin(), c->getMax()),
maxChunkSizeBytes,
secondaryThrottle,
waitForDelete);
appendOperationDeadlineIfSet(txn, &builder);
BSONObj cmdObj = builder.obj();
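    // Placeholder status; always overwritten by the outcome of the shard lookup and command below.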
Status status{ErrorCodes::NotYetInitialized, "Uninitialized"};
auto shard = Grid::get(txn)->shardRegistry()->getShard(txn, migrateInfo.from);
if (!shard) {
status = {ErrorCodes::ShardNotFound,
str::stream() << "shard " << migrateInfo.from << " not found"};
} else {
auto cmdStatus = shard->runCommand(txn,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
"admin",
cmdObj,
Shard::RetryPolicy::kNotIdempotent);
if (!cmdStatus.isOK()) {
status = std::move(cmdStatus.getStatus());
} else {
status = std::move(cmdStatus.getValue().commandStatus);
BSONObj cmdResponse = std::move(cmdStatus.getValue().response);
// For backwards compatibility with 3.2 and earlier, where the move chunk command
// instead of returning a ChunkTooBig status includes an extra field in the response
bool chunkTooBig = false;
bsonExtractBooleanFieldWithDefault(cmdResponse, kChunkTooBig, false, &chunkTooBig);
if (chunkTooBig) {
invariant(!status.isOK());
status = {ErrorCodes::ChunkTooBig, status.reason()};
}
}
}
if (!status.isOK()) {
log() << "Move chunk " << cmdObj << " failed" << causedBy(status);
return {status.code(), str::stream() << "move failed due to " << status.toString()};
}
cm->reload(txn);
return Status::OK();
}
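// Fail points for testing: skipBalanceRound makes the balancer skip balancing rounds entirely,
// while balancerRoundIntervalSetting overrides the sleep interval between rounds.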
MONGO_FP_DECLARE(skipBalanceRound);
MONGO_FP_DECLARE(balancerRoundIntervalSetting);
} // namespace
Balancer::Balancer()
: _balancedLastTime(0),
      _chunkSelectionPolicy(stdx::make_unique<BalancerChunkSelectionPolicyImpl>(
          stdx::make_unique<ClusterStatisticsImpl>())),
      _clusterStats(stdx::make_unique<ClusterStatisticsImpl>()) {}
Balancer::~Balancer() = default;
Balancer* Balancer::get(OperationContext* operationContext) {
return &getBalancer(operationContext->getServiceContext());
}
void Balancer::start(OperationContext* txn) {
invariant(!_thread.joinable());
_thread = stdx::thread([this] { _mainThread(); });
}
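// Asks the chunk selection policy whether the specified chunk has a better location and, if so,
// synchronously migrates it using the current balancer settings.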
Status Balancer::rebalanceSingleChunk(OperationContext* txn, const ChunkType& chunk) {
auto migrateStatus = _chunkSelectionPolicy->selectSpecificChunkToMove(txn, chunk);
if (!migrateStatus.isOK()) {
return migrateStatus.getStatus();
}
auto migrateInfo = std::move(migrateStatus.getValue());
if (!migrateInfo) {
LOG(1) << "Unable to find more appropriate location for chunk " << chunk;
return Status::OK();
}
auto balancerConfig = Grid::get(txn)->getBalancerConfiguration();
Status refreshStatus = balancerConfig->refreshAndCheck(txn);
if (!refreshStatus.isOK()) {
return refreshStatus;
}
_moveChunks(txn,
{*migrateInfo},
balancerConfig->getSecondaryThrottle(),
balancerConfig->waitForDelete());
return Status::OK();
}
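// Moves the specified chunk to the requested shard, after checking with the chunk selection
// policy that the move is allowed. Blocks until the migration completes.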
Status Balancer::moveSingleChunk(OperationContext* txn,
const ChunkType& chunk,
const ShardId& newShardId,
uint64_t maxChunkSizeBytes,
const MigrationSecondaryThrottleOptions& secondaryThrottle,
bool waitForDelete) {
auto moveAllowedStatus = _chunkSelectionPolicy->checkMoveAllowed(txn, chunk, newShardId);
if (!moveAllowedStatus.isOK()) {
return moveAllowedStatus;
}
return executeSingleMigration(txn,
MigrateInfo(chunk.getNS(), newShardId, chunk),
maxChunkSizeBytes,
secondaryThrottle,
waitForDelete);
}
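// Body of the balancer thread: initializes the balancer and then runs balancing rounds until
// shutdown.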
void Balancer::_mainThread() {
Client::initThread("Balancer");
    // This is the body of the balancer thread, so if we throw here we end the balancer thread
    // prematurely.
while (!inShutdown()) {
auto txn = cc().makeOperationContext();
if (!_init(txn.get())) {
log() << "will retry to initialize balancer in one minute";
sleepsecs(60);
continue;
}
break;
}
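    // Interval between balancing rounds; may be overridden by the balancerRoundIntervalSetting
    // fail point below.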
Seconds balanceRoundInterval(kBalanceRoundDefaultInterval);
while (!inShutdown()) {
auto txn = cc().makeOperationContext();
auto shardingContext = Grid::get(txn.get());
auto balancerConfig = shardingContext->getBalancerConfiguration();
BalanceRoundDetails roundDetails;
try {
            // Reporting the balancer as active must be the first call, so that the balancer
            // control scripts know that there is an active balancer
            _shardingUptimeReporter.reportStatus(txn.get(), true);
MONGO_FAIL_POINT_BLOCK(balancerRoundIntervalSetting, scopedBalancerRoundInterval) {
const BSONObj& data = scopedBalancerRoundInterval.getData();
balanceRoundInterval = Seconds(data["sleepSecs"].numberInt());
}
// Use fresh shard state and balancer settings
shardingContext->shardRegistry()->reload(txn.get());
Status refreshStatus = balancerConfig->refreshAndCheck(txn.get());
if (!refreshStatus.isOK()) {
warning() << "Skipping balancing round" << causedBy(refreshStatus);
sleepFor(balanceRoundInterval);
continue;
}
// now make sure we should even be running
if (!balancerConfig->isBalancerActive() || MONGO_FAIL_POINT(skipBalanceRound)) {
LOG(1) << "skipping balancing round because balancing is disabled";
// Tell scripts that the balancer is not active anymore
                _shardingUptimeReporter.reportStatus(txn.get(), false);
sleepFor(balanceRoundInterval);
continue;
}
uassert(13258, "oids broken after resetting!", _checkOIDs(txn.get()));
{
auto scopedDistLock = shardingContext->catalogManager(txn.get())
->distLock(txn.get(),
"balancer",
"doing balance round",
DistLockManager::kSingleLockAttemptTimeout);
if (!scopedDistLock.isOK()) {
LOG(1) << "skipping balancing round" << causedBy(scopedDistLock.getStatus());
// Tell scripts that the balancer is not active anymore
                    _shardingUptimeReporter.reportStatus(txn.get(), false);
sleepFor(balanceRoundInterval); // no need to wake up soon
continue;
}
LOG(1) << "*** start balancing round. "
<< "waitForDelete: " << balancerConfig->waitForDelete()
<< ", secondaryThrottle: "
<< balancerConfig->getSecondaryThrottle().toBSON();
OCCASIONALLY warnOnMultiVersion(
uassertStatusOK(_clusterStats->getStats(txn.get())));
Status status = _enforceTagRanges(txn.get());
if (!status.isOK()) {
warning() << "Failed to enforce tag ranges" << causedBy(status);
} else {
LOG(1) << "Done enforcing tag range boundaries.";
}
const auto candidateChunks = uassertStatusOK(
_chunkSelectionPolicy->selectChunksToMove(txn.get(), _balancedLastTime));
if (candidateChunks.empty()) {
LOG(1) << "no need to move any chunk";
_balancedLastTime = 0;
} else {
_balancedLastTime = _moveChunks(txn.get(),
candidateChunks,
balancerConfig->getSecondaryThrottle(),
balancerConfig->waitForDelete());
                    roundDetails.setSucceeded(static_cast<int>(candidateChunks.size()),
_balancedLastTime);
shardingContext->catalogManager(txn.get())
->logAction(txn.get(), "balancer.round", "", roundDetails.toBSON());
}
LOG(1) << "*** End of balancing round";
}
// Tell scripts that the balancer is not active anymore
            _shardingUptimeReporter.reportStatus(txn.get(), false);
sleepFor(_balancedLastTime ? kShortBalanceRoundInterval : balanceRoundInterval);
} catch (const std::exception& e) {
log() << "caught exception while doing balance: " << e.what();
// Just to match the opening statement if in log level 1
LOG(1) << "*** End of balancing round";
// This round failed, tell the world!
roundDetails.setFailed(e.what());
shardingContext->catalogManager(txn.get())
->logAction(txn.get(), "balancer.round", "", roundDetails.toBSON());
// Sleep a fair amount before retrying because of the error
sleepFor(balanceRoundInterval);
}
}
}
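// Performs the initial connectivity checks against the config servers and shards and reports the
// balancer's status. Returns false if initialization failed and should be retried.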
bool Balancer::_init(OperationContext* txn) {
log() << "about to contact config servers and shards";
try {
// Contact the config server and refresh shard information. Checks that each shard is indeed
// a different process (no hostname mixup).
//
// These checks are redundant in that they're redone at every new round but we want to do
// them initially here so to catch any problem soon.
Grid::get(txn)->shardRegistry()->reload(txn);
if (!_checkOIDs(txn)) {
return false;
}
log() << "config servers and shards contacted successfully";
        _shardingUptimeReporter.reportStatus(txn, false);
        log() << "balancer id: " << _shardingUptimeReporter.getInstanceId() << " started";
return true;
} catch (const std::exception& e) {
warning() << "could not initialize balancer, please check that all shards and config "
"servers are up: " << e.what();
return false;
}
}
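// Runs the "features" command against every shard and verifies that no two shards report the same
// OID machine id. If a clash is found, instructs the offending shards to reset their OID state
// and returns false.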
bool Balancer::_checkOIDs(OperationContext* txn) {
auto shardingContext = Grid::get(txn);
    vector<ShardId> all;
shardingContext->shardRegistry()->getAllShardIds(&all);
// map of OID machine ID => shardId
    map<int, ShardId> oids;
for (const ShardId& shardId : all) {
const auto s = shardingContext->shardRegistry()->getShard(txn, shardId);
if (!s) {
continue;
}
auto result =
uassertStatusOK(s->runCommand(txn,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
"admin",
BSON("features" << 1),
Shard::RetryPolicy::kIdempotent));
uassertStatusOK(result.commandStatus);
BSONObj f = std::move(result.response);
if (f["oidMachine"].isNumber()) {
int x = f["oidMachine"].numberInt();
if (oids.count(x) == 0) {
oids[x] = shardId;
} else {
log() << "error: 2 machines have " << x << " as oid machine piece: " << shardId
<< " and " << oids[x];
result = uassertStatusOK(
s->runCommand(txn,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
"admin",
BSON("features" << 1 << "oidReset" << 1),
Shard::RetryPolicy::kIdempotent));
uassertStatusOK(result.commandStatus);
const auto otherShard = shardingContext->shardRegistry()->getShard(txn, oids[x]);
if (otherShard) {
result = uassertStatusOK(
otherShard->runCommand(txn,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
"admin",
BSON("features" << 1 << "oidReset" << 1),
Shard::RetryPolicy::kIdempotent));
uassertStatusOK(result.commandStatus);
}
return false;
}
} else {
log() << "warning: oidMachine not set on: " << s->toString();
}
}
return true;
}
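// Asks the chunk selection policy for chunks which straddle tag range boundaries and splits them
// at the tag boundary so that each resulting chunk falls entirely within a single tag range.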
Status Balancer::_enforceTagRanges(OperationContext* txn) {
auto chunksToSplitStatus = _chunkSelectionPolicy->selectChunksToSplit(txn);
if (!chunksToSplitStatus.isOK()) {
return chunksToSplitStatus.getStatus();
}
for (const auto& splitInfo : chunksToSplitStatus.getValue()) {
auto scopedCMStatus = ScopedChunkManager::getExisting(txn, splitInfo.nss);
if (!scopedCMStatus.isOK()) {
return scopedCMStatus.getStatus();
}
auto scopedCM = std::move(scopedCMStatus.getValue());
ChunkManager* const cm = scopedCM.cm();
auto splitStatus = shardutil::splitChunkAtMultiplePoints(txn,
splitInfo.shardId,
splitInfo.nss,
cm->getShardKeyPattern(),
splitInfo.collectionVersion,
splitInfo.minKey,
splitInfo.maxKey,
{splitInfo.splitKey});
if (!splitStatus.isOK()) {
warning() << "Failed to enforce tag range for chunk " << splitInfo
<< causedBy(splitStatus.getStatus());
}
}
return Status::OK();
}
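// Schedules the given set of chunk migrations one at a time and returns the number of chunks
// successfully moved. Chunks which fail to move because they are too big are split, or marked as
// jumbo if the split fails.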
int Balancer::_moveChunks(OperationContext* txn,
const BalancerChunkSelectionPolicy::MigrateInfoVector& candidateChunks,
const MigrationSecondaryThrottleOptions& secondaryThrottle,
bool waitForDelete) {
int movedCount = 0;
for (const auto& migrateInfo : candidateChunks) {
auto balancerConfig = Grid::get(txn)->getBalancerConfiguration();
// If the balancer was disabled since we started this round, don't start new chunk moves
if (!balancerConfig->isBalancerActive()) {
LOG(1) << "Stopping balancing round early as balancing was disabled";
return movedCount;
}
// Changes to metadata, borked metadata, and connectivity problems between shards
// should cause us to abort this chunk move, but shouldn't cause us to abort the entire
// round of chunks.
//
// TODO(spencer): We probably *should* abort the whole round on issues communicating
// with the config servers, but its impossible to distinguish those types of failures
// at the moment.
//
// TODO: Handle all these things more cleanly, since they're expected problems
const NamespaceString nss(migrateInfo.ns);
try {
Status status = executeSingleMigration(txn,
migrateInfo,
balancerConfig->getMaxChunkSizeBytes(),
balancerConfig->getSecondaryThrottle(),
balancerConfig->waitForDelete());
if (status.isOK()) {
movedCount++;
} else if (status == ErrorCodes::ChunkTooBig) {
log() << "Performing a split because migrate failed for size reasons"
<< causedBy(status);
auto scopedCM = uassertStatusOK(ScopedChunkManager::getExisting(txn, nss));
ChunkManager* const cm = scopedCM.cm();
auto c = cm->findIntersectingChunk(txn, migrateInfo.minKey);
auto splitStatus = c->split(txn, Chunk::normal, nullptr);
if (!splitStatus.isOK()) {
log() << "Marking chunk " << c->toString() << " as jumbo.";
c->markAsJumbo(txn);
// We increment moveCount so we do another round right away
movedCount++;
}
} else {
log() << "Balancer move failed" << causedBy(status);
}
} catch (const DBException& ex) {
log() << "balancer move " << migrateInfo << " failed" << causedBy(ex);
}
}
return movedCount;
}
} // namespace mongo