diff options
Diffstat (limited to 'src/mongo/db/pipeline/pipeline_d.cpp')
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 39 |
1 files changed, 29 insertions, 10 deletions
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index ac2173ae50c..ec84917b5c5 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -107,7 +107,7 @@ using write_ops::InsertCommandRequest; namespace { /** - * Extracts a prefix of 'DocumentSourceGroup' and 'DocumentSourceLookUp' stages from the given + * Finds a prefix of 'DocumentSourceGroup' and 'DocumentSourceLookUp' stages from the given * pipeline to prepare for pushdown of $group and $lookup into the inner query layer so that it * can be executed using SBE. * Group stages are extracted from the pipeline when all of the following conditions are met: @@ -121,11 +121,11 @@ namespace { * - The $lookup uses only the 'localField'/'foreignField' syntax (no pipelines). * - The foreign collection is neither sharded nor a view. */ -std::vector<std::unique_ptr<InnerPipelineStageInterface>> extractSbeCompatibleStagesForPushdown( +std::vector<std::unique_ptr<InnerPipelineStageInterface>> findSbeCompatibleStagesForPushdown( const intrusive_ptr<ExpressionContext>& expCtx, const MultipleCollectionAccessor& collections, const CanonicalQuery* cq, - Pipeline* pipeline) { + const Pipeline* pipeline) { // We will eventually use the extracted group stages to populate 'CanonicalQuery::pipeline' // which requires stages to be wrapped in an interface. std::vector<std::unique_ptr<InnerPipelineStageInterface>> stagesForPushdown; @@ -140,7 +140,7 @@ std::vector<std::unique_ptr<InnerPipelineStageInterface>> extractSbeCompatibleSt return {}; } - auto&& sources = pipeline->getSources(); + const auto& sources = pipeline->getSources(); bool isMainCollectionSharded = false; if (const auto& mainColl = collections.getMainCollection()) { @@ -158,7 +158,7 @@ std::vector<std::unique_ptr<InnerPipelineStageInterface>> extractSbeCompatibleSt internalQuerySlotBasedExecutionDisableLookupPushdown.load() || isMainCollectionSharded || collections.isAnySecondaryNamespaceAViewOrSharded(); - for (auto itr = sources.begin(); itr != sources.end();) { + for (auto itr = sources.begin(); itr != sources.end(); ++itr) { const bool isLastSource = itr->get() == sources.back().get(); // $group pushdown logic. @@ -170,7 +170,6 @@ std::vector<std::unique_ptr<InnerPipelineStageInterface>> extractSbeCompatibleSt if (groupStage->sbeCompatible() && !groupStage->doingMerge()) { stagesForPushdown.push_back( std::make_unique<InnerPipelineStageImpl>(groupStage, isLastSource)); - sources.erase(itr++); continue; } break; @@ -187,7 +186,6 @@ std::vector<std::unique_ptr<InnerPipelineStageInterface>> extractSbeCompatibleSt if (lookupStage->sbeCompatible()) { stagesForPushdown.push_back( std::make_unique<InnerPipelineStageImpl>(lookupStage, isLastSource)); - sources.erase(itr++); continue; } break; @@ -199,6 +197,21 @@ std::vector<std::unique_ptr<InnerPipelineStageInterface>> extractSbeCompatibleSt return stagesForPushdown; } +/** + * Removes the first 'stagesToRemove' stages from the pipeline. This function is meant to be paired + * with a call to findSbeCompatibleStagesForPushdown() - the caller must first get the stages to + * push down, then remove them. + */ +void trimPipelineStages(Pipeline* pipeline, size_t stagesToRemove) { + auto& sources = pipeline->getSources(); + tassert(7087104, + "stagesToRemove must be <= number of pipeline sources", + stagesToRemove <= sources.size()); + for (size_t i = 0; i < stagesToRemove; ++i) { + sources.erase(sources.begin()); + } +} + StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExecutor( const intrusive_ptr<ExpressionContext>& expCtx, const MultipleCollectionAccessor& collections, @@ -296,9 +309,15 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe return getExecutorFind(expCtx->opCtx, collections, std::move(cq.getValue()), - [&](auto* canonicalQuery) { - canonicalQuery->setPipeline(extractSbeCompatibleStagesForPushdown( - expCtx, collections, canonicalQuery, pipeline)); + [&](auto* canonicalQuery, bool attachOnly) { + if (attachOnly) { + canonicalQuery->setPipeline(findSbeCompatibleStagesForPushdown( + expCtx, collections, canonicalQuery, pipeline)); + } else { + // Not attaching - we need to trim the already pushed down + // pipeline stages from the pipeline. + trimPipelineStages(pipeline, canonicalQuery->pipeline().size()); + } }, permitYield, plannerOpts); |