/**
* Copyright (C) 2008 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
#include "mongo/platform/basic.h"
#include "mongo/s/balance.h"
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
#include "mongo/base/owned_pointer_map.h"
#include "mongo/client/connpool.h"
#include "mongo/client/dbclientcursor.h"
#include "mongo/db/client.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/server_options.h"
#include "mongo/db/write_concern.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/catalog/catalog_manager.h"
#include "mongo/s/catalog/type_actionlog.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_settings.h"
#include "mongo/s/chunk_manager.h"
#include "mongo/s/config.h"
#include "mongo/s/catalog/dist_lock_manager.h"
#include "mongo/s/grid.h"
#include "mongo/s/server.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/type_mongos.h"
#include "mongo/s/type_tags.h"
#include "mongo/util/exit.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/timer.h"
#include "mongo/util/version.h"
namespace mongo {
using boost::scoped_ptr;
using boost::shared_ptr;
using std::auto_ptr;
using std::endl;
using std::map;
using std::set;
using std::string;
using std::vector;
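// Fail point that, when enabled, makes the balancer skip its balancing rounds (used by tests).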
MONGO_FP_DECLARE(skipBalanceRound);
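// The single instance of the balancer running inside this mongos.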
Balancer balancer;
Balancer::Balancer()
: _balancedLastTime(0),
_policy(new BalancerPolicy()) {
}
Balancer::~Balancer() {
}
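// Tries to move each candidate chunk to its recommended destination shard. Re-checks the
// balancer settings before every move so a round can be cut short if balancing is disabled,
// and re-resolves each chunk against the (possibly split) chunk manager before migrating.
// Returns the number of chunks actually moved.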
int Balancer::_moveChunks(const vector<CandidateChunkPtr>* candidateChunks,
                          const WriteConcernOptions* writeConcern,
                          bool waitForDelete)
{
int movedCount = 0;
for (vector<CandidateChunkPtr>::const_iterator it = candidateChunks->begin();
     it != candidateChunks->end();
     ++it) {
// If the balancer was disabled since we started this round, don't start new
// chunk moves.
auto balSettingsResult =
grid.catalogManager()->getGlobalSettings(SettingsType::BalancerDocKey);
const bool isBalSettingsAbsent = balSettingsResult.getStatus() == ErrorCodes::NoSuchKey;
if (!balSettingsResult.isOK() && !isBalSettingsAbsent) {
warning() << balSettingsResult.getStatus();
return movedCount;
}
const SettingsType& balancerConfig = balSettingsResult.getValue();
if ((!isBalSettingsAbsent && !grid.shouldBalance(balancerConfig)) ||
MONGO_FAIL_POINT(skipBalanceRound)) {
LOG(1) << "Stopping balancing round early as balancing was disabled";
return movedCount;
}
// Changes to metadata, borked metadata, and connectivity problems between shards should
// cause us to abort this chunk move, but shouldn't cause us to abort the entire round
// of chunks.
// TODO(spencer): We probably *should* abort the whole round on issues communicating
// with the config servers, but it's impossible to distinguish those types of failures
// at the moment.
// TODO: Handle all these things more cleanly, since they're expected problems
const CandidateChunk& chunkInfo = *it->get();
const NamespaceString nss(chunkInfo.ns);
try {
auto status = grid.catalogCache()->getDatabase(nss.db().toString());
fassert(28628, status.getStatus());
shared_ptr<DBConfig> cfg = status.getValue();
// NOTE: We purposely do not reload metadata here, since _doBalanceRound already
// tried to do so once.
ChunkManagerPtr cm = cfg->getChunkManager( chunkInfo.ns );
verify( cm );
ChunkPtr c = cm->findIntersectingChunk( chunkInfo.chunk.min );
if ( c->getMin().woCompare( chunkInfo.chunk.min ) || c->getMax().woCompare( chunkInfo.chunk.max ) ) {
// likely a split happened somewhere
cm = cfg->getChunkManager( chunkInfo.ns , true /* reload */);
verify( cm );
c = cm->findIntersectingChunk( chunkInfo.chunk.min );
if ( c->getMin().woCompare( chunkInfo.chunk.min ) || c->getMax().woCompare( chunkInfo.chunk.max ) ) {
log() << "chunk mismatch after reload, ignoring will retry issue " << chunkInfo.chunk.toString() << endl;
continue;
}
}
BSONObj res;
if (c->moveAndCommit(Shard::make(chunkInfo.to),
Chunk::MaxChunkSize,
writeConcern,
waitForDelete,
0, /* maxTimeMS */
res)) {
movedCount++;
continue;
}
// the move requires acquiring the collection metadata's lock, which can fail
log() << "balancer move failed: " << res << " from: " << chunkInfo.from << " to: " << chunkInfo.to
<< " chunk: " << chunkInfo.chunk << endl;
if ( res["chunkTooBig"].trueValue() ) {
// reload just to be safe
cm = cfg->getChunkManager( chunkInfo.ns );
verify( cm );
c = cm->findIntersectingChunk( chunkInfo.chunk.min );
log() << "performing a split because migrate failed for size reasons";
Status status = c->split(Chunk::normal, NULL, NULL);
log() << "split results: " << status << endl;
if ( !status.isOK() ) {
log() << "marking chunk as jumbo: " << c->toString() << endl;
c->markAsJumbo();
// we increment moveCount so we do another round right away
movedCount++;
}
}
}
catch( const DBException& ex ) {
warning() << "could not move chunk " << chunkInfo.chunk.toString()
<< ", continuing balancing round" << causedBy( ex ) << endl;
}
}
return movedCount;
}
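// Refreshes this balancer's entry in config.mongos (last ping time, uptime, waiting flag and
// mongo version) so other processes can see that this mongos is alive.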
void Balancer::_ping(bool waiting) {
grid.catalogManager()->update(
MongosType::ConfigNS,
BSON(MongosType::name(_myid)),
BSON("$set" << BSON(MongosType::ping(jsTime()) <<
MongosType::up(static_cast<int>(time(0) - _started)) <<
MongosType::waiting(waiting) <<
MongosType::mongoVersion(versionString))),
true,
false,
NULL);
}
/*
 * Builds the details object for the actionlog.
 * Current formats for detail are:
 * Success: {
 *           "candidateChunks" : <number of candidate chunks>,
 *           "chunksMoved" : <number of chunks moved>,
 *           "executionTimeMillis" : <execution time in millis>,
 *           "errorOccured" : false
 *          }
 * Failure: {
 *           "executionTimeMillis" : <execution time in millis>,
 *           "errmsg" : <failure message>,
 *           "errorOccured" : true
 *          }
 * @param didError, did this round end in an error?
 * @param executionTime, the time this round took to run
 * @param candidateChunks, the number of chunks identified to be moved
 * @param chunksMoved, the number of chunks moved
 * @param errmsg, the error message for this round
 */
static BSONObj _buildDetails( bool didError, int executionTime,
int candidateChunks, int chunksMoved, const std::string& errmsg ) {
BSONObjBuilder builder;
builder.append("executionTimeMillis", executionTime);
builder.append("errorOccured", didError);
if ( didError ) {
builder.append("errmsg", errmsg);
} else {
builder.append("candidateChunks", candidateChunks);
builder.append("chunksMoved", chunksMoved);
}
return builder.obj();
}
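// Verifies that no two shards report the same "oidMachine" value (which would make ObjectIDs
// generated on different shards collide). On a conflict, asks both shards to reset their OID
// state and returns false.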
bool Balancer::_checkOIDs() {
vector<Shard> all;
Shard::getAllShards( all );
map<int, Shard> oids;
for ( vector<Shard>::iterator i = all.begin(); i != all.end(); ++i ) {
Shard s = *i;
BSONObj f = s.runCommand( "admin" , "features" );
if ( f["oidMachine"].isNumber() ) {
int x = f["oidMachine"].numberInt();
if ( oids.count(x) == 0 ) {
oids[x] = s;
}
else {
log() << "error: 2 machines have " << x << " as oid machine piece " << s.toString() << " and " << oids[x].toString() << endl;
s.runCommand( "admin" , BSON( "features" << 1 << "oidReset" << 1 ) );
oids[x].runCommand( "admin" , BSON( "features" << 1 << "oidReset" << 1 ) );
return false;
}
}
else {
log() << "warning: oidMachine not set on: " << s.toString() << endl;
}
}
return true;
}
/**
* Occasionally prints a log message with shard versions if the versions are not the same
* in the cluster.
*/
void warnOnMultiVersion( const ShardInfoMap& shardInfo ) {
bool isMultiVersion = false;
for ( ShardInfoMap::const_iterator i = shardInfo.begin(); i != shardInfo.end(); ++i ) {
if ( !isSameMajorVersion( i->second.getMongoVersion().c_str() ) ) {
isMultiVersion = true;
break;
}
}
// If we're all the same version, don't message
if ( !isMultiVersion ) return;
warning() << "multiVersion cluster detected, my version is " << versionString << endl;
for ( ShardInfoMap::const_iterator i = shardInfo.begin(); i != shardInfo.end(); ++i ) {
log() << i->first << " is at version " << i->second.getMongoVersion() << endl;
}
}
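// Runs one analysis pass over all sharded collections: loads the chunk distribution and tag
// ranges for each collection, splits chunks that straddle a tag boundary, and asks the
// balancer policy for candidate chunks to move. Fills in 'candidateChunks' with the result.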
void Balancer::_doBalanceRound(DBClientBase& conn, vector<CandidateChunkPtr>* candidateChunks) {
invariant(candidateChunks);
vector<CollectionType> collections;
Status collsStatus = grid.catalogManager()->getCollections(nullptr, &collections);
if (!collsStatus.isOK()) {
warning() << "Failed to retrieve the set of collections during balancing round "
<< collsStatus;
return;
}
if (collections.empty()) {
LOG(1) << "no collections to balance";
return;
}
// Get a list of all the shards that are participating in this balance round along with any
// maximum allowed quotas and current utilization. We get the latter by issuing
// db.serverStatus() (mem.mapped) to all shards.
//
// TODO: skip unresponsive shards and mark information as stale.
ShardInfoMap shardInfo;
Status loadStatus = DistributionStatus::populateShardInfoMap(&shardInfo);
if (!loadStatus.isOK()) {
warning() << "failed to load shard metadata" << causedBy(loadStatus);
return;
}
if (shardInfo.size() < 2) {
LOG(1) << "can't balance without more active shards";
return;
}
OCCASIONALLY warnOnMultiVersion( shardInfo );
// For each collection, check if the balancing policy recommends moving anything around.
for (const auto& coll : collections) {
// Skip collections for which balancing is disabled
const NamespaceString& ns = coll.getNs();
if (!coll.getAllowBalance()) {
LOG(1) << "Not balancing collection " << ns << "; explicitly disabled.";
continue;
}
OwnedPointerMap<string, OwnedPointerVector<ChunkType> > shardToChunksMap;
auto_ptr<DBClientCursor> cursor =
    conn.query(ChunkType::ConfigNS,
               QUERY(ChunkType::ns(ns)).sort(ChunkType::min()));
set<BSONObj> allChunkMinimums;
while ( cursor->more() ) {
BSONObj chunkDoc = cursor->nextSafe().getOwned();
StatusWith<ChunkType> chunkRes = ChunkType::fromBSON(chunkDoc);
if (!chunkRes.isOK()) {
error() << "bad chunk format for " << chunkDoc
<< ": " << chunkRes.getStatus().reason();
return;
}
auto_ptr<ChunkType> chunk(new ChunkType(chunkRes.getValue()));
allChunkMinimums.insert(chunk->getMin().getOwned());
OwnedPointerVector<ChunkType>*& chunkList =
    shardToChunksMap.mutableMap()[chunk->getShard()];
if (chunkList == NULL) {
    chunkList = new OwnedPointerVector<ChunkType>();
}
chunkList->mutableVector().push_back(chunk.release());
}
cursor.reset();
if (shardToChunksMap.map().empty()) {
LOG(1) << "skipping empty collection (" << ns << ")";
continue;
}
for (ShardInfoMap::const_iterator i = shardInfo.begin(); i != shardInfo.end(); ++i) {
// this just makes sure there is an entry in shardToChunksMap for every shard
OwnedPointerVector<ChunkType>*& chunkList =
shardToChunksMap.mutableMap()[i->first];
if (chunkList == NULL) {
chunkList = new OwnedPointerVector<ChunkType>();
}
}
DistributionStatus status(shardInfo, shardToChunksMap.map());
cursor = conn.query(TagsType::ConfigNS,
QUERY(TagsType::ns(ns)).sort(TagsType::min()));
vector<TagRange> ranges;
while ( cursor->more() ) {
BSONObj tag = cursor->nextSafe();
TagRange tr(tag[TagsType::min()].Obj().getOwned(),
tag[TagsType::max()].Obj().getOwned(),
tag[TagsType::tag()].String());
ranges.push_back(tr);
uassert(16356,
str::stream() << "tag ranges not valid for: " << ns.toString(),
status.addTagRange(tr) );
}
cursor.reset();
auto statusGetDb = grid.catalogCache()->getDatabase(ns.db().toString());
if (!statusGetDb.isOK()) {
warning() << "could not load db config to balance " << ns
<< ", collection: " << statusGetDb.getStatus();
continue;
}
shared_ptr<DBConfig> cfg = statusGetDb.getValue();
// This line reloads the chunk manager once if this process doesn't know the collection
// is sharded yet.
ChunkManagerPtr cm = cfg->getChunkManagerIfExists( ns, true );
if ( !cm ) {
warning() << "could not load chunks to balance " << ns << " collection" << endl;
continue;
}
// Loop through the tag ranges to make sure no chunk spans a tag boundary; split on the tag min for all chunks.
bool didAnySplits = false;
for ( unsigned i = 0; i < ranges.size(); i++ ) {
BSONObj min = ranges[i].min;
min = cm->getShardKeyPattern().getKeyPattern().extendRangeBound( min, false );
if ( allChunkMinimums.count( min ) > 0 )
continue;
didAnySplits = true;
log() << "ns: " << ns << " need to split on "
<< min << " because there is a range there" << endl;
ChunkPtr c = cm->findIntersectingChunk( min );
vector<BSONObj> splitPoints;
splitPoints.push_back( min );
Status status = c->multiSplit(splitPoints, NULL);
if ( !status.isOK() ) {
error() << "split failed: " << status << endl;
}
else {
LOG(1) << "split worked" << endl;
}
break;
}
if (didAnySplits) {
// state change, just wait till next round
continue;
}
CandidateChunk* p = _policy->balance(ns, status, _balancedLastTime);
if (p) {
candidateChunks->push_back(CandidateChunkPtr(p));
}
}
}
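// One-time startup work for the balancer thread: contacts the config servers and shards,
// sanity-checks shard OIDs, and records this balancer's id ("host:port") and start time.
// Returns false if any of that fails, in which case the caller retries later.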
bool Balancer::_init() {
try {
log() << "about to contact config servers and shards" << endl;
// Contact the config server and refresh the shard information; check that each shard is
// indeed a different process (no hostname mixup). These checks are redundant in that
// they're redone at every new round, but we want to do them initially here so as to
// catch any problems early.
Shard::reloadShardInfo();
_checkOIDs();
log() << "config servers and shards contacted successfully" << endl;
StringBuilder buf;
buf << getHostNameCached() << ":" << serverGlobalParams.port;
_myid = buf.str();
_started = time(0);
log() << "balancer id: " << _myid << " started";
return true;
}
catch ( std::exception& e ) {
warning() << "could not initialize balancer, please check that all shards and config servers are up: " << e.what() << endl;
return false;
}
}
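// Body of the balancer background thread: initializes (retrying every minute until it
// succeeds), then loops until shutdown, taking the "balancer" distributed lock, doing one
// balancing round, logging the round to the actionlog, and sleeping between rounds
// (sleeping less when the previous round still had chunks to move).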
void Balancer::run() {
Client::initThread("Balancer");
// this is the body of a BackgroundJob so if we throw here we're basically ending the balancer thread prematurely
while ( ! inShutdown() ) {
if ( ! _init() ) {
log() << "will retry to initialize balancer in one minute" << endl;
sleepsecs( 60 );
continue;
}
break;
}
int sleepTime = 10;
// getConnectionString and the dist lock constructor do not throw, which is what we expect
// while on the balancer thread
ConnectionString config = configServer.getConnectionString();
while ( ! inShutdown() ) {
Timer balanceRoundTimer;
ActionLogType actionLog;
actionLog.setServer(getHostNameCached());
actionLog.setWhat("balancer.round");
try {
ScopedDbConnection conn(config.toString(), 30);
// ping has to be first so we keep things in the config server in sync
_ping();
BSONObj balancerResult;
// use fresh shard state
Shard::reloadShardInfo();
// refresh chunk size (even though another balancer might be active)
Chunk::refreshChunkSize();
auto balSettingsResult =
grid.catalogManager()->getGlobalSettings(SettingsType::BalancerDocKey);
const bool isBalSettingsAbsent =
balSettingsResult.getStatus() == ErrorCodes::NoSuchKey;
if (!balSettingsResult.isOK() && !isBalSettingsAbsent) {
warning() << balSettingsResult.getStatus();
return;
}
const SettingsType& balancerConfig = balSettingsResult.getValue();
// now make sure we should even be running
if ((!isBalSettingsAbsent && !grid.shouldBalance(balancerConfig)) ||
MONGO_FAIL_POINT(skipBalanceRound)) {
LOG(1) << "skipping balancing round because balancing is disabled" << endl;
// Ping again so scripts can determine if we're active without waiting
_ping( true );
conn.done();
sleepsecs( sleepTime );
continue;
}
uassert( 13258 , "oids broken after resetting!" , _checkOIDs() );
{
auto scopedDistLock = grid.catalogManager()->getDistLockManager()->lock(
"balancer", "doing balance round");
if (!scopedDistLock.isOK()) {
LOG(1) << "skipping balancing round"
<< causedBy(scopedDistLock.getStatus());
// Ping again so scripts can determine if we're active without waiting
_ping( true );
conn.done();
sleepsecs( sleepTime ); // no need to wake up soon
continue;
}
const bool waitForDelete = (balancerConfig.isWaitForDeleteSet() ?
balancerConfig.getWaitForDelete() : false);
std::unique_ptr<WriteConcernOptions> writeConcern;
if (balancerConfig.isKeySet()) { // if balancer doc exists.
writeConcern = std::move(balancerConfig.getWriteConcern());
}
LOG(1) << "*** start balancing round. "
<< "waitForDelete: " << waitForDelete
<< ", secondaryThrottle: "
<< (writeConcern.get() ? writeConcern->toBSON().toString() : "default")
<< endl;
vector<CandidateChunkPtr> candidateChunks;
_doBalanceRound( conn.conn() , &candidateChunks );
if ( candidateChunks.size() == 0 ) {
LOG(1) << "no need to move any chunk" << endl;
_balancedLastTime = 0;
}
else {
_balancedLastTime = _moveChunks(&candidateChunks,
writeConcern.get(),
waitForDelete );
}
actionLog.setDetails( _buildDetails( false, balanceRoundTimer.millis(),
static_cast<int>(candidateChunks.size()), _balancedLastTime, "") );
actionLog.setTime(jsTime());
grid.catalogManager()->logAction(actionLog);
LOG(1) << "*** end of balancing round" << endl;
}
// Ping again so scripts can determine if we're active without waiting
_ping( true );
conn.done();
sleepsecs( _balancedLastTime ? sleepTime / 10 : sleepTime );
}
catch ( std::exception& e ) {
log() << "caught exception while doing balance: " << e.what() << endl;
// Just to match the opening statement if in log level 1
LOG(1) << "*** End of balancing round" << endl;
// This round failed, tell the world!
actionLog.setDetails( _buildDetails( true, balanceRoundTimer.millis(),
0, 0, e.what()) );
actionLog.setTime(jsTime());
grid.catalogManager()->logAction(actionLog);
sleepsecs( sleepTime ); // sleep a fair amount b/c of error
continue;
}
}
}
} // namespace mongo