author     Paolo Polato <paolo.polato@mongodb.com>            2023-01-27 17:29:27 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>   2023-01-27 19:26:18 +0000
commit     9c7c82b40b7493cb63f971429e04cfb419c62b93 (patch)
tree       cc33f021b19156e409a8618cae280020f8834a4a
parent     25639888e57caae19d152e89c4d68aa935618142 (diff)
download   mongo-9c7c82b40b7493cb63f971429e04cfb419c62b93.tar.gz
SERVER-72872 Introduce configsvr command to query placementHistory in controlled conditions
-rw-r--r--jstests/core/views/views_all_commands.js1
-rw-r--r--jstests/replsets/all_commands_downgrading_to_upgraded.js1
-rw-r--r--jstests/replsets/db_reads_while_recovering_all_commands.js1
-rw-r--r--jstests/sharding/read_write_concern_defaults_application.js1
-rw-r--r--src/mongo/db/s/SConscript1
-rw-r--r--src/mongo/db/s/config/configsvr_get_historical_placement_info.cpp123
-rw-r--r--src/mongo/s/SConscript1
-rw-r--r--src/mongo/s/catalog/sharding_catalog_client.h13
-rw-r--r--src/mongo/s/catalog/sharding_catalog_client_impl.cpp462
-rw-r--r--src/mongo/s/catalog/sharding_catalog_client_impl.h15
-rw-r--r--src/mongo/s/catalog/sharding_catalog_client_mock.cpp7
-rw-r--r--src/mongo/s/catalog/sharding_catalog_client_mock.h6
-rw-r--r--src/mongo/s/request_types/get_historical_placement_info.idl77
13 files changed, 494 insertions, 215 deletions
diff --git a/jstests/core/views/views_all_commands.js b/jstests/core/views/views_all_commands.js
index 906886c51ba..e9891cb65eb 100644
--- a/jstests/core/views/views_all_commands.js
+++ b/jstests/core/views/views_all_commands.js
@@ -109,6 +109,7 @@ let viewsCommandTests = {
_configsvrCreateDatabase: {skip: isAnInternalCommand},
_configsvrDropIndexCatalogEntry: {skip: isAnInternalCommand},
_configsvrEnsureChunkVersionIsGreaterThan: {skip: isAnInternalCommand},
+ _configsvrGetHistoricalPlacement: {skip: isAnInternalCommand}, // TODO SERVER-73029 remove
_configsvrMoveChunk: {skip: isAnInternalCommand}, // Can be removed once 6.0 is last LTS
_configsvrMovePrimary: {skip: isAnInternalCommand},
_configsvrMoveRange: {skip: isAnInternalCommand},
diff --git a/jstests/replsets/all_commands_downgrading_to_upgraded.js b/jstests/replsets/all_commands_downgrading_to_upgraded.js
index 322f9198f42..0e4d06fcfb0 100644
--- a/jstests/replsets/all_commands_downgrading_to_upgraded.js
+++ b/jstests/replsets/all_commands_downgrading_to_upgraded.js
@@ -63,6 +63,7 @@ const allCommands = {
_configsvrCreateDatabase: {skip: isAnInternalCommand},
_configsvrDropIndexCatalogEntry: {skip: isAnInternalCommand},
_configsvrEnsureChunkVersionIsGreaterThan: {skip: isAnInternalCommand},
+ _configsvrGetHistoricalPlacement: {skip: isAnInternalCommand}, // TODO SERVER-73029 remove
_configsvrMoveChunk: {skip: isAnInternalCommand},
_configsvrMoveRange: {skip: isAnInternalCommand},
_configsvrRefineCollectionShardKey: {skip: isAnInternalCommand},
diff --git a/jstests/replsets/db_reads_while_recovering_all_commands.js b/jstests/replsets/db_reads_while_recovering_all_commands.js
index 8e04bdfd7c0..d24dc1603ad 100644
--- a/jstests/replsets/db_reads_while_recovering_all_commands.js
+++ b/jstests/replsets/db_reads_while_recovering_all_commands.js
@@ -48,6 +48,7 @@ const allCommands = {
_configsvrCreateDatabase: {skip: isPrimaryOnly},
_configsvrDropIndexCatalogEntry: {skip: isPrimaryOnly},
_configsvrEnsureChunkVersionIsGreaterThan: {skip: isPrimaryOnly},
+ _configsvrGetHistoricalPlacement: {skip: isAnInternalCommand}, // TODO SERVER-73029 remove
_configsvrMoveChunk: {skip: isPrimaryOnly},
_configsvrMoveRange: {skip: isPrimaryOnly},
_configsvrRefineCollectionShardKey: {skip: isPrimaryOnly},
diff --git a/jstests/sharding/read_write_concern_defaults_application.js b/jstests/sharding/read_write_concern_defaults_application.js
index 81de7608132..6a011fa767b 100644
--- a/jstests/sharding/read_write_concern_defaults_application.js
+++ b/jstests/sharding/read_write_concern_defaults_application.js
@@ -101,6 +101,7 @@ let testCases = {
_configsvrCreateDatabase: {skip: "internal command"},
_configsvrDropIndexCatalogEntry: {skip: "internal command"},
_configsvrEnsureChunkVersionIsGreaterThan: {skip: "internal command"},
+ _configsvrGetHistoricalPlacement: {skip: "internal command"}, // TODO SERVER-73029 remove
_configsvrMoveChunk: {skip: "internal command"},
_configsvrMovePrimary: {skip: "internal command"}, // Can be removed once 6.0 is last LTS
_configsvrMoveRange: {skip: "internal command"},
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 144838d2097..688b4e251a6 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -445,6 +445,7 @@ env.Library(
'config/configsvr_drop_index_catalog_command.cpp',
'config/configsvr_create_database_command.cpp',
'config/configsvr_ensure_chunk_version_is_greater_than_command.cpp',
+ 'config/configsvr_get_historical_placement_info.cpp',
'config/configsvr_merge_all_chunks_on_shard_command.cpp',
'config/configsvr_merge_chunks_command.cpp',
'config/configsvr_move_chunk_command.cpp',
diff --git a/src/mongo/db/s/config/configsvr_get_historical_placement_info.cpp b/src/mongo/db/s/config/configsvr_get_historical_placement_info.cpp
new file mode 100644
index 00000000000..9e71224c7a0
--- /dev/null
+++ b/src/mongo/db/s/config/configsvr_get_historical_placement_info.cpp
@@ -0,0 +1,123 @@
+/**
+ * Copyright (C) 2023-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/s/config/sharding_catalog_manager.h"
+#include "mongo/s/request_types/get_historical_placement_info_gen.h"
+#include "mongo/s/sharding_feature_flags_gen.h"
+
+
+namespace mongo {
+namespace {
+
+class ConfigsvrGetHistoricalPlacementCommand final
+ : public TypedCommand<ConfigsvrGetHistoricalPlacementCommand> {
+public:
+ using Request = ConfigsvrGetHistoricalPlacement;
+
+ class Invocation final : public InvocationBase {
+ public:
+ using InvocationBase::InvocationBase;
+
+ ConfigsvrGetHistoricalPlacementResponse typedRun(OperationContext* opCtx) {
+ const NamespaceString& nss = ns();
+
+ uassert(ErrorCodes::IllegalOperation,
+ "_configsvrGetHistoricalPlacement can only be run on config servers",
+ serverGlobalParams.clusterRole == ClusterRole::ConfigServer);
+
+ // Set the operation context read concern level to majority for reads into the config
+ // database.
+ repl::ReadConcernArgs::get(opCtx) =
+ repl::ReadConcernArgs(repl::ReadConcernLevel::kMajorityReadConcern);
+
+ const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient();
+
+ if (!feature_flags::gHistoricalPlacementShardingCatalog.isEnabled(
+ serverGlobalParams.featureCompatibility)) {
+ auto shardsWithOpTime = uassertStatusOK(catalogClient->getAllShards(
+ opCtx, repl::ReadConcernLevel::kMajorityReadConcern));
+ std::vector<ShardId> shardIds;
+ std::transform(shardsWithOpTime.value.begin(),
+ shardsWithOpTime.value.end(),
+ std::back_inserter(shardIds),
+ [](const ShardType& s) { return s.getName(); });
+ ConfigsvrGetHistoricalPlacementResponse response(std::move(shardIds));
+ response.setIsExact(false);
+ return response;
+ }
+
+ boost::optional<NamespaceString> targetedNs = request().getTargetWholeCluster()
+ ? (boost::optional<NamespaceString>)boost::none
+ : nss;
+ return ConfigsvrGetHistoricalPlacementResponse(
+ catalogClient->getHistoricalPlacement(opCtx, request().getAt(), targetedNs));
+ }
+
+ private:
+ NamespaceString ns() const override {
+ return request().getCommandParameter();
+ }
+
+ bool supportsWriteConcern() const override {
+ return false;
+ }
+
+ void doCheckAuthorization(OperationContext* opCtx) const override {
+ uassert(ErrorCodes::Unauthorized,
+ "Unauthorized",
+ AuthorizationSession::get(opCtx->getClient())
+ ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(),
+ ActionType::internal));
+ }
+ };
+
+ bool skipApiVersionCheck() const override {
+ // Internal command (server to server).
+ return true;
+ }
+
+ std::string help() const override {
+ return "Internal command, which is exported by the sharding config server. Do not call "
+ "directly. Allows to run queries concerning historical placement of a namespace in "
+ "a controlled way.";
+ }
+
+ bool adminOnly() const override {
+ return true;
+ }
+
+ AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
+ return AllowedOnSecondary::kNever;
+ }
+} configsvrGetHistoricalPlacementCmd;
+
+} // namespace
+} // namespace mongo
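
Note: the command above has two paths. When the gHistoricalPlacementShardingCatalog feature flag is disabled, it answers with every shard currently registered in config.shards and marks the result as approximate (isExact: false); when the flag is enabled, it runs the exact placementHistory query through the catalog client. A minimal sketch of how a jstest might exercise it against the config server primary — the namespace, timestamp and cluster topology below are illustrative and not part of this commit:

    // Illustrative sketch only (assumes a ShardingTest fixture and internal-command privileges).
    const st = new ShardingTest({shards: 2});
    const configPrimary = st.configRS.getPrimary();
    // Any recent cluster time can serve as the "at" parameter; here we reuse the one gossiped on a ping.
    const at = configPrimary.getDB("admin").runCommand({ping: 1}).$clusterTime.clusterTime;
    const res = assert.commandWorked(configPrimary.adminCommand({
        _configsvrGetHistoricalPlacement: "testDB.testColl",  // namespace (command parameter)
        at: at,
    }));
    // Expected response shape per the IDL later in this patch: {shards: [<shard ids>], isExact: <bool>, ok: 1}
    jsTest.log("shards=" + tojson(res.shards) + " isExact=" + res.isExact);
    st.stop();
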
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 1a410bbeefd..ae3e0b0fc72 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -220,6 +220,7 @@ env.Library(
'request_types/flush_resharding_state_change.idl',
'request_types/flush_routing_table_cache_updates.idl',
'request_types/get_database_version.idl',
+ 'request_types/get_historical_placement_info.idl',
'request_types/shardsvr_join_migrations_request.idl',
'request_types/merge_chunk_request.idl',
'request_types/migration_secondary_throttle_options.cpp',
diff --git a/src/mongo/s/catalog/sharding_catalog_client.h b/src/mongo/s/catalog/sharding_catalog_client.h
index d93e8e9bddf..31a171b6c4c 100644
--- a/src/mongo/s/catalog/sharding_catalog_client.h
+++ b/src/mongo/s/catalog/sharding_catalog_client.h
@@ -360,6 +360,19 @@ public:
virtual std::vector<ShardId> getShardsThatOwnDataAtClusterTime(
OperationContext* opCtx, const Timestamp& clusterTime) = 0;
+ /**
+ * Queries config.placementHistory to retrieve placement metadata on the requested namespace at
+ * a specific point in time. When no namespace is specified, placement metadata on the whole
+ * cluster will be returned. This function is meant to be exclusively invoked by config server
+ * nodes.
+ *
+ * TODO (SERVER-73029): convert to private method of ShardingCatalogClientImpl
+ */
+ virtual std::vector<ShardId> getHistoricalPlacement(
+ OperationContext* opCtx,
+ const Timestamp& atClusterTime,
+ const boost::optional<NamespaceString>& nss) = 0;
+
protected:
ShardingCatalogClient() = default;
diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
index 9afaf5e38ff..e3b78e19477 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
@@ -383,215 +383,6 @@ std::vector<BSONObj> runCatalogAggregation(OperationContext* opCtx,
return aggResult;
}
-std::vector<ShardId> makeAndRunPlacementHistoryAggregation(
- OperationContext* opCtx,
- const std::shared_ptr<Shard>& configShard,
- const Timestamp& minClusterTime,
- const boost::optional<NamespaceString>& nss) {
- /*
- Stage 1. Select only the entry with timestamp <= clusterTime and filter out all nss that are
- not the collection or the database
- Stage 2. sort by timestamp
- Stage 3. Extract the first document for each database and collection matching the received
- namespace
- Stage 4. Discard the entries with empty shards (i.e. the collection was dropped or
- renamed)
- Stage 5. Group all documents and concat shards (this will generate an array of arrays)
- Stage 6. Flatten the array of arrays into a set (this will also remove duplicates)
-
- Stage 7. Access to the list of shards currently active in the cluster
- Stage 8. Count the number of shards obtained on stage 6 that also appear in the list of
- active shards
- Stage 9. Do not return the list of active shards (used only for the count)
-
- regex=^db(\.collection)?$ // matches db or db.collection ( this is skipped in case the whole
- cluster info is searched)
- [
- {
- "$match": { "timestamp": { "$lte": clusterTime } , "nss" : { $regex: regex} }
- },
- {
- "$sort": { "timestamp": -1 }
- },
- {
- "$group": {
- _id: "$nss",
- shards: { $first: "$shards" }
- }
- },
- { "$match": { shards: { $not: { $size: 0 } } } },
- {
- "$group": {
- _id: "",
- shards: { $push: "$shards" }
- }
- },
- {
- $project: {
- "shards": {
- $reduce: {
- input: "$shards",
- initialValue: [],
- in: { "$setUnion": [ "$$this", "$$value"] }
- }
- }
- }
- },
- {
- $lookup:
- {
- from: "shards",
- localField: "shards",
- foreignField: "_id",
- as: "activeShards"
- }
- },
- { "$set" : { "numActiveShards" : { "$size" : "$activeShards" } } },
- { "$project": { "activeShards" : 0 } }
- ]
- */
-
- auto expCtx = make_intrusive<ExpressionContext>(
- opCtx, nullptr /*collator*/, NamespaceString::kConfigsvrPlacementHistoryNamespace);
- StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
- resolvedNamespaces[NamespaceString::kConfigsvrShardsNamespace.coll()] = {
- NamespaceString::kConfigsvrShardsNamespace, std::vector<BSONObj>() /* pipeline */};
- resolvedNamespaces[NamespaceString::kConfigsvrPlacementHistoryNamespace.coll()] = {
- NamespaceString::kConfigsvrPlacementHistoryNamespace,
- std::vector<BSONObj>() /* pipeline */};
- expCtx->setResolvedNamespaces(resolvedNamespaces);
-
- // 1. Get all the history entries prior to the requested time concerning either the collection
- // or the parent database.
-
-
- auto matchStage = [&]() {
- bool isClusterSearch = !nss.has_value();
- if (isClusterSearch)
- return DocumentSourceMatch::create(BSON("timestamp" << BSON("$lte" << minClusterTime)),
- expCtx);
-
- bool isCollectionSearch = !nss->db().empty() && !nss->coll().empty();
- auto collMatchExpression = isCollectionSearch ? pcre_util::quoteMeta(nss->coll()) : ".*";
- auto regexString =
- "^" + pcre_util::quoteMeta(nss->db()) + "(\\." + collMatchExpression + ")?$";
- return DocumentSourceMatch::create(BSON("timestamp" << BSON("$lte" << minClusterTime)
- << "nss"
- << BSON("$regex" << regexString)),
- expCtx);
- }();
-
- // 2 & 3. Sort by timestamp and extract the first document for collection and database
- auto sortStage = DocumentSourceSort::create(expCtx, BSON("timestamp" << -1));
- auto groupStageBson = BSON("_id"
- << "$nss"
- << "shards"
- << BSON("$first"
- << "$shards"));
- auto groupStage = DocumentSourceGroup::createFromBson(
- Document{{"$group", std::move(groupStageBson)}}.toBson().firstElement(), expCtx);
-
- // Stage 4. Discard the entries with empty shards (i.e. the collection was dropped or renamed)
- auto noShardsFilter =
- DocumentSourceMatch::create(BSON("shards" << BSON("$not" << BSON("$size" << 0))), expCtx);
-
- // Stage 5. Group all documents and concat shards (this will generate an array of arrays)
- auto groupStageBson2 = BSON("_id"
- << ""
- << "shards"
- << BSON("$push"
- << "$shards"));
- auto groupStageConcat = DocumentSourceGroup::createFromBson(
- Document{{"$group", std::move(groupStageBson2)}}.toBson().firstElement(), expCtx);
-
- // Stage 6. Flatten the array of arrays into a set (this will also remove duplicates)
- auto projectStageBson =
- BSON("shards" << BSON("$reduce" << BSON("input"
- << "$shards"
- << "initialValue" << BSONArray() << "in"
- << BSON("$setUnion" << BSON_ARRAY("$$this"
- << "$$value")))));
- auto projectStageFlatten = DocumentSourceProject::createFromBson(
- Document{{"$project", std::move(projectStageBson)}}.toBson().firstElement(), expCtx);
-
-
- // Stage 7. Lookup active shards with left outer join on config.shards
- Document lookupStageDoc = {
- {"from", NamespaceString::kConfigsvrShardsNamespace.coll().toString()},
- {"localField", StringData("shards")},
- {"foreignField", StringData("_id")},
- {"as", StringData("activeShards")}};
-
- auto lookupStage = DocumentSourceLookUp::createFromBson(
- Document{{"$lookup", std::move(lookupStageDoc)}}.toBson().firstElement(), expCtx);
-
- // Stage 8. Count number of active shards
- auto setStageDoc = Document(
- {{"$set", Document{{"numActiveShards", Document{{"$size", "$activeShards"_sd}}}}}});
- auto setStage =
- DocumentSourceAddFields::createFromBson(setStageDoc.toBson().firstElement(), expCtx);
-
- // Stage 9. Disable activeShards field to avoid sending it to the client
- auto projectStageDoc = Document{{"activeShards", 0}};
- auto projectStage = DocumentSourceProject::create(
- projectStageDoc.toBson(), expCtx, "getShardsThatOwnDataForNamespaceAtClusterTime");
-
- // Create pipeline
- Pipeline::SourceContainer stages;
- stages.emplace_back(std::move(matchStage));
- stages.emplace_back(std::move(sortStage));
- stages.emplace_back(std::move(groupStage));
- stages.emplace_back(std::move(noShardsFilter));
- stages.emplace_back(std::move(groupStageConcat));
- stages.emplace_back(std::move(projectStageFlatten));
- stages.emplace_back(std::move(lookupStage));
- stages.emplace_back(std::move(setStage));
- stages.emplace_back(std::move(projectStage));
-
- const auto pipeline = Pipeline::create(stages, expCtx);
- auto aggRequest = AggregateCommandRequest(NamespaceString::kConfigsvrPlacementHistoryNamespace,
- pipeline->serializeToBson());
-
-
- // Run the aggregation
- const auto readConcern = [&]() -> repl::ReadConcernArgs {
- if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer &&
- !gFeatureFlagConfigServerAlwaysShardRemote.isEnabledAndIgnoreFCV()) {
- // When the feature flag is on, the config server may read from a secondary which may
- // need to wait for replication, so we should use afterClusterTime.
- return {repl::ReadConcernLevel::kMajorityReadConcern};
- } else {
- const auto time = VectorClock::get(opCtx)->getTime();
- return {time.configTime(), repl::ReadConcernLevel::kMajorityReadConcern};
- }
- }();
-
- auto aggrResult = runCatalogAggregation(
- opCtx, configShard, aggRequest, readConcern, Shard::kDefaultConfigCommandTimeout);
-
- // Parse the result
- std::vector<ShardId> activeShards;
- if (!aggrResult.empty()) {
- invariant(aggrResult.size() == 1);
-
- // Extract the result
- const auto& doc = aggrResult.front();
- auto numActiveShards = doc.getField("numActiveShards").Int();
- // Use Obj() instead of Array() to avoid instantiating a temporary std::vector.
- const auto& shards = doc.getField("shards").Obj();
-
- uassert(ErrorCodes::SnapshotTooOld,
- "Part of the history may no longer be retrieved because of one or more removed "
- "shards.",
- numActiveShards == static_cast<int>(shards.nFields()));
-
- for (const auto& shard : shards) {
- activeShards.push_back(shard.String());
- }
- }
- return activeShards;
-}
-
} // namespace
ShardingCatalogClientImpl::ShardingCatalogClientImpl(std::shared_ptr<Shard> overrideConfigShard)
@@ -704,6 +495,24 @@ StatusWith<repl::OpTimeWith<DatabaseType>> ShardingCatalogClientImpl::_fetchData
}
}
+std::vector<ShardId> ShardingCatalogClientImpl::_fetchPlacementMetadata(
+ OperationContext* opCtx, ConfigsvrGetHistoricalPlacement&& request) {
+ auto remoteResponse = uassertStatusOK(_getConfigShard(opCtx)->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ NamespaceString::kAdminDb.toString(),
+ request.toBSON(BSONObj()),
+ Shard::kDefaultConfigCommandTimeout,
+ Shard::RetryPolicy::kIdempotentOrCursorInvalidated));
+
+ uassertStatusOK(remoteResponse.commandStatus);
+
+ auto placementDetails = ConfigsvrGetHistoricalPlacementResponse::parse(
+ IDLParserContext("ShardingCatalogClient"), remoteResponse.response);
+
+ return placementDetails.getShards();
+}
+
CollectionType ShardingCatalogClientImpl::getCollection(OperationContext* opCtx,
const NamespaceString& nss,
repl::ReadConcernLevel readConcernLevel) {
@@ -1471,8 +1280,11 @@ std::vector<ShardId> ShardingCatalogClientImpl::getShardsThatOwnDataForCollAtClu
"A full collection namespace must be specified",
!collName.coll().empty());
- return makeAndRunPlacementHistoryAggregation(
- opCtx, _getConfigShard(opCtx), clusterTime, collName);
+ if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
+ return getHistoricalPlacement(opCtx, clusterTime, collName);
+ }
+
+ return _fetchPlacementMetadata(opCtx, ConfigsvrGetHistoricalPlacement(collName, clusterTime));
}
@@ -1483,15 +1295,235 @@ std::vector<ShardId> ShardingCatalogClientImpl::getShardsThatOwnDataForDbAtClust
"A full db namespace must be specified",
dbName.coll().empty() && !dbName.db().empty());
- return makeAndRunPlacementHistoryAggregation(
- opCtx, _getConfigShard(opCtx), clusterTime, dbName);
+ if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
+ return getHistoricalPlacement(opCtx, clusterTime, dbName);
+ }
+
+ return _fetchPlacementMetadata(opCtx, ConfigsvrGetHistoricalPlacement(dbName, clusterTime));
}
std::vector<ShardId> ShardingCatalogClientImpl::getShardsThatOwnDataAtClusterTime(
OperationContext* opCtx, const Timestamp& clusterTime) {
- return makeAndRunPlacementHistoryAggregation(
- opCtx, _getConfigShard(opCtx), clusterTime, boost::none);
+ if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
+ return getHistoricalPlacement(opCtx, clusterTime, boost::none);
+ }
+
+ ConfigsvrGetHistoricalPlacement request(NamespaceString(), clusterTime);
+ request.setTargetWholeCluster(true);
+ return _fetchPlacementMetadata(opCtx, std::move(request));
+}
+
+std::vector<ShardId> ShardingCatalogClientImpl::getHistoricalPlacement(
+ OperationContext* opCtx,
+ const Timestamp& atClusterTime,
+ const boost::optional<NamespaceString>& nss) {
+
+ // TODO (SERVER-73029): Remove the invariant
+ invariant(serverGlobalParams.clusterRole == ClusterRole::ConfigServer);
+ auto configShard = _getConfigShard(opCtx);
+ /*
+ Stage 1. Select only the entry with timestamp <= clusterTime and filter out all nss that are
+ not the collection or the database
+ Stage 2. sort by timestamp
+ Stage 3. Extract the first document for each database and collection matching the received
+ namespace
+ Stage 4. Discard the entries with empty shards (i.e. the collection was dropped or
+ renamed)
+ Stage 5. Group all documents and concat shards (this will generate an array of arrays)
+ Stage 6. Flatten the array of arrays into a set (this will also remove duplicates)
+
+ Stage 7. Access to the list of shards currently active in the cluster
+ Stage 8. Count the number of shards obtained on stage 6 that also appear in the list of
+ active shards
+ Stage 9. Do not return the list of active shards (used only for the count)
+
+ regex=^db(\.collection)?$ // matches db or db.collection ( this is skipped in case the whole
+ cluster info is searched)
+ [
+ {
+ "$match": { "timestamp": { "$lte": clusterTime } , "nss" : { $regex: regex} }
+ },
+ {
+ "$sort": { "timestamp": -1 }
+ },
+ {
+ "$group": {
+ _id: "$nss",
+ shards: { $first: "$shards" }
+ }
+ },
+ { "$match": { shards: { $not: { $size: 0 } } } },
+ {
+ "$group": {
+ _id: "",
+ shards: { $push: "$shards" }
+ }
+ },
+ {
+ $project: {
+ "shards": {
+ $reduce: {
+ input: "$shards",
+ initialValue: [],
+ in: { "$setUnion": [ "$$this", "$$value"] }
+ }
+ }
+ }
+ },
+ {
+ $lookup:
+ {
+ from: "shards",
+ localField: "shards",
+ foreignField: "_id",
+ as: "activeShards"
+ }
+ },
+ { "$set" : { "numActiveShards" : { "$size" : "$activeShards" } } },
+ { "$project": { "activeShards" : 0 } }
+ ]
+ */
+
+ auto expCtx = make_intrusive<ExpressionContext>(
+ opCtx, nullptr /*collator*/, NamespaceString::kConfigsvrPlacementHistoryNamespace);
+ StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
+ resolvedNamespaces[NamespaceString::kConfigsvrShardsNamespace.coll()] = {
+ NamespaceString::kConfigsvrShardsNamespace, std::vector<BSONObj>() /* pipeline */};
+ resolvedNamespaces[NamespaceString::kConfigsvrPlacementHistoryNamespace.coll()] = {
+ NamespaceString::kConfigsvrPlacementHistoryNamespace,
+ std::vector<BSONObj>() /* pipeline */};
+ expCtx->setResolvedNamespaces(resolvedNamespaces);
+
+ // 1. Get all the history entries prior to the requested time concerning either the collection
+ // or the parent database.
+
+
+ auto matchStage = [&]() {
+ bool isClusterSearch = !nss.has_value();
+ if (isClusterSearch)
+ return DocumentSourceMatch::create(BSON("timestamp" << BSON("$lte" << atClusterTime)),
+ expCtx);
+
+ bool isCollectionSearch = !nss->db().empty() && !nss->coll().empty();
+ auto collMatchExpression = isCollectionSearch ? pcre_util::quoteMeta(nss->coll()) : ".*";
+ auto regexString =
+ "^" + pcre_util::quoteMeta(nss->db()) + "(\\." + collMatchExpression + ")?$";
+ return DocumentSourceMatch::create(BSON("timestamp" << BSON("$lte" << atClusterTime)
+ << "nss"
+ << BSON("$regex" << regexString)),
+ expCtx);
+ }();
+
+ // 2 & 3. Sort by timestamp and extract the first document for collection and database
+ auto sortStage = DocumentSourceSort::create(expCtx, BSON("timestamp" << -1));
+ auto groupStageBson = BSON("_id"
+ << "$nss"
+ << "shards"
+ << BSON("$first"
+ << "$shards"));
+ auto groupStage = DocumentSourceGroup::createFromBson(
+ Document{{"$group", std::move(groupStageBson)}}.toBson().firstElement(), expCtx);
+
+ // Stage 4. Discard the entries with empty shards (i.e. the collection was dropped or renamed)
+ auto noShardsFilter =
+ DocumentSourceMatch::create(BSON("shards" << BSON("$not" << BSON("$size" << 0))), expCtx);
+
+ // Stage 5. Group all documents and concat shards (this will generate an array of arrays)
+ auto groupStageBson2 = BSON("_id"
+ << ""
+ << "shards"
+ << BSON("$push"
+ << "$shards"));
+ auto groupStageConcat = DocumentSourceGroup::createFromBson(
+ Document{{"$group", std::move(groupStageBson2)}}.toBson().firstElement(), expCtx);
+
+ // Stage 6. Flatten the array of arrays into a set (this will also remove duplicates)
+ auto projectStageBson =
+ BSON("shards" << BSON("$reduce" << BSON("input"
+ << "$shards"
+ << "initialValue" << BSONArray() << "in"
+ << BSON("$setUnion" << BSON_ARRAY("$$this"
+ << "$$value")))));
+ auto projectStageFlatten = DocumentSourceProject::createFromBson(
+ Document{{"$project", std::move(projectStageBson)}}.toBson().firstElement(), expCtx);
+
+
+ // Stage 7. Lookup active shards with left outer join on config.shards
+ Document lookupStageDoc = {
+ {"from", NamespaceString::kConfigsvrShardsNamespace.coll().toString()},
+ {"localField", StringData("shards")},
+ {"foreignField", StringData("_id")},
+ {"as", StringData("activeShards")}};
+
+ auto lookupStage = DocumentSourceLookUp::createFromBson(
+ Document{{"$lookup", std::move(lookupStageDoc)}}.toBson().firstElement(), expCtx);
+
+ // Stage 8. Count number of active shards
+ auto setStageDoc = Document(
+ {{"$set", Document{{"numActiveShards", Document{{"$size", "$activeShards"_sd}}}}}});
+ auto setStage =
+ DocumentSourceAddFields::createFromBson(setStageDoc.toBson().firstElement(), expCtx);
+
+ // Stage 9. Disable activeShards field to avoid sending it to the client
+ auto projectStageDoc = Document{{"activeShards", 0}};
+ auto projectStage = DocumentSourceProject::create(
+ projectStageDoc.toBson(), expCtx, "getShardsThatOwnDataForNamespaceAtClusterTime");
+
+ // Create pipeline
+ Pipeline::SourceContainer stages;
+ stages.emplace_back(std::move(matchStage));
+ stages.emplace_back(std::move(sortStage));
+ stages.emplace_back(std::move(groupStage));
+ stages.emplace_back(std::move(noShardsFilter));
+ stages.emplace_back(std::move(groupStageConcat));
+ stages.emplace_back(std::move(projectStageFlatten));
+ stages.emplace_back(std::move(lookupStage));
+ stages.emplace_back(std::move(setStage));
+ stages.emplace_back(std::move(projectStage));
+
+ const auto pipeline = Pipeline::create(stages, expCtx);
+ auto aggRequest = AggregateCommandRequest(NamespaceString::kConfigsvrPlacementHistoryNamespace,
+ pipeline->serializeToBson());
+
+
+ // Run the aggregation
+ const auto readConcern = [&]() -> repl::ReadConcernArgs {
+ if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer &&
+ !gFeatureFlagConfigServerAlwaysShardRemote.isEnabledAndIgnoreFCV()) {
+ // When the feature flag is on, the config server may read from a secondary which may
+ // need to wait for replication, so we should use afterClusterTime.
+ return {repl::ReadConcernLevel::kMajorityReadConcern};
+ } else {
+ const auto time = VectorClock::get(opCtx)->getTime();
+ return {time.configTime(), repl::ReadConcernLevel::kMajorityReadConcern};
+ }
+ }();
+
+ auto aggrResult = runCatalogAggregation(
+ opCtx, configShard, aggRequest, readConcern, Shard::kDefaultConfigCommandTimeout);
+
+ // Parse the result
+ std::vector<ShardId> activeShards;
+ if (!aggrResult.empty()) {
+ invariant(aggrResult.size() == 1);
+
+ // Extract the result
+ const auto& doc = aggrResult.front();
+ auto numActiveShards = doc.getField("numActiveShards").Int();
+ // Use Obj() instead of Array() to avoid instantiating a temporary std::vector.
+ const auto& shards = doc.getField("shards").Obj();
+
+ uassert(ErrorCodes::SnapshotTooOld,
+ "Part of the history may no longer be retrieved because of one or more removed "
+ "shards.",
+ numActiveShards == static_cast<int>(shards.nFields()));
+
+ for (const auto& shard : shards) {
+ activeShards.push_back(shard.String());
+ }
+ }
+ return activeShards;
}
std::shared_ptr<Shard> ShardingCatalogClientImpl::_getConfigShard(OperationContext* opCtx) {
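
Note: the aggregation relocated into getHistoricalPlacement above is easier to follow with concrete data. A sketch of the intended semantics, using illustrative config.placementHistory entries that are not part of this commit:

    // Illustrative data only: each config.placementHistory entry records the shards owning
    // data for a namespace starting at "timestamp".
    const entries = [
        {nss: "test",     timestamp: Timestamp(10, 0), shards: ["shard0"]},
        {nss: "test.foo", timestamp: Timestamp(20, 0), shards: ["shard0", "shard1"]},
        {nss: "test.foo", timestamp: Timestamp(40, 0), shards: ["shard1"]},  // newer than the query time
        {nss: "test.bar", timestamp: Timestamp(25, 0), shards: []},          // dropped collection
    ];
    // For atClusterTime = Timestamp(30, 0) and nss = "test.foo", the pipeline built above:
    //  - Stage 1 keeps entries with timestamp <= 30 whose nss matches ^test(\.foo)?$,
    //    i.e. the "test" entry and the Timestamp(20, 0) "test.foo" entry;
    //  - Stages 2-3 keep the most recent surviving entry per namespace;
    //  - Stage 4 discards entries whose shard list is empty (drops/renames);
    //  - Stages 5-6 union the remaining shard lists into ["shard0", "shard1"];
    //  - Stages 7-9 check that every returned id still exists in config.shards, raising
    //    SnapshotTooOld if part of the history lives on a since-removed shard.
    // Expected result: ["shard0", "shard1"].
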
diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.h b/src/mongo/s/catalog/sharding_catalog_client_impl.h
index 49006332c4c..6bd6f16bcc3 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_impl.h
+++ b/src/mongo/s/catalog/sharding_catalog_client_impl.h
@@ -34,6 +34,7 @@
#include "mongo/platform/mutex.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/request_types/get_historical_placement_info_gen.h"
namespace mongo {
@@ -204,6 +205,11 @@ public:
std::vector<ShardId> getShardsThatOwnDataAtClusterTime(OperationContext* opCtx,
const Timestamp& clusterTime) override;
+ std::vector<ShardId> getHistoricalPlacement(
+ OperationContext* opCtx,
+ const Timestamp& atClusterTime,
+ const boost::optional<NamespaceString>& nss) override;
+
private:
/**
* Updates a single document (if useMultiUpdate is false) or multiple documents (if
@@ -247,6 +253,15 @@ private:
repl::ReadConcernLevel readConcernLevel);
/**
+ * Queries the config server to retrieve placement data based on the Request object.
+ * TODO (SERVER-73029): Remove the method - and replace its invocations with
+ * runPlacementHistoryAggregation()
+ */
+ std::vector<ShardId> _fetchPlacementMetadata(OperationContext* opCtx,
+ ConfigsvrGetHistoricalPlacement&& request);
+
+
+ /**
* Returns the Shard type that should be used to access the config server. Unless an instance
* was provided at construction, which may be done e.g. to force using local operations, falls
* back to using the config shard from the ShardRegistry.
diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
index 47c96b6c81c..60feb3833a3 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
@@ -223,4 +223,11 @@ std::vector<ShardId> ShardingCatalogClientMock::getShardsThatOwnDataAtClusterTim
OperationContext* opCtx, const Timestamp& clusterTime) {
uasserted(ErrorCodes::InternalError, "Method not implemented");
}
+
+std::vector<ShardId> ShardingCatalogClientMock::getHistoricalPlacement(
+ OperationContext* opCtx,
+ const Timestamp& atClusterTime,
+ const boost::optional<NamespaceString>& nss) {
+ uasserted(ErrorCodes::InternalError, "Method not implemented");
+}
} // namespace mongo
diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.h b/src/mongo/s/catalog/sharding_catalog_client_mock.h
index 888d00da2c5..30cf2ed0826 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_mock.h
+++ b/src/mongo/s/catalog/sharding_catalog_client_mock.h
@@ -162,6 +162,12 @@ public:
std::vector<ShardId> getShardsThatOwnDataAtClusterTime(OperationContext* opCtx,
const Timestamp& clusterTime) override;
+ std::vector<ShardId> getHistoricalPlacement(
+ OperationContext* opCtx,
+ const Timestamp& atClusterTime,
+ const boost::optional<NamespaceString>& nss) override;
+
+
private:
StatusWith<repl::OpTimeWith<std::vector<BSONObj>>> _exhaustiveFindOnConfig(
OperationContext* opCtx,
diff --git a/src/mongo/s/request_types/get_historical_placement_info.idl b/src/mongo/s/request_types/get_historical_placement_info.idl
new file mode 100644
index 00000000000..aca7f49a4cf
--- /dev/null
+++ b/src/mongo/s/request_types/get_historical_placement_info.idl
@@ -0,0 +1,77 @@
+# Copyright (C) 2023-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+
+# TODO (SERVER-73029): remove this file and the class implementing _configsvrGetHistoricalPlacement
+
+global:
+ cpp_namespace: "mongo"
+
+imports:
+ - "mongo/db/basic_types.idl"
+ - "mongo/s/sharding_types.idl"
+
+structs:
+
+ ConfigsvrGetHistoricalPlacementResponse:
+ description: "Response for the _configsvrGetHistoricalPlacement command"
+ strict: false
+ is_command_reply: true
+ fields:
+ shards:
+ type: array<shard_id>
+ description: "The set of shard IDs containing data on the requested nss/cluster at
+ the point in time"
+ isExact:
+ type: bool
+ description: "When true, the returned list of shards is an accurate recording of
+ the placement info at the requested point in time.
+ When false, the result value represents an approximation based on
+ a present/past reading of config.shards"
+ default: true
+
+
+commands:
+
+ _configsvrGetHistoricalPlacement:
+ command_name: _configsvrGetHistoricalPlacement
+ cpp_name: ConfigsvrGetHistoricalPlacement
+ description: "Internal command to retrieve the list of shard IDs hosting data for the
+ namespace/cluster being targeted at a specific point in time"
+ namespace: type
+ api_version: ""
+ type: namespacestring
+ strict: false
+ fields:
+ at:
+ type: timestamp
+ description: "The requested point in time"
+ optional: false
+ targetWholeCluster:
+ type: bool
+ description: "When true, the command retrieves placement information concerning
+ the whole cluster (ignoring the namespace parameter)"
+ default: false
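
Note: for reference, the two request shapes this IDL admits — namespace-targeted and whole-cluster — would look like the following on the wire; field values are illustrative:

    // Namespace-targeted request (placement of "testDB.testColl" at the given time):
    {_configsvrGetHistoricalPlacement: "testDB.testColl", at: Timestamp(30, 0)}
    // Whole-cluster request: the catalog client passes an empty namespace, which the command
    // ignores when targetWholeCluster is true.
    {_configsvrGetHistoricalPlacement: "", at: Timestamp(30, 0), targetWholeCluster: true}
    // Response shape: {shards: ["shard0", "shard1"], isExact: true, ok: 1}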