author     Alberto Lerner <alerner@10gen.com>    2010-06-05 16:17:59 -0400
committer  Alberto Lerner <alerner@10gen.com>    2010-06-05 16:17:59 -0400
commit     e9ac0d3a9f791ee026112e54b2ddf2da05fb09e2 (patch)
tree       10a847f69bfdae5cde8a766a86f454e521e23ad8 /s/balance.cpp
parent     125e11d87a6a47157e9bd771fb0f516efbf5e92c (diff)
download   mongo-e9ac0d3a9f791ee026112e54b2ddf2da05fb09e2.tar.gz
SERVER-950 Pass space usage and limits information to the balancer policy
Diffstat (limited to 's/balance.cpp')
-rw-r--r--  s/balance.cpp | 122
1 file changed, 112 insertions(+), 10 deletions(-)
diff --git a/s/balance.cpp b/s/balance.cpp
index 7c5de4d732f..16abf7a8b89 100644
--- a/s/balance.cpp
+++ b/s/balance.cpp
@@ -31,7 +31,7 @@ namespace mongo {
Balancer balancer;
- Balancer::Balancer() : _policy( new BalancerPolicy ){}
+ Balancer::Balancer() : _balancedLastTime(0), _policy( new BalancerPolicy ){}
Balancer::~Balancer() {
delete _policy;
@@ -88,11 +88,11 @@ namespace mongo {
return _myid == x["who"].String() && incarnation == x["x"].OID();
}
- int Balancer::_moveChunks( const vector<BalancerPolicy::ChunkInfoPtr>* toBalance ) {
+ int Balancer::_moveChunks( const vector<CandidateChunkPtr>* candidateChunks ) {
int movedCount = 0;
- for ( vector<BalancerPolicy::ChunkInfoPtr>::const_iterator it = toBalance->begin(); it != toBalance->end(); ++it ){
- const BalancerPolicy::ChunkInfo& chunkInfo = *it->get();
+ for ( vector<CandidateChunkPtr>::const_iterator it = candidateChunks->begin(); it != candidateChunks->end(); ++it ){
+ const CandidateChunk& chunkInfo = *it->get();
DBConfigPtr cfg = grid.getDBConfig( chunkInfo.ns );
assert( cfg );
@@ -181,6 +181,108 @@ namespace mongo {
return true;
}
+ void Balancer::_doBalanceRound( DBClientBase& conn, vector<CandidateChunkPtr>* candidateChunks ){
+ assert( candidateChunks );
+
+ log(1) << "balancer: start balancing round" << endl;
+
+ //
+ // 1. Check whether there is any sharded collection to be balanced by querying
+ // the ShardNS::database collection
+ //
+ // { "_id" : "test", "partitioned" : true, "primary" : "shard0",
+ // "sharded" : {
+ // "test.images" : { "key" : { "_id" : 1 }, "unique" : false },
+ // ...
+ // }
+ // }
+ //
+
+ auto_ptr<DBClientCursor> cursor = conn.query( ShardNS::database , BSON( "partitioned" << true ) );
+ vector< string > collections;
+ while ( cursor->more() ){
+ BSONObj db = cursor->next();
+
+ // A database may be partitioned but not yet have a sharded collection.
+ // 'cursor' will point to docs that do not contain the "sharded" key. Since
+ // there'd be nothing to balance, we want to skip those here.
+
+ BSONElement shardedColls = db["sharded"];
+ if ( shardedColls.eoo() ){
+ log(2) << "balancer: skipping database with no sharded collection ("
+ << db["_id"].str() << ")" << endl;
+ continue;
+ }
+
+ BSONObjIterator i( shardedColls.Obj() );
+ while ( i.more() ){
+ BSONElement e = i.next();
+ collections.push_back( e.fieldName() );
+ }
+ }
+ cursor.reset();
+
+ if ( collections.empty() ) {
+ log(1) << "balancer: no collections to balance" << endl;
+ return;
+ }
+
+ //
+ // 2. Get a list of all the shards that are participating in this balance round
+ // along with any maximum allowed quotas and current utilization. We get the
+ // latter by issuing db.serverStatus() (mem.mapped) to all shards.
+ //
+ // TODO: issue serverStatus() and get the mem.mapped section back. For now,
+ // let's assume zero usage.
+ // TODO: skip unresponsive shards and mark information as stale.
+ //
+
+ vector<Shard> allShards;
+ Shard::getAllShards( allShards );
+ if ( allShards.size() < 2 ) {
+ log(1) << "balancer: can't balance without more active shards" << endl;
+ return;
+ }
+
+ map< string, BSONObj > shardLimitsMap;
+ for ( vector<Shard>::const_iterator it = allShards.begin(); it != allShards.end(); ++it ){
+ const Shard& s = *it;
+ BSONObj limitsObj = BSON( "maxSize" << 0 << "currSize" << 0 /* TODO */);
+ shardLimitsMap[s.getName()] = limitsObj;
+ }
+
+ //
+ // 3. For each collection, check if the balancing policy recommends moving anything around.
+ //
+
+ for (vector<string>::const_iterator it = collections.begin(); it != collections.end(); ++it ) {
+ const string& ns = *it;
+
+ map< string,vector<BSONObj> > shardToChunksMap;
+ cursor = conn.query( ShardNS::chunk , QUERY( "ns" << ns ).sort( "min" ) );
+ while ( cursor->more() ){
+ BSONObj chunk = cursor->next();
+ vector<BSONObj>& chunks = shardToChunksMap[chunk["shard"].String()];
+ chunks.push_back( chunk.getOwned() );
+ }
+ cursor.reset();
+
+ if ( shardToChunksMap.empty() ) {
+ log(1) << "balancer: skipping empty collection (" << ns << ")" << endl;
+ continue;
+ }
+
+ for ( vector<Shard>::iterator i=allShards.begin(); i!=allShards.end(); ++i ){
+ // this just makes sure there is an entry in shardToChunksMap for every shard
+ Shard s = *i;
+ shardToChunksMap[s.getName()].size();
+ }
+
+ CandidateChunk* p = _policy->balance( ns , shardLimitsMap , shardToChunksMap , _balancedLastTime );
+ if ( p ) candidateChunks->push_back( CandidateChunkPtr( p ) );
+ }
+ }
+
void Balancer::run(){
{ // init stuff, don't want to do at static init
@@ -206,12 +308,13 @@ namespace mongo {
uassert( 13258 , "oids broken after resetting!" , _checkOIDs() );
}
- vector<BalancerPolicy::ChunkInfoPtr> toBalance;
+ vector<CandidateChunkPtr> candidateChunks;
if ( _shouldIBalance( conn.conn() ) ){
- _policy->balance( conn.conn(), &toBalance );
- if ( toBalance.size() > 0 ) {
- int moves = _moveChunks( &toBalance );
- _policy->setBalancedLastTime( moves );
+ candidateChunks.clear();
+ _doBalanceRound( conn.conn() , &candidateChunks );
+
+ if ( candidateChunks.size() > 0 ) {
+ _balancedLastTime = _moveChunks( &candidateChunks );
}
}
@@ -221,7 +324,6 @@ namespace mongo {
log() << "caught exception while doing balance: " << e.what() << endl;
continue;
}
-
}
}
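
The step-2 TODO above leaves every shard's currSize at zero. As a rough sketch of the serverStatus() round-trip it describes, something along the following lines would do; mappedMegabytes is a hypothetical helper that is not part of this commit, and it assumes serverStatus reports mem.mapped in megabytes:

    #include "client/dbclient.h"   // C++ driver headers of this era (assumed path)

    namespace mongo {

        // Hypothetical helper: ask one shard for serverStatus() and pull out
        // mem.mapped. Returns -1 when the shard is unresponsive or the field
        // is missing, so the caller can skip it and mark its info as stale.
        long long mappedMegabytes( DBClientBase& conn ){
            BSONObj status;
            if ( ! conn.simpleCommand( "admin" , &status , "serverStatus" ) )
                return -1;

            BSONElement mem = status["mem"];
            if ( mem.eoo() )
                return -1;

            BSONElement mapped = mem.Obj()["mapped"];
            return mapped.eoo() ? -1 : mapped.numberLong();
        }

    } // namespace mongo

The shardLimitsMap loop in step 2 could then fill "currSize" with this value instead of the hard-coded zero.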
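
As for what the policy does with those inputs: BalancerPolicy::balance itself is not part of this diff, so the self-contained sketch below only illustrates the kind of decision that shardLimitsMap and shardToChunksMap enable. ShardStats, pickDonor, and the imbalance threshold are made up for illustration; the sketch compiles standalone:

    #include <iostream>
    #include <limits>
    #include <map>
    #include <string>

    // Stand-in for the per-shard data the balancer now collects:
    // shardLimitsMap supplies maxSize/currSize, shardToChunksMap the counts.
    struct ShardStats {
        long long maxSize;   // quota in bytes; 0 means "no quota configured"
        long long currSize;  // current usage (the TODO wires this to mem.mapped)
        size_t numChunks;
        ShardStats() : maxSize(0), currSize(0), numChunks(0) {}
        ShardStats( long long max_ , long long curr_ , size_t n )
            : maxSize(max_), currSize(curr_), numChunks(n) {}
    };

    // Returns the shard that should donate a chunk, or "" if things look
    // balanced enough.
    std::string pickDonor( const std::map<std::string,ShardStats>& shards ,
                           size_t imbalanceThreshold ){
        if ( shards.size() < 2 )
            return "";   // mirrors the "can't balance without more shards" check

        std::string mostLoaded;
        size_t maxChunks = 0;
        size_t minChunks = std::numeric_limits<size_t>::max();

        for ( std::map<std::string,ShardStats>::const_iterator it = shards.begin();
              it != shards.end(); ++it ){
            const ShardStats& s = it->second;

            // a shard at or over its quota donates regardless of chunk counts
            if ( s.maxSize > 0 && s.currSize >= s.maxSize )
                return it->first;

            if ( s.numChunks >= maxChunks ){ maxChunks = s.numChunks; mostLoaded = it->first; }
            if ( s.numChunks < minChunks )  minChunks = s.numChunks;
        }

        // otherwise move a chunk only if the spread is large enough
        return ( maxChunks - minChunks >= imbalanceThreshold ) ? mostLoaded : "";
    }

    int main(){
        std::map<std::string,ShardStats> shards;
        shards["shard0"] = ShardStats( 0 , 0 , 26 );
        shards["shard1"] = ShardStats( 0 , 0 , 2 );
        std::cout << "donor: " << pickDonor( shards , 8 ) << std::endl;   // prints shard0
        return 0;
    }

The point is the early return for a shard at its quota: without the usage and limits information this commit threads through, a policy could only compare chunk counts.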