author    Mike Bayer <mike_mp@zzzcomputing.com>  2021-02-26 13:11:17 -0500
committer Mike Bayer <mike_mp@zzzcomputing.com>  2021-04-12 14:39:45 -0400
commit    dd9c4d695c4463c872b25f59b6a8742bbc047150 (patch)
tree      c77c80a239ee1afca1a9c91633a78f723279a1fe /tests/test_op.py
parent    4f1aef0186cd7141acfb31c05c802293fe805a66 (diff)
download  alembic-dd9c4d695c4463c872b25f59b6a8742bbc047150.tar.gz
Create schema objects fresh from ops
Refactored the implementation of :class:`.MigrateOperation` constructs such as :class:`.CreateIndexOp`, :class:`.CreateTableOp`, etc. so that they no longer rely upon maintaining a persistent version of each schema object internally; instead, the state variables of each operation object are used to produce the corresponding construct when the operation is invoked. The rationale is to better support environments that make use of operation-manipulation schemes such as those discussed in :ref:`autogen_rewriter`, allowing end-user code to manipulate the public attributes of these objects, which are then expressed in the final output; an example is ``some_create_index_op.kw["postgresql_concurrently"] = True``. Previously, when generated from autogenerate, these objects would typically hold onto the original, reflected element internally without honoring the other state variables of each construct, preventing the public API from working.

Change-Id: Ida2537740325de5c21e9447e930a518093f01bd4
Fixes: #803
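For illustration, a minimal sketch (not part of this commit) of the kind of rewriter hook described in :ref:`autogen_rewriter` that this change is intended to support; the hook name is arbitrary and the ``process_revision_directives=writer`` wiring in ``env.py`` is assumed and not shown::

    from alembic.autogenerate import rewriter
    from alembic.operations import ops

    writer = rewriter.Rewriter()

    @writer.rewrites(ops.CreateIndexOp)
    def add_concurrently(context, revision, op):
        # illustrative hook: mutate the public state of the op; with this
        # change, the CREATE INDEX rendered for the directive honors the
        # added dialect keyword rather than the internally-held Index
        op.kw["postgresql_concurrently"] = True
        return op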
Diffstat (limited to 'tests/test_op.py')
-rw-r--r--  tests/test_op.py  178
1 file changed, 157 insertions(+), 21 deletions(-)
diff --git a/tests/test_op.py b/tests/test_op.py
index 93bba28..63d3e96 100644
--- a/tests/test_op.py
+++ b/tests/test_op.py
@@ -6,10 +6,12 @@ from sqlalchemy import Column
from sqlalchemy import event
from sqlalchemy import exc
from sqlalchemy import ForeignKey
+from sqlalchemy import Index
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import String
from sqlalchemy import Table
+from sqlalchemy import UniqueConstraint
from sqlalchemy.sql import column
from sqlalchemy.sql import func
from sqlalchemy.sql import text
@@ -22,7 +24,7 @@ from alembic.testing import assert_raises_message
from alembic.testing import combinations
from alembic.testing import config
from alembic.testing import eq_
-from alembic.testing import is_
+from alembic.testing import is_not_
from alembic.testing import mock
from alembic.testing.fixtures import AlterColRoundTripFixture
from alembic.testing.fixtures import op_fixture
@@ -47,16 +49,10 @@ class OpTest(TestBase):
op.rename_table("t1", "t2", schema="foo")
context.assert_("ALTER TABLE foo.t1 RENAME TO foo.t2")
- def test_create_index_no_expr_allowed(self):
- op_fixture()
- assert_raises_message(
- ValueError,
- r"String or text\(\) construct expected",
- op.create_index,
- "name",
- "tname",
- [func.foo(column("x"))],
- )
+ def test_create_index_arbitrary_expr(self):
+ context = op_fixture()
+ op.create_index("name", "tname", [func.foo(column("x"))])
+ context.assert_("CREATE INDEX name ON tname (foo(x))")
def test_add_column_schema_hard_quoting(self):
@@ -843,6 +839,63 @@ class OpTest(TestBase):
"FOREIGN KEY(st_id) REFERENCES some_table (id))"
)
+ def test_create_table_check_constraint(self):
+ context = op_fixture()
+ t1 = op.create_table(
+ "some_table",
+ Column("id", Integer, primary_key=True),
+ Column("foo_id", Integer),
+ CheckConstraint("foo_id>5", name="ck_1"),
+ )
+ context.assert_(
+ "CREATE TABLE some_table ("
+ "id INTEGER NOT NULL, "
+ "foo_id INTEGER, "
+ "PRIMARY KEY (id), "
+ "CONSTRAINT ck_1 CHECK (foo_id>5))"
+ )
+
+ ck = [c for c in t1.constraints if isinstance(c, CheckConstraint)]
+ eq_(ck[0].name, "ck_1")
+
+ def test_create_table_unique_constraint(self):
+ context = op_fixture()
+ t1 = op.create_table(
+ "some_table",
+ Column("id", Integer, primary_key=True),
+ Column("foo_id", Integer),
+ UniqueConstraint("foo_id", name="uq_1"),
+ )
+ context.assert_(
+ "CREATE TABLE some_table ("
+ "id INTEGER NOT NULL, "
+ "foo_id INTEGER, "
+ "PRIMARY KEY (id), "
+ "CONSTRAINT uq_1 UNIQUE (foo_id))"
+ )
+
+ uq = [c for c in t1.constraints if isinstance(c, UniqueConstraint)]
+ eq_(uq[0].name, "uq_1")
+
+ def test_create_table_index(self):
+ context = op_fixture()
+ t1 = op.create_table(
+ "some_table",
+ Column("id", Integer, primary_key=True),
+ Column("foo_id", Integer),
+ Index("ix_1", "foo_id"),
+ )
+ context.assert_(
+ "CREATE TABLE some_table ("
+ "id INTEGER NOT NULL, "
+ "foo_id INTEGER, "
+ "PRIMARY KEY (id))",
+ "CREATE INDEX ix_1 ON some_table (foo_id)",
+ )
+
+ ix = list(t1.indexes)
+ eq_(ix[0].name, "ix_1")
+
def test_create_table_fk_and_schema(self):
context = op_fixture()
t1 = op.create_table(
@@ -1024,13 +1077,15 @@ class CustomOpTest(TestBase):
context.assert_("CREATE SEQUENCE foob")
-class EnsureOrigObjectFromToTest(TestBase):
- """the to_XYZ and from_XYZ methods are used heavily in autogenerate.
+class ObjectFromToTest(TestBase):
+ """Test operation round trips for to_obj() / from_obj().
- It's critical that these methods, at least the "drop" form,
- always return the *same* object if available so that all the info
- passed into to_XYZ is maintained in the from_XYZ.
+ Previously, these needed to preserve the "original" item
+ in order to do this, but that made the objects harder to work with.
+ As of #803 the constructs try to behave more intelligently
+ about the state they were given, so that they can both "reverse"
+ themselves and take into account their current state.
"""
@@ -1038,31 +1093,112 @@ class EnsureOrigObjectFromToTest(TestBase):
schema_obj = schemaobj.SchemaObjects()
idx = schema_obj.index("x", "y", ["z"])
op = ops.DropIndexOp.from_index(idx)
- is_(op.to_index(), idx)
+ is_not_(op.to_index(), idx)
+
+ def test_drop_index_add_kw(self):
+ schema_obj = schemaobj.SchemaObjects()
+ idx = schema_obj.index("x", "y", ["z"])
+ op = ops.DropIndexOp.from_index(idx)
+
+ op.kw["postgresql_concurrently"] = True
+ eq_(op.to_index().dialect_kwargs["postgresql_concurrently"], True)
+
+ eq_(
+ op.reverse().to_index().dialect_kwargs["postgresql_concurrently"],
+ True,
+ )
def test_create_index(self):
schema_obj = schemaobj.SchemaObjects()
idx = schema_obj.index("x", "y", ["z"])
op = ops.CreateIndexOp.from_index(idx)
- is_(op.to_index(), idx)
+
+ is_not_(op.to_index(), idx)
+
+ def test_create_index_add_kw(self):
+ schema_obj = schemaobj.SchemaObjects()
+ idx = schema_obj.index("x", "y", ["z"])
+ op = ops.CreateIndexOp.from_index(idx)
+
+ op.kw["postgresql_concurrently"] = True
+
+ eq_(op.to_index().dialect_kwargs["postgresql_concurrently"], True)
+ eq_(
+ op.reverse().to_index().dialect_kwargs["postgresql_concurrently"],
+ True,
+ )
def test_drop_table(self):
schema_obj = schemaobj.SchemaObjects()
table = schema_obj.table("x", Column("q", Integer))
op = ops.DropTableOp.from_table(table)
- is_(op.to_table(), table)
+ is_not_(op.to_table(), table)
+
+ def test_drop_table_add_kw(self):
+ schema_obj = schemaobj.SchemaObjects()
+ table = schema_obj.table("x", Column("q", Integer))
+ op = ops.DropTableOp.from_table(table)
+
+ op.table_kw["postgresql_partition_by"] = "x"
+
+ eq_(op.to_table().dialect_kwargs["postgresql_partition_by"], "x")
+ eq_(
+ op.reverse().to_table().dialect_kwargs["postgresql_partition_by"],
+ "x",
+ )
def test_create_table(self):
schema_obj = schemaobj.SchemaObjects()
table = schema_obj.table("x", Column("q", Integer))
op = ops.CreateTableOp.from_table(table)
- is_(op.to_table(), table)
+ is_not_(op.to_table(), table)
+
+ def test_create_table_add_kw(self):
+ schema_obj = schemaobj.SchemaObjects()
+ table = schema_obj.table("x", Column("q", Integer))
+ op = ops.CreateTableOp.from_table(table)
+ op.kw["postgresql_partition_by"] = "x"
+
+ eq_(op.to_table().dialect_kwargs["postgresql_partition_by"], "x")
+ eq_(
+ op.reverse().to_table().dialect_kwargs["postgresql_partition_by"],
+ "x",
+ )
+
+ def test_create_unique_constraint(self):
+ schema_obj = schemaobj.SchemaObjects()
+ const = schema_obj.unique_constraint("x", "foobar", ["a"])
+ op = ops.AddConstraintOp.from_constraint(const)
+ is_not_(op.to_constraint(), const)
+
+ def test_create_unique_constraint_add_kw(self):
+ schema_obj = schemaobj.SchemaObjects()
+ const = schema_obj.unique_constraint("x", "foobar", ["a"])
+ op = ops.AddConstraintOp.from_constraint(const)
+ is_not_(op.to_constraint(), const)
+
+ op.kw["sqlite_on_conflict"] = "IGNORE"
+
+ eq_(op.to_constraint().dialect_kwargs["sqlite_on_conflict"], "IGNORE")
+ eq_(
+ op.reverse().to_constraint().dialect_kwargs["sqlite_on_conflict"],
+ "IGNORE",
+ )
def test_drop_unique_constraint(self):
schema_obj = schemaobj.SchemaObjects()
const = schema_obj.unique_constraint("x", "foobar", ["a"])
op = ops.DropConstraintOp.from_constraint(const)
- is_(op.to_constraint(), const)
+ is_not_(op.to_constraint(), const)
+
+ def test_drop_unique_constraint_change_name(self):
+ schema_obj = schemaobj.SchemaObjects()
+ const = schema_obj.unique_constraint("x", "foobar", ["a"])
+ op = ops.DropConstraintOp.from_constraint(const)
+
+ op.constraint_name = "my_name"
+ eq_(op.to_constraint().name, "my_name")
+ eq_(op.reverse().to_constraint().name, "my_name")
def test_drop_constraint_not_available(self):
op = ops.DropConstraintOp("x", "y", type_="unique")