author     Yoonsoo Kim <yoonsoo.kim@mongodb.com>             2021-09-22 10:57:06 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-09-22 11:55:02 +0000
commit     28fa5831f7b8e7b6ca1ea84fc7bcc7389b2a0ea4 (patch)
tree       aefcd1eb910f2ad27af4c6034275870415145b04
parent     7ff8d8762cd47376e16f67e712a8f69d95d3689d (diff)
SERVER-58427 Implement pushdown of $group for plans with one solution
-rw-r--r--  jstests/noPassthroughWithMongod/group_pushdown.js  148
-rw-r--r--  src/mongo/db/commands/dbcommands_d.cpp  1
-rw-r--r--  src/mongo/db/commands/find_cmd.cpp  12
-rw-r--r--  src/mongo/db/db_raii_test.cpp  6
-rw-r--r--  src/mongo/db/dbhelpers.cpp  8
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.cpp  68
-rw-r--r--  src/mongo/db/query/canonical_query.h  4
-rw-r--r--  src/mongo/db/query/get_executor.cpp  37
-rw-r--r--  src/mongo/db/query/get_executor.h  12
-rw-r--r--  src/mongo/db/query/query_feature_flags.idl  6
-rw-r--r--  src/mongo/db/query/stage_types.cpp  1
-rw-r--r--  src/mongo/db/transaction_history_iterator.cpp  7
-rw-r--r--  src/mongo/dbtests/documentsourcetests.cpp  1
-rw-r--r--  src/mongo/dbtests/query_stage_multiplan.cpp  8
14 files changed, 295 insertions(+), 24 deletions(-)
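
As an orientation for the per-file diffs below: once a $group is pushed down, it executes in the SBE find layer and surfaces as a 'GROUP' stage in the query plan rather than as a '$group' stage in the aggregation pipeline. A minimal shell sketch of that check, assuming a mongod built from this commit with featureFlagSBEGroupPushdown enabled and the jstests/libs/analyze_plan.js helpers available (the collection name is purely illustrative):

// Sketch only: run in the mongo shell against a server built from this commit and
// started with --setParameter featureFlagSBEGroupPushdown=true.
load("jstests/libs/analyze_plan.js");  // provides getAggPlanStages()

const coll = db.group_pushdown_demo;  // hypothetical demo collection
coll.drop();
assert.commandWorked(coll.insert([
    {item: "a", price: 10},
    {item: "b", price: 20},
    {item: "a", price: 5},
]));

// Allow eligible $group stages to be pushed down into the SBE find layer.
assert.commandWorked(
    db.adminCommand({setParameter: 1, internalQueryEnableSlotBasedExecutionEngine: true}));

// A pushed-down $group appears as a 'GROUP' stage in the winning plan.
const explain = coll.explain().aggregate([{$group: {_id: "$item", s: {$sum: "$price"}}}]);
assert.eq(1, getAggPlanStages(explain, "GROUP").length, explain);
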
diff --git a/jstests/noPassthroughWithMongod/group_pushdown.js b/jstests/noPassthroughWithMongod/group_pushdown.js
new file mode 100644
index 00000000000..271d7c20ef5
--- /dev/null
+++ b/jstests/noPassthroughWithMongod/group_pushdown.js
@@ -0,0 +1,148 @@
+/**
+ * Tests basic functionality of pushing $group into the find layer.
+ */
+(function() {
+"use strict";
+
+load("jstests/libs/analyze_plan.js");
+
+const featureEnabled =
+ assert.commandWorked(db.adminCommand({getParameter: 1, featureFlagSBEGroupPushdown: 1}))
+ .featureFlagSBEGroupPushdown.value;
+if (!featureEnabled) {
+ jsTestLog("Skipping test because the sbe group pushdown feature flag is disabled");
+ return;
+}
+
+const coll = db.group_pushdown;
+coll.drop();
+
+assert.commandWorked(coll.insert([
+ {"_id": 1, "item": "a", "price": 10, "quantity": 2, "date": ISODate("2014-01-01T08:00:00Z")},
+ {"_id": 2, "item": "b", "price": 20, "quantity": 1, "date": ISODate("2014-02-03T09:00:00Z")},
+ {"_id": 3, "item": "a", "price": 5, "quantity": 5, "date": ISODate("2014-02-03T09:05:00Z")},
+ {"_id": 4, "item": "b", "price": 10, "quantity": 10, "date": ISODate("2014-02-15T08:00:00Z")},
+ {"_id": 5, "item": "c", "price": 5, "quantity": 10, "date": ISODate("2014-02-15T09:05:00Z")},
+]));
+
+let assertGroupPushdown = function(coll, pipeline, expectedResults, expectedGroupCountInExplain) {
+ const explain = coll.explain().aggregate(pipeline);
+    // When $group is pushed down, it appears as a 'GROUP' stage in the 'winningPlan' of the $cursor stage.
+ assert.eq(expectedGroupCountInExplain, getAggPlanStages(explain, "GROUP").length, explain);
+
+ let results = coll.aggregate(pipeline).toArray();
+ assert.sameMembers(results, expectedResults);
+};
+
+let assertNoGroupPushdown = function(coll, pipeline, expectedResults, options = {}) {
+ const explain = coll.explain().aggregate(pipeline, options);
+ assert.eq(null, getAggPlanStage(explain, "GROUP"), explain);
+
+ let resultNoGroupPushdown = coll.aggregate(pipeline, options).toArray();
+ assert.sameMembers(resultNoGroupPushdown, expectedResults);
+};
+
+let assertResultsMatchWithAndWithoutPushdown = function(
+ coll, pipeline, expectedResults, expectedGroupCountInExplain) {
+ // Make sure the provided pipeline is eligible for pushdown.
+ assertGroupPushdown(coll, pipeline, expectedResults, expectedGroupCountInExplain);
+
+ // Turn sbe off.
+ db.adminCommand({setParameter: 1, internalQueryEnableSlotBasedExecutionEngine: false});
+
+ // Sanity check the results when no pushdown happens.
+ let resultNoGroupPushdown = coll.aggregate(pipeline).toArray();
+ assert.sameMembers(resultNoGroupPushdown, expectedResults);
+
+    // Turn SBE on, which will allow $group stages that contain supported accumulators to be pushed
+ // down under certain conditions.
+ db.adminCommand({setParameter: 1, internalQueryEnableSlotBasedExecutionEngine: true});
+
+ let resultWithGroupPushdown = coll.aggregate(pipeline).toArray();
+ assert.sameMembers(resultNoGroupPushdown, resultWithGroupPushdown);
+};
+
+// Baseline with SBE off.
+assertNoGroupPushdown(coll,
+ [{$group: {_id: "$item", s: {$sum: "$price"}}}],
+ [{_id: "a", s: 15}, {_id: "b", s: 30}, {_id: "c", s: 5}]);
+
+// Turn SBE on, which will allow $group stages that contain supported accumulators to be pushed
+// down under certain conditions.
+db.adminCommand({setParameter: 1, internalQueryEnableSlotBasedExecutionEngine: true});
+
+// Try a pipeline with no group stage.
+assert.eq(
+ coll.aggregate([{$match: {item: "c"}}]).toArray(),
+ [{"_id": 5, "item": "c", "price": 5, "quantity": 10, "date": ISODate("2014-02-15T09:05:00Z")}]);
+
+// Run a simple $group with the supported $sum accumulator and check that it gets pushed down.
+assertResultsMatchWithAndWithoutPushdown(coll,
+ [{$group: {_id: "$item", s: {$sum: "$price"}}}],
+ [{_id: "a", s: 15}, {_id: "b", s: 30}, {_id: "c", s: 5}],
+ 1);
+
+// Two group stages both get pushed down.
+assertResultsMatchWithAndWithoutPushdown(
+ coll,
+ [{$group: {_id: "$item", s: {$sum: "$price"}}}, {$group: {_id: "$quantity", c: {$count: {}}}}],
+ [{_id: null, c: 3}],
+ 2);
+
+// Run a group with an unsupported accumulator and check that it doesn't get pushed down.
+assertNoGroupPushdown(coll, [{$group: {_id: "$item", s: {$stdDevSamp: "$quantity"}}}], [
+ {"_id": "a", "s": 2.1213203435596424},
+ {"_id": "b", "s": 6.363961030678928},
+ {"_id": "c", "s": null}
+]);
+
+// Run a simple group with $sum and an object '_id', and check that it doesn't get pushed down.
+assertNoGroupPushdown(coll,
+ [{$group: {_id: {"i": "$item"}, s: {$sum: "$price"}}}],
+ [{_id: {i: "a"}, s: 15}, {_id: {i: "b"}, s: 30}, {_id: {i: "c"}, s: 5}]);
+
+// Spilling isn't supported yet, so a $group with 'allowDiskUse' set to true won't get pushed down.
+assertNoGroupPushdown(coll,
+ [{$group: {_id: "$item", s: {$sum: "$price"}}}],
+ [{"_id": "b", "s": 30}, {"_id": "a", "s": 15}, {"_id": "c", "s": 5}],
+ {allowDiskUse: true, cursor: {batchSize: 1}});
+
+// Run a pipeline with match, sort, group to check if the whole pipeline gets pushed down.
+assertGroupPushdown(coll,
+ [{$match: {item: "a"}}, {$sort: {price: 1}}, {$group: {_id: "$item"}}],
+ [{"_id": "a"}],
+ 1);
+
+// Make sure the DISTINCT_SCAN case where the sort is provided by an index still works and is not
+// executed in SBE.
+assert.commandWorked(coll.createIndex({item: 1}));
+let explain = coll.explain().aggregate([{$sort: {item: 1}}, {$group: {_id: "$item"}}]);
+assert.neq(null, getAggPlanStage(explain, "DISTINCT_SCAN"), explain);
+assert.eq(null, getAggPlanStage(explain, "SORT"), explain);
+assert.commandWorked(coll.dropIndex({item: 1}));
+
+// Time to see if indexes prevent pushdown. Add an index on item, and make sure we don't execute in
+// sbe because we won't support $group pushdown until SERVER-58429.
+assert.commandWorked(coll.createIndex({item: 1}));
+assertNoGroupPushdown(coll,
+ [{$group: {_id: "$item", s: {$sum: "$price"}}}],
+ [{"_id": "b", "s": 30}, {"_id": "a", "s": 15}, {"_id": "c", "s": 5}]);
+assert.commandWorked(coll.dropIndex({item: 1}));
+
+// Supported group and then a group with no supported accumulators.
+explain = coll.explain().aggregate([
+ {$group: {_id: "$item", s: {$sum: "$price"}}},
+ {$group: {_id: "$quantity", c: {$stdDevPop: "$price"}}}
+]);
+assert.neq(null, getAggPlanStage(explain, "GROUP"), explain);
+assert(explain.stages[1].hasOwnProperty("$group"));
+
+// A group with one supported and one unsupported accumulator.
+explain = coll.explain().aggregate(
+ [{$group: {_id: "$item", s: {$sum: "$price"}, stdev: {$stdDevPop: "$price"}}}]);
+assert.eq(null, getAggPlanStage(explain, "GROUP"), explain);
+assert(explain.stages[1].hasOwnProperty("$group"));
+
+// Leave sbe in the initial state.
+db.adminCommand({setParameter: 1, internalQueryEnableSlotBasedExecutionEngine: false});
+})();
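
The feature-flag guard at the top of the test above doubles as a quick interactive probe for whether a given binary has the flag enabled; a small sketch reusing the same getParameter call (the parameter name comes from this commit's query_feature_flags.idl change):

// Sketch: probe the feature flag introduced by the rename in query_feature_flags.idl below.
const flag =
    assert.commandWorked(db.adminCommand({getParameter: 1, featureFlagSBEGroupPushdown: 1}))
        .featureFlagSBEGroupPushdown;
print("featureFlagSBEGroupPushdown enabled: " + flag.value);
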
diff --git a/src/mongo/db/commands/dbcommands_d.cpp b/src/mongo/db/commands/dbcommands_d.cpp
index 67a1549b4fc..c2ae35541ea 100644
--- a/src/mongo/db/commands/dbcommands_d.cpp
+++ b/src/mongo/db/commands/dbcommands_d.cpp
@@ -302,6 +302,7 @@ public:
auto exec = uassertStatusOK(getExecutor(opCtx,
&coll,
std::move(cq),
+ nullptr /* extractAndAttachPipelineStages */,
PlanYieldPolicy::YieldPolicy::YIELD_MANUAL,
QueryPlannerParams::NO_TABLE_SCAN));
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index a684f314bfe..4120b38cd6d 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -354,7 +354,11 @@ public:
// Get the execution plan for the query.
bool permitYield = true;
auto exec =
- uassertStatusOK(getExecutorFind(opCtx, &collection, std::move(cq), permitYield));
+ uassertStatusOK(getExecutorFind(opCtx,
+ &collection,
+ std::move(cq),
+ nullptr /* extractAndAttachPipelineStages */,
+ permitYield));
auto bodyBuilder = result->getBodyBuilder();
// Got the execution tree. Explain it.
@@ -489,7 +493,11 @@ public:
// Get the execution plan for the query.
bool permitYield = true;
auto exec =
- uassertStatusOK(getExecutorFind(opCtx, &collection, std::move(cq), permitYield));
+ uassertStatusOK(getExecutorFind(opCtx,
+ &collection,
+ std::move(cq),
+ nullptr /* extractAndAttachPipelineStages */,
+ permitYield));
{
stdx::lock_guard<Client> lk(*opCtx->getClient());
diff --git a/src/mongo/db/db_raii_test.cpp b/src/mongo/db/db_raii_test.cpp
index 2a8b03c6319..076d8cbbe13 100644
--- a/src/mongo/db/db_raii_test.cpp
+++ b/src/mongo/db/db_raii_test.cpp
@@ -91,7 +91,11 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> makeTailableQueryPlan(
std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue());
bool permitYield = true;
- auto swExec = getExecutorFind(opCtx, &collection, std::move(cq), permitYield);
+ auto swExec = getExecutorFind(opCtx,
+ &collection,
+ std::move(cq),
+ nullptr /* extractAndAttachPipelineStages */,
+ permitYield);
ASSERT_OK(swExec.getStatus());
return std::move(swExec.getValue());
}
diff --git a/src/mongo/db/dbhelpers.cpp b/src/mongo/db/dbhelpers.cpp
index 7689ab42cba..49b87426069 100644
--- a/src/mongo/db/dbhelpers.cpp
+++ b/src/mongo/db/dbhelpers.cpp
@@ -124,8 +124,12 @@ RecordId Helpers::findOne(OperationContext* opCtx,
unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue());
size_t options = requireIndex ? QueryPlannerParams::NO_TABLE_SCAN : QueryPlannerParams::DEFAULT;
- auto exec = uassertStatusOK(getExecutor(
- opCtx, &collection, std::move(cq), PlanYieldPolicy::YieldPolicy::NO_YIELD, options));
+ auto exec = uassertStatusOK(getExecutor(opCtx,
+ &collection,
+ std::move(cq),
+ nullptr /* extractAndAttachPipelineStages */,
+ PlanYieldPolicy::YieldPolicy::NO_YIELD,
+ options));
PlanExecutor::ExecState state;
BSONObj obj;
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 7a81488aca9..57592236797 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -69,12 +69,14 @@
#include "mongo/db/pipeline/document_source_sample_from_random_cursor.h"
#include "mongo/db/pipeline/document_source_single_document_transformation.h"
#include "mongo/db/pipeline/document_source_sort.h"
+#include "mongo/db/pipeline/inner_pipeline_stage_impl.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/pipeline/skip_and_limit.h"
#include "mongo/db/query/collation/collator_interface.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/plan_executor_factory.h"
#include "mongo/db/query/plan_summary_stats.h"
+#include "mongo/db/query/query_feature_flags_gen.h"
#include "mongo/db/query/query_planner.h"
#include "mongo/db/query/sort_pattern.h"
#include "mongo/db/s/collection_sharding_state.h"
@@ -96,6 +98,50 @@ using std::unique_ptr;
using write_ops::InsertCommandRequest;
namespace {
+/**
+ * Extracts a prefix of 'DocumentSourceGroup' stages from the given pipeline to prepare for
+ * pushdown of $group into the inner query layer so that it can be executed using SBE. Group stages
+ * are extracted from the pipeline only when all of the following conditions are met:
+ * 0. When the 'featureFlagSBEGroupPushdown' feature flag is enabled and the
+ *    'internalQueryEnableSlotBasedExecutionEngine' knob is 'true'.
+ * 1. When 'allowDiskUse' is false. We currently don't support spilling in the SBE HashAgg
+ *    stage. This will change once spilling is supported, which is tracked by SERVER-58436.
+ * 2. When the implicit '_id' index is the only index on the provided collection. This
+ *    restriction is necessary because we don't currently support extending the QuerySolution
+ *    with the 'postMultiPlan' QuerySolutionNode when the PlanCache is involved in the query.
+ *    This will be resolved when SERVER-58429 is complete.
+ */
+std::vector<std::unique_ptr<InnerPipelineStageInterface>> extractSbeCompatibleGroupsForPushdown(
+ const intrusive_ptr<ExpressionContext>& expCtx,
+ const CollectionPtr& collection,
+ const CanonicalQuery* cq,
+ Pipeline* pipeline) {
+ // We will eventually use the extracted group stages to populate 'CanonicalQuery::pipeline'
+ // which requires stages to be wrapped in an interface.
+ std::vector<std::unique_ptr<InnerPipelineStageInterface>> groupsForPushdown;
+
+ const auto isSingleIndex =
+ collection && collection->getIndexCatalog()->numIndexesTotal(expCtx->opCtx) == 1;
+ if (!feature_flags::gFeatureFlagSBEGroupPushdown.isEnabled(
+ serverGlobalParams.featureCompatibility) ||
+ !cq->getEnableSlotBasedExecutionEngine() || expCtx->allowDiskUse || !isSingleIndex) {
+ return {};
+ }
+
+ auto&& sources = pipeline->getSources();
+
+ for (auto itr = sources.begin(); itr != sources.end();) {
+ auto groupStage = dynamic_cast<DocumentSourceGroup*>(itr->get());
+ if (!(groupStage && groupStage->sbeCompatible())) {
+            // Only push down a prefix of group stages that are supported by SBE.
+ break;
+ }
+ groupsForPushdown.push_back(std::make_unique<InnerPipelineStageImpl>(groupStage));
+ sources.erase(itr++);
+ }
+
+ return groupsForPushdown;
+}
+
StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExecutor(
const intrusive_ptr<ExpressionContext>& expCtx,
const CollectionPtr& collection,
@@ -108,7 +154,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
boost::optional<std::string> groupIdForDistinctScan,
const AggregateCommandRequest* aggRequest,
const size_t plannerOpts,
- const MatchExpressionParser::AllowedFeatureSet& matcherFeatures) {
+ const MatchExpressionParser::AllowedFeatureSet& matcherFeatures,
+ Pipeline* pipeline) {
auto findCommand = std::make_unique<FindCommandRequest>(nss);
query_request_helper::setTailableMode(expCtx->tailableMode, findCommand.get());
findCommand->setFilter(queryObj.getOwned());
@@ -186,9 +233,16 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
}
}
- bool permitYield = true;
- return getExecutorFind(
- expCtx->opCtx, &collection, std::move(cq.getValue()), permitYield, plannerOpts);
+ auto permitYield = true;
+ return getExecutorFind(expCtx->opCtx,
+ &collection,
+ std::move(cq.getValue()),
+ [&](auto* canonicalQuery) {
+ canonicalQuery->setPipeline(extractSbeCompatibleGroupsForPushdown(
+ expCtx, collection, canonicalQuery, pipeline));
+ },
+ permitYield,
+ plannerOpts);
}
/**
@@ -991,7 +1045,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
rewrittenGroupStage->groupId(),
aggRequest,
plannerOpts,
- matcherFeatures);
+ matcherFeatures,
+ pipeline);
if (swExecutorGrouped.isOK()) {
// Any $limit stage before the $group stage should make the pipeline ineligible for this
@@ -1039,7 +1094,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
boost::none, /* groupIdForDistinctScan */
aggRequest,
plannerOpts,
- matcherFeatures);
+ matcherFeatures,
+ pipeline);
}
Timestamp PipelineD::getLatestOplogTimestamp(const Pipeline* pipeline) {
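
The eligibility conditions documented in extractSbeCompatibleGroupsForPushdown() above can also be observed from the shell. A sketch of condition 1, continuing the hypothetical demo collection and helpers from the snippet after the diffstat: with allowDiskUse the $group stays in the aggregation layer, so no 'GROUP' plan stage appears.

// Sketch: SBE HashAgg cannot spill yet (SERVER-58436), so allowDiskUse disables pushdown.
const explainDiskUse = db.group_pushdown_demo.explain().aggregate(
    [{$group: {_id: "$item", s: {$sum: "$price"}}}], {allowDiskUse: true});
assert.eq(0, getAggPlanStages(explainDiskUse, "GROUP").length, explainDiskUse);
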
diff --git a/src/mongo/db/query/canonical_query.h b/src/mongo/db/query/canonical_query.h
index 1e2a29d6def..49e2858f11e 100644
--- a/src/mongo/db/query/canonical_query.h
+++ b/src/mongo/db/query/canonical_query.h
@@ -228,6 +228,10 @@ public:
return _expCtx.get();
}
+ void setPipeline(std::vector<std::unique_ptr<InnerPipelineStageInterface>> pipeline) {
+ _pipeline = std::move(pipeline);
+ }
+
const std::vector<std::unique_ptr<InnerPipelineStageInterface>>& pipeline() const {
return _pipeline;
}
diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp
index 2543e2a53c7..be4e57a46da 100644
--- a/src/mongo/db/query/get_executor.cpp
+++ b/src/mongo/db/query/get_executor.cpp
@@ -1083,9 +1083,14 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getSlotBasedExe
OperationContext* opCtx,
const CollectionPtr* collection,
std::unique_ptr<CanonicalQuery> cq,
+ std::function<void(CanonicalQuery*)> extractAndAttachPipelineStages,
PlanYieldPolicy::YieldPolicy requestedYieldPolicy,
size_t plannerOptions) {
invariant(cq);
+ if (extractAndAttachPipelineStages) {
+ extractAndAttachPipelineStages(cq.get());
+ }
+
auto nss = cq->nss();
auto yieldPolicy = makeSbeYieldPolicy(opCtx, requestedYieldPolicy, collection, nss);
SlotBasedPrepareExecutionHelper helper{
@@ -1159,12 +1164,17 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutor(
OperationContext* opCtx,
const CollectionPtr* collection,
std::unique_ptr<CanonicalQuery> canonicalQuery,
+ std::function<void(CanonicalQuery*)> extractAndAttachPipelineStages,
PlanYieldPolicy::YieldPolicy yieldPolicy,
size_t plannerOptions) {
return canonicalQuery->getEnableSlotBasedExecutionEngine() &&
isQuerySbeCompatible(opCtx, canonicalQuery.get(), plannerOptions)
- ? getSlotBasedExecutor(
- opCtx, collection, std::move(canonicalQuery), yieldPolicy, plannerOptions)
+ ? getSlotBasedExecutor(opCtx,
+ collection,
+ std::move(canonicalQuery),
+ extractAndAttachPipelineStages,
+ yieldPolicy,
+ plannerOptions)
: getClassicExecutor(
opCtx, collection, std::move(canonicalQuery), yieldPolicy, plannerOptions);
}
@@ -1177,6 +1187,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind
OperationContext* opCtx,
const CollectionPtr* collection,
std::unique_ptr<CanonicalQuery> canonicalQuery,
+ std::function<void(CanonicalQuery*)> extractAndAttachPipelineStages,
bool permitYield,
size_t plannerOptions) {
auto yieldPolicy = (permitYield && !opCtx->inMultiDocumentTransaction())
@@ -1186,7 +1197,12 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind
if (OperationShardingState::isOperationVersioned(opCtx)) {
plannerOptions |= QueryPlannerParams::INCLUDE_SHARD_FILTER;
}
- return getExecutor(opCtx, collection, std::move(canonicalQuery), yieldPolicy, plannerOptions);
+ return getExecutor(opCtx,
+ collection,
+ std::move(canonicalQuery),
+ extractAndAttachPipelineStages,
+ yieldPolicy,
+ plannerOptions);
}
namespace {
@@ -2302,7 +2318,12 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorWith
MatchExpressionParser::kAllowAllSpecialFeatures),
"Unable to canonicalize query");
- return getExecutor(opCtx, coll, std::move(cqWithoutProjection), yieldPolicy, plannerOptions);
+ return getExecutor(opCtx,
+ coll,
+ std::move(cqWithoutProjection),
+ nullptr /* extractAndAttachPipelineStages */,
+ yieldPolicy,
+ plannerOptions);
}
} // namespace
@@ -2378,8 +2399,12 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorDist
if (plannerOptions & QueryPlannerParams::STRICT_DISTINCT_ONLY) {
return {nullptr};
} else {
- return getExecutor(
- opCtx, coll, parsedDistinct->releaseQuery(), yieldPolicy, plannerOptions);
+ return getExecutor(opCtx,
+ coll,
+ parsedDistinct->releaseQuery(),
+ nullptr /* extractAndAttachPipelineStages */,
+ yieldPolicy,
+ plannerOptions);
}
}
auto solutions = std::move(statusWithMultiPlanSolns.getValue());
diff --git a/src/mongo/db/query/get_executor.h b/src/mongo/db/query/get_executor.h
index e16499e7782..886d5d2340d 100644
--- a/src/mongo/db/query/get_executor.h
+++ b/src/mongo/db/query/get_executor.h
@@ -120,11 +120,17 @@ bool shouldWaitForOplogVisibility(OperationContext* opCtx,
* PlanExecutor.
*
* If the query cannot be executed, returns a Status indicating why.
+ *
+ * If the caller provides an 'extractAndAttachPipelineStages' function and the query is eligible
+ * for pushdown into the find layer, this function will be invoked to extract pipeline stages and
+ * attach them to the provided 'CanonicalQuery'. The provided function should capture the
+ * Pipeline from which the stages are to be extracted.
*/
StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutor(
OperationContext* opCtx,
const CollectionPtr* collection,
std::unique_ptr<CanonicalQuery> canonicalQuery,
+ std::function<void(CanonicalQuery*)> extractAndAttachPipelineStages,
PlanYieldPolicy::YieldPolicy yieldPolicy,
size_t plannerOptions = 0);
@@ -137,11 +143,17 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutor(
* PlanExecutor.
*
* If the query cannot be executed, returns a Status indicating why.
+ *
+ * If the caller provides an 'extractAndAttachPipelineStages' function and the query is eligible
+ * for pushdown into the find layer, this function will be invoked to extract pipeline stages and
+ * attach them to the provided 'CanonicalQuery'. The provided function should capture the
+ * Pipeline from which the stages are to be extracted.
*/
StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind(
OperationContext* opCtx,
const CollectionPtr* collection,
std::unique_ptr<CanonicalQuery> canonicalQuery,
+ std::function<void(CanonicalQuery*)> extractAndAttachPipelineStages,
bool permitYield = false,
size_t plannerOptions = QueryPlannerParams::DEFAULT);
diff --git a/src/mongo/db/query/query_feature_flags.idl b/src/mongo/db/query/query_feature_flags.idl
index 86e5e9f68ec..2166f6545f0 100644
--- a/src/mongo/db/query/query_feature_flags.idl
+++ b/src/mongo/db/query/query_feature_flags.idl
@@ -56,9 +56,9 @@ feature_flags:
cpp_varname: gFeatureFlagShardedTimeSeriesUpdateDelete
default: false
- featureFlagSBEGroupAndLookup:
- description: "Feature flag for allowing SBE $group and $lookup"
- cpp_varname: gFeatureFlagSBEGroupAndLookup
+ featureFlagSBEGroupPushdown:
+ description: "Feature flag for allowing SBE $group pushdown"
+ cpp_varname: gFeatureFlagSBEGroupPushdown
default: false
featureFlagExactTopNAccumulator:
diff --git a/src/mongo/db/query/stage_types.cpp b/src/mongo/db/query/stage_types.cpp
index 9b53650ee36..2c508774612 100644
--- a/src/mongo/db/query/stage_types.cpp
+++ b/src/mongo/db/query/stage_types.cpp
@@ -47,6 +47,7 @@ StringData stageTypeToString(StageType stageType) {
{STAGE_FETCH, "FETCH"_sd},
{STAGE_GEO_NEAR_2D, "GEO_NEAR_2D"_sd},
{STAGE_GEO_NEAR_2DSPHERE, "GEO_NEAR_2DSPHERE"_sd},
+ {STAGE_GROUP, "GROUP"_sd},
{STAGE_IDHACK, "IDHACK"_sd},
{STAGE_IXSCAN, "IXSCAN"_sd},
{STAGE_LIMIT, "LIMIT"_sd},
diff --git a/src/mongo/db/transaction_history_iterator.cpp b/src/mongo/db/transaction_history_iterator.cpp
index ae21a26038e..b01259efd5e 100644
--- a/src/mongo/db/transaction_history_iterator.cpp
+++ b/src/mongo/db/transaction_history_iterator.cpp
@@ -86,8 +86,11 @@ BSONObj findOneOplogEntry(OperationContext* opCtx,
CollectionCatalog::get(opCtx)->getDatabaseProfileLevel(NamespaceString::kLocalDb),
Date_t::max());
- auto exec = uassertStatusOK(
- getExecutorFind(opCtx, &oplogRead.getCollection(), std::move(cq), permitYield));
+ auto exec = uassertStatusOK(getExecutorFind(opCtx,
+ &oplogRead.getCollection(),
+ std::move(cq),
+ nullptr /*extractAndAttachPipelineStages */,
+ permitYield));
PlanExecutor::ExecState getNextResult;
try {
diff --git a/src/mongo/dbtests/documentsourcetests.cpp b/src/mongo/dbtests/documentsourcetests.cpp
index 79d1d3f8472..8a965cfe559 100644
--- a/src/mongo/dbtests/documentsourcetests.cpp
+++ b/src/mongo/dbtests/documentsourcetests.cpp
@@ -102,6 +102,7 @@ protected:
auto exec = uassertStatusOK(getExecutor(opCtx(),
&_coll,
std::move(cq),
+ nullptr /* extractAndAttachPipelineStages */,
PlanYieldPolicy::YieldPolicy::NO_YIELD,
QueryPlannerParams::RETURN_OWNED_DATA));
diff --git a/src/mongo/dbtests/query_stage_multiplan.cpp b/src/mongo/dbtests/query_stage_multiplan.cpp
index e313eda05eb..8160787f241 100644
--- a/src/mongo/dbtests/query_stage_multiplan.cpp
+++ b/src/mongo/dbtests/query_stage_multiplan.cpp
@@ -566,8 +566,12 @@ TEST_F(QueryStageMultiPlanTest, MPSSummaryStats) {
auto findCommand = std::make_unique<FindCommandRequest>(nss);
findCommand->setFilter(BSON("foo" << BSON("$gte" << 0)));
auto cq = uassertStatusOK(CanonicalQuery::canonicalize(opCtx(), std::move(findCommand)));
- auto exec = uassertStatusOK(
- getExecutor(opCtx(), &coll, std::move(cq), PlanYieldPolicy::YieldPolicy::NO_YIELD, 0));
+ auto exec = uassertStatusOK(getExecutor(opCtx(),
+ &coll,
+ std::move(cq),
+ nullptr /* extractAndAttachPipelineStages */,
+ PlanYieldPolicy::YieldPolicy::NO_YIELD,
+ 0));
auto execImpl = dynamic_cast<PlanExecutorImpl*>(exec.get());
ASSERT(execImpl);