summary refs log tree commit diff
path: root/src/mongo/db/s
diff options
context:
space:
mode:
author: Alex Taskov <alex.taskov@mongodb.com> 2019-08-28 19:30:39 +0000
committer: evergreen <evergreen@mongodb.com> 2019-08-28 19:30:39 +0000
commit: 5e14accc4ebe76366d7d2747fd30b603bf02eac2 (patch)
tree: 248180efbbe52bcc5d4c7d00a449479a630ea362 /src/mongo/db/s
parent: b2741ca014cc55f59c398d1006ccae06f394018c (diff)
download: mongo-5e14accc4ebe76366d7d2747fd30b603bf02eac2.tar.gz
SERVER-42914 Implement random chunk selection policy for balancer for use in concurrency_*_with_balancer workloads
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--src/mongo/db/s/balancer/balancer.cpp24
-rw-r--r--src/mongo/db/s/balancer/balancer.h4
-rw-r--r--src/mongo/db/s/balancer/balancer_policy.cpp75
3 files changed, 96 insertions(+), 7 deletions(-)
diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp
index 90d03a118a9..dd1a9011612 100644
--- a/src/mongo/db/s/balancer/balancer.cpp
+++ b/src/mongo/db/s/balancer/balancer.cpp
@@ -54,6 +54,7 @@
#include "mongo/s/shard_util.h"
#include "mongo/util/concurrency/idle_thread_block.h"
#include "mongo/util/exit.h"
+#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/timer.h"
#include "mongo/util/version.h"
@@ -66,6 +67,8 @@ using std::vector;
namespace {
+MONGO_FAIL_POINT_DEFINE(overrideBalanceRoundInterval);
+
const Seconds kBalanceRoundDefaultInterval(10);
// Sleep between balancer rounds in the case where the last round found some chunks which needed to
@@ -391,9 +394,20 @@ void Balancer::_mainThread() {
LOG(1) << "*** End of balancing round";
}
- _endRound(opCtx.get(),
- _balancedLastTime ? kShortBalanceRoundInterval
- : kBalanceRoundDefaultInterval);
+ auto balancerInterval = [&]() -> Milliseconds {
+ MONGO_FAIL_POINT_BLOCK(overrideBalanceRoundInterval, data) {
+ int interval = data.getData()["intervalMs"].numberInt();
+ log() << "overrideBalanceRoundInterval: using shorter balancing interval: "
+ << interval << "ms";
+
+ return Milliseconds(interval);
+ }
+
+ return _balancedLastTime ? kShortBalanceRoundInterval
+ : kBalanceRoundDefaultInterval;
+ }();
+
+ _endRound(opCtx.get(), balancerInterval);
} catch (const DBException& e) {
log() << "caught exception while doing balance: " << e.what();
@@ -441,7 +455,7 @@ void Balancer::_beginRound(OperationContext* opCtx) {
_condVar.notify_all();
}
-void Balancer::_endRound(OperationContext* opCtx, Seconds waitTimeout) {
+void Balancer::_endRound(OperationContext* opCtx, Milliseconds waitTimeout) {
{
stdx::lock_guard<stdx::mutex> lock(_mutex);
_inBalancerRound = false;
@@ -453,7 +467,7 @@ void Balancer::_endRound(OperationContext* opCtx, Seconds waitTimeout) {
_sleepFor(opCtx, waitTimeout);
}
-void Balancer::_sleepFor(OperationContext* opCtx, Seconds waitTimeout) {
+void Balancer::_sleepFor(OperationContext* opCtx, Milliseconds waitTimeout) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
_condVar.wait_for(lock, waitTimeout.toSystemDuration(), [&] { return _state != kRunning; });
}
diff --git a/src/mongo/db/s/balancer/balancer.h b/src/mongo/db/s/balancer/balancer.h
index 2b6738def19..d33d6c1ddc0 100644
--- a/src/mongo/db/s/balancer/balancer.h
+++ b/src/mongo/db/s/balancer/balancer.h
@@ -172,13 +172,13 @@ private:
* Signals the beginning and end of a balancing round.
*/
void _beginRound(OperationContext* opCtx);
- void _endRound(OperationContext* opCtx, Seconds waitTimeout);
+ void _endRound(OperationContext* opCtx, Milliseconds waitTimeout);
/**
* Blocks the caller for the specified timeout or until the balancer condition variable is
* signaled, whichever comes first.
*/
- void _sleepFor(OperationContext* opCtx, Seconds waitTimeout);
+ void _sleepFor(OperationContext* opCtx, Milliseconds waitTimeout);
/**
* Returns true if all the servers listed in configdb as being shards are reachable and are
diff --git a/src/mongo/db/s/balancer/balancer_policy.cpp b/src/mongo/db/s/balancer/balancer_policy.cpp
index 5f8fcbf5034..efd71ca26ed 100644
--- a/src/mongo/db/s/balancer/balancer_policy.cpp
+++ b/src/mongo/db/s/balancer/balancer_policy.cpp
@@ -33,14 +33,19 @@
#include "mongo/db/s/balancer/balancer_policy.h"
+#include <random>
+
#include "mongo/db/s/balancer/type_migration.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/catalog/type_tags.h"
+#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/str.h"
namespace mongo {
+MONGO_FAIL_POINT_DEFINE(balancerShouldReturnRandomMigrations);
+
using std::map;
using std::numeric_limits;
using std::set;
@@ -287,11 +292,81 @@ ShardId BalancerPolicy::_getMostOverloadedShard(const ShardStatisticsVector& sha
return worst;
}
+// Returns a random integer in [0, max) using a uniform random distribution.
+int getRandomIndex(int max) {
+ std::default_random_engine gen(time(nullptr));
+ std::uniform_int_distribution<int> dist(0, max - 1);
+
+ return dist(gen);
+}
+
+// Iterates through the shardStats vector starting from index until it finds an element that has > 0
+// chunks. It will wrap around at the end and stop at the starting index. If no shards have chunks,
+// it will return the original index value.
+int getNextShardWithChunks(const ShardStatisticsVector& shardStats,
+ const DistributionStatus& distribution,
+ int index) {
+ int retIndex = index;
+
+ while (distribution.numberOfChunksInShard(shardStats[retIndex].shardId) == 0) {
+ retIndex = (retIndex + 1) % shardStats.size();
+
+ if (retIndex == index)
+ return index;
+ }
+
+ return retIndex;
+}
+
+// Returns a randomly chosen pair of source -> destination shards for testing.
+// The random pair is chosen by the following algorithm:
+// - create an array of indices with values [0, n)
+// - select a random index from this set
+// - advance the chosen index until we encounter a shard with chunks to move
+// - remove the chosen index from the set by swapping it with the last element
+// - select the destination index from the remaining indices
+MigrateInfo chooseRandomMigration(const ShardStatisticsVector& shardStats,
+ const DistributionStatus& distribution) {
+ std::vector<int> indices(shardStats.size());
+
+ int i = 0;
+ std::generate(indices.begin(), indices.end(), [&i] { return i++; });
+
+ int choice = getRandomIndex(indices.size());
+
+ const int sourceIndex = getNextShardWithChunks(shardStats, distribution, indices[choice]);
+ const auto& sourceShardId = shardStats[sourceIndex].shardId;
+ std::swap(indices[sourceIndex], indices[indices.size() - 1]);
+
+ choice = getRandomIndex(indices.size() - 1);
+ const int destIndex = indices[choice];
+ const auto& destShardId = shardStats[destIndex].shardId;
+
+ LOG(1) << "balancerShouldReturnRandomMigrations: source: " << sourceShardId
+ << " dest: " << destShardId;
+
+ const auto& chunks = distribution.getChunks(sourceShardId);
+
+ return {destShardId, chunks[getRandomIndex(chunks.size())]};
+}
+
vector<MigrateInfo> BalancerPolicy::balance(const ShardStatisticsVector& shardStats,
const DistributionStatus& distribution,
std::set<ShardId>* usedShards) {
vector<MigrateInfo> migrations;
+ if (MONGO_FAIL_POINT(balancerShouldReturnRandomMigrations) &&
+ !distribution.nss().isConfigDB()) {
+ LOG(1) << "balancerShouldReturnRandomMigrations failpoint is set";
+
+ if (shardStats.size() < 2)
+ return migrations;
+
+ migrations.push_back(chooseRandomMigration(shardStats, distribution));
+
+ return migrations;
+ }
+
// 1) Check for shards, which are in draining mode
{
for (const auto& stat : shardStats) {