summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/sql/default_comparator.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/sql/default_comparator.py')
-rw-r--r--lib/sqlalchemy/sql/default_comparator.py279
1 files changed, 190 insertions, 89 deletions
diff --git a/lib/sqlalchemy/sql/default_comparator.py b/lib/sqlalchemy/sql/default_comparator.py
index 55a586285..1759e686e 100644
--- a/lib/sqlalchemy/sql/default_comparator.py
+++ b/lib/sqlalchemy/sql/default_comparator.py
@@ -8,6 +8,15 @@
"""Default implementation of SQL comparison operations.
"""
+import typing
+from typing import Any
+from typing import Callable
+from typing import Dict
+from typing import NoReturn
+from typing import Optional
+from typing import Tuple
+from typing import Type
+from typing import Union
from . import coercions
from . import operators
@@ -16,28 +25,38 @@ from . import type_api
from .elements import and_
from .elements import BinaryExpression
from .elements import ClauseList
-from .elements import collate
+from .elements import CollationClause
from .elements import CollectionAggregate
from .elements import False_
from .elements import Null
from .elements import or_
from .elements import True_
from .elements import UnaryExpression
+from .operators import OperatorType
from .. import exc
from .. import util
+_T = typing.TypeVar("_T", bound=Any)
+
+if typing.TYPE_CHECKING:
+ from .elements import ColumnElement
+ from .sqltypes import TypeEngine
+
def _boolean_compare(
- expr,
- op,
- obj,
- negate=None,
- reverse=False,
+ expr: "ColumnElement",
+ op: OperatorType,
+ obj: roles.BinaryElementRole,
+ *,
+ negate_op: Optional[OperatorType] = None,
+ reverse: bool = False,
_python_is_types=(util.NoneType, bool),
_any_all_expr=False,
- result_type=None,
- **kwargs,
-):
+ result_type: Optional[
+ Union[Type["TypeEngine[bool]"], "TypeEngine[bool]"]
+ ] = None,
+ **kwargs: Any,
+) -> BinaryExpression[bool]:
if result_type is None:
result_type = type_api.BOOLEANTYPE
@@ -54,7 +73,7 @@ def _boolean_compare(
coercions.expect(roles.ConstExprRole, obj),
op,
type_=result_type,
- negate=negate,
+ negate=negate_op,
modifiers=kwargs,
)
elif op in (
@@ -66,7 +85,7 @@ def _boolean_compare(
coercions.expect(roles.ConstExprRole, obj),
op,
type_=result_type,
- negate=negate,
+ negate=negate_op,
modifiers=kwargs,
)
elif _any_all_expr:
@@ -104,11 +123,21 @@ def _boolean_compare(
if reverse:
return BinaryExpression(
- obj, expr, op, type_=result_type, negate=negate, modifiers=kwargs
+ obj,
+ expr,
+ op,
+ type_=result_type,
+ negate=negate_op,
+ modifiers=kwargs,
)
else:
return BinaryExpression(
- expr, obj, op, type_=result_type, negate=negate, modifiers=kwargs
+ expr,
+ obj,
+ op,
+ type_=result_type,
+ negate=negate_op,
+ modifiers=kwargs,
)
@@ -124,15 +153,26 @@ def _custom_op_operate(expr, op, obj, reverse=False, result_type=None, **kw):
)
-def _binary_operate(expr, op, obj, reverse=False, result_type=None, **kw):
- obj = coercions.expect(
+def _binary_operate(
+ expr: "ColumnElement",
+ op: OperatorType,
+ obj: roles.BinaryElementRole,
+ *,
+ reverse=False,
+ result_type: Optional[
+ Union[Type["TypeEngine[_T]"], "TypeEngine[_T]"]
+ ] = None,
+ **kw: Any,
+) -> BinaryExpression[_T]:
+
+ coerced_obj = coercions.expect(
roles.BinaryElementRole, obj, expr=expr, operator=op
)
if reverse:
- left, right = obj, expr
+ left, right = coerced_obj, expr
else:
- left, right = expr, obj
+ left, right = expr, coerced_obj
if result_type is None:
op, result_type = left.comparator._adapt_expression(
@@ -142,7 +182,7 @@ def _binary_operate(expr, op, obj, reverse=False, result_type=None, **kw):
return BinaryExpression(left, right, op, type_=result_type, modifiers=kw)
-def _conjunction_operate(expr, op, other, **kw):
+def _conjunction_operate(expr, op, other, **kw) -> "ColumnElement":
if op is operators.and_:
return and_(expr, other)
elif op is operators.or_:
@@ -151,11 +191,11 @@ def _conjunction_operate(expr, op, other, **kw):
raise NotImplementedError()
-def _scalar(expr, op, fn, **kw):
+def _scalar(expr, op, fn, **kw) -> "ColumnElement":
return fn(expr)
-def _in_impl(expr, op, seq_or_selectable, negate_op, **kw):
+def _in_impl(expr, op, seq_or_selectable, negate_op, **kw) -> "ColumnElement":
seq_or_selectable = coercions.expect(
roles.InElementRole, seq_or_selectable, expr=expr, operator=op
)
@@ -163,11 +203,11 @@ def _in_impl(expr, op, seq_or_selectable, negate_op, **kw):
op, negate_op = seq_or_selectable._annotations["in_ops"]
return _boolean_compare(
- expr, op, seq_or_selectable, negate=negate_op, **kw
+ expr, op, seq_or_selectable, negate_op=negate_op, **kw
)
-def _getitem_impl(expr, op, other, **kw):
+def _getitem_impl(expr, op, other, **kw) -> "ColumnElement":
if isinstance(expr.type, type_api.INDEXABLE):
other = coercions.expect(
roles.BinaryElementRole, other, expr=expr, operator=op
@@ -177,13 +217,13 @@ def _getitem_impl(expr, op, other, **kw):
_unsupported_impl(expr, op, other, **kw)
-def _unsupported_impl(expr, op, *arg, **kw):
+def _unsupported_impl(expr, op, *arg, **kw) -> NoReturn:
raise NotImplementedError(
"Operator '%s' is not supported on " "this expression" % op.__name__
)
-def _inv_impl(expr, op, **kw):
+def _inv_impl(expr, op, **kw) -> "ColumnElement":
"""See :meth:`.ColumnOperators.__inv__`."""
# undocumented element currently used by the ORM for
@@ -194,12 +234,12 @@ def _inv_impl(expr, op, **kw):
return expr._negate()
-def _neg_impl(expr, op, **kw):
+def _neg_impl(expr, op, **kw) -> "ColumnElement":
"""See :meth:`.ColumnOperators.__neg__`."""
return UnaryExpression(expr, operator=operators.neg, type_=expr.type)
-def _match_impl(expr, op, other, **kw):
+def _match_impl(expr, op, other, **kw) -> "ColumnElement":
"""See :meth:`.ColumnOperators.match`."""
return _boolean_compare(
@@ -212,21 +252,21 @@ def _match_impl(expr, op, other, **kw):
operator=operators.match_op,
),
result_type=type_api.MATCHTYPE,
- negate=operators.not_match_op
+ negate_op=operators.not_match_op
if op is operators.match_op
else operators.match_op,
**kw,
)
-def _distinct_impl(expr, op, **kw):
+def _distinct_impl(expr, op, **kw) -> "ColumnElement":
"""See :meth:`.ColumnOperators.distinct`."""
return UnaryExpression(
expr, operator=operators.distinct_op, type_=expr.type
)
-def _between_impl(expr, op, cleft, cright, **kw):
+def _between_impl(expr, op, cleft, cright, **kw) -> "ColumnElement":
"""See :meth:`.ColumnOperators.between`."""
return BinaryExpression(
expr,
@@ -255,11 +295,11 @@ def _between_impl(expr, op, cleft, cright, **kw):
)
-def _collate_impl(expr, op, other, **kw):
- return collate(expr, other)
+def _collate_impl(expr, op, collation, **kw) -> "ColumnElement":
+ return CollationClause._create_collation_expression(expr, collation)
-def _regexp_match_impl(expr, op, pattern, flags, **kw):
+def _regexp_match_impl(expr, op, pattern, flags, **kw) -> "ColumnElement":
if flags is not None:
flags = coercions.expect(
roles.BinaryElementRole,
@@ -272,14 +312,16 @@ def _regexp_match_impl(expr, op, pattern, flags, **kw):
op,
pattern,
flags=flags,
- negate=operators.not_regexp_match_op
+ negate_op=operators.not_regexp_match_op
if op is operators.regexp_match_op
else operators.regexp_match_op,
**kw,
)
-def _regexp_replace_impl(expr, op, pattern, replacement, flags, **kw):
+def _regexp_replace_impl(
+ expr, op, pattern, replacement, flags, **kw
+) -> "ColumnElement":
replacement = coercions.expect(
roles.BinaryElementRole,
replacement,
@@ -299,59 +341,118 @@ def _regexp_replace_impl(expr, op, pattern, replacement, flags, **kw):
# a mapping of operators with the method they use, along with
-# their negated operator for comparison operators
-operator_lookup = {
- "and_": (_conjunction_operate,),
- "or_": (_conjunction_operate,),
- "inv": (_inv_impl,),
- "add": (_binary_operate,),
- "mul": (_binary_operate,),
- "sub": (_binary_operate,),
- "div": (_binary_operate,),
- "mod": (_binary_operate,),
- "truediv": (_binary_operate,),
- "floordiv": (_binary_operate,),
- "custom_op": (_custom_op_operate,),
- "json_path_getitem_op": (_binary_operate,),
- "json_getitem_op": (_binary_operate,),
- "concat_op": (_binary_operate,),
- "any_op": (_scalar, CollectionAggregate._create_any),
- "all_op": (_scalar, CollectionAggregate._create_all),
- "lt": (_boolean_compare, operators.ge),
- "le": (_boolean_compare, operators.gt),
- "ne": (_boolean_compare, operators.eq),
- "gt": (_boolean_compare, operators.le),
- "ge": (_boolean_compare, operators.lt),
- "eq": (_boolean_compare, operators.ne),
- "is_distinct_from": (_boolean_compare, operators.is_not_distinct_from),
- "is_not_distinct_from": (_boolean_compare, operators.is_distinct_from),
- "like_op": (_boolean_compare, operators.not_like_op),
- "ilike_op": (_boolean_compare, operators.not_ilike_op),
- "not_like_op": (_boolean_compare, operators.like_op),
- "not_ilike_op": (_boolean_compare, operators.ilike_op),
- "contains_op": (_boolean_compare, operators.not_contains_op),
- "startswith_op": (_boolean_compare, operators.not_startswith_op),
- "endswith_op": (_boolean_compare, operators.not_endswith_op),
- "desc_op": (_scalar, UnaryExpression._create_desc),
- "asc_op": (_scalar, UnaryExpression._create_asc),
- "nulls_first_op": (_scalar, UnaryExpression._create_nulls_first),
- "nulls_last_op": (_scalar, UnaryExpression._create_nulls_last),
- "in_op": (_in_impl, operators.not_in_op),
- "not_in_op": (_in_impl, operators.in_op),
- "is_": (_boolean_compare, operators.is_),
- "is_not": (_boolean_compare, operators.is_not),
- "collate": (_collate_impl,),
- "match_op": (_match_impl,),
- "not_match_op": (_match_impl,),
- "distinct_op": (_distinct_impl,),
- "between_op": (_between_impl,),
- "not_between_op": (_between_impl,),
- "neg": (_neg_impl,),
- "getitem": (_getitem_impl,),
- "lshift": (_unsupported_impl,),
- "rshift": (_unsupported_impl,),
- "contains": (_unsupported_impl,),
- "regexp_match_op": (_regexp_match_impl,),
- "not_regexp_match_op": (_regexp_match_impl,),
- "regexp_replace_op": (_regexp_replace_impl,),
+# additional keyword arguments to be passed
+operator_lookup: Dict[
+ str, Tuple[Callable[..., "ColumnElement"], util.immutabledict]
+] = {
+ "and_": (_conjunction_operate, util.EMPTY_DICT),
+ "or_": (_conjunction_operate, util.EMPTY_DICT),
+ "inv": (_inv_impl, util.EMPTY_DICT),
+ "add": (_binary_operate, util.EMPTY_DICT),
+ "mul": (_binary_operate, util.EMPTY_DICT),
+ "sub": (_binary_operate, util.EMPTY_DICT),
+ "div": (_binary_operate, util.EMPTY_DICT),
+ "mod": (_binary_operate, util.EMPTY_DICT),
+ "truediv": (_binary_operate, util.EMPTY_DICT),
+ "floordiv": (_binary_operate, util.EMPTY_DICT),
+ "custom_op": (_custom_op_operate, util.EMPTY_DICT),
+ "json_path_getitem_op": (_binary_operate, util.EMPTY_DICT),
+ "json_getitem_op": (_binary_operate, util.EMPTY_DICT),
+ "concat_op": (_binary_operate, util.EMPTY_DICT),
+ "any_op": (
+ _scalar,
+ util.immutabledict({"fn": CollectionAggregate._create_any}),
+ ),
+ "all_op": (
+ _scalar,
+ util.immutabledict({"fn": CollectionAggregate._create_all}),
+ ),
+ "lt": (_boolean_compare, util.immutabledict({"negate_op": operators.ge})),
+ "le": (_boolean_compare, util.immutabledict({"negate_op": operators.gt})),
+ "ne": (_boolean_compare, util.immutabledict({"negate_op": operators.eq})),
+ "gt": (_boolean_compare, util.immutabledict({"negate_op": operators.le})),
+ "ge": (_boolean_compare, util.immutabledict({"negate_op": operators.lt})),
+ "eq": (_boolean_compare, util.immutabledict({"negate_op": operators.ne})),
+ "is_distinct_from": (
+ _boolean_compare,
+ util.immutabledict({"negate_op": operators.is_not_distinct_from}),
+ ),
+ "is_not_distinct_from": (
+ _boolean_compare,
+ util.immutabledict({"negate_op": operators.is_distinct_from}),
+ ),
+ "like_op": (
+ _boolean_compare,
+ util.immutabledict({"negate_op": operators.not_like_op}),
+ ),
+ "ilike_op": (
+ _boolean_compare,
+ util.immutabledict({"negate_op": operators.not_ilike_op}),
+ ),
+ "not_like_op": (
+ _boolean_compare,
+ util.immutabledict({"negate_op": operators.like_op}),
+ ),
+ "not_ilike_op": (
+ _boolean_compare,
+ util.immutabledict({"negate_op": operators.ilike_op}),
+ ),
+ "contains_op": (
+ _boolean_compare,
+ util.immutabledict({"negate_op": operators.not_contains_op}),
+ ),
+ "startswith_op": (
+ _boolean_compare,
+ util.immutabledict({"negate_op": operators.not_startswith_op}),
+ ),
+ "endswith_op": (
+ _boolean_compare,
+ util.immutabledict({"negate_op": operators.not_endswith_op}),
+ ),
+ "desc_op": (
+ _scalar,
+ util.immutabledict({"fn": UnaryExpression._create_desc}),
+ ),
+ "asc_op": (
+ _scalar,
+ util.immutabledict({"fn": UnaryExpression._create_asc}),
+ ),
+ "nulls_first_op": (
+ _scalar,
+ util.immutabledict({"fn": UnaryExpression._create_nulls_first}),
+ ),
+ "nulls_last_op": (
+ _scalar,
+ util.immutabledict({"fn": UnaryExpression._create_nulls_last}),
+ ),
+ "in_op": (
+ _in_impl,
+ util.immutabledict({"negate_op": operators.not_in_op}),
+ ),
+ "not_in_op": (
+ _in_impl,
+ util.immutabledict({"negate_op": operators.in_op}),
+ ),
+ "is_": (
+ _boolean_compare,
+ util.immutabledict({"negate_op": operators.is_}),
+ ),
+ "is_not": (
+ _boolean_compare,
+ util.immutabledict({"negate_op": operators.is_not}),
+ ),
+ "collate": (_collate_impl, util.EMPTY_DICT),
+ "match_op": (_match_impl, util.EMPTY_DICT),
+ "not_match_op": (_match_impl, util.EMPTY_DICT),
+ "distinct_op": (_distinct_impl, util.EMPTY_DICT),
+ "between_op": (_between_impl, util.EMPTY_DICT),
+ "not_between_op": (_between_impl, util.EMPTY_DICT),
+ "neg": (_neg_impl, util.EMPTY_DICT),
+ "getitem": (_getitem_impl, util.EMPTY_DICT),
+ "lshift": (_unsupported_impl, util.EMPTY_DICT),
+ "rshift": (_unsupported_impl, util.EMPTY_DICT),
+ "contains": (_unsupported_impl, util.EMPTY_DICT),
+ "regexp_match_op": (_regexp_match_impl, util.EMPTY_DICT),
+ "not_regexp_match_op": (_regexp_match_impl, util.EMPTY_DICT),
+ "regexp_replace_op": (_regexp_replace_impl, util.EMPTY_DICT),
}