summaryrefslogtreecommitdiff
path: root/test/sql/select.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/sql/select.py')
-rw-r--r--test/sql/select.py188
1 files changed, 130 insertions, 58 deletions
diff --git a/test/sql/select.py b/test/sql/select.py
index 4d3eb4ad7..a5cf061e2 100644
--- a/test/sql/select.py
+++ b/test/sql/select.py
@@ -1,8 +1,8 @@
-from testbase import PersistTest
import testbase
+import re, operator
from sqlalchemy import *
from sqlalchemy.databases import sqlite, postgres, mysql, oracle, firebird, mssql
-import unittest, re, operator
+from testlib import *
# the select test now tests almost completely with TableClause/ColumnClause objects,
@@ -10,21 +10,21 @@ import unittest, re, operator
# so SQLAlchemy's SQL construction engine can be used with no database dependencies at all.
table1 = table('mytable',
- column('myid'),
- column('name'),
- column('description'),
+ column('myid', Integer),
+ column('name', String),
+ column('description', String),
)
table2 = table(
'myothertable',
- column('otherid'),
- column('othername'),
+ column('otherid', Integer),
+ column('othername', String),
)
table3 = table(
'thirdtable',
- column('userid'),
- column('otherstuff'),
+ column('userid', Integer),
+ column('otherstuff', String),
)
metadata = MetaData()
@@ -54,7 +54,7 @@ addresses = table('addresses',
class SQLTest(PersistTest):
def runtest(self, clause, result, dialect = None, params = None, checkparams = None):
c = clause.compile(parameters=params, dialect=dialect)
- self.echo("\nSQL String:\n" + str(c) + repr(c.get_params()))
+ print "\nSQL String:\n" + str(c) + repr(c.get_params())
cc = re.sub(r'\n', '', str(c))
self.assert_(cc == result, "\n'" + cc + "'\n does not match \n'" + result + "'")
if checkparams is not None:
@@ -130,6 +130,15 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
crit = q.c.myid == table1.c.myid
self.runtest(select(['*'], crit), """SELECT * FROM (SELECT mytable.myid AS myid FROM mytable ORDER BY mytable.myid) AS foo, mytable WHERE foo.myid = mytable.myid""", dialect=sqlite.dialect())
self.runtest(select(['*'], crit), """SELECT * FROM (SELECT mytable.myid AS myid FROM mytable) AS foo, mytable WHERE foo.myid = mytable.myid""", dialect=mssql.dialect())
+
+ def testmssql_aliases_schemas(self):
+ self.runtest(table4.select(), "SELECT remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM remote_owner.remotetable")
+
+ dialect = mssql.dialect()
+ self.runtest(table4.select(), "SELECT remotetable_1.rem_id, remotetable_1.datatype_id, remotetable_1.value FROM remote_owner.remotetable AS remotetable_1", dialect=dialect)
+
+ # TODO: this is probably incorrect; no "AS <foo>" is being applied to the table
+ self.runtest(table1.join(table4, table1.c.myid==table4.c.rem_id).select(), "SELECT mytable.myid, mytable.name, mytable.description, remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM mytable JOIN remote_owner.remotetable ON remotetable.rem_id = mytable.myid")
def testdontovercorrelate(self):
self.runtest(select([table1], from_obj=[table1, table1.select()]), """SELECT mytable.myid, mytable.name, mytable.description FROM mytable, (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable)""")
@@ -142,6 +151,11 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
self.runtest(select([table1, exists([1], from_obj=[table2]).label('foo')]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) AS foo FROM mytable", params={})
def testwheresubquery(self):
+ s = select([addresses.c.street], addresses.c.user_id==users.c.user_id, correlate=True).alias('s')
+ self.runtest(
+ select([users, s.c.street], from_obj=[s]),
+ """SELECT users.user_id, users.user_name, users.password, s.street FROM users, (SELECT addresses.street AS street FROM addresses WHERE addresses.user_id = users.user_id) AS s""")
+
# TODO: this tests that you dont get a "SELECT column" without a FROM but its not working yet.
#self.runtest(
# table1.select(table1.c.myid == select([table1.c.myid], table1.c.name=='jack')), ""
@@ -194,7 +208,20 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
s = select([table1.c.myid], scalar=True)
self.runtest(select([table2, s]), "SELECT myothertable.otherid, myothertable.othername, (SELECT mytable.myid FROM mytable) FROM myothertable")
-
+
+ s = select([table1.c.myid]).correlate(None).as_scalar()
+ self.runtest(select([table1, s]), "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable) FROM mytable")
+
+ s = select([table1.c.myid]).as_scalar()
+ self.runtest(select([table2, s]), "SELECT myothertable.otherid, myothertable.othername, (SELECT mytable.myid FROM mytable) FROM myothertable")
+
+ # test expressions against scalar selects
+ self.runtest(select([s - literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) - :literal")
+ self.runtest(select([select([table1.c.name]).as_scalar() + literal('x')]), "SELECT (SELECT mytable.name FROM mytable) || :literal")
+ self.runtest(select([s > literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) > :literal")
+
+ self.runtest(select([select([table1.c.name]).label('foo')]), "SELECT (SELECT mytable.name FROM mytable) AS foo")
+
zips = table('zips',
column('zipcode'),
@@ -206,15 +233,17 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
column('nm')
)
zip = '12345'
- qlat = select([zips.c.latitude], zips.c.zipcode == zip, scalar=True, correlate=False)
- qlng = select([zips.c.longitude], zips.c.zipcode == zip, scalar=True, correlate=False)
+ qlat = select([zips.c.latitude], zips.c.zipcode == zip).correlate(None).as_scalar()
+ qlng = select([zips.c.longitude], zips.c.zipcode == zip).correlate(None).as_scalar()
q = select([places.c.id, places.c.nm, zips.c.zipcode, func.latlondist(qlat, qlng).label('dist')],
zips.c.zipcode==zip,
order_by = ['dist', places.c.nm]
)
- self.runtest(q,"SELECT places.id, places.nm, zips.zipcode, latlondist((SELECT zips.latitude FROM zips WHERE zips.zipcode = :zips_zipcode_1), (SELECT zips.longitude FROM zips WHERE zips.zipcode = :zips_zipcode_2)) AS dist FROM places, zips WHERE zips.zipcode = :zips_zipcode ORDER BY dist, places.nm")
+ self.runtest(q,"SELECT places.id, places.nm, zips.zipcode, latlondist((SELECT zips.latitude FROM zips WHERE "
+ "zips.zipcode = :zips_zipcode), (SELECT zips.longitude FROM zips WHERE zips.zipcode = :zips_zipcode_1)) AS dist "
+ "FROM places, zips WHERE zips.zipcode = :zips_zipcode_2 ORDER BY dist, places.nm")
zalias = zips.alias('main_zip')
qlat = select([zips.c.latitude], zips.c.zipcode == zalias.c.zipcode, scalar=True)
@@ -223,7 +252,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
order_by = ['dist', places.c.nm]
)
self.runtest(q, "SELECT places.id, places.nm, main_zip.zipcode, latlondist((SELECT zips.latitude FROM zips WHERE zips.zipcode = main_zip.zipcode), (SELECT zips.longitude FROM zips WHERE zips.zipcode = main_zip.zipcode)) AS dist FROM places, zips AS main_zip ORDER BY dist, places.nm")
-
+
a1 = table2.alias('t2alias')
s1 = select([a1.c.otherid], table1.c.myid==a1.c.otherid, scalar=True)
j1 = table1.join(table2, table1.c.myid==table2.c.otherid)
@@ -261,28 +290,20 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
)
def testoperators(self):
- self.runtest(
- table1.select((table1.c.myid != 12) & ~(table1.c.name=='john')),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid AND NOT mytable.name = :mytable_name"
- )
-
- self.runtest(
- literal("a") + literal("b") * literal("c"), ":literal + :literal_1 * :literal_2"
- )
# exercise arithmetic operators
for (py_op, sql_op) in ((operator.add, '+'), (operator.mul, '*'),
(operator.sub, '-'), (operator.div, '/'),
):
for (lhs, rhs, res) in (
- ('a', table1.c.myid, ':mytable_myid %s mytable.myid'),
- ('a', literal('b'), ':literal %s :literal_1'),
+ (5, table1.c.myid, ':mytable_myid %s mytable.myid'),
+ (5, literal(5), ':literal %s :literal_1'),
(table1.c.myid, 'b', 'mytable.myid %s :mytable_myid'),
- (table1.c.myid, literal('b'), 'mytable.myid %s :literal'),
+ (table1.c.myid, literal(2.7), 'mytable.myid %s :literal'),
(table1.c.myid, table1.c.myid, 'mytable.myid %s mytable.myid'),
- (literal('a'), 'b', ':literal %s :literal_1'),
- (literal('a'), table1.c.myid, ':literal %s mytable.myid'),
- (literal('a'), literal('b'), ':literal %s :literal_1'),
+ (literal(5), 8, ':literal %s :literal_1'),
+ (literal(6), table1.c.myid, ':literal %s mytable.myid'),
+ (literal(7), literal(5.5), ':literal %s :literal_1'),
):
self.runtest(py_op(lhs, rhs), res % sql_op)
@@ -314,6 +335,25 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
"\n'" + compiled + "'\n does not match\n'" +
fwd_sql + "'\n or\n'" + rev_sql + "'")
+ self.runtest(
+ table1.select((table1.c.myid != 12) & ~(table1.c.name=='john')),
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid AND mytable.name != :mytable_name"
+ )
+
+ self.runtest(
+ table1.select((table1.c.myid != 12) & ~and_(table1.c.name=='john', table1.c.name=='ed', table1.c.name=='fred')),
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid AND NOT (mytable.name = :mytable_name AND mytable.name = :mytable_name_1 AND mytable.name = :mytable_name_2)"
+ )
+
+ self.runtest(
+ table1.select((table1.c.myid != 12) & ~table1.c.name),
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid AND NOT mytable.name"
+ )
+
+ self.runtest(
+ literal("a") + literal("b") * literal("c"), ":literal || :literal_1 * :literal_2"
+ )
+
# test the op() function, also that its results are further usable in expressions
self.runtest(
table1.select(table1.c.myid.op('hoho')(12)==14),
@@ -374,13 +414,18 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
def testalias(self):
# test the alias for a table1. column names stay the same, table name "changes" to "foo".
self.runtest(
- select([alias(table1, 'foo')])
+ select([table1.alias('foo')])
,"SELECT foo.myid, foo.name, foo.description FROM mytable AS foo")
-
+
+ for dialect in (firebird.dialect(), oracle.dialect()):
+ self.runtest(
+ select([table1.alias('foo')])
+ ,"SELECT foo.myid, foo.name, foo.description FROM mytable foo"
+ ,dialect=dialect)
+
self.runtest(
- select([alias(table1, 'foo')])
- ,"SELECT foo.myid, foo.name, foo.description FROM mytable foo"
- ,dialect=firebird.dialect())
+ select([table1.alias()])
+ ,"SELECT mytable_1.myid, mytable_1.name, mytable_1.description FROM mytable AS mytable_1")
# create a select for a join of two tables. use_labels means the column names will have
# labels tablename_columnname, which become the column keys accessible off the Selectable object.
@@ -401,6 +446,12 @@ myothertable.otherid AS myothertable_otherid FROM mytable, myothertable \
WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = :t2view_mytable_myid"
)
+
+ def test_prefixes(self):
+ self.runtest(table1.select().prefix_with("SQL_CALC_FOUND_ROWS").prefix_with("SQL_SOME_WEIRD_MYSQL_THING"),
+ "SELECT SQL_CALC_FOUND_ROWS SQL_SOME_WEIRD_MYSQL_THING mytable.myid, mytable.name, mytable.description FROM mytable"
+ )
+
def testtext(self):
self.runtest(
text("select * from foo where lala = bar") ,
@@ -429,7 +480,7 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid =
s.append_column("column2")
s.append_whereclause("column1=12")
s.append_whereclause("column2=19")
- s.order_by("column1")
+ s = s.order_by("column1")
s.append_from("table1")
self.runtest(s, "SELECT column1, column2 FROM table1 WHERE column1=12 AND column2=19 ORDER BY column1")
@@ -468,7 +519,14 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid =
checkparams={'bar':4, 'whee': 7},
params={'bar':4, 'whee': 7, 'hoho':10},
)
-
+
+ self.runtest(
+ text("select * from foo where clock='05:06:07'"),
+ "select * from foo where clock='05:06:07'",
+ checkparams={},
+ params={},
+ )
+
dialect = postgres.dialect()
self.runtest(
text("select * from foo where lala=:bar and hoho=:whee"),
@@ -477,6 +535,13 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid =
params={'bar':4, 'whee': 7, 'hoho':10},
dialect=dialect
)
+ self.runtest(
+ text("select * from foo where clock='05:06:07' and mork='\:mindy'"),
+ "select * from foo where clock='05:06:07' and mork=':mindy'",
+ checkparams={},
+ params={},
+ dialect=dialect
+ )
dialect = sqlite.dialect()
self.runtest(
@@ -509,7 +574,7 @@ FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today
def testliteral(self):
self.runtest(select([literal("foo") + literal("bar")], from_obj=[table1]),
- "SELECT :literal + :literal_1 FROM mytable")
+ "SELECT :literal || :literal_1 FROM mytable")
def testcalculatedcolumns(self):
value_tbl = table('values',
@@ -663,7 +728,7 @@ FROM myothertable ORDER BY myid \
WHERE mytable.name = :mytable_name GROUP BY mytable.myid, mytable.name UNION SELECT mytable.myid, mytable.name, mytable.description \
FROM mytable WHERE mytable.name = :mytable_name_1"
)
-
+
def test_compound_select_grouping(self):
self.runtest(
union_all(
@@ -716,6 +781,7 @@ EXISTS (select yay from foo where boo = lar)",
dialect=postgres.dialect()
)
+
self.runtest(query,
"SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \
FROM mytable, myothertable WHERE mytable.myid = myothertable.otherid(+) AND \
@@ -835,16 +901,16 @@ myothertable.othername != :myothertable_othername OR EXISTS (select yay from foo
self.runtest(select([table1], table1.c.myid.in_('a', literal('b'))),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :literal)")
- self.runtest(select([table1], table1.c.myid.in_(literal('a') + 'a')),
+ self.runtest(select([table1], table1.c.myid.in_(literal(1) + 'a')),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :literal + :literal_1")
self.runtest(select([table1], table1.c.myid.in_(literal('a') +'a', 'b')),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal + :literal_1, :mytable_myid)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal || :literal_1, :mytable_myid)")
self.runtest(select([table1], table1.c.myid.in_(literal('a') + literal('a'), literal('b'))),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal + :literal_1, :literal_2)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal || :literal_1, :literal_2)")
- self.runtest(select([table1], table1.c.myid.in_('a', literal('b') +'b')),
+ self.runtest(select([table1], table1.c.myid.in_(1, literal(3) + 4)),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :literal + :literal_1)")
self.runtest(select([table1], table1.c.myid.in_(literal('a') < 'b')),
@@ -862,7 +928,7 @@ myothertable.othername != :myothertable_othername OR EXISTS (select yay from foo
self.runtest(select([table1], table1.c.myid.in_(literal('a'), table1.c.myid +'a')),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal, mytable.myid + :mytable_myid)")
- self.runtest(select([table1], table1.c.myid.in_(literal('a'), 'a' + table1.c.myid)),
+ self.runtest(select([table1], table1.c.myid.in_(literal(1), 'a' + table1.c.myid)),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal, :mytable_myid + mytable.myid)")
self.runtest(select([table1], table1.c.myid.in_(1, 2, 3)),
@@ -900,16 +966,6 @@ UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (CASE WHEN (mytable.myid IS NULL) THEN NULL ELSE 0 END = 1)")
- def testlateargs(self):
- """tests that a SELECT clause will have extra "WHERE" clauses added to it at compile time if extra arguments
- are sent"""
-
- self.runtest(table1.select(), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.name = :mytable_name AND mytable.myid = :mytable_myid", params={'myid':'3', 'name':'jack'})
-
- self.runtest(table1.select(table1.c.name=='jack'), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid AND mytable.name = :mytable_name", params={'myid':'3'})
-
- self.runtest(table1.select(table1.c.name=='jack'), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid AND mytable.name = :mytable_name", params={'myid':'3', 'name':'fred'})
-
def testcast(self):
tbl = table('casttest',
column('id', Integer),
@@ -963,8 +1019,8 @@ UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE
"SELECT op.field FROM op WHERE :literal + (op.field IN (:op_field, :op_field_1))")
self.runtest(table.select((5 + table.c.field).in_(5,6)),
"SELECT op.field FROM op WHERE :op_field + op.field IN (:literal, :literal_1)")
- self.runtest(table.select(not_(table.c.field == 5)),
- "SELECT op.field FROM op WHERE NOT op.field = :op_field")
+ self.runtest(table.select(not_(and_(table.c.field == 5, table.c.field == 7))),
+ "SELECT op.field FROM op WHERE NOT (op.field = :op_field AND op.field = :op_field_1)")
self.runtest(table.select(not_(table.c.field) == 5),
"SELECT op.field FROM op WHERE (NOT op.field) = :literal")
self.runtest(table.select((table.c.field == table.c.field).between(False, True)),
@@ -1019,12 +1075,17 @@ class CRUDTest(SQLTest):
values = {
table1.c.name : table1.c.name + "lala",
table1.c.myid : func.do_stuff(table1.c.myid, literal('hoho'))
- }), "UPDATE mytable SET myid=do_stuff(mytable.myid, :literal_2), name=mytable.name + :mytable_name WHERE mytable.myid = hoho(:hoho) AND mytable.name = :literal + mytable.name + :literal_1")
+ }), "UPDATE mytable SET myid=do_stuff(mytable.myid, :literal), name=(mytable.name || :mytable_name) "
+ "WHERE mytable.myid = hoho(:hoho) AND mytable.name = :literal_1 || mytable.name || :literal_2")
def testcorrelatedupdate(self):
# test against a straight text subquery
- u = update(table1, values = {table1.c.name : text("select name from mytable where id=mytable.id")})
+ u = update(table1, values = {table1.c.name : text("(select name from mytable where id=mytable.id)")})
self.runtest(u, "UPDATE mytable SET name=(select name from mytable where id=mytable.id)")
+
+ mt = table1.alias()
+ u = update(table1, values = {table1.c.name : select([mt.c.name], mt.c.myid==table1.c.myid)})
+ self.runtest(u, "UPDATE mytable SET name=(SELECT mytable_1.name FROM mytable AS mytable_1 WHERE mytable_1.myid = mytable.myid)")
# test against a regular constructed subquery
s = select([table2], table2.c.otherid == table1.c.myid)
@@ -1043,7 +1104,18 @@ class CRUDTest(SQLTest):
def testdelete(self):
self.runtest(delete(table1, table1.c.myid == 7), "DELETE FROM mytable WHERE mytable.myid = :mytable_myid")
-
+
+ def testcorrelateddelete(self):
+ # test a non-correlated WHERE clause
+ s = select([table2.c.othername], table2.c.otherid == 7)
+ u = delete(table1, table1.c.name==s)
+ self.runtest(u, "DELETE FROM mytable WHERE mytable.name = (SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = :myothertable_otherid)")
+
+ # test one that is actually correlated...
+ s = select([table2.c.othername], table2.c.otherid == table1.c.myid)
+ u = table1.delete(table1.c.name==s)
+ self.runtest(u, "DELETE FROM mytable WHERE mytable.name = (SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid)")
+
class SchemaTest(SQLTest):
def testselect(self):
# these tests will fail with the MS-SQL compiler since it will alias schema-qualified tables