commit    91a607d3a2c748ead682c1a44d37263254de26f8 (patch)
Author:   Marcos José Grillo Ramirez <marcos.grillo@mongodb.com>  2022-04-05 17:11:59 +0000
Committer: Evergreen Agent <no-reply@evergreen.mongodb.com>       2022-04-05 20:28:45 +0000
Tree:     1245c2efd4eb7ebcc46f8128c292c8df8744ec3d
Parent:   c96f8dacc4c71b4774c932a07be4fac71b6db628 (diff)
Download: mongo-91a607d3a2c748ead682c1a44d37263254de26f8.tar.gz

    SERVER-63870 Integrate replica set setClusterParameter into POS with replay protection
-rw-r--r--  jstests/auth/lib/commands_lib.js                                      |   8
-rw-r--r--  jstests/core/views/views_all_commands.js                              |   1
-rw-r--r--  jstests/replsets/db_reads_while_recovering_all_commands.js            |   1
-rw-r--r--  jstests/sharding/read_write_concern_defaults_application.js           |   1
-rw-r--r--  jstests/sharding/set_cluster_parameter.js                             |  78
-rw-r--r--  src/mongo/db/commands/cluster_server_parameter_cmds.idl               |   2
-rw-r--r--  src/mongo/db/commands/set_cluster_parameter_command.cpp               |   6
-rw-r--r--  src/mongo/db/commands/set_cluster_parameter_invocation.cpp            |  40
-rw-r--r--  src/mongo/db/commands/set_cluster_parameter_invocation.h              |  18
-rw-r--r--  src/mongo/db/commands/set_cluster_parameter_invocation_test.cpp       |  38
-rw-r--r--  src/mongo/db/s/SConscript                                             |   2
-rw-r--r--  src/mongo/db/s/config/configsvr_set_cluster_parameter_command.cpp     |  27
-rw-r--r--  src/mongo/db/s/config/set_cluster_parameter_coordinator.cpp           | 117
-rw-r--r--  src/mongo/db/s/config/set_cluster_parameter_coordinator.h             |  18
-rw-r--r--  src/mongo/db/s/config/set_cluster_parameter_coordinator_document.idl  |   5
-rw-r--r--  src/mongo/db/s/shardsvr_set_cluster_parameter_command.cpp             | 127
-rw-r--r--  src/mongo/db/transaction_validation.cpp                               |   1
-rw-r--r--  src/mongo/idl/cluster_server_parameter_op_observer.cpp                |  11
-rw-r--r--  src/mongo/s/request_types/sharded_ddl_commands.idl                    |  14
19 files changed, 452 insertions(+), 63 deletions(-)
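
For orientation, the end-to-end behavior this commit implements can be exercised from a mongos shell roughly as follows; a minimal sketch, assuming a sharded cluster started with featureFlagClusterWideConfig enabled and using the test-only parameter that appears throughout this patch.

// Set a cluster-wide parameter through mongos; the config server coordinator
// persists it and replays it on every shard replica set.
assert.commandWorked(
    db.adminCommand({setClusterParameter: {testIntClusterParameter: {intData: 42}}}));

// Read the value back; once the coordinator finishes, every node should
// converge on the same value and clusterParameterTime.
const res = assert.commandWorked(
    db.adminCommand({getClusterParameter: "testIntClusterParameter"}));
printjson(res);
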
diff --git a/jstests/auth/lib/commands_lib.js b/jstests/auth/lib/commands_lib.js
index dbc73db0e47..5f9d5856505 100644
--- a/jstests/auth/lib/commands_lib.js
+++ b/jstests/auth/lib/commands_lib.js
@@ -4178,7 +4178,11 @@ var authCommandsLib = {
{
testname: "getClusterParameter",
command: {getClusterParameter: "testIntClusterParameter"},
- skipTest: (conn) => !TestData.setParameters.featureFlagClusterWideConfig,
+ skipTest: (conn) => {
+ const hello = assert.commandWorked(conn.getDB("admin").runCommand({hello: 1}));
+ const isStandalone = hello.msg !== "isdbgrid" && !hello.hasOwnProperty('setName');
+ return !TestData.setParameters.featureFlagClusterWideConfig || isStandalone;
+ },
testcases: [
{
runOnDb: adminDbName,
@@ -5666,7 +5670,7 @@ var authCommandsLib = {
},
{
testname: "setClusterParameter",
- command: {setClusterParameter: {testIntClusterParameterParam: {intData: 17}}},
+ command: {setClusterParameter: {testIntClusterParameter: {intData: 17}}},
skipTest: (conn) => {
const hello = assert.commandWorked(conn.getDB("admin").runCommand({hello: 1}));
const isStandalone = hello.msg !== "isdbgrid" && !hello.hasOwnProperty('setName');
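
The new skipTest predicates above skip these auth cases on standalone nodes, since cluster parameters require a replica set or sharded cluster. Pulled out as a helper (the function name here is illustrative), the check is:

// Returns true when `conn` points at a standalone mongod: not a mongos
// (hello.msg === "isdbgrid") and not a replica-set member (no setName field).
function isStandaloneNode(conn) {
    const hello = assert.commandWorked(conn.getDB("admin").runCommand({hello: 1}));
    return hello.msg !== "isdbgrid" && !hello.hasOwnProperty("setName");
}
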
diff --git a/jstests/core/views/views_all_commands.js b/jstests/core/views/views_all_commands.js
index 551cb62518c..c6451f6934d 100644
--- a/jstests/core/views/views_all_commands.js
+++ b/jstests/core/views/views_all_commands.js
@@ -169,6 +169,7 @@ let viewsCommandTests = {
_shardsvrReshardCollection: {skip: isAnInternalCommand},
_shardsvrReshardingOperationTime: {skip: isAnInternalCommand},
_shardsvrSetAllowMigrations: {skip: isAnInternalCommand},
+ _shardsvrSetClusterParameter: {skip: isAnInternalCommand},
_shardsvrSetUserWriteBlockMode: {skip: isAnInternalCommand},
_shardsvrShardCollection:
{skip: isAnInternalCommand}, // TODO SERVER-58843: Remove once 6.0 becomes last LTS
diff --git a/jstests/replsets/db_reads_while_recovering_all_commands.js b/jstests/replsets/db_reads_while_recovering_all_commands.js
index d9bc04fe52c..b089f91df8a 100644
--- a/jstests/replsets/db_reads_while_recovering_all_commands.js
+++ b/jstests/replsets/db_reads_while_recovering_all_commands.js
@@ -97,6 +97,7 @@ const allCommands = {
_shardsvrReshardingOperationTime: {skip: isPrimaryOnly},
_shardsvrRefineCollectionShardKey: {skip: isPrimaryOnly},
_shardsvrSetAllowMigrations: {skip: isPrimaryOnly},
+ _shardsvrSetClusterParameter: {skip: isAnInternalCommand},
_shardsvrSetUserWriteBlockMode: {skip: isPrimaryOnly},
_shardsvrCollMod: {skip: isPrimaryOnly},
_shardsvrCollModParticipant: {skip: isAnInternalCommand},
diff --git a/jstests/sharding/read_write_concern_defaults_application.js b/jstests/sharding/read_write_concern_defaults_application.js
index d7df5133590..31d5beb1541 100644
--- a/jstests/sharding/read_write_concern_defaults_application.js
+++ b/jstests/sharding/read_write_concern_defaults_application.js
@@ -168,6 +168,7 @@ let testCases = {
_shardsvrReshardCollection: {skip: "internal command"},
_shardsvrReshardingOperationTime: {skip: "internal command"},
_shardsvrSetAllowMigrations: {skip: "internal command"},
+ _shardsvrSetClusterParameter: {skip: "internal command"},
_shardsvrSetUserWriteBlockMode: {skip: "internal command"},
_shardsvrCollMod: {skip: "internal command"},
_shardsvrCollModParticipant: {skip: "internal command"},
diff --git a/jstests/sharding/set_cluster_parameter.js b/jstests/sharding/set_cluster_parameter.js
index ad550c6b685..70a2cab98e2 100644
--- a/jstests/sharding/set_cluster_parameter.js
+++ b/jstests/sharding/set_cluster_parameter.js
@@ -13,15 +13,31 @@
load('jstests/libs/fail_point_util.js');
-const st = new ShardingTest({shards: 1});
+const clusterParameter1Value = {
+ intData: 42
+};
+const clusterParameter1Name = 'testIntClusterParameter';
+const clusterParameter1 = {
+ [clusterParameter1Name]: clusterParameter1Value
+};
+
+const clusterParameter2Value = {
+ strData: 'on'
+};
+const clusterParameter2Name = 'testStrClusterParameter';
+const clusterParameter2 = {
+ [clusterParameter2Name]: clusterParameter2Value
+};
+
+const st = new ShardingTest({shards: 1, rs: {nodes: 3}});
let fp =
configureFailPoint(st.configRS.getPrimary(), 'hangBeforeRunningConfigsvrCoordinatorInstance');
-let setClusterParameterSuccessThread = new Thread((mongosConnString) => {
+let setClusterParameterSuccessThread = new Thread((mongosConnString, clusterParameter) => {
let mongos = new Mongo(mongosConnString);
- assert.commandWorked(mongos.adminCommand({setClusterParameter: {param: true}}));
-}, st.s.host);
+ assert.commandWorked(mongos.adminCommand({setClusterParameter: clusterParameter}));
+}, st.s.host, clusterParameter1);
setClusterParameterSuccessThread.start();
fp.wait();
@@ -29,12 +45,13 @@ fp.wait();
jsTestLog(
'Check that 2 requests for the same cluster parameter and same value generate only one coordinator.');
-let setClusterParameterJoinSuccessThread = new Thread((mongosConnString) => {
+let setClusterParameterJoinSuccessThread = new Thread((mongosConnString, clusterParameter) => {
let mongos = new Mongo(mongosConnString);
- assert.commandWorked(mongos.adminCommand({setClusterParameter: {param: true}}));
-}, st.s.host);
+ assert.commandWorked(mongos.adminCommand({setClusterParameter: clusterParameter}));
+}, st.s.host, clusterParameter1);
setClusterParameterJoinSuccessThread.start();
+fp.wait();
let currOp =
st.configRS.getPrimary()
@@ -44,17 +61,58 @@ let currOp =
.toArray();
assert.eq(1, currOp.length);
assert(currOp[0].hasOwnProperty('command'));
-assert(currOp[0].command.hasOwnProperty('param'));
-assert.eq(true, currOp[0].command.param);
+assert.docEq(currOp[0].command, clusterParameter1);
jsTestLog('Check that a second request will fail with ConflictingOperationInProgress.');
-assert.commandFailedWithCode(st.s.adminCommand({setClusterParameter: {otherParam: true}}),
+assert.commandFailedWithCode(st.s.adminCommand({setClusterParameter: clusterParameter2}),
ErrorCodes.ConflictingOperationInProgress);
fp.off();
setClusterParameterSuccessThread.join();
setClusterParameterJoinSuccessThread.join();
+jsTestLog('Check forward progress until completion in the presence of a config server stepdown.');
+
+fp = configureFailPoint(st.configRS.getPrimary(), 'hangBeforeRunningConfigsvrCoordinatorInstance');
+
+let setClusterParameterThread = new Thread((mongosConnString, clusterParameter) => {
+ let mongos = new Mongo(mongosConnString);
+ assert.commandWorked(mongos.adminCommand({setClusterParameter: clusterParameter}));
+}, st.s.host, clusterParameter2);
+
+setClusterParameterThread.start();
+fp.wait();
+
+let newPrimary = st.configRS.getSecondary();
+
+st.configRS.stepUp(newPrimary);
+
+// After the stepdown the command should be retried and finish successfully.
+setClusterParameterThread.join();
+
+const clusterParametersConfigColl =
+ st.configRS.getPrimary().getCollection('config.clusterParameters');
+
+const shardParametersConfigColl = st.rs0.getPrimary().getCollection('config.clusterParameters');
+
+assert.eq(1, clusterParametersConfigColl.countDocuments({_id: clusterParameter2Name}));
+const configClusterParameter = clusterParametersConfigColl.findOne(
+ {_id: clusterParameter2Name}, {_id: 0, clusterParameterTime: 0});
+const shardClusterParameter = shardParametersConfigColl.findOne({_id: clusterParameter2Name},
+ {_id: 0, clusterParameterTime: 0});
+assert.docEq(configClusterParameter, clusterParameter2Value);
+assert.docEq(shardClusterParameter, clusterParameter2Value);
+
+fp.off();
+
+// Check that the full cluster has the same clusterParameterTime as the config server.
+const configParameterTime =
+ clusterParametersConfigColl.findOne({_id: clusterParameter2Name}, {clusterParameterTime: 1})
+ .clusterParameterTime;
+assert.eq(configParameterTime,
+ shardParametersConfigColl.findOne({_id: clusterParameter2Name}, {clusterParameterTime: 1})
+ .clusterParameterTime);
+
st.stop();
})();
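
The convergence assertions at the end of this test (same parameter document and same clusterParameterTime on the config server and the shard primary) can be wrapped into a small reusable helper; a sketch, with the helper name being illustrative:

// Assert that two nodes store the same parameter value and the same
// clusterParameterTime in their config.clusterParameters collections.
function assertParameterInSync(nodeA, nodeB, parameterName, expectedValue) {
    const collA = nodeA.getCollection('config.clusterParameters');
    const collB = nodeB.getCollection('config.clusterParameters');
    const hideMeta = {_id: 0, clusterParameterTime: 0};
    assert.docEq(collA.findOne({_id: parameterName}, hideMeta), expectedValue);
    assert.docEq(collB.findOne({_id: parameterName}, hideMeta), expectedValue);
    assert.eq(collA.findOne({_id: parameterName}).clusterParameterTime,
              collB.findOne({_id: parameterName}).clusterParameterTime);
}

// e.g. assertParameterInSync(st.configRS.getPrimary(), st.rs0.getPrimary(),
//                            clusterParameter2Name, clusterParameter2Value);
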
diff --git a/src/mongo/db/commands/cluster_server_parameter_cmds.idl b/src/mongo/db/commands/cluster_server_parameter_cmds.idl
index 9c768f43f23..256b4cbd6d6 100644
--- a/src/mongo/db/commands/cluster_server_parameter_cmds.idl
+++ b/src/mongo/db/commands/cluster_server_parameter_cmds.idl
@@ -46,7 +46,7 @@ commands:
api_version: ""
namespace: type
type: object
- strict: true
+ strict: false
getClusterParameter:
description: "Retrieves the in-memory value of the specified cluster server parameter(s)"
diff --git a/src/mongo/db/commands/set_cluster_parameter_command.cpp b/src/mongo/db/commands/set_cluster_parameter_command.cpp
index 9d4074ac0bc..c4472f1ae12 100644
--- a/src/mongo/db/commands/set_cluster_parameter_command.cpp
+++ b/src/mongo/db/commands/set_cluster_parameter_command.cpp
@@ -42,6 +42,10 @@ namespace mongo {
namespace {
+const WriteConcernOptions kMajorityWriteConcern{WriteConcernOptions::kMajority,
+ WriteConcernOptions::SyncMode::UNSET,
+ WriteConcernOptions::kNoTimeout};
+
class SetClusterParameterCommand final : public TypedCommand<SetClusterParameterCommand> {
public:
using Request = SetClusterParameter;
@@ -76,7 +80,7 @@ public:
SetClusterParameterInvocation invocation{std::move(parameterService), dbService};
- invocation.invoke(opCtx, request());
+ invocation.invoke(opCtx, request(), boost::none, kMajorityWriteConcern);
}
private:
diff --git a/src/mongo/db/commands/set_cluster_parameter_invocation.cpp b/src/mongo/db/commands/set_cluster_parameter_invocation.cpp
index 513ae376564..8d29147d282 100644
--- a/src/mongo/db/commands/set_cluster_parameter_invocation.cpp
+++ b/src/mongo/db/commands/set_cluster_parameter_invocation.cpp
@@ -42,8 +42,10 @@
namespace mongo {
-void SetClusterParameterInvocation::invoke(OperationContext* opCtx,
- const SetClusterParameter& cmd) {
+bool SetClusterParameterInvocation::invoke(OperationContext* opCtx,
+ const SetClusterParameter& cmd,
+ boost::optional<Timestamp> paramTime,
+ const WriteConcernOptions& writeConcern) {
BSONObj cmdParamObj = cmd.getCommandParameter();
BSONElement commandElement = cmdParamObj.firstElement();
@@ -51,14 +53,16 @@ void SetClusterParameterInvocation::invoke(OperationContext* opCtx,
const ServerParameter* serverParameter = _sps->getIfExists(parameterName);
- uassert(6432601,
+ uassert(ErrorCodes::IllegalOperation,
str::stream() << "Unknown Cluster Parameter " << parameterName,
serverParameter != nullptr);
- uassert(6432602,
+
+ uassert(ErrorCodes::IllegalOperation,
"Cluster parameter value must be an object",
BSONType::Object == commandElement.type());
- LogicalTime clusterTime = _dbService.getUpdateClusterTime(opCtx);
+ LogicalTime clusterTime =
+ paramTime ? LogicalTime(*paramTime) : _dbService.getUpdateClusterTime(opCtx);
BSONObjBuilder updateBuilder;
updateBuilder << "_id" << parameterName << "clusterParameterTime" << clusterTime.toBSON();
@@ -67,11 +71,11 @@ void SetClusterParameterInvocation::invoke(OperationContext* opCtx,
BSONObj query = BSON("_id" << parameterName);
BSONObj update = updateBuilder.obj();
- LOGV2(6432603, "Updating cluster parameter on-disk", "clusterParameter"_attr = parameterName);
+ LOGV2_DEBUG(
+ 6432603, 2, "Updating cluster parameter on-disk", "clusterParameter"_attr = parameterName);
uassertStatusOK(serverParameter->validate(update));
- Status updateResult = _dbService.updateParameterOnDisk(query, update);
- uassertStatusOK(updateResult);
+ return uassertStatusOK(_dbService.updateParameterOnDisk(opCtx, query, update, writeConcern));
}
LogicalTime ClusterParameterDBClientService::getUpdateClusterTime(OperationContext* opCtx) {
@@ -79,23 +83,23 @@ LogicalTime ClusterParameterDBClientService::getUpdateClusterTime(OperationConte
return vt.clusterTime();
}
-Status ClusterParameterDBClientService::updateParameterOnDisk(BSONObj query, BSONObj update) {
+StatusWith<bool> ClusterParameterDBClientService::updateParameterOnDisk(
+ OperationContext* opCtx,
+ BSONObj query,
+ BSONObj update,
+ const WriteConcernOptions& writeConcern) {
BSONObj res;
BSONObjBuilder set;
set.append("$set", update);
set.doneFast();
- const std::string configDb = "config";
- const auto kMajorityWriteConcern = BSON("writeConcern" << BSON("w"
- << "majority"));
- const NamespaceString kClusterParametersNamespace(configDb, "clusterParameters");
-
try {
_dbClient.runCommand(
- configDb,
+ NamespaceString::kConfigDb.toString(),
[&] {
- write_ops::UpdateCommandRequest updateOp(kClusterParametersNamespace);
+ write_ops::UpdateCommandRequest updateOp(
+ NamespaceString::kClusterParametersNamespace);
updateOp.setUpdates({[&] {
write_ops::UpdateOpEntry entry;
entry.setQ(query);
@@ -105,7 +109,7 @@ Status ClusterParameterDBClientService::updateParameterOnDisk(BSONObj query, BSO
return entry;
}()});
- return updateOp.toBSON(kMajorityWriteConcern);
+ return updateOp.toBSON(writeConcern.toBSON());
}(),
res);
} catch (const DBException& ex) {
@@ -119,7 +123,7 @@ Status ClusterParameterDBClientService::updateParameterOnDisk(BSONObj query, BSO
return Status(ErrorCodes::FailedToParse, errmsg);
}
- return Status::OK();
+ return response.getNModified() > 0 || response.getN() > 0;
}
const ServerParameter* ClusterParameterService::getIfExists(StringData name) {
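
The invocation now reports whether a document was actually inserted or modified (n or nModified greater than zero), which the new shard command below uses for replay protection. The write it performs is equivalent to the following shell upsert; a sketch with placeholder parameter name, value, and timestamp:

// Equivalent of updateParameterOnDisk: one document per parameter in
// config.clusterParameters, keyed by parameter name, stamped with the chosen
// clusterParameterTime, and written with the caller-supplied write concern.
db.getSiblingDB("config").clusterParameters.updateOne(
    {_id: "testIntClusterParameter"},
    {$set: {_id: "testIntClusterParameter",
            clusterParameterTime: Timestamp(1649178719, 1),  // placeholder cluster time
            intData: 42}},
    {upsert: true, writeConcern: {w: "majority"}});
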
diff --git a/src/mongo/db/commands/set_cluster_parameter_invocation.h b/src/mongo/db/commands/set_cluster_parameter_invocation.h
index 35b74530a95..d6ff09f0d47 100644
--- a/src/mongo/db/commands/set_cluster_parameter_invocation.h
+++ b/src/mongo/db/commands/set_cluster_parameter_invocation.h
@@ -48,7 +48,10 @@ public:
class DBClientService {
public:
- virtual Status updateParameterOnDisk(BSONObj query, BSONObj update) = 0;
+ virtual StatusWith<bool> updateParameterOnDisk(OperationContext* opCtx,
+ BSONObj query,
+ BSONObj update,
+ const WriteConcernOptions&) = 0;
virtual LogicalTime getUpdateClusterTime(OperationContext*) = 0;
virtual ~DBClientService() = default;
};
@@ -56,7 +59,10 @@ public:
class ClusterParameterDBClientService final : public DBClientService {
public:
ClusterParameterDBClientService(DBDirectClient& dbDirectClient) : _dbClient(dbDirectClient) {}
- Status updateParameterOnDisk(BSONObj query, BSONObj update) override;
+ StatusWith<bool> updateParameterOnDisk(OperationContext* opCtx,
+ BSONObj query,
+ BSONObj update,
+ const WriteConcernOptions&) override;
LogicalTime getUpdateClusterTime(OperationContext*) override;
private:
@@ -69,13 +75,13 @@ public:
DBClientService& dbClientService)
: _sps(std::move(serverParmeterService)), _dbService(dbClientService) {}
- void invoke(OperationContext*, const SetClusterParameter&);
+ bool invoke(OperationContext*,
+ const SetClusterParameter&,
+ boost::optional<Timestamp>,
+ const WriteConcernOptions&);
private:
std::unique_ptr<ServerParameterService> _sps;
DBClientService& _dbService;
-
- Status updateParameterOnDisk(BSONObj query, BSONObj update);
};
-
} // namespace mongo
diff --git a/src/mongo/db/commands/set_cluster_parameter_invocation_test.cpp b/src/mongo/db/commands/set_cluster_parameter_invocation_test.cpp
index 3934e890684..895b094d47d 100644
--- a/src/mongo/db/commands/set_cluster_parameter_invocation_test.cpp
+++ b/src/mongo/db/commands/set_cluster_parameter_invocation_test.cpp
@@ -44,6 +44,10 @@
namespace mongo {
namespace {
+const WriteConcernOptions kMajorityWriteConcern{WriteConcernOptions::kMajority,
+ WriteConcernOptions::SyncMode::UNSET,
+ WriteConcernOptions::kNoTimeout};
+
// Mocks
class MockParameterService : public ServerParameterService {
public:
@@ -89,11 +93,14 @@ private:
class DBClientMock : public DBClientService {
public:
- DBClientMock(std::function<Status(BSONObj, BSONObj)> updateParameterOnDiskMock) {
+ DBClientMock(std::function<StatusWith<bool>(BSONObj, BSONObj)> updateParameterOnDiskMock) {
this->updateParameterOnDiskMockImpl = updateParameterOnDiskMock;
}
- Status updateParameterOnDisk(BSONObj cmd, BSONObj info) override {
+ StatusWith<bool> updateParameterOnDisk(OperationContext* opCtx,
+ BSONObj cmd,
+ BSONObj info,
+ const WriteConcernOptions&) override {
return updateParameterOnDiskMockImpl(cmd, info);
}
@@ -103,7 +110,7 @@ public:
}
private:
- std::function<Status(BSONObj, BSONObj)> updateParameterOnDiskMockImpl;
+ std::function<StatusWith<bool>(BSONObj, BSONObj)> updateParameterOnDiskMockImpl;
};
MockServerParameter alwaysValidatingServerParameter(StringData name) {
@@ -123,7 +130,7 @@ MockServerParameter alwaysInvalidatingServerParameter(StringData name) {
}
DBClientMock alwaysSucceedingDbClient() {
- DBClientMock dbServiceMock([&](BSONObj cmd, BSONObj info) { return Status::OK(); });
+ DBClientMock dbServiceMock([&](BSONObj cmd, BSONObj info) { return true; });
return dbServiceMock;
}
@@ -164,7 +171,7 @@ TEST(SetClusterParameterCommand, SucceedsWithObjectParameter) {
<< "majority")));
SetClusterParameter testCmd(obj);
- fixture.invoke(&spyCtx, testCmd);
+ fixture.invoke(&spyCtx, testCmd, boost::none, kMajorityWriteConcern);
}
TEST(SetClusterParameterCommand, ThrowsWithNonObjectParameter) {
@@ -189,7 +196,9 @@ TEST(SetClusterParameterCommand, ThrowsWithNonObjectParameter) {
OperationContext spyCtx(clientPtr, 1234);
SetClusterParameter testCmd(obj);
- ASSERT_THROWS_CODE(fixture.invoke(&spyCtx, testCmd), DBException, 6432602);
+ ASSERT_THROWS_CODE(fixture.invoke(&spyCtx, testCmd, boost::none, kMajorityWriteConcern),
+ DBException,
+ ErrorCodes::IllegalOperation);
}
TEST(SetClusterParameterCommand, ThrowsWhenServerParameterValidationFails) {
@@ -217,10 +226,11 @@ TEST(SetClusterParameterCommand, ThrowsWhenServerParameterValidationFails) {
OperationContext spyCtx(clientPtr, 1234);
SetClusterParameter testCmd(obj);
- ASSERT_THROWS_CODE_AND_WHAT(fixture.invoke(&spyCtx, testCmd),
- DBException,
- ErrorCodes::BadValue,
- "Parameter Validation Failed"_sd);
+ ASSERT_THROWS_CODE_AND_WHAT(
+ fixture.invoke(&spyCtx, testCmd, boost::none, kMajorityWriteConcern),
+ DBException,
+ ErrorCodes::BadValue,
+ "Parameter Validation Failed"_sd);
}
TEST(SetClusterParameterCommand, ThrowsWhenDBUpdateFails) {
@@ -248,7 +258,9 @@ TEST(SetClusterParameterCommand, ThrowsWhenDBUpdateFails) {
SetClusterParameter testCmd(obj);
- ASSERT_THROWS_WHAT(fixture.invoke(&spyCtx, testCmd), DBException, "DB Client Update Failed"_sd);
+ ASSERT_THROWS_WHAT(fixture.invoke(&spyCtx, testCmd, boost::none, kMajorityWriteConcern),
+ DBException,
+ "DB Client Update Failed"_sd);
}
TEST(SetClusterParameterCommand, ThrowsWhenParameterNotPresent) {
@@ -275,7 +287,9 @@ TEST(SetClusterParameterCommand, ThrowsWhenParameterNotPresent) {
SetClusterParameter testCmd(obj);
- ASSERT_THROWS_CODE(fixture.invoke(&spyCtx, testCmd), DBException, 6432601);
+ ASSERT_THROWS_CODE(fixture.invoke(&spyCtx, testCmd, boost::none, kMajorityWriteConcern),
+ DBException,
+ ErrorCodes::IllegalOperation);
}
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 45a68eafca0..d16459c2c44 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -422,6 +422,7 @@ env.Library(
'shardsvr_reshard_collection_command.cpp',
'shardsvr_resharding_operation_time_command.cpp',
'shardsvr_set_allow_migrations_command.cpp',
+ 'shardsvr_set_cluster_parameter_command.cpp',
'shardsvr_set_user_write_block_mode_command.cpp',
'shardsvr_split_chunk_command.cpp',
'split_vector_command.cpp',
@@ -437,6 +438,7 @@ env.Library(
'$BUILD_DIR/mongo/db/commands/mongod_fcv',
'$BUILD_DIR/mongo/db/commands/rename_collection_idl',
'$BUILD_DIR/mongo/db/commands/server_status',
+ '$BUILD_DIR/mongo/db/commands/set_cluster_parameter_invocation',
'$BUILD_DIR/mongo/db/commands/test_commands_enabled',
'$BUILD_DIR/mongo/db/commands/txn_cmd_request',
'$BUILD_DIR/mongo/db/fle_crud',
diff --git a/src/mongo/db/s/config/configsvr_set_cluster_parameter_command.cpp b/src/mongo/db/s/config/configsvr_set_cluster_parameter_command.cpp
index beb08387bb7..6a0109194f6 100644
--- a/src/mongo/db/s/config/configsvr_set_cluster_parameter_command.cpp
+++ b/src/mongo/db/s/config/configsvr_set_cluster_parameter_command.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/commands.h"
+#include "mongo/db/commands/set_cluster_parameter_invocation.h"
#include "mongo/db/s/config/configsvr_coordinator_service.h"
#include "mongo/db/s/config/set_cluster_parameter_coordinator.h"
#include "mongo/idl/cluster_server_parameter_gen.h"
@@ -62,6 +63,32 @@ public:
"featureFlagClusterWideConfig not enabled",
gFeatureFlagClusterWideConfig.isEnabled(serverGlobalParams.featureCompatibility));
+ // Validate parameter before creating coordinator.
+ {
+ BSONObj cmdParamObj = request().getCommandParameter();
+ BSONElement commandElement = cmdParamObj.firstElement();
+ StringData parameterName = commandElement.fieldName();
+ std::unique_ptr<ServerParameterService> sps =
+ std::make_unique<ClusterParameterService>();
+ const ServerParameter* serverParameter = sps->getIfExists(parameterName);
+
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << "Unknown Cluster Parameter " << parameterName,
+ serverParameter != nullptr);
+
+ uassert(ErrorCodes::IllegalOperation,
+ "Cluster parameter value must be an object",
+ BSONType::Object == commandElement.type());
+
+ BSONObjBuilder clusterParamBuilder;
+ clusterParamBuilder << "_id" << parameterName;
+ clusterParamBuilder.appendElements(commandElement.Obj());
+
+ BSONObj clusterParam = clusterParamBuilder.obj();
+
+ uassertStatusOK(serverParameter->validate(clusterParam));
+ }
+
SetClusterParameterCoordinatorDocument coordinatorDoc;
coordinatorDoc.setConfigsvrCoordinatorMetadata(
{ConfigsvrCoordinatorTypeEnum::kSetClusterParameter});
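
The validation block added above rebuilds the request into the same document shape that will eventually be persisted and runs the parameter's validate() on it, so a malformed value is rejected before any coordinator document is written. In shell terms the validated document is built roughly like this; a sketch with an illustrative value:

// Given {setClusterParameter: {testIntClusterParameter: {intData: 42}}}, the
// config server validates a document of this shape up front.
const commandParameter = {testIntClusterParameter: {intData: 42}};
const parameterName = Object.keys(commandParameter)[0];
const validationDoc = Object.assign({_id: parameterName}, commandParameter[parameterName]);
// validationDoc => { _id: "testIntClusterParameter", intData: 42 }
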
diff --git a/src/mongo/db/s/config/set_cluster_parameter_coordinator.cpp b/src/mongo/db/s/config/set_cluster_parameter_coordinator.cpp
index 209ebf8a2f3..3966879144d 100644
--- a/src/mongo/db/s/config/set_cluster_parameter_coordinator.cpp
+++ b/src/mongo/db/s/config/set_cluster_parameter_coordinator.cpp
@@ -33,13 +33,26 @@
#include "mongo/db/s/config/set_cluster_parameter_coordinator.h"
+#include "mongo/db/cancelable_operation_context.h"
+#include "mongo/db/commands/cluster_server_parameter_cmds_gen.h"
+#include "mongo/db/commands/set_cluster_parameter_invocation.h"
+#include "mongo/db/repl/read_concern_args.h"
+#include "mongo/db/s/sharding_logging.h"
+#include "mongo/db/s/sharding_util.h"
+#include "mongo/db/vector_clock.h"
#include "mongo/logv2/log.h"
+#include "mongo/s/grid.h"
#include "mongo/s/request_types/sharded_ddl_commands_gen.h"
namespace mongo {
+namespace {
+
+const WriteConcernOptions kMajorityWriteConcern{WriteConcernOptions::kMajority,
+ WriteConcernOptions::SyncMode::UNSET,
+ WriteConcernOptions::kNoTimeout};
+}
bool SetClusterParameterCoordinator::hasSameOptions(const BSONObj& otherDocBSON) const {
- // TODO SERVER-63870: add command parameters to comparison.
const auto otherDoc = StateDoc::parse(
IDLParserErrorContext("SetClusterParameterCoordinatorDocument"), otherDocBSON);
return SimpleBSONObjComparator::kInstance.evaluate(_doc.getParameter() ==
@@ -87,6 +100,59 @@ void SetClusterParameterCoordinator::_enterPhase(Phase newPhase) {
_doc = std::move(newDoc);
}
+bool SetClusterParameterCoordinator::_isClusterParameterSetAtTimestamp(OperationContext* opCtx) {
+ auto parameterElem = _doc.getParameter().firstElement();
+ auto parameterName = parameterElem.fieldName();
+ auto parameter = _doc.getParameter()[parameterName].Obj();
+ auto configsvrParameters =
+ uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ opCtx,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ repl::ReadConcernLevel::kMajorityReadConcern,
+ NamespaceString::kClusterParametersNamespace,
+ BSON("_id" << parameterName << "clusterParameterTime"
+ << *_doc.getClusterParameterTime()),
+ BSONObj(),
+ boost::none));
+
+ dassert(configsvrParameters.docs.size() <= 1);
+
+ return !configsvrParameters.docs.empty();
+}
+
+void SetClusterParameterCoordinator::_sendSetClusterParameterToAllShards(
+ OperationContext* opCtx,
+ const OperationSessionInfo& session,
+ std::shared_ptr<executor::ScopedTaskExecutor> executor) {
+ auto shards = Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx);
+
+ LOGV2_DEBUG(6387001, 1, "Sending setClusterParameter to shards:", "shards"_attr = shards);
+
+ ShardsvrSetClusterParameter request(_doc.getParameter());
+ request.setDbName(NamespaceString::kAdminDb);
+ request.setClusterParameterTime(*_doc.getClusterParameterTime());
+ sharding_util::sendCommandToShards(
+ opCtx,
+ NamespaceString::kAdminDb,
+ CommandHelpers::appendMajorityWriteConcern(request.toBSON(session.toBSON())),
+ shards,
+ **executor);
+}
+
+void SetClusterParameterCoordinator::_commit(OperationContext* opCtx) {
+ LOGV2_DEBUG(6387002, 1, "Updating configsvr cluster parameter");
+
+ SetClusterParameter setClusterParameterRequest(_doc.getParameter());
+ setClusterParameterRequest.setDbName(NamespaceString::kAdminDb);
+ std::unique_ptr<ServerParameterService> parameterService =
+ std::make_unique<ClusterParameterService>();
+ DBDirectClient client(opCtx);
+ ClusterParameterDBClientService dbService(client);
+ SetClusterParameterInvocation invocation{std::move(parameterService), dbService};
+ invocation.invoke(
+ opCtx, setClusterParameterRequest, _doc.getClusterParameterTime(), kMajorityWriteConcern);
+}
+
const ConfigsvrCoordinatorMetadata& SetClusterParameterCoordinator::metadata() const {
return _doc.getConfigsvrCoordinatorMetadata();
}
@@ -95,9 +161,52 @@ ExecutorFuture<void> SetClusterParameterCoordinator::_runImpl(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) noexcept {
return ExecutorFuture<void>(**executor)
- .then(_executePhase(Phase::kSetClusterParameter, [this, anchor = shared_from_this()] {
- // TODO Implement
- }));
+ .then([this, anchor = shared_from_this()] {
+ auto opCtxHolder = cc().makeOperationContext();
+ auto* opCtx = opCtxHolder.get();
+
+ // Select a cluster parameter time only once, when the coordinator runs for the first
+ // time. This way, even if the process steps down while sending the command to the
+ // shards, the next run will use the same time for the remaining shards.
+ if (!_doc.getClusterParameterTime()) {
+ // Select a clusterParameter time.
+ auto vt = VectorClock::get(opCtx)->getTime();
+ auto clusterParameterTime = vt.clusterTime();
+ _doc.setClusterParameterTime(clusterParameterTime.asTimestamp());
+ }
+ })
+ .then(_executePhase(Phase::kSetClusterParameter,
+ [this, executor = executor, anchor = shared_from_this()] {
+ auto opCtxHolder = cc().makeOperationContext();
+ auto* opCtx = opCtxHolder.get();
+
+ ShardingLogging::get(opCtx)->logChange(
+ opCtx,
+ "setClusterParameter.start",
+ NamespaceString::kClusterParametersNamespace.toString(),
+ _doc.getParameter(),
+ kMajorityWriteConcern);
+
+ // If the parameter was already set on the config server, there is
+ // nothing else to do.
+ if (_isClusterParameterSetAtTimestamp(opCtx)) {
+ return;
+ }
+
+ _doc = _updateSession(opCtx, _doc);
+ const auto session = _getCurrentSession();
+
+ _sendSetClusterParameterToAllShards(opCtx, session, executor);
+
+ _commit(opCtx);
+
+ ShardingLogging::get(opCtx)->logChange(
+ opCtx,
+ "setClusterParameter.end",
+ NamespaceString::kClusterParametersNamespace.toString(),
+ _doc.getParameter(),
+ kMajorityWriteConcern);
+ }));
}
} // namespace mongo
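
Taken together, the phase above picks a clusterParameterTime once, skips the whole phase if the config server already stores the parameter at that time, and otherwise sends _shardsvrSetClusterParameter to every shard under a checked-out session before committing locally and logging the change. The per-shard request corresponds roughly to the following document; a sketch, all concrete values illustrative:

// Approximate shape of the request sent to each shard primary (against the
// admin database): the original parameter document, the coordinator-chosen
// time, the session identifiers that provide replay protection, and majority
// write concern.
const shardRequest = {
    _shardsvrSetClusterParameter: {testStrClusterParameter: {strData: "on"}},
    clusterParameterTime: Timestamp(1649178719, 1),  // illustrative
    lsid: {id: UUID()},                              // illustrative session id
    txnNumber: NumberLong(1),
    writeConcern: {w: "majority"}
};
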
diff --git a/src/mongo/db/s/config/set_cluster_parameter_coordinator.h b/src/mongo/db/s/config/set_cluster_parameter_coordinator.h
index ec0f0fead04..2e172991a00 100644
--- a/src/mongo/db/s/config/set_cluster_parameter_coordinator.h
+++ b/src/mongo/db/s/config/set_cluster_parameter_coordinator.h
@@ -57,6 +57,24 @@ private:
ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) noexcept override;
+ /*
+ * Performs a local write with majority write concern to set the parameter.
+ */
+ void _commit(OperationContext* opCtx);
+
+ /*
+ * Checks if the cluster parameter was already set at the given clusterParameterTime.
+ */
+ bool _isClusterParameterSetAtTimestamp(OperationContext* opCtx);
+
+ /*
+ * Sends setClusterParameter to every shard in the cluster with the appropriate session.
+ */
+ void _sendSetClusterParameterToAllShards(
+ OperationContext* opCtx,
+ const OperationSessionInfo& opInfo,
+ std::shared_ptr<executor::ScopedTaskExecutor> executor);
+
const ConfigsvrCoordinatorMetadata& metadata() const override;
template <typename Func>
diff --git a/src/mongo/db/s/config/set_cluster_parameter_coordinator_document.idl b/src/mongo/db/s/config/set_cluster_parameter_coordinator_document.idl
index ced314175c8..ebffbc826a1 100644
--- a/src/mongo/db/s/config/set_cluster_parameter_coordinator_document.idl
+++ b/src/mongo/db/s/config/set_cluster_parameter_coordinator_document.idl
@@ -55,7 +55,10 @@ structs:
type: SetClusterParameterCoordinatorPhase
description: "Coordinator phase."
default: kUnset
- # TODO SERVER-63870: add setClusterParameter request arguments.
parameter:
type: object_owned
description: "Parameter to be set in the cluster"
+ clusterParameterTime:
+ type: timestamp
+ description: "Timestamp selected when the coordinator first runs, used as the cluster parameter time."
+ optional: true
diff --git a/src/mongo/db/s/shardsvr_set_cluster_parameter_command.cpp b/src/mongo/db/s/shardsvr_set_cluster_parameter_command.cpp
new file mode 100644
index 00000000000..c84ad1dd27e
--- /dev/null
+++ b/src/mongo/db/s/shardsvr_set_cluster_parameter_command.cpp
@@ -0,0 +1,127 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/cancelable_operation_context.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/commands/set_cluster_parameter_invocation.h"
+#include "mongo/db/commands/user_management_commands_gen.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/logv2/log.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/request_types/sharded_ddl_commands_gen.h"
+
+namespace mongo {
+namespace {
+
+const WriteConcernOptions kLocalWriteConcern{
+ 1, WriteConcernOptions::SyncMode::UNSET, WriteConcernOptions::kNoTimeout};
+
+class ShardsvrSetClusterParameterCommand final
+ : public TypedCommand<ShardsvrSetClusterParameterCommand> {
+public:
+ using Request = ShardsvrSetClusterParameter;
+
+ class Invocation final : public InvocationBase {
+ public:
+ using InvocationBase::InvocationBase;
+
+ void typedRun(OperationContext* opCtx) {
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << Request::kCommandName << " can only be run on shard servers",
+ serverGlobalParams.clusterRole == ClusterRole::ShardServer);
+ CommandHelpers::uassertCommandRunWithMajority(Request::kCommandName,
+ opCtx->getWriteConcern());
+
+ SetClusterParameter setClusterParameterRequest(request().getCommandParameter());
+ setClusterParameterRequest.setDbName(NamespaceString::kAdminDb);
+ std::unique_ptr<ServerParameterService> parameterService =
+ std::make_unique<ClusterParameterService>();
+ DBDirectClient client(opCtx);
+ ClusterParameterDBClientService dbService(client);
+ SetClusterParameterInvocation invocation{std::move(parameterService), dbService};
+ // Use local write concern for the setClusterParameter invocation: the command itself
+ // is required to be called with majority write concern, so we will wait for majority
+ // after checking out the session.
+ bool writePerformed = invocation.invoke(opCtx,
+ setClusterParameterRequest,
+ request().getClusterParameterTime(),
+ kLocalWriteConcern);
+ if (!writePerformed) {
+ // Since no write happened on this txnNumber, we need to make a dummy write so
+ // that secondaries can be aware of this txn.
+ DBDirectClient client(opCtx);
+ client.update(NamespaceString::kServerConfigurationNamespace.ns(),
+ BSON("_id"
+ << "SetClusterParameterStats"),
+ BSON("$inc" << BSON("count" << 1)),
+ true /* upsert */,
+ false /* multi */);
+ }
+ }
+
+ private:
+ NamespaceString ns() const override {
+ return NamespaceString();
+ }
+
+ bool supportsWriteConcern() const override {
+ return true;
+ }
+
+ void doCheckAuthorization(OperationContext* opCtx) const override {
+ uassert(ErrorCodes::Unauthorized,
+ "Unauthorized",
+ AuthorizationSession::get(opCtx->getClient())
+ ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(),
+ ActionType::internal));
+ }
+ };
+
+ std::string help() const override {
+ return "Internal command, which is exported by the shard servers. Do not call "
+ "directly. Sets the cluster parameter on the node.";
+ }
+
+ bool adminOnly() const override {
+ return true;
+ }
+
+ AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
+ return AllowedOnSecondary::kNever;
+ }
+} shardsvrSetClusterParameterCmd;
+
+} // namespace
+} // namespace mongo
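
When the local update is a no-op (the parameter is already stored at this value and time), the command still has to leave a trace for this (lsid, txnNumber) so that a retried request is recognized and secondaries observe the transaction; hence the dummy upsert above. In shell terms it is roughly the following, assuming kServerConfigurationNamespace resolves to admin.system.version:

// No-op replay-protection write: bump a counter document so an oplog entry is
// still generated for this retryable write even though the cluster parameter
// document itself was left untouched.
db.getSiblingDB("admin").system.version.updateOne(
    {_id: "SetClusterParameterStats"},
    {$inc: {count: 1}},
    {upsert: true});
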
diff --git a/src/mongo/db/transaction_validation.cpp b/src/mongo/db/transaction_validation.cpp
index 04bc909208e..cc136f0fa6f 100644
--- a/src/mongo/db/transaction_validation.cpp
+++ b/src/mongo/db/transaction_validation.cpp
@@ -65,6 +65,7 @@ const StringMap<int> retryableWriteCommands = {{"clusterDelete", 1},
{"_shardsvrParticipantBlock", 1},
{"_configsvrCollMod", 1},
{"_shardsvrCollModParticipant", 1},
+ {"_shardsvrSetClusterParameter", 1},
{"_shardsvrSetUserWriteBlockMode", 1}};
// TODO SERVER-65101: Replace this with a property on each command.
diff --git a/src/mongo/idl/cluster_server_parameter_op_observer.cpp b/src/mongo/idl/cluster_server_parameter_op_observer.cpp
index 19362dd7bb5..de195a75ab1 100644
--- a/src/mongo/idl/cluster_server_parameter_op_observer.cpp
+++ b/src/mongo/idl/cluster_server_parameter_op_observer.cpp
@@ -41,9 +41,6 @@ namespace mongo {
namespace {
constexpr auto kIdField = "_id"_sd;
constexpr auto kCPTField = "clusterParameterTime"_sd;
-constexpr auto kConfigDB = "config"_sd;
-constexpr auto kClusterParametersColl = "clusterParameters"_sd;
-const NamespaceString kClusterParametersNS(kConfigDB, kClusterParametersColl);
/**
* Per-operation scratch space indicating the document being deleted.
@@ -53,7 +50,7 @@ const NamespaceString kClusterParametersNS(kConfigDB, kClusterParametersColl);
const auto aboutToDeleteDoc = OperationContext::declareDecoration<std::string>();
bool isConfigNamespace(const NamespaceString& nss) {
- return nss == kClusterParametersNS;
+ return nss == NamespaceString::kClusterParametersNamespace;
}
constexpr auto kOplog = "oplog"_sd;
@@ -127,7 +124,7 @@ void doLoadAllParametersFromDisk(OperationContext* opCtx, StringData mode, OnEnt
std::vector<Status> failures;
DBDirectClient client(opCtx);
- FindCommandRequest findRequest{kClusterParametersNS};
+ FindCommandRequest findRequest{NamespaceString::kClusterParametersNamespace};
client.find(std::move(findRequest), ReadPreferenceSetting{}, [&](BSONObj doc) {
try {
onEntry(doc, mode);
@@ -242,7 +239,7 @@ void ClusterServerParameterOpObserver::onDelete(OperationContext* opCtx,
void ClusterServerParameterOpObserver::onDropDatabase(OperationContext* opCtx,
const std::string& dbName) {
- if (dbName == kConfigDB) {
+ if (dbName == NamespaceString::kConfigDb) {
// Entire config DB deleted, reset to default state.
clearAllParameters();
}
@@ -303,7 +300,7 @@ void ClusterServerParameterOpObserver::onImportCollection(OperationContext* opCt
void ClusterServerParameterOpObserver::_onReplicationRollback(OperationContext* opCtx,
const RollbackObserverInfo& rbInfo) {
- if (rbInfo.rollbackNamespaces.count(kClusterParametersNS)) {
+ if (rbInfo.rollbackNamespaces.count(NamespaceString::kClusterParametersNamespace)) {
// Some kind of rollback happened in the settings collection.
// Just reload from disk to be safe.
resynchronizeAllParametersFromDisk(opCtx);
diff --git a/src/mongo/s/request_types/sharded_ddl_commands.idl b/src/mongo/s/request_types/sharded_ddl_commands.idl
index fd0a7a1946f..8f40c11e835 100644
--- a/src/mongo/s/request_types/sharded_ddl_commands.idl
+++ b/src/mongo/s/request_types/sharded_ddl_commands.idl
@@ -453,12 +453,24 @@ commands:
description: "Determines the phase of the blocking/unblocking procedure to be
executed."
+ _shardsvrSetClusterParameter:
+ command_name: _shardsvrSetClusterParameter
+ cpp_name: ShardsvrSetClusterParameter
+ description: "internal _shardsvrSetClusterParameter command"
+ namespace: type
+ type: object
+ api_version: ""
+ strict: false
+ fields:
+ clusterParameterTime:
+ type: timestamp
+ description: "Time that will be associated with the cluster parameter"
+
_configsvrSetClusterParameter:
command_name: _configsvrSetClusterParameter
cpp_name: ConfigsvrSetClusterParameter
description: "Internal command sent to the config server to start a coordinator and set the
cluster parameter."
- # TODO SERVER-63870: use replica set command parameters.
namespace: type
type: object_owned
api_version: ""