diff options
author | Alex Taskov <alex.taskov@mongodb.com> | 2019-08-28 19:30:39 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-08-28 19:30:39 +0000 |
commit | 5e14accc4ebe76366d7d2747fd30b603bf02eac2 (patch) | |
tree | 248180efbbe52bcc5d4c7d00a449479a630ea362 /src/mongo/db/s | |
parent | b2741ca014cc55f59c398d1006ccae06f394018c (diff) | |
download | mongo-5e14accc4ebe76366d7d2747fd30b603bf02eac2.tar.gz |
SERVER-42914 Implement random chunk selection policy for balancer for use in concurrency_*_with_balancer workloads
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r-- | src/mongo/db/s/balancer/balancer.cpp | 24 | ||||
-rw-r--r-- | src/mongo/db/s/balancer/balancer.h | 4 | ||||
-rw-r--r-- | src/mongo/db/s/balancer/balancer_policy.cpp | 75 |
3 files changed, 96 insertions, 7 deletions
diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp
index 90d03a118a9..dd1a9011612 100644
--- a/src/mongo/db/s/balancer/balancer.cpp
+++ b/src/mongo/db/s/balancer/balancer.cpp
@@ -54,6 +54,7 @@
 #include "mongo/s/shard_util.h"
 #include "mongo/util/concurrency/idle_thread_block.h"
 #include "mongo/util/exit.h"
+#include "mongo/util/fail_point_service.h"
 #include "mongo/util/log.h"
 #include "mongo/util/timer.h"
 #include "mongo/util/version.h"
@@ -66,6 +67,8 @@ using std::vector;
 
 namespace {
 
+MONGO_FAIL_POINT_DEFINE(overrideBalanceRoundInterval);
+
 const Seconds kBalanceRoundDefaultInterval(10);
 
 // Sleep between balancer rounds in the case where the last round found some chunks which needed to
@@ -391,9 +394,20 @@ void Balancer::_mainThread() {
                 LOG(1) << "*** End of balancing round";
             }
 
-            _endRound(opCtx.get(),
-                      _balancedLastTime ? kShortBalanceRoundInterval
-                                        : kBalanceRoundDefaultInterval);
+            auto balancerInterval = [&]() -> Milliseconds {
+                MONGO_FAIL_POINT_BLOCK(overrideBalanceRoundInterval, data) {
+                    int interval = data.getData()["intervalMs"].numberInt();
+                    log() << "overrideBalanceRoundInterval: using shorter balancing interval: "
+                          << interval << "ms";
+
+                    return Milliseconds(interval);
+                }
+
+                return _balancedLastTime ? kShortBalanceRoundInterval
+                                         : kBalanceRoundDefaultInterval;
+            }();
+
+            _endRound(opCtx.get(), balancerInterval);
         } catch (const DBException& e) {
             log() << "caught exception while doing balance: " << e.what();
 
@@ -441,7 +455,7 @@ void Balancer::_beginRound(OperationContext* opCtx) {
     _condVar.notify_all();
 }
 
-void Balancer::_endRound(OperationContext* opCtx, Seconds waitTimeout) {
+void Balancer::_endRound(OperationContext* opCtx, Milliseconds waitTimeout) {
     {
         stdx::lock_guard<stdx::mutex> lock(_mutex);
         _inBalancerRound = false;
@@ -453,7 +467,7 @@ void Balancer::_endRound(OperationContext* opCtx, Seconds waitTimeout) {
     _sleepFor(opCtx, waitTimeout);
 }
 
-void Balancer::_sleepFor(OperationContext* opCtx, Seconds waitTimeout) {
+void Balancer::_sleepFor(OperationContext* opCtx, Milliseconds waitTimeout) {
     stdx::unique_lock<stdx::mutex> lock(_mutex);
     _condVar.wait_for(lock, waitTimeout.toSystemDuration(), [&] { return _state != kRunning; });
 }
diff --git a/src/mongo/db/s/balancer/balancer.h b/src/mongo/db/s/balancer/balancer.h
index 2b6738def19..d33d6c1ddc0 100644
--- a/src/mongo/db/s/balancer/balancer.h
+++ b/src/mongo/db/s/balancer/balancer.h
@@ -172,13 +172,13 @@ private:
      * Signals the beginning and end of a balancing round.
      */
     void _beginRound(OperationContext* opCtx);
-    void _endRound(OperationContext* opCtx, Seconds waitTimeout);
+    void _endRound(OperationContext* opCtx, Milliseconds waitTimeout);
 
     /**
      * Blocks the caller for the specified timeout or until the balancer condition variable is
      * signaled, whichever comes first.
      */
-    void _sleepFor(OperationContext* opCtx, Seconds waitTimeout);
+    void _sleepFor(OperationContext* opCtx, Milliseconds waitTimeout);
 
     /**
      * Returns true if all the servers listed in configdb as being shards are reachable and are
diff --git a/src/mongo/db/s/balancer/balancer_policy.cpp b/src/mongo/db/s/balancer/balancer_policy.cpp
index 5f8fcbf5034..efd71ca26ed 100644
--- a/src/mongo/db/s/balancer/balancer_policy.cpp
+++ b/src/mongo/db/s/balancer/balancer_policy.cpp
@@ -33,14 +33,19 @@
 
 #include "mongo/db/s/balancer/balancer_policy.h"
 
+#include <random>
+
 #include "mongo/db/s/balancer/type_migration.h"
 #include "mongo/s/catalog/type_shard.h"
 #include "mongo/s/catalog/type_tags.h"
+#include "mongo/util/fail_point_service.h"
 #include "mongo/util/log.h"
 #include "mongo/util/str.h"
 
 namespace mongo {
 
+MONGO_FAIL_POINT_DEFINE(balancerShouldReturnRandomMigrations);
+
 using std::map;
 using std::numeric_limits;
 using std::set;
@@ -287,11 +292,81 @@ ShardId BalancerPolicy::_getMostOverloadedShard(const ShardStatisticsVector& sha
     return worst;
 }
 
+// Returns a uniform random integer in [0, max), max > 0, from an engine seeded once per thread.
+int getRandomIndex(int max) {
+    thread_local std::default_random_engine gen(std::random_device{}());
+    std::uniform_int_distribution<int> dist(0, max - 1);
+
+    return dist(gen);
+}
+
+// Iterates through the shardStats vector starting from index until it finds an element that has > 0
+// chunks. It will wrap around at the end and stop at the starting index. If no shards have chunks,
+// it will return the original index value.
+int getNextShardWithChunks(const ShardStatisticsVector& shardStats,
+                           const DistributionStatus& distribution,
+                           int index) {
+    int retIndex = index;
+
+    while (distribution.numberOfChunksInShard(shardStats[retIndex].shardId) == 0) {
+        retIndex = (retIndex + 1) % shardStats.size();
+
+        if (retIndex == index)
+            return index;
+    }
+
+    return retIndex;
+}
+
+// Returns a randomly chosen pair of source -> destination shards for testing.
+// The random pair is chosen by the following algorithm:
+//  - create an array of indices with values [0, n)
+//  - select a random index from this set
+//  - advance the chosen index until we encounter a shard with chunks to move
+//  - remove the chosen index from the set by swapping it with the last element
+//  - select the destination index from the remaining indices
+MigrateInfo chooseRandomMigration(const ShardStatisticsVector& shardStats,
+                                  const DistributionStatus& distribution) {
+    std::vector<int> indices(shardStats.size());
+
+    int i = 0;
+    std::generate(indices.begin(), indices.end(), [&i] { return i++; });
+
+    int choice = getRandomIndex(indices.size());
+
+    const int sourceIndex = getNextShardWithChunks(shardStats, distribution, indices[choice]);
+    const auto& sourceShardId = shardStats[sourceIndex].shardId;
+    std::swap(indices[sourceIndex], indices[indices.size() - 1]);
+
+    choice = getRandomIndex(indices.size() - 1);
+    const int destIndex = indices[choice];
+    const auto& destShardId = shardStats[destIndex].shardId;
+
+    LOG(1) << "balancerShouldReturnRandomMigrations: source: " << sourceShardId
+           << " dest: " << destShardId;
+
+    const auto& chunks = distribution.getChunks(sourceShardId);
+
+    return {destShardId, chunks[getRandomIndex(chunks.size())]};
+}
+
 vector<MigrateInfo> BalancerPolicy::balance(const ShardStatisticsVector& shardStats,
                                             const DistributionStatus& distribution,
                                             std::set<ShardId>* usedShards) {
     vector<MigrateInfo> migrations;
 
+    if (MONGO_FAIL_POINT(balancerShouldReturnRandomMigrations) &&
+        !distribution.nss().isConfigDB()) {
+        LOG(1) << "balancerShouldReturnRandomMigrations failpoint is set";
+
+        if (shardStats.size() < 2)
+            return migrations;
+
+        migrations.push_back(chooseRandomMigration(shardStats, distribution));
+
+        return migrations;
+    }
+
     // 1) Check for shards, which are in draining mode
     {
         for (const auto& stat : shardStats) {