path: root/src/mongo/s
author     Jordi Serra Torrens <jordi.serra-torrens@mongodb.com>  2021-02-24 08:53:55 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-03-08 17:21:27 +0000
commit     dc009c5a8d484f6a0db2f357274e14e30ea9f476 (patch)
tree       d36f970ae4e8604f25809d6133058fd823005a75 /src/mongo/s
parent     931b53d712eaeaec29653799722127983687c284 (diff)
SERVER-54874: Ensure reading consistent config.collections and config.chunks when refreshing the CatalogCache
Diffstat (limited to 'src/mongo/s')
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_client.h         |  11
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_client_impl.cpp  | 336
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_client_impl.h    |   6
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_client_mock.cpp  |   8
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_client_mock.h    |   6
-rw-r--r--  src/mongo/s/catalog_cache.cpp                         |  16
-rw-r--r--  src/mongo/s/catalog_cache_refresh_test.cpp            | 365
-rw-r--r--  src/mongo/s/catalog_cache_test_fixture.cpp            |  45
-rw-r--r--  src/mongo/s/catalog_cache_test_fixture.h              |   5
-rw-r--r--  src/mongo/s/config_server_catalog_cache_loader.cpp    |  81
-rw-r--r--  src/mongo/s/config_server_catalog_cache_loader.h      |  14
-rw-r--r--  src/mongo/s/query/sharded_agg_test_fixture.h          |   9
12 files changed, 659 insertions, 243 deletions
diff --git a/src/mongo/s/catalog/sharding_catalog_client.h b/src/mongo/s/catalog/sharding_catalog_client.h
index 184441c6f9b..a0481a25910 100644
--- a/src/mongo/s/catalog/sharding_catalog_client.h
+++ b/src/mongo/s/catalog/sharding_catalog_client.h
@@ -179,6 +179,17 @@ public:
const boost::optional<BSONObj>& hint = boost::none) = 0;
/**
+ * Retrieves the collection metadata and its chunks metadata. If the collection epoch matches
+ * the one specified in sinceVersion, then only the chunks with 'lastmod' greater than or equal
+ * to sinceVersion are returned; otherwise, all of the collection's chunks are returned.
+ */
+ virtual std::pair<CollectionType, std::vector<ChunkType>> getCollectionAndChunks(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ChunkVersion& sinceVersion,
+ const repl::ReadConcernArgs& readConcern) = 0;
+
+ /**
* Retrieves all zones defined for the specified collection. The returned vector is sorted based
* on the min key of the zones.
*
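[Editor's note] For orientation, a minimal sketch of how a caller might consume this new API. It is a sketch only: 'opCtx', 'nss', and 'sinceVersion' are assumed to be in scope, the client is assumed reachable through Grid (as the config server catalog cache loader does later in this diff), and error handling is elided.

    // Hedged sketch of a caller; mirrors what ConfigServerCatalogCacheLoader does below.
    auto* catalogClient = Grid::get(opCtx)->catalogClient();
    const auto readConcern =
        repl::ReadConcernArgs(repl::ReadConcernLevel::kSnapshotReadConcern);
    auto [coll, changedChunks] =
        catalogClient->getCollectionAndChunks(opCtx, nss, sinceVersion, readConcern);
    // 'coll' is the config.collections entry; 'changedChunks' holds the chunks with
    // 'lastmod' >= sinceVersion if the epoch matched, or all of the chunks otherwise.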
diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
index d740406f842..bfbf46a75cf 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
@@ -44,6 +44,12 @@
#include "mongo/db/logical_session_cache.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/pipeline/aggregate_command_gen.h"
+#include "mongo/db/pipeline/document_source_facet.h"
+#include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_source_project.h"
+#include "mongo/db/pipeline/document_source_replace_root.h"
+#include "mongo/db/pipeline/document_source_unwind.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/repl/repl_client_info.h"
@@ -132,6 +138,276 @@ void sendRetryableWriteBatchRequestToConfig(OperationContext* opCtx,
uassertStatusOK(writeStatus);
}
+AggregateCommand makeCollectionAndChunksAggregation(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ChunkVersion& sinceVersion) {
+ auto expCtx = make_intrusive<ExpressionContext>(opCtx, nullptr, nss);
+ StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
+ resolvedNamespaces[CollectionType::ConfigNS.coll()] = {CollectionType::ConfigNS,
+ std::vector<BSONObj>()};
+ resolvedNamespaces[ChunkType::ConfigNS.coll()] = {ChunkType::ConfigNS, std::vector<BSONObj>()};
+ expCtx->setResolvedNamespaces(resolvedNamespaces);
+
+ using Doc = Document;
+ using Arr = std::vector<Value>;
+
+ Pipeline::SourceContainer stages;
+
+ // 1. Match config.collections entries with {_id: nss}. At most one will match.
+ // {
+ // "$match": {
+ // "_id": nss
+ // }
+ // }
+ stages.emplace_back(DocumentSourceMatch::create(Doc{{"_id", nss.toString()}}.toBson(), expCtx));
+
+ // 2. Lookup chunks in config.chunks for the matched collection. Match chunks by 'uuid' or 'ns'
+ // depending on whether the collection entry has 'timestamp' or not. If the collection entry has
+ // the same 'lastmodEpoch' as 'sinceVersion', then match only chunks with 'lastmod' greater
+ // than or equal to Timestamp(sinceVersion).
+ // Because of SERVER-34926, a $lookup that uses an $expr operator together with a range match
+ // query won't be able to use indexes. To work around this, we use a $facet to create 4
+ // different 'branches' depending on whether we match by 'ns' or 'uuid' and whether the refresh
+ // is incremental or not. This way, in each one of the $facet subpipelines we don't need to use
+ // the $expr operator in the $gte range comparison. Since the match conditions in each one of
+ // the $facet branches are mutually exclusive, at most one of them will produce results.
+ //
+ // {
+ // "$facet": {
+ // "collWithUUIDIncremental": [
+ // {
+ // "$match": {
+ // "timestamp": {
+ // "$exists": 1
+ // },
+ // "lastmodEpoch": <sinceVersion.epoch>
+ // }
+ // },
+ // {
+ // "$lookup": {
+ // "from": "chunks",
+ // "as": "chunks",
+ // "let": {
+ // "local_uuid": "$uuid"
+ // },
+ // "pipeline": [
+ // {
+ // "$match": {
+ // "$expr": {
+ // "$eq": [
+ // "$uuid",
+ // "$$local_uuid"
+ // ]
+ // },
+ // "lastmod": {
+ // "$gte": <Timestamp(sinceVersion)>
+ // }
+ // }
+ // },
+ // {
+ // "$sort": {
+ // "lastmod": 1
+ // }
+ // }
+ // ]
+ // }
+ // }
+ // ],
+ // "collWithUUIDNonIncremental": [
+ // {
+ // "$match": {
+ // "timestamp": {
+ // "$exists": 1
+ // },
+ // "lastmodEpoch": {
+ // "$ne": <sinceVersion.epoch>
+ // }
+ // }
+ // },
+ // {
+ // "$lookup": {
+ // "from": "chunks",
+ // "as": "chunks",
+ // "let": {
+ // "local_uuid": "$uuid"
+ // },
+ // "pipeline": [
+ // {
+ // "$match": {
+ // "$expr": {
+ // "$eq": [
+ // "$uuid",
+ // "$$local_uuid"
+ // ]
+ // }
+ // }
+ // },
+ // {
+ // "$sort": {
+ // "lastmod": 1
+ // }
+ // }
+ // ]
+ // }
+ // }
+ // ],
+ // "collWithNsIncremental": [...],
+ // "collWithNsNonIncremental": [...]
+ // }
+ // }
+ constexpr auto chunksLookupOutputFieldName = "chunks"_sd;
+ const auto buildLookUpStageFn = [&](bool withUUID, bool incremental) {
+ const auto letExpr =
+ withUUID ? Doc{{"local_uuid", "$uuid"_sd}} : Doc{{"local_ns", "$_id"_sd}};
+ const auto eqNsOrUuidExpr = withUUID ? Arr{Value{"$uuid"_sd}, Value{"$$local_uuid"_sd}}
+ : Arr{Value{"$ns"_sd}, Value{"$$local_ns"_sd}};
+ auto pipelineMatchExpr = [&]() {
+ if (incremental) {
+ return Doc{{"$expr", Doc{{"$eq", eqNsOrUuidExpr}}},
+ {"lastmod", Doc{{"$gte", Timestamp(sinceVersion.toLong())}}}};
+ } else {
+ return Doc{{"$expr", Doc{{"$eq", eqNsOrUuidExpr}}}};
+ }
+ }();
+
+ return Doc{{"from", ChunkType::ConfigNS.coll()},
+ {"as", chunksLookupOutputFieldName},
+ {"let", letExpr},
+ {"pipeline",
+ Arr{Value{Doc{{"$match", pipelineMatchExpr}}},
+ Value{Doc{{"$sort", Doc{{"lastmod", 1}}}}}}}};
+ };
+
+ constexpr auto collWithNsIncrementalFacetName = "collWithNsIncremental"_sd;
+ constexpr auto collWithUUIDIncrementalFacetName = "collWithUUIDIncremental"_sd;
+ constexpr auto collWithNsNonIncrementalFacetName = "collWithNsNonIncremental"_sd;
+ constexpr auto collWithUUIDNonIncrementalFacetName = "collWithUUIDNonIncremental"_sd;
+
+ stages.emplace_back(DocumentSourceFacet::createFromBson(
+ // TODO SERVER-53283: Once 5.0 has branched out, the 'collWithNsIncremental' and
+ // 'collWithNsNonIncremental' branches will no longer be needed.
+ Doc{{"$facet",
+ Doc{{collWithNsIncrementalFacetName,
+ Arr{Value{Doc{{"$match",
+ Doc{{"timestamp", Doc{{"$exists", 0}}},
+ {"lastmodEpoch", sinceVersion.epoch()}}}}},
+ Value{Doc{
+ {"$lookup",
+ buildLookUpStageFn(false /* withUuid */, true /* incremental */)}}}}},
+ {collWithUUIDIncrementalFacetName,
+ Arr{Value{Doc{{"$match",
+ Doc{{"timestamp", Doc{{"$exists", 1}}},
+ {"lastmodEpoch", sinceVersion.epoch()}}}}},
+ Value{
+ Doc{{"$lookup",
+ buildLookUpStageFn(true /* withUuid */, true /* incremental */)}}}}},
+ {collWithNsNonIncrementalFacetName,
+ Arr{Value{Doc{{"$match",
+ Doc{{"timestamp", Doc{{"$exists", 0}}},
+ {"lastmodEpoch", Doc{{"$ne", sinceVersion.epoch()}}}}}}},
+ Value{Doc{
+ {"$lookup",
+ buildLookUpStageFn(false /* withUuid */, false /* incremental */)}}}}},
+ {collWithUUIDNonIncrementalFacetName,
+ Arr{Value{Doc{{"$match",
+ Doc{{"timestamp", Doc{{"$exists", 1}}},
+ {"lastmodEpoch", Doc{{"$ne", sinceVersion.epoch()}}}}}}},
+ Value{Doc{
+ {"$lookup",
+ buildLookUpStageFn(true /* withUuid */, false /* incremental */)}}}}}}}}
+ .toBson()
+ .firstElement(),
+ expCtx));
+
+ // 3. Collapse the arrays output by $facet (at most one of them is non-empty) into a single
+ // array 'coll'.
+ // {
+ // "$project": {
+ // "_id": true,
+ // "coll": {
+ // "$setUnion": [
+ // "$collWithNsIncremental",
+ // "$collWithUUIDIncremental",
+ // "$collWithNsNonIncremental",
+ // "$collWithUUIDNonIncremental"
+ // ]
+ // }
+ // }
+ // }
+ stages.emplace_back(DocumentSourceProject::createFromBson(
+ Doc{{"$project",
+ Doc{{"coll",
+ Doc{{"$setUnion",
+ Arr{Value{"$" + collWithNsIncrementalFacetName},
+ Value{"$" + collWithUUIDIncrementalFacetName},
+ Value{"$" + collWithNsNonIncrementalFacetName},
+ Value{"$" + collWithUUIDNonIncrementalFacetName}}}}}}}}
+ .toBson()
+ .firstElement(),
+ expCtx));
+
+ // 4. Unwind the 'coll' array (which has at most one element).
+ // {
+ // "$unwind": {
+ // "path": "$coll"
+ // }
+ // }
+ stages.emplace_back(DocumentSourceUnwind::createFromBson(
+ Doc{{"$unwind", Doc{{"path", "$coll"_sd}}}}.toBson().firstElement(), expCtx));
+
+ // 5. Promote the 'coll' document to the top level.
+ // {
+ // "$replaceRoot": {
+ // "newRoot": "$coll"
+ // }
+ // }
+ stages.emplace_back(DocumentSourceReplaceRoot::createFromBson(
+ Doc{{"$replaceRoot", Doc{{"newRoot", "$coll"_sd}}}}.toBson().firstElement(), expCtx));
+
+ // 6. Unwind the 'chunks' array.
+ // {
+ // "$unwind": {
+ // "path": "$chunks",
+ // "preserveNullAndEmptyArrays": true,
+ // "includeArrayIndex": "chunksArrayIndex"
+ // }
+ // }
+ constexpr auto chunksArrayIndexFieldName = "chunksArrayIndex"_sd;
+ stages.emplace_back(DocumentSourceUnwind::createFromBson(
+ Doc{{"$unwind",
+ Doc{{"path", "$" + chunksLookupOutputFieldName},
+ {"preserveNullAndEmptyArrays", true},
+ {"includeArrayIndex", chunksArrayIndexFieldName}}}}
+ .toBson()
+ .firstElement(),
+ expCtx));
+
+ // 7. After unwinding the chunks we are left with the same collection metadata repeated for each
+ // one of the chunks. To reduce the size of the response, only keep the collection metadata for
+ // the first result entry and omit it from the following ones.
+ // {
+ // $replaceRoot: {
+ // newRoot: {$cond: [{$gt: ["$chunksArrayIndex", 0]}, {chunks: "$chunks"}, "$$ROOT"]}
+ // }
+ // }
+ stages.emplace_back(DocumentSourceReplaceRoot::createFromBson(
+ Doc{{"$replaceRoot",
+ Doc{{"newRoot",
+ Doc{{"$cond",
+ Arr{Value{
+ Doc{{"$gt", Arr{Value{"$" + chunksArrayIndexFieldName}, Value{0}}}}},
+ Value{Doc{
+ {chunksLookupOutputFieldName, "$" + chunksLookupOutputFieldName}}},
+ Value{"$$ROOT"_sd}}}}}}}}
+ .toBson()
+ .firstElement(),
+ expCtx));
+
+ auto pipeline = Pipeline::create(std::move(stages), expCtx);
+ auto serializedPipeline = pipeline->serializeToBson();
+ return AggregateCommand(CollectionType::ConfigNS, std::move(serializedPipeline));
+}
+
} // namespace
ShardingCatalogClientImpl::ShardingCatalogClientImpl() = default;
@@ -437,6 +713,66 @@ StatusWith<std::vector<ChunkType>> ShardingCatalogClientImpl::getChunks(
return chunks;
}
+std::pair<CollectionType, std::vector<ChunkType>> ShardingCatalogClientImpl::getCollectionAndChunks(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ChunkVersion& sinceVersion,
+ const repl::ReadConcernArgs& readConcern) {
+ auto aggRequest = makeCollectionAndChunksAggregation(opCtx, nss, sinceVersion);
+ aggRequest.setReadConcern(readConcern.toBSONInner());
+ aggRequest.setWriteConcern(WriteConcernOptions());
+
+ const auto readPref = (serverGlobalParams.clusterRole == ClusterRole::ConfigServer)
+ ? ReadPreferenceSetting()
+ : Grid::get(opCtx)->readPreferenceWithConfigTime(kConfigReadSelector);
+ aggRequest.setUnwrappedReadPref(readPref.toContainingBSON());
+
+ // Run the aggregation
+ std::vector<BSONObj> aggResult;
+ auto callback = [&aggResult](const std::vector<BSONObj>& batch) {
+ aggResult.insert(aggResult.end(),
+ std::make_move_iterator(batch.begin()),
+ std::make_move_iterator(batch.end()));
+ return true;
+ };
+ uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getConfigShard()->runAggregation(
+ opCtx, aggRequest, callback));
+
+ uassert(ErrorCodes::NamespaceNotFound,
+ stream() << "Collection " << nss.ns() << " not found",
+ !aggResult.empty());
+
+ // The first aggregation result document has the config.collections entry plus the first
+ // returned chunk. Since the CollectionType idl is 'strict: false', it will ignore the foreign
+ // 'chunks' field joined onto it.
+ const CollectionType coll(aggResult.front());
+
+ uassert(ErrorCodes::NamespaceNotFound,
+ str::stream() << "Collection " << nss.ns() << " is dropped.",
+ !coll.getDropped());
+
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ stream() << "No chunks were found for the collection " << nss,
+ aggResult.front().hasField("chunks"));
+
+ std::vector<ChunkType> chunks;
+ chunks.reserve(aggResult.size());
+ for (const auto& elem : aggResult) {
+ const auto chunkElem = elem.getField("chunks");
+ if (!chunkElem) {
+ // Only the first result document (and in that case, the only one) may be missing the
+ // 'chunks' field. That case is already caught by the uassert above.
+ static constexpr auto msg = "No chunks found in aggregation result";
+ LOGV2_ERROR(5487400, msg, "elem"_attr = elem);
+ uasserted(5487401, msg);
+ }
+
+ auto chunkRes = uassertStatusOK(ChunkType::fromConfigBSON(chunkElem.Obj()));
+ chunks.emplace_back(std::move(chunkRes));
+ }
+ return {std::move(coll), std::move(chunks)};
+}
+
StatusWith<std::vector<TagsType>> ShardingCatalogClientImpl::getTagsForCollection(
OperationContext* opCtx, const NamespaceString& nss) {
auto findStatus = _exhaustiveFindOnConfig(opCtx,
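[Editor's note] Each document returned by this aggregation is a config.collections entry with one joined chunk embedded under 'chunks', so a single BSONObj parses as both types. A sketch of the per-document parse, assuming 'resultDoc' is one BSONObj from the response batch (field values illustrative):

    // Shape of one result document (illustrative):
    //   { _id: "db.coll", lastmodEpoch: ObjectId(...), key: { ... }, ...,
    //     chunks: { min: { ... }, max: { ... }, lastmod: Timestamp(...), shard: "shard0" } }
    const CollectionType coll(resultDoc);  // non-strict IDL ignores the foreign 'chunks' field
    const auto chunk =
        uassertStatusOK(ChunkType::fromConfigBSON(resultDoc["chunks"].Obj()));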
diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.h b/src/mongo/s/catalog/sharding_catalog_client_impl.h
index 767b4ae3c17..3d513b93c6e 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_impl.h
+++ b/src/mongo/s/catalog/sharding_catalog_client_impl.h
@@ -94,6 +94,12 @@ public:
repl::ReadConcernLevel readConcern,
const boost::optional<BSONObj>& hint = boost::none) override;
+ std::pair<CollectionType, std::vector<ChunkType>> getCollectionAndChunks(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ChunkVersion& sinceVersion,
+ const repl::ReadConcernArgs& readConcern) override;
+
StatusWith<std::vector<TagsType>> getTagsForCollection(OperationContext* opCtx,
const NamespaceString& nss) override;
diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
index deec7539db3..f0eee362746 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
@@ -87,6 +87,14 @@ StatusWith<std::vector<ChunkType>> ShardingCatalogClientMock::getChunks(
return {ErrorCodes::InternalError, "Method not implemented"};
}
+std::pair<CollectionType, std::vector<ChunkType>> ShardingCatalogClientMock::getCollectionAndChunks(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ChunkVersion& sinceVersion,
+ const repl::ReadConcernArgs& readConcern) {
+ uasserted(ErrorCodes::InternalError, "Method not implemented");
+}
+
StatusWith<std::vector<TagsType>> ShardingCatalogClientMock::getTagsForCollection(
OperationContext* opCtx, const NamespaceString& nss) {
return {ErrorCodes::InternalError, "Method not implemented"};
diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.h b/src/mongo/s/catalog/sharding_catalog_client_mock.h
index 0930579d55d..81e781b4d97 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_mock.h
+++ b/src/mongo/s/catalog/sharding_catalog_client_mock.h
@@ -70,6 +70,12 @@ public:
repl::ReadConcernLevel readConcern,
const boost::optional<BSONObj>& hint) override;
+ std::pair<CollectionType, std::vector<ChunkType>> getCollectionAndChunks(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ChunkVersion& sinceVersion,
+ const repl::ReadConcernArgs& readConcern) override;
+
StatusWith<std::vector<TagsType>> getTagsForCollection(OperationContext* opCtx,
const NamespaceString& nss) override;
diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp
index 3b473c8f335..23a44bfba92 100644
--- a/src/mongo/s/catalog_cache.cpp
+++ b/src/mongo/s/catalog_cache.cpp
@@ -218,22 +218,6 @@ StatusWith<ChunkManager> CatalogCache::_getCollectionRoutingInfoAt(
if (acquireTries == kMaxInconsistentRoutingInfoRefreshAttempts) {
return ex.toStatus();
}
- } catch (ExceptionFor<ErrorCodes::BadValue>& ex) {
- // TODO SERVER-53283: Remove once 5.0 has branched out.
- // This would happen when the query to config.chunks fails because the index
- // specified in the 'hint' provided by ConfigServerCatalogCache loader does no
- // longer exist because it was dropped as part of the FCV upgrade/downgrade process
- // to/from 5.0.
- LOGV2_FOR_CATALOG_REFRESH(5310502,
- 0,
- "Collection refresh failed",
- "namespace"_attr = nss,
- "exception"_attr = redact(ex));
- _stats.totalRefreshWaitTimeMicros.addAndFetch(t.micros());
- acquireTries++;
- if (acquireTries == kMaxInconsistentRoutingInfoRefreshAttempts) {
- return ex.toStatus();
- }
} catch (ExceptionFor<ErrorCodes::QueryPlanKilled>& ex) {
// TODO SERVER-53283: Remove once 5.0 has branched out.
// This would happen when the query to config.chunks is killed because the index it
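[Editor's note] The BadValue branch above is deleted because the aggregation no longer passes a 'hint', while the QueryPlanKilled branch survives with the same retry shape. A condensed sketch of that surviving pattern, reusing the names from the deleted branch and the surrounding function:

    } catch (ExceptionFor<ErrorCodes::QueryPlanKilled>& ex) {
        // TODO SERVER-53283: the index backing the chunks query was dropped mid-query
        // during the FCV upgrade/downgrade; count the attempt and retry the refresh.
        _stats.totalRefreshWaitTimeMicros.addAndFetch(t.micros());
        if (++acquireTries == kMaxInconsistentRoutingInfoRefreshAttempts) {
            return ex.toStatus();
        }
    }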
diff --git a/src/mongo/s/catalog_cache_refresh_test.cpp b/src/mongo/s/catalog_cache_refresh_test.cpp
index eafa39348ae..d650c22b865 100644
--- a/src/mongo/s/catalog_cache_refresh_test.cpp
+++ b/src/mongo/s/catalog_cache_refresh_test.cpp
@@ -32,7 +32,7 @@
#include "mongo/platform/basic.h"
#include "mongo/db/concurrency/locker_noop.h"
-#include "mongo/db/query/query_request_helper.h"
+#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_database.h"
@@ -70,9 +70,11 @@ protected:
}());
}
- void expectGetCollectionWithReshardingFields(OID epoch,
- const ShardKeyPattern& shardKeyPattern,
- UUID reshardingUUID) {
+ void expectCollectionAndChunksAggregationWithReshardingFields(
+ OID epoch,
+ const ShardKeyPattern& shardKeyPattern,
+ UUID reshardingUUID,
+ const std::vector<ChunkType>& chunks) {
expectFindSendBSONObjVector(kConfigHostAndPort, [&]() {
auto collType = getDefaultCollectionType(epoch, shardKeyPattern);
@@ -80,7 +82,15 @@ protected:
reshardingFields.setUuid(reshardingUUID);
collType.setReshardingFields(std::move(reshardingFields));
- return std::vector<BSONObj>{collType.toBSON()};
+ std::vector<BSONObj> aggResult;
+ std::transform(chunks.begin(),
+ chunks.end(),
+ std::back_inserter(aggResult),
+ [&collType](const auto& chunk) {
+ return collType.toBSON().addFields(
+ BSON("chunks" << chunk.toConfigBSON()));
+ });
+ return aggResult;
}());
}
@@ -101,37 +111,28 @@ TEST_F(CatalogCacheRefreshTest, FullLoad) {
expectGetDatabase();
- expectGetCollectionWithReshardingFields(epoch, shardKeyPattern, reshardingUUID);
- expectFindSendBSONObjVector(kConfigHostAndPort, [&]() {
- ChunkVersion version(1, 0, epoch, boost::none /* timestamp */);
+ ChunkVersion version(1, 0, epoch, boost::none /* timestamp */);
- ChunkType chunk1(kNss,
- {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << -100)},
- version,
- {"0"});
- chunk1.setName(OID::gen());
- version.incMinor();
+ ChunkType chunk1(
+ kNss, {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << -100)}, version, {"0"});
+ chunk1.setName(OID::gen());
+ version.incMinor();
- ChunkType chunk2(kNss, {BSON("_id" << -100), BSON("_id" << 0)}, version, {"1"});
- chunk2.setName(OID::gen());
- version.incMinor();
+ ChunkType chunk2(kNss, {BSON("_id" << -100), BSON("_id" << 0)}, version, {"1"});
+ chunk2.setName(OID::gen());
+ version.incMinor();
- ChunkType chunk3(kNss, {BSON("_id" << 0), BSON("_id" << 100)}, version, {"0"});
- chunk3.setName(OID::gen());
- version.incMinor();
+ ChunkType chunk3(kNss, {BSON("_id" << 0), BSON("_id" << 100)}, version, {"0"});
+ chunk3.setName(OID::gen());
+ version.incMinor();
- ChunkType chunk4(kNss,
- {BSON("_id" << 100), shardKeyPattern.getKeyPattern().globalMax()},
- version,
- {"1"});
- chunk4.setName(OID::gen());
- version.incMinor();
+ ChunkType chunk4(
+ kNss, {BSON("_id" << 100), shardKeyPattern.getKeyPattern().globalMax()}, version, {"1"});
+ chunk4.setName(OID::gen());
+ version.incMinor();
- return std::vector<BSONObj>{chunk1.toConfigBSON(),
- chunk2.toConfigBSON(),
- chunk3.toConfigBSON(),
- chunk4.toConfigBSON()};
- }());
+ expectCollectionAndChunksAggregationWithReshardingFields(
+ epoch, shardKeyPattern, reshardingUUID, {chunk1, chunk2, chunk3, chunk4});
auto cm = *future.default_timed_get();
ASSERT(cm.isSharded());
@@ -242,14 +243,20 @@ TEST_F(CatalogCacheRefreshTest, FullLoadNoChunksFound) {
expectGetDatabase();
// Return no chunks three times, which is how frequently the catalog cache retries
- expectGetCollection(epoch, shardKeyPattern);
- expectFindSendBSONObjVector(kConfigHostAndPort, {});
+ expectFindSendBSONObjVector(kConfigHostAndPort, [&] {
+ const auto coll = getDefaultCollectionType(epoch, shardKeyPattern);
+ return std::vector<BSONObj>{coll.toBSON()};
+ }());
- expectGetCollection(epoch, shardKeyPattern);
- expectFindSendBSONObjVector(kConfigHostAndPort, {});
+ expectFindSendBSONObjVector(kConfigHostAndPort, [&] {
+ const auto coll = getDefaultCollectionType(epoch, shardKeyPattern);
+ return std::vector<BSONObj>{coll.toBSON()};
+ }());
- expectGetCollection(epoch, shardKeyPattern);
- expectFindSendBSONObjVector(kConfigHostAndPort, {});
+ expectFindSendBSONObjVector(kConfigHostAndPort, [&] {
+ const auto coll = getDefaultCollectionType(epoch, shardKeyPattern);
+ return std::vector<BSONObj>{coll.toBSON()};
+ }());
try {
auto cm = *future.default_timed_get();
@@ -271,14 +278,20 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadNoChunksFound) {
auto future = scheduleRoutingInfoForcedRefresh(kNss);
// Return no chunks three times, which is how frequently the catalog cache retries
- expectGetCollection(epoch, shardKeyPattern);
- expectFindSendBSONObjVector(kConfigHostAndPort, {});
+ expectFindSendBSONObjVector(kConfigHostAndPort, [&] {
+ const auto coll = getDefaultCollectionType(epoch, shardKeyPattern);
+ return std::vector<BSONObj>{coll.toBSON()};
+ }());
- expectGetCollection(epoch, shardKeyPattern);
- expectFindSendBSONObjVector(kConfigHostAndPort, {});
+ expectFindSendBSONObjVector(kConfigHostAndPort, [&] {
+ const auto coll = getDefaultCollectionType(epoch, shardKeyPattern);
+ return std::vector<BSONObj>{coll.toBSON()};
+ }());
- expectGetCollection(epoch, shardKeyPattern);
- expectFindSendBSONObjVector(kConfigHostAndPort, {});
+ expectFindSendBSONObjVector(kConfigHostAndPort, [&] {
+ const auto coll = getDefaultCollectionType(epoch, shardKeyPattern);
+ return std::vector<BSONObj>{coll.toBSON()};
+ }());
try {
auto cm = *future.default_timed_get();
@@ -298,15 +311,15 @@ TEST_F(CatalogCacheRefreshTest, ChunksBSONCorrupted) {
expectGetDatabase();
// Return a response in which the second chunk document is corrupted and fails to parse
- expectGetCollection(epoch, shardKeyPattern);
expectFindSendBSONObjVector(kConfigHostAndPort, [&] {
- return std::vector<BSONObj>{ChunkType(
- kNss,
- {shardKeyPattern.getKeyPattern().globalMin(),
- BSON("_id" << 0)},
- ChunkVersion(1, 0, epoch, boost::none /* timestamp */),
- {"0"})
- .toConfigBSON(),
+ const auto coll = getDefaultCollectionType(epoch, shardKeyPattern);
+ const auto chunk1 =
+ ChunkType(kNss,
+ {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << 0)},
+ ChunkVersion(1, 0, epoch, boost::none /* timestamp */),
+ {"0"});
+ return std::vector<BSONObj>{coll.toBSON().addFields(
+ BSON("chunks" << chunk1.toConfigBSON())),
BSON("BadValue"
<< "This value should not be in a chunk config document")};
}());
@@ -350,20 +363,19 @@ TEST_F(CatalogCacheRefreshTest, FullLoadMissingChunkWithLowestVersion) {
chunk4.setName(OID::gen());
version.incMinor();
- return std::vector<BSONObj>{
- chunk2.toConfigBSON(), chunk3.toConfigBSON(), chunk4.toConfigBSON()};
+ return std::vector<ChunkType>{chunk2, chunk3, chunk4};
}();
// Return incomplete set of chunks three times, which is how frequently the catalog cache
// retries
- expectGetCollection(epoch, shardKeyPattern);
- expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks);
+ expectCollectionAndChunksAggregation(
+ kNss, epoch, UUID::gen(), shardKeyPattern, incompleteChunks);
- expectGetCollection(epoch, shardKeyPattern);
- expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks);
+ expectCollectionAndChunksAggregation(
+ kNss, epoch, UUID::gen(), shardKeyPattern, incompleteChunks);
- expectGetCollection(epoch, shardKeyPattern);
- expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks);
+ expectCollectionAndChunksAggregation(
+ kNss, epoch, UUID::gen(), shardKeyPattern, incompleteChunks);
try {
auto cm = *future.default_timed_get();
@@ -405,20 +417,19 @@ TEST_F(CatalogCacheRefreshTest, FullLoadMissingChunkWithHighestVersion) {
chunk4.setName(OID::gen());
version.incMinor();
- return std::vector<BSONObj>{
- chunk2.toConfigBSON(), chunk3.toConfigBSON(), chunk4.toConfigBSON()};
+ return std::vector<ChunkType>{chunk2, chunk3, chunk4};
}();
// Return incomplete set of chunks three times, which is how frequently the catalog cache
// retries
- expectGetCollection(epoch, shardKeyPattern);
- expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks);
+ expectCollectionAndChunksAggregation(
+ kNss, epoch, UUID::gen(), shardKeyPattern, incompleteChunks);
- expectGetCollection(epoch, shardKeyPattern);
- expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks);
+ expectCollectionAndChunksAggregation(
+ kNss, epoch, UUID::gen(), shardKeyPattern, incompleteChunks);
- expectGetCollection(epoch, shardKeyPattern);
- expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks);
+ expectCollectionAndChunksAggregation(
+ kNss, epoch, UUID::gen(), shardKeyPattern, incompleteChunks);
try {
auto cm = *future.default_timed_get();
@@ -463,20 +474,19 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadMissingChunkWithLowestVersion) {
chunk4.setName(OID::gen());
version.incMinor();
- return std::vector<BSONObj>{
- chunk2.toConfigBSON(), chunk3.toConfigBSON(), chunk4.toConfigBSON()};
+ return std::vector<ChunkType>{chunk2, chunk3, chunk4};
}();
// Return incomplete set of chunks three times, which is how frequently the catalog cache
// retries
- expectGetCollection(epoch, shardKeyPattern);
- expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks);
+ expectCollectionAndChunksAggregation(
+ kNss, epoch, UUID::gen(), shardKeyPattern, incompleteChunks);
- expectGetCollection(epoch, shardKeyPattern);
- expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks);
+ expectCollectionAndChunksAggregation(
+ kNss, epoch, UUID::gen(), shardKeyPattern, incompleteChunks);
- expectGetCollection(epoch, shardKeyPattern);
- expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks);
+ expectCollectionAndChunksAggregation(
+ kNss, epoch, UUID::gen(), shardKeyPattern, incompleteChunks);
try {
auto cm = *future.default_timed_get();
@@ -520,20 +530,19 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadMissingChunkWithHighestVersion) {
chunk4.setName(OID::gen());
version.incMinor();
- return std::vector<BSONObj>{
- chunk2.toConfigBSON(), chunk3.toConfigBSON(), chunk4.toConfigBSON()};
+ return std::vector<ChunkType>{chunk2, chunk3, chunk4};
}();
// Return incomplete set of chunks three times, which is how frequently the catalog cache
// retries
- expectGetCollection(epoch, shardKeyPattern);
- expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks);
+ expectCollectionAndChunksAggregation(
+ kNss, epoch, UUID::gen(), shardKeyPattern, incompleteChunks);
- expectGetCollection(epoch, shardKeyPattern);
- expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks);
+ expectCollectionAndChunksAggregation(
+ kNss, epoch, UUID::gen(), shardKeyPattern, incompleteChunks);
- expectGetCollection(epoch, shardKeyPattern);
- expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks);
+ expectCollectionAndChunksAggregation(
+ kNss, epoch, UUID::gen(), shardKeyPattern, incompleteChunks);
try {
auto cm = *future.default_timed_get();
@@ -567,18 +576,28 @@ TEST_F(CatalogCacheRefreshTest, ChunkEpochChangeDuringIncrementalLoad) {
{"1"});
chunk2.setName(OID::gen());
- return std::vector<BSONObj>{chunk1.toConfigBSON(), chunk2.toConfigBSON()};
+ return std::vector<ChunkType>{chunk1, chunk2};
}();
// Return set of chunks, one of which has different epoch. Do it three times, which is how
// frequently the catalog cache retries.
- expectGetCollection(initialRoutingInfo.getVersion().epoch(), shardKeyPattern);
- expectFindSendBSONObjVector(kConfigHostAndPort, inconsistentChunks);
- expectGetCollection(initialRoutingInfo.getVersion().epoch(), shardKeyPattern);
- expectFindSendBSONObjVector(kConfigHostAndPort, inconsistentChunks);
-
- expectGetCollection(initialRoutingInfo.getVersion().epoch(), shardKeyPattern);
- expectFindSendBSONObjVector(kConfigHostAndPort, inconsistentChunks);
+ expectCollectionAndChunksAggregation(kNss,
+ initialRoutingInfo.getVersion().epoch(),
+ UUID::gen(),
+ shardKeyPattern,
+ inconsistentChunks);
+
+ expectCollectionAndChunksAggregation(kNss,
+ initialRoutingInfo.getVersion().epoch(),
+ UUID::gen(),
+ shardKeyPattern,
+ inconsistentChunks);
+
+ expectCollectionAndChunksAggregation(kNss,
+ initialRoutingInfo.getVersion().epoch(),
+ UUID::gen(),
+ shardKeyPattern,
+ inconsistentChunks);
try {
auto cm = *future.default_timed_get();
@@ -607,14 +626,23 @@ TEST_F(CatalogCacheRefreshTest, ChunkEpochChangeDuringIncrementalLoadRecoveryAft
// the situation where a collection existed with epoch0, we started a refresh for that
// collection, the cursor yielded and while it yielded another node dropped the collection and
// recreated it with different epoch and chunks.
- expectGetCollection(oldVersion.epoch(), shardKeyPattern);
onFindCommand([&](const RemoteCommandRequest& request) {
- auto opMsg = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj);
- auto diffQuery = query_request_helper::makeFromFindCommandForTests(opMsg.body);
- ASSERT_BSONOBJ_EQ(BSON("ns" << kNss.ns() << "lastmod"
- << BSON("$gte" << Timestamp(oldVersion.majorVersion(),
- oldVersion.minorVersion()))),
- diffQuery->getFilter());
+ const auto opMsg = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj);
+ const auto aggRequest = unittest::assertGet(
+ aggregation_request_helper::parseFromBSONForTests(kNss, opMsg.body));
+ const auto& pipeline = aggRequest.getPipeline();
+
+ ASSERT_EQ(pipeline[1]["$facet"]["collWithNsIncremental"]
+ .Array()[0]["$match"]["lastmodEpoch"]
+ .OID(),
+ oldVersion.epoch());
+ ASSERT_BSONOBJ_EQ(pipeline[1]["$facet"]["collWithNsNonIncremental"]
+ .Array()[0]["$match"]["lastmodEpoch"]
+ .Obj(),
+ BSON("$ne" << oldVersion.epoch()));
+
+ const auto collBSON =
+ getDefaultCollectionType(oldVersion.epoch(), shardKeyPattern).toBSON();
oldVersion.incMajor();
ChunkType chunk1(kNss,
@@ -631,20 +659,29 @@ TEST_F(CatalogCacheRefreshTest, ChunkEpochChangeDuringIncrementalLoadRecoveryAft
{"1"});
chunk3.setName(OID::gen());
- return std::vector<BSONObj>{chunk1.toConfigBSON(), chunk3.toConfigBSON()};
+ const auto chunk1BSON = collBSON.addFields(BSON("chunks" << chunk1.toConfigBSON()));
+ const auto chunk3BSON = collBSON.addFields(BSON("chunks" << chunk3.toConfigBSON()));
+ return std::vector<BSONObj>{chunk1BSON, chunk3BSON};
});
// On the second retry attempt, return the correct set of chunks from the recreated collection
- expectGetCollection(newEpoch, shardKeyPattern);
-
ChunkVersion newVersion(5, 0, newEpoch, boost::none /* timestamp */);
onFindCommand([&](const RemoteCommandRequest& request) {
- // Ensure it is a differential query but starting from version zero (to fetch all the
- // chunks) since the incremental refresh above produced a different version
- auto opMsg = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj);
- auto diffQuery = query_request_helper::makeFromFindCommandForTests(opMsg.body);
- ASSERT_BSONOBJ_EQ(BSON("ns" << kNss.ns() << "lastmod" << BSON("$gte" << Timestamp(0, 0))),
- diffQuery->getFilter());
+ const auto opMsg = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj);
+ const auto aggRequest = unittest::assertGet(
+ aggregation_request_helper::parseFromBSONForTests(kNss, opMsg.body));
+ const auto& pipeline = aggRequest.getPipeline();
+
+ ASSERT_EQ(pipeline[1]["$facet"]["collWithNsIncremental"]
+ .Array()[0]["$match"]["lastmodEpoch"]
+ .OID(),
+ oldVersion.epoch());
+ ASSERT_BSONOBJ_EQ(pipeline[1]["$facet"]["collWithNsNonIncremental"]
+ .Array()[0]["$match"]["lastmodEpoch"]
+ .Obj(),
+ BSON("$ne" << oldVersion.epoch()));
+
+ const auto collBSON = getDefaultCollectionType(newEpoch, shardKeyPattern).toBSON();
ChunkType chunk1(kNss,
{shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << 0)},
@@ -663,8 +700,10 @@ TEST_F(CatalogCacheRefreshTest, ChunkEpochChangeDuringIncrementalLoadRecoveryAft
{"1"});
chunk3.setName(OID::gen());
- return std::vector<BSONObj>{
- chunk1.toConfigBSON(), chunk2.toConfigBSON(), chunk3.toConfigBSON()};
+ const auto chunk1BSON = collBSON.addFields(BSON("chunks" << chunk1.toConfigBSON()));
+ const auto chunk2BSON = collBSON.addFields(BSON("chunks" << chunk2.toConfigBSON()));
+ const auto chunk3BSON = collBSON.addFields(BSON("chunks" << chunk3.toConfigBSON()));
+ return std::vector<BSONObj>{chunk1BSON, chunk2BSON, chunk3BSON};
});
auto cm = *future.default_timed_get();
@@ -687,18 +726,27 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterCollectionEpochChange) {
auto future = scheduleRoutingInfoIncrementalRefresh(kNss);
+ ChunkVersion oldVersion = initialRoutingInfo.getVersion();
ChunkVersion newVersion(1, 0, OID::gen(), boost::none /* timestamp */);
- // Return collection with a different epoch
- expectGetCollection(newVersion.epoch(), shardKeyPattern);
-
- // Return set of chunks, which represent a split
+ // Return collection with a different epoch and a set of chunks, which represent a split
onFindCommand([&](const RemoteCommandRequest& request) {
- // Ensure it is a differential query but starting from version zero
- auto opMsg = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj);
- auto diffQuery = query_request_helper::makeFromFindCommandForTests(opMsg.body);
- ASSERT_BSONOBJ_EQ(BSON("ns" << kNss.ns() << "lastmod" << BSON("$gte" << Timestamp(0, 0))),
- diffQuery->getFilter());
+ const auto opMsg = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj);
+ const auto aggRequest = unittest::assertGet(
+ aggregation_request_helper::parseFromBSONForTests(kNss, opMsg.body));
+ const auto& pipeline = aggRequest.getPipeline();
+
+ ASSERT_EQ(pipeline[1]["$facet"]["collWithNsIncremental"]
+ .Array()[0]["$match"]["lastmodEpoch"]
+ .OID(),
+ oldVersion.epoch());
+ ASSERT_BSONOBJ_EQ(pipeline[1]["$facet"]["collWithNsNonIncremental"]
+ .Array()[0]["$match"]["lastmodEpoch"]
+ .Obj(),
+ BSON("$ne" << oldVersion.epoch()));
+
+ const auto collBSON =
+ getDefaultCollectionType(newVersion.epoch(), shardKeyPattern).toBSON();
ChunkType chunk1(kNss,
{shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << 0)},
@@ -713,7 +761,9 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterCollectionEpochChange) {
{"1"});
chunk2.setName(OID::gen());
- return std::vector<BSONObj>{chunk1.toConfigBSON(), chunk2.toConfigBSON()};
+ const auto chunk1BSON = collBSON.addFields(BSON("chunks" << chunk1.toConfigBSON()));
+ const auto chunk2BSON = collBSON.addFields(BSON("chunks" << chunk2.toConfigBSON()));
+ return std::vector<BSONObj>{chunk1BSON, chunk2BSON};
});
auto cm = *future.default_timed_get();
@@ -736,17 +786,26 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterSplit) {
auto future = scheduleRoutingInfoIncrementalRefresh(kNss);
- expectGetCollection(version.epoch(), shardKeyPattern);
-
// Return set of chunks, which represent a split
onFindCommand([&](const RemoteCommandRequest& request) {
- // Ensure it is a differential query
- auto opMsg = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj);
- auto diffQuery = query_request_helper::makeFromFindCommandForTests(opMsg.body);
+ const auto opMsg = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj);
+ const auto aggRequest = unittest::assertGet(
+ aggregation_request_helper::parseFromBSONForTests(kNss, opMsg.body));
+ const auto& pipeline = aggRequest.getPipeline();
+
ASSERT_BSONOBJ_EQ(
- BSON("ns" << kNss.ns() << "lastmod"
- << BSON("$gte" << Timestamp(version.majorVersion(), version.minorVersion()))),
- diffQuery->getFilter());
+ pipeline[1]["$facet"]["collWithNsIncremental"]
+ .Array()[1]["$lookup"]["pipeline"]
+ .Array()[0]["$match"]["lastmod"]
+ .Obj(),
+ BSON("$gte" << Timestamp(version.majorVersion(), version.minorVersion())));
+
+ ASSERT_EQ(pipeline[1]["$facet"]["collWithNsIncremental"]
+ .Array()[0]["$match"]["lastmodEpoch"]
+ .OID(),
+ version.epoch());
+
+ const auto collBSON = getDefaultCollectionType(version.epoch(), shardKeyPattern).toBSON();
version.incMajor();
ChunkType chunk1(
@@ -758,7 +817,9 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterSplit) {
kNss, {BSON("_id" << 0), shardKeyPattern.getKeyPattern().globalMax()}, version, {"0"});
chunk2.setName(OID::gen());
- return std::vector<BSONObj>{chunk1.toConfigBSON(), chunk2.toConfigBSON()};
+ const auto chunk1BSON = collBSON.addFields(BSON("chunks" << chunk1.toConfigBSON()));
+ const auto chunk2BSON = collBSON.addFields(BSON("chunks" << chunk2.toConfigBSON()));
+ return std::vector<BSONObj>{chunk1BSON, chunk2BSON};
});
auto cm = *future.default_timed_get();
@@ -784,23 +845,20 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterMoveWithReshardingFieldsAdde
ChunkVersion expectedDestShardVersion;
- expectGetCollectionWithReshardingFields(version.epoch(), shardKeyPattern, reshardingUUID);
-
// Return set of chunks, which represent a move
- expectFindSendBSONObjVector(kConfigHostAndPort, [&]() {
- version.incMajor();
- expectedDestShardVersion = version;
- ChunkType chunk1(
- kNss, {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << 0)}, version, {"1"});
- chunk1.setName(OID::gen());
+ version.incMajor();
+ expectedDestShardVersion = version;
+ ChunkType chunk1(
+ kNss, {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << 0)}, version, {"1"});
+ chunk1.setName(OID::gen());
- version.incMinor();
- ChunkType chunk2(
- kNss, {BSON("_id" << 0), shardKeyPattern.getKeyPattern().globalMax()}, version, {"0"});
- chunk2.setName(OID::gen());
+ version.incMinor();
+ ChunkType chunk2(
+ kNss, {BSON("_id" << 0), shardKeyPattern.getKeyPattern().globalMax()}, version, {"0"});
+ chunk2.setName(OID::gen());
- return std::vector<BSONObj>{chunk1.toConfigBSON(), chunk2.toConfigBSON()};
- }());
+ expectCollectionAndChunksAggregationWithReshardingFields(
+ version.epoch(), shardKeyPattern, reshardingUUID, {chunk1, chunk2});
auto cm = *future.default_timed_get();
ASSERT(cm.isSharded());
@@ -831,20 +889,17 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterMoveLastChunkWithReshardingF
auto future = scheduleRoutingInfoIncrementalRefresh(kNss);
// The collection type won't have resharding fields this time.
- expectGetCollection(version.epoch(), shardKeyPattern);
-
// Return set of chunks, which represent a move
- expectFindSendBSONObjVector(kConfigHostAndPort, [&]() {
- version.incMajor();
- ChunkType chunk1(kNss,
- {shardKeyPattern.getKeyPattern().globalMin(),
- shardKeyPattern.getKeyPattern().globalMax()},
- version,
- {"1"});
- chunk1.setName(OID::gen());
-
- return std::vector<BSONObj>{chunk1.toConfigBSON()};
- }());
+ version.incMajor();
+ ChunkType chunk1(
+ kNss,
+ {shardKeyPattern.getKeyPattern().globalMin(), shardKeyPattern.getKeyPattern().globalMax()},
+ version,
+ {"1"});
+ chunk1.setName(OID::gen());
+
+ expectCollectionAndChunksAggregation(
+ kNss, version.epoch(), UUID::gen(), shardKeyPattern, {chunk1});
auto cm = *future.default_timed_get();
ASSERT(cm.isSharded());
diff --git a/src/mongo/s/catalog_cache_test_fixture.cpp b/src/mongo/s/catalog_cache_test_fixture.cpp
index 0bb9c38ce9c..5154f98d3ed 100644
--- a/src/mongo/s/catalog_cache_test_fixture.cpp
+++ b/src/mongo/s/catalog_cache_test_fixture.cpp
@@ -180,8 +180,16 @@ ChunkManager CatalogCacheTestFixture::makeChunkManager(
auto future = scheduleRoutingInfoUnforcedRefresh(nss);
expectFindSendBSONObjVector(kConfigHostAndPort, {databaseBSON});
- expectFindSendBSONObjVector(kConfigHostAndPort, {collectionBSON});
- expectFindSendBSONObjVector(kConfigHostAndPort, initialChunks);
+ expectFindSendBSONObjVector(kConfigHostAndPort, [&]() {
+ std::vector<BSONObj> aggResult;
+ std::transform(initialChunks.begin(),
+ initialChunks.end(),
+ std::back_inserter(aggResult),
+ [&collectionBSON](const auto& chunk) {
+ return collectionBSON.addFields(BSON("chunks" << chunk));
+ });
+ return aggResult;
+ }());
return *future.default_timed_get();
}
@@ -205,6 +213,29 @@ void CatalogCacheTestFixture::expectGetCollection(NamespaceString nss,
}());
}
+void CatalogCacheTestFixture::expectCollectionAndChunksAggregation(
+ NamespaceString nss,
+ OID epoch,
+ UUID uuid,
+ const ShardKeyPattern& shardKeyPattern,
+ const std::vector<ChunkType>& chunks) {
+ expectFindSendBSONObjVector(kConfigHostAndPort, [&]() {
+ CollectionType collType(nss, epoch, Date_t::now(), uuid);
+ collType.setKeyPattern(shardKeyPattern.toBSON());
+ collType.setUnique(false);
+ const auto collObj = collType.toBSON();
+
+ std::vector<BSONObj> aggResult;
+ std::transform(chunks.begin(),
+ chunks.end(),
+ std::back_inserter(aggResult),
+ [&collObj](const auto& chunk) {
+ return collObj.addFields(BSON("chunks" << chunk.toConfigBSON()));
+ });
+ return aggResult;
+ }());
+}
+
ChunkManager CatalogCacheTestFixture::loadRoutingTableWithTwoChunksAndTwoShards(
NamespaceString nss) {
@@ -237,8 +268,11 @@ ChunkManager CatalogCacheTestFixture::loadRoutingTableWithTwoChunksAndTwoShardsI
expectGetDatabase(nss);
}
}
- expectGetCollection(nss, epoch, uuid, shardKeyPattern);
expectFindSendBSONObjVector(kConfigHostAndPort, [&]() {
+ CollectionType collType(nss, epoch, Date_t::now(), uuid);
+ collType.setKeyPattern(shardKeyPattern.toBSON());
+ collType.setUnique(false);
+
ChunkVersion version(1, 0, epoch, boost::none /* timestamp */);
ChunkType chunk1(
@@ -251,7 +285,10 @@ ChunkManager CatalogCacheTestFixture::loadRoutingTableWithTwoChunksAndTwoShardsI
chunk2.setName(OID::gen());
version.incMinor();
- return std::vector<BSONObj>{chunk1.toConfigBSON(), chunk2.toConfigBSON()};
+ const auto collObj = collType.toBSON();
+ const auto chunk1Obj = collObj.addFields(BSON("chunks" << chunk1.toConfigBSON()));
+ const auto chunk2Obj = collObj.addFields(BSON("chunks" << chunk2.toConfigBSON()));
+ return std::vector<BSONObj>{chunk1Obj, chunk2Obj};
}());
return *future.default_timed_get();
diff --git a/src/mongo/s/catalog_cache_test_fixture.h b/src/mongo/s/catalog_cache_test_fixture.h
index a42af988697..a1fed893d2d 100644
--- a/src/mongo/s/catalog_cache_test_fixture.h
+++ b/src/mongo/s/catalog_cache_test_fixture.h
@@ -129,6 +129,11 @@ protected:
OID epoch,
UUID uuid,
const ShardKeyPattern& shardKeyPattern);
+ void expectCollectionAndChunksAggregation(NamespaceString nss,
+ OID epoch,
+ UUID uuid,
+ const ShardKeyPattern& shardKeyPattern,
+ const std::vector<ChunkType>& chunks);
const HostAndPort kConfigHostAndPort{"DummyConfig", 1234};
};
diff --git a/src/mongo/s/config_server_catalog_cache_loader.cpp b/src/mongo/s/config_server_catalog_cache_loader.cpp
index 1befe47c719..5c61ba621df 100644
--- a/src/mongo/s/config_server_catalog_cache_loader.cpp
+++ b/src/mongo/s/config_server_catalog_cache_loader.cpp
@@ -37,10 +37,10 @@
#include "mongo/db/client.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/logv2/log.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/grid.h"
-#include "mongo/util/fail_point.h"
namespace mongo {
@@ -48,8 +48,6 @@ using CollectionAndChangedChunks = CatalogCacheLoader::CollectionAndChangedChunk
namespace {
-MONGO_FAIL_POINT_DEFINE(hangBeforeReadingChunks);
-
/**
* Structure representing the generated query and sort order for a chunk diffing operation.
*/
@@ -86,63 +84,22 @@ QueryAndSort createConfigDiffQueryUuid(const UUID& uuid, ChunkVersion collection
*/
CollectionAndChangedChunks getChangedChunks(OperationContext* opCtx,
const NamespaceString& nss,
- ChunkVersion sinceVersion) {
+ ChunkVersion sinceVersion,
+ bool avoidSnapshotForRefresh) {
const auto catalogClient = Grid::get(opCtx)->catalogClient();
- // Decide whether to do a full or partial load based on the state of the collection
- const auto coll = catalogClient->getCollection(opCtx, nss);
- uassert(ErrorCodes::NamespaceNotFound,
- str::stream() << "Collection " << nss.ns() << " is dropped.",
- !coll.getDropped());
-
- // If the collection's epoch has changed, do a full refresh
- const ChunkVersion startingCollectionVersion = (sinceVersion.epoch() == coll.getEpoch())
- ? sinceVersion
- : ChunkVersion(0, 0, coll.getEpoch(), coll.getTimestamp());
-
- // Diff tracker should *always* find at least one chunk if collection exists
- const auto diffQuery = [&]() {
- if (coll.getTimestamp()) {
- return createConfigDiffQueryUuid(coll.getUuid(), startingCollectionVersion);
- } else {
- return createConfigDiffQueryNs(nss, startingCollectionVersion);
- }
- }();
-
- if (MONGO_unlikely(hangBeforeReadingChunks.shouldFail())) {
- LOGV2(5310504, "Hit hangBeforeReadingChunks failpoint");
- hangBeforeReadingChunks.pauseWhileSet(opCtx);
- }
-
- // TODO SERVER-53283: Remove once 5.0 has branched out.
- // Use a hint to make sure the query will use an index. This ensures that the query on
- // config.chunks will only execute if config.chunks is guaranteed to still have the same
- // metadata format as we inferred from the config.collections entry we read.
- // This is because when the config.chunks are patched up as part of the FCV upgrade (or
- // downgrade), first the ns_1_lastmod_1 index (or uuid_1_lastmod_1) is dropped, then the 'ns'
- // (or 'uuid') fields are unset from config.chunks. If the query is forced to use the expected
- // index, we can guarantee that the config.chunks we will read will have the expected format. If
- // it doesn't, it means that it's being patched-up. Then the query will fail and the refresh
- // will be retried, this time expecting the new metadata format.
- const auto hint = coll.getTimestamp()
- ? BSON(ChunkType::collectionUUID() << 1 << ChunkType::lastmod() << 1)
- : BSON(ChunkType::ns() << 1 << ChunkType::lastmod() << 1);
-
- // Query the chunks which have changed
- repl::OpTime opTime;
- const std::vector<ChunkType> changedChunks = uassertStatusOK(
- Grid::get(opCtx)->catalogClient()->getChunks(opCtx,
- diffQuery.query,
- diffQuery.sort,
- boost::none,
- &opTime,
- repl::ReadConcernLevel::kMajorityReadConcern,
- hint));
-
- uassert(ErrorCodes::ConflictingOperationInProgress,
- "No chunks were found for the collection",
- !changedChunks.empty());
-
+ const auto readConcernLevel = !avoidSnapshotForRefresh
+ ? repl::ReadConcernLevel::kSnapshotReadConcern
+ : repl::ReadConcernLevel::kLocalReadConcern;
+ const auto afterClusterTime = (serverGlobalParams.clusterRole == ClusterRole::ConfigServer)
+ ? repl::ReplicationCoordinator::get(opCtx)->getMyLastAppliedOpTime()
+ : Grid::get(opCtx)->configOpTime();
+ const auto readConcern =
+ repl::ReadConcernArgs(LogicalTime(afterClusterTime.getTimestamp()), readConcernLevel);
+
+ auto collAndChunks =
+ catalogClient->getCollectionAndChunks(opCtx, nss, sinceVersion, readConcern);
+ const auto& coll = collAndChunks.first;
return CollectionAndChangedChunks{coll.getEpoch(),
coll.getTimestamp(),
coll.getUuid(),
@@ -151,7 +108,7 @@ CollectionAndChangedChunks getChangedChunks(OperationContext* opCtx,
coll.getUnique(),
coll.getReshardingFields(),
coll.getAllowMigrations(),
- std::move(changedChunks)};
+ std::move(collAndChunks.second)};
}
} // namespace
@@ -207,7 +164,7 @@ SemiFuture<CollectionAndChangedChunks> ConfigServerCatalogCacheLoader::getChunks
getGlobalServiceContext());
auto opCtx = tc->makeOperationContext();
- return getChangedChunks(opCtx.get(), nss, version);
+ return getChangedChunks(opCtx.get(), nss, version, _avoidSnapshotForRefresh);
})
.semi();
}
@@ -225,4 +182,8 @@ SemiFuture<DatabaseType> ConfigServerCatalogCacheLoader::getDatabase(StringData
.semi();
}
+void ConfigServerCatalogCacheLoader::setAvoidSnapshotForRefresh_ForTest() {
+ _avoidSnapshotForRefresh = true;
+}
+
} // namespace mongo
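[Editor's note] The commit title's consistency guarantee comes from this read concern: one aggregation at snapshot read concern observes config.collections and config.chunks at a single cluster time. A sketch of the construction used above, assuming 'afterClusterTime' was obtained as in the diff:

    // Snapshot + afterClusterTime: the collection entry and its chunks are read from a
    // single point-in-time view, so a concurrent metadata commit cannot interleave.
    const auto readConcern =
        repl::ReadConcernArgs(LogicalTime(afterClusterTime.getTimestamp()),
                              repl::ReadConcernLevel::kSnapshotReadConcern);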
diff --git a/src/mongo/s/config_server_catalog_cache_loader.h b/src/mongo/s/config_server_catalog_cache_loader.h
index d16ae428b7f..8c1384946f8 100644
--- a/src/mongo/s/config_server_catalog_cache_loader.h
+++ b/src/mongo/s/config_server_catalog_cache_loader.h
@@ -55,9 +55,23 @@ public:
ChunkVersion version) override;
SemiFuture<DatabaseType> getDatabase(StringData dbName) override;
+ /**
+ * Do not use outside of unit tests.
+ * TODO SERVER-54394 Remove this
+ */
+ void setAvoidSnapshotForRefresh_ForTest();
+
private:
// Thread pool to be used to perform metadata load
std::shared_ptr<ThreadPool> _executor;
+
+ /*
+ * If 'true', avoids using snapshot read concern when refreshing the cache. Only to be used
+ * by unit tests that use the ephemeralForTesting storage engine, which currently does not
+ * support snapshot read concern.
+ * TODO SERVER-54394 Remove this.
+ */
+ bool _avoidSnapshotForRefresh = false;
};
} // namespace mongo
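[Editor's note] A hypothetical test-side usage of the escape hatch declared above (the setup step is an assumption, not part of this commit): unit tests running on ephemeralForTesting cannot serve snapshot reads, so they downgrade the refresh to local read concern before any refresh is scheduled.

    // Hypothetical unit-test setup; TODO SERVER-54394 removes this knob.
    auto loader = std::make_shared<ConfigServerCatalogCacheLoader>();
    loader->setAvoidSnapshotForRefresh_ForTest();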
diff --git a/src/mongo/s/query/sharded_agg_test_fixture.h b/src/mongo/s/query/sharded_agg_test_fixture.h
index 621a7e368a9..8fc5a618051 100644
--- a/src/mongo/s/query/sharded_agg_test_fixture.h
+++ b/src/mongo/s/query/sharded_agg_test_fixture.h
@@ -97,14 +97,7 @@ public:
// Mock the expected config server queries.
expectGetDatabase(nss);
- expectGetCollection(nss, epoch, UUID::gen(), shardKey);
- expectFindSendBSONObjVector(kConfigHostAndPort, [&]() {
- std::vector<BSONObj> response;
- for (auto&& chunk : chunkDistribution) {
- response.push_back(chunk.toConfigBSON());
- }
- return response;
- }());
+ expectCollectionAndChunksAggregation(nss, epoch, UUID::gen(), shardKey, chunkDistribution);
const auto cm = future.default_timed_get();
ASSERT(cm->isSharded());