diff options
author | Pierlauro Sciarelli <pierlauro.sciarelli@mongodb.com> | 2022-03-04 12:07:32 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-04 13:13:37 +0000 |
commit | a80c0ef27627e24f3ff601ae234e6a8ec99ad2b3 (patch) | |
tree | bb126ad5766af56033b4e2b0efb2168f8e1f5ad5 /src/mongo | |
parent | ee5dacd18fd01e61b449066e2081515da79a8d66 (diff) | |
download | mongo-a80c0ef27627e24f3ff601ae234e6a8ec99ad2b3.tar.gz |
SERVER-63761 Implement first version of moveRange command (aliasing moveChunk)
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/s/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/s/move_chunk_command.cpp | 254 | ||||
-rw-r--r-- | src/mongo/db/s/move_range_command.cpp | 254 | ||||
-rw-r--r-- | src/mongo/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/s/request_types/move_range_request.idl | 87 |
5 files changed, 343 insertions, 255 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 60b195acda9..7b6deed301d 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -335,9 +335,9 @@ env.Library( 'merge_chunks_command.cpp', 'migration_chunk_cloner_source_legacy_commands.cpp', 'migration_destination_manager_legacy_commands.cpp', - 'move_chunk_command.cpp', 'move_primary_coordinator.cpp', 'move_primary_coordinator_document.idl', + 'move_range_command.cpp', 'remove_chunks.idl', 'rename_collection_coordinator.cpp', 'rename_collection_participant_service.cpp', diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp deleted file mode 100644 index 6193b998c80..00000000000 --- a/src/mongo/db/s/move_chunk_command.cpp +++ /dev/null @@ -1,254 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding - -#include "mongo/platform/basic.h" - -#include "mongo/client/remote_command_targeter.h" -#include "mongo/db/auth/action_set.h" -#include "mongo/db/auth/action_type.h" -#include "mongo/db/auth/authorization_session.h" -#include "mongo/db/commands.h" -#include "mongo/db/repl/repl_client_info.h" -#include "mongo/db/s/active_migrations_registry.h" -#include "mongo/db/s/chunk_move_write_concern_options.h" -#include "mongo/db/s/migration_source_manager.h" -#include "mongo/db/s/sharding_state.h" -#include "mongo/db/s/sharding_statistics.h" -#include "mongo/logv2/log.h" -#include "mongo/s/client/shard_registry.h" -#include "mongo/s/grid.h" -#include "mongo/s/request_types/move_chunk_request.h" -#include "mongo/util/concurrency/notification.h" -#include "mongo/util/concurrency/thread_pool.h" - -namespace mongo { -namespace { - -const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, - // Note: Even though we're setting UNSET here, - // kMajority implies JOURNAL if journaling is - // supported by mongod and - // writeConcernMajorityJournalDefault is set to true - // in the ReplSetConfig. - WriteConcernOptions::SyncMode::UNSET, - WriteConcernOptions::kWriteConcernTimeoutSharding); - -class MoveChunkCommand : public BasicCommand { -public: - MoveChunkCommand() : BasicCommand("moveChunk") {} - - std::string help() const override { - return "should not be calling this directly"; - } - - AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { - return AllowedOnSecondary::kNever; - } - - bool adminOnly() const override { - return true; - } - - bool supportsWriteConcern(const BSONObj& cmd) const override { - return true; - } - - Status checkAuthForCommand(Client* client, - const std::string& dbname, - const BSONObj& cmdObj) const override { - if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource( - ResourcePattern::forClusterResource(), ActionType::internal)) { - return Status(ErrorCodes::Unauthorized, "Unauthorized"); - } - return Status::OK(); - } - - std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override { - return CommandHelpers::parseNsFullyQualified(cmdObj); - } - - bool run(OperationContext* opCtx, - const std::string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder&) override { - auto shardingState = ShardingState::get(opCtx); - uassertStatusOK(shardingState->canAcceptShardedCommands()); - - const MoveChunkRequest moveChunkRequest = uassertStatusOK( - MoveChunkRequest::createFromCommand(NamespaceString(parseNs(dbname, cmdObj)), cmdObj)); - - // Make sure we're as up-to-date as possible with shard information. This catches the case - // where we might have changed a shard's host by removing/adding a shard with the same name. - Grid::get(opCtx)->shardRegistry()->reload(opCtx); - - auto scopedMigration = uassertStatusOK( - ActiveMigrationsRegistry::get(opCtx).registerDonateChunk(opCtx, moveChunkRequest)); - - // Check if there is an existing migration running and if so, join it - if (scopedMigration.mustExecute()) { - auto moveChunkComplete = - ExecutorFuture<void>(_getExecutor()) - .then([moveChunkRequest, - scopedMigration = std::move(scopedMigration), - serviceContext = opCtx->getServiceContext()]() mutable { - // This local variable is created to enforce that the scopedMigration is - // destroyed before setting the shared state as ready. - // Note that captured objects of the lambda are destroyed by the executor - // thread after setting the shared state as ready. - auto scopedMigrationLocal(std::move(scopedMigration)); - ThreadClient tc("MoveChunk", serviceContext); - { - stdx::lock_guard<Client> lk(*tc.get()); - tc->setSystemOperationKillableByStepdown(lk); - } - auto uniqueOpCtx = Client::getCurrent()->makeOperationContext(); - auto opCtx = uniqueOpCtx.get(); - - { - // Ensure that opCtx will get interrupted in the event of a stepdown. - // This is to ensure that the MigrationSourceManager checks that there - // are no pending migrationCoordinators documents (under the - // ActiveMigrationRegistry lock) on the same term during which the - // migrationCoordinators document will be persisted. - Lock::GlobalLock lk(opCtx, MODE_IX); - } - - // Note: This internal authorization is tied to the lifetime of the client. - AuthorizationSession::get(opCtx->getClient()) - ->grantInternalAuthorization(opCtx->getClient()); - - Status status = {ErrorCodes::InternalError, "Uninitialized value"}; - - try { - _runImpl(opCtx, moveChunkRequest); - status = Status::OK(); - } catch (const DBException& e) { - status = e.toStatus(); - LOGV2_WARNING(23777, - "Chunk move failed with {error}", - "Error while doing moveChunk", - "error"_attr = redact(status)); - - if (status.code() == ErrorCodes::LockTimeout) { - ShardingStatistics::get(opCtx) - .countDonorMoveChunkLockTimeout.addAndFetch(1); - } - } - - scopedMigrationLocal.signalComplete(status); - uassertStatusOK(status); - }); - moveChunkComplete.get(opCtx); - } else { - uassertStatusOK(scopedMigration.waitForCompletion(opCtx)); - } - - if (moveChunkRequest.getWaitForDelete()) { - // Ensure we capture the latest opTime in the system, since range deletion happens - // asynchronously with a different OperationContext. This must be done after the above - // join, because each caller must set the opTime to wait for writeConcern for on its own - // OperationContext. - auto& replClient = repl::ReplClientInfo::forClient(opCtx->getClient()); - replClient.setLastOpToSystemLastOpTime(opCtx); - - WriteConcernResult writeConcernResult; - writeConcernResult.wTimedOut = false; - Status majorityStatus = waitForWriteConcern( - opCtx, replClient.getLastOp(), kMajorityWriteConcern, &writeConcernResult); - - if (!majorityStatus.isOK()) { - if (!writeConcernResult.wTimedOut) { - uassertStatusOK(majorityStatus); - } - return false; - } - } - - return true; - } - -private: - static void _runImpl(OperationContext* opCtx, const MoveChunkRequest& moveChunkRequest) { - if (moveChunkRequest.getFromShardId() == moveChunkRequest.getToShardId()) { - // TODO: SERVER-46669 handle wait for delete. - return; - } - - // Resolve the donor and recipient shards and their connection string - auto const shardRegistry = Grid::get(opCtx)->shardRegistry(); - - const auto donorConnStr = - uassertStatusOK(shardRegistry->getShard(opCtx, moveChunkRequest.getFromShardId())) - ->getConnString(); - const auto recipientHost = uassertStatusOK([&] { - auto recipientShard = - uassertStatusOK(shardRegistry->getShard(opCtx, moveChunkRequest.getToShardId())); - - return recipientShard->getTargeter()->findHost( - opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}); - }()); - - MigrationSourceManager migrationSourceManager( - opCtx, moveChunkRequest, donorConnStr, recipientHost); - - migrationSourceManager.startClone(); - migrationSourceManager.awaitToCatchUp(); - migrationSourceManager.enterCriticalSection(); - migrationSourceManager.commitChunkOnRecipient(); - migrationSourceManager.commitChunkMetadataOnConfig(); - } - -private: - // Returns a single-threaded executor to be used to run moveChunk commands. The executor is - // initialized on the first call to this function. Uses a shared_ptr because a shared_ptr is - // required to work with ExecutorFutures. - static std::shared_ptr<ThreadPool> _getExecutor() { - static Mutex mutex = MONGO_MAKE_LATCH("MoveChunkExecutor::_mutex"); - static std::shared_ptr<ThreadPool> executor; - - stdx::lock_guard<Latch> lg(mutex); - if (!executor) { - ThreadPool::Options options; - options.poolName = "MoveChunk"; - options.minThreads = 0; - // We limit the size of the thread pool to a single thread because currently there can - // only be one moveChunk operation on a shard at a time. - options.maxThreads = 1; - executor = std::make_shared<ThreadPool>(std::move(options)); - executor->startup(); - } - - return executor; - } -} moveChunkCmd; - -} // namespace -} // namespace mongo diff --git a/src/mongo/db/s/move_range_command.cpp b/src/mongo/db/s/move_range_command.cpp new file mode 100644 index 00000000000..48d01214af4 --- /dev/null +++ b/src/mongo/db/s/move_range_command.cpp @@ -0,0 +1,254 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding + +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/commands.h" +#include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/s/active_migrations_registry.h" +#include "mongo/db/s/migration_source_manager.h" +#include "mongo/db/s/sharding_state.h" +#include "mongo/db/s/sharding_statistics.h" +#include "mongo/logv2/log.h" +#include "mongo/logv2/redaction.h" +#include "mongo/s/grid.h" +#include "mongo/s/request_types/move_chunk_request.h" +#include "mongo/s/request_types/move_range_request_gen.h" + +namespace mongo { +namespace { + +const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, + // Note: Even though we're setting UNSET here, + // kMajority implies JOURNAL if journaling is + // supported by mongod and + // writeConcernMajorityJournalDefault is set to true + // in the ReplSetConfig. + WriteConcernOptions::SyncMode::UNSET, + WriteConcernOptions::kWriteConcernTimeoutSharding); + +class ShardsvrMoveRangeCommand final : public TypedCommand<ShardsvrMoveRangeCommand> { +public: + using Request = ShardsvrMoveRange; + + ShardsvrMoveRangeCommand() + : TypedCommand<ShardsvrMoveRangeCommand>(Request::kCommandName, Request::kCommandAlias) {} + + bool skipApiVersionCheck() const override { + // Internal command (server to server). + return true; + } + + std::string help() const override { + return "Internal command invoked by the config server to move a chunk/range"; + } + + AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { + return AllowedOnSecondary::kNever; + } + + bool adminOnly() const override { + return true; + } + + class Invocation final : public InvocationBase { + public: + using InvocationBase::InvocationBase; + + void typedRun(OperationContext* opCtx) { + uassertStatusOK(ShardingState::get(opCtx)->canAcceptShardedCommands()); + opCtx->setAlwaysInterruptAtStepDownOrUp(); + + const auto WC = opCtx->getWriteConcern(); + const auto req = + request().toBSON(BSON(WriteConcernOptions::kWriteConcernField << WC.toBSON())); + + const MoveChunkRequest moveChunkRequest = + uassertStatusOK(MoveChunkRequest::createFromCommand(ns(), req)); + + // Make sure we're as up-to-date as possible with shard information. This catches the + // case where we might have changed a shard's host by removing/adding a shard with the + // same name. + Grid::get(opCtx)->shardRegistry()->reload(opCtx); + + auto scopedMigration = uassertStatusOK( + ActiveMigrationsRegistry::get(opCtx).registerDonateChunk(opCtx, moveChunkRequest)); + + // Check if there is an existing migration running and if so, join it + if (scopedMigration.mustExecute()) { + auto moveChunkComplete = + ExecutorFuture<void>(_getExecutor()) + .then([moveChunkRequest, + scopedMigration = std::move(scopedMigration), + serviceContext = opCtx->getServiceContext()]() mutable { + // This local variable is created to enforce that the scopedMigration is + // destroyed before setting the shared state as ready. + // Note that captured objects of the lambda are destroyed by the + // executor thread after setting the shared state as ready. + auto scopedMigrationLocal(std::move(scopedMigration)); + ThreadClient tc("MoveChunk", serviceContext); + { + stdx::lock_guard<Client> lk(*tc.get()); + tc->setSystemOperationKillableByStepdown(lk); + } + auto uniqueOpCtx = Client::getCurrent()->makeOperationContext(); + auto opCtx = uniqueOpCtx.get(); + + { + // Ensure that opCtx will get interrupted in the event of a + // stepdown. This is to ensure that the MigrationSourceManager + // checks that there are no pending migrationCoordinators documents + // (under the ActiveMigrationRegistry lock) on the same term during + // which the migrationCoordinators document will be persisted. + Lock::GlobalLock lk(opCtx, MODE_IX); + } + + // Note: This internal authorization is tied to the lifetime of the + // client. + AuthorizationSession::get(opCtx->getClient()) + ->grantInternalAuthorization(opCtx->getClient()); + + Status status = {ErrorCodes::InternalError, "Uninitialized value"}; + + try { + _runImpl(opCtx, moveChunkRequest); + status = Status::OK(); + } catch (const DBException& e) { + status = e.toStatus(); + LOGV2_WARNING(23777, + "Chunk move failed with {error}", + "Error while doing moveChunk", + "error"_attr = redact(status)); + + if (status.code() == ErrorCodes::LockTimeout) { + ShardingStatistics::get(opCtx) + .countDonorMoveChunkLockTimeout.addAndFetch(1); + } + } + + scopedMigrationLocal.signalComplete(status); + uassertStatusOK(status); + }); + moveChunkComplete.get(opCtx); + } else { + uassertStatusOK(scopedMigration.waitForCompletion(opCtx)); + } + + if (moveChunkRequest.getWaitForDelete()) { + // Ensure we capture the latest opTime in the system, since range deletion happens + // asynchronously with a different OperationContext. This must be done after the + // above join, because each caller must set the opTime to wait for writeConcern for + // on its own OperationContext. + auto& replClient = repl::ReplClientInfo::forClient(opCtx->getClient()); + replClient.setLastOpToSystemLastOpTime(opCtx); + + WriteConcernResult writeConcernResult; + Status majorityStatus = waitForWriteConcern( + opCtx, replClient.getLastOp(), kMajorityWriteConcern, &writeConcernResult); + + uassertStatusOKWithContext( + majorityStatus, "Failed to wait for range deletions after migration commit"); + } + } + + private: + NamespaceString ns() const override { + return request().getCommandParameter(); + } + + bool supportsWriteConcern() const override { + return true; + } + + void doCheckAuthorization(OperationContext* opCtx) const override { + uassert(ErrorCodes::Unauthorized, + "Unauthorized", + AuthorizationSession::get(opCtx->getClient()) + ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(), + ActionType::internal)); + } + + static void _runImpl(OperationContext* opCtx, const MoveChunkRequest& moveChunkRequest) { + if (moveChunkRequest.getFromShardId() == moveChunkRequest.getToShardId()) { + // TODO: SERVER-46669 handle wait for delete. + return; + } + + // Resolve the donor and recipient shards and their connection string + auto const shardRegistry = Grid::get(opCtx)->shardRegistry(); + + const auto donorConnStr = + uassertStatusOK(shardRegistry->getShard(opCtx, moveChunkRequest.getFromShardId())) + ->getConnString(); + const auto recipientHost = uassertStatusOK([&] { + auto recipientShard = uassertStatusOK( + shardRegistry->getShard(opCtx, moveChunkRequest.getToShardId())); + + return recipientShard->getTargeter()->findHost( + opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}); + }()); + + MigrationSourceManager migrationSourceManager( + opCtx, moveChunkRequest, donorConnStr, recipientHost); + + migrationSourceManager.startClone(); + migrationSourceManager.awaitToCatchUp(); + migrationSourceManager.enterCriticalSection(); + migrationSourceManager.commitChunkOnRecipient(); + migrationSourceManager.commitChunkMetadataOnConfig(); + } + + // Returns a single-threaded executor to be used to run moveChunk commands. The executor is + // initialized on the first call to this function. Uses a shared_ptr because a shared_ptr is + // required to work with ExecutorFutures. + static std::shared_ptr<ThreadPool> _getExecutor() { + static Mutex mutex = MONGO_MAKE_LATCH("MoveChunkExecutor::_mutex"); + static std::shared_ptr<ThreadPool> executor; + + stdx::lock_guard<Latch> lg(mutex); + if (!executor) { + ThreadPool::Options options; + options.poolName = "MoveChunk"; + options.minThreads = 0; + // We limit the size of the thread pool to a single thread because currently there + // can only be one moveRange operation on a shard at a time. + options.maxThreads = 1; + executor = std::make_shared<ThreadPool>(std::move(options)); + executor->startup(); + } + + return executor; + } + }; + +} moveRangeCmd; + +} // namespace +} // namespace mongo diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 59107c28a7a..7be21eef763 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -205,6 +205,7 @@ env.Library( 'request_types/migration_secondary_throttle_options.cpp', 'request_types/move_chunk_request.cpp', 'request_types/move_primary.idl', + 'request_types/move_range_request.idl', 'request_types/refine_collection_shard_key.idl', 'request_types/remove_shard_from_zone_request_type.cpp', 'request_types/remove_tags.idl', diff --git a/src/mongo/s/request_types/move_range_request.idl b/src/mongo/s/request_types/move_range_request.idl new file mode 100644 index 00000000000..0289b916ee1 --- /dev/null +++ b/src/mongo/s/request_types/move_range_request.idl @@ -0,0 +1,87 @@ +# Copyright (C) 2022-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. +# + +global: + cpp_namespace: "mongo" + +imports: + - "mongo/idl/basic_types.idl" + - "mongo/s/sharding_types.idl" + +enums: + ForceJumbo: + description: "This enum represents whether or not a migration should attempt to move a large range" + type: int # Not `string` due to backwards compatibility + values: + kDoNotForce: 0 # do not attempt to migrate a large chunk + kForceManual: 1 # manual moveChunk command specified `forceJumbo: true` + kForceBalancer: 2 # balancer specified `forceJumbo: true` + +commands: + moveRange: + command_name: moveRange + command_alias: moveChunk + cpp_name: ShardsvrMoveRange + description: "Definition of the moveRange command called on shards." + namespace: type + type: namespacestring + api_version: "" + strict: false + fields: + epoch: + type: objectid + description: "Epoch of the collection" + default: mongo::OID() + fromShard: + type: shard_id + description: "ID of the donor shard" + toShard: + type: shard_id + description: "ID of the recipient shard" + min: + type: object + description: "The min key of the range to move" + max: + type: object + description: "The max key of the range to move" + maxChunkSizeBytes: + type: safeInt64 + description: "Max size of data to move" + optional: true + waitForDelete: + type: bool + description: "If set, wait for the chunk migration to finish before returning" + default: false + forceJumbo: + type: ForceJumbo + description: "Specifies the policy to use for jumbo chunks" + default: kDoNotForce + secondaryThrottle: + type: bool + description: "Secondary throttle policy to adopt during the migration" + default: false |