diff options
38 files changed, 585 insertions, 841 deletions
diff --git a/buildscripts/idl/idl/binder.py b/buildscripts/idl/idl/binder.py index 1b11d1af4f7..1511b6c1b07 100644 --- a/buildscripts/idl/idl/binder.py +++ b/buildscripts/idl/idl/binder.py @@ -438,7 +438,7 @@ def _bind_struct_field(ctxt, ast_field, idl_type): ast_field.type = _bind_struct_type(struct) ast_field.type.is_array = isinstance(idl_type, syntax.ArrayType) - _validate_field_of_type_struct(ctxt, ast_field) + _validate_default_of_type_struct(ctxt, ast_field) def _bind_variant_field(ctxt, ast_field, idl_type): @@ -747,11 +747,11 @@ def _validate_ignored_field(ctxt, field): ctxt.add_ignored_field_must_be_empty_error(field, field.name, "default") -def _validate_field_of_type_struct(ctxt, field): +def _validate_default_of_type_struct(ctxt, field): # type: (errors.ParserContext, Union[syntax.Field, ast.Field]) -> None - """Validate that for fields with a type of struct, no other properties are set.""" - if field.default is not None: - ctxt.add_struct_field_must_be_empty_error(field, field.name, "default") + """Validate that for fields with a type of struct, the only default permitted is true, which causes it to be default-constructed.""" + if (field.default is not None) and (field.default != "true"): + ctxt.add_struct_default_must_be_true_or_empty_error(field, field.name) def _validate_variant_type(ctxt, syntax_symbol, field): diff --git a/buildscripts/idl/idl/errors.py b/buildscripts/idl/idl/errors.py index f1c387df2f7..4efd18797f9 100644 --- a/buildscripts/idl/idl/errors.py +++ b/buildscripts/idl/idl/errors.py @@ -63,7 +63,7 @@ ERROR_ID_BAD_BSON_BINDATA_SUBTYPE_TYPE = "ID0015" ERROR_ID_BAD_BSON_BINDATA_SUBTYPE_VALUE = "ID0016" ERROR_ID_NO_STRINGDATA = "ID0017" ERROR_ID_FIELD_MUST_BE_EMPTY_FOR_IGNORED = "ID0018" -ERROR_ID_FIELD_MUST_BE_EMPTY_FOR_STRUCT = "ID0019" +ERROR_ID_DEFAULT_MUST_BE_TRUE_OR_EMPTY_FOR_STRUCT = "ID0019" ERROR_ID_CUSTOM_SCALAR_SERIALIZATION_NOT_SUPPORTED = "ID0020" ERROR_ID_BAD_ANY_TYPE_USE = "ID0021" ERROR_ID_BAD_NUMERIC_CPP_TYPE = "ID0022" @@ -462,14 +462,13 @@ class ParserContext(object): ("Field '%s' cannot contain a value for property '%s' when a field is marked as ignored" ) % (name, field_name)) - def add_struct_field_must_be_empty_error(self, location, name, field_name): - # type: (common.SourceLocation, str, str) -> None - """Add an error about field must be empty for fields of type struct.""" + def add_struct_default_must_be_true_or_empty_error(self, location, name): + # type: (common.SourceLocation, str) -> None + """Add an error about default must be True or empty for fields of type struct.""" # pylint: disable=invalid-name - self._add_error( - location, ERROR_ID_FIELD_MUST_BE_EMPTY_FOR_STRUCT, - ("Field '%s' cannot contain a value for property '%s' when a field's type is a struct") - % (name, field_name)) + self._add_error(location, ERROR_ID_DEFAULT_MUST_BE_TRUE_OR_EMPTY_FOR_STRUCT, ( + "Field '%s' can only contain value 'true' for property 'default' when a field's type is a struct" + ) % (name)) def add_not_custom_scalar_serialization_not_supported_error(self, location, ast_type, ast_parent, bson_type_name): diff --git a/buildscripts/idl/idl/generator.py b/buildscripts/idl/idl/generator.py index 6c3bc4948cc..d8b38d51a0c 100644 --- a/buildscripts/idl/idl/generator.py +++ b/buildscripts/idl/idl/generator.py @@ -617,10 +617,14 @@ class _CppHeaderFileWriter(_CppFileWriterBase): member_type = cpp_type_info.get_storage_type() member_name = _get_field_member_name(field) + # Struct fields are allowed to specify default: true so that the member gets default- + # constructed. if field.default and not field.constructed: if field.type.is_enum: self._writer.write_line('%s %s{%s::%s};' % (member_type, member_name, field.type.cpp_type, field.default)) + elif field.type.is_struct: + self._writer.write_line('%s %s;' % (member_type, member_name)) else: self._writer.write_line('%s %s{%s};' % (member_type, member_name, field.default)) else: diff --git a/buildscripts/idl/idl_check_compatibility.py b/buildscripts/idl/idl_check_compatibility.py index d34dbc83409..9aab17079f3 100644 --- a/buildscripts/idl/idl_check_compatibility.py +++ b/buildscripts/idl/idl_check_compatibility.py @@ -191,6 +191,15 @@ SKIPPED_FILES = [ "nsICollation.idl", "nsIStringBundle.idl", "nsIScriptableUConv.idl", "nsITextToSubURI.idl" ] +# Do not add commands that were visible to users in previously released versions. +IGNORE_COMMANDS_LIST: List[str] = [ + # The following commands were released behind a feature flag in 5.3 but were shelved in + # favor of getClusterParameter and setClusterParameter. Since the feature flag was not enabled + # in 5.3, they were effectively unusable and so can be safely removed from the strict API. + 'getChangeStreamOptions', + 'setChangeStreamOptions', +] + class FieldCompatibility: """Information about a Field to check compatibility.""" @@ -1193,6 +1202,12 @@ def check_compatibility(old_idl_dir: str, new_idl_dir: str, old_import_directori if old_cmd.api_version == "" or old_cmd.imported: continue + # Ignore select commands that were removed after being added to the strict API. + # Only commands that were never visible to the end-user in previous releases + # (i.e., hidden behind a feature flag) should be allowed here. + if old_cmd.command_name in IGNORE_COMMANDS_LIST: + continue + if old_cmd.api_version != "1": # We're not ready to handle future API versions yet. ctxt.add_command_invalid_api_version_error( diff --git a/buildscripts/idl/tests/test_binder.py b/buildscripts/idl/tests/test_binder.py index 2ce3dd45559..b52c755e34b 100644 --- a/buildscripts/idl/tests/test_binder.py +++ b/buildscripts/idl/tests/test_binder.py @@ -942,6 +942,23 @@ class TestBinder(testcase.IDLTestcase): always_serialize: true """)) + # Test field of a struct type with default=true + self.assert_bind(test_preamble + textwrap.dedent(""" + structs: + foo: + description: foo + fields: + field1: string + + bar: + description: foo + fields: + field2: + type: foo + default: true + + """)) + def test_field_negative(self): # type: () -> None """Negative field tests.""" @@ -964,7 +981,7 @@ class TestBinder(testcase.IDLTestcase): bindata_subtype: uuid """) - # Test field of a struct type with a default + # Test field of a struct type with a non-true default self.assert_bind_fail( test_preamble + textwrap.dedent(""" structs: @@ -980,7 +997,7 @@ class TestBinder(testcase.IDLTestcase): type: foo default: foo - """), idl.errors.ERROR_ID_FIELD_MUST_BE_EMPTY_FOR_STRUCT) + """), idl.errors.ERROR_ID_DEFAULT_MUST_BE_TRUE_OR_EMPTY_FOR_STRUCT) # Test array as field name self.assert_bind_fail( diff --git a/jstests/core/views/views_all_commands.js b/jstests/core/views/views_all_commands.js index 8b5882ecd30..4bded0416d6 100644 --- a/jstests/core/views/views_all_commands.js +++ b/jstests/core/views/views_all_commands.js @@ -364,7 +364,7 @@ let viewsCommandTests = { fsyncUnlock: {skip: isUnrelated}, getAuditConfig: {skip: isUnrelated}, getDatabaseVersion: {skip: isUnrelated}, - getChangeStreamOptions: {skip: isUnrelated}, + getChangeStreamOptions: {skip: isUnrelated}, // TODO SERVER-65353 remove in 6.1. getClusterParameter: {skip: isUnrelated}, getCmdLineOpts: {skip: isUnrelated}, getDefaultRWConcern: {skip: isUnrelated}, @@ -605,7 +605,7 @@ let viewsCommandTests = { saslStart: {skip: isUnrelated}, sbe: {skip: isAnInternalCommand}, serverStatus: {command: {serverStatus: 1}, skip: isUnrelated}, - setChangeStreamOptions: {skip: isUnrelated}, + setChangeStreamOptions: {skip: isUnrelated}, // TODO SERVER-65353 remove in 6.1. setIndexCommitQuorum: {skip: isUnrelated}, setAuditConfig: {skip: isUnrelated}, setCommittedSnapshot: {skip: isAnInternalCommand}, diff --git a/jstests/libs/cluster_server_parameter_utils.js b/jstests/libs/cluster_server_parameter_utils.js index 4e778e321f6..4b9e71ef4ef 100644 --- a/jstests/libs/cluster_server_parameter_utils.js +++ b/jstests/libs/cluster_server_parameter_utils.js @@ -11,8 +11,12 @@ * */ -const clusterParameterNames = - ["testStrClusterParameter", "testIntClusterParameter", "testBoolClusterParameter"]; +const clusterParameterNames = [ + "testStrClusterParameter", + "testIntClusterParameter", + "testBoolClusterParameter", + "changeStreamOptions" +]; const clusterParametersDefault = [ { _id: "testStrClusterParameter", @@ -25,6 +29,12 @@ const clusterParametersDefault = [ { _id: "testBoolClusterParameter", boolData: false, + }, + { + _id: "changeStreamOptions", + preAndPostImages: { + expireAfterSeconds: "off", + }, } ]; @@ -40,6 +50,12 @@ const clusterParametersInsert = [ { _id: "testBoolClusterParameter", boolData: true, + }, + { + _id: "changeStreamOptions", + preAndPostImages: { + expireAfterSeconds: 30, + }, } ]; @@ -55,6 +71,12 @@ const clusterParametersUpdate = [ { _id: "testBoolClusterParameter", boolData: false, + }, + { + _id: "changeStreamOptions", + preAndPostImages: { + expireAfterSeconds: "off", + }, } ]; @@ -226,6 +248,8 @@ function testValidClusterParameterCommands(conn) { function testInvalidGetClusterParameter(conn) { const adminDB = conn.getDB('admin'); // Assert that specifying a nonexistent parameter returns an error. + assert.commandFailed( + adminDB.runCommand({setClusterParameter: {nonexistentParam: {intData: 5}}})); assert.commandFailedWithCode(adminDB.runCommand({getClusterParameter: "nonexistentParam"}), ErrorCodes.NoSuchKey); assert.commandFailedWithCode(adminDB.runCommand({getClusterParameter: ["nonexistentParam"]}), @@ -233,6 +257,9 @@ function testInvalidGetClusterParameter(conn) { assert.commandFailedWithCode( adminDB.runCommand({getClusterParameter: ["testIntClusterParameter", "nonexistentParam"]}), ErrorCodes.NoSuchKey); + + // Assert that specifying a known parameter with a scalar value fails. + assert.commandFailed(adminDB.runCommand({setClusterParameter: {testIntClusterParameter: 5}})); } // Tests that invalid uses of set/getClusterParameter fail with the appropriate errors. diff --git a/jstests/noPassthrough/change_stream_options.js b/jstests/noPassthrough/change_stream_options.js new file mode 100644 index 00000000000..abc518b9f69 --- /dev/null +++ b/jstests/noPassthrough/change_stream_options.js @@ -0,0 +1,183 @@ +// Tests setClusterParameter and getClusterParameter for changeStreamOptions on standalone, replica +// set and sharded cluster configurations. +// @tags: [ +// requires_fcv_60, +// featureFlagClusterWideConfig, +// ] +(function() { +"use strict"; + +const testDBName = jsTestName(); + +// Tests set and get change stream options command with 'admin' database. +function testChangeStreamOptionsWithAdminDB(conn) { + const adminDB = conn.getDB("admin"); + + // A set command without any parameter options should fail. + assert.commandFailedWithCode( + adminDB.runCommand({setClusterParameter: {changeStreamOptions: {}}}), ErrorCodes.BadValue); + + // A set request with empty 'preAndPostImages' should fail. + assert.commandFailedWithCode( + adminDB.runCommand({setClusterParameter: {changeStreamOptions: {preAndPostImages: {}}}}), + ErrorCodes.BadValue); + + // An invalid string value of 'expireAfterSeconds' should fail. + assert.commandFailedWithCode(adminDB.runCommand({ + setClusterParameter: + {changeStreamOptions: {preAndPostImages: {expireAfterSeconds: "unknown"}}} + }), + ErrorCodes.BadValue); + + // A negative value of 'expireAfterSeconds' should fail. + assert.commandFailedWithCode(adminDB.runCommand({ + setClusterParameter: {changeStreamOptions: {preAndPostImages: {expireAfterSeconds: -1}}} + }), + ErrorCodes.BadValue); + + // A zero value of 'expireAfterSeconds' should fail. + assert.commandFailedWithCode(adminDB.runCommand({ + setClusterParameter: {changeStreamOptions: {preAndPostImages: {expireAfterSeconds: 0}}} + }), + ErrorCodes.BadValue); + + // Disable purging of expired pre- and post-images and validate that the get change stream + // options retrieves the expected options. + assert.commandWorked(adminDB.runCommand({ + setClusterParameter: {changeStreamOptions: {preAndPostImages: {expireAfterSeconds: "off"}}} + })); + const response1 = + assert.commandWorked(adminDB.runCommand({getClusterParameter: "changeStreamOptions"})); + assert.eq( + response1.clusterParameters[0].preAndPostImages, {expireAfterSeconds: "off"}, response1); + + // Set the expiration time for pre- and post-images and validate get change stream options + // command. + assert.commandWorked(adminDB.runCommand({ + setClusterParameter: + {changeStreamOptions: {preAndPostImages: {expireAfterSeconds: NumberLong(10)}}} + })); + const response2 = + assert.commandWorked(adminDB.runCommand({getClusterParameter: "changeStreamOptions"})); + assert.eq(response2.clusterParameters[0].preAndPostImages, + {expireAfterSeconds: NumberLong(10)}, + response2); +} + +// Tests the set and get change stream options on the standalone configuration. +(function testChangeStreamOptionsOnStandalone() { + const standalone = MongoRunner.runMongod(); + const adminDB = standalone.getDB("admin"); + + // Verify that the set and get commands cannot be issued on a standalone server. + assert.commandFailedWithCode(adminDB.runCommand({ + setClusterParameter: + {changeStreamOptions: {preAndPostImages: {expireAfterSeconds: NumberLong(10)}}} + }), + ErrorCodes.IllegalOperation); + assert.commandFailedWithCode(adminDB.runCommand({getClusterParameter: "changeStreamOptions"}), + ErrorCodes.IllegalOperation); + + MongoRunner.stopMongod(standalone); +})(); + +// Tests the set and get change stream options on the replica-set. +(function testChangeStreamOptionsOnReplicaSet() { + const replSetTest = new ReplSetTest({name: "replSet", nodes: 2}); + replSetTest.startSet(); + replSetTest.initiate(); + + const primary = replSetTest.getPrimary(); + const secondary = replSetTest.getSecondaries()[0]; + + // Verify that the set and get commands cannot be issued on database other than the 'admin'. + [primary, secondary].forEach(conn => { + assert.commandFailedWithCode(conn.getDB(testDBName).runCommand({ + setClusterParameter: + {changeStreamOptions: {preAndPostImages: {expireAfterSeconds: NumberLong(10)}}} + }), + ErrorCodes.Unauthorized); + + assert.commandFailedWithCode( + conn.getDB(testDBName).runCommand({getClusterParameter: "changeStreamOptions"}), + ErrorCodes.Unauthorized); + }); + + // Tests the set and get commands on the primary node. + testChangeStreamOptionsWithAdminDB(primary); + + replSetTest.stopSet(); +})(); + +// Tests the set and get change stream options on the sharded cluster. +(function testChangeStreamOptionsOnShardedCluster() { + const shardingTest = new ShardingTest({shards: 1, mongos: 1}); + const adminDB = shardingTest.rs0.getPrimary().getDB("admin"); + + // Test that setClusterParameter cannot be issued directly on shards in the sharded cluster, + // while getClusterParameter can. + assert.commandFailedWithCode(adminDB.runCommand({ + setClusterParameter: + {changeStreamOptions: {preAndPostImages: {expireAfterSeconds: NumberLong(10)}}} + }), + ErrorCodes.NotImplemented); + assert.commandWorked(adminDB.runCommand({getClusterParameter: "changeStreamOptions"})); + + // Run the set and get commands on the mongoS. + testChangeStreamOptionsWithAdminDB(shardingTest.s); + + shardingTest.stop(); +})(); + +// Tests that setClusterParameter and getClusterParameter can only be executed by user with +// privilege actions 'setClusterParameter' and 'getClusterParameter' respectively. +(function testClusterParameterChangeStreamOptionsForAuthorization() { + const replSetTest = + new ReplSetTest({name: "shard", nodes: 1, useHostName: true, waitForKeys: false}); + replSetTest.startSet({keyFile: "jstests/libs/key1"}); + replSetTest.initiate(); + + const primary = replSetTest.getPrimary(); + + // Create a user with admin role on 'admin' database. + primary.getDB("admin").createUser({ + user: "adminUser", + pwd: "adminUser", + roles: [{role: "userAdminAnyDatabase", db: "admin"}] + }); + + // Verify that the admin user is unauthorized to execute set and get change stream options + // command. + assert(primary.getDB("admin").auth("adminUser", "adminUser")); + assert.commandFailedWithCode(primary.getDB("admin").runCommand({ + setClusterParameter: + {changeStreamOptions: {preAndPostImages: {expireAfterSeconds: NumberLong(10)}}} + }), + ErrorCodes.Unauthorized); + assert.commandFailedWithCode( + primary.getDB("admin").runCommand({getClusterParameter: "changeStreamOptions"}), + ErrorCodes.Unauthorized); + + // Create a user with cluster admin role on 'admin' database. + primary.getDB("admin").createUser({ + user: "clusterManager", + pwd: "clusterManager", + roles: [{role: "clusterManager", db: "admin"}] + }); + + primary.getDB("admin").logout(); + + // Verify that the cluster manager user is authorized to execute set and get change stream + // options command. + assert(primary.getDB("admin").auth("clusterManager", "clusterManager")); + assert.commandWorked(primary.getDB("admin").runCommand({ + setClusterParameter: + {changeStreamOptions: {preAndPostImages: {expireAfterSeconds: NumberLong(10)}}} + })); + assert.commandWorked( + primary.getDB("admin").runCommand({getClusterParameter: "changeStreamOptions"})); + primary.getDB("admin").logout(); + + replSetTest.stopSet(); +})(); +}()); diff --git a/jstests/noPassthrough/change_stream_options_command.js b/jstests/noPassthrough/change_stream_options_command.js deleted file mode 100644 index 69d2cab6c8c..00000000000 --- a/jstests/noPassthrough/change_stream_options_command.js +++ /dev/null @@ -1,158 +0,0 @@ -// Tests commands to set and get change stream options on standalone, replica set and sharded -// cluster configuration. -// @tags: [ -// requires_fcv_53, -// featureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy, -// ] -(function() { -"use strict"; - -const testDBName = jsTestName(); - -// Tests set and get change stream options command with 'admin' database. -function testChangeStreamOptionsWithAdminDB(conn) { - const adminDB = conn.getDB("admin"); - - // A set command without any parameter options should fail. - assert.commandFailedWithCode(adminDB.runCommand({setChangeStreamOptions: 1}), 5869202); - - // A set request with empty 'preAndPostImages' should fail. - assert.commandFailedWithCode( - adminDB.runCommand({setChangeStreamOptions: 1, preAndPostImages: {}}), 5869203); - - // An invalid string value of 'expireAfterSeconds' should fail. - assert.commandFailedWithCode( - adminDB.runCommand( - {setChangeStreamOptions: 1, preAndPostImages: {expireAfterSeconds: "unknown"}}), - 5869204); - - // A negative value of 'expireAfterSeconds' should fail. - assert.commandFailedWithCode( - adminDB.runCommand({setChangeStreamOptions: 1, preAndPostImages: {expireAfterSeconds: -1}}), - 5869205); - - // A zero value of 'expireAfterSeconds' should fail. - assert.commandFailedWithCode( - adminDB.runCommand({setChangeStreamOptions: 1, preAndPostImages: {expireAfterSeconds: 0}}), - 5869205); - - // Disable purging of expired pre- and post-images and validate that the get change stream - // options retrieves the expected options. - assert.commandWorked(adminDB.runCommand( - {setChangeStreamOptions: 1, preAndPostImages: {expireAfterSeconds: "off"}})); - const response1 = assert.commandWorked(adminDB.runCommand({getChangeStreamOptions: 1})); - assert.neq(response1.hasOwnProperty("preAndPostImages"), response1); - - // Set the expiration time for pre- and post-images and validate get change stream options - // command. - assert.commandWorked(adminDB.runCommand( - {setChangeStreamOptions: 1, preAndPostImages: {expireAfterSeconds: 10}})); - const response2 = assert.commandWorked(adminDB.runCommand({getChangeStreamOptions: 1})); - assert.eq(response2.preAndPostImages, {expireAfterSeconds: NumberLong(10)}, response2); -} - -// Tests the set and get change stream options on the standalone configuration. -(function testChangeStreamOptionsOnStandalone() { - const standalone = MongoRunner.runMongod(); - const adminDB = standalone.getDB("admin"); - - // Verify that the set and get commands cannot be issued on a standalone server. - assert.commandFailedWithCode( - adminDB.runCommand({setChangeStreamOptions: 1, preAndPostImages: {expireAfterSeconds: 10}}), - 5869200); - assert.commandFailedWithCode(adminDB.runCommand({getChangeStreamOptions: 1}), 5869207); - - MongoRunner.stopMongod(standalone); -})(); - -// Tests the set and get change stream options on the replica-set. -(function testChangeStreamOptionsOnReplicaSet() { - const replSetTest = new ReplSetTest({name: "replSet", nodes: 2}); - replSetTest.startSet(); - replSetTest.initiate(); - - const primary = replSetTest.getPrimary(); - const secondary = replSetTest.getSecondaries()[0]; - - // Verify that the set and get commands cannot be issued on database other than the 'admin'. - [primary, secondary].forEach(conn => { - assert.commandFailedWithCode(conn.getDB(testDBName).runCommand({ - setChangeStreamOptions: 1, - preAndPostImages: {expireAfterSeconds: 10} - }), - ErrorCodes.Unauthorized); - - assert.commandFailedWithCode(conn.getDB(testDBName).runCommand({getChangeStreamOptions: 1}), - ErrorCodes.Unauthorized); - }); - - // Tests the set and get commands on the primary node. - testChangeStreamOptionsWithAdminDB(primary); - - replSetTest.stopSet(); -})(); - -// Tests the set and get change stream options on the sharded cluster. -(function testChangeStreamOptionsOnShardedCluster() { - const shardingTest = new ShardingTest({shards: 1, mongos: 1}); - const adminDB = shardingTest.rs0.getPrimary().getDB("admin"); - - // Test that set and get commands cannot be issued directly on shards in the sharded cluster. - assert.commandFailedWithCode( - adminDB.runCommand({setChangeStreamOptions: 1, preAndPostImages: {expireAfterSeconds: 10}}), - 5869201); - assert.commandFailedWithCode(adminDB.runCommand({getChangeStreamOptions: 1}), 5869208); - - // Run the set and get commands on the mongoS. - testChangeStreamOptionsWithAdminDB(shardingTest.s); - - shardingTest.stop(); -})(); - -// Tests that set and get change stream options command can only be executed by user with privilege -// actions 'setChangeStreamOptions' and 'getChangeStreamOptions' respectively. -(function testChangeStreamOptionsForAuthorization() { - const replSetTest = - new ReplSetTest({name: "shard", nodes: 1, useHostName: true, waitForKeys: false}); - replSetTest.startSet({keyFile: "jstests/libs/key1"}); - replSetTest.initiate(); - - const primary = replSetTest.getPrimary(); - - // Create a user with admin role on 'admin' database. - primary.getDB("admin").createUser({ - user: "adminUser", - pwd: "adminUser", - roles: [{role: "userAdminAnyDatabase", db: "admin"}] - }); - - // Verify that the admin user is unauthorized to execute set and get change stream options - // command. - assert(primary.getDB("admin").auth("adminUser", "adminUser")); - assert.commandFailedWithCode( - primary.getDB("admin").runCommand( - {setChangeStreamOptions: 1, preAndPostImages: {expireAfterSeconds: 10}}), - ErrorCodes.Unauthorized); - assert.commandFailedWithCode(primary.getDB("admin").runCommand({getChangeStreamOptions: 1}), - ErrorCodes.Unauthorized); - - // Create a user with cluster admin role on 'admin' database. - primary.getDB("admin").createUser({ - user: "clusterManager", - pwd: "clusterManager", - roles: [{role: "clusterManager", db: "admin"}] - }); - - primary.getDB("admin").logout(); - - // Verify that the cluster manager user is authorized to execute set and get change stream - // options command. - assert(primary.getDB("admin").auth("clusterManager", "clusterManager")); - assert.commandWorked(primary.getDB("admin").runCommand( - {setChangeStreamOptions: 1, preAndPostImages: {expireAfterSeconds: 10}})); - assert.commandWorked(primary.getDB("admin").runCommand({getChangeStreamOptions: 1})); - primary.getDB("admin").logout(); - - replSetTest.stopSet(); -})(); -}()); diff --git a/jstests/noPassthrough/change_stream_pre_image_time_based_expiration_replset.js b/jstests/noPassthrough/change_stream_pre_image_time_based_expiration_replset.js new file mode 100644 index 00000000000..b135a5acb6b --- /dev/null +++ b/jstests/noPassthrough/change_stream_pre_image_time_based_expiration_replset.js @@ -0,0 +1,22 @@ +// Tests time-based pre-image retention policy of change stream pre-images remover job. +// @tags: [ +// requires_fcv_60, +// featureFlagClusterWideConfig, +// ] +(function() { +"use strict"; + +load("jstests/noPassthrough/libs/change_stream_pre_image_time_based_expiration_utils.js"); + +// Tests pre-image time based expiration on a replica-set. +(function testChangeStreamPreImagesforTimeBasedExpirationOnReplicaSet() { + const replSetTest = new ReplSetTest({name: "replSet", nodes: 3}); + replSetTest.startSet(); + replSetTest.initiate(); + + const conn = replSetTest.getPrimary(); + const primary = replSetTest.getPrimary(); + testTimeBasedPreImageRetentionPolicy(conn, primary); + replSetTest.stopSet(); +})(); +}()); diff --git a/jstests/noPassthrough/change_stream_pre_image_time_based_expiration_sharded.js b/jstests/noPassthrough/change_stream_pre_image_time_based_expiration_sharded.js new file mode 100644 index 00000000000..f42d8e3b5a2 --- /dev/null +++ b/jstests/noPassthrough/change_stream_pre_image_time_based_expiration_sharded.js @@ -0,0 +1,25 @@ +// Tests time-based pre-image retention policy of change stream pre-images remover job. +// @tags: [ +// requires_fcv_60, +// featureFlagClusterWideConfig, +// ] +(function() { +"use strict"; + +load("jstests/noPassthrough/libs/change_stream_pre_image_time_based_expiration_utils.js"); + +// Tests pre-image time-based expiration on a sharded cluster. +(function testChangeStreamPreImagesforTimeBasedExpirationOnShardedCluster() { + const options = { + mongos: 1, + config: 1, + shards: 1, + rs: { + nodes: 3, + }, + }; + const st = new ShardingTest(options); + testTimeBasedPreImageRetentionPolicy(st.s0, st.rs0.getPrimary()); + st.stop(); +})(); +}()); diff --git a/jstests/noPassthrough/change_stream_pre_image_time_based_expiration.js b/jstests/noPassthrough/libs/change_stream_pre_image_time_based_expiration_utils.js index c993c82ead5..0a2073a0716 100644 --- a/jstests/noPassthrough/change_stream_pre_image_time_based_expiration.js +++ b/jstests/noPassthrough/libs/change_stream_pre_image_time_based_expiration_utils.js @@ -1,11 +1,4 @@ -// Tests time-based pre-image retention policy of change stream pre-images remover job. -// @tags: [ -// requires_fcv_60, -// featureFlagChangeStreamPreAndPostImages, -// featureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy, -// ] -(function() { -"use strict"; +// Library functions for change_stream_pre_image_time_based_expiration tests. load("jstests/libs/fail_point_util.js"); // For configureFailPoint. @@ -32,6 +25,10 @@ function verifyPreImages(preImageColl, expectedPreImages, collectionsInfo) { } // Tests time-based change stream pre-image retention policy. +// When run on a replica set, both 'conn' and 'primary' store connections to the +// replica set primary node. +// When run on a sharded cluster, 'conn' represents the connection to the mongos while +// 'primary' represents the connection to the shard primary node. function testTimeBasedPreImageRetentionPolicy(conn, primary) { // Status for pre-images that define if pre-image is expected to expire or not. const shouldExpire = "shouldExpire"; @@ -55,7 +52,7 @@ function testTimeBasedPreImageRetentionPolicy(conn, primary) { ]; const collectionCount = docsStatePerCollection.length; - const testDB = conn.getDB("test"); + const testDB = primary.getDB("test"); // Create several collections with pre- and post-images enabled. for (let collIdx = 0; collIdx < collectionCount; collIdx++) { @@ -75,8 +72,9 @@ function testTimeBasedPreImageRetentionPolicy(conn, primary) { }); // Disable pre-image time-based expiration policy. - assert.commandWorked(conn.getDB("admin").runCommand( - {setChangeStreamOptions: 1, preAndPostImages: {expireAfterSeconds: "off"}})); + assert.commandWorked(conn.getDB("admin").runCommand({ + setClusterParameter: {changeStreamOptions: {preAndPostImages: {expireAfterSeconds: "off"}}} + })); let shouldRetainDocs = []; let shouldExpireDocs = []; @@ -161,8 +159,10 @@ function testTimeBasedPreImageRetentionPolicy(conn, primary) { verifyPreImages(preImageColl, allDocs, collectionsInfo); // Enable time-based pre-image expiration and configure the 'expireAfterSeconds' to 1 seconds. - assert.commandWorked(conn.getDB("admin").runCommand( - {setChangeStreamOptions: 1, preAndPostImages: {expireAfterSeconds: expireAfterSeconds}})); + assert.commandWorked(conn.getDB("admin").runCommand({ + setClusterParameter: + {changeStreamOptions: {preAndPostImages: {expireAfterSeconds: expireAfterSeconds}}} + })); // Verify that at some point in time, all expired pre-images will be deleted. assert.soon(() => { @@ -175,17 +175,3 @@ function testTimeBasedPreImageRetentionPolicy(conn, primary) { currentTimeFailPoint.off(); } - -// Tests pre-image time based expiration on a replica-set. -// TODO SERVER-61802: Add test cases for shared cluster. -(function testChangeStreamPreImagesforTimeBasedExpirationOnReplicaSet() { - const replSetTest = new ReplSetTest({name: "replSet", nodes: 1}); - replSetTest.startSet(); - replSetTest.initiate(); - - const conn = replSetTest.getPrimary(); - const primary = replSetTest.getPrimary(); - testTimeBasedPreImageRetentionPolicy(conn, primary); - replSetTest.stopSet(); -})(); -}()); diff --git a/jstests/replsets/db_reads_while_recovering_all_commands.js b/jstests/replsets/db_reads_while_recovering_all_commands.js index d5000e2deef..b6c1fcdc778 100644 --- a/jstests/replsets/db_reads_while_recovering_all_commands.js +++ b/jstests/replsets/db_reads_while_recovering_all_commands.js @@ -215,7 +215,7 @@ const allCommands = { fsync: {skip: isNotAUserDataRead}, fsyncUnlock: {skip: isNotAUserDataRead}, getAuditConfig: {skip: isNotAUserDataRead}, - getChangeStreamOptions: {skip: isNotAUserDataRead}, + getChangeStreamOptions: {skip: isNotAUserDataRead}, // TODO SERVER-65353 remove in 6.1. getClusterParameter: {skip: isNotAUserDataRead}, getCmdLineOpts: {skip: isNotAUserDataRead}, getDatabaseVersion: {skip: isNotAUserDataRead}, @@ -332,7 +332,7 @@ const allCommands = { sbe: {skip: isAnInternalCommand}, serverStatus: {skip: isNotAUserDataRead}, setAuditConfig: {skip: isNotAUserDataRead}, - setChangeStreamOptions: {skip: isPrimaryOnly}, + setChangeStreamOptions: {skip: isPrimaryOnly}, // TODO SERVER-65353 remove in 6.1. setCommittedSnapshot: {skip: isNotAUserDataRead}, setDefaultRWConcern: {skip: isPrimaryOnly}, setIndexCommitQuorum: {skip: isPrimaryOnly}, diff --git a/jstests/sharding/database_versioning_all_commands.js b/jstests/sharding/database_versioning_all_commands.js index 5127c7836ad..fe7c409f524 100644 --- a/jstests/sharding/database_versioning_all_commands.js +++ b/jstests/sharding/database_versioning_all_commands.js @@ -456,8 +456,10 @@ let testCases = { flushRouterConfig: {skip: "executes locally on mongos (not sent to any remote node)"}, fsync: {skip: "broadcast to all shards"}, getAuditConfig: {skip: "not on a user database", conditional: true}, - getChangeStreamOptions: - {skip: "executes locally on mongos (not sent to any remote node)", conditional: true}, + getChangeStreamOptions: { + skip: "executes locally on mongos (not sent to any remote node)", + conditional: true + }, // TODO SERVER-65353 remove in 6.1. getClusterParameter: {skip: "always targets the config server"}, getCmdLineOpts: {skip: "executes locally on mongos (not sent to any remote node)"}, getDefaultRWConcern: {skip: "executes locally on mongos (not sent to any remote node)"}, @@ -626,7 +628,10 @@ let testCases = { serverStatus: {skip: "executes locally on mongos (not sent to any remote node)"}, setAllowMigrations: {skip: "not on a user database"}, setAuditConfig: {skip: "not on a user database", conditional: true}, - setChangeStreamOptions: {skip: "always targets the config server", conditional: true}, + setChangeStreamOptions: { + skip: "always targets the config server", + conditional: true + }, // TODO SERVER-65353 remove in 6.1. setDefaultRWConcern: {skip: "always targets the config server"}, setIndexCommitQuorum: { run: { diff --git a/jstests/sharding/read_write_concern_defaults_application.js b/jstests/sharding/read_write_concern_defaults_application.js index 7f495cbe2a4..96d0ab9f4d0 100644 --- a/jstests/sharding/read_write_concern_defaults_application.js +++ b/jstests/sharding/read_write_concern_defaults_application.js @@ -477,7 +477,8 @@ let testCases = { fsync: {skip: "does not accept read or write concern"}, fsyncUnlock: {skip: "does not accept read or write concern"}, getAuditConfig: {skip: "does not accept read or write concern"}, - getChangeStreamOptions: {skip: "TODO PM-2502"}, + getChangeStreamOptions: + {skip: "does not accept read or write concern"}, // TODO SERVER-65353 remove in 6.1. getClusterParameter: {skip: "does not accept read or write concern"}, getCmdLineOpts: {skip: "does not accept read or write concern"}, getDatabaseVersion: {skip: "does not accept read or write concern"}, @@ -689,7 +690,8 @@ let testCases = { serverStatus: {skip: "does not accept read or write concern"}, setAllowMigrations: {skip: "does not accept read or write concern"}, setAuditConfig: {skip: "does not accept read or write concern"}, - setChangeStreamOptions: {skip: "TODO PM-2502"}, + setChangeStreamOptions: + {skip: "does not accept read or write concern"}, // TODO SERVER-65353 remove in 6.1. setCommittedSnapshot: {skip: "internal command"}, setDefaultRWConcern: {skip: "special case (must run after all other commands)"}, setFeatureCompatibilityVersion: {skip: "does not accept read or write concern"}, diff --git a/jstests/sharding/safe_secondary_reads_drop_recreate.js b/jstests/sharding/safe_secondary_reads_drop_recreate.js index 21e887dc4bc..d6ad842361f 100644 --- a/jstests/sharding/safe_secondary_reads_drop_recreate.js +++ b/jstests/sharding/safe_secondary_reads_drop_recreate.js @@ -213,7 +213,8 @@ let testCases = { fsync: {skip: "does not return user data"}, fsyncUnlock: {skip: "does not return user data"}, getAuditConfig: {skip: "does not return user data"}, - getChangeStreamOptions: {skip: "does not return user data"}, + getChangeStreamOptions: + {skip: "does not return user data"}, // TODO SERVER-65353 remove in 6.1. getClusterParameter: {skip: "does not return user data"}, getCmdLineOpts: {skip: "does not return user data"}, getDefaultRWConcern: {skip: "does not return user data"}, @@ -323,7 +324,7 @@ let testCases = { serverStatus: {skip: "does not return user data"}, setAllowMigrations: {skip: "primary only"}, setAuditConfig: {skip: "does not return user data"}, - setChangeStreamOptions: {skip: "primary only"}, + setChangeStreamOptions: {skip: "primary only"}, // TODO SERVER-65353 remove in 6.1. setCommittedSnapshot: {skip: "does not return user data"}, setDefaultRWConcern: {skip: "primary only"}, setIndexCommitQuorum: {skip: "primary only"}, diff --git a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js index b7a35ef9d49..3e01be669a4 100644 --- a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js +++ b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js @@ -246,7 +246,8 @@ let testCases = { fsync: {skip: "does not return user data"}, fsyncUnlock: {skip: "does not return user data"}, getAuditConfig: {skip: "does not return user data"}, - getChangeStreamOptions: {skip: "does not return user data"}, + getChangeStreamOptions: + {skip: "does not return user data"}, // TODO SERVER-65353 remove in 6.1. getClusterParameter: {skip: "does not return user data"}, getCmdLineOpts: {skip: "does not return user data"}, getDefaultRWConcern: {skip: "does not return user data"}, @@ -394,7 +395,7 @@ let testCases = { serverStatus: {skip: "does not return user data"}, setAllowMigrations: {skip: "primary only"}, setAuditConfig: {skip: "does not return user data"}, - setChangeStreamOptions: {skip: "primary only"}, + setChangeStreamOptions: {skip: "primary only"}, // TODO SERVER-65353 remove in 6.1. setCommittedSnapshot: {skip: "does not return user data"}, setDefaultRWConcern: {skip: "primary only"}, setIndexCommitQuorum: {skip: "primary only"}, diff --git a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js index 4fef3a7cbb8..5c75ee77baf 100644 --- a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js +++ b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js @@ -218,7 +218,8 @@ let testCases = { fsync: {skip: "does not return user data"}, fsyncUnlock: {skip: "does not return user data"}, getAuditConfig: {skip: "does not return user data"}, - getChangeStreamOptions: {skip: "does not return user data"}, + getChangeStreamOptions: + {skip: "does not return user data"}, // TODO SERVER-65353 remove in 6.1. getClusterParameter: {skip: "does not return user data"}, getCmdLineOpts: {skip: "does not return user data"}, getDefaultRWConcern: {skip: "does not return user data"}, @@ -330,7 +331,7 @@ let testCases = { serverStatus: {skip: "does not return user data"}, setAllowMigrations: {skip: "primary only"}, setAuditConfig: {skip: "does not return user data"}, - setChangeStreamOptions: {skip: "primary only"}, + setChangeStreamOptions: {skip: "primary only"}, // TODO SERVER-65353 remove in 6.1. setCommittedSnapshot: {skip: "does not return user data"}, setDefaultRWConcern: {skip: "primary only"}, setIndexCommitQuorum: {skip: "primary only"}, diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 043d2a0d512..c81942bc35f 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -490,13 +490,23 @@ env.Library( ) env.Library( + target='change_stream_options', + source=[ + 'change_stream_options.idl', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/idl/cluster_server_parameter', + ], +) + +env.Library( target='change_stream_options_manager', source=[ 'change_stream_options_manager.cpp', + 'change_stream_options_parameter.idl', ], LIBDEPS_PRIVATE=[ - '$BUILD_DIR/mongo/db/commands/change_stream_options', - '$BUILD_DIR/mongo/idl/feature_flag', + 'change_stream_options', ], ) diff --git a/src/mongo/db/auth/action_type.idl b/src/mongo/db/auth/action_type.idl index 6cfbd32d0a9..1d83378f275 100644 --- a/src/mongo/db/auth/action_type.idl +++ b/src/mongo/db/auth/action_type.idl @@ -96,7 +96,6 @@ enums: flushRouterConfig : "flushRouterConfig" forceUUID : "forceUUID" fsync : "fsync" - getChangeStreamOptions: "getChangeStreamOptions" getClusterParameter: "getClusterParameter" getDatabaseVersion : "getDatabaseVersion" getDefaultRWConcern : "getDefaultRWConcern" @@ -161,7 +160,6 @@ enums: runTenantMigration : "runTenantMigration" serverStatus : "serverStatus" setAuthenticationRestriction : "setAuthenticationRestriction" - setChangeStreamOptions: "setChangeStreamOptions" setClusterParameter: "setClusterParameter" setDefaultRWConcern : "setDefaultRWConcern" setFeatureCompatibilityVersion : "setFeatureCompatibilityVersion" diff --git a/src/mongo/db/auth/builtin_roles.cpp b/src/mongo/db/auth/builtin_roles.cpp index 27bb1ee6b54..2b0c63cb798 100644 --- a/src/mongo/db/auth/builtin_roles.cpp +++ b/src/mongo/db/auth/builtin_roles.cpp @@ -254,8 +254,6 @@ MONGO_INITIALIZER(AuthorizationBuiltinRoles)(InitializerContext* context) { << ActionType::setDefaultRWConcern << ActionType::setFeatureCompatibilityVersion << ActionType::setFreeMonitoring - << ActionType::setChangeStreamOptions - << ActionType::getChangeStreamOptions << ActionType::setClusterParameter << ActionType::getClusterParameter; diff --git a/src/mongo/db/change_stream_options.idl b/src/mongo/db/change_stream_options.idl new file mode 100644 index 00000000000..83e9d160868 --- /dev/null +++ b/src/mongo/db/change_stream_options.idl @@ -0,0 +1,58 @@ +# Copyright (C) 2022-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. +# + +# Data structures for change stream configuration options. + +global: + cpp_namespace: "mongo" + +imports: + - "mongo/idl/basic_types.idl" + - "mongo/idl/cluster_server_parameter.idl" + +structs: + PreAndPostImagesOptions: + description: "Change streams pre- and post-images options." + fields: + expireAfterSeconds: + description: "The number of seconds after which a pre-image is eligible for + deletion. A string value 'off' enables the default expiration policy." + unstable: false + type: + variant: [string, safeInt64] + default: "\"off\"" + ChangeStreamOptions: + description: "A specification for the change streams options." + inline_chained_structs: true + chained_structs: + ClusterServerParameter: clusterServerParameter + fields: + preAndPostImages: + type: PreAndPostImagesOptions + unstable: false + default: true diff --git a/src/mongo/db/change_stream_options_manager.cpp b/src/mongo/db/change_stream_options_manager.cpp index b2f474777a8..4c27c32ec6f 100644 --- a/src/mongo/db/change_stream_options_manager.cpp +++ b/src/mongo/db/change_stream_options_manager.cpp @@ -32,6 +32,7 @@ #include "mongo/platform/basic.h" #include "mongo/db/change_stream_options_manager.h" +#include "mongo/db/change_stream_options_parameter_gen.h" #include "mongo/logv2/log.h" namespace mongo { @@ -53,8 +54,7 @@ void ChangeStreamOptionsManager::create(ServiceContext* service) { getChangeStreamOptionsManager(service).emplace(service); } -boost::optional<ChangeStreamOptions> ChangeStreamOptionsManager::getOptions( - OperationContext* opCtx) { +const ChangeStreamOptions& ChangeStreamOptionsManager::getOptions(OperationContext* opCtx) { stdx::lock_guard<Latch> L(_mutex); return _changeStreamOptions; } @@ -63,7 +63,99 @@ StatusWith<ChangeStreamOptions> ChangeStreamOptionsManager::setOptions( OperationContext* opCtx, ChangeStreamOptions optionsToSet) { stdx::lock_guard<Latch> L(_mutex); _changeStreamOptions = std::move(optionsToSet); - return *_changeStreamOptions; + return _changeStreamOptions; +} + +void ChangeStreamOptionsParameter::append(OperationContext* opCtx, + BSONObjBuilder& bob, + const std::string& name) { + ChangeStreamOptionsManager& changeStreamOptionsManager = + ChangeStreamOptionsManager::get(getGlobalServiceContext()); + bob.append("_id"_sd, name); + bob.appendElementsUnique(changeStreamOptionsManager.getOptions(opCtx).toBSON()); +} + +Status ChangeStreamOptionsParameter::set(const BSONElement& newValueElement) { + try { + Status validateStatus = validate(newValueElement); + if (!validateStatus.isOK()) { + return validateStatus; + } + + ChangeStreamOptionsManager& changeStreamOptionsManager = + ChangeStreamOptionsManager::get(getGlobalServiceContext()); + ChangeStreamOptions newOptions = ChangeStreamOptions::parse( + IDLParserErrorContext("changeStreamOptions"), newValueElement.Obj()); + + return changeStreamOptionsManager + .setOptions(Client::getCurrent()->getOperationContext(), newOptions) + .getStatus(); + } catch (const AssertionException&) { + return {ErrorCodes::BadValue, "Could not parse changeStreamOptions parameter"}; + } +} + +Status ChangeStreamOptionsParameter::validate(const BSONElement& newValueElement) const { + try { + BSONObj changeStreamOptionsObj = newValueElement.Obj(); + Status validateStatus = Status::OK(); + + // PreAndPostImages currently contains a single field, `expireAfterSeconds`, that is + // default- initialized to 'off'. This is useful for parameter initialization at startup but + // causes the IDL parser to not enforce the presence of `expireAfterSeconds` in BSON + // representations. We assert that and the existence of PreAndPostImages here. + IDLParserErrorContext ctxt = IDLParserErrorContext("changeStreamOptions"_sd); + if (auto preAndPostImagesObj = changeStreamOptionsObj["preAndPostImages"_sd]; + !preAndPostImagesObj.eoo()) { + if (preAndPostImagesObj["expireAfterSeconds"_sd].eoo()) { + ctxt.throwMissingField("expireAfterSeconds"_sd); + } + } else { + ctxt.throwMissingField("preAndPostImages"_sd); + } + + ChangeStreamOptions newOptions = ChangeStreamOptions::parse(ctxt, changeStreamOptionsObj); + auto preAndPostImages = newOptions.getPreAndPostImages(); + stdx::visit( + visit_helper::Overloaded{ + [&](const std::string& expireAfterSeconds) { + if (expireAfterSeconds != "off"_sd) { + validateStatus = { + ErrorCodes::BadValue, + "Non-numeric value of 'expireAfterSeconds' should be 'off'"}; + } + }, + [&](const std::int64_t& expireAfterSeconds) { + if (expireAfterSeconds <= 0) { + validateStatus = { + ErrorCodes::BadValue, + "Numeric value of 'expireAfterSeconds' should be positive"}; + } + }, + }, + preAndPostImages.getExpireAfterSeconds()); + + return validateStatus; + } catch (const AssertionException& ex) { + return {ErrorCodes::BadValue, + str::stream() << "Failed parsing new changeStreamOptions value" << ex.reason()}; + } +} + +Status ChangeStreamOptionsParameter::reset() { + // Replace the current changeStreamOptions with a default-constructed one, which should + // automatically set preAndPostImages.expirationSeconds to 'off' by default. + ChangeStreamOptionsManager& changeStreamOptionsManager = + ChangeStreamOptionsManager::get(getGlobalServiceContext()); + return changeStreamOptionsManager + .setOptions(Client::getCurrent()->getOperationContext(), ChangeStreamOptions()) + .getStatus(); +} + +const LogicalTime ChangeStreamOptionsParameter::getClusterParameterTime() const { + ChangeStreamOptionsManager& changeStreamOptionsManager = + ChangeStreamOptionsManager::get(getGlobalServiceContext()); + return changeStreamOptionsManager.getClusterParameterTime(); } } // namespace mongo diff --git a/src/mongo/db/change_stream_options_manager.h b/src/mongo/db/change_stream_options_manager.h index f87258b11c8..ea1fe2a3b47 100644 --- a/src/mongo/db/change_stream_options_manager.h +++ b/src/mongo/db/change_stream_options_manager.h @@ -29,7 +29,7 @@ #pragma once -#include "mongo/db/commands/change_stream_options_gen.h" +#include "mongo/db/change_stream_options_gen.h" #include "mongo/db/operation_context.h" #include "mongo/db/service_context.h" #include "mongo/platform/mutex.h" @@ -63,9 +63,9 @@ public: static ChangeStreamOptionsManager& get(OperationContext* opCtx); /** - * Returns the change-streams options if present, boost::none otherwise. + * Returns the change-streams options. */ - boost::optional<ChangeStreamOptions> getOptions(OperationContext* opCtx); + const ChangeStreamOptions& getOptions(OperationContext* opCtx); /** * Sets the provided change-streams options. Returns OK on success, otherwise appropriate error @@ -74,11 +74,18 @@ public: StatusWith<ChangeStreamOptions> setOptions(OperationContext* opCtx, ChangeStreamOptions optionsToSet); + /** + * Returns the clusterParameterTime of the current change stream options. + */ + const LogicalTime& getClusterParameterTime() { + return _changeStreamOptions.getClusterParameterTime(); + } + private: ChangeStreamOptionsManager(const ChangeStreamOptionsManager&) = delete; ChangeStreamOptionsManager& operator=(const ChangeStreamOptionsManager&) = delete; - boost::optional<ChangeStreamOptions> _changeStreamOptions; + ChangeStreamOptions _changeStreamOptions; Mutex _mutex = MONGO_MAKE_LATCH("ChangeStreamOptionsManager::mutex"); }; diff --git a/src/mongo/db/change_stream_options_parameter.idl b/src/mongo/db/change_stream_options_parameter.idl new file mode 100644 index 00000000000..122e2793e3b --- /dev/null +++ b/src/mongo/db/change_stream_options_parameter.idl @@ -0,0 +1,42 @@ +# Copyright (C) 2022-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. +# + +# Cluster server parameter for change streams configuration options. + +global: + cpp_namespace: "mongo" + +server_parameters: + changeStreamOptions: + description: "Cluster server parameter for change stream options" + set_at: cluster + cpp_class: + name: ChangeStreamOptionsParameter + override_set: true + override_validate: true +
\ No newline at end of file diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index a6138057096..6a08a6b7e9e 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -203,18 +203,6 @@ env.Library( ) env.Library( - target='change_stream_options', - source=[ - 'change_stream_options.idl', - ], - LIBDEPS=[ - '$BUILD_DIR/mongo/db/auth/auth', - '$BUILD_DIR/mongo/db/auth/authprivilege', - '$BUILD_DIR/mongo/db/read_write_concern_defaults', - ] -) - -env.Library( target="authentication_commands", source=[ 'authentication_commands.cpp', @@ -515,7 +503,6 @@ env.Library( target="mongod", source=[ "apply_ops_cmd.cpp", - "change_stream_options_command.cpp", "collection_to_capped.cpp", "compact.cpp", "cpuload.cpp", @@ -568,7 +555,6 @@ env.Library( '$BUILD_DIR/mongo/db/catalog/index_key_validate', '$BUILD_DIR/mongo/db/change_stream_options_manager', '$BUILD_DIR/mongo/db/commands', - '$BUILD_DIR/mongo/db/commands/change_stream_options', '$BUILD_DIR/mongo/db/curop_failpoint_helpers', '$BUILD_DIR/mongo/db/dbhelpers', '$BUILD_DIR/mongo/db/exec/sbe_cmd', diff --git a/src/mongo/db/commands/change_stream_options.idl b/src/mongo/db/commands/change_stream_options.idl deleted file mode 100644 index 56146ad9049..00000000000 --- a/src/mongo/db/commands/change_stream_options.idl +++ /dev/null @@ -1,102 +0,0 @@ -# Copyright (C) 2022-present MongoDB, Inc. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the Server Side Public License, version 1, -# as published by MongoDB, Inc. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# Server Side Public License for more details. -# -# You should have received a copy of the Server Side Public License -# along with this program. If not, see -# <http://www.mongodb.com/licensing/server-side-public-license>. -# -# As a special exception, the copyright holders give permission to link the -# code of portions of this program with the OpenSSL library under certain -# conditions as described in each individual source file and distribute -# linked combinations including the program with the OpenSSL library. You -# must comply with the Server Side Public License in all respects for -# all of the code used other than as permitted herein. If you modify file(s) -# with this exception, you may extend this exception to your version of the -# file(s), but you are not obligated to do so. If you do not wish to do so, -# delete this exception statement from your version. If you delete this -# exception statement from all source files in the program, then also delete -# it in the license file. -# - -# Commands to set and get change streams configuration options and associated data structures. - -global: - cpp_namespace: "mongo" - -imports: - - "mongo/db/auth/access_checks.idl" - - "mongo/db/auth/action_type.idl" - - "mongo/db/write_concern_options.idl" - - "mongo/idl/basic_types.idl" - -structs: - PreAndPostImagesOptions: - description: "Change streams pre- and post-images options." - fields: - expireAfterSeconds: - description: "The number of seconds after which a pre-image is eligible for - deletion. A string value 'off' enables the default expiration policy." - optional: true - unstable: false - type: - variant: [string, safeInt64] - ChangeStreamOptions: - description: "A specification for the change streams options." - fields: - preAndPostImages: - type: PreAndPostImagesOptions - optional: true - unstable: false - GetChangeStreamOptionsResponse: - description: "A response for the get change streams options command." - chained_structs: - ChangeStreamOptions: ChangeStreamOptions - -commands: - setChangeStreamOptions: - description: "A command to set the change streams options." - command_name: setChangeStreamOptions - namespace: ignored - cpp_name: setChangeStreamOptions - strict: true - api_version: "1" - access_check: - complex: - - check: is_authenticated - - privilege: - resource_pattern: cluster - action_type: setChangeStreamOptions - fields: - preAndPostImages: - description: "The pre- and post-images options to set." - type: PreAndPostImagesOptions - optional: true - unstable: false - writeConcern: - description: "The level of write concern for the command." - type: WriteConcern - optional: true - unstable: false - reply_type: OkReply - getChangeStreamOptions: - description: "A command to get the change streams options." - command_name: getChangeStreamOptions - namespace: ignored - cpp_name: getChangeStreamOptions - strict: true - api_version: "1" - access_check: - complex: - - check: is_authenticated - - privilege: - resource_pattern: cluster - action_type: getChangeStreamOptions - reply_type: GetChangeStreamOptionsResponse
\ No newline at end of file diff --git a/src/mongo/db/commands/change_stream_options_command.cpp b/src/mongo/db/commands/change_stream_options_command.cpp deleted file mode 100644 index 7fad14c0ac7..00000000000 --- a/src/mongo/db/commands/change_stream_options_command.cpp +++ /dev/null @@ -1,264 +0,0 @@ -/** - * Copyright (C) 2022-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand - -#include "mongo/platform/basic.h" - -#include "mongo/db/auth/authorization_checks.h" -#include "mongo/db/auth/authorization_session.h" -#include "mongo/db/change_stream_options_manager.h" -#include "mongo/db/commands.h" -#include "mongo/db/commands/change_stream_options_gen.h" -#include "mongo/db/repl/replication_coordinator.h" -#include "mongo/logv2/log.h" - -namespace mongo { -namespace { -/** - * A versioned command class to set the change streams options. This command should not run - * on 'mongoS'. - */ -class SetChangeStreamOptionsCommand - : public SetChangeStreamOptionsCmdVersion1Gen<SetChangeStreamOptionsCommand> { -public: - AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { - return AllowedOnSecondary::kNever; - } - - bool adminOnly() const final { - return true; - } - - std::string help() const override { - return "Sets change streams configuration options.\n" - "Usage: { setChangeStreamOptions: 1, preAndPostImages: { expireAfterSeconds: " - "<long>|'off'> }, writeConcern: { <write concern> }}"; - } - - class Invocation final : public InvocationBaseGen { - public: - Invocation(OperationContext* opCtx, - const Command* command, - const OpMsgRequest& opMsgRequest) - : InvocationBaseGen(opCtx, command, opMsgRequest) {} - - bool supportsWriteConcern() const final { - return true; - } - - NamespaceString ns() const final { - return NamespaceString(); - } - - Reply typedRun(OperationContext* opCtx) final { - assertCanIssueCommand(opCtx); - return setOptionsAndReply(opCtx); - } - - private: - /** - * A helper to verify if the command can be run. Throws 'uassert' in case of failures. - */ - void assertCanIssueCommand(OperationContext* opCtx) { - const auto replCoord = repl::ReplicationCoordinator::get(opCtx); - - uassert(5869200, - str::stream() << "'" << SetChangeStreamOptions::kCommandName - << "' is not supported on standalone nodes.", - replCoord->isReplEnabled()); - - uassert(5869201, - str::stream() << "'" << SetChangeStreamOptions::kCommandName - << "' is not supported on shard nodes.", - serverGlobalParams.clusterRole != ClusterRole::ShardServer); - - uassert(5869202, - "Expected at least one change stream option to set", - request().getPreAndPostImages()); - } - - /** - * Sets the change streams options using 'changeStreamOptionsManager'. - */ - Reply setOptionsAndReply(OperationContext* opCtx) { - // Create a request object for the 'changeStreamOptionsManager'. - auto optionsToSet = ChangeStreamOptions(); - - if (auto& preAndPostImage = request().getPreAndPostImages()) { - uassert(5869203, - "Expected 'expireAfterSeconds' option", - preAndPostImage->getExpireAfterSeconds()); - - stdx::visit( - visit_helper::Overloaded{ - [&](const std::string& expirationPolicyMode) { - uassert(5869204, - "Non-numeric value of 'expireAfterSeconds' should be 'off'", - expirationPolicyMode == "off"); - }, - [&](const std::int64_t& expiryTime) { - uassert(5869205, - "Numeric value of 'expireAfterSeconds' should be positive", - expiryTime > 0); - }}, - *preAndPostImage->getExpireAfterSeconds()); - - optionsToSet.setPreAndPostImages(preAndPostImage); - } - - // Store the change streams configuration options. - auto& changeStreamOptionManager = ChangeStreamOptionsManager::get(opCtx); - auto status = changeStreamOptionManager.setOptions(opCtx, optionsToSet); - uassert(5869206, - str::stream() << "Failed to set change stream options, status: " - << status.getStatus().codeString(), - status.isOK()); - - return Reply(); - } - - void doCheckAuthorization(OperationContext* opCtx) const override { - uassert(ErrorCodes::Unauthorized, - "Feature flag featureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy " - "must be enabled", - feature_flags::gFeatureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy - .isEnabled(serverGlobalParams.featureCompatibility)); - - uassert(ErrorCodes::Unauthorized, - "Unauthorized", - AuthorizationSession::get(opCtx->getClient()) - ->isAuthorizedForPrivilege(Privilege{ResourcePattern::forClusterResource(), - ActionType::setChangeStreamOptions})); - } - }; -} setChangeStreamOptionsCommand; - -/** - * A versioned command class to get the change streams options. This command should not run on - * 'mongoS'. - */ -class GetChangeStreamOptionsCommand - : public GetChangeStreamOptionsCmdVersion1Gen<GetChangeStreamOptionsCommand> { -public: - AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { - return AllowedOnSecondary::kNever; - } - - bool adminOnly() const override { - return true; - } - - std::string help() const final { - return "Gets change streams configuration options.\n" - "Usage: { getChangeStreamOptions: 1 }"; - } - - class Invocation final : public InvocationBaseGen { - public: - Invocation(OperationContext* opCtx, - const Command* command, - const OpMsgRequest& opMsgRequest) - : InvocationBaseGen(opCtx, command, opMsgRequest) {} - - bool supportsWriteConcern() const final { - return false; - } - - NamespaceString ns() const final { - return NamespaceString(); - } - - Reply typedRun(OperationContext* opCtx) final { - assertCanIssueCommand(opCtx); - return getOptionsAndReply(opCtx); - } - - private: - /** - * A helper to verify if the command can be run. Throws 'uassert' in case of failures. - */ - void assertCanIssueCommand(OperationContext* opCtx) { - const auto replCoord = repl::ReplicationCoordinator::get(opCtx); - - uassert(5869207, - str::stream() << "'" << GetChangeStreamOptions::kCommandName - << "' is not supported on standalone nodes.", - replCoord->isReplEnabled()); - - uassert(5869208, - str::stream() << "'" << GetChangeStreamOptions::kCommandName - << "' is not supported on shard nodes.", - serverGlobalParams.clusterRole != ClusterRole::ShardServer); - } - - /** - * Gets the change streams options from the 'ChangeStreamOptionsManager', creates a response - * from it, and return it back to the client. - */ - Reply getOptionsAndReply(OperationContext* opCtx) { - auto reply = Reply(); - - // Get the change streams options, if present and return it back to the client. - auto& changeStreamOptionManager = ChangeStreamOptionsManager::get(opCtx); - - if (auto changeStreamOptions = changeStreamOptionManager.getOptions(opCtx)) { - if (auto preAndPostImages = changeStreamOptions->getPreAndPostImages()) { - // Add 'expiredAfterSeconds' to the reply message only when the default - // expiration policy is not enabled. A string type of 'expireAfterSeconds' - // signifies that the value it holds is 'off'. - if (preAndPostImages->getExpireAfterSeconds() && - !stdx::holds_alternative<std::string>( - *preAndPostImages->getExpireAfterSeconds())) { - reply.setChangeStreamOptions(std::move(*changeStreamOptions)); - } - } - } - - return reply; - } - - void doCheckAuthorization(OperationContext* opCtx) const override { - uassert(ErrorCodes::Unauthorized, - "Feature flag featureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy " - "must be enabled", - feature_flags::gFeatureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy - .isEnabled(serverGlobalParams.featureCompatibility)); - - uassert(ErrorCodes::Unauthorized, - "Unauthorized", - AuthorizationSession::get(opCtx->getClient()) - ->isAuthorizedForPrivilege(Privilege{ResourcePattern::forClusterResource(), - ActionType::getChangeStreamOptions})); - } - }; -} getChangeStreamOptionsCommand; - -} // namespace -} // namespace mongo diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index bb37a5b15ba..6b54a25e0b8 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -645,8 +645,8 @@ env.CppUnitTest( LIBDEPS=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/auth/authmocks', + '$BUILD_DIR/mongo/db/change_stream_options', '$BUILD_DIR/mongo/db/change_stream_options_manager', - '$BUILD_DIR/mongo/db/commands/change_stream_options', '$BUILD_DIR/mongo/db/cst/cst', '$BUILD_DIR/mongo/db/exec/document_value/document_value', '$BUILD_DIR/mongo/db/exec/document_value/document_value_test_util', diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp index f015f39680c..02a6b932ffd 100644 --- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp +++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp @@ -70,13 +70,14 @@ bool PreImageAttributes::isExpiredPreImage(const boost::optional<Date_t>& preIma return preImageOplogEntryIsDeleted || operationTime <= expirationTime; } -// Get the 'expireAfterSeconds' from the 'ChangeStreamOptions' if present, boost::none otherwise. +// Get the 'expireAfterSeconds' from the 'ChangeStreamOptions' if not 'off', boost::none otherwise. boost::optional<std::int64_t> getExpireAfterSecondsFromChangeStreamOptions( ChangeStreamOptions& changeStreamOptions) { - if (auto preAndPostImages = changeStreamOptions.getPreAndPostImages(); preAndPostImages && - preAndPostImages->getExpireAfterSeconds() && - !stdx::holds_alternative<std::string>(*preAndPostImages->getExpireAfterSeconds())) { - return stdx::get<std::int64_t>(*preAndPostImages->getExpireAfterSeconds()); + const stdx::variant<std::string, std::int64_t>& expireAfterSeconds = + changeStreamOptions.getPreAndPostImages().getExpireAfterSeconds(); + + if (!stdx::holds_alternative<std::string>(expireAfterSeconds)) { + return stdx::get<std::int64_t>(expireAfterSeconds); } return boost::none; @@ -88,9 +89,8 @@ boost::optional<Date_t> getPreImageExpirationTime(OperationContext* opCtx, Date_ boost::optional<std::int64_t> expireAfterSeconds = boost::none; // Get the expiration time directly from the change stream manager. - if (auto changeStreamOptions = ChangeStreamOptionsManager::get(opCtx).getOptions(opCtx)) { - expireAfterSeconds = getExpireAfterSecondsFromChangeStreamOptions(*changeStreamOptions); - } + auto changeStreamOptions = ChangeStreamOptionsManager::get(opCtx).getOptions(opCtx); + expireAfterSeconds = getExpireAfterSecondsFromChangeStreamOptions(changeStreamOptions); // A pre-image is eligible for deletion if: // pre-image's op-time + expireAfterSeconds < currentTime. diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h index 89849552b0a..0ddd491991f 100644 --- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h +++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h @@ -29,7 +29,7 @@ #pragma once -#include "mongo/db/commands/change_stream_options_gen.h" +#include "mongo/db/change_stream_options_gen.h" #include "mongo/db/service_context.h" namespace mongo { diff --git a/src/mongo/db/query/query_feature_flags.idl b/src/mongo/db/query/query_feature_flags.idl index a0d7ce2c14c..4dcbb2382a8 100644 --- a/src/mongo/db/query/query_feature_flags.idl +++ b/src/mongo/db/query/query_feature_flags.idl @@ -124,11 +124,6 @@ feature_flags: default: true version: 6.0 - featureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy: - description: "Feature flag to enable time based retention policy of point-in-time pre- and post-images of documents in change streams" - cpp_varname: gFeatureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy - default: false - featureFlagSBELookupPushdown: description: "Feature flag for allowing SBE $lookup pushdown" cpp_varname: gFeatureFlagSBELookupPushdown diff --git a/src/mongo/idl/SConscript b/src/mongo/idl/SConscript index a63af90f83a..de25a54c950 100644 --- a/src/mongo/idl/SConscript +++ b/src/mongo/idl/SConscript @@ -55,11 +55,10 @@ env.Library( 'cluster_server_parameter.idl', ], LIBDEPS=[ - '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/logical_time', + 'feature_flag', ], LIBDEPS_PRIVATE=[ - 'feature_flag', 'idl_parser', ], ) @@ -87,6 +86,7 @@ env.CppUnitTest( LIBDEPS=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/auth/authmocks', + '$BUILD_DIR/mongo/db/change_stream_options_manager', '$BUILD_DIR/mongo/db/repl/oplog', '$BUILD_DIR/mongo/db/repl/oplog_interface_local', '$BUILD_DIR/mongo/db/repl/replmocks', diff --git a/src/mongo/idl/cluster_server_parameter_op_observer_test.cpp b/src/mongo/idl/cluster_server_parameter_op_observer_test.cpp index 0b4b6b9163f..a55a88980c0 100644 --- a/src/mongo/idl/cluster_server_parameter_op_observer_test.cpp +++ b/src/mongo/idl/cluster_server_parameter_op_observer_test.cpp @@ -31,6 +31,7 @@ #include "mongo/platform/basic.h" +#include "mongo/db/change_stream_options_manager.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_interface_local.h" @@ -154,6 +155,9 @@ public: std::make_unique<repl::ReplicationCoordinatorMock>(service, createReplSettings())); repl::createOplog(opCtx.get()); + // Set up the ChangeStreamOptionsManager so that it can be retrieved/set. + ChangeStreamOptionsManager::create(service); + // Ensure that we are primary. auto replCoord = repl::ReplicationCoordinator::get(opCtx.get()); ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY)); diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 32bb92585ee..7432c3a3b86 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -525,6 +525,7 @@ env.Library( '$BUILD_DIR/mongo/client/remote_command_targeter', '$BUILD_DIR/mongo/db/audit', '$BUILD_DIR/mongo/db/auth/authmongos', + '$BUILD_DIR/mongo/db/change_stream_options_manager', '$BUILD_DIR/mongo/db/commands/rwc_defaults_commands', '$BUILD_DIR/mongo/db/ftdc/ftdc_mongos', '$BUILD_DIR/mongo/db/process_health/fault_manager', diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript index 5cdade782f9..91a07c18245 100644 --- a/src/mongo/s/commands/SConscript +++ b/src/mongo/s/commands/SConscript @@ -33,7 +33,6 @@ env.Library( 'cluster_abort_transaction_cmd_s.cpp', 'cluster_available_query_options_cmd.cpp', 'cluster_build_info.cpp', - 'cluster_change_stream_options_command.cpp', 'cluster_cleanup_reshard_collection_cmd.cpp', 'cluster_coll_stats_cmd.cpp', 'cluster_collection_mod_cmd.cpp', @@ -107,7 +106,7 @@ env.Library( LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/api_parameters', '$BUILD_DIR/mongo/db/auth/auth_checks', - '$BUILD_DIR/mongo/db/commands/change_stream_options', + '$BUILD_DIR/mongo/db/change_stream_options_manager', '$BUILD_DIR/mongo/db/commands/cluster_server_parameter_cmds_idl', '$BUILD_DIR/mongo/db/commands/core', '$BUILD_DIR/mongo/db/commands/create_command', diff --git a/src/mongo/s/commands/cluster_change_stream_options_command.cpp b/src/mongo/s/commands/cluster_change_stream_options_command.cpp deleted file mode 100644 index e91aa657fb1..00000000000 --- a/src/mongo/s/commands/cluster_change_stream_options_command.cpp +++ /dev/null @@ -1,212 +0,0 @@ -/** - * Copyright (C) 2022-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand - -#include "mongo/platform/basic.h" - -#include "mongo/db/auth/authorization_checks.h" -#include "mongo/db/auth/authorization_session.h" -#include "mongo/db/change_stream_options_manager.h" -#include "mongo/db/commands.h" -#include "mongo/db/commands/change_stream_options_gen.h" -#include "mongo/db/read_write_concern_defaults.h" -#include "mongo/db/repl/read_concern_args.h" -#include "mongo/db/repl/replication_coordinator.h" -#include "mongo/logv2/log.h" -#include "mongo/s/cluster_commands_helpers.h" -#include "mongo/s/grid.h" - -namespace mongo { -namespace { - -/** - * Creates a reply object of type 'Reply' from raw 'response' BSON object. Any extra fields are - * removed from the 'response' object and then passed into to the IDL parser to create the response - * object. - */ -template <typename Reply> -Reply createReply(BSONObj response, const std::string& errorContextField) { - constexpr auto kReplData = "$replData"_sd; - constexpr auto kLastCommittedOpTime = "lastCommittedOpTime"_sd; - constexpr auto kClusterTime = "$clusterTime"_sd; - constexpr auto kConfigTime = "$configTime"_sd; - constexpr auto kTopologyTime = "$topologyTime"_sd; - constexpr auto kOperationTime = "operationTime"_sd; - - // A set of fields to be removed from the 'response' object. - StringDataSet ignorableFields({kReplData, - kLastCommittedOpTime, - ErrorReply::kOkFieldName, - kClusterTime, - kConfigTime, - kTopologyTime, - kOperationTime}); - - // Create the reply object from the redacted response object. - return Reply::parse(IDLParserErrorContext(errorContextField), - response.removeFields(ignorableFields)); -} - -class ClusterSetChangeStreamOptionsCommand - : public SetChangeStreamOptionsCmdVersion1Gen<ClusterSetChangeStreamOptionsCommand> { -public: - AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { - return AllowedOnSecondary::kNever; - } - - bool adminOnly() const override { - return true; - } - - std::string help() const override { - return "Sets change streams configuration options.\n" - "Usage: { setChangeStreamOptions: 1, preAndPostImages: { expireAfterSeconds: " - "<long>|'off'> }, writeConcern: { <write concern> }}"; - } - - class Invocation final : public InvocationBaseGen { - public: - Invocation(OperationContext* opCtx, - const Command* command, - const OpMsgRequest& opMsgRequest) - : InvocationBaseGen(opCtx, command, opMsgRequest) {} - - bool supportsWriteConcern() const final { - return true; - } - - NamespaceString ns() const final { - return NamespaceString(); - } - - Reply typedRun(OperationContext* opCtx) final { - // Get the configuration server connection and dispatch the request to it to set the - // change streams options. - auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); - auto cmdResponse = uassertStatusOK(configShard->runCommandWithFixedRetryAttempts( - opCtx, - ReadPreferenceSetting(ReadPreference::PrimaryOnly), - NamespaceString::kAdminDb.toString(), - CommandHelpers::appendMajorityWriteConcern( - CommandHelpers::filterCommandRequestForPassthrough(request().toBSON({})), - opCtx->getWriteConcern()), - Shard::RetryPolicy::kNotIdempotent)); - - uassertStatusOK(cmdResponse.commandStatus); - - return Reply(); - } - - private: - void doCheckAuthorization(OperationContext* opCtx) const override { - uassert(ErrorCodes::Unauthorized, - "Feature flag featureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy " - "must be enabled", - feature_flags::gFeatureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy - .isEnabled(serverGlobalParams.featureCompatibility)); - - uassert(ErrorCodes::Unauthorized, - "Unauthorized", - AuthorizationSession::get(opCtx->getClient()) - ->isAuthorizedForPrivilege(Privilege{ResourcePattern::forClusterResource(), - ActionType::setChangeStreamOptions})); - } - }; -} clusterSetChangeStreamOptionsCommand; - -class ClusterGetChangeStreamOptionsCommand - : public GetChangeStreamOptionsCmdVersion1Gen<ClusterGetChangeStreamOptionsCommand> { -public: - AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { - return AllowedOnSecondary::kNever; - } - bool adminOnly() const override { - return true; - } - - std::string help() const final { - return "Gets change streams configuration options.\n" - "Usage: { getChangeStreamOptions: 1 }"; - } - - class Invocation final : public InvocationBaseGen { - public: - Invocation(OperationContext* opCtx, - const Command* command, - const OpMsgRequest& opMsgRequest) - : InvocationBaseGen(opCtx, command, opMsgRequest) {} - - bool supportsWriteConcern() const final { - return false; - } - - NamespaceString ns() const final { - return NamespaceString(); - } - - Reply typedRun(OperationContext* opCtx) final { - // Get the change streams options from the configuration server. - auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); - auto cmdResponse = uassertStatusOK(configShard->runCommandWithFixedRetryAttempts( - opCtx, - ReadPreferenceSetting(ReadPreference::PrimaryOnly), - NamespaceString::kAdminDb.toString(), - applyReadWriteConcern( - opCtx, - this, - CommandHelpers::filterCommandRequestForPassthrough(request().toBSON({}))), - Shard::RetryPolicy::kIdempotent)); - - uassertStatusOK(cmdResponse.commandStatus); - - // Create the reply object from the configuration server response and return it back to - // the client. - return createReply<Reply>(cmdResponse.response, "ClusterGetChangeStreamOptionsCommand"); - } - - private: - void doCheckAuthorization(OperationContext* opCtx) const override { - uassert(ErrorCodes::Unauthorized, - "Feature flag featureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy " - "must be enabled", - feature_flags::gFeatureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy - .isEnabled(serverGlobalParams.featureCompatibility)); - - uassert(ErrorCodes::Unauthorized, - "Unauthorized", - AuthorizationSession::get(opCtx->getClient()) - ->isAuthorizedForPrivilege(Privilege{ResourcePattern::forClusterResource(), - ActionType::getChangeStreamOptions})); - } - }; -} clusterGetChangeStreamOptionsCommand; - -} // namespace -} // namespace mongo diff --git a/src/mongo/s/mongos_main.cpp b/src/mongo/s/mongos_main.cpp index 7ed6e3d4315..bea3ea0ae4b 100644 --- a/src/mongo/s/mongos_main.cpp +++ b/src/mongo/s/mongos_main.cpp @@ -49,6 +49,7 @@ #include "mongo/db/auth/authorization_manager.h" #include "mongo/db/auth/authz_manager_external_state_s.h" #include "mongo/db/auth/user_cache_invalidator_job.h" +#include "mongo/db/change_stream_options_manager.h" #include "mongo/db/client.h" #include "mongo/db/client_metadata_propagation_egress_hook.h" #include "mongo/db/dbdirectclient.h" @@ -699,6 +700,7 @@ ExitCode runMongosServer(ServiceContext* serviceContext) { } ReadWriteConcernDefaults::create(serviceContext, readWriteConcernDefaultsCacheLookupMongoS); + ChangeStreamOptionsManager::create(serviceContext); auto opCtxHolder = tc->makeOperationContext(); auto const opCtx = opCtxHolder.get(); |