summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/pipeline_d.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/pipeline_d.cpp')
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp68
1 files changed, 62 insertions, 6 deletions
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 7a81488aca9..57592236797 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -69,12 +69,14 @@
#include "mongo/db/pipeline/document_source_sample_from_random_cursor.h"
#include "mongo/db/pipeline/document_source_single_document_transformation.h"
#include "mongo/db/pipeline/document_source_sort.h"
+#include "mongo/db/pipeline/inner_pipeline_stage_impl.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/pipeline/skip_and_limit.h"
#include "mongo/db/query/collation/collator_interface.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/plan_executor_factory.h"
#include "mongo/db/query/plan_summary_stats.h"
+#include "mongo/db/query/query_feature_flags_gen.h"
#include "mongo/db/query/query_planner.h"
#include "mongo/db/query/sort_pattern.h"
#include "mongo/db/s/collection_sharding_state.h"
@@ -96,6 +98,50 @@ using std::unique_ptr;
using write_ops::InsertCommandRequest;
namespace {
+/**
+ * Extracts a prefix of 'DocumentSourceGroup' stages from the given pipeline to prepare for
+ * pushdown of $group into the inner query layer so that it can be executed using SBE. Group stages
+ * are extracted from the pipeline under when all of the following conditions are met:
+ * 0. When the 'internalQueryEnableSlotBasedExecutionEngine' feature flag is 'true'.
+ * 1. When 'allowDiskUse' is false. We currently don't support spilling in the SBE HashAgg
+ * stage. This will change once that is supported when SERVER-58436 is complete.
+ * 2. When there's only a single index other than the implicit '_id' index on the provided
+ * collection. This case is necessary because we don't currently support extending the
+ * QuerySolution with the 'postMultiPlan' QuerySolutionNode when the PlanCache is involved in
+ * the query. This will be resolved when SERVER-58429 is complete.
+ */
+std::vector<std::unique_ptr<InnerPipelineStageInterface>> extractSbeCompatibleGroupsForPushdown(
+ const intrusive_ptr<ExpressionContext>& expCtx,
+ const CollectionPtr& collection,
+ const CanonicalQuery* cq,
+ Pipeline* pipeline) {
+ // We will eventually use the extracted group stages to populate 'CanonicalQuery::pipeline'
+ // which requires stages to be wrapped in an interface.
+ std::vector<std::unique_ptr<InnerPipelineStageInterface>> groupsForPushdown;
+
+ const auto isSingleIndex =
+ collection && collection->getIndexCatalog()->numIndexesTotal(expCtx->opCtx) == 1;
+ if (!feature_flags::gFeatureFlagSBEGroupPushdown.isEnabled(
+ serverGlobalParams.featureCompatibility) ||
+ !cq->getEnableSlotBasedExecutionEngine() || expCtx->allowDiskUse || !isSingleIndex) {
+ return {};
+ }
+
+ auto&& sources = pipeline->getSources();
+
+ for (auto itr = sources.begin(); itr != sources.end();) {
+ auto groupStage = dynamic_cast<DocumentSourceGroup*>(itr->get());
+ if (!(groupStage && groupStage->sbeCompatible())) {
+ // Only pushdown a prefix of group stages that are supported by sbe.
+ break;
+ }
+ groupsForPushdown.push_back(std::make_unique<InnerPipelineStageImpl>(groupStage));
+ sources.erase(itr++);
+ }
+
+ return groupsForPushdown;
+}
+
StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExecutor(
const intrusive_ptr<ExpressionContext>& expCtx,
const CollectionPtr& collection,
@@ -108,7 +154,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
boost::optional<std::string> groupIdForDistinctScan,
const AggregateCommandRequest* aggRequest,
const size_t plannerOpts,
- const MatchExpressionParser::AllowedFeatureSet& matcherFeatures) {
+ const MatchExpressionParser::AllowedFeatureSet& matcherFeatures,
+ Pipeline* pipeline) {
auto findCommand = std::make_unique<FindCommandRequest>(nss);
query_request_helper::setTailableMode(expCtx->tailableMode, findCommand.get());
findCommand->setFilter(queryObj.getOwned());
@@ -186,9 +233,16 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
}
}
- bool permitYield = true;
- return getExecutorFind(
- expCtx->opCtx, &collection, std::move(cq.getValue()), permitYield, plannerOpts);
+ auto permitYield = true;
+ return getExecutorFind(expCtx->opCtx,
+ &collection,
+ std::move(cq.getValue()),
+ [&](auto* canonicalQuery) {
+ canonicalQuery->setPipeline(extractSbeCompatibleGroupsForPushdown(
+ expCtx, collection, canonicalQuery, pipeline));
+ },
+ permitYield,
+ plannerOpts);
}
/**
@@ -991,7 +1045,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
rewrittenGroupStage->groupId(),
aggRequest,
plannerOpts,
- matcherFeatures);
+ matcherFeatures,
+ pipeline);
if (swExecutorGrouped.isOK()) {
// Any $limit stage before the $group stage should make the pipeline ineligible for this
@@ -1039,7 +1094,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
boost::none, /* groupIdForDistinctScan */
aggRequest,
plannerOpts,
- matcherFeatures);
+ matcherFeatures,
+ pipeline);
}
Timestamp PipelineD::getLatestOplogTimestamp(const Pipeline* pipeline) {