diff options
author | Marcos José Grillo Ramirez <marcos.grillo@mongodb.com> | 2022-04-08 17:03:34 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-04-08 17:43:16 +0000 |
commit | e15d236351e05a1d9bf3d3b043a260fd90972301 (patch) | |
tree | c9ff4209d405d6f64fdb417533285758021bf5ac /src/mongo | |
parent | 8aafd05e614a1fb68b2fa1de53a71ba11ee934b0 (diff) | |
download | mongo-e15d236351e05a1d9bf3d3b043a260fd90972301.tar.gz |
SERVER-63606 Serialize setClusterParameter with add/remove shard and push parameters on newly added shards
Diffstat (limited to 'src/mongo')
7 files changed, 161 insertions, 33 deletions
diff --git a/src/mongo/db/commands/set_cluster_parameter_invocation.cpp b/src/mongo/db/commands/set_cluster_parameter_invocation.cpp index b021b800e12..6d32f73b393 100644 --- a/src/mongo/db/commands/set_cluster_parameter_invocation.cpp +++ b/src/mongo/db/commands/set_cluster_parameter_invocation.cpp @@ -70,9 +70,10 @@ bool SetClusterParameterInvocation::invoke(OperationContext* opCtx, BSONObj query = BSON("_id" << parameterName); BSONObj update = updateBuilder.obj(); + uassertStatusOK(serverParameter->validate(update)); + LOGV2_DEBUG( 6432603, 2, "Updating cluster parameter on-disk", "clusterParameter"_attr = parameterName); - uassertStatusOK(serverParameter->validate(update)); return uassertStatusOK(_dbService.updateParameterOnDisk(opCtx, query, update, writeConcern)); } diff --git a/src/mongo/db/s/config/set_cluster_parameter_coordinator.cpp b/src/mongo/db/s/config/set_cluster_parameter_coordinator.cpp index 3966879144d..07f812c3e51 100644 --- a/src/mongo/db/s/config/set_cluster_parameter_coordinator.cpp +++ b/src/mongo/db/s/config/set_cluster_parameter_coordinator.cpp @@ -37,6 +37,7 @@ #include "mongo/db/commands/cluster_server_parameter_cmds_gen.h" #include "mongo/db/commands/set_cluster_parameter_invocation.h" #include "mongo/db/repl/read_concern_args.h" +#include "mongo/db/s/config/sharding_catalog_manager.h" #include "mongo/db/s/sharding_logging.h" #include "mongo/db/s/sharding_util.h" #include "mongo/db/vector_clock.h" @@ -175,38 +176,47 @@ ExecutorFuture<void> SetClusterParameterCoordinator::_runImpl( _doc.setClusterParameterTime(clusterParameterTime.asTimestamp()); } }) - .then(_executePhase(Phase::kSetClusterParameter, - [this, executor = executor, anchor = shared_from_this()] { - auto opCtxHolder = cc().makeOperationContext(); - auto* opCtx = opCtxHolder.get(); - - ShardingLogging::get(opCtx)->logChange( - opCtx, - "setClusterParameter.start", - NamespaceString::kClusterParametersNamespace.toString(), - _doc.getParameter(), - kMajorityWriteConcern); - - // If the parameter was already set on the config server, there is - // nothing else to do. - if (_isClusterParameterSetAtTimestamp(opCtx)) { - return; - } - - _doc = _updateSession(opCtx, _doc); - const auto session = _getCurrentSession(); - - _sendSetClusterParameterToAllShards(opCtx, session, executor); - - _commit(opCtx); - - ShardingLogging::get(opCtx)->logChange( - opCtx, - "setClusterParameter.end", - NamespaceString::kClusterParametersNamespace.toString(), - _doc.getParameter(), - kMajorityWriteConcern); - })); + .then(_executePhase( + Phase::kSetClusterParameter, [this, executor = executor, anchor = shared_from_this()] { + auto opCtxHolder = cc().makeOperationContext(); + auto* opCtx = opCtxHolder.get(); + + ShardingLogging::get(opCtx)->logChange( + opCtx, + "setClusterParameter.start", + NamespaceString::kClusterParametersNamespace.toString(), + _doc.getParameter(), + kMajorityWriteConcern); + + // If the parameter was already set on the config server, there is + // nothing else to do. + if (_isClusterParameterSetAtTimestamp(opCtx)) { + return; + } + + _doc = _updateSession(opCtx, _doc); + const auto session = _getCurrentSession(); + + { + // Ensure the topology is stable so shards added concurrently will + // not miss the cluster parameter. Keep it stable until we have + // persisted the cluster parameter on the configsvr so that new + // shards that get added will see the new cluster parameter. + Lock::SharedLock stableTopologyRegion = + ShardingCatalogManager::get(opCtx)->enterStableTopologyRegion(opCtx); + + _sendSetClusterParameterToAllShards(opCtx, session, executor); + + _commit(opCtx); + } + + ShardingLogging::get(opCtx)->logChange( + opCtx, + "setClusterParameter.end", + NamespaceString::kClusterParametersNamespace.toString(), + _doc.getParameter(), + kMajorityWriteConcern); + })); } } // namespace mongo diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h index 36ac49564e4..121d6595b21 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager.h +++ b/src/mongo/db/s/config/sharding_catalog_manager.h @@ -634,6 +634,12 @@ private: void _setUserWriteBlockingStateOnNewShard(OperationContext* opCtx, RemoteCommandTargeter* targeter); + + /** + * Sets the cluster parameters on the shard that is being added. + */ + void _setClusterParametersOnNewShard(OperationContext* opCtx, RemoteCommandTargeter* targeter); + // The owning service context ServiceContext* const _serviceContext; diff --git a/src/mongo/db/s/config/sharding_catalog_manager_add_shard_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_add_shard_test.cpp index 24b558f7b40..3c42d87e79f 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_add_shard_test.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_add_shard_test.cpp @@ -45,6 +45,7 @@ #include "mongo/db/s/config/config_server_test_fixture.h" #include "mongo/db/s/config/sharding_catalog_manager.h" #include "mongo/db/s/type_shard_identity.h" +#include "mongo/idl/cluster_server_parameter_gen.h" #include "mongo/s/catalog/config_server_version.h" #include "mongo/s/catalog/type_changelog.h" #include "mongo/s/catalog/type_config_version.h" @@ -170,6 +171,27 @@ protected: }); } + void expectRemoveSetClusterParameterDocs(const HostAndPort& target) { + if (!gFeatureFlagClusterWideConfig.isEnabled(serverGlobalParams.featureCompatibility)) + return; + onCommandForAddShard([&](const RemoteCommandRequest& request) { + ASSERT_EQ(request.target, target); + ASSERT_EQ(request.dbname, NamespaceString::kClusterParametersNamespace.db()); + ASSERT_BSONOBJ_EQ(request.cmdObj, + BSON("delete" << NamespaceString::kClusterParametersNamespace.coll() + << "bypassDocumentValidation" << false << "ordered" + << true << "deletes" + << BSON_ARRAY(BSON("q" << BSONObj() << "limit" << 0)) + << "writeConcern" + << BSON("w" + << "majority" + << "wtimeout" << 60000))); + ASSERT_BSONOBJ_EQ(rpc::makeEmptyMetadata(), request.metadata); + + return BSON("ok" << 1); + }); + } + /** * Waits for a request for the shardIdentity document to be upserted into a shard from the * config server on addShard. @@ -437,6 +459,9 @@ TEST_F(AddShardTest, StandaloneBasicSuccess) { // The shard receives the _addShard command expectAddShardCmdReturnSuccess(shardTarget, expectedShardName); + // The shard receives a delete op to clear any leftover clusterParameters doc. + expectRemoveSetClusterParameterDocs(shardTarget); + // The shard receives a delete op to clear any leftover user_writes_critical_sections doc. expectRemoveUserWritesCriticalSectionsDocs(shardTarget); @@ -520,6 +545,9 @@ TEST_F(AddShardTest, StandaloneGenerateName) { // The shard receives the _addShard command expectAddShardCmdReturnSuccess(shardTarget, expectedShardName); + // The shard receives a delete op to clear any leftover clusterParameters doc. + expectRemoveSetClusterParameterDocs(shardTarget); + // The shard receives a delete op to clear any leftover user_writes_critical_sections doc. expectRemoveUserWritesCriticalSectionsDocs(shardTarget); @@ -918,6 +946,9 @@ TEST_F(AddShardTest, SuccessfullyAddReplicaSet) { // The shard receives the _addShard command expectAddShardCmdReturnSuccess(shardTarget, expectedShardName); + // The shard receives a delete op to clear any leftover clusterParameters doc. + expectRemoveSetClusterParameterDocs(shardTarget); + // The shard receives a delete op to clear any leftover user_writes_critical_sections doc. expectRemoveUserWritesCriticalSectionsDocs(shardTarget); @@ -986,6 +1017,9 @@ TEST_F(AddShardTest, ReplicaSetExtraHostsDiscovered) { // The shard receives the _addShard command expectAddShardCmdReturnSuccess(shardTarget, expectedShardName); + // The shard receives a delete op to clear any leftover clusterParameters doc. + expectRemoveSetClusterParameterDocs(shardTarget); + // The shard receives a delete op to clear any leftover user_writes_critical_sections doc. expectRemoveUserWritesCriticalSectionsDocs(shardTarget); @@ -1067,6 +1101,9 @@ TEST_F(AddShardTest, AddShardSucceedsEvenIfAddingDBsFromNewShardFails) { // The shard receives the _addShard command expectAddShardCmdReturnSuccess(shardTarget, expectedShardName); + // The shard receives a delete op to clear any leftover clusterParameters doc. + expectRemoveSetClusterParameterDocs(shardTarget); + // The shard receives a delete op to clear any leftover user_writes_critical_sections doc. expectRemoveUserWritesCriticalSectionsDocs(shardTarget); diff --git a/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp index a667a80e94c..0690bb27dd1 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp @@ -48,6 +48,7 @@ #include "mongo/db/audit.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/client.h" +#include "mongo/db/commands/cluster_server_parameter_cmds_gen.h" #include "mongo/db/commands/feature_compatibility_version.h" #include "mongo/db/commands/feature_compatibility_version_parser.h" #include "mongo/db/commands/set_feature_compatibility_version_gen.h" @@ -68,6 +69,7 @@ #include "mongo/db/vector_clock_mutable.h" #include "mongo/db/wire_version.h" #include "mongo/executor/task_executor.h" +#include "mongo/idl/cluster_server_parameter_gen.h" #include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/catalog/config_server_version.h" @@ -685,6 +687,9 @@ StatusWith<std::string> ShardingCatalogManager::addShard( // Set the user-writes blocking state on the new shard. _setUserWriteBlockingStateOnNewShard(opCtx, targeter.get()); + // Set the cluster parameters on the new shard. + _setClusterParametersOnNewShard(opCtx, targeter.get()); + { // Keep the FCV stable across checking the FCV, sending setFCV to the new shard and writing // the entry for the new shard to config.shards. This ensures the FCV doesn't change after @@ -1114,4 +1119,53 @@ void ShardingCatalogManager::_setUserWriteBlockingStateOnNewShard(OperationConte }); } +void ShardingCatalogManager::_setClusterParametersOnNewShard(OperationContext* opCtx, + RemoteCommandTargeter* targeter) { + if (!gFeatureFlagClusterWideConfig.isEnabled(serverGlobalParams.featureCompatibility)) + return; + + LOGV2_DEBUG(6360600, 2, "Pushing cluster parameters into new shard"); + + // Remove possible leftovers config.clusterParameters documents from the new shard. + { + write_ops::DeleteCommandRequest deleteOp(NamespaceString::kClusterParametersNamespace); + write_ops::DeleteOpEntry query({}, true /*multi*/); + deleteOp.setDeletes({query}); + + const auto swCommandResponse = + _runCommandForAddShard(opCtx, + targeter, + NamespaceString::kClusterParametersNamespace.db(), + CommandHelpers::appendMajorityWriteConcern(deleteOp.toBSON({}))); + uassertStatusOK(swCommandResponse.getStatus()); + uassertStatusOK(getStatusFromWriteCommandReply(swCommandResponse.getValue().response)); + } + + // Push cluster parameters into the newly added shard. + auto clusterParameterDocs = + uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + opCtx, + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + repl::ReadConcernLevel::kLocalReadConcern, + NamespaceString::kClusterParametersNamespace, + BSONObj(), + BSONObj(), + boost::none)); + + for (const auto parameter : clusterParameterDocs.docs) { + ShardsvrSetClusterParameter setClusterParamsCmd( + BSON(parameter["_id"].String() << parameter.filterFieldsUndotted( + BSON("_id" << 1 << "clusterParameterTime" << 1), false))); + setClusterParamsCmd.setDbName(NamespaceString::kAdminDb); + setClusterParamsCmd.setClusterParameterTime(parameter["clusterParameterTime"].timestamp()); + + const auto cmdResponse = _runCommandForAddShard( + opCtx, + targeter, + NamespaceString::kAdminDb, + CommandHelpers::appendMajorityWriteConcern(setClusterParamsCmd.toBSON({}))); + uassertStatusOK(Shard::CommandResponse::getEffectiveStatus(cmdResponse)); + } +} + } // namespace mongo diff --git a/src/mongo/idl/cluster_server_parameter.idl b/src/mongo/idl/cluster_server_parameter.idl index dc1b53ee0e6..6e8b4b28756 100644 --- a/src/mongo/idl/cluster_server_parameter.idl +++ b/src/mongo/idl/cluster_server_parameter.idl @@ -75,6 +75,17 @@ structs: type: string default: "\"off\"" + TestBoolClusterParameterStorage: + description: "Storage used for testBoolClusterParameter" + inline_chained_structs: true + chained_structs: + ClusterServerParameter: ClusterServerParameter + fields: + boolData: + description: "Some bool parameter" + type: bool + default: 0 + feature_flags: featureFlagClusterWideConfig: description: Mechanism for cluster-wide configuration options @@ -95,3 +106,10 @@ server_parameters: cpp_vartype: TestStrClusterParameterStorage cpp_varname: strStorage test_only: true + + testBoolClusterParameter: + set_at: cluster + description: "Test cluster parameter that is only usable if enableTestCommands=true" + cpp_vartype: TestBoolClusterParameterStorage + cpp_varname: boolStorage + test_only: true diff --git a/src/mongo/s/commands/cluster_get_cluster_parameter_cmd.cpp b/src/mongo/s/commands/cluster_get_cluster_parameter_cmd.cpp index 71bda72dfaf..0d9b7cd3bb5 100644 --- a/src/mongo/s/commands/cluster_get_cluster_parameter_cmd.cpp +++ b/src/mongo/s/commands/cluster_get_cluster_parameter_cmd.cpp @@ -144,8 +144,10 @@ public: // Sort and find the set difference of the requested parameters and the parameters // returned. std::vector<std::string> defaultParameterNames; + defaultParameterNames.reserve(requestedParameterNames.size() - onDiskParameterNames.size()); + std::sort(onDiskParameterNames.begin(), onDiskParameterNames.end()); std::sort(requestedParameterNames.begin(), requestedParameterNames.end()); std::set_difference(requestedParameterNames.begin(), |