diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2013-12-27 12:37:07 -0500 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2013-12-27 12:37:07 -0500 |
commit | a9cc3f62fc424b7aff426c4d7a2c047be3b7d848 (patch) | |
tree | 183df236454203dcd080c1778a52d3feacbfbc27 | |
parent | 444008f8aeaba58e8a6fd7369f300dbbce4aea72 (diff) | |
download | alembic-a9cc3f62fc424b7aff426c4d7a2c047be3b7d848.tar.gz |
- clean out test_autogenerate
- start pinning some tests to 0.9, don't need to pin to 0.7 anymore
- Autogenerate for ``op.create_table()`` will not include a
``PrimaryKeyConstraint()`` that has no columns.
- don't need a full string test for boolean/check constraint autogen
-rw-r--r-- | alembic/autogenerate/render.py | 3 | ||||
-rw-r--r-- | alembic/util.py | 1 | ||||
-rw-r--r-- | docs/build/changelog.rst | 6 | ||||
-rw-r--r-- | tests/__init__.py | 15 | ||||
-rw-r--r-- | tests/test_autogen_render.py | 26 | ||||
-rw-r--r-- | tests/test_autogenerate.py | 1134 | ||||
-rw-r--r-- | tests/test_postgresql.py | 5 |
7 files changed, 293 insertions, 897 deletions
diff --git a/alembic/autogenerate/render.py b/alembic/autogenerate/render.py index 53c714d..b89227a 100644 --- a/alembic/autogenerate/render.py +++ b/alembic/autogenerate/render.py @@ -323,6 +323,9 @@ def _render_primary_key(constraint, autogen_context): if rendered is not False: return rendered + if not constraint.columns: + return None + opts = [] if constraint.name: opts.append(("name", repr(constraint.name))) diff --git a/alembic/util.py b/alembic/util.py index 4f29882..6a5f8df 100644 --- a/alembic/util.py +++ b/alembic/util.py @@ -23,6 +23,7 @@ def _safe_int(value): _vers = tuple([_safe_int(x) for x in re.findall(r'(\d+|[abc]\d)', __version__)]) sqla_07 = _vers > (0, 7, 2) sqla_08 = _vers >= (0, 8, 0, 'b2') +sqla_09 = _vers >= (0, 9, 0) if not sqla_07: raise CommandError( "SQLAlchemy 0.7.3 or greater is required. ") diff --git a/docs/build/changelog.rst b/docs/build/changelog.rst index 088018e..72c1fc2 100644 --- a/docs/build/changelog.rst +++ b/docs/build/changelog.rst @@ -9,6 +9,12 @@ Changelog .. change:: :tags: bug + Autogenerate for ``op.create_table()`` will not include a + ``PrimaryKeyConstraint()`` that has no columns. + + .. change:: + :tags: bug + Fixed bug in the not-internally-used :meth:`.ScriptDirectory.get_base` method which would fail if called on an empty versions directory. diff --git a/tests/__init__.py b/tests/__init__.py index 71139ad..904ee76 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -43,7 +43,7 @@ def sqlite_db(): # per connection, so create a new # engine for each assertion dir_ = os.path.join(staging_directory, 'scripts') - return create_engine('sqlite:///%s/foo.db' % dir_, echo=True) + return create_engine('sqlite:///%s/foo.db' % dir_) def capture_db(): buf = [] @@ -62,7 +62,7 @@ def db_for_dialect(name): except configparser.NoOptionError: raise SkipTest("No dialect %r in test.cfg" % name) try: - eng = create_engine(cfg, echo=True) + eng = create_engine(cfg) except ImportError as er1: raise SkipTest("Can't import DBAPI: %s" % er1) try: @@ -72,11 +72,6 @@ def db_for_dialect(name): _engs[name] = eng return eng -@decorator -def requires_07(fn, *arg, **kw): - if not util.sqla_07: - raise SkipTest("SQLAlchemy 0.7 required") - return fn(*arg, **kw) @decorator def requires_08(fn, *arg, **kw): @@ -84,6 +79,12 @@ def requires_08(fn, *arg, **kw): raise SkipTest("SQLAlchemy 0.8.0b2 or greater required") return fn(*arg, **kw) +@decorator +def requires_09(fn, *arg, **kw): + if not util.sqla_09: + raise SkipTest("SQLAlchemy 0.9 or greater required") + return fn(*arg, **kw) + _dialects = {} def _get_dialect(name): if name is None or name == 'default': diff --git a/tests/test_autogen_render.py b/tests/test_autogen_render.py index e928921..c1942ef 100644 --- a/tests/test_autogen_render.py +++ b/tests/test_autogen_render.py @@ -290,6 +290,32 @@ class AutogenRenderTest(TestCase): "op.drop_table('sometable', schema='foo')" ) + def test_render_table_no_implicit_check(self): + m = MetaData() + t = Table('test', m, Column('x', Boolean())) + + eq_ignore_whitespace( + autogenerate.render._add_table(t, self.autogen_context), + "op.create_table('test',sa.Column('x', sa.Boolean(), nullable=True))" + ) + + def test_render_empty_pk_vs_nonempty_pk(self): + m = MetaData() + t1 = Table('t1', m, Column('x', Integer)) + t2 = Table('t2', m, Column('x', Integer, primary_key=True)) + + eq_ignore_whitespace( + autogenerate.render._add_table(t1, self.autogen_context), + "op.create_table('t1',sa.Column('x', sa.Integer(), nullable=True))" + ) + + eq_ignore_whitespace( + autogenerate.render._add_table(t2, self.autogen_context), + "op.create_table('t2'," + "sa.Column('x', sa.Integer(), nullable=False)," + "sa.PrimaryKeyConstraint('x'))" + ) + def test_render_add_column(self): eq_( autogenerate.render._add_column( diff --git a/tests/test_autogenerate.py b/tests/test_autogenerate.py index f4b37e0..3b55908 100644 --- a/tests/test_autogenerate.py +++ b/tests/test_autogenerate.py @@ -16,87 +16,11 @@ from sqlalchemy.sql import and_, column, literal_column from alembic import autogenerate, util, compat from alembic.migration import MigrationContext from . import staging_env, sqlite_db, clear_staging_env, eq_, \ - eq_ignore_whitespace, requires_07, db_for_dialect + eq_ignore_whitespace, db_for_dialect py3k = sys.version_info >= (3, ) -def _model_one(schema=None): - m = MetaData(schema=schema) - Table('user', m, - Column('id', Integer, primary_key=True), - Column('name', String(50)), - Column('a1', Text), - Column("pw", String(50)) - ) - - Table('address', m, - Column('id', Integer, primary_key=True), - Column('email_address', String(100), nullable=False), - ) - - Table('order', m, - Column('order_id', Integer, primary_key=True), - Column("amount", Numeric(8, 2), nullable=False, - server_default="0"), - CheckConstraint('amount >= 0', name='ck_order_amount') - ) - - Table('extra', m, - Column("x", CHAR), - Column('uid', Integer, ForeignKey('user.id')) - ) - - return m - -def _model_two(schema=None): - m = MetaData(schema=schema) - - Table('user', m, - Column('id', Integer, primary_key=True), - Column('name', String(50), nullable=False), - Column('a1', Text, server_default="x") - ) - - Table('address', m, - Column('id', Integer, primary_key=True), - Column('email_address', String(100), nullable=False), - Column('street', String(50)), - ) - - Table('order', m, - Column('order_id', Integer, primary_key=True), - Column('amount', Numeric(10, 2), nullable=True, - server_default="0"), - Column('user_id', Integer, ForeignKey('user.id')), - CheckConstraint('amount > -1', name='ck_order_amount'), - ) - - Table('item', m, - Column('id', Integer, primary_key=True), - Column('description', String(100)), - Column('order_id', Integer, ForeignKey('order.order_id')), - CheckConstraint('len(description) > 5') - ) - return m - - -def _model_three(): - m = MetaData() - return m - -def _model_four(): - m = MetaData() - - Table('parent', m, - Column('id', Integer, primary_key=True) - ) - - Table('child', m, - Column('parent_id', Integer, ForeignKey('parent.id')), - ) - - return m names_in_this_test = set() def _default_include_object(obj, name, type_, reflected, compare_to): @@ -119,7 +43,6 @@ class AutogenTest(object): return sqlite_db() @classmethod - @requires_07 def setup_class(cls): staging_env() cls.bind = cls._get_bind() @@ -132,21 +55,21 @@ class AutogenTest(object): connection=conn, opts={ 'compare_type': True, - 'compare_server_default':True, - 'target_metadata':cls.m2, - 'upgrade_token':"upgrades", - 'downgrade_token':"downgrades", - 'alembic_module_prefix':'op.', - 'sqlalchemy_module_prefix':'sa.', + 'compare_server_default': True, + 'target_metadata': cls.m2, + 'upgrade_token': "upgrades", + 'downgrade_token': "downgrades", + 'alembic_module_prefix': 'op.', + 'sqlalchemy_module_prefix': 'sa.', } ) connection = context.bind cls.autogen_context = { - 'imports':set(), - 'connection':connection, - 'dialect':connection.dialect, - 'context':context + 'imports': set(), + 'connection': connection, + 'dialect': connection.dialect, + 'context': context } @classmethod @@ -155,76 +78,6 @@ class AutogenTest(object): clear_staging_env() -class ImplicitConstraintNoGenTest(AutogenTest, TestCase): - - @classmethod - def _get_bind(cls): - return db_for_dialect('mysql') - - @classmethod - def _get_db_schema(cls): - m = MetaData() - - Table('someothertable', m, - Column('id', Integer, primary_key=True), - Column('value', Boolean()), - mysql_engine='InnoDB', - mysql_default_charset='utf8' - ) - return m - - @classmethod - def _get_model_schema(cls): - m = MetaData() - - Table('sometable', m, - Column('id', Integer, primary_key=True), - Column('value', Boolean()), - ) - return m - - - def test_boolean_gen_upgrade(self): - template_args = {} - autogenerate._produce_migration_diffs(self.context, - template_args, set(), - include_symbol=lambda name, schema=None: name == 'sometable') - eq_( - re.sub(r"u'", "'", template_args['upgrades']), - "### commands auto generated by Alembic - please adjust! ###\n" - " op.create_table('sometable',\n" - " sa.Column('id', sa.Integer(), nullable=False),\n" - " sa.Column('value', sa.Boolean(), nullable=True),\n" - " sa.PrimaryKeyConstraint('id')\n )\n" - " ### end Alembic commands ###" - ) - - def test_boolean_gen_downgrade(self): - # on the downgrade side, we are OK for now, as SQLAlchemy - # does not reflect check constraints yet. - - template_args = {} - autogenerate._produce_migration_diffs(self.context, - template_args, set(), - include_symbol=lambda name, schema=None: name in ('sometable', 'someothertable') - ) - eq_( - re.sub(r"u'", "'", template_args['downgrades']), - "### commands auto generated by Alembic - please adjust! ###\n" - " op.create_table('someothertable',\n" - " sa.Column('id', mysql.INTEGER(display_width=11), " - "nullable=False),\n" - " sa.Column('value', mysql.TINYINT(display_width=1), " - "autoincrement=False, nullable=True),\n" - " sa.PrimaryKeyConstraint('id'),\n" - " mysql_default_charset='utf8',\n" - " mysql_engine='InnoDB'\n" - " )\n" - " op.drop_table('sometable')\n" - " ### end Alembic commands ###" - ) - - class AutogenCrossSchemaTest(AutogenTest, TestCase): @classmethod def _get_bind(cls): @@ -324,179 +177,78 @@ class AutogenCrossSchemaTest(AutogenTest, TestCase): eq_(diffs[0][1].schema, self.test_schema_name) - - -class AutogenerateDiffTestWSchema(AutogenTest, TestCase): - - @classmethod - def _get_bind(cls): - cls.test_schema_name = "test_schema" - return db_for_dialect('postgresql') +class ModelOne(object): + schema = None @classmethod def _get_db_schema(cls): - return _model_one(schema=cls.test_schema_name) - - @classmethod - def _get_model_schema(cls): - return _model_two(schema=cls.test_schema_name) + schema = cls.schema - def test_diffs(self): - """test generation of diff rules""" + m = MetaData(schema=schema) - metadata = self.m2 - connection = self.context.bind - diffs = [] - autogenerate._produce_net_changes(connection, metadata, diffs, - self.autogen_context, - object_filters=_default_object_filters, - include_schemas=True - ) - - eq_( - diffs[0], - ('add_table', metadata.tables['%s.item' % self.test_schema_name]) + Table('user', m, + Column('id', Integer, primary_key=True), + Column('name', String(50)), + Column('a1', Text), + Column("pw", String(50)) ) - eq_(diffs[1][0], 'remove_table') - eq_(diffs[1][1].name, "extra") - - eq_(diffs[2][0], "add_column") - eq_(diffs[2][1], self.test_schema_name) - eq_(diffs[2][2], "address") - eq_(diffs[2][3], metadata.tables['%s.address' % self.test_schema_name].c.street) + Table('address', m, + Column('id', Integer, primary_key=True), + Column('email_address', String(100), nullable=False), + ) - eq_(diffs[3][0], "add_column") - eq_(diffs[3][1], self.test_schema_name) - eq_(diffs[3][2], "order") - eq_(diffs[3][3], metadata.tables['%s.order' % self.test_schema_name].c.user_id) + Table('order', m, + Column('order_id', Integer, primary_key=True), + Column("amount", Numeric(8, 2), nullable=False, + server_default="0"), + CheckConstraint('amount >= 0', name='ck_order_amount') + ) - eq_(diffs[4][0][0], "modify_type") - eq_(diffs[4][0][1], self.test_schema_name) - eq_(diffs[4][0][2], "order") - eq_(diffs[4][0][3], "amount") - eq_(repr(diffs[4][0][5]), "NUMERIC(precision=8, scale=2)") - eq_(repr(diffs[4][0][6]), "Numeric(precision=10, scale=2)") + Table('extra', m, + Column("x", CHAR), + Column('uid', Integer, ForeignKey('user.id')) + ) - eq_(diffs[5][0], 'remove_column') - eq_(diffs[5][3].name, 'pw') + return m - eq_(diffs[6][0][0], "modify_default") - eq_(diffs[6][0][1], self.test_schema_name) - eq_(diffs[6][0][2], "user") - eq_(diffs[6][0][3], "a1") - eq_(diffs[6][0][6].arg, "x") + @classmethod + def _get_model_schema(cls): + schema = cls.schema - eq_(diffs[7][0][0], 'modify_nullable') - eq_(diffs[7][0][5], True) - eq_(diffs[7][0][6], False) + m = MetaData(schema=schema) - def test_render_nothing(self): - context = MigrationContext.configure( - connection=self.bind.connect(), - opts={ - 'compare_type': True, - 'compare_server_default': True, - 'target_metadata': self.m1, - 'upgrade_token': "upgrades", - 'downgrade_token': "downgrades", - 'alembic_module_prefix': 'op.', - 'sqlalchemy_module_prefix': 'sa.', - } + Table('user', m, + Column('id', Integer, primary_key=True), + Column('name', String(50), nullable=False), + Column('a1', Text, server_default="x") ) - template_args = {} - autogenerate._produce_migration_diffs(context, template_args, set(), - include_symbol=lambda name, schema: False - ) - eq_(re.sub(r"u'", "'", template_args['upgrades']), -"""### commands auto generated by Alembic - please adjust! ### - pass - ### end Alembic commands ###""") - eq_(re.sub(r"u'", "'", template_args['downgrades']), -"""### commands auto generated by Alembic - please adjust! ### - pass - ### end Alembic commands ###""") - def test_render_diffs_extras(self): - """test a full render including indentation (include and schema)""" + Table('address', m, + Column('id', Integer, primary_key=True), + Column('email_address', String(100), nullable=False), + Column('street', String(50)), + ) - template_args = {} - autogenerate._produce_migration_diffs( - self.context, template_args, set(), - include_object=_default_include_object, - include_schemas=True - ) + Table('order', m, + Column('order_id', Integer, primary_key=True), + Column('amount', Numeric(10, 2), nullable=True, + server_default="0"), + Column('user_id', Integer, ForeignKey('user.id')), + CheckConstraint('amount > -1', name='ck_order_amount'), + ) - eq_(re.sub(r"u'", "'", template_args['upgrades']), -"""### commands auto generated by Alembic - please adjust! ### - op.create_table('item', - sa.Column('id', sa.Integer(), nullable=False), - sa.Column('description', sa.String(length=100), nullable=True), - sa.Column('order_id', sa.Integer(), nullable=True), - sa.CheckConstraint('len(description) > 5'), - sa.ForeignKeyConstraint(['order_id'], ['%(schema)s.order.order_id'], ), - sa.PrimaryKeyConstraint('id'), - schema='%(schema)s' - ) - op.drop_table('extra', schema='%(schema)s') - op.add_column('address', sa.Column('street', sa.String(length=50), nullable=True), schema='%(schema)s') - op.add_column('order', sa.Column('user_id', sa.Integer(), nullable=True), schema='%(schema)s') - op.alter_column('order', 'amount', - existing_type=sa.NUMERIC(precision=8, scale=2), - type_=sa.Numeric(precision=10, scale=2), - nullable=True, - existing_server_default='0::numeric', - schema='%(schema)s') - op.drop_column('user', 'pw', schema='%(schema)s') - op.alter_column('user', 'a1', - existing_type=sa.TEXT(), - server_default='x', - existing_nullable=True, - schema='%(schema)s') - op.alter_column('user', 'name', - existing_type=sa.VARCHAR(length=50), - nullable=False, - schema='%(schema)s') - ### end Alembic commands ###""" % {"schema": self.test_schema_name}) - eq_(re.sub(r"u'", "'", template_args['downgrades']), -"""### commands auto generated by Alembic - please adjust! ### - op.alter_column('user', 'name', - existing_type=sa.VARCHAR(length=50), - nullable=True, - schema='%(schema)s') - op.alter_column('user', 'a1', - existing_type=sa.TEXT(), - server_default=None, - existing_nullable=True, - schema='%(schema)s') - op.add_column('user', sa.Column('pw', sa.VARCHAR(length=50), nullable=True), schema='%(schema)s') - op.alter_column('order', 'amount', - existing_type=sa.Numeric(precision=10, scale=2), - type_=sa.NUMERIC(precision=8, scale=2), - nullable=False, - existing_server_default='0::numeric', - schema='%(schema)s') - op.drop_column('order', 'user_id', schema='%(schema)s') - op.drop_column('address', 'street', schema='%(schema)s') - op.create_table('extra', - sa.Column('x', sa.CHAR(length=1), autoincrement=False, nullable=True), - sa.Column('uid', sa.INTEGER(), autoincrement=False, nullable=True), - sa.ForeignKeyConstraint(['uid'], ['%(schema)s.user.id'], name='extra_uid_fkey'), - sa.PrimaryKeyConstraint(), - schema='%(schema)s' - ) - op.drop_table('item', schema='%(schema)s') - ### end Alembic commands ###""" % {"schema": self.test_schema_name}) + Table('item', m, + Column('id', Integer, primary_key=True), + Column('description', String(100)), + Column('order_id', Integer, ForeignKey('order.order_id')), + CheckConstraint('len(description) > 5') + ) + return m -class AutogenerateDiffTest(AutogenTest, TestCase): - @classmethod - def _get_db_schema(cls): - return _model_one() - @classmethod - def _get_model_schema(cls): - return _model_two() +class AutogenerateDiffTest(ModelOne, AutogenTest, TestCase): def test_diffs(self): """test generation of diff rules""" @@ -573,7 +325,6 @@ class AutogenerateDiffTest(AutogenTest, TestCase): def test_render_diffs_standard(self): """test a full render including indentation""" - metadata = self.m2 template_args = {} autogenerate._produce_migration_diffs(self.context, template_args, set()) @@ -604,6 +355,7 @@ class AutogenerateDiffTest(AutogenTest, TestCase): existing_type=sa.VARCHAR(length=50), nullable=False) ### end Alembic commands ###""") + eq_(re.sub(r"u'", "'", template_args['downgrades']), """### commands auto generated by Alembic - please adjust! ### op.alter_column('user', 'name', @@ -624,8 +376,7 @@ class AutogenerateDiffTest(AutogenTest, TestCase): op.create_table('extra', sa.Column('x', sa.CHAR(), nullable=True), sa.Column('uid', sa.INTEGER(), nullable=True), - sa.ForeignKeyConstraint(['uid'], ['user.id'], ), - sa.PrimaryKeyConstraint() + sa.ForeignKeyConstraint(['uid'], ['user.id'], ) ) op.drop_table('item') ### end Alembic commands ###""") @@ -750,6 +501,160 @@ class AutogenerateDiffTest(AutogenTest, TestCase): [('remove_table', 'extra'), ('remove_table', 'user')] ) +class AutogenerateDiffTestWSchema(ModelOne, AutogenTest, TestCase): + schema = "test_schema" + + + @classmethod + def _get_bind(cls): + return db_for_dialect('postgresql') + + def test_diffs(self): + """test generation of diff rules""" + + metadata = self.m2 + connection = self.context.bind + diffs = [] + autogenerate._produce_net_changes(connection, metadata, diffs, + self.autogen_context, + object_filters=_default_object_filters, + include_schemas=True + ) + + eq_( + diffs[0], + ('add_table', metadata.tables['%s.item' % self.schema]) + ) + + eq_(diffs[1][0], 'remove_table') + eq_(diffs[1][1].name, "extra") + + eq_(diffs[2][0], "add_column") + eq_(diffs[2][1], self.schema) + eq_(diffs[2][2], "address") + eq_(diffs[2][3], metadata.tables['%s.address' % self.schema].c.street) + + eq_(diffs[3][0], "add_column") + eq_(diffs[3][1], self.schema) + eq_(diffs[3][2], "order") + eq_(diffs[3][3], metadata.tables['%s.order' % self.schema].c.user_id) + + eq_(diffs[4][0][0], "modify_type") + eq_(diffs[4][0][1], self.schema) + eq_(diffs[4][0][2], "order") + eq_(diffs[4][0][3], "amount") + eq_(repr(diffs[4][0][5]), "NUMERIC(precision=8, scale=2)") + eq_(repr(diffs[4][0][6]), "Numeric(precision=10, scale=2)") + + eq_(diffs[5][0], 'remove_column') + eq_(diffs[5][3].name, 'pw') + + eq_(diffs[6][0][0], "modify_default") + eq_(diffs[6][0][1], self.schema) + eq_(diffs[6][0][2], "user") + eq_(diffs[6][0][3], "a1") + eq_(diffs[6][0][6].arg, "x") + + eq_(diffs[7][0][0], 'modify_nullable') + eq_(diffs[7][0][5], True) + eq_(diffs[7][0][6], False) + + def test_render_nothing(self): + context = MigrationContext.configure( + connection=self.bind.connect(), + opts={ + 'compare_type': True, + 'compare_server_default': True, + 'target_metadata': self.m1, + 'upgrade_token': "upgrades", + 'downgrade_token': "downgrades", + 'alembic_module_prefix': 'op.', + 'sqlalchemy_module_prefix': 'sa.', + } + ) + template_args = {} + autogenerate._produce_migration_diffs(context, template_args, set(), + include_symbol=lambda name, schema: False + ) + eq_(re.sub(r"u'", "'", template_args['upgrades']), +"""### commands auto generated by Alembic - please adjust! ### + pass + ### end Alembic commands ###""") + eq_(re.sub(r"u'", "'", template_args['downgrades']), +"""### commands auto generated by Alembic - please adjust! ### + pass + ### end Alembic commands ###""") + + def test_render_diffs_extras(self): + """test a full render including indentation (include and schema)""" + + template_args = {} + autogenerate._produce_migration_diffs( + self.context, template_args, set(), + include_object=_default_include_object, + include_schemas=True + ) + + eq_(re.sub(r"u'", "'", template_args['upgrades']), +"""### commands auto generated by Alembic - please adjust! ### + op.create_table('item', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('description', sa.String(length=100), nullable=True), + sa.Column('order_id', sa.Integer(), nullable=True), + sa.CheckConstraint('len(description) > 5'), + sa.ForeignKeyConstraint(['order_id'], ['%(schema)s.order.order_id'], ), + sa.PrimaryKeyConstraint('id'), + schema='%(schema)s' + ) + op.drop_table('extra', schema='%(schema)s') + op.add_column('address', sa.Column('street', sa.String(length=50), nullable=True), schema='%(schema)s') + op.add_column('order', sa.Column('user_id', sa.Integer(), nullable=True), schema='%(schema)s') + op.alter_column('order', 'amount', + existing_type=sa.NUMERIC(precision=8, scale=2), + type_=sa.Numeric(precision=10, scale=2), + nullable=True, + existing_server_default='0::numeric', + schema='%(schema)s') + op.drop_column('user', 'pw', schema='%(schema)s') + op.alter_column('user', 'a1', + existing_type=sa.TEXT(), + server_default='x', + existing_nullable=True, + schema='%(schema)s') + op.alter_column('user', 'name', + existing_type=sa.VARCHAR(length=50), + nullable=False, + schema='%(schema)s') + ### end Alembic commands ###""" % {"schema": self.schema}) + + eq_(re.sub(r"u'", "'", template_args['downgrades']), +"""### commands auto generated by Alembic - please adjust! ### + op.alter_column('user', 'name', + existing_type=sa.VARCHAR(length=50), + nullable=True, + schema='%(schema)s') + op.alter_column('user', 'a1', + existing_type=sa.TEXT(), + server_default=None, + existing_nullable=True, + schema='%(schema)s') + op.add_column('user', sa.Column('pw', sa.VARCHAR(length=50), nullable=True), schema='%(schema)s') + op.alter_column('order', 'amount', + existing_type=sa.Numeric(precision=10, scale=2), + type_=sa.NUMERIC(precision=8, scale=2), + nullable=False, + existing_server_default='0::numeric', + schema='%(schema)s') + op.drop_column('order', 'user_id', schema='%(schema)s') + op.drop_column('address', 'street', schema='%(schema)s') + op.create_table('extra', + sa.Column('x', sa.CHAR(length=1), autoincrement=False, nullable=True), + sa.Column('uid', sa.INTEGER(), autoincrement=False, nullable=True), + sa.ForeignKeyConstraint(['uid'], ['%(schema)s.user.id'], name='extra_uid_fkey'), + schema='%(schema)s' + ) + op.drop_table('item', schema='%(schema)s') + ### end Alembic commands ###""" % {"schema": self.schema}) class AutogenerateUniqueIndexTest(TestCase): @@ -922,7 +827,8 @@ class AutogenerateUniqueIndexTest(TestCase): Table('nothing_changed_related', m1, Column('id1', Integer), Column('id2', Integer), - ForeignKeyConstraint(['id1', 'id2'], ['nothing_changed.id1', 'nothing_changed.id2']) + ForeignKeyConstraint(['id1', 'id2'], + ['nothing_changed.id1', 'nothing_changed.id2']) ) Table('nothing_changed', m2, @@ -933,7 +839,8 @@ class AutogenerateUniqueIndexTest(TestCase): Table('nothing_changed_related', m2, Column('id1', Integer), Column('id2', Integer), - ForeignKeyConstraint(['id1', 'id2'], ['nothing_changed.id1', 'nothing_changed.id2']) + ForeignKeyConstraint(['id1', 'id2'], + ['nothing_changed.id1', 'nothing_changed.id2']) ) @@ -1123,7 +1030,8 @@ class AutogenerateCustomCompareTypeTest(AutogenTest, TestCase): self.context._user_compare_type = my_compare_type diffs = [] - autogenerate._produce_net_changes(self.context.bind, self.m2, diffs, self.autogen_context) + autogenerate._produce_net_changes(self.context.bind, self.m2, + diffs, self.autogen_context) first_table = self.m2.tables['sometable'] first_column = first_table.columns['id'] @@ -1145,7 +1053,8 @@ class AutogenerateCustomCompareTypeTest(AutogenTest, TestCase): self.context._user_compare_type = my_compare_type diffs = [] - autogenerate._produce_net_changes(self.context.bind, self.m2, diffs, self.autogen_context) + autogenerate._produce_net_changes(self.context.bind, self.m2, + diffs, self.autogen_context) eq_(diffs, []) @@ -1155,7 +1064,8 @@ class AutogenerateCustomCompareTypeTest(AutogenTest, TestCase): self.context._user_compare_type = my_compare_type diffs = [] - autogenerate._produce_net_changes(self.context.bind, self.m2, diffs, self.autogen_context) + autogenerate._produce_net_changes(self.context.bind, self.m2, + diffs, self.autogen_context) eq_(diffs[0][0][0], 'modify_type') eq_(diffs[1][0][0], 'modify_type') @@ -1203,40 +1113,23 @@ class AutogenKeyTest(AutogenTest, TestCase): eq_(diffs[1][0], "add_column") eq_(diffs[1][3].key, "otherkey") -class AutogenerateDiffOrderTest(TestCase): +class AutogenerateDiffOrderTest(AutogenTest, TestCase): @classmethod - @requires_07 - def setup_class(cls): - staging_env() - cls.bind = sqlite_db() - cls.m3 = _model_three() - cls.m3.create_all(cls.bind) - cls.m4 = _model_four() - - cls.empty_context = empty_context = MigrationContext.configure( - connection = cls.bind.connect(), - opts = { - 'compare_type':True, - 'compare_server_default':True, - 'target_metadata':cls.m3, - 'upgrade_token':"upgrades", - 'downgrade_token':"downgrades", - 'alembic_module_prefix':'op.', - 'sqlalchemy_module_prefix':'sa.' - } + def _get_db_schema(cls): + return MetaData() + + @classmethod + def _get_model_schema(cls): + m = MetaData() + Table('parent', m, + Column('id', Integer, primary_key=True) ) - connection = empty_context.bind - cls.autogen_empty_context = { - 'imports': set(), - 'connection': connection, - 'dialect': connection.dialect, - 'context': empty_context - } + Table('child', m, + Column('parent_id', Integer, ForeignKey('parent.id')), + ) - @classmethod - def teardown_class(cls): - clear_staging_env() + return m def test_diffs_order(self): """ @@ -1244,551 +1137,16 @@ class AutogenerateDiffOrderTest(TestCase): before their parent tables """ - metadata = self.m4 - connection = self.empty_context.bind + metadata = self.m2 + connection = self.context.bind diffs = [] autogenerate._produce_net_changes(connection, metadata, diffs, - self.autogen_empty_context) + self.autogen_context + ) eq_(diffs[0][0], 'add_table') eq_(diffs[0][1].name, "parent") eq_(diffs[1][0], 'add_table') eq_(diffs[1][1].name, "child") -class AutogenRenderTest(TestCase): - """test individual directives""" - - @classmethod - @requires_07 - def setup_class(cls): - cls.autogen_context = { - 'opts': { - 'sqlalchemy_module_prefix': 'sa.', - 'alembic_module_prefix': 'op.', - }, - 'dialect': mysql.dialect() - } - cls.pg_autogen_context = { - 'opts': { - 'sqlalchemy_module_prefix': 'sa.', - 'alembic_module_prefix': 'op.', - }, - 'dialect': postgresql.dialect() - } - - - def test_render_add_index(self): - """ - autogenerate.render._add_index - """ - m = MetaData() - t = Table('test', m, - Column('id', Integer, primary_key=True), - Column('active', Boolean()), - Column('code', String(255)), - ) - idx = Index('test_active_code_idx', t.c.active, t.c.code) - eq_ignore_whitespace( - autogenerate.render._add_index(idx, self.autogen_context), - "op.create_index('test_active_code_idx', 'test', " - "['active', 'code'], unique=False)" - ) - - def test_render_add_index_schema(self): - """ - autogenerate.render._add_index using schema - """ - m = MetaData() - t = Table('test', m, - Column('id', Integer, primary_key=True), - Column('active', Boolean()), - Column('code', String(255)), - schema='CamelSchema' - ) - idx = Index('test_active_code_idx', t.c.active, t.c.code) - eq_ignore_whitespace( - autogenerate.render._add_index(idx, self.autogen_context), - "op.create_index('test_active_code_idx', 'CamelSchema.test', " - "['active', 'code'], unique=False, schema='CamelSchema')" - ) - - def test_render_add_index_pg_where(self): - autogen_context = self.pg_autogen_context - - m = MetaData() - t = Table('t', m, - Column('x', String), - Column('y', String) - ) - - idx = Index('foo_idx', t.c.x, t.c.y, - postgresql_where=(t.c.y == 'something')) - - if compat.sqla_08: - eq_ignore_whitespace( - autogenerate.render._add_index(idx, autogen_context), - """op.create_index('foo_idx', 't', ['x', 'y'], unique=False, """ - """postgresql_where=sa.text("t.y = 'something'"))""" - ) - else: - eq_ignore_whitespace( - autogenerate.render._add_index(idx, autogen_context), - """op.create_index('foo_idx', 't', ['x', 'y'], unique=False, """ - """postgresql_where=sa.text('t.y = %(y_1)s'))""" - ) - - # def test_render_add_index_func(self): - # """ - # autogenerate.render._drop_index using func -- TODO: SQLA needs to - # reflect expressions as well as columns - # """ - # m = MetaData() - # t = Table('test', m, - # Column('id', Integer, primary_key=True), - # Column('active', Boolean()), - # Column('code', String(255)), - # ) - # idx = Index('test_active_lower_code_idx', t.c.active, func.lower(t.c.code)) - # eq_ignore_whitespace( - # autogenerate.render._add_index(idx, self.autogen_context), - # "" - # ) - - def test_drop_index(self): - """ - autogenerate.render._drop_index - """ - m = MetaData() - t = Table('test', m, - Column('id', Integer, primary_key=True), - Column('active', Boolean()), - Column('code', String(255)), - ) - idx = Index('test_active_code_idx', t.c.active, t.c.code) - eq_ignore_whitespace( - autogenerate.render._drop_index(idx, self.autogen_context), - "op.drop_index('test_active_code_idx', 'test')" - ) - - def test_add_unique_constraint(self): - """ - autogenerate.render._add_unique_constraint - """ - m = MetaData() - t = Table('test', m, - Column('id', Integer, primary_key=True), - Column('active', Boolean()), - Column('code', String(255)), - ) - uq = UniqueConstraint(t.c.code, name='uq_test_code') - eq_ignore_whitespace( - autogenerate.render._add_unique_constraint(uq, self.autogen_context), - "op.create_unique_constraint('uq_test_code', 'test', ['code'])" - ) - - def test_drop_constraint(self): - """ - autogenerate.render._drop_constraint - """ - m = MetaData() - t = Table('test', m, - Column('id', Integer, primary_key=True), - Column('active', Boolean()), - Column('code', String(255)), - ) - uq = UniqueConstraint(t.c.code, name='uq_test_code') - eq_ignore_whitespace( - autogenerate.render._drop_constraint(uq, self.autogen_context), - "op.drop_constraint('uq_test_code', 'test')" - ) - - def test_render_table_upgrade(self): - m = MetaData() - t = Table('test', m, - Column('id', Integer, primary_key=True), - Column('name', Unicode(255)), - Column("address_id", Integer, ForeignKey("address.id")), - Column("timestamp", DATETIME, server_default="NOW()"), - Column("amount", Numeric(5, 2)), - UniqueConstraint("name", name="uq_name"), - UniqueConstraint("timestamp"), - ) - eq_ignore_whitespace( - autogenerate.render._add_table(t, self.autogen_context), - "op.create_table('test'," - "sa.Column('id', sa.Integer(), nullable=False)," - "sa.Column('name', sa.Unicode(length=255), nullable=True)," - "sa.Column('address_id', sa.Integer(), nullable=True)," - "sa.Column('timestamp', sa.DATETIME(), " - "server_default='NOW()', " - "nullable=True)," - "sa.Column('amount', sa.Numeric(precision=5, scale=2), nullable=True)," - "sa.ForeignKeyConstraint(['address_id'], ['address.id'], )," - "sa.PrimaryKeyConstraint('id')," - "sa.UniqueConstraint('name', name='uq_name')," - "sa.UniqueConstraint('timestamp')" - ")" - ) - - def test_render_table_w_schema(self): - m = MetaData() - t = Table('test', m, - Column('id', Integer, primary_key=True), - Column('q', Integer, ForeignKey('address.id')), - schema='foo' - ) - eq_ignore_whitespace( - autogenerate.render._add_table(t, self.autogen_context), - "op.create_table('test'," - "sa.Column('id', sa.Integer(), nullable=False)," - "sa.Column('q', sa.Integer(), nullable=True)," - "sa.ForeignKeyConstraint(['q'], ['address.id'], )," - "sa.PrimaryKeyConstraint('id')," - "schema='foo'" - ")" - ) - - def test_render_table_w_fk_schema(self): - m = MetaData() - t = Table('test', m, - Column('id', Integer, primary_key=True), - Column('q', Integer, ForeignKey('foo.address.id')), - ) - eq_ignore_whitespace( - autogenerate.render._add_table(t, self.autogen_context), - "op.create_table('test'," - "sa.Column('id', sa.Integer(), nullable=False)," - "sa.Column('q', sa.Integer(), nullable=True)," - "sa.ForeignKeyConstraint(['q'], ['foo.address.id'], )," - "sa.PrimaryKeyConstraint('id')" - ")" - ) - - def test_render_table_w_metadata_schema(self): - m = MetaData(schema="foo") - t = Table('test', m, - Column('id', Integer, primary_key=True), - Column('q', Integer, ForeignKey('address.id')), - ) - eq_ignore_whitespace( - re.sub(r"u'", "'", autogenerate.render._add_table(t, self.autogen_context)), - "op.create_table('test'," - "sa.Column('id', sa.Integer(), nullable=False)," - "sa.Column('q', sa.Integer(), nullable=True)," - "sa.ForeignKeyConstraint(['q'], ['foo.address.id'], )," - "sa.PrimaryKeyConstraint('id')," - "schema='foo'" - ")" - ) - - def test_render_table_w_metadata_schema_override(self): - m = MetaData(schema="foo") - t = Table('test', m, - Column('id', Integer, primary_key=True), - Column('q', Integer, ForeignKey('bar.address.id')), - ) - eq_ignore_whitespace( - autogenerate.render._add_table(t, self.autogen_context), - "op.create_table('test'," - "sa.Column('id', sa.Integer(), nullable=False)," - "sa.Column('q', sa.Integer(), nullable=True)," - "sa.ForeignKeyConstraint(['q'], ['bar.address.id'], )," - "sa.PrimaryKeyConstraint('id')," - "schema='foo'" - ")" - ) - - def test_render_addtl_args(self): - m = MetaData() - t = Table('test', m, - Column('id', Integer, primary_key=True), - Column('q', Integer, ForeignKey('bar.address.id')), - postgresql_arg1="some_arg", mysql_engine="InnoDB" - ) - eq_ignore_whitespace( - autogenerate.render._add_table(t, self.autogen_context), - "op.create_table('test'," - "sa.Column('id', sa.Integer(), nullable=False)," - "sa.Column('q', sa.Integer(), nullable=True)," - "sa.ForeignKeyConstraint(['q'], ['bar.address.id'], )," - "sa.PrimaryKeyConstraint('id')," - "mysql_engine='InnoDB',postgresql_arg1='some_arg')" - ) - - def test_render_drop_table(self): - eq_( - autogenerate.render._drop_table(Table("sometable", MetaData()), - self.autogen_context), - "op.drop_table('sometable')" - ) - - def test_render_drop_table_w_schema(self): - eq_( - autogenerate.render._drop_table( - Table("sometable", MetaData(), schema='foo'), - self.autogen_context), - "op.drop_table('sometable', schema='foo')" - ) - - def test_render_add_column(self): - eq_( - autogenerate.render._add_column( - None, "foo", Column("x", Integer, server_default="5"), - self.autogen_context), - "op.add_column('foo', sa.Column('x', sa.Integer(), " - "server_default='5', nullable=True))" - ) - - def test_render_add_column_w_schema(self): - eq_( - autogenerate.render._add_column( - "foo", "bar", Column("x", Integer, server_default="5"), - self.autogen_context), - "op.add_column('bar', sa.Column('x', sa.Integer(), " - "server_default='5', nullable=True), schema='foo')" - ) - - def test_render_drop_column(self): - eq_( - autogenerate.render._drop_column( - None, "foo", Column("x", Integer, server_default="5"), - self.autogen_context), - - "op.drop_column('foo', 'x')" - ) - - def test_render_drop_column_w_schema(self): - eq_( - autogenerate.render._drop_column( - "foo", "bar", Column("x", Integer, server_default="5"), - self.autogen_context), - - "op.drop_column('bar', 'x', schema='foo')" - ) - - def test_render_quoted_server_default(self): - eq_( - autogenerate.render._render_server_default( - "nextval('group_to_perm_group_to_perm_id_seq'::regclass)", - self.autogen_context), - '"nextval(\'group_to_perm_group_to_perm_id_seq\'::regclass)"' - ) - - def test_render_col_with_server_default(self): - c = Column('updated_at', TIMESTAMP(), - server_default='TIMEZONE("utc", CURRENT_TIMESTAMP)', - nullable=False) - result = autogenerate.render._render_column( - c, self.autogen_context - ) - eq_( - result, - 'sa.Column(\'updated_at\', sa.TIMESTAMP(), ' - 'server_default=\'TIMEZONE("utc", CURRENT_TIMESTAMP)\', ' - 'nullable=False)' - ) - - def test_render_col_autoinc_false_mysql(self): - c = Column('some_key', Integer, primary_key=True, autoincrement=False) - Table('some_table', MetaData(), c) - result = autogenerate.render._render_column( - c, self.autogen_context - ) - eq_( - result, - 'sa.Column(\'some_key\', sa.Integer(), ' - 'autoincrement=False, ' - 'nullable=False)' - ) - - def test_render_custom(self): - - def render(type_, obj, context): - if type_ == "foreign_key": - return None - if type_ == "column": - if obj.name == "y": - return None - else: - return "col(%s)" % obj.name - return "render:%s" % type_ - - autogen_context = {"opts": { - 'render_item': render, - 'alembic_module_prefix': 'sa.' - }} - - t = Table('t', MetaData(), - Column('x', Integer), - Column('y', Integer), - PrimaryKeyConstraint('x'), - ForeignKeyConstraint(['x'], ['y']) - ) - result = autogenerate.render._add_table( - t, autogen_context - ) - eq_( - result, """sa.create_table('t', -col(x), -render:primary_key\n)""" - ) - - def test_render_modify_type(self): - eq_ignore_whitespace( - autogenerate.render._modify_col( - "sometable", "somecolumn", - self.autogen_context, - type_=CHAR(10), existing_type=CHAR(20)), - "op.alter_column('sometable', 'somecolumn', " - "existing_type=sa.CHAR(length=20), type_=sa.CHAR(length=10))" - ) - - def test_render_modify_type_w_schema(self): - eq_ignore_whitespace( - autogenerate.render._modify_col( - "sometable", "somecolumn", - self.autogen_context, - type_=CHAR(10), existing_type=CHAR(20), - schema='foo'), - "op.alter_column('sometable', 'somecolumn', " - "existing_type=sa.CHAR(length=20), type_=sa.CHAR(length=10), " - "schema='foo')" - ) - - def test_render_modify_nullable(self): - eq_ignore_whitespace( - autogenerate.render._modify_col( - "sometable", "somecolumn", - self.autogen_context, - existing_type=Integer(), - nullable=True), - "op.alter_column('sometable', 'somecolumn', " - "existing_type=sa.Integer(), nullable=True)" - ) - - def test_render_modify_nullable_w_schema(self): - eq_ignore_whitespace( - autogenerate.render._modify_col( - "sometable", "somecolumn", - self.autogen_context, - existing_type=Integer(), - nullable=True, schema='foo'), - "op.alter_column('sometable', 'somecolumn', " - "existing_type=sa.Integer(), nullable=True, schema='foo')" - ) - - def test_render_fk_constraint_kwarg(self): - m = MetaData() - t1 = Table('t', m, Column('c', Integer)) - t2 = Table('t2', m, Column('c_rem', Integer)) - - fk = ForeignKeyConstraint([t1.c.c], [t2.c.c_rem], onupdate="CASCADE") - if not util.sqla_08: - t1.append_constraint(fk) - - eq_ignore_whitespace( - re.sub(r"u'", "'", autogenerate.render._render_constraint(fk, self.autogen_context)), - "sa.ForeignKeyConstraint(['c'], ['t2.c_rem'], onupdate='CASCADE')" - ) - - fk = ForeignKeyConstraint([t1.c.c], [t2.c.c_rem], ondelete="CASCADE") - if not util.sqla_08: - t1.append_constraint(fk) - - eq_ignore_whitespace( - re.sub(r"u'", "'", autogenerate.render._render_constraint(fk, self.autogen_context)), - "sa.ForeignKeyConstraint(['c'], ['t2.c_rem'], ondelete='CASCADE')" - ) - - fk = ForeignKeyConstraint([t1.c.c], [t2.c.c_rem], deferrable=True) - if not util.sqla_08: - t1.append_constraint(fk) - eq_ignore_whitespace( - re.sub(r"u'", "'", autogenerate.render._render_constraint(fk, self.autogen_context)), - "sa.ForeignKeyConstraint(['c'], ['t2.c_rem'], deferrable=True)" - ) - - fk = ForeignKeyConstraint([t1.c.c], [t2.c.c_rem], initially="XYZ") - if not util.sqla_08: - t1.append_constraint(fk) - eq_ignore_whitespace( - re.sub(r"u'", "'", autogenerate.render._render_constraint(fk, self.autogen_context)), - "sa.ForeignKeyConstraint(['c'], ['t2.c_rem'], initially='XYZ')" - ) - - def test_render_fk_constraint_use_alter(self): - m = MetaData() - Table('t', m, Column('c', Integer)) - t2 = Table('t2', m, Column('c_rem', Integer, - ForeignKey('t.c', name="fk1", use_alter=True))) - const = list(t2.foreign_keys)[0].constraint - - eq_ignore_whitespace( - autogenerate.render._render_constraint(const, self.autogen_context), - "sa.ForeignKeyConstraint(['c_rem'], ['t.c'], " - "name='fk1', use_alter=True)" - ) - - def test_render_check_constraint_literal(self): - eq_ignore_whitespace( - autogenerate.render._render_check_constraint( - CheckConstraint("im a constraint", name='cc1'), - self.autogen_context - ), - "sa.CheckConstraint('im a constraint', name='cc1')" - ) - - def test_render_check_constraint_sqlexpr(self): - c = column('c') - five = literal_column('5') - ten = literal_column('10') - eq_ignore_whitespace( - autogenerate.render._render_check_constraint( - CheckConstraint(and_(c > five, c < ten)), - self.autogen_context - ), - "sa.CheckConstraint('c > 5 AND c < 10')" - ) - - def test_render_unique_constraint_opts(self): - m = MetaData() - t = Table('t', m, Column('c', Integer)) - eq_ignore_whitespace( - autogenerate.render._render_unique_constraint( - UniqueConstraint(t.c.c, name='uq_1', deferrable='XYZ'), - self.autogen_context - ), - "sa.UniqueConstraint('c', deferrable='XYZ', name='uq_1')" - ) - - def test_render_modify_nullable_w_default(self): - eq_ignore_whitespace( - autogenerate.render._modify_col( - "sometable", "somecolumn", - self.autogen_context, - existing_type=Integer(), - existing_server_default="5", - nullable=True), - "op.alter_column('sometable', 'somecolumn', " - "existing_type=sa.Integer(), nullable=True, " - "existing_server_default='5')" - ) - - def test_render_enum(self): - eq_ignore_whitespace( - autogenerate.render._repr_type( - "sa.", - Enum("one", "two", "three", name="myenum"), - self.autogen_context), - "sa.Enum('one', 'two', 'three', name='myenum')" - ) - eq_ignore_whitespace( - autogenerate.render._repr_type( - "sa.", - Enum("one", "two", "three"), - self.autogen_context), - "sa.Enum('one', 'two', 'three')" - ) - -# TODO: tests for dialect-specific type rendering + imports diff --git a/tests/test_postgresql.py b/tests/test_postgresql.py index cfbcb56..bd1c0b4 100644 --- a/tests/test_postgresql.py +++ b/tests/test_postgresql.py @@ -10,10 +10,9 @@ from alembic.migration import MigrationContext from alembic.script import ScriptDirectory from . import db_for_dialect, eq_, staging_env, \ clear_staging_env, _no_sql_testing_config,\ - capture_context_buffer, requires_07, write_script + capture_context_buffer, requires_09, write_script class PGOfflineEnumTest(TestCase): - @requires_07 def setUp(self): staging_env() self.cfg = cfg = _no_sql_testing_config() @@ -67,6 +66,7 @@ def downgrade(): """ % self.rid) + @requires_09 def test_offline_inline_enum_create(self): self._inline_enum_script() with capture_context_buffer() as buf: @@ -82,6 +82,7 @@ def downgrade(): # no drop since we didn't emit events assert "DROP TYPE pgenum" not in buf.getvalue() + @requires_09 def test_offline_distinct_enum_create(self): self._distinct_enum_script() with capture_context_buffer() as buf: |