/* Copyright 2012 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
#include <boost/scoped_ptr.hpp>
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <vector>

#include "mongo/base/owned_pointer_map.h"
#include "mongo/platform/random.h"
#include "mongo/s/balancer_policy.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/config.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/log.h"
namespace mongo {
using boost::scoped_ptr;
using std::auto_ptr;
using std::endl;
using std::map;
using std::string;
using std::stringstream;
using std::vector;
namespace {
// Shard name -> list of chunks on that shard. The tests hand heap-allocated vectors to this
// map via auto_ptr::release() and never free them explicitly, relying on the map to own them.
typedef OwnedPointerMap<string, OwnedPointerVector<ChunkType> > OwnedShardToChunksMap;
TEST(BalancerPolicyTests, SizeMaxedShardTest) {
    // Arguments are (maxSizeMB, currSizeMB, draining); a maxSize of 0 means no limit is set.
    ShardInfo noLimit(0, 0, false);
    ShardInfo belowLimit(100LL, 80LL, false);
    ShardInfo overLimit(100LL, 110LL, false);

    // Only a shard whose current size exceeds a configured limit counts as maxed out.
    ASSERT(!noLimit.isSizeMaxed());
    ASSERT(!belowLimit.isSizeMaxed());
    ASSERT(overLimit.isSizeMaxed());
}
TEST( BalancerPolicyTests , BalanceNormalTest ) {
    // shard0 holds two chunks split at x: 49; shard1 holds no chunks.
    OwnedShardToChunksMap chunkMap;
    auto_ptr<OwnedPointerVector<ChunkType> > chunks(new OwnedPointerVector<ChunkType>());

    auto_ptr<ChunkType> chunk(new ChunkType());
    chunk->setMin(BSON("x" << BSON("$minKey" << 1)));
    chunk->setMax(BSON("x" << 49));
    chunks->push_back(chunk.release());

    chunk.reset(new ChunkType());
    chunk->setMin(BSON("x" << 49));
    chunk->setMax(BSON("x" << BSON("$maxKey" << 1)));
    chunks->push_back(chunk.release());

    chunkMap.mutableMap()["shard0"] = chunks.release();
    chunkMap.mutableMap()["shard1"] = new OwnedPointerVector<ChunkType>();

    // No size limits and no draining shards.
    ShardInfoMap info;
    info["shard0"] = ShardInfo(0, 2, false);
    info["shard1"] = ShardInfo(0, 0, false);

    DistributionStatus status(info, chunkMap.map());
    boost::scoped_ptr<MigrateInfo> c(BalancerPolicy::balance( "ns", status, 1 ));
    ASSERT( c );

    // shard1 owns no chunks, so the only possible migration is shard0 -> shard1.
    ASSERT_EQUALS( "shard0" , c->from );
    ASSERT_EQUALS( "shard1" , c->to );
}
TEST( BalancerPolicyTests , BalanceJumbo ) {
    // shard0 holds five chunks, three of which are jumbo:
    //   [$minKey, 10) jumbo, [10, 20) jumbo, [20, 30), [30, 40) jumbo, [40, $maxKey)
    // shard1 holds no chunks.
    OwnedShardToChunksMap chunkMap;
    auto_ptr<OwnedPointerVector<ChunkType> > chunks(new OwnedPointerVector<ChunkType>());

    auto_ptr<ChunkType> chunk(new ChunkType());
    chunk->setMin(BSON("x" << BSON("$minKey" << 1)));
    chunk->setMax(BSON("x" << 10));
    chunk->setJumbo(true);
    chunks->push_back(chunk.release());

    chunk.reset(new ChunkType());
    chunk->setMin(BSON("x" << 10));
    chunk->setMax(BSON("x" << 20));
    chunk->setJumbo(true);
    chunks->push_back(chunk.release());

    chunk.reset(new ChunkType());
    chunk->setMin(BSON("x" << 20));
    chunk->setMax(BSON("x" << 30));
    chunks->push_back(chunk.release());

    chunk.reset(new ChunkType());
    chunk->setMin(BSON("x" << 30));
    chunk->setMax(BSON("x" << 40));
    chunk->setJumbo(true);
    chunks->push_back(chunk.release());

    chunk.reset(new ChunkType());
    chunk->setMin(BSON("x" << 40));
    chunk->setMax(BSON("x" << BSON("$maxKey" << 1)));
    chunks->push_back(chunk.release());

    chunkMap.mutableMap()["shard0"] = chunks.release();
    chunkMap.mutableMap()["shard1"] = new OwnedPointerVector<ChunkType>();

    // No size limits and no draining shards.
    ShardInfoMap info;
    info["shard0"] = ShardInfo(0, 2, false);
    info["shard1"] = ShardInfo(0, 0, false);

    DistributionStatus status(info, chunkMap.map());
    boost::scoped_ptr<MigrateInfo> c(BalancerPolicy::balance( "ns", status, 1 ));
    ASSERT( c );

    // Jumbo chunks must be skipped; the expected pick is the non-jumbo chunk [20, 30).
    ASSERT_EQUALS( 30, c->chunk.max["x"].numberInt() );
}
// NOTE(review): this is the only test registered under "BalanceNormalTests" instead of
// "BalancerPolicyTests"; the name is left unchanged so existing test filters still match.
TEST( BalanceNormalTests , BalanceDrainingTest ) {
    // One normal shard and one draining shard.
    // shard0 holds two chunks split at x: 49; shard1 holds no chunks.
    OwnedShardToChunksMap chunkMap;
    auto_ptr<OwnedPointerVector<ChunkType> > chunks(new OwnedPointerVector<ChunkType>());

    auto_ptr<ChunkType> chunk(new ChunkType());
    chunk->setMin(BSON("x" << BSON("$minKey" << 1)));
    chunk->setMax(BSON("x" << 49));
    chunks->push_back(chunk.release());

    chunk.reset(new ChunkType());
    chunk->setMin(BSON("x" << 49));
    chunk->setMax(BSON("x" << BSON("$maxKey" << 1)));
    chunks->push_back(chunk.release());

    chunkMap.mutableMap()["shard0"] = chunks.release();
    chunkMap.mutableMap()["shard1"] = new OwnedPointerVector<ChunkType>();

    // shard0 is draining, so its chunks are expected to move off it.
    ShardInfoMap limitsMap;
    limitsMap["shard0"] = ShardInfo(0LL, 2LL, true);
    limitsMap["shard1"] = ShardInfo(0LL, 0LL, false);

    DistributionStatus status(limitsMap, chunkMap.map());
    boost::scoped_ptr<MigrateInfo> c(BalancerPolicy::balance( "ns", status, 0 ));
    ASSERT( c );
    ASSERT_EQUALS( "shard1" , c->to );
    ASSERT_EQUALS( "shard0" , c->from );
    ASSERT( ! c->chunk.min.isEmpty() );
}
TEST( BalancerPolicyTests , BalanceEndedDrainingTest ) {
    // shard0 holds two chunks split at x: 49; shard1 has finished draining and holds none.
    OwnedShardToChunksMap chunkMap;
    auto_ptr<OwnedPointerVector<ChunkType> > chunks(new OwnedPointerVector<ChunkType>());

    auto_ptr<ChunkType> chunk(new ChunkType());
    chunk->setMin(BSON("x" << BSON("$minKey" << 1)));
    chunk->setMax(BSON("x" << 49));
    chunks->push_back(chunk.release());

    chunk.reset(new ChunkType());
    chunk->setMin(BSON("x" << 49));
    chunk->setMax(BSON("x" << BSON("$maxKey" << 1)));
    chunks->push_back(chunk.release());

    chunkMap.mutableMap()["shard0"] = chunks.release();
    chunkMap.mutableMap()["shard1"] = new OwnedPointerVector<ChunkType>();

    // No size limits, but shard1 is still marked as draining.
    ShardInfoMap limitsMap;
    limitsMap["shard0"] = ShardInfo(0, 2, false);
    limitsMap["shard1"] = ShardInfo(0, 0, true);

    DistributionStatus status(limitsMap, chunkMap.map());
    boost::scoped_ptr<MigrateInfo> c(BalancerPolicy::balance( "ns", status, 0 ));

    // The only other shard is draining, so it must not be chosen as a destination.
    ASSERT( ! c );
}
TEST( BalancerPolicyTests , BalanceImpasseTest ) {
    // One shard at its size limit holds all the data; the two others are draining.
    // shard1 holds two chunks split at x: 49; shard0 and shard2 hold no chunks.
    OwnedShardToChunksMap chunkMap;
    auto_ptr<OwnedPointerVector<ChunkType> > chunks(new OwnedPointerVector<ChunkType>());

    auto_ptr<ChunkType> chunk(new ChunkType());
    chunk->setMin(BSON("x" << BSON("$minKey" << 1)));
    chunk->setMax(BSON("x" << 49));
    chunks->push_back(chunk.release());

    chunk.reset(new ChunkType());
    chunk->setMin(BSON("x" << 49));
    chunk->setMax(BSON("x" << BSON("$maxKey" << 1)));
    chunks->push_back(chunk.release());

    chunkMap.mutableMap()["shard0"] = new OwnedPointerVector<ChunkType>();
    chunkMap.mutableMap()["shard1"] = chunks.release();
    chunkMap.mutableMap()["shard2"] = new OwnedPointerVector<ChunkType>();

    // shard0 is draining, shard1 has currSize == maxSize (1), and shard2 is draining.
    ShardInfoMap limitsMap;
    limitsMap["shard0"] = ShardInfo(0, 2, true);
    limitsMap["shard1"] = ShardInfo(1, 1, false);
    limitsMap["shard2"] = ShardInfo(0, 1, true);

    DistributionStatus status(limitsMap, chunkMap.map());
    boost::scoped_ptr<MigrateInfo> c(BalancerPolicy::balance( "ns", status, 0 ));

    // No shard can usefully give or receive a chunk, so no migration is expected.
    ASSERT( ! c );
}
/**
 * Adds a shard named "shard<N>" to 'map', where N is the number of shards already present,
 * owning 'numChunks' unit-width chunks on the key "x".
 *
 * Chunk bounds continue from the total number of chunks already in 'map', so shards added
 * one after another cover consecutive ranges. If no chunks exist yet, the first chunk starts
 * at {$minKey: 1}. If 'last' is true, this shard's final chunk ends at {$maxKey: 1}.
 */
void addShard( OwnedShardToChunksMap& map, unsigned numChunks, bool last ) {
    // Count the chunks already assigned so the new ranges pick up where they left off.
    unsigned total = 0;
    const OwnedShardToChunksMap::MapType& shardToChunks = map.map();
    for (OwnedShardToChunksMap::MapType::const_iterator i = shardToChunks.begin();
         i != shardToChunks.end(); ++i) {
        total += i->second->size();
    }

    stringstream ss;
    ss << "shard" << shardToChunks.size();
    string myName = ss.str();

    auto_ptr<OwnedPointerVector<ChunkType> > chunksList(
        new OwnedPointerVector<ChunkType>());

    for ( unsigned i = 0; i < numChunks; i++ ) {
        auto_ptr<ChunkType> chunk(new ChunkType());

        if ( i == 0 && total == 0 )
            chunk->setMin(BSON("x" << BSON("$minKey" << 1)));
        else
            chunk->setMin(BSON("x" << total + i));

        // 'numChunks - 1' cannot underflow here: the loop body only runs when numChunks > 0.
        if ( last && i == ( numChunks - 1 ) )
            chunk->setMax(BSON("x" << BSON("$maxKey" << 1)));
        else
            chunk->setMax(BSON("x" << 1 + total + i));

        chunksList->push_back(chunk.release());
    }

    OwnedShardToChunksMap::MapType& mutableShardToChunks = map.mutableMap();
    mutableShardToChunks[myName] = chunksList.release();
}
/**
 * Applies migration 'm' to 'map' in place.
 *
 * Finds the chunk on shard m->from whose min equals m->chunk.min, appends that chunk pointer
 * to m->to's chunk list, and removes it from m->from's list. Calls verify(0) if the source
 * shard has no such chunk.
 */
void moveChunk( OwnedShardToChunksMap& map, MigrateInfo* m ) {
    vector<ChunkType*>& chunks = map.mutableMap()[m->from]->mutableVector();
    for (vector<ChunkType*>::iterator i = chunks.begin(); i != chunks.end(); ++i) {
        if ((*i)->getMin() == m->chunk.min) {
            // Append to the destination before erasing, so exactly one vector holds the
            // pointer afterwards. The iterator is not used after erase().
            map.mutableMap()[m->to]->push_back(*i);
            chunks.erase(i);
            return;
        }
    }
    verify(0);
}
TEST( BalancerPolicyTests, MultipleDraining ) {
    // shard0 (5 chunks) and shard1 (10 chunks) are draining; shard2 (5 chunks) is the only
    // shard that is not draining.
    OwnedShardToChunksMap chunks;
    addShard( chunks, 5 , false );
    addShard( chunks, 10 , false );
    addShard( chunks, 5 , true );

    ShardInfoMap shards;
    shards["shard0"] = ShardInfo(0, 5, true);
    shards["shard1"] = ShardInfo(0, 5, true);
    shards["shard2"] = ShardInfo(0, 5, false);

    DistributionStatus d(shards, chunks.map());
    boost::scoped_ptr<MigrateInfo> m(BalancerPolicy::balance( "ns", d, 0 ));
    ASSERT( m );
    ASSERT_EQUALS( "shard2" , m->to );
}
TEST( BalancerPolicyTests, TagsDraining ) {
    // Three shards with 5 chunks each, covering x in [$minKey, 1), [1, 2), ..., [14, $maxKey).
    OwnedShardToChunksMap chunks;
    addShard( chunks, 5 , false );
    addShard( chunks, 5 , false );
    addShard( chunks, 5 , true );

    // shard1 is draining and carries both tags; shard0 has only "a" and shard2 only "b".
    ShardInfoMap shards;
    shards["shard0"] = ShardInfo(0, 5, false);
    shards["shard1"] = ShardInfo(0, 5, true);
    shards["shard2"] = ShardInfo(0, 5, false);

    shards["shard0"].addTag( "a" );
    shards["shard1"].addTag( "a" );
    shards["shard1"].addTag( "b" );
    shards["shard2"].addTag( "b" );

    // Apply the balancer's decisions until it has nothing left to do. Chunks with x < 7
    // (tag "a") must go to shard0, and the rest (tag "b") to shard2.
    while ( true ) {
        DistributionStatus d(shards, chunks.map());
        d.addTagRange( TagRange( BSON( "x" << -1 ), BSON( "x" << 7 ) , "a" ) );
        d.addTagRange( TagRange( BSON( "x" << 7 ), BSON( "x" << 1000 ) , "b" ) );

        boost::scoped_ptr<MigrateInfo> m(BalancerPolicy::balance( "ns", d, 0 ));
        if ( ! m )
            break;

        if ( m->chunk.min["x"].numberInt() < 7 ) {
            ASSERT_EQUALS( "shard0" , m->to );
        }
        else {
            ASSERT_EQUALS( "shard2" , m->to );
        }

        moveChunk( chunks, m.get() );
    }

    ASSERT_EQUALS( 7U , chunks.mutableMap()["shard0"]->size() );
    ASSERT_EQUALS( 0U , chunks.mutableMap()["shard1"]->size() );
    ASSERT_EQUALS( 8U , chunks.mutableMap()["shard2"]->size() );
}
TEST( BalancerPolicyTests, TagsPolicyChange ) {
    // Three shards with 5 chunks each. Only shard0 and shard1 carry tag "a", and the whole
    // populated key range is tagged "a", so every chunk on shard2 has to move off it.
    OwnedShardToChunksMap chunks;
    addShard( chunks, 5 , false );
    addShard( chunks, 5 , false );
    addShard( chunks, 5 , true );

    ShardInfoMap shards;
    shards["shard0"] = ShardInfo(0, 5, false);
    shards["shard1"] = ShardInfo(0, 5, false);
    shards["shard2"] = ShardInfo(0, 5, false);

    shards["shard0"].addTag( "a" );
    shards["shard1"].addTag( "a" );

    // Apply the balancer's decisions until it has nothing left to do.
    while ( true ) {
        DistributionStatus d(shards, chunks.map());
        d.addTagRange( TagRange( BSON( "x" << -1 ), BSON( "x" << 1000 ) , "a" ) );

        boost::scoped_ptr<MigrateInfo> m(BalancerPolicy::balance( "ns", d, 0 ));
        if ( ! m )
            break;

        moveChunk( chunks, m.get() );
    }

    // All 15 chunks end up split evenly (7/8) between the two "a" shards.
    const size_t shard0Size = chunks.mutableMap()["shard0"]->size();
    const size_t shard1Size = chunks.mutableMap()["shard1"]->size();
    ASSERT_EQUALS( 15U , shard0Size + shard1Size );
    ASSERT(shard0Size == 7U || shard0Size == 8U);
    ASSERT_EQUALS(0U, chunks.mutableMap()["shard2"]->size());
}
TEST( BalancerPolicyTests, TagsSelector ) {
    OwnedShardToChunksMap chunks;
    ShardInfoMap shards;
    DistributionStatus d(shards, chunks.map());

    // Three adjacent ranges [1, 10), [10, 20) and [20, 30) are all accepted...
    ASSERT( d.addTagRange( TagRange( BSON( "x" << 1 ), BSON( "x" << 10 ) , "a" ) ) );
    ASSERT( d.addTagRange( TagRange( BSON( "x" << 10 ), BSON( "x" << 20 ) , "b" ) ) );
    ASSERT( d.addTagRange( TagRange( BSON( "x" << 20 ), BSON( "x" << 30 ) , "c" ) ) );

    // ...but an exact duplicate, a range nested inside an existing one, and a range that
    // crosses an existing boundary are all rejected.
    ASSERT( ! d.addTagRange( TagRange( BSON( "x" << 20 ), BSON( "x" << 30 ) , "c" ) ) );
    ASSERT( ! d.addTagRange( TagRange( BSON( "x" << 22 ), BSON( "x" << 28 ) , "c" ) ) );
    ASSERT( ! d.addTagRange( TagRange( BSON( "x" << 28 ), BSON( "x" << 33 ) , "c" ) ) );

    // Expected tag for a chunk, keyed by the value of the chunk's lower bound on "x".
    // An empty tag means the chunk falls outside every registered range.
    struct TagExpectation {
        int minX;
        const char* tag;
    };
    const TagExpectation expectations[] = {
        { -4, "" },
        { 0, "" },
        { 1, "a" },
        { 10, "b" },
        { 15, "b" },
        { 25, "c" },
        { 35, "" },
    };
    const size_t numExpectations = sizeof(expectations) / sizeof(expectations[0]);

    for (size_t idx = 0; idx < numExpectations; ++idx) {
        ChunkType chunk;
        chunk.setMin(BSON("x" << expectations[idx].minX));
        ASSERT_EQUALS(string(expectations[idx].tag), d.getTagForChunk(chunk));
    }
}
/**
 * Idea for this test is to set up three shards, one of which is overloaded (too much data).
 *
 * Even though the overloaded shard has fewer chunks, we shouldn't move chunks to that shard.
 */
TEST( BalancerPolicyTests, MaxSizeRespect ) {
    OwnedShardToChunksMap chunks;
    addShard( chunks, 3 , false );
    addShard( chunks, 4 , false );
    addShard( chunks, 6 , true );

    // Note that maxSize of shard0 is 1, and it is therefore overloaded with currSize = 3.
    // Other shards have maxSize = 0 = unset.
    ShardInfoMap shards;
    // ShardInfo(maxSizeMB, currSizeMB, draining)
    shards["shard0"] = ShardInfo(1, 3, false);
    shards["shard1"] = ShardInfo(0, 4, false);
    shards["shard2"] = ShardInfo(0, 6, false);

    DistributionStatus d(shards, chunks.map());
    boost::scoped_ptr<MigrateInfo> m(BalancerPolicy::balance( "ns", d, 0 ));
    ASSERT( m );

    // The chunk comes from the fullest shard and goes to the least loaded shard that is
    // not over its limit, i.e. shard1 rather than the overloaded shard0.
    ASSERT_EQUALS( "shard2" , m->from );
    ASSERT_EQUALS( "shard1" , m->to );
}
/**
 * Here we check that being over the maxSize is *not* equivalent to draining: we don't want
 * to empty shards for no other reason than that they are over this limit.
 */
TEST( BalancerPolicyTests, MaxSizeNoDrain ) {
    OwnedShardToChunksMap chunks;
    // Shard0 will be overloaded
    addShard( chunks, 4 , false );
    addShard( chunks, 4 , false );
    addShard( chunks, 4 , true );

    // Note that maxSize of shard0 is 1, and it is therefore overloaded with currSize = 4.
    // Other shards have maxSize = 0 = unset.
    ShardInfoMap shards;
    // ShardInfo(maxSizeMB, currSizeMB, draining)
    shards["shard0"] = ShardInfo(1, 4, false);
    shards["shard1"] = ShardInfo(0, 4, false);
    shards["shard2"] = ShardInfo(0, 4, false);

    DistributionStatus d(shards, chunks.map());
    boost::scoped_ptr<MigrateInfo> m(BalancerPolicy::balance( "ns", d, 0 ));

    // Chunk counts are already equal, so no migration is expected despite shard0's size.
    ASSERT( !m );
}
/**
 * Idea behind this test is that we set up several shards, the first two of which are
 * draining and the next two of which have a data size limit. We also simulate a random
 * number of chunks on each shard.
 *
 * Once the shards are set up, we virtually migrate numChunks times, or until there are no
 * more migrations to run. Each chunk is assumed to have a size of 1 unit, and we increment
 * our currSize for each shard as the chunks move.
 *
 * Finally, we ensure that the drained shards are drained, that the data-limited shards
 * aren't overloaded, and that all shards (including the data-limited shards, if the baseline
 * isn't over their limit) are balanced to within 1 unit of some baseline.
 */
TEST( BalancerPolicyTests, Simulation ) {
    // Hardcode seed here, make test deterministic.
    int64_t seed = 1337;
    PseudoRandom rng(seed);

    // Run test 10 times
    for (int test = 0; test < 10; test++) {
        //
        // Setup our shards as draining, with maxSize, and normal
        //
        int numShards = 7;
        int numChunks = 0;
        OwnedShardToChunksMap chunks;
        ShardInfoMap shards;
        // Upper bound on the final size of each draining / size-limited shard.
        map<string, int> expected;

        for (int i = 0; i < numShards; i++) {
            int numShardChunks = rng.nextInt32(100);
            bool draining = i < 2;
            bool maxed = i >= 2 && i < 4;

            if (draining) expected[str::stream() << "shard" << i] = 0;
            if (maxed) expected[str::stream() << "shard" << i] = numShardChunks + 1;

            addShard(chunks, numShardChunks, false);
            numChunks += numShardChunks;

            shards[str::stream() << "shard" << i] =
                ShardInfo(maxed ? numShardChunks + 1 : 0,
                          numShardChunks, draining);
        }

        for (ShardInfoMap::iterator it = shards.begin(); it != shards.end(); ++it) {
            log() << it->first << " : " << it->second.toString() << endl;
        }

        //
        // Perform migrations and increment data size as chunks move
        //
        for (int i = 0; i < numChunks; i++) {
            DistributionStatus d(shards, chunks.map());
            boost::scoped_ptr<MigrateInfo> m(BalancerPolicy::balance( "ns", d, i != 0 ));
            if (!m) {
                log() << "Finished with test moves." << endl;
                break;
            }

            moveChunk(chunks, m.get());

            // Each chunk counts as one unit of data.
            {
                ShardInfo& info = shards[m->from];
                shards[m->from] = ShardInfo(info.getMaxSizeMB(),
                                            info.getCurrSizeMB() - 1,
                                            info.isDraining());
            }
            {
                ShardInfo& info = shards[m->to];
                shards[m->to] = ShardInfo(info.getMaxSizeMB(),
                                          info.getCurrSizeMB() + 1,
                                          info.isDraining());
            }
        }

        //
        // Make sure our balance is correct and our data size is low.
        //

        // The balanced value is the count on the last shard ("shard6" in map order), since
        // it's neither draining nor size-limited.
        int balancedSize = (--shards.end())->second.getCurrSizeMB();

        // Log the final state once per shard while checking it.
        for (ShardInfoMap::iterator it = shards.begin(); it != shards.end(); ++it) {
            log() << it->first << " : " << it->second.toString() << endl;

            map<string, int>::iterator expectedIt = expected.find(it->first);

            if (expectedIt == expected.end()) {
                bool isInRange = it->second.getCurrSizeMB() >= balancedSize - 1 &&
                                 it->second.getCurrSizeMB() <= balancedSize + 1;
                if (!isInRange) {
                    warning() << "non-limited and non-draining shard had "
                              << it->second.getCurrSizeMB() << " chunks, expected near "
                              << balancedSize << endl;
                }

                ASSERT(isInRange);
            }
            else {
                int expectedSize = expectedIt->second;
                bool isInRange = it->second.getCurrSizeMB() <= expectedSize;

                // A limited shard whose limit is not below the baseline must also be balanced.
                if (isInRange && expectedSize >= balancedSize) {
                    isInRange = it->second.getCurrSizeMB() >= balancedSize - 1 &&
                                it->second.getCurrSizeMB() <= balancedSize + 1;
                }

                if (!isInRange) {
                    warning() << "limited or draining shard had "
                              << it->second.getCurrSizeMB() << " chunks, expected less than "
                              << expectedSize << " and (if less than expected) near "
                              << balancedSize << endl;
                }

                ASSERT(isInRange);
            }
        }
    }
}
}
}