Diffstat (limited to 'src/mongo/s')
44 files changed, 6398 insertions, 24 deletions
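For orientation: this change moves the balancer onto the config server primary and drives it through the replication-lifecycle hooks declared in balancer.h below. The sketch that follows shows only the intended call order; the two driver functions are hypothetical and not part of this patch, and just the Balancer calls and their ordering are taken from the header's documentation:

// Hypothetical glue illustrating the Balancer state machine documented in balancer.h:
// kStopped -> kRunning (onTransitionToPrimary) -> kStopping (onStepDownFromPrimary)
// -> kStopped (onDrainComplete). Lock requirements are as stated in the header comments.
void onStepDown(mongo::Balancer* balancer) {
    balancer->onStepDownFromPrimary();  // requests stop; returns without joining the thread
}

void onBecomePrimary(mongo::Balancer* balancer, mongo::OperationContext* txn) {
    balancer->onDrainComplete(txn);        // joins any previous balancer thread (no locks held)
    balancer->onTransitionToPrimary(txn);  // spawns a fresh balancer thread for this term
}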
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 1983abd864d..91be5a20f2d 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -45,6 +45,7 @@ env.Library( env.Library( target='common', source=[ + 'balancer/type_migration.cpp', 'catalog/mongo_version_range.cpp', 'catalog/type_changelog.cpp', 'catalog/type_chunk.cpp', @@ -92,6 +93,7 @@ env.Library( LIBDEPS=[ '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', '$BUILD_DIR/mongo/db/query/collation/collator_factory_mock', + '$BUILD_DIR/mongo/db/service_context_noop_init', '$BUILD_DIR/mongo/client/remote_command_targeter_mock', '$BUILD_DIR/mongo/executor/network_test_env', '$BUILD_DIR/mongo/executor/task_executor_pool', @@ -153,6 +155,7 @@ env.CppUnitTest( env.CppUnitTest( target='sharding_common_test', source=[ + 'balancer/type_migration_test.cpp', 'catalog/type_changelog_test.cpp', 'catalog/type_chunk_test.cpp', 'catalog/type_collection_test.cpp', @@ -249,11 +252,20 @@ env.CppUnitTest('request_types_test', ], ) -# This library contains sharding functionality used by both mongod and mongos +# This library contains sharding functionality used by both mongod and mongos. Certain tests, +# which exercise this functionality also link against it. env.Library( target='coreshard', source=[ - 'balancer_configuration.cpp', + 'balancer/balancer.cpp', + 'balancer/balancer_chunk_selection_policy.cpp', + 'balancer/balancer_chunk_selection_policy_impl.cpp', + 'balancer/balancer_configuration.cpp', + 'balancer/balancer_policy.cpp', + 'balancer/cluster_statistics.cpp', + 'balancer/cluster_statistics_impl.cpp', + 'balancer/migration_manager.cpp', + 'balancer/scoped_migration_request.cpp', 'catalog/catalog_cache.cpp', 'chunk.cpp', 'chunk_manager.cpp', @@ -317,11 +329,15 @@ env.Library( ) env.CppUnitTest( - target='balancer_configuration_test', + target='balancer_test', source=[ - 'balancer_configuration_test.cpp', + 'balancer/balancer_configuration_test.cpp', + 'balancer/balancer_policy_tests.cpp', + 'balancer/cluster_statistics_test.cpp', ], LIBDEPS=[ + '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', + '$BUILD_DIR/mongo/db/service_context_noop_init', 'coreshard', 'sharding_test_fixture', ] @@ -338,6 +354,19 @@ env.CppUnitTest( ] ) +env.CppUnitTest( + target='migration_manager_test', + source=[ + 'balancer/migration_manager_test.cpp', + 'balancer/scoped_migration_request_test.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/util/version_impl', + 'config_server_test_fixture', + 'coreshard', + ] +) + env.Library( target='local_sharding_info', source=[ diff --git a/src/mongo/s/balancer/balancer.cpp b/src/mongo/s/balancer/balancer.cpp new file mode 100644 index 00000000000..40982f8cd03 --- /dev/null +++ b/src/mongo/s/balancer/balancer.cpp @@ -0,0 +1,633 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/s/balancer/balancer.h" + +#include <algorithm> +#include <string> + +#include "mongo/base/status_with.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/client/read_preference.h" +#include "mongo/db/client.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/s/balancer/balancer_chunk_selection_policy_impl.h" +#include "mongo/s/balancer/balancer_configuration.h" +#include "mongo/s/balancer/cluster_statistics_impl.h" +#include "mongo/s/catalog/sharding_catalog_client.h" +#include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/client/shard.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/cluster_identity_loader.h" +#include "mongo/s/grid.h" +#include "mongo/s/shard_util.h" +#include "mongo/s/sharding_raii.h" +#include "mongo/stdx/memory.h" +#include "mongo/util/log.h" +#include "mongo/util/timer.h" +#include "mongo/util/version.h" + +namespace mongo { + +using std::map; +using std::string; +using std::vector; + +namespace { + +const Seconds kBalanceRoundDefaultInterval(10); +const Seconds kShortBalanceRoundInterval(1); + +const auto getBalancer = ServiceContext::declareDecoration<std::unique_ptr<Balancer>>(); + +/** + * Utility class to generate timing and statistics for a single balancer round. + */ +class BalanceRoundDetails { +public: + BalanceRoundDetails() : _executionTimer() {} + + void setSucceeded(int candidateChunks, int chunksMoved) { + invariant(!_errMsg); + _candidateChunks = candidateChunks; + _chunksMoved = chunksMoved; + } + + void setFailed(const string& errMsg) { + _errMsg = errMsg; + } + + BSONObj toBSON() const { + BSONObjBuilder builder; + builder.append("executionTimeMillis", _executionTimer.millis()); + builder.append("errorOccured", _errMsg.is_initialized()); + + if (_errMsg) { + builder.append("errmsg", *_errMsg); + } else { + builder.append("candidateChunks", _candidateChunks); + builder.append("chunksMoved", _chunksMoved); + } + + return builder.obj(); + } + +private: + const Timer _executionTimer; + + // Set only on success + int _candidateChunks{0}; + int _chunksMoved{0}; + + // Set only on failure + boost::optional<string> _errMsg; +}; + +/** + * Occasionally prints a log message with shard versions if the versions are not the same + * in the cluster. 
+ */ +void warnOnMultiVersion(const vector<ClusterStatistics::ShardStatistics>& clusterStats) { + + auto&& vii = VersionInfoInterface::instance(); + + bool isMultiVersion = false; + for (const auto& stat : clusterStats) { + if (!vii.isSameMajorVersion(stat.mongoVersion.c_str())) { + isMultiVersion = true; + break; + } + } + + // If we're all the same version, don't message + if (!isMultiVersion) + return; + + StringBuilder sb; + sb << "Multi version cluster detected. Local version: " << vii.version() + << ", shard versions: "; + + for (const auto& stat : clusterStats) { + sb << stat.shardId << " is at " << stat.mongoVersion << "; "; + } + + warning() << sb.str(); +} + +} // namespace + +Balancer::Balancer(ServiceContext* serviceContext) + : _balancedLastTime(0), + _clusterStats(stdx::make_unique<ClusterStatisticsImpl>()), + _chunkSelectionPolicy( + stdx::make_unique<BalancerChunkSelectionPolicyImpl>(_clusterStats.get())), + _migrationManager(serviceContext) {} + +Balancer::~Balancer() { + // The balancer thread must have been stopped + stdx::lock_guard<stdx::mutex> scopedLock(_mutex); + invariant(_state == kStopped); +} + +void Balancer::create(ServiceContext* serviceContext) { + invariant(!getBalancer(serviceContext)); + getBalancer(serviceContext) = stdx::make_unique<Balancer>(serviceContext); +} + +Balancer* Balancer::get(ServiceContext* serviceContext) { + return getBalancer(serviceContext).get(); +} + +Balancer* Balancer::get(OperationContext* operationContext) { + return get(operationContext->getServiceContext()); +} + +void Balancer::onTransitionToPrimary(OperationContext* txn) { + stdx::lock_guard<stdx::mutex> scopedLock(_mutex); + invariant(_state == kStopped); + _state = kRunning; + + _migrationManager.startRecoveryAndAcquireDistLocks(txn); + + invariant(!_thread.joinable()); + invariant(!_threadOperationContext); + _thread = stdx::thread([this] { _mainThread(); }); +} + +void Balancer::onStepDownFromPrimary() { + stdx::lock_guard<stdx::mutex> scopedLock(_mutex); + if (_state != kRunning) + return; + + _state = kStopping; + + // Interrupt the balancer thread if it has been started. We are guaranteed that the operation + // context of that thread is still alive, because we hold the balancer mutex. 
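+ // (Note: markKilled() does not interrupt the thread synchronously; the balancer thread
+ // observes the kill and fails with InterruptedDueToReplStateChange at its next interrupt
+ // check or interruptible wait.)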
+ if (_threadOperationContext) { + stdx::lock_guard<Client> scopedClientLock(*_threadOperationContext->getClient()); + _threadOperationContext->markKilled(ErrorCodes::InterruptedDueToReplStateChange); + } + + // Schedule a separate thread to shutdown the migration manager in order to avoid deadlock with + // replication step down + invariant(!_migrationManagerInterruptThread.joinable()); + _migrationManagerInterruptThread = + stdx::thread([this] { _migrationManager.interruptAndDisableMigrations(); }); + + _condVar.notify_all(); +} + +void Balancer::onDrainComplete(OperationContext* txn) { + invariant(!txn->lockState()->isLocked()); + + { + stdx::lock_guard<stdx::mutex> scopedLock(_mutex); + if (_state == kStopped) + return; + + invariant(_state == kStopping); + invariant(_thread.joinable()); + } + + _thread.join(); + + stdx::lock_guard<stdx::mutex> scopedLock(_mutex); + _state = kStopped; + _thread = {}; + + LOG(1) << "Balancer thread terminated"; +} + +void Balancer::joinCurrentRound(OperationContext* txn) { + stdx::unique_lock<stdx::mutex> scopedLock(_mutex); + const auto numRoundsAtStart = _numBalancerRounds; + _condVar.wait(scopedLock, + [&] { return !_inBalancerRound || _numBalancerRounds != numRoundsAtStart; }); +} + +Status Balancer::rebalanceSingleChunk(OperationContext* txn, const ChunkType& chunk) { + auto migrateStatus = _chunkSelectionPolicy->selectSpecificChunkToMove(txn, chunk); + if (!migrateStatus.isOK()) { + return migrateStatus.getStatus(); + } + + auto migrateInfo = std::move(migrateStatus.getValue()); + if (!migrateInfo) { + LOG(1) << "Unable to find more appropriate location for chunk " << redact(chunk.toString()); + return Status::OK(); + } + + auto balancerConfig = Grid::get(txn)->getBalancerConfiguration(); + Status refreshStatus = balancerConfig->refreshAndCheck(txn); + if (!refreshStatus.isOK()) { + return refreshStatus; + } + + return _migrationManager.executeManualMigration(txn, + *migrateInfo, + balancerConfig->getMaxChunkSizeBytes(), + balancerConfig->getSecondaryThrottle(), + balancerConfig->waitForDelete()); +} + +Status Balancer::moveSingleChunk(OperationContext* txn, + const ChunkType& chunk, + const ShardId& newShardId, + uint64_t maxChunkSizeBytes, + const MigrationSecondaryThrottleOptions& secondaryThrottle, + bool waitForDelete) { + auto moveAllowedStatus = _chunkSelectionPolicy->checkMoveAllowed(txn, chunk, newShardId); + if (!moveAllowedStatus.isOK()) { + return moveAllowedStatus; + } + + return _migrationManager.executeManualMigration(txn, + MigrateInfo(chunk.getNS(), newShardId, chunk), + maxChunkSizeBytes, + secondaryThrottle, + waitForDelete); +} + +void Balancer::report(OperationContext* txn, BSONObjBuilder* builder) { + auto balancerConfig = Grid::get(txn)->getBalancerConfiguration(); + balancerConfig->refreshAndCheck(txn); + + const auto mode = balancerConfig->getBalancerMode(); + + stdx::lock_guard<stdx::mutex> scopedLock(_mutex); + builder->append("mode", BalancerSettingsType::kBalancerModes[mode]); + builder->append("inBalancerRound", _inBalancerRound); + builder->append("numBalancerRounds", _numBalancerRounds); +} + +void Balancer::_mainThread() { + Client::initThread("Balancer"); + auto txn = cc().makeOperationContext(); + auto shardingContext = Grid::get(txn.get()); + + log() << "CSRS balancer is starting"; + + { + stdx::lock_guard<stdx::mutex> scopedLock(_mutex); + _threadOperationContext = txn.get(); + } + + const Seconds kInitBackoffInterval(10); + + // Take the balancer distributed lock and hold it permanently. 
Using single
+ // lock attempts avoids blocking the thread and allows it to check for interrupt more frequently.
+ while (!_stopRequested()) {
+ auto distLockHandleStatus =
+ shardingContext->catalogClient(txn.get())->getDistLockManager()->lockWithSessionID(
+ txn.get(),
+ "balancer",
+ "CSRS Balancer",
+ OID::gen(),
+ DistLockManager::kSingleLockAttemptTimeout);
+ if (!distLockHandleStatus.isOK()) {
+ warning() << "Balancer distributed lock could not be acquired and will be retried in "
+ << durationCount<Seconds>(kInitBackoffInterval) << " seconds"
+ << causedBy(distLockHandleStatus.getStatus());
+
+ _sleepFor(txn.get(), kInitBackoffInterval);
+ continue;
+ }
+
+ break;
+ }
+
+ log() << "CSRS balancer thread is recovering";
+
+ auto balancerConfig = Grid::get(txn.get())->getBalancerConfiguration();
+ _migrationManager.finishRecovery(txn.get(),
+ balancerConfig->getMaxChunkSizeBytes(),
+ balancerConfig->getSecondaryThrottle(),
+ balancerConfig->waitForDelete());
+
+ log() << "CSRS balancer thread is recovered";
+
+ // Main balancer loop
+ while (!_stopRequested()) {
+ auto balancerConfig = shardingContext->getBalancerConfiguration();
+
+ BalanceRoundDetails roundDetails;
+
+ _beginRound(txn.get());
+
+ try {
+ shardingContext->shardRegistry()->reload(txn.get());
+
+ uassert(13258, "oids broken after resetting!", _checkOIDs(txn.get()));
+
+ Status refreshStatus = balancerConfig->refreshAndCheck(txn.get());
+ if (!refreshStatus.isOK()) {
+ warning() << "Skipping balancing round" << causedBy(refreshStatus);
+ _endRound(txn.get(), kBalanceRoundDefaultInterval);
+ continue;
+ }
+
+ if (!balancerConfig->shouldBalance()) {
+ LOG(1) << "Skipping balancing round because balancing is disabled";
+ _endRound(txn.get(), kBalanceRoundDefaultInterval);
+ continue;
+ }
+
+ {
+ LOG(1) << "*** start balancing round. "
+ << "waitForDelete: " << balancerConfig->waitForDelete()
+ << ", secondaryThrottle: "
+ << balancerConfig->getSecondaryThrottle().toBSON();
+
+ OCCASIONALLY warnOnMultiVersion(
+ uassertStatusOK(_clusterStats->getStats(txn.get())));
+
+ Status status = _enforceTagRanges(txn.get());
+ if (!status.isOK()) {
+ warning() << "Failed to enforce tag ranges" << causedBy(status);
+ } else {
+ LOG(1) << "Done enforcing tag range boundaries.";
+ }
+
+ const auto candidateChunks = uassertStatusOK(
+ _chunkSelectionPolicy->selectChunksToMove(txn.get(), _balancedLastTime));
+
+ if (candidateChunks.empty()) {
+ LOG(1) << "no need to move any chunk";
+ _balancedLastTime = false;
+ } else {
+ _balancedLastTime = _moveChunks(txn.get(), candidateChunks);
+
+ roundDetails.setSucceeded(static_cast<int>(candidateChunks.size()),
+ _balancedLastTime);
+
+ shardingContext->catalogClient(txn.get())->logAction(
+ txn.get(), "balancer.round", "", roundDetails.toBSON());
+ }
+
+ LOG(1) << "*** End of balancing round";
+ }
+
+ _endRound(txn.get(),
+ _balancedLastTime ? kShortBalanceRoundInterval
+ : kBalanceRoundDefaultInterval);
+ } catch (const std::exception& e) {
+ log() << "caught exception while doing balance: " << e.what();
+
+ // Matches the round-opening statement when running at log level 1
+ LOG(1) << "*** End of balancing round";
+
+ // This round failed, tell the world!
+ roundDetails.setFailed(e.what()); + + shardingContext->catalogClient(txn.get())->logAction( + txn.get(), "balancer.round", "", roundDetails.toBSON()); + + // Sleep a fair amount before retrying because of the error + _endRound(txn.get(), kBalanceRoundDefaultInterval); + } + } + + { + stdx::lock_guard<stdx::mutex> scopedLock(_mutex); + invariant(_state == kStopping); + invariant(_migrationManagerInterruptThread.joinable()); + } + + _migrationManagerInterruptThread.join(); + _migrationManager.drainActiveMigrations(); + + { + stdx::lock_guard<stdx::mutex> scopedLock(_mutex); + _migrationManagerInterruptThread = {}; + _threadOperationContext = nullptr; + } + + log() << "CSRS balancer is now stopped"; +} + +bool Balancer::_stopRequested() { + stdx::lock_guard<stdx::mutex> scopedLock(_mutex); + return (_state != kRunning); +} + +void Balancer::_beginRound(OperationContext* txn) { + stdx::unique_lock<stdx::mutex> lock(_mutex); + _inBalancerRound = true; + _condVar.notify_all(); +} + +void Balancer::_endRound(OperationContext* txn, Seconds waitTimeout) { + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + _inBalancerRound = false; + _numBalancerRounds++; + _condVar.notify_all(); + } + + _sleepFor(txn, waitTimeout); +} + +void Balancer::_sleepFor(OperationContext* txn, Seconds waitTimeout) { + stdx::unique_lock<stdx::mutex> lock(_mutex); + _condVar.wait_for(lock, waitTimeout.toSystemDuration(), [&] { return _state != kRunning; }); +} + +bool Balancer::_checkOIDs(OperationContext* txn) { + auto shardingContext = Grid::get(txn); + + vector<ShardId> all; + shardingContext->shardRegistry()->getAllShardIds(&all); + + // map of OID machine ID => shardId + map<int, ShardId> oids; + + for (const ShardId& shardId : all) { + if (_stopRequested()) { + return false; + } + + auto shardStatus = shardingContext->shardRegistry()->getShard(txn, shardId); + if (!shardStatus.isOK()) { + continue; + } + const auto s = shardStatus.getValue(); + + auto result = uassertStatusOK( + s->runCommandWithFixedRetryAttempts(txn, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + "admin", + BSON("features" << 1), + Shard::RetryPolicy::kIdempotent)); + uassertStatusOK(result.commandStatus); + BSONObj f = std::move(result.response); + + if (f["oidMachine"].isNumber()) { + int x = f["oidMachine"].numberInt(); + if (oids.count(x) == 0) { + oids[x] = shardId; + } else { + log() << "error: 2 machines have " << x << " as oid machine piece: " << shardId + << " and " << oids[x]; + + result = uassertStatusOK(s->runCommandWithFixedRetryAttempts( + txn, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + "admin", + BSON("features" << 1 << "oidReset" << 1), + Shard::RetryPolicy::kIdempotent)); + uassertStatusOK(result.commandStatus); + + auto otherShardStatus = shardingContext->shardRegistry()->getShard(txn, oids[x]); + if (otherShardStatus.isOK()) { + result = uassertStatusOK( + otherShardStatus.getValue()->runCommandWithFixedRetryAttempts( + txn, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + "admin", + BSON("features" << 1 << "oidReset" << 1), + Shard::RetryPolicy::kIdempotent)); + uassertStatusOK(result.commandStatus); + } + + return false; + } + } else { + log() << "warning: oidMachine not set on: " << s->toString(); + } + } + + return true; +} + +Status Balancer::_enforceTagRanges(OperationContext* txn) { + auto chunksToSplitStatus = _chunkSelectionPolicy->selectChunksToSplit(txn); + if (!chunksToSplitStatus.isOK()) { + return chunksToSplitStatus.getStatus(); + } + + for (const auto& splitInfo : 
chunksToSplitStatus.getValue()) { + auto scopedCMStatus = ScopedChunkManager::getExisting(txn, splitInfo.nss); + if (!scopedCMStatus.isOK()) { + return scopedCMStatus.getStatus(); + } + + auto scopedCM = std::move(scopedCMStatus.getValue()); + ChunkManager* const cm = scopedCM.cm(); + + auto splitStatus = shardutil::splitChunkAtMultiplePoints(txn, + splitInfo.shardId, + splitInfo.nss, + cm->getShardKeyPattern(), + splitInfo.collectionVersion, + splitInfo.minKey, + splitInfo.maxKey, + splitInfo.chunkVersion, + splitInfo.splitKeys); + if (!splitStatus.isOK()) { + warning() << "Failed to enforce tag range for chunk " << redact(splitInfo.toString()) + << causedBy(redact(splitStatus.getStatus())); + } + + cm->reload(txn); + } + + return Status::OK(); +} + +int Balancer::_moveChunks(OperationContext* txn, + const BalancerChunkSelectionPolicy::MigrateInfoVector& candidateChunks) { + auto balancerConfig = Grid::get(txn)->getBalancerConfiguration(); + + // If the balancer was disabled since we started this round, don't start new chunk moves + if (_stopRequested() || !balancerConfig->shouldBalance()) { + LOG(1) << "Skipping balancing round because balancer was stopped"; + return 0; + } + + auto migrationStatuses = + _migrationManager.executeMigrationsForAutoBalance(txn, + candidateChunks, + balancerConfig->getMaxChunkSizeBytes(), + balancerConfig->getSecondaryThrottle(), + balancerConfig->waitForDelete()); + + int numChunksProcessed = 0; + + for (const auto& migrationStatusEntry : migrationStatuses) { + const Status& status = migrationStatusEntry.second; + if (status.isOK()) { + numChunksProcessed++; + continue; + } + + const MigrationIdentifier& migrationId = migrationStatusEntry.first; + + if (status == ErrorCodes::ChunkTooBig) { + numChunksProcessed++; + + auto failedRequestIt = std::find_if(candidateChunks.begin(), + candidateChunks.end(), + [&migrationId](const MigrateInfo& migrateInfo) { + return migrateInfo.getName() == migrationId; + }); + invariant(failedRequestIt != candidateChunks.end()); + + log() << "Performing a split because migration " << failedRequestIt->toString() + << " failed for size reasons" << causedBy(status); + + _splitOrMarkJumbo(txn, NamespaceString(failedRequestIt->ns), failedRequestIt->minKey); + continue; + } + + log() << "Balancer move " << migrationId << " failed" << causedBy(status); + } + + return numChunksProcessed; +} + +void Balancer::_splitOrMarkJumbo(OperationContext* txn, + const NamespaceString& nss, + const BSONObj& minKey) { + auto scopedChunkManager = uassertStatusOK(ScopedChunkManager::getExisting(txn, nss)); + ChunkManager* const chunkManager = scopedChunkManager.cm(); + + auto chunk = chunkManager->findIntersectingChunkWithSimpleCollation(txn, minKey); + + auto splitStatus = chunk->split(txn, Chunk::normal, nullptr); + if (!splitStatus.isOK()) { + log() << "Marking chunk " << chunk->toString() << " as jumbo."; + chunk->markAsJumbo(txn); + } +} + +} // namespace mongo diff --git a/src/mongo/s/balancer/balancer.h b/src/mongo/s/balancer/balancer.h new file mode 100644 index 00000000000..bc18818484d --- /dev/null +++ b/src/mongo/s/balancer/balancer.h @@ -0,0 +1,250 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. 
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/s/balancer/balancer_chunk_selection_policy.h"
+#include "mongo/s/balancer/migration_manager.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/thread.h"
+
+namespace mongo {
+
+class ChunkType;
+class ClusterStatistics;
+class MigrationSecondaryThrottleOptions;
+class OperationContext;
+class ServiceContext;
+class Status;
+
+/**
+ * The balancer is a background task that tries to keep the number of chunks across all
+ * servers of the cluster even. Although every mongos will have one balancer running, only one
+ * of them will be active at any given point in time. The balancer uses a distributed lock
+ * for that coordination.
+ *
+ * The balancer runs continuously, but in "rounds". In each round it decides whether there is
+ * an imbalance by checking the difference in chunk counts between the most and least loaded
+ * shards, and issues at most one chunk migration request per round if it finds one.
+ */
+class Balancer {
+ MONGO_DISALLOW_COPYING(Balancer);
+
+public:
+ Balancer(ServiceContext* serviceContext);
+ ~Balancer();
+
+ /**
+ * Instantiates an instance of the balancer and installs it on the specified service context.
+ * This method is not thread-safe and must be called only once when the service is starting.
+ */
+ static void create(ServiceContext* serviceContext);
+
+ /**
+ * Retrieves the per-service instance of the Balancer.
+ */
+ static Balancer* get(ServiceContext* serviceContext);
+ static Balancer* get(OperationContext* operationContext);
+
+ /**
+ * Invoked when the config server primary enters the 'PRIMARY' state, while the caller is
+ * holding the global X lock. Kicks off the main balancer thread and returns immediately.
+ *
+ * Must only be called if the balancer is in the stopped state (i.e., just constructed or
+ * onDrainComplete has been called before). Any code in this call must not try to acquire any
+ * locks or to wait on operations which acquire locks.
+ */
+ void onTransitionToPrimary(OperationContext* txn);
+
+ /**
+ * Invoked when this node, which is currently serving as a 'PRIMARY', steps down; called while
+ * the global X lock is held.
Requests the main balancer thread to stop and returns
+ * immediately without waiting for it to terminate.
+ *
+ * This method might be called multiple times in succession (as happens after an incomplete
+ * transition to primary), so it is resilient to repeated invocation.
+ *
+ * The onDrainComplete method must be called afterwards in order to wait for the main balancer
+ * thread to terminate and to allow onTransitionToPrimary to be called again.
+ */
+ void onStepDownFromPrimary();
+
+ /**
+ * Invoked when a node on its way to becoming a primary finishes draining and is about to
+ * acquire the global X lock in order to allow writes. Waits for the balancer thread to
+ * terminate and primes the balancer so that onTransitionToPrimary can be called.
+ *
+ * This method is called without any locks held.
+ */
+ void onDrainComplete(OperationContext* txn);
+
+ /**
+ * Potentially blocking method, which will return immediately if the balancer is not running a
+ * balancer round and will block until the current round completes otherwise.
+ */
+ void joinCurrentRound(OperationContext* txn);
+
+ /**
+ * Blocking call, which requests the balancer to move a single chunk to a more appropriate
+ * shard, in accordance with the active balancer policy. It is not guaranteed that the chunk
+ * will actually move because it may already be at the best shard. An error will be returned if
+ * the attempt to find a better shard or the actual migration fails for any reason.
+ */
+ Status rebalanceSingleChunk(OperationContext* txn, const ChunkType& chunk);
+
+ /**
+ * Blocking call, which requests the balancer to move a single chunk to the specified location
+ * in accordance with the active balancer policy. An error will be returned if the attempt to
+ * move fails for any reason.
+ *
+ * NOTE: This call disregards the balancer enabled/disabled status and will proceed with the
+ * move regardless. It should be used only for user-initiated moves.
+ */
+ Status moveSingleChunk(OperationContext* txn,
+ const ChunkType& chunk,
+ const ShardId& newShardId,
+ uint64_t maxChunkSizeBytes,
+ const MigrationSecondaryThrottleOptions& secondaryThrottle,
+ bool waitForDelete);
+
+ /**
+ * Appends the runtime state of the balancer instance to the specified builder.
+ */
+ void report(OperationContext* txn, BSONObjBuilder* builder);
+
+private:
+ /**
+ * Possible runtime states of the balancer. The comments indicate the allowed next state.
+ */
+ enum State {
+ kStopped, // kRunning
+ kRunning, // kStopping
+ kStopping, // kStopped
+ };
+
+ /**
+ * The main balancer loop, which runs in a separate thread.
+ */
+ void _mainThread();
+
+ /**
+ * Checks whether the balancer main thread has been requested to stop.
+ */
+ bool _stopRequested();
+
+ /**
+ * Signals the beginning and end of a balancing round.
+ */
+ void _beginRound(OperationContext* txn);
+ void _endRound(OperationContext* txn, Seconds waitTimeout);
+
+ /**
+ * Blocks the caller for the specified timeout or until the balancer condition variable is
+ * signaled, whichever comes first.
+ */
+ void _sleepFor(OperationContext* txn, Seconds waitTimeout);
+
+ /**
+ * Returns true if all the servers listed in configdb as being shards are reachable and are
+ * distinct processes (no hostname mixup).
+ */
+ bool _checkOIDs(OperationContext* txn);
+
+ /**
+ * Iterates through all chunks in all collections and ensures that no chunks straddle a tag
+ * boundary. If any do, they will be split.
+ */ + Status _enforceTagRanges(OperationContext* txn); + + /** + * Schedules migrations for the specified set of chunks and returns how many chunks were + * successfully processed. + */ + int _moveChunks(OperationContext* txn, + const BalancerChunkSelectionPolicy::MigrateInfoVector& candidateChunks); + + /** + * Performs a split on the chunk with min value "minKey". If the split fails, it is marked as + * jumbo. + */ + void _splitOrMarkJumbo(OperationContext* txn, + const NamespaceString& nss, + const BSONObj& minKey); + + // Protects the state below + stdx::mutex _mutex; + + // Indicates the current state of the balancer + State _state{kStopped}; + + // The main balancer thread + stdx::thread _thread; + + // The operation context of the main balancer thread. This value may only be available in the + // kRunning state and is used to force interrupt of any blocking calls made by the balancer + // thread. + OperationContext* _threadOperationContext{nullptr}; + + // This thread is only available in the kStopping state and is necessary for the migration + // manager shutdown to not deadlock with replica set step down. In particular, the migration + // manager's order of lock acquisition is mutex, then collection lock, whereas stepdown first + // acquires the global S lock and then acquires the migration manager's mutex. + // + // The interrupt thread is scheduled when the balancer enters the kStopping state (which is at + // step down) and is joined outside of lock, when the replica set leaves draining mode, outside + // of the global X lock. + stdx::thread _migrationManagerInterruptThread; + + // Indicates whether the balancer is currently executing a balancer round + bool _inBalancerRound{false}; + + // Counts the number of balancing rounds performed since the balancer thread was first activated + int64_t _numBalancerRounds{0}; + + // Condition variable, which is signalled every time the above runtime state of the balancer + // changes (in particular, state/balancer round and number of balancer rounds). + stdx::condition_variable _condVar; + + // Number of moved chunks in last round + int _balancedLastTime; + + // Source for cluster statistics + std::unique_ptr<ClusterStatistics> _clusterStats; + + // Balancer policy. Depends on the cluster statistics instance above so it should be created + // after it and destroyed before it. + std::unique_ptr<BalancerChunkSelectionPolicy> _chunkSelectionPolicy; + + // Migration manager used to schedule and manage migrations + MigrationManager _migrationManager; +}; + +} // namespace mongo diff --git a/src/mongo/s/balancer/balancer_chunk_selection_policy.cpp b/src/mongo/s/balancer/balancer_chunk_selection_policy.cpp new file mode 100644 index 00000000000..2ee21b6ae87 --- /dev/null +++ b/src/mongo/s/balancer/balancer_chunk_selection_policy.cpp @@ -0,0 +1,68 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/s/balancer/balancer_chunk_selection_policy.h" + +#include "mongo/util/mongoutils/str.h" + +namespace mongo { + +BalancerChunkSelectionPolicy::BalancerChunkSelectionPolicy() = default; + +BalancerChunkSelectionPolicy::~BalancerChunkSelectionPolicy() = default; + +BalancerChunkSelectionPolicy::SplitInfo::SplitInfo(ShardId inShardId, + NamespaceString inNss, + ChunkVersion inCollectionVersion, + ChunkVersion inChunkVersion, + const BSONObj& inMinKey, + const BSONObj& inMaxKey, + std::vector<BSONObj> inSplitKeys) + : shardId(std::move(inShardId)), + nss(std::move(inNss)), + collectionVersion(inCollectionVersion), + chunkVersion(inChunkVersion), + minKey(inMinKey), + maxKey(inMaxKey), + splitKeys(std::move(inSplitKeys)) {} + +std::string BalancerChunkSelectionPolicy::SplitInfo::toString() const { + StringBuilder splitKeysBuilder; + for (const auto& splitKey : splitKeys) { + splitKeysBuilder << splitKey.toString() << ", "; + } + + return str::stream() << "Splitting chunk in " << nss.ns() << " [ " << minKey << ", " << maxKey + << "), residing on " << shardId << " at [ " << splitKeysBuilder.str() + << " ] with version " << chunkVersion.toString() + << " and collection version " << collectionVersion.toString(); +} + +} // namespace mongo diff --git a/src/mongo/s/balancer/balancer_chunk_selection_policy.h b/src/mongo/s/balancer/balancer_chunk_selection_policy.h new file mode 100644 index 00000000000..ac227183ddf --- /dev/null +++ b/src/mongo/s/balancer/balancer_chunk_selection_policy.h @@ -0,0 +1,122 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+#include <vector>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/s/balancer/balancer_policy.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/chunk_version.h"
+
+namespace mongo {
+
+class ChunkType;
+class NamespaceString;
+class OperationContext;
+template <typename T>
+class StatusWith;
+
+/**
+ * Interface used by the balancer for selecting chunks that need to be moved around in order for
+ * the sharded cluster to be balanced. It is up to the implementation to decide what exactly
+ * 'balanced' means.
+ */
+class BalancerChunkSelectionPolicy {
+ MONGO_DISALLOW_COPYING(BalancerChunkSelectionPolicy);
+
+public:
+ /**
+ * Describes a chunk which needs to be split because it violates the balancer policy.
+ */
+ struct SplitInfo {
+ SplitInfo(ShardId shardId,
+ NamespaceString nss,
+ ChunkVersion collectionVersion,
+ ChunkVersion chunkVersion,
+ const BSONObj& minKey,
+ const BSONObj& maxKey,
+ std::vector<BSONObj> splitKeys);
+
+ std::string toString() const;
+
+ ShardId shardId;
+ NamespaceString nss;
+ ChunkVersion collectionVersion;
+ ChunkVersion chunkVersion;
+ BSONObj minKey;
+ BSONObj maxKey;
+ std::vector<BSONObj> splitKeys;
+ };
+
+ typedef std::vector<SplitInfo> SplitInfoVector;
+
+ typedef std::vector<MigrateInfo> MigrateInfoVector;
+
+ virtual ~BalancerChunkSelectionPolicy();
+
+ /**
+ * Potentially blocking method, which returns a set of chunks that need to be split because
+ * they violate the policy. The reason is decided by the policy and may include the chunk
+ * being too big or straddling a tag range.
+ */
+ virtual StatusWith<SplitInfoVector> selectChunksToSplit(OperationContext* txn) = 0;
+
+ /**
+ * Potentially blocking method, which returns a set of chunks to be moved. The
+ * aggressiveBalanceHint indicates to the balancing logic that it should lower the threshold for
+ * difference in number of chunks across shards and thus potentially cause more chunks to move.
+ */
+ virtual StatusWith<MigrateInfoVector> selectChunksToMove(OperationContext* txn,
+ bool aggressiveBalanceHint) = 0;
+
+ /**
+ * Requests a single chunk to be relocated to a different shard, if possible. If some error
+ * occurs while trying to determine the best location for the chunk, a failed status is
+ * returned. If the chunk is already on the best shard it can be, returns boost::none.
+ * Otherwise returns migration information for where the chunk should be moved.
+ */
+ virtual StatusWith<boost::optional<MigrateInfo>> selectSpecificChunkToMove(
+ OperationContext* txn, const ChunkType& chunk) = 0;
+
+ /**
+ * Asks the chunk selection policy to validate that the specified chunk migration is allowed
+ * given the current rules. Returns OK if the migration won't violate any rules, or a
+ * failed status otherwise.
+ */
+ virtual Status checkMoveAllowed(OperationContext* txn,
+ const ChunkType& chunk,
+ const ShardId& newShardId) = 0;
+
+protected:
+ BalancerChunkSelectionPolicy();
+};
+
+} // namespace mongo diff --git a/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.cpp b/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.cpp new file mode 100644 index 00000000000..c8b4b8d115d --- /dev/null +++ b/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.cpp @@ -0,0 +1,418 @@ +/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/balancer/balancer_chunk_selection_policy_impl.h"
+
+#include <set>
+#include <vector>
+
+#include "mongo/base/status_with.h"
+#include "mongo/bson/bsonobj_comparator_interface.h"
+#include "mongo/s/catalog/catalog_cache.h"
+#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/catalog/type_collection.h"
+#include "mongo/s/catalog/type_tags.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/sharding_raii.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/util/log.h"
+#include "mongo/util/mongoutils/str.h"
+
+namespace mongo {
+
+using ChunkMinimumsSet = BSONObjSet;
+using MigrateInfoVector = BalancerChunkSelectionPolicy::MigrateInfoVector;
+using SplitInfoVector = BalancerChunkSelectionPolicy::SplitInfoVector;
+using std::shared_ptr;
+using std::unique_ptr;
+using std::vector;
+
+namespace {
+
+/**
+ * Does a linear pass over the information cached in the specified chunk manager and extracts chunk
+ * distribution and chunk placement information which is needed by the balancer policy.
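+ *
+ * The set of chunk minimums returned alongside the DistributionStatus lets callers detect
+ * tag ranges whose boundaries do not coincide with the boundary of an existing chunk.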
+ */ +StatusWith<std::pair<DistributionStatus, ChunkMinimumsSet>> createCollectionDistributionInfo( + OperationContext* txn, const ShardStatisticsVector& allShards, ChunkManager* chunkMgr) { + ShardToChunksMap shardToChunksMap; + ChunkMinimumsSet chunkMinimums = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); + + // Makes sure there is an entry in shardToChunksMap for every shard, so empty shards will also + // be accounted for + for (const auto& stat : allShards) { + shardToChunksMap[stat.shardId]; + } + + for (const auto& entry : chunkMgr->getChunkMap()) { + const auto& chunkEntry = entry.second; + + ChunkType chunk; + chunk.setMin(chunkEntry->getMin()); + chunk.setMax(chunkEntry->getMax()); + chunk.setJumbo(chunkEntry->isJumbo()); + chunk.setShard(chunkEntry->getShardId()); + + shardToChunksMap[chunkEntry->getShardId()].push_back(chunk); + chunkMinimums.insert(chunkEntry->getMin()); + } + + vector<TagsType> collectionTags; + Status tagsStatus = Grid::get(txn)->catalogClient(txn)->getTagsForCollection( + txn, chunkMgr->getns(), &collectionTags); + if (!tagsStatus.isOK()) { + return {tagsStatus.code(), + str::stream() << "Unable to load tags for collection " << chunkMgr->getns() + << " due to " + << tagsStatus.toString()}; + } + + DistributionStatus distribution(NamespaceString(chunkMgr->getns()), + std::move(shardToChunksMap)); + + // Cache the collection tags + const auto& keyPattern = chunkMgr->getShardKeyPattern().getKeyPattern(); + + for (const auto& tag : collectionTags) { + auto status = distribution.addRangeToZone( + ZoneRange(keyPattern.extendRangeBound(tag.getMinKey(), false), + keyPattern.extendRangeBound(tag.getMaxKey(), false), + tag.getTag())); + + if (!status.isOK()) { + return status; + } + } + + return std::make_pair(std::move(distribution), std::move(chunkMinimums)); +} + +} // namespace + +BalancerChunkSelectionPolicyImpl::BalancerChunkSelectionPolicyImpl(ClusterStatistics* clusterStats) + : _clusterStats(clusterStats) {} + +BalancerChunkSelectionPolicyImpl::~BalancerChunkSelectionPolicyImpl() = default; + +StatusWith<SplitInfoVector> BalancerChunkSelectionPolicyImpl::selectChunksToSplit( + OperationContext* txn) { + auto shardStatsStatus = _clusterStats->getStats(txn); + if (!shardStatsStatus.isOK()) { + return shardStatsStatus.getStatus(); + } + + const auto shardStats = std::move(shardStatsStatus.getValue()); + + vector<CollectionType> collections; + + Status collsStatus = + Grid::get(txn)->catalogClient(txn)->getCollections(txn, nullptr, &collections, nullptr); + if (!collsStatus.isOK()) { + return collsStatus; + } + + if (collections.empty()) { + return SplitInfoVector{}; + } + + SplitInfoVector splitCandidates; + + for (const auto& coll : collections) { + const NamespaceString nss(coll.getNs()); + + auto candidatesStatus = _getSplitCandidatesForCollection(txn, nss, shardStats); + if (candidatesStatus == ErrorCodes::NamespaceNotFound) { + // Namespace got dropped before we managed to get to it, so just skip it + continue; + } else if (!candidatesStatus.isOK()) { + warning() << "Unable to enforce tag range policy for collection " << nss.ns() + << causedBy(candidatesStatus.getStatus()); + continue; + } + + splitCandidates.insert(splitCandidates.end(), + std::make_move_iterator(candidatesStatus.getValue().begin()), + std::make_move_iterator(candidatesStatus.getValue().end())); + } + + return splitCandidates; +} + +StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::selectChunksToMove( + OperationContext* txn, bool aggressiveBalanceHint) { + auto 
shardStatsStatus = _clusterStats->getStats(txn); + if (!shardStatsStatus.isOK()) { + return shardStatsStatus.getStatus(); + } + + const auto shardStats = std::move(shardStatsStatus.getValue()); + + if (shardStats.size() < 2) { + return MigrateInfoVector{}; + } + + vector<CollectionType> collections; + + Status collsStatus = + Grid::get(txn)->catalogClient(txn)->getCollections(txn, nullptr, &collections, nullptr); + if (!collsStatus.isOK()) { + return collsStatus; + } + + if (collections.empty()) { + return MigrateInfoVector{}; + } + + MigrateInfoVector candidateChunks; + + for (const auto& coll : collections) { + const NamespaceString nss(coll.getNs()); + + if (!coll.getAllowBalance()) { + LOG(1) << "Not balancing collection " << nss << "; explicitly disabled."; + continue; + } + + auto candidatesStatus = + _getMigrateCandidatesForCollection(txn, nss, shardStats, aggressiveBalanceHint); + if (candidatesStatus == ErrorCodes::NamespaceNotFound) { + // Namespace got dropped before we managed to get to it, so just skip it + continue; + } else if (!candidatesStatus.isOK()) { + warning() << "Unable to balance collection " << nss.ns() + << causedBy(candidatesStatus.getStatus()); + continue; + } + + candidateChunks.insert(candidateChunks.end(), + std::make_move_iterator(candidatesStatus.getValue().begin()), + std::make_move_iterator(candidatesStatus.getValue().end())); + } + + return candidateChunks; +} + +StatusWith<boost::optional<MigrateInfo>> +BalancerChunkSelectionPolicyImpl::selectSpecificChunkToMove(OperationContext* txn, + const ChunkType& chunk) { + auto shardStatsStatus = _clusterStats->getStats(txn); + if (!shardStatsStatus.isOK()) { + return shardStatsStatus.getStatus(); + } + + const auto shardStats = std::move(shardStatsStatus.getValue()); + + const NamespaceString nss(chunk.getNS()); + + auto scopedCMStatus = ScopedChunkManager::getExisting(txn, nss); + if (!scopedCMStatus.isOK()) { + return scopedCMStatus.getStatus(); + } + + auto scopedCM = std::move(scopedCMStatus.getValue()); + ChunkManager* const cm = scopedCM.cm(); + + auto collInfoStatus = createCollectionDistributionInfo(txn, shardStats, cm); + if (!collInfoStatus.isOK()) { + return collInfoStatus.getStatus(); + } + + auto collInfo = std::move(collInfoStatus.getValue()); + + return BalancerPolicy::balanceSingleChunk(chunk, shardStats, std::get<0>(collInfo)); +} + +Status BalancerChunkSelectionPolicyImpl::checkMoveAllowed(OperationContext* txn, + const ChunkType& chunk, + const ShardId& newShardId) { + auto shardStatsStatus = _clusterStats->getStats(txn); + if (!shardStatsStatus.isOK()) { + return shardStatsStatus.getStatus(); + } + + auto shardStats = std::move(shardStatsStatus.getValue()); + + const NamespaceString nss(chunk.getNS()); + + auto scopedCMStatus = ScopedChunkManager::getExisting(txn, nss); + if (!scopedCMStatus.isOK()) { + return scopedCMStatus.getStatus(); + } + + auto scopedCM = std::move(scopedCMStatus.getValue()); + ChunkManager* const cm = scopedCM.cm(); + + auto collInfoStatus = createCollectionDistributionInfo(txn, shardStats, cm); + if (!collInfoStatus.isOK()) { + return collInfoStatus.getStatus(); + } + + auto collInfo = std::move(collInfoStatus.getValue()); + + DistributionStatus distribution = std::move(std::get<0>(collInfo)); + + auto newShardIterator = + std::find_if(shardStats.begin(), + shardStats.end(), + [&newShardId](const ClusterStatistics::ShardStatistics& stat) { + return stat.shardId == newShardId; + }); + if (newShardIterator == shardStats.end()) { + return {ErrorCodes::ShardNotFound, + 
str::stream() << "Unable to find constraints information for shard " << newShardId + << ". Move to this shard will be disallowed."}; + } + + return BalancerPolicy::isShardSuitableReceiver(*newShardIterator, + distribution.getTagForChunk(chunk)); +} + +StatusWith<SplitInfoVector> BalancerChunkSelectionPolicyImpl::_getSplitCandidatesForCollection( + OperationContext* txn, const NamespaceString& nss, const ShardStatisticsVector& shardStats) { + auto scopedCMStatus = ScopedChunkManager::getExisting(txn, nss); + if (!scopedCMStatus.isOK()) { + return scopedCMStatus.getStatus(); + } + + auto scopedCM = std::move(scopedCMStatus.getValue()); + ChunkManager* const cm = scopedCM.cm(); + + auto collInfoStatus = createCollectionDistributionInfo(txn, shardStats, cm); + if (!collInfoStatus.isOK()) { + return collInfoStatus.getStatus(); + } + + auto collInfo = std::move(collInfoStatus.getValue()); + + DistributionStatus distribution = std::move(std::get<0>(collInfo)); + ChunkMinimumsSet allChunkMinimums = std::move(std::get<1>(collInfo)); + + SplitInfoVector splitCandidates; + + // Accumulate split points for the same chunk together + shared_ptr<Chunk> currentChunk; + vector<BSONObj> currentSplitVector; + + for (const auto& tagRangeEntry : distribution.tagRanges()) { + const auto& tagRange = tagRangeEntry.second; + + if (allChunkMinimums.count(tagRange.min)) { + continue; + } + + shared_ptr<Chunk> chunk = cm->findIntersectingChunkWithSimpleCollation(txn, tagRange.min); + + if (!currentChunk) { + currentChunk = chunk; + } + + invariant(currentChunk); + + if (chunk == currentChunk) { + currentSplitVector.push_back(tagRange.min); + } else { + splitCandidates.emplace_back(currentChunk->getShardId(), + nss, + cm->getVersion(), + currentChunk->getLastmod(), + currentChunk->getMin(), + currentChunk->getMax(), + std::move(currentSplitVector)); + + currentChunk = chunk; + currentSplitVector.push_back(tagRange.min); + } + } + + // Drain the current split vector if there are any entries left + if (currentChunk) { + invariant(!currentSplitVector.empty()); + + splitCandidates.emplace_back(currentChunk->getShardId(), + nss, + cm->getVersion(), + currentChunk->getLastmod(), + currentChunk->getMin(), + currentChunk->getMax(), + std::move(currentSplitVector)); + } + + return splitCandidates; +} + +StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::_getMigrateCandidatesForCollection( + OperationContext* txn, + const NamespaceString& nss, + const ShardStatisticsVector& shardStats, + bool aggressiveBalanceHint) { + auto scopedCMStatus = ScopedChunkManager::getExisting(txn, nss); + if (!scopedCMStatus.isOK()) { + return scopedCMStatus.getStatus(); + } + + auto scopedCM = std::move(scopedCMStatus.getValue()); + ChunkManager* const cm = scopedCM.cm(); + + auto collInfoStatus = createCollectionDistributionInfo(txn, shardStats, cm); + if (!collInfoStatus.isOK()) { + return collInfoStatus.getStatus(); + } + + auto collInfo = std::move(collInfoStatus.getValue()); + + DistributionStatus distribution = std::move(std::get<0>(collInfo)); + ChunkMinimumsSet allChunkMinimums = std::move(std::get<1>(collInfo)); + + for (const auto& tagRangeEntry : distribution.tagRanges()) { + const auto& tagRange = tagRangeEntry.second; + + if (!allChunkMinimums.count(tagRange.min)) { + // This tag falls somewhere at the middle of a chunk. Therefore we must skip balancing + // this collection until it is split at the next iteration. 
+ //
+ // TODO: We should be able to just skip chunks which straddle tags and still make some
+ // progress balancing.
+ return {ErrorCodes::IllegalOperation,
+ str::stream()
+ << "Tag boundaries "
+ << tagRange.toString()
+ << " fall in the middle of an existing chunk. Balancing for collection "
+ << nss.ns()
+ << " will be postponed until the chunk is split appropriately."};
+ }
+ }
+
+ return BalancerPolicy::balance(shardStats, distribution, aggressiveBalanceHint);
+}
+
+} // namespace mongo diff --git a/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.h b/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.h new file mode 100644 index 00000000000..ffb769121b7 --- /dev/null +++ b/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.h @@ -0,0 +1,77 @@ +/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/s/balancer/balancer_chunk_selection_policy.h"
+
+namespace mongo {
+
+class ClusterStatistics;
+
+class BalancerChunkSelectionPolicyImpl final : public BalancerChunkSelectionPolicy {
+public:
+ BalancerChunkSelectionPolicyImpl(ClusterStatistics* clusterStats);
+ ~BalancerChunkSelectionPolicyImpl();
+
+ StatusWith<SplitInfoVector> selectChunksToSplit(OperationContext* txn) override;
+
+ StatusWith<MigrateInfoVector> selectChunksToMove(OperationContext* txn,
+ bool aggressiveBalanceHint) override;
+
+ StatusWith<boost::optional<MigrateInfo>> selectSpecificChunkToMove(
+ OperationContext* txn, const ChunkType& chunk) override;
+
+ Status checkMoveAllowed(OperationContext* txn,
+ const ChunkType& chunk,
+ const ShardId& newShardId) override;
+
+private:
+ /**
+ * Synchronous method, which iterates the collection's chunks and uses the tags information to
+ * figure out whether some of them violate the tag range boundaries and need to be split.
+ */
+ StatusWith<SplitInfoVector> _getSplitCandidatesForCollection(
+ OperationContext* txn, const NamespaceString& nss, const ShardStatisticsVector& shardStats);
+
+ /**
+ * Synchronous method, which iterates the collection's chunks and uses the cluster statistics to
+ * figure out where to place them.
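+ *
+ * When aggressiveBalanceHint is set, the policy tolerates a smaller chunk-count imbalance
+ * before suggesting a migration (see kAggressiveImbalanceThreshold in balancer_policy.cpp).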
+ */ + StatusWith<MigrateInfoVector> _getMigrateCandidatesForCollection( + OperationContext* txn, + const NamespaceString& nss, + const ShardStatisticsVector& shardStats, + bool aggressiveBalanceHint); + + // Source for obtaining cluster statistics. Not owned and must not be destroyed before the + // policy object is destroyed. + ClusterStatistics* const _clusterStats; +}; + +} // namespace mongo diff --git a/src/mongo/s/balancer_configuration.cpp b/src/mongo/s/balancer/balancer_configuration.cpp index 79098a2ec4a..0c86d4d35b9 100644 --- a/src/mongo/s/balancer_configuration.cpp +++ b/src/mongo/s/balancer/balancer_configuration.cpp @@ -30,7 +30,7 @@ #include "mongo/platform/basic.h" -#include "mongo/s/balancer_configuration.h" +#include "mongo/s/balancer/balancer_configuration.h" #include <algorithm> diff --git a/src/mongo/s/balancer_configuration.h b/src/mongo/s/balancer/balancer_configuration.h index 2f5370d162c..2f5370d162c 100644 --- a/src/mongo/s/balancer_configuration.h +++ b/src/mongo/s/balancer/balancer_configuration.h diff --git a/src/mongo/s/balancer_configuration_test.cpp b/src/mongo/s/balancer/balancer_configuration_test.cpp index e889e4a5ded..7e81885b662 100644 --- a/src/mongo/s/balancer_configuration_test.cpp +++ b/src/mongo/s/balancer/balancer_configuration_test.cpp @@ -38,7 +38,7 @@ #include "mongo/executor/remote_command_request.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/rpc/metadata/server_selection_metadata.h" -#include "mongo/s/balancer_configuration.h" +#include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/sharding_test_fixture.h" #include "mongo/unittest/unittest.h" diff --git a/src/mongo/s/balancer/balancer_policy.cpp b/src/mongo/s/balancer/balancer_policy.cpp new file mode 100644 index 00000000000..67c75dd590b --- /dev/null +++ b/src/mongo/s/balancer/balancer_policy.cpp @@ -0,0 +1,518 @@ +/** +* Copyright (C) 2010 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +* +* As a special exception, the copyright holders give permission to link the +* code of portions of this program with the OpenSSL library under certain +* conditions as described in each individual source file and distribute +* linked combinations including the program with the OpenSSL library. You +* must comply with the GNU Affero General Public License in all respects +* for all of the code used other than as permitted herein. If you modify +* file(s) with this exception, you may extend this exception to your +* version of the file(s), but you are not obligated to do so. If you do not +* wish to do so, delete this exception statement from your version. If you +* delete this exception statement from all source files in the program, +* then also delete it in the license file. 
+*/ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/s/balancer/balancer_policy.h" + +#include "mongo/bson/simple_bsonobj_comparator.h" +#include "mongo/s/catalog/type_shard.h" +#include "mongo/s/catalog/type_tags.h" +#include "mongo/util/log.h" +#include "mongo/util/stringutils.h" + +namespace mongo { + +using std::map; +using std::numeric_limits; +using std::set; +using std::string; +using std::vector; + +namespace { + +// These values indicate the minimum deviation a shard's number of chunks must have from the +// optimal average across all shards in a zone before a rebalancing migration is initiated. +const size_t kDefaultImbalanceThreshold = 2; +const size_t kAggressiveImbalanceThreshold = 1; + +} // namespace + +DistributionStatus::DistributionStatus(NamespaceString nss, ShardToChunksMap shardToChunksMap) + : _nss(std::move(nss)), + _shardChunks(std::move(shardToChunksMap)), + _zoneRanges(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<ZoneRange>()) {} + +size_t DistributionStatus::totalChunks() const { + size_t total = 0; + + for (const auto& shardChunk : _shardChunks) { + total += shardChunk.second.size(); + } + + return total; +} + +size_t DistributionStatus::totalChunksWithTag(const std::string& tag) const { + size_t total = 0; + + for (const auto& shardChunk : _shardChunks) { + total += numberOfChunksInShardWithTag(shardChunk.first, tag); + } + + return total; +} + +size_t DistributionStatus::numberOfChunksInShard(const ShardId& shardId) const { + const auto& shardChunks = getChunks(shardId); + return shardChunks.size(); +} + +size_t DistributionStatus::numberOfChunksInShardWithTag(const ShardId& shardId, + const string& tag) const { + const auto& shardChunks = getChunks(shardId); + + size_t total = 0; + + for (const auto& chunk : shardChunks) { + if (tag == getTagForChunk(chunk)) { + total++; + } + } + + return total; +} + +const vector<ChunkType>& DistributionStatus::getChunks(const ShardId& shardId) const { + ShardToChunksMap::const_iterator i = _shardChunks.find(shardId); + invariant(i != _shardChunks.end()); + + return i->second; +} + +Status DistributionStatus::addRangeToZone(const ZoneRange& range) { + const auto minIntersect = _zoneRanges.upper_bound(range.min); + const auto maxIntersect = _zoneRanges.upper_bound(range.max); + + // Check for partial overlap + if (minIntersect != maxIntersect) { + invariant(minIntersect != _zoneRanges.end()); + const auto& intersectingRange = + (SimpleBSONObjComparator::kInstance.evaluate(minIntersect->second.min < range.max)) + ? 
minIntersect->second + : maxIntersect->second; + + if (SimpleBSONObjComparator::kInstance.evaluate(intersectingRange.min == range.min) && + SimpleBSONObjComparator::kInstance.evaluate(intersectingRange.max == range.max) && + intersectingRange.zone == range.zone) { + return Status::OK(); + } + + return {ErrorCodes::RangeOverlapConflict, + str::stream() << "Zone range: " << range.toString() + << " is overlapping with existing: " + << intersectingRange.toString()}; + } + + // Check for containment + if (minIntersect != _zoneRanges.end()) { + const ZoneRange& nextRange = minIntersect->second; + if (SimpleBSONObjComparator::kInstance.evaluate(range.max > nextRange.min)) { + invariant(SimpleBSONObjComparator::kInstance.evaluate(range.max < nextRange.max)); + return {ErrorCodes::RangeOverlapConflict, + str::stream() << "Zone range: " << range.toString() + << " is overlapping with existing: " + << nextRange.toString()}; + } + } + + _zoneRanges[range.max.getOwned()] = range; + _allTags.insert(range.zone); + return Status::OK(); +} + +string DistributionStatus::getTagForChunk(const ChunkType& chunk) const { + const auto minIntersect = _zoneRanges.upper_bound(chunk.getMin()); + const auto maxIntersect = _zoneRanges.lower_bound(chunk.getMax()); + + // We should never have a partial overlap with a chunk range. If it happens, treat it as if this + // chunk doesn't belong to a tag + if (minIntersect != maxIntersect) { + return ""; + } + + if (minIntersect == _zoneRanges.end()) { + return ""; + } + + const ZoneRange& intersectRange = minIntersect->second; + + // Check for containment + if (SimpleBSONObjComparator::kInstance.evaluate(intersectRange.min <= chunk.getMin()) && + SimpleBSONObjComparator::kInstance.evaluate(chunk.getMax() <= intersectRange.max)) { + return intersectRange.zone; + } + + return ""; +} + +void DistributionStatus::report(BSONObjBuilder* builder) const { + builder->append("ns", _nss.ns()); + + // Report all shards + BSONArrayBuilder shardArr(builder->subarrayStart("shards")); + for (const auto& shardChunk : _shardChunks) { + BSONObjBuilder shardEntry(shardArr.subobjStart()); + shardEntry.append("name", shardChunk.first.toString()); + + BSONArrayBuilder chunkArr(shardEntry.subarrayStart("chunks")); + for (const auto& chunk : shardChunk.second) { + chunkArr.append(chunk.toBSON()); + } + chunkArr.doneFast(); + + shardEntry.doneFast(); + } + shardArr.doneFast(); + + // Report all tags + BSONArrayBuilder tagsArr(builder->subarrayStart("tags")); + tagsArr.append(_allTags); + tagsArr.doneFast(); + + // Report all tag ranges + BSONArrayBuilder tagRangesArr(builder->subarrayStart("tagRanges")); + for (const auto& tagRange : _zoneRanges) { + BSONObjBuilder tagRangeEntry(tagRangesArr.subobjStart()); + tagRangeEntry.append("tag", tagRange.second.zone); + tagRangeEntry.append("mapKey", tagRange.first); + tagRangeEntry.append("min", tagRange.second.min); + tagRangeEntry.append("max", tagRange.second.max); + tagRangeEntry.doneFast(); + } + tagRangesArr.doneFast(); +} + +string DistributionStatus::toString() const { + BSONObjBuilder builder; + report(&builder); + + return builder.obj().toString(); +} + +Status BalancerPolicy::isShardSuitableReceiver(const ClusterStatistics::ShardStatistics& stat, + const string& chunkTag) { + if (stat.isSizeMaxed()) { + return {ErrorCodes::IllegalOperation, + str::stream() << stat.shardId + << " has already reached the maximum total chunk size."}; + } + + if (stat.isDraining) { + return {ErrorCodes::IllegalOperation, + str::stream() << stat.shardId << " is currently 
draining."}; + } + + if (!chunkTag.empty() && !stat.shardTags.count(chunkTag)) { + return {ErrorCodes::IllegalOperation, + str::stream() << stat.shardId << " doesn't have right tag"}; + } + + return Status::OK(); +} + +ShardId BalancerPolicy::_getLeastLoadedReceiverShard(const ShardStatisticsVector& shardStats, + const DistributionStatus& distribution, + const string& tag, + const set<ShardId>& excludedShards) { + ShardId best; + unsigned minChunks = numeric_limits<unsigned>::max(); + + for (const auto& stat : shardStats) { + if (excludedShards.count(stat.shardId)) + continue; + + auto status = isShardSuitableReceiver(stat, tag); + if (!status.isOK()) { + continue; + } + + unsigned myChunks = distribution.numberOfChunksInShard(stat.shardId); + if (myChunks >= minChunks) { + continue; + } + + best = stat.shardId; + minChunks = myChunks; + } + + return best; +} + +ShardId BalancerPolicy::_getMostOverloadedShard(const ShardStatisticsVector& shardStats, + const DistributionStatus& distribution, + const string& chunkTag, + const set<ShardId>& excludedShards) { + ShardId worst; + unsigned maxChunks = 0; + + for (const auto& stat : shardStats) { + if (excludedShards.count(stat.shardId)) + continue; + + const unsigned shardChunkCount = + distribution.numberOfChunksInShardWithTag(stat.shardId, chunkTag); + if (shardChunkCount <= maxChunks) + continue; + + worst = stat.shardId; + maxChunks = shardChunkCount; + } + + return worst; +} + +vector<MigrateInfo> BalancerPolicy::balance(const ShardStatisticsVector& shardStats, + const DistributionStatus& distribution, + bool shouldAggressivelyBalance) { + vector<MigrateInfo> migrations; + + // Set of shards, which have already been used for migrations. Used so we don't return multiple + // migrations for the same shard. + set<ShardId> usedShards; + + // 1) Check for shards, which are in draining mode and must have chunks moved off of them + { + for (const auto& stat : shardStats) { + if (!stat.isDraining) + continue; + + const vector<ChunkType>& chunks = distribution.getChunks(stat.shardId); + + if (chunks.empty()) + continue; + + // Now we know we need to move to chunks off this shard, but only if permitted by the + // tags policy + unsigned numJumboChunks = 0; + + // Since we have to move all chunks, lets just do in order + for (const auto& chunk : chunks) { + if (chunk.getJumbo()) { + numJumboChunks++; + continue; + } + + const string tag = distribution.getTagForChunk(chunk); + + const ShardId to = + _getLeastLoadedReceiverShard(shardStats, distribution, tag, usedShards); + if (!to.isValid()) { + if (migrations.empty()) { + warning() << "Chunk " << redact(chunk.toString()) + << " is on a draining shard, but no appropriate recipient found"; + } + continue; + } + + invariant(to != stat.shardId); + migrations.emplace_back(distribution.nss().ns(), to, chunk); + invariant(usedShards.insert(stat.shardId).second); + invariant(usedShards.insert(to).second); + break; + } + + if (migrations.empty()) { + warning() << "Unable to find any chunk to move from draining shard " << stat.shardId + << ". 
numJumboChunks: " << numJumboChunks; + } + } + } + + // 2) Check for chunks, which are on the wrong shard and must be moved off of it + if (!distribution.tags().empty()) { + for (const auto& stat : shardStats) { + const vector<ChunkType>& chunks = distribution.getChunks(stat.shardId); + for (const auto& chunk : chunks) { + const string tag = distribution.getTagForChunk(chunk); + if (tag.empty()) + continue; + + if (stat.shardTags.count(tag)) + continue; + + if (chunk.getJumbo()) { + warning() << "chunk " << redact(chunk.toString()) << " violates tag " + << redact(tag) << ", but it is jumbo and cannot be moved"; + continue; + } + + const ShardId to = + _getLeastLoadedReceiverShard(shardStats, distribution, tag, usedShards); + if (!to.isValid()) { + if (migrations.empty()) { + warning() << "chunk " << redact(chunk.toString()) << " violates tag " + << redact(tag) << ", but no appropriate recipient found"; + } + continue; + } + + invariant(to != stat.shardId); + migrations.emplace_back(distribution.nss().ns(), to, chunk); + invariant(usedShards.insert(stat.shardId).second); + invariant(usedShards.insert(to).second); + break; + } + } + } + + // 3) for each tag balance + const size_t imbalanceThreshold = (shouldAggressivelyBalance || distribution.totalChunks() < 20) + ? kAggressiveImbalanceThreshold + : kDefaultImbalanceThreshold; + + vector<string> tagsPlusEmpty(distribution.tags().begin(), distribution.tags().end()); + tagsPlusEmpty.push_back(""); + + for (const auto& tag : tagsPlusEmpty) { + while (_singleZoneBalance( + shardStats, distribution, tag, imbalanceThreshold, &migrations, &usedShards)) + ; + } + + return migrations; +} + +boost::optional<MigrateInfo> BalancerPolicy::balanceSingleChunk( + const ChunkType& chunk, + const ShardStatisticsVector& shardStats, + const DistributionStatus& distribution) { + const string tag = distribution.getTagForChunk(chunk); + + ShardId newShardId = + _getLeastLoadedReceiverShard(shardStats, distribution, tag, set<ShardId>()); + if (!newShardId.isValid() || newShardId == chunk.getShard()) { + return boost::optional<MigrateInfo>(); + } + + return MigrateInfo(distribution.nss().ns(), newShardId, chunk); +} + +bool BalancerPolicy::_singleZoneBalance(const ShardStatisticsVector& shardStats, + const DistributionStatus& distribution, + const string& tag, + size_t imbalanceThreshold, + vector<MigrateInfo>* migrations, + set<ShardId>* usedShards) { + const ShardId from = _getMostOverloadedShard(shardStats, distribution, tag, *usedShards); + if (!from.isValid()) + return false; + + const size_t max = distribution.numberOfChunksInShardWithTag(from, tag); + if (max == 0) + return false; + + const ShardId to = _getLeastLoadedReceiverShard(shardStats, distribution, tag, *usedShards); + if (!to.isValid()) { + if (migrations->empty()) { + log() << "No available shards to take chunks for tag [" << tag << "]"; + } + return false; + } + + const size_t min = distribution.numberOfChunksInShardWithTag(to, tag); + if (min >= max) + return false; + + const size_t totalNumberOfChunksWithTag = + (tag.empty() ? 
distribution.totalChunks() : distribution.totalChunksWithTag(tag)); + + size_t totalNumberOfShardsWithTag = 0; + + for (const auto& stat : shardStats) { + if (tag.empty() || stat.shardTags.count(tag)) { + totalNumberOfShardsWithTag++; + } + } + + // totalNumberOfShardsWithTag cannot be zero if the to shard is valid + invariant(totalNumberOfShardsWithTag); + invariant(totalNumberOfChunksWithTag >= max); + + // Calculate the ceiling of the optimal number of chunks per shard + const size_t idealNumberOfChunksPerShardWithTag = + (totalNumberOfChunksWithTag / totalNumberOfShardsWithTag) + + (totalNumberOfChunksWithTag % totalNumberOfShardsWithTag ? 1 : 0); + + const size_t imbalance = max - idealNumberOfChunksPerShardWithTag; + + LOG(1) << "collection : " << distribution.nss().ns(); + LOG(1) << "zone : " << tag; + LOG(1) << "donor : " << from << " chunks on " << max; + LOG(1) << "receiver : " << to << " chunks on " << min; + LOG(1) << "ideal : " << idealNumberOfChunksPerShardWithTag; + LOG(1) << "threshold : " << imbalanceThreshold; + + // Check whether it is necessary to balance within this zone + if (imbalance < imbalanceThreshold) + return false; + + const vector<ChunkType>& chunks = distribution.getChunks(from); + + unsigned numJumboChunks = 0; + + for (const auto& chunk : chunks) { + if (distribution.getTagForChunk(chunk) != tag) + continue; + + if (chunk.getJumbo()) { + numJumboChunks++; + continue; + } + + migrations->emplace_back(distribution.nss().ns(), to, chunk); + invariant(usedShards->insert(chunk.getShard()).second); + invariant(usedShards->insert(to).second); + return true; + } + + if (numJumboChunks) { + warning() << "Shard: " << from << ", collection: " << distribution.nss().ns() + << " has only jumbo chunks for zone \'" << tag + << "\' and cannot be balanced. Jumbo chunks count: " << numJumboChunks; + } + + return false; +} + +string ZoneRange::toString() const { + return str::stream() << min << " -->> " << max << " on " << zone; +} + +std::string MigrateInfo::getName() const { + return ChunkType::genID(ns, minKey); +} + +string MigrateInfo::toString() const { + return str::stream() << ns << ": [" << minKey << ", " << maxKey << "), from " << from << ", to " + << to; +} + +} // namespace mongo diff --git a/src/mongo/s/balancer/balancer_policy.h b/src/mongo/s/balancer/balancer_policy.h new file mode 100644 index 00000000000..97a1f2e91b1 --- /dev/null +++ b/src/mongo/s/balancer/balancer_policy.h @@ -0,0 +1,246 @@ +/** +* Copyright (C) 2010 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +* +* As a special exception, the copyright holders give permission to link the +* code of portions of this program with the OpenSSL library under certain +* conditions as described in each individual source file and distribute +* linked combinations including the program with the OpenSSL library. You +* must comply with the GNU Affero General Public License in all respects +* for all of the code used other than as permitted herein. 
If you modify +* file(s) with this exception, you may extend this exception to your +* version of the file(s), but you are not obligated to do so. If you do not +* wish to do so, delete this exception statement from your version. If you +* delete this exception statement from all source files in the program, +* then also delete it in the license file. +*/ + +#pragma once + +#include "mongo/base/disallow_copying.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/simple_bsonobj_comparator.h" +#include "mongo/s/balancer/cluster_statistics.h" +#include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/client/shard.h" + +namespace mongo { + +class ChunkManager; +class OperationContext; + +struct ZoneRange { + ZoneRange() = default; + + ZoneRange(const BSONObj& a_min, const BSONObj& a_max, const std::string& _zone) + : min(a_min.getOwned()), max(a_max.getOwned()), zone(_zone) {} + + std::string toString() const; + + BSONObj min; + BSONObj max; + std::string zone; +}; + +struct MigrateInfo { + MigrateInfo(const std::string& a_ns, const ShardId& a_to, const ChunkType& a_chunk) + : ns(a_ns), + to(a_to), + from(a_chunk.getShard()), + minKey(a_chunk.getMin()), + maxKey(a_chunk.getMax()) {} + + MigrateInfo(const std::string& a_ns, + const ShardId& a_to, + const ShardId& a_from, + const BSONObj& a_minKey, + const BSONObj& a_maxKey) + : ns(a_ns), to(a_to), from(a_from), minKey(a_minKey), maxKey(a_maxKey) {} + + std::string getName() const; + std::string toString() const; + + std::string ns; + ShardId to; + ShardId from; + BSONObj minKey; + BSONObj maxKey; +}; + +typedef std::vector<ClusterStatistics::ShardStatistics> ShardStatisticsVector; +typedef std::map<ShardId, std::vector<ChunkType>> ShardToChunksMap; + +/** + * This class constitutes a cache of the chunk distribution across the entire cluster along with the + * zone boundaries imposed on it. This information is stored in a format that makes it efficient to + * query utilization statistics and to decide what to balance. + */ +class DistributionStatus { + MONGO_DISALLOW_COPYING(DistributionStatus); + +public: + DistributionStatus(NamespaceString nss, ShardToChunksMap shardToChunksMap); + DistributionStatus(DistributionStatus&&) = default; + + /** + * Returns the namespace for which this balance status applies. + */ + const NamespaceString& nss() const { + return _nss; + } + + /** + * Appends the specified range to the set of ranges tracked for this collection and checks if + * it overlaps with existing ranges. + */ + Status addRangeToZone(const ZoneRange& range); + + /** + * Returns total number of chunks across all shards. + */ + size_t totalChunks() const; + + /** + * Returns the total number of chunks across all shards that fall into the specified zone's + * range. + */ + size_t totalChunksWithTag(const std::string& tag) const; + + /** + * Returns number of chunks in the specified shard. + */ + size_t numberOfChunksInShard(const ShardId& shardId) const; + + /** + * Returns number of chunks in the specified shard, which have the given tag. + */ + size_t numberOfChunksInShardWithTag(const ShardId& shardId, const std::string& tag) const; + + /** + * Returns all chunks for the specified shard. + */ + const std::vector<ChunkType>& getChunks(const ShardId& shardId) const; + + /** + * Returns all tag ranges defined for the collection. + */ + const BSONObjIndexedMap<ZoneRange>& tagRanges() const { + return _zoneRanges; + } + + /** + * Returns all tags defined for the collection. + */ + const std::set<std::string>& tags() const { + return _allTags; + }
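+ + /** + * Example usage (illustrative only; the variable names are hypothetical): + * + * DistributionStatus dist(NamespaceString("db.coll"), std::move(chunkMap)); + * uassertStatusOK(dist.addRangeToZone(ZoneRange(BSON("x" << 0), BSON("x" << 10), "z"))); + * const std::string tag = dist.getTagForChunk(chunk); // "z" iff chunk is within [0, 10) + */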
+ + /** + * Using the set of tags defined for the collection, returns what tag corresponds to the + * specified chunk. If the chunk doesn't fall into any tag, returns the empty string. + */ + std::string getTagForChunk(const ChunkType& chunk) const; + + /** + * Returns a BSON/string representation of this distribution status. + */ + void report(BSONObjBuilder* builder) const; + std::string toString() const; + +private: + // Namespace for which this distribution applies + NamespaceString _nss; + + // Map of what chunks are owned by each shard + ShardToChunksMap _shardChunks; + + // Map of zone max key to the zone description + BSONObjIndexedMap<ZoneRange> _zoneRanges; + + // Set of all zones defined for this collection + std::set<std::string> _allTags; +}; + +class BalancerPolicy { +public: + /** + * Determines whether a shard with the specified utilization statistics would be able to accept + * a chunk with the specified tag. According to the policy, a shard cannot accept chunks if its + * size is maxed out, if it is draining, or if the chunk's tag conflicts with the tags of the + * shard. + */ + static Status isShardSuitableReceiver(const ClusterStatistics::ShardStatistics& stat, + const std::string& chunkTag); + + /** + * Returns a suggested set of chunks to move within a collection's shards, given the specified + * state of the shards (draining, max size reached, etc) and the number of chunks for that + * collection. If the policy doesn't recommend anything to move, it returns an empty vector. The + * entries in the vector are all for separate source/destination shards and as such do not + * need to be executed serially; they can be scheduled in parallel. + * + * The balancing logic calculates the optimum number of chunks per shard for each zone and if + * any of the shards have a chunk count sufficiently higher than this number, suggests + * moving chunks to shards that are under this number. + * + * The shouldAggressivelyBalance parameter causes the threshold for chunk count disparity + * between shards to be lowered. + */ + static std::vector<MigrateInfo> balance(const ShardStatisticsVector& shardStats, + const DistributionStatus& distribution, + bool shouldAggressivelyBalance); + + /** + * Using the specified distribution information, returns a suggested better location for the + * specified chunk if one is available. + */ + static boost::optional<MigrateInfo> balanceSingleChunk(const ChunkType& chunk, + const ShardStatisticsVector& shardStats, + const DistributionStatus& distribution); + +private: + /** + * Returns the shard which is a suitable receiver for chunks with the specified tag and has the + * least number of chunks. If the tag is empty, considers all shards. + */ + static ShardId _getLeastLoadedReceiverShard(const ShardStatisticsVector& shardStats, + const DistributionStatus& distribution, + const std::string& tag, + const std::set<ShardId>& excludedShards); + + /** + * Returns the shard which has the most chunks with the specified tag. If the tag is + * empty, considers all chunks. + */ + static ShardId _getMostOverloadedShard(const ShardStatisticsVector& shardStats, + const DistributionStatus& distribution, + const std::string& chunkTag, + const std::set<ShardId>& excludedShards); + + /** + * Selects one chunk for the specified zone (if appropriate) to be moved in order to bring the + * deviation of the shards' chunk counts closer to even across all shards in the specified + * zone.
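+ * + * Worked example (illustrative): with two shards carrying a total of 7 chunks for one zone, + * the ideal per-shard count is ceil(7 / 2) = 4; a donor holding 6 chunks then has an + * imbalance of 6 - 4 = 2, which meets the default threshold of 2 and produces a migration.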
+ * Takes into account the shards which have already been used for migrations. + * + * Returns true if a migration was suggested, false otherwise. This method is intended to be + * called multiple times until all possible migrations for a zone have been selected. + */ + static bool _singleZoneBalance(const ShardStatisticsVector& shardStats, + const DistributionStatus& distribution, + const std::string& tag, + size_t imbalanceThreshold, + std::vector<MigrateInfo>* migrations, + std::set<ShardId>* usedShards); +}; + +} // namespace mongo diff --git a/src/mongo/s/balancer/balancer_policy_tests.cpp b/src/mongo/s/balancer/balancer_policy_tests.cpp new file mode 100644 index 00000000000..b6024c8f1d5 --- /dev/null +++ b/src/mongo/s/balancer/balancer_policy_tests.cpp @@ -0,0 +1,608 @@ +/** + * Copyright (C) 2012-2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault + +#include "mongo/platform/basic.h" + +#include "mongo/platform/random.h" +#include "mongo/s/balancer/balancer_policy.h" +#include "mongo/s/catalog/type_chunk.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace { + +using std::map; +using std::string; +using std::stringstream; +using std::vector; + +using ShardStatistics = ClusterStatistics::ShardStatistics; + +const auto emptyTagSet = std::set<std::string>(); +const std::string emptyShardVersion = ""; +const auto kShardId0 = ShardId("shard0"); +const auto kShardId1 = ShardId("shard1"); +const auto kShardId2 = ShardId("shard2"); +const auto kShardId3 = ShardId("shard3"); +const NamespaceString kNamespace("TestDB", "TestColl"); +const uint64_t kNoMaxSize = 0; + +/** + * Constructs a shard statistics vector and a consistent mapping of chunks to shards given the + * specified input parameters. The generated chunks have an ever-increasing min value. I.e., they + * will be in the form: + * + * [MinKey, 1), [1, 2), [2, 3) ... 
[N - 1, MaxKey) + */ +std::pair<ShardStatisticsVector, ShardToChunksMap> generateCluster( + const vector<std::pair<ShardStatistics, size_t>>& shardsAndNumChunks) { + int64_t totalNumChunks = 0; + for (const auto& entry : shardsAndNumChunks) { + totalNumChunks += std::get<1>(entry); + } + + ShardToChunksMap chunkMap; + ShardStatisticsVector shardStats; + + int64_t currentChunk = 0; + + for (auto it = shardsAndNumChunks.begin(); it != shardsAndNumChunks.end(); it++) { + ShardStatistics shard = std::move(it->first); + const size_t numChunks = it->second; + + // Ensure that an entry is created + chunkMap[shard.shardId]; + + for (size_t i = 0; i < numChunks; i++, currentChunk++) { + ChunkType chunk; + chunk.setMin(currentChunk == 0 ? kMinBSONKey : BSON("x" << currentChunk)); + chunk.setMax(currentChunk == totalNumChunks - 1 ? kMaxBSONKey + : BSON("x" << currentChunk + 1)); + chunk.setShard(shard.shardId); + + chunkMap[shard.shardId].push_back(std::move(chunk)); + } + + shardStats.push_back(std::move(shard)); + } + + return std::make_pair(std::move(shardStats), std::move(chunkMap)); +} + +TEST(BalancerPolicy, Basic) { + auto cluster = generateCluster( + {{ShardStatistics(kShardId0, kNoMaxSize, 4, false, emptyTagSet, emptyShardVersion), 4}, + {ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}, + {ShardStatistics(kShardId2, kNoMaxSize, 3, false, emptyTagSet, emptyShardVersion), 3}}); + + const auto migrations(BalancerPolicy::balance( + cluster.first, DistributionStatus(kNamespace, cluster.second), false)); + ASSERT_EQ(1U, migrations.size()); + ASSERT_EQ(kShardId0, migrations[0].from); + ASSERT_EQ(kShardId1, migrations[0].to); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMin(), migrations[0].minKey); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), migrations[0].maxKey); +} + +TEST(BalancerPolicy, SmallClusterShouldBePerfectlyBalanced) { + auto cluster = generateCluster( + {{ShardStatistics(kShardId0, kNoMaxSize, 1, false, emptyTagSet, emptyShardVersion), 1}, + {ShardStatistics(kShardId1, kNoMaxSize, 2, false, emptyTagSet, emptyShardVersion), 2}, + {ShardStatistics(kShardId2, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}}); + + const auto migrations(BalancerPolicy::balance( + cluster.first, DistributionStatus(kNamespace, cluster.second), false)); + ASSERT_EQ(1U, migrations.size()); + ASSERT_EQ(kShardId1, migrations[0].from); + ASSERT_EQ(kShardId2, migrations[0].to); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId1][0].getMin(), migrations[0].minKey); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId1][0].getMax(), migrations[0].maxKey); +} + +TEST(BalancerPolicy, SingleChunkShouldNotMove) { + auto cluster = generateCluster( + {{ShardStatistics(kShardId0, kNoMaxSize, 1, false, emptyTagSet, emptyShardVersion), 1}, + {ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}}); + + ASSERT( + BalancerPolicy::balance(cluster.first, DistributionStatus(kNamespace, cluster.second), true) + .empty()); + ASSERT(BalancerPolicy::balance( + cluster.first, DistributionStatus(kNamespace, cluster.second), false) + .empty()); +} + +TEST(BalancerPolicy, BalanceThresholdObeyed) { + auto cluster = generateCluster( + {{ShardStatistics(kShardId0, kNoMaxSize, 2, false, emptyTagSet, emptyShardVersion), 2}, + {ShardStatistics(kShardId1, kNoMaxSize, 2, false, emptyTagSet, emptyShardVersion), 2}, + {ShardStatistics(kShardId2, kNoMaxSize, 1, false, emptyTagSet, emptyShardVersion), 1}, + {ShardStatistics(kShardId3, kNoMaxSize, 1, false, 
emptyTagSet, emptyShardVersion), 1}}); + + ASSERT( + BalancerPolicy::balance(cluster.first, DistributionStatus(kNamespace, cluster.second), true) + .empty()); + ASSERT(BalancerPolicy::balance( + cluster.first, DistributionStatus(kNamespace, cluster.second), false) + .empty()); +} + +TEST(BalancerPolicy, ParallelBalancing) { + auto cluster = generateCluster( + {{ShardStatistics(kShardId0, kNoMaxSize, 4, false, emptyTagSet, emptyShardVersion), 4}, + {ShardStatistics(kShardId1, kNoMaxSize, 4, false, emptyTagSet, emptyShardVersion), 4}, + {ShardStatistics(kShardId2, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}, + {ShardStatistics(kShardId3, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}}); + + const auto migrations(BalancerPolicy::balance( + cluster.first, DistributionStatus(kNamespace, cluster.second), false)); + ASSERT_EQ(2U, migrations.size()); + + ASSERT_EQ(kShardId0, migrations[0].from); + ASSERT_EQ(kShardId2, migrations[0].to); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMin(), migrations[0].minKey); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), migrations[0].maxKey); + + ASSERT_EQ(kShardId1, migrations[1].from); + ASSERT_EQ(kShardId3, migrations[1].to); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId1][0].getMin(), migrations[1].minKey); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId1][0].getMax(), migrations[1].maxKey); +} + +TEST(BalancerPolicy, JumboChunksNotMoved) { + auto cluster = generateCluster( + {{ShardStatistics(kShardId0, kNoMaxSize, 2, false, emptyTagSet, emptyShardVersion), 4}, + {ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}}); + + cluster.second[kShardId0][0].setJumbo(true); + cluster.second[kShardId0][1].setJumbo(false); // Only chunk 1 is not jumbo + cluster.second[kShardId0][2].setJumbo(true); + cluster.second[kShardId0][3].setJumbo(true); + + const auto migrations(BalancerPolicy::balance( + cluster.first, DistributionStatus(kNamespace, cluster.second), false)); + ASSERT_EQ(1U, migrations.size()); + ASSERT_EQ(kShardId0, migrations[0].from); + ASSERT_EQ(kShardId1, migrations[0].to); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][1].getMin(), migrations[0].minKey); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][1].getMax(), migrations[0].maxKey); +} + +TEST(BalancerPolicy, JumboChunksNotMovedParallel) { + auto cluster = generateCluster( + {{ShardStatistics(kShardId0, kNoMaxSize, 2, false, emptyTagSet, emptyShardVersion), 4}, + {ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}, + {ShardStatistics(kShardId2, kNoMaxSize, 2, false, emptyTagSet, emptyShardVersion), 4}, + {ShardStatistics(kShardId3, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}}); + + cluster.second[kShardId0][0].setJumbo(true); + cluster.second[kShardId0][1].setJumbo(false); // Only chunk 1 is not jumbo + cluster.second[kShardId0][2].setJumbo(true); + cluster.second[kShardId0][3].setJumbo(true); + + cluster.second[kShardId2][0].setJumbo(true); + cluster.second[kShardId2][1].setJumbo(true); + cluster.second[kShardId2][2].setJumbo(false); // Only chunk 2 is not jumbo + cluster.second[kShardId2][3].setJumbo(true); + + const auto migrations(BalancerPolicy::balance( + cluster.first, DistributionStatus(kNamespace, cluster.second), false)); + ASSERT_EQ(2U, migrations.size()); + + ASSERT_EQ(kShardId0, migrations[0].from); + ASSERT_EQ(kShardId1, migrations[0].to); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][1].getMin(), migrations[0].minKey); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][1].getMax(), migrations[0].maxKey);
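+ + // The policy walks each donor's chunks in order and selects the first non-jumbo one: + // chunk 1 on shard0 and chunk 2 on shard2.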
+ + ASSERT_EQ(kShardId2, migrations[1].from); + ASSERT_EQ(kShardId3, migrations[1].to); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][2].getMin(), migrations[1].minKey); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][2].getMax(), migrations[1].maxKey); +} + +TEST(BalancerPolicy, DrainingSingleChunk) { + // shard0 is draining and chunks will go to shard1, even though it has a lot more chunks + auto cluster = generateCluster( + {{ShardStatistics(kShardId0, kNoMaxSize, 2, true, emptyTagSet, emptyShardVersion), 1}, + {ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 5}}); + + const auto migrations(BalancerPolicy::balance( + cluster.first, DistributionStatus(kNamespace, cluster.second), false)); + ASSERT_EQ(1U, migrations.size()); + ASSERT_EQ(kShardId0, migrations[0].from); + ASSERT_EQ(kShardId1, migrations[0].to); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMin(), migrations[0].minKey); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), migrations[0].maxKey); +} + +TEST(BalancerPolicy, DrainingSingleChunkPerShard) { + // shard0 and shard2 are draining and chunks will go to shard1 and shard3 in parallel + auto cluster = generateCluster( + {{ShardStatistics(kShardId0, kNoMaxSize, 2, true, emptyTagSet, emptyShardVersion), 1}, + {ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 5}, + {ShardStatistics(kShardId2, kNoMaxSize, 2, true, emptyTagSet, emptyShardVersion), 1}, + {ShardStatistics(kShardId3, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 5}}); + + const auto migrations(BalancerPolicy::balance( + cluster.first, DistributionStatus(kNamespace, cluster.second), false)); + ASSERT_EQ(2U, migrations.size()); + + ASSERT_EQ(kShardId0, migrations[0].from); + ASSERT_EQ(kShardId1, migrations[0].to); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMin(), migrations[0].minKey); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), migrations[0].maxKey); + + ASSERT_EQ(kShardId2, migrations[1].from); + ASSERT_EQ(kShardId3, migrations[1].to); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMin(), migrations[1].minKey); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMax(), migrations[1].maxKey); +} + +TEST(BalancerPolicy, DrainingWithTwoChunksFirstOneSelected) { + // shard0 is draining and chunks will go to shard1, even though it has a lot more chunks + auto cluster = generateCluster( + {{ShardStatistics(kShardId0, kNoMaxSize, 2, true, emptyTagSet, emptyShardVersion), 2}, + {ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 5}}); + + const auto migrations(BalancerPolicy::balance( + cluster.first, DistributionStatus(kNamespace, cluster.second), false)); + ASSERT_EQ(1U, migrations.size()); + ASSERT_EQ(kShardId0, migrations[0].from); + ASSERT_EQ(kShardId1, migrations[0].to); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMin(), migrations[0].minKey); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), migrations[0].maxKey); +} + +TEST(BalancerPolicy, DrainingMultipleShardsFirstOneSelected) { + // shard0 and shard1 are both draining with very few chunks in them and chunks will go to + // shard2, even though it has a lot more chunks than the other two + auto cluster = generateCluster( + {{ShardStatistics(kShardId0, kNoMaxSize, 5, true, emptyTagSet, emptyShardVersion), 1}, + {ShardStatistics(kShardId1, kNoMaxSize, 5, true, emptyTagSet, emptyShardVersion), 2}, + 
{ShardStatistics(kShardId2, kNoMaxSize, 5, false, emptyTagSet, emptyShardVersion), 16}}); + + const auto migrations(BalancerPolicy::balance( + cluster.first, DistributionStatus(kNamespace, cluster.second), false)); + ASSERT_EQ(1U, migrations.size()); + ASSERT_EQ(kShardId0, migrations[0].from); + ASSERT_EQ(kShardId2, migrations[0].to); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMin(), migrations[0].minKey); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), migrations[0].maxKey); +} + +TEST(BalancerPolicy, DrainingMultipleShardsWontAcceptChunks) { + // shard0 has many chunks, but can't move them to shard1 or shard2 because they are draining + auto cluster = generateCluster( + {{ShardStatistics(kShardId0, kNoMaxSize, 2, false, emptyTagSet, emptyShardVersion), 4}, + {ShardStatistics(kShardId1, kNoMaxSize, 0, true, emptyTagSet, emptyShardVersion), 0}, + {ShardStatistics(kShardId2, kNoMaxSize, 0, true, emptyTagSet, emptyShardVersion), 0}}); + + const auto migrations(BalancerPolicy::balance( + cluster.first, DistributionStatus(kNamespace, cluster.second), false)); + ASSERT(migrations.empty()); +} + +TEST(BalancerPolicy, DrainingSingleAppropriateShardFoundDueToTag) { + auto cluster = generateCluster( + {{ShardStatistics(kShardId0, kNoMaxSize, 2, false, {"NYC"}, emptyShardVersion), 4}, + {ShardStatistics(kShardId1, kNoMaxSize, 2, false, {"LAX"}, emptyShardVersion), 4}, + {ShardStatistics(kShardId2, kNoMaxSize, 1, true, {"LAX"}, emptyShardVersion), 1}}); + + DistributionStatus distribution(kNamespace, cluster.second); + ASSERT_OK(distribution.addRangeToZone(ZoneRange( + cluster.second[kShardId2][0].getMin(), cluster.second[kShardId2][0].getMax(), "LAX"))); + + const auto migrations(BalancerPolicy::balance(cluster.first, distribution, false)); + ASSERT_EQ(1U, migrations.size()); + ASSERT_EQ(kShardId2, migrations[0].from); + ASSERT_EQ(kShardId1, migrations[0].to); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMin(), migrations[0].minKey); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMax(), migrations[0].maxKey); +} + +TEST(BalancerPolicy, DrainingNoAppropriateShardsFoundDueToTag) { + auto cluster = generateCluster( + {{ShardStatistics(kShardId0, kNoMaxSize, 2, false, {"NYC"}, emptyShardVersion), 4}, + {ShardStatistics(kShardId1, kNoMaxSize, 2, false, {"LAX"}, emptyShardVersion), 4}, + {ShardStatistics(kShardId2, kNoMaxSize, 1, true, {"SEA"}, emptyShardVersion), 1}}); + + DistributionStatus distribution(kNamespace, cluster.second); + ASSERT_OK(distribution.addRangeToZone(ZoneRange( + cluster.second[kShardId2][0].getMin(), cluster.second[kShardId2][0].getMax(), "SEA"))); + + const auto migrations(BalancerPolicy::balance(cluster.first, distribution, false)); + ASSERT(migrations.empty()); +} + +TEST(BalancerPolicy, NoBalancingDueToAllNodesEitherDrainingOrMaxedOut) { + // shard0 and shard2 are draining, shard1 is maxed out + auto cluster = generateCluster( + {{ShardStatistics(kShardId0, kNoMaxSize, 2, true, emptyTagSet, emptyShardVersion), 1}, + {ShardStatistics(kShardId1, 1, 1, false, emptyTagSet, emptyShardVersion), 6}, + {ShardStatistics(kShardId2, kNoMaxSize, 1, true, emptyTagSet, emptyShardVersion), 1}}); + + const auto migrations(BalancerPolicy::balance( + cluster.first, DistributionStatus(kNamespace, cluster.second), false)); + ASSERT(migrations.empty()); +} + +TEST(BalancerPolicy, BalancerRespectsMaxShardSizeOnlyBalanceToNonMaxed) { + // Note that maxSize of shard0 is 1, and it is therefore overloaded with currSize = 3. Other + // shards have maxSize = 0 = unset. 
Even though the overloaded shard has the least number of + // less chunks, we shouldn't move chunks to that shard. + auto cluster = generateCluster( + {{ShardStatistics(kShardId0, 1, 3, false, emptyTagSet, emptyShardVersion), 2}, + {ShardStatistics(kShardId1, kNoMaxSize, 4, false, emptyTagSet, emptyShardVersion), 4}, + {ShardStatistics(kShardId2, kNoMaxSize, 6, false, emptyTagSet, emptyShardVersion), 6}}); + + const auto migrations(BalancerPolicy::balance( + cluster.first, DistributionStatus(kNamespace, cluster.second), false)); + ASSERT_EQ(1U, migrations.size()); + ASSERT_EQ(kShardId2, migrations[0].from); + ASSERT_EQ(kShardId1, migrations[0].to); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMin(), migrations[0].minKey); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMax(), migrations[0].maxKey); +} + +TEST(BalancerPolicy, BalancerRespectsMaxShardSizeWhenAllBalanced) { + // Note that maxSize of shard0 is 1, and it is therefore overloaded with currSize = 4. Other + // shards have maxSize = 0 = unset. We check that being over the maxSize is NOT equivalent to + // draining, we don't want to empty shards for no other reason than they are over this limit. + auto cluster = generateCluster( + {{ShardStatistics(kShardId0, 1, 4, false, emptyTagSet, emptyShardVersion), 4}, + {ShardStatistics(kShardId1, kNoMaxSize, 4, false, emptyTagSet, emptyShardVersion), 4}, + {ShardStatistics(kShardId2, kNoMaxSize, 4, false, emptyTagSet, emptyShardVersion), 4}}); + + const auto migrations(BalancerPolicy::balance( + cluster.first, DistributionStatus(kNamespace, cluster.second), false)); + ASSERT(migrations.empty()); +} + +TEST(BalancerPolicy, BalancerRespectsTagsWhenDraining) { + // shard1 drains the proper chunk to shard0, even though it is more loaded than shard2 + auto cluster = generateCluster( + {{ShardStatistics(kShardId0, kNoMaxSize, 5, false, {"a"}, emptyShardVersion), 6}, + {ShardStatistics(kShardId1, kNoMaxSize, 5, true, {"a", "b"}, emptyShardVersion), 2}, + {ShardStatistics(kShardId2, kNoMaxSize, 5, false, {"b"}, emptyShardVersion), 2}}); + + DistributionStatus distribution(kNamespace, cluster.second); + ASSERT_OK(distribution.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 7), "a"))); + ASSERT_OK(distribution.addRangeToZone(ZoneRange(BSON("x" << 8), kMaxBSONKey, "b"))); + + const auto migrations(BalancerPolicy::balance(cluster.first, distribution, false)); + ASSERT_EQ(1U, migrations.size()); + ASSERT_EQ(kShardId1, migrations[0].from); + ASSERT_EQ(kShardId0, migrations[0].to); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId1][0].getMin(), migrations[0].minKey); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId1][0].getMax(), migrations[0].maxKey); +} + +TEST(BalancerPolicy, BalancerRespectsTagPolicyBeforeImbalance) { + // There is a large imbalance between shard0 and shard1, but the balancer must first fix the + // chunks, which are on a wrong shard due to tag policy + auto cluster = generateCluster( + {{ShardStatistics(kShardId0, kNoMaxSize, 5, false, {"a"}, emptyShardVersion), 2}, + {ShardStatistics(kShardId1, kNoMaxSize, 5, false, {"a"}, emptyShardVersion), 6}, + {ShardStatistics(kShardId2, kNoMaxSize, 5, false, emptyTagSet, emptyShardVersion), 2}}); + + DistributionStatus distribution(kNamespace, cluster.second); + ASSERT_OK(distribution.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 100), "a"))); + + const auto migrations(BalancerPolicy::balance(cluster.first, distribution, false)); + ASSERT_EQ(1U, migrations.size()); + ASSERT_EQ(kShardId2, migrations[0].from); + ASSERT_EQ(kShardId0, 
+ ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMin(), migrations[0].minKey); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMax(), migrations[0].maxKey); +} + +TEST(BalancerPolicy, BalancerFixesIncorrectTagsInOtherwiseBalancedCluster) { + // Chunks are balanced across shards, but there are wrong tags, which need to be fixed + auto cluster = generateCluster( + {{ShardStatistics(kShardId0, kNoMaxSize, 5, false, {"a"}, emptyShardVersion), 3}, + {ShardStatistics(kShardId1, kNoMaxSize, 5, false, {"a"}, emptyShardVersion), 3}, + {ShardStatistics(kShardId2, kNoMaxSize, 5, false, emptyTagSet, emptyShardVersion), 3}}); + + DistributionStatus distribution(kNamespace, cluster.second); + ASSERT_OK(distribution.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 10), "a"))); + + const auto migrations(BalancerPolicy::balance(cluster.first, distribution, false)); + ASSERT_EQ(1U, migrations.size()); + ASSERT_EQ(kShardId2, migrations[0].from); + ASSERT_EQ(kShardId0, migrations[0].to); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMin(), migrations[0].minKey); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMax(), migrations[0].maxKey); +} + +TEST(BalancerPolicy, BalancerFixesIncorrectTagsInOtherwiseBalancedClusterParallel) { + // Chunks are balanced across shards, but there are wrong tags, which need to be fixed + auto cluster = generateCluster( + {{ShardStatistics(kShardId0, kNoMaxSize, 5, false, {"a"}, emptyShardVersion), 3}, + {ShardStatistics(kShardId1, kNoMaxSize, 5, false, {"a"}, emptyShardVersion), 3}, + {ShardStatistics(kShardId2, kNoMaxSize, 5, false, emptyTagSet, emptyShardVersion), 3}, + {ShardStatistics(kShardId3, kNoMaxSize, 5, false, emptyTagSet, emptyShardVersion), 3}}); + + DistributionStatus distribution(kNamespace, cluster.second); + ASSERT_OK(distribution.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 20), "a"))); + + const auto migrations(BalancerPolicy::balance(cluster.first, distribution, false)); + ASSERT_EQ(2U, migrations.size()); + + ASSERT_EQ(kShardId2, migrations[0].from); + ASSERT_EQ(kShardId0, migrations[0].to); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMin(), migrations[0].minKey); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMax(), migrations[0].maxKey); + + ASSERT_EQ(kShardId3, migrations[1].from); + ASSERT_EQ(kShardId1, migrations[1].to); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId3][0].getMin(), migrations[1].minKey); + ASSERT_BSONOBJ_EQ(cluster.second[kShardId3][0].getMax(), migrations[1].maxKey); +} + +TEST(DistributionStatus, AddTagRangeOverlap) { + DistributionStatus d(kNamespace, ShardToChunksMap{}); + + // Note that there is a gap between 10 and 20 for which there is no tag + ASSERT_OK(d.addRangeToZone(ZoneRange(BSON("x" << 1), BSON("x" << 10), "a"))); + ASSERT_OK(d.addRangeToZone(ZoneRange(BSON("x" << 20), BSON("x" << 30), "b"))); + + ASSERT_EQ(ErrorCodes::RangeOverlapConflict, + d.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 2), "d"))); + ASSERT_EQ(ErrorCodes::RangeOverlapConflict, + d.addRangeToZone(ZoneRange(BSON("x" << -1), BSON("x" << 5), "d"))); + ASSERT_EQ(ErrorCodes::RangeOverlapConflict, + d.addRangeToZone(ZoneRange(BSON("x" << 5), BSON("x" << 9), "d"))); + ASSERT_EQ(ErrorCodes::RangeOverlapConflict, + d.addRangeToZone(ZoneRange(BSON("x" << 1), BSON("x" << 10), "d"))); + ASSERT_EQ(ErrorCodes::RangeOverlapConflict, + d.addRangeToZone(ZoneRange(BSON("x" << 5), BSON("x" << 25), "d"))); + ASSERT_EQ(ErrorCodes::RangeOverlapConflict, + d.addRangeToZone(ZoneRange(BSON("x" << -1), BSON("x" << 32), "d")));
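+ + // A range that overlaps only the tail of an existing range conflicts as well: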
+ ASSERT_EQ(ErrorCodes::RangeOverlapConflict, + d.addRangeToZone(ZoneRange(BSON("x" << 25), kMaxBSONKey, "d"))); +} + +TEST(DistributionStatus, ChunkTagsSelectorWithRegularKeys) { + DistributionStatus d(kNamespace, ShardToChunksMap{}); + + ASSERT_OK(d.addRangeToZone(ZoneRange(BSON("x" << 1), BSON("x" << 10), "a"))); + ASSERT_OK(d.addRangeToZone(ZoneRange(BSON("x" << 10), BSON("x" << 20), "b"))); + ASSERT_OK(d.addRangeToZone(ZoneRange(BSON("x" << 20), BSON("x" << 30), "c"))); + + { + ChunkType chunk; + chunk.setMin(kMinBSONKey); + chunk.setMax(BSON("x" << 1)); + ASSERT_EQUALS("", d.getTagForChunk(chunk)); + } + + { + ChunkType chunk; + chunk.setMin(BSON("x" << 0)); + chunk.setMax(BSON("x" << 1)); + ASSERT_EQUALS("", d.getTagForChunk(chunk)); + } + + { + ChunkType chunk; + chunk.setMin(BSON("x" << 1)); + chunk.setMax(BSON("x" << 5)); + ASSERT_EQUALS("a", d.getTagForChunk(chunk)); + } + + { + ChunkType chunk; + chunk.setMin(BSON("x" << 10)); + chunk.setMax(BSON("x" << 20)); + ASSERT_EQUALS("b", d.getTagForChunk(chunk)); + } + + { + ChunkType chunk; + chunk.setMin(BSON("x" << 15)); + chunk.setMax(BSON("x" << 20)); + ASSERT_EQUALS("b", d.getTagForChunk(chunk)); + } + + { + ChunkType chunk; + chunk.setMin(BSON("x" << 25)); + chunk.setMax(BSON("x" << 30)); + ASSERT_EQUALS("c", d.getTagForChunk(chunk)); + } + + { + ChunkType chunk; + chunk.setMin(BSON("x" << 35)); + chunk.setMax(BSON("x" << 40)); + ASSERT_EQUALS("", d.getTagForChunk(chunk)); + } + + { + ChunkType chunk; + chunk.setMin(BSON("x" << 40)); + chunk.setMax(kMaxBSONKey); + ASSERT_EQUALS("", d.getTagForChunk(chunk)); + } +} + +TEST(DistributionStatus, ChunkTagsSelectorWithMinMaxKeys) { + DistributionStatus d(kNamespace, ShardToChunksMap{}); + + ASSERT_OK(d.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << -100), "a"))); + ASSERT_OK(d.addRangeToZone(ZoneRange(BSON("x" << -10), BSON("x" << 10), "b"))); + ASSERT_OK(d.addRangeToZone(ZoneRange(BSON("x" << 100), kMaxBSONKey, "c"))); + + { + ChunkType chunk; + chunk.setMin(kMinBSONKey); + chunk.setMax(BSON("x" << -100)); + ASSERT_EQUALS("a", d.getTagForChunk(chunk)); + } + + { + ChunkType chunk; + chunk.setMin(BSON("x" << -100)); + chunk.setMax(BSON("x" << -11)); + ASSERT_EQUALS("", d.getTagForChunk(chunk)); + } + + { + ChunkType chunk; + chunk.setMin(BSON("x" << -10)); + chunk.setMax(BSON("x" << 0)); + ASSERT_EQUALS("b", d.getTagForChunk(chunk)); + } + + { + ChunkType chunk; + chunk.setMin(BSON("x" << 0)); + chunk.setMax(BSON("x" << 10)); + ASSERT_EQUALS("b", d.getTagForChunk(chunk)); + } + + { + ChunkType chunk; + chunk.setMin(BSON("x" << 10)); + chunk.setMax(BSON("x" << 20)); + ASSERT_EQUALS("", d.getTagForChunk(chunk)); + } + + { + ChunkType chunk; + chunk.setMin(BSON("x" << 200)); + chunk.setMax(kMaxBSONKey); + ASSERT_EQUALS("c", d.getTagForChunk(chunk)); + } +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/s/balancer/cluster_statistics.cpp b/src/mongo/s/balancer/cluster_statistics.cpp new file mode 100644 index 00000000000..d42994379ef --- /dev/null +++ b/src/mongo/s/balancer/cluster_statistics.cpp @@ -0,0 +1,80 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/s/balancer/cluster_statistics.h" + +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" + +namespace mongo { + +ClusterStatistics::ClusterStatistics() = default; + +ClusterStatistics::~ClusterStatistics() = default; + +ClusterStatistics::ShardStatistics::ShardStatistics() = default; + +ClusterStatistics::ShardStatistics::ShardStatistics(ShardId inShardId, + uint64_t inMaxSizeMB, + uint64_t inCurrSizeMB, + bool inIsDraining, + std::set<std::string> inShardTags, + std::string inMongoVersion) + : shardId(std::move(inShardId)), + maxSizeMB(std::move(inMaxSizeMB)), + currSizeMB(std::move(inCurrSizeMB)), + isDraining(std::move(inIsDraining)), + shardTags(std::move(inShardTags)), + mongoVersion(std::move(inMongoVersion)) {} + +bool ClusterStatistics::ShardStatistics::isSizeMaxed() const { + if (!maxSizeMB || !currSizeMB) { + return false; + } + + return currSizeMB >= maxSizeMB; +} + +BSONObj ClusterStatistics::ShardStatistics::toBSON() const { + BSONObjBuilder builder; + builder.append("id", shardId.toString()); + builder.append("maxSizeMB", static_cast<long long>(maxSizeMB)); + builder.append("currSizeMB", static_cast<long long>(currSizeMB)); + builder.append("draining", isDraining); + if (!shardTags.empty()) { + BSONArrayBuilder arrayBuilder(builder.subarrayStart("tags")); + arrayBuilder.append(shardTags); + } + + builder.append("version", mongoVersion); + return builder.obj(); +} + +} // namespace mongo diff --git a/src/mongo/s/balancer/cluster_statistics.h b/src/mongo/s/balancer/cluster_statistics.h new file mode 100644 index 00000000000..8963720ee6f --- /dev/null +++ b/src/mongo/s/balancer/cluster_statistics.h @@ -0,0 +1,110 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+#include <set>
+#include <string>
+#include <vector>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/s/client/shard.h"
+
+namespace mongo {
+
+class BSONObj;
+class OperationContext;
+template <typename T>
+class StatusWith;
+
+/**
+ * This interface serves as a means for obtaining data distribution and shard utilization
+ * statistics for the entire sharded cluster. Implementations may choose whatever means necessary
+ * to perform the statistics collection. There should be one instance of this object per service
+ * context.
+ */
+class ClusterStatistics {
+    MONGO_DISALLOW_COPYING(ClusterStatistics);
+
+public:
+    /**
+     * Structure describing the statistics of a single shard host.
+     */
+    struct ShardStatistics {
+    public:
+        ShardStatistics();
+        ShardStatistics(ShardId shardId,
+                        uint64_t maxSizeMB,
+                        uint64_t currSizeMB,
+                        bool isDraining,
+                        std::set<std::string> shardTags,
+                        std::string mongoVersion);
+
+        /**
+         * Returns true if the shard cannot receive any new chunks because it has reached the
+         * per-shard data size limit.
+         */
+        bool isSizeMaxed() const;
+
+        /**
+         * Returns a BSON representation of this shard's statistics, for reporting purposes.
+         */
+        BSONObj toBSON() const;
+
+        // The id of the shard for which this statistic applies
+        ShardId shardId;
+
+        // The maximum size allowed for the shard
+        uint64_t maxSizeMB{0};
+
+        // The current size of the shard
+        uint64_t currSizeMB{0};
+
+        // Whether the shard is in draining mode
+        bool isDraining{false};
+
+        // Set of tags for the shard
+        std::set<std::string> shardTags;
+
+        // Version of the mongod which runs on this shard's primary
+        std::string mongoVersion;
+    };
+
+    virtual ~ClusterStatistics();
+
+    /**
+     * Retrieves a snapshot of the current shard utilization state. The implementation of this
+     * method may block if necessary in order to refresh its state or may return a cached value.
+     */
+    virtual StatusWith<std::vector<ShardStatistics>> getStats(OperationContext* txn) = 0;
+
+protected:
+    ClusterStatistics();
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/balancer/cluster_statistics_impl.cpp b/src/mongo/s/balancer/cluster_statistics_impl.cpp
new file mode 100644
index 00000000000..b6e734c6fc2
--- /dev/null
+++ b/src/mongo/s/balancer/cluster_statistics_impl.cpp
@@ -0,0 +1,160 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/balancer/cluster_statistics_impl.h"
+
+#include "mongo/base/status_with.h"
+#include "mongo/bson/util/bson_extract.h"
+#include "mongo/client/read_preference.h"
+#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/shard_util.h"
+#include "mongo/util/log.h"
+#include "mongo/util/mongoutils/str.h"
+
+namespace mongo {
+
+using std::string;
+using std::vector;
+
+namespace {
+
+const char kVersionField[] = "version";
+
+/**
+ * Executes the serverStatus command against the specified shard and obtains the version of the
+ * running MongoD service.
+ *
+ * Returns the MongoD version in string format or an error.
Known error codes are: + * ShardNotFound if shard by that id is not available on the registry + * NoSuchKey if the version could not be retrieved + */ +StatusWith<string> retrieveShardMongoDVersion(OperationContext* txn, ShardId shardId) { + auto shardRegistry = Grid::get(txn)->shardRegistry(); + auto shardStatus = shardRegistry->getShard(txn, shardId); + if (!shardStatus.isOK()) { + return shardStatus.getStatus(); + } + auto shard = shardStatus.getValue(); + + auto commandResponse = + shard->runCommandWithFixedRetryAttempts(txn, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + "admin", + BSON("serverStatus" << 1), + Shard::RetryPolicy::kIdempotent); + if (!commandResponse.isOK()) { + return commandResponse.getStatus(); + } + if (!commandResponse.getValue().commandStatus.isOK()) { + return commandResponse.getValue().commandStatus; + } + + BSONObj serverStatus = std::move(commandResponse.getValue().response); + + string version; + Status status = bsonExtractStringField(serverStatus, kVersionField, &version); + if (!status.isOK()) { + return status; + } + + return version; +} + +} // namespace + +using ShardStatistics = ClusterStatistics::ShardStatistics; + +ClusterStatisticsImpl::ClusterStatisticsImpl() = default; + +ClusterStatisticsImpl::~ClusterStatisticsImpl() = default; + +StatusWith<vector<ShardStatistics>> ClusterStatisticsImpl::getStats(OperationContext* txn) { + // Get a list of all the shards that are participating in this balance round along with any + // maximum allowed quotas and current utilization. We get the latter by issuing + // db.serverStatus() (mem.mapped) to all shards. + // + // TODO: skip unresponsive shards and mark information as stale. + auto shardsStatus = Grid::get(txn)->catalogClient(txn)->getAllShards( + txn, repl::ReadConcernLevel::kMajorityReadConcern); + if (!shardsStatus.isOK()) { + return shardsStatus.getStatus(); + } + + const vector<ShardType> shards(std::move(shardsStatus.getValue().value)); + + vector<ShardStatistics> stats; + + for (const auto& shard : shards) { + auto shardSizeStatus = shardutil::retrieveTotalShardSize(txn, shard.getName()); + if (!shardSizeStatus.isOK()) { + const Status& status = shardSizeStatus.getStatus(); + + return {status.code(), + str::stream() << "Unable to obtain shard utilization information for " + << shard.getName() + << " due to " + << status.reason()}; + } + + string mongoDVersion; + + auto mongoDVersionStatus = retrieveShardMongoDVersion(txn, shard.getName()); + if (mongoDVersionStatus.isOK()) { + mongoDVersion = std::move(mongoDVersionStatus.getValue()); + } else { + // Since the mongod version is only used for reporting, there is no need to fail the + // entire round if it cannot be retrieved, so just leave it empty + log() << "Unable to obtain shard version for " << shard.getName() + << causedBy(mongoDVersionStatus.getStatus()); + } + + std::set<string> shardTags; + + for (const auto& shardTag : shard.getTags()) { + shardTags.insert(shardTag); + } + + stats.emplace_back(shard.getName(), + shard.getMaxSizeMB(), + shardSizeStatus.getValue() / 1024 / 1024, + shard.getDraining(), + std::move(shardTags), + std::move(mongoDVersion)); + } + + return stats; +} + +} // namespace mongo diff --git a/src/mongo/s/balancer/cluster_statistics_impl.h b/src/mongo/s/balancer/cluster_statistics_impl.h new file mode 100644 index 00000000000..493c792a713 --- /dev/null +++ b/src/mongo/s/balancer/cluster_statistics_impl.h @@ -0,0 +1,48 @@ +/** + * Copyright (C) 2016 MongoDB Inc. 
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/s/balancer/cluster_statistics.h"
+
+namespace mongo {
+
+/**
+ * Default implementation for the cluster statistics gathering utility. Uses a blocking method to
+ * fetch the statistics and does not perform any caching. If any of the shards fails to report
+ * its statistics, the entire refresh fails.
+ */
+class ClusterStatisticsImpl final : public ClusterStatistics {
+public:
+    ClusterStatisticsImpl();
+    ~ClusterStatisticsImpl();
+
+    StatusWith<std::vector<ShardStatistics>> getStats(OperationContext* txn) override;
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/balancer/cluster_statistics_test.cpp b/src/mongo/s/balancer/cluster_statistics_test.cpp
new file mode 100644
index 00000000000..c9115b6dafd
--- /dev/null
+++ b/src/mongo/s/balancer/cluster_statistics_test.cpp
@@ -0,0 +1,51 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so.
If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/s/balancer/cluster_statistics.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +using ShardStatistics = ClusterStatistics::ShardStatistics; + +const auto emptyTagSet = std::set<std::string>(); + +TEST(ShardStatistics, SizeMaxedTest) { + ASSERT( + !ShardStatistics(ShardId("TestShardId"), 0, 0, false, emptyTagSet, "3.2.0").isSizeMaxed()); + ASSERT(!ShardStatistics(ShardId("TestShardId"), 100LL, 80LL, false, emptyTagSet, "3.2.0") + .isSizeMaxed()); + ASSERT(ShardStatistics(ShardId("TestShardId"), 100LL, 110LL, false, emptyTagSet, "3.2.0") + .isSizeMaxed()); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/s/balancer/migration_manager.cpp b/src/mongo/s/balancer/migration_manager.cpp new file mode 100644 index 00000000000..c5af288fb2f --- /dev/null +++ b/src/mongo/s/balancer/migration_manager.cpp @@ -0,0 +1,791 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. 
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/balancer/migration_manager.h"
+
+#include <memory>
+
+#include "mongo/bson/simple_bsonobj_comparator.h"
+#include "mongo/bson/util/bson_extract.h"
+#include "mongo/client/remote_command_targeter.h"
+#include "mongo/db/client.h"
+#include "mongo/executor/task_executor_pool.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/balancer/scoped_migration_request.h"
+#include "mongo/s/balancer/type_migration.h"
+#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/move_chunk_request.h"
+#include "mongo/s/sharding_raii.h"
+#include "mongo/util/log.h"
+#include "mongo/util/net/hostandport.h"
+#include "mongo/util/scopeguard.h"
+
+namespace mongo {
+
+using executor::RemoteCommandRequest;
+using executor::RemoteCommandResponse;
+using std::shared_ptr;
+using std::vector;
+using str::stream;
+
+namespace {
+
+const char kChunkTooBig[] = "chunkTooBig";
+const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
+                                                WriteConcernOptions::SyncMode::UNSET,
+                                                Seconds(15));
+
+/**
+ * Parses the specified asynchronous command response and converts it to a status to use as the
+ * outcome of an asynchronous migration command. This is necessary for two reasons:
+ * - Preserve backwards compatibility with 3.2 and earlier, where the move chunk command, instead
+ *   of returning a ChunkTooBig status, includes an extra field in the response.
+ * - Convert CallbackCanceled errors into BalancerInterrupted for the cases where the migration
+ *   manager is being stopped at replica set stepdown. This return code allows the mongos calling
+ *   logic to retry the operation on a new primary.
+ */
+Status extractMigrationStatusFromRemoteCommandResponse(const RemoteCommandResponse& response,
+                                                       bool isStopping) {
+    if (!response.isOK()) {
+        if (response.status == ErrorCodes::CallbackCanceled && isStopping) {
+            return {ErrorCodes::BalancerInterrupted,
+                    "Migration interrupted because the balancer is stopping"};
+        }
+
+        return response.status;
+    }
+
+    Status commandStatus = getStatusFromCommandResult(response.data);
+
+    if (!commandStatus.isOK()) {
+        bool chunkTooBig = false;
+        bsonExtractBooleanFieldWithDefault(response.data, kChunkTooBig, false, &chunkTooBig);
+        if (chunkTooBig) {
+            commandStatus = {ErrorCodes::ChunkTooBig, commandStatus.reason()};
+        }
+    }
+
+    return commandStatus;
+}
+
+/**
+ * Blocking call to acquire the distributed collection lock for the specified namespace.
+ */
+StatusWith<DistLockHandle> acquireDistLock(OperationContext* txn,
+                                           const OID& lockSessionID,
+                                           const NamespaceString& nss) {
+    const std::string whyMessage(stream() << "Migrating chunk(s) in collection " << nss.ns());
+
+    auto statusWithDistLockHandle =
+        Grid::get(txn)->catalogClient(txn)->getDistLockManager()->lockWithSessionID(
+            txn, nss.ns(), whyMessage, lockSessionID, DistLockManager::kSingleLockAttemptTimeout);
+
+    if (!statusWithDistLockHandle.isOK()) {
+        // If we get LockBusy while trying to acquire the collection distributed lock, this implies
+        // that a concurrent collection operation is running either on a 3.2 shard or on mongos.
+        // Convert it to ConflictingOperationInProgress to better indicate the error.
+        //
+        // In addition, the code which re-schedules parallel migrations serially for 3.2 shard
+        // compatibility uses the LockBusy code as a hint to do the reschedule.
+        const ErrorCodes::Error code = (statusWithDistLockHandle == ErrorCodes::LockBusy
+                                            ? ErrorCodes::ConflictingOperationInProgress
+                                            : statusWithDistLockHandle.getStatus().code());
+
+        return {code,
+                stream() << "Could not acquire collection lock for " << nss.ns()
+                         << " to migrate chunks, due to "
+                         << statusWithDistLockHandle.getStatus().reason()};
+    }
+
+    return std::move(statusWithDistLockHandle.getValue());
+}
+
+/**
+ * Returns whether the specified status is an error caused by stepdown of the primary config node
+ * currently running the balancer.
+ */
+bool isErrorDueToBalancerStepdown(Status status) {
+    return (status == ErrorCodes::BalancerInterrupted ||
+            status == ErrorCodes::InterruptedAtShutdown ||
+            status == ErrorCodes::InterruptedDueToReplStateChange ||
+            ErrorCodes::isShutdownError(status.code()));
+}
+
+} // namespace
+
+MigrationManager::MigrationManager(ServiceContext* serviceContext)
+    : _serviceContext(serviceContext), _lockSessionID(OID::gen()) {}
+
+MigrationManager::~MigrationManager() {
+    // The migration manager must be completely quiesced at destruction time
+    invariant(_activeMigrationsWithoutDistLock.empty());
+}
+
+MigrationStatuses MigrationManager::executeMigrationsForAutoBalance(
+    OperationContext* txn,
+    const vector<MigrateInfo>& migrateInfos,
+    uint64_t maxChunkSizeBytes,
+    const MigrationSecondaryThrottleOptions& secondaryThrottle,
+    bool waitForDelete) {
+
+    MigrationStatuses migrationStatuses;
+
+    vector<MigrateInfo> rescheduledMigrations;
+
+    {
+        std::map<MigrationIdentifier, ScopedMigrationRequest> scopedMigrationRequests;
+        vector<std::pair<shared_ptr<Notification<Status>>, MigrateInfo>> responses;
+
+        for (const auto& migrateInfo : migrateInfos) {
+            // Write a document to the config.migrations collection, in case this migration must be
+            // recovered by the Balancer. Fail if the chunk is already moving.
+            auto statusWithScopedMigrationRequest =
+                ScopedMigrationRequest::writeMigration(txn, migrateInfo);
+            if (!statusWithScopedMigrationRequest.isOK()) {
+                migrationStatuses.emplace(migrateInfo.getName(),
+                                          std::move(statusWithScopedMigrationRequest.getStatus()));
+                continue;
+            }
+            scopedMigrationRequests.emplace(migrateInfo.getName(),
+                                            std::move(statusWithScopedMigrationRequest.getValue()));
+
+            responses.emplace_back(_schedule(txn,
+                                             migrateInfo,
+                                             false,  // Config server takes the collection dist lock
+                                             maxChunkSizeBytes,
+                                             secondaryThrottle,
+                                             waitForDelete),
+                                   migrateInfo);
+        }
+
+        // Wait for all the scheduled migrations to complete and note the ones that failed with a
+        // LockBusy error code. These need to be executed serially, without the distributed lock
+        // being held by the config server, for backwards compatibility with 3.2 shards.
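+        //
+        // As an illustrative sketch (not part of the original change): if three migrations were
+        // scheduled and one shard answered LockBusy, the loop below records the two terminal
+        // statuses in 'migrationStatuses' and queues the LockBusy migration in
+        // 'rescheduledMigrations' for the serial 3.2-compatibility pass further down.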
+        for (auto& response : responses) {
+            auto notification = std::move(response.first);
+            auto migrateInfo = std::move(response.second);
+
+            Status responseStatus = notification->get();
+
+            if (responseStatus == ErrorCodes::LockBusy) {
+                rescheduledMigrations.emplace_back(std::move(migrateInfo));
+            } else {
+                if (isErrorDueToBalancerStepdown(responseStatus)) {
+                    auto it = scopedMigrationRequests.find(migrateInfo.getName());
+                    invariant(it != scopedMigrationRequests.end());
+                    it->second.keepDocumentOnDestruct();
+                }
+
+                migrationStatuses.emplace(migrateInfo.getName(), std::move(responseStatus));
+            }
+        }
+    }
+
+    // Schedule all 3.2 compatibility migrations sequentially
+    for (const auto& migrateInfo : rescheduledMigrations) {
+        // Write a document to the config.migrations collection, in case this migration must be
+        // recovered by the Balancer. Fail if the chunk is already moving.
+        auto statusWithScopedMigrationRequest =
+            ScopedMigrationRequest::writeMigration(txn, migrateInfo);
+        if (!statusWithScopedMigrationRequest.isOK()) {
+            migrationStatuses.emplace(migrateInfo.getName(),
+                                      std::move(statusWithScopedMigrationRequest.getStatus()));
+            continue;
+        }
+
+        Status responseStatus = _schedule(txn,
+                                          migrateInfo,
+                                          true,  // Shard takes the collection dist lock
+                                          maxChunkSizeBytes,
+                                          secondaryThrottle,
+                                          waitForDelete)
+                                    ->get();
+
+        if (isErrorDueToBalancerStepdown(responseStatus)) {
+            statusWithScopedMigrationRequest.getValue().keepDocumentOnDestruct();
+        }
+
+        migrationStatuses.emplace(migrateInfo.getName(), std::move(responseStatus));
+    }
+
+    invariant(migrationStatuses.size() == migrateInfos.size());
+
+    return migrationStatuses;
+}
+
+Status MigrationManager::executeManualMigration(
+    OperationContext* txn,
+    const MigrateInfo& migrateInfo,
+    uint64_t maxChunkSizeBytes,
+    const MigrationSecondaryThrottleOptions& secondaryThrottle,
+    bool waitForDelete) {
+    _waitForRecovery();
+
+    // Write a document to the config.migrations collection, in case this migration must be
+    // recovered by the Balancer. Fail if the chunk is already moving.
+    auto statusWithScopedMigrationRequest =
+        ScopedMigrationRequest::writeMigration(txn, migrateInfo);
+    if (!statusWithScopedMigrationRequest.isOK()) {
+        return statusWithScopedMigrationRequest.getStatus();
+    }
+
+    Status status = _schedule(txn,
+                              migrateInfo,
+                              false,  // Config server takes the collection dist lock
+                              maxChunkSizeBytes,
+                              secondaryThrottle,
+                              waitForDelete)
+                        ->get();
+
+    auto scopedCMStatus = ScopedChunkManager::getExisting(txn, NamespaceString(migrateInfo.ns));
+    if (!scopedCMStatus.isOK()) {
+        return scopedCMStatus.getStatus();
+    }
+
+    auto scopedCM = std::move(scopedCMStatus.getValue());
+    ChunkManager* const cm = scopedCM.cm();
+
+    auto chunk = cm->findIntersectingChunkWithSimpleCollation(txn, migrateInfo.minKey);
+    invariant(chunk);
+
+    // The order of the checks below matters: calls interrupted by a stepdown must still be able
+    // to join any migrations which may have already completed and are now running the
+    // waitForDelete step.
+    if (isErrorDueToBalancerStepdown(status)) {
+        statusWithScopedMigrationRequest.getValue().keepDocumentOnDestruct();
+
+        // We want the mongos to get a retriable error, and not have its replica set monitor
+        // interpret an error such as InterruptedDueToReplStateChange as referring to the config
+        // server when the error actually comes from the shard.
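+        // Note that only the error code is remapped here; the original reason text is preserved
+        // so the underlying cause stays visible to the caller.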
+ return {ErrorCodes::BalancerInterrupted, status.reason()}; + } else if (chunk->getShardId() == migrateInfo.to) { + // Regardless of the status, if the chunk's current shard matches the destination, deem the + // move as success. + return Status::OK(); + } + + return status; +} + +void MigrationManager::startRecoveryAndAcquireDistLocks(OperationContext* txn) { + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + invariant(_state == State::kStopped); + invariant(_migrationRecoveryMap.empty()); + _state = State::kRecovering; + } + + auto scopedGuard = MakeGuard([&] { + _migrationRecoveryMap.clear(); + _abandonActiveMigrationsAndEnableManager(txn); + }); + + // Load the active migrations from the config.migrations collection. + auto statusWithMigrationsQueryResponse = + Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + txn, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + NamespaceString(MigrationType::ConfigNS), + BSONObj(), + BSONObj(), + boost::none); + + if (!statusWithMigrationsQueryResponse.isOK()) { + warning() << "Unable to read config.migrations collection documents for balancer migration" + << " recovery. Abandoning balancer recovery." + << causedBy(redact(statusWithMigrationsQueryResponse.getStatus())); + return; + } + + for (const BSONObj& migration : statusWithMigrationsQueryResponse.getValue().docs) { + auto statusWithMigrationType = MigrationType::fromBSON(migration); + if (!statusWithMigrationType.isOK()) { + // The format of this migration document is incorrect. The balancer holds a distlock for + // this migration, but without parsing the migration document we cannot identify which + // distlock must be released. So we must release all distlocks. + warning() << "Unable to parse config.migrations document '" + << redact(migration.toString()) + << "' for balancer migration recovery. Abandoning balancer recovery." + << causedBy(redact(statusWithMigrationType.getStatus())); + return; + } + MigrateInfo migrateInfo = statusWithMigrationType.getValue().toMigrateInfo(); + + auto it = _migrationRecoveryMap.find(NamespaceString(migrateInfo.ns)); + if (it == _migrationRecoveryMap.end()) { + std::list<MigrateInfo> list; + it = _migrationRecoveryMap.insert(std::make_pair(NamespaceString(migrateInfo.ns), list)) + .first; + + // Reacquire the matching distributed lock for this namespace. + const std::string whyMessage(stream() << "Migrating chunk(s) in collection " + << redact(migrateInfo.ns)); + auto statusWithDistLockHandle = + Grid::get(txn) + ->catalogClient(txn) + ->getDistLockManager() + ->tryLockWithLocalWriteConcern(txn, migrateInfo.ns, whyMessage, _lockSessionID); + if (!statusWithDistLockHandle.isOK() && + statusWithDistLockHandle.getStatus() != ErrorCodes::LockBusy) { + // LockBusy is alright because that should mean a 3.2 shard has it for the active + // migration. + warning() << "Failed to acquire distributed lock for collection '" + << redact(migrateInfo.ns) + << "' during balancer recovery of an active migration. Abandoning" + << " balancer recovery." 
+ << causedBy(redact(statusWithDistLockHandle.getStatus())); + return; + } + } + + it->second.push_back(std::move(migrateInfo)); + } + + scopedGuard.Dismiss(); +} + +void MigrationManager::finishRecovery(OperationContext* txn, + uint64_t maxChunkSizeBytes, + const MigrationSecondaryThrottleOptions& secondaryThrottle, + bool waitForDelete) { + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + if (_state == State::kStopping) { + _migrationRecoveryMap.clear(); + return; + } + + // If recovery was abandoned in startRecovery, then there is no more to do. + if (_state == State::kEnabled) { + invariant(_migrationRecoveryMap.empty()); + return; + } + + invariant(_state == State::kRecovering); + } + + auto scopedGuard = MakeGuard([&] { + _migrationRecoveryMap.clear(); + _abandonActiveMigrationsAndEnableManager(txn); + }); + + // Schedule recovered migrations. + vector<ScopedMigrationRequest> scopedMigrationRequests; + vector<shared_ptr<Notification<Status>>> responses; + + for (auto& nssAndMigrateInfos : _migrationRecoveryMap) { + auto& nss = nssAndMigrateInfos.first; + auto& migrateInfos = nssAndMigrateInfos.second; + invariant(!migrateInfos.empty()); + + auto scopedCMStatus = ScopedChunkManager::getExisting(txn, nss); + if (!scopedCMStatus.isOK()) { + // This shouldn't happen because the collection was intact and sharded when the previous + // config primary was active and the dist locks have been held by the balancer + // throughout. Abort migration recovery. + warning() << "Unable to reload chunk metadata for collection '" << nss + << "' during balancer recovery. Abandoning recovery." + << causedBy(redact(scopedCMStatus.getStatus())); + return; + } + + auto scopedCM = std::move(scopedCMStatus.getValue()); + ChunkManager* const cm = scopedCM.cm(); + + int scheduledMigrations = 0; + + while (!migrateInfos.empty()) { + const auto migrationInfo = std::move(migrateInfos.front()); + migrateInfos.pop_front(); + + auto chunk = cm->findIntersectingChunkWithSimpleCollation(txn, migrationInfo.minKey); + invariant(chunk); + + if (chunk->getShardId() != migrationInfo.from) { + // Chunk is no longer on the source shard specified by this migration. Erase the + // migration recovery document associated with it. + ScopedMigrationRequest::createForRecovery(txn, nss, migrationInfo.minKey); + continue; + } + + scopedMigrationRequests.emplace_back( + ScopedMigrationRequest::createForRecovery(txn, nss, migrationInfo.minKey)); + + scheduledMigrations++; + + responses.emplace_back(_schedule(txn, + migrationInfo, + false, // Config server takes the collection dist lock + maxChunkSizeBytes, + secondaryThrottle, + waitForDelete)); + } + + // If no migrations were scheduled for this namespace, free the dist lock + if (!scheduledMigrations) { + Grid::get(txn)->catalogClient(txn)->getDistLockManager()->unlock( + txn, _lockSessionID, nss.ns()); + } + } + + _migrationRecoveryMap.clear(); + scopedGuard.Dismiss(); + + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + if (_state == State::kRecovering) { + _state = State::kEnabled; + _condVar.notify_all(); + } + } + + // Wait for each migration to finish, as usual. 
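+    // Each element of 'responses' is a shared_ptr<Notification<Status>>, so get() blocks until
+    // the corresponding recovered migration completes; the returned statuses are intentionally
+    // ignored here.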
+ for (auto& response : responses) { + response->get(); + } +} + +void MigrationManager::interruptAndDisableMigrations() { + executor::TaskExecutor* const executor = + Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor(); + + stdx::lock_guard<stdx::mutex> lock(_mutex); + invariant(_state == State::kEnabled || _state == State::kRecovering); + _state = State::kStopping; + + // Interrupt any active migrations with dist lock + for (auto& cmsEntry : _activeMigrationsWithDistLock) { + auto* cms = &cmsEntry.second; + + for (auto& migration : cms->migrations) { + if (migration.callbackHandle) { + executor->cancel(*migration.callbackHandle); + } + } + } + + // Interrupt any active migrations without dist lock + for (auto& migration : _activeMigrationsWithoutDistLock) { + if (migration.callbackHandle) { + executor->cancel(*migration.callbackHandle); + } + } + + _checkDrained_inlock(); +} + +void MigrationManager::drainActiveMigrations() { + stdx::unique_lock<stdx::mutex> lock(_mutex); + + if (_state == State::kStopped) + return; + invariant(_state == State::kStopping); + + _condVar.wait(lock, [this] { + return _activeMigrationsWithDistLock.empty() && _activeMigrationsWithoutDistLock.empty(); + }); + + _state = State::kStopped; +} + +shared_ptr<Notification<Status>> MigrationManager::_schedule( + OperationContext* txn, + const MigrateInfo& migrateInfo, + bool shardTakesCollectionDistLock, + uint64_t maxChunkSizeBytes, + const MigrationSecondaryThrottleOptions& secondaryThrottle, + bool waitForDelete) { + const NamespaceString nss(migrateInfo.ns); + + // Ensure we are not stopped in order to avoid doing the extra work + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + if (_state != State::kEnabled && _state != State::kRecovering) { + return std::make_shared<Notification<Status>>( + Status(ErrorCodes::BalancerInterrupted, + "Migration cannot be executed because the balancer is not running")); + } + } + + + // Sanity checks that the chunk being migrated is actually valid. These will be repeated at the + // shard as well, but doing them here saves an extra network call, which might otherwise fail. 
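+    // In order, the checks below confirm that the collection still has routing information, that
+    // a chunk with exactly the requested [minKey, maxKey) bounds still exists, and that the
+    // source shard is resolvable to a host which can receive the moveChunk command.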
+ auto statusWithScopedChunkManager = ScopedChunkManager::getExisting(txn, nss); + if (!statusWithScopedChunkManager.isOK()) { + return std::make_shared<Notification<Status>>( + std::move(statusWithScopedChunkManager.getStatus())); + } + + ChunkManager* const chunkManager = statusWithScopedChunkManager.getValue().cm(); + + auto chunk = chunkManager->findIntersectingChunkWithSimpleCollation(txn, migrateInfo.minKey); + invariant(chunk); + + // If the chunk is not found exactly as requested, the caller must have stale data + if (SimpleBSONObjComparator::kInstance.evaluate(chunk->getMin() != migrateInfo.minKey) || + SimpleBSONObjComparator::kInstance.evaluate(chunk->getMax() != migrateInfo.maxKey)) { + return std::make_shared<Notification<Status>>(Status( + ErrorCodes::IncompatibleShardingMetadata, + stream() << "Chunk " << ChunkRange(migrateInfo.minKey, migrateInfo.maxKey).toString() + << " does not exist.")); + } + + const auto fromShardStatus = Grid::get(txn)->shardRegistry()->getShard(txn, migrateInfo.from); + if (!fromShardStatus.isOK()) { + return std::make_shared<Notification<Status>>(std::move(fromShardStatus.getStatus())); + } + + const auto fromShard = fromShardStatus.getValue(); + auto fromHostStatus = + fromShard->getTargeter()->findHost(txn, ReadPreferenceSetting{ReadPreference::PrimaryOnly}); + if (!fromHostStatus.isOK()) { + return std::make_shared<Notification<Status>>(std::move(fromHostStatus.getStatus())); + } + + BSONObjBuilder builder; + MoveChunkRequest::appendAsCommand( + &builder, + nss, + chunkManager->getVersion(), + Grid::get(txn)->shardRegistry()->getConfigServerConnectionString(), + migrateInfo.from, + migrateInfo.to, + ChunkRange(migrateInfo.minKey, migrateInfo.maxKey), + chunk->getLastmod(), + maxChunkSizeBytes, + secondaryThrottle, + waitForDelete, + shardTakesCollectionDistLock); + + stdx::lock_guard<stdx::mutex> lock(_mutex); + + if (_state != State::kEnabled && _state != State::kRecovering) { + return std::make_shared<Notification<Status>>( + Status(ErrorCodes::BalancerInterrupted, + "Migration cannot be executed because the balancer is not running")); + } + + Migration migration(nss, builder.obj()); + + auto retVal = migration.completionNotification; + + if (shardTakesCollectionDistLock) { + _scheduleWithoutDistLock_inlock(txn, fromHostStatus.getValue(), std::move(migration)); + } else { + _scheduleWithDistLock_inlock(txn, fromHostStatus.getValue(), std::move(migration)); + } + + return retVal; +} + +void MigrationManager::_scheduleWithDistLock_inlock(OperationContext* txn, + const HostAndPort& targetHost, + Migration migration) { + executor::TaskExecutor* const executor = Grid::get(txn)->getExecutorPool()->getFixedExecutor(); + + const NamespaceString nss(migration.nss); + + auto it = _activeMigrationsWithDistLock.find(nss); + if (it == _activeMigrationsWithDistLock.end()) { + // Acquire the collection distributed lock (blocking call) + auto distLockHandleStatus = acquireDistLock(txn, _lockSessionID, nss); + if (!distLockHandleStatus.isOK()) { + migration.completionNotification->set(distLockHandleStatus.getStatus()); + return; + } + + it = _activeMigrationsWithDistLock + .insert(std::make_pair( + nss, CollectionMigrationsState(std::move(distLockHandleStatus.getValue())))) + .first; + } + + auto collectionMigrationState = &it->second; + + // Add ourselves to the list of migrations on this collection + collectionMigrationState->migrations.push_front(std::move(migration)); + auto itMigration = collectionMigrationState->migrations.begin(); + + const 
RemoteCommandRequest remoteRequest( + targetHost, NamespaceString::kAdminDb.toString(), itMigration->moveChunkCmdObj, txn); + + StatusWith<executor::TaskExecutor::CallbackHandle> callbackHandleWithStatus = + executor->scheduleRemoteCommand( + remoteRequest, + [this, collectionMigrationState, itMigration]( + const executor::TaskExecutor::RemoteCommandCallbackArgs& args) { + Client::initThread(getThreadName().c_str()); + ON_BLOCK_EXIT([&] { Client::destroy(); }); + auto txn = cc().makeOperationContext(); + + stdx::lock_guard<stdx::mutex> lock(_mutex); + _completeWithDistLock_inlock( + txn.get(), + itMigration, + extractMigrationStatusFromRemoteCommandResponse( + args.response, _state != State::kEnabled && _state != State::kRecovering)); + }); + + if (callbackHandleWithStatus.isOK()) { + itMigration->callbackHandle = std::move(callbackHandleWithStatus.getValue()); + return; + } + + _completeWithDistLock_inlock(txn, itMigration, std::move(callbackHandleWithStatus.getStatus())); +} + +void MigrationManager::_completeWithDistLock_inlock(OperationContext* txn, + MigrationsList::iterator itMigration, + Status status) { + const NamespaceString nss(itMigration->nss); + + // Make sure to signal the notification last, after the distributed lock is freed, so that we + // don't have the race condition where a subsequently scheduled migration finds the dist lock + // still acquired. + auto notificationToSignal = itMigration->completionNotification; + + auto it = _activeMigrationsWithDistLock.find(nss); + invariant(it != _activeMigrationsWithDistLock.end()); + + auto collectionMigrationState = &it->second; + collectionMigrationState->migrations.erase(itMigration); + + if (collectionMigrationState->migrations.empty()) { + Grid::get(txn)->catalogClient(txn)->getDistLockManager()->unlock( + txn, collectionMigrationState->distLockHandle, nss.ns()); + _activeMigrationsWithDistLock.erase(it); + _checkDrained_inlock(); + } + + notificationToSignal->set(status); +} + +void MigrationManager::_scheduleWithoutDistLock_inlock(OperationContext* txn, + const HostAndPort& targetHost, + Migration migration) { + executor::TaskExecutor* const executor = Grid::get(txn)->getExecutorPool()->getFixedExecutor(); + + _activeMigrationsWithoutDistLock.push_front(std::move(migration)); + auto itMigration = _activeMigrationsWithoutDistLock.begin(); + + const RemoteCommandRequest remoteRequest( + targetHost, NamespaceString::kAdminDb.toString(), itMigration->moveChunkCmdObj, txn); + + StatusWith<executor::TaskExecutor::CallbackHandle> callbackHandleWithStatus = + executor->scheduleRemoteCommand( + remoteRequest, + [this, itMigration](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) { + auto notificationToSignal = itMigration->completionNotification; + + stdx::lock_guard<stdx::mutex> lock(_mutex); + + _activeMigrationsWithoutDistLock.erase(itMigration); + _checkDrained_inlock(); + + notificationToSignal->set(extractMigrationStatusFromRemoteCommandResponse( + args.response, _state != State::kEnabled && _state != State::kRecovering)); + }); + + if (callbackHandleWithStatus.isOK()) { + itMigration->callbackHandle = std::move(callbackHandleWithStatus.getValue()); + return; + } + + auto notificationToSignal = itMigration->completionNotification; + + _activeMigrationsWithoutDistLock.erase(itMigration); + _checkDrained_inlock(); + + notificationToSignal->set(std::move(callbackHandleWithStatus.getStatus())); +} + +void MigrationManager::_checkDrained_inlock() { + if (_state == State::kEnabled || _state == State::kRecovering) { + 
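+        // Migrations are still being accepted, so there is no drain in progress to signal.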
return; + } + invariant(_state == State::kStopping); + + if (_activeMigrationsWithDistLock.empty() && _activeMigrationsWithoutDistLock.empty()) { + _condVar.notify_all(); + } +} + +void MigrationManager::_waitForRecovery() { + stdx::unique_lock<stdx::mutex> lock(_mutex); + _condVar.wait(lock, [this] { return _state != State::kRecovering; }); +} + +void MigrationManager::_abandonActiveMigrationsAndEnableManager(OperationContext* txn) { + stdx::unique_lock<stdx::mutex> lock(_mutex); + if (_state == State::kStopping) { + // The balancer was interrupted. Let the next balancer recover the state. + return; + } + invariant(_state == State::kRecovering); + + auto catalogClient = Grid::get(txn)->catalogClient(txn); + + // Unlock all balancer distlocks we aren't using anymore. + auto distLockManager = catalogClient->getDistLockManager(); + distLockManager->unlockAll(txn, distLockManager->getProcessID()); + + // Clear the config.migrations collection so that those chunks can be scheduled for migration + // again. + catalogClient->removeConfigDocuments( + txn, MigrationType::ConfigNS, BSONObj(), kMajorityWriteConcern); + + _state = State::kEnabled; + _condVar.notify_all(); +} + +MigrationManager::Migration::Migration(NamespaceString inNss, BSONObj inMoveChunkCmdObj) + : nss(std::move(inNss)), + moveChunkCmdObj(std::move(inMoveChunkCmdObj)), + completionNotification(std::make_shared<Notification<Status>>()) {} + +MigrationManager::Migration::~Migration() { + invariant(completionNotification); +} + +MigrationManager::CollectionMigrationsState::CollectionMigrationsState( + DistLockHandle inDistLockHandle) + : distLockHandle(std::move(inDistLockHandle)) {} + +MigrationManager::CollectionMigrationsState::~CollectionMigrationsState() { + invariant(migrations.empty()); +} + +} // namespace mongo diff --git a/src/mongo/s/balancer/migration_manager.h b/src/mongo/s/balancer/migration_manager.h new file mode 100644 index 00000000000..20585df7c23 --- /dev/null +++ b/src/mongo/s/balancer/migration_manager.h @@ -0,0 +1,294 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. 
+ */
+
+#pragma once
+
+#include <list>
+#include <map>
+#include <vector>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/s/balancer/balancer_policy.h"
+#include "mongo/s/catalog/dist_lock_manager.h"
+#include "mongo/s/migration_secondary_throttle_options.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/unordered_map.h"
+#include "mongo/util/concurrency/notification.h"
+
+namespace mongo {
+
+class OperationContext;
+class ServiceContext;
+class Status;
+template <typename T>
+class StatusWith;
+
+// Uniquely identifies a migration, regardless of shard and version.
+typedef std::string MigrationIdentifier;
+typedef std::map<MigrationIdentifier, Status> MigrationStatuses;
+
+/**
+ * Manages and executes parallel migrations for the balancer.
+ *
+ * TODO: for v3.6, remove the code which maintains compatibility with v3.2 shards that take the
+ * distlock.
+ */
+class MigrationManager {
+    MONGO_DISALLOW_COPYING(MigrationManager);
+
+public:
+    MigrationManager(ServiceContext* serviceContext);
+    ~MigrationManager();
+
+    /**
+     * A blocking method that attempts to schedule all the migrations specified in
+     * "migrateInfos" and waits for them to complete. Takes the distributed lock for each
+     * collection with a chunk being migrated.
+     *
+     * If any of the migrations scheduled in parallel fails with a LockBusy error reported from
+     * the shard, it is retried serially, without the distributed lock.
+     *
+     * Returns a map of migration Status objects to indicate the success/failure of each
+     * migration.
+     */
+    MigrationStatuses executeMigrationsForAutoBalance(
+        OperationContext* txn,
+        const std::vector<MigrateInfo>& migrateInfos,
+        uint64_t maxChunkSizeBytes,
+        const MigrationSecondaryThrottleOptions& secondaryThrottle,
+        bool waitForDelete);
+
+    /**
+     * A blocking method that attempts to schedule the migration specified in "migrateInfo" and
+     * waits for it to complete. Takes the distributed lock for the namespace which is being
+     * migrated.
+     *
+     * Returns the status of the migration.
+     */
+    Status executeManualMigration(OperationContext* txn,
+                                  const MigrateInfo& migrateInfo,
+                                  uint64_t maxChunkSizeBytes,
+                                  const MigrationSecondaryThrottleOptions& secondaryThrottle,
+                                  bool waitForDelete);
+
+    /**
+     * Non-blocking method that puts the migration manager in the kRecovering state, in which
+     * new migration requests will block until finishRecovery is called. Then does local writes to
+     * reacquire the distributed locks for active migrations.
+     *
+     * The active migration recovery may fail and be abandoned, setting the state to kEnabled.
+     */
+    void startRecoveryAndAcquireDistLocks(OperationContext* txn);
+
+    /**
+     * Blocking method that must only be called after startRecoveryAndAcquireDistLocks. Recovers
+     * the state of the migration manager (if necessary and able) and puts it in the kEnabled
+     * state, where it will accept new migrations. Any migrations waiting on the recovery state
+     * will be unblocked once the state is kEnabled, and then this function waits for the
+     * recovered active migrations to finish before returning.
+     *
+     * The active migration recovery may fail and be abandoned, setting the state to kEnabled and
+     * unblocking any process waiting on the recovery state.
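+     *
+     * Illustrative call sequence for a config server primary (a sketch only; the size and
+     * throttle arguments are placeholders):
+     *
+     *   migrationManager->startRecoveryAndAcquireDistLocks(txn);
+     *   migrationManager->finishRecovery(txn, maxChunkSizeBytes, secondaryThrottle, false);
+     *   // ... migrations execute while the manager is enabled ...
+     *   migrationManager->interruptAndDisableMigrations();  // e.g. on stepdown
+     *   migrationManager->drainActiveMigrations();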
+ */ + void finishRecovery(OperationContext* txn, + uint64_t maxChunkSizeBytes, + const MigrationSecondaryThrottleOptions& secondaryThrottle, + bool waitForDelete); + + /** + * Non-blocking method that should never be called concurrently with finishRecovery. Puts the + * manager in a state where all subsequently scheduled migrations will immediately fail (without + * ever getting scheduled) and all active ones will be cancelled. It has no effect if the + * migration manager is already stopping or stopped. + */ + void interruptAndDisableMigrations(); + + /** + * Blocking method that waits for any currently scheduled migrations to complete. Must be + * called after interruptAndDisableMigrations has been called in order to be able to re-enable + * migrations again. + */ + void drainActiveMigrations(); + +private: + // The current state of the migration manager + enum class State { // Allowed transitions: + kStopped, // kRecovering + kRecovering, // kEnabled, kStopping + kEnabled, // kStopping + kStopping, // kStopped + }; + + /** + * Tracks the execution state of a single migration. + */ + struct Migration { + Migration(NamespaceString nss, BSONObj moveChunkCmdObj); + ~Migration(); + + // Namespace for which this migration applies + NamespaceString nss; + + // Command object representing the migration + BSONObj moveChunkCmdObj; + + // Callback handle for the migration network request. If the migration has not yet been sent + // on the network, this value is not set. + boost::optional<executor::TaskExecutor::CallbackHandle> callbackHandle; + + // Notification, which will be signaled when the migration completes + std::shared_ptr<Notification<Status>> completionNotification; + }; + + // Used as a type in which to store a list of active migrations. The reason to choose list is + // that its iterators do not get invalidated when entries are removed around them. This allows + // O(1) removal time. + using MigrationsList = std::list<Migration>; + + /** + * Contains the runtime state for a single collection. This class does not have concurrency + * control of its own and relies on the migration manager's mutex. + */ + struct CollectionMigrationsState { + CollectionMigrationsState(DistLockHandle distLockHandle); + ~CollectionMigrationsState(); + + // Dist lock handle, which must be released at destruction time. + const DistLockHandle distLockHandle; + + // Contains a set of migrations which are currently active for this namespace. + MigrationsList migrations; + }; + + using CollectionMigrationsStateMap = + stdx::unordered_map<NamespaceString, CollectionMigrationsState>; + + /** + * Optionally takes the collection distributed lock and schedules a chunk migration with the + * specified parameters. May block for distributed lock acquisition. If dist lock acquisition is + * successful (or not done), schedules the migration request and returns a notification which + * can be used to obtain the outcome of the operation. + * + * The 'shardTakesCollectionDistLock' parameter controls whether the distributed lock is + * acquired by the migration manager or by the shard executing the migration request. + */ + std::shared_ptr<Notification<Status>> _schedule( + OperationContext* txn, + const MigrateInfo& migrateInfo, + bool shardTakesCollectionDistLock, + uint64_t maxChunkSizeBytes, + const MigrationSecondaryThrottleOptions& secondaryThrottle, + bool waitForDelete); + + /** + * Acquires the collection distributed lock for the specified namespace and if it succeeds, + * schedules the migration. 
+ * + * The distributed lock is acquired before scheduling the first migration for the collection and + * is only released when all active migrations on the collection have finished. + */ + void _scheduleWithDistLock_inlock(OperationContext* txn, + const HostAndPort& targetHost, + Migration migration); + + /** + * Used internally for migrations scheduled with the distributed lock acquired by the config + * server. Called exactly once for each scheduled migration, it will signal the migration in the + * passed iterator and if this is the last migration for the collection will free the collection + * distributed lock. + */ + void _completeWithDistLock_inlock(OperationContext* txn, + MigrationsList::iterator itMigration, + Status status); + + /** + * Immediately schedules the specified migration without attempting to acquire the collection + * distributed lock or checking that it is not being held. + * + * This method is only used for retrying migrations that have failed with LockBusy errors + * returned by the shard, which only happens with legacy 3.2 shards that take the collection + * distributed lock themselves. + */ + void _scheduleWithoutDistLock_inlock(OperationContext* txn, + const HostAndPort& targetHost, + Migration migration); + + /** + * If the state of the migration manager is kStopping, checks whether there are any outstanding + * scheduled requests and if there aren't any signals the class condition variable. + */ + void _checkDrained_inlock(); + + /** + * Blocking call, which waits for the migration manager to leave the recovering state (if it is + * currently recovering). + */ + void _waitForRecovery(); + + /** + * Should only be called from startRecovery or finishRecovery functions when the migration + * manager is in either the kStopped or kRecovering state. Releases all the distributed locks + * that the balancer holds, clears the config.migrations collection, changes the state of the + * migration manager to kEnabled. Then unblocks all processes waiting for kEnabled state. + */ + void _abandonActiveMigrationsAndEnableManager(OperationContext* txn); + + // The service context under which this migration manager runs. + ServiceContext* const _serviceContext; + + // Used as a constant session ID for all distributed locks that this MigrationManager holds. + // Currently required so that locks can be reacquired for the balancer in startRecovery and then + // overtaken in later operations. + OID _lockSessionID; + + // Carries migration information over from startRecovery to finishRecovery. Should only be set + // in startRecovery and then accessed in finishRecovery. + stdx::unordered_map<NamespaceString, std::list<MigrateInfo>> _migrationRecoveryMap; + + // Protects the class state below. + stdx::mutex _mutex; + + // Always start the migration manager in a stopped state. + State _state{State::kStopped}; + + // Condition variable, which is waited on when the migration manager's state is changing and + // signaled when the state change is complete. + stdx::condition_variable _condVar; + + // Holds information about each collection's distributed lock and active migrations via a + // CollectionMigrationState object. + CollectionMigrationsStateMap _activeMigrationsWithDistLock; + + // Holds information about migrations, which have been scheduled without the collection + // distributed lock acquired (i.e., the shard is asked to acquire it). 
+ MigrationsList _activeMigrationsWithoutDistLock; +}; + +} // namespace mongo diff --git a/src/mongo/s/balancer/migration_manager_test.cpp b/src/mongo/s/balancer/migration_manager_test.cpp new file mode 100644 index 00000000000..64f1aa193ed --- /dev/null +++ b/src/mongo/s/balancer/migration_manager_test.cpp @@ -0,0 +1,980 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. 
+ */ + +#include "mongo/platform/basic.h" + +#include "mongo/client/remote_command_targeter_mock.h" +#include "mongo/db/client.h" +#include "mongo/db/commands.h" +#include "mongo/db/write_concern_options.h" +#include "mongo/s/balancer/migration_manager.h" +#include "mongo/s/balancer/type_migration.h" +#include "mongo/s/catalog/dist_lock_manager_mock.h" +#include "mongo/s/catalog/sharding_catalog_client_impl.h" +#include "mongo/s/catalog/type_collection.h" +#include "mongo/s/catalog/type_database.h" +#include "mongo/s/catalog/type_locks.h" +#include "mongo/s/catalog/type_shard.h" +#include "mongo/s/config_server_test_fixture.h" +#include "mongo/s/move_chunk_request.h" +#include "mongo/stdx/memory.h" + +namespace mongo { +namespace { + +using executor::RemoteCommandRequest; +using executor::RemoteCommandResponse; +using std::vector; +using unittest::assertGet; + +const auto kShardId0 = ShardId("shard0"); +const auto kShardId1 = ShardId("shard1"); +const auto kShardId2 = ShardId("shard2"); +const auto kShardId3 = ShardId("shard3"); + +const HostAndPort kShardHost0 = HostAndPort("TestHost0", 12345); +const HostAndPort kShardHost1 = HostAndPort("TestHost1", 12346); +const HostAndPort kShardHost2 = HostAndPort("TestHost2", 12347); +const HostAndPort kShardHost3 = HostAndPort("TestHost3", 12348); + +const MigrationSecondaryThrottleOptions kDefaultSecondaryThrottle = + MigrationSecondaryThrottleOptions::create(MigrationSecondaryThrottleOptions::kDefault); + +const long long kMaxSizeMB = 100; +const std::string kPattern = "_id"; + +const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, + WriteConcernOptions::SyncMode::UNSET, + Seconds(15)); + +class MigrationManagerTest : public ConfigServerTestFixture { +protected: + /** + * Returns the mock targeter for the specified shard. Useful to use like so + * + * shardTargeterMock(txn, shardId)->setFindHostReturnValue(shardHost); + * + * Then calls to RemoteCommandTargeterMock::findHost will return HostAndPort "shardHost" for + * Shard "shardId". + * + * Scheduling a command requires a shard host target. The command will be caught by the mock + * network, but sending the command requires finding the shard's host. + */ + std::shared_ptr<RemoteCommandTargeterMock> shardTargeterMock(OperationContext* txn, + ShardId shardId); + + /** + * Inserts a document into the config.databases collection to indicate that "dbName" is sharded + * with primary "primaryShard". + */ + void setUpDatabase(const std::string& dbName, const ShardId primaryShard); + + /** + * Inserts a document into the config.collections collection to indicate that "collName" is + * sharded with version "version". The shard key pattern defaults to "_id". + */ + void setUpCollection(const std::string collName, ChunkVersion version); + + /** + * Inserts a document into the config.chunks collection so that the chunk defined by the + * parameters exists. Returns a ChunkType defined by the parameters. + */ + ChunkType setUpChunk(const std::string& collName, + const BSONObj& chunkMin, + const BSONObj& chunkMax, + const ShardId& shardId, + const ChunkVersion& version); + + /** + * Inserts a document into the config.migrations collection as an active migration. 
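+     *
+     * For example (hypothetical values), setUpMigration("TestDB.TestColl", BSON(kPattern << 0),
+     * BSON(kPattern << 10), kShardId1, kShardId0) records an active migration of that chunk
+     * from shard0 to shard1.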
+ */ + void setUpMigration(const std::string& collName, + const BSONObj& minKey, + const BSONObj& maxKey, + const ShardId& toShard, + const ShardId& fromShard); + + /** + * Asserts that config.migrations is empty and config.locks contains no locked documents, both + * of which should be true if the MigrationManager is inactive and behaving properly. + */ + void checkMigrationsCollectionIsEmptyAndLocksAreUnlocked(); + + /** + * Sets up mock network to expect a moveChunk command and return a fixed BSON response or a + * "returnStatus". + */ + void expectMoveChunkCommand(const ChunkType& chunk, + const ShardId& toShardId, + const bool& takeDistLock, + const BSONObj& response); + void expectMoveChunkCommand(const ChunkType& chunk, + const ShardId& toShardId, + const bool& takeDistLock, + const Status& returnStatus); + + // Random static initialization order can result in X constructor running before Y constructor + // if X and Y are defined in different source files. Defining variables here to enforce order. + const BSONObj kShard0 = + BSON(ShardType::name(kShardId0.toString()) << ShardType::host(kShardHost0.toString()) + << ShardType::maxSizeMB(kMaxSizeMB)); + const BSONObj kShard1 = + BSON(ShardType::name(kShardId1.toString()) << ShardType::host(kShardHost1.toString()) + << ShardType::maxSizeMB(kMaxSizeMB)); + const BSONObj kShard2 = + BSON(ShardType::name(kShardId2.toString()) << ShardType::host(kShardHost2.toString()) + << ShardType::maxSizeMB(kMaxSizeMB)); + const BSONObj kShard3 = + BSON(ShardType::name(kShardId3.toString()) << ShardType::host(kShardHost3.toString()) + << ShardType::maxSizeMB(kMaxSizeMB)); + + const KeyPattern kKeyPattern = KeyPattern(BSON(kPattern << 1)); + + std::unique_ptr<MigrationManager> _migrationManager; + +private: + void setUp() override; + void tearDown() override; +}; + +void MigrationManagerTest::setUp() { + ConfigServerTestFixture::setUp(); + _migrationManager = stdx::make_unique<MigrationManager>(getServiceContext()); + _migrationManager->startRecoveryAndAcquireDistLocks(operationContext()); + _migrationManager->finishRecovery(operationContext(), 0, kDefaultSecondaryThrottle, false); +} + +void MigrationManagerTest::tearDown() { + checkMigrationsCollectionIsEmptyAndLocksAreUnlocked(); + _migrationManager->interruptAndDisableMigrations(); + _migrationManager->drainActiveMigrations(); + _migrationManager.reset(); + ConfigServerTestFixture::tearDown(); +} + +std::shared_ptr<RemoteCommandTargeterMock> MigrationManagerTest::shardTargeterMock( + OperationContext* txn, ShardId shardId) { + return RemoteCommandTargeterMock::get( + uassertStatusOK(shardRegistry()->getShard(txn, shardId))->getTargeter()); +} + +void MigrationManagerTest::setUpDatabase(const std::string& dbName, const ShardId primaryShard) { + DatabaseType db; + db.setName(dbName); + db.setPrimary(primaryShard); + db.setSharded(true); + ASSERT_OK(catalogClient()->insertConfigDocument( + operationContext(), DatabaseType::ConfigNS, db.toBSON(), kMajorityWriteConcern)); +} + +void MigrationManagerTest::setUpCollection(const std::string collName, ChunkVersion version) { + CollectionType coll; + coll.setNs(NamespaceString(collName)); + coll.setEpoch(version.epoch()); + coll.setUpdatedAt(Date_t::fromMillisSinceEpoch(version.toLong())); + coll.setKeyPattern(kKeyPattern); + coll.setUnique(false); + ASSERT_OK(catalogClient()->insertConfigDocument( + operationContext(), CollectionType::ConfigNS, coll.toBSON(), kMajorityWriteConcern)); +} + +ChunkType MigrationManagerTest::setUpChunk(const std::string& 
collName, + const BSONObj& chunkMin, + const BSONObj& chunkMax, + const ShardId& shardId, + const ChunkVersion& version) { + ChunkType chunk; + chunk.setNS(collName); + chunk.setMin(chunkMin); + chunk.setMax(chunkMax); + chunk.setShard(shardId); + chunk.setVersion(version); + ASSERT_OK(catalogClient()->insertConfigDocument( + operationContext(), ChunkType::ConfigNS, chunk.toBSON(), kMajorityWriteConcern)); + return chunk; +} + +void MigrationManagerTest::setUpMigration(const std::string& collName, + const BSONObj& minKey, + const BSONObj& maxKey, + const ShardId& toShard, + const ShardId& fromShard) { + BSONObjBuilder builder; + builder.append(MigrationType::ns(), collName); + builder.append(MigrationType::min(), minKey); + builder.append(MigrationType::max(), maxKey); + builder.append(MigrationType::toShard(), toShard.toString()); + builder.append(MigrationType::fromShard(), fromShard.toString()); + MigrationType migrationType = assertGet(MigrationType::fromBSON(builder.obj())); + ASSERT_OK(catalogClient()->insertConfigDocument(operationContext(), + MigrationType::ConfigNS, + migrationType.toBSON(), + kMajorityWriteConcern)); +} + +void MigrationManagerTest::checkMigrationsCollectionIsEmptyAndLocksAreUnlocked() { + auto statusWithMigrationsQueryResponse = + shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + operationContext(), + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kMajorityReadConcern, + NamespaceString(MigrationType::ConfigNS), + BSONObj(), + BSONObj(), + boost::none); + Shard::QueryResponse migrationsQueryResponse = + uassertStatusOK(statusWithMigrationsQueryResponse); + ASSERT_EQUALS(0U, migrationsQueryResponse.docs.size()); + + auto statusWithLocksQueryResponse = shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + operationContext(), + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kMajorityReadConcern, + NamespaceString(LocksType::ConfigNS), + BSON(LocksType::state(LocksType::LOCKED)), + BSONObj(), + boost::none); + Shard::QueryResponse locksQueryResponse = uassertStatusOK(statusWithLocksQueryResponse); + ASSERT_EQUALS(0U, locksQueryResponse.docs.size()); +} + +void MigrationManagerTest::expectMoveChunkCommand(const ChunkType& chunk, + const ShardId& toShardId, + const bool& takeDistLock, + const BSONObj& response) { + onCommand([&chunk, &toShardId, &takeDistLock, &response](const RemoteCommandRequest& request) { + NamespaceString nss(request.cmdObj.firstElement().valueStringData()); + ASSERT_EQ(chunk.getNS(), nss.ns()); + + const StatusWith<MoveChunkRequest> moveChunkRequestWithStatus = + MoveChunkRequest::createFromCommand(nss, request.cmdObj); + ASSERT_OK(moveChunkRequestWithStatus.getStatus()); + + ASSERT_EQ(chunk.getNS(), moveChunkRequestWithStatus.getValue().getNss().ns()); + ASSERT_BSONOBJ_EQ(chunk.getMin(), moveChunkRequestWithStatus.getValue().getMinKey()); + ASSERT_BSONOBJ_EQ(chunk.getMax(), moveChunkRequestWithStatus.getValue().getMaxKey()); + ASSERT_EQ(chunk.getShard(), moveChunkRequestWithStatus.getValue().getFromShardId()); + + ASSERT_EQ(toShardId, moveChunkRequestWithStatus.getValue().getToShardId()); + ASSERT_EQ(takeDistLock, moveChunkRequestWithStatus.getValue().getTakeDistLock()); + + return response; + }); +} + +void MigrationManagerTest::expectMoveChunkCommand(const ChunkType& chunk, + const ShardId& toShardId, + const bool& takeDistLock, + const Status& returnStatus) { + BSONObjBuilder resultBuilder; + Command::appendCommandStatus(resultBuilder, returnStatus); + 
expectMoveChunkCommand(chunk, toShardId, takeDistLock, resultBuilder.obj()); +} + +TEST_F(MigrationManagerTest, OneCollectionTwoMigrations) { + // Set up two shards in the metadata. + ASSERT_OK(catalogClient()->insertConfigDocument( + operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern)); + ASSERT_OK(catalogClient()->insertConfigDocument( + operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern)); + + // Set up the database and collection as sharded in the metadata. + std::string dbName = "foo"; + std::string collName = "foo.bar"; + ChunkVersion version(2, 0, OID::gen()); + + setUpDatabase(dbName, kShardId0); + setUpCollection(collName, version); + + // Set up two chunks in the metadata. + ChunkType chunk1 = + setUpChunk(collName, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version); + version.incMinor(); + ChunkType chunk2 = + setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version); + + // Going to request that these two chunks get migrated. + const std::vector<MigrateInfo> migrationRequests{{chunk1.getNS(), kShardId1, chunk1}, + {chunk2.getNS(), kShardId3, chunk2}}; + + auto future = launchAsync([this, migrationRequests] { + Client::initThreadIfNotAlready("Test"); + auto txn = cc().makeOperationContext(); + + // Scheduling the moveChunk commands requires finding a host to which to send the command. + // Set up dummy hosts for the source shards. + shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0); + shardTargeterMock(txn.get(), kShardId2)->setFindHostReturnValue(kShardHost2); + + MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance( + txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false); + + for (const auto& migrateInfo : migrationRequests) { + ASSERT_OK(migrationStatuses.at(migrateInfo.getName())); + } + }); + + // Expect two moveChunk commands. + expectMoveChunkCommand(chunk1, kShardId1, false, Status::OK()); + expectMoveChunkCommand(chunk2, kShardId3, false, Status::OK()); + + // Run the MigrationManager code. + future.timed_get(kFutureTimeout); +} + +TEST_F(MigrationManagerTest, TwoCollectionsTwoMigrationsEach) { + // Set up two shards in the metadata. + ASSERT_OK(catalogClient()->insertConfigDocument( + operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern)); + ASSERT_OK(catalogClient()->insertConfigDocument( + operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern)); + + // Set up a database and two collections as sharded in the metadata. + std::string dbName = "foo"; + std::string collName1 = "foo.bar"; + std::string collName2 = "foo.baz"; + ChunkVersion version1(2, 0, OID::gen()); + ChunkVersion version2(2, 0, OID::gen()); + + setUpDatabase(dbName, kShardId0); + setUpCollection(collName1, version1); + setUpCollection(collName2, version2); + + // Set up two chunks in the metadata for each collection. + ChunkType chunk1coll1 = + setUpChunk(collName1, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version1); + version1.incMinor(); + ChunkType chunk2coll1 = + setUpChunk(collName1, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version1); + + ChunkType chunk1coll2 = + setUpChunk(collName2, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version2); + version2.incMinor(); + ChunkType chunk2coll2 = + setUpChunk(collName2, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version2); + + // Going to request that these four chunks get migrated. 
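+    // Each initializer below is a MigrateInfo of the form {ns, destination shard, chunk}; the
+    // source shard is implied by the chunk's current owner (chunk.getShard()).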
+    const std::vector<MigrateInfo> migrationRequests{{chunk1coll1.getNS(), kShardId1, chunk1coll1},
+                                                     {chunk2coll1.getNS(), kShardId3, chunk2coll1},
+                                                     {chunk1coll2.getNS(), kShardId1, chunk1coll2},
+                                                     {chunk2coll2.getNS(), kShardId3, chunk2coll2}};
+
+    auto future = launchAsync([this, migrationRequests] {
+        Client::initThreadIfNotAlready("Test");
+        auto txn = cc().makeOperationContext();
+
+        // Scheduling the moveChunk commands requires finding a host to which to send the command.
+        // Set up dummy hosts for the source shards.
+        shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
+        shardTargeterMock(txn.get(), kShardId2)->setFindHostReturnValue(kShardHost2);
+
+        MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
+            txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
+
+        for (const auto& migrateInfo : migrationRequests) {
+            ASSERT_OK(migrationStatuses.at(migrateInfo.getName()));
+        }
+    });
+
+    // Expect four moveChunk commands.
+    expectMoveChunkCommand(chunk1coll1, kShardId1, false, Status::OK());
+    expectMoveChunkCommand(chunk2coll1, kShardId3, false, Status::OK());
+    expectMoveChunkCommand(chunk1coll2, kShardId1, false, Status::OK());
+    expectMoveChunkCommand(chunk2coll2, kShardId3, false, Status::OK());
+
+    // Run the MigrationManager code.
+    future.timed_get(kFutureTimeout);
+}
+
+// Old v3.2 shards expect to take the distributed lock before executing a moveChunk command. The
+// MigrationManager should take the distlock, see the first moveChunk command to an old shard fail
+// with LockBusy, and then release the lock and retry the command successfully.
+TEST_F(MigrationManagerTest, SameCollectionOldShardMigration) {
+    // Set up two shards in the metadata.
+    ASSERT_OK(catalogClient()->insertConfigDocument(
+        operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
+    ASSERT_OK(catalogClient()->insertConfigDocument(
+        operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern));
+
+    // Set up the database and collection as sharded in the metadata.
+    std::string dbName = "foo";
+    std::string collName = "foo.bar";
+    ChunkVersion version(2, 0, OID::gen());
+
+    setUpDatabase(dbName, kShardId0);
+    setUpCollection(collName, version);
+
+    // Set up two chunks in the metadata.
+    ChunkType chunk1 =
+        setUpChunk(collName, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version);
+    version.incMinor();
+    ChunkType chunk2 =
+        setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
+
+    // Going to request that these two chunks get migrated.
+    const std::vector<MigrateInfo> migrationRequests{{chunk1.getNS(), kShardId1, chunk1},
+                                                     {chunk2.getNS(), kShardId3, chunk2}};
+
+    auto future = launchAsync([this, migrationRequests] {
+        Client::initThreadIfNotAlready("Test");
+        auto txn = cc().makeOperationContext();
+
+        // Scheduling the moveChunk commands requires finding a host to which to send the command.
+        // Set up dummy hosts for the source shards.
+        shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
+        shardTargeterMock(txn.get(), kShardId2)->setFindHostReturnValue(kShardHost2);
+
+        MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
+            txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
+
+        for (const auto& migrateInfo : migrationRequests) {
+            ASSERT_OK(migrationStatuses.at(migrateInfo.getName()));
+        }
+    });
+
+    // Expect three moveChunk commands: chunk1's first attempt (without the dist lock) fails with
+    // LockBusy, chunk2's migration succeeds, and chunk1 is then retried with takeDistLock set and
+    // succeeds.
+ expectMoveChunkCommand( + chunk1, + kShardId1, + false, + Status(ErrorCodes::LockBusy, "SameCollectionOldShardMigration generated error.")); + expectMoveChunkCommand(chunk2, kShardId3, false, Status::OK()); + expectMoveChunkCommand(chunk1, kShardId1, true, Status::OK()); + + // Run the MigrationManager code. + future.timed_get(kFutureTimeout); +} + +// Fail a migration if an old v3.2 shard fails to acquire the distributed lock more than once. The +// first LockBusy error identifies the shard as an old shard to the MigrationManager, the second +// indicates the lock is held elsewhere and unavailable. +TEST_F(MigrationManagerTest, SameOldShardFailsToAcquireDistributedLockTwice) { + // Set up a shard in the metadata. + ASSERT_OK(catalogClient()->insertConfigDocument( + operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern)); + + // Set up the database and collection as sharded in the metadata. + std::string dbName = "foo"; + std::string collName = "foo.bar"; + ChunkVersion version(2, 0, OID::gen()); + + setUpDatabase(dbName, kShardId0); + setUpCollection(collName, version); + + // Set up a chunk in the metadata. + ChunkType chunk1 = + setUpChunk(collName, kKeyPattern.globalMin(), kKeyPattern.globalMax(), kShardId0, version); + + // Going to request that this chunk get migrated. + const std::vector<MigrateInfo> migrationRequests{{chunk1.getNS(), kShardId1, chunk1}}; + + auto future = launchAsync([this, migrationRequests] { + Client::initThreadIfNotAlready("Test"); + auto txn = cc().makeOperationContext(); + + // Scheduling the moveChunk commands requires finding a host to which to send the command. + // Set up a dummy host for the source shard. + shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0); + + MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance( + txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false); + + for (const auto& migrateInfo : migrationRequests) { + ASSERT_EQ(ErrorCodes::LockBusy, migrationStatuses.at(migrateInfo.getName())); + } + }); + + // Expect two sequential moveChunk commands to the same shard, both of which fail with LockBusy. + expectMoveChunkCommand( + chunk1, + kShardId1, + false, + Status(ErrorCodes::LockBusy, "SameCollectionOldShardMigrations generated error.")); + expectMoveChunkCommand( + chunk1, + kShardId1, + true, + Status(ErrorCodes::LockBusy, "SameCollectionOldShardMigrations generated error.")); + + // Run the MigrationManager code. + future.timed_get(kFutureTimeout); +} + +// If in the same collection a migration is scheduled with an old v3.2 shard, a second migration in +// the collection with a different old v3.2 shard should get rescheduled. +TEST_F(MigrationManagerTest, SameCollectionTwoOldShardMigrations) { + // Set up two shards in the metadata. + ASSERT_OK(catalogClient()->insertConfigDocument( + operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern)); + ASSERT_OK(catalogClient()->insertConfigDocument( + operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern)); + + // Set up the database and collection as sharded in the metadata. + std::string dbName = "foo"; + std::string collName = "foo.bar"; + ChunkVersion version(2, 0, OID::gen()); + + setUpDatabase(dbName, kShardId0); + setUpCollection(collName, version); + + // Set up two chunks in the metadata. 
+    ChunkType chunk1 =
+        setUpChunk(collName, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version);
+    version.incMinor();
+    ChunkType chunk2 =
+        setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
+
+    // Going to request that these two chunks get migrated.
+    const std::vector<MigrateInfo> migrationRequests{{chunk1.getNS(), kShardId1, chunk1},
+                                                     {chunk2.getNS(), kShardId3, chunk2}};
+
+    auto future = launchAsync([this, migrationRequests] {
+        Client::initThreadIfNotAlready("Test");
+        auto txn = cc().makeOperationContext();
+
+        // Scheduling the moveChunk commands requires finding a host to which to send the command.
+        // Set up dummy hosts for the source shards.
+        shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
+        shardTargeterMock(txn.get(), kShardId2)->setFindHostReturnValue(kShardHost2);
+
+        MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
+            txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
+
+        for (const auto& migrateInfo : migrationRequests) {
+            ASSERT_OK(migrationStatuses.at(migrateInfo.getName()));
+        }
+    });
+
+    // Expect two failed moveChunk commands, then two successful moveChunk commands after the
+    // balancer releases the distributed lock.
+    expectMoveChunkCommand(
+        chunk1,
+        kShardId1,
+        false,
+        Status(ErrorCodes::LockBusy, "SameCollectionOldShardMigration generated error."));
+    expectMoveChunkCommand(
+        chunk2,
+        kShardId3,
+        false,
+        Status(ErrorCodes::LockBusy, "SameCollectionOldShardMigration generated error."));
+    expectMoveChunkCommand(chunk1, kShardId1, true, Status::OK());
+    expectMoveChunkCommand(chunk2, kShardId3, true, Status::OK());
+
+    // Run the MigrationManager code.
+    future.timed_get(kFutureTimeout);
+}
+
+// Takes the distributed lock for a collection so that the MigrationManager is unable to
+// schedule migrations for that collection.
+TEST_F(MigrationManagerTest, FailToAcquireDistributedLock) {
+    // Set up two shards in the metadata.
+    ASSERT_OK(catalogClient()->insertConfigDocument(
+        operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
+    ASSERT_OK(catalogClient()->insertConfigDocument(
+        operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern));
+
+    // Set up the database and collection as sharded in the metadata.
+    std::string dbName = "foo";
+    std::string collName = "foo.bar";
+    ChunkVersion version(2, 0, OID::gen());
+
+    setUpDatabase(dbName, kShardId0);
+    setUpCollection(collName, version);
+
+    // Set up two chunks in the metadata.
+    ChunkType chunk1 =
+        setUpChunk(collName, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version);
+    version.incMinor();
+    ChunkType chunk2 =
+        setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
+
+    // Going to request that these two chunks get migrated.
+    const std::vector<MigrateInfo> migrationRequests{{chunk1.getNS(), kShardId1, chunk1},
+                                                     {chunk2.getNS(), kShardId3, chunk2}};
+
+    shardTargeterMock(operationContext(), kShardId0)->setFindHostReturnValue(kShardHost0);
+    shardTargeterMock(operationContext(), kShardId2)->setFindHostReturnValue(kShardHost2);
+
+    // Take the distributed lock for the collection before scheduling via the MigrationManager.
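+    // The MigrationManager should then fail to acquire it and report
+    // ConflictingOperationInProgress for both migration requests.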
+    const std::string whyMessage("FailToAcquireDistributedLock unit-test taking distributed lock");
+    DistLockManager::ScopedDistLock distLockStatus = assertGet(
+        catalogClient()->getDistLockManager()->lock(operationContext(),
+                                                    chunk1.getNS(),
+                                                    whyMessage,
+                                                    DistLockManager::kSingleLockAttemptTimeout));
+
+    MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
+        operationContext(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
+
+    for (const auto& migrateInfo : migrationRequests) {
+        ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress,
+                  migrationStatuses.at(migrateInfo.getName()));
+    }
+}
+
+// The MigrationManager should fail the migration if a host is not found for the source shard.
+// Scheduling a moveChunk command requires finding a host to which to send the command.
+TEST_F(MigrationManagerTest, SourceShardNotFound) {
+    // Set up two shards in the metadata.
+    ASSERT_OK(catalogClient()->insertConfigDocument(
+        operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
+    ASSERT_OK(catalogClient()->insertConfigDocument(
+        operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern));
+
+    // Set up the database and collection as sharded in the metadata.
+    std::string dbName = "foo";
+    std::string collName = "foo.bar";
+    ChunkVersion version(2, 0, OID::gen());
+
+    setUpDatabase(dbName, kShardId0);
+    setUpCollection(collName, version);
+
+    // Set up two chunks in the metadata.
+    ChunkType chunk1 =
+        setUpChunk(collName, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version);
+    version.incMinor();
+    ChunkType chunk2 =
+        setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
+
+    // Going to request that these two chunks get migrated.
+    const std::vector<MigrateInfo> migrationRequests{{chunk1.getNS(), kShardId1, chunk1},
+                                                     {chunk2.getNS(), kShardId3, chunk2}};
+
+    auto future = launchAsync([this, chunk1, chunk2, migrationRequests] {
+        Client::initThreadIfNotAlready("Test");
+        auto txn = cc().makeOperationContext();
+
+        // Scheduling a moveChunk command requires finding a host to which to send the command. Set
+        // up a dummy host for kShardHost0, and return an error for kShardHost2.
+        shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
+        shardTargeterMock(txn.get(), kShardId2)
+            ->setFindHostReturnValue(
+                Status(ErrorCodes::ReplicaSetNotFound, "SourceShardNotFound generated error."));
+
+        MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
+            txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
+
+        ASSERT_OK(migrationStatuses.at(chunk1.getName()));
+        ASSERT_EQ(ErrorCodes::ReplicaSetNotFound, migrationStatuses.at(chunk2.getName()));
+    });
+
+    // Expect only one moveChunk command to be called.
+    expectMoveChunkCommand(chunk1, kShardId1, false, Status::OK());
+
+    // Run the MigrationManager code.
+    future.timed_get(kFutureTimeout);
+}
+
+TEST_F(MigrationManagerTest, JumboChunkResponseBackwardsCompatibility) {
+    // Set up one shard in the metadata.
+    ASSERT_OK(catalogClient()->insertConfigDocument(
+        operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
+
+    // Set up the database and collection as sharded in the metadata.
+    std::string dbName = "foo";
+    std::string collName = "foo.bar";
+    ChunkVersion version(2, 0, OID::gen());
+
+    setUpDatabase(dbName, kShardId0);
+    setUpCollection(collName, version);
+
+    // Set up a single chunk in the metadata.
+ ChunkType chunk1 = + setUpChunk(collName, kKeyPattern.globalMin(), kKeyPattern.globalMax(), kShardId0, version); + + // Going to request that this chunk gets migrated. + const std::vector<MigrateInfo> migrationRequests{{chunk1.getNS(), kShardId1, chunk1}}; + + auto future = launchAsync([this, chunk1, migrationRequests] { + Client::initThreadIfNotAlready("Test"); + auto txn = cc().makeOperationContext(); + + // Scheduling a moveChunk command requires finding a host to which to send the command. Set + // up a dummy host for kShardHost0. + shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0); + + MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance( + txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false); + + ASSERT_EQ(ErrorCodes::ChunkTooBig, migrationStatuses.at(chunk1.getName())); + }); + + // Expect only one moveChunk command to be called. + expectMoveChunkCommand(chunk1, kShardId1, false, BSON("ok" << 0 << "chunkTooBig" << true)); + + // Run the MigrationManager code. + future.timed_get(kFutureTimeout); +} + +TEST_F(MigrationManagerTest, InterruptMigration) { + // Set up one shard in the metadata. + ASSERT_OK(catalogClient()->insertConfigDocument( + operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern)); + + // Set up the database and collection as sharded in the metadata. + std::string dbName = "foo"; + std::string collName = "foo.bar"; + ChunkVersion version(2, 0, OID::gen()); + + setUpDatabase(dbName, kShardId0); + setUpCollection(collName, version); + + // Set up a single chunk in the metadata. + ChunkType chunk = + setUpChunk(collName, kKeyPattern.globalMin(), kKeyPattern.globalMax(), kShardId0, version); + + auto future = launchAsync([&] { + Client::initThreadIfNotAlready("Test"); + auto txn = cc().makeOperationContext(); + + // Scheduling a moveChunk command requires finding a host to which to send the command. Set + // up a dummy host for kShardHost0. + shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0); + + ASSERT_NOT_OK(_migrationManager->executeManualMigration( + txn.get(), {chunk.getNS(), kShardId1, chunk}, 0, kDefaultSecondaryThrottle, false)); + }); + + // Wait till the move chunk request gets sent and pretend that it is stuck by never responding + // to the request + network()->enterNetwork(); + network()->blackHole(network()->getNextReadyRequest()); + network()->exitNetwork(); + + // Now that the migration request is 'pending', try to cancel the migration manager. This should + // succeed. + _migrationManager->interruptAndDisableMigrations(); + + // Ensure that cancellations get processed + network()->enterNetwork(); + network()->runReadyNetworkOperations(); + network()->exitNetwork(); + + // Ensure that the previously scheduled migration is cancelled + future.timed_get(kFutureTimeout); + + // Ensure that no new migrations can be scheduled + ASSERT_NOT_OK(_migrationManager->executeManualMigration(operationContext(), + {chunk.getNS(), kShardId1, chunk}, + 0, + kDefaultSecondaryThrottle, + false)); + + // Ensure that the migration manager is no longer handling any migrations. + _migrationManager->drainActiveMigrations(); + + // Check that the migration that was active when the migration manager was interrupted can be + // found in config.migrations (and thus would be recovered if a migration manager were to start + // up again). 
+ auto statusWithMigrationsQueryResponse = + shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + operationContext(), + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kMajorityReadConcern, + NamespaceString(MigrationType::ConfigNS), + BSON(MigrationType::name(chunk.getName())), + BSONObj(), + boost::none); + Shard::QueryResponse migrationsQueryResponse = + uassertStatusOK(statusWithMigrationsQueryResponse); + ASSERT_EQUALS(1U, migrationsQueryResponse.docs.size()); + + ASSERT_OK(catalogClient()->removeConfigDocuments(operationContext(), + MigrationType::ConfigNS, + BSON(MigrationType::name(chunk.getName())), + kMajorityWriteConcern)); + + // Restore the migration manager back to the started state, which is expected by tearDown + _migrationManager->startRecoveryAndAcquireDistLocks(operationContext()); + _migrationManager->finishRecovery(operationContext(), 0, kDefaultSecondaryThrottle, false); +} + +TEST_F(MigrationManagerTest, RestartMigrationManager) { + // Set up one shard in the metadata. + ASSERT_OK(catalogClient()->insertConfigDocument( + operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern)); + + // Set up the database and collection as sharded in the metadata. + std::string dbName = "foo"; + std::string collName = "foo.bar"; + ChunkVersion version(2, 0, OID::gen()); + + setUpDatabase(dbName, kShardId0); + setUpCollection(collName, version); + + // Set up a single chunk in the metadata. + ChunkType chunk1 = + setUpChunk(collName, kKeyPattern.globalMin(), kKeyPattern.globalMax(), kShardId0, version); + + // Go through the lifecycle of the migration manager + _migrationManager->interruptAndDisableMigrations(); + _migrationManager->drainActiveMigrations(); + _migrationManager->startRecoveryAndAcquireDistLocks(operationContext()); + _migrationManager->finishRecovery(operationContext(), 0, kDefaultSecondaryThrottle, false); + + auto future = launchAsync([&] { + Client::initThreadIfNotAlready("Test"); + auto txn = cc().makeOperationContext(); + + // Scheduling a moveChunk command requires finding a host to which to send the command. Set + // up a dummy host for kShardHost0. + shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0); + + ASSERT_OK(_migrationManager->executeManualMigration( + txn.get(), {chunk1.getNS(), kShardId1, chunk1}, 0, kDefaultSecondaryThrottle, false)); + }); + + // Expect only one moveChunk command to be called. + expectMoveChunkCommand(chunk1, kShardId1, false, Status::OK()); + + // Run the MigrationManager code. + future.timed_get(kFutureTimeout); +} + +TEST_F(MigrationManagerTest, MigrationRecovery) { + // Set up two shards in the metadata. + ASSERT_OK(catalogClient()->insertConfigDocument( + operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern)); + ASSERT_OK(catalogClient()->insertConfigDocument( + operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern)); + + // Set up the database and collection as sharded in the metadata. + std::string dbName = "foo"; + std::string collName = "foo.bar"; + ChunkVersion version(2, 0, OID::gen()); + + setUpDatabase(dbName, kShardId0); + setUpCollection(collName, version); + + // Set up two chunks in the metadata. 
+ ChunkType chunk1 = + setUpChunk(collName, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version); + version.incMinor(); + ChunkType chunk2 = + setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version); + + _migrationManager->interruptAndDisableMigrations(); + _migrationManager->drainActiveMigrations(); + + // Set up two fake active migrations by writing documents to the config.migrations collection. + setUpMigration(collName, + chunk1.getMin(), + chunk1.getMax(), + kShardId1.toString(), + chunk1.getShard().toString()); + setUpMigration(collName, + chunk2.getMin(), + chunk2.getMax(), + kShardId3.toString(), + chunk2.getShard().toString()); + + _migrationManager->startRecoveryAndAcquireDistLocks(operationContext()); + + auto future = launchAsync([this] { + Client::initThreadIfNotAlready("Test"); + auto txn = cc().makeOperationContext(); + + // Scheduling the moveChunk commands requires finding hosts to which to send the commands. + // Set up dummy hosts for the source shards. + shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0); + shardTargeterMock(txn.get(), kShardId2)->setFindHostReturnValue(kShardHost2); + + _migrationManager->finishRecovery(txn.get(), 0, kDefaultSecondaryThrottle, false); + }); + + // Expect two moveChunk commands. + expectMoveChunkCommand(chunk1, kShardId1, false, Status::OK()); + expectMoveChunkCommand(chunk2, kShardId3, false, Status::OK()); + + // Run the MigrationManager code. + future.timed_get(kFutureTimeout); +} + +TEST_F(MigrationManagerTest, FailMigrationRecovery) { + // Set up two shards in the metadata. + ASSERT_OK(catalogClient()->insertConfigDocument( + operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern)); + ASSERT_OK(catalogClient()->insertConfigDocument( + operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern)); + + // Set up the database and collection as sharded in the metadata. + std::string dbName = "foo"; + std::string collName = "foo.bar"; + ChunkVersion version(2, 0, OID::gen()); + + setUpDatabase(dbName, kShardId0); + setUpCollection(collName, version); + + // Set up two chunks in the metadata. + ChunkType chunk1 = + setUpChunk(collName, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version); + version.incMinor(); + ChunkType chunk2 = + setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version); + + _migrationManager->interruptAndDisableMigrations(); + _migrationManager->drainActiveMigrations(); + + // Set up a parsable fake active migration document in the config.migrations collection. + setUpMigration(collName, + chunk1.getMin(), + chunk1.getMax(), + kShardId1.toString(), + chunk1.getShard().toString()); + + // Set up a fake active migration document that will fail MigrationType parsing -- missing + // field. + BSONObjBuilder builder; + builder.append("_id", "testing"); + // No MigrationType::ns() field! + builder.append(MigrationType::min(), chunk2.getMin()); + builder.append(MigrationType::max(), chunk2.getMax()); + builder.append(MigrationType::toShard(), kShardId3.toString()); + builder.append(MigrationType::fromShard(), chunk2.getShard().toString()); + ASSERT_OK(catalogClient()->insertConfigDocument( + operationContext(), MigrationType::ConfigNS, builder.obj(), kMajorityWriteConcern)); + + // Take the distributed lock for the collection, which should be released during recovery when + // it fails. 
Any dist lock held by the config server will be released via proccessId, so the + // session ID used here doesn't matter. + ASSERT_OK(catalogClient()->getDistLockManager()->lockWithSessionID( + operationContext(), + collName, + "MigrationManagerTest", + OID::gen(), + DistLockManager::kSingleLockAttemptTimeout)); + + _migrationManager->startRecoveryAndAcquireDistLocks(operationContext()); + _migrationManager->finishRecovery(operationContext(), 0, kDefaultSecondaryThrottle, false); + + // MigrationManagerTest::tearDown checks that the config.migrations collection is empty and all + // distributed locks are unlocked. +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/s/balancer/scoped_migration_request.cpp b/src/mongo/s/balancer/scoped_migration_request.cpp new file mode 100644 index 00000000000..5b02d57b1d8 --- /dev/null +++ b/src/mongo/s/balancer/scoped_migration_request.cpp @@ -0,0 +1,188 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/s/balancer/scoped_migration_request.h" + +#include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/write_concern_options.h" +#include "mongo/s/balancer/type_migration.h" +#include "mongo/s/catalog/sharding_catalog_client.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/grid.h" +#include "mongo/util/log.h" + +namespace mongo { + +namespace { +const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, + WriteConcernOptions::SyncMode::UNSET, + Seconds(15)); +const int kDuplicateKeyErrorMaxRetries = 2; +} + +ScopedMigrationRequest::ScopedMigrationRequest(OperationContext* txn, + const NamespaceString& nss, + const BSONObj& minKey) + : _txn(txn), _nss(nss), _minKey(minKey) {} + +ScopedMigrationRequest::~ScopedMigrationRequest() { + if (!_txn) { + // If the txn object was cleared, nothing should happen in the destructor. + return; + } + + // Try to delete the entry in the config.migrations collection. If the command fails, that is + // okay. 
+ BSONObj migrationDocumentIdentifier = + BSON(MigrationType::ns(_nss.ns()) << MigrationType::min(_minKey)); + Status result = grid.catalogClient(_txn)->removeConfigDocuments( + _txn, MigrationType::ConfigNS, migrationDocumentIdentifier, kMajorityWriteConcern); + + if (!result.isOK()) { + LOG(0) << "Failed to remove config.migrations document for migration '" + << migrationDocumentIdentifier.toString() << "'" << causedBy(redact(result)); + } +} + +ScopedMigrationRequest::ScopedMigrationRequest(ScopedMigrationRequest&& other) { + *this = std::move(other); + // Set txn to null so that the destructor will do nothing. + other._txn = nullptr; +} + +ScopedMigrationRequest& ScopedMigrationRequest::operator=(ScopedMigrationRequest&& other) { + if (this != &other) { + _txn = other._txn; + _nss = other._nss; + _minKey = other._minKey; + // Set txn to null so that the destructor will do nothing. + other._txn = nullptr; + } + + return *this; +} + +StatusWith<ScopedMigrationRequest> ScopedMigrationRequest::writeMigration( + OperationContext* txn, const MigrateInfo& migrateInfo) { + + // Try to write a unique migration document to config.migrations. + MigrationType migrationType(migrateInfo); + for (int retry = 0; retry < kDuplicateKeyErrorMaxRetries; ++retry) { + Status result = grid.catalogClient(txn)->insertConfigDocument( + txn, MigrationType::ConfigNS, migrationType.toBSON(), kMajorityWriteConcern); + + if (result == ErrorCodes::DuplicateKey) { + // If the exact migration described by "migrateInfo" is active, return a scoped object + // for the request because this migration request will join the active one once + // scheduled. + auto statusWithMigrationQueryResult = + grid.shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + txn, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + NamespaceString(MigrationType::ConfigNS), + BSON(MigrationType::name(migrateInfo.getName())), + BSONObj(), + boost::none); + if (!statusWithMigrationQueryResult.isOK()) { + return {statusWithMigrationQueryResult.getStatus().code(), + str::stream() + << "Failed to verify whether conflicting migration is in " + << "progress for migration '" + << redact(migrateInfo.toString()) + << "' while trying to query config.migrations." + << causedBy(redact(statusWithMigrationQueryResult.getStatus()))}; + } + if (statusWithMigrationQueryResult.getValue().docs.empty()) { + // The document that caused the DuplicateKey error is no longer in the collection, + // so retrying the insert might succeed. + continue; + } + invariant(statusWithMigrationQueryResult.getValue().docs.size() == 1); + + BSONObj activeMigrationBSON = statusWithMigrationQueryResult.getValue().docs.front(); + auto statusWithActiveMigration = MigrationType::fromBSON(activeMigrationBSON); + if (!statusWithActiveMigration.isOK()) { + return {statusWithActiveMigration.getStatus().code(), + str::stream() << "Failed to verify whether conflicting migration is in " + << "progress for migration '" + << redact(migrateInfo.toString()) + << "' while trying to parse active migration document '" + << redact(activeMigrationBSON.toString()) + << "'." 
+ << causedBy(redact(statusWithActiveMigration.getStatus()))}; + } + + MigrateInfo activeMigrateInfo = statusWithActiveMigration.getValue().toMigrateInfo(); + if (activeMigrateInfo.to != migrateInfo.to || + activeMigrateInfo.from != migrateInfo.from) { + log() << "Failed to write document '" << redact(migrateInfo.toString()) + << "' to config.migrations because there is already an active migration for" + << " that chunk: '" << redact(activeMigrateInfo.toString()) << "'." + << causedBy(redact(result)); + return result; + } + + result = Status::OK(); + } + + // As long as there isn't a DuplicateKey error, the document may have been written, and it's + // safe (won't delete another migration's document) and necessary to try to clean up the + // document via the destructor. + ScopedMigrationRequest scopedMigrationRequest( + txn, NamespaceString(migrateInfo.ns), migrateInfo.minKey); + + // If there was a write error, let the object go out of scope and clean up in the + // destructor. + if (!result.isOK()) { + return result; + } + + return std::move(scopedMigrationRequest); + } + + MONGO_UNREACHABLE; +} + +ScopedMigrationRequest ScopedMigrationRequest::createForRecovery(OperationContext* txn, + const NamespaceString& nss, + const BSONObj& minKey) { + return ScopedMigrationRequest(txn, nss, minKey); +} + +void ScopedMigrationRequest::keepDocumentOnDestruct() { + _txn = nullptr; + LOG(1) << "Keeping config.migrations document with namespace '" << _nss << "' and minKey '" + << _minKey << "' for balancer recovery"; +} + +} // namespace mongo diff --git a/src/mongo/s/balancer/scoped_migration_request.h b/src/mongo/s/balancer/scoped_migration_request.h new file mode 100644 index 00000000000..8595671dc4d --- /dev/null +++ b/src/mongo/s/balancer/scoped_migration_request.h @@ -0,0 +1,106 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. 
+ */
+
+#pragma once
+
+#include "mongo/base/status_with.h"
+#include "mongo/s/balancer/balancer_policy.h"
+#include "mongo/s/migration_secondary_throttle_options.h"
+
+namespace mongo {
+
+/**
+ * RAII class that handles writes to the config.migrations collection for a migration that comes
+ * through the balancer.
+ *
+ * A migration must have an entry in the config.migrations collection so that the Balancer can
+ * recover from stepdown/crash. This entry must be entered before a migration begins and then
+ * removed once the migration has finished.
+ *
+ * This class should only be used by the Balancer!
+ */
+class ScopedMigrationRequest {
+    MONGO_DISALLOW_COPYING(ScopedMigrationRequest);
+
+public:
+    /**
+     * Deletes this migration's entry in the config.migrations collection, using majority write
+     * concern. If there is a balancer stepdown/crash before the write propagates to a majority of
+     * servers, that is alright because the balancer recovery process will handle it.
+     *
+     * If keepDocumentOnDestruct() has been called, then no attempt to remove the document is made.
+     */
+    ~ScopedMigrationRequest();
+
+    ScopedMigrationRequest(ScopedMigrationRequest&& other);
+    ScopedMigrationRequest& operator=(ScopedMigrationRequest&& other);
+
+    /**
+     * Inserts a unique migration entry in the config.migrations collection. If the write is
+     * successful, a ScopedMigrationRequest object is returned; otherwise, the write error is
+     * returned.
+     *
+     * The destructor will handle removing the document when it is no longer needed.
+     */
+    static StatusWith<ScopedMigrationRequest> writeMigration(OperationContext* txn,
+                                                             const MigrateInfo& migrate);
+
+    /**
+     * Creates a ScopedMigrationRequest object without inserting a document into config.migrations.
+     * The destructor will handle removing the migration document when it is no longer needed.
+     *
+     * This should only be used on Balancer recovery when a config.migrations document already
+     * exists for the migration.
+     */
+    static ScopedMigrationRequest createForRecovery(OperationContext* txn,
+                                                    const NamespaceString& nss,
+                                                    const BSONObj& minKey);
+
+    /**
+     * Clears the operation context so that the destructor will not remove the config.migrations
+     * document for the migration.
+     *
+     * This should only be used on the Balancer when it is interrupted and must leave entries in
+     * config.migrations so that ongoing migrations can be recovered later.
+     */
+    void keepDocumentOnDestruct();
+
+private:
+    ScopedMigrationRequest(OperationContext* txn,
+                           const NamespaceString& nss,
+                           const BSONObj& minKey);
+
+    // Need an operation context with which to do a write in the destructor.
+    OperationContext* _txn;
+
+    // ns and minKey are needed to identify the migration document when it is removed from
+    // config.migrations by the destructor.
+    NamespaceString _nss;
+    BSONObj _minKey;
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/balancer/scoped_migration_request_test.cpp b/src/mongo/s/balancer/scoped_migration_request_test.cpp
new file mode 100644
index 00000000000..48c0d501136
--- /dev/null
+++ b/src/mongo/s/balancer/scoped_migration_request_test.cpp
@@ -0,0 +1,213 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/s/balancer/scoped_migration_request.h" + +#include "mongo/s/balancer/type_migration.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/config_server_test_fixture.h" +#include "mongo/s/migration_secondary_throttle_options.h" + +namespace mongo { +namespace { + +using unittest::assertGet; + +const std::string kNs = "TestDB.TestColl"; +const BSONObj kMin = BSON("a" << 10); +const BSONObj kMax = BSON("a" << 20); +const ShardId kFromShard("shard0000"); +const ShardId kToShard("shard0001"); +const ShardId kDifferentToShard("shard0002"); +const std::string kName = "TestDB.TestColl-a_10"; + +class ScopedMigrationRequestTest : public ConfigServerTestFixture { +public: + /** + * Queries config.migrations for a document with name (_id) "chunkName" and asserts that the + * number of documents returned equals "expectedNumberOfDocuments". + */ + void checkMigrationsCollectionForDocument(std::string chunkName, + const unsigned long expectedNumberOfDocuments); + + /** + * Makes a ScopedMigrationRequest and checks that the migration was written to + * config.migrations. This exercises the ScopedMigrationRequest move and assignment + * constructors. 
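+     *
+     * A minimal usage sketch (illustrative only):
+     *
+     *   ScopedMigrationRequest request = makeScopedMigrationRequest(migrateInfo);
+     *   checkMigrationsCollectionForDocument(migrateInfo.getName(), 1);
+     *   // ... the document is removed when "request" goes out of scope.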
+     */
+    ScopedMigrationRequest makeScopedMigrationRequest(const MigrateInfo& migrateInfo);
+};
+
+void ScopedMigrationRequestTest::checkMigrationsCollectionForDocument(
+    std::string chunkName, const unsigned long expectedNumberOfDocuments) {
+    auto response = shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+        operationContext(),
+        ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+        repl::ReadConcernLevel::kMajorityReadConcern,
+        NamespaceString(MigrationType::ConfigNS),
+        BSON(MigrationType::name(chunkName)),
+        BSONObj(),
+        boost::none);
+    Shard::QueryResponse queryResponse = unittest::assertGet(response);
+    std::vector<BSONObj> docs = queryResponse.docs;
+    ASSERT_EQUALS(expectedNumberOfDocuments, docs.size());
+}
+
+ScopedMigrationRequest ScopedMigrationRequestTest::makeScopedMigrationRequest(
+    const MigrateInfo& migrateInfo) {
+    ScopedMigrationRequest scopedMigrationRequest =
+        assertGet(ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo));
+
+    checkMigrationsCollectionForDocument(migrateInfo.getName(), 1);
+
+    return scopedMigrationRequest;
+}
+
+MigrateInfo makeMigrateInfo() {
+    const ChunkVersion kChunkVersion{1, 2, OID::gen()};
+
+    BSONObjBuilder chunkBuilder;
+    chunkBuilder.append(ChunkType::name(), kName);
+    chunkBuilder.append(ChunkType::ns(), kNs);
+    chunkBuilder.append(ChunkType::min(), kMin);
+    chunkBuilder.append(ChunkType::max(), kMax);
+    kChunkVersion.appendForChunk(&chunkBuilder);
+    chunkBuilder.append(ChunkType::shard(), kFromShard.toString());
+
+    ChunkType chunkType = assertGet(ChunkType::fromBSON(chunkBuilder.obj()));
+    ASSERT_OK(chunkType.validate());
+
+    return MigrateInfo(kNs, kToShard, chunkType);
+}
+
+TEST_F(ScopedMigrationRequestTest, CreateScopedMigrationRequest) {
+    MigrateInfo migrateInfo = makeMigrateInfo();
+
+    {
+        ScopedMigrationRequest scopedMigrationRequest =
+            assertGet(ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo));
+
+        checkMigrationsCollectionForDocument(migrateInfo.getName(), 1);
+    }
+
+    checkMigrationsCollectionForDocument(migrateInfo.getName(), 0);
+}
+
+/**
+ * A document is created via a scoped object, but the document is not removed by the destructor
+ * because keepDocumentOnDestruct() is called beforehand. Then the scoped object is recreated
+ * without writing to the migrations collection, and the document is removed on destruction.
+ *
+ * Simulates (mostly) Balancer recovery.
+ */
+TEST_F(ScopedMigrationRequestTest, CreateScopedMigrationRequestOnRecovery) {
+    MigrateInfo migrateInfo = makeMigrateInfo();
+
+    // Insert the document for the MigrationRequest and then prevent its removal in the destructor.
+    {
+        ScopedMigrationRequest scopedMigrationRequest =
+            assertGet(ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo));
+
+        checkMigrationsCollectionForDocument(migrateInfo.getName(), 1);
+
+        scopedMigrationRequest.keepDocumentOnDestruct();
+    }
+
+    checkMigrationsCollectionForDocument(migrateInfo.getName(), 1);
+
+    // Fail to write a migration document if a migration document already exists for that chunk but
+    // with a different destination shard (the migration request must have identical parameters).
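+    // (writeMigration() joins an already-active migration on DuplicateKey only when the existing
+    // document's to/from shards match the request; otherwise the DuplicateKey status is returned,
+    // as asserted below.)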
+ { + MigrateInfo differentToShardMigrateInfo = migrateInfo; + differentToShardMigrateInfo.to = kDifferentToShard; + + StatusWith<ScopedMigrationRequest> statusWithScopedMigrationRequest = + ScopedMigrationRequest::writeMigration(operationContext(), differentToShardMigrateInfo); + + ASSERT_EQUALS(ErrorCodes::DuplicateKey, statusWithScopedMigrationRequest.getStatus()); + + checkMigrationsCollectionForDocument(migrateInfo.getName(), 1); + } + + // Create a new scoped object without inserting a document, and check that the destructor + // still removes the document corresponding to the MigrationRequest. + { + ScopedMigrationRequest scopedMigrationRequest = ScopedMigrationRequest::createForRecovery( + operationContext(), NamespaceString(migrateInfo.ns), migrateInfo.minKey); + + checkMigrationsCollectionForDocument(migrateInfo.getName(), 1); + } + + checkMigrationsCollectionForDocument(migrateInfo.getName(), 0); +} + +TEST_F(ScopedMigrationRequestTest, CreateMultipleScopedMigrationRequestsForIdenticalMigration) { + MigrateInfo migrateInfo = makeMigrateInfo(); + + { + // Create a ScopedMigrationRequest, which will do the config.migrations write. + ScopedMigrationRequest scopedMigrationRequest = + assertGet(ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo)); + + checkMigrationsCollectionForDocument(migrateInfo.getName(), 1); + + { + // Should be able to create another Scoped object if the request is identical. + ScopedMigrationRequest identicalScopedMigrationRequest = + assertGet(ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo)); + + checkMigrationsCollectionForDocument(migrateInfo.getName(), 1); + } + + // If any scoped object goes out of scope, the migration should be over and the document + // removed. + checkMigrationsCollectionForDocument(migrateInfo.getName(), 0); + } + + checkMigrationsCollectionForDocument(migrateInfo.getName(), 0); +} + +TEST_F(ScopedMigrationRequestTest, MoveAndAssignmentConstructors) { + MigrateInfo migrateInfo = makeMigrateInfo(); + + // Test that when the move and assignment constructors are used and the original variable goes + // out of scope, the original object's destructor does not remove the migration document. + { + ScopedMigrationRequest anotherScopedMigrationRequest = + makeScopedMigrationRequest(migrateInfo); + + checkMigrationsCollectionForDocument(migrateInfo.getName(), 1); + } + + checkMigrationsCollectionForDocument(migrateInfo.getName(), 0); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/s/balancer/type_migration.cpp b/src/mongo/s/balancer/type_migration.cpp new file mode 100644 index 00000000000..963c40b30e3 --- /dev/null +++ b/src/mongo/s/balancer/type_migration.cpp @@ -0,0 +1,122 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/s/balancer/type_migration.h" + +#include "mongo/bson/util/bson_extract.h" +#include "mongo/s/catalog/type_chunk.h" + +namespace mongo { + +const std::string MigrationType::ConfigNS = "config.migrations"; + +const BSONField<std::string> MigrationType::name("_id"); +const BSONField<std::string> MigrationType::ns("ns"); +const BSONField<BSONObj> MigrationType::min("min"); +const BSONField<BSONObj> MigrationType::max("max"); +const BSONField<std::string> MigrationType::fromShard("fromShard"); +const BSONField<std::string> MigrationType::toShard("toShard"); + +MigrationType::MigrationType() = default; + +MigrationType::MigrationType(MigrateInfo info) + : _nss(NamespaceString(info.ns)), + _min(info.minKey), + _max(info.maxKey), + _fromShard(info.from), + _toShard(info.to) {} + +StatusWith<MigrationType> MigrationType::fromBSON(const BSONObj& source) { + MigrationType migrationType; + + { + std::string migrationNS; + Status status = bsonExtractStringField(source, ns.name(), &migrationNS); + if (!status.isOK()) + return status; + migrationType._nss = NamespaceString(migrationNS); + } + + { + auto chunkRangeStatus = ChunkRange::fromBSON(source); + if (!chunkRangeStatus.isOK()) + return chunkRangeStatus.getStatus(); + + const auto chunkRange = std::move(chunkRangeStatus.getValue()); + migrationType._min = chunkRange.getMin().getOwned(); + migrationType._max = chunkRange.getMax().getOwned(); + } + + { + std::string migrationToShard; + Status status = bsonExtractStringField(source, toShard.name(), &migrationToShard); + if (!status.isOK()) + return status; + migrationType._toShard = migrationToShard; + } + + { + std::string migrationFromShard; + Status status = bsonExtractStringField(source, fromShard.name(), &migrationFromShard); + if (!status.isOK()) + return status; + migrationType._fromShard = migrationFromShard; + } + + return migrationType; +} + +BSONObj MigrationType::toBSON() const { + BSONObjBuilder builder; + if (_nss && _min) + builder.append(name.name(), getName()); + if (_nss) + builder.append(ns.name(), _nss->ns()); + if (_min) + builder.append(min.name(), _min.get()); + if (_max) + builder.append(max.name(), _max.get()); + if (_fromShard) + builder.append(fromShard.name(), _fromShard->toString()); + if (_toShard) + builder.append(toShard.name(), _toShard->toString()); + + return builder.obj(); +} + +MigrateInfo MigrationType::toMigrateInfo() const { + return MigrateInfo(_nss->ns(), _toShard.get(), _fromShard.get(), _min.get(), _max.get()); +} + +std::string MigrationType::getName() const { + return ChunkType::genID(_nss->ns(), _min.get()); +} + +} // namespace mongo diff --git a/src/mongo/s/balancer/type_migration.h 
diff --git a/src/mongo/s/balancer/type_migration.h b/src/mongo/s/balancer/type_migration.h
new file mode 100644
index 00000000000..5f2948e9dfe
--- /dev/null
+++ b/src/mongo/s/balancer/type_migration.h
@@ -0,0 +1,96 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/s/balancer/balancer_policy.h"
+#include "mongo/s/chunk_version.h"
+#include "mongo/s/client/shard.h"
+
+namespace mongo {
+
+/**
+ * This class represents the layout and contents of documents contained in the config.migrations
+ * collection. All manipulation of documents coming from that collection should be done with this
+ * class.
+ */
+class MigrationType {
+public:
+    // Name of the migrations collection in the config server.
+    static const std::string ConfigNS;
+
+    // Field names and types in the migrations collection type.
+    static const BSONField<std::string> name;
+    static const BSONField<std::string> ns;
+    static const BSONField<BSONObj> min;
+    static const BSONField<BSONObj> max;
+    static const BSONField<std::string> fromShard;
+    static const BSONField<std::string> toShard;
+
+    /**
+     * The Balancer encapsulates migration information in MigrateInfo objects, so this facilitates
+     * conversion to a config.migrations entry format.
+     */
+    explicit MigrationType(MigrateInfo info);
+
+    /**
+     * Constructs a new MigrationType object from BSON. Expects all fields to be present, and
+     * errors if they are not.
+     */
+    static StatusWith<MigrationType> fromBSON(const BSONObj& source);
+
+    /**
+     * Returns the BSON representation of the config.migrations document entry.
+     */
+    BSONObj toBSON() const;
+
+    /**
+     * Helper function for the Balancer that uses MigrateInfo objects to schedule migrations.
+     */
+    MigrateInfo toMigrateInfo() const;
+
+    /**
+     * Uniquely identifies a chunk by collection and min key.
+     */
+    std::string getName() const;
+
+private:
+    MigrationType();
+
+    // Required fields for config.migrations.
+    boost::optional<NamespaceString> _nss;
+    boost::optional<BSONObj> _min;
+    boost::optional<BSONObj> _max;
+    boost::optional<ShardId> _fromShard;
+    boost::optional<ShardId> _toShard;
+};
+
+}  // namespace mongo
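Taken together, the conversions declared above give a full round trip between the balancer's in-memory MigrateInfo and its persisted config.migrations form. A sketch, assuming the five-argument MigrateInfo constructor that toMigrateInfo() in type_migration.cpp already invokes; the shard ids and key bounds are made up:

// Round trip: MigrateInfo -> MigrationType -> BSON -> MigrationType -> MigrateInfo.
// Assumes MigrateInfo(ns, toShard, fromShard, minKey, maxKey) as invoked by
// MigrationType::toMigrateInfo(); the concrete values are hypothetical.
MigrateInfo original("TestDB.TestColl",
                     ShardId("shard0001"),
                     ShardId("shard0000"),
                     BSON("a" << 10),
                     BSON("a" << 20));

BSONObj persisted = MigrationType(original).toBSON();  // write side

auto parsed = MigrationType::fromBSON(persisted);      // read side
invariant(parsed.isOK());
MigrateInfo recovered = parsed.getValue().toMigrateInfo();  // same chunk and shards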
diff --git a/src/mongo/s/balancer/type_migration_test.cpp b/src/mongo/s/balancer/type_migration_test.cpp
new file mode 100644
index 00000000000..d3a352301aa
--- /dev/null
+++ b/src/mongo/s/balancer/type_migration_test.cpp
@@ -0,0 +1,165 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/jsobj.h"
+#include "mongo/s/balancer/type_migration.h"
+#include "mongo/s/catalog/type_chunk.h"
+
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+using unittest::assertGet;
+
+const std::string kName = "TestDB.TestColl-a_10";
+const std::string kNs = "TestDB.TestColl";
+const BSONObj kMin = BSON("a" << 10);
+const BSONObj kMax = BSON("a" << 20);
+const ShardId kFromShard("shard0000");
+const ShardId kToShard("shard0001");
+
+TEST(MigrationTypeTest, ConvertFromMigrationInfo) {
+    const ChunkVersion version(1, 2, OID::gen());
+
+    BSONObjBuilder chunkBuilder;
+    chunkBuilder.append(ChunkType::name(), kName);
+    chunkBuilder.append(ChunkType::ns(), kNs);
+    chunkBuilder.append(ChunkType::min(), kMin);
+    chunkBuilder.append(ChunkType::max(), kMax);
+    version.appendForChunk(&chunkBuilder);
+    chunkBuilder.append(ChunkType::shard(), kFromShard.toString());
+
+    ChunkType chunkType = assertGet(ChunkType::fromBSON(chunkBuilder.obj()));
+    ASSERT_OK(chunkType.validate());
+
+    MigrateInfo migrateInfo(kNs, kToShard, chunkType);
+    MigrationType migrationType(migrateInfo);
+
+    BSONObjBuilder builder;
+    builder.append(MigrationType::name(), kName);
+    builder.append(MigrationType::ns(), kNs);
+    builder.append(MigrationType::min(), kMin);
+    builder.append(MigrationType::max(), kMax);
+    builder.append(MigrationType::fromShard(), kFromShard.toString());
+    builder.append(MigrationType::toShard(), kToShard.toString());
+
+    BSONObj obj = builder.obj();
+
+    ASSERT_BSONOBJ_EQ(obj, migrationType.toBSON());
+}
+
+TEST(MigrationTypeTest, FromAndToBSON) {
+    BSONObjBuilder builder;
+    builder.append(MigrationType::name(), kName);
+    builder.append(MigrationType::ns(), kNs);
+    builder.append(MigrationType::min(), kMin);
+    builder.append(MigrationType::max(), kMax);
+    builder.append(MigrationType::fromShard(), kFromShard.toString());
+    builder.append(MigrationType::toShard(), kToShard.toString());
+
+    BSONObj obj = builder.obj();
+
+    MigrationType migrationType = assertGet(MigrationType::fromBSON(obj));
+    ASSERT_BSONOBJ_EQ(obj, migrationType.toBSON());
+}
+
+TEST(MigrationTypeTest, MissingRequiredNamespaceField) {
+    BSONObjBuilder builder;
+    builder.append(MigrationType::min(), kMin);
+    builder.append(MigrationType::max(), kMax);
+    builder.append(MigrationType::fromShard(), kFromShard.toString());
+    builder.append(MigrationType::toShard(), kToShard.toString());
+
+    BSONObj obj = builder.obj();
+
+    StatusWith<MigrationType> migrationType = MigrationType::fromBSON(obj);
+    ASSERT_EQUALS(migrationType.getStatus(), ErrorCodes::NoSuchKey);
+    ASSERT_STRING_CONTAINS(migrationType.getStatus().reason(), MigrationType::ns.name());
+}
+
+TEST(MigrationTypeTest, MissingRequiredMinField) {
+    BSONObjBuilder builder;
+    builder.append(MigrationType::ns(), kNs);
+    builder.append(MigrationType::max(), kMax);
+    builder.append(MigrationType::fromShard(), kFromShard.toString());
+    builder.append(MigrationType::toShard(), kToShard.toString());
+
+    BSONObj obj = builder.obj();
+
+    StatusWith<MigrationType> migrationType = MigrationType::fromBSON(obj);
+    ASSERT_EQUALS(migrationType.getStatus(), ErrorCodes::NoSuchKey);
+    ASSERT_STRING_CONTAINS(migrationType.getStatus().reason(), MigrationType::min.name());
+}
+
+TEST(MigrationTypeTest, MissingRequiredMaxField) {
+    BSONObjBuilder builder;
+    builder.append(MigrationType::ns(), kNs);
+    builder.append(MigrationType::min(), kMin);
+    builder.append(MigrationType::fromShard(), kFromShard.toString());
+    builder.append(MigrationType::toShard(), kToShard.toString());
+
+    BSONObj obj = builder.obj();
+
+    StatusWith<MigrationType> migrationType = MigrationType::fromBSON(obj);
+    ASSERT_EQUALS(migrationType.getStatus(), ErrorCodes::NoSuchKey);
+    ASSERT_STRING_CONTAINS(migrationType.getStatus().reason(), MigrationType::max.name());
+}
+
+TEST(MigrationTypeTest, MissingRequiredFromShardField) {
+    BSONObjBuilder builder;
+    builder.append(MigrationType::ns(), kNs);
+    builder.append(MigrationType::min(), kMin);
+    builder.append(MigrationType::max(), kMax);
+    builder.append(MigrationType::toShard(), kToShard.toString());
+
+    BSONObj obj = builder.obj();
+
+    StatusWith<MigrationType> migrationType = MigrationType::fromBSON(obj);
+    ASSERT_EQUALS(migrationType.getStatus(), ErrorCodes::NoSuchKey);
+    ASSERT_STRING_CONTAINS(migrationType.getStatus().reason(), MigrationType::fromShard.name());
+}
+
+TEST(MigrationTypeTest, MissingRequiredToShardField) {
+    BSONObjBuilder builder;
+    builder.append(MigrationType::ns(), kNs);
+    builder.append(MigrationType::min(), kMin);
+    builder.append(MigrationType::max(), kMax);
+    builder.append(MigrationType::fromShard(), kFromShard.toString());
+
+    BSONObj obj = builder.obj();
+
+    StatusWith<MigrationType> migrationType = MigrationType::fromBSON(obj);
+    ASSERT_EQUALS(migrationType.getStatus(), ErrorCodes::NoSuchKey);
+    ASSERT_STRING_CONTAINS(migrationType.getStatus().reason(), MigrationType::toShard.name());
+}
+
+}  // namespace
+}  // namespace mongo
diff --git a/src/mongo/s/catalog/replset_dist_lock_manager_test.cpp b/src/mongo/s/catalog/replset_dist_lock_manager_test.cpp
index 4862d1eeae9..02c9c8f6b1f 100644
--- a/src/mongo/s/catalog/replset_dist_lock_manager_test.cpp
+++ b/src/mongo/s/catalog/replset_dist_lock_manager_test.cpp
@@ -47,7 +47,7 @@
 #include "mongo/db/service_context_noop.h"
 #include "mongo/executor/task_executor.h"
 #include "mongo/executor/task_executor_pool.h"
-#include "mongo/s/balancer_configuration.h"
+#include "mongo/s/balancer/balancer_configuration.h"
 #include "mongo/s/catalog/dist_lock_catalog_mock.h"
 #include "mongo/s/catalog/replset_dist_lock_manager.h"
 #include "mongo/s/catalog/sharding_catalog_client_mock.h"
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/sharding_catalog_manager_impl.cpp
index 025b573e601..cc4220e2e96 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager_impl.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_manager_impl.cpp
@@ -50,14 +50,14 @@
 #include "mongo/db/namespace_string.h"
 #include "mongo/db/operation_context.h"
 #include "mongo/db/repl/repl_client_info.h"
-#include "mongo/db/s/balancer/balancer_policy.h"
-#include "mongo/db/s/balancer/type_migration.h"
 #include "mongo/db/s/sharding_state.h"
 #include "mongo/db/s/type_shard_identity.h"
 #include "mongo/db/wire_version.h"
 #include "mongo/executor/network_interface.h"
 #include "mongo/executor/task_executor.h"
 #include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/balancer/balancer_policy.h"
+#include "mongo/s/balancer/type_migration.h"
 #include "mongo/s/catalog/config_server_version.h"
 #include "mongo/s/catalog/sharding_catalog_client.h"
 #include "mongo/s/catalog/type_collection.h"
diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp
index 2b091b89e42..bcdd2dd09c7 100644
--- a/src/mongo/s/chunk.cpp
+++ b/src/mongo/s/chunk.cpp
@@ -37,7 +37,7 @@
 #include "mongo/db/commands.h"
 #include "mongo/db/lasterror.h"
 #include "mongo/platform/random.h"
-#include "mongo/s/balancer_configuration.h"
+#include "mongo/s/balancer/balancer_configuration.h"
 #include "mongo/s/catalog/sharding_catalog_client.h"
 #include "mongo/s/catalog/type_collection.h"
 #include "mongo/s/chunk_manager.h"
"mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/chunk_manager.h" diff --git a/src/mongo/s/chunk_manager.cpp b/src/mongo/s/chunk_manager.cpp index d1b9a1490f4..bf9998a48a5 100644 --- a/src/mongo/s/chunk_manager.cpp +++ b/src/mongo/s/chunk_manager.cpp @@ -49,7 +49,7 @@ #include "mongo/db/query/query_planner.h" #include "mongo/db/query/query_planner_common.h" #include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/s/balancer_configuration.h" +#include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_collection.h" diff --git a/src/mongo/s/cluster_write.cpp b/src/mongo/s/cluster_write.cpp index f7e8f460148..cf1e8df7a41 100644 --- a/src/mongo/s/cluster_write.cpp +++ b/src/mongo/s/cluster_write.cpp @@ -38,7 +38,7 @@ #include "mongo/base/status.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/write_concern_options.h" -#include "mongo/s/balancer_configuration.h" +#include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/chunk_manager.h" diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp index a86d3b6c2f0..73f1860118d 100644 --- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp @@ -38,7 +38,7 @@ #include "mongo/db/commands/find_and_modify.h" #include "mongo/db/operation_context.h" #include "mongo/db/query/collation/collator_factory_interface.h" -#include "mongo/s/balancer_configuration.h" +#include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/client/shard_connection.h" diff --git a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp index b2b863d7201..bd8c6c5ace2 100644 --- a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp +++ b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp @@ -41,7 +41,7 @@ #include "mongo/db/commands.h" #include "mongo/db/commands/mr.h" #include "mongo/db/query/collation/collation_spec.h" -#include "mongo/s/balancer_configuration.h" +#include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/dist_lock_manager.h" #include "mongo/s/catalog/sharding_catalog_client.h" diff --git a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp index 1040669d275..4236beb6121 100644 --- a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp +++ b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp @@ -38,7 +38,7 @@ #include "mongo/db/client.h" #include "mongo/db/commands.h" #include "mongo/db/write_concern_options.h" -#include "mongo/s/balancer_configuration.h" +#include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" diff --git a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp index 6c374d4f6e1..255578c28bd 100644 --- a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp +++ b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp @@ -49,7 +49,7 @@ #include 
"mongo/db/operation_context.h" #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/write_concern_options.h" -#include "mongo/s/balancer_configuration.h" +#include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/chunk_manager.h" diff --git a/src/mongo/s/config.cpp b/src/mongo/s/config.cpp index 36f7a97e55d..5705aee7740 100644 --- a/src/mongo/s/config.cpp +++ b/src/mongo/s/config.cpp @@ -37,7 +37,7 @@ #include "mongo/db/lasterror.h" #include "mongo/db/operation_context.h" #include "mongo/db/write_concern.h" -#include "mongo/s/balancer_configuration.h" +#include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_chunk.h" diff --git a/src/mongo/s/config_server_test_fixture.cpp b/src/mongo/s/config_server_test_fixture.cpp index 563c88d807b..9a8f8a33dfb 100644 --- a/src/mongo/s/config_server_test_fixture.cpp +++ b/src/mongo/s/config_server_test_fixture.cpp @@ -46,12 +46,13 @@ #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/repl/repl_settings.h" +#include "mongo/db/service_context_noop.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/rpc/metadata/server_selection_metadata.h" -#include "mongo/s/balancer_configuration.h" +#include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/dist_lock_catalog_impl.h" #include "mongo/s/catalog/replset_dist_lock_manager.h" diff --git a/src/mongo/s/grid.cpp b/src/mongo/s/grid.cpp index 98853d566f4..fbc973788d9 100644 --- a/src/mongo/s/grid.cpp +++ b/src/mongo/s/grid.cpp @@ -36,7 +36,7 @@ #include "mongo/db/server_options.h" #include "mongo/executor/task_executor.h" #include "mongo/executor/task_executor_pool.h" -#include "mongo/s/balancer_configuration.h" +#include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/sharding_catalog_manager.h" diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index 6e446427982..3a2bce364e5 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -62,7 +62,7 @@ #include "mongo/db/wire_version.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/platform/process_id.h" -#include "mongo/s/balancer_configuration.h" +#include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/sharding_catalog_manager.h" #include "mongo/s/client/shard_connection.h" diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index 97b49a46c8f..51b30053305 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -46,7 +46,7 @@ #include "mongo/executor/thread_pool_task_executor.h" #include "mongo/rpc/metadata/config_server_metadata.h" #include "mongo/rpc/metadata/metadata_hook.h" -#include "mongo/s/balancer_configuration.h" +#include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/dist_lock_catalog_impl.h" #include 
"mongo/s/catalog/replset_dist_lock_manager.h" diff --git a/src/mongo/s/sharding_mongod_test_fixture.cpp b/src/mongo/s/sharding_mongod_test_fixture.cpp index c905fe3cdbd..2c37be980a9 100644 --- a/src/mongo/s/sharding_mongod_test_fixture.cpp +++ b/src/mongo/s/sharding_mongod_test_fixture.cpp @@ -54,7 +54,7 @@ #include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/rpc/metadata/server_selection_metadata.h" -#include "mongo/s/balancer_configuration.h" +#include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/dist_lock_catalog.h" #include "mongo/s/catalog/dist_lock_manager.h" diff --git a/src/mongo/s/sharding_test_fixture.cpp b/src/mongo/s/sharding_test_fixture.cpp index 0f927c81614..c7ab3484da3 100644 --- a/src/mongo/s/sharding_test_fixture.cpp +++ b/src/mongo/s/sharding_test_fixture.cpp @@ -49,7 +49,7 @@ #include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/rpc/metadata/server_selection_metadata.h" -#include "mongo/s/balancer_configuration.h" +#include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/dist_lock_manager_mock.h" #include "mongo/s/catalog/sharding_catalog_client_impl.h" diff --git a/src/mongo/s/sharding_uptime_reporter.cpp b/src/mongo/s/sharding_uptime_reporter.cpp index ba5f3690822..0b0d78bc91b 100644 --- a/src/mongo/s/sharding_uptime_reporter.cpp +++ b/src/mongo/s/sharding_uptime_reporter.cpp @@ -34,7 +34,7 @@ #include "mongo/db/client.h" #include "mongo/db/server_options.h" -#include "mongo/s/balancer_configuration.h" +#include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_mongos.h" #include "mongo/s/grid.h" |