/**
 *    Copyright (C) 2012-2015 MongoDB Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects
 *    for all of the code used other than as permitted herein. If you modify
 *    file(s) with this exception, you may extend this exception to your
 *    version of the file(s), but you are not obligated to do so. If you do not
 *    wish to do so, delete this exception statement from your version. If you
 *    delete this exception statement from all source files in the program,
 *    then also delete it in the license file.
*/ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault #include "mongo/platform/random.h" #include "mongo/s/balancer_policy.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/config.h" #include "mongo/unittest/unittest.h" #include "mongo/util/log.h" namespace { using namespace mongo; using std::map; using std::string; using std::stringstream; using std::vector; TEST(BalancerPolicyTests, SizeMaxedShardTest) { ASSERT(!ShardInfo(0, 0, false).isSizeMaxed()); ASSERT(!ShardInfo(100LL, 80LL, false).isSizeMaxed()); ASSERT(ShardInfo(100LL, 110LL, false).isSizeMaxed()); } TEST(BalancerPolicyTests, BalanceNormalTest) { ShardToChunksMap chunkMap; vector chunks; { ChunkType chunk; chunk.setMin(BSON("x" << BSON("$minKey" << 1))); chunk.setMax(BSON("x" << 49)); chunks.push_back(chunk); } { ChunkType chunk; chunk.setMin(BSON("x" << 49)); chunk.setMax(BSON("x" << BSON("$maxKey" << 1))); chunks.push_back(chunk); } chunkMap["shard0"] = chunks; chunkMap["shard1"] = vector(); // no limits ShardInfoMap info; info["shard0"] = ShardInfo(0, 2, false); info["shard1"] = ShardInfo(0, 0, false); DistributionStatus status(info, chunkMap); std::unique_ptr c(BalancerPolicy::balance("ns", status, 1)); ASSERT(c); } TEST(BalancerPolicyTests, BalanceJumbo) { ShardToChunksMap chunkMap; vector chunks; { ChunkType chunk; chunk.setMin(BSON("x" << BSON("$minKey" << 1))); chunk.setMax(BSON("x" << 10)); chunk.setJumbo(true); chunks.push_back(chunk); } { ChunkType chunk; chunk.setMin(BSON("x" << 10)); chunk.setMax(BSON("x" << 20)); chunk.setJumbo(true); chunks.push_back(chunk); } { ChunkType chunk; chunk.setMin(BSON("x" << 20)); chunk.setMax(BSON("x" << 30)); chunks.push_back(chunk); } { ChunkType chunk; chunk.setMin(BSON("x" << 30)); chunk.setMax(BSON("x" << 40)); chunk.setJumbo(true); chunks.push_back(chunk); } { ChunkType chunk; chunk.setMin(BSON("x" << 40)); chunk.setMax(BSON("x" << BSON("$maxKey" << 1))); chunks.push_back(chunk); } chunkMap["shard0"] = chunks; 
chunkMap["shard1"] = vector(); // no limits ShardInfoMap info; info["shard0"] = ShardInfo(0, 2, false); info["shard1"] = ShardInfo(0, 0, false); DistributionStatus status(info, chunkMap); std::unique_ptr c(BalancerPolicy::balance("ns", status, 1)); ASSERT(c); ASSERT_EQUALS(30, c->chunk.max["x"].numberInt()); } TEST(BalanceNormalTests, BalanceDrainingTest) { ShardToChunksMap chunkMap; vector chunks; { ChunkType chunk; chunk.setMin(BSON("x" << BSON("$minKey" << 1))); chunk.setMax(BSON("x" << 49)); chunks.push_back(chunk); } { ChunkType chunk; chunk.setMin(BSON("x" << 49)); chunk.setMax(BSON("x" << BSON("$maxKey" << 1))); chunks.push_back(chunk); } chunkMap["shard0"] = chunks; chunkMap["shard1"] = vector(); // shard0 is draining ShardInfoMap limitsMap; limitsMap["shard0"] = ShardInfo(0LL, 2LL, true); limitsMap["shard1"] = ShardInfo(0LL, 0LL, false); DistributionStatus status(limitsMap, chunkMap); std::unique_ptr c(BalancerPolicy::balance("ns", status, 0)); ASSERT(c); ASSERT_EQUALS(c->to, "shard1"); ASSERT_EQUALS(c->from, "shard0"); ASSERT(!c->chunk.min.isEmpty()); } TEST(BalancerPolicyTests, BalanceEndedDrainingTest) { ShardToChunksMap chunkMap; vector chunks; { ChunkType chunk; chunk.setMin(BSON("x" << BSON("$minKey" << 1))); chunk.setMax(BSON("x" << 49)); chunks.push_back(chunk); } { ChunkType chunk; chunk.setMin(BSON("x" << 49)); chunk.setMax(BSON("x" << BSON("$maxKey" << 1))); chunks.push_back(chunk); } chunkMap["shard0"] = chunks; chunkMap["shard1"] = vector(); // no limits ShardInfoMap limitsMap; limitsMap["shard0"] = ShardInfo(0, 2, false); limitsMap["shard1"] = ShardInfo(0, 0, true); DistributionStatus status(limitsMap, chunkMap); std::unique_ptr c(BalancerPolicy::balance("ns", status, 0)); ASSERT(!c); } TEST(BalancerPolicyTests, BalanceImpasseTest) { ShardToChunksMap chunkMap; vector chunks; { ChunkType chunk; chunk.setMin(BSON("x" << BSON("$minKey" << 1))); chunk.setMax(BSON("x" << 49)); chunks.push_back(chunk); } { ChunkType chunk; chunk.setMin(BSON("x" << 
49)); chunk.setMax(BSON("x" << BSON("$maxKey" << 1))); chunks.push_back(chunk); } chunkMap["shard0"] = vector(); chunkMap["shard1"] = chunks; chunkMap["shard2"] = vector(); // shard0 is draining, shard1 is maxed out, shard2 has writebacks pending ShardInfoMap limitsMap; limitsMap["shard0"] = ShardInfo(0, 2, true); limitsMap["shard1"] = ShardInfo(1, 1, false); limitsMap["shard2"] = ShardInfo(0, 1, true); DistributionStatus status(limitsMap, chunkMap); std::unique_ptr c(BalancerPolicy::balance("ns", status, 0)); ASSERT(!c); } void addShard(ShardToChunksMap& shardToChunks, unsigned numChunks, bool last) { unsigned total = 0; for (const auto& chunk : shardToChunks) { total += chunk.second.size(); } stringstream ss; ss << "shard" << shardToChunks.size(); string myName = ss.str(); vector chunksList; for (unsigned i = 0; i < numChunks; i++) { ChunkType chunk; if (i == 0 && total == 0) { chunk.setMin(BSON("x" << BSON("$minKey" << 1))); } else { chunk.setMin(BSON("x" << total + i)); } if (last && i == (numChunks - 1)) { chunk.setMax(BSON("x" << BSON("$maxKey" << 1))); } else { chunk.setMax(BSON("x" << 1 + total + i)); } chunksList.push_back(chunk); } shardToChunks[myName] = chunksList; } void moveChunk(ShardToChunksMap& shardToChunks, MigrateInfo* m) { vector& chunks = shardToChunks[m->from]; for (vector::iterator i = chunks.begin(); i != chunks.end(); ++i) { if (i->getMin() == m->chunk.min) { shardToChunks[m->to].push_back(*i); chunks.erase(i); return; } } invariant(false); } TEST(BalancerPolicyTests, MultipleDraining) { ShardToChunksMap chunks; addShard(chunks, 5, false); addShard(chunks, 10, false); addShard(chunks, 5, true); ShardInfoMap shards; shards["shard0"] = ShardInfo(0, 5, true); shards["shard1"] = ShardInfo(0, 5, true); shards["shard2"] = ShardInfo(0, 5, false); DistributionStatus d(shards, chunks); std::unique_ptr m(BalancerPolicy::balance("ns", d, 0)); ASSERT(m); ASSERT_EQUALS("shard2", m->to); } TEST(BalancerPolicyTests, TagsDraining) { ShardToChunksMap 
chunks; addShard(chunks, 5, false); addShard(chunks, 5, false); addShard(chunks, 5, true); ShardInfoMap shards; shards["shard0"] = ShardInfo(0, 5, false); shards["shard1"] = ShardInfo(0, 5, true); shards["shard2"] = ShardInfo(0, 5, false); shards["shard0"].addTag("a"); shards["shard1"].addTag("a"); shards["shard1"].addTag("b"); shards["shard2"].addTag("b"); while (true) { DistributionStatus d(shards, chunks); d.addTagRange(TagRange(BSON("x" << -1), BSON("x" << 7), "a")); d.addTagRange(TagRange(BSON("x" << 7), BSON("x" << 1000), "b")); std::unique_ptr m(BalancerPolicy::balance("ns", d, 0)); if (!m) { break; } if (m->chunk.min["x"].numberInt() < 7) { ASSERT_EQUALS("shard0", m->to); } else { ASSERT_EQUALS("shard2", m->to); } moveChunk(chunks, m.get()); } ASSERT_EQUALS(7U, chunks["shard0"].size()); ASSERT_EQUALS(0U, chunks["shard1"].size()); ASSERT_EQUALS(8U, chunks["shard2"].size()); } TEST(BalancerPolicyTests, TagsPolicyChange) { ShardToChunksMap chunks; addShard(chunks, 5, false); addShard(chunks, 5, false); addShard(chunks, 5, true); ShardInfoMap shards; shards["shard0"] = ShardInfo(0, 5, false); shards["shard1"] = ShardInfo(0, 5, false); shards["shard2"] = ShardInfo(0, 5, false); shards["shard0"].addTag("a"); shards["shard1"].addTag("a"); while (true) { DistributionStatus d(shards, chunks); d.addTagRange(TagRange(BSON("x" << -1), BSON("x" << 1000), "a")); std::unique_ptr m(BalancerPolicy::balance("ns", d, 0)); if (!m) { break; } moveChunk(chunks, m.get()); } const size_t shard0Size = chunks["shard0"].size(); const size_t shard1Size = chunks["shard1"].size(); ASSERT_EQUALS(15U, shard0Size + shard1Size); ASSERT(shard0Size == 7U || shard0Size == 8U); ASSERT_EQUALS(0U, chunks["shard2"].size()); } TEST(BalancerPolicyTests, TagsSelector) { ShardToChunksMap chunks; ShardInfoMap shards; DistributionStatus d(shards, chunks); ASSERT(d.addTagRange(TagRange(BSON("x" << 1), BSON("x" << 10), "a"))); ASSERT(d.addTagRange(TagRange(BSON("x" << 10), BSON("x" << 20), "b"))); 
ASSERT(d.addTagRange(TagRange(BSON("x" << 20), BSON("x" << 30), "c"))); ASSERT(!d.addTagRange(TagRange(BSON("x" << 20), BSON("x" << 30), "c"))); ASSERT(!d.addTagRange(TagRange(BSON("x" << 22), BSON("x" << 28), "c"))); ASSERT(!d.addTagRange(TagRange(BSON("x" << 28), BSON("x" << 33), "c"))); { ChunkType chunk; chunk.setMin(BSON("x" << -4)); ASSERT_EQUALS("", d.getTagForChunk(chunk)); } { ChunkType chunk; chunk.setMin(BSON("x" << 0)); ASSERT_EQUALS("", d.getTagForChunk(chunk)); } { ChunkType chunk; chunk.setMin(BSON("x" << 1)); ASSERT_EQUALS("a", d.getTagForChunk(chunk)); } { ChunkType chunk; chunk.setMin(BSON("x" << 10)); ASSERT_EQUALS("b", d.getTagForChunk(chunk)); } { ChunkType chunk; chunk.setMin(BSON("x" << 15)); ASSERT_EQUALS("b", d.getTagForChunk(chunk)); } { ChunkType chunk; chunk.setMin(BSON("x" << 25)); ASSERT_EQUALS("c", d.getTagForChunk(chunk)); } { ChunkType chunk; chunk.setMin(BSON("x" << 35)); ASSERT_EQUALS("", d.getTagForChunk(chunk)); } } /** * Idea for this test is to set up three shards, one of which is overloaded (too much data). * * Even though the overloaded shard has less chunks, we shouldn't move chunks to that shard. */ TEST(BalancerPolicyTests, MaxSizeRespect) { ShardToChunksMap chunks; addShard(chunks, 3, false); addShard(chunks, 4, false); addShard(chunks, 6, true); // Note that maxSize of shard0 is 1, and it is therefore overloaded with currSize = 3. // Other shards have maxSize = 0 = unset. ShardInfoMap shards; shards["shard0"] = ShardInfo(1, 3, false); shards["shard1"] = ShardInfo(0, 4, false); shards["shard2"] = ShardInfo(0, 6, false); DistributionStatus d(shards, chunks); std::unique_ptr m(BalancerPolicy::balance("ns", d, 0)); ASSERT(m); ASSERT_EQUALS("shard2", m->from); ASSERT_EQUALS("shard1", m->to); } /** * Here we check that being over the maxSize is *not* equivalent to draining, we don't want * to empty shards for no other reason than they are over this limit. 
*/ TEST(BalancerPolicyTests, MaxSizeNoDrain) { ShardToChunksMap chunks; // Shard0 will be overloaded addShard(chunks, 4, false); addShard(chunks, 4, false); addShard(chunks, 4, true); // Note that maxSize of shard0 is 1, and it is therefore overloaded with currSize = 4. // Other shards have maxSize = 0 = unset. ShardInfoMap shards; // ShardInfo(maxSize, currSize, draining, opsQueued) shards["shard0"] = ShardInfo(1, 4, false); shards["shard1"] = ShardInfo(0, 4, false); shards["shard2"] = ShardInfo(0, 4, false); DistributionStatus d(shards, chunks); std::unique_ptr m(BalancerPolicy::balance("ns", d, 0)); ASSERT(!m); } /** * Idea behind this test is that we set up several shards, the first two of which are * draining and the second two of which have a data size limit. We also simulate a random * number of chunks on each shard. * * Once the shards are setup, we virtually migrate numChunks times, or until there are no * more migrations to run. Each chunk is assumed to have a size of 1 unit, and we increment * our currSize for each shard as the chunks move. * * Finally, we ensure that the drained shards are drained, the data-limited shards aren't * overloaded, and that all shards (including the data limited shard if the baseline isn't * over the limit are balanced to within 1 unit of some baseline. * */ TEST(BalancerPolicyTests, Simulation) { // Hardcode seed here, make test deterministic. 
int64_t seed = 1337; PseudoRandom rng(seed); // Run test 10 times for (int test = 0; test < 10; test++) { // Setup our shards as draining, with maxSize, and normal int numShards = 7; int numChunks = 0; ShardToChunksMap chunks; ShardInfoMap shards; map expected; for (int i = 0; i < numShards; i++) { int numShardChunks = rng.nextInt32(100); bool draining = i < 2; bool maxed = i >= 2 && i < 4; if (draining) { expected[str::stream() << "shard" << i] = 0; } if (maxed) { expected[str::stream() << "shard" << i] = numShardChunks + 1; } addShard(chunks, numShardChunks, false); numChunks += numShardChunks; shards[str::stream() << "shard" << i] = ShardInfo(maxed ? numShardChunks + 1 : 0, numShardChunks, draining); } for (ShardInfoMap::iterator it = shards.begin(); it != shards.end(); ++it) { log() << it->first << " : " << it->second.toString(); } // Perform migrations and increment data size as chunks move for (int i = 0; i < numChunks; i++) { DistributionStatus d(shards, chunks); std::unique_ptr m(BalancerPolicy::balance("ns", d, i != 0)); if (!m) { log() << "Finished with test moves."; break; } moveChunk(chunks, m.get()); { ShardInfo& info = shards[m->from]; shards[m->from] = ShardInfo(info.getMaxSizeMB(), info.getCurrSizeMB() - 1, info.isDraining()); } { ShardInfo& info = shards[m->to]; shards[m->to] = ShardInfo(info.getMaxSizeMB(), info.getCurrSizeMB() + 1, info.isDraining()); } } // Make sure our balance is correct and our data size is low. // The balanced value is the count on the last shard, since it's not draining or // limited. 
int balancedSize = (--shards.end())->second.getCurrSizeMB(); for (ShardInfoMap::iterator it = shards.begin(); it != shards.end(); ++it) { log() << it->first << " : " << it->second.toString(); } for (ShardInfoMap::iterator it = shards.begin(); it != shards.end(); ++it) { log() << it->first << " : " << it->second.toString(); map::iterator expectedIt = expected.find(it->first); if (expectedIt == expected.end()) { bool isInRange = it->second.getCurrSizeMB() >= balancedSize - 1 && it->second.getCurrSizeMB() <= balancedSize + 1; if (!isInRange) { warning() << "non-limited and non-draining shard had " << it->second.getCurrSizeMB() << " chunks, expected near " << balancedSize; } ASSERT(isInRange); } else { int expectedSize = expectedIt->second; bool isInRange = it->second.getCurrSizeMB() <= expectedSize; if (isInRange && expectedSize >= balancedSize) { isInRange = it->second.getCurrSizeMB() >= balancedSize - 1 && it->second.getCurrSizeMB() <= balancedSize + 1; } if (!isInRange) { warning() << "limited or draining shard had " << it->second.getCurrSizeMB() << " chunks, expected less than " << expectedSize << " and (if less than expected) near " << balancedSize; } ASSERT(isInRange); } } } } } // namespace