summaryrefslogtreecommitdiff
path: root/test/engine/test_reflection.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine/test_reflection.py')
-rw-r--r--test/engine/test_reflection.py500
1 files changed, 285 insertions, 215 deletions
diff --git a/test/engine/test_reflection.py b/test/engine/test_reflection.py
index be53b0549..1cbc9ab59 100644
--- a/test/engine/test_reflection.py
+++ b/test/engine/test_reflection.py
@@ -15,8 +15,10 @@ metadata, users = None, None
class ReflectionTest(TestBase, ComparesTables):
- @testing.exclude('mssql', '<', (10, 0, 0), 'Date is only supported on MSSQL 2008+')
- @testing.exclude('mysql', '<', (4, 1, 1), 'early types are squirrely')
+ @testing.exclude('mssql', '<', (10, 0, 0),
+ 'Date is only supported on MSSQL 2008+')
+ @testing.exclude('mysql', '<', (4, 1, 1),
+ 'early types are squirrely')
def test_basic_reflection(self):
meta = MetaData(testing.db)
@@ -39,20 +41,24 @@ class ReflectionTest(TestBase, ComparesTables):
test_needs_fk=True,
)
- addresses = Table('engine_email_addresses', meta,
- Column('address_id', sa.Integer, primary_key = True),
- Column('remote_user_id', sa.Integer, sa.ForeignKey(users.c.user_id)),
+ addresses = Table(
+ 'engine_email_addresses',
+ meta,
+ Column('address_id', sa.Integer, primary_key=True),
+ Column('remote_user_id', sa.Integer,
+ sa.ForeignKey(users.c.user_id)),
Column('email_address', sa.String(20)),
test_needs_fk=True,
- )
+ )
meta.create_all()
try:
meta2 = MetaData()
- reflected_users = Table('engine_users', meta2, autoload=True,
+ reflected_users = Table('engine_users', meta2,
+ autoload=True,
autoload_with=testing.db)
- reflected_addresses = Table('engine_email_addresses', meta2,
- autoload=True, autoload_with=testing.db)
+ reflected_addresses = Table('engine_email_addresses',
+ meta2, autoload=True, autoload_with=testing.db)
self.assert_tables_equal(users, reflected_users)
self.assert_tables_equal(addresses, reflected_addresses)
finally:
@@ -60,34 +66,33 @@ class ReflectionTest(TestBase, ComparesTables):
def test_two_foreign_keys(self):
meta = MetaData(testing.db)
- t1 = Table('t1', meta,
- Column('id', sa.Integer, primary_key=True),
- Column('t2id', sa.Integer, sa.ForeignKey('t2.id')),
- Column('t3id', sa.Integer, sa.ForeignKey('t3.id')),
- test_needs_fk=True
- )
- t2 = Table('t2', meta,
- Column('id', sa.Integer, primary_key=True),
- test_needs_fk=True
- )
- t3 = Table('t3', meta,
- Column('id', sa.Integer, primary_key=True),
- test_needs_fk=True
- )
+ t1 = Table(
+ 't1',
+ meta,
+ Column('id', sa.Integer, primary_key=True),
+ Column('t2id', sa.Integer, sa.ForeignKey('t2.id')),
+ Column('t3id', sa.Integer, sa.ForeignKey('t3.id')),
+ test_needs_fk=True,
+ )
+ t2 = Table('t2', meta, Column('id', sa.Integer,
+ primary_key=True), test_needs_fk=True)
+ t3 = Table('t3', meta, Column('id', sa.Integer,
+ primary_key=True), test_needs_fk=True)
meta.create_all()
try:
meta2 = MetaData()
- t1r, t2r, t3r = [Table(x, meta2, autoload=True, autoload_with=testing.db) for x in ('t1', 't2', 't3')]
-
+ t1r, t2r, t3r = [Table(x, meta2, autoload=True,
+ autoload_with=testing.db) for x in ('t1',
+ 't2', 't3')]
assert t1r.c.t2id.references(t2r.c.id)
assert t1r.c.t3id.references(t3r.c.id)
-
finally:
meta.drop_all()
def test_nonexistent(self):
meta = MetaData(testing.db)
- assert_raises(sa.exc.NoSuchTableError, Table, "nonexistent", meta, autoload=True)
+ assert_raises(sa.exc.NoSuchTableError, Table, 'nonexistent',
+ meta, autoload=True)
def test_include_columns(self):
meta = MetaData(testing.db)
@@ -156,7 +161,8 @@ class ReflectionTest(TestBase, ComparesTables):
Column('data', sa.String(50)),
)
t2 = Table('test2', meta,
- Column('id', sa.Integer, sa.ForeignKey('test.id'), primary_key=True),
+ Column('id', sa.Integer, sa.ForeignKey('test.id'),
+ primary_key=True),
Column('id2', sa.Integer, primary_key=True),
Column('data', sa.String(50)),
)
@@ -218,8 +224,9 @@ class ReflectionTest(TestBase, ComparesTables):
table.drop()
def test_override_pkfk(self):
- """test that you can override columns which contain foreign keys to other reflected tables,
- where the foreign key column is also a primary key column"""
+ """test that you can override columns which contain foreign keys
+ to other reflected tables, where the foreign key column is also
+ a primary key column"""
meta = MetaData(testing.db)
users = Table('users', meta,
@@ -258,8 +265,9 @@ class ReflectionTest(TestBase, ComparesTables):
meta.drop_all()
def test_override_nonexistent_fk(self):
- """test that you can override columns and create new foreign keys to other reflected tables
- which have no foreign keys. this is common with MySQL MyISAM tables."""
+ """test that you can override columns and create new foreign
+ keys to other reflected tables which have no foreign keys. this
+ is common with MySQL MyISAM tables."""
meta = MetaData(testing.db)
users = Table('users', meta,
@@ -273,38 +281,44 @@ class ReflectionTest(TestBase, ComparesTables):
meta.create_all()
try:
meta2 = MetaData(testing.db)
- a2 = Table('addresses', meta2,
- Column('user_id', sa.Integer, sa.ForeignKey('users.id')),
- autoload=True)
+ a2 = Table('addresses', meta2,
+ Column('user_id',sa.Integer, sa.ForeignKey('users.id')),
+ autoload=True)
u2 = Table('users', meta2, autoload=True)
-
assert len(a2.c.user_id.foreign_keys) == 1
assert len(a2.foreign_keys) == 1
assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id]
- assert [c.parent for c in a2.c.user_id.foreign_keys] == [a2.c.user_id]
- assert list(a2.c.user_id.foreign_keys)[0].parent is a2.c.user_id
- assert u2.join(a2).onclause.compare(u2.c.id==a2.c.user_id)
-
+ assert [c.parent for c in a2.c.user_id.foreign_keys] \
+ == [a2.c.user_id]
+ assert list(a2.c.user_id.foreign_keys)[0].parent \
+ is a2.c.user_id
+ assert u2.join(a2).onclause.compare(u2.c.id == a2.c.user_id)
meta3 = MetaData(testing.db)
+
u3 = Table('users', meta3, autoload=True)
- a3 = Table('addresses', meta3,
- Column('user_id', sa.Integer, sa.ForeignKey('users.id')),
- autoload=True)
- assert u3.join(a3).onclause.compare(u3.c.id==a3.c.user_id)
+ a3 = Table('addresses', meta3, Column('user_id',
+ sa.Integer, sa.ForeignKey('users.id')),
+ autoload=True)
+ assert u3.join(a3).onclause.compare(u3.c.id == a3.c.user_id)
meta4 = MetaData(testing.db)
- u4 = Table('users', meta4,
- Column('id', sa.Integer, key='u_id', primary_key=True),
- autoload=True)
- a4 = Table('addresses', meta4,
- Column('id', sa.Integer, key='street', primary_key=True),
- Column('street', sa.String(30), key='user_id'),
- Column('user_id', sa.Integer, sa.ForeignKey('users.u_id'),
- key='id'),
- autoload=True)
- assert u4.join(a4).onclause.compare(u4.c.u_id==a4.c.id)
+ u4 = Table('users', meta4,
+ Column('id', sa.Integer, key='u_id', primary_key=True),
+ autoload=True)
+
+ a4 = Table(
+ 'addresses',
+ meta4,
+ Column('id', sa.Integer, key='street',
+ primary_key=True),
+ Column('street', sa.String(30), key='user_id'),
+ Column('user_id', sa.Integer, sa.ForeignKey('users.u_id'
+ ), key='id'),
+ autoload=True,
+ )
+ assert u4.join(a4).onclause.compare(u4.c.u_id == a4.c.id)
assert list(u4.primary_key) == [u4.c.u_id]
assert len(u4.columns) == 2
assert len(u4.constraints) == 1
@@ -331,10 +345,11 @@ class ReflectionTest(TestBase, ComparesTables):
meta.create_all()
try:
m2 = MetaData(testing.db)
- a2 = Table('a', m2, Column('x', sa.Integer, primary_key=True, key='x1'), autoload=True)
+ a2 = Table('a', m2,
+ Column('x', sa.Integer, primary_key=True, key='x1'),
+ autoload=True)
b2 = Table('b', m2, autoload=True)
-
- assert a2.join(b2).onclause.compare(a2.c.x1==b2.c.y)
+ assert a2.join(b2).onclause.compare(a2.c.x1 == b2.c.y)
assert b2.c.y.references(a2.c.x1)
finally:
meta.drop_all()
@@ -368,9 +383,9 @@ class ReflectionTest(TestBase, ComparesTables):
@testing.exclude('mysql', '<', (4, 1, 1), 'innodb funkiness')
def test_override_existing_fk(self):
- """test that you can override columns and specify new foreign keys to other reflected tables,
- on columns which *do* already have that foreign key, and that the FK is not duped.
- """
+ """test that you can override columns and specify new foreign
+ keys to other reflected tables, on columns which *do* already
+ have that foreign key, and that the FK is not duped. """
meta = MetaData(testing.db)
users = Table('users', meta,
@@ -385,78 +400,88 @@ class ReflectionTest(TestBase, ComparesTables):
meta.create_all()
try:
meta2 = MetaData(testing.db)
- a2 = Table('addresses', meta2,
- Column('user_id', sa.Integer, sa.ForeignKey('users.id')),
- autoload=True)
+ a2 = Table('addresses', meta2,
+ Column('user_id',sa.Integer, sa.ForeignKey('users.id')),
+ autoload=True)
u2 = Table('users', meta2, autoload=True)
-
s = sa.select([a2])
+
assert s.c.user_id is not None
assert len(a2.foreign_keys) == 1
assert len(a2.c.user_id.foreign_keys) == 1
assert len(a2.constraints) == 2
assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id]
- assert [c.parent for c in a2.c.user_id.foreign_keys] == [a2.c.user_id]
- assert list(a2.c.user_id.foreign_keys)[0].parent is a2.c.user_id
- assert u2.join(a2).onclause.compare(u2.c.id==a2.c.user_id)
-
+ assert [c.parent for c in a2.c.user_id.foreign_keys] \
+ == [a2.c.user_id]
+ assert list(a2.c.user_id.foreign_keys)[0].parent \
+ is a2.c.user_id
+ assert u2.join(a2).onclause.compare(u2.c.id == a2.c.user_id)
+
meta2 = MetaData(testing.db)
- u2 = Table('users', meta2,
- Column('id', sa.Integer, primary_key=True),
- autoload=True)
- a2 = Table('addresses', meta2,
- Column('id', sa.Integer, primary_key=True),
- Column('user_id', sa.Integer, sa.ForeignKey('users.id')),
- autoload=True)
-
+ u2 = Table('users', meta2, Column('id', sa.Integer,
+ primary_key=True), autoload=True)
+ a2 = Table('addresses', meta2, Column('id', sa.Integer,
+ primary_key=True), Column('user_id', sa.Integer,
+ sa.ForeignKey('users.id')), autoload=True)
s = sa.select([a2])
+
assert s.c.user_id is not None
assert len(a2.foreign_keys) == 1
assert len(a2.c.user_id.foreign_keys) == 1
assert len(a2.constraints) == 2
assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id]
- assert [c.parent for c in a2.c.user_id.foreign_keys] == [a2.c.user_id]
- assert list(a2.c.user_id.foreign_keys)[0].parent is a2.c.user_id
- assert u2.join(a2).onclause.compare(u2.c.id==a2.c.user_id)
+ assert [c.parent for c in a2.c.user_id.foreign_keys] \
+ == [a2.c.user_id]
+ assert list(a2.c.user_id.foreign_keys)[0].parent \
+ is a2.c.user_id
+ assert u2.join(a2).onclause.compare(u2.c.id == a2.c.user_id)
+
finally:
meta.drop_all()
@testing.exclude('mysql', '<', (4, 1, 1), 'innodb funkiness')
def test_use_existing(self):
meta = MetaData(testing.db)
- users = Table('users', meta,
+ users = Table('users', meta,
+ Column('id', sa.Integer, primary_key=True),
+ Column('name', sa.String(30)),
+ test_needs_fk=True)
+ addresses = Table(
+ 'addresses',
+ meta,
Column('id', sa.Integer, primary_key=True),
- Column('name', sa.String(30)),
- test_needs_fk=True)
- addresses = Table('addresses', meta,
- Column('id', sa.Integer,primary_key=True),
Column('user_id', sa.Integer, sa.ForeignKey('users.id')),
Column('data', sa.String(100)),
- test_needs_fk=True)
-
+ test_needs_fk=True,
+ )
meta.create_all()
try:
meta2 = MetaData(testing.db)
- addresses = Table('addresses', meta2, Column('data', sa.Unicode), autoload=True)
+ addresses = Table('addresses', meta2, Column('data',
+ sa.Unicode), autoload=True)
try:
- users = Table('users', meta2, Column('name', sa.Unicode), autoload=True)
+ users = Table('users', meta2, Column('name',
+ sa.Unicode), autoload=True)
assert False
except sa.exc.InvalidRequestError, err:
- assert str(err) == "Table 'users' is already defined for this MetaData instance. Specify 'useexisting=True' to redefine options and columns on an existing Table object."
-
- users = Table('users', meta2, Column('name', sa.Unicode), autoload=True, useexisting=True)
+ assert str(err) \
+ == "Table 'users' is already defined for this "\
+ "MetaData instance. Specify 'useexisting=True' "\
+ "to redefine options and columns on an existing "\
+ "Table object."
+ users = Table('users', meta2, Column('name', sa.Unicode),
+ autoload=True, useexisting=True)
assert isinstance(users.c.name.type, sa.Unicode)
-
assert not users.quote
-
- users = Table('users', meta2, quote=True, autoload=True, useexisting=True)
+ users = Table('users', meta2, quote=True, autoload=True,
+ useexisting=True)
assert users.quote
-
finally:
meta.drop_all()
def test_pks_not_uniques(self):
- """test that primary key reflection not tripped up by unique indexes"""
+ """test that primary key reflection not tripped up by unique
+ indexes"""
testing.db.execute("""
CREATE TABLE book (
@@ -484,9 +509,10 @@ class ReflectionTest(TestBase, ComparesTables):
Column('slot', sa.String(128)),
)
- assert_raises_message(sa.exc.InvalidRequestError,
- "Could not find table 'pkgs' with which to generate a foreign key",
- metadata.create_all)
+ assert_raises_message(sa.exc.InvalidRequestError,
+ "Could not find table 'pkgs' with which "
+ "to generate a foreign key",
+ metadata.create_all)
def test_composite_pks(self):
"""test reflection of a composite primary key"""
@@ -531,58 +557,67 @@ class ReflectionTest(TestBase, ComparesTables):
Column('bar', sa.Integer),
Column('lala', sa.Integer),
Column('data', sa.String(50)),
- sa.ForeignKeyConstraint(['foo', 'bar', 'lala'], ['multi.multi_id', 'multi.multi_rev', 'multi.multi_hoho']),
+ sa.ForeignKeyConstraint(['foo', 'bar', 'lala'],
+ ['multi.multi_id', 'multi.multi_rev', 'multi.multi_hoho'
+ ]),
test_needs_fk=True,
)
meta.create_all()
try:
meta2 = MetaData()
- table = Table('multi', meta2, autoload=True, autoload_with=testing.db)
- table2 = Table('multi2', meta2, autoload=True, autoload_with=testing.db)
+ table = Table('multi', meta2, autoload=True,
+ autoload_with=testing.db)
+ table2 = Table('multi2', meta2, autoload=True,
+ autoload_with=testing.db)
self.assert_tables_equal(multi, table)
self.assert_tables_equal(multi2, table2)
j = sa.join(table, table2)
- self.assert_(sa.and_(table.c.multi_id==table2.c.foo, table.c.multi_rev==table2.c.bar, table.c.multi_hoho==table2.c.lala).compare(j.onclause))
+ self.assert_(sa.and_(table.c.multi_id == table2.c.foo,
+ table.c.multi_rev == table2.c.bar,
+ table.c.multi_hoho
+ == table2.c.lala).compare(j.onclause))
finally:
meta.drop_all()
@testing.crashes('oracle', 'FIXME: unknown, confirm not fails_on')
def test_reserved(self):
- # check a table that uses an SQL reserved name doesn't cause an error
+
+ # check a table that uses an SQL reserved name doesn't cause an
+ # error
+
meta = MetaData(testing.db)
- table_a = Table('select', meta,
- Column('not', sa.Integer, primary_key=True),
- Column('from', sa.String(12), nullable=False),
- sa.UniqueConstraint('from', name='when'))
+ table_a = Table('select', meta, Column('not', sa.Integer,
+ primary_key=True), Column('from',
+ sa.String(12), nullable=False),
+ sa.UniqueConstraint('from', name='when'))
sa.Index('where', table_a.c['from'])
- # There's currently no way to calculate identifier case normalization
- # in isolation, so...
+ # There's currently no way to calculate identifier case
+ # normalization in isolation, so...
+
if testing.against('firebird', 'oracle', 'maxdb'):
check_col = 'TRUE'
else:
check_col = 'true'
quoter = meta.bind.dialect.identifier_preparer.quote_identifier
-
- table_b = Table('false', meta,
- Column('create', sa.Integer, primary_key=True),
- Column('true', sa.Integer, sa.ForeignKey('select.not')),
- sa.CheckConstraint('%s <> 1' % quoter(check_col),
- name='limit'))
-
- table_c = Table('is', meta,
- Column('or', sa.Integer, nullable=False, primary_key=True),
- Column('join', sa.Integer, nullable=False, primary_key=True),
- sa.PrimaryKeyConstraint('or', 'join', name='to'))
-
+
+ table_b = Table('false', meta,
+ Column('create', sa.Integer, primary_key=True),
+ Column('true', sa.Integer,sa.ForeignKey('select.not')),
+ sa.CheckConstraint('%s <> 1'
+ % quoter(check_col), name='limit')
+ )
+
+ table_c = Table('is', meta,
+ Column('or', sa.Integer, nullable=False, primary_key=True),
+ Column('join', sa.Integer, nullable=False, primary_key=True),
+ sa.PrimaryKeyConstraint('or', 'join', name='to')
+ )
index_c = sa.Index('else', table_c.c.join)
-
meta.create_all()
-
index_c.drop()
-
meta2 = MetaData(testing.db)
try:
table_a2 = Table('select', meta2, autoload=True)
@@ -643,9 +678,9 @@ class ReflectionTest(TestBase, ComparesTables):
m8 = MetaData(reflect=True)
self.assert_(False)
except sa.exc.ArgumentError, e:
- self.assert_(
- e.args[0] ==
- "A bind must be supplied in conjunction with reflect=True")
+ self.assert_(e.args[0]
+ == 'A bind must be supplied in '
+ 'conjunction with reflect=True')
finally:
baseline.drop_all()
@@ -710,33 +745,40 @@ class ReflectionTest(TestBase, ComparesTables):
meta.drop_all()
class CreateDropTest(TestBase):
+
@classmethod
def setup_class(cls):
global metadata, users
metadata = MetaData()
- users = Table('users', metadata,
- Column('user_id', sa.Integer, sa.Sequence('user_id_seq', optional=True), primary_key=True),
- Column('user_name', sa.String(40)),
- )
-
+ users = Table('users', metadata,
+ Column('user_id', sa.Integer,
+ sa.Sequence('user_id_seq', optional=True),
+ primary_key=True),
+ Column('user_name',sa.String(40)))
+
addresses = Table('email_addresses', metadata,
- Column('address_id', sa.Integer, sa.Sequence('address_id_seq', optional=True), primary_key = True),
- Column('user_id', sa.Integer, sa.ForeignKey(users.c.user_id)),
- Column('email_address', sa.String(40)),
- )
-
- orders = Table('orders', metadata,
- Column('order_id', sa.Integer, sa.Sequence('order_id_seq', optional=True), primary_key = True),
- Column('user_id', sa.Integer, sa.ForeignKey(users.c.user_id)),
+ Column('address_id', sa.Integer,
+ sa.Sequence('address_id_seq', optional=True),
+ primary_key=True),
+ Column('user_id',
+ sa.Integer, sa.ForeignKey(users.c.user_id)),
+ Column('email_address', sa.String(40)))
+
+ orders = Table(
+ 'orders',
+ metadata,
+ Column('order_id', sa.Integer, sa.Sequence('order_id_seq',
+ optional=True), primary_key=True),
+ Column('user_id', sa.Integer,
+ sa.ForeignKey(users.c.user_id)),
Column('description', sa.String(50)),
Column('isopen', sa.Integer),
- )
-
- orderitems = Table('items', metadata,
- Column('item_id', sa.INT, sa.Sequence('items_id_seq', optional=True), primary_key = True),
- Column('order_id', sa.INT, sa.ForeignKey("orders")),
- Column('item_name', sa.VARCHAR(50)),
- )
+ )
+ orderitems = Table('items', metadata, Column('item_id', sa.INT,
+ sa.Sequence('items_id_seq', optional=True),
+ primary_key=True), Column('order_id',
+ sa.INT, sa.ForeignKey('orders')),
+ Column('item_name', sa.VARCHAR(50)))
def test_sorter( self ):
tables = metadata.sorted_tables
@@ -777,10 +819,14 @@ class CreateDropTest(TestBase):
def test_tablenames(self):
metadata.create_all(bind=testing.db)
- # we only check to see if all the explicitly created tables are there, rather than
- # assertEqual -- the test db could have "extra" tables if there is a misconfigured
- # template. (*cough* tsearch2 w/ the pg windows installer.)
- self.assert_(not set(metadata.tables) - set(testing.db.table_names()))
+
+ # we only check to see if all the explicitly created tables are
+ # there, rather than assertEqual -- the test db could have
+ # "extra" tables if there is a misconfigured template. (*cough*
+ # tsearch2 w/ the pg windows installer.)
+
+ self.assert_(not set(metadata.tables)
+ - set(testing.db.table_names()))
metadata.drop_all(bind=testing.db)
class SchemaManipulationTest(TestBase):
@@ -788,7 +834,9 @@ class SchemaManipulationTest(TestBase):
meta = MetaData()
users = Table('users', meta, Column('id', sa.Integer))
- addresses = Table('addresses', meta, Column('id', sa.Integer), Column('user_id', sa.Integer))
+ addresses = Table('addresses', meta,
+ Column('id', sa.Integer),
+ Column('user_id', sa.Integer))
fk = sa.ForeignKeyConstraint(['user_id'],[users.c.id])
@@ -798,35 +846,46 @@ class SchemaManipulationTest(TestBase):
assert addresses.constraints == set([addresses.primary_key, fk])
class UnicodeReflectionTest(TestBase):
+
@testing.requires.unicode_connections
def test_basic(self):
try:
- # the 'convert_unicode' should not get in the way of the reflection
- # process. reflecttable for oracle, postgresql (others?) expect non-unicode
- # strings in result sets/bind params
- bind = engines.utf8_engine(options={'convert_unicode':True})
- metadata = MetaData(bind)
+ # the 'convert_unicode' should not get in the way of the
+ # reflection process. reflecttable for oracle, postgresql
+ # (others?) expect non-unicode strings in result sets/bind
+ # params
+
+ bind = engines.utf8_engine(options={'convert_unicode'
+ : True})
+ metadata = MetaData(bind)
if testing.against('sybase', 'maxdb', 'oracle', 'mssql'):
names = set([u'plain'])
else:
- names = set([u'plain', u'Unit\u00e9ble', u'\u6e2c\u8a66'])
-
+ names = set([u'plain', u'Unit\u00e9ble', u'\u6e2c\u8a66'
+ ])
for name in names:
- Table(name, metadata, Column('id', sa.Integer, sa.Sequence(name + "_id_seq"),
- primary_key=True))
+ Table(name, metadata, Column('id', sa.Integer,
+ sa.Sequence(name + '_id_seq'), primary_key=True))
metadata.create_all()
-
reflected = set(bind.table_names())
+
# Jython 2.5 on Java 5 lacks unicodedata.normalize
- if not names.issubset(reflected) and hasattr(unicodedata, 'normalize'):
- # Python source files in the utf-8 coding seem to normalize
- # literals as NFC (and the above are explicitly NFC). Maybe
- # this database normalizes NFD on reflection.
- nfc = set([unicodedata.normalize('NFC', n) for n in names])
+
+ if not names.issubset(reflected) and hasattr(unicodedata,
+ 'normalize'):
+
+ # Python source files in the utf-8 coding seem to
+ # normalize literals as NFC (and the above are
+ # explicitly NFC). Maybe this database normalizes NFD
+ # on reflection.
+
+ nfc = set([unicodedata.normalize('NFC', n) for n in
+ names])
self.assert_(nfc == names)
- # Yep. But still ensure that bulk reflection and create/drop
- # work with either normalization.
+
+ # Yep. But still ensure that bulk reflection and
+ # create/drop work with either normalization.
r = MetaData(bind, reflect=True)
r.drop_all()
@@ -840,13 +899,12 @@ class SchemaTest(TestBase):
def test_iteration(self):
metadata = MetaData()
- table1 = Table('table1', metadata,
- Column('col1', sa.Integer, primary_key=True),
- schema='someschema')
- table2 = Table('table2', metadata,
- Column('col1', sa.Integer, primary_key=True),
- Column('col2', sa.Integer, sa.ForeignKey('someschema.table1.col1')),
- schema='someschema')
+ table1 = Table('table1', metadata, Column('col1', sa.Integer,
+ primary_key=True), schema='someschema')
+ table2 = Table('table2', metadata, Column('col1', sa.Integer,
+ primary_key=True), Column('col2', sa.Integer,
+ sa.ForeignKey('someschema.table1.col1')),
+ schema='someschema')
t1 = str(schema.CreateTable(table1).compile(bind=testing.db))
t2 = str(schema.CreateTable(table2).compile(bind=testing.db))
@@ -904,16 +962,17 @@ class HasSequenceTest(TestBase):
@testing.requires.sequences
def test_has_sequence(self):
metadata = MetaData()
- users = Table('users', metadata,
- Column('user_id', sa.Integer, sa.Sequence('user_id_seq'), primary_key=True),
- Column('user_name', sa.String(40)),
- )
+ users = Table('users', metadata, Column('user_id', sa.Integer,
+ sa.Sequence('user_id_seq'), primary_key=True),
+ Column('user_name', sa.String(40)))
metadata.create_all(bind=testing.db)
try:
- eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), True)
+ eq_(testing.db.dialect.has_sequence(testing.db,
+ 'user_id_seq'), True)
finally:
metadata.drop_all(bind=testing.db)
- eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), False)
+ eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'),
+ False)
@testing.requires.schemas
@testing.requires.sequences
@@ -923,14 +982,20 @@ class HasSequenceTest(TestBase):
s2 = sa.Sequence('user_id_seq')
testing.db.execute(schema.CreateSequence(s1))
testing.db.execute(schema.CreateSequence(s2))
- eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq', schema=test_schema), True)
- eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), True)
+ eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq',
+ schema=test_schema), True)
+ eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'),
+ True)
testing.db.execute(schema.DropSequence(s1))
- eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq', schema=test_schema), False)
- eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), True)
+ eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq',
+ schema=test_schema), False)
+ eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'),
+ True)
testing.db.execute(schema.DropSequence(s2))
- eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq', schema=test_schema), False)
- eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), False)
+ eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq',
+ schema=test_schema), False)
+ eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'),
+ False)
# Tests related to engine.reflection
@@ -1021,11 +1086,11 @@ class ReverseCasingReflectTest(TestBase, AssertsCompiledSQL):
@testing.requires.denormalized_names
def test_direct_quoting(self):
m = MetaData(testing.db)
- t = Table("weird_casing", m, autoload=True)
- self.assert_compile(
- t.select(),
- 'SELECT weird_casing.col1, weird_casing."Col2", weird_casing."col3" FROM weird_casing'
- )
+ t = Table('weird_casing', m, autoload=True)
+ self.assert_compile(t.select(),
+ 'SELECT weird_casing.col1, '
+ 'weird_casing."Col2", weird_casing."col3" '
+ 'FROM weird_casing')
class ComponentReflectionTest(TestBase):
@@ -1087,7 +1152,7 @@ class ComponentReflectionTest(TestBase):
def _test_get_columns(self, schema=None, table_type='table'):
meta = MetaData(testing.db)
- (users, addresses) = createTables(meta, schema)
+ users, addresses = createTables(meta, schema)
table_names = ['users', 'email_addresses']
meta.create_all()
if table_type == 'view':
@@ -1095,36 +1160,41 @@ class ComponentReflectionTest(TestBase):
table_names = ['users_v', 'email_addresses_v']
try:
insp = Inspector(meta.bind)
- for (table_name, table) in zip(table_names, (users, addresses)):
+ for table_name, table in zip(table_names, (users,
+ addresses)):
schema_name = schema
cols = insp.get_columns(table_name, schema=schema_name)
self.assert_(len(cols) > 0, len(cols))
+
# should be in order
- for (i, col) in enumerate(table.columns):
+
+ for i, col in enumerate(table.columns):
eq_(col.name, cols[i]['name'])
ctype = cols[i]['type'].__class__
ctype_def = col.type
if isinstance(ctype_def, sa.types.TypeEngine):
ctype_def = ctype_def.__class__
-
+
# Oracle returns Date for DateTime.
- if testing.against('oracle') \
- and ctype_def in (sql_types.Date, sql_types.DateTime):
- ctype_def = sql_types.Date
-
- # assert that the desired type and return type
- # share a base within one of the generic types.
- self.assert_(
- len(
- set(
- ctype.__mro__
- ).intersection(ctype_def.__mro__)
- .intersection([sql_types.Integer, sql_types.Numeric,
- sql_types.DateTime, sql_types.Date, sql_types.Time,
- sql_types.String, sql_types._Binary])
- ) > 0
- ,("%s(%s), %s(%s)" % (col.name, col.type, cols[i]['name'],
- ctype)))
+
+ if testing.against('oracle') and ctype_def \
+ in (sql_types.Date, sql_types.DateTime):
+ ctype_def = sql_types.Date
+
+ # assert that the desired type and return type share
+ # a base within one of the generic types.
+
+ self.assert_(len(set(ctype.__mro__).
+ intersection(ctype_def.__mro__).intersection([
+ sql_types.Integer,
+ sql_types.Numeric,
+ sql_types.DateTime,
+ sql_types.Date,
+ sql_types.Time,
+ sql_types.String,
+ sql_types._Binary,
+ ])) > 0, '%s(%s), %s(%s)' % (col.name,
+ col.type, cols[i]['name'], ctype))
finally:
if table_type == 'view':
dropViews(meta.bind, schema)