// balancer_policy.cpp
/**
* Copyright (C) 2010 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
#include "mongo/platform/basic.h"
#include
#include "mongo/s/balancer_policy.h"
#include "mongo/s/chunk.h"
#include "mongo/s/config.h"
#include "mongo/s/type_shard.h"
#include "mongo/s/type_tags.h"
#include "mongo/util/log.h"
#include "mongo/util/stringutils.h"
#include "mongo/util/text.h"
namespace mongo {
using std::auto_ptr;
using std::endl;
using std::map;
using std::numeric_limits;
using std::set;
using std::string;
using std::vector;
string TagRange::toString() const {
return str::stream() << min << " -->> " << max << " on " << tag;
}
DistributionStatus::DistributionStatus(const ShardInfoMap& shardInfo,
const ShardToChunksMap& shardToChunksMap)
: _shardInfo(shardInfo), _shardChunks(shardToChunksMap) {
for (ShardInfoMap::const_iterator i = _shardInfo.begin(); i != _shardInfo.end(); ++i) {
_shards.insert(i->first);
}
}
const ShardInfo& DistributionStatus::shardInfo(const string& shard) const {
ShardInfoMap::const_iterator i = _shardInfo.find(shard);
verify(i != _shardInfo.end());
return i->second;
}
unsigned DistributionStatus::totalChunks() const {
unsigned total = 0;
for (ShardToChunksMap::const_iterator i = _shardChunks.begin(); i != _shardChunks.end(); ++i) {
total += i->second->size();
}
return total;
}
unsigned DistributionStatus::numberOfChunksInShard(const string& shard) const {
ShardToChunksMap::const_iterator i = _shardChunks.find(shard);
if (i == _shardChunks.end()) {
return 0;
}
return i->second->size();
}
unsigned DistributionStatus::numberOfChunksInShardWithTag(const string& shard,
const string& tag) const {
ShardToChunksMap::const_iterator i = _shardChunks.find(shard);
if (i == _shardChunks.end()) {
return 0;
}
unsigned total = 0;
const vector& chunkList = i->second->vector();
for (unsigned j = 0; j < i->second->size(); j++) {
if (tag == getTagForChunk(*chunkList[j])) {
total++;
}
}
return total;
}
string DistributionStatus::getBestReceieverShard(const string& tag) const {
string best;
unsigned minChunks = numeric_limits::max();
for (ShardInfoMap::const_iterator i = _shardInfo.begin(); i != _shardInfo.end(); ++i) {
if (i->second.isSizeMaxed()) {
LOG(1) << i->first << " has already reached the maximum total chunk size." << endl;
continue;
}
if (i->second.isDraining()) {
LOG(1) << i->first << " is currently draining." << endl;
continue;
}
if (!i->second.hasTag(tag)) {
LOG(1) << i->first << " doesn't have right tag" << endl;
continue;
}
unsigned myChunks = numberOfChunksInShard(i->first);
if (myChunks >= minChunks) {
LOG(1) << i->first << " has more chunks me:" << myChunks << " best: " << best << ":"
<< minChunks << endl;
continue;
}
best = i->first;
minChunks = myChunks;
}
return best;
}
string DistributionStatus::getMostOverloadedShard(const string& tag) const {
string worst;
unsigned maxChunks = 0;
for (ShardInfoMap::const_iterator i = _shardInfo.begin(); i != _shardInfo.end(); ++i) {
unsigned myChunks = numberOfChunksInShardWithTag(i->first, tag);
if (myChunks <= maxChunks)
continue;
worst = i->first;
maxChunks = myChunks;
}
return worst;
}
const vector& DistributionStatus::getChunks(const string& shard) const {
ShardToChunksMap::const_iterator i = _shardChunks.find(shard);
verify(i != _shardChunks.end());
return i->second->vector();
}
bool DistributionStatus::addTagRange(const TagRange& range) {
// first check for overlaps
for (map::const_iterator i = _tagRanges.begin(); i != _tagRanges.end();
++i) {
const TagRange& tocheck = i->second;
if (range.min == tocheck.min) {
LOG(1) << "have 2 ranges with the same min " << range << " " << tocheck << endl;
return false;
}
if (range.min < tocheck.min) {
if (range.max > tocheck.min) {
LOG(1) << "have overlapping ranges " << range << " " << tocheck << endl;
return false;
}
} else {
// range.min > tocheck.min
if (tocheck.max > range.min) {
LOG(1) << "have overlapping ranges " << range << " " << tocheck << endl;
return false;
}
}
}
_tagRanges[range.max.getOwned()] = range;
_allTags.insert(range.tag);
return true;
}
string DistributionStatus::getTagForChunk(const ChunkType& chunk) const {
if (_tagRanges.size() == 0)
return "";
const BSONObj min(chunk.getMin());
map::const_iterator i = _tagRanges.upper_bound(min);
if (i == _tagRanges.end())
return "";
const TagRange& range = i->second;
if (min < range.min)
return "";
return range.tag;
}
void DistributionStatus::dump() const {
log() << "DistributionStatus" << endl;
log() << " shards" << endl;
for (ShardInfoMap::const_iterator i = _shardInfo.begin(); i != _shardInfo.end(); ++i) {
log() << " " << i->first << "\t" << i->second.toString() << endl;
ShardToChunksMap::const_iterator j = _shardChunks.find(i->first);
verify(j != _shardChunks.end());
const OwnedPointerVector& v = *j->second;
for (unsigned x = 0; x < v.size(); x++)
log() << " " << *v[x] << endl;
}
if (_tagRanges.size() > 0) {
log() << " tag ranges" << endl;
for (map::const_iterator i = _tagRanges.begin(); i != _tagRanges.end();
++i)
log() << i->second.toString() << endl;
}
}
Status DistributionStatus::populateShardInfoMap(ShardInfoMap* shardInfo) {
try {
ScopedDbConnection conn(configServer.getPrimary().getConnString(), 30);
auto_ptr cursor(conn->query(ShardType::ConfigNS, Query()));
uassert(28597, "Failed to load shard config", cursor.get() != NULL);
while (cursor->more()) {
ShardType shard;
std::string errMsg;
bool parseOk = shard.parseBSON(cursor->next(), &errMsg);
if (!parseOk) {
return Status(ErrorCodes::UnsupportedFormat, errMsg);
}
std::set dummy;
ShardInfo newShardEntry(shard.getMaxSize(),
Shard::getShardDataSizeBytes(shard.getHost()) / 1024 / 1024,
shard.getDraining(),
dummy,
Shard::getShardMongoVersion(shard.getHost()));
if (shard.isTagsSet()) {
BSONArrayIteratorSorted tagIter(shard.getTags());
while (tagIter.more()) {
BSONElement tagElement = tagIter.next();
if (tagElement.type() != String) {
return Status(ErrorCodes::UnsupportedFormat,
str::stream() << "shard tags only supports strings, "
<< "found " << typeName(tagElement.type()));
}
newShardEntry.addTag(tagElement.String());
}
}
shardInfo->insert(make_pair(shard.getName(), newShardEntry));
}
conn.done();
} catch (const DBException& ex) {
return ex.toStatus();
}
return Status::OK();
}
void DistributionStatus::populateShardToChunksMap(const ShardInfoMap& allShards,
const ChunkManager& chunkMgr,
ShardToChunksMap* shardToChunksMap) {
// Makes sure there is an entry in shardToChunksMap for every shard.
for (ShardInfoMap::const_iterator it = allShards.begin(); it != allShards.end(); ++it) {
OwnedPointerVector*& chunkList = (*shardToChunksMap)[it->first];
if (chunkList == NULL) {
chunkList = new OwnedPointerVector();
}
}
const ChunkMap& chunkMap = chunkMgr.getChunkMap();
for (ChunkMap::const_iterator it = chunkMap.begin(); it != chunkMap.end(); ++it) {
const ChunkPtr chunkPtr = it->second;
auto_ptr chunk(new ChunkType());
chunk->setNS(chunkPtr->getns());
chunk->setMin(chunkPtr->getMin().getOwned());
chunk->setMax(chunkPtr->getMax().getOwned());
chunk->setJumbo(chunkPtr->isJumbo()); // TODO: is this reliable?
const string shardName(chunkPtr->getShard().getName());
chunk->setShard(shardName);
(*shardToChunksMap)[shardName]->push_back(chunk.release());
}
}
StatusWith DistributionStatus::getTagForSingleChunk(const string& configServer,
const string& ns,
const ChunkType& chunk) {
BSONObj tagRangeDoc;
try {
ScopedDbConnection conn(configServer, 30);
Query query(QUERY(TagsType::ns(ns) << TagsType::min() << BSON("$lte" << chunk.getMin())
<< TagsType::max() << BSON("$gte" << chunk.getMax())));
tagRangeDoc = conn->findOne(TagsType::ConfigNS, query);
conn.done();
} catch (const DBException& excep) {
return StatusWith(excep.toStatus());
}
if (tagRangeDoc.isEmpty()) {
return StatusWith("");
}
TagsType tagRange;
string errMsg;
if (!tagRange.parseBSON(tagRangeDoc, &errMsg)) {
return StatusWith(ErrorCodes::FailedToParse, errMsg);
}
return StatusWith(tagRange.getTag());
}
MigrateInfo* BalancerPolicy::balance(const string& ns,
const DistributionStatus& distribution,
int balancedLastTime) {
// 1) check for shards that policy require to us to move off of:
// draining only
// 2) check tag policy violations
// 3) then we make sure chunks are balanced for each tag
// ----
// 1) check things we have to move
{
const set& shards = distribution.shards();
for (set::const_iterator z = shards.begin(); z != shards.end(); ++z) {
string shard = *z;
const ShardInfo& info = distribution.shardInfo(shard);
if (!info.isDraining())
continue;
if (distribution.numberOfChunksInShard(shard) == 0)
continue;
// now we know we need to move to chunks off this shard
// we will if we are allowed
const vector& chunks = distribution.getChunks(shard);
unsigned numJumboChunks = 0;
// since we have to move all chunks, lets just do in order
for (unsigned i = 0; i < chunks.size(); i++) {
const ChunkType& chunkToMove = *chunks[i];
if (chunkToMove.isJumboSet() && chunkToMove.getJumbo()) {
numJumboChunks++;
continue;
}
string tag = distribution.getTagForChunk(chunkToMove);
string to = distribution.getBestReceieverShard(tag);
if (to.size() == 0) {
warning() << "want to move chunk: " << chunkToMove << "(" << tag << ") "
<< "from " << shard << " but can't find anywhere to put it" << endl;
continue;
}
log() << "going to move " << chunkToMove << " from " << shard << "(" << tag << ")"
<< " to " << to << endl;
return new MigrateInfo(ns, to, shard, chunkToMove.toBSON());
}
warning() << "can't find any chunk to move from: " << shard << " but we want to. "
<< " numJumboChunks: " << numJumboChunks << endl;
}
}
// 2) tag violations
if (distribution.tags().size() > 0) {
const set& shards = distribution.shards();
for (set::const_iterator i = shards.begin(); i != shards.end(); ++i) {
string shard = *i;
const ShardInfo& info = distribution.shardInfo(shard);
const vector& chunks = distribution.getChunks(shard);
for (unsigned j = 0; j < chunks.size(); j++) {
const ChunkType& chunk = *chunks[j];
string tag = distribution.getTagForChunk(chunk);
if (info.hasTag(tag))
continue;
// uh oh, this chunk is in the wrong place
log() << "chunk " << chunk << " is not on a shard with the right tag: " << tag
<< endl;
if (chunk.isJumboSet() && chunk.getJumbo()) {
warning() << "chunk " << chunk << " is jumbo, so cannot be moved" << endl;
continue;
}
string to = distribution.getBestReceieverShard(tag);
if (to.size() == 0) {
log() << "no where to put it :(" << endl;
continue;
}
verify(to != shard);
log() << " going to move to: " << to << endl;
return new MigrateInfo(ns, to, shard, chunk.toBSON());
}
}
}
// 3) for each tag balance
int threshold = 8;
if (balancedLastTime || distribution.totalChunks() < 20)
threshold = 2;
else if (distribution.totalChunks() < 80)
threshold = 4;
// randomize the order in which we balance the tags
// this is so that one bad tag doesn't prevent others from getting balanced
vector tags;
{
set t = distribution.tags();
for (set::const_iterator i = t.begin(); i != t.end(); ++i)
tags.push_back(*i);
tags.push_back("");
std::random_shuffle(tags.begin(), tags.end());
}
for (unsigned i = 0; i < tags.size(); i++) {
string tag = tags[i];
string from = distribution.getMostOverloadedShard(tag);
if (from.size() == 0)
continue;
unsigned max = distribution.numberOfChunksInShardWithTag(from, tag);
if (max == 0)
continue;
string to = distribution.getBestReceieverShard(tag);
if (to.size() == 0) {
log() << "no available shards to take chunks for tag [" << tag << "]" << endl;
return NULL;
}
unsigned min = distribution.numberOfChunksInShardWithTag(to, tag);
const int imbalance = max - min;
LOG(1) << "collection : " << ns << endl;
LOG(1) << "donor : " << from << " chunks on " << max << endl;
LOG(1) << "receiver : " << to << " chunks on " << min << endl;
LOG(1) << "threshold : " << threshold << endl;
if (imbalance < threshold)
continue;
const vector& chunks = distribution.getChunks(from);
unsigned numJumboChunks = 0;
for (unsigned j = 0; j < chunks.size(); j++) {
const ChunkType& chunk = *chunks[j];
if (distribution.getTagForChunk(chunk) != tag)
continue;
if (chunk.isJumboSet() && chunk.getJumbo()) {
numJumboChunks++;
continue;
}
log() << " ns: " << ns << " going to move " << chunk << " from: " << from
<< " to: " << to << " tag [" << tag << "]" << endl;
return new MigrateInfo(ns, to, from, chunk.toBSON());
}
if (numJumboChunks) {
error() << "shard: " << from << " ns: " << ns
<< " has too many chunks, but they are all jumbo "
<< " numJumboChunks: " << numJumboChunks << endl;
continue;
}
verify(false); // should be impossible
}
// Everything is balanced here!
return NULL;
}
ShardInfo::ShardInfo(long long maxSizeMB,
long long currSizeMB,
bool draining,
const set& tags,
const string& mongoVersion)
: _maxSizeMB(maxSizeMB),
_currSizeMB(currSizeMB),
_draining(draining),
_tags(tags),
_mongoVersion(mongoVersion) {}
ShardInfo::ShardInfo() : _maxSizeMB(0), _currSizeMB(0), _draining(false) {}
void ShardInfo::addTag(const string& tag) {
_tags.insert(tag);
}
bool ShardInfo::isSizeMaxed() const {
if (_maxSizeMB == 0 || _currSizeMB == 0)
return false;
return _currSizeMB >= _maxSizeMB;
}
bool ShardInfo::hasTag(const string& tag) const {
if (tag.size() == 0)
return true;
return _tags.count(tag) > 0;
}
string ShardInfo::toString() const {
StringBuilder ss;
ss << " maxSizeMB: " << _maxSizeMB;
ss << " currSizeMB: " << _currSizeMB;
ss << " draining: " << _draining;
if (_tags.size() > 0) {
ss << "tags : ";
for (set::const_iterator i = _tags.begin(); i != _tags.end(); ++i)
ss << *i << ",";
}
ss << " version: " << _mongoVersion;
return ss.str();
}
string ChunkInfo::toString() const {
StringBuilder buf;
buf << " min: " << min;
buf << " max: " << max;
return buf.str();
}
} // namespace mongo