From 08f3a57fbb54e72a03e47047a1e8aea0f708f1c7 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Fri, 24 Jun 2016 11:12:39 -0400 Subject: SERVER-24532 adding MigrationManager to manage parallel migrations --- src/mongo/db/repl/replication_coordinator_mock.cpp | 7 +- src/mongo/db/repl/replication_coordinator_mock.h | 7 + src/mongo/s/SConscript | 13 + src/mongo/s/balancer/balancer_policy.cpp | 4 + src/mongo/s/balancer/balancer_policy.h | 1 + src/mongo/s/balancer/migration_manager.cpp | 329 +++++++++++++++++++++ src/mongo/s/balancer/migration_manager.h | 192 ++++++++++++ src/mongo/s/balancer/migration_manager_test.cpp | 282 ++++++++++++++++++ src/mongo/s/client/shard_local.cpp | 11 +- src/mongo/s/client/shard_local.h | 2 +- src/mongo/s/client/shard_local_test.cpp | 9 +- src/mongo/s/config_server_test_fixture.cpp | 56 +++- src/mongo/s/config_server_test_fixture.h | 6 +- 13 files changed, 888 insertions(+), 31 deletions(-) create mode 100644 src/mongo/s/balancer/migration_manager.cpp create mode 100644 src/mongo/s/balancer/migration_manager.h create mode 100644 src/mongo/s/balancer/migration_manager_test.cpp diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 0d95c9f3f0e..4a0bb7c8ea8 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -33,7 +33,6 @@ #include "mongo/base/status.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/read_concern_args.h" -#include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/sync_source_resolver.h" #include "mongo/db/storage/snapshot_name.h" #include "mongo/db/write_concern_options.h" @@ -226,7 +225,11 @@ StatusWith ReplicationCoordinatorMock::prepareReplSetUpdatePositionComm } ReplicaSetConfig ReplicationCoordinatorMock::getConfig() const { - return ReplicaSetConfig(); + return _getConfigReturnValue; +} + +void ReplicationCoordinatorMock::setGetConfigReturnValue(ReplicaSetConfig returnValue) { + _getConfigReturnValue = std::move(returnValue); } void ReplicationCoordinatorMock::processReplSetGetConfig(BSONObjBuilder* result) { diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 58e213ed6ee..4dd04f8fb3f 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -30,6 +30,7 @@ #include "mongo/base/status.h" #include "mongo/db/repl/optime.h" +#include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/platform/atomic_word.h" @@ -258,12 +259,18 @@ public: virtual Status stepUpIfEligible() override; + /** + * Sets the return value for calls to getConfig. 
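+     * If this is never called, getConfig() returns a default-constructed ReplicaSetConfig.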
+ */ + void setGetConfigReturnValue(ReplicaSetConfig returnValue); + private: AtomicUInt64 _snapshotNameGenerator; const ReplSettings _settings; MemberState _memberState; OpTime _myLastDurableOpTime; OpTime _myLastAppliedOpTime; + ReplicaSetConfig _getConfigReturnValue; }; } // namespace repl diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 8b0b8321aaa..4cb691f5153 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -249,6 +249,7 @@ env.Library( 'balancer/balancer_policy.cpp', 'balancer/cluster_statistics.cpp', 'balancer/cluster_statistics_impl.cpp', + 'balancer/migration_manager.cpp', 'catalog/catalog_cache.cpp', 'chunk.cpp', 'chunk_manager.cpp', @@ -337,6 +338,18 @@ env.CppUnitTest( ] ) +env.CppUnitTest( + target='migration_manager_test', + source=[ + 'balancer/migration_manager_test.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/repl/replmocks', + 'config_server_test_fixture', + 'coreshard', + ] +) + env.Library( target='local_sharding_info', source=[ diff --git a/src/mongo/s/balancer/balancer_policy.cpp b/src/mongo/s/balancer/balancer_policy.cpp index 205102fe909..12670903785 100644 --- a/src/mongo/s/balancer/balancer_policy.cpp +++ b/src/mongo/s/balancer/balancer_policy.cpp @@ -380,6 +380,10 @@ string TagRange::toString() const { return str::stream() << min << " -->> " << max << " on " << tag; } +std::string MigrateInfo::getName() const { + return ChunkType::genID(ns, minKey); +} + string MigrateInfo::toString() const { return str::stream() << ns << ": [" << minKey << ", " << maxKey << "), from " << from << ", to " << to; diff --git a/src/mongo/s/balancer/balancer_policy.h b/src/mongo/s/balancer/balancer_policy.h index 01fb4cd64af..97aeeb05c64 100644 --- a/src/mongo/s/balancer/balancer_policy.h +++ b/src/mongo/s/balancer/balancer_policy.h @@ -60,6 +60,7 @@ struct MigrateInfo { minKey(a_chunk.getMin()), maxKey(a_chunk.getMax()) {} + std::string getName() const; std::string toString() const; std::string ns; diff --git a/src/mongo/s/balancer/migration_manager.cpp b/src/mongo/s/balancer/migration_manager.cpp new file mode 100644 index 00000000000..42183f00b03 --- /dev/null +++ b/src/mongo/s/balancer/migration_manager.cpp @@ -0,0 +1,329 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. 
If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include + +#include "mongo/platform/basic.h" + +#include "mongo/s/balancer/migration_manager.h" + +#include "mongo/client/remote_command_targeter.h" +#include "mongo/db/operation_context.h" +#include "mongo/executor/task_executor.h" +#include "mongo/executor/task_executor_pool.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/s/balancer/balancer_configuration.h" +#include "mongo/s/catalog/sharding_catalog_client.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/grid.h" +#include "mongo/s/move_chunk_request.h" +#include "mongo/s/sharding_raii.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/log.h" +#include "mongo/util/represent_as.h" + +namespace mongo { + +namespace { + +using executor::RemoteCommandRequest; +using executor::RemoteCommandResponse; + +} // namespace + +MigrationManager::Migration::Migration( + const MigrateInfo& migrateInfo, + boost::optional callbackHandle) + : chunkInfo(&migrateInfo), + moveChunkCallbackHandle(std::move(callbackHandle)), + oldShard(false) {} + +void MigrationManager::Migration::setCallbackHandle( + executor::TaskExecutor::CallbackHandle callbackHandle) { + invariant(!moveChunkCallbackHandle); + moveChunkCallbackHandle = std::move(callbackHandle); +} + +void MigrationManager::Migration::clearCallbackHandle() { + moveChunkCallbackHandle = boost::none; +} + +MigrationManager::DistLockTracker::DistLockTracker( + boost::optional distlock) + : distributedLock(std::move(distlock)) { + if (distlock) { + migrationCounter = 1; + } else { + migrationCounter = 0; + } +} + +MigrationStatuses MigrationManager::scheduleMigrations( + OperationContext* txn, + const BalancerChunkSelectionPolicy::MigrateInfoVector& candidateMigrations) { + invariant(_activeMigrations.empty()); + + MigrationStatuses migrationStatuses; + + for (auto& migrateInfo : candidateMigrations) { + _activeMigrations.push_back(Migration(migrateInfo, boost::none)); + } + + _executeMigrations(txn, &migrationStatuses); + + return migrationStatuses; +} + +void MigrationManager::_executeMigrations(OperationContext* txn, + MigrationStatuses* migrationStatuses) { + auto balancerConfig = Grid::get(txn)->getBalancerConfiguration(); + for (auto& migration : _activeMigrations) { + const NamespaceString nss(migration.chunkInfo->ns); + + auto scopedCMStatus = ScopedChunkManager::getExisting(txn, nss); + if (!scopedCMStatus.isOK()) { + // Unable to find the ChunkManager for "nss" for whatever reason; abandon this + // migration and proceed to the next. + stdx::lock_guard lk(_mutex); + migrationStatuses->insert(MigrationStatuses::value_type( + migration.chunkInfo->getName(), std::move(scopedCMStatus.getStatus()))); + continue; + } + + ChunkManager* const chunkManager = scopedCMStatus.getValue().cm(); + auto chunk = chunkManager->findIntersectingChunk(txn, migration.chunkInfo->minKey); + + { + // No need to lock the mutex. Only this function and _takeDistLockForAMigration + // manipulate "_distributedLocks". No need to protect serial actions. + if (!_takeDistLockForAMigration(txn, migration, migrationStatuses)) { + // If there is a lock conflict between the balancer and the shard, or a shard and a + // shard, the migration has been rescheduled. Otherwise an attempt to take the lock + // failed for whatever reason and this migration is being abandoned. 
+ continue; + } + } + + BSONObjBuilder builder; + MoveChunkRequest::appendAsCommand( + &builder, + nss, + chunkManager->getVersion(), + Grid::get(txn)->shardRegistry()->getConfigServerConnectionString(), + migration.chunkInfo->from, + migration.chunkInfo->to, + ChunkRange(chunk->getMin(), chunk->getMax()), + balancerConfig->getMaxChunkSizeBytes(), + balancerConfig->getSecondaryThrottle(), + balancerConfig->waitForDelete(), + migration.oldShard ? true : false); // takeDistLock flag. + + BSONObj moveChunkRequestObj = builder.obj(); + + const auto recipientShard = grid.shardRegistry()->getShard(txn, migration.chunkInfo->from); + const auto host = recipientShard->getTargeter()->findHost( + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + RemoteCommandTargeter::selectFindHostMaxWaitTime(txn)); + if (!host.isOK()) { + // Unable to find a target shard for whatever reason; abandon this migration and proceed + // to the next. + stdx::lock_guard lk(_mutex); + migrationStatuses->insert(MigrationStatuses::value_type(migration.chunkInfo->getName(), + std::move(host.getStatus()))); + continue; + } + + RemoteCommandRequest remoteRequest(host.getValue(), "admin", moveChunkRequestObj); + + StatusWith remoteCommandResponse( + Status{ErrorCodes::InternalError, "Uninitialized value"}); + + executor::TaskExecutor* executor = Grid::get(txn)->getExecutorPool()->getFixedExecutor(); + + StatusWith callbackHandleWithStatus = + executor->scheduleRemoteCommand(remoteRequest, + stdx::bind(&MigrationManager::_checkMigrationCallback, + this, + stdx::placeholders::_1, + txn, + &migration, + migrationStatuses)); + + if (!callbackHandleWithStatus.isOK()) { + // Scheduling the migration moveChunk failed. + stdx::lock_guard lk(_mutex); + migrationStatuses->insert(MigrationStatuses::value_type( + migration.chunkInfo->getName(), std::move(callbackHandleWithStatus.getStatus()))); + continue; + } + + // The moveChunk command was successfully scheduled. Store the callback handle so that the + // command's return can be waited for later. + stdx::lock_guard lk(_mutex); + migration.setCallbackHandle(std::move(callbackHandleWithStatus.getValue())); + } + + _waitForMigrations(txn); + // At this point, there are no parallel running threads so it is safe not to lock the mutex. + + // All the migrations have returned, release all of the distributed locks that are no longer + // being used. + _distributedLocks.clear(); + + // If there are rescheduled migrations, move them to active and run the function again. + if (!_rescheduledMigrations.empty()) { + // Clear all the callback handles of the rescheduled migrations. 
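+        // The handles refer to moveChunk commands that have already completed; they must be
+        // cleared before the retry so that setCallbackHandle() can be invoked again.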
+ for (auto migration : _rescheduledMigrations) { + migration.clearCallbackHandle(); + } + _activeMigrations = std::move(_rescheduledMigrations); + _rescheduledMigrations.clear(); + _executeMigrations(txn, migrationStatuses); + } else { + _activeMigrations.clear(); + } +} + +void MigrationManager::_checkMigrationCallback( + const executor::TaskExecutor::RemoteCommandCallbackArgs& callbackArgs, + OperationContext* txn, + Migration* migration, + MigrationStatuses* migrationStatuses) { + const auto& remoteCommandResponseWithStatus = callbackArgs.response; + + if (!remoteCommandResponseWithStatus.isOK()) { + stdx::lock_guard lk(_mutex); + migrationStatuses->insert( + MigrationStatuses::value_type(migration->chunkInfo->getName(), + std::move(remoteCommandResponseWithStatus.getStatus()))); + return; + } + + Status commandStatus = + getStatusFromCommandResult(remoteCommandResponseWithStatus.getValue().data); + if (commandStatus == ErrorCodes::LockBusy && !migration->oldShard) { + migration->oldShard = true; + stdx::lock_guard lk(_mutex); + _rescheduleMigration(*migration); + return; + } + + stdx::lock_guard lk(_mutex); + migrationStatuses->insert( + MigrationStatuses::value_type(migration->chunkInfo->getName(), std::move(commandStatus))); +} + +void MigrationManager::_waitForMigrations(OperationContext* txn) const { + executor::TaskExecutor* executor = Grid::get(txn)->getExecutorPool()->getFixedExecutor(); + for (const auto& migration : _activeMigrations) { + // Block until the command is carried out. + if (migration.moveChunkCallbackHandle) { + executor->wait(migration.moveChunkCallbackHandle.get()); + } + } +} + +void MigrationManager::_rescheduleMigration(const Migration& migration) { + _rescheduledMigrations.push_back(migration); +} + +bool MigrationManager::_takeDistLockForAMigration(OperationContext* txn, + const Migration& migration, + MigrationStatuses* migrationStatuses) { + auto it = _distributedLocks.find(migration.chunkInfo->ns); + + if (it == _distributedLocks.end()) { + // Neither the balancer nor the shard has the distributed collection lock for "ns". + if (migration.oldShard) { + DistLockTracker distLockTracker(boost::none); + _distributedLocks.insert(std::map::value_type( + migration.chunkInfo->ns, std::move(distLockTracker))); + } else { + auto distlock = _getDistLock(txn, migration); + if (!distlock.isOK()) { + // Abandon the migration so the balancer doesn't reschedule endlessly if whatever is + // preventing the distlock from being acquired doesn't go away. + stdx::lock_guard lk(_mutex); + migrationStatuses->insert(MigrationStatuses::value_type( + migration.chunkInfo->getName(), std::move(distlock.getStatus()))); + return false; + } + DistLockTracker distLockTracker(std::move(distlock.getValue())); + _distributedLocks.insert(std::map::value_type( + migration.chunkInfo->ns, std::move(distLockTracker))); + } + } else { + DistLockTracker* distLockTracker = &(it->second); + if (!distLockTracker->distributedLock) { + // Lock conflict. A shard holds the lock for a different migration. + invariant(distLockTracker->migrationCounter == 0 && !distLockTracker->distributedLock); + _rescheduleMigration(migration); + return false; + } else { + invariant(distLockTracker->distributedLock && distLockTracker->migrationCounter > 0); + if (migration.oldShard) { + // Lock conflict. The balancer holds the lock, so the shard cannot take it yet. 
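+                // The balancer's distlock is released once this round of migrations completes,
+                // after which the rescheduled migration can run with the shard taking the lock.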
+ _rescheduleMigration(migration); + return false; + } else { + ++(distLockTracker->migrationCounter); + } + } + } + + return true; +} + +StatusWith MigrationManager::_getDistLock( + OperationContext* txn, const Migration& migration) { + const std::string whyMessage( + str::stream() + << "migrating chunk " + << ChunkRange(migration.chunkInfo->minKey, migration.chunkInfo->maxKey).toString() + << " in " + << migration.chunkInfo->ns); + + StatusWith distLockStatus = + Grid::get(txn)->catalogClient(txn)->distLock(txn, migration.chunkInfo->ns, whyMessage); + + if (!distLockStatus.isOK()) { + const std::string msg = str::stream() + << "Could not acquire collection lock for " << migration.chunkInfo->ns + << " to migrate chunk " + << ChunkRange(migration.chunkInfo->minKey, migration.chunkInfo->maxKey).toString() + << " due to " << distLockStatus.getStatus().toString(); + warning() << msg; + return {distLockStatus.getStatus().code(), msg}; + } + + return std::move(distLockStatus.getValue()); +} + +} // namespace mongo diff --git a/src/mongo/s/balancer/migration_manager.h b/src/mongo/s/balancer/migration_manager.h new file mode 100644 index 00000000000..8089197de9c --- /dev/null +++ b/src/mongo/s/balancer/migration_manager.h @@ -0,0 +1,192 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/executor/task_executor.h" +#include "mongo/s/balancer/balancer_chunk_selection_policy.h" +#include "mongo/s/catalog/dist_lock_manager.h" +#include "mongo/stdx/mutex.h" + +namespace mongo { + +class OperationContext; +class Status; +class MigrationSecondaryThrottleOptions; + +// Uniquely identifies a migration, regardless of shard and version. +typedef std::string MigrationIdentifier; +typedef std::map MigrationStatuses; + +/** + * Manages and executes parallel migrations for the balancer. + * + * TODO: for v3.6, remove code making compatible with v3.2 shards that take distlock. 
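+ *
+ * The balancer calls scheduleMigrations() with one batch of candidate migrations at a time; the
+ * call blocks until every migration in the batch has completed.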
+ */ +class MigrationManager { + MONGO_DISALLOW_COPYING(MigrationManager); + +public: + MigrationManager() = default; + ~MigrationManager() = default; + + /** + * A blocking method that attempts to schedule all the migrations specified in + * "candidateMigrations". Takes the distributed lock for each collection with a chunk being + * migrated. + * + * Returns a map of migration Status objects to indicate the success/failure of each migration. + */ + MigrationStatuses scheduleMigrations( + OperationContext* txn, + const BalancerChunkSelectionPolicy::MigrateInfoVector& candidateMigrations); + +private: + /** + * Holds the data associated with an ongoing migration. Stores a callback handle for the + * moveChunk command when one is scheduled. Also holds a flag that indicates the source shard is + * v3.2 and must take the distributed lock itself. + */ + struct Migration { + Migration(const MigrateInfo& migrateInfo, + boost::optional callbackHandle); + + void setCallbackHandle(executor::TaskExecutor::CallbackHandle callbackHandle); + void clearCallbackHandle(); + + // Pointer to the chunk details. + const MigrateInfo* chunkInfo; + + // Callback handle for the active moveChunk request. If no migration is active for the chunk + // specified in "chunkInfo", this won't be set. + boost::optional moveChunkCallbackHandle; + + // Indicates that the first moveChunk request failed with LockBusy. The second attempt must + // be made without the balancer holding the collection distlock. This is necessary for + // compatibility with a v3.2 shard, which expects to take the distlock itself. + bool oldShard; + }; + + /** + * Manages and maintains a collection distlock, which should normally be held by the balancer, + * but in the case of a migration with a v3.2 source shard the balancer must release it in order + * to allow the shard to acquire it. + */ + struct DistLockTracker { + DistLockTracker(boost::optional distlock); + + // Holds the distributed lock, if the balancer should hold it for the migration. If this is + // empty, then a shard has the distlock. + boost::optional distributedLock; + + // The number of migrations that are currently using the balancer held distributed lock. + int migrationCounter; + }; + + /** + * Blocking function that schedules all the migrations prepared in "_activeMigrations" and then + * waits for them all to complete. This is also where the distributed locks are taken. Some + * migrations may be rescheduled for a recursive call of this function if there are distributed + * lock conflicts. A lock conflict can occur when: + * 1) The source shard of a migration is v3.2 and expects to take the lock itself and the + * balancer already holds it for a different migration. + * 2) A v3.2 shard already has the distlock, so it isn't free for either the balancer to + * take or another v3.2 shard. + * All lock conflicts are resolved by waiting for all of the scheduled migrations to complete, + * at which point all the locks are safely released. + * + * All the moveChunk command Status results are placed in "migrationStatuses". + */ + void _executeMigrations(OperationContext* txn, + std::map* migrationStatuses); + + /** + * Callback function that checks a remote command response for errors. If there is a LockBusy + * error, the first time this happens the shard starting the migration is assumed to be v3.2 and + * is marked such and rescheduled. On other errors, the migration is abandoned. Places all of + * the Status results from the moveChunk commands in "migrationStatuses". 
+ */ + void _checkMigrationCallback( + const executor::TaskExecutor::RemoteCommandCallbackArgs& callbackArgs, + OperationContext* txn, + Migration* migration, + std::map* migrationStatuses); + + /** + * Goes through the callback handles in "_activeMigrations" and waits for the moveChunk commands + * to return. + */ + void _waitForMigrations(OperationContext* txn) const; + + /** + * Adds "migration" to "_rescheduledMigrations" vector. + */ + void _rescheduleMigration(const Migration& migration); + + /** + * Attempts to take a distlock for collection "ns", if appropriate. It may + * 1) Take the distlock for the balancer and initialize the counter to 1. + * 2) Increment the counter on the distlock that the balancer already holds. + * 3) Initialize the counter to 0 to indicate a migration with a v3.2 shard, where the shard + * will take the distlock. + * + * If none of these actions are possible because of a lock conflict (shard can't take the lock + * if the balancer already holds it, or vice versa) or if the lock is unavailable, returns + * false to indicate that the migration cannot proceed right now. If the lock could not be + * taken because of a lock conflict as described, then the migration is rescheduled; otherwise + * it is abandoned. + * + * If an attempt to acquire the distributed lock fails and the migration is abandoned, the error + * Status is placed in "migrationStatuses". + */ + bool _takeDistLockForAMigration(OperationContext* txn, + const Migration& migration, + MigrationStatuses* migrationStatuses); + + /** + * Attempts to acquire the distributed collection lock necessary required for "migration". + */ + StatusWith _getDistLock(OperationContext* txn, + const Migration& migration); + + // Protects class variables when migrations run in parallel. + stdx::mutex _mutex; + + // Holds information about each ongoing migration. + std::vector _activeMigrations; + + // Temporary container for migrations that must be rescheduled. After all of the + // _activeMigrations are finished, this variable is used to reset _activeMigrations before + // executing migrations again. + std::vector _rescheduledMigrations; + + // Manages the distributed locks and whether the balancer or shard holds them. + std::map _distributedLocks; +}; + +} // namespace mongo diff --git a/src/mongo/s/balancer/migration_manager_test.cpp b/src/mongo/s/balancer/migration_manager_test.cpp new file mode 100644 index 00000000000..ed419ba5e77 --- /dev/null +++ b/src/mongo/s/balancer/migration_manager_test.cpp @@ -0,0 +1,282 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. 
You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/client/remote_command_targeter_mock.h" +#include "mongo/db/client.h" +#include "mongo/db/write_concern_options.h" +#include "mongo/s/balancer/migration_manager.h" +#include "mongo/s/catalog/dist_lock_manager_mock.h" +#include "mongo/s/catalog/replset/sharding_catalog_client_impl.h" +#include "mongo/s/catalog/type_collection.h" +#include "mongo/s/catalog/type_database.h" +#include "mongo/s/catalog/type_shard.h" +#include "mongo/s/config_server_test_fixture.h" +#include "mongo/s/move_chunk_request.h" +#include "mongo/stdx/memory.h" + +namespace mongo { +namespace { + +using executor::RemoteCommandRequest; +using executor::RemoteCommandResponse; + +const auto kShardId0 = ShardId("shard0"); +const auto kShardId1 = ShardId("shard1"); +const auto kShardId2 = ShardId("shard2"); +const auto kShardId3 = ShardId("shard3"); + +const HostAndPort kShardHost0 = HostAndPort("TestHost0", 12345); +const HostAndPort kShardHost1 = HostAndPort("TestHost1", 12346); +const HostAndPort kShardHost2 = HostAndPort("TestHost2", 12347); +const HostAndPort kShardHost3 = HostAndPort("TestHost3", 12348); + +const long long kMaxSizeMB = 100; +const std::string kPattern = "_id"; + +const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, + WriteConcernOptions::SyncMode::UNSET, + Seconds(15)); + +class MigrationManagerTest : public ConfigServerTestFixture { +protected: + std::unique_ptr _migrationManager; + + /** + * Returns the mock targeter for the specified shard. Useful to use like so + * + * shardTargeterMock(txn, shardId)->setFindHostReturnValue(shardHost); + * + * Then calls to RemoteCommandTargeterMock::findHost will return HostAndPort "shardHost" for + * Shard "shardId". + * + * Scheduling a command requires a shard host target. The command will be caught by the mock + * network, but sending the command requires finding the shard's host. + */ + std::shared_ptr shardTargeterMock(OperationContext* txn, + ShardId shardId); + + /** + * Inserts a document into the config.databases collection to indicate that "dbName" is sharded + * with primary "primaryShard". + */ + void setUpDatabase(const std::string& dbName, const ShardId primaryShard); + + /** + * Inserts a document into the config.collections collection to indicate that "collName" is + * sharded with version "version". The shard key pattern defaults to "_id". + */ + void setUpCollection(const std::string collName, ChunkVersion version); + + /** + * Inserts a document into the config.chunks collection so that the chunk defined by the + * parameters exists. Returns a ChunkType defined by the parameters. + */ + ChunkType setUpChunk(const std::string& collName, + const BSONObj& chunkMin, + const BSONObj& chunkMax, + const ShardId& fromShardId, + const ShardId& toShardId, + const ChunkVersion& version); + + /** + * Sets up mock network to expect a moveChunk command and return "returnStatus". 
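+     * "takeDistLock" is the expected value of the takeDistLock field in the moveChunk request.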
+ */ + void expectMoveChunkCommand(const ChunkType& chunk, + const ShardId& toShardId, + const bool& takeDistLock, + const Status& returnStatus); + +private: + void setUp() override; + void tearDown() override; + +public: + // Random static initialization order can result in X constructor running before Y constructor + // if X and Y are defined in different source files. Defining variables here to enforce order. + const BSONObj shardType0 = + BSON(ShardType::name(kShardId0.toString()) << ShardType::host(kShardHost0.toString()) + << ShardType::maxSizeMB(kMaxSizeMB)); + const BSONObj shardType1 = + BSON(ShardType::name(kShardId1.toString()) << ShardType::host(kShardHost1.toString()) + << ShardType::maxSizeMB(kMaxSizeMB)); + const BSONObj shardType2 = + BSON(ShardType::name(kShardId2.toString()) << ShardType::host(kShardHost2.toString()) + << ShardType::maxSizeMB(kMaxSizeMB)); + const BSONObj shardType3 = + BSON(ShardType::name(kShardId3.toString()) << ShardType::host(kShardHost3.toString()) + << ShardType::maxSizeMB(kMaxSizeMB)); + const KeyPattern kKeyPattern = KeyPattern(BSON(kPattern << 1)); +}; + +void MigrationManagerTest::setUp() { + _migrationManager = stdx::make_unique(); + ConfigServerTestFixture::setUp(); +} + +void MigrationManagerTest::tearDown() { + _migrationManager.reset(); + ConfigServerTestFixture::tearDown(); +} + +std::shared_ptr MigrationManagerTest::shardTargeterMock( + OperationContext* txn, ShardId shardId) { + return RemoteCommandTargeterMock::get(shardRegistry()->getShard(txn, shardId)->getTargeter()); +} + +void MigrationManagerTest::setUpDatabase(const std::string& dbName, const ShardId primaryShard) { + DatabaseType db; + db.setName(dbName); + db.setPrimary(primaryShard); + db.setSharded(true); + ASSERT(catalogClient() + ->insertConfigDocument( + operationContext(), DatabaseType::ConfigNS, db.toBSON(), kMajorityWriteConcern) + .isOK()); +} + +void MigrationManagerTest::setUpCollection(const std::string collName, ChunkVersion version) { + CollectionType coll; + coll.setNs(NamespaceString(collName)); + coll.setEpoch(version.epoch()); + coll.setUpdatedAt(Date_t::fromMillisSinceEpoch(version.toLong())); + coll.setKeyPattern(kKeyPattern); + coll.setUnique(false); + ASSERT( + catalogClient() + ->insertConfigDocument( + operationContext(), CollectionType::ConfigNS, coll.toBSON(), kMajorityWriteConcern) + .isOK()); +} + +ChunkType MigrationManagerTest::setUpChunk(const std::string& collName, + const BSONObj& chunkMin, + const BSONObj& chunkMax, + const ShardId& fromShardId, + const ShardId& toShardId, + const ChunkVersion& version) { + ChunkType chunk; + chunk.setNS(collName); + chunk.setMin(chunkMin); + chunk.setMax(chunkMax); + chunk.setShard(fromShardId); + chunk.setVersion(version); + ASSERT(catalogClient() + ->insertConfigDocument( + operationContext(), ChunkType::ConfigNS, chunk.toBSON(), kMajorityWriteConcern) + .isOK()); + return chunk; +} + +void MigrationManagerTest::expectMoveChunkCommand(const ChunkType& chunk, + const ShardId& toShardId, + const bool& takeDistLock, + const Status& returnStatus) { + onCommand( + [&chunk, &toShardId, &takeDistLock, &returnStatus](const RemoteCommandRequest& request) { + NamespaceString nss(request.cmdObj.firstElement().valueStringData()); + ASSERT_EQ(chunk.getNS(), nss.ns()); + + const StatusWith moveChunkRequestWithStatus = + MoveChunkRequest::createFromCommand(nss, request.cmdObj); + ASSERT(moveChunkRequestWithStatus.isOK()); + + ASSERT_EQ(chunk.getNS(), moveChunkRequestWithStatus.getValue().getNss().ns()); + 
ASSERT_EQ(chunk.getMin(), moveChunkRequestWithStatus.getValue().getMinKey()); + ASSERT_EQ(chunk.getMax(), moveChunkRequestWithStatus.getValue().getMaxKey()); + ASSERT_EQ(chunk.getShard(), moveChunkRequestWithStatus.getValue().getFromShardId()); + + ASSERT_EQ(toShardId, moveChunkRequestWithStatus.getValue().getToShardId()); + ASSERT_EQ(takeDistLock, moveChunkRequestWithStatus.getValue().getTakeDistLock()); + + if (returnStatus.isOK()) { + return BSON("ok" << 1); + } else { + BSONObjBuilder builder; + builder.append("ok", 0); + builder.append("code", returnStatus.code()); + builder.append("errmsg", returnStatus.reason()); + return builder.obj(); + } + }); +} + +TEST_F(MigrationManagerTest, TwoSameCollectionMigrations) { + // Set up two shards in the metadata. + ASSERT(catalogClient() + ->insertConfigDocument( + operationContext(), ShardType::ConfigNS, shardType0, kMajorityWriteConcern) + .isOK()); + ASSERT(catalogClient() + ->insertConfigDocument( + operationContext(), ShardType::ConfigNS, shardType2, kMajorityWriteConcern) + .isOK()); + + // Set up the database and collection as sharded in the metadata. + std::string dbName = "foo"; + std::string collName = "foo.bar"; + ChunkVersion version(2, 0, OID::gen()); + + setUpDatabase(dbName, kShardId0); + setUpCollection(collName, version); + + // Set up two chunks in the metadata. + BalancerChunkSelectionPolicy::MigrateInfoVector candidateChunks; + ChunkType chunk1 = setUpChunk( + collName, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, kShardId1, version); + version.incMinor(); + ChunkType chunk2 = setUpChunk( + collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, kShardId3, version); + + // Going to request that these two chunks get migrated. + candidateChunks.push_back(MigrateInfo(chunk1.getNS(), kShardId1, chunk1)); + candidateChunks.push_back(MigrateInfo(chunk2.getNS(), kShardId3, chunk2)); + + auto future = launchAsync([this, candidateChunks] { + Client::initThreadIfNotAlready("Test"); + auto txn = cc().makeOperationContext(); + + // Scheduling the moveChunk commands requires finding a host to which to send the command. + // Set up some dummy hosts -- moveChunk commands are going to hit the mock network anyway. + shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0); + shardTargeterMock(txn.get(), kShardId2)->setFindHostReturnValue(kShardHost3); + + _migrationManager->scheduleMigrations(txn.get(), candidateChunks); + }); + + // Expect two moveChunk commands. + expectMoveChunkCommand(chunk1, kShardId1, false, Status::OK()); + expectMoveChunkCommand(chunk2, kShardId3, false, Status::OK()); + + // Run the MigrationManager code. + future.timed_get(kFutureTimeout); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/s/client/shard_local.cpp b/src/mongo/s/client/shard_local.cpp index 3eca849b2b5..085dbce0e40 100644 --- a/src/mongo/s/client/shard_local.cpp +++ b/src/mongo/s/client/shard_local.cpp @@ -54,13 +54,14 @@ const Status kInternalErrorStatus{ErrorCodes::InternalError, "Invalid to check for write concern error if command failed"}; } // namespace +ShardLocal::ShardLocal(const ShardId& id) : Shard(id) { + // Currently ShardLocal only works for config servers. If we ever start using ShardLocal on + // shards we'll need to consider how to handle shards. 
+ invariant(serverGlobalParams.clusterRole == ClusterRole::ConfigServer); +} + const ConnectionString ShardLocal::getConnString() const { auto replCoord = repl::getGlobalReplicationCoordinator(); - - // Currently ShardLocal only works for config servers, which must be replica sets. If we - // ever start using ShardLocal on shards we'll need to consider how to handle shards that are - // not replica sets. - invariant(replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet); return replCoord->getConfig().getConnectionString(); } diff --git a/src/mongo/s/client/shard_local.h b/src/mongo/s/client/shard_local.h index 27e2edbea40..56358b449dc 100644 --- a/src/mongo/s/client/shard_local.h +++ b/src/mongo/s/client/shard_local.h @@ -40,7 +40,7 @@ class ShardLocal : public Shard { MONGO_DISALLOW_COPYING(ShardLocal); public: - explicit ShardLocal(const ShardId& id) : Shard(id) {} + explicit ShardLocal(const ShardId& id); ~ShardLocal() = default; diff --git a/src/mongo/s/client/shard_local_test.cpp b/src/mongo/s/client/shard_local_test.cpp index b79fef28600..35a9359b372 100644 --- a/src/mongo/s/client/shard_local_test.cpp +++ b/src/mongo/s/client/shard_local_test.cpp @@ -82,6 +82,7 @@ void ShardLocalTest::setUp() { ServiceContextMongoDTest::setUp(); Client::initThreadIfNotAlready(); _txn = getGlobalServiceContext()->makeOperationContext(&cc()); + serverGlobalParams.clusterRole = ClusterRole::ConfigServer; _shardLocal = stdx::make_unique(ShardId("shardOrConfig")); const repl::ReplSettings replSettings = {}; repl::setGlobalReplicationCoordinator(new repl::ReplicationCoordinatorMock(replSettings)); @@ -153,7 +154,7 @@ StatusWith ShardLocalTest::runFindQuery(NamespaceString ns } TEST_F(ShardLocalTest, RunCommand) { - NamespaceString nss("foo.bar"); + NamespaceString nss("admin.bar"); StatusWith findAndModifyResponse = runFindAndModifyRunCommand( nss, BSON("fooItem" << 1), BSON("$set" << BSON("fooRandom" << 254))); @@ -165,7 +166,7 @@ TEST_F(ShardLocalTest, RunCommand) { } TEST_F(ShardLocalTest, FindOneWithoutLimit) { - NamespaceString nss("foo.bar"); + NamespaceString nss("admin.bar"); // Set up documents to be queried. StatusWith findAndModifyResponse = runFindAndModifyRunCommand( @@ -189,7 +190,7 @@ TEST_F(ShardLocalTest, FindOneWithoutLimit) { } TEST_F(ShardLocalTest, FindManyWithLimit) { - NamespaceString nss("foo.bar"); + NamespaceString nss("admin.bar"); // Set up documents to be queried. StatusWith findAndModifyResponse = runFindAndModifyRunCommand( @@ -219,7 +220,7 @@ TEST_F(ShardLocalTest, FindManyWithLimit) { } TEST_F(ShardLocalTest, FindNoMatchingDocumentsEmpty) { - NamespaceString nss("foo.bar"); + NamespaceString nss("admin.bar"); // Set up a document. 
StatusWith findAndModifyResponse = runFindAndModifyRunCommand( diff --git a/src/mongo/s/config_server_test_fixture.cpp b/src/mongo/s/config_server_test_fixture.cpp index c04d0df2a06..aed55611dc1 100644 --- a/src/mongo/s/config_server_test_fixture.cpp +++ b/src/mongo/s/config_server_test_fixture.cpp @@ -52,7 +52,8 @@ #include "mongo/rpc/metadata/server_selection_metadata.h" #include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/catalog_cache.h" -#include "mongo/s/catalog/dist_lock_manager_mock.h" +#include "mongo/s/catalog/replset/dist_lock_catalog_impl.h" +#include "mongo/s/catalog/replset/replset_dist_lock_manager.h" #include "mongo/s/catalog/replset/sharding_catalog_client_impl.h" #include "mongo/s/catalog/replset/sharding_catalog_manager_impl.h" #include "mongo/s/catalog/type_changelog.h" @@ -104,8 +105,23 @@ void ConfigServerTestFixture::setUp() { _opCtx = cc().makeOperationContext(); repl::ReplSettings replSettings; - repl::ReplicationCoordinator::set( - serviceContext, stdx::make_unique(replSettings)); + auto replCoord = stdx::make_unique(replSettings); + repl::ReplicaSetConfig config; + config.initialize(BSON("_id" + << "mySet" + << "protocolVersion" + << 1 + << "version" + << 3 + << "members" + << BSON_ARRAY(BSON("host" + << "node2:12345" + << "_id" + << 1)))); + replCoord->setGetConfigReturnValue(config); + repl::ReplicationCoordinator::set(serviceContext, std::move(replCoord)); + + serverGlobalParams.clusterRole = ClusterRole::ConfigServer; // Set up executor pool used for most operations. auto fixedNet = stdx::make_unique(); @@ -131,18 +147,6 @@ void ConfigServerTestFixture::setUp() { _addShardNetworkTestEnv = stdx::make_unique(specialExec.get(), specialMockNet); _executorForAddShard = specialExec.get(); - auto uniqueDistLockManager = stdx::make_unique(); - _distLockManager = uniqueDistLockManager.get(); - std::unique_ptr catalogClient( - stdx::make_unique(std::move(uniqueDistLockManager))); - _catalogClient = catalogClient.get(); - catalogClient->startup(); - - std::unique_ptr catalogManager( - stdx::make_unique(_catalogClient, std::move(specialExec))); - _catalogManager = catalogManager.get(); - catalogManager->startup(); - auto targeterFactory(stdx::make_unique()); auto targeterFactoryPtr = targeterFactory.get(); _targeterFactory = targeterFactoryPtr; @@ -177,6 +181,23 @@ void ConfigServerTestFixture::setUp() { stdx::make_unique(std::move(shardFactory), ConnectionString::forLocal())); executorPool->startup(); + auto distLockCatalog = stdx::make_unique(shardRegistry.get()); + + auto uniqueDistLockManager = + stdx::make_unique(serviceContext, + "distLockProcessId", + std::move(distLockCatalog), + ReplSetDistLockManager::kDistLockPingInterval, + ReplSetDistLockManager::kDistLockExpirationTime); + _distLockManager = uniqueDistLockManager.get(); + std::unique_ptr catalogClient( + stdx::make_unique(std::move(uniqueDistLockManager))); + _catalogClient = catalogClient.get(); + + std::unique_ptr catalogManager( + stdx::make_unique(_catalogClient, std::move(specialExec))); + _catalogManager = catalogManager.get(); + // For now initialize the global grid object. All sharding objects will be accessible from there // until we get rid of it. 
grid.init(std::move(catalogClient), @@ -187,6 +208,9 @@ void ConfigServerTestFixture::setUp() { stdx::make_unique(), std::move(executorPool), _mockNetwork); + + _catalogClient->startup(); + _catalogManager->startup(); } void ConfigServerTestFixture::tearDown() { @@ -255,7 +279,7 @@ MessagingPortMock* ConfigServerTestFixture::getMessagingPort() const { return _messagePort.get(); } -DistLockManagerMock* ConfigServerTestFixture::distLock() const { +ReplSetDistLockManager* ConfigServerTestFixture::distLock() const { invariant(_distLockManager); return _distLockManager; } diff --git a/src/mongo/s/config_server_test_fixture.h b/src/mongo/s/config_server_test_fixture.h index 7deabf1b323..38d348a78f0 100644 --- a/src/mongo/s/config_server_test_fixture.h +++ b/src/mongo/s/config_server_test_fixture.h @@ -42,7 +42,7 @@ class BSONObj; class CatalogCache; struct ChunkVersion; class CollectionType; -class DistLockManagerMock; +class ReplSetDistLockManager; class NamespaceString; class Shard; class ShardFactoryMock; @@ -100,7 +100,7 @@ public: MessagingPortMock* getMessagingPort() const; - DistLockManagerMock* distLock() const; + ReplSetDistLockManager* distLock() const; OperationContext* operationContext() const; @@ -172,7 +172,7 @@ private: executor::TaskExecutor* _executorForAddShard; std::unique_ptr _networkTestEnv; std::unique_ptr _addShardNetworkTestEnv; - DistLockManagerMock* _distLockManager = nullptr; + ReplSetDistLockManager* _distLockManager = nullptr; ShardingCatalogClientImpl* _catalogClient = nullptr; ShardingCatalogManagerImpl* _catalogManager = nullptr; }; -- cgit v1.2.1
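
A minimal usage sketch (not part of the patch) of how a caller such as the balancer might drive the new MigrationManager, mirroring what the unit test above exercises. The helper runMigrationRound and its wiring are assumptions for illustration; MigrationManager::scheduleMigrations() and the MigrationStatuses map come from this change.

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding

#include "mongo/platform/basic.h"

#include "mongo/s/balancer/balancer_chunk_selection_policy.h"
#include "mongo/s/balancer/migration_manager.h"
#include "mongo/util/log.h"

namespace mongo {

// Hypothetical balancer-side helper: runs one round of migrations and logs any failures.
void runMigrationRound(OperationContext* txn,
                       const BalancerChunkSelectionPolicy::MigrateInfoVector& candidateChunks) {
    MigrationManager migrationManager;

    // Blocks until every scheduled moveChunk command has returned. Collection distlocks are
    // acquired (or, for v3.2 donor shards, left to the shard) inside the manager.
    MigrationStatuses statuses = migrationManager.scheduleMigrations(txn, candidateChunks);

    // MigrationStatuses maps ChunkType::genID(ns, minKey) to the final Status of each migration.
    for (const auto& entry : statuses) {
        if (!entry.second.isOK()) {
            warning() << "Migration " << entry.first << " failed: " << entry.second.toString();
        }
    }
}

}  // namespace mongo

On a LockBusy response from a shard, the manager marks the migration as targeting an old (v3.2) shard, waits for the current round to finish so the balancer-held distlock is released, and then reissues the moveChunk with takeDistLock=true so the shard can acquire the lock itself.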