Diffstat (limited to 'src/mongo/s')
 src/mongo/s/SConscript | 37
 src/mongo/s/balancer/balancer.cpp | 633
 src/mongo/s/balancer/balancer.h | 250
 src/mongo/s/balancer/balancer_chunk_selection_policy.cpp | 68
 src/mongo/s/balancer/balancer_chunk_selection_policy.h | 122
 src/mongo/s/balancer/balancer_chunk_selection_policy_impl.cpp | 418
 src/mongo/s/balancer/balancer_chunk_selection_policy_impl.h | 77
 src/mongo/s/balancer/balancer_configuration.cpp (renamed from src/mongo/s/balancer_configuration.cpp) | 2
 src/mongo/s/balancer/balancer_configuration.h (renamed from src/mongo/s/balancer_configuration.h) | 0
 src/mongo/s/balancer/balancer_configuration_test.cpp (renamed from src/mongo/s/balancer_configuration_test.cpp) | 2
 src/mongo/s/balancer/balancer_policy.cpp | 518
 src/mongo/s/balancer/balancer_policy.h | 246
 src/mongo/s/balancer/balancer_policy_tests.cpp | 608
 src/mongo/s/balancer/cluster_statistics.cpp | 80
 src/mongo/s/balancer/cluster_statistics.h | 110
 src/mongo/s/balancer/cluster_statistics_impl.cpp | 160
 src/mongo/s/balancer/cluster_statistics_impl.h | 48
 src/mongo/s/balancer/cluster_statistics_test.cpp | 51
 src/mongo/s/balancer/migration_manager.cpp | 791
 src/mongo/s/balancer/migration_manager.h | 294
 src/mongo/s/balancer/migration_manager_test.cpp | 980
 src/mongo/s/balancer/scoped_migration_request.cpp | 188
 src/mongo/s/balancer/scoped_migration_request.h | 106
 src/mongo/s/balancer/scoped_migration_request_test.cpp | 213
 src/mongo/s/balancer/type_migration.cpp | 122
 src/mongo/s/balancer/type_migration.h | 96
 src/mongo/s/balancer/type_migration_test.cpp | 165
 src/mongo/s/catalog/replset_dist_lock_manager_test.cpp | 2
 src/mongo/s/catalog/sharding_catalog_manager_impl.cpp | 4
 src/mongo/s/chunk.cpp | 2
 src/mongo/s/chunk_manager.cpp | 2
 src/mongo/s/cluster_write.cpp | 2
 src/mongo/s/commands/cluster_find_and_modify_cmd.cpp | 2
 src/mongo/s/commands/cluster_map_reduce_cmd.cpp | 2
 src/mongo/s/commands/cluster_move_chunk_cmd.cpp | 2
 src/mongo/s/commands/cluster_shard_collection_cmd.cpp | 2
 src/mongo/s/config.cpp | 2
 src/mongo/s/config_server_test_fixture.cpp | 3
 src/mongo/s/grid.cpp | 2
 src/mongo/s/server.cpp | 2
 src/mongo/s/sharding_initialization.cpp | 2
 src/mongo/s/sharding_mongod_test_fixture.cpp | 2
 src/mongo/s/sharding_test_fixture.cpp | 2
 src/mongo/s/sharding_uptime_reporter.cpp | 2
 44 files changed, 6398 insertions(+), 24 deletions(-)
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 1983abd864d..91be5a20f2d 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -45,6 +45,7 @@ env.Library(
env.Library(
target='common',
source=[
+ 'balancer/type_migration.cpp',
'catalog/mongo_version_range.cpp',
'catalog/type_changelog.cpp',
'catalog/type_chunk.cpp',
@@ -92,6 +93,7 @@ env.Library(
LIBDEPS=[
'$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
'$BUILD_DIR/mongo/db/query/collation/collator_factory_mock',
+ '$BUILD_DIR/mongo/db/service_context_noop_init',
'$BUILD_DIR/mongo/client/remote_command_targeter_mock',
'$BUILD_DIR/mongo/executor/network_test_env',
'$BUILD_DIR/mongo/executor/task_executor_pool',
@@ -153,6 +155,7 @@ env.CppUnitTest(
env.CppUnitTest(
target='sharding_common_test',
source=[
+ 'balancer/type_migration_test.cpp',
'catalog/type_changelog_test.cpp',
'catalog/type_chunk_test.cpp',
'catalog/type_collection_test.cpp',
@@ -249,11 +252,20 @@ env.CppUnitTest('request_types_test',
],
)
-# This library contains sharding functionality used by both mongod and mongos
+# This library contains sharding functionality used by both mongod and mongos. Certain tests
+# that exercise this functionality also link against it.
env.Library(
target='coreshard',
source=[
- 'balancer_configuration.cpp',
+ 'balancer/balancer.cpp',
+ 'balancer/balancer_chunk_selection_policy.cpp',
+ 'balancer/balancer_chunk_selection_policy_impl.cpp',
+ 'balancer/balancer_configuration.cpp',
+ 'balancer/balancer_policy.cpp',
+ 'balancer/cluster_statistics.cpp',
+ 'balancer/cluster_statistics_impl.cpp',
+ 'balancer/migration_manager.cpp',
+ 'balancer/scoped_migration_request.cpp',
'catalog/catalog_cache.cpp',
'chunk.cpp',
'chunk_manager.cpp',
@@ -317,11 +329,15 @@ env.Library(
)
env.CppUnitTest(
- target='balancer_configuration_test',
+ target='balancer_test',
source=[
- 'balancer_configuration_test.cpp',
+ 'balancer/balancer_configuration_test.cpp',
+ 'balancer/balancer_policy_tests.cpp',
+ 'balancer/cluster_statistics_test.cpp',
],
LIBDEPS=[
+ '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
+ '$BUILD_DIR/mongo/db/service_context_noop_init',
'coreshard',
'sharding_test_fixture',
]
@@ -338,6 +354,19 @@ env.CppUnitTest(
]
)
+env.CppUnitTest(
+ target='migration_manager_test',
+ source=[
+ 'balancer/migration_manager_test.cpp',
+ 'balancer/scoped_migration_request_test.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/util/version_impl',
+ 'config_server_test_fixture',
+ 'coreshard',
+ ]
+)
+
env.Library(
target='local_sharding_info',
source=[
diff --git a/src/mongo/s/balancer/balancer.cpp b/src/mongo/s/balancer/balancer.cpp
new file mode 100644
index 00000000000..40982f8cd03
--- /dev/null
+++ b/src/mongo/s/balancer/balancer.cpp
@@ -0,0 +1,633 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/balancer/balancer.h"
+
+#include <algorithm>
+#include <string>
+
+#include "mongo/base/status_with.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/client/read_preference.h"
+#include "mongo/db/client.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/s/balancer/balancer_chunk_selection_policy_impl.h"
+#include "mongo/s/balancer/balancer_configuration.h"
+#include "mongo/s/balancer/cluster_statistics_impl.h"
+#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/client/shard.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/cluster_identity_loader.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/shard_util.h"
+#include "mongo/s/sharding_raii.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/util/log.h"
+#include "mongo/util/timer.h"
+#include "mongo/util/version.h"
+
+namespace mongo {
+
+using std::map;
+using std::string;
+using std::vector;
+
+namespace {
+
+const Seconds kBalanceRoundDefaultInterval(10);
+const Seconds kShortBalanceRoundInterval(1);
+
+const auto getBalancer = ServiceContext::declareDecoration<std::unique_ptr<Balancer>>();
+
+/**
+ * Utility class to generate timing and statistics for a single balancer round.
+ */
+class BalanceRoundDetails {
+public:
+ BalanceRoundDetails() : _executionTimer() {}
+
+ void setSucceeded(int candidateChunks, int chunksMoved) {
+ invariant(!_errMsg);
+ _candidateChunks = candidateChunks;
+ _chunksMoved = chunksMoved;
+ }
+
+ void setFailed(const string& errMsg) {
+ _errMsg = errMsg;
+ }
+
+ BSONObj toBSON() const {
+ BSONObjBuilder builder;
+ builder.append("executionTimeMillis", _executionTimer.millis());
+ builder.append("errorOccured", _errMsg.is_initialized());
+
+ if (_errMsg) {
+ builder.append("errmsg", *_errMsg);
+ } else {
+ builder.append("candidateChunks", _candidateChunks);
+ builder.append("chunksMoved", _chunksMoved);
+ }
+
+ return builder.obj();
+ }
+
+private:
+ const Timer _executionTimer;
+
+ // Set only on success
+ int _candidateChunks{0};
+ int _chunksMoved{0};
+
+ // Set only on failure
+ boost::optional<string> _errMsg;
+};
+
+/**
+ * Occasionally prints a log message with shard versions if the versions are not the same
+ * in the cluster.
+ */
+void warnOnMultiVersion(const vector<ClusterStatistics::ShardStatistics>& clusterStats) {
+
+ auto&& vii = VersionInfoInterface::instance();
+
+ bool isMultiVersion = false;
+ for (const auto& stat : clusterStats) {
+ if (!vii.isSameMajorVersion(stat.mongoVersion.c_str())) {
+ isMultiVersion = true;
+ break;
+ }
+ }
+
+ // If we're all the same version, don't message
+ if (!isMultiVersion)
+ return;
+
+ StringBuilder sb;
+ sb << "Multi version cluster detected. Local version: " << vii.version()
+ << ", shard versions: ";
+
+ for (const auto& stat : clusterStats) {
+ sb << stat.shardId << " is at " << stat.mongoVersion << "; ";
+ }
+
+ warning() << sb.str();
+}
+
+} // namespace
+
+Balancer::Balancer(ServiceContext* serviceContext)
+ : _balancedLastTime(0),
+ _clusterStats(stdx::make_unique<ClusterStatisticsImpl>()),
+ _chunkSelectionPolicy(
+ stdx::make_unique<BalancerChunkSelectionPolicyImpl>(_clusterStats.get())),
+ _migrationManager(serviceContext) {}
+
+Balancer::~Balancer() {
+ // The balancer thread must have been stopped
+ stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
+ invariant(_state == kStopped);
+}
+
+void Balancer::create(ServiceContext* serviceContext) {
+ invariant(!getBalancer(serviceContext));
+ getBalancer(serviceContext) = stdx::make_unique<Balancer>(serviceContext);
+}
+
+Balancer* Balancer::get(ServiceContext* serviceContext) {
+ return getBalancer(serviceContext).get();
+}
+
+Balancer* Balancer::get(OperationContext* operationContext) {
+ return get(operationContext->getServiceContext());
+}
+
+void Balancer::onTransitionToPrimary(OperationContext* txn) {
+ stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
+ invariant(_state == kStopped);
+ _state = kRunning;
+
+ _migrationManager.startRecoveryAndAcquireDistLocks(txn);
+
+ invariant(!_thread.joinable());
+ invariant(!_threadOperationContext);
+ _thread = stdx::thread([this] { _mainThread(); });
+}
+
+void Balancer::onStepDownFromPrimary() {
+ stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
+ if (_state != kRunning)
+ return;
+
+ _state = kStopping;
+
+ // Interrupt the balancer thread if it has been started. We are guaranteed that the operation
+ // context of that thread is still alive, because we hold the balancer mutex.
+ if (_threadOperationContext) {
+ stdx::lock_guard<Client> scopedClientLock(*_threadOperationContext->getClient());
+ _threadOperationContext->markKilled(ErrorCodes::InterruptedDueToReplStateChange);
+ }
+
+ // Schedule a separate thread to shutdown the migration manager in order to avoid deadlock with
+ // replication step down
+ invariant(!_migrationManagerInterruptThread.joinable());
+ _migrationManagerInterruptThread =
+ stdx::thread([this] { _migrationManager.interruptAndDisableMigrations(); });
+
+ _condVar.notify_all();
+}
+
+void Balancer::onDrainComplete(OperationContext* txn) {
+ invariant(!txn->lockState()->isLocked());
+
+ {
+ stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
+ if (_state == kStopped)
+ return;
+
+ invariant(_state == kStopping);
+ invariant(_thread.joinable());
+ }
+
+ _thread.join();
+
+ stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
+ _state = kStopped;
+ _thread = {};
+
+ LOG(1) << "Balancer thread terminated";
+}
+
+void Balancer::joinCurrentRound(OperationContext* txn) {
+ stdx::unique_lock<stdx::mutex> scopedLock(_mutex);
+ const auto numRoundsAtStart = _numBalancerRounds;
+ _condVar.wait(scopedLock,
+ [&] { return !_inBalancerRound || _numBalancerRounds != numRoundsAtStart; });
+}
+
+Status Balancer::rebalanceSingleChunk(OperationContext* txn, const ChunkType& chunk) {
+ auto migrateStatus = _chunkSelectionPolicy->selectSpecificChunkToMove(txn, chunk);
+ if (!migrateStatus.isOK()) {
+ return migrateStatus.getStatus();
+ }
+
+ auto migrateInfo = std::move(migrateStatus.getValue());
+ if (!migrateInfo) {
+ LOG(1) << "Unable to find more appropriate location for chunk " << redact(chunk.toString());
+ return Status::OK();
+ }
+
+ auto balancerConfig = Grid::get(txn)->getBalancerConfiguration();
+ Status refreshStatus = balancerConfig->refreshAndCheck(txn);
+ if (!refreshStatus.isOK()) {
+ return refreshStatus;
+ }
+
+ return _migrationManager.executeManualMigration(txn,
+ *migrateInfo,
+ balancerConfig->getMaxChunkSizeBytes(),
+ balancerConfig->getSecondaryThrottle(),
+ balancerConfig->waitForDelete());
+}
+
+Status Balancer::moveSingleChunk(OperationContext* txn,
+ const ChunkType& chunk,
+ const ShardId& newShardId,
+ uint64_t maxChunkSizeBytes,
+ const MigrationSecondaryThrottleOptions& secondaryThrottle,
+ bool waitForDelete) {
+ auto moveAllowedStatus = _chunkSelectionPolicy->checkMoveAllowed(txn, chunk, newShardId);
+ if (!moveAllowedStatus.isOK()) {
+ return moveAllowedStatus;
+ }
+
+ return _migrationManager.executeManualMigration(txn,
+ MigrateInfo(chunk.getNS(), newShardId, chunk),
+ maxChunkSizeBytes,
+ secondaryThrottle,
+ waitForDelete);
+}
+
+void Balancer::report(OperationContext* txn, BSONObjBuilder* builder) {
+ auto balancerConfig = Grid::get(txn)->getBalancerConfiguration();
+ balancerConfig->refreshAndCheck(txn);
+
+ const auto mode = balancerConfig->getBalancerMode();
+
+ stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
+ builder->append("mode", BalancerSettingsType::kBalancerModes[mode]);
+ builder->append("inBalancerRound", _inBalancerRound);
+ builder->append("numBalancerRounds", _numBalancerRounds);
+}
+
+void Balancer::_mainThread() {
+ Client::initThread("Balancer");
+ auto txn = cc().makeOperationContext();
+ auto shardingContext = Grid::get(txn.get());
+
+ log() << "CSRS balancer is starting";
+
+ {
+ stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
+ _threadOperationContext = txn.get();
+ }
+
+ const Seconds kInitBackoffInterval(10);
+
+ // Take the balancer distributed lock and hold it permanently. Use single lock attempts in
+ // order to not block the thread and to be able to check for interrupt more frequently.
+ while (!_stopRequested()) {
+ auto distLockHandleStatus =
+ shardingContext->catalogClient(txn.get())->getDistLockManager()->lockWithSessionID(
+ txn.get(),
+ "balancer",
+ "CSRS Balancer",
+ OID::gen(),
+ DistLockManager::kSingleLockAttemptTimeout);
+ if (!distLockHandleStatus.isOK()) {
+ warning() << "Balancer distributed lock could not be acquired and will be retried in "
+ << durationCount<Seconds>(kInitBackoffInterval) << " seconds"
+ << causedBy(distLockHandleStatus.getStatus());
+
+ _sleepFor(txn.get(), kInitBackoffInterval);
+ continue;
+ }
+
+ break;
+ }
+
+ log() << "CSRS balancer thread is recovering";
+
+ auto balancerConfig = Grid::get(txn.get())->getBalancerConfiguration();
+ _migrationManager.finishRecovery(txn.get(),
+ balancerConfig->getMaxChunkSizeBytes(),
+ balancerConfig->getSecondaryThrottle(),
+ balancerConfig->waitForDelete());
+
+ log() << "CSRS balancer thread is recovered";
+
+ // Main balancer loop
+ while (!_stopRequested()) {
+ auto balancerConfig = shardingContext->getBalancerConfiguration();
+
+ BalanceRoundDetails roundDetails;
+
+ _beginRound(txn.get());
+
+ try {
+ shardingContext->shardRegistry()->reload(txn.get());
+
+ uassert(13258, "oids broken after resetting!", _checkOIDs(txn.get()));
+
+ Status refreshStatus = balancerConfig->refreshAndCheck(txn.get());
+ if (!refreshStatus.isOK()) {
+ warning() << "Skipping balancing round" << causedBy(refreshStatus);
+ _endRound(txn.get(), kBalanceRoundDefaultInterval);
+ continue;
+ }
+
+ if (!balancerConfig->shouldBalance()) {
+ LOG(1) << "Skipping balancing round because balancing is disabled";
+ _endRound(txn.get(), kBalanceRoundDefaultInterval);
+ continue;
+ }
+
+ {
+ LOG(1) << "*** start balancing round. "
+ << "waitForDelete: " << balancerConfig->waitForDelete()
+ << ", secondaryThrottle: "
+ << balancerConfig->getSecondaryThrottle().toBSON();
+
+ OCCASIONALLY warnOnMultiVersion(
+ uassertStatusOK(_clusterStats->getStats(txn.get())));
+
+ Status status = _enforceTagRanges(txn.get());
+ if (!status.isOK()) {
+ warning() << "Failed to enforce tag ranges" << causedBy(status);
+ } else {
+ LOG(1) << "Done enforcing tag range boundaries.";
+ }
+
+ const auto candidateChunks = uassertStatusOK(
+ _chunkSelectionPolicy->selectChunksToMove(txn.get(), _balancedLastTime));
+
+ if (candidateChunks.empty()) {
+ LOG(1) << "no need to move any chunk";
+ _balancedLastTime = false;
+ } else {
+ _balancedLastTime = _moveChunks(txn.get(), candidateChunks);
+
+ roundDetails.setSucceeded(static_cast<int>(candidateChunks.size()),
+ _balancedLastTime);
+
+ shardingContext->catalogClient(txn.get())->logAction(
+ txn.get(), "balancer.round", "", roundDetails.toBSON());
+ }
+
+ LOG(1) << "*** End of balancing round";
+ }
+
+ _endRound(txn.get(),
+ _balancedLastTime ? kShortBalanceRoundInterval
+ : kBalanceRoundDefaultInterval);
+ } catch (const std::exception& e) {
+ log() << "caught exception while doing balance: " << e.what();
+
+ // Just to match the opening statement if in log level 1
+ LOG(1) << "*** End of balancing round";
+
+ // This round failed, tell the world!
+ roundDetails.setFailed(e.what());
+
+ shardingContext->catalogClient(txn.get())->logAction(
+ txn.get(), "balancer.round", "", roundDetails.toBSON());
+
+ // Sleep a fair amount before retrying because of the error
+ _endRound(txn.get(), kBalanceRoundDefaultInterval);
+ }
+ }
+
+ {
+ stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
+ invariant(_state == kStopping);
+ invariant(_migrationManagerInterruptThread.joinable());
+ }
+
+ _migrationManagerInterruptThread.join();
+ _migrationManager.drainActiveMigrations();
+
+ {
+ stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
+ _migrationManagerInterruptThread = {};
+ _threadOperationContext = nullptr;
+ }
+
+ log() << "CSRS balancer is now stopped";
+}
+
+bool Balancer::_stopRequested() {
+ stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
+ return (_state != kRunning);
+}
+
+void Balancer::_beginRound(OperationContext* txn) {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ _inBalancerRound = true;
+ _condVar.notify_all();
+}
+
+void Balancer::_endRound(OperationContext* txn, Seconds waitTimeout) {
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ _inBalancerRound = false;
+ _numBalancerRounds++;
+ _condVar.notify_all();
+ }
+
+ _sleepFor(txn, waitTimeout);
+}
+
+void Balancer::_sleepFor(OperationContext* txn, Seconds waitTimeout) {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ _condVar.wait_for(lock, waitTimeout.toSystemDuration(), [&] { return _state != kRunning; });
+}
+
+bool Balancer::_checkOIDs(OperationContext* txn) {
+ auto shardingContext = Grid::get(txn);
+
+ vector<ShardId> all;
+ shardingContext->shardRegistry()->getAllShardIds(&all);
+
+ // map of OID machine ID => shardId
+ map<int, ShardId> oids;
+
+ for (const ShardId& shardId : all) {
+ if (_stopRequested()) {
+ return false;
+ }
+
+ auto shardStatus = shardingContext->shardRegistry()->getShard(txn, shardId);
+ if (!shardStatus.isOK()) {
+ continue;
+ }
+ const auto s = shardStatus.getValue();
+
+ auto result = uassertStatusOK(
+ s->runCommandWithFixedRetryAttempts(txn,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ "admin",
+ BSON("features" << 1),
+ Shard::RetryPolicy::kIdempotent));
+ uassertStatusOK(result.commandStatus);
+ BSONObj f = std::move(result.response);
+
+ if (f["oidMachine"].isNumber()) {
+ int x = f["oidMachine"].numberInt();
+ if (oids.count(x) == 0) {
+ oids[x] = shardId;
+ } else {
+ log() << "error: 2 machines have " << x << " as oid machine piece: " << shardId
+ << " and " << oids[x];
+
+ result = uassertStatusOK(s->runCommandWithFixedRetryAttempts(
+ txn,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ "admin",
+ BSON("features" << 1 << "oidReset" << 1),
+ Shard::RetryPolicy::kIdempotent));
+ uassertStatusOK(result.commandStatus);
+
+ auto otherShardStatus = shardingContext->shardRegistry()->getShard(txn, oids[x]);
+ if (otherShardStatus.isOK()) {
+ result = uassertStatusOK(
+ otherShardStatus.getValue()->runCommandWithFixedRetryAttempts(
+ txn,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ "admin",
+ BSON("features" << 1 << "oidReset" << 1),
+ Shard::RetryPolicy::kIdempotent));
+ uassertStatusOK(result.commandStatus);
+ }
+
+ return false;
+ }
+ } else {
+ log() << "warning: oidMachine not set on: " << s->toString();
+ }
+ }
+
+ return true;
+}
+
+Status Balancer::_enforceTagRanges(OperationContext* txn) {
+ auto chunksToSplitStatus = _chunkSelectionPolicy->selectChunksToSplit(txn);
+ if (!chunksToSplitStatus.isOK()) {
+ return chunksToSplitStatus.getStatus();
+ }
+
+ for (const auto& splitInfo : chunksToSplitStatus.getValue()) {
+ auto scopedCMStatus = ScopedChunkManager::getExisting(txn, splitInfo.nss);
+ if (!scopedCMStatus.isOK()) {
+ return scopedCMStatus.getStatus();
+ }
+
+ auto scopedCM = std::move(scopedCMStatus.getValue());
+ ChunkManager* const cm = scopedCM.cm();
+
+ auto splitStatus = shardutil::splitChunkAtMultiplePoints(txn,
+ splitInfo.shardId,
+ splitInfo.nss,
+ cm->getShardKeyPattern(),
+ splitInfo.collectionVersion,
+ splitInfo.minKey,
+ splitInfo.maxKey,
+ splitInfo.chunkVersion,
+ splitInfo.splitKeys);
+ if (!splitStatus.isOK()) {
+ warning() << "Failed to enforce tag range for chunk " << redact(splitInfo.toString())
+ << causedBy(redact(splitStatus.getStatus()));
+ }
+
+ cm->reload(txn);
+ }
+
+ return Status::OK();
+}
+
+int Balancer::_moveChunks(OperationContext* txn,
+ const BalancerChunkSelectionPolicy::MigrateInfoVector& candidateChunks) {
+ auto balancerConfig = Grid::get(txn)->getBalancerConfiguration();
+
+ // If the balancer was disabled since we started this round, don't start new chunk moves
+ if (_stopRequested() || !balancerConfig->shouldBalance()) {
+ LOG(1) << "Skipping balancing round because balancer was stopped";
+ return 0;
+ }
+
+ auto migrationStatuses =
+ _migrationManager.executeMigrationsForAutoBalance(txn,
+ candidateChunks,
+ balancerConfig->getMaxChunkSizeBytes(),
+ balancerConfig->getSecondaryThrottle(),
+ balancerConfig->waitForDelete());
+
+ int numChunksProcessed = 0;
+
+ for (const auto& migrationStatusEntry : migrationStatuses) {
+ const Status& status = migrationStatusEntry.second;
+ if (status.isOK()) {
+ numChunksProcessed++;
+ continue;
+ }
+
+ const MigrationIdentifier& migrationId = migrationStatusEntry.first;
+
+ if (status == ErrorCodes::ChunkTooBig) {
+ numChunksProcessed++;
+
+ auto failedRequestIt = std::find_if(candidateChunks.begin(),
+ candidateChunks.end(),
+ [&migrationId](const MigrateInfo& migrateInfo) {
+ return migrateInfo.getName() == migrationId;
+ });
+ invariant(failedRequestIt != candidateChunks.end());
+
+ log() << "Performing a split because migration " << failedRequestIt->toString()
+ << " failed for size reasons" << causedBy(status);
+
+ _splitOrMarkJumbo(txn, NamespaceString(failedRequestIt->ns), failedRequestIt->minKey);
+ continue;
+ }
+
+ log() << "Balancer move " << migrationId << " failed" << causedBy(status);
+ }
+
+ return numChunksProcessed;
+}
+
+void Balancer::_splitOrMarkJumbo(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& minKey) {
+ auto scopedChunkManager = uassertStatusOK(ScopedChunkManager::getExisting(txn, nss));
+ ChunkManager* const chunkManager = scopedChunkManager.cm();
+
+ auto chunk = chunkManager->findIntersectingChunkWithSimpleCollation(txn, minKey);
+
+ auto splitStatus = chunk->split(txn, Chunk::normal, nullptr);
+ if (!splitStatus.isOK()) {
+ log() << "Marking chunk " << chunk->toString() << " as jumbo.";
+ chunk->markAsJumbo(txn);
+ }
+}
+
+} // namespace mongo
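Balancer::create() and Balancer::get() above use the ServiceContext decoration mechanism (ServiceContext::declareDecoration) to hang the single balancer instance off the service context instead of a process-wide global. The standalone C++ sketch below illustrates just that idea; the types are simplified stand-ins, not the MongoDB API.

    #include <cassert>
    #include <memory>

    struct ServiceContext;  // forward declaration

    // Stand-in for the decorated service. In the real code the balancer is
    // created exactly once per process and owned by the service context.
    struct Balancer {
        explicit Balancer(ServiceContext*) {}
    };

    // Stand-in for mongo::ServiceContext: the owner carries a slot (the
    // "decoration") holding the single Balancer instance.
    struct ServiceContext {
        std::unique_ptr<Balancer> balancerDecoration;
    };

    // Mirrors Balancer::create(): install exactly once at startup.
    void createBalancer(ServiceContext* sc) {
        assert(!sc->balancerDecoration);
        sc->balancerDecoration = std::make_unique<Balancer>(sc);
    }

    // Mirrors Balancer::get(): look the instance up through its owner.
    Balancer* getBalancer(ServiceContext* sc) {
        return sc->balancerDecoration.get();
    }

    int main() {
        ServiceContext sc;
        createBalancer(&sc);
        assert(getBalancer(&sc) != nullptr);
        return 0;
    }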
diff --git a/src/mongo/s/balancer/balancer.h b/src/mongo/s/balancer/balancer.h
new file mode 100644
index 00000000000..bc18818484d
--- /dev/null
+++ b/src/mongo/s/balancer/balancer.h
@@ -0,0 +1,250 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/s/balancer/balancer_chunk_selection_policy.h"
+#include "mongo/s/balancer/migration_manager.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/thread.h"
+
+namespace mongo {
+
+class ChunkType;
+class ClusterStatistics;
+class MigrationSecondaryThrottleOptions;
+class OperationContext;
+class ServiceContext;
+class Status;
+
+/**
+ * The balancer is a background task that tries to keep the number of chunks across all
+ * servers of the cluster even. Although every mongos will have one balancer running, only one
+ * of them will be active at any given point in time. The balancer uses a distributed lock
+ * for that coordination.
+ *
+ * The balancer acts continuously, but in "rounds". In a given round, it decides whether
+ * there is an imbalance by checking the difference in chunk counts between the most and least
+ * loaded shards, and if it finds one, issues a request for a chunk migration.
+ */
+class Balancer {
+ MONGO_DISALLOW_COPYING(Balancer);
+
+public:
+ Balancer(ServiceContext* serviceContext);
+ ~Balancer();
+
+ /**
+ * Instantiates an instance of the balancer and installs it on the specified service context.
+ * This method is not thread-safe and must be called only once when the service is starting.
+ */
+ static void create(ServiceContext* serviceContext);
+
+ /**
+ * Retrieves the per-service instance of the Balancer.
+ */
+ static Balancer* get(ServiceContext* serviceContext);
+ static Balancer* get(OperationContext* operationContext);
+
+ /**
+ * Invoked when the config server primary enters the 'PRIMARY' state and is invoked while the
+ * caller is holding the global X lock. Kicks off the main balancer thread and returns
+ * immediately.
+ *
+ * Must only be called if the balancer is in the stopped state (i.e., just constructed or
+ * onDrainComplete has been called before). Any code in this call must not try to acquire any
+ * locks or wait on operations that acquire locks.
+ */
+ void onTransitionToPrimary(OperationContext* txn);
+
+ /**
+ * Invoked when this node which is currently serving as a 'PRIMARY' steps down and is invoked
+ * while the global X lock is held. Requests the main balancer thread to stop and returns
+ * immediately without waiting for it to terminate.
+ *
+ * This method might be called multiple times in succession (which happens as a result of an
+ * incomplete transition to primary), so it is resilient to repeated invocation.
+ *
+ * The onDrainComplete method must be called afterwards in order to wait for the main balancer
+ * thread to terminate and to allow onTransitionToPrimary to be called again.
+ */
+ void onStepDownFromPrimary();
+
+ /**
+ * Invoked when a node on its way to becoming a primary finishes draining and is about to
+ * acquire the global X lock in order to allow writes. Waits for the balancer thread to
+ * terminate and primes the balancer so that onTransitionToPrimary can be called.
+ *
+ * This method is called without any locks held.
+ */
+ void onDrainComplete(OperationContext* txn);
+
+ /**
+ * Potentially blocking method, which will return immediately if the balancer is not running a
+ * balancer round and will block until the current round completes otherwise.
+ */
+ void joinCurrentRound(OperationContext* txn);
+
+ /**
+ * Blocking call, which requests the balancer to move a single chunk to a more appropriate
+ * shard, in accordance with the active balancer policy. It is not guaranteed that the chunk
+ * will actually move because it may already be at the best shard. An error will be returned if
+ * the attempt to find a better shard or the actual migration fails for any reason.
+ */
+ Status rebalanceSingleChunk(OperationContext* txn, const ChunkType& chunk);
+
+ /**
+ * Blocking call, which requests the balancer to move a single chunk to the specified location
+ * in accordance with the active balancer policy. An error will be returned if the attempt to
+ * move fails for any reason.
+ *
+ * NOTE: This call disregards the balancer enabled/disabled status and will proceed with the
+ * move regardless. It should be used only for user-initiated moves.
+ */
+ Status moveSingleChunk(OperationContext* txn,
+ const ChunkType& chunk,
+ const ShardId& newShardId,
+ uint64_t maxChunkSizeBytes,
+ const MigrationSecondaryThrottleOptions& secondaryThrottle,
+ bool waitForDelete);
+
+ /**
+ * Appends the runtime state of the balancer instance to the specified builder.
+ */
+ void report(OperationContext* txn, BSONObjBuilder* builder);
+
+private:
+ /**
+ * Possible runtime states of the balancer. The comments indicate the allowed next state.
+ */
+ enum State {
+ kStopped, // kRunning
+ kRunning, // kStopping
+ kStopping, // kStopped
+ };
+
+ /**
+ * The main balancer loop, which runs in a separate thread.
+ */
+ void _mainThread();
+
+ /**
+ * Checks whether the balancer main thread has been requested to stop.
+ */
+ bool _stopRequested();
+
+ /**
+ * Signals the beginning and end of a balancing round.
+ */
+ void _beginRound(OperationContext* txn);
+ void _endRound(OperationContext* txn, Seconds waitTimeout);
+
+ /**
+ * Blocks the caller for the specified timeout or until the balancer condition variable is
+ * signaled, whichever comes first.
+ */
+ void _sleepFor(OperationContext* txn, Seconds waitTimeout);
+
+ /**
+ * Returns true if all the servers listed in configdb as being shards are reachable and are
+ * distinct processes (no hostname mixup).
+ */
+ bool _checkOIDs(OperationContext* txn);
+
+ /**
+ * Iterates through all chunks in all collections and ensures that no chunks straddle a tag
+ * boundary. If any do, they will be split.
+ */
+ Status _enforceTagRanges(OperationContext* txn);
+
+ /**
+ * Schedules migrations for the specified set of chunks and returns how many chunks were
+ * successfully processed.
+ */
+ int _moveChunks(OperationContext* txn,
+ const BalancerChunkSelectionPolicy::MigrateInfoVector& candidateChunks);
+
+ /**
+ * Performs a split on the chunk with min value "minKey". If the split fails, it is marked as
+ * jumbo.
+ */
+ void _splitOrMarkJumbo(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& minKey);
+
+ // Protects the state below
+ stdx::mutex _mutex;
+
+ // Indicates the current state of the balancer
+ State _state{kStopped};
+
+ // The main balancer thread
+ stdx::thread _thread;
+
+ // The operation context of the main balancer thread. This value may only be available in the
+ // kRunning state and is used to force interrupt of any blocking calls made by the balancer
+ // thread.
+ OperationContext* _threadOperationContext{nullptr};
+
+ // This thread is only available in the kStopping state and is necessary for the migration
+ // manager shutdown to not deadlock with replica set step down. In particular, the migration
+ // manager's order of lock acquisition is mutex, then collection lock, whereas stepdown first
+ // acquires the global S lock and then acquires the migration manager's mutex.
+ //
+ // The interrupt thread is scheduled when the balancer enters the kStopping state (which
+ // happens at step down) and is joined when the replica set leaves draining mode, outside of
+ // the global X lock.
+ stdx::thread _migrationManagerInterruptThread;
+
+ // Indicates whether the balancer is currently executing a balancer round
+ bool _inBalancerRound{false};
+
+ // Counts the number of balancing rounds performed since the balancer thread was first activated
+ int64_t _numBalancerRounds{0};
+
+ // Condition variable, which is signalled every time the above runtime state of the balancer
+ // changes (in particular, state/balancer round and number of balancer rounds).
+ stdx::condition_variable _condVar;
+
+ // Number of moved chunks in last round
+ int _balancedLastTime;
+
+ // Source for cluster statistics
+ std::unique_ptr<ClusterStatistics> _clusterStats;
+
+ // Balancer policy. Depends on the cluster statistics instance above so it should be created
+ // after it and destroyed before it.
+ std::unique_ptr<BalancerChunkSelectionPolicy> _chunkSelectionPolicy;
+
+ // Migration manager used to schedule and manage migrations
+ MigrationManager _migrationManager;
+};
+
+} // namespace mongo
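The State enum and the onTransitionToPrimary / onStepDownFromPrimary / onDrainComplete trio above define a strict lifecycle: kStopped -> kRunning -> kStopping -> kStopped. A minimal standalone sketch of just those transitions and their invariants follows, omitting the main thread, the migration manager, and all locking.

    #include <cassert>

    // Simplified stand-in for the Balancer lifecycle: it models only the
    // allowed state transitions documented in the header above.
    class BalancerLifecycle {
    public:
        enum State { kStopped, kRunning, kStopping };

        void onTransitionToPrimary() {
            assert(_state == kStopped);   // just constructed, or drained before
            _state = kRunning;            // real code also spawns the main thread
        }

        void onStepDownFromPrimary() {
            if (_state != kRunning)       // resilient to repeated calls
                return;
            _state = kStopping;           // real code interrupts the main thread
        }

        void onDrainComplete() {
            if (_state == kStopped)
                return;
            assert(_state == kStopping);  // real code joins the main thread here
            _state = kStopped;            // onTransitionToPrimary is legal again
        }

    private:
        State _state{kStopped};
    };

    int main() {
        BalancerLifecycle balancer;
        balancer.onTransitionToPrimary();
        balancer.onStepDownFromPrimary();
        balancer.onStepDownFromPrimary();  // no-op: already stopping
        balancer.onDrainComplete();
        balancer.onTransitionToPrimary();  // allowed again after the drain
        return 0;
    }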
diff --git a/src/mongo/s/balancer/balancer_chunk_selection_policy.cpp b/src/mongo/s/balancer/balancer_chunk_selection_policy.cpp
new file mode 100644
index 00000000000..2ee21b6ae87
--- /dev/null
+++ b/src/mongo/s/balancer/balancer_chunk_selection_policy.cpp
@@ -0,0 +1,68 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/balancer/balancer_chunk_selection_policy.h"
+
+#include "mongo/util/mongoutils/str.h"
+
+namespace mongo {
+
+BalancerChunkSelectionPolicy::BalancerChunkSelectionPolicy() = default;
+
+BalancerChunkSelectionPolicy::~BalancerChunkSelectionPolicy() = default;
+
+BalancerChunkSelectionPolicy::SplitInfo::SplitInfo(ShardId inShardId,
+ NamespaceString inNss,
+ ChunkVersion inCollectionVersion,
+ ChunkVersion inChunkVersion,
+ const BSONObj& inMinKey,
+ const BSONObj& inMaxKey,
+ std::vector<BSONObj> inSplitKeys)
+ : shardId(std::move(inShardId)),
+ nss(std::move(inNss)),
+ collectionVersion(inCollectionVersion),
+ chunkVersion(inChunkVersion),
+ minKey(inMinKey),
+ maxKey(inMaxKey),
+ splitKeys(std::move(inSplitKeys)) {}
+
+std::string BalancerChunkSelectionPolicy::SplitInfo::toString() const {
+ StringBuilder splitKeysBuilder;
+ for (const auto& splitKey : splitKeys) {
+ splitKeysBuilder << splitKey.toString() << ", ";
+ }
+
+ return str::stream() << "Splitting chunk in " << nss.ns() << " [ " << minKey << ", " << maxKey
+ << "), residing on " << shardId << " at [ " << splitKeysBuilder.str()
+ << " ] with version " << chunkVersion.toString()
+ << " and collection version " << collectionVersion.toString();
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/balancer/balancer_chunk_selection_policy.h b/src/mongo/s/balancer/balancer_chunk_selection_policy.h
new file mode 100644
index 00000000000..ac227183ddf
--- /dev/null
+++ b/src/mongo/s/balancer/balancer_chunk_selection_policy.h
@@ -0,0 +1,122 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+#include <vector>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/s/balancer/balancer_policy.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/chunk_version.h"
+
+namespace mongo {
+
+class ChunkType;
+class NamespaceString;
+class OperationContext;
+template <typename T>
+class StatusWith;
+
+/**
+ * Interface used by the balancer for selecting chunks which need to be moved around in order for
+ * the sharded cluster to be balanced. It is up to the implementation to decide what exactly
+ * 'balanced' means.
+ */
+class BalancerChunkSelectionPolicy {
+ MONGO_DISALLOW_COPYING(BalancerChunkSelectionPolicy);
+
+public:
+ /**
+ * Describes a chunk which needs to be split, because it violates the balancer policy.
+ */
+ struct SplitInfo {
+ SplitInfo(ShardId shardId,
+ NamespaceString nss,
+ ChunkVersion collectionVersion,
+ ChunkVersion chunkVersion,
+ const BSONObj& minKey,
+ const BSONObj& maxKey,
+ std::vector<BSONObj> splitKeys);
+
+ std::string toString() const;
+
+ ShardId shardId;
+ NamespaceString nss;
+ ChunkVersion collectionVersion;
+ ChunkVersion chunkVersion;
+ BSONObj minKey;
+ BSONObj maxKey;
+ std::vector<BSONObj> splitKeys;
+ };
+
+ typedef std::vector<SplitInfo> SplitInfoVector;
+
+ typedef std::vector<MigrateInfo> MigrateInfoVector;
+
+ virtual ~BalancerChunkSelectionPolicy();
+
+ /**
+ * Potentially blocking method, which gives out a set of chunks that need to be split because
+ * they violate the policy for some reason. The reason is decided by the policy and may include
+ * the chunk being too big or straddling a tag range.
+ */
+ virtual StatusWith<SplitInfoVector> selectChunksToSplit(OperationContext* txn) = 0;
+
+ /**
+ * Potentially blocking method, which gives out a set of chunks to be moved. The
+ * aggressiveBalanceHint indicates to the balancing logic that it should lower the threshold for
+ * difference in number of chunks across shards and thus potentially cause more chunks to move.
+ */
+ virtual StatusWith<MigrateInfoVector> selectChunksToMove(OperationContext* txn,
+ bool aggressiveBalanceHint) = 0;
+
+ /**
+ * Requests a single chunk to be relocated to a different shard, if possible. If some error
+ * occurs while trying to determine the best location for the chunk, a failed status is
+ * returned. If the chunk is already at the best shard that it can be, returns boost::none.
+ * Otherwise returns migration information for where the chunk should be moved.
+ */
+ virtual StatusWith<boost::optional<MigrateInfo>> selectSpecificChunkToMove(
+ OperationContext* txn, const ChunkType& chunk) = 0;
+
+ /**
+ * Asks the chunk selection policy to validate that the specified chunk migration is allowed
+ * given the current rules. Returns OK if the migration won't violate any rules or any other
+ * failed status otherwise.
+ */
+ virtual Status checkMoveAllowed(OperationContext* txn,
+ const ChunkType& chunk,
+ const ShardId& newShardId) = 0;
+
+protected:
+ BalancerChunkSelectionPolicy();
+};
+
+} // namespace mongo
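Since BalancerChunkSelectionPolicy is a pure interface, alternative implementations can be swapped in, for instance as test doubles. Below is a hypothetical minimal stub, not part of this commit, which never proposes any splits or migrations; it assumes only the types pulled in by this header.

    #include "mongo/s/balancer/balancer_chunk_selection_policy.h"

    namespace mongo {

    // Hypothetical test double: a policy under which the balancer never
    // splits or moves anything.
    class NoOpChunkSelectionPolicy final : public BalancerChunkSelectionPolicy {
    public:
        StatusWith<SplitInfoVector> selectChunksToSplit(OperationContext* txn) override {
            return SplitInfoVector{};  // nothing violates the (empty) policy
        }

        StatusWith<MigrateInfoVector> selectChunksToMove(OperationContext* txn,
                                                         bool aggressiveBalanceHint) override {
            return MigrateInfoVector{};  // the cluster is always "balanced"
        }

        StatusWith<boost::optional<MigrateInfo>> selectSpecificChunkToMove(
            OperationContext* txn, const ChunkType& chunk) override {
            return boost::optional<MigrateInfo>{};  // chunk is already placed best
        }

        Status checkMoveAllowed(OperationContext* txn,
                                const ChunkType& chunk,
                                const ShardId& newShardId) override {
            return Status::OK();  // any manual move is permitted
        }
    };

    }  // namespace mongo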
diff --git a/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.cpp b/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.cpp
new file mode 100644
index 00000000000..c8b4b8d115d
--- /dev/null
+++ b/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.cpp
@@ -0,0 +1,418 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/balancer/balancer_chunk_selection_policy_impl.h"
+
+#include <set>
+#include <vector>
+
+#include "mongo/base/status_with.h"
+#include "mongo/bson/bsonobj_comparator_interface.h"
+#include "mongo/s/catalog/catalog_cache.h"
+#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/catalog/type_collection.h"
+#include "mongo/s/catalog/type_tags.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/sharding_raii.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/util/log.h"
+#include "mongo/util/mongoutils/str.h"
+
+namespace mongo {
+
+using ChunkMinimumsSet = BSONObjSet;
+using MigrateInfoVector = BalancerChunkSelectionPolicy::MigrateInfoVector;
+using SplitInfoVector = BalancerChunkSelectionPolicy::SplitInfoVector;
+using std::shared_ptr;
+using std::unique_ptr;
+using std::vector;
+
+namespace {
+
+/**
+ * Does a linear pass over the information cached in the specified chunk manager and extracts chunk
+ * distribution and chunk placement information, which is needed by the balancer policy.
+ */
+StatusWith<std::pair<DistributionStatus, ChunkMinimumsSet>> createCollectionDistributionInfo(
+ OperationContext* txn, const ShardStatisticsVector& allShards, ChunkManager* chunkMgr) {
+ ShardToChunksMap shardToChunksMap;
+ ChunkMinimumsSet chunkMinimums = SimpleBSONObjComparator::kInstance.makeBSONObjSet();
+
+ // Makes sure there is an entry in shardToChunksMap for every shard, so empty shards will also
+ // be accounted for
+ for (const auto& stat : allShards) {
+ shardToChunksMap[stat.shardId];
+ }
+
+ for (const auto& entry : chunkMgr->getChunkMap()) {
+ const auto& chunkEntry = entry.second;
+
+ ChunkType chunk;
+ chunk.setMin(chunkEntry->getMin());
+ chunk.setMax(chunkEntry->getMax());
+ chunk.setJumbo(chunkEntry->isJumbo());
+ chunk.setShard(chunkEntry->getShardId());
+
+ shardToChunksMap[chunkEntry->getShardId()].push_back(chunk);
+ chunkMinimums.insert(chunkEntry->getMin());
+ }
+
+ vector<TagsType> collectionTags;
+ Status tagsStatus = Grid::get(txn)->catalogClient(txn)->getTagsForCollection(
+ txn, chunkMgr->getns(), &collectionTags);
+ if (!tagsStatus.isOK()) {
+ return {tagsStatus.code(),
+ str::stream() << "Unable to load tags for collection " << chunkMgr->getns()
+ << " due to "
+ << tagsStatus.toString()};
+ }
+
+ DistributionStatus distribution(NamespaceString(chunkMgr->getns()),
+ std::move(shardToChunksMap));
+
+ // Cache the collection tags
+ const auto& keyPattern = chunkMgr->getShardKeyPattern().getKeyPattern();
+
+ for (const auto& tag : collectionTags) {
+ auto status = distribution.addRangeToZone(
+ ZoneRange(keyPattern.extendRangeBound(tag.getMinKey(), false),
+ keyPattern.extendRangeBound(tag.getMaxKey(), false),
+ tag.getTag()));
+
+ if (!status.isOK()) {
+ return status;
+ }
+ }
+
+ return std::make_pair(std::move(distribution), std::move(chunkMinimums));
+}
+
+} // namespace
+
+BalancerChunkSelectionPolicyImpl::BalancerChunkSelectionPolicyImpl(ClusterStatistics* clusterStats)
+ : _clusterStats(clusterStats) {}
+
+BalancerChunkSelectionPolicyImpl::~BalancerChunkSelectionPolicyImpl() = default;
+
+StatusWith<SplitInfoVector> BalancerChunkSelectionPolicyImpl::selectChunksToSplit(
+ OperationContext* txn) {
+ auto shardStatsStatus = _clusterStats->getStats(txn);
+ if (!shardStatsStatus.isOK()) {
+ return shardStatsStatus.getStatus();
+ }
+
+ const auto shardStats = std::move(shardStatsStatus.getValue());
+
+ vector<CollectionType> collections;
+
+ Status collsStatus =
+ Grid::get(txn)->catalogClient(txn)->getCollections(txn, nullptr, &collections, nullptr);
+ if (!collsStatus.isOK()) {
+ return collsStatus;
+ }
+
+ if (collections.empty()) {
+ return SplitInfoVector{};
+ }
+
+ SplitInfoVector splitCandidates;
+
+ for (const auto& coll : collections) {
+ const NamespaceString nss(coll.getNs());
+
+ auto candidatesStatus = _getSplitCandidatesForCollection(txn, nss, shardStats);
+ if (candidatesStatus == ErrorCodes::NamespaceNotFound) {
+ // Namespace got dropped before we managed to get to it, so just skip it
+ continue;
+ } else if (!candidatesStatus.isOK()) {
+ warning() << "Unable to enforce tag range policy for collection " << nss.ns()
+ << causedBy(candidatesStatus.getStatus());
+ continue;
+ }
+
+ splitCandidates.insert(splitCandidates.end(),
+ std::make_move_iterator(candidatesStatus.getValue().begin()),
+ std::make_move_iterator(candidatesStatus.getValue().end()));
+ }
+
+ return splitCandidates;
+}
+
+StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::selectChunksToMove(
+ OperationContext* txn, bool aggressiveBalanceHint) {
+ auto shardStatsStatus = _clusterStats->getStats(txn);
+ if (!shardStatsStatus.isOK()) {
+ return shardStatsStatus.getStatus();
+ }
+
+ const auto shardStats = std::move(shardStatsStatus.getValue());
+
+ if (shardStats.size() < 2) {
+ return MigrateInfoVector{};
+ }
+
+ vector<CollectionType> collections;
+
+ Status collsStatus =
+ Grid::get(txn)->catalogClient(txn)->getCollections(txn, nullptr, &collections, nullptr);
+ if (!collsStatus.isOK()) {
+ return collsStatus;
+ }
+
+ if (collections.empty()) {
+ return MigrateInfoVector{};
+ }
+
+ MigrateInfoVector candidateChunks;
+
+ for (const auto& coll : collections) {
+ const NamespaceString nss(coll.getNs());
+
+ if (!coll.getAllowBalance()) {
+ LOG(1) << "Not balancing collection " << nss << "; explicitly disabled.";
+ continue;
+ }
+
+ auto candidatesStatus =
+ _getMigrateCandidatesForCollection(txn, nss, shardStats, aggressiveBalanceHint);
+ if (candidatesStatus == ErrorCodes::NamespaceNotFound) {
+ // Namespace got dropped before we managed to get to it, so just skip it
+ continue;
+ } else if (!candidatesStatus.isOK()) {
+ warning() << "Unable to balance collection " << nss.ns()
+ << causedBy(candidatesStatus.getStatus());
+ continue;
+ }
+
+ candidateChunks.insert(candidateChunks.end(),
+ std::make_move_iterator(candidatesStatus.getValue().begin()),
+ std::make_move_iterator(candidatesStatus.getValue().end()));
+ }
+
+ return candidateChunks;
+}
+
+StatusWith<boost::optional<MigrateInfo>>
+BalancerChunkSelectionPolicyImpl::selectSpecificChunkToMove(OperationContext* txn,
+ const ChunkType& chunk) {
+ auto shardStatsStatus = _clusterStats->getStats(txn);
+ if (!shardStatsStatus.isOK()) {
+ return shardStatsStatus.getStatus();
+ }
+
+ const auto shardStats = std::move(shardStatsStatus.getValue());
+
+ const NamespaceString nss(chunk.getNS());
+
+ auto scopedCMStatus = ScopedChunkManager::getExisting(txn, nss);
+ if (!scopedCMStatus.isOK()) {
+ return scopedCMStatus.getStatus();
+ }
+
+ auto scopedCM = std::move(scopedCMStatus.getValue());
+ ChunkManager* const cm = scopedCM.cm();
+
+ auto collInfoStatus = createCollectionDistributionInfo(txn, shardStats, cm);
+ if (!collInfoStatus.isOK()) {
+ return collInfoStatus.getStatus();
+ }
+
+ auto collInfo = std::move(collInfoStatus.getValue());
+
+ return BalancerPolicy::balanceSingleChunk(chunk, shardStats, std::get<0>(collInfo));
+}
+
+Status BalancerChunkSelectionPolicyImpl::checkMoveAllowed(OperationContext* txn,
+ const ChunkType& chunk,
+ const ShardId& newShardId) {
+ auto shardStatsStatus = _clusterStats->getStats(txn);
+ if (!shardStatsStatus.isOK()) {
+ return shardStatsStatus.getStatus();
+ }
+
+ auto shardStats = std::move(shardStatsStatus.getValue());
+
+ const NamespaceString nss(chunk.getNS());
+
+ auto scopedCMStatus = ScopedChunkManager::getExisting(txn, nss);
+ if (!scopedCMStatus.isOK()) {
+ return scopedCMStatus.getStatus();
+ }
+
+ auto scopedCM = std::move(scopedCMStatus.getValue());
+ ChunkManager* const cm = scopedCM.cm();
+
+ auto collInfoStatus = createCollectionDistributionInfo(txn, shardStats, cm);
+ if (!collInfoStatus.isOK()) {
+ return collInfoStatus.getStatus();
+ }
+
+ auto collInfo = std::move(collInfoStatus.getValue());
+
+ DistributionStatus distribution = std::move(std::get<0>(collInfo));
+
+ auto newShardIterator =
+ std::find_if(shardStats.begin(),
+ shardStats.end(),
+ [&newShardId](const ClusterStatistics::ShardStatistics& stat) {
+ return stat.shardId == newShardId;
+ });
+ if (newShardIterator == shardStats.end()) {
+ return {ErrorCodes::ShardNotFound,
+ str::stream() << "Unable to find constraints information for shard " << newShardId
+ << ". Move to this shard will be disallowed."};
+ }
+
+ return BalancerPolicy::isShardSuitableReceiver(*newShardIterator,
+ distribution.getTagForChunk(chunk));
+}
+
+StatusWith<SplitInfoVector> BalancerChunkSelectionPolicyImpl::_getSplitCandidatesForCollection(
+ OperationContext* txn, const NamespaceString& nss, const ShardStatisticsVector& shardStats) {
+ auto scopedCMStatus = ScopedChunkManager::getExisting(txn, nss);
+ if (!scopedCMStatus.isOK()) {
+ return scopedCMStatus.getStatus();
+ }
+
+ auto scopedCM = std::move(scopedCMStatus.getValue());
+ ChunkManager* const cm = scopedCM.cm();
+
+ auto collInfoStatus = createCollectionDistributionInfo(txn, shardStats, cm);
+ if (!collInfoStatus.isOK()) {
+ return collInfoStatus.getStatus();
+ }
+
+ auto collInfo = std::move(collInfoStatus.getValue());
+
+ DistributionStatus distribution = std::move(std::get<0>(collInfo));
+ ChunkMinimumsSet allChunkMinimums = std::move(std::get<1>(collInfo));
+
+ SplitInfoVector splitCandidates;
+
+ // Accumulate split points for the same chunk together
+ shared_ptr<Chunk> currentChunk;
+ vector<BSONObj> currentSplitVector;
+
+ for (const auto& tagRangeEntry : distribution.tagRanges()) {
+ const auto& tagRange = tagRangeEntry.second;
+
+ if (allChunkMinimums.count(tagRange.min)) {
+ continue;
+ }
+
+ shared_ptr<Chunk> chunk = cm->findIntersectingChunkWithSimpleCollation(txn, tagRange.min);
+
+ if (!currentChunk) {
+ currentChunk = chunk;
+ }
+
+ invariant(currentChunk);
+
+ if (chunk == currentChunk) {
+ currentSplitVector.push_back(tagRange.min);
+ } else {
+ splitCandidates.emplace_back(currentChunk->getShardId(),
+ nss,
+ cm->getVersion(),
+ currentChunk->getLastmod(),
+ currentChunk->getMin(),
+ currentChunk->getMax(),
+ std::move(currentSplitVector));
+
+ currentChunk = chunk;
+ currentSplitVector.push_back(tagRange.min);
+ }
+ }
+
+ // Drain the current split vector if there are any entries left
+ if (currentChunk) {
+ invariant(!currentSplitVector.empty());
+
+ splitCandidates.emplace_back(currentChunk->getShardId(),
+ nss,
+ cm->getVersion(),
+ currentChunk->getLastmod(),
+ currentChunk->getMin(),
+ currentChunk->getMax(),
+ std::move(currentSplitVector));
+ }
+
+ return splitCandidates;
+}
+
+StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::_getMigrateCandidatesForCollection(
+ OperationContext* txn,
+ const NamespaceString& nss,
+ const ShardStatisticsVector& shardStats,
+ bool aggressiveBalanceHint) {
+ auto scopedCMStatus = ScopedChunkManager::getExisting(txn, nss);
+ if (!scopedCMStatus.isOK()) {
+ return scopedCMStatus.getStatus();
+ }
+
+ auto scopedCM = std::move(scopedCMStatus.getValue());
+ ChunkManager* const cm = scopedCM.cm();
+
+ auto collInfoStatus = createCollectionDistributionInfo(txn, shardStats, cm);
+ if (!collInfoStatus.isOK()) {
+ return collInfoStatus.getStatus();
+ }
+
+ auto collInfo = std::move(collInfoStatus.getValue());
+
+ DistributionStatus distribution = std::move(std::get<0>(collInfo));
+ ChunkMinimumsSet allChunkMinimums = std::move(std::get<1>(collInfo));
+
+ for (const auto& tagRangeEntry : distribution.tagRanges()) {
+ const auto& tagRange = tagRangeEntry.second;
+
+ if (!allChunkMinimums.count(tagRange.min)) {
+ // This tag falls somewhere in the middle of a chunk. Therefore we must skip balancing
+ // this collection until it is split at the next iteration.
+ //
+ // TODO: We should be able to just skip chunks, which straddle tags and still make some
+ // progress balancing.
+ return {ErrorCodes::IllegalOperation,
+ str::stream()
+ << "Tag boundaries "
+ << tagRange.toString()
+ << " fall in the middle of an existing chunk. Balancing for collection "
+ << nss.ns()
+ << " will be postponed until the chunk is split appropriately."};
+ }
+ }
+
+ return BalancerPolicy::balance(shardStats, distribution, aggressiveBalanceHint);
+}
+
+} // namespace mongo
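The loop in _getSplitCandidatesForCollection above batches tag-range lower bounds that land in the same chunk into a single split request and flushes the batch whenever the intersecting chunk changes. A standalone sketch of that accumulation, with integers standing in for shard-key values and a fixed chunk width of 10:

    #include <iostream>
    #include <vector>

    // SplitInfo stand-in: the chunk to split and the accumulated split keys.
    struct Split {
        int chunkId;
        std::vector<int> keys;
    };

    // Toy chunk layout: keys [0,10) live in chunk 0, [10,20) in chunk 1, ...
    int chunkFor(int key) {
        return key / 10;
    }

    // Batches consecutive tag-range minimums that fall into the same chunk,
    // flushing a Split whenever the intersecting chunk changes -- the same
    // shape as the loop in _getSplitCandidatesForCollection above.
    std::vector<Split> accumulateSplits(const std::vector<int>& tagMins) {
        std::vector<Split> out;
        int currentChunk = -1;
        std::vector<int> currentKeys;

        for (int key : tagMins) {
            const int chunk = chunkFor(key);
            if (currentChunk == -1) {
                currentChunk = chunk;
            }

            if (chunk == currentChunk) {
                currentKeys.push_back(key);
            } else {
                out.push_back({currentChunk, std::move(currentKeys)});
                currentKeys = {key};  // start the batch for the new chunk
                currentChunk = chunk;
            }
        }

        // Drain the final batch, mirroring the "if (currentChunk)" epilogue.
        if (!currentKeys.empty()) {
            out.push_back({currentChunk, std::move(currentKeys)});
        }

        return out;
    }

    int main() {
        // Tag minimums 3 and 7 share chunk 0; 14 is alone in chunk 1; 25 and
        // 27 share chunk 2. Expect three Split records.
        for (const auto& split : accumulateSplits({3, 7, 14, 25, 27})) {
            std::cout << "chunk " << split.chunkId << ": " << split.keys.size()
                      << " split key(s)\n";
        }
        return 0;
    }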
diff --git a/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.h b/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.h
new file mode 100644
index 00000000000..ffb769121b7
--- /dev/null
+++ b/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.h
@@ -0,0 +1,77 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/s/balancer/balancer_chunk_selection_policy.h"
+
+namespace mongo {
+
+class ClusterStatistics;
+
+class BalancerChunkSelectionPolicyImpl final : public BalancerChunkSelectionPolicy {
+public:
+ BalancerChunkSelectionPolicyImpl(ClusterStatistics* clusterStats);
+ ~BalancerChunkSelectionPolicyImpl();
+
+ StatusWith<SplitInfoVector> selectChunksToSplit(OperationContext* txn) override;
+
+ StatusWith<MigrateInfoVector> selectChunksToMove(OperationContext* txn,
+ bool aggressiveBalanceHint) override;
+
+ StatusWith<boost::optional<MigrateInfo>> selectSpecificChunkToMove(
+ OperationContext* txn, const ChunkType& chunk) override;
+
+ Status checkMoveAllowed(OperationContext* txn,
+ const ChunkType& chunk,
+ const ShardId& newShardId) override;
+
+private:
+ /**
+ * Synchronous method, which iterates the collection's chunks and uses the tags information to
+     * figure out whether some of them violate the tag range boundaries and need to be split.
+ */
+ StatusWith<SplitInfoVector> _getSplitCandidatesForCollection(
+ OperationContext* txn, const NamespaceString& nss, const ShardStatisticsVector& shardStats);
+
+ /**
+ * Synchronous method, which iterates the collection's chunks and uses the cluster statistics to
+ * figure out where to place them.
+ */
+ StatusWith<MigrateInfoVector> _getMigrateCandidatesForCollection(
+ OperationContext* txn,
+ const NamespaceString& nss,
+ const ShardStatisticsVector& shardStats,
+ bool aggressiveBalanceHint);
+
+ // Source for obtaining cluster statistics. Not owned and must not be destroyed before the
+ // policy object is destroyed.
+ ClusterStatistics* const _clusterStats;
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/balancer_configuration.cpp b/src/mongo/s/balancer/balancer_configuration.cpp
index 79098a2ec4a..0c86d4d35b9 100644
--- a/src/mongo/s/balancer_configuration.cpp
+++ b/src/mongo/s/balancer/balancer_configuration.cpp
@@ -30,7 +30,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/s/balancer_configuration.h"
+#include "mongo/s/balancer/balancer_configuration.h"
#include <algorithm>
diff --git a/src/mongo/s/balancer_configuration.h b/src/mongo/s/balancer/balancer_configuration.h
index 2f5370d162c..2f5370d162c 100644
--- a/src/mongo/s/balancer_configuration.h
+++ b/src/mongo/s/balancer/balancer_configuration.h
diff --git a/src/mongo/s/balancer_configuration_test.cpp b/src/mongo/s/balancer/balancer_configuration_test.cpp
index e889e4a5ded..7e81885b662 100644
--- a/src/mongo/s/balancer_configuration_test.cpp
+++ b/src/mongo/s/balancer/balancer_configuration_test.cpp
@@ -38,7 +38,7 @@
#include "mongo/executor/remote_command_request.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/rpc/metadata/server_selection_metadata.h"
-#include "mongo/s/balancer_configuration.h"
+#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/sharding_test_fixture.h"
#include "mongo/unittest/unittest.h"
diff --git a/src/mongo/s/balancer/balancer_policy.cpp b/src/mongo/s/balancer/balancer_policy.cpp
new file mode 100644
index 00000000000..67c75dd590b
--- /dev/null
+++ b/src/mongo/s/balancer/balancer_policy.cpp
@@ -0,0 +1,518 @@
+/**
+* Copyright (C) 2010 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*
+* As a special exception, the copyright holders give permission to link the
+* code of portions of this program with the OpenSSL library under certain
+* conditions as described in each individual source file and distribute
+* linked combinations including the program with the OpenSSL library. You
+* must comply with the GNU Affero General Public License in all respects
+* for all of the code used other than as permitted herein. If you modify
+* file(s) with this exception, you may extend this exception to your
+* version of the file(s), but you are not obligated to do so. If you do not
+* wish to do so, delete this exception statement from your version. If you
+* delete this exception statement from all source files in the program,
+* then also delete it in the license file.
+*/
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/balancer/balancer_policy.h"
+
+#include "mongo/bson/simple_bsonobj_comparator.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/catalog/type_tags.h"
+#include "mongo/util/log.h"
+#include "mongo/util/stringutils.h"
+
+namespace mongo {
+
+using std::map;
+using std::numeric_limits;
+using std::set;
+using std::string;
+using std::vector;
+
+namespace {
+
+// These values indicate the minimum deviation of a shard's number of chunks from the optimal
+// average across all shards in a zone before a rebalancing migration is initiated.
+const size_t kDefaultImbalanceThreshold = 2;
+const size_t kAggressiveImbalanceThreshold = 1;
+
+} // namespace
+
+DistributionStatus::DistributionStatus(NamespaceString nss, ShardToChunksMap shardToChunksMap)
+ : _nss(std::move(nss)),
+ _shardChunks(std::move(shardToChunksMap)),
+ _zoneRanges(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<ZoneRange>()) {}
+
+size_t DistributionStatus::totalChunks() const {
+ size_t total = 0;
+
+ for (const auto& shardChunk : _shardChunks) {
+ total += shardChunk.second.size();
+ }
+
+ return total;
+}
+
+size_t DistributionStatus::totalChunksWithTag(const std::string& tag) const {
+ size_t total = 0;
+
+ for (const auto& shardChunk : _shardChunks) {
+ total += numberOfChunksInShardWithTag(shardChunk.first, tag);
+ }
+
+ return total;
+}
+
+size_t DistributionStatus::numberOfChunksInShard(const ShardId& shardId) const {
+ const auto& shardChunks = getChunks(shardId);
+ return shardChunks.size();
+}
+
+size_t DistributionStatus::numberOfChunksInShardWithTag(const ShardId& shardId,
+ const string& tag) const {
+ const auto& shardChunks = getChunks(shardId);
+
+ size_t total = 0;
+
+ for (const auto& chunk : shardChunks) {
+ if (tag == getTagForChunk(chunk)) {
+ total++;
+ }
+ }
+
+ return total;
+}
+
+const vector<ChunkType>& DistributionStatus::getChunks(const ShardId& shardId) const {
+ ShardToChunksMap::const_iterator i = _shardChunks.find(shardId);
+ invariant(i != _shardChunks.end());
+
+ return i->second;
+}
+
+Status DistributionStatus::addRangeToZone(const ZoneRange& range) {
+ const auto minIntersect = _zoneRanges.upper_bound(range.min);
+ const auto maxIntersect = _zoneRanges.upper_bound(range.max);
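+
+    // Example (illustrative): with existing ranges [1, 10) and [20, 30) stored under their max
+    // keys, a new range [5, 15) yields minIntersect -> 10 and maxIntersect -> 30 (different
+    // iterators, partial overlap), whereas [10, 20) yields the same iterator for both and fits
+    // in the gap.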
+
+ // Check for partial overlap
+ if (minIntersect != maxIntersect) {
+ invariant(minIntersect != _zoneRanges.end());
+ const auto& intersectingRange =
+ (SimpleBSONObjComparator::kInstance.evaluate(minIntersect->second.min < range.max))
+ ? minIntersect->second
+ : maxIntersect->second;
+
+ if (SimpleBSONObjComparator::kInstance.evaluate(intersectingRange.min == range.min) &&
+ SimpleBSONObjComparator::kInstance.evaluate(intersectingRange.max == range.max) &&
+ intersectingRange.zone == range.zone) {
+ return Status::OK();
+ }
+
+ return {ErrorCodes::RangeOverlapConflict,
+ str::stream() << "Zone range: " << range.toString()
+ << " is overlapping with existing: "
+ << intersectingRange.toString()};
+ }
+
+ // Check for containment
+ if (minIntersect != _zoneRanges.end()) {
+ const ZoneRange& nextRange = minIntersect->second;
+ if (SimpleBSONObjComparator::kInstance.evaluate(range.max > nextRange.min)) {
+ invariant(SimpleBSONObjComparator::kInstance.evaluate(range.max < nextRange.max));
+ return {ErrorCodes::RangeOverlapConflict,
+ str::stream() << "Zone range: " << range.toString()
+ << " is overlapping with existing: "
+ << nextRange.toString()};
+ }
+ }
+
+ _zoneRanges[range.max.getOwned()] = range;
+ _allTags.insert(range.zone);
+ return Status::OK();
+}
+
+string DistributionStatus::getTagForChunk(const ChunkType& chunk) const {
+ const auto minIntersect = _zoneRanges.upper_bound(chunk.getMin());
+ const auto maxIntersect = _zoneRanges.lower_bound(chunk.getMax());
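+
+    // Example (illustrative): with zone ranges [1, 10) -> "a" and [10, 20) -> "b", a chunk
+    // [10, 20) maps both iterators to the "b" entry (lower_bound on the chunk's max matches the
+    // entry's key exactly), so it is tagged "b"; a chunk [5, 15) straddles both entries and
+    // receives no tag.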
+
+ // We should never have a partial overlap with a chunk range. If it happens, treat it as if this
+ // chunk doesn't belong to a tag
+ if (minIntersect != maxIntersect) {
+ return "";
+ }
+
+ if (minIntersect == _zoneRanges.end()) {
+ return "";
+ }
+
+ const ZoneRange& intersectRange = minIntersect->second;
+
+ // Check for containment
+ if (SimpleBSONObjComparator::kInstance.evaluate(intersectRange.min <= chunk.getMin()) &&
+ SimpleBSONObjComparator::kInstance.evaluate(chunk.getMax() <= intersectRange.max)) {
+ return intersectRange.zone;
+ }
+
+ return "";
+}
+
+void DistributionStatus::report(BSONObjBuilder* builder) const {
+ builder->append("ns", _nss.ns());
+
+ // Report all shards
+ BSONArrayBuilder shardArr(builder->subarrayStart("shards"));
+ for (const auto& shardChunk : _shardChunks) {
+ BSONObjBuilder shardEntry(shardArr.subobjStart());
+ shardEntry.append("name", shardChunk.first.toString());
+
+ BSONArrayBuilder chunkArr(shardEntry.subarrayStart("chunks"));
+ for (const auto& chunk : shardChunk.second) {
+ chunkArr.append(chunk.toBSON());
+ }
+ chunkArr.doneFast();
+
+ shardEntry.doneFast();
+ }
+ shardArr.doneFast();
+
+ // Report all tags
+ BSONArrayBuilder tagsArr(builder->subarrayStart("tags"));
+ tagsArr.append(_allTags);
+ tagsArr.doneFast();
+
+ // Report all tag ranges
+ BSONArrayBuilder tagRangesArr(builder->subarrayStart("tagRanges"));
+ for (const auto& tagRange : _zoneRanges) {
+ BSONObjBuilder tagRangeEntry(tagRangesArr.subobjStart());
+ tagRangeEntry.append("tag", tagRange.second.zone);
+ tagRangeEntry.append("mapKey", tagRange.first);
+ tagRangeEntry.append("min", tagRange.second.min);
+ tagRangeEntry.append("max", tagRange.second.max);
+ tagRangeEntry.doneFast();
+ }
+ tagRangesArr.doneFast();
+}
+
+string DistributionStatus::toString() const {
+ BSONObjBuilder builder;
+ report(&builder);
+
+ return builder.obj().toString();
+}
+
+Status BalancerPolicy::isShardSuitableReceiver(const ClusterStatistics::ShardStatistics& stat,
+ const string& chunkTag) {
+ if (stat.isSizeMaxed()) {
+ return {ErrorCodes::IllegalOperation,
+ str::stream() << stat.shardId
+ << " has already reached the maximum total chunk size."};
+ }
+
+ if (stat.isDraining) {
+ return {ErrorCodes::IllegalOperation,
+ str::stream() << stat.shardId << " is currently draining."};
+ }
+
+ if (!chunkTag.empty() && !stat.shardTags.count(chunkTag)) {
+ return {ErrorCodes::IllegalOperation,
+                str::stream() << stat.shardId << " doesn't have the right tag"};
+ }
+
+ return Status::OK();
+}
+
+ShardId BalancerPolicy::_getLeastLoadedReceiverShard(const ShardStatisticsVector& shardStats,
+ const DistributionStatus& distribution,
+ const string& tag,
+ const set<ShardId>& excludedShards) {
+ ShardId best;
+ unsigned minChunks = numeric_limits<unsigned>::max();
+
+ for (const auto& stat : shardStats) {
+ if (excludedShards.count(stat.shardId))
+ continue;
+
+ auto status = isShardSuitableReceiver(stat, tag);
+ if (!status.isOK()) {
+ continue;
+ }
+
+ unsigned myChunks = distribution.numberOfChunksInShard(stat.shardId);
+ if (myChunks >= minChunks) {
+ continue;
+ }
+
+ best = stat.shardId;
+ minChunks = myChunks;
+ }
+
+ return best;
+}
+
+ShardId BalancerPolicy::_getMostOverloadedShard(const ShardStatisticsVector& shardStats,
+ const DistributionStatus& distribution,
+ const string& chunkTag,
+ const set<ShardId>& excludedShards) {
+ ShardId worst;
+ unsigned maxChunks = 0;
+
+ for (const auto& stat : shardStats) {
+ if (excludedShards.count(stat.shardId))
+ continue;
+
+ const unsigned shardChunkCount =
+ distribution.numberOfChunksInShardWithTag(stat.shardId, chunkTag);
+ if (shardChunkCount <= maxChunks)
+ continue;
+
+ worst = stat.shardId;
+ maxChunks = shardChunkCount;
+ }
+
+ return worst;
+}
+
+vector<MigrateInfo> BalancerPolicy::balance(const ShardStatisticsVector& shardStats,
+ const DistributionStatus& distribution,
+ bool shouldAggressivelyBalance) {
+ vector<MigrateInfo> migrations;
+
+ // Set of shards, which have already been used for migrations. Used so we don't return multiple
+ // migrations for the same shard.
+ set<ShardId> usedShards;
+
+ // 1) Check for shards, which are in draining mode and must have chunks moved off of them
+ {
+ for (const auto& stat : shardStats) {
+ if (!stat.isDraining)
+ continue;
+
+ const vector<ChunkType>& chunks = distribution.getChunks(stat.shardId);
+
+ if (chunks.empty())
+ continue;
+
+            // Now we know we need to move chunks off this shard, but only if permitted by the
+            // tags policy
+ unsigned numJumboChunks = 0;
+
+            // Since we have to move all chunks, let's just do them in order
+ for (const auto& chunk : chunks) {
+ if (chunk.getJumbo()) {
+ numJumboChunks++;
+ continue;
+ }
+
+ const string tag = distribution.getTagForChunk(chunk);
+
+ const ShardId to =
+ _getLeastLoadedReceiverShard(shardStats, distribution, tag, usedShards);
+ if (!to.isValid()) {
+ if (migrations.empty()) {
+ warning() << "Chunk " << redact(chunk.toString())
+ << " is on a draining shard, but no appropriate recipient found";
+ }
+ continue;
+ }
+
+ invariant(to != stat.shardId);
+ migrations.emplace_back(distribution.nss().ns(), to, chunk);
+ invariant(usedShards.insert(stat.shardId).second);
+ invariant(usedShards.insert(to).second);
+ break;
+ }
+
+ if (migrations.empty()) {
+ warning() << "Unable to find any chunk to move from draining shard " << stat.shardId
+ << ". numJumboChunks: " << numJumboChunks;
+ }
+ }
+ }
+
+    // 2) Check for chunks which are on the wrong shard and must be moved off of it
+ if (!distribution.tags().empty()) {
+ for (const auto& stat : shardStats) {
+ const vector<ChunkType>& chunks = distribution.getChunks(stat.shardId);
+ for (const auto& chunk : chunks) {
+ const string tag = distribution.getTagForChunk(chunk);
+ if (tag.empty())
+ continue;
+
+ if (stat.shardTags.count(tag))
+ continue;
+
+ if (chunk.getJumbo()) {
+ warning() << "chunk " << redact(chunk.toString()) << " violates tag "
+ << redact(tag) << ", but it is jumbo and cannot be moved";
+ continue;
+ }
+
+ const ShardId to =
+ _getLeastLoadedReceiverShard(shardStats, distribution, tag, usedShards);
+ if (!to.isValid()) {
+ if (migrations.empty()) {
+ warning() << "chunk " << redact(chunk.toString()) << " violates tag "
+ << redact(tag) << ", but no appropriate recipient found";
+ }
+ continue;
+ }
+
+ invariant(to != stat.shardId);
+ migrations.emplace_back(distribution.nss().ns(), to, chunk);
+ invariant(usedShards.insert(stat.shardId).second);
+ invariant(usedShards.insert(to).second);
+ break;
+ }
+ }
+ }
+
+    // 3) Balance each tag, including the empty (untagged) one
+ const size_t imbalanceThreshold = (shouldAggressivelyBalance || distribution.totalChunks() < 20)
+ ? kAggressiveImbalanceThreshold
+ : kDefaultImbalanceThreshold;
+
+ vector<string> tagsPlusEmpty(distribution.tags().begin(), distribution.tags().end());
+ tagsPlusEmpty.push_back("");
+
+ for (const auto& tag : tagsPlusEmpty) {
+ while (_singleZoneBalance(
+ shardStats, distribution, tag, imbalanceThreshold, &migrations, &usedShards))
+ ;
+ }
+
+ return migrations;
+}
+
+boost::optional<MigrateInfo> BalancerPolicy::balanceSingleChunk(
+ const ChunkType& chunk,
+ const ShardStatisticsVector& shardStats,
+ const DistributionStatus& distribution) {
+ const string tag = distribution.getTagForChunk(chunk);
+
+ ShardId newShardId =
+ _getLeastLoadedReceiverShard(shardStats, distribution, tag, set<ShardId>());
+ if (!newShardId.isValid() || newShardId == chunk.getShard()) {
+ return boost::optional<MigrateInfo>();
+ }
+
+ return MigrateInfo(distribution.nss().ns(), newShardId, chunk);
+}
+
+bool BalancerPolicy::_singleZoneBalance(const ShardStatisticsVector& shardStats,
+ const DistributionStatus& distribution,
+ const string& tag,
+ size_t imbalanceThreshold,
+ vector<MigrateInfo>* migrations,
+ set<ShardId>* usedShards) {
+ const ShardId from = _getMostOverloadedShard(shardStats, distribution, tag, *usedShards);
+ if (!from.isValid())
+ return false;
+
+ const size_t max = distribution.numberOfChunksInShardWithTag(from, tag);
+ if (max == 0)
+ return false;
+
+ const ShardId to = _getLeastLoadedReceiverShard(shardStats, distribution, tag, *usedShards);
+ if (!to.isValid()) {
+ if (migrations->empty()) {
+ log() << "No available shards to take chunks for tag [" << tag << "]";
+ }
+ return false;
+ }
+
+ const size_t min = distribution.numberOfChunksInShardWithTag(to, tag);
+ if (min >= max)
+ return false;
+
+ const size_t totalNumberOfChunksWithTag =
+ (tag.empty() ? distribution.totalChunks() : distribution.totalChunksWithTag(tag));
+
+ size_t totalNumberOfShardsWithTag = 0;
+
+ for (const auto& stat : shardStats) {
+ if (tag.empty() || stat.shardTags.count(tag)) {
+ totalNumberOfShardsWithTag++;
+ }
+ }
+
+ // totalNumberOfShardsWithTag cannot be zero if the to shard is valid
+ invariant(totalNumberOfShardsWithTag);
+ invariant(totalNumberOfChunksWithTag >= max);
+
+ // Calculate the ceiling of the optimal number of chunks per shard
+ const size_t idealNumberOfChunksPerShardWithTag =
+ (totalNumberOfChunksWithTag / totalNumberOfShardsWithTag) +
+ (totalNumberOfChunksWithTag % totalNumberOfShardsWithTag ? 1 : 0);
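+
+    // Worked example (illustrative): 10 chunks with this tag across 3 tagged shards gives
+    // idealNumberOfChunksPerShardWithTag = ceil(10 / 3) = 4; a donor holding max = 6 chunks has
+    // imbalance = 2, which meets kDefaultImbalanceThreshold, so a migration is suggested.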
+
+ const size_t imbalance = max - idealNumberOfChunksPerShardWithTag;
+
+ LOG(1) << "collection : " << distribution.nss().ns();
+ LOG(1) << "zone : " << tag;
+ LOG(1) << "donor : " << from << " chunks on " << max;
+ LOG(1) << "receiver : " << to << " chunks on " << min;
+ LOG(1) << "ideal : " << idealNumberOfChunksPerShardWithTag;
+ LOG(1) << "threshold : " << imbalanceThreshold;
+
+ // Check whether it is necessary to balance within this zone
+ if (imbalance < imbalanceThreshold)
+ return false;
+
+ const vector<ChunkType>& chunks = distribution.getChunks(from);
+
+ unsigned numJumboChunks = 0;
+
+ for (const auto& chunk : chunks) {
+ if (distribution.getTagForChunk(chunk) != tag)
+ continue;
+
+ if (chunk.getJumbo()) {
+ numJumboChunks++;
+ continue;
+ }
+
+ migrations->emplace_back(distribution.nss().ns(), to, chunk);
+ invariant(usedShards->insert(chunk.getShard()).second);
+ invariant(usedShards->insert(to).second);
+ return true;
+ }
+
+ if (numJumboChunks) {
+ warning() << "Shard: " << from << ", collection: " << distribution.nss().ns()
+ << " has only jumbo chunks for zone \'" << tag
+ << "\' and cannot be balanced. Jumbo chunks count: " << numJumboChunks;
+ }
+
+ return false;
+}
+
+string ZoneRange::toString() const {
+ return str::stream() << min << " -->> " << max << " on " << zone;
+}
+
+std::string MigrateInfo::getName() const {
+ return ChunkType::genID(ns, minKey);
+}
+
+string MigrateInfo::toString() const {
+ return str::stream() << ns << ": [" << minKey << ", " << maxKey << "), from " << from << ", to "
+ << to;
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/balancer/balancer_policy.h b/src/mongo/s/balancer/balancer_policy.h
new file mode 100644
index 00000000000..97a1f2e91b1
--- /dev/null
+++ b/src/mongo/s/balancer/balancer_policy.h
@@ -0,0 +1,246 @@
+/**
+* Copyright (C) 2010 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*
+* As a special exception, the copyright holders give permission to link the
+* code of portions of this program with the OpenSSL library under certain
+* conditions as described in each individual source file and distribute
+* linked combinations including the program with the OpenSSL library. You
+* must comply with the GNU Affero General Public License in all respects
+* for all of the code used other than as permitted herein. If you modify
+* file(s) with this exception, you may extend this exception to your
+* version of the file(s), but you are not obligated to do so. If you do not
+* wish to do so, delete this exception statement from your version. If you
+* delete this exception statement from all source files in the program,
+* then also delete it in the license file.
+*/
+
+#pragma once
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/simple_bsonobj_comparator.h"
+#include "mongo/s/balancer/cluster_statistics.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/client/shard.h"
+
+namespace mongo {
+
+class ChunkManager;
+class OperationContext;
+
+struct ZoneRange {
+ ZoneRange() = default;
+
+ ZoneRange(const BSONObj& a_min, const BSONObj& a_max, const std::string& _zone)
+ : min(a_min.getOwned()), max(a_max.getOwned()), zone(_zone) {}
+
+ std::string toString() const;
+
+ BSONObj min;
+ BSONObj max;
+ std::string zone;
+};
+
+struct MigrateInfo {
+ MigrateInfo(const std::string& a_ns, const ShardId& a_to, const ChunkType& a_chunk)
+ : ns(a_ns),
+ to(a_to),
+ from(a_chunk.getShard()),
+ minKey(a_chunk.getMin()),
+ maxKey(a_chunk.getMax()) {}
+
+ MigrateInfo(const std::string& a_ns,
+ const ShardId& a_to,
+ const ShardId& a_from,
+ const BSONObj& a_minKey,
+ const BSONObj& a_maxKey)
+ : ns(a_ns), to(a_to), from(a_from), minKey(a_minKey), maxKey(a_maxKey) {}
+
+ std::string getName() const;
+ std::string toString() const;
+
+ std::string ns;
+ ShardId to;
+ ShardId from;
+ BSONObj minKey;
+ BSONObj maxKey;
+};
+
+typedef std::vector<ClusterStatistics::ShardStatistics> ShardStatisticsVector;
+typedef std::map<ShardId, std::vector<ChunkType>> ShardToChunksMap;
+
+/**
+ * This class constitutes a cache of the chunk distribution across the entire cluster along with the
+ * zone boundaries imposed on it. This information is stored in a format which makes it efficient
+ * to query utilization statistics and to decide what to balance.
+ */
+class DistributionStatus {
+ MONGO_DISALLOW_COPYING(DistributionStatus);
+
+public:
+ DistributionStatus(NamespaceString nss, ShardToChunksMap shardToChunksMap);
+ DistributionStatus(DistributionStatus&&) = default;
+
+ /**
+ * Returns the namespace for which this balance status applies.
+ */
+ const NamespaceString& nss() const {
+ return _nss;
+ }
+
+ /**
+ * Appends the specified range to the set of ranges tracked for this collection and checks if
+ * it overlaps with existing ranges.
+ */
+ Status addRangeToZone(const ZoneRange& range);
+
+ /**
+ * Returns total number of chunks across all shards.
+ */
+ size_t totalChunks() const;
+
+ /**
+ * Returns the total number of chunks across all shards, which fall into the specified zone's
+ * range.
+ */
+ size_t totalChunksWithTag(const std::string& tag) const;
+
+ /**
+ * Returns number of chunks in the specified shard.
+ */
+ size_t numberOfChunksInShard(const ShardId& shardId) const;
+
+ /**
+ * Returns number of chunks in the specified shard, which have the given tag.
+ */
+ size_t numberOfChunksInShardWithTag(const ShardId& shardId, const std::string& tag) const;
+
+ /**
+ * Returns all chunks for the specified shard.
+ */
+ const std::vector<ChunkType>& getChunks(const ShardId& shardId) const;
+
+ /**
+ * Returns all tag ranges defined for the collection.
+ */
+ const BSONObjIndexedMap<ZoneRange>& tagRanges() const {
+ return _zoneRanges;
+ }
+
+ /**
+ * Returns all tags defined for the collection.
+ */
+ const std::set<std::string>& tags() const {
+ return _allTags;
+ }
+
+ /**
+ * Using the set of tags defined for the collection, returns what tag corresponds to the
+     * specified chunk. If the chunk doesn't fall into any tag, returns the empty string.
+ */
+ std::string getTagForChunk(const ChunkType& chunk) const;
+
+ /**
+ * Returns a BSON/string representation of this distribution status.
+ */
+ void report(BSONObjBuilder* builder) const;
+ std::string toString() const;
+
+private:
+ // Namespace for which this distribution applies
+ NamespaceString _nss;
+
+ // Map of what chunks are owned by each shard
+ ShardToChunksMap _shardChunks;
+
+ // Map of zone max key to the zone description
+ BSONObjIndexedMap<ZoneRange> _zoneRanges;
+
+ // Set of all zones defined for this collection
+ std::set<std::string> _allTags;
+};
+
+class BalancerPolicy {
+public:
+ /**
+ * Determines whether a shard with the specified utilization statistics would be able to accept
+ * a chunk with the specified tag. According to the policy a shard cannot accept chunks if its
+ * size is maxed out and if the chunk's tag conflicts with the tag of the shard.
+ */
+ static Status isShardSuitableReceiver(const ClusterStatistics::ShardStatistics& stat,
+ const std::string& chunkTag);
+
+ /**
+ * Returns a suggested set of chunks to move within a collection's shards, given the specified
+ * state of the shards (draining, max size reached, etc.) and the number of chunks for that
+ * collection. If the policy doesn't recommend anything to move, it returns an empty vector. The
+ * entries in the vector are all for separate source/destination shards and as such do not
+ * need to be done serially and can be scheduled in parallel.
+ *
+ * The balancing logic calculates the optimum number of chunks per shard for each zone and if
+ * any of the shards have chunks, which are sufficiently higher than this number, suggests
+ * moving chunks to shards, which are under this number.
+ *
+ * The shouldAggressivelyBalance parameter lowers the threshold for the chunk count disparity
+ * between shards at which migrations are initiated.
+ */
+ static std::vector<MigrateInfo> balance(const ShardStatisticsVector& shardStats,
+ const DistributionStatus& distribution,
+ bool shouldAggressivelyBalance);
+
+ /**
+ * Using the specified distribution information, returns a suggested better location for the
+ * specified chunk if one is available.
+ */
+ static boost::optional<MigrateInfo> balanceSingleChunk(const ChunkType& chunk,
+ const ShardStatisticsVector& shardStats,
+ const DistributionStatus& distribution);
+
+private:
+ /**
+     * Returns the shard which can accept chunks with the specified tag and has the least number
+     * of chunks. If the tag is empty, considers all shards.
+ */
+ static ShardId _getLeastLoadedReceiverShard(const ShardStatisticsVector& shardStats,
+ const DistributionStatus& distribution,
+ const std::string& tag,
+ const std::set<ShardId>& excludedShards);
+
+ /**
+     * Returns the shard which has the most chunks with the specified tag. If the tag is empty,
+     * considers all chunks.
+ */
+ static ShardId _getMostOverloadedShard(const ShardStatisticsVector& shardStats,
+ const DistributionStatus& distribution,
+ const std::string& chunkTag,
+ const std::set<ShardId>& excludedShards);
+
+ /**
+     * Selects one chunk for the specified zone (if appropriate) to be moved in order to bring the
+     * deviation of the shards' chunk counts closer to even across all shards in the specified
+     * zone. Takes into account the shards which have already been used for migrations.
+     *
+     * Returns true if a migration was suggested, false otherwise. This method is intended to be
+     * called multiple times until all possible migrations for a zone have been selected.
+ */
+ static bool _singleZoneBalance(const ShardStatisticsVector& shardStats,
+ const DistributionStatus& distribution,
+ const std::string& tag,
+ size_t imbalanceThreshold,
+ std::vector<MigrateInfo>* migrations,
+ std::set<ShardId>* usedShards);
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/balancer/balancer_policy_tests.cpp b/src/mongo/s/balancer/balancer_policy_tests.cpp
new file mode 100644
index 00000000000..b6024c8f1d5
--- /dev/null
+++ b/src/mongo/s/balancer/balancer_policy_tests.cpp
@@ -0,0 +1,608 @@
+/**
+ * Copyright (C) 2012-2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/platform/random.h"
+#include "mongo/s/balancer/balancer_policy.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace {
+
+using std::map;
+using std::string;
+using std::stringstream;
+using std::vector;
+
+using ShardStatistics = ClusterStatistics::ShardStatistics;
+
+const auto emptyTagSet = std::set<std::string>();
+const std::string emptyShardVersion = "";
+const auto kShardId0 = ShardId("shard0");
+const auto kShardId1 = ShardId("shard1");
+const auto kShardId2 = ShardId("shard2");
+const auto kShardId3 = ShardId("shard3");
+const NamespaceString kNamespace("TestDB", "TestColl");
+const uint64_t kNoMaxSize = 0;
+
+/**
+ * Constructs a shard statistics vector and a consistent mapping of chunks to shards given the
+ * specified input parameters. The generated chunks have an ever-increasing min value, i.e. they
+ * will be of the form:
+ *
+ * [MinKey, 1), [1, 2), [2, 3) ... [N - 1, MaxKey)
+ */
+std::pair<ShardStatisticsVector, ShardToChunksMap> generateCluster(
+ const vector<std::pair<ShardStatistics, size_t>>& shardsAndNumChunks) {
+ int64_t totalNumChunks = 0;
+ for (const auto& entry : shardsAndNumChunks) {
+ totalNumChunks += std::get<1>(entry);
+ }
+
+ ShardToChunksMap chunkMap;
+ ShardStatisticsVector shardStats;
+
+ int64_t currentChunk = 0;
+
+ for (auto it = shardsAndNumChunks.begin(); it != shardsAndNumChunks.end(); it++) {
+ ShardStatistics shard = std::move(it->first);
+ const size_t numChunks = it->second;
+
+ // Ensure that an entry is created
+ chunkMap[shard.shardId];
+
+ for (size_t i = 0; i < numChunks; i++, currentChunk++) {
+ ChunkType chunk;
+ chunk.setMin(currentChunk == 0 ? kMinBSONKey : BSON("x" << currentChunk));
+ chunk.setMax(currentChunk == totalNumChunks - 1 ? kMaxBSONKey
+ : BSON("x" << currentChunk + 1));
+ chunk.setShard(shard.shardId);
+
+ chunkMap[shard.shardId].push_back(std::move(chunk));
+ }
+
+ shardStats.push_back(std::move(shard));
+ }
+
+ return std::make_pair(std::move(shardStats), std::move(chunkMap));
+}
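+
+// Example (illustrative): generateCluster({{shard0Stats, 2}, {shard1Stats, 1}}) produces chunks
+// [MinKey, 1) and [1, 2) on shard0 and [2, MaxKey) on shard1.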
+
+TEST(BalancerPolicy, Basic) {
+ auto cluster = generateCluster(
+ {{ShardStatistics(kShardId0, kNoMaxSize, 4, false, emptyTagSet, emptyShardVersion), 4},
+ {ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0},
+ {ShardStatistics(kShardId2, kNoMaxSize, 3, false, emptyTagSet, emptyShardVersion), 3}});
+
+ const auto migrations(BalancerPolicy::balance(
+ cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+ ASSERT_EQ(1U, migrations.size());
+ ASSERT_EQ(kShardId0, migrations[0].from);
+ ASSERT_EQ(kShardId1, migrations[0].to);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMin(), migrations[0].minKey);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), migrations[0].maxKey);
+}
+
+TEST(BalancerPolicy, SmallClusterShouldBePerfectlyBalanced) {
+ auto cluster = generateCluster(
+ {{ShardStatistics(kShardId0, kNoMaxSize, 1, false, emptyTagSet, emptyShardVersion), 1},
+ {ShardStatistics(kShardId1, kNoMaxSize, 2, false, emptyTagSet, emptyShardVersion), 2},
+ {ShardStatistics(kShardId2, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}});
+
+ const auto migrations(BalancerPolicy::balance(
+ cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+ ASSERT_EQ(1U, migrations.size());
+ ASSERT_EQ(kShardId1, migrations[0].from);
+ ASSERT_EQ(kShardId2, migrations[0].to);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId1][0].getMin(), migrations[0].minKey);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId1][0].getMax(), migrations[0].maxKey);
+}
+
+TEST(BalancerPolicy, SingleChunkShouldNotMove) {
+ auto cluster = generateCluster(
+ {{ShardStatistics(kShardId0, kNoMaxSize, 1, false, emptyTagSet, emptyShardVersion), 1},
+ {ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}});
+
+ ASSERT(
+ BalancerPolicy::balance(cluster.first, DistributionStatus(kNamespace, cluster.second), true)
+ .empty());
+ ASSERT(BalancerPolicy::balance(
+ cluster.first, DistributionStatus(kNamespace, cluster.second), false)
+ .empty());
+}
+
+TEST(BalancerPolicy, BalanceThresholdObeyed) {
+ auto cluster = generateCluster(
+ {{ShardStatistics(kShardId0, kNoMaxSize, 2, false, emptyTagSet, emptyShardVersion), 2},
+ {ShardStatistics(kShardId1, kNoMaxSize, 2, false, emptyTagSet, emptyShardVersion), 2},
+ {ShardStatistics(kShardId2, kNoMaxSize, 1, false, emptyTagSet, emptyShardVersion), 1},
+ {ShardStatistics(kShardId3, kNoMaxSize, 1, false, emptyTagSet, emptyShardVersion), 1}});
+
+ ASSERT(
+ BalancerPolicy::balance(cluster.first, DistributionStatus(kNamespace, cluster.second), true)
+ .empty());
+ ASSERT(BalancerPolicy::balance(
+ cluster.first, DistributionStatus(kNamespace, cluster.second), false)
+ .empty());
+}
+
+TEST(BalancerPolicy, ParallelBalancing) {
+ auto cluster = generateCluster(
+ {{ShardStatistics(kShardId0, kNoMaxSize, 4, false, emptyTagSet, emptyShardVersion), 4},
+ {ShardStatistics(kShardId1, kNoMaxSize, 4, false, emptyTagSet, emptyShardVersion), 4},
+ {ShardStatistics(kShardId2, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0},
+ {ShardStatistics(kShardId3, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}});
+
+ const auto migrations(BalancerPolicy::balance(
+ cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+ ASSERT_EQ(2U, migrations.size());
+
+ ASSERT_EQ(kShardId0, migrations[0].from);
+ ASSERT_EQ(kShardId2, migrations[0].to);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMin(), migrations[0].minKey);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), migrations[0].maxKey);
+
+ ASSERT_EQ(kShardId1, migrations[1].from);
+ ASSERT_EQ(kShardId3, migrations[1].to);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId1][0].getMin(), migrations[1].minKey);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId1][0].getMax(), migrations[1].maxKey);
+}
+
+TEST(BalancerPolicy, JumboChunksNotMoved) {
+ auto cluster = generateCluster(
+ {{ShardStatistics(kShardId0, kNoMaxSize, 2, false, emptyTagSet, emptyShardVersion), 4},
+ {ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}});
+
+ cluster.second[kShardId0][0].setJumbo(true);
+ cluster.second[kShardId0][1].setJumbo(false); // Only chunk 1 is not jumbo
+ cluster.second[kShardId0][2].setJumbo(true);
+ cluster.second[kShardId0][3].setJumbo(true);
+
+ const auto migrations(BalancerPolicy::balance(
+ cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+ ASSERT_EQ(1U, migrations.size());
+ ASSERT_EQ(kShardId0, migrations[0].from);
+ ASSERT_EQ(kShardId1, migrations[0].to);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][1].getMin(), migrations[0].minKey);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][1].getMax(), migrations[0].maxKey);
+}
+
+TEST(BalancerPolicy, JumboChunksNotMovedParallel) {
+ auto cluster = generateCluster(
+ {{ShardStatistics(kShardId0, kNoMaxSize, 2, false, emptyTagSet, emptyShardVersion), 4},
+ {ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0},
+ {ShardStatistics(kShardId2, kNoMaxSize, 2, false, emptyTagSet, emptyShardVersion), 4},
+ {ShardStatistics(kShardId3, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}});
+
+ cluster.second[kShardId0][0].setJumbo(true);
+ cluster.second[kShardId0][1].setJumbo(false); // Only chunk 1 is not jumbo
+ cluster.second[kShardId0][2].setJumbo(true);
+ cluster.second[kShardId0][3].setJumbo(true);
+
+ cluster.second[kShardId2][0].setJumbo(true);
+ cluster.second[kShardId2][1].setJumbo(true);
+    cluster.second[kShardId2][2].setJumbo(false);  // Only chunk 2 is not jumbo
+ cluster.second[kShardId2][3].setJumbo(true);
+
+ const auto migrations(BalancerPolicy::balance(
+ cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+ ASSERT_EQ(2U, migrations.size());
+
+ ASSERT_EQ(kShardId0, migrations[0].from);
+ ASSERT_EQ(kShardId1, migrations[0].to);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][1].getMin(), migrations[0].minKey);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][1].getMax(), migrations[0].maxKey);
+
+ ASSERT_EQ(kShardId2, migrations[1].from);
+ ASSERT_EQ(kShardId3, migrations[1].to);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][2].getMin(), migrations[1].minKey);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][2].getMax(), migrations[1].maxKey);
+}
+
+TEST(BalancerPolicy, DrainingSingleChunk) {
+ // shard0 is draining and chunks will go to shard1, even though it has a lot more chunks
+ auto cluster = generateCluster(
+ {{ShardStatistics(kShardId0, kNoMaxSize, 2, true, emptyTagSet, emptyShardVersion), 1},
+ {ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 5}});
+
+ const auto migrations(BalancerPolicy::balance(
+ cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+ ASSERT_EQ(1U, migrations.size());
+ ASSERT_EQ(kShardId0, migrations[0].from);
+ ASSERT_EQ(kShardId1, migrations[0].to);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMin(), migrations[0].minKey);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), migrations[0].maxKey);
+}
+
+TEST(BalancerPolicy, DrainingSingleChunkPerShard) {
+ // shard0 and shard2 are draining and chunks will go to shard1 and shard3 in parallel
+ auto cluster = generateCluster(
+ {{ShardStatistics(kShardId0, kNoMaxSize, 2, true, emptyTagSet, emptyShardVersion), 1},
+ {ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 5},
+ {ShardStatistics(kShardId2, kNoMaxSize, 2, true, emptyTagSet, emptyShardVersion), 1},
+ {ShardStatistics(kShardId3, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 5}});
+
+ const auto migrations(BalancerPolicy::balance(
+ cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+ ASSERT_EQ(2U, migrations.size());
+
+ ASSERT_EQ(kShardId0, migrations[0].from);
+ ASSERT_EQ(kShardId1, migrations[0].to);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMin(), migrations[0].minKey);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), migrations[0].maxKey);
+
+ ASSERT_EQ(kShardId2, migrations[1].from);
+ ASSERT_EQ(kShardId3, migrations[1].to);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMin(), migrations[1].minKey);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMax(), migrations[1].maxKey);
+}
+
+TEST(BalancerPolicy, DrainingWithTwoChunksFirstOneSelected) {
+ // shard0 is draining and chunks will go to shard1, even though it has a lot more chunks
+ auto cluster = generateCluster(
+ {{ShardStatistics(kShardId0, kNoMaxSize, 2, true, emptyTagSet, emptyShardVersion), 2},
+ {ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 5}});
+
+ const auto migrations(BalancerPolicy::balance(
+ cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+ ASSERT_EQ(1U, migrations.size());
+ ASSERT_EQ(kShardId0, migrations[0].from);
+ ASSERT_EQ(kShardId1, migrations[0].to);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMin(), migrations[0].minKey);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), migrations[0].maxKey);
+}
+
+TEST(BalancerPolicy, DrainingMultipleShardsFirstOneSelected) {
+    // shard0 and shard1 are both draining with very few chunks in them, so chunks will go to
+    // shard2, even though it has a lot more chunks than the other two
+ auto cluster = generateCluster(
+ {{ShardStatistics(kShardId0, kNoMaxSize, 5, true, emptyTagSet, emptyShardVersion), 1},
+ {ShardStatistics(kShardId1, kNoMaxSize, 5, true, emptyTagSet, emptyShardVersion), 2},
+ {ShardStatistics(kShardId2, kNoMaxSize, 5, false, emptyTagSet, emptyShardVersion), 16}});
+
+ const auto migrations(BalancerPolicy::balance(
+ cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+ ASSERT_EQ(1U, migrations.size());
+ ASSERT_EQ(kShardId0, migrations[0].from);
+ ASSERT_EQ(kShardId2, migrations[0].to);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMin(), migrations[0].minKey);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), migrations[0].maxKey);
+}
+
+TEST(BalancerPolicy, DrainingMultipleShardsWontAcceptChunks) {
+ // shard0 has many chunks, but can't move them to shard1 or shard2 because they are draining
+ auto cluster = generateCluster(
+ {{ShardStatistics(kShardId0, kNoMaxSize, 2, false, emptyTagSet, emptyShardVersion), 4},
+ {ShardStatistics(kShardId1, kNoMaxSize, 0, true, emptyTagSet, emptyShardVersion), 0},
+ {ShardStatistics(kShardId2, kNoMaxSize, 0, true, emptyTagSet, emptyShardVersion), 0}});
+
+ const auto migrations(BalancerPolicy::balance(
+ cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+ ASSERT(migrations.empty());
+}
+
+TEST(BalancerPolicy, DrainingSingleAppropriateShardFoundDueToTag) {
+ auto cluster = generateCluster(
+ {{ShardStatistics(kShardId0, kNoMaxSize, 2, false, {"NYC"}, emptyShardVersion), 4},
+ {ShardStatistics(kShardId1, kNoMaxSize, 2, false, {"LAX"}, emptyShardVersion), 4},
+ {ShardStatistics(kShardId2, kNoMaxSize, 1, true, {"LAX"}, emptyShardVersion), 1}});
+
+ DistributionStatus distribution(kNamespace, cluster.second);
+ ASSERT_OK(distribution.addRangeToZone(ZoneRange(
+ cluster.second[kShardId2][0].getMin(), cluster.second[kShardId2][0].getMax(), "LAX")));
+
+ const auto migrations(BalancerPolicy::balance(cluster.first, distribution, false));
+ ASSERT_EQ(1U, migrations.size());
+ ASSERT_EQ(kShardId2, migrations[0].from);
+ ASSERT_EQ(kShardId1, migrations[0].to);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMin(), migrations[0].minKey);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMax(), migrations[0].maxKey);
+}
+
+TEST(BalancerPolicy, DrainingNoAppropriateShardsFoundDueToTag) {
+ auto cluster = generateCluster(
+ {{ShardStatistics(kShardId0, kNoMaxSize, 2, false, {"NYC"}, emptyShardVersion), 4},
+ {ShardStatistics(kShardId1, kNoMaxSize, 2, false, {"LAX"}, emptyShardVersion), 4},
+ {ShardStatistics(kShardId2, kNoMaxSize, 1, true, {"SEA"}, emptyShardVersion), 1}});
+
+ DistributionStatus distribution(kNamespace, cluster.second);
+ ASSERT_OK(distribution.addRangeToZone(ZoneRange(
+ cluster.second[kShardId2][0].getMin(), cluster.second[kShardId2][0].getMax(), "SEA")));
+
+ const auto migrations(BalancerPolicy::balance(cluster.first, distribution, false));
+ ASSERT(migrations.empty());
+}
+
+TEST(BalancerPolicy, NoBalancingDueToAllNodesEitherDrainingOrMaxedOut) {
+ // shard0 and shard2 are draining, shard1 is maxed out
+ auto cluster = generateCluster(
+ {{ShardStatistics(kShardId0, kNoMaxSize, 2, true, emptyTagSet, emptyShardVersion), 1},
+ {ShardStatistics(kShardId1, 1, 1, false, emptyTagSet, emptyShardVersion), 6},
+ {ShardStatistics(kShardId2, kNoMaxSize, 1, true, emptyTagSet, emptyShardVersion), 1}});
+
+ const auto migrations(BalancerPolicy::balance(
+ cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+ ASSERT(migrations.empty());
+}
+
+TEST(BalancerPolicy, BalancerRespectsMaxShardSizeOnlyBalanceToNonMaxed) {
+ // Note that maxSize of shard0 is 1, and it is therefore overloaded with currSize = 3. Other
+    // shards have maxSize = 0 = unset. Even though the overloaded shard has the least number of
+    // chunks, we shouldn't move chunks to that shard.
+ auto cluster = generateCluster(
+ {{ShardStatistics(kShardId0, 1, 3, false, emptyTagSet, emptyShardVersion), 2},
+ {ShardStatistics(kShardId1, kNoMaxSize, 4, false, emptyTagSet, emptyShardVersion), 4},
+ {ShardStatistics(kShardId2, kNoMaxSize, 6, false, emptyTagSet, emptyShardVersion), 6}});
+
+ const auto migrations(BalancerPolicy::balance(
+ cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+ ASSERT_EQ(1U, migrations.size());
+ ASSERT_EQ(kShardId2, migrations[0].from);
+ ASSERT_EQ(kShardId1, migrations[0].to);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMin(), migrations[0].minKey);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMax(), migrations[0].maxKey);
+}
+
+TEST(BalancerPolicy, BalancerRespectsMaxShardSizeWhenAllBalanced) {
+ // Note that maxSize of shard0 is 1, and it is therefore overloaded with currSize = 4. Other
+    // shards have maxSize = 0 = unset. We check that being over the maxSize is NOT equivalent to
+    // draining; we don't want to empty shards for no reason other than that they are over this
+    // limit.
+ auto cluster = generateCluster(
+ {{ShardStatistics(kShardId0, 1, 4, false, emptyTagSet, emptyShardVersion), 4},
+ {ShardStatistics(kShardId1, kNoMaxSize, 4, false, emptyTagSet, emptyShardVersion), 4},
+ {ShardStatistics(kShardId2, kNoMaxSize, 4, false, emptyTagSet, emptyShardVersion), 4}});
+
+ const auto migrations(BalancerPolicy::balance(
+ cluster.first, DistributionStatus(kNamespace, cluster.second), false));
+ ASSERT(migrations.empty());
+}
+
+TEST(BalancerPolicy, BalancerRespectsTagsWhenDraining) {
+ // shard1 drains the proper chunk to shard0, even though it is more loaded than shard2
+ auto cluster = generateCluster(
+ {{ShardStatistics(kShardId0, kNoMaxSize, 5, false, {"a"}, emptyShardVersion), 6},
+ {ShardStatistics(kShardId1, kNoMaxSize, 5, true, {"a", "b"}, emptyShardVersion), 2},
+ {ShardStatistics(kShardId2, kNoMaxSize, 5, false, {"b"}, emptyShardVersion), 2}});
+
+ DistributionStatus distribution(kNamespace, cluster.second);
+ ASSERT_OK(distribution.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 7), "a")));
+ ASSERT_OK(distribution.addRangeToZone(ZoneRange(BSON("x" << 8), kMaxBSONKey, "b")));
+
+ const auto migrations(BalancerPolicy::balance(cluster.first, distribution, false));
+ ASSERT_EQ(1U, migrations.size());
+ ASSERT_EQ(kShardId1, migrations[0].from);
+ ASSERT_EQ(kShardId0, migrations[0].to);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId1][0].getMin(), migrations[0].minKey);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId1][0].getMax(), migrations[0].maxKey);
+}
+
+TEST(BalancerPolicy, BalancerRespectsTagPolicyBeforeImbalance) {
+ // There is a large imbalance between shard0 and shard1, but the balancer must first fix the
+ // chunks, which are on a wrong shard due to tag policy
+ auto cluster = generateCluster(
+ {{ShardStatistics(kShardId0, kNoMaxSize, 5, false, {"a"}, emptyShardVersion), 2},
+ {ShardStatistics(kShardId1, kNoMaxSize, 5, false, {"a"}, emptyShardVersion), 6},
+ {ShardStatistics(kShardId2, kNoMaxSize, 5, false, emptyTagSet, emptyShardVersion), 2}});
+
+ DistributionStatus distribution(kNamespace, cluster.second);
+ ASSERT_OK(distribution.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 100), "a")));
+
+ const auto migrations(BalancerPolicy::balance(cluster.first, distribution, false));
+ ASSERT_EQ(1U, migrations.size());
+ ASSERT_EQ(kShardId2, migrations[0].from);
+ ASSERT_EQ(kShardId0, migrations[0].to);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMin(), migrations[0].minKey);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMax(), migrations[0].maxKey);
+}
+
+TEST(BalancerPolicy, BalancerFixesIncorrectTagsInOtherwiseBalancedCluster) {
+ // Chunks are balanced across shards, but there are wrong tags, which need to be fixed
+ auto cluster = generateCluster(
+ {{ShardStatistics(kShardId0, kNoMaxSize, 5, false, {"a"}, emptyShardVersion), 3},
+ {ShardStatistics(kShardId1, kNoMaxSize, 5, false, {"a"}, emptyShardVersion), 3},
+ {ShardStatistics(kShardId2, kNoMaxSize, 5, false, emptyTagSet, emptyShardVersion), 3}});
+
+ DistributionStatus distribution(kNamespace, cluster.second);
+ ASSERT_OK(distribution.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 10), "a")));
+
+ const auto migrations(BalancerPolicy::balance(cluster.first, distribution, false));
+ ASSERT_EQ(1U, migrations.size());
+ ASSERT_EQ(kShardId2, migrations[0].from);
+ ASSERT_EQ(kShardId0, migrations[0].to);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMin(), migrations[0].minKey);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMax(), migrations[0].maxKey);
+}
+
+TEST(BalancerPolicy, BalancerFixesIncorrectTagsInOtherwiseBalancedClusterParallel) {
+ // Chunks are balanced across shards, but there are wrong tags, which need to be fixed
+ auto cluster = generateCluster(
+ {{ShardStatistics(kShardId0, kNoMaxSize, 5, false, {"a"}, emptyShardVersion), 3},
+ {ShardStatistics(kShardId1, kNoMaxSize, 5, false, {"a"}, emptyShardVersion), 3},
+ {ShardStatistics(kShardId2, kNoMaxSize, 5, false, emptyTagSet, emptyShardVersion), 3},
+ {ShardStatistics(kShardId3, kNoMaxSize, 5, false, emptyTagSet, emptyShardVersion), 3}});
+
+ DistributionStatus distribution(kNamespace, cluster.second);
+ ASSERT_OK(distribution.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 20), "a")));
+
+ const auto migrations(BalancerPolicy::balance(cluster.first, distribution, false));
+ ASSERT_EQ(2U, migrations.size());
+
+ ASSERT_EQ(kShardId2, migrations[0].from);
+ ASSERT_EQ(kShardId0, migrations[0].to);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMin(), migrations[0].minKey);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMax(), migrations[0].maxKey);
+
+ ASSERT_EQ(kShardId3, migrations[1].from);
+ ASSERT_EQ(kShardId1, migrations[1].to);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId3][0].getMin(), migrations[1].minKey);
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId3][0].getMax(), migrations[1].maxKey);
+}
+
+TEST(DistributionStatus, AddTagRangeOverlap) {
+ DistributionStatus d(kNamespace, ShardToChunksMap{});
+
+    // Note that there is a gap between 10 and 20 for which there is no tag
+ ASSERT_OK(d.addRangeToZone(ZoneRange(BSON("x" << 1), BSON("x" << 10), "a")));
+ ASSERT_OK(d.addRangeToZone(ZoneRange(BSON("x" << 20), BSON("x" << 30), "b")));
+
+ ASSERT_EQ(ErrorCodes::RangeOverlapConflict,
+ d.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 2), "d")));
+ ASSERT_EQ(ErrorCodes::RangeOverlapConflict,
+ d.addRangeToZone(ZoneRange(BSON("x" << -1), BSON("x" << 5), "d")));
+ ASSERT_EQ(ErrorCodes::RangeOverlapConflict,
+ d.addRangeToZone(ZoneRange(BSON("x" << 5), BSON("x" << 9), "d")));
+ ASSERT_EQ(ErrorCodes::RangeOverlapConflict,
+ d.addRangeToZone(ZoneRange(BSON("x" << 1), BSON("x" << 10), "d")));
+ ASSERT_EQ(ErrorCodes::RangeOverlapConflict,
+ d.addRangeToZone(ZoneRange(BSON("x" << 5), BSON("x" << 25), "d")));
+ ASSERT_EQ(ErrorCodes::RangeOverlapConflict,
+ d.addRangeToZone(ZoneRange(BSON("x" << -1), BSON("x" << 32), "d")));
+ ASSERT_EQ(ErrorCodes::RangeOverlapConflict,
+ d.addRangeToZone(ZoneRange(BSON("x" << 25), kMaxBSONKey, "d")));
+}
+
+TEST(DistributionStatus, ChunkTagsSelectorWithRegularKeys) {
+ DistributionStatus d(kNamespace, ShardToChunksMap{});
+
+ ASSERT_OK(d.addRangeToZone(ZoneRange(BSON("x" << 1), BSON("x" << 10), "a")));
+ ASSERT_OK(d.addRangeToZone(ZoneRange(BSON("x" << 10), BSON("x" << 20), "b")));
+ ASSERT_OK(d.addRangeToZone(ZoneRange(BSON("x" << 20), BSON("x" << 30), "c")));
+
+ {
+ ChunkType chunk;
+ chunk.setMin(kMinBSONKey);
+ chunk.setMax(BSON("x" << 1));
+ ASSERT_EQUALS("", d.getTagForChunk(chunk));
+ }
+
+ {
+ ChunkType chunk;
+ chunk.setMin(BSON("x" << 0));
+ chunk.setMax(BSON("x" << 1));
+ ASSERT_EQUALS("", d.getTagForChunk(chunk));
+ }
+
+ {
+ ChunkType chunk;
+ chunk.setMin(BSON("x" << 1));
+ chunk.setMax(BSON("x" << 5));
+ ASSERT_EQUALS("a", d.getTagForChunk(chunk));
+ }
+
+ {
+ ChunkType chunk;
+ chunk.setMin(BSON("x" << 10));
+ chunk.setMax(BSON("x" << 20));
+ ASSERT_EQUALS("b", d.getTagForChunk(chunk));
+ }
+
+ {
+ ChunkType chunk;
+ chunk.setMin(BSON("x" << 15));
+ chunk.setMax(BSON("x" << 20));
+ ASSERT_EQUALS("b", d.getTagForChunk(chunk));
+ }
+
+ {
+ ChunkType chunk;
+ chunk.setMin(BSON("x" << 25));
+ chunk.setMax(BSON("x" << 30));
+ ASSERT_EQUALS("c", d.getTagForChunk(chunk));
+ }
+
+ {
+ ChunkType chunk;
+ chunk.setMin(BSON("x" << 35));
+ chunk.setMax(BSON("x" << 40));
+ ASSERT_EQUALS("", d.getTagForChunk(chunk));
+ }
+
+ {
+ ChunkType chunk;
+ chunk.setMin(BSON("x" << 40));
+ chunk.setMax(kMaxBSONKey);
+ ASSERT_EQUALS("", d.getTagForChunk(chunk));
+ }
+}
+
+TEST(DistributionStatus, ChunkTagsSelectorWithMinMaxKeys) {
+ DistributionStatus d(kNamespace, ShardToChunksMap{});
+
+ ASSERT_OK(d.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << -100), "a")));
+ ASSERT_OK(d.addRangeToZone(ZoneRange(BSON("x" << -10), BSON("x" << 10), "b")));
+ ASSERT_OK(d.addRangeToZone(ZoneRange(BSON("x" << 100), kMaxBSONKey, "c")));
+
+ {
+ ChunkType chunk;
+ chunk.setMin(kMinBSONKey);
+ chunk.setMax(BSON("x" << -100));
+ ASSERT_EQUALS("a", d.getTagForChunk(chunk));
+ }
+
+ {
+ ChunkType chunk;
+ chunk.setMin(BSON("x" << -100));
+ chunk.setMax(BSON("x" << -11));
+ ASSERT_EQUALS("", d.getTagForChunk(chunk));
+ }
+
+ {
+ ChunkType chunk;
+ chunk.setMin(BSON("x" << -10));
+ chunk.setMax(BSON("x" << 0));
+ ASSERT_EQUALS("b", d.getTagForChunk(chunk));
+ }
+
+ {
+ ChunkType chunk;
+ chunk.setMin(BSON("x" << 0));
+ chunk.setMax(BSON("x" << 10));
+ ASSERT_EQUALS("b", d.getTagForChunk(chunk));
+ }
+
+ {
+ ChunkType chunk;
+ chunk.setMin(BSON("x" << 10));
+ chunk.setMax(BSON("x" << 20));
+ ASSERT_EQUALS("", d.getTagForChunk(chunk));
+ }
+
+ {
+ ChunkType chunk;
+ chunk.setMin(BSON("x" << 200));
+ chunk.setMax(kMaxBSONKey);
+ ASSERT_EQUALS("c", d.getTagForChunk(chunk));
+ }
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/s/balancer/cluster_statistics.cpp b/src/mongo/s/balancer/cluster_statistics.cpp
new file mode 100644
index 00000000000..d42994379ef
--- /dev/null
+++ b/src/mongo/s/balancer/cluster_statistics.cpp
@@ -0,0 +1,80 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/balancer/cluster_statistics.h"
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/bsonobjbuilder.h"
+
+namespace mongo {
+
+ClusterStatistics::ClusterStatistics() = default;
+
+ClusterStatistics::~ClusterStatistics() = default;
+
+ClusterStatistics::ShardStatistics::ShardStatistics() = default;
+
+ClusterStatistics::ShardStatistics::ShardStatistics(ShardId inShardId,
+ uint64_t inMaxSizeMB,
+ uint64_t inCurrSizeMB,
+ bool inIsDraining,
+ std::set<std::string> inShardTags,
+ std::string inMongoVersion)
+ : shardId(std::move(inShardId)),
+ maxSizeMB(std::move(inMaxSizeMB)),
+ currSizeMB(std::move(inCurrSizeMB)),
+ isDraining(std::move(inIsDraining)),
+ shardTags(std::move(inShardTags)),
+ mongoVersion(std::move(inMongoVersion)) {}
+
+bool ClusterStatistics::ShardStatistics::isSizeMaxed() const {
+    // A maxSizeMB of zero means no size limit is configured, and an empty shard is never
+    // considered maxed out
+    if (!maxSizeMB || !currSizeMB) {
+        return false;
+    }
+
+ return currSizeMB >= maxSizeMB;
+}
+
+BSONObj ClusterStatistics::ShardStatistics::toBSON() const {
+ BSONObjBuilder builder;
+ builder.append("id", shardId.toString());
+ builder.append("maxSizeMB", static_cast<long long>(maxSizeMB));
+ builder.append("currSizeMB", static_cast<long long>(currSizeMB));
+ builder.append("draining", isDraining);
+ if (!shardTags.empty()) {
+ BSONArrayBuilder arrayBuilder(builder.subarrayStart("tags"));
+ arrayBuilder.append(shardTags);
+ }
+
+ builder.append("version", mongoVersion);
+ return builder.obj();
+}
+
+} // namespace mongo
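
For reference, a minimal usage sketch of the structure defined above; the shard id,
sizes, and tag are made-up values, and shardStatisticsExample is not a function in the
tree:

    #include <set>
    #include <string>

    #include "mongo/bson/bsonobj.h"
    #include "mongo/s/balancer/cluster_statistics.h"

    namespace mongo {

    BSONObj shardStatisticsExample() {
        // Hypothetical values; a maxSizeMB of zero would mean "no limit"
        ClusterStatistics::ShardStatistics stats(ShardId("shard0000"),
                                                 1024,   // maxSizeMB
                                                 512,    // currSizeMB
                                                 false,  // isDraining
                                                 std::set<std::string>{"nyc"},
                                                 "3.4.0");

        // 512 MB used of a 1024 MB quota, so the shard can still receive chunks
        bool canReceive = !stats.isSizeMaxed();
        (void)canReceive;

        return stats.toBSON();  // {id, maxSizeMB, currSizeMB, draining, tags, version}
    }

    }  // namespace mongo
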
diff --git a/src/mongo/s/balancer/cluster_statistics.h b/src/mongo/s/balancer/cluster_statistics.h
new file mode 100644
index 00000000000..8963720ee6f
--- /dev/null
+++ b/src/mongo/s/balancer/cluster_statistics.h
@@ -0,0 +1,110 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+#include <set>
+#include <string>
+#include <vector>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/s/client/shard.h"
+
+namespace mongo {
+
+class BSONObj;
+class OperationContext;
+template <typename T>
+class StatusWith;
+
+/**
+ * This interface serves as a means of obtaining data distribution and shard utilization
+ * statistics for the entire sharded cluster. Implementations may use whatever mechanism is
+ * necessary to perform the statistics collection. There should be one instance of this object per
+ * service context.
+ */
+class ClusterStatistics {
+ MONGO_DISALLOW_COPYING(ClusterStatistics);
+
+public:
+ /**
+     * Structure describing the statistics of a single shard host.
+ */
+ struct ShardStatistics {
+ public:
+ ShardStatistics();
+ ShardStatistics(ShardId shardId,
+ uint64_t maxSizeMB,
+ uint64_t currSizeMB,
+ bool isDraining,
+ std::set<std::string> shardTags,
+ std::string mongoVersion);
+
+ /**
+         * Returns whether the shard has reached the per-shard data size limit and therefore
+         * cannot receive any new chunks.
+ */
+ bool isSizeMaxed() const;
+
+ /**
+ * Returns BSON representation of this shard's statistics, for reporting purposes.
+ */
+ BSONObj toBSON() const;
+
+ // The id of the shard for which this statistic applies
+ ShardId shardId;
+
+ // The maximum size allowed for the shard
+ uint64_t maxSizeMB{0};
+
+ // The current size of the shard
+ uint64_t currSizeMB{0};
+
+ // Whether the shard is in draining mode
+ bool isDraining{false};
+
+ // Set of tags for the shard
+ std::set<std::string> shardTags;
+
+        // Version of the mongod running on this shard's primary
+ std::string mongoVersion;
+ };
+
+ virtual ~ClusterStatistics();
+
+ /**
+ * Retrieves a snapshot of the current shard utilization state. The implementation of this
+ * method may block if necessary in order to refresh its state or may return a cached value.
+ */
+ virtual StatusWith<std::vector<ShardStatistics>> getStats(OperationContext* txn) = 0;
+
+protected:
+ ClusterStatistics();
+};
+
+} // namespace mongo
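
Since getStats is the interface's only virtual method, a test can stub the whole class
with canned data. A sketch under that assumption (ClusterStatisticsMock is a
hypothetical name, not a class in this patch):

    #include <utility>
    #include <vector>

    #include "mongo/base/status_with.h"
    #include "mongo/s/balancer/cluster_statistics.h"

    namespace mongo {

    // Satisfies the interface by returning a fixed snapshot instead of contacting
    // any shards; never blocks
    class ClusterStatisticsMock final : public ClusterStatistics {
    public:
        explicit ClusterStatisticsMock(std::vector<ShardStatistics> stats)
            : _stats(std::move(stats)) {}

        StatusWith<std::vector<ShardStatistics>> getStats(OperationContext* txn) override {
            return _stats;
        }

    private:
        std::vector<ShardStatistics> _stats;
    };

    }  // namespace mongo
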
diff --git a/src/mongo/s/balancer/cluster_statistics_impl.cpp b/src/mongo/s/balancer/cluster_statistics_impl.cpp
new file mode 100644
index 00000000000..b6e734c6fc2
--- /dev/null
+++ b/src/mongo/s/balancer/cluster_statistics_impl.cpp
@@ -0,0 +1,160 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/balancer/cluster_statistics_impl.h"
+
+#include "mongo/base/status_with.h"
+#include "mongo/bson/util/bson_extract.h"
+#include "mongo/client/read_preference.h"
+#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/shard_util.h"
+#include "mongo/util/log.h"
+#include "mongo/util/mongoutils/str.h"
+
+namespace mongo {
+
+using std::string;
+using std::vector;
+
+namespace {
+
+const char kVersionField[] = "version";
+
+/**
+ * Executes the serverStatus command against the specified shard and obtains the version of the
+ * running MongoD service.
+ *
+ * Returns the MongoD version in string format or an error. Known error codes are:
+ * ShardNotFound if shard by that id is not available on the registry
+ * NoSuchKey if the version could not be retrieved
+ */
+StatusWith<string> retrieveShardMongoDVersion(OperationContext* txn, ShardId shardId) {
+ auto shardRegistry = Grid::get(txn)->shardRegistry();
+ auto shardStatus = shardRegistry->getShard(txn, shardId);
+ if (!shardStatus.isOK()) {
+ return shardStatus.getStatus();
+ }
+ auto shard = shardStatus.getValue();
+
+ auto commandResponse =
+ shard->runCommandWithFixedRetryAttempts(txn,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ "admin",
+ BSON("serverStatus" << 1),
+ Shard::RetryPolicy::kIdempotent);
+ if (!commandResponse.isOK()) {
+ return commandResponse.getStatus();
+ }
+ if (!commandResponse.getValue().commandStatus.isOK()) {
+ return commandResponse.getValue().commandStatus;
+ }
+
+ BSONObj serverStatus = std::move(commandResponse.getValue().response);
+
+ string version;
+ Status status = bsonExtractStringField(serverStatus, kVersionField, &version);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ return version;
+}
+
+} // namespace
+
+using ShardStatistics = ClusterStatistics::ShardStatistics;
+
+ClusterStatisticsImpl::ClusterStatisticsImpl() = default;
+
+ClusterStatisticsImpl::~ClusterStatisticsImpl() = default;
+
+StatusWith<vector<ShardStatistics>> ClusterStatisticsImpl::getStats(OperationContext* txn) {
+ // Get a list of all the shards that are participating in this balance round along with any
+    // maximum allowed quotas and current utilization. The latter is obtained from each shard's
+    // reported total data size.
+ //
+ // TODO: skip unresponsive shards and mark information as stale.
+ auto shardsStatus = Grid::get(txn)->catalogClient(txn)->getAllShards(
+ txn, repl::ReadConcernLevel::kMajorityReadConcern);
+ if (!shardsStatus.isOK()) {
+ return shardsStatus.getStatus();
+ }
+
+ const vector<ShardType> shards(std::move(shardsStatus.getValue().value));
+
+ vector<ShardStatistics> stats;
+
+ for (const auto& shard : shards) {
+ auto shardSizeStatus = shardutil::retrieveTotalShardSize(txn, shard.getName());
+ if (!shardSizeStatus.isOK()) {
+ const Status& status = shardSizeStatus.getStatus();
+
+ return {status.code(),
+ str::stream() << "Unable to obtain shard utilization information for "
+ << shard.getName()
+ << " due to "
+ << status.reason()};
+ }
+
+ string mongoDVersion;
+
+ auto mongoDVersionStatus = retrieveShardMongoDVersion(txn, shard.getName());
+ if (mongoDVersionStatus.isOK()) {
+ mongoDVersion = std::move(mongoDVersionStatus.getValue());
+ } else {
+ // Since the mongod version is only used for reporting, there is no need to fail the
+ // entire round if it cannot be retrieved, so just leave it empty
+ log() << "Unable to obtain shard version for " << shard.getName()
+ << causedBy(mongoDVersionStatus.getStatus());
+ }
+
+ std::set<string> shardTags;
+
+ for (const auto& shardTag : shard.getTags()) {
+ shardTags.insert(shardTag);
+ }
+
+ stats.emplace_back(shard.getName(),
+ shard.getMaxSizeMB(),
+ shardSizeStatus.getValue() / 1024 / 1024,
+ shard.getDraining(),
+ std::move(shardTags),
+ std::move(mongoDVersion));
+ }
+
+ return stats;
+}
+
+} // namespace mongo
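
A sketch of how a caller might consume one of these snapshots (shardsWithCapacity is a
hypothetical helper, not part of this patch):

    #include <vector>

    #include "mongo/s/balancer/cluster_statistics.h"

    namespace mongo {

    // From one statistics snapshot, pick the shards which can still accept chunks
    std::vector<ShardId> shardsWithCapacity(OperationContext* txn, ClusterStatistics* stats) {
        std::vector<ShardId> result;

        auto swStats = stats->getStats(txn);
        if (!swStats.isOK()) {
            return result;  // A real caller would propagate swStats.getStatus() instead
        }

        for (const auto& shard : swStats.getValue()) {
            if (!shard.isDraining && !shard.isSizeMaxed()) {
                result.push_back(shard.shardId);
            }
        }

        return result;
    }

    }  // namespace mongo
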
diff --git a/src/mongo/s/balancer/cluster_statistics_impl.h b/src/mongo/s/balancer/cluster_statistics_impl.h
new file mode 100644
index 00000000000..493c792a713
--- /dev/null
+++ b/src/mongo/s/balancer/cluster_statistics_impl.h
@@ -0,0 +1,48 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/s/balancer/cluster_statistics.h"
+
+namespace mongo {
+
+/**
+ * Default implementation for the cluster statistics gathering utility. Uses a blocking method to
+ * fetch the statistics and does not perform any caching. If any shard fails to report its
+ * statistics, the entire refresh fails.
+ */
+class ClusterStatisticsImpl final : public ClusterStatistics {
+public:
+ ClusterStatisticsImpl();
+ ~ClusterStatisticsImpl();
+
+ StatusWith<std::vector<ShardStatistics>> getStats(OperationContext* txn) override;
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/balancer/cluster_statistics_test.cpp b/src/mongo/s/balancer/cluster_statistics_test.cpp
new file mode 100644
index 00000000000..c9115b6dafd
--- /dev/null
+++ b/src/mongo/s/balancer/cluster_statistics_test.cpp
@@ -0,0 +1,51 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/balancer/cluster_statistics.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+using ShardStatistics = ClusterStatistics::ShardStatistics;
+
+const auto emptyTagSet = std::set<std::string>();
+
+TEST(ShardStatistics, SizeMaxedTest) {
+ ASSERT(
+ !ShardStatistics(ShardId("TestShardId"), 0, 0, false, emptyTagSet, "3.2.0").isSizeMaxed());
+ ASSERT(!ShardStatistics(ShardId("TestShardId"), 100LL, 80LL, false, emptyTagSet, "3.2.0")
+ .isSizeMaxed());
+ ASSERT(ShardStatistics(ShardId("TestShardId"), 100LL, 110LL, false, emptyTagSet, "3.2.0")
+ .isSizeMaxed());
+}
+
+} // namespace
+} // namespace mongo
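
Given the isSizeMaxed implementation above, a maxSizeMB of zero disables the size check
entirely. A hypothetical additional case (not part of this patch) that would pin that
down alongside the test above:

    TEST(ShardStatistics, NoMaxSizeMeansNeverMaxed) {
        // maxSizeMB == 0 means no quota, so even a non-empty shard is not "maxed"
        ASSERT(!ShardStatistics(ShardId("TestShardId"), 0, 500LL, true, emptyTagSet, "3.2.0")
                    .isSizeMaxed());
    }
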
diff --git a/src/mongo/s/balancer/migration_manager.cpp b/src/mongo/s/balancer/migration_manager.cpp
new file mode 100644
index 00000000000..c5af288fb2f
--- /dev/null
+++ b/src/mongo/s/balancer/migration_manager.cpp
@@ -0,0 +1,791 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/balancer/migration_manager.h"
+
+#include <memory>
+
+#include "mongo/bson/simple_bsonobj_comparator.h"
+#include "mongo/bson/util/bson_extract.h"
+#include "mongo/client/remote_command_targeter.h"
+#include "mongo/db/client.h"
+#include "mongo/executor/task_executor_pool.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/balancer/scoped_migration_request.h"
+#include "mongo/s/balancer/type_migration.h"
+#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/move_chunk_request.h"
+#include "mongo/s/sharding_raii.h"
+#include "mongo/util/log.h"
+#include "mongo/util/net/hostandport.h"
+#include "mongo/util/scopeguard.h"
+
+namespace mongo {
+
+using executor::RemoteCommandRequest;
+using executor::RemoteCommandResponse;
+using std::shared_ptr;
+using std::vector;
+using str::stream;
+
+namespace {
+
+const char kChunkTooBig[] = "chunkTooBig";
+const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
+ WriteConcernOptions::SyncMode::UNSET,
+ Seconds(15));
+
+/**
+ * Parses the specified asynchronous command response and converts it to a status to use as the
+ * outcome of an asynchronous migration command. It is necessary for two reasons:
+ * - Preserve backwards compatibility with 3.2 and earlier, where the move chunk command, instead
+ *   of returning a ChunkTooBig status, includes an extra field in the response.
+ * - Convert CallbackCanceled errors into BalancerInterrupted for the cases where the migration
+ * manager is being stopped at replica set stepdown. This return code allows the mongos calling
+ * logic to retry the operation on a new primary.
+ */
+Status extractMigrationStatusFromRemoteCommandResponse(const RemoteCommandResponse& response,
+ bool isStopping) {
+ if (!response.isOK()) {
+ if (response.status == ErrorCodes::CallbackCanceled && isStopping) {
+ return {ErrorCodes::BalancerInterrupted,
+ "Migration interrupted because the balancer is stopping"};
+ }
+
+ return response.status;
+ }
+
+ Status commandStatus = getStatusFromCommandResult(response.data);
+
+ if (!commandStatus.isOK()) {
+ bool chunkTooBig = false;
+ bsonExtractBooleanFieldWithDefault(response.data, kChunkTooBig, false, &chunkTooBig);
+ if (chunkTooBig) {
+ commandStatus = {ErrorCodes::ChunkTooBig, commandStatus.reason()};
+ }
+ }
+
+ return commandStatus;
+}
+
+/**
+ * Blocking call to acquire the distributed collection lock for the specified namespace.
+ */
+StatusWith<DistLockHandle> acquireDistLock(OperationContext* txn,
+ const OID& lockSessionID,
+ const NamespaceString& nss) {
+ const std::string whyMessage(stream() << "Migrating chunk(s) in collection " << nss.ns());
+
+ auto statusWithDistLockHandle =
+ Grid::get(txn)->catalogClient(txn)->getDistLockManager()->lockWithSessionID(
+ txn, nss.ns(), whyMessage, lockSessionID, DistLockManager::kSingleLockAttemptTimeout);
+
+ if (!statusWithDistLockHandle.isOK()) {
+ // If we get LockBusy while trying to acquire the collection distributed lock, this implies
+ // that a concurrent collection operation is running either on a 3.2 shard or on mongos.
+ // Convert it to ConflictingOperationInProgress to better indicate the error.
+ //
+ // In addition, the code which re-schedules parallel migrations serially for 3.2 shard
+ // compatibility uses the LockBusy code as a hint to do the reschedule.
+ const ErrorCodes::Error code = (statusWithDistLockHandle == ErrorCodes::LockBusy
+ ? ErrorCodes::ConflictingOperationInProgress
+ : statusWithDistLockHandle.getStatus().code());
+
+ return {code,
+ stream() << "Could not acquire collection lock for " << nss.ns()
+ << " to migrate chunks, due to "
+ << statusWithDistLockHandle.getStatus().reason()};
+ }
+
+ return std::move(statusWithDistLockHandle.getValue());
+}
+
+/**
+ * Returns whether the specified status is an error caused by stepdown of the primary config node
+ * currently running the balancer.
+ */
+bool isErrorDueToBalancerStepdown(Status status) {
+ return (status == ErrorCodes::BalancerInterrupted ||
+ status == ErrorCodes::InterruptedAtShutdown ||
+ status == ErrorCodes::InterruptedDueToReplStateChange ||
+ ErrorCodes::isShutdownError(status.code()));
+}
+
+} // namespace
+
+MigrationManager::MigrationManager(ServiceContext* serviceContext)
+ : _serviceContext(serviceContext), _lockSessionID(OID::gen()) {}
+
+MigrationManager::~MigrationManager() {
+ // The migration manager must be completely quiesced at destruction time
+ invariant(_activeMigrationsWithoutDistLock.empty());
+}
+
+MigrationStatuses MigrationManager::executeMigrationsForAutoBalance(
+ OperationContext* txn,
+ const vector<MigrateInfo>& migrateInfos,
+ uint64_t maxChunkSizeBytes,
+ const MigrationSecondaryThrottleOptions& secondaryThrottle,
+ bool waitForDelete) {
+
+ MigrationStatuses migrationStatuses;
+
+ vector<MigrateInfo> rescheduledMigrations;
+
+ {
+ std::map<MigrationIdentifier, ScopedMigrationRequest> scopedMigrationRequests;
+ vector<std::pair<shared_ptr<Notification<Status>>, MigrateInfo>> responses;
+
+ for (const auto& migrateInfo : migrateInfos) {
+ // Write a document to the config.migrations collection, in case this migration must be
+ // recovered by the Balancer. Fail if the chunk is already moving.
+ auto statusWithScopedMigrationRequest =
+ ScopedMigrationRequest::writeMigration(txn, migrateInfo);
+ if (!statusWithScopedMigrationRequest.isOK()) {
+ migrationStatuses.emplace(migrateInfo.getName(),
+ std::move(statusWithScopedMigrationRequest.getStatus()));
+ continue;
+ }
+ scopedMigrationRequests.emplace(migrateInfo.getName(),
+ std::move(statusWithScopedMigrationRequest.getValue()));
+
+ responses.emplace_back(_schedule(txn,
+ migrateInfo,
+ false, // Config server takes the collection dist lock
+ maxChunkSizeBytes,
+ secondaryThrottle,
+ waitForDelete),
+ migrateInfo);
+ }
+
+        // Wait for all the scheduled migrations to complete and note the ones which failed with a
+        // LockBusy error code. These need to be re-executed serially, without the config server
+        // holding the distributed lock, for backwards compatibility with 3.2 shards.
+ for (auto& response : responses) {
+ auto notification = std::move(response.first);
+ auto migrateInfo = std::move(response.second);
+
+ Status responseStatus = notification->get();
+
+ if (responseStatus == ErrorCodes::LockBusy) {
+ rescheduledMigrations.emplace_back(std::move(migrateInfo));
+ } else {
+ if (isErrorDueToBalancerStepdown(responseStatus)) {
+ auto it = scopedMigrationRequests.find(migrateInfo.getName());
+ invariant(it != scopedMigrationRequests.end());
+ it->second.keepDocumentOnDestruct();
+ }
+
+ migrationStatuses.emplace(migrateInfo.getName(), std::move(responseStatus));
+ }
+ }
+ }
+
+ // Schedule all 3.2 compatibility migrations sequentially
+ for (const auto& migrateInfo : rescheduledMigrations) {
+ // Write a document to the config.migrations collection, in case this migration must be
+ // recovered by the Balancer. Fail if the chunk is already moving.
+ auto statusWithScopedMigrationRequest =
+ ScopedMigrationRequest::writeMigration(txn, migrateInfo);
+ if (!statusWithScopedMigrationRequest.isOK()) {
+ migrationStatuses.emplace(migrateInfo.getName(),
+ std::move(statusWithScopedMigrationRequest.getStatus()));
+ continue;
+ }
+
+ Status responseStatus = _schedule(txn,
+ migrateInfo,
+ true, // Shard takes the collection dist lock
+ maxChunkSizeBytes,
+ secondaryThrottle,
+ waitForDelete)
+ ->get();
+
+ if (isErrorDueToBalancerStepdown(responseStatus)) {
+ statusWithScopedMigrationRequest.getValue().keepDocumentOnDestruct();
+ }
+
+ migrationStatuses.emplace(migrateInfo.getName(), std::move(responseStatus));
+ }
+
+ invariant(migrationStatuses.size() == migrateInfos.size());
+
+ return migrationStatuses;
+}
+
+Status MigrationManager::executeManualMigration(
+ OperationContext* txn,
+ const MigrateInfo& migrateInfo,
+ uint64_t maxChunkSizeBytes,
+ const MigrationSecondaryThrottleOptions& secondaryThrottle,
+ bool waitForDelete) {
+ _waitForRecovery();
+
+ // Write a document to the config.migrations collection, in case this migration must be
+ // recovered by the Balancer. Fail if the chunk is already moving.
+ auto statusWithScopedMigrationRequest =
+ ScopedMigrationRequest::writeMigration(txn, migrateInfo);
+ if (!statusWithScopedMigrationRequest.isOK()) {
+ return statusWithScopedMigrationRequest.getStatus();
+ }
+
+ Status status = _schedule(txn,
+ migrateInfo,
+ false, // Config server takes the collection dist lock
+ maxChunkSizeBytes,
+ secondaryThrottle,
+ waitForDelete)
+ ->get();
+
+ auto scopedCMStatus = ScopedChunkManager::getExisting(txn, NamespaceString(migrateInfo.ns));
+ if (!scopedCMStatus.isOK()) {
+ return scopedCMStatus.getStatus();
+ }
+
+ auto scopedCM = std::move(scopedCMStatus.getValue());
+ ChunkManager* const cm = scopedCM.cm();
+
+ auto chunk = cm->findIntersectingChunkWithSimpleCollation(txn, migrateInfo.minKey);
+ invariant(chunk);
+
+    // The order of the checks below is important, because interrupted migration calls must be
+    // able to join possibly completed migrations, which may still be running their waitForDelete
+    // step.
+ if (isErrorDueToBalancerStepdown(status)) {
+ statusWithScopedMigrationRequest.getValue().keepDocumentOnDestruct();
+
+ // We want the mongos to get a retriable error, and not make its replica set monitor
+ // interpret something like InterruptedDueToReplStateChange as the config server when the
+ // error comes from the shard.
+ return {ErrorCodes::BalancerInterrupted, status.reason()};
+ } else if (chunk->getShardId() == migrateInfo.to) {
+        // Regardless of the status, if the chunk's current shard matches the destination, deem
+        // the move a success.
+ return Status::OK();
+ }
+
+ return status;
+}
+
+void MigrationManager::startRecoveryAndAcquireDistLocks(OperationContext* txn) {
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ invariant(_state == State::kStopped);
+ invariant(_migrationRecoveryMap.empty());
+ _state = State::kRecovering;
+ }
+
+ auto scopedGuard = MakeGuard([&] {
+ _migrationRecoveryMap.clear();
+ _abandonActiveMigrationsAndEnableManager(txn);
+ });
+
+ // Load the active migrations from the config.migrations collection.
+ auto statusWithMigrationsQueryResponse =
+ Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ txn,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ NamespaceString(MigrationType::ConfigNS),
+ BSONObj(),
+ BSONObj(),
+ boost::none);
+
+ if (!statusWithMigrationsQueryResponse.isOK()) {
+ warning() << "Unable to read config.migrations collection documents for balancer migration"
+ << " recovery. Abandoning balancer recovery."
+ << causedBy(redact(statusWithMigrationsQueryResponse.getStatus()));
+ return;
+ }
+
+ for (const BSONObj& migration : statusWithMigrationsQueryResponse.getValue().docs) {
+ auto statusWithMigrationType = MigrationType::fromBSON(migration);
+ if (!statusWithMigrationType.isOK()) {
+ // The format of this migration document is incorrect. The balancer holds a distlock for
+ // this migration, but without parsing the migration document we cannot identify which
+ // distlock must be released. So we must release all distlocks.
+ warning() << "Unable to parse config.migrations document '"
+ << redact(migration.toString())
+ << "' for balancer migration recovery. Abandoning balancer recovery."
+ << causedBy(redact(statusWithMigrationType.getStatus()));
+ return;
+ }
+ MigrateInfo migrateInfo = statusWithMigrationType.getValue().toMigrateInfo();
+
+ auto it = _migrationRecoveryMap.find(NamespaceString(migrateInfo.ns));
+ if (it == _migrationRecoveryMap.end()) {
+ std::list<MigrateInfo> list;
+ it = _migrationRecoveryMap.insert(std::make_pair(NamespaceString(migrateInfo.ns), list))
+ .first;
+
+ // Reacquire the matching distributed lock for this namespace.
+ const std::string whyMessage(stream() << "Migrating chunk(s) in collection "
+ << redact(migrateInfo.ns));
+ auto statusWithDistLockHandle =
+ Grid::get(txn)
+ ->catalogClient(txn)
+ ->getDistLockManager()
+ ->tryLockWithLocalWriteConcern(txn, migrateInfo.ns, whyMessage, _lockSessionID);
+ if (!statusWithDistLockHandle.isOK() &&
+ statusWithDistLockHandle.getStatus() != ErrorCodes::LockBusy) {
+ // LockBusy is alright because that should mean a 3.2 shard has it for the active
+ // migration.
+ warning() << "Failed to acquire distributed lock for collection '"
+ << redact(migrateInfo.ns)
+ << "' during balancer recovery of an active migration. Abandoning"
+ << " balancer recovery."
+ << causedBy(redact(statusWithDistLockHandle.getStatus()));
+ return;
+ }
+ }
+
+ it->second.push_back(std::move(migrateInfo));
+ }
+
+ scopedGuard.Dismiss();
+}
+
+void MigrationManager::finishRecovery(OperationContext* txn,
+ uint64_t maxChunkSizeBytes,
+ const MigrationSecondaryThrottleOptions& secondaryThrottle,
+ bool waitForDelete) {
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ if (_state == State::kStopping) {
+ _migrationRecoveryMap.clear();
+ return;
+ }
+
+        // If recovery was abandoned in startRecoveryAndAcquireDistLocks, there is nothing to do.
+ if (_state == State::kEnabled) {
+ invariant(_migrationRecoveryMap.empty());
+ return;
+ }
+
+ invariant(_state == State::kRecovering);
+ }
+
+ auto scopedGuard = MakeGuard([&] {
+ _migrationRecoveryMap.clear();
+ _abandonActiveMigrationsAndEnableManager(txn);
+ });
+
+ // Schedule recovered migrations.
+ vector<ScopedMigrationRequest> scopedMigrationRequests;
+ vector<shared_ptr<Notification<Status>>> responses;
+
+ for (auto& nssAndMigrateInfos : _migrationRecoveryMap) {
+ auto& nss = nssAndMigrateInfos.first;
+ auto& migrateInfos = nssAndMigrateInfos.second;
+ invariant(!migrateInfos.empty());
+
+ auto scopedCMStatus = ScopedChunkManager::getExisting(txn, nss);
+ if (!scopedCMStatus.isOK()) {
+ // This shouldn't happen because the collection was intact and sharded when the previous
+ // config primary was active and the dist locks have been held by the balancer
+ // throughout. Abort migration recovery.
+ warning() << "Unable to reload chunk metadata for collection '" << nss
+ << "' during balancer recovery. Abandoning recovery."
+ << causedBy(redact(scopedCMStatus.getStatus()));
+ return;
+ }
+
+ auto scopedCM = std::move(scopedCMStatus.getValue());
+ ChunkManager* const cm = scopedCM.cm();
+
+ int scheduledMigrations = 0;
+
+ while (!migrateInfos.empty()) {
+ const auto migrationInfo = std::move(migrateInfos.front());
+ migrateInfos.pop_front();
+
+ auto chunk = cm->findIntersectingChunkWithSimpleCollation(txn, migrationInfo.minKey);
+ invariant(chunk);
+
+ if (chunk->getShardId() != migrationInfo.from) {
+ // Chunk is no longer on the source shard specified by this migration. Erase the
+ // migration recovery document associated with it.
+ ScopedMigrationRequest::createForRecovery(txn, nss, migrationInfo.minKey);
+ continue;
+ }
+
+ scopedMigrationRequests.emplace_back(
+ ScopedMigrationRequest::createForRecovery(txn, nss, migrationInfo.minKey));
+
+ scheduledMigrations++;
+
+ responses.emplace_back(_schedule(txn,
+ migrationInfo,
+ false, // Config server takes the collection dist lock
+ maxChunkSizeBytes,
+ secondaryThrottle,
+ waitForDelete));
+ }
+
+ // If no migrations were scheduled for this namespace, free the dist lock
+ if (!scheduledMigrations) {
+ Grid::get(txn)->catalogClient(txn)->getDistLockManager()->unlock(
+ txn, _lockSessionID, nss.ns());
+ }
+ }
+
+ _migrationRecoveryMap.clear();
+ scopedGuard.Dismiss();
+
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ if (_state == State::kRecovering) {
+ _state = State::kEnabled;
+ _condVar.notify_all();
+ }
+ }
+
+ // Wait for each migration to finish, as usual.
+ for (auto& response : responses) {
+ response->get();
+ }
+}
+
+void MigrationManager::interruptAndDisableMigrations() {
+ executor::TaskExecutor* const executor =
+ Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor();
+
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ invariant(_state == State::kEnabled || _state == State::kRecovering);
+ _state = State::kStopping;
+
+ // Interrupt any active migrations with dist lock
+ for (auto& cmsEntry : _activeMigrationsWithDistLock) {
+ auto* cms = &cmsEntry.second;
+
+ for (auto& migration : cms->migrations) {
+ if (migration.callbackHandle) {
+ executor->cancel(*migration.callbackHandle);
+ }
+ }
+ }
+
+ // Interrupt any active migrations without dist lock
+ for (auto& migration : _activeMigrationsWithoutDistLock) {
+ if (migration.callbackHandle) {
+ executor->cancel(*migration.callbackHandle);
+ }
+ }
+
+ _checkDrained_inlock();
+}
+
+void MigrationManager::drainActiveMigrations() {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+
+ if (_state == State::kStopped)
+ return;
+ invariant(_state == State::kStopping);
+
+ _condVar.wait(lock, [this] {
+ return _activeMigrationsWithDistLock.empty() && _activeMigrationsWithoutDistLock.empty();
+ });
+
+ _state = State::kStopped;
+}
+
+shared_ptr<Notification<Status>> MigrationManager::_schedule(
+ OperationContext* txn,
+ const MigrateInfo& migrateInfo,
+ bool shardTakesCollectionDistLock,
+ uint64_t maxChunkSizeBytes,
+ const MigrationSecondaryThrottleOptions& secondaryThrottle,
+ bool waitForDelete) {
+ const NamespaceString nss(migrateInfo.ns);
+
+ // Ensure we are not stopped in order to avoid doing the extra work
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ if (_state != State::kEnabled && _state != State::kRecovering) {
+ return std::make_shared<Notification<Status>>(
+ Status(ErrorCodes::BalancerInterrupted,
+ "Migration cannot be executed because the balancer is not running"));
+ }
+ }
+
+ // Sanity checks that the chunk being migrated is actually valid. These will be repeated at the
+ // shard as well, but doing them here saves an extra network call, which might otherwise fail.
+ auto statusWithScopedChunkManager = ScopedChunkManager::getExisting(txn, nss);
+ if (!statusWithScopedChunkManager.isOK()) {
+ return std::make_shared<Notification<Status>>(
+ std::move(statusWithScopedChunkManager.getStatus()));
+ }
+
+ ChunkManager* const chunkManager = statusWithScopedChunkManager.getValue().cm();
+
+ auto chunk = chunkManager->findIntersectingChunkWithSimpleCollation(txn, migrateInfo.minKey);
+ invariant(chunk);
+
+ // If the chunk is not found exactly as requested, the caller must have stale data
+ if (SimpleBSONObjComparator::kInstance.evaluate(chunk->getMin() != migrateInfo.minKey) ||
+ SimpleBSONObjComparator::kInstance.evaluate(chunk->getMax() != migrateInfo.maxKey)) {
+ return std::make_shared<Notification<Status>>(Status(
+ ErrorCodes::IncompatibleShardingMetadata,
+ stream() << "Chunk " << ChunkRange(migrateInfo.minKey, migrateInfo.maxKey).toString()
+ << " does not exist."));
+ }
+
+ const auto fromShardStatus = Grid::get(txn)->shardRegistry()->getShard(txn, migrateInfo.from);
+ if (!fromShardStatus.isOK()) {
+ return std::make_shared<Notification<Status>>(std::move(fromShardStatus.getStatus()));
+ }
+
+ const auto fromShard = fromShardStatus.getValue();
+ auto fromHostStatus =
+ fromShard->getTargeter()->findHost(txn, ReadPreferenceSetting{ReadPreference::PrimaryOnly});
+ if (!fromHostStatus.isOK()) {
+ return std::make_shared<Notification<Status>>(std::move(fromHostStatus.getStatus()));
+ }
+
+ BSONObjBuilder builder;
+ MoveChunkRequest::appendAsCommand(
+ &builder,
+ nss,
+ chunkManager->getVersion(),
+ Grid::get(txn)->shardRegistry()->getConfigServerConnectionString(),
+ migrateInfo.from,
+ migrateInfo.to,
+ ChunkRange(migrateInfo.minKey, migrateInfo.maxKey),
+ chunk->getLastmod(),
+ maxChunkSizeBytes,
+ secondaryThrottle,
+ waitForDelete,
+ shardTakesCollectionDistLock);
+
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+ if (_state != State::kEnabled && _state != State::kRecovering) {
+ return std::make_shared<Notification<Status>>(
+ Status(ErrorCodes::BalancerInterrupted,
+ "Migration cannot be executed because the balancer is not running"));
+ }
+
+ Migration migration(nss, builder.obj());
+
+ auto retVal = migration.completionNotification;
+
+ if (shardTakesCollectionDistLock) {
+ _scheduleWithoutDistLock_inlock(txn, fromHostStatus.getValue(), std::move(migration));
+ } else {
+ _scheduleWithDistLock_inlock(txn, fromHostStatus.getValue(), std::move(migration));
+ }
+
+ return retVal;
+}
+
+void MigrationManager::_scheduleWithDistLock_inlock(OperationContext* txn,
+ const HostAndPort& targetHost,
+ Migration migration) {
+ executor::TaskExecutor* const executor = Grid::get(txn)->getExecutorPool()->getFixedExecutor();
+
+ const NamespaceString nss(migration.nss);
+
+ auto it = _activeMigrationsWithDistLock.find(nss);
+ if (it == _activeMigrationsWithDistLock.end()) {
+ // Acquire the collection distributed lock (blocking call)
+ auto distLockHandleStatus = acquireDistLock(txn, _lockSessionID, nss);
+ if (!distLockHandleStatus.isOK()) {
+ migration.completionNotification->set(distLockHandleStatus.getStatus());
+ return;
+ }
+
+ it = _activeMigrationsWithDistLock
+ .insert(std::make_pair(
+ nss, CollectionMigrationsState(std::move(distLockHandleStatus.getValue()))))
+ .first;
+ }
+
+ auto collectionMigrationState = &it->second;
+
+ // Add ourselves to the list of migrations on this collection
+ collectionMigrationState->migrations.push_front(std::move(migration));
+ auto itMigration = collectionMigrationState->migrations.begin();
+
+ const RemoteCommandRequest remoteRequest(
+ targetHost, NamespaceString::kAdminDb.toString(), itMigration->moveChunkCmdObj, txn);
+
+ StatusWith<executor::TaskExecutor::CallbackHandle> callbackHandleWithStatus =
+ executor->scheduleRemoteCommand(
+ remoteRequest,
+ [this, collectionMigrationState, itMigration](
+ const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
+ Client::initThread(getThreadName().c_str());
+ ON_BLOCK_EXIT([&] { Client::destroy(); });
+ auto txn = cc().makeOperationContext();
+
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ _completeWithDistLock_inlock(
+ txn.get(),
+ itMigration,
+ extractMigrationStatusFromRemoteCommandResponse(
+ args.response, _state != State::kEnabled && _state != State::kRecovering));
+ });
+
+ if (callbackHandleWithStatus.isOK()) {
+ itMigration->callbackHandle = std::move(callbackHandleWithStatus.getValue());
+ return;
+ }
+
+ _completeWithDistLock_inlock(txn, itMigration, std::move(callbackHandleWithStatus.getStatus()));
+}
+
+void MigrationManager::_completeWithDistLock_inlock(OperationContext* txn,
+ MigrationsList::iterator itMigration,
+ Status status) {
+ const NamespaceString nss(itMigration->nss);
+
+ // Make sure to signal the notification last, after the distributed lock is freed, so that we
+ // don't have the race condition where a subsequently scheduled migration finds the dist lock
+ // still acquired.
+ auto notificationToSignal = itMigration->completionNotification;
+
+ auto it = _activeMigrationsWithDistLock.find(nss);
+ invariant(it != _activeMigrationsWithDistLock.end());
+
+ auto collectionMigrationState = &it->second;
+ collectionMigrationState->migrations.erase(itMigration);
+
+ if (collectionMigrationState->migrations.empty()) {
+ Grid::get(txn)->catalogClient(txn)->getDistLockManager()->unlock(
+ txn, collectionMigrationState->distLockHandle, nss.ns());
+ _activeMigrationsWithDistLock.erase(it);
+ _checkDrained_inlock();
+ }
+
+ notificationToSignal->set(status);
+}
+
+void MigrationManager::_scheduleWithoutDistLock_inlock(OperationContext* txn,
+ const HostAndPort& targetHost,
+ Migration migration) {
+ executor::TaskExecutor* const executor = Grid::get(txn)->getExecutorPool()->getFixedExecutor();
+
+ _activeMigrationsWithoutDistLock.push_front(std::move(migration));
+ auto itMigration = _activeMigrationsWithoutDistLock.begin();
+
+ const RemoteCommandRequest remoteRequest(
+ targetHost, NamespaceString::kAdminDb.toString(), itMigration->moveChunkCmdObj, txn);
+
+ StatusWith<executor::TaskExecutor::CallbackHandle> callbackHandleWithStatus =
+ executor->scheduleRemoteCommand(
+ remoteRequest,
+ [this, itMigration](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
+ auto notificationToSignal = itMigration->completionNotification;
+
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+ _activeMigrationsWithoutDistLock.erase(itMigration);
+ _checkDrained_inlock();
+
+ notificationToSignal->set(extractMigrationStatusFromRemoteCommandResponse(
+ args.response, _state != State::kEnabled && _state != State::kRecovering));
+ });
+
+ if (callbackHandleWithStatus.isOK()) {
+ itMigration->callbackHandle = std::move(callbackHandleWithStatus.getValue());
+ return;
+ }
+
+ auto notificationToSignal = itMigration->completionNotification;
+
+ _activeMigrationsWithoutDistLock.erase(itMigration);
+ _checkDrained_inlock();
+
+ notificationToSignal->set(std::move(callbackHandleWithStatus.getStatus()));
+}
+
+void MigrationManager::_checkDrained_inlock() {
+ if (_state == State::kEnabled || _state == State::kRecovering) {
+ return;
+ }
+ invariant(_state == State::kStopping);
+
+ if (_activeMigrationsWithDistLock.empty() && _activeMigrationsWithoutDistLock.empty()) {
+ _condVar.notify_all();
+ }
+}
+
+void MigrationManager::_waitForRecovery() {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ _condVar.wait(lock, [this] { return _state != State::kRecovering; });
+}
+
+void MigrationManager::_abandonActiveMigrationsAndEnableManager(OperationContext* txn) {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ if (_state == State::kStopping) {
+ // The balancer was interrupted. Let the next balancer recover the state.
+ return;
+ }
+ invariant(_state == State::kRecovering);
+
+ auto catalogClient = Grid::get(txn)->catalogClient(txn);
+
+ // Unlock all balancer distlocks we aren't using anymore.
+ auto distLockManager = catalogClient->getDistLockManager();
+ distLockManager->unlockAll(txn, distLockManager->getProcessID());
+
+ // Clear the config.migrations collection so that those chunks can be scheduled for migration
+ // again.
+ catalogClient->removeConfigDocuments(
+ txn, MigrationType::ConfigNS, BSONObj(), kMajorityWriteConcern);
+
+ _state = State::kEnabled;
+ _condVar.notify_all();
+}
+
+MigrationManager::Migration::Migration(NamespaceString inNss, BSONObj inMoveChunkCmdObj)
+ : nss(std::move(inNss)),
+ moveChunkCmdObj(std::move(inMoveChunkCmdObj)),
+ completionNotification(std::make_shared<Notification<Status>>()) {}
+
+MigrationManager::Migration::~Migration() {
+ invariant(completionNotification);
+}
+
+MigrationManager::CollectionMigrationsState::CollectionMigrationsState(
+ DistLockHandle inDistLockHandle)
+ : distLockHandle(std::move(inDistLockHandle)) {}
+
+MigrationManager::CollectionMigrationsState::~CollectionMigrationsState() {
+ invariant(migrations.empty());
+}
+
+} // namespace mongo
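
The state machine above implies a strict call sequence at config server stepup and
stepdown. A sketch of that contract (onStepUp and onStepDown are hypothetical driver
functions; the real balancer code which performs these calls is outside this patch):

    #include "mongo/s/balancer/migration_manager.h"

    namespace mongo {

    void onStepUp(OperationContext* txn,
                  MigrationManager* mm,
                  uint64_t maxChunkSizeBytes,
                  const MigrationSecondaryThrottleOptions& secondaryThrottle) {
        mm->startRecoveryAndAcquireDistLocks(txn);  // kStopped -> kRecovering
        // Replays or abandons recovered migrations; ends in kEnabled
        mm->finishRecovery(txn, maxChunkSizeBytes, secondaryThrottle, false);
    }

    void onStepDown(MigrationManager* mm) {
        mm->interruptAndDisableMigrations();  // kEnabled/kRecovering -> kStopping
        mm->drainActiveMigrations();          // blocks until drained; -> kStopped
    }

    }  // namespace mongo
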
diff --git a/src/mongo/s/balancer/migration_manager.h b/src/mongo/s/balancer/migration_manager.h
new file mode 100644
index 00000000000..20585df7c23
--- /dev/null
+++ b/src/mongo/s/balancer/migration_manager.h
@@ -0,0 +1,294 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <list>
+#include <map>
+#include <vector>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/s/balancer/balancer_policy.h"
+#include "mongo/s/catalog/dist_lock_manager.h"
+#include "mongo/s/migration_secondary_throttle_options.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/unordered_map.h"
+#include "mongo/util/concurrency/notification.h"
+
+namespace mongo {
+
+class OperationContext;
+class ServiceContext;
+class Status;
+template <typename T>
+class StatusWith;
+
+// Uniquely identifies a migration, regardless of shard and version.
+typedef std::string MigrationIdentifier;
+typedef std::map<MigrationIdentifier, Status> MigrationStatuses;
+
+/**
+ * Manages and executes parallel migrations for the balancer.
+ *
+ * TODO: for v3.6, remove the code maintaining compatibility with v3.2 shards, which take the
+ * distlock themselves.
+ */
+class MigrationManager {
+ MONGO_DISALLOW_COPYING(MigrationManager);
+
+public:
+ MigrationManager(ServiceContext* serviceContext);
+ ~MigrationManager();
+
+ /**
+     * A blocking method that attempts to schedule all the migrations specified in
+     * "migrateInfos" and waits for them to complete. Takes the distributed lock for each
+ * collection with a chunk being migrated.
+ *
+     * If any of the migrations scheduled in parallel fails with a LockBusy error reported from
+     * the shard, it is retried serially without the distributed lock.
+ *
+ * Returns a map of migration Status objects to indicate the success/failure of each migration.
+ */
+ MigrationStatuses executeMigrationsForAutoBalance(
+ OperationContext* txn,
+ const std::vector<MigrateInfo>& migrateInfos,
+ uint64_t maxChunkSizeBytes,
+ const MigrationSecondaryThrottleOptions& secondaryThrottle,
+ bool waitForDelete);
+
+ /**
+ * A blocking method that attempts to schedule the migration specified in "migrateInfo" and
+ * waits for it to complete. Takes the distributed lock for the namespace which is being
+ * migrated.
+ *
+ * Returns the status of the migration.
+ */
+ Status executeManualMigration(OperationContext* txn,
+ const MigrateInfo& migrateInfo,
+ uint64_t maxChunkSizeBytes,
+ const MigrationSecondaryThrottleOptions& secondaryThrottle,
+ bool waitForDelete);
+
+ /**
+ * Non-blocking method that puts the migration manager in the kRecovering state, in which
+     * new migration requests will block until finishRecovery is called. It then performs local
+     * writes to reacquire the distributed locks for active migrations.
+ *
+ * The active migration recovery may fail and be abandoned, setting the state to kEnabled.
+ */
+ void startRecoveryAndAcquireDistLocks(OperationContext* txn);
+
+ /**
+     * Blocking method that must only be called after startRecoveryAndAcquireDistLocks. Recovers the
+ * state of the migration manager (if necessary and able) and puts it in the kEnabled state,
+ * where it will accept new migrations. Any migrations waiting on the recovery state will be
+ * unblocked once the state is kEnabled, and then this function waits for the recovered active
+ * migrations to finish before returning.
+ *
+ * The active migration recovery may fail and be abandoned, setting the state to kEnabled and
+ * unblocking any process waiting on the recovery state.
+ */
+ void finishRecovery(OperationContext* txn,
+ uint64_t maxChunkSizeBytes,
+ const MigrationSecondaryThrottleOptions& secondaryThrottle,
+ bool waitForDelete);
+
+ /**
+ * Non-blocking method that should never be called concurrently with finishRecovery. Puts the
+ * manager in a state where all subsequently scheduled migrations will immediately fail (without
+ * ever getting scheduled) and all active ones will be cancelled. It has no effect if the
+ * migration manager is already stopping or stopped.
+ */
+ void interruptAndDisableMigrations();
+
+ /**
+ * Blocking method that waits for any currently scheduled migrations to complete. Must be
+ * called after interruptAndDisableMigrations has been called in order to be able to re-enable
+ * migrations again.
+ */
+ void drainActiveMigrations();
+
+private:
+ // The current state of the migration manager
+ enum class State { // Allowed transitions:
+ kStopped, // kRecovering
+ kRecovering, // kEnabled, kStopping
+ kEnabled, // kStopping
+ kStopping, // kStopped
+ };
+
+ /**
+ * Tracks the execution state of a single migration.
+ */
+ struct Migration {
+ Migration(NamespaceString nss, BSONObj moveChunkCmdObj);
+ ~Migration();
+
+ // Namespace for which this migration applies
+ NamespaceString nss;
+
+ // Command object representing the migration
+ BSONObj moveChunkCmdObj;
+
+ // Callback handle for the migration network request. If the migration has not yet been sent
+ // on the network, this value is not set.
+ boost::optional<executor::TaskExecutor::CallbackHandle> callbackHandle;
+
+ // Notification, which will be signaled when the migration completes
+ std::shared_ptr<Notification<Status>> completionNotification;
+ };
+
+    // The type used to store the list of active migrations. std::list is chosen because its
+    // iterators are not invalidated when other entries are removed around them, which allows O(1)
+    // removal.
+ using MigrationsList = std::list<Migration>;
+
+ /**
+ * Contains the runtime state for a single collection. This class does not have concurrency
+ * control of its own and relies on the migration manager's mutex.
+ */
+ struct CollectionMigrationsState {
+ CollectionMigrationsState(DistLockHandle distLockHandle);
+ ~CollectionMigrationsState();
+
+ // Dist lock handle, which must be released at destruction time.
+ const DistLockHandle distLockHandle;
+
+ // Contains a set of migrations which are currently active for this namespace.
+ MigrationsList migrations;
+ };
+
+ using CollectionMigrationsStateMap =
+ stdx::unordered_map<NamespaceString, CollectionMigrationsState>;
+
+ /**
+ * Optionally takes the collection distributed lock and schedules a chunk migration with the
+ * specified parameters. May block for distributed lock acquisition. If dist lock acquisition is
+ * successful (or not done), schedules the migration request and returns a notification which
+ * can be used to obtain the outcome of the operation.
+ *
+ * The 'shardTakesCollectionDistLock' parameter controls whether the distributed lock is
+ * acquired by the migration manager or by the shard executing the migration request.
+ */
+ std::shared_ptr<Notification<Status>> _schedule(
+ OperationContext* txn,
+ const MigrateInfo& migrateInfo,
+ bool shardTakesCollectionDistLock,
+ uint64_t maxChunkSizeBytes,
+ const MigrationSecondaryThrottleOptions& secondaryThrottle,
+ bool waitForDelete);
+
+ /**
+ * Acquires the collection distributed lock for the specified namespace and if it succeeds,
+ * schedules the migration.
+ *
+ * The distributed lock is acquired before scheduling the first migration for the collection and
+ * is only released when all active migrations on the collection have finished.
+ */
+ void _scheduleWithDistLock_inlock(OperationContext* txn,
+ const HostAndPort& targetHost,
+ Migration migration);
+
+ /**
+ * Used internally for migrations scheduled with the distributed lock acquired by the config
+     * server. Called exactly once for each scheduled migration, it signals the completion of the
+     * migration pointed to by the passed iterator and, if this is the last migration for the
+     * collection, frees the collection distributed lock.
+ */
+ void _completeWithDistLock_inlock(OperationContext* txn,
+ MigrationsList::iterator itMigration,
+ Status status);
+
+ /**
+ * Immediately schedules the specified migration without attempting to acquire the collection
+ * distributed lock or checking that it is not being held.
+ *
+ * This method is only used for retrying migrations that have failed with LockBusy errors
+ * returned by the shard, which only happens with legacy 3.2 shards that take the collection
+ * distributed lock themselves.
+ */
+ void _scheduleWithoutDistLock_inlock(OperationContext* txn,
+ const HostAndPort& targetHost,
+ Migration migration);
+
+ /**
+ * If the state of the migration manager is kStopping, checks whether there are any outstanding
+ * scheduled requests and if there aren't any signals the class condition variable.
+ */
+ void _checkDrained_inlock();
+
+ /**
+ * Blocking call, which waits for the migration manager to leave the recovering state (if it is
+ * currently recovering).
+ */
+ void _waitForRecovery();
+
+ /**
+     * Should only be called from the startRecoveryAndAcquireDistLocks or finishRecovery functions
+     * when the migration manager is in either the kStopped or kRecovering state. Releases all the
+     * distributed locks that the balancer holds, clears the config.migrations collection, and
+     * changes the state of the migration manager to kEnabled. It then unblocks all processes
+     * waiting for the kEnabled state.
+ */
+ void _abandonActiveMigrationsAndEnableManager(OperationContext* txn);
+
+ // The service context under which this migration manager runs.
+ ServiceContext* const _serviceContext;
+
+ // Used as a constant session ID for all distributed locks that this MigrationManager holds.
+ // Currently required so that locks can be reacquired for the balancer in startRecovery and then
+ // overtaken in later operations.
+ OID _lockSessionID;
+
+ // Carries migration information over from startRecoveryAndAcquireDistLocks to finishRecovery.
+ // Should only be set in startRecoveryAndAcquireDistLocks and then accessed in finishRecovery.
+ stdx::unordered_map<NamespaceString, std::list<MigrateInfo>> _migrationRecoveryMap;
+
+ // Protects the class state below.
+ stdx::mutex _mutex;
+
+ // Always start the migration manager in a stopped state.
+ State _state{State::kStopped};
+
+ // Condition variable, which is waited on when the migration manager's state is changing and
+ // signaled when the state change is complete.
+ stdx::condition_variable _condVar;
+
+ // Holds information about each collection's distributed lock and active migrations via a
+ // CollectionMigrationsState object.
+ CollectionMigrationsStateMap _activeMigrationsWithDistLock;
+
+ // Holds information about migrations that have been scheduled without the collection
+ // distributed lock acquired (i.e., the shard is asked to acquire it).
+ MigrationsList _activeMigrationsWithoutDistLock;
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/balancer/migration_manager_test.cpp b/src/mongo/s/balancer/migration_manager_test.cpp
new file mode 100644
index 00000000000..64f1aa193ed
--- /dev/null
+++ b/src/mongo/s/balancer/migration_manager_test.cpp
@@ -0,0 +1,980 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/client/remote_command_targeter_mock.h"
+#include "mongo/db/client.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/write_concern_options.h"
+#include "mongo/s/balancer/migration_manager.h"
+#include "mongo/s/balancer/type_migration.h"
+#include "mongo/s/catalog/dist_lock_manager_mock.h"
+#include "mongo/s/catalog/sharding_catalog_client_impl.h"
+#include "mongo/s/catalog/type_collection.h"
+#include "mongo/s/catalog/type_database.h"
+#include "mongo/s/catalog/type_locks.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/config_server_test_fixture.h"
+#include "mongo/s/move_chunk_request.h"
+#include "mongo/stdx/memory.h"
+
+namespace mongo {
+namespace {
+
+using executor::RemoteCommandRequest;
+using executor::RemoteCommandResponse;
+using std::vector;
+using unittest::assertGet;
+
+const auto kShardId0 = ShardId("shard0");
+const auto kShardId1 = ShardId("shard1");
+const auto kShardId2 = ShardId("shard2");
+const auto kShardId3 = ShardId("shard3");
+
+const HostAndPort kShardHost0 = HostAndPort("TestHost0", 12345);
+const HostAndPort kShardHost1 = HostAndPort("TestHost1", 12346);
+const HostAndPort kShardHost2 = HostAndPort("TestHost2", 12347);
+const HostAndPort kShardHost3 = HostAndPort("TestHost3", 12348);
+
+const MigrationSecondaryThrottleOptions kDefaultSecondaryThrottle =
+ MigrationSecondaryThrottleOptions::create(MigrationSecondaryThrottleOptions::kDefault);
+
+const long long kMaxSizeMB = 100;
+const std::string kPattern = "_id";
+
+const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
+ WriteConcernOptions::SyncMode::UNSET,
+ Seconds(15));
+
+class MigrationManagerTest : public ConfigServerTestFixture {
+protected:
+ /**
+ * Returns the mock targeter for the specified shard. Useful like so:
+ *
+ * shardTargeterMock(txn, shardId)->setFindHostReturnValue(shardHost);
+ *
+ * Then calls to RemoteCommandTargeterMock::findHost will return HostAndPort "shardHost" for
+ * Shard "shardId".
+ *
+ * Scheduling a command requires a shard host target. The command will be caught by the mock
+ * network, but sending the command requires finding the shard's host.
+ */
+ std::shared_ptr<RemoteCommandTargeterMock> shardTargeterMock(OperationContext* txn,
+ ShardId shardId);
+
+ /**
+ * Inserts a document into the config.databases collection to indicate that "dbName" is sharded
+ * with primary "primaryShard".
+ */
+ void setUpDatabase(const std::string& dbName, const ShardId primaryShard);
+
+ /**
+ * Inserts a document into the config.collections collection to indicate that "collName" is
+ * sharded with version "version". The shard key pattern defaults to "_id".
+ */
+ void setUpCollection(const std::string collName, ChunkVersion version);
+
+ /**
+ * Inserts a document into the config.chunks collection so that the chunk defined by the
+ * parameters exists. Returns a ChunkType defined by the parameters.
+ */
+ ChunkType setUpChunk(const std::string& collName,
+ const BSONObj& chunkMin,
+ const BSONObj& chunkMax,
+ const ShardId& shardId,
+ const ChunkVersion& version);
+
+ /**
+ * Inserts a document into the config.migrations collection as an active migration.
+ */
+ void setUpMigration(const std::string& collName,
+ const BSONObj& minKey,
+ const BSONObj& maxKey,
+ const ShardId& toShard,
+ const ShardId& fromShard);
+
+ /**
+ * Asserts that config.migrations is empty and config.locks contains no locked documents, both
+ * of which should be true if the MigrationManager is inactive and behaving properly.
+ */
+ void checkMigrationsCollectionIsEmptyAndLocksAreUnlocked();
+
+ /**
+ * Sets up the mock network to expect a moveChunk command and to respond with the fixed BSON
+ * "response" or with a response built from "returnStatus".
+ */
+ void expectMoveChunkCommand(const ChunkType& chunk,
+ const ShardId& toShardId,
+ const bool& takeDistLock,
+ const BSONObj& response);
+ void expectMoveChunkCommand(const ChunkType& chunk,
+ const ShardId& toShardId,
+ const bool& takeDistLock,
+ const Status& returnStatus);
+
+ // Random static initialization order can result in X constructor running before Y constructor
+ // if X and Y are defined in different source files. These variables are defined here, as class
+ // members, to enforce initialization order.
+ const BSONObj kShard0 =
+ BSON(ShardType::name(kShardId0.toString()) << ShardType::host(kShardHost0.toString())
+ << ShardType::maxSizeMB(kMaxSizeMB));
+ const BSONObj kShard1 =
+ BSON(ShardType::name(kShardId1.toString()) << ShardType::host(kShardHost1.toString())
+ << ShardType::maxSizeMB(kMaxSizeMB));
+ const BSONObj kShard2 =
+ BSON(ShardType::name(kShardId2.toString()) << ShardType::host(kShardHost2.toString())
+ << ShardType::maxSizeMB(kMaxSizeMB));
+ const BSONObj kShard3 =
+ BSON(ShardType::name(kShardId3.toString()) << ShardType::host(kShardHost3.toString())
+ << ShardType::maxSizeMB(kMaxSizeMB));
+
+ const KeyPattern kKeyPattern = KeyPattern(BSON(kPattern << 1));
+
+ std::unique_ptr<MigrationManager> _migrationManager;
+
+private:
+ void setUp() override;
+ void tearDown() override;
+};
+
+void MigrationManagerTest::setUp() {
+ ConfigServerTestFixture::setUp();
+ _migrationManager = stdx::make_unique<MigrationManager>(getServiceContext());
+ _migrationManager->startRecoveryAndAcquireDistLocks(operationContext());
+ _migrationManager->finishRecovery(operationContext(), 0, kDefaultSecondaryThrottle, false);
+}
+
+void MigrationManagerTest::tearDown() {
+ checkMigrationsCollectionIsEmptyAndLocksAreUnlocked();
+ _migrationManager->interruptAndDisableMigrations();
+ _migrationManager->drainActiveMigrations();
+ _migrationManager.reset();
+ ConfigServerTestFixture::tearDown();
+}
+
+std::shared_ptr<RemoteCommandTargeterMock> MigrationManagerTest::shardTargeterMock(
+ OperationContext* txn, ShardId shardId) {
+ return RemoteCommandTargeterMock::get(
+ uassertStatusOK(shardRegistry()->getShard(txn, shardId))->getTargeter());
+}
+
+void MigrationManagerTest::setUpDatabase(const std::string& dbName, const ShardId primaryShard) {
+ DatabaseType db;
+ db.setName(dbName);
+ db.setPrimary(primaryShard);
+ db.setSharded(true);
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), DatabaseType::ConfigNS, db.toBSON(), kMajorityWriteConcern));
+}
+
+void MigrationManagerTest::setUpCollection(const std::string collName, ChunkVersion version) {
+ CollectionType coll;
+ coll.setNs(NamespaceString(collName));
+ coll.setEpoch(version.epoch());
+ coll.setUpdatedAt(Date_t::fromMillisSinceEpoch(version.toLong()));
+ coll.setKeyPattern(kKeyPattern);
+ coll.setUnique(false);
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), CollectionType::ConfigNS, coll.toBSON(), kMajorityWriteConcern));
+}
+
+ChunkType MigrationManagerTest::setUpChunk(const std::string& collName,
+ const BSONObj& chunkMin,
+ const BSONObj& chunkMax,
+ const ShardId& shardId,
+ const ChunkVersion& version) {
+ ChunkType chunk;
+ chunk.setNS(collName);
+ chunk.setMin(chunkMin);
+ chunk.setMax(chunkMax);
+ chunk.setShard(shardId);
+ chunk.setVersion(version);
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ChunkType::ConfigNS, chunk.toBSON(), kMajorityWriteConcern));
+ return chunk;
+}
+
+void MigrationManagerTest::setUpMigration(const std::string& collName,
+ const BSONObj& minKey,
+ const BSONObj& maxKey,
+ const ShardId& toShard,
+ const ShardId& fromShard) {
+ BSONObjBuilder builder;
+ builder.append(MigrationType::ns(), collName);
+ builder.append(MigrationType::min(), minKey);
+ builder.append(MigrationType::max(), maxKey);
+ builder.append(MigrationType::toShard(), toShard.toString());
+ builder.append(MigrationType::fromShard(), fromShard.toString());
+ MigrationType migrationType = assertGet(MigrationType::fromBSON(builder.obj()));
+ ASSERT_OK(catalogClient()->insertConfigDocument(operationContext(),
+ MigrationType::ConfigNS,
+ migrationType.toBSON(),
+ kMajorityWriteConcern));
+}
+
+void MigrationManagerTest::checkMigrationsCollectionIsEmptyAndLocksAreUnlocked() {
+ auto statusWithMigrationsQueryResponse =
+ shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ operationContext(),
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kMajorityReadConcern,
+ NamespaceString(MigrationType::ConfigNS),
+ BSONObj(),
+ BSONObj(),
+ boost::none);
+ Shard::QueryResponse migrationsQueryResponse =
+ uassertStatusOK(statusWithMigrationsQueryResponse);
+ ASSERT_EQUALS(0U, migrationsQueryResponse.docs.size());
+
+ auto statusWithLocksQueryResponse = shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ operationContext(),
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kMajorityReadConcern,
+ NamespaceString(LocksType::ConfigNS),
+ BSON(LocksType::state(LocksType::LOCKED)),
+ BSONObj(),
+ boost::none);
+ Shard::QueryResponse locksQueryResponse = uassertStatusOK(statusWithLocksQueryResponse);
+ ASSERT_EQUALS(0U, locksQueryResponse.docs.size());
+}
+
+void MigrationManagerTest::expectMoveChunkCommand(const ChunkType& chunk,
+ const ShardId& toShardId,
+ const bool& takeDistLock,
+ const BSONObj& response) {
+ onCommand([&chunk, &toShardId, &takeDistLock, &response](const RemoteCommandRequest& request) {
+ NamespaceString nss(request.cmdObj.firstElement().valueStringData());
+ ASSERT_EQ(chunk.getNS(), nss.ns());
+
+ const StatusWith<MoveChunkRequest> moveChunkRequestWithStatus =
+ MoveChunkRequest::createFromCommand(nss, request.cmdObj);
+ ASSERT_OK(moveChunkRequestWithStatus.getStatus());
+
+ ASSERT_EQ(chunk.getNS(), moveChunkRequestWithStatus.getValue().getNss().ns());
+ ASSERT_BSONOBJ_EQ(chunk.getMin(), moveChunkRequestWithStatus.getValue().getMinKey());
+ ASSERT_BSONOBJ_EQ(chunk.getMax(), moveChunkRequestWithStatus.getValue().getMaxKey());
+ ASSERT_EQ(chunk.getShard(), moveChunkRequestWithStatus.getValue().getFromShardId());
+
+ ASSERT_EQ(toShardId, moveChunkRequestWithStatus.getValue().getToShardId());
+ ASSERT_EQ(takeDistLock, moveChunkRequestWithStatus.getValue().getTakeDistLock());
+
+ return response;
+ });
+}
+
+void MigrationManagerTest::expectMoveChunkCommand(const ChunkType& chunk,
+ const ShardId& toShardId,
+ const bool& takeDistLock,
+ const Status& returnStatus) {
+ BSONObjBuilder resultBuilder;
+ Command::appendCommandStatus(resultBuilder, returnStatus);
+ expectMoveChunkCommand(chunk, toShardId, takeDistLock, resultBuilder.obj());
+}
+
+TEST_F(MigrationManagerTest, OneCollectionTwoMigrations) {
+ // Set up two shards in the metadata.
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern));
+
+ // Set up the database and collection as sharded in the metadata.
+ std::string dbName = "foo";
+ std::string collName = "foo.bar";
+ ChunkVersion version(2, 0, OID::gen());
+
+ setUpDatabase(dbName, kShardId0);
+ setUpCollection(collName, version);
+
+ // Set up two chunks in the metadata.
+ ChunkType chunk1 =
+ setUpChunk(collName, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version);
+ version.incMinor();
+ ChunkType chunk2 =
+ setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
+
+ // Going to request that these two chunks get migrated.
+ const std::vector<MigrateInfo> migrationRequests{{chunk1.getNS(), kShardId1, chunk1},
+ {chunk2.getNS(), kShardId3, chunk2}};
+
+ auto future = launchAsync([this, migrationRequests] {
+ Client::initThreadIfNotAlready("Test");
+ auto txn = cc().makeOperationContext();
+
+ // Scheduling the moveChunk commands requires finding a host to which to send the command.
+ // Set up dummy hosts for the source shards.
+ shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
+ shardTargeterMock(txn.get(), kShardId2)->setFindHostReturnValue(kShardHost2);
+
+ MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
+ txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
+
+ for (const auto& migrateInfo : migrationRequests) {
+ ASSERT_OK(migrationStatuses.at(migrateInfo.getName()));
+ }
+ });
+
+ // Expect two moveChunk commands.
+ expectMoveChunkCommand(chunk1, kShardId1, false, Status::OK());
+ expectMoveChunkCommand(chunk2, kShardId3, false, Status::OK());
+
+ // Run the MigrationManager code.
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(MigrationManagerTest, TwoCollectionsTwoMigrationsEach) {
+ // Set up two shards in the metadata.
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern));
+
+ // Set up a database and two collections as sharded in the metadata.
+ std::string dbName = "foo";
+ std::string collName1 = "foo.bar";
+ std::string collName2 = "foo.baz";
+ ChunkVersion version1(2, 0, OID::gen());
+ ChunkVersion version2(2, 0, OID::gen());
+
+ setUpDatabase(dbName, kShardId0);
+ setUpCollection(collName1, version1);
+ setUpCollection(collName2, version2);
+
+ // Set up two chunks in the metadata for each collection.
+ ChunkType chunk1coll1 =
+ setUpChunk(collName1, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version1);
+ version1.incMinor();
+ ChunkType chunk2coll1 =
+ setUpChunk(collName1, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version1);
+
+ ChunkType chunk1coll2 =
+ setUpChunk(collName2, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version2);
+ version2.incMinor();
+ ChunkType chunk2coll2 =
+ setUpChunk(collName2, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version2);
+
+ // Going to request that these four chunks get migrated.
+ const std::vector<MigrateInfo> migrationRequests{{chunk1coll1.getNS(), kShardId1, chunk1coll1},
+ {chunk2coll1.getNS(), kShardId3, chunk2coll1},
+ {chunk1coll2.getNS(), kShardId1, chunk1coll2},
+ {chunk2coll2.getNS(), kShardId3, chunk2coll2}};
+
+ auto future = launchAsync([this, migrationRequests] {
+ Client::initThreadIfNotAlready("Test");
+ auto txn = cc().makeOperationContext();
+
+ // Scheduling the moveChunk commands requires finding a host to which to send the command.
+ // Set up dummy hosts for the source shards.
+ shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
+ shardTargeterMock(txn.get(), kShardId2)->setFindHostReturnValue(kShardHost2);
+
+ MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
+ txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
+
+ for (const auto& migrateInfo : migrationRequests) {
+ ASSERT_OK(migrationStatuses.at(migrateInfo.getName()));
+ }
+ });
+
+ // Expect four moveChunk commands.
+ expectMoveChunkCommand(chunk1coll1, kShardId1, false, Status::OK());
+ expectMoveChunkCommand(chunk2coll1, kShardId3, false, Status::OK());
+ expectMoveChunkCommand(chunk1coll2, kShardId1, false, Status::OK());
+ expectMoveChunkCommand(chunk2coll2, kShardId3, false, Status::OK());
+
+ // Run the MigrationManager code.
+ future.timed_get(kFutureTimeout);
+}
+
+// Old v3.2 shards expect to take the distributed lock before executing a moveChunk command. The
+// MigrationManager takes the dist lock, so the first moveChunk command sent to an old shard fails
+// with LockBusy; the manager should then release the lock and retry the command, this time letting
+// the shard take the lock, which succeeds.
+TEST_F(MigrationManagerTest, SameCollectionOldShardMigration) {
+ // Set up two shards in the metadata.
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern));
+
+ // Set up the database and collection as sharded in the metadata.
+ std::string dbName = "foo";
+ std::string collName = "foo.bar";
+ ChunkVersion version(2, 0, OID::gen());
+
+ setUpDatabase(dbName, kShardId0);
+ setUpCollection(collName, version);
+
+ // Set up two chunks in the metadata.
+ ChunkType chunk1 =
+ setUpChunk(collName, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version);
+ version.incMinor();
+ ChunkType chunk2 =
+ setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
+
+ // Going to request that these two chunks get migrated.
+ const std::vector<MigrateInfo> migrationRequests{{chunk1.getNS(), kShardId1, chunk1},
+ {chunk2.getNS(), kShardId3, chunk2}};
+
+ auto future = launchAsync([this, migrationRequests] {
+ Client::initThreadIfNotAlready("Test");
+ auto txn = cc().makeOperationContext();
+
+ // Scheduling the moveChunk commands requires finding a host to which to send the command.
+ // Set up dummy hosts for the source shards.
+ shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
+ shardTargeterMock(txn.get(), kShardId2)->setFindHostReturnValue(kShardHost2);
+
+ MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
+ txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
+
+ for (const auto& migrateInfo : migrationRequests) {
+ ASSERT_OK(migrationStatuses.at(migrateInfo.getName()));
+ }
+ });
+
+ // Expect two moveChunk commands.
+ expectMoveChunkCommand(
+ chunk1,
+ kShardId1,
+ false,
+ Status(ErrorCodes::LockBusy, "SameCollectionOldShardMigration generated error."));
+ expectMoveChunkCommand(chunk2, kShardId3, false, Status::OK());
+ expectMoveChunkCommand(chunk1, kShardId1, true, Status::OK());
+
+ // Run the MigrationManager code.
+ future.timed_get(kFutureTimeout);
+}
+
+// Fail a migration if an old v3.2 shard fails to acquire the distributed lock more than once. The
+// first LockBusy error identifies the shard to the MigrationManager as an old shard; the second
+// indicates that the lock is held elsewhere and unavailable.
+TEST_F(MigrationManagerTest, SameOldShardFailsToAcquireDistributedLockTwice) {
+ // Set up a shard in the metadata.
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
+
+ // Set up the database and collection as sharded in the metadata.
+ std::string dbName = "foo";
+ std::string collName = "foo.bar";
+ ChunkVersion version(2, 0, OID::gen());
+
+ setUpDatabase(dbName, kShardId0);
+ setUpCollection(collName, version);
+
+ // Set up a chunk in the metadata.
+ ChunkType chunk1 =
+ setUpChunk(collName, kKeyPattern.globalMin(), kKeyPattern.globalMax(), kShardId0, version);
+
+ // Going to request that this chunk get migrated.
+ const std::vector<MigrateInfo> migrationRequests{{chunk1.getNS(), kShardId1, chunk1}};
+
+ auto future = launchAsync([this, migrationRequests] {
+ Client::initThreadIfNotAlready("Test");
+ auto txn = cc().makeOperationContext();
+
+ // Scheduling the moveChunk commands requires finding a host to which to send the command.
+ // Set up a dummy host for the source shard.
+ shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
+
+ MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
+ txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
+
+ for (const auto& migrateInfo : migrationRequests) {
+ ASSERT_EQ(ErrorCodes::LockBusy, migrationStatuses.at(migrateInfo.getName()));
+ }
+ });
+
+ // Expect two sequential moveChunk commands to the same shard, both of which fail with LockBusy.
+ expectMoveChunkCommand(
+ chunk1,
+ kShardId1,
+ false,
+        Status(ErrorCodes::LockBusy, "SameOldShardFailsToAcquireDistributedLockTwice generated error."));
+ expectMoveChunkCommand(
+ chunk1,
+ kShardId1,
+ true,
+        Status(ErrorCodes::LockBusy, "SameOldShardFailsToAcquireDistributedLockTwice generated error."));
+
+ // Run the MigrationManager code.
+ future.timed_get(kFutureTimeout);
+}
+
+// If a migration in a collection is scheduled with an old v3.2 shard, a second migration in the
+// same collection involving a different old v3.2 shard should also get rescheduled.
+TEST_F(MigrationManagerTest, SameCollectionTwoOldShardMigrations) {
+ // Set up two shards in the metadata.
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern));
+
+ // Set up the database and collection as sharded in the metadata.
+ std::string dbName = "foo";
+ std::string collName = "foo.bar";
+ ChunkVersion version(2, 0, OID::gen());
+
+ setUpDatabase(dbName, kShardId0);
+ setUpCollection(collName, version);
+
+ // Set up two chunks in the metadata.
+ ChunkType chunk1 =
+ setUpChunk(collName, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version);
+ version.incMinor();
+ ChunkType chunk2 =
+ setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
+
+ // Going to request that these two chunks get migrated.
+ const std::vector<MigrateInfo> migrationRequests{{chunk1.getNS(), kShardId1, chunk1},
+ {chunk2.getNS(), kShardId3, chunk2}};
+
+ auto future = launchAsync([this, migrationRequests] {
+ Client::initThreadIfNotAlready("Test");
+ auto txn = cc().makeOperationContext();
+
+ // Scheduling the moveChunk commands requires finding a host to which to send the command.
+ // Set up dummy hosts for the source shards.
+ shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
+ shardTargeterMock(txn.get(), kShardId2)->setFindHostReturnValue(kShardHost2);
+
+ MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
+ txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
+
+ for (const auto& migrateInfo : migrationRequests) {
+ ASSERT_OK(migrationStatuses.at(migrateInfo.getName()));
+ }
+ });
+
+ // Expect two failed moveChunk commands, then two successful moveChunk commands after the
+ // balancer releases the distributed lock.
+ expectMoveChunkCommand(
+ chunk1,
+ kShardId1,
+ false,
+        Status(ErrorCodes::LockBusy, "SameCollectionTwoOldShardMigrations generated error."));
+ expectMoveChunkCommand(
+ chunk2,
+ kShardId3,
+ false,
+        Status(ErrorCodes::LockBusy, "SameCollectionTwoOldShardMigrations generated error."));
+ expectMoveChunkCommand(chunk1, kShardId1, true, Status::OK());
+ expectMoveChunkCommand(chunk2, kShardId3, true, Status::OK());
+
+ // Run the MigrationManager code.
+ future.timed_get(kFutureTimeout);
+}
+
+// Takes the distributed lock for a collection so that the MigrationManager is unable to
+// schedule migrations for that collection.
+TEST_F(MigrationManagerTest, FailToAcquireDistributedLock) {
+ // Set up two shards in the metadata.
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern));
+
+ // Set up the database and collection as sharded in the metadata.
+ std::string dbName = "foo";
+ std::string collName = "foo.bar";
+ ChunkVersion version(2, 0, OID::gen());
+
+ setUpDatabase(dbName, kShardId0);
+ setUpCollection(collName, version);
+
+ // Set up two chunks in the metadata.
+ ChunkType chunk1 =
+ setUpChunk(collName, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version);
+ version.incMinor();
+ ChunkType chunk2 =
+ setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
+
+ // Going to request that these two chunks get migrated.
+ const std::vector<MigrateInfo> migrationRequests{{chunk1.getNS(), kShardId1, chunk1},
+ {chunk2.getNS(), kShardId3, chunk2}};
+
+ shardTargeterMock(operationContext(), kShardId0)->setFindHostReturnValue(kShardHost0);
+ shardTargeterMock(operationContext(), kShardId2)->setFindHostReturnValue(kShardHost2);
+
+ // Take the distributed lock for the collection before scheduling via the MigrationManager.
+ const std::string whyMessage("FailToAcquireDistributedLock unit-test taking distributed lock");
+ DistLockManager::ScopedDistLock distLockStatus = assertGet(
+ catalogClient()->getDistLockManager()->lock(operationContext(),
+ chunk1.getNS(),
+ whyMessage,
+ DistLockManager::kSingleLockAttemptTimeout));
+
+ MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
+ operationContext(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
+
+ for (const auto& migrateInfo : migrationRequests) {
+ ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress,
+ migrationStatuses.at(migrateInfo.getName()));
+ }
+}
+
+// The MigrationManager should fail the migration if a host is not found for the source shard.
+// Scheduling a moveChunk command requires finding a host to which to send the command.
+TEST_F(MigrationManagerTest, SourceShardNotFound) {
+ // Set up two shards in the metadata.
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern));
+
+ // Set up the database and collection as sharded in the metadata.
+ std::string dbName = "foo";
+ std::string collName = "foo.bar";
+ ChunkVersion version(2, 0, OID::gen());
+
+ setUpDatabase(dbName, kShardId0);
+ setUpCollection(collName, version);
+
+ // Set up two chunks in the metadata.
+ ChunkType chunk1 =
+ setUpChunk(collName, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version);
+ version.incMinor();
+ ChunkType chunk2 =
+ setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
+
+ // Going to request that these two chunks get migrated.
+ const std::vector<MigrateInfo> migrationRequests{{chunk1.getNS(), kShardId1, chunk1},
+ {chunk2.getNS(), kShardId3, chunk2}};
+
+ auto future = launchAsync([this, chunk1, chunk2, migrationRequests] {
+ Client::initThreadIfNotAlready("Test");
+ auto txn = cc().makeOperationContext();
+
+        // Scheduling a moveChunk command requires finding a host to which to send the command.
+        // Set up a dummy host for kShardId0, and return an error for kShardId2.
+ shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
+ shardTargeterMock(txn.get(), kShardId2)
+ ->setFindHostReturnValue(
+ Status(ErrorCodes::ReplicaSetNotFound, "SourceShardNotFound generated error."));
+
+ MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
+ txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
+
+ ASSERT_OK(migrationStatuses.at(chunk1.getName()));
+ ASSERT_EQ(ErrorCodes::ReplicaSetNotFound, migrationStatuses.at(chunk2.getName()));
+ });
+
+ // Expect only one moveChunk command to be called.
+ expectMoveChunkCommand(chunk1, kShardId1, false, Status::OK());
+
+ // Run the MigrationManager code.
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(MigrationManagerTest, JumboChunkResponseBackwardsCompatibility) {
+ // Set up one shard in the metadata.
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
+
+ // Set up the database and collection as sharded in the metadata.
+ std::string dbName = "foo";
+ std::string collName = "foo.bar";
+ ChunkVersion version(2, 0, OID::gen());
+
+ setUpDatabase(dbName, kShardId0);
+ setUpCollection(collName, version);
+
+ // Set up a single chunk in the metadata.
+ ChunkType chunk1 =
+ setUpChunk(collName, kKeyPattern.globalMin(), kKeyPattern.globalMax(), kShardId0, version);
+
+ // Going to request that this chunk gets migrated.
+ const std::vector<MigrateInfo> migrationRequests{{chunk1.getNS(), kShardId1, chunk1}};
+
+ auto future = launchAsync([this, chunk1, migrationRequests] {
+ Client::initThreadIfNotAlready("Test");
+ auto txn = cc().makeOperationContext();
+
+ // Scheduling a moveChunk command requires finding a host to which to send the command. Set
+ // up a dummy host for kShardHost0.
+ shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
+
+ MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
+ txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
+
+ ASSERT_EQ(ErrorCodes::ChunkTooBig, migrationStatuses.at(chunk1.getName()));
+ });
+
+ // Expect only one moveChunk command to be called.
+ expectMoveChunkCommand(chunk1, kShardId1, false, BSON("ok" << 0 << "chunkTooBig" << true));
+
+ // Run the MigrationManager code.
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(MigrationManagerTest, InterruptMigration) {
+ // Set up one shard in the metadata.
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
+
+ // Set up the database and collection as sharded in the metadata.
+ std::string dbName = "foo";
+ std::string collName = "foo.bar";
+ ChunkVersion version(2, 0, OID::gen());
+
+ setUpDatabase(dbName, kShardId0);
+ setUpCollection(collName, version);
+
+ // Set up a single chunk in the metadata.
+ ChunkType chunk =
+ setUpChunk(collName, kKeyPattern.globalMin(), kKeyPattern.globalMax(), kShardId0, version);
+
+ auto future = launchAsync([&] {
+ Client::initThreadIfNotAlready("Test");
+ auto txn = cc().makeOperationContext();
+
+ // Scheduling a moveChunk command requires finding a host to which to send the command. Set
+ // up a dummy host for kShardHost0.
+ shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
+
+ ASSERT_NOT_OK(_migrationManager->executeManualMigration(
+ txn.get(), {chunk.getNS(), kShardId1, chunk}, 0, kDefaultSecondaryThrottle, false));
+ });
+
+    // Wait until the moveChunk request gets sent, then pretend that it is stuck by never
+    // responding to the request.
+ network()->enterNetwork();
+ network()->blackHole(network()->getNextReadyRequest());
+ network()->exitNetwork();
+
+ // Now that the migration request is 'pending', try to cancel the migration manager. This should
+ // succeed.
+ _migrationManager->interruptAndDisableMigrations();
+
+    // Ensure that cancellations get processed.
+ network()->enterNetwork();
+ network()->runReadyNetworkOperations();
+ network()->exitNetwork();
+
+    // Ensure that the previously scheduled migration is cancelled.
+ future.timed_get(kFutureTimeout);
+
+    // Ensure that no new migrations can be scheduled.
+ ASSERT_NOT_OK(_migrationManager->executeManualMigration(operationContext(),
+ {chunk.getNS(), kShardId1, chunk},
+ 0,
+ kDefaultSecondaryThrottle,
+ false));
+
+ // Ensure that the migration manager is no longer handling any migrations.
+ _migrationManager->drainActiveMigrations();
+
+ // Check that the migration that was active when the migration manager was interrupted can be
+ // found in config.migrations (and thus would be recovered if a migration manager were to start
+ // up again).
+ auto statusWithMigrationsQueryResponse =
+ shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ operationContext(),
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kMajorityReadConcern,
+ NamespaceString(MigrationType::ConfigNS),
+ BSON(MigrationType::name(chunk.getName())),
+ BSONObj(),
+ boost::none);
+ Shard::QueryResponse migrationsQueryResponse =
+ uassertStatusOK(statusWithMigrationsQueryResponse);
+ ASSERT_EQUALS(1U, migrationsQueryResponse.docs.size());
+
+ ASSERT_OK(catalogClient()->removeConfigDocuments(operationContext(),
+ MigrationType::ConfigNS,
+ BSON(MigrationType::name(chunk.getName())),
+ kMajorityWriteConcern));
+
+    // Restore the migration manager to the started state, which tearDown expects.
+ _migrationManager->startRecoveryAndAcquireDistLocks(operationContext());
+ _migrationManager->finishRecovery(operationContext(), 0, kDefaultSecondaryThrottle, false);
+}
+
+TEST_F(MigrationManagerTest, RestartMigrationManager) {
+ // Set up one shard in the metadata.
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
+
+ // Set up the database and collection as sharded in the metadata.
+ std::string dbName = "foo";
+ std::string collName = "foo.bar";
+ ChunkVersion version(2, 0, OID::gen());
+
+ setUpDatabase(dbName, kShardId0);
+ setUpCollection(collName, version);
+
+ // Set up a single chunk in the metadata.
+ ChunkType chunk1 =
+ setUpChunk(collName, kKeyPattern.globalMin(), kKeyPattern.globalMax(), kShardId0, version);
+
+ // Go through the lifecycle of the migration manager
+ _migrationManager->interruptAndDisableMigrations();
+ _migrationManager->drainActiveMigrations();
+ _migrationManager->startRecoveryAndAcquireDistLocks(operationContext());
+ _migrationManager->finishRecovery(operationContext(), 0, kDefaultSecondaryThrottle, false);
+
+ auto future = launchAsync([&] {
+ Client::initThreadIfNotAlready("Test");
+ auto txn = cc().makeOperationContext();
+
+ // Scheduling a moveChunk command requires finding a host to which to send the command. Set
+ // up a dummy host for kShardHost0.
+ shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
+
+ ASSERT_OK(_migrationManager->executeManualMigration(
+ txn.get(), {chunk1.getNS(), kShardId1, chunk1}, 0, kDefaultSecondaryThrottle, false));
+ });
+
+ // Expect only one moveChunk command to be called.
+ expectMoveChunkCommand(chunk1, kShardId1, false, Status::OK());
+
+ // Run the MigrationManager code.
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(MigrationManagerTest, MigrationRecovery) {
+ // Set up two shards in the metadata.
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern));
+
+ // Set up the database and collection as sharded in the metadata.
+ std::string dbName = "foo";
+ std::string collName = "foo.bar";
+ ChunkVersion version(2, 0, OID::gen());
+
+ setUpDatabase(dbName, kShardId0);
+ setUpCollection(collName, version);
+
+ // Set up two chunks in the metadata.
+ ChunkType chunk1 =
+ setUpChunk(collName, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version);
+ version.incMinor();
+ ChunkType chunk2 =
+ setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
+
+ _migrationManager->interruptAndDisableMigrations();
+ _migrationManager->drainActiveMigrations();
+
+ // Set up two fake active migrations by writing documents to the config.migrations collection.
+ setUpMigration(collName,
+ chunk1.getMin(),
+ chunk1.getMax(),
+ kShardId1.toString(),
+ chunk1.getShard().toString());
+ setUpMigration(collName,
+ chunk2.getMin(),
+ chunk2.getMax(),
+ kShardId3.toString(),
+ chunk2.getShard().toString());
+
+ _migrationManager->startRecoveryAndAcquireDistLocks(operationContext());
+
+ auto future = launchAsync([this] {
+ Client::initThreadIfNotAlready("Test");
+ auto txn = cc().makeOperationContext();
+
+ // Scheduling the moveChunk commands requires finding hosts to which to send the commands.
+ // Set up dummy hosts for the source shards.
+ shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
+ shardTargeterMock(txn.get(), kShardId2)->setFindHostReturnValue(kShardHost2);
+
+ _migrationManager->finishRecovery(txn.get(), 0, kDefaultSecondaryThrottle, false);
+ });
+
+ // Expect two moveChunk commands.
+ expectMoveChunkCommand(chunk1, kShardId1, false, Status::OK());
+ expectMoveChunkCommand(chunk2, kShardId3, false, Status::OK());
+
+ // Run the MigrationManager code.
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(MigrationManagerTest, FailMigrationRecovery) {
+ // Set up two shards in the metadata.
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern));
+
+ // Set up the database and collection as sharded in the metadata.
+ std::string dbName = "foo";
+ std::string collName = "foo.bar";
+ ChunkVersion version(2, 0, OID::gen());
+
+ setUpDatabase(dbName, kShardId0);
+ setUpCollection(collName, version);
+
+ // Set up two chunks in the metadata.
+ ChunkType chunk1 =
+ setUpChunk(collName, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version);
+ version.incMinor();
+ ChunkType chunk2 =
+ setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
+
+ _migrationManager->interruptAndDisableMigrations();
+ _migrationManager->drainActiveMigrations();
+
+ // Set up a parsable fake active migration document in the config.migrations collection.
+ setUpMigration(collName,
+ chunk1.getMin(),
+ chunk1.getMax(),
+ kShardId1.toString(),
+ chunk1.getShard().toString());
+
+ // Set up a fake active migration document that will fail MigrationType parsing -- missing
+ // field.
+ BSONObjBuilder builder;
+ builder.append("_id", "testing");
+ // No MigrationType::ns() field!
+ builder.append(MigrationType::min(), chunk2.getMin());
+ builder.append(MigrationType::max(), chunk2.getMax());
+ builder.append(MigrationType::toShard(), kShardId3.toString());
+ builder.append(MigrationType::fromShard(), chunk2.getShard().toString());
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), MigrationType::ConfigNS, builder.obj(), kMajorityWriteConcern));
+
+    // Take the distributed lock for the collection; it should be released when recovery fails.
+    // Any dist lock held by the config server will be released via its processId, so the session
+    // ID used here doesn't matter.
+ ASSERT_OK(catalogClient()->getDistLockManager()->lockWithSessionID(
+ operationContext(),
+ collName,
+ "MigrationManagerTest",
+ OID::gen(),
+ DistLockManager::kSingleLockAttemptTimeout));
+
+ _migrationManager->startRecoveryAndAcquireDistLocks(operationContext());
+ _migrationManager->finishRecovery(operationContext(), 0, kDefaultSecondaryThrottle, false);
+
+ // MigrationManagerTest::tearDown checks that the config.migrations collection is empty and all
+ // distributed locks are unlocked.
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/s/balancer/scoped_migration_request.cpp b/src/mongo/s/balancer/scoped_migration_request.cpp
new file mode 100644
index 00000000000..5b02d57b1d8
--- /dev/null
+++ b/src/mongo/s/balancer/scoped_migration_request.cpp
@@ -0,0 +1,188 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/balancer/scoped_migration_request.h"
+
+#include "mongo/db/repl/replication_coordinator_global.h"
+#include "mongo/db/write_concern_options.h"
+#include "mongo/s/balancer/type_migration.h"
+#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/grid.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
+namespace {
+const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
+ WriteConcernOptions::SyncMode::UNSET,
+ Seconds(15));
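+
+// A DuplicateKey error from the insert can be transient: the conflicting config.migrations
+// document may be removed concurrently, in which case retrying the insert can succeed (see the
+// retry loop in writeMigration below).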
+const int kDuplicateKeyErrorMaxRetries = 2;
+}
+
+ScopedMigrationRequest::ScopedMigrationRequest(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& minKey)
+ : _txn(txn), _nss(nss), _minKey(minKey) {}
+
+ScopedMigrationRequest::~ScopedMigrationRequest() {
+ if (!_txn) {
+ // If the txn object was cleared, nothing should happen in the destructor.
+ return;
+ }
+
+ // Try to delete the entry in the config.migrations collection. If the command fails, that is
+ // okay.
+ BSONObj migrationDocumentIdentifier =
+ BSON(MigrationType::ns(_nss.ns()) << MigrationType::min(_minKey));
+ Status result = grid.catalogClient(_txn)->removeConfigDocuments(
+ _txn, MigrationType::ConfigNS, migrationDocumentIdentifier, kMajorityWriteConcern);
+
+ if (!result.isOK()) {
+ LOG(0) << "Failed to remove config.migrations document for migration '"
+ << migrationDocumentIdentifier.toString() << "'" << causedBy(redact(result));
+ }
+}
+
+ScopedMigrationRequest::ScopedMigrationRequest(ScopedMigrationRequest&& other) {
+ *this = std::move(other);
+ // Set txn to null so that the destructor will do nothing.
+ other._txn = nullptr;
+}
+
+ScopedMigrationRequest& ScopedMigrationRequest::operator=(ScopedMigrationRequest&& other) {
+ if (this != &other) {
+ _txn = other._txn;
+ _nss = other._nss;
+ _minKey = other._minKey;
+ // Set txn to null so that the destructor will do nothing.
+ other._txn = nullptr;
+ }
+
+ return *this;
+}
+
+StatusWith<ScopedMigrationRequest> ScopedMigrationRequest::writeMigration(
+ OperationContext* txn, const MigrateInfo& migrateInfo) {
+
+ // Try to write a unique migration document to config.migrations.
+ MigrationType migrationType(migrateInfo);
+ for (int retry = 0; retry < kDuplicateKeyErrorMaxRetries; ++retry) {
+ Status result = grid.catalogClient(txn)->insertConfigDocument(
+ txn, MigrationType::ConfigNS, migrationType.toBSON(), kMajorityWriteConcern);
+
+ if (result == ErrorCodes::DuplicateKey) {
+ // If the exact migration described by "migrateInfo" is active, return a scoped object
+ // for the request because this migration request will join the active one once
+ // scheduled.
+ auto statusWithMigrationQueryResult =
+ grid.shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ txn,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ NamespaceString(MigrationType::ConfigNS),
+ BSON(MigrationType::name(migrateInfo.getName())),
+ BSONObj(),
+ boost::none);
+ if (!statusWithMigrationQueryResult.isOK()) {
+ return {statusWithMigrationQueryResult.getStatus().code(),
+ str::stream()
+ << "Failed to verify whether conflicting migration is in "
+ << "progress for migration '"
+ << redact(migrateInfo.toString())
+ << "' while trying to query config.migrations."
+ << causedBy(redact(statusWithMigrationQueryResult.getStatus()))};
+ }
+ if (statusWithMigrationQueryResult.getValue().docs.empty()) {
+ // The document that caused the DuplicateKey error is no longer in the collection,
+ // so retrying the insert might succeed.
+ continue;
+ }
+ invariant(statusWithMigrationQueryResult.getValue().docs.size() == 1);
+
+ BSONObj activeMigrationBSON = statusWithMigrationQueryResult.getValue().docs.front();
+ auto statusWithActiveMigration = MigrationType::fromBSON(activeMigrationBSON);
+ if (!statusWithActiveMigration.isOK()) {
+ return {statusWithActiveMigration.getStatus().code(),
+ str::stream() << "Failed to verify whether conflicting migration is in "
+ << "progress for migration '"
+ << redact(migrateInfo.toString())
+ << "' while trying to parse active migration document '"
+ << redact(activeMigrationBSON.toString())
+ << "'."
+ << causedBy(redact(statusWithActiveMigration.getStatus()))};
+ }
+
+ MigrateInfo activeMigrateInfo = statusWithActiveMigration.getValue().toMigrateInfo();
+ if (activeMigrateInfo.to != migrateInfo.to ||
+ activeMigrateInfo.from != migrateInfo.from) {
+ log() << "Failed to write document '" << redact(migrateInfo.toString())
+ << "' to config.migrations because there is already an active migration for"
+ << " that chunk: '" << redact(activeMigrateInfo.toString()) << "'."
+ << causedBy(redact(result));
+ return result;
+ }
+
+ result = Status::OK();
+ }
+
+ // As long as there isn't a DuplicateKey error, the document may have been written, and it's
+ // safe (won't delete another migration's document) and necessary to try to clean up the
+ // document via the destructor.
+ ScopedMigrationRequest scopedMigrationRequest(
+ txn, NamespaceString(migrateInfo.ns), migrateInfo.minKey);
+
+ // If there was a write error, let the object go out of scope and clean up in the
+ // destructor.
+ if (!result.isOK()) {
+ return result;
+ }
+
+ return std::move(scopedMigrationRequest);
+ }
+
+ MONGO_UNREACHABLE;
+}
+
+ScopedMigrationRequest ScopedMigrationRequest::createForRecovery(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& minKey) {
+ return ScopedMigrationRequest(txn, nss, minKey);
+}
+
+void ScopedMigrationRequest::keepDocumentOnDestruct() {
+ _txn = nullptr;
+ LOG(1) << "Keeping config.migrations document with namespace '" << _nss << "' and minKey '"
+ << _minKey << "' for balancer recovery";
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/balancer/scoped_migration_request.h b/src/mongo/s/balancer/scoped_migration_request.h
new file mode 100644
index 00000000000..8595671dc4d
--- /dev/null
+++ b/src/mongo/s/balancer/scoped_migration_request.h
@@ -0,0 +1,106 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/status_with.h"
+#include "mongo/s/balancer/balancer_policy.h"
+#include "mongo/s/migration_secondary_throttle_options.h"
+
+namespace mongo {
+
+/**
+ * RAII class that handles writes to the config.migrations collection for a migration that comes
+ * through the balancer.
+ *
+ * A migration must have an entry in the config.migrations collection so that the Balancer can
+ * recover from a stepdown/crash. This entry must be created before a migration begins and then
+ * removed once the migration has finished.
+ *
+ * This class should only be used by the Balancer!
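+ *
+ * Typical use, sketched for illustration only ("txn" and "migrateInfo" are assumed to be in
+ * scope):
+ *
+ *   auto statusWithScopedRequest = ScopedMigrationRequest::writeMigration(txn, migrateInfo);
+ *   if (!statusWithScopedRequest.isOK()) {
+ *       return statusWithScopedRequest.getStatus();
+ *   }
+ *   ScopedMigrationRequest scopedRequest(std::move(statusWithScopedRequest.getValue()));
+ *   // ... execute the migration; on destruction the config.migrations entry is removed,
+ *   // unless keepDocumentOnDestruct() was called first.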
+ */
+class ScopedMigrationRequest {
+ MONGO_DISALLOW_COPYING(ScopedMigrationRequest);
+
+public:
+ /**
+ * Deletes this migration's entry in the config.migrations collection, using majority write
+ * concern. If there is a balancer stepdown/crash before the write propagates to a majority of
+ * servers, that is alright because the balancer recovery process will handle it.
+ *
+ * If keepDocumentOnDestruct() has been called, then no attempt to remove the document is made.
+ */
+ ~ScopedMigrationRequest();
+
+ ScopedMigrationRequest(ScopedMigrationRequest&& other);
+ ScopedMigrationRequest& operator=(ScopedMigrationRequest&& other);
+
+ /**
+     * Inserts a unique migration entry in the config.migrations collection. If the write is
+ * successful, a ScopedMigrationRequest object is returned; otherwise, the write error.
+ *
+ * The destructor will handle removing the document when it is no longer needed.
+ */
+ static StatusWith<ScopedMigrationRequest> writeMigration(OperationContext* txn,
+ const MigrateInfo& migrate);
+
+ /**
+ * Creates a ScopedMigrationRequest object without inserting a document into config.migrations.
+ * The destructor will handle removing the migration document when it is no longer needed.
+ *
+ * This should only be used on Balancer recovery when a config.migrations document already
+ * exists for the migration.
+ */
+ static ScopedMigrationRequest createForRecovery(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& minKey);
+
+ /**
+ * Clears the operation context so that the destructor will not remove the config.migrations
+ * document for the migration.
+ *
+ * This should only be used on the Balancer when it is interrupted and must leave entries in
+ * config.migrations so that ongoing migrations can be recovered later.
+ */
+ void keepDocumentOnDestruct();
+
+private:
+ ScopedMigrationRequest(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& minKey);
+
+ // Need an operation context with which to do a write in the destructor.
+ OperationContext* _txn;
+
+    // The ns and minKey are needed to identify the migration document when it is removed from
+ // config.migrations by the destructor.
+ NamespaceString _nss;
+ BSONObj _minKey;
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/balancer/scoped_migration_request_test.cpp b/src/mongo/s/balancer/scoped_migration_request_test.cpp
new file mode 100644
index 00000000000..48c0d501136
--- /dev/null
+++ b/src/mongo/s/balancer/scoped_migration_request_test.cpp
@@ -0,0 +1,213 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/balancer/scoped_migration_request.h"
+
+#include "mongo/s/balancer/type_migration.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/config_server_test_fixture.h"
+#include "mongo/s/migration_secondary_throttle_options.h"
+
+namespace mongo {
+namespace {
+
+using unittest::assertGet;
+
+const std::string kNs = "TestDB.TestColl";
+const BSONObj kMin = BSON("a" << 10);
+const BSONObj kMax = BSON("a" << 20);
+const ShardId kFromShard("shard0000");
+const ShardId kToShard("shard0001");
+const ShardId kDifferentToShard("shard0002");
+const std::string kName = "TestDB.TestColl-a_10";
+
+class ScopedMigrationRequestTest : public ConfigServerTestFixture {
+public:
+ /**
+ * Queries config.migrations for a document with name (_id) "chunkName" and asserts that the
+ * number of documents returned equals "expectedNumberOfDocuments".
+ */
+ void checkMigrationsCollectionForDocument(std::string chunkName,
+ const unsigned long expectedNumberOfDocuments);
+
+ /**
+ * Makes a ScopedMigrationRequest and checks that the migration was written to
+     * config.migrations. This exercises the ScopedMigrationRequest move constructor and move
+     * assignment operator.
+ */
+ ScopedMigrationRequest makeScopedMigrationRequest(const MigrateInfo& migrateInfo);
+};
+
+void ScopedMigrationRequestTest::checkMigrationsCollectionForDocument(
+ std::string chunkName, const unsigned long expectedNumberOfDocuments) {
+ auto response = shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ operationContext(),
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kMajorityReadConcern,
+ NamespaceString(MigrationType::ConfigNS),
+ BSON(MigrationType::name(chunkName)),
+ BSONObj(),
+ boost::none);
+ Shard::QueryResponse queryResponse = unittest::assertGet(response);
+ std::vector<BSONObj> docs = queryResponse.docs;
+ ASSERT_EQUALS(expectedNumberOfDocuments, docs.size());
+}
+
+ScopedMigrationRequest ScopedMigrationRequestTest::makeScopedMigrationRequest(
+ const MigrateInfo& migrateInfo) {
+ ScopedMigrationRequest scopedMigrationRequest =
+ assertGet(ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo));
+
+ checkMigrationsCollectionForDocument(migrateInfo.getName(), 1);
+
+ return scopedMigrationRequest;
+}
+
+MigrateInfo makeMigrateInfo() {
+ const ChunkVersion kChunkVersion{1, 2, OID::gen()};
+
+ BSONObjBuilder chunkBuilder;
+ chunkBuilder.append(ChunkType::name(), kName);
+ chunkBuilder.append(ChunkType::ns(), kNs);
+ chunkBuilder.append(ChunkType::min(), kMin);
+ chunkBuilder.append(ChunkType::max(), kMax);
+ kChunkVersion.appendForChunk(&chunkBuilder);
+ chunkBuilder.append(ChunkType::shard(), kFromShard.toString());
+
+ ChunkType chunkType = assertGet(ChunkType::fromBSON(chunkBuilder.obj()));
+ ASSERT_OK(chunkType.validate());
+
+ return MigrateInfo(kNs, kToShard, chunkType);
+}
+
+TEST_F(ScopedMigrationRequestTest, CreateScopedMigrationRequest) {
+ MigrateInfo migrateInfo = makeMigrateInfo();
+
+ {
+ ScopedMigrationRequest scopedMigrationRequest =
+ assertGet(ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo));
+
+ checkMigrationsCollectionForDocument(migrateInfo.getName(), 1);
+ }
+
+ checkMigrationsCollectionForDocument(migrateInfo.getName(), 0);
+}
+
+/**
+ * A document is created via a scoped object, but the document is not removed in the destructor
+ * because keepDocumentOnDestruct() is called beforehand. The scoped object is then recreated
+ * without writing to the migrations collection, and the document is removed on destruct.
+ *
+ * Simulates (mostly) Balancer recovery.
+ */
+TEST_F(ScopedMigrationRequestTest, CreateScopedMigrationRequestOnRecovery) {
+ MigrateInfo migrateInfo = makeMigrateInfo();
+
+ // Insert the document for the MigrationRequest and then prevent its removal in the destructor.
+ {
+ ScopedMigrationRequest scopedMigrationRequest =
+ assertGet(ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo));
+
+ checkMigrationsCollectionForDocument(migrateInfo.getName(), 1);
+
+ scopedMigrationRequest.keepDocumentOnDestruct();
+ }
+
+ checkMigrationsCollectionForDocument(migrateInfo.getName(), 1);
+
+    // Fail to write a migration document if a migration document already exists for that chunk
+    // but with a different destination shard (the migration request must have identical
+    // parameters).
+ {
+ MigrateInfo differentToShardMigrateInfo = migrateInfo;
+ differentToShardMigrateInfo.to = kDifferentToShard;
+
+ StatusWith<ScopedMigrationRequest> statusWithScopedMigrationRequest =
+ ScopedMigrationRequest::writeMigration(operationContext(), differentToShardMigrateInfo);
+
+ ASSERT_EQUALS(ErrorCodes::DuplicateKey, statusWithScopedMigrationRequest.getStatus());
+
+ checkMigrationsCollectionForDocument(migrateInfo.getName(), 1);
+ }
+
+ // Create a new scoped object without inserting a document, and check that the destructor
+ // still removes the document corresponding to the MigrationRequest.
+ {
+ ScopedMigrationRequest scopedMigrationRequest = ScopedMigrationRequest::createForRecovery(
+ operationContext(), NamespaceString(migrateInfo.ns), migrateInfo.minKey);
+
+ checkMigrationsCollectionForDocument(migrateInfo.getName(), 1);
+ }
+
+ checkMigrationsCollectionForDocument(migrateInfo.getName(), 0);
+}
+
+TEST_F(ScopedMigrationRequestTest, CreateMultipleScopedMigrationRequestsForIdenticalMigration) {
+ MigrateInfo migrateInfo = makeMigrateInfo();
+
+ {
+ // Create a ScopedMigrationRequest, which will do the config.migrations write.
+ ScopedMigrationRequest scopedMigrationRequest =
+ assertGet(ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo));
+
+ checkMigrationsCollectionForDocument(migrateInfo.getName(), 1);
+
+ {
+            // Should be able to create another scoped object if the request is identical.
+ ScopedMigrationRequest identicalScopedMigrationRequest =
+ assertGet(ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo));
+
+ checkMigrationsCollectionForDocument(migrateInfo.getName(), 1);
+ }
+
+ // If any scoped object goes out of scope, the migration should be over and the document
+ // removed.
+ checkMigrationsCollectionForDocument(migrateInfo.getName(), 0);
+ }
+
+ checkMigrationsCollectionForDocument(migrateInfo.getName(), 0);
+}
+
+TEST_F(ScopedMigrationRequestTest, MoveAndAssignmentConstructors) {
+ MigrateInfo migrateInfo = makeMigrateInfo();
+
+    // Test that when the move constructor and move assignment operator are used and the original
+    // variable goes out of scope, the original object's destructor does not remove the migration
+    // document.
+ {
+ ScopedMigrationRequest anotherScopedMigrationRequest =
+ makeScopedMigrationRequest(migrateInfo);
+
+ checkMigrationsCollectionForDocument(migrateInfo.getName(), 1);
+ }
+
+ checkMigrationsCollectionForDocument(migrateInfo.getName(), 0);
+}
+
+} // namespace
+} // namespace mongo
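The tests above all exercise the same RAII contract: the config.migrations document lives exactly as long as the ScopedMigrationRequest that wrote it, unless keepDocumentOnDestruct() is called first. Below is a minimal sketch of how a caller such as the Balancer might drive that contract; the surrounding function, the executeMigration() helper, and the steppingDown flag are illustrative, not part of this patch:

    #include "mongo/s/balancer/scoped_migration_request.h"

    // Sketch only: shows the intended lifetime of the config.migrations entry
    // relative to the ScopedMigrationRequest that guards it.
    Status runOneMigration(OperationContext* txn,
                           const MigrateInfo& migrateInfo,
                           bool steppingDown) {  // stand-in for a real interruption check
        // Persists a config.migrations entry; fails with DuplicateKey if a
        // different migration is already registered for this chunk.
        auto scopedRequestStatus = ScopedMigrationRequest::writeMigration(txn, migrateInfo);
        if (!scopedRequestStatus.isOK()) {
            return scopedRequestStatus.getStatus();
        }
        ScopedMigrationRequest scopedRequest = std::move(scopedRequestStatus.getValue());

        Status migrationStatus = executeMigration(txn, migrateInfo);  // hypothetical helper

        if (steppingDown) {
            // Leave the document behind so that, after failover, the new primary
            // can reattach to it with ScopedMigrationRequest::createForRecovery().
            scopedRequest.keepDocumentOnDestruct();
        }
        return migrationStatus;
        // Otherwise scopedRequest's destructor deletes the config.migrations entry here.
    }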
diff --git a/src/mongo/s/balancer/type_migration.cpp b/src/mongo/s/balancer/type_migration.cpp
new file mode 100644
index 00000000000..963c40b30e3
--- /dev/null
+++ b/src/mongo/s/balancer/type_migration.cpp
@@ -0,0 +1,122 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/balancer/type_migration.h"
+
+#include "mongo/bson/util/bson_extract.h"
+#include "mongo/s/catalog/type_chunk.h"
+
+namespace mongo {
+
+const std::string MigrationType::ConfigNS = "config.migrations";
+
+const BSONField<std::string> MigrationType::name("_id");
+const BSONField<std::string> MigrationType::ns("ns");
+const BSONField<BSONObj> MigrationType::min("min");
+const BSONField<BSONObj> MigrationType::max("max");
+const BSONField<std::string> MigrationType::fromShard("fromShard");
+const BSONField<std::string> MigrationType::toShard("toShard");
+
+MigrationType::MigrationType() = default;
+
+MigrationType::MigrationType(MigrateInfo info)
+ : _nss(NamespaceString(info.ns)),
+ _min(info.minKey),
+ _max(info.maxKey),
+ _fromShard(info.from),
+ _toShard(info.to) {}
+
+StatusWith<MigrationType> MigrationType::fromBSON(const BSONObj& source) {
+ MigrationType migrationType;
+
+ {
+ std::string migrationNS;
+ Status status = bsonExtractStringField(source, ns.name(), &migrationNS);
+ if (!status.isOK())
+ return status;
+ migrationType._nss = NamespaceString(migrationNS);
+ }
+
+ {
+ auto chunkRangeStatus = ChunkRange::fromBSON(source);
+ if (!chunkRangeStatus.isOK())
+ return chunkRangeStatus.getStatus();
+
+ const auto chunkRange = std::move(chunkRangeStatus.getValue());
+ migrationType._min = chunkRange.getMin().getOwned();
+ migrationType._max = chunkRange.getMax().getOwned();
+ }
+
+ {
+ std::string migrationToShard;
+ Status status = bsonExtractStringField(source, toShard.name(), &migrationToShard);
+ if (!status.isOK())
+ return status;
+ migrationType._toShard = migrationToShard;
+ }
+
+ {
+ std::string migrationFromShard;
+ Status status = bsonExtractStringField(source, fromShard.name(), &migrationFromShard);
+ if (!status.isOK())
+ return status;
+ migrationType._fromShard = migrationFromShard;
+ }
+
+ return migrationType;
+}
+
+BSONObj MigrationType::toBSON() const {
+ BSONObjBuilder builder;
+ if (_nss && _min)
+ builder.append(name.name(), getName());
+ if (_nss)
+ builder.append(ns.name(), _nss->ns());
+ if (_min)
+ builder.append(min.name(), _min.get());
+ if (_max)
+ builder.append(max.name(), _max.get());
+ if (_fromShard)
+ builder.append(fromShard.name(), _fromShard->toString());
+ if (_toShard)
+ builder.append(toShard.name(), _toShard->toString());
+
+ return builder.obj();
+}
+
+MigrateInfo MigrationType::toMigrateInfo() const {
+ return MigrateInfo(_nss->ns(), _toShard.get(), _fromShard.get(), _min.get(), _max.get());
+}
+
+std::string MigrationType::getName() const {
+ return ChunkType::genID(_nss->ns(), _min.get());
+}
+
+} // namespace mongo
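For reference, the parser and serializer defined above round-trip losslessly. Here is a minimal sketch using the same field values as the tests below; the wrapper function is illustrative:

    #include "mongo/s/balancer/type_migration.h"

    void roundTripExample() {
        BSONObj doc = BSON("_id"
                           << "TestDB.TestColl-a_10"
                           << "ns"
                           << "TestDB.TestColl"
                           << "min" << BSON("a" << 10) << "max" << BSON("a" << 20)
                           << "fromShard"
                           << "shard0000"
                           << "toShard"
                           << "shard0001");

        // fromBSON() reads ns, min/max (via ChunkRange::fromBSON), fromShard and
        // toShard; _id is not parsed because getName() re-derives it from ns + min.
        MigrationType migration = uassertStatusOK(MigrationType::fromBSON(doc));

        invariant(migration.getName() == "TestDB.TestColl-a_10");
        invariant(migration.toBSON().woCompare(doc) == 0);  // same fields, same order
    }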
diff --git a/src/mongo/s/balancer/type_migration.h b/src/mongo/s/balancer/type_migration.h
new file mode 100644
index 00000000000..5f2948e9dfe
--- /dev/null
+++ b/src/mongo/s/balancer/type_migration.h
@@ -0,0 +1,96 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/s/balancer/balancer_policy.h"
+#include "mongo/s/chunk_version.h"
+#include "mongo/s/client/shard.h"
+
+namespace mongo {
+
+/**
+ * This class represents the layout and contents of documents contained in the config.migrations
+ * collection. All manipulation of documents coming from that collection should be done with this
+ * class.
+ */
+class MigrationType {
+public:
+ // Name of the migrations collection in the config server.
+ static const std::string ConfigNS;
+
+ // Field names and types in the migrations collection type.
+ static const BSONField<std::string> name;
+ static const BSONField<std::string> ns;
+ static const BSONField<BSONObj> min;
+ static const BSONField<BSONObj> max;
+ static const BSONField<std::string> fromShard;
+ static const BSONField<std::string> toShard;
+
+ /**
+     * The Balancer encapsulates migration information in MigrateInfo objects, so this constructor
+     * facilitates conversion from a MigrateInfo to the config.migrations entry format.
+ */
+ explicit MigrationType(MigrateInfo info);
+
+ /**
+ * Constructs a new MigrationType object from BSON. Expects all fields to be present, and errors
+ * if they are not.
+ */
+ static StatusWith<MigrationType> fromBSON(const BSONObj& source);
+
+ /**
+ * Returns the BSON representation of the config.migrations document entry.
+ */
+ BSONObj toBSON() const;
+
+ /**
+     * Converts this MigrationType back to the MigrateInfo representation, which the Balancer
+     * uses to schedule migrations.
+ */
+ MigrateInfo toMigrateInfo() const;
+
+ /**
+     * Returns a name that uniquely identifies the migrating chunk, generated from the collection
+     * namespace and the chunk's min key.
+ */
+ std::string getName() const;
+
+private:
+ MigrationType();
+
+ // Required fields for config.migrations.
+ boost::optional<NamespaceString> _nss;
+ boost::optional<BSONObj> _min;
+ boost::optional<BSONObj> _max;
+ boost::optional<ShardId> _fromShard;
+ boost::optional<ShardId> _toShard;
+};
+
+} // namespace mongo
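Taken together with ScopedMigrationRequest, the interface above implies the Balancer's recovery flow: read each config.migrations document, parse it with MigrationType::fromBSON(), and hand the resulting MigrateInfo back to the scheduler. A sketch, with the config query elided and the function name illustrative:

    #include <vector>

    #include "mongo/s/balancer/type_migration.h"

    // Sketch: turning persisted config.migrations documents back into
    // MigrateInfo objects during Balancer recovery.
    std::vector<MigrateInfo> parseRecoveredMigrations(const std::vector<BSONObj>& docs) {
        std::vector<MigrateInfo> migrations;
        for (const BSONObj& doc : docs) {
            auto migrationStatus = MigrationType::fromBSON(doc);
            if (!migrationStatus.isOK()) {
                // Skipping malformed documents keeps the sketch short; real
                // recovery code would need an explicit policy here.
                continue;
            }
            migrations.push_back(migrationStatus.getValue().toMigrateInfo());
        }
        return migrations;
    }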
diff --git a/src/mongo/s/balancer/type_migration_test.cpp b/src/mongo/s/balancer/type_migration_test.cpp
new file mode 100644
index 00000000000..d3a352301aa
--- /dev/null
+++ b/src/mongo/s/balancer/type_migration_test.cpp
@@ -0,0 +1,165 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/jsobj.h"
+#include "mongo/s/balancer/type_migration.h"
+#include "mongo/s/catalog/type_chunk.h"
+
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+using unittest::assertGet;
+
+const std::string kName = "TestDB.TestColl-a_10";
+const std::string kNs = "TestDB.TestColl";
+const BSONObj kMin = BSON("a" << 10);
+const BSONObj kMax = BSON("a" << 20);
+const ShardId kFromShard("shard0000");
+const ShardId kToShard("shard0001");
+
+TEST(MigrationTypeTest, ConvertFromMigrationInfo) {
+ const ChunkVersion version(1, 2, OID::gen());
+
+ BSONObjBuilder chunkBuilder;
+ chunkBuilder.append(ChunkType::name(), kName);
+ chunkBuilder.append(ChunkType::ns(), kNs);
+ chunkBuilder.append(ChunkType::min(), kMin);
+ chunkBuilder.append(ChunkType::max(), kMax);
+ version.appendForChunk(&chunkBuilder);
+ chunkBuilder.append(ChunkType::shard(), kFromShard.toString());
+
+ ChunkType chunkType = assertGet(ChunkType::fromBSON(chunkBuilder.obj()));
+ ASSERT_OK(chunkType.validate());
+
+ MigrateInfo migrateInfo(kNs, kToShard, chunkType);
+ MigrationType migrationType(migrateInfo);
+
+ BSONObjBuilder builder;
+ builder.append(MigrationType::name(), kName);
+ builder.append(MigrationType::ns(), kNs);
+ builder.append(MigrationType::min(), kMin);
+ builder.append(MigrationType::max(), kMax);
+ builder.append(MigrationType::fromShard(), kFromShard.toString());
+ builder.append(MigrationType::toShard(), kToShard.toString());
+
+ BSONObj obj = builder.obj();
+
+ ASSERT_BSONOBJ_EQ(obj, migrationType.toBSON());
+}
+
+TEST(MigrationTypeTest, FromAndToBSON) {
+ BSONObjBuilder builder;
+ builder.append(MigrationType::name(), kName);
+ builder.append(MigrationType::ns(), kNs);
+ builder.append(MigrationType::min(), kMin);
+ builder.append(MigrationType::max(), kMax);
+ builder.append(MigrationType::fromShard(), kFromShard.toString());
+ builder.append(MigrationType::toShard(), kToShard.toString());
+
+ BSONObj obj = builder.obj();
+
+ MigrationType migrationType = assertGet(MigrationType::fromBSON(obj));
+ ASSERT_BSONOBJ_EQ(obj, migrationType.toBSON());
+}
+
+TEST(MigrationTypeTest, MissingRequiredNamespaceField) {
+ BSONObjBuilder builder;
+ builder.append(MigrationType::min(), kMin);
+ builder.append(MigrationType::max(), kMax);
+ builder.append(MigrationType::fromShard(), kFromShard.toString());
+ builder.append(MigrationType::toShard(), kToShard.toString());
+
+ BSONObj obj = builder.obj();
+
+ StatusWith<MigrationType> migrationType = MigrationType::fromBSON(obj);
+ ASSERT_EQUALS(migrationType.getStatus(), ErrorCodes::NoSuchKey);
+ ASSERT_STRING_CONTAINS(migrationType.getStatus().reason(), MigrationType::ns.name());
+}
+
+TEST(MigrationTypeTest, MissingRequiredMinField) {
+ BSONObjBuilder builder;
+ builder.append(MigrationType::ns(), kNs);
+ builder.append(MigrationType::max(), kMax);
+ builder.append(MigrationType::fromShard(), kFromShard.toString());
+ builder.append(MigrationType::toShard(), kToShard.toString());
+
+ BSONObj obj = builder.obj();
+
+ StatusWith<MigrationType> migrationType = MigrationType::fromBSON(obj);
+ ASSERT_EQUALS(migrationType.getStatus(), ErrorCodes::NoSuchKey);
+ ASSERT_STRING_CONTAINS(migrationType.getStatus().reason(), MigrationType::min.name());
+}
+
+TEST(MigrationTypeTest, MissingRequiredMaxField) {
+ BSONObjBuilder builder;
+ builder.append(MigrationType::ns(), kNs);
+ builder.append(MigrationType::min(), kMin);
+ builder.append(MigrationType::fromShard(), kFromShard.toString());
+ builder.append(MigrationType::toShard(), kToShard.toString());
+
+ BSONObj obj = builder.obj();
+
+ StatusWith<MigrationType> migrationType = MigrationType::fromBSON(obj);
+ ASSERT_EQUALS(migrationType.getStatus(), ErrorCodes::NoSuchKey);
+ ASSERT_STRING_CONTAINS(migrationType.getStatus().reason(), MigrationType::max.name());
+}
+
+TEST(MigrationTypeTest, MissingRequiredFromShardField) {
+ BSONObjBuilder builder;
+ builder.append(MigrationType::ns(), kNs);
+ builder.append(MigrationType::min(), kMin);
+ builder.append(MigrationType::max(), kMax);
+ builder.append(MigrationType::toShard(), kToShard.toString());
+
+ BSONObj obj = builder.obj();
+
+ StatusWith<MigrationType> migrationType = MigrationType::fromBSON(obj);
+ ASSERT_EQUALS(migrationType.getStatus(), ErrorCodes::NoSuchKey);
+ ASSERT_STRING_CONTAINS(migrationType.getStatus().reason(), MigrationType::fromShard.name());
+}
+
+TEST(MigrationTypeTest, MissingRequiredToShardField) {
+ BSONObjBuilder builder;
+ builder.append(MigrationType::ns(), kNs);
+ builder.append(MigrationType::min(), kMin);
+ builder.append(MigrationType::max(), kMax);
+ builder.append(MigrationType::fromShard(), kFromShard.toString());
+
+ BSONObj obj = builder.obj();
+
+ StatusWith<MigrationType> migrationType = MigrationType::fromBSON(obj);
+ ASSERT_EQUALS(migrationType.getStatus(), ErrorCodes::NoSuchKey);
+ ASSERT_STRING_CONTAINS(migrationType.getStatus().reason(), MigrationType::toShard.name());
+}
+
+} // namespace
+} // namespace mongo
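The missing-field tests above depend on bsonExtractStringField() failing with ErrorCodes::NoSuchKey and naming the absent field in the error reason; a standalone illustration of that behavior (the function name is illustrative):

    #include "mongo/bson/util/bson_extract.h"

    void missingFieldExample() {
        std::string value;
        Status status = bsonExtractStringField(BSON("min" << BSON("a" << 10)), "ns", &value);
        invariant(status == ErrorCodes::NoSuchKey);
        // The reason text mentions "ns", which is what ASSERT_STRING_CONTAINS
        // verifies in the tests above.
    }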
diff --git a/src/mongo/s/catalog/replset_dist_lock_manager_test.cpp b/src/mongo/s/catalog/replset_dist_lock_manager_test.cpp
index 4862d1eeae9..02c9c8f6b1f 100644
--- a/src/mongo/s/catalog/replset_dist_lock_manager_test.cpp
+++ b/src/mongo/s/catalog/replset_dist_lock_manager_test.cpp
@@ -47,7 +47,7 @@
#include "mongo/db/service_context_noop.h"
#include "mongo/executor/task_executor.h"
#include "mongo/executor/task_executor_pool.h"
-#include "mongo/s/balancer_configuration.h"
+#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/catalog/dist_lock_catalog_mock.h"
#include "mongo/s/catalog/replset_dist_lock_manager.h"
#include "mongo/s/catalog/sharding_catalog_client_mock.h"
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/sharding_catalog_manager_impl.cpp
index 025b573e601..cc4220e2e96 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager_impl.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_manager_impl.cpp
@@ -50,14 +50,14 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/repl_client_info.h"
-#include "mongo/db/s/balancer/balancer_policy.h"
-#include "mongo/db/s/balancer/type_migration.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/s/type_shard_identity.h"
#include "mongo/db/wire_version.h"
#include "mongo/executor/network_interface.h"
#include "mongo/executor/task_executor.h"
#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/balancer/balancer_policy.h"
+#include "mongo/s/balancer/type_migration.h"
#include "mongo/s/catalog/config_server_version.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog/type_collection.h"
diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp
index 2b091b89e42..bcdd2dd09c7 100644
--- a/src/mongo/s/chunk.cpp
+++ b/src/mongo/s/chunk.cpp
@@ -37,7 +37,7 @@
#include "mongo/db/commands.h"
#include "mongo/db/lasterror.h"
#include "mongo/platform/random.h"
-#include "mongo/s/balancer_configuration.h"
+#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/chunk_manager.h"
diff --git a/src/mongo/s/chunk_manager.cpp b/src/mongo/s/chunk_manager.cpp
index d1b9a1490f4..bf9998a48a5 100644
--- a/src/mongo/s/chunk_manager.cpp
+++ b/src/mongo/s/chunk_manager.cpp
@@ -49,7 +49,7 @@
#include "mongo/db/query/query_planner.h"
#include "mongo/db/query/query_planner_common.h"
#include "mongo/rpc/get_status_from_command_result.h"
-#include "mongo/s/balancer_configuration.h"
+#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog/type_collection.h"
diff --git a/src/mongo/s/cluster_write.cpp b/src/mongo/s/cluster_write.cpp
index f7e8f460148..cf1e8df7a41 100644
--- a/src/mongo/s/cluster_write.cpp
+++ b/src/mongo/s/cluster_write.cpp
@@ -38,7 +38,7 @@
#include "mongo/base/status.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/write_concern_options.h"
-#include "mongo/s/balancer_configuration.h"
+#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/chunk_manager.h"
diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
index a86d3b6c2f0..73f1860118d 100644
--- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
+++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
@@ -38,7 +38,7 @@
#include "mongo/db/commands/find_and_modify.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
-#include "mongo/s/balancer_configuration.h"
+#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/chunk_manager.h"
#include "mongo/s/client/shard_connection.h"
diff --git a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
index b2b863d7201..bd8c6c5ace2 100644
--- a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
+++ b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
@@ -41,7 +41,7 @@
#include "mongo/db/commands.h"
#include "mongo/db/commands/mr.h"
#include "mongo/db/query/collation/collation_spec.h"
-#include "mongo/s/balancer_configuration.h"
+#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/catalog/dist_lock_manager.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
diff --git a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
index 1040669d275..4236beb6121 100644
--- a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
+++ b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
@@ -38,7 +38,7 @@
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
#include "mongo/db/write_concern_options.h"
-#include "mongo/s/balancer_configuration.h"
+#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_registry.h"
diff --git a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
index 6c374d4f6e1..255578c28bd 100644
--- a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
+++ b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
@@ -49,7 +49,7 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/write_concern_options.h"
-#include "mongo/s/balancer_configuration.h"
+#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/chunk_manager.h"
diff --git a/src/mongo/s/config.cpp b/src/mongo/s/config.cpp
index 36f7a97e55d..5705aee7740 100644
--- a/src/mongo/s/config.cpp
+++ b/src/mongo/s/config.cpp
@@ -37,7 +37,7 @@
#include "mongo/db/lasterror.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/write_concern.h"
-#include "mongo/s/balancer_configuration.h"
+#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog/type_chunk.h"
diff --git a/src/mongo/s/config_server_test_fixture.cpp b/src/mongo/s/config_server_test_fixture.cpp
index 563c88d807b..9a8f8a33dfb 100644
--- a/src/mongo/s/config_server_test_fixture.cpp
+++ b/src/mongo/s/config_server_test_fixture.cpp
@@ -46,12 +46,13 @@
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/repl/repl_settings.h"
+#include "mongo/db/service_context_noop.h"
#include "mongo/executor/network_interface_mock.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/rpc/metadata/server_selection_metadata.h"
-#include "mongo/s/balancer_configuration.h"
+#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/catalog/dist_lock_catalog_impl.h"
#include "mongo/s/catalog/replset_dist_lock_manager.h"
diff --git a/src/mongo/s/grid.cpp b/src/mongo/s/grid.cpp
index 98853d566f4..fbc973788d9 100644
--- a/src/mongo/s/grid.cpp
+++ b/src/mongo/s/grid.cpp
@@ -36,7 +36,7 @@
#include "mongo/db/server_options.h"
#include "mongo/executor/task_executor.h"
#include "mongo/executor/task_executor_pool.h"
-#include "mongo/s/balancer_configuration.h"
+#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog/sharding_catalog_manager.h"
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index 6e446427982..3a2bce364e5 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -62,7 +62,7 @@
#include "mongo/db/wire_version.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/platform/process_id.h"
-#include "mongo/s/balancer_configuration.h"
+#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog/sharding_catalog_manager.h"
#include "mongo/s/client/shard_connection.h"
diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp
index 97b49a46c8f..51b30053305 100644
--- a/src/mongo/s/sharding_initialization.cpp
+++ b/src/mongo/s/sharding_initialization.cpp
@@ -46,7 +46,7 @@
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/rpc/metadata/config_server_metadata.h"
#include "mongo/rpc/metadata/metadata_hook.h"
-#include "mongo/s/balancer_configuration.h"
+#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/catalog/dist_lock_catalog_impl.h"
#include "mongo/s/catalog/replset_dist_lock_manager.h"
diff --git a/src/mongo/s/sharding_mongod_test_fixture.cpp b/src/mongo/s/sharding_mongod_test_fixture.cpp
index c905fe3cdbd..2c37be980a9 100644
--- a/src/mongo/s/sharding_mongod_test_fixture.cpp
+++ b/src/mongo/s/sharding_mongod_test_fixture.cpp
@@ -54,7 +54,7 @@
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/rpc/metadata/server_selection_metadata.h"
-#include "mongo/s/balancer_configuration.h"
+#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/catalog/dist_lock_catalog.h"
#include "mongo/s/catalog/dist_lock_manager.h"
diff --git a/src/mongo/s/sharding_test_fixture.cpp b/src/mongo/s/sharding_test_fixture.cpp
index 0f927c81614..c7ab3484da3 100644
--- a/src/mongo/s/sharding_test_fixture.cpp
+++ b/src/mongo/s/sharding_test_fixture.cpp
@@ -49,7 +49,7 @@
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/rpc/metadata/server_selection_metadata.h"
-#include "mongo/s/balancer_configuration.h"
+#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/catalog/dist_lock_manager_mock.h"
#include "mongo/s/catalog/sharding_catalog_client_impl.h"
diff --git a/src/mongo/s/sharding_uptime_reporter.cpp b/src/mongo/s/sharding_uptime_reporter.cpp
index ba5f3690822..0b0d78bc91b 100644
--- a/src/mongo/s/sharding_uptime_reporter.cpp
+++ b/src/mongo/s/sharding_uptime_reporter.cpp
@@ -34,7 +34,7 @@
#include "mongo/db/client.h"
#include "mongo/db/server_options.h"
-#include "mongo/s/balancer_configuration.h"
+#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog/type_mongos.h"
#include "mongo/s/grid.h"