summaryrefslogtreecommitdiff
path: root/src/mongo/db/s
diff options
context:
space:
mode:
authorMarcos José Grillo Ramirez <marcos.grillo@mongodb.com>2022-04-27 09:12:20 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-04-27 09:50:08 +0000
commitc78b6a067da0f6bc123f839f0f62bea64d8adf66 (patch)
tree9134b0ae6036a64623959b71b1f86178665c3c06 /src/mongo/db/s
parenta43a75a97c7b25adbbdb176560afe1cffbac4aa6 (diff)
downloadmongo-c78b6a067da0f6bc123f839f0f62bea64d8adf66.tar.gz
SERVER-65386 Change add shard behavior to absorb shard cluster's parameters when promoting first RS to shard
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--src/mongo/db/s/SConscript1
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager.h24
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_add_shard_test.cpp70
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp155
4 files changed, 218 insertions, 32 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 81714673261..f6581e8b90a 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -283,6 +283,7 @@ env.Library(
'$BUILD_DIR/mongo/db/catalog/collection_options',
'$BUILD_DIR/mongo/db/catalog_raii',
'$BUILD_DIR/mongo/db/commands/mongod_fcv',
+ '$BUILD_DIR/mongo/db/commands/set_cluster_parameter_invocation',
'$BUILD_DIR/mongo/db/commands/set_feature_compatibility_version_idl',
'$BUILD_DIR/mongo/db/common',
'$BUILD_DIR/mongo/db/dbdirectclient',
diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h
index 8621ffb153a..070c1d3d78f 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager.h
+++ b/src/mongo/db/s/config/sharding_catalog_manager.h
@@ -637,12 +637,32 @@ private:
*/
void _setUserWriteBlockingStateOnNewShard(OperationContext* opCtx,
RemoteCommandTargeter* targeter);
+ /**
+ * Given a vector of cluster parameters in disk format, sets them locally.
+ */
+ void _setClusterParametersLocally(OperationContext* opCtx,
+ const std::vector<BSONObj>& parameters);
+ /**
+ * Gets the cluster parameters set on the shard and then saves them locally.
+ */
+ void _pullClusterParametersFromNewShard(OperationContext* opCtx,
+ RemoteCommandTargeter* targeter);
+
+ /**
+ * Clean all possible leftover cluster parameters on the new added shard and sets the ones
+ * stored on the config server.
+ */
+ void _pushClusterParametersToNewShard(OperationContext* opCtx,
+ RemoteCommandTargeter* targeter,
+ const std::vector<BSONObj>& clusterParameters);
/**
- * Sets the cluster parameters on the shard that is being added.
+ * Determines whether to absorb the cluster parameters on the newly added shard (if we're
+ * converting from a replica set to a sharded cluster) or set the cluster parameters stored on
+ * the config server in the newly added shard.
*/
- void _setClusterParametersOnNewShard(OperationContext* opCtx, RemoteCommandTargeter* targeter);
+ void _standardizeClusterParameters(OperationContext* opCtx, RemoteCommandTargeter* targeter);
// The owning service context
ServiceContext* const _serviceContext;
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_add_shard_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_add_shard_test.cpp
index b9e33d855e6..0ed9c76d0f0 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_add_shard_test.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_add_shard_test.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/commands.h"
#include "mongo/db/commands/set_feature_compatibility_version_gen.h"
#include "mongo/db/ops/write_ops.h"
+#include "mongo/db/query/cursor_response.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/s/add_shard_cmd_gen.h"
#include "mongo/db/s/add_shard_util.h"
@@ -171,9 +172,35 @@ protected:
});
}
- void expectRemoveSetClusterParameterDocs(const HostAndPort& target) {
+ void expectClusterParametersRequest(const HostAndPort& target) {
if (!gFeatureFlagClusterWideConfig.isEnabled(serverGlobalParams.featureCompatibility))
return;
+ auto clusterParameterDocs = uassertStatusOK(getConfigShard()->exhaustiveFindOnConfig(
+ operationContext(),
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ repl::ReadConcernLevel::kLocalReadConcern,
+ NamespaceString::kClusterParametersNamespace,
+ BSONObj(),
+ BSONObj(),
+ boost::none));
+
+ auto shardsDocs = uassertStatusOK(getConfigShard()->exhaustiveFindOnConfig(
+ operationContext(),
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ repl::ReadConcernLevel::kLocalReadConcern,
+ ShardType::ConfigNS,
+ BSONObj(),
+ BSONObj(),
+ boost::none));
+
+ if (shardsDocs.docs.empty() && clusterParameterDocs.docs.empty()) {
+ expectFindClusterParameterDocs(target);
+ } else {
+ expectRemoveClusterParameterDocs(target);
+ }
+ }
+
+ void expectRemoveClusterParameterDocs(const HostAndPort& target) {
onCommandForAddShard([&](const RemoteCommandRequest& request) {
ASSERT_EQ(request.target, target);
ASSERT_EQ(request.dbname, NamespaceString::kClusterParametersNamespace.db());
@@ -192,6 +219,20 @@ protected:
});
}
+ void expectFindClusterParameterDocs(const HostAndPort& target) {
+ onCommandForAddShard([&](const RemoteCommandRequest& request) {
+ ASSERT_EQ(request.target, target);
+ ASSERT_EQ(request.dbname, NamespaceString::kClusterParametersNamespace.db());
+ ASSERT_BSONOBJ_EQ(request.cmdObj,
+ BSON("find" << NamespaceString::kClusterParametersNamespace.coll()
+ << "maxTimeMS" << 30000 << "readConcern"
+ << BSON("level"
+ << "majority")));
+ auto cursorRes = CursorResponse(NamespaceString::kClusterParametersNamespace, 0, {});
+ return cursorRes.toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+ }
+
/**
* Waits for a request for the shardIdentity document to be upserted into a shard from the
* config server on addShard.
@@ -462,8 +503,9 @@ TEST_F(AddShardTest, StandaloneBasicSuccess) {
// The shard receives a delete op to clear any leftover user_writes_critical_sections doc.
expectRemoveUserWritesCriticalSectionsDocs(shardTarget);
- // The shard receives a delete op to clear any leftover clusterParameters doc.
- expectRemoveSetClusterParameterDocs(shardTarget);
+ // The shard receives a delete op to clear any leftover clusterParameters doc or a find to get
+ // all cluster parameters in the replica set that is being promoted to a sharded cluster.
+ expectClusterParametersRequest(shardTarget);
// The shard receives the setFeatureCompatibilityVersion command.
expectSetFeatureCompatibilityVersion(shardTarget, BSON("ok" << 1), expectWriteConcern.toBSON());
@@ -548,10 +590,11 @@ TEST_F(AddShardTest, StandaloneGenerateName) {
// The shard receives a delete op to clear any leftover user_writes_critical_sections doc.
expectRemoveUserWritesCriticalSectionsDocs(shardTarget);
- // The shard receives a delete op to clear any leftover clusterParameters doc.
- expectRemoveSetClusterParameterDocs(shardTarget);
+ // The shard receives a delete op to clear any leftover clusterParameters doc or a find to get
+ // all cluster parameters in the replica set that is being promoted to a sharded cluster.
+ expectClusterParametersRequest(shardTarget);
- // The shard receives the setFeatureCompatibilityVersion command.
+ // The shard receives the setFeatureCompatibilityVersion command
expectSetFeatureCompatibilityVersion(
shardTarget, BSON("ok" << 1), operationContext()->getWriteConcern().toBSON());
@@ -949,8 +992,9 @@ TEST_F(AddShardTest, SuccessfullyAddReplicaSet) {
// The shard receives a delete op to clear any leftover user_writes_critical_sections doc.
expectRemoveUserWritesCriticalSectionsDocs(shardTarget);
- // The shard receives a delete op to clear any leftover clusterParameters doc.
- expectRemoveSetClusterParameterDocs(shardTarget);
+ // The shard receives a delete op to clear any leftover clusterParameters doc or a find to get
+ // all cluster parameters in the replica set that is being promoted to a sharded cluster.
+ expectClusterParametersRequest(shardTarget);
// The shard receives the setFeatureCompatibilityVersion command.
expectSetFeatureCompatibilityVersion(
@@ -1020,8 +1064,9 @@ TEST_F(AddShardTest, ReplicaSetExtraHostsDiscovered) {
// The shard receives a delete op to clear any leftover user_writes_critical_sections doc.
expectRemoveUserWritesCriticalSectionsDocs(shardTarget);
- // The shard receives a delete op to clear any leftover clusterParameters doc.
- expectRemoveSetClusterParameterDocs(shardTarget);
+ // The shard receives a delete op to clear any leftover clusterParameters doc or a find to get
+ // all cluster parameters in the replica set that is being promoted to a sharded cluster.
+ expectClusterParametersRequest(shardTarget);
// The shard receives the setFeatureCompatibilityVersion command.
expectSetFeatureCompatibilityVersion(
@@ -1104,8 +1149,9 @@ TEST_F(AddShardTest, AddShardSucceedsEvenIfAddingDBsFromNewShardFails) {
// The shard receives a delete op to clear any leftover user_writes_critical_sections doc.
expectRemoveUserWritesCriticalSectionsDocs(shardTarget);
- // The shard receives a delete op to clear any leftover clusterParameters doc.
- expectRemoveSetClusterParameterDocs(shardTarget);
+ // The shard receives a delete op to clear any leftover clusterParameters doc or a find to get
+ // all cluster parameters in the replica set that is being promoted to a sharded cluster.
+ expectClusterParametersRequest(shardTarget);
// The shard receives the setFeatureCompatibilityVersion command.
expectSetFeatureCompatibilityVersion(
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp
index b2bf68bdd6c..c6e1055ede9 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp
@@ -41,6 +41,7 @@
#include "mongo/bson/bsonobj_comparator.h"
#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/connection_string.h"
+#include "mongo/client/fetcher.h"
#include "mongo/client/read_preference.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/client/replica_set_monitor.h"
@@ -51,6 +52,7 @@
#include "mongo/db/commands/cluster_server_parameter_cmds_gen.h"
#include "mongo/db/commands/feature_compatibility_version.h"
#include "mongo/db/commands/feature_compatibility_version_parser.h"
+#include "mongo/db/commands/set_cluster_parameter_invocation.h"
#include "mongo/db/commands/set_feature_compatibility_version_gen.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
@@ -72,6 +74,8 @@
#include "mongo/idl/cluster_server_parameter_gen.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/rpc/metadata/repl_set_metadata.h"
+#include "mongo/rpc/metadata/tracking_metadata.h"
#include "mongo/s/catalog/config_server_version.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog/type_database_gen.h"
@@ -98,6 +102,9 @@ using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackA
using RemoteCommandCallbackFn = executor::TaskExecutor::RemoteCommandCallbackFn;
const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{});
+const WriteConcernOptions kMajorityWriteConcern{WriteConcernOptions::kMajority,
+ WriteConcernOptions::SyncMode::UNSET,
+ WriteConcernOptions::kNoTimeout};
/**
* Generates a unique name to be given to a newly added shard.
@@ -687,8 +694,8 @@ StatusWith<std::string> ShardingCatalogManager::addShard(
// Set the user-writes blocking state on the new shard.
_setUserWriteBlockingStateOnNewShard(opCtx, targeter.get());
- // Set the cluster parameters on the new shard.
- _setClusterParametersOnNewShard(opCtx, targeter.get());
+ // Determine the set of cluster parameters to be used.
+ _standardizeClusterParameters(opCtx, targeter.get());
{
// Keep the FCV stable across checking the FCV, sending setFCV to the new shard and writing
@@ -1119,12 +1126,99 @@ void ShardingCatalogManager::_setUserWriteBlockingStateOnNewShard(OperationConte
});
}
-void ShardingCatalogManager::_setClusterParametersOnNewShard(OperationContext* opCtx,
- RemoteCommandTargeter* targeter) {
- if (!gFeatureFlagClusterWideConfig.isEnabled(serverGlobalParams.featureCompatibility))
- return;
+void ShardingCatalogManager::_setClusterParametersLocally(OperationContext* opCtx,
+ const std::vector<BSONObj>& parameters) {
+ DBDirectClient client(opCtx);
+ ClusterParameterDBClientService dbService(client);
+ for (auto& parameter : parameters) {
+ SetClusterParameter setClusterParameterRequest(
+ BSON(parameter["_id"].String() << parameter.filterFieldsUndotted(
+ BSON("_id" << 1 << "clusterParameterTime" << 1), false)));
+ setClusterParameterRequest.setDbName(NamespaceString::kAdminDb);
+ std::unique_ptr<ServerParameterService> parameterService =
+ std::make_unique<ClusterParameterService>();
+ SetClusterParameterInvocation invocation{std::move(parameterService), dbService};
+ invocation.invoke(opCtx,
+ setClusterParameterRequest,
+ parameter["clusterParameterTime"].timestamp(),
+ kMajorityWriteConcern);
+ }
+}
+
+void ShardingCatalogManager::_pullClusterParametersFromNewShard(OperationContext* opCtx,
+ RemoteCommandTargeter* targeter) {
+ LOGV2(6538600, "Pulling cluster parameters from new shard");
+
+ // We can safely query the cluster parameters because the replica set must have been started
+ // with --shardsvr in order to add it into the cluster, and in this mode no setClusterParameter
+ // can be called on the replica set directly.
+ auto host = uassertStatusOK(
+ targeter->findHost(opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}));
- LOGV2_DEBUG(6360600, 2, "Pushing cluster parameters into new shard");
+ const Milliseconds maxTimeMS =
+ std::min(opCtx->getRemainingMaxTimeMillis(), Milliseconds(Seconds{30}));
+ BSONObjBuilder findCmdBuilder;
+ {
+ FindCommandRequest findCommand(NamespaceString::kClusterParametersNamespace);
+ auto readConcern = repl::ReadConcernArgs(
+ boost::optional<repl::ReadConcernLevel>(repl::ReadConcernLevel::kMajorityReadConcern));
+ findCommand.setReadConcern(readConcern.toBSONInner());
+ findCommand.setMaxTimeMS(durationCount<Milliseconds>(maxTimeMS));
+ findCommand.serialize(BSONObj(), &findCmdBuilder);
+ }
+
+ // If for some reason the callback never gets invoked, we will return this status in response.
+ Status status =
+ Status(ErrorCodes::InternalError, "Internal error running cursor callback in command");
+
+ std::vector<BSONObj> parameters;
+ auto fetcherCallback =
+ [this, &status, &parameters](const Fetcher::QueryResponseStatus& dataStatus,
+ Fetcher::NextAction* nextAction,
+ BSONObjBuilder* getMoreBob) {
+ // Throw out any accumulated results on error
+ if (!dataStatus.isOK()) {
+ status = dataStatus.getStatus();
+ return;
+ }
+ const auto& data = dataStatus.getValue();
+
+ for (const BSONObj& doc : data.documents) {
+ parameters.push_back(doc.getOwned());
+ }
+
+ status = Status::OK();
+
+ if (!getMoreBob) {
+ return;
+ }
+ getMoreBob->append("getMore", data.cursorId);
+ getMoreBob->append("collection", data.nss.coll());
+ };
+
+ Fetcher fetcher(_executorForAddShard.get(),
+ std::move(host),
+ NamespaceString::kClusterParametersNamespace.db().toString(),
+ findCmdBuilder.obj(),
+ fetcherCallback,
+ BSONObj(), /* metadata tracking, only used for shards */
+ maxTimeMS, /* command network timeout */
+ maxTimeMS /* getMore network timeout */);
+
+ uassertStatusOK(fetcher.schedule());
+
+ uassertStatusOK(fetcher.join(opCtx));
+
+ uassertStatusOK(status);
+
+ _setClusterParametersLocally(opCtx, parameters);
+}
+
+void ShardingCatalogManager::_pushClusterParametersToNewShard(
+ OperationContext* opCtx,
+ RemoteCommandTargeter* targeter,
+ const std::vector<BSONObj>& clusterParameters) {
+ LOGV2(6360600, "Pushing cluster parameters into new shard");
// Remove possible leftovers config.clusterParameters documents from the new shard.
{
@@ -1142,17 +1236,7 @@ void ShardingCatalogManager::_setClusterParametersOnNewShard(OperationContext* o
}
// Push cluster parameters into the newly added shard.
- auto clusterParameterDocs =
- uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
- opCtx,
- ReadPreferenceSetting(ReadPreference::PrimaryOnly),
- repl::ReadConcernLevel::kLocalReadConcern,
- NamespaceString::kClusterParametersNamespace,
- BSONObj(),
- BSONObj(),
- boost::none));
-
- for (auto& parameter : clusterParameterDocs.docs) {
+ for (auto& parameter : clusterParameters) {
ShardsvrSetClusterParameter setClusterParamsCmd(
BSON(parameter["_id"].String() << parameter.filterFieldsUndotted(
BSON("_id" << 1 << "clusterParameterTime" << 1), false)));
@@ -1168,4 +1252,39 @@ void ShardingCatalogManager::_setClusterParametersOnNewShard(OperationContext* o
}
}
+void ShardingCatalogManager::_standardizeClusterParameters(OperationContext* opCtx,
+ RemoteCommandTargeter* targeter) {
+ if (!gFeatureFlagClusterWideConfig.isEnabled(serverGlobalParams.featureCompatibility))
+ return;
+
+ auto clusterParameterDocs =
+ uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ opCtx,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ repl::ReadConcernLevel::kLocalReadConcern,
+ NamespaceString::kClusterParametersNamespace,
+ BSONObj(),
+ BSONObj(),
+ boost::none));
+
+ auto shardsDocs =
+ uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ opCtx,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ repl::ReadConcernLevel::kLocalReadConcern,
+ ShardType::ConfigNS,
+ BSONObj(),
+ BSONObj(),
+ boost::none));
+
+ // If this is the first shard being added, and no cluster parameters have been set, then this
+ // can be seen as a replica set to shard conversion. Absorb all of this shard's cluster
+ // parameters.
+ if (shardsDocs.docs.empty() && clusterParameterDocs.docs.empty()) {
+ _pullClusterParametersFromNewShard(opCtx, targeter);
+ } else {
+ _pushClusterParametersToNewShard(opCtx, targeter, clusterParameterDocs.docs);
+ }
+}
+
} // namespace mongo