diff options
author | Pierlauro Sciarelli <pierlauro.sciarelli@mongodb.com> | 2021-02-08 16:56:13 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-02-09 12:08:13 +0000 |
commit | 5ad5c0409eace36efc15cfaa2d8d2936a09639b1 (patch) | |
tree | cb698a8f3383e2515f81d35105c08c42b1117bd5 | |
parent | 29a838f9193ba88c7ba8195fd44eb252a80be5f8 (diff) | |
download | mongo-5ad5c0409eace36efc15cfaa2d8d2936a09639b1.tar.gz |
SERVER-54387 Create renameCollection DDL coordinator
-rw-r--r-- | src/mongo/db/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/s/rename_collection_coordinator.cpp | 200 | ||||
-rw-r--r-- | src/mongo/db/s/rename_collection_coordinator.h | 62 | ||||
-rw-r--r-- | src/mongo/db/s/shardsvr_rename_collection_command.cpp | 127 |
4 files changed, 268 insertions, 122 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index e2dd0f71059..7b7cef23bcc 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -341,6 +341,7 @@ env.Library( 'migration_destination_manager_legacy_commands.cpp', 'move_chunk_command.cpp', 'move_primary_command.cpp', + 'rename_collection_coordinator.cpp', 'set_shard_version_command.cpp', 'sharding_ddl_coordinator.cpp', 'sharding_server_status.cpp', diff --git a/src/mongo/db/s/rename_collection_coordinator.cpp b/src/mongo/db/s/rename_collection_coordinator.cpp new file mode 100644 index 00000000000..ee8a1bceabb --- /dev/null +++ b/src/mongo/db/s/rename_collection_coordinator.cpp @@ -0,0 +1,200 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding + +#include "mongo/db/s/rename_collection_coordinator.h" + +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/catalog/rename_collection.h" +#include "mongo/db/catalog_raii.h" +#include "mongo/db/commands.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/s/collection_sharding_runtime.h" +#include "mongo/db/s/collection_sharding_state.h" +#include "mongo/db/s/dist_lock_manager.h" +#include "mongo/db/s/shard_metadata_util.h" +#include "mongo/db/s/sharding_ddl_util.h" +#include "mongo/db/s/sharding_state.h" +#include "mongo/logv2/log.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/cluster_commands_helpers.h" +#include "mongo/s/grid.h" +#include "mongo/s/request_types/sharded_ddl_commands_gen.h" +#include "mongo/s/sharded_collections_ddl_parameters_gen.h" + +namespace mongo { +namespace { + +void sendCommandToParticipants(OperationContext* opCtx, + StringData db, + StringData cmdName, + const BSONObj& cmd) { + const auto selfShardId = ShardingState::get(opCtx)->shardId(); + auto shardRegistry = Grid::get(opCtx)->shardRegistry(); + const auto allShardIds = shardRegistry->getAllShardIds(opCtx); + + for (const auto& shardId : allShardIds) { + if (shardId == selfShardId) { + continue; + } + + auto shard = uassertStatusOK(shardRegistry->getShard(opCtx, shardId)); + const auto cmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + db.toString(), + CommandHelpers::appendMajorityWriteConcern(cmd), + Shard::RetryPolicy::kNoRetry)); + uassertStatusOKWithContext(Shard::CommandResponse::getEffectiveStatus(cmdResponse), + str::stream() + << "Error processing " << cmdName << " on shard" << shardId); + } +} + +} // namespace + +RenameCollectionCoordinator::RenameCollectionCoordinator(OperationContext* opCtx, + const NamespaceString& _nss, + const NamespaceString& toNss, + bool dropTarget, + bool stayTemp) + : ShardingDDLCoordinator(opCtx, _nss), + _serviceContext(opCtx->getServiceContext()), + _toNss(toNss), + _dropTarget(dropTarget), + _stayTemp(stayTemp){}; + +SemiFuture<void> RenameCollectionCoordinator::runImpl( + std::shared_ptr<executor::TaskExecutor> executor) { + return ExecutorFuture<void>(executor, Status::OK()) + .then([this, anchor = shared_from_this()]() { + ThreadClient tc{"RenameCollectionCoordinator", _serviceContext}; + auto opCtxHolder = tc->makeOperationContext(); + auto* opCtx = opCtxHolder.get(); + _forwardableOpMetadata.setOn(opCtx); + + uassert(ErrorCodes::CommandFailed, + "Source and destination collections must be on the same database.", + _nss.db() == _toNss.db()); + + auto distLockManager = DistLockManager::get(opCtx->getServiceContext()); + const auto dbDistLock = uassertStatusOK(distLockManager->lock( + opCtx, _nss.db(), "RenameCollection", DistLockManager::kDefaultLockTimeout)); + const auto fromCollDistLock = uassertStatusOK(distLockManager->lock( + opCtx, _nss.ns(), "RenameCollection", DistLockManager::kDefaultLockTimeout)); + const auto toCollDistLock = uassertStatusOK(distLockManager->lock( + opCtx, _toNss.ns(), "RenameCollection", DistLockManager::kDefaultLockTimeout)); + + RenameCollectionOptions options{_dropTarget, _stayTemp}; + + sharding_ddl_util::checkShardedRenamePreconditions(opCtx, _toNss, options.dropTarget); + + { + // Take the source collection critical section + AutoGetCollection sourceCollLock(opCtx, _nss, MODE_X); + auto* const fromCsr = CollectionShardingRuntime::get(opCtx, _nss); + auto fromCsrLock = + CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, fromCsr); + fromCsr->enterCriticalSectionCatchUpPhase(fromCsrLock); + fromCsr->enterCriticalSectionCommitPhase(fromCsrLock); + } + + { + // Take the destination collection critical section + AutoGetCollection targetCollLock(opCtx, _toNss, MODE_X); + auto* const toCsr = CollectionShardingRuntime::get(opCtx, _toNss); + auto toCsrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, toCsr); + if (!toCsr->getCurrentMetadataIfKnown()) { + // Setting metadata to UNSHARDED (can't be UNKNOWN when taking the critical + // section) + toCsr->setFilteringMetadata(opCtx, CollectionMetadata()); + } + toCsr->enterCriticalSectionCatchUpPhase(toCsrLock); + toCsr->enterCriticalSectionCommitPhase(toCsrLock); + } + + // Rename the collection locally and clear the cache + validateAndRunRenameCollection(opCtx, _nss, _toNss, options); + uassertStatusOK(shardmetadatautil::dropChunksAndDeleteCollectionsEntry(opCtx, _nss)); + + // Rename the collection locally on all other shards + ShardsvrRenameCollectionParticipant renameCollParticipantRequest(_nss); + renameCollParticipantRequest.setDbName(_nss.db()); + renameCollParticipantRequest.setDropTarget(_dropTarget); + renameCollParticipantRequest.setStayTemp(_stayTemp); + renameCollParticipantRequest.setTo(_toNss); + sendCommandToParticipants(opCtx, + _nss.db(), + ShardsvrRenameCollectionParticipant::kCommandName, + renameCollParticipantRequest.toBSON({})); + + sharding_ddl_util::shardedRenameMetadata(opCtx, _nss, _toNss); + + // Unblock participants for r/w on source and destination collections + ShardsvrRenameCollectionUnblockParticipant unblockParticipantRequest(_nss); + unblockParticipantRequest.setDbName(_nss.db()); + unblockParticipantRequest.setTo(_toNss); + sendCommandToParticipants(opCtx, + _nss.db(), + ShardsvrRenameCollectionUnblockParticipant::kCommandName, + unblockParticipantRequest.toBSON({})); + + { + // Clear source critical section + AutoGetCollection sourceCollLock(opCtx, _nss, MODE_X); + auto* const fromCsr = CollectionShardingRuntime::get(opCtx, _nss); + fromCsr->exitCriticalSection(opCtx); + fromCsr->clearFilteringMetadata(opCtx); + } + + { + // Clear target critical section + AutoGetCollection targetCollLock(opCtx, _toNss, MODE_X); + auto* const toCsr = CollectionShardingRuntime::get(opCtx, _toNss); + toCsr->exitCriticalSection(opCtx); + toCsr->clearFilteringMetadata(opCtx); + } + + auto catalog = Grid::get(opCtx)->catalogCache(); + auto cm = uassertStatusOK(catalog->getCollectionRoutingInfoWithRefresh(opCtx, _toNss)); + _response.emplaceValue(RenameCollectionResponse(cm.getVersion())); + }) + .onError([this, anchor = shared_from_this()](const Status& status) { + LOGV2_ERROR(5438700, + "Error running rename collection", + "namespace"_attr = _nss, + "error"_attr = redact(status)); + return status; + }) + .semi(); +} + + +} // namespace mongo diff --git a/src/mongo/db/s/rename_collection_coordinator.h b/src/mongo/db/s/rename_collection_coordinator.h new file mode 100644 index 00000000000..ceffde4bb19 --- /dev/null +++ b/src/mongo/db/s/rename_collection_coordinator.h @@ -0,0 +1,62 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/s/sharding_ddl_coordinator.h" +#include "mongo/s/request_types/sharded_ddl_commands_gen.h" + +namespace mongo { + +class RenameCollectionCoordinator final + : public ShardingDDLCoordinator, + public std::enable_shared_from_this<RenameCollectionCoordinator> { +public: + RenameCollectionCoordinator(OperationContext* opCtx, + const NamespaceString& fromNss, + const NamespaceString& toNss, + bool dropTarget, + bool stayTemp); + + SharedSemiFuture<RenameCollectionResponse> getResponseFuture() { + return _response.getFuture(); + } + +private: + SemiFuture<void> runImpl(std::shared_ptr<executor::TaskExecutor> executor) override; + + ServiceContext* _serviceContext; + NamespaceString _toNss; + bool _dropTarget; + bool _stayTemp; + + SharedPromise<RenameCollectionResponse> _response; +}; + +} // namespace mongo diff --git a/src/mongo/db/s/shardsvr_rename_collection_command.cpp b/src/mongo/db/s/shardsvr_rename_collection_command.cpp index d0e098fcde0..352ac2414af 100644 --- a/src/mongo/db/s/shardsvr_rename_collection_command.cpp +++ b/src/mongo/db/s/shardsvr_rename_collection_command.cpp @@ -35,14 +35,10 @@ #include "mongo/db/catalog/rename_collection.h" #include "mongo/db/commands.h" #include "mongo/db/db_raii.h" -#include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/collection_sharding_state.h" -#include "mongo/db/s/dist_lock_manager.h" -#include "mongo/db/s/shard_metadata_util.h" -#include "mongo/db/s/sharding_ddl_util.h" +#include "mongo/db/s/rename_collection_coordinator.h" #include "mongo/db/s/sharding_state.h" #include "mongo/logv2/log.h" -#include "mongo/s/catalog/type_tags.h" #include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/grid.h" #include "mongo/s/request_types/sharded_ddl_commands_gen.h" @@ -86,122 +82,6 @@ RenameCollectionResponse renameUnshardedCollection(OperationContext* opCtx, return RenameCollectionResponse(ChunkVersion::UNSHARDED()); } -void sendCommandToParticipants(OperationContext* opCtx, - StringData db, - StringData cmdName, - const BSONObj& cmd) { - const auto selfShardId = ShardingState::get(opCtx)->shardId(); - auto shardRegistry = Grid::get(opCtx)->shardRegistry(); - const auto allShardIds = shardRegistry->getAllShardIds(opCtx); - - for (const auto& shardId : allShardIds) { - if (shardId == selfShardId) { - continue; - } - - auto shard = uassertStatusOK(shardRegistry->getShard(opCtx, shardId)); - const auto cmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts( - opCtx, - ReadPreferenceSetting(ReadPreference::PrimaryOnly), - db.toString(), - CommandHelpers::appendMajorityWriteConcern(cmd), - Shard::RetryPolicy::kNoRetry)); - uassertStatusOKWithContext(Shard::CommandResponse::getEffectiveStatus(cmdResponse), - str::stream() << "Error processing " << cmdName << " on shard " - << shardId); - } -} - -RenameCollectionResponse renameShardedCollection(OperationContext* opCtx, - const ShardsvrRenameCollection& request, - const NamespaceString& fromNss) { - const auto toNss = request.getTo(); - - uassert(ErrorCodes::CommandFailed, - "Source and destination collections must be on the same database.", - fromNss.db() == toNss.db()); - - auto distLockManager = DistLockManager::get(opCtx->getServiceContext()); - const auto dbDistLock = uassertStatusOK(distLockManager->lock( - opCtx, fromNss.db(), "RenameCollection", DistLockManager::kDefaultLockTimeout)); - const auto fromCollDistLock = uassertStatusOK(distLockManager->lock( - opCtx, fromNss.ns(), "RenameCollection", DistLockManager::kDefaultLockTimeout)); - const auto toCollDistLock = uassertStatusOK(distLockManager->lock( - opCtx, toNss.ns(), "RenameCollection", DistLockManager::kDefaultLockTimeout)); - - RenameCollectionOptions options{request.getDropTarget(), request.getStayTemp()}; - - sharding_ddl_util::checkShardedRenamePreconditions(opCtx, toNss, options.dropTarget); - - { - // Take the source collection critical section - AutoGetCollection sourceCollLock(opCtx, fromNss, MODE_X); - auto* const fromCsr = CollectionShardingRuntime::get(opCtx, fromNss); - auto fromCsrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, fromCsr); - fromCsr->enterCriticalSectionCatchUpPhase(fromCsrLock); - fromCsr->enterCriticalSectionCommitPhase(fromCsrLock); - } - - { - // Take the destination collection critical section - AutoGetCollection targetCollLock(opCtx, toNss, MODE_X); - auto* const toCsr = CollectionShardingRuntime::get(opCtx, toNss); - auto toCsrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, toCsr); - if (!toCsr->getCurrentMetadataIfKnown()) { - // Setting metadata to UNSHARDED (can't be UNKNOWN when taking the critical section) - toCsr->setFilteringMetadata(opCtx, CollectionMetadata()); - } - toCsr->enterCriticalSectionCatchUpPhase(toCsrLock); - toCsr->enterCriticalSectionCommitPhase(toCsrLock); - } - - // Rename the collection locally and clear the cache - validateAndRunRenameCollection(opCtx, fromNss, toNss, options); - uassertStatusOK(shardmetadatautil::dropChunksAndDeleteCollectionsEntry(opCtx, fromNss)); - - // Rename the collection locally on all other shards - ShardsvrRenameCollectionParticipant renameCollParticipantRequest(fromNss); - renameCollParticipantRequest.setDbName(fromNss.db()); - renameCollParticipantRequest.setDropTarget(request.getDropTarget()); - renameCollParticipantRequest.setStayTemp(request.getStayTemp()); - renameCollParticipantRequest.setTo(request.getTo()); - sendCommandToParticipants(opCtx, - fromNss.db(), - ShardsvrRenameCollectionParticipant::kCommandName, - renameCollParticipantRequest.toBSON({})); - - sharding_ddl_util::shardedRenameMetadata(opCtx, fromNss, toNss); - - // Unblock participants for r/w on source and destination collections - ShardsvrRenameCollectionUnblockParticipant unblockParticipantRequest(fromNss); - unblockParticipantRequest.setDbName(fromNss.db()); - unblockParticipantRequest.setTo(toNss); - sendCommandToParticipants(opCtx, - fromNss.db(), - ShardsvrRenameCollectionUnblockParticipant::kCommandName, - unblockParticipantRequest.toBSON({})); - - { - // Clear source critical section - AutoGetCollection sourceCollLock(opCtx, fromNss, MODE_X); - auto* const fromCsr = CollectionShardingRuntime::get(opCtx, fromNss); - fromCsr->exitCriticalSection(opCtx); - fromCsr->clearFilteringMetadata(opCtx); - } - - { - // Clear target critical section - AutoGetCollection targetCollLock(opCtx, toNss, MODE_X); - auto* const toCsr = CollectionShardingRuntime::get(opCtx, toNss); - toCsr->exitCriticalSection(opCtx); - toCsr->clearFilteringMetadata(opCtx); - } - - auto catalog = Grid::get(opCtx)->catalogCache(); - auto cm = uassertStatusOK(catalog->getCollectionRoutingInfoWithRefresh(opCtx, toNss)); - return RenameCollectionResponse(cm.getVersion()); -} - class ShardsvrRenameCollectionCommand final : public TypedCommand<ShardsvrRenameCollectionCommand> { public: using Request = ShardsvrRenameCollection; @@ -249,7 +129,10 @@ public: << opCtx->getWriteConcern().wMode, opCtx->getWriteConcern().wMode == WriteConcernOptions::kMajority); - return renameShardedCollection(opCtx, req, fromNss); + auto renameCollectionCoordinator = std::make_shared<RenameCollectionCoordinator>( + opCtx, ns(), req.getTo(), req.getDropTarget(), req.getStayTemp()); + renameCollectionCoordinator->run(opCtx).get(); + return renameCollectionCoordinator->getResponseFuture().get(); } private: |