diff options
Diffstat (limited to 'test/engine/test_reflection.py')
-rw-r--r-- | test/engine/test_reflection.py | 500 |
1 files changed, 285 insertions, 215 deletions
diff --git a/test/engine/test_reflection.py b/test/engine/test_reflection.py index be53b0549..1cbc9ab59 100644 --- a/test/engine/test_reflection.py +++ b/test/engine/test_reflection.py @@ -15,8 +15,10 @@ metadata, users = None, None class ReflectionTest(TestBase, ComparesTables): - @testing.exclude('mssql', '<', (10, 0, 0), 'Date is only supported on MSSQL 2008+') - @testing.exclude('mysql', '<', (4, 1, 1), 'early types are squirrely') + @testing.exclude('mssql', '<', (10, 0, 0), + 'Date is only supported on MSSQL 2008+') + @testing.exclude('mysql', '<', (4, 1, 1), + 'early types are squirrely') def test_basic_reflection(self): meta = MetaData(testing.db) @@ -39,20 +41,24 @@ class ReflectionTest(TestBase, ComparesTables): test_needs_fk=True, ) - addresses = Table('engine_email_addresses', meta, - Column('address_id', sa.Integer, primary_key = True), - Column('remote_user_id', sa.Integer, sa.ForeignKey(users.c.user_id)), + addresses = Table( + 'engine_email_addresses', + meta, + Column('address_id', sa.Integer, primary_key=True), + Column('remote_user_id', sa.Integer, + sa.ForeignKey(users.c.user_id)), Column('email_address', sa.String(20)), test_needs_fk=True, - ) + ) meta.create_all() try: meta2 = MetaData() - reflected_users = Table('engine_users', meta2, autoload=True, + reflected_users = Table('engine_users', meta2, + autoload=True, autoload_with=testing.db) - reflected_addresses = Table('engine_email_addresses', meta2, - autoload=True, autoload_with=testing.db) + reflected_addresses = Table('engine_email_addresses', + meta2, autoload=True, autoload_with=testing.db) self.assert_tables_equal(users, reflected_users) self.assert_tables_equal(addresses, reflected_addresses) finally: @@ -60,34 +66,33 @@ class ReflectionTest(TestBase, ComparesTables): def test_two_foreign_keys(self): meta = MetaData(testing.db) - t1 = Table('t1', meta, - Column('id', sa.Integer, primary_key=True), - Column('t2id', sa.Integer, sa.ForeignKey('t2.id')), - Column('t3id', sa.Integer, sa.ForeignKey('t3.id')), - test_needs_fk=True - ) - t2 = Table('t2', meta, - Column('id', sa.Integer, primary_key=True), - test_needs_fk=True - ) - t3 = Table('t3', meta, - Column('id', sa.Integer, primary_key=True), - test_needs_fk=True - ) + t1 = Table( + 't1', + meta, + Column('id', sa.Integer, primary_key=True), + Column('t2id', sa.Integer, sa.ForeignKey('t2.id')), + Column('t3id', sa.Integer, sa.ForeignKey('t3.id')), + test_needs_fk=True, + ) + t2 = Table('t2', meta, Column('id', sa.Integer, + primary_key=True), test_needs_fk=True) + t3 = Table('t3', meta, Column('id', sa.Integer, + primary_key=True), test_needs_fk=True) meta.create_all() try: meta2 = MetaData() - t1r, t2r, t3r = [Table(x, meta2, autoload=True, autoload_with=testing.db) for x in ('t1', 't2', 't3')] - + t1r, t2r, t3r = [Table(x, meta2, autoload=True, + autoload_with=testing.db) for x in ('t1', + 't2', 't3')] assert t1r.c.t2id.references(t2r.c.id) assert t1r.c.t3id.references(t3r.c.id) - finally: meta.drop_all() def test_nonexistent(self): meta = MetaData(testing.db) - assert_raises(sa.exc.NoSuchTableError, Table, "nonexistent", meta, autoload=True) + assert_raises(sa.exc.NoSuchTableError, Table, 'nonexistent', + meta, autoload=True) def test_include_columns(self): meta = MetaData(testing.db) @@ -156,7 +161,8 @@ class ReflectionTest(TestBase, ComparesTables): Column('data', sa.String(50)), ) t2 = Table('test2', meta, - Column('id', sa.Integer, sa.ForeignKey('test.id'), primary_key=True), + Column('id', sa.Integer, sa.ForeignKey('test.id'), + primary_key=True), Column('id2', sa.Integer, primary_key=True), Column('data', sa.String(50)), ) @@ -218,8 +224,9 @@ class ReflectionTest(TestBase, ComparesTables): table.drop() def test_override_pkfk(self): - """test that you can override columns which contain foreign keys to other reflected tables, - where the foreign key column is also a primary key column""" + """test that you can override columns which contain foreign keys + to other reflected tables, where the foreign key column is also + a primary key column""" meta = MetaData(testing.db) users = Table('users', meta, @@ -258,8 +265,9 @@ class ReflectionTest(TestBase, ComparesTables): meta.drop_all() def test_override_nonexistent_fk(self): - """test that you can override columns and create new foreign keys to other reflected tables - which have no foreign keys. this is common with MySQL MyISAM tables.""" + """test that you can override columns and create new foreign + keys to other reflected tables which have no foreign keys. this + is common with MySQL MyISAM tables.""" meta = MetaData(testing.db) users = Table('users', meta, @@ -273,38 +281,44 @@ class ReflectionTest(TestBase, ComparesTables): meta.create_all() try: meta2 = MetaData(testing.db) - a2 = Table('addresses', meta2, - Column('user_id', sa.Integer, sa.ForeignKey('users.id')), - autoload=True) + a2 = Table('addresses', meta2, + Column('user_id',sa.Integer, sa.ForeignKey('users.id')), + autoload=True) u2 = Table('users', meta2, autoload=True) - assert len(a2.c.user_id.foreign_keys) == 1 assert len(a2.foreign_keys) == 1 assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id] - assert [c.parent for c in a2.c.user_id.foreign_keys] == [a2.c.user_id] - assert list(a2.c.user_id.foreign_keys)[0].parent is a2.c.user_id - assert u2.join(a2).onclause.compare(u2.c.id==a2.c.user_id) - + assert [c.parent for c in a2.c.user_id.foreign_keys] \ + == [a2.c.user_id] + assert list(a2.c.user_id.foreign_keys)[0].parent \ + is a2.c.user_id + assert u2.join(a2).onclause.compare(u2.c.id == a2.c.user_id) meta3 = MetaData(testing.db) + u3 = Table('users', meta3, autoload=True) - a3 = Table('addresses', meta3, - Column('user_id', sa.Integer, sa.ForeignKey('users.id')), - autoload=True) - assert u3.join(a3).onclause.compare(u3.c.id==a3.c.user_id) + a3 = Table('addresses', meta3, Column('user_id', + sa.Integer, sa.ForeignKey('users.id')), + autoload=True) + assert u3.join(a3).onclause.compare(u3.c.id == a3.c.user_id) meta4 = MetaData(testing.db) - u4 = Table('users', meta4, - Column('id', sa.Integer, key='u_id', primary_key=True), - autoload=True) - a4 = Table('addresses', meta4, - Column('id', sa.Integer, key='street', primary_key=True), - Column('street', sa.String(30), key='user_id'), - Column('user_id', sa.Integer, sa.ForeignKey('users.u_id'), - key='id'), - autoload=True) - assert u4.join(a4).onclause.compare(u4.c.u_id==a4.c.id) + u4 = Table('users', meta4, + Column('id', sa.Integer, key='u_id', primary_key=True), + autoload=True) + + a4 = Table( + 'addresses', + meta4, + Column('id', sa.Integer, key='street', + primary_key=True), + Column('street', sa.String(30), key='user_id'), + Column('user_id', sa.Integer, sa.ForeignKey('users.u_id' + ), key='id'), + autoload=True, + ) + assert u4.join(a4).onclause.compare(u4.c.u_id == a4.c.id) assert list(u4.primary_key) == [u4.c.u_id] assert len(u4.columns) == 2 assert len(u4.constraints) == 1 @@ -331,10 +345,11 @@ class ReflectionTest(TestBase, ComparesTables): meta.create_all() try: m2 = MetaData(testing.db) - a2 = Table('a', m2, Column('x', sa.Integer, primary_key=True, key='x1'), autoload=True) + a2 = Table('a', m2, + Column('x', sa.Integer, primary_key=True, key='x1'), + autoload=True) b2 = Table('b', m2, autoload=True) - - assert a2.join(b2).onclause.compare(a2.c.x1==b2.c.y) + assert a2.join(b2).onclause.compare(a2.c.x1 == b2.c.y) assert b2.c.y.references(a2.c.x1) finally: meta.drop_all() @@ -368,9 +383,9 @@ class ReflectionTest(TestBase, ComparesTables): @testing.exclude('mysql', '<', (4, 1, 1), 'innodb funkiness') def test_override_existing_fk(self): - """test that you can override columns and specify new foreign keys to other reflected tables, - on columns which *do* already have that foreign key, and that the FK is not duped. - """ + """test that you can override columns and specify new foreign + keys to other reflected tables, on columns which *do* already + have that foreign key, and that the FK is not duped. """ meta = MetaData(testing.db) users = Table('users', meta, @@ -385,78 +400,88 @@ class ReflectionTest(TestBase, ComparesTables): meta.create_all() try: meta2 = MetaData(testing.db) - a2 = Table('addresses', meta2, - Column('user_id', sa.Integer, sa.ForeignKey('users.id')), - autoload=True) + a2 = Table('addresses', meta2, + Column('user_id',sa.Integer, sa.ForeignKey('users.id')), + autoload=True) u2 = Table('users', meta2, autoload=True) - s = sa.select([a2]) + assert s.c.user_id is not None assert len(a2.foreign_keys) == 1 assert len(a2.c.user_id.foreign_keys) == 1 assert len(a2.constraints) == 2 assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id] - assert [c.parent for c in a2.c.user_id.foreign_keys] == [a2.c.user_id] - assert list(a2.c.user_id.foreign_keys)[0].parent is a2.c.user_id - assert u2.join(a2).onclause.compare(u2.c.id==a2.c.user_id) - + assert [c.parent for c in a2.c.user_id.foreign_keys] \ + == [a2.c.user_id] + assert list(a2.c.user_id.foreign_keys)[0].parent \ + is a2.c.user_id + assert u2.join(a2).onclause.compare(u2.c.id == a2.c.user_id) + meta2 = MetaData(testing.db) - u2 = Table('users', meta2, - Column('id', sa.Integer, primary_key=True), - autoload=True) - a2 = Table('addresses', meta2, - Column('id', sa.Integer, primary_key=True), - Column('user_id', sa.Integer, sa.ForeignKey('users.id')), - autoload=True) - + u2 = Table('users', meta2, Column('id', sa.Integer, + primary_key=True), autoload=True) + a2 = Table('addresses', meta2, Column('id', sa.Integer, + primary_key=True), Column('user_id', sa.Integer, + sa.ForeignKey('users.id')), autoload=True) s = sa.select([a2]) + assert s.c.user_id is not None assert len(a2.foreign_keys) == 1 assert len(a2.c.user_id.foreign_keys) == 1 assert len(a2.constraints) == 2 assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id] - assert [c.parent for c in a2.c.user_id.foreign_keys] == [a2.c.user_id] - assert list(a2.c.user_id.foreign_keys)[0].parent is a2.c.user_id - assert u2.join(a2).onclause.compare(u2.c.id==a2.c.user_id) + assert [c.parent for c in a2.c.user_id.foreign_keys] \ + == [a2.c.user_id] + assert list(a2.c.user_id.foreign_keys)[0].parent \ + is a2.c.user_id + assert u2.join(a2).onclause.compare(u2.c.id == a2.c.user_id) + finally: meta.drop_all() @testing.exclude('mysql', '<', (4, 1, 1), 'innodb funkiness') def test_use_existing(self): meta = MetaData(testing.db) - users = Table('users', meta, + users = Table('users', meta, + Column('id', sa.Integer, primary_key=True), + Column('name', sa.String(30)), + test_needs_fk=True) + addresses = Table( + 'addresses', + meta, Column('id', sa.Integer, primary_key=True), - Column('name', sa.String(30)), - test_needs_fk=True) - addresses = Table('addresses', meta, - Column('id', sa.Integer,primary_key=True), Column('user_id', sa.Integer, sa.ForeignKey('users.id')), Column('data', sa.String(100)), - test_needs_fk=True) - + test_needs_fk=True, + ) meta.create_all() try: meta2 = MetaData(testing.db) - addresses = Table('addresses', meta2, Column('data', sa.Unicode), autoload=True) + addresses = Table('addresses', meta2, Column('data', + sa.Unicode), autoload=True) try: - users = Table('users', meta2, Column('name', sa.Unicode), autoload=True) + users = Table('users', meta2, Column('name', + sa.Unicode), autoload=True) assert False except sa.exc.InvalidRequestError, err: - assert str(err) == "Table 'users' is already defined for this MetaData instance. Specify 'useexisting=True' to redefine options and columns on an existing Table object." - - users = Table('users', meta2, Column('name', sa.Unicode), autoload=True, useexisting=True) + assert str(err) \ + == "Table 'users' is already defined for this "\ + "MetaData instance. Specify 'useexisting=True' "\ + "to redefine options and columns on an existing "\ + "Table object." + users = Table('users', meta2, Column('name', sa.Unicode), + autoload=True, useexisting=True) assert isinstance(users.c.name.type, sa.Unicode) - assert not users.quote - - users = Table('users', meta2, quote=True, autoload=True, useexisting=True) + users = Table('users', meta2, quote=True, autoload=True, + useexisting=True) assert users.quote - finally: meta.drop_all() def test_pks_not_uniques(self): - """test that primary key reflection not tripped up by unique indexes""" + """test that primary key reflection not tripped up by unique + indexes""" testing.db.execute(""" CREATE TABLE book ( @@ -484,9 +509,10 @@ class ReflectionTest(TestBase, ComparesTables): Column('slot', sa.String(128)), ) - assert_raises_message(sa.exc.InvalidRequestError, - "Could not find table 'pkgs' with which to generate a foreign key", - metadata.create_all) + assert_raises_message(sa.exc.InvalidRequestError, + "Could not find table 'pkgs' with which " + "to generate a foreign key", + metadata.create_all) def test_composite_pks(self): """test reflection of a composite primary key""" @@ -531,58 +557,67 @@ class ReflectionTest(TestBase, ComparesTables): Column('bar', sa.Integer), Column('lala', sa.Integer), Column('data', sa.String(50)), - sa.ForeignKeyConstraint(['foo', 'bar', 'lala'], ['multi.multi_id', 'multi.multi_rev', 'multi.multi_hoho']), + sa.ForeignKeyConstraint(['foo', 'bar', 'lala'], + ['multi.multi_id', 'multi.multi_rev', 'multi.multi_hoho' + ]), test_needs_fk=True, ) meta.create_all() try: meta2 = MetaData() - table = Table('multi', meta2, autoload=True, autoload_with=testing.db) - table2 = Table('multi2', meta2, autoload=True, autoload_with=testing.db) + table = Table('multi', meta2, autoload=True, + autoload_with=testing.db) + table2 = Table('multi2', meta2, autoload=True, + autoload_with=testing.db) self.assert_tables_equal(multi, table) self.assert_tables_equal(multi2, table2) j = sa.join(table, table2) - self.assert_(sa.and_(table.c.multi_id==table2.c.foo, table.c.multi_rev==table2.c.bar, table.c.multi_hoho==table2.c.lala).compare(j.onclause)) + self.assert_(sa.and_(table.c.multi_id == table2.c.foo, + table.c.multi_rev == table2.c.bar, + table.c.multi_hoho + == table2.c.lala).compare(j.onclause)) finally: meta.drop_all() @testing.crashes('oracle', 'FIXME: unknown, confirm not fails_on') def test_reserved(self): - # check a table that uses an SQL reserved name doesn't cause an error + + # check a table that uses an SQL reserved name doesn't cause an + # error + meta = MetaData(testing.db) - table_a = Table('select', meta, - Column('not', sa.Integer, primary_key=True), - Column('from', sa.String(12), nullable=False), - sa.UniqueConstraint('from', name='when')) + table_a = Table('select', meta, Column('not', sa.Integer, + primary_key=True), Column('from', + sa.String(12), nullable=False), + sa.UniqueConstraint('from', name='when')) sa.Index('where', table_a.c['from']) - # There's currently no way to calculate identifier case normalization - # in isolation, so... + # There's currently no way to calculate identifier case + # normalization in isolation, so... + if testing.against('firebird', 'oracle', 'maxdb'): check_col = 'TRUE' else: check_col = 'true' quoter = meta.bind.dialect.identifier_preparer.quote_identifier - - table_b = Table('false', meta, - Column('create', sa.Integer, primary_key=True), - Column('true', sa.Integer, sa.ForeignKey('select.not')), - sa.CheckConstraint('%s <> 1' % quoter(check_col), - name='limit')) - - table_c = Table('is', meta, - Column('or', sa.Integer, nullable=False, primary_key=True), - Column('join', sa.Integer, nullable=False, primary_key=True), - sa.PrimaryKeyConstraint('or', 'join', name='to')) - + + table_b = Table('false', meta, + Column('create', sa.Integer, primary_key=True), + Column('true', sa.Integer,sa.ForeignKey('select.not')), + sa.CheckConstraint('%s <> 1' + % quoter(check_col), name='limit') + ) + + table_c = Table('is', meta, + Column('or', sa.Integer, nullable=False, primary_key=True), + Column('join', sa.Integer, nullable=False, primary_key=True), + sa.PrimaryKeyConstraint('or', 'join', name='to') + ) index_c = sa.Index('else', table_c.c.join) - meta.create_all() - index_c.drop() - meta2 = MetaData(testing.db) try: table_a2 = Table('select', meta2, autoload=True) @@ -643,9 +678,9 @@ class ReflectionTest(TestBase, ComparesTables): m8 = MetaData(reflect=True) self.assert_(False) except sa.exc.ArgumentError, e: - self.assert_( - e.args[0] == - "A bind must be supplied in conjunction with reflect=True") + self.assert_(e.args[0] + == 'A bind must be supplied in ' + 'conjunction with reflect=True') finally: baseline.drop_all() @@ -710,33 +745,40 @@ class ReflectionTest(TestBase, ComparesTables): meta.drop_all() class CreateDropTest(TestBase): + @classmethod def setup_class(cls): global metadata, users metadata = MetaData() - users = Table('users', metadata, - Column('user_id', sa.Integer, sa.Sequence('user_id_seq', optional=True), primary_key=True), - Column('user_name', sa.String(40)), - ) - + users = Table('users', metadata, + Column('user_id', sa.Integer, + sa.Sequence('user_id_seq', optional=True), + primary_key=True), + Column('user_name',sa.String(40))) + addresses = Table('email_addresses', metadata, - Column('address_id', sa.Integer, sa.Sequence('address_id_seq', optional=True), primary_key = True), - Column('user_id', sa.Integer, sa.ForeignKey(users.c.user_id)), - Column('email_address', sa.String(40)), - ) - - orders = Table('orders', metadata, - Column('order_id', sa.Integer, sa.Sequence('order_id_seq', optional=True), primary_key = True), - Column('user_id', sa.Integer, sa.ForeignKey(users.c.user_id)), + Column('address_id', sa.Integer, + sa.Sequence('address_id_seq', optional=True), + primary_key=True), + Column('user_id', + sa.Integer, sa.ForeignKey(users.c.user_id)), + Column('email_address', sa.String(40))) + + orders = Table( + 'orders', + metadata, + Column('order_id', sa.Integer, sa.Sequence('order_id_seq', + optional=True), primary_key=True), + Column('user_id', sa.Integer, + sa.ForeignKey(users.c.user_id)), Column('description', sa.String(50)), Column('isopen', sa.Integer), - ) - - orderitems = Table('items', metadata, - Column('item_id', sa.INT, sa.Sequence('items_id_seq', optional=True), primary_key = True), - Column('order_id', sa.INT, sa.ForeignKey("orders")), - Column('item_name', sa.VARCHAR(50)), - ) + ) + orderitems = Table('items', metadata, Column('item_id', sa.INT, + sa.Sequence('items_id_seq', optional=True), + primary_key=True), Column('order_id', + sa.INT, sa.ForeignKey('orders')), + Column('item_name', sa.VARCHAR(50))) def test_sorter( self ): tables = metadata.sorted_tables @@ -777,10 +819,14 @@ class CreateDropTest(TestBase): def test_tablenames(self): metadata.create_all(bind=testing.db) - # we only check to see if all the explicitly created tables are there, rather than - # assertEqual -- the test db could have "extra" tables if there is a misconfigured - # template. (*cough* tsearch2 w/ the pg windows installer.) - self.assert_(not set(metadata.tables) - set(testing.db.table_names())) + + # we only check to see if all the explicitly created tables are + # there, rather than assertEqual -- the test db could have + # "extra" tables if there is a misconfigured template. (*cough* + # tsearch2 w/ the pg windows installer.) + + self.assert_(not set(metadata.tables) + - set(testing.db.table_names())) metadata.drop_all(bind=testing.db) class SchemaManipulationTest(TestBase): @@ -788,7 +834,9 @@ class SchemaManipulationTest(TestBase): meta = MetaData() users = Table('users', meta, Column('id', sa.Integer)) - addresses = Table('addresses', meta, Column('id', sa.Integer), Column('user_id', sa.Integer)) + addresses = Table('addresses', meta, + Column('id', sa.Integer), + Column('user_id', sa.Integer)) fk = sa.ForeignKeyConstraint(['user_id'],[users.c.id]) @@ -798,35 +846,46 @@ class SchemaManipulationTest(TestBase): assert addresses.constraints == set([addresses.primary_key, fk]) class UnicodeReflectionTest(TestBase): + @testing.requires.unicode_connections def test_basic(self): try: - # the 'convert_unicode' should not get in the way of the reflection - # process. reflecttable for oracle, postgresql (others?) expect non-unicode - # strings in result sets/bind params - bind = engines.utf8_engine(options={'convert_unicode':True}) - metadata = MetaData(bind) + # the 'convert_unicode' should not get in the way of the + # reflection process. reflecttable for oracle, postgresql + # (others?) expect non-unicode strings in result sets/bind + # params + + bind = engines.utf8_engine(options={'convert_unicode' + : True}) + metadata = MetaData(bind) if testing.against('sybase', 'maxdb', 'oracle', 'mssql'): names = set([u'plain']) else: - names = set([u'plain', u'Unit\u00e9ble', u'\u6e2c\u8a66']) - + names = set([u'plain', u'Unit\u00e9ble', u'\u6e2c\u8a66' + ]) for name in names: - Table(name, metadata, Column('id', sa.Integer, sa.Sequence(name + "_id_seq"), - primary_key=True)) + Table(name, metadata, Column('id', sa.Integer, + sa.Sequence(name + '_id_seq'), primary_key=True)) metadata.create_all() - reflected = set(bind.table_names()) + # Jython 2.5 on Java 5 lacks unicodedata.normalize - if not names.issubset(reflected) and hasattr(unicodedata, 'normalize'): - # Python source files in the utf-8 coding seem to normalize - # literals as NFC (and the above are explicitly NFC). Maybe - # this database normalizes NFD on reflection. - nfc = set([unicodedata.normalize('NFC', n) for n in names]) + + if not names.issubset(reflected) and hasattr(unicodedata, + 'normalize'): + + # Python source files in the utf-8 coding seem to + # normalize literals as NFC (and the above are + # explicitly NFC). Maybe this database normalizes NFD + # on reflection. + + nfc = set([unicodedata.normalize('NFC', n) for n in + names]) self.assert_(nfc == names) - # Yep. But still ensure that bulk reflection and create/drop - # work with either normalization. + + # Yep. But still ensure that bulk reflection and + # create/drop work with either normalization. r = MetaData(bind, reflect=True) r.drop_all() @@ -840,13 +899,12 @@ class SchemaTest(TestBase): def test_iteration(self): metadata = MetaData() - table1 = Table('table1', metadata, - Column('col1', sa.Integer, primary_key=True), - schema='someschema') - table2 = Table('table2', metadata, - Column('col1', sa.Integer, primary_key=True), - Column('col2', sa.Integer, sa.ForeignKey('someschema.table1.col1')), - schema='someschema') + table1 = Table('table1', metadata, Column('col1', sa.Integer, + primary_key=True), schema='someschema') + table2 = Table('table2', metadata, Column('col1', sa.Integer, + primary_key=True), Column('col2', sa.Integer, + sa.ForeignKey('someschema.table1.col1')), + schema='someschema') t1 = str(schema.CreateTable(table1).compile(bind=testing.db)) t2 = str(schema.CreateTable(table2).compile(bind=testing.db)) @@ -904,16 +962,17 @@ class HasSequenceTest(TestBase): @testing.requires.sequences def test_has_sequence(self): metadata = MetaData() - users = Table('users', metadata, - Column('user_id', sa.Integer, sa.Sequence('user_id_seq'), primary_key=True), - Column('user_name', sa.String(40)), - ) + users = Table('users', metadata, Column('user_id', sa.Integer, + sa.Sequence('user_id_seq'), primary_key=True), + Column('user_name', sa.String(40))) metadata.create_all(bind=testing.db) try: - eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), True) + eq_(testing.db.dialect.has_sequence(testing.db, + 'user_id_seq'), True) finally: metadata.drop_all(bind=testing.db) - eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), False) + eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), + False) @testing.requires.schemas @testing.requires.sequences @@ -923,14 +982,20 @@ class HasSequenceTest(TestBase): s2 = sa.Sequence('user_id_seq') testing.db.execute(schema.CreateSequence(s1)) testing.db.execute(schema.CreateSequence(s2)) - eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq', schema=test_schema), True) - eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), True) + eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq', + schema=test_schema), True) + eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), + True) testing.db.execute(schema.DropSequence(s1)) - eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq', schema=test_schema), False) - eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), True) + eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq', + schema=test_schema), False) + eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), + True) testing.db.execute(schema.DropSequence(s2)) - eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq', schema=test_schema), False) - eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), False) + eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq', + schema=test_schema), False) + eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), + False) # Tests related to engine.reflection @@ -1021,11 +1086,11 @@ class ReverseCasingReflectTest(TestBase, AssertsCompiledSQL): @testing.requires.denormalized_names def test_direct_quoting(self): m = MetaData(testing.db) - t = Table("weird_casing", m, autoload=True) - self.assert_compile( - t.select(), - 'SELECT weird_casing.col1, weird_casing."Col2", weird_casing."col3" FROM weird_casing' - ) + t = Table('weird_casing', m, autoload=True) + self.assert_compile(t.select(), + 'SELECT weird_casing.col1, ' + 'weird_casing."Col2", weird_casing."col3" ' + 'FROM weird_casing') class ComponentReflectionTest(TestBase): @@ -1087,7 +1152,7 @@ class ComponentReflectionTest(TestBase): def _test_get_columns(self, schema=None, table_type='table'): meta = MetaData(testing.db) - (users, addresses) = createTables(meta, schema) + users, addresses = createTables(meta, schema) table_names = ['users', 'email_addresses'] meta.create_all() if table_type == 'view': @@ -1095,36 +1160,41 @@ class ComponentReflectionTest(TestBase): table_names = ['users_v', 'email_addresses_v'] try: insp = Inspector(meta.bind) - for (table_name, table) in zip(table_names, (users, addresses)): + for table_name, table in zip(table_names, (users, + addresses)): schema_name = schema cols = insp.get_columns(table_name, schema=schema_name) self.assert_(len(cols) > 0, len(cols)) + # should be in order - for (i, col) in enumerate(table.columns): + + for i, col in enumerate(table.columns): eq_(col.name, cols[i]['name']) ctype = cols[i]['type'].__class__ ctype_def = col.type if isinstance(ctype_def, sa.types.TypeEngine): ctype_def = ctype_def.__class__ - + # Oracle returns Date for DateTime. - if testing.against('oracle') \ - and ctype_def in (sql_types.Date, sql_types.DateTime): - ctype_def = sql_types.Date - - # assert that the desired type and return type - # share a base within one of the generic types. - self.assert_( - len( - set( - ctype.__mro__ - ).intersection(ctype_def.__mro__) - .intersection([sql_types.Integer, sql_types.Numeric, - sql_types.DateTime, sql_types.Date, sql_types.Time, - sql_types.String, sql_types._Binary]) - ) > 0 - ,("%s(%s), %s(%s)" % (col.name, col.type, cols[i]['name'], - ctype))) + + if testing.against('oracle') and ctype_def \ + in (sql_types.Date, sql_types.DateTime): + ctype_def = sql_types.Date + + # assert that the desired type and return type share + # a base within one of the generic types. + + self.assert_(len(set(ctype.__mro__). + intersection(ctype_def.__mro__).intersection([ + sql_types.Integer, + sql_types.Numeric, + sql_types.DateTime, + sql_types.Date, + sql_types.Time, + sql_types.String, + sql_types._Binary, + ])) > 0, '%s(%s), %s(%s)' % (col.name, + col.type, cols[i]['name'], ctype)) finally: if table_type == 'view': dropViews(meta.bind, schema) |