diff options
Diffstat (limited to 'src/mongo/db/query/optimizer/utils/interval_utils.cpp')
-rw-r--r-- | src/mongo/db/query/optimizer/utils/interval_utils.cpp | 137 |
1 files changed, 101 insertions, 36 deletions
diff --git a/src/mongo/db/query/optimizer/utils/interval_utils.cpp b/src/mongo/db/query/optimizer/utils/interval_utils.cpp index a53a816b98a..996ae4c5f62 100644 --- a/src/mongo/db/query/optimizer/utils/interval_utils.cpp +++ b/src/mongo/db/query/optimizer/utils/interval_utils.cpp @@ -120,19 +120,17 @@ std::vector<IntervalRequirement> intersectIntervals(const IntervalRequirement& i constFold(expr); return expr; }; - const auto minMaxFn = [](const Operations op, const ABT& v1, const ABT& v2) { - // Encodes max(v1, v2). - return make<If>(make<BinaryOp>(op, v1, v2), v1, v2); + const auto minFn = [](const ABT& v1, const ABT& v2) { + return make<If>(make<BinaryOp>(Operations::Lte, v1, v2), v1, v2); }; - const auto minMaxFn1 = [](const Operations op, const ABT& v1, const ABT& v2, const ABT& v3) { - // Encodes v1 op v2 ? v3 : v2 - return make<If>(make<BinaryOp>(op, v1, v2), v3, v2); + const auto maxFn = [](const ABT& v1, const ABT& v2) { + return make<If>(make<BinaryOp>(Operations::Gte, v1, v2), v1, v2); }; // In the simplest case our bound is (max(low1, low2), min(high1, high2)) if none of the bounds // are inclusive. - const ABT maxLow = foldFn(minMaxFn(Operations::Gte, low1, low2)); - const ABT minHigh = foldFn(minMaxFn(Operations::Lte, high1, high2)); + const ABT maxLow = foldFn(maxFn(low1, low2)); + const ABT minHigh = foldFn(minFn(high1, high2)); if (foldFn(make<BinaryOp>(Operations::Gt, maxLow, minHigh)) == Constant::boolean(true)) { // Low bound is greater than high bound. return {}; @@ -145,15 +143,15 @@ std::vector<IntervalRequirement> intersectIntervals(const IntervalRequirement& i // We form a "main" result interval which is closed on any side with "agreement" between the two // intervals. For example [low1, high1] ^ [low2, high2) -> [max(low1, low2), min(high1, high2)) - BoundRequirement lowBoundMain(low1Inc && low2Inc, maxLow); - BoundRequirement highBoundMain(high1Inc && high2Inc, minHigh); + BoundRequirement lowBoundPrimary(low1Inc && low2Inc, maxLow); + BoundRequirement highBoundPrimary(high1Inc && high2Inc, minHigh); const bool boundsEqual = foldFn(make<BinaryOp>(Operations::Eq, maxLow, minHigh)) == Constant::boolean(true); if (boundsEqual) { if (low1Inc && high1Inc && low2Inc && high2Inc) { // Point interval. - return {{std::move(lowBoundMain), std::move(highBoundMain)}}; + return {{std::move(lowBoundPrimary), std::move(highBoundPrimary)}}; } if ((!low1Inc && !low2Inc) || (!high1Inc && !high2Inc)) { // Fully open on both sides. @@ -162,24 +160,23 @@ std::vector<IntervalRequirement> intersectIntervals(const IntervalRequirement& i } if (low1Inc == low2Inc && high1Inc == high2Inc) { // Inclusion matches on both sides. - return {{std::move(lowBoundMain), std::move(highBoundMain)}}; + return {{std::move(lowBoundPrimary), std::move(highBoundPrimary)}}; } // At this point we have intervals without inclusion agreement, for example - // [low1, high1) ^ (low2, high2]. We have the main result which in this case is the open + // [low1, high1) ^ (low2, high2]. We have the primary interval which in this case is the open // (max(low1, low2), min(high1, high2)). Then we add an extra closed interval for each side with - // disagreement. For example for the lower sides we add: [low2 >= low1 ? MaxKey : low1, - // min(max(low1, low2), min(high1, high2)] This is a closed interval which would reduce to - // [max(low1, low2), max(low1, low2)] if low2 < low1. If low2 >= low1 the interval reduces to an - // empty one [MaxKey, min(max(low1, low2), min(high1, high2)] which will return no results from - // an index scan. We do not know that in general if we do not have constants (we cannot fold). + // disagreement. For example for the lower sides we add: [indicator ? low1 : MaxKey, low1]. This + // is a closed interval which would reduce to [low1, low1] if low1 > low2 and the intervals + // intersect and are non-empty. If low2 >= low1 the interval reduces to an empty one, + // [MaxKey, low1], which will return no results from an index scan. We do not know that in + // general if we do not have constants (we cannot fold). // - // If we can fold the extra interval, we exploit the fact that (max(low1, low2), - // min(high1, high2)) U [max(low1, low2), max(low1, low2)] is [max(low1, low2), min(high1, - // high2)) (observe left side is now closed). Then we create a similar auxiliary interval for - // the right side if there is disagreement on the inclusion. Finally, we attempt to fold both - // intervals. Should we conclude definitively that they are point intervals, we update the - // inclusion of the main interval for the respective side. + // If we can fold the aux interval, we combine the aux interval into the primary one, which + // would yield [low1, min(high1, high2)) if we can prove that low1 > low2. Then we create a + // similar auxiliary interval for the right side if there is disagreement on the inclusion. + // We'll attempt to fold both intervals. Should we conclude definitively that they are + // point intervals, we update the inclusion of the main interval for the respective side. std::vector<IntervalRequirement> result; const auto addAuxInterval = [&](ABT low, ABT high, BoundRequirement& bound) { @@ -199,26 +196,94 @@ std::vector<IntervalRequirement> intersectIntervals(const IntervalRequirement& i } }; + /* + * An auxiliary interval should resolve to a non-empty interval if the original intervals we're + * intersecting overlap and produce something non-empty. Below we create an overlap indicator, + * which tells us if the intervals overlap. + * + * For intersection, the pair [1,2) and [2, 3] does not overlap, while [1,2] and [2, 3] does. So + * we need to adjust our comparisons depending on if the bounds are both inclusive or not. + */ + const Operations cmpLows = low1Inc && low2Inc ? Operations::Lte : Operations::Lt; + const Operations cmpLow1High2 = low1Inc && high2Inc ? Operations::Lte : Operations::Lt; + const Operations cmpLow2High1 = low2Inc && high1Inc ? Operations::Lte : Operations::Lt; + const Operations cmpHighs = high1Inc && high2Inc ? Operations::Lte : Operations::Lt; + /* + * Our final overlap indicator is as follows (using < or <= depending on inclusiveness) + * (low1,high1) ^ (low2,high2) overlap if: + * low2 < low1 < high2 || low2 < high1 < high2 || low1 < low2 < high1 || low1 < high2 < high1 + * As long as both intervals are non-empty. + * + * This covers the four cases: + * 1. int1 intersects int2 from below, ex: (1,3) ^ (2,4) + * 2. int1 intersects int2 from above, ex: (2,4) ^ (1,3) + * 3. int1 is a subset of int2, ex: (2,3) ^ (1,4) + * 4. int2 is a subset of int1, ex: (1,4) ^ (2,3) + */ + ABT int1NonEmpty = + make<BinaryOp>(low1Inc && high1Inc ? Operations::Lte : Operations::Lt, low1, high1); + ABT int2NonEmpty = + make<BinaryOp>(low2Inc && high2Inc ? Operations::Lte : Operations::Lt, low2, high2); + ABT overlapCondition = + make<BinaryOp>(Operations::Or, + make<BinaryOp>(Operations::Or, + make<BinaryOp>(Operations::And, + make<BinaryOp>(cmpLows, low2, low1), + make<BinaryOp>(cmpLow1High2, low1, high2)), + make<BinaryOp>(Operations::And, + make<BinaryOp>(cmpLow2High1, low2, high1), + make<BinaryOp>(cmpHighs, high1, high2))), + make<BinaryOp>(Operations::Or, + make<BinaryOp>(Operations::And, + make<BinaryOp>(cmpLows, low1, low2), + make<BinaryOp>(cmpLow2High1, low2, high1)), + make<BinaryOp>(Operations::And, + make<BinaryOp>(cmpLow1High2, low1, high2), + make<BinaryOp>(cmpHighs, high2, high1)))); + overlapCondition = make<BinaryOp>( + Operations::And, + std::move(overlapCondition), + make<BinaryOp>(Operations::And, std::move(int1NonEmpty), std::move(int2NonEmpty))); + + /* + * It's possible our aux indicators could be simplified. For example, a more concise indicator + * for [low1, high1] ^ (low2, high2] might be int1_nonempty && (int2 contains low1). This + * condition implies the intervals are non-empty and overlap, meaning the intersection is + * non-empty. It also implies that low1 > low2, meaning the inclusive bound wins. + */ if (low1Inc != low2Inc) { - const ABT low = foldFn(minMaxFn1( - Operations::Gte, low1Inc ? low2 : low1, low1Inc ? low1 : low2, Constant::maxKey())); - const ABT high = foldFn(minMaxFn(Operations::Lte, maxLow, minHigh)); - addAuxInterval(std::move(low), std::move(high), lowBoundMain); + ABT incBound = low1Inc ? low1 : low2; + ABT nonIncBound = low1Inc ? low2 : low1; + + // Our aux interval should be non-empty if overlap_indicator && (incBound > nonIncBound) + ABT auxCondition = + make<BinaryOp>(Operations::And, + overlapCondition, + make<BinaryOp>(Operations::Gt, incBound, std::move(nonIncBound))); + ABT low = foldFn(make<If>(std::move(auxCondition), incBound, Constant::maxKey())); + ABT high = std::move(incBound); + addAuxInterval(std::move(low), std::move(high), lowBoundPrimary); } if (high1Inc != high2Inc) { - const ABT low = foldFn(minMaxFn(Operations::Gte, maxLow, minHigh)); - const ABT high = foldFn(minMaxFn1(Operations::Lte, - high1Inc ? high2 : high1, - high1Inc ? high1 : high2, - Constant::minKey())); - addAuxInterval(std::move(low), std::move(high), highBoundMain); + ABT incBound = high1Inc ? high1 : high2; + ABT nonIncBound = high1Inc ? high2 : high1; + + ABT low = incBound; + // Our aux interval should be non-empty if overlap_indicator && (incBound < nonIncBound) + ABT auxCondition = + make<BinaryOp>(Operations::And, + overlapCondition, + make<BinaryOp>(Operations::Lt, incBound, std::move(nonIncBound))); + ABT high = + foldFn(make<If>(std::move(auxCondition), std::move(incBound), Constant::minKey())); + addAuxInterval(std::move(low), std::move(high), highBoundPrimary); } - if (!boundsEqual || (lowBoundMain.isInclusive() && highBoundMain.isInclusive())) { + if (!boundsEqual || (lowBoundPrimary.isInclusive() && highBoundPrimary.isInclusive())) { // We add the main interval to the result as long as it is a valid point interval, or the // bounds are not equal. - result.emplace_back(std::move(lowBoundMain), std::move(highBoundMain)); + result.emplace_back(std::move(lowBoundPrimary), std::move(highBoundPrimary)); } return result; } |