diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-04-13 09:45:29 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-04-15 10:29:23 -0400 |
commit | c932123bacad9bf047d160b85e3f95d396c513ae (patch) | |
tree | 3f84221c467ff8fba468d7ca78dc4b0c158d8970 /lib/sqlalchemy/sql/schema.py | |
parent | 0bfb620009f668e97ad3c2c25a564ca36428b9ae (diff) | |
download | sqlalchemy-c932123bacad9bf047d160b85e3f95d396c513ae.tar.gz |
pep484: schema API
implement strict typing for schema.py
this module has lots of public API, lots of old decisions
and very hard to follow construction sequences in many
cases, and is also where we get a lot of new feature requests,
so strict typing should help keep things clean.
among improvements here, fixed the pool .info getters
and also figured out how to get ColumnCollection and
related to be covariant so that we may set them up
as returning Column or ColumnClause without any conflicts.
DDL was affected, noting that superclasses of DDLElement
(_DDLCompiles, added recently) can now be passed into
"ddl_if" callables; reorganized ddl into ExecutableDDLElement
as a new name for DDLElement and _DDLCompiles renamed to
BaseDDLElement.
setting up strict also located an API use case that
is completely broken, which is connection.execute(some_default)
returns a scalar value. This case has been deprecated
and new paths have been set up so that connection.scalar()
may be used. This likely wasn't possible in previous
versions because scalar() would assume a CursorResult.
The scalar() change also impacts Session as we have explicit
support (since someone had reported it as a regression)
for session.execute(Sequence()) to work. They will get the
same deprecation message (which omits the word "Connection",
just uses ".execute()" and ".scalar()") and they can then
use Session.scalar() as well. Getting this to type
correctly while still supporting ORM use cases required
some refactoring, and I also set up a keyword only delimeter
for Session.execute() and related as execution_options /
bind_arguments should always be keyword only, applied these
changes to AsyncSession as well.
Additionally simpify Table __init__ now that we are Python
3 only, we can have positional plus explicit kwargs finally.
Simplify Column.__init__ as well again taking advantage
of kw only arguments.
Fill in most/all __init__ methods in sqltypes.py as
the constructor for types is most of the API. should
likely do this for dialect-specific types as well.
Apply _InfoType for all info attributes as should have been
done originally and update descriptor decorators.
Change-Id: I3f9f8ff3f1c8858471ff4545ac83d68c88107527
Diffstat (limited to 'lib/sqlalchemy/sql/schema.py')
-rw-r--r-- | lib/sqlalchemy/sql/schema.py | 1228 |
1 files changed, 769 insertions, 459 deletions
diff --git a/lib/sqlalchemy/sql/schema.py b/lib/sqlalchemy/sql/schema.py index c9b67caca..92b9cc62c 100644 --- a/lib/sqlalchemy/sql/schema.py +++ b/lib/sqlalchemy/sql/schema.py @@ -32,21 +32,22 @@ from __future__ import annotations from abc import ABC import collections +from enum import Enum import operator import typing from typing import Any from typing import Callable from typing import cast from typing import Dict +from typing import Iterable from typing import Iterator from typing import List -from typing import MutableMapping +from typing import NoReturn from typing import Optional from typing import overload from typing import Sequence as _typing_Sequence from typing import Set from typing import Tuple -from typing import Type from typing import TYPE_CHECKING from typing import TypeVar from typing import Union @@ -65,7 +66,6 @@ from .elements import ClauseElement from .elements import ColumnClause from .elements import ColumnElement from .elements import quoted_name -from .elements import SQLCoreOperations from .elements import TextClause from .selectable import TableClause from .type_api import to_instance @@ -75,53 +75,91 @@ from .. import event from .. import exc from .. import inspection from .. import util +from ..util.typing import Final from ..util.typing import Literal from ..util.typing import Protocol +from ..util.typing import Self from ..util.typing import TypeGuard if typing.TYPE_CHECKING: + from ._typing import _DDLColumnArgument + from ._typing import _InfoType + from ._typing import _TextCoercedExpressionArgument + from ._typing import _TypeEngineArgument + from .base import ColumnCollection + from .base import DedupeColumnCollection from .base import ReadOnlyColumnCollection + from .compiler import DDLCompiler + from .elements import BindParameter + from .functions import Function from .type_api import TypeEngine + from .visitors import _TraverseInternalsType + from .visitors import anon_map from ..engine import Connection from ..engine import Engine + from ..engine.cursor import CursorResult + from ..engine.interfaces import _CoreMultiExecuteParams + from ..engine.interfaces import _CoreSingleExecuteParams + from ..engine.interfaces import _ExecuteOptionsParameter from ..engine.interfaces import ExecutionContext from ..engine.mock import MockConnection + from ..sql.selectable import FromClause + _T = TypeVar("_T", bound="Any") +_SI = TypeVar("_SI", bound="SchemaItem") _ServerDefaultType = Union["FetchedValue", str, TextClause, ColumnElement] _TAB = TypeVar("_TAB", bound="Table") -RETAIN_SCHEMA = util.symbol( - "retain_schema" + +_CreateDropBind = Union["Engine", "Connection", "MockConnection"] + + +class SchemaConst(Enum): + + RETAIN_SCHEMA = 1 """Symbol indicating that a :class:`_schema.Table`, :class:`.Sequence` or in some cases a :class:`_schema.ForeignKey` object, in situations where the object is being copied for a :meth:`.Table.to_metadata` operation, should retain the schema name that it already has. """ -) -BLANK_SCHEMA = util.symbol( - "blank_schema", - """Symbol indicating that a :class:`_schema.Table`, :class:`.Sequence` - or in some cases a :class:`_schema.ForeignKey` object + BLANK_SCHEMA = 2 + """Symbol indicating that a :class:`_schema.Table` or :class:`.Sequence` should have 'None' for its schema, even if the parent :class:`_schema.MetaData` has specified a schema. + .. seealso:: + + :paramref:`_schema.MetaData.schema` + + :paramref:`_schema.Table.schema` + + :paramref:`.Sequence.schema` + .. versionadded:: 1.0.14 - """, -) + """ -NULL_UNSPECIFIED = util.symbol( - "NULL_UNSPECIFIED", + NULL_UNSPECIFIED = 3 """Symbol indicating the "nullable" keyword was not passed to a Column. Normally we would expect None to be acceptable for this but some backends such as that of SQL Server place special signficance on a "nullability" value of None. - """, -) + """ + + +RETAIN_SCHEMA: Final[ + Literal[SchemaConst.RETAIN_SCHEMA] +] = SchemaConst.RETAIN_SCHEMA +BLANK_SCHEMA: Final[ + Literal[SchemaConst.BLANK_SCHEMA] +] = SchemaConst.BLANK_SCHEMA +NULL_UNSPECIFIED: Final[ + Literal[SchemaConst.NULL_UNSPECIFIED] +] = SchemaConst.NULL_UNSPECIFIED def _get_table_key(name: str, schema: Optional[str]) -> str: @@ -170,7 +208,7 @@ class SchemaItem(SchemaEventTarget, visitors.Visitable): create_drop_stringify_dialect = "default" - def _init_items(self, *args, **kw): + def _init_items(self, *args: SchemaItem, **kw: Any) -> None: """Initialize the list of child items for this SchemaItem.""" for item in args: if item is not None: @@ -184,11 +222,11 @@ class SchemaItem(SchemaEventTarget, visitors.Visitable): else: spwd(self, **kw) - def __repr__(self): + def __repr__(self) -> str: return util.generic_repr(self, omit_kwarg=["info"]) @util.memoized_property - def info(self): + def info(self) -> _InfoType: """Info dictionary associated with the object, allowing user-defined data to be associated with this :class:`.SchemaItem`. @@ -199,7 +237,7 @@ class SchemaItem(SchemaEventTarget, visitors.Visitable): """ return {} - def _schema_item_copy(self, schema_item): + def _schema_item_copy(self, schema_item: _SI) -> _SI: if "info" in self.__dict__: schema_item.info = self.info.copy() schema_item.dispatch._update(self.dispatch) @@ -235,9 +273,9 @@ class HasConditionalDDL: r"""apply a conditional DDL rule to this schema item. These rules work in a similar manner to the - :meth:`.DDLElement.execute_if` callable, with the added feature that - the criteria may be checked within the DDL compilation phase for a - construct such as :class:`.CreateTable`. + :meth:`.ExecutableDDLElement.execute_if` callable, with the added + feature that the criteria may be checked within the DDL compilation + phase for a construct such as :class:`.CreateTable`. :meth:`.HasConditionalDDL.ddl_if` currently applies towards the :class:`.Index` construct as well as all :class:`.Constraint` constructs. @@ -246,7 +284,8 @@ class HasConditionalDDL: to indicate multiple dialect types. :param callable\_: a callable that is constructed using the same form - as that described in :paramref:`.DDLElement.execute_if.callable_`. + as that described in + :paramref:`.ExecutableDDLElement.execute_if.callable_`. :param state: any arbitrary object that will be passed to the callable, if present. @@ -306,6 +345,8 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause): def foreign_keys(self) -> Set[ForeignKey]: ... + _columns: DedupeColumnCollection[Column[Any]] + constraints: Set[Constraint] """A collection of all :class:`_schema.Constraint` objects associated with this :class:`_schema.Table`. @@ -344,25 +385,30 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause): """ - _traverse_internals = TableClause._traverse_internals + [ - ("schema", InternalTraversal.dp_string) - ] + _traverse_internals: _TraverseInternalsType = ( + TableClause._traverse_internals + + [("schema", InternalTraversal.dp_string)] + ) if TYPE_CHECKING: - # we are upgrading .c and .columns to return Column, not - # ColumnClause. mypy typically sees this as incompatible because - # the contract of TableClause is that we can put a ColumnClause - # into this collection. does not recognize its immutability - # for the moment. + + @util.ro_non_memoized_property + def columns(self) -> ReadOnlyColumnCollection[str, Column[Any]]: + ... + @util.ro_non_memoized_property - def columns(self) -> ReadOnlyColumnCollection[str, Column[Any]]: # type: ignore # noqa: E501 + def exported_columns( + self, + ) -> ReadOnlyColumnCollection[str, Column[Any]]: ... @util.ro_non_memoized_property - def c(self) -> ReadOnlyColumnCollection[str, Column[Any]]: # type: ignore # noqa: E501 + def c(self) -> ReadOnlyColumnCollection[str, Column[Any]]: ... - def _gen_cache_key(self, anon_map, bindparams): + def _gen_cache_key( + self, anon_map: anon_map, bindparams: List[BindParameter[Any]] + ) -> Tuple[Any, ...]: if self._annotations: return (self,) + self._annotations_cache_key else: @@ -382,7 +428,7 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause): return cls._new(*args, **kw) @classmethod - def _new(cls, *args, **kw): + def _new(cls, *args: Any, **kw: Any) -> Any: if not args and not kw: # python3k pickle seems to call this return object.__new__(cls) @@ -429,7 +475,7 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause): table.dispatch.before_parent_attach(table, metadata) metadata._add_table(name, schema, table) try: - table._init(name, metadata, *args, **kw) + table.__init__(name, metadata, *args, _no_init=False, **kw) table.dispatch.after_parent_attach(table, metadata) return table except Exception: @@ -439,10 +485,31 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause): def __init__( self, name: str, - metadata: "MetaData", + metadata: MetaData, *args: SchemaItem, + schema: Optional[Union[str, Literal[SchemaConst.BLANK_SCHEMA]]] = None, + quote: Optional[bool] = None, + quote_schema: Optional[bool] = None, + autoload_with: Optional[Union[Engine, Connection]] = None, + autoload_replace: bool = True, + keep_existing: bool = False, + extend_existing: bool = False, + resolve_fks: bool = True, + include_columns: Optional[Iterable[str]] = None, + implicit_returning: bool = True, + comment: Optional[str] = None, + info: Optional[Dict[Any, Any]] = None, + listeners: Optional[ + _typing_Sequence[Tuple[str, Callable[..., Any]]] + ] = None, + prefixes: Optional[_typing_Sequence[str]] = None, + # used internally in the metadata.reflect() process + _extend_on: Optional[Set[Table]] = None, + # used by __new__ to bypass __init__ + _no_init: bool = True, + # dialect-specific keyword args **kw: Any, - ): + ) -> None: r"""Constructor for :class:`_schema.Table`. @@ -731,24 +798,22 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause): :ref:`dialect_toplevel` for detail on documented arguments. """ # noqa: E501 + if _no_init: + # don't run __init__ from __new__ by default; + # __new__ has a specific place that __init__ is called + return - # __init__ is overridden to prevent __new__ from - # calling the superclass constructor. - - def _init(self, name, metadata, *args, **kwargs): - super(Table, self).__init__( - quoted_name(name, kwargs.pop("quote", None)) - ) + super().__init__(quoted_name(name, quote)) self.metadata = metadata - self.schema = kwargs.pop("schema", None) - if self.schema is None: + if schema is None: self.schema = metadata.schema - elif self.schema is BLANK_SCHEMA: + elif schema is BLANK_SCHEMA: self.schema = None else: - quote_schema = kwargs.pop("quote_schema", None) - self.schema = quoted_name(self.schema, quote_schema) + quote_schema = quote_schema + assert isinstance(schema, str) + self.schema = quoted_name(schema, quote_schema) self.indexes = set() self.constraints = set() @@ -756,42 +821,31 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause): _implicit_generated=True )._set_parent_with_dispatch(self) self.foreign_keys = set() # type: ignore - self._extra_dependencies = set() + self._extra_dependencies: Set[Table] = set() if self.schema is not None: self.fullname = "%s.%s" % (self.schema, self.name) else: self.fullname = self.name - autoload_with = kwargs.pop("autoload_with", None) - autoload = autoload_with is not None - # this argument is only used with _init_existing() - kwargs.pop("autoload_replace", True) - keep_existing = kwargs.pop("keep_existing", False) - extend_existing = kwargs.pop("extend_existing", False) - _extend_on = kwargs.pop("_extend_on", None) - - resolve_fks = kwargs.pop("resolve_fks", True) - include_columns = kwargs.pop("include_columns", None) + self.implicit_returning = implicit_returning - self.implicit_returning = kwargs.pop("implicit_returning", True) + self.comment = comment - self.comment = kwargs.pop("comment", None) + if info is not None: + self.info = info - if "info" in kwargs: - self.info = kwargs.pop("info") - if "listeners" in kwargs: - listeners = kwargs.pop("listeners") + if listeners is not None: for evt, fn in listeners: event.listen(self, evt, fn) - self._prefixes = kwargs.pop("prefixes", None) or [] + self._prefixes = prefixes if prefixes else [] - self._extra_kwargs(**kwargs) + self._extra_kwargs(**kw) # load column definitions from the database if 'autoload' is defined # we do it after the table is in the singleton dictionary to support # circular foreign keys - if autoload: + if autoload_with is not None: self._autoload( metadata, autoload_with, @@ -805,18 +859,20 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause): self._init_items( *args, - allow_replacements=extend_existing or keep_existing or autoload, + allow_replacements=extend_existing + or keep_existing + or autoload_with, ) def _autoload( self, - metadata, - autoload_with, - include_columns, - exclude_columns=(), - resolve_fks=True, - _extend_on=None, - ): + metadata: MetaData, + autoload_with: Union[Engine, Connection], + include_columns: Optional[Iterable[str]], + exclude_columns: Iterable[str] = (), + resolve_fks: bool = True, + _extend_on: Optional[Set[Table]] = None, + ) -> None: insp = inspection.inspect(autoload_with) with insp._inspection_context() as conn_insp: conn_insp.reflect_table( @@ -837,7 +893,7 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause): return sorted(self.constraints, key=lambda c: c._creation_order) @property - def foreign_key_constraints(self): + def foreign_key_constraints(self) -> Set[ForeignKeyConstraint]: """:class:`_schema.ForeignKeyConstraint` objects referred to by this :class:`_schema.Table`. @@ -855,9 +911,13 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause): :attr:`_schema.Table.indexes` """ - return set(fkc.constraint for fkc in self.foreign_keys) + return set( + fkc.constraint + for fkc in self.foreign_keys + if fkc.constraint is not None + ) - def _init_existing(self, *args, **kwargs): + def _init_existing(self, *args: Any, **kwargs: Any) -> None: autoload_with = kwargs.pop("autoload_with", None) autoload = kwargs.pop("autoload", autoload_with is not None) autoload_replace = kwargs.pop("autoload_replace", True) @@ -916,13 +976,13 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause): self._extra_kwargs(**kwargs) self._init_items(*args) - def _extra_kwargs(self, **kwargs): + def _extra_kwargs(self, **kwargs: Any) -> None: self._validate_dialect_kwargs(kwargs) - def _init_collections(self): + def _init_collections(self) -> None: pass - def _reset_exported(self): + def _reset_exported(self) -> None: pass @util.ro_non_memoized_property @@ -930,7 +990,7 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause): return self.primary_key._autoincrement_column @property - def key(self): + def key(self) -> str: """Return the 'key' for this :class:`_schema.Table`. This value is used as the dictionary key within the @@ -943,7 +1003,7 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause): """ return _get_table_key(self.name, self.schema) - def __repr__(self): + def __repr__(self) -> str: return "Table(%s)" % ", ".join( [repr(self.name)] + [repr(self.metadata)] @@ -951,10 +1011,10 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause): + ["%s=%s" % (k, repr(getattr(self, k))) for k in ["schema"]] ) - def __str__(self): + def __str__(self) -> str: return _get_table_key(self.description, self.schema) - def add_is_dependent_on(self, table): + def add_is_dependent_on(self, table: Table) -> None: """Add a 'dependency' for this Table. This is another Table object which must be created @@ -968,7 +1028,9 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause): """ self._extra_dependencies.add(table) - def append_column(self, column, replace_existing=False): + def append_column( + self, column: ColumnClause[Any], replace_existing: bool = False + ) -> None: """Append a :class:`_schema.Column` to this :class:`_schema.Table`. The "key" of the newly added :class:`_schema.Column`, i.e. the @@ -998,7 +1060,7 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause): self, allow_replacements=replace_existing ) - def append_constraint(self, constraint): + def append_constraint(self, constraint: Union[Index, Constraint]) -> None: """Append a :class:`_schema.Constraint` to this :class:`_schema.Table`. @@ -1019,11 +1081,13 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause): constraint._set_parent_with_dispatch(self) - def _set_parent(self, metadata, **kw): + def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None: + metadata = parent + assert isinstance(metadata, MetaData) metadata._add_table(self.name, self.schema, self) self.metadata = metadata - def create(self, bind, checkfirst=False): + def create(self, bind: _CreateDropBind, checkfirst: bool = False) -> None: """Issue a ``CREATE`` statement for this :class:`_schema.Table`, using the given :class:`.Connection` or :class:`.Engine` @@ -1037,7 +1101,7 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause): bind._run_ddl_visitor(ddl.SchemaGenerator, self, checkfirst=checkfirst) - def drop(self, bind, checkfirst=False): + def drop(self, bind: _CreateDropBind, checkfirst: bool = False) -> None: """Issue a ``DROP`` statement for this :class:`_schema.Table`, using the given :class:`.Connection` or :class:`.Engine` for connectivity. @@ -1056,11 +1120,16 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause): ) def tometadata( self, - metadata, - schema=RETAIN_SCHEMA, - referred_schema_fn=None, - name=None, - ): + metadata: MetaData, + schema: Union[str, Literal[SchemaConst.RETAIN_SCHEMA]] = RETAIN_SCHEMA, + referred_schema_fn: Optional[ + Callable[ + [Table, Optional[str], ForeignKeyConstraint, Optional[str]], + Optional[str], + ] + ] = None, + name: Optional[str] = None, + ) -> Table: """Return a copy of this :class:`_schema.Table` associated with a different :class:`_schema.MetaData`. @@ -1077,11 +1146,16 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause): def to_metadata( self, - metadata, - schema=RETAIN_SCHEMA, - referred_schema_fn=None, - name=None, - ): + metadata: MetaData, + schema: Union[str, Literal[SchemaConst.RETAIN_SCHEMA]] = RETAIN_SCHEMA, + referred_schema_fn: Optional[ + Callable[ + [Table, Optional[str], ForeignKeyConstraint, Optional[str]], + Optional[str], + ] + ] = None, + name: Optional[str] = None, + ) -> Table: """Return a copy of this :class:`_schema.Table` associated with a different :class:`_schema.MetaData`. @@ -1163,11 +1237,16 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause): """ if name is None: name = self.name + + actual_schema: Optional[str] + if schema is RETAIN_SCHEMA: - schema = self.schema + actual_schema = self.schema elif schema is None: - schema = metadata.schema - key = _get_table_key(name, schema) + actual_schema = metadata.schema + else: + actual_schema = schema # type: ignore + key = _get_table_key(name, actual_schema) if key in metadata.tables: util.warn( "Table '%s' already exists within the given " @@ -1177,11 +1256,11 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause): args = [] for col in self.columns: - args.append(col._copy(schema=schema)) + args.append(col._copy(schema=actual_schema)) table = Table( name, metadata, - schema=schema, + schema=actual_schema, comment=self.comment, *args, **self.kwargs, @@ -1191,11 +1270,13 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause): referred_schema = const._referred_schema if referred_schema_fn: fk_constraint_schema = referred_schema_fn( - self, schema, const, referred_schema + self, actual_schema, const, referred_schema ) else: fk_constraint_schema = ( - schema if referred_schema == self.schema else None + actual_schema + if referred_schema == self.schema + else None ) table.append_constraint( const._copy( @@ -1209,7 +1290,7 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause): continue table.append_constraint( - const._copy(schema=schema, target_table=table) + const._copy(schema=actual_schema, target_table=table) ) for index in self.indexes: # skip indexes that would be generated @@ -1221,7 +1302,7 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause): unique=index.unique, *[ _copy_expression(expr, self, table) - for expr in index.expressions + for expr in index._table_bound_expressions ], _table=table, **index.kwargs, @@ -1239,101 +1320,137 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]): @overload def __init__( - self: "Column[None]", - __name: str, + self, *args: SchemaEventTarget, - autoincrement: Union[bool, Literal["auto", "ignore_fk"]] = ..., - default: Optional[Any] = ..., - doc: Optional[str] = ..., - key: Optional[str] = ..., - index: Optional[bool] = ..., - info: MutableMapping[Any, Any] = ..., - nullable: bool = ..., - onupdate: Optional[Any] = ..., - primary_key: bool = ..., - server_default: Optional[_ServerDefaultType] = ..., - server_onupdate: Optional["FetchedValue"] = ..., - quote: Optional[bool] = ..., - unique: Optional[bool] = ..., - system: bool = ..., - comment: Optional[str] = ..., - **kwargs: Any, - ) -> None: + autoincrement: Union[bool, Literal["auto", "ignore_fk"]] = "auto", + default: Optional[Any] = None, + doc: Optional[str] = None, + key: Optional[str] = None, + index: Optional[bool] = None, + unique: Optional[bool] = None, + info: Optional[_InfoType] = None, + nullable: Optional[ + Union[bool, Literal[SchemaConst.NULL_UNSPECIFIED]] + ] = NULL_UNSPECIFIED, + onupdate: Optional[Any] = None, + primary_key: bool = False, + server_default: Optional[_ServerDefaultType] = None, + server_onupdate: Optional[FetchedValue] = None, + quote: Optional[bool] = None, + system: bool = False, + comment: Optional[str] = None, + _proxies: Optional[Any] = None, + **dialect_kwargs: Any, + ): ... @overload def __init__( - self: "Column[None]", + self, + __name: str, *args: SchemaEventTarget, - autoincrement: Union[bool, Literal["auto", "ignore_fk"]] = ..., - default: Optional[Any] = ..., - doc: Optional[str] = ..., - key: Optional[str] = ..., - index: Optional[bool] = ..., - info: MutableMapping[Any, Any] = ..., - nullable: bool = ..., - onupdate: Optional[Any] = ..., - primary_key: bool = ..., - server_default: Optional[_ServerDefaultType] = ..., - server_onupdate: Optional["FetchedValue"] = ..., - quote: Optional[bool] = ..., - unique: Optional[bool] = ..., - system: bool = ..., - comment: Optional[str] = ..., - **kwargs: Any, - ) -> None: + autoincrement: Union[bool, Literal["auto", "ignore_fk"]] = "auto", + default: Optional[Any] = None, + doc: Optional[str] = None, + key: Optional[str] = None, + index: Optional[bool] = None, + unique: Optional[bool] = None, + info: Optional[_InfoType] = None, + nullable: Optional[ + Union[bool, Literal[SchemaConst.NULL_UNSPECIFIED]] + ] = NULL_UNSPECIFIED, + onupdate: Optional[Any] = None, + primary_key: bool = False, + server_default: Optional[_ServerDefaultType] = None, + server_onupdate: Optional[FetchedValue] = None, + quote: Optional[bool] = None, + system: bool = False, + comment: Optional[str] = None, + _proxies: Optional[Any] = None, + **dialect_kwargs: Any, + ): ... @overload def __init__( self, - __name: str, - __type: Union[Type["TypeEngine[_T]"], "TypeEngine[_T]"], + __type: _TypeEngineArgument[_T], *args: SchemaEventTarget, - autoincrement: Union[bool, Literal["auto", "ignore_fk"]] = ..., - default: Optional[Any] = ..., - doc: Optional[str] = ..., - key: Optional[str] = ..., - index: Optional[bool] = ..., - info: MutableMapping[Any, Any] = ..., - nullable: bool = ..., - onupdate: Optional[Any] = ..., - primary_key: bool = ..., - server_default: Optional[_ServerDefaultType] = ..., - server_onupdate: Optional["FetchedValue"] = ..., - quote: Optional[bool] = ..., - unique: Optional[bool] = ..., - system: bool = ..., - comment: Optional[str] = ..., - **kwargs: Any, - ) -> None: + autoincrement: Union[bool, Literal["auto", "ignore_fk"]] = "auto", + default: Optional[Any] = None, + doc: Optional[str] = None, + key: Optional[str] = None, + index: Optional[bool] = None, + unique: Optional[bool] = None, + info: Optional[_InfoType] = None, + nullable: Optional[ + Union[bool, Literal[SchemaConst.NULL_UNSPECIFIED]] + ] = NULL_UNSPECIFIED, + onupdate: Optional[Any] = None, + primary_key: bool = False, + server_default: Optional[_ServerDefaultType] = None, + server_onupdate: Optional[FetchedValue] = None, + quote: Optional[bool] = None, + system: bool = False, + comment: Optional[str] = None, + _proxies: Optional[Any] = None, + **dialect_kwargs: Any, + ): ... @overload def __init__( self, - __type: Union[Type["TypeEngine[_T]"], "TypeEngine[_T]"], + __name: str, + __type: _TypeEngineArgument[_T], *args: SchemaEventTarget, - autoincrement: Union[bool, Literal["auto", "ignore_fk"]] = ..., - default: Optional[Any] = ..., - doc: Optional[str] = ..., - key: Optional[str] = ..., - index: Optional[bool] = ..., - info: MutableMapping[Any, Any] = ..., - nullable: bool = ..., - onupdate: Optional[Any] = ..., - primary_key: bool = ..., - server_default: Optional[_ServerDefaultType] = ..., - server_onupdate: Optional["FetchedValue"] = ..., - quote: Optional[bool] = ..., - unique: Optional[bool] = ..., - system: bool = ..., - comment: Optional[str] = ..., - **kwargs: Any, - ) -> None: + autoincrement: Union[bool, Literal["auto", "ignore_fk"]] = "auto", + default: Optional[Any] = None, + doc: Optional[str] = None, + key: Optional[str] = None, + index: Optional[bool] = None, + unique: Optional[bool] = None, + info: Optional[_InfoType] = None, + nullable: Optional[ + Union[bool, Literal[SchemaConst.NULL_UNSPECIFIED]] + ] = NULL_UNSPECIFIED, + onupdate: Optional[Any] = None, + primary_key: bool = False, + server_default: Optional[_ServerDefaultType] = None, + server_onupdate: Optional[FetchedValue] = None, + quote: Optional[bool] = None, + system: bool = False, + comment: Optional[str] = None, + _proxies: Optional[Any] = None, + **dialect_kwargs: Any, + ): ... - def __init__(self, *args: Any, **kwargs: Any): + def __init__( + self, + *args: Union[str, _TypeEngineArgument[_T], SchemaEventTarget], + name: Optional[str] = None, + type_: Optional[_TypeEngineArgument[_T]] = None, + autoincrement: Union[bool, Literal["auto", "ignore_fk"]] = "auto", + default: Optional[Any] = None, + doc: Optional[str] = None, + key: Optional[str] = None, + index: Optional[bool] = None, + unique: Optional[bool] = None, + info: Optional[_InfoType] = None, + nullable: Optional[ + Union[bool, Literal[SchemaConst.NULL_UNSPECIFIED]] + ] = NULL_UNSPECIFIED, + onupdate: Optional[Any] = None, + primary_key: bool = False, + server_default: Optional[_ServerDefaultType] = None, + server_onupdate: Optional[FetchedValue] = None, + quote: Optional[bool] = None, + system: bool = False, + comment: Optional[str] = None, + _proxies: Optional[Any] = None, + **dialect_kwargs: Any, + ): r""" Construct a new ``Column`` object. @@ -1836,8 +1953,6 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]): """ # noqa: E501, RST201, RST202 - name = kwargs.pop("name", None) - type_ = kwargs.pop("type_", None) l_args = list(args) del args @@ -1847,7 +1962,7 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]): raise exc.ArgumentError( "May not pass name positionally and as a keyword." ) - name = l_args.pop(0) + name = l_args.pop(0) # type: ignore if l_args: coltype = l_args[0] @@ -1856,52 +1971,49 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]): raise exc.ArgumentError( "May not pass type_ positionally and as a keyword." ) - type_ = l_args.pop(0) + type_ = l_args.pop(0) # type: ignore if name is not None: - name = quoted_name(name, kwargs.pop("quote", None)) - elif "quote" in kwargs: + name = quoted_name(name, quote) + elif quote is not None: raise exc.ArgumentError( "Explicit 'name' is required when " "sending 'quote' argument" ) - super(Column, self).__init__(name, type_) - self.key = kwargs.pop("key", name) - self.primary_key = primary_key = kwargs.pop("primary_key", False) + # name = None is expected to be an interim state + # note this use case is legacy now that ORM declarative has a + # dedicated "column" construct local to the ORM + super(Column, self).__init__(name, type_) # type: ignore - self._user_defined_nullable = udn = kwargs.pop( - "nullable", NULL_UNSPECIFIED - ) + self.key = key if key is not None else name # type: ignore + self.primary_key = primary_key + + self._user_defined_nullable = udn = nullable if udn is not NULL_UNSPECIFIED: self.nullable = udn else: self.nullable = not primary_key - default = kwargs.pop("default", None) - onupdate = kwargs.pop("onupdate", None) - - self.server_default = kwargs.pop("server_default", None) - self.server_onupdate = kwargs.pop("server_onupdate", None) - # these default to None because .index and .unique is *not* # an informational flag about Column - there can still be an # Index or UniqueConstraint referring to this Column. - self.index = kwargs.pop("index", None) - self.unique = kwargs.pop("unique", None) + self.index = index + self.unique = unique - self.system = kwargs.pop("system", False) - self.doc = kwargs.pop("doc", None) - self.autoincrement = kwargs.pop("autoincrement", "auto") + self.system = system + self.doc = doc + self.autoincrement = autoincrement self.constraints = set() self.foreign_keys = set() - self.comment = kwargs.pop("comment", None) + self.comment = comment self.computed = None self.identity = None # check if this Column is proxying another column - if "_proxies" in kwargs: - self._proxies = kwargs.pop("_proxies") + + if _proxies is not None: + self._proxies = _proxies else: # otherwise, add DDL-related events if isinstance(self.type, SchemaEventTarget): @@ -1928,6 +2040,9 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]): else: self.onpudate = None + self.server_default = server_default + self.server_onupdate = server_onupdate + if self.server_default is not None: if isinstance(self.server_default, FetchedValue): l_args.append(self.server_default._as_for_update(False)) @@ -1941,14 +2056,14 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]): l_args.append( DefaultClause(self.server_onupdate, for_update=True) ) - self._init_items(*l_args) + self._init_items(*cast(_typing_Sequence[SchemaItem], l_args)) util.set_creation_order(self) - if "info" in kwargs: - self.info = kwargs.pop("info") + if info is not None: + self.info = info - self._extra_kwargs(**kwargs) + self._extra_kwargs(**dialect_kwargs) table: Table @@ -1967,7 +2082,7 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]): """ - index: bool + index: Optional[bool] """The value of the :paramref:`_schema.Column.index` parameter. Does not indicate if this :class:`_schema.Column` is actually indexed @@ -1978,7 +2093,7 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]): :attr:`_schema.Table.indexes` """ - unique: bool + unique: Optional[bool] """The value of the :paramref:`_schema.Column.unique` parameter. Does not indicate if this :class:`_schema.Column` is actually subject to @@ -1993,10 +2108,14 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]): """ - def _extra_kwargs(self, **kwargs): + computed: Optional[Computed] + + identity: Optional[Identity] + + def _extra_kwargs(self, **kwargs: Any) -> None: self._validate_dialect_kwargs(kwargs) - def __str__(self): + def __str__(self) -> str: if self.name is None: return "(no name)" elif self.table is not None: @@ -2007,7 +2126,7 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]): else: return self.description - def references(self, column): + def references(self, column: Column[Any]) -> bool: """Return True if this Column references the given column via foreign key.""" @@ -2017,10 +2136,10 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]): else: return False - def append_foreign_key(self, fk): + def append_foreign_key(self, fk: ForeignKey) -> None: fk._set_parent_with_dispatch(self) - def __repr__(self): + def __repr__(self) -> str: kwarg = [] if self.key != self.name: kwarg.append("key") @@ -2051,7 +2170,14 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]): + ["%s=%s" % (k, repr(getattr(self, k))) for k in kwarg] ) - def _set_parent(self, table, allow_replacements=True): + def _set_parent( + self, + parent: SchemaEventTarget, + allow_replacements: bool = True, + **kw: Any, + ) -> None: + table = parent + assert isinstance(table, Table) if not self.name: raise exc.ArgumentError( "Column must be constructed with a non-blank name or " @@ -2071,7 +2197,7 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]): ) if self.key in table._columns: - col = table._columns.get(self.key) + col = table._columns[self.key] if col is not self: if not allow_replacements: util.warn_deprecated( @@ -2139,7 +2265,7 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]): "An column cannot specify both Identity and Sequence." ) - def _setup_on_memoized_fks(self, fn): + def _setup_on_memoized_fks(self, fn: Callable[..., Any]) -> None: fk_keys = [ ((self.table.key, self.key), False), ((self.table.key, self.name), True), @@ -2150,7 +2276,7 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]): if fk.link_to_name is link_to_name: fn(fk) - def _on_table_attach(self, fn): + def _on_table_attach(self, fn: Callable[..., Any]) -> None: if self.table is not None: fn(self, self.table) else: @@ -2161,10 +2287,10 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]): "The :meth:`_schema.Column.copy` method is deprecated " "and will be removed in a future release.", ) - def copy(self, **kw): + def copy(self, **kw: Any) -> Column[Any]: return self._copy(**kw) - def _copy(self, **kw): + def _copy(self, **kw: Any) -> Column[Any]: """Create a copy of this ``Column``, uninitialized. This is used in :meth:`_schema.Table.to_metadata`. @@ -2172,9 +2298,15 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]): """ # Constraint objects plus non-constraint-bound ForeignKey objects - args = [ - c._copy(**kw) for c in self.constraints if not c._type_bound - ] + [c._copy(**kw) for c in self.foreign_keys if not c.constraint] + args: List[SchemaItem] = [ + c._copy(**kw) + for c in self.constraints + if not c._type_bound # type: ignore + ] + [ + c._copy(**kw) # type: ignore + for c in self.foreign_keys + if not c.constraint + ] # ticket #5276 column_kwargs = {} @@ -2223,8 +2355,13 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]): return self._schema_item_copy(c) def _make_proxy( - self, selectable, name=None, key=None, name_is_truncatable=False, **kw - ): + self, + selectable: FromClause, + name: Optional[str] = None, + key: Optional[str] = None, + name_is_truncatable: bool = False, + **kw: Any, + ) -> Tuple[str, ColumnClause[_T]]: """Create a *proxy* for this column. This is a copy of this ``Column`` referenced by a different parent @@ -2272,9 +2409,9 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]): if selectable._is_clone_of is not None: c._is_clone_of = selectable._is_clone_of.columns.get(c.key) if self.primary_key: - selectable.primary_key.add(c) + selectable.primary_key.add(c) # type: ignore if fk: - selectable.foreign_keys.update(fk) + selectable.foreign_keys.update(fk) # type: ignore return c.key, c @@ -2326,17 +2463,17 @@ class ForeignKey(DialectKWArgs, SchemaItem): def __init__( self, - column: Union[str, Column[Any], SQLCoreOperations[Any]], - _constraint: Optional["ForeignKeyConstraint"] = None, + column: _DDLColumnArgument, + _constraint: Optional[ForeignKeyConstraint] = None, use_alter: bool = False, name: Optional[str] = None, onupdate: Optional[str] = None, ondelete: Optional[str] = None, deferrable: Optional[bool] = None, - initially: Optional[bool] = None, + initially: Optional[str] = None, link_to_name: bool = False, match: Optional[str] = None, - info: Optional[Dict[Any, Any]] = None, + info: Optional[_InfoType] = None, **dialect_kw: Any, ): r""" @@ -2446,7 +2583,7 @@ class ForeignKey(DialectKWArgs, SchemaItem): self.info = info self._unvalidated_dialect_kw = dialect_kw - def __repr__(self): + def __repr__(self) -> str: return "ForeignKey(%r)" % self._get_colspec() @util.deprecated( @@ -2454,10 +2591,10 @@ class ForeignKey(DialectKWArgs, SchemaItem): "The :meth:`_schema.ForeignKey.copy` method is deprecated " "and will be removed in a future release.", ) - def copy(self, schema=None, **kw): + def copy(self, *, schema: Optional[str] = None, **kw: Any) -> ForeignKey: return self._copy(schema=schema, **kw) - def _copy(self, schema=None, **kw): + def _copy(self, *, schema: Optional[str] = None, **kw: Any) -> ForeignKey: """Produce a copy of this :class:`_schema.ForeignKey` object. The new :class:`_schema.ForeignKey` will not be bound @@ -2487,7 +2624,17 @@ class ForeignKey(DialectKWArgs, SchemaItem): ) return self._schema_item_copy(fk) - def _get_colspec(self, schema=None, table_name=None, _is_copy=False): + def _get_colspec( + self, + schema: Optional[ + Union[ + str, + Literal[SchemaConst.RETAIN_SCHEMA, SchemaConst.BLANK_SCHEMA], + ] + ] = None, + table_name: Optional[str] = None, + _is_copy: bool = False, + ) -> str: """Return a string based 'column specification' for this :class:`_schema.ForeignKey`. @@ -2523,13 +2670,14 @@ class ForeignKey(DialectKWArgs, SchemaItem): self._table_column.key, ) else: + assert isinstance(self._colspec, str) return self._colspec @property - def _referred_schema(self): + def _referred_schema(self) -> Optional[str]: return self._column_tokens[0] - def _table_key(self): + def _table_key(self) -> Any: if self._table_column is not None: if self._table_column.table is None: return None @@ -2541,16 +2689,16 @@ class ForeignKey(DialectKWArgs, SchemaItem): target_fullname = property(_get_colspec) - def references(self, table): + def references(self, table: Table) -> bool: """Return True if the given :class:`_schema.Table` is referenced by this :class:`_schema.ForeignKey`.""" return table.corresponding_column(self.column) is not None - def get_referent(self, table): + def get_referent(self, table: FromClause) -> Optional[Column[Any]]: """Return the :class:`_schema.Column` in the given - :class:`_schema.Table` + :class:`_schema.Table` (or any :class:`.FromClause`) referenced by this :class:`_schema.ForeignKey`. Returns None if this :class:`_schema.ForeignKey` @@ -2559,10 +2707,10 @@ class ForeignKey(DialectKWArgs, SchemaItem): """ - return table.corresponding_column(self.column) + return table.columns.corresponding_column(self.column) @util.memoized_property - def _column_tokens(self): + def _column_tokens(self) -> Tuple[Optional[str], str, Optional[str]]: """parse a string-based _colspec into its component parts.""" m = self._get_colspec().split(".") @@ -2592,7 +2740,7 @@ class ForeignKey(DialectKWArgs, SchemaItem): schema = None return schema, tname, colname - def _resolve_col_tokens(self): + def _resolve_col_tokens(self) -> Tuple[Table, str, Optional[str]]: if self.parent is None: raise exc.InvalidRequestError( "this ForeignKey object does not yet have a " @@ -2627,7 +2775,9 @@ class ForeignKey(DialectKWArgs, SchemaItem): tablekey = _get_table_key(tname, schema) return parenttable, tablekey, colname - def _link_to_col_by_colstring(self, parenttable, table, colname): + def _link_to_col_by_colstring( + self, parenttable: Table, table: Table, colname: Optional[str] + ) -> Column[Any]: _column = None if colname is None: # colname is None in the case that ForeignKey argument @@ -2661,7 +2811,7 @@ class ForeignKey(DialectKWArgs, SchemaItem): return _column - def _set_target_column(self, column): + def _set_target_column(self, column: Column[Any]) -> None: assert self.parent is not None # propagate TypeEngine to parent if it didn't have one @@ -2671,16 +2821,16 @@ class ForeignKey(DialectKWArgs, SchemaItem): # super-edgy case, if other FKs point to our column, # they'd get the type propagated out also. - def set_type(fk): + def set_type(fk: ForeignKey) -> None: if fk.parent.type._isnull: fk.parent.type = column.type self.parent._setup_on_memoized_fks(set_type) - self.column = column + self.column = column # type: ignore - @util.memoized_property - def column(self): + @util.ro_memoized_property + def column(self) -> Column[Any]: """Return the target :class:`_schema.Column` referenced by this :class:`_schema.ForeignKey`. @@ -2689,6 +2839,8 @@ class ForeignKey(DialectKWArgs, SchemaItem): """ + _column: Column[Any] + if isinstance(self._colspec, str): parenttable, tablekey, colname = self._resolve_col_tokens() @@ -2730,14 +2882,14 @@ class ForeignKey(DialectKWArgs, SchemaItem): self.parent.foreign_keys.add(self) self.parent._on_table_attach(self._set_table) - def _set_remote_table(self, table): + def _set_remote_table(self, table: Table) -> None: parenttable, _, colname = self._resolve_col_tokens() _column = self._link_to_col_by_colstring(parenttable, table, colname) self._set_target_column(_column) assert self.constraint is not None self.constraint._validate_dest_table(table) - def _remove_from_metadata(self, metadata): + def _remove_from_metadata(self, metadata: MetaData) -> None: parenttable, table_key, colname = self._resolve_col_tokens() fk_key = (table_key, colname) @@ -2745,7 +2897,7 @@ class ForeignKey(DialectKWArgs, SchemaItem): # TODO: no test coverage for self not in memos metadata._fk_memos[fk_key].remove(self) - def _set_table(self, column, table): + def _set_table(self, column: Column[Any], table: Table) -> None: # standalone ForeignKey - create ForeignKeyConstraint # on the hosting Table when attached to the Table. assert isinstance(table, Table) @@ -2821,6 +2973,7 @@ class DefaultGenerator(Executable, SchemaItem): __visit_name__ = "default_generator" + _is_default_generator = True is_sequence = False is_server_default = False is_clause_element = False @@ -2828,7 +2981,7 @@ class DefaultGenerator(Executable, SchemaItem): is_scalar = False column: Optional[Column[Any]] - def __init__(self, for_update=False): + def __init__(self, for_update: bool = False) -> None: self.for_update = for_update def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None: @@ -2841,8 +2994,27 @@ class DefaultGenerator(Executable, SchemaItem): self.column.default = self def _execute_on_connection( - self, connection, distilled_params, execution_options - ): + self, + connection: Connection, + distilled_params: _CoreMultiExecuteParams, + execution_options: _ExecuteOptionsParameter, + ) -> Any: + util.warn_deprecated( + "Using the .execute() method to invoke a " + "DefaultGenerator object is deprecated; please use " + "the .scalar() method.", + "2.0", + ) + return self._execute_on_scalar( + connection, distilled_params, execution_options + ) + + def _execute_on_scalar( + self, + connection: Connection, + distilled_params: _CoreMultiExecuteParams, + execution_options: _ExecuteOptionsParameter, + ) -> Any: return connection._execute_default( self, distilled_params, execution_options ) @@ -2933,7 +3105,7 @@ class ColumnDefault(DefaultGenerator, ABC): return object.__new__(cls) - def __repr__(self): + def __repr__(self) -> str: return f"{self.__class__.__name__}({self.arg!r})" @@ -2946,7 +3118,7 @@ class ScalarElementColumnDefault(ColumnDefault): is_scalar = True - def __init__(self, arg: Any, for_update: bool = False): + def __init__(self, arg: Any, for_update: bool = False) -> None: self.for_update = for_update self.arg = arg @@ -2970,13 +3142,13 @@ class ColumnElementColumnDefault(ColumnDefault): self, arg: _SQLExprDefault, for_update: bool = False, - ): + ) -> None: self.for_update = for_update self.arg = arg @util.memoized_property @util.preload_module("sqlalchemy.sql.sqltypes") - def _arg_is_typed(self): + def _arg_is_typed(self) -> bool: sqltypes = util.preloaded.sql_sqltypes return not isinstance(self.arg.type, sqltypes.NullType) @@ -3001,7 +3173,7 @@ class CallableColumnDefault(ColumnDefault): self, arg: Union[_CallableColumnDefaultProtocol, Callable[[], Any]], for_update: bool = False, - ): + ) -> None: self.for_update = for_update self.arg = self._maybe_wrap_callable(arg) @@ -3048,16 +3220,16 @@ class IdentityOptions: def __init__( self, - start=None, - increment=None, - minvalue=None, - maxvalue=None, - nominvalue=None, - nomaxvalue=None, - cycle=None, - cache=None, - order=None, - ): + start: Optional[int] = None, + increment: Optional[int] = None, + minvalue: Optional[int] = None, + maxvalue: Optional[int] = None, + nominvalue: Optional[bool] = None, + nomaxvalue: Optional[bool] = None, + cycle: Optional[bool] = None, + cache: Optional[bool] = None, + order: Optional[bool] = None, + ) -> None: """Construct a :class:`.IdentityOptions` object. See the :class:`.Sequence` documentation for a complete description @@ -3125,28 +3297,29 @@ class Sequence(HasSchemaAttr, IdentityOptions, DefaultGenerator): is_sequence = True - column: Optional[Column[Any]] = None + column: Optional[Column[Any]] + data_type: Optional[TypeEngine[int]] def __init__( self, - name, - start=None, - increment=None, - minvalue=None, - maxvalue=None, - nominvalue=None, - nomaxvalue=None, - cycle=None, - schema=None, - cache=None, - order=None, - data_type=None, - optional=False, - quote=None, - metadata=None, - quote_schema=None, - for_update=False, - ): + name: str, + start: Optional[int] = None, + increment: Optional[int] = None, + minvalue: Optional[int] = None, + maxvalue: Optional[int] = None, + nominvalue: Optional[bool] = None, + nomaxvalue: Optional[bool] = None, + cycle: Optional[bool] = None, + schema: Optional[Union[str, Literal[SchemaConst.BLANK_SCHEMA]]] = None, + cache: Optional[bool] = None, + order: Optional[bool] = None, + data_type: Optional[_TypeEngineArgument[int]] = None, + optional: bool = False, + quote: Optional[bool] = None, + metadata: Optional[MetaData] = None, + quote_schema: Optional[bool] = None, + for_update: bool = False, + ) -> None: """Construct a :class:`.Sequence` object. :param name: the name of the sequence. @@ -3298,6 +3471,7 @@ class Sequence(HasSchemaAttr, IdentityOptions, DefaultGenerator): cache=cache, order=order, ) + self.column = None self.name = quoted_name(name, quote) self.optional = optional if schema is BLANK_SCHEMA: @@ -3316,7 +3490,7 @@ class Sequence(HasSchemaAttr, IdentityOptions, DefaultGenerator): self.data_type = None @util.preload_module("sqlalchemy.sql.functions") - def next_value(self): + def next_value(self) -> Function[int]: """Return a :class:`.next_value` function element which will render the appropriate increment function for this :class:`.Sequence` within any SQL expression. @@ -3324,28 +3498,30 @@ class Sequence(HasSchemaAttr, IdentityOptions, DefaultGenerator): """ return util.preloaded.sql_functions.func.next_value(self) - def _set_parent(self, column, **kw): + def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None: + column = parent + assert isinstance(column, Column) super(Sequence, self)._set_parent(column) column._on_table_attach(self._set_table) - def _set_table(self, column, table): + def _set_table(self, column: Column[Any], table: Table) -> None: self._set_metadata(table.metadata) - def _set_metadata(self, metadata): + def _set_metadata(self, metadata: MetaData) -> None: self.metadata = metadata self.metadata._sequences[self._key] = self - def create(self, bind, checkfirst=True): + def create(self, bind: _CreateDropBind, checkfirst: bool = True) -> None: """Creates this sequence in the database.""" bind._run_ddl_visitor(ddl.SchemaGenerator, self, checkfirst=checkfirst) - def drop(self, bind, checkfirst=True): + def drop(self, bind: _CreateDropBind, checkfirst: bool = True) -> None: """Drops this sequence from the database.""" bind._run_ddl_visitor(ddl.SchemaDropper, self, checkfirst=checkfirst) - def _not_a_column_expr(self): + def _not_a_column_expr(self) -> NoReturn: raise exc.InvalidRequestError( "This %s cannot be used directly " "as a column expression. Use func.next_value(sequence) " @@ -3380,30 +3556,34 @@ class FetchedValue(SchemaEventTarget): has_argument = False is_clause_element = False - def __init__(self, for_update=False): + column: Optional[Column[Any]] + + def __init__(self, for_update: bool = False) -> None: self.for_update = for_update - def _as_for_update(self, for_update): + def _as_for_update(self, for_update: bool) -> FetchedValue: if for_update == self.for_update: return self else: - return self._clone(for_update) + return self._clone(for_update) # type: ignore - def _clone(self, for_update): + def _clone(self, for_update: bool) -> Any: n = self.__class__.__new__(self.__class__) n.__dict__.update(self.__dict__) n.__dict__.pop("column", None) n.for_update = for_update return n - def _set_parent(self, column, **kw): + def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None: + column = parent + assert isinstance(column, Column) self.column = column if self.for_update: self.column.server_onupdate = self else: self.column.server_default = self - def __repr__(self): + def __repr__(self) -> str: return util.generic_repr(self) @@ -3431,13 +3611,18 @@ class DefaultClause(FetchedValue): has_argument = True - def __init__(self, arg, for_update=False, _reflected=False): + def __init__( + self, + arg: Union[str, ClauseElement, TextClause], + for_update: bool = False, + _reflected: bool = False, + ) -> None: util.assert_arg_type(arg, (str, ClauseElement, TextClause), "arg") super(DefaultClause, self).__init__(for_update) self.arg = arg self.reflected = _reflected - def __repr__(self): + def __repr__(self) -> str: return "DefaultClause(%r, for_update=%r)" % (self.arg, self.for_update) @@ -3460,14 +3645,14 @@ class Constraint(DialectKWArgs, HasConditionalDDL, SchemaItem): def __init__( self, - name=None, - deferrable=None, - initially=None, - _create_rule=None, - info=None, - _type_bound=False, - **dialect_kw, - ): + name: Optional[str] = None, + deferrable: Optional[bool] = None, + initially: Optional[str] = None, + info: Optional[_InfoType] = None, + _create_rule: Optional[Any] = None, + _type_bound: bool = False, + **dialect_kw: Any, + ) -> None: r"""Create a SQL constraint. :param name: @@ -3510,7 +3695,9 @@ class Constraint(DialectKWArgs, HasConditionalDDL, SchemaItem): util.set_creation_order(self) self._validate_dialect_kwargs(dialect_kw) - def _should_create_for_compiler(self, compiler, **kw): + def _should_create_for_compiler( + self, compiler: DDLCompiler, **kw: Any + ) -> bool: if self._create_rule is not None and not self._create_rule(compiler): return False elif self._ddl_if is not None: @@ -3521,7 +3708,7 @@ class Constraint(DialectKWArgs, HasConditionalDDL, SchemaItem): return True @property - def table(self): + def table(self) -> Table: try: if isinstance(self.parent, Table): return self.parent @@ -3532,7 +3719,8 @@ class Constraint(DialectKWArgs, HasConditionalDDL, SchemaItem): "mean to call table.append_constraint(constraint) ?" ) - def _set_parent(self, parent, **kw): + def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None: + assert isinstance(parent, (Table, Column)) self.parent = parent parent.constraints.add(self) @@ -3541,10 +3729,10 @@ class Constraint(DialectKWArgs, HasConditionalDDL, SchemaItem): "The :meth:`_schema.Constraint.copy` method is deprecated " "and will be removed in a future release.", ) - def copy(self, **kw): - return self._copy(**kw) + def copy(self: Self, **kw: Any) -> Self: + return self._copy(**kw) # type: ignore - def _copy(self, **kw): + def _copy(self: Self, **kw: Any) -> Self: raise NotImplementedError() @@ -3561,6 +3749,8 @@ class ColumnCollectionMixin: _allow_multiple_tables = False + _pending_colargs: List[Optional[Union[str, Column[Any]]]] + if TYPE_CHECKING: def _set_parent_with_dispatch( @@ -3568,18 +3758,28 @@ class ColumnCollectionMixin: ) -> None: ... - def __init__(self, *columns, **kw): - _autoattach = kw.pop("_autoattach", True) - self._column_flag = kw.pop("_column_flag", False) + def __init__( + self, + *columns: _DDLColumnArgument, + _autoattach: bool = True, + _column_flag: bool = False, + _gather_expressions: Optional[ + List[Union[str, ColumnElement[Any]]] + ] = None, + ) -> None: + self._column_flag = _column_flag self._columns = DedupeColumnCollection() - processed_expressions = kw.pop("_gather_expressions", None) + processed_expressions: Optional[ + List[Union[ColumnElement[Any], str]] + ] = _gather_expressions + if processed_expressions is not None: self._pending_colargs = [] for ( expr, - column, - strname, + _, + _, add_element, ) in coercions.expect_col_expression_collection( roles.DDLConstraintColumnRole, columns @@ -3595,7 +3795,7 @@ class ColumnCollectionMixin: if _autoattach and self._pending_colargs: self._check_attach() - def _check_attach(self, evt=False): + def _check_attach(self, evt: bool = False) -> None: col_objs = [c for c in self._pending_colargs if isinstance(c, Column)] cols_w_table = [c for c in col_objs if isinstance(c.table, Table)] @@ -3613,7 +3813,7 @@ class ColumnCollectionMixin: ).difference(col_objs) if not has_string_cols: - def _col_attached(column, table): + def _col_attached(column: Column[Any], table: Table) -> None: # this isinstance() corresponds with the # isinstance() above; only want to count Table-bound # columns @@ -3652,15 +3852,24 @@ class ColumnCollectionMixin: def c(self) -> ReadOnlyColumnCollection[str, Column[Any]]: return self._columns.as_readonly() - def _col_expressions(self, table: Table) -> List[Column[Any]]: - return [ - table.c[col] if isinstance(col, str) else col - for col in self._pending_colargs - ] + def _col_expressions( + self, parent: Union[Table, Column[Any]] + ) -> List[Optional[Column[Any]]]: + if isinstance(parent, Column): + result: List[Optional[Column[Any]]] = [ + c for c in self._pending_colargs if isinstance(c, Column) + ] + assert len(result) == len(self._pending_colargs) + return result + else: + return [ + parent.c[col] if isinstance(col, str) else col + for col in self._pending_colargs + ] def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None: - if TYPE_CHECKING: - assert isinstance(parent, Table) + assert isinstance(parent, (Table, Column)) + for col in self._col_expressions(parent): if col is not None: self._columns.add(col) @@ -3669,7 +3878,18 @@ class ColumnCollectionMixin: class ColumnCollectionConstraint(ColumnCollectionMixin, Constraint): """A constraint that proxies a ColumnCollection.""" - def __init__(self, *columns, **kw): + def __init__( + self, + *columns: _DDLColumnArgument, + name: Optional[str] = None, + deferrable: Optional[bool] = None, + initially: Optional[str] = None, + info: Optional[_InfoType] = None, + _autoattach: bool = True, + _column_flag: bool = False, + _gather_expressions: Optional[List[_DDLColumnArgument]] = None, + **dialect_kw: Any, + ) -> None: r""" :param \*columns: A sequence of column names or Column objects. @@ -3685,13 +3905,19 @@ class ColumnCollectionConstraint(ColumnCollectionMixin, Constraint): Optional string. If set, emit INITIALLY <value> when issuing DDL for this constraint. - :param \**kw: other keyword arguments including dialect-specific - arguments are propagated to the :class:`.Constraint` superclass. + :param \**dialect_kw: other keyword arguments including + dialect-specific arguments are propagated to the :class:`.Constraint` + superclass. """ - _autoattach = kw.pop("_autoattach", True) - _column_flag = kw.pop("_column_flag", False) - Constraint.__init__(self, **kw) + Constraint.__init__( + self, + name=name, + deferrable=deferrable, + initially=initially, + info=info, + **dialect_kw, + ) ColumnCollectionMixin.__init__( self, *columns, _autoattach=_autoattach, _column_flag=_column_flag ) @@ -3702,11 +3928,12 @@ class ColumnCollectionConstraint(ColumnCollectionMixin, Constraint): """ - def _set_parent(self, table, **kw): - Constraint._set_parent(self, table) - ColumnCollectionMixin._set_parent(self, table) + def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None: + assert isinstance(parent, (Column, Table)) + Constraint._set_parent(self, parent) + ColumnCollectionMixin._set_parent(self, parent) - def __contains__(self, x): + def __contains__(self, x: Any) -> bool: return x in self._columns @util.deprecated( @@ -3714,10 +3941,20 @@ class ColumnCollectionConstraint(ColumnCollectionMixin, Constraint): "The :meth:`_schema.ColumnCollectionConstraint.copy` method " "is deprecated and will be removed in a future release.", ) - def copy(self, target_table=None, **kw): + def copy( + self, + *, + target_table: Optional[Table] = None, + **kw: Any, + ) -> ColumnCollectionConstraint: return self._copy(target_table=target_table, **kw) - def _copy(self, target_table=None, **kw): + def _copy( + self, + *, + target_table: Optional[Table] = None, + **kw: Any, + ) -> ColumnCollectionConstraint: # ticket #5276 constraint_kwargs = {} for dialect_name in self.dialect_options: @@ -3730,6 +3967,7 @@ class ColumnCollectionConstraint(ColumnCollectionMixin, Constraint): dialect_name + "_" + dialect_option_key ] = dialect_option_value + assert isinstance(self.parent, Table) c = self.__class__( name=self.name, deferrable=self.deferrable, @@ -3742,7 +3980,7 @@ class ColumnCollectionConstraint(ColumnCollectionMixin, Constraint): ) return self._schema_item_copy(c) - def contains_column(self, col): + def contains_column(self, col: Column[Any]) -> bool: """Return True if this constraint contains the given column. Note that this object also contains an attribute ``.columns`` @@ -3777,17 +4015,17 @@ class CheckConstraint(ColumnCollectionConstraint): ) def __init__( self, - sqltext, - name=None, - deferrable=None, - initially=None, - table=None, - info=None, - _create_rule=None, - _autoattach=True, - _type_bound=False, - **kw, - ): + sqltext: _TextCoercedExpressionArgument[Any], + name: Optional[str] = None, + deferrable: Optional[bool] = None, + initially: Optional[str] = None, + table: Optional[Table] = None, + info: Optional[_InfoType] = None, + _create_rule: Optional[Any] = None, + _autoattach: bool = True, + _type_bound: bool = False, + **dialect_kw: Any, + ) -> None: r"""Construct a CHECK constraint. :param sqltext: @@ -3821,7 +4059,7 @@ class CheckConstraint(ColumnCollectionConstraint): columns: List[Column[Any]] = [] visitors.traverse(self.sqltext, {}, {"column": columns.append}) - super(CheckConstraint, self).__init__( + super().__init__( name=name, deferrable=deferrable, initially=initially, @@ -3830,13 +4068,13 @@ class CheckConstraint(ColumnCollectionConstraint): _type_bound=_type_bound, _autoattach=_autoattach, *columns, - **kw, + **dialect_kw, ) if table is not None: self._set_parent_with_dispatch(table) @property - def is_column_level(self): + def is_column_level(self) -> bool: return not isinstance(self.parent, Table) @util.deprecated( @@ -3844,10 +4082,14 @@ class CheckConstraint(ColumnCollectionConstraint): "The :meth:`_schema.CheckConstraint.copy` method is deprecated " "and will be removed in a future release.", ) - def copy(self, target_table=None, **kw): + def copy( + self, *, target_table: Optional[Table] = None, **kw: Any + ) -> CheckConstraint: return self._copy(target_table=target_table, **kw) - def _copy(self, target_table=None, **kw): + def _copy( + self, *, target_table: Optional[Table] = None, **kw: Any + ) -> CheckConstraint: if target_table is not None: # note that target_table is None for the copy process of # a column-bound CheckConstraint, so this path is not reached @@ -3886,20 +4128,20 @@ class ForeignKeyConstraint(ColumnCollectionConstraint): def __init__( self, - columns, - refcolumns, - name=None, - onupdate=None, - ondelete=None, - deferrable=None, - initially=None, - use_alter=False, - link_to_name=False, - match=None, - table=None, - info=None, - **dialect_kw, - ): + columns: _typing_Sequence[_DDLColumnArgument], + refcolumns: _typing_Sequence[_DDLColumnArgument], + name: Optional[str] = None, + onupdate: Optional[str] = None, + ondelete: Optional[str] = None, + deferrable: Optional[bool] = None, + initially: Optional[str] = None, + use_alter: bool = False, + link_to_name: bool = False, + match: Optional[str] = None, + table: Optional[Table] = None, + info: Optional[_InfoType] = None, + **dialect_kw: Any, + ) -> None: r"""Construct a composite-capable FOREIGN KEY. :param columns: A sequence of local column names. The named columns @@ -4051,19 +4293,19 @@ class ForeignKeyConstraint(ColumnCollectionConstraint): """ @property - def _elements(self): + def _elements(self) -> util.OrderedDict[str, ForeignKey]: # legacy - provide a dictionary view of (column_key, fk) return util.OrderedDict(zip(self.column_keys, self.elements)) @property - def _referred_schema(self): + def _referred_schema(self) -> Optional[str]: for elem in self.elements: return elem._referred_schema else: return None @property - def referred_table(self): + def referred_table(self) -> Table: """The :class:`_schema.Table` object to which this :class:`_schema.ForeignKeyConstraint` references. @@ -4076,7 +4318,7 @@ class ForeignKeyConstraint(ColumnCollectionConstraint): """ return self.elements[0].column.table - def _validate_dest_table(self, table): + def _validate_dest_table(self, table: Table) -> None: table_keys = set([elem._table_key() for elem in self.elements]) if None not in table_keys and len(table_keys) > 1: elem0, elem1 = sorted(table_keys)[0:2] @@ -4087,7 +4329,7 @@ class ForeignKeyConstraint(ColumnCollectionConstraint): ) @property - def column_keys(self): + def column_keys(self) -> _typing_Sequence[str]: """Return a list of string keys representing the local columns in this :class:`_schema.ForeignKeyConstraint`. @@ -4108,10 +4350,12 @@ class ForeignKeyConstraint(ColumnCollectionConstraint): ] @property - def _col_description(self): + def _col_description(self) -> str: return ", ".join(self.column_keys) - def _set_parent(self, table, **kw): + def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None: + table = parent + assert isinstance(table, Table) Constraint._set_parent(self, table) try: @@ -4134,10 +4378,22 @@ class ForeignKeyConstraint(ColumnCollectionConstraint): "The :meth:`_schema.ForeignKeyConstraint.copy` method is deprecated " "and will be removed in a future release.", ) - def copy(self, schema=None, target_table=None, **kw): + def copy( + self, + *, + schema: Optional[str] = None, + target_table: Optional[Table] = None, + **kw: Any, + ) -> ForeignKeyConstraint: return self._copy(schema=schema, target_table=target_table, **kw) - def _copy(self, schema=None, target_table=None, **kw): + def _copy( + self, + *, + schema: Optional[str] = None, + target_table: Optional[Table] = None, + **kw: Any, + ) -> ForeignKeyConstraint: fkc = ForeignKeyConstraint( [x.parent.key for x in self.elements], [ @@ -4241,16 +4497,34 @@ class PrimaryKeyConstraint(ColumnCollectionConstraint): __visit_name__ = "primary_key_constraint" - def __init__(self, *columns, **kw): - self._implicit_generated = kw.pop("_implicit_generated", False) - super(PrimaryKeyConstraint, self).__init__(*columns, **kw) + def __init__( + self, + *columns: _DDLColumnArgument, + name: Optional[str] = None, + deferrable: Optional[bool] = None, + initially: Optional[str] = None, + info: Optional[_InfoType] = None, + _implicit_generated: bool = False, + **dialect_kw: Any, + ) -> None: + self._implicit_generated = _implicit_generated + super(PrimaryKeyConstraint, self).__init__( + *columns, + name=name, + deferrable=deferrable, + initially=initially, + info=info, + **dialect_kw, + ) - def _set_parent(self, table, **kw): + def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None: + table = parent + assert isinstance(table, Table) super(PrimaryKeyConstraint, self)._set_parent(table) if table.primary_key is not self: table.constraints.discard(table.primary_key) - table.primary_key = self + table.primary_key = self # type: ignore table.constraints.add(self) table_pks = [c for c in table.c if c.primary_key] @@ -4280,7 +4554,7 @@ class PrimaryKeyConstraint(ColumnCollectionConstraint): if table_pks: self._columns.extend(table_pks) - def _reload(self, columns): + def _reload(self, columns: Iterable[Column[Any]]) -> None: """repopulate this :class:`.PrimaryKeyConstraint` given a set of columns. @@ -4309,14 +4583,14 @@ class PrimaryKeyConstraint(ColumnCollectionConstraint): PrimaryKeyConstraint._autoincrement_column._reset(self) # type: ignore self._set_parent_with_dispatch(self.table) - def _replace(self, col): + def _replace(self, col: Column[Any]) -> None: PrimaryKeyConstraint._autoincrement_column._reset(self) # type: ignore self._columns.replace(col) self.dispatch._sa_event_column_added_to_pk_constraint(self, col) @property - def columns_autoinc_first(self): + def columns_autoinc_first(self) -> List[Column[Any]]: autoinc = self._autoincrement_column if autoinc is not None: @@ -4326,7 +4600,7 @@ class PrimaryKeyConstraint(ColumnCollectionConstraint): @util.ro_memoized_property def _autoincrement_column(self) -> Optional[Column[Any]]: - def _validate_autoinc(col, autoinc_true): + def _validate_autoinc(col: Column[Any], autoinc_true: bool) -> bool: if col.type._type_affinity is None or not issubclass( col.type._type_affinity, type_api.INTEGERTYPE._type_affinity ): @@ -4478,7 +4752,21 @@ class Index( __visit_name__ = "index" - def __init__(self, name, *expressions, **kw): + table: Optional[Table] + expressions: _typing_Sequence[Union[str, ColumnElement[Any]]] + _table_bound_expressions: _typing_Sequence[ColumnElement[Any]] + + def __init__( + self, + name: Optional[str], + *expressions: _DDLColumnArgument, + unique: bool = False, + quote: Optional[bool] = None, + info: Optional[_InfoType] = None, + _table: Optional[Table] = None, + _column_flag: bool = False, + **dialect_kw: Any, + ) -> None: r"""Construct an index object. :param name: @@ -4503,8 +4791,8 @@ class Index( .. versionadded:: 1.0.0 - :param \**kw: Additional keyword arguments not mentioned above are - dialect specific, and passed in the form + :param \**dialect_kw: Additional keyword arguments not mentioned above + are dialect specific, and passed in the form ``<dialectname>_<argname>``. See the documentation regarding an individual dialect at :ref:`dialect_toplevel` for detail on documented arguments. @@ -4512,20 +4800,19 @@ class Index( """ self.table = table = None - self.name = quoted_name.construct(name, kw.pop("quote", None)) - self.unique = kw.pop("unique", False) - _column_flag = kw.pop("_column_flag", False) - if "info" in kw: - self.info = kw.pop("info") + self.name = quoted_name.construct(name, quote) + self.unique = unique + if info is not None: + self.info = info # TODO: consider "table" argument being public, but for # the purpose of the fix here, it starts as private. - if "_table" in kw: - table = kw.pop("_table") + if _table is not None: + table = _table - self._validate_dialect_kwargs(kw) + self._validate_dialect_kwargs(dialect_kw) - self.expressions: List[ColumnElement[Any]] = [] + self.expressions = [] # will call _set_parent() if table-bound column # objects are present ColumnCollectionMixin.__init__( @@ -4534,11 +4821,12 @@ class Index( _column_flag=_column_flag, _gather_expressions=self.expressions, ) - if table is not None: self._set_parent(table) - def _set_parent(self, table, **kw): + def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None: + table = parent + assert isinstance(table, Table) ColumnCollectionMixin._set_parent(self, table) if self.table is not None and table is not self.table: @@ -4553,12 +4841,18 @@ class Index( expressions = self.expressions col_expressions = self._col_expressions(table) assert len(expressions) == len(col_expressions) - self.expressions = [ - expr if isinstance(expr, ClauseElement) else colexpr - for expr, colexpr in zip(expressions, col_expressions) - ] - def create(self, bind, checkfirst=False): + exprs = [] + for expr, colexpr in zip(expressions, col_expressions): + if isinstance(expr, ClauseElement): + exprs.append(expr) + elif colexpr is not None: + exprs.append(colexpr) + else: + assert False + self.expressions = self._table_bound_expressions = exprs + + def create(self, bind: _CreateDropBind, checkfirst: bool = False) -> None: """Issue a ``CREATE`` statement for this :class:`.Index`, using the given :class:`.Connection` or :class:`.Engine`` for connectivity. @@ -4569,9 +4863,8 @@ class Index( """ bind._run_ddl_visitor(ddl.SchemaGenerator, self, checkfirst=checkfirst) - return self - def drop(self, bind, checkfirst=False): + def drop(self, bind: _CreateDropBind, checkfirst: bool = False) -> None: """Issue a ``DROP`` statement for this :class:`.Index`, using the given :class:`.Connection` or :class:`.Engine` for connectivity. @@ -4583,7 +4876,9 @@ class Index( """ bind._run_ddl_visitor(ddl.SchemaDropper, self, checkfirst=checkfirst) - def __repr__(self): + def __repr__(self) -> str: + exprs: _typing_Sequence[Any] + return "Index(%s)" % ( ", ".join( [repr(self.name)] @@ -4593,7 +4888,9 @@ class Index( ) -DEFAULT_NAMING_CONVENTION = util.immutabledict({"ix": "ix_%(column_0_label)s"}) +DEFAULT_NAMING_CONVENTION: util.immutabledict[str, str] = util.immutabledict( + {"ix": "ix_%(column_0_label)s"} +) class MetaData(HasSchemaAttr): @@ -4628,8 +4925,8 @@ class MetaData(HasSchemaAttr): schema: Optional[str] = None, quote_schema: Optional[bool] = None, naming_convention: Optional[Dict[str, str]] = None, - info: Optional[Dict[Any, Any]] = None, - ): + info: Optional[_InfoType] = None, + ) -> None: """Create a new MetaData object. :param schema: @@ -4758,7 +5055,7 @@ class MetaData(HasSchemaAttr): self._schemas: Set[str] = set() self._sequences: Dict[str, Sequence] = {} self._fk_memos: Dict[ - Tuple[str, str], List[ForeignKey] + Tuple[str, Optional[str]], List[ForeignKey] ] = collections.defaultdict(list) tables: util.FacadeDict[str, Table] @@ -4787,13 +5084,15 @@ class MetaData(HasSchemaAttr): table_or_key = table_or_key.key return table_or_key in self.tables - def _add_table(self, name, schema, table): + def _add_table( + self, name: str, schema: Optional[str], table: Table + ) -> None: key = _get_table_key(name, schema) self.tables._insert_item(key, table) if schema: self._schemas.add(schema) - def _remove_table(self, name, schema): + def _remove_table(self, name: str, schema: Optional[str]) -> None: key = _get_table_key(name, schema) removed = dict.pop(self.tables, key, None) # type: ignore if removed is not None: @@ -4808,7 +5107,7 @@ class MetaData(HasSchemaAttr): ] ) - def __getstate__(self): + def __getstate__(self) -> Dict[str, Any]: return { "tables": self.tables, "schema": self.schema, @@ -4818,7 +5117,7 @@ class MetaData(HasSchemaAttr): "naming_convention": self.naming_convention, } - def __setstate__(self, state): + def __setstate__(self, state: Dict[str, Any]) -> None: self.tables = state["tables"] self.schema = state["schema"] self.naming_convention = state["naming_convention"] @@ -5054,7 +5353,7 @@ class MetaData(HasSchemaAttr): def create_all( self, - bind: Union[Engine, Connection, MockConnection], + bind: _CreateDropBind, tables: Optional[_typing_Sequence[Table]] = None, checkfirst: bool = True, ) -> None: @@ -5082,7 +5381,7 @@ class MetaData(HasSchemaAttr): def drop_all( self, - bind: Union[Engine, Connection, MockConnection], + bind: _CreateDropBind, tables: Optional[_typing_Sequence[Table]] = None, checkfirst: bool = True, ) -> None: @@ -5134,10 +5433,14 @@ class Computed(FetchedValue, SchemaItem): __visit_name__ = "computed_column" + column: Optional[Column[Any]] + @_document_text_coercion( "sqltext", ":class:`.Computed`", ":paramref:`.Computed.sqltext`" ) - def __init__(self, sqltext, persisted=None): + def __init__( + self, sqltext: _DDLColumnArgument, persisted: Optional[bool] = None + ) -> None: """Construct a GENERATED ALWAYS AS DDL construct to accompany a :class:`_schema.Column`. @@ -5170,7 +5473,9 @@ class Computed(FetchedValue, SchemaItem): self.persisted = persisted self.column = None - def _set_parent(self, parent, **kw): + def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None: + assert isinstance(parent, Column) + if not isinstance( parent.server_default, (type(None), Computed) ) or not isinstance(parent.server_onupdate, (type(None), Computed)): @@ -5183,7 +5488,7 @@ class Computed(FetchedValue, SchemaItem): self.column.server_onupdate = self self.column.server_default = self - def _as_for_update(self, for_update): + def _as_for_update(self, for_update: bool) -> FetchedValue: return self @util.deprecated( @@ -5191,10 +5496,14 @@ class Computed(FetchedValue, SchemaItem): "The :meth:`_schema.Computed.copy` method is deprecated " "and will be removed in a future release.", ) - def copy(self, target_table=None, **kw): - return self._copy(target_table, **kw) + def copy( + self, *, target_table: Optional[Table] = None, **kw: Any + ) -> Computed: + return self._copy(target_table=target_table, **kw) - def _copy(self, target_table=None, **kw): + def _copy( + self, *, target_table: Optional[Table] = None, **kw: Any + ) -> Computed: sqltext = _copy_expression( self.sqltext, self.column.table if self.column is not None else None, @@ -5233,18 +5542,18 @@ class Identity(IdentityOptions, FetchedValue, SchemaItem): def __init__( self, - always=False, - on_null=None, - start=None, - increment=None, - minvalue=None, - maxvalue=None, - nominvalue=None, - nomaxvalue=None, - cycle=None, - cache=None, - order=None, - ): + always: bool = False, + on_null: Optional[bool] = None, + start: Optional[int] = None, + increment: Optional[int] = None, + minvalue: Optional[int] = None, + maxvalue: Optional[int] = None, + nominvalue: Optional[bool] = None, + nomaxvalue: Optional[bool] = None, + cycle: Optional[bool] = None, + cache: Optional[bool] = None, + order: Optional[bool] = None, + ) -> None: """Construct a GENERATED { ALWAYS | BY DEFAULT } AS IDENTITY DDL construct to accompany a :class:`_schema.Column`. @@ -5306,7 +5615,8 @@ class Identity(IdentityOptions, FetchedValue, SchemaItem): self.on_null = on_null self.column = None - def _set_parent(self, parent, **kw): + def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None: + assert isinstance(parent, Column) if not isinstance( parent.server_default, (type(None), Identity) ) or not isinstance(parent.server_onupdate, type(None)): @@ -5327,7 +5637,7 @@ class Identity(IdentityOptions, FetchedValue, SchemaItem): parent.server_default = self - def _as_for_update(self, for_update): + def _as_for_update(self, for_update: bool) -> FetchedValue: return self @util.deprecated( @@ -5335,10 +5645,10 @@ class Identity(IdentityOptions, FetchedValue, SchemaItem): "The :meth:`_schema.Identity.copy` method is deprecated " "and will be removed in a future release.", ) - def copy(self, **kw): + def copy(self, **kw: Any) -> Identity: return self._copy(**kw) - def _copy(self, **kw): + def _copy(self, **kw: Any) -> Identity: i = Identity( always=self.always, on_null=self.on_null, |