author     Mihai Andrei <mihai.andrei@10gen.com>              2022-02-02 22:54:26 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>   2022-02-02 23:49:05 +0000
commit     3f7f9f57be3b127a6334c584b8a1795480baf1ae (patch)
tree       515cd36f91d21d51ad156904338be17efd2984e6
parent     3731ee31ba39a831e59bf233a562e3120497c392 (diff)
download   mongo-3f7f9f57be3b127a6334c584b8a1795480baf1ae.tar.gz
SERVER-58437 Implement pushdown logic for $lookup
-rw-r--r--  jstests/noPassthrough/lookup_pushdown.js          | 200
-rw-r--r--  src/mongo/db/commands/run_aggregate.cpp           |   9
-rw-r--r--  src/mongo/db/pipeline/document_source_lookup.cpp  |  25
-rw-r--r--  src/mongo/db/pipeline/document_source_lookup.h    |  11
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.cpp              |  79
-rw-r--r--  src/mongo/db/query/get_executor.cpp               |   2
-rw-r--r--  src/mongo/db/query/multi_collection.h             |   9
7 files changed, 305 insertions, 30 deletions
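
For context, the shape of $lookup this commit targets, and the knobs it is gated on, can be exercised from the shell. The following is a minimal sketch mirroring the new jstest below; the collection names are illustrative, the build must also have 'featureFlagSBELookupPushdown' enabled (the test checks this via checkSBEEnabled()), and at this stage of the work an eligible $lookup deliberately fails with the placeholder error 5843700 rather than actually executing in SBE.

    // Start a standalone mongod with the knob the new test uses.
    const conn = MongoRunner.runMongod(
        {setParameter: "internalEnableMultipleAutoGetCollections=true"});
    const db = conn.getDB("lookup_pushdown");
    assert.commandWorked(db.local.insert({_id: 1, a: 2}));
    assert.commandWorked(db.foreign.insert({_id: 0, b: 2}));

    // Equality-match $lookup ('localField'/'foreignField', no 'pipeline') against an
    // unsharded, non-view foreign collection is the shape eligible for pushdown. With this
    // commit the eligibility check fires the temporary 5843700 uassert instead of lowering
    // the stage into SBE.
    assert.throwsWithCode(() => db.local.aggregate([
        {$lookup: {from: "foreign", localField: "a", foreignField: "b", as: "out"}}
    ]).toArray(), 5843700);

    MongoRunner.stopMongod(conn);
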
diff --git a/jstests/noPassthrough/lookup_pushdown.js b/jstests/noPassthrough/lookup_pushdown.js
new file mode 100644
index 00000000000..fe10110b407
--- /dev/null
+++ b/jstests/noPassthrough/lookup_pushdown.js
@@ -0,0 +1,200 @@
+/**
+ * Tests basic functionality of pushing $lookup into the find layer.
+ */
+(function() {
+"use strict";
+
+load("jstests/libs/sbe_util.js"); // For checkSBEEnabled.
+
+function runTest(coll, pipeline, expectedToPushdown) {
+ const cmd = () => coll.aggregate(pipeline).toArray();
+ if (expectedToPushdown) {
+ assert.throwsWithCode(cmd, 5843700);
+ } else {
+ assert.doesNotThrow(cmd);
+ }
+}
+
+// Standalone cases.
+const conn = MongoRunner.runMongod({setParameter: "internalEnableMultipleAutoGetCollections=true"});
+assert.neq(null, conn, "mongod was unable to start up");
+const name = "lookup_pushdown";
+let db = conn.getDB(name);
+
+if (!checkSBEEnabled(db, ["featureFlagSBELookupPushdown"])) {
+ jsTestLog("Skipping test because the sbe lookup pushdown feature flag is disabled");
+ MongoRunner.stopMongod(conn);
+ return;
+}
+
+const foreignCollName = "foreign_lookup_pushdown";
+const viewName = "view_lookup_pushdown";
+let coll = db[name];
+assert.commandWorked(coll.insert({_id: 1, a: 2}));
+let foreignColl = db[foreignCollName];
+assert.commandWorked(foreignColl.insert({_id: 0, b: 2}));
+assert.commandWorked(db.createView(viewName, foreignCollName, [{$match: {b: {$gte: 0}}}]));
+let view = db[viewName];
+
+// Basic $lookup.
+runTest(coll,
+ [{$lookup: {from: foreignCollName, localField: "a", foreignField: "b", as: "out"}}],
+ true /* expectedToPushdown */);
+
+// Self join $lookup, no views.
+runTest(coll,
+ [{$lookup: {from: name, localField: "a", foreignField: "a", as: "out"}}],
+ true /* expectedToPushdown */);
+
+// Self join $lookup; left hand is a view. This is expected to be pushed down because the view
+// pipeline itself is a $match, which is eligible for pushdown.
+runTest(view,
+ [{$lookup: {from: name, localField: "a", foreignField: "a", as: "out"}}],
+ true /* expectedToPushdown */);
+
+// Self join $lookup; right hand is a view.
+runTest(coll,
+ [{$lookup: {from: viewName, localField: "a", foreignField: "a", as: "out"}}],
+ false /* expectedToPushdown */);
+
+// Self join $lookup; both namespaces are views.
+runTest(view,
+ [{$lookup: {from: viewName, localField: "a", foreignField: "a", as: "out"}}],
+ false /* expectedToPushdown */);
+
+// $lookup preceded by $match.
+runTest(coll,
+ [
+ {$match: {a: {$gte: 0}}},
+ {$lookup: {from: foreignCollName, localField: "a", foreignField: "b", as: "out"}}
+ ],
+ true /* expectedToPushdown */);
+
+// $lookup preceded by $project.
+runTest(coll,
+ [
+ {$project: {a: 1}},
+ {$lookup: {from: foreignCollName, localField: "a", foreignField: "b", as: "out"}}
+ ],
+ true /* expectedToPushdown */);
+
+// $lookup preceded by $project which features an SBE-incompatible expression.
+// TODO SERVER-51542: Update or remove this test case once $pow is implemented in SBE.
+runTest(coll,
+ [
+ {$project: {exp: {$pow: ["$a", 3]}}},
+ {$lookup: {from: foreignCollName, localField: "a", foreignField: "b", as: "out"}}
+ ],
+ false /* expectedToPushdown */);
+
+// $lookup preceded by $group.
+runTest(coll,
+ [
+ {$group: {_id: "$a", sum: {$sum: 1}}},
+ {$lookup: {from: foreignCollName, localField: "a", foreignField: "b", as: "out"}}
+ ],
+ true /* expectedToPushdown */);
+
+// Consecutive $lookups (first is against view).
+runTest(coll,
+ [
+ {$lookup: {from: viewName, localField: "a", foreignField: "b", as: "out"}},
+ {$lookup: {from: foreignCollName, localField: "a", foreignField: "b", as: "out"}}
+ ],
+ false /* expectedToPushdown */);
+
+// Consecutive $lookups (first is against regular collection).
+runTest(coll,
+ [
+ {$lookup: {from: foreignCollName, localField: "a", foreignField: "b", as: "out"}},
+ {$lookup: {from: viewName, localField: "a", foreignField: "b", as: "out"}}
+ ],
+ true /* expectedToPushdown */);
+
+// $lookup with pipeline.
+runTest(coll,
+ [{$lookup: {from: foreignCollName, let: {foo: "$b"}, pipeline: [{$match: {$expr: {$eq: ["$$foo", 2]}}}], as: "out"}}],
+ false /* expectedToPushdown */);
+
+// $lookup that absorbs $unwind.
+runTest(coll,
+ [
+ {$lookup: {from: foreignCollName, localField: "a", foreignField: "b", as: "out"}},
+ {$unwind: "$out"}
+ ],
+ false /* expectedToPushdown */);
+
+// $lookup that absorbs $match.
+runTest(coll,
+ [
+ {$lookup: {from: foreignCollName, localField: "a", foreignField: "b", as: "out"}},
+ {$unwind: "$out"},
+ {$match: {out: {$gte: 0}}}
+ ],
+ false /* expectedToPushdown */);
+
+// $lookup that does not absorb $match.
+runTest(coll,
+ [
+ {$lookup: {from: foreignCollName, localField: "a", foreignField: "b", as: "out"}},
+ {$match: {out: {$gte: 0}}}
+ ],
+ true /* expectedToPushdown */);
+
+MongoRunner.stopMongod(conn);
+
+// Sharded cases.
+const st = new ShardingTest({
+ shards: 2,
+ mongos: 1,
+ other: {shardOptions: {setParameter: "internalEnableMultipleAutoGetCollections=true"}}
+});
+db = st.s.getDB(name);
+
+// Setup. Here, 'coll' is sharded, 'foreignColl' is unsharded, 'viewName' is an unsharded view,
+// and 'shardedViewName' is a sharded view.
+const shardedViewName = "sharded_foreign_view";
+coll = db[name];
+assert.commandWorked(coll.insert({a: 1, shardKey: 1}));
+assert.commandWorked(coll.insert({a: 2, shardKey: 10}));
+assert.commandWorked(coll.createIndex({shardKey: 1}));
+st.shardColl(coll.getName(), {shardKey: 1}, {shardKey: 5}, {shardKey: 5}, name);
+
+foreignColl = db[foreignCollName];
+assert.commandWorked(foreignColl.insert({b: 5}));
+
+assert.commandWorked(db.createView(viewName, foreignCollName, [{$match: {b: {$gte: 0}}}]));
+assert.commandWorked(db.createView(shardedViewName, name, [{$match: {b: {$gte: 0}}}]));
+
+// Both collections are unsharded.
+runTest(foreignColl,
+ [{$lookup: {from: foreignCollName, localField: "a", foreignField: "b", as: "out"}}],
+ true /* expectedToPushdown */);
+
+// Sharded main collection, unsharded right side. This is not expected to be eligible for pushdown
+// because the $lookup will be preceded by a $mergeCursors stage on the merging shard.
+runTest(coll,
+ [{$lookup: {from: foreignCollName, localField: "a", foreignField: "b", as: "out"}}],
+ false /* expectedToPushdown */);
+
+// Both collections are sharded.
+runTest(coll,
+ [{$lookup: {from: name, localField: "a", foreignField: "b", as: "out"}}],
+ false /* expectedToPushdown */);
+
+// Unsharded main collection, sharded right side.
+runTest(foreignColl,
+ [{$lookup: {from: name, localField: "a", foreignField: "b", as: "out"}}],
+ false /* expectedToPushdown */);
+
+// Unsharded main collection, unsharded view right side.
+runTest(foreignColl,
+ [{$lookup: {from: viewName, localField: "a", foreignField: "b", as: "out"}}],
+ false /* expectedToPushdown */);
+
+// Unsharded main collection, sharded view on the right side.
+runTest(foreignColl,
+ [{$lookup: {from: shardedViewName, localField: "a", foreignField: "b", as: "out"}}],
+ false /* expectedToPushdown */);
+st.stop();
+}());
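
One note on the sharded half of the test: every case there passes false for 'expectedToPushdown', and the reason given for the first case applies throughout: on a sharded main collection the $lookup ends up on the merging shard behind a $mergeCursors stage, so it is never a leading stage that can be extracted. A hedged sketch, continuing the test's own sharded setup:

    // The same $lookup that trips the 5843700 placeholder on a standalone mongod simply
    // succeeds against the sharded 'coll', because a $mergeCursors stage precedes the
    // $lookup on the merging shard and no pushdown is attempted.
    assert.doesNotThrow(() => coll.aggregate([
        {$lookup: {from: foreignCollName, localField: "a", foreignField: "b", as: "out"}}
    ]).toArray());
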
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index e7634306797..be02bd21de9 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -659,8 +659,13 @@ Status runAggregate(OperationContext* opCtx,
auto initContext = [&](AutoGetCollectionViewMode m) -> void {
ctx.emplace(opCtx, nss, m);
for (const auto& ns : secondaryExecNssList) {
- secondaryCtx.emplace_back(
- std::make_unique<AutoGetCollectionForReadCommandMaybeLockFree>(opCtx, ns, m));
+ // Avoid locking the main namespace a second time (a secondary namespace cannot
+ // appear more than once because 'secondaryExecNssList' is already a set). This
+ // emulates the behavior of 'AutoGetCollectionMulti'.
+ if (ns != nss) {
+ secondaryCtx.emplace_back(
+ std::make_unique<AutoGetCollectionForReadCommandMaybeLockFree>(opCtx, ns, m));
+ }
}
collections = MultiCollection(ctx, secondaryCtx);
};
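
The new 'ns != nss' guard matters for self-joins, where the $lookup's foreign namespace is the main namespace and would otherwise be acquired twice. The observable behavior, sketched against the standalone setup from the first snippet (collection names are illustrative):

    // Self-join: 'from' is the main collection itself. The guard above skips the duplicate
    // acquisition, and the $lookup remains a pushdown candidate, so with this commit it
    // still hits the 5843700 placeholder, matching the "Self join $lookup, no views" case
    // in the new test.
    assert.throwsWithCode(() => db.local.aggregate([
        {$lookup: {from: "local", localField: "a", foreignField: "a", as: "out"}}
    ]).toArray(), 5843700);
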
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index f4c84b620ad..61b5173c21a 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -681,6 +681,9 @@ Pipeline::SourceContainer::iterator DocumentSourceLookUp::doOptimizeAt(
// following $unwind stage.
if (nextUnwind && !_unwindSrc && nextUnwind->getUnwindPath() == _as.fullPath()) {
_unwindSrc = std::move(nextUnwind);
+
+ // We cannot push absorbed $unwind stages into SBE.
+ _sbeCompatible = false;
container->erase(std::next(itr));
return itr;
}
@@ -765,7 +768,10 @@ Pipeline::SourceContainer::iterator DocumentSourceLookUp::doOptimizeAt(
return std::next(itr);
}
- // We can internalize the $match.
+ // We can internalize the $match. This $lookup should already be marked as SBE incompatible
+ // because a $match can only be internalized if an $unwind, which is SBE incompatible, was
+ // absorbed as well.
+ tassert(5843701, "This $lookup cannot be compatible with SBE", !_sbeCompatible);
if (!_matchSrc) {
_matchSrc = nextMatch;
} else {
@@ -1249,12 +1255,17 @@ intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson(
"$lookup with a 'let' argument must also specify 'pipeline'",
!hasLet);
- return new DocumentSourceLookUp(std::move(fromNs),
- std::move(as),
- std::move(localField),
- std::move(foreignField),
- std::move(fromCollator),
- pExpCtx);
+ auto lookupStage = new DocumentSourceLookUp(std::move(fromNs),
+ std::move(as),
+ std::move(localField),
+ std::move(foreignField),
+ std::move(fromCollator),
+ pExpCtx);
+
+ // $lookup stages with local/foreignField specified are eligible for pushdown into SBE if
+ // the context allows it.
+ lookupStage->_sbeCompatible = pExpCtx->sbeCompatible;
+ return lookupStage;
}
}
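
The net effect of the doOptimizeAt() changes is that absorbing a trailing $unwind (and, through it, a $match) makes the $lookup SBE-incompatible, while a $match that is not internalized leaves pushdown eligibility untouched. Sketched against the same standalone setup (illustrative names):

    // $unwind on the 'as' field is absorbed into the $lookup during optimization; the change
    // above then clears SBE compatibility, so this pipeline runs in the classic engine and
    // does not hit the placeholder error.
    assert.doesNotThrow(() => db.local.aggregate([
        {$lookup: {from: "foreign", localField: "a", foreignField: "b", as: "out"}},
        {$unwind: "$out"}
    ]).toArray());

    // Without the $unwind, the trailing $match is not internalized into the $lookup, so the
    // $lookup stays eligible for pushdown and trips 5843700.
    assert.throwsWithCode(() => db.local.aggregate([
        {$lookup: {from: "foreign", localField: "a", foreignField: "b", as: "out"}},
        {$match: {out: {$gte: 0}}}
    ]).toArray(), 5843700);
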
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index 9b5f87803fc..3fa95b7e151 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -223,6 +223,14 @@ public:
boost::intrusive_ptr<DocumentSource> clone() const final;
+ bool sbeCompatible() const {
+ return _sbeCompatible;
+ }
+
+ const NamespaceString& getFromNs() {
+ return _fromNs;
+ }
+
protected:
GetNextResult doGetNext() final;
void doDispose() final;
@@ -365,6 +373,9 @@ private:
// collation when not set explicitly.
bool _hasExplicitCollation = false;
+ // Can this $lookup be pushed down into SBE?
+ bool _sbeCompatible = false;
+
// The aggregation pipeline to perform against the '_resolvedNs' namespace. Referenced view
// namespaces have been resolved.
std::vector<BSONObj> _resolvedPipeline;
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 67d52f4e103..ecb2b13d0b5 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -65,6 +65,7 @@
#include "mongo/db/pipeline/document_source_geo_near_cursor.h"
#include "mongo/db/pipeline/document_source_group.h"
#include "mongo/db/pipeline/document_source_internal_unpack_bucket.h"
+#include "mongo/db/pipeline/document_source_lookup.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_sample.h"
#include "mongo/db/pipeline/document_source_sample_from_random_cursor.h"
@@ -100,48 +101,86 @@ using write_ops::InsertCommandRequest;
namespace {
/**
- * Extracts a prefix of 'DocumentSourceGroup' stages from the given pipeline to prepare for
- * pushdown of $group into the inner query layer so that it can be executed using SBE. Group stages
- * are extracted from the pipeline under when all of the following conditions are met:
+ * Extracts a prefix of 'DocumentSourceGroup' and 'DocumentSourceLookUp' stages from the given
+ * pipeline to prepare for pushdown of $group and $lookup into the inner query layer so that it
+ * can be executed using SBE.
+ * Group stages are extracted from the pipeline when all of the following conditions are met:
* 0. When the 'internalQueryForceClassicEngine' feature flag is 'false'.
* 1. When 'allowDiskUse' is false. We currently don't support spilling in the SBE HashAgg
* stage. This will change once that is supported when SERVER-58436 is complete.
* 2. When the DocumentSourceGroup has 'doingMerge=false', this will change when we implement
* hash table spilling in SERVER-58436.
+ *
+ * Lookup stages are extracted from the pipeline when all of the following conditions are met:
+ * 0. When the 'internalQueryForceClassicEngine' feature flag is 'false'.
+ * 1. When the 'featureFlagSBELookupPushdown' feature flag is 'true'.
+ * 2. When the 'internalEnableMultipleAutoGetCollections' flag is 'true'.
+ * 3. When the $lookup uses only the 'localField'/'foreignField' syntax (no 'pipeline').
+ * 4. When the foreign collection is neither sharded nor a view.
*/
-std::vector<std::unique_ptr<InnerPipelineStageInterface>> extractSbeCompatibleGroupsForPushdown(
+std::vector<std::unique_ptr<InnerPipelineStageInterface>> extractSbeCompatibleStagesForPushdown(
const intrusive_ptr<ExpressionContext>& expCtx,
- const CollectionPtr& collection,
+ const MultiCollection& collections,
const CanonicalQuery* cq,
Pipeline* pipeline) {
// We will eventually use the extracted group stages to populate 'CanonicalQuery::pipeline'
// which requires stages to be wrapped in an interface.
- std::vector<std::unique_ptr<InnerPipelineStageInterface>> groupsForPushdown;
+ std::vector<std::unique_ptr<InnerPipelineStageInterface>> stagesForPushdown;
// This handles the case of unionWith against an unknown collection.
- if (collection == nullptr) {
+ if (collections.getMainCollection() == nullptr) {
return {};
}
- if (!feature_flags::gFeatureFlagSBEGroupPushdown.isEnabled(
- serverGlobalParams.featureCompatibility) ||
- cq->getForceClassicEngine()) {
+ // No pushdown if we're using the classic engine.
+ if (cq->getForceClassicEngine()) {
return {};
}
auto&& sources = pipeline->getSources();
for (auto itr = sources.begin(); itr != sources.end();) {
- auto groupStage = dynamic_cast<DocumentSourceGroup*>(itr->get());
- if (!(groupStage && groupStage->sbeCompatible()) || groupStage->doingMerge()) {
- // Only pushdown a prefix of group stages that are supported by sbe.
+ // $group pushdown logic.
+ if (auto groupStage = dynamic_cast<DocumentSourceGroup*>(itr->get())) {
+ bool groupEligibleForPushdown = feature_flags::gFeatureFlagSBEGroupPushdown.isEnabled(
+ serverGlobalParams.featureCompatibility) &&
+ groupStage->sbeCompatible() && !groupStage->doingMerge();
+ if (groupEligibleForPushdown) {
+ stagesForPushdown.push_back(std::make_unique<InnerPipelineStageImpl>(groupStage));
+ sources.erase(itr++);
+ continue;
+ }
+ break;
+ }
+
+ // $lookup pushdown logic.
+ if (auto lookupStage = dynamic_cast<DocumentSourceLookUp*>(itr->get())) {
+ bool isForeignSharded = false;
+ bool isForeignView = false;
+ const auto& fromNs = lookupStage->getFromNs();
+ const auto& foreignColl = collections.lookupCollection(fromNs);
+ if (foreignColl) {
+ isForeignSharded = foreignColl.isSharded();
+ } else {
+ // If the right hand side targets a namespace that we can't find in
+ // 'collections', we infer that it is targeting a view.
+ isForeignView = true;
+ }
+
+ bool lookupEligibleForPushdown =
+ feature_flags::gFeatureFlagSBELookupPushdown.isEnabledAndIgnoreFCV() &&
+ internalEnableMultipleAutoGetCollections.load() && lookupStage->sbeCompatible() &&
+ !isForeignSharded && !isForeignView;
+ uassert(
+ 5843700, "$lookup push down logic worked correctly", !lookupEligibleForPushdown);
break;
}
- groupsForPushdown.push_back(std::make_unique<InnerPipelineStageImpl>(groupStage));
- sources.erase(itr++);
+
+ // Current stage cannot be pushed down.
+ break;
}
- return groupsForPushdown;
+ return stagesForPushdown;
}
StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExecutor(
@@ -158,7 +197,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
const size_t plannerOpts,
const MatchExpressionParser::AllowedFeatureSet& matcherFeatures,
Pipeline* pipeline) {
- const auto& collection = collections.getMainCollection();
+ const auto& mainColl = collections.getMainCollection();
auto findCommand = std::make_unique<FindCommandRequest>(nss);
query_request_helper::setTailableMode(expCtx->tailableMode, findCommand.get());
findCommand->setFilter(queryObj.getOwned());
@@ -240,7 +279,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
// example, if we have a document {a: [1,2]} and group by "a" a DISTINCT_SCAN on an "a"
// index would produce one result for '1' and another for '2', which would be incorrect.
auto distinctExecutor = getExecutorDistinct(
- &collection, plannerOpts | QueryPlannerParams::STRICT_DISTINCT_ONLY, &parsedDistinct);
+ &mainColl, plannerOpts | QueryPlannerParams::STRICT_DISTINCT_ONLY, &parsedDistinct);
if (!distinctExecutor.isOK()) {
return distinctExecutor.getStatus().withContext(
"Unable to use distinct scan to optimize $group stage");
@@ -257,8 +296,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
collections,
std::move(cq.getValue()),
[&](auto* canonicalQuery) {
- canonicalQuery->setPipeline(extractSbeCompatibleGroupsForPushdown(
- expCtx, collection, canonicalQuery, pipeline));
+ canonicalQuery->setPipeline(extractSbeCompatibleStagesForPushdown(
+ expCtx, collections, canonicalQuery, pipeline));
},
permitYield,
plannerOpts);
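
extractSbeCompatibleStagesForPushdown() peels eligible stages off the front of the pipeline into 'CanonicalQuery::pipeline' and stops at the first stage it cannot push down, so only a prefix of $group/$lookup stages moves into the find layer. A sketch of the resulting split, assuming $group pushdown is also enabled as it is in the test environment (names are illustrative):

    // The leading $group and the equality-match $lookup form the pushdown prefix; the
    // pipeline-form $lookup is not SBE-compatible, so extraction would stop there and the
    // remainder would stay in the aggregation layer. With this commit the eligible $lookup
    // in the prefix trips the 5843700 placeholder as soon as it is reached.
    assert.throwsWithCode(() => db.local.aggregate([
        {$group: {_id: "$a", sum: {$sum: 1}}},
        {$lookup: {from: "foreign", localField: "_id", foreignField: "b", as: "out"}},
        {$lookup: {from: "foreign", pipeline: [{$match: {b: 1}}], as: "out2"}}
    ]).toArray(), 5843700);
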
diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp
index cd61dfefe7c..a08402df756 100644
--- a/src/mongo/db/query/get_executor.cpp
+++ b/src/mongo/db/query/get_executor.cpp
@@ -962,7 +962,7 @@ public:
std::pair<std::unique_ptr<sbe::PlanStage>, stage_builder::PlanStageData> buildExecutableTree(
const QuerySolution& solution) const final {
- // TODO SERVER-58437 We don't pass '_collections' to the function below because at the
+ // TODO SERVER-62677 We don't pass '_collections' to the function below because at the
// moment, no pushdown is actually happening. This should be changed once the logic for
// pushdown is implemented.
return stage_builder::buildSlotBasedExecutableTree(
diff --git a/src/mongo/db/query/multi_collection.h b/src/mongo/db/query/multi_collection.h
index 77368d13682..9b3230a2cc6 100644
--- a/src/mongo/db/query/multi_collection.h
+++ b/src/mongo/db/query/multi_collection.h
@@ -80,6 +80,15 @@ public:
return _secondaryColls;
}
+ const CollectionPtr& lookupCollection(const NamespaceString& nss) const {
+ if (_mainCollName && nss == *_mainCollName) {
+ return *_mainColl;
+ } else if (auto itr = _secondaryColls.find(nss); itr != _secondaryColls.end()) {
+ return itr->second;
+ }
+ return CollectionPtr::null;
+ }
+
void clear() {
_mainColl = &CollectionPtr::null;
_secondaryColls.clear();
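
Finally, MultiCollection::lookupCollection() is what ties the eligibility check to the acquired namespaces: it returns the main collection, a matching secondary collection, or a null CollectionPtr, and pipeline_d interprets a miss as the foreign side being a view, which disqualifies pushdown. The observable effect, sketched against the standalone setup above (illustrative names):

    // The view namespace is not found among the acquired collections, so lookupCollection()
    // returns CollectionPtr::null, the $lookup is treated as targeting a view, and no
    // pushdown is attempted -- the aggregate succeeds in the classic engine.
    assert.commandWorked(db.createView("foreignView", "foreign", [{$match: {b: {$gte: 0}}}]));
    assert.doesNotThrow(() => db.local.aggregate([
        {$lookup: {from: "foreignView", localField: "a", foreignField: "b", as: "out"}}
    ]).toArray());
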