diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2021-02-26 13:11:17 -0500 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2021-04-12 14:39:45 -0400 |
commit | dd9c4d695c4463c872b25f59b6a8742bbc047150 (patch) | |
tree | c77c80a239ee1afca1a9c91633a78f723279a1fe /tests/test_op.py | |
parent | 4f1aef0186cd7141acfb31c05c802293fe805a66 (diff) | |
download | alembic-dd9c4d695c4463c872b25f59b6a8742bbc047150.tar.gz |
Create schema objects fresh from ops
Refactored the implementation of :class:`.MigrateOperation` constructs such
as :class:`.CreateIndexOp`, :class:`.CreateTableOp`, etc. so that they no
longer rely upon maintaining a persistent version of each schema object
internally; instead, the state variables of each operation object will be
used to produce the corresponding construct when the operation is invoked.
The rationale is so that environments which make use of
operation-manipulation schemes such as those those discussed in
:ref:`autogen_rewriter` are better supported, allowing end-user code to
manipulate the public attributes of these objects which will then be
expressed in the final output, an example is
``some_create_index_op.kw["postgresql_concurrently"] = True``.
Previously, these objects when generated from autogenerate would typically
hold onto the original, reflected element internally without honoring the
other state variables of each construct, preventing the public API from
working.
Change-Id: Ida2537740325de5c21e9447e930a518093f01bd4
Fixes: #803
Diffstat (limited to 'tests/test_op.py')
-rw-r--r-- | tests/test_op.py | 178 |
1 files changed, 157 insertions, 21 deletions
diff --git a/tests/test_op.py b/tests/test_op.py index 93bba28..63d3e96 100644 --- a/tests/test_op.py +++ b/tests/test_op.py @@ -6,10 +6,12 @@ from sqlalchemy import Column from sqlalchemy import event from sqlalchemy import exc from sqlalchemy import ForeignKey +from sqlalchemy import Index from sqlalchemy import Integer from sqlalchemy import MetaData from sqlalchemy import String from sqlalchemy import Table +from sqlalchemy import UniqueConstraint from sqlalchemy.sql import column from sqlalchemy.sql import func from sqlalchemy.sql import text @@ -22,7 +24,7 @@ from alembic.testing import assert_raises_message from alembic.testing import combinations from alembic.testing import config from alembic.testing import eq_ -from alembic.testing import is_ +from alembic.testing import is_not_ from alembic.testing import mock from alembic.testing.fixtures import AlterColRoundTripFixture from alembic.testing.fixtures import op_fixture @@ -47,16 +49,10 @@ class OpTest(TestBase): op.rename_table("t1", "t2", schema="foo") context.assert_("ALTER TABLE foo.t1 RENAME TO foo.t2") - def test_create_index_no_expr_allowed(self): - op_fixture() - assert_raises_message( - ValueError, - r"String or text\(\) construct expected", - op.create_index, - "name", - "tname", - [func.foo(column("x"))], - ) + def test_create_index_arbitrary_expr(self): + context = op_fixture() + op.create_index("name", "tname", [func.foo(column("x"))]) + context.assert_("CREATE INDEX name ON tname (foo(x))") def test_add_column_schema_hard_quoting(self): @@ -843,6 +839,63 @@ class OpTest(TestBase): "FOREIGN KEY(st_id) REFERENCES some_table (id))" ) + def test_create_table_check_constraint(self): + context = op_fixture() + t1 = op.create_table( + "some_table", + Column("id", Integer, primary_key=True), + Column("foo_id", Integer), + CheckConstraint("foo_id>5", name="ck_1"), + ) + context.assert_( + "CREATE TABLE some_table (" + "id INTEGER NOT NULL, " + "foo_id INTEGER, " + "PRIMARY KEY (id), " + "CONSTRAINT ck_1 CHECK (foo_id>5))" + ) + + ck = [c for c in t1.constraints if isinstance(c, CheckConstraint)] + eq_(ck[0].name, "ck_1") + + def test_create_table_unique_constraint(self): + context = op_fixture() + t1 = op.create_table( + "some_table", + Column("id", Integer, primary_key=True), + Column("foo_id", Integer), + UniqueConstraint("foo_id", name="uq_1"), + ) + context.assert_( + "CREATE TABLE some_table (" + "id INTEGER NOT NULL, " + "foo_id INTEGER, " + "PRIMARY KEY (id), " + "CONSTRAINT uq_1 UNIQUE (foo_id))" + ) + + uq = [c for c in t1.constraints if isinstance(c, UniqueConstraint)] + eq_(uq[0].name, "uq_1") + + def test_create_table_index(self): + context = op_fixture() + t1 = op.create_table( + "some_table", + Column("id", Integer, primary_key=True), + Column("foo_id", Integer), + Index("ix_1", "foo_id"), + ) + context.assert_( + "CREATE TABLE some_table (" + "id INTEGER NOT NULL, " + "foo_id INTEGER, " + "PRIMARY KEY (id))", + "CREATE INDEX ix_1 ON some_table (foo_id)", + ) + + ix = list(t1.indexes) + eq_(ix[0].name, "ix_1") + def test_create_table_fk_and_schema(self): context = op_fixture() t1 = op.create_table( @@ -1024,13 +1077,15 @@ class CustomOpTest(TestBase): context.assert_("CREATE SEQUENCE foob") -class EnsureOrigObjectFromToTest(TestBase): - """the to_XYZ and from_XYZ methods are used heavily in autogenerate. +class ObjectFromToTest(TestBase): + """Test operation round trips for to_obj() / from_obj(). - It's critical that these methods, at least the "drop" form, - always return the *same* object if available so that all the info - passed into to_XYZ is maintained in the from_XYZ. + Previously, these needed to preserve the "original" item + to this, but this makes them harder to work with. + As of #803 the constructs try to behave more intelligently + about the state they were given, so that they can both "reverse" + themselves but also take into accout their current state. """ @@ -1038,31 +1093,112 @@ class EnsureOrigObjectFromToTest(TestBase): schema_obj = schemaobj.SchemaObjects() idx = schema_obj.index("x", "y", ["z"]) op = ops.DropIndexOp.from_index(idx) - is_(op.to_index(), idx) + is_not_(op.to_index(), idx) + + def test_drop_index_add_kw(self): + schema_obj = schemaobj.SchemaObjects() + idx = schema_obj.index("x", "y", ["z"]) + op = ops.DropIndexOp.from_index(idx) + + op.kw["postgresql_concurrently"] = True + eq_(op.to_index().dialect_kwargs["postgresql_concurrently"], True) + + eq_( + op.reverse().to_index().dialect_kwargs["postgresql_concurrently"], + True, + ) def test_create_index(self): schema_obj = schemaobj.SchemaObjects() idx = schema_obj.index("x", "y", ["z"]) op = ops.CreateIndexOp.from_index(idx) - is_(op.to_index(), idx) + + is_not_(op.to_index(), idx) + + def test_create_index_add_kw(self): + schema_obj = schemaobj.SchemaObjects() + idx = schema_obj.index("x", "y", ["z"]) + op = ops.CreateIndexOp.from_index(idx) + + op.kw["postgresql_concurrently"] = True + + eq_(op.to_index().dialect_kwargs["postgresql_concurrently"], True) + eq_( + op.reverse().to_index().dialect_kwargs["postgresql_concurrently"], + True, + ) def test_drop_table(self): schema_obj = schemaobj.SchemaObjects() table = schema_obj.table("x", Column("q", Integer)) op = ops.DropTableOp.from_table(table) - is_(op.to_table(), table) + is_not_(op.to_table(), table) + + def test_drop_table_add_kw(self): + schema_obj = schemaobj.SchemaObjects() + table = schema_obj.table("x", Column("q", Integer)) + op = ops.DropTableOp.from_table(table) + + op.table_kw["postgresql_partition_by"] = "x" + + eq_(op.to_table().dialect_kwargs["postgresql_partition_by"], "x") + eq_( + op.reverse().to_table().dialect_kwargs["postgresql_partition_by"], + "x", + ) def test_create_table(self): schema_obj = schemaobj.SchemaObjects() table = schema_obj.table("x", Column("q", Integer)) op = ops.CreateTableOp.from_table(table) - is_(op.to_table(), table) + is_not_(op.to_table(), table) + + def test_create_table_add_kw(self): + schema_obj = schemaobj.SchemaObjects() + table = schema_obj.table("x", Column("q", Integer)) + op = ops.CreateTableOp.from_table(table) + op.kw["postgresql_partition_by"] = "x" + + eq_(op.to_table().dialect_kwargs["postgresql_partition_by"], "x") + eq_( + op.reverse().to_table().dialect_kwargs["postgresql_partition_by"], + "x", + ) + + def test_create_unique_constraint(self): + schema_obj = schemaobj.SchemaObjects() + const = schema_obj.unique_constraint("x", "foobar", ["a"]) + op = ops.AddConstraintOp.from_constraint(const) + is_not_(op.to_constraint(), const) + + def test_create_unique_constraint_add_kw(self): + schema_obj = schemaobj.SchemaObjects() + const = schema_obj.unique_constraint("x", "foobar", ["a"]) + op = ops.AddConstraintOp.from_constraint(const) + is_not_(op.to_constraint(), const) + + op.kw["sqlite_on_conflict"] = "IGNORE" + + eq_(op.to_constraint().dialect_kwargs["sqlite_on_conflict"], "IGNORE") + eq_( + op.reverse().to_constraint().dialect_kwargs["sqlite_on_conflict"], + "IGNORE", + ) def test_drop_unique_constraint(self): schema_obj = schemaobj.SchemaObjects() const = schema_obj.unique_constraint("x", "foobar", ["a"]) op = ops.DropConstraintOp.from_constraint(const) - is_(op.to_constraint(), const) + is_not_(op.to_constraint(), const) + + def test_drop_unique_constraint_change_name(self): + schema_obj = schemaobj.SchemaObjects() + const = schema_obj.unique_constraint("x", "foobar", ["a"]) + op = ops.DropConstraintOp.from_constraint(const) + + op.constraint_name = "my_name" + eq_(op.to_constraint().name, "my_name") + eq_(op.reverse().to_constraint().name, "my_name") def test_drop_constraint_not_available(self): op = ops.DropConstraintOp("x", "y", type_="unique") |