// balance.cpp /** * Copyright (C) 2008 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #include "pch.h" #include "../db/jsobj.h" #include "../db/cmdline.h" #include "balance.h" #include "server.h" #include "shard.h" #include "config.h" #include "chunk.h" namespace mongo { Balancer balancer; Balancer::Balancer() : _balancedLastTime(0), _policy( new BalancerPolicy ){} Balancer::~Balancer() { delete _policy; } bool Balancer::_shouldIBalance( DBClientBase& conn ){ BSONObj x = conn.findOne( ShardNS::settings , BSON( "_id" << "balancer" ) ); log(2) << "balancer: " << x << endl; if ( ! x.isEmpty() ){ if ( x["who"].String() == _myid ){ log(2) << "balancer: i'm the current balancer" << endl; return true; } BSONObj other = conn.findOne( ShardNS::mongos , x["who"].wrap( "_id" ) ); massert( 13125 , (string)"can't find mongos: " + x["who"].String() , ! other.isEmpty() ); int secsSincePing = (int)(( jsTime() - other["ping"].Date() ) / 1000 ); log(2) << "current balancer is: " << other << " ping delay(secs): " << secsSincePing << endl; if ( secsSincePing < ( 60 * 10 ) ){ return false; } log() << "balancer: going to take over" << endl; // we want to take over, so fall through to below } // Taking over means replacing 'who' with this balancer's address. Note that // to avoid any races, we use a compare-and-set strategy relying on the // incarnation of the previous balancer (the key 'x'). OID incarnation; incarnation.init(); BSONObjBuilder updateQuery; updateQuery.append( "_id" , "balancer" ); if ( x["x"].type() ) updateQuery.append( x["x"] ); else updateQuery.append( "x" , BSON( "$exists" << false ) ); conn.update( ShardNS::settings , updateQuery.obj() , BSON( "$set" << BSON( "who" << _myid << "x" << incarnation ) ) , true ); // If another balancer beats this one to the punch, the following query will see // the incarnation for that other guy. x = conn.findOne( ShardNS::settings , BSON( "_id" << "balancer" ) ); log() << "balancer: after update: " << x << endl; return _myid == x["who"].String() && incarnation == x["x"].OID(); } int Balancer::_moveChunks( const vector* candidateChunks ) { int movedCount = 0; for ( vector::const_iterator it = candidateChunks->begin(); it != candidateChunks->end(); ++it ){ const CandidateChunk& chunkInfo = *it->get(); DBConfigPtr cfg = grid.getDBConfig( chunkInfo.ns ); assert( cfg ); ChunkManagerPtr cm = cfg->getChunkManager( chunkInfo.ns ); assert( cm ); const BSONObj& chunkToMove = chunkInfo.chunk; ChunkPtr c = cm->findChunk( chunkToMove["min"].Obj() ); if ( c->getMin().woCompare( chunkToMove["min"].Obj() ) ){ // likely a split happened somewhere cm = cfg->getChunkManager( chunkInfo.ns , true ); assert( cm ); c = cm->findChunk( chunkToMove["min"].Obj() ); if ( c->getMin().woCompare( chunkToMove["min"].Obj() ) ){ log() << "balancer: chunk mismatch after reload, ignoring will retry issue cm: " << c->getMin() << " min: " << chunkToMove["min"].Obj() << endl; continue; } } string errmsg; if ( c->moveAndCommit( Shard::make( chunkInfo.to ) , errmsg ) ){ movedCount++; continue; } log() << "balancer: MOVE FAILED **** " << errmsg << "\n" << " from: " << chunkInfo.from << " to: " << " chunk: " << chunkToMove << endl; } return movedCount; } void Balancer::_ping(){ assert( _myid.size() && _started ); try { ScopedDbConnection conn( configServer.getPrimary() ); _ping( conn.conn() ); conn.done(); } catch ( std::exception& e ){ log() << "bare ping failed: " << e.what() << endl; } } void Balancer::_ping( DBClientBase& conn ){ WriteConcern w = conn.getWriteConcern(); conn.setWriteConcern( W_NONE ); conn.update( ShardNS::mongos , BSON( "_id" << _myid ) , BSON( "$set" << BSON( "ping" << DATENOW << "up" << (int)(time(0)-_started) ) ) , true ); conn.setWriteConcern( w); } bool Balancer::_checkOIDs(){ vector all; Shard::getAllShards( all ); map oids; for ( vector::iterator i=all.begin(); i!=all.end(); ++i ){ Shard s = *i; BSONObj f = s.runCommand( "admin" , "features" ); if ( f["oidMachine"].isNumber() ){ int x = f["oidMachine"].numberInt(); if ( oids.count(x) == 0 ){ oids[x] = s; } else { log() << "error: 2 machines have " << x << " as oid machine piece " << s.toString() << " and " << oids[x].toString() << endl; s.runCommand( "admin" , BSON( "features" << 1 << "oidReset" << 1 ) ); oids[x].runCommand( "admin" , BSON( "features" << 1 << "oidReset" << 1 ) ); return false; } } else { log() << "warning: oidMachine not set on: " << s.toString() << endl; } } return true; } void Balancer::_doBalanceRound( DBClientBase& conn, vector* candidateChunks ){ assert( candidateChunks ); // // 1. Check whether there is any sharded collection to be balanced by querying // the ShardsNS::database collection // // { "_id" : "test", "partitioned" : true, "primary" : "shard0", // "sharded" : { // "test.images" : { "key" : { "_id" : 1 }, "unique" : false }, // ... // } // } // auto_ptr cursor = conn.query( ShardNS::database , BSON( "partitioned" << true ) ); vector< string > collections; while ( cursor->more() ){ BSONObj db = cursor->next(); // A database may be partitioned but not yet have a sharded collection. // 'cursor' will point to docs that do not contain the "sharded" key. Since // there'd be nothing to balance, we want to skip those here. BSONElement shardedColls = db["sharded"]; if ( shardedColls.eoo() ){ log(2) << "balancer: skipping database with no sharded collection (" << db["_id"].str() << ")" << endl; continue; } BSONObjIterator i( shardedColls.Obj() ); while ( i.more() ){ BSONElement e = i.next(); collections.push_back( e.fieldName() ); } } cursor.reset(); if ( collections.empty() ) { log(1) << "balancer: no collections to balance" << endl; return; } // // 2. Get a list of all the shards that are participating in this balance round // along with any maximum allowed quotas and current utilization. We get the // latter by issuing db.serverStatus() (mem.mapped) to all shards. // // TODO: skip unresponsive shards and mark information as stale. // vector allShards; Shard::getAllShards( allShards ); if ( allShards.size() < 2) { log(1) << "balancer: can't balance without more active shards" << endl; return; } map< string, BSONObj > shardLimitsMap; for ( vector::const_iterator it = allShards.begin(); it != allShards.end(); ++it ){ const Shard& s = *it; ShardStatus status = s.getStatus(); BSONObj limitsObj = BSON( "maxSize" << s.getMaxSize() << "currSize" << status.mapped() ); shardLimitsMap[s.getName()] = limitsObj; } // // 3. For each collection, check if the balancing policy recommends moving anything around. // for (vector::const_iterator it = collections.begin(); it != collections.end(); ++it ) { const string& ns = *it; map< string,vector > shardToChunksMap; cursor = conn.query( ShardNS::chunk , QUERY( "ns" << ns ).sort( "min" ) ); while ( cursor->more() ){ BSONObj chunk = cursor->next(); vector& chunks = shardToChunksMap[chunk["shard"].String()]; chunks.push_back( chunk.getOwned() ); } cursor.reset(); if (shardToChunksMap.empty()) { log(1) << "balancer: skipping empty collection (" << ns << ")"; continue; } for ( vector::iterator i=allShards.begin(); i!=allShards.end(); ++i ){ // this just makes sure there is an entry in shardToChunksMap for every shard Shard s = *i; shardToChunksMap[s.getName()].size(); } CandidateChunk* p = _policy->balance( ns , shardLimitsMap , shardToChunksMap , _balancedLastTime ); if ( p ) candidateChunks->push_back( CandidateChunkPtr( p ) ); } } void Balancer::run(){ { // init stuff, don't want to do at static init StringBuilder buf; buf << ourHostname << ":" << cmdLine.port; _myid = buf.str(); log(1) << "balancer myid: " << _myid << endl; _started = time(0); Shard::reloadShardInfo(); } _ping(); _checkOIDs(); while ( ! inShutdown() ){ sleepsecs( 10 ); try { ScopedDbConnection conn( configServer.getPrimary() ); _ping( conn.conn() ); if ( ! _checkOIDs() ){ uassert( 13258 , "oids broken after resetting!" , _checkOIDs() ); } vector candidateChunks; if ( _shouldIBalance( conn.conn() ) ){ log(1) << "balancer: start balancing round" << endl; candidateChunks.clear(); _doBalanceRound( conn.conn() , &candidateChunks ); if ( candidateChunks.size() == 0 ) { log(1) << "balancer: no need to move any chunk" << endl; } else { _balancedLastTime = _moveChunks( &candidateChunks ); log(1) << "balancer: end balancing round" << endl; } } conn.done(); } catch ( std::exception& e ){ log() << "caught exception while doing balance: " << e.what() << endl; continue; } } } } // namespace mongo