diff options
author | Marcos José Grillo Ramirez <marcos.grillo@mongodb.com> | 2022-08-02 13:34:49 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-08-02 14:39:54 +0000 |
commit | b7cebdc9e7ea7b1c9be1bbfa8a9b52689c0bb450 (patch) | |
tree | a3082cbea1bfcb2a59b84c50ef892b059369da37 /src/mongo/db | |
parent | ee648d59acd37f7a177e1f248359dc7472a6ca4e (diff) | |
download | mongo-b7cebdc9e7ea7b1c9be1bbfa8a9b52689c0bb450.tar.gz |
SERVER-67119 Add sharding catalog global index API to create/remove a global index
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/commands/set_feature_compatibility_version_command.cpp | 40 | ||||
-rw-r--r-- | src/mongo/db/namespace_string.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/namespace_string.h | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 21 | ||||
-rw-r--r-- | src/mongo/db/s/SConscript | 9 | ||||
-rw-r--r-- | src/mongo/db/s/config/configsvr_commit_index_command.cpp | 208 | ||||
-rw-r--r-- | src/mongo/db/s/config/configsvr_drop_index_catalog_command.cpp | 197 | ||||
-rw-r--r-- | src/mongo/db/s/sharded_index_catalog_commands.idl | 136 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_index_catalog_util.cpp | 274 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_index_catalog_util.h | 77 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_util.cpp | 61 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_util.h | 5 | ||||
-rw-r--r-- | src/mongo/db/s/shardsvr_commit_index_participant_command.cpp | 208 | ||||
-rw-r--r-- | src/mongo/db/s/shardsvr_drop_index_catalog_entry_participant_command.cpp | 196 | ||||
-rw-r--r-- | src/mongo/db/s/shardsvr_index_catalog_test_commands.cpp | 209 |
15 files changed, 1625 insertions, 28 deletions
diff --git a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp index 258885d46e2..3b923d50b46 100644 --- a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp +++ b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp @@ -82,6 +82,7 @@ #include "mongo/idl/cluster_server_parameter_gen.h" #include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/s/catalog/type_index_catalog_gen.h" #include "mongo/s/sharding_feature_flags_gen.h" #include "mongo/stdx/unordered_set.h" #include "mongo/util/exit.h" @@ -456,6 +457,9 @@ private: // TODO SERVER-67392: Remove once FCV 7.0 becomes last-lts. if (feature_flags::gGlobalIndexesShardingCatalog.isEnabledOnVersion(requestedVersion)) { uassertStatusOK(sharding_util::createGlobalIndexesIndexes(opCtx)); + if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { + uassertStatusOK(sharding_util::createShardCollectionCatalogIndexes(opCtx)); + } } hangWhileUpgrading.pauseWhileSet(opCtx); @@ -569,12 +573,14 @@ private: } // TODO SERVER-67392: Remove when 7.0 branches-out. + // Coordinators that commit indexes to the csrs must be drained before this point. Older + // FCVs must not find cluster-wide indexes. 
if (requestedVersion == GenericFCV::kLastLTS) { NamespaceString indexCatalogNss; if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { indexCatalogNss = NamespaceString::kConfigsvrIndexCatalogNamespace; } else { - indexCatalogNss = NamespaceString::kShardsIndexCatalogNamespace; + indexCatalogNss = NamespaceString::kShardIndexCatalogNamespace; } LOGV2(6280502, "Droping global indexes collection", "nss"_attr = indexCatalogNss); DropReply dropReply; @@ -588,6 +594,38 @@ private: << causedBy(deletionStatus.reason()), deletionStatus.isOK() || deletionStatus.code() == ErrorCodes::NamespaceNotFound); + + if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { + LOGV2(6711905, + "Droping collection catalog collection", + "nss"_attr = NamespaceString::kShardCollectionCatalogNamespace); + const auto dropStatus = + dropCollection(opCtx, + NamespaceString::kShardCollectionCatalogNamespace, + &dropReply, + DropCollectionSystemCollectionMode::kAllowSystemCollectionDrops); + uassert(dropStatus.code(), + str::stream() << "Failed to drop " + << NamespaceString::kShardCollectionCatalogNamespace + << causedBy(dropStatus.reason()), + dropStatus.isOK() || dropStatus.code() == ErrorCodes::NamespaceNotFound); + } else { + LOGV2(6711906, + "Unset index version field in config.collections", + "nss"_attr = CollectionType::ConfigNS); + DBDirectClient client(opCtx); + write_ops::UpdateCommandRequest update(CollectionType::ConfigNS); + update.setUpdates({[&]() { + write_ops::UpdateOpEntry entry; + entry.setQ( + BSON(CollectionType::kIndexVersionFieldName << BSON("$exists" << true))); + entry.setU(write_ops::UpdateModification::parseFromClassicUpdate( + BSON("$unset" << BSON(CollectionType::kIndexVersionFieldName << true)))); + entry.setMulti(true); + return entry; + }()}); + client.update(update); + } } uassert(ErrorCodes::Error(549181), diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp index af904acf05c..4659cf66abc 100644 --- 
a/src/mongo/db/namespace_string.cpp +++ b/src/mongo/db/namespace_string.cpp @@ -173,8 +173,11 @@ const NamespaceString NamespaceString::kConfigsvrShardsNamespace(NamespaceString const NamespaceString NamespaceString::kConfigsvrIndexCatalogNamespace(NamespaceString::kConfigDb, "csrs.indexes"); -const NamespaceString NamespaceString::kShardsIndexCatalogNamespace(NamespaceString::kConfigDb, - "shard.indexes"); +const NamespaceString NamespaceString::kShardIndexCatalogNamespace(NamespaceString::kConfigDb, + "shard.indexes"); + +const NamespaceString NamespaceString::kShardCollectionCatalogNamespace(NamespaceString::kConfigDb, + "shard.collections"); NamespaceString NamespaceString::parseFromStringExpectTenantIdInMultitenancyMode(StringData ns) { if (!gMultitenancySupport) { diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h index f1042167910..96ffab41641 100644 --- a/src/mongo/db/namespace_string.h +++ b/src/mongo/db/namespace_string.h @@ -234,7 +234,10 @@ public: static const NamespaceString kConfigsvrIndexCatalogNamespace; // Namespace used for storing the index catalog on the shards. - static const NamespaceString kShardsIndexCatalogNamespace; + static const NamespaceString kShardIndexCatalogNamespace; + + // Namespace used for storing the collection catalog on the shards. + static const NamespaceString kShardCollectionCatalogNamespace; /** * Constructs an empty NamespaceString. 
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 85c2b0d63d3..346e6fcb8bb 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -995,7 +995,26 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook 6280501, indexStatus.withContext(str::stream() << "Failed to create index on " - << NamespaceString::kShardsIndexCatalogNamespace + << NamespaceString::kShardIndexCatalogNamespace + << " on shard's first transition to primary")); + } + + // Create indexes in config.shard.collections if needed. + indexStatus = sharding_util::createShardCollectionCatalogIndexes(opCtx); + if (!indexStatus.isOK()) { + // If the node is shutting down or it lost quorum just as it was becoming primary, + // don't run the sharding onStepUp machinery. The onStepDown counterpart to these + // methods is already idempotent, so the machinery will remain in the stepped down + // state. 
+ if (ErrorCodes::isShutdownError(indexStatus.code()) || + ErrorCodes::isNotPrimaryError(indexStatus.code())) { + return; + } + fassertFailedWithStatus( + 6711907, + indexStatus.withContext(str::stream() + << "Failed to create index on " + << NamespaceString::kShardCollectionCatalogNamespace << " on shard's first transition to primary")); } } else { // unsharded diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 401fbb2cb6b..48939352636 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -287,9 +287,12 @@ env.Library( 'dist_lock_catalog.cpp', 'dist_lock_manager_replset.cpp', 'dist_lock_manager.cpp', + 'participant_block.idl', 'remove_tags.idl', + 'sharded_index_catalog_commands.idl', 'sharding_config_server_parameters.idl', 'sharding_ddl_util.cpp', + 'sharding_index_catalog_util.cpp', 'sharding_util.cpp', 'split_chunk_request_type.cpp', 'type_lockpings.cpp', @@ -362,12 +365,14 @@ env.Library( 'config/configsvr_clear_jumbo_flag_command.cpp', 'config/configsvr_collmod_command.cpp', 'config/configsvr_commit_chunk_migration_command.cpp', + 'config/configsvr_commit_index_command.cpp', 'config/configsvr_commit_reshard_collection_command.cpp', 'config/configsvr_configure_collection_balancing.cpp', 'config/configsvr_control_balancer_command.cpp', 'config/configsvr_coordinator_service.cpp', 'config/configsvr_coordinator.cpp', 'config/configsvr_coordinator.idl', + 'config/configsvr_drop_index_catalog_command.cpp', 'config/configsvr_create_database_command.cpp', 'config/configsvr_ensure_chunk_version_is_greater_than_command.cpp', 'config/configsvr_merge_chunks_command.cpp', @@ -406,7 +411,6 @@ env.Library( 'migration_destination_manager_legacy_commands.cpp', 'move_primary_coordinator_document.idl', 'move_primary_coordinator.cpp', - 'participant_block.idl', 'refine_collection_shard_key_coordinator_document.idl', 'refine_collection_shard_key_coordinator.cpp', 'remove_chunks.idl', @@ -430,6 +434,8 @@ env.Library( 
'shardsvr_cleanup_reshard_collection_command.cpp', 'shardsvr_collmod_command.cpp', 'shardsvr_collmod_participant_command.cpp', + 'shardsvr_index_catalog_test_commands.cpp', + 'shardsvr_commit_index_participant_command.cpp', 'shardsvr_commit_reshard_collection_command.cpp', 'shardsvr_compact_structured_encryption_data_command.cpp', 'shardsvr_create_collection_command.cpp', @@ -440,6 +446,7 @@ env.Library( 'shardsvr_drop_database_command.cpp', 'shardsvr_drop_database_participant_command.cpp', 'shardsvr_drop_indexes_command.cpp', + 'shardsvr_drop_index_catalog_entry_participant_command.cpp', 'shardsvr_get_stats_for_balancing_command.cpp', 'shardsvr_join_migrations_command.cpp', 'shardsvr_merge_chunks_command.cpp', diff --git a/src/mongo/db/s/config/configsvr_commit_index_command.cpp b/src/mongo/db/s/config/configsvr_commit_index_command.cpp new file mode 100644 index 00000000000..ac42d9f9b37 --- /dev/null +++ b/src/mongo/db/s/config/configsvr_commit_index_command.cpp @@ -0,0 +1,208 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. 
You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/commands.h" +#include "mongo/db/commands/feature_compatibility_version.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/s/sharded_index_catalog_commands_gen.h" +#include "mongo/db/s/sharding_index_catalog_util.h" +#include "mongo/db/transaction/transaction_api.h" +#include "mongo/db/transaction/transaction_participant.h" +#include "mongo/db/transaction/transaction_participant_resource_yielder.h" +#include "mongo/logv2/log.h" +#include "mongo/s/grid.h" +#include "mongo/s/sharding_feature_flags_gen.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding + + +namespace mongo { +namespace { + +/** + * Insert an index in the local catalog and bumps the indexVersion in the collections collection + * transactionally. + */ +void commitIndexInTransaction(OperationContext* opCtx, + std::shared_ptr<executor::TaskExecutor> executor, + const NamespaceString& userCollectionNss, + const std::string& name, + const BSONObj& keyPattern, + const BSONObj& options, + const UUID& collectionUUID, + const Timestamp& lastmod, + const boost::optional<UUID>& indexCollectionUUID) { + IndexCatalogType indexCatalogEntry(name, keyPattern, options, lastmod, collectionUUID); + indexCatalogEntry.setIndexCollectionUUID(indexCollectionUUID); + + // TODO SERVER-66261: remove the usage of shared_ptr once the executor is inlined, so the + // variable will never be out of scope. 
+ auto upsertIndexOp = std::make_shared<write_ops::UpdateCommandRequest>( + NamespaceString::kConfigsvrIndexCatalogNamespace); + upsertIndexOp->setUpdates({[&] { + write_ops::UpdateOpEntry entry; + entry.setQ(BSON(IndexCatalogType::kCollectionUUIDFieldName + << collectionUUID << IndexCatalogType::kNameFieldName << name)); + entry.setU( + write_ops::UpdateModification::parseFromClassicUpdate(indexCatalogEntry.toBSON())); + entry.setUpsert(true); + entry.setMulti(false); + return entry; + }()}); + + auto updateCollectionOp = + std::make_shared<write_ops::UpdateCommandRequest>(CollectionType::ConfigNS); + updateCollectionOp->setUpdates({[&] { + write_ops::UpdateOpEntry entry; + entry.setQ(BSON(CollectionType::kNssFieldName << userCollectionNss.ns() + << CollectionType::kUuidFieldName + << collectionUUID)); + entry.setU(write_ops::UpdateModification::parseFromClassicUpdate( + BSON("$set" << BSON(CollectionType::kUuidFieldName + << collectionUUID << CollectionType::kIndexVersionFieldName + << lastmod)))); + entry.setUpsert(true); + entry.setMulti(false); + return entry; + }()}); + + txn_api::SyncTransactionWithRetries txn( + opCtx, executor, TransactionParticipantResourceYielder::make("commitIndexCatalogEntry")); + + txn.run(opCtx, + [updateCollectionOp, upsertIndexOp](const txn_api::TransactionClient& txnClient, + ExecutorPtr txnExec) { + return txnClient.runCRUDOp(*upsertIndexOp, {0}) + .thenRunOn(txnExec) + .then([&txnClient, updateCollectionOp](auto upsertResponse) { + uassertStatusOK(upsertResponse.toStatus()); + return txnClient.runCRUDOp(*updateCollectionOp, {1}); + }) + .thenRunOn(txnExec) + .then([](auto updateResponse) { uassertStatusOK(updateResponse.toStatus()); }) + .semi(); + }); +} + + +class ConfigsvrCommitIndexCommand final : public TypedCommand<ConfigsvrCommitIndexCommand> { +public: + using Request = ConfigsvrCommitIndex; + + bool skipApiVersionCheck() const override { + // Internal command (server to server). 
+ return true; + } + + std::string help() const override { + return "Internal command. Do not call directly. Commits a globlal index in the sharding " + "catalog."; + } + + bool adminOnly() const override { + return false; + } + + AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { + return AllowedOnSecondary::kNever; + } + + bool supportsRetryableWrite() const final { + return true; + } + + class Invocation final : public InvocationBase { + public: + using InvocationBase::InvocationBase; + + void typedRun(OperationContext* opCtx) { + uassert(ErrorCodes::CommandNotSupported, + format(FMT_STRING("{} command not enabled"), definition()->getName()), + feature_flags::gGlobalIndexesShardingCatalog.isEnabled( + serverGlobalParams.featureCompatibility)); + uassert( + ErrorCodes::IllegalOperation, + format(FMT_STRING("{} can only be run on config servers"), definition()->getName()), + serverGlobalParams.clusterRole == ClusterRole::ConfigServer); + + CommandHelpers::uassertCommandRunWithMajority(Request::kCommandName, + opCtx->getWriteConcern()); + + const auto txnParticipant = TransactionParticipant::get(opCtx); + uassert(6711908, + str::stream() << Request::kCommandName << " must be run as a retryable write", + txnParticipant); + + opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE(); + + commitIndexInTransaction(opCtx, + Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(), + ns(), + request().getName().toString(), + request().getKeyPattern(), + request().getOptions(), + request().getCollectionUUID(), + request().getLastmod(), + request().getIndexCollectionUUID()); + + // Since no write that generated a retryable write oplog entry with this sessionId + // and txnNumber happened, we need to make a dummy write so that the session gets + // durably persisted on the oplog. This must be the last operation done on this + // command. 
+ DBDirectClient client(opCtx); + client.update(NamespaceString::kServerConfigurationNamespace.ns(), + BSON("_id" << Request::kCommandName), + BSON("$inc" << BSON("count" << 1)), + true /* upsert */, + false /* multi */); + } + + private: + NamespaceString ns() const override { + return request().getCommandParameter(); + } + + bool supportsWriteConcern() const override { + return true; + } + + void doCheckAuthorization(OperationContext* opCtx) const override { + uassert(ErrorCodes::Unauthorized, + "Unauthorized", + AuthorizationSession::get(opCtx->getClient()) + ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(), + ActionType::internal)); + } + }; + +} configsvrCommitIndexCommand; + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/s/config/configsvr_drop_index_catalog_command.cpp b/src/mongo/db/s/config/configsvr_drop_index_catalog_command.cpp new file mode 100644 index 00000000000..352f9e849c4 --- /dev/null +++ b/src/mongo/db/s/config/configsvr_drop_index_catalog_command.cpp @@ -0,0 +1,197 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/commands.h" +#include "mongo/db/commands/feature_compatibility_version.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/s/sharded_index_catalog_commands_gen.h" +#include "mongo/db/s/sharding_index_catalog_util.h" +#include "mongo/db/transaction/transaction_api.h" +#include "mongo/db/transaction/transaction_participant.h" +#include "mongo/db/transaction/transaction_participant_resource_yielder.h" +#include "mongo/logv2/log.h" +#include "mongo/s/grid.h" +#include "mongo/s/request_types/sharded_ddl_commands_gen.h" +#include "mongo/s/sharding_feature_flags_gen.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding + + +namespace mongo { +namespace { + +/** + * Drops an index from the local catalog and bumps the indexVersion in the collections collection + * transactionally. 
+ */ +void dropIndexInTransaction(OperationContext* opCtx, + std::shared_ptr<executor::TaskExecutor> executor, + const NamespaceString& userCollectionNss, + const std::string& name, + const UUID& collectionUUID, + const Timestamp& lastmod) { + // TODO SERVER-66261: remove the usage of shared_ptr once the executor is inlined, so the + // variable will never be out of scope. + auto deleteOp = std::make_shared<write_ops::DeleteCommandRequest>( + NamespaceString::kConfigsvrIndexCatalogNamespace); + deleteOp->setDeletes({[&] { + write_ops::DeleteOpEntry entry; + entry.setQ(BSON(IndexCatalogType::kCollectionUUIDFieldName + << collectionUUID << IndexCatalogType::kNameFieldName << name)); + entry.setMulti(false); + return entry; + }()}); + + auto updateCollectionOp = + std::make_shared<write_ops::UpdateCommandRequest>(CollectionType::ConfigNS); + updateCollectionOp->setUpdates({[&] { + write_ops::UpdateOpEntry entry; + entry.setQ(BSON(CollectionType::kNssFieldName << userCollectionNss.ns() + << CollectionType::kUuidFieldName + << collectionUUID)); + entry.setU(write_ops::UpdateModification::parseFromClassicUpdate( + BSON("$set" << BSON(CollectionType::kUuidFieldName + << collectionUUID << CollectionType::kIndexVersionFieldName + << lastmod)))); + entry.setUpsert(true); + entry.setMulti(false); + return entry; + }()}); + + txn_api::SyncTransactionWithRetries txn( + opCtx, executor, TransactionParticipantResourceYielder::make("dropIndexCatalogEntry")); + + txn.run(opCtx, + [updateCollectionOp, deleteOp](const txn_api::TransactionClient& txnClient, + ExecutorPtr txnExec) { + return txnClient.runCRUDOp(*deleteOp, {0}) + .thenRunOn(txnExec) + .then([&txnClient, updateCollectionOp](auto deleteResponse) { + uassertStatusOK(deleteResponse.toStatus()); + return txnClient.runCRUDOp(*updateCollectionOp, {1}); + }) + .thenRunOn(txnExec) + .then([](auto updateResponse) { uassertStatusOK(updateResponse.toStatus()); }) + .semi(); + }); +} + +class ConfigsvrDropIndexCatalogEntryCommand 
final + : public TypedCommand<ConfigsvrDropIndexCatalogEntryCommand> { +public: + using Request = ConfigsvrDropIndexCatalogEntry; + + bool skipApiVersionCheck() const override { + // Internal command (server to server). + return true; + } + + std::string help() const override { + return "Internal command. Do not call directly. Drops an index entry in the sharding " + "catalog."; + } + + bool adminOnly() const override { + return false; + } + + AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { + return AllowedOnSecondary::kNever; + } + + bool supportsRetryableWrite() const final { + return true; + } + + class Invocation final : public InvocationBase { + public: + using InvocationBase::InvocationBase; + + void typedRun(OperationContext* opCtx) { + uassert(ErrorCodes::CommandNotSupported, + format(FMT_STRING("{} command not enabled"), definition()->getName()), + feature_flags::gGlobalIndexesShardingCatalog.isEnabled( + serverGlobalParams.featureCompatibility)); + uassert( + ErrorCodes::IllegalOperation, + format(FMT_STRING("{} can only be run on config servers"), definition()->getName()), + serverGlobalParams.clusterRole == ClusterRole::ConfigServer); + + CommandHelpers::uassertCommandRunWithMajority(Request::kCommandName, + opCtx->getWriteConcern()); + + const auto txnParticipant = TransactionParticipant::get(opCtx); + uassert(6711909, + str::stream() << Request::kCommandName << " must be run as a retryable write", + txnParticipant); + + opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE(); + + dropIndexInTransaction(opCtx, + Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(), + ns(), + request().getName().toString(), + request().getCollectionUUID(), + request().getLastmod()); + + // Since no write that generated a retryable write oplog entry with this sessionId + // and txnNumber happened, we need to make a dummy write so that the session gets + // durably persisted on the oplog. This must be the last operation done on this + // command. 
+ DBDirectClient client(opCtx); + client.update(NamespaceString::kServerConfigurationNamespace.ns(), + BSON("_id" << Request::kCommandName), + BSON("$inc" << BSON("count" << 1)), + true /* upsert */, + false /* multi */); + } + + private: + NamespaceString ns() const override { + return request().getCommandParameter(); + } + + bool supportsWriteConcern() const override { + return true; + } + + void doCheckAuthorization(OperationContext* opCtx) const override { + uassert(ErrorCodes::Unauthorized, + "Unauthorized", + AuthorizationSession::get(opCtx->getClient()) + ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(), + ActionType::internal)); + } + }; + +} configsvrDropIndexCatalogEntryCommand; + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/s/sharded_index_catalog_commands.idl b/src/mongo/db/s/sharded_index_catalog_commands.idl new file mode 100644 index 00000000000..1010d2e6588 --- /dev/null +++ b/src/mongo/db/s/sharded_index_catalog_commands.idl @@ -0,0 +1,136 @@ +# Copyright (C) 2022-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. 
You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. +# + +# This file provides the necessary commands to implement the sharded catalog index protocol. +# +# The protocol to register an index works the following way: +# 1. While the migrations are disallowed and the critical section is taken for the collection, +# a DDLCoordinator calls _shardsvrCommitIndexParticipant in all shards that contain data in the +# base collection. This will effectively write the catalog entry locally and will bump the index +# version in the collections collection. +# 2. The DDLCoordinator calls _configsvrCommitIndex to commit the index entry in the config server +# transactionally. +# 3. The critical sections can be freed in all shards and the migrations can be allowed again. +# +# To unregister an index, the process is the same, but, instead of using +# _shardsvrCommitIndexParticipant and _configsvrCommitIndex, we use +# _shardsvrDropIndexCatalogEntryParticipant and _configsvrDropIndexCatalogEntry respectively. +# +# Additionally in this file we have two functions to test the registering/unregistering process: +# _shardsvrRegisterIndex and _shardsvrUnregisterIndex, which will be used by tests while the +# DDLCoordinator is implemented. + +global: + cpp_namespace: "mongo" + +imports: + - "mongo/idl/basic_types.idl" + - "mongo/s/sharding_types.idl" + - "mongo/s/catalog/type_index_catalog.idl" + +structs: + UnregisterIndexCatalogRequest: + description: "Parameters sent to drop an index entry from the cluster catalog." 
+ strict: false + fields: + collectionUUID: + type: uuid + description: "UUID of the user collection." + name: + type: string + description: "Index name." + lastmod: + type: timestamp + description: "Drop cluster time." + +commands: + _shardsvrRegisterIndex: + command_name: _shardsvrRegisterIndex + cpp_name: ShardsvrRegisterIndex + description: "Internal test command to register an index catalog entry in the cluster." + api_version: "" + namespace: type + type: namespacestring + strict: false + chained_structs: + IndexCatalogType: IndexCatalogType + + _shardsvrCommitIndexParticipant: + command_name: _shardsvrCommitIndexParticipant + cpp_name: ShardsvrCommitIndexParticipant + description: "Internal command to commit an index catalog entry locally in a shard." + namespace: type + type: namespacestring + api_version: "" + strict: false + chained_structs: + IndexCatalogType: IndexCatalogType + + _configsvrCommitIndex: + command_name: _configsvrCommitIndex + cpp_name: ConfigsvrCommitIndex + description: "Internal command to commit an index catalog entry locally in the config server." + namespace: type + type: namespacestring + api_version: "" + strict: false + chained_structs: + IndexCatalogType: IndexCatalogType + + _shardsvrUnregisterIndex: + command_name: _shardsvrUnregisterIndex + cpp_name: ShardsvrUnregisterIndex + description: "Internal test command to unregister an index catalog entry in the cluster." + api_version: "" + namespace: type + type: namespacestring + strict: false + chained_structs: + UnregisterIndexCatalogRequest: UnregisterIndexCatalogRequest + + _shardsvrDropIndexCatalogEntryParticipant: + command_name: _shardsvrDropIndexCatalogEntryParticipant + cpp_name: ShardsvrDropIndexCatalogEntryParticipant + description: "Internal command to drop an index catalog entry locally in a shard." 
+ namespace: type + type: namespacestring + api_version: "" + strict: false + chained_structs: + UnregisterIndexCatalogRequest: UnregisterIndexCatalogRequest + + _configsvrDropIndexCatalogEntry: + command_name: _configsvrDropIndexCatalogEntry + cpp_name: ConfigsvrDropIndexCatalogEntry + description: "Internal command to drop an index catalog entry locally in the config server." + namespace: type + type: namespacestring + api_version: "" + strict: false + chained_structs: + UnregisterIndexCatalogRequest: UnregisterIndexCatalogRequest diff --git a/src/mongo/db/s/sharding_index_catalog_util.cpp b/src/mongo/db/s/sharding_index_catalog_util.cpp new file mode 100644 index 00000000000..367be78f684 --- /dev/null +++ b/src/mongo/db/s/sharding_index_catalog_util.cpp @@ -0,0 +1,274 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding + +#include "mongo/db/s/sharding_index_catalog_util.h" + +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/s/participant_block_gen.h" +#include "mongo/db/s/sharded_index_catalog_commands_gen.h" +#include "mongo/db/s/sharding_ddl_util.h" +#include "mongo/db/s/sharding_util.h" +#include "mongo/db/transaction/transaction_api.h" +#include "mongo/db/transaction/transaction_participant_resource_yielder.h" +#include "mongo/db/vector_clock.h" +#include "mongo/logv2/log.h" +#include "mongo/s/catalog/type_index_catalog_gen.h" +#include "mongo/s/grid.h" + +namespace mongo { +namespace sharding_index_catalog_util { + +namespace { + +void performNoopRetryableWriteForIndexCommit( + OperationContext* opCtx, + OperationSessionInfo& osi, + const std::set<ShardId>& shardIdSet, + const std::shared_ptr<executor::TaskExecutor>& executor) { + std::vector<ShardId> shardsAndConfigsvr{shardIdSet.begin(), shardIdSet.end()}; + shardsAndConfigsvr.push_back(Grid::get(opCtx)->shardRegistry()->getConfigShard()->getId()); + sharding_ddl_util::performNoopRetryableWriteOnShards(opCtx, shardsAndConfigsvr, osi, executor); + osi.setTxnNumber(++osi.getTxnNumber().get()); +} + +BSONObj getCriticalSectionReasonForIndexCommit(const NamespaceString& nss, + const std::string& name) { + return BSON("command" + << "commitIndexCatalogEntry" + << "nss" << nss.toString() << IndexCatalogType::kNameFieldName << name); +} + +/** + * Function with a stable vector of shardIds (meaning, migrations will be serialized with this + * function call) that should perform catalog 
updates. + */ +using IndexModificationCallback = unique_function<void(std::vector<ShardId>&)>; + +/** + * Helper function to generalize the index catalog modification protocol. With this function when + * callback is called, we have the following guarantees: + * + * 1. All migrations will be cancelled and will not be able to commit, and no new migration will + * start for userCollectionNss. + * 2. osi will contain a valid sessionID and transaction number, even after a stepdown. + * 3. There won't be any writes for userCollectionNss because the critical section will be taken + * cluster-wide. + * + * After the execution of this function, the migrations will be enabled again, unless the function + * failed due to a step-down, in which case this function should be called again on stepUp after + * the node is in steady-state. osi will contain the latest txnNumber used. + * + * Any work done by callback must be resumable and idempotent. + */ +void coordinateIndexCatalogModificationAcrossCollectionShards( + OperationContext* opCtx, + std::shared_ptr<executor::TaskExecutor> executor, + OperationSessionInfo& osi, + const NamespaceString& userCollectionNss, + const std::string& indexName, + const UUID& collectionUUID, + const bool firstExecution, + IndexModificationCallback callback) { + // Stop migrations so the cluster is in a steady state. + sharding_ddl_util::stopMigrations(opCtx, userCollectionNss, collectionUUID); + // Resume migrations no matter what. + ON_BLOCK_EXIT( + [&] { sharding_ddl_util::resumeMigrations(opCtx, userCollectionNss, collectionUUID); }); + + // Get an up to date shard distribution. 
+ auto routingInfo = + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh( + opCtx, userCollectionNss)); + uassert(ErrorCodes::NamespaceNotSharded, + str::stream() << "collection " << userCollectionNss << " is not sharded", + routingInfo.isSharded()); + std::set<ShardId> shardIdsSet; + routingInfo.getAllShardIds(&shardIdsSet); + + if (!firstExecution) { + // If this is not the first execution (as in, there was a stepdown) advance the + // txnNumber for this lsid, so requests with older txnNumbers can no longer execute. + performNoopRetryableWriteForIndexCommit(opCtx, osi, shardIdsSet, executor); + } + + std::vector<ShardId> shardIdsVec{shardIdsSet.begin(), shardIdsSet.end()}; + + // Block writes in all shards that hold data for the user collection. + ShardsvrParticipantBlock shardsvrBlockWritesRequest(userCollectionNss); + shardsvrBlockWritesRequest.setBlockType(CriticalSectionBlockTypeEnum::kWrites); + shardsvrBlockWritesRequest.setReason( + getCriticalSectionReasonForIndexCommit(userCollectionNss, indexName)); + + sharding_util::sendCommandToShards( + opCtx, + userCollectionNss.db(), + CommandHelpers::appendMajorityWriteConcern(shardsvrBlockWritesRequest.toBSON({})), + shardIdsVec, + executor); + + // Perform the index modification. + callback(shardIdsVec); + + // Release the critical section in all the shards. 
+ shardsvrBlockWritesRequest.setBlockType(CriticalSectionBlockTypeEnum::kUnblock); + sharding_util::sendCommandToShards( + opCtx, + userCollectionNss.db(), + CommandHelpers::appendMajorityWriteConcern(shardsvrBlockWritesRequest.toBSON({})), + shardIdsVec, + executor); +} + +} // namespace + +void registerIndexCatalogEntry(OperationContext* opCtx, + std::shared_ptr<executor::TaskExecutor> executor, + OperationSessionInfo& osi, + const NamespaceString& userCollectionNss, + const std::string& name, + const BSONObj& keyPattern, + const BSONObj& options, + const UUID& collectionUUID, + const boost::optional<UUID>& indexCollectionUUID, + bool firstExecution) { + coordinateIndexCatalogModificationAcrossCollectionShards( + opCtx, + executor, + osi, + userCollectionNss, + name, + collectionUUID, + firstExecution, + [&](std::vector<ShardId>& shardIds) { + IndexCatalogType index; + index.setCollectionUUID(collectionUUID); + index.setIndexCollectionUUID(indexCollectionUUID); + index.setKeyPattern(keyPattern); + index.setLastmod([opCtx] { + VectorClock::VectorTime vt = VectorClock::get(opCtx)->getTime(); + return vt.clusterTime().asTimestamp(); + }()); + index.setName(name); + index.setOptions(options); + + ShardsvrCommitIndexParticipant shardsvrCommitIndexParticipantRequest(userCollectionNss); + shardsvrCommitIndexParticipantRequest.setIndexCatalogType(index); + shardsvrCommitIndexParticipantRequest.setDbName(NamespaceString::kAdminDb); + + sharding_util::sendCommandToShards( + opCtx, + userCollectionNss.db(), + CommandHelpers::appendMajorityWriteConcern( + shardsvrCommitIndexParticipantRequest.toBSON(osi.toBSON())), + shardIds, + executor); + + // Now commit the change in the config server. 
+ ConfigsvrCommitIndex configsvrCommitIndexRequest(userCollectionNss); + configsvrCommitIndexRequest.setIndexCatalogType(index); + configsvrCommitIndexRequest.setDbName(NamespaceString::kAdminDb); + auto commitIndexEntryResponse = + Grid::get(opCtx) + ->shardRegistry() + ->getConfigShard() + ->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + "admin", + CommandHelpers::appendMajorityWriteConcern( + configsvrCommitIndexRequest.toBSON(osi.toBSON())), + Shard::RetryPolicy::kIdempotent); + + uassertStatusOK(Shard::CommandResponse::getEffectiveStatus(commitIndexEntryResponse)); + }); +} + +void unregisterIndexCatalogEntry(OperationContext* opCtx, + std::shared_ptr<executor::TaskExecutor> executor, + OperationSessionInfo& osi, + const NamespaceString& userCollectionNss, + const std::string& name, + const UUID& collectionUUID, + bool firstExecution) { + coordinateIndexCatalogModificationAcrossCollectionShards( + opCtx, + executor, + osi, + userCollectionNss, + name, + collectionUUID, + firstExecution, + [&](std::vector<ShardId>& shardIdsVec) { + // Remove the index in the config server. 
+ ConfigsvrDropIndexCatalogEntry configsvrDropIndexCatalogRequest(userCollectionNss); + UnregisterIndexCatalogRequest dropIndexCatalogRequest; + dropIndexCatalogRequest.setCollectionUUID(collectionUUID); + dropIndexCatalogRequest.setLastmod([opCtx] { + VectorClock::VectorTime vt = VectorClock::get(opCtx)->getTime(); + return vt.clusterTime().asTimestamp(); + }()); + dropIndexCatalogRequest.setName(name); + configsvrDropIndexCatalogRequest.setUnregisterIndexCatalogRequest( + dropIndexCatalogRequest); + configsvrDropIndexCatalogRequest.setDbName(NamespaceString::kAdminDb); + auto commitIndexEntryResponse = + Grid::get(opCtx) + ->shardRegistry() + ->getConfigShard() + ->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + "admin", + CommandHelpers::appendMajorityWriteConcern( + configsvrDropIndexCatalogRequest.toBSON(osi.toBSON())), + Shard::RetryPolicy::kIdempotent); + + uassertStatusOK(Shard::CommandResponse::getEffectiveStatus(commitIndexEntryResponse)); + + // Ensure the index is dropped in every shard. + ShardsvrDropIndexCatalogEntryParticipant shardsvrDropIndexCatalogEntryRequest( + userCollectionNss); + shardsvrDropIndexCatalogEntryRequest.setUnregisterIndexCatalogRequest( + dropIndexCatalogRequest); + shardsvrDropIndexCatalogEntryRequest.setDbName(NamespaceString::kAdminDb); + + sharding_util::sendCommandToShards( + opCtx, + NamespaceString::kAdminDb, + CommandHelpers::appendMajorityWriteConcern( + shardsvrDropIndexCatalogEntryRequest.toBSON(osi.toBSON())), + shardIdsVec, + executor); + }); +} +} // namespace sharding_index_catalog_util + +} // namespace mongo diff --git a/src/mongo/db/s/sharding_index_catalog_util.h b/src/mongo/db/s/sharding_index_catalog_util.h new file mode 100644 index 00000000000..f12be516a95 --- /dev/null +++ b/src/mongo/db/s/sharding_index_catalog_util.h @@ -0,0 +1,77 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#pragma once + +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/executor/task_executor.h" + +namespace mongo { + +namespace sharding_index_catalog_util { + +/** + * Registers a new index in the catalog so it will be available for the shard and router role. 
In + * order to execute this function the following preconditions must be met: + * - It should be used from a DDLCoordinator + * - firstExecution should indicate if this is the first time that this function is being called + * from the coordinator (it should be false even if the second execution happened after a stepdown). + * If firstExecution is false, then the executor MUST come from a DDLCoordinator (or a POS). + * - osi must contain a valid session id and transaction number + * + * We have the following guarantees: + * - During the execution of this function migrations for userCollectionNss will be stopped and all + * started migrations will be cancelled. + * - There won't be any writes for userCollectionNss during the index catalog modification. + */ +void registerIndexCatalogEntry(OperationContext* opCtx, + std::shared_ptr<executor::TaskExecutor> executor, + OperationSessionInfo& osi, + const NamespaceString& userCollectionNss, + const std::string& name, + const BSONObj& keyPattern, + const BSONObj& options, + const UUID& collectionUUID, + const boost::optional<UUID>& indexCollectionUUID, + bool firstExecution); + +/** + * De-register an index from the catalog so it will no longer be available for shard and router + * role. The same preconditions and guarantees of registerIndexCatalogEntry apply for this function. 
+ */ +void unregisterIndexCatalogEntry(OperationContext* opCtx, + std::shared_ptr<executor::TaskExecutor> executor, + OperationSessionInfo& osi, + const NamespaceString& userCollectionNss, + const std::string& name, + const UUID& collectionUUID, + bool firstExecution); +} // namespace sharding_index_catalog_util + +} // namespace mongo diff --git a/src/mongo/db/s/sharding_util.cpp b/src/mongo/db/s/sharding_util.cpp index ea4e1ca7d69..232b348ffc7 100644 --- a/src/mongo/db/s/sharding_util.cpp +++ b/src/mongo/db/s/sharding_util.cpp @@ -38,6 +38,7 @@ #include "mongo/db/concurrency/exception_util.h" #include "mongo/db/index_builds_coordinator.h" #include "mongo/logv2/log.h" +#include "mongo/s/catalog/type_collection_gen.h" #include "mongo/s/catalog/type_index_catalog_gen.h" #include "mongo/s/request_types/flush_routing_table_cache_updates_gen.h" @@ -184,31 +185,47 @@ Status createIndexOnCollection(OperationContext* opCtx, Status createGlobalIndexesIndexes(OperationContext* opCtx) { bool unique = true; + NamespaceString indexCatalogNamespace; if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { - auto result = - createIndexOnCollection(opCtx, - NamespaceString::kConfigsvrIndexCatalogNamespace, - BSON(IndexCatalogType::kCollectionUUIDFieldName - << 1 << IndexCatalogType::kLastModFieldName << 1), - !unique); - if (!result.isOK()) { - return result.withContext(str::stream() - << "couldn't create collectionUUID_1_lastmod_1 index on " - << NamespaceString::kConfigsvrIndexCatalogNamespace); - } + indexCatalogNamespace = NamespaceString::kConfigsvrIndexCatalogNamespace; } else { - auto result = - createIndexOnCollection(opCtx, - NamespaceString::kShardsIndexCatalogNamespace, - BSON(IndexCatalogType::kCollectionUUIDFieldName - << 1 << IndexCatalogType::kLastModFieldName << 1), - !unique); - if (!result.isOK()) { - return result.withContext(str::stream() - << "couldn't create collectionUUID_1_lastmod_1 index on " - << NamespaceString::kShardsIndexCatalogNamespace); 
- } + indexCatalogNamespace = NamespaceString::kShardIndexCatalogNamespace; + } + auto result = createIndexOnCollection(opCtx, + indexCatalogNamespace, + BSON(IndexCatalogType::kCollectionUUIDFieldName + << 1 << IndexCatalogType::kLastmodFieldName << 1), + !unique); + if (!result.isOK()) { + return result.withContext(str::stream() + << "couldn't create collectionUUID_1_lastmod_1 index on " + << indexCatalogNamespace); + } + result = createIndexOnCollection(opCtx, + indexCatalogNamespace, + BSON(IndexCatalogType::kCollectionUUIDFieldName + << 1 << IndexCatalogType::kNameFieldName << 1), + unique); + if (!result.isOK()) { + return result.withContext(str::stream() + << "couldn't create collectionUUID_1_name_1 index on " + << indexCatalogNamespace); + } + return Status::OK(); +} + +Status createShardCollectionCatalogIndexes(OperationContext* opCtx) { + bool unique = true; + auto result = createIndexOnCollection(opCtx, + NamespaceString::kShardCollectionCatalogNamespace, + BSON(CollectionTypeBase::kUuidFieldName << 1), + !unique); + if (!result.isOK()) { + return result.withContext(str::stream() + << "couldn't create uuid_1 index on " + << NamespaceString::kShardCollectionCatalogNamespace); } + return Status::OK(); } diff --git a/src/mongo/db/s/sharding_util.h b/src/mongo/db/s/sharding_util.h index 370f3f8ad75..90375723fd9 100644 --- a/src/mongo/db/s/sharding_util.h +++ b/src/mongo/db/s/sharding_util.h @@ -67,6 +67,11 @@ std::vector<AsyncRequestsSender::Response> sendCommandToShards( Status createGlobalIndexesIndexes(OperationContext* opCtx); /** + * Creates the necessary indexes for the collections collection. + */ +Status createShardCollectionCatalogIndexes(OperationContext* opCtx); + +/** * Helper function to create an index on a collection locally. 
*/ Status createIndexOnCollection(OperationContext* opCtx, diff --git a/src/mongo/db/s/shardsvr_commit_index_participant_command.cpp b/src/mongo/db/s/shardsvr_commit_index_participant_command.cpp new file mode 100644 index 00000000000..0b62fc15217 --- /dev/null +++ b/src/mongo/db/s/shardsvr_commit_index_participant_command.cpp @@ -0,0 +1,208 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + + +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/commands.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/s/collection_sharding_runtime.h" +#include "mongo/db/s/recoverable_critical_section_service.h" +#include "mongo/db/s/sharded_index_catalog_commands_gen.h" +#include "mongo/db/s/sharding_index_catalog_util.h" +#include "mongo/db/s/sharding_state.h" +#include "mongo/db/transaction/transaction_participant.h" +#include "mongo/logv2/log.h" +#include "mongo/s/grid.h" +#include "mongo/s/sharding_feature_flags_gen.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding + +namespace mongo { +namespace { + +/** + * Insert an index in the local catalog. + * + * Returns true if there was a write performed. + */ +bool commitIndex(OperationContext* opCtx, + std::shared_ptr<executor::TaskExecutor> executor, + const NamespaceString& userCollectionNss, + const std::string& name, + const BSONObj& keyPattern, + const BSONObj& options, + const UUID& collectionUUID, + const Timestamp& lastmod, + const boost::optional<UUID>& indexCollectionUUID) { + IndexCatalogType indexCatalogEntry(name, keyPattern, options, lastmod, collectionUUID); + indexCatalogEntry.setIndexCollectionUUID(indexCollectionUUID); + + write_ops::UpdateCommandRequest upsertIndexOp(NamespaceString::kShardIndexCatalogNamespace); + upsertIndexOp.setUpdates({[&] { + write_ops::UpdateOpEntry entry; + entry.setQ(BSON(IndexCatalogType::kCollectionUUIDFieldName + << collectionUUID << IndexCatalogType::kNameFieldName << name)); + entry.setU( + write_ops::UpdateModification::parseFromClassicUpdate(indexCatalogEntry.toBSON())); + entry.setUpsert(true); + entry.setMulti(false); + return entry; + }()}); + + write_ops::UpdateCommandRequest updateCollectionOp( + NamespaceString::kShardCollectionCatalogNamespace); + updateCollectionOp.setUpdates({[&] { + write_ops::UpdateOpEntry entry; + entry.setQ(BSON(CollectionType::kNssFieldName << userCollectionNss.ns() + 
<< CollectionType::kUuidFieldName + << collectionUUID)); + entry.setU(write_ops::UpdateModification::parseFromClassicUpdate( + BSON("$set" << BSON(CollectionType::kIndexVersionFieldName << lastmod)))); + entry.setUpsert(true); + entry.setMulti(false); + return entry; + }()}); + updateCollectionOp.setWriteCommandRequestBase([] { + write_ops::WriteCommandRequestBase wcb; + wcb.setStmtId(1); + return wcb; + }()); + + DBDirectClient client(opCtx); + auto upsertResult = write_ops::checkWriteErrors(client.update(upsertIndexOp)); + auto updateResult = write_ops::checkWriteErrors(client.update(updateCollectionOp)); + + return upsertResult.getN() || updateResult.getN(); +} + +class ShardsvrCommitIndexParticipantCommand final + : public TypedCommand<ShardsvrCommitIndexParticipantCommand> { +public: + using Request = ShardsvrCommitIndexParticipant; + + bool skipApiVersionCheck() const override { + // Internal command (server to server). + return true; + } + + std::string help() const override { + return "Internal command. Do not call directly. 
Commits a globlal index for the shard-role " + "catalog."; + } + + bool adminOnly() const override { + return false; + } + + AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { + return AllowedOnSecondary::kNever; + } + + bool supportsRetryableWrite() const final { + return true; + } + + class Invocation final : public InvocationBase { + public: + using InvocationBase::InvocationBase; + + void typedRun(OperationContext* opCtx) { + uassert(ErrorCodes::CommandNotSupported, + format(FMT_STRING("{} command not enabled"), definition()->getName()), + feature_flags::gGlobalIndexesShardingCatalog.isEnabled( + serverGlobalParams.featureCompatibility)); + uassert(ErrorCodes::IllegalOperation, + "This command can only be executed in steady state shards.", + ShardingState::get(opCtx)->canAcceptShardedCommands() == Status::OK()); + + CommandHelpers::uassertCommandRunWithMajority(Request::kCommandName, + opCtx->getWriteConcern()); + + const auto txnParticipant = TransactionParticipant::get(opCtx); + uassert(6711901, + str::stream() << Request::kCommandName << " must be run as a retryable write", + txnParticipant); + { + AutoGetCollection coll(opCtx, ns(), LockMode::MODE_IS); + auto csr = CollectionShardingRuntime::get(opCtx, ns()); + uassert( + 6711902, + "The critical section must be taken in order to execute this command", + csr->getCriticalSectionSignal(opCtx, ShardingMigrationCriticalSection::kWrite)); + } + + opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE(); + + auto writesPerformed = + commitIndex(opCtx, + Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(), + ns(), + request().getName().toString(), + request().getKeyPattern(), + request().getOptions(), + request().getCollectionUUID(), + request().getLastmod(), + request().getIndexCollectionUUID()); + + if (!writesPerformed) { + // Since no write that generated a retryable write oplog entry with this sessionId + // and txnNumber happened, we need to make a dummy write so that the session gets + // 
durably persisted on the oplog. This must be the last operation done on this + // command. + DBDirectClient client(opCtx); + client.update(NamespaceString::kServerConfigurationNamespace.ns(), + BSON("_id" << Request::kCommandName), + BSON("$inc" << BSON("count" << 1)), + true /* upsert */, + false /* multi */); + } + } + + private: + NamespaceString ns() const override { + return request().getCommandParameter(); + } + + bool supportsWriteConcern() const override { + return true; + } + + void doCheckAuthorization(OperationContext* opCtx) const override { + uassert(ErrorCodes::Unauthorized, + "Unauthorized", + AuthorizationSession::get(opCtx->getClient()) + ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(), + ActionType::internal)); + } + }; + +} shardsvrCommitIndexParticipantCommand; + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/s/shardsvr_drop_index_catalog_entry_participant_command.cpp b/src/mongo/db/s/shardsvr_drop_index_catalog_entry_participant_command.cpp new file mode 100644 index 00000000000..304e15723a1 --- /dev/null +++ b/src/mongo/db/s/shardsvr_drop_index_catalog_entry_participant_command.cpp @@ -0,0 +1,196 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + + +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/commands.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/s/collection_sharding_runtime.h" +#include "mongo/db/s/sharded_index_catalog_commands_gen.h" +#include "mongo/db/s/sharding_index_catalog_util.h" +#include "mongo/db/s/sharding_state.h" +#include "mongo/db/transaction/transaction_participant.h" +#include "mongo/logv2/log.h" +#include "mongo/s/grid.h" +#include "mongo/s/sharding_feature_flags_gen.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding + +namespace mongo { +namespace { + +/** + * Drops an index from the local catalog. + * + * Returns true if there was a write performed. 
+ */ +bool dropIndex(OperationContext* opCtx, + std::shared_ptr<executor::TaskExecutor> executor, + const NamespaceString& userCollectionNss, + const std::string& name, + const UUID& collectionUUID, + const Timestamp& lastmod) { + write_ops::DeleteCommandRequest deleteOp(NamespaceString::kShardIndexCatalogNamespace); + deleteOp.setDeletes({[&] { + write_ops::DeleteOpEntry entry; + entry.setQ(BSON(IndexCatalogType::kCollectionUUIDFieldName + << collectionUUID << IndexCatalogType::kNameFieldName << name)); + entry.setMulti(false); + return entry; + }()}); + deleteOp.setWriteCommandRequestBase([] { + write_ops::WriteCommandRequestBase wcb; + wcb.setStmtId(1); + return wcb; + }()); + + write_ops::UpdateCommandRequest updateCollectionOp( + NamespaceString::kShardCollectionCatalogNamespace); + updateCollectionOp.setUpdates({[&] { + write_ops::UpdateOpEntry entry; + entry.setQ(BSON(CollectionType::kNssFieldName << userCollectionNss.ns() + << CollectionType::kUuidFieldName + << collectionUUID)); + entry.setU(write_ops::UpdateModification::parseFromClassicUpdate( + BSON("$set" << BSON(CollectionType::kUuidFieldName + << collectionUUID << CollectionType::kIndexVersionFieldName + << lastmod)))); + entry.setUpsert(true); + entry.setMulti(false); + return entry; + }()}); + + DBDirectClient client(opCtx); + auto updateResult = write_ops::checkWriteErrors(client.update(updateCollectionOp)); + auto deleteResult = write_ops::checkWriteErrors(client.remove(deleteOp)); + + return deleteResult.getN() || updateResult.getN(); +} + +class ShardsvrDropIndexCatalogEntryParticipantCommand final + : public TypedCommand<ShardsvrDropIndexCatalogEntryParticipantCommand> { +public: + using Request = ShardsvrDropIndexCatalogEntryParticipant; + + bool skipApiVersionCheck() const override { + // Internal command (server to server). + return true; + } + + std::string help() const override { + return "Internal command. Do not call directly. 
Drops a globlal index for the shard-role " + "catalog."; + } + + bool adminOnly() const override { + return false; + } + + AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { + return AllowedOnSecondary::kNever; + } + + bool supportsRetryableWrite() const final { + return true; + } + + class Invocation final : public InvocationBase { + public: + using InvocationBase::InvocationBase; + + void typedRun(OperationContext* opCtx) { + uassert(ErrorCodes::CommandNotSupported, + format(FMT_STRING("{} command not enabled"), definition()->getName()), + feature_flags::gGlobalIndexesShardingCatalog.isEnabled( + serverGlobalParams.featureCompatibility)); + uassert(ErrorCodes::IllegalOperation, + "This command can only be executed in steady state shards.", + ShardingState::get(opCtx)->canAcceptShardedCommands() == Status::OK()); + + CommandHelpers::uassertCommandRunWithMajority(Request::kCommandName, + opCtx->getWriteConcern()); + + const auto txnParticipant = TransactionParticipant::get(opCtx); + uassert(6711903, + str::stream() << Request::kCommandName << " must be run as a retryable write", + txnParticipant); + { + AutoGetCollection coll(opCtx, ns(), LockMode::MODE_IS); + auto csr = CollectionShardingRuntime::get(opCtx, ns()); + uassert( + 6711904, + "The critical section must be taken in order to execute this command", + csr->getCriticalSectionSignal(opCtx, ShardingMigrationCriticalSection::kWrite)); + } + + opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE(); + + auto writesExecuted = dropIndex(opCtx, + Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(), + ns(), + request().getName().toString(), + request().getCollectionUUID(), + request().getLastmod()); + + if (!writesExecuted) { + // Since no write that generated a retryable write oplog entry with this sessionId + // and txnNumber happened, we need to make a dummy write so that the session gets + // durably persisted on the oplog. This must be the last operation done on this + // command. 
+ DBDirectClient client(opCtx); + client.update(NamespaceString::kServerConfigurationNamespace.ns(), + BSON("_id" << Request::kCommandName), + BSON("$inc" << BSON("count" << 1)), + true /* upsert */, + false /* multi */); + } + } + + private: + NamespaceString ns() const override { + return request().getCommandParameter(); + } + + bool supportsWriteConcern() const override { + return true; + } + + void doCheckAuthorization(OperationContext* opCtx) const override { + uassert(ErrorCodes::Unauthorized, + "Unauthorized", + AuthorizationSession::get(opCtx->getClient()) + ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(), + ActionType::internal)); + } + }; + +} shardsvrDropIndexCatalogEntryParticipantCommand; + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/s/shardsvr_index_catalog_test_commands.cpp b/src/mongo/db/s/shardsvr_index_catalog_test_commands.cpp new file mode 100644 index 00000000000..189e06b7e80 --- /dev/null +++ b/src/mongo/db/s/shardsvr_index_catalog_test_commands.cpp @@ -0,0 +1,209 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/commands.h" +#include "mongo/db/internal_session_pool.h" +#include "mongo/db/s/sharded_index_catalog_commands_gen.h" +#include "mongo/db/s/sharding_index_catalog_util.h" +#include "mongo/db/s/sharding_state.h" +#include "mongo/executor/network_interface_factory.h" +#include "mongo/executor/network_interface_thread_pool.h" +#include "mongo/executor/task_executor.h" +#include "mongo/executor/task_executor_pool.h" +#include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/logv2/log.h" +#include "mongo/rpc/metadata/egress_metadata_hook_list.h" +#include "mongo/s/grid.h" +#include "mongo/s/sharding_feature_flags_gen.h" +#include "mongo/util/future_util.h" + + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding + +namespace mongo { +namespace { + +class ShardsvrRegisterIndexTestCommand final + : public TypedCommand<ShardsvrRegisterIndexTestCommand> { +public: + using Request = ShardsvrRegisterIndex; + + bool skipApiVersionCheck() const override { + // Internal command (server to server). + return true; + } + + std::string help() const override { + return "Internal command. 
Do not call directly. Example on how to register an index in the " + "sharding catalog."; + } + + bool adminOnly() const override { + return false; + } + + AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { + return AllowedOnSecondary::kNever; + } + + class Invocation final : public InvocationBase { + public: + using InvocationBase::InvocationBase; + + void typedRun(OperationContext* opCtx) { + uassert(ErrorCodes::CommandNotSupported, + format(FMT_STRING("{} command not enabled"), definition()->getName()), + feature_flags::gGlobalIndexesShardingCatalog.isEnabled( + serverGlobalParams.featureCompatibility)); + uassertStatusOK(ShardingState::get(opCtx)->canAcceptShardedCommands()); + + CommandHelpers::uassertCommandRunWithMajority(Request::kCommandName, + opCtx->getWriteConcern()); + + auto session = InternalSessionPool::get(opCtx)->acquireSystemSession(); + OperationSessionInfo osi; + + osi.setSessionId(session.getSessionId()); + osi.setTxnNumber(session.getTxnNumber()); + opCtx->setLogicalSessionId(*osi.getSessionId()); + opCtx->setTxnNumber(*osi.getTxnNumber()); + sharding_index_catalog_util::registerIndexCatalogEntry( + opCtx, + Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(), + osi, + ns(), + request().getName().toString(), + request().getKeyPattern(), + request().getOptions(), + request().getCollectionUUID(), + request().getIndexCollectionUUID(), + true); + // Release the session if the commit is successful. 
+ InternalSessionPool::get(opCtx)->release(session); + } + + private: + NamespaceString ns() const override { + return request().getCommandParameter(); + } + + bool supportsWriteConcern() const override { + return true; + } + + void doCheckAuthorization(OperationContext* opCtx) const override { + uassert(ErrorCodes::Unauthorized, + "Unauthorized", + AuthorizationSession::get(opCtx->getClient()) + ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(), + ActionType::internal)); + } + }; +}; +MONGO_REGISTER_TEST_COMMAND(ShardsvrRegisterIndexTestCommand); + +class ShardsvrUnregisterIndexTestCommand final + : public TypedCommand<ShardsvrUnregisterIndexTestCommand> { +public: + using Request = ShardsvrUnregisterIndex; + + bool skipApiVersionCheck() const override { + // Internal command (server to server). + return true; + } + + std::string help() const override { + return "Internal command. Do not call directly. Example on how to unregister an index in " + "the sharding catalog."; + } + + bool adminOnly() const override { + return false; + } + + AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { + return AllowedOnSecondary::kNever; + } + + class Invocation final : public InvocationBase { + public: + using InvocationBase::InvocationBase; + + void typedRun(OperationContext* opCtx) { + uassert(ErrorCodes::CommandNotSupported, + format(FMT_STRING("{} command not enabled"), definition()->getName()), + feature_flags::gGlobalIndexesShardingCatalog.isEnabled( + serverGlobalParams.featureCompatibility)); + uassertStatusOK(ShardingState::get(opCtx)->canAcceptShardedCommands()); + + CommandHelpers::uassertCommandRunWithMajority(Request::kCommandName, + opCtx->getWriteConcern()); + + auto session = InternalSessionPool::get(opCtx)->acquireSystemSession(); + OperationSessionInfo osi; + + osi.setSessionId(session.getSessionId()); + osi.setTxnNumber(session.getTxnNumber()); + opCtx->setLogicalSessionId(*osi.getSessionId()); + 
opCtx->setTxnNumber(*osi.getTxnNumber()); + + sharding_index_catalog_util::unregisterIndexCatalogEntry( + opCtx, + Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(), + osi, + ns(), + request().getName().toString(), + request().getCollectionUUID(), + true); + // Release the session if the commit is successful. + InternalSessionPool::get(opCtx)->release(session); + } + + private: + NamespaceString ns() const override { + return request().getCommandParameter(); + } + + bool supportsWriteConcern() const override { + return true; + } + + void doCheckAuthorization(OperationContext* opCtx) const override { + uassert(ErrorCodes::Unauthorized, + "Unauthorized", + AuthorizationSession::get(opCtx->getClient()) + ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(), + ActionType::internal)); + } + }; +}; +MONGO_REGISTER_TEST_COMMAND(ShardsvrUnregisterIndexTestCommand); +} // namespace +} // namespace mongo |