diff options
Diffstat (limited to 'test/sql/query.py')
-rw-r--r-- | test/sql/query.py | 246 |
1 files changed, 120 insertions, 126 deletions
diff --git a/test/sql/query.py b/test/sql/query.py index 8af5aafea..48a28a9a5 100644 --- a/test/sql/query.py +++ b/test/sql/query.py @@ -1,13 +1,9 @@ -from testbase import PersistTest import testbase -import unittest, sys, datetime - -import sqlalchemy.databases.sqlite as sqllite - -import tables +import datetime from sqlalchemy import * -from sqlalchemy.engine import ResultProxy, RowProxy from sqlalchemy import exceptions +from testlib import * + class QueryTest(PersistTest): @@ -24,25 +20,24 @@ class QueryTest(PersistTest): Column('address', String(30))) metadata.create_all() - def setUp(self): - self.users = users def tearDown(self): - self.users.delete().execute() + addresses.delete().execute() + users.delete().execute() def tearDownAll(self): metadata.drop_all() def testinsert(self): - self.users.insert().execute(user_id = 7, user_name = 'jack') - print repr(self.users.select().execute().fetchall()) - + users.insert().execute(user_id = 7, user_name = 'jack') + assert users.count().scalar() == 1 + def testupdate(self): - self.users.insert().execute(user_id = 7, user_name = 'jack') - print repr(self.users.select().execute().fetchall()) + users.insert().execute(user_id = 7, user_name = 'jack') + assert users.count().scalar() == 1 - self.users.update(self.users.c.user_id == 7).execute(user_name = 'fred') - print repr(self.users.select().execute().fetchall()) + users.update(users.c.user_id == 7).execute(user_name = 'fred') + assert users.select(users.c.user_id==7).execute().fetchone()['user_name'] == 'fred' def test_lastrow_accessor(self): """test the last_inserted_ids() and lastrow_has_id() functions""" @@ -63,14 +58,15 @@ class QueryTest(PersistTest): if result.lastrow_has_defaults(): criterion = and_(*[col==id for col, id in zip(table.primary_key, result.last_inserted_ids())]) row = table.select(criterion).execute().fetchone() - ret.update(row) + for c in table.c: + ret[c.key] = row[c] return ret for supported, table, values, assertvalues in [ ( {'unsupported':['sqlite']}, Table("t1", metadata, - Column('id', Integer, primary_key=True), + Column('id', Integer, Sequence('t1_id_seq', optional=True), primary_key=True), Column('foo', String(30), primary_key=True)), {'foo':'hi'}, {'id':1, 'foo':'hi'} @@ -78,7 +74,7 @@ class QueryTest(PersistTest): ( {'unsupported':['sqlite']}, Table("t2", metadata, - Column('id', Integer, primary_key=True), + Column('id', Integer, Sequence('t2_id_seq', optional=True), primary_key=True), Column('foo', String(30), primary_key=True), Column('bar', String(30), PassiveDefault('hi')) ), @@ -98,7 +94,7 @@ class QueryTest(PersistTest): ( {'unsupported':[]}, Table("t4", metadata, - Column('id', Integer, primary_key=True), + Column('id', Integer, Sequence('t4_id_seq', optional=True), primary_key=True), Column('foo', String(30), primary_key=True), Column('bar', String(30), PassiveDefault('hi')) ), @@ -124,109 +120,94 @@ class QueryTest(PersistTest): table.drop() def testrowiteration(self): - self.users.insert().execute(user_id = 7, user_name = 'jack') - self.users.insert().execute(user_id = 8, user_name = 'ed') - self.users.insert().execute(user_id = 9, user_name = 'fred') - r = self.users.select().execute() + users.insert().execute(user_id = 7, user_name = 'jack') + users.insert().execute(user_id = 8, user_name = 'ed') + users.insert().execute(user_id = 9, user_name = 'fred') + r = users.select().execute() l = [] for row in r: l.append(row) self.assert_(len(l) == 3) def test_fetchmany(self): - self.users.insert().execute(user_id = 7, user_name = 'jack') - self.users.insert().execute(user_id = 8, user_name = 'ed') - self.users.insert().execute(user_id = 9, user_name = 'fred') - r = self.users.select().execute() + users.insert().execute(user_id = 7, user_name = 'jack') + users.insert().execute(user_id = 8, user_name = 'ed') + users.insert().execute(user_id = 9, user_name = 'fred') + r = users.select().execute() l = [] for row in r.fetchmany(size=2): l.append(row) self.assert_(len(l) == 2, "fetchmany(size=2) got %s rows" % len(l)) def test_compiled_execute(self): - s = select([self.users], self.users.c.user_id==bindparam('id')).compile() + users.insert().execute(user_id = 7, user_name = 'jack') + s = select([users], users.c.user_id==bindparam('id')).compile() c = testbase.db.connect() - print repr(c.execute(s, id=7).fetchall()) - - def test_global_metadata(self): - t1 = Table('table1', Column('col1', Integer, primary_key=True), - Column('col2', String(20))) - t2 = Table('table2', Column('col1', Integer, primary_key=True), - Column('col2', String(20))) - - assert t1.c.col1 - global_connect(testbase.db) - default_metadata.create_all() - try: - assert t1.count().scalar() == 0 - finally: - default_metadata.drop_all() - default_metadata.clear() - + assert c.execute(s, id=7).fetchall()[0]['user_id'] == 7 def test_repeated_bindparams(self): """test that a BindParam can be used more than once. this should be run for dbs with both positional and named paramstyles.""" - self.users.insert().execute(user_id = 7, user_name = 'jack') - self.users.insert().execute(user_id = 8, user_name = 'fred') + users.insert().execute(user_id = 7, user_name = 'jack') + users.insert().execute(user_id = 8, user_name = 'fred') u = bindparam('userid') - s = self.users.select(or_(self.users.c.user_name==u, self.users.c.user_name==u)) + s = users.select(or_(users.c.user_name==u, users.c.user_name==u)) r = s.execute(userid='fred').fetchall() assert len(r) == 1 def test_bindparam_shortname(self): """test the 'shortname' field on BindParamClause.""" - self.users.insert().execute(user_id = 7, user_name = 'jack') - self.users.insert().execute(user_id = 8, user_name = 'fred') + users.insert().execute(user_id = 7, user_name = 'jack') + users.insert().execute(user_id = 8, user_name = 'fred') u = bindparam('userid', shortname='someshortname') - s = self.users.select(self.users.c.user_name==u) + s = users.select(users.c.user_name==u) r = s.execute(someshortname='fred').fetchall() assert len(r) == 1 def testdelete(self): - self.users.insert().execute(user_id = 7, user_name = 'jack') - self.users.insert().execute(user_id = 8, user_name = 'fred') - print repr(self.users.select().execute().fetchall()) + users.insert().execute(user_id = 7, user_name = 'jack') + users.insert().execute(user_id = 8, user_name = 'fred') + print repr(users.select().execute().fetchall()) - self.users.delete(self.users.c.user_name == 'fred').execute() + users.delete(users.c.user_name == 'fred').execute() - print repr(self.users.select().execute().fetchall()) + print repr(users.select().execute().fetchall()) def testselectlimit(self): - self.users.insert().execute(user_id=1, user_name='john') - self.users.insert().execute(user_id=2, user_name='jack') - self.users.insert().execute(user_id=3, user_name='ed') - self.users.insert().execute(user_id=4, user_name='wendy') - self.users.insert().execute(user_id=5, user_name='laura') - self.users.insert().execute(user_id=6, user_name='ralph') - self.users.insert().execute(user_id=7, user_name='fido') - r = self.users.select(limit=3, order_by=[self.users.c.user_id]).execute().fetchall() + users.insert().execute(user_id=1, user_name='john') + users.insert().execute(user_id=2, user_name='jack') + users.insert().execute(user_id=3, user_name='ed') + users.insert().execute(user_id=4, user_name='wendy') + users.insert().execute(user_id=5, user_name='laura') + users.insert().execute(user_id=6, user_name='ralph') + users.insert().execute(user_id=7, user_name='fido') + r = users.select(limit=3, order_by=[users.c.user_id]).execute().fetchall() self.assert_(r == [(1, 'john'), (2, 'jack'), (3, 'ed')], repr(r)) - @testbase.unsupported('mssql') + @testing.unsupported('mssql') def testselectlimitoffset(self): - self.users.insert().execute(user_id=1, user_name='john') - self.users.insert().execute(user_id=2, user_name='jack') - self.users.insert().execute(user_id=3, user_name='ed') - self.users.insert().execute(user_id=4, user_name='wendy') - self.users.insert().execute(user_id=5, user_name='laura') - self.users.insert().execute(user_id=6, user_name='ralph') - self.users.insert().execute(user_id=7, user_name='fido') - r = self.users.select(limit=3, offset=2, order_by=[self.users.c.user_id]).execute().fetchall() + users.insert().execute(user_id=1, user_name='john') + users.insert().execute(user_id=2, user_name='jack') + users.insert().execute(user_id=3, user_name='ed') + users.insert().execute(user_id=4, user_name='wendy') + users.insert().execute(user_id=5, user_name='laura') + users.insert().execute(user_id=6, user_name='ralph') + users.insert().execute(user_id=7, user_name='fido') + r = users.select(limit=3, offset=2, order_by=[users.c.user_id]).execute().fetchall() self.assert_(r==[(3, 'ed'), (4, 'wendy'), (5, 'laura')]) - r = self.users.select(offset=5, order_by=[self.users.c.user_id]).execute().fetchall() + r = users.select(offset=5, order_by=[users.c.user_id]).execute().fetchall() self.assert_(r==[(6, 'ralph'), (7, 'fido')]) - @testbase.supported('mssql') + @testing.supported('mssql') def testselectlimitoffset_mssql(self): try: - r = self.users.select(limit=3, offset=2, order_by=[self.users.c.user_id]).execute().fetchall() + r = users.select(limit=3, offset=2, order_by=[users.c.user_id]).execute().fetchall() assert False # InvalidRequestError should have been raised except exceptions.InvalidRequestError: pass - @testbase.unsupported('mysql') + @testing.unsupported('mysql') def test_scalar_select(self): """test that scalar subqueries with labels get their type propigated to the result set.""" # mysql and/or mysqldb has a bug here, type isnt propigated for scalar subquery. @@ -244,18 +225,26 @@ class QueryTest(PersistTest): datetable.drop() def test_column_accessor(self): - self.users.insert().execute(user_id=1, user_name='john') - self.users.insert().execute(user_id=2, user_name='jack') - r = self.users.select(self.users.c.user_id==2).execute().fetchone() - self.assert_(r.user_id == r['user_id'] == r[self.users.c.user_id] == 2) - self.assert_(r.user_name == r['user_name'] == r[self.users.c.user_name] == 'jack') - - r = text("select * from query_users where user_id=2", engine=testbase.db).execute().fetchone() - self.assert_(r.user_id == r['user_id'] == r[self.users.c.user_id] == 2) - self.assert_(r.user_name == r['user_name'] == r[self.users.c.user_name] == 'jack') + users.insert().execute(user_id=1, user_name='john') + users.insert().execute(user_id=2, user_name='jack') + addresses.insert().execute(address_id=1, user_id=2, address='foo@bar.com') + + r = users.select(users.c.user_id==2).execute().fetchone() + self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2) + self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') + + r = text("select * from query_users where user_id=2", bind=testbase.db).execute().fetchone() + self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2) + self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') + # test slices + r = text("select * from query_addresses", bind=testbase.db).execute().fetchone() + self.assert_(r[0:1] == (1,)) + self.assert_(r[1:] == (2, 'foo@bar.com')) + self.assert_(r[:-1] == (1, 2)) + def test_ambiguous_column(self): - self.users.insert().execute(user_id=1, user_name='john') + users.insert().execute(user_id=1, user_name='john') r = users.outerjoin(addresses).select().execute().fetchone() try: print r['user_id'] @@ -264,18 +253,18 @@ class QueryTest(PersistTest): assert str(e) == "Ambiguous column name 'user_id' in result set! try 'use_labels' option on select statement." def test_keys(self): - self.users.insert().execute(user_id=1, user_name='foo') - r = self.users.select().execute().fetchone() + users.insert().execute(user_id=1, user_name='foo') + r = users.select().execute().fetchone() self.assertEqual([x.lower() for x in r.keys()], ['user_id', 'user_name']) def test_items(self): - self.users.insert().execute(user_id=1, user_name='foo') - r = self.users.select().execute().fetchone() + users.insert().execute(user_id=1, user_name='foo') + r = users.select().execute().fetchone() self.assertEqual([(x[0].lower(), x[1]) for x in r.items()], [('user_id', 1), ('user_name', 'foo')]) def test_len(self): - self.users.insert().execute(user_id=1, user_name='foo') - r = self.users.select().execute().fetchone() + users.insert().execute(user_id=1, user_name='foo') + r = users.select().execute().fetchone() self.assertEqual(len(r), 2) r.close() r = testbase.db.execute('select user_name, user_id from query_users', {}).fetchone() @@ -295,7 +284,11 @@ class QueryTest(PersistTest): x = testbase.db.func.current_date().execute().scalar() y = testbase.db.func.current_date().select().execute().scalar() z = testbase.db.func.current_date().scalar() - assert x == y == z + assert (x == y == z) is True + + x = testbase.db.func.current_date(type_=Date) + assert isinstance(x.type, Date) + assert isinstance(x.execute().scalar(), datetime.date) def test_conn_functions(self): conn = testbase.db.connect() @@ -305,8 +298,8 @@ class QueryTest(PersistTest): z = conn.scalar(func.current_date()) finally: conn.close() - assert x == y == z - + assert (x == y == z) is True + def test_update_functions(self): """test sending functions and SQL expressions to the VALUES and SET clauses of INSERT/UPDATE instances, and that column-level defaults get overridden""" @@ -357,7 +350,7 @@ class QueryTest(PersistTest): finally: meta.drop_all() - @testbase.supported('postgres') + @testing.supported('postgres') def test_functions_with_cols(self): # TODO: shouldnt this work on oracle too ? x = testbase.db.func.current_date().execute().scalar() @@ -366,7 +359,7 @@ class QueryTest(PersistTest): w = select(['*'], from_obj=[testbase.db.func.current_date()]).scalar() # construct a column-based FROM object out of a function, like in [ticket:172] - s = select([column('date', type=DateTime)], from_obj=[testbase.db.func.current_date()]) + s = select([column('date', type_=DateTime)], from_obj=[testbase.db.func.current_date()]) q = s.execute().fetchone()[s.c.date] r = s.alias('datequery').select().scalar() @@ -374,8 +367,8 @@ class QueryTest(PersistTest): def test_column_order_with_simple_query(self): # should return values in column definition order - self.users.insert().execute(user_id=1, user_name='foo') - r = self.users.select(self.users.c.user_id==1).execute().fetchone() + users.insert().execute(user_id=1, user_name='foo') + r = users.select(users.c.user_id==1).execute().fetchone() self.assertEqual(r[0], 1) self.assertEqual(r[1], 'foo') self.assertEqual([x.lower() for x in r.keys()], ['user_id', 'user_name']) @@ -383,14 +376,14 @@ class QueryTest(PersistTest): def test_column_order_with_text_query(self): # should return values in query order - self.users.insert().execute(user_id=1, user_name='foo') + users.insert().execute(user_id=1, user_name='foo') r = testbase.db.execute('select user_name, user_id from query_users', {}).fetchone() self.assertEqual(r[0], 'foo') self.assertEqual(r[1], 1) self.assertEqual([x.lower() for x in r.keys()], ['user_name', 'user_id']) self.assertEqual(r.values(), ['foo', 1]) - @testbase.unsupported('oracle', 'firebird') + @testing.unsupported('oracle', 'firebird') def test_column_accessor_shadow(self): meta = MetaData(testbase.db) shadowed = Table('test_shadowed', meta, @@ -420,7 +413,7 @@ class QueryTest(PersistTest): finally: shadowed.drop(checkfirst=True) - @testbase.supported('mssql') + @testing.supported('mssql') def test_fetchid_trigger(self): meta = MetaData(testbase.db) t1 = Table('t1', meta, @@ -446,7 +439,7 @@ class QueryTest(PersistTest): con.execute("""drop trigger paj""") meta.drop_all() - @testbase.supported('mssql') + @testing.supported('mssql') def test_insertid_schema(self): meta = MetaData(testbase.db) con = testbase.db.connect() @@ -459,7 +452,7 @@ class QueryTest(PersistTest): tbl.drop() con.execute('drop schema paj') - @testbase.supported('mssql') + @testing.supported('mssql') def test_insertid_reserved(self): meta = MetaData(testbase.db) table = Table( @@ -476,51 +469,52 @@ class QueryTest(PersistTest): def test_in_filtering(self): - """test the 'shortname' field on BindParamClause.""" - self.users.insert().execute(user_id = 7, user_name = 'jack') - self.users.insert().execute(user_id = 8, user_name = 'fred') - self.users.insert().execute(user_id = 9, user_name = None) + """test the behavior of the in_() function.""" + + users.insert().execute(user_id = 7, user_name = 'jack') + users.insert().execute(user_id = 8, user_name = 'fred') + users.insert().execute(user_id = 9, user_name = None) - s = self.users.select(self.users.c.user_name.in_()) + s = users.select(users.c.user_name.in_()) r = s.execute().fetchall() # No username is in empty set assert len(r) == 0 - s = self.users.select(not_(self.users.c.user_name.in_())) + s = users.select(not_(users.c.user_name.in_())) r = s.execute().fetchall() # All usernames with a value are outside an empty set assert len(r) == 2 - s = self.users.select(self.users.c.user_name.in_('jack','fred')) + s = users.select(users.c.user_name.in_('jack','fred')) r = s.execute().fetchall() assert len(r) == 2 - s = self.users.select(not_(self.users.c.user_name.in_('jack','fred'))) + s = users.select(not_(users.c.user_name.in_('jack','fred'))) r = s.execute().fetchall() # Null values are not outside any set assert len(r) == 0 u = bindparam('search_key') - s = self.users.select(u.in_()) + s = users.select(u.in_()) r = s.execute(search_key='john').fetchall() assert len(r) == 0 r = s.execute(search_key=None).fetchall() assert len(r) == 0 - s = self.users.select(not_(u.in_())) + s = users.select(not_(u.in_())) r = s.execute(search_key='john').fetchall() assert len(r) == 3 r = s.execute(search_key=None).fetchall() assert len(r) == 0 - s = self.users.select(self.users.c.user_name.in_() == True) + s = users.select(users.c.user_name.in_() == True) r = s.execute().fetchall() assert len(r) == 0 - s = self.users.select(self.users.c.user_name.in_() == False) + s = users.select(users.c.user_name.in_() == False) r = s.execute().fetchall() assert len(r) == 2 - s = self.users.select(self.users.c.user_name.in_() == None) + s = users.select(users.c.user_name.in_() == None) r = s.execute().fetchall() assert len(r) == 1 @@ -577,7 +571,7 @@ class CompoundTest(PersistTest): assert u.execute().fetchall() == [('aaa', 'aaa'), ('bbb', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')] assert u.alias('bar').select().execute().fetchall() == [('aaa', 'aaa'), ('bbb', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')] - @testbase.unsupported('mysql') + @testing.unsupported('mysql') def test_intersect(self): i = intersect( select([t2.c.col3, t2.c.col4]), @@ -586,7 +580,7 @@ class CompoundTest(PersistTest): assert i.execute().fetchall() == [('aaa', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')] assert i.alias('bar').select().execute().fetchall() == [('aaa', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')] - @testbase.unsupported('mysql', 'oracle') + @testing.unsupported('mysql', 'oracle') def test_except_style1(self): e = except_(union( select([t1.c.col3, t1.c.col4]), @@ -595,7 +589,7 @@ class CompoundTest(PersistTest): ), select([t2.c.col3, t2.c.col4])) assert e.alias('bar').select().execute().fetchall() == [('aaa', 'aaa'), ('aaa', 'ccc'), ('bbb', 'aaa'), ('bbb', 'bbb'), ('ccc', 'bbb'), ('ccc', 'ccc')] - @testbase.unsupported('mysql', 'oracle') + @testing.unsupported('mysql', 'oracle') def test_except_style2(self): e = except_(union( select([t1.c.col3, t1.c.col4]), @@ -605,7 +599,7 @@ class CompoundTest(PersistTest): assert e.execute().fetchall() == [('aaa', 'aaa'), ('aaa', 'ccc'), ('bbb', 'aaa'), ('bbb', 'bbb'), ('ccc', 'bbb'), ('ccc', 'ccc')] assert e.alias('bar').select().execute().fetchall() == [('aaa', 'aaa'), ('aaa', 'ccc'), ('bbb', 'aaa'), ('bbb', 'bbb'), ('ccc', 'bbb'), ('ccc', 'ccc')] - @testbase.unsupported('sqlite', 'mysql', 'oracle') + @testing.unsupported('sqlite', 'mysql', 'oracle') def test_except_style3(self): # aaa, bbb, ccc - (aaa, bbb, ccc - (ccc)) = ccc e = except_( @@ -617,7 +611,7 @@ class CompoundTest(PersistTest): ) self.assertEquals(e.execute().fetchall(), [('ccc',)]) - @testbase.unsupported('sqlite', 'mysql', 'oracle') + @testing.unsupported('sqlite', 'mysql', 'oracle') def test_union_union_all(self): e = union_all( select([t1.c.col3]), @@ -628,7 +622,7 @@ class CompoundTest(PersistTest): ) self.assertEquals(e.execute().fetchall(), [('aaa',),('bbb',),('ccc',),('aaa',),('bbb',),('ccc',)]) - @testbase.unsupported('mysql') + @testing.unsupported('mysql') def test_composite(self): u = intersect( select([t2.c.col3, t2.c.col4]), |