From 91a607d3a2c748ead682c1a44d37263254de26f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcos=20Jos=C3=A9=20Grillo=20Ramirez?= Date: Tue, 5 Apr 2022 17:11:59 +0000 Subject: SERVER-63870 Integrate replica set setClusterParameter into POS with replay protection --- jstests/auth/lib/commands_lib.js | 8 +- jstests/core/views/views_all_commands.js | 1 + .../db_reads_while_recovering_all_commands.js | 1 + .../read_write_concern_defaults_application.js | 1 + jstests/sharding/set_cluster_parameter.js | 78 +++++++++++-- .../db/commands/cluster_server_parameter_cmds.idl | 2 +- .../db/commands/set_cluster_parameter_command.cpp | 6 +- .../commands/set_cluster_parameter_invocation.cpp | 40 ++++--- .../db/commands/set_cluster_parameter_invocation.h | 18 ++- .../set_cluster_parameter_invocation_test.cpp | 38 ++++-- src/mongo/db/s/SConscript | 2 + .../configsvr_set_cluster_parameter_command.cpp | 27 +++++ .../s/config/set_cluster_parameter_coordinator.cpp | 117 ++++++++++++++++++- .../s/config/set_cluster_parameter_coordinator.h | 18 +++ .../set_cluster_parameter_coordinator_document.idl | 5 +- .../s/shardsvr_set_cluster_parameter_command.cpp | 127 +++++++++++++++++++++ src/mongo/db/transaction_validation.cpp | 1 + .../idl/cluster_server_parameter_op_observer.cpp | 11 +- src/mongo/s/request_types/sharded_ddl_commands.idl | 14 ++- 19 files changed, 452 insertions(+), 63 deletions(-) create mode 100644 src/mongo/db/s/shardsvr_set_cluster_parameter_command.cpp diff --git a/jstests/auth/lib/commands_lib.js b/jstests/auth/lib/commands_lib.js index dbc73db0e47..5f9d5856505 100644 --- a/jstests/auth/lib/commands_lib.js +++ b/jstests/auth/lib/commands_lib.js @@ -4178,7 +4178,11 @@ var authCommandsLib = { { testname: "getClusterParameter", command: {getClusterParameter: "testIntClusterParameter"}, - skipTest: (conn) => !TestData.setParameters.featureFlagClusterWideConfig, + skipTest: (conn) => { + const hello = assert.commandWorked(conn.getDB("admin").runCommand({hello: 1})); + const isStandalone = hello.msg !== "isdbgrid" && !hello.hasOwnProperty('setName'); + return !TestData.setParameters.featureFlagClusterWideConfig || isStandalone; + }, testcases: [ { runOnDb: adminDbName, @@ -5666,7 +5670,7 @@ var authCommandsLib = { }, { testname: "setClusterParameter", - command: {setClusterParameter: {testIntClusterParameterParam: {intData: 17}}}, + command: {setClusterParameter: {testIntClusterParameter: {intData: 17}}}, skipTest: (conn) => { const hello = assert.commandWorked(conn.getDB("admin").runCommand({hello: 1})); const isStandalone = hello.msg !== "isdbgrid" && !hello.hasOwnProperty('setName'); diff --git a/jstests/core/views/views_all_commands.js b/jstests/core/views/views_all_commands.js index 551cb62518c..c6451f6934d 100644 --- a/jstests/core/views/views_all_commands.js +++ b/jstests/core/views/views_all_commands.js @@ -169,6 +169,7 @@ let viewsCommandTests = { _shardsvrReshardCollection: {skip: isAnInternalCommand}, _shardsvrReshardingOperationTime: {skip: isAnInternalCommand}, _shardsvrSetAllowMigrations: {skip: isAnInternalCommand}, + _shardsvrSetClusterParameter: {skip: isAnInternalCommand}, _shardsvrSetUserWriteBlockMode: {skip: isAnInternalCommand}, _shardsvrShardCollection: {skip: isAnInternalCommand}, // TODO SERVER-58843: Remove once 6.0 becomes last LTS diff --git a/jstests/replsets/db_reads_while_recovering_all_commands.js b/jstests/replsets/db_reads_while_recovering_all_commands.js index d9bc04fe52c..b089f91df8a 100644 --- a/jstests/replsets/db_reads_while_recovering_all_commands.js +++ b/jstests/replsets/db_reads_while_recovering_all_commands.js @@ -97,6 +97,7 @@ const allCommands = { _shardsvrReshardingOperationTime: {skip: isPrimaryOnly}, _shardsvrRefineCollectionShardKey: {skip: isPrimaryOnly}, _shardsvrSetAllowMigrations: {skip: isPrimaryOnly}, + _shardsvrSetClusterParameter: {skip: isAnInternalCommand}, _shardsvrSetUserWriteBlockMode: {skip: isPrimaryOnly}, _shardsvrCollMod: {skip: isPrimaryOnly}, _shardsvrCollModParticipant: {skip: isAnInternalCommand}, diff --git a/jstests/sharding/read_write_concern_defaults_application.js b/jstests/sharding/read_write_concern_defaults_application.js index d7df5133590..31d5beb1541 100644 --- a/jstests/sharding/read_write_concern_defaults_application.js +++ b/jstests/sharding/read_write_concern_defaults_application.js @@ -168,6 +168,7 @@ let testCases = { _shardsvrReshardCollection: {skip: "internal command"}, _shardsvrReshardingOperationTime: {skip: "internal command"}, _shardsvrSetAllowMigrations: {skip: "internal command"}, + _shardsvrSetClusterParameter: {skip: "internal command"}, _shardsvrSetUserWriteBlockMode: {skip: "internal command"}, _shardsvrCollMod: {skip: "internal command"}, _shardsvrCollModParticipant: {skip: "internal command"}, diff --git a/jstests/sharding/set_cluster_parameter.js b/jstests/sharding/set_cluster_parameter.js index ad550c6b685..70a2cab98e2 100644 --- a/jstests/sharding/set_cluster_parameter.js +++ b/jstests/sharding/set_cluster_parameter.js @@ -13,15 +13,31 @@ load('jstests/libs/fail_point_util.js'); -const st = new ShardingTest({shards: 1}); +const clusterParameter1Value = { + intData: 42 +}; +const clusterParameter1Name = 'testIntClusterParameter'; +const clusterParameter1 = { + [clusterParameter1Name]: clusterParameter1Value +}; + +const clusterParameter2Value = { + strData: 'on' +}; +const clusterParameter2Name = 'testStrClusterParameter'; +const clusterParameter2 = { + [clusterParameter2Name]: clusterParameter2Value +}; + +const st = new ShardingTest({shards: 1, rs: {nodes: 3}}); let fp = configureFailPoint(st.configRS.getPrimary(), 'hangBeforeRunningConfigsvrCoordinatorInstance'); -let setClusterParameterSuccessThread = new Thread((mongosConnString) => { +let setClusterParameterSuccessThread = new Thread((mongosConnString, clusterParameter) => { let mongos = new Mongo(mongosConnString); - assert.commandWorked(mongos.adminCommand({setClusterParameter: {param: true}})); -}, st.s.host); + assert.commandWorked(mongos.adminCommand({setClusterParameter: clusterParameter})); +}, st.s.host, clusterParameter1); setClusterParameterSuccessThread.start(); fp.wait(); @@ -29,12 +45,13 @@ fp.wait(); jsTestLog( 'Check that 2 requests for the same cluster parameter and same value generates only one coordinator.'); -let setClusterParameterJoinSuccessThread = new Thread((mongosConnString) => { +let setClusterParameterJoinSuccessThread = new Thread((mongosConnString, clusterParameter) => { let mongos = new Mongo(mongosConnString); - assert.commandWorked(mongos.adminCommand({setClusterParameter: {param: true}})); -}, st.s.host); + assert.commandWorked(mongos.adminCommand({setClusterParameter: clusterParameter})); +}, st.s.host, clusterParameter1); setClusterParameterJoinSuccessThread.start(); +fp.wait(); let currOp = st.configRS.getPrimary() @@ -44,17 +61,58 @@ let currOp = .toArray(); assert.eq(1, currOp.length); assert(currOp[0].hasOwnProperty('command')); -assert(currOp[0].command.hasOwnProperty('param')); -assert.eq(true, currOp[0].command.param); +assert.docEq(currOp[0].command, clusterParameter1); jsTestLog('Check that a second request will fail with ConflictingOperationInProgress.'); -assert.commandFailedWithCode(st.s.adminCommand({setClusterParameter: {otherParam: true}}), +assert.commandFailedWithCode(st.s.adminCommand({setClusterParameter: clusterParameter2}), ErrorCodes.ConflictingOperationInProgress); fp.off(); setClusterParameterSuccessThread.join(); setClusterParameterJoinSuccessThread.join(); +jsTestLog('Check forward progress until completion in the presence of a config server stepdown.'); + +fp = configureFailPoint(st.configRS.getPrimary(), 'hangBeforeRunningConfigsvrCoordinatorInstance'); + +let setClusterParameterThread = new Thread((mongosConnString, clusterParameter) => { + let mongos = new Mongo(mongosConnString); + assert.commandWorked(mongos.adminCommand({setClusterParameter: clusterParameter})); +}, st.s.host, clusterParameter2); + +setClusterParameterThread.start(); +fp.wait(); + +let newPrimary = st.configRS.getSecondary(); + +st.configRS.stepUp(newPrimary); + +// After the stepdown the command should be retried and finish successfully. +setClusterParameterThread.join(); + +const clusterParametersConfigColl = + st.configRS.getPrimary().getCollection('config.clusterParameters'); + +const shardParametersConfigColl = st.rs0.getPrimary().getCollection('config.clusterParameters'); + +assert.eq(1, clusterParametersConfigColl.countDocuments({_id: clusterParameter2Name})); +const configClusterParameter = clusterParametersConfigColl.findOne( + {_id: clusterParameter2Name}, {_id: 0, clusterParameterTime: 0}); +const shardClusterParameter = shardParametersConfigColl.findOne({_id: clusterParameter2Name}, + {_id: 0, clusterParameterTime: 0}); +assert.docEq(configClusterParameter, clusterParameter2Value); +assert.docEq(shardClusterParameter, clusterParameter2Value); + +fp.off(); + +// Check the full cluster has the same clusterParameterTime as the config server. +const configParameterTime = + clusterParametersConfigColl.findOne({_id: clusterParameter2Name}, {clusterParameterTime: 1}) + .clusterParameterTime; +assert.eq(configParameterTime, + shardParametersConfigColl.findOne({_id: clusterParameter2Name}, {clusterParameterTime: 1}) + .clusterParameterTime); + st.stop(); })(); diff --git a/src/mongo/db/commands/cluster_server_parameter_cmds.idl b/src/mongo/db/commands/cluster_server_parameter_cmds.idl index 9c768f43f23..256b4cbd6d6 100644 --- a/src/mongo/db/commands/cluster_server_parameter_cmds.idl +++ b/src/mongo/db/commands/cluster_server_parameter_cmds.idl @@ -46,7 +46,7 @@ commands: api_version: "" namespace: type type: object - strict: true + strict: false getClusterParameter: description: "Retrieves the in-memory value of the specified cluster server parameter(s)" diff --git a/src/mongo/db/commands/set_cluster_parameter_command.cpp b/src/mongo/db/commands/set_cluster_parameter_command.cpp index 9d4074ac0bc..c4472f1ae12 100644 --- a/src/mongo/db/commands/set_cluster_parameter_command.cpp +++ b/src/mongo/db/commands/set_cluster_parameter_command.cpp @@ -42,6 +42,10 @@ namespace mongo { namespace { +const WriteConcernOptions kMajorityWriteConcern{WriteConcernOptions::kMajority, + WriteConcernOptions::SyncMode::UNSET, + WriteConcernOptions::kNoTimeout}; + class SetClusterParameterCommand final : public TypedCommand { public: using Request = SetClusterParameter; @@ -76,7 +80,7 @@ public: SetClusterParameterInvocation invocation{std::move(parameterService), dbService}; - invocation.invoke(opCtx, request()); + invocation.invoke(opCtx, request(), boost::none, kMajorityWriteConcern); } private: diff --git a/src/mongo/db/commands/set_cluster_parameter_invocation.cpp b/src/mongo/db/commands/set_cluster_parameter_invocation.cpp index 513ae376564..8d29147d282 100644 --- a/src/mongo/db/commands/set_cluster_parameter_invocation.cpp +++ b/src/mongo/db/commands/set_cluster_parameter_invocation.cpp @@ -42,8 +42,10 @@ namespace mongo { -void SetClusterParameterInvocation::invoke(OperationContext* opCtx, - const SetClusterParameter& cmd) { +bool SetClusterParameterInvocation::invoke(OperationContext* opCtx, + const SetClusterParameter& cmd, + boost::optional paramTime, + const WriteConcernOptions& writeConcern) { BSONObj cmdParamObj = cmd.getCommandParameter(); BSONElement commandElement = cmdParamObj.firstElement(); @@ -51,14 +53,16 @@ void SetClusterParameterInvocation::invoke(OperationContext* opCtx, const ServerParameter* serverParameter = _sps->getIfExists(parameterName); - uassert(6432601, + uassert(ErrorCodes::IllegalOperation, str::stream() << "Unknown Cluster Parameter " << parameterName, serverParameter != nullptr); - uassert(6432602, + + uassert(ErrorCodes::IllegalOperation, "Cluster parameter value must be an object", BSONType::Object == commandElement.type()); - LogicalTime clusterTime = _dbService.getUpdateClusterTime(opCtx); + LogicalTime clusterTime = + paramTime ? LogicalTime(*paramTime) : _dbService.getUpdateClusterTime(opCtx); BSONObjBuilder updateBuilder; updateBuilder << "_id" << parameterName << "clusterParameterTime" << clusterTime.toBSON(); @@ -67,11 +71,11 @@ void SetClusterParameterInvocation::invoke(OperationContext* opCtx, BSONObj query = BSON("_id" << parameterName); BSONObj update = updateBuilder.obj(); - LOGV2(6432603, "Updating cluster parameter on-disk", "clusterParameter"_attr = parameterName); + LOGV2_DEBUG( + 6432603, 2, "Updating cluster parameter on-disk", "clusterParameter"_attr = parameterName); uassertStatusOK(serverParameter->validate(update)); - Status updateResult = _dbService.updateParameterOnDisk(query, update); - uassertStatusOK(updateResult); + return uassertStatusOK(_dbService.updateParameterOnDisk(opCtx, query, update, writeConcern)); } LogicalTime ClusterParameterDBClientService::getUpdateClusterTime(OperationContext* opCtx) { @@ -79,23 +83,23 @@ LogicalTime ClusterParameterDBClientService::getUpdateClusterTime(OperationConte return vt.clusterTime(); } -Status ClusterParameterDBClientService::updateParameterOnDisk(BSONObj query, BSONObj update) { +StatusWith ClusterParameterDBClientService::updateParameterOnDisk( + OperationContext* opCtx, + BSONObj query, + BSONObj update, + const WriteConcernOptions& writeConcern) { BSONObj res; BSONObjBuilder set; set.append("$set", update); set.doneFast(); - const std::string configDb = "config"; - const auto kMajorityWriteConcern = BSON("writeConcern" << BSON("w" - << "majority")); - const NamespaceString kClusterParametersNamespace(configDb, "clusterParameters"); - try { _dbClient.runCommand( - configDb, + NamespaceString::kConfigDb.toString(), [&] { - write_ops::UpdateCommandRequest updateOp(kClusterParametersNamespace); + write_ops::UpdateCommandRequest updateOp( + NamespaceString::kClusterParametersNamespace); updateOp.setUpdates({[&] { write_ops::UpdateOpEntry entry; entry.setQ(query); @@ -105,7 +109,7 @@ Status ClusterParameterDBClientService::updateParameterOnDisk(BSONObj query, BSO return entry; }()}); - return updateOp.toBSON(kMajorityWriteConcern); + return updateOp.toBSON(writeConcern.toBSON()); }(), res); } catch (const DBException& ex) { @@ -119,7 +123,7 @@ Status ClusterParameterDBClientService::updateParameterOnDisk(BSONObj query, BSO return Status(ErrorCodes::FailedToParse, errmsg); } - return Status::OK(); + return response.getNModified() > 0 || response.getN() > 0; } const ServerParameter* ClusterParameterService::getIfExists(StringData name) { diff --git a/src/mongo/db/commands/set_cluster_parameter_invocation.h b/src/mongo/db/commands/set_cluster_parameter_invocation.h index 35b74530a95..d6ff09f0d47 100644 --- a/src/mongo/db/commands/set_cluster_parameter_invocation.h +++ b/src/mongo/db/commands/set_cluster_parameter_invocation.h @@ -48,7 +48,10 @@ public: class DBClientService { public: - virtual Status updateParameterOnDisk(BSONObj query, BSONObj update) = 0; + virtual StatusWith updateParameterOnDisk(OperationContext* opCtx, + BSONObj query, + BSONObj update, + const WriteConcernOptions&) = 0; virtual LogicalTime getUpdateClusterTime(OperationContext*) = 0; virtual ~DBClientService() = default; }; @@ -56,7 +59,10 @@ public: class ClusterParameterDBClientService final : public DBClientService { public: ClusterParameterDBClientService(DBDirectClient& dbDirectClient) : _dbClient(dbDirectClient) {} - Status updateParameterOnDisk(BSONObj query, BSONObj update) override; + StatusWith updateParameterOnDisk(OperationContext* opCtx, + BSONObj query, + BSONObj update, + const WriteConcernOptions&) override; LogicalTime getUpdateClusterTime(OperationContext*) override; private: @@ -69,13 +75,13 @@ public: DBClientService& dbClientService) : _sps(std::move(serverParmeterService)), _dbService(dbClientService) {} - void invoke(OperationContext*, const SetClusterParameter&); + bool invoke(OperationContext*, + const SetClusterParameter&, + boost::optional, + const WriteConcernOptions&); private: std::unique_ptr _sps; DBClientService& _dbService; - - Status updateParameterOnDisk(BSONObj query, BSONObj update); }; - } // namespace mongo diff --git a/src/mongo/db/commands/set_cluster_parameter_invocation_test.cpp b/src/mongo/db/commands/set_cluster_parameter_invocation_test.cpp index 3934e890684..895b094d47d 100644 --- a/src/mongo/db/commands/set_cluster_parameter_invocation_test.cpp +++ b/src/mongo/db/commands/set_cluster_parameter_invocation_test.cpp @@ -44,6 +44,10 @@ namespace mongo { namespace { +const WriteConcernOptions kMajorityWriteConcern{WriteConcernOptions::kMajority, + WriteConcernOptions::SyncMode::UNSET, + WriteConcernOptions::kNoTimeout}; + // Mocks class MockParameterService : public ServerParameterService { public: @@ -89,11 +93,14 @@ private: class DBClientMock : public DBClientService { public: - DBClientMock(std::function updateParameterOnDiskMock) { + DBClientMock(std::function(BSONObj, BSONObj)> updateParameterOnDiskMock) { this->updateParameterOnDiskMockImpl = updateParameterOnDiskMock; } - Status updateParameterOnDisk(BSONObj cmd, BSONObj info) override { + StatusWith updateParameterOnDisk(OperationContext* opCtx, + BSONObj cmd, + BSONObj info, + const WriteConcernOptions&) override { return updateParameterOnDiskMockImpl(cmd, info); } @@ -103,7 +110,7 @@ public: } private: - std::function updateParameterOnDiskMockImpl; + std::function(BSONObj, BSONObj)> updateParameterOnDiskMockImpl; }; MockServerParameter alwaysValidatingServerParameter(StringData name) { @@ -123,7 +130,7 @@ MockServerParameter alwaysInvalidatingServerParameter(StringData name) { } DBClientMock alwaysSucceedingDbClient() { - DBClientMock dbServiceMock([&](BSONObj cmd, BSONObj info) { return Status::OK(); }); + DBClientMock dbServiceMock([&](BSONObj cmd, BSONObj info) { return true; }); return dbServiceMock; } @@ -164,7 +171,7 @@ TEST(SetClusterParameterCommand, SucceedsWithObjectParameter) { << "majority"))); SetClusterParameter testCmd(obj); - fixture.invoke(&spyCtx, testCmd); + fixture.invoke(&spyCtx, testCmd, boost::none, kMajorityWriteConcern); } TEST(SetClusterParameterCommand, ThrowsWithNonObjectParameter) { @@ -189,7 +196,9 @@ TEST(SetClusterParameterCommand, ThrowsWithNonObjectParameter) { OperationContext spyCtx(clientPtr, 1234); SetClusterParameter testCmd(obj); - ASSERT_THROWS_CODE(fixture.invoke(&spyCtx, testCmd), DBException, 6432602); + ASSERT_THROWS_CODE(fixture.invoke(&spyCtx, testCmd, boost::none, kMajorityWriteConcern), + DBException, + ErrorCodes::IllegalOperation); } TEST(SetClusterParameterCommand, ThrowsWhenServerParameterValidationFails) { @@ -217,10 +226,11 @@ TEST(SetClusterParameterCommand, ThrowsWhenServerParameterValidationFails) { OperationContext spyCtx(clientPtr, 1234); SetClusterParameter testCmd(obj); - ASSERT_THROWS_CODE_AND_WHAT(fixture.invoke(&spyCtx, testCmd), - DBException, - ErrorCodes::BadValue, - "Parameter Validation Failed"_sd); + ASSERT_THROWS_CODE_AND_WHAT( + fixture.invoke(&spyCtx, testCmd, boost::none, kMajorityWriteConcern), + DBException, + ErrorCodes::BadValue, + "Parameter Validation Failed"_sd); } TEST(SetClusterParameterCommand, ThrowsWhenDBUpdateFails) { @@ -248,7 +258,9 @@ TEST(SetClusterParameterCommand, ThrowsWhenDBUpdateFails) { SetClusterParameter testCmd(obj); - ASSERT_THROWS_WHAT(fixture.invoke(&spyCtx, testCmd), DBException, "DB Client Update Failed"_sd); + ASSERT_THROWS_WHAT(fixture.invoke(&spyCtx, testCmd, boost::none, kMajorityWriteConcern), + DBException, + "DB Client Update Failed"_sd); } TEST(SetClusterParameterCommand, ThrowsWhenParameterNotPresent) { @@ -275,7 +287,9 @@ TEST(SetClusterParameterCommand, ThrowsWhenParameterNotPresent) { SetClusterParameter testCmd(obj); - ASSERT_THROWS_CODE(fixture.invoke(&spyCtx, testCmd), DBException, 6432601); + ASSERT_THROWS_CODE(fixture.invoke(&spyCtx, testCmd, boost::none, kMajorityWriteConcern), + DBException, + ErrorCodes::IllegalOperation); } } // namespace } // namespace mongo diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 45a68eafca0..d16459c2c44 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -422,6 +422,7 @@ env.Library( 'shardsvr_reshard_collection_command.cpp', 'shardsvr_resharding_operation_time_command.cpp', 'shardsvr_set_allow_migrations_command.cpp', + 'shardsvr_set_cluster_parameter_command.cpp', 'shardsvr_set_user_write_block_mode_command.cpp', 'shardsvr_split_chunk_command.cpp', 'split_vector_command.cpp', @@ -437,6 +438,7 @@ env.Library( '$BUILD_DIR/mongo/db/commands/mongod_fcv', '$BUILD_DIR/mongo/db/commands/rename_collection_idl', '$BUILD_DIR/mongo/db/commands/server_status', + '$BUILD_DIR/mongo/db/commands/set_cluster_parameter_invocation', '$BUILD_DIR/mongo/db/commands/test_commands_enabled', '$BUILD_DIR/mongo/db/commands/txn_cmd_request', '$BUILD_DIR/mongo/db/fle_crud', diff --git a/src/mongo/db/s/config/configsvr_set_cluster_parameter_command.cpp b/src/mongo/db/s/config/configsvr_set_cluster_parameter_command.cpp index beb08387bb7..6a0109194f6 100644 --- a/src/mongo/db/s/config/configsvr_set_cluster_parameter_command.cpp +++ b/src/mongo/db/s/config/configsvr_set_cluster_parameter_command.cpp @@ -33,6 +33,7 @@ #include "mongo/db/auth/authorization_session.h" #include "mongo/db/commands.h" +#include "mongo/db/commands/set_cluster_parameter_invocation.h" #include "mongo/db/s/config/configsvr_coordinator_service.h" #include "mongo/db/s/config/set_cluster_parameter_coordinator.h" #include "mongo/idl/cluster_server_parameter_gen.h" @@ -62,6 +63,32 @@ public: "featureFlagClusterWideConfig not enabled", gFeatureFlagClusterWideConfig.isEnabled(serverGlobalParams.featureCompatibility)); + // Validate parameter before creating coordinator. + { + BSONObj cmdParamObj = request().getCommandParameter(); + BSONElement commandElement = cmdParamObj.firstElement(); + StringData parameterName = commandElement.fieldName(); + std::unique_ptr sps = + std::make_unique(); + const ServerParameter* serverParameter = sps->getIfExists(parameterName); + + uassert(ErrorCodes::IllegalOperation, + str::stream() << "Unknown Cluster Parameter " << parameterName, + serverParameter != nullptr); + + uassert(ErrorCodes::IllegalOperation, + "Cluster parameter value must be an object", + BSONType::Object == commandElement.type()); + + BSONObjBuilder clusterParamBuilder; + clusterParamBuilder << "_id" << parameterName; + clusterParamBuilder.appendElements(commandElement.Obj()); + + BSONObj clusterParam = clusterParamBuilder.obj(); + + uassertStatusOK(serverParameter->validate(clusterParam)); + } + SetClusterParameterCoordinatorDocument coordinatorDoc; coordinatorDoc.setConfigsvrCoordinatorMetadata( {ConfigsvrCoordinatorTypeEnum::kSetClusterParameter}); diff --git a/src/mongo/db/s/config/set_cluster_parameter_coordinator.cpp b/src/mongo/db/s/config/set_cluster_parameter_coordinator.cpp index 209ebf8a2f3..3966879144d 100644 --- a/src/mongo/db/s/config/set_cluster_parameter_coordinator.cpp +++ b/src/mongo/db/s/config/set_cluster_parameter_coordinator.cpp @@ -33,13 +33,26 @@ #include "mongo/db/s/config/set_cluster_parameter_coordinator.h" +#include "mongo/db/cancelable_operation_context.h" +#include "mongo/db/commands/cluster_server_parameter_cmds_gen.h" +#include "mongo/db/commands/set_cluster_parameter_invocation.h" +#include "mongo/db/repl/read_concern_args.h" +#include "mongo/db/s/sharding_logging.h" +#include "mongo/db/s/sharding_util.h" +#include "mongo/db/vector_clock.h" #include "mongo/logv2/log.h" +#include "mongo/s/grid.h" #include "mongo/s/request_types/sharded_ddl_commands_gen.h" namespace mongo { +namespace { + +const WriteConcernOptions kMajorityWriteConcern{WriteConcernOptions::kMajority, + WriteConcernOptions::SyncMode::UNSET, + WriteConcernOptions::kNoTimeout}; +} bool SetClusterParameterCoordinator::hasSameOptions(const BSONObj& otherDocBSON) const { - // TODO SERVER-63870: add command parameters to comparison. const auto otherDoc = StateDoc::parse( IDLParserErrorContext("SetClusterParameterCoordinatorDocument"), otherDocBSON); return SimpleBSONObjComparator::kInstance.evaluate(_doc.getParameter() == @@ -87,6 +100,59 @@ void SetClusterParameterCoordinator::_enterPhase(Phase newPhase) { _doc = std::move(newDoc); } +bool SetClusterParameterCoordinator::_isClusterParameterSetAtTimestamp(OperationContext* opCtx) { + auto parameterElem = _doc.getParameter().firstElement(); + auto parameterName = parameterElem.fieldName(); + auto parameter = _doc.getParameter()[parameterName].Obj(); + auto configsvrParameters = + uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + opCtx, + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + repl::ReadConcernLevel::kMajorityReadConcern, + NamespaceString::kClusterParametersNamespace, + BSON("_id" << parameterName << "clusterParameterTime" + << *_doc.getClusterParameterTime()), + BSONObj(), + boost::none)); + + dassert(configsvrParameters.docs.size() <= 1); + + return !configsvrParameters.docs.empty(); +} + +void SetClusterParameterCoordinator::_sendSetClusterParameterToAllShards( + OperationContext* opCtx, + const OperationSessionInfo& session, + std::shared_ptr executor) { + auto shards = Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx); + + LOGV2_DEBUG(6387001, 1, "Sending setClusterParameter to shards:", "shards"_attr = shards); + + ShardsvrSetClusterParameter request(_doc.getParameter()); + request.setDbName(NamespaceString::kAdminDb); + request.setClusterParameterTime(*_doc.getClusterParameterTime()); + sharding_util::sendCommandToShards( + opCtx, + NamespaceString::kAdminDb, + CommandHelpers::appendMajorityWriteConcern(request.toBSON(session.toBSON())), + shards, + **executor); +} + +void SetClusterParameterCoordinator::_commit(OperationContext* opCtx) { + LOGV2_DEBUG(6387002, 1, "Updating configsvr cluster parameter"); + + SetClusterParameter setClusterParameterRequest(_doc.getParameter()); + setClusterParameterRequest.setDbName(NamespaceString::kAdminDb); + std::unique_ptr parameterService = + std::make_unique(); + DBDirectClient client(opCtx); + ClusterParameterDBClientService dbService(client); + SetClusterParameterInvocation invocation{std::move(parameterService), dbService}; + invocation.invoke( + opCtx, setClusterParameterRequest, _doc.getClusterParameterTime(), kMajorityWriteConcern); +} + const ConfigsvrCoordinatorMetadata& SetClusterParameterCoordinator::metadata() const { return _doc.getConfigsvrCoordinatorMetadata(); } @@ -95,9 +161,52 @@ ExecutorFuture SetClusterParameterCoordinator::_runImpl( std::shared_ptr executor, const CancellationToken& token) noexcept { return ExecutorFuture(**executor) - .then(_executePhase(Phase::kSetClusterParameter, [this, anchor = shared_from_this()] { - // TODO Implement - })); + .then([this, anchor = shared_from_this()] { + auto opCtxHolder = cc().makeOperationContext(); + auto* opCtx = opCtxHolder.get(); + + // Select a cluster parameter time only once, when the coordinator is run the first + // time, this way, even if the process steps down while sending the command to the + // shards, on the next run will use the same time for the remaining shards. + if (!_doc.getClusterParameterTime()) { + // Select a clusterParameter time. + auto vt = VectorClock::get(opCtx)->getTime(); + auto clusterParameterTime = vt.clusterTime(); + _doc.setClusterParameterTime(clusterParameterTime.asTimestamp()); + } + }) + .then(_executePhase(Phase::kSetClusterParameter, + [this, executor = executor, anchor = shared_from_this()] { + auto opCtxHolder = cc().makeOperationContext(); + auto* opCtx = opCtxHolder.get(); + + ShardingLogging::get(opCtx)->logChange( + opCtx, + "setClusterParameter.start", + NamespaceString::kClusterParametersNamespace.toString(), + _doc.getParameter(), + kMajorityWriteConcern); + + // If the parameter was already set on the config server, there is + // nothing else to do. + if (_isClusterParameterSetAtTimestamp(opCtx)) { + return; + } + + _doc = _updateSession(opCtx, _doc); + const auto session = _getCurrentSession(); + + _sendSetClusterParameterToAllShards(opCtx, session, executor); + + _commit(opCtx); + + ShardingLogging::get(opCtx)->logChange( + opCtx, + "setClusterParameter.end", + NamespaceString::kClusterParametersNamespace.toString(), + _doc.getParameter(), + kMajorityWriteConcern); + })); } } // namespace mongo diff --git a/src/mongo/db/s/config/set_cluster_parameter_coordinator.h b/src/mongo/db/s/config/set_cluster_parameter_coordinator.h index ec0f0fead04..2e172991a00 100644 --- a/src/mongo/db/s/config/set_cluster_parameter_coordinator.h +++ b/src/mongo/db/s/config/set_cluster_parameter_coordinator.h @@ -57,6 +57,24 @@ private: ExecutorFuture _runImpl(std::shared_ptr executor, const CancellationToken& token) noexcept override; + /* + * Performs a local write with majority write concern to set the parameter. + */ + void _commit(OperationContext* opCtx); + + /* + * Checks if the cluster parameter was already set to the provided value. + */ + bool _isClusterParameterSetAtTimestamp(OperationContext* opCtx); + + /* + * Sends setClusterParameter to every shard in the cluster with the appropiate session. + */ + void _sendSetClusterParameterToAllShards( + OperationContext* opCtx, + const OperationSessionInfo& opInfo, + std::shared_ptr executor); + const ConfigsvrCoordinatorMetadata& metadata() const override; template diff --git a/src/mongo/db/s/config/set_cluster_parameter_coordinator_document.idl b/src/mongo/db/s/config/set_cluster_parameter_coordinator_document.idl index ced314175c8..ebffbc826a1 100644 --- a/src/mongo/db/s/config/set_cluster_parameter_coordinator_document.idl +++ b/src/mongo/db/s/config/set_cluster_parameter_coordinator_document.idl @@ -55,7 +55,10 @@ structs: type: SetClusterParameterCoordinatorPhase description: "Coordinator phase." default: kUnset - # TODO SERVER-63870: add setClusterParameter request arguments. parameter: type: object_owned description: "Parameter to be set in the cluster" + clusterParameterTime: + type: timestamp + description: "Timestamp determined at the beginning that determines the parameter time." + optional: true diff --git a/src/mongo/db/s/shardsvr_set_cluster_parameter_command.cpp b/src/mongo/db/s/shardsvr_set_cluster_parameter_command.cpp new file mode 100644 index 00000000000..c84ad1dd27e --- /dev/null +++ b/src/mongo/db/s/shardsvr_set_cluster_parameter_command.cpp @@ -0,0 +1,127 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/cancelable_operation_context.h" +#include "mongo/db/commands.h" +#include "mongo/db/commands/set_cluster_parameter_invocation.h" +#include "mongo/db/commands/user_management_commands_gen.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/repl/repl_client_info.h" +#include "mongo/logv2/log.h" +#include "mongo/s/grid.h" +#include "mongo/s/request_types/sharded_ddl_commands_gen.h" + +namespace mongo { +namespace { + +const WriteConcernOptions kLocalWriteConcern{ + 1, WriteConcernOptions::SyncMode::UNSET, WriteConcernOptions::kNoTimeout}; + +class ShardsvrSetClusterParameterCommand final + : public TypedCommand { +public: + using Request = ShardsvrSetClusterParameter; + + class Invocation final : public InvocationBase { + public: + using InvocationBase::InvocationBase; + + void typedRun(OperationContext* opCtx) { + uassert(ErrorCodes::IllegalOperation, + str::stream() << Request::kCommandName << " can only be run on shard servers", + serverGlobalParams.clusterRole == ClusterRole::ShardServer); + CommandHelpers::uassertCommandRunWithMajority(Request::kCommandName, + opCtx->getWriteConcern()); + + SetClusterParameter setClusterParameterRequest(request().getCommandParameter()); + setClusterParameterRequest.setDbName(NamespaceString::kAdminDb); + std::unique_ptr parameterService = + std::make_unique(); + DBDirectClient client(opCtx); + ClusterParameterDBClientService dbService(client); + SetClusterParameterInvocation invocation{std::move(parameterService), dbService}; + // Use local write concern for setClusterParameter, the idea is that the command is + // being called with majority write concern, so, we'll wait for majority after checking + // out the session. + bool writePerformed = invocation.invoke(opCtx, + setClusterParameterRequest, + request().getClusterParameterTime(), + kLocalWriteConcern); + if (!writePerformed) { + // Since no write happened on this txnNumber, we need to make a dummy write so + // that secondaries can be aware of this txn. + DBDirectClient client(opCtx); + client.update(NamespaceString::kServerConfigurationNamespace.ns(), + BSON("_id" + << "SetClusterParameterStats"), + BSON("$inc" << BSON("count" << 1)), + true /* upsert */, + false /* multi */); + } + } + + private: + NamespaceString ns() const override { + return NamespaceString(); + } + + bool supportsWriteConcern() const override { + return true; + } + + void doCheckAuthorization(OperationContext* opCtx) const override { + uassert(ErrorCodes::Unauthorized, + "Unauthorized", + AuthorizationSession::get(opCtx->getClient()) + ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(), + ActionType::internal)); + } + }; + + std::string help() const override { + return "Internal command, which is exported by the shard servers. Do not call " + "directly. Set's the cluster parameter in the node."; + } + + bool adminOnly() const override { + return true; + } + + AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { + return AllowedOnSecondary::kNever; + } +} shardsvrSetClusterParameterCmd; + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/transaction_validation.cpp b/src/mongo/db/transaction_validation.cpp index 04bc909208e..cc136f0fa6f 100644 --- a/src/mongo/db/transaction_validation.cpp +++ b/src/mongo/db/transaction_validation.cpp @@ -65,6 +65,7 @@ const StringMap retryableWriteCommands = {{"clusterDelete", 1}, {"_shardsvrParticipantBlock", 1}, {"_configsvrCollMod", 1}, {"_shardsvrCollModParticipant", 1}, + {"_shardsvrSetClusterParameter", 1}, {"_shardsvrSetUserWriteBlockMode", 1}}; // TODO SERVER-65101: Replace this with a property on each command. diff --git a/src/mongo/idl/cluster_server_parameter_op_observer.cpp b/src/mongo/idl/cluster_server_parameter_op_observer.cpp index 19362dd7bb5..de195a75ab1 100644 --- a/src/mongo/idl/cluster_server_parameter_op_observer.cpp +++ b/src/mongo/idl/cluster_server_parameter_op_observer.cpp @@ -41,9 +41,6 @@ namespace mongo { namespace { constexpr auto kIdField = "_id"_sd; constexpr auto kCPTField = "clusterParameterTime"_sd; -constexpr auto kConfigDB = "config"_sd; -constexpr auto kClusterParametersColl = "clusterParameters"_sd; -const NamespaceString kClusterParametersNS(kConfigDB, kClusterParametersColl); /** * Per-operation scratch space indicating the document being deleted. @@ -53,7 +50,7 @@ const NamespaceString kClusterParametersNS(kConfigDB, kClusterParametersColl); const auto aboutToDeleteDoc = OperationContext::declareDecoration(); bool isConfigNamespace(const NamespaceString& nss) { - return nss == kClusterParametersNS; + return nss == NamespaceString::kClusterParametersNamespace; } constexpr auto kOplog = "oplog"_sd; @@ -127,7 +124,7 @@ void doLoadAllParametersFromDisk(OperationContext* opCtx, StringData mode, OnEnt std::vector failures; DBDirectClient client(opCtx); - FindCommandRequest findRequest{kClusterParametersNS}; + FindCommandRequest findRequest{NamespaceString::kClusterParametersNamespace}; client.find(std::move(findRequest), ReadPreferenceSetting{}, [&](BSONObj doc) { try { onEntry(doc, mode); @@ -242,7 +239,7 @@ void ClusterServerParameterOpObserver::onDelete(OperationContext* opCtx, void ClusterServerParameterOpObserver::onDropDatabase(OperationContext* opCtx, const std::string& dbName) { - if (dbName == kConfigDB) { + if (dbName == NamespaceString::kConfigDb) { // Entire config DB deleted, reset to default state. clearAllParameters(); } @@ -303,7 +300,7 @@ void ClusterServerParameterOpObserver::onImportCollection(OperationContext* opCt void ClusterServerParameterOpObserver::_onReplicationRollback(OperationContext* opCtx, const RollbackObserverInfo& rbInfo) { - if (rbInfo.rollbackNamespaces.count(kClusterParametersNS)) { + if (rbInfo.rollbackNamespaces.count(NamespaceString::kClusterParametersNamespace)) { // Some kind of rollback happend in the settings collection. // Just reload from disk to be safe. resynchronizeAllParametersFromDisk(opCtx); diff --git a/src/mongo/s/request_types/sharded_ddl_commands.idl b/src/mongo/s/request_types/sharded_ddl_commands.idl index fd0a7a1946f..8f40c11e835 100644 --- a/src/mongo/s/request_types/sharded_ddl_commands.idl +++ b/src/mongo/s/request_types/sharded_ddl_commands.idl @@ -453,12 +453,24 @@ commands: description: "Determines the phase of the blocking/unblocking procedure to be executed." + _shardsvrSetClusterParameter: + command_name: _shardsvrSetClusterParameter + cpp_name: ShardsvrSetClusterParameter + description: "internal _shardsvrSetClusterParameter command" + namespace: type + type: object + api_version: "" + strict: false + fields: + clusterParameterTime: + type: timestamp + description: "Time that will be associated with the cluster parameter" + _configsvrSetClusterParameter: command_name: _configsvrSetClusterParameter cpp_name: ConfigsvrSetClusterParameter description: "Internal command sent to the config server to start a coordinator and set the cluster parameter." - # TODO SERVER-63870: use replica set command parameters. namespace: type type: object_owned api_version: "" -- cgit v1.2.1