diff options
author | Haley Connelly <haley.connelly@mongodb.com> | 2020-07-21 19:16:23 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-08-07 22:18:46 +0000 |
commit | 426ed301b2e61107e82822764a855feb27168742 (patch) | |
tree | 4f5a38722f2e658c80ba2be3da8935e9c84415e7 /src | |
parent | f0bd888de22307dad1825323cadd3b708f670ae0 (diff) | |
download | mongo-426ed301b2e61107e82822764a855feb27168742.tar.gz |
SERVER-49290 Support running $lookup locally on shard for config.cache.chunks.* namespaces
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/namespace_string.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/namespace_string.h | 5 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup.cpp | 160 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup.h | 5 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup_test.cpp | 176 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_test.cpp | 50 | ||||
-rw-r--r-- | src/mongo/db/pipeline/sharded_agg_helpers.cpp | 11 |
7 files changed, 342 insertions, 71 deletions
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp index f2b8f148779..6928d8da953 100644 --- a/src/mongo/db/namespace_string.cpp +++ b/src/mongo/db/namespace_string.cpp @@ -231,12 +231,16 @@ bool NamespaceString::isNamespaceAlwaysUnsharded() const { return true; if (ns() == "config.cache.databases" || ns() == "config.cache.collections" || - (db() == "config" && coll().startsWith("cache.chunks"))) + isConfigDotCacheDotChunks()) return true; return false; } +bool NamespaceString::isConfigDotCacheDotChunks() const { + return db() == "config" && coll().startsWith("cache.chunks."); +} + bool NamespaceString::isReplicated() const { if (isLocal()) { return false; diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h index 8f5f3feb49b..7c65d4c8871 100644 --- a/src/mongo/db/namespace_string.h +++ b/src/mongo/db/namespace_string.h @@ -280,6 +280,11 @@ public: bool isNamespaceAlwaysUnsharded() const; /** + * Returns whether the specified namespace is config.cache.chunks.<>. + */ + bool isConfigDotCacheDotChunks() const; + + /** * Returns whether a namespace is replicated, based only on its string value. One notable * omission is that map reduce `tmp.mr` collections may or may not be replicated. Callers must * decide how to handle that case separately. diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index 95c33761454..25fe9668ce0 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -39,6 +39,7 @@ #include "mongo/db/jsobj.h" #include "mongo/db/matcher/expression_algo.h" #include "mongo/db/pipeline/document_path_support.h" +#include "mongo/db/pipeline/document_source_merge_gen.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/query/query_knobs_gen.h" @@ -50,6 +51,66 @@ namespace mongo { using boost::intrusive_ptr; using std::vector; +namespace { + +/** + * Constructs a query of the following shape: + * {$or: [ + * {'fieldName': {$eq: 'values[0]'}}, + * {'fieldName': {$eq: 'values[1]'}}, + * ... + * ]} + */ +BSONObj buildEqualityOrQuery(const std::string& fieldName, const BSONArray& values) { + BSONObjBuilder orBuilder; + { + BSONArrayBuilder orPredicatesBuilder(orBuilder.subarrayStart("$or")); + for (auto&& value : values) { + orPredicatesBuilder.append(BSON(fieldName << BSON("$eq" << value))); + } + } + return orBuilder.obj(); +} + +void lookupPipeValidator(const Pipeline& pipeline) { + const auto& sources = pipeline.getSources(); + std::for_each(sources.begin(), sources.end(), [](auto& src) { + uassert(51047, + str::stream() << src->getSourceName() + << " is not allowed within a $lookup's sub-pipeline", + src->constraints().isAllowedInLookupPipeline()); + }); +} + +bool foreignShardedLookupAllowed() { + return getTestCommandsEnabled() && internalQueryAllowShardedLookup.load(); +} + +// Parses $lookup 'from' field. The 'from' field must be a string or an object in the form of +// {from: {db: "config", coll: "cache.chunks.*}, ...}. +NamespaceString parseLookupFromAndResolveNamespace(const BSONElement& elem, StringData defaultDb) { + uassert(ErrorCodes::FailedToParse, + str::stream() << "$lookup 'from' field must be either a string or an object, but found " + << typeName(elem.type()), + elem.type() == BSONType::String || elem.type() == BSONType::Object); + + if (elem.type() == BSONType::String) { + return NamespaceString(defaultDb, elem.valueStringData()); + } + + // Valdate the db and coll names. + auto spec = NamespaceSpec::parse({elem.fieldNameStringData()}, elem.embeddedObject()); + auto nss = NamespaceString(spec.getDb().value_or(""), spec.getColl().value_or("")); + uassert( + ErrorCodes::FailedToParse, + str::stream() << "$lookup with syntax {from: {db:<>, coll:<>},..} is not supported for db: " + << nss.db() << " and coll: " << nss.coll(), + nss.isConfigDotCacheDotChunks()); + return nss; +} + +} // namespace + DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs, std::string as, const boost::intrusive_ptr<ExpressionContext>& expCtx) @@ -120,12 +181,7 @@ std::unique_ptr<DocumentSourceLookUp::LiteParsed> DocumentSourceLookUp::LitePars uassert(ErrorCodes::FailedToParse, str::stream() << "missing 'from' option to $lookup stage specification: " << specObj, fromElement); - uassert(ErrorCodes::FailedToParse, - str::stream() << "'from' option to $lookup must be a string, but was type " - << typeName(specObj["from"].type()), - fromElement.type() == BSONType::String); - - NamespaceString fromNss(nss.db(), fromElement.valueStringData()); + auto fromNss = parseLookupFromAndResolveNamespace(fromElement, nss.db()); uassert(ErrorCodes::InvalidNamespace, str::stream() << "invalid $lookup namespace: " << fromNss.ns(), fromNss.isValid()); @@ -176,18 +232,19 @@ const char* DocumentSourceLookUp::getSourceName() const { } StageConstraints DocumentSourceLookUp::constraints(Pipeline::SplitState) const { - // If executing on mongos and the foreign collection is sharded, then this stage can run on - // mongos or any shard. - HostTypeRequirement hostRequirement = - (pExpCtx->inMongos && pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _fromNs)) - ? HostTypeRequirement::kNone - : HostTypeRequirement::kPrimaryShard; - - const bool foreignShardedAllowed = - getTestCommandsEnabled() && internalQueryAllowShardedLookup.load(); - if (!foreignShardedAllowed) { - // Always run on the primary shard. - hostRequirement = HostTypeRequirement::kPrimaryShard; + HostTypeRequirement hostRequirement; + if (_fromNs.isConfigDotCacheDotChunks()) { + // $lookup from config.cache.chunks* namespaces is permitted to run on each individual + // shard, rather than just the primary, since each shard should have an identical copy of + // the namespace. + hostRequirement = HostTypeRequirement::kAnyShard; + } else { + // When $lookup on sharded foreign collections is allowed, the foreign collection is + // sharded, and the stage is executing on mongos, the stage can run on mongos or any shard. + hostRequirement = (foreignShardedLookupAllowed() && pExpCtx->inMongos && + pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _fromNs)) + ? HostTypeRequirement::kNone + : HostTypeRequirement::kPrimaryShard; } // By default, $lookup is allowed in a transaction and does not use disk. @@ -213,42 +270,6 @@ StageConstraints DocumentSourceLookUp::constraints(Pipeline::SplitState) const { return constraints; } -namespace { - -/** - * Constructs a query of the following shape: - * {$or: [ - * {'fieldName': {$eq: 'values[0]'}}, - * {'fieldName': {$eq: 'values[1]'}}, - * ... - * ]} - */ -BSONObj buildEqualityOrQuery(const std::string& fieldName, const BSONArray& values) { - BSONObjBuilder orBuilder; - { - BSONArrayBuilder orPredicatesBuilder(orBuilder.subarrayStart("$or")); - for (auto&& value : values) { - orPredicatesBuilder.append(BSON(fieldName << BSON("$eq" << value))); - } - } - return orBuilder.obj(); -} - -void lookupPipeValidator(const Pipeline& pipeline) { - const auto& sources = pipeline.getSources(); - std::for_each(sources.begin(), sources.end(), [](auto& src) { - uassert(51047, - str::stream() << src->getSourceName() - << " is not allowed within a $lookup's sub-pipeline", - src->constraints().isAllowedInLookupPipeline()); - }); -} - -bool foreignShardedLookupAllowed() { - return getTestCommandsEnabled() && internalQueryAllowShardedLookup.load(); -} -} // namespace - DocumentSource::GetNextResult DocumentSourceLookUp::doGetNext() { if (_unwindSrc) { return unwindResult(); @@ -685,6 +706,12 @@ void DocumentSourceLookUp::initializeResolvedIntrospectionPipeline() { void DocumentSourceLookUp::serializeToArray( std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const { Document doc; + + // Support alternative $lookup from config.cache.chunks* namespaces. + auto fromValue = (pExpCtx->ns.db() == _fromNs.db()) + ? Value(_fromNs.coll()) + : Value(Document{{"db", _fromNs.db()}, {"coll", _fromNs.coll()}}); + if (wasConstructedWithPipelineSyntax()) { MutableDocument exprList; for (auto letVar : _letVariables) { @@ -698,13 +725,13 @@ void DocumentSourceLookUp::serializeToArray( } doc = Document{{getSourceName(), - Document{{"from", _fromNs.coll()}, + Document{{"from", fromValue}, {"as", _as.fullPath()}, {"let", exprList.freeze()}, {"pipeline", pipeline}}}}; } else { doc = Document{{getSourceName(), - {Document{{"from", _fromNs.coll()}, + {Document{{"from", fromValue}, {"as", _as.fullPath()}, {"localField", _localField->fullPath()}, {"foreignField", _foreignField->fullPath()}}}}}; @@ -781,6 +808,18 @@ DepsTracker::State DocumentSourceLookUp::getDependencies(DepsTracker* deps) cons return DepsTracker::State::SEE_NEXT; } +boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceLookUp::distributedPlanLogic() { + if (_fromExpCtx->ns.isConfigDotCacheDotChunks()) { + // When $lookup reads from config.cache.chunks.* namespaces, it should run on each + // individual shard in parallel. This is a special case, and atypical for standard $lookup + // since a full copy of config.cache.chunks.* collections exists on all shards. + return boost::none; + } + + // {shardsStage, mergingStage, sortPattern} + return DistributedPlanLogic{nullptr, this, boost::none}; +} + void DocumentSourceLookUp::detachFromOperationContext() { if (_pipeline) { // We have a pipeline we're going to be executing across multiple calls to getNext(), so we @@ -845,14 +884,17 @@ intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson( continue; } + if (argName == "from") { + fromNs = parseLookupFromAndResolveNamespace(argument, pExpCtx->ns.db()); + continue; + } + uassert(ErrorCodes::FailedToParse, str::stream() << "$lookup argument '" << argName << "' must be a string, found " << argument << ": " << argument.type(), argument.type() == BSONType::String); - if (argName == "from") { - fromNs = NamespaceString(pExpCtx->ns.db().toString() + '.' + argument.String()); - } else if (argName == "as") { + if (argName == "as") { as = argument.String(); } else if (argName == "localField") { localField = argument.String(); diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index e36be9f2762..63e17f643f5 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -105,10 +105,7 @@ public: DepsTracker::State getDependencies(DepsTracker* deps) const final; - boost::optional<DistributedPlanLogic> distributedPlanLogic() final { - // {shardsStage, mergingStage, sortPattern} - return DistributedPlanLogic{nullptr, this, boost::none}; - } + boost::optional<DistributedPlanLogic> distributedPlanLogic() final; void addInvolvedCollections(stdx::unordered_set<NamespaceString>* collectionNames) const final; diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp index 781248b4930..baa1fd032d0 100644 --- a/src/mongo/db/pipeline/document_source_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp @@ -30,12 +30,14 @@ #include "mongo/platform/basic.h" #include <boost/intrusive_ptr.hpp> +#include <boost/optional/optional_io.hpp> #include <deque> #include <vector> #include "mongo/bson/bsonmisc.h" #include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobjbuilder.h" +#include "mongo/bson/unordered_fields_bsonobj_comparator.h" #include "mongo/db/exec/document_value/document.h" #include "mongo/db/exec/document_value/document_value_test_util.h" #include "mongo/db/exec/document_value/value.h" @@ -430,6 +432,180 @@ TEST_F(DocumentSourceLookUpTest, ShouldBeAbleToReParseSerializedStage) { ASSERT_VALUE_EQ(newSerialization[0], serialization[0]); } +// Tests that $lookup with special 'from' syntax from: {db: config, coll: cache.chunks.*} can +// be round tripped. +TEST_F(DocumentSourceLookUpTest, LookupReParseSerializedStageWithFromDBAndColl) { + auto expCtx = getExpCtx(); + NamespaceString fromNs("config", "cache.chunks.test.foo"); + expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{ + {fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}}); + + auto originalBSON = BSON("$lookup" << BSON("from" << BSON("db" + << "config" + << "coll" + << "cache.chunks.test.foo") + << "localField" + << "x" + << "foreignField" + << "id" + << "as" + << "results")); + auto lookupStage = DocumentSourceLookUp::createFromBson(originalBSON.firstElement(), expCtx); + + // + // Serialize the $lookup stage and confirm contents. + // + vector<Value> serialization; + static const UnorderedFieldsBSONObjComparator kComparator; + lookupStage->serializeToArray(serialization); + auto serializedBSON = serialization[0].getDocument().toBson(); + ASSERT_EQ(kComparator.compare(serializedBSON, originalBSON), 0); + + auto roundTripped = DocumentSourceLookUp::createFromBson(serializedBSON.firstElement(), expCtx); + + vector<Value> newSerialization; + roundTripped->serializeToArray(newSerialization); + + ASSERT_EQ(newSerialization.size(), 1UL); + ASSERT_VALUE_EQ(newSerialization[0], serialization[0]); +} + +// Tests that $lookup with 'let' and special 'from' syntax from: {db: config, coll: cache.chunks.*} +// can be round tripped. +TEST_F(DocumentSourceLookUpTest, LookupWithLetReParseSerializedStageWithFromDBAndColl) { + auto expCtx = getExpCtx(); + NamespaceString fromNs("config", "cache.chunks.test.foo"); + expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{ + {fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}}); + + auto originalBSON = + BSON("$lookup" << BSON("from" << BSON("db" + << "config" + << "coll" + << "cache.chunks.test.foo") + << "let" + << BSON("local_x" + << "$x") + << "pipeline" << BSON_ARRAY(BSON("$match" << BSON("x" << 1))) + << "as" + << "as")); + auto lookupStage = DocumentSourceLookUp::createFromBson(originalBSON.firstElement(), expCtx); + + // + // Serialize the $lookup stage and confirm contents. + // + vector<Value> serialization; + static const UnorderedFieldsBSONObjComparator kComparator; + lookupStage->serializeToArray(serialization); + auto serializedBSON = serialization[0].getDocument().toBson(); + ASSERT_EQ(kComparator.compare(serializedBSON, originalBSON), 0); + + auto roundTripped = DocumentSourceLookUp::createFromBson(serializedBSON.firstElement(), expCtx); + + vector<Value> newSerialization; + roundTripped->serializeToArray(newSerialization); + + ASSERT_EQ(newSerialization.size(), 1UL); + ASSERT_VALUE_EQ(newSerialization[0], serialization[0]); +} + +// $lookup : {from : {db: <>, coll: <>}} syntax doesn't work for a namespace that isn't +// config.cache.chunks*. +TEST_F(DocumentSourceLookUpTest, RejectsPipelineFromDBAndCollWithBadDBAndColl) { + auto expCtx = getExpCtx(); + NamespaceString fromNs("test", "coll"); + expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{ + {fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}}); + + ASSERT_THROWS_CODE( + DocumentSourceLookUp::createFromBson( + fromjson("{$lookup: {from: {db: 'test', coll: 'coll'}, as: 'as', pipeline: []}}") + .firstElement(), + expCtx), + AssertionException, + ErrorCodes::FailedToParse); +} + +// $lookup : {from : {db: <>, coll: <>}} syntax doesn't work for a namespace when "coll" is +// "cache.chunks.*" but "db" is not "config". +TEST_F(DocumentSourceLookUpTest, RejectsPipelineFromDBAndCollWithBadColl) { + auto expCtx = getExpCtx(); + NamespaceString fromNs("config", "coll"); + expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{ + {fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}}); + + ASSERT_THROWS_CODE( + DocumentSourceLookUp::createFromBson( + fromjson("{$lookup: {from: {db: 'config', coll: 'coll'}, as: 'as', pipeline: []}}") + .firstElement(), + expCtx), + AssertionException, + ErrorCodes::FailedToParse); +} + +// $lookup : {from : {db: <>, coll: <>}} syntax fails when "db" is config but "coll" is +// not "cache.chunks.*". +TEST_F(DocumentSourceLookUpTest, RejectsPipelineFromDBAndCollWithBadDB) { + auto expCtx = getExpCtx(); + NamespaceString fromNs("test", "cache.chunks.test.foo"); + expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{ + {fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}}); + + ASSERT_THROWS_CODE(DocumentSourceLookUp::createFromBson( + fromjson("{$lookup: {from: {db: 'test', coll: 'cache.chunks.test.foo'}, " + "as: 'as', pipeline: []}}") + .firstElement(), + expCtx), + AssertionException, + ErrorCodes::FailedToParse); +} + +// Tests that $lookup distributedPlanLogic() is boost::none, allowing for the stage to run on each +// shard, when it reads from config.cache.chunks.* namespaces using from: {db: <> , coll: <> } +// syntax. +TEST_F(DocumentSourceLookUpTest, FromDBAndCollDistributedPlanLogic) { + auto expCtx = getExpCtx(); + NamespaceString fromNs("config", "cache.chunks.test.foo"); + expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{ + {fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}}); + + auto lookupStage = DocumentSourceLookUp::createFromBson( + BSON("$lookup" << BSON("from" << BSON("db" + << "config" + << "coll" + << "cache.chunks.test.foo") + << "let" + << BSON("local_x" + << "$x") + << "pipeline" << BSON_ARRAY(BSON("$match" << BSON("x" << 1))) + << "as" + << "as")) + .firstElement(), + expCtx); + + ASSERT(!lookupStage->distributedPlanLogic()); +} + +// Tests $lookup distributedPlanLogic() is prohibited from executing on the shardsStage for standard +// $lookup with from: <string> syntax. +TEST_F(DocumentSourceLookUpTest, LookupDistributedPlanLogic) { + auto expCtx = getExpCtx(); + NamespaceString fromNs("test", "coll"); + expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{ + {fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}}); + + auto lookupStage = DocumentSourceLookUp::createFromBson( + BSON("$lookup" << BSON("from" + << "coll" + << "pipeline" << BSON_ARRAY(BSON("$match" << BSON("x" << 1))) << "as" + << "as")) + .firstElement(), + expCtx); + ASSERT(lookupStage->distributedPlanLogic()); + ASSERT(lookupStage->distributedPlanLogic()->shardsStage == nullptr); + ASSERT(lookupStage->distributedPlanLogic()->mergingStage != nullptr); +} + TEST(MakeMatchStageFromInput, NonArrayValueUsesEqQuery) { auto input = Document{{"local", 1}}; BSONObj matchStage = DocumentSourceLookUp::makeMatchStageFromInput( diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 239924ce42e..1328409f11c 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -2321,6 +2321,11 @@ public: virtual string shardPipeJson() = 0; virtual string mergePipeJson() = 0; + // Allows tests to override the default resolvedNamespaces. + virtual NamespaceString getLookupCollNs() { + return NamespaceString("a", "lookupColl"); + } + BSONObj pipelineFromJsonArray(const string& array) { return fromjson("{pipeline: " + array + "}"); } @@ -2342,7 +2347,7 @@ public: // For $graphLookup and $lookup, we have to populate the resolvedNamespaces so that the // operations will be able to have a resolved view definition. - NamespaceString lookupCollNs("a", "lookupColl"); + auto lookupCollNs = getLookupCollNs(); ctx->setResolvedNamespace(lookupCollNs, {lookupCollNs, std::vector<BSONObj>{}}); // Test that we can both split the pipeline and reassemble it into its original form. @@ -2390,6 +2395,47 @@ class Empty : public Base { } }; +// Since each shard has an identical copy of config.cache.chunks.* namespaces, $lookup from +// config.cache.chunks.* should run on each shard in parallel. +namespace lookupFromShardsInParallel { +class LookupWithDBAndColl : public Base { + string inputPipeJson() { + return "[{$lookup: {from: {db: 'config', coll: 'cache.chunks.test.foo'}, as: 'results', " + "localField: 'x', foreignField: '_id'}}]"; + } + string shardPipeJson() { + return inputPipeJson(); + } + + string mergePipeJson() { + return "[]"; + } + + NamespaceString getLookupCollNs() override { + return {"config", "cache.chunks.test.foo"}; + } +}; + +class LookupWithLetWithDBAndColl : public Base { + string inputPipeJson() { + return "[{$lookup: {from: {db: 'config', coll: 'cache.chunks.test.foo'}, as: 'results', " + "let: {x_field: '$x'}, pipeline: []}}]"; + } + string shardPipeJson() { + return inputPipeJson(); + } + + string mergePipeJson() { + return "[]"; + } + + NamespaceString getLookupCollNs() override { + return {"config", "cache.chunks.test.foo"}; + } +}; + +} // namespace lookupFromShardsInParallel + namespace moveFinalUnwindFromShardsToMerger { class OneUnwind : public Base { @@ -4030,6 +4076,8 @@ public: add<Optimizations::Sharded::limitFieldsSentFromShardsToMerger:: ShardedMatchSortProjLimBecomesMatchTopKSortProj>(); add<Optimizations::Sharded::limitFieldsSentFromShardsToMerger::ShardAlreadyExhaustive>(); + add<Optimizations::Sharded::lookupFromShardsInParallel::LookupWithDBAndColl>(); + add<Optimizations::Sharded::lookupFromShardsInParallel::LookupWithLetWithDBAndColl>(); add<Optimizations::Sharded::needsPrimaryShardMerger::Out>(); add<Optimizations::Sharded::needsPrimaryShardMerger::MergeWithUnshardedCollection>(); add<Optimizations::Sharded::needsPrimaryShardMerger::MergeWithShardedCollection>(); diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index 1058c618cfd..e9b0e3f48ae 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -1146,13 +1146,12 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(Pipeline* owne return shardVersionRetry( expCtx->opCtx, catalogCache, expCtx->ns, "targeting pipeline to attach cursors"_sd, [&]() { auto pipelineToTarget = pipeline->clone(); - if (!allowTargetingShards || expCtx->ns.db() == "local") { - // If the db is local, this may be a change stream examining the oplog. We know the - // oplog (and any other local collections) will not be sharded. - return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead( - pipelineToTarget.release()); + if (allowTargetingShards && !expCtx->ns.isConfigDotCacheDotChunks() && + expCtx->ns.db() != "local") { + return targetShardsAndAddMergeCursors(expCtx, pipelineToTarget.release()); } - return targetShardsAndAddMergeCursors(expCtx, pipelineToTarget.release()); + return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead( + pipelineToTarget.release()); }); } |