From 4a307daa1f6af15a48d8e12b38e10f16dec9664d Mon Sep 17 00:00:00 2001 From: Matt Boros Date: Mon, 9 Jan 2023 16:52:49 +0000 Subject: SERVER-71656 Add additional conditions to auxiliary intervals during interval intersection --- .../db/query/optimizer/interval_simplify_test.cpp | 177 ++++++++++++++------- .../optimizer/physical_rewriter_optimizer_test.cpp | 21 ++- .../db/query/optimizer/utils/interval_utils.cpp | 137 +++++++++++----- 3 files changed, 237 insertions(+), 98 deletions(-) diff --git a/src/mongo/db/query/optimizer/interval_simplify_test.cpp b/src/mongo/db/query/optimizer/interval_simplify_test.cpp index 9454a896922..288ccec7c75 100644 --- a/src/mongo/db/query/optimizer/interval_simplify_test.cpp +++ b/src/mongo/db/query/optimizer/interval_simplify_test.cpp @@ -355,8 +355,12 @@ TEST_F(IntervalIntersection, VariableIntervals1) { ASSERT_INTERVAL( "{\n" " {\n" - " {[If [] BinaryOp [Gte] Variable [v2] Variable [v1] Const [maxKey] Variable [v1]," - " If [] BinaryOp [Gte] Variable [v1] Variable [v2] Variable [v1] Variable [v2]]}\n" + " {[If [] BinaryOp [And] BinaryOp [And] BinaryOp [Or] BinaryOp [Or] BinaryOp [And] " + "BinaryOp [Lt] Variable [v2] Variable [v1] Const [true] BinaryOp [And] BinaryOp [Lt] " + "Variable [v2] Const [maxKey] Const [true] BinaryOp [Or] BinaryOp [And] BinaryOp [Lt] " + "Variable [v1] Variable [v2] BinaryOp [Lt] Variable [v2] Const [maxKey] Const [true] " + "BinaryOp [Lt] Variable [v2] Const [maxKey] BinaryOp [Gt] Variable [v1] Variable [v2] " + "Variable [v1] Const [maxKey], Variable [v1]]}\n" " }\n" " U \n" " {\n" @@ -404,16 +408,19 @@ TEST_F(IntervalIntersection, VariableIntervals3) { ASSERT_INTERVAL( "{\n" " {\n" - " {[If [] BinaryOp [Gte] Variable [v1] Variable [v2] Const [maxKey] Variable " - "[v2], If [] BinaryOp [Lte] If [] BinaryOp [Gte] Variable [v1] Variable [v2] Variable " - "[v1] Variable [v2] If [] BinaryOp [Lte] Variable [v3] Variable [v4] Variable [v3] " - "Variable [v4] If [] BinaryOp [Gte] Variable [v1] Variable [v2] Variable [v1] Variable " - "[v2] If [] BinaryOp [Lte] Variable [v3] Variable [v4] Variable [v3] Variable [v4]]}\n" + " {[If [] BinaryOp [And] BinaryOp [And] BinaryOp [Or] BinaryOp [Or] BinaryOp [And] " + "BinaryOp [Lt] Variable [v2] Variable [v1] BinaryOp [Lt] Variable [v1] Variable [v4] " + "BinaryOp [And] BinaryOp [Lte] Variable [v2] Variable [v3] BinaryOp [Lte] Variable [v3] " + "Variable [v4] BinaryOp [Or] BinaryOp [And] BinaryOp [Lt] Variable [v1] Variable [v2] " + "BinaryOp [Lte] Variable [v2] Variable [v3] BinaryOp [And] BinaryOp [Lt] Variable [v1] " + "Variable [v4] BinaryOp [Lte] Variable [v4] Variable [v3] BinaryOp [And] BinaryOp [Lt] " + "Variable [v1] Variable [v3] BinaryOp [Lte] Variable [v2] Variable [v4] BinaryOp [Gt] " + "Variable [v2] Variable [v1] Variable [v2] Const [maxKey], Variable [v2]]}\n" " }\n" " U \n" " {\n" - " {(If [] BinaryOp [Gte] Variable [v1] Variable [v2] Variable [v1] Variable " - "[v2], If [] BinaryOp [Lte] Variable [v3] Variable [v4] Variable [v3] Variable [v4]]}\n" + " {(If [] BinaryOp [Gte] Variable [v1] Variable [v2] Variable [v1] Variable [v2], " + "If [] BinaryOp [Lte] Variable [v3] Variable [v4] Variable [v3] Variable [v4]]}\n" " }\n" "}\n", *result); @@ -434,25 +441,30 @@ TEST_F(IntervalIntersection, VariableIntervals4) { ASSERT_INTERVAL( "{\n" " {\n" - " {[If [] BinaryOp [Gte] Variable [v1] Variable [v2] Const [maxKey] Variable " - "[v2], If [] BinaryOp [Lte] If [] BinaryOp [Gte] Variable [v1] Variable [v2] Variable " - "[v1] Variable [v2] If [] BinaryOp [Lte] Variable [v3] Variable [v4] Variable [v3] " - "Variable [v4] If [] BinaryOp [Gte] Variable [v1] Variable [v2] Variable [v1] Variable " - "[v2] If [] BinaryOp [Lte] Variable [v3] Variable [v4] Variable [v3] Variable [v4]]}\n" + " {[If [] BinaryOp [And] BinaryOp [And] BinaryOp [Or] BinaryOp [Or] BinaryOp [And] " + "BinaryOp [Lt] Variable [v2] Variable [v1] BinaryOp [Lt] Variable [v1] Variable [v4] " + "BinaryOp [And] BinaryOp [Lte] Variable [v2] Variable [v3] BinaryOp [Lt] Variable [v3] " + "Variable [v4] BinaryOp [Or] BinaryOp [And] BinaryOp [Lt] Variable [v1] Variable [v2] " + "BinaryOp [Lte] Variable [v2] Variable [v3] BinaryOp [And] BinaryOp [Lt] Variable [v1] " + "Variable [v4] BinaryOp [Lt] Variable [v4] Variable [v3] BinaryOp [And] BinaryOp [Lt] " + "Variable [v1] Variable [v3] BinaryOp [Lt] Variable [v2] Variable [v4] BinaryOp [Gt] " + "Variable [v2] Variable [v1] Variable [v2] Const [maxKey], Variable [v2]]}\n" " }\n" " U \n" " {\n" - " {[If [] BinaryOp [Gte] If [] BinaryOp [Gte] Variable [v1] Variable [v2] " - "Variable [v1] Variable [v2] If [] BinaryOp [Lte] Variable [v3] Variable [v4] Variable " - "[v3] Variable [v4] If [] BinaryOp [Gte] Variable [v1] Variable [v2] Variable [v1] " - "Variable [v2] If [] BinaryOp [Lte] Variable [v3] Variable [v4] Variable [v3] Variable " - "[v4], If [] BinaryOp [Lte] Variable [v4] Variable [v3] Const [minKey] Variable " - "[v3]]}\n" + " {[Variable [v3], If [] BinaryOp [And] BinaryOp [And] BinaryOp [Or] BinaryOp [Or] " + "BinaryOp [And] BinaryOp [Lt] Variable [v2] Variable [v1] BinaryOp [Lt] Variable [v1] " + "Variable [v4] BinaryOp [And] BinaryOp [Lte] Variable [v2] Variable [v3] BinaryOp [Lt] " + "Variable [v3] Variable [v4] BinaryOp [Or] BinaryOp [And] BinaryOp [Lt] Variable [v1] " + "Variable [v2] BinaryOp [Lte] Variable [v2] Variable [v3] BinaryOp [And] BinaryOp [Lt] " + "Variable [v1] Variable [v4] BinaryOp [Lt] Variable [v4] Variable [v3] BinaryOp [And] " + "BinaryOp [Lt] Variable [v1] Variable [v3] BinaryOp [Lt] Variable [v2] Variable [v4] " + "BinaryOp [Lt] Variable [v3] Variable [v4] Variable [v3] Const [minKey]]}\n" " }\n" " U \n" " {\n" - " {(If [] BinaryOp [Gte] Variable [v1] Variable [v2] Variable [v1] Variable " - "[v2], If [] BinaryOp [Lte] Variable [v3] Variable [v4] Variable [v3] Variable [v4])}\n" + " {(If [] BinaryOp [Gte] Variable [v1] Variable [v2] Variable [v1] Variable [v2], " + "If [] BinaryOp [Lte] Variable [v3] Variable [v4] Variable [v3] Variable [v4])}\n" " }\n" "}\n", *result); @@ -630,13 +642,27 @@ bool compareIntervals(const IntervalReqExpr::Node& original, return transport.computeInclusion(original) == transport.computeInclusion(simplified); } +void constFoldBounds(IntervalReqExpr::Node& node) { + for (auto& disjunct : node.cast()->nodes()) { + for (auto& conjunct : disjunct.cast()->nodes()) { + auto& interval = conjunct.cast()->getExpr(); + ABT lowABT = interval.getLowBound().getBound(); + ABT highABT = interval.getHighBound().getBound(); + ConstEval::constFold(lowABT); + ConstEval::constFold(highABT); + interval = IntervalRequirement{ + BoundRequirement{interval.getLowBound().isInclusive(), std::move(lowABT)}, + BoundRequirement{interval.getHighBound().isInclusive(), std::move(highABT)}, + }; + } + } +} + /* * Create two random intervals composed of constants and test intersection/union on them. */ template void testIntervalPermutation(int permutation) { - auto prefixId = PrefixId::createForTests(); - const bool low1Inc = decode<2>(permutation); const int low1 = decode(permutation); const bool high1Inc = decode<2>(permutation); @@ -645,17 +671,33 @@ void testIntervalPermutation(int permutation) { const int low2 = decode(permutation); const bool high2Inc = decode<2>(permutation); const int high2 = decode(permutation); + const bool useRealConstFold = decode<2>(permutation); + + // This function can be passed as a substitute for the real constant folding function, to test + // that our simplification methods work when we cannot constant fold anything. + const auto noOpConstFold = [](ABT& n) { + // No-op. + }; // Test intersection. { auto original = _disj( _conj(_interval({low1Inc, Constant::int32(low1)}, {high1Inc, Constant::int32(high1)}), _interval({low2Inc, Constant::int32(low2)}, {high2Inc, Constant::int32(high2)}))); - auto simplified = intersectDNFIntervals(original, ConstEval::constFold); + auto simplified = intersectDNFIntervals( + original, useRealConstFold ? ConstEval::constFold : noOpConstFold); + if (simplified) { - // Since we are testing with constants, we should have at most one interval. - ASSERT_TRUE(IntervalReqExpr::getSingularDNF(*simplified)); - compareIntervals(original, *simplified); + if (useRealConstFold) { + // Since we are testing with constants, we should have at most one interval as long + // as we use real constant folding. + ASSERT_TRUE(IntervalReqExpr::getSingularDNF(*simplified)); + } else { + // If we didn't use the real constant folding function, we have to constant fold + // now, because our bounds will have If's. + constFoldBounds(*simplified); + } + ASSERT(compareIntervals(original, *simplified)); } else { IntervalInclusionTransport transport; ASSERT(transport.computeInclusion(original) == ExtendedBitset()); @@ -715,36 +757,38 @@ void testIntervalFuzz(const uint64_t seed, PseudoRandom& threadLocalRNG) { } EvalVariables varEval(std::move(varMap)); - // Create between one and five intervals. + // Create three intervals. + constexpr size_t numIntervals = 3; - // TODO SERVER-71656 we should be able to uncomment this after SERVER-71656 is complete. // Intersect with multiple intervals. - // const size_t numIntervals = 2 + threadLocalRNG.nextInt32(5); - // { - // IntervalReqExpr::NodeVector intervalVec; - // for (size_t i = 0; i < numIntervals; i++) { - // intervalVec.push_back(IntervalReqExpr::make( - // IntervalRequirement{makeRandomBound(), makeRandomBound()})); - // } - - // auto original = - // IntervalReqExpr::make(IntervalReqExpr::NodeVector{ - // IntervalReqExpr::make(std::move(intervalVec))}); - // auto simplified = intersectDNFIntervals(original, ConstEval::constFold); - - // varEval.replaceVarsInInterval(original); - // if (simplified) { - // varEval.replaceVarsInInterval(*simplified); - // compareIntervals(original, *simplified); - // } else { - // IntervalInclusionTransport transport; - // ASSERT(transport.computeInclusion(original) == ExtendedBitset()); - // } - // } + { + IntervalReqExpr::NodeVector intervalVec; + for (size_t i = 0; i < numIntervals; i++) { + intervalVec.push_back(IntervalReqExpr::make( + IntervalRequirement{makeRandomBound(threadLocalRNG, vars), + makeRandomBound(threadLocalRNG, vars)})); + } + + auto original = + IntervalReqExpr::make(IntervalReqExpr::NodeVector{ + IntervalReqExpr::make(std::move(intervalVec))}); + auto simplified = intersectDNFIntervals(original, ConstEval::constFold); + + varEval.replaceVarsInInterval(original); + if (simplified) { + varEval.replaceVarsInInterval(*simplified); + ASSERT(compareIntervals(original, *simplified)); + } else { + // 'simplified' false means the simplified interval is empty (always-false) and can't be + // represented as a BoolExpr, so check that 'original' returns false on every example. + IntervalInclusionTransport transport; + ASSERT(transport.computeInclusion(original) == ExtendedBitset()); + } + } // TODO SERVER-71175 should include a union test here - // TODO SERVER-71656 should extend this to have DNF intervals that are not purely disjunctions + // TODO SERVER-71175 should extend this to have DNF intervals that are not purely disjunctions // or purely conjunctions. A mix of ORs and ANDs seems necessary, to test that the two // simplification methods (intersecting and unioning) don't interfere with each other. } @@ -753,8 +797,12 @@ static constexpr int bitsetSize = 10; static const size_t numThreads = ProcessInfo::getNumCores(); TEST_F(IntervalIntersection, IntervalPermutations) { + // Number of permutations is bitsetSize^4 * 2^4 * 2 + // The first term is needed because we generate four bounds to intersect two intervals. The + // second term is for the inclusivity of the four bounds. The third term is to determine if we + // test with real constant folding or a no-op constant folding function. static constexpr int numPermutations = - bitsetSize * bitsetSize * bitsetSize * bitsetSize * 2 * 2 * 2 * 2; + (bitsetSize * bitsetSize * bitsetSize * bitsetSize) * (2 * 2 * 2 * 2) * 2; /** * Test for interval intersection. Generate intervals with constants in the * range of [0, N), with random inclusion/exclusion of the endpoints. Intersect the intervals @@ -816,5 +864,26 @@ TEST_F(IntervalIntersection, IntervalFuzz) { std::cout << "...done. Took: " << elapsedFuzz << " s.\n"; } +TEST(IntervalIntersection, IntersectionSpecialCase) { + auto original = IntervalReqExpr::make(IntervalReqExpr::NodeVector{ + IntervalReqExpr::make(IntervalReqExpr::NodeVector{ + IntervalReqExpr::make(IntervalRequirement{ + {true, make("var1")}, {true, make("var1")}}), + IntervalReqExpr::make(IntervalRequirement{ + {false, make("var2")}, {false, make("var3")}})})}); + + auto simplified = intersectDNFIntervals(original, ConstEval::constFold); + ASSERT(simplified); + + EvalVariables varEval({ + {"var1", Constant::int32(3)}, + {"var2", Constant::int32(0)}, + {"var3", Constant::int32(3)}, + }); + varEval.replaceVarsInInterval(original); + varEval.replaceVarsInInterval(*simplified); + ASSERT(compareIntervals(original, *simplified)); +} + } // namespace } // namespace mongo::optimizer diff --git a/src/mongo/db/query/optimizer/physical_rewriter_optimizer_test.cpp b/src/mongo/db/query/optimizer/physical_rewriter_optimizer_test.cpp index 9b48fd9d3ad..bff5d9fa2be 100644 --- a/src/mongo/db/query/optimizer/physical_rewriter_optimizer_test.cpp +++ b/src/mongo/db/query/optimizer/physical_rewriter_optimizer_test.cpp @@ -1702,17 +1702,22 @@ TEST(PhysRewriter, FilterIndexingVariable) { "| | BindBlock:\n" "| | [rid_0]\n" "| | Source []\n" - "| IndexScan [{'': rid_0}, scanDefName: c1, indexDefName: index1, interval: {[If [" - "] BinaryOp [Gte] FunctionCall [getQueryParam] Const [0] FunctionCall [getQueryParam] Con" - "st [1] Const [maxKey] FunctionCall [getQueryParam] Const [1], If [] BinaryOp [Gte] Funct" - "ionCall [getQueryParam] Const [1] FunctionCall [getQueryParam] Const [0] FunctionCall [g" - "etQueryParam] Const [1] FunctionCall [getQueryParam] Const [0]]}]\n" + "| IndexScan [{'': rid_0}, scanDefName: c1, indexDefName: index1, interval: {[If [] " + "BinaryOp [And] BinaryOp [And] BinaryOp [Or] BinaryOp [Or] BinaryOp [And] BinaryOp [Lt] " + "FunctionCall [getQueryParam] Const [0] FunctionCall [getQueryParam] Const [1] Const " + "[true] BinaryOp [And] BinaryOp [Lt] FunctionCall [getQueryParam] Const [0] Const [maxKey] " + "Const [true] BinaryOp [Or] BinaryOp [And] BinaryOp [Lt] FunctionCall [getQueryParam] " + "Const [1] FunctionCall [getQueryParam] Const [0] BinaryOp [Lt] FunctionCall " + "[getQueryParam] Const [0] Const [maxKey] Const [true] BinaryOp [Lt] FunctionCall " + "[getQueryParam] Const [0] Const [maxKey] BinaryOp [Gt] FunctionCall [getQueryParam] Const " + "[1] FunctionCall [getQueryParam] Const [0] FunctionCall [getQueryParam] Const [1] Const " + "[maxKey], FunctionCall [getQueryParam] Const [1]]}]\n" "| BindBlock:\n" "| [rid_0]\n" "| Source []\n" - "IndexScan [{'': rid_0}, scanDefName: c1, indexDefName: index1, interval: {>If [] Bi" - "naryOp [Gte] FunctionCall [getQueryParam] Const [1] FunctionCall [getQueryParam] Const [" - "0] FunctionCall [getQueryParam] Const [1] FunctionCall [getQueryParam] Const [0]}]\n" + "IndexScan [{'': rid_0}, scanDefName: c1, indexDefName: index1, interval: {>If [] " + "BinaryOp [Gte] FunctionCall [getQueryParam] Const [1] FunctionCall [getQueryParam] Const " + "[0] FunctionCall [getQueryParam] Const [1] FunctionCall [getQueryParam] Const [0]}]\n" " BindBlock:\n" " [rid_0]\n" " Source []\n", diff --git a/src/mongo/db/query/optimizer/utils/interval_utils.cpp b/src/mongo/db/query/optimizer/utils/interval_utils.cpp index a53a816b98a..996ae4c5f62 100644 --- a/src/mongo/db/query/optimizer/utils/interval_utils.cpp +++ b/src/mongo/db/query/optimizer/utils/interval_utils.cpp @@ -120,19 +120,17 @@ std::vector intersectIntervals(const IntervalRequirement& i constFold(expr); return expr; }; - const auto minMaxFn = [](const Operations op, const ABT& v1, const ABT& v2) { - // Encodes max(v1, v2). - return make(make(op, v1, v2), v1, v2); + const auto minFn = [](const ABT& v1, const ABT& v2) { + return make(make(Operations::Lte, v1, v2), v1, v2); }; - const auto minMaxFn1 = [](const Operations op, const ABT& v1, const ABT& v2, const ABT& v3) { - // Encodes v1 op v2 ? v3 : v2 - return make(make(op, v1, v2), v3, v2); + const auto maxFn = [](const ABT& v1, const ABT& v2) { + return make(make(Operations::Gte, v1, v2), v1, v2); }; // In the simplest case our bound is (max(low1, low2), min(high1, high2)) if none of the bounds // are inclusive. - const ABT maxLow = foldFn(minMaxFn(Operations::Gte, low1, low2)); - const ABT minHigh = foldFn(minMaxFn(Operations::Lte, high1, high2)); + const ABT maxLow = foldFn(maxFn(low1, low2)); + const ABT minHigh = foldFn(minFn(high1, high2)); if (foldFn(make(Operations::Gt, maxLow, minHigh)) == Constant::boolean(true)) { // Low bound is greater than high bound. return {}; @@ -145,15 +143,15 @@ std::vector intersectIntervals(const IntervalRequirement& i // We form a "main" result interval which is closed on any side with "agreement" between the two // intervals. For example [low1, high1] ^ [low2, high2) -> [max(low1, low2), min(high1, high2)) - BoundRequirement lowBoundMain(low1Inc && low2Inc, maxLow); - BoundRequirement highBoundMain(high1Inc && high2Inc, minHigh); + BoundRequirement lowBoundPrimary(low1Inc && low2Inc, maxLow); + BoundRequirement highBoundPrimary(high1Inc && high2Inc, minHigh); const bool boundsEqual = foldFn(make(Operations::Eq, maxLow, minHigh)) == Constant::boolean(true); if (boundsEqual) { if (low1Inc && high1Inc && low2Inc && high2Inc) { // Point interval. - return {{std::move(lowBoundMain), std::move(highBoundMain)}}; + return {{std::move(lowBoundPrimary), std::move(highBoundPrimary)}}; } if ((!low1Inc && !low2Inc) || (!high1Inc && !high2Inc)) { // Fully open on both sides. @@ -162,24 +160,23 @@ std::vector intersectIntervals(const IntervalRequirement& i } if (low1Inc == low2Inc && high1Inc == high2Inc) { // Inclusion matches on both sides. - return {{std::move(lowBoundMain), std::move(highBoundMain)}}; + return {{std::move(lowBoundPrimary), std::move(highBoundPrimary)}}; } // At this point we have intervals without inclusion agreement, for example - // [low1, high1) ^ (low2, high2]. We have the main result which in this case is the open + // [low1, high1) ^ (low2, high2]. We have the primary interval which in this case is the open // (max(low1, low2), min(high1, high2)). Then we add an extra closed interval for each side with - // disagreement. For example for the lower sides we add: [low2 >= low1 ? MaxKey : low1, - // min(max(low1, low2), min(high1, high2)] This is a closed interval which would reduce to - // [max(low1, low2), max(low1, low2)] if low2 < low1. If low2 >= low1 the interval reduces to an - // empty one [MaxKey, min(max(low1, low2), min(high1, high2)] which will return no results from - // an index scan. We do not know that in general if we do not have constants (we cannot fold). + // disagreement. For example for the lower sides we add: [indicator ? low1 : MaxKey, low1]. This + // is a closed interval which would reduce to [low1, low1] if low1 > low2 and the intervals + // intersect and are non-empty. If low2 >= low1 the interval reduces to an empty one, + // [MaxKey, low1], which will return no results from an index scan. We do not know that in + // general if we do not have constants (we cannot fold). // - // If we can fold the extra interval, we exploit the fact that (max(low1, low2), - // min(high1, high2)) U [max(low1, low2), max(low1, low2)] is [max(low1, low2), min(high1, - // high2)) (observe left side is now closed). Then we create a similar auxiliary interval for - // the right side if there is disagreement on the inclusion. Finally, we attempt to fold both - // intervals. Should we conclude definitively that they are point intervals, we update the - // inclusion of the main interval for the respective side. + // If we can fold the aux interval, we combine the aux interval into the primary one, which + // would yield [low1, min(high1, high2)) if we can prove that low1 > low2. Then we create a + // similar auxiliary interval for the right side if there is disagreement on the inclusion. + // We'll attempt to fold both intervals. Should we conclude definitively that they are + // point intervals, we update the inclusion of the main interval for the respective side. std::vector result; const auto addAuxInterval = [&](ABT low, ABT high, BoundRequirement& bound) { @@ -199,26 +196,94 @@ std::vector intersectIntervals(const IntervalRequirement& i } }; + /* + * An auxiliary interval should resolve to a non-empty interval if the original intervals we're + * intersecting overlap and produce something non-empty. Below we create an overlap indicator, + * which tells us if the intervals overlap. + * + * For intersection, the pair [1,2) and [2, 3] does not overlap, while [1,2] and [2, 3] does. So + * we need to adjust our comparisons depending on if the bounds are both inclusive or not. + */ + const Operations cmpLows = low1Inc && low2Inc ? Operations::Lte : Operations::Lt; + const Operations cmpLow1High2 = low1Inc && high2Inc ? Operations::Lte : Operations::Lt; + const Operations cmpLow2High1 = low2Inc && high1Inc ? Operations::Lte : Operations::Lt; + const Operations cmpHighs = high1Inc && high2Inc ? Operations::Lte : Operations::Lt; + /* + * Our final overlap indicator is as follows (using < or <= depending on inclusiveness) + * (low1,high1) ^ (low2,high2) overlap if: + * low2 < low1 < high2 || low2 < high1 < high2 || low1 < low2 < high1 || low1 < high2 < high1 + * As long as both intervals are non-empty. + * + * This covers the four cases: + * 1. int1 intersects int2 from below, ex: (1,3) ^ (2,4) + * 2. int1 intersects int2 from above, ex: (2,4) ^ (1,3) + * 3. int1 is a subset of int2, ex: (2,3) ^ (1,4) + * 4. int2 is a subset of int1, ex: (1,4) ^ (2,3) + */ + ABT int1NonEmpty = + make(low1Inc && high1Inc ? Operations::Lte : Operations::Lt, low1, high1); + ABT int2NonEmpty = + make(low2Inc && high2Inc ? Operations::Lte : Operations::Lt, low2, high2); + ABT overlapCondition = + make(Operations::Or, + make(Operations::Or, + make(Operations::And, + make(cmpLows, low2, low1), + make(cmpLow1High2, low1, high2)), + make(Operations::And, + make(cmpLow2High1, low2, high1), + make(cmpHighs, high1, high2))), + make(Operations::Or, + make(Operations::And, + make(cmpLows, low1, low2), + make(cmpLow2High1, low2, high1)), + make(Operations::And, + make(cmpLow1High2, low1, high2), + make(cmpHighs, high2, high1)))); + overlapCondition = make( + Operations::And, + std::move(overlapCondition), + make(Operations::And, std::move(int1NonEmpty), std::move(int2NonEmpty))); + + /* + * It's possible our aux indicators could be simplified. For example, a more concise indicator + * for [low1, high1] ^ (low2, high2] might be int1_nonempty && (int2 contains low1). This + * condition implies the intervals are non-empty and overlap, meaning the intersection is + * non-empty. It also implies that low1 > low2, meaning the inclusive bound wins. + */ if (low1Inc != low2Inc) { - const ABT low = foldFn(minMaxFn1( - Operations::Gte, low1Inc ? low2 : low1, low1Inc ? low1 : low2, Constant::maxKey())); - const ABT high = foldFn(minMaxFn(Operations::Lte, maxLow, minHigh)); - addAuxInterval(std::move(low), std::move(high), lowBoundMain); + ABT incBound = low1Inc ? low1 : low2; + ABT nonIncBound = low1Inc ? low2 : low1; + + // Our aux interval should be non-empty if overlap_indicator && (incBound > nonIncBound) + ABT auxCondition = + make(Operations::And, + overlapCondition, + make(Operations::Gt, incBound, std::move(nonIncBound))); + ABT low = foldFn(make(std::move(auxCondition), incBound, Constant::maxKey())); + ABT high = std::move(incBound); + addAuxInterval(std::move(low), std::move(high), lowBoundPrimary); } if (high1Inc != high2Inc) { - const ABT low = foldFn(minMaxFn(Operations::Gte, maxLow, minHigh)); - const ABT high = foldFn(minMaxFn1(Operations::Lte, - high1Inc ? high2 : high1, - high1Inc ? high1 : high2, - Constant::minKey())); - addAuxInterval(std::move(low), std::move(high), highBoundMain); + ABT incBound = high1Inc ? high1 : high2; + ABT nonIncBound = high1Inc ? high2 : high1; + + ABT low = incBound; + // Our aux interval should be non-empty if overlap_indicator && (incBound < nonIncBound) + ABT auxCondition = + make(Operations::And, + overlapCondition, + make(Operations::Lt, incBound, std::move(nonIncBound))); + ABT high = + foldFn(make(std::move(auxCondition), std::move(incBound), Constant::minKey())); + addAuxInterval(std::move(low), std::move(high), highBoundPrimary); } - if (!boundsEqual || (lowBoundMain.isInclusive() && highBoundMain.isInclusive())) { + if (!boundsEqual || (lowBoundPrimary.isInclusive() && highBoundPrimary.isInclusive())) { // We add the main interval to the result as long as it is a valid point interval, or the // bounds are not equal. - result.emplace_back(std::move(lowBoundMain), std::move(highBoundMain)); + result.emplace_back(std::move(lowBoundPrimary), std::move(highBoundPrimary)); } return result; } -- cgit v1.2.1