summaryrefslogtreecommitdiff
path: root/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
diff options
context:
space:
mode:
authorAllison Easton <allison.easton@mongodb.com>2022-09-07 14:08:48 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-07 15:24:10 +0000
commit72ac86380b3d9e50a2011ae48f2e169eb2b148a1 (patch)
treede9e22826282bd4bf6adb5918d8c878b805ba695 /src/mongo/s/catalog/sharding_catalog_client_impl.cpp
parent16b22af0483333fba942c835cdc4ab5ae3dfcf23 (diff)
downloadmongo-72ac86380b3d9e50a2011ae48f2e169eb2b148a1.tar.gz
SERVER-69240 Add new aggregation to the CatalogClient to fetch the global indexes for a refresh
Diffstat (limited to 'src/mongo/s/catalog/sharding_catalog_client_impl.cpp')
-rw-r--r--src/mongo/s/catalog/sharding_catalog_client_impl.cpp226
1 files changed, 184 insertions, 42 deletions
diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
index 8727da6767a..c0e23d17269 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
@@ -294,6 +294,156 @@ AggregateCommandRequest makeCollectionAndChunksAggregation(OperationContext* opC
return AggregateCommandRequest(CollectionType::ConfigNS, std::move(serializedPipeline));
}
+// Builds the aggregation request, targeted at config.collections, that returns the
+// config.collections entry for 'nss' together with the config.csrs.indexes entries whose
+// collectionUUID equals that entry's uuid.
+//
+// The serialized pipeline has the following shape:
+//
+// [
+//     { $match: { _id: <nss> } },
+//     {
+//         $unionWith: {
+//             coll: "collections",
+//             pipeline: [
+//                 { $match: { _id: <nss> } },
+//                 {
+//                     $lookup: {
+//                         from: "csrs.indexes",
+//                         as: "indexes",
+//                         let: { local_uuid: "$uuid" },
+//                         pipeline: [
+//                             { $match: { $expr: { $eq: ["$collectionUUID", "$$local_uuid"] } } }
+//                         ]
+//                     }
+//                 },
+//                 { $unwind: { path: "$indexes" } },
+//                 { $project: { _id: false, indexes: true } }
+//             ]
+//         }
+//     }
+// ]
+//
+// Consumers must not rely on the relative position of the config.collections document in the
+// response: it may be interleaved with the {indexes: ...} documents produced by the $unionWith.
+AggregateCommandRequest makeCollectionAndIndexesAggregation(OperationContext* opCtx,
+                                                            const NamespaceString& nss) {
+    using Doc = Document;
+    using Arr = std::vector<Value>;
+
+    auto expCtx = make_intrusive<ExpressionContext>(opCtx, nullptr, CollectionType::ConfigNS);
+    {
+        // Every namespace referenced by the pipeline is registered as resolving to itself with an
+        // empty pipeline.
+        StringMap<ExpressionContext::ResolvedNamespace> nsMap;
+        const auto addSelfResolved = [&nsMap](const NamespaceString& ns) {
+            nsMap[ns.coll()] = {ns, std::vector<BSONObj>()};
+        };
+        addSelfResolved(CollectionType::ConfigNS);
+        addSelfResolved(NamespaceString::kConfigsvrIndexCatalogNamespace);
+        expCtx->setResolvedNamespaces(nsMap);
+    }
+
+    const std::string nssString = nss.toString();
+
+    Pipeline::SourceContainer stages;
+
+    // Stage 1: select the config.collections entry with {_id: nss}. Yields at most one document.
+    stages.emplace_back(DocumentSourceMatch::create(
+        Doc{{CollectionType::kNssFieldName, nssString}}.toBson(), expCtx));
+
+    // Stage 2: $unionWith a sub-pipeline that re-selects the same config.collections entry, joins
+    // it with config.csrs.indexes on the collection uuid and emits one {indexes: <doc>} document
+    // per matching index entry. The $lookup is immediately followed by $unwind so that the
+    // $lookup + $unwind coalescence optimization applies and no large intermediate document is
+    // materialized.
+    constexpr auto kIndexesField = "indexes"_sd;
+
+    const Arr uuidEqOperands{Value{"$" + IndexCatalogType::kCollectionUUIDFieldName},
+                             Value{"$$local_uuid"_sd}};
+
+    const Doc joinOnUuid{{"$match", Doc{{"$expr", Doc{{"$eq", uuidEqOperands}}}}}};
+
+    const Doc lookupSpec{{"from", NamespaceString::kConfigsvrIndexCatalogNamespace.coll()},
+                         {"as", kIndexesField},
+                         {"let", Doc{{"local_uuid", "$" + CollectionType::kUuidFieldName}}},
+                         {"pipeline", Arr{Value{joinOnUuid}}}};
+
+    const Arr subPipeline{
+        Value{Doc{{"$match", Doc{{CollectionType::kNssFieldName, nssString}}}}},
+        Value{Doc{{"$lookup", lookupSpec}}},
+        Value{Doc{{"$unwind", Doc{{"path", "$" + kIndexesField}}}}},
+        Value{Doc{{"$project", Doc{{"_id", false}, {kIndexesField, true}}}}}};
+
+    const Doc unionWithSpec{{"coll", CollectionType::ConfigNS.coll()},
+                            {"pipeline", subPipeline}};
+
+    // Keep the owning BSONObj alive in a named local while its first element is being parsed.
+    const BSONObj unionWithStage = Doc{{"$unionWith", unionWithSpec}}.toBson();
+    stages.emplace_back(
+        DocumentSourceUnionWith::createFromBson(unionWithStage.firstElement(), expCtx));
+
+    auto pipeline = Pipeline::create(std::move(stages), expCtx);
+    return AggregateCommandRequest(CollectionType::ConfigNS, pipeline->serializeToBson());
+}
+
+// Runs 'aggRequest' against the config shard and returns every document of the response.
+//
+// Before dispatching, the request is decorated in place with:
+//   - the given 'readConcern' and a default-constructed write concern;
+//   - a read preference: default-constructed when this node is a config server, otherwise
+//     kConfigReadSelector with minClusterTime set to the vector clock's current configTime;
+//   - when this node is not a config server, a maxTimeMS equal to the smaller of the operation's
+//     remaining time and 'maxTimeout'.
+//
+// The aggregation is attempted up to kMaxWriteRetry times. An attempt that fails with an error
+// that the config shard classifies as retriable under Shard::RetryPolicy::kIdempotent discards
+// the partially accumulated results and is retried, unless it was the last attempt. Any other
+// failure (or a failure on the last attempt) is raised through uassertStatusOK.
+std::vector<BSONObj> runCatalogAggregation(OperationContext* opCtx,
+                                           AggregateCommandRequest& aggRequest,
+                                           const repl::ReadConcernArgs& readConcern,
+                                           const Milliseconds& maxTimeout) {
+    const bool isConfigServer = serverGlobalParams.clusterRole == ClusterRole::ConfigServer;
+
+    aggRequest.setReadConcern(readConcern.toBSONInner());
+    aggRequest.setWriteConcern(WriteConcernOptions());
+
+    const auto readPref = [&]() -> ReadPreferenceSetting {
+        if (isConfigServer) {
+            return {};
+        }
+
+        const auto vcTime = VectorClock::get(opCtx)->getTime();
+        ReadPreferenceSetting configReadPref{kConfigReadSelector};
+        configReadPref.minClusterTime = vcTime.configTime().asTimestamp();
+        return configReadPref;
+    }();
+
+    aggRequest.setUnwrappedReadPref(readPref.toContainingBSON());
+
+    if (!isConfigServer) {
+        const Milliseconds maxTimeMS = std::min(opCtx->getRemainingMaxTimeMillis(), maxTimeout);
+        aggRequest.setMaxTimeMS(durationCount<Milliseconds>(maxTimeMS));
+    }
+
+    // Accumulate every batch of the response. 'batch' is a const reference, so its elements can
+    // only be copied; plain iterators state that honestly (a move_iterator over const elements
+    // falls back to the copy constructor anyway).
+    std::vector<BSONObj> aggResult;
+    auto callback = [&aggResult](const std::vector<BSONObj>& batch,
+                                 const boost::optional<BSONObj>& /*postBatchResumeToken*/) {
+        aggResult.insert(aggResult.end(), batch.begin(), batch.end());
+        return true;
+    };
+
+    const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+    for (int attempt = 1; attempt <= kMaxWriteRetry; ++attempt) {
+        const Status status = configShard->runAggregation(opCtx, aggRequest, callback);
+
+        const bool shouldRetry = attempt < kMaxWriteRetry &&
+            configShard->isRetriableError(status.code(), Shard::RetryPolicy::kIdempotent);
+        if (!shouldRetry) {
+            uassertStatusOK(status);
+            break;
+        }
+
+        // Drop whatever the failed attempt managed to deliver before trying again.
+        aggResult.clear();
+    }
+
+    return aggResult;
+}
+
} // namespace
ShardingCatalogClientImpl::ShardingCatalogClientImpl() = default;
@@ -624,49 +774,9 @@ std::pair<CollectionType, std::vector<ChunkType>> ShardingCatalogClientImpl::get
const ChunkVersion& sinceVersion,
const repl::ReadConcernArgs& readConcern) {
auto aggRequest = makeCollectionAndChunksAggregation(opCtx, nss, sinceVersion);
- aggRequest.setReadConcern(readConcern.toBSONInner());
- aggRequest.setWriteConcern(WriteConcernOptions());
- const auto readPref = [&]() -> ReadPreferenceSetting {
- if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
- return {};
- }
-
- const auto vcTime = VectorClock::get(opCtx)->getTime();
- ReadPreferenceSetting readPref{kConfigReadSelector};
- readPref.minClusterTime = vcTime.configTime().asTimestamp();
- return readPref;
- }();
-
- aggRequest.setUnwrappedReadPref(readPref.toContainingBSON());
-
- if (serverGlobalParams.clusterRole != ClusterRole::ConfigServer) {
- const Milliseconds maxTimeMS = std::min(opCtx->getRemainingMaxTimeMillis(),
- Milliseconds(gFindChunksOnConfigTimeoutMS.load()));
- aggRequest.setMaxTimeMS(durationCount<Milliseconds>(maxTimeMS));
- }
-
- // Run the aggregation
- std::vector<BSONObj> aggResult;
- auto callback = [&aggResult](const std::vector<BSONObj>& batch,
- const boost::optional<BSONObj>& postBatchResumeToken) {
- aggResult.insert(aggResult.end(),
- std::make_move_iterator(batch.begin()),
- std::make_move_iterator(batch.end()));
- return true;
- };
-
- const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
- for (int retry = 1; retry <= kMaxWriteRetry; retry++) {
- const Status status = configShard->runAggregation(opCtx, aggRequest, callback);
- if (retry < kMaxWriteRetry &&
- configShard->isRetriableError(status.code(), Shard::RetryPolicy::kIdempotent)) {
- aggResult.clear();
- continue;
- }
- uassertStatusOK(status);
- break;
- }
+ std::vector<BSONObj> aggResult = runCatalogAggregation(
+ opCtx, aggRequest, readConcern, Milliseconds(gFindChunksOnConfigTimeoutMS.load()));
uassert(ErrorCodes::NamespaceNotFound,
str::stream() << "Collection " << nss.ns() << " not found",
@@ -716,6 +826,38 @@ std::pair<CollectionType, std::vector<ChunkType>> ShardingCatalogClientImpl::get
return {std::move(*coll), std::move(chunks)};
};
+// Fetches, through a single aggregation on the config shard, the config.collections entry for
+// 'nss' and the global index catalog entries returned alongside it.
+//
+// Raises NamespaceNotFound when the aggregation returns no documents, and error 6924000 when the
+// response contains no document lacking the "indexes" field (i.e. no collection entry).
+std::pair<CollectionType, std::vector<IndexCatalogType>>
+ShardingCatalogClientImpl::getCollectionAndGlobalIndexes(OperationContext* opCtx,
+                                                         const NamespaceString& nss,
+                                                         const repl::ReadConcernArgs& readConcern) {
+    auto request = makeCollectionAndIndexesAggregation(opCtx, nss);
+    const std::vector<BSONObj> docs =
+        runCatalogAggregation(opCtx, request, readConcern, Shard::kDefaultConfigCommandTimeout);
+
+    uassert(ErrorCodes::NamespaceNotFound,
+            str::stream() << "Collection " << nss.ns() << " not found",
+            !docs.empty());
+
+    boost::optional<CollectionType> collection;
+    std::vector<IndexCatalogType> globalIndexes;
+    // 'docs' is non-empty here; all but one of its documents are expected to be index entries.
+    globalIndexes.reserve(docs.size() - 1);
+
+    // The collection document may appear at any position, so each document is classified by
+    // whether it carries the "indexes" field.
+    for (const BSONObj& doc : docs) {
+        const BSONElement indexField = doc.getField("indexes");
+        if (indexField.eoo()) {
+            collection.emplace(doc);
+            continue;
+        }
+        globalIndexes.emplace_back(
+            IndexCatalogType::parse(IDLParserContext("IndexCatalogType"), indexField.Obj()));
+    }
+
+    uassert(6924000, "'collections' document not found in aggregation response", collection);
+
+    return {std::move(*collection), std::move(globalIndexes)};
+}
+
StatusWith<std::vector<TagsType>> ShardingCatalogClientImpl::getTagsForCollection(
OperationContext* opCtx, const NamespaceString& nss) {
auto findStatus = _exhaustiveFindOnConfig(opCtx,