summaryrefslogtreecommitdiff
path: root/test/sql/query.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/sql/query.py')
-rw-r--r--test/sql/query.py246
1 files changed, 120 insertions, 126 deletions
diff --git a/test/sql/query.py b/test/sql/query.py
index 8af5aafea..48a28a9a5 100644
--- a/test/sql/query.py
+++ b/test/sql/query.py
@@ -1,13 +1,9 @@
-from testbase import PersistTest
import testbase
-import unittest, sys, datetime
-
-import sqlalchemy.databases.sqlite as sqllite
-
-import tables
+import datetime
from sqlalchemy import *
-from sqlalchemy.engine import ResultProxy, RowProxy
from sqlalchemy import exceptions
+from testlib import *
+
class QueryTest(PersistTest):
@@ -24,25 +20,24 @@ class QueryTest(PersistTest):
Column('address', String(30)))
metadata.create_all()
- def setUp(self):
- self.users = users
def tearDown(self):
- self.users.delete().execute()
+ addresses.delete().execute()
+ users.delete().execute()
def tearDownAll(self):
metadata.drop_all()
def testinsert(self):
- self.users.insert().execute(user_id = 7, user_name = 'jack')
- print repr(self.users.select().execute().fetchall())
-
+ users.insert().execute(user_id = 7, user_name = 'jack')
+ assert users.count().scalar() == 1
+
def testupdate(self):
- self.users.insert().execute(user_id = 7, user_name = 'jack')
- print repr(self.users.select().execute().fetchall())
+ users.insert().execute(user_id = 7, user_name = 'jack')
+ assert users.count().scalar() == 1
- self.users.update(self.users.c.user_id == 7).execute(user_name = 'fred')
- print repr(self.users.select().execute().fetchall())
+ users.update(users.c.user_id == 7).execute(user_name = 'fred')
+ assert users.select(users.c.user_id==7).execute().fetchone()['user_name'] == 'fred'
def test_lastrow_accessor(self):
"""test the last_inserted_ids() and lastrow_has_id() functions"""
@@ -63,14 +58,15 @@ class QueryTest(PersistTest):
if result.lastrow_has_defaults():
criterion = and_(*[col==id for col, id in zip(table.primary_key, result.last_inserted_ids())])
row = table.select(criterion).execute().fetchone()
- ret.update(row)
+ for c in table.c:
+ ret[c.key] = row[c]
return ret
for supported, table, values, assertvalues in [
(
{'unsupported':['sqlite']},
Table("t1", metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, Sequence('t1_id_seq', optional=True), primary_key=True),
Column('foo', String(30), primary_key=True)),
{'foo':'hi'},
{'id':1, 'foo':'hi'}
@@ -78,7 +74,7 @@ class QueryTest(PersistTest):
(
{'unsupported':['sqlite']},
Table("t2", metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, Sequence('t2_id_seq', optional=True), primary_key=True),
Column('foo', String(30), primary_key=True),
Column('bar', String(30), PassiveDefault('hi'))
),
@@ -98,7 +94,7 @@ class QueryTest(PersistTest):
(
{'unsupported':[]},
Table("t4", metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, Sequence('t4_id_seq', optional=True), primary_key=True),
Column('foo', String(30), primary_key=True),
Column('bar', String(30), PassiveDefault('hi'))
),
@@ -124,109 +120,94 @@ class QueryTest(PersistTest):
table.drop()
def testrowiteration(self):
- self.users.insert().execute(user_id = 7, user_name = 'jack')
- self.users.insert().execute(user_id = 8, user_name = 'ed')
- self.users.insert().execute(user_id = 9, user_name = 'fred')
- r = self.users.select().execute()
+ users.insert().execute(user_id = 7, user_name = 'jack')
+ users.insert().execute(user_id = 8, user_name = 'ed')
+ users.insert().execute(user_id = 9, user_name = 'fred')
+ r = users.select().execute()
l = []
for row in r:
l.append(row)
self.assert_(len(l) == 3)
def test_fetchmany(self):
- self.users.insert().execute(user_id = 7, user_name = 'jack')
- self.users.insert().execute(user_id = 8, user_name = 'ed')
- self.users.insert().execute(user_id = 9, user_name = 'fred')
- r = self.users.select().execute()
+ users.insert().execute(user_id = 7, user_name = 'jack')
+ users.insert().execute(user_id = 8, user_name = 'ed')
+ users.insert().execute(user_id = 9, user_name = 'fred')
+ r = users.select().execute()
l = []
for row in r.fetchmany(size=2):
l.append(row)
self.assert_(len(l) == 2, "fetchmany(size=2) got %s rows" % len(l))
def test_compiled_execute(self):
- s = select([self.users], self.users.c.user_id==bindparam('id')).compile()
+ users.insert().execute(user_id = 7, user_name = 'jack')
+ s = select([users], users.c.user_id==bindparam('id')).compile()
c = testbase.db.connect()
- print repr(c.execute(s, id=7).fetchall())
-
- def test_global_metadata(self):
- t1 = Table('table1', Column('col1', Integer, primary_key=True),
- Column('col2', String(20)))
- t2 = Table('table2', Column('col1', Integer, primary_key=True),
- Column('col2', String(20)))
-
- assert t1.c.col1
- global_connect(testbase.db)
- default_metadata.create_all()
- try:
- assert t1.count().scalar() == 0
- finally:
- default_metadata.drop_all()
- default_metadata.clear()
-
+ assert c.execute(s, id=7).fetchall()[0]['user_id'] == 7
def test_repeated_bindparams(self):
"""test that a BindParam can be used more than once.
this should be run for dbs with both positional and named paramstyles."""
- self.users.insert().execute(user_id = 7, user_name = 'jack')
- self.users.insert().execute(user_id = 8, user_name = 'fred')
+ users.insert().execute(user_id = 7, user_name = 'jack')
+ users.insert().execute(user_id = 8, user_name = 'fred')
u = bindparam('userid')
- s = self.users.select(or_(self.users.c.user_name==u, self.users.c.user_name==u))
+ s = users.select(or_(users.c.user_name==u, users.c.user_name==u))
r = s.execute(userid='fred').fetchall()
assert len(r) == 1
def test_bindparam_shortname(self):
"""test the 'shortname' field on BindParamClause."""
- self.users.insert().execute(user_id = 7, user_name = 'jack')
- self.users.insert().execute(user_id = 8, user_name = 'fred')
+ users.insert().execute(user_id = 7, user_name = 'jack')
+ users.insert().execute(user_id = 8, user_name = 'fred')
u = bindparam('userid', shortname='someshortname')
- s = self.users.select(self.users.c.user_name==u)
+ s = users.select(users.c.user_name==u)
r = s.execute(someshortname='fred').fetchall()
assert len(r) == 1
def testdelete(self):
- self.users.insert().execute(user_id = 7, user_name = 'jack')
- self.users.insert().execute(user_id = 8, user_name = 'fred')
- print repr(self.users.select().execute().fetchall())
+ users.insert().execute(user_id = 7, user_name = 'jack')
+ users.insert().execute(user_id = 8, user_name = 'fred')
+ print repr(users.select().execute().fetchall())
- self.users.delete(self.users.c.user_name == 'fred').execute()
+ users.delete(users.c.user_name == 'fred').execute()
- print repr(self.users.select().execute().fetchall())
+ print repr(users.select().execute().fetchall())
def testselectlimit(self):
- self.users.insert().execute(user_id=1, user_name='john')
- self.users.insert().execute(user_id=2, user_name='jack')
- self.users.insert().execute(user_id=3, user_name='ed')
- self.users.insert().execute(user_id=4, user_name='wendy')
- self.users.insert().execute(user_id=5, user_name='laura')
- self.users.insert().execute(user_id=6, user_name='ralph')
- self.users.insert().execute(user_id=7, user_name='fido')
- r = self.users.select(limit=3, order_by=[self.users.c.user_id]).execute().fetchall()
+ users.insert().execute(user_id=1, user_name='john')
+ users.insert().execute(user_id=2, user_name='jack')
+ users.insert().execute(user_id=3, user_name='ed')
+ users.insert().execute(user_id=4, user_name='wendy')
+ users.insert().execute(user_id=5, user_name='laura')
+ users.insert().execute(user_id=6, user_name='ralph')
+ users.insert().execute(user_id=7, user_name='fido')
+ r = users.select(limit=3, order_by=[users.c.user_id]).execute().fetchall()
self.assert_(r == [(1, 'john'), (2, 'jack'), (3, 'ed')], repr(r))
- @testbase.unsupported('mssql')
+ @testing.unsupported('mssql')
def testselectlimitoffset(self):
- self.users.insert().execute(user_id=1, user_name='john')
- self.users.insert().execute(user_id=2, user_name='jack')
- self.users.insert().execute(user_id=3, user_name='ed')
- self.users.insert().execute(user_id=4, user_name='wendy')
- self.users.insert().execute(user_id=5, user_name='laura')
- self.users.insert().execute(user_id=6, user_name='ralph')
- self.users.insert().execute(user_id=7, user_name='fido')
- r = self.users.select(limit=3, offset=2, order_by=[self.users.c.user_id]).execute().fetchall()
+ users.insert().execute(user_id=1, user_name='john')
+ users.insert().execute(user_id=2, user_name='jack')
+ users.insert().execute(user_id=3, user_name='ed')
+ users.insert().execute(user_id=4, user_name='wendy')
+ users.insert().execute(user_id=5, user_name='laura')
+ users.insert().execute(user_id=6, user_name='ralph')
+ users.insert().execute(user_id=7, user_name='fido')
+ r = users.select(limit=3, offset=2, order_by=[users.c.user_id]).execute().fetchall()
self.assert_(r==[(3, 'ed'), (4, 'wendy'), (5, 'laura')])
- r = self.users.select(offset=5, order_by=[self.users.c.user_id]).execute().fetchall()
+ r = users.select(offset=5, order_by=[users.c.user_id]).execute().fetchall()
self.assert_(r==[(6, 'ralph'), (7, 'fido')])
- @testbase.supported('mssql')
+ @testing.supported('mssql')
def testselectlimitoffset_mssql(self):
try:
- r = self.users.select(limit=3, offset=2, order_by=[self.users.c.user_id]).execute().fetchall()
+ r = users.select(limit=3, offset=2, order_by=[users.c.user_id]).execute().fetchall()
assert False # InvalidRequestError should have been raised
except exceptions.InvalidRequestError:
pass
- @testbase.unsupported('mysql')
+ @testing.unsupported('mysql')
def test_scalar_select(self):
"""test that scalar subqueries with labels get their type propigated to the result set."""
# mysql and/or mysqldb has a bug here, type isnt propigated for scalar subquery.
@@ -244,18 +225,26 @@ class QueryTest(PersistTest):
datetable.drop()
def test_column_accessor(self):
- self.users.insert().execute(user_id=1, user_name='john')
- self.users.insert().execute(user_id=2, user_name='jack')
- r = self.users.select(self.users.c.user_id==2).execute().fetchone()
- self.assert_(r.user_id == r['user_id'] == r[self.users.c.user_id] == 2)
- self.assert_(r.user_name == r['user_name'] == r[self.users.c.user_name] == 'jack')
-
- r = text("select * from query_users where user_id=2", engine=testbase.db).execute().fetchone()
- self.assert_(r.user_id == r['user_id'] == r[self.users.c.user_id] == 2)
- self.assert_(r.user_name == r['user_name'] == r[self.users.c.user_name] == 'jack')
+ users.insert().execute(user_id=1, user_name='john')
+ users.insert().execute(user_id=2, user_name='jack')
+ addresses.insert().execute(address_id=1, user_id=2, address='foo@bar.com')
+
+ r = users.select(users.c.user_id==2).execute().fetchone()
+ self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
+ self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
+
+ r = text("select * from query_users where user_id=2", bind=testbase.db).execute().fetchone()
+ self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
+ self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
+ # test slices
+ r = text("select * from query_addresses", bind=testbase.db).execute().fetchone()
+ self.assert_(r[0:1] == (1,))
+ self.assert_(r[1:] == (2, 'foo@bar.com'))
+ self.assert_(r[:-1] == (1, 2))
+
def test_ambiguous_column(self):
- self.users.insert().execute(user_id=1, user_name='john')
+ users.insert().execute(user_id=1, user_name='john')
r = users.outerjoin(addresses).select().execute().fetchone()
try:
print r['user_id']
@@ -264,18 +253,18 @@ class QueryTest(PersistTest):
assert str(e) == "Ambiguous column name 'user_id' in result set! try 'use_labels' option on select statement."
def test_keys(self):
- self.users.insert().execute(user_id=1, user_name='foo')
- r = self.users.select().execute().fetchone()
+ users.insert().execute(user_id=1, user_name='foo')
+ r = users.select().execute().fetchone()
self.assertEqual([x.lower() for x in r.keys()], ['user_id', 'user_name'])
def test_items(self):
- self.users.insert().execute(user_id=1, user_name='foo')
- r = self.users.select().execute().fetchone()
+ users.insert().execute(user_id=1, user_name='foo')
+ r = users.select().execute().fetchone()
self.assertEqual([(x[0].lower(), x[1]) for x in r.items()], [('user_id', 1), ('user_name', 'foo')])
def test_len(self):
- self.users.insert().execute(user_id=1, user_name='foo')
- r = self.users.select().execute().fetchone()
+ users.insert().execute(user_id=1, user_name='foo')
+ r = users.select().execute().fetchone()
self.assertEqual(len(r), 2)
r.close()
r = testbase.db.execute('select user_name, user_id from query_users', {}).fetchone()
@@ -295,7 +284,11 @@ class QueryTest(PersistTest):
x = testbase.db.func.current_date().execute().scalar()
y = testbase.db.func.current_date().select().execute().scalar()
z = testbase.db.func.current_date().scalar()
- assert x == y == z
+ assert (x == y == z) is True
+
+ x = testbase.db.func.current_date(type_=Date)
+ assert isinstance(x.type, Date)
+ assert isinstance(x.execute().scalar(), datetime.date)
def test_conn_functions(self):
conn = testbase.db.connect()
@@ -305,8 +298,8 @@ class QueryTest(PersistTest):
z = conn.scalar(func.current_date())
finally:
conn.close()
- assert x == y == z
-
+ assert (x == y == z) is True
+
def test_update_functions(self):
"""test sending functions and SQL expressions to the VALUES and SET clauses of INSERT/UPDATE instances,
and that column-level defaults get overridden"""
@@ -357,7 +350,7 @@ class QueryTest(PersistTest):
finally:
meta.drop_all()
- @testbase.supported('postgres')
+ @testing.supported('postgres')
def test_functions_with_cols(self):
# TODO: shouldnt this work on oracle too ?
x = testbase.db.func.current_date().execute().scalar()
@@ -366,7 +359,7 @@ class QueryTest(PersistTest):
w = select(['*'], from_obj=[testbase.db.func.current_date()]).scalar()
# construct a column-based FROM object out of a function, like in [ticket:172]
- s = select([column('date', type=DateTime)], from_obj=[testbase.db.func.current_date()])
+ s = select([column('date', type_=DateTime)], from_obj=[testbase.db.func.current_date()])
q = s.execute().fetchone()[s.c.date]
r = s.alias('datequery').select().scalar()
@@ -374,8 +367,8 @@ class QueryTest(PersistTest):
def test_column_order_with_simple_query(self):
# should return values in column definition order
- self.users.insert().execute(user_id=1, user_name='foo')
- r = self.users.select(self.users.c.user_id==1).execute().fetchone()
+ users.insert().execute(user_id=1, user_name='foo')
+ r = users.select(users.c.user_id==1).execute().fetchone()
self.assertEqual(r[0], 1)
self.assertEqual(r[1], 'foo')
self.assertEqual([x.lower() for x in r.keys()], ['user_id', 'user_name'])
@@ -383,14 +376,14 @@ class QueryTest(PersistTest):
def test_column_order_with_text_query(self):
# should return values in query order
- self.users.insert().execute(user_id=1, user_name='foo')
+ users.insert().execute(user_id=1, user_name='foo')
r = testbase.db.execute('select user_name, user_id from query_users', {}).fetchone()
self.assertEqual(r[0], 'foo')
self.assertEqual(r[1], 1)
self.assertEqual([x.lower() for x in r.keys()], ['user_name', 'user_id'])
self.assertEqual(r.values(), ['foo', 1])
- @testbase.unsupported('oracle', 'firebird')
+ @testing.unsupported('oracle', 'firebird')
def test_column_accessor_shadow(self):
meta = MetaData(testbase.db)
shadowed = Table('test_shadowed', meta,
@@ -420,7 +413,7 @@ class QueryTest(PersistTest):
finally:
shadowed.drop(checkfirst=True)
- @testbase.supported('mssql')
+ @testing.supported('mssql')
def test_fetchid_trigger(self):
meta = MetaData(testbase.db)
t1 = Table('t1', meta,
@@ -446,7 +439,7 @@ class QueryTest(PersistTest):
con.execute("""drop trigger paj""")
meta.drop_all()
- @testbase.supported('mssql')
+ @testing.supported('mssql')
def test_insertid_schema(self):
meta = MetaData(testbase.db)
con = testbase.db.connect()
@@ -459,7 +452,7 @@ class QueryTest(PersistTest):
tbl.drop()
con.execute('drop schema paj')
- @testbase.supported('mssql')
+ @testing.supported('mssql')
def test_insertid_reserved(self):
meta = MetaData(testbase.db)
table = Table(
@@ -476,51 +469,52 @@ class QueryTest(PersistTest):
def test_in_filtering(self):
- """test the 'shortname' field on BindParamClause."""
- self.users.insert().execute(user_id = 7, user_name = 'jack')
- self.users.insert().execute(user_id = 8, user_name = 'fred')
- self.users.insert().execute(user_id = 9, user_name = None)
+ """test the behavior of the in_() function."""
+
+ users.insert().execute(user_id = 7, user_name = 'jack')
+ users.insert().execute(user_id = 8, user_name = 'fred')
+ users.insert().execute(user_id = 9, user_name = None)
- s = self.users.select(self.users.c.user_name.in_())
+ s = users.select(users.c.user_name.in_())
r = s.execute().fetchall()
# No username is in empty set
assert len(r) == 0
- s = self.users.select(not_(self.users.c.user_name.in_()))
+ s = users.select(not_(users.c.user_name.in_()))
r = s.execute().fetchall()
# All usernames with a value are outside an empty set
assert len(r) == 2
- s = self.users.select(self.users.c.user_name.in_('jack','fred'))
+ s = users.select(users.c.user_name.in_('jack','fred'))
r = s.execute().fetchall()
assert len(r) == 2
- s = self.users.select(not_(self.users.c.user_name.in_('jack','fred')))
+ s = users.select(not_(users.c.user_name.in_('jack','fred')))
r = s.execute().fetchall()
# Null values are not outside any set
assert len(r) == 0
u = bindparam('search_key')
- s = self.users.select(u.in_())
+ s = users.select(u.in_())
r = s.execute(search_key='john').fetchall()
assert len(r) == 0
r = s.execute(search_key=None).fetchall()
assert len(r) == 0
- s = self.users.select(not_(u.in_()))
+ s = users.select(not_(u.in_()))
r = s.execute(search_key='john').fetchall()
assert len(r) == 3
r = s.execute(search_key=None).fetchall()
assert len(r) == 0
- s = self.users.select(self.users.c.user_name.in_() == True)
+ s = users.select(users.c.user_name.in_() == True)
r = s.execute().fetchall()
assert len(r) == 0
- s = self.users.select(self.users.c.user_name.in_() == False)
+ s = users.select(users.c.user_name.in_() == False)
r = s.execute().fetchall()
assert len(r) == 2
- s = self.users.select(self.users.c.user_name.in_() == None)
+ s = users.select(users.c.user_name.in_() == None)
r = s.execute().fetchall()
assert len(r) == 1
@@ -577,7 +571,7 @@ class CompoundTest(PersistTest):
assert u.execute().fetchall() == [('aaa', 'aaa'), ('bbb', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')]
assert u.alias('bar').select().execute().fetchall() == [('aaa', 'aaa'), ('bbb', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')]
- @testbase.unsupported('mysql')
+ @testing.unsupported('mysql')
def test_intersect(self):
i = intersect(
select([t2.c.col3, t2.c.col4]),
@@ -586,7 +580,7 @@ class CompoundTest(PersistTest):
assert i.execute().fetchall() == [('aaa', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')]
assert i.alias('bar').select().execute().fetchall() == [('aaa', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')]
- @testbase.unsupported('mysql', 'oracle')
+ @testing.unsupported('mysql', 'oracle')
def test_except_style1(self):
e = except_(union(
select([t1.c.col3, t1.c.col4]),
@@ -595,7 +589,7 @@ class CompoundTest(PersistTest):
), select([t2.c.col3, t2.c.col4]))
assert e.alias('bar').select().execute().fetchall() == [('aaa', 'aaa'), ('aaa', 'ccc'), ('bbb', 'aaa'), ('bbb', 'bbb'), ('ccc', 'bbb'), ('ccc', 'ccc')]
- @testbase.unsupported('mysql', 'oracle')
+ @testing.unsupported('mysql', 'oracle')
def test_except_style2(self):
e = except_(union(
select([t1.c.col3, t1.c.col4]),
@@ -605,7 +599,7 @@ class CompoundTest(PersistTest):
assert e.execute().fetchall() == [('aaa', 'aaa'), ('aaa', 'ccc'), ('bbb', 'aaa'), ('bbb', 'bbb'), ('ccc', 'bbb'), ('ccc', 'ccc')]
assert e.alias('bar').select().execute().fetchall() == [('aaa', 'aaa'), ('aaa', 'ccc'), ('bbb', 'aaa'), ('bbb', 'bbb'), ('ccc', 'bbb'), ('ccc', 'ccc')]
- @testbase.unsupported('sqlite', 'mysql', 'oracle')
+ @testing.unsupported('sqlite', 'mysql', 'oracle')
def test_except_style3(self):
# aaa, bbb, ccc - (aaa, bbb, ccc - (ccc)) = ccc
e = except_(
@@ -617,7 +611,7 @@ class CompoundTest(PersistTest):
)
self.assertEquals(e.execute().fetchall(), [('ccc',)])
- @testbase.unsupported('sqlite', 'mysql', 'oracle')
+ @testing.unsupported('sqlite', 'mysql', 'oracle')
def test_union_union_all(self):
e = union_all(
select([t1.c.col3]),
@@ -628,7 +622,7 @@ class CompoundTest(PersistTest):
)
self.assertEquals(e.execute().fetchall(), [('aaa',),('bbb',),('ccc',),('aaa',),('bbb',),('ccc',)])
- @testbase.unsupported('mysql')
+ @testing.unsupported('mysql')
def test_composite(self):
u = intersect(
select([t2.c.col3, t2.c.col4]),