summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVarun Ravichandran <varun.ravichandran@mongodb.com>2022-03-29 21:08:06 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-04-12 01:15:19 +0000
commitceb27a18202caf692f595f978fecc7b416a6f6a2 (patch)
tree8361a978537e25f3a51268fd3d6e534530e9e51c
parent1e9543d5b465e29bf3ee05e21434062cbc9f22f2 (diff)
downloadmongo-ceb27a18202caf692f595f978fecc7b416a6f6a2.tar.gz
SERVER-61802: Create changeStreamOptions cluster server parameter
-rw-r--r--buildscripts/idl/idl/binder.py10
-rw-r--r--buildscripts/idl/idl/errors.py15
-rw-r--r--buildscripts/idl/idl/generator.py4
-rw-r--r--buildscripts/idl/idl_check_compatibility.py15
-rw-r--r--buildscripts/idl/tests/test_binder.py21
-rw-r--r--jstests/core/views/views_all_commands.js4
-rw-r--r--jstests/libs/cluster_server_parameter_utils.js31
-rw-r--r--jstests/noPassthrough/change_stream_options.js183
-rw-r--r--jstests/noPassthrough/change_stream_options_command.js158
-rw-r--r--jstests/noPassthrough/change_stream_pre_image_time_based_expiration_replset.js22
-rw-r--r--jstests/noPassthrough/change_stream_pre_image_time_based_expiration_sharded.js25
-rw-r--r--jstests/noPassthrough/libs/change_stream_pre_image_time_based_expiration_utils.js (renamed from jstests/noPassthrough/change_stream_pre_image_time_based_expiration.js)40
-rw-r--r--jstests/replsets/db_reads_while_recovering_all_commands.js4
-rw-r--r--jstests/sharding/database_versioning_all_commands.js11
-rw-r--r--jstests/sharding/read_write_concern_defaults_application.js6
-rw-r--r--jstests/sharding/safe_secondary_reads_drop_recreate.js5
-rw-r--r--jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js5
-rw-r--r--jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js5
-rw-r--r--src/mongo/db/SConscript14
-rw-r--r--src/mongo/db/auth/action_type.idl2
-rw-r--r--src/mongo/db/auth/builtin_roles.cpp2
-rw-r--r--src/mongo/db/change_stream_options.idl58
-rw-r--r--src/mongo/db/change_stream_options_manager.cpp98
-rw-r--r--src/mongo/db/change_stream_options_manager.h15
-rw-r--r--src/mongo/db/change_stream_options_parameter.idl42
-rw-r--r--src/mongo/db/commands/SConscript14
-rw-r--r--src/mongo/db/commands/change_stream_options.idl102
-rw-r--r--src/mongo/db/commands/change_stream_options_command.cpp264
-rw-r--r--src/mongo/db/pipeline/SConscript2
-rw-r--r--src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp16
-rw-r--r--src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h2
-rw-r--r--src/mongo/db/query/query_feature_flags.idl5
-rw-r--r--src/mongo/idl/SConscript4
-rw-r--r--src/mongo/idl/cluster_server_parameter_op_observer_test.cpp4
-rw-r--r--src/mongo/s/SConscript1
-rw-r--r--src/mongo/s/commands/SConscript3
-rw-r--r--src/mongo/s/commands/cluster_change_stream_options_command.cpp212
-rw-r--r--src/mongo/s/mongos_main.cpp2
38 files changed, 585 insertions, 841 deletions
diff --git a/buildscripts/idl/idl/binder.py b/buildscripts/idl/idl/binder.py
index 1b11d1af4f7..1511b6c1b07 100644
--- a/buildscripts/idl/idl/binder.py
+++ b/buildscripts/idl/idl/binder.py
@@ -438,7 +438,7 @@ def _bind_struct_field(ctxt, ast_field, idl_type):
ast_field.type = _bind_struct_type(struct)
ast_field.type.is_array = isinstance(idl_type, syntax.ArrayType)
- _validate_field_of_type_struct(ctxt, ast_field)
+ _validate_default_of_type_struct(ctxt, ast_field)
def _bind_variant_field(ctxt, ast_field, idl_type):
@@ -747,11 +747,11 @@ def _validate_ignored_field(ctxt, field):
ctxt.add_ignored_field_must_be_empty_error(field, field.name, "default")
-def _validate_field_of_type_struct(ctxt, field):
+def _validate_default_of_type_struct(ctxt, field):
# type: (errors.ParserContext, Union[syntax.Field, ast.Field]) -> None
- """Validate that for fields with a type of struct, no other properties are set."""
- if field.default is not None:
- ctxt.add_struct_field_must_be_empty_error(field, field.name, "default")
+ """Validate that for fields with a type of struct, the only default permitted is true, which causes it to be default-constructed."""
+ if (field.default is not None) and (field.default != "true"):
+ ctxt.add_struct_default_must_be_true_or_empty_error(field, field.name)
def _validate_variant_type(ctxt, syntax_symbol, field):
diff --git a/buildscripts/idl/idl/errors.py b/buildscripts/idl/idl/errors.py
index f1c387df2f7..4efd18797f9 100644
--- a/buildscripts/idl/idl/errors.py
+++ b/buildscripts/idl/idl/errors.py
@@ -63,7 +63,7 @@ ERROR_ID_BAD_BSON_BINDATA_SUBTYPE_TYPE = "ID0015"
ERROR_ID_BAD_BSON_BINDATA_SUBTYPE_VALUE = "ID0016"
ERROR_ID_NO_STRINGDATA = "ID0017"
ERROR_ID_FIELD_MUST_BE_EMPTY_FOR_IGNORED = "ID0018"
-ERROR_ID_FIELD_MUST_BE_EMPTY_FOR_STRUCT = "ID0019"
+ERROR_ID_DEFAULT_MUST_BE_TRUE_OR_EMPTY_FOR_STRUCT = "ID0019"
ERROR_ID_CUSTOM_SCALAR_SERIALIZATION_NOT_SUPPORTED = "ID0020"
ERROR_ID_BAD_ANY_TYPE_USE = "ID0021"
ERROR_ID_BAD_NUMERIC_CPP_TYPE = "ID0022"
@@ -462,14 +462,13 @@ class ParserContext(object):
("Field '%s' cannot contain a value for property '%s' when a field is marked as ignored"
) % (name, field_name))
- def add_struct_field_must_be_empty_error(self, location, name, field_name):
- # type: (common.SourceLocation, str, str) -> None
- """Add an error about field must be empty for fields of type struct."""
+ def add_struct_default_must_be_true_or_empty_error(self, location, name):
+ # type: (common.SourceLocation, str) -> None
+ """Add an error about default must be True or empty for fields of type struct."""
# pylint: disable=invalid-name
- self._add_error(
- location, ERROR_ID_FIELD_MUST_BE_EMPTY_FOR_STRUCT,
- ("Field '%s' cannot contain a value for property '%s' when a field's type is a struct")
- % (name, field_name))
+ self._add_error(location, ERROR_ID_DEFAULT_MUST_BE_TRUE_OR_EMPTY_FOR_STRUCT, (
+ "Field '%s' can only contain value 'true' for property 'default' when a field's type is a struct"
+ ) % (name))
def add_not_custom_scalar_serialization_not_supported_error(self, location, ast_type,
ast_parent, bson_type_name):
diff --git a/buildscripts/idl/idl/generator.py b/buildscripts/idl/idl/generator.py
index 6c3bc4948cc..d8b38d51a0c 100644
--- a/buildscripts/idl/idl/generator.py
+++ b/buildscripts/idl/idl/generator.py
@@ -617,10 +617,14 @@ class _CppHeaderFileWriter(_CppFileWriterBase):
member_type = cpp_type_info.get_storage_type()
member_name = _get_field_member_name(field)
+ # Struct fields are allowed to specify default: true so that the member gets default-
+ # constructed.
if field.default and not field.constructed:
if field.type.is_enum:
self._writer.write_line('%s %s{%s::%s};' % (member_type, member_name,
field.type.cpp_type, field.default))
+ elif field.type.is_struct:
+ self._writer.write_line('%s %s;' % (member_type, member_name))
else:
self._writer.write_line('%s %s{%s};' % (member_type, member_name, field.default))
else:
diff --git a/buildscripts/idl/idl_check_compatibility.py b/buildscripts/idl/idl_check_compatibility.py
index d34dbc83409..9aab17079f3 100644
--- a/buildscripts/idl/idl_check_compatibility.py
+++ b/buildscripts/idl/idl_check_compatibility.py
@@ -191,6 +191,15 @@ SKIPPED_FILES = [
"nsICollation.idl", "nsIStringBundle.idl", "nsIScriptableUConv.idl", "nsITextToSubURI.idl"
]
+# Do not add commands that were visible to users in previously released versions.
+IGNORE_COMMANDS_LIST: List[str] = [
+ # The following commands were released behind a feature flag in 5.3 but were shelved in
+ # favor of getClusterParameter and setClusterParameter. Since the feature flag was not enabled
+ # in 5.3, they were effectively unusable and so can be safely removed from the strict API.
+ 'getChangeStreamOptions',
+ 'setChangeStreamOptions',
+]
+
class FieldCompatibility:
"""Information about a Field to check compatibility."""
@@ -1193,6 +1202,12 @@ def check_compatibility(old_idl_dir: str, new_idl_dir: str, old_import_directori
if old_cmd.api_version == "" or old_cmd.imported:
continue
+ # Ignore select commands that were removed after being added to the strict API.
+ # Only commands that were never visible to the end-user in previous releases
+ # (i.e., hidden behind a feature flag) should be allowed here.
+ if old_cmd.command_name in IGNORE_COMMANDS_LIST:
+ continue
+
if old_cmd.api_version != "1":
# We're not ready to handle future API versions yet.
ctxt.add_command_invalid_api_version_error(
diff --git a/buildscripts/idl/tests/test_binder.py b/buildscripts/idl/tests/test_binder.py
index 2ce3dd45559..b52c755e34b 100644
--- a/buildscripts/idl/tests/test_binder.py
+++ b/buildscripts/idl/tests/test_binder.py
@@ -942,6 +942,23 @@ class TestBinder(testcase.IDLTestcase):
always_serialize: true
"""))
+ # Test field of a struct type with default=true
+ self.assert_bind(test_preamble + textwrap.dedent("""
+ structs:
+ foo:
+ description: foo
+ fields:
+ field1: string
+
+ bar:
+ description: foo
+ fields:
+ field2:
+ type: foo
+ default: true
+
+ """))
+
def test_field_negative(self):
# type: () -> None
"""Negative field tests."""
@@ -964,7 +981,7 @@ class TestBinder(testcase.IDLTestcase):
bindata_subtype: uuid
""")
- # Test field of a struct type with a default
+ # Test field of a struct type with a non-true default
self.assert_bind_fail(
test_preamble + textwrap.dedent("""
structs:
@@ -980,7 +997,7 @@ class TestBinder(testcase.IDLTestcase):
type: foo
default: foo
- """), idl.errors.ERROR_ID_FIELD_MUST_BE_EMPTY_FOR_STRUCT)
+ """), idl.errors.ERROR_ID_DEFAULT_MUST_BE_TRUE_OR_EMPTY_FOR_STRUCT)
# Test array as field name
self.assert_bind_fail(
diff --git a/jstests/core/views/views_all_commands.js b/jstests/core/views/views_all_commands.js
index 8b5882ecd30..4bded0416d6 100644
--- a/jstests/core/views/views_all_commands.js
+++ b/jstests/core/views/views_all_commands.js
@@ -364,7 +364,7 @@ let viewsCommandTests = {
fsyncUnlock: {skip: isUnrelated},
getAuditConfig: {skip: isUnrelated},
getDatabaseVersion: {skip: isUnrelated},
- getChangeStreamOptions: {skip: isUnrelated},
+ getChangeStreamOptions: {skip: isUnrelated}, // TODO SERVER-65353 remove in 6.1.
getClusterParameter: {skip: isUnrelated},
getCmdLineOpts: {skip: isUnrelated},
getDefaultRWConcern: {skip: isUnrelated},
@@ -605,7 +605,7 @@ let viewsCommandTests = {
saslStart: {skip: isUnrelated},
sbe: {skip: isAnInternalCommand},
serverStatus: {command: {serverStatus: 1}, skip: isUnrelated},
- setChangeStreamOptions: {skip: isUnrelated},
+ setChangeStreamOptions: {skip: isUnrelated}, // TODO SERVER-65353 remove in 6.1.
setIndexCommitQuorum: {skip: isUnrelated},
setAuditConfig: {skip: isUnrelated},
setCommittedSnapshot: {skip: isAnInternalCommand},
diff --git a/jstests/libs/cluster_server_parameter_utils.js b/jstests/libs/cluster_server_parameter_utils.js
index 4e778e321f6..4b9e71ef4ef 100644
--- a/jstests/libs/cluster_server_parameter_utils.js
+++ b/jstests/libs/cluster_server_parameter_utils.js
@@ -11,8 +11,12 @@
*
*/
-const clusterParameterNames =
- ["testStrClusterParameter", "testIntClusterParameter", "testBoolClusterParameter"];
+const clusterParameterNames = [
+ "testStrClusterParameter",
+ "testIntClusterParameter",
+ "testBoolClusterParameter",
+ "changeStreamOptions"
+];
const clusterParametersDefault = [
{
_id: "testStrClusterParameter",
@@ -25,6 +29,12 @@ const clusterParametersDefault = [
{
_id: "testBoolClusterParameter",
boolData: false,
+ },
+ {
+ _id: "changeStreamOptions",
+ preAndPostImages: {
+ expireAfterSeconds: "off",
+ },
}
];
@@ -40,6 +50,12 @@ const clusterParametersInsert = [
{
_id: "testBoolClusterParameter",
boolData: true,
+ },
+ {
+ _id: "changeStreamOptions",
+ preAndPostImages: {
+ expireAfterSeconds: 30,
+ },
}
];
@@ -55,6 +71,12 @@ const clusterParametersUpdate = [
{
_id: "testBoolClusterParameter",
boolData: false,
+ },
+ {
+ _id: "changeStreamOptions",
+ preAndPostImages: {
+ expireAfterSeconds: "off",
+ },
}
];
@@ -226,6 +248,8 @@ function testValidClusterParameterCommands(conn) {
function testInvalidGetClusterParameter(conn) {
const adminDB = conn.getDB('admin');
// Assert that specifying a nonexistent parameter returns an error.
+ assert.commandFailed(
+ adminDB.runCommand({setClusterParameter: {nonexistentParam: {intData: 5}}}));
assert.commandFailedWithCode(adminDB.runCommand({getClusterParameter: "nonexistentParam"}),
ErrorCodes.NoSuchKey);
assert.commandFailedWithCode(adminDB.runCommand({getClusterParameter: ["nonexistentParam"]}),
@@ -233,6 +257,9 @@ function testInvalidGetClusterParameter(conn) {
assert.commandFailedWithCode(
adminDB.runCommand({getClusterParameter: ["testIntClusterParameter", "nonexistentParam"]}),
ErrorCodes.NoSuchKey);
+
+ // Assert that specifying a known parameter with a scalar value fails.
+ assert.commandFailed(adminDB.runCommand({setClusterParameter: {testIntClusterParameter: 5}}));
}
// Tests that invalid uses of set/getClusterParameter fail with the appropriate errors.
diff --git a/jstests/noPassthrough/change_stream_options.js b/jstests/noPassthrough/change_stream_options.js
new file mode 100644
index 00000000000..abc518b9f69
--- /dev/null
+++ b/jstests/noPassthrough/change_stream_options.js
@@ -0,0 +1,183 @@
+// Tests setClusterParameter and getClusterParameter for changeStreamOptions on standalone, replica
+// set and sharded cluster configurations.
+// @tags: [
+// requires_fcv_60,
+// featureFlagClusterWideConfig,
+// ]
+(function() {
+"use strict";
+
+const testDBName = jsTestName();
+
+// Tests set and get change stream options command with 'admin' database.
+function testChangeStreamOptionsWithAdminDB(conn) {
+ const adminDB = conn.getDB("admin");
+
+ // A set command without any parameter options should fail.
+ assert.commandFailedWithCode(
+ adminDB.runCommand({setClusterParameter: {changeStreamOptions: {}}}), ErrorCodes.BadValue);
+
+ // A set request with empty 'preAndPostImages' should fail.
+ assert.commandFailedWithCode(
+ adminDB.runCommand({setClusterParameter: {changeStreamOptions: {preAndPostImages: {}}}}),
+ ErrorCodes.BadValue);
+
+ // An invalid string value of 'expireAfterSeconds' should fail.
+ assert.commandFailedWithCode(adminDB.runCommand({
+ setClusterParameter:
+ {changeStreamOptions: {preAndPostImages: {expireAfterSeconds: "unknown"}}}
+ }),
+ ErrorCodes.BadValue);
+
+ // A negative value of 'expireAfterSeconds' should fail.
+ assert.commandFailedWithCode(adminDB.runCommand({
+ setClusterParameter: {changeStreamOptions: {preAndPostImages: {expireAfterSeconds: -1}}}
+ }),
+ ErrorCodes.BadValue);
+
+ // A zero value of 'expireAfterSeconds' should fail.
+ assert.commandFailedWithCode(adminDB.runCommand({
+ setClusterParameter: {changeStreamOptions: {preAndPostImages: {expireAfterSeconds: 0}}}
+ }),
+ ErrorCodes.BadValue);
+
+ // Disable purging of expired pre- and post-images and validate that the get change stream
+ // options retrieves the expected options.
+ assert.commandWorked(adminDB.runCommand({
+ setClusterParameter: {changeStreamOptions: {preAndPostImages: {expireAfterSeconds: "off"}}}
+ }));
+ const response1 =
+ assert.commandWorked(adminDB.runCommand({getClusterParameter: "changeStreamOptions"}));
+ assert.eq(
+ response1.clusterParameters[0].preAndPostImages, {expireAfterSeconds: "off"}, response1);
+
+ // Set the expiration time for pre- and post-images and validate get change stream options
+ // command.
+ assert.commandWorked(adminDB.runCommand({
+ setClusterParameter:
+ {changeStreamOptions: {preAndPostImages: {expireAfterSeconds: NumberLong(10)}}}
+ }));
+ const response2 =
+ assert.commandWorked(adminDB.runCommand({getClusterParameter: "changeStreamOptions"}));
+ assert.eq(response2.clusterParameters[0].preAndPostImages,
+ {expireAfterSeconds: NumberLong(10)},
+ response2);
+}
+
+// Tests the set and get change stream options on the standalone configuration.
+(function testChangeStreamOptionsOnStandalone() {
+ const standalone = MongoRunner.runMongod();
+ const adminDB = standalone.getDB("admin");
+
+ // Verify that the set and get commands cannot be issued on a standalone server.
+ assert.commandFailedWithCode(adminDB.runCommand({
+ setClusterParameter:
+ {changeStreamOptions: {preAndPostImages: {expireAfterSeconds: NumberLong(10)}}}
+ }),
+ ErrorCodes.IllegalOperation);
+ assert.commandFailedWithCode(adminDB.runCommand({getClusterParameter: "changeStreamOptions"}),
+ ErrorCodes.IllegalOperation);
+
+ MongoRunner.stopMongod(standalone);
+})();
+
+// Tests the set and get change stream options on the replica-set.
+(function testChangeStreamOptionsOnReplicaSet() {
+ const replSetTest = new ReplSetTest({name: "replSet", nodes: 2});
+ replSetTest.startSet();
+ replSetTest.initiate();
+
+ const primary = replSetTest.getPrimary();
+ const secondary = replSetTest.getSecondaries()[0];
+
+ // Verify that the set and get commands cannot be issued on database other than the 'admin'.
+ [primary, secondary].forEach(conn => {
+ assert.commandFailedWithCode(conn.getDB(testDBName).runCommand({
+ setClusterParameter:
+ {changeStreamOptions: {preAndPostImages: {expireAfterSeconds: NumberLong(10)}}}
+ }),
+ ErrorCodes.Unauthorized);
+
+ assert.commandFailedWithCode(
+ conn.getDB(testDBName).runCommand({getClusterParameter: "changeStreamOptions"}),
+ ErrorCodes.Unauthorized);
+ });
+
+ // Tests the set and get commands on the primary node.
+ testChangeStreamOptionsWithAdminDB(primary);
+
+ replSetTest.stopSet();
+})();
+
+// Tests the set and get change stream options on the sharded cluster.
+(function testChangeStreamOptionsOnShardedCluster() {
+ const shardingTest = new ShardingTest({shards: 1, mongos: 1});
+ const adminDB = shardingTest.rs0.getPrimary().getDB("admin");
+
+ // Test that setClusterParameter cannot be issued directly on shards in the sharded cluster,
+ // while getClusterParameter can.
+ assert.commandFailedWithCode(adminDB.runCommand({
+ setClusterParameter:
+ {changeStreamOptions: {preAndPostImages: {expireAfterSeconds: NumberLong(10)}}}
+ }),
+ ErrorCodes.NotImplemented);
+ assert.commandWorked(adminDB.runCommand({getClusterParameter: "changeStreamOptions"}));
+
+ // Run the set and get commands on the mongoS.
+ testChangeStreamOptionsWithAdminDB(shardingTest.s);
+
+ shardingTest.stop();
+})();
+
+// Tests that setClusterParameter and getClusterParameter can only be executed by user with
+// privilege actions 'setClusterParameter' and 'getClusterParameter' respectively.
+(function testClusterParameterChangeStreamOptionsForAuthorization() {
+ const replSetTest =
+ new ReplSetTest({name: "shard", nodes: 1, useHostName: true, waitForKeys: false});
+ replSetTest.startSet({keyFile: "jstests/libs/key1"});
+ replSetTest.initiate();
+
+ const primary = replSetTest.getPrimary();
+
+ // Create a user with admin role on 'admin' database.
+ primary.getDB("admin").createUser({
+ user: "adminUser",
+ pwd: "adminUser",
+ roles: [{role: "userAdminAnyDatabase", db: "admin"}]
+ });
+
+ // Verify that the admin user is unauthorized to execute set and get change stream options
+ // command.
+ assert(primary.getDB("admin").auth("adminUser", "adminUser"));
+ assert.commandFailedWithCode(primary.getDB("admin").runCommand({
+ setClusterParameter:
+ {changeStreamOptions: {preAndPostImages: {expireAfterSeconds: NumberLong(10)}}}
+ }),
+ ErrorCodes.Unauthorized);
+ assert.commandFailedWithCode(
+ primary.getDB("admin").runCommand({getClusterParameter: "changeStreamOptions"}),
+ ErrorCodes.Unauthorized);
+
+ // Create a user with cluster admin role on 'admin' database.
+ primary.getDB("admin").createUser({
+ user: "clusterManager",
+ pwd: "clusterManager",
+ roles: [{role: "clusterManager", db: "admin"}]
+ });
+
+ primary.getDB("admin").logout();
+
+ // Verify that the cluster manager user is authorized to execute set and get change stream
+ // options command.
+ assert(primary.getDB("admin").auth("clusterManager", "clusterManager"));
+ assert.commandWorked(primary.getDB("admin").runCommand({
+ setClusterParameter:
+ {changeStreamOptions: {preAndPostImages: {expireAfterSeconds: NumberLong(10)}}}
+ }));
+ assert.commandWorked(
+ primary.getDB("admin").runCommand({getClusterParameter: "changeStreamOptions"}));
+ primary.getDB("admin").logout();
+
+ replSetTest.stopSet();
+})();
+}());
diff --git a/jstests/noPassthrough/change_stream_options_command.js b/jstests/noPassthrough/change_stream_options_command.js
deleted file mode 100644
index 69d2cab6c8c..00000000000
--- a/jstests/noPassthrough/change_stream_options_command.js
+++ /dev/null
@@ -1,158 +0,0 @@
-// Tests commands to set and get change stream options on standalone, replica set and sharded
-// cluster configuration.
-// @tags: [
-// requires_fcv_53,
-// featureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy,
-// ]
-(function() {
-"use strict";
-
-const testDBName = jsTestName();
-
-// Tests set and get change stream options command with 'admin' database.
-function testChangeStreamOptionsWithAdminDB(conn) {
- const adminDB = conn.getDB("admin");
-
- // A set command without any parameter options should fail.
- assert.commandFailedWithCode(adminDB.runCommand({setChangeStreamOptions: 1}), 5869202);
-
- // A set request with empty 'preAndPostImages' should fail.
- assert.commandFailedWithCode(
- adminDB.runCommand({setChangeStreamOptions: 1, preAndPostImages: {}}), 5869203);
-
- // An invalid string value of 'expireAfterSeconds' should fail.
- assert.commandFailedWithCode(
- adminDB.runCommand(
- {setChangeStreamOptions: 1, preAndPostImages: {expireAfterSeconds: "unknown"}}),
- 5869204);
-
- // A negative value of 'expireAfterSeconds' should fail.
- assert.commandFailedWithCode(
- adminDB.runCommand({setChangeStreamOptions: 1, preAndPostImages: {expireAfterSeconds: -1}}),
- 5869205);
-
- // A zero value of 'expireAfterSeconds' should fail.
- assert.commandFailedWithCode(
- adminDB.runCommand({setChangeStreamOptions: 1, preAndPostImages: {expireAfterSeconds: 0}}),
- 5869205);
-
- // Disable purging of expired pre- and post-images and validate that the get change stream
- // options retrieves the expected options.
- assert.commandWorked(adminDB.runCommand(
- {setChangeStreamOptions: 1, preAndPostImages: {expireAfterSeconds: "off"}}));
- const response1 = assert.commandWorked(adminDB.runCommand({getChangeStreamOptions: 1}));
- assert.neq(response1.hasOwnProperty("preAndPostImages"), response1);
-
- // Set the expiration time for pre- and post-images and validate get change stream options
- // command.
- assert.commandWorked(adminDB.runCommand(
- {setChangeStreamOptions: 1, preAndPostImages: {expireAfterSeconds: 10}}));
- const response2 = assert.commandWorked(adminDB.runCommand({getChangeStreamOptions: 1}));
- assert.eq(response2.preAndPostImages, {expireAfterSeconds: NumberLong(10)}, response2);
-}
-
-// Tests the set and get change stream options on the standalone configuration.
-(function testChangeStreamOptionsOnStandalone() {
- const standalone = MongoRunner.runMongod();
- const adminDB = standalone.getDB("admin");
-
- // Verify that the set and get commands cannot be issued on a standalone server.
- assert.commandFailedWithCode(
- adminDB.runCommand({setChangeStreamOptions: 1, preAndPostImages: {expireAfterSeconds: 10}}),
- 5869200);
- assert.commandFailedWithCode(adminDB.runCommand({getChangeStreamOptions: 1}), 5869207);
-
- MongoRunner.stopMongod(standalone);
-})();
-
-// Tests the set and get change stream options on the replica-set.
-(function testChangeStreamOptionsOnReplicaSet() {
- const replSetTest = new ReplSetTest({name: "replSet", nodes: 2});
- replSetTest.startSet();
- replSetTest.initiate();
-
- const primary = replSetTest.getPrimary();
- const secondary = replSetTest.getSecondaries()[0];
-
- // Verify that the set and get commands cannot be issued on database other than the 'admin'.
- [primary, secondary].forEach(conn => {
- assert.commandFailedWithCode(conn.getDB(testDBName).runCommand({
- setChangeStreamOptions: 1,
- preAndPostImages: {expireAfterSeconds: 10}
- }),
- ErrorCodes.Unauthorized);
-
- assert.commandFailedWithCode(conn.getDB(testDBName).runCommand({getChangeStreamOptions: 1}),
- ErrorCodes.Unauthorized);
- });
-
- // Tests the set and get commands on the primary node.
- testChangeStreamOptionsWithAdminDB(primary);
-
- replSetTest.stopSet();
-})();
-
-// Tests the set and get change stream options on the sharded cluster.
-(function testChangeStreamOptionsOnShardedCluster() {
- const shardingTest = new ShardingTest({shards: 1, mongos: 1});
- const adminDB = shardingTest.rs0.getPrimary().getDB("admin");
-
- // Test that set and get commands cannot be issued directly on shards in the sharded cluster.
- assert.commandFailedWithCode(
- adminDB.runCommand({setChangeStreamOptions: 1, preAndPostImages: {expireAfterSeconds: 10}}),
- 5869201);
- assert.commandFailedWithCode(adminDB.runCommand({getChangeStreamOptions: 1}), 5869208);
-
- // Run the set and get commands on the mongoS.
- testChangeStreamOptionsWithAdminDB(shardingTest.s);
-
- shardingTest.stop();
-})();
-
-// Tests that set and get change stream options command can only be executed by user with privilege
-// actions 'setChangeStreamOptions' and 'getChangeStreamOptions' respectively.
-(function testChangeStreamOptionsForAuthorization() {
- const replSetTest =
- new ReplSetTest({name: "shard", nodes: 1, useHostName: true, waitForKeys: false});
- replSetTest.startSet({keyFile: "jstests/libs/key1"});
- replSetTest.initiate();
-
- const primary = replSetTest.getPrimary();
-
- // Create a user with admin role on 'admin' database.
- primary.getDB("admin").createUser({
- user: "adminUser",
- pwd: "adminUser",
- roles: [{role: "userAdminAnyDatabase", db: "admin"}]
- });
-
- // Verify that the admin user is unauthorized to execute set and get change stream options
- // command.
- assert(primary.getDB("admin").auth("adminUser", "adminUser"));
- assert.commandFailedWithCode(
- primary.getDB("admin").runCommand(
- {setChangeStreamOptions: 1, preAndPostImages: {expireAfterSeconds: 10}}),
- ErrorCodes.Unauthorized);
- assert.commandFailedWithCode(primary.getDB("admin").runCommand({getChangeStreamOptions: 1}),
- ErrorCodes.Unauthorized);
-
- // Create a user with cluster admin role on 'admin' database.
- primary.getDB("admin").createUser({
- user: "clusterManager",
- pwd: "clusterManager",
- roles: [{role: "clusterManager", db: "admin"}]
- });
-
- primary.getDB("admin").logout();
-
- // Verify that the cluster manager user is authorized to execute set and get change stream
- // options command.
- assert(primary.getDB("admin").auth("clusterManager", "clusterManager"));
- assert.commandWorked(primary.getDB("admin").runCommand(
- {setChangeStreamOptions: 1, preAndPostImages: {expireAfterSeconds: 10}}));
- assert.commandWorked(primary.getDB("admin").runCommand({getChangeStreamOptions: 1}));
- primary.getDB("admin").logout();
-
- replSetTest.stopSet();
-})();
-}());
diff --git a/jstests/noPassthrough/change_stream_pre_image_time_based_expiration_replset.js b/jstests/noPassthrough/change_stream_pre_image_time_based_expiration_replset.js
new file mode 100644
index 00000000000..b135a5acb6b
--- /dev/null
+++ b/jstests/noPassthrough/change_stream_pre_image_time_based_expiration_replset.js
@@ -0,0 +1,22 @@
+// Tests time-based pre-image retention policy of change stream pre-images remover job.
+// @tags: [
+// requires_fcv_60,
+// featureFlagClusterWideConfig,
+// ]
+(function() {
+"use strict";
+
+load("jstests/noPassthrough/libs/change_stream_pre_image_time_based_expiration_utils.js");
+
+// Tests pre-image time based expiration on a replica-set.
+(function testChangeStreamPreImagesforTimeBasedExpirationOnReplicaSet() {
+ const replSetTest = new ReplSetTest({name: "replSet", nodes: 3});
+ replSetTest.startSet();
+ replSetTest.initiate();
+
+ const conn = replSetTest.getPrimary();
+ const primary = replSetTest.getPrimary();
+ testTimeBasedPreImageRetentionPolicy(conn, primary);
+ replSetTest.stopSet();
+})();
+}());
diff --git a/jstests/noPassthrough/change_stream_pre_image_time_based_expiration_sharded.js b/jstests/noPassthrough/change_stream_pre_image_time_based_expiration_sharded.js
new file mode 100644
index 00000000000..f42d8e3b5a2
--- /dev/null
+++ b/jstests/noPassthrough/change_stream_pre_image_time_based_expiration_sharded.js
@@ -0,0 +1,25 @@
+// Tests time-based pre-image retention policy of change stream pre-images remover job.
+// @tags: [
+// requires_fcv_60,
+// featureFlagClusterWideConfig,
+// ]
+(function() {
+"use strict";
+
+load("jstests/noPassthrough/libs/change_stream_pre_image_time_based_expiration_utils.js");
+
+// Tests pre-image time-based expiration on a sharded cluster.
+(function testChangeStreamPreImagesforTimeBasedExpirationOnShardedCluster() {
+ const options = {
+ mongos: 1,
+ config: 1,
+ shards: 1,
+ rs: {
+ nodes: 3,
+ },
+ };
+ const st = new ShardingTest(options);
+ testTimeBasedPreImageRetentionPolicy(st.s0, st.rs0.getPrimary());
+ st.stop();
+})();
+}());
diff --git a/jstests/noPassthrough/change_stream_pre_image_time_based_expiration.js b/jstests/noPassthrough/libs/change_stream_pre_image_time_based_expiration_utils.js
index c993c82ead5..0a2073a0716 100644
--- a/jstests/noPassthrough/change_stream_pre_image_time_based_expiration.js
+++ b/jstests/noPassthrough/libs/change_stream_pre_image_time_based_expiration_utils.js
@@ -1,11 +1,4 @@
-// Tests time-based pre-image retention policy of change stream pre-images remover job.
-// @tags: [
-// requires_fcv_60,
-// featureFlagChangeStreamPreAndPostImages,
-// featureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy,
-// ]
-(function() {
-"use strict";
+// Library functions for change_stream_pre_image_time_based_expiration tests.
load("jstests/libs/fail_point_util.js"); // For configureFailPoint.
@@ -32,6 +25,10 @@ function verifyPreImages(preImageColl, expectedPreImages, collectionsInfo) {
}
// Tests time-based change stream pre-image retention policy.
+// When run on a replica set, both 'conn' and 'primary' store connections to the
+// replica set primary node.
+// When run on a sharded cluster, 'conn' represents the connection to the mongos while
+// 'primary' represents the connection to the shard primary node.
function testTimeBasedPreImageRetentionPolicy(conn, primary) {
// Status for pre-images that define if pre-image is expected to expire or not.
const shouldExpire = "shouldExpire";
@@ -55,7 +52,7 @@ function testTimeBasedPreImageRetentionPolicy(conn, primary) {
];
const collectionCount = docsStatePerCollection.length;
- const testDB = conn.getDB("test");
+ const testDB = primary.getDB("test");
// Create several collections with pre- and post-images enabled.
for (let collIdx = 0; collIdx < collectionCount; collIdx++) {
@@ -75,8 +72,9 @@ function testTimeBasedPreImageRetentionPolicy(conn, primary) {
});
// Disable pre-image time-based expiration policy.
- assert.commandWorked(conn.getDB("admin").runCommand(
- {setChangeStreamOptions: 1, preAndPostImages: {expireAfterSeconds: "off"}}));
+ assert.commandWorked(conn.getDB("admin").runCommand({
+ setClusterParameter: {changeStreamOptions: {preAndPostImages: {expireAfterSeconds: "off"}}}
+ }));
let shouldRetainDocs = [];
let shouldExpireDocs = [];
@@ -161,8 +159,10 @@ function testTimeBasedPreImageRetentionPolicy(conn, primary) {
verifyPreImages(preImageColl, allDocs, collectionsInfo);
// Enable time-based pre-image expiration and configure the 'expireAfterSeconds' to 1 seconds.
- assert.commandWorked(conn.getDB("admin").runCommand(
- {setChangeStreamOptions: 1, preAndPostImages: {expireAfterSeconds: expireAfterSeconds}}));
+ assert.commandWorked(conn.getDB("admin").runCommand({
+ setClusterParameter:
+ {changeStreamOptions: {preAndPostImages: {expireAfterSeconds: expireAfterSeconds}}}
+ }));
// Verify that at some point in time, all expired pre-images will be deleted.
assert.soon(() => {
@@ -175,17 +175,3 @@ function testTimeBasedPreImageRetentionPolicy(conn, primary) {
currentTimeFailPoint.off();
}
-
-// Tests pre-image time based expiration on a replica-set.
-// TODO SERVER-61802: Add test cases for shared cluster.
-(function testChangeStreamPreImagesforTimeBasedExpirationOnReplicaSet() {
- const replSetTest = new ReplSetTest({name: "replSet", nodes: 1});
- replSetTest.startSet();
- replSetTest.initiate();
-
- const conn = replSetTest.getPrimary();
- const primary = replSetTest.getPrimary();
- testTimeBasedPreImageRetentionPolicy(conn, primary);
- replSetTest.stopSet();
-})();
-}());
diff --git a/jstests/replsets/db_reads_while_recovering_all_commands.js b/jstests/replsets/db_reads_while_recovering_all_commands.js
index d5000e2deef..b6c1fcdc778 100644
--- a/jstests/replsets/db_reads_while_recovering_all_commands.js
+++ b/jstests/replsets/db_reads_while_recovering_all_commands.js
@@ -215,7 +215,7 @@ const allCommands = {
fsync: {skip: isNotAUserDataRead},
fsyncUnlock: {skip: isNotAUserDataRead},
getAuditConfig: {skip: isNotAUserDataRead},
- getChangeStreamOptions: {skip: isNotAUserDataRead},
+ getChangeStreamOptions: {skip: isNotAUserDataRead}, // TODO SERVER-65353 remove in 6.1.
getClusterParameter: {skip: isNotAUserDataRead},
getCmdLineOpts: {skip: isNotAUserDataRead},
getDatabaseVersion: {skip: isNotAUserDataRead},
@@ -332,7 +332,7 @@ const allCommands = {
sbe: {skip: isAnInternalCommand},
serverStatus: {skip: isNotAUserDataRead},
setAuditConfig: {skip: isNotAUserDataRead},
- setChangeStreamOptions: {skip: isPrimaryOnly},
+ setChangeStreamOptions: {skip: isPrimaryOnly}, // TODO SERVER-65353 remove in 6.1.
setCommittedSnapshot: {skip: isNotAUserDataRead},
setDefaultRWConcern: {skip: isPrimaryOnly},
setIndexCommitQuorum: {skip: isPrimaryOnly},
diff --git a/jstests/sharding/database_versioning_all_commands.js b/jstests/sharding/database_versioning_all_commands.js
index 5127c7836ad..fe7c409f524 100644
--- a/jstests/sharding/database_versioning_all_commands.js
+++ b/jstests/sharding/database_versioning_all_commands.js
@@ -456,8 +456,10 @@ let testCases = {
flushRouterConfig: {skip: "executes locally on mongos (not sent to any remote node)"},
fsync: {skip: "broadcast to all shards"},
getAuditConfig: {skip: "not on a user database", conditional: true},
- getChangeStreamOptions:
- {skip: "executes locally on mongos (not sent to any remote node)", conditional: true},
+ getChangeStreamOptions: {
+ skip: "executes locally on mongos (not sent to any remote node)",
+ conditional: true
+ }, // TODO SERVER-65353 remove in 6.1.
getClusterParameter: {skip: "always targets the config server"},
getCmdLineOpts: {skip: "executes locally on mongos (not sent to any remote node)"},
getDefaultRWConcern: {skip: "executes locally on mongos (not sent to any remote node)"},
@@ -626,7 +628,10 @@ let testCases = {
serverStatus: {skip: "executes locally on mongos (not sent to any remote node)"},
setAllowMigrations: {skip: "not on a user database"},
setAuditConfig: {skip: "not on a user database", conditional: true},
- setChangeStreamOptions: {skip: "always targets the config server", conditional: true},
+ setChangeStreamOptions: {
+ skip: "always targets the config server",
+ conditional: true
+ }, // TODO SERVER-65353 remove in 6.1.
setDefaultRWConcern: {skip: "always targets the config server"},
setIndexCommitQuorum: {
run: {
diff --git a/jstests/sharding/read_write_concern_defaults_application.js b/jstests/sharding/read_write_concern_defaults_application.js
index 7f495cbe2a4..96d0ab9f4d0 100644
--- a/jstests/sharding/read_write_concern_defaults_application.js
+++ b/jstests/sharding/read_write_concern_defaults_application.js
@@ -477,7 +477,8 @@ let testCases = {
fsync: {skip: "does not accept read or write concern"},
fsyncUnlock: {skip: "does not accept read or write concern"},
getAuditConfig: {skip: "does not accept read or write concern"},
- getChangeStreamOptions: {skip: "TODO PM-2502"},
+ getChangeStreamOptions:
+ {skip: "does not accept read or write concern"}, // TODO SERVER-65353 remove in 6.1.
getClusterParameter: {skip: "does not accept read or write concern"},
getCmdLineOpts: {skip: "does not accept read or write concern"},
getDatabaseVersion: {skip: "does not accept read or write concern"},
@@ -689,7 +690,8 @@ let testCases = {
serverStatus: {skip: "does not accept read or write concern"},
setAllowMigrations: {skip: "does not accept read or write concern"},
setAuditConfig: {skip: "does not accept read or write concern"},
- setChangeStreamOptions: {skip: "TODO PM-2502"},
+ setChangeStreamOptions:
+ {skip: "does not accept read or write concern"}, // TODO SERVER-65353 remove in 6.1.
setCommittedSnapshot: {skip: "internal command"},
setDefaultRWConcern: {skip: "special case (must run after all other commands)"},
setFeatureCompatibilityVersion: {skip: "does not accept read or write concern"},
diff --git a/jstests/sharding/safe_secondary_reads_drop_recreate.js b/jstests/sharding/safe_secondary_reads_drop_recreate.js
index 21e887dc4bc..d6ad842361f 100644
--- a/jstests/sharding/safe_secondary_reads_drop_recreate.js
+++ b/jstests/sharding/safe_secondary_reads_drop_recreate.js
@@ -213,7 +213,8 @@ let testCases = {
fsync: {skip: "does not return user data"},
fsyncUnlock: {skip: "does not return user data"},
getAuditConfig: {skip: "does not return user data"},
- getChangeStreamOptions: {skip: "does not return user data"},
+ getChangeStreamOptions:
+ {skip: "does not return user data"}, // TODO SERVER-65353 remove in 6.1.
getClusterParameter: {skip: "does not return user data"},
getCmdLineOpts: {skip: "does not return user data"},
getDefaultRWConcern: {skip: "does not return user data"},
@@ -323,7 +324,7 @@ let testCases = {
serverStatus: {skip: "does not return user data"},
setAllowMigrations: {skip: "primary only"},
setAuditConfig: {skip: "does not return user data"},
- setChangeStreamOptions: {skip: "primary only"},
+ setChangeStreamOptions: {skip: "primary only"}, // TODO SERVER-65353 remove in 6.1.
setCommittedSnapshot: {skip: "does not return user data"},
setDefaultRWConcern: {skip: "primary only"},
setIndexCommitQuorum: {skip: "primary only"},
diff --git a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
index b7a35ef9d49..3e01be669a4 100644
--- a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
+++ b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
@@ -246,7 +246,8 @@ let testCases = {
fsync: {skip: "does not return user data"},
fsyncUnlock: {skip: "does not return user data"},
getAuditConfig: {skip: "does not return user data"},
- getChangeStreamOptions: {skip: "does not return user data"},
+ getChangeStreamOptions:
+ {skip: "does not return user data"}, // TODO SERVER-65353 remove in 6.1.
getClusterParameter: {skip: "does not return user data"},
getCmdLineOpts: {skip: "does not return user data"},
getDefaultRWConcern: {skip: "does not return user data"},
@@ -394,7 +395,7 @@ let testCases = {
serverStatus: {skip: "does not return user data"},
setAllowMigrations: {skip: "primary only"},
setAuditConfig: {skip: "does not return user data"},
- setChangeStreamOptions: {skip: "primary only"},
+ setChangeStreamOptions: {skip: "primary only"}, // TODO SERVER-65353 remove in 6.1.
setCommittedSnapshot: {skip: "does not return user data"},
setDefaultRWConcern: {skip: "primary only"},
setIndexCommitQuorum: {skip: "primary only"},
diff --git a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js
index 4fef3a7cbb8..5c75ee77baf 100644
--- a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js
+++ b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js
@@ -218,7 +218,8 @@ let testCases = {
fsync: {skip: "does not return user data"},
fsyncUnlock: {skip: "does not return user data"},
getAuditConfig: {skip: "does not return user data"},
- getChangeStreamOptions: {skip: "does not return user data"},
+ getChangeStreamOptions:
+ {skip: "does not return user data"}, // TODO SERVER-65353 remove in 6.1.
getClusterParameter: {skip: "does not return user data"},
getCmdLineOpts: {skip: "does not return user data"},
getDefaultRWConcern: {skip: "does not return user data"},
@@ -330,7 +331,7 @@ let testCases = {
serverStatus: {skip: "does not return user data"},
setAllowMigrations: {skip: "primary only"},
setAuditConfig: {skip: "does not return user data"},
- setChangeStreamOptions: {skip: "primary only"},
+ setChangeStreamOptions: {skip: "primary only"}, // TODO SERVER-65353 remove in 6.1.
setCommittedSnapshot: {skip: "does not return user data"},
setDefaultRWConcern: {skip: "primary only"},
setIndexCommitQuorum: {skip: "primary only"},
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 043d2a0d512..c81942bc35f 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -490,13 +490,23 @@ env.Library(
)
env.Library(
+ target='change_stream_options',
+ source=[
+ 'change_stream_options.idl',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/idl/cluster_server_parameter',
+ ],
+)
+
+env.Library(
target='change_stream_options_manager',
source=[
'change_stream_options_manager.cpp',
+ 'change_stream_options_parameter.idl',
],
LIBDEPS_PRIVATE=[
- '$BUILD_DIR/mongo/db/commands/change_stream_options',
- '$BUILD_DIR/mongo/idl/feature_flag',
+ 'change_stream_options',
],
)
diff --git a/src/mongo/db/auth/action_type.idl b/src/mongo/db/auth/action_type.idl
index 6cfbd32d0a9..1d83378f275 100644
--- a/src/mongo/db/auth/action_type.idl
+++ b/src/mongo/db/auth/action_type.idl
@@ -96,7 +96,6 @@ enums:
flushRouterConfig : "flushRouterConfig"
forceUUID : "forceUUID"
fsync : "fsync"
- getChangeStreamOptions: "getChangeStreamOptions"
getClusterParameter: "getClusterParameter"
getDatabaseVersion : "getDatabaseVersion"
getDefaultRWConcern : "getDefaultRWConcern"
@@ -161,7 +160,6 @@ enums:
runTenantMigration : "runTenantMigration"
serverStatus : "serverStatus"
setAuthenticationRestriction : "setAuthenticationRestriction"
- setChangeStreamOptions: "setChangeStreamOptions"
setClusterParameter: "setClusterParameter"
setDefaultRWConcern : "setDefaultRWConcern"
setFeatureCompatibilityVersion : "setFeatureCompatibilityVersion"
diff --git a/src/mongo/db/auth/builtin_roles.cpp b/src/mongo/db/auth/builtin_roles.cpp
index 27bb1ee6b54..2b0c63cb798 100644
--- a/src/mongo/db/auth/builtin_roles.cpp
+++ b/src/mongo/db/auth/builtin_roles.cpp
@@ -254,8 +254,6 @@ MONGO_INITIALIZER(AuthorizationBuiltinRoles)(InitializerContext* context) {
<< ActionType::setDefaultRWConcern
<< ActionType::setFeatureCompatibilityVersion
<< ActionType::setFreeMonitoring
- << ActionType::setChangeStreamOptions
- << ActionType::getChangeStreamOptions
<< ActionType::setClusterParameter
<< ActionType::getClusterParameter;
diff --git a/src/mongo/db/change_stream_options.idl b/src/mongo/db/change_stream_options.idl
new file mode 100644
index 00000000000..83e9d160868
--- /dev/null
+++ b/src/mongo/db/change_stream_options.idl
@@ -0,0 +1,58 @@
+# Copyright (C) 2022-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+# Data structures for change stream configuration options.
+
+global:
+ cpp_namespace: "mongo"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+ - "mongo/idl/cluster_server_parameter.idl"
+
+structs:
+ PreAndPostImagesOptions:
+ description: "Change streams pre- and post-images options."
+ fields:
+ expireAfterSeconds:
+ description: "The number of seconds after which a pre-image is eligible for
+ deletion. A string value 'off' enables the default expiration policy."
+ unstable: false
+ type:
+ variant: [string, safeInt64]
+ default: "\"off\""
+ ChangeStreamOptions:
+ description: "A specification for the change streams options."
+ inline_chained_structs: true
+ chained_structs:
+ ClusterServerParameter: clusterServerParameter
+ fields:
+ preAndPostImages:
+ type: PreAndPostImagesOptions
+ unstable: false
+ default: true
diff --git a/src/mongo/db/change_stream_options_manager.cpp b/src/mongo/db/change_stream_options_manager.cpp
index b2f474777a8..4c27c32ec6f 100644
--- a/src/mongo/db/change_stream_options_manager.cpp
+++ b/src/mongo/db/change_stream_options_manager.cpp
@@ -32,6 +32,7 @@
#include "mongo/platform/basic.h"
#include "mongo/db/change_stream_options_manager.h"
+#include "mongo/db/change_stream_options_parameter_gen.h"
#include "mongo/logv2/log.h"
namespace mongo {
@@ -53,8 +54,7 @@ void ChangeStreamOptionsManager::create(ServiceContext* service) {
getChangeStreamOptionsManager(service).emplace(service);
}
-boost::optional<ChangeStreamOptions> ChangeStreamOptionsManager::getOptions(
- OperationContext* opCtx) {
+const ChangeStreamOptions& ChangeStreamOptionsManager::getOptions(OperationContext* opCtx) {
stdx::lock_guard<Latch> L(_mutex);
return _changeStreamOptions;
}
@@ -63,7 +63,99 @@ StatusWith<ChangeStreamOptions> ChangeStreamOptionsManager::setOptions(
OperationContext* opCtx, ChangeStreamOptions optionsToSet) {
stdx::lock_guard<Latch> L(_mutex);
_changeStreamOptions = std::move(optionsToSet);
- return *_changeStreamOptions;
+ return _changeStreamOptions;
+}
+
+void ChangeStreamOptionsParameter::append(OperationContext* opCtx,
+ BSONObjBuilder& bob,
+ const std::string& name) {
+ ChangeStreamOptionsManager& changeStreamOptionsManager =
+ ChangeStreamOptionsManager::get(getGlobalServiceContext());
+ bob.append("_id"_sd, name);
+ bob.appendElementsUnique(changeStreamOptionsManager.getOptions(opCtx).toBSON());
+}
+
+Status ChangeStreamOptionsParameter::set(const BSONElement& newValueElement) {
+ try {
+ Status validateStatus = validate(newValueElement);
+ if (!validateStatus.isOK()) {
+ return validateStatus;
+ }
+
+ ChangeStreamOptionsManager& changeStreamOptionsManager =
+ ChangeStreamOptionsManager::get(getGlobalServiceContext());
+ ChangeStreamOptions newOptions = ChangeStreamOptions::parse(
+ IDLParserErrorContext("changeStreamOptions"), newValueElement.Obj());
+
+ return changeStreamOptionsManager
+ .setOptions(Client::getCurrent()->getOperationContext(), newOptions)
+ .getStatus();
+ } catch (const AssertionException&) {
+ return {ErrorCodes::BadValue, "Could not parse changeStreamOptions parameter"};
+ }
+}
+
+Status ChangeStreamOptionsParameter::validate(const BSONElement& newValueElement) const {
+ try {
+ BSONObj changeStreamOptionsObj = newValueElement.Obj();
+ Status validateStatus = Status::OK();
+
+ // PreAndPostImages currently contains a single field, `expireAfterSeconds`, that is
+ // default- initialized to 'off'. This is useful for parameter initialization at startup but
+ // causes the IDL parser to not enforce the presence of `expireAfterSeconds` in BSON
+ // representations. We assert that and the existence of PreAndPostImages here.
+ IDLParserErrorContext ctxt = IDLParserErrorContext("changeStreamOptions"_sd);
+ if (auto preAndPostImagesObj = changeStreamOptionsObj["preAndPostImages"_sd];
+ !preAndPostImagesObj.eoo()) {
+ if (preAndPostImagesObj["expireAfterSeconds"_sd].eoo()) {
+ ctxt.throwMissingField("expireAfterSeconds"_sd);
+ }
+ } else {
+ ctxt.throwMissingField("preAndPostImages"_sd);
+ }
+
+ ChangeStreamOptions newOptions = ChangeStreamOptions::parse(ctxt, changeStreamOptionsObj);
+ auto preAndPostImages = newOptions.getPreAndPostImages();
+ stdx::visit(
+ visit_helper::Overloaded{
+ [&](const std::string& expireAfterSeconds) {
+ if (expireAfterSeconds != "off"_sd) {
+ validateStatus = {
+ ErrorCodes::BadValue,
+ "Non-numeric value of 'expireAfterSeconds' should be 'off'"};
+ }
+ },
+ [&](const std::int64_t& expireAfterSeconds) {
+ if (expireAfterSeconds <= 0) {
+ validateStatus = {
+ ErrorCodes::BadValue,
+ "Numeric value of 'expireAfterSeconds' should be positive"};
+ }
+ },
+ },
+ preAndPostImages.getExpireAfterSeconds());
+
+ return validateStatus;
+ } catch (const AssertionException& ex) {
+ return {ErrorCodes::BadValue,
+ str::stream() << "Failed parsing new changeStreamOptions value" << ex.reason()};
+ }
+}
+
+Status ChangeStreamOptionsParameter::reset() {
+ // Replace the current changeStreamOptions with a default-constructed one, which should
+ // automatically set preAndPostImages.expirationSeconds to 'off' by default.
+ ChangeStreamOptionsManager& changeStreamOptionsManager =
+ ChangeStreamOptionsManager::get(getGlobalServiceContext());
+ return changeStreamOptionsManager
+ .setOptions(Client::getCurrent()->getOperationContext(), ChangeStreamOptions())
+ .getStatus();
+}
+
+const LogicalTime ChangeStreamOptionsParameter::getClusterParameterTime() const {
+ ChangeStreamOptionsManager& changeStreamOptionsManager =
+ ChangeStreamOptionsManager::get(getGlobalServiceContext());
+ return changeStreamOptionsManager.getClusterParameterTime();
}
} // namespace mongo
diff --git a/src/mongo/db/change_stream_options_manager.h b/src/mongo/db/change_stream_options_manager.h
index f87258b11c8..ea1fe2a3b47 100644
--- a/src/mongo/db/change_stream_options_manager.h
+++ b/src/mongo/db/change_stream_options_manager.h
@@ -29,7 +29,7 @@
#pragma once
-#include "mongo/db/commands/change_stream_options_gen.h"
+#include "mongo/db/change_stream_options_gen.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
#include "mongo/platform/mutex.h"
@@ -63,9 +63,9 @@ public:
static ChangeStreamOptionsManager& get(OperationContext* opCtx);
/**
- * Returns the change-streams options if present, boost::none otherwise.
+ * Returns the change-streams options.
*/
- boost::optional<ChangeStreamOptions> getOptions(OperationContext* opCtx);
+ const ChangeStreamOptions& getOptions(OperationContext* opCtx);
/**
* Sets the provided change-streams options. Returns OK on success, otherwise appropriate error
@@ -74,11 +74,18 @@ public:
StatusWith<ChangeStreamOptions> setOptions(OperationContext* opCtx,
ChangeStreamOptions optionsToSet);
+ /**
+ * Returns the clusterParameterTime of the current change stream options.
+ */
+ const LogicalTime& getClusterParameterTime() {
+ return _changeStreamOptions.getClusterParameterTime();
+ }
+
private:
ChangeStreamOptionsManager(const ChangeStreamOptionsManager&) = delete;
ChangeStreamOptionsManager& operator=(const ChangeStreamOptionsManager&) = delete;
- boost::optional<ChangeStreamOptions> _changeStreamOptions;
+ ChangeStreamOptions _changeStreamOptions;
Mutex _mutex = MONGO_MAKE_LATCH("ChangeStreamOptionsManager::mutex");
};
diff --git a/src/mongo/db/change_stream_options_parameter.idl b/src/mongo/db/change_stream_options_parameter.idl
new file mode 100644
index 00000000000..122e2793e3b
--- /dev/null
+++ b/src/mongo/db/change_stream_options_parameter.idl
@@ -0,0 +1,42 @@
+# Copyright (C) 2022-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+# Cluster server parameter for change streams configuration options.
+
+global:
+ cpp_namespace: "mongo"
+
+server_parameters:
+ changeStreamOptions:
+ description: "Cluster server parameter for change stream options"
+ set_at: cluster
+ cpp_class:
+ name: ChangeStreamOptionsParameter
+ override_set: true
+ override_validate: true
+ \ No newline at end of file
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index a6138057096..6a08a6b7e9e 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -203,18 +203,6 @@ env.Library(
)
env.Library(
- target='change_stream_options',
- source=[
- 'change_stream_options.idl',
- ],
- LIBDEPS=[
- '$BUILD_DIR/mongo/db/auth/auth',
- '$BUILD_DIR/mongo/db/auth/authprivilege',
- '$BUILD_DIR/mongo/db/read_write_concern_defaults',
- ]
-)
-
-env.Library(
target="authentication_commands",
source=[
'authentication_commands.cpp',
@@ -515,7 +503,6 @@ env.Library(
target="mongod",
source=[
"apply_ops_cmd.cpp",
- "change_stream_options_command.cpp",
"collection_to_capped.cpp",
"compact.cpp",
"cpuload.cpp",
@@ -568,7 +555,6 @@ env.Library(
'$BUILD_DIR/mongo/db/catalog/index_key_validate',
'$BUILD_DIR/mongo/db/change_stream_options_manager',
'$BUILD_DIR/mongo/db/commands',
- '$BUILD_DIR/mongo/db/commands/change_stream_options',
'$BUILD_DIR/mongo/db/curop_failpoint_helpers',
'$BUILD_DIR/mongo/db/dbhelpers',
'$BUILD_DIR/mongo/db/exec/sbe_cmd',
diff --git a/src/mongo/db/commands/change_stream_options.idl b/src/mongo/db/commands/change_stream_options.idl
deleted file mode 100644
index 56146ad9049..00000000000
--- a/src/mongo/db/commands/change_stream_options.idl
+++ /dev/null
@@ -1,102 +0,0 @@
-# Copyright (C) 2022-present MongoDB, Inc.
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the Server Side Public License, version 1,
-# as published by MongoDB, Inc.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# Server Side Public License for more details.
-#
-# You should have received a copy of the Server Side Public License
-# along with this program. If not, see
-# <http://www.mongodb.com/licensing/server-side-public-license>.
-#
-# As a special exception, the copyright holders give permission to link the
-# code of portions of this program with the OpenSSL library under certain
-# conditions as described in each individual source file and distribute
-# linked combinations including the program with the OpenSSL library. You
-# must comply with the Server Side Public License in all respects for
-# all of the code used other than as permitted herein. If you modify file(s)
-# with this exception, you may extend this exception to your version of the
-# file(s), but you are not obligated to do so. If you do not wish to do so,
-# delete this exception statement from your version. If you delete this
-# exception statement from all source files in the program, then also delete
-# it in the license file.
-#
-
-# Commands to set and get change streams configuration options and associated data structures.
-
-global:
- cpp_namespace: "mongo"
-
-imports:
- - "mongo/db/auth/access_checks.idl"
- - "mongo/db/auth/action_type.idl"
- - "mongo/db/write_concern_options.idl"
- - "mongo/idl/basic_types.idl"
-
-structs:
- PreAndPostImagesOptions:
- description: "Change streams pre- and post-images options."
- fields:
- expireAfterSeconds:
- description: "The number of seconds after which a pre-image is eligible for
- deletion. A string value 'off' enables the default expiration policy."
- optional: true
- unstable: false
- type:
- variant: [string, safeInt64]
- ChangeStreamOptions:
- description: "A specification for the change streams options."
- fields:
- preAndPostImages:
- type: PreAndPostImagesOptions
- optional: true
- unstable: false
- GetChangeStreamOptionsResponse:
- description: "A response for the get change streams options command."
- chained_structs:
- ChangeStreamOptions: ChangeStreamOptions
-
-commands:
- setChangeStreamOptions:
- description: "A command to set the change streams options."
- command_name: setChangeStreamOptions
- namespace: ignored
- cpp_name: setChangeStreamOptions
- strict: true
- api_version: "1"
- access_check:
- complex:
- - check: is_authenticated
- - privilege:
- resource_pattern: cluster
- action_type: setChangeStreamOptions
- fields:
- preAndPostImages:
- description: "The pre- and post-images options to set."
- type: PreAndPostImagesOptions
- optional: true
- unstable: false
- writeConcern:
- description: "The level of write concern for the command."
- type: WriteConcern
- optional: true
- unstable: false
- reply_type: OkReply
- getChangeStreamOptions:
- description: "A command to get the change streams options."
- command_name: getChangeStreamOptions
- namespace: ignored
- cpp_name: getChangeStreamOptions
- strict: true
- api_version: "1"
- access_check:
- complex:
- - check: is_authenticated
- - privilege:
- resource_pattern: cluster
- action_type: getChangeStreamOptions
- reply_type: GetChangeStreamOptionsResponse \ No newline at end of file
diff --git a/src/mongo/db/commands/change_stream_options_command.cpp b/src/mongo/db/commands/change_stream_options_command.cpp
deleted file mode 100644
index 7fad14c0ac7..00000000000
--- a/src/mongo/db/commands/change_stream_options_command.cpp
+++ /dev/null
@@ -1,264 +0,0 @@
-/**
- * Copyright (C) 2022-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/auth/authorization_checks.h"
-#include "mongo/db/auth/authorization_session.h"
-#include "mongo/db/change_stream_options_manager.h"
-#include "mongo/db/commands.h"
-#include "mongo/db/commands/change_stream_options_gen.h"
-#include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/logv2/log.h"
-
-namespace mongo {
-namespace {
-/**
- * A versioned command class to set the change streams options. This command should not run
- * on 'mongoS'.
- */
-class SetChangeStreamOptionsCommand
- : public SetChangeStreamOptionsCmdVersion1Gen<SetChangeStreamOptionsCommand> {
-public:
- AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
- return AllowedOnSecondary::kNever;
- }
-
- bool adminOnly() const final {
- return true;
- }
-
- std::string help() const override {
- return "Sets change streams configuration options.\n"
- "Usage: { setChangeStreamOptions: 1, preAndPostImages: { expireAfterSeconds: "
- "<long>|'off'> }, writeConcern: { <write concern> }}";
- }
-
- class Invocation final : public InvocationBaseGen {
- public:
- Invocation(OperationContext* opCtx,
- const Command* command,
- const OpMsgRequest& opMsgRequest)
- : InvocationBaseGen(opCtx, command, opMsgRequest) {}
-
- bool supportsWriteConcern() const final {
- return true;
- }
-
- NamespaceString ns() const final {
- return NamespaceString();
- }
-
- Reply typedRun(OperationContext* opCtx) final {
- assertCanIssueCommand(opCtx);
- return setOptionsAndReply(opCtx);
- }
-
- private:
- /**
- * A helper to verify if the command can be run. Throws 'uassert' in case of failures.
- */
- void assertCanIssueCommand(OperationContext* opCtx) {
- const auto replCoord = repl::ReplicationCoordinator::get(opCtx);
-
- uassert(5869200,
- str::stream() << "'" << SetChangeStreamOptions::kCommandName
- << "' is not supported on standalone nodes.",
- replCoord->isReplEnabled());
-
- uassert(5869201,
- str::stream() << "'" << SetChangeStreamOptions::kCommandName
- << "' is not supported on shard nodes.",
- serverGlobalParams.clusterRole != ClusterRole::ShardServer);
-
- uassert(5869202,
- "Expected at least one change stream option to set",
- request().getPreAndPostImages());
- }
-
- /**
- * Sets the change streams options using 'changeStreamOptionsManager'.
- */
- Reply setOptionsAndReply(OperationContext* opCtx) {
- // Create a request object for the 'changeStreamOptionsManager'.
- auto optionsToSet = ChangeStreamOptions();
-
- if (auto& preAndPostImage = request().getPreAndPostImages()) {
- uassert(5869203,
- "Expected 'expireAfterSeconds' option",
- preAndPostImage->getExpireAfterSeconds());
-
- stdx::visit(
- visit_helper::Overloaded{
- [&](const std::string& expirationPolicyMode) {
- uassert(5869204,
- "Non-numeric value of 'expireAfterSeconds' should be 'off'",
- expirationPolicyMode == "off");
- },
- [&](const std::int64_t& expiryTime) {
- uassert(5869205,
- "Numeric value of 'expireAfterSeconds' should be positive",
- expiryTime > 0);
- }},
- *preAndPostImage->getExpireAfterSeconds());
-
- optionsToSet.setPreAndPostImages(preAndPostImage);
- }
-
- // Store the change streams configuration options.
- auto& changeStreamOptionManager = ChangeStreamOptionsManager::get(opCtx);
- auto status = changeStreamOptionManager.setOptions(opCtx, optionsToSet);
- uassert(5869206,
- str::stream() << "Failed to set change stream options, status: "
- << status.getStatus().codeString(),
- status.isOK());
-
- return Reply();
- }
-
- void doCheckAuthorization(OperationContext* opCtx) const override {
- uassert(ErrorCodes::Unauthorized,
- "Feature flag featureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy "
- "must be enabled",
- feature_flags::gFeatureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy
- .isEnabled(serverGlobalParams.featureCompatibility));
-
- uassert(ErrorCodes::Unauthorized,
- "Unauthorized",
- AuthorizationSession::get(opCtx->getClient())
- ->isAuthorizedForPrivilege(Privilege{ResourcePattern::forClusterResource(),
- ActionType::setChangeStreamOptions}));
- }
- };
-} setChangeStreamOptionsCommand;
-
-/**
- * A versioned command class to get the change streams options. This command should not run on
- * 'mongoS'.
- */
-class GetChangeStreamOptionsCommand
- : public GetChangeStreamOptionsCmdVersion1Gen<GetChangeStreamOptionsCommand> {
-public:
- AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
- return AllowedOnSecondary::kNever;
- }
-
- bool adminOnly() const override {
- return true;
- }
-
- std::string help() const final {
- return "Gets change streams configuration options.\n"
- "Usage: { getChangeStreamOptions: 1 }";
- }
-
- class Invocation final : public InvocationBaseGen {
- public:
- Invocation(OperationContext* opCtx,
- const Command* command,
- const OpMsgRequest& opMsgRequest)
- : InvocationBaseGen(opCtx, command, opMsgRequest) {}
-
- bool supportsWriteConcern() const final {
- return false;
- }
-
- NamespaceString ns() const final {
- return NamespaceString();
- }
-
- Reply typedRun(OperationContext* opCtx) final {
- assertCanIssueCommand(opCtx);
- return getOptionsAndReply(opCtx);
- }
-
- private:
- /**
- * A helper to verify if the command can be run. Throws 'uassert' in case of failures.
- */
- void assertCanIssueCommand(OperationContext* opCtx) {
- const auto replCoord = repl::ReplicationCoordinator::get(opCtx);
-
- uassert(5869207,
- str::stream() << "'" << GetChangeStreamOptions::kCommandName
- << "' is not supported on standalone nodes.",
- replCoord->isReplEnabled());
-
- uassert(5869208,
- str::stream() << "'" << GetChangeStreamOptions::kCommandName
- << "' is not supported on shard nodes.",
- serverGlobalParams.clusterRole != ClusterRole::ShardServer);
- }
-
- /**
- * Gets the change streams options from the 'ChangeStreamOptionsManager', creates a response
- * from it, and return it back to the client.
- */
- Reply getOptionsAndReply(OperationContext* opCtx) {
- auto reply = Reply();
-
- // Get the change streams options, if present and return it back to the client.
- auto& changeStreamOptionManager = ChangeStreamOptionsManager::get(opCtx);
-
- if (auto changeStreamOptions = changeStreamOptionManager.getOptions(opCtx)) {
- if (auto preAndPostImages = changeStreamOptions->getPreAndPostImages()) {
- // Add 'expiredAfterSeconds' to the reply message only when the default
- // expiration policy is not enabled. A string type of 'expireAfterSeconds'
- // signifies that the value it holds is 'off'.
- if (preAndPostImages->getExpireAfterSeconds() &&
- !stdx::holds_alternative<std::string>(
- *preAndPostImages->getExpireAfterSeconds())) {
- reply.setChangeStreamOptions(std::move(*changeStreamOptions));
- }
- }
- }
-
- return reply;
- }
-
- void doCheckAuthorization(OperationContext* opCtx) const override {
- uassert(ErrorCodes::Unauthorized,
- "Feature flag featureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy "
- "must be enabled",
- feature_flags::gFeatureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy
- .isEnabled(serverGlobalParams.featureCompatibility));
-
- uassert(ErrorCodes::Unauthorized,
- "Unauthorized",
- AuthorizationSession::get(opCtx->getClient())
- ->isAuthorizedForPrivilege(Privilege{ResourcePattern::forClusterResource(),
- ActionType::getChangeStreamOptions}));
- }
- };
-} getChangeStreamOptionsCommand;
-
-} // namespace
-} // namespace mongo
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index bb37a5b15ba..6b54a25e0b8 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -645,8 +645,8 @@ env.CppUnitTest(
LIBDEPS=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/auth/authmocks',
+ '$BUILD_DIR/mongo/db/change_stream_options',
'$BUILD_DIR/mongo/db/change_stream_options_manager',
- '$BUILD_DIR/mongo/db/commands/change_stream_options',
'$BUILD_DIR/mongo/db/cst/cst',
'$BUILD_DIR/mongo/db/exec/document_value/document_value',
'$BUILD_DIR/mongo/db/exec/document_value/document_value_test_util',
diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp
index f015f39680c..02a6b932ffd 100644
--- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp
+++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp
@@ -70,13 +70,14 @@ bool PreImageAttributes::isExpiredPreImage(const boost::optional<Date_t>& preIma
return preImageOplogEntryIsDeleted || operationTime <= expirationTime;
}
-// Get the 'expireAfterSeconds' from the 'ChangeStreamOptions' if present, boost::none otherwise.
+// Get the 'expireAfterSeconds' from the 'ChangeStreamOptions' if not 'off', boost::none otherwise.
boost::optional<std::int64_t> getExpireAfterSecondsFromChangeStreamOptions(
ChangeStreamOptions& changeStreamOptions) {
- if (auto preAndPostImages = changeStreamOptions.getPreAndPostImages(); preAndPostImages &&
- preAndPostImages->getExpireAfterSeconds() &&
- !stdx::holds_alternative<std::string>(*preAndPostImages->getExpireAfterSeconds())) {
- return stdx::get<std::int64_t>(*preAndPostImages->getExpireAfterSeconds());
+ const stdx::variant<std::string, std::int64_t>& expireAfterSeconds =
+ changeStreamOptions.getPreAndPostImages().getExpireAfterSeconds();
+
+ if (!stdx::holds_alternative<std::string>(expireAfterSeconds)) {
+ return stdx::get<std::int64_t>(expireAfterSeconds);
}
return boost::none;
@@ -88,9 +89,8 @@ boost::optional<Date_t> getPreImageExpirationTime(OperationContext* opCtx, Date_
boost::optional<std::int64_t> expireAfterSeconds = boost::none;
// Get the expiration time directly from the change stream manager.
- if (auto changeStreamOptions = ChangeStreamOptionsManager::get(opCtx).getOptions(opCtx)) {
- expireAfterSeconds = getExpireAfterSecondsFromChangeStreamOptions(*changeStreamOptions);
- }
+ auto changeStreamOptions = ChangeStreamOptionsManager::get(opCtx).getOptions(opCtx);
+ expireAfterSeconds = getExpireAfterSecondsFromChangeStreamOptions(changeStreamOptions);
// A pre-image is eligible for deletion if:
// pre-image's op-time + expireAfterSeconds < currentTime.
diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h
index 89849552b0a..0ddd491991f 100644
--- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h
+++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h
@@ -29,7 +29,7 @@
#pragma once
-#include "mongo/db/commands/change_stream_options_gen.h"
+#include "mongo/db/change_stream_options_gen.h"
#include "mongo/db/service_context.h"
namespace mongo {
diff --git a/src/mongo/db/query/query_feature_flags.idl b/src/mongo/db/query/query_feature_flags.idl
index a0d7ce2c14c..4dcbb2382a8 100644
--- a/src/mongo/db/query/query_feature_flags.idl
+++ b/src/mongo/db/query/query_feature_flags.idl
@@ -124,11 +124,6 @@ feature_flags:
default: true
version: 6.0
- featureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy:
- description: "Feature flag to enable time based retention policy of point-in-time pre- and post-images of documents in change streams"
- cpp_varname: gFeatureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy
- default: false
-
featureFlagSBELookupPushdown:
description: "Feature flag for allowing SBE $lookup pushdown"
cpp_varname: gFeatureFlagSBELookupPushdown
diff --git a/src/mongo/idl/SConscript b/src/mongo/idl/SConscript
index a63af90f83a..de25a54c950 100644
--- a/src/mongo/idl/SConscript
+++ b/src/mongo/idl/SConscript
@@ -55,11 +55,10 @@ env.Library(
'cluster_server_parameter.idl',
],
LIBDEPS=[
- '$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/logical_time',
+ 'feature_flag',
],
LIBDEPS_PRIVATE=[
- 'feature_flag',
'idl_parser',
],
)
@@ -87,6 +86,7 @@ env.CppUnitTest(
LIBDEPS=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/auth/authmocks',
+ '$BUILD_DIR/mongo/db/change_stream_options_manager',
'$BUILD_DIR/mongo/db/repl/oplog',
'$BUILD_DIR/mongo/db/repl/oplog_interface_local',
'$BUILD_DIR/mongo/db/repl/replmocks',
diff --git a/src/mongo/idl/cluster_server_parameter_op_observer_test.cpp b/src/mongo/idl/cluster_server_parameter_op_observer_test.cpp
index 0b4b6b9163f..a55a88980c0 100644
--- a/src/mongo/idl/cluster_server_parameter_op_observer_test.cpp
+++ b/src/mongo/idl/cluster_server_parameter_op_observer_test.cpp
@@ -31,6 +31,7 @@
#include "mongo/platform/basic.h"
+#include "mongo/db/change_stream_options_manager.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_interface_local.h"
@@ -154,6 +155,9 @@ public:
std::make_unique<repl::ReplicationCoordinatorMock>(service, createReplSettings()));
repl::createOplog(opCtx.get());
+ // Set up the ChangeStreamOptionsManager so that it can be retrieved/set.
+ ChangeStreamOptionsManager::create(service);
+
// Ensure that we are primary.
auto replCoord = repl::ReplicationCoordinator::get(opCtx.get());
ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY));
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 32bb92585ee..7432c3a3b86 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -525,6 +525,7 @@ env.Library(
'$BUILD_DIR/mongo/client/remote_command_targeter',
'$BUILD_DIR/mongo/db/audit',
'$BUILD_DIR/mongo/db/auth/authmongos',
+ '$BUILD_DIR/mongo/db/change_stream_options_manager',
'$BUILD_DIR/mongo/db/commands/rwc_defaults_commands',
'$BUILD_DIR/mongo/db/ftdc/ftdc_mongos',
'$BUILD_DIR/mongo/db/process_health/fault_manager',
diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript
index 5cdade782f9..91a07c18245 100644
--- a/src/mongo/s/commands/SConscript
+++ b/src/mongo/s/commands/SConscript
@@ -33,7 +33,6 @@ env.Library(
'cluster_abort_transaction_cmd_s.cpp',
'cluster_available_query_options_cmd.cpp',
'cluster_build_info.cpp',
- 'cluster_change_stream_options_command.cpp',
'cluster_cleanup_reshard_collection_cmd.cpp',
'cluster_coll_stats_cmd.cpp',
'cluster_collection_mod_cmd.cpp',
@@ -107,7 +106,7 @@ env.Library(
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/api_parameters',
'$BUILD_DIR/mongo/db/auth/auth_checks',
- '$BUILD_DIR/mongo/db/commands/change_stream_options',
+ '$BUILD_DIR/mongo/db/change_stream_options_manager',
'$BUILD_DIR/mongo/db/commands/cluster_server_parameter_cmds_idl',
'$BUILD_DIR/mongo/db/commands/core',
'$BUILD_DIR/mongo/db/commands/create_command',
diff --git a/src/mongo/s/commands/cluster_change_stream_options_command.cpp b/src/mongo/s/commands/cluster_change_stream_options_command.cpp
deleted file mode 100644
index e91aa657fb1..00000000000
--- a/src/mongo/s/commands/cluster_change_stream_options_command.cpp
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Copyright (C) 2022-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/auth/authorization_checks.h"
-#include "mongo/db/auth/authorization_session.h"
-#include "mongo/db/change_stream_options_manager.h"
-#include "mongo/db/commands.h"
-#include "mongo/db/commands/change_stream_options_gen.h"
-#include "mongo/db/read_write_concern_defaults.h"
-#include "mongo/db/repl/read_concern_args.h"
-#include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/logv2/log.h"
-#include "mongo/s/cluster_commands_helpers.h"
-#include "mongo/s/grid.h"
-
-namespace mongo {
-namespace {
-
-/**
- * Creates a reply object of type 'Reply' from raw 'response' BSON object. Any extra fields are
- * removed from the 'response' object and then passed into to the IDL parser to create the response
- * object.
- */
-template <typename Reply>
-Reply createReply(BSONObj response, const std::string& errorContextField) {
- constexpr auto kReplData = "$replData"_sd;
- constexpr auto kLastCommittedOpTime = "lastCommittedOpTime"_sd;
- constexpr auto kClusterTime = "$clusterTime"_sd;
- constexpr auto kConfigTime = "$configTime"_sd;
- constexpr auto kTopologyTime = "$topologyTime"_sd;
- constexpr auto kOperationTime = "operationTime"_sd;
-
- // A set of fields to be removed from the 'response' object.
- StringDataSet ignorableFields({kReplData,
- kLastCommittedOpTime,
- ErrorReply::kOkFieldName,
- kClusterTime,
- kConfigTime,
- kTopologyTime,
- kOperationTime});
-
- // Create the reply object from the redacted response object.
- return Reply::parse(IDLParserErrorContext(errorContextField),
- response.removeFields(ignorableFields));
-}
-
-class ClusterSetChangeStreamOptionsCommand
- : public SetChangeStreamOptionsCmdVersion1Gen<ClusterSetChangeStreamOptionsCommand> {
-public:
- AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
- return AllowedOnSecondary::kNever;
- }
-
- bool adminOnly() const override {
- return true;
- }
-
- std::string help() const override {
- return "Sets change streams configuration options.\n"
- "Usage: { setChangeStreamOptions: 1, preAndPostImages: { expireAfterSeconds: "
- "<long>|'off'> }, writeConcern: { <write concern> }}";
- }
-
- class Invocation final : public InvocationBaseGen {
- public:
- Invocation(OperationContext* opCtx,
- const Command* command,
- const OpMsgRequest& opMsgRequest)
- : InvocationBaseGen(opCtx, command, opMsgRequest) {}
-
- bool supportsWriteConcern() const final {
- return true;
- }
-
- NamespaceString ns() const final {
- return NamespaceString();
- }
-
- Reply typedRun(OperationContext* opCtx) final {
- // Get the configuration server connection and dispatch the request to it to set the
- // change streams options.
- auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
- auto cmdResponse = uassertStatusOK(configShard->runCommandWithFixedRetryAttempts(
- opCtx,
- ReadPreferenceSetting(ReadPreference::PrimaryOnly),
- NamespaceString::kAdminDb.toString(),
- CommandHelpers::appendMajorityWriteConcern(
- CommandHelpers::filterCommandRequestForPassthrough(request().toBSON({})),
- opCtx->getWriteConcern()),
- Shard::RetryPolicy::kNotIdempotent));
-
- uassertStatusOK(cmdResponse.commandStatus);
-
- return Reply();
- }
-
- private:
- void doCheckAuthorization(OperationContext* opCtx) const override {
- uassert(ErrorCodes::Unauthorized,
- "Feature flag featureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy "
- "must be enabled",
- feature_flags::gFeatureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy
- .isEnabled(serverGlobalParams.featureCompatibility));
-
- uassert(ErrorCodes::Unauthorized,
- "Unauthorized",
- AuthorizationSession::get(opCtx->getClient())
- ->isAuthorizedForPrivilege(Privilege{ResourcePattern::forClusterResource(),
- ActionType::setChangeStreamOptions}));
- }
- };
-} clusterSetChangeStreamOptionsCommand;
-
-class ClusterGetChangeStreamOptionsCommand
- : public GetChangeStreamOptionsCmdVersion1Gen<ClusterGetChangeStreamOptionsCommand> {
-public:
- AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
- return AllowedOnSecondary::kNever;
- }
- bool adminOnly() const override {
- return true;
- }
-
- std::string help() const final {
- return "Gets change streams configuration options.\n"
- "Usage: { getChangeStreamOptions: 1 }";
- }
-
- class Invocation final : public InvocationBaseGen {
- public:
- Invocation(OperationContext* opCtx,
- const Command* command,
- const OpMsgRequest& opMsgRequest)
- : InvocationBaseGen(opCtx, command, opMsgRequest) {}
-
- bool supportsWriteConcern() const final {
- return false;
- }
-
- NamespaceString ns() const final {
- return NamespaceString();
- }
-
- Reply typedRun(OperationContext* opCtx) final {
- // Get the change streams options from the configuration server.
- auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
- auto cmdResponse = uassertStatusOK(configShard->runCommandWithFixedRetryAttempts(
- opCtx,
- ReadPreferenceSetting(ReadPreference::PrimaryOnly),
- NamespaceString::kAdminDb.toString(),
- applyReadWriteConcern(
- opCtx,
- this,
- CommandHelpers::filterCommandRequestForPassthrough(request().toBSON({}))),
- Shard::RetryPolicy::kIdempotent));
-
- uassertStatusOK(cmdResponse.commandStatus);
-
- // Create the reply object from the configuration server response and return it back to
- // the client.
- return createReply<Reply>(cmdResponse.response, "ClusterGetChangeStreamOptionsCommand");
- }
-
- private:
- void doCheckAuthorization(OperationContext* opCtx) const override {
- uassert(ErrorCodes::Unauthorized,
- "Feature flag featureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy "
- "must be enabled",
- feature_flags::gFeatureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy
- .isEnabled(serverGlobalParams.featureCompatibility));
-
- uassert(ErrorCodes::Unauthorized,
- "Unauthorized",
- AuthorizationSession::get(opCtx->getClient())
- ->isAuthorizedForPrivilege(Privilege{ResourcePattern::forClusterResource(),
- ActionType::getChangeStreamOptions}));
- }
- };
-} clusterGetChangeStreamOptionsCommand;
-
-} // namespace
-} // namespace mongo
diff --git a/src/mongo/s/mongos_main.cpp b/src/mongo/s/mongos_main.cpp
index 7ed6e3d4315..bea3ea0ae4b 100644
--- a/src/mongo/s/mongos_main.cpp
+++ b/src/mongo/s/mongos_main.cpp
@@ -49,6 +49,7 @@
#include "mongo/db/auth/authorization_manager.h"
#include "mongo/db/auth/authz_manager_external_state_s.h"
#include "mongo/db/auth/user_cache_invalidator_job.h"
+#include "mongo/db/change_stream_options_manager.h"
#include "mongo/db/client.h"
#include "mongo/db/client_metadata_propagation_egress_hook.h"
#include "mongo/db/dbdirectclient.h"
@@ -699,6 +700,7 @@ ExitCode runMongosServer(ServiceContext* serviceContext) {
}
ReadWriteConcernDefaults::create(serviceContext, readWriteConcernDefaultsCacheLookupMongoS);
+ ChangeStreamOptionsManager::create(serviceContext);
auto opCtxHolder = tc->makeOperationContext();
auto const opCtx = opCtxHolder.get();