summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/sql/schema.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2022-03-20 16:39:36 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2022-03-24 16:57:30 -0400
commit6f02d5edd88fe2475629438b0730181a2b00c5fe (patch)
treebbf9e9f3e8a2363659be35d59a7749c7fe35ef7c /lib/sqlalchemy/sql/schema.py
parentc565c470517e1cc70a7f33d1ad3d3256935f1121 (diff)
downloadsqlalchemy-6f02d5edd88fe2475629438b0730181a2b00c5fe.tar.gz
pep484 - SQL internals
non-strict checking for mostly internal or semi-internal code Change-Id: Ib91b47f1a8ccc15e666b94bad1ce78c4ab15b0ec
Diffstat (limited to 'lib/sqlalchemy/sql/schema.py')
-rw-r--r--lib/sqlalchemy/sql/schema.py436
1 files changed, 293 insertions, 143 deletions
diff --git a/lib/sqlalchemy/sql/schema.py b/lib/sqlalchemy/sql/schema.py
index 5cfb55603..540b62e8a 100644
--- a/lib/sqlalchemy/sql/schema.py
+++ b/lib/sqlalchemy/sql/schema.py
@@ -30,16 +30,22 @@ as components in SQL expressions.
"""
from __future__ import annotations
+from abc import ABC
import collections
+import operator
import typing
from typing import Any
+from typing import Callable
from typing import Dict
from typing import List
from typing import MutableMapping
from typing import Optional
from typing import overload
from typing import Sequence as _typing_Sequence
+from typing import Set
+from typing import Tuple
from typing import Type
+from typing import TYPE_CHECKING
from typing import TypeVar
from typing import Union
@@ -48,6 +54,7 @@ from . import ddl
from . import roles
from . import type_api
from . import visitors
+from .base import ColumnCollection
from .base import DedupeColumnCollection
from .base import DialectKWArgs
from .base import Executable
@@ -67,12 +74,15 @@ from .. import exc
from .. import inspection
from .. import util
from ..util.typing import Literal
+from ..util.typing import Protocol
+from ..util.typing import TypeGuard
if typing.TYPE_CHECKING:
from .type_api import TypeEngine
from ..engine import Connection
from ..engine import Engine
-
+ from ..engine.interfaces import ExecutionContext
+ from ..engine.mock import MockConnection
_T = TypeVar("_T", bound="Any")
_ServerDefaultType = Union["FetchedValue", str, TextClause, ColumnElement]
_TAB = TypeVar("_TAB", bound="Table")
@@ -102,7 +112,7 @@ NULL_UNSPECIFIED = util.symbol(
)
-def _get_table_key(name, schema):
+def _get_table_key(name: str, schema: Optional[str]) -> str:
if schema is None:
return name
else:
@@ -207,7 +217,7 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
__visit_name__ = "table"
- constraints = None
+ constraints: Set[Constraint]
"""A collection of all :class:`_schema.Constraint` objects associated with
this :class:`_schema.Table`.
@@ -235,7 +245,7 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
"""
- indexes = None
+ indexes: Set[Index]
"""A collection of all :class:`_schema.Index` objects associated with this
:class:`_schema.Table`.
@@ -249,6 +259,14 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
("schema", InternalTraversal.dp_string)
]
+ if TYPE_CHECKING:
+
+ @util.non_memoized_property
+ def columns(self) -> ColumnCollection[Column[Any]]:
+ ...
+
+ c: ColumnCollection[Column[Any]]
+
def _gen_cache_key(self, anon_map, bindparams):
if self._annotations:
return (self,) + self._annotations_cache_key
@@ -736,11 +754,12 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
)
@property
- def _sorted_constraints(self):
+ def _sorted_constraints(self) -> List[Constraint]:
"""Return the set of constraints as a list, sorted by creation
order.
"""
+
return sorted(self.constraints, key=lambda c: c._creation_order)
@property
@@ -801,6 +820,8 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
)
self.info = kwargs.pop("info", self.info)
+ exclude_columns: _typing_Sequence[str]
+
if autoload:
if not autoload_replace:
# don't replace columns already present.
@@ -1074,8 +1095,8 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
return metadata.tables[key]
args = []
- for c in self.columns:
- args.append(c._copy(schema=schema))
+ for col in self.columns:
+ args.append(col._copy(schema=schema))
table = Table(
name,
metadata,
@@ -1084,28 +1105,30 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
*args,
**self.kwargs,
)
- for c in self.constraints:
- if isinstance(c, ForeignKeyConstraint):
- referred_schema = c._referred_schema
+ for const in self.constraints:
+ if isinstance(const, ForeignKeyConstraint):
+ referred_schema = const._referred_schema
if referred_schema_fn:
fk_constraint_schema = referred_schema_fn(
- self, schema, c, referred_schema
+ self, schema, const, referred_schema
)
else:
fk_constraint_schema = (
schema if referred_schema == self.schema else None
)
table.append_constraint(
- c._copy(schema=fk_constraint_schema, target_table=table)
+ const._copy(
+ schema=fk_constraint_schema, target_table=table
+ )
)
- elif not c._type_bound:
+ elif not const._type_bound:
# skip unique constraints that would be generated
# by the 'unique' flag on Column
- if c._column_flag:
+ if const._column_flag:
continue
table.append_constraint(
- c._copy(schema=schema, target_table=table)
+ const._copy(schema=schema, target_table=table)
)
for index in self.indexes:
# skip indexes that would be generated
@@ -1734,23 +1757,25 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
name = kwargs.pop("name", None)
type_ = kwargs.pop("type_", None)
- args = list(args)
- if args:
- if isinstance(args[0], str):
+ l_args = list(args)
+ del args
+
+ if l_args:
+ if isinstance(l_args[0], str):
if name is not None:
raise exc.ArgumentError(
"May not pass name positionally and as a keyword."
)
- name = args.pop(0)
- if args:
- coltype = args[0]
+ name = l_args.pop(0)
+ if l_args:
+ coltype = l_args[0]
if hasattr(coltype, "_sqla_type"):
if type_ is not None:
raise exc.ArgumentError(
"May not pass type_ positionally and as a keyword."
)
- type_ = args.pop(0)
+ type_ = l_args.pop(0)
if name is not None:
name = quoted_name(name, kwargs.pop("quote", None))
@@ -1772,7 +1797,9 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
else:
self.nullable = not primary_key
- self.default = kwargs.pop("default", None)
+ default = kwargs.pop("default", None)
+ onupdate = kwargs.pop("onupdate", None)
+
self.server_default = kwargs.pop("server_default", None)
self.server_onupdate = kwargs.pop("server_onupdate", None)
@@ -1784,7 +1811,6 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
self.system = kwargs.pop("system", False)
self.doc = kwargs.pop("doc", None)
- self.onupdate = kwargs.pop("onupdate", None)
self.autoincrement = kwargs.pop("autoincrement", "auto")
self.constraints = set()
self.foreign_keys = set()
@@ -1803,32 +1829,38 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
if isinstance(impl, SchemaEventTarget):
impl._set_parent_with_dispatch(self)
- if self.default is not None:
- if isinstance(self.default, (ColumnDefault, Sequence)):
- args.append(self.default)
- else:
- args.append(ColumnDefault(self.default))
+ if default is not None:
+ if not isinstance(default, (ColumnDefault, Sequence)):
+ default = ColumnDefault(default)
+
+ self.default = default
+ l_args.append(default)
+ else:
+ self.default = None
+
+ if onupdate is not None:
+ if not isinstance(onupdate, (ColumnDefault, Sequence)):
+ onupdate = ColumnDefault(onupdate, for_update=True)
+
+ self.onupdate = onupdate
+ l_args.append(onupdate)
+ else:
+ self.onpudate = None
if self.server_default is not None:
if isinstance(self.server_default, FetchedValue):
- args.append(self.server_default._as_for_update(False))
+ l_args.append(self.server_default._as_for_update(False))
else:
- args.append(DefaultClause(self.server_default))
-
- if self.onupdate is not None:
- if isinstance(self.onupdate, (ColumnDefault, Sequence)):
- args.append(self.onupdate)
- else:
- args.append(ColumnDefault(self.onupdate, for_update=True))
+ l_args.append(DefaultClause(self.server_default))
if self.server_onupdate is not None:
if isinstance(self.server_onupdate, FetchedValue):
- args.append(self.server_onupdate._as_for_update(True))
+ l_args.append(self.server_onupdate._as_for_update(True))
else:
- args.append(
+ l_args.append(
DefaultClause(self.server_onupdate, for_update=True)
)
- self._init_items(*args)
+ self._init_items(*l_args)
util.set_creation_order(self)
@@ -1837,7 +1869,11 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
self._extra_kwargs(**kwargs)
- foreign_keys = None
+ table: Table
+
+ constraints: Set[Constraint]
+
+ foreign_keys: Set[ForeignKey]
"""A collection of all :class:`_schema.ForeignKey` marker objects
associated with this :class:`_schema.Column`.
@@ -1850,7 +1886,7 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
"""
- index = None
+ index: bool
"""The value of the :paramref:`_schema.Column.index` parameter.
Does not indicate if this :class:`_schema.Column` is actually indexed
@@ -1861,7 +1897,7 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
:attr:`_schema.Table.indexes`
"""
- unique = None
+ unique: bool
"""The value of the :paramref:`_schema.Column.unique` parameter.
Does not indicate if this :class:`_schema.Column` is actually subject to
@@ -2074,8 +2110,8 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
server_default = self.server_default
server_onupdate = self.server_onupdate
if isinstance(server_default, (Computed, Identity)):
+ args.append(server_default._copy(**kw))
server_default = server_onupdate = None
- args.append(self.server_default._copy(**kw))
type_ = self.type
if isinstance(type_, SchemaEventTarget):
@@ -2203,9 +2239,11 @@ class ForeignKey(DialectKWArgs, SchemaItem):
__visit_name__ = "foreign_key"
+ parent: Column[Any]
+
def __init__(
self,
- column: Union[str, Column, SQLCoreOperations],
+ column: Union[str, Column[Any], SQLCoreOperations[Any]],
_constraint: Optional["ForeignKeyConstraint"] = None,
use_alter: bool = False,
name: Optional[str] = None,
@@ -2296,7 +2334,7 @@ class ForeignKey(DialectKWArgs, SchemaItem):
self._table_column = self._colspec
if not isinstance(
- self._table_column.table, (util.NoneType, TableClause)
+ self._table_column.table, (type(None), TableClause)
):
raise exc.ArgumentError(
"ForeignKey received Column not bound "
@@ -2309,7 +2347,10 @@ class ForeignKey(DialectKWArgs, SchemaItem):
# object passes itself in when creating ForeignKey
# markers.
self.constraint = _constraint
- self.parent = None
+
+ # .parent is not Optional under normal use
+ self.parent = None # type: ignore
+
self.use_alter = use_alter
self.name = name
self.onupdate = onupdate
@@ -2501,19 +2542,18 @@ class ForeignKey(DialectKWArgs, SchemaItem):
return parenttable, tablekey, colname
def _link_to_col_by_colstring(self, parenttable, table, colname):
- if not hasattr(self.constraint, "_referred_table"):
- self.constraint._referred_table = table
- else:
- assert self.constraint._referred_table is table
-
_column = None
if colname is None:
# colname is None in the case that ForeignKey argument
# was specified as table name only, in which case we
# match the column name to the same column on the
# parent.
- key = self.parent
- _column = table.c.get(self.parent.key, None)
+ # this use case wasn't working in later 1.x series
+ # as it had no test coverage; fixed in 2.0
+ parent = self.parent
+ assert parent is not None
+ key = parent.key
+ _column = table.c.get(key, None)
elif self.link_to_name:
key = colname
for c in table.c:
@@ -2533,10 +2573,10 @@ class ForeignKey(DialectKWArgs, SchemaItem):
key,
)
- self._set_target_column(_column)
+ return _column
def _set_target_column(self, column):
- assert isinstance(self.parent.table, Table)
+ assert self.parent is not None
# propagate TypeEngine to parent if it didn't have one
if self.parent.type._isnull:
@@ -2561,11 +2601,6 @@ class ForeignKey(DialectKWArgs, SchemaItem):
If no target column has been established, an exception
is raised.
- .. versionchanged:: 0.9.0
- Foreign key target column resolution now occurs as soon as both
- the ForeignKey object and the remote Column to which it refers
- are both associated with the same MetaData object.
-
"""
if isinstance(self._colspec, str):
@@ -2586,14 +2621,11 @@ class ForeignKey(DialectKWArgs, SchemaItem):
"parent MetaData" % parenttable
)
else:
- raise exc.NoReferencedColumnError(
- "Could not initialize target column for "
- "ForeignKey '%s' on table '%s': "
- "table '%s' has no column named '%s'"
- % (self._colspec, parenttable.name, tablekey, colname),
- tablekey,
- colname,
+ table = parenttable.metadata.tables[tablekey]
+ return self._link_to_col_by_colstring(
+ parenttable, table, colname
)
+
elif hasattr(self._colspec, "__clause_element__"):
_column = self._colspec.__clause_element__()
return _column
@@ -2601,18 +2633,22 @@ class ForeignKey(DialectKWArgs, SchemaItem):
_column = self._colspec
return _column
- def _set_parent(self, column, **kw):
- if self.parent is not None and self.parent is not column:
+ def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
+ assert isinstance(parent, Column)
+
+ if self.parent is not None and self.parent is not parent:
raise exc.InvalidRequestError(
"This ForeignKey already has a parent !"
)
- self.parent = column
+ self.parent = parent
self.parent.foreign_keys.add(self)
self.parent._on_table_attach(self._set_table)
def _set_remote_table(self, table):
- parenttable, tablekey, colname = self._resolve_col_tokens()
- self._link_to_col_by_colstring(parenttable, table, colname)
+ parenttable, _, colname = self._resolve_col_tokens()
+ _column = self._link_to_col_by_colstring(parenttable, table, colname)
+ self._set_target_column(_column)
+ assert self.constraint is not None
self.constraint._validate_dest_table(table)
def _remove_from_metadata(self, metadata):
@@ -2651,10 +2687,15 @@ class ForeignKey(DialectKWArgs, SchemaItem):
if table_key in parenttable.metadata.tables:
table = parenttable.metadata.tables[table_key]
try:
- self._link_to_col_by_colstring(parenttable, table, colname)
+ _column = self._link_to_col_by_colstring(
+ parenttable, table, colname
+ )
except exc.NoReferencedColumnError:
# this is OK, we'll try later
pass
+ else:
+ self._set_target_column(_column)
+
parenttable.metadata._fk_memos[fk_key].append(self)
elif hasattr(self._colspec, "__clause_element__"):
_column = self._colspec.__clause_element__()
@@ -2664,6 +2705,31 @@ class ForeignKey(DialectKWArgs, SchemaItem):
self._set_target_column(_column)
+if TYPE_CHECKING:
+
+ def default_is_sequence(
+ obj: Optional[DefaultGenerator],
+ ) -> TypeGuard[Sequence]:
+ ...
+
+ def default_is_clause_element(
+ obj: Optional[DefaultGenerator],
+ ) -> TypeGuard[ColumnElementColumnDefault]:
+ ...
+
+ def default_is_scalar(
+ obj: Optional[DefaultGenerator],
+ ) -> TypeGuard[ScalarElementColumnDefault]:
+ ...
+
+else:
+ default_is_sequence = operator.attrgetter("is_sequence")
+
+ default_is_clause_element = operator.attrgetter("is_clause_element")
+
+ default_is_scalar = operator.attrgetter("is_scalar")
+
+
class DefaultGenerator(Executable, SchemaItem):
"""Base class for column *default* values."""
@@ -2671,18 +2737,18 @@ class DefaultGenerator(Executable, SchemaItem):
is_sequence = False
is_server_default = False
+ is_clause_element = False
+ is_callable = False
is_scalar = False
- column = None
+ column: Optional[Column[Any]]
def __init__(self, for_update=False):
self.for_update = for_update
- @util.memoized_property
- def is_callable(self):
- raise NotImplementedError()
-
- def _set_parent(self, column, **kw):
- self.column = column
+ def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
+ if TYPE_CHECKING:
+ assert isinstance(parent, Column)
+ self.column = parent
if self.for_update:
self.column.onupdate = self
else:
@@ -2696,7 +2762,7 @@ class DefaultGenerator(Executable, SchemaItem):
)
-class ColumnDefault(DefaultGenerator):
+class ColumnDefault(DefaultGenerator, ABC):
"""A plain default value on a column.
This could correspond to a constant, a callable function,
@@ -2718,7 +2784,30 @@ class ColumnDefault(DefaultGenerator):
"""
- def __init__(self, arg, **kwargs):
+ arg: Any
+
+ @overload
+ def __new__(
+ cls, arg: Callable[..., Any], for_update: bool = ...
+ ) -> CallableColumnDefault:
+ ...
+
+ @overload
+ def __new__(
+ cls, arg: ColumnElement[Any], for_update: bool = ...
+ ) -> ColumnElementColumnDefault:
+ ...
+
+ # if I return ScalarElementColumnDefault here, which is what's actually
+ # returned, mypy complains that
+ # overloads overlap w/ incompatible return types.
+ @overload
+ def __new__(cls, arg: object, for_update: bool = ...) -> ColumnDefault:
+ ...
+
+ def __new__(
+ cls, arg: Any = None, for_update: bool = False
+ ) -> ColumnDefault:
"""Construct a new :class:`.ColumnDefault`.
@@ -2744,70 +2833,121 @@ class ColumnDefault(DefaultGenerator):
statement and parameters.
"""
- super(ColumnDefault, self).__init__(**kwargs)
+
if isinstance(arg, FetchedValue):
raise exc.ArgumentError(
"ColumnDefault may not be a server-side default type."
)
- if callable(arg):
- arg = self._maybe_wrap_callable(arg)
+ elif callable(arg):
+ cls = CallableColumnDefault
+ elif isinstance(arg, ClauseElement):
+ cls = ColumnElementColumnDefault
+ elif arg is not None:
+ cls = ScalarElementColumnDefault
+
+ return object.__new__(cls)
+
+ def __repr__(self):
+ return f"{self.__class__.__name__}({self.arg!r})"
+
+
+class ScalarElementColumnDefault(ColumnDefault):
+ """default generator for a fixed scalar Python value
+
+ .. versionadded: 2.0
+
+ """
+
+ is_scalar = True
+
+ def __init__(self, arg: Any, for_update: bool = False):
+ self.for_update = for_update
self.arg = arg
- @util.memoized_property
- def is_callable(self):
- return callable(self.arg)
- @util.memoized_property
- def is_clause_element(self):
- return isinstance(self.arg, ClauseElement)
+# _SQLExprDefault = Union["ColumnElement[Any]", "TextClause", "SelectBase"]
+_SQLExprDefault = Union["ColumnElement[Any]", "TextClause"]
- @util.memoized_property
- def is_scalar(self):
- return (
- not self.is_callable
- and not self.is_clause_element
- and not self.is_sequence
- )
+
+class ColumnElementColumnDefault(ColumnDefault):
+ """default generator for a SQL expression
+
+ .. versionadded:: 2.0
+
+ """
+
+ is_clause_element = True
+
+ arg: _SQLExprDefault
+
+ def __init__(
+ self,
+ arg: _SQLExprDefault,
+ for_update: bool = False,
+ ):
+ self.for_update = for_update
+ self.arg = arg
@util.memoized_property
@util.preload_module("sqlalchemy.sql.sqltypes")
def _arg_is_typed(self):
sqltypes = util.preloaded.sql_sqltypes
- if self.is_clause_element:
- return not isinstance(self.arg.type, sqltypes.NullType)
- else:
- return False
+ return not isinstance(self.arg.type, sqltypes.NullType)
+
+
+class _CallableColumnDefaultProtocol(Protocol):
+ def __call__(self, context: ExecutionContext) -> Any:
+ ...
- def _maybe_wrap_callable(self, fn):
+
+class CallableColumnDefault(ColumnDefault):
+ """default generator for a callable Python function
+
+ .. versionadded:: 2.0
+
+ """
+
+ is_callable = True
+ arg: _CallableColumnDefaultProtocol
+
+ def __init__(
+ self,
+ arg: Union[_CallableColumnDefaultProtocol, Callable[[], Any]],
+ for_update: bool = False,
+ ):
+ self.for_update = for_update
+ self.arg = self._maybe_wrap_callable(arg)
+
+ def _maybe_wrap_callable(
+ self, fn: Union[_CallableColumnDefaultProtocol, Callable[[], Any]]
+ ) -> _CallableColumnDefaultProtocol:
"""Wrap callables that don't accept a context.
This is to allow easy compatibility with default callables
that aren't specific to accepting of a context.
"""
+
try:
argspec = util.get_callable_argspec(fn, no_self=True)
except TypeError:
- return util.wrap_callable(lambda ctx: fn(), fn)
+ return util.wrap_callable(lambda ctx: fn(), fn) # type: ignore
defaulted = argspec[3] is not None and len(argspec[3]) or 0
positionals = len(argspec[0]) - defaulted
if positionals == 0:
- return util.wrap_callable(lambda ctx: fn(), fn)
+ return util.wrap_callable(lambda ctx: fn(), fn) # type: ignore
elif positionals == 1:
- return fn
+ return fn # type: ignore
else:
raise exc.ArgumentError(
"ColumnDefault Python function takes zero or one "
"positional arguments"
)
- def __repr__(self):
- return "ColumnDefault(%r)" % (self.arg,)
-
class IdentityOptions:
"""Defines options for a named database sequence or an identity column.
@@ -2899,6 +3039,8 @@ class Sequence(HasSchemaAttr, IdentityOptions, DefaultGenerator):
is_sequence = True
+ column: Optional[Column[Any]] = None
+
def __init__(
self,
name,
@@ -3087,14 +3229,6 @@ class Sequence(HasSchemaAttr, IdentityOptions, DefaultGenerator):
else:
self.data_type = None
- @util.memoized_property
- def is_callable(self):
- return False
-
- @util.memoized_property
- def is_clause_element(self):
- return False
-
@util.preload_module("sqlalchemy.sql.functions")
def next_value(self):
"""Return a :class:`.next_value` function element
@@ -3235,6 +3369,9 @@ class Constraint(DialectKWArgs, SchemaItem):
__visit_name__ = "constraint"
+ _creation_order: int
+ _column_flag: bool
+
def __init__(
self,
name=None,
@@ -3316,8 +3453,6 @@ class Constraint(DialectKWArgs, SchemaItem):
class ColumnCollectionMixin:
-
- columns = None
"""A :class:`_expression.ColumnCollection` of :class:`_schema.Column`
objects.
@@ -3326,8 +3461,17 @@ class ColumnCollectionMixin:
"""
+ columns: ColumnCollection[Column[Any]]
+
_allow_multiple_tables = False
+ if TYPE_CHECKING:
+
+ def _set_parent_with_dispatch(
+ self, parent: SchemaEventTarget, **kw: Any
+ ) -> None:
+ ...
+
def __init__(self, *columns, **kw):
_autoattach = kw.pop("_autoattach", True)
self._column_flag = kw.pop("_column_flag", False)
@@ -3404,14 +3548,16 @@ class ColumnCollectionMixin:
)
)
- def _col_expressions(self, table):
+ def _col_expressions(self, table: Table) -> List[Column[Any]]:
return [
table.c[col] if isinstance(col, str) else col
for col in self._pending_colargs
]
- def _set_parent(self, table, **kw):
- for col in self._col_expressions(table):
+ def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
+ if TYPE_CHECKING:
+ assert isinstance(parent, Table)
+ for col in self._col_expressions(parent):
if col is not None:
self.columns.add(col)
@@ -3446,7 +3592,7 @@ class ColumnCollectionConstraint(ColumnCollectionMixin, Constraint):
self, *columns, _autoattach=_autoattach, _column_flag=_column_flag
)
- columns = None
+ columns: DedupeColumnCollection[Column[Any]]
"""A :class:`_expression.ColumnCollection` representing the set of columns
for this constraint.
@@ -3568,7 +3714,7 @@ class CheckConstraint(ColumnCollectionConstraint):
"""
self.sqltext = coercions.expect(roles.DDLExpressionRole, sqltext)
- columns = []
+ columns: List[Column[Any]] = []
visitors.traverse(self.sqltext, {}, {"column": columns.append})
super(CheckConstraint, self).__init__(
@@ -3779,17 +3925,17 @@ class ForeignKeyConstraint(ColumnCollectionConstraint):
assert table is self.parent
self._set_parent_with_dispatch(table)
- def _append_element(self, column, fk):
+ def _append_element(self, column: Column[Any], fk: ForeignKey) -> None:
self.columns.add(column)
self.elements.append(fk)
- columns = None
+ columns: DedupeColumnCollection[Column[Any]]
"""A :class:`_expression.ColumnCollection` representing the set of columns
for this constraint.
"""
- elements = None
+ elements: List[ForeignKey]
"""A sequence of :class:`_schema.ForeignKey` objects.
Each :class:`_schema.ForeignKey`
@@ -4271,7 +4417,7 @@ class Index(DialectKWArgs, ColumnCollectionMixin, SchemaItem):
self._validate_dialect_kwargs(kw)
- self.expressions = []
+ self.expressions: List[ColumnElement[Any]] = []
# will call _set_parent() if table-bound column
# objects are present
ColumnCollectionMixin.__init__(
@@ -4501,11 +4647,13 @@ class MetaData(HasSchemaAttr):
)
if info:
self.info = info
- self._schemas = set()
- self._sequences = {}
- self._fk_memos = collections.defaultdict(list)
+ self._schemas: Set[str] = set()
+ self._sequences: Dict[str, Sequence] = {}
+ self._fk_memos: Dict[
+ Tuple[str, str], List[ForeignKey]
+ ] = collections.defaultdict(list)
- tables: Dict[str, Table]
+ tables: util.FacadeDict[str, Table]
"""A dictionary of :class:`_schema.Table`
objects keyed to their name or "table key".
@@ -4539,7 +4687,7 @@ class MetaData(HasSchemaAttr):
def _remove_table(self, name, schema):
key = _get_table_key(name, schema)
- removed = dict.pop(self.tables, key, None)
+ removed = dict.pop(self.tables, key, None) # type: ignore
if removed is not None:
for fk in removed.foreign_keys:
fk._remove_from_metadata(self)
@@ -4634,12 +4782,12 @@ class MetaData(HasSchemaAttr):
"""
return ddl.sort_tables(
- sorted(self.tables.values(), key=lambda t: t.key)
+ sorted(self.tables.values(), key=lambda t: t.key) # type: ignore
)
def reflect(
self,
- bind: Union["Engine", "Connection"],
+ bind: Union[Engine, Connection],
schema: Optional[str] = None,
views: bool = False,
only: Optional[_typing_Sequence[str]] = None,
@@ -4647,7 +4795,7 @@ class MetaData(HasSchemaAttr):
autoload_replace: bool = True,
resolve_fks: bool = True,
**dialect_kwargs: Any,
- ):
+ ) -> None:
r"""Load all available table definitions from the database.
Automatically creates ``Table`` entries in this ``MetaData`` for any
@@ -4748,12 +4896,14 @@ class MetaData(HasSchemaAttr):
if schema is not None:
reflect_opts["schema"] = schema
- available = util.OrderedSet(insp.get_table_names(schema))
+ available: util.OrderedSet[str] = util.OrderedSet(
+ insp.get_table_names(schema)
+ )
if views:
available.update(insp.get_view_names(schema))
if schema is not None:
- available_w_schema = util.OrderedSet(
+ available_w_schema: util.OrderedSet[str] = util.OrderedSet(
["%s.%s" % (schema, name) for name in available]
)
else:
@@ -4796,10 +4946,10 @@ class MetaData(HasSchemaAttr):
def create_all(
self,
- bind: Union["Engine", "Connection"],
+ bind: Union[Engine, Connection, MockConnection],
tables: Optional[_typing_Sequence[Table]] = None,
checkfirst: bool = True,
- ):
+ ) -> None:
"""Create all tables stored in this metadata.
Conditional by default, will not attempt to recreate tables already
@@ -4824,10 +4974,10 @@ class MetaData(HasSchemaAttr):
def drop_all(
self,
- bind: Union["Engine", "Connection"],
+ bind: Union[Engine, Connection, MockConnection],
tables: Optional[_typing_Sequence[Table]] = None,
checkfirst: bool = True,
- ):
+ ) -> None:
"""Drop all tables stored in this metadata.
Conditional by default, will not attempt to drop tables not present in