summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPierlauro Sciarelli <pierlauro.sciarelli@mongodb.com>2021-02-08 16:56:13 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-02-09 12:08:13 +0000
commit5ad5c0409eace36efc15cfaa2d8d2936a09639b1 (patch)
treecb698a8f3383e2515f81d35105c08c42b1117bd5
parent29a838f9193ba88c7ba8195fd44eb252a80be5f8 (diff)
downloadmongo-5ad5c0409eace36efc15cfaa2d8d2936a09639b1.tar.gz
SERVER-54387 Create renameCollection DDL coordinator
-rw-r--r--src/mongo/db/s/SConscript1
-rw-r--r--src/mongo/db/s/rename_collection_coordinator.cpp200
-rw-r--r--src/mongo/db/s/rename_collection_coordinator.h62
-rw-r--r--src/mongo/db/s/shardsvr_rename_collection_command.cpp127
4 files changed, 268 insertions, 122 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index e2dd0f71059..7b7cef23bcc 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -341,6 +341,7 @@ env.Library(
'migration_destination_manager_legacy_commands.cpp',
'move_chunk_command.cpp',
'move_primary_command.cpp',
+ 'rename_collection_coordinator.cpp',
'set_shard_version_command.cpp',
'sharding_ddl_coordinator.cpp',
'sharding_server_status.cpp',
diff --git a/src/mongo/db/s/rename_collection_coordinator.cpp b/src/mongo/db/s/rename_collection_coordinator.cpp
new file mode 100644
index 00000000000..ee8a1bceabb
--- /dev/null
+++ b/src/mongo/db/s/rename_collection_coordinator.cpp
@@ -0,0 +1,200 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
+
+#include "mongo/db/s/rename_collection_coordinator.h"
+
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/catalog/rename_collection.h"
+#include "mongo/db/catalog_raii.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/s/collection_sharding_runtime.h"
+#include "mongo/db/s/collection_sharding_state.h"
+#include "mongo/db/s/dist_lock_manager.h"
+#include "mongo/db/s/shard_metadata_util.h"
+#include "mongo/db/s/sharding_ddl_util.h"
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/logv2/log.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/cluster_commands_helpers.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/request_types/sharded_ddl_commands_gen.h"
+#include "mongo/s/sharded_collections_ddl_parameters_gen.h"
+
+namespace mongo {
+namespace {
+
+/**
+ * Broadcasts `cmd` (with majority write concern appended) to the primary of
+ * every shard except the local one, uasserting on the first failure.
+ *
+ * Runs with Shard::RetryPolicy::kNoRetry, so transient errors surface to the
+ * caller instead of being retried here.
+ */
+void sendCommandToParticipants(OperationContext* opCtx,
+                               StringData db,
+                               StringData cmdName,
+                               const BSONObj& cmd) {
+    const auto selfShardId = ShardingState::get(opCtx)->shardId();
+    auto shardRegistry = Grid::get(opCtx)->shardRegistry();
+    const auto allShardIds = shardRegistry->getAllShardIds(opCtx);
+
+    for (const auto& shardId : allShardIds) {
+        // The local shard performs the rename directly; only remote
+        // participants need to receive the command.
+        if (shardId == selfShardId) {
+            continue;
+        }
+
+        auto shard = uassertStatusOK(shardRegistry->getShard(opCtx, shardId));
+        const auto cmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts(
+            opCtx,
+            ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+            db.toString(),
+            CommandHelpers::appendMajorityWriteConcern(cmd),
+            Shard::RetryPolicy::kNoRetry));
+        // Note: "on shard " needs the trailing space so the shard id doesn't
+        // get glued to the word (the pre-refactor code had it right).
+        uassertStatusOKWithContext(Shard::CommandResponse::getEffectiveStatus(cmdResponse),
+                                   str::stream() << "Error processing " << cmdName
+                                                 << " on shard " << shardId);
+    }
+}
+
+} // namespace
+
+/**
+ * Constructs the coordinator that will rename `fromNss` to `toNss`.
+ * No work happens here; the rename is driven later by runImpl().
+ */
+RenameCollectionCoordinator::RenameCollectionCoordinator(OperationContext* opCtx,
+                                                         const NamespaceString& fromNss,
+                                                         const NamespaceString& toNss,
+                                                         bool dropTarget,
+                                                         bool stayTemp)
+    : ShardingDDLCoordinator(opCtx, fromNss),
+      _serviceContext(opCtx->getServiceContext()),
+      _toNss(toNss),
+      _dropTarget(dropTarget),
+      _stayTemp(stayTemp) {}
+
+/**
+ * Drives the sharded rename of _nss to _toNss on the given executor.
+ *
+ * High-level sequence (all on a dedicated client/opCtx, since the coordinator
+ * outlives the originating request):
+ *   1. Acquire the database and both collection distributed locks.
+ *   2. Enter the critical section on the source and target collections.
+ *   3. Rename locally and drop the local source chunk metadata, then tell
+ *      every other shard to rename via ShardsvrRenameCollectionParticipant.
+ *   4. Update the sharded metadata and unblock participants.
+ *   5. Exit both critical sections and refresh routing info, fulfilling
+ *      _response with the post-rename collection version.
+ *
+ * Failures are logged in onError and propagated through the returned future.
+ */
+SemiFuture<void> RenameCollectionCoordinator::runImpl(
+    std::shared_ptr<executor::TaskExecutor> executor) {
+    return ExecutorFuture<void>(executor, Status::OK())
+        // `anchor` keeps this coordinator alive until the task completes.
+        .then([this, anchor = shared_from_this()]() {
+            ThreadClient tc{"RenameCollectionCoordinator", _serviceContext};
+            auto opCtxHolder = tc->makeOperationContext();
+            auto* opCtx = opCtxHolder.get();
+            // Propagate the forwardable metadata of the originating operation
+            // onto the coordinator's own opCtx.
+            _forwardableOpMetadata.setOn(opCtx);
+
+            uassert(ErrorCodes::CommandFailed,
+                    "Source and destination collections must be on the same database.",
+                    _nss.db() == _toNss.db());
+
+            // Serialize with other DDL ops: db lock first, then source and
+            // target collection locks (RAII — released when they go out of
+            // scope at the end of this continuation).
+            auto distLockManager = DistLockManager::get(opCtx->getServiceContext());
+            const auto dbDistLock = uassertStatusOK(distLockManager->lock(
+                opCtx, _nss.db(), "RenameCollection", DistLockManager::kDefaultLockTimeout));
+            const auto fromCollDistLock = uassertStatusOK(distLockManager->lock(
+                opCtx, _nss.ns(), "RenameCollection", DistLockManager::kDefaultLockTimeout));
+            const auto toCollDistLock = uassertStatusOK(distLockManager->lock(
+                opCtx, _toNss.ns(), "RenameCollection", DistLockManager::kDefaultLockTimeout));
+
+            RenameCollectionOptions options{_dropTarget, _stayTemp};
+
+            sharding_ddl_util::checkShardedRenamePreconditions(opCtx, _toNss, options.dropTarget);
+
+            {
+                // Take the source collection critical section
+                AutoGetCollection sourceCollLock(opCtx, _nss, MODE_X);
+                auto* const fromCsr = CollectionShardingRuntime::get(opCtx, _nss);
+                auto fromCsrLock =
+                    CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, fromCsr);
+                fromCsr->enterCriticalSectionCatchUpPhase(fromCsrLock);
+                fromCsr->enterCriticalSectionCommitPhase(fromCsrLock);
+            }
+
+            {
+                // Take the destination collection critical section
+                AutoGetCollection targetCollLock(opCtx, _toNss, MODE_X);
+                auto* const toCsr = CollectionShardingRuntime::get(opCtx, _toNss);
+                auto toCsrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, toCsr);
+                if (!toCsr->getCurrentMetadataIfKnown()) {
+                    // Setting metadata to UNSHARDED (can't be UNKNOWN when taking the critical
+                    // section)
+                    toCsr->setFilteringMetadata(opCtx, CollectionMetadata());
+                }
+                toCsr->enterCriticalSectionCatchUpPhase(toCsrLock);
+                toCsr->enterCriticalSectionCommitPhase(toCsrLock);
+            }
+
+            // Rename the collection locally and clear the cache
+            validateAndRunRenameCollection(opCtx, _nss, _toNss, options);
+            uassertStatusOK(shardmetadatautil::dropChunksAndDeleteCollectionsEntry(opCtx, _nss));
+
+            // Rename the collection locally on all other shards
+            ShardsvrRenameCollectionParticipant renameCollParticipantRequest(_nss);
+            renameCollParticipantRequest.setDbName(_nss.db());
+            renameCollParticipantRequest.setDropTarget(_dropTarget);
+            renameCollParticipantRequest.setStayTemp(_stayTemp);
+            renameCollParticipantRequest.setTo(_toNss);
+            sendCommandToParticipants(opCtx,
+                                      _nss.db(),
+                                      ShardsvrRenameCollectionParticipant::kCommandName,
+                                      renameCollParticipantRequest.toBSON({}));
+
+            sharding_ddl_util::shardedRenameMetadata(opCtx, _nss, _toNss);
+
+            // Unblock participants for r/w on source and destination collections
+            ShardsvrRenameCollectionUnblockParticipant unblockParticipantRequest(_nss);
+            unblockParticipantRequest.setDbName(_nss.db());
+            unblockParticipantRequest.setTo(_toNss);
+            sendCommandToParticipants(opCtx,
+                                      _nss.db(),
+                                      ShardsvrRenameCollectionUnblockParticipant::kCommandName,
+                                      unblockParticipantRequest.toBSON({}));
+
+            {
+                // Clear source critical section
+                AutoGetCollection sourceCollLock(opCtx, _nss, MODE_X);
+                auto* const fromCsr = CollectionShardingRuntime::get(opCtx, _nss);
+                fromCsr->exitCriticalSection(opCtx);
+                // Force a refresh on next access so stale routing info isn't served.
+                fromCsr->clearFilteringMetadata(opCtx);
+            }
+
+            {
+                // Clear target critical section
+                AutoGetCollection targetCollLock(opCtx, _toNss, MODE_X);
+                auto* const toCsr = CollectionShardingRuntime::get(opCtx, _toNss);
+                toCsr->exitCriticalSection(opCtx);
+                toCsr->clearFilteringMetadata(opCtx);
+            }
+
+            // Resolve the caller-visible future with the fresh collection version.
+            auto catalog = Grid::get(opCtx)->catalogCache();
+            auto cm = uassertStatusOK(catalog->getCollectionRoutingInfoWithRefresh(opCtx, _toNss));
+            _response.emplaceValue(RenameCollectionResponse(cm.getVersion()));
+        })
+        .onError([this, anchor = shared_from_this()](const Status& status) {
+            // NOTE(review): errors are logged and rethrown; there is no rollback
+            // of critical sections here — confirm the DDL coordinator framework
+            // handles cleanup on failure.
+            LOGV2_ERROR(5438700,
+                        "Error running rename collection",
+                        "namespace"_attr = _nss,
+                        "error"_attr = redact(status));
+            return status;
+        })
+        .semi();
+}
+
+
+} // namespace mongo
diff --git a/src/mongo/db/s/rename_collection_coordinator.h b/src/mongo/db/s/rename_collection_coordinator.h
new file mode 100644
index 00000000000..ceffde4bb19
--- /dev/null
+++ b/src/mongo/db/s/rename_collection_coordinator.h
@@ -0,0 +1,62 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/s/sharding_ddl_coordinator.h"
+#include "mongo/s/request_types/sharded_ddl_commands_gen.h"
+
+namespace mongo {
+
+/**
+ * Sharding DDL coordinator that performs a (possibly sharded) renameCollection.
+ * The source namespace is forwarded to the ShardingDDLCoordinator base class;
+ * the destination and rename options are kept as members below.
+ */
+class RenameCollectionCoordinator final
+    : public ShardingDDLCoordinator,
+      public std::enable_shared_from_this<RenameCollectionCoordinator> {
+public:
+    RenameCollectionCoordinator(OperationContext* opCtx,
+                                const NamespaceString& fromNss,
+                                const NamespaceString& toNss,
+                                bool dropTarget,
+                                bool stayTemp);
+
+    /**
+     * Future fulfilled with the rename response (the renamed collection's
+     * version) once the coordinator completes successfully.
+     */
+    SharedSemiFuture<RenameCollectionResponse> getResponseFuture() {
+        return _response.getFuture();
+    }
+
+private:
+    SemiFuture<void> runImpl(std::shared_ptr<executor::TaskExecutor> executor) override;
+
+    ServiceContext* _serviceContext;  // Cached so runImpl can make its own opCtx.
+    NamespaceString _toNss;           // Destination namespace of the rename.
+    bool _dropTarget;                 // Drop _toNss if it already exists.
+    bool _stayTemp;                   // Keep a temp source collection temporary.
+
+    SharedPromise<RenameCollectionResponse> _response;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/s/shardsvr_rename_collection_command.cpp b/src/mongo/db/s/shardsvr_rename_collection_command.cpp
index d0e098fcde0..352ac2414af 100644
--- a/src/mongo/db/s/shardsvr_rename_collection_command.cpp
+++ b/src/mongo/db/s/shardsvr_rename_collection_command.cpp
@@ -35,14 +35,10 @@
#include "mongo/db/catalog/rename_collection.h"
#include "mongo/db/commands.h"
#include "mongo/db/db_raii.h"
-#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/collection_sharding_state.h"
-#include "mongo/db/s/dist_lock_manager.h"
-#include "mongo/db/s/shard_metadata_util.h"
-#include "mongo/db/s/sharding_ddl_util.h"
+#include "mongo/db/s/rename_collection_coordinator.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/logv2/log.h"
-#include "mongo/s/catalog/type_tags.h"
#include "mongo/s/cluster_commands_helpers.h"
#include "mongo/s/grid.h"
#include "mongo/s/request_types/sharded_ddl_commands_gen.h"
@@ -86,122 +82,6 @@ RenameCollectionResponse renameUnshardedCollection(OperationContext* opCtx,
return RenameCollectionResponse(ChunkVersion::UNSHARDED());
}
-void sendCommandToParticipants(OperationContext* opCtx,
- StringData db,
- StringData cmdName,
- const BSONObj& cmd) {
- const auto selfShardId = ShardingState::get(opCtx)->shardId();
- auto shardRegistry = Grid::get(opCtx)->shardRegistry();
- const auto allShardIds = shardRegistry->getAllShardIds(opCtx);
-
- for (const auto& shardId : allShardIds) {
- if (shardId == selfShardId) {
- continue;
- }
-
- auto shard = uassertStatusOK(shardRegistry->getShard(opCtx, shardId));
- const auto cmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts(
- opCtx,
- ReadPreferenceSetting(ReadPreference::PrimaryOnly),
- db.toString(),
- CommandHelpers::appendMajorityWriteConcern(cmd),
- Shard::RetryPolicy::kNoRetry));
- uassertStatusOKWithContext(Shard::CommandResponse::getEffectiveStatus(cmdResponse),
- str::stream() << "Error processing " << cmdName << " on shard "
- << shardId);
- }
-}
-
-RenameCollectionResponse renameShardedCollection(OperationContext* opCtx,
- const ShardsvrRenameCollection& request,
- const NamespaceString& fromNss) {
- const auto toNss = request.getTo();
-
- uassert(ErrorCodes::CommandFailed,
- "Source and destination collections must be on the same database.",
- fromNss.db() == toNss.db());
-
- auto distLockManager = DistLockManager::get(opCtx->getServiceContext());
- const auto dbDistLock = uassertStatusOK(distLockManager->lock(
- opCtx, fromNss.db(), "RenameCollection", DistLockManager::kDefaultLockTimeout));
- const auto fromCollDistLock = uassertStatusOK(distLockManager->lock(
- opCtx, fromNss.ns(), "RenameCollection", DistLockManager::kDefaultLockTimeout));
- const auto toCollDistLock = uassertStatusOK(distLockManager->lock(
- opCtx, toNss.ns(), "RenameCollection", DistLockManager::kDefaultLockTimeout));
-
- RenameCollectionOptions options{request.getDropTarget(), request.getStayTemp()};
-
- sharding_ddl_util::checkShardedRenamePreconditions(opCtx, toNss, options.dropTarget);
-
- {
- // Take the source collection critical section
- AutoGetCollection sourceCollLock(opCtx, fromNss, MODE_X);
- auto* const fromCsr = CollectionShardingRuntime::get(opCtx, fromNss);
- auto fromCsrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, fromCsr);
- fromCsr->enterCriticalSectionCatchUpPhase(fromCsrLock);
- fromCsr->enterCriticalSectionCommitPhase(fromCsrLock);
- }
-
- {
- // Take the destination collection critical section
- AutoGetCollection targetCollLock(opCtx, toNss, MODE_X);
- auto* const toCsr = CollectionShardingRuntime::get(opCtx, toNss);
- auto toCsrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, toCsr);
- if (!toCsr->getCurrentMetadataIfKnown()) {
- // Setting metadata to UNSHARDED (can't be UNKNOWN when taking the critical section)
- toCsr->setFilteringMetadata(opCtx, CollectionMetadata());
- }
- toCsr->enterCriticalSectionCatchUpPhase(toCsrLock);
- toCsr->enterCriticalSectionCommitPhase(toCsrLock);
- }
-
- // Rename the collection locally and clear the cache
- validateAndRunRenameCollection(opCtx, fromNss, toNss, options);
- uassertStatusOK(shardmetadatautil::dropChunksAndDeleteCollectionsEntry(opCtx, fromNss));
-
- // Rename the collection locally on all other shards
- ShardsvrRenameCollectionParticipant renameCollParticipantRequest(fromNss);
- renameCollParticipantRequest.setDbName(fromNss.db());
- renameCollParticipantRequest.setDropTarget(request.getDropTarget());
- renameCollParticipantRequest.setStayTemp(request.getStayTemp());
- renameCollParticipantRequest.setTo(request.getTo());
- sendCommandToParticipants(opCtx,
- fromNss.db(),
- ShardsvrRenameCollectionParticipant::kCommandName,
- renameCollParticipantRequest.toBSON({}));
-
- sharding_ddl_util::shardedRenameMetadata(opCtx, fromNss, toNss);
-
- // Unblock participants for r/w on source and destination collections
- ShardsvrRenameCollectionUnblockParticipant unblockParticipantRequest(fromNss);
- unblockParticipantRequest.setDbName(fromNss.db());
- unblockParticipantRequest.setTo(toNss);
- sendCommandToParticipants(opCtx,
- fromNss.db(),
- ShardsvrRenameCollectionUnblockParticipant::kCommandName,
- unblockParticipantRequest.toBSON({}));
-
- {
- // Clear source critical section
- AutoGetCollection sourceCollLock(opCtx, fromNss, MODE_X);
- auto* const fromCsr = CollectionShardingRuntime::get(opCtx, fromNss);
- fromCsr->exitCriticalSection(opCtx);
- fromCsr->clearFilteringMetadata(opCtx);
- }
-
- {
- // Clear target critical section
- AutoGetCollection targetCollLock(opCtx, toNss, MODE_X);
- auto* const toCsr = CollectionShardingRuntime::get(opCtx, toNss);
- toCsr->exitCriticalSection(opCtx);
- toCsr->clearFilteringMetadata(opCtx);
- }
-
- auto catalog = Grid::get(opCtx)->catalogCache();
- auto cm = uassertStatusOK(catalog->getCollectionRoutingInfoWithRefresh(opCtx, toNss));
- return RenameCollectionResponse(cm.getVersion());
-}
-
class ShardsvrRenameCollectionCommand final : public TypedCommand<ShardsvrRenameCollectionCommand> {
public:
using Request = ShardsvrRenameCollection;
@@ -249,7 +129,10 @@ public:
<< opCtx->getWriteConcern().wMode,
opCtx->getWriteConcern().wMode == WriteConcernOptions::kMajority);
- return renameShardedCollection(opCtx, req, fromNss);
+ auto renameCollectionCoordinator = std::make_shared<RenameCollectionCoordinator>(
+ opCtx, ns(), req.getTo(), req.getDropTarget(), req.getStayTemp());
+ renameCollectionCoordinator->run(opCtx).get();
+ return renameCollectionCoordinator->getResponseFuture().get();
}
private: