diff options
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/SConscript | 14 | ||||
-rw-r--r-- | src/mongo/db/auth/action_type.idl | 2 | ||||
-rw-r--r-- | src/mongo/db/auth/builtin_roles.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/change_stream_options.idl | 58 | ||||
-rw-r--r-- | src/mongo/db/change_stream_options_manager.cpp | 98 | ||||
-rw-r--r-- | src/mongo/db/change_stream_options_manager.h | 15 | ||||
-rw-r--r-- | src/mongo/db/change_stream_options_parameter.idl | 42 | ||||
-rw-r--r-- | src/mongo/db/commands/SConscript | 14 | ||||
-rw-r--r-- | src/mongo/db/commands/change_stream_options.idl | 102 | ||||
-rw-r--r-- | src/mongo/db/commands/change_stream_options_command.cpp | 264 | ||||
-rw-r--r-- | src/mongo/db/pipeline/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h | 2 | ||||
-rw-r--r-- | src/mongo/db/query/query_feature_flags.idl | 5 |
14 files changed, 228 insertions, 408 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 043d2a0d512..c81942bc35f 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -490,13 +490,23 @@ env.Library( ) env.Library( + target='change_stream_options', + source=[ + 'change_stream_options.idl', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/idl/cluster_server_parameter', + ], +) + +env.Library( target='change_stream_options_manager', source=[ 'change_stream_options_manager.cpp', + 'change_stream_options_parameter.idl', ], LIBDEPS_PRIVATE=[ - '$BUILD_DIR/mongo/db/commands/change_stream_options', - '$BUILD_DIR/mongo/idl/feature_flag', + 'change_stream_options', ], ) diff --git a/src/mongo/db/auth/action_type.idl b/src/mongo/db/auth/action_type.idl index 6cfbd32d0a9..1d83378f275 100644 --- a/src/mongo/db/auth/action_type.idl +++ b/src/mongo/db/auth/action_type.idl @@ -96,7 +96,6 @@ enums: flushRouterConfig : "flushRouterConfig" forceUUID : "forceUUID" fsync : "fsync" - getChangeStreamOptions: "getChangeStreamOptions" getClusterParameter: "getClusterParameter" getDatabaseVersion : "getDatabaseVersion" getDefaultRWConcern : "getDefaultRWConcern" @@ -161,7 +160,6 @@ enums: runTenantMigration : "runTenantMigration" serverStatus : "serverStatus" setAuthenticationRestriction : "setAuthenticationRestriction" - setChangeStreamOptions: "setChangeStreamOptions" setClusterParameter: "setClusterParameter" setDefaultRWConcern : "setDefaultRWConcern" setFeatureCompatibilityVersion : "setFeatureCompatibilityVersion" diff --git a/src/mongo/db/auth/builtin_roles.cpp b/src/mongo/db/auth/builtin_roles.cpp index 27bb1ee6b54..2b0c63cb798 100644 --- a/src/mongo/db/auth/builtin_roles.cpp +++ b/src/mongo/db/auth/builtin_roles.cpp @@ -254,8 +254,6 @@ MONGO_INITIALIZER(AuthorizationBuiltinRoles)(InitializerContext* context) { << ActionType::setDefaultRWConcern << ActionType::setFeatureCompatibilityVersion << ActionType::setFreeMonitoring - << ActionType::setChangeStreamOptions - << ActionType::getChangeStreamOptions << ActionType::setClusterParameter << ActionType::getClusterParameter; diff --git a/src/mongo/db/change_stream_options.idl b/src/mongo/db/change_stream_options.idl new file mode 100644 index 00000000000..83e9d160868 --- /dev/null +++ b/src/mongo/db/change_stream_options.idl @@ -0,0 +1,58 @@ +# Copyright (C) 2022-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. +# + +# Data structures for change stream configuration options. + +global: + cpp_namespace: "mongo" + +imports: + - "mongo/idl/basic_types.idl" + - "mongo/idl/cluster_server_parameter.idl" + +structs: + PreAndPostImagesOptions: + description: "Change streams pre- and post-images options." + fields: + expireAfterSeconds: + description: "The number of seconds after which a pre-image is eligible for + deletion. A string value 'off' enables the default expiration policy." + unstable: false + type: + variant: [string, safeInt64] + default: "\"off\"" + ChangeStreamOptions: + description: "A specification for the change streams options." + inline_chained_structs: true + chained_structs: + ClusterServerParameter: clusterServerParameter + fields: + preAndPostImages: + type: PreAndPostImagesOptions + unstable: false + default: true diff --git a/src/mongo/db/change_stream_options_manager.cpp b/src/mongo/db/change_stream_options_manager.cpp index b2f474777a8..4c27c32ec6f 100644 --- a/src/mongo/db/change_stream_options_manager.cpp +++ b/src/mongo/db/change_stream_options_manager.cpp @@ -32,6 +32,7 @@ #include "mongo/platform/basic.h" #include "mongo/db/change_stream_options_manager.h" +#include "mongo/db/change_stream_options_parameter_gen.h" #include "mongo/logv2/log.h" namespace mongo { @@ -53,8 +54,7 @@ void ChangeStreamOptionsManager::create(ServiceContext* service) { getChangeStreamOptionsManager(service).emplace(service); } -boost::optional<ChangeStreamOptions> ChangeStreamOptionsManager::getOptions( - OperationContext* opCtx) { +const ChangeStreamOptions& ChangeStreamOptionsManager::getOptions(OperationContext* opCtx) { stdx::lock_guard<Latch> L(_mutex); return _changeStreamOptions; } @@ -63,7 +63,99 @@ StatusWith<ChangeStreamOptions> ChangeStreamOptionsManager::setOptions( OperationContext* opCtx, ChangeStreamOptions optionsToSet) { stdx::lock_guard<Latch> L(_mutex); _changeStreamOptions = std::move(optionsToSet); - return *_changeStreamOptions; + return _changeStreamOptions; +} + +void ChangeStreamOptionsParameter::append(OperationContext* opCtx, + BSONObjBuilder& bob, + const std::string& name) { + ChangeStreamOptionsManager& changeStreamOptionsManager = + ChangeStreamOptionsManager::get(getGlobalServiceContext()); + bob.append("_id"_sd, name); + bob.appendElementsUnique(changeStreamOptionsManager.getOptions(opCtx).toBSON()); +} + +Status ChangeStreamOptionsParameter::set(const BSONElement& newValueElement) { + try { + Status validateStatus = validate(newValueElement); + if (!validateStatus.isOK()) { + return validateStatus; + } + + ChangeStreamOptionsManager& changeStreamOptionsManager = + ChangeStreamOptionsManager::get(getGlobalServiceContext()); + ChangeStreamOptions newOptions = ChangeStreamOptions::parse( + IDLParserErrorContext("changeStreamOptions"), newValueElement.Obj()); + + return changeStreamOptionsManager + .setOptions(Client::getCurrent()->getOperationContext(), newOptions) + .getStatus(); + } catch (const AssertionException&) { + return {ErrorCodes::BadValue, "Could not parse changeStreamOptions parameter"}; + } +} + +Status ChangeStreamOptionsParameter::validate(const BSONElement& newValueElement) const { + try { + BSONObj changeStreamOptionsObj = newValueElement.Obj(); + Status validateStatus = Status::OK(); + + // PreAndPostImages currently contains a single field, `expireAfterSeconds`, that is + // default- initialized to 'off'. This is useful for parameter initialization at startup but + // causes the IDL parser to not enforce the presence of `expireAfterSeconds` in BSON + // representations. We assert that and the existence of PreAndPostImages here. + IDLParserErrorContext ctxt = IDLParserErrorContext("changeStreamOptions"_sd); + if (auto preAndPostImagesObj = changeStreamOptionsObj["preAndPostImages"_sd]; + !preAndPostImagesObj.eoo()) { + if (preAndPostImagesObj["expireAfterSeconds"_sd].eoo()) { + ctxt.throwMissingField("expireAfterSeconds"_sd); + } + } else { + ctxt.throwMissingField("preAndPostImages"_sd); + } + + ChangeStreamOptions newOptions = ChangeStreamOptions::parse(ctxt, changeStreamOptionsObj); + auto preAndPostImages = newOptions.getPreAndPostImages(); + stdx::visit( + visit_helper::Overloaded{ + [&](const std::string& expireAfterSeconds) { + if (expireAfterSeconds != "off"_sd) { + validateStatus = { + ErrorCodes::BadValue, + "Non-numeric value of 'expireAfterSeconds' should be 'off'"}; + } + }, + [&](const std::int64_t& expireAfterSeconds) { + if (expireAfterSeconds <= 0) { + validateStatus = { + ErrorCodes::BadValue, + "Numeric value of 'expireAfterSeconds' should be positive"}; + } + }, + }, + preAndPostImages.getExpireAfterSeconds()); + + return validateStatus; + } catch (const AssertionException& ex) { + return {ErrorCodes::BadValue, + str::stream() << "Failed parsing new changeStreamOptions value" << ex.reason()}; + } +} + +Status ChangeStreamOptionsParameter::reset() { + // Replace the current changeStreamOptions with a default-constructed one, which should + // automatically set preAndPostImages.expirationSeconds to 'off' by default. + ChangeStreamOptionsManager& changeStreamOptionsManager = + ChangeStreamOptionsManager::get(getGlobalServiceContext()); + return changeStreamOptionsManager + .setOptions(Client::getCurrent()->getOperationContext(), ChangeStreamOptions()) + .getStatus(); +} + +const LogicalTime ChangeStreamOptionsParameter::getClusterParameterTime() const { + ChangeStreamOptionsManager& changeStreamOptionsManager = + ChangeStreamOptionsManager::get(getGlobalServiceContext()); + return changeStreamOptionsManager.getClusterParameterTime(); } } // namespace mongo diff --git a/src/mongo/db/change_stream_options_manager.h b/src/mongo/db/change_stream_options_manager.h index f87258b11c8..ea1fe2a3b47 100644 --- a/src/mongo/db/change_stream_options_manager.h +++ b/src/mongo/db/change_stream_options_manager.h @@ -29,7 +29,7 @@ #pragma once -#include "mongo/db/commands/change_stream_options_gen.h" +#include "mongo/db/change_stream_options_gen.h" #include "mongo/db/operation_context.h" #include "mongo/db/service_context.h" #include "mongo/platform/mutex.h" @@ -63,9 +63,9 @@ public: static ChangeStreamOptionsManager& get(OperationContext* opCtx); /** - * Returns the change-streams options if present, boost::none otherwise. + * Returns the change-streams options. */ - boost::optional<ChangeStreamOptions> getOptions(OperationContext* opCtx); + const ChangeStreamOptions& getOptions(OperationContext* opCtx); /** * Sets the provided change-streams options. Returns OK on success, otherwise appropriate error @@ -74,11 +74,18 @@ public: StatusWith<ChangeStreamOptions> setOptions(OperationContext* opCtx, ChangeStreamOptions optionsToSet); + /** + * Returns the clusterParameterTime of the current change stream options. + */ + const LogicalTime& getClusterParameterTime() { + return _changeStreamOptions.getClusterParameterTime(); + } + private: ChangeStreamOptionsManager(const ChangeStreamOptionsManager&) = delete; ChangeStreamOptionsManager& operator=(const ChangeStreamOptionsManager&) = delete; - boost::optional<ChangeStreamOptions> _changeStreamOptions; + ChangeStreamOptions _changeStreamOptions; Mutex _mutex = MONGO_MAKE_LATCH("ChangeStreamOptionsManager::mutex"); }; diff --git a/src/mongo/db/change_stream_options_parameter.idl b/src/mongo/db/change_stream_options_parameter.idl new file mode 100644 index 00000000000..122e2793e3b --- /dev/null +++ b/src/mongo/db/change_stream_options_parameter.idl @@ -0,0 +1,42 @@ +# Copyright (C) 2022-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. +# + +# Cluster server parameter for change streams configuration options. + +global: + cpp_namespace: "mongo" + +server_parameters: + changeStreamOptions: + description: "Cluster server parameter for change stream options" + set_at: cluster + cpp_class: + name: ChangeStreamOptionsParameter + override_set: true + override_validate: true +
\ No newline at end of file diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index a6138057096..6a08a6b7e9e 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -203,18 +203,6 @@ env.Library( ) env.Library( - target='change_stream_options', - source=[ - 'change_stream_options.idl', - ], - LIBDEPS=[ - '$BUILD_DIR/mongo/db/auth/auth', - '$BUILD_DIR/mongo/db/auth/authprivilege', - '$BUILD_DIR/mongo/db/read_write_concern_defaults', - ] -) - -env.Library( target="authentication_commands", source=[ 'authentication_commands.cpp', @@ -515,7 +503,6 @@ env.Library( target="mongod", source=[ "apply_ops_cmd.cpp", - "change_stream_options_command.cpp", "collection_to_capped.cpp", "compact.cpp", "cpuload.cpp", @@ -568,7 +555,6 @@ env.Library( '$BUILD_DIR/mongo/db/catalog/index_key_validate', '$BUILD_DIR/mongo/db/change_stream_options_manager', '$BUILD_DIR/mongo/db/commands', - '$BUILD_DIR/mongo/db/commands/change_stream_options', '$BUILD_DIR/mongo/db/curop_failpoint_helpers', '$BUILD_DIR/mongo/db/dbhelpers', '$BUILD_DIR/mongo/db/exec/sbe_cmd', diff --git a/src/mongo/db/commands/change_stream_options.idl b/src/mongo/db/commands/change_stream_options.idl deleted file mode 100644 index 56146ad9049..00000000000 --- a/src/mongo/db/commands/change_stream_options.idl +++ /dev/null @@ -1,102 +0,0 @@ -# Copyright (C) 2022-present MongoDB, Inc. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the Server Side Public License, version 1, -# as published by MongoDB, Inc. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# Server Side Public License for more details. -# -# You should have received a copy of the Server Side Public License -# along with this program. If not, see -# <http://www.mongodb.com/licensing/server-side-public-license>. -# -# As a special exception, the copyright holders give permission to link the -# code of portions of this program with the OpenSSL library under certain -# conditions as described in each individual source file and distribute -# linked combinations including the program with the OpenSSL library. You -# must comply with the Server Side Public License in all respects for -# all of the code used other than as permitted herein. If you modify file(s) -# with this exception, you may extend this exception to your version of the -# file(s), but you are not obligated to do so. If you do not wish to do so, -# delete this exception statement from your version. If you delete this -# exception statement from all source files in the program, then also delete -# it in the license file. -# - -# Commands to set and get change streams configuration options and associated data structures. - -global: - cpp_namespace: "mongo" - -imports: - - "mongo/db/auth/access_checks.idl" - - "mongo/db/auth/action_type.idl" - - "mongo/db/write_concern_options.idl" - - "mongo/idl/basic_types.idl" - -structs: - PreAndPostImagesOptions: - description: "Change streams pre- and post-images options." - fields: - expireAfterSeconds: - description: "The number of seconds after which a pre-image is eligible for - deletion. A string value 'off' enables the default expiration policy." - optional: true - unstable: false - type: - variant: [string, safeInt64] - ChangeStreamOptions: - description: "A specification for the change streams options." - fields: - preAndPostImages: - type: PreAndPostImagesOptions - optional: true - unstable: false - GetChangeStreamOptionsResponse: - description: "A response for the get change streams options command." - chained_structs: - ChangeStreamOptions: ChangeStreamOptions - -commands: - setChangeStreamOptions: - description: "A command to set the change streams options." - command_name: setChangeStreamOptions - namespace: ignored - cpp_name: setChangeStreamOptions - strict: true - api_version: "1" - access_check: - complex: - - check: is_authenticated - - privilege: - resource_pattern: cluster - action_type: setChangeStreamOptions - fields: - preAndPostImages: - description: "The pre- and post-images options to set." - type: PreAndPostImagesOptions - optional: true - unstable: false - writeConcern: - description: "The level of write concern for the command." - type: WriteConcern - optional: true - unstable: false - reply_type: OkReply - getChangeStreamOptions: - description: "A command to get the change streams options." - command_name: getChangeStreamOptions - namespace: ignored - cpp_name: getChangeStreamOptions - strict: true - api_version: "1" - access_check: - complex: - - check: is_authenticated - - privilege: - resource_pattern: cluster - action_type: getChangeStreamOptions - reply_type: GetChangeStreamOptionsResponse
\ No newline at end of file diff --git a/src/mongo/db/commands/change_stream_options_command.cpp b/src/mongo/db/commands/change_stream_options_command.cpp deleted file mode 100644 index 7fad14c0ac7..00000000000 --- a/src/mongo/db/commands/change_stream_options_command.cpp +++ /dev/null @@ -1,264 +0,0 @@ -/** - * Copyright (C) 2022-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand - -#include "mongo/platform/basic.h" - -#include "mongo/db/auth/authorization_checks.h" -#include "mongo/db/auth/authorization_session.h" -#include "mongo/db/change_stream_options_manager.h" -#include "mongo/db/commands.h" -#include "mongo/db/commands/change_stream_options_gen.h" -#include "mongo/db/repl/replication_coordinator.h" -#include "mongo/logv2/log.h" - -namespace mongo { -namespace { -/** - * A versioned command class to set the change streams options. This command should not run - * on 'mongoS'. - */ -class SetChangeStreamOptionsCommand - : public SetChangeStreamOptionsCmdVersion1Gen<SetChangeStreamOptionsCommand> { -public: - AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { - return AllowedOnSecondary::kNever; - } - - bool adminOnly() const final { - return true; - } - - std::string help() const override { - return "Sets change streams configuration options.\n" - "Usage: { setChangeStreamOptions: 1, preAndPostImages: { expireAfterSeconds: " - "<long>|'off'> }, writeConcern: { <write concern> }}"; - } - - class Invocation final : public InvocationBaseGen { - public: - Invocation(OperationContext* opCtx, - const Command* command, - const OpMsgRequest& opMsgRequest) - : InvocationBaseGen(opCtx, command, opMsgRequest) {} - - bool supportsWriteConcern() const final { - return true; - } - - NamespaceString ns() const final { - return NamespaceString(); - } - - Reply typedRun(OperationContext* opCtx) final { - assertCanIssueCommand(opCtx); - return setOptionsAndReply(opCtx); - } - - private: - /** - * A helper to verify if the command can be run. Throws 'uassert' in case of failures. - */ - void assertCanIssueCommand(OperationContext* opCtx) { - const auto replCoord = repl::ReplicationCoordinator::get(opCtx); - - uassert(5869200, - str::stream() << "'" << SetChangeStreamOptions::kCommandName - << "' is not supported on standalone nodes.", - replCoord->isReplEnabled()); - - uassert(5869201, - str::stream() << "'" << SetChangeStreamOptions::kCommandName - << "' is not supported on shard nodes.", - serverGlobalParams.clusterRole != ClusterRole::ShardServer); - - uassert(5869202, - "Expected at least one change stream option to set", - request().getPreAndPostImages()); - } - - /** - * Sets the change streams options using 'changeStreamOptionsManager'. - */ - Reply setOptionsAndReply(OperationContext* opCtx) { - // Create a request object for the 'changeStreamOptionsManager'. - auto optionsToSet = ChangeStreamOptions(); - - if (auto& preAndPostImage = request().getPreAndPostImages()) { - uassert(5869203, - "Expected 'expireAfterSeconds' option", - preAndPostImage->getExpireAfterSeconds()); - - stdx::visit( - visit_helper::Overloaded{ - [&](const std::string& expirationPolicyMode) { - uassert(5869204, - "Non-numeric value of 'expireAfterSeconds' should be 'off'", - expirationPolicyMode == "off"); - }, - [&](const std::int64_t& expiryTime) { - uassert(5869205, - "Numeric value of 'expireAfterSeconds' should be positive", - expiryTime > 0); - }}, - *preAndPostImage->getExpireAfterSeconds()); - - optionsToSet.setPreAndPostImages(preAndPostImage); - } - - // Store the change streams configuration options. - auto& changeStreamOptionManager = ChangeStreamOptionsManager::get(opCtx); - auto status = changeStreamOptionManager.setOptions(opCtx, optionsToSet); - uassert(5869206, - str::stream() << "Failed to set change stream options, status: " - << status.getStatus().codeString(), - status.isOK()); - - return Reply(); - } - - void doCheckAuthorization(OperationContext* opCtx) const override { - uassert(ErrorCodes::Unauthorized, - "Feature flag featureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy " - "must be enabled", - feature_flags::gFeatureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy - .isEnabled(serverGlobalParams.featureCompatibility)); - - uassert(ErrorCodes::Unauthorized, - "Unauthorized", - AuthorizationSession::get(opCtx->getClient()) - ->isAuthorizedForPrivilege(Privilege{ResourcePattern::forClusterResource(), - ActionType::setChangeStreamOptions})); - } - }; -} setChangeStreamOptionsCommand; - -/** - * A versioned command class to get the change streams options. This command should not run on - * 'mongoS'. - */ -class GetChangeStreamOptionsCommand - : public GetChangeStreamOptionsCmdVersion1Gen<GetChangeStreamOptionsCommand> { -public: - AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { - return AllowedOnSecondary::kNever; - } - - bool adminOnly() const override { - return true; - } - - std::string help() const final { - return "Gets change streams configuration options.\n" - "Usage: { getChangeStreamOptions: 1 }"; - } - - class Invocation final : public InvocationBaseGen { - public: - Invocation(OperationContext* opCtx, - const Command* command, - const OpMsgRequest& opMsgRequest) - : InvocationBaseGen(opCtx, command, opMsgRequest) {} - - bool supportsWriteConcern() const final { - return false; - } - - NamespaceString ns() const final { - return NamespaceString(); - } - - Reply typedRun(OperationContext* opCtx) final { - assertCanIssueCommand(opCtx); - return getOptionsAndReply(opCtx); - } - - private: - /** - * A helper to verify if the command can be run. Throws 'uassert' in case of failures. - */ - void assertCanIssueCommand(OperationContext* opCtx) { - const auto replCoord = repl::ReplicationCoordinator::get(opCtx); - - uassert(5869207, - str::stream() << "'" << GetChangeStreamOptions::kCommandName - << "' is not supported on standalone nodes.", - replCoord->isReplEnabled()); - - uassert(5869208, - str::stream() << "'" << GetChangeStreamOptions::kCommandName - << "' is not supported on shard nodes.", - serverGlobalParams.clusterRole != ClusterRole::ShardServer); - } - - /** - * Gets the change streams options from the 'ChangeStreamOptionsManager', creates a response - * from it, and return it back to the client. - */ - Reply getOptionsAndReply(OperationContext* opCtx) { - auto reply = Reply(); - - // Get the change streams options, if present and return it back to the client. - auto& changeStreamOptionManager = ChangeStreamOptionsManager::get(opCtx); - - if (auto changeStreamOptions = changeStreamOptionManager.getOptions(opCtx)) { - if (auto preAndPostImages = changeStreamOptions->getPreAndPostImages()) { - // Add 'expiredAfterSeconds' to the reply message only when the default - // expiration policy is not enabled. A string type of 'expireAfterSeconds' - // signifies that the value it holds is 'off'. - if (preAndPostImages->getExpireAfterSeconds() && - !stdx::holds_alternative<std::string>( - *preAndPostImages->getExpireAfterSeconds())) { - reply.setChangeStreamOptions(std::move(*changeStreamOptions)); - } - } - } - - return reply; - } - - void doCheckAuthorization(OperationContext* opCtx) const override { - uassert(ErrorCodes::Unauthorized, - "Feature flag featureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy " - "must be enabled", - feature_flags::gFeatureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy - .isEnabled(serverGlobalParams.featureCompatibility)); - - uassert(ErrorCodes::Unauthorized, - "Unauthorized", - AuthorizationSession::get(opCtx->getClient()) - ->isAuthorizedForPrivilege(Privilege{ResourcePattern::forClusterResource(), - ActionType::getChangeStreamOptions})); - } - }; -} getChangeStreamOptionsCommand; - -} // namespace -} // namespace mongo diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index bb37a5b15ba..6b54a25e0b8 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -645,8 +645,8 @@ env.CppUnitTest( LIBDEPS=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/auth/authmocks', + '$BUILD_DIR/mongo/db/change_stream_options', '$BUILD_DIR/mongo/db/change_stream_options_manager', - '$BUILD_DIR/mongo/db/commands/change_stream_options', '$BUILD_DIR/mongo/db/cst/cst', '$BUILD_DIR/mongo/db/exec/document_value/document_value', '$BUILD_DIR/mongo/db/exec/document_value/document_value_test_util', diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp index f015f39680c..02a6b932ffd 100644 --- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp +++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp @@ -70,13 +70,14 @@ bool PreImageAttributes::isExpiredPreImage(const boost::optional<Date_t>& preIma return preImageOplogEntryIsDeleted || operationTime <= expirationTime; } -// Get the 'expireAfterSeconds' from the 'ChangeStreamOptions' if present, boost::none otherwise. +// Get the 'expireAfterSeconds' from the 'ChangeStreamOptions' if not 'off', boost::none otherwise. boost::optional<std::int64_t> getExpireAfterSecondsFromChangeStreamOptions( ChangeStreamOptions& changeStreamOptions) { - if (auto preAndPostImages = changeStreamOptions.getPreAndPostImages(); preAndPostImages && - preAndPostImages->getExpireAfterSeconds() && - !stdx::holds_alternative<std::string>(*preAndPostImages->getExpireAfterSeconds())) { - return stdx::get<std::int64_t>(*preAndPostImages->getExpireAfterSeconds()); + const stdx::variant<std::string, std::int64_t>& expireAfterSeconds = + changeStreamOptions.getPreAndPostImages().getExpireAfterSeconds(); + + if (!stdx::holds_alternative<std::string>(expireAfterSeconds)) { + return stdx::get<std::int64_t>(expireAfterSeconds); } return boost::none; @@ -88,9 +89,8 @@ boost::optional<Date_t> getPreImageExpirationTime(OperationContext* opCtx, Date_ boost::optional<std::int64_t> expireAfterSeconds = boost::none; // Get the expiration time directly from the change stream manager. - if (auto changeStreamOptions = ChangeStreamOptionsManager::get(opCtx).getOptions(opCtx)) { - expireAfterSeconds = getExpireAfterSecondsFromChangeStreamOptions(*changeStreamOptions); - } + auto changeStreamOptions = ChangeStreamOptionsManager::get(opCtx).getOptions(opCtx); + expireAfterSeconds = getExpireAfterSecondsFromChangeStreamOptions(changeStreamOptions); // A pre-image is eligible for deletion if: // pre-image's op-time + expireAfterSeconds < currentTime. diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h index 89849552b0a..0ddd491991f 100644 --- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h +++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h @@ -29,7 +29,7 @@ #pragma once -#include "mongo/db/commands/change_stream_options_gen.h" +#include "mongo/db/change_stream_options_gen.h" #include "mongo/db/service_context.h" namespace mongo { diff --git a/src/mongo/db/query/query_feature_flags.idl b/src/mongo/db/query/query_feature_flags.idl index a0d7ce2c14c..4dcbb2382a8 100644 --- a/src/mongo/db/query/query_feature_flags.idl +++ b/src/mongo/db/query/query_feature_flags.idl @@ -124,11 +124,6 @@ feature_flags: default: true version: 6.0 - featureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy: - description: "Feature flag to enable time based retention policy of point-in-time pre- and post-images of documents in change streams" - cpp_varname: gFeatureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy - default: false - featureFlagSBELookupPushdown: description: "Feature flag for allowing SBE $lookup pushdown" cpp_varname: gFeatureFlagSBELookupPushdown |