Diffstat (limited to 'src/mongo/s/balance.cpp')
-rw-r--r-- | src/mongo/s/balance.cpp | 1023
1 file changed, 500 insertions, 523 deletions
diff --git a/src/mongo/s/balance.cpp b/src/mongo/s/balance.cpp
index 9203f913656..fd78d6195fe 100644
--- a/src/mongo/s/balance.cpp
+++ b/src/mongo/s/balance.cpp
@@ -61,652 +61,629 @@ namespace mongo {
-    using boost::scoped_ptr;
-    using std::auto_ptr;
-    using std::endl;
-    using std::map;
-    using std::set;
-    using std::string;
-    using std::vector;
-
-    MONGO_FP_DECLARE(skipBalanceRound);
-
-    Balancer balancer;
-
-    Balancer::Balancer() : _balancedLastTime(0), _policy( new BalancerPolicy() ) {}
-
-    Balancer::~Balancer() {
-    }
-
-    int Balancer::_moveChunks(const vector<CandidateChunkPtr>* candidateChunks,
-                              const WriteConcernOptions* writeConcern,
-                              bool waitForDelete)
-    {
-        int movedCount = 0;
-
-        for ( vector<CandidateChunkPtr>::const_iterator it = candidateChunks->begin(); it != candidateChunks->end(); ++it ) {
+using boost::scoped_ptr;
+using std::auto_ptr;
+using std::endl;
+using std::map;
+using std::set;
+using std::string;
+using std::vector;
+
+MONGO_FP_DECLARE(skipBalanceRound);
+
+Balancer balancer;
+
+Balancer::Balancer() : _balancedLastTime(0), _policy(new BalancerPolicy()) {}
+
+Balancer::~Balancer() {}
+
+int Balancer::_moveChunks(const vector<CandidateChunkPtr>* candidateChunks,
+                          const WriteConcernOptions* writeConcern,
+                          bool waitForDelete) {
+    int movedCount = 0;
+
+    for (vector<CandidateChunkPtr>::const_iterator it = candidateChunks->begin();
+         it != candidateChunks->end();
+         ++it) {
+        // If the balancer was disabled since we started this round, don't start new
+        // chunks moves.
+        SettingsType balancerConfig;
+        std::string errMsg;
+
+        if (!grid.getBalancerSettings(&balancerConfig, &errMsg)) {
+            warning() << errMsg;
+            // No point in continuing the round if the config servers are unreachable.
+            return movedCount;
+        }
-            // If the balancer was disabled since we started this round, don't start new
-            // chunks moves.
-            SettingsType balancerConfig;
-            std::string errMsg;
+        if ((balancerConfig.isKeySet() && // balancer config doc exists
+             !grid.shouldBalance(balancerConfig)) ||
+            MONGO_FAIL_POINT(skipBalanceRound)) {
+            LOG(1) << "Stopping balancing round early as balancing was disabled";
+            return movedCount;
+        }
-            if (!grid.getBalancerSettings(&balancerConfig, &errMsg)) {
-                warning() << errMsg;
-                // No point in continuing the round if the config servers are unreachable.
-                return movedCount;
+        // Changes to metadata, borked metadata, and connectivity problems between shards should
+        // cause us to abort this chunk move, but shouldn't cause us to abort the entire round
+        // of chunks.
+        // TODO(spencer): We probably *should* abort the whole round on issues communicating
+        // with the config servers, but its impossible to distinguish those types of failures
+        // at the moment.
+        // TODO: Handle all these things more cleanly, since they're expected problems
+        const CandidateChunk& chunkInfo = *it->get();
+        try {
+            DBConfigPtr cfg = grid.getDBConfig(chunkInfo.ns);
+            verify(cfg);
+
+            // NOTE: We purposely do not reload metadata here, since _doBalanceRound already
+            // tried to do so once.
+ ChunkManagerPtr cm = cfg->getChunkManager(chunkInfo.ns); + verify(cm); + + ChunkPtr c = cm->findIntersectingChunk(chunkInfo.chunk.min); + if (c->getMin().woCompare(chunkInfo.chunk.min) || + c->getMax().woCompare(chunkInfo.chunk.max)) { + // likely a split happened somewhere + cm = cfg->getChunkManager(chunkInfo.ns, true /* reload */); + verify(cm); + + c = cm->findIntersectingChunk(chunkInfo.chunk.min); + if (c->getMin().woCompare(chunkInfo.chunk.min) || + c->getMax().woCompare(chunkInfo.chunk.max)) { + log() << "chunk mismatch after reload, ignoring will retry issue " + << chunkInfo.chunk.toString() << endl; + continue; + } } - if ((balancerConfig.isKeySet() && // balancer config doc exists - !grid.shouldBalance(balancerConfig)) || - MONGO_FAIL_POINT(skipBalanceRound)) { - LOG(1) << "Stopping balancing round early as balancing was disabled"; - return movedCount; + BSONObj res; + if (c->moveAndCommit(Shard::make(chunkInfo.to), + Chunk::MaxChunkSize, + writeConcern, + waitForDelete, + 0, /* maxTimeMS */ + res)) { + movedCount++; + continue; } - // Changes to metadata, borked metadata, and connectivity problems between shards should - // cause us to abort this chunk move, but shouldn't cause us to abort the entire round - // of chunks. - // TODO(spencer): We probably *should* abort the whole round on issues communicating - // with the config servers, but its impossible to distinguish those types of failures - // at the moment. - // TODO: Handle all these things more cleanly, since they're expected problems - const CandidateChunk& chunkInfo = *it->get(); - try { - - DBConfigPtr cfg = grid.getDBConfig( chunkInfo.ns ); - verify( cfg ); + // the move requires acquiring the collection metadata's lock, which can fail + log() << "balancer move failed: " << res << " from: " << chunkInfo.from + << " to: " << chunkInfo.to << " chunk: " << chunkInfo.chunk << endl; - // NOTE: We purposely do not reload metadata here, since _doBalanceRound already - // tried to do so once. 
- ChunkManagerPtr cm = cfg->getChunkManager( chunkInfo.ns ); - verify( cm ); + if (res["chunkTooBig"].trueValue()) { + // reload just to be safe + cm = cfg->getChunkManager(chunkInfo.ns); + verify(cm); + c = cm->findIntersectingChunk(chunkInfo.chunk.min); - ChunkPtr c = cm->findIntersectingChunk( chunkInfo.chunk.min ); - if ( c->getMin().woCompare( chunkInfo.chunk.min ) || c->getMax().woCompare( chunkInfo.chunk.max ) ) { - // likely a split happened somewhere - cm = cfg->getChunkManager( chunkInfo.ns , true /* reload */); - verify( cm ); + log() << "performing a split because migrate failed for size reasons"; - c = cm->findIntersectingChunk( chunkInfo.chunk.min ); - if ( c->getMin().woCompare( chunkInfo.chunk.min ) || c->getMax().woCompare( chunkInfo.chunk.max ) ) { - log() << "chunk mismatch after reload, ignoring will retry issue " << chunkInfo.chunk.toString() << endl; - continue; - } - } + Status status = c->split(Chunk::normal, NULL, NULL); + log() << "split results: " << status << endl; - BSONObj res; - if (c->moveAndCommit(Shard::make(chunkInfo.to), - Chunk::MaxChunkSize, - writeConcern, - waitForDelete, - 0, /* maxTimeMS */ - res)) { + if (!status.isOK()) { + log() << "marking chunk as jumbo: " << c->toString() << endl; + c->markAsJumbo(); + // we increment moveCount so we do another round right away movedCount++; - continue; - } - - // the move requires acquiring the collection metadata's lock, which can fail - log() << "balancer move failed: " << res << " from: " << chunkInfo.from << " to: " << chunkInfo.to - << " chunk: " << chunkInfo.chunk << endl; - - if ( res["chunkTooBig"].trueValue() ) { - // reload just to be safe - cm = cfg->getChunkManager( chunkInfo.ns ); - verify( cm ); - c = cm->findIntersectingChunk( chunkInfo.chunk.min ); - - log() << "performing a split because migrate failed for size reasons"; - - Status status = c->split(Chunk::normal, NULL, NULL); - log() << "split results: " << status << endl; - - if ( !status.isOK() ) { - log() << "marking chunk as jumbo: " << c->toString() << endl; - c->markAsJumbo(); - // we increment moveCount so we do another round right away - movedCount++; - } - } } - catch( const DBException& ex ) { - warning() << "could not move chunk " << chunkInfo.chunk.toString() - << ", continuing balancing round" << causedBy( ex ) << endl; - } + } catch (const DBException& ex) { + warning() << "could not move chunk " << chunkInfo.chunk.toString() + << ", continuing balancing round" << causedBy(ex) << endl; } - - return movedCount; } - void Balancer::_ping( bool waiting ) { - clusterUpdate( MongosType::ConfigNS, - BSON( MongosType::name( _myid )), - BSON( "$set" << BSON( MongosType::ping(jsTime()) << - MongosType::up(static_cast<int>(time(0)-_started)) << - MongosType::waiting(waiting) << - MongosType::mongoVersion(versionString) )), - true, // upsert - false, // multi - WriteConcernOptions::Unacknowledged, - NULL ); - } + return movedCount; +} + +void Balancer::_ping(bool waiting) { + clusterUpdate(MongosType::ConfigNS, + BSON(MongosType::name(_myid)), + BSON("$set" << BSON(MongosType::ping(jsTime()) + << MongosType::up(static_cast<int>(time(0) - _started)) + << MongosType::waiting(waiting) + << MongosType::mongoVersion(versionString))), + true, // upsert + false, // multi + WriteConcernOptions::Unacknowledged, + NULL); +} + +/* +* Builds the details object for the actionlog. 
+* Current formats for detail are: +* Success: { +* "candidateChunks" : , +* "chunksMoved" : , +* "executionTimeMillis" : , +* "errorOccured" : false +* } +* Failure: { +* "executionTimeMillis" : , +* "errmsg" : , +* "errorOccured" : true +* } +* @param didError, did this round end in an error? +* @param executionTime, the time this round took to run +* @param candidateChunks, the number of chunks identified to be moved +* @param chunksMoved, the number of chunks moved +* @param errmsg, the error message for this round +*/ - /* - * Builds the details object for the actionlog. - * Current formats for detail are: - * Success: { - * "candidateChunks" : , - * "chunksMoved" : , - * "executionTimeMillis" : , - * "errorOccured" : false - * } - * Failure: { - * "executionTimeMillis" : , - * "errmsg" : , - * "errorOccured" : true - * } - * @param didError, did this round end in an error? - * @param executionTime, the time this round took to run - * @param candidateChunks, the number of chunks identified to be moved - * @param chunksMoved, the number of chunks moved - * @param errmsg, the error message for this round - */ - - static BSONObj _buildDetails( bool didError, int executionTime, - int candidateChunks, int chunksMoved, const std::string& errmsg ) { - - BSONObjBuilder builder; - builder.append("executionTimeMillis", executionTime); - builder.append("errorOccured", didError); - - if ( didError ) { - builder.append("errmsg", errmsg); - } else { - builder.append("candidateChunks", candidateChunks); - builder.append("chunksMoved", chunksMoved); - } - return builder.obj(); +static BSONObj _buildDetails(bool didError, + int executionTime, + int candidateChunks, + int chunksMoved, + const std::string& errmsg) { + BSONObjBuilder builder; + builder.append("executionTimeMillis", executionTime); + builder.append("errorOccured", didError); + + if (didError) { + builder.append("errmsg", errmsg); + } else { + builder.append("candidateChunks", candidateChunks); + builder.append("chunksMoved", chunksMoved); } + return builder.obj(); +} - /** - * Reports the result of the balancer round into config.actionlog - * - * @param actionLog, which contains the balancer round information to be logged - * - */ - - static void _reportRound( ActionLogType& actionLog) { - try { - ScopedDbConnection conn( configServer.getConnectionString(), 30 ); - - // send a copy of the message to the log in case it doesn't reach config.actionlog - actionLog.setTime(jsTime()); - - LOG(1) << "about to log balancer result: " << actionLog; - - // The following method is not thread safe. However, there is only one balancer - // thread per mongos process. The create collection is a a no-op when the collection - // already exists - static bool createActionlog = false; - if ( ! 
createActionlog ) { - try { - static const int actionLogSizeBytes = 1024 * 1024 * 2; - conn->createCollection( ActionLogType::ConfigNS , actionLogSizeBytes , true ); - } - catch ( const DBException& ex ) { - LOG(1) << "config.actionlog could not be created, another mongos process " - << "may have done so" << causedBy(ex); - - } - createActionlog = true; - } - - Status result = clusterInsert( ActionLogType::ConfigNS, - actionLog.toBSON(), - WriteConcernOptions::AllConfigs, - NULL ); - - if ( !result.isOK() ) { - log() << "Error encountered while logging action from balancer " - << result.reason(); +/** + * Reports the result of the balancer round into config.actionlog + * + * @param actionLog, which contains the balancer round information to be logged + * + */ + +static void _reportRound(ActionLogType& actionLog) { + try { + ScopedDbConnection conn(configServer.getConnectionString(), 30); + + // send a copy of the message to the log in case it doesn't reach config.actionlog + actionLog.setTime(jsTime()); + + LOG(1) << "about to log balancer result: " << actionLog; + + // The following method is not thread safe. However, there is only one balancer + // thread per mongos process. The create collection is a a no-op when the collection + // already exists + static bool createActionlog = false; + if (!createActionlog) { + try { + static const int actionLogSizeBytes = 1024 * 1024 * 2; + conn->createCollection(ActionLogType::ConfigNS, actionLogSizeBytes, true); + } catch (const DBException& ex) { + LOG(1) << "config.actionlog could not be created, another mongos process " + << "may have done so" << causedBy(ex); } - - conn.done(); - } - catch ( const DBException& ex ) { - // if we got here, it means the config change is only in the log; - // the change didn't make it to config.actionlog - warning() << "could not log balancer result" << causedBy(ex); + createActionlog = true; } - } - bool Balancer::_checkOIDs() { - vector<Shard> all; - Shard::getAllShards( all ); + Status result = clusterInsert( + ActionLogType::ConfigNS, actionLog.toBSON(), WriteConcernOptions::AllConfigs, NULL); - map<int,Shard> oids; - - for ( vector<Shard>::iterator i=all.begin(); i!=all.end(); ++i ) { - Shard s = *i; - BSONObj f = s.runCommand( "admin" , "features" ); - if ( f["oidMachine"].isNumber() ) { - int x = f["oidMachine"].numberInt(); - if ( oids.count(x) == 0 ) { - oids[x] = s; - } - else { - log() << "error: 2 machines have " << x << " as oid machine piece " << s.toString() << " and " << oids[x].toString() << endl; - s.runCommand( "admin" , BSON( "features" << 1 << "oidReset" << 1 ) ); - oids[x].runCommand( "admin" , BSON( "features" << 1 << "oidReset" << 1 ) ); - return false; - } - } - else { - log() << "warning: oidMachine not set on: " << s.toString() << endl; - } + if (!result.isOK()) { + log() << "Error encountered while logging action from balancer " << result.reason(); } - return true; + + conn.done(); + } catch (const DBException& ex) { + // if we got here, it means the config change is only in the log; + // the change didn't make it to config.actionlog + warning() << "could not log balancer result" << causedBy(ex); } - - /** - * Occasionally prints a log message with shard versions if the versions are not the same - * in the cluster. 
- */ - void warnOnMultiVersion( const ShardInfoMap& shardInfo ) { - - bool isMultiVersion = false; - for ( ShardInfoMap::const_iterator i = shardInfo.begin(); i != shardInfo.end(); ++i ) { - if ( !isSameMajorVersion( i->second.getMongoVersion().c_str() ) ) { - isMultiVersion = true; - break; +} + +bool Balancer::_checkOIDs() { + vector<Shard> all; + Shard::getAllShards(all); + + map<int, Shard> oids; + + for (vector<Shard>::iterator i = all.begin(); i != all.end(); ++i) { + Shard s = *i; + BSONObj f = s.runCommand("admin", "features"); + if (f["oidMachine"].isNumber()) { + int x = f["oidMachine"].numberInt(); + if (oids.count(x) == 0) { + oids[x] = s; + } else { + log() << "error: 2 machines have " << x << " as oid machine piece " << s.toString() + << " and " << oids[x].toString() << endl; + s.runCommand("admin", BSON("features" << 1 << "oidReset" << 1)); + oids[x].runCommand("admin", BSON("features" << 1 << "oidReset" << 1)); + return false; } + } else { + log() << "warning: oidMachine not set on: " << s.toString() << endl; } + } + return true; +} - // If we're all the same version, don't message - if ( !isMultiVersion ) return; - - warning() << "multiVersion cluster detected, my version is " << versionString << endl; - for ( ShardInfoMap::const_iterator i = shardInfo.begin(); i != shardInfo.end(); ++i ) { - log() << i->first << " is at version " << i->second.getMongoVersion() << endl; - } +/** + * Occasionally prints a log message with shard versions if the versions are not the same + * in the cluster. + */ +void warnOnMultiVersion(const ShardInfoMap& shardInfo) { + bool isMultiVersion = false; + for (ShardInfoMap::const_iterator i = shardInfo.begin(); i != shardInfo.end(); ++i) { + if (!isSameMajorVersion(i->second.getMongoVersion().c_str())) { + isMultiVersion = true; + break; + } } - void Balancer::_doBalanceRound( DBClientBase& conn, vector<CandidateChunkPtr>* candidateChunks ) { - verify( candidateChunks ); + // If we're all the same version, don't message + if (!isMultiVersion) + return; - // - // 1. Check whether there is any sharded collection to be balanced by querying - // the ShardsNS::collections collection - // + warning() << "multiVersion cluster detected, my version is " << versionString << endl; + for (ShardInfoMap::const_iterator i = shardInfo.begin(); i != shardInfo.end(); ++i) { + log() << i->first << " is at version " << i->second.getMongoVersion() << endl; + } +} - auto_ptr<DBClientCursor> cursor = conn.query(CollectionType::ConfigNS, BSONObj()); +void Balancer::_doBalanceRound(DBClientBase& conn, vector<CandidateChunkPtr>* candidateChunks) { + verify(candidateChunks); - if ( NULL == cursor.get() ) { - warning() << "could not query " << CollectionType::ConfigNS - << " while trying to balance" << endl; - return; - } + // + // 1. Check whether there is any sharded collection to be balanced by querying + // the ShardsNS::collections collection + // - vector< string > collections; - while ( cursor->more() ) { - BSONObj col = cursor->nextSafe(); + auto_ptr<DBClientCursor> cursor = conn.query(CollectionType::ConfigNS, BSONObj()); - // sharded collections will have a shard "key". - if ( ! col[CollectionType::keyPattern()].eoo() && - ! 
col[CollectionType::noBalance()].trueValue() ){ - collections.push_back( col[CollectionType::ns()].String() ); - } - else if( col[CollectionType::noBalance()].trueValue() ){ - LOG(1) << "not balancing collection " << col[CollectionType::ns()].String() - << ", explicitly disabled" << endl; - } - - } - cursor.reset(); + if (NULL == cursor.get()) { + warning() << "could not query " << CollectionType::ConfigNS << " while trying to balance" + << endl; + return; + } - if ( collections.empty() ) { - LOG(1) << "no collections to balance" << endl; - return; + vector<string> collections; + while (cursor->more()) { + BSONObj col = cursor->nextSafe(); + + // sharded collections will have a shard "key". + if (!col[CollectionType::keyPattern()].eoo() && + !col[CollectionType::noBalance()].trueValue()) { + collections.push_back(col[CollectionType::ns()].String()); + } else if (col[CollectionType::noBalance()].trueValue()) { + LOG(1) << "not balancing collection " << col[CollectionType::ns()].String() + << ", explicitly disabled" << endl; } + } + cursor.reset(); - // - // 2. Get a list of all the shards that are participating in this balance round - // along with any maximum allowed quotas and current utilization. We get the - // latter by issuing db.serverStatus() (mem.mapped) to all shards. - // - // TODO: skip unresponsive shards and mark information as stale. - // - - ShardInfoMap shardInfo; - Status loadStatus = DistributionStatus::populateShardInfoMap(&shardInfo); + if (collections.empty()) { + LOG(1) << "no collections to balance" << endl; + return; + } - if (!loadStatus.isOK()) { - warning() << "failed to load shard metadata" << causedBy(loadStatus); - return; - } + // + // 2. Get a list of all the shards that are participating in this balance round + // along with any maximum allowed quotas and current utilization. We get the + // latter by issuing db.serverStatus() (mem.mapped) to all shards. + // + // TODO: skip unresponsive shards and mark information as stale. + // - if (shardInfo.size() < 2) { - LOG(1) << "can't balance without more active shards"; - return; - } - - OCCASIONALLY warnOnMultiVersion( shardInfo ); + ShardInfoMap shardInfo; + Status loadStatus = DistributionStatus::populateShardInfoMap(&shardInfo); - // - // 3. For each collection, check if the balancing policy recommends moving anything around. - // + if (!loadStatus.isOK()) { + warning() << "failed to load shard metadata" << causedBy(loadStatus); + return; + } - for (vector<string>::const_iterator it = collections.begin(); it != collections.end(); ++it ) { - const string& ns = *it; + if (shardInfo.size() < 2) { + LOG(1) << "can't balance without more active shards"; + return; + } - OwnedPointerMap<string, OwnedPointerVector<ChunkType> > shardToChunksMap; - cursor = conn.query(ChunkType::ConfigNS, - QUERY(ChunkType::ns(ns)).sort(ChunkType::min())); + OCCASIONALLY warnOnMultiVersion(shardInfo); - set<BSONObj> allChunkMinimums; + // + // 3. For each collection, check if the balancing policy recommends moving anything around. 
+ // - while ( cursor->more() ) { - BSONObj chunkDoc = cursor->nextSafe().getOwned(); + for (vector<string>::const_iterator it = collections.begin(); it != collections.end(); ++it) { + const string& ns = *it; - auto_ptr<ChunkType> chunk(new ChunkType()); - string errmsg; - if (!chunk->parseBSON(chunkDoc, &errmsg)) { - error() << "bad chunk format for " << chunkDoc - << ": " << errmsg << endl; - return; - } + OwnedPointerMap<string, OwnedPointerVector<ChunkType>> shardToChunksMap; + cursor = conn.query(ChunkType::ConfigNS, QUERY(ChunkType::ns(ns)).sort(ChunkType::min())); - allChunkMinimums.insert(chunk->getMin().getOwned()); - OwnedPointerVector<ChunkType>*& chunkList = - shardToChunksMap.mutableMap()[chunk->getShard()]; + set<BSONObj> allChunkMinimums; - if (chunkList == NULL) { - chunkList = new OwnedPointerVector<ChunkType>(); - } + while (cursor->more()) { + BSONObj chunkDoc = cursor->nextSafe().getOwned(); - chunkList->mutableVector().push_back(chunk.release()); + auto_ptr<ChunkType> chunk(new ChunkType()); + string errmsg; + if (!chunk->parseBSON(chunkDoc, &errmsg)) { + error() << "bad chunk format for " << chunkDoc << ": " << errmsg << endl; + return; } - cursor.reset(); - if (shardToChunksMap.map().empty()) { - LOG(1) << "skipping empty collection (" << ns << ")"; - continue; + allChunkMinimums.insert(chunk->getMin().getOwned()); + OwnedPointerVector<ChunkType>*& chunkList = + shardToChunksMap.mutableMap()[chunk->getShard()]; + + if (chunkList == NULL) { + chunkList = new OwnedPointerVector<ChunkType>(); } - for (ShardInfoMap::const_iterator i = shardInfo.begin(); i != shardInfo.end(); ++i) { - // this just makes sure there is an entry in shardToChunksMap for every shard - OwnedPointerVector<ChunkType>*& chunkList = - shardToChunksMap.mutableMap()[i->first]; + chunkList->mutableVector().push_back(chunk.release()); + } + cursor.reset(); - if (chunkList == NULL) { - chunkList = new OwnedPointerVector<ChunkType>(); - } - } + if (shardToChunksMap.map().empty()) { + LOG(1) << "skipping empty collection (" << ns << ")"; + continue; + } - DistributionStatus status(shardInfo, shardToChunksMap.map()); + for (ShardInfoMap::const_iterator i = shardInfo.begin(); i != shardInfo.end(); ++i) { + // this just makes sure there is an entry in shardToChunksMap for every shard + OwnedPointerVector<ChunkType>*& chunkList = shardToChunksMap.mutableMap()[i->first]; - // load tags - cursor = conn.query(TagsType::ConfigNS, - QUERY(TagsType::ns(ns)).sort(TagsType::min())); + if (chunkList == NULL) { + chunkList = new OwnedPointerVector<ChunkType>(); + } + } - vector<TagRange> ranges; + DistributionStatus status(shardInfo, shardToChunksMap.map()); - while ( cursor->more() ) { - BSONObj tag = cursor->nextSafe(); - TagRange tr(tag[TagsType::min()].Obj().getOwned(), - tag[TagsType::max()].Obj().getOwned(), - tag[TagsType::tag()].String()); - ranges.push_back(tr); - uassert(16356, - str::stream() << "tag ranges not valid for: " << ns, - status.addTagRange(tr) ); + // load tags + cursor = conn.query(TagsType::ConfigNS, QUERY(TagsType::ns(ns)).sort(TagsType::min())); - } - cursor.reset(); + vector<TagRange> ranges; - DBConfigPtr cfg = grid.getDBConfig( ns ); - if ( !cfg ) { - warning() << "could not load db config to balance " << ns << " collection" << endl; - continue; - } + while (cursor->more()) { + BSONObj tag = cursor->nextSafe(); + TagRange tr(tag[TagsType::min()].Obj().getOwned(), + tag[TagsType::max()].Obj().getOwned(), + tag[TagsType::tag()].String()); + ranges.push_back(tr); + uassert( + 16356, 
str::stream() << "tag ranges not valid for: " << ns, status.addTagRange(tr)); + } + cursor.reset(); - // This line reloads the chunk manager once if this process doesn't know the collection - // is sharded yet. - ChunkManagerPtr cm = cfg->getChunkManagerIfExists( ns, true ); - if ( !cm ) { - warning() << "could not load chunks to balance " << ns << " collection" << endl; - continue; - } + DBConfigPtr cfg = grid.getDBConfig(ns); + if (!cfg) { + warning() << "could not load db config to balance " << ns << " collection" << endl; + continue; + } - // loop through tags to make sure no chunk spans tags; splits on tag min. for all chunks - bool didAnySplits = false; - for ( unsigned i = 0; i < ranges.size(); i++ ) { - BSONObj min = ranges[i].min; + // This line reloads the chunk manager once if this process doesn't know the collection + // is sharded yet. + ChunkManagerPtr cm = cfg->getChunkManagerIfExists(ns, true); + if (!cm) { + warning() << "could not load chunks to balance " << ns << " collection" << endl; + continue; + } - min = cm->getShardKeyPattern().getKeyPattern().extendRangeBound( min, false ); + // loop through tags to make sure no chunk spans tags; splits on tag min. for all chunks + bool didAnySplits = false; + for (unsigned i = 0; i < ranges.size(); i++) { + BSONObj min = ranges[i].min; - if ( allChunkMinimums.count( min ) > 0 ) - continue; + min = cm->getShardKeyPattern().getKeyPattern().extendRangeBound(min, false); - didAnySplits = true; + if (allChunkMinimums.count(min) > 0) + continue; - log() << "ns: " << ns << " need to split on " - << min << " because there is a range there" << endl; + didAnySplits = true; - ChunkPtr c = cm->findIntersectingChunk( min ); + log() << "ns: " << ns << " need to split on " << min + << " because there is a range there" << endl; - vector<BSONObj> splitPoints; - splitPoints.push_back( min ); + ChunkPtr c = cm->findIntersectingChunk(min); - Status status = c->multiSplit(splitPoints, NULL); - if ( !status.isOK() ) { - error() << "split failed: " << status << endl; - } - else { - LOG(1) << "split worked" << endl; - } - break; - } + vector<BSONObj> splitPoints; + splitPoints.push_back(min); - if ( didAnySplits ) { - // state change, just wait till next round - continue; + Status status = c->multiSplit(splitPoints, NULL); + if (!status.isOK()) { + error() << "split failed: " << status << endl; + } else { + LOG(1) << "split worked" << endl; } + break; + } - CandidateChunk* p = _policy->balance( ns, status, _balancedLastTime ); - if ( p ) candidateChunks->push_back( CandidateChunkPtr( p ) ); + if (didAnySplits) { + // state change, just wait till next round + continue; } - } - bool Balancer::_init() { - try { + CandidateChunk* p = _policy->balance(ns, status, _balancedLastTime); + if (p) + candidateChunks->push_back(CandidateChunkPtr(p)); + } +} - log() << "about to contact config servers and shards" << endl; +bool Balancer::_init() { + try { + log() << "about to contact config servers and shards" << endl; - // contact the config server and refresh shard information - // checks that each shard is indeed a different process (no hostname mixup) - // these checks are redundant in that they're redone at every new round but we want to do them initially here - // so to catch any problem soon - Shard::reloadShardInfo(); - _checkOIDs(); + // contact the config server and refresh shard information + // checks that each shard is indeed a different process (no hostname mixup) + // these checks are redundant in that they're redone at every new round but we want to 
do them initially here + // so to catch any problem soon + Shard::reloadShardInfo(); + _checkOIDs(); - log() << "config servers and shards contacted successfully" << endl; + log() << "config servers and shards contacted successfully" << endl; - StringBuilder buf; - buf << getHostNameCached() << ":" << serverGlobalParams.port; - _myid = buf.str(); - _started = time(0); + StringBuilder buf; + buf << getHostNameCached() << ":" << serverGlobalParams.port; + _myid = buf.str(); + _started = time(0); - log() << "balancer id: " << _myid << " started at " << time_t_to_String_short(_started) << endl; + log() << "balancer id: " << _myid << " started at " << time_t_to_String_short(_started) + << endl; - return true; + return true; + } catch (std::exception& e) { + warning() << "could not initialize balancer, please check that all shards and config " + "servers are up: " << e.what() << endl; + return false; + } +} + +void Balancer::run() { + // this is the body of a BackgroundJob so if we throw here we're basically ending the balancer thread prematurely + while (!inShutdown()) { + if (!_init()) { + log() << "will retry to initialize balancer in one minute" << endl; + sleepsecs(60); + continue; } - catch ( std::exception& e ) { - warning() << "could not initialize balancer, please check that all shards and config servers are up: " << e.what() << endl; - return false; - } + break; } - void Balancer::run() { + int sleepTime = 10; - // this is the body of a BackgroundJob so if we throw here we're basically ending the balancer thread prematurely - while ( ! inShutdown() ) { + // getConnectioString and dist lock constructor does not throw, which is what we expect on while + // on the balancer thread + ConnectionString config = configServer.getConnectionString(); + DistributedLock balanceLock(config, "balancer"); - if ( ! _init() ) { - log() << "will retry to initialize balancer in one minute" << endl; - sleepsecs( 60 ); - continue; - } - - break; - } - - int sleepTime = 10; + while (!inShutdown()) { + Timer balanceRoundTimer; + ActionLogType actionLog; - // getConnectioString and dist lock constructor does not throw, which is what we expect on while - // on the balancer thread - ConnectionString config = configServer.getConnectionString(); - DistributedLock balanceLock( config , "balancer" ); + actionLog.setServer(getHostNameCached()); + actionLog.setWhat("balancer.round"); - while ( ! 
inShutdown() ) { + try { + ScopedDbConnection conn(config.toString(), 30); - Timer balanceRoundTimer; - ActionLogType actionLog; + // ping has to be first so we keep things in the config server in sync + _ping(); - actionLog.setServer(getHostNameCached()); - actionLog.setWhat("balancer.round"); + BSONObj balancerResult; - try { + // use fresh shard state + Shard::reloadShardInfo(); - ScopedDbConnection conn(config.toString(), 30); + // refresh chunk size (even though another balancer might be active) + Chunk::refreshChunkSize(); - // ping has to be first so we keep things in the config server in sync - _ping(); + SettingsType balancerConfig; + string errMsg; - BSONObj balancerResult; + if (!grid.getBalancerSettings(&balancerConfig, &errMsg)) { + warning() << errMsg; + return; + } - // use fresh shard state - Shard::reloadShardInfo(); + // now make sure we should even be running + if ((balancerConfig.isKeySet() && // balancer config doc exists + !grid.shouldBalance(balancerConfig)) || + MONGO_FAIL_POINT(skipBalanceRound)) { + LOG(1) << "skipping balancing round because balancing is disabled" << endl; - // refresh chunk size (even though another balancer might be active) - Chunk::refreshChunkSize(); + // Ping again so scripts can determine if we're active without waiting + _ping(true); - SettingsType balancerConfig; - string errMsg; + conn.done(); - if (!grid.getBalancerSettings(&balancerConfig, &errMsg)) { - warning() << errMsg; - return ; - } + sleepsecs(sleepTime); + continue; + } - // now make sure we should even be running - if ((balancerConfig.isKeySet() && // balancer config doc exists - !grid.shouldBalance(balancerConfig)) || - MONGO_FAIL_POINT(skipBalanceRound)) { + uassert(13258, "oids broken after resetting!", _checkOIDs()); - LOG(1) << "skipping balancing round because balancing is disabled" << endl; + { + dist_lock_try lk(&balanceLock, "doing balance round"); + if (!lk.got()) { + LOG(1) << "skipping balancing round because another balancer is active" << endl; // Ping again so scripts can determine if we're active without waiting - _ping( true ); + _ping(true); conn.done(); - sleepsecs( sleepTime ); + sleepsecs(sleepTime); // no need to wake up soon continue; } - uassert( 13258 , "oids broken after resetting!" , _checkOIDs() ); - - { - dist_lock_try lk( &balanceLock , "doing balance round" ); - if ( ! lk.got() ) { - LOG(1) << "skipping balancing round because another balancer is active" << endl; - - // Ping again so scripts can determine if we're active without waiting - _ping( true ); - - conn.done(); - - sleepsecs( sleepTime ); // no need to wake up soon - continue; - } + if (!isConfigServerConsistent()) { + conn.done(); + warning() << "Skipping balancing round because data inconsistency" + << " was detected amongst the config servers." << endl; + sleepsecs(sleepTime); + continue; + } - if ( !isConfigServerConsistent() ) { - conn.done(); - warning() << "Skipping balancing round because data inconsistency" - << " was detected amongst the config servers." << endl; - sleepsecs( sleepTime ); - continue; + const bool waitForDelete = + (balancerConfig.isWaitForDeleteSet() ? balancerConfig.getWaitForDelete() + : false); + + scoped_ptr<WriteConcernOptions> writeConcern; + if (balancerConfig.isKeySet()) { // if balancer doc exists. 
+ StatusWith<WriteConcernOptions*> extractStatus = + balancerConfig.extractWriteConcern(); + if (extractStatus.isOK()) { + writeConcern.reset(extractStatus.getValue()); + } else { + warning() << extractStatus.toString(); } + } - const bool waitForDelete = (balancerConfig.isWaitForDeleteSet() ? - balancerConfig.getWaitForDelete() : false); - - scoped_ptr<WriteConcernOptions> writeConcern; - if (balancerConfig.isKeySet()) { // if balancer doc exists. - StatusWith<WriteConcernOptions*> extractStatus = - balancerConfig.extractWriteConcern(); - if (extractStatus.isOK()) { - writeConcern.reset(extractStatus.getValue()); - } - else { - warning() << extractStatus.toString(); - } - } + LOG(1) << "*** start balancing round. " + << "waitForDelete: " << waitForDelete << ", secondaryThrottle: " + << (writeConcern.get() ? writeConcern->toBSON().toString() : "default") + << endl; + + vector<CandidateChunkPtr> candidateChunks; + _doBalanceRound(conn.conn(), &candidateChunks); + if (candidateChunks.size() == 0) { + LOG(1) << "no need to move any chunk" << endl; + _balancedLastTime = 0; + } else { + _balancedLastTime = + _moveChunks(&candidateChunks, writeConcern.get(), waitForDelete); + } - LOG(1) << "*** start balancing round. " - << "waitForDelete: " << waitForDelete - << ", secondaryThrottle: " - << (writeConcern.get() ? writeConcern->toBSON().toString() : "default") - << endl; - - vector<CandidateChunkPtr> candidateChunks; - _doBalanceRound( conn.conn() , &candidateChunks ); - if ( candidateChunks.size() == 0 ) { - LOG(1) << "no need to move any chunk" << endl; - _balancedLastTime = 0; - } - else { - _balancedLastTime = _moveChunks(&candidateChunks, - writeConcern.get(), - waitForDelete ); - } + actionLog.setDetails(_buildDetails(false, + balanceRoundTimer.millis(), + static_cast<int>(candidateChunks.size()), + _balancedLastTime, + "")); - actionLog.setDetails( _buildDetails( false, balanceRoundTimer.millis(), - static_cast<int>(candidateChunks.size()), _balancedLastTime, "") ); + _reportRound(actionLog); - _reportRound( actionLog ); + LOG(1) << "*** end of balancing round" << endl; + } - LOG(1) << "*** end of balancing round" << endl; - } + // Ping again so scripts can determine if we're active without waiting + _ping(true); - // Ping again so scripts can determine if we're active without waiting - _ping( true ); - - conn.done(); + conn.done(); - sleepsecs( _balancedLastTime ? sleepTime / 10 : sleepTime ); - } - catch ( std::exception& e ) { - log() << "caught exception while doing balance: " << e.what() << endl; + sleepsecs(_balancedLastTime ? sleepTime / 10 : sleepTime); + } catch (std::exception& e) { + log() << "caught exception while doing balance: " << e.what() << endl; - // Just to match the opening statement if in log level 1 - LOG(1) << "*** End of balancing round" << endl; + // Just to match the opening statement if in log level 1 + LOG(1) << "*** End of balancing round" << endl; - // This round failed, tell the world! - actionLog.setDetails( _buildDetails( true, balanceRoundTimer.millis(), - 0, 0, e.what()) ); + // This round failed, tell the world! + actionLog.setDetails(_buildDetails(true, balanceRoundTimer.millis(), 0, 0, e.what())); - _reportRound( actionLog ); + _reportRound(actionLog); - sleepsecs( sleepTime ); // sleep a fair amount b/c of error - continue; - } + sleepsecs(sleepTime); // sleep a fair amount b/c of error + continue; } - } +} } // namespace mongo |