summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/sql/schema.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/sql/schema.py')
-rw-r--r--lib/sqlalchemy/sql/schema.py1228
1 files changed, 769 insertions, 459 deletions
diff --git a/lib/sqlalchemy/sql/schema.py b/lib/sqlalchemy/sql/schema.py
index c9b67caca..92b9cc62c 100644
--- a/lib/sqlalchemy/sql/schema.py
+++ b/lib/sqlalchemy/sql/schema.py
@@ -32,21 +32,22 @@ from __future__ import annotations
from abc import ABC
import collections
+from enum import Enum
import operator
import typing
from typing import Any
from typing import Callable
from typing import cast
from typing import Dict
+from typing import Iterable
from typing import Iterator
from typing import List
-from typing import MutableMapping
+from typing import NoReturn
from typing import Optional
from typing import overload
from typing import Sequence as _typing_Sequence
from typing import Set
from typing import Tuple
-from typing import Type
from typing import TYPE_CHECKING
from typing import TypeVar
from typing import Union
@@ -65,7 +66,6 @@ from .elements import ClauseElement
from .elements import ColumnClause
from .elements import ColumnElement
from .elements import quoted_name
-from .elements import SQLCoreOperations
from .elements import TextClause
from .selectable import TableClause
from .type_api import to_instance
@@ -75,53 +75,91 @@ from .. import event
from .. import exc
from .. import inspection
from .. import util
+from ..util.typing import Final
from ..util.typing import Literal
from ..util.typing import Protocol
+from ..util.typing import Self
from ..util.typing import TypeGuard
if typing.TYPE_CHECKING:
+ from ._typing import _DDLColumnArgument
+ from ._typing import _InfoType
+ from ._typing import _TextCoercedExpressionArgument
+ from ._typing import _TypeEngineArgument
+ from .base import ColumnCollection
+ from .base import DedupeColumnCollection
from .base import ReadOnlyColumnCollection
+ from .compiler import DDLCompiler
+ from .elements import BindParameter
+ from .functions import Function
from .type_api import TypeEngine
+ from .visitors import _TraverseInternalsType
+ from .visitors import anon_map
from ..engine import Connection
from ..engine import Engine
+ from ..engine.cursor import CursorResult
+ from ..engine.interfaces import _CoreMultiExecuteParams
+ from ..engine.interfaces import _CoreSingleExecuteParams
+ from ..engine.interfaces import _ExecuteOptionsParameter
from ..engine.interfaces import ExecutionContext
from ..engine.mock import MockConnection
+ from ..sql.selectable import FromClause
+
_T = TypeVar("_T", bound="Any")
+_SI = TypeVar("_SI", bound="SchemaItem")
_ServerDefaultType = Union["FetchedValue", str, TextClause, ColumnElement]
_TAB = TypeVar("_TAB", bound="Table")
-RETAIN_SCHEMA = util.symbol(
- "retain_schema"
+
+_CreateDropBind = Union["Engine", "Connection", "MockConnection"]
+
+
+class SchemaConst(Enum):
+
+ RETAIN_SCHEMA = 1
"""Symbol indicating that a :class:`_schema.Table`, :class:`.Sequence`
or in some cases a :class:`_schema.ForeignKey` object, in situations
where the object is being copied for a :meth:`.Table.to_metadata`
operation, should retain the schema name that it already has.
"""
-)
-BLANK_SCHEMA = util.symbol(
- "blank_schema",
- """Symbol indicating that a :class:`_schema.Table`, :class:`.Sequence`
- or in some cases a :class:`_schema.ForeignKey` object
+ BLANK_SCHEMA = 2
+ """Symbol indicating that a :class:`_schema.Table` or :class:`.Sequence`
should have 'None' for its schema, even if the parent
:class:`_schema.MetaData` has specified a schema.
+ .. seealso::
+
+ :paramref:`_schema.MetaData.schema`
+
+ :paramref:`_schema.Table.schema`
+
+ :paramref:`.Sequence.schema`
+
.. versionadded:: 1.0.14
- """,
-)
+ """
-NULL_UNSPECIFIED = util.symbol(
- "NULL_UNSPECIFIED",
+ NULL_UNSPECIFIED = 3
"""Symbol indicating the "nullable" keyword was not passed to a Column.
Normally we would expect None to be acceptable for this but some backends
such as that of SQL Server place special signficance on a "nullability"
value of None.
- """,
-)
+ """
+
+
+RETAIN_SCHEMA: Final[
+ Literal[SchemaConst.RETAIN_SCHEMA]
+] = SchemaConst.RETAIN_SCHEMA
+BLANK_SCHEMA: Final[
+ Literal[SchemaConst.BLANK_SCHEMA]
+] = SchemaConst.BLANK_SCHEMA
+NULL_UNSPECIFIED: Final[
+ Literal[SchemaConst.NULL_UNSPECIFIED]
+] = SchemaConst.NULL_UNSPECIFIED
def _get_table_key(name: str, schema: Optional[str]) -> str:
@@ -170,7 +208,7 @@ class SchemaItem(SchemaEventTarget, visitors.Visitable):
create_drop_stringify_dialect = "default"
- def _init_items(self, *args, **kw):
+ def _init_items(self, *args: SchemaItem, **kw: Any) -> None:
"""Initialize the list of child items for this SchemaItem."""
for item in args:
if item is not None:
@@ -184,11 +222,11 @@ class SchemaItem(SchemaEventTarget, visitors.Visitable):
else:
spwd(self, **kw)
- def __repr__(self):
+ def __repr__(self) -> str:
return util.generic_repr(self, omit_kwarg=["info"])
@util.memoized_property
- def info(self):
+ def info(self) -> _InfoType:
"""Info dictionary associated with the object, allowing user-defined
data to be associated with this :class:`.SchemaItem`.
@@ -199,7 +237,7 @@ class SchemaItem(SchemaEventTarget, visitors.Visitable):
"""
return {}
- def _schema_item_copy(self, schema_item):
+ def _schema_item_copy(self, schema_item: _SI) -> _SI:
if "info" in self.__dict__:
schema_item.info = self.info.copy()
schema_item.dispatch._update(self.dispatch)
@@ -235,9 +273,9 @@ class HasConditionalDDL:
r"""apply a conditional DDL rule to this schema item.
These rules work in a similar manner to the
- :meth:`.DDLElement.execute_if` callable, with the added feature that
- the criteria may be checked within the DDL compilation phase for a
- construct such as :class:`.CreateTable`.
+ :meth:`.ExecutableDDLElement.execute_if` callable, with the added
+ feature that the criteria may be checked within the DDL compilation
+ phase for a construct such as :class:`.CreateTable`.
:meth:`.HasConditionalDDL.ddl_if` currently applies towards the
:class:`.Index` construct as well as all :class:`.Constraint`
constructs.
@@ -246,7 +284,8 @@ class HasConditionalDDL:
to indicate multiple dialect types.
:param callable\_: a callable that is constructed using the same form
- as that described in :paramref:`.DDLElement.execute_if.callable_`.
+ as that described in
+ :paramref:`.ExecutableDDLElement.execute_if.callable_`.
:param state: any arbitrary object that will be passed to the
callable, if present.
@@ -306,6 +345,8 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
def foreign_keys(self) -> Set[ForeignKey]:
...
+ _columns: DedupeColumnCollection[Column[Any]]
+
constraints: Set[Constraint]
"""A collection of all :class:`_schema.Constraint` objects associated with
this :class:`_schema.Table`.
@@ -344,25 +385,30 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
"""
- _traverse_internals = TableClause._traverse_internals + [
- ("schema", InternalTraversal.dp_string)
- ]
+ _traverse_internals: _TraverseInternalsType = (
+ TableClause._traverse_internals
+ + [("schema", InternalTraversal.dp_string)]
+ )
if TYPE_CHECKING:
- # we are upgrading .c and .columns to return Column, not
- # ColumnClause. mypy typically sees this as incompatible because
- # the contract of TableClause is that we can put a ColumnClause
- # into this collection. does not recognize its immutability
- # for the moment.
+
+ @util.ro_non_memoized_property
+ def columns(self) -> ReadOnlyColumnCollection[str, Column[Any]]:
+ ...
+
@util.ro_non_memoized_property
- def columns(self) -> ReadOnlyColumnCollection[str, Column[Any]]: # type: ignore # noqa: E501
+ def exported_columns(
+ self,
+ ) -> ReadOnlyColumnCollection[str, Column[Any]]:
...
@util.ro_non_memoized_property
- def c(self) -> ReadOnlyColumnCollection[str, Column[Any]]: # type: ignore # noqa: E501
+ def c(self) -> ReadOnlyColumnCollection[str, Column[Any]]:
...
- def _gen_cache_key(self, anon_map, bindparams):
+ def _gen_cache_key(
+ self, anon_map: anon_map, bindparams: List[BindParameter[Any]]
+ ) -> Tuple[Any, ...]:
if self._annotations:
return (self,) + self._annotations_cache_key
else:
@@ -382,7 +428,7 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
return cls._new(*args, **kw)
@classmethod
- def _new(cls, *args, **kw):
+ def _new(cls, *args: Any, **kw: Any) -> Any:
if not args and not kw:
# python3k pickle seems to call this
return object.__new__(cls)
@@ -429,7 +475,7 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
table.dispatch.before_parent_attach(table, metadata)
metadata._add_table(name, schema, table)
try:
- table._init(name, metadata, *args, **kw)
+ table.__init__(name, metadata, *args, _no_init=False, **kw)
table.dispatch.after_parent_attach(table, metadata)
return table
except Exception:
@@ -439,10 +485,31 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
def __init__(
self,
name: str,
- metadata: "MetaData",
+ metadata: MetaData,
*args: SchemaItem,
+ schema: Optional[Union[str, Literal[SchemaConst.BLANK_SCHEMA]]] = None,
+ quote: Optional[bool] = None,
+ quote_schema: Optional[bool] = None,
+ autoload_with: Optional[Union[Engine, Connection]] = None,
+ autoload_replace: bool = True,
+ keep_existing: bool = False,
+ extend_existing: bool = False,
+ resolve_fks: bool = True,
+ include_columns: Optional[Iterable[str]] = None,
+ implicit_returning: bool = True,
+ comment: Optional[str] = None,
+ info: Optional[Dict[Any, Any]] = None,
+ listeners: Optional[
+ _typing_Sequence[Tuple[str, Callable[..., Any]]]
+ ] = None,
+ prefixes: Optional[_typing_Sequence[str]] = None,
+ # used internally in the metadata.reflect() process
+ _extend_on: Optional[Set[Table]] = None,
+ # used by __new__ to bypass __init__
+ _no_init: bool = True,
+ # dialect-specific keyword args
**kw: Any,
- ):
+ ) -> None:
r"""Constructor for :class:`_schema.Table`.
@@ -731,24 +798,22 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
:ref:`dialect_toplevel` for detail on documented arguments.
""" # noqa: E501
+ if _no_init:
+ # don't run __init__ from __new__ by default;
+ # __new__ has a specific place that __init__ is called
+ return
- # __init__ is overridden to prevent __new__ from
- # calling the superclass constructor.
-
- def _init(self, name, metadata, *args, **kwargs):
- super(Table, self).__init__(
- quoted_name(name, kwargs.pop("quote", None))
- )
+ super().__init__(quoted_name(name, quote))
self.metadata = metadata
- self.schema = kwargs.pop("schema", None)
- if self.schema is None:
+ if schema is None:
self.schema = metadata.schema
- elif self.schema is BLANK_SCHEMA:
+ elif schema is BLANK_SCHEMA:
self.schema = None
else:
- quote_schema = kwargs.pop("quote_schema", None)
- self.schema = quoted_name(self.schema, quote_schema)
+ quote_schema = quote_schema
+ assert isinstance(schema, str)
+ self.schema = quoted_name(schema, quote_schema)
self.indexes = set()
self.constraints = set()
@@ -756,42 +821,31 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
_implicit_generated=True
)._set_parent_with_dispatch(self)
self.foreign_keys = set() # type: ignore
- self._extra_dependencies = set()
+ self._extra_dependencies: Set[Table] = set()
if self.schema is not None:
self.fullname = "%s.%s" % (self.schema, self.name)
else:
self.fullname = self.name
- autoload_with = kwargs.pop("autoload_with", None)
- autoload = autoload_with is not None
- # this argument is only used with _init_existing()
- kwargs.pop("autoload_replace", True)
- keep_existing = kwargs.pop("keep_existing", False)
- extend_existing = kwargs.pop("extend_existing", False)
- _extend_on = kwargs.pop("_extend_on", None)
-
- resolve_fks = kwargs.pop("resolve_fks", True)
- include_columns = kwargs.pop("include_columns", None)
+ self.implicit_returning = implicit_returning
- self.implicit_returning = kwargs.pop("implicit_returning", True)
+ self.comment = comment
- self.comment = kwargs.pop("comment", None)
+ if info is not None:
+ self.info = info
- if "info" in kwargs:
- self.info = kwargs.pop("info")
- if "listeners" in kwargs:
- listeners = kwargs.pop("listeners")
+ if listeners is not None:
for evt, fn in listeners:
event.listen(self, evt, fn)
- self._prefixes = kwargs.pop("prefixes", None) or []
+ self._prefixes = prefixes if prefixes else []
- self._extra_kwargs(**kwargs)
+ self._extra_kwargs(**kw)
# load column definitions from the database if 'autoload' is defined
# we do it after the table is in the singleton dictionary to support
# circular foreign keys
- if autoload:
+ if autoload_with is not None:
self._autoload(
metadata,
autoload_with,
@@ -805,18 +859,20 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
self._init_items(
*args,
- allow_replacements=extend_existing or keep_existing or autoload,
+ allow_replacements=extend_existing
+ or keep_existing
+ or autoload_with,
)
def _autoload(
self,
- metadata,
- autoload_with,
- include_columns,
- exclude_columns=(),
- resolve_fks=True,
- _extend_on=None,
- ):
+ metadata: MetaData,
+ autoload_with: Union[Engine, Connection],
+ include_columns: Optional[Iterable[str]],
+ exclude_columns: Iterable[str] = (),
+ resolve_fks: bool = True,
+ _extend_on: Optional[Set[Table]] = None,
+ ) -> None:
insp = inspection.inspect(autoload_with)
with insp._inspection_context() as conn_insp:
conn_insp.reflect_table(
@@ -837,7 +893,7 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
return sorted(self.constraints, key=lambda c: c._creation_order)
@property
- def foreign_key_constraints(self):
+ def foreign_key_constraints(self) -> Set[ForeignKeyConstraint]:
""":class:`_schema.ForeignKeyConstraint` objects referred to by this
:class:`_schema.Table`.
@@ -855,9 +911,13 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
:attr:`_schema.Table.indexes`
"""
- return set(fkc.constraint for fkc in self.foreign_keys)
+ return set(
+ fkc.constraint
+ for fkc in self.foreign_keys
+ if fkc.constraint is not None
+ )
- def _init_existing(self, *args, **kwargs):
+ def _init_existing(self, *args: Any, **kwargs: Any) -> None:
autoload_with = kwargs.pop("autoload_with", None)
autoload = kwargs.pop("autoload", autoload_with is not None)
autoload_replace = kwargs.pop("autoload_replace", True)
@@ -916,13 +976,13 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
self._extra_kwargs(**kwargs)
self._init_items(*args)
- def _extra_kwargs(self, **kwargs):
+ def _extra_kwargs(self, **kwargs: Any) -> None:
self._validate_dialect_kwargs(kwargs)
- def _init_collections(self):
+ def _init_collections(self) -> None:
pass
- def _reset_exported(self):
+ def _reset_exported(self) -> None:
pass
@util.ro_non_memoized_property
@@ -930,7 +990,7 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
return self.primary_key._autoincrement_column
@property
- def key(self):
+ def key(self) -> str:
"""Return the 'key' for this :class:`_schema.Table`.
This value is used as the dictionary key within the
@@ -943,7 +1003,7 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
"""
return _get_table_key(self.name, self.schema)
- def __repr__(self):
+ def __repr__(self) -> str:
return "Table(%s)" % ", ".join(
[repr(self.name)]
+ [repr(self.metadata)]
@@ -951,10 +1011,10 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
+ ["%s=%s" % (k, repr(getattr(self, k))) for k in ["schema"]]
)
- def __str__(self):
+ def __str__(self) -> str:
return _get_table_key(self.description, self.schema)
- def add_is_dependent_on(self, table):
+ def add_is_dependent_on(self, table: Table) -> None:
"""Add a 'dependency' for this Table.
This is another Table object which must be created
@@ -968,7 +1028,9 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
"""
self._extra_dependencies.add(table)
- def append_column(self, column, replace_existing=False):
+ def append_column(
+ self, column: ColumnClause[Any], replace_existing: bool = False
+ ) -> None:
"""Append a :class:`_schema.Column` to this :class:`_schema.Table`.
The "key" of the newly added :class:`_schema.Column`, i.e. the
@@ -998,7 +1060,7 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
self, allow_replacements=replace_existing
)
- def append_constraint(self, constraint):
+ def append_constraint(self, constraint: Union[Index, Constraint]) -> None:
"""Append a :class:`_schema.Constraint` to this
:class:`_schema.Table`.
@@ -1019,11 +1081,13 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
constraint._set_parent_with_dispatch(self)
- def _set_parent(self, metadata, **kw):
+ def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
+ metadata = parent
+ assert isinstance(metadata, MetaData)
metadata._add_table(self.name, self.schema, self)
self.metadata = metadata
- def create(self, bind, checkfirst=False):
+ def create(self, bind: _CreateDropBind, checkfirst: bool = False) -> None:
"""Issue a ``CREATE`` statement for this
:class:`_schema.Table`, using the given
:class:`.Connection` or :class:`.Engine`
@@ -1037,7 +1101,7 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
bind._run_ddl_visitor(ddl.SchemaGenerator, self, checkfirst=checkfirst)
- def drop(self, bind, checkfirst=False):
+ def drop(self, bind: _CreateDropBind, checkfirst: bool = False) -> None:
"""Issue a ``DROP`` statement for this
:class:`_schema.Table`, using the given
:class:`.Connection` or :class:`.Engine` for connectivity.
@@ -1056,11 +1120,16 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
)
def tometadata(
self,
- metadata,
- schema=RETAIN_SCHEMA,
- referred_schema_fn=None,
- name=None,
- ):
+ metadata: MetaData,
+ schema: Union[str, Literal[SchemaConst.RETAIN_SCHEMA]] = RETAIN_SCHEMA,
+ referred_schema_fn: Optional[
+ Callable[
+ [Table, Optional[str], ForeignKeyConstraint, Optional[str]],
+ Optional[str],
+ ]
+ ] = None,
+ name: Optional[str] = None,
+ ) -> Table:
"""Return a copy of this :class:`_schema.Table`
associated with a different
:class:`_schema.MetaData`.
@@ -1077,11 +1146,16 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
def to_metadata(
self,
- metadata,
- schema=RETAIN_SCHEMA,
- referred_schema_fn=None,
- name=None,
- ):
+ metadata: MetaData,
+ schema: Union[str, Literal[SchemaConst.RETAIN_SCHEMA]] = RETAIN_SCHEMA,
+ referred_schema_fn: Optional[
+ Callable[
+ [Table, Optional[str], ForeignKeyConstraint, Optional[str]],
+ Optional[str],
+ ]
+ ] = None,
+ name: Optional[str] = None,
+ ) -> Table:
"""Return a copy of this :class:`_schema.Table` associated with a
different :class:`_schema.MetaData`.
@@ -1163,11 +1237,16 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
"""
if name is None:
name = self.name
+
+ actual_schema: Optional[str]
+
if schema is RETAIN_SCHEMA:
- schema = self.schema
+ actual_schema = self.schema
elif schema is None:
- schema = metadata.schema
- key = _get_table_key(name, schema)
+ actual_schema = metadata.schema
+ else:
+ actual_schema = schema # type: ignore
+ key = _get_table_key(name, actual_schema)
if key in metadata.tables:
util.warn(
"Table '%s' already exists within the given "
@@ -1177,11 +1256,11 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
args = []
for col in self.columns:
- args.append(col._copy(schema=schema))
+ args.append(col._copy(schema=actual_schema))
table = Table(
name,
metadata,
- schema=schema,
+ schema=actual_schema,
comment=self.comment,
*args,
**self.kwargs,
@@ -1191,11 +1270,13 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
referred_schema = const._referred_schema
if referred_schema_fn:
fk_constraint_schema = referred_schema_fn(
- self, schema, const, referred_schema
+ self, actual_schema, const, referred_schema
)
else:
fk_constraint_schema = (
- schema if referred_schema == self.schema else None
+ actual_schema
+ if referred_schema == self.schema
+ else None
)
table.append_constraint(
const._copy(
@@ -1209,7 +1290,7 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
continue
table.append_constraint(
- const._copy(schema=schema, target_table=table)
+ const._copy(schema=actual_schema, target_table=table)
)
for index in self.indexes:
# skip indexes that would be generated
@@ -1221,7 +1302,7 @@ class Table(DialectKWArgs, HasSchemaAttr, TableClause):
unique=index.unique,
*[
_copy_expression(expr, self, table)
- for expr in index.expressions
+ for expr in index._table_bound_expressions
],
_table=table,
**index.kwargs,
@@ -1239,101 +1320,137 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
@overload
def __init__(
- self: "Column[None]",
- __name: str,
+ self,
*args: SchemaEventTarget,
- autoincrement: Union[bool, Literal["auto", "ignore_fk"]] = ...,
- default: Optional[Any] = ...,
- doc: Optional[str] = ...,
- key: Optional[str] = ...,
- index: Optional[bool] = ...,
- info: MutableMapping[Any, Any] = ...,
- nullable: bool = ...,
- onupdate: Optional[Any] = ...,
- primary_key: bool = ...,
- server_default: Optional[_ServerDefaultType] = ...,
- server_onupdate: Optional["FetchedValue"] = ...,
- quote: Optional[bool] = ...,
- unique: Optional[bool] = ...,
- system: bool = ...,
- comment: Optional[str] = ...,
- **kwargs: Any,
- ) -> None:
+ autoincrement: Union[bool, Literal["auto", "ignore_fk"]] = "auto",
+ default: Optional[Any] = None,
+ doc: Optional[str] = None,
+ key: Optional[str] = None,
+ index: Optional[bool] = None,
+ unique: Optional[bool] = None,
+ info: Optional[_InfoType] = None,
+ nullable: Optional[
+ Union[bool, Literal[SchemaConst.NULL_UNSPECIFIED]]
+ ] = NULL_UNSPECIFIED,
+ onupdate: Optional[Any] = None,
+ primary_key: bool = False,
+ server_default: Optional[_ServerDefaultType] = None,
+ server_onupdate: Optional[FetchedValue] = None,
+ quote: Optional[bool] = None,
+ system: bool = False,
+ comment: Optional[str] = None,
+ _proxies: Optional[Any] = None,
+ **dialect_kwargs: Any,
+ ):
...
@overload
def __init__(
- self: "Column[None]",
+ self,
+ __name: str,
*args: SchemaEventTarget,
- autoincrement: Union[bool, Literal["auto", "ignore_fk"]] = ...,
- default: Optional[Any] = ...,
- doc: Optional[str] = ...,
- key: Optional[str] = ...,
- index: Optional[bool] = ...,
- info: MutableMapping[Any, Any] = ...,
- nullable: bool = ...,
- onupdate: Optional[Any] = ...,
- primary_key: bool = ...,
- server_default: Optional[_ServerDefaultType] = ...,
- server_onupdate: Optional["FetchedValue"] = ...,
- quote: Optional[bool] = ...,
- unique: Optional[bool] = ...,
- system: bool = ...,
- comment: Optional[str] = ...,
- **kwargs: Any,
- ) -> None:
+ autoincrement: Union[bool, Literal["auto", "ignore_fk"]] = "auto",
+ default: Optional[Any] = None,
+ doc: Optional[str] = None,
+ key: Optional[str] = None,
+ index: Optional[bool] = None,
+ unique: Optional[bool] = None,
+ info: Optional[_InfoType] = None,
+ nullable: Optional[
+ Union[bool, Literal[SchemaConst.NULL_UNSPECIFIED]]
+ ] = NULL_UNSPECIFIED,
+ onupdate: Optional[Any] = None,
+ primary_key: bool = False,
+ server_default: Optional[_ServerDefaultType] = None,
+ server_onupdate: Optional[FetchedValue] = None,
+ quote: Optional[bool] = None,
+ system: bool = False,
+ comment: Optional[str] = None,
+ _proxies: Optional[Any] = None,
+ **dialect_kwargs: Any,
+ ):
...
@overload
def __init__(
self,
- __name: str,
- __type: Union[Type["TypeEngine[_T]"], "TypeEngine[_T]"],
+ __type: _TypeEngineArgument[_T],
*args: SchemaEventTarget,
- autoincrement: Union[bool, Literal["auto", "ignore_fk"]] = ...,
- default: Optional[Any] = ...,
- doc: Optional[str] = ...,
- key: Optional[str] = ...,
- index: Optional[bool] = ...,
- info: MutableMapping[Any, Any] = ...,
- nullable: bool = ...,
- onupdate: Optional[Any] = ...,
- primary_key: bool = ...,
- server_default: Optional[_ServerDefaultType] = ...,
- server_onupdate: Optional["FetchedValue"] = ...,
- quote: Optional[bool] = ...,
- unique: Optional[bool] = ...,
- system: bool = ...,
- comment: Optional[str] = ...,
- **kwargs: Any,
- ) -> None:
+ autoincrement: Union[bool, Literal["auto", "ignore_fk"]] = "auto",
+ default: Optional[Any] = None,
+ doc: Optional[str] = None,
+ key: Optional[str] = None,
+ index: Optional[bool] = None,
+ unique: Optional[bool] = None,
+ info: Optional[_InfoType] = None,
+ nullable: Optional[
+ Union[bool, Literal[SchemaConst.NULL_UNSPECIFIED]]
+ ] = NULL_UNSPECIFIED,
+ onupdate: Optional[Any] = None,
+ primary_key: bool = False,
+ server_default: Optional[_ServerDefaultType] = None,
+ server_onupdate: Optional[FetchedValue] = None,
+ quote: Optional[bool] = None,
+ system: bool = False,
+ comment: Optional[str] = None,
+ _proxies: Optional[Any] = None,
+ **dialect_kwargs: Any,
+ ):
...
@overload
def __init__(
self,
- __type: Union[Type["TypeEngine[_T]"], "TypeEngine[_T]"],
+ __name: str,
+ __type: _TypeEngineArgument[_T],
*args: SchemaEventTarget,
- autoincrement: Union[bool, Literal["auto", "ignore_fk"]] = ...,
- default: Optional[Any] = ...,
- doc: Optional[str] = ...,
- key: Optional[str] = ...,
- index: Optional[bool] = ...,
- info: MutableMapping[Any, Any] = ...,
- nullable: bool = ...,
- onupdate: Optional[Any] = ...,
- primary_key: bool = ...,
- server_default: Optional[_ServerDefaultType] = ...,
- server_onupdate: Optional["FetchedValue"] = ...,
- quote: Optional[bool] = ...,
- unique: Optional[bool] = ...,
- system: bool = ...,
- comment: Optional[str] = ...,
- **kwargs: Any,
- ) -> None:
+ autoincrement: Union[bool, Literal["auto", "ignore_fk"]] = "auto",
+ default: Optional[Any] = None,
+ doc: Optional[str] = None,
+ key: Optional[str] = None,
+ index: Optional[bool] = None,
+ unique: Optional[bool] = None,
+ info: Optional[_InfoType] = None,
+ nullable: Optional[
+ Union[bool, Literal[SchemaConst.NULL_UNSPECIFIED]]
+ ] = NULL_UNSPECIFIED,
+ onupdate: Optional[Any] = None,
+ primary_key: bool = False,
+ server_default: Optional[_ServerDefaultType] = None,
+ server_onupdate: Optional[FetchedValue] = None,
+ quote: Optional[bool] = None,
+ system: bool = False,
+ comment: Optional[str] = None,
+ _proxies: Optional[Any] = None,
+ **dialect_kwargs: Any,
+ ):
...
- def __init__(self, *args: Any, **kwargs: Any):
+ def __init__(
+ self,
+ *args: Union[str, _TypeEngineArgument[_T], SchemaEventTarget],
+ name: Optional[str] = None,
+ type_: Optional[_TypeEngineArgument[_T]] = None,
+ autoincrement: Union[bool, Literal["auto", "ignore_fk"]] = "auto",
+ default: Optional[Any] = None,
+ doc: Optional[str] = None,
+ key: Optional[str] = None,
+ index: Optional[bool] = None,
+ unique: Optional[bool] = None,
+ info: Optional[_InfoType] = None,
+ nullable: Optional[
+ Union[bool, Literal[SchemaConst.NULL_UNSPECIFIED]]
+ ] = NULL_UNSPECIFIED,
+ onupdate: Optional[Any] = None,
+ primary_key: bool = False,
+ server_default: Optional[_ServerDefaultType] = None,
+ server_onupdate: Optional[FetchedValue] = None,
+ quote: Optional[bool] = None,
+ system: bool = False,
+ comment: Optional[str] = None,
+ _proxies: Optional[Any] = None,
+ **dialect_kwargs: Any,
+ ):
r"""
Construct a new ``Column`` object.
@@ -1836,8 +1953,6 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
""" # noqa: E501, RST201, RST202
- name = kwargs.pop("name", None)
- type_ = kwargs.pop("type_", None)
l_args = list(args)
del args
@@ -1847,7 +1962,7 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
raise exc.ArgumentError(
"May not pass name positionally and as a keyword."
)
- name = l_args.pop(0)
+ name = l_args.pop(0) # type: ignore
if l_args:
coltype = l_args[0]
@@ -1856,52 +1971,49 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
raise exc.ArgumentError(
"May not pass type_ positionally and as a keyword."
)
- type_ = l_args.pop(0)
+ type_ = l_args.pop(0) # type: ignore
if name is not None:
- name = quoted_name(name, kwargs.pop("quote", None))
- elif "quote" in kwargs:
+ name = quoted_name(name, quote)
+ elif quote is not None:
raise exc.ArgumentError(
"Explicit 'name' is required when " "sending 'quote' argument"
)
- super(Column, self).__init__(name, type_)
- self.key = kwargs.pop("key", name)
- self.primary_key = primary_key = kwargs.pop("primary_key", False)
+ # name = None is expected to be an interim state
+ # note this use case is legacy now that ORM declarative has a
+ # dedicated "column" construct local to the ORM
+ super(Column, self).__init__(name, type_) # type: ignore
- self._user_defined_nullable = udn = kwargs.pop(
- "nullable", NULL_UNSPECIFIED
- )
+ self.key = key if key is not None else name # type: ignore
+ self.primary_key = primary_key
+
+ self._user_defined_nullable = udn = nullable
if udn is not NULL_UNSPECIFIED:
self.nullable = udn
else:
self.nullable = not primary_key
- default = kwargs.pop("default", None)
- onupdate = kwargs.pop("onupdate", None)
-
- self.server_default = kwargs.pop("server_default", None)
- self.server_onupdate = kwargs.pop("server_onupdate", None)
-
# these default to None because .index and .unique is *not*
# an informational flag about Column - there can still be an
# Index or UniqueConstraint referring to this Column.
- self.index = kwargs.pop("index", None)
- self.unique = kwargs.pop("unique", None)
+ self.index = index
+ self.unique = unique
- self.system = kwargs.pop("system", False)
- self.doc = kwargs.pop("doc", None)
- self.autoincrement = kwargs.pop("autoincrement", "auto")
+ self.system = system
+ self.doc = doc
+ self.autoincrement = autoincrement
self.constraints = set()
self.foreign_keys = set()
- self.comment = kwargs.pop("comment", None)
+ self.comment = comment
self.computed = None
self.identity = None
# check if this Column is proxying another column
- if "_proxies" in kwargs:
- self._proxies = kwargs.pop("_proxies")
+
+ if _proxies is not None:
+ self._proxies = _proxies
else:
# otherwise, add DDL-related events
if isinstance(self.type, SchemaEventTarget):
@@ -1928,6 +2040,9 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
else:
self.onpudate = None
+ self.server_default = server_default
+ self.server_onupdate = server_onupdate
+
if self.server_default is not None:
if isinstance(self.server_default, FetchedValue):
l_args.append(self.server_default._as_for_update(False))
@@ -1941,14 +2056,14 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
l_args.append(
DefaultClause(self.server_onupdate, for_update=True)
)
- self._init_items(*l_args)
+ self._init_items(*cast(_typing_Sequence[SchemaItem], l_args))
util.set_creation_order(self)
- if "info" in kwargs:
- self.info = kwargs.pop("info")
+ if info is not None:
+ self.info = info
- self._extra_kwargs(**kwargs)
+ self._extra_kwargs(**dialect_kwargs)
table: Table
@@ -1967,7 +2082,7 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
"""
- index: bool
+ index: Optional[bool]
"""The value of the :paramref:`_schema.Column.index` parameter.
Does not indicate if this :class:`_schema.Column` is actually indexed
@@ -1978,7 +2093,7 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
:attr:`_schema.Table.indexes`
"""
- unique: bool
+ unique: Optional[bool]
"""The value of the :paramref:`_schema.Column.unique` parameter.
Does not indicate if this :class:`_schema.Column` is actually subject to
@@ -1993,10 +2108,14 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
"""
- def _extra_kwargs(self, **kwargs):
+ computed: Optional[Computed]
+
+ identity: Optional[Identity]
+
+ def _extra_kwargs(self, **kwargs: Any) -> None:
self._validate_dialect_kwargs(kwargs)
- def __str__(self):
+ def __str__(self) -> str:
if self.name is None:
return "(no name)"
elif self.table is not None:
@@ -2007,7 +2126,7 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
else:
return self.description
- def references(self, column):
+ def references(self, column: Column[Any]) -> bool:
"""Return True if this Column references the given column via foreign
key."""
@@ -2017,10 +2136,10 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
else:
return False
- def append_foreign_key(self, fk):
+ def append_foreign_key(self, fk: ForeignKey) -> None:
fk._set_parent_with_dispatch(self)
- def __repr__(self):
+ def __repr__(self) -> str:
kwarg = []
if self.key != self.name:
kwarg.append("key")
@@ -2051,7 +2170,14 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
+ ["%s=%s" % (k, repr(getattr(self, k))) for k in kwarg]
)
- def _set_parent(self, table, allow_replacements=True):
+ def _set_parent(
+ self,
+ parent: SchemaEventTarget,
+ allow_replacements: bool = True,
+ **kw: Any,
+ ) -> None:
+ table = parent
+ assert isinstance(table, Table)
if not self.name:
raise exc.ArgumentError(
"Column must be constructed with a non-blank name or "
@@ -2071,7 +2197,7 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
)
if self.key in table._columns:
- col = table._columns.get(self.key)
+ col = table._columns[self.key]
if col is not self:
if not allow_replacements:
util.warn_deprecated(
@@ -2139,7 +2265,7 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
"An column cannot specify both Identity and Sequence."
)
- def _setup_on_memoized_fks(self, fn):
+ def _setup_on_memoized_fks(self, fn: Callable[..., Any]) -> None:
fk_keys = [
((self.table.key, self.key), False),
((self.table.key, self.name), True),
@@ -2150,7 +2276,7 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
if fk.link_to_name is link_to_name:
fn(fk)
- def _on_table_attach(self, fn):
+ def _on_table_attach(self, fn: Callable[..., Any]) -> None:
if self.table is not None:
fn(self, self.table)
else:
@@ -2161,10 +2287,10 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
"The :meth:`_schema.Column.copy` method is deprecated "
"and will be removed in a future release.",
)
- def copy(self, **kw):
+ def copy(self, **kw: Any) -> Column[Any]:
return self._copy(**kw)
- def _copy(self, **kw):
+ def _copy(self, **kw: Any) -> Column[Any]:
"""Create a copy of this ``Column``, uninitialized.
This is used in :meth:`_schema.Table.to_metadata`.
@@ -2172,9 +2298,15 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
"""
# Constraint objects plus non-constraint-bound ForeignKey objects
- args = [
- c._copy(**kw) for c in self.constraints if not c._type_bound
- ] + [c._copy(**kw) for c in self.foreign_keys if not c.constraint]
+ args: List[SchemaItem] = [
+ c._copy(**kw)
+ for c in self.constraints
+ if not c._type_bound # type: ignore
+ ] + [
+ c._copy(**kw) # type: ignore
+ for c in self.foreign_keys
+ if not c.constraint
+ ]
# ticket #5276
column_kwargs = {}
@@ -2223,8 +2355,13 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
return self._schema_item_copy(c)
def _make_proxy(
- self, selectable, name=None, key=None, name_is_truncatable=False, **kw
- ):
+ self,
+ selectable: FromClause,
+ name: Optional[str] = None,
+ key: Optional[str] = None,
+ name_is_truncatable: bool = False,
+ **kw: Any,
+ ) -> Tuple[str, ColumnClause[_T]]:
"""Create a *proxy* for this column.
This is a copy of this ``Column`` referenced by a different parent
@@ -2272,9 +2409,9 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
if selectable._is_clone_of is not None:
c._is_clone_of = selectable._is_clone_of.columns.get(c.key)
if self.primary_key:
- selectable.primary_key.add(c)
+ selectable.primary_key.add(c) # type: ignore
if fk:
- selectable.foreign_keys.update(fk)
+ selectable.foreign_keys.update(fk) # type: ignore
return c.key, c
@@ -2326,17 +2463,17 @@ class ForeignKey(DialectKWArgs, SchemaItem):
def __init__(
self,
- column: Union[str, Column[Any], SQLCoreOperations[Any]],
- _constraint: Optional["ForeignKeyConstraint"] = None,
+ column: _DDLColumnArgument,
+ _constraint: Optional[ForeignKeyConstraint] = None,
use_alter: bool = False,
name: Optional[str] = None,
onupdate: Optional[str] = None,
ondelete: Optional[str] = None,
deferrable: Optional[bool] = None,
- initially: Optional[bool] = None,
+ initially: Optional[str] = None,
link_to_name: bool = False,
match: Optional[str] = None,
- info: Optional[Dict[Any, Any]] = None,
+ info: Optional[_InfoType] = None,
**dialect_kw: Any,
):
r"""
@@ -2446,7 +2583,7 @@ class ForeignKey(DialectKWArgs, SchemaItem):
self.info = info
self._unvalidated_dialect_kw = dialect_kw
- def __repr__(self):
+ def __repr__(self) -> str:
return "ForeignKey(%r)" % self._get_colspec()
@util.deprecated(
@@ -2454,10 +2591,10 @@ class ForeignKey(DialectKWArgs, SchemaItem):
"The :meth:`_schema.ForeignKey.copy` method is deprecated "
"and will be removed in a future release.",
)
- def copy(self, schema=None, **kw):
+ def copy(self, *, schema: Optional[str] = None, **kw: Any) -> ForeignKey:
return self._copy(schema=schema, **kw)
- def _copy(self, schema=None, **kw):
+ def _copy(self, *, schema: Optional[str] = None, **kw: Any) -> ForeignKey:
"""Produce a copy of this :class:`_schema.ForeignKey` object.
The new :class:`_schema.ForeignKey` will not be bound
@@ -2487,7 +2624,17 @@ class ForeignKey(DialectKWArgs, SchemaItem):
)
return self._schema_item_copy(fk)
- def _get_colspec(self, schema=None, table_name=None, _is_copy=False):
+ def _get_colspec(
+ self,
+ schema: Optional[
+ Union[
+ str,
+ Literal[SchemaConst.RETAIN_SCHEMA, SchemaConst.BLANK_SCHEMA],
+ ]
+ ] = None,
+ table_name: Optional[str] = None,
+ _is_copy: bool = False,
+ ) -> str:
"""Return a string based 'column specification' for this
:class:`_schema.ForeignKey`.
@@ -2523,13 +2670,14 @@ class ForeignKey(DialectKWArgs, SchemaItem):
self._table_column.key,
)
else:
+ assert isinstance(self._colspec, str)
return self._colspec
@property
- def _referred_schema(self):
+ def _referred_schema(self) -> Optional[str]:
return self._column_tokens[0]
- def _table_key(self):
+ def _table_key(self) -> Any:
if self._table_column is not None:
if self._table_column.table is None:
return None
@@ -2541,16 +2689,16 @@ class ForeignKey(DialectKWArgs, SchemaItem):
target_fullname = property(_get_colspec)
- def references(self, table):
+ def references(self, table: Table) -> bool:
"""Return True if the given :class:`_schema.Table`
is referenced by this
:class:`_schema.ForeignKey`."""
return table.corresponding_column(self.column) is not None
- def get_referent(self, table):
+ def get_referent(self, table: FromClause) -> Optional[Column[Any]]:
"""Return the :class:`_schema.Column` in the given
- :class:`_schema.Table`
+ :class:`_schema.Table` (or any :class:`.FromClause`)
referenced by this :class:`_schema.ForeignKey`.
Returns None if this :class:`_schema.ForeignKey`
@@ -2559,10 +2707,10 @@ class ForeignKey(DialectKWArgs, SchemaItem):
"""
- return table.corresponding_column(self.column)
+ return table.columns.corresponding_column(self.column)
@util.memoized_property
- def _column_tokens(self):
+ def _column_tokens(self) -> Tuple[Optional[str], str, Optional[str]]:
"""parse a string-based _colspec into its component parts."""
m = self._get_colspec().split(".")
@@ -2592,7 +2740,7 @@ class ForeignKey(DialectKWArgs, SchemaItem):
schema = None
return schema, tname, colname
- def _resolve_col_tokens(self):
+ def _resolve_col_tokens(self) -> Tuple[Table, str, Optional[str]]:
if self.parent is None:
raise exc.InvalidRequestError(
"this ForeignKey object does not yet have a "
@@ -2627,7 +2775,9 @@ class ForeignKey(DialectKWArgs, SchemaItem):
tablekey = _get_table_key(tname, schema)
return parenttable, tablekey, colname
- def _link_to_col_by_colstring(self, parenttable, table, colname):
+ def _link_to_col_by_colstring(
+ self, parenttable: Table, table: Table, colname: Optional[str]
+ ) -> Column[Any]:
_column = None
if colname is None:
# colname is None in the case that ForeignKey argument
@@ -2661,7 +2811,7 @@ class ForeignKey(DialectKWArgs, SchemaItem):
return _column
- def _set_target_column(self, column):
+ def _set_target_column(self, column: Column[Any]) -> None:
assert self.parent is not None
# propagate TypeEngine to parent if it didn't have one
@@ -2671,16 +2821,16 @@ class ForeignKey(DialectKWArgs, SchemaItem):
# super-edgy case, if other FKs point to our column,
# they'd get the type propagated out also.
- def set_type(fk):
+ def set_type(fk: ForeignKey) -> None:
if fk.parent.type._isnull:
fk.parent.type = column.type
self.parent._setup_on_memoized_fks(set_type)
- self.column = column
+ self.column = column # type: ignore
- @util.memoized_property
- def column(self):
+ @util.ro_memoized_property
+ def column(self) -> Column[Any]:
"""Return the target :class:`_schema.Column` referenced by this
:class:`_schema.ForeignKey`.
@@ -2689,6 +2839,8 @@ class ForeignKey(DialectKWArgs, SchemaItem):
"""
+ _column: Column[Any]
+
if isinstance(self._colspec, str):
parenttable, tablekey, colname = self._resolve_col_tokens()
@@ -2730,14 +2882,14 @@ class ForeignKey(DialectKWArgs, SchemaItem):
self.parent.foreign_keys.add(self)
self.parent._on_table_attach(self._set_table)
- def _set_remote_table(self, table):
+ def _set_remote_table(self, table: Table) -> None:
parenttable, _, colname = self._resolve_col_tokens()
_column = self._link_to_col_by_colstring(parenttable, table, colname)
self._set_target_column(_column)
assert self.constraint is not None
self.constraint._validate_dest_table(table)
- def _remove_from_metadata(self, metadata):
+ def _remove_from_metadata(self, metadata: MetaData) -> None:
parenttable, table_key, colname = self._resolve_col_tokens()
fk_key = (table_key, colname)
@@ -2745,7 +2897,7 @@ class ForeignKey(DialectKWArgs, SchemaItem):
# TODO: no test coverage for self not in memos
metadata._fk_memos[fk_key].remove(self)
- def _set_table(self, column, table):
+ def _set_table(self, column: Column[Any], table: Table) -> None:
# standalone ForeignKey - create ForeignKeyConstraint
# on the hosting Table when attached to the Table.
assert isinstance(table, Table)
@@ -2821,6 +2973,7 @@ class DefaultGenerator(Executable, SchemaItem):
__visit_name__ = "default_generator"
+ _is_default_generator = True
is_sequence = False
is_server_default = False
is_clause_element = False
@@ -2828,7 +2981,7 @@ class DefaultGenerator(Executable, SchemaItem):
is_scalar = False
column: Optional[Column[Any]]
- def __init__(self, for_update=False):
+ def __init__(self, for_update: bool = False) -> None:
self.for_update = for_update
def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
@@ -2841,8 +2994,27 @@ class DefaultGenerator(Executable, SchemaItem):
self.column.default = self
def _execute_on_connection(
- self, connection, distilled_params, execution_options
- ):
+ self,
+ connection: Connection,
+ distilled_params: _CoreMultiExecuteParams,
+ execution_options: _ExecuteOptionsParameter,
+ ) -> Any:
+ util.warn_deprecated(
+ "Using the .execute() method to invoke a "
+ "DefaultGenerator object is deprecated; please use "
+ "the .scalar() method.",
+ "2.0",
+ )
+ return self._execute_on_scalar(
+ connection, distilled_params, execution_options
+ )
+
+ def _execute_on_scalar(
+ self,
+ connection: Connection,
+ distilled_params: _CoreMultiExecuteParams,
+ execution_options: _ExecuteOptionsParameter,
+ ) -> Any:
return connection._execute_default(
self, distilled_params, execution_options
)
@@ -2933,7 +3105,7 @@ class ColumnDefault(DefaultGenerator, ABC):
return object.__new__(cls)
- def __repr__(self):
+ def __repr__(self) -> str:
return f"{self.__class__.__name__}({self.arg!r})"
@@ -2946,7 +3118,7 @@ class ScalarElementColumnDefault(ColumnDefault):
is_scalar = True
- def __init__(self, arg: Any, for_update: bool = False):
+ def __init__(self, arg: Any, for_update: bool = False) -> None:
self.for_update = for_update
self.arg = arg
@@ -2970,13 +3142,13 @@ class ColumnElementColumnDefault(ColumnDefault):
self,
arg: _SQLExprDefault,
for_update: bool = False,
- ):
+ ) -> None:
self.for_update = for_update
self.arg = arg
@util.memoized_property
@util.preload_module("sqlalchemy.sql.sqltypes")
- def _arg_is_typed(self):
+ def _arg_is_typed(self) -> bool:
sqltypes = util.preloaded.sql_sqltypes
return not isinstance(self.arg.type, sqltypes.NullType)
@@ -3001,7 +3173,7 @@ class CallableColumnDefault(ColumnDefault):
self,
arg: Union[_CallableColumnDefaultProtocol, Callable[[], Any]],
for_update: bool = False,
- ):
+ ) -> None:
self.for_update = for_update
self.arg = self._maybe_wrap_callable(arg)
@@ -3048,16 +3220,16 @@ class IdentityOptions:
def __init__(
self,
- start=None,
- increment=None,
- minvalue=None,
- maxvalue=None,
- nominvalue=None,
- nomaxvalue=None,
- cycle=None,
- cache=None,
- order=None,
- ):
+ start: Optional[int] = None,
+ increment: Optional[int] = None,
+ minvalue: Optional[int] = None,
+ maxvalue: Optional[int] = None,
+ nominvalue: Optional[bool] = None,
+ nomaxvalue: Optional[bool] = None,
+ cycle: Optional[bool] = None,
+ cache: Optional[bool] = None,
+ order: Optional[bool] = None,
+ ) -> None:
"""Construct a :class:`.IdentityOptions` object.
See the :class:`.Sequence` documentation for a complete description
@@ -3125,28 +3297,29 @@ class Sequence(HasSchemaAttr, IdentityOptions, DefaultGenerator):
is_sequence = True
- column: Optional[Column[Any]] = None
+ column: Optional[Column[Any]]
+ data_type: Optional[TypeEngine[int]]
def __init__(
self,
- name,
- start=None,
- increment=None,
- minvalue=None,
- maxvalue=None,
- nominvalue=None,
- nomaxvalue=None,
- cycle=None,
- schema=None,
- cache=None,
- order=None,
- data_type=None,
- optional=False,
- quote=None,
- metadata=None,
- quote_schema=None,
- for_update=False,
- ):
+ name: str,
+ start: Optional[int] = None,
+ increment: Optional[int] = None,
+ minvalue: Optional[int] = None,
+ maxvalue: Optional[int] = None,
+ nominvalue: Optional[bool] = None,
+ nomaxvalue: Optional[bool] = None,
+ cycle: Optional[bool] = None,
+ schema: Optional[Union[str, Literal[SchemaConst.BLANK_SCHEMA]]] = None,
+ cache: Optional[bool] = None,
+ order: Optional[bool] = None,
+ data_type: Optional[_TypeEngineArgument[int]] = None,
+ optional: bool = False,
+ quote: Optional[bool] = None,
+ metadata: Optional[MetaData] = None,
+ quote_schema: Optional[bool] = None,
+ for_update: bool = False,
+ ) -> None:
"""Construct a :class:`.Sequence` object.
:param name: the name of the sequence.
@@ -3298,6 +3471,7 @@ class Sequence(HasSchemaAttr, IdentityOptions, DefaultGenerator):
cache=cache,
order=order,
)
+ self.column = None
self.name = quoted_name(name, quote)
self.optional = optional
if schema is BLANK_SCHEMA:
@@ -3316,7 +3490,7 @@ class Sequence(HasSchemaAttr, IdentityOptions, DefaultGenerator):
self.data_type = None
@util.preload_module("sqlalchemy.sql.functions")
- def next_value(self):
+ def next_value(self) -> Function[int]:
"""Return a :class:`.next_value` function element
which will render the appropriate increment function
for this :class:`.Sequence` within any SQL expression.
@@ -3324,28 +3498,30 @@ class Sequence(HasSchemaAttr, IdentityOptions, DefaultGenerator):
"""
return util.preloaded.sql_functions.func.next_value(self)
- def _set_parent(self, column, **kw):
+ def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
+ column = parent
+ assert isinstance(column, Column)
super(Sequence, self)._set_parent(column)
column._on_table_attach(self._set_table)
- def _set_table(self, column, table):
+ def _set_table(self, column: Column[Any], table: Table) -> None:
self._set_metadata(table.metadata)
- def _set_metadata(self, metadata):
+ def _set_metadata(self, metadata: MetaData) -> None:
self.metadata = metadata
self.metadata._sequences[self._key] = self
- def create(self, bind, checkfirst=True):
+ def create(self, bind: _CreateDropBind, checkfirst: bool = True) -> None:
"""Creates this sequence in the database."""
bind._run_ddl_visitor(ddl.SchemaGenerator, self, checkfirst=checkfirst)
- def drop(self, bind, checkfirst=True):
+ def drop(self, bind: _CreateDropBind, checkfirst: bool = True) -> None:
"""Drops this sequence from the database."""
bind._run_ddl_visitor(ddl.SchemaDropper, self, checkfirst=checkfirst)
- def _not_a_column_expr(self):
+ def _not_a_column_expr(self) -> NoReturn:
raise exc.InvalidRequestError(
"This %s cannot be used directly "
"as a column expression. Use func.next_value(sequence) "
@@ -3380,30 +3556,34 @@ class FetchedValue(SchemaEventTarget):
has_argument = False
is_clause_element = False
- def __init__(self, for_update=False):
+ column: Optional[Column[Any]]
+
+ def __init__(self, for_update: bool = False) -> None:
self.for_update = for_update
- def _as_for_update(self, for_update):
+ def _as_for_update(self, for_update: bool) -> FetchedValue:
if for_update == self.for_update:
return self
else:
- return self._clone(for_update)
+ return self._clone(for_update) # type: ignore
- def _clone(self, for_update):
+ def _clone(self, for_update: bool) -> Any:
n = self.__class__.__new__(self.__class__)
n.__dict__.update(self.__dict__)
n.__dict__.pop("column", None)
n.for_update = for_update
return n
- def _set_parent(self, column, **kw):
+ def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
+ column = parent
+ assert isinstance(column, Column)
self.column = column
if self.for_update:
self.column.server_onupdate = self
else:
self.column.server_default = self
- def __repr__(self):
+ def __repr__(self) -> str:
return util.generic_repr(self)
@@ -3431,13 +3611,18 @@ class DefaultClause(FetchedValue):
has_argument = True
- def __init__(self, arg, for_update=False, _reflected=False):
+ def __init__(
+ self,
+ arg: Union[str, ClauseElement, TextClause],
+ for_update: bool = False,
+ _reflected: bool = False,
+ ) -> None:
util.assert_arg_type(arg, (str, ClauseElement, TextClause), "arg")
super(DefaultClause, self).__init__(for_update)
self.arg = arg
self.reflected = _reflected
- def __repr__(self):
+ def __repr__(self) -> str:
return "DefaultClause(%r, for_update=%r)" % (self.arg, self.for_update)
@@ -3460,14 +3645,14 @@ class Constraint(DialectKWArgs, HasConditionalDDL, SchemaItem):
def __init__(
self,
- name=None,
- deferrable=None,
- initially=None,
- _create_rule=None,
- info=None,
- _type_bound=False,
- **dialect_kw,
- ):
+ name: Optional[str] = None,
+ deferrable: Optional[bool] = None,
+ initially: Optional[str] = None,
+ info: Optional[_InfoType] = None,
+ _create_rule: Optional[Any] = None,
+ _type_bound: bool = False,
+ **dialect_kw: Any,
+ ) -> None:
r"""Create a SQL constraint.
:param name:
@@ -3510,7 +3695,9 @@ class Constraint(DialectKWArgs, HasConditionalDDL, SchemaItem):
util.set_creation_order(self)
self._validate_dialect_kwargs(dialect_kw)
- def _should_create_for_compiler(self, compiler, **kw):
+ def _should_create_for_compiler(
+ self, compiler: DDLCompiler, **kw: Any
+ ) -> bool:
if self._create_rule is not None and not self._create_rule(compiler):
return False
elif self._ddl_if is not None:
@@ -3521,7 +3708,7 @@ class Constraint(DialectKWArgs, HasConditionalDDL, SchemaItem):
return True
@property
- def table(self):
+ def table(self) -> Table:
try:
if isinstance(self.parent, Table):
return self.parent
@@ -3532,7 +3719,8 @@ class Constraint(DialectKWArgs, HasConditionalDDL, SchemaItem):
"mean to call table.append_constraint(constraint) ?"
)
- def _set_parent(self, parent, **kw):
+ def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
+ assert isinstance(parent, (Table, Column))
self.parent = parent
parent.constraints.add(self)
@@ -3541,10 +3729,10 @@ class Constraint(DialectKWArgs, HasConditionalDDL, SchemaItem):
"The :meth:`_schema.Constraint.copy` method is deprecated "
"and will be removed in a future release.",
)
- def copy(self, **kw):
- return self._copy(**kw)
+ def copy(self: Self, **kw: Any) -> Self:
+ return self._copy(**kw) # type: ignore
- def _copy(self, **kw):
+ def _copy(self: Self, **kw: Any) -> Self:
raise NotImplementedError()
@@ -3561,6 +3749,8 @@ class ColumnCollectionMixin:
_allow_multiple_tables = False
+ _pending_colargs: List[Optional[Union[str, Column[Any]]]]
+
if TYPE_CHECKING:
def _set_parent_with_dispatch(
@@ -3568,18 +3758,28 @@ class ColumnCollectionMixin:
) -> None:
...
- def __init__(self, *columns, **kw):
- _autoattach = kw.pop("_autoattach", True)
- self._column_flag = kw.pop("_column_flag", False)
+ def __init__(
+ self,
+ *columns: _DDLColumnArgument,
+ _autoattach: bool = True,
+ _column_flag: bool = False,
+ _gather_expressions: Optional[
+ List[Union[str, ColumnElement[Any]]]
+ ] = None,
+ ) -> None:
+ self._column_flag = _column_flag
self._columns = DedupeColumnCollection()
- processed_expressions = kw.pop("_gather_expressions", None)
+ processed_expressions: Optional[
+ List[Union[ColumnElement[Any], str]]
+ ] = _gather_expressions
+
if processed_expressions is not None:
self._pending_colargs = []
for (
expr,
- column,
- strname,
+ _,
+ _,
add_element,
) in coercions.expect_col_expression_collection(
roles.DDLConstraintColumnRole, columns
@@ -3595,7 +3795,7 @@ class ColumnCollectionMixin:
if _autoattach and self._pending_colargs:
self._check_attach()
- def _check_attach(self, evt=False):
+ def _check_attach(self, evt: bool = False) -> None:
col_objs = [c for c in self._pending_colargs if isinstance(c, Column)]
cols_w_table = [c for c in col_objs if isinstance(c.table, Table)]
@@ -3613,7 +3813,7 @@ class ColumnCollectionMixin:
).difference(col_objs)
if not has_string_cols:
- def _col_attached(column, table):
+ def _col_attached(column: Column[Any], table: Table) -> None:
# this isinstance() corresponds with the
# isinstance() above; only want to count Table-bound
# columns
@@ -3652,15 +3852,24 @@ class ColumnCollectionMixin:
def c(self) -> ReadOnlyColumnCollection[str, Column[Any]]:
return self._columns.as_readonly()
- def _col_expressions(self, table: Table) -> List[Column[Any]]:
- return [
- table.c[col] if isinstance(col, str) else col
- for col in self._pending_colargs
- ]
+ def _col_expressions(
+ self, parent: Union[Table, Column[Any]]
+ ) -> List[Optional[Column[Any]]]:
+ if isinstance(parent, Column):
+ result: List[Optional[Column[Any]]] = [
+ c for c in self._pending_colargs if isinstance(c, Column)
+ ]
+ assert len(result) == len(self._pending_colargs)
+ return result
+ else:
+ return [
+ parent.c[col] if isinstance(col, str) else col
+ for col in self._pending_colargs
+ ]
def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
- if TYPE_CHECKING:
- assert isinstance(parent, Table)
+ assert isinstance(parent, (Table, Column))
+
for col in self._col_expressions(parent):
if col is not None:
self._columns.add(col)
@@ -3669,7 +3878,18 @@ class ColumnCollectionMixin:
class ColumnCollectionConstraint(ColumnCollectionMixin, Constraint):
"""A constraint that proxies a ColumnCollection."""
- def __init__(self, *columns, **kw):
+ def __init__(
+ self,
+ *columns: _DDLColumnArgument,
+ name: Optional[str] = None,
+ deferrable: Optional[bool] = None,
+ initially: Optional[str] = None,
+ info: Optional[_InfoType] = None,
+ _autoattach: bool = True,
+ _column_flag: bool = False,
+ _gather_expressions: Optional[List[_DDLColumnArgument]] = None,
+ **dialect_kw: Any,
+ ) -> None:
r"""
:param \*columns:
A sequence of column names or Column objects.
@@ -3685,13 +3905,19 @@ class ColumnCollectionConstraint(ColumnCollectionMixin, Constraint):
Optional string. If set, emit INITIALLY <value> when issuing DDL
for this constraint.
- :param \**kw: other keyword arguments including dialect-specific
- arguments are propagated to the :class:`.Constraint` superclass.
+ :param \**dialect_kw: other keyword arguments including
+ dialect-specific arguments are propagated to the :class:`.Constraint`
+ superclass.
"""
- _autoattach = kw.pop("_autoattach", True)
- _column_flag = kw.pop("_column_flag", False)
- Constraint.__init__(self, **kw)
+ Constraint.__init__(
+ self,
+ name=name,
+ deferrable=deferrable,
+ initially=initially,
+ info=info,
+ **dialect_kw,
+ )
ColumnCollectionMixin.__init__(
self, *columns, _autoattach=_autoattach, _column_flag=_column_flag
)
@@ -3702,11 +3928,12 @@ class ColumnCollectionConstraint(ColumnCollectionMixin, Constraint):
"""
- def _set_parent(self, table, **kw):
- Constraint._set_parent(self, table)
- ColumnCollectionMixin._set_parent(self, table)
+ def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
+ assert isinstance(parent, (Column, Table))
+ Constraint._set_parent(self, parent)
+ ColumnCollectionMixin._set_parent(self, parent)
- def __contains__(self, x):
+ def __contains__(self, x: Any) -> bool:
return x in self._columns
@util.deprecated(
@@ -3714,10 +3941,20 @@ class ColumnCollectionConstraint(ColumnCollectionMixin, Constraint):
"The :meth:`_schema.ColumnCollectionConstraint.copy` method "
"is deprecated and will be removed in a future release.",
)
- def copy(self, target_table=None, **kw):
+ def copy(
+ self,
+ *,
+ target_table: Optional[Table] = None,
+ **kw: Any,
+ ) -> ColumnCollectionConstraint:
return self._copy(target_table=target_table, **kw)
- def _copy(self, target_table=None, **kw):
+ def _copy(
+ self,
+ *,
+ target_table: Optional[Table] = None,
+ **kw: Any,
+ ) -> ColumnCollectionConstraint:
# ticket #5276
constraint_kwargs = {}
for dialect_name in self.dialect_options:
@@ -3730,6 +3967,7 @@ class ColumnCollectionConstraint(ColumnCollectionMixin, Constraint):
dialect_name + "_" + dialect_option_key
] = dialect_option_value
+ assert isinstance(self.parent, Table)
c = self.__class__(
name=self.name,
deferrable=self.deferrable,
@@ -3742,7 +3980,7 @@ class ColumnCollectionConstraint(ColumnCollectionMixin, Constraint):
)
return self._schema_item_copy(c)
- def contains_column(self, col):
+ def contains_column(self, col: Column[Any]) -> bool:
"""Return True if this constraint contains the given column.
Note that this object also contains an attribute ``.columns``
@@ -3777,17 +4015,17 @@ class CheckConstraint(ColumnCollectionConstraint):
)
def __init__(
self,
- sqltext,
- name=None,
- deferrable=None,
- initially=None,
- table=None,
- info=None,
- _create_rule=None,
- _autoattach=True,
- _type_bound=False,
- **kw,
- ):
+ sqltext: _TextCoercedExpressionArgument[Any],
+ name: Optional[str] = None,
+ deferrable: Optional[bool] = None,
+ initially: Optional[str] = None,
+ table: Optional[Table] = None,
+ info: Optional[_InfoType] = None,
+ _create_rule: Optional[Any] = None,
+ _autoattach: bool = True,
+ _type_bound: bool = False,
+ **dialect_kw: Any,
+ ) -> None:
r"""Construct a CHECK constraint.
:param sqltext:
@@ -3821,7 +4059,7 @@ class CheckConstraint(ColumnCollectionConstraint):
columns: List[Column[Any]] = []
visitors.traverse(self.sqltext, {}, {"column": columns.append})
- super(CheckConstraint, self).__init__(
+ super().__init__(
name=name,
deferrable=deferrable,
initially=initially,
@@ -3830,13 +4068,13 @@ class CheckConstraint(ColumnCollectionConstraint):
_type_bound=_type_bound,
_autoattach=_autoattach,
*columns,
- **kw,
+ **dialect_kw,
)
if table is not None:
self._set_parent_with_dispatch(table)
@property
- def is_column_level(self):
+ def is_column_level(self) -> bool:
return not isinstance(self.parent, Table)
@util.deprecated(
@@ -3844,10 +4082,14 @@ class CheckConstraint(ColumnCollectionConstraint):
"The :meth:`_schema.CheckConstraint.copy` method is deprecated "
"and will be removed in a future release.",
)
- def copy(self, target_table=None, **kw):
+ def copy(
+ self, *, target_table: Optional[Table] = None, **kw: Any
+ ) -> CheckConstraint:
return self._copy(target_table=target_table, **kw)
- def _copy(self, target_table=None, **kw):
+ def _copy(
+ self, *, target_table: Optional[Table] = None, **kw: Any
+ ) -> CheckConstraint:
if target_table is not None:
# note that target_table is None for the copy process of
# a column-bound CheckConstraint, so this path is not reached
@@ -3886,20 +4128,20 @@ class ForeignKeyConstraint(ColumnCollectionConstraint):
def __init__(
self,
- columns,
- refcolumns,
- name=None,
- onupdate=None,
- ondelete=None,
- deferrable=None,
- initially=None,
- use_alter=False,
- link_to_name=False,
- match=None,
- table=None,
- info=None,
- **dialect_kw,
- ):
+ columns: _typing_Sequence[_DDLColumnArgument],
+ refcolumns: _typing_Sequence[_DDLColumnArgument],
+ name: Optional[str] = None,
+ onupdate: Optional[str] = None,
+ ondelete: Optional[str] = None,
+ deferrable: Optional[bool] = None,
+ initially: Optional[str] = None,
+ use_alter: bool = False,
+ link_to_name: bool = False,
+ match: Optional[str] = None,
+ table: Optional[Table] = None,
+ info: Optional[_InfoType] = None,
+ **dialect_kw: Any,
+ ) -> None:
r"""Construct a composite-capable FOREIGN KEY.
:param columns: A sequence of local column names. The named columns
@@ -4051,19 +4293,19 @@ class ForeignKeyConstraint(ColumnCollectionConstraint):
"""
@property
- def _elements(self):
+ def _elements(self) -> util.OrderedDict[str, ForeignKey]:
# legacy - provide a dictionary view of (column_key, fk)
return util.OrderedDict(zip(self.column_keys, self.elements))
@property
- def _referred_schema(self):
+ def _referred_schema(self) -> Optional[str]:
for elem in self.elements:
return elem._referred_schema
else:
return None
@property
- def referred_table(self):
+ def referred_table(self) -> Table:
"""The :class:`_schema.Table` object to which this
:class:`_schema.ForeignKeyConstraint` references.
@@ -4076,7 +4318,7 @@ class ForeignKeyConstraint(ColumnCollectionConstraint):
"""
return self.elements[0].column.table
- def _validate_dest_table(self, table):
+ def _validate_dest_table(self, table: Table) -> None:
table_keys = set([elem._table_key() for elem in self.elements])
if None not in table_keys and len(table_keys) > 1:
elem0, elem1 = sorted(table_keys)[0:2]
@@ -4087,7 +4329,7 @@ class ForeignKeyConstraint(ColumnCollectionConstraint):
)
@property
- def column_keys(self):
+ def column_keys(self) -> _typing_Sequence[str]:
"""Return a list of string keys representing the local
columns in this :class:`_schema.ForeignKeyConstraint`.
@@ -4108,10 +4350,12 @@ class ForeignKeyConstraint(ColumnCollectionConstraint):
]
@property
- def _col_description(self):
+ def _col_description(self) -> str:
return ", ".join(self.column_keys)
- def _set_parent(self, table, **kw):
+ def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
+ table = parent
+ assert isinstance(table, Table)
Constraint._set_parent(self, table)
try:
@@ -4134,10 +4378,22 @@ class ForeignKeyConstraint(ColumnCollectionConstraint):
"The :meth:`_schema.ForeignKeyConstraint.copy` method is deprecated "
"and will be removed in a future release.",
)
- def copy(self, schema=None, target_table=None, **kw):
+ def copy(
+ self,
+ *,
+ schema: Optional[str] = None,
+ target_table: Optional[Table] = None,
+ **kw: Any,
+ ) -> ForeignKeyConstraint:
return self._copy(schema=schema, target_table=target_table, **kw)
- def _copy(self, schema=None, target_table=None, **kw):
+ def _copy(
+ self,
+ *,
+ schema: Optional[str] = None,
+ target_table: Optional[Table] = None,
+ **kw: Any,
+ ) -> ForeignKeyConstraint:
fkc = ForeignKeyConstraint(
[x.parent.key for x in self.elements],
[
@@ -4241,16 +4497,34 @@ class PrimaryKeyConstraint(ColumnCollectionConstraint):
__visit_name__ = "primary_key_constraint"
- def __init__(self, *columns, **kw):
- self._implicit_generated = kw.pop("_implicit_generated", False)
- super(PrimaryKeyConstraint, self).__init__(*columns, **kw)
+ def __init__(
+ self,
+ *columns: _DDLColumnArgument,
+ name: Optional[str] = None,
+ deferrable: Optional[bool] = None,
+ initially: Optional[str] = None,
+ info: Optional[_InfoType] = None,
+ _implicit_generated: bool = False,
+ **dialect_kw: Any,
+ ) -> None:
+ self._implicit_generated = _implicit_generated
+ super(PrimaryKeyConstraint, self).__init__(
+ *columns,
+ name=name,
+ deferrable=deferrable,
+ initially=initially,
+ info=info,
+ **dialect_kw,
+ )
- def _set_parent(self, table, **kw):
+ def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
+ table = parent
+ assert isinstance(table, Table)
super(PrimaryKeyConstraint, self)._set_parent(table)
if table.primary_key is not self:
table.constraints.discard(table.primary_key)
- table.primary_key = self
+ table.primary_key = self # type: ignore
table.constraints.add(self)
table_pks = [c for c in table.c if c.primary_key]
@@ -4280,7 +4554,7 @@ class PrimaryKeyConstraint(ColumnCollectionConstraint):
if table_pks:
self._columns.extend(table_pks)
- def _reload(self, columns):
+ def _reload(self, columns: Iterable[Column[Any]]) -> None:
"""repopulate this :class:`.PrimaryKeyConstraint` given
a set of columns.
@@ -4309,14 +4583,14 @@ class PrimaryKeyConstraint(ColumnCollectionConstraint):
PrimaryKeyConstraint._autoincrement_column._reset(self) # type: ignore
self._set_parent_with_dispatch(self.table)
- def _replace(self, col):
+ def _replace(self, col: Column[Any]) -> None:
PrimaryKeyConstraint._autoincrement_column._reset(self) # type: ignore
self._columns.replace(col)
self.dispatch._sa_event_column_added_to_pk_constraint(self, col)
@property
- def columns_autoinc_first(self):
+ def columns_autoinc_first(self) -> List[Column[Any]]:
autoinc = self._autoincrement_column
if autoinc is not None:
@@ -4326,7 +4600,7 @@ class PrimaryKeyConstraint(ColumnCollectionConstraint):
@util.ro_memoized_property
def _autoincrement_column(self) -> Optional[Column[Any]]:
- def _validate_autoinc(col, autoinc_true):
+ def _validate_autoinc(col: Column[Any], autoinc_true: bool) -> bool:
if col.type._type_affinity is None or not issubclass(
col.type._type_affinity, type_api.INTEGERTYPE._type_affinity
):
@@ -4478,7 +4752,21 @@ class Index(
__visit_name__ = "index"
- def __init__(self, name, *expressions, **kw):
+ table: Optional[Table]
+ expressions: _typing_Sequence[Union[str, ColumnElement[Any]]]
+ _table_bound_expressions: _typing_Sequence[ColumnElement[Any]]
+
+ def __init__(
+ self,
+ name: Optional[str],
+ *expressions: _DDLColumnArgument,
+ unique: bool = False,
+ quote: Optional[bool] = None,
+ info: Optional[_InfoType] = None,
+ _table: Optional[Table] = None,
+ _column_flag: bool = False,
+ **dialect_kw: Any,
+ ) -> None:
r"""Construct an index object.
:param name:
@@ -4503,8 +4791,8 @@ class Index(
.. versionadded:: 1.0.0
- :param \**kw: Additional keyword arguments not mentioned above are
- dialect specific, and passed in the form
+ :param \**dialect_kw: Additional keyword arguments not mentioned above
+ are dialect specific, and passed in the form
``<dialectname>_<argname>``. See the documentation regarding an
individual dialect at :ref:`dialect_toplevel` for detail on
documented arguments.
@@ -4512,20 +4800,19 @@ class Index(
"""
self.table = table = None
- self.name = quoted_name.construct(name, kw.pop("quote", None))
- self.unique = kw.pop("unique", False)
- _column_flag = kw.pop("_column_flag", False)
- if "info" in kw:
- self.info = kw.pop("info")
+ self.name = quoted_name.construct(name, quote)
+ self.unique = unique
+ if info is not None:
+ self.info = info
# TODO: consider "table" argument being public, but for
# the purpose of the fix here, it starts as private.
- if "_table" in kw:
- table = kw.pop("_table")
+ if _table is not None:
+ table = _table
- self._validate_dialect_kwargs(kw)
+ self._validate_dialect_kwargs(dialect_kw)
- self.expressions: List[ColumnElement[Any]] = []
+ self.expressions = []
# will call _set_parent() if table-bound column
# objects are present
ColumnCollectionMixin.__init__(
@@ -4534,11 +4821,12 @@ class Index(
_column_flag=_column_flag,
_gather_expressions=self.expressions,
)
-
if table is not None:
self._set_parent(table)
- def _set_parent(self, table, **kw):
+ def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
+ table = parent
+ assert isinstance(table, Table)
ColumnCollectionMixin._set_parent(self, table)
if self.table is not None and table is not self.table:
@@ -4553,12 +4841,18 @@ class Index(
expressions = self.expressions
col_expressions = self._col_expressions(table)
assert len(expressions) == len(col_expressions)
- self.expressions = [
- expr if isinstance(expr, ClauseElement) else colexpr
- for expr, colexpr in zip(expressions, col_expressions)
- ]
- def create(self, bind, checkfirst=False):
+ exprs = []
+ for expr, colexpr in zip(expressions, col_expressions):
+ if isinstance(expr, ClauseElement):
+ exprs.append(expr)
+ elif colexpr is not None:
+ exprs.append(colexpr)
+ else:
+ assert False
+ self.expressions = self._table_bound_expressions = exprs
+
+ def create(self, bind: _CreateDropBind, checkfirst: bool = False) -> None:
"""Issue a ``CREATE`` statement for this
:class:`.Index`, using the given
:class:`.Connection` or :class:`.Engine`` for connectivity.
@@ -4569,9 +4863,8 @@ class Index(
"""
bind._run_ddl_visitor(ddl.SchemaGenerator, self, checkfirst=checkfirst)
- return self
- def drop(self, bind, checkfirst=False):
+ def drop(self, bind: _CreateDropBind, checkfirst: bool = False) -> None:
"""Issue a ``DROP`` statement for this
:class:`.Index`, using the given
:class:`.Connection` or :class:`.Engine` for connectivity.
@@ -4583,7 +4876,9 @@ class Index(
"""
bind._run_ddl_visitor(ddl.SchemaDropper, self, checkfirst=checkfirst)
- def __repr__(self):
+ def __repr__(self) -> str:
+ exprs: _typing_Sequence[Any]
+
return "Index(%s)" % (
", ".join(
[repr(self.name)]
@@ -4593,7 +4888,9 @@ class Index(
)
-DEFAULT_NAMING_CONVENTION = util.immutabledict({"ix": "ix_%(column_0_label)s"})
+DEFAULT_NAMING_CONVENTION: util.immutabledict[str, str] = util.immutabledict(
+ {"ix": "ix_%(column_0_label)s"}
+)
class MetaData(HasSchemaAttr):
@@ -4628,8 +4925,8 @@ class MetaData(HasSchemaAttr):
schema: Optional[str] = None,
quote_schema: Optional[bool] = None,
naming_convention: Optional[Dict[str, str]] = None,
- info: Optional[Dict[Any, Any]] = None,
- ):
+ info: Optional[_InfoType] = None,
+ ) -> None:
"""Create a new MetaData object.
:param schema:
@@ -4758,7 +5055,7 @@ class MetaData(HasSchemaAttr):
self._schemas: Set[str] = set()
self._sequences: Dict[str, Sequence] = {}
self._fk_memos: Dict[
- Tuple[str, str], List[ForeignKey]
+ Tuple[str, Optional[str]], List[ForeignKey]
] = collections.defaultdict(list)
tables: util.FacadeDict[str, Table]
@@ -4787,13 +5084,15 @@ class MetaData(HasSchemaAttr):
table_or_key = table_or_key.key
return table_or_key in self.tables
- def _add_table(self, name, schema, table):
+ def _add_table(
+ self, name: str, schema: Optional[str], table: Table
+ ) -> None:
key = _get_table_key(name, schema)
self.tables._insert_item(key, table)
if schema:
self._schemas.add(schema)
- def _remove_table(self, name, schema):
+ def _remove_table(self, name: str, schema: Optional[str]) -> None:
key = _get_table_key(name, schema)
removed = dict.pop(self.tables, key, None) # type: ignore
if removed is not None:
@@ -4808,7 +5107,7 @@ class MetaData(HasSchemaAttr):
]
)
- def __getstate__(self):
+ def __getstate__(self) -> Dict[str, Any]:
return {
"tables": self.tables,
"schema": self.schema,
@@ -4818,7 +5117,7 @@ class MetaData(HasSchemaAttr):
"naming_convention": self.naming_convention,
}
- def __setstate__(self, state):
+ def __setstate__(self, state: Dict[str, Any]) -> None:
self.tables = state["tables"]
self.schema = state["schema"]
self.naming_convention = state["naming_convention"]
@@ -5054,7 +5353,7 @@ class MetaData(HasSchemaAttr):
def create_all(
self,
- bind: Union[Engine, Connection, MockConnection],
+ bind: _CreateDropBind,
tables: Optional[_typing_Sequence[Table]] = None,
checkfirst: bool = True,
) -> None:
@@ -5082,7 +5381,7 @@ class MetaData(HasSchemaAttr):
def drop_all(
self,
- bind: Union[Engine, Connection, MockConnection],
+ bind: _CreateDropBind,
tables: Optional[_typing_Sequence[Table]] = None,
checkfirst: bool = True,
) -> None:
@@ -5134,10 +5433,14 @@ class Computed(FetchedValue, SchemaItem):
__visit_name__ = "computed_column"
+ column: Optional[Column[Any]]
+
@_document_text_coercion(
"sqltext", ":class:`.Computed`", ":paramref:`.Computed.sqltext`"
)
- def __init__(self, sqltext, persisted=None):
+ def __init__(
+ self, sqltext: _DDLColumnArgument, persisted: Optional[bool] = None
+ ) -> None:
"""Construct a GENERATED ALWAYS AS DDL construct to accompany a
:class:`_schema.Column`.
@@ -5170,7 +5473,9 @@ class Computed(FetchedValue, SchemaItem):
self.persisted = persisted
self.column = None
- def _set_parent(self, parent, **kw):
+ def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
+ assert isinstance(parent, Column)
+
if not isinstance(
parent.server_default, (type(None), Computed)
) or not isinstance(parent.server_onupdate, (type(None), Computed)):
@@ -5183,7 +5488,7 @@ class Computed(FetchedValue, SchemaItem):
self.column.server_onupdate = self
self.column.server_default = self
- def _as_for_update(self, for_update):
+ def _as_for_update(self, for_update: bool) -> FetchedValue:
return self
@util.deprecated(
@@ -5191,10 +5496,14 @@ class Computed(FetchedValue, SchemaItem):
"The :meth:`_schema.Computed.copy` method is deprecated "
"and will be removed in a future release.",
)
- def copy(self, target_table=None, **kw):
- return self._copy(target_table, **kw)
+ def copy(
+ self, *, target_table: Optional[Table] = None, **kw: Any
+ ) -> Computed:
+ return self._copy(target_table=target_table, **kw)
- def _copy(self, target_table=None, **kw):
+ def _copy(
+ self, *, target_table: Optional[Table] = None, **kw: Any
+ ) -> Computed:
sqltext = _copy_expression(
self.sqltext,
self.column.table if self.column is not None else None,
@@ -5233,18 +5542,18 @@ class Identity(IdentityOptions, FetchedValue, SchemaItem):
def __init__(
self,
- always=False,
- on_null=None,
- start=None,
- increment=None,
- minvalue=None,
- maxvalue=None,
- nominvalue=None,
- nomaxvalue=None,
- cycle=None,
- cache=None,
- order=None,
- ):
+ always: bool = False,
+ on_null: Optional[bool] = None,
+ start: Optional[int] = None,
+ increment: Optional[int] = None,
+ minvalue: Optional[int] = None,
+ maxvalue: Optional[int] = None,
+ nominvalue: Optional[bool] = None,
+ nomaxvalue: Optional[bool] = None,
+ cycle: Optional[bool] = None,
+ cache: Optional[bool] = None,
+ order: Optional[bool] = None,
+ ) -> None:
"""Construct a GENERATED { ALWAYS | BY DEFAULT } AS IDENTITY DDL
construct to accompany a :class:`_schema.Column`.
@@ -5306,7 +5615,8 @@ class Identity(IdentityOptions, FetchedValue, SchemaItem):
self.on_null = on_null
self.column = None
- def _set_parent(self, parent, **kw):
+ def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
+ assert isinstance(parent, Column)
if not isinstance(
parent.server_default, (type(None), Identity)
) or not isinstance(parent.server_onupdate, type(None)):
@@ -5327,7 +5637,7 @@ class Identity(IdentityOptions, FetchedValue, SchemaItem):
parent.server_default = self
- def _as_for_update(self, for_update):
+ def _as_for_update(self, for_update: bool) -> FetchedValue:
return self
@util.deprecated(
@@ -5335,10 +5645,10 @@ class Identity(IdentityOptions, FetchedValue, SchemaItem):
"The :meth:`_schema.Identity.copy` method is deprecated "
"and will be removed in a future release.",
)
- def copy(self, **kw):
+ def copy(self, **kw: Any) -> Identity:
return self._copy(**kw)
- def _copy(self, **kw):
+ def _copy(self, **kw: Any) -> Identity:
i = Identity(
always=self.always,
on_null=self.on_null,