summaryrefslogtreecommitdiff
path: root/migrate/tests/changeset/test_changeset.py
diff options
context:
space:
mode:
authorBob Farrell <robertanthonyfarrell@gmail.com>2010-06-10 18:08:06 +0100
committerBob Farrell <robertanthonyfarrell@gmail.com>2010-06-10 18:08:06 +0100
commitfd8d3136832601a1835ff4e2eb1d2969ef9af165 (patch)
tree687051e151be86db885054977a2a99c0706bf95b /migrate/tests/changeset/test_changeset.py
parent330b0ad2ec9230d7ae39ed6198fd27a53c04e3f2 (diff)
downloadsqlalchemy-migrate-fd8d3136832601a1835ff4e2eb1d2969ef9af165.tar.gz
move tests/ directory into migrate/tests (much better form) and fix all import lines and other minor issues
Diffstat (limited to 'migrate/tests/changeset/test_changeset.py')
-rw-r--r--migrate/tests/changeset/test_changeset.py774
1 files changed, 774 insertions, 0 deletions
diff --git a/migrate/tests/changeset/test_changeset.py b/migrate/tests/changeset/test_changeset.py
new file mode 100644
index 0000000..beeb509
--- /dev/null
+++ b/migrate/tests/changeset/test_changeset.py
@@ -0,0 +1,774 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import sqlalchemy
+from sqlalchemy import *
+
+from migrate import changeset
+from migrate.changeset import *
+from migrate.changeset.schema import ColumnDelta
+from migrate.tests import fixture
+
+
+class TestAddDropColumn(fixture.DB):
+ """Test add/drop column through all possible interfaces
+ also test for constraints
+ """
+ level = fixture.DB.CONNECT
+ table_name = 'tmp_adddropcol'
+ table_int = 0
+
+ def _setup(self, url):
+ super(TestAddDropColumn, self)._setup(url)
+ self.meta = MetaData()
+ self.table = Table(self.table_name, self.meta,
+ Column('id', Integer, unique=True),
+ )
+ self.meta.bind = self.engine
+ if self.engine.has_table(self.table.name):
+ self.table.drop()
+ self.table.create()
+
+ def _teardown(self):
+ if self.engine.has_table(self.table.name):
+ self.table.drop()
+ self.meta.clear()
+ super(TestAddDropColumn,self)._teardown()
+
+ def run_(self, create_column_func, drop_column_func, *col_p, **col_k):
+ col_name = 'data'
+
+ def assert_numcols(num_of_expected_cols):
+ # number of cols should be correct in table object and in database
+ self.refresh_table(self.table_name)
+ result = len(self.table.c)
+
+ self.assertEquals(result, num_of_expected_cols),
+ if col_k.get('primary_key', None):
+ # new primary key: check its length too
+ result = len(self.table.primary_key)
+ self.assertEquals(result, num_of_expected_cols)
+
+ # we have 1 columns and there is no data column
+ assert_numcols(1)
+ self.assertTrue(getattr(self.table.c, 'data', None) is None)
+ if len(col_p) == 0:
+ col_p = [String(40)]
+ col = Column(col_name, *col_p, **col_k)
+ create_column_func(col)
+ assert_numcols(2)
+ # data column exists
+ self.assert_(self.table.c.data.type.length, 40)
+
+ col2 = self.table.c.data
+ drop_column_func(col2)
+ assert_numcols(1)
+
+ @fixture.usedb()
+ def test_undefined(self):
+ """Add/drop columns not yet defined in the table"""
+ def add_func(col):
+ return create_column(col, self.table)
+ def drop_func(col):
+ return drop_column(col, self.table)
+ return self.run_(add_func, drop_func)
+
+ @fixture.usedb()
+ def test_defined(self):
+ """Add/drop columns already defined in the table"""
+ def add_func(col):
+ self.meta.clear()
+ self.table = Table(self.table_name, self.meta,
+ Column('id', Integer, primary_key=True),
+ col,
+ )
+ return create_column(col)
+ def drop_func(col):
+ return drop_column(col)
+ return self.run_(add_func, drop_func)
+
+ @fixture.usedb()
+ def test_method_bound(self):
+ """Add/drop columns via column methods; columns bound to a table
+ ie. no table parameter passed to function
+ """
+ def add_func(col):
+ self.assert_(col.table is None, col.table)
+ self.table.append_column(col)
+ return col.create()
+ def drop_func(col):
+ #self.assert_(col.table is None,col.table)
+ #self.table.append_column(col)
+ return col.drop()
+ return self.run_(add_func, drop_func)
+
+ @fixture.usedb()
+ def test_method_notbound(self):
+ """Add/drop columns via column methods; columns not bound to a table"""
+ def add_func(col):
+ return col.create(self.table)
+ def drop_func(col):
+ return col.drop(self.table)
+ return self.run_(add_func, drop_func)
+
+ @fixture.usedb()
+ def test_tablemethod_obj(self):
+ """Add/drop columns via table methods; by column object"""
+ def add_func(col):
+ return self.table.create_column(col)
+ def drop_func(col):
+ return self.table.drop_column(col)
+ return self.run_(add_func, drop_func)
+
+ @fixture.usedb()
+ def test_tablemethod_name(self):
+ """Add/drop columns via table methods; by column name"""
+ def add_func(col):
+ # must be bound to table
+ self.table.append_column(col)
+ return self.table.create_column(col.name)
+ def drop_func(col):
+ # Not necessarily bound to table
+ return self.table.drop_column(col.name)
+ return self.run_(add_func, drop_func)
+
+ @fixture.usedb()
+ def test_byname(self):
+ """Add/drop columns via functions; by table object and column name"""
+ def add_func(col):
+ self.table.append_column(col)
+ return create_column(col.name, self.table)
+ def drop_func(col):
+ return drop_column(col.name, self.table)
+ return self.run_(add_func, drop_func)
+
+ @fixture.usedb()
+ def test_drop_column_not_in_table(self):
+ """Drop column by name"""
+ def add_func(col):
+ return self.table.create_column(col)
+ def drop_func(col):
+ self.table.c.remove(col)
+ return self.table.drop_column(col.name)
+ self.run_(add_func, drop_func)
+
+ @fixture.usedb()
+ def test_fk(self):
+ """Can create columns with foreign keys"""
+ # create FK's target
+ reftable = Table('tmp_ref', self.meta,
+ Column('id', Integer, primary_key=True),
+ )
+ if self.engine.has_table(reftable.name):
+ reftable.drop()
+ reftable.create()
+
+ # create column with fk
+ col = Column('data', Integer, ForeignKey(reftable.c.id))
+ if self.url.startswith('sqlite'):
+ self.assertRaises(changeset.exceptions.NotSupportedError,
+ col.create, self.table)
+ else:
+ col.create(self.table)
+
+ # check if constraint is added
+ for cons in self.table.constraints:
+ if isinstance(cons, sqlalchemy.schema.ForeignKeyConstraint):
+ break
+ else:
+ self.fail('No constraint found')
+
+ # TODO: test on db level if constraints work
+
+ self.assertEqual(reftable.c.id.name, col.foreign_keys[0].column.name)
+ col.drop(self.table)
+
+ if self.engine.has_table(reftable.name):
+ reftable.drop()
+
+ @fixture.usedb(not_supported='sqlite')
+ def test_pk(self):
+ """Can create columns with primary key"""
+ col = Column('data', Integer, nullable=False)
+ self.assertRaises(changeset.exceptions.InvalidConstraintError,
+ col.create, self.table, primary_key_name=True)
+ col.create(self.table, primary_key_name='data_pkey')
+
+ # check if constraint was added (cannot test on objects)
+ self.table.insert(values={'data': 4}).execute()
+ try:
+ self.table.insert(values={'data': 4}).execute()
+ except (sqlalchemy.exc.IntegrityError,
+ sqlalchemy.exc.ProgrammingError):
+ pass
+ else:
+ self.fail()
+
+ col.drop()
+
+ @fixture.usedb(not_supported='mysql')
+ def test_check(self):
+ """Can create columns with check constraint"""
+ col = Column('data',
+ Integer,
+ sqlalchemy.schema.CheckConstraint('data > 4'))
+ col.create(self.table)
+
+ # check if constraint was added (cannot test on objects)
+ self.table.insert(values={'data': 5}).execute()
+ try:
+ self.table.insert(values={'data': 3}).execute()
+ except (sqlalchemy.exc.IntegrityError,
+ sqlalchemy.exc.ProgrammingError):
+ pass
+ else:
+ self.fail()
+
+ col.drop()
+
+ @fixture.usedb(not_supported='sqlite')
+ def test_unique(self):
+ """Can create columns with unique constraint"""
+ self.assertRaises(changeset.exceptions.InvalidConstraintError,
+ Column('data', Integer, unique=True).create, self.table)
+ col = Column('data', Integer)
+ col.create(self.table, unique_name='data_unique')
+
+ # check if constraint was added (cannot test on objects)
+ self.table.insert(values={'data': 5}).execute()
+ try:
+ self.table.insert(values={'data': 5}).execute()
+ except (sqlalchemy.exc.IntegrityError,
+ sqlalchemy.exc.ProgrammingError):
+ pass
+ else:
+ self.fail()
+
+ col.drop(self.table)
+
+# TODO: remove already attached columns with indexes, uniques, pks, fks ..
+ @fixture.usedb()
+ def test_index(self):
+ """Can create columns with indexes"""
+ self.assertRaises(changeset.exceptions.InvalidConstraintError,
+ Column('data', Integer).create, self.table, index_name=True)
+ col = Column('data', Integer)
+ col.create(self.table, index_name='ix_data')
+
+ # check if index was added
+ self.table.insert(values={'data': 5}).execute()
+ try:
+ self.table.insert(values={'data': 5}).execute()
+ except (sqlalchemy.exc.IntegrityError,
+ sqlalchemy.exc.ProgrammingError):
+ pass
+ else:
+ self.fail()
+
+ Index('ix_data', col).drop(bind=self.engine)
+ col.drop()
+
+ @fixture.usedb()
+ def test_server_defaults(self):
+ """Can create columns with server_default values"""
+ col = Column('data', String(244), server_default='foobar')
+ col.create(self.table)
+
+ self.table.insert(values={'id': 10}).execute()
+ row = self._select_row()
+ self.assertEqual(u'foobar', row['data'])
+
+ col.drop()
+
+ @fixture.usedb()
+ def test_populate_default(self):
+ """Test populate_default=True"""
+ def default():
+ return 'foobar'
+ col = Column('data', String(244), default=default)
+ col.create(self.table, populate_default=True)
+
+ self.table.insert(values={'id': 10}).execute()
+ row = self._select_row()
+ self.assertEqual(u'foobar', row['data'])
+
+ col.drop()
+
+ # TODO: test sequence
+ # TODO: test quoting
+ # TODO: test non-autoname constraints
+
+
+class TestRename(fixture.DB):
+ """Tests for table and index rename methods"""
+ level = fixture.DB.CONNECT
+ meta = MetaData()
+
+ def _setup(self, url):
+ super(TestRename, self)._setup(url)
+ self.meta.bind = self.engine
+
+ @fixture.usedb(not_supported='firebird')
+ def test_rename_table(self):
+ """Tables can be renamed"""
+ c_name = 'col_1'
+ table_name1 = 'name_one'
+ table_name2 = 'name_two'
+ index_name1 = 'x' + table_name1
+ index_name2 = 'x' + table_name2
+
+ self.meta.clear()
+ self.column = Column(c_name, Integer)
+ self.table = Table(table_name1, self.meta, self.column)
+ self.index = Index(index_name1, self.column, unique=False)
+
+ if self.engine.has_table(self.table.name):
+ self.table.drop()
+ if self.engine.has_table(table_name2):
+ tmp = Table(table_name2, self.meta, autoload=True)
+ tmp.drop()
+ tmp.deregister()
+ del tmp
+ self.table.create()
+
+ def assert_table_name(expected, skip_object_check=False):
+ """Refresh a table via autoload
+ SA has changed some since this test was written; we now need to do
+ meta.clear() upon reloading a table - clear all rather than a
+ select few. So, this works only if we're working with one table at
+ a time (else, others will vanish too).
+ """
+ if not skip_object_check:
+ # Table object check
+ self.assertEquals(self.table.name,expected)
+ newname = self.table.name
+ else:
+ # we know the object's name isn't consistent: just assign it
+ newname = expected
+ # Table DB check
+ self.meta.clear()
+ self.table = Table(newname, self.meta, autoload=True)
+ self.assertEquals(self.table.name, expected)
+
+ def assert_index_name(expected, skip_object_check=False):
+ if not skip_object_check:
+ # Index object check
+ self.assertEquals(self.index.name, expected)
+ else:
+ # object is inconsistent
+ self.index.name = expected
+ # TODO: Index DB check
+
+ try:
+ # Table renames
+ assert_table_name(table_name1)
+ rename_table(self.table, table_name2)
+ assert_table_name(table_name2)
+ self.table.rename(table_name1)
+ assert_table_name(table_name1)
+
+ # test by just the string
+ rename_table(table_name1, table_name2, engine=self.engine)
+ assert_table_name(table_name2, True) # object not updated
+
+ # Index renames
+ if self.url.startswith('sqlite') or self.url.startswith('mysql'):
+ self.assertRaises(changeset.exceptions.NotSupportedError,
+ self.index.rename, index_name2)
+ else:
+ assert_index_name(index_name1)
+ rename_index(self.index, index_name2, engine=self.engine)
+ assert_index_name(index_name2)
+ self.index.rename(index_name1)
+ assert_index_name(index_name1)
+
+ # test by just the string
+ rename_index(index_name1, index_name2, engine=self.engine)
+ assert_index_name(index_name2, True)
+
+ finally:
+ if self.table.exists():
+ self.table.drop()
+
+
+class TestColumnChange(fixture.DB):
+ level = fixture.DB.CONNECT
+ table_name = 'tmp_colchange'
+
+ def _setup(self, url):
+ super(TestColumnChange, self)._setup(url)
+ self.meta = MetaData(self.engine)
+ self.table = Table(self.table_name, self.meta,
+ Column('id', Integer, primary_key=True),
+ Column('data', String(40), server_default=DefaultClause("tluafed"),
+ nullable=True),
+ )
+ if self.table.exists():
+ self.table.drop()
+ try:
+ self.table.create()
+ except sqlalchemy.exceptions.SQLError, e:
+ # SQLite: database schema has changed
+ if not self.url.startswith('sqlite://'):
+ raise
+
+ def _teardown(self):
+ if self.table.exists():
+ try:
+ self.table.drop(self.engine)
+ except sqlalchemy.exceptions.SQLError,e:
+ # SQLite: database schema has changed
+ if not self.url.startswith('sqlite://'):
+ raise
+ super(TestColumnChange, self)._teardown()
+
+ @fixture.usedb()
+ def test_rename(self):
+ """Can rename a column"""
+ def num_rows(col, content):
+ return len(list(self.table.select(col == content).execute()))
+ # Table content should be preserved in changed columns
+ content = "fgsfds"
+ self.engine.execute(self.table.insert(), data=content, id=42)
+ self.assertEquals(num_rows(self.table.c.data, content), 1)
+
+ # ...as a function, given a column object and the new name
+ alter_column('data', name='data2', table=self.table)
+ self.refresh_table()
+ alter_column(self.table.c.data2, name='atad')
+ self.refresh_table(self.table.name)
+ self.assert_('data' not in self.table.c.keys())
+ self.assert_('atad' in self.table.c.keys())
+ self.assertEquals(num_rows(self.table.c.atad, content), 1)
+
+ # ...as a method, given a new name
+ self.table.c.atad.alter(name='data')
+ self.refresh_table(self.table.name)
+ self.assert_('atad' not in self.table.c.keys())
+ self.table.c.data # Should not raise exception
+ self.assertEquals(num_rows(self.table.c.data, content), 1)
+
+ # ...as a function, given a new object
+ col = Column('atad', String(40), server_default=self.table.c.data.server_default)
+ alter_column(self.table.c.data, col)
+ self.refresh_table(self.table.name)
+ self.assert_('data' not in self.table.c.keys())
+ self.table.c.atad # Should not raise exception
+ self.assertEquals(num_rows(self.table.c.atad, content), 1)
+
+ # ...as a method, given a new object
+ col = Column('data', String(40), server_default=self.table.c.atad.server_default)
+ self.table.c.atad.alter(col)
+ self.refresh_table(self.table.name)
+ self.assert_('atad' not in self.table.c.keys())
+ self.table.c.data # Should not raise exception
+ self.assertEquals(num_rows(self.table.c.data,content), 1)
+
+ @fixture.usedb()
+ def test_type(self):
+ """Can change a column's type"""
+ # Entire column definition given
+ self.table.c.data.alter(Column('data', String(42)))
+ self.refresh_table(self.table.name)
+ self.assert_(isinstance(self.table.c.data.type, String))
+ self.assertEquals(self.table.c.data.type.length, 42)
+
+ # Just the new type
+ self.table.c.data.alter(type=String(43))
+ self.refresh_table(self.table.name)
+ self.assert_(isinstance(self.table.c.data.type, String))
+ self.assertEquals(self.table.c.data.type.length, 43)
+
+ # Different type
+ self.assert_(isinstance(self.table.c.id.type, Integer))
+ self.assertEquals(self.table.c.id.nullable, False)
+
+ if not self.engine.name == 'firebird':
+ self.table.c.id.alter(type=String(20))
+ self.assertEquals(self.table.c.id.nullable, False)
+ self.refresh_table(self.table.name)
+ self.assert_(isinstance(self.table.c.id.type, String))
+
+ @fixture.usedb()
+ def test_default(self):
+ """Can change a column's server_default value (DefaultClauses only)
+ Only DefaultClauses are changed here: others are managed by the
+ application / by SA
+ """
+ self.assertEquals(self.table.c.data.server_default.arg, 'tluafed')
+
+ # Just the new default
+ default = 'my_default'
+ self.table.c.data.alter(server_default=DefaultClause(default))
+ self.refresh_table(self.table.name)
+ #self.assertEquals(self.table.c.data.server_default.arg,default)
+ # TextClause returned by autoload
+ self.assert_(default in str(self.table.c.data.server_default.arg))
+ self.engine.execute(self.table.insert(), id=12)
+ row = self._select_row()
+ self.assertEqual(row['data'], default)
+
+ # Column object
+ default = 'your_default'
+ self.table.c.data.alter(Column('data', String(40), server_default=DefaultClause(default)))
+ self.refresh_table(self.table.name)
+ self.assert_(default in str(self.table.c.data.server_default.arg))
+
+ # Drop/remove default
+ self.table.c.data.alter(server_default=None)
+ self.assertEqual(self.table.c.data.server_default, None)
+
+ self.refresh_table(self.table.name)
+ # server_default isn't necessarily None for Oracle
+ #self.assert_(self.table.c.data.server_default is None,self.table.c.data.server_default)
+ self.engine.execute(self.table.insert(), id=11)
+ if SQLA_06:
+ row = self.table.select(self.table.c.id == 11).execution_options(autocommit=True).execute().fetchone()
+ else:
+ row = self.table.select(self.table.c.id == 11, autocommit=True).execute().fetchone()
+ self.assert_(row['data'] is None, row['data'])
+
+ @fixture.usedb(not_supported='firebird')
+ def test_null(self):
+ """Can change a column's null constraint"""
+ self.assertEquals(self.table.c.data.nullable, True)
+
+ # Column object
+ self.table.c.data.alter(Column('data', String(40), nullable=False))
+ self.table.nullable = None
+ self.refresh_table(self.table.name)
+ self.assertEquals(self.table.c.data.nullable, False)
+
+ # Just the new status
+ self.table.c.data.alter(nullable=True)
+ self.refresh_table(self.table.name)
+ self.assertEquals(self.table.c.data.nullable, True)
+
+ @fixture.usedb()
+ def test_alter_metadata(self):
+ """Test if alter_metadata is respected"""
+
+ self.table.c.data.alter(Column('data', String(100)))
+
+ self.assert_(isinstance(self.table.c.data.type, String))
+ self.assertEqual(self.table.c.data.type.length, 100)
+
+ # nothing should change
+ self.table.c.data.alter(Column('data', String(200)), alter_metadata=False)
+ self.assert_(isinstance(self.table.c.data.type, String))
+ self.assertEqual(self.table.c.data.type.length, 100)
+
+ @fixture.usedb()
+ def test_alter_returns_delta(self):
+ """Test if alter constructs return delta"""
+
+ delta = self.table.c.data.alter(Column('data', String(100)))
+ self.assert_('type' in delta)
+
+ @fixture.usedb()
+ def test_alter_all(self):
+ """Tests all alter changes at one time"""
+ # test for each db separately
+ # since currently some dont support everything
+
+ # test pre settings
+ self.assertEqual(self.table.c.data.nullable, True)
+ self.assertEqual(self.table.c.data.server_default.arg, 'tluafed')
+ self.assertEqual(self.table.c.data.name, 'data')
+ self.assertTrue(isinstance(self.table.c.data.type, String))
+ self.assertTrue(self.table.c.data.type.length, 40)
+
+ kw = dict(nullable=False,
+ server_default='foobar',
+ name='data_new',
+ type=String(50),
+ alter_metadata=True)
+ if self.engine.name == 'firebird':
+ del kw['nullable']
+ self.table.c.data.alter(**kw)
+
+ # test altered objects
+ self.assertEqual(self.table.c.data.server_default.arg, 'foobar')
+ if not self.engine.name == 'firebird':
+ self.assertEqual(self.table.c.data.nullable, False)
+ self.assertEqual(self.table.c.data.name, 'data_new')
+ self.assertEqual(self.table.c.data.type.length, 50)
+
+ self.refresh_table(self.table.name)
+
+ # test post settings
+ if not self.engine.name == 'firebird':
+ self.assertEqual(self.table.c.data_new.nullable, False)
+ self.assertEqual(self.table.c.data_new.name, 'data_new')
+ self.assertTrue(isinstance(self.table.c.data_new.type, String))
+ self.assertTrue(self.table.c.data_new.type.length, 50)
+
+ # insert data and assert default
+ self.table.insert(values={'id': 10}).execute()
+ row = self._select_row()
+ self.assertEqual(u'foobar', row['data_new'])
+
+
+class TestColumnDelta(fixture.DB):
+ """Tests ColumnDelta class"""
+
+ level = fixture.DB.CONNECT
+ table_name = 'tmp_coldelta'
+ table_int = 0
+
+ def _setup(self, url):
+ super(TestColumnDelta, self)._setup(url)
+ self.meta = MetaData()
+ self.table = Table(self.table_name, self.meta,
+ Column('ids', String(10)),
+ )
+ self.meta.bind = self.engine
+ if self.engine.has_table(self.table.name):
+ self.table.drop()
+ self.table.create()
+
+ def _teardown(self):
+ if self.engine.has_table(self.table.name):
+ self.table.drop()
+ self.meta.clear()
+ super(TestColumnDelta,self)._teardown()
+
+ def mkcol(self, name='id', type=String, *p, **k):
+ return Column(name, type, *p, **k)
+
+ def verify(self, expected, original, *p, **k):
+ self.delta = ColumnDelta(original, *p, **k)
+ result = self.delta.keys()
+ result.sort()
+ self.assertEquals(expected, result)
+ return self.delta
+
+ def test_deltas_two_columns(self):
+ """Testing ColumnDelta with two columns"""
+ col_orig = self.mkcol(primary_key=True)
+ col_new = self.mkcol(name='ids', primary_key=True)
+ self.verify([], col_orig, col_orig)
+ self.verify(['name'], col_orig, col_orig, 'ids')
+ self.verify(['name'], col_orig, col_orig, name='ids')
+ self.verify(['name'], col_orig, col_new)
+ self.verify(['name', 'type'], col_orig, col_new, type=String)
+
+ # Type comparisons
+ self.verify([], self.mkcol(type=String), self.mkcol(type=String))
+ self.verify(['type'], self.mkcol(type=String), self.mkcol(type=Integer))
+ self.verify(['type'], self.mkcol(type=String), self.mkcol(type=String(42)))
+ self.verify([], self.mkcol(type=String(42)), self.mkcol(type=String(42)))
+ self.verify(['type'], self.mkcol(type=String(24)), self.mkcol(type=String(42)))
+ self.verify(['type'], self.mkcol(type=String(24)), self.mkcol(type=Text(24)))
+
+ # Other comparisons
+ self.verify(['primary_key'], self.mkcol(nullable=False), self.mkcol(primary_key=True))
+
+ # PK implies nullable=False
+ self.verify(['nullable', 'primary_key'], self.mkcol(nullable=True), self.mkcol(primary_key=True))
+ self.verify([], self.mkcol(primary_key=True), self.mkcol(primary_key=True))
+ self.verify(['nullable'], self.mkcol(nullable=True), self.mkcol(nullable=False))
+ self.verify([], self.mkcol(nullable=True), self.mkcol(nullable=True))
+ self.verify([], self.mkcol(server_default=None), self.mkcol(server_default=None))
+ self.verify([], self.mkcol(server_default='42'), self.mkcol(server_default='42'))
+
+ # test server default
+ delta = self.verify(['server_default'], self.mkcol(), self.mkcol('id', String, DefaultClause('foobar')))
+ self.assertEqual(delta['server_default'].arg, 'foobar')
+
+ self.verify([], self.mkcol(server_default='foobar'), self.mkcol('id', String, DefaultClause('foobar')))
+ self.verify(['type'], self.mkcol(server_default='foobar'), self.mkcol('id', Text, DefaultClause('foobar')))
+
+ # test alter_metadata
+ col = self.mkcol(server_default='foobar')
+ self.verify(['type'], col, self.mkcol('id', Text, DefaultClause('foobar')), alter_metadata=True)
+ self.assert_(isinstance(col.type, Text))
+
+ col = self.mkcol()
+ self.verify(['name', 'server_default', 'type'], col, self.mkcol('beep', Text, DefaultClause('foobar')), alter_metadata=True)
+ self.assert_(isinstance(col.type, Text))
+ self.assertEqual(col.name, 'beep')
+ self.assertEqual(col.server_default.arg, 'foobar')
+
+ col = self.mkcol()
+ self.verify(['name', 'server_default', 'type'], col, self.mkcol('beep', Text, DefaultClause('foobar')), alter_metadata=False)
+ self.assertFalse(isinstance(col.type, Text))
+ self.assertNotEqual(col.name, 'beep')
+ self.assertFalse(col.server_default)
+
+ @fixture.usedb()
+ def test_deltas_zero_columns(self):
+ """Testing ColumnDelta with zero columns"""
+
+ self.verify(['name'], 'ids', table=self.table, name='hey')
+
+ # test reflection
+ self.verify(['type'], 'ids', table=self.table.name, type=String(80), engine=self.engine)
+ self.verify(['type'], 'ids', table=self.table.name, type=String(80), metadata=self.meta)
+
+ # check if alter_metadata is respected
+ self.meta.clear()
+ delta = self.verify(['type'], 'ids', table=self.table.name, type=String(80), alter_metadata=True, metadata=self.meta)
+ self.assert_(self.table.name in self.meta)
+ self.assertEqual(delta.result_column.type.length, 80)
+ self.assertEqual(self.meta.tables.get(self.table.name).c.ids.type.length, 80)
+
+ self.meta.clear()
+ self.verify(['type'], 'ids', table=self.table.name, type=String(80), alter_metadata=False, engine=self.engine)
+ self.assert_(self.table.name not in self.meta)
+
+ self.meta.clear()
+ self.verify(['type'], 'ids', table=self.table.name, type=String(80), alter_metadata=False, metadata=self.meta)
+ self.assert_(self.table.name not in self.meta)
+
+ # test defaults
+ self.meta.clear()
+ self.verify(['server_default'], 'ids', table=self.table.name, server_default='foobar', alter_metadata=True, metadata=self.meta)
+ self.meta.tables.get(self.table.name).c.ids.server_default.arg == 'foobar'
+
+ # test missing parameters
+ self.assertRaises(ValueError, ColumnDelta, table=self.table.name)
+ self.assertRaises(ValueError, ColumnDelta, 'ids', table=self.table.name, alter_metadata=True)
+ self.assertRaises(ValueError, ColumnDelta, 'ids', table=self.table.name, alter_metadata=False)
+
+ def test_deltas_one_column(self):
+ """Testing ColumnDelta with one column"""
+ col_orig = self.mkcol(primary_key=True)
+
+ self.verify([], col_orig)
+ self.verify(['name'], col_orig, 'ids')
+ # Parameters are always executed, even if they're 'unchanged'
+ # (We can't assume given column is up-to-date)
+ self.verify(['name', 'primary_key', 'type'], col_orig, 'id', Integer, primary_key=True)
+ self.verify(['name', 'primary_key', 'type'], col_orig, name='id', type=Integer, primary_key=True)
+
+ # Change name, given an up-to-date definition and the current name
+ delta = self.verify(['name'], col_orig, name='blah')
+ self.assertEquals(delta.get('name'), 'blah')
+ self.assertEquals(delta.current_name, 'id')
+
+ # check if alter_metadata is respected
+ col_orig = self.mkcol(primary_key=True)
+ self.verify(['name', 'type'], col_orig, name='id12', type=Text, alter_metadata=True)
+ self.assert_(isinstance(col_orig.type, Text))
+ self.assertEqual(col_orig.name, 'id12')
+
+ col_orig = self.mkcol(primary_key=True)
+ self.verify(['name', 'type'], col_orig, name='id12', type=Text, alter_metadata=False)
+ self.assert_(isinstance(col_orig.type, String))
+ self.assertEqual(col_orig.name, 'id')
+
+ # test server default
+ col_orig = self.mkcol(primary_key=True)
+ delta = self.verify(['server_default'], col_orig, DefaultClause('foobar'))
+ self.assertEqual(delta['server_default'].arg, 'foobar')
+
+ delta = self.verify(['server_default'], col_orig, server_default=DefaultClause('foobar'))
+ self.assertEqual(delta['server_default'].arg, 'foobar')
+
+ # no change
+ col_orig = self.mkcol(server_default=DefaultClause('foobar'))
+ delta = self.verify(['type'], col_orig, DefaultClause('foobar'), type=PickleType)
+ self.assert_(isinstance(delta.result_column.type, PickleType))
+
+ # TODO: test server on update
+ # TODO: test bind metadata