diff options
Diffstat (limited to 'src/mongo/db/pipeline/pipeline_d.cpp')
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 68 |
1 files changed, 62 insertions, 6 deletions
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 7a81488aca9..57592236797 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -69,12 +69,14 @@ #include "mongo/db/pipeline/document_source_sample_from_random_cursor.h" #include "mongo/db/pipeline/document_source_single_document_transformation.h" #include "mongo/db/pipeline/document_source_sort.h" +#include "mongo/db/pipeline/inner_pipeline_stage_impl.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/pipeline/skip_and_limit.h" #include "mongo/db/query/collation/collator_interface.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/plan_executor_factory.h" #include "mongo/db/query/plan_summary_stats.h" +#include "mongo/db/query/query_feature_flags_gen.h" #include "mongo/db/query/query_planner.h" #include "mongo/db/query/sort_pattern.h" #include "mongo/db/s/collection_sharding_state.h" @@ -96,6 +98,50 @@ using std::unique_ptr; using write_ops::InsertCommandRequest; namespace { +/** + * Extracts a prefix of 'DocumentSourceGroup' stages from the given pipeline to prepare for + * pushdown of $group into the inner query layer so that it can be executed using SBE. Group stages + * are extracted from the pipeline under when all of the following conditions are met: + * 0. When the 'internalQueryEnableSlotBasedExecutionEngine' feature flag is 'true'. + * 1. When 'allowDiskUse' is false. We currently don't support spilling in the SBE HashAgg + * stage. This will change once that is supported when SERVER-58436 is complete. + * 2. When there's only a single index other than the implicit '_id' index on the provided + * collection. This case is necessary because we don't currently support extending the + * QuerySolution with the 'postMultiPlan' QuerySolutionNode when the PlanCache is involved in + * the query. This will be resolved when SERVER-58429 is complete. + */ +std::vector<std::unique_ptr<InnerPipelineStageInterface>> extractSbeCompatibleGroupsForPushdown( + const intrusive_ptr<ExpressionContext>& expCtx, + const CollectionPtr& collection, + const CanonicalQuery* cq, + Pipeline* pipeline) { + // We will eventually use the extracted group stages to populate 'CanonicalQuery::pipeline' + // which requires stages to be wrapped in an interface. + std::vector<std::unique_ptr<InnerPipelineStageInterface>> groupsForPushdown; + + const auto isSingleIndex = + collection && collection->getIndexCatalog()->numIndexesTotal(expCtx->opCtx) == 1; + if (!feature_flags::gFeatureFlagSBEGroupPushdown.isEnabled( + serverGlobalParams.featureCompatibility) || + !cq->getEnableSlotBasedExecutionEngine() || expCtx->allowDiskUse || !isSingleIndex) { + return {}; + } + + auto&& sources = pipeline->getSources(); + + for (auto itr = sources.begin(); itr != sources.end();) { + auto groupStage = dynamic_cast<DocumentSourceGroup*>(itr->get()); + if (!(groupStage && groupStage->sbeCompatible())) { + // Only pushdown a prefix of group stages that are supported by sbe. + break; + } + groupsForPushdown.push_back(std::make_unique<InnerPipelineStageImpl>(groupStage)); + sources.erase(itr++); + } + + return groupsForPushdown; +} + StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExecutor( const intrusive_ptr<ExpressionContext>& expCtx, const CollectionPtr& collection, @@ -108,7 +154,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe boost::optional<std::string> groupIdForDistinctScan, const AggregateCommandRequest* aggRequest, const size_t plannerOpts, - const MatchExpressionParser::AllowedFeatureSet& matcherFeatures) { + const MatchExpressionParser::AllowedFeatureSet& matcherFeatures, + Pipeline* pipeline) { auto findCommand = std::make_unique<FindCommandRequest>(nss); query_request_helper::setTailableMode(expCtx->tailableMode, findCommand.get()); findCommand->setFilter(queryObj.getOwned()); @@ -186,9 +233,16 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe } } - bool permitYield = true; - return getExecutorFind( - expCtx->opCtx, &collection, std::move(cq.getValue()), permitYield, plannerOpts); + auto permitYield = true; + return getExecutorFind(expCtx->opCtx, + &collection, + std::move(cq.getValue()), + [&](auto* canonicalQuery) { + canonicalQuery->setPipeline(extractSbeCompatibleGroupsForPushdown( + expCtx, collection, canonicalQuery, pipeline)); + }, + permitYield, + plannerOpts); } /** @@ -991,7 +1045,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep rewrittenGroupStage->groupId(), aggRequest, plannerOpts, - matcherFeatures); + matcherFeatures, + pipeline); if (swExecutorGrouped.isOK()) { // Any $limit stage before the $group stage should make the pipeline ineligible for this @@ -1039,7 +1094,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep boost::none, /* groupIdForDistinctScan */ aggRequest, plannerOpts, - matcherFeatures); + matcherFeatures, + pipeline); } Timestamp PipelineD::getLatestOplogTimestamp(const Pipeline* pipeline) { |