diff options
Diffstat (limited to 'lib/sqlalchemy')
-rw-r--r-- | lib/sqlalchemy/orm/relationships.py | 20 | ||||
-rw-r--r-- | lib/sqlalchemy/orm/util.py | 91 | ||||
-rw-r--r-- | lib/sqlalchemy/util/typing.py | 67 |
3 files changed, 140 insertions, 38 deletions
diff --git a/lib/sqlalchemy/orm/relationships.py b/lib/sqlalchemy/orm/relationships.py index 6d388a630..e0922a538 100644 --- a/lib/sqlalchemy/orm/relationships.py +++ b/lib/sqlalchemy/orm/relationships.py @@ -88,6 +88,7 @@ from ..sql.util import selectables_overlap from ..sql.util import visit_binary_product from ..util.typing import de_optionalize_union_types from ..util.typing import Literal +from ..util.typing import resolve_name_to_real_class_name if typing.TYPE_CHECKING: from ._typing import _EntityType @@ -1729,6 +1730,7 @@ class RelationshipProperty( return argument = extracted_mapped_annotation + assert originating_module is not None is_write_only = mapped_container is not None and issubclass( mapped_container, WriteOnlyMapped @@ -1765,7 +1767,10 @@ class RelationshipProperty( type_arg = argument.__args__[0] # type: ignore if hasattr(type_arg, "__forward_arg__"): str_argument = type_arg.__forward_arg__ - argument = str_argument + + argument = resolve_name_to_real_class_name( + str_argument, originating_module + ) else: argument = type_arg else: @@ -1775,6 +1780,10 @@ class RelationshipProperty( elif hasattr(argument, "__forward_arg__"): argument = argument.__forward_arg__ # type: ignore + argument = resolve_name_to_real_class_name( + argument, originating_module + ) + # we don't allow the collection class to be a # __forward_arg__ right now, so if we see a forward arg here, # we know there was no collection class either @@ -1785,7 +1794,14 @@ class RelationshipProperty( ): self.uselist = False - self.argument = cast("_RelationshipArgumentType[_T]", argument) + # ticket #8759 + # if a lead argument was given to relationship(), like + # `relationship("B")`, use that, don't replace it with class we + # found in the annotation. The declarative_scan() method call here is + # still useful, as we continue to derive collection type and do + # checking of the annotation in any case. + if self.argument is None: + self.argument = cast("_RelationshipArgumentType[_T]", argument) @util.preload_module("sqlalchemy.orm.mapper") def _setup_entity(self, __argument: Any = None) -> None: diff --git a/lib/sqlalchemy/orm/util.py b/lib/sqlalchemy/orm/util.py index e64c93636..6cd98f5ea 100644 --- a/lib/sqlalchemy/orm/util.py +++ b/lib/sqlalchemy/orm/util.py @@ -78,6 +78,7 @@ from ..sql.elements import KeyedColumnElement from ..sql.selectable import FromClause from ..util.langhelpers import MemoizedSlots from ..util.typing import de_stringify_annotation +from ..util.typing import eval_name_only from ..util.typing import is_origin_of_cls from ..util.typing import Literal from ..util.typing import typing_get_origin @@ -2034,35 +2035,75 @@ def _is_mapped_annotation( return is_origin_of_cls(annotated, _MappedAnnotationBase) -def _cleanup_mapped_str_annotation(annotation: str) -> str: +class _CleanupError(Exception): + pass + + +def _cleanup_mapped_str_annotation( + annotation: str, originating_module: str +) -> str: # fix up an annotation that comes in as the form: # 'Mapped[List[Address]]' so that it instead looks like: # 'Mapped[List["Address"]]' , which will allow us to get # "Address" as a string + # additionally, resolve symbols for these names since this is where + # we'd have to do it + inner: Optional[Match[str]] mm = re.match(r"^(.+?)\[(.+)\]$", annotation) - if mm and mm.group(1) in ("Mapped", "WriteOnlyMapped", "DynamicMapped"): - stack = [] - inner = mm - while True: - stack.append(inner.group(1)) - g2 = inner.group(2) - inner = re.match(r"^(.+?)\[(.+)\]$", g2) - if inner is None: - stack.append(g2) - break - - # stack: ['Mapped', 'List', 'Address'] - if not re.match(r"""^["'].*["']$""", stack[-1]): - stripchars = "\"' " - stack[-1] = ", ".join( - f'"{elem.strip(stripchars)}"' for elem in stack[-1].split(",") - ) - # stack: ['Mapped', 'List', '"Address"'] - annotation = "[".join(stack) + ("]" * (len(stack) - 1)) + if not mm: + return annotation + + # ticket #8759. Resolve the Mapped name to a real symbol. + # originally this just checked the name. + try: + obj = eval_name_only(mm.group(1), originating_module) + except NameError as ne: + raise _CleanupError( + f'For annotation "{annotation}", could not resolve ' + f'container type "{mm.group(1)}". ' + "Please ensure this type is imported at the module level " + "outside of TYPE_CHECKING blocks" + ) from ne + + try: + if issubclass(obj, _MappedAnnotationBase): + real_symbol = obj.__name__ + else: + return annotation + except TypeError: + # avoid isinstance(obj, type) check, just catch TypeError + return annotation + + # note: if one of the codepaths above didn't define real_symbol and + # then didn't return, real_symbol raises UnboundLocalError + # which is actually a NameError, and the calling routines don't + # notice this since they are catching NameError anyway. Just in case + # this is being modified in the future, something to be aware of. + + stack = [] + inner = mm + while True: + stack.append(real_symbol if mm is inner else inner.group(1)) + g2 = inner.group(2) + inner = re.match(r"^(.+?)\[(.+)\]$", g2) + if inner is None: + stack.append(g2) + break + + # stack: ['Mapped', 'List', 'Address'] + if not re.match(r"""^["'].*["']$""", stack[-1]): + stripchars = "\"' " + stack[-1] = ", ".join( + f'"{elem.strip(stripchars)}"' for elem in stack[-1].split(",") + ) + # stack: ['Mapped', 'List', '"Address"'] + + annotation = "[".join(stack) + ("]" * (len(stack) - 1)) + return annotation @@ -2101,12 +2142,18 @@ def _extract_mapped_subtype( originating_module, _cleanup_mapped_str_annotation, ) + except _CleanupError as ce: + raise sa_exc.ArgumentError( + f"Could not interpret annotation {raw_annotation}. " + "Check that it uses names that are correctly imported at the " + "module level. See chained stack trace for more hints." + ) from ce except NameError as ne: if raiseerr and "Mapped[" in raw_annotation: # type: ignore raise sa_exc.ArgumentError( f"Could not interpret annotation {raw_annotation}. " - "Check that it's not using names that might not be imported " - "at the module level. See chained stack trace for more hints." + "Check that it uses names that are correctly imported at the " + "module level. See chained stack trace for more hints." ) from ne annotated = raw_annotation # type: ignore diff --git a/lib/sqlalchemy/util/typing.py b/lib/sqlalchemy/util/typing.py index b5d918a74..e4674a44c 100644 --- a/lib/sqlalchemy/util/typing.py +++ b/lib/sqlalchemy/util/typing.py @@ -85,7 +85,7 @@ def de_stringify_annotation( cls: Type[Any], annotation: _AnnotationScanType, originating_module: str, - str_cleanup_fn: Optional[Callable[[str], str]] = None, + str_cleanup_fn: Optional[Callable[[str, str], str]] = None, ) -> Type[Any]: """Resolve annotations that may be string based into real objects. @@ -109,26 +109,65 @@ def de_stringify_annotation( if isinstance(annotation, str): if str_cleanup_fn: - annotation = str_cleanup_fn(annotation) - base_globals: "Dict[str, Any]" = getattr( - sys.modules.get(originating_module, None), "__dict__", {} - ) - - try: - annotation = eval(annotation, base_globals, None) - except NameError as err: - # breakpoint() - raise NameError( - f"Could not de-stringify annotation {annotation}" - ) from err + annotation = str_cleanup_fn(annotation, originating_module) + + annotation = eval_expression(annotation, originating_module) return annotation # type: ignore +def eval_expression(expression: str, module_name: str) -> Any: + try: + base_globals: Dict[str, Any] = sys.modules[module_name].__dict__ + except KeyError as ke: + raise NameError( + f"Module {module_name} isn't present in sys.modules; can't " + f"evaluate expression {expression}" + ) from ke + try: + annotation = eval(expression, base_globals, None) + except Exception as err: + raise NameError( + f"Could not de-stringify annotation {expression}" + ) from err + else: + return annotation + + +def eval_name_only(name: str, module_name: str) -> Any: + + try: + base_globals: Dict[str, Any] = sys.modules[module_name].__dict__ + except KeyError as ke: + raise NameError( + f"Module {module_name} isn't present in sys.modules; can't " + f"resolve name {name}" + ) from ke + + # name only, just look in globals. eval() works perfectly fine here, + # however we are seeking to have this be faster, as this occurs for + # every Mapper[] keyword, etc. depending on configuration + try: + return base_globals[name] + except KeyError as ke: + raise NameError( + f"Could not locate name {name} in module {module_name}" + ) from ke + + +def resolve_name_to_real_class_name(name: str, module_name: str) -> str: + try: + obj = eval_name_only(name, module_name) + except NameError: + return name + else: + return getattr(obj, "__name__", name) + + def de_stringify_union_elements( cls: Type[Any], annotation: _AnnotationScanType, originating_module: str, - str_cleanup_fn: Optional[Callable[[str], str]] = None, + str_cleanup_fn: Optional[Callable[[str, str], str]] = None, ) -> Type[Any]: return make_union_type( *[ |