diff options
Diffstat (limited to 'test/sql/select.py')
-rw-r--r-- | test/sql/select.py | 188 |
1 files changed, 130 insertions, 58 deletions
diff --git a/test/sql/select.py b/test/sql/select.py index 4d3eb4ad7..a5cf061e2 100644 --- a/test/sql/select.py +++ b/test/sql/select.py @@ -1,8 +1,8 @@ -from testbase import PersistTest import testbase +import re, operator from sqlalchemy import * from sqlalchemy.databases import sqlite, postgres, mysql, oracle, firebird, mssql -import unittest, re, operator +from testlib import * # the select test now tests almost completely with TableClause/ColumnClause objects, @@ -10,21 +10,21 @@ import unittest, re, operator # so SQLAlchemy's SQL construction engine can be used with no database dependencies at all. table1 = table('mytable', - column('myid'), - column('name'), - column('description'), + column('myid', Integer), + column('name', String), + column('description', String), ) table2 = table( 'myothertable', - column('otherid'), - column('othername'), + column('otherid', Integer), + column('othername', String), ) table3 = table( 'thirdtable', - column('userid'), - column('otherstuff'), + column('userid', Integer), + column('otherstuff', String), ) metadata = MetaData() @@ -54,7 +54,7 @@ addresses = table('addresses', class SQLTest(PersistTest): def runtest(self, clause, result, dialect = None, params = None, checkparams = None): c = clause.compile(parameters=params, dialect=dialect) - self.echo("\nSQL String:\n" + str(c) + repr(c.get_params())) + print "\nSQL String:\n" + str(c) + repr(c.get_params()) cc = re.sub(r'\n', '', str(c)) self.assert_(cc == result, "\n'" + cc + "'\n does not match \n'" + result + "'") if checkparams is not None: @@ -130,6 +130,15 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A crit = q.c.myid == table1.c.myid self.runtest(select(['*'], crit), """SELECT * FROM (SELECT mytable.myid AS myid FROM mytable ORDER BY mytable.myid) AS foo, mytable WHERE foo.myid = mytable.myid""", dialect=sqlite.dialect()) self.runtest(select(['*'], crit), """SELECT * FROM (SELECT mytable.myid AS myid FROM mytable) AS foo, mytable WHERE foo.myid = mytable.myid""", dialect=mssql.dialect()) + + def testmssql_aliases_schemas(self): + self.runtest(table4.select(), "SELECT remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM remote_owner.remotetable") + + dialect = mssql.dialect() + self.runtest(table4.select(), "SELECT remotetable_1.rem_id, remotetable_1.datatype_id, remotetable_1.value FROM remote_owner.remotetable AS remotetable_1", dialect=dialect) + + # TODO: this is probably incorrect; no "AS <foo>" is being applied to the table + self.runtest(table1.join(table4, table1.c.myid==table4.c.rem_id).select(), "SELECT mytable.myid, mytable.name, mytable.description, remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM mytable JOIN remote_owner.remotetable ON remotetable.rem_id = mytable.myid") def testdontovercorrelate(self): self.runtest(select([table1], from_obj=[table1, table1.select()]), """SELECT mytable.myid, mytable.name, mytable.description FROM mytable, (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable)""") @@ -142,6 +151,11 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A self.runtest(select([table1, exists([1], from_obj=[table2]).label('foo')]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) AS foo FROM mytable", params={}) def testwheresubquery(self): + s = select([addresses.c.street], addresses.c.user_id==users.c.user_id, correlate=True).alias('s') + self.runtest( + select([users, s.c.street], from_obj=[s]), + """SELECT users.user_id, users.user_name, users.password, s.street FROM users, (SELECT addresses.street AS street FROM addresses WHERE addresses.user_id = users.user_id) AS s""") + # TODO: this tests that you dont get a "SELECT column" without a FROM but its not working yet. #self.runtest( # table1.select(table1.c.myid == select([table1.c.myid], table1.c.name=='jack')), "" @@ -194,7 +208,20 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A s = select([table1.c.myid], scalar=True) self.runtest(select([table2, s]), "SELECT myothertable.otherid, myothertable.othername, (SELECT mytable.myid FROM mytable) FROM myothertable") - + + s = select([table1.c.myid]).correlate(None).as_scalar() + self.runtest(select([table1, s]), "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable) FROM mytable") + + s = select([table1.c.myid]).as_scalar() + self.runtest(select([table2, s]), "SELECT myothertable.otherid, myothertable.othername, (SELECT mytable.myid FROM mytable) FROM myothertable") + + # test expressions against scalar selects + self.runtest(select([s - literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) - :literal") + self.runtest(select([select([table1.c.name]).as_scalar() + literal('x')]), "SELECT (SELECT mytable.name FROM mytable) || :literal") + self.runtest(select([s > literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) > :literal") + + self.runtest(select([select([table1.c.name]).label('foo')]), "SELECT (SELECT mytable.name FROM mytable) AS foo") + zips = table('zips', column('zipcode'), @@ -206,15 +233,17 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A column('nm') ) zip = '12345' - qlat = select([zips.c.latitude], zips.c.zipcode == zip, scalar=True, correlate=False) - qlng = select([zips.c.longitude], zips.c.zipcode == zip, scalar=True, correlate=False) + qlat = select([zips.c.latitude], zips.c.zipcode == zip).correlate(None).as_scalar() + qlng = select([zips.c.longitude], zips.c.zipcode == zip).correlate(None).as_scalar() q = select([places.c.id, places.c.nm, zips.c.zipcode, func.latlondist(qlat, qlng).label('dist')], zips.c.zipcode==zip, order_by = ['dist', places.c.nm] ) - self.runtest(q,"SELECT places.id, places.nm, zips.zipcode, latlondist((SELECT zips.latitude FROM zips WHERE zips.zipcode = :zips_zipcode_1), (SELECT zips.longitude FROM zips WHERE zips.zipcode = :zips_zipcode_2)) AS dist FROM places, zips WHERE zips.zipcode = :zips_zipcode ORDER BY dist, places.nm") + self.runtest(q,"SELECT places.id, places.nm, zips.zipcode, latlondist((SELECT zips.latitude FROM zips WHERE " + "zips.zipcode = :zips_zipcode), (SELECT zips.longitude FROM zips WHERE zips.zipcode = :zips_zipcode_1)) AS dist " + "FROM places, zips WHERE zips.zipcode = :zips_zipcode_2 ORDER BY dist, places.nm") zalias = zips.alias('main_zip') qlat = select([zips.c.latitude], zips.c.zipcode == zalias.c.zipcode, scalar=True) @@ -223,7 +252,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A order_by = ['dist', places.c.nm] ) self.runtest(q, "SELECT places.id, places.nm, main_zip.zipcode, latlondist((SELECT zips.latitude FROM zips WHERE zips.zipcode = main_zip.zipcode), (SELECT zips.longitude FROM zips WHERE zips.zipcode = main_zip.zipcode)) AS dist FROM places, zips AS main_zip ORDER BY dist, places.nm") - + a1 = table2.alias('t2alias') s1 = select([a1.c.otherid], table1.c.myid==a1.c.otherid, scalar=True) j1 = table1.join(table2, table1.c.myid==table2.c.otherid) @@ -261,28 +290,20 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A ) def testoperators(self): - self.runtest( - table1.select((table1.c.myid != 12) & ~(table1.c.name=='john')), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid AND NOT mytable.name = :mytable_name" - ) - - self.runtest( - literal("a") + literal("b") * literal("c"), ":literal + :literal_1 * :literal_2" - ) # exercise arithmetic operators for (py_op, sql_op) in ((operator.add, '+'), (operator.mul, '*'), (operator.sub, '-'), (operator.div, '/'), ): for (lhs, rhs, res) in ( - ('a', table1.c.myid, ':mytable_myid %s mytable.myid'), - ('a', literal('b'), ':literal %s :literal_1'), + (5, table1.c.myid, ':mytable_myid %s mytable.myid'), + (5, literal(5), ':literal %s :literal_1'), (table1.c.myid, 'b', 'mytable.myid %s :mytable_myid'), - (table1.c.myid, literal('b'), 'mytable.myid %s :literal'), + (table1.c.myid, literal(2.7), 'mytable.myid %s :literal'), (table1.c.myid, table1.c.myid, 'mytable.myid %s mytable.myid'), - (literal('a'), 'b', ':literal %s :literal_1'), - (literal('a'), table1.c.myid, ':literal %s mytable.myid'), - (literal('a'), literal('b'), ':literal %s :literal_1'), + (literal(5), 8, ':literal %s :literal_1'), + (literal(6), table1.c.myid, ':literal %s mytable.myid'), + (literal(7), literal(5.5), ':literal %s :literal_1'), ): self.runtest(py_op(lhs, rhs), res % sql_op) @@ -314,6 +335,25 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A "\n'" + compiled + "'\n does not match\n'" + fwd_sql + "'\n or\n'" + rev_sql + "'") + self.runtest( + table1.select((table1.c.myid != 12) & ~(table1.c.name=='john')), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid AND mytable.name != :mytable_name" + ) + + self.runtest( + table1.select((table1.c.myid != 12) & ~and_(table1.c.name=='john', table1.c.name=='ed', table1.c.name=='fred')), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid AND NOT (mytable.name = :mytable_name AND mytable.name = :mytable_name_1 AND mytable.name = :mytable_name_2)" + ) + + self.runtest( + table1.select((table1.c.myid != 12) & ~table1.c.name), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid AND NOT mytable.name" + ) + + self.runtest( + literal("a") + literal("b") * literal("c"), ":literal || :literal_1 * :literal_2" + ) + # test the op() function, also that its results are further usable in expressions self.runtest( table1.select(table1.c.myid.op('hoho')(12)==14), @@ -374,13 +414,18 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A def testalias(self): # test the alias for a table1. column names stay the same, table name "changes" to "foo". self.runtest( - select([alias(table1, 'foo')]) + select([table1.alias('foo')]) ,"SELECT foo.myid, foo.name, foo.description FROM mytable AS foo") - + + for dialect in (firebird.dialect(), oracle.dialect()): + self.runtest( + select([table1.alias('foo')]) + ,"SELECT foo.myid, foo.name, foo.description FROM mytable foo" + ,dialect=dialect) + self.runtest( - select([alias(table1, 'foo')]) - ,"SELECT foo.myid, foo.name, foo.description FROM mytable foo" - ,dialect=firebird.dialect()) + select([table1.alias()]) + ,"SELECT mytable_1.myid, mytable_1.name, mytable_1.description FROM mytable AS mytable_1") # create a select for a join of two tables. use_labels means the column names will have # labels tablename_columnname, which become the column keys accessible off the Selectable object. @@ -401,6 +446,12 @@ myothertable.otherid AS myothertable_otherid FROM mytable, myothertable \ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = :t2view_mytable_myid" ) + + def test_prefixes(self): + self.runtest(table1.select().prefix_with("SQL_CALC_FOUND_ROWS").prefix_with("SQL_SOME_WEIRD_MYSQL_THING"), + "SELECT SQL_CALC_FOUND_ROWS SQL_SOME_WEIRD_MYSQL_THING mytable.myid, mytable.name, mytable.description FROM mytable" + ) + def testtext(self): self.runtest( text("select * from foo where lala = bar") , @@ -429,7 +480,7 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = s.append_column("column2") s.append_whereclause("column1=12") s.append_whereclause("column2=19") - s.order_by("column1") + s = s.order_by("column1") s.append_from("table1") self.runtest(s, "SELECT column1, column2 FROM table1 WHERE column1=12 AND column2=19 ORDER BY column1") @@ -468,7 +519,14 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = checkparams={'bar':4, 'whee': 7}, params={'bar':4, 'whee': 7, 'hoho':10}, ) - + + self.runtest( + text("select * from foo where clock='05:06:07'"), + "select * from foo where clock='05:06:07'", + checkparams={}, + params={}, + ) + dialect = postgres.dialect() self.runtest( text("select * from foo where lala=:bar and hoho=:whee"), @@ -477,6 +535,13 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = params={'bar':4, 'whee': 7, 'hoho':10}, dialect=dialect ) + self.runtest( + text("select * from foo where clock='05:06:07' and mork='\:mindy'"), + "select * from foo where clock='05:06:07' and mork=':mindy'", + checkparams={}, + params={}, + dialect=dialect + ) dialect = sqlite.dialect() self.runtest( @@ -509,7 +574,7 @@ FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today def testliteral(self): self.runtest(select([literal("foo") + literal("bar")], from_obj=[table1]), - "SELECT :literal + :literal_1 FROM mytable") + "SELECT :literal || :literal_1 FROM mytable") def testcalculatedcolumns(self): value_tbl = table('values', @@ -663,7 +728,7 @@ FROM myothertable ORDER BY myid \ WHERE mytable.name = :mytable_name GROUP BY mytable.myid, mytable.name UNION SELECT mytable.myid, mytable.name, mytable.description \ FROM mytable WHERE mytable.name = :mytable_name_1" ) - + def test_compound_select_grouping(self): self.runtest( union_all( @@ -716,6 +781,7 @@ EXISTS (select yay from foo where boo = lar)", dialect=postgres.dialect() ) + self.runtest(query, "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \ FROM mytable, myothertable WHERE mytable.myid = myothertable.otherid(+) AND \ @@ -835,16 +901,16 @@ myothertable.othername != :myothertable_othername OR EXISTS (select yay from foo self.runtest(select([table1], table1.c.myid.in_('a', literal('b'))), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :literal)") - self.runtest(select([table1], table1.c.myid.in_(literal('a') + 'a')), + self.runtest(select([table1], table1.c.myid.in_(literal(1) + 'a')), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :literal + :literal_1") self.runtest(select([table1], table1.c.myid.in_(literal('a') +'a', 'b')), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal + :literal_1, :mytable_myid)") + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal || :literal_1, :mytable_myid)") self.runtest(select([table1], table1.c.myid.in_(literal('a') + literal('a'), literal('b'))), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal + :literal_1, :literal_2)") + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal || :literal_1, :literal_2)") - self.runtest(select([table1], table1.c.myid.in_('a', literal('b') +'b')), + self.runtest(select([table1], table1.c.myid.in_(1, literal(3) + 4)), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :literal + :literal_1)") self.runtest(select([table1], table1.c.myid.in_(literal('a') < 'b')), @@ -862,7 +928,7 @@ myothertable.othername != :myothertable_othername OR EXISTS (select yay from foo self.runtest(select([table1], table1.c.myid.in_(literal('a'), table1.c.myid +'a')), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal, mytable.myid + :mytable_myid)") - self.runtest(select([table1], table1.c.myid.in_(literal('a'), 'a' + table1.c.myid)), + self.runtest(select([table1], table1.c.myid.in_(literal(1), 'a' + table1.c.myid)), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal, :mytable_myid + mytable.myid)") self.runtest(select([table1], table1.c.myid.in_(1, 2, 3)), @@ -900,16 +966,6 @@ UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (CASE WHEN (mytable.myid IS NULL) THEN NULL ELSE 0 END = 1)") - def testlateargs(self): - """tests that a SELECT clause will have extra "WHERE" clauses added to it at compile time if extra arguments - are sent""" - - self.runtest(table1.select(), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.name = :mytable_name AND mytable.myid = :mytable_myid", params={'myid':'3', 'name':'jack'}) - - self.runtest(table1.select(table1.c.name=='jack'), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid AND mytable.name = :mytable_name", params={'myid':'3'}) - - self.runtest(table1.select(table1.c.name=='jack'), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid AND mytable.name = :mytable_name", params={'myid':'3', 'name':'fred'}) - def testcast(self): tbl = table('casttest', column('id', Integer), @@ -963,8 +1019,8 @@ UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE "SELECT op.field FROM op WHERE :literal + (op.field IN (:op_field, :op_field_1))") self.runtest(table.select((5 + table.c.field).in_(5,6)), "SELECT op.field FROM op WHERE :op_field + op.field IN (:literal, :literal_1)") - self.runtest(table.select(not_(table.c.field == 5)), - "SELECT op.field FROM op WHERE NOT op.field = :op_field") + self.runtest(table.select(not_(and_(table.c.field == 5, table.c.field == 7))), + "SELECT op.field FROM op WHERE NOT (op.field = :op_field AND op.field = :op_field_1)") self.runtest(table.select(not_(table.c.field) == 5), "SELECT op.field FROM op WHERE (NOT op.field) = :literal") self.runtest(table.select((table.c.field == table.c.field).between(False, True)), @@ -1019,12 +1075,17 @@ class CRUDTest(SQLTest): values = { table1.c.name : table1.c.name + "lala", table1.c.myid : func.do_stuff(table1.c.myid, literal('hoho')) - }), "UPDATE mytable SET myid=do_stuff(mytable.myid, :literal_2), name=mytable.name + :mytable_name WHERE mytable.myid = hoho(:hoho) AND mytable.name = :literal + mytable.name + :literal_1") + }), "UPDATE mytable SET myid=do_stuff(mytable.myid, :literal), name=(mytable.name || :mytable_name) " + "WHERE mytable.myid = hoho(:hoho) AND mytable.name = :literal_1 || mytable.name || :literal_2") def testcorrelatedupdate(self): # test against a straight text subquery - u = update(table1, values = {table1.c.name : text("select name from mytable where id=mytable.id")}) + u = update(table1, values = {table1.c.name : text("(select name from mytable where id=mytable.id)")}) self.runtest(u, "UPDATE mytable SET name=(select name from mytable where id=mytable.id)") + + mt = table1.alias() + u = update(table1, values = {table1.c.name : select([mt.c.name], mt.c.myid==table1.c.myid)}) + self.runtest(u, "UPDATE mytable SET name=(SELECT mytable_1.name FROM mytable AS mytable_1 WHERE mytable_1.myid = mytable.myid)") # test against a regular constructed subquery s = select([table2], table2.c.otherid == table1.c.myid) @@ -1043,7 +1104,18 @@ class CRUDTest(SQLTest): def testdelete(self): self.runtest(delete(table1, table1.c.myid == 7), "DELETE FROM mytable WHERE mytable.myid = :mytable_myid") - + + def testcorrelateddelete(self): + # test a non-correlated WHERE clause + s = select([table2.c.othername], table2.c.otherid == 7) + u = delete(table1, table1.c.name==s) + self.runtest(u, "DELETE FROM mytable WHERE mytable.name = (SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = :myothertable_otherid)") + + # test one that is actually correlated... + s = select([table2.c.othername], table2.c.otherid == table1.c.myid) + u = table1.delete(table1.c.name==s) + self.runtest(u, "DELETE FROM mytable WHERE mytable.name = (SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid)") + class SchemaTest(SQLTest): def testselect(self): # these tests will fail with the MS-SQL compiler since it will alias schema-qualified tables |