diff options
author | Marcos José Grillo Ramirez <marcos.grillo@mongodb.com> | 2022-04-27 09:12:20 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-04-27 09:50:08 +0000 |
commit | c78b6a067da0f6bc123f839f0f62bea64d8adf66 (patch) | |
tree | 9134b0ae6036a64623959b71b1f86178665c3c06 /src/mongo | |
parent | a43a75a97c7b25adbbdb176560afe1cffbac4aa6 (diff) | |
download | mongo-c78b6a067da0f6bc123f839f0f62bea64d8adf66.tar.gz |
SERVER-65386 Change add shard behavior to absorb shard cluster's parameters when promoting first RS to shard
Diffstat (limited to 'src/mongo')
4 files changed, 218 insertions, 32 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 81714673261..f6581e8b90a 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -283,6 +283,7 @@ env.Library( '$BUILD_DIR/mongo/db/catalog/collection_options', '$BUILD_DIR/mongo/db/catalog_raii', '$BUILD_DIR/mongo/db/commands/mongod_fcv', + '$BUILD_DIR/mongo/db/commands/set_cluster_parameter_invocation', '$BUILD_DIR/mongo/db/commands/set_feature_compatibility_version_idl', '$BUILD_DIR/mongo/db/common', '$BUILD_DIR/mongo/db/dbdirectclient', diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h index 8621ffb153a..070c1d3d78f 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager.h +++ b/src/mongo/db/s/config/sharding_catalog_manager.h @@ -637,12 +637,32 @@ private: */ void _setUserWriteBlockingStateOnNewShard(OperationContext* opCtx, RemoteCommandTargeter* targeter); + /** + * Given a vector of cluster parameters in disk format, sets them locally. + */ + void _setClusterParametersLocally(OperationContext* opCtx, + const std::vector<BSONObj>& parameters); + /** + * Gets the cluster parameters set on the shard and then saves them locally. + */ + void _pullClusterParametersFromNewShard(OperationContext* opCtx, + RemoteCommandTargeter* targeter); + + /** + * Clean all possible leftover cluster parameters on the new added shard and sets the ones + * stored on the config server. + */ + void _pushClusterParametersToNewShard(OperationContext* opCtx, + RemoteCommandTargeter* targeter, + const std::vector<BSONObj>& clusterParameters); /** - * Sets the cluster parameters on the shard that is being added. + * Determines whether to absorb the cluster parameters on the newly added shard (if we're + * converting from a replica set to a sharded cluster) or set the cluster parameters stored on + * the config server in the newly added shard. 
*/ - void _setClusterParametersOnNewShard(OperationContext* opCtx, RemoteCommandTargeter* targeter); + void _standardizeClusterParameters(OperationContext* opCtx, RemoteCommandTargeter* targeter); // The owning service context ServiceContext* const _serviceContext; diff --git a/src/mongo/db/s/config/sharding_catalog_manager_add_shard_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_add_shard_test.cpp index b9e33d855e6..0ed9c76d0f0 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_add_shard_test.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_add_shard_test.cpp @@ -39,6 +39,7 @@ #include "mongo/db/commands.h" #include "mongo/db/commands/set_feature_compatibility_version_gen.h" #include "mongo/db/ops/write_ops.h" +#include "mongo/db/query/cursor_response.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/s/add_shard_cmd_gen.h" #include "mongo/db/s/add_shard_util.h" @@ -171,9 +172,35 @@ protected: }); } - void expectRemoveSetClusterParameterDocs(const HostAndPort& target) { + void expectClusterParametersRequest(const HostAndPort& target) { if (!gFeatureFlagClusterWideConfig.isEnabled(serverGlobalParams.featureCompatibility)) return; + auto clusterParameterDocs = uassertStatusOK(getConfigShard()->exhaustiveFindOnConfig( + operationContext(), + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + repl::ReadConcernLevel::kLocalReadConcern, + NamespaceString::kClusterParametersNamespace, + BSONObj(), + BSONObj(), + boost::none)); + + auto shardsDocs = uassertStatusOK(getConfigShard()->exhaustiveFindOnConfig( + operationContext(), + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + repl::ReadConcernLevel::kLocalReadConcern, + ShardType::ConfigNS, + BSONObj(), + BSONObj(), + boost::none)); + + if (shardsDocs.docs.empty() && clusterParameterDocs.docs.empty()) { + expectFindClusterParameterDocs(target); + } else { + expectRemoveClusterParameterDocs(target); + } + } + + void expectRemoveClusterParameterDocs(const 
HostAndPort& target) { onCommandForAddShard([&](const RemoteCommandRequest& request) { ASSERT_EQ(request.target, target); ASSERT_EQ(request.dbname, NamespaceString::kClusterParametersNamespace.db()); @@ -192,6 +219,20 @@ protected: }); } + void expectFindClusterParameterDocs(const HostAndPort& target) { + onCommandForAddShard([&](const RemoteCommandRequest& request) { + ASSERT_EQ(request.target, target); + ASSERT_EQ(request.dbname, NamespaceString::kClusterParametersNamespace.db()); + ASSERT_BSONOBJ_EQ(request.cmdObj, + BSON("find" << NamespaceString::kClusterParametersNamespace.coll() + << "maxTimeMS" << 30000 << "readConcern" + << BSON("level" + << "majority"))); + auto cursorRes = CursorResponse(NamespaceString::kClusterParametersNamespace, 0, {}); + return cursorRes.toBSON(CursorResponse::ResponseType::InitialResponse); + }); + } + /** * Waits for a request for the shardIdentity document to be upserted into a shard from the * config server on addShard. @@ -462,8 +503,9 @@ TEST_F(AddShardTest, StandaloneBasicSuccess) { // The shard receives a delete op to clear any leftover user_writes_critical_sections doc. expectRemoveUserWritesCriticalSectionsDocs(shardTarget); - // The shard receives a delete op to clear any leftover clusterParameters doc. - expectRemoveSetClusterParameterDocs(shardTarget); + // The shard receives a delete op to clear any leftover clusterParameters doc or a find to get + // all cluster parameters in the replica set that is being promoted to a sharded cluster. + expectClusterParametersRequest(shardTarget); // The shard receives the setFeatureCompatibilityVersion command. expectSetFeatureCompatibilityVersion(shardTarget, BSON("ok" << 1), expectWriteConcern.toBSON()); @@ -548,10 +590,11 @@ TEST_F(AddShardTest, StandaloneGenerateName) { // The shard receives a delete op to clear any leftover user_writes_critical_sections doc. 
expectRemoveUserWritesCriticalSectionsDocs(shardTarget); - // The shard receives a delete op to clear any leftover clusterParameters doc. - expectRemoveSetClusterParameterDocs(shardTarget); + // The shard receives a delete op to clear any leftover clusterParameters doc or a find to get + // all cluster parameters in the replica set that is being promoted to a sharded cluster. + expectClusterParametersRequest(shardTarget); - // The shard receives the setFeatureCompatibilityVersion command. + // The shard receives the setFeatureCompatibilityVersion command expectSetFeatureCompatibilityVersion( shardTarget, BSON("ok" << 1), operationContext()->getWriteConcern().toBSON()); @@ -949,8 +992,9 @@ TEST_F(AddShardTest, SuccessfullyAddReplicaSet) { // The shard receives a delete op to clear any leftover user_writes_critical_sections doc. expectRemoveUserWritesCriticalSectionsDocs(shardTarget); - // The shard receives a delete op to clear any leftover clusterParameters doc. - expectRemoveSetClusterParameterDocs(shardTarget); + // The shard receives a delete op to clear any leftover clusterParameters doc or a find to get + // all cluster parameters in the replica set that is being promoted to a sharded cluster. + expectClusterParametersRequest(shardTarget); // The shard receives the setFeatureCompatibilityVersion command. expectSetFeatureCompatibilityVersion( @@ -1020,8 +1064,9 @@ TEST_F(AddShardTest, ReplicaSetExtraHostsDiscovered) { // The shard receives a delete op to clear any leftover user_writes_critical_sections doc. expectRemoveUserWritesCriticalSectionsDocs(shardTarget); - // The shard receives a delete op to clear any leftover clusterParameters doc. - expectRemoveSetClusterParameterDocs(shardTarget); + // The shard receives a delete op to clear any leftover clusterParameters doc or a find to get + // all cluster parameters in the replica set that is being promoted to a sharded cluster. 
+ expectClusterParametersRequest(shardTarget); // The shard receives the setFeatureCompatibilityVersion command. expectSetFeatureCompatibilityVersion( @@ -1104,8 +1149,9 @@ TEST_F(AddShardTest, AddShardSucceedsEvenIfAddingDBsFromNewShardFails) { // The shard receives a delete op to clear any leftover user_writes_critical_sections doc. expectRemoveUserWritesCriticalSectionsDocs(shardTarget); - // The shard receives a delete op to clear any leftover clusterParameters doc. - expectRemoveSetClusterParameterDocs(shardTarget); + // The shard receives a delete op to clear any leftover clusterParameters doc or a find to get + // all cluster parameters in the replica set that is being promoted to a sharded cluster. + expectClusterParametersRequest(shardTarget); // The shard receives the setFeatureCompatibilityVersion command. expectSetFeatureCompatibilityVersion( diff --git a/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp index b2bf68bdd6c..c6e1055ede9 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp @@ -41,6 +41,7 @@ #include "mongo/bson/bsonobj_comparator.h" #include "mongo/bson/util/bson_extract.h" #include "mongo/client/connection_string.h" +#include "mongo/client/fetcher.h" #include "mongo/client/read_preference.h" #include "mongo/client/remote_command_targeter.h" #include "mongo/client/replica_set_monitor.h" @@ -51,6 +52,7 @@ #include "mongo/db/commands/cluster_server_parameter_cmds_gen.h" #include "mongo/db/commands/feature_compatibility_version.h" #include "mongo/db/commands/feature_compatibility_version_parser.h" +#include "mongo/db/commands/set_cluster_parameter_invocation.h" #include "mongo/db/commands/set_feature_compatibility_version_gen.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" @@ -72,6 +74,8 @@ #include 
"mongo/idl/cluster_server_parameter_gen.h" #include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/rpc/metadata/repl_set_metadata.h" +#include "mongo/rpc/metadata/tracking_metadata.h" #include "mongo/s/catalog/config_server_version.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_database_gen.h" @@ -98,6 +102,9 @@ using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackA using RemoteCommandCallbackFn = executor::TaskExecutor::RemoteCommandCallbackFn; const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{}); +const WriteConcernOptions kMajorityWriteConcern{WriteConcernOptions::kMajority, + WriteConcernOptions::SyncMode::UNSET, + WriteConcernOptions::kNoTimeout}; /** * Generates a unique name to be given to a newly added shard. @@ -687,8 +694,8 @@ StatusWith<std::string> ShardingCatalogManager::addShard( // Set the user-writes blocking state on the new shard. _setUserWriteBlockingStateOnNewShard(opCtx, targeter.get()); - // Set the cluster parameters on the new shard. - _setClusterParametersOnNewShard(opCtx, targeter.get()); + // Determine the set of cluster parameters to be used. 
+ _standardizeClusterParameters(opCtx, targeter.get()); { // Keep the FCV stable across checking the FCV, sending setFCV to the new shard and writing @@ -1119,12 +1126,99 @@ void ShardingCatalogManager::_setUserWriteBlockingStateOnNewShard(OperationConte }); } -void ShardingCatalogManager::_setClusterParametersOnNewShard(OperationContext* opCtx, - RemoteCommandTargeter* targeter) { - if (!gFeatureFlagClusterWideConfig.isEnabled(serverGlobalParams.featureCompatibility)) - return; +void ShardingCatalogManager::_setClusterParametersLocally(OperationContext* opCtx, + const std::vector<BSONObj>& parameters) { + DBDirectClient client(opCtx); + ClusterParameterDBClientService dbService(client); + for (auto& parameter : parameters) { + SetClusterParameter setClusterParameterRequest( + BSON(parameter["_id"].String() << parameter.filterFieldsUndotted( + BSON("_id" << 1 << "clusterParameterTime" << 1), false))); + setClusterParameterRequest.setDbName(NamespaceString::kAdminDb); + std::unique_ptr<ServerParameterService> parameterService = + std::make_unique<ClusterParameterService>(); + SetClusterParameterInvocation invocation{std::move(parameterService), dbService}; + invocation.invoke(opCtx, + setClusterParameterRequest, + parameter["clusterParameterTime"].timestamp(), + kMajorityWriteConcern); + } +} + +void ShardingCatalogManager::_pullClusterParametersFromNewShard(OperationContext* opCtx, + RemoteCommandTargeter* targeter) { + LOGV2(6538600, "Pulling cluster parameters from new shard"); + + // We can safely query the cluster parameters because the replica set must have been started + // with --shardsvr in order to add it into the cluster, and in this mode no setClusterParameter + // can be called on the replica set directly. 
+ auto host = uassertStatusOK( + targeter->findHost(opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly})); - LOGV2_DEBUG(6360600, 2, "Pushing cluster parameters into new shard"); + const Milliseconds maxTimeMS = + std::min(opCtx->getRemainingMaxTimeMillis(), Milliseconds(Seconds{30})); + BSONObjBuilder findCmdBuilder; + { + FindCommandRequest findCommand(NamespaceString::kClusterParametersNamespace); + auto readConcern = repl::ReadConcernArgs( + boost::optional<repl::ReadConcernLevel>(repl::ReadConcernLevel::kMajorityReadConcern)); + findCommand.setReadConcern(readConcern.toBSONInner()); + findCommand.setMaxTimeMS(durationCount<Milliseconds>(maxTimeMS)); + findCommand.serialize(BSONObj(), &findCmdBuilder); + } + + // If for some reason the callback never gets invoked, we will return this status in response. + Status status = + Status(ErrorCodes::InternalError, "Internal error running cursor callback in command"); + + std::vector<BSONObj> parameters; + auto fetcherCallback = + [this, &status, &parameters](const Fetcher::QueryResponseStatus& dataStatus, + Fetcher::NextAction* nextAction, + BSONObjBuilder* getMoreBob) { + // Throw out any accumulated results on error + if (!dataStatus.isOK()) { + status = dataStatus.getStatus(); + return; + } + const auto& data = dataStatus.getValue(); + + for (const BSONObj& doc : data.documents) { + parameters.push_back(doc.getOwned()); + } + + status = Status::OK(); + + if (!getMoreBob) { + return; + } + getMoreBob->append("getMore", data.cursorId); + getMoreBob->append("collection", data.nss.coll()); + }; + + Fetcher fetcher(_executorForAddShard.get(), + std::move(host), + NamespaceString::kClusterParametersNamespace.db().toString(), + findCmdBuilder.obj(), + fetcherCallback, + BSONObj(), /* metadata tracking, only used for shards */ + maxTimeMS, /* command network timeout */ + maxTimeMS /* getMore network timeout */); + + uassertStatusOK(fetcher.schedule()); + + uassertStatusOK(fetcher.join(opCtx)); + + uassertStatusOK(status); 
+ + _setClusterParametersLocally(opCtx, parameters); +} + +void ShardingCatalogManager::_pushClusterParametersToNewShard( + OperationContext* opCtx, + RemoteCommandTargeter* targeter, + const std::vector<BSONObj>& clusterParameters) { + LOGV2(6360600, "Pushing cluster parameters into new shard"); // Remove possible leftovers config.clusterParameters documents from the new shard. { @@ -1142,17 +1236,7 @@ void ShardingCatalogManager::_setClusterParametersOnNewShard(OperationContext* o } // Push cluster parameters into the newly added shard. - auto clusterParameterDocs = - uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( - opCtx, - ReadPreferenceSetting(ReadPreference::PrimaryOnly), - repl::ReadConcernLevel::kLocalReadConcern, - NamespaceString::kClusterParametersNamespace, - BSONObj(), - BSONObj(), - boost::none)); - - for (auto& parameter : clusterParameterDocs.docs) { + for (auto& parameter : clusterParameters) { ShardsvrSetClusterParameter setClusterParamsCmd( BSON(parameter["_id"].String() << parameter.filterFieldsUndotted( BSON("_id" << 1 << "clusterParameterTime" << 1), false))); @@ -1168,4 +1252,39 @@ void ShardingCatalogManager::_setClusterParametersOnNewShard(OperationContext* o } } +void ShardingCatalogManager::_standardizeClusterParameters(OperationContext* opCtx, + RemoteCommandTargeter* targeter) { + if (!gFeatureFlagClusterWideConfig.isEnabled(serverGlobalParams.featureCompatibility)) + return; + + auto clusterParameterDocs = + uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + opCtx, + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + repl::ReadConcernLevel::kLocalReadConcern, + NamespaceString::kClusterParametersNamespace, + BSONObj(), + BSONObj(), + boost::none)); + + auto shardsDocs = + uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + opCtx, + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + 
repl::ReadConcernLevel::kLocalReadConcern, + ShardType::ConfigNS, + BSONObj(), + BSONObj(), + boost::none)); + + // If this is the first shard being added, and no cluster parameters have been set, then this + // can be seen as a replica set to shard conversion. Absorb all of this shard's cluster + // parameters. + if (shardsDocs.docs.empty() && clusterParameterDocs.docs.empty()) { + _pullClusterParametersFromNewShard(opCtx, targeter); + } else { + _pushClusterParametersToNewShard(opCtx, targeter, clusterParameterDocs.docs); + } +} + } // namespace mongo |