diff options
author | Alberto Lerner <alerner@10gen.com> | 2010-06-05 16:17:59 -0400 |
---|---|---|
committer | Alberto Lerner <alerner@10gen.com> | 2010-06-05 16:17:59 -0400 |
commit | e9ac0d3a9f791ee026112e54b2ddf2da05fb09e2 (patch) | |
tree | 10a847f69bfdae5cde8a766a86f454e521e23ad8 /s/balance.cpp | |
parent | 125e11d87a6a47157e9bd771fb0f516efbf5e92c (diff) | |
download | mongo-e9ac0d3a9f791ee026112e54b2ddf2da05fb09e2.tar.gz |
SERVER-950 Pass space usage and limits information the to balancer policy
Diffstat (limited to 's/balance.cpp')
-rw-r--r-- | s/balance.cpp | 122 |
1 files changed, 112 insertions, 10 deletions
diff --git a/s/balance.cpp b/s/balance.cpp index 7c5de4d732f..16abf7a8b89 100644 --- a/s/balance.cpp +++ b/s/balance.cpp @@ -31,7 +31,7 @@ namespace mongo { Balancer balancer; - Balancer::Balancer() : _policy( new BalancerPolicy ){} + Balancer::Balancer() : _balancedLastTime(0), _policy( new BalancerPolicy ){} Balancer::~Balancer() { delete _policy; @@ -88,11 +88,11 @@ namespace mongo { return _myid == x["who"].String() && incarnation == x["x"].OID(); } - int Balancer::_moveChunks( const vector<BalancerPolicy::ChunkInfoPtr>* toBalance ) { + int Balancer::_moveChunks( const vector<CandidateChunkPtr>* candidateChunks ) { int movedCount = 0; - for ( vector<BalancerPolicy::ChunkInfoPtr>::const_iterator it = toBalance->begin(); it != toBalance->end(); ++it ){ - const BalancerPolicy::ChunkInfo& chunkInfo = *it->get(); + for ( vector<CandidateChunkPtr>::const_iterator it = candidateChunks->begin(); it != candidateChunks->end(); ++it ){ + const CandidateChunk& chunkInfo = *it->get(); DBConfigPtr cfg = grid.getDBConfig( chunkInfo.ns ); assert( cfg ); @@ -181,6 +181,108 @@ namespace mongo { return true; } + void Balancer::_doBalanceRound( DBClientBase& conn, vector<CandidateChunkPtr>* candidateChunks ){ + assert( candidateChunks ); + + log(1) << "balancer: start balancing round" << endl; + + // + // 1. Check whether there is any sharded collection to be balanced by querying + // the ShardsNS::database collection + // + // { "_id" : "test", "partitioned" : true, "primary" : "shard0", + // "sharded" : { + // "test.images" : { "key" : { "_id" : 1 }, "unique" : false }, + // ... + // } + // } + // + + auto_ptr<DBClientCursor> cursor = conn.query( ShardNS::database , BSON( "partitioned" << true ) ); + vector< string > collections; + while ( cursor->more() ){ + BSONObj db = cursor->next(); + + // A database may be partitioned but not yet have a sharded collection. + // 'cursor' will point to docs that do not contain the "sharded" key. Since + // there'd be nothing to balance, we want to skip those here. + + BSONElement shardedColls = db["sharded"]; + if ( shardedColls.eoo() ){ + log(2) << "balancer: skipping database with no sharded collection (" + << db["_id"].str() << ")" << endl; + continue; + } + + BSONObjIterator i( shardedColls.Obj() ); + while ( i.more() ){ + BSONElement e = i.next(); + collections.push_back( e.fieldName() ); + } + } + cursor.reset(); + + if ( collections.empty() ) { + log(1) << "balancer: no collections to balance" << endl; + return; + } + + // + // 2. Get a list of all the shards that are participating in this balance round + // along with any maximum allowed quotas and current utilization. We get the + // latter by issuing db.serverStatus() (mem.mapped) to all shards. + // + // TODO: issue serverStatus() and get the mem.mapped section back. For now, + // let's assume zero usage. + // TODO: skip unresponsive shards and mark information as stale. + // + + vector<Shard> allShards; + Shard::getAllShards( allShards ); + if ( allShards.size() < 2) { + log(1) << "balancer: can't balance without more active shards" << endl; + return; + } + + map< string, BSONObj > shardLimitsMap; + for ( vector<Shard>::const_iterator it = allShards.begin(); it != allShards.end(); ++it ){ + const Shard& s = *it; + BSONObj limitsObj = BSON( "maxSize" << 0 << "currSize" << 0 /* TODO */); + shardLimitsMap[s.getName()] = limitsObj; + } + + // + // 3. For each collection, check if the balancing policy recommends moving anything around. + // + + for (vector<string>::const_iterator it = collections.begin(); it != collections.end(); ++it ) { + const string& ns = *it; + + map< string,vector<BSONObj> > shardToChunksMap; + cursor = conn.query( ShardNS::chunk , QUERY( "ns" << ns ).sort( "min" ) ); + while ( cursor->more() ){ + BSONObj chunk = cursor->next(); + vector<BSONObj>& chunks = shardToChunksMap[chunk["shard"].String()]; + chunks.push_back( chunk.getOwned() ); + } + cursor.reset(); + + if (shardToChunksMap.empty()) { + log(1) << "balancer: skipping empty collection (" << ns << ")"; + continue; + } + + for ( vector<Shard>::iterator i=allShards.begin(); i!=allShards.end(); ++i ){ + // this just makes sure there is an entry in shardToChunksMap for every shard + Shard s = *i; + shardToChunksMap[s.getName()].size(); + } + + CandidateChunk* p = _policy->balance( ns , shardLimitsMap , shardToChunksMap , _balancedLastTime ); + if ( p ) candidateChunks->push_back( CandidateChunkPtr( p ) ); + } + } + void Balancer::run(){ { // init stuff, don't want to do at static init @@ -206,12 +308,13 @@ namespace mongo { uassert( 13258 , "oids broken after resetting!" , _checkOIDs() ); } - vector<BalancerPolicy::ChunkInfoPtr> toBalance; + vector<CandidateChunkPtr> candidateChunks; if ( _shouldIBalance( conn.conn() ) ){ - _policy->balance( conn.conn(), &toBalance ); - if ( toBalance.size() > 0 ) { - int moves = _moveChunks( &toBalance ); - _policy->setBalancedLastTime( moves ); + candidateChunks.clear(); + _doBalanceRound( conn.conn() , &candidateChunks ); + + if ( candidateChunks.size() > 0 ) { + _balancedLastTime = _moveChunks( &candidateChunks ); } } @@ -221,7 +324,6 @@ namespace mongo { log() << "caught exception while doing balance: " << e.what() << endl; continue; } - } } |