diff options
-rw-r--r-- | jstests/noPassthrough/lookup_pushdown.js | 200 | ||||
-rw-r--r-- | src/mongo/db/commands/run_aggregate.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup.cpp | 25 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup.h | 11 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 79 | ||||
-rw-r--r-- | src/mongo/db/query/get_executor.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/query/multi_collection.h | 9 |
7 files changed, 305 insertions, 30 deletions
diff --git a/jstests/noPassthrough/lookup_pushdown.js b/jstests/noPassthrough/lookup_pushdown.js new file mode 100644 index 00000000000..fe10110b407 --- /dev/null +++ b/jstests/noPassthrough/lookup_pushdown.js @@ -0,0 +1,200 @@ +/** + * Tests basic functionality of pushing $lookup into the find layer. + */ +(function() { +"use strict"; + +load("jstests/libs/sbe_util.js"); // For checkSBEEnabled. + +function runTest(coll, pipeline, expectedToPushdown) { + const cmd = () => coll.aggregate(pipeline).toArray(); + if (expectedToPushdown) { + assert.throwsWithCode(cmd, 5843700); + } else { + assert.doesNotThrow(cmd); + } +} + +// Standalone cases. +const conn = MongoRunner.runMongod({setParameter: "internalEnableMultipleAutoGetCollections=true"}); +assert.neq(null, conn, "mongod was unable to start up"); +const name = "lookup_pushdown"; +let db = conn.getDB(name); + +if (!checkSBEEnabled(db, ["featureFlagSBELookupPushdown"])) { + jsTestLog("Skipping test because the sbe lookup pushdown feature flag is disabled"); + MongoRunner.stopMongod(conn); + return; +} + +const foreignCollName = "foreign_lookup_pushdown"; +const viewName = "view_lookup_pushdown"; +let coll = db[name]; +assert.commandWorked(coll.insert({_id: 1, a: 2})); +let foreignColl = db[foreignCollName]; +assert.commandWorked(foreignColl.insert({_id: 0, b: 2})); +assert.commandWorked(db.createView(viewName, foreignCollName, [{$match: {b: {$gte: 0}}}])); +let view = db[viewName]; + +// Basic $lookup. +runTest(coll, + [{$lookup: {from: foreignCollName, localField: "a", foreignField: "b", as: "out"}}], + true /* expectedToPushdown */); + +// Self join $lookup, no views. +runTest(coll, + [{$lookup: {from: name, localField: "a", foreignField: "a", as: "out"}}], + true /* expectedToPushdown */); + +// Self join $lookup; left hand is a view. This is expected to be pushed down because the view +// pipeline itself is a $match, which is eligible for pushdown. 
+runTest(view, + [{$lookup: {from: name, localField: "a", foreignField: "a", as: "out"}}], + true /* expectedToPushdown */); + +// Self join $lookup; right hand is a view. +runTest(coll, + [{$lookup: {from: viewName, localField: "a", foreignField: "a", as: "out"}}], + false /* expectedToPushdown */); + +// Self join $lookup; both namespaces are views. +runTest(view, + [{$lookup: {from: viewName, localField: "a", foreignField: "a", as: "out"}}], + false /* expectedToPushdown */); + +// $lookup preceded by $match. +runTest(coll, + [ + {$match: {a: {$gte: 0}}}, + {$lookup: {from: foreignCollName, localField: "a", foreignField: "b", as: "out"}} + ], + true /* expectedToPushdown */); + +// $lookup preceded by $project. +runTest(coll, + [ + {$project: {a: 1}}, + {$lookup: {from: foreignCollName, localField: "a", foreignField: "b", as: "out"}} + ], + true /* expectedToPushdown */); + +// $lookup preceded by $project which features an SBE-incompatible expression. +// TODO SERVER-51542: Update or remove this test case once $pow is implemented in SBE. +runTest(coll, + [ + {$project: {exp: {$pow: ["$a", 3]}}}, + {$lookup: {from: foreignCollName, localField: "a", foreignField: "b", as: "out"}} + ], + false /* expectedToPushdown */); + +// $lookup preceded by $group. +runTest(coll, + [ + {$group: {_id: "$a", sum: {$sum: 1}}}, + {$lookup: {from: foreignCollName, localField: "a", foreignField: "b", as: "out"}} + ], + true /* expectedToPushdown */); + +// Consecutive $lookups (first is against view). +runTest(coll, + [ + {$lookup: {from: viewName, localField: "a", foreignField: "b", as: "out"}}, + {$lookup: {from: foreignCollName, localField: "a", foreignField: "b", as: "out"}} + ], + false /* expectedToPushdown */); + +// Consecutive $lookups (first is against regular collection). 
+runTest(coll, + [ + {$lookup: {from: foreignCollName, localField: "a", foreignField: "b", as: "out"}}, + {$lookup: {from: viewName, localField: "a", foreignField: "b", as: "out"}} + ], + true /* expectedToPushdown */); + +// $lookup with pipeline. +runTest(coll, + [{$lookup: {from: foreignCollName, let: {foo: "$b"}, pipeline: [{$match: {$expr: {$eq: ["$$foo", +2]}}}], as: "out"}}], false /* expectedToPushdown */); + +// $lookup that absorbs $unwind. +runTest(coll, + [ + {$lookup: {from: foreignCollName, localField: "a", foreignField: "b", as: "out"}}, + {$unwind: "$out"} + ], + false /* expectedToPushdown */); + +// $lookup that absorbs $match. +runTest(coll, + [ + {$lookup: {from: foreignCollName, localField: "a", foreignField: "b", as: "out"}}, + {$unwind: "$out"}, + {$match: {out: {$gte: 0}}} + ], + false /* expectedToPushdown */); + +// $lookup that does not absorb $match. +runTest(coll, + [ + {$lookup: {from: foreignCollName, localField: "a", foreignField: "b", as: "out"}}, + {$match: {out: {$gte: 0}}} + ], + true /* expectedToPushdown */); + +MongoRunner.stopMongod(conn); + +// Sharded cases. +const st = new ShardingTest({ + shards: 2, + mongos: 1, + other: {shardOptions: {setParameter: "internalEnableMultipleAutoGetCollections=true"}} +}); +db = st.s.getDB(name); + +// Setup. Here, 'coll' is sharded, 'foreignColl' is unsharded, 'viewName' is an unsharded view, +// and 'shardedViewName' is a sharded view. 
+const shardedViewName = "sharded_foreign_view"; +coll = db[name]; +assert.commandWorked(coll.insert({a: 1, shardKey: 1})); +assert.commandWorked(coll.insert({a: 2, shardKey: 10})); +assert.commandWorked(coll.createIndex({shardKey: 1})); +st.shardColl(coll.getName(), {shardKey: 1}, {shardKey: 5}, {shardKey: 5}, name); + +foreignColl = db[foreignCollName]; +assert.commandWorked(foreignColl.insert({b: 5})); + +assert.commandWorked(db.createView(viewName, foreignCollName, [{$match: {b: {$gte: 0}}}])); +assert.commandWorked(db.createView(shardedViewName, name, [{$match: {b: {$gte: 0}}}])); + +// Both collections are unsharded. +runTest(foreignColl, + [{$lookup: {from: foreignCollName, localField: "a", foreignField: "b", as: "out"}}], + true /* expectedToPushdown */); + +// Sharded main collection, unsharded right side. This is not expected to be eligible for pushdown +// because the $lookup will be preceded by a $mergeCursors stage on the merging shard. +runTest(coll, + [{$lookup: {from: foreignCollName, localField: "a", foreignField: "b", as: "out"}}], + false /* expectedToPushdown */); + +// Both collections are sharded. +runTest(coll, + [{$lookup: {from: name, localField: "a", foreignField: "b", as: "out"}}], + false /* expectedToPushdown */); + +// Unsharded main collection, sharded right side. +runTest(foreignColl, + [{$lookup: {from: name, localField: "a", foreignField: "b", as: "out"}}], + false /* expectedToPushdown */); + +// Unsharded main collection, unsharded view right side. +runTest(foreignColl, + [{$lookup: {from: viewName, localField: "a", foreignField: "b", as: "out"}}], + false /* expectedToPushdown */); + +// Unsharded main collection, sharded view on the right side. 
+runTest(foreignColl, + [{$lookup: {from: shardedViewName, localField: "a", foreignField: "b", as: "out"}}], + false /* expectedToPushdown */); +st.stop(); +}()); diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index e7634306797..be02bd21de9 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -659,8 +659,13 @@ Status runAggregate(OperationContext* opCtx, auto initContext = [&](AutoGetCollectionViewMode m) -> void { ctx.emplace(opCtx, nss, m); for (const auto& ns : secondaryExecNssList) { - secondaryCtx.emplace_back( - std::make_unique<AutoGetCollectionForReadCommandMaybeLockFree>(opCtx, ns, m)); + // Avoid locking the main namespace multiple times (we can't lock a secondary + // namespace multiple times because 'secondaryExecNssList' is a set already). This + // emulates the behavior of 'AutoGetCollectionMulti'. + if (ns != nss) { + secondaryCtx.emplace_back( + std::make_unique<AutoGetCollectionForReadCommandMaybeLockFree>(opCtx, ns, m)); + } } collections = MultiCollection(ctx, secondaryCtx); }; diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index f4c84b620ad..61b5173c21a 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -681,6 +681,9 @@ Pipeline::SourceContainer::iterator DocumentSourceLookUp::doOptimizeAt( // following $unwind stage. if (nextUnwind && !_unwindSrc && nextUnwind->getUnwindPath() == _as.fullPath()) { _unwindSrc = std::move(nextUnwind); + + // We cannot push absorbed $unwind stages into SBE. + _sbeCompatible = false; container->erase(std::next(itr)); return itr; } @@ -765,7 +768,10 @@ Pipeline::SourceContainer::iterator DocumentSourceLookUp::doOptimizeAt( return std::next(itr); } - // We can internalize the $match. + // We can internalize the $match. 
This $lookup should already be marked as SBE incompatible + // because a $match can only be internalized if an $unwind, which is SBE incompatible, was + // absorbed as well. + tassert(5843701, "This $lookup cannot be compatible with SBE", !_sbeCompatible); if (!_matchSrc) { _matchSrc = nextMatch; } else { @@ -1249,12 +1255,17 @@ intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson( "$lookup with a 'let' argument must also specify 'pipeline'", !hasLet); - return new DocumentSourceLookUp(std::move(fromNs), - std::move(as), - std::move(localField), - std::move(foreignField), - std::move(fromCollator), - pExpCtx); + auto lookupStage = new DocumentSourceLookUp(std::move(fromNs), + std::move(as), + std::move(localField), + std::move(foreignField), + std::move(fromCollator), + pExpCtx); + + // $lookup stages with local/foreignField specified are eligible for pushdown into SBE if + // the context allows it. + lookupStage->_sbeCompatible = pExpCtx->sbeCompatible; + return lookupStage; } } diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index 9b5f87803fc..3fa95b7e151 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -223,6 +223,14 @@ public: boost::intrusive_ptr<DocumentSource> clone() const final; + bool sbeCompatible() const { + return _sbeCompatible; + } + + const NamespaceString& getFromNs() { + return _fromNs; + } + protected: GetNextResult doGetNext() final; void doDispose() final; @@ -365,6 +373,9 @@ private: // collation when not set explicitly. bool _hasExplicitCollation = false; + // Can this $lookup be pushed down into SBE? + bool _sbeCompatible = false; + // The aggregation pipeline to perform against the '_resolvedNs' namespace. Referenced view // namespaces have been resolved. 
std::vector<BSONObj> _resolvedPipeline; diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 67d52f4e103..ecb2b13d0b5 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -65,6 +65,7 @@ #include "mongo/db/pipeline/document_source_geo_near_cursor.h" #include "mongo/db/pipeline/document_source_group.h" #include "mongo/db/pipeline/document_source_internal_unpack_bucket.h" +#include "mongo/db/pipeline/document_source_lookup.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_sample.h" #include "mongo/db/pipeline/document_source_sample_from_random_cursor.h" @@ -100,48 +101,86 @@ using write_ops::InsertCommandRequest; namespace { /** - * Extracts a prefix of 'DocumentSourceGroup' stages from the given pipeline to prepare for - * pushdown of $group into the inner query layer so that it can be executed using SBE. Group stages - * are extracted from the pipeline under when all of the following conditions are met: + * Extracts a prefix of 'DocumentSourceGroup' and 'DocumentSourceLookUp' stages from the given + * pipeline to prepare for pushdown of $group and $lookup into the inner query layer so that it + * can be executed using SBE. + * Group stages are extracted from the pipeline when all of the following conditions are met: * 0. When the 'internalQueryForceClassicEngine' feature flag is 'false'. * 1. When 'allowDiskUse' is false. We currently don't support spilling in the SBE HashAgg * stage. This will change once that is supported when SERVER-58436 is complete. * 2. When the DocumentSourceGroup has 'doingMerge=false', this will change when we implement * hash table spilling in SERVER-58436. + * + * Lookup stages are extracted from the pipeline when all of the following conditions are met: + * 0. When the 'internalQueryForceClassicEngine' feature flag is 'false'. + * 1. When the 'featureFlagSBELookupPushdown' feature flag is 'true'. + * 2. 
When the 'internalEnableMultipleAutoGetCollections' flag is 'true'. + * 3. The $lookup uses only the 'localField'/'foreignField' syntax (no pipelines). + * 4. The foreign collection is neither sharded nor a view. */ -std::vector<std::unique_ptr<InnerPipelineStageInterface>> extractSbeCompatibleGroupsForPushdown( +std::vector<std::unique_ptr<InnerPipelineStageInterface>> extractSbeCompatibleStagesForPushdown( const intrusive_ptr<ExpressionContext>& expCtx, - const CollectionPtr& collection, + const MultiCollection& collections, const CanonicalQuery* cq, Pipeline* pipeline) { // We will eventually use the extracted group stages to populate 'CanonicalQuery::pipeline' // which requires stages to be wrapped in an interface. - std::vector<std::unique_ptr<InnerPipelineStageInterface>> groupsForPushdown; + std::vector<std::unique_ptr<InnerPipelineStageInterface>> stagesForPushdown; // This handles the case of unionWith against an unknown collection. - if (collection == nullptr) { + if (collections.getMainCollection() == nullptr) { return {}; } - if (!feature_flags::gFeatureFlagSBEGroupPushdown.isEnabled( - serverGlobalParams.featureCompatibility) || - cq->getForceClassicEngine()) { + // No pushdown if we're using the classic engine. + if (cq->getForceClassicEngine()) { return {}; } auto&& sources = pipeline->getSources(); for (auto itr = sources.begin(); itr != sources.end();) { - auto groupStage = dynamic_cast<DocumentSourceGroup*>(itr->get()); - if (!(groupStage && groupStage->sbeCompatible()) || groupStage->doingMerge()) { - // Only pushdown a prefix of group stages that are supported by sbe. + // $group pushdown logic. 
+ if (auto groupStage = dynamic_cast<DocumentSourceGroup*>(itr->get())) { + bool groupEligibleForPushdown = feature_flags::gFeatureFlagSBEGroupPushdown.isEnabled( + serverGlobalParams.featureCompatibility) && + groupStage->sbeCompatible() && !groupStage->doingMerge(); + if (groupEligibleForPushdown) { + stagesForPushdown.push_back(std::make_unique<InnerPipelineStageImpl>(groupStage)); + sources.erase(itr++); + continue; + } + break; + } + + // $lookup pushdown logic. + if (auto lookupStage = dynamic_cast<DocumentSourceLookUp*>(itr->get())) { + bool isForeignSharded = false; + bool isForeignView = false; + const auto& fromNs = lookupStage->getFromNs(); + const auto& foreignColl = collections.lookupCollection(fromNs); + if (foreignColl) { + isForeignSharded = foreignColl.isSharded(); + } else { + // If the right hand side targets a namespace that we can't find in + // 'collections', we infer that it is targeting a view. + isForeignView = true; + } + + bool lookupEligibleForPushdown = + feature_flags::gFeatureFlagSBELookupPushdown.isEnabledAndIgnoreFCV() && + internalEnableMultipleAutoGetCollections.load() && lookupStage->sbeCompatible() && + !isForeignSharded && !isForeignView; + uassert( + 5843700, "$lookup push down logic worked correctly", !lookupEligibleForPushdown); break; } - groupsForPushdown.push_back(std::make_unique<InnerPipelineStageImpl>(groupStage)); - sources.erase(itr++); + + // Current stage cannot be pushed down. 
+ break; } - return groupsForPushdown; + return stagesForPushdown; } StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExecutor( @@ -158,7 +197,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe const size_t plannerOpts, const MatchExpressionParser::AllowedFeatureSet& matcherFeatures, Pipeline* pipeline) { - const auto& collection = collections.getMainCollection(); + const auto& mainColl = collections.getMainCollection(); auto findCommand = std::make_unique<FindCommandRequest>(nss); query_request_helper::setTailableMode(expCtx->tailableMode, findCommand.get()); findCommand->setFilter(queryObj.getOwned()); @@ -240,7 +279,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe // example, if we have a document {a: [1,2]} and group by "a" a DISTINCT_SCAN on an "a" // index would produce one result for '1' and another for '2', which would be incorrect. auto distinctExecutor = getExecutorDistinct( - &collection, plannerOpts | QueryPlannerParams::STRICT_DISTINCT_ONLY, &parsedDistinct); + &mainColl, plannerOpts | QueryPlannerParams::STRICT_DISTINCT_ONLY, &parsedDistinct); if (!distinctExecutor.isOK()) { return distinctExecutor.getStatus().withContext( "Unable to use distinct scan to optimize $group stage"); @@ -257,8 +296,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe collections, std::move(cq.getValue()), [&](auto* canonicalQuery) { - canonicalQuery->setPipeline(extractSbeCompatibleGroupsForPushdown( - expCtx, collection, canonicalQuery, pipeline)); + canonicalQuery->setPipeline(extractSbeCompatibleStagesForPushdown( + expCtx, collections, canonicalQuery, pipeline)); }, permitYield, plannerOpts); diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index cd61dfefe7c..a08402df756 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -962,7 +962,7 @@ public: 
std::pair<std::unique_ptr<sbe::PlanStage>, stage_builder::PlanStageData> buildExecutableTree( const QuerySolution& solution) const final { - // TODO SERVER-58437 We don't pass '_collections' to the function below because at the + // TODO SERVER-62677 We don't pass '_collections' to the function below because at the // moment, no pushdown is actually happening. This should be changed once the logic for // pushdown is implemented. return stage_builder::buildSlotBasedExecutableTree( diff --git a/src/mongo/db/query/multi_collection.h b/src/mongo/db/query/multi_collection.h index 77368d13682..9b3230a2cc6 100644 --- a/src/mongo/db/query/multi_collection.h +++ b/src/mongo/db/query/multi_collection.h @@ -80,6 +80,15 @@ public: return _secondaryColls; } + const CollectionPtr& lookupCollection(const NamespaceString& nss) const { + if (_mainCollName && nss == *_mainCollName) { + return *_mainColl; + } else if (auto itr = _secondaryColls.find(nss); itr != _secondaryColls.end()) { + return itr->second; + } + return CollectionPtr::null; + } + void clear() { _mainColl = &CollectionPtr::null; _secondaryColls.clear(); |