diff options
author | Svilen Mihaylov <svilen.mihaylov@mongodb.com> | 2022-04-25 18:39:45 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-04-25 19:11:22 +0000 |
commit | af7466641230ada452d552c45fc5c0d8c5189178 (patch) | |
tree | 67f3cfac0eebf83ba20504f642617a3c84544ad4 /src | |
parent | 938bb3e4d5578ba10c46361c9efc8c30c1629b0e (diff) | |
download | mongo-af7466641230ada452d552c45fc5c0d8c5189178.tar.gz |
SERVER-65900 Initial implementation of $lookup
Diffstat (limited to 'src')
9 files changed, 331 insertions, 24 deletions
diff --git a/src/mongo/db/exec/sbe/abt/abt_lower.cpp b/src/mongo/db/exec/sbe/abt/abt_lower.cpp index b2ca7b9bc1e..a7820613d7a 100644 --- a/src/mongo/db/exec/sbe/abt/abt_lower.cpp +++ b/src/mongo/db/exec/sbe/abt/abt_lower.cpp @@ -275,6 +275,8 @@ std::unique_ptr<sbe::EExpression> SBEExpressionLowering::transport( name = "max"; } else if (name == "$addToSet") { name = "addToSet"; + } else if (name == "$push") { + name = "addToArray"; } return sbe::makeE<sbe::EFunction>(name, toInlinedVector(std::move(args))); diff --git a/src/mongo/db/pipeline/abt/abt_document_source_visitor.cpp b/src/mongo/db/pipeline/abt/abt_document_source_visitor.cpp index 582f3263bfb..bf9dbfd8d9a 100644 --- a/src/mongo/db/pipeline/abt/abt_document_source_visitor.cpp +++ b/src/mongo/db/pipeline/abt/abt_document_source_visitor.cpp @@ -488,7 +488,134 @@ public: } void visit(const DocumentSourceLookUp* source) override { - unsupportedStage(source); + // This is an **experimental** implementation of $lookup. To achieve fully compatible + // implementation we need the following: + // 1. Add support for unwind to emit not just the array elements, but in addition the + // array itself. Such unwinding needs to occur on the inner side in order to match the + // left side both against the elements and the array itself. The inner side would perform + // a regular unwind. + // 2. Add support for left outer join. Currently we only results when there is a match. + // 3. Add ability to generate unique values (sequential or otherwise) in order to + // eliminate reliance of _id. This can be achieved for example via a stateful function. + // Currently, after joining the unwound elements, we perform a de-duplication based on _id + // to determine which corresponding documents match. + + uassert(6624303, "$lookup needs to be SBE compatible", source->sbeCompatible()); + + std::string scanDefName = source->getFromNs().coll().toString(); + const ProjectionName& scanProjName = _ctx.getNextId("scan"); + + PrefixId prefixId; + ABT pipelineABT = _metadata._scanDefs.at(scanDefName).exists() + ? make<ScanNode>(scanProjName, scanDefName) + : make<ValueScanNode>(ProjectionNameVector{scanProjName}); + + const ProjectionName& localIdProjName = _ctx.getNextId("localId"); + auto entry = _ctx.getNode(); + _ctx.setNode<EvaluationNode>(entry._rootProjection, + localIdProjName, + make<EvalPath>(make<PathGet>("_id", make<PathIdentity>()), + make<Variable>(entry._rootProjection)), + std::move(entry._node)); + + const auto& localPath = source->getLocalField(); + ProjectionName localProjName = entry._rootProjection; + for (size_t index = 0; index < localPath->getPathLength(); index++) { + ABT path = make<EvalPath>( + make<PathGet>(localPath->getFieldName(index).toString(), make<PathIdentity>()), + make<Variable>(std::move(localProjName))); + + localProjName = _ctx.getNextId("local"); + entry = _ctx.getNode(); + _ctx.setNode<EvaluationNode>( + entry._rootProjection, localProjName, std::move(path), std::move(entry._node)); + + entry = _ctx.getNode(); + _ctx.setNode<UnwindNode>(std::move(entry._rootProjection), + localProjName, + _ctx.getNextId("unwoundPidLocal"), + true /*retainNonArrays*/, + std::move(entry._node)); + } + + const ProjectionName& foreignIdProjName = _ctx.getNextId("foreignId"); + pipelineABT = + make<EvaluationNode>(foreignIdProjName, + make<EvalPath>(make<PathGet>("_id", make<PathIdentity>()), + make<Variable>(scanProjName)), + std::move(pipelineABT)); + + const auto& foreignPath = source->getForeignField(); + ProjectionName foreignProjName = scanProjName; + for (size_t index = 0; index < foreignPath->getPathLength(); index++) { + ABT path = make<EvalPath>( + make<PathGet>(foreignPath->getFieldName(index).toString(), make<PathIdentity>()), + make<Variable>(foreignProjName)); + + foreignProjName = _ctx.getNextId("foreign"); + pipelineABT = + make<EvaluationNode>(foreignProjName, std::move(path), std::move(pipelineABT)); + + // For the last field on the path we need to also emit arrays as values themselves (we + // need a new unwind option). + pipelineABT = make<UnwindNode>(foreignProjName, + _ctx.getNextId("unwoundPidForeign"), + true /*retainNonArrays*/, + std::move(pipelineABT)); + } + + const auto comparisonExprFn = [](const ProjectionName& projName) { + return make<If>(make<FunctionCall>("exists", makeSeq(make<Variable>(projName))), + make<Variable>(projName), + Constant::null()); + }; + + entry = _ctx.getNode(); + // TODO: use LeftOuter join when we support it. + _ctx.setNode<BinaryJoinNode>(std::move(entry._rootProjection), + JoinType::Inner, + ProjectionNameSet{}, + make<BinaryOp>(Operations::Eq, + comparisonExprFn(localProjName), + comparisonExprFn(foreignProjName)), + std::move(entry._node), + std::move(pipelineABT)); + + const ProjectionName& localRootProjId = _ctx.getNextId("localRoot"); + const ProjectionName& foreignRootProjId = _ctx.getNextId("foreignRoot"); + entry = _ctx.getNode(); + ABT groupByDedupNode = make<GroupByNode>( + ProjectionNameVector{localIdProjName, foreignIdProjName}, + ProjectionNameVector{localRootProjId, foreignRootProjId}, + makeSeq(make<FunctionCall>("$first", makeSeq(make<Variable>(entry._rootProjection))), + make<FunctionCall>("$first", makeSeq(make<Variable>(scanProjName)))), + std::move(entry._node)); + + const ProjectionName& localFoldedProjName = _ctx.getNextId("localFolded"); + const ProjectionName& foreignFoldedProjName = _ctx.getNextId("foreignFolded"); + ABT groupByFoldNode = make<GroupByNode>( + ProjectionNameVector{localIdProjName}, + ProjectionNameVector{localFoldedProjName, foreignFoldedProjName}, + makeSeq(make<FunctionCall>("$first", makeSeq(make<Variable>(localRootProjId))), + make<FunctionCall>("$push", makeSeq(make<Variable>(foreignRootProjId)))), + std::move(groupByDedupNode)); + + ABT resultPath = translateFieldPath( + source->getAsField(), + make<PathConstant>(make<Variable>(foreignFoldedProjName)), + [](const std::string& fieldName, const bool isLastElement, ABT input) { + if (!isLastElement) { + input = make<PathTraverse>(std::move(input)); + } + return make<PathField>(fieldName, std::move(input)); + }); + + const ProjectionName& resultProjName = _ctx.getNextId("result"); + _ctx.setNode<EvaluationNode>( + resultProjName, + resultProjName, + make<EvalPath>(std::move(resultPath), make<Variable>(localFoldedProjName)), + std::move(groupByFoldNode)); } void visit(const DocumentSourceMatch* source) override { diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index eb28f357291..4a8559d89fa 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -250,7 +250,7 @@ public: return _sbeCompatible; } - const NamespaceString& getFromNs() { + const NamespaceString& getFromNs() const { return _fromNs; } diff --git a/src/mongo/db/query/optimizer/cascades/ce_heuristic.cpp b/src/mongo/db/query/optimizer/cascades/ce_heuristic.cpp index 263fab109a5..6d89cd1a436 100644 --- a/src/mongo/db/query/optimizer/cascades/ce_heuristic.cpp +++ b/src/mongo/db/query/optimizer/cascades/ce_heuristic.cpp @@ -91,10 +91,18 @@ public: } CEType transport(const BinaryJoinNode& node, - CEType /*leftChildResult*/, - CEType /*rightChildResult*/, + CEType leftChildResult, + CEType rightChildResult, CEType /*exprResult*/) { - uasserted(6624039, "CE derivation not implemented."); + const auto& filter = node.getFilter(); + + double selectivity = 0.1; + if (filter == Constant::boolean(false)) { + selectivity = 0.0; + } else if (filter == Constant::boolean(true)) { + selectivity = 1.0; + } + return leftChildResult * rightChildResult * selectivity; } CEType transport(const UnionNode& node, diff --git a/src/mongo/db/query/optimizer/cascades/implementers.cpp b/src/mongo/db/query/optimizer/cascades/implementers.cpp index 7d7a21c2b52..9c951e14c6b 100644 --- a/src/mongo/db/query/optimizer/cascades/implementers.cpp +++ b/src/mongo/db/query/optimizer/cascades/implementers.cpp @@ -730,11 +730,6 @@ public: PhysProps leftPhysProps = _physProps; PhysProps rightPhysProps = _physProps; - // Specifically do not propagate limit-skip. - // TODO: handle similarly to physical join. - removeProperty<LimitSkipRequirement>(leftPhysProps); - removeProperty<LimitSkipRequirement>(rightPhysProps); - getProperty<DistributionRequirement>(leftPhysProps).setDisableExchanges(false); getProperty<DistributionRequirement>(rightPhysProps).setDisableExchanges(false); @@ -821,12 +816,98 @@ public: } } - void operator()(const ABT& /*n*/, const BinaryJoinNode& node) { - // TODO: optimize binary joins - uasserted(6624105, "not implemented"); + void operator()(const ABT& n, const BinaryJoinNode& node) { + if (hasProperty<LimitSkipRequirement>(_physProps)) { + // We cannot satisfy limit-skip requirements. + return; + } + if (getPropertyConst<DistributionRequirement>(_physProps) + .getDistributionAndProjections() + ._type != DistributionType::Centralized) { + // For now we only support centralized distribution. + return; + } + + const GroupIdType leftGroupId = + node.getLeftChild().cast<MemoLogicalDelegatorNode>()->getGroupId(); + const GroupIdType rightGroupId = + node.getRightChild().cast<MemoLogicalDelegatorNode>()->getGroupId(); + + const LogicalProps& leftLogicalProps = _memo.getGroup(leftGroupId)._logicalProperties; + const LogicalProps& rightLogicalProps = _memo.getGroup(rightGroupId)._logicalProperties; + + const ProjectionNameSet& leftProjections = + getPropertyConst<ProjectionAvailability>(leftLogicalProps).getProjections(); + const ProjectionNameSet& rightProjections = + getPropertyConst<ProjectionAvailability>(rightLogicalProps).getProjections(); + + PhysProps leftPhysProps = _physProps; + PhysProps rightPhysProps = _physProps; + + { + auto reqProjections = + getPropertyConst<ProjectionRequirement>(_physProps).getProjections(); + + // Add expression references to requirements. + VariableNameSetType references = collectVariableReferences(n); + for (const auto& varName : references) { + reqProjections.emplace_back(varName); + } + + // Split required projections between inner and outer side. + ProjectionNameOrderPreservingSet leftChildProjections; + ProjectionNameOrderPreservingSet rightChildProjections; + + for (const ProjectionName& projectionName : reqProjections.getVector()) { + + if (leftProjections.count(projectionName) > 0) { + leftChildProjections.emplace_back(projectionName); + } else if (rightProjections.count(projectionName) > 0) { + rightChildProjections.emplace_back(projectionName); + } else { + uasserted(6624304, + "Required projection must appear in either the left or the right " + "child projections"); + return; + } + } + + setPropertyOverwrite<ProjectionRequirement>(leftPhysProps, + std::move(leftChildProjections)); + setPropertyOverwrite<ProjectionRequirement>(rightPhysProps, + std::move(rightChildProjections)); + } + + if (hasProperty<CollationRequirement>(_physProps)) { + const auto& collationSpec = + getPropertyConst<CollationRequirement>(_physProps).getCollationSpec(); + + // Split collation between inner and outer side. + const CollationSplitResult& collationSplit = splitCollationSpec( + "" /*ridProjName*/, collationSpec, leftProjections, rightProjections); + if (!collationSplit._validSplit) { + return; + } + + setPropertyOverwrite<CollationRequirement>(leftPhysProps, + collationSplit._leftCollation); + setPropertyOverwrite<CollationRequirement>(rightPhysProps, + collationSplit._leftCollation); + } + + // TODO: consider hash join if the predicate is equality. + ABT physicalJoin = n; + BinaryJoinNode& newNode = *physicalJoin.cast<BinaryJoinNode>(); + + optimizeChildren<BinaryJoinNode>( + _queue, + kDefaultPriority, + std::move(physicalJoin), + ChildPropsType{{&newNode.getLeftChild(), std::move(leftPhysProps)}, + {&newNode.getRightChild(), std::move(rightPhysProps)}}); } - void operator()(const ABT& n, const UnionNode& node) { + void operator()(const ABT& /*n*/, const UnionNode& node) { if (hasProperty<LimitSkipRequirement>(_physProps)) { // We cannot satisfy limit-skip requirements. return; diff --git a/src/mongo/db/query/optimizer/cascades/logical_props_derivation.cpp b/src/mongo/db/query/optimizer/cascades/logical_props_derivation.cpp index 617d5a2074c..6498fd4fe26 100644 --- a/src/mongo/db/query/optimizer/cascades/logical_props_derivation.cpp +++ b/src/mongo/db/query/optimizer/cascades/logical_props_derivation.cpp @@ -249,12 +249,28 @@ public: } LogicalProps transport(const BinaryJoinNode& node, - LogicalProps /*leftChildResult*/, - LogicalProps /*rightChildResult*/, + LogicalProps leftChildResult, + LogicalProps rightChildResult, LogicalProps /*exprResult*/) { - // TODO: remove indexing availability property when implemented. - // TODO: combine scan defs from all children for CollectionAvailability. - uasserted(6624043, "Logical property derivation not implemented."); + // We are specifically not adding the node's projection to ProjectionAvailability here. + // The logical properties already contains projection availability which is derived first + // when the memo group is created. + + LogicalProps result = std::move(leftChildResult); + auto& mergedScanDefs = getProperty<CollectionAvailability>(result).getScanDefSet(); + auto& mergedDistributionSet = + getProperty<DistributionAvailability>(result).getDistributionSet(); + + auto rightChildScanDefs = + getProperty<CollectionAvailability>(rightChildResult).getScanDefSet(); + mergedScanDefs.merge(std::move(rightChildScanDefs)); + + auto rightChildDistributionSet = + getProperty<DistributionAvailability>(rightChildResult).getDistributionSet(); + mergedDistributionSet.merge(std::move(rightChildDistributionSet)); + + removeProperty<IndexingAvailability>(result); + return maybeUpdateNodePropsMap(node, std::move(result)); } LogicalProps transport(const UnionNode& node, diff --git a/src/mongo/db/query/optimizer/physical_rewriter_optimizer_test.cpp b/src/mongo/db/query/optimizer/physical_rewriter_optimizer_test.cpp index dce73fa0b71..1a2f5748be8 100644 --- a/src/mongo/db/query/optimizer/physical_rewriter_optimizer_test.cpp +++ b/src/mongo/db/query/optimizer/physical_rewriter_optimizer_test.cpp @@ -4532,5 +4532,81 @@ TEST(PhysRewriter, UnionRewrite) { optimized); } +TEST(PhysRewriter, JoinRewrite) { + using namespace properties; + + ABT scanNode1 = make<ScanNode>("ptest1", "test1"); + ABT scanNode2 = make<ScanNode>("ptest2", "test2"); + + // Each branch produces two projections, pUnion1 and pUnion2. + ABT evalNode1 = make<EvaluationNode>( + "p11", + make<EvalPath>(make<PathGet>("a", make<PathIdentity>()), make<Variable>("ptest1")), + std::move(scanNode1)); + ABT evalNode2 = make<EvaluationNode>( + "p12", + make<EvalPath>(make<PathGet>("b", make<PathIdentity>()), make<Variable>("ptest1")), + std::move(evalNode1)); + + ABT evalNode3 = make<EvaluationNode>( + "p21", + make<EvalPath>(make<PathGet>("a", make<PathIdentity>()), make<Variable>("ptest2")), + std::move(scanNode2)); + ABT evalNode4 = make<EvaluationNode>( + "p22", + make<EvalPath>(make<PathGet>("b", make<PathIdentity>()), make<Variable>("ptest2")), + std::move(evalNode3)); + + ABT joinNode = make<BinaryJoinNode>( + JoinType::Inner, + ProjectionNameSet{}, + make<BinaryOp>(Operations::Eq, make<Variable>("p12"), make<Variable>("p22")), + std::move(evalNode2), + std::move(evalNode4)); + + ABT rootNode = make<RootNode>(ProjectionRequirement{ProjectionNameVector{"p11", "p21"}}, + std::move(joinNode)); + + PrefixId prefixId; + OptPhaseManager phaseManager( + {OptPhaseManager::OptPhase::MemoSubstitutionPhase, + OptPhaseManager::OptPhase::MemoExplorationPhase, + OptPhaseManager::OptPhase::MemoImplementationPhase}, + prefixId, + {{{"test1", {{}, {}}}, {"test2", {{}, {}}}}}, + {true /*debugMode*/, 2 /*debugLevel*/, DebugInfo::kIterationLimitForTests}); + + ABT optimized = std::move(rootNode); + ASSERT_TRUE(phaseManager.optimize(optimized)); + ASSERT_EQ(4, phaseManager.getMemo().getStats()._physPlanExplorationCount); + + ASSERT_EXPLAIN_V2( + "Root []\n" + "| | projections: \n" + "| | p11\n" + "| | p21\n" + "| RefBlock: \n" + "| Variable [p11]\n" + "| Variable [p21]\n" + "BinaryJoin [joinType: Inner]\n" + "| | BinaryOp [Eq]\n" + "| | | Variable [p22]\n" + "| | Variable [p12]\n" + "| PhysicalScan [{'a': p21, 'b': p22}, test2]\n" + "| BindBlock:\n" + "| [p21]\n" + "| Source []\n" + "| [p22]\n" + "| Source []\n" + "PhysicalScan [{'a': p11, 'b': p12}, test1]\n" + " BindBlock:\n" + " [p11]\n" + " Source []\n" + " [p12]\n" + " Source []\n", + optimized); +} + + } // namespace } // namespace mongo::optimizer diff --git a/src/mongo/db/query/optimizer/props.cpp b/src/mongo/db/query/optimizer/props.cpp index c97fe18b466..892035d5924 100644 --- a/src/mongo/db/query/optimizer/props.cpp +++ b/src/mongo/db/query/optimizer/props.cpp @@ -34,14 +34,14 @@ namespace mongo::optimizer::properties { CollationRequirement::CollationRequirement(ProjectionCollationSpec spec) : _spec(std::move(spec)) { + uassert(6624302, "Empty collation spec", !_spec.empty()); + ProjectionNameSet projections; for (const auto& entry : _spec) { uassert(6624021, "Repeated projection name", projections.insert(entry.first).second); } } -CollationRequirement CollationRequirement::Empty = CollationRequirement(); - bool CollationRequirement::operator==(const CollationRequirement& other) const { return _spec == other._spec; } diff --git a/src/mongo/db/query/optimizer/props.h b/src/mongo/db/query/optimizer/props.h index 67dd4e090f1..0cd6941e46a 100644 --- a/src/mongo/db/query/optimizer/props.h +++ b/src/mongo/db/query/optimizer/props.h @@ -170,9 +170,6 @@ inline auto makePhysProps(Args&&... args) { */ class CollationRequirement final : public PhysPropertyTag { public: - static CollationRequirement Empty; - - CollationRequirement() = default; CollationRequirement(ProjectionCollationSpec spec); bool operator==(const CollationRequirement& other) const; |