diff options
author | mike bayer <mike_mp@zzzcomputing.com> | 2022-02-13 20:37:12 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@ci3.zzzcomputing.com> | 2022-02-13 20:37:12 +0000 |
commit | d6b3c82b0c329730bcaff42b4bb39dba83acb536 (patch) | |
tree | d6b7f744a35c8d89615eeb0504ee7a4193f95642 /lib/sqlalchemy/orm/util.py | |
parent | 260ade78a70d51378de9e7b9456bfe6218859b6c (diff) | |
parent | e545298e35ea9f126054b337e4b5ba01988b29f7 (diff) | |
download | sqlalchemy-d6b3c82b0c329730bcaff42b4bb39dba83acb536.tar.gz |
Merge "establish mypy / typing approach for v2.0" into main
Diffstat (limited to 'lib/sqlalchemy/orm/util.py')
-rw-r--r-- | lib/sqlalchemy/orm/util.py | 466 |
1 files changed, 170 insertions, 296 deletions
diff --git a/lib/sqlalchemy/orm/util.py b/lib/sqlalchemy/orm/util.py index 75f711007..45c578355 100644 --- a/lib/sqlalchemy/orm/util.py +++ b/lib/sqlalchemy/orm/util.py @@ -5,13 +5,22 @@ # This module is part of SQLAlchemy and is released under # the MIT License: https://www.opensource.org/licenses/mit-license.php - import re import types +import typing +from typing import Any +from typing import Generic +from typing import Optional +from typing import overload +from typing import Tuple +from typing import Type +from typing import TypeVar +from typing import Union import weakref from . import attributes # noqa from .base import _class_to_mapper # noqa +from .base import _IdentityKeyType from .base import _never_set # noqa from .base import _none_set # noqa from .base import attribute_str # noqa @@ -45,8 +54,17 @@ from ..sql import util as sql_util from ..sql import visitors from ..sql.annotation import SupportsCloneAnnotations from ..sql.base import ColumnCollection +from ..sql.selectable import FromClause from ..util.langhelpers import MemoizedSlots +from ..util.typing import de_stringify_annotation +from ..util.typing import is_origin_of + +if typing.TYPE_CHECKING: + from .mapper import Mapper + from ..engine import Row + from ..sql.selectable import Alias +_T = TypeVar("_T", bound=Any) all_cascades = frozenset( ( @@ -276,7 +294,28 @@ def polymorphic_union( return sql.union_all(*result).alias(aliasname) -def identity_key(*args, **kwargs): +@overload +def identity_key( + class_: type, ident: Tuple[Any, ...], *, identity_token: Optional[str] +) -> _IdentityKeyType: + ... + + +@overload +def identity_key(*, instance: Any) -> _IdentityKeyType: + ... + + +@overload +def identity_key( + class_: type, *, row: "Row", identity_token: Optional[str] +) -> _IdentityKeyType: + ... + + +def identity_key( + class_=None, ident=None, *, instance=None, row=None, identity_token=None +) -> _IdentityKeyType: r"""Generate "identity key" tuples, as are used as keys in the :attr:`.Session.identity_map` dictionary. @@ -340,29 +379,11 @@ def identity_key(*args, **kwargs): .. versionadded:: 1.2 added identity_token """ - if args: - row = None - largs = len(args) - if largs == 1: - class_ = args[0] - try: - row = kwargs.pop("row") - except KeyError: - ident = kwargs.pop("ident") - elif largs in (2, 3): - class_, ident = args - else: - raise sa_exc.ArgumentError( - "expected up to three positional arguments, " "got %s" % largs - ) - - identity_token = kwargs.pop("identity_token", None) - if kwargs: - raise sa_exc.ArgumentError( - "unknown keyword arguments: %s" % ", ".join(kwargs) - ) + if class_ is not None: mapper = class_mapper(class_) if row is None: + if ident is None: + raise sa_exc.ArgumentError("ident or row is required") return mapper.identity_key_from_primary_key( util.to_list(ident), identity_token=identity_token ) @@ -370,14 +391,11 @@ def identity_key(*args, **kwargs): return mapper.identity_key_from_row( row, identity_token=identity_token ) - else: - instance = kwargs.pop("instance") - if kwargs: - raise sa_exc.ArgumentError( - "unknown keyword arguments: %s" % ", ".join(kwargs.keys) - ) + elif instance is not None: mapper = object_mapper(instance) return mapper.identity_key_from_instance(instance) + else: + raise sa_exc.ArgumentError("class or instance is required") class ORMAdapter(sql_util.ColumnAdapter): @@ -420,7 +438,7 @@ class ORMAdapter(sql_util.ColumnAdapter): return not entity or entity.isa(self.mapper) -class AliasedClass: +class AliasedClass(inspection.Inspectable["AliasedInsp"], Generic[_T]): r"""Represents an "aliased" form of a mapped class for usage with Query. The ORM equivalent of a :func:`~sqlalchemy.sql.expression.alias` @@ -481,7 +499,7 @@ class AliasedClass: def __init__( self, - mapped_class_or_ac, + mapped_class_or_ac: Union[Type[_T], "Mapper[_T]", "AliasedClass[_T]"], alias=None, name=None, flat=False, @@ -611,6 +629,7 @@ class AliasedInsp( ORMEntityColumnsClauseRole, ORMFromClauseRole, sql_base.HasCacheKey, + roles.HasFromClauseElement, InspectionAttr, MemoizedSlots, ): @@ -747,6 +766,73 @@ class AliasedInsp( self._target = mapped_class_or_ac # self._target = mapper.class_ # mapped_class_or_ac + @classmethod + def _alias_factory( + cls, + element: Union[ + Type[_T], "Mapper[_T]", "FromClause", "AliasedClass[_T]" + ], + alias=None, + name=None, + flat=False, + adapt_on_names=False, + ) -> Union["AliasedClass[_T]", "Alias"]: + + if isinstance(element, FromClause): + if adapt_on_names: + raise sa_exc.ArgumentError( + "adapt_on_names only applies to ORM elements" + ) + if name: + return element.alias(name=name, flat=flat) + else: + return coercions.expect( + roles.AnonymizedFromClauseRole, element, flat=flat + ) + else: + return AliasedClass( + element, + alias=alias, + flat=flat, + name=name, + adapt_on_names=adapt_on_names, + ) + + @classmethod + def _with_polymorphic_factory( + cls, + base, + classes, + selectable=False, + flat=False, + polymorphic_on=None, + aliased=False, + innerjoin=False, + _use_mapper_path=False, + ): + + primary_mapper = _class_to_mapper(base) + + if selectable not in (None, False) and flat: + raise sa_exc.ArgumentError( + "the 'flat' and 'selectable' arguments cannot be passed " + "simultaneously to with_polymorphic()" + ) + + mappers, selectable = primary_mapper._with_polymorphic_args( + classes, selectable, innerjoin=innerjoin + ) + if aliased or flat: + selectable = selectable._anonymous_fromclause(flat=flat) + return AliasedClass( + base, + selectable, + with_polymorphic_mappers=mappers, + with_polymorphic_discriminator=polymorphic_on, + use_mapper_path=_use_mapper_path, + represents_outer_join=not innerjoin, + ) + @property def entity(self): # to eliminate reference cycles, the AliasedClass is held weakly. @@ -1107,215 +1193,6 @@ inspection._inspects(AliasedClass)(lambda target: target._aliased_insp) inspection._inspects(AliasedInsp)(lambda target: target) -def aliased(element, alias=None, name=None, flat=False, adapt_on_names=False): - """Produce an alias of the given element, usually an :class:`.AliasedClass` - instance. - - E.g.:: - - my_alias = aliased(MyClass) - - session.query(MyClass, my_alias).filter(MyClass.id > my_alias.id) - - The :func:`.aliased` function is used to create an ad-hoc mapping of a - mapped class to a new selectable. By default, a selectable is generated - from the normally mapped selectable (typically a :class:`_schema.Table` - ) using the - :meth:`_expression.FromClause.alias` method. However, :func:`.aliased` - can also be - used to link the class to a new :func:`_expression.select` statement. - Also, the :func:`.with_polymorphic` function is a variant of - :func:`.aliased` that is intended to specify a so-called "polymorphic - selectable", that corresponds to the union of several joined-inheritance - subclasses at once. - - For convenience, the :func:`.aliased` function also accepts plain - :class:`_expression.FromClause` constructs, such as a - :class:`_schema.Table` or - :func:`_expression.select` construct. In those cases, the - :meth:`_expression.FromClause.alias` - method is called on the object and the new - :class:`_expression.Alias` object returned. The returned - :class:`_expression.Alias` is not - ORM-mapped in this case. - - .. seealso:: - - :ref:`tutorial_orm_entity_aliases` - in the :ref:`unified_tutorial` - - :ref:`orm_queryguide_orm_aliases` - in the :ref:`queryguide_toplevel` - - :ref:`ormtutorial_aliases` - in the legacy :ref:`ormtutorial_toplevel` - - :param element: element to be aliased. Is normally a mapped class, - but for convenience can also be a :class:`_expression.FromClause` - element. - - :param alias: Optional selectable unit to map the element to. This is - usually used to link the object to a subquery, and should be an aliased - select construct as one would produce from the - :meth:`_query.Query.subquery` method or - the :meth:`_expression.Select.subquery` or - :meth:`_expression.Select.alias` methods of the :func:`_expression.select` - construct. - - :param name: optional string name to use for the alias, if not specified - by the ``alias`` parameter. The name, among other things, forms the - attribute name that will be accessible via tuples returned by a - :class:`_query.Query` object. Not supported when creating aliases - of :class:`_sql.Join` objects. - - :param flat: Boolean, will be passed through to the - :meth:`_expression.FromClause.alias` call so that aliases of - :class:`_expression.Join` objects will alias the individual tables - inside the join, rather than creating a subquery. This is generally - supported by all modern databases with regards to right-nested joins - and generally produces more efficient queries. - - :param adapt_on_names: if True, more liberal "matching" will be used when - mapping the mapped columns of the ORM entity to those of the - given selectable - a name-based match will be performed if the - given selectable doesn't otherwise have a column that corresponds - to one on the entity. The use case for this is when associating - an entity with some derived selectable such as one that uses - aggregate functions:: - - class UnitPrice(Base): - __tablename__ = 'unit_price' - ... - unit_id = Column(Integer) - price = Column(Numeric) - - aggregated_unit_price = Session.query( - func.sum(UnitPrice.price).label('price') - ).group_by(UnitPrice.unit_id).subquery() - - aggregated_unit_price = aliased(UnitPrice, - alias=aggregated_unit_price, adapt_on_names=True) - - Above, functions on ``aggregated_unit_price`` which refer to - ``.price`` will return the - ``func.sum(UnitPrice.price).label('price')`` column, as it is - matched on the name "price". Ordinarily, the "price" function - wouldn't have any "column correspondence" to the actual - ``UnitPrice.price`` column as it is not a proxy of the original. - - """ - if isinstance(element, expression.FromClause): - if adapt_on_names: - raise sa_exc.ArgumentError( - "adapt_on_names only applies to ORM elements" - ) - if name: - return element.alias(name=name, flat=flat) - else: - return coercions.expect( - roles.AnonymizedFromClauseRole, element, flat=flat - ) - else: - return AliasedClass( - element, - alias=alias, - flat=flat, - name=name, - adapt_on_names=adapt_on_names, - ) - - -def with_polymorphic( - base, - classes, - selectable=False, - flat=False, - polymorphic_on=None, - aliased=False, - innerjoin=False, - _use_mapper_path=False, -): - """Produce an :class:`.AliasedClass` construct which specifies - columns for descendant mappers of the given base. - - Using this method will ensure that each descendant mapper's - tables are included in the FROM clause, and will allow filter() - criterion to be used against those tables. The resulting - instances will also have those columns already loaded so that - no "post fetch" of those columns will be required. - - .. seealso:: - - :ref:`with_polymorphic` - full discussion of - :func:`_orm.with_polymorphic`. - - :param base: Base class to be aliased. - - :param classes: a single class or mapper, or list of - class/mappers, which inherit from the base class. - Alternatively, it may also be the string ``'*'``, in which case - all descending mapped classes will be added to the FROM clause. - - :param aliased: when True, the selectable will be aliased. For a - JOIN, this means the JOIN will be SELECTed from inside of a subquery - unless the :paramref:`_orm.with_polymorphic.flat` flag is set to - True, which is recommended for simpler use cases. - - :param flat: Boolean, will be passed through to the - :meth:`_expression.FromClause.alias` call so that aliases of - :class:`_expression.Join` objects will alias the individual tables - inside the join, rather than creating a subquery. This is generally - supported by all modern databases with regards to right-nested joins - and generally produces more efficient queries. Setting this flag is - recommended as long as the resulting SQL is functional. - - :param selectable: a table or subquery that will - be used in place of the generated FROM clause. This argument is - required if any of the desired classes use concrete table - inheritance, since SQLAlchemy currently cannot generate UNIONs - among tables automatically. If used, the ``selectable`` argument - must represent the full set of tables and columns mapped by every - mapped class. Otherwise, the unaccounted mapped columns will - result in their table being appended directly to the FROM clause - which will usually lead to incorrect results. - - When left at its default value of ``False``, the polymorphic - selectable assigned to the base mapper is used for selecting rows. - However, it may also be passed as ``None``, which will bypass the - configured polymorphic selectable and instead construct an ad-hoc - selectable for the target classes given; for joined table inheritance - this will be a join that includes all target mappers and their - subclasses. - - :param polymorphic_on: a column to be used as the "discriminator" - column for the given selectable. If not given, the polymorphic_on - attribute of the base classes' mapper will be used, if any. This - is useful for mappings that don't have polymorphic loading - behavior by default. - - :param innerjoin: if True, an INNER JOIN will be used. This should - only be specified if querying for one specific subtype only - """ - primary_mapper = _class_to_mapper(base) - - if selectable not in (None, False) and flat: - raise sa_exc.ArgumentError( - "the 'flat' and 'selectable' arguments cannot be passed " - "simultaneously to with_polymorphic()" - ) - - mappers, selectable = primary_mapper._with_polymorphic_args( - classes, selectable, innerjoin=innerjoin - ) - if aliased or flat: - selectable = selectable._anonymous_fromclause(flat=flat) - return AliasedClass( - base, - selectable, - with_polymorphic_mappers=mappers, - with_polymorphic_discriminator=polymorphic_on, - use_mapper_path=_use_mapper_path, - represents_outer_join=not innerjoin, - ) - - @inspection._self_inspects class Bundle( ORMColumnsClauseRole, @@ -1667,62 +1544,6 @@ class _ORMJoin(expression.Join): return _ORMJoin(self, right, onclause, isouter=True, full=full) -def join( - left, right, onclause=None, isouter=False, full=False, join_to_left=None -): - r"""Produce an inner join between left and right clauses. - - :func:`_orm.join` is an extension to the core join interface - provided by :func:`_expression.join()`, where the - left and right selectables may be not only core selectable - objects such as :class:`_schema.Table`, but also mapped classes or - :class:`.AliasedClass` instances. The "on" clause can - be a SQL expression, or an attribute or string name - referencing a configured :func:`_orm.relationship`. - - :func:`_orm.join` is not commonly needed in modern usage, - as its functionality is encapsulated within that of the - :meth:`_query.Query.join` method, which features a - significant amount of automation beyond :func:`_orm.join` - by itself. Explicit usage of :func:`_orm.join` - with :class:`_query.Query` involves usage of the - :meth:`_query.Query.select_from` method, as in:: - - from sqlalchemy.orm import join - session.query(User).\ - select_from(join(User, Address, User.addresses)).\ - filter(Address.email_address=='foo@bar.com') - - In modern SQLAlchemy the above join can be written more - succinctly as:: - - session.query(User).\ - join(User.addresses).\ - filter(Address.email_address=='foo@bar.com') - - See :meth:`_query.Query.join` for information on modern usage - of ORM level joins. - - .. deprecated:: 0.8 - - the ``join_to_left`` parameter is deprecated, and will be removed - in a future release. The parameter has no effect. - - """ - return _ORMJoin(left, right, onclause, isouter, full) - - -def outerjoin(left, right, onclause=None, full=False, join_to_left=None): - """Produce a left outer join between left and right clauses. - - This is the "outer join" version of the :func:`_orm.join` function, - featuring the same behavior except that an OUTER JOIN is generated. - See that function's documentation for other usage details. - - """ - return _ORMJoin(left, right, onclause, True, full) - - def with_parent(instance, prop, from_entity=None): """Create filtering criterion that relates this query's primary entity to the given related instance, using established @@ -1964,3 +1785,56 @@ def _getitem(iterable_query, item): return list(iterable_query)[-1] else: return list(iterable_query[item : item + 1])[0] + + +def _is_mapped_annotation(raw_annotation: Union[type, str], cls: type): + annotated = de_stringify_annotation(cls, raw_annotation) + return is_origin_of(annotated, "Mapped", module="sqlalchemy.orm") + + +def _extract_mapped_subtype( + raw_annotation: Union[type, str], + cls: type, + key: str, + attr_cls: type, + required: bool, + is_dataclass_field: bool, +) -> Optional[Union[type, str]]: + + if raw_annotation is None: + + if required: + raise sa_exc.ArgumentError( + f"Python typing annotation is required for attribute " + f'"{cls.__name__}.{key}" when primary argument(s) for ' + f'"{attr_cls.__name__}" construct are None or not present' + ) + return None + + annotated = de_stringify_annotation(cls, raw_annotation) + + if is_dataclass_field: + return annotated + else: + if ( + not hasattr(annotated, "__origin__") + or not issubclass(annotated.__origin__, attr_cls) + and not issubclass(attr_cls, annotated.__origin__) + ): + our_annotated_str = ( + annotated.__name__ + if not isinstance(annotated, str) + else repr(annotated) + ) + raise sa_exc.ArgumentError( + f'Type annotation for "{cls.__name__}.{key}" should use the ' + f'syntax "Mapped[{our_annotated_str}]" or ' + f'"{attr_cls.__name__}[{our_annotated_str}]".' + ) + + if len(annotated.__args__) != 1: + raise sa_exc.ArgumentError( + "Expected sub-type for Mapped[] annotation" + ) + + return annotated.__args__[0] |