diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2023-03-06 13:34:40 -0500 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2023-03-06 14:09:04 -0500 |
commit | 3cf19f16631e54108e5f519348cba14521b7433e (patch) | |
tree | 2097dc67dfdd0dec3fb94317dde1fcf2b812f751 | |
parent | cd7b1a94cc90ca533a84b47332900888151e22d0 (diff) | |
download | alembic-3cf19f16631e54108e5f519348cba14521b7433e.tar.gz |
distinguish between string contraint name and defined
Take _NONE_NAME into account as a valid constraint name
and don't skip these constraints or consider them to be unnamed.
Thanks to typing this also revealed that previous batch versions
were also keying "_NONE_NAME" constraints as though they were named.
Fixed regression for 1.10.0 where :class:`.Constraint` objects were
suddenly required to have non-None name fields when using batch mode, which
was not previously a requirement.
Change-Id: If4a7191a00848b19cb124bc6da362f3bc6ce1472
Fixes: #1195
-rw-r--r-- | alembic/autogenerate/compare.py | 2 | ||||
-rw-r--r-- | alembic/operations/batch.py | 14 | ||||
-rw-r--r-- | alembic/operations/ops.py | 10 | ||||
-rw-r--r-- | alembic/operations/schemaobj.py | 14 | ||||
-rw-r--r-- | alembic/util/sqla_compat.py | 20 | ||||
-rw-r--r-- | docs/build/unreleased/1195.rst | 7 | ||||
-rw-r--r-- | tests/test_batch.py | 26 |
7 files changed, 74 insertions, 19 deletions
diff --git a/alembic/autogenerate/compare.py b/alembic/autogenerate/compare.py index f50c41c..4f5126f 100644 --- a/alembic/autogenerate/compare.py +++ b/alembic/autogenerate/compare.py @@ -652,7 +652,7 @@ def _compare_indexes_and_uniques( conn_names = { c.name: c for c in conn_unique_constraints.union(conn_indexes_sig) - if sqla_compat.constraint_name_defined(c.name) + if sqla_compat.constraint_name_string(c.name) } doubled_constraints = { diff --git a/alembic/operations/batch.py b/alembic/operations/batch.py index fe32eec..da2caf6 100644 --- a/alembic/operations/batch.py +++ b/alembic/operations/batch.py @@ -34,6 +34,7 @@ from ..util.sqla_compat import _remove_column_from_collection from ..util.sqla_compat import _resolve_for_variant from ..util.sqla_compat import _select from ..util.sqla_compat import constraint_name_defined +from ..util.sqla_compat import constraint_name_string if TYPE_CHECKING: from typing import Literal @@ -269,7 +270,7 @@ class ApplyBatchImpl: # because # we have no way to determine _is_type_bound() for these. pass - elif constraint_name_defined(const.name): + elif constraint_name_string(const.name): self.named_constraints[const.name] = const else: self.unnamed_constraints.append(const) @@ -669,7 +670,10 @@ class ApplyBatchImpl: if self.table.primary_key in self.unnamed_constraints: self.unnamed_constraints.remove(self.table.primary_key) - self.named_constraints[const.name] = const + if constraint_name_string(const.name): + self.named_constraints[const.name] = const + else: + self.unnamed_constraints.append(const) def drop_constraint(self, const: Constraint) -> None: if not const.name: @@ -681,9 +685,11 @@ class ApplyBatchImpl: for col_const in list(self.columns[col.name].constraints): if col_const.name == const.name: self.columns[col.name].constraints.remove(col_const) - else: - assert constraint_name_defined(const.name) + elif constraint_name_string(const.name): const = self.named_constraints.pop(const.name) + elif const in self.unnamed_constraints: + self.unnamed_constraints.remove(const) + except KeyError: if _is_type_bound(const): # type-bound constraints are only included in the new diff --git a/alembic/operations/ops.py b/alembic/operations/ops.py index 8e1144b..48384f9 100644 --- a/alembic/operations/ops.py +++ b/alembic/operations/ops.py @@ -130,7 +130,7 @@ class DropConstraintOp(MigrateOperation): def __init__( self, - constraint_name: Optional[str], + constraint_name: Optional[sqla_compat._ConstraintNameDefined], table_name: str, type_: Optional[str] = None, schema: Optional[str] = None, @@ -255,7 +255,7 @@ class CreatePrimaryKeyOp(AddConstraintOp): def __init__( self, - constraint_name: Optional[str], + constraint_name: Optional[sqla_compat._ConstraintNameDefined], table_name: str, columns: Sequence[str], schema: Optional[str] = None, @@ -379,7 +379,7 @@ class CreateUniqueConstraintOp(AddConstraintOp): def __init__( self, - constraint_name: Optional[str], + constraint_name: Optional[sqla_compat._ConstraintNameDefined], table_name: str, columns: Sequence[str], schema: Optional[str] = None, @@ -513,7 +513,7 @@ class CreateForeignKeyOp(AddConstraintOp): def __init__( self, - constraint_name: Optional[str], + constraint_name: Optional[sqla_compat._ConstraintNameDefined], source_table: str, referent_table: str, local_cols: List[str], @@ -730,7 +730,7 @@ class CreateCheckConstraintOp(AddConstraintOp): def __init__( self, - constraint_name: Optional[str], + constraint_name: Optional[sqla_compat._ConstraintNameDefined], table_name: str, condition: Union[str, TextClause, ColumnElement[Any]], schema: Optional[str] = None, diff --git a/alembic/operations/schemaobj.py b/alembic/operations/schemaobj.py index ba09b3b..0568471 100644 --- a/alembic/operations/schemaobj.py +++ b/alembic/operations/schemaobj.py @@ -42,7 +42,7 @@ class SchemaObjects: def primary_key_constraint( self, - name: Optional[str], + name: Optional[sqla_compat._ConstraintNameDefined], table_name: str, cols: Sequence[str], schema: Optional[str] = None, @@ -51,14 +51,16 @@ class SchemaObjects: m = self.metadata() columns = [sa_schema.Column(n, NULLTYPE) for n in cols] t = sa_schema.Table(table_name, m, *columns, schema=schema) + # SQLAlchemy primary key constraint name arg is wrongly typed on + # the SQLAlchemy side through 2.0.5 at least p = sa_schema.PrimaryKeyConstraint( - *[t.c[n] for n in cols], name=name, **dialect_kw + *[t.c[n] for n in cols], name=name, **dialect_kw # type: ignore ) return p def foreign_key_constraint( self, - name: Optional[str], + name: Optional[sqla_compat._ConstraintNameDefined], source: str, referent: str, local_cols: List[str], @@ -115,7 +117,7 @@ class SchemaObjects: def unique_constraint( self, - name: Optional[str], + name: Optional[sqla_compat._ConstraintNameDefined], source: str, local_cols: Sequence[str], schema: Optional[str] = None, @@ -136,7 +138,7 @@ class SchemaObjects: def check_constraint( self, - name: Optional[str], + name: Optional[sqla_compat._ConstraintNameDefined], source: str, condition: Union[str, TextClause, ColumnElement[Any]], schema: Optional[str] = None, @@ -154,7 +156,7 @@ class SchemaObjects: def generic_constraint( self, - name: Optional[str], + name: Optional[sqla_compat._ConstraintNameDefined], table_name: str, type_: Optional[str], schema: Optional[str] = None, diff --git a/alembic/util/sqla_compat.py b/alembic/util/sqla_compat.py index 738bbcb..2cc070b 100644 --- a/alembic/util/sqla_compat.py +++ b/alembic/util/sqla_compat.py @@ -27,6 +27,7 @@ from sqlalchemy.sql.elements import ColumnClause from sqlalchemy.sql.elements import quoted_name from sqlalchemy.sql.elements import TextClause from sqlalchemy.sql.elements import UnaryExpression +from sqlalchemy.sql.naming import _NONE_NAME as _NONE_NAME from sqlalchemy.sql.visitors import traverse from typing_extensions import TypeGuard @@ -111,15 +112,28 @@ if sqla_2: else: from sqlalchemy.util import symbol as _NoneName # type: ignore[assignment] + _ConstraintName = Union[None, str, _NoneName] +_ConstraintNameDefined = Union[str, _NoneName] + + +def constraint_name_defined( + name: _ConstraintName, +) -> TypeGuard[_ConstraintNameDefined]: + return name is _NONE_NAME or isinstance(name, (str, _NoneName)) -def constraint_name_defined(name: _ConstraintName) -> TypeGuard[str]: + +def constraint_name_string( + name: _ConstraintName, +) -> TypeGuard[str]: return isinstance(name, str) -def constraint_name_or_none(name: _ConstraintName) -> Optional[str]: - return name if constraint_name_defined(name) else None +def constraint_name_or_none( + name: _ConstraintName, +) -> Optional[str]: + return name if constraint_name_string(name) else None AUTOINCREMENT_DEFAULT = "auto" diff --git a/docs/build/unreleased/1195.rst b/docs/build/unreleased/1195.rst new file mode 100644 index 0000000..11542f1 --- /dev/null +++ b/docs/build/unreleased/1195.rst @@ -0,0 +1,7 @@ +.. change:: + :tags: bug, batch, regression + :tickets: 1195 + + Fixed regression for 1.10.0 where :class:`.Constraint` objects were + suddenly required to have non-None name fields when using batch mode, which + was not previously a requirement. diff --git a/tests/test_batch.py b/tests/test_batch.py index e0289aa..3b67895 100644 --- a/tests/test_batch.py +++ b/tests/test_batch.py @@ -49,6 +49,7 @@ from alembic.testing.fixtures import capture_context_buffer from alembic.testing.fixtures import op_fixture from alembic.util import CommandError from alembic.util import exc as alembic_exc +from alembic.util.sqla_compat import _NONE_NAME from alembic.util.sqla_compat import _safe_commit_connection_transaction from alembic.util.sqla_compat import _select from alembic.util.sqla_compat import has_computed @@ -819,6 +820,18 @@ class BatchApplyTest(TestBase): ddl_not_contains="CONSTRAINT uq1 UNIQUE", ) + def test_add_ck_unnamed(self): + """test for #1195""" + impl = self._simple_fixture() + ck = self.op.schema_obj.check_constraint(_NONE_NAME, "tname", "y > 5") + + impl.add_constraint(ck) + self._assert_impl( + impl, + colnames=["id", "x", "y"], + ddl_contains="CHECK (y > 5)", + ) + def test_add_ck(self): impl = self._simple_fixture() ck = self.op.schema_obj.check_constraint("ck1", "tname", "y > 5") @@ -1444,6 +1457,19 @@ class BatchRoundTripTest(TestBase): t = Table("hasbool", self.metadata, Column("x", Integer)) t.create(self.conn) + def test_add_constraint_type(self): + """test for #1195.""" + + with self.op.batch_alter_table("foo") as batch_op: + batch_op.add_column(Column("q", Boolean(create_constraint=True))) + insp = inspect(self.conn) + + assert { + c["type"]._type_affinity + for c in insp.get_columns("foo") + if c["name"] == "q" + }.intersection([Boolean, Integer]) + def test_change_type_boolean_to_int(self): self._boolean_fixture() with self.op.batch_alter_table("hasbool") as batch_op: |