// balancer_policy.cpp /** * Copyright (C) 2010 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects * for all of the code used other than as permitted herein. If you modify * file(s) with this exception, you may extend this exception to your * version of the file(s), but you are not obligated to do so. If you do not * wish to do so, delete this exception statement from your version. If you * delete this exception statement from all source files in the program, * then also delete it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding #include "mongo/platform/basic.h" #include "mongo/s/balancer_policy.h" #include #include "mongo/client/read_preference.h" #include "mongo/client/remote_command_targeter.h" #include "mongo/s/catalog/catalog_manager.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/catalog/type_tags.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/client/shard.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/s/shard_util.h" #include "mongo/util/log.h" #include "mongo/util/stringutils.h" namespace mongo { using std::map; using std::numeric_limits; using std::set; using std::string; using std::vector; namespace { /** * Executes the serverStatus command against the specified shard and obtains the version of the * running MongoD service. * * The MongoD version or throws an exception. Known exception codes are: * ShardNotFound if shard by that id is not available on the registry * NoSuchKey if the version could not be retrieved */ std::string retrieveShardMongoDVersion(OperationContext* txn, ShardId shardId, ShardRegistry* shardRegistry) { auto shard = shardRegistry->getShard(txn, shardId); if (!shard) { uassertStatusOK({ErrorCodes::ShardNotFound, "Shard not found"}); } auto shardHost = uassertStatusOK( shard->getTargeter()->findHost({ReadPreference::PrimaryOnly, TagSet::primaryOnly()})); BSONObj serverStatus = uassertStatusOK(shardRegistry->runCommand(shardHost, "admin", BSON("serverStatus" << 1))); BSONElement versionElement = serverStatus["version"]; if (versionElement.type() != String) { uassertStatusOK({ErrorCodes::NoSuchKey, "version field not found in serverStatus"}); } return versionElement.str(); } } // namespace string TagRange::toString() const { return str::stream() << min << " -->> " << max << " on " << tag; } DistributionStatus::DistributionStatus(const ShardInfoMap& shardInfo, const ShardToChunksMap& shardToChunksMap) : _shardInfo(shardInfo), _shardChunks(shardToChunksMap) { for (ShardInfoMap::const_iterator i = _shardInfo.begin(); i != _shardInfo.end(); ++i) { _shardIds.insert(i->first); } } const ShardInfo& DistributionStatus::shardInfo(const ShardId& shardId) const { ShardInfoMap::const_iterator i = _shardInfo.find(shardId); verify(i != _shardInfo.end()); return i->second; } unsigned DistributionStatus::totalChunks() const { unsigned total = 0; for (ShardToChunksMap::const_iterator i = _shardChunks.begin(); i != _shardChunks.end(); ++i) { total += i->second.size(); } return total; } unsigned DistributionStatus::numberOfChunksInShard(const ShardId& shardId) const { ShardToChunksMap::const_iterator i = _shardChunks.find(shardId); if (i == _shardChunks.end()) { return 0; } return i->second.size(); } unsigned DistributionStatus::numberOfChunksInShardWithTag(const ShardId& shardId, const string& tag) const { ShardToChunksMap::const_iterator i = _shardChunks.find(shardId); if (i == _shardChunks.end()) { return 0; } unsigned total = 0; const vector& chunkList = i->second; for (unsigned j = 0; j < i->second.size(); j++) { if (tag == getTagForChunk(chunkList[j])) { total++; } } return total; } string DistributionStatus::getBestReceieverShard(const string& tag) const { string best; unsigned minChunks = numeric_limits::max(); for (ShardInfoMap::const_iterator i = _shardInfo.begin(); i != _shardInfo.end(); ++i) { if (i->second.isSizeMaxed()) { LOG(1) << i->first << " has already reached the maximum total chunk size."; continue; } if (i->second.isDraining()) { LOG(1) << i->first << " is currently draining."; continue; } if (!i->second.hasTag(tag)) { LOG(1) << i->first << " doesn't have right tag"; continue; } unsigned myChunks = numberOfChunksInShard(i->first); if (myChunks >= minChunks) { LOG(1) << i->first << " has more chunks me:" << myChunks << " best: " << best << ":" << minChunks; continue; } best = i->first; minChunks = myChunks; } return best; } string DistributionStatus::getMostOverloadedShard(const string& tag) const { string worst; unsigned maxChunks = 0; for (ShardInfoMap::const_iterator i = _shardInfo.begin(); i != _shardInfo.end(); ++i) { unsigned myChunks = numberOfChunksInShardWithTag(i->first, tag); if (myChunks <= maxChunks) continue; worst = i->first; maxChunks = myChunks; } return worst; } const vector& DistributionStatus::getChunks(const ShardId& shardId) const { ShardToChunksMap::const_iterator i = _shardChunks.find(shardId); invariant(i != _shardChunks.end()); return i->second; } bool DistributionStatus::addTagRange(const TagRange& range) { // first check for overlaps for (map::const_iterator i = _tagRanges.begin(); i != _tagRanges.end(); ++i) { const TagRange& tocheck = i->second; if (range.min == tocheck.min) { LOG(1) << "have 2 ranges with the same min " << range << " " << tocheck; return false; } if (range.min < tocheck.min) { if (range.max > tocheck.min) { LOG(1) << "have overlapping ranges " << range << " " << tocheck; return false; } } else { // range.min > tocheck.min if (tocheck.max > range.min) { LOG(1) << "have overlapping ranges " << range << " " << tocheck; return false; } } } _tagRanges[range.max.getOwned()] = range; _allTags.insert(range.tag); return true; } string DistributionStatus::getTagForChunk(const ChunkType& chunk) const { if (_tagRanges.size() == 0) return ""; const BSONObj min(chunk.getMin()); map::const_iterator i = _tagRanges.upper_bound(min); if (i == _tagRanges.end()) return ""; const TagRange& range = i->second; if (min < range.min) return ""; return range.tag; } void DistributionStatus::dump() const { log() << "DistributionStatus"; log() << " shards"; for (ShardInfoMap::const_iterator i = _shardInfo.begin(); i != _shardInfo.end(); ++i) { log() << " " << i->first << "\t" << i->second.toString(); ShardToChunksMap::const_iterator j = _shardChunks.find(i->first); verify(j != _shardChunks.end()); const vector& v = j->second; for (unsigned x = 0; x < v.size(); x++) { log() << " " << v[x]; } } if (_tagRanges.size() > 0) { log() << " tag ranges"; for (map::const_iterator i = _tagRanges.begin(); i != _tagRanges.end(); ++i) log() << i->second.toString(); } } Status DistributionStatus::populateShardInfoMap(OperationContext* txn, ShardInfoMap* shardInfo) { try { vector shards; Status status = grid.catalogManager(txn)->getAllShards(txn, &shards); if (!status.isOK()) { return status; } for (const ShardType& shardData : shards) { std::set dummy; const long long shardSizeBytes = uassertStatusOK( shardutil::retrieveTotalShardSize(txn, shardData.getName(), grid.shardRegistry())); const std::string shardMongodVersion = retrieveShardMongoDVersion(txn, shardData.getName(), grid.shardRegistry()); ShardInfo newShardEntry(shardData.getMaxSizeMB(), shardSizeBytes / 1024 / 1024, shardData.getDraining(), dummy, shardMongodVersion); for (const string& shardTag : shardData.getTags()) { newShardEntry.addTag(shardTag); } shardInfo->insert(make_pair(shardData.getName(), newShardEntry)); } } catch (const DBException& ex) { return ex.toStatus(); } return Status::OK(); } void DistributionStatus::populateShardToChunksMap(const ShardInfoMap& allShards, const ChunkManager& chunkMgr, ShardToChunksMap* shardToChunksMap) { // Makes sure there is an entry in shardToChunksMap for every shard. for (ShardInfoMap::const_iterator it = allShards.begin(); it != allShards.end(); ++it) { (*shardToChunksMap)[it->first]; } const ChunkMap& chunkMap = chunkMgr.getChunkMap(); for (ChunkMap::const_iterator it = chunkMap.begin(); it != chunkMap.end(); ++it) { const ChunkPtr chunkPtr = it->second; ChunkType chunk; chunk.setNS(chunkMgr.getns()); chunk.setMin(chunkPtr->getMin().getOwned()); chunk.setMax(chunkPtr->getMax().getOwned()); chunk.setJumbo(chunkPtr->isJumbo()); // TODO: is this reliable? const string shardName(chunkPtr->getShardId()); chunk.setShard(shardName); (*shardToChunksMap)[shardName].push_back(chunk); } } MigrateInfo* BalancerPolicy::balance(const string& ns, const DistributionStatus& distribution, int balancedLastTime) { // 1) check for shards that policy require to us to move off of: // draining only // 2) check tag policy violations // 3) then we make sure chunks are balanced for each tag // ---- // 1) check things we have to move { for (const ShardId& shardId : distribution.shardIds()) { const ShardInfo& info = distribution.shardInfo(shardId); if (!info.isDraining()) continue; if (distribution.numberOfChunksInShard(shardId) == 0) continue; // now we know we need to move to chunks off this shard // we will if we are allowed const vector& chunks = distribution.getChunks(shardId); unsigned numJumboChunks = 0; // since we have to move all chunks, lets just do in order for (unsigned i = 0; i < chunks.size(); i++) { const ChunkType& chunkToMove = chunks[i]; if (chunkToMove.getJumbo()) { numJumboChunks++; continue; } string tag = distribution.getTagForChunk(chunkToMove); const ShardId to = distribution.getBestReceieverShard(tag); if (to.size() == 0) { warning() << "want to move chunk: " << chunkToMove << "(" << tag << ") " << "from " << shardId << " but can't find anywhere to put it"; continue; } log() << "going to move " << chunkToMove << " from " << shardId << "(" << tag << ")" << " to " << to; return new MigrateInfo(ns, to, shardId, chunkToMove.toBSON()); } warning() << "can't find any chunk to move from: " << shardId << " but we want to. " << " numJumboChunks: " << numJumboChunks; } } // 2) tag violations if (distribution.tags().size() > 0) { for (const ShardId& shardId : distribution.shardIds()) { const ShardInfo& info = distribution.shardInfo(shardId); const vector& chunks = distribution.getChunks(shardId); for (unsigned j = 0; j < chunks.size(); j++) { const ChunkType& chunk = chunks[j]; string tag = distribution.getTagForChunk(chunk); if (info.hasTag(tag)) continue; // uh oh, this chunk is in the wrong place log() << "chunk " << chunk << " is not on a shard with the right tag: " << tag; if (chunk.getJumbo()) { warning() << "chunk " << chunk << " is jumbo, so cannot be moved"; continue; } const ShardId to = distribution.getBestReceieverShard(tag); if (to.size() == 0) { log() << "no where to put it :("; continue; } verify(to != shardId); log() << " going to move to: " << to; return new MigrateInfo(ns, to, shardId, chunk.toBSON()); } } } // 3) for each tag balance int threshold = 8; if (balancedLastTime || distribution.totalChunks() < 20) threshold = 2; else if (distribution.totalChunks() < 80) threshold = 4; // randomize the order in which we balance the tags // this is so that one bad tag doesn't prevent others from getting balanced vector tags; { set t = distribution.tags(); for (set::const_iterator i = t.begin(); i != t.end(); ++i) tags.push_back(*i); tags.push_back(""); std::random_shuffle(tags.begin(), tags.end()); } for (unsigned i = 0; i < tags.size(); i++) { string tag = tags[i]; const ShardId from = distribution.getMostOverloadedShard(tag); if (from.size() == 0) continue; unsigned max = distribution.numberOfChunksInShardWithTag(from, tag); if (max == 0) continue; string to = distribution.getBestReceieverShard(tag); if (to.size() == 0) { log() << "no available shards to take chunks for tag [" << tag << "]"; return NULL; } unsigned min = distribution.numberOfChunksInShardWithTag(to, tag); const int imbalance = max - min; LOG(1) << "collection : " << ns; LOG(1) << "donor : " << from << " chunks on " << max; LOG(1) << "receiver : " << to << " chunks on " << min; LOG(1) << "threshold : " << threshold; if (imbalance < threshold) continue; const vector& chunks = distribution.getChunks(from); unsigned numJumboChunks = 0; for (unsigned j = 0; j < chunks.size(); j++) { const ChunkType& chunk = chunks[j]; if (distribution.getTagForChunk(chunk) != tag) continue; if (chunk.getJumbo()) { numJumboChunks++; continue; } log() << " ns: " << ns << " going to move " << chunk << " from: " << from << " to: " << to << " tag [" << tag << "]"; return new MigrateInfo(ns, to, from, chunk.toBSON()); } if (numJumboChunks) { error() << "shard: " << from << " ns: " << ns << " has too many chunks, but they are all jumbo " << " numJumboChunks: " << numJumboChunks; continue; } verify(false); // should be impossible } // Everything is balanced here! return NULL; } ShardInfo::ShardInfo(long long maxSizeMB, long long currSizeMB, bool draining, const set& tags, const string& mongoVersion) : _maxSizeMB(maxSizeMB), _currSizeMB(currSizeMB), _draining(draining), _tags(tags), _mongoVersion(mongoVersion) {} ShardInfo::ShardInfo() : _maxSizeMB(0), _currSizeMB(0), _draining(false) {} void ShardInfo::addTag(const string& tag) { _tags.insert(tag); } bool ShardInfo::isSizeMaxed() const { if (_maxSizeMB == 0 || _currSizeMB == 0) return false; return _currSizeMB >= _maxSizeMB; } bool ShardInfo::hasTag(const string& tag) const { if (tag.size() == 0) return true; return _tags.count(tag) > 0; } string ShardInfo::toString() const { StringBuilder ss; ss << " maxSizeMB: " << _maxSizeMB; ss << " currSizeMB: " << _currSizeMB; ss << " draining: " << _draining; if (_tags.size() > 0) { ss << "tags : "; for (set::const_iterator i = _tags.begin(); i != _tags.end(); ++i) ss << *i << ","; } ss << " version: " << _mongoVersion; return ss.str(); } string ChunkInfo::toString() const { StringBuilder buf; buf << " min: " << min; buf << " max: " << max; return buf.str(); } } // namespace mongo