Diffstat (limited to 'src/mongo/s/balance.cpp')
-rw-r--r--  src/mongo/s/balance.cpp  348
1 file changed, 348 insertions, 0 deletions
diff --git a/src/mongo/s/balance.cpp b/src/mongo/s/balance.cpp
new file mode 100644
index 00000000000..e1c4b65ca0b
--- /dev/null
+++ b/src/mongo/s/balance.cpp
@@ -0,0 +1,348 @@
+// @file balance.cpp
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+
+#include "../db/jsobj.h"
+#include "../db/cmdline.h"
+
+#include "../client/distlock.h"
+
+#include "balance.h"
+#include "server.h"
+#include "shard.h"
+#include "config.h"
+#include "chunk.h"
+#include "grid.h"
+
+namespace mongo {
+
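+ // the single Balancer instance; run() below executes as the body of a background job on mongos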
+ Balancer balancer;
+
+ Balancer::Balancer() : _balancedLastTime(0), _policy( new BalancerPolicy() ) {}
+
+ Balancer::~Balancer() {
+ }
+
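+ // Performs the migrations suggested by the policy. For each candidate chunk, re-resolves the
+ // chunk (reloading the chunk manager if a split moved its boundaries) and asks the donor shard
+ // to move it. Returns the number of chunks moved; a chunk that had to be marked jumbo after a
+ // failed move also counts, so that another round is triggered right away.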
+ int Balancer::_moveChunks( const vector<CandidateChunkPtr>* candidateChunks ) {
+ int movedCount = 0;
+
+ for ( vector<CandidateChunkPtr>::const_iterator it = candidateChunks->begin(); it != candidateChunks->end(); ++it ) {
+ const CandidateChunk& chunkInfo = *it->get();
+
+ DBConfigPtr cfg = grid.getDBConfig( chunkInfo.ns );
+ assert( cfg );
+
+ ChunkManagerPtr cm = cfg->getChunkManager( chunkInfo.ns );
+ assert( cm );
+
+ const BSONObj& chunkToMove = chunkInfo.chunk;
+ ChunkPtr c = cm->findChunk( chunkToMove["min"].Obj() );
+ if ( c->getMin().woCompare( chunkToMove["min"].Obj() ) || c->getMax().woCompare( chunkToMove["max"].Obj() ) ) {
+ // likely a split happened somewhere
+ cm = cfg->getChunkManager( chunkInfo.ns , true /* reload */);
+ assert( cm );
+
+ c = cm->findChunk( chunkToMove["min"].Obj() );
+ if ( c->getMin().woCompare( chunkToMove["min"].Obj() ) || c->getMax().woCompare( chunkToMove["max"].Obj() ) ) {
+ log() << "chunk mismatch after reload, ignoring will retry issue cm: "
+ << c->getMin() << " min: " << chunkToMove["min"].Obj() << endl;
+ continue;
+ }
+ }
+
+ BSONObj res;
+ if ( c->moveAndCommit( Shard::make( chunkInfo.to ) , Chunk::MaxChunkSize , res ) ) {
+ movedCount++;
+ continue;
+ }
+
+ // the move requires acquiring the collection metadata's lock, which can fail
+ log() << "balancer move failed: " << res << " from: " << chunkInfo.from << " to: " << chunkInfo.to
+ << " chunk: " << chunkToMove << endl;
+
+ if ( res["chunkTooBig"].trueValue() ) {
+ // reload just to be safe
+ cm = cfg->getChunkManager( chunkInfo.ns );
+ assert( cm );
+ c = cm->findChunk( chunkToMove["min"].Obj() );
+
+ log() << "forcing a split because migrate failed for size reasons" << endl;
+
+ res = BSONObj();
+ c->singleSplit( true , res );
+ log() << "forced split results: " << res << endl;
+
+ if ( ! res["ok"].trueValue() ) {
+ log() << "marking chunk as jumbo: " << c->toString() << endl;
+ c->markAsJumbo();
+ // increment movedCount so another round is done right away
+ movedCount++;
+ }
+
+ }
+ }
+
+ return movedCount;
+ }
+
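+ // Heartbeat: upserts this balancer's document in the config servers' mongos collection with
+ // the current ping time and uptime, using an unacknowledged write so the ping never blocks.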
+ void Balancer::_ping( DBClientBase& conn ) {
+ WriteConcern w = conn.getWriteConcern();
+ conn.setWriteConcern( W_NONE );
+
+ conn.update( ShardNS::mongos ,
+ BSON( "_id" << _myid ) ,
+ BSON( "$set" << BSON( "ping" << DATENOW << "up" << (int)(time(0)-_started) ) ) ,
+ true );
+
+ conn.setWriteConcern( w );
+ }
+
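+ // Verifies that no two shards report the same OID "machine" identifier. On a clash, asks both
+ // shards to reset their machine ids and returns false.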
+ bool Balancer::_checkOIDs() {
+ vector<Shard> all;
+ Shard::getAllShards( all );
+
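+ // machine id -> first shard seen reporting that id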
+ map<int,Shard> oids;
+
+ for ( vector<Shard>::iterator i=all.begin(); i!=all.end(); ++i ) {
+ Shard s = *i;
+ BSONObj f = s.runCommand( "admin" , "features" );
+ if ( f["oidMachine"].isNumber() ) {
+ int x = f["oidMachine"].numberInt();
+ if ( oids.count(x) == 0 ) {
+ oids[x] = s;
+ }
+ else {
+ log() << "error: 2 machines have " << x << " as oid machine piece " << s.toString() << " and " << oids[x].toString() << endl;
+ s.runCommand( "admin" , BSON( "features" << 1 << "oidReset" << 1 ) );
+ oids[x].runCommand( "admin" , BSON( "features" << 1 << "oidReset" << 1 ) );
+ return false;
+ }
+ }
+ else {
+ log() << "warning: oidMachine not set on: " << s.toString() << endl;
+ }
+ }
+ return true;
+ }
+
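+ // Computes this round's candidate migrations: lists all sharded collections, gathers each
+ // shard's limits and current utilization, and asks the policy for at most one chunk to move
+ // per collection.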
+ void Balancer::_doBalanceRound( DBClientBase& conn, vector<CandidateChunkPtr>* candidateChunks ) {
+ assert( candidateChunks );
+
+ //
+ // 1. Check whether there is any sharded collection to be balanced by querying
+ // the ShardNS::collection collection
+ //
+
+ auto_ptr<DBClientCursor> cursor = conn.query( ShardNS::collection , BSONObj() );
+ vector< string > collections;
+ while ( cursor->more() ) {
+ BSONObj col = cursor->nextSafe();
+
+ // sharded collections will have a shard "key".
+ if ( ! col["key"].eoo() )
+ collections.push_back( col["_id"].String() );
+ }
+ cursor.reset();
+
+ if ( collections.empty() ) {
+ LOG(1) << "no collections to balance" << endl;
+ return;
+ }
+
+ //
+ // 2. Get a list of all the shards that are participating in this balance round
+ // along with any maximum allowed quotas and current utilization. We get the
+ // latter by issuing db.serverStatus() (mem.mapped) to all shards.
+ //
+ // TODO: skip unresponsive shards and mark information as stale.
+ //
+
+ vector<Shard> allShards;
+ Shard::getAllShards( allShards );
+ if ( allShards.size() < 2) {
+ LOG(1) << "can't balance without more active shards" << endl;
+ return;
+ }
+
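+ // shard name -> limits/utilization info handed to the policy (max size, mapped memory, draining flag, queued ops)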
+ map< string, BSONObj > shardLimitsMap;
+ for ( vector<Shard>::const_iterator it = allShards.begin(); it != allShards.end(); ++it ) {
+ const Shard& s = *it;
+ ShardStatus status = s.getStatus();
+
+ BSONObj limitsObj = BSON( ShardFields::maxSize( s.getMaxSize() ) <<
+ LimitsFields::currSize( status.mapped() ) <<
+ ShardFields::draining( s.isDraining() ) <<
+ LimitsFields::hasOpsQueued( status.hasOpsQueued() )
+ );
+
+ shardLimitsMap[ s.getName() ] = limitsObj;
+ }
+
+ //
+ // 3. For each collection, check if the balancing policy recommends moving anything around.
+ //
+
+ for (vector<string>::const_iterator it = collections.begin(); it != collections.end(); ++it ) {
+ const string& ns = *it;
+
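+ // shard name -> this collection's chunk documents on that shard, jumbo chunks excluded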
+ map< string,vector<BSONObj> > shardToChunksMap;
+ cursor = conn.query( ShardNS::chunk , QUERY( "ns" << ns ).sort( "min" ) );
+ while ( cursor->more() ) {
+ BSONObj chunk = cursor->nextSafe();
+ if ( chunk["jumbo"].trueValue() )
+ continue;
+ vector<BSONObj>& chunks = shardToChunksMap[chunk["shard"].String()];
+ chunks.push_back( chunk.getOwned() );
+ }
+ cursor.reset();
+
+ if (shardToChunksMap.empty()) {
+ LOG(1) << "skipping empty collection (" << ns << ")";
+ continue;
+ }
+
+ for ( vector<Shard>::iterator i=allShards.begin(); i!=allShards.end(); ++i ) {
+ // this just makes sure there is an entry in shardToChunksMap for every shard
+ Shard s = *i;
+ shardToChunksMap[s.getName()].size();
+ }
+
+ CandidateChunk* p = _policy->balance( ns , shardLimitsMap , shardToChunksMap , _balancedLastTime );
+ if ( p ) candidateChunks->push_back( CandidateChunkPtr( p ) );
+ }
+ }
+
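+ // One-time startup work: contacts the config servers and shards, verifies OIDs, and records
+ // this balancer's id (host:port) and start time. Returns false if anything was unreachable.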
+ bool Balancer::_init() {
+ try {
+
+ log() << "about to contact config servers and shards" << endl;
+
+ // contact the config server and refresh shard information
+ // checks that each shard is indeed a different process (no hostname mixup)
+ // these checks are redundant in that they're redone at every new round, but we want to
+ // do them on startup as well, so as to catch any problem early
+ Shard::reloadShardInfo();
+ _checkOIDs();
+
+ log() << "config servers and shards contacted successfully" << endl;
+
+ StringBuilder buf;
+ buf << getHostNameCached() << ":" << cmdLine.port;
+ _myid = buf.str();
+ _started = time(0);
+
+ log() << "balancer id: " << _myid << " started at " << time_t_to_String_short(_started) << endl;
+
+ return true;
+
+ }
+ catch ( std::exception& e ) {
+ warning() << "could not initialize balancer, please check that all shards and config servers are up: " << e.what() << endl;
+ return false;
+
+ }
+ }
+
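+ // Background job body: retries _init() every minute until it succeeds, then loops forever:
+ // ping the config server, check that balancing is enabled, take the distributed "balancer"
+ // lock, and perform one balancing round.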
+ void Balancer::run() {
+
+ // this is the body of a BackgroundJob, so throwing here would end the balancer thread prematurely
+ while ( ! inShutdown() ) {
+
+ if ( ! _init() ) {
+ log() << "will retry to initialize balancer in one minute" << endl;
+ sleepsecs( 60 );
+ continue;
+ }
+
+ break;
+ }
+
+ // getConnectionString and the DistributedLock constructor do not throw, which is what we
+ // expect while on the balancer thread
+ ConnectionString config = configServer.getConnectionString();
+ DistributedLock balanceLock( config , "balancer" );
+
+ while ( ! inShutdown() ) {
+
+ try {
+
+ ScopedDbConnection conn( config );
+
+ // ping has to be first so we keep things in the config server in sync
+ _ping( conn.conn() );
+
+ // now make sure we should even be running
+ if ( ! grid.shouldBalance() ) {
+ LOG(1) << "skipping balancing round because balancing is disabled" << endl;
+ conn.done();
+
+ sleepsecs( 30 );
+ continue;
+ }
+
+ uassert( 13258 , "oids broken after resetting!" , _checkOIDs() );
+
+ // use fresh shard state
+ Shard::reloadShardInfo();
+
+ // refresh chunk size (even though another balancer might be active)
+ Chunk::refreshChunkSize();
+
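+ // scope the distributed lock to just this balancing round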
+ {
+ dist_lock_try lk( &balanceLock , "doing balance round" );
+ if ( ! lk.got() ) {
+ LOG(1) << "skipping balancing round because another balancer is active" << endl;
+ conn.done();
+
+ sleepsecs( 30 ); // no need to wake up soon
+ continue;
+ }
+
+ LOG(1) << "*** start balancing round" << endl;
+
+ vector<CandidateChunkPtr> candidateChunks;
+ _doBalanceRound( conn.conn() , &candidateChunks );
+ if ( candidateChunks.size() == 0 ) {
+ LOG(1) << "no need to move any chunk" << endl;
+ }
+ else {
+ _balancedLastTime = _moveChunks( &candidateChunks );
+ }
+
+ LOG(1) << "*** end of balancing round" << endl;
+ }
+
+ conn.done();
+
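+ // if chunks were moved last round, come back sooner to keep working off the imbalance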
+ sleepsecs( _balancedLastTime ? 5 : 10 );
+ }
+ catch ( std::exception& e ) {
+ log() << "caught exception while doing balance: " << e.what() << endl;
+
+ // just to match the opening statement, if at log level 1
+ LOG(1) << "*** end of balancing round" << endl;
+
+ sleepsecs( 30 ); // sleep a fair amount b/c of error
+ continue;
+ }
+ }
+
+ }
+
+} // namespace mongo