diff options
author | Bob Farrell <robertanthonyfarrell@gmail.com> | 2010-06-10 18:08:06 +0100 |
---|---|---|
committer | Bob Farrell <robertanthonyfarrell@gmail.com> | 2010-06-10 18:08:06 +0100 |
commit | fd8d3136832601a1835ff4e2eb1d2969ef9af165 (patch) | |
tree | 687051e151be86db885054977a2a99c0706bf95b /migrate/tests/changeset/test_changeset.py | |
parent | 330b0ad2ec9230d7ae39ed6198fd27a53c04e3f2 (diff) | |
download | sqlalchemy-migrate-fd8d3136832601a1835ff4e2eb1d2969ef9af165.tar.gz |
move tests/ directory into migrate/tests (much better form) and fix all import lines and other minor issues
Diffstat (limited to 'migrate/tests/changeset/test_changeset.py')
-rw-r--r-- | migrate/tests/changeset/test_changeset.py | 774 |
1 files changed, 774 insertions, 0 deletions
diff --git a/migrate/tests/changeset/test_changeset.py b/migrate/tests/changeset/test_changeset.py new file mode 100644 index 0000000..beeb509 --- /dev/null +++ b/migrate/tests/changeset/test_changeset.py @@ -0,0 +1,774 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +import sqlalchemy +from sqlalchemy import * + +from migrate import changeset +from migrate.changeset import * +from migrate.changeset.schema import ColumnDelta +from migrate.tests import fixture + + +class TestAddDropColumn(fixture.DB): + """Test add/drop column through all possible interfaces + also test for constraints + """ + level = fixture.DB.CONNECT + table_name = 'tmp_adddropcol' + table_int = 0 + + def _setup(self, url): + super(TestAddDropColumn, self)._setup(url) + self.meta = MetaData() + self.table = Table(self.table_name, self.meta, + Column('id', Integer, unique=True), + ) + self.meta.bind = self.engine + if self.engine.has_table(self.table.name): + self.table.drop() + self.table.create() + + def _teardown(self): + if self.engine.has_table(self.table.name): + self.table.drop() + self.meta.clear() + super(TestAddDropColumn,self)._teardown() + + def run_(self, create_column_func, drop_column_func, *col_p, **col_k): + col_name = 'data' + + def assert_numcols(num_of_expected_cols): + # number of cols should be correct in table object and in database + self.refresh_table(self.table_name) + result = len(self.table.c) + + self.assertEquals(result, num_of_expected_cols), + if col_k.get('primary_key', None): + # new primary key: check its length too + result = len(self.table.primary_key) + self.assertEquals(result, num_of_expected_cols) + + # we have 1 columns and there is no data column + assert_numcols(1) + self.assertTrue(getattr(self.table.c, 'data', None) is None) + if len(col_p) == 0: + col_p = [String(40)] + col = Column(col_name, *col_p, **col_k) + create_column_func(col) + assert_numcols(2) + # data column exists + self.assert_(self.table.c.data.type.length, 40) + + col2 = self.table.c.data + drop_column_func(col2) + assert_numcols(1) + + @fixture.usedb() + def test_undefined(self): + """Add/drop columns not yet defined in the table""" + def add_func(col): + return create_column(col, self.table) + def drop_func(col): + return drop_column(col, self.table) + return self.run_(add_func, drop_func) + + @fixture.usedb() + def test_defined(self): + """Add/drop columns already defined in the table""" + def add_func(col): + self.meta.clear() + self.table = Table(self.table_name, self.meta, + Column('id', Integer, primary_key=True), + col, + ) + return create_column(col) + def drop_func(col): + return drop_column(col) + return self.run_(add_func, drop_func) + + @fixture.usedb() + def test_method_bound(self): + """Add/drop columns via column methods; columns bound to a table + ie. no table parameter passed to function + """ + def add_func(col): + self.assert_(col.table is None, col.table) + self.table.append_column(col) + return col.create() + def drop_func(col): + #self.assert_(col.table is None,col.table) + #self.table.append_column(col) + return col.drop() + return self.run_(add_func, drop_func) + + @fixture.usedb() + def test_method_notbound(self): + """Add/drop columns via column methods; columns not bound to a table""" + def add_func(col): + return col.create(self.table) + def drop_func(col): + return col.drop(self.table) + return self.run_(add_func, drop_func) + + @fixture.usedb() + def test_tablemethod_obj(self): + """Add/drop columns via table methods; by column object""" + def add_func(col): + return self.table.create_column(col) + def drop_func(col): + return self.table.drop_column(col) + return self.run_(add_func, drop_func) + + @fixture.usedb() + def test_tablemethod_name(self): + """Add/drop columns via table methods; by column name""" + def add_func(col): + # must be bound to table + self.table.append_column(col) + return self.table.create_column(col.name) + def drop_func(col): + # Not necessarily bound to table + return self.table.drop_column(col.name) + return self.run_(add_func, drop_func) + + @fixture.usedb() + def test_byname(self): + """Add/drop columns via functions; by table object and column name""" + def add_func(col): + self.table.append_column(col) + return create_column(col.name, self.table) + def drop_func(col): + return drop_column(col.name, self.table) + return self.run_(add_func, drop_func) + + @fixture.usedb() + def test_drop_column_not_in_table(self): + """Drop column by name""" + def add_func(col): + return self.table.create_column(col) + def drop_func(col): + self.table.c.remove(col) + return self.table.drop_column(col.name) + self.run_(add_func, drop_func) + + @fixture.usedb() + def test_fk(self): + """Can create columns with foreign keys""" + # create FK's target + reftable = Table('tmp_ref', self.meta, + Column('id', Integer, primary_key=True), + ) + if self.engine.has_table(reftable.name): + reftable.drop() + reftable.create() + + # create column with fk + col = Column('data', Integer, ForeignKey(reftable.c.id)) + if self.url.startswith('sqlite'): + self.assertRaises(changeset.exceptions.NotSupportedError, + col.create, self.table) + else: + col.create(self.table) + + # check if constraint is added + for cons in self.table.constraints: + if isinstance(cons, sqlalchemy.schema.ForeignKeyConstraint): + break + else: + self.fail('No constraint found') + + # TODO: test on db level if constraints work + + self.assertEqual(reftable.c.id.name, col.foreign_keys[0].column.name) + col.drop(self.table) + + if self.engine.has_table(reftable.name): + reftable.drop() + + @fixture.usedb(not_supported='sqlite') + def test_pk(self): + """Can create columns with primary key""" + col = Column('data', Integer, nullable=False) + self.assertRaises(changeset.exceptions.InvalidConstraintError, + col.create, self.table, primary_key_name=True) + col.create(self.table, primary_key_name='data_pkey') + + # check if constraint was added (cannot test on objects) + self.table.insert(values={'data': 4}).execute() + try: + self.table.insert(values={'data': 4}).execute() + except (sqlalchemy.exc.IntegrityError, + sqlalchemy.exc.ProgrammingError): + pass + else: + self.fail() + + col.drop() + + @fixture.usedb(not_supported='mysql') + def test_check(self): + """Can create columns with check constraint""" + col = Column('data', + Integer, + sqlalchemy.schema.CheckConstraint('data > 4')) + col.create(self.table) + + # check if constraint was added (cannot test on objects) + self.table.insert(values={'data': 5}).execute() + try: + self.table.insert(values={'data': 3}).execute() + except (sqlalchemy.exc.IntegrityError, + sqlalchemy.exc.ProgrammingError): + pass + else: + self.fail() + + col.drop() + + @fixture.usedb(not_supported='sqlite') + def test_unique(self): + """Can create columns with unique constraint""" + self.assertRaises(changeset.exceptions.InvalidConstraintError, + Column('data', Integer, unique=True).create, self.table) + col = Column('data', Integer) + col.create(self.table, unique_name='data_unique') + + # check if constraint was added (cannot test on objects) + self.table.insert(values={'data': 5}).execute() + try: + self.table.insert(values={'data': 5}).execute() + except (sqlalchemy.exc.IntegrityError, + sqlalchemy.exc.ProgrammingError): + pass + else: + self.fail() + + col.drop(self.table) + +# TODO: remove already attached columns with indexes, uniques, pks, fks .. + @fixture.usedb() + def test_index(self): + """Can create columns with indexes""" + self.assertRaises(changeset.exceptions.InvalidConstraintError, + Column('data', Integer).create, self.table, index_name=True) + col = Column('data', Integer) + col.create(self.table, index_name='ix_data') + + # check if index was added + self.table.insert(values={'data': 5}).execute() + try: + self.table.insert(values={'data': 5}).execute() + except (sqlalchemy.exc.IntegrityError, + sqlalchemy.exc.ProgrammingError): + pass + else: + self.fail() + + Index('ix_data', col).drop(bind=self.engine) + col.drop() + + @fixture.usedb() + def test_server_defaults(self): + """Can create columns with server_default values""" + col = Column('data', String(244), server_default='foobar') + col.create(self.table) + + self.table.insert(values={'id': 10}).execute() + row = self._select_row() + self.assertEqual(u'foobar', row['data']) + + col.drop() + + @fixture.usedb() + def test_populate_default(self): + """Test populate_default=True""" + def default(): + return 'foobar' + col = Column('data', String(244), default=default) + col.create(self.table, populate_default=True) + + self.table.insert(values={'id': 10}).execute() + row = self._select_row() + self.assertEqual(u'foobar', row['data']) + + col.drop() + + # TODO: test sequence + # TODO: test quoting + # TODO: test non-autoname constraints + + +class TestRename(fixture.DB): + """Tests for table and index rename methods""" + level = fixture.DB.CONNECT + meta = MetaData() + + def _setup(self, url): + super(TestRename, self)._setup(url) + self.meta.bind = self.engine + + @fixture.usedb(not_supported='firebird') + def test_rename_table(self): + """Tables can be renamed""" + c_name = 'col_1' + table_name1 = 'name_one' + table_name2 = 'name_two' + index_name1 = 'x' + table_name1 + index_name2 = 'x' + table_name2 + + self.meta.clear() + self.column = Column(c_name, Integer) + self.table = Table(table_name1, self.meta, self.column) + self.index = Index(index_name1, self.column, unique=False) + + if self.engine.has_table(self.table.name): + self.table.drop() + if self.engine.has_table(table_name2): + tmp = Table(table_name2, self.meta, autoload=True) + tmp.drop() + tmp.deregister() + del tmp + self.table.create() + + def assert_table_name(expected, skip_object_check=False): + """Refresh a table via autoload + SA has changed some since this test was written; we now need to do + meta.clear() upon reloading a table - clear all rather than a + select few. So, this works only if we're working with one table at + a time (else, others will vanish too). + """ + if not skip_object_check: + # Table object check + self.assertEquals(self.table.name,expected) + newname = self.table.name + else: + # we know the object's name isn't consistent: just assign it + newname = expected + # Table DB check + self.meta.clear() + self.table = Table(newname, self.meta, autoload=True) + self.assertEquals(self.table.name, expected) + + def assert_index_name(expected, skip_object_check=False): + if not skip_object_check: + # Index object check + self.assertEquals(self.index.name, expected) + else: + # object is inconsistent + self.index.name = expected + # TODO: Index DB check + + try: + # Table renames + assert_table_name(table_name1) + rename_table(self.table, table_name2) + assert_table_name(table_name2) + self.table.rename(table_name1) + assert_table_name(table_name1) + + # test by just the string + rename_table(table_name1, table_name2, engine=self.engine) + assert_table_name(table_name2, True) # object not updated + + # Index renames + if self.url.startswith('sqlite') or self.url.startswith('mysql'): + self.assertRaises(changeset.exceptions.NotSupportedError, + self.index.rename, index_name2) + else: + assert_index_name(index_name1) + rename_index(self.index, index_name2, engine=self.engine) + assert_index_name(index_name2) + self.index.rename(index_name1) + assert_index_name(index_name1) + + # test by just the string + rename_index(index_name1, index_name2, engine=self.engine) + assert_index_name(index_name2, True) + + finally: + if self.table.exists(): + self.table.drop() + + +class TestColumnChange(fixture.DB): + level = fixture.DB.CONNECT + table_name = 'tmp_colchange' + + def _setup(self, url): + super(TestColumnChange, self)._setup(url) + self.meta = MetaData(self.engine) + self.table = Table(self.table_name, self.meta, + Column('id', Integer, primary_key=True), + Column('data', String(40), server_default=DefaultClause("tluafed"), + nullable=True), + ) + if self.table.exists(): + self.table.drop() + try: + self.table.create() + except sqlalchemy.exceptions.SQLError, e: + # SQLite: database schema has changed + if not self.url.startswith('sqlite://'): + raise + + def _teardown(self): + if self.table.exists(): + try: + self.table.drop(self.engine) + except sqlalchemy.exceptions.SQLError,e: + # SQLite: database schema has changed + if not self.url.startswith('sqlite://'): + raise + super(TestColumnChange, self)._teardown() + + @fixture.usedb() + def test_rename(self): + """Can rename a column""" + def num_rows(col, content): + return len(list(self.table.select(col == content).execute())) + # Table content should be preserved in changed columns + content = "fgsfds" + self.engine.execute(self.table.insert(), data=content, id=42) + self.assertEquals(num_rows(self.table.c.data, content), 1) + + # ...as a function, given a column object and the new name + alter_column('data', name='data2', table=self.table) + self.refresh_table() + alter_column(self.table.c.data2, name='atad') + self.refresh_table(self.table.name) + self.assert_('data' not in self.table.c.keys()) + self.assert_('atad' in self.table.c.keys()) + self.assertEquals(num_rows(self.table.c.atad, content), 1) + + # ...as a method, given a new name + self.table.c.atad.alter(name='data') + self.refresh_table(self.table.name) + self.assert_('atad' not in self.table.c.keys()) + self.table.c.data # Should not raise exception + self.assertEquals(num_rows(self.table.c.data, content), 1) + + # ...as a function, given a new object + col = Column('atad', String(40), server_default=self.table.c.data.server_default) + alter_column(self.table.c.data, col) + self.refresh_table(self.table.name) + self.assert_('data' not in self.table.c.keys()) + self.table.c.atad # Should not raise exception + self.assertEquals(num_rows(self.table.c.atad, content), 1) + + # ...as a method, given a new object + col = Column('data', String(40), server_default=self.table.c.atad.server_default) + self.table.c.atad.alter(col) + self.refresh_table(self.table.name) + self.assert_('atad' not in self.table.c.keys()) + self.table.c.data # Should not raise exception + self.assertEquals(num_rows(self.table.c.data,content), 1) + + @fixture.usedb() + def test_type(self): + """Can change a column's type""" + # Entire column definition given + self.table.c.data.alter(Column('data', String(42))) + self.refresh_table(self.table.name) + self.assert_(isinstance(self.table.c.data.type, String)) + self.assertEquals(self.table.c.data.type.length, 42) + + # Just the new type + self.table.c.data.alter(type=String(43)) + self.refresh_table(self.table.name) + self.assert_(isinstance(self.table.c.data.type, String)) + self.assertEquals(self.table.c.data.type.length, 43) + + # Different type + self.assert_(isinstance(self.table.c.id.type, Integer)) + self.assertEquals(self.table.c.id.nullable, False) + + if not self.engine.name == 'firebird': + self.table.c.id.alter(type=String(20)) + self.assertEquals(self.table.c.id.nullable, False) + self.refresh_table(self.table.name) + self.assert_(isinstance(self.table.c.id.type, String)) + + @fixture.usedb() + def test_default(self): + """Can change a column's server_default value (DefaultClauses only) + Only DefaultClauses are changed here: others are managed by the + application / by SA + """ + self.assertEquals(self.table.c.data.server_default.arg, 'tluafed') + + # Just the new default + default = 'my_default' + self.table.c.data.alter(server_default=DefaultClause(default)) + self.refresh_table(self.table.name) + #self.assertEquals(self.table.c.data.server_default.arg,default) + # TextClause returned by autoload + self.assert_(default in str(self.table.c.data.server_default.arg)) + self.engine.execute(self.table.insert(), id=12) + row = self._select_row() + self.assertEqual(row['data'], default) + + # Column object + default = 'your_default' + self.table.c.data.alter(Column('data', String(40), server_default=DefaultClause(default))) + self.refresh_table(self.table.name) + self.assert_(default in str(self.table.c.data.server_default.arg)) + + # Drop/remove default + self.table.c.data.alter(server_default=None) + self.assertEqual(self.table.c.data.server_default, None) + + self.refresh_table(self.table.name) + # server_default isn't necessarily None for Oracle + #self.assert_(self.table.c.data.server_default is None,self.table.c.data.server_default) + self.engine.execute(self.table.insert(), id=11) + if SQLA_06: + row = self.table.select(self.table.c.id == 11).execution_options(autocommit=True).execute().fetchone() + else: + row = self.table.select(self.table.c.id == 11, autocommit=True).execute().fetchone() + self.assert_(row['data'] is None, row['data']) + + @fixture.usedb(not_supported='firebird') + def test_null(self): + """Can change a column's null constraint""" + self.assertEquals(self.table.c.data.nullable, True) + + # Column object + self.table.c.data.alter(Column('data', String(40), nullable=False)) + self.table.nullable = None + self.refresh_table(self.table.name) + self.assertEquals(self.table.c.data.nullable, False) + + # Just the new status + self.table.c.data.alter(nullable=True) + self.refresh_table(self.table.name) + self.assertEquals(self.table.c.data.nullable, True) + + @fixture.usedb() + def test_alter_metadata(self): + """Test if alter_metadata is respected""" + + self.table.c.data.alter(Column('data', String(100))) + + self.assert_(isinstance(self.table.c.data.type, String)) + self.assertEqual(self.table.c.data.type.length, 100) + + # nothing should change + self.table.c.data.alter(Column('data', String(200)), alter_metadata=False) + self.assert_(isinstance(self.table.c.data.type, String)) + self.assertEqual(self.table.c.data.type.length, 100) + + @fixture.usedb() + def test_alter_returns_delta(self): + """Test if alter constructs return delta""" + + delta = self.table.c.data.alter(Column('data', String(100))) + self.assert_('type' in delta) + + @fixture.usedb() + def test_alter_all(self): + """Tests all alter changes at one time""" + # test for each db separately + # since currently some dont support everything + + # test pre settings + self.assertEqual(self.table.c.data.nullable, True) + self.assertEqual(self.table.c.data.server_default.arg, 'tluafed') + self.assertEqual(self.table.c.data.name, 'data') + self.assertTrue(isinstance(self.table.c.data.type, String)) + self.assertTrue(self.table.c.data.type.length, 40) + + kw = dict(nullable=False, + server_default='foobar', + name='data_new', + type=String(50), + alter_metadata=True) + if self.engine.name == 'firebird': + del kw['nullable'] + self.table.c.data.alter(**kw) + + # test altered objects + self.assertEqual(self.table.c.data.server_default.arg, 'foobar') + if not self.engine.name == 'firebird': + self.assertEqual(self.table.c.data.nullable, False) + self.assertEqual(self.table.c.data.name, 'data_new') + self.assertEqual(self.table.c.data.type.length, 50) + + self.refresh_table(self.table.name) + + # test post settings + if not self.engine.name == 'firebird': + self.assertEqual(self.table.c.data_new.nullable, False) + self.assertEqual(self.table.c.data_new.name, 'data_new') + self.assertTrue(isinstance(self.table.c.data_new.type, String)) + self.assertTrue(self.table.c.data_new.type.length, 50) + + # insert data and assert default + self.table.insert(values={'id': 10}).execute() + row = self._select_row() + self.assertEqual(u'foobar', row['data_new']) + + +class TestColumnDelta(fixture.DB): + """Tests ColumnDelta class""" + + level = fixture.DB.CONNECT + table_name = 'tmp_coldelta' + table_int = 0 + + def _setup(self, url): + super(TestColumnDelta, self)._setup(url) + self.meta = MetaData() + self.table = Table(self.table_name, self.meta, + Column('ids', String(10)), + ) + self.meta.bind = self.engine + if self.engine.has_table(self.table.name): + self.table.drop() + self.table.create() + + def _teardown(self): + if self.engine.has_table(self.table.name): + self.table.drop() + self.meta.clear() + super(TestColumnDelta,self)._teardown() + + def mkcol(self, name='id', type=String, *p, **k): + return Column(name, type, *p, **k) + + def verify(self, expected, original, *p, **k): + self.delta = ColumnDelta(original, *p, **k) + result = self.delta.keys() + result.sort() + self.assertEquals(expected, result) + return self.delta + + def test_deltas_two_columns(self): + """Testing ColumnDelta with two columns""" + col_orig = self.mkcol(primary_key=True) + col_new = self.mkcol(name='ids', primary_key=True) + self.verify([], col_orig, col_orig) + self.verify(['name'], col_orig, col_orig, 'ids') + self.verify(['name'], col_orig, col_orig, name='ids') + self.verify(['name'], col_orig, col_new) + self.verify(['name', 'type'], col_orig, col_new, type=String) + + # Type comparisons + self.verify([], self.mkcol(type=String), self.mkcol(type=String)) + self.verify(['type'], self.mkcol(type=String), self.mkcol(type=Integer)) + self.verify(['type'], self.mkcol(type=String), self.mkcol(type=String(42))) + self.verify([], self.mkcol(type=String(42)), self.mkcol(type=String(42))) + self.verify(['type'], self.mkcol(type=String(24)), self.mkcol(type=String(42))) + self.verify(['type'], self.mkcol(type=String(24)), self.mkcol(type=Text(24))) + + # Other comparisons + self.verify(['primary_key'], self.mkcol(nullable=False), self.mkcol(primary_key=True)) + + # PK implies nullable=False + self.verify(['nullable', 'primary_key'], self.mkcol(nullable=True), self.mkcol(primary_key=True)) + self.verify([], self.mkcol(primary_key=True), self.mkcol(primary_key=True)) + self.verify(['nullable'], self.mkcol(nullable=True), self.mkcol(nullable=False)) + self.verify([], self.mkcol(nullable=True), self.mkcol(nullable=True)) + self.verify([], self.mkcol(server_default=None), self.mkcol(server_default=None)) + self.verify([], self.mkcol(server_default='42'), self.mkcol(server_default='42')) + + # test server default + delta = self.verify(['server_default'], self.mkcol(), self.mkcol('id', String, DefaultClause('foobar'))) + self.assertEqual(delta['server_default'].arg, 'foobar') + + self.verify([], self.mkcol(server_default='foobar'), self.mkcol('id', String, DefaultClause('foobar'))) + self.verify(['type'], self.mkcol(server_default='foobar'), self.mkcol('id', Text, DefaultClause('foobar'))) + + # test alter_metadata + col = self.mkcol(server_default='foobar') + self.verify(['type'], col, self.mkcol('id', Text, DefaultClause('foobar')), alter_metadata=True) + self.assert_(isinstance(col.type, Text)) + + col = self.mkcol() + self.verify(['name', 'server_default', 'type'], col, self.mkcol('beep', Text, DefaultClause('foobar')), alter_metadata=True) + self.assert_(isinstance(col.type, Text)) + self.assertEqual(col.name, 'beep') + self.assertEqual(col.server_default.arg, 'foobar') + + col = self.mkcol() + self.verify(['name', 'server_default', 'type'], col, self.mkcol('beep', Text, DefaultClause('foobar')), alter_metadata=False) + self.assertFalse(isinstance(col.type, Text)) + self.assertNotEqual(col.name, 'beep') + self.assertFalse(col.server_default) + + @fixture.usedb() + def test_deltas_zero_columns(self): + """Testing ColumnDelta with zero columns""" + + self.verify(['name'], 'ids', table=self.table, name='hey') + + # test reflection + self.verify(['type'], 'ids', table=self.table.name, type=String(80), engine=self.engine) + self.verify(['type'], 'ids', table=self.table.name, type=String(80), metadata=self.meta) + + # check if alter_metadata is respected + self.meta.clear() + delta = self.verify(['type'], 'ids', table=self.table.name, type=String(80), alter_metadata=True, metadata=self.meta) + self.assert_(self.table.name in self.meta) + self.assertEqual(delta.result_column.type.length, 80) + self.assertEqual(self.meta.tables.get(self.table.name).c.ids.type.length, 80) + + self.meta.clear() + self.verify(['type'], 'ids', table=self.table.name, type=String(80), alter_metadata=False, engine=self.engine) + self.assert_(self.table.name not in self.meta) + + self.meta.clear() + self.verify(['type'], 'ids', table=self.table.name, type=String(80), alter_metadata=False, metadata=self.meta) + self.assert_(self.table.name not in self.meta) + + # test defaults + self.meta.clear() + self.verify(['server_default'], 'ids', table=self.table.name, server_default='foobar', alter_metadata=True, metadata=self.meta) + self.meta.tables.get(self.table.name).c.ids.server_default.arg == 'foobar' + + # test missing parameters + self.assertRaises(ValueError, ColumnDelta, table=self.table.name) + self.assertRaises(ValueError, ColumnDelta, 'ids', table=self.table.name, alter_metadata=True) + self.assertRaises(ValueError, ColumnDelta, 'ids', table=self.table.name, alter_metadata=False) + + def test_deltas_one_column(self): + """Testing ColumnDelta with one column""" + col_orig = self.mkcol(primary_key=True) + + self.verify([], col_orig) + self.verify(['name'], col_orig, 'ids') + # Parameters are always executed, even if they're 'unchanged' + # (We can't assume given column is up-to-date) + self.verify(['name', 'primary_key', 'type'], col_orig, 'id', Integer, primary_key=True) + self.verify(['name', 'primary_key', 'type'], col_orig, name='id', type=Integer, primary_key=True) + + # Change name, given an up-to-date definition and the current name + delta = self.verify(['name'], col_orig, name='blah') + self.assertEquals(delta.get('name'), 'blah') + self.assertEquals(delta.current_name, 'id') + + # check if alter_metadata is respected + col_orig = self.mkcol(primary_key=True) + self.verify(['name', 'type'], col_orig, name='id12', type=Text, alter_metadata=True) + self.assert_(isinstance(col_orig.type, Text)) + self.assertEqual(col_orig.name, 'id12') + + col_orig = self.mkcol(primary_key=True) + self.verify(['name', 'type'], col_orig, name='id12', type=Text, alter_metadata=False) + self.assert_(isinstance(col_orig.type, String)) + self.assertEqual(col_orig.name, 'id') + + # test server default + col_orig = self.mkcol(primary_key=True) + delta = self.verify(['server_default'], col_orig, DefaultClause('foobar')) + self.assertEqual(delta['server_default'].arg, 'foobar') + + delta = self.verify(['server_default'], col_orig, server_default=DefaultClause('foobar')) + self.assertEqual(delta['server_default'].arg, 'foobar') + + # no change + col_orig = self.mkcol(server_default=DefaultClause('foobar')) + delta = self.verify(['type'], col_orig, DefaultClause('foobar'), type=PickleType) + self.assert_(isinstance(delta.result_column.type, PickleType)) + + # TODO: test server on update + # TODO: test bind metadata |