diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2008-02-10 23:39:09 +0000 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2008-02-10 23:39:09 +0000 |
commit | 90c572b513fb33cb5cd17134efb6018abc76bb1f (patch) | |
tree | cecf7e9a4a8550f84232bb6915e4d9b8debe0c00 | |
parent | 1ddf4af3552cf28739fb8ca1be7e521767cc034c (diff) | |
download | sqlalchemy-90c572b513fb33cb5cd17134efb6018abc76bb1f.tar.gz |
- Table columns and constraints can be overridden on a
an existing table (such as a table that was already
reflected) using the 'useexisting=True' flag, which now
takes into account the arguments passed along with it.
- fixed one element of [ticket:910]
- refactored reflection test
-rw-r--r-- | CHANGES | 5 | ||||
-rw-r--r-- | lib/sqlalchemy/schema.py | 70 | ||||
-rw-r--r-- | test/engine/metadata.py | 85 | ||||
-rw-r--r-- | test/engine/reflection.py | 443 | ||||
-rw-r--r-- | test/testlib/__init__.py | 4 | ||||
-rw-r--r-- | test/testlib/testing.py | 41 |
6 files changed, 345 insertions, 303 deletions
@@ -10,6 +10,11 @@ CHANGES MetaData instances and executed automatically when those objects are created and/or dropped. + - Table columns and constraints can be overridden on a + an existing table (such as a table that was already + reflected) using the 'useexisting=True' flag, which now + takes into account the arguments passed along with it. + - Added a callable-based DDL events interface, adds hooks before and after Tables and MetaData create and drop. diff --git a/lib/sqlalchemy/schema.py b/lib/sqlalchemy/schema.py index 83f282b24..3a2ff97a7 100644 --- a/lib/sqlalchemy/schema.py +++ b/lib/sqlalchemy/schema.py @@ -93,13 +93,14 @@ class _TableSingleton(expression._FigureVisitName): key = _get_table_key(name, schema) try: table = metadata.tables[key] - if args: - if not useexisting: - raise exceptions.ArgumentError("Table '%s' is already defined for this MetaData instance." % key) + if not useexisting and table._cant_override(*args, **kwargs): + raise exceptions.InvalidRequestError("Table '%s' is already defined for this MetaData instance. Specify 'useexisting=True' to redefine options and columns on an existing Table object." % key) + else: + table._init_existing(*args, **kwargs) return table except KeyError: if mustexist: - raise exceptions.ArgumentError("Table '%s' not defined" % (key)) + raise exceptions.InvalidRequestError("Table '%s' not defined" % (key)) try: return type.__call__(self, name, metadata, *args, **kwargs) except: @@ -201,26 +202,16 @@ class Table(SchemaItem, expression.TableClause): self.primary_key = PrimaryKeyConstraint() self._foreign_keys = util.OrderedSet() self.ddl_listeners = util.defaultdict(list) - self.quote = kwargs.pop('quote', False) - self.quote_schema = kwargs.pop('quote_schema', False) + self.kwargs = {} if self.schema is not None: self.fullname = "%s.%s" % (self.schema, self.name) else: self.fullname = self.name - self.owner = kwargs.pop('owner', None) - if kwargs.get('info'): - self._info = kwargs.pop('info') autoload = kwargs.pop('autoload', False) autoload_with = kwargs.pop('autoload_with', None) include_columns = kwargs.pop('include_columns', None) - # validate remaining kwargs that they all specify DB prefixes - if len([k for k in kwargs if not re.match(r'^(?:%s)_' % '|'.join(databases.__all__), k)]): - raise TypeError("Invalid argument(s) for Table: %s" % repr(kwargs.keys())) - - self.kwargs = kwargs - self._set_parent(metadata) # load column definitions from the database if 'autoload' is defined # we do it after the table is in the singleton dictionary to support @@ -233,8 +224,44 @@ class Table(SchemaItem, expression.TableClause): # initialize all the column, etc. objects. done after # reflection to allow user-overrides - self._init_items(*args) + self.__post_init(*args, **kwargs) + + def _init_existing(self, *args, **kwargs): + autoload = kwargs.pop('autoload', False) + autoload_with = kwargs.pop('autoload_with', None) + schema = kwargs.pop('schema', None) + if schema and schema != self.schema: + raise exceptions.ArgumentError("Can't change schema of existing table from '%s' to '%s'", (self.schema, schema)) + + include_columns = kwargs.pop('include_columns', None) + if include_columns: + for c in self.c: + if c.name not in include_columns: + self.c.remove(c) + self.__post_init(*args, **kwargs) + + def _cant_override(self, *args, **kwargs): + """return True if the given arguments cannot be sent to a table that already exists. + + the 'useexisting' flag overrides this. + """ + + return bool(args) or bool(util.Set(kwargs).difference(['autoload', 'autoload_with', 'schema'])) + + def __post_init(self, *args, **kwargs): + self.quote = kwargs.pop('quote', False) + self.quote_schema = kwargs.pop('quote_schema', False) + self.owner = kwargs.pop('owner', None) + if kwargs.get('info'): + self._info = kwargs.pop('info') + # validate remaining kwargs that they all specify DB prefixes + if len([k for k in kwargs if not re.match(r'^(?:%s)_' % '|'.join(databases.__all__), k)]): + raise TypeError("Invalid argument(s) for Table: %s" % repr(kwargs.keys())) + self.kwargs.update(kwargs) + + self._init_items(*args) + def key(self): return _get_table_key(self.name, self.schema) key = property(key) @@ -916,9 +943,10 @@ class ForeignKeyConstraint(Constraint): def _set_parent(self, table): self.table = table - table.constraints.add(self) - for (c, r) in zip(self.__colnames, self.__refcolnames): - self.append_element(c,r) + if self not in table.constraints: + table.constraints.add(self) + for (c, r) in zip(self.__colnames, self.__refcolnames): + self.append_element(c,r) def append_element(self, col, refcol): fk = ForeignKey(refcol, constraint=self, name=self.name, onupdate=self.onupdate, ondelete=self.ondelete, use_alter=self.use_alter) @@ -948,8 +976,8 @@ class PrimaryKeyConstraint(Constraint): def _set_parent(self, table): self.table = table table.primary_key = self - for c in self.__colnames: - self.add(table.c[c]) + for name in self.__colnames: + self.add(table.c[name]) def add(self, col): self.columns.add(col) diff --git a/test/engine/metadata.py b/test/engine/metadata.py index 923f0334e..dd4ee51d3 100644 --- a/test/engine/metadata.py +++ b/test/engine/metadata.py @@ -2,8 +2,9 @@ import testenv; testenv.configure_for_tests() from sqlalchemy import * from sqlalchemy import exceptions from testlib import * +import pickle -class MetaDataTest(PersistTest): +class MetaDataTest(PersistTest, ComparesTables): def test_metadata_connect(self): metadata = MetaData() t1 = Table('table1', metadata, Column('col1', Integer, primary_key=True), @@ -29,10 +30,88 @@ class MetaDataTest(PersistTest): t2 = Table('table1', metadata, Column('col1', Integer, primary_key=True), Column('col2', String(20))) assert False - except exceptions.ArgumentError, e: - assert str(e) == "Table 'table1' is already defined for this MetaData instance." + except exceptions.InvalidRequestError, e: + assert str(e) == "Table 'table1' is already defined for this MetaData instance. Specify 'useexisting=True' to redefine options and columns on an existing Table object." finally: metadata.drop_all() + @testing.exclude('mysql', '<', (4, 1, 1)) + def test_to_metadata(self): + meta = MetaData() + + table = Table('mytable', meta, + Column('myid', Integer, primary_key=True), + Column('name', String(40), nullable=False), + Column('description', String(30), CheckConstraint("description='hi'")), + UniqueConstraint('name'), + test_needs_fk=True, + ) + + table2 = Table('othertable', meta, + Column('id', Integer, primary_key=True), + Column('myid', Integer, ForeignKey('mytable.myid')), + test_needs_fk=True, + ) + + def test_to_metadata(): + meta2 = MetaData() + table_c = table.tometadata(meta2) + table2_c = table2.tometadata(meta2) + return (table_c, table2_c) + + def test_pickle(): + meta.bind = testing.db + meta2 = pickle.loads(pickle.dumps(meta)) + assert meta2.bind is None + meta3 = pickle.loads(pickle.dumps(meta2)) + return (meta2.tables['mytable'], meta2.tables['othertable']) + + def test_pickle_via_reflect(): + # this is the most common use case, pickling the results of a + # database reflection + meta2 = MetaData(bind=testing.db) + t1 = Table('mytable', meta2, autoload=True) + t2 = Table('othertable', meta2, autoload=True) + meta3 = pickle.loads(pickle.dumps(meta2)) + assert meta3.bind is None + assert meta3.tables['mytable'] is not t1 + return (meta3.tables['mytable'], meta3.tables['othertable']) + + meta.create_all(testing.db) + try: + for test, has_constraints in ((test_to_metadata, True), (test_pickle, True), (test_pickle_via_reflect, False)): + table_c, table2_c = test() + self.assert_tables_equal(table, table_c) + self.assert_tables_equal(table2, table2_c) + + assert table is not table_c + assert table.primary_key is not table_c.primary_key + assert list(table2_c.c.myid.foreign_keys)[0].column is table_c.c.myid + assert list(table2_c.c.myid.foreign_keys)[0].column is not table.c.myid + + # constraints dont get reflected for any dialect right now + if has_constraints: + for c in table_c.c.description.constraints: + if isinstance(c, CheckConstraint): + break + else: + assert False + assert c.sqltext=="description='hi'" + + for c in table_c.constraints: + if isinstance(c, UniqueConstraint): + break + else: + assert False + assert c.columns.contains_column(table_c.c.name) + assert not c.columns.contains_column(table.c.name) + finally: + meta.drop_all(testing.db) + + def test_nonexistent(self): + self.assertRaises(exceptions.NoSuchTableError, Table, + 'fake_table', + MetaData(testing.db), autoload=True) + if __name__ == '__main__': testenv.main() diff --git a/test/engine/reflection.py b/test/engine/reflection.py index 2bbbea1aa..3aa6630eb 100644 --- a/test/engine/reflection.py +++ b/test/engine/reflection.py @@ -1,5 +1,5 @@ import testenv; testenv.configure_for_tests() -import pickle, StringIO, unicodedata +import StringIO, unicodedata from sqlalchemy import * from sqlalchemy import exceptions from sqlalchemy import types as sqltypes @@ -7,54 +7,25 @@ from testlib import * from testlib import engines -class ReflectionTest(PersistTest): +class ReflectionTest(PersistTest, ComparesTables): @testing.exclude('mysql', '<', (4, 1, 1)) - def testbasic(self): - use_function_defaults = testing.against('postgres', 'oracle', 'maxdb') - - use_string_defaults = (use_function_defaults or - testing.db.engine.__module__.endswith('sqlite')) - - if use_function_defaults: - defval = func.current_date() - deftype = Date - else: - defval = "3" - deftype = Integer - - if use_string_defaults: - deftype2 = Text - defval2 = "im a default" - deftype3 = Date - if testing.against('oracle'): - defval3 = text("to_date('09-09-1999', 'MM-DD-YYYY')") - elif testing.against('maxdb'): - defval3 = '19990909' - else: - defval3 = '1999-09-09' - else: - deftype2, deftype3 = Integer, Integer - defval2, defval3 = "15", "16" - + def test_basic_reflection(self): meta = MetaData(testing.db) users = Table('engine_users', meta, - Column('user_id', INT, primary_key = True), - Column('user_name', VARCHAR(20), nullable = False), - Column('test1', CHAR(5), nullable = False), - Column('test2', FLOAT(5), nullable = False), + Column('user_id', INT, primary_key=True), + Column('user_name', VARCHAR(20), nullable=False), + Column('test1', CHAR(5), nullable=False), + Column('test2', Float(5), nullable=False), Column('test3', Text), - Column('test4', DECIMAL, nullable = False), - Column('test5', TIMESTAMP), + Column('test4', Numeric, nullable = False), + Column('test5', DateTime), Column('parent_user_id', Integer, ForeignKey('engine_users.user_id')), - Column('test6', DateTime, nullable = False), + Column('test6', DateTime, nullable=False), Column('test7', Text), Column('test8', Binary), - Column('test_passivedefault', deftype, PassiveDefault(defval)), Column('test_passivedefault2', Integer, PassiveDefault("5")), - Column('test_passivedefault3', deftype2, PassiveDefault(defval2)), - Column('test_passivedefault4', deftype3, PassiveDefault(defval3)), Column('test9', Binary(100)), Column('test10', Boolean), Column('test_numeric', Numeric(None, None)), @@ -67,119 +38,41 @@ class ReflectionTest(PersistTest): Column('email_address', String(20)), test_needs_fk=True, ) - meta.drop_all() - - users.create() - addresses.create() - - # clear out table registry - meta.clear() - - try: - addresses = Table('engine_email_addresses', meta, autoload = True) - # reference the addresses foreign key col, which will require users to be - # reflected at some point - users = Table('engine_users', meta, autoload = True) - assert users.c.user_id in users.primary_key - assert len(users.primary_key) == 1 - finally: - addresses.drop() - users.drop() - - # a hack to remove the defaults we got reflecting from postgres - # SERIAL columns, since they reference sequences that were just dropped. - # PG 8.1 doesnt want to create them if the underlying sequence doesnt exist - users.c.user_id.default = None - addresses.c.address_id.default = None + meta.create_all() - users.create() - addresses.create() try: - print users - print addresses - j = join(users, addresses) - print str(j.onclause) - self.assert_((users.c.user_id==addresses.c.remote_user_id).compare(j.onclause)) + meta2 = MetaData() + reflected_users = Table('engine_users', meta2, autoload=True, autoload_with=testing.db) + reflected_addresses = Table('engine_email_addresses', meta2, autoload=True, autoload_with=testing.db) + self.assert_tables_equal(users, reflected_users) + self.assert_tables_equal(addresses, reflected_addresses) finally: addresses.drop() users.drop() - def test_autoload_partial(self): + def test_include_columns(self): meta = MetaData(testing.db) - foo = Table('foo', meta, - Column('a', String(30)), - Column('b', String(30)), - Column('c', String(30)), - Column('d', String(30)), - Column('e', String(30)), - Column('f', String(30)), - ) + foo = Table('foo', meta, *[Column(n, String(30)) for n in ['a', 'b', 'c', 'd', 'e', 'f']]) meta.create_all() try: meta2 = MetaData(testing.db) - foo2 = Table('foo', meta2, autoload=True, include_columns=['b', 'f', 'e']) + foo = Table('foo', meta2, autoload=True, include_columns=['b', 'f', 'e']) # test that cols come back in original order - assert [c.name for c in foo2.c] == ['b', 'e', 'f'] + self.assertEquals([c.name for c in foo.c], ['b', 'e', 'f']) for c in ('b', 'f', 'e'): - assert c in foo2.c + assert c in foo.c for c in ('a', 'c', 'd'): - assert c not in foo2.c - finally: - meta.drop_all() - - def test_override_create_fkcols(self): - """test that you can override columns and create new foreign keys to other reflected tables - which have no foreign keys. this is common with MySQL MyISAM tables.""" - - meta = MetaData(testing.db) - users = Table('users', meta, - Column('id', Integer, primary_key=True), - Column('name', String(30))) - addresses = Table('addresses', meta, - Column('id', Integer, primary_key=True), - Column('street', String(30)), - Column('user_id', Integer)) - - meta.create_all() - try: - meta2 = MetaData(testing.db) - a2 = Table('addresses', meta2, - Column('user_id', Integer, ForeignKey('users.id')), - autoload=True) - u2 = Table('users', meta2, autoload=True) - - assert len(a2.c.user_id.foreign_keys) == 1 - assert len(a2.foreign_keys) == 1 - assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id] - assert [c.parent for c in a2.c.user_id.foreign_keys] == [a2.c.user_id] - assert list(a2.c.user_id.foreign_keys)[0].parent is a2.c.user_id - assert u2.join(a2).onclause == u2.c.id==a2.c.user_id - + assert c not in foo.c + + # test against a table which is already reflected meta3 = MetaData(testing.db) - u3 = Table('users', meta3, autoload=True) - a3 = Table('addresses', meta3, - Column('user_id', Integer, ForeignKey('users.id')), - autoload=True) - - assert u3.join(a3).onclause == u3.c.id==a3.c.user_id - - meta4 = MetaData(testing.db) - u4 = Table('users', meta4, - Column('id', Integer, key='u_id', primary_key=True), - autoload=True) - a4 = Table('addresses', meta4, - Column('id', Integer, key='street', primary_key=True), - Column('street', String(30), key='user_id'), - Column('user_id', Integer, ForeignKey('users.u_id'), - key='id'), - autoload=True) - - assert u4.join(a4).onclause.compare(u4.c.u_id==a4.c.id) - assert list(u4.primary_key) == [u4.c.u_id] - assert len(u4.columns) == 2 - assert len(u4.constraints) == 1 - assert len(a4.columns) == 3 - assert len(a4.constraints) == 2 + foo = Table('foo', meta3, autoload=True) + foo = Table('foo', meta3, include_columns=['b', 'f', 'e'], useexisting=True) + self.assertEquals([c.name for c in foo.c], ['b', 'e', 'f']) + for c in ('b', 'f', 'e'): + assert c in foo.c + for c in ('a', 'c', 'd'): + assert c not in foo.c finally: meta.drop_all() @@ -221,9 +114,33 @@ class ReflectionTest(PersistTest): dialect_module.ischema_names = ischema_names t.drop() - def test_override_fkandpkcol(self): + def test_basic_override(self): + meta = MetaData(testing.db) + table = Table( + 'override_test', meta, + Column('col1', Integer, primary_key=True), + Column('col2', String(20)), + Column('col3', Numeric) + ) + table.create() + + meta2 = MetaData(testing.db) + try: + table = Table( + 'override_test', meta2, + Column('col2', Unicode()), + Column('col4', String(30)), autoload=True) + + self.assert_(isinstance(table.c.col1.type, Integer)) + self.assert_(isinstance(table.c.col2.type, Unicode)) + self.assert_(isinstance(table.c.col4.type, String)) + finally: + table.drop() + + def test_override_pkfk(self): """test that you can override columns which contain foreign keys to other reflected tables, where the foreign key column is also a primary key column""" + meta = MetaData(testing.db) users = Table('users', meta, Column('id', Integer, primary_key=True), @@ -237,7 +154,7 @@ class ReflectionTest(PersistTest): try: meta2 = MetaData(testing.db) a2 = Table('addresses', meta2, - Column('id', Integer, ForeignKey('users.id'), primary_key=True, ), + Column('id', Integer, ForeignKey('users.id'), primary_key=True), autoload=True) u2 = Table('users', meta2, autoload=True) @@ -245,15 +162,6 @@ class ReflectionTest(PersistTest): assert list(u2.primary_key) == [u2.c.id] assert u2.join(a2).onclause == u2.c.id==a2.c.id - # heres what was originally failing, because a2's primary key - # had two "id" columns, one of which was not part of a2's "c" collection - #class Address(object):pass - #mapper(Address, a2) - #add1 = Address() - #sess = create_session() - #sess.save(add1) - #sess.flush() - meta3 = MetaData(testing.db) u3 = Table('users', meta3, autoload=True) a3 = Table('addresses', meta3, @@ -267,7 +175,63 @@ class ReflectionTest(PersistTest): finally: meta.drop_all() - def test_override_existing_fkcols(self): + def test_override_nonexistent_fk(self): + """test that you can override columns and create new foreign keys to other reflected tables + which have no foreign keys. this is common with MySQL MyISAM tables.""" + + meta = MetaData(testing.db) + users = Table('users', meta, + Column('id', Integer, primary_key=True), + Column('name', String(30))) + addresses = Table('addresses', meta, + Column('id', Integer, primary_key=True), + Column('street', String(30)), + Column('user_id', Integer)) + + meta.create_all() + try: + meta2 = MetaData(testing.db) + a2 = Table('addresses', meta2, + Column('user_id', Integer, ForeignKey('users.id')), + autoload=True) + u2 = Table('users', meta2, autoload=True) + + assert len(a2.c.user_id.foreign_keys) == 1 + assert len(a2.foreign_keys) == 1 + assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id] + assert [c.parent for c in a2.c.user_id.foreign_keys] == [a2.c.user_id] + assert list(a2.c.user_id.foreign_keys)[0].parent is a2.c.user_id + assert u2.join(a2).onclause == u2.c.id==a2.c.user_id + + meta3 = MetaData(testing.db) + u3 = Table('users', meta3, autoload=True) + a3 = Table('addresses', meta3, + Column('user_id', Integer, ForeignKey('users.id')), + autoload=True) + + assert u3.join(a3).onclause == u3.c.id==a3.c.user_id + + meta4 = MetaData(testing.db) + u4 = Table('users', meta4, + Column('id', Integer, key='u_id', primary_key=True), + autoload=True) + a4 = Table('addresses', meta4, + Column('id', Integer, key='street', primary_key=True), + Column('street', String(30), key='user_id'), + Column('user_id', Integer, ForeignKey('users.u_id'), + key='id'), + autoload=True) + + assert u4.join(a4).onclause.compare(u4.c.u_id==a4.c.id) + assert list(u4.primary_key) == [u4.c.u_id] + assert len(u4.columns) == 2 + assert len(u4.constraints) == 1 + assert len(a4.columns) == 3 + assert len(a4.constraints) == 2 + finally: + meta.drop_all() + + def test_override_existing_fk(self): """test that you can override columns and specify new foreign keys to other reflected tables, on columns which *do* already have that foreign key, and that the FK is not duped. """ @@ -282,7 +246,6 @@ class ReflectionTest(PersistTest): Column('user_id', Integer, ForeignKey('users.id')), test_needs_fk=True) - meta.create_all() try: meta2 = MetaData(testing.db) @@ -315,13 +278,45 @@ class ReflectionTest(PersistTest): assert [c.parent for c in a2.c.user_id.foreign_keys] == [a2.c.user_id] assert list(a2.c.user_id.foreign_keys)[0].parent is a2.c.user_id assert u2.join(a2).onclause == u2.c.id==a2.c.user_id - - finally: meta.drop_all() + + def test_use_existing(self): + meta = MetaData(testing.db) + users = Table('users', meta, + Column('id', Integer, primary_key=True), + Column('name', String(30)), + test_needs_fk=True) + addresses = Table('addresses', meta, + Column('id', Integer,primary_key=True), + Column('user_id', Integer, ForeignKey('users.id')), + Column('data', String(100)), + test_needs_fk=True) + meta.create_all() + try: + meta2 = MetaData(testing.db) + addresses = Table('addresses', meta2, Column('data', Unicode), autoload=True) + try: + users = Table('users', meta2, Column('name', Unicode), autoload=True) + assert False + except exceptions.InvalidRequestError, err: + assert str(err) == "Table 'users' is already defined for this MetaData instance. Specify 'useexisting=True' to redefine options and columns on an existing Table object." + + users = Table('users', meta2, Column('name', Unicode), autoload=True, useexisting=True) + assert isinstance(users.c.name.type, Unicode) + + assert not users.quote + + users = Table('users', meta2, quote=True, autoload=True, useexisting=True) + assert users.quote + + finally: + meta.drop_all() + def test_pks_not_uniques(self): """test that primary key reflection not tripped up by unique indexes""" + testing.db.execute(""" CREATE TABLE book ( id INTEGER NOT NULL, @@ -355,6 +350,7 @@ class ReflectionTest(PersistTest): def test_composite_pks(self): """test reflection of a composite primary key""" + testing.db.execute(""" CREATE TABLE book ( id INTEGER NOT NULL, @@ -380,7 +376,7 @@ class ReflectionTest(PersistTest): """test reflection of composite foreign keys""" meta = MetaData(testing.db) - table = Table( + multi = Table( 'multi', meta, Column('multi_id', Integer, primary_key=True), Column('multi_rev', Integer, primary_key=True), @@ -389,7 +385,7 @@ class ReflectionTest(PersistTest): Column('val', String(100)), test_needs_fk=True, ) - table2 = Table('multi2', meta, + multi2 = Table('multi2', meta, Column('id', Integer, primary_key=True), Column('foo', Integer), Column('bar', Integer), @@ -398,128 +394,19 @@ class ReflectionTest(PersistTest): ForeignKeyConstraint(['foo', 'bar', 'lala'], ['multi.multi_id', 'multi.multi_rev', 'multi.multi_hoho']), test_needs_fk=True, ) - assert table.c.multi_hoho meta.create_all() - meta.clear() try: - table = Table('multi', meta, autoload=True) - table2 = Table('multi2', meta, autoload=True) - - print table - print table2 + meta2 = MetaData() + table = Table('multi', meta2, autoload=True, autoload_with=testing.db) + table2 = Table('multi2', meta2, autoload=True, autoload_with=testing.db) + self.assert_tables_equal(multi, table) + self.assert_tables_equal(multi2, table2) j = join(table, table2) - print str(j.onclause) self.assert_(and_(table.c.multi_id==table2.c.foo, table.c.multi_rev==table2.c.bar, table.c.multi_hoho==table2.c.lala).compare(j.onclause)) - finally: meta.drop_all() - @testing.exclude('mysql', '<', (4, 1, 1)) - def test_to_metadata(self): - meta = MetaData() - - table = Table('mytable', meta, - Column('myid', Integer, primary_key=True), - Column('name', String(40), nullable=False), - Column('description', String(30), CheckConstraint("description='hi'")), - UniqueConstraint('name'), - test_needs_fk=True, - ) - - table2 = Table('othertable', meta, - Column('id', Integer, primary_key=True), - Column('myid', Integer, ForeignKey('mytable.myid')), - test_needs_fk=True, - ) - - def test_to_metadata(): - meta2 = MetaData() - table_c = table.tometadata(meta2) - table2_c = table2.tometadata(meta2) - return (table_c, table2_c) - - def test_pickle(): - meta.bind = testing.db - meta2 = pickle.loads(pickle.dumps(meta)) - assert meta2.bind is None - meta3 = pickle.loads(pickle.dumps(meta2)) - return (meta2.tables['mytable'], meta2.tables['othertable']) - - def test_pickle_via_reflect(): - # this is the most common use case, pickling the results of a - # database reflection - meta2 = MetaData(bind=testing.db) - t1 = Table('mytable', meta2, autoload=True) - t2 = Table('othertable', meta2, autoload=True) - meta3 = pickle.loads(pickle.dumps(meta2)) - assert meta3.bind is None - assert meta3.tables['mytable'] is not t1 - return (meta3.tables['mytable'], meta3.tables['othertable']) - - meta.create_all(testing.db) - try: - for test, has_constraints in ((test_to_metadata, True), (test_pickle, True), (test_pickle_via_reflect, False)): - table_c, table2_c = test() - assert table is not table_c - assert table_c.c.myid.primary_key - assert isinstance(table_c.c.myid.type, Integer) - assert isinstance(table_c.c.name.type, String) - assert not table_c.c.name.nullable - assert table_c.c.description.nullable - assert table.primary_key is not table_c.primary_key - assert [x.name for x in table.primary_key] == [x.name for x in table_c.primary_key] - assert list(table2_c.c.myid.foreign_keys)[0].column is table_c.c.myid - assert list(table2_c.c.myid.foreign_keys)[0].column is not table.c.myid - - # constraints dont get reflected for any dialect right now - if has_constraints: - for c in table_c.c.description.constraints: - if isinstance(c, CheckConstraint): - break - else: - assert False - assert c.sqltext=="description='hi'" - - for c in table_c.constraints: - if isinstance(c, UniqueConstraint): - break - else: - assert False - assert c.columns.contains_column(table_c.c.name) - assert not c.columns.contains_column(table.c.name) - finally: - meta.drop_all(testing.db) - - def test_nonexistent(self): - self.assertRaises(exceptions.NoSuchTableError, Table, - 'fake_table', - MetaData(testing.db), autoload=True) - - def testoverride(self): - meta = MetaData(testing.db) - table = Table( - 'override_test', meta, - Column('col1', Integer, primary_key=True), - Column('col2', String(20)), - Column('col3', Numeric) - ) - table.create() - # clear out table registry - - meta2 = MetaData(testing.db) - try: - table = Table( - 'override_test', meta2, - Column('col2', Unicode()), - Column('col4', String(30)), autoload=True) - - print repr(table) - self.assert_(isinstance(table.c.col1.type, Integer)) - self.assert_(isinstance(table.c.col2.type, Unicode)) - self.assert_(isinstance(table.c.col4.type, String)) - finally: - table.drop() @testing.unsupported('oracle') def testreserved(self): @@ -552,7 +439,6 @@ class ReflectionTest(PersistTest): index_c = Index('else', table_c.c.join) - #meta.bind.echo = True meta.create_all() index_c.drop() @@ -701,7 +587,21 @@ class CreateDropTest(PersistTest): self.assert_(not Set(metadata.tables) - Set(testing.db.table_names())) metadata.drop_all(bind=testing.db) -class UnicodeTest(PersistTest): +class SchemaManipulationTest(PersistTest): + def test_append_constraint_unique(self): + meta = MetaData() + + users = Table('users', meta, Column('id', Integer)) + addresses = Table('addresses', meta, Column('id', Integer), Column('user_id', Integer)) + + fk = ForeignKeyConstraint(['user_id'],[users.c.id]) + + addresses.append_constraint(fk) + addresses.append_constraint(fk) + assert len(addresses.c.user_id.foreign_keys) == 1 + assert addresses.constraints == set([addresses.primary_key, fk]) + +class UnicodeReflectionTest(PersistTest): def test_basic(self): try: @@ -740,7 +640,6 @@ class UnicodeTest(PersistTest): class SchemaTest(PersistTest): - # this test should really be in the sql tests somewhere, not engine def test_iteration(self): metadata = MetaData() table1 = Table('table1', metadata, diff --git a/test/testlib/__init__.py b/test/testlib/__init__.py index 49ef0ca8a..1cb3647e2 100644 --- a/test/testlib/__init__.py +++ b/test/testlib/__init__.py @@ -8,7 +8,7 @@ from testlib.schema import Table, Column from testlib.orm import mapper import testlib.testing as testing from testlib.testing import rowset -from testlib.testing import PersistTest, AssertMixin, ORMTest, SQLCompileTest +from testlib.testing import PersistTest, AssertMixin, ORMTest, SQLCompileTest, ComparesTables import testlib.profiling as profiling import testlib.engines as engines from testlib.compat import set, frozenset, sorted, _function_named @@ -18,6 +18,6 @@ __all__ = ('testing', 'mapper', 'Table', 'Column', 'rowset', - 'PersistTest', 'AssertMixin', 'ORMTest', 'SQLCompileTest', + 'PersistTest', 'AssertMixin', 'ORMTest', 'SQLCompileTest', 'ComparesTables', 'profiling', 'engines', 'set', 'frozenset', 'sorted', '_function_named') diff --git a/test/testlib/testing.py b/test/testlib/testing.py index 3b7b2992e..5c37f95bd 100644 --- a/test/testlib/testing.py +++ b/test/testlib/testing.py @@ -7,10 +7,10 @@ from cStringIO import StringIO import testlib.config as config from testlib.compat import * -sql, MetaData, clear_mappers, Session, util = None, None, None, None, None +sql, sqltypes, schema, MetaData, clear_mappers, Session, util = None, None, None, None, None, None, None sa_exceptions = None -__all__ = ('PersistTest', 'AssertMixin', 'ORMTest', 'SQLCompileTest') +__all__ = ('PersistTest', 'AssertMixin', 'ComparesTables', 'ORMTest', 'SQLCompileTest') _ops = { '<': operator.lt, '>': operator.gt, @@ -485,10 +485,41 @@ class SQLCompileTest(PersistTest): if checkparams is not None: self.assertEquals(c.construct_params(params), checkparams) +class ComparesTables(object): + def assert_tables_equal(self, table, reflected_table): + global sqltypes, schema + if sqltypes is None: + import sqlalchemy.types as sqltypes + if schema is None: + import sqlalchemy.schema as schema + base_mro = sqltypes.TypeEngine.__mro__ + assert len(table.c) == len(reflected_table.c) + for c, reflected_c in zip(table.c, reflected_table.c): + self.assertEquals(c.name, reflected_c.name) + assert reflected_c is reflected_table.c[c.name] + self.assertEquals(c.primary_key, reflected_c.primary_key) + self.assertEquals(c.nullable, reflected_c.nullable) + assert len( + set(type(reflected_c.type).__mro__).difference(base_mro).intersection( + set(type(c.type).__mro__).difference(base_mro) + ) + ) > 0, "Type '%s' doesn't correspond to type '%s'" % (reflected_c.type, c.type) + + if isinstance(c.type, sqltypes.String): + self.assertEquals(c.type.length, reflected_c.type.length) + + self.assertEquals(set([f.column.name for f in c.foreign_keys]), set([f.column.name for f in reflected_c.foreign_keys])) + if c.default: + assert isinstance(reflected_c.default, schema.PassiveDefault) + elif not c.primary_key or not against('postgres'): + assert reflected_c.default is None + + assert len(table.primary_key) == len(reflected_table.primary_key) + for c in table.primary_key: + assert reflected_table.primary_key.columns[c.name] + + class AssertMixin(PersistTest): - """given a list-based structure of keys/properties which represent information within an object structure, and - a list of actual objects, asserts that the list of objects corresponds to the structure.""" - def assert_result(self, result, class_, *objects): result = list(result) print repr(result) |