author | Jason Kirtland <jek@discorporate.us> | 2007-12-13 09:59:14 +0000
committer | Jason Kirtland <jek@discorporate.us> | 2007-12-13 09:59:14 +0000
commit | 8128a6378affeff76b573b1b4ca1e05e7d00b021 (patch)
tree | b0d20234152eb56026d509ea4b205ed086bc742a
parent | 2522534311452325513606d765ae398ce8514e2c (diff)
download | sqlalchemy-8128a6378affeff76b573b1b4ca1e05e7d00b021.tar.gz
- Removed @testing.supported. Dialects in development or maintained outside
the tree can now run the full suite of tests out of the box.
- Migrated most @supported usages to @fails_on, @fails_on_everything_except, or
(as a last resort) @unsupported. @fails_on revealed a slew of bogus test skips,
which have been corrected.
- Added @fails_on_everything_except. Yes, the first usage *was*
"fails_on_everything_except('postgres')". How did you guess!
- Migrated @supported in dialect/* to the new test-class attribute __only_on__.
- Test classes can also have __unsupported_on__ and __excluded_on__. (A short
before/after sketch of the new markers follows the commit message.)
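
For context, here is a condensed before/after sketch of the migration described above, assembled from hunks in the diff below. Class and method bodies are elided, the `import testbase` / `from testlib import *` lines mirror what the test modules themselves use, and the commented `__unsupported_on__` / `__excluded_on__` values are assumptions — this diff does not show a concrete usage of those two attributes.

```python
# Condensed from this commit's diff; bodies elided with "..." .
import testbase
from testlib import *

# Before: every fixture and test method carried its own dialect guard.
class DomainReflectionTest(AssertMixin):
    """Test Firebird domains"""

    @testing.supported('firebird')
    def setUpAll(self):
        ...

    @testing.supported('firebird')
    def test_table_is_reflected(self):
        ...

# After: one class-level attribute gates the whole test class.
class DomainReflectionTest(AssertMixin):
    """Test Firebird domains"""

    __only_on__ = 'firebird'
    # Also recognized on test classes (value formats are assumptions here):
    # __unsupported_on__ = ('access', 'sybase')
    # __excluded_on__ = (('mysql', '<', (5, 0, 3)),)

    def setUpAll(self):
        ...

    def test_table_is_reflected(self):
        ...

# Method-level markers replace @supported where a test should still run on
# every dialect and merely be reported as an expected failure on some.
class ExecuteTest(PersistTest):
    @testing.fails_on_everything_except('sqlite', 'maxdb')
    def test_raw_qmark(self):
        ...

    @testing.fails_on('postgres', 'mysql', 'firebird', 'mssql')
    def test_aggregate_2(self):   # name borrowed from generative.py for illustration
        ...

    @testing.unsupported('sqlite')   # last resort: don't run the test at all
    def testrollback_deadlock(self):
        ...
```

The practical effect is that an out-of-tree dialect no longer has to be whitelisted test by test: tests that genuinely cannot run are gated by the class-level attributes, while backend-specific failures are recorded via @fails_on / @fails_on_everything_except instead of being silently skipped.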
-rw-r--r-- | test/dialect/firebird.py | 15
-rw-r--r-- | test/dialect/maxdb.py | 18
-rwxr-xr-x | test/dialect/mssql.py | 141
-rw-r--r-- | test/dialect/mysql.py | 25
-rw-r--r-- | test/dialect/oracle.py | 55
-rw-r--r-- | test/dialect/postgres.py | 191
-rw-r--r-- | test/dialect/sqlite.py | 29
-rw-r--r-- | test/engine/execute.py | 16
-rw-r--r-- | test/engine/reflection.py | 53
-rw-r--r-- | test/engine/transaction.py | 147
-rw-r--r-- | test/orm/generative.py | 23
-rw-r--r-- | test/orm/inheritance/manytomany.py | 7
-rw-r--r-- | test/orm/manytomany.py | 13
-rw-r--r-- | test/orm/relationships.py | 153
-rw-r--r-- | test/orm/session.py | 229
-rw-r--r-- | test/orm/unitofwork.py | 353
-rw-r--r-- | test/profiling/zoomark.py | 105
-rw-r--r-- | test/sql/defaults.py | 129
-rw-r--r-- | test/sql/functions.py | 29
-rw-r--r-- | test/sql/query.py | 68
-rw-r--r-- | test/testlib/testing.py | 150
21 files changed, 1024 insertions, 925 deletions
diff --git a/test/dialect/firebird.py b/test/dialect/firebird.py index 2c527d372..de1f92793 100644 --- a/test/dialect/firebird.py +++ b/test/dialect/firebird.py @@ -5,17 +5,12 @@ from sqlalchemy.exceptions import ProgrammingError from sqlalchemy.sql import table, column from testlib import * -class BasicTest(AssertMixin): - # A simple import of the database/ module should work on all systems. - def test_import(self): - # we got this far, right? - return True - class DomainReflectionTest(AssertMixin): "Test Firebird domains" - @testing.supported('firebird') + __only_on__ = 'firebird' + def setUpAll(self): con = testbase.db.connect() try: @@ -34,7 +29,6 @@ class DomainReflectionTest(AssertMixin): t time, dt timestamp)''') - @testing.supported('firebird') def tearDownAll(self): con = testbase.db.connect() con.execute('DROP TABLE testtable') @@ -43,7 +37,6 @@ class DomainReflectionTest(AssertMixin): con.execute('DROP DOMAIN rem_domain') con.execute('DROP DOMAIN img_domain') - @testing.supported('firebird') def test_table_is_reflected(self): metadata = MetaData(testbase.db) table = Table('testtable', metadata, autoload=True) @@ -74,11 +67,11 @@ class CompileTest(SQLCompileTest): self.assert_compile(func.foo(1, 2), "foo(:foo_1, :foo_2)") self.assert_compile(func.current_time(), "CURRENT_TIME") self.assert_compile(func.foo(), "foo") - + m = MetaData() t = Table('sometable', m, Column('col1', Integer), Column('col2', Integer)) self.assert_compile(select([func.max(t.c.col1)]), "SELECT max(sometable.col1) FROM sometable") - + if __name__ == '__main__': testbase.main() diff --git a/test/dialect/maxdb.py b/test/dialect/maxdb.py index 336986744..5397fbba6 100644 --- a/test/dialect/maxdb.py +++ b/test/dialect/maxdb.py @@ -18,6 +18,8 @@ from testlib import * class ReflectionTest(AssertMixin): """Extra reflection tests.""" + __only_on__ = 'maxdb' + def _test_decimal(self, tabledef): """Checks a variety of FIXED usages. 
@@ -54,7 +56,6 @@ class ReflectionTest(AssertMixin): except exceptions.DatabaseError: pass - @testing.supported('maxdb') def test_decimal_fixed_serial(self): tabledef = """ CREATE TABLE dectest ( @@ -66,7 +67,6 @@ class ReflectionTest(AssertMixin): """ return self._test_decimal(tabledef) - @testing.supported('maxdb') def test_decimal_integer_serial(self): tabledef = """ CREATE TABLE dectest ( @@ -78,7 +78,6 @@ class ReflectionTest(AssertMixin): """ return self._test_decimal(tabledef) - @testing.supported('maxdb') def test_decimal_implicit_serial(self): tabledef = """ CREATE TABLE dectest ( @@ -90,7 +89,6 @@ class ReflectionTest(AssertMixin): """ return self._test_decimal(tabledef) - @testing.supported('maxdb') def test_decimal_smallint_serial(self): tabledef = """ CREATE TABLE dectest ( @@ -102,7 +100,6 @@ class ReflectionTest(AssertMixin): """ return self._test_decimal(tabledef) - @testing.supported('maxdb') def test_decimal_sa_types_1(self): tabledef = Table('dectest', MetaData(), Column('id', Integer, primary_key=True), @@ -112,7 +109,6 @@ class ReflectionTest(AssertMixin): Column('i1', Integer)) return self._test_decimal(tabledef) - @testing.supported('maxdb') def test_decimal_sa_types_2(self): tabledef = Table('dectest', MetaData(), Column('id', Integer, primary_key=True), @@ -122,7 +118,6 @@ class ReflectionTest(AssertMixin): Column('i1', Integer)) return self._test_decimal(tabledef) - @testing.supported('maxdb') def test_decimal_sa_types_3(self): tabledef = Table('dectest', MetaData(), Column('id', Integer, primary_key=True), @@ -132,7 +127,6 @@ class ReflectionTest(AssertMixin): Column('i1', Integer)) return self._test_decimal(tabledef) - @testing.supported('maxdb') def test_assorted_type_aliases(self): """Ensures that aliased types are reflected properly.""" @@ -179,8 +173,9 @@ class DBAPITest(AssertMixin): If any of these fail, that's good- the bug is fixed! """ - - @testing.supported('maxdb') + + __only_on__ = 'maxdb' + def test_dbapi_breaks_sequences(self): con = testbase.db.connect().connection @@ -199,7 +194,6 @@ class DBAPITest(AssertMixin): finally: cr.execute('DROP SEQUENCE busto') - @testing.supported('maxdb') def test_dbapi_breaks_mod_binds(self): con = testbase.db.connect().connection @@ -217,7 +211,6 @@ class DBAPITest(AssertMixin): # OK cr.execute('SELECT MOD(?, 2) FROM DUAL', [3]) - @testing.supported('maxdb') def test_dbapi_breaks_close(self): dialect = testbase.db.dialect cargs, ckw = dialect.create_connect_args(testbase.db.url) @@ -238,7 +231,6 @@ class DBAPITest(AssertMixin): except dialect.dbapi.DatabaseError: self.assert_(True) - @testing.supported('maxdb') def test_modulo_operator(self): st = str(select([sql.column('col') % 5]).compile(testbase.db)) self.assertEquals(st, 'SELECT mod(col, ?) 
FROM DUAL') diff --git a/test/dialect/mssql.py b/test/dialect/mssql.py index 3207abaa3..00b0db9cc 100755 --- a/test/dialect/mssql.py +++ b/test/dialect/mssql.py @@ -9,7 +9,7 @@ from testlib import * class CompileTest(SQLCompileTest): __dialect__ = mssql.MSSQLDialect() - + def test_insert(self): t = table('sometable', column('somecolumn')) self.assert_compile(t.insert(), "INSERT INTO sometable (somecolumn) VALUES (:somecolumn)") @@ -21,9 +21,9 @@ class CompileTest(SQLCompileTest): def test_count(self): t = table('sometable', column('somecolumn')) self.assert_compile(t.count(), "SELECT count(sometable.somecolumn) AS tbl_row_count FROM sometable") - + def test_union(self): - t1 = table('t1', + t1 = table('t1', column('col1'), column('col2'), column('col3'), @@ -34,11 +34,11 @@ class CompileTest(SQLCompileTest): column('col2'), column('col3'), column('col4')) - + (s1, s2) = ( select([t1.c.col3.label('col3'), t1.c.col4.label('col4')], t1.c.col2.in_(["t1col2r1", "t1col2r2"])), select([t2.c.col3.label('col3'), t2.c.col4.label('col4')], t2.c.col2.in_(["t2col2r2", "t2col2r3"])) - ) + ) u = union(s1, s2, order_by=['col3', 'col4']) self.assert_compile(u, "SELECT t1.col3 AS col3, t1.col4 AS col4 FROM t1 WHERE t1.col2 IN (:t1_col2_1, :t1_col2_2) "\ "UNION SELECT t2.col3 AS col3, t2.col4 AS col4 FROM t2 WHERE t2.col2 IN (:t2_col2_1, :t2_col2_2) ORDER BY col3, col4") @@ -54,6 +54,135 @@ class CompileTest(SQLCompileTest): m = MetaData() t = Table('sometable', m, Column('col1', Integer), Column('col2', Integer)) self.assert_compile(select([func.max(t.c.col1)]), "SELECT max(sometable.col1) AS max_1 FROM sometable") - + +class ReflectionTest(PersistTest): + __only_on__ = 'mssql' + + def testidentity(self): + meta = MetaData(testbase.db) + table = Table( + 'identity_test', meta, + Column('col1', Integer, Sequence('fred', 2, 3), primary_key=True) + ) + table.create() + + meta2 = MetaData(testbase.db) + try: + table2 = Table('identity_test', meta2, autoload=True) + assert table2.c['col1'].sequence.start == 2 + assert table2.c['col1'].sequence.increment == 3 + finally: + table.drop() + + +class QueryTest(PersistTest): + __only_on__ = 'mssql' + + def test_fetchid_trigger(self): + meta = MetaData(testbase.db) + t1 = Table('t1', meta, + Column('id', Integer, Sequence('fred', 100, 1), primary_key=True), + Column('descr', String(200))) + t2 = Table('t2', meta, + Column('id', Integer, Sequence('fred', 200, 1), primary_key=True), + Column('descr', String(200))) + meta.create_all() + con = testbase.db.connect() + con.execute("""create trigger paj on t1 for insert as + insert into t2 (descr) select descr from inserted""") + + try: + tr = con.begin() + r = con.execute(t2.insert(), descr='hello') + self.assert_(r.last_inserted_ids() == [200]) + r = con.execute(t1.insert(), descr='hello') + self.assert_(r.last_inserted_ids() == [100]) + + finally: + tr.commit() + con.execute("""drop trigger paj""") + meta.drop_all() + + def test_insertid_schema(self): + meta = MetaData(testbase.db) + con = testbase.db.connect() + con.execute('create schema paj') + tbl = Table('test', meta, Column('id', Integer, primary_key=True), schema='paj') + tbl.create() + try: + tbl.insert().execute({'id':1}) + finally: + tbl.drop() + con.execute('drop schema paj') + + def test_insertid_reserved(self): + meta = MetaData(testbase.db) + table = Table( + 'select', meta, + Column('col', Integer, primary_key=True) + ) + table.create() + + meta2 = MetaData(testbase.db) + try: + table.insert().execute(col=7) + finally: + table.drop() + + def 
test_select_limit_nooffset(self): + metadata = MetaData(testbase.db) + + users = Table('query_users', metadata, + Column('user_id', INT, primary_key = True), + Column('user_name', VARCHAR(20)), + ) + addresses = Table('query_addresses', metadata, + Column('address_id', Integer, primary_key=True), + Column('user_id', Integer, ForeignKey('query_users.user_id')), + Column('address', String(30))) + metadata.create_all() + + try: + try: + r = users.select(limit=3, offset=2, + order_by=[users.c.user_id]).execute().fetchall() + assert False # InvalidRequestError should have been raised + except exceptions.InvalidRequestError: + pass + finally: + metadata.drop_all() + +class GenerativeQueryTest(PersistTest): + __only_on__ = 'mssql' + + def setUpAll(self): + global foo, metadata + metadata = MetaData(testbase.db) + foo = Table('foo', metadata, + Column('id', Integer, Sequence('foo_id_seq'), + primary_key=True), + Column('bar', Integer), + Column('range', Integer)) + + mapper(Foo, foo) + metadata.create_all() + + sess = create_session(bind=testbase.db) + for i in range(100): + sess.save(Foo(bar=i, range=i%10)) + sess.flush() + + def tearDownAll(self): + metadata.drop_all() + clear_mappers() + + def test_slice_mssql(self): + sess = create_session(bind=testbase.db) + query = sess.query(Foo) + orig = query.all() + assert list(query[:10]) == orig[:10] + assert list(query[:10]) == orig[:10] + + if __name__ == "__main__": testbase.main() diff --git a/test/dialect/mysql.py b/test/dialect/mysql.py index a5e2e5687..11c721e1d 100644 --- a/test/dialect/mysql.py +++ b/test/dialect/mysql.py @@ -9,7 +9,8 @@ from testlib import * class TypesTest(AssertMixin): "Test MySQL column types" - @testing.supported('mysql') + __only_on__ = 'mysql' + def test_basic(self): meta1 = MetaData(testbase.db) table = Table( @@ -43,7 +44,6 @@ class TypesTest(AssertMixin): finally: meta1.drop_all() - @testing.supported('mysql') def test_numeric(self): "Exercise type specification and options for numeric types." 
@@ -177,7 +177,6 @@ class TypesTest(AssertMixin): raise numeric_table.drop() - @testing.supported('mysql') @testing.exclude('mysql', '<', (4, 1, 1)) def test_charset(self): """Exercise CHARACTER SET and COLLATE-ish options on string types.""" @@ -262,7 +261,6 @@ class TypesTest(AssertMixin): raise charset_table.drop() - @testing.supported('mysql') @testing.exclude('mysql', '<', (5, 0, 5)) def test_bit_50(self): """Exercise BIT types on 5.0+ (not valid for all engine types)""" @@ -326,7 +324,6 @@ class TypesTest(AssertMixin): finally: meta.drop_all() - @testing.supported('mysql') def test_boolean(self): """Test BOOL/TINYINT(1) compatability and reflection.""" @@ -384,7 +381,6 @@ class TypesTest(AssertMixin): finally: meta.drop_all() - @testing.supported('mysql') @testing.exclude('mysql', '<', (4, 1, 0)) def test_timestamp(self): """Exercise funky TIMESTAMP default syntax.""" @@ -428,7 +424,6 @@ class TypesTest(AssertMixin): finally: meta.drop_all() - @testing.supported('mysql') def test_year(self): """Exercise YEAR.""" @@ -459,7 +454,6 @@ class TypesTest(AssertMixin): meta.drop_all() - @testing.supported('mysql') def test_set(self): """Exercise the SET type.""" @@ -517,7 +511,6 @@ class TypesTest(AssertMixin): finally: meta.drop_all() - @testing.supported('mysql') def test_enum(self): """Exercise the ENUM type.""" @@ -586,7 +579,6 @@ class TypesTest(AssertMixin): self.assert_eq(res, expected) enum_table.drop() - @testing.supported('mysql') @testing.exclude('mysql', '>', (3)) def test_enum_parse(self): """More exercises for the ENUM type.""" @@ -616,7 +608,6 @@ class TypesTest(AssertMixin): finally: enum_table.drop() - @testing.supported('mysql') @testing.exclude('mysql', '<', (5, 0, 0)) def test_type_reflection(self): # (ask_for, roundtripped_as_if_different) @@ -679,7 +670,6 @@ class TypesTest(AssertMixin): finally: m.drop_all() - @testing.supported('mysql') def test_autoincrement(self): meta = MetaData(testbase.db) try: @@ -750,11 +740,11 @@ class TypesTest(AssertMixin): class SQLTest(SQLCompileTest): """Tests MySQL-dialect specific compilation.""" - __dialect__ = testbase.db.dialect - @testing.supported('mysql') + __dialect__ = mysql.dialect() + def test_precolumns(self): - dialect = testbase.db.dialect + dialect = self.__dialect__ def gen(distinct=None, prefixes=None): kw = {} @@ -785,7 +775,6 @@ class SQLTest(SQLCompileTest): gen(True, ['high_priority', sql.text('sql_cache')]), 'SELECT high_priority sql_cache DISTINCT q') - @testing.supported('mysql') def test_limit(self): t = sql.table('t', sql.column('col1'), sql.column('col2')) @@ -801,7 +790,6 @@ class SQLTest(SQLCompileTest): "SELECT t.col1, t.col2 FROM t LIMIT 10, 18446744073709551615" ) - @testing.supported('mysql') def test_update_limit(self): t = sql.table('t', sql.column('col1'), sql.column('col2')) @@ -822,7 +810,6 @@ class SQLTest(SQLCompileTest): "UPDATE t SET col1=%s WHERE t.col2 = %s LIMIT 1" ) - @testing.supported('mysql') def test_cast(self): t = sql.table('t', sql.column('col')) m = mysql @@ -907,7 +894,7 @@ class SQLTest(SQLCompileTest): (m.MSEnum, "t.col"), (m.MSEnum("'1'", "'2'"), "t.col"), - (m.MSSet, "t.col"), + (m.MSSet, "t.col"), (m.MSSet("'1'", "'2'"), "t.col"), ] diff --git a/test/dialect/oracle.py b/test/dialect/oracle.py index df8066bdb..c9c19f045 100644 --- a/test/dialect/oracle.py +++ b/test/dialect/oracle.py @@ -2,12 +2,12 @@ import testbase from sqlalchemy import * from sqlalchemy.sql import table, column from sqlalchemy.databases import oracle - from testlib import * class OutParamTest(AssertMixin): - 
@testing.supported('oracle') + __only_on__ = 'oracle' + def setUpAll(self): testbase.db.execute(""" create or replace procedure foo(x_in IN number, x_out OUT number, y_out OUT number) IS @@ -19,42 +19,40 @@ create or replace procedure foo(x_in IN number, x_out OUT number, y_out OUT numb end; """) - @testing.supported('oracle') def test_out_params(self): result = testbase.db.execute(text("begin foo(:x, :y, :z); end;", bindparams=[bindparam('x', Numeric), outparam('y', Numeric), outparam('z', Numeric)]), x=5) assert result.out_parameters == {'y':10, 'z':75}, result.out_parameters print result.out_parameters - @testing.supported('oracle') def tearDownAll(self): testbase.db.execute("DROP PROCEDURE foo") class CompileTest(SQLCompileTest): __dialect__ = oracle.OracleDialect() - + def test_subquery(self): t = table('sometable', column('col1'), column('col2')) s = select([t]) s = select([s.c.col1, s.c.col2]) - + self.assert_compile(s, "SELECT col1, col2 FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2 FROM sometable)") def test_limit(self): t = table('sometable', column('col1'), column('col2')) - + s = select([t]).limit(10).offset(20) - + self.assert_compile(s, "SELECT col1, col2 FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2, " "ROW_NUMBER() OVER (ORDER BY sometable.rowid) AS ora_rn FROM sometable) WHERE ora_rn>20 AND ora_rn<=30" ) - + s = select([s.c.col1, s.c.col2]) - + self.assert_compile(s, "SELECT col1, col2 FROM (SELECT col1, col2 FROM (SELECT sometable.col1 AS col1, " "sometable.col2 AS col2, ROW_NUMBER() OVER (ORDER BY sometable.rowid) AS ora_rn FROM sometable) WHERE ora_rn>20 AND ora_rn<=30)") - # testing this twice to ensure oracle doesn't modify the original statement + # testing this twice to ensure oracle doesn't modify the original statement self.assert_compile(s, "SELECT col1, col2 FROM (SELECT col1, col2 FROM (SELECT sometable.col1 AS col1, " "sometable.col2 AS col2, ROW_NUMBER() OVER (ORDER BY sometable.rowid) AS ora_rn FROM sometable) WHERE ora_rn>20 AND ora_rn<=30)") @@ -64,20 +62,20 @@ class CompileTest(SQLCompileTest): "sometable.col2 AS col2, ROW_NUMBER() OVER (ORDER BY sometable.col2) AS ora_rn FROM sometable ORDER BY sometable.col2) WHERE ora_rn>20 AND ora_rn<=30") def test_outer_join(self): - table1 = table('mytable', + table1 = table('mytable', column('myid', Integer), column('name', String), column('description', String), ) table2 = table( - 'myothertable', + 'myothertable', column('otherid', Integer), column('othername', String), ) table3 = table( - 'thirdtable', + 'thirdtable', column('userid', Integer), column('otherstuff', String), ) @@ -92,7 +90,7 @@ class CompileTest(SQLCompileTest): ), from_obj = [ outerjoin(table1, table2, table1.c.myid == table2.c.otherid) ] ) - self.assert_compile(query, + self.assert_compile(query, "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \ FROM mytable, myothertable WHERE mytable.myid = myothertable.otherid(+) AND \ (mytable.name = :mytable_name_1 OR mytable.myid = :mytable_myid_1 OR \ @@ -101,11 +99,11 @@ myothertable.othername != :myothertable_othername_1 OR EXISTS (select yay from f query = table1.outerjoin(table2, table1.c.myid==table2.c.otherid).outerjoin(table3, table3.c.userid==table2.c.otherid) self.assert_compile(query.select(), "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable LEFT OUTER JOIN myothertable ON mytable.myid = myothertable.otherid 
LEFT OUTER JOIN thirdtable ON thirdtable.userid = myothertable.otherid") - self.assert_compile(query.select(), "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable, myothertable, thirdtable WHERE mytable.myid = myothertable.otherid(+) AND thirdtable.userid(+) = myothertable.otherid", dialect=oracle.dialect(use_ansi=False)) + self.assert_compile(query.select(), "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable, myothertable, thirdtable WHERE mytable.myid = myothertable.otherid(+) AND thirdtable.userid(+) = myothertable.otherid", dialect=oracle.dialect(use_ansi=False)) query = table1.join(table2, table1.c.myid==table2.c.otherid).join(table3, table3.c.userid==table2.c.otherid) - self.assert_compile(query.select(), "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable, myothertable, thirdtable WHERE mytable.myid = myothertable.otherid AND thirdtable.userid = myothertable.otherid", dialect=oracle.dialect(use_ansi=False)) - + self.assert_compile(query.select(), "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable, myothertable, thirdtable WHERE mytable.myid = myothertable.otherid AND thirdtable.userid = myothertable.otherid", dialect=oracle.dialect(use_ansi=False)) + def test_alias_outer_join(self): address_types = table('address_types', column('id'), @@ -118,7 +116,7 @@ myothertable.othername != :myothertable_othername_1 OR EXISTS (select yay from f column('email_address') ) at_alias = address_types.alias() - + s = select([at_alias, addresses]).\ select_from(addresses.outerjoin(at_alias, addresses.c.address_type_id==at_alias.c.id)).\ where(addresses.c.user_id==7).\ @@ -129,25 +127,26 @@ myothertable.othername != :myothertable_othername_1 OR EXISTS (select yay from f "address_types.rowid") class TypesTest(SQLCompileTest): + __only_on__ = 'oracle' + def test_no_clobs_for_string_params(self): """test that simple string params get a DBAPI type of VARCHAR, not CLOB. 
- this is to prevent setinputsizes from setting up cx_oracle.CLOBs on + this is to prevent setinputsizes from setting up cx_oracle.CLOBs on string-based bind params [ticket:793].""" - + class FakeDBAPI(object): def __getattr__(self, attr): return attr dialect = oracle.OracleDialect() dbapi = FakeDBAPI() - + b = bindparam("foo", "hello world!") assert b.type.dialect_impl(dialect).get_dbapi_type(dbapi) == 'STRING' b = bindparam("foo", u"hello world!") assert b.type.dialect_impl(dialect).get_dbapi_type(dbapi) == 'STRING' - - @testing.supported('oracle') + def test_longstring(self): metadata = MetaData(testbase.db) testbase.db.execute(""" @@ -155,7 +154,7 @@ class TypesTest(SQLCompileTest): ( ID NUMERIC(22) PRIMARY KEY, ADD_USER VARCHAR2(20) NOT NULL - ) + ) """) try: t = Table("z_test", metadata, autoload=True) @@ -163,7 +162,7 @@ class TypesTest(SQLCompileTest): assert t.select().execute().fetchall() == [(1, 'foobar')] finally: testbase.db.execute("DROP TABLE Z_TEST") - + class SequenceTest(SQLCompileTest): def test_basic(self): seq = Sequence("my_seq_no_schema") @@ -175,7 +174,7 @@ class SequenceTest(SQLCompileTest): seq = Sequence("My_Seq", schema="Some_Schema") assert dialect.identifier_preparer.format_sequence(seq) == '"Some_Schema"."My_Seq"' - - + + if __name__ == '__main__': testbase.main() diff --git a/test/dialect/postgres.py b/test/dialect/postgres.py index 4affabb6c..11e2c139e 100644 --- a/test/dialect/postgres.py +++ b/test/dialect/postgres.py @@ -22,46 +22,47 @@ class SequenceTest(SQLCompileTest): class CompileTest(SQLCompileTest): def test_update_returning(self): dialect = postgres.dialect() - table1 = table('mytable', + table1 = table('mytable', column('myid', Integer), column('name', String), column('description', String), ) - + u = update(table1, values=dict(name='foo'), postgres_returning=[table1.c.myid, table1.c.name]) self.assert_compile(u, "UPDATE mytable SET name=%(name)s RETURNING mytable.myid, mytable.name", dialect=dialect) - + u = update(table1, values=dict(name='foo'), postgres_returning=[table1]) self.assert_compile(u, "UPDATE mytable SET name=%(name)s "\ "RETURNING mytable.myid, mytable.name, mytable.description", dialect=dialect) - + u = update(table1, values=dict(name='foo'), postgres_returning=[func.length(table1.c.name)]) self.assert_compile(u, "UPDATE mytable SET name=%(name)s RETURNING length(mytable.name)", dialect=dialect) - + def test_insert_returning(self): dialect = postgres.dialect() - table1 = table('mytable', + table1 = table('mytable', column('myid', Integer), column('name', String), column('description', String), ) - + i = insert(table1, values=dict(name='foo'), postgres_returning=[table1.c.myid, table1.c.name]) self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) RETURNING mytable.myid, mytable.name", dialect=dialect) - + i = insert(table1, values=dict(name='foo'), postgres_returning=[table1]) self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) "\ "RETURNING mytable.myid, mytable.name, mytable.description", dialect=dialect) - + i = insert(table1, values=dict(name='foo'), postgres_returning=[func.length(table1.c.name)]) self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) RETURNING length(mytable.name)", dialect=dialect) class ReturningTest(AssertMixin): - @testing.supported('postgres') + __only_on__ = 'postgres' + @testing.exclude('postgres', '<', (8, 2)) def test_update_returning(self): meta = MetaData(testbase.db) - table = Table('tables', meta, + table = Table('tables', meta, Column('id', 
Integer, primary_key=True), Column('persons', Integer), Column('full', Boolean) @@ -69,20 +70,19 @@ class ReturningTest(AssertMixin): table.create() try: table.insert().execute([{'persons': 5, 'full': False}, {'persons': 3, 'full': False}]) - + result = table.update(table.c.persons > 4, dict(full=True), postgres_returning=[table.c.id]).execute() self.assertEqual(result.fetchall(), [(1,)]) - + result2 = select([table.c.id, table.c.full]).order_by(table.c.id).execute() self.assertEqual(result2.fetchall(), [(1,True),(2,False)]) finally: table.drop() - @testing.supported('postgres') @testing.exclude('postgres', '<', (8, 2)) def test_insert_returning(self): meta = MetaData(testbase.db) - table = Table('tables', meta, + table = Table('tables', meta, Column('id', Integer, primary_key=True), Column('persons', Integer), Column('full', Boolean) @@ -90,104 +90,99 @@ class ReturningTest(AssertMixin): table.create() try: result = table.insert(postgres_returning=[table.c.id]).execute({'persons': 1, 'full': False}) - + self.assertEqual(result.fetchall(), [(1,)]) - + # Multiple inserts only return the last row result2 = table.insert(postgres_returning=[table]).execute( [{'persons': 2, 'full': False}, {'persons': 3, 'full': True}]) - + self.assertEqual(result2.fetchall(), [(3,3,True)]) - + result3 = table.insert(postgres_returning=[(table.c.id*2).label('double_id')]).execute({'persons': 4, 'full': False}) self.assertEqual([dict(row) for row in result3], [{'double_id':8}]) - + result4 = testbase.db.execute('insert into tables (id, persons, "full") values (5, 10, true) returning persons') self.assertEqual([dict(row) for row in result4], [{'persons': 10}]) finally: table.drop() - - + + class InsertTest(AssertMixin): - @testing.supported('postgres') + __only_on__ = 'postgres' + def setUpAll(self): global metadata metadata = MetaData(testbase.db) - - @testing.supported('postgres') + def tearDown(self): metadata.drop_all() metadata.tables.clear() - - @testing.supported('postgres') + def test_compiled_insert(self): - table = Table('testtable', metadata, + table = Table('testtable', metadata, Column('id', Integer, primary_key=True), Column('data', String(30))) - + metadata.create_all() ins = table.insert(values={'data':bindparam('x')}).compile() ins.execute({'x':"five"}, {'x':"seven"}) assert table.select().execute().fetchall() == [(1, 'five'), (2, 'seven')] - - @testing.supported('postgres') + def test_sequence_insert(self): - table = Table('testtable', metadata, + table = Table('testtable', metadata, Column('id', Integer, Sequence('my_seq'), primary_key=True), Column('data', String(30))) metadata.create_all() self._assert_data_with_sequence(table, "my_seq") - @testing.supported('postgres') def test_opt_sequence_insert(self): - table = Table('testtable', metadata, + table = Table('testtable', metadata, Column('id', Integer, Sequence('my_seq', optional=True), primary_key=True), Column('data', String(30))) metadata.create_all() self._assert_data_autoincrement(table) - @testing.supported('postgres') def test_autoincrement_insert(self): - table = Table('testtable', metadata, + table = Table('testtable', metadata, Column('id', Integer, primary_key=True), Column('data', String(30))) metadata.create_all() self._assert_data_autoincrement(table) - @testing.supported('postgres') def test_noautoincrement_insert(self): - table = Table('testtable', metadata, + table = Table('testtable', metadata, Column('id', Integer, primary_key=True, autoincrement=False), Column('data', String(30))) metadata.create_all() 
self._assert_data_noautoincrement(table) - + def _assert_data_autoincrement(self, table): def go(): # execute with explicit id r = table.insert().execute({'id':30, 'data':'d1'}) assert r.last_inserted_ids() == [30] - + # execute with prefetch id r = table.insert().execute({'data':'d2'}) assert r.last_inserted_ids() == [1] - + # executemany with explicit ids table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}) - + # executemany, uses SERIAL table.insert().execute({'data':'d5'}, {'data':'d6'}) - + # single execute, explicit id, inline table.insert(inline=True).execute({'id':33, 'data':'d7'}) - + # single execute, inline, uses SERIAL table.insert(inline=True).execute({'data':'d8'}) - + # note that the test framework doesnt capture the "preexecute" of a seqeuence # or default. we just see it in the bind params. - + self.assert_sql(testbase.db, go, [], with_sequences=[ ( "INSERT INTO testtable (id, data) VALUES (:id, :data)", @@ -214,7 +209,7 @@ class InsertTest(AssertMixin): [{'data':'d8'}] ), ]) - + assert table.select().execute().fetchall() == [ (30, 'd1'), (1, 'd2'), @@ -227,7 +222,7 @@ class InsertTest(AssertMixin): ] table.delete().execute() - # test the same series of events using a reflected + # test the same series of events using a reflected # version of the table m2 = MetaData(testbase.db) table = Table(table.name, m2, autoload=True) @@ -240,7 +235,7 @@ class InsertTest(AssertMixin): table.insert().execute({'data':'d5'}, {'data':'d6'}) table.insert(inline=True).execute({'id':33, 'data':'d7'}) table.insert(inline=True).execute({'data':'d8'}) - + self.assert_sql(testbase.db, go, [], with_sequences=[ ( "INSERT INTO testtable (id, data) VALUES (:id, :data)", @@ -267,7 +262,7 @@ class InsertTest(AssertMixin): [{'data':'d8'}] ), ]) - + assert table.select().execute().fetchall() == [ (30, 'd1'), (5, 'd2'), @@ -279,7 +274,7 @@ class InsertTest(AssertMixin): (8, 'd8'), ] table.delete().execute() - + def _assert_data_with_sequence(self, table, seqname): def go(): table.insert().execute({'id':30, 'data':'d1'}) @@ -326,10 +321,10 @@ class InsertTest(AssertMixin): (33, 'd7'), (4, 'd8'), ] - - # cant test reflection here since the Sequence must be + + # cant test reflection here since the Sequence must be # explicitly specified - + def _assert_data_noautoincrement(self, table): table.insert().execute({'id':30, 'data':'d1'}) try: @@ -342,10 +337,10 @@ class InsertTest(AssertMixin): assert False except exceptions.IntegrityError, e: assert "violates not-null constraint" in str(e) - + table.insert().execute({'id':31, 'data':'d2'}, {'id':32, 'data':'d3'}) table.insert(inline=True).execute({'id':33, 'data':'d4'}) - + assert table.select().execute().fetchall() == [ (30, 'd1'), (31, 'd2'), @@ -354,7 +349,7 @@ class InsertTest(AssertMixin): ] table.delete().execute() - # test the same series of events using a reflected + # test the same series of events using a reflected # version of the table m2 = MetaData(testbase.db) table = Table(table.name, m2, autoload=True) @@ -369,21 +364,22 @@ class InsertTest(AssertMixin): assert False except exceptions.IntegrityError, e: assert "violates not-null constraint" in str(e) - + table.insert().execute({'id':31, 'data':'d2'}, {'id':32, 'data':'d3'}) table.insert(inline=True).execute({'id':33, 'data':'d4'}) - + assert table.select().execute().fetchall() == [ (30, 'd1'), (31, 'd2'), (32, 'd3'), (33, 'd4'), ] - + class DomainReflectionTest(AssertMixin): "Test PostgreSQL domains" - @testing.supported('postgres') + __only_on__ = 'postgres' + def 
setUpAll(self): con = testbase.db.connect() try: @@ -396,7 +392,6 @@ class DomainReflectionTest(AssertMixin): con.execute('CREATE TABLE alt_schema.testtable(question integer, answer alt_schema.testdomain, anything integer)') con.execute('CREATE TABLE crosschema (question integer, answer alt_schema.testdomain)') - @testing.supported('postgres') def tearDownAll(self): con = testbase.db.connect() con.execute('DROP TABLE testtable') @@ -405,35 +400,30 @@ class DomainReflectionTest(AssertMixin): con.execute('DROP DOMAIN testdomain') con.execute('DROP DOMAIN alt_schema.testdomain') - @testing.supported('postgres') def test_table_is_reflected(self): metadata = MetaData(testbase.db) table = Table('testtable', metadata, autoload=True) self.assertEquals(set(table.columns.keys()), set(['question', 'answer']), "Columns of reflected table didn't equal expected columns") self.assertEquals(table.c.answer.type.__class__, postgres.PGInteger) - - @testing.supported('postgres') + def test_domain_is_reflected(self): metadata = MetaData(testbase.db) table = Table('testtable', metadata, autoload=True) self.assertEquals(str(table.columns.answer.default.arg), '42', "Reflected default value didn't equal expected value") self.assertFalse(table.columns.answer.nullable, "Expected reflected column to not be nullable.") - @testing.supported('postgres') def test_table_is_reflected_alt_schema(self): metadata = MetaData(testbase.db) table = Table('testtable', metadata, autoload=True, schema='alt_schema') self.assertEquals(set(table.columns.keys()), set(['question', 'answer', 'anything']), "Columns of reflected table didn't equal expected columns") self.assertEquals(table.c.anything.type.__class__, postgres.PGInteger) - @testing.supported('postgres') def test_schema_domain_is_reflected(self): metadata = MetaData(testbase.db) table = Table('testtable', metadata, autoload=True, schema='alt_schema') self.assertEquals(str(table.columns.answer.default.arg), '0', "Reflected default value didn't equal expected value") self.assertTrue(table.columns.answer.nullable, "Expected reflected column to be nullable.") - @testing.supported('postgres') def test_crosschema_domain_is_reflected(self): metadata = MetaData(testbase.db) table = Table('crosschema', metadata, autoload=True) @@ -441,10 +431,11 @@ class DomainReflectionTest(AssertMixin): self.assertTrue(table.columns.answer.nullable, "Expected reflected column to be nullable.") class MiscTest(AssertMixin): - @testing.supported('postgres') + __only_on__ = 'postgres' + def test_date_reflection(self): m1 = MetaData(testbase.db) - t1 = Table('pgdate', m1, + t1 = Table('pgdate', m1, Column('date1', DateTime(timezone=True)), Column('date2', DateTime(timezone=False)) ) @@ -457,7 +448,6 @@ class MiscTest(AssertMixin): finally: m1.drop_all() - @testing.supported('postgres') def test_pg_weirdchar_reflection(self): meta1 = MetaData(testbase.db) subject = Table("subject", meta1, @@ -477,11 +467,10 @@ class MiscTest(AssertMixin): self.assert_((subject.c['id$']==referer.c.ref).compare(subject.join(referer).onclause)) finally: meta1.drop_all() - - @testing.supported('postgres') + def test_checksfor_sequence(self): meta1 = MetaData(testbase.db) - t = Table('mytable', meta1, + t = Table('mytable', meta1, Column('col1', Integer, Sequence('fooseq'))) try: testbase.db.execute("CREATE SEQUENCE fooseq") @@ -489,7 +478,6 @@ class MiscTest(AssertMixin): finally: t.drop(checkfirst=True) - @testing.supported('postgres') def test_distinct_on(self): t = Table('mytable', MetaData(testbase.db), Column('id', 
Integer, primary_key=True), @@ -507,7 +495,6 @@ class MiscTest(AssertMixin): 'SELECT DISTINCT ON (mytable.id, mytable.a) mytable.id, mytable.a \n' 'FROM mytable') - @testing.supported('postgres') def test_schema_reflection(self): """note: this test requires that the 'alt_schema' schema be separate and accessible by the test user""" @@ -538,7 +525,6 @@ class MiscTest(AssertMixin): finally: meta1.drop_all() - @testing.supported('postgres') def test_schema_reflection_2(self): meta1 = MetaData(testbase.db) subject = Table("subject", meta1, @@ -558,8 +544,7 @@ class MiscTest(AssertMixin): self.assert_((subject.c.id==referer.c.ref).compare(subject.join(referer).onclause)) finally: meta1.drop_all() - - @testing.supported('postgres') + def test_schema_reflection_3(self): meta1 = MetaData(testbase.db) subject = Table("subject", meta1, @@ -581,13 +566,12 @@ class MiscTest(AssertMixin): self.assert_((subject.c.id==referer.c.ref).compare(subject.join(referer).onclause)) finally: meta1.drop_all() - - @testing.supported('postgres') + def test_preexecute_passivedefault(self): - """test that when we get a primary key column back + """test that when we get a primary key column back from reflecting a table which has a default value on it, we pre-execute that PassiveDefault upon insert.""" - + try: meta = MetaData(testbase.db) testbase.db.execute(""" @@ -608,25 +592,29 @@ class MiscTest(AssertMixin): finally: testbase.db.execute("drop table speedy_users", None) - @testing.supported('postgres') def test_create_partial_index(self): tbl = Table('testtbl', MetaData(), Column('data',Integer)) idx = Index('test_idx1', tbl.c.data, postgres_where=and_(tbl.c.data > 5, tbl.c.data < 10)) - + executed_sql = [] mock_strategy = MockEngineStrategy() mock_conn = mock_strategy.create('postgres://', executed_sql.append) - + idx.create(mock_conn) - + assert executed_sql == ['CREATE INDEX test_idx1 ON testtbl (data) WHERE testtbl.data > 5 AND testtbl.data < 10'] class TimezoneTest(AssertMixin): - """test timezone-aware datetimes. psycopg will return a datetime with a tzinfo attached to it, - if postgres returns it. python then will not let you compare a datetime with a tzinfo to a datetime - that doesnt have one. this test illustrates two ways to have datetime types with and without timezone - info. """ - @testing.supported('postgres') + """Test timezone-aware datetimes. + + psycopg will return a datetime with a tzinfo attached to it, if postgres + returns it. python then will not let you compare a datetime with a tzinfo + to a datetime that doesnt have one. this test illustrates two ways to + have datetime types with and without timezone info. 
+ """ + + __only_on__ = 'postgres' + def setUpAll(self): global tztable, notztable, metadata metadata = MetaData(testbase.db) @@ -643,11 +631,9 @@ class TimezoneTest(AssertMixin): Column("name", String(20)), ) metadata.create_all() - @testing.supported('postgres') def tearDownAll(self): metadata.drop_all() - @testing.supported('postgres') def test_with_timezone(self): # get a date with a tzinfo somedate = testbase.db.connect().scalar(func.current_timestamp().select()) @@ -655,7 +641,6 @@ class TimezoneTest(AssertMixin): c = tztable.update(tztable.c.id==1).execute(name='newname') print tztable.select(tztable.c.id==1).execute().fetchone() - @testing.supported('postgres') def test_without_timezone(self): # get a date without a tzinfo somedate = datetime.datetime(2005, 10,20, 11, 52, 00) @@ -664,22 +649,21 @@ class TimezoneTest(AssertMixin): print notztable.select(tztable.c.id==1).execute().fetchone() class ArrayTest(AssertMixin): - @testing.supported('postgres') + __only_on__ = 'postgres' + def setUpAll(self): global metadata, arrtable metadata = MetaData(testbase.db) - + arrtable = Table('arrtable', metadata, Column('id', Integer, primary_key=True), Column('intarr', postgres.PGArray(Integer)), Column('strarr', postgres.PGArray(String), nullable=False) ) metadata.create_all() - @testing.supported('postgres') def tearDownAll(self): metadata.drop_all() - - @testing.supported('postgres') + def test_reflect_array_column(self): metadata2 = MetaData(testbase.db) tbl = Table('arrtable', metadata2, autoload=True) @@ -687,8 +671,7 @@ class ArrayTest(AssertMixin): self.assertTrue(isinstance(tbl.c.strarr.type, postgres.PGArray)) self.assertTrue(isinstance(tbl.c.intarr.type.item_type, Integer)) self.assertTrue(isinstance(tbl.c.strarr.type.item_type, String)) - - @testing.supported('postgres') + def test_insert_array(self): arrtable.insert().execute(intarr=[1,2,3], strarr=['abc', 'def']) results = arrtable.select().execute().fetchall() @@ -697,7 +680,6 @@ class ArrayTest(AssertMixin): self.assertEquals(results[0]['strarr'], ['abc','def']) arrtable.delete().execute() - @testing.supported('postgres') def test_array_where(self): arrtable.insert().execute(intarr=[1,2,3], strarr=['abc', 'def']) arrtable.insert().execute(intarr=[4,5,6], strarr='ABC') @@ -705,8 +687,7 @@ class ArrayTest(AssertMixin): self.assertEquals(len(results), 1) self.assertEquals(results[0]['intarr'], [1,2,3]) arrtable.delete().execute() - - @testing.supported('postgres') + def test_array_concat(self): arrtable.insert().execute(intarr=[1,2,3], strarr=['abc', 'def']) results = select([arrtable.c.intarr + [4,5,6]]).execute().fetchall() diff --git a/test/dialect/sqlite.py b/test/dialect/sqlite.py index f3eac38f9..726a27183 100644 --- a/test/dialect/sqlite.py +++ b/test/dialect/sqlite.py @@ -9,12 +9,13 @@ from testlib import * class TestTypes(AssertMixin): - @testing.supported('sqlite') + __only_on__ = 'sqlite' + def test_date(self): meta = MetaData(testbase.db) t = Table('testdate', meta, Column('id', Integer, primary_key=True), - Column('adate', Date), + Column('adate', Date), Column('adatetime', DateTime)) meta.create_all() try: @@ -22,17 +23,18 @@ class TestTypes(AssertMixin): d2 = datetime.datetime(2007, 10, 30) t.insert().execute(adate=str(d1), adatetime=str(d2)) - + self.assert_(t.select().execute().fetchall()[0] == (1, datetime.date(2007, 10, 30), datetime.datetime(2007, 10, 30))) - + finally: meta.drop_all() class DialectTest(AssertMixin): - @testing.supported('sqlite') + __only_on__ = 'sqlite' + def test_extra_reserved_words(self): 
"""Tests reserved words in identifiers. @@ -45,7 +47,7 @@ class DialectTest(AssertMixin): meta = MetaData(testbase.db) t = Table('reserved', meta, Column('safe', Integer), - Column('true', Integer), + Column('true', Integer), Column('false', Integer), Column('column', Integer)) @@ -56,7 +58,6 @@ class DialectTest(AssertMixin): finally: meta.drop_all() - @testing.supported('sqlite') def test_quoted_identifiers(self): """Tests autoload of tables created with quoted column names.""" @@ -89,6 +90,8 @@ class DialectTest(AssertMixin): class InsertTest(AssertMixin): """Tests inserts and autoincrement.""" + __only_on__ = 'sqlite' + # empty insert (i.e. INSERT INTO table DEFAULT VALUES) # fails as recently as sqlite 3.3.6. passes on 3.4.1. this syntax # is nowhere to be found in the sqlite3 documentation or changelog, so can't @@ -106,14 +109,12 @@ class InsertTest(AssertMixin): finally: table.drop() - @testing.supported('sqlite') @testing.exclude('sqlite', '<', (3, 4)) def test_empty_insert_pk1(self): self._test_empty_insert( Table('a', MetaData(testbase.db), Column('id', Integer, primary_key=True))) - @testing.supported('sqlite') @testing.exclude('sqlite', '<', (3, 4)) def test_empty_insert_pk2(self): self.assertRaises( @@ -123,7 +124,6 @@ class InsertTest(AssertMixin): Column('x', Integer, primary_key=True), Column('y', Integer, primary_key=True))) - @testing.supported('sqlite') @testing.exclude('sqlite', '<', (3, 4)) def test_empty_insert_pk3(self): self.assertRaises( @@ -134,7 +134,6 @@ class InsertTest(AssertMixin): Column('y', Integer, PassiveDefault('123'), primary_key=True))) - @testing.supported('sqlite') @testing.exclude('sqlite', '<', (3, 4)) def test_empty_insert_pk4(self): self._test_empty_insert( @@ -142,14 +141,12 @@ class InsertTest(AssertMixin): Column('x', Integer, primary_key=True), Column('y', Integer, PassiveDefault('123')))) - @testing.supported('sqlite') @testing.exclude('sqlite', '<', (3, 4)) def test_empty_insert_nopk1(self): self._test_empty_insert( Table('e', MetaData(testbase.db), Column('id', Integer))) - - @testing.supported('sqlite') + @testing.exclude('sqlite', '<', (3, 4)) def test_empty_insert_nopk2(self): self._test_empty_insert( @@ -157,7 +154,6 @@ class InsertTest(AssertMixin): Column('x', Integer), Column('y', Integer))) - @testing.supported('sqlite') def test_inserts_with_spaces(self): tbl = Table('tbl', MetaData('sqlite:///'), Column('with space', Integer), @@ -172,6 +168,7 @@ class InsertTest(AssertMixin): finally: tbl.drop() - + + if __name__ == "__main__": testbase.main() diff --git a/test/engine/execute.py b/test/engine/execute.py index 6cf3cccd9..a79d18247 100644 --- a/test/engine/execute.py +++ b/test/engine/execute.py @@ -12,13 +12,13 @@ class ExecuteTest(PersistTest): Column('user_name', VARCHAR(20)), ) metadata.create_all() - + def tearDown(self): testbase.db.connect().execute(users.delete()) def tearDownAll(self): metadata.drop_all() - - @testing.supported('sqlite', 'maxdb') + + @testing.fails_on_everything_except('sqlite', 'maxdb') def test_raw_qmark(self): for conn in (testbase.db, testbase.db.connect()): conn.execute("insert into users (user_id, user_name) values (?, ?)", (1,"jack")) @@ -30,7 +30,8 @@ class ExecuteTest(PersistTest): assert res.fetchall() == [(1, "jack"), (2, "fred"), (3, "ed"), (4, "horse"), (5, "barney"), (6, "donkey"), (7, 'sally')] conn.execute("delete from users") - @testing.supported('mysql', 'postgres') + @testing.fails_on_everything_except('mysql', 'postgres') + # some psycopg2 versions bomb this. 
def test_raw_sprintf(self): for conn in (testbase.db, testbase.db.connect()): conn.execute("insert into users (user_id, user_name) values (%s, %s)", [1,"jack"]) @@ -43,7 +44,8 @@ class ExecuteTest(PersistTest): # pyformat is supported for mysql, but skipping because a few driver # versions have a bug that bombs out on this test. (1.2.2b3, 1.2.2c1, 1.2.2) - @testing.supported('postgres') + @testing.unsupported('mysql') + @testing.fails_on_everything_except('postgres') def test_raw_python(self): for conn in (testbase.db, testbase.db.connect()): conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)", {'id':1, 'name':'jack'}) @@ -53,7 +55,7 @@ class ExecuteTest(PersistTest): assert res.fetchall() == [(1, "jack"), (2, "ed"), (3, "horse"), (4, 'sally')] conn.execute("delete from users") - @testing.supported('sqlite') + @testing.fails_on_everything_except('sqlite') def test_raw_named(self): for conn in (testbase.db, testbase.db.connect()): conn.execute("insert into users (user_id, user_name) values (:id, :name)", {'id':1, 'name':'jack'}) @@ -72,4 +74,4 @@ class ExecuteTest(PersistTest): assert True if __name__ == "__main__": - testbase.main() + testbase.main() diff --git a/test/engine/reflection.py b/test/engine/reflection.py index d44ccaf17..9b560e668 100644 --- a/test/engine/reflection.py +++ b/test/engine/reflection.py @@ -491,23 +491,6 @@ class ReflectionTest(PersistTest): finally: table.drop() - @testing.supported('mssql') - def testidentity(self): - meta = MetaData(testbase.db) - table = Table( - 'identity_test', meta, - Column('col1', Integer, Sequence('fred', 2, 3), primary_key=True) - ) - table.create() - - meta2 = MetaData(testbase.db) - try: - table2 = Table('identity_test', meta2, autoload=True) - assert table2.c['col1'].sequence.start == 2 - assert table2.c['col1'].sequence.increment == 3 - finally: - table.drop() - @testing.unsupported('oracle') def testreserved(self): # check a table that uses an SQL reserved name doesn't cause an error @@ -755,31 +738,37 @@ class SchemaTest(PersistTest): assert buf.index("CREATE TABLE someschema.table1") > -1 assert buf.index("CREATE TABLE someschema.table2") > -1 - @testing.supported('maxdb', 'mysql', 'postgres') + @testing.unsupported('sqlite', 'firebird') + # fixme: revisit these below. 
+ @testing.fails_on('oracle', 'mssql', 'sybase', 'access') def test_explicit_default_schema(self): engine = testbase.db - schema = engine.dialect.get_default_schema_name(engine) if testing.against('mysql'): schema = testbase.db.url.database elif testing.against('postgres'): schema = 'public' + else: + schema = engine.dialect.get_default_schema_name(engine) - metadata = MetaData(testbase.db) + metadata = MetaData(engine) table1 = Table('table1', metadata, - Column('col1', Integer, primary_key=True), - schema=schema) + Column('col1', Integer, primary_key=True), + schema=schema) table2 = Table('table2', metadata, - Column('col1', Integer, primary_key=True), - Column('col2', Integer, ForeignKey('%s.table1.col1' % schema)), - schema=schema) - metadata.create_all() - metadata.create_all(checkfirst=True) - metadata.clear() + Column('col1', Integer, primary_key=True), + Column('col2', Integer, + ForeignKey('%s.table1.col1' % schema)), + schema=schema) + try: + metadata.create_all() + metadata.create_all(checkfirst=True) + metadata.clear() - table1 = Table('table1', metadata, autoload=True, schema=schema) - table2 = Table('table2', metadata, autoload=True, schema=schema) - metadata.drop_all() + table1 = Table('table1', metadata, autoload=True, schema=schema) + table2 = Table('table2', metadata, autoload=True, schema=schema) + finally: + metadata.drop_all() class HasSequenceTest(PersistTest): @@ -791,7 +780,7 @@ class HasSequenceTest(PersistTest): Column('user_name', String(40)), ) - @testing.supported('firebird', 'postgres', 'oracle') + @testing.unsupported('sqlite', 'mysql', 'mssql', 'access', 'sybase') def test_hassequence(self): metadata.create_all(bind=testbase.db) self.assertEqual(testbase.db.dialect.has_sequence(testbase.db, 'user_id_seq'), True) diff --git a/test/engine/transaction.py b/test/engine/transaction.py index b6a47e386..224c5d383 100644 --- a/test/engine/transaction.py +++ b/test/engine/transaction.py @@ -16,12 +16,12 @@ class TransactionTest(PersistTest): test_needs_acid=True, ) users.create(testbase.db) - + def tearDown(self): testbase.db.connect().execute(users.delete()) def tearDownAll(self): users.drop(testbase.db) - + def testcommits(self): connection = testbase.db.connect() transaction = connection.begin() @@ -37,7 +37,7 @@ class TransactionTest(PersistTest): result = connection.execute("select * from query_users") assert len(result.fetchall()) == 3 transaction.commit() - + def testrollback(self): """test a basic rollback""" connection = testbase.db.connect() @@ -46,14 +46,14 @@ class TransactionTest(PersistTest): connection.execute(users.insert(), user_id=2, user_name='user2') connection.execute(users.insert(), user_id=3, user_name='user3') transaction.rollback() - + result = connection.execute("select * from query_users") assert len(result.fetchall()) == 0 connection.close() def testraise(self): connection = testbase.db.connect() - + transaction = connection.begin() try: connection.execute(users.insert(), user_id=1, user_name='user1') @@ -64,15 +64,15 @@ class TransactionTest(PersistTest): except Exception , e: print "Exception: ", e transaction.rollback() - + result = connection.execute("select * from query_users") assert len(result.fetchall()) == 0 connection.close() - + @testing.exclude('mysql', '<', (5, 0, 3)) def testnestedrollback(self): connection = testbase.db.connect() - + try: transaction = connection.begin() try: @@ -97,7 +97,7 @@ class TransactionTest(PersistTest): assert str(e) == 'uh oh' # and not "This transaction is inactive" finally: connection.close() - + 
@testing.exclude('mysql', '<', (5, 0, 3)) def testnesting(self): @@ -159,8 +159,8 @@ class TransactionTest(PersistTest): assert len(result.fetchall()) == 0 connection.close() - - @testing.supported('postgres', 'mysql', 'oracle', 'maxdb') + + @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access') @testing.exclude('mysql', '<', (5, 0, 3)) def testnestedsubtransactionrollback(self): connection = testbase.db.connect() @@ -171,14 +171,14 @@ class TransactionTest(PersistTest): trans2.rollback() connection.execute(users.insert(), user_id=3, user_name='user3') transaction.commit() - + self.assertEquals( connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(), [(1,),(3,)] ) connection.close() - @testing.supported('postgres', 'mysql', 'oracle', 'maxdb') + @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access') @testing.exclude('mysql', '<', (5, 0, 3)) def testnestedsubtransactioncommit(self): connection = testbase.db.connect() @@ -189,14 +189,14 @@ class TransactionTest(PersistTest): trans2.commit() connection.execute(users.insert(), user_id=3, user_name='user3') transaction.commit() - + self.assertEquals( connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(), [(1,),(2,),(3,)] ) connection.close() - @testing.supported('postgres', 'mysql', 'oracle', 'maxdb') + @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access') @testing.exclude('mysql', '<', (5, 0, 3)) def testrollbacktosubtransaction(self): connection = testbase.db.connect() @@ -209,100 +209,105 @@ class TransactionTest(PersistTest): trans3.rollback() connection.execute(users.insert(), user_id=4, user_name='user4') transaction.commit() - + self.assertEquals( connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(), [(1,),(4,)] ) connection.close() - - @testing.supported('postgres', 'mysql') + + @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', + 'oracle', 'maxdb') @testing.exclude('mysql', '<', (5, 0, 3)) def testtwophasetransaction(self): connection = testbase.db.connect() - + transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=1, user_name='user1') transaction.prepare() transaction.commit() - + transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=2, user_name='user2') transaction.commit() - + transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=3, user_name='user3') transaction.rollback() - + transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=4, user_name='user4') transaction.prepare() transaction.rollback() - + self.assertEquals( connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(), [(1,),(2,)] ) connection.close() - @testing.supported('postgres', 'mysql') + @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', + 'oracle', 'maxdb') @testing.exclude('mysql', '<', (5, 0, 3)) def testmixedtwophasetransaction(self): connection = testbase.db.connect() - + transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=1, user_name='user1') - + transaction2 = connection.begin() connection.execute(users.insert(), user_id=2, user_name='user2') - + transaction3 = connection.begin_nested() connection.execute(users.insert(), user_id=3, user_name='user3') - + transaction4 = connection.begin() connection.execute(users.insert(), user_id=4, user_name='user4') transaction4.commit() - + transaction3.rollback() - 
+ connection.execute(users.insert(), user_id=5, user_name='user5') - + transaction2.commit() - + transaction.prepare() - + transaction.commit() - + self.assertEquals( connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(), [(1,),(2,),(5,)] ) connection.close() - - @testing.supported('postgres') + + @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', + 'oracle', 'maxdb') + # fixme: see if this is still true and/or can be convert to fails_on() + @testing.unsupported('mysql') def testtwophaserecover(self): # MySQL recovery doesn't currently seem to work correctly # Prepared transactions disappear when connections are closed and even # when they aren't it doesn't seem possible to use the recovery id. connection = testbase.db.connect() - + transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=1, user_name='user1') transaction.prepare() - + connection.close() connection2 = testbase.db.connect() - + self.assertEquals( connection2.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(), [] ) - + recoverables = connection2.recover_twophase() self.assertTrue( transaction.xid in recoverables ) - + connection2.commit_prepared(transaction.xid, recover=True) self.assertEquals( @@ -310,44 +315,45 @@ class TransactionTest(PersistTest): [(1,)] ) connection2.close() - - @testing.supported('postgres', 'mysql') + + @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', + 'oracle', 'maxdb') @testing.exclude('mysql', '<', (5, 0, 3)) def testmultipletwophase(self): conn = testbase.db.connect() - + xa = conn.begin_twophase() conn.execute(users.insert(), user_id=1, user_name='user1') xa.prepare() xa.commit() - + xa = conn.begin_twophase() conn.execute(users.insert(), user_id=2, user_name='user2') xa.prepare() xa.rollback() - + xa = conn.begin_twophase() conn.execute(users.insert(), user_id=3, user_name='user3') xa.rollback() - + xa = conn.begin_twophase() conn.execute(users.insert(), user_id=4, user_name='user4') xa.prepare() xa.commit() - + result = conn.execute(select([users.c.user_name]).order_by(users.c.user_id)) self.assertEqual(result.fetchall(), [('user1',),('user4',)]) - + conn.close() - + class AutoRollbackTest(PersistTest): def setUpAll(self): global metadata metadata = MetaData() - + def tearDownAll(self): metadata.drop_all(testbase.db) - + @testing.unsupported('sqlite') def testrollback_deadlock(self): """test that returning connections to the pool clears any object locks.""" @@ -361,8 +367,10 @@ class AutoRollbackTest(PersistTest): users.create(conn1) conn1.execute("select * from deadlock_users") conn1.close() - # without auto-rollback in the connection pool's return() logic, this deadlocks in Postgres, - # because conn1 is returned to the pool but still has a lock on "deadlock_users" + + # without auto-rollback in the connection pool's return() logic, this + # deadlocks in Postgres, because conn1 is returned to the pool but + # still has a lock on "deadlock_users". # comment out the rollback in pool/ConnectionFairy._close() to see ! 
users.drop(conn2) conn2.close() @@ -383,10 +391,10 @@ class TLTransactionTest(PersistTest): def tearDownAll(self): users.drop(tlengine) tlengine.dispose() - + def test_connection_close(self): """test that when connections are closed for real, transactions are rolled back and disposed.""" - + c = tlengine.contextual_connect() c.begin() assert tlengine.session.in_transaction() @@ -418,7 +426,7 @@ class TLTransactionTest(PersistTest): assert len(result.fetchall()) == 0 finally: external_connection.close() - + def testrollback(self): """test a basic rollback""" tlengine.begin() @@ -451,7 +459,7 @@ class TLTransactionTest(PersistTest): def testcommits(self): assert tlengine.connect().execute("select count(1) from query_users").scalar() == 0 - + connection = tlengine.contextual_connect() transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name='user1') @@ -517,7 +525,7 @@ class TLTransactionTest(PersistTest): assert len(result.fetchall()) == 3 finally: external_connection.close() - + @testing.unsupported('sqlite') @testing.exclude('mysql', '<', (5, 0, 3)) def testnesting(self): @@ -540,7 +548,7 @@ class TLTransactionTest(PersistTest): @testing.exclude('mysql', '<', (5, 0, 3)) def testmixednesting(self): - """tests nesting of transactions off the TLEngine directly inside of + """tests nesting of transactions off the TLEngine directly inside of tranasctions off the connection from the TLEngine""" external_connection = tlengine.connect() self.assert_(external_connection.connection is not tlengine.contextual_connect().connection) @@ -607,7 +615,7 @@ class TLTransactionTest(PersistTest): finally: clear_mappers() - + def testconnections(self): """tests that contextual_connect is threadlocal""" c1 = tlengine.contextual_connect() @@ -616,6 +624,7 @@ class TLTransactionTest(PersistTest): c2.close() assert c1.connection.connection is not None + class ForUpdateTest(PersistTest): def setUpAll(self): global counters, metadata @@ -635,7 +644,7 @@ class ForUpdateTest(PersistTest): con = testbase.db.connect() sel = counters.select(for_update=update_style, whereclause=counters.c.counter_id==1) - + for i in xrange(count): trans = con.begin() try: @@ -658,7 +667,8 @@ class ForUpdateTest(PersistTest): break con.close() - @testing.supported('mysql', 'oracle', 'postgres', 'maxdb') + @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access') + def testqueued_update(self): """Test SELECT FOR UPDATE with concurrent modifications. 
@@ -719,8 +729,8 @@ class ForUpdateTest(PersistTest): thread.join() return errors - - @testing.supported('mysql', 'oracle', 'postgres', 'maxdb') + + @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access') def testqueued_select(self): """Simple SELECT FOR UPDATE conflict test""" @@ -729,14 +739,15 @@ class ForUpdateTest(PersistTest): sys.stderr.write("Failure: %s\n" % e) self.assert_(len(errors) == 0) - @testing.supported('oracle', 'postgres', 'maxdb') + @testing.unsupported('sqlite', 'mysql', 'mssql', 'firebird', + 'sybase', 'access') def testnowait_select(self): """Simple SELECT FOR UPDATE NOWAIT conflict test""" errors = self._threaded_overlap(2, [(1,2,3),(3,4,5)], update_style='nowait') self.assert_(len(errors) != 0) - + if __name__ == "__main__": - testbase.main() + testbase.main() diff --git a/test/orm/generative.py b/test/orm/generative.py index d8fb7a5a6..d72887a58 100644 --- a/test/orm/generative.py +++ b/test/orm/generative.py @@ -53,14 +53,6 @@ class GenerativeQueryTest(PersistTest): assert list(query[-5:]) == orig[-5:] assert query[10:20][5] == orig[10:20][5] - @testing.supported('mssql') - def test_slice_mssql(self): - sess = create_session(bind=testbase.db) - query = sess.query(Foo) - orig = query.all() - assert list(query[:10]) == orig[:10] - assert list(query[:10]) == orig[:10] - def test_aggregate(self): sess = create_session(bind=testbase.db) query = sess.query(Foo) @@ -70,23 +62,26 @@ class GenerativeQueryTest(PersistTest): assert query.filter(foo.c.bar<30).apply_max(foo.c.bar).first() == 29 assert query.filter(foo.c.bar<30).apply_max(foo.c.bar).one() == 29 - @testing.unsupported('mysql') def test_aggregate_1(self): - # this one fails in mysql as the result comes back as a string + if (testing.against('mysql') and + testbase.db.dialect.dbapi.version_info[:4] == (1, 2, 1, 'gamma')): + return + query = create_session(bind=testbase.db).query(Foo) assert query.filter(foo.c.bar<30).sum(foo.c.bar) == 435 - @testing.unsupported('postgres', 'mysql', 'firebird', 'mssql') + @testing.fails_on('postgres', 'mysql', 'firebird', 'mssql') def test_aggregate_2(self): query = create_session(bind=testbase.db).query(Foo) assert query.filter(foo.c.bar<30).avg(foo.c.bar) == 14.5 - @testing.supported('postgres', 'mysql', 'firebird', 'mssql') + @testing.fails_on_everything_except('sqlite', 'postgres', 'mysql', + 'firebird', 'mssql') def test_aggregate_2_int(self): query = create_session(bind=testbase.db).query(Foo) assert int(query.filter(foo.c.bar<30).avg(foo.c.bar)) == 14 - @testing.unsupported('postgres', 'mysql', 'firebird', 'mssql') + @testing.fails_on('postgres', 'mysql', 'firebird', 'mssql') def test_aggregate_3(self): query = create_session(bind=testbase.db).query(Foo) assert query.filter(foo.c.bar<30).apply_avg(foo.c.bar).first() == 14.5 @@ -206,7 +201,7 @@ class RelationsTest(AssertMixin): }) session = create_session(bind=testbase.db) query = session.query(tables.User) - x = query.select_from([tables.users.outerjoin(tables.orders).outerjoin(tables.orderitems)]).\ + x = query.select_from(tables.users.outerjoin(tables.orders).outerjoin(tables.orderitems)).\ filter(or_(tables.Order.c.order_id==None,tables.Item.c.item_id==2)) print x.compile() self.assert_result(list(x), tables.User, *tables.user_result[1:3]) diff --git a/test/orm/inheritance/manytomany.py b/test/orm/inheritance/manytomany.py index bed719d3f..7886e90ad 100644 --- a/test/orm/inheritance/manytomany.py +++ b/test/orm/inheritance/manytomany.py @@ -131,7 +131,7 @@ class InheritTest2(ORMTest): l = 
sess.query(Bar).select() print l[0] print l[0].foos - self.assert_result(l, Bar, + self.assert_unordered_result(l, Bar, # {'id':1, 'data':'barfoo', 'bid':1, 'foos':(Foo, [{'id':2,'data':'subfoo1'}, {'id':3,'data':'subfoo2'}])}, {'id':b.id, 'data':'barfoo', 'foos':(Foo, [{'id':f1.id,'data':'subfoo1'}, {'id':f2.id,'data':'subfoo2'}])}, ) @@ -189,11 +189,12 @@ class InheritTest3(ORMTest): b.foos.append(Foo("foo #1")) b.foos.append(Foo("foo #2")) sess.flush() - compare = repr(b) + repr(b.foos) + compare = repr(b) + repr(sorted([repr(o) for o in b.foos])) sess.clear() l = sess.query(Bar).select() print repr(l[0]) + repr(l[0].foos) - self.assert_(repr(l[0]) + repr(l[0].foos) == compare) + found = repr(l[0]) + repr(sorted([repr(o) for o in l[0].foos])) + self.assertEqual(found, compare) @testing.fails_on('maxdb') def testadvanced(self): diff --git a/test/orm/manytomany.py b/test/orm/manytomany.py index a94e9bbc4..975cb0a5f 100644 --- a/test/orm/manytomany.py +++ b/test/orm/manytomany.py @@ -141,7 +141,7 @@ class M2MTest(ORMTest): Place.mapper = mapper(Place, place, properties = { 'thingies':relation(mapper(PlaceThingy, place_thingy), lazy=False) }) - + Transition.mapper = mapper(Transition, transition, properties = dict( inputs = relation(Place.mapper, place_output, lazy=False), outputs = relation(Place.mapper, place_input, lazy=False), @@ -158,12 +158,11 @@ class M2MTest(ORMTest): sess.clear() r = sess.query(Transition).select() - self.assert_result(r, Transition, - {'name':'transition1', - 'inputs' : (Place, [{'name':'place1'}]), - 'outputs' : (Place, [{'name':'place2'}, {'name':'place3'}]) - } - ) + self.assert_unordered_result(r, Transition, + {'name': 'transition1', + 'inputs': (Place, [{'name':'place1'}]), + 'outputs': (Place, [{'name':'place2'}, {'name':'place3'}]) + }) def testbidirectional(self): """tests a many-to-many backrefs""" diff --git a/test/orm/relationships.py b/test/orm/relationships.py index 7aeade428..fe1155361 100644 --- a/test/orm/relationships.py +++ b/test/orm/relationships.py @@ -8,13 +8,19 @@ from sqlalchemy.orm.collections import collection from testlib import * class RelationTest(PersistTest): - """this is essentially an extension of the "dependency.py" topological sort test. - in this test, a table is dependent on two other tables that are otherwise unrelated to each other. - the dependency sort must insure that this childmost table is below both parent tables in the outcome - (a bug existed where this was not always the case). - while the straight topological sort tests should expose this, since the sorting can be different due - to subtle differences in program execution, this test case was exposing the bug whereas the simpler tests - were not.""" + """An extended topological sort test + + This is essentially an extension of the "dependency.py" topological sort + test. In this test, a table is dependent on two other tables that are + otherwise unrelated to each other. The dependency sort must insure that + this childmost table is below both parent tables in the outcome (a bug + existed where this was not always the case). + + While the straight topological sort tests should expose this, since the + sorting can be different due to subtle differences in program execution, + this test case was exposing the bug whereas the simpler tests were not. 
+ """ + def setUpAll(self): global metadata, tbl_a, tbl_b, tbl_c, tbl_d @@ -77,7 +83,7 @@ class RelationTest(PersistTest): d3 = D(); d3.name = "d3"; d3.b_row = b; d3.c_row = c session.save_or_update(a) session.save_or_update(b) - + def tearDown(self): conn = testbase.db.connect() conn.drop(tbl_d) @@ -87,25 +93,29 @@ class RelationTest(PersistTest): def tearDownAll(self): metadata.drop_all(testbase.db) - + def testDeleteRootTable(self): session.flush() session.delete(a) # works as expected session.flush() - + def testDeleteMiddleTable(self): session.flush() session.delete(c) # fails session.flush() - + class RelationTest2(PersistTest): - """this test tests a relationship on a column that is included in multiple foreign keys, - as well as a self-referential relationship on a composite key where one column in the foreign key - is 'joined to itself'.""" + """Tests a relationship on a column included in multiple foreign keys. + + This test tests a relationship on a column that is included in multiple + foreign keys, as well as a self-referential relationship on a composite + key where one column in the foreign key is 'joined to itself'. + """ + def setUpAll(self): global metadata, company_tbl, employee_tbl metadata = MetaData(testbase.db) - + company_tbl = Table('company', metadata, Column('company_id', Integer, primary_key=True), Column('name', Unicode(30))) @@ -119,9 +129,9 @@ class RelationTest2(PersistTest): ForeignKeyConstraint(['company_id', 'reports_to_id'], ['employee.company_id', 'employee.emp_id'])) metadata.create_all() - + def tearDownAll(self): - metadata.drop_all() + metadata.drop_all() def testexplicit(self): """test with mappers that have fairly explicit join conditions""" @@ -133,7 +143,7 @@ class RelationTest2(PersistTest): self.company = company self.emp_id = emp_id self.reports_to = reports_to - + mapper(Company, company_tbl) mapper(Employee, employee_tbl, properties= { 'company':relation(Company, primaryjoin=employee_tbl.c.company_id==company_tbl.c.company_id, backref='employees'), @@ -141,7 +151,7 @@ class RelationTest2(PersistTest): and_( employee_tbl.c.emp_id==employee_tbl.c.reports_to_id, employee_tbl.c.company_id==employee_tbl.c.company_id - ), + ), foreignkey=[employee_tbl.c.company_id, employee_tbl.c.emp_id], backref='employees') }) @@ -185,7 +195,7 @@ class RelationTest2(PersistTest): mapper(Company, company_tbl) mapper(Employee, employee_tbl, properties= { 'company':relation(Company, backref='employees'), - 'reports_to':relation(Employee, + 'reports_to':relation(Employee, foreignkey=[employee_tbl.c.company_id, employee_tbl.c.emp_id], backref='employees') }) @@ -214,12 +224,12 @@ class RelationTest2(PersistTest): assert [x.name for x in test_e1.employees] == ['emp2', 'emp3'] assert sess.query(Employee).get([c1.company_id, 3]).reports_to.name == 'emp1' assert sess.query(Employee).get([c2.company_id, 3]).reports_to.name == 'emp5' - + class RelationTest3(PersistTest): def setUpAll(self): global jobs, pageversions, pages, metadata, Job, Page, PageVersion, PageComment import datetime - metadata = MetaData(testbase.db) + metadata = MetaData(testbase.db) jobs = Table("jobs", metadata, Column("jobno", Unicode(15), primary_key=True), Column("created", DateTime, nullable=False, default=datetime.datetime.now), @@ -310,7 +320,7 @@ class RelationTest3(PersistTest): def tearDownAll(self): clear_mappers() - metadata.drop_all() + metadata.drop_all() def testbasic(self): """test the combination of complicated join conditions with post_update""" @@ -353,7 +363,7 @@ class 
RelationTest4(ORMTest): """test syncrules on foreign keys that are also primary""" def define_tables(self, metadata): global tableA, tableB - tableA = Table("A", metadata, + tableA = Table("A", metadata, Column("id",Integer,primary_key=True), Column("foo",Integer,), ) @@ -373,7 +383,7 @@ class RelationTest4(ORMTest): sess = create_session() sess.save(a1) sess.flush() - + sess.delete(a1) try: sess.flush() @@ -400,7 +410,7 @@ class RelationTest4(ORMTest): assert False except exceptions.AssertionError, e: assert str(e).startswith("Dependency rule tried to blank-out primary key column 'B.id' on instance ") - + def test_no_nullPK_BtoA(self): class A(object):pass class B(object):pass @@ -413,7 +423,7 @@ class RelationTest4(ORMTest): sess = create_session() sess.save(b1) try: - # this raises an error as of r3695. in that rev, the attributes package was modified so that a + # this raises an error as of r3695. in that rev, the attributes package was modified so that a # setting of "None" shows up as a change, which in turn fires off dependency.py and then triggers # the rule. sess.flush() @@ -421,14 +431,14 @@ class RelationTest4(ORMTest): except exceptions.AssertionError, e: assert str(e).startswith("Dependency rule tried to blank-out primary key column 'B.id' on instance ") - @testing.supported('sqlite', 'mysql') + @testing.fails_on_everything_except('sqlite', 'mysql') def test_nullPKsOK_BtoA(self): # postgres cant handle a nullable PK column...? - tableC = Table('tablec', tableA.metadata, + tableC = Table('tablec', tableA.metadata, Column('id', Integer, primary_key=True), Column('a_id', Integer, ForeignKey('A.id'), primary_key=True, autoincrement=False, nullable=True)) tableC.create() - + class A(object):pass class C(object):pass mapper(C, tableC, properties={ @@ -442,9 +452,9 @@ class RelationTest4(ORMTest): sess.save(c1) # test that no error is raised. 
sess.flush() - + def test_delete_cascade_BtoA(self): - """test that the 'blank the PK' error doesnt get raised when the child is to be deleted as part of a + """test that the 'blank the PK' error doesnt get raised when the child is to be deleted as part of a cascade""" class A(object):pass class B(object):pass @@ -469,9 +479,9 @@ class RelationTest4(ORMTest): assert b1 not in sess sess.clear() clear_mappers() - + def test_delete_cascade_AtoB(self): - """test that the 'blank the PK' error doesnt get raised when the child is to be deleted as part of a + """test that the 'blank the PK' error doesnt get raised when the child is to be deleted as part of a cascade""" class A(object):pass class B(object):pass @@ -489,14 +499,14 @@ class RelationTest4(ORMTest): sess = create_session() sess.save(a1) sess.flush() - + sess.delete(a1) sess.flush() assert a1 not in sess assert b1 not in sess sess.clear() clear_mappers() - + def test_delete_manual_AtoB(self): class A(object):pass class B(object):pass @@ -511,7 +521,7 @@ class RelationTest4(ORMTest): sess.save(a1) sess.save(b1) sess.flush() - + sess.delete(a1) sess.delete(b1) sess.flush() @@ -540,7 +550,8 @@ class RelationTest4(ORMTest): assert b1 not in sess class RelationTest5(ORMTest): - """test a map to a select that relates to a map to the table""" + """Test a map to a select that relates to a map to the table.""" + def define_tables(self, metadata): global items items = Table('items', metadata, @@ -553,10 +564,10 @@ class RelationTest5(ORMTest): def test_basic(self): class Container(object):pass class LineItem(object):pass - + container_select = select( [items.c.policyNum, items.c.policyEffDate, items.c.type], - distinct=True, + distinct=True, ).alias('container_select') mapper(LineItem, items) @@ -593,24 +604,24 @@ class RelationTest5(ORMTest): assert len(newcon.lineItems) == 10 for old, new in zip(con.lineItems, newcon.lineItems): assert old.id == new.id - - + + class TypeMatchTest(ORMTest): """test errors raised when trying to add items whose type is not handled by a relation""" def define_tables(self, metadata): global a, b, c, d - a = Table("a", metadata, + a = Table("a", metadata, Column('aid', Integer, primary_key=True), Column('data', String(30))) - b = Table("b", metadata, + b = Table("b", metadata, Column('bid', Integer, primary_key=True), Column("a_id", Integer, ForeignKey("a.aid")), Column('data', String(30))) - c = Table("c", metadata, + c = Table("c", metadata, Column('cid', Integer, primary_key=True), Column("b_id", Integer, ForeignKey("b.bid")), Column('data', String(30))) - d = Table("d", metadata, + d = Table("d", metadata, Column('did', Integer, primary_key=True), Column("a_id", Integer, ForeignKey("a.aid")), Column('data', String(30))) @@ -621,7 +632,7 @@ class TypeMatchTest(ORMTest): mapper(A, a, properties={'bs':relation(B)}) mapper(B, b) mapper(C, c) - + a1 = A() b1 = B() c1 = C() @@ -640,7 +651,7 @@ class TypeMatchTest(ORMTest): mapper(A, a, properties={'bs':relation(B, cascade="none")}) mapper(B, b) mapper(C, c) - + a1 = A() b1 = B() c1 = C() @@ -662,7 +673,7 @@ class TypeMatchTest(ORMTest): mapper(A, a, properties={'bs':relation(B, cascade="none")}) mapper(B, b) mapper(C, c, inherits=B) - + a1 = A() b1 = B() c1 = C() @@ -677,7 +688,7 @@ class TypeMatchTest(ORMTest): assert False except exceptions.FlushError, err: assert str(err).startswith("Attempting to flush an item of type %s on collection 'A.bs (B)', which is handled by mapper 'Mapper|B|b' and does not load items of that type. 
Did you mean to use a polymorphic mapper for this relationship ?" % C) - + def test_m2o_nopoly_onflush(self): class A(object):pass class B(A):pass @@ -716,18 +727,18 @@ class TypeMatchTest(ORMTest): class TypedAssociationTable(ORMTest): def define_tables(self, metadata): global t1, t2, t3 - + class MySpecialType(types.TypeDecorator): impl = String def convert_bind_param(self, value, dialect): return "lala" + value def convert_result_value(self, value, dialect): return value[4:] - - t1 = Table('t1', metadata, + + t1 = Table('t1', metadata, Column('col1', MySpecialType(30), primary_key=True), Column('col2', String(30))) - t2 = Table('t2', metadata, + t2 = Table('t2', metadata, Column('col1', MySpecialType(30), primary_key=True), Column('col2', String(30))) t3 = Table('t3', metadata, @@ -736,7 +747,7 @@ class TypedAssociationTable(ORMTest): ) def testm2m(self): """test many-to-many tables with special types for candidate keys""" - + class T1(object):pass class T2(object):pass mapper(T2, t2) @@ -756,12 +767,12 @@ class TypedAssociationTable(ORMTest): sess.flush() assert t3.count().scalar() == 2 - + a.t2s.remove(c) sess.flush() - + assert t3.count().scalar() == 1 - + # TODO: move these tests to either attributes.py test or its own module class CustomCollectionsTest(ORMTest): def define_tables(self, metadata): @@ -769,7 +780,7 @@ class CustomCollectionsTest(ORMTest): sometable = Table('sometable', metadata, Column('col1',Integer, primary_key=True), Column('data', String(30))) - someothertable = Table('someothertable', metadata, + someothertable = Table('someothertable', metadata, Column('col1', Integer, primary_key=True), Column('scol1', Integer, ForeignKey(sometable.c.col1)), Column('data', String(20)) @@ -807,7 +818,7 @@ class CustomCollectionsTest(ORMTest): f = sess.query(Foo).get(f.col1) assert len(list(f.bars)) == 2 f.bars.clear() - + def testdict(self): """test that a 'dict' can be used as a collection and can lazyload.""" @@ -823,7 +834,7 @@ class CustomCollectionsTest(ORMTest): def remove(self, item): if id(item) in self: del self[id(item)] - + mapper(Foo, sometable, properties={ 'bars':relation(Bar, collection_class=AppenderDict) }) @@ -846,7 +857,7 @@ class CustomCollectionsTest(ORMTest): pass class Bar(object): def __init__(self, data): self.data = data - + mapper(Foo, sometable, properties={ 'bars':relation(Bar, collection_class=collections.column_mapped_collection(someothertable.c.data)) @@ -935,7 +946,7 @@ class CustomCollectionsTest(ORMTest): p.children[4:] = o assert control == p.children assert control == list(p.children) - + o = Child() control.insert(0, o) p.children.insert(0, o) @@ -1039,7 +1050,7 @@ class CustomCollectionsTest(ORMTest): sess.save(p1) sess.flush() sess.clear() - + p2 = sess.query(Parent).get(p1.col1) o = list(p2.children) assert len(o) == 3 @@ -1061,12 +1072,12 @@ class ViewOnlyTest(ORMTest): Column('data', String(40)), Column('t2id', Integer, ForeignKey('t2.id')) ) - + def test_basic(self): class C1(object):pass class C2(object):pass class C3(object):pass - + mapper(C1, t1, properties={ 't2s':relation(C2), 't2_view':relation(C2, viewonly=True, primaryjoin=and_(t1.c.id==t2.c.t1id, t3.c.t2id==t2.c.id, t3.c.data==t1.c.data)) @@ -1075,7 +1086,7 @@ class ViewOnlyTest(ORMTest): mapper(C3, t3, properties={ 't2':relation(C2) }) - + c1 = C1() c1.data = 'c1data' c2a = C2() @@ -1090,7 +1101,7 @@ class ViewOnlyTest(ORMTest): sess.save(c3) sess.flush() sess.clear() - + c1 = sess.query(C1).get(c1.id) assert set([x.id for x in c1.t2s]) == set([c2a.id, c2b.id]) assert 
set([x.id for x in c1.t2_view]) == set([c2b.id]) @@ -1144,7 +1155,7 @@ class ViewOnlyTest2(ORMTest): c1 = sess.query(C1).get(c1.t1id) assert set([x.t2id for x in c1.t2s]) == set([c2a.t2id, c2b.t2id]) assert set([x.t2id for x in c1.t2_view]) == set([c2b.t2id]) - - + + if __name__ == "__main__": - testbase.main() + testbase.main() diff --git a/test/orm/session.py b/test/orm/session.py index 7bd8b666d..777f1afee 100644 --- a/test/orm/session.py +++ b/test/orm/session.py @@ -12,15 +12,15 @@ import pickle class SessionTest(AssertMixin): def setUpAll(self): tables.create() - + def tearDownAll(self): tables.drop() - + def tearDown(self): SessionCls.close_all() tables.delete() clear_mappers() - + def setUp(self): pass @@ -80,7 +80,7 @@ class SessionTest(AssertMixin): # then see if expunge fails session.expunge(u) - + @engines.close_open_connections def test_binds_from_expression(self): """test that Session can extract Table objects from ClauseElements and match them to tables.""" @@ -88,7 +88,7 @@ class SessionTest(AssertMixin): sess = Session() sess.execute(users.insert(), params=dict(user_id=1, user_name='ed')) assert sess.execute(users.select()).fetchall() == [(1, 'ed')] - + mapper(Address, addresses) mapper(User, users, properties={ 'addresses':relation(Address, backref=backref("user", cascade="all"), cascade="all") @@ -97,7 +97,7 @@ class SessionTest(AssertMixin): sess.execute(users.insert(), params=dict(user_id=2, user_name='fred')) assert sess.execute(users.select()).fetchall() == [(1, 'ed'), (2, 'fred')] sess.close() - + @testing.unsupported('sqlite', 'mssql') # TEMP: test causes mssql to hang @engines.close_open_connections def test_transaction(self): @@ -105,7 +105,7 @@ class SessionTest(AssertMixin): mapper(User, users) conn1 = testbase.db.connect() conn2 = testbase.db.connect() - + sess = create_session(transactional=True, bind=conn1) u = User() sess.save(u) @@ -116,7 +116,7 @@ class SessionTest(AssertMixin): assert conn1.execute("select count(1) from users").scalar() == 1 assert testbase.db.connect().execute("select count(1) from users").scalar() == 1 sess.close() - + @testing.unsupported('sqlite', 'mssql') # TEMP: test causes mssql to hang @engines.close_open_connections def test_autoflush(self): @@ -124,7 +124,7 @@ class SessionTest(AssertMixin): mapper(User, users) conn1 = testbase.db.connect() conn2 = testbase.db.connect() - + sess = create_session(bind=conn1, transactional=True, autoflush=True) u = User() u.user_name='ed' @@ -137,7 +137,7 @@ class SessionTest(AssertMixin): assert conn1.execute("select count(1) from users").scalar() == 1 assert testbase.db.connect().execute("select count(1) from users").scalar() == 1 sess.close() - + @testing.unsupported('sqlite', 'mssql') # TEMP: test causes mssql to hang @engines.close_open_connections def test_autoflush_unbound(self): @@ -160,14 +160,14 @@ class SessionTest(AssertMixin): except: sess.rollback() raise - + @engines.close_open_connections def test_autoflush_2(self): class User(object):pass mapper(User, users) conn1 = testbase.db.connect() conn2 = testbase.db.connect() - + sess = create_session(bind=conn1, transactional=True, autoflush=True) u = User() u.user_name='ed' @@ -184,7 +184,7 @@ class SessionTest(AssertMixin): mapper(User, users, properties={ 'addresses':relation(Address) }) - + sess = create_session(transactional=True, autoflush=True) u = sess.query(User).get(8) newad = Address() @@ -198,8 +198,8 @@ class SessionTest(AssertMixin): assert u.user_name == 'ed' assert len(u.addresses) == 3 assert newad not in u.addresses - - 
+ + @engines.close_open_connections def test_external_joined_transaction(self): class User(object):pass @@ -207,7 +207,7 @@ class SessionTest(AssertMixin): conn = testbase.db.connect() trans = conn.begin() sess = create_session(bind=conn, transactional=True, autoflush=True) - sess.begin() + sess.begin() u = User() sess.save(u) sess.flush() @@ -216,7 +216,8 @@ class SessionTest(AssertMixin): assert len(sess.query(User).select()) == 0 sess.close() - @testing.supported('postgres', 'mysql') + @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', + 'oracle', 'maxdb') @engines.close_open_connections def test_external_nested_transaction(self): class User(object):pass @@ -228,20 +229,21 @@ class SessionTest(AssertMixin): u1 = User() sess.save(u1) sess.flush() - - sess.begin_nested() + + sess.begin_nested() u2 = User() sess.save(u2) sess.flush() sess.rollback() - - trans.commit() + + trans.commit() assert len(sess.query(User).select()) == 1 except: conn.close() raise - - @testing.supported('postgres', 'mysql') + + @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', + 'oracle', 'maxdb') @engines.close_open_connections def test_heavy_nesting(self): session = create_session(bind=testbase.db) @@ -250,7 +252,7 @@ class SessionTest(AssertMixin): session.connection().execute("insert into users (user_name) values ('user1')") session.begin() - + session.begin_nested() session.connection().execute("insert into users (user_name) values ('user2')") @@ -262,9 +264,10 @@ class SessionTest(AssertMixin): session.commit() assert session.connection().execute("select count(1) from users").scalar() == 2 - - - @testing.supported('postgres', 'mysql') + + + @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', + 'oracle', 'maxdb') @testing.exclude('mysql', '<', (5, 0, 3)) def test_twophase(self): # TODO: mock up a failure condition here @@ -273,7 +276,7 @@ class SessionTest(AssertMixin): class Address(object):pass mapper(User, users) mapper(Address, addresses) - + engine2 = create_engine(testbase.db.url) sess = create_session(transactional=False, autoflush=False, twophase=True) sess.bind_mapper(User, testbase.db) @@ -288,12 +291,12 @@ class SessionTest(AssertMixin): engine2.dispose() assert users.count().scalar() == 1 assert addresses.count().scalar() == 1 - + def test_joined_transaction(self): class User(object):pass mapper(User, users) sess = create_session(transactional=True, autoflush=True) - sess.begin() + sess.begin() u = User() sess.save(u) sess.flush() @@ -302,7 +305,8 @@ class SessionTest(AssertMixin): assert len(sess.query(User).select()) == 0 sess.close() - @testing.supported('postgres', 'mysql') + @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', + 'oracle', 'maxdb') @testing.exclude('mysql', '<', (5, 0, 3)) def test_nested_transaction(self): class User(object):pass @@ -320,13 +324,14 @@ class SessionTest(AssertMixin): sess.save(u2) sess.flush() - sess.rollback() - + sess.rollback() + sess.commit() assert len(sess.query(User).select()) == 1 sess.close() - @testing.supported('postgres', 'mysql') + @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', + 'oracle', 'maxdb') @testing.exclude('mysql', '<', (5, 0, 3)) def test_nested_autotrans(self): class User(object):pass @@ -341,8 +346,8 @@ class SessionTest(AssertMixin): u2 = User() sess.save(u2) sess.flush() - - sess.rollback() + + sess.rollback() sess.commit() assert len(sess.query(User).select()) == 1 @@ -360,25 +365,25 @@ class SessionTest(AssertMixin): 
sess.save(u) sess.flush() assert transaction.get_or_add(testbase.db) is transaction.get_or_add(c) is c - + try: transaction.add(testbase.db.connect()) assert False - except exceptions.InvalidRequestError, e: + except exceptions.InvalidRequestError, e: assert str(e) == "Session already has a Connection associated for the given Connection's Engine" try: transaction.get_or_add(testbase.db.connect()) assert False - except exceptions.InvalidRequestError, e: + except exceptions.InvalidRequestError, e: assert str(e) == "Session already has a Connection associated for the given Connection's Engine" try: transaction.add(testbase.db) assert False - except exceptions.InvalidRequestError, e: + except exceptions.InvalidRequestError, e: assert str(e) == "Session already has a Connection associated for the given Engine" - + transaction.rollback() assert len(sess.query(User).select()) == 0 sess.close() @@ -405,7 +410,7 @@ class SessionTest(AssertMixin): assert c.scalar("select count(1) from users") == 1 c.execute("delete from users") assert c.scalar("select count(1) from users") == 0 - + c = testbase.db.connect() trans = c.begin() @@ -417,16 +422,16 @@ class SessionTest(AssertMixin): trans.commit() assert not c.in_transaction() assert c.scalar("select count(1) from users") == 1 - - + + @engines.close_open_connections def test_save_update_delete(self): - + s = create_session() class User(object): pass mapper(User, users) - + user = User() try: @@ -440,13 +445,13 @@ class SessionTest(AssertMixin): assert False except exceptions.InvalidRequestError, e: assert str(e) == "Instance 'User@%s' is not persisted" % hex(id(user)) - + s.save(user) s.flush() user = s.query(User).one() s.expunge(user) assert user not in s - + # modify outside of session, assert changes remain/get saved user.user_name = "fred" s.update(user) @@ -457,54 +462,54 @@ class SessionTest(AssertMixin): assert s.query(User).count() == 1 user = s.query(User).one() assert user.user_name == 'fred' - + # ensure its not dirty if no changes occur s.clear() assert user not in s s.update(user) assert user in s assert user not in s.dirty - + try: s.save(user) assert False except exceptions.InvalidRequestError, e: assert str(e) == "Instance 'User@%s' is already persistent" % hex(id(user)) - + s2 = create_session() try: s2.delete(user) assert False except exceptions.InvalidRequestError, e: assert "is already attached to session" in str(e) - + u2 = s2.query(User).get(user.user_id) try: s.delete(u2) assert False except exceptions.InvalidRequestError, e: assert "already persisted with a different identity" in str(e) - + s.delete(user) s.flush() assert user not in s assert s.query(User).count() == 0 - + def test_is_modified(self): s = create_session() class User(object):pass class Address(object):pass - + mapper(User, users, properties={'addresses':relation(Address)}) mapper(Address, addresses) - + # save user u = User() u.user_name = 'fred' s.save(u) s.flush() s.clear() - + user = s.query(User).one() assert user not in s.dirty assert not s.is_modified(user) @@ -517,21 +522,21 @@ class SessionTest(AssertMixin): s.flush() assert user not in s.dirty assert not s.is_modified(user) - + a = Address() user.addresses.append(a) assert user in s.dirty assert s.is_modified(user) assert not s.is_modified(user, include_collections=False) - - + + def test_weak_ref(self): """test the weak-referencing identity map, which strongly-references modified items.""" - + s = create_session() class User(object):pass mapper(User, users) - + # save user s.save(User()) s.flush() @@ 
-541,26 +546,26 @@ class SessionTest(AssertMixin): gc.collect() assert len(s.identity_map) == 0 assert len(s.identity_map.data) == 0 - + user = s.query(User).one() user.user_name = 'fred' user = None gc.collect() assert len(s.identity_map) == 1 assert len(s.identity_map.data) == 1 - + s.flush() gc.collect() assert len(s.identity_map) == 0 assert len(s.identity_map.data) == 0 - + assert s.query(User).one().user_name == 'fred' - + def test_strong_ref(self): s = create_session(weak_identity_map=False) class User(object):pass mapper(User, users) - + # save user s.save(User()) s.flush() @@ -620,7 +625,7 @@ class SessionTest(AssertMixin): s.flush() self.assert_(s.prune() == 0) self.assert_(len(s.identity_map) == 0) - + def test_no_save_cascade(self): mapper(Address, addresses) mapper(User, users, properties=dict( @@ -638,15 +643,15 @@ class SessionTest(AssertMixin): s.clear() assert s.query(User).one().user_id == u.user_id assert s.query(Address).first() is None - + clear_mappers() - + tables.delete() mapper(Address, addresses) mapper(User, users, properties=dict( addresses=relation(Address, cascade="all", backref=backref("user", cascade="none")) )) - + s = create_session() u = User() a = Address() @@ -693,7 +698,7 @@ class SessionTest(AssertMixin): self._assert_key(key, (User, (1,), None)) key = s.identity_key(User, row=row, entity_name="en") self._assert_key(key, (User, (1,), "en")) - + def test_extension(self): mapper(User, users) log = [] @@ -714,9 +719,9 @@ class SessionTest(AssertMixin): u = User() sess.save(u) sess.flush() - + assert log == ['before_flush', 'after_flush', 'before_commit', 'after_commit', 'after_flush_postexec'] - + log = [] sess = create_session(transactional=True, extension=MyExt()) u = User() @@ -732,56 +737,56 @@ class SessionTest(AssertMixin): log = [] sess.commit() assert log == ['before_commit', 'after_commit'] - + def test_pickled_update(self): mapper(User, users) sess1 = create_session() sess2 = create_session() - + u1 = User() sess1.save(u1) - + try: sess2.save(u1) assert False except exceptions.InvalidRequestError, e: assert "already attached to session" in str(e) - + u2 = pickle.loads(pickle.dumps(u1)) - + sess2.save(u2) - + def test_duplicate_update(self): mapper(User, users) Session = sessionmaker() - sess = Session() + sess = Session() u1 = User() sess.save(u1) sess.flush() assert u1.user_id is not None - + sess.expunge(u1) - + assert u1 not in sess - + u2 = sess.query(User).get(u1.user_id) assert u2 is not None and u2 is not u1 assert u2 in sess - + self.assertRaises(Exception, lambda: sess.update(u1)) sess.expunge(u2) assert u2 not in sess - + u1.user_name = "John" u2.user_name = "Doe" sess.update(u1) assert u1 in sess - + sess.flush() - + sess.clear() u3 = sess.query(User).get(u1.user_id) @@ -791,14 +796,14 @@ class ScopedSessionTest(ORMTest): def define_tables(self, metadata): global table, table2 - table = Table('sometable', metadata, + table = Table('sometable', metadata, Column('id', Integer, primary_key=True), Column('data', String(30))) - table2 = Table('someothertable', metadata, + table2 = Table('someothertable', metadata, Column('id', Integer, primary_key=True), Column('someid', None, ForeignKey('sometable.id')) ) - + def test_basic(self): Session = scoped_session(sessionmaker()) @@ -816,18 +821,18 @@ class ScopedSessionTest(ORMTest): Session.save(s) Session.commit() Session.remove() - + assert SomeObject(id=1, data="hello", options=[SomeOtherObject(someid=1)]) == Session.query(SomeObject).one() - - + + class ScopedMapperTest(PersistTest): def 
setUpAll(self): global metadata, table, table2 metadata = MetaData(testbase.db) - table = Table('sometable', metadata, + table = Table('sometable', metadata, Column('id', Integer, primary_key=True), Column('data', String(30))) - table2 = Table('someothertable', metadata, + table2 = Table('someothertable', metadata, Column('id', Integer, primary_key=True), Column('someid', None, ForeignKey('sometable.id')) ) @@ -837,9 +842,9 @@ class ScopedMapperTest(PersistTest): global SomeObject, SomeOtherObject class SomeObject(object):pass class SomeOtherObject(object):pass - + global Session - + Session = scoped_session(create_session) Session.mapper(SomeObject, table, properties={ 'options':relation(SomeOtherObject) @@ -856,7 +861,7 @@ class ScopedMapperTest(PersistTest): def tearDownAll(self): metadata.drop_all() - + def tearDown(self): for table in metadata.table_iterator(reverse=True): table.delete().execute() @@ -878,12 +883,12 @@ class ScopedMapperTest(PersistTest): pass Session.mapper(Bar, table2, extension=[ext]) assert hasattr(Bar, 'query') - + class Baz(object): pass Session.mapper(Baz, table2, extension=ext) assert hasattr(Baz, 'query') - + def test_validating_constructor(self): s2 = SomeObject(someid=12) s3 = SomeOtherObject(someid=123, bogus=345) @@ -910,42 +915,42 @@ class ScopedMapperTest(PersistTest): class ScopedMapperTest2(ORMTest): def define_tables(self, metadata): global table, table2 - table = Table('sometable', metadata, + table = Table('sometable', metadata, Column('id', Integer, primary_key=True), Column('data', String(30)), Column('type', String(30)) - + ) - table2 = Table('someothertable', metadata, + table2 = Table('someothertable', metadata, Column('id', Integer, primary_key=True), Column('someid', None, ForeignKey('sometable.id')), Column('somedata', String(30)), ) - + def test_inheritance(self): def expunge_list(l): for x in l: Session.expunge(x) return l - + class BaseClass(fixtures.Base): pass class SubClass(BaseClass): pass - + Session = scoped_session(sessionmaker()) Session.mapper(BaseClass, table, polymorphic_identity='base', polymorphic_on=table.c.type) Session.mapper(SubClass, table2, polymorphic_identity='sub', inherits=BaseClass) - + b = BaseClass(data='b1') s = SubClass(data='s1', somedata='somedata') Session.commit() Session.clear() - + assert expunge_list([BaseClass(data='b1'), SubClass(data='s1', somedata='somedata')]) == BaseClass.query.all() assert expunge_list([SubClass(data='s1', somedata='somedata')]) == SubClass.query.all() - - -if __name__ == "__main__": + + +if __name__ == "__main__": testbase.main() diff --git a/test/orm/unitofwork.py b/test/orm/unitofwork.py index 443c5638a..87a565f3e 100644 --- a/test/orm/unitofwork.py +++ b/test/orm/unitofwork.py @@ -1,4 +1,7 @@ # coding: utf-8 + +"""Tests unitofwork operations.""" + import testbase import pickleable from sqlalchemy import * @@ -6,9 +9,8 @@ from sqlalchemy import exceptions, sql from sqlalchemy.orm import * from testlib import * from testlib.tables import * -from testlib import tables, fixtures +from testlib import engines, tables, fixtures -"""tests unitofwork operations""" # TODO: convert suite to not use Session.mapper, use fixtures.Base # with explicit session.save() @@ -17,12 +19,12 @@ mapper = Session.mapper class UnitOfWorkTest(object): pass - + class HistoryTest(ORMTest): metadata = tables.metadata def define_tables(self, metadata): pass - + def test_backref(self): s = Session() class User(object):pass @@ -31,7 +33,7 @@ class HistoryTest(ORMTest): m = mapper(User, users, properties = 
dict( addresses = relation(am, backref='user', lazy=False)) ) - + u = User(_sa_session=s) a = Address(_sa_session=s) a.user = u @@ -42,7 +44,7 @@ class HistoryTest(ORMTest): s.close() u = s.query(m).select()[0] print u.addresses[0].user - + class VersioningTest(ORMTest): def define_tables(self, metadata): global version_table @@ -61,7 +63,7 @@ class VersioningTest(ORMTest): f1 = Foo(value='f1', _sa_session=s) f2 = Foo(value='f2', _sa_session=s) s.commit() - + f1.value='f1rev2' s.commit() s2 = Session() @@ -82,11 +84,11 @@ class VersioningTest(ORMTest): # Only dialects with a sane rowcount can detect the ConcurrentModificationError if testbase.db.dialect.supports_sane_rowcount: assert success - + s.close() f1 = s.query(Foo).get(f1.id) f2 = s.query(Foo).get(f2.id) - + f1_s.value='f1rev4' s2.commit() @@ -100,7 +102,7 @@ class VersioningTest(ORMTest): success = True if testbase.db.dialect.supports_sane_multi_rowcount: assert success - + @engines.close_open_connections def test_versioncheck(self): """test that query.with_lockmode performs a 'version check' on an already loaded instance""" @@ -123,11 +125,11 @@ class VersioningTest(ORMTest): s1.query(Foo).load(f1s1.id) # now assert version OK s1.query(Foo).with_lockmode('read').get(f1s1.id) - + # assert brand new load is OK too s1.close() s1.query(Foo).with_lockmode('read').get(f1s1.id) - + @engines.close_open_connections def test_noversioncheck(self): """test that query.with_lockmode works OK when the mapper has no version id col""" @@ -141,7 +143,7 @@ class VersioningTest(ORMTest): f1s2 = s2.query(Foo).with_lockmode('read').get(f1s1.id) assert f1s2.id == f1s1.id assert f1s2.value == f1s1.value - + class UnicodeTest(ORMTest): def define_tables(self, metadata): global uni_table, uni_table2 @@ -164,18 +166,18 @@ class UnicodeTest(ORMTest): self.assert_(t1.txt == txt) Session.commit() self.assert_(t1.txt == txt) - + def test_relation(self): class Test(object): def __init__(self, txt): self.txt = txt class Test2(object):pass - + mapper(Test, uni_table, properties={ 't2s':relation(Test2) }) mapper(Test2, uni_table2) - + txt = u"\u0160\u0110\u0106\u010c\u017d" t1 = Test(txt=txt) t1.t2s.append(Test2()) @@ -186,11 +188,14 @@ class UnicodeTest(ORMTest): assert len(t1.t2s) == 2 class UnicodeSchemaTest(ORMTest): - @testing.supported('sqlite', 'postgres') - def define_tables(self, metadata): - global t1, t2, t3 + __unsupported_on__ = ('oracle', 'mssql', 'firebird', 'sybase', + 'access', 'maxdb') + __excluded_on__ = (('mysql', '<', (4, 1, 1)),) + + metadata = MetaData(engines.utf8_engine()) - #unicode_bind = utf8_engine() + def define_tables(self, metadata): + global t1, t2 t1 = Table('unitable1', metadata, Column(u'méil', Integer, primary_key=True, key='a'), @@ -204,8 +209,7 @@ class UnicodeSchemaTest(ORMTest): Column(u'\u6e2c\u8a66_2', Integer, key="e"), test_needs_fk=True, ) - - @testing.supported('sqlite', 'postgres') + def test_mapping(self): class A(fixtures.Base):pass class B(fixtures.Base):pass @@ -234,7 +238,6 @@ class UnicodeSchemaTest(ORMTest): assert new_a1.t2s[0].d == b1.d Session.clear() - @testing.supported('sqlite', 'postgres') def test_inheritance_mapping(self): class A(fixtures.Base):pass class B(A):pass @@ -249,7 +252,7 @@ class UnicodeSchemaTest(ORMTest): # breaks the comparison ????? 
l = Session.query(A).all() assert [A(b=5), B(e=7)] == l - + class MutableTypesTest(ORMTest): def define_tables(self, metadata): global table @@ -302,7 +305,7 @@ class MutableTypesTest(ORMTest): {'mutabletest_id': f1.id, 'val': u'hi', 'data':f1.data} ), ]) - + def test_nocomparison(self): """test that types marked as MutableType get changes detected on them when the type has no __eq__ method""" class Foo(object):pass @@ -310,11 +313,11 @@ class MutableTypesTest(ORMTest): f1 = Foo() f1.data = pickleable.BarWithoutCompare(4,5) Session.commit() - + def go(): Session.commit() self.assert_sql_count(testbase.db, go, 0) - + Session.close() f2 = Session.query(Foo).get_by(id=f1.id) @@ -327,7 +330,7 @@ class MutableTypesTest(ORMTest): def go(): Session.commit() self.assert_sql_count(testbase.db, go, 1) - + Session.close() f3 = Session.query(Foo).get_by(id=f1.id) print f2.data, f3.data @@ -336,10 +339,10 @@ class MutableTypesTest(ORMTest): def go(): Session.commit() self.assert_sql_count(testbase.db, go, 0) - + def test_unicode(self): """test that two equivalent unicode values dont get flagged as changed. - + apparently two equal unicode objects dont compare via "is" in all cases, so this tests the compare_values() call on types.String and its usage via types.Unicode.""" class Foo(object):pass @@ -362,7 +365,7 @@ class MutableTypesTest2(ORMTest): Column('id', Integer, Sequence('mutableidseq', optional=True), primary_key=True), Column('data', PickleType(comparator=operator.eq)), ) - + def test_dicts(self): """dictionaries dont pickle the same way twice, sigh.""" @@ -390,25 +393,25 @@ class MutableTypesTest2(ORMTest): def go(): Session.commit() self.assert_sql_count(testbase.db, go, 1) - + Session.clear() f = Session.query(Foo).get(f1.id) assert f.data == [{'personne': {'nom': u'Smith', 'pers_id': 1, 'prenom': u'john', 'civilite': u'Mr', \ 'int_3': False, 'int_2': False, 'int_1': u'23', 'VenSoir': False, 'str_1': u'Test', \ 'SamMidi': False, 'str_2': u'chien', 'DimMidi': False, 'SamSoir': True, 'SamAcc': False}}] - + class PKTest(ORMTest): def define_tables(self, metadata): global table, table2, table3 table = Table( - 'multipk', metadata, + 'multipk', metadata, Column('multi_id', Integer, Sequence("multi_id_seq", optional=True), primary_key=True), Column('multi_rev', Integer, primary_key=True), Column('name', String(50), nullable=False), Column('value', String(100)) ) - + table2 = Table('multipk2', metadata, Column('pk_col_1', String(30), primary_key=True), Column('pk_col_2', String(30), primary_key=True), @@ -422,8 +425,8 @@ class PKTest(ORMTest): ) # not supported on sqlite since sqlite's auto-pk generation only works with - # single column primary keys - @testing.unsupported('sqlite') + # single column primary keys + @testing.fails_on('sqlite') def test_primarykey(self): class Entry(object): pass @@ -436,7 +439,7 @@ class PKTest(ORMTest): Session.close() e2 = Query(Entry).get((e.multi_id, 2)) self.assert_(e is not e2 and e._instance_key == e2._instance_key) - + # this one works with sqlite since we are manually setting up pk values def test_manualpk(self): class Entry(object): @@ -447,7 +450,7 @@ class PKTest(ORMTest): e.pk_col_2 = 'pk1_related' e.data = 'im the data' Session.commit() - + def test_keypks(self): import datetime class Entity(object): @@ -463,7 +466,7 @@ class PKTest(ORMTest): class ForeignPKTest(ORMTest): """tests mapper detection of the relationship direction when parent/child tables are joined on their primary keys""" - + def define_tables(self, metadata): global people, peoplesites 
@@ -472,13 +475,13 @@ class ForeignPKTest(ORMTest): Column('firstname', String(10)), Column('lastname', String(10)), ) - + peoplesites = Table("peoplesites", metadata, - Column('person', String(10), ForeignKey("people.person"), + Column('person', String(10), ForeignKey("people.person"), primary_key=True), Column('site', String(10)), ) - + def test_basic(self): class PersonSite(object):pass class Person(object):pass @@ -486,7 +489,7 @@ class ForeignPKTest(ORMTest): m2 = mapper(Person, people, properties = { - 'sites' : relation(PersonSite), + 'sites' : relation(PersonSite), }, ) compile_mappers() @@ -539,12 +542,12 @@ class ClauseAttributesTest(ORMTest): assert u.name == 'test2' assert u.counter == 2 self.assert_sql_count(testbase.db, go, 1) - + sess.clear() u = sess.query(User).get(u.id) assert u.name == 'test2' assert u.counter == 2 - + @testing.unsupported('mssql') def test_insert(self): class User(object): @@ -581,7 +584,7 @@ class PassiveDeletesTest(ORMTest): pass class MyOtherClass(object): pass - + mapper(MyOtherClass, myothertable) mapper(MyClass, mytable, properties={ @@ -621,15 +624,15 @@ class ExtraPassiveDeletesTest(ORMTest): ForeignKeyConstraint(['parent_id'],['mytable.id']), # no CASCADE, the same as ON DELETE RESTRICT test_needs_fk=True, ) - + def test_assertions(self): class MyClass(object): pass class MyOtherClass(object): pass - + mapper(MyOtherClass, myothertable) - + try: mapper(MyClass, mytable, properties={ 'children':relation(MyOtherClass, passive_deletes='all', cascade="all") @@ -637,14 +640,14 @@ class ExtraPassiveDeletesTest(ORMTest): assert False except exceptions.ArgumentError, e: assert str(e) == "Can't set passive_deletes='all' in conjunction with 'delete' or 'delete-orphan' cascade" - + @testing.unsupported('sqlite') def test_extra_passive(self): class MyClass(object): pass class MyOtherClass(object): pass - + mapper(MyOtherClass, myothertable) mapper(MyClass, mytable, properties={ @@ -669,12 +672,12 @@ class ExtraPassiveDeletesTest(ORMTest): except exceptions.DBAPIError: assert True - + class DefaultTest(ORMTest): """tests that when saving objects whose table contains DefaultGenerators, either python-side, preexec or database-side, - the newly saved instances receive all the default values either through a post-fetch or getting the pre-exec'ed + the newly saved instances receive all the default values either through a post-fetch or getting the pre-exec'ed defaults back from the engine.""" - + def define_tables(self, metadata): db = testbase.db use_string_defaults = db.engine.__module__.endswith('postgres') or db.engine.__module__.endswith('oracle') or db.engine.__module__.endswith('sqlite') @@ -687,7 +690,7 @@ class DefaultTest(ORMTest): hohotype = Integer self.hohoval = 9 self.althohoval = 15 - + global default_table default_table = Table('default_test', metadata, Column('id', Integer, Sequence("dt_seq", optional=True), primary_key=True), @@ -696,42 +699,42 @@ class DefaultTest(ORMTest): Column('foober', String(30), default="im foober", onupdate="im the update") ) - + def test_insert(self): class Hoho(object):pass mapper(Hoho, default_table) - + h1 = Hoho(hoho=self.althohoval) h2 = Hoho(counter=12) h3 = Hoho(hoho=self.althohoval, counter=12) h4 = Hoho() h5 = Hoho(foober='im the new foober') Session.commit() - + self.assert_(h1.hoho==self.althohoval) self.assert_(h3.hoho==self.althohoval) - + def go(): # test deferred load of attribues, one select per instance self.assert_(h2.hoho==h4.hoho==h5.hoho==self.hohoval) self.assert_sql_count(testbase.db, go, 3) - + 
def go(): self.assert_(h1.counter == h4.counter==h5.counter==7) self.assert_sql_count(testbase.db, go, 1) - + def go(): self.assert_(h3.counter == h2.counter == 12) self.assert_(h2.foober == h3.foober == h4.foober == 'im foober') self.assert_(h5.foober=='im the new foober') self.assert_sql_count(testbase.db, go, 0) - + Session.close() - + l = Hoho.query.all() - + (h1, h2, h3, h4, h5) = l - + self.assert_(h1.hoho==self.althohoval) self.assert_(h3.hoho==self.althohoval) self.assert_(h2.hoho==h4.hoho==h5.hoho==self.hohoval) @@ -739,21 +742,21 @@ class DefaultTest(ORMTest): self.assert_(h1.counter == h4.counter==h5.counter==7) self.assert_(h2.foober == h3.foober == h4.foober == 'im foober') self.assert_(h5.foober=='im the new foober') - + def test_insert_nopostfetch(self): # populates the PassiveDefaults explicitly so there is no "post-update" class Hoho(object):pass mapper(Hoho, default_table) - + h1 = Hoho(hoho="15", counter="15") - + Session.commit() def go(): self.assert_(h1.hoho=="15") self.assert_(h1.counter=="15") self.assert_(h1.foober=="im foober") self.assert_sql_count(testbase.db, go, 0) - + def test_update(self): class Hoho(object):pass mapper(Hoho, default_table) @@ -766,7 +769,7 @@ class DefaultTest(ORMTest): class OneToManyTest(ORMTest): metadata = tables.metadata - + def define_tables(self, metadata): pass @@ -834,7 +837,7 @@ class OneToManyTest(ORMTest): u2.user_name = 'user2modified' u1.addresses.append(a3) del u1.addresses[0] - self.assert_sql(testbase.db, lambda: Session.commit(), + self.assert_sql(testbase.db, lambda: Session.commit(), [ ( "UPDATE users SET user_name=:user_name WHERE users.user_id = :users_user_id", @@ -951,10 +954,10 @@ class OneToManyTest(ORMTest): m2 = mapper(Address, addresses) m = mapper(User, users, properties={ 'boston_addresses' : relation(m2, primaryjoin= - and_(users.c.user_id==addresses.c.user_id, + and_(users.c.user_id==addresses.c.user_id, addresses.c.email_address.like('%boston%'))), 'newyork_addresses' : relation(m2, primaryjoin= - and_(users.c.user_id==addresses.c.user_id, + and_(users.c.user_id==addresses.c.user_id, addresses.c.email_address.like('%newyork%'))), }) u = User() @@ -971,7 +974,7 @@ class SaveTest(ORMTest): metadata = tables.metadata def define_tables(self, metadata): pass - + def setUp(self): super(SaveTest, self).setUp() keywords.insert().execute( @@ -993,7 +996,7 @@ class SaveTest(ORMTest): u2.user_name = 'savetester2' Session.save(u) - + Session.flush([u]) Session.commit() @@ -1001,7 +1004,7 @@ class SaveTest(ORMTest): nu = Session.get(m, u.user_id) print "U: " + repr(u) + "NU: " + repr(nu) self.assert_(u is nu) - + # clear out the identity map, so next get forces a SELECT Session.close() @@ -1009,7 +1012,7 @@ class SaveTest(ORMTest): nu = Session.get(m, u.user_id) self.assert_(u is not nu and u.user_id == nu.user_id and nu.user_name == 'savetester') Session.close() - + # change first users name and save Session.update(u) u.user_name = 'modifiedname' @@ -1022,7 +1025,7 @@ class SaveTest(ORMTest): print repr(u.user_id), repr(userlist[0].user_id), repr(userlist[0].user_name) self.assert_(u.user_id == userlist[0].user_id and userlist[0].user_name == 'modifiedname') self.assert_(u2.user_id == userlist[1].user_id and userlist[1].user_name == 'savetester2') - + def test_synonym(self): class User(object): def _get_name(self): @@ -1030,11 +1033,11 @@ class SaveTest(ORMTest): def _set_name(self, name): self.user_name = name + ":User" name = property(_get_name, _set_name) - + mapper(User, users, properties={ 
'name':synonym('user_name') }) - + u = User() u.name = "some name" assert u.name == 'User:some name:User' @@ -1043,15 +1046,15 @@ class SaveTest(ORMTest): Session.clear() u = Session.query(User).first() assert u.name == 'User:some name:User' - + def test_lazyattr_commit(self): """tests that when a lazy-loaded list is unloaded, and a commit occurs, that the 'passive' call on that list does not blow away its value""" - + m1 = mapper(User, users, properties = { 'addresses': relation(mapper(Address, addresses)) }) - + u = User() u.addresses.append(Address()) u.addresses.append(Address()) @@ -1064,10 +1067,10 @@ class SaveTest(ORMTest): u1.user_name = 'newname' Session.commit() self.assert_(len(u1.addresses) == 4) - + def test_inherits(self): m1 = mapper(User, users) - + class AddressUser(User): """a user object that also has the users mailing address.""" pass @@ -1077,20 +1080,20 @@ class SaveTest(ORMTest): AddressUser, addresses, inherits=m1 ) - + au = AddressUser() Session.commit() Session.close() l = Session.query(AddressUser).selectone() self.assert_(l.user_id == au.user_id and l.address_id == au.address_id) - + def test_deferred(self): """test deferred column operations""" - + mapper(User, users, properties={ 'user_name':deferred(users.c.user_name) }) - + # dont set deferred attribute, commit session u = User() u.user_id=42 @@ -1101,7 +1104,7 @@ class SaveTest(ORMTest): Session.commit() assert list(Session.execute(users.select(), mapper=User)) == [(42, 'some name')] Session.clear() - + # assert that a set operation doesn't trigger a load operation u = Session.query(User).filter(User.user_name=='some name').one() def go(): @@ -1109,19 +1112,19 @@ class SaveTest(ORMTest): self.assert_sql_count(testbase.db, go, 0) Session.flush() assert list(Session.execute(users.select(), mapper=User)) == [(42, 'some other name')] - + Session.clear() - + # test assigning None to an unloaded deferred also works u = Session.query(User).filter(User.user_name=='some other name').one() u.user_name = None Session.flush() assert list(Session.execute(users.select(), mapper=User)) == [(42, None)] - - + + # why no support on oracle ? because oracle doesn't save - # "blank" strings; it saves a single space character. - @testing.unsupported('oracle') + # "blank" strings; it saves a single space character. + @testing.unsupported('oracle') def test_dont_update_blanks(self): mapper(User, users) u = User() @@ -1138,13 +1141,13 @@ class SaveTest(ORMTest): """tests a save of an object where each instance spans two tables. 
also tests redefinition of the keynames for the column properties.""" usersaddresses = sql.join(users, addresses, users.c.user_id == addresses.c.user_id) - m = mapper(User, usersaddresses, + m = mapper(User, usersaddresses, properties = dict( - email = addresses.c.email_address, + email = addresses.c.email_address, foo_id = [users.c.user_id, addresses.c.user_id], ) ) - + u = User() u.user_name = 'multitester' u.email = 'multi@test.org' @@ -1153,10 +1156,10 @@ class SaveTest(ORMTest): id = m.primary_key_from_instance(u) Session.close() - + u = Session.get(User, id) assert u.user_name == 'multitester' - + usertable = users.select(users.c.user_id.in_([u.foo_id])).execute().fetchall() self.assertEqual(usertable[0].values(), [u.foo_id, 'multitester']) addresstable = addresses.select(addresses.c.address_id.in_([u.address_id])).execute().fetchall() @@ -1174,14 +1177,14 @@ class SaveTest(ORMTest): Session.close() u = Session.get(User, id) assert u.user_name == 'imnew' - + def test_history_get(self): """tests that the history properly lazy-fetches data when it wasnt otherwise loaded""" mapper(User, users, properties={ 'addresses':relation(Address, cascade="all, delete-orphan") }) mapper(Address, addresses) - + u = User() u.addresses.append(Address()) u.addresses.append(Address()) @@ -1192,12 +1195,12 @@ class SaveTest(ORMTest): Session.commit() assert users.count().scalar() == 0 assert addresses.count().scalar() == 0 - - - + + + def test_batchmode(self): """test the 'batch=False' flag on mapper()""" - + class TestExtension(MapperExtension): def before_insert(self, mapper, connection, instance): self.current_instance = instance @@ -1209,9 +1212,9 @@ class SaveTest(ORMTest): u2 = User() u2.username = 'user2' Session.commit() - + clear_mappers() - + m = mapper(User, users, extension=TestExtension()) u1 = User() u1.username = 'user1' @@ -1222,14 +1225,14 @@ class SaveTest(ORMTest): assert False except AssertionError: assert True - - + + class ManyToOneTest(ORMTest): metadata = tables.metadata - + def define_tables(self, metadata): pass - + def test_m2o_onetoone(self): # TODO: put assertion in here !!! 
m = mapper(Address, addresses, properties = dict( @@ -1249,7 +1252,7 @@ class ManyToOneTest(ORMTest): a.user = User() a.user.user_name = elem['user_name'] objects.append(a) - + Session.commit() objects[2].email_address = 'imnew@foo.bar' objects[3].user = User() @@ -1263,11 +1266,11 @@ class ManyToOneTest(ORMTest): "UPDATE email_addresses SET email_address=:email_address WHERE email_addresses.address_id = :email_addresses_address_id": lambda ctx: {'email_address': 'imnew@foo.bar', 'email_addresses_address_id': objects[2].address_id} , - + "UPDATE email_addresses SET user_id=:user_id WHERE email_addresses.address_id = :email_addresses_address_id": lambda ctx: {'user_id': objects[3].user.user_id, 'email_addresses_address_id': objects[3].address_id} }, - + ], with_sequences=[ ( @@ -1278,11 +1281,11 @@ class ManyToOneTest(ORMTest): "UPDATE email_addresses SET email_address=:email_address WHERE email_addresses.address_id = :email_addresses_address_id": lambda ctx: {'email_address': 'imnew@foo.bar', 'email_addresses_address_id': objects[2].address_id} , - + "UPDATE email_addresses SET user_id=:user_id WHERE email_addresses.address_id = :email_addresses_address_id": lambda ctx: {'user_id': objects[3].user.user_id, 'email_addresses_address_id': objects[3].address_id} }, - + ]) l = sql.select([users, addresses], sql.and_(users.c.user_id==addresses.c.user_id, addresses.c.address_id==a.address_id)).execute() assert l.fetchone().values() == [a.user.user_id, 'asdf8d', a.address_id, a.user_id, 'theater@foo.com'] @@ -1296,7 +1299,7 @@ class ManyToOneTest(ORMTest): a1.email_address = 'emailaddress1' u1 = User() u1.user_name='user1' - + a1.user = u1 Session.commit() Session.close() @@ -1357,7 +1360,7 @@ class ManyToOneTest(ORMTest): u1 = Session.query(User).get(u1.user_id) u2 = Session.query(User).get(u2.user_id) assert a1.user is u1 - + a1.user = u2 Session.commit() Session.close() @@ -1390,13 +1393,13 @@ class ManyToOneTest(ORMTest): assert sess.query(Address).get(a1.address_id).user is None assert sess.query(User).get(u1.user_id).addresses == [] - + class ManyToManyTest(ORMTest): metadata = tables.metadata - + def define_tables(self, metadata): pass - + def test_manytomany(self): items = orderitems @@ -1436,7 +1439,7 @@ class ManyToManyTest(ORMTest): item.keywords.append(k) Session.commit() - + l = Session.query(m).select(items.c.item_name.in_([e['item_name'] for e in data[1:]]), order_by=[items.c.item_name]) self.assert_result(l, *data) @@ -1456,7 +1459,7 @@ class ManyToManyTest(ORMTest): lambda ctx: [{'item_id': objects[5].item_id, 'keyword_id': k.keyword_id}] ) ], - + with_sequences = [ { "UPDATE items SET item_name=:item_name WHERE items.item_id = :items_item_id": @@ -1478,12 +1481,12 @@ class ManyToManyTest(ORMTest): "DELETE FROM itemkeywords WHERE itemkeywords.item_id = :item_id AND itemkeywords.keyword_id = :keyword_id", [{'item_id': objects[5].item_id, 'keyword_id': dkid}] ), - ( + ( "INSERT INTO itemkeywords (item_id, keyword_id) VALUES (:item_id, :keyword_id)", lambda ctx: [{'item_id': objects[2].item_id, 'keyword_id': k.keyword_id}] ) ]) - + Session.delete(objects[3]) Session.commit() @@ -1502,7 +1505,7 @@ class ManyToManyTest(ORMTest): i.keywords.append(k1) i.keywords.append(k2) Session.commit() - + assert itemkeywords.count().scalar() == 2 i.keywords = [] Session.commit() @@ -1510,19 +1513,19 @@ class ManyToManyTest(ORMTest): def test_scalar(self): """test that dependency.py doesnt try to delete an m2m relation referencing None.""" - + mapper(Keyword, keywords) mapper(Item, orderitems, 
properties = dict( keyword = relation(Keyword, secondary=itemkeywords, uselist=False), )) - + i = Item() Session.commit() Session.delete(i) Session.commit() - - + + def test_manytomany_update(self): """tests some history operations on a many to many""" @@ -1533,7 +1536,7 @@ class ManyToManyTest(ORMTest): return other.__class__ == Keyword and other.name == self.name def __repr__(self): return "Keyword(%s, %s)" % (getattr(self, 'keyword_id', 'None'), self.name) - + mapper(Keyword, keywords) mapper(Item, orderitems, properties = dict( keywords = relation(Keyword, secondary=itemkeywords, lazy=False, order_by=keywords.c.name), @@ -1546,18 +1549,18 @@ class ManyToManyTest(ORMTest): item.keywords.append(k2) item.keywords.append(k3) Session.commit() - + item.keywords = [] item.keywords.append(k1) item.keywords.append(k2) Session.commit() - + Session.close() item = Session.query(Item).get(item.item_id) print [k1, k2] print item.keywords assert item.keywords == [k1, k2] - + def test_association(self): """basic test of an association object""" class IKAssociation(object): @@ -1578,29 +1581,29 @@ class ManyToManyTest(ORMTest): )) data = [Item, - {'item_name': 'a_item1', 'keywords' : (IKAssociation, + {'item_name': 'a_item1', 'keywords' : (IKAssociation, [ {'keyword' : (Keyword, {'name': 'big'})}, - {'keyword' : (Keyword, {'name': 'green'})}, + {'keyword' : (Keyword, {'name': 'green'})}, {'keyword' : (Keyword, {'name': 'purple'})}, {'keyword' : (Keyword, {'name': 'round'})} ] - ) + ) }, - {'item_name': 'a_item2', 'keywords' : (IKAssociation, + {'item_name': 'a_item2', 'keywords' : (IKAssociation, [ {'keyword' : (Keyword, {'name': 'huge'})}, - {'keyword' : (Keyword, {'name': 'violet'})}, + {'keyword' : (Keyword, {'name': 'violet'})}, {'keyword' : (Keyword, {'name': 'yellow'})} ] - ) + ) }, - {'item_name': 'a_item3', 'keywords' : (IKAssociation, + {'item_name': 'a_item3', 'keywords' : (IKAssociation, [ {'keyword' : (Keyword, {'name': 'big'})}, - {'keyword' : (Keyword, {'name': 'blue'})}, + {'keyword' : (Keyword, {'name': 'blue'})}, ] - ) + ) } ] for elem in data[1:]: @@ -1621,9 +1624,9 @@ class ManyToManyTest(ORMTest): Session.close() l = Item.query.filter(items.c.item_name.in_([e['item_name'] for e in data[1:]])).order_by(items.c.item_name).all() self.assert_result(l, *data) - + class SaveTest2(ORMTest): - + def define_tables(self, metadata): global users, addresses users = Table('users', metadata, @@ -1636,7 +1639,7 @@ class SaveTest2(ORMTest): Column('rel_user_id', Integer, ForeignKey(users.c.user_id)), Column('email_address', String(20)), ) - + def test_m2o_nonmatch(self): m = mapper(Address, addresses, properties = dict( user = relation(mapper(User, users), lazy = True, uselist = False) @@ -1670,7 +1673,7 @@ class SaveTest2(ORMTest): {'rel_user_id': 2, 'email_address': 'thesdf@asdf.com'} ) ], - + with_sequences = [ ( "INSERT INTO users (user_id, user_name) VALUES (:user_id, :user_name)", @@ -1737,19 +1740,19 @@ class SaveTest3(ORMTest): class BooleanColTest(ORMTest): def define_tables(self, metadata): global t - t =Table('t1', metadata, + t =Table('t1', metadata, Column('id', Integer, primary_key=True), Column('name', String(30)), Column('value', Boolean)) - + def test_boolean(self): # use the regular mapper from sqlalchemy.orm import mapper - + class T(fixtures.Base): pass mapper(T, t) - + sess = create_session() t1 = T(value=True, name="t1") t2 = T(value=False, name="t2") @@ -1757,9 +1760,9 @@ class BooleanColTest(ORMTest): sess.save(t1) sess.save(t2) sess.save(t3) - + sess.flush() - + for clear 
in (False, True): if clear: sess.clear() @@ -1770,7 +1773,7 @@ class BooleanColTest(ORMTest): if clear: sess.clear() self.assertEquals(sess.query(T).filter(T.value==False).all(), [T(value=False, name="t2")]) - + t2 = sess.query(T).get(t2.id) t2.value = True sess.flush() @@ -1778,16 +1781,16 @@ class BooleanColTest(ORMTest): t2.value = False sess.flush() self.assertEquals(sess.query(T).filter(T.value==True).all(), [T(value=True, name="t1"),T(value=True, name="t3")]) - - + + class RowSwitchTest(ORMTest): def define_tables(self, metadata): global t1, t2, t3, t1t3 - + global T1, T2, T3 - + Session.remove() - + # parent t1 = Table('t1', metadata, Column('id', Integer, primary_key=True), @@ -1811,7 +1814,7 @@ class RowSwitchTest(ORMTest): Column('t1id', Integer, ForeignKey('t1.id'),nullable=False), Column('t3id', Integer, ForeignKey('t3.id'),nullable=False), ) - + class T1(fixtures.Base): pass @@ -1820,29 +1823,29 @@ class RowSwitchTest(ORMTest): class T3(fixtures.Base): pass - + def tearDown(self): Session.remove() super(RowSwitchTest, self).tearDown() - + def test_onetomany(self): mapper(T1, t1, properties={ 't2s':relation(T2, cascade="all, delete-orphan") }) mapper(T2, t2) - + sess = Session(autoflush=False) - + o1 = T1(data='some t1', id=1) o1.t2s.append(T2(data='some t2', id=1)) o1.t2s.append(T2(data='some other t2', id=2)) - + sess.save(o1) sess.flush() - + assert list(sess.execute(t1.select(), mapper=T1)) == [(1, 'some t1')] assert list(sess.execute(t2.select(), mapper=T1)) == [(1, 'some t2', 1), (2, 'some other t2', 1)] - + o2 = T1(data='some other t1', id=o1.id, t2s=[ T2(data='third t2', id=3), T2(data='fourth t2', id=4), @@ -1885,7 +1888,7 @@ class RowSwitchTest(ORMTest): assert list(sess.execute(t3.select(), mapper=T1)) == [(3, 'third t3'), (4, 'fourth t3')] def test_manytoone(self): - + mapper(T2, t2, properties={ 't1':relation(T1) }) @@ -1910,8 +1913,8 @@ class RowSwitchTest(ORMTest): assert list(sess.execute(t1.select(), mapper=T1)) == [(2, 'some other t1')] assert list(sess.execute(t2.select(), mapper=T1)) == [(1, 'some other t2', 2)] - - - + + + if __name__ == "__main__": - testbase.main() + testbase.main() diff --git a/test/profiling/zoomark.py b/test/profiling/zoomark.py index 05dfea290..ef0aec76f 100644 --- a/test/profiling/zoomark.py +++ b/test/profiling/zoomark.py @@ -19,13 +19,14 @@ class ZooMarkTest(testing.AssertMixin): function calls made during the test. The count can vary between Python 2.4 and 2.5. 
""" - - @testing.supported('postgres') - @profiling.profiled('create', call_range=(1500, 1880), always=True) + + __only_on__ = 'postgres' + + @profiling.profiled('create', call_range=(1500, 1880), always=True) def test_1_create_tables(self): global metadata metadata = MetaData(testbase.db) - + Zoo = Table('Zoo', metadata, Column('ID', Integer, Sequence('zoo_id_seq'), primary_key=True, index=True), Column('Name', Unicode(255)), @@ -34,7 +35,7 @@ class ZooMarkTest(testing.AssertMixin): Column('LastEscape', DateTime), Column('Admission', Float), ) - + Animal = Table('Animal', metadata, Column('ID', Integer, Sequence('animal_id_seq'), primary_key=True), Column('ZooID', Integer, ForeignKey('Zoo.ID'), index=True), @@ -48,13 +49,12 @@ class ZooMarkTest(testing.AssertMixin): Column('AlternateFoodID', Integer), ) metadata.create_all() - - @testing.supported('postgres') + @profiling.profiled('populate', call_range=(2700, 3700), always=True) def test_1a_populate(self): Zoo = metadata.tables['Zoo'] Animal = metadata.tables['Animal'] - + wap = Zoo.insert().execute(Name=u'Wild Animal Park', Founded=datetime.date(2000, 1, 1), # 59 can give rounding errors with divmod, which @@ -63,101 +63,98 @@ class ZooMarkTest(testing.AssertMixin): LastEscape=datetime.datetime(2004, 7, 29, 5, 6, 7), Admission=4.95, ).last_inserted_ids()[0] - + sdz = Zoo.insert().execute(Name =u'San Diego Zoo', Founded = datetime.date(1935, 9, 13), Opens = datetime.time(9, 0, 0), Admission = 0, ).last_inserted_ids()[0] - + Zoo.insert().execute( Name = u'Montr\xe9al Biod\xf4me', Founded = datetime.date(1992, 6, 19), Opens = datetime.time(9, 0, 0), Admission = 11.75, ) - + seaworld = Zoo.insert().execute( Name =u'Sea_World', Admission = 60).last_inserted_ids()[0] - + # Let's add a crazy futuristic Zoo to test large date values. lp = Zoo.insert().execute(Name =u'Luna Park', Founded = datetime.date(2072, 7, 17), Opens = datetime.time(0, 0, 0), Admission = 134.95, ).last_inserted_ids()[0] - + # Animals leopardid = Animal.insert().execute(Species=u'Leopard', Lifespan=73.5, ).last_inserted_ids()[0] Animal.update(Animal.c.ID==leopardid).execute(ZooID=wap, LastEscape=datetime.datetime(2004, 12, 21, 8, 15, 0, 999907)) - + lion = Animal.insert().execute(Species=u'Lion', ZooID=wap).last_inserted_ids()[0] Animal.insert().execute(Species=u'Slug', Legs=1, Lifespan=.75) - + tiger = Animal.insert().execute(Species=u'Tiger', ZooID=sdz ).last_inserted_ids()[0] - + # Override Legs.default with itself just to make sure it works. 
Animal.insert().execute(Species=u'Bear', Legs=4) Animal.insert().execute(Species=u'Ostrich', Legs=2, Lifespan=103.2) Animal.insert().execute(Species=u'Centipede', Legs=100) - + emp = Animal.insert().execute(Species=u'Emperor Penguin', Legs=2, ZooID=seaworld).last_inserted_ids()[0] adelie = Animal.insert().execute(Species=u'Adelie Penguin', Legs=2, ZooID=seaworld).last_inserted_ids()[0] - + Animal.insert().execute(Species=u'Millipede', Legs=1000000, ZooID=sdz) - + # Add a mother and child to test relationships bai_yun = Animal.insert().execute(Species=u'Ape', Name=u'Bai Yun', Legs=2).last_inserted_ids()[0] Animal.insert().execute(Species=u'Ape', Name=u'Hua Mei', Legs=2, MotherID=bai_yun) - - @testing.supported('postgres') + @profiling.profiled('insert', call_range=(150, 220), always=True) def test_2_insert(self): Animal = metadata.tables['Animal'] i = Animal.insert() for x in xrange(ITERATIONS): tick = i.execute(Species=u'Tick', Name=u'Tick %d' % x, Legs=8) - - @testing.supported('postgres') + @profiling.profiled('properties', call_range=(2300, 3030), always=True) def test_3_properties(self): Zoo = metadata.tables['Zoo'] Animal = metadata.tables['Animal'] - + def fullobject(select): """Iterate over the full result row.""" return list(select.execute().fetchone()) - + for x in xrange(ITERATIONS): # Zoos WAP = fullobject(Zoo.select(Zoo.c.Name==u'Wild Animal Park')) SDZ = fullobject(Zoo.select(Zoo.c.Founded==datetime.date(1935, 9, 13))) Biodome = fullobject(Zoo.select(Zoo.c.Name==u'Montr\xe9al Biod\xf4me')) seaworld = fullobject(Zoo.select(Zoo.c.Admission == float(60))) - + # Animals leopard = fullobject(Animal.select(Animal.c.Species ==u'Leopard')) ostrich = fullobject(Animal.select(Animal.c.Species==u'Ostrich')) millipede = fullobject(Animal.select(Animal.c.Legs==1000000)) ticks = fullobject(Animal.select(Animal.c.Species==u'Tick')) - - @testing.supported('postgres') + @profiling.profiled('expressions', call_range=(9200, 12050), always=True) def test_4_expressions(self): Zoo = metadata.tables['Zoo'] Animal = metadata.tables['Animal'] - + def fulltable(select): """Iterate over the full result table.""" return [list(row) for row in select.execute().fetchall()] - + for x in xrange(ITERATIONS): assert len(fulltable(Zoo.select())) == 5 assert len(fulltable(Animal.select())) == ITERATIONS + 12 @@ -169,31 +166,31 @@ class ZooMarkTest(testing.AssertMixin): assert len(fulltable(Animal.select(Animal.c.Lifespan > 70))) == 2 assert len(fulltable(Animal.select(Animal.c.Species.startswith(u'L')))) == 2 assert len(fulltable(Animal.select(Animal.c.Species.endswith(u'pede')))) == 2 - + assert len(fulltable(Animal.select(Animal.c.LastEscape != None))) == 1 assert len(fulltable(Animal.select(None == Animal.c.LastEscape ))) == ITERATIONS + 11 - + # In operator (containedby) assert len(fulltable(Animal.select(Animal.c.Species.like(u'%pede%')))) == 2 assert len(fulltable(Animal.select(Animal.c.Species.in_([u'Lion', u'Tiger', u'Bear'])))) == 3 - + # Try In with cell references class thing(object): pass pet, pet2 = thing(), thing() pet.Name, pet2.Name =u'Slug', u'Ostrich' assert len(fulltable(Animal.select(Animal.c.Species.in_([pet.Name, pet2.Name])))) == 2 - + # logic and other functions assert len(fulltable(Animal.select(Animal.c.Species.like(u'Slug')))) == 1 assert len(fulltable(Animal.select(Animal.c.Species.like(u'%pede%')))) == 2 name =u'Lion' assert len(fulltable(Animal.select(func.length(Animal.c.Species) == len(name) ))) == ITERATIONS + 3 - + assert len(fulltable(Animal.select(Animal.c.Species.like(u'%i%') 
))) == ITERATIONS + 7 - + # Test now(), today(), year(), month(), day() assert len(fulltable(Zoo.select(Zoo.c.Founded != None and Zoo.c.Founded < func.current_timestamp(_type=Date)))) == 3 @@ -201,19 +198,18 @@ class ZooMarkTest(testing.AssertMixin): assert len(fulltable(Animal.select(func.date_part('year', Animal.c.LastEscape) == 2004))) == 1 assert len(fulltable(Animal.select(func.date_part('month', Animal.c.LastEscape) == 12))) == 1 assert len(fulltable(Animal.select(func.date_part('day', Animal.c.LastEscape) == 21))) == 1 - - @testing.supported('postgres') + @profiling.profiled('aggregates', call_range=(800, 1170), always=True) def test_5_aggregates(self): Animal = metadata.tables['Animal'] Zoo = metadata.tables['Zoo'] - + for x in xrange(ITERATIONS): # views view = select([Animal.c.Legs]).execute().fetchall() legs = [x[0] for x in view] legs.sort() - + expected = {'Leopard': 73.5, 'Slug': .75, 'Tiger': None, @@ -230,7 +226,7 @@ class ZooMarkTest(testing.AssertMixin): for species, lifespan in select([Animal.c.Species, Animal.c.Lifespan] ).execute().fetchall(): assert lifespan == expected[species] - + expected = [u'Montr\xe9al Biod\xf4me', 'Wild Animal Park'] e = select([Zoo.c.Name], and_(Zoo.c.Founded != None, @@ -238,17 +234,16 @@ class ZooMarkTest(testing.AssertMixin): Zoo.c.Founded >= datetime.date(1990, 1, 1))) values = [val[0] for val in e.execute().fetchall()] assert set(values) == set(expected) - + # distinct legs = [x[0] for x in select([Animal.c.Legs], distinct=True).execute().fetchall()] legs.sort() - - @testing.supported('postgres') + @profiling.profiled('editing', call_range=(1050, 1180), always=True) def test_6_editing(self): Zoo = metadata.tables['Zoo'] - + for x in xrange(ITERATIONS): # Edit SDZ = Zoo.select(Zoo.c.Name==u'San Diego Zoo').execute().fetchone() @@ -257,44 +252,43 @@ class ZooMarkTest(testing.AssertMixin): Founded = datetime.date(1900, 1, 1), Opens = datetime.time(7, 30, 0), Admission = "35.00") - + # Test edits SDZ = Zoo.select(Zoo.c.Name==u'The San Diego Zoo').execute().fetchone() assert SDZ['Founded'] == datetime.date(1900, 1, 1), SDZ['Founded'] - + # Change it back Zoo.update(Zoo.c.ID==SDZ['ID']).execute( Name =u'San Diego Zoo', Founded = datetime.date(1935, 9, 13), Opens = datetime.time(9, 0, 0), Admission = "0") - + # Test re-edits SDZ = Zoo.select(Zoo.c.Name==u'San Diego Zoo').execute().fetchone() assert SDZ['Founded'] == datetime.date(1935, 9, 13) - - @testing.supported('postgres') + @profiling.profiled('multiview', call_range=(1900, 2300), always=True) def test_7_multiview(self): Zoo = metadata.tables['Zoo'] Animal = metadata.tables['Animal'] - + def fulltable(select): """Iterate over the full result table.""" return [list(row) for row in select.execute().fetchall()] - + for x in xrange(ITERATIONS): za = fulltable(select([Zoo.c.ID] + list(Animal.c), Zoo.c.Name ==u'San Diego Zoo', from_obj = [join(Zoo, Animal)])) - + SDZ = Zoo.select(Zoo.c.Name==u'San Diego Zoo') - + e = fulltable(select([Zoo.c.ID, Animal.c.ID], and_(Zoo.c.Name==u'San Diego Zoo', Animal.c.Species==u'Leopard'), from_obj = [join(Zoo, Animal)])) - + # Now try the same query with INNER, LEFT, and RIGHT JOINs. 
e = fulltable(select([Zoo.c.Name, Animal.c.Species], from_obj=[join(Zoo, Animal)])) @@ -303,7 +297,6 @@ class ZooMarkTest(testing.AssertMixin): e = fulltable(select([Zoo.c.Name, Animal.c.Species], from_obj=[outerjoin(Animal, Zoo)])) - @testing.supported('postgres') def test_8_drop(self): metadata.drop_all() diff --git a/test/sql/defaults.py b/test/sql/defaults.py index a50250e9b..bce499686 100644 --- a/test/sql/defaults.py +++ b/test/sql/defaults.py @@ -5,6 +5,7 @@ from sqlalchemy import exceptions, schema, util from sqlalchemy.orm import mapper, create_session from testlib import * + class DefaultTest(PersistTest): def setUpAll(self): @@ -13,7 +14,7 @@ class DefaultTest(PersistTest): db = testbase.db metadata = MetaData(db) default_generator = {'x':50} - + def mydefault(): default_generator['x'] += 1 return default_generator['x'] @@ -21,7 +22,7 @@ class DefaultTest(PersistTest): def myupdate_with_ctx(ctx): conn = ctx.connection return conn.execute(select([text('13')])).scalar() - + def mydefault_using_connection(ctx): conn = ctx.connection try: @@ -30,10 +31,10 @@ class DefaultTest(PersistTest): # ensure a "close()" on this connection does nothing, # since its a "branched" connection conn.close() - + use_function_defaults = testing.against('postgres', 'oracle') is_oracle = testing.against('oracle') - + # select "count(1)" returns different results on different DBs # also correct for "current_date" compatible as column default, value differences currenttime = func.current_date(type_=Date, bind=db) @@ -63,32 +64,32 @@ class DefaultTest(PersistTest): def1 = def2 = "3" ts = 3 deftype = Integer - + t = Table('default_test1', metadata, # python function Column('col1', Integer, primary_key=True, default=mydefault), - + # python literal Column('col2', String(20), default="imthedefault", onupdate="im the update"), - + # preexecute expression Column('col3', Integer, default=func.length('abcdef'), onupdate=func.length('abcdefghijk')), - + # SQL-side default from sql expression Column('col4', deftype, PassiveDefault(def1)), - + # SQL-side default from literal expression Column('col5', deftype, PassiveDefault(def2)), - + # preexecute + update timestamp Column('col6', Date, default=currenttime, onupdate=currenttime), - + Column('boolcol1', Boolean, default=True), Column('boolcol2', Boolean, default=False), - + # python function which uses ExecutionContext Column('col7', Integer, default=mydefault_using_connection, onupdate=myupdate_with_ctx), - + # python builtin Column('col8', Date, default=datetime.date.today, onupdate=datetime.date.today) ) @@ -96,11 +97,11 @@ class DefaultTest(PersistTest): def tearDownAll(self): t.drop() - + def tearDown(self): default_generator['x'] = 50 t.delete().execute() - + def testargsignature(self): ex_msg = \ "ColumnDefault Python function takes zero or one positional arguments" @@ -122,7 +123,7 @@ class DefaultTest(PersistTest): for fn in fn3, fn4, fn5, fn6, fn7: c = ColumnDefault(fn) - + def teststandalone(self): c = testbase.db.engine.contextual_connect() x = c.execute(t.c.col1.default) @@ -132,7 +133,7 @@ class DefaultTest(PersistTest): self.assert_(y == 'imthedefault') self.assert_(z == f) self.assert_(f2==11) - + def testinsert(self): r = t.insert().execute() assert r.lastrow_has_defaults() @@ -141,7 +142,7 @@ class DefaultTest(PersistTest): r = t.insert(inline=True).execute() assert r.lastrow_has_defaults() assert util.Set(r.context.postfetch_cols) == util.Set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]) - + t.insert().execute() t.insert().execute() @@ -149,8 +150,8 
@@ class DefaultTest(PersistTest): l = t.select().execute() today = datetime.date.today() self.assert_(l.fetchall() == [ - (51, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today), - (52, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today), + (51, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today), + (52, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today), (53, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today), (54, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today), ]) @@ -172,7 +173,7 @@ class DefaultTest(PersistTest): t.insert(values={'col3':50}).execute() l = t.select().execute() self.assert_(l.fetchone()['col3'] == 50) - + def testupdatemany(self): # MySQL-Python 1.2.2 breaks functions in execute_many :( if (testing.against('mysql') and @@ -184,8 +185,7 @@ class DefaultTest(PersistTest): t.update(t.c.col1==bindparam('pkval')).execute( {'pkval':51,'col7':None, 'col8':None, 'boolcol1':False}, ) - - + t.update(t.c.col1==bindparam('pkval')).execute( {'pkval':51,}, {'pkval':52,}, @@ -196,8 +196,7 @@ class DefaultTest(PersistTest): ctexec = currenttime.scalar() today = datetime.date.today() self.assert_(l.fetchall() == [(51, 'im the update', f2, ts, ts, ctexec, False, False, 13, today), (52, 'im the update', f2, ts, ts, ctexec, True, False, 13, today), (53, 'im the update', f2, ts, ts, ctexec, True, False, 13, today)]) - - + def testupdate(self): r = t.insert().execute() pk = r.last_inserted_ids()[0] @@ -207,7 +206,7 @@ class DefaultTest(PersistTest): l = l.fetchone() self.assert_(l == (pk, 'im the update', f2, None, None, ctexec, True, False, 13, datetime.date.today())) self.assert_(f2==11) - + def testupdatevalues(self): r = t.insert().execute() pk = r.last_inserted_ids()[0] @@ -216,11 +215,11 @@ class DefaultTest(PersistTest): l = l.fetchone() self.assert_(l['col3'] == 55) - @testing.supported('postgres') + @testing.fails_on_everything_except('postgres') def testpassiveoverride(self): - """primarily for postgres, tests that when we get a primary key column back + """primarily for postgres, tests that when we get a primary key column back from reflecting a table which has a default value on it, we pre-execute - that PassiveDefault upon insert, even though PassiveDefault says + that PassiveDefault upon insert, even though PassiveDefault says "let the database execute this", because in postgres we must have all the primary key values in memory before insert; otherwise we cant locate the just inserted row.""" @@ -246,21 +245,21 @@ class DefaultTest(PersistTest): class PKDefaultTest(PersistTest): def setUpAll(self): global metadata, t1, t2 - + metadata = MetaData(testbase.db) - - t2 = Table('t2', metadata, + + t2 = Table('t2', metadata, Column('nextid', Integer)) - + t1 = Table('t1', metadata, Column('id', Integer, primary_key=True, default=select([func.max(t2.c.nextid)]).as_scalar()), Column('data', String(30))) - + metadata.create_all() - + def tearDownAll(self): metadata.drop_all() - + @testing.unsupported('mssql') def test_basic(self): t2.insert().execute(nextid=1) @@ -270,14 +269,14 @@ class PKDefaultTest(PersistTest): t2.insert().execute(nextid=2) r = t1.insert().execute(data='there') assert r.last_inserted_ids() == [2] - - + + class AutoIncrementTest(PersistTest): def setUp(self): global aitable, aimeta - + aimeta = MetaData(testbase.db) - aitable = Table("aitest", aimeta, + aitable = Table("aitest", aimeta, Column('id', Integer, Sequence('ai_id_seq', optional=True), primary_key=True), Column('int1', Integer), @@ -287,16 +286,19 @@ class 
AutoIncrementTest(PersistTest): def tearDown(self): aimeta.drop_all() - @testing.supported('postgres', 'mysql', 'maxdb') + # should fail everywhere... was: @supported('postgres', 'mysql', 'maxdb') + @testing.fails_on('sqlite') def testnonautoincrement(self): + # sqlite INT primary keys can be non-unique! (only for ints) meta = MetaData(testbase.db) - nonai_table = Table("nonaitest", meta, + nonai_table = Table("nonaitest", meta, Column('id', Integer, autoincrement=False, primary_key=True), Column('data', String(20))) nonai_table.create(checkfirst=True) try: try: - # postgres will fail on first row, mysql fails on second row + # postgres + mysql strict will fail on first row, + # mysql in legacy mode fails on second row nonai_table.insert().execute(data='row 1') nonai_table.insert().execute(data='row 2') assert False @@ -306,7 +308,7 @@ class AutoIncrementTest(PersistTest): nonai_table.insert().execute(id=1, data='row 1') finally: - nonai_table.drop() + nonai_table.drop() # TODO: add coverage for increment on a secondary column in a key def _test_autoincrement(self, bind): @@ -362,7 +364,7 @@ class AutoIncrementTest(PersistTest): def test_autoincrement_fk(self): if not testbase.db.dialect.supports_pk_autoincrement: return True - + metadata = MetaData(testbase.db) # No optional sequence here. @@ -379,13 +381,14 @@ class AutoIncrementTest(PersistTest): metadata.drop_all() - class SequenceTest(PersistTest): - @testing.supported('postgres', 'oracle', 'maxdb') + __unsupported_on__ = ('sqlite', 'mysql', 'mssql', 'firebird', + 'sybase', 'access') + def setUpAll(self): global cartitems, sometable, metadata metadata = MetaData(testbase.db) - cartitems = Table("cartitems", metadata, + cartitems = Table("cartitems", metadata, Column("cart_id", Integer, Sequence('cart_id_seq'), primary_key=True), Column("description", String(40)), Column("createdate", DateTime()) @@ -393,12 +396,12 @@ class SequenceTest(PersistTest): sometable = Table( 'Manager', metadata, Column('obj_id', Integer, Sequence('obj_id_seq'), ), Column('name', String, ), - Column('id', Integer, Sequence('Manager_id_seq', optional=True), primary_key=True), + Column('id', Integer, Sequence('Manager_id_seq', optional=True), + primary_key=True), ) - + metadata.create_all() - - @testing.supported('postgres', 'oracle', 'maxdb') + def testseqnonpk(self): """test sequences fire off as defaults on non-pk columns""" @@ -415,7 +418,6 @@ class SequenceTest(PersistTest): (4, "name4", 4), ] - @testing.supported('postgres', 'oracle', 'maxdb') def testsequence(self): cartitems.insert().execute(description='hi') cartitems.insert().execute(description='there') @@ -427,11 +429,11 @@ class SequenceTest(PersistTest): assert select([func.count(cartitems.c.cart_id)], and_(cartitems.c.description == 'lala', cartitems.c.cart_id == id_)).scalar() == 1 - + cartitems.select().execute().fetchall() - - @testing.supported('postgres', 'oracle') + + @testing.fails_on('maxdb') # maxdb db-api seems to double-execute NEXTVAL internally somewhere, # throwing off the numbers for these tests... 
def test_implicit_sequence_exec(self): @@ -443,7 +445,7 @@ class SequenceTest(PersistTest): finally: s.drop() - @testing.supported('postgres', 'oracle') + @testing.fails_on('maxdb') def teststandalone_explicit(self): s = Sequence("my_sequence") s.create(bind=testbase.db) @@ -452,23 +454,22 @@ class SequenceTest(PersistTest): self.assert_(x == 1) finally: s.drop(testbase.db) - - @testing.supported('postgres', 'oracle', 'maxdb') + def test_checkfirst(self): s = Sequence("my_sequence") s.create(testbase.db, checkfirst=False) s.create(testbase.db, checkfirst=True) s.drop(testbase.db, checkfirst=False) s.drop(testbase.db, checkfirst=True) - - @testing.supported('postgres', 'oracle') + + @testing.fails_on('maxdb') def teststandalone2(self): x = cartitems.c.cart_id.sequence.execute() self.assert_(1 <= x <= 4) - - @testing.supported('postgres', 'oracle', 'maxdb') - def tearDownAll(self): + + def tearDownAll(self): metadata.drop_all() + if __name__ == "__main__": testbase.main() diff --git a/test/sql/functions.py b/test/sql/functions.py index 177a308b4..1103245ea 100644 --- a/test/sql/functions.py +++ b/test/sql/functions.py @@ -1,15 +1,24 @@ import testbase import datetime from sqlalchemy import * -from sqlalchemy import exceptions, sql +from sqlalchemy import databases, exceptions, sql from sqlalchemy.sql.compiler import BIND_TEMPLATES from sqlalchemy.engine import default from sqlalchemy import types as sqltypes from testlib import * -# TODO: add a helper function to testlib for this -from sqlalchemy.databases import sqlite, postgres, mysql, oracle, firebird, mssql -dialects = [x.dialect() for x in [sqlite, postgres, mysql, oracle, firebird, mssql]] +from sqlalchemy.databases import * +# every dialect in databases.__all__ is expected to pass these tests. +dialects = [getattr(databases, mod).dialect() + for mod in databases.__all__ + # fixme! + if mod not in ('access',)] + +# if the configured dialect is out-of-tree or not yet in __all__, include it +# too. +if testbase.db.name not in databases.__all__: + dialects.append(testbase.db.dialect) + class CompileTest(SQLCompileTest): def test_compile(self): @@ -22,7 +31,7 @@ class CompileTest(SQLCompileTest): else: self.assert_compile(func.nosuchfunction(), "nosuchfunction()", dialect=dialect) self.assert_compile(func.char_length('foo'), "char_length(%s)" % bindtemplate % {'name':'param_1', 'position':1}, dialect=dialect) - + def test_constructor(self): try: func.current_timestamp('somearg') @@ -41,14 +50,14 @@ class CompileTest(SQLCompileTest): assert False except TypeError: assert True - + def test_typing(self): assert isinstance(func.coalesce(datetime.date(2007, 10, 5), datetime.date(2005, 10, 15)).type, sqltypes.Date) assert isinstance(func.coalesce(None, datetime.date(2005, 10, 15)).type, sqltypes.Date) - + assert isinstance(func.concat("foo", "bar").type, sqltypes.String) - + class ExecuteTest(PersistTest): def test_standalone_execute(self): @@ -123,11 +132,10 @@ class ExecuteTest(PersistTest): t2.update(values={t2.c.value:func.length("asfdaasdf"), t2.c.stuff:"foo"}).execute() print "HI", select([t2.c.value, t2.c.stuff]).execute().fetchone() assert select([t2.c.value, t2.c.stuff]).execute().fetchone() == (9, "foo") - finally: meta.drop_all() - @testing.supported('postgres') + @testing.fails_on_everything_except('postgres') def test_as_from(self): # TODO: shouldnt this work on oracle too ? x = testbase.db.func.current_date().execute().scalar() @@ -150,4 +158,3 @@ def exec_sorted(statement, *args, **kw): if __name__ == '__main__': testbase.main() -
\ No newline at end of file diff --git a/test/sql/query.py b/test/sql/query.py index 4979fecd7..d0b24a9cc 100644 --- a/test/sql/query.py +++ b/test/sql/query.py @@ -264,19 +264,11 @@ class QueryTest(PersistTest): r = users.select(offset=5, order_by=[users.c.user_id]).execute().fetchall() self.assert_(r==[(6, 'ralph'), (7, 'fido')]) - @testing.supported('mssql') - @testing.fails_on('maxdb') - def test_select_limit_nooffset(self): - try: - r = users.select(limit=3, offset=2, order_by=[users.c.user_id]).execute().fetchall() - assert False # InvalidRequestError should have been raised - except exceptions.InvalidRequestError: - pass - - @testing.unsupported('mysql') + @testing.exclude('mysql', '<', (5, 0, 0)) def test_scalar_select(self): """test that scalar subqueries with labels get their type propigated to the result set.""" - # mysql and/or mysqldb has a bug here, type isnt propigated for scalar subquery. + # mysql and/or mysqldb has a bug here, type isn't propagated for scalar + # subquery. datetable = Table('datetable', metadata, Column('id', Integer, primary_key=True), Column('today', DateTime)) @@ -482,60 +474,6 @@ class QueryTest(PersistTest): finally: shadowed.drop(checkfirst=True) - @testing.supported('mssql') - def test_fetchid_trigger(self): - meta = MetaData(testbase.db) - t1 = Table('t1', meta, - Column('id', Integer, Sequence('fred', 100, 1), primary_key=True), - Column('descr', String(200))) - t2 = Table('t2', meta, - Column('id', Integer, Sequence('fred', 200, 1), primary_key=True), - Column('descr', String(200))) - meta.create_all() - con = testbase.db.connect() - con.execute("""create trigger paj on t1 for insert as - insert into t2 (descr) select descr from inserted""") - - try: - tr = con.begin() - r = con.execute(t2.insert(), descr='hello') - self.assert_(r.last_inserted_ids() == [200]) - r = con.execute(t1.insert(), descr='hello') - self.assert_(r.last_inserted_ids() == [100]) - - finally: - tr.commit() - con.execute("""drop trigger paj""") - meta.drop_all() - - @testing.supported('mssql') - def test_insertid_schema(self): - meta = MetaData(testbase.db) - con = testbase.db.connect() - con.execute('create schema paj') - tbl = Table('test', meta, Column('id', Integer, primary_key=True), schema='paj') - tbl.create() - try: - tbl.insert().execute({'id':1}) - finally: - tbl.drop() - con.execute('drop schema paj') - - @testing.supported('mssql') - def test_insertid_reserved(self): - meta = MetaData(testbase.db) - table = Table( - 'select', meta, - Column('col', Integer, primary_key=True) - ) - table.create() - - meta2 = MetaData(testbase.db) - try: - table.insert().execute(col=7) - finally: - table.drop() - @testing.fails_on('maxdb') def test_in_filtering(self): """test the behavior of the in_() function.""" diff --git a/test/testlib/testing.py b/test/testlib/testing.py index 95071a475..d5fb3b4e5 100644 --- a/test/testlib/testing.py +++ b/test/testlib/testing.py @@ -21,18 +21,32 @@ _ops = { '<': operator.lt, 'between': lambda val, pair: val >= pair[0] and val <= pair[1], } -def unsupported(*dbs): - """Mark a test as unsupported by one or more database implementations""" +def fails_on(*dbs): + """Mark a test as expected to fail on one or more database implementations. + + Unlike ``unsupported``, tests marked as ``fails_on`` will be run + for the named databases. The test is expected to fail and the unit test + logic is inverted: if the test fails, a success is reported. If the test + succeeds, a failure is reported. 
+ """ def decorate(fn): fn_name = fn.__name__ def maybe(*args, **kw): - if config.db.name in dbs: - print "'%s' unsupported on DB implementation '%s'" % ( - fn_name, config.db.name) - return True - else: + if config.db.name not in dbs: return fn(*args, **kw) + else: + try: + fn(*args, **kw) + except Exception, ex: + print ("'%s' failed as expected on DB implementation " + "'%s': %s" % ( + fn_name, config.db.name, str(ex))) + return True + else: + raise AssertionError( + "Unexpected success for '%s' on DB implementation '%s'" % + (fn_name, config.db.name)) try: maybe.__name__ = fn_name except: @@ -40,19 +54,17 @@ def unsupported(*dbs): return maybe return decorate -def fails_on(*dbs): - """Mark a test as expected to fail on one or more database implementations. +def fails_on_everything_except(*dbs): + """Mark a test as expected to fail on most database implementations. - Unlike ``unsupported``, tests marked as ``fails_on`` will be run - for the named databases. The test is expected to fail and the unit test - logic is inverted: if the test fails, a success is reported. If the test - succeeds, a failure is reported. + Like ``fails_on``, except failure is the expected outcome on all + databases except those listed. """ def decorate(fn): fn_name = fn.__name__ def maybe(*args, **kw): - if config.db.name not in dbs: + if config.db.name in dbs: return fn(*args, **kw) else: try: @@ -73,18 +85,22 @@ def fails_on(*dbs): return maybe return decorate -def supported(*dbs): - """Mark a test as supported by one or more database implementations""" +def unsupported(*dbs): + """Mark a test as unsupported by one or more database implementations. + + 'unsupported' tests will be skipped unconditionally. Useful for feature + tests that cause deadlocks or other fatal problems. + """ def decorate(fn): fn_name = fn.__name__ def maybe(*args, **kw): if config.db.name in dbs: - return fn(*args, **kw) - else: print "'%s' unsupported on DB implementation '%s'" % ( fn_name, config.db.name) return True + else: + return fn(*args, **kw) try: maybe.__name__ = fn_name except: @@ -95,7 +111,7 @@ def supported(*dbs): def exclude(db, op, spec): """Mark a test as unsupported by specific database server versions. - Stackable, both with other excludes and supported/unsupported. Examples:: + Stackable, both with other excludes and other decorators. Examples:: # Not supported by mydb versions less than 1, 0 @exclude('mydb', '<', (1,0)) # Other operators work too @@ -106,17 +122,9 @@ def exclude(db, op, spec): def decorate(fn): fn_name = fn.__name__ def maybe(*args, **kw): - if config.db.name != db: - return fn(*args, **kw) - - have = config.db.dialect.server_version_info( - config.db.contextual_connect()) - - oper = hasattr(op, '__call__') and op or _ops[op] - - if oper(have, spec): + if _is_excluded(db, op, spec): print "'%s' unsupported on DB %s version '%s'" % ( - fn_name, config.db.name, have) + fn_name, config.db.name, _server_version()) return True else: return fn(*args, **kw) @@ -127,6 +135,41 @@ def exclude(db, op, spec): return maybe return decorate +def _is_excluded(db, op, spec): + """Return True if the configured db matches an exclusion specification. + + db: + A dialect name + op: + An operator or stringified operator, such as '==' + spec: + A value that will be compared to the dialect's server_version_info + using the supplied operator. 
+ + Examples:: + # Not supported by mydb versions less than 1, 0 + _is_excluded('mydb', '<', (1,0)) + # Other operators work too + _is_excluded('bigdb', '==', (9,0,9)) + _is_excluded('yikesdb', 'in', ((0, 3, 'alpha2'), (0, 3, 'alpha3'))) + """ + + if config.db.name != db: + return False + + version = _server_version() + + oper = hasattr(op, '__call__') and op or _ops[op] + return oper(version, spec) + +def _server_version(bind=None): + """Return a server_version_info tuple.""" + + if bind is None: + bind = config.db + return bind.dialect.server_version_info(bind.contextual_connect()) + + def against(*queries): """Boolean predicate, compares to testing database configuration. @@ -262,6 +305,12 @@ class ExecutionContextWrapper(object): return query class PersistTest(unittest.TestCase): + # A sequence of dialect names to exclude from the test class. + __unsupported_on__ = () + + # If present, test class is only runnable for the *single* specified + # dialect. If you need multiple, use __unsupported_on__ and invert. + __only_on__ = None def __init__(self, *args, **params): unittest.TestCase.__init__(self, *args, **params) @@ -426,7 +475,8 @@ class ORMTest(AssertMixin): _otest_metadata = MetaData(config.db) else: _otest_metadata = self.metadata - _otest_metadata.bind = config.db + if self.metadata.bind is None: + _otest_metadata.bind = config.db self.define_tables(_otest_metadata) _otest_metadata.create_all() self.insert_data() @@ -490,23 +540,39 @@ class TTestSuite(unittest.TestSuite): return self(result) def __call__(self, result): - try: - if self._initTest is not None: - self._initTest.setUpAll() - except: - # skip tests if global setup fails - ex = self.__exc_info() - for test in self._tests: - result.addError(test, ex) - return False + init = getattr(self, '_initTest', None) + if init is not None: + if (hasattr(init, '__unsupported_on__') and + config.db.name in init.__unsupported_on__): + print "'%s' unsupported on DB implementation '%s'" % ( + init.__class__.__name__, config.db.name) + return True + if (getattr(init, '__only_on__', None) not in (None,config.db.name)): + print "'%s' unsupported on DB implementation '%s'" % ( + init.__class__.__name__, config.db.name) + return True + for rule in getattr(init, '__excluded_on__', ()): + if _is_excluded(*rule): + print "'%s' unsupported on DB %s version %s" % ( + init.__class__.__name__, config.db.name, + _server_version()) + return True + try: + init.setUpAll() + except: + # skip tests if global setup fails + ex = self.__exc_info() + for test in self._tests: + result.addError(test, ex) + return False try: return self.do_run(result) finally: try: - if self._initTest is not None: - self._initTest.tearDownAll() + if init is not None: + init.tearDownAll() except: - result.addError(self._initTest, self.__exc_info()) + result.addError(init, self.__exc_info()) pass def __exc_info(self): |
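A usage sketch, for orientation only: the class-level markers and decorators defined in test/testlib/testing.py above could be combined in a test module roughly as follows. Every class name, test name, and dialect choice here is hypothetical, and the sketch assumes the in-tree harness (import testbase, from testlib import *) that the test files in this patch already use.

import testbase
from testlib import *


class HypotheticalMarkerTest(PersistTest):
    # Skipped outright on these dialects; TTestSuite.__call__ checks the
    # attribute before setUpAll ever runs.
    __unsupported_on__ = ('access',)

    # Each rule is a (dialect, operator, version_spec) tuple handed to
    # _is_excluded(), the same form @testing.exclude takes.
    __excluded_on__ = (('mysql', '<', (5, 0, 0)),)

    @testing.fails_on('sqlite')
    def test_expected_failure(self):
        # Runs on every dialect; on sqlite the assertion fails and the
        # decorator reports the expected failure as a success.
        assert testbase.db.name != 'sqlite'

    @testing.fails_on_everything_except('postgres')
    def test_postgres_specific(self):
        # Expected to fail on every dialect other than postgres.
        assert testbase.db.name == 'postgres'

    @testing.unsupported('mssql')
    def test_skipped_on_mssql(self):
        # Skipped unconditionally on mssql, runs (and trivially passes)
        # everywhere else.
        assert testbase.db.name != 'mssql'


class HypotheticalPostgresOnlyTest(PersistTest):
    # __only_on__ restricts a whole class to a single dialect; to target
    # several dialects, list the excluded ones in __unsupported_on__ instead.
    __only_on__ = 'postgres'

    def test_postgres_only(self):
        assert testbase.db.name == 'postgres'


if __name__ == '__main__':
    testbase.main()

The class-level markers short-circuit before setUpAll, so fixtures are never built for dialects that cannot run the class, while the decorators act per test and leave the class fixtures in place.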