From 7be22a4e693ed231f217c7670de076dc1960b238 Mon Sep 17 00:00:00 2001 From: Rishab Joshi Date: Thu, 11 Aug 2022 17:00:55 +0000 Subject: SERVER-66631 Implement command to enable and disable the change stream. --- src/mongo/db/SConscript | 34 ++- src/mongo/db/auth/action_type.idl | 2 + src/mongo/db/auth/builtin_roles.yml | 2 + .../db/change_stream_change_collection_manager.cpp | 45 ++-- .../db/change_stream_change_collection_manager.h | 12 +- ...change_stream_pre_images_collection_manager.cpp | 7 + .../change_stream_pre_images_collection_manager.h | 7 + src/mongo/db/change_stream_state.idl | 62 ++++++ src/mongo/db/commands/SConscript | 4 + .../db/commands/change_stream_state_command.cpp | 179 ++++++++++++++++ src/mongo/db/commands/run_aggregate.cpp | 7 +- src/mongo/db/exec/collection_scan.cpp | 5 +- src/mongo/db/mongod_main.cpp | 5 + src/mongo/db/namespace_string.cpp | 3 + src/mongo/db/namespace_string.h | 3 + ...ent_source_change_stream_check_resumability.cpp | 9 +- src/mongo/db/repl/SConscript | 2 + src/mongo/db/repl/primary_only_service_util.cpp | 128 ++++++++++++ src/mongo/db/repl/primary_only_service_util.h | 146 +++++++++++++ .../db/repl/primary_only_service_util_test.cpp | 211 +++++++++++++++++++ ...replication_coordinator_external_state_impl.cpp | 16 +- .../db/set_change_stream_state_coordinator.cpp | 232 +++++++++++++++++++++ src/mongo/db/set_change_stream_state_coordinator.h | 101 +++++++++ .../db/set_change_stream_state_coordinator.idl | 61 ++++++ src/mongo/platform/mutex.h | 4 +- 25 files changed, 1246 insertions(+), 41 deletions(-) create mode 100644 src/mongo/db/change_stream_state.idl create mode 100644 src/mongo/db/commands/change_stream_state_command.cpp create mode 100644 src/mongo/db/repl/primary_only_service_util.cpp create mode 100644 src/mongo/db/repl/primary_only_service_util.h create mode 100644 src/mongo/db/repl/primary_only_service_util_test.cpp create mode 100644 src/mongo/db/set_change_stream_state_coordinator.cpp create mode 100644 src/mongo/db/set_change_stream_state_coordinator.h create mode 100644 src/mongo/db/set_change_stream_state_coordinator.idl (limited to 'src/mongo') diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index d8068bdd0e9..6f2cc3a531f 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -519,6 +519,35 @@ env.Library( ], ) +env.Library( + target='change_stream_state', + source=['change_stream_state.idl'], + LIBDEPS=[ + '$BUILD_DIR/mongo/idl/idl_parser', + ], +) + +env.Library( + target='set_change_stream_state_coordinator', + source=[ + 'set_change_stream_state_coordinator.cpp', + 'set_change_stream_state_coordinator.idl', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/change_stream_change_collection_manager', + '$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager', + '$BUILD_DIR/mongo/db/change_stream_state', + '$BUILD_DIR/mongo/db/dbdirectclient', + '$BUILD_DIR/mongo/db/logical_session_id', + '$BUILD_DIR/mongo/db/logical_session_id_helpers', + '$BUILD_DIR/mongo/db/repl/primary_only_service', + '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', + '$BUILD_DIR/mongo/db/repl/wait_for_majority_service', + '$BUILD_DIR/mongo/db/rw_concern_d', + '$BUILD_DIR/mongo/idl/idl_parser', + ], +) + env.Library( target='change_stream_change_collection_manager', source=[ @@ -526,7 +555,9 @@ env.Library( ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/catalog/catalog_helpers', - "$BUILD_DIR/mongo/db/catalog/clustered_collection_options", + '$BUILD_DIR/mongo/db/catalog/clustered_collection_options', + '$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager', + '$BUILD_DIR/mongo/db/concurrency/exception_util', '$BUILD_DIR/mongo/db/dbhelpers', '$BUILD_DIR/mongo/db/service_context', ], @@ -2367,6 +2398,7 @@ env.Library( '$BUILD_DIR/mongo/db/change_stream_options_manager', '$BUILD_DIR/mongo/db/change_streams_cluster_parameter', '$BUILD_DIR/mongo/db/pipeline/change_stream_expired_pre_image_remover', + '$BUILD_DIR/mongo/db/set_change_stream_state_coordinator', '$BUILD_DIR/mongo/idl/cluster_server_parameter', '$BUILD_DIR/mongo/idl/cluster_server_parameter_op_observer', '$BUILD_DIR/mongo/s/grid', diff --git a/src/mongo/db/auth/action_type.idl b/src/mongo/db/auth/action_type.idl index c5328ec0617..ad1fb2cb0f9 100644 --- a/src/mongo/db/auth/action_type.idl +++ b/src/mongo/db/auth/action_type.idl @@ -186,6 +186,8 @@ enums: viewRole : "viewRole" viewUser : "viewUser" applyOps : "applyOps" + setChangeStreamState: "setChangeStreamState" + getChangeStreamState: "getChangeStreamState" # In 'MatchType' the extra_data field "serverlessActionTypes" is used # by the AuthorizationSession while in multitenancy mode to determine diff --git a/src/mongo/db/auth/builtin_roles.yml b/src/mongo/db/auth/builtin_roles.yml index ae2c1aa57dd..64202331747 100644 --- a/src/mongo/db/auth/builtin_roles.yml +++ b/src/mongo/db/auth/builtin_roles.yml @@ -334,6 +334,8 @@ roles: - setFreeMonitoring - setClusterParameter - getClusterParameter + - setChangeStreamState + - getChangeStreamState - matchType: any_normal actions: &clusterManagerRoleDatabaseActions diff --git a/src/mongo/db/change_stream_change_collection_manager.cpp b/src/mongo/db/change_stream_change_collection_manager.cpp index 5a79d14953f..9dbf6937b49 100644 --- a/src/mongo/db/change_stream_change_collection_manager.cpp +++ b/src/mongo/db/change_stream_change_collection_manager.cpp @@ -38,6 +38,7 @@ #include "mongo/db/catalog/create_collection.h" #include "mongo/db/catalog/drop_collection.h" #include "mongo/db/catalog_raii.h" +#include "mongo/db/change_stream_pre_images_collection_manager.h" #include "mongo/db/multitenancy_gen.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/oplog.h" @@ -187,32 +188,44 @@ bool ChangeStreamChangeCollectionManager::hasChangeCollection( opCtx, NamespaceString::makeChangeCollectionNSS(tenantId))); } -Status ChangeStreamChangeCollectionManager::createChangeCollection( +bool ChangeStreamChangeCollectionManager::isChangeStreamEnabled( + OperationContext* opCtx, boost::optional tenantId) const { + // A change stream in the serverless is declared as enabled if both the change collection and + // the pre-images collection exist for the provided tenant. + return isChangeCollectionsModeActive() && hasChangeCollection(opCtx, tenantId) && + ChangeStreamPreImagesCollectionManager::hasPreImagesCollection(opCtx, tenantId); +} + +void ChangeStreamChangeCollectionManager::createChangeCollection( OperationContext* opCtx, boost::optional tenantId) { // Make the change collection clustered by '_id'. The '_id' field will have the same value as // the 'ts' field of the oplog. CollectionOptions changeCollectionOptions; changeCollectionOptions.clusteredIndex.emplace(clustered_util::makeDefaultClusteredIdIndex()); changeCollectionOptions.capped = true; + const auto changeCollNss = NamespaceString::makeChangeCollectionNSS(tenantId); - auto status = createCollection(opCtx, - NamespaceString::makeChangeCollectionNSS(tenantId), - changeCollectionOptions, - BSONObj()); - if (status.code() == ErrorCodes::NamespaceExists) { - return Status::OK(); - } - - return status; + const auto status = createCollection(opCtx, changeCollNss, changeCollectionOptions, BSONObj()); + uassert(status.code(), + str::stream() << "Failed to create change collection: " << changeCollNss + << causedBy(status.reason()), + status.isOK() || status.code() == ErrorCodes::NamespaceExists); } -Status ChangeStreamChangeCollectionManager::dropChangeCollection( - OperationContext* opCtx, boost::optional tenantId) { +void ChangeStreamChangeCollectionManager::dropChangeCollection(OperationContext* opCtx, + boost::optional tenantId) { DropReply dropReply; - return dropCollection(opCtx, - NamespaceString::makeChangeCollectionNSS(tenantId), - &dropReply, - DropCollectionSystemCollectionMode::kAllowSystemCollectionDrops); + const auto changeCollNss = NamespaceString::makeChangeCollectionNSS(tenantId); + + const auto status = + dropCollection(opCtx, + changeCollNss, + &dropReply, + DropCollectionSystemCollectionMode::kAllowSystemCollectionDrops); + uassert(status.code(), + str::stream() << "Failed to drop change collection: " << changeCollNss + << causedBy(status.reason()), + status.isOK() || status.code() == ErrorCodes::NamespaceNotFound); } void ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection( diff --git a/src/mongo/db/change_stream_change_collection_manager.h b/src/mongo/db/change_stream_change_collection_manager.h index 37a9dbaef27..b6836ac0c1e 100644 --- a/src/mongo/db/change_stream_change_collection_manager.h +++ b/src/mongo/db/change_stream_change_collection_manager.h @@ -72,19 +72,23 @@ public: bool hasChangeCollection(OperationContext* opCtx, boost::optional tenantId) const; /** - * Creates a change collection for the specified tenant, if it doesn't exist. Returns Status::OK - * if the change collection already exists. + * Returns true if the change stream is enabled for the provided tenant, false otherwise. + */ + bool isChangeStreamEnabled(OperationContext* opCtx, boost::optional tenantId) const; + + /** + * Creates a change collection for the specified tenant, if it doesn't exist. * * TODO: SERVER-65950 make tenantId field mandatory. */ - Status createChangeCollection(OperationContext* opCtx, boost::optional tenantId); + void createChangeCollection(OperationContext* opCtx, boost::optional tenantId); /** * Deletes the change collection for the specified tenant, if it already exist. * * TODO: SERVER-65950 make tenantId field mandatory. */ - Status dropChangeCollection(OperationContext* opCtx, boost::optional tenantId); + void dropChangeCollection(OperationContext* opCtx, boost::optional tenantId); /** * Inserts documents to change collections. The parameter 'oplogRecords' is a vector of oplog diff --git a/src/mongo/db/change_stream_pre_images_collection_manager.cpp b/src/mongo/db/change_stream_pre_images_collection_manager.cpp index afae70d7da4..a83aa9645f6 100644 --- a/src/mongo/db/change_stream_pre_images_collection_manager.cpp +++ b/src/mongo/db/change_stream_pre_images_collection_manager.cpp @@ -172,6 +172,13 @@ void ChangeStreamPreImagesCollectionManager::insertPreImage(OperationContext* op uassertStatusOK(insertionStatus); } +bool ChangeStreamPreImagesCollectionManager::hasPreImagesCollection( + OperationContext* opCtx, boost::optional tenantId) { + auto catalog = CollectionCatalog::get(opCtx); + return static_cast(catalog->lookupCollectionByNamespace( + opCtx, NamespaceString::makePreImageCollectionNSS(tenantId))); +} + namespace { RecordId toRecordId(ChangeStreamPreImageId id) { return record_id_helpers::keyForElem( diff --git a/src/mongo/db/change_stream_pre_images_collection_manager.h b/src/mongo/db/change_stream_pre_images_collection_manager.h index 75efb28c22d..dede0e38c96 100644 --- a/src/mongo/db/change_stream_pre_images_collection_manager.h +++ b/src/mongo/db/change_stream_pre_images_collection_manager.h @@ -88,6 +88,13 @@ public: boost::optional tenantId, const ChangeStreamPreImage& preImage); + /** + * Returns true if the pre-images collection exists, false otherwise. If 'tenantId' is provided + * then the pre-images collection associated with that tenant will be checked for existence, + * otherwise the default pre-images collection will be checked. + */ + static bool hasPreImagesCollection(OperationContext* opCtx, boost::optional tenantId); + /** * Scans the system pre-images collection and deletes the expired pre-images from it. */ diff --git a/src/mongo/db/change_stream_state.idl b/src/mongo/db/change_stream_state.idl new file mode 100644 index 00000000000..cec05b86bc1 --- /dev/null +++ b/src/mongo/db/change_stream_state.idl @@ -0,0 +1,62 @@ +# Copyright (C) 2022-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# . +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. +# + +global: + cpp_namespace: "mongo" + +imports: + - "mongo/idl/basic_types.idl" + +structs: + ChangeStreamStateParameters: + description: "The parameters associated with 'setChangeStreamState' and 'getChangeStreamState' commands." + fields: + enabled: + description: "Represents the state of the change stream of a tenant. If true, then + the change stream should be enabled, false otherwise. The corresponding + value is also returned by the 'getChangeStreamState' command" + type: bool + +commands: + setChangeStreamState: + description: "The command to set the state of the change stream in the serverless for a + particular tenant." + command_name: setChangeStreamState + cpp_name: SetChangeStreamStateCommandRequest + api_version: "" + namespace: ignored + chained_structs: + ChangeStreamStateParameters: ChangeStreamStateParameters + getChangeStreamState: + description: "The command to get the state of the change stream in the serverless for a + particular tenant." + command_name: getChangeStreamState + cpp_name: GetChangeStreamStateCommandRequest + api_version: "" + namespace: ignored + reply_type: ChangeStreamStateParameters \ No newline at end of file diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index c995e5eccbf..e9772f9b61f 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -372,6 +372,7 @@ env.Library( '$BUILD_DIR/mongo/db/catalog/index_key_validate', '$BUILD_DIR/mongo/db/catalog/multi_index_block', '$BUILD_DIR/mongo/db/change_stream_change_collection_manager', + '$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager', '$BUILD_DIR/mongo/db/command_can_run_here', '$BUILD_DIR/mongo/db/commands', '$BUILD_DIR/mongo/db/concurrency/exception_util', @@ -513,6 +514,7 @@ env.Library( "apply_ops_cmd.cpp", "collection_to_capped.cpp", "compact.cpp", + "change_stream_state_command.cpp", "cpuload.cpp", "dbcheck.cpp", "dbcommands_d.cpp", @@ -565,6 +567,7 @@ env.Library( '$BUILD_DIR/mongo/db/change_stream_change_collection_manager', '$BUILD_DIR/mongo/db/change_stream_options_manager', '$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager', + '$BUILD_DIR/mongo/db/change_stream_state', '$BUILD_DIR/mongo/db/cluster_transaction_api', '$BUILD_DIR/mongo/db/commands', '$BUILD_DIR/mongo/db/curop_failpoint_helpers', @@ -592,6 +595,7 @@ env.Library( '$BUILD_DIR/mongo/db/s/user_writes_recoverable_critical_section', '$BUILD_DIR/mongo/db/server_options_core', '$BUILD_DIR/mongo/db/serverless/shard_split_donor_service', + '$BUILD_DIR/mongo/db/set_change_stream_state_coordinator', '$BUILD_DIR/mongo/db/tenant_id', '$BUILD_DIR/mongo/db/timeseries/timeseries_conversion_util', '$BUILD_DIR/mongo/db/transaction/transaction_api', diff --git a/src/mongo/db/commands/change_stream_state_command.cpp b/src/mongo/db/commands/change_stream_state_command.cpp new file mode 100644 index 00000000000..de8d98083ea --- /dev/null +++ b/src/mongo/db/commands/change_stream_state_command.cpp @@ -0,0 +1,179 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/change_stream_change_collection_manager.h" +#include "mongo/db/change_stream_pre_images_collection_manager.h" +#include "mongo/db/change_stream_state_gen.h" +#include "mongo/db/commands.h" +#include "mongo/db/set_change_stream_state_coordinator.h" +#include "mongo/logv2/log.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand + +namespace mongo { + +namespace { + +/** + * The command that should run in the replica-set to set the state of the change stream in the + * serverless. + */ +class SetChangeStreamStateCommand final : public TypedCommand { +public: + using Request = SetChangeStreamStateCommandRequest; + + AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { + return AllowedOnSecondary::kNever; + } + + bool adminOnly() const override { + return true; + } + + std::string help() const override { + return "Sets the change stream state in the serverless\n" + "Usage:\n" + " {setChangeStreamState: 1, enabled: }\n" + "Fields:\n" + " enabled: enable or disable the change stream"; + } + + class Invocation final : public InvocationBase { + public: + using InvocationBase::InvocationBase; + + void typedRun(OperationContext* opCtx) { + uassert(ErrorCodes::CommandNotSupported, + str::stream() << SetChangeStreamStateCommandRequest::kCommandName + << " is only supported in the serverless", + ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()); + + // TODO SERVER-65950 use provided '$tenant' only and add 'uassert that tenant must be + // present. Remove 'getDollarTenant()' and fetch tenant from dbName(). + const std::string tenantId = request().getDollarTenant() + ? request().getDollarTenant()->toString() + : TenantId::kSystemTenantId.toString(); + + // Prepare the payload for the 'SetChangeStreamStateCoordinator'. + SetChangeStreamStateCoordinatorId coordinatorId; + coordinatorId.setTenantId({TenantId{OID(tenantId)}}); + SetChangeStreamStateCoordinatorDocument coordinatorDoc{ + coordinatorId, request().getChangeStreamStateParameters().toBSON()}; + + // Dispatch the request to the 'SetChangeStreamStateCoordinatorService'. + const auto service = SetChangeStreamStateCoordinatorService::getService(opCtx); + const auto instance = service->getOrCreateInstance(opCtx, coordinatorDoc.toBSON()); + + const auto coordinatorCompletionFuture = instance->getCompletionFuture(); + coordinatorCompletionFuture.get(opCtx); + } + + private: + bool supportsWriteConcern() const override { + return false; + } + + NamespaceString ns() const override { + return NamespaceString(); + } + + void doCheckAuthorization(OperationContext* opCtx) const override { + uassert(ErrorCodes::Unauthorized, + "Unauthorized", + AuthorizationSession::get(opCtx->getClient()) + ->isAuthorizedForPrivilege(Privilege{ResourcePattern::forClusterResource(), + ActionType::setChangeStreamState})); + } + }; +} setChangeStreamStateCommand; + +class GetChangeStreamStateCommand final : public TypedCommand { +public: + using Request = GetChangeStreamStateCommandRequest; + + AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { + return AllowedOnSecondary::kNever; + } + + bool adminOnly() const override { + return true; + } + + std::string help() const override { + return "Gets the change stream state in the serverless\n" + "Usage:\n" + " {getChangeStreamState: 1}"; + } + + class Invocation final : public InvocationBase { + public: + using InvocationBase::InvocationBase; + + auto typedRun(OperationContext* opCtx) { + uassert(ErrorCodes::CommandNotSupported, + str::stream() << GetChangeStreamStateCommandRequest::kCommandName + << " is only supported in the serverless", + ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()); + + // TODO SERVER-65950 use provided '$tenant' only and add 'uassert that tenant must be + // present. + boost::optional tenantId = boost::none; + + // Set the change stream enablement state in the 'reply' object. + auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); + GetChangeStreamStateCommandRequest::Reply reply{ + changeCollectionManager.isChangeStreamEnabled(opCtx, tenantId)}; + + return reply; + } + + private: + bool supportsWriteConcern() const override { + return false; + } + + NamespaceString ns() const override { + return NamespaceString(); + } + + void doCheckAuthorization(OperationContext* opCtx) const override { + uassert(ErrorCodes::Unauthorized, + "Unauthorized", + AuthorizationSession::get(opCtx->getClient()) + ->isAuthorizedForPrivilege(Privilege{ResourcePattern::forClusterResource(), + ActionType::getChangeStreamState})); + } + }; +} getChangeStreamStateCommand; + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 36a23574a71..01ed5a70a0a 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -40,6 +40,7 @@ #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/change_stream_change_collection_manager.h" +#include "mongo/db/change_stream_pre_images_collection_manager.h" #include "mongo/db/commands/cqf/cqf_aggregate.h" #include "mongo/db/commands/cqf/cqf_command_utils.h" #include "mongo/db/curop.h" @@ -752,14 +753,12 @@ Status runAggregate(OperationContext* opCtx, // Replace the execution namespace with the oplog. nss = NamespaceString::kRsOplogNamespace; - // In case of serverless the change stream will be opened on the change collection. We - // should first check if the change collection for the particular tenant exists and then - // replace the namespace with the change collection. + // In case of serverless the change stream will be opened on the change collection. if (ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) { auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); uassert(ErrorCodes::ChangeStreamNotEnabled, "Change streams must be enabled before being used.", - changeCollectionManager.hasChangeCollection(opCtx, origNss.tenantId())); + changeCollectionManager.isChangeStreamEnabled(opCtx, origNss.tenantId())); nss = NamespaceString::makeChangeCollectionNSS(origNss.tenantId()); } diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp index edea7336f3e..b9910a7a6c8 100644 --- a/src/mongo/db/exec/collection_scan.cpp +++ b/src/mongo/db/exec/collection_scan.cpp @@ -302,7 +302,10 @@ void CollectionScan::assertTsHasNotFallenOff(const Record& record) { const bool tsHasNotFallenOff = oplogEntry.getTimestamp() <= *_params.assertTsHasNotFallenOff; uassert(ErrorCodes::OplogQueryMinTsMissing, - "Specified timestamp has already fallen off the oplog", + str::stream() + << "Specified timestamp has already fallen off the oplog for the input timestamp: " + << *_params.assertTsHasNotFallenOff + << ", oplog entry: " << oplogEntry.getEntry().toString(), isNewRS || tsHasNotFallenOff); // We don't need to check this assertion again after we've confirmed the first oplog event. _params.assertTsHasNotFallenOff = boost::none; diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp index eb212db69e0..80fe723c81e 100644 --- a/src/mongo/db/mongod_main.cpp +++ b/src/mongo/db/mongod_main.cpp @@ -161,6 +161,7 @@ #include "mongo/db/service_entry_point_mongod.h" #include "mongo/db/session_catalog.h" #include "mongo/db/session_killer.h" +#include "mongo/db/set_change_stream_state_coordinator.h" #include "mongo/db/startup_recovery.h" #include "mongo/db/startup_warnings_mongod.h" #include "mongo/db/stats/counters.h" @@ -349,6 +350,9 @@ void registerPrimaryOnlyServices(ServiceContext* serviceContext) { } } + // TODO SERVER-65950 create 'SetChangeStreamStateCoordinatorService' only in the serverless. + services.push_back(std::make_unique(serviceContext)); + for (auto& service : services) { registry->registerService(std::move(service)); } @@ -1549,6 +1553,7 @@ int mongod_main(int argc, char* argv[]) { ReadWriteConcernDefaults::create(service, readWriteConcernDefaultsCacheLookupMongoD); ChangeStreamOptionsManager::create(service); + // TODO SERVER-65950 create 'ChangeStreamChangeCollectionManager' only in the serverless. ChangeStreamChangeCollectionManager::create(service); #if defined(_WIN32) diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp index 639694dc4af..9a79fc2f4ca 100644 --- a/src/mongo/db/namespace_string.cpp +++ b/src/mongo/db/namespace_string.cpp @@ -182,6 +182,9 @@ const NamespaceString NamespaceString::kShardCollectionCatalogNamespace(Namespac const NamespaceString NamespaceString::kLockpingsNamespace(NamespaceString::kConfigDb, "lockpings"); const NamespaceString NamespaceString::kDistLocksNamepsace(NamespaceString::kConfigDb, "locks"); +const NamespaceString NamespaceString::kSetChangeStreamStateCoordinatorNamespace( + NamespaceString::kConfigDb, "change_stream_coordinator"); + NamespaceString NamespaceString::parseFromStringExpectTenantIdInMultitenancyMode(StringData ns) { if (!gMultitenancySupport) { return NamespaceString(ns, boost::none); diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h index 3c54837625e..18cf434bd86 100644 --- a/src/mongo/db/namespace_string.h +++ b/src/mongo/db/namespace_string.h @@ -245,6 +245,9 @@ public: // TODO SERVER-68551: remove once 7.0 becomes last-lts static const NamespaceString kDistLocksNamepsace; + // Namespace used to store the state document of 'SetChangeStreamStateCoordinator'. + static const NamespaceString kSetChangeStreamStateCoordinatorNamespace; + /** * Constructs an empty NamespaceString. */ diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp index d95ff78f58e..ee473aada28 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp @@ -33,8 +33,12 @@ #include "mongo/db/pipeline/document_source_change_stream_check_resumability.h" #include "mongo/db/query/query_feature_flags_gen.h" #include "mongo/db/repl/oplog_entry.h" +#include "mongo/logv2/log.h" using boost::intrusive_ptr; + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery + namespace mongo { namespace { @@ -162,7 +166,10 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamCheckResumability::doGet auto nextInput = [this]() { try { return pSource->getNext(); - } catch (const ExceptionFor&) { + } catch (const ExceptionFor& ex) { + LOGV2_ERROR(6663107, + "Resume of change stream was not possible", + "reason"_attr = ex.reason()); uasserted(ErrorCodes::ChangeStreamHistoryLost, "Resume of change stream was not possible, as the resume point may no " "longer be in the oplog."); diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 9c63c0a8151..40dcaa49c6e 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1669,6 +1669,7 @@ if wiredtiger: 'oplog_test.cpp', 'optime_extract_test.cpp', 'primary_only_service_test.cpp', + 'primary_only_service_util_test.cpp', 'read_concern_args_test.cpp', 'repl_set_config_checks_test.cpp', 'repl_set_config_test.cpp', @@ -1965,6 +1966,7 @@ env.Library( target='primary_only_service', source=[ 'primary_only_service.cpp', + 'primary_only_service_util.cpp', 'primary_only_service_op_observer.cpp', ], LIBDEPS=[ diff --git a/src/mongo/db/repl/primary_only_service_util.cpp b/src/mongo/db/repl/primary_only_service_util.cpp new file mode 100644 index 00000000000..d845ac2a262 --- /dev/null +++ b/src/mongo/db/repl/primary_only_service_util.cpp @@ -0,0 +1,128 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/primary_only_service_util.h" + +#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/logv2/log.h" +#include "mongo/util/future_util.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication + +namespace mongo { + +namespace { + +// Set the exponential delay after evaluating the condition 'until' within the 'run' method. +const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max()); + +} // namespace + +DefaultPrimaryOnlyServiceInstance::~DefaultPrimaryOnlyServiceInstance() { + invariant(_completionPromise.getFuture().isReady()); +} + +SharedSemiFuture DefaultPrimaryOnlyServiceInstance::getCompletionFuture() { + return _completionPromise.getFuture(); +} + +void DefaultPrimaryOnlyServiceInstance::interrupt(Status status) noexcept { + LOGV2_DEBUG(6663100, + 1, + "DefaultPrimaryOnlyServiceInstance interrupted", + "instance"_attr = getInstanceName(), + "reason"_attr = redact(status)); + + // Resolve any unresolved promises to avoid hanging. + stdx::lock_guard lg(_mutex); + if (!_completionPromise.getFuture().isReady()) { + _completionPromise.setError(status); + } +} + +SemiFuture DefaultPrimaryOnlyServiceInstance::run( + std::shared_ptr executor, + const CancellationToken& token) noexcept { + return ExecutorFuture(**executor) + .then([this, executor, token, anchor = shared_from_this()] { + return AsyncTry([this, executor, token] { return _runImpl(executor, token); }) + .until([this, token](Status status) { return status.isOK() || token.isCanceled(); }) + .withBackoffBetweenIterations(kExponentialBackoff) + .on(**executor, CancellationToken::uncancelable()); + }) + .onCompletion([this, executor, token, anchor = shared_from_this()](const Status& status) { + if (!status.isOK()) { + LOGV2_ERROR(6663101, + "Error executing DefaultPrimaryOnlyServiceInstance", + "instance"_attr = getInstanceName(), + "error"_attr = redact(status)); + + // Nothing else to do, the _completionPromise will be cancelled once the coordinator + // is interrupted, because the only reasons to stop forward progress in this node is + // because of a stepdown happened or the coordinator was canceled. + dassert((token.isCanceled() && status.isA()) || + status.isA()); + return status; + } + + try { + auto opCtxHolder = cc().makeOperationContext(); + auto* opCtx = opCtxHolder.get(); + + _removeStateDocument(opCtx); + } catch (DBException& ex) { + LOGV2_WARNING( + 6663103, + "Failed to remove state document from DefaultPrimaryOnlyServiceInstance", + "instance"_attr = getInstanceName(), + "error"_attr = redact(ex)); + ex.addContext(str::stream() + << "Failed to remove state document from " << getInstanceName()); + + stdx::lock_guard lg(_mutex); + if (!_completionPromise.getFuture().isReady()) { + _completionPromise.setError(ex.toStatus()); + } + + throw; + } + + stdx::lock_guard lg(_mutex); + if (!_completionPromise.getFuture().isReady()) { + _completionPromise.emplaceValue(); + } + + return status; + }) + .semi(); +} + +} // namespace mongo diff --git a/src/mongo/db/repl/primary_only_service_util.h b/src/mongo/db/repl/primary_only_service_util.h new file mode 100644 index 00000000000..64aea0ac8d2 --- /dev/null +++ b/src/mongo/db/repl/primary_only_service_util.h @@ -0,0 +1,146 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/persistent_task_store.h" +#include "mongo/db/repl/primary_only_service.h" + +namespace mongo { + +/** + * A helper to CRUD the state document for the 'PrimaryOnlyService' using the provided namespace + * 'stateDocumentNs'. + */ +template +class PrimaryOnlyServiceStateStore { +public: + explicit PrimaryOnlyServiceStateStore(const NamespaceString& stateDocumentNs) + : _stateDocumentNs{stateDocumentNs} {} + + /** + * Stores the state document 'addDoc' on the provided namespace. + */ + void add( + OperationContext* opCtx, + const StateDocumentType& addDoc, + const WriteConcernOptions& writeConcern = WriteConcerns::kMajorityWriteConcernNoTimeout) { + PersistentTaskStore store(_stateDocumentNs); + store.add(opCtx, addDoc, writeConcern); + } + + /** + * Removes the state document matching the criteria stated in the 'removeDoc'. + */ + void remove( + OperationContext* opCtx, + const BSONObj& removeDoc, + const WriteConcernOptions& writeConcern = WriteConcerns::kMajorityWriteConcernNoTimeout) { + PersistentTaskStore store(_stateDocumentNs); + store.remove(opCtx, removeDoc, writeConcern); + } + + /** + * Updates the state document 'updateDoc' using the provided filter criteria 'filter'. + */ + void update( + OperationContext* opCtx, + const BSONObj& filter, + const BSONObj& updateDoc, + const WriteConcernOptions& writeConcern = WriteConcerns::kMajorityWriteConcernNoTimeout) { + PersistentTaskStore store(_stateDocumentNs); + store.update(opCtx, filter, updateDoc, writeConcern); + } + + /** + * Gets the count of the documents matching the 'filter' criteria. + */ + size_t count(OperationContext* opCtx, const BSONObj& filter = BSONObj{}) { + PersistentTaskStore store(_stateDocumentNs); + return store.count(opCtx, filter); + } + +private: + // Namespace where the state document will be stored. + const NamespaceString _stateDocumentNs; +}; + +/** + * A helper that provides a default implementation for the set of methods to create the + * 'PrimaryOnlyService::Instance'. + */ +class DefaultPrimaryOnlyServiceInstance + : public repl::PrimaryOnlyService::TypedInstance { +public: + ~DefaultPrimaryOnlyServiceInstance(); + + /** + * The name of the 'PrimaryOnlyService::Instance'. + */ + virtual const StringData getInstanceName() = 0; + + /** + * Interrupts the current running 'DefaultPrimaryOnlyServiceInstance' instance. + */ + void interrupt(Status status) noexcept override; + + /** + * Returns the completion state of the running instance. + */ + SharedSemiFuture getCompletionFuture(); + +private: + /** + * Invoked by the 'PrimaryOnlyService' to run the 'DefaultPrimaryOnlyServiceInstance' instance + * type. + */ + SemiFuture run(std::shared_ptr executor, + const CancellationToken& token) noexcept override; + + /** + * The derived class must override this method to write the custom logic. This method is invoked + * within the context of the 'run' method. + */ + virtual ExecutorFuture _runImpl(std::shared_ptr executor, + const CancellationToken& token) noexcept = 0; + + /** + * Removes the state document after the 'DefaultPrimaryOnlyServiceInstance' has been completed. + * It is the responsibility of the derived class to store the state document. + */ + virtual void _removeStateDocument(OperationContext* opCtx) = 0; + + // Guards the access of the '_completionPromise'. + Mutex _mutex = MONGO_MAKE_LATCH("DefaultPrimaryOnlyServiceInstance::_mutex"); + + // Tracks the completion state of the instance. + SharedPromise _completionPromise; +}; + +} // namespace mongo diff --git a/src/mongo/db/repl/primary_only_service_util_test.cpp b/src/mongo/db/repl/primary_only_service_util_test.cpp new file mode 100644 index 00000000000..0fc059d3d5a --- /dev/null +++ b/src/mongo/db/repl/primary_only_service_util_test.cpp @@ -0,0 +1,211 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/base/string_data.h" +#include "mongo/db/repl/primary_only_service_test_fixture.h" +#include "mongo/db/repl/primary_only_service_util.h" +#include "mongo/db/repl/wait_for_majority_service.h" +#include "mongo/logv2/log.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/fail_point.h" + +using namespace mongo; +using namespace mongo::repl; + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest + +namespace { +constexpr auto kTestPrimaryOnlyServiceName = "TestService"_sd; +constexpr auto kTestPrimaryOnlyServiceInstanceName = "TestServiceInstance"_sd; +constexpr auto kTestPrimaryOnlyServiceStateDocumentNss = "config.test_primary_only_service"_sd; + +// Hangs the 'TestDefaultPrimaryOnlyServiceInstance' after inserting the state document. +MONGO_FAIL_POINT_DEFINE(HangTestPrimaryOnlyServiceAfterStateDocInsertion); + +/** + * Represents the state document class for the 'TestDefaultPrimaryOnlyService'. + */ +class TestStateDocument { +public: + void setState(int state) { + _state = state; + } + + const BSONObj toBSON() const { + return BSON("_id" << _state); + } + +private: + int _state = 0; +}; + +// Create a global instance of the 'PrimaryOnlyServiceStateStore' to store the state document. +PrimaryOnlyServiceStateStore gStateDocStore{ + NamespaceString{kTestPrimaryOnlyServiceStateDocumentNss}}; + +/** + * Test class for the 'DefaultPrimaryOnlyServiceInstance'. + */ +class TestDefaultPrimaryOnlyServiceInstance : public DefaultPrimaryOnlyServiceInstance { +public: + const StringData getInstanceName() final { + return kTestPrimaryOnlyServiceInstanceName; + } + + boost::optional reportForCurrentOp( + MongoProcessInterface::CurrentOpConnectionsMode connMode, + MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept final { + return boost::none; + }; + + void checkIfOptionsConflict(const BSONObj& stateDoc) const final{}; + +private: + ExecutorFuture _runImpl(std::shared_ptr executor, + const CancellationToken& token) noexcept final { + + return ExecutorFuture(**executor).then([this, anchor = shared_from_this()] { + auto opCtxHolder = cc().makeOperationContext(); + auto* opCtx = opCtxHolder.get(); + + TestStateDocument testStateDoc; + testStateDoc.setState(1); + gStateDocStore.add(opCtx, testStateDoc); + + if (MONGO_unlikely(HangTestPrimaryOnlyServiceAfterStateDocInsertion.shouldFail())) { + HangTestPrimaryOnlyServiceAfterStateDocInsertion.pauseWhileSet(); + } + }); + }; + + void _removeStateDocument(OperationContext* opCtx) final { + gStateDocStore.remove(opCtx, BSON("_id" << 1)); + }; +}; + +/** + * Test class for the 'PrimaryOnlyService'. + */ +class TestDefaultPrimaryOnlyService final : public repl::PrimaryOnlyService { +public: + explicit TestDefaultPrimaryOnlyService(ServiceContext* serviceContext) + : repl::PrimaryOnlyService(serviceContext) {} + + ~TestDefaultPrimaryOnlyService() = default; + + StringData getServiceName() const final { + return kTestPrimaryOnlyServiceName; + } + + NamespaceString getStateDocumentsNS() const final { + return NamespaceString(kTestPrimaryOnlyServiceStateDocumentNss); + } + + ThreadPool::Limits getThreadPoolLimits() const final { + return ThreadPool::Limits(); + } + + static TestDefaultPrimaryOnlyService* getService(OperationContext* opCtx) { + auto registry = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext()); + auto service = registry->lookupServiceByName(kTestPrimaryOnlyServiceInstanceName); + return checked_cast(std::move(service)); + } + + void checkIfConflictsWithOtherInstances( + OperationContext* opCtx, + BSONObj initialState, + const std::vector& existingInstances) final{}; + + std::shared_ptr constructInstance( + BSONObj initialState) final { + return std::make_shared(); + } +}; + +} // namespace + +class DefaultPrimaryOnlyServiceInstanceTest : public PrimaryOnlyServiceMongoDTest { +public: + std::unique_ptr makeService(ServiceContext* serviceContext) override { + return std::make_unique(serviceContext); + } + + void setUp() override { + PrimaryOnlyServiceMongoDTest::setUp(); + + _service = _registry->lookupServiceByName(kTestPrimaryOnlyServiceName); + ASSERT(_service); + } + + void tearDown() override { + // Ensure that even on test failures all failpoint state gets reset. + globalFailPointRegistry().disableAllFailpoints(); + + WaitForMajorityService::get(getServiceContext()).shutDown(); + + _registry->onShutdown(); + _service = nullptr; + + ServiceContextMongoDTest::tearDown(); + } + + void stepUp() { + auto opCtx = cc().makeOperationContext(); + PrimaryOnlyServiceMongoDTest::stepUp(opCtx.get()); + } +}; + +TEST_F(DefaultPrimaryOnlyServiceInstanceTest, VerifyTaskExecuted) { + // Initialize the fail point to hang the 'TestDefaultPrimaryOnlyServiceInstance' at the + // '_runImpl' method. + auto timesEntered = + HangTestPrimaryOnlyServiceAfterStateDocInsertion.setMode(FailPoint::alwaysOn); + + auto opCtx = makeOperationContext(); + + // Create an instance of the 'TestDefaultPrimaryOnlyServiceInstance'. + auto instance = TestDefaultPrimaryOnlyServiceInstance::getOrCreate( + opCtx.get(), _service, BSON("_id" << 0), true); + ASSERT(instance.get()); + + // Wait until the fail point is hit. + HangTestPrimaryOnlyServiceAfterStateDocInsertion.waitForTimesEntered(timesEntered + 1); + + // Verify that the state document has been inserted. This verifies that the method + // 'TestDefaultPrimaryOnlyServiceInstance::_runImpl' has been executed. + ASSERT_EQ(gStateDocStore.count(opCtx.get(), BSON("_id" << 1)), 1); + HangTestPrimaryOnlyServiceAfterStateDocInsertion.setMode(FailPoint::off); + + // Wait for the instance to complete. + instance->getCompletionFuture().get(); + + // Verify that the state document has now been removed. This verifies that the method + // 'DefaultPrimaryOnlyServiceInstanceTest::_removeStateDocument' has been executed. + ASSERT_EQ(gStateDocStore.count(opCtx.get(), BSON("_id" << 1)), 0); +} diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index df8655044d4..0039845afc7 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -544,18 +544,10 @@ OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationC } }); - // Create the pre-images collection if it doesn't exist yet. - ChangeStreamPreImagesCollectionManager::createPreImagesCollection(opCtx, - boost::none /* tenantId */); - - // TODO: SERVER-66631 move the change collection creation logic from here to the PM-2502 hooks. - // The change collection will be created when the change stream is enabled. - if (ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) { - auto status = ChangeStreamChangeCollectionManager::get(opCtx).createChangeCollection( - opCtx, boost::none); - if (!status.isOK()) { - fassert(6520900, status); - } + // Create the pre-images collection if it doesn't exist yet in the non-serverless environment. + if (!ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) { + ChangeStreamPreImagesCollectionManager::createPreImagesCollection( + opCtx, boost::none /* tenantId */); } serverGlobalParams.validateFeaturesAsPrimary.store(true); diff --git a/src/mongo/db/set_change_stream_state_coordinator.cpp b/src/mongo/db/set_change_stream_state_coordinator.cpp new file mode 100644 index 00000000000..a172b2e06fc --- /dev/null +++ b/src/mongo/db/set_change_stream_state_coordinator.cpp @@ -0,0 +1,232 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/set_change_stream_state_coordinator.h" + +#include "mongo/db/change_stream_change_collection_manager.h" +#include "mongo/db/change_stream_pre_images_collection_manager.h" +#include "mongo/db/change_stream_state_gen.h" +#include "mongo/db/repl/wait_for_majority_service.h" +#include "mongo/logv2/log.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery + +namespace mongo { + +// Hangs the 'SetChangeStreamStateCoordinator' before calling the +// 'ChangeStreamStateCommandProcessor'. +MONGO_FAIL_POINT_DEFINE(hangSetChangeStreamStateCoordinatorBeforeCommandProcessor); +namespace { + +constexpr auto kSetChangeStreamStateCoordinatorServiceName = + "setChangeStreamStateCoordinatorService"_sd; +constexpr auto kSetChangeStreamStateCoordinatorName = "setChangeStreamStateCoordinator"_sd; + +/** + * Waits until the oplog entry has been majority committed. + */ +void waitForMajority(OperationContext* opCtx) { + const auto replCoord = repl::ReplicationCoordinator::get(opCtx); + const auto lastLocalOpTime = replCoord->getMyLastAppliedOpTime(); + WaitForMajorityService::get(opCtx->getServiceContext()) + .waitUntilMajority(lastLocalOpTime, opCtx->getCancellationToken()) + .get(opCtx); +} + +/** + * A helper that inspect the the 'SetChangeStreamStateCoordinatorDocument' state document and + * processes the provided change stream command accordingly. + */ +class ChangeStreamStateCommandProcessor { +public: + explicit ChangeStreamStateCommandProcessor(SetChangeStreamStateCoordinatorDocument stateDoc) + : _stateDoc(stateDoc) {} + + /** + * Retrieves the tenant id and processes the change stream command. + */ + void process(OperationContext* opCtx) { + const auto setChangeStreamParameter = ChangeStreamStateParameters::parse( + IDLParserContext("ChangeStreamStateParameters"), _stateDoc.getCommand()); + + invariant(_stateDoc.getId().getTenantId()); + + // TODO SERVER-65950 replace 'tenantId' with the provided tenant id. + auto tenantId = boost::none; + + if (setChangeStreamParameter.getEnabled()) { + _enableChangeStream(opCtx, tenantId); + } else { + _disableChangeStream(opCtx, tenantId); + } + } + +private: + /** + * Enables the change stream in the serverless by creating the change collection and the + * pre-image collection. + */ + void _enableChangeStream(OperationContext* opCtx, boost::optional tenantId) { + auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); + changeCollectionManager.createChangeCollection(opCtx, tenantId); + + ChangeStreamPreImagesCollectionManager::createPreImagesCollection(opCtx, tenantId); + + // Wait until the create requests are majority committed. + waitForMajority(opCtx); + } + + /** + * Disables the change stream in the serverless by dropping the change collection and the + * pre-image collection. + */ + void _disableChangeStream(OperationContext* opCtx, boost::optional tenantId) { + auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); + changeCollectionManager.dropChangeCollection(opCtx, tenantId); + + ChangeStreamPreImagesCollectionManager::dropPreImagesCollection(opCtx, tenantId); + + // Wait until the drop requests are majority committed. + waitForMajority(opCtx); + } + + const SetChangeStreamStateCoordinatorDocument _stateDoc; +}; + +} // namespace + +const StringData SetChangeStreamStateCoordinator::getInstanceName() { + return kSetChangeStreamStateCoordinatorName; +} + +SetChangeStreamStateCoordinator::SetChangeStreamStateCoordinator(const BSONObj& stateDoc) + : _stateDoc{SetChangeStreamStateCoordinatorDocument::parse( + IDLParserContext("SetChangeStreamStateCoordinatorDocument"), stateDoc)}, + _stateDocStore{NamespaceString::kSetChangeStreamStateCoordinatorNamespace} {} + +boost::optional SetChangeStreamStateCoordinator::reportForCurrentOp( + MongoProcessInterface::CurrentOpConnectionsMode connMode, + MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept { + // Tenant id must always be present. + invariant(_stateDoc.getId().getTenantId()); + + BSONObjBuilder bob; + bob.append("tenantId", _stateDoc.getId().getTenantId()->toString()); + bob.append("command", _stateDoc.getCommand()); + return bob.obj(); +} + +void SetChangeStreamStateCoordinator::checkIfOptionsConflict(const BSONObj& otherDocBSON) const { + const auto otherDoc = SetChangeStreamStateCoordinatorDocument::parse( + IDLParserContext("SetChangeStreamStateCoordinatorDocument"), otherDocBSON); + + // The '_id' field of the state document corresponds to the tenant id and hence if we are here + // then the current and the new request belongs to the same tenant. Reject the new request if it + // is not identical to the previous one. + uassert(ErrorCodes::ConflictingOperationInProgress, + "Another conflicting request is in progress", + SimpleBSONObjComparator::kInstance.evaluate(_stateDoc.toBSON() == otherDoc.toBSON())); +} + +ExecutorFuture SetChangeStreamStateCoordinator::_runImpl( + std::shared_ptr executor, + const CancellationToken& token) noexcept { + + return ExecutorFuture(**executor).then([this, anchor = shared_from_this()] { + auto opCtxHolder = cc().makeOperationContext(); + auto* opCtx = opCtxHolder.get(); + + try { + _stateDocStore.add(opCtx, _stateDoc); + } catch (const ExceptionFor&) { + // A series of retries, step-up and step-down events can cause a node to try and insert + // the document when it has already been persisted locally, but we must still wait for + // majority commit. + waitForMajority(opCtx); + } + + hangSetChangeStreamStateCoordinatorBeforeCommandProcessor.pauseWhileSet( + Interruptible::notInterruptible()); + + // Dispatch the state document to be processed. + ChangeStreamStateCommandProcessor commandProcessor(_stateDoc); + commandProcessor.process(opCtx); + }); +} + +void SetChangeStreamStateCoordinator::_removeStateDocument(OperationContext* opCtx) { + _stateDocStore.remove( + opCtx, + BSON(SetChangeStreamStateCoordinatorDocument::kIdFieldName << _stateDoc.getId().toBSON())); +} + +StringData SetChangeStreamStateCoordinatorService::getServiceName() const { + return kSetChangeStreamStateCoordinatorServiceName; +} + +NamespaceString SetChangeStreamStateCoordinatorService::getStateDocumentsNS() const { + return NamespaceString::kSetChangeStreamStateCoordinatorNamespace; +} + +ThreadPool::Limits SetChangeStreamStateCoordinatorService::getThreadPoolLimits() const { + return ThreadPool::Limits(); +} + +SetChangeStreamStateCoordinatorService* SetChangeStreamStateCoordinatorService::getService( + OperationContext* opCtx) { + auto registry = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext()); + auto service = registry->lookupServiceByName(kSetChangeStreamStateCoordinatorServiceName); + return checked_cast(std::move(service)); +} + +std::shared_ptr +SetChangeStreamStateCoordinatorService::constructInstance(BSONObj stateDoc) { + return std::make_shared(std::move(stateDoc)); +} + +std::shared_ptr +SetChangeStreamStateCoordinatorService::getOrCreateInstance(OperationContext* opCtx, + BSONObj coorDoc) { + return [&] { + try { + auto [coordinator, _] = PrimaryOnlyService::getOrCreateInstance(opCtx, coorDoc, true); + return checked_pointer_cast(std::move(coordinator)); + } catch (const DBException& ex) { + LOGV2_ERROR(6663106, + "Failed to create coordinator instance", + "service"_attr = kSetChangeStreamStateCoordinatorServiceName, + "reason"_attr = ex); + throw; + } + }(); +} + +} // namespace mongo diff --git a/src/mongo/db/set_change_stream_state_coordinator.h b/src/mongo/db/set_change_stream_state_coordinator.h new file mode 100644 index 00000000000..aaa4d4305d6 --- /dev/null +++ b/src/mongo/db/set_change_stream_state_coordinator.h @@ -0,0 +1,101 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/primary_only_service_util.h" +#include "mongo/db/set_change_stream_state_coordinator_gen.h" + +namespace mongo { + +/** + * A 'PrimaryOnlyService::Instance' that runs in the replica sets and orchestrates the change stream + * requests in the serverless. + * + * At any time only one request is accepted, any subsequent request will be rejected by this + * instance type. + */ +class SetChangeStreamStateCoordinator : public DefaultPrimaryOnlyServiceInstance { +public: + explicit SetChangeStreamStateCoordinator(const BSONObj& stateDoc); + + const StringData getInstanceName() final; + + boost::optional reportForCurrentOp( + MongoProcessInterface::CurrentOpConnectionsMode connMode, + MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept final; + + void checkIfOptionsConflict(const BSONObj& stateDoc) const final; + +private: + ExecutorFuture _runImpl(std::shared_ptr executor, + const CancellationToken& token) noexcept; + + void _removeStateDocument(OperationContext* opCtx) final; + + // The state document of the 'PrimaryOnlyService'. + SetChangeStreamStateCoordinatorDocument _stateDoc; + + // Stores the state document durably in the namespace 'config.change_stream_coordinator'. + PrimaryOnlyServiceStateStore _stateDocStore; +}; + +/** + * A 'PrimaryOnlyService' that creates and gets an instance of the + * 'SetChangeStreamStateCoordinator'. + */ +class SetChangeStreamStateCoordinatorService final : public repl::PrimaryOnlyService { +public: + explicit SetChangeStreamStateCoordinatorService(ServiceContext* serviceContext) + : repl::PrimaryOnlyService(serviceContext) {} + + ~SetChangeStreamStateCoordinatorService() = default; + + StringData getServiceName() const final; + + NamespaceString getStateDocumentsNS() const final; + + ThreadPool::Limits getThreadPoolLimits() const final; + + std::shared_ptr getOrCreateInstance(OperationContext* opCtx, + BSONObj coorDoc); + + static SetChangeStreamStateCoordinatorService* getService(OperationContext* opCtx); + + void checkIfConflictsWithOtherInstances( + OperationContext* opCtx, + BSONObj initialState, + const std::vector& existingInstances) final{}; + + std::shared_ptr constructInstance( + BSONObj initialState) final; +}; +} // namespace mongo diff --git a/src/mongo/db/set_change_stream_state_coordinator.idl b/src/mongo/db/set_change_stream_state_coordinator.idl new file mode 100644 index 00000000000..b3861888cdc --- /dev/null +++ b/src/mongo/db/set_change_stream_state_coordinator.idl @@ -0,0 +1,61 @@ +# Copyright (C) 2022-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# . +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. +# + + +global: + cpp_namespace: "mongo" + +imports: + - "mongo/db/logical_session_id.idl" + - "mongo/idl/basic_types.idl" + +structs: + SetChangeStreamStateCoordinatorId: + description: "A specification that uniquely identifies each state document of the + 'SetChangeStreamStateCoordinatorService'." + generate_comparison_operators: false + strict: false + fields: + tenantId: + description: "Tenant id associated with the state document." + type: tenant_id + optional: true + + SetChangeStreamStateCoordinatorDocument: + description: "A specification that defines the schema of the + 'SetChangeStreamStateCoordinatorService' state document." + generate_comparison_operators: false + strict: false + fields: + _id: + description: "Defines the schema of the '_id' field of the state document." + cpp_name: id + type: SetChangeStreamStateCoordinatorId + command: + description: "The command to be executed by the 'SetChangeStreamStateCoordinator'" + type: object_owned \ No newline at end of file diff --git a/src/mongo/platform/mutex.h b/src/mongo/platform/mutex.h index 4a094048ebe..ed9ddab42ad 100644 --- a/src/mongo/platform/mutex.h +++ b/src/mongo/platform/mutex.h @@ -381,7 +381,7 @@ using Mutex = stdx::mutex; // NOLINT * Construct a mongo::Mutex using the result of MONGO_GET_LATCH_DATA with all arguments forwarded */ #ifndef MONGO_CONFIG_USE_RAW_LATCHES -#define MONGO_MAKE_LATCH(...) ::mongo::Mutex(MONGO_GET_LATCH_DATA(__VA_ARGS__)); +#define MONGO_MAKE_LATCH(...) ::mongo::Mutex(MONGO_GET_LATCH_DATA(__VA_ARGS__)) #else -#define MONGO_MAKE_LATCH(...) ::mongo::stdx::mutex(); // NOLINT +#define MONGO_MAKE_LATCH(...) ::mongo::stdx::mutex() // NOLINT #endif -- cgit v1.2.1