summaryrefslogtreecommitdiff
path: root/alembic/operations
diff options
context:
space:
mode:
authorCaselIT <cfederico87@gmail.com>2021-04-18 15:44:50 +0200
committerMike Bayer <mike_mp@zzzcomputing.com>2021-08-11 15:04:56 -0400
commit6aad68605f510e8b51f42efa812e02b3831d6e33 (patch)
treecc0e98b8ad8245add8692d8e4910faf57abf7ae3 /alembic/operations
parent3bf6a326c0a11e4f05c94008709d6b0b8e9e051a (diff)
downloadalembic-6aad68605f510e8b51f42efa812e02b3831d6e33.tar.gz
Add pep-484 type annotations
pep-484 type annotations have been added throughout the library. This should be helpful in providing Mypy and IDE support, however there is not full support for Alembic's dynamically modified "op" namespace as of yet; a future release will likely modify the approach used for importing this namespace to be better compatible with pep-484 capabilities. Type originally created using MonkeyType Add types extracted with the MonkeyType https://github.com/instagram/MonkeyType library by running the unit tests using ``monkeytype run -m pytest tests``, then ``monkeytype apply <module>`` (see below for further details). USed MonkeyType version 20.5 on Python 3.8, since newer version have issues After applying the types, the new imports are placed in a ``TYPE_CHECKING`` guard and all type definition of non base types are deferred by using the string notation. NOTE: since to apply the types MonkeType need to import the module, also the test ones, the patch below mocks the setup done by pytest so that the tests could be correctly imported diff --git a/alembic/testing/__init__.py b/alembic/testing/__init__.py index bdd1746..b1090c7 100644 Change-Id: Iff93628f4b43c740848871ce077a118db5e75d41 --- a/alembic/testing/__init__.py +++ b/alembic/testing/__init__.py @@ -9,6 +9,12 @@ from sqlalchemy.testing.config import combinations from sqlalchemy.testing.config import fixture from sqlalchemy.testing.config import requirements as requires +from sqlalchemy.testing.plugin.pytestplugin import PytestFixtureFunctions +from sqlalchemy.testing.plugin.plugin_base import _setup_requirements + +config._fixture_functions = PytestFixtureFunctions() +_setup_requirements("tests.requirements:DefaultRequirements") + from alembic import util from .assertions import assert_raises from .assertions import assert_raises_message Currently I'm using this branch of the sqlalchemy stubs: https://github.com/sqlalchemy/sqlalchemy2-stubs/tree/alembic_updates Change-Id: I8fd0700aab1913f395302626b8b84fea60334abd
Diffstat (limited to 'alembic/operations')
-rw-r--r--alembic/operations/base.py59
-rw-r--r--alembic/operations/batch.py168
-rw-r--r--alembic/operations/ops.py822
-rw-r--r--alembic/operations/schemaobj.py124
-rw-r--r--alembic/operations/toimpl.py55
5 files changed, 838 insertions, 390 deletions
diff --git a/alembic/operations/base.py b/alembic/operations/base.py
index cd14080..d4ec7b1 100644
--- a/alembic/operations/base.py
+++ b/alembic/operations/base.py
@@ -1,5 +1,13 @@
from contextlib import contextmanager
import textwrap
+from typing import Any
+from typing import Callable
+from typing import Iterator
+from typing import Optional
+from typing import TYPE_CHECKING
+from typing import Union
+
+from sqlalchemy.sql.elements import conv
from . import batch
from . import schemaobj
@@ -8,12 +16,15 @@ from ..util import sqla_compat
from ..util.compat import inspect_formatargspec
from ..util.compat import inspect_getargspec
-__all__ = ("Operations", "BatchOperations")
+if TYPE_CHECKING:
+ from sqlalchemy.engine import Connection
-try:
- from sqlalchemy.sql.naming import conv
-except:
- conv = None
+ from .batch import BatchOperationsImpl
+ from .ops import MigrateOperation
+ from ..runtime.migration import MigrationContext
+ from ..util.sqla_compat import _literal_bindparam
+
+__all__ = ("Operations", "BatchOperations")
class Operations(util.ModuleClsProxy):
@@ -49,7 +60,11 @@ class Operations(util.ModuleClsProxy):
_to_impl = util.Dispatcher()
- def __init__(self, migration_context, impl=None):
+ def __init__(
+ self,
+ migration_context: "MigrationContext",
+ impl: Optional["BatchOperationsImpl"] = None,
+ ) -> None:
"""Construct a new :class:`.Operations`
:param migration_context: a :class:`.MigrationContext`
@@ -65,7 +80,9 @@ class Operations(util.ModuleClsProxy):
self.schema_obj = schemaobj.SchemaObjects(migration_context)
@classmethod
- def register_operation(cls, name, sourcename=None):
+ def register_operation(
+ cls, name: str, sourcename: Optional[str] = None
+ ) -> Callable:
"""Register a new operation for this class.
This method is normally used to add new operations
@@ -142,7 +159,7 @@ class Operations(util.ModuleClsProxy):
return register
@classmethod
- def implementation_for(cls, op_cls):
+ def implementation_for(cls, op_cls: Any) -> Callable:
"""Register an implementation for a given :class:`.MigrateOperation`.
This is part of the operation extensibility API.
@@ -161,7 +178,9 @@ class Operations(util.ModuleClsProxy):
@classmethod
@contextmanager
- def context(cls, migration_context):
+ def context(
+ cls, migration_context: "MigrationContext"
+ ) -> Iterator["Operations"]:
op = Operations(migration_context)
op._install_proxy()
yield op
@@ -342,7 +361,7 @@ class Operations(util.ModuleClsProxy):
return self.migration_context
- def invoke(self, operation):
+ def invoke(self, operation: "MigrateOperation") -> Any:
"""Given a :class:`.MigrateOperation`, invoke it in terms of
this :class:`.Operations` instance.
@@ -352,7 +371,7 @@ class Operations(util.ModuleClsProxy):
)
return fn(self, operation)
- def f(self, name):
+ def f(self, name: str) -> "conv":
"""Indicate a string name that has already had a naming convention
applied to it.
@@ -385,20 +404,14 @@ class Operations(util.ModuleClsProxy):
CONSTRAINT ck_bool_t_x CHECK (x in (1, 0)))
The function is rendered in the output of autogenerate when
- a particular constraint name is already converted, for SQLAlchemy
- version **0.9.4 and greater only**. Even though ``naming_convention``
- was introduced in 0.9.2, the string disambiguation service is new
- as of 0.9.4.
+ a particular constraint name is already converted.
"""
- if conv:
- return conv(name)
- else:
- raise NotImplementedError(
- "op.f() feature requires SQLAlchemy 0.9.4 or greater."
- )
+ return conv(name)
- def inline_literal(self, value, type_=None):
+ def inline_literal(
+ self, value: Union[str, int], type_: None = None
+ ) -> "_literal_bindparam":
r"""Produce an 'inline literal' expression, suitable for
using in an INSERT, UPDATE, or DELETE statement.
@@ -442,7 +455,7 @@ class Operations(util.ModuleClsProxy):
"""
return sqla_compat._literal_bindparam(None, value, type_=type_)
- def get_bind(self):
+ def get_bind(self) -> "Connection":
"""Return the current 'bind'.
Under normal circumstances, this is the
diff --git a/alembic/operations/batch.py b/alembic/operations/batch.py
index 656b868..ee1fe05 100644
--- a/alembic/operations/batch.py
+++ b/alembic/operations/batch.py
@@ -1,3 +1,12 @@
+from typing import Any
+from typing import cast
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import TYPE_CHECKING
+from typing import Union
+
from sqlalchemy import CheckConstraint
from sqlalchemy import Column
from sqlalchemy import ForeignKeyConstraint
@@ -21,6 +30,18 @@ from ..util.sqla_compat import _is_type_bound
from ..util.sqla_compat import _remove_column_from_collection
from ..util.sqla_compat import _select
+if TYPE_CHECKING:
+ from typing import Literal
+
+ from sqlalchemy.engine import Dialect
+ from sqlalchemy.sql.elements import ColumnClause
+ from sqlalchemy.sql.elements import quoted_name
+ from sqlalchemy.sql.functions import Function
+ from sqlalchemy.sql.schema import Constraint
+ from sqlalchemy.sql.type_api import TypeEngine
+
+ from ..ddl.impl import DefaultImpl
+
class BatchOperationsImpl:
def __init__(
@@ -61,14 +82,14 @@ class BatchOperationsImpl:
self.batch = []
@property
- def dialect(self):
+ def dialect(self) -> "Dialect":
return self.operations.impl.dialect
@property
- def impl(self):
+ def impl(self) -> "DefaultImpl":
return self.operations.impl
- def _should_recreate(self):
+ def _should_recreate(self) -> bool:
if self.recreate == "auto":
return self.operations.impl.requires_recreate_in_batch(self)
elif self.recreate == "always":
@@ -76,7 +97,7 @@ class BatchOperationsImpl:
else:
return False
- def flush(self):
+ def flush(self) -> None:
should_recreate = self._should_recreate()
with _ensure_scope_for_ddl(self.impl.connection):
@@ -118,10 +139,10 @@ class BatchOperationsImpl:
batch_impl._create(self.impl)
- def alter_column(self, *arg, **kw):
+ def alter_column(self, *arg, **kw) -> None:
self.batch.append(("alter_column", arg, kw))
- def add_column(self, *arg, **kw):
+ def add_column(self, *arg, **kw) -> None:
if (
"insert_before" in kw or "insert_after" in kw
) and not self._should_recreate():
@@ -131,22 +152,22 @@ class BatchOperationsImpl:
)
self.batch.append(("add_column", arg, kw))
- def drop_column(self, *arg, **kw):
+ def drop_column(self, *arg, **kw) -> None:
self.batch.append(("drop_column", arg, kw))
- def add_constraint(self, const):
+ def add_constraint(self, const: "Constraint") -> None:
self.batch.append(("add_constraint", (const,), {}))
- def drop_constraint(self, const):
+ def drop_constraint(self, const: "Constraint") -> None:
self.batch.append(("drop_constraint", (const,), {}))
def rename_table(self, *arg, **kw):
self.batch.append(("rename_table", arg, kw))
- def create_index(self, idx):
+ def create_index(self, idx: "Index") -> None:
self.batch.append(("create_index", (idx,), {}))
- def drop_index(self, idx):
+ def drop_index(self, idx: "Index") -> None:
self.batch.append(("drop_index", (idx,), {}))
def create_table_comment(self, table):
@@ -168,22 +189,24 @@ class BatchOperationsImpl:
class ApplyBatchImpl:
def __init__(
self,
- impl,
- table,
- table_args,
- table_kwargs,
- reflected,
- partial_reordering=(),
- ):
+ impl: "DefaultImpl",
+ table: "Table",
+ table_args: tuple,
+ table_kwargs: Dict[str, Any],
+ reflected: bool,
+ partial_reordering: tuple = (),
+ ) -> None:
self.impl = impl
self.table = table # this is a Table object
self.table_args = table_args
self.table_kwargs = table_kwargs
self.temp_table_name = self._calc_temp_name(table.name)
- self.new_table = None
+ self.new_table: Optional[Table] = None
self.partial_reordering = partial_reordering # tuple of tuples
- self.add_col_ordering = () # tuple of tuples
+ self.add_col_ordering: Tuple[
+ Tuple[str, str], ...
+ ] = () # tuple of tuples
self.column_transfers = OrderedDict(
(c.name, {"expr": c}) for c in self.table.c
@@ -194,12 +217,12 @@ class ApplyBatchImpl:
self._grab_table_elements()
@classmethod
- def _calc_temp_name(cls, tablename):
+ def _calc_temp_name(cls, tablename: "quoted_name") -> str:
return ("_alembic_tmp_%s" % tablename)[0:50]
- def _grab_table_elements(self):
+ def _grab_table_elements(self) -> None:
schema = self.table.schema
- self.columns = OrderedDict()
+ self.columns: Dict[str, "Column"] = OrderedDict()
for c in self.table.c:
c_copy = _copy(c, schema=schema)
c_copy.unique = c_copy.index = False
@@ -208,11 +231,11 @@ class ApplyBatchImpl:
if isinstance(c.type, SchemaEventTarget):
assert c_copy.type is not c.type
self.columns[c.name] = c_copy
- self.named_constraints = {}
+ self.named_constraints: Dict[str, "Constraint"] = {}
self.unnamed_constraints = []
self.col_named_constraints = {}
- self.indexes = {}
- self.new_indexes = {}
+ self.indexes: Dict[str, "Index"] = {}
+ self.new_indexes: Dict[str, "Index"] = {}
for const in self.table.constraints:
if _is_type_bound(const):
@@ -238,7 +261,7 @@ class ApplyBatchImpl:
for k in self.table.kwargs:
self.table_kwargs.setdefault(k, self.table.kwargs[k])
- def _adjust_self_columns_for_partial_reordering(self):
+ def _adjust_self_columns_for_partial_reordering(self) -> None:
pairs = set()
col_by_idx = list(self.columns)
@@ -258,17 +281,17 @@ class ApplyBatchImpl:
# this can happen if some columns were dropped and not removed
# from existing_ordering. this should be prevented already, but
# conservatively making sure this didn't happen
- pairs = [p for p in pairs if p[0] != p[1]]
+ pairs_list = [p for p in pairs if p[0] != p[1]]
sorted_ = list(
- topological.sort(pairs, col_by_idx, deterministic_order=True)
+ topological.sort(pairs_list, col_by_idx, deterministic_order=True)
)
self.columns = OrderedDict((k, self.columns[k]) for k in sorted_)
self.column_transfers = OrderedDict(
(k, self.column_transfers[k]) for k in sorted_
)
- def _transfer_elements_to_new_table(self):
+ def _transfer_elements_to_new_table(self) -> None:
assert self.new_table is None, "Can only create new table once"
m = MetaData()
@@ -296,6 +319,7 @@ class ApplyBatchImpl:
if not const_columns.issubset(self.column_transfers):
continue
+ const_copy: "Constraint"
if isinstance(const, ForeignKeyConstraint):
if _fk_is_self_referential(const):
# for self-referential constraint, refer to the
@@ -320,8 +344,9 @@ class ApplyBatchImpl:
self._setup_referent(m, const)
new_table.append_constraint(const_copy)
- def _gather_indexes_from_both_tables(self):
- idx = []
+ def _gather_indexes_from_both_tables(self) -> List["Index"]:
+ assert self.new_table is not None
+ idx: List[Index] = []
idx.extend(self.indexes.values())
for index in self.new_indexes.values():
idx.append(
@@ -334,8 +359,12 @@ class ApplyBatchImpl:
)
return idx
- def _setup_referent(self, metadata, constraint):
- spec = constraint.elements[0]._get_colspec()
+ def _setup_referent(
+ self, metadata: "MetaData", constraint: "ForeignKeyConstraint"
+ ) -> None:
+ spec = constraint.elements[
+ 0
+ ]._get_colspec() # type:ignore[attr-defined]
parts = spec.split(".")
tname = parts[-2]
if len(parts) == 3:
@@ -345,10 +374,14 @@ class ApplyBatchImpl:
if tname != self.temp_table_name:
key = sql_schema._get_table_key(tname, referent_schema)
+
+ def colspec(elem: Any):
+ return elem._get_colspec()
+
if key in metadata.tables:
t = metadata.tables[key]
for elem in constraint.elements:
- colname = elem._get_colspec().split(".")[-1]
+ colname = colspec(elem).split(".")[-1]
if colname not in t.c:
t.append_column(Column(colname, sqltypes.NULLTYPE))
else:
@@ -358,17 +391,18 @@ class ApplyBatchImpl:
*[
Column(n, sqltypes.NULLTYPE)
for n in [
- elem._get_colspec().split(".")[-1]
+ colspec(elem).split(".")[-1]
for elem in constraint.elements
]
],
schema=referent_schema
)
- def _create(self, op_impl):
+ def _create(self, op_impl: "DefaultImpl") -> None:
self._transfer_elements_to_new_table()
op_impl.prep_table_for_batch(self, self.table)
+ assert self.new_table is not None
op_impl.create_table(self.new_table)
try:
@@ -405,18 +439,18 @@ class ApplyBatchImpl:
def alter_column(
self,
- table_name,
- column_name,
- nullable=None,
- server_default=False,
- name=None,
- type_=None,
- autoincrement=None,
- comment=False,
+ table_name: str,
+ column_name: str,
+ nullable: Optional[bool] = None,
+ server_default: Optional[Union["Function", str, bool]] = False,
+ name: Optional[str] = None,
+ type_: Optional["TypeEngine"] = None,
+ autoincrement: None = None,
+ comment: Union[str, "Literal[False]"] = False,
**kw
- ):
+ ) -> None:
existing = self.columns[column_name]
- existing_transfer = self.column_transfers[column_name]
+ existing_transfer: Dict[str, Any] = self.column_transfers[column_name]
if name is not None and name != column_name:
# note that we don't change '.key' - we keep referring
# to the renamed column by its old key in _create(). neat!
@@ -431,8 +465,8 @@ class ApplyBatchImpl:
# we also ignore the drop_constraint that will come here from
# Operations.implementation_for(alter_column)
if isinstance(existing.type, SchemaEventTarget):
- existing.type._create_events = (
- existing.type.create_constraint
+ existing.type._create_events = ( # type:ignore[attr-defined]
+ existing.type.create_constraint # type:ignore[attr-defined] # noqa
) = False
self.impl.cast_for_batch_migrate(
@@ -452,7 +486,11 @@ class ApplyBatchImpl:
if server_default is None:
existing.server_default = None
else:
- sql_schema.DefaultClause(server_default)._set_parent(existing)
+ sql_schema.DefaultClause(
+ server_default
+ )._set_parent( # type:ignore[attr-defined]
+ existing
+ )
if autoincrement is not None:
existing.autoincrement = bool(autoincrement)
@@ -460,8 +498,11 @@ class ApplyBatchImpl:
existing.comment = comment
def _setup_dependencies_for_add_column(
- self, colname, insert_before, insert_after
- ):
+ self,
+ colname: str,
+ insert_before: Optional[str],
+ insert_after: Optional[str],
+ ) -> None:
index_cols = self.existing_ordering
col_indexes = {name: i for i, name in enumerate(index_cols)}
@@ -505,8 +546,13 @@ class ApplyBatchImpl:
self.add_col_ordering += ((index_cols[-1], colname),)
def add_column(
- self, table_name, column, insert_before=None, insert_after=None, **kw
- ):
+ self,
+ table_name: str,
+ column: "Column",
+ insert_before: Optional[str] = None,
+ insert_after: Optional[str] = None,
+ **kw
+ ) -> None:
self._setup_dependencies_for_add_column(
column.name, insert_before, insert_after
)
@@ -515,7 +561,9 @@ class ApplyBatchImpl:
self.columns[column.name] = _copy(column, schema=self.table.schema)
self.column_transfers[column.name] = {}
- def drop_column(self, table_name, column, **kw):
+ def drop_column(
+ self, table_name: str, column: Union["ColumnClause", "Column"], **kw
+ ) -> None:
if column.name in self.table.primary_key.columns:
_remove_column_from_collection(
self.table.primary_key.columns, column
@@ -546,7 +594,7 @@ class ApplyBatchImpl:
"""
- def add_constraint(self, const):
+ def add_constraint(self, const: "Constraint") -> None:
if not const.name:
raise ValueError("Constraint must have a name")
if isinstance(const, sql_schema.PrimaryKeyConstraint):
@@ -555,7 +603,7 @@ class ApplyBatchImpl:
self.named_constraints[const.name] = const
- def drop_constraint(self, const):
+ def drop_constraint(self, const: "Constraint") -> None:
if not const.name:
raise ValueError("Constraint must have a name")
try:
@@ -566,7 +614,7 @@ class ApplyBatchImpl:
if col_const.name == const.name:
self.columns[col.name].constraints.remove(col_const)
else:
- const = self.named_constraints.pop(const.name)
+ const = self.named_constraints.pop(cast(str, const.name))
except KeyError:
if _is_type_bound(const):
# type-bound constraints are only included in the new
@@ -580,10 +628,10 @@ class ApplyBatchImpl:
for col in const.columns:
self.columns[col.name].primary_key = False
- def create_index(self, idx):
+ def create_index(self, idx: "Index") -> None:
self.new_indexes[idx.name] = idx
- def drop_index(self, idx):
+ def drop_index(self, idx: "Index") -> None:
try:
del self.indexes[idx.name]
except KeyError:
diff --git a/alembic/operations/ops.py b/alembic/operations/ops.py
index 8979355..d5ddbc9 100644
--- a/alembic/operations/ops.py
+++ b/alembic/operations/ops.py
@@ -1,4 +1,19 @@
+from abc import abstractmethod
import re
+from typing import Any
+from typing import Callable
+from typing import cast
+from typing import FrozenSet
+from typing import Iterator
+from typing import List
+from typing import MutableMapping
+from typing import Optional
+from typing import Sequence
+from typing import Set
+from typing import Tuple
+from typing import Type
+from typing import TYPE_CHECKING
+from typing import Union
from sqlalchemy.types import NULLTYPE
@@ -8,6 +23,33 @@ from .base import Operations
from .. import util
from ..util import sqla_compat
+if TYPE_CHECKING:
+ from sqlalchemy.sql.dml import Insert
+ from sqlalchemy.sql.dml import Update
+ from sqlalchemy.sql.elements import BinaryExpression
+ from sqlalchemy.sql.elements import ColumnElement
+ from sqlalchemy.sql.elements import conv
+ from sqlalchemy.sql.elements import quoted_name
+ from sqlalchemy.sql.elements import TextClause
+ from sqlalchemy.sql.functions import Function
+ from sqlalchemy.sql.schema import CheckConstraint
+ from sqlalchemy.sql.schema import Column
+ from sqlalchemy.sql.schema import Computed
+ from sqlalchemy.sql.schema import Constraint
+ from sqlalchemy.sql.schema import ForeignKeyConstraint
+ from sqlalchemy.sql.schema import Identity
+ from sqlalchemy.sql.schema import Index
+ from sqlalchemy.sql.schema import MetaData
+ from sqlalchemy.sql.schema import PrimaryKeyConstraint
+ from sqlalchemy.sql.schema import SchemaItem
+ from sqlalchemy.sql.schema import Table
+ from sqlalchemy.sql.schema import UniqueConstraint
+ from sqlalchemy.sql.selectable import TableClause
+ from sqlalchemy.sql.type_api import TypeEngine
+
+ from ..autogenerate.rewriter import Rewriter
+ from ..runtime.migration import MigrationContext
+
class MigrateOperation:
"""base class for migration command and organization objects.
@@ -32,7 +74,13 @@ class MigrateOperation:
"""
return {}
- _mutations = frozenset()
+ _mutations: FrozenSet["Rewriter"] = frozenset()
+
+ def reverse(self) -> "MigrateOperation":
+ raise NotImplementedError
+
+ def to_diff_tuple(self) -> Tuple[Any, ...]:
+ raise NotImplementedError
class AddConstraintOp(MigrateOperation):
@@ -45,7 +93,7 @@ class AddConstraintOp(MigrateOperation):
raise NotImplementedError()
@classmethod
- def register_add_constraint(cls, type_):
+ def register_add_constraint(cls, type_: str) -> Callable:
def go(klass):
cls.add_constraint_ops.dispatch_for(type_)(klass.from_constraint)
return klass
@@ -53,15 +101,21 @@ class AddConstraintOp(MigrateOperation):
return go
@classmethod
- def from_constraint(cls, constraint):
+ def from_constraint(cls, constraint: "Constraint") -> "AddConstraintOp":
return cls.add_constraint_ops.dispatch(constraint.__visit_name__)(
constraint
)
- def reverse(self):
+ @abstractmethod
+ def to_constraint(
+ self, migration_context: Optional["MigrationContext"] = None
+ ) -> "Constraint":
+ pass
+
+ def reverse(self) -> "DropConstraintOp":
return DropConstraintOp.from_constraint(self.to_constraint())
- def to_diff_tuple(self):
+ def to_diff_tuple(self) -> Tuple[str, "Constraint"]:
return ("add_constraint", self.to_constraint())
@@ -72,29 +126,34 @@ class DropConstraintOp(MigrateOperation):
def __init__(
self,
- constraint_name,
- table_name,
- type_=None,
- schema=None,
- _reverse=None,
- ):
+ constraint_name: Optional[str],
+ table_name: str,
+ type_: Optional[str] = None,
+ schema: Optional[str] = None,
+ _reverse: Optional["AddConstraintOp"] = None,
+ ) -> None:
self.constraint_name = constraint_name
self.table_name = table_name
self.constraint_type = type_
self.schema = schema
self._reverse = _reverse
- def reverse(self):
+ def reverse(self) -> "AddConstraintOp":
return AddConstraintOp.from_constraint(self.to_constraint())
- def to_diff_tuple(self):
+ def to_diff_tuple(
+ self,
+ ) -> Tuple[str, "SchemaItem"]:
if self.constraint_type == "foreignkey":
return ("remove_fk", self.to_constraint())
else:
return ("remove_constraint", self.to_constraint())
@classmethod
- def from_constraint(cls, constraint):
+ def from_constraint(
+ cls,
+ constraint: "Constraint",
+ ) -> "DropConstraintOp":
types = {
"unique_constraint": "unique",
"foreign_key_constraint": "foreignkey",
@@ -113,7 +172,9 @@ class DropConstraintOp(MigrateOperation):
_reverse=AddConstraintOp.from_constraint(constraint),
)
- def to_constraint(self):
+ def to_constraint(
+ self,
+ ) -> "Constraint":
if self._reverse is not None:
constraint = self._reverse.to_constraint()
@@ -131,8 +192,13 @@ class DropConstraintOp(MigrateOperation):
@classmethod
def drop_constraint(
- cls, operations, constraint_name, table_name, type_=None, schema=None
- ):
+ cls,
+ operations: "Operations",
+ constraint_name: str,
+ table_name: str,
+ type_: Optional[str] = None,
+ schema: Optional[str] = None,
+ ) -> Optional["Table"]:
r"""Drop a constraint of the given name, typically via DROP CONSTRAINT.
:param constraint_name: name of the constraint.
@@ -150,7 +216,12 @@ class DropConstraintOp(MigrateOperation):
return operations.invoke(op)
@classmethod
- def batch_drop_constraint(cls, operations, constraint_name, type_=None):
+ def batch_drop_constraint(
+ cls,
+ operations: "BatchOperations",
+ constraint_name: str,
+ type_: Optional[str] = None,
+ ) -> None:
"""Issue a "drop constraint" instruction using the
current batch migration context.
@@ -182,8 +253,13 @@ class CreatePrimaryKeyOp(AddConstraintOp):
constraint_type = "primarykey"
def __init__(
- self, constraint_name, table_name, columns, schema=None, **kw
- ):
+ self,
+ constraint_name: Optional[str],
+ table_name: str,
+ columns: Sequence[str],
+ schema: Optional[str] = None,
+ **kw
+ ) -> None:
self.constraint_name = constraint_name
self.table_name = table_name
self.columns = columns
@@ -191,18 +267,23 @@ class CreatePrimaryKeyOp(AddConstraintOp):
self.kw = kw
@classmethod
- def from_constraint(cls, constraint):
+ def from_constraint(cls, constraint: "Constraint") -> "CreatePrimaryKeyOp":
constraint_table = sqla_compat._table_for_constraint(constraint)
+ pk_constraint = cast("PrimaryKeyConstraint", constraint)
+
return cls(
- constraint.name,
+ pk_constraint.name,
constraint_table.name,
- constraint.columns.keys(),
+ pk_constraint.columns.keys(),
schema=constraint_table.schema,
- **constraint.dialect_kwargs,
+ **pk_constraint.dialect_kwargs,
)
- def to_constraint(self, migration_context=None):
+ def to_constraint(
+ self, migration_context: Optional["MigrationContext"] = None
+ ) -> "PrimaryKeyConstraint":
schema_obj = schemaobj.SchemaObjects(migration_context)
+
return schema_obj.primary_key_constraint(
self.constraint_name,
self.table_name,
@@ -213,8 +294,13 @@ class CreatePrimaryKeyOp(AddConstraintOp):
@classmethod
def create_primary_key(
- cls, operations, constraint_name, table_name, columns, schema=None
- ):
+ cls,
+ operations: "Operations",
+ constraint_name: str,
+ table_name: str,
+ columns: List[str],
+ schema: Optional[str] = None,
+ ) -> Optional["Table"]:
"""Issue a "create primary key" instruction using the current
migration context.
@@ -255,7 +341,12 @@ class CreatePrimaryKeyOp(AddConstraintOp):
return operations.invoke(op)
@classmethod
- def batch_create_primary_key(cls, operations, constraint_name, columns):
+ def batch_create_primary_key(
+ cls,
+ operations: "BatchOperations",
+ constraint_name: str,
+ columns: List[str],
+ ) -> None:
"""Issue a "create primary key" instruction using the
current batch migration context.
@@ -287,8 +378,13 @@ class CreateUniqueConstraintOp(AddConstraintOp):
constraint_type = "unique"
def __init__(
- self, constraint_name, table_name, columns, schema=None, **kw
- ):
+ self,
+ constraint_name: Optional[str],
+ table_name: str,
+ columns: Sequence[str],
+ schema: Optional[str] = None,
+ **kw
+ ) -> None:
self.constraint_name = constraint_name
self.table_name = table_name
self.columns = columns
@@ -296,24 +392,31 @@ class CreateUniqueConstraintOp(AddConstraintOp):
self.kw = kw
@classmethod
- def from_constraint(cls, constraint):
+ def from_constraint(
+ cls, constraint: "Constraint"
+ ) -> "CreateUniqueConstraintOp":
+
constraint_table = sqla_compat._table_for_constraint(constraint)
- kw = {}
- if constraint.deferrable:
- kw["deferrable"] = constraint.deferrable
- if constraint.initially:
- kw["initially"] = constraint.initially
- kw.update(constraint.dialect_kwargs)
+ uq_constraint = cast("UniqueConstraint", constraint)
+
+ kw: dict = {}
+ if uq_constraint.deferrable:
+ kw["deferrable"] = uq_constraint.deferrable
+ if uq_constraint.initially:
+ kw["initially"] = uq_constraint.initially
+ kw.update(uq_constraint.dialect_kwargs)
return cls(
- constraint.name,
+ uq_constraint.name,
constraint_table.name,
- [c.name for c in constraint.columns],
+ [c.name for c in uq_constraint.columns],
schema=constraint_table.schema,
**kw,
)
- def to_constraint(self, migration_context=None):
+ def to_constraint(
+ self, migration_context: Optional["MigrationContext"] = None
+ ) -> "UniqueConstraint":
schema_obj = schemaobj.SchemaObjects(migration_context)
return schema_obj.unique_constraint(
self.constraint_name,
@@ -326,13 +429,13 @@ class CreateUniqueConstraintOp(AddConstraintOp):
@classmethod
def create_unique_constraint(
cls,
- operations,
- constraint_name,
- table_name,
- columns,
- schema=None,
+ operations: "Operations",
+ constraint_name: Optional[str],
+ table_name: str,
+ columns: Sequence[str],
+ schema: Optional[str] = None,
**kw
- ):
+ ) -> Any:
"""Issue a "create unique constraint" instruction using the
current migration context.
@@ -376,8 +479,12 @@ class CreateUniqueConstraintOp(AddConstraintOp):
@classmethod
def batch_create_unique_constraint(
- cls, operations, constraint_name, columns, **kw
- ):
+ cls,
+ operations: "BatchOperations",
+ constraint_name: str,
+ columns: Sequence[str],
+ **kw
+ ) -> Any:
"""Issue a "create unique constraint" instruction using the
current batch migration context.
@@ -406,13 +513,13 @@ class CreateForeignKeyOp(AddConstraintOp):
def __init__(
self,
- constraint_name,
- source_table,
- referent_table,
- local_cols,
- remote_cols,
+ constraint_name: Optional[str],
+ source_table: str,
+ referent_table: str,
+ local_cols: List[str],
+ remote_cols: List[str],
**kw
- ):
+ ) -> None:
self.constraint_name = constraint_name
self.source_table = source_table
self.referent_table = referent_table
@@ -420,22 +527,24 @@ class CreateForeignKeyOp(AddConstraintOp):
self.remote_cols = remote_cols
self.kw = kw
- def to_diff_tuple(self):
+ def to_diff_tuple(self) -> Tuple[str, "ForeignKeyConstraint"]:
return ("add_fk", self.to_constraint())
@classmethod
- def from_constraint(cls, constraint):
- kw = {}
- if constraint.onupdate:
- kw["onupdate"] = constraint.onupdate
- if constraint.ondelete:
- kw["ondelete"] = constraint.ondelete
- if constraint.initially:
- kw["initially"] = constraint.initially
- if constraint.deferrable:
- kw["deferrable"] = constraint.deferrable
- if constraint.use_alter:
- kw["use_alter"] = constraint.use_alter
+ def from_constraint(cls, constraint: "Constraint") -> "CreateForeignKeyOp":
+
+ fk_constraint = cast("ForeignKeyConstraint", constraint)
+ kw: dict = {}
+ if fk_constraint.onupdate:
+ kw["onupdate"] = fk_constraint.onupdate
+ if fk_constraint.ondelete:
+ kw["ondelete"] = fk_constraint.ondelete
+ if fk_constraint.initially:
+ kw["initially"] = fk_constraint.initially
+ if fk_constraint.deferrable:
+ kw["deferrable"] = fk_constraint.deferrable
+ if fk_constraint.use_alter:
+ kw["use_alter"] = fk_constraint.use_alter
(
source_schema,
@@ -448,13 +557,13 @@ class CreateForeignKeyOp(AddConstraintOp):
ondelete,
deferrable,
initially,
- ) = sqla_compat._fk_spec(constraint)
+ ) = sqla_compat._fk_spec(fk_constraint)
kw["source_schema"] = source_schema
kw["referent_schema"] = target_schema
- kw.update(constraint.dialect_kwargs)
+ kw.update(fk_constraint.dialect_kwargs)
return cls(
- constraint.name,
+ fk_constraint.name,
source_table,
target_table,
source_columns,
@@ -462,7 +571,9 @@ class CreateForeignKeyOp(AddConstraintOp):
**kw,
)
- def to_constraint(self, migration_context=None):
+ def to_constraint(
+ self, migration_context: Optional["MigrationContext"] = None
+ ) -> "ForeignKeyConstraint":
schema_obj = schemaobj.SchemaObjects(migration_context)
return schema_obj.foreign_key_constraint(
self.constraint_name,
@@ -476,21 +587,21 @@ class CreateForeignKeyOp(AddConstraintOp):
@classmethod
def create_foreign_key(
cls,
- operations,
- constraint_name,
- source_table,
- referent_table,
- local_cols,
- remote_cols,
- onupdate=None,
- ondelete=None,
- deferrable=None,
- initially=None,
- match=None,
- source_schema=None,
- referent_schema=None,
+ operations: "Operations",
+ constraint_name: str,
+ source_table: str,
+ referent_table: str,
+ local_cols: List[str],
+ remote_cols: List[str],
+ onupdate: Optional[str] = None,
+ ondelete: Optional[str] = None,
+ deferrable: Optional[bool] = None,
+ initially: Optional[str] = None,
+ match: Optional[str] = None,
+ source_schema: Optional[str] = None,
+ referent_schema: Optional[str] = None,
**dialect_kw
- ):
+ ) -> Optional["Table"]:
"""Issue a "create foreign key" instruction using the
current migration context.
@@ -556,19 +667,19 @@ class CreateForeignKeyOp(AddConstraintOp):
@classmethod
def batch_create_foreign_key(
cls,
- operations,
- constraint_name,
- referent_table,
- local_cols,
- remote_cols,
- referent_schema=None,
- onupdate=None,
- ondelete=None,
- deferrable=None,
- initially=None,
- match=None,
+ operations: "BatchOperations",
+ constraint_name: str,
+ referent_table: str,
+ local_cols: List[str],
+ remote_cols: List[str],
+ referent_schema: Optional[str] = None,
+ onupdate: None = None,
+ ondelete: None = None,
+ deferrable: None = None,
+ initially: None = None,
+ match: None = None,
**dialect_kw
- ):
+ ) -> None:
"""Issue a "create foreign key" instruction using the
current batch migration context.
@@ -618,8 +729,13 @@ class CreateCheckConstraintOp(AddConstraintOp):
constraint_type = "check"
def __init__(
- self, constraint_name, table_name, condition, schema=None, **kw
- ):
+ self,
+ constraint_name: Optional[str],
+ table_name: str,
+ condition: Union["TextClause", "ColumnElement[Any]"],
+ schema: Optional[str] = None,
+ **kw
+ ) -> None:
self.constraint_name = constraint_name
self.table_name = table_name
self.condition = condition
@@ -627,18 +743,26 @@ class CreateCheckConstraintOp(AddConstraintOp):
self.kw = kw
@classmethod
- def from_constraint(cls, constraint):
+ def from_constraint(
+ cls, constraint: "Constraint"
+ ) -> "CreateCheckConstraintOp":
constraint_table = sqla_compat._table_for_constraint(constraint)
+ ck_constraint = cast("CheckConstraint", constraint)
+
return cls(
- constraint.name,
+ ck_constraint.name,
constraint_table.name,
- constraint.sqltext,
+ cast(
+ "Union[TextClause, ColumnElement[Any]]", ck_constraint.sqltext
+ ),
schema=constraint_table.schema,
- **constraint.dialect_kwargs,
+ **ck_constraint.dialect_kwargs,
)
- def to_constraint(self, migration_context=None):
+ def to_constraint(
+ self, migration_context: Optional["MigrationContext"] = None
+ ) -> "CheckConstraint":
schema_obj = schemaobj.SchemaObjects(migration_context)
return schema_obj.check_constraint(
self.constraint_name,
@@ -651,13 +775,13 @@ class CreateCheckConstraintOp(AddConstraintOp):
@classmethod
def create_check_constraint(
cls,
- operations,
- constraint_name,
- table_name,
- condition,
- schema=None,
+ operations: "Operations",
+ constraint_name: Optional[str],
+ table_name: str,
+ condition: "BinaryExpression",
+ schema: Optional[str] = None,
**kw
- ):
+ ) -> Optional["Table"]:
"""Issue a "create check constraint" instruction using the
current migration context.
@@ -703,8 +827,12 @@ class CreateCheckConstraintOp(AddConstraintOp):
@classmethod
def batch_create_check_constraint(
- cls, operations, constraint_name, condition, **kw
- ):
+ cls,
+ operations: "BatchOperations",
+ constraint_name: str,
+ condition: "TextClause",
+ **kw
+ ) -> Optional["Table"]:
"""Issue a "create check constraint" instruction using the
current batch migration context.
@@ -732,8 +860,14 @@ class CreateIndexOp(MigrateOperation):
"""Represent a create index operation."""
def __init__(
- self, index_name, table_name, columns, schema=None, unique=False, **kw
- ):
+ self,
+ index_name: str,
+ table_name: str,
+ columns: Sequence[Union[str, "TextClause", "ColumnElement[Any]"]],
+ schema: Optional[str] = None,
+ unique: bool = False,
+ **kw
+ ) -> None:
self.index_name = index_name
self.table_name = table_name
self.columns = columns
@@ -741,14 +875,15 @@ class CreateIndexOp(MigrateOperation):
self.unique = unique
self.kw = kw
- def reverse(self):
+ def reverse(self) -> "DropIndexOp":
return DropIndexOp.from_index(self.to_index())
- def to_diff_tuple(self):
+ def to_diff_tuple(self) -> Tuple[str, "Index"]:
return ("add_index", self.to_index())
@classmethod
- def from_index(cls, index):
+ def from_index(cls, index: "Index") -> "CreateIndexOp":
+ assert index.table is not None
return cls(
index.name,
index.table.name,
@@ -758,7 +893,9 @@ class CreateIndexOp(MigrateOperation):
**index.kwargs,
)
- def to_index(self, migration_context=None):
+ def to_index(
+ self, migration_context: Optional["MigrationContext"] = None
+ ) -> "Index":
schema_obj = schemaobj.SchemaObjects(migration_context)
idx = schema_obj.index(
@@ -774,14 +911,14 @@ class CreateIndexOp(MigrateOperation):
@classmethod
def create_index(
cls,
- operations,
- index_name,
- table_name,
- columns,
- schema=None,
- unique=False,
+ operations: Operations,
+ index_name: str,
+ table_name: str,
+ columns: Sequence[Union[str, "TextClause", "Function"]],
+ schema: Optional[str] = None,
+ unique: bool = False,
**kw
- ):
+ ) -> Optional["Table"]:
r"""Issue a "create index" instruction using the current
migration context.
@@ -829,7 +966,13 @@ class CreateIndexOp(MigrateOperation):
return operations.invoke(op)
@classmethod
- def batch_create_index(cls, operations, index_name, columns, **kw):
+ def batch_create_index(
+ cls,
+ operations: "BatchOperations",
+ index_name: str,
+ columns: List[str],
+ **kw
+ ) -> Optional["Table"]:
"""Issue a "create index" instruction using the
current batch migration context.
@@ -855,22 +998,28 @@ class DropIndexOp(MigrateOperation):
"""Represent a drop index operation."""
def __init__(
- self, index_name, table_name=None, schema=None, _reverse=None, **kw
- ):
+ self,
+ index_name: Union["quoted_name", str, "conv"],
+ table_name: Optional[str] = None,
+ schema: Optional[str] = None,
+ _reverse: Optional["CreateIndexOp"] = None,
+ **kw
+ ) -> None:
self.index_name = index_name
self.table_name = table_name
self.schema = schema
self._reverse = _reverse
self.kw = kw
- def to_diff_tuple(self):
+ def to_diff_tuple(self) -> Tuple[str, "Index"]:
return ("remove_index", self.to_index())
- def reverse(self):
+ def reverse(self) -> "CreateIndexOp":
return CreateIndexOp.from_index(self.to_index())
@classmethod
- def from_index(cls, index):
+ def from_index(cls, index: "Index") -> "DropIndexOp":
+ assert index.table is not None
return cls(
index.name,
index.table.name,
@@ -879,7 +1028,9 @@ class DropIndexOp(MigrateOperation):
**index.kwargs,
)
- def to_index(self, migration_context=None):
+ def to_index(
+ self, migration_context: Optional["MigrationContext"] = None
+ ) -> "Index":
schema_obj = schemaobj.SchemaObjects(migration_context)
# need a dummy column name here since SQLAlchemy
@@ -894,8 +1045,13 @@ class DropIndexOp(MigrateOperation):
@classmethod
def drop_index(
- cls, operations, index_name, table_name=None, schema=None, **kw
- ):
+ cls,
+ operations: "Operations",
+ index_name: str,
+ table_name: Optional[str] = None,
+ schema: Optional[str] = None,
+ **kw
+ ) -> Optional["Table"]:
r"""Issue a "drop index" instruction using the current
migration context.
@@ -921,7 +1077,9 @@ class DropIndexOp(MigrateOperation):
return operations.invoke(op)
@classmethod
- def batch_drop_index(cls, operations, index_name, **kw):
+ def batch_drop_index(
+ cls, operations: BatchOperations, index_name: str, **kw
+ ) -> Optional["Table"]:
"""Issue a "drop index" instruction using the
current batch migration context.
@@ -946,13 +1104,13 @@ class CreateTableOp(MigrateOperation):
def __init__(
self,
- table_name,
- columns,
- schema=None,
- _namespace_metadata=None,
- _constraints_included=False,
+ table_name: str,
+ columns: Sequence[Union["Column", "Constraint"]],
+ schema: Optional[str] = None,
+ _namespace_metadata: Optional["MetaData"] = None,
+ _constraints_included: bool = False,
**kw
- ):
+ ) -> None:
self.table_name = table_name
self.columns = columns
self.schema = schema
@@ -963,22 +1121,24 @@ class CreateTableOp(MigrateOperation):
self._namespace_metadata = _namespace_metadata
self._constraints_included = _constraints_included
- def reverse(self):
+ def reverse(self) -> "DropTableOp":
return DropTableOp.from_table(
self.to_table(), _namespace_metadata=self._namespace_metadata
)
- def to_diff_tuple(self):
+ def to_diff_tuple(self) -> Tuple[str, "Table"]:
return ("add_table", self.to_table())
@classmethod
- def from_table(cls, table, _namespace_metadata=None):
+ def from_table(
+ cls, table: "Table", _namespace_metadata: Optional["MetaData"] = None
+ ) -> "CreateTableOp":
if _namespace_metadata is None:
_namespace_metadata = table.metadata
return cls(
table.name,
- list(table.c) + list(table.constraints),
+ list(table.c) + list(table.constraints), # type:ignore[arg-type]
schema=table.schema,
_namespace_metadata=_namespace_metadata,
# given a Table() object, this Table will contain full Index()
@@ -989,12 +1149,14 @@ class CreateTableOp(MigrateOperation):
# not doubled up. see #844 #848
_constraints_included=True,
comment=table.comment,
- info=table.info.copy(),
+ info=dict(table.info),
prefixes=list(table._prefixes),
**table.kwargs,
)
- def to_table(self, migration_context=None):
+ def to_table(
+ self, migration_context: Optional["MigrationContext"] = None
+ ) -> "Table":
schema_obj = schemaobj.SchemaObjects(migration_context)
return schema_obj.table(
@@ -1009,7 +1171,9 @@ class CreateTableOp(MigrateOperation):
)
@classmethod
- def create_table(cls, operations, table_name, *columns, **kw):
+ def create_table(
+ cls, operations: "Operations", table_name: str, *columns, **kw
+ ) -> Optional["Table"]:
r"""Issue a "create table" instruction using the current migration
context.
@@ -1094,7 +1258,13 @@ class CreateTableOp(MigrateOperation):
class DropTableOp(MigrateOperation):
"""Represent a drop table operation."""
- def __init__(self, table_name, schema=None, table_kw=None, _reverse=None):
+ def __init__(
+ self,
+ table_name: str,
+ schema: Optional[str] = None,
+ table_kw: Optional[MutableMapping[Any, Any]] = None,
+ _reverse: Optional["CreateTableOp"] = None,
+ ) -> None:
self.table_name = table_name
self.schema = schema
self.table_kw = table_kw or {}
@@ -1103,20 +1273,22 @@ class DropTableOp(MigrateOperation):
self.prefixes = self.table_kw.pop("prefixes", None)
self._reverse = _reverse
- def to_diff_tuple(self):
+ def to_diff_tuple(self) -> Tuple[str, "Table"]:
return ("remove_table", self.to_table())
- def reverse(self):
+ def reverse(self) -> "CreateTableOp":
return CreateTableOp.from_table(self.to_table())
@classmethod
- def from_table(cls, table, _namespace_metadata=None):
+ def from_table(
+ cls, table: "Table", _namespace_metadata: Optional["MetaData"] = None
+ ) -> "DropTableOp":
return cls(
table.name,
schema=table.schema,
table_kw={
"comment": table.comment,
- "info": table.info.copy(),
+ "info": dict(table.info),
"prefixes": list(table._prefixes),
**table.kwargs,
},
@@ -1125,7 +1297,9 @@ class DropTableOp(MigrateOperation):
),
)
- def to_table(self, migration_context=None):
+ def to_table(
+ self, migration_context: Optional["MigrationContext"] = None
+ ) -> "Table":
if self._reverse:
cols_and_constraints = self._reverse.columns
else:
@@ -1139,14 +1313,21 @@ class DropTableOp(MigrateOperation):
info=self.info.copy() if self.info else {},
prefixes=list(self.prefixes) if self.prefixes else [],
schema=self.schema,
- _constraints_included=bool(self._reverse)
- and self._reverse._constraints_included,
+ _constraints_included=self._reverse._constraints_included
+ if self._reverse
+ else False,
**self.table_kw,
)
return t
@classmethod
- def drop_table(cls, operations, table_name, schema=None, **kw):
+ def drop_table(
+ cls,
+ operations: "Operations",
+ table_name: str,
+ schema: Optional[str] = None,
+ **kw: Any
+ ) -> None:
r"""Issue a "drop table" instruction using the current
migration context.
@@ -1171,7 +1352,11 @@ class DropTableOp(MigrateOperation):
class AlterTableOp(MigrateOperation):
"""Represent an alter table operation."""
- def __init__(self, table_name, schema=None):
+ def __init__(
+ self,
+ table_name: str,
+ schema: Optional[str] = None,
+ ) -> None:
self.table_name = table_name
self.schema = schema
@@ -1180,14 +1365,23 @@ class AlterTableOp(MigrateOperation):
class RenameTableOp(AlterTableOp):
"""Represent a rename table operation."""
- def __init__(self, old_table_name, new_table_name, schema=None):
+ def __init__(
+ self,
+ old_table_name: str,
+ new_table_name: str,
+ schema: Optional[str] = None,
+ ) -> None:
super(RenameTableOp, self).__init__(old_table_name, schema=schema)
self.new_table_name = new_table_name
@classmethod
def rename_table(
- cls, operations, old_table_name, new_table_name, schema=None
- ):
+ cls,
+ operations: "Operations",
+ old_table_name: str,
+ new_table_name: str,
+ schema: Optional[str] = None,
+ ) -> Optional["Table"]:
"""Emit an ALTER TABLE to rename a table.
:param old_table_name: old name.
@@ -1210,8 +1404,12 @@ class CreateTableCommentOp(AlterTableOp):
"""Represent a COMMENT ON `table` operation."""
def __init__(
- self, table_name, comment, schema=None, existing_comment=None
- ):
+ self,
+ table_name: str,
+ comment: Optional[str],
+ schema: Optional[str] = None,
+ existing_comment: Optional[str] = None,
+ ) -> None:
self.table_name = table_name
self.comment = comment
self.existing_comment = existing_comment
@@ -1220,12 +1418,12 @@ class CreateTableCommentOp(AlterTableOp):
@classmethod
def create_table_comment(
cls,
- operations,
- table_name,
- comment,
- existing_comment=None,
- schema=None,
- ):
+ operations: "Operations",
+ table_name: str,
+ comment: Optional[str],
+ existing_comment: None = None,
+ schema: Optional[str] = None,
+ ) -> Optional["Table"]:
"""Emit a COMMENT ON operation to set the comment for a table.
.. versionadded:: 1.0.6
@@ -1317,15 +1515,24 @@ class CreateTableCommentOp(AlterTableOp):
class DropTableCommentOp(AlterTableOp):
"""Represent an operation to remove the comment from a table."""
- def __init__(self, table_name, schema=None, existing_comment=None):
+ def __init__(
+ self,
+ table_name: str,
+ schema: Optional[str] = None,
+ existing_comment: Optional[str] = None,
+ ) -> None:
self.table_name = table_name
self.existing_comment = existing_comment
self.schema = schema
@classmethod
def drop_table_comment(
- cls, operations, table_name, existing_comment=None, schema=None
- ):
+ cls,
+ operations: "Operations",
+ table_name: str,
+ existing_comment: Optional[str] = None,
+ schema: Optional[str] = None,
+ ) -> Optional["Table"]:
"""Issue a "drop table comment" operation to
remove an existing comment set on a table.
@@ -1388,20 +1595,20 @@ class AlterColumnOp(AlterTableOp):
def __init__(
self,
- table_name,
- column_name,
- schema=None,
- existing_type=None,
- existing_server_default=False,
- existing_nullable=None,
- existing_comment=None,
- modify_nullable=None,
- modify_comment=False,
- modify_server_default=False,
- modify_name=None,
- modify_type=None,
+ table_name: str,
+ column_name: str,
+ schema: Optional[str] = None,
+ existing_type: Optional[Any] = None,
+ existing_server_default: Any = False,
+ existing_nullable: Optional[bool] = None,
+ existing_comment: Optional[str] = None,
+ modify_nullable: Optional[bool] = None,
+ modify_comment: Optional[Union[str, bool]] = False,
+ modify_server_default: Any = False,
+ modify_name: Optional[str] = None,
+ modify_type: Optional[Any] = None,
**kw
- ):
+ ) -> None:
super(AlterColumnOp, self).__init__(table_name, schema=schema)
self.column_name = column_name
self.existing_type = existing_type
@@ -1415,7 +1622,7 @@ class AlterColumnOp(AlterTableOp):
self.modify_type = modify_type
self.kw = kw
- def to_diff_tuple(self):
+ def to_diff_tuple(self) -> Any:
col_diff = []
schema, tname, cname = self.schema, self.table_name, self.column_name
@@ -1495,7 +1702,7 @@ class AlterColumnOp(AlterTableOp):
return col_diff
- def has_changes(self):
+ def has_changes(self) -> bool:
hc1 = (
self.modify_nullable is not None
or self.modify_server_default is not False
@@ -1510,7 +1717,7 @@ class AlterColumnOp(AlterTableOp):
else:
return False
- def reverse(self):
+ def reverse(self) -> "AlterColumnOp":
kw = self.kw.copy()
kw["existing_type"] = self.existing_type
@@ -1546,21 +1753,25 @@ class AlterColumnOp(AlterTableOp):
@classmethod
def alter_column(
cls,
- operations,
- table_name,
- column_name,
- nullable=None,
- comment=False,
- server_default=False,
- new_column_name=None,
- type_=None,
- existing_type=None,
- existing_server_default=False,
- existing_nullable=None,
- existing_comment=None,
- schema=None,
+ operations: Operations,
+ table_name: str,
+ column_name: str,
+ nullable: Optional[bool] = None,
+ comment: Optional[Union[str, bool]] = False,
+ server_default: Any = False,
+ new_column_name: Optional[str] = None,
+ type_: Optional[Union["TypeEngine", Type["TypeEngine"]]] = None,
+ existing_type: Optional[
+ Union["TypeEngine", Type["TypeEngine"]]
+ ] = None,
+ existing_server_default: Optional[
+ Union[str, bool, "Identity", "Computed"]
+ ] = False,
+ existing_nullable: Optional[bool] = None,
+ existing_comment: Optional[str] = None,
+ schema: Optional[str] = None,
**kw
- ):
+ ) -> Optional["Table"]:
r"""Issue an "alter column" instruction using the
current migration context.
@@ -1671,21 +1882,23 @@ class AlterColumnOp(AlterTableOp):
@classmethod
def batch_alter_column(
cls,
- operations,
- column_name,
- nullable=None,
- comment=False,
- server_default=False,
- new_column_name=None,
- type_=None,
- existing_type=None,
- existing_server_default=False,
- existing_nullable=None,
- existing_comment=None,
- insert_before=None,
- insert_after=None,
+ operations: BatchOperations,
+ column_name: str,
+ nullable: Optional[bool] = None,
+ comment: bool = False,
+ server_default: Union["Function", bool] = False,
+ new_column_name: Optional[str] = None,
+ type_: Optional[Union["TypeEngine", Type["TypeEngine"]]] = None,
+ existing_type: Optional[
+ Union["TypeEngine", Type["TypeEngine"]]
+ ] = None,
+ existing_server_default: bool = False,
+ existing_nullable: None = None,
+ existing_comment: None = None,
+ insert_before: None = None,
+ insert_after: None = None,
**kw
- ):
+ ) -> Optional["Table"]:
"""Issue an "alter column" instruction using the current
batch migration context.
@@ -1736,32 +1949,51 @@ class AlterColumnOp(AlterTableOp):
class AddColumnOp(AlterTableOp):
"""Represent an add column operation."""
- def __init__(self, table_name, column, schema=None, **kw):
+ def __init__(
+ self,
+ table_name: str,
+ column: "Column",
+ schema: Optional[str] = None,
+ **kw
+ ) -> None:
super(AddColumnOp, self).__init__(table_name, schema=schema)
self.column = column
self.kw = kw
- def reverse(self):
+ def reverse(self) -> "DropColumnOp":
return DropColumnOp.from_column_and_tablename(
self.schema, self.table_name, self.column
)
- def to_diff_tuple(self):
+ def to_diff_tuple(
+ self,
+ ) -> Tuple[str, Optional[str], str, "Column"]:
return ("add_column", self.schema, self.table_name, self.column)
- def to_column(self):
+ def to_column(self) -> "Column":
return self.column
@classmethod
- def from_column(cls, col):
+ def from_column(cls, col: "Column") -> "AddColumnOp":
return cls(col.table.name, col, schema=col.table.schema)
@classmethod
- def from_column_and_tablename(cls, schema, tname, col):
+ def from_column_and_tablename(
+ cls,
+ schema: Optional[str],
+ tname: str,
+ col: "Column",
+ ) -> "AddColumnOp":
return cls(tname, col, schema=schema)
@classmethod
- def add_column(cls, operations, table_name, column, schema=None):
+ def add_column(
+ cls,
+ operations: "Operations",
+ table_name: str,
+ column: "Column",
+ schema: Optional[str] = None,
+ ) -> Optional["Table"]:
"""Issue an "add column" instruction using the current
migration context.
@@ -1816,8 +2048,12 @@ class AddColumnOp(AlterTableOp):
@classmethod
def batch_add_column(
- cls, operations, column, insert_before=None, insert_after=None
- ):
+ cls,
+ operations: "BatchOperations",
+ column: "Column",
+ insert_before: Optional[str] = None,
+ insert_after: Optional[str] = None,
+ ) -> Optional["Table"]:
"""Issue an "add column" instruction using the current
batch migration context.
@@ -1848,14 +2084,21 @@ class DropColumnOp(AlterTableOp):
"""Represent a drop column operation."""
def __init__(
- self, table_name, column_name, schema=None, _reverse=None, **kw
- ):
+ self,
+ table_name: str,
+ column_name: str,
+ schema: Optional[str] = None,
+ _reverse: Optional["AddColumnOp"] = None,
+ **kw
+ ) -> None:
super(DropColumnOp, self).__init__(table_name, schema=schema)
self.column_name = column_name
self.kw = kw
self._reverse = _reverse
- def to_diff_tuple(self):
+ def to_diff_tuple(
+ self,
+ ) -> Tuple[str, Optional[str], str, "Column"]:
return (
"remove_column",
self.schema,
@@ -1863,7 +2106,7 @@ class DropColumnOp(AlterTableOp):
self.to_column(),
)
- def reverse(self):
+ def reverse(self) -> "AddColumnOp":
if self._reverse is None:
raise ValueError(
"operation is not reversible; "
@@ -1875,7 +2118,12 @@ class DropColumnOp(AlterTableOp):
)
@classmethod
- def from_column_and_tablename(cls, schema, tname, col):
+ def from_column_and_tablename(
+ cls,
+ schema: Optional[str],
+ tname: str,
+ col: "Column",
+ ) -> "DropColumnOp":
return cls(
tname,
col.name,
@@ -1883,7 +2131,9 @@ class DropColumnOp(AlterTableOp):
_reverse=AddColumnOp.from_column_and_tablename(schema, tname, col),
)
- def to_column(self, migration_context=None):
+ def to_column(
+ self, migration_context: Optional["MigrationContext"] = None
+ ) -> "Column":
if self._reverse is not None:
return self._reverse.column
schema_obj = schemaobj.SchemaObjects(migration_context)
@@ -1891,8 +2141,13 @@ class DropColumnOp(AlterTableOp):
@classmethod
def drop_column(
- cls, operations, table_name, column_name, schema=None, **kw
- ):
+ cls,
+ operations: "Operations",
+ table_name: str,
+ column_name: str,
+ schema: Optional[str] = None,
+ **kw
+ ) -> Optional["Table"]:
"""Issue a "drop column" instruction using the current
migration context.
@@ -1934,7 +2189,9 @@ class DropColumnOp(AlterTableOp):
return operations.invoke(op)
@classmethod
- def batch_drop_column(cls, operations, column_name, **kw):
+ def batch_drop_column(
+ cls, operations: "BatchOperations", column_name: str, **kw
+ ) -> Optional["Table"]:
"""Issue a "drop column" instruction using the current
batch migration context.
@@ -1956,13 +2213,24 @@ class DropColumnOp(AlterTableOp):
class BulkInsertOp(MigrateOperation):
"""Represent a bulk insert operation."""
- def __init__(self, table, rows, multiinsert=True):
+ def __init__(
+ self,
+ table: Union["Table", "TableClause"],
+ rows: List[dict],
+ multiinsert: bool = True,
+ ) -> None:
self.table = table
self.rows = rows
self.multiinsert = multiinsert
@classmethod
- def bulk_insert(cls, operations, table, rows, multiinsert=True):
+ def bulk_insert(
+ cls,
+ operations: Operations,
+ table: Union["Table", "TableClause"],
+ rows: List[dict],
+ multiinsert: bool = True,
+ ) -> None:
"""Issue a "bulk insert" operation using the current
migration context.
@@ -2046,12 +2314,21 @@ class BulkInsertOp(MigrateOperation):
class ExecuteSQLOp(MigrateOperation):
"""Represent an execute SQL operation."""
- def __init__(self, sqltext, execution_options=None):
+ def __init__(
+ self,
+ sqltext: Union["Update", str, "Insert", "TextClause"],
+ execution_options: None = None,
+ ) -> None:
self.sqltext = sqltext
self.execution_options = execution_options
@classmethod
- def execute(cls, operations, sqltext, execution_options=None):
+ def execute(
+ cls,
+ operations: Operations,
+ sqltext: Union[str, "TextClause", "Update"],
+ execution_options: None = None,
+ ) -> Optional["Table"]:
r"""Execute the given SQL using the current migration context.
The given SQL can be a plain string, e.g.::
@@ -2140,20 +2417,22 @@ class ExecuteSQLOp(MigrateOperation):
class OpContainer(MigrateOperation):
"""Represent a sequence of operations operation."""
- def __init__(self, ops=()):
- self.ops = ops
+ def __init__(self, ops: Sequence[MigrateOperation] = ()) -> None:
+ self.ops = list(ops)
- def is_empty(self):
+ def is_empty(self) -> bool:
return not self.ops
- def as_diffs(self):
+ def as_diffs(self) -> Any:
return list(OpContainer._ops_as_diffs(self))
@classmethod
- def _ops_as_diffs(cls, migrations):
+ def _ops_as_diffs(
+ cls, migrations: "OpContainer"
+ ) -> Iterator[Tuple[Any, ...]]:
for op in migrations.ops:
if hasattr(op, "ops"):
- for sub_op in cls._ops_as_diffs(op):
+ for sub_op in cls._ops_as_diffs(cast("OpContainer", op)):
yield sub_op
else:
yield op.to_diff_tuple()
@@ -2162,12 +2441,17 @@ class OpContainer(MigrateOperation):
class ModifyTableOps(OpContainer):
"""Contains a sequence of operations that all apply to a single Table."""
- def __init__(self, table_name, ops, schema=None):
+ def __init__(
+ self,
+ table_name: str,
+ ops: Sequence[MigrateOperation],
+ schema: Optional[str] = None,
+ ) -> None:
super(ModifyTableOps, self).__init__(ops)
self.table_name = table_name
self.schema = schema
- def reverse(self):
+ def reverse(self) -> "ModifyTableOps":
return ModifyTableOps(
self.table_name,
ops=list(reversed([op.reverse() for op in self.ops])),
@@ -2185,17 +2469,21 @@ class UpgradeOps(OpContainer):
"""
- def __init__(self, ops=(), upgrade_token="upgrades"):
+ def __init__(
+ self,
+ ops: Sequence[MigrateOperation] = (),
+ upgrade_token: str = "upgrades",
+ ) -> None:
super(UpgradeOps, self).__init__(ops=ops)
self.upgrade_token = upgrade_token
- def reverse_into(self, downgrade_ops):
- downgrade_ops.ops[:] = list(
+ def reverse_into(self, downgrade_ops: "DowngradeOps") -> "DowngradeOps":
+ downgrade_ops.ops[:] = list( # type:ignore[index]
reversed([op.reverse() for op in self.ops])
)
return downgrade_ops
- def reverse(self):
+ def reverse(self) -> "DowngradeOps":
return self.reverse_into(DowngradeOps(ops=[]))
@@ -2209,7 +2497,11 @@ class DowngradeOps(OpContainer):
"""
- def __init__(self, ops=(), downgrade_token="downgrades"):
+ def __init__(
+ self,
+ ops: Sequence[MigrateOperation] = (),
+ downgrade_token: str = "downgrades",
+ ) -> None:
super(DowngradeOps, self).__init__(ops=ops)
self.downgrade_token = downgrade_token
@@ -2243,19 +2535,21 @@ class MigrationScript(MigrateOperation):
"""
+ _needs_render: Optional[bool]
+
def __init__(
self,
- rev_id,
- upgrade_ops,
- downgrade_ops,
- message=None,
- imports=set(),
- head=None,
- splice=None,
- branch_label=None,
- version_path=None,
- depends_on=None,
- ):
+ rev_id: Optional[str],
+ upgrade_ops: "UpgradeOps",
+ downgrade_ops: "DowngradeOps",
+ message: Optional[str] = None,
+ imports: Set[str] = set(),
+ head: Optional[str] = None,
+ splice: Optional[bool] = None,
+ branch_label: Optional[str] = None,
+ version_path: Optional[str] = None,
+ depends_on: Optional[Union[str, Sequence[str]]] = None,
+ ) -> None:
self.rev_id = rev_id
self.message = message
self.imports = imports
@@ -2318,7 +2612,7 @@ class MigrationScript(MigrateOperation):
assert isinstance(elem, DowngradeOps)
@property
- def upgrade_ops_list(self):
+ def upgrade_ops_list(self) -> List["UpgradeOps"]:
"""A list of :class:`.UpgradeOps` instances.
This is used in place of the :attr:`.MigrationScript.upgrade_ops`
@@ -2329,7 +2623,7 @@ class MigrationScript(MigrateOperation):
return self._upgrade_ops
@property
- def downgrade_ops_list(self):
+ def downgrade_ops_list(self) -> List["DowngradeOps"]:
"""A list of :class:`.DowngradeOps` instances.
This is used in place of the :attr:`.MigrationScript.downgrade_ops`
diff --git a/alembic/operations/schemaobj.py b/alembic/operations/schemaobj.py
index adbffdc..0d40dc7 100644
--- a/alembic/operations/schemaobj.py
+++ b/alembic/operations/schemaobj.py
@@ -1,3 +1,12 @@
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+from typing import TYPE_CHECKING
+from typing import Union
+
from sqlalchemy import schema as sa_schema
from sqlalchemy.sql.schema import Column
from sqlalchemy.sql.schema import Constraint
@@ -9,34 +18,59 @@ from .. import util
from ..util import sqla_compat
from ..util.compat import string_types
+if TYPE_CHECKING:
+ from sqlalchemy.sql.elements import ColumnElement
+ from sqlalchemy.sql.elements import TextClause
+ from sqlalchemy.sql.schema import CheckConstraint
+ from sqlalchemy.sql.schema import ForeignKey
+ from sqlalchemy.sql.schema import ForeignKeyConstraint
+ from sqlalchemy.sql.schema import MetaData
+ from sqlalchemy.sql.schema import PrimaryKeyConstraint
+ from sqlalchemy.sql.schema import Table
+ from sqlalchemy.sql.schema import UniqueConstraint
+ from sqlalchemy.sql.type_api import TypeEngine
+
+ from ..runtime.migration import MigrationContext
+
class SchemaObjects:
- def __init__(self, migration_context=None):
+ def __init__(
+ self, migration_context: Optional["MigrationContext"] = None
+ ) -> None:
self.migration_context = migration_context
- def primary_key_constraint(self, name, table_name, cols, schema=None):
+ def primary_key_constraint(
+ self,
+ name: Optional[str],
+ table_name: str,
+ cols: Sequence[str],
+ schema: Optional[str] = None,
+ **dialect_kw
+ ) -> "PrimaryKeyConstraint":
m = self.metadata()
columns = [sa_schema.Column(n, NULLTYPE) for n in cols]
t = sa_schema.Table(table_name, m, *columns, schema=schema)
- p = sa_schema.PrimaryKeyConstraint(*[t.c[n] for n in cols], name=name)
+ p = sa_schema.PrimaryKeyConstraint(
+ *[t.c[n] for n in cols], name=name, **dialect_kw
+ )
return p
def foreign_key_constraint(
self,
- name,
- source,
- referent,
- local_cols,
- remote_cols,
- onupdate=None,
- ondelete=None,
- deferrable=None,
- source_schema=None,
- referent_schema=None,
- initially=None,
- match=None,
+ name: Optional[str],
+ source: str,
+ referent: str,
+ local_cols: List[str],
+ remote_cols: List[str],
+ onupdate: Optional[str] = None,
+ ondelete: Optional[str] = None,
+ deferrable: Optional[bool] = None,
+ source_schema: Optional[str] = None,
+ referent_schema: Optional[str] = None,
+ initially: Optional[str] = None,
+ match: Optional[str] = None,
**dialect_kw
- ):
+ ) -> "ForeignKeyConstraint":
m = self.metadata()
if source == referent and source_schema == referent_schema:
t1_cols = local_cols + remote_cols
@@ -78,7 +112,14 @@ class SchemaObjects:
return f
- def unique_constraint(self, name, source, local_cols, schema=None, **kw):
+ def unique_constraint(
+ self,
+ name: Optional[str],
+ source: str,
+ local_cols: Sequence[str],
+ schema: Optional[str] = None,
+ **kw
+ ) -> "UniqueConstraint":
t = sa_schema.Table(
source,
self.metadata(),
@@ -92,7 +133,14 @@ class SchemaObjects:
t.append_constraint(uq)
return uq
- def check_constraint(self, name, source, condition, schema=None, **kw):
+ def check_constraint(
+ self,
+ name: Optional[str],
+ source: str,
+ condition: Union["TextClause", "ColumnElement[Any]"],
+ schema: Optional[str] = None,
+ **kw
+ ) -> Union["CheckConstraint"]:
t = sa_schema.Table(
source,
self.metadata(),
@@ -103,9 +151,16 @@ class SchemaObjects:
t.append_constraint(ck)
return ck
- def generic_constraint(self, name, table_name, type_, schema=None, **kw):
+ def generic_constraint(
+ self,
+ name: Optional[str],
+ table_name: str,
+ type_: Optional[str],
+ schema: Optional[str] = None,
+ **kw
+ ) -> Any:
t = self.table(table_name, schema=schema)
- types = {
+ types: Dict[Optional[str], Any] = {
"foreignkey": lambda name: sa_schema.ForeignKeyConstraint(
[], [], name=name
),
@@ -126,7 +181,7 @@ class SchemaObjects:
t.append_constraint(const)
return const
- def metadata(self):
+ def metadata(self) -> "MetaData":
kw = {}
if (
self.migration_context is not None
@@ -137,7 +192,7 @@ class SchemaObjects:
kw["naming_convention"] = mt.naming_convention
return sa_schema.MetaData(**kw)
- def table(self, name, *columns, **kw):
+ def table(self, name: str, *columns, **kw) -> "Table":
m = self.metadata()
cols = [
@@ -173,10 +228,17 @@ class SchemaObjects:
self._ensure_table_for_fk(m, f)
return t
- def column(self, name, type_, **kw):
+ def column(self, name: str, type_: "TypeEngine", **kw) -> "Column":
return sa_schema.Column(name, type_, **kw)
- def index(self, name, tablename, columns, schema=None, **kw):
+ def index(
+ self,
+ name: str,
+ tablename: Optional[str],
+ columns: Sequence[Union[str, "TextClause", "ColumnElement[Any]"]],
+ schema: Optional[str] = None,
+ **kw
+ ) -> "Index":
t = sa_schema.Table(
tablename or "no_table",
self.metadata(),
@@ -190,23 +252,27 @@ class SchemaObjects:
)
return idx
- def _parse_table_key(self, table_key):
+ def _parse_table_key(self, table_key: str) -> Tuple[Optional[str], str]:
if "." in table_key:
tokens = table_key.split(".")
- sname = ".".join(tokens[0:-1])
+ sname: Optional[str] = ".".join(tokens[0:-1])
tname = tokens[-1]
else:
tname = table_key
sname = None
return (sname, tname)
- def _ensure_table_for_fk(self, metadata, fk):
+ def _ensure_table_for_fk(
+ self, metadata: "MetaData", fk: "ForeignKey"
+ ) -> None:
"""create a placeholder Table object for the referent of a
ForeignKey.
"""
- if isinstance(fk._colspec, string_types):
- table_key, cname = fk._colspec.rsplit(".", 1)
+ if isinstance(fk._colspec, string_types): # type:ignore[attr-defined]
+ table_key, cname = fk._colspec.rsplit( # type:ignore[attr-defined]
+ ".", 1
+ )
sname, tname = self._parse_table_key(table_key)
if table_key not in metadata.tables:
rel_t = sa_schema.Table(tname, metadata, schema=sname)
diff --git a/alembic/operations/toimpl.py b/alembic/operations/toimpl.py
index 10a41e4..f97983e 100644
--- a/alembic/operations/toimpl.py
+++ b/alembic/operations/toimpl.py
@@ -1,12 +1,19 @@
+from typing import TYPE_CHECKING
+
from sqlalchemy import schema as sa_schema
from . import ops
from .base import Operations
from ..util.sqla_compat import _copy
+if TYPE_CHECKING:
+ from sqlalchemy.sql.schema import Table
+
@Operations.implementation_for(ops.AlterColumnOp)
-def alter_column(operations, operation):
+def alter_column(
+ operations: "Operations", operation: "ops.AlterColumnOp"
+) -> None:
compiler = operations.impl.dialect.statement_compiler(
operations.impl.dialect, None
@@ -68,14 +75,16 @@ def alter_column(operations, operation):
@Operations.implementation_for(ops.DropTableOp)
-def drop_table(operations, operation):
+def drop_table(operations: "Operations", operation: "ops.DropTableOp") -> None:
operations.impl.drop_table(
operation.to_table(operations.migration_context)
)
@Operations.implementation_for(ops.DropColumnOp)
-def drop_column(operations, operation):
+def drop_column(
+ operations: "Operations", operation: "ops.DropColumnOp"
+) -> None:
column = operation.to_column(operations.migration_context)
operations.impl.drop_column(
operation.table_name, column, schema=operation.schema, **operation.kw
@@ -83,46 +92,56 @@ def drop_column(operations, operation):
@Operations.implementation_for(ops.CreateIndexOp)
-def create_index(operations, operation):
+def create_index(
+ operations: "Operations", operation: "ops.CreateIndexOp"
+) -> None:
idx = operation.to_index(operations.migration_context)
operations.impl.create_index(idx)
@Operations.implementation_for(ops.DropIndexOp)
-def drop_index(operations, operation):
+def drop_index(operations: "Operations", operation: "ops.DropIndexOp") -> None:
operations.impl.drop_index(
operation.to_index(operations.migration_context)
)
@Operations.implementation_for(ops.CreateTableOp)
-def create_table(operations, operation):
+def create_table(
+ operations: "Operations", operation: "ops.CreateTableOp"
+) -> "Table":
table = operation.to_table(operations.migration_context)
operations.impl.create_table(table)
return table
@Operations.implementation_for(ops.RenameTableOp)
-def rename_table(operations, operation):
+def rename_table(
+ operations: "Operations", operation: "ops.RenameTableOp"
+) -> None:
operations.impl.rename_table(
operation.table_name, operation.new_table_name, schema=operation.schema
)
@Operations.implementation_for(ops.CreateTableCommentOp)
-def create_table_comment(operations, operation):
+def create_table_comment(
+ operations: "Operations", operation: "ops.CreateTableCommentOp"
+) -> None:
table = operation.to_table(operations.migration_context)
operations.impl.create_table_comment(table)
@Operations.implementation_for(ops.DropTableCommentOp)
-def drop_table_comment(operations, operation):
+def drop_table_comment(
+ operations: "Operations", operation: "ops.DropTableCommentOp"
+) -> None:
table = operation.to_table(operations.migration_context)
operations.impl.drop_table_comment(table)
@Operations.implementation_for(ops.AddColumnOp)
-def add_column(operations, operation):
+def add_column(operations: "Operations", operation: "ops.AddColumnOp") -> None:
table_name = operation.table_name
column = operation.column
schema = operation.schema
@@ -150,14 +169,18 @@ def add_column(operations, operation):
@Operations.implementation_for(ops.AddConstraintOp)
-def create_constraint(operations, operation):
+def create_constraint(
+ operations: "Operations", operation: "ops.AddConstraintOp"
+) -> None:
operations.impl.add_constraint(
operation.to_constraint(operations.migration_context)
)
@Operations.implementation_for(ops.DropConstraintOp)
-def drop_constraint(operations, operation):
+def drop_constraint(
+ operations: "Operations", operation: "ops.DropConstraintOp"
+) -> None:
operations.impl.drop_constraint(
operations.schema_obj.generic_constraint(
operation.constraint_name,
@@ -169,14 +192,18 @@ def drop_constraint(operations, operation):
@Operations.implementation_for(ops.BulkInsertOp)
-def bulk_insert(operations, operation):
+def bulk_insert(
+ operations: "Operations", operation: "ops.BulkInsertOp"
+) -> None:
operations.impl.bulk_insert(
operation.table, operation.rows, multiinsert=operation.multiinsert
)
@Operations.implementation_for(ops.ExecuteSQLOp)
-def execute_sql(operations, operation):
+def execute_sql(
+ operations: "Operations", operation: "ops.ExecuteSQLOp"
+) -> None:
operations.migration_context.impl.execute(
operation.sqltext, execution_options=operation.execution_options
)