summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSvilen Mihaylov <svilen.mihaylov@mongodb.com>2022-04-25 18:39:45 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-04-25 19:11:22 +0000
commitaf7466641230ada452d552c45fc5c0d8c5189178 (patch)
tree67f3cfac0eebf83ba20504f642617a3c84544ad4 /src
parent938bb3e4d5578ba10c46361c9efc8c30c1629b0e (diff)
downloadmongo-af7466641230ada452d552c45fc5c0d8c5189178.tar.gz
SERVER-65900 Initial implementation of $lookup
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/exec/sbe/abt/abt_lower.cpp2
-rw-r--r--src/mongo/db/pipeline/abt/abt_document_source_visitor.cpp129
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.h2
-rw-r--r--src/mongo/db/query/optimizer/cascades/ce_heuristic.cpp14
-rw-r--r--src/mongo/db/query/optimizer/cascades/implementers.cpp99
-rw-r--r--src/mongo/db/query/optimizer/cascades/logical_props_derivation.cpp26
-rw-r--r--src/mongo/db/query/optimizer/physical_rewriter_optimizer_test.cpp76
-rw-r--r--src/mongo/db/query/optimizer/props.cpp4
-rw-r--r--src/mongo/db/query/optimizer/props.h3
9 files changed, 331 insertions, 24 deletions
diff --git a/src/mongo/db/exec/sbe/abt/abt_lower.cpp b/src/mongo/db/exec/sbe/abt/abt_lower.cpp
index b2ca7b9bc1e..a7820613d7a 100644
--- a/src/mongo/db/exec/sbe/abt/abt_lower.cpp
+++ b/src/mongo/db/exec/sbe/abt/abt_lower.cpp
@@ -275,6 +275,8 @@ std::unique_ptr<sbe::EExpression> SBEExpressionLowering::transport(
name = "max";
} else if (name == "$addToSet") {
name = "addToSet";
+ } else if (name == "$push") {
+ name = "addToArray";
}
return sbe::makeE<sbe::EFunction>(name, toInlinedVector(std::move(args)));
diff --git a/src/mongo/db/pipeline/abt/abt_document_source_visitor.cpp b/src/mongo/db/pipeline/abt/abt_document_source_visitor.cpp
index 582f3263bfb..bf9dbfd8d9a 100644
--- a/src/mongo/db/pipeline/abt/abt_document_source_visitor.cpp
+++ b/src/mongo/db/pipeline/abt/abt_document_source_visitor.cpp
@@ -488,7 +488,134 @@ public:
}
void visit(const DocumentSourceLookUp* source) override {
- unsupportedStage(source);
+ // This is an **experimental** implementation of $lookup. To achieve fully compatible
+ // implementation we need the following:
+ // 1. Add support for unwind to emit not just the array elements, but in addition the
+ // array itself. Such unwinding needs to occur on the inner side in order to match the
+ // left side both against the elements and the array itself. The outer side would perform
+ // a regular unwind.
+ // 2. Add support for left outer join. Currently we only return results when there is a match.
+ // 3. Add ability to generate unique values (sequential or otherwise) in order to
+ // eliminate reliance on _id. This can be achieved for example via a stateful function.
+ // Currently, after joining the unwound elements, we perform a de-duplication based on _id
+ // to determine which corresponding documents match.
+
+ uassert(6624303, "$lookup needs to be SBE compatible", source->sbeCompatible());
+
+ std::string scanDefName = source->getFromNs().coll().toString();
+ const ProjectionName& scanProjName = _ctx.getNextId("scan");
+
+ PrefixId prefixId;
+ ABT pipelineABT = _metadata._scanDefs.at(scanDefName).exists()
+ ? make<ScanNode>(scanProjName, scanDefName)
+ : make<ValueScanNode>(ProjectionNameVector{scanProjName});
+
+ const ProjectionName& localIdProjName = _ctx.getNextId("localId");
+ auto entry = _ctx.getNode();
+ _ctx.setNode<EvaluationNode>(entry._rootProjection,
+ localIdProjName,
+ make<EvalPath>(make<PathGet>("_id", make<PathIdentity>()),
+ make<Variable>(entry._rootProjection)),
+ std::move(entry._node));
+
+ const auto& localPath = source->getLocalField();
+ ProjectionName localProjName = entry._rootProjection;
+ for (size_t index = 0; index < localPath->getPathLength(); index++) {
+ ABT path = make<EvalPath>(
+ make<PathGet>(localPath->getFieldName(index).toString(), make<PathIdentity>()),
+ make<Variable>(std::move(localProjName)));
+
+ localProjName = _ctx.getNextId("local");
+ entry = _ctx.getNode();
+ _ctx.setNode<EvaluationNode>(
+ entry._rootProjection, localProjName, std::move(path), std::move(entry._node));
+
+ entry = _ctx.getNode();
+ _ctx.setNode<UnwindNode>(std::move(entry._rootProjection),
+ localProjName,
+ _ctx.getNextId("unwoundPidLocal"),
+ true /*retainNonArrays*/,
+ std::move(entry._node));
+ }
+
+ const ProjectionName& foreignIdProjName = _ctx.getNextId("foreignId");
+ pipelineABT =
+ make<EvaluationNode>(foreignIdProjName,
+ make<EvalPath>(make<PathGet>("_id", make<PathIdentity>()),
+ make<Variable>(scanProjName)),
+ std::move(pipelineABT));
+
+ const auto& foreignPath = source->getForeignField();
+ ProjectionName foreignProjName = scanProjName;
+ for (size_t index = 0; index < foreignPath->getPathLength(); index++) {
+ ABT path = make<EvalPath>(
+ make<PathGet>(foreignPath->getFieldName(index).toString(), make<PathIdentity>()),
+ make<Variable>(foreignProjName));
+
+ foreignProjName = _ctx.getNextId("foreign");
+ pipelineABT =
+ make<EvaluationNode>(foreignProjName, std::move(path), std::move(pipelineABT));
+
+ // For the last field on the path we need to also emit arrays as values themselves (we
+ // need a new unwind option).
+ pipelineABT = make<UnwindNode>(foreignProjName,
+ _ctx.getNextId("unwoundPidForeign"),
+ true /*retainNonArrays*/,
+ std::move(pipelineABT));
+ }
+
+ const auto comparisonExprFn = [](const ProjectionName& projName) {
+ return make<If>(make<FunctionCall>("exists", makeSeq(make<Variable>(projName))),
+ make<Variable>(projName),
+ Constant::null());
+ };
+
+ entry = _ctx.getNode();
+ // TODO: use LeftOuter join when we support it.
+ _ctx.setNode<BinaryJoinNode>(std::move(entry._rootProjection),
+ JoinType::Inner,
+ ProjectionNameSet{},
+ make<BinaryOp>(Operations::Eq,
+ comparisonExprFn(localProjName),
+ comparisonExprFn(foreignProjName)),
+ std::move(entry._node),
+ std::move(pipelineABT));
+
+ const ProjectionName& localRootProjId = _ctx.getNextId("localRoot");
+ const ProjectionName& foreignRootProjId = _ctx.getNextId("foreignRoot");
+ entry = _ctx.getNode();
+ ABT groupByDedupNode = make<GroupByNode>(
+ ProjectionNameVector{localIdProjName, foreignIdProjName},
+ ProjectionNameVector{localRootProjId, foreignRootProjId},
+ makeSeq(make<FunctionCall>("$first", makeSeq(make<Variable>(entry._rootProjection))),
+ make<FunctionCall>("$first", makeSeq(make<Variable>(scanProjName)))),
+ std::move(entry._node));
+
+ const ProjectionName& localFoldedProjName = _ctx.getNextId("localFolded");
+ const ProjectionName& foreignFoldedProjName = _ctx.getNextId("foreignFolded");
+ ABT groupByFoldNode = make<GroupByNode>(
+ ProjectionNameVector{localIdProjName},
+ ProjectionNameVector{localFoldedProjName, foreignFoldedProjName},
+ makeSeq(make<FunctionCall>("$first", makeSeq(make<Variable>(localRootProjId))),
+ make<FunctionCall>("$push", makeSeq(make<Variable>(foreignRootProjId)))),
+ std::move(groupByDedupNode));
+
+ ABT resultPath = translateFieldPath(
+ source->getAsField(),
+ make<PathConstant>(make<Variable>(foreignFoldedProjName)),
+ [](const std::string& fieldName, const bool isLastElement, ABT input) {
+ if (!isLastElement) {
+ input = make<PathTraverse>(std::move(input));
+ }
+ return make<PathField>(fieldName, std::move(input));
+ });
+
+ const ProjectionName& resultProjName = _ctx.getNextId("result");
+ _ctx.setNode<EvaluationNode>(
+ resultProjName,
+ resultProjName,
+ make<EvalPath>(std::move(resultPath), make<Variable>(localFoldedProjName)),
+ std::move(groupByFoldNode));
}
void visit(const DocumentSourceMatch* source) override {
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index eb28f357291..4a8559d89fa 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -250,7 +250,7 @@ public:
return _sbeCompatible;
}
- const NamespaceString& getFromNs() {
+ const NamespaceString& getFromNs() const {
return _fromNs;
}
diff --git a/src/mongo/db/query/optimizer/cascades/ce_heuristic.cpp b/src/mongo/db/query/optimizer/cascades/ce_heuristic.cpp
index 263fab109a5..6d89cd1a436 100644
--- a/src/mongo/db/query/optimizer/cascades/ce_heuristic.cpp
+++ b/src/mongo/db/query/optimizer/cascades/ce_heuristic.cpp
@@ -91,10 +91,18 @@ public:
}
CEType transport(const BinaryJoinNode& node,
- CEType /*leftChildResult*/,
- CEType /*rightChildResult*/,
+ CEType leftChildResult,
+ CEType rightChildResult,
CEType /*exprResult*/) {
- uasserted(6624039, "CE derivation not implemented.");
+ const auto& filter = node.getFilter();
+
+ double selectivity = 0.1;
+ if (filter == Constant::boolean(false)) {
+ selectivity = 0.0;
+ } else if (filter == Constant::boolean(true)) {
+ selectivity = 1.0;
+ }
+ return leftChildResult * rightChildResult * selectivity;
}
CEType transport(const UnionNode& node,
diff --git a/src/mongo/db/query/optimizer/cascades/implementers.cpp b/src/mongo/db/query/optimizer/cascades/implementers.cpp
index 7d7a21c2b52..9c951e14c6b 100644
--- a/src/mongo/db/query/optimizer/cascades/implementers.cpp
+++ b/src/mongo/db/query/optimizer/cascades/implementers.cpp
@@ -730,11 +730,6 @@ public:
PhysProps leftPhysProps = _physProps;
PhysProps rightPhysProps = _physProps;
- // Specifically do not propagate limit-skip.
- // TODO: handle similarly to physical join.
- removeProperty<LimitSkipRequirement>(leftPhysProps);
- removeProperty<LimitSkipRequirement>(rightPhysProps);
-
getProperty<DistributionRequirement>(leftPhysProps).setDisableExchanges(false);
getProperty<DistributionRequirement>(rightPhysProps).setDisableExchanges(false);
@@ -821,12 +816,98 @@ public:
}
}
- void operator()(const ABT& /*n*/, const BinaryJoinNode& node) {
- // TODO: optimize binary joins
- uasserted(6624105, "not implemented");
+ // Physical implementation of a logical BinaryJoinNode. The join node itself is kept as-is and
+ // its two children are scheduled for optimization under per-child physical requirements
+ // derived from the parent requirements. No alternative is produced when a limit-skip
+ // requirement is present, when the required distribution is not centralized, or when the
+ // required collation cannot be split between the two children.
+ void operator()(const ABT& n, const BinaryJoinNode& node) {
+ if (hasProperty<LimitSkipRequirement>(_physProps)) {
+ // We cannot satisfy limit-skip requirements.
+ return;
+ }
+ if (getPropertyConst<DistributionRequirement>(_physProps)
+ .getDistributionAndProjections()
+ ._type != DistributionType::Centralized) {
+ // For now we only support centralized distribution.
+ return;
+ }
+
+ const GroupIdType leftGroupId =
+ node.getLeftChild().cast<MemoLogicalDelegatorNode>()->getGroupId();
+ const GroupIdType rightGroupId =
+ node.getRightChild().cast<MemoLogicalDelegatorNode>()->getGroupId();
+
+ const LogicalProps& leftLogicalProps = _memo.getGroup(leftGroupId)._logicalProperties;
+ const LogicalProps& rightLogicalProps = _memo.getGroup(rightGroupId)._logicalProperties;
+
+ const ProjectionNameSet& leftProjections =
+ getPropertyConst<ProjectionAvailability>(leftLogicalProps).getProjections();
+ const ProjectionNameSet& rightProjections =
+ getPropertyConst<ProjectionAvailability>(rightLogicalProps).getProjections();
+
+ // Both children start from a copy of the parent requirements; the projection and collation
+ // requirements are then narrowed per child below.
+ PhysProps leftPhysProps = _physProps;
+ PhysProps rightPhysProps = _physProps;
+
+ {
+ auto reqProjections =
+ getPropertyConst<ProjectionRequirement>(_physProps).getProjections();
+
+ // Add expression references to requirements.
+ VariableNameSetType references = collectVariableReferences(n);
+ for (const auto& varName : references) {
+ reqProjections.emplace_back(varName);
+ }
+
+ // Split required projections between inner and outer side.
+ ProjectionNameOrderPreservingSet leftChildProjections;
+ ProjectionNameOrderPreservingSet rightChildProjections;
+
+ for (const ProjectionName& projectionName : reqProjections.getVector()) {
+
+ if (leftProjections.count(projectionName) > 0) {
+ leftChildProjections.emplace_back(projectionName);
+ } else if (rightProjections.count(projectionName) > 0) {
+ rightChildProjections.emplace_back(projectionName);
+ } else {
+ uasserted(6624304,
+ "Required projection must appear in either the left or the right "
+ "child projections");
+ return;
+ }
+ }
+
+ setPropertyOverwrite<ProjectionRequirement>(leftPhysProps,
+ std::move(leftChildProjections));
+ setPropertyOverwrite<ProjectionRequirement>(rightPhysProps,
+ std::move(rightChildProjections));
+ }
+
+ if (hasProperty<CollationRequirement>(_physProps)) {
+ const auto& collationSpec =
+ getPropertyConst<CollationRequirement>(_physProps).getCollationSpec();
+
+ // Split collation between inner and outer side.
+ const CollationSplitResult& collationSplit = splitCollationSpec(
+ "" /*ridProjName*/, collationSpec, leftProjections, rightProjections);
+ if (!collationSplit._validSplit) {
+ return;
+ }
+
+ // Each child is asked only for its own part of the collation. A side whose part is
+ // empty must drop the requirement entirely: it would otherwise keep the full parent
+ // requirement copied above, and CollationRequirement rejects an empty spec.
+ if (collationSplit._leftCollation.empty()) {
+ removeProperty<CollationRequirement>(leftPhysProps);
+ } else {
+ setPropertyOverwrite<CollationRequirement>(leftPhysProps,
+ collationSplit._leftCollation);
+ }
+ if (collationSplit._rightCollation.empty()) {
+ removeProperty<CollationRequirement>(rightPhysProps);
+ } else {
+ setPropertyOverwrite<CollationRequirement>(rightPhysProps,
+ collationSplit._rightCollation);
+ }
+ }
+
+ // TODO: consider hash join if the predicate is equality.
+ // Copy the node so the child pointers handed to optimizeChildren refer to the new tree.
+ ABT physicalJoin = n;
+ BinaryJoinNode& newNode = *physicalJoin.cast<BinaryJoinNode>();
+
+ optimizeChildren<BinaryJoinNode>(
+ _queue,
+ kDefaultPriority,
+ std::move(physicalJoin),
+ ChildPropsType{{&newNode.getLeftChild(), std::move(leftPhysProps)},
+ {&newNode.getRightChild(), std::move(rightPhysProps)}});
+ }
- void operator()(const ABT& n, const UnionNode& node) {
+ void operator()(const ABT& /*n*/, const UnionNode& node) {
if (hasProperty<LimitSkipRequirement>(_physProps)) {
// We cannot satisfy limit-skip requirements.
return;
diff --git a/src/mongo/db/query/optimizer/cascades/logical_props_derivation.cpp b/src/mongo/db/query/optimizer/cascades/logical_props_derivation.cpp
index 617d5a2074c..6498fd4fe26 100644
--- a/src/mongo/db/query/optimizer/cascades/logical_props_derivation.cpp
+++ b/src/mongo/db/query/optimizer/cascades/logical_props_derivation.cpp
@@ -249,12 +249,28 @@ public:
}
LogicalProps transport(const BinaryJoinNode& node,
- LogicalProps /*leftChildResult*/,
- LogicalProps /*rightChildResult*/,
+ LogicalProps leftChildResult,
+ LogicalProps rightChildResult,
LogicalProps /*exprResult*/) {
- // TODO: remove indexing availability property when implemented.
- // TODO: combine scan defs from all children for CollectionAvailability.
- uasserted(6624043, "Logical property derivation not implemented.");
+ // We are specifically not adding the node's projection to ProjectionAvailability here.
+ // The logical properties already contain projection availability, which is derived first
+ // when the memo group is created.
+
+ LogicalProps result = std::move(leftChildResult);
+ auto& mergedScanDefs = getProperty<CollectionAvailability>(result).getScanDefSet();
+ auto& mergedDistributionSet =
+ getProperty<DistributionAvailability>(result).getDistributionSet();
+
+ auto rightChildScanDefs =
+ getProperty<CollectionAvailability>(rightChildResult).getScanDefSet();
+ mergedScanDefs.merge(std::move(rightChildScanDefs));
+
+ auto rightChildDistributionSet =
+ getProperty<DistributionAvailability>(rightChildResult).getDistributionSet();
+ mergedDistributionSet.merge(std::move(rightChildDistributionSet));
+
+ removeProperty<IndexingAvailability>(result);
+ return maybeUpdateNodePropsMap(node, std::move(result));
}
LogicalProps transport(const UnionNode& node,
diff --git a/src/mongo/db/query/optimizer/physical_rewriter_optimizer_test.cpp b/src/mongo/db/query/optimizer/physical_rewriter_optimizer_test.cpp
index dce73fa0b71..1a2f5748be8 100644
--- a/src/mongo/db/query/optimizer/physical_rewriter_optimizer_test.cpp
+++ b/src/mongo/db/query/optimizer/physical_rewriter_optimizer_test.cpp
@@ -4532,5 +4532,81 @@ TEST(PhysRewriter, UnionRewrite) {
optimized);
}
+TEST(PhysRewriter, JoinRewrite) {
+ using namespace properties;
+
+ ABT scanNode1 = make<ScanNode>("ptest1", "test1");
+ ABT scanNode2 = make<ScanNode>("ptest2", "test2");
+
+ // Each join side produces two projections: p11 and p12 on the left, p21 and p22 on the right.
+ ABT evalNode1 = make<EvaluationNode>(
+ "p11",
+ make<EvalPath>(make<PathGet>("a", make<PathIdentity>()), make<Variable>("ptest1")),
+ std::move(scanNode1));
+ ABT evalNode2 = make<EvaluationNode>(
+ "p12",
+ make<EvalPath>(make<PathGet>("b", make<PathIdentity>()), make<Variable>("ptest1")),
+ std::move(evalNode1));
+
+ ABT evalNode3 = make<EvaluationNode>(
+ "p21",
+ make<EvalPath>(make<PathGet>("a", make<PathIdentity>()), make<Variable>("ptest2")),
+ std::move(scanNode2));
+ ABT evalNode4 = make<EvaluationNode>(
+ "p22",
+ make<EvalPath>(make<PathGet>("b", make<PathIdentity>()), make<Variable>("ptest2")),
+ std::move(evalNode3));
+
+ ABT joinNode = make<BinaryJoinNode>(
+ JoinType::Inner,
+ ProjectionNameSet{},
+ make<BinaryOp>(Operations::Eq, make<Variable>("p12"), make<Variable>("p22")),
+ std::move(evalNode2),
+ std::move(evalNode4));
+
+ ABT rootNode = make<RootNode>(ProjectionRequirement{ProjectionNameVector{"p11", "p21"}},
+ std::move(joinNode));
+
+ PrefixId prefixId;
+ OptPhaseManager phaseManager(
+ {OptPhaseManager::OptPhase::MemoSubstitutionPhase,
+ OptPhaseManager::OptPhase::MemoExplorationPhase,
+ OptPhaseManager::OptPhase::MemoImplementationPhase},
+ prefixId,
+ {{{"test1", {{}, {}}}, {"test2", {{}, {}}}}},
+ {true /*debugMode*/, 2 /*debugLevel*/, DebugInfo::kIterationLimitForTests});
+
+ ABT optimized = std::move(rootNode);
+ ASSERT_TRUE(phaseManager.optimize(optimized));
+ ASSERT_EQ(4, phaseManager.getMemo().getStats()._physPlanExplorationCount);
+
+ ASSERT_EXPLAIN_V2(
+ "Root []\n"
+ "| | projections: \n"
+ "| | p11\n"
+ "| | p21\n"
+ "| RefBlock: \n"
+ "| Variable [p11]\n"
+ "| Variable [p21]\n"
+ "BinaryJoin [joinType: Inner]\n"
+ "| | BinaryOp [Eq]\n"
+ "| | | Variable [p22]\n"
+ "| | Variable [p12]\n"
+ "| PhysicalScan [{'a': p21, 'b': p22}, test2]\n"
+ "| BindBlock:\n"
+ "| [p21]\n"
+ "| Source []\n"
+ "| [p22]\n"
+ "| Source []\n"
+ "PhysicalScan [{'a': p11, 'b': p12}, test1]\n"
+ " BindBlock:\n"
+ " [p11]\n"
+ " Source []\n"
+ " [p12]\n"
+ " Source []\n",
+ optimized);
+}
+
+
} // namespace
} // namespace mongo::optimizer
diff --git a/src/mongo/db/query/optimizer/props.cpp b/src/mongo/db/query/optimizer/props.cpp
index c97fe18b466..892035d5924 100644
--- a/src/mongo/db/query/optimizer/props.cpp
+++ b/src/mongo/db/query/optimizer/props.cpp
@@ -34,14 +34,14 @@
namespace mongo::optimizer::properties {
CollationRequirement::CollationRequirement(ProjectionCollationSpec spec) : _spec(std::move(spec)) {
+ uassert(6624302, "Empty collation spec", !_spec.empty());
+
ProjectionNameSet projections;
for (const auto& entry : _spec) {
uassert(6624021, "Repeated projection name", projections.insert(entry.first).second);
}
}
-CollationRequirement CollationRequirement::Empty = CollationRequirement();
-
bool CollationRequirement::operator==(const CollationRequirement& other) const {
return _spec == other._spec;
}
diff --git a/src/mongo/db/query/optimizer/props.h b/src/mongo/db/query/optimizer/props.h
index 67dd4e090f1..0cd6941e46a 100644
--- a/src/mongo/db/query/optimizer/props.h
+++ b/src/mongo/db/query/optimizer/props.h
@@ -170,9 +170,6 @@ inline auto makePhysProps(Args&&... args) {
*/
class CollationRequirement final : public PhysPropertyTag {
public:
- static CollationRequirement Empty;
-
- CollationRequirement() = default;
CollationRequirement(ProjectionCollationSpec spec);
bool operator==(const CollationRequirement& other) const;