diff options
author | Allison Easton <allison.easton@mongodb.com> | 2022-09-07 14:08:48 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-09-07 15:24:10 +0000 |
commit | 72ac86380b3d9e50a2011ae48f2e169eb2b148a1 (patch) | |
tree | de9e22826282bd4bf6adb5918d8c878b805ba695 | |
parent | 16b22af0483333fba942c835cdc4ab5ae3dfcf23 (diff) | |
download | mongo-72ac86380b3d9e50a2011ae48f2e169eb2b148a1.tar.gz |
SERVER-69240 Add new aggregation to the CatalogClient to fetch the global indexes for a refresh
-rw-r--r-- | src/mongo/db/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_catalog_client_aggregations_test.cpp | 138 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_client.h | 11 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_client_impl.cpp | 226 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_client_impl.h | 5 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_client_mock.cpp | 7 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_client_mock.h | 5 |
7 files changed, 351 insertions, 42 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index e53f0fe51bc..22f3b6d50c9 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -739,6 +739,7 @@ env.CppUnitTest( 'resharding/resharding_coordinator_test.cpp', 'resharding/resharding_coordinator_service_test.cpp', 'resharding/resharding_util_test.cpp', + 'sharding_catalog_client_aggregations_test.cpp', 'sharding_ddl_util_test.cpp', 'sharding_util_refresh_test.cpp', 'topology_time_ticker_test.cpp', diff --git a/src/mongo/db/s/sharding_catalog_client_aggregations_test.cpp b/src/mongo/db/s/sharding_catalog_client_aggregations_test.cpp new file mode 100644 index 00000000000..6c70544b6b2 --- /dev/null +++ b/src/mongo/db/s/sharding_catalog_client_aggregations_test.cpp @@ -0,0 +1,138 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/s/config/config_server_test_fixture.h" +#include "mongo/s/catalog/sharding_catalog_client.h" +#include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/catalog/type_shard.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest + +namespace mongo { +namespace { + +// These tests are for the aggregations in the CatalogClient. They are here because the unittests in +// sharding_catalog_client_test.cpp are part of the s_test which does not have storage. + +using CatalogClientAggregationsTest = ConfigServerTestFixture; + +TEST_F(CatalogClientAggregationsTest, TestCollectionAndIndexesAggregationWithNoIndexes) { + const NamespaceString nss{"TestDB.TestColl"}; + const ChunkVersion placementVersion{{OID::gen(), Timestamp(1, 0)}, {1, 0}}; + const std::string shardName = "shard01"; + const UUID uuid{UUID::gen()}; + const KeyPattern shardKey{BSON("_id" << 1)}; + setupShards({ShardType(shardName, "host01")}); + setupCollection(nss, + shardKey, + {ChunkType(uuid, + ChunkRange(BSONObjBuilder().appendMinKey("_id").obj(), + BSONObjBuilder().appendMaxKey("_id").obj()), + placementVersion, + ShardId("shard01"))}); + auto [collection, indexes] = catalogClient()->getCollectionAndGlobalIndexes( + operationContext(), nss, {repl::ReadConcernLevel::kSnapshotReadConcern}); + + ASSERT_EQ(indexes.size(), 0); + ASSERT_EQ(collection.getEpoch(), placementVersion.epoch()); + ASSERT_EQ(collection.getTimestamp(), placementVersion.getTimestamp()); + ASSERT_EQ(collection.getUuid(), uuid); +} + +TEST_F(CatalogClientAggregationsTest, TestCollectionAndIndexesWithIndexes) { + const NamespaceString 
nss{"TestDB.TestColl"}; + const ChunkVersion placementVersion{{OID::gen(), Timestamp(1, 0)}, {1, 0}}; + const std::string shardName = "shard01"; + const UUID uuid{UUID::gen()}; + const KeyPattern shardKey{BSON("_id" << 1)}; + setupShards({ShardType(shardName, "host01")}); + setupCollection(nss, + shardKey, + {ChunkType(uuid, + ChunkRange(BSONObjBuilder().appendMinKey("_id").obj(), + BSONObjBuilder().appendMaxKey("_id").obj()), + placementVersion, + ShardId("shard01"))}); + IndexCatalogType index1{"x_1", shardKey.toBSON(), {}, Timestamp(3, 0), uuid}; + uassertStatusOK(insertToConfigCollection( + operationContext(), NamespaceString::kConfigsvrIndexCatalogNamespace, index1.toBSON())); + IndexCatalogType index2{"y_1", shardKey.toBSON(), {}, Timestamp(4, 0), uuid}; + uassertStatusOK(insertToConfigCollection( + operationContext(), NamespaceString::kConfigsvrIndexCatalogNamespace, index2.toBSON())); + + auto [collection, indexes] = catalogClient()->getCollectionAndGlobalIndexes( + operationContext(), nss, {repl::ReadConcernLevel::kSnapshotReadConcern}); + + ASSERT_EQ(indexes.size(), 2); + ASSERT_EQ(collection.getEpoch(), placementVersion.epoch()); + ASSERT_EQ(collection.getTimestamp(), placementVersion.getTimestamp()); + ASSERT_EQ(collection.getUuid(), uuid); +} + +TEST_F(CatalogClientAggregationsTest, TestCollectionAndIndexesWithMultipleCollections) { + const NamespaceString nssColl1{"TestDB.Collection1"}; + const NamespaceString nssColl2{"TestDB.Collection2"}; + const ChunkVersion placementVersion{{OID::gen(), Timestamp(1, 0)}, {1, 0}}; + const std::string shardName = "shard01"; + const UUID uuidColl1{UUID::gen()}; + const UUID uuidColl2{UUID::gen()}; + const KeyPattern shardKey{BSON("_id" << 1)}; + setupShards({ShardType(shardName, "host01")}); + setupCollection(nssColl1, + shardKey, + {ChunkType(uuidColl1, + ChunkRange(BSONObjBuilder().appendMinKey("_id").obj(), + BSONObjBuilder().appendMaxKey("_id").obj()), + placementVersion, + ShardId("shard01"))}); + 
setupCollection(nssColl2, + shardKey, + {ChunkType(uuidColl2, + ChunkRange(BSONObjBuilder().appendMinKey("_id").obj(), + BSONObjBuilder().appendMaxKey("_id").obj()), + placementVersion, + ShardId("shard01"))}); + IndexCatalogType index1{"x_1", shardKey.toBSON(), {}, Timestamp(3, 0), uuidColl1}; + uassertStatusOK(insertToConfigCollection( + operationContext(), NamespaceString::kConfigsvrIndexCatalogNamespace, index1.toBSON())); + IndexCatalogType index2{"y_1", shardKey.toBSON(), {}, Timestamp(4, 0), uuidColl2}; + uassertStatusOK(insertToConfigCollection( + operationContext(), NamespaceString::kConfigsvrIndexCatalogNamespace, index2.toBSON())); + + auto [collection, indexes] = catalogClient()->getCollectionAndGlobalIndexes( + operationContext(), nssColl1, {repl::ReadConcernLevel::kSnapshotReadConcern}); + + ASSERT_EQ(indexes.size(), 1); + ASSERT_EQ(collection.getEpoch(), placementVersion.epoch()); + ASSERT_EQ(collection.getTimestamp(), placementVersion.getTimestamp()); + ASSERT_EQ(collection.getUuid(), uuidColl1); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/s/catalog/sharding_catalog_client.h b/src/mongo/s/catalog/sharding_catalog_client.h index 31c7f695c40..b969d2775bb 100644 --- a/src/mongo/s/catalog/sharding_catalog_client.h +++ b/src/mongo/s/catalog/sharding_catalog_client.h @@ -38,8 +38,10 @@ #include "mongo/db/repl/optime_with.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/write_concern_options.h" +#include "mongo/s/catalog/type_index_catalog_gen.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/chunk_version.h" +#include "mongo/s/index_version.h" namespace mongo { @@ -193,6 +195,15 @@ public: const repl::ReadConcernArgs& readConcern) = 0; /** + * Retrieves the collection metadata and its global index metadata. This function will return + * all of the global indexes for a collection. 
+ */ + virtual std::pair<CollectionType, std::vector<IndexCatalogType>> getCollectionAndGlobalIndexes( + OperationContext* opCtx, + const NamespaceString& nss, + const repl::ReadConcernArgs& readConcern) = 0; + + /** * Retrieves all zones defined for the specified collection. The returned vector is sorted based * on the min key of the zones. * diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp index 8727da6767a..c0e23d17269 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp +++ b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp @@ -294,6 +294,156 @@ AggregateCommandRequest makeCollectionAndChunksAggregation(OperationContext* opC return AggregateCommandRequest(CollectionType::ConfigNS, std::move(serializedPipeline)); } +AggregateCommandRequest makeCollectionAndIndexesAggregation(OperationContext* opCtx, + const NamespaceString& nss) { + auto expCtx = make_intrusive<ExpressionContext>(opCtx, nullptr, CollectionType::ConfigNS); + StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; + resolvedNamespaces[CollectionType::ConfigNS.coll()] = {CollectionType::ConfigNS, + std::vector<BSONObj>()}; + resolvedNamespaces[NamespaceString::kConfigsvrIndexCatalogNamespace.coll()] = { + NamespaceString::kConfigsvrIndexCatalogNamespace, std::vector<BSONObj>()}; + expCtx->setResolvedNamespaces(resolvedNamespaces); + + using Doc = Document; + using Arr = std::vector<Value>; + + Pipeline::SourceContainer stages; + + // 1. Match config.collections entries with {_id: nss}. This stage will produce, at most, one + // config.collections document. + // { + // $match: { + // _id: <nss> + // } + // } + stages.emplace_back(DocumentSourceMatch::create( + Doc{{CollectionType::kNssFieldName, nss.toString()}}.toBson(), expCtx)); + + // 2. Retrieve config.csrs.indexes entries with the same uuid as the one from the + // config.collections document. 
+ // + // The $lookup stages get the config.csrs.indexes. The $lookup is + // immediately followed by $unwind to take advantage of the $lookup + $unwind coalescence + // optimization which avoids creating large intermediate documents. + // + // This $unionWith stage will produce one result document for each config.csrs.indexes document. + // + // Note that we must not make any assumption on where the document produced by stage 1 will be + // placed in the response in relation with the documents produced by stage 2. The + // config.collections document produced in stage 1 could be interleaved between the + // config.csrs.indexes documents produced by stage 2. + // + // { + // $unionWith: { + // coll: "collections", + // pipeline: [ + // { $match: { _id: <nss> } }, + // { + // $lookup: { + // from: "csrs.indexes", + // as: "indexes", + // let: { local_uuid: "$uuid" }, + // pipeline: [ + // { + // $match: { + // $expr: { + // $eq: ["$collectionUUID", "$$local_uuid"], + // }, + // } + // }, + // ] + // } + // }, + // { + // $unwind: { + // path: "$indexes" + // } + // }, + // { + // $project: { _id: false, indexes: true } + // }, + // ] + // } + // } + const auto letExpr = Doc{{"local_uuid", "$" + CollectionType::kUuidFieldName}}; + + const auto uuidExpr = + Arr{Value{"$" + IndexCatalogType::kCollectionUUIDFieldName}, Value{"$$local_uuid"_sd}}; + + constexpr auto indexesLookupOutputFieldName = "indexes"_sd; + + const Doc lookupPipeline{ + {"from", NamespaceString::kConfigsvrIndexCatalogNamespace.coll()}, + {"as", indexesLookupOutputFieldName}, + {"let", letExpr}, + {"pipeline", Arr{Value{Doc{{"$match", Doc{{"$expr", Doc{{"$eq", uuidExpr}}}}}}}}}}; + + Doc unionWithPipeline{ + {"coll", CollectionType::ConfigNS.coll()}, + {"pipeline", + Arr{Value{Doc{{"$match", Doc{{CollectionType::kNssFieldName, nss.toString()}}}}}, + Value{Doc{{"$lookup", lookupPipeline}}}, + Value{Doc{{"$unwind", Doc{{"path", "$" + indexesLookupOutputFieldName}}}}}, + Value{Doc{{"$project", 
Doc{{"_id", false}, {indexesLookupOutputFieldName, true}}}}}}}}; + + stages.emplace_back(DocumentSourceUnionWith::createFromBson( + Doc{{"$unionWith", unionWithPipeline}}.toBson().firstElement(), expCtx)); + + auto pipeline = Pipeline::create(std::move(stages), expCtx); + auto serializedPipeline = pipeline->serializeToBson(); + return AggregateCommandRequest(CollectionType::ConfigNS, std::move(serializedPipeline)); +} + +std::vector<BSONObj> runCatalogAggregation(OperationContext* opCtx, + AggregateCommandRequest& aggRequest, + const repl::ReadConcernArgs& readConcern, + const Milliseconds& maxTimeout) { + aggRequest.setReadConcern(readConcern.toBSONInner()); + aggRequest.setWriteConcern(WriteConcernOptions()); + + const auto readPref = [&]() -> ReadPreferenceSetting { + if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { + return {}; + } + + const auto vcTime = VectorClock::get(opCtx)->getTime(); + ReadPreferenceSetting readPref{kConfigReadSelector}; + readPref.minClusterTime = vcTime.configTime().asTimestamp(); + return readPref; + }(); + + aggRequest.setUnwrappedReadPref(readPref.toContainingBSON()); + + if (serverGlobalParams.clusterRole != ClusterRole::ConfigServer) { + const Milliseconds maxTimeMS = std::min(opCtx->getRemainingMaxTimeMillis(), maxTimeout); + aggRequest.setMaxTimeMS(durationCount<Milliseconds>(maxTimeMS)); + } + + // Run the aggregation + std::vector<BSONObj> aggResult; + auto callback = [&aggResult](const std::vector<BSONObj>& batch, + const boost::optional<BSONObj>& postBatchResumeToken) { + aggResult.insert(aggResult.end(), + std::make_move_iterator(batch.begin()), + std::make_move_iterator(batch.end())); + return true; + }; + + const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); + for (int retry = 1; retry <= kMaxWriteRetry; retry++) { + const Status status = configShard->runAggregation(opCtx, aggRequest, callback); + if (retry < kMaxWriteRetry && + configShard->isRetriableError(status.code(), 
Shard::RetryPolicy::kIdempotent)) { + aggResult.clear(); + continue; + } + uassertStatusOK(status); + break; + } + + return aggResult; +} + } // namespace ShardingCatalogClientImpl::ShardingCatalogClientImpl() = default; @@ -624,49 +774,9 @@ std::pair<CollectionType, std::vector<ChunkType>> ShardingCatalogClientImpl::get const ChunkVersion& sinceVersion, const repl::ReadConcernArgs& readConcern) { auto aggRequest = makeCollectionAndChunksAggregation(opCtx, nss, sinceVersion); - aggRequest.setReadConcern(readConcern.toBSONInner()); - aggRequest.setWriteConcern(WriteConcernOptions()); - const auto readPref = [&]() -> ReadPreferenceSetting { - if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { - return {}; - } - - const auto vcTime = VectorClock::get(opCtx)->getTime(); - ReadPreferenceSetting readPref{kConfigReadSelector}; - readPref.minClusterTime = vcTime.configTime().asTimestamp(); - return readPref; - }(); - - aggRequest.setUnwrappedReadPref(readPref.toContainingBSON()); - - if (serverGlobalParams.clusterRole != ClusterRole::ConfigServer) { - const Milliseconds maxTimeMS = std::min(opCtx->getRemainingMaxTimeMillis(), - Milliseconds(gFindChunksOnConfigTimeoutMS.load())); - aggRequest.setMaxTimeMS(durationCount<Milliseconds>(maxTimeMS)); - } - - // Run the aggregation - std::vector<BSONObj> aggResult; - auto callback = [&aggResult](const std::vector<BSONObj>& batch, - const boost::optional<BSONObj>& postBatchResumeToken) { - aggResult.insert(aggResult.end(), - std::make_move_iterator(batch.begin()), - std::make_move_iterator(batch.end())); - return true; - }; - - const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); - for (int retry = 1; retry <= kMaxWriteRetry; retry++) { - const Status status = configShard->runAggregation(opCtx, aggRequest, callback); - if (retry < kMaxWriteRetry && - configShard->isRetriableError(status.code(), Shard::RetryPolicy::kIdempotent)) { - aggResult.clear(); - continue; - } - 
uassertStatusOK(status); - break; - } + std::vector<BSONObj> aggResult = runCatalogAggregation( + opCtx, aggRequest, readConcern, Milliseconds(gFindChunksOnConfigTimeoutMS.load())); uassert(ErrorCodes::NamespaceNotFound, str::stream() << "Collection " << nss.ns() << " not found", @@ -716,6 +826,38 @@ std::pair<CollectionType, std::vector<ChunkType>> ShardingCatalogClientImpl::get return {std::move(*coll), std::move(chunks)}; }; +std::pair<CollectionType, std::vector<IndexCatalogType>> +ShardingCatalogClientImpl::getCollectionAndGlobalIndexes(OperationContext* opCtx, + const NamespaceString& nss, + const repl::ReadConcernArgs& readConcern) { + auto aggRequest = makeCollectionAndIndexesAggregation(opCtx, nss); + + std::vector<BSONObj> aggResult = + runCatalogAggregation(opCtx, aggRequest, readConcern, Shard::kDefaultConfigCommandTimeout); + + uassert(ErrorCodes::NamespaceNotFound, + str::stream() << "Collection " << nss.ns() << " not found", + !aggResult.empty()); + + boost::optional<CollectionType> coll; + std::vector<IndexCatalogType> indexes; + indexes.reserve(aggResult.size() - 1); + { + for (const auto& elem : aggResult) { + const auto indexElem = elem.getField("indexes"); + if (!indexElem) { + coll.emplace(elem); + } else { + indexes.emplace_back( + IndexCatalogType::parse(IDLParserContext("IndexCatalogType"), indexElem.Obj())); + } + } + uassert(6924000, "'collections' document not found in aggregation response", coll); + } + + return {std::move(*coll), std::move(indexes)}; +} + StatusWith<std::vector<TagsType>> ShardingCatalogClientImpl::getTagsForCollection( OperationContext* opCtx, const NamespaceString& nss) { auto findStatus = _exhaustiveFindOnConfig(opCtx, diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.h b/src/mongo/s/catalog/sharding_catalog_client_impl.h index 0e2207678fe..1a818e27efc 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_impl.h +++ b/src/mongo/s/catalog/sharding_catalog_client_impl.h @@ -107,6 +107,11 @@ public: const 
ChunkVersion& sinceVersion, const repl::ReadConcernArgs& readConcern) override; + std::pair<CollectionType, std::vector<IndexCatalogType>> getCollectionAndGlobalIndexes( + OperationContext* opCtx, + const NamespaceString& nss, + const repl::ReadConcernArgs& readConcern) override; + StatusWith<std::vector<TagsType>> getTagsForCollection(OperationContext* opCtx, const NamespaceString& nss) override; diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp index 132c239da63..618c32ff619 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp +++ b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp @@ -104,6 +104,13 @@ std::pair<CollectionType, std::vector<ChunkType>> ShardingCatalogClientMock::get uasserted(ErrorCodes::InternalError, "Method not implemented"); } +std::pair<CollectionType, std::vector<IndexCatalogType>> +ShardingCatalogClientMock::getCollectionAndGlobalIndexes(OperationContext* opCtx, + const NamespaceString& nss, + const repl::ReadConcernArgs& readConcern) { + uasserted(ErrorCodes::InternalError, "Method not implemented"); +} + StatusWith<std::vector<TagsType>> ShardingCatalogClientMock::getTagsForCollection( OperationContext* opCtx, const NamespaceString& nss) { return {ErrorCodes::InternalError, "Method not implemented"}; diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.h b/src/mongo/s/catalog/sharding_catalog_client_mock.h index 1085dfb80fe..33b69eceef3 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_mock.h +++ b/src/mongo/s/catalog/sharding_catalog_client_mock.h @@ -82,6 +82,11 @@ public: const ChunkVersion& sinceVersion, const repl::ReadConcernArgs& readConcern) override; + std::pair<CollectionType, std::vector<IndexCatalogType>> getCollectionAndGlobalIndexes( + OperationContext* opCtx, + const NamespaceString& nss, + const repl::ReadConcernArgs& readConcern) override; + StatusWith<std::vector<TagsType>> getTagsForCollection(OperationContext* opCtx, 
const NamespaceString& nss) override; |