diff options
Diffstat (limited to 'tests/test_autogen_diffs.py')
-rw-r--r-- | tests/test_autogen_diffs.py | 962 |
1 files changed, 962 insertions, 0 deletions
diff --git a/tests/test_autogen_diffs.py b/tests/test_autogen_diffs.py new file mode 100644 index 0000000..f32fd84 --- /dev/null +++ b/tests/test_autogen_diffs.py @@ -0,0 +1,962 @@ +import sys + +from sqlalchemy import MetaData, Column, Table, Integer, String, Text, \ + Numeric, CHAR, ForeignKey, INTEGER, Index, UniqueConstraint, \ + TypeDecorator, CheckConstraint, text, PrimaryKeyConstraint +from sqlalchemy.types import NULLTYPE +from sqlalchemy.engine.reflection import Inspector + +from alembic import autogenerate +from alembic.migration import MigrationContext +from alembic.testing import TestBase +from alembic.testing import config +from alembic.testing import assert_raises_message +from alembic.testing.mock import Mock +from alembic.testing import eq_ +from alembic.util import CommandError +from ._autogen_fixtures import \ + AutogenTest, AutogenFixtureTest, _default_object_filters + +py3k = sys.version_info >= (3, ) + + +class AutogenCrossSchemaTest(AutogenTest, TestBase): + __only_on__ = 'postgresql' + + @classmethod + def _get_db_schema(cls): + m = MetaData() + Table('t1', m, + Column('x', Integer) + ) + Table('t2', m, + Column('y', Integer), + schema=config.test_schema + ) + Table('t6', m, + Column('u', Integer) + ) + Table('t7', m, + Column('v', Integer), + schema=config.test_schema + ) + + return m + + @classmethod + def _get_model_schema(cls): + m = MetaData() + Table('t3', m, + Column('q', Integer) + ) + Table('t4', m, + Column('z', Integer), + schema=config.test_schema + ) + Table('t6', m, + Column('u', Integer) + ) + Table('t7', m, + Column('v', Integer), + schema=config.test_schema + ) + return m + + def test_default_schema_omitted_upgrade(self): + diffs = [] + + def include_object(obj, name, type_, reflected, compare_to): + if type_ == "table": + return name == "t3" + else: + return True + self.autogen_context.update({ + 'object_filters': [include_object], + 'include_schemas': True, + 'metadata': self.m2 + }) + autogenerate._produce_net_changes(self.autogen_context, diffs) + + eq_(diffs[0][0], "add_table") + eq_(diffs[0][1].schema, None) + + def test_alt_schema_included_upgrade(self): + diffs = [] + + def include_object(obj, name, type_, reflected, compare_to): + if type_ == "table": + return name == "t4" + else: + return True + + self.autogen_context.update({ + 'object_filters': [include_object], + 'include_schemas': True, + 'metadata': self.m2 + }) + autogenerate._produce_net_changes(self.autogen_context, diffs) + + eq_(diffs[0][0], "add_table") + eq_(diffs[0][1].schema, config.test_schema) + + def test_default_schema_omitted_downgrade(self): + diffs = [] + + def include_object(obj, name, type_, reflected, compare_to): + if type_ == "table": + return name == "t1" + else: + return True + self.autogen_context.update({ + 'object_filters': [include_object], + 'include_schemas': True, + 'metadata': self.m2 + }) + autogenerate._produce_net_changes(self.autogen_context, diffs) + + eq_(diffs[0][0], "remove_table") + eq_(diffs[0][1].schema, None) + + def test_alt_schema_included_downgrade(self): + diffs = [] + + def include_object(obj, name, type_, reflected, compare_to): + if type_ == "table": + return name == "t2" + else: + return True + self.autogen_context.update({ + 'object_filters': [include_object], + 'include_schemas': True, + 'metadata': self.m2 + }) + autogenerate._produce_net_changes(self.autogen_context, diffs) + eq_(diffs[0][0], "remove_table") + eq_(diffs[0][1].schema, config.test_schema) + + +class AutogenDefaultSchemaTest(AutogenFixtureTest, TestBase): + __only_on__ = 'postgresql' + + def test_uses_explcit_schema_in_default_one(self): + + default_schema = self.bind.dialect.default_schema_name + + m1 = MetaData() + m2 = MetaData() + + Table('a', m1, Column('x', String(50))) + Table('a', m2, Column('x', String(50)), schema=default_schema) + + diffs = self._fixture(m1, m2, include_schemas=True) + eq_(diffs, []) + + def test_uses_explcit_schema_in_default_two(self): + + default_schema = self.bind.dialect.default_schema_name + + m1 = MetaData() + m2 = MetaData() + + Table('a', m1, Column('x', String(50))) + Table('a', m2, Column('x', String(50)), schema=default_schema) + Table('a', m2, Column('y', String(50)), schema="test_schema") + + diffs = self._fixture(m1, m2, include_schemas=True) + eq_(len(diffs), 1) + eq_(diffs[0][0], "add_table") + eq_(diffs[0][1].schema, "test_schema") + eq_(diffs[0][1].c.keys(), ['y']) + + def test_uses_explcit_schema_in_default_three(self): + + default_schema = self.bind.dialect.default_schema_name + + m1 = MetaData() + m2 = MetaData() + + Table('a', m1, Column('y', String(50)), schema="test_schema") + + Table('a', m2, Column('x', String(50)), schema=default_schema) + Table('a', m2, Column('y', String(50)), schema="test_schema") + + diffs = self._fixture(m1, m2, include_schemas=True) + eq_(len(diffs), 1) + eq_(diffs[0][0], "add_table") + eq_(diffs[0][1].schema, default_schema) + eq_(diffs[0][1].c.keys(), ['x']) + + +class ModelOne(object): + __requires__ = ('unique_constraint_reflection', ) + + schema = None + + @classmethod + def _get_db_schema(cls): + schema = cls.schema + + m = MetaData(schema=schema) + + Table('user', m, + Column('id', Integer, primary_key=True), + Column('name', String(50)), + Column('a1', Text), + Column("pw", String(50)), + Index('pw_idx', 'pw') + ) + + Table('address', m, + Column('id', Integer, primary_key=True), + Column('email_address', String(100), nullable=False), + ) + + Table('order', m, + Column('order_id', Integer, primary_key=True), + Column("amount", Numeric(8, 2), nullable=False, + server_default=text("0")), + CheckConstraint('amount >= 0', name='ck_order_amount') + ) + + Table('extra', m, + Column("x", CHAR), + Column('uid', Integer, ForeignKey('user.id')) + ) + + return m + + @classmethod + def _get_model_schema(cls): + schema = cls.schema + + m = MetaData(schema=schema) + + Table('user', m, + Column('id', Integer, primary_key=True), + Column('name', String(50), nullable=False), + Column('a1', Text, server_default="x") + ) + + Table('address', m, + Column('id', Integer, primary_key=True), + Column('email_address', String(100), nullable=False), + Column('street', String(50)), + UniqueConstraint('email_address', name="uq_email") + ) + + Table('order', m, + Column('order_id', Integer, primary_key=True), + Column('amount', Numeric(10, 2), nullable=True, + server_default=text("0")), + Column('user_id', Integer, ForeignKey('user.id')), + CheckConstraint('amount > -1', name='ck_order_amount'), + ) + + Table('item', m, + Column('id', Integer, primary_key=True), + Column('description', String(100)), + Column('order_id', Integer, ForeignKey('order.order_id')), + CheckConstraint('len(description) > 5') + ) + return m + + +class AutogenerateDiffTest(ModelOne, AutogenTest, TestBase): + __only_on__ = 'sqlite' + + def test_diffs(self): + """test generation of diff rules""" + + metadata = self.m2 + diffs = [] + ctx = self.autogen_context.copy() + ctx['metadata'] = self.m2 + ctx['object_filters'] = _default_object_filters + autogenerate._produce_net_changes( + ctx, diffs + ) + + eq_( + diffs[0], + ('add_table', metadata.tables['item']) + ) + + eq_(diffs[1][0], 'remove_table') + eq_(diffs[1][1].name, "extra") + + eq_(diffs[2][0], "add_column") + eq_(diffs[2][1], None) + eq_(diffs[2][2], "address") + eq_(diffs[2][3], metadata.tables['address'].c.street) + + eq_(diffs[3][0], "add_constraint") + eq_(diffs[3][1].name, "uq_email") + + eq_(diffs[4][0], "add_column") + eq_(diffs[4][1], None) + eq_(diffs[4][2], "order") + eq_(diffs[4][3], metadata.tables['order'].c.user_id) + + eq_(diffs[5][0][0], "modify_type") + eq_(diffs[5][0][1], None) + eq_(diffs[5][0][2], "order") + eq_(diffs[5][0][3], "amount") + eq_(repr(diffs[5][0][5]), "NUMERIC(precision=8, scale=2)") + eq_(repr(diffs[5][0][6]), "Numeric(precision=10, scale=2)") + + self._assert_fk_diff( + diffs[6], "add_fk", + "order", ["user_id"], + "user", ["id"] + ) + + eq_(diffs[7][0][0], "modify_default") + eq_(diffs[7][0][1], None) + eq_(diffs[7][0][2], "user") + eq_(diffs[7][0][3], "a1") + eq_(diffs[7][0][6].arg, "x") + + eq_(diffs[8][0][0], 'modify_nullable') + eq_(diffs[8][0][5], True) + eq_(diffs[8][0][6], False) + + eq_(diffs[9][0], 'remove_index') + eq_(diffs[9][1].name, 'pw_idx') + + eq_(diffs[10][0], 'remove_column') + eq_(diffs[10][3].name, 'pw') + + def test_include_symbol(self): + + diffs = [] + + def include_symbol(name, schema=None): + return name in ('address', 'order') + + context = MigrationContext.configure( + connection=self.bind.connect(), + opts={ + 'compare_type': True, + 'compare_server_default': True, + 'target_metadata': self.m2, + 'include_symbol': include_symbol, + } + ) + + diffs = autogenerate.compare_metadata( + context, context.opts['target_metadata']) + + alter_cols = set([ + d[2] for d in self._flatten_diffs(diffs) + if d[0].startswith('modify') + ]) + eq_(alter_cols, set(['order'])) + + def test_include_object(self): + def include_object(obj, name, type_, reflected, compare_to): + assert obj.name == name + if type_ == "table": + if reflected: + assert obj.metadata is not self.m2 + else: + assert obj.metadata is self.m2 + return name in ("address", "order", "user") + elif type_ == "column": + if reflected: + assert obj.table.metadata is not self.m2 + else: + assert obj.table.metadata is self.m2 + return name != "street" + else: + return True + + context = MigrationContext.configure( + connection=self.bind.connect(), + opts={ + 'compare_type': True, + 'compare_server_default': True, + 'target_metadata': self.m2, + 'include_object': include_object, + } + ) + + diffs = autogenerate.compare_metadata( + context, context.opts['target_metadata']) + + alter_cols = set([ + d[2] for d in self._flatten_diffs(diffs) + if d[0].startswith('modify') + ]).union( + d[3].name for d in self._flatten_diffs(diffs) + if d[0] == 'add_column' + ).union( + d[1].name for d in self._flatten_diffs(diffs) + if d[0] == 'add_table' + ) + eq_(alter_cols, set(['user_id', 'order', 'user'])) + + def test_skip_null_type_comparison_reflected(self): + diff = [] + autogenerate.compare._compare_type(None, "sometable", "somecol", + Column("somecol", NULLTYPE), + Column("somecol", Integer()), + diff, self.autogen_context + ) + assert not diff + + def test_skip_null_type_comparison_local(self): + diff = [] + autogenerate.compare._compare_type(None, "sometable", "somecol", + Column("somecol", Integer()), + Column("somecol", NULLTYPE), + diff, self.autogen_context + ) + assert not diff + + def test_custom_type_compare(self): + class MyType(TypeDecorator): + impl = Integer + + def compare_against_backend(self, dialect, conn_type): + return isinstance(conn_type, Integer) + + diff = [] + autogenerate.compare._compare_type(None, "sometable", "somecol", + Column("somecol", INTEGER()), + Column("somecol", MyType()), + diff, self.autogen_context + ) + assert not diff + + diff = [] + autogenerate.compare._compare_type(None, "sometable", "somecol", + Column("somecol", String()), + Column("somecol", MyType()), + diff, self.autogen_context + ) + eq_( + diff[0][0:4], + ('modify_type', None, 'sometable', 'somecol') + ) + + def test_affinity_typedec(self): + class MyType(TypeDecorator): + impl = CHAR + + def load_dialect_impl(self, dialect): + if dialect.name == 'sqlite': + return dialect.type_descriptor(Integer()) + else: + return dialect.type_descriptor(CHAR(32)) + + diff = [] + autogenerate.compare._compare_type( + None, "sometable", "somecol", + Column("somecol", Integer, nullable=True), + Column("somecol", MyType()), + diff, self.autogen_context + ) + assert not diff + + def test_dont_barf_on_already_reflected(self): + diffs = [] + from sqlalchemy.util import OrderedSet + inspector = Inspector.from_engine(self.bind) + autogenerate.compare._compare_tables( + OrderedSet([(None, 'extra'), (None, 'user')]), + OrderedSet(), [], inspector, + MetaData(), diffs, self.autogen_context + ) + eq_( + [(rec[0], rec[1].name) for rec in diffs], + [('remove_table', 'extra'), ('remove_table', 'user')] + ) + + +class AutogenerateDiffTestWSchema(ModelOne, AutogenTest, TestBase): + __only_on__ = 'postgresql' + schema = "test_schema" + + def test_diffs(self): + """test generation of diff rules""" + + metadata = self.m2 + diffs = [] + + self.autogen_context.update({ + 'object_filters': _default_object_filters, + 'include_schemas': True, + 'metadata': self.m2 + }) + autogenerate._produce_net_changes(self.autogen_context, diffs) + + eq_( + diffs[0], + ('add_table', metadata.tables['%s.item' % self.schema]) + ) + + eq_(diffs[1][0], 'remove_table') + eq_(diffs[1][1].name, "extra") + + eq_(diffs[2][0], "add_column") + eq_(diffs[2][1], self.schema) + eq_(diffs[2][2], "address") + eq_(diffs[2][3], metadata.tables['%s.address' % self.schema].c.street) + + eq_(diffs[3][0], "add_constraint") + eq_(diffs[3][1].name, "uq_email") + + eq_(diffs[4][0], "add_column") + eq_(diffs[4][1], self.schema) + eq_(diffs[4][2], "order") + eq_(diffs[4][3], metadata.tables['%s.order' % self.schema].c.user_id) + + eq_(diffs[5][0][0], "modify_type") + eq_(diffs[5][0][1], self.schema) + eq_(diffs[5][0][2], "order") + eq_(diffs[5][0][3], "amount") + eq_(repr(diffs[5][0][5]), "NUMERIC(precision=8, scale=2)") + eq_(repr(diffs[5][0][6]), "Numeric(precision=10, scale=2)") + + self._assert_fk_diff( + diffs[6], "add_fk", + "order", ["user_id"], + "user", ["id"], + source_schema=config.test_schema + ) + + eq_(diffs[7][0][0], "modify_default") + eq_(diffs[7][0][1], self.schema) + eq_(diffs[7][0][2], "user") + eq_(diffs[7][0][3], "a1") + eq_(diffs[7][0][6].arg, "x") + + eq_(diffs[8][0][0], 'modify_nullable') + eq_(diffs[8][0][5], True) + eq_(diffs[8][0][6], False) + + eq_(diffs[9][0], 'remove_index') + eq_(diffs[9][1].name, 'pw_idx') + + eq_(diffs[10][0], 'remove_column') + eq_(diffs[10][3].name, 'pw') + + +class AutogenerateCustomCompareTypeTest(AutogenTest, TestBase): + __only_on__ = 'sqlite' + + @classmethod + def _get_db_schema(cls): + m = MetaData() + + Table('sometable', m, + Column('id', Integer, primary_key=True), + Column('value', Integer)) + return m + + @classmethod + def _get_model_schema(cls): + m = MetaData() + + Table('sometable', m, + Column('id', Integer, primary_key=True), + Column('value', String)) + return m + + def test_uses_custom_compare_type_function(self): + my_compare_type = Mock() + self.context._user_compare_type = my_compare_type + + diffs = [] + ctx = self.autogen_context.copy() + ctx['metadata'] = self.m2 + autogenerate._produce_net_changes(ctx, diffs) + + first_table = self.m2.tables['sometable'] + first_column = first_table.columns['id'] + + eq_(len(my_compare_type.mock_calls), 2) + + # We'll just test the first call + _, args, _ = my_compare_type.mock_calls[0] + (context, inspected_column, metadata_column, + inspected_type, metadata_type) = args + eq_(context, self.context) + eq_(metadata_column, first_column) + eq_(metadata_type, first_column.type) + eq_(inspected_column.name, first_column.name) + eq_(type(inspected_type), INTEGER) + + def test_column_type_not_modified_custom_compare_type_returns_False(self): + my_compare_type = Mock() + my_compare_type.return_value = False + self.context._user_compare_type = my_compare_type + + diffs = [] + ctx = self.autogen_context.copy() + ctx['metadata'] = self.m2 + diffs = [] + autogenerate._produce_net_changes(ctx, diffs) + + eq_(diffs, []) + + def test_column_type_modified_custom_compare_type_returns_True(self): + my_compare_type = Mock() + my_compare_type.return_value = True + self.context._user_compare_type = my_compare_type + + ctx = self.autogen_context.copy() + ctx['metadata'] = self.m2 + diffs = [] + autogenerate._produce_net_changes(ctx, diffs) + + eq_(diffs[0][0][0], 'modify_type') + eq_(diffs[1][0][0], 'modify_type') + + +class PKConstraintUpgradesIgnoresNullableTest(AutogenTest, TestBase): + __backend__ = True + + # test workaround for SQLAlchemy issue #3023, alembic issue #199 + @classmethod + def _get_db_schema(cls): + m = MetaData() + + Table( + 'person_to_role', m, + Column('person_id', Integer, autoincrement=False), + Column('role_id', Integer, autoincrement=False), + PrimaryKeyConstraint('person_id', 'role_id') + ) + return m + + @classmethod + def _get_model_schema(cls): + return cls._get_db_schema() + + def test_no_change(self): + diffs = [] + ctx = self.autogen_context.copy() + ctx['metadata'] = self.m2 + autogenerate._produce_net_changes(ctx, diffs) + eq_(diffs, []) + + +class AutogenKeyTest(AutogenTest, TestBase): + __only_on__ = 'sqlite' + + @classmethod + def _get_db_schema(cls): + m = MetaData() + + Table('someothertable', m, + Column('id', Integer, primary_key=True), + Column('value', Integer, key="somekey"), + ) + return m + + @classmethod + def _get_model_schema(cls): + m = MetaData() + + Table('sometable', m, + Column('id', Integer, primary_key=True), + Column('value', Integer, key="someotherkey"), + ) + Table('someothertable', m, + Column('id', Integer, primary_key=True), + Column('value', Integer, key="somekey"), + Column("othervalue", Integer, key="otherkey") + ) + return m + + symbols = ['someothertable', 'sometable'] + + def test_autogen(self): + + diffs = [] + + ctx = self.autogen_context.copy() + ctx['metadata'] = self.m2 + autogenerate._produce_net_changes(ctx, diffs) + eq_(diffs[0][0], "add_table") + eq_(diffs[0][1].name, "sometable") + eq_(diffs[1][0], "add_column") + eq_(diffs[1][3].key, "otherkey") + + +class AutogenVersionTableTest(AutogenTest, TestBase): + __only_on__ = 'sqlite' + version_table_name = 'alembic_version' + version_table_schema = None + + @classmethod + def _get_db_schema(cls): + m = MetaData() + Table( + cls.version_table_name, m, + Column('x', Integer), schema=cls.version_table_schema) + return m + + @classmethod + def _get_model_schema(cls): + m = MetaData() + return m + + def test_no_version_table(self): + diffs = [] + ctx = self.autogen_context.copy() + ctx['metadata'] = self.m2 + + autogenerate._produce_net_changes(ctx, diffs) + eq_(diffs, []) + + def test_version_table_in_target(self): + diffs = [] + Table( + self.version_table_name, + self.m2, Column('x', Integer), schema=self.version_table_schema) + + ctx = self.autogen_context.copy() + ctx['metadata'] = self.m2 + autogenerate._produce_net_changes(ctx, diffs) + eq_(diffs, []) + + +class AutogenCustomVersionTableSchemaTest(AutogenVersionTableTest): + __only_on__ = 'postgresql' + version_table_schema = 'test_schema' + configure_opts = {'version_table_schema': 'test_schema'} + + +class AutogenCustomVersionTableTest(AutogenVersionTableTest): + version_table_name = 'my_version_table' + configure_opts = {'version_table': 'my_version_table'} + + +class AutogenCustomVersionTableAndSchemaTest(AutogenVersionTableTest): + __only_on__ = 'postgresql' + version_table_name = 'my_version_table' + version_table_schema = 'test_schema' + configure_opts = { + 'version_table': 'my_version_table', + 'version_table_schema': 'test_schema'} + + +class AutogenerateDiffOrderTest(AutogenTest, TestBase): + __only_on__ = 'sqlite' + + @classmethod + def _get_db_schema(cls): + return MetaData() + + @classmethod + def _get_model_schema(cls): + m = MetaData() + Table('parent', m, + Column('id', Integer, primary_key=True) + ) + + Table('child', m, + Column('parent_id', Integer, ForeignKey('parent.id')), + ) + + return m + + def test_diffs_order(self): + """ + Added in order to test that child tables(tables with FKs) are generated + before their parent tables + """ + + ctx = self.autogen_context.copy() + ctx['metadata'] = self.m2 + diffs = [] + autogenerate._produce_net_changes(ctx, diffs) + + eq_(diffs[0][0], 'add_table') + eq_(diffs[0][1].name, "parent") + eq_(diffs[1][0], 'add_table') + eq_(diffs[1][1].name, "child") + + +class CompareMetadataTest(ModelOne, AutogenTest, TestBase): + __only_on__ = 'sqlite' + + def test_compare_metadata(self): + metadata = self.m2 + + diffs = autogenerate.compare_metadata(self.context, metadata) + + eq_( + diffs[0], + ('add_table', metadata.tables['item']) + ) + + eq_(diffs[1][0], 'remove_table') + eq_(diffs[1][1].name, "extra") + + eq_(diffs[2][0], "add_column") + eq_(diffs[2][1], None) + eq_(diffs[2][2], "address") + eq_(diffs[2][3], metadata.tables['address'].c.street) + + eq_(diffs[3][0], "add_constraint") + eq_(diffs[3][1].name, "uq_email") + + eq_(diffs[4][0], "add_column") + eq_(diffs[4][1], None) + eq_(diffs[4][2], "order") + eq_(diffs[4][3], metadata.tables['order'].c.user_id) + + eq_(diffs[5][0][0], "modify_type") + eq_(diffs[5][0][1], None) + eq_(diffs[5][0][2], "order") + eq_(diffs[5][0][3], "amount") + eq_(repr(diffs[5][0][5]), "NUMERIC(precision=8, scale=2)") + eq_(repr(diffs[5][0][6]), "Numeric(precision=10, scale=2)") + + self._assert_fk_diff( + diffs[6], "add_fk", + "order", ["user_id"], + "user", ["id"] + ) + + eq_(diffs[7][0][0], "modify_default") + eq_(diffs[7][0][1], None) + eq_(diffs[7][0][2], "user") + eq_(diffs[7][0][3], "a1") + eq_(diffs[7][0][6].arg, "x") + + eq_(diffs[8][0][0], 'modify_nullable') + eq_(diffs[8][0][5], True) + eq_(diffs[8][0][6], False) + + eq_(diffs[9][0], 'remove_index') + eq_(diffs[9][1].name, 'pw_idx') + + eq_(diffs[10][0], 'remove_column') + eq_(diffs[10][3].name, 'pw') + + def test_compare_metadata_include_object(self): + metadata = self.m2 + + def include_object(obj, name, type_, reflected, compare_to): + if type_ == "table": + return name in ("extra", "order") + elif type_ == "column": + return name != "amount" + else: + return True + + context = MigrationContext.configure( + connection=self.bind.connect(), + opts={ + 'compare_type': True, + 'compare_server_default': True, + 'include_object': include_object, + } + ) + + diffs = autogenerate.compare_metadata(context, metadata) + + eq_(diffs[0][0], 'remove_table') + eq_(diffs[0][1].name, "extra") + + eq_(diffs[1][0], "add_column") + eq_(diffs[1][1], None) + eq_(diffs[1][2], "order") + eq_(diffs[1][3], metadata.tables['order'].c.user_id) + + def test_compare_metadata_include_symbol(self): + metadata = self.m2 + + def include_symbol(table_name, schema_name): + return table_name in ('extra', 'order') + + context = MigrationContext.configure( + connection=self.bind.connect(), + opts={ + 'compare_type': True, + 'compare_server_default': True, + 'include_symbol': include_symbol, + } + ) + + diffs = autogenerate.compare_metadata(context, metadata) + + eq_(diffs[0][0], 'remove_table') + eq_(diffs[0][1].name, "extra") + + eq_(diffs[1][0], "add_column") + eq_(diffs[1][1], None) + eq_(diffs[1][2], "order") + eq_(diffs[1][3], metadata.tables['order'].c.user_id) + + eq_(diffs[2][0][0], "modify_type") + eq_(diffs[2][0][1], None) + eq_(diffs[2][0][2], "order") + eq_(diffs[2][0][3], "amount") + eq_(repr(diffs[2][0][5]), "NUMERIC(precision=8, scale=2)") + eq_(repr(diffs[2][0][6]), "Numeric(precision=10, scale=2)") + + eq_(diffs[2][1][0], 'modify_nullable') + eq_(diffs[2][1][2], 'order') + eq_(diffs[2][1][5], False) + eq_(diffs[2][1][6], True) + + def test_compare_metadata_as_sql(self): + context = MigrationContext.configure( + connection=self.bind.connect(), + opts={'as_sql': True} + ) + metadata = self.m2 + + assert_raises_message( + CommandError, + "autogenerate can't use as_sql=True as it prevents " + "querying the database for schema information", + autogenerate.compare_metadata, context, metadata + ) + + +class PGCompareMetaData(ModelOne, AutogenTest, TestBase): + __only_on__ = 'postgresql' + schema = "test_schema" + + def test_compare_metadata_schema(self): + metadata = self.m2 + + context = MigrationContext.configure( + connection=self.bind.connect(), + opts={ + "include_schemas": True + } + ) + + diffs = autogenerate.compare_metadata(context, metadata) + + eq_( + diffs[0], + ('add_table', metadata.tables['test_schema.item']) + ) + + eq_(diffs[1][0], 'remove_table') + eq_(diffs[1][1].name, "extra") + + eq_(diffs[2][0], "add_column") + eq_(diffs[2][1], "test_schema") + eq_(diffs[2][2], "address") + eq_(diffs[2][3], metadata.tables['test_schema.address'].c.street) + + eq_(diffs[3][0], "add_constraint") + eq_(diffs[3][1].name, "uq_email") + + eq_(diffs[4][0], "add_column") + eq_(diffs[4][1], "test_schema") + eq_(diffs[4][2], "order") + eq_(diffs[4][3], metadata.tables['test_schema.order'].c.user_id) + + eq_(diffs[5][0][0], 'modify_nullable') + eq_(diffs[5][0][5], False) + eq_(diffs[5][0][6], True) |