//@file balance.cpp /** * Copyright (C) 2008 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #include "pch.h" #include "../db/jsobj.h" #include "../db/cmdline.h" #include "../client/distlock.h" #include "mongo/client/dbclientcursor.h" #include "balance.h" #include "server.h" #include "shard.h" #include "config.h" #include "chunk.h" #include "grid.h" namespace mongo { Balancer balancer; Balancer::Balancer() : _balancedLastTime(0), _policy( new BalancerPolicy() ) {} Balancer::~Balancer() { } int Balancer::_moveChunks( const vector* candidateChunks , bool secondaryThrottle ) { int movedCount = 0; for ( vector::const_iterator it = candidateChunks->begin(); it != candidateChunks->end(); ++it ) { const CandidateChunk& chunkInfo = *it->get(); DBConfigPtr cfg = grid.getDBConfig( chunkInfo.ns ); verify( cfg ); ChunkManagerPtr cm = cfg->getChunkManager( chunkInfo.ns ); verify( cm ); ChunkPtr c = cm->findChunk( chunkInfo.chunk.min ); if ( c->getMin().woCompare( chunkInfo.chunk.min ) || c->getMax().woCompare( chunkInfo.chunk.max ) ) { // likely a split happened somewhere cm = cfg->getChunkManager( chunkInfo.ns , true /* reload */); verify( cm ); c = cm->findChunk( chunkInfo.chunk.min ); if ( c->getMin().woCompare( chunkInfo.chunk.min ) || c->getMax().woCompare( chunkInfo.chunk.max ) ) { log() << "chunk mismatch after reload, ignoring will retry issue " << chunkInfo.chunk.toString() << endl; continue; } } BSONObj res; if ( c->moveAndCommit( Shard::make( chunkInfo.to ) , Chunk::MaxChunkSize , secondaryThrottle , res ) ) { movedCount++; continue; } // the move requires acquiring the collection metadata's lock, which can fail log() << "balancer move failed: " << res << " from: " << chunkInfo.from << " to: " << chunkInfo.to << " chunk: " << chunkInfo.chunk << endl; if ( res["chunkTooBig"].trueValue() ) { // reload just to be safe cm = cfg->getChunkManager( chunkInfo.ns ); verify( cm ); c = cm->findChunk( chunkInfo.chunk.min ); log() << "forcing a split because migrate failed for size reasons" << endl; res = BSONObj(); c->singleSplit( true , res ); log() << "forced split results: " << res << endl; if ( ! res["ok"].trueValue() ) { log() << "marking chunk as jumbo: " << c->toString() << endl; c->markAsJumbo(); // we increment moveCount so we do another round right away movedCount++; } } } return movedCount; } void Balancer::_ping( DBClientBase& conn, bool waiting ) { WriteConcern w = conn.getWriteConcern(); conn.setWriteConcern( W_NONE ); conn.update( ShardNS::mongos , BSON( "_id" << _myid ) , BSON( "$set" << BSON( "ping" << DATENOW << "up" << (int)(time(0)-_started) << "waiting" << waiting ) ) , true ); conn.setWriteConcern( w); } bool Balancer::_checkOIDs() { vector all; Shard::getAllShards( all ); map oids; for ( vector::iterator i=all.begin(); i!=all.end(); ++i ) { Shard s = *i; BSONObj f = s.runCommand( "admin" , "features" , true ); if ( f["oidMachine"].isNumber() ) { int x = f["oidMachine"].numberInt(); if ( oids.count(x) == 0 ) { oids[x] = s; } else { log() << "error: 2 machines have " << x << " as oid machine piece " << s.toString() << " and " << oids[x].toString() << endl; s.runCommand( "admin" , BSON( "features" << 1 << "oidReset" << 1 ) , true ); oids[x].runCommand( "admin" , BSON( "features" << 1 << "oidReset" << 1 ) , true ); return false; } } else { log() << "warning: oidMachine not set on: " << s.toString() << endl; } } return true; } void Balancer::_doBalanceRound( DBClientBase& conn, vector* candidateChunks ) { verify( candidateChunks ); // // 1. Check whether there is any sharded collection to be balanced by querying // the ShardsNS::collections collection // auto_ptr cursor = conn.query( ShardNS::collection , BSONObj() ); vector< string > collections; while ( cursor->more() ) { BSONObj col = cursor->nextSafe(); // sharded collections will have a shard "key". if ( ! col["key"].eoo() && ! col["noBalance"].trueValue() ){ collections.push_back( col["_id"].String() ); } else if( col["noBalance"].trueValue() ){ LOG(1) << "not balancing collection " << col["_id"].String() << ", explicitly disabled" << endl; } } cursor.reset(); if ( collections.empty() ) { LOG(1) << "no collections to balance" << endl; return; } // // 2. Get a list of all the shards that are participating in this balance round // along with any maximum allowed quotas and current utilization. We get the // latter by issuing db.serverStatus() (mem.mapped) to all shards. // // TODO: skip unresponsive shards and mark information as stale. // vector allShards; Shard::getAllShards( allShards ); if ( allShards.size() < 2) { LOG(1) << "can't balance without more active shards" << endl; return; } ShardInfoMap shardInfo; for ( vector::const_iterator it = allShards.begin(); it != allShards.end(); ++it ) { const Shard& s = *it; ShardStatus status = s.getStatus(); shardInfo[ s.getName() ] = ShardInfo( s.getMaxSize(), status.mapped(), s.isDraining(), status.hasOpsQueued(), s.tags() ); } // // 3. For each collection, check if the balancing policy recommends moving anything around. // for (vector::const_iterator it = collections.begin(); it != collections.end(); ++it ) { const string& ns = *it; map< string,vector > shardToChunksMap; cursor = conn.query( ShardNS::chunk , QUERY( "ns" << ns ).sort( "min" ) ); while ( cursor->more() ) { BSONObj chunk = cursor->nextSafe(); if ( chunk["jumbo"].trueValue() ) continue; vector& chunks = shardToChunksMap[chunk["shard"].String()]; chunks.push_back( chunk.getOwned() ); } cursor.reset(); if (shardToChunksMap.empty()) { LOG(1) << "skipping empty collection (" << ns << ")"; continue; } for ( vector::iterator i=allShards.begin(); i!=allShards.end(); ++i ) { // this just makes sure there is an entry in shardToChunksMap for every shard Shard s = *i; shardToChunksMap[s.getName()].size(); } DistributionStatus status( shardInfo, shardToChunksMap ); // load tags conn.ensureIndex( ShardNS::tags, BSON( "ns" << 1 << "min" << 1 ), true ); cursor = conn.query( ShardNS::tags , QUERY( "ns" << ns ).sort( "min" ) ); while ( cursor->more() ) { BSONObj tag = cursor->nextSafe(); uassert( 16356 , str::stream() << "tag ranges not valid for: " << ns , status.addTagRange( TagRange( tag["min"].Obj().getOwned(), tag["max"].Obj().getOwned(), tag["tag"].String() ) ) ); } cursor.reset(); CandidateChunk* p = _policy->balance( ns, status, _balancedLastTime ); if ( p ) candidateChunks->push_back( CandidateChunkPtr( p ) ); } } bool Balancer::_init() { try { log() << "about to contact config servers and shards" << endl; // contact the config server and refresh shard information // checks that each shard is indeed a different process (no hostname mixup) // these checks are redundant in that they're redone at every new round but we want to do them initially here // so to catch any problem soon Shard::reloadShardInfo(); _checkOIDs(); log() << "config servers and shards contacted successfully" << endl; StringBuilder buf; buf << getHostNameCached() << ":" << cmdLine.port; _myid = buf.str(); _started = time(0); log() << "balancer id: " << _myid << " started at " << time_t_to_String_short(_started) << endl; return true; } catch ( std::exception& e ) { warning() << "could not initialize balancer, please check that all shards and config servers are up: " << e.what() << endl; return false; } } void Balancer::run() { // this is the body of a BackgroundJob so if we throw here we're basically ending the balancer thread prematurely while ( ! inShutdown() ) { if ( ! _init() ) { log() << "will retry to initialize balancer in one minute" << endl; sleepsecs( 60 ); continue; } break; } int sleepTime = 30; // getConnectioString and dist lock constructor does not throw, which is what we expect on while // on the balancer thread ConnectionString config = configServer.getConnectionString(); DistributedLock balanceLock( config , "balancer" ); while ( ! inShutdown() ) { try { scoped_ptr connPtr( ScopedDbConnection::getInternalScopedDbConnection( config.toString() ) ); ScopedDbConnection& conn = *connPtr; // ping has to be first so we keep things in the config server in sync _ping( conn.conn() ); // use fresh shard state Shard::reloadShardInfo(); // refresh chunk size (even though another balancer might be active) Chunk::refreshChunkSize(); BSONObj balancerConfig; // now make sure we should even be running if ( ! grid.shouldBalance( "", &balancerConfig ) ) { LOG(1) << "skipping balancing round because balancing is disabled" << endl; // Ping again so scripts can determine if we're active without waiting _ping( conn.conn(), true ); conn.done(); sleepsecs( sleepTime ); continue; } sleepTime = balancerConfig["_nosleep"].trueValue() ? 30 : 6; uassert( 13258 , "oids broken after resetting!" , _checkOIDs() ); { dist_lock_try lk( &balanceLock , "doing balance round" ); if ( ! lk.got() ) { LOG(1) << "skipping balancing round because another balancer is active" << endl; // Ping again so scripts can determine if we're active without waiting _ping( conn.conn(), true ); conn.done(); sleepsecs( sleepTime ); // no need to wake up soon continue; } LOG(1) << "*** start balancing round" << endl; vector candidateChunks; _doBalanceRound( conn.conn() , &candidateChunks ); if ( candidateChunks.size() == 0 ) { LOG(1) << "no need to move any chunk" << endl; _balancedLastTime = 0; } else { _balancedLastTime = _moveChunks( &candidateChunks, balancerConfig["_secondaryThrottle"].trueValue() ); } LOG(1) << "*** end of balancing round" << endl; } // Ping again so scripts can determine if we're active without waiting _ping( conn.conn(), true ); conn.done(); sleepsecs( _balancedLastTime ? sleepTime / 6 : sleepTime ); } catch ( std::exception& e ) { log() << "caught exception while doing balance: " << e.what() << endl; // Just to match the opening statement if in log level 1 LOG(1) << "*** End of balancing round" << endl; sleepsecs( sleepTime ); // sleep a fair amount b/c of error continue; } } } } // namespace mongo