summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/pipeline_d.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/pipeline_d.cpp')
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp39
1 files changed, 29 insertions, 10 deletions
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index ac2173ae50c..ec84917b5c5 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -107,7 +107,7 @@ using write_ops::InsertCommandRequest;
namespace {
/**
- * Extracts a prefix of 'DocumentSourceGroup' and 'DocumentSourceLookUp' stages from the given
+ * Finds a prefix of 'DocumentSourceGroup' and 'DocumentSourceLookUp' stages from the given
* pipeline to prepare for pushdown of $group and $lookup into the inner query layer so that it
* can be executed using SBE.
* Group stages are extracted from the pipeline when all of the following conditions are met:
@@ -121,11 +121,11 @@ namespace {
* - The $lookup uses only the 'localField'/'foreignField' syntax (no pipelines).
* - The foreign collection is neither sharded nor a view.
*/
-std::vector<std::unique_ptr<InnerPipelineStageInterface>> extractSbeCompatibleStagesForPushdown(
+std::vector<std::unique_ptr<InnerPipelineStageInterface>> findSbeCompatibleStagesForPushdown(
const intrusive_ptr<ExpressionContext>& expCtx,
const MultipleCollectionAccessor& collections,
const CanonicalQuery* cq,
- Pipeline* pipeline) {
+ const Pipeline* pipeline) {
// We will eventually use the extracted group stages to populate 'CanonicalQuery::pipeline'
// which requires stages to be wrapped in an interface.
std::vector<std::unique_ptr<InnerPipelineStageInterface>> stagesForPushdown;
@@ -140,7 +140,7 @@ std::vector<std::unique_ptr<InnerPipelineStageInterface>> extractSbeCompatibleSt
return {};
}
- auto&& sources = pipeline->getSources();
+ const auto& sources = pipeline->getSources();
bool isMainCollectionSharded = false;
if (const auto& mainColl = collections.getMainCollection()) {
@@ -158,7 +158,7 @@ std::vector<std::unique_ptr<InnerPipelineStageInterface>> extractSbeCompatibleSt
internalQuerySlotBasedExecutionDisableLookupPushdown.load() || isMainCollectionSharded ||
collections.isAnySecondaryNamespaceAViewOrSharded();
- for (auto itr = sources.begin(); itr != sources.end();) {
+ for (auto itr = sources.begin(); itr != sources.end(); ++itr) {
const bool isLastSource = itr->get() == sources.back().get();
// $group pushdown logic.
@@ -170,7 +170,6 @@ std::vector<std::unique_ptr<InnerPipelineStageInterface>> extractSbeCompatibleSt
if (groupStage->sbeCompatible() && !groupStage->doingMerge()) {
stagesForPushdown.push_back(
std::make_unique<InnerPipelineStageImpl>(groupStage, isLastSource));
- sources.erase(itr++);
continue;
}
break;
@@ -187,7 +186,6 @@ std::vector<std::unique_ptr<InnerPipelineStageInterface>> extractSbeCompatibleSt
if (lookupStage->sbeCompatible()) {
stagesForPushdown.push_back(
std::make_unique<InnerPipelineStageImpl>(lookupStage, isLastSource));
- sources.erase(itr++);
continue;
}
break;
@@ -199,6 +197,21 @@ std::vector<std::unique_ptr<InnerPipelineStageInterface>> extractSbeCompatibleSt
return stagesForPushdown;
}
+/**
+ * Removes the first 'stagesToRemove' stages from the pipeline. This function is meant to be paired
+ * with a call to findSbeCompatibleStagesForPushdown() - the caller must first get the stages to
+ * push down, then remove them.
+ */
+void trimPipelineStages(Pipeline* pipeline, size_t stagesToRemove) {
+ auto& sources = pipeline->getSources();
+ tassert(7087104,
+ "stagesToRemove must be <= number of pipeline sources",
+ stagesToRemove <= sources.size());
+ for (size_t i = 0; i < stagesToRemove; ++i) {
+ sources.erase(sources.begin());
+ }
+}
+
StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExecutor(
const intrusive_ptr<ExpressionContext>& expCtx,
const MultipleCollectionAccessor& collections,
@@ -296,9 +309,15 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
return getExecutorFind(expCtx->opCtx,
collections,
std::move(cq.getValue()),
- [&](auto* canonicalQuery) {
- canonicalQuery->setPipeline(extractSbeCompatibleStagesForPushdown(
- expCtx, collections, canonicalQuery, pipeline));
+ [&](auto* canonicalQuery, bool attachOnly) {
+ if (attachOnly) {
+ canonicalQuery->setPipeline(findSbeCompatibleStagesForPushdown(
+ expCtx, collections, canonicalQuery, pipeline));
+ } else {
+ // Not attaching - we need to trim the already pushed down
+ // pipeline stages from the pipeline.
+ trimPipelineStages(pipeline, canonicalQuery->pipeline().size());
+ }
},
permitYield,
plannerOpts);