From dc009c5a8d484f6a0db2f357274e14e30ea9f476 Mon Sep 17 00:00:00 2001
From: Jordi Serra Torrens
Date: Wed, 24 Feb 2021 08:53:55 +0000
Subject: SERVER-54874: Ensure reading consistent config.collections and
 config.chunks when refreshing the CatalogCache

---
 src/mongo/s/catalog/sharding_catalog_client.h      |  11 +
 .../s/catalog/sharding_catalog_client_impl.cpp     | 336 +++++++++++++++++++
 src/mongo/s/catalog/sharding_catalog_client_impl.h |   6 +
 .../s/catalog/sharding_catalog_client_mock.cpp     |   8 +
 src/mongo/s/catalog/sharding_catalog_client_mock.h |   6 +
 src/mongo/s/catalog_cache.cpp                      |  16 -
 src/mongo/s/catalog_cache_refresh_test.cpp         | 365 ++++++++++++---------
 src/mongo/s/catalog_cache_test_fixture.cpp         |  45 ++-
 src/mongo/s/catalog_cache_test_fixture.h           |   5 +
 src/mongo/s/config_server_catalog_cache_loader.cpp |  81 ++---
 src/mongo/s/config_server_catalog_cache_loader.h   |  14 +
 src/mongo/s/query/sharded_agg_test_fixture.h       |   9 +-
 12 files changed, 659 insertions(+), 243 deletions(-)

diff --git a/src/mongo/s/catalog/sharding_catalog_client.h b/src/mongo/s/catalog/sharding_catalog_client.h
index 184441c6f9b..a0481a25910 100644
--- a/src/mongo/s/catalog/sharding_catalog_client.h
+++ b/src/mongo/s/catalog/sharding_catalog_client.h
@@ -178,6 +178,17 @@ public:
         repl::ReadConcernLevel readConcern,
         const boost::optional<BSONObj>& hint = boost::none) = 0;
 
+    /**
+     * Retrieves the collection metadata and its chunks metadata. If the collection epoch matches
+     * the one specified in sinceVersion, then it only returns chunks with 'lastmod' greater than
+     * or equal to sinceVersion; otherwise it returns all of its chunks.
+     */
+    virtual std::pair<CollectionType, std::vector<ChunkType>> getCollectionAndChunks(
+        OperationContext* opCtx,
+        const NamespaceString& nss,
+        const ChunkVersion& sinceVersion,
+        const repl::ReadConcernArgs& readConcern) = 0;
+
     /**
      * Retrieves all zones defined for the specified collection. The returned vector is sorted based
      * on the min key of the zones.
diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
index d740406f842..bfbf46a75cf 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
@@ -44,6 +44,12 @@
 #include "mongo/db/logical_session_cache.h"
 #include "mongo/db/namespace_string.h"
 #include "mongo/db/operation_context.h"
+#include "mongo/db/pipeline/aggregate_command_gen.h"
+#include "mongo/db/pipeline/document_source_facet.h"
+#include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_source_project.h"
+#include "mongo/db/pipeline/document_source_replace_root.h"
+#include "mongo/db/pipeline/document_source_unwind.h"
 #include "mongo/db/repl/optime.h"
 #include "mongo/db/repl/read_concern_args.h"
 #include "mongo/db/repl/repl_client_info.h"
@@ -132,6 +138,276 @@ void sendRetryableWriteBatchRequestToConfig(OperationContext* opCtx,
     uassertStatusOK(writeStatus);
 }
 
+AggregateCommand makeCollectionAndChunksAggregation(OperationContext* opCtx,
+                                                    const NamespaceString& nss,
+                                                    const ChunkVersion& sinceVersion) {
+    auto expCtx = make_intrusive<ExpressionContext>(opCtx, nullptr, nss);
+    StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
+    resolvedNamespaces[CollectionType::ConfigNS.coll()] = {CollectionType::ConfigNS,
+                                                           std::vector<BSONObj>()};
+    resolvedNamespaces[ChunkType::ConfigNS.coll()] = {ChunkType::ConfigNS, std::vector<BSONObj>()};
+    expCtx->setResolvedNamespaces(resolvedNamespaces);
+
+    using Doc = Document;
+    using Arr = std::vector<Value>;
+
+    Pipeline::SourceContainer stages;
+
+    // 1. Match config.collections entries with {_id: nss}. At most one will match.
+    // {
+    //     "$match": {
+    //         "_id": nss
+    //     }
+    // }
+    stages.emplace_back(DocumentSourceMatch::create(Doc{{"_id", nss.toString()}}.toBson(), expCtx));
+
+    // 2. Lookup chunks in config.chunks for the matched collection. Match chunks by 'uuid' or
+    // 'ns' depending on whether the collection entry has 'timestamp' or not. If the collection
+    // entry has the same 'lastmodEpoch' as 'sinceVersion', then match only chunks with 'lastmod'
+    // greater than or equal to Timestamp(sinceVersion).
+    // Because of SERVER-34926, a $lookup that uses an $expr operator together with a range match
+    // query won't be able to use indexes. To work around this, we use a $facet to create 4
+    // different 'branches' depending on whether we match by 'ns' or 'uuid' and whether the
+    // refresh is incremental or not. This way, in each one of the $facet subpipelines we don't
+    // need to use the $expr operator in the $gte range comparison. Since the match conditions in
+    // each one of the $facet branches are mutually exclusive, only one of them will execute.
+    //
+    // {
+    //     "$facet": {
+    //         "collWithUUIDIncremental": [
+    //             {
+    //                 "$match": {
+    //                     "timestamp": {
+    //                         "$exists": 1
+    //                     },
+    //                     "lastmodEpoch": <sinceVersion.epoch()>
+    //                 }
+    //             },
+    //             {
+    //                 "$lookup": {
+    //                     "from": "chunks",
+    //                     "as": "chunks",
+    //                     "let": {
+    //                         "local_uuid": "$uuid"
+    //                     },
+    //                     "pipeline": [
+    //                         {
+    //                             "$match": {
+    //                                 "$expr": {
+    //                                     "$eq": [
+    //                                         "$uuid",
+    //                                         "$$local_uuid"
+    //                                     ]
+    //                                 },
+    //                                 "lastmod": {
+    //                                     "$gte": <Timestamp(sinceVersion.toLong())>
+    //                                 }
+    //                             }
+    //                         },
+    //                         {
+    //                             "$sort": {
+    //                                 "lastmod": 1
+    //                             }
+    //                         }
+    //                     ]
+    //                 }
+    //             }
+    //         ],
+    //         "collWithUUIDNonIncremental": [
+    //             {
+    //                 "$match": {
+    //                     "timestamp": {
+    //                         "$exists": 1
+    //                     },
+    //                     "lastmodEpoch": {
+    //                         "$ne": <sinceVersion.epoch()>
+    //                     }
+    //                 }
+    //             },
+    //             {
+    //                 "$lookup": {
+    //                     "from": "chunks",
+    //                     "as": "chunks",
+    //                     "let": {
+    //                         "local_uuid": "$uuid"
+    //                     },
+    //                     "pipeline": [
+    //                         {
+    //                             "$match": {
+    //                                 "$expr": {
+    //                                     "$eq": [
+    //                                         "$uuid",
+    //                                         "$$local_uuid"
+    //                                     ]
+    //                                 }
+    //                             }
+    //                         },
+    //                         {
+    //                             "$sort": {
+    //                                 "lastmod": 1
+    //                             }
+    //                         }
+    //                     ]
+    //                 }
+    //             }
+    //         ],
+    //         "collWithNsIncremental": [...],
+    //         "collWithNsNonIncremental": [...]
+    //     }
+    // }
+    constexpr auto chunksLookupOutputFieldName = "chunks"_sd;
+    const auto buildLookUpStageFn = [&](bool withUUID, bool incremental) {
+        const auto letExpr =
+            withUUID ? Doc{{"local_uuid", "$uuid"_sd}} : Doc{{"local_ns", "$_id"_sd}};
+        const auto eqNsOrUuidExpr = withUUID
+            ? Arr{Value{"$uuid"_sd}, Value{"$$local_uuid"_sd}}
+            : Arr{Value{"$ns"_sd}, Value{"$$local_ns"_sd}};
+        auto pipelineMatchExpr = [&]() {
+            if (incremental) {
+                return Doc{{"$expr", Doc{{"$eq", eqNsOrUuidExpr}}},
+                           {"lastmod", Doc{{"$gte", Timestamp(sinceVersion.toLong())}}}};
+            } else {
+                return Doc{{"$expr", Doc{{"$eq", eqNsOrUuidExpr}}}};
+            }
+        }();
+
+        return Doc{{"from", ChunkType::ConfigNS.coll()},
+                   {"as", chunksLookupOutputFieldName},
+                   {"let", letExpr},
+                   {"pipeline",
+                    Arr{Value{Doc{{"$match", pipelineMatchExpr}}},
+                        Value{Doc{{"$sort", Doc{{"lastmod", 1}}}}}}}};
+    };
+
+    constexpr auto collWithNsIncrementalFacetName = "collWithNsIncremental"_sd;
+    constexpr auto collWithUUIDIncrementalFacetName = "collWithUUIDIncremental"_sd;
+    constexpr auto collWithNsNonIncrementalFacetName = "collWithNsNonIncremental"_sd;
+    constexpr auto collWithUUIDNonIncrementalFacetName = "collWithUUIDNonIncremental"_sd;
+
+    stages.emplace_back(DocumentSourceFacet::createFromBson(
+        // TODO SERVER-53283: Once 5.0 has branched out, the 'collWithNsIncremental' and
+        // 'collWithNsNonIncremental' branches are no longer needed.
+        Doc{{"$facet",
+             Doc{{collWithNsIncrementalFacetName,
+                  Arr{Value{Doc{{"$match",
+                                 Doc{{"timestamp", Doc{{"$exists", 0}}},
+                                     {"lastmodEpoch", sinceVersion.epoch()}}}}},
+                      Value{Doc{
+                          {"$lookup",
+                           buildLookUpStageFn(false /* withUuid */, true /* incremental */)}}}}},
+                 {collWithUUIDIncrementalFacetName,
+                  Arr{Value{Doc{{"$match",
+                                 Doc{{"timestamp", Doc{{"$exists", 1}}},
+                                     {"lastmodEpoch", sinceVersion.epoch()}}}}},
+                      Value{
+                          Doc{{"$lookup",
+                               buildLookUpStageFn(true /* withUuid */, true /* incremental */)}}}}},
+                 {collWithNsNonIncrementalFacetName,
+                  Arr{Value{Doc{{"$match",
+                                 Doc{{"timestamp", Doc{{"$exists", 0}}},
+                                     {"lastmodEpoch", Doc{{"$ne", sinceVersion.epoch()}}}}}}},
+                      Value{Doc{
+                          {"$lookup",
+                           buildLookUpStageFn(false /* withUuid */, false /* incremental */)}}}}},
+                 {collWithUUIDNonIncrementalFacetName,
+                  Arr{Value{Doc{{"$match",
+                                 Doc{{"timestamp", Doc{{"$exists", 1}}},
+                                     {"lastmodEpoch", Doc{{"$ne", sinceVersion.epoch()}}}}}}},
+                      Value{Doc{
+                          {"$lookup",
+                           buildLookUpStageFn(true /* withUuid */, false /* incremental */)}}}}}}}}
+            .toBson()
+            .firstElement(),
+        expCtx));
+
+    // 3. Collapse the arrays output by $facet (only one of them has an element) into a single
+    // array 'coll'.
+    // {
+    //     "$project": {
+    //         "_id": true,
+    //         "coll": {
+    //             "$setUnion": [
+    //                 "$collWithNsIncremental",
+    //                 "$collWithUUIDIncremental",
+    //                 "$collWithNsNonIncremental",
+    //                 "$collWithUUIDNonIncremental"
+    //             ]
+    //         }
+    //     }
+    // }
+    stages.emplace_back(DocumentSourceProject::createFromBson(
+        Doc{{"$project",
+             Doc{{"coll",
+                  Doc{{"$setUnion",
+                       Arr{Value{"$" + collWithNsIncrementalFacetName},
+                           Value{"$" + collWithUUIDIncrementalFacetName},
+                           Value{"$" + collWithNsNonIncrementalFacetName},
+                           Value{"$" + collWithUUIDNonIncrementalFacetName}}}}}}}}
+            .toBson()
+            .firstElement(),
+        expCtx));
+
+    // 4. Unwind the 'coll' array (which has at most one element).
+    // {
+    //     "$unwind": {
+    //         "path": "$coll"
+    //     }
+    // }
+    stages.emplace_back(DocumentSourceUnwind::createFromBson(
+        Doc{{"$unwind", Doc{{"path", "$coll"_sd}}}}.toBson().firstElement(), expCtx));
+
+    // 5. Promote the 'coll' document to the top level.
+    // {
+    //     "$replaceRoot": {
+    //         "newRoot": "$coll"
+    //     }
+    // }
+    stages.emplace_back(DocumentSourceReplaceRoot::createFromBson(
+        Doc{{"$replaceRoot", Doc{{"newRoot", "$coll"_sd}}}}.toBson().firstElement(), expCtx));
+
+    // 6. Unwind the 'chunks' array.
+    // {
+    //     "$unwind": {
+    //         "path": "$chunks",
+    //         "preserveNullAndEmptyArrays": true,
+    //         "includeArrayIndex": "chunksArrayIndex"
+    //     }
+    // }
+    constexpr auto chunksArrayIndexFieldName = "chunksArrayIndex"_sd;
+    stages.emplace_back(DocumentSourceUnwind::createFromBson(
+        Doc{{"$unwind",
+             Doc{{"path", "$" + chunksLookupOutputFieldName},
+                 {"preserveNullAndEmptyArrays", true},
+                 {"includeArrayIndex", chunksArrayIndexFieldName}}}}
+            .toBson()
+            .firstElement(),
+        expCtx));
+
+    // 7. After unwinding the chunks we are left with the same collection metadata repeated for
+    // each one of the chunks. To reduce the size of the response, only keep the collection
+    // metadata for the first result entry and omit it from the following ones.
+    // {
+    //     $replaceRoot: {
+    //         newRoot: {$cond: [{$gt: ["$chunksArrayIndex", 0]}, {chunks: "$chunks"}, "$$ROOT"]}
+    //     }
+    // }
+    stages.emplace_back(DocumentSourceReplaceRoot::createFromBson(
+        Doc{{"$replaceRoot",
+             Doc{{"newRoot",
+                  Doc{{"$cond",
+                       Arr{Value{
+                               Doc{{"$gt", Arr{Value{"$" + chunksArrayIndexFieldName}, Value{0}}}}},
+                           Value{Doc{
+                               {chunksLookupOutputFieldName, "$" + chunksLookupOutputFieldName}}},
+                           Value{"$$ROOT"_sd}}}}}}}}
+            .toBson()
+            .firstElement(),
+        expCtx));
+
+    auto pipeline = Pipeline::create(std::move(stages), expCtx);
+    auto serializedPipeline = pipeline->serializeToBson();
+    return AggregateCommand(CollectionType::ConfigNS, std::move(serializedPipeline));
+}
+
 } // namespace
 
 ShardingCatalogClientImpl::ShardingCatalogClientImpl() = default;
@@ -437,6 +713,66 @@ StatusWith<std::vector<ChunkType>> ShardingCatalogClientImpl::getChunks(
     return chunks;
 }
 
+std::pair<CollectionType, std::vector<ChunkType>> ShardingCatalogClientImpl::getCollectionAndChunks(
+    OperationContext* opCtx,
+    const NamespaceString& nss,
+    const ChunkVersion& sinceVersion,
+    const repl::ReadConcernArgs& readConcern) {
+    auto aggRequest = makeCollectionAndChunksAggregation(opCtx, nss, sinceVersion);
+    aggRequest.setReadConcern(readConcern.toBSONInner());
+    aggRequest.setWriteConcern(WriteConcernOptions());
+
+    const auto readPref = (serverGlobalParams.clusterRole == ClusterRole::ConfigServer)
+        ? ReadPreferenceSetting()
+        : Grid::get(opCtx)->readPreferenceWithConfigTime(kConfigReadSelector);
+    aggRequest.setUnwrappedReadPref(readPref.toContainingBSON());
+
+    // Run the aggregation
+    std::vector<BSONObj> aggResult;
+    auto callback = [&aggResult](const std::vector<BSONObj>& batch) {
+        aggResult.insert(aggResult.end(),
+                         std::make_move_iterator(batch.begin()),
+                         std::make_move_iterator(batch.end()));
+        return true;
+    };
+    uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getConfigShard()->runAggregation(
+        opCtx, aggRequest, callback));
+
+    uassert(ErrorCodes::NamespaceNotFound,
+            stream() << "Collection " << nss.ns() << " not found",
+            !aggResult.empty());
+
+    // The first aggregation result document has the config.collections entry plus the first
+    // returned chunk. Since the CollectionType idl is 'strict: false', it will ignore the foreign
+    // 'chunks' field joined onto it.
+    const CollectionType coll(aggResult.front());
+
+    uassert(ErrorCodes::NamespaceNotFound,
+            str::stream() << "Collection " << nss.ns() << " is dropped.",
+            !coll.getDropped());
+
+    uassert(ErrorCodes::ConflictingOperationInProgress,
+            stream() << "No chunks were found for the collection " << nss,
+            aggResult.front().hasField("chunks"));
+
+    std::vector<ChunkType> chunks;
+    chunks.reserve(aggResult.size());
+    for (const auto& elem : aggResult) {
+        const auto chunkElem = elem.getField("chunks");
+        if (!chunkElem) {
+            // Only the first (and in that case, only) aggregation result may not have chunks.
+            // That case is already caught by the uassert above.
+            static constexpr auto msg = "No chunks found in aggregation result";
+            LOGV2_ERROR(5487400, msg, "elem"_attr = elem);
+            uasserted(5487401, msg);
+        }
+
+        auto chunkRes = uassertStatusOK(ChunkType::fromConfigBSON(chunkElem.Obj()));
+        chunks.emplace_back(std::move(chunkRes));
+    }
+    return {std::move(coll), std::move(chunks)};
+};
+
 StatusWith<std::vector<TagsType>> ShardingCatalogClientImpl::getTagsForCollection(
     OperationContext* opCtx, const NamespaceString& nss) {
     auto findStatus = _exhaustiveFindOnConfig(opCtx,
diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.h b/src/mongo/s/catalog/sharding_catalog_client_impl.h
index 767b4ae3c17..3d513b93c6e 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_impl.h
+++ b/src/mongo/s/catalog/sharding_catalog_client_impl.h
@@ -94,6 +94,12 @@ public:
         repl::ReadConcernLevel readConcern,
         const boost::optional<BSONObj>& hint = boost::none) override;
 
+    std::pair<CollectionType, std::vector<ChunkType>> getCollectionAndChunks(
+        OperationContext* opCtx,
+        const NamespaceString& nss,
+        const ChunkVersion& sinceVersion,
+        const repl::ReadConcernArgs& readConcern) override;
+
     StatusWith<std::vector<TagsType>> getTagsForCollection(OperationContext* opCtx,
                                                            const NamespaceString& nss) override;
diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
index deec7539db3..f0eee362746 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
@@ -87,6 +87,14 @@ StatusWith<std::vector<ChunkType>> ShardingCatalogClientMock::getChunks(
     return {ErrorCodes::InternalError, "Method not implemented"};
 }
 
+std::pair<CollectionType, std::vector<ChunkType>> ShardingCatalogClientMock::getCollectionAndChunks(
+    OperationContext* opCtx,
+    const NamespaceString& nss,
+    const ChunkVersion& sinceVersion,
+    const repl::ReadConcernArgs& readConcern) {
+    uasserted(ErrorCodes::InternalError, "Method not implemented");
+}
+
 StatusWith<std::vector<TagsType>> ShardingCatalogClientMock::getTagsForCollection(
     OperationContext* opCtx, const NamespaceString& nss) {
     return {ErrorCodes::InternalError, "Method not implemented"};
diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.h b/src/mongo/s/catalog/sharding_catalog_client_mock.h
index 0930579d55d..81e781b4d97 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_mock.h
+++ b/src/mongo/s/catalog/sharding_catalog_client_mock.h
@@ -70,6 +70,12 @@ public:
         repl::ReadConcernLevel readConcern,
         const boost::optional<BSONObj>& hint) override;
 
+    std::pair<CollectionType, std::vector<ChunkType>> getCollectionAndChunks(
+        OperationContext* opCtx,
+        const NamespaceString& nss,
+        const ChunkVersion& sinceVersion,
+        const repl::ReadConcernArgs& readConcern) override;
+
     StatusWith<std::vector<TagsType>> getTagsForCollection(OperationContext* opCtx,
                                                            const NamespaceString& nss) override;
diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp
index 3b473c8f335..23a44bfba92 100644
--- a/src/mongo/s/catalog_cache.cpp
+++ b/src/mongo/s/catalog_cache.cpp
@@ -218,22 +218,6 @@ StatusWith<ChunkManager>
CatalogCache::_getCollectionRoutingInfoAt( if (acquireTries == kMaxInconsistentRoutingInfoRefreshAttempts) { return ex.toStatus(); } - } catch (ExceptionFor& ex) { - // TODO SERVER-53283: Remove once 5.0 has branched out. - // This would happen when the query to config.chunks fails because the index - // specified in the 'hint' provided by ConfigServerCatalogCache loader does no - // longer exist because it was dropped as part of the FCV upgrade/downgrade process - // to/from 5.0. - LOGV2_FOR_CATALOG_REFRESH(5310502, - 0, - "Collection refresh failed", - "namespace"_attr = nss, - "exception"_attr = redact(ex)); - _stats.totalRefreshWaitTimeMicros.addAndFetch(t.micros()); - acquireTries++; - if (acquireTries == kMaxInconsistentRoutingInfoRefreshAttempts) { - return ex.toStatus(); - } } catch (ExceptionFor& ex) { // TODO SERVER-53283: Remove once 5.0 has branched out. // This would happen when the query to config.chunks is killed because the index it diff --git a/src/mongo/s/catalog_cache_refresh_test.cpp b/src/mongo/s/catalog_cache_refresh_test.cpp index eafa39348ae..d650c22b865 100644 --- a/src/mongo/s/catalog_cache_refresh_test.cpp +++ b/src/mongo/s/catalog_cache_refresh_test.cpp @@ -32,7 +32,7 @@ #include "mongo/platform/basic.h" #include "mongo/db/concurrency/locker_noop.h" -#include "mongo/db/query/query_request_helper.h" +#include "mongo/db/pipeline/aggregation_request_helper.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog/type_database.h" @@ -70,9 +70,11 @@ protected: }()); } - void expectGetCollectionWithReshardingFields(OID epoch, - const ShardKeyPattern& shardKeyPattern, - UUID reshardingUUID) { + void expectCollectionAndChunksAggregationWithReshardingFields( + OID epoch, + const ShardKeyPattern& shardKeyPattern, + UUID reshardingUUID, + const std::vector& chunks) { expectFindSendBSONObjVector(kConfigHostAndPort, [&]() { auto collType = getDefaultCollectionType(epoch, shardKeyPattern); @@ -80,7 +82,15 @@ protected: reshardingFields.setUuid(reshardingUUID); collType.setReshardingFields(std::move(reshardingFields)); - return std::vector{collType.toBSON()}; + std::vector aggResult; + std::transform(chunks.begin(), + chunks.end(), + std::back_inserter(aggResult), + [&collType](const auto& chunk) { + return collType.toBSON().addFields( + BSON("chunks" << chunk.toConfigBSON())); + }); + return aggResult; }()); } @@ -101,37 +111,28 @@ TEST_F(CatalogCacheRefreshTest, FullLoad) { expectGetDatabase(); - expectGetCollectionWithReshardingFields(epoch, shardKeyPattern, reshardingUUID); - expectFindSendBSONObjVector(kConfigHostAndPort, [&]() { - ChunkVersion version(1, 0, epoch, boost::none /* timestamp */); + ChunkVersion version(1, 0, epoch, boost::none /* timestamp */); - ChunkType chunk1(kNss, - {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << -100)}, - version, - {"0"}); - chunk1.setName(OID::gen()); - version.incMinor(); + ChunkType chunk1( + kNss, {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << -100)}, version, {"0"}); + chunk1.setName(OID::gen()); + version.incMinor(); - ChunkType chunk2(kNss, {BSON("_id" << -100), BSON("_id" << 0)}, version, {"1"}); - chunk2.setName(OID::gen()); - version.incMinor(); + ChunkType chunk2(kNss, {BSON("_id" << -100), BSON("_id" << 0)}, version, {"1"}); + chunk2.setName(OID::gen()); + version.incMinor(); - ChunkType chunk3(kNss, {BSON("_id" << 0), BSON("_id" << 100)}, version, {"0"}); - chunk3.setName(OID::gen()); - version.incMinor(); + ChunkType chunk3(kNss, 
{BSON("_id" << 0), BSON("_id" << 100)}, version, {"0"}); + chunk3.setName(OID::gen()); + version.incMinor(); - ChunkType chunk4(kNss, - {BSON("_id" << 100), shardKeyPattern.getKeyPattern().globalMax()}, - version, - {"1"}); - chunk4.setName(OID::gen()); - version.incMinor(); + ChunkType chunk4( + kNss, {BSON("_id" << 100), shardKeyPattern.getKeyPattern().globalMax()}, version, {"1"}); + chunk4.setName(OID::gen()); + version.incMinor(); - return std::vector{chunk1.toConfigBSON(), - chunk2.toConfigBSON(), - chunk3.toConfigBSON(), - chunk4.toConfigBSON()}; - }()); + expectCollectionAndChunksAggregationWithReshardingFields( + epoch, shardKeyPattern, reshardingUUID, {chunk1, chunk2, chunk3, chunk4}); auto cm = *future.default_timed_get(); ASSERT(cm.isSharded()); @@ -242,14 +243,20 @@ TEST_F(CatalogCacheRefreshTest, FullLoadNoChunksFound) { expectGetDatabase(); // Return no chunks three times, which is how frequently the catalog cache retries - expectGetCollection(epoch, shardKeyPattern); - expectFindSendBSONObjVector(kConfigHostAndPort, {}); + expectFindSendBSONObjVector(kConfigHostAndPort, [&] { + const auto coll = getDefaultCollectionType(epoch, shardKeyPattern); + return std::vector{coll.toBSON()}; + }()); - expectGetCollection(epoch, shardKeyPattern); - expectFindSendBSONObjVector(kConfigHostAndPort, {}); + expectFindSendBSONObjVector(kConfigHostAndPort, [&] { + const auto coll = getDefaultCollectionType(epoch, shardKeyPattern); + return std::vector{coll.toBSON()}; + }()); - expectGetCollection(epoch, shardKeyPattern); - expectFindSendBSONObjVector(kConfigHostAndPort, {}); + expectFindSendBSONObjVector(kConfigHostAndPort, [&] { + const auto coll = getDefaultCollectionType(epoch, shardKeyPattern); + return std::vector{coll.toBSON()}; + }()); try { auto cm = *future.default_timed_get(); @@ -271,14 +278,20 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadNoChunksFound) { auto future = scheduleRoutingInfoForcedRefresh(kNss); // Return no chunks three times, which is how frequently the catalog cache retries - expectGetCollection(epoch, shardKeyPattern); - expectFindSendBSONObjVector(kConfigHostAndPort, {}); + expectFindSendBSONObjVector(kConfigHostAndPort, [&] { + const auto coll = getDefaultCollectionType(epoch, shardKeyPattern); + return std::vector{coll.toBSON()}; + }()); - expectGetCollection(epoch, shardKeyPattern); - expectFindSendBSONObjVector(kConfigHostAndPort, {}); + expectFindSendBSONObjVector(kConfigHostAndPort, [&] { + const auto coll = getDefaultCollectionType(epoch, shardKeyPattern); + return std::vector{coll.toBSON()}; + }()); - expectGetCollection(epoch, shardKeyPattern); - expectFindSendBSONObjVector(kConfigHostAndPort, {}); + expectFindSendBSONObjVector(kConfigHostAndPort, [&] { + const auto coll = getDefaultCollectionType(epoch, shardKeyPattern); + return std::vector{coll.toBSON()}; + }()); try { auto cm = *future.default_timed_get(); @@ -298,15 +311,15 @@ TEST_F(CatalogCacheRefreshTest, ChunksBSONCorrupted) { expectGetDatabase(); // Return no chunks three times, which is how frequently the catalog cache retries - expectGetCollection(epoch, shardKeyPattern); expectFindSendBSONObjVector(kConfigHostAndPort, [&] { - return std::vector{ChunkType( - kNss, - {shardKeyPattern.getKeyPattern().globalMin(), - BSON("_id" << 0)}, - ChunkVersion(1, 0, epoch, boost::none /* timestamp */), - {"0"}) - .toConfigBSON(), + const auto coll = getDefaultCollectionType(epoch, shardKeyPattern); + const auto chunk1 = + ChunkType(kNss, + {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << 
0)}, + ChunkVersion(1, 0, epoch, boost::none /* timestamp */), + {"0"}); + return std::vector{coll.toBSON().addFields( + BSON("chunks" << chunk1.toConfigBSON())), BSON("BadValue" << "This value should not be in a chunk config document")}; }()); @@ -350,20 +363,19 @@ TEST_F(CatalogCacheRefreshTest, FullLoadMissingChunkWithLowestVersion) { chunk4.setName(OID::gen()); version.incMinor(); - return std::vector{ - chunk2.toConfigBSON(), chunk3.toConfigBSON(), chunk4.toConfigBSON()}; + return std::vector{chunk2, chunk3, chunk4}; }(); // Return incomplete set of chunks three times, which is how frequently the catalog cache // retries - expectGetCollection(epoch, shardKeyPattern); - expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks); + expectCollectionAndChunksAggregation( + kNss, epoch, UUID::gen(), shardKeyPattern, incompleteChunks); - expectGetCollection(epoch, shardKeyPattern); - expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks); + expectCollectionAndChunksAggregation( + kNss, epoch, UUID::gen(), shardKeyPattern, incompleteChunks); - expectGetCollection(epoch, shardKeyPattern); - expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks); + expectCollectionAndChunksAggregation( + kNss, epoch, UUID::gen(), shardKeyPattern, incompleteChunks); try { auto cm = *future.default_timed_get(); @@ -405,20 +417,19 @@ TEST_F(CatalogCacheRefreshTest, FullLoadMissingChunkWithHighestVersion) { chunk4.setName(OID::gen()); version.incMinor(); - return std::vector{ - chunk2.toConfigBSON(), chunk3.toConfigBSON(), chunk4.toConfigBSON()}; + return std::vector{chunk2, chunk3, chunk4}; }(); // Return incomplete set of chunks three times, which is how frequently the catalog cache // retries - expectGetCollection(epoch, shardKeyPattern); - expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks); + expectCollectionAndChunksAggregation( + kNss, epoch, UUID::gen(), shardKeyPattern, incompleteChunks); - expectGetCollection(epoch, shardKeyPattern); - expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks); + expectCollectionAndChunksAggregation( + kNss, epoch, UUID::gen(), shardKeyPattern, incompleteChunks); - expectGetCollection(epoch, shardKeyPattern); - expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks); + expectCollectionAndChunksAggregation( + kNss, epoch, UUID::gen(), shardKeyPattern, incompleteChunks); try { auto cm = *future.default_timed_get(); @@ -463,20 +474,19 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadMissingChunkWithLowestVersion) { chunk4.setName(OID::gen()); version.incMinor(); - return std::vector{ - chunk2.toConfigBSON(), chunk3.toConfigBSON(), chunk4.toConfigBSON()}; + return std::vector{chunk2, chunk3, chunk4}; }(); // Return incomplete set of chunks three times, which is how frequently the catalog cache // retries - expectGetCollection(epoch, shardKeyPattern); - expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks); + expectCollectionAndChunksAggregation( + kNss, epoch, UUID::gen(), shardKeyPattern, incompleteChunks); - expectGetCollection(epoch, shardKeyPattern); - expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks); + expectCollectionAndChunksAggregation( + kNss, epoch, UUID::gen(), shardKeyPattern, incompleteChunks); - expectGetCollection(epoch, shardKeyPattern); - expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks); + expectCollectionAndChunksAggregation( + kNss, epoch, UUID::gen(), shardKeyPattern, incompleteChunks); try { auto cm = *future.default_timed_get(); @@ -520,20 +530,19 
@@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadMissingChunkWithHighestVersion) { chunk4.setName(OID::gen()); version.incMinor(); - return std::vector{ - chunk2.toConfigBSON(), chunk3.toConfigBSON(), chunk4.toConfigBSON()}; + return std::vector{chunk2, chunk3, chunk4}; }(); // Return incomplete set of chunks three times, which is how frequently the catalog cache // retries - expectGetCollection(epoch, shardKeyPattern); - expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks); + expectCollectionAndChunksAggregation( + kNss, epoch, UUID::gen(), shardKeyPattern, incompleteChunks); - expectGetCollection(epoch, shardKeyPattern); - expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks); + expectCollectionAndChunksAggregation( + kNss, epoch, UUID::gen(), shardKeyPattern, incompleteChunks); - expectGetCollection(epoch, shardKeyPattern); - expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks); + expectCollectionAndChunksAggregation( + kNss, epoch, UUID::gen(), shardKeyPattern, incompleteChunks); try { auto cm = *future.default_timed_get(); @@ -567,18 +576,28 @@ TEST_F(CatalogCacheRefreshTest, ChunkEpochChangeDuringIncrementalLoad) { {"1"}); chunk2.setName(OID::gen()); - return std::vector{chunk1.toConfigBSON(), chunk2.toConfigBSON()}; + return std::vector{chunk1, chunk2}; }(); // Return set of chunks, one of which has different epoch. Do it three times, which is how // frequently the catalog cache retries. - expectGetCollection(initialRoutingInfo.getVersion().epoch(), shardKeyPattern); - expectFindSendBSONObjVector(kConfigHostAndPort, inconsistentChunks); - expectGetCollection(initialRoutingInfo.getVersion().epoch(), shardKeyPattern); - expectFindSendBSONObjVector(kConfigHostAndPort, inconsistentChunks); - - expectGetCollection(initialRoutingInfo.getVersion().epoch(), shardKeyPattern); - expectFindSendBSONObjVector(kConfigHostAndPort, inconsistentChunks); + expectCollectionAndChunksAggregation(kNss, + initialRoutingInfo.getVersion().epoch(), + UUID::gen(), + shardKeyPattern, + inconsistentChunks); + + expectCollectionAndChunksAggregation(kNss, + initialRoutingInfo.getVersion().epoch(), + UUID::gen(), + shardKeyPattern, + inconsistentChunks); + + expectCollectionAndChunksAggregation(kNss, + initialRoutingInfo.getVersion().epoch(), + UUID::gen(), + shardKeyPattern, + inconsistentChunks); try { auto cm = *future.default_timed_get(); @@ -607,14 +626,23 @@ TEST_F(CatalogCacheRefreshTest, ChunkEpochChangeDuringIncrementalLoadRecoveryAft // the situation where a collection existed with epoch0, we started a refresh for that // collection, the cursor yielded and while it yielded another node dropped the collection and // recreated it with different epoch and chunks. 
- expectGetCollection(oldVersion.epoch(), shardKeyPattern); onFindCommand([&](const RemoteCommandRequest& request) { - auto opMsg = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj); - auto diffQuery = query_request_helper::makeFromFindCommandForTests(opMsg.body); - ASSERT_BSONOBJ_EQ(BSON("ns" << kNss.ns() << "lastmod" - << BSON("$gte" << Timestamp(oldVersion.majorVersion(), - oldVersion.minorVersion()))), - diffQuery->getFilter()); + const auto opMsg = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj); + const auto aggRequest = unittest::assertGet( + aggregation_request_helper::parseFromBSONForTests(kNss, opMsg.body)); + const auto& pipeline = aggRequest.getPipeline(); + + ASSERT_EQ(pipeline[1]["$facet"]["collWithNsIncremental"] + .Array()[0]["$match"]["lastmodEpoch"] + .OID(), + oldVersion.epoch()); + ASSERT_BSONOBJ_EQ(pipeline[1]["$facet"]["collWithNsNonIncremental"] + .Array()[0]["$match"]["lastmodEpoch"] + .Obj(), + BSON("$ne" << oldVersion.epoch())); + + const auto collBSON = + getDefaultCollectionType(oldVersion.epoch(), shardKeyPattern).toBSON(); oldVersion.incMajor(); ChunkType chunk1(kNss, @@ -631,20 +659,29 @@ TEST_F(CatalogCacheRefreshTest, ChunkEpochChangeDuringIncrementalLoadRecoveryAft {"1"}); chunk3.setName(OID::gen()); - return std::vector{chunk1.toConfigBSON(), chunk3.toConfigBSON()}; + const auto chunk1BSON = collBSON.addFields(BSON("chunks" << chunk1.toConfigBSON())); + const auto chunk3BSON = collBSON.addFields(BSON("chunks" << chunk3.toConfigBSON())); + return std::vector{chunk1BSON, chunk3BSON}; }); // On the second retry attempt, return the correct set of chunks from the recreated collection - expectGetCollection(newEpoch, shardKeyPattern); - ChunkVersion newVersion(5, 0, newEpoch, boost::none /* timestamp */); onFindCommand([&](const RemoteCommandRequest& request) { - // Ensure it is a differential query but starting from version zero (to fetch all the - // chunks) since the incremental refresh above produced a different version - auto opMsg = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj); - auto diffQuery = query_request_helper::makeFromFindCommandForTests(opMsg.body); - ASSERT_BSONOBJ_EQ(BSON("ns" << kNss.ns() << "lastmod" << BSON("$gte" << Timestamp(0, 0))), - diffQuery->getFilter()); + const auto opMsg = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj); + const auto aggRequest = unittest::assertGet( + aggregation_request_helper::parseFromBSONForTests(kNss, opMsg.body)); + const auto& pipeline = aggRequest.getPipeline(); + + ASSERT_EQ(pipeline[1]["$facet"]["collWithNsIncremental"] + .Array()[0]["$match"]["lastmodEpoch"] + .OID(), + oldVersion.epoch()); + ASSERT_BSONOBJ_EQ(pipeline[1]["$facet"]["collWithNsNonIncremental"] + .Array()[0]["$match"]["lastmodEpoch"] + .Obj(), + BSON("$ne" << oldVersion.epoch())); + + const auto collBSON = getDefaultCollectionType(newEpoch, shardKeyPattern).toBSON(); ChunkType chunk1(kNss, {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << 0)}, @@ -663,8 +700,10 @@ TEST_F(CatalogCacheRefreshTest, ChunkEpochChangeDuringIncrementalLoadRecoveryAft {"1"}); chunk3.setName(OID::gen()); - return std::vector{ - chunk1.toConfigBSON(), chunk2.toConfigBSON(), chunk3.toConfigBSON()}; + const auto chunk1BSON = collBSON.addFields(BSON("chunks" << chunk1.toConfigBSON())); + const auto chunk2BSON = collBSON.addFields(BSON("chunks" << chunk2.toConfigBSON())); + const auto chunk3BSON = collBSON.addFields(BSON("chunks" << chunk3.toConfigBSON())); + return std::vector{chunk1BSON, chunk2BSON, 
chunk3BSON}; }); auto cm = *future.default_timed_get(); @@ -687,18 +726,27 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterCollectionEpochChange) { auto future = scheduleRoutingInfoIncrementalRefresh(kNss); + ChunkVersion oldVersion = initialRoutingInfo.getVersion(); ChunkVersion newVersion(1, 0, OID::gen(), boost::none /* timestamp */); - // Return collection with a different epoch - expectGetCollection(newVersion.epoch(), shardKeyPattern); - - // Return set of chunks, which represent a split + // Return collection with a different epoch and a set of chunks, which represent a split onFindCommand([&](const RemoteCommandRequest& request) { - // Ensure it is a differential query but starting from version zero - auto opMsg = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj); - auto diffQuery = query_request_helper::makeFromFindCommandForTests(opMsg.body); - ASSERT_BSONOBJ_EQ(BSON("ns" << kNss.ns() << "lastmod" << BSON("$gte" << Timestamp(0, 0))), - diffQuery->getFilter()); + const auto opMsg = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj); + const auto aggRequest = unittest::assertGet( + aggregation_request_helper::parseFromBSONForTests(kNss, opMsg.body)); + const auto& pipeline = aggRequest.getPipeline(); + + ASSERT_EQ(pipeline[1]["$facet"]["collWithNsIncremental"] + .Array()[0]["$match"]["lastmodEpoch"] + .OID(), + oldVersion.epoch()); + ASSERT_BSONOBJ_EQ(pipeline[1]["$facet"]["collWithNsNonIncremental"] + .Array()[0]["$match"]["lastmodEpoch"] + .Obj(), + BSON("$ne" << oldVersion.epoch())); + + const auto collBSON = + getDefaultCollectionType(newVersion.epoch(), shardKeyPattern).toBSON(); ChunkType chunk1(kNss, {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << 0)}, @@ -713,7 +761,9 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterCollectionEpochChange) { {"1"}); chunk2.setName(OID::gen()); - return std::vector{chunk1.toConfigBSON(), chunk2.toConfigBSON()}; + const auto chunk1BSON = collBSON.addFields(BSON("chunks" << chunk1.toConfigBSON())); + const auto chunk2BSON = collBSON.addFields(BSON("chunks" << chunk2.toConfigBSON())); + return std::vector{chunk1BSON, chunk2BSON}; }); auto cm = *future.default_timed_get(); @@ -736,17 +786,26 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterSplit) { auto future = scheduleRoutingInfoIncrementalRefresh(kNss); - expectGetCollection(version.epoch(), shardKeyPattern); - // Return set of chunks, which represent a split onFindCommand([&](const RemoteCommandRequest& request) { - // Ensure it is a differential query - auto opMsg = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj); - auto diffQuery = query_request_helper::makeFromFindCommandForTests(opMsg.body); + const auto opMsg = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj); + const auto aggRequest = unittest::assertGet( + aggregation_request_helper::parseFromBSONForTests(kNss, opMsg.body)); + const auto& pipeline = aggRequest.getPipeline(); + ASSERT_BSONOBJ_EQ( - BSON("ns" << kNss.ns() << "lastmod" - << BSON("$gte" << Timestamp(version.majorVersion(), version.minorVersion()))), - diffQuery->getFilter()); + pipeline[1]["$facet"]["collWithNsIncremental"] + .Array()[1]["$lookup"]["pipeline"] + .Array()[0]["$match"]["lastmod"] + .Obj(), + BSON("$gte" << Timestamp(version.majorVersion(), version.minorVersion()))); + + ASSERT_EQ(pipeline[1]["$facet"]["collWithNsIncremental"] + .Array()[0]["$match"]["lastmodEpoch"] + .OID(), + version.epoch()); + + const auto collBSON = getDefaultCollectionType(version.epoch(), 
shardKeyPattern).toBSON(); version.incMajor(); ChunkType chunk1( @@ -758,7 +817,9 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterSplit) { kNss, {BSON("_id" << 0), shardKeyPattern.getKeyPattern().globalMax()}, version, {"0"}); chunk2.setName(OID::gen()); - return std::vector{chunk1.toConfigBSON(), chunk2.toConfigBSON()}; + const auto chunk1BSON = collBSON.addFields(BSON("chunks" << chunk1.toConfigBSON())); + const auto chunk2BSON = collBSON.addFields(BSON("chunks" << chunk2.toConfigBSON())); + return std::vector{chunk1BSON, chunk2BSON}; }); auto cm = *future.default_timed_get(); @@ -784,23 +845,20 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterMoveWithReshardingFieldsAdde ChunkVersion expectedDestShardVersion; - expectGetCollectionWithReshardingFields(version.epoch(), shardKeyPattern, reshardingUUID); - // Return set of chunks, which represent a move - expectFindSendBSONObjVector(kConfigHostAndPort, [&]() { - version.incMajor(); - expectedDestShardVersion = version; - ChunkType chunk1( - kNss, {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << 0)}, version, {"1"}); - chunk1.setName(OID::gen()); + version.incMajor(); + expectedDestShardVersion = version; + ChunkType chunk1( + kNss, {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << 0)}, version, {"1"}); + chunk1.setName(OID::gen()); - version.incMinor(); - ChunkType chunk2( - kNss, {BSON("_id" << 0), shardKeyPattern.getKeyPattern().globalMax()}, version, {"0"}); - chunk2.setName(OID::gen()); + version.incMinor(); + ChunkType chunk2( + kNss, {BSON("_id" << 0), shardKeyPattern.getKeyPattern().globalMax()}, version, {"0"}); + chunk2.setName(OID::gen()); - return std::vector{chunk1.toConfigBSON(), chunk2.toConfigBSON()}; - }()); + expectCollectionAndChunksAggregationWithReshardingFields( + version.epoch(), shardKeyPattern, reshardingUUID, {chunk1, chunk2}); auto cm = *future.default_timed_get(); ASSERT(cm.isSharded()); @@ -831,20 +889,17 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterMoveLastChunkWithReshardingF auto future = scheduleRoutingInfoIncrementalRefresh(kNss); // The collection type won't have resharding fields this time. 
- expectGetCollection(version.epoch(), shardKeyPattern); - // Return set of chunks, which represent a move - expectFindSendBSONObjVector(kConfigHostAndPort, [&]() { - version.incMajor(); - ChunkType chunk1(kNss, - {shardKeyPattern.getKeyPattern().globalMin(), - shardKeyPattern.getKeyPattern().globalMax()}, - version, - {"1"}); - chunk1.setName(OID::gen()); - - return std::vector{chunk1.toConfigBSON()}; - }()); + version.incMajor(); + ChunkType chunk1( + kNss, + {shardKeyPattern.getKeyPattern().globalMin(), shardKeyPattern.getKeyPattern().globalMax()}, + version, + {"1"}); + chunk1.setName(OID::gen()); + + expectCollectionAndChunksAggregation( + kNss, version.epoch(), UUID::gen(), shardKeyPattern, {chunk1}); auto cm = *future.default_timed_get(); ASSERT(cm.isSharded()); diff --git a/src/mongo/s/catalog_cache_test_fixture.cpp b/src/mongo/s/catalog_cache_test_fixture.cpp index 0bb9c38ce9c..5154f98d3ed 100644 --- a/src/mongo/s/catalog_cache_test_fixture.cpp +++ b/src/mongo/s/catalog_cache_test_fixture.cpp @@ -180,8 +180,16 @@ ChunkManager CatalogCacheTestFixture::makeChunkManager( auto future = scheduleRoutingInfoUnforcedRefresh(nss); expectFindSendBSONObjVector(kConfigHostAndPort, {databaseBSON}); - expectFindSendBSONObjVector(kConfigHostAndPort, {collectionBSON}); - expectFindSendBSONObjVector(kConfigHostAndPort, initialChunks); + expectFindSendBSONObjVector(kConfigHostAndPort, [&]() { + std::vector aggResult; + std::transform(initialChunks.begin(), + initialChunks.end(), + std::back_inserter(aggResult), + [&collectionBSON](const auto& chunk) { + return collectionBSON.addFields(BSON("chunks" << chunk)); + }); + return aggResult; + }()); return *future.default_timed_get(); } @@ -205,6 +213,29 @@ void CatalogCacheTestFixture::expectGetCollection(NamespaceString nss, }()); } +void CatalogCacheTestFixture::expectCollectionAndChunksAggregation( + NamespaceString nss, + OID epoch, + UUID uuid, + const ShardKeyPattern& shardKeyPattern, + const std::vector& chunks) { + expectFindSendBSONObjVector(kConfigHostAndPort, [&]() { + CollectionType collType(nss, epoch, Date_t::now(), uuid); + collType.setKeyPattern(shardKeyPattern.toBSON()); + collType.setUnique(false); + const auto collObj = collType.toBSON(); + + std::vector aggResult; + std::transform(chunks.begin(), + chunks.end(), + std::back_inserter(aggResult), + [&collObj](const auto& chunk) { + return collObj.addFields(BSON("chunks" << chunk.toConfigBSON())); + }); + return aggResult; + }()); +} + ChunkManager CatalogCacheTestFixture::loadRoutingTableWithTwoChunksAndTwoShards( NamespaceString nss) { @@ -237,8 +268,11 @@ ChunkManager CatalogCacheTestFixture::loadRoutingTableWithTwoChunksAndTwoShardsI expectGetDatabase(nss); } } - expectGetCollection(nss, epoch, uuid, shardKeyPattern); expectFindSendBSONObjVector(kConfigHostAndPort, [&]() { + CollectionType collType(nss, epoch, Date_t::now(), uuid); + collType.setKeyPattern(shardKeyPattern.toBSON()); + collType.setUnique(false); + ChunkVersion version(1, 0, epoch, boost::none /* timestamp */); ChunkType chunk1( @@ -251,7 +285,10 @@ ChunkManager CatalogCacheTestFixture::loadRoutingTableWithTwoChunksAndTwoShardsI chunk2.setName(OID::gen()); version.incMinor(); - return std::vector{chunk1.toConfigBSON(), chunk2.toConfigBSON()}; + const auto collObj = collType.toBSON(); + const auto chunk1Obj = collObj.addFields(BSON("chunks" << chunk1.toConfigBSON())); + const auto chunk2Obj = collObj.addFields(BSON("chunks" << chunk2.toConfigBSON())); + return std::vector{chunk1Obj, chunk2Obj}; }()); return 
*future.default_timed_get();
diff --git a/src/mongo/s/catalog_cache_test_fixture.h b/src/mongo/s/catalog_cache_test_fixture.h
index a42af988697..a1fed893d2d 100644
--- a/src/mongo/s/catalog_cache_test_fixture.h
+++ b/src/mongo/s/catalog_cache_test_fixture.h
@@ -129,6 +129,11 @@ protected:
                              OID epoch,
                              UUID uuid,
                              const ShardKeyPattern& shardKeyPattern);
+    void expectCollectionAndChunksAggregation(NamespaceString nss,
+                                              OID epoch,
+                                              UUID uuid,
+                                              const ShardKeyPattern& shardKeyPattern,
+                                              const std::vector<ChunkType>& chunks);
 
     const HostAndPort kConfigHostAndPort{"DummyConfig", 1234};
 };
diff --git a/src/mongo/s/config_server_catalog_cache_loader.cpp b/src/mongo/s/config_server_catalog_cache_loader.cpp
index 1befe47c719..5c61ba621df 100644
--- a/src/mongo/s/config_server_catalog_cache_loader.cpp
+++ b/src/mongo/s/config_server_catalog_cache_loader.cpp
@@ -37,10 +37,10 @@
 
 #include "mongo/db/client.h"
 #include "mongo/db/operation_context.h"
+#include "mongo/db/repl/replication_coordinator.h"
 #include "mongo/logv2/log.h"
 #include "mongo/s/catalog/sharding_catalog_client.h"
 #include "mongo/s/grid.h"
-#include "mongo/util/fail_point.h"
 
 namespace mongo {
 
@@ -48,8 +48,6 @@ using CollectionAndChangedChunks = CatalogCacheLoader::CollectionAndChangedChunk
 
 namespace {
 
-MONGO_FAIL_POINT_DEFINE(hangBeforeReadingChunks);
-
 /**
  * Structure representing the generated query and sort order for a chunk diffing operation.
  */
@@ -86,63 +84,22 @@ QueryAndSort createConfigDiffQueryUuid(const UUID& uuid, ChunkVersion collection
  */
 CollectionAndChangedChunks getChangedChunks(OperationContext* opCtx,
                                             const NamespaceString& nss,
-                                            ChunkVersion sinceVersion) {
+                                            ChunkVersion sinceVersion,
+                                            bool avoidSnapshotForRefresh) {
     const auto catalogClient = Grid::get(opCtx)->catalogClient();
 
-    // Decide whether to do a full or partial load based on the state of the collection
-    const auto coll = catalogClient->getCollection(opCtx, nss);
-    uassert(ErrorCodes::NamespaceNotFound,
-            str::stream() << "Collection " << nss.ns() << " is dropped.",
-            !coll.getDropped());
-
-    // If the collection's epoch has changed, do a full refresh
-    const ChunkVersion startingCollectionVersion = (sinceVersion.epoch() == coll.getEpoch())
-        ? sinceVersion
-        : ChunkVersion(0, 0, coll.getEpoch(), coll.getTimestamp());
-
-    // Diff tracker should *always* find at least one chunk if collection exists
-    const auto diffQuery = [&]() {
-        if (coll.getTimestamp()) {
-            return createConfigDiffQueryUuid(coll.getUuid(), startingCollectionVersion);
-        } else {
-            return createConfigDiffQueryNs(nss, startingCollectionVersion);
-        }
-    }();
-
-    if (MONGO_unlikely(hangBeforeReadingChunks.shouldFail())) {
-        LOGV2(5310504, "Hit hangBeforeReadingChunks failpoint");
-        hangBeforeReadingChunks.pauseWhileSet(opCtx);
-    }
-
-    // TODO SERVER-53283: Remove once 5.0 has branched out.
-    // Use a hint to make sure the query will use an index. This ensures that the query on
-    // config.chunks will only execute if config.chunks is guaranteed to still have the same
-    // metadata format as we inferred from the config.collections entry we read.
-    // This is because when the config.chunks are patched up as part of the FCV upgrade (or
-    // downgrade), first the ns_1_lastmod_1 index (or uuid_1_lastmod_1) is dropped, then the 'ns'
-    // (or 'uuid') fields are unset from config.chunks. If the query is forced to use the expected
-    // index, we can guarantee that the config.chunks we will read will have the expected format.
-    // If it doesn't, it means that it's being patched-up.
-    // Then the query will fail and the refresh will be retried, this time expecting the new
-    // metadata format.
-    const auto hint = coll.getTimestamp()
-        ? BSON(ChunkType::collectionUUID() << 1 << ChunkType::lastmod() << 1)
-        : BSON(ChunkType::ns() << 1 << ChunkType::lastmod() << 1);
-
-    // Query the chunks which have changed
-    repl::OpTime opTime;
-    const std::vector<ChunkType> changedChunks = uassertStatusOK(
-        Grid::get(opCtx)->catalogClient()->getChunks(opCtx,
-                                                     diffQuery.query,
-                                                     diffQuery.sort,
-                                                     boost::none,
-                                                     &opTime,
-                                                     repl::ReadConcernLevel::kMajorityReadConcern,
-                                                     hint));
-
-    uassert(ErrorCodes::ConflictingOperationInProgress,
-            "No chunks were found for the collection",
-            !changedChunks.empty());
-
+    const auto readConcernLevel = !avoidSnapshotForRefresh
+        ? repl::ReadConcernLevel::kSnapshotReadConcern
+        : repl::ReadConcernLevel::kLocalReadConcern;
+    const auto afterClusterTime = (serverGlobalParams.clusterRole == ClusterRole::ConfigServer)
+        ? repl::ReplicationCoordinator::get(opCtx)->getMyLastAppliedOpTime()
+        : Grid::get(opCtx)->configOpTime();
+    const auto readConcern =
+        repl::ReadConcernArgs(LogicalTime(afterClusterTime.getTimestamp()), readConcernLevel);
+
+    auto collAndChunks =
+        catalogClient->getCollectionAndChunks(opCtx, nss, sinceVersion, readConcern);
+    const auto& coll = collAndChunks.first;
     return CollectionAndChangedChunks{coll.getEpoch(),
                                       coll.getTimestamp(),
                                       coll.getUuid(),
@@ -151,7 +108,7 @@ CollectionAndChangedChunks getChangedChunks(OperationContext* opCtx,
                                       coll.getUnique(),
                                       coll.getReshardingFields(),
                                       coll.getAllowMigrations(),
-                                      std::move(changedChunks)};
+                                      std::move(collAndChunks.second)};
 }
 
 } // namespace
@@ -207,7 +164,7 @@ SemiFuture<CollectionAndChangedChunks> ConfigServerCatalogCacheLoader::getChunks
                 getGlobalServiceContext());
             auto opCtx = tc->makeOperationContext();
 
-            return getChangedChunks(opCtx.get(), nss, version);
+            return getChangedChunks(opCtx.get(), nss, version, _avoidSnapshotForRefresh);
         })
         .semi();
 }
@@ -225,4 +182,8 @@ SemiFuture<DatabaseType> ConfigServerCatalogCacheLoader::getDatabase(StringData
         .semi();
 }
 
+void ConfigServerCatalogCacheLoader::setAvoidSnapshotForRefresh_ForTest() {
+    _avoidSnapshotForRefresh = true;
+}
+
 } // namespace mongo
diff --git a/src/mongo/s/config_server_catalog_cache_loader.h b/src/mongo/s/config_server_catalog_cache_loader.h
index d16ae428b7f..8c1384946f8 100644
--- a/src/mongo/s/config_server_catalog_cache_loader.h
+++ b/src/mongo/s/config_server_catalog_cache_loader.h
@@ -55,9 +55,23 @@ public:
                                                        ChunkVersion version) override;
 
     SemiFuture<DatabaseType> getDatabase(StringData dbName) override;
 
+    /**
+     * Don't use outside of unit_tests.
+     * TODO SERVER-54394 Remove this
+     */
+    void setAvoidSnapshotForRefresh_ForTest();
+
 private:
     // Thread pool to be used to perform metadata load
     std::shared_ptr<ThreadPool> _executor;
+
+    /*
+     * If 'true' avoids using snapshot read concern when refreshing the cache. Only to be used by
+     * unit_tests that use the ephemeralForTesting storage engine, because currently it doesn't
+     * support snapshot read concern.
+     * TODO SERVER-54394 Remove this.
+     */
+    bool _avoidSnapshotForRefresh = false;
 };
 
 } // namespace mongo
diff --git a/src/mongo/s/query/sharded_agg_test_fixture.h b/src/mongo/s/query/sharded_agg_test_fixture.h
index 621a7e368a9..8fc5a618051 100644
--- a/src/mongo/s/query/sharded_agg_test_fixture.h
+++ b/src/mongo/s/query/sharded_agg_test_fixture.h
@@ -97,14 +97,7 @@ public:
         // Mock the expected config server queries.
        expectGetDatabase(nss);
-        expectGetCollection(nss, epoch, UUID::gen(), shardKey);
-        expectFindSendBSONObjVector(kConfigHostAndPort, [&]() {
-            std::vector<BSONObj> response;
-            for (auto&& chunk : chunkDistribution) {
-                response.push_back(chunk.toConfigBSON());
-            }
-            return response;
-        }());
+        expectCollectionAndChunksAggregation(nss, epoch, UUID::gen(), shardKey, chunkDistribution);
 
         const auto cm = future.default_timed_get();
         ASSERT(cm->isSharded());
-- 
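For reference, the complete pipeline that makeCollectionAndChunksAggregation() assembles can be written out in mongo-shell syntax. The sketch below is reconstructed from the stage-by-stage comments in the patch itself; the namespace "test.foo", the ObjectId value, and Timestamp(5, 1) are hypothetical placeholders for nss, sinceVersion.epoch(), and Timestamp(sinceVersion.toLong()).

    // Illustrative reconstruction in mongo-shell syntax; placeholder values only.
    const epoch = ObjectId("603e1f1ee2f2f24c1d6b7a9f");  // stands in for sinceVersion.epoch()
    const since = Timestamp(5, 1);  // stands in for Timestamp(sinceVersion.toLong())
    db.getSiblingDB("config").collections.aggregate([
        // 1. Select the single config.collections entry for the namespace.
        {$match: {_id: "test.foo"}},
        // 2. Join the chunks through one of four mutually exclusive branches.
        {$facet: {
            collWithUUIDIncremental: [
                {$match: {timestamp: {$exists: 1}, lastmodEpoch: epoch}},
                {$lookup: {from: "chunks", as: "chunks", let: {local_uuid: "$uuid"},
                           pipeline: [{$match: {$expr: {$eq: ["$uuid", "$$local_uuid"]},
                                                lastmod: {$gte: since}}},
                                      {$sort: {lastmod: 1}}]}}],
            collWithUUIDNonIncremental: [
                {$match: {timestamp: {$exists: 1}, lastmodEpoch: {$ne: epoch}}},
                {$lookup: {from: "chunks", as: "chunks", let: {local_uuid: "$uuid"},
                           pipeline: [{$match: {$expr: {$eq: ["$uuid", "$$local_uuid"]}}},
                                      {$sort: {lastmod: 1}}]}}],
            collWithNsIncremental: [
                {$match: {timestamp: {$exists: 0}, lastmodEpoch: epoch}},
                {$lookup: {from: "chunks", as: "chunks", let: {local_ns: "$_id"},
                           pipeline: [{$match: {$expr: {$eq: ["$ns", "$$local_ns"]},
                                                lastmod: {$gte: since}}},
                                      {$sort: {lastmod: 1}}]}}],
            collWithNsNonIncremental: [
                {$match: {timestamp: {$exists: 0}, lastmodEpoch: {$ne: epoch}}},
                {$lookup: {from: "chunks", as: "chunks", let: {local_ns: "$_id"},
                           pipeline: [{$match: {$expr: {$eq: ["$ns", "$$local_ns"]}}},
                                      {$sort: {lastmod: 1}}]}}]
        }},
        // 3. Collapse the facet outputs (at most one is non-empty) into 'coll'.
        {$project: {coll: {$setUnion: ["$collWithNsIncremental",
                                       "$collWithUUIDIncremental",
                                       "$collWithNsNonIncremental",
                                       "$collWithUUIDNonIncremental"]}}},
        // 4./5. Promote the single 'coll' element to the top level.
        {$unwind: {path: "$coll"}},
        {$replaceRoot: {newRoot: "$coll"}},
        // 6. Emit one result document per joined chunk, tracking its position.
        {$unwind: {path: "$chunks", preserveNullAndEmptyArrays: true,
                   includeArrayIndex: "chunksArrayIndex"}},
        // 7. Keep the collection metadata only on the first result document.
        {$replaceRoot: {newRoot: {$cond: [{$gt: ["$chunksArrayIndex", 0]},
                                          {chunks: "$chunks"}, "$$ROOT"]}}}
    ])

Because the four $facet $match conditions are mutually exclusive, only one branch ever produces output, and each branch performs its lastmod $gte comparison as a plain range predicate rather than inside $expr, which is what allows the per-branch $lookup to use the chunk indexes despite SERVER-34926.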