summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/pipeline_d.cpp
diff options
context:
space:
mode:
authorMihai Andrei <mihai.andrei@10gen.com>2022-02-02 22:54:26 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-02-02 23:49:05 +0000
commit3f7f9f57be3b127a6334c584b8a1795480baf1ae (patch)
tree515cd36f91d21d51ad156904338be17efd2984e6 /src/mongo/db/pipeline/pipeline_d.cpp
parent3731ee31ba39a831e59bf233a562e3120497c392 (diff)
downloadmongo-3f7f9f57be3b127a6334c584b8a1795480baf1ae.tar.gz
SERVER-58437 Implement pushdown logic for $lookup
Diffstat (limited to 'src/mongo/db/pipeline/pipeline_d.cpp')
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp79
1 files changed, 59 insertions, 20 deletions
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 67d52f4e103..ecb2b13d0b5 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -65,6 +65,7 @@
#include "mongo/db/pipeline/document_source_geo_near_cursor.h"
#include "mongo/db/pipeline/document_source_group.h"
#include "mongo/db/pipeline/document_source_internal_unpack_bucket.h"
+#include "mongo/db/pipeline/document_source_lookup.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_sample.h"
#include "mongo/db/pipeline/document_source_sample_from_random_cursor.h"
@@ -100,48 +101,86 @@ using write_ops::InsertCommandRequest;
namespace {
/**
- * Extracts a prefix of 'DocumentSourceGroup' stages from the given pipeline to prepare for
- * pushdown of $group into the inner query layer so that it can be executed using SBE. Group stages
- * are extracted from the pipeline under when all of the following conditions are met:
+ * Extracts a prefix of 'DocumentSourceGroup' and 'DocumentSourceLookUp' stages from the given
+ * pipeline to prepare for pushdown of $group and $lookup into the inner query layer so that it
+ * can be executed using SBE.
+ * Group stages are extracted from the pipeline when all of the following conditions are met:
* 0. When the 'internalQueryForceClassicEngine' feature flag is 'false'.
* 1. When 'allowDiskUse' is false. We currently don't support spilling in the SBE HashAgg
* stage. This will change once that is supported when SERVER-58436 is complete.
* 2. When the DocumentSourceGroup has 'doingMerge=false', this will change when we implement
* hash table spilling in SERVER-58436.
+ *
+ * Lookup stages are extracted from the pipeline when all of the following conditions are met:
+ * 0. When the 'internalQueryForceClassicEngine' feature flag is 'false'.
+ * 1. When the 'featureFlagSBELookupPushdown' feature flag is 'true'.
+ * 2. When the 'internalEnableMultipleAutoGetCollections' flag is 'true'
+ * 3. The $lookup uses only the 'localField'/'foreignField' syntax (no pipelines).
+ * 4. The foreign collection is neither sharded nor a view.
*/
-std::vector<std::unique_ptr<InnerPipelineStageInterface>> extractSbeCompatibleGroupsForPushdown(
+std::vector<std::unique_ptr<InnerPipelineStageInterface>> extractSbeCompatibleStagesForPushdown(
const intrusive_ptr<ExpressionContext>& expCtx,
- const CollectionPtr& collection,
+ const MultiCollection& collections,
const CanonicalQuery* cq,
Pipeline* pipeline) {
// We will eventually use the extracted group stages to populate 'CanonicalQuery::pipeline'
// which requires stages to be wrapped in an interface.
- std::vector<std::unique_ptr<InnerPipelineStageInterface>> groupsForPushdown;
+ std::vector<std::unique_ptr<InnerPipelineStageInterface>> stagesForPushdown;
// This handles the case of unionWith against an unknown collection.
- if (collection == nullptr) {
+ if (collections.getMainCollection() == nullptr) {
return {};
}
- if (!feature_flags::gFeatureFlagSBEGroupPushdown.isEnabled(
- serverGlobalParams.featureCompatibility) ||
- cq->getForceClassicEngine()) {
+ // No pushdown if we're using the classic engine.
+ if (cq->getForceClassicEngine()) {
return {};
}
auto&& sources = pipeline->getSources();
for (auto itr = sources.begin(); itr != sources.end();) {
- auto groupStage = dynamic_cast<DocumentSourceGroup*>(itr->get());
- if (!(groupStage && groupStage->sbeCompatible()) || groupStage->doingMerge()) {
- // Only pushdown a prefix of group stages that are supported by sbe.
+ // $group pushdown logic.
+ if (auto groupStage = dynamic_cast<DocumentSourceGroup*>(itr->get())) {
+ bool groupEligibleForPushdown = feature_flags::gFeatureFlagSBEGroupPushdown.isEnabled(
+ serverGlobalParams.featureCompatibility) &&
+ groupStage->sbeCompatible() && !groupStage->doingMerge();
+ if (groupEligibleForPushdown) {
+ stagesForPushdown.push_back(std::make_unique<InnerPipelineStageImpl>(groupStage));
+ sources.erase(itr++);
+ continue;
+ }
+ break;
+ }
+
+ // $lookup pushdown logic.
+ if (auto lookupStage = dynamic_cast<DocumentSourceLookUp*>(itr->get())) {
+ bool isForeignSharded = false;
+ bool isForeignView = false;
+ const auto& fromNs = lookupStage->getFromNs();
+ const auto& foreignColl = collections.lookupCollection(fromNs);
+ if (foreignColl) {
+ isForeignSharded = foreignColl.isSharded();
+ } else {
+ // If the right hand side targets a namespace that we can't find in
+ // 'collections', we infer that it is targeting a view.
+ isForeignView = true;
+ }
+
+ bool lookupEligibleForPushdown =
+ feature_flags::gFeatureFlagSBELookupPushdown.isEnabledAndIgnoreFCV() &&
+ internalEnableMultipleAutoGetCollections.load() && lookupStage->sbeCompatible() &&
+ !isForeignSharded && !isForeignView;
+ uassert(
+ 5843700, "$lookup push down logic worked correctly", !lookupEligibleForPushdown);
break;
}
- groupsForPushdown.push_back(std::make_unique<InnerPipelineStageImpl>(groupStage));
- sources.erase(itr++);
+
+ // Current stage cannot be pushed down.
+ break;
}
- return groupsForPushdown;
+ return stagesForPushdown;
}
StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExecutor(
@@ -158,7 +197,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
const size_t plannerOpts,
const MatchExpressionParser::AllowedFeatureSet& matcherFeatures,
Pipeline* pipeline) {
- const auto& collection = collections.getMainCollection();
+ const auto& mainColl = collections.getMainCollection();
auto findCommand = std::make_unique<FindCommandRequest>(nss);
query_request_helper::setTailableMode(expCtx->tailableMode, findCommand.get());
findCommand->setFilter(queryObj.getOwned());
@@ -240,7 +279,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
// example, if we have a document {a: [1,2]} and group by "a" a DISTINCT_SCAN on an "a"
// index would produce one result for '1' and another for '2', which would be incorrect.
auto distinctExecutor = getExecutorDistinct(
- &collection, plannerOpts | QueryPlannerParams::STRICT_DISTINCT_ONLY, &parsedDistinct);
+ &mainColl, plannerOpts | QueryPlannerParams::STRICT_DISTINCT_ONLY, &parsedDistinct);
if (!distinctExecutor.isOK()) {
return distinctExecutor.getStatus().withContext(
"Unable to use distinct scan to optimize $group stage");
@@ -257,8 +296,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
collections,
std::move(cq.getValue()),
[&](auto* canonicalQuery) {
- canonicalQuery->setPipeline(extractSbeCompatibleGroupsForPushdown(
- expCtx, collection, canonicalQuery, pipeline));
+ canonicalQuery->setPipeline(extractSbeCompatibleStagesForPushdown(
+ expCtx, collections, canonicalQuery, pipeline));
},
permitYield,
plannerOpts);