diff options
Diffstat (limited to 'lib/sqlalchemy/dialects/postgresql/ranges.py')
-rw-r--r-- | lib/sqlalchemy/dialects/postgresql/ranges.py | 497 |
1 files changed, 403 insertions, 94 deletions
diff --git a/lib/sqlalchemy/dialects/postgresql/ranges.py b/lib/sqlalchemy/dialects/postgresql/ranges.py index 6729f3785..a4c39d063 100644 --- a/lib/sqlalchemy/dialects/postgresql/ranges.py +++ b/lib/sqlalchemy/dialects/postgresql/ranges.py @@ -134,6 +134,119 @@ class Range(Generic[_T]): else: return None + def _compare_edges( + self, + value1: Optional[_T], + bound1: str, + value2: Optional[_T], + bound2: str, + only_values: bool = False, + ) -> int: + """Compare two range bounds. + + Return -1, 0 or 1 respectively when `value1` is less than, + equal to or greater than `value2`. + + When `only_value` is ``True``, do not consider the *inclusivity* + of the edges, just their values. + """ + + value1_is_lower_bound = bound1 in {"[", "("} + value2_is_lower_bound = bound2 in {"[", "("} + + # Infinite edges are equal when they are on the same side, + # otherwise a lower edge is considered less than the upper end + if value1 is value2 is None: + if value1_is_lower_bound == value2_is_lower_bound: + return 0 + else: + return -1 if value1_is_lower_bound else 1 + elif value1 is None: + return -1 if value1_is_lower_bound else 1 + elif value2 is None: + return 1 if value2_is_lower_bound else -1 + + # Short path for trivial case + if bound1 == bound2 and value1 == value2: + return 0 + + value1_inc = bound1 in {"[", "]"} + value2_inc = bound2 in {"[", "]"} + step = self._get_discrete_step() + + if step is not None: + # "Normalize" the two edges as '[)', to simplify successive + # logic when the range is discrete: otherwise we would need + # to handle the comparison between ``(0`` and ``[1`` that + # are equal when dealing with integers while for floats the + # former is lesser than the latter + + if value1_is_lower_bound: + if not value1_inc: + value1 += step + value1_inc = True + else: + if value1_inc: + value1 += step + value1_inc = False + if value2_is_lower_bound: + if not value2_inc: + value2 += step + value2_inc = True + else: + if value2_inc: + value2 += step + value2_inc = False + + if value1 < value2: + return -1 + elif value1 > value2: + return 1 + elif only_values: + return 0 + else: + # Neither one is infinite but are equal, so we + # need to consider the respective inclusive/exclusive + # flag + + if value1_inc and value2_inc: + return 0 + elif not value1_inc and not value2_inc: + if value1_is_lower_bound == value2_is_lower_bound: + return 0 + else: + return 1 if value1_is_lower_bound else -1 + elif not value1_inc: + return 1 if value1_is_lower_bound else -1 + elif not value2_inc: + return -1 if value2_is_lower_bound else 1 + else: + return 0 + + def __eq__(self, other: Range) -> bool: + """Compare this range to the `other` taking into account + bounds inclusivity, returning ``True`` if they are equal. + """ + + if self.empty and other.empty: + return True + elif self.empty != other.empty: + return False + + slower = self.lower + slower_b = self.bounds[0] + olower = other.lower + olower_b = other.bounds[0] + supper = self.upper + supper_b = self.bounds[1] + oupper = other.upper + oupper_b = other.bounds[1] + + return ( + self._compare_edges(slower, slower_b, olower, olower_b) == 0 + and self._compare_edges(supper, supper_b, oupper, oupper_b) == 0 + ) + def contained_by(self, other: Range) -> bool: "Determine whether this range is a contained by `other`." @@ -145,72 +258,23 @@ class Range(Generic[_T]): if other.empty: return False + slower = self.lower + slower_b = self.bounds[0] olower = other.lower - oupper = other.upper + olower_b = other.bounds[0] - # A bilateral unbound range contains any other range - if olower is oupper is None: - return True + if self._compare_edges(slower, slower_b, olower, olower_b) < 0: + return False - slower = self.lower supper = self.upper + supper_b = self.bounds[1] + oupper = other.upper + oupper_b = other.bounds[1] - # A lower-bound range cannot contain a lower-unbound range - if slower is None and olower is not None: - return False - - # Likewise on the right side - if supper is None and oupper is not None: + if self._compare_edges(supper, supper_b, oupper, oupper_b) > 0: return False - slower_inc = self.bounds[0] == "[" - supper_inc = self.bounds[1] == "]" - olower_inc = other.bounds[0] == "[" - oupper_inc = other.bounds[1] == "]" - - # Check the lower end - step = -1 - if slower is not None and olower is not None: - lside = olower < slower - if not lside: - if not slower_inc or olower_inc: - lside = olower == slower - if not lside: - # Cover (1,x] vs [2,x) and (0,x] vs [1,x) - if not slower_inc and olower_inc and slower < olower: - step = self._get_discrete_step() - if step is not None: - lside = olower == (slower + step) - elif slower_inc and not olower_inc and slower > olower: - step = self._get_discrete_step() - if step is not None: - lside = (olower + step) == slower - if not lside: - return False - - # Lower end already considered, an upper-unbound range surely contains - # this - if oupper is None: - return True - - # Check the upper end - uside = oupper > supper - if not uside: - if not supper_inc or oupper_inc: - uside = oupper == supper - if not uside: - # Cover (x,2] vs [x,3) and (x,1] vs [x,2) - if supper_inc and not oupper_inc and supper < oupper: - if step == -1: - step = self._get_discrete_step() - if step is not None: - uside = oupper == (supper + step) - elif not supper_inc and oupper_inc and supper > oupper: - if step == -1: - step = self._get_discrete_step() - if step is not None: - uside = (oupper + step) == supper - return uside + return True def contains(self, value: Union[_T, Range]) -> bool: "Determine whether this range contains `value`." @@ -220,52 +284,286 @@ class Range(Generic[_T]): else: return self._contains_value(value) - def overlaps(self, other): - """Boolean expression. Returns true if the column overlaps - (has points in common with) the right hand operand. - """ - raise NotImplementedError("not yet implemented") + def overlaps(self, other: Range) -> bool: + "Determine whether this range overlaps with `other`." - def strictly_left_of(self, other): - """Boolean expression. Returns true if the column is strictly - left of the right hand operand. - """ - raise NotImplementedError("not yet implemented") + # Empty ranges never overlap with any other range + if self.empty or other.empty: + return False + + slower = self.lower + slower_b = self.bounds[0] + supper = self.upper + supper_b = self.bounds[1] + olower = other.lower + olower_b = other.bounds[0] + oupper = other.upper + oupper_b = other.bounds[1] + + # Check whether this lower bound is contained in the other range + if ( + self._compare_edges(slower, slower_b, olower, olower_b) >= 0 + and self._compare_edges(slower, slower_b, oupper, oupper_b) <= 0 + ): + return True + + # Check whether other lower bound is contained in this range + if ( + self._compare_edges(olower, olower_b, slower, slower_b) >= 0 + and self._compare_edges(olower, olower_b, supper, supper_b) <= 0 + ): + return True + + return False + + def strictly_left_of(self, other: Range) -> bool: + "Determine whether this range is completely to the left of `other`." + + # Empty ranges are neither to left nor to the right of any other range + if self.empty or other.empty: + return False + + supper = self.upper + supper_b = self.bounds[1] + olower = other.lower + olower_b = other.bounds[0] + + # Check whether this upper edge is less than other's lower end + return self._compare_edges(supper, supper_b, olower, olower_b) < 0 __lshift__ = strictly_left_of - def strictly_right_of(self, other): - """Boolean expression. Returns true if the column is strictly - right of the right hand operand. - """ - raise NotImplementedError("not yet implemented") + def strictly_right_of(self, other: Range) -> bool: + "Determine whether this range is completely to the right of `other`." + + # Empty ranges are neither to left nor to the right of any other range + if self.empty or other.empty: + return False + + slower = self.lower + slower_b = self.bounds[0] + oupper = other.upper + oupper_b = other.bounds[1] + + # Check whether this lower edge is greater than other's upper end + return self._compare_edges(slower, slower_b, oupper, oupper_b) > 0 __rshift__ = strictly_right_of - def not_extend_right_of(self, other): - """Boolean expression. Returns true if the range in the column - does not extend right of the range in the operand. - """ - raise NotImplementedError("not yet implemented") + def not_extend_left_of(self, other: Range) -> bool: + "Determine whether this does not extend to the left of `other`." - def not_extend_left_of(self, other): - """Boolean expression. Returns true if the range in the column - does not extend left of the range in the operand. - """ - raise NotImplementedError("not yet implemented") + # Empty ranges are neither to left nor to the right of any other range + if self.empty or other.empty: + return False + + slower = self.lower + slower_b = self.bounds[0] + olower = other.lower + olower_b = other.bounds[0] + + # Check whether this lower edge is not less than other's lower end + return self._compare_edges(slower, slower_b, olower, olower_b) >= 0 + + def not_extend_right_of(self, other: Range) -> bool: + "Determine whether this does not extend to the right of `other`." + + # Empty ranges are neither to left nor to the right of any other range + if self.empty or other.empty: + return False - def adjacent_to(self, other): - """Boolean expression. Returns true if the range in the column - is adjacent to the range in the operand. + supper = self.upper + supper_b = self.bounds[1] + oupper = other.upper + oupper_b = other.bounds[1] + + # Check whether this upper edge is not greater than other's upper end + return self._compare_edges(supper, supper_b, oupper, oupper_b) <= 0 + + def _upper_edge_adjacent_to_lower( + self, + value1: Optional[_T], + bound1: str, + value2: Optional[_T], + bound2: str, + ) -> bool: + """Determine whether an upper bound is immediately successive to a + lower bound.""" + + # Since we need a peculiar way to handle the bounds inclusivity, + # just do a comparison by value here + res = self._compare_edges(value1, bound1, value2, bound2, True) + if res == -1: + step = self._get_discrete_step() + if step is None: + return False + if bound1 == "]": + if bound2 == "[": + return value1 == value2 - step + else: + return value1 == value2 + else: + if bound2 == "[": + return value1 == value2 + else: + return value1 == value2 - step + elif res == 0: + # Cover cases like [0,0] -|- [1,] and [0,2) -|- (1,3] + if ( + bound1 == "]" + and bound2 == "[" + or bound1 == ")" + and bound2 == "(" + ): + step = self._get_discrete_step() + if step is not None: + return True + return ( + bound1 == ")" + and bound2 == "[" + or bound1 == "]" + and bound2 == "(" + ) + else: + return False + + def adjacent_to(self, other: Range) -> bool: + "Determine whether this range is adjacent to the `other`." + + # Empty ranges are not adjacent to any other range + if self.empty or other.empty: + return False + + slower = self.lower + slower_b = self.bounds[0] + supper = self.upper + supper_b = self.bounds[1] + olower = other.lower + olower_b = other.bounds[0] + oupper = other.upper + oupper_b = other.bounds[1] + + return self._upper_edge_adjacent_to_lower( + supper, supper_b, olower, olower_b + ) or self._upper_edge_adjacent_to_lower( + oupper, oupper_b, slower, slower_b + ) + + def union(self, other: Range) -> Range: + """Compute the union of this range with the `other`. + + This raises a ``ValueError`` exception if the two ranges are + "disjunct", that is neither adjacent nor overlapping. """ - raise NotImplementedError("not yet implemented") - def __add__(self, other): - """Range expression. Returns the union of the two ranges. - Will raise an exception if the resulting range is not - contiguous. + # Empty ranges are "additive identities" + if self.empty: + return other + if other.empty: + return self + + if not self.overlaps(other) and not self.adjacent_to(other): + raise ValueError( + "Adding non-overlapping and non-adjacent" + " ranges is not implemented" + ) + + slower = self.lower + slower_b = self.bounds[0] + supper = self.upper + supper_b = self.bounds[1] + olower = other.lower + olower_b = other.bounds[0] + oupper = other.upper + oupper_b = other.bounds[1] + + if self._compare_edges(slower, slower_b, olower, olower_b) < 0: + rlower = slower + rlower_b = slower_b + else: + rlower = olower + rlower_b = olower_b + + if self._compare_edges(supper, supper_b, oupper, oupper_b) > 0: + rupper = supper + rupper_b = supper_b + else: + rupper = oupper + rupper_b = oupper_b + + return Range(rlower, rupper, bounds=rlower_b + rupper_b) + + __add__ = union + + def difference(self, other: Range) -> Range: + """Compute the difference between this range and the `other`. + + This raises a ``ValueError`` exception if the two ranges are + "disjunct", that is neither adjacent nor overlapping. """ - raise NotImplementedError("not yet implemented") + + # Subtracting an empty range is a no-op + if self.empty or other.empty: + return self + + slower = self.lower + slower_b = self.bounds[0] + supper = self.upper + supper_b = self.bounds[1] + olower = other.lower + olower_b = other.bounds[0] + oupper = other.upper + oupper_b = other.bounds[1] + + sl_vs_ol = self._compare_edges(slower, slower_b, olower, olower_b) + su_vs_ou = self._compare_edges(supper, supper_b, oupper, oupper_b) + if sl_vs_ol < 0 and su_vs_ou > 0: + raise ValueError( + "Subtracting a strictly inner range is not implemented" + ) + + sl_vs_ou = self._compare_edges(slower, slower_b, oupper, oupper_b) + su_vs_ol = self._compare_edges(supper, supper_b, olower, olower_b) + + # If the ranges do not overlap, result is simply the first + if sl_vs_ou > 0 or su_vs_ol < 0: + return self + + # If this range is completely contained by the other, result is empty + if sl_vs_ol >= 0 and su_vs_ou <= 0: + return Range(None, None, empty=True) + + # If this range extends to the left of the other and ends in its + # middle + if sl_vs_ol <= 0 and su_vs_ol >= 0 and su_vs_ou <= 0: + rupper_b = ")" if olower_b == "[" else "]" + if ( + slower_b != "[" + and rupper_b != "]" + and self._compare_edges(slower, slower_b, olower, rupper_b) + == 0 + ): + return Range(None, None, empty=True) + else: + return Range(slower, olower, bounds=slower_b + rupper_b) + + # If this range starts in the middle of the other and extends to its + # right + if sl_vs_ol >= 0 and su_vs_ou >= 0 and sl_vs_ou <= 0: + rlower_b = "(" if oupper_b == "]" else "[" + if ( + rlower_b != "[" + and supper_b != "]" + and self._compare_edges(oupper, rlower_b, supper, supper_b) + == 0 + ): + return Range(None, None, empty=True) + else: + return Range(oupper, supper, bounds=rlower_b + supper_b) + + assert False, f"Unhandled case computing {self} - {other}" + + __sub__ = difference def __str__(self): return self._stringify() @@ -390,13 +688,24 @@ class AbstractRange(sqltypes.TypeEngine): """ return self.expr.op("-|-", is_comparison=True)(other) - def __add__(self, other): + def union(self, other): """Range expression. Returns the union of the two ranges. Will raise an exception if the resulting range is not contiguous. """ return self.expr.op("+")(other) + __add__ = union + + def difference(self, other): + """Range expression. Returns the union of the two ranges. + Will raise an exception if the resulting range is not + contiguous. + """ + return self.expr.op("-")(other) + + __sub__ = difference + class AbstractRangeImpl(AbstractRange): """marker for AbstractRange that will apply a subclass-specific |