summaryrefslogtreecommitdiff
path: root/src/mongo/db/query/optimizer/utils/interval_utils.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/query/optimizer/utils/interval_utils.cpp')
-rw-r--r--src/mongo/db/query/optimizer/utils/interval_utils.cpp137
1 files changed, 101 insertions, 36 deletions
diff --git a/src/mongo/db/query/optimizer/utils/interval_utils.cpp b/src/mongo/db/query/optimizer/utils/interval_utils.cpp
index a53a816b98a..996ae4c5f62 100644
--- a/src/mongo/db/query/optimizer/utils/interval_utils.cpp
+++ b/src/mongo/db/query/optimizer/utils/interval_utils.cpp
@@ -120,19 +120,17 @@ std::vector<IntervalRequirement> intersectIntervals(const IntervalRequirement& i
constFold(expr);
return expr;
};
- const auto minMaxFn = [](const Operations op, const ABT& v1, const ABT& v2) {
- // Encodes max(v1, v2).
- return make<If>(make<BinaryOp>(op, v1, v2), v1, v2);
+ const auto minFn = [](const ABT& v1, const ABT& v2) {
+ return make<If>(make<BinaryOp>(Operations::Lte, v1, v2), v1, v2);
};
- const auto minMaxFn1 = [](const Operations op, const ABT& v1, const ABT& v2, const ABT& v3) {
- // Encodes v1 op v2 ? v3 : v2
- return make<If>(make<BinaryOp>(op, v1, v2), v3, v2);
+ const auto maxFn = [](const ABT& v1, const ABT& v2) {
+ return make<If>(make<BinaryOp>(Operations::Gte, v1, v2), v1, v2);
};
// In the simplest case our bound is (max(low1, low2), min(high1, high2)) if none of the bounds
// are inclusive.
- const ABT maxLow = foldFn(minMaxFn(Operations::Gte, low1, low2));
- const ABT minHigh = foldFn(minMaxFn(Operations::Lte, high1, high2));
+ const ABT maxLow = foldFn(maxFn(low1, low2));
+ const ABT minHigh = foldFn(minFn(high1, high2));
if (foldFn(make<BinaryOp>(Operations::Gt, maxLow, minHigh)) == Constant::boolean(true)) {
// Low bound is greater than high bound.
return {};
@@ -145,15 +143,15 @@ std::vector<IntervalRequirement> intersectIntervals(const IntervalRequirement& i
// We form a "main" result interval which is closed on any side with "agreement" between the two
// intervals. For example [low1, high1] ^ [low2, high2) -> [max(low1, low2), min(high1, high2))
- BoundRequirement lowBoundMain(low1Inc && low2Inc, maxLow);
- BoundRequirement highBoundMain(high1Inc && high2Inc, minHigh);
+ BoundRequirement lowBoundPrimary(low1Inc && low2Inc, maxLow);
+ BoundRequirement highBoundPrimary(high1Inc && high2Inc, minHigh);
const bool boundsEqual =
foldFn(make<BinaryOp>(Operations::Eq, maxLow, minHigh)) == Constant::boolean(true);
if (boundsEqual) {
if (low1Inc && high1Inc && low2Inc && high2Inc) {
// Point interval.
- return {{std::move(lowBoundMain), std::move(highBoundMain)}};
+ return {{std::move(lowBoundPrimary), std::move(highBoundPrimary)}};
}
if ((!low1Inc && !low2Inc) || (!high1Inc && !high2Inc)) {
// Fully open on both sides.
@@ -162,24 +160,23 @@ std::vector<IntervalRequirement> intersectIntervals(const IntervalRequirement& i
}
if (low1Inc == low2Inc && high1Inc == high2Inc) {
// Inclusion matches on both sides.
- return {{std::move(lowBoundMain), std::move(highBoundMain)}};
+ return {{std::move(lowBoundPrimary), std::move(highBoundPrimary)}};
}
// At this point we have intervals without inclusion agreement, for example
- // [low1, high1) ^ (low2, high2]. We have the main result which in this case is the open
+ // [low1, high1) ^ (low2, high2]. We have the primary interval which in this case is the open
// (max(low1, low2), min(high1, high2)). Then we add an extra closed interval for each side with
- // disagreement. For example for the lower sides we add: [low2 >= low1 ? MaxKey : low1,
- // min(max(low1, low2), min(high1, high2)] This is a closed interval which would reduce to
- // [max(low1, low2), max(low1, low2)] if low2 < low1. If low2 >= low1 the interval reduces to an
- // empty one [MaxKey, min(max(low1, low2), min(high1, high2)] which will return no results from
- // an index scan. We do not know that in general if we do not have constants (we cannot fold).
+ // disagreement. For example for the lower sides we add: [indicator ? low1 : MaxKey, low1]. This
+ // is a closed interval which would reduce to [low1, low1] if low1 > low2 and the intervals
+ // intersect and are non-empty. If low2 >= low1 the interval reduces to an empty one,
+ // [MaxKey, low1], which will return no results from an index scan. We do not know that in
+ // general if we do not have constants (we cannot fold).
//
- // If we can fold the extra interval, we exploit the fact that (max(low1, low2),
- // min(high1, high2)) U [max(low1, low2), max(low1, low2)] is [max(low1, low2), min(high1,
- // high2)) (observe left side is now closed). Then we create a similar auxiliary interval for
- // the right side if there is disagreement on the inclusion. Finally, we attempt to fold both
- // intervals. Should we conclude definitively that they are point intervals, we update the
- // inclusion of the main interval for the respective side.
+ // If we can fold the aux interval, we combine the aux interval into the primary one, which
+ // would yield [low1, min(high1, high2)) if we can prove that low1 > low2. Then we create a
+ // similar auxiliary interval for the right side if there is disagreement on the inclusion.
+ // We'll attempt to fold both intervals. Should we conclude definitively that they are
+ // point intervals, we update the inclusion of the main interval for the respective side.
std::vector<IntervalRequirement> result;
const auto addAuxInterval = [&](ABT low, ABT high, BoundRequirement& bound) {
@@ -199,26 +196,94 @@ std::vector<IntervalRequirement> intersectIntervals(const IntervalRequirement& i
}
};
+ /*
+ * An auxiliary interval should resolve to a non-empty interval if the original intervals we're
+ * intersecting overlap and produce something non-empty. Below we create an overlap indicator,
+ * which tells us if the intervals overlap.
+ *
+ * For intersection, the pair [1,2) and [2, 3] does not overlap, while [1,2] and [2, 3] does. So
+ * we need to adjust our comparisons depending on if the bounds are both inclusive or not.
+ */
+ const Operations cmpLows = low1Inc && low2Inc ? Operations::Lte : Operations::Lt;
+ const Operations cmpLow1High2 = low1Inc && high2Inc ? Operations::Lte : Operations::Lt;
+ const Operations cmpLow2High1 = low2Inc && high1Inc ? Operations::Lte : Operations::Lt;
+ const Operations cmpHighs = high1Inc && high2Inc ? Operations::Lte : Operations::Lt;
+ /*
+ * Our final overlap indicator is as follows (using < or <= depending on inclusiveness)
+ * (low1,high1) ^ (low2,high2) overlap if:
+ * low2 < low1 < high2 || low2 < high1 < high2 || low1 < low2 < high1 || low1 < high2 < high1
+ * As long as both intervals are non-empty.
+ *
+ * This covers the four cases:
+ * 1. int1 intersects int2 from below, ex: (1,3) ^ (2,4)
+ * 2. int1 intersects int2 from above, ex: (2,4) ^ (1,3)
+ * 3. int1 is a subset of int2, ex: (2,3) ^ (1,4)
+ * 4. int2 is a subset of int1, ex: (1,4) ^ (2,3)
+ */
+ ABT int1NonEmpty =
+ make<BinaryOp>(low1Inc && high1Inc ? Operations::Lte : Operations::Lt, low1, high1);
+ ABT int2NonEmpty =
+ make<BinaryOp>(low2Inc && high2Inc ? Operations::Lte : Operations::Lt, low2, high2);
+ ABT overlapCondition =
+ make<BinaryOp>(Operations::Or,
+ make<BinaryOp>(Operations::Or,
+ make<BinaryOp>(Operations::And,
+ make<BinaryOp>(cmpLows, low2, low1),
+ make<BinaryOp>(cmpLow1High2, low1, high2)),
+ make<BinaryOp>(Operations::And,
+ make<BinaryOp>(cmpLow2High1, low2, high1),
+ make<BinaryOp>(cmpHighs, high1, high2))),
+ make<BinaryOp>(Operations::Or,
+ make<BinaryOp>(Operations::And,
+ make<BinaryOp>(cmpLows, low1, low2),
+ make<BinaryOp>(cmpLow2High1, low2, high1)),
+ make<BinaryOp>(Operations::And,
+ make<BinaryOp>(cmpLow1High2, low1, high2),
+ make<BinaryOp>(cmpHighs, high2, high1))));
+ overlapCondition = make<BinaryOp>(
+ Operations::And,
+ std::move(overlapCondition),
+ make<BinaryOp>(Operations::And, std::move(int1NonEmpty), std::move(int2NonEmpty)));
+
+ /*
+ * It's possible our aux indicators could be simplified. For example, a more concise indicator
+ * for [low1, high1] ^ (low2, high2] might be int1_nonempty && (int2 contains low1). This
+ * condition implies the intervals are non-empty and overlap, meaning the intersection is
+ * non-empty. It also implies that low1 > low2, meaning the inclusive bound wins.
+ */
if (low1Inc != low2Inc) {
- const ABT low = foldFn(minMaxFn1(
- Operations::Gte, low1Inc ? low2 : low1, low1Inc ? low1 : low2, Constant::maxKey()));
- const ABT high = foldFn(minMaxFn(Operations::Lte, maxLow, minHigh));
- addAuxInterval(std::move(low), std::move(high), lowBoundMain);
+ ABT incBound = low1Inc ? low1 : low2;
+ ABT nonIncBound = low1Inc ? low2 : low1;
+
+ // Our aux interval should be non-empty if overlap_indicator && (incBound > nonIncBound)
+ ABT auxCondition =
+ make<BinaryOp>(Operations::And,
+ overlapCondition,
+ make<BinaryOp>(Operations::Gt, incBound, std::move(nonIncBound)));
+ ABT low = foldFn(make<If>(std::move(auxCondition), incBound, Constant::maxKey()));
+ ABT high = std::move(incBound);
+ addAuxInterval(std::move(low), std::move(high), lowBoundPrimary);
}
if (high1Inc != high2Inc) {
- const ABT low = foldFn(minMaxFn(Operations::Gte, maxLow, minHigh));
- const ABT high = foldFn(minMaxFn1(Operations::Lte,
- high1Inc ? high2 : high1,
- high1Inc ? high1 : high2,
- Constant::minKey()));
- addAuxInterval(std::move(low), std::move(high), highBoundMain);
+ ABT incBound = high1Inc ? high1 : high2;
+ ABT nonIncBound = high1Inc ? high2 : high1;
+
+ ABT low = incBound;
+ // Our aux interval should be non-empty if overlap_indicator && (incBound < nonIncBound)
+ ABT auxCondition =
+ make<BinaryOp>(Operations::And,
+ overlapCondition,
+ make<BinaryOp>(Operations::Lt, incBound, std::move(nonIncBound)));
+ ABT high =
+ foldFn(make<If>(std::move(auxCondition), std::move(incBound), Constant::minKey()));
+ addAuxInterval(std::move(low), std::move(high), highBoundPrimary);
}
- if (!boundsEqual || (lowBoundMain.isInclusive() && highBoundMain.isInclusive())) {
+ if (!boundsEqual || (lowBoundPrimary.isInclusive() && highBoundPrimary.isInclusive())) {
// We add the main interval to the result as long as it is a valid point interval, or the
// bounds are not equal.
- result.emplace_back(std::move(lowBoundMain), std::move(highBoundMain));
+ result.emplace_back(std::move(lowBoundPrimary), std::move(highBoundPrimary));
}
return result;
}