summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy')
-rw-r--r--lib/sqlalchemy/orm/relationships.py20
-rw-r--r--lib/sqlalchemy/orm/util.py91
-rw-r--r--lib/sqlalchemy/util/typing.py67
3 files changed, 140 insertions, 38 deletions
diff --git a/lib/sqlalchemy/orm/relationships.py b/lib/sqlalchemy/orm/relationships.py
index 6d388a630..e0922a538 100644
--- a/lib/sqlalchemy/orm/relationships.py
+++ b/lib/sqlalchemy/orm/relationships.py
@@ -88,6 +88,7 @@ from ..sql.util import selectables_overlap
from ..sql.util import visit_binary_product
from ..util.typing import de_optionalize_union_types
from ..util.typing import Literal
+from ..util.typing import resolve_name_to_real_class_name
if typing.TYPE_CHECKING:
from ._typing import _EntityType
@@ -1729,6 +1730,7 @@ class RelationshipProperty(
return
argument = extracted_mapped_annotation
+ assert originating_module is not None
is_write_only = mapped_container is not None and issubclass(
mapped_container, WriteOnlyMapped
@@ -1765,7 +1767,10 @@ class RelationshipProperty(
type_arg = argument.__args__[0] # type: ignore
if hasattr(type_arg, "__forward_arg__"):
str_argument = type_arg.__forward_arg__
- argument = str_argument
+
+ argument = resolve_name_to_real_class_name(
+ str_argument, originating_module
+ )
else:
argument = type_arg
else:
@@ -1775,6 +1780,10 @@ class RelationshipProperty(
elif hasattr(argument, "__forward_arg__"):
argument = argument.__forward_arg__ # type: ignore
+ argument = resolve_name_to_real_class_name(
+ argument, originating_module
+ )
+
# we don't allow the collection class to be a
# __forward_arg__ right now, so if we see a forward arg here,
# we know there was no collection class either
@@ -1785,7 +1794,14 @@ class RelationshipProperty(
):
self.uselist = False
- self.argument = cast("_RelationshipArgumentType[_T]", argument)
+ # ticket #8759
+ # if a lead argument was given to relationship(), like
+ # `relationship("B")`, use that, don't replace it with class we
+ # found in the annotation. The declarative_scan() method call here is
+ # still useful, as we continue to derive collection type and do
+ # checking of the annotation in any case.
+ if self.argument is None:
+ self.argument = cast("_RelationshipArgumentType[_T]", argument)
@util.preload_module("sqlalchemy.orm.mapper")
def _setup_entity(self, __argument: Any = None) -> None:
diff --git a/lib/sqlalchemy/orm/util.py b/lib/sqlalchemy/orm/util.py
index e64c93636..6cd98f5ea 100644
--- a/lib/sqlalchemy/orm/util.py
+++ b/lib/sqlalchemy/orm/util.py
@@ -78,6 +78,7 @@ from ..sql.elements import KeyedColumnElement
from ..sql.selectable import FromClause
from ..util.langhelpers import MemoizedSlots
from ..util.typing import de_stringify_annotation
+from ..util.typing import eval_name_only
from ..util.typing import is_origin_of_cls
from ..util.typing import Literal
from ..util.typing import typing_get_origin
@@ -2034,35 +2035,75 @@ def _is_mapped_annotation(
return is_origin_of_cls(annotated, _MappedAnnotationBase)
-def _cleanup_mapped_str_annotation(annotation: str) -> str:
+class _CleanupError(Exception):
+ pass
+
+
+def _cleanup_mapped_str_annotation(
+ annotation: str, originating_module: str
+) -> str:
# fix up an annotation that comes in as the form:
# 'Mapped[List[Address]]' so that it instead looks like:
# 'Mapped[List["Address"]]' , which will allow us to get
# "Address" as a string
+ # additionally, resolve symbols for these names since this is where
+ # we'd have to do it
+
inner: Optional[Match[str]]
mm = re.match(r"^(.+?)\[(.+)\]$", annotation)
- if mm and mm.group(1) in ("Mapped", "WriteOnlyMapped", "DynamicMapped"):
- stack = []
- inner = mm
- while True:
- stack.append(inner.group(1))
- g2 = inner.group(2)
- inner = re.match(r"^(.+?)\[(.+)\]$", g2)
- if inner is None:
- stack.append(g2)
- break
-
- # stack: ['Mapped', 'List', 'Address']
- if not re.match(r"""^["'].*["']$""", stack[-1]):
- stripchars = "\"' "
- stack[-1] = ", ".join(
- f'"{elem.strip(stripchars)}"' for elem in stack[-1].split(",")
- )
- # stack: ['Mapped', 'List', '"Address"']
- annotation = "[".join(stack) + ("]" * (len(stack) - 1))
+ if not mm:
+ return annotation
+
+ # ticket #8759. Resolve the Mapped name to a real symbol.
+ # originally this just checked the name.
+ try:
+ obj = eval_name_only(mm.group(1), originating_module)
+ except NameError as ne:
+ raise _CleanupError(
+ f'For annotation "{annotation}", could not resolve '
+ f'container type "{mm.group(1)}". '
+ "Please ensure this type is imported at the module level "
+ "outside of TYPE_CHECKING blocks"
+ ) from ne
+
+ try:
+ if issubclass(obj, _MappedAnnotationBase):
+ real_symbol = obj.__name__
+ else:
+ return annotation
+ except TypeError:
+ # avoid isinstance(obj, type) check, just catch TypeError
+ return annotation
+
+ # note: if one of the codepaths above didn't define real_symbol and
+ # then didn't return, real_symbol raises UnboundLocalError
+ # which is actually a NameError, and the calling routines don't
+ # notice this since they are catching NameError anyway. Just in case
+ # this is being modified in the future, something to be aware of.
+
+ stack = []
+ inner = mm
+ while True:
+ stack.append(real_symbol if mm is inner else inner.group(1))
+ g2 = inner.group(2)
+ inner = re.match(r"^(.+?)\[(.+)\]$", g2)
+ if inner is None:
+ stack.append(g2)
+ break
+
+ # stack: ['Mapped', 'List', 'Address']
+ if not re.match(r"""^["'].*["']$""", stack[-1]):
+ stripchars = "\"' "
+ stack[-1] = ", ".join(
+ f'"{elem.strip(stripchars)}"' for elem in stack[-1].split(",")
+ )
+ # stack: ['Mapped', 'List', '"Address"']
+
+ annotation = "[".join(stack) + ("]" * (len(stack) - 1))
+
return annotation
@@ -2101,12 +2142,18 @@ def _extract_mapped_subtype(
originating_module,
_cleanup_mapped_str_annotation,
)
+ except _CleanupError as ce:
+ raise sa_exc.ArgumentError(
+ f"Could not interpret annotation {raw_annotation}. "
+ "Check that it uses names that are correctly imported at the "
+ "module level. See chained stack trace for more hints."
+ ) from ce
except NameError as ne:
if raiseerr and "Mapped[" in raw_annotation: # type: ignore
raise sa_exc.ArgumentError(
f"Could not interpret annotation {raw_annotation}. "
- "Check that it's not using names that might not be imported "
- "at the module level. See chained stack trace for more hints."
+ "Check that it uses names that are correctly imported at the "
+ "module level. See chained stack trace for more hints."
) from ne
annotated = raw_annotation # type: ignore
diff --git a/lib/sqlalchemy/util/typing.py b/lib/sqlalchemy/util/typing.py
index b5d918a74..e4674a44c 100644
--- a/lib/sqlalchemy/util/typing.py
+++ b/lib/sqlalchemy/util/typing.py
@@ -85,7 +85,7 @@ def de_stringify_annotation(
cls: Type[Any],
annotation: _AnnotationScanType,
originating_module: str,
- str_cleanup_fn: Optional[Callable[[str], str]] = None,
+ str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
) -> Type[Any]:
"""Resolve annotations that may be string based into real objects.
@@ -109,26 +109,65 @@ def de_stringify_annotation(
if isinstance(annotation, str):
if str_cleanup_fn:
- annotation = str_cleanup_fn(annotation)
- base_globals: "Dict[str, Any]" = getattr(
- sys.modules.get(originating_module, None), "__dict__", {}
- )
-
- try:
- annotation = eval(annotation, base_globals, None)
- except NameError as err:
- # breakpoint()
- raise NameError(
- f"Could not de-stringify annotation {annotation}"
- ) from err
+ annotation = str_cleanup_fn(annotation, originating_module)
+
+ annotation = eval_expression(annotation, originating_module)
return annotation # type: ignore
+def eval_expression(expression: str, module_name: str) -> Any:
+ try:
+ base_globals: Dict[str, Any] = sys.modules[module_name].__dict__
+ except KeyError as ke:
+ raise NameError(
+ f"Module {module_name} isn't present in sys.modules; can't "
+ f"evaluate expression {expression}"
+ ) from ke
+ try:
+ annotation = eval(expression, base_globals, None)
+ except Exception as err:
+ raise NameError(
+ f"Could not de-stringify annotation {expression}"
+ ) from err
+ else:
+ return annotation
+
+
+def eval_name_only(name: str, module_name: str) -> Any:
+
+ try:
+ base_globals: Dict[str, Any] = sys.modules[module_name].__dict__
+ except KeyError as ke:
+ raise NameError(
+ f"Module {module_name} isn't present in sys.modules; can't "
+ f"resolve name {name}"
+ ) from ke
+
+ # name only, just look in globals. eval() works perfectly fine here,
+ # however we are seeking to have this be faster, as this occurs for
+ # every Mapper[] keyword, etc. depending on configuration
+ try:
+ return base_globals[name]
+ except KeyError as ke:
+ raise NameError(
+ f"Could not locate name {name} in module {module_name}"
+ ) from ke
+
+
+def resolve_name_to_real_class_name(name: str, module_name: str) -> str:
+ try:
+ obj = eval_name_only(name, module_name)
+ except NameError:
+ return name
+ else:
+ return getattr(obj, "__name__", name)
+
+
def de_stringify_union_elements(
cls: Type[Any],
annotation: _AnnotationScanType,
originating_module: str,
- str_cleanup_fn: Optional[Callable[[str], str]] = None,
+ str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
) -> Type[Any]:
return make_union_type(
*[