// balancer_policy.cpp /** * Copyright (C) 2010 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see <http://www.gnu.org/licenses/>. * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects * for all of the code used other than as permitted herein. If you modify * file(s) with this exception, you may extend this exception to your * version of the file(s), but you are not obligated to do so. If you do not * wish to do so, delete this exception statement from your version. If you * delete this exception statement from all source files in the program, * then also delete it in the license file. 
*/

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding

#include "mongo/platform/basic.h"

// NOTE(review): this copy of the file appears to have lost every span of text that
// began with '<' followed by a letter, up to the next '>' (compare the surviving
// "totalChunks() < 20"). Visible casualties: the header name of the next #include,
// template arguments (e.g. `vector& chunks`, `set::const_iterator`,
// `numeric_limits::max()`, `auto_ptr cursor`, `StatusWith`), and two stretches of
// BalancerPolicy::balance(). The tokens below are kept exactly as found; restore the
// missing text from version control before building. The next header was presumably
// <algorithm>, which declares the std::random_shuffle used in balance().
#include
#include "mongo/client/connpool.h"
#include "mongo/s/balancer_policy.h"
#include "mongo/s/chunk.h"
#include "mongo/s/config.h"
#include "mongo/s/type_shard.h"
#include "mongo/s/type_tags.h"
#include "mongo/util/log.h"
#include "mongo/util/stringutils.h"
#include "mongo/util/text.h"

namespace mongo {

    // NOTE(review): std::auto_ptr was deprecated in C++11 and removed in C++17 (as was
    // std::random_shuffle, used in balance()); newer toolchains need std::unique_ptr /
    // std::shuffle instead.
    using std::auto_ptr;
    using std::endl;
    using std::map;
    using std::numeric_limits;
    using std::set;
    using std::string;
    using std::vector;

    // Renders a tag range as "<min> -->> <max> on <tag>" for log messages.
    string TagRange::toString() const {
        return str::stream() << min << " -->> " << max << " on " << tag;
    }

    // Builds a view over the given per-shard info and shard -> chunks map, and records
    // the name of every shard present in `shardInfo` in _shards.
    DistributionStatus::DistributionStatus( const ShardInfoMap& shardInfo,
                                            const ShardToChunksMap& shardToChunksMap )
        : _shardInfo( shardInfo ),
          _shardChunks( shardToChunksMap ) {
        for ( ShardInfoMap::const_iterator i = _shardInfo.begin(); i != _shardInfo.end(); ++i ) {
            _shards.insert( i->first );
        }
    }

    // Returns the ShardInfo recorded for `shard`; verify()s that the shard is known.
    const ShardInfo& DistributionStatus::shardInfo( const string& shard ) const {
        ShardInfoMap::const_iterator i = _shardInfo.find( shard );
        verify( i != _shardInfo.end() );
        return i->second;
    }

    // Total number of chunks summed over every entry of the shard -> chunks map.
    unsigned DistributionStatus::totalChunks() const {
        unsigned total = 0;
        for (ShardToChunksMap::const_iterator i = _shardChunks.begin(); i != _shardChunks.end(); ++i) {
            total += i->second->size();
        }
        return total;
    }

    // Number of chunks (of any tag) on `shard`; 0 if the shard has no entry in the
    // shard -> chunks map (unlike getChunks(), which verify()s the entry exists).
    unsigned DistributionStatus::numberOfChunksInShard( const string& shard ) const {
        ShardToChunksMap::const_iterator i = _shardChunks.find(shard);
        if (i == _shardChunks.end()) {
            return 0;
        }
        return i->second->size();
    }

    // Number of chunks on `shard` whose tag, as computed by getTagForChunk(), equals
    // `tag`. Passing "" counts the chunks that fall in no tag range. Returns 0 if the
    // shard has no entry in the shard -> chunks map.
    unsigned DistributionStatus::numberOfChunksInShardWithTag( const string& shard ,
                                                               const string& tag ) const {
        ShardToChunksMap::const_iterator i = _shardChunks.find(shard);
        if (i == _shardChunks.end()) {
            return 0;
        }
        unsigned total = 0;
        const vector& chunkList = i->second->vector();
        for (unsigned j = 0; j < i->second->size(); j++) {
            if (tag == getTagForChunk(*chunkList[j])) {
                total++;
            }
        }
        return total;
    }

    // Picks the shard that should receive a chunk tagged `tag`. Candidates must not be
    // size-maxed, must not be draining, and must carry `tag` (every shard matches the
    // empty tag, see ShardInfo::hasTag). Among candidates, the one with the fewest
    // chunks overall (all tags, via numberOfChunksInShard) wins; on a tie the shard
    // seen first while iterating _shardInfo is kept. Returns "" if no shard qualifies.
    // (The spelling "Receiever" is part of the method's name and is used by callers.)
    string DistributionStatus::getBestReceieverShard( const string& tag ) const {
        string best;
        unsigned minChunks = numeric_limits::max();
        for ( ShardInfoMap::const_iterator i = _shardInfo.begin(); i != _shardInfo.end(); ++i ) {
            if ( i->second.isSizeMaxed() ) {
                LOG(1) << i->first << " has already reached the maximum total chunk size." << endl;
                continue;
            }

            if ( i->second.isDraining() ) {
                LOG(1) << i->first << " is currently draining." << endl;
                continue;
            }

            if ( ! i->second.hasTag( tag ) ) {
                LOG(1) << i->first << " doesn't have right tag" << endl;
                continue;
            }

            unsigned myChunks = numberOfChunksInShard( i->first );
            // `>=` keeps the earlier shard on ties.
            if ( myChunks >= minChunks ) {
                LOG(1) << i->first << " has more chunks me:" << myChunks << " best: " << best << ":" << minChunks << endl;
                continue;
            }

            best = i->first;
            minChunks = myChunks;
        }

        return best;
    }

    // Returns the shard holding the most chunks tagged `tag` (ties keep the shard seen
    // first while iterating _shardInfo), or "" if no shard holds any such chunk.
    // Draining and size-maxed shards are not excluded here.
    string DistributionStatus::getMostOverloadedShard( const string& tag ) const {
        string worst;
        unsigned maxChunks = 0;

        for ( ShardInfoMap::const_iterator i = _shardInfo.begin(); i != _shardInfo.end(); ++i ) {
            unsigned myChunks = numberOfChunksInShardWithTag( i->first, tag );
            if ( myChunks <= maxChunks )
                continue;

            worst = i->first;
            maxChunks = myChunks;
        }

        return worst;
    }

    // Returns the chunk list of `shard`; verify()s that the shard has an entry in the
    // shard -> chunks map.
    const vector& DistributionStatus::getChunks( const string& shard) const {
        ShardToChunksMap::const_iterator i = _shardChunks.find(shard);
        verify(i != _shardChunks.end());
        return i->second->vector();
    }

    // Registers a tag range. Returns false (logging at level 1) if the range shares its
    // min with, or overlaps, an already registered range. The overlap tests treat each
    // range as half-open [min, max): ranges that only touch (one's max equals the
    // other's min) are accepted. Accepted ranges are stored keyed by an owned copy of
    // their max bound, which getTagForChunk() relies on, and their tag is added to
    // _allTags.
    bool DistributionStatus::addTagRange( const TagRange& range ) {
        // first check for overlaps
        for ( map::const_iterator i = _tagRanges.begin(); i != _tagRanges.end(); ++i ) {
            const TagRange& tocheck = i->second;

            if ( range.min == tocheck.min ) {
                LOG(1) << "have 2 ranges with the same min " << range << " " << tocheck << endl;
                return false;
            }

            if ( range.min < tocheck.min ) {
                if ( range.max > tocheck.min ) {
                    LOG(1) << "have overlapping ranges " << range << " " << tocheck << endl;
                    return false;
                }
            }
            else {
                // range.min > tocheck.min
                if ( tocheck.max > range.min ) {
                    LOG(1) << "have overlapping ranges " << range << " " << tocheck << endl;
                    return false;
                }
            }

        }

        _tagRanges[range.max.getOwned()] = range;
        _allTags.insert( range.tag );

        return true;
    }

    // Returns the tag of the registered range containing the chunk's min bound, or ""
    // if there is none. Because _tagRanges is keyed by each range's max (see
    // addTagRange()), upper_bound(min) yields the first range whose max is strictly
    // greater than the chunk's min; the chunk belongs to it only if its min is not
    // below that range's min. Only the chunk's min bound is consulted.
    string DistributionStatus::getTagForChunk( const ChunkType& chunk ) const {
        if ( _tagRanges.size() == 0 )
            return "";

        const BSONObj min(chunk.getMin());

        map::const_iterator i = _tagRanges.upper_bound( min );
        if ( i == _tagRanges.end() )
            return "";

        const TagRange& range = i->second;
        if ( min < range.min )
            return "";

        return range.tag;
    }

    // Logs every shard's info and chunks, followed by the registered tag ranges.
    // verify()s that every shard in _shardInfo has an entry in the shard -> chunks map.
    void DistributionStatus::dump() const {
        log() << "DistributionStatus" << endl;
        log() << " shards" << endl;
        for ( ShardInfoMap::const_iterator i = _shardInfo.begin(); i != _shardInfo.end(); ++i ) {
            log() << " " << i->first << "\t" << i->second.toString() << endl;
            ShardToChunksMap::const_iterator j = _shardChunks.find(i->first);
            verify(j != _shardChunks.end());
            const OwnedPointerVector& v = *j->second;
            for ( unsigned x = 0; x < v.size(); x++ )
                log() << " " << *v[x] << endl;
        }

        if ( _tagRanges.size() > 0 ) {
            log() << " tag ranges" << endl;

            for ( map::const_iterator i = _tagRanges.begin(); i != _tagRanges.end(); ++i )
                log() << i->second.toString() << endl;
        }
    }

    // Reads every document in the shards collection (ShardType::ConfigNS) on the
    // primary config server and inserts one ShardInfo per shard into `shardInfo`. Each
    // entry holds the shard's max size, its current data size (from
    // Shard::getShardDataSizeBytes, converted from bytes to MB), its draining flag, its
    // tags and its mongo version.
    // Returns UnsupportedFormat if a shard document fails to parse or has a non-string
    // tag. Any DBException (including the uassert on a NULL cursor) is converted to
    // its Status.
    Status DistributionStatus::populateShardInfoMap(ShardInfoMap* shardInfo) {
        try {
            ScopedDbConnection conn(configServer.getPrimary().getConnString(), 30);
            auto_ptr cursor(conn->query(ShardType::ConfigNS , Query()));
            uassert(28597, "Failed to load shard config", cursor.get() != NULL);

            while (cursor->more()) {
                ShardType shard;
                std::string errMsg;
                bool parseOk = shard.parseBSON(cursor->next(), &errMsg);
                // NOTE(review): this return, and the non-string-tag return below, leave
                // without calling conn.done(); confirm ScopedDbConnection's destructor
                // handles an unfinished connection as intended.
                if (!parseOk) {
                    return Status(ErrorCodes::UnsupportedFormat, errMsg);
                }

                // Start with an empty tag set; tags are added one by one below.
                std::set dummy;
                ShardInfo newShardEntry(shard.getMaxSize(),
                                        Shard::getShardDataSizeBytes(shard.getHost()) / 1024 / 1024,
                                        shard.getDraining(),
                                        dummy,
                                        Shard::getShardMongoVersion(shard.getHost()));

                if (shard.isTagsSet()) {
                    BSONArrayIteratorSorted tagIter(shard.getTags());
                    while (tagIter.more()) {
                        BSONElement tagElement = tagIter.next();
                        if (tagElement.type() != String) {
                            return Status(ErrorCodes::UnsupportedFormat,
                                          str::stream() << "shard tags only supports strings, "
                                                        << "found " << typeName(tagElement.type()));
                        }

                        newShardEntry.addTag(tagElement.String());
                    }
                }

                shardInfo->insert(make_pair(shard.getName(), newShardEntry));
            }

            conn.done();
        }
        catch (const DBException& ex) {
            return ex.toStatus();
        }

        return Status::OK();
    }

    // Fills `shardToChunksMap` with a ChunkType copy (namespace, owned min/max, jumbo
    // flag, shard) of every chunk known to `chunkMgr`, grouped by owning shard. First
    // makes sure every shard in `allShards` has a (possibly empty) list allocated with
    // `new`, so shards without chunks still appear. Who frees these lists is not
    // visible here; presumably the caller or the map's owner does.
    void DistributionStatus::populateShardToChunksMap(const ShardInfoMap& allShards,
                                                      const ChunkManager& chunkMgr,
                                                      ShardToChunksMap* shardToChunksMap) {
        // Makes sure there is an entry in shardToChunksMap for every shard.
        for (ShardInfoMap::const_iterator it = allShards.begin(); it != allShards.end(); ++it) {
            OwnedPointerVector*& chunkList = (*shardToChunksMap)[it->first];

            if (chunkList == NULL) {
                chunkList = new OwnedPointerVector();
            }
        }

        const ChunkMap& chunkMap = chunkMgr.getChunkMap();
        for (ChunkMap::const_iterator it = chunkMap.begin(); it != chunkMap.end(); ++it) {
            const ChunkPtr chunkPtr = it->second;

            auto_ptr chunk(new ChunkType());
            chunk->setNS(chunkPtr->getns());
            chunk->setMin(chunkPtr->getMin().getOwned());
            chunk->setMax(chunkPtr->getMax().getOwned());
            chunk->setJumbo(chunkPtr->isJumbo()); // TODO: is this reliable?

            const string shardName(chunkPtr->getShard().getName());
            chunk->setShard(shardName);

            // NOTE(review): only shards in `allShards` are guaranteed an entry above.
            // Assuming ShardToChunksMap is a std::map of pointers (the `== NULL` check
            // above suggests so), a chunk whose shard is missing from `allShards` would
            // make operator[] insert a NULL pointer that is then dereferenced here.
            // Confirm that callers always pass the complete shard set.
            (*shardToChunksMap)[shardName]->push_back(chunk.release());
        }
    }

    // Queries the tags collection (TagsType::ConfigNS) on `configServer` for a tag
    // range document of namespace `ns` with min <= chunk.min and max >= chunk.max,
    // i.e. one that covers the whole chunk, and returns its tag.
    // Returns "" when no such document exists, FailedToParse if the document cannot be
    // parsed, and the exception's Status if connecting or querying throws a
    // DBException. The `configServer` parameter shadows the `configServer` object that
    // populateShardInfoMap() uses.
    StatusWith DistributionStatus::getTagForSingleChunk(const string& configServer,
                                                        const string& ns,
                                                        const ChunkType& chunk) {
        BSONObj tagRangeDoc;

        try {
            ScopedDbConnection conn(configServer, 30);

            Query query(QUERY(TagsType::ns(ns)
                              << TagsType::min() << BSON("$lte" << chunk.getMin())
                              << TagsType::max() << BSON("$gte" << chunk.getMax())));
            tagRangeDoc = conn->findOne(TagsType::ConfigNS, query);
            conn.done();
        }
        catch (const DBException& excep) {
            return StatusWith(excep.toStatus());
        }

        if (tagRangeDoc.isEmpty()) {
            return StatusWith("");
        }

        TagsType tagRange;
        string errMsg;
        if (!tagRange.parseBSON(tagRangeDoc, &errMsg)) {
            return StatusWith(ErrorCodes::FailedToParse, errMsg);
        }

        return StatusWith(tagRange.getTag());
    }

    // Chooses at most one chunk migration for collection `ns`, checking in order:
    //   1) shards that are draining and still hold chunks,
    //   2) chunks sitting on a shard that does not carry the chunk's tag,
    //   3) per-tag balance, visiting the tags (plus "" for untagged chunks) in random
    //      order and moving a chunk from shard `from` to shard `to` (how those are
    //      chosen, and how `threshold` is applied, is in text missing from this copy).
    // Returns a MigrateInfo allocated with `new` (presumably owned by the caller), or
    // NULL if nothing needs to move. Steps 2 and 3 never select jumbo chunks.
    MigrateInfo* BalancerPolicy::balance( const string& ns,
                                          const DistributionStatus& distribution,
                                          int balancedLastTime ) {


        // 1) check for shards that policy requires us to move chunks off of:
        //    draining only
        // 2) check tag policy violations
        // 3) then we make sure chunks are balanced for each tag

        // ----

        // 1) check things we have to move
        {
            const set& shards = distribution.shards();
            for ( set::const_iterator z = shards.begin(); z != shards.end(); ++z ) {
                string shard = *z;
                const ShardInfo& info = distribution.shardInfo( shard );

                if ( ! info.isDraining() )
                    continue;

                if ( distribution.numberOfChunksInShard( shard ) == 0 )
                    continue;

                // now we know we need to move chunks off this shard
                // we will if we are allowed
                const vector& chunks = distribution.getChunks( shard );
                unsigned numJumboChunks = 0;

                // since we have to move all chunks, lets just do in order
                // NOTE(review): text appears to be missing between `i` and `0 ) {` on
                // the next line: the draining loop's condition and body (which
                // presumably counted jumbo chunks in numJumboChunks), the end of
                // step 1, and the opening of the step-2 check (presumably a test on
                // distribution.tags()). Restore from version control.
                for ( unsigned i=0; i 0 ) {
            // Step 2 (tag policy violations): return a migration for the first
            // non-jumbo chunk found on a shard that does not carry the chunk's tag.
            // Untagged chunks (tag "") always pass hasTag() and are never moved here.
            const set& shards = distribution.shards();
            for ( set::const_iterator i = shards.begin(); i != shards.end(); ++i ) {
                string shard = *i;
                const ShardInfo& info = distribution.shardInfo( shard );
                const vector& chunks = distribution.getChunks(shard);
                for ( unsigned j = 0; j < chunks.size(); j++ ) {
                    const ChunkType& chunk = *chunks[j];
                    string tag = distribution.getTagForChunk(chunk);

                    if ( info.hasTag( tag ) )
                        continue;

                    // uh oh, this chunk is in the wrong place
                    log() << "chunk " << chunk
                          << " is not on a shard with the right tag: "
                          << tag << endl;

                    if (chunk.isJumboSet() && chunk.getJumbo()) {
                        warning() << "chunk " << chunk << " is jumbo, so cannot be moved" << endl;
                        continue;
                    }

                    string to = distribution.getBestReceieverShard( tag );
                    if ( to.size() == 0 ) {
                        log() << "no where to put it :(" << endl;
                        continue;
                    }
                    // getBestReceieverShard() only returns shards that carry `tag`, and
                    // `shard` was just found not to carry it, so the two cannot match.
                    verify( to != shard );
                    log() << " going to move to: " << to << endl;
                    return new MigrateInfo(ns, to, shard, chunk.toBSON());
                }
            }
        }

        // 3) for each tag balance

        // Migration threshold: 2 when balancedLastTime is non-zero or the collection
        // has fewer than 20 chunks in total, 4 when it has fewer than 80, otherwise 8.
        int threshold = 8;
        if ( balancedLastTime || distribution.totalChunks() < 20 )
            threshold = 2;
        else if ( distribution.totalChunks() < 80 )
            threshold = 4;

        // randomize the order in which we balance the tags
        // this is so that one bad tag doesn't prevent others from getting balanced
        vector tags;
        {
            set t = distribution.tags();
            for ( set::const_iterator i = t.begin(); i != t.end(); ++i )
                tags.push_back( *i );
            // "" stands for chunks outside every tag range (see getTagForChunk()).
            tags.push_back( "" );

            std::random_shuffle( tags.begin(), tags.end() );
        }

        // NOTE(review): text appears to be missing between `i` and `& chunks` on the
        // next line: the per-tag loop condition and the code that declares `from`
        // (donor) and `to` (receiver), both used below but declared nowhere visible,
        // and that presumably compares their imbalance against `threshold`. The
        // missing tail was presumably a `const vector<...>` declaration of `chunks`.
        for ( unsigned i=0; i& chunks = distribution.getChunks(from);
            unsigned numJumboChunks = 0;
            // Move the first non-jumbo chunk on `from` that carries the current tag.
            for ( unsigned j = 0; j < chunks.size(); j++ ) {
                const ChunkType& chunk = *chunks[j];
                if (distribution.getTagForChunk(chunk) != tag)
                    continue;

                if (chunk.isJumboSet() && chunk.getJumbo()) {
                    numJumboChunks++;
                    continue;
                }

                log() << " ns: " << ns << " going to move " << chunk
                      << " from: " << from << " to: " << to << " tag [" << tag << "]"
                      << endl;
                return new MigrateInfo(ns, to, from, chunk.toBSON());
            }

            // Every matching chunk was jumbo: report it and try the next tag.
            if ( numJumboChunks ) {
                error() << "shard: " << from << " ns: " << ns
                        << " has too many chunks, but they are all jumbo "
                        << " numJumboChunks: " << numJumboChunks
                        << endl;
                continue;
            }

            // Reached only if `from` holds no chunk with this tag; presumably the
            // missing code above already guaranteed that it holds at least one.
            verify( false ); // should be impossible
        }

        // Everything is balanced here!
        return NULL;
    }

    // Creates a ShardInfo from explicit values; sizes are in megabytes.
    ShardInfo::ShardInfo(long long maxSizeMB,
                         long long currSizeMB,
                         bool draining,
                         const set& tags,
                         const string& mongoVersion):
        _maxSizeMB(maxSizeMB),
        _currSizeMB(currSizeMB),
        _draining(draining),
        _tags(tags),
        _mongoVersion(mongoVersion) {
    }

    // Default: both sizes 0 (which isSizeMaxed() treats as never maxed), not draining,
    // and a default-constructed (empty) tag set and version.
    ShardInfo::ShardInfo()
        : _maxSizeMB(0),
          _currSizeMB(0),
          _draining(false) {
    }

    // Adds `tag` to this shard's tag set.
    void ShardInfo::addTag( const string& tag ) {
        _tags.insert( tag );
    }


    // True when the shard's current size has reached its maximum size. A 0 in either
    // value (presumably "no limit" / "size unknown") is treated as not maxed.
    bool ShardInfo::isSizeMaxed() const {
        if (_maxSizeMB == 0 || _currSizeMB == 0)
            return false;

        return _currSizeMB >= _maxSizeMB;
    }

    // True if the shard carries `tag`. Every shard matches the empty tag, so chunks
    // outside every tag range (tag "") may live on any shard.
    bool ShardInfo::hasTag( const string& tag ) const {
        if ( tag.size() == 0 )
            return true;
        return _tags.count( tag ) > 0;
    }

    // Debug string with the shard's sizes, draining flag, tags and mongo version.
    // NOTE(review): "tags : " has no leading space, so it is printed directly after the
    // draining value.
    string ShardInfo::toString() const {
        StringBuilder ss;
        ss << " maxSizeMB: " << _maxSizeMB;
        ss << " currSizeMB: " << _currSizeMB;
        ss << " draining: " << _draining;
        if ( _tags.size() > 0 ) {
            ss << "tags : ";
            for ( set::const_iterator i = _tags.begin(); i != _tags.end(); ++i )
                ss << *i << ",";
        }
        ss << " version: " << _mongoVersion;
        return ss.str();
    }

    // Debug string with the chunk's min and max bounds.
    string ChunkInfo::toString() const {
        StringBuilder buf;
        buf << " min: " << min;
        buf << " max: " << max;
        return buf.str();
    }

} // namespace mongo