diff options
author | Mihai Andrei <mihai.andrei@10gen.com> | 2022-02-02 22:54:26 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-02-02 23:49:05 +0000 |
commit | 3f7f9f57be3b127a6334c584b8a1795480baf1ae (patch) | |
tree | 515cd36f91d21d51ad156904338be17efd2984e6 /src/mongo/db/pipeline/pipeline_d.cpp | |
parent | 3731ee31ba39a831e59bf233a562e3120497c392 (diff) | |
download | mongo-3f7f9f57be3b127a6334c584b8a1795480baf1ae.tar.gz |
SERVER-58437 Implement pushdown logic for $lookup
Diffstat (limited to 'src/mongo/db/pipeline/pipeline_d.cpp')
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 79 |
1 file changed, 59 insertions(+), 20 deletions(-)
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 67d52f4e103..ecb2b13d0b5 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -65,6 +65,7 @@ #include "mongo/db/pipeline/document_source_geo_near_cursor.h" #include "mongo/db/pipeline/document_source_group.h" #include "mongo/db/pipeline/document_source_internal_unpack_bucket.h" +#include "mongo/db/pipeline/document_source_lookup.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_sample.h" #include "mongo/db/pipeline/document_source_sample_from_random_cursor.h" @@ -100,48 +101,86 @@ using write_ops::InsertCommandRequest; namespace { /** - * Extracts a prefix of 'DocumentSourceGroup' stages from the given pipeline to prepare for - * pushdown of $group into the inner query layer so that it can be executed using SBE. Group stages - * are extracted from the pipeline under when all of the following conditions are met: + * Extracts a prefix of 'DocumentSourceGroup' and 'DocumentSourceLookUp' stages from the given + * pipeline to prepare for pushdown of $group and $lookup into the inner query layer so that it + * can be executed using SBE. + * Group stages are extracted from the pipeline when all of the following conditions are met: * 0. When the 'internalQueryForceClassicEngine' feature flag is 'false'. * 1. When 'allowDiskUse' is false. We currently don't support spilling in the SBE HashAgg * stage. This will change once that is supported when SERVER-58436 is complete. * 2. When the DocumentSourceGroup has 'doingMerge=false', this will change when we implement * hash table spilling in SERVER-58436. + * + * Lookup stages are extracted from the pipeline when all of the following conditions are met: + * 0. When the 'internalQueryForceClassicEngine' feature flag is 'false'. + * 1. When the 'featureFlagSBELookupPushdown' feature flag is 'true'. + * 2. 
When the 'internalEnableMultipleAutoGetCollections' flag is 'true' + * 3. The $lookup uses only the 'localField'/'foreignField' syntax (no pipelines). + * 4. The foreign collection is neither sharded nor a view. */ -std::vector<std::unique_ptr<InnerPipelineStageInterface>> extractSbeCompatibleGroupsForPushdown( +std::vector<std::unique_ptr<InnerPipelineStageInterface>> extractSbeCompatibleStagesForPushdown( const intrusive_ptr<ExpressionContext>& expCtx, - const CollectionPtr& collection, + const MultiCollection& collections, const CanonicalQuery* cq, Pipeline* pipeline) { // We will eventually use the extracted group stages to populate 'CanonicalQuery::pipeline' // which requires stages to be wrapped in an interface. - std::vector<std::unique_ptr<InnerPipelineStageInterface>> groupsForPushdown; + std::vector<std::unique_ptr<InnerPipelineStageInterface>> stagesForPushdown; // This handles the case of unionWith against an unknown collection. - if (collection == nullptr) { + if (collections.getMainCollection() == nullptr) { return {}; } - if (!feature_flags::gFeatureFlagSBEGroupPushdown.isEnabled( - serverGlobalParams.featureCompatibility) || - cq->getForceClassicEngine()) { + // No pushdown if we're using the classic engine. + if (cq->getForceClassicEngine()) { return {}; } auto&& sources = pipeline->getSources(); for (auto itr = sources.begin(); itr != sources.end();) { - auto groupStage = dynamic_cast<DocumentSourceGroup*>(itr->get()); - if (!(groupStage && groupStage->sbeCompatible()) || groupStage->doingMerge()) { - // Only pushdown a prefix of group stages that are supported by sbe. + // $group pushdown logic. 
+ if (auto groupStage = dynamic_cast<DocumentSourceGroup*>(itr->get())) { + bool groupEligibleForPushdown = feature_flags::gFeatureFlagSBEGroupPushdown.isEnabled( + serverGlobalParams.featureCompatibility) && + groupStage->sbeCompatible() && !groupStage->doingMerge(); + if (groupEligibleForPushdown) { + stagesForPushdown.push_back(std::make_unique<InnerPipelineStageImpl>(groupStage)); + sources.erase(itr++); + continue; + } + break; + } + + // $lookup pushdown logic. + if (auto lookupStage = dynamic_cast<DocumentSourceLookUp*>(itr->get())) { + bool isForeignSharded = false; + bool isForeignView = false; + const auto& fromNs = lookupStage->getFromNs(); + const auto& foreignColl = collections.lookupCollection(fromNs); + if (foreignColl) { + isForeignSharded = foreignColl.isSharded(); + } else { + // If the right hand side targets a namespace that we can't find in + // 'collections', we infer that it is targeting a view. + isForeignView = true; + } + + bool lookupEligibleForPushdown = + feature_flags::gFeatureFlagSBELookupPushdown.isEnabledAndIgnoreFCV() && + internalEnableMultipleAutoGetCollections.load() && lookupStage->sbeCompatible() && + !isForeignSharded && !isForeignView; + uassert( + 5843700, "$lookup push down logic worked correctly", !lookupEligibleForPushdown); break; } - groupsForPushdown.push_back(std::make_unique<InnerPipelineStageImpl>(groupStage)); - sources.erase(itr++); + + // Current stage cannot be pushed down. 
+ break; } - return groupsForPushdown; + return stagesForPushdown; } StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExecutor( @@ -158,7 +197,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe const size_t plannerOpts, const MatchExpressionParser::AllowedFeatureSet& matcherFeatures, Pipeline* pipeline) { - const auto& collection = collections.getMainCollection(); + const auto& mainColl = collections.getMainCollection(); auto findCommand = std::make_unique<FindCommandRequest>(nss); query_request_helper::setTailableMode(expCtx->tailableMode, findCommand.get()); findCommand->setFilter(queryObj.getOwned()); @@ -240,7 +279,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe // example, if we have a document {a: [1,2]} and group by "a" a DISTINCT_SCAN on an "a" // index would produce one result for '1' and another for '2', which would be incorrect. auto distinctExecutor = getExecutorDistinct( - &collection, plannerOpts | QueryPlannerParams::STRICT_DISTINCT_ONLY, &parsedDistinct); + &mainColl, plannerOpts | QueryPlannerParams::STRICT_DISTINCT_ONLY, &parsedDistinct); if (!distinctExecutor.isOK()) { return distinctExecutor.getStatus().withContext( "Unable to use distinct scan to optimize $group stage"); @@ -257,8 +296,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe collections, std::move(cq.getValue()), [&](auto* canonicalQuery) { - canonicalQuery->setPipeline(extractSbeCompatibleGroupsForPushdown( - expCtx, collection, canonicalQuery, pipeline)); + canonicalQuery->setPipeline(extractSbeCompatibleStagesForPushdown( + expCtx, collections, canonicalQuery, pipeline)); }, permitYield, plannerOpts); |