#! coding:utf-8 """ compiler tests. These tests are among the very first that were written when SQLAlchemy began in 2005. As a result the testing style here is very dense; it's an ongoing job to break these into much smaller tests with correct pep8 styling and coherent test organization. """ import decimal from sqlalchemy import alias from sqlalchemy import and_ from sqlalchemy import asc from sqlalchemy import bindparam from sqlalchemy import Boolean from sqlalchemy import case from sqlalchemy import cast from sqlalchemy import CheckConstraint from sqlalchemy import Column from sqlalchemy import Date from sqlalchemy import desc from sqlalchemy import distinct from sqlalchemy import exc from sqlalchemy import except_ from sqlalchemy import exists from sqlalchemy import Float from sqlalchemy import func from sqlalchemy import Integer from sqlalchemy import intersect from sqlalchemy import join from sqlalchemy import literal from sqlalchemy import literal_column from sqlalchemy import MetaData from sqlalchemy import not_ from sqlalchemy import null from sqlalchemy import Numeric from sqlalchemy import or_ from sqlalchemy import outerjoin from sqlalchemy import over from sqlalchemy import schema from sqlalchemy import select from sqlalchemy import sql from sqlalchemy import String from sqlalchemy import subquery from sqlalchemy import Table from sqlalchemy import Text from sqlalchemy import text from sqlalchemy import TIMESTAMP from sqlalchemy import true from sqlalchemy import tuple_ from sqlalchemy import type_coerce from sqlalchemy import types from sqlalchemy import union from sqlalchemy import union_all from sqlalchemy import util from sqlalchemy.dialects import mysql from sqlalchemy.dialects import oracle from sqlalchemy.dialects import postgresql from sqlalchemy.dialects import sqlite from sqlalchemy.dialects import sybase from sqlalchemy.dialects.postgresql.base import PGCompiler from sqlalchemy.dialects.postgresql.base import PGDialect from sqlalchemy.engine import default from sqlalchemy.ext.compiler import compiles from sqlalchemy.sql import column from sqlalchemy.sql import compiler from sqlalchemy.sql import label from sqlalchemy.sql import table from sqlalchemy.sql.expression import _literal_as_text from sqlalchemy.sql.expression import ClauseList from sqlalchemy.sql.expression import HasPrefixes from sqlalchemy.testing import assert_raises from sqlalchemy.testing import assert_raises_message from sqlalchemy.testing import AssertsCompiledSQL from sqlalchemy.testing import eq_ from sqlalchemy.testing import eq_ignore_whitespace from sqlalchemy.testing import fixtures from sqlalchemy.testing import is_ from sqlalchemy.util import u table1 = table( "mytable", column("myid", Integer), column("name", String), column("description", String), ) table2 = table( "myothertable", column("otherid", Integer), column("othername", String) ) table3 = table( "thirdtable", column("userid", Integer), column("otherstuff", String) ) metadata = MetaData() # table with a schema table4 = Table( "remotetable", metadata, Column("rem_id", Integer, primary_key=True), Column("datatype_id", Integer), Column("value", String(20)), schema="remote_owner", ) # table with a 'multipart' schema table5 = Table( "remotetable", metadata, Column("rem_id", Integer, primary_key=True), Column("datatype_id", Integer), Column("value", String(20)), schema="dbo.remote_owner", ) users = table( "users", column("user_id"), column("user_name"), column("password") ) addresses = table( "addresses", column("address_id"), column("user_id"), column("street"), column("city"), column("state"), column("zip"), ) keyed = Table( "keyed", metadata, Column("x", Integer, key="colx"), Column("y", Integer, key="coly"), Column("z", Integer), ) class SelectTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = "default" def test_attribute_sanity(self): assert hasattr(table1, "c") assert hasattr(table1.select(), "c") assert not hasattr(table1.c.myid.self_group(), "columns") assert hasattr(table1.select().self_group(), "columns") assert not hasattr(table1.c.myid, "columns") assert not hasattr(table1.c.myid, "c") assert not hasattr(table1.select().c.myid, "c") assert not hasattr(table1.select().c.myid, "columns") assert not hasattr(table1.alias().c.myid, "columns") assert not hasattr(table1.alias().c.myid, "c") if util.compat.py32: assert_raises_message( exc.InvalidRequestError, "Scalar Select expression has no " "columns; use this object directly within a " "column-level expression.", lambda: hasattr( select([table1.c.myid]).as_scalar().self_group(), "columns" ), ) assert_raises_message( exc.InvalidRequestError, "Scalar Select expression has no " "columns; use this object directly within a " "column-level expression.", lambda: hasattr( select([table1.c.myid]).as_scalar(), "columns" ), ) else: assert not hasattr( select([table1.c.myid]).as_scalar().self_group(), "columns" ) assert not hasattr(select([table1.c.myid]).as_scalar(), "columns") def test_prefix_constructor(self): class Pref(HasPrefixes): def _generate(self): return self assert_raises( exc.ArgumentError, Pref().prefix_with, "some prefix", not_a_dialect=True, ) def test_table_select(self): self.assert_compile( table1.select(), "SELECT mytable.myid, mytable.name, " "mytable.description FROM mytable", ) self.assert_compile( select([table1, table2]), "SELECT mytable.myid, mytable.name, mytable.description, " "myothertable.otherid, myothertable.othername FROM mytable, " "myothertable", ) def test_invalid_col_argument(self): assert_raises(exc.ArgumentError, select, table1) assert_raises(exc.ArgumentError, select, table1.c.myid) def test_int_limit_offset_coercion(self): for given, exp in [ ("5", 5), (5, 5), (5.2, 5), (decimal.Decimal("5"), 5), (None, None), ]: eq_(select().limit(given)._limit, exp) eq_(select().offset(given)._offset, exp) eq_(select(limit=given)._limit, exp) eq_(select(offset=given)._offset, exp) assert_raises(ValueError, select().limit, "foo") assert_raises(ValueError, select().offset, "foo") assert_raises(ValueError, select, offset="foo") assert_raises(ValueError, select, limit="foo") def test_limit_offset_no_int_coercion_one(self): exp1 = literal_column("Q") exp2 = literal_column("Y") self.assert_compile( select([1]).limit(exp1).offset(exp2), "SELECT 1 LIMIT Q OFFSET Y" ) self.assert_compile( select([1]).limit(bindparam("x")).offset(bindparam("y")), "SELECT 1 LIMIT :x OFFSET :y", ) def test_limit_offset_no_int_coercion_two(self): exp1 = literal_column("Q") exp2 = literal_column("Y") sel = select([1]).limit(exp1).offset(exp2) assert_raises_message( exc.CompileError, "This SELECT structure does not use a simple integer " "value for limit", getattr, sel, "_limit", ) assert_raises_message( exc.CompileError, "This SELECT structure does not use a simple integer " "value for offset", getattr, sel, "_offset", ) def test_limit_offset_no_int_coercion_three(self): exp1 = bindparam("Q") exp2 = bindparam("Y") sel = select([1]).limit(exp1).offset(exp2) assert_raises_message( exc.CompileError, "This SELECT structure does not use a simple integer " "value for limit", getattr, sel, "_limit", ) assert_raises_message( exc.CompileError, "This SELECT structure does not use a simple integer " "value for offset", getattr, sel, "_offset", ) def test_limit_offset(self): for lim, offset, exp, params in [ ( 5, 10, "LIMIT :param_1 OFFSET :param_2", {"param_1": 5, "param_2": 10}, ), (None, 10, "LIMIT -1 OFFSET :param_1", {"param_1": 10}), (5, None, "LIMIT :param_1", {"param_1": 5}), ( 0, 0, "LIMIT :param_1 OFFSET :param_2", {"param_1": 0, "param_2": 0}, ), ]: self.assert_compile( select([1]).limit(lim).offset(offset), "SELECT 1 " + exp, checkparams=params, ) def test_limit_offset_select_literal_binds(self): stmt = select([1]).limit(5).offset(6) self.assert_compile( stmt, "SELECT 1 LIMIT 5 OFFSET 6", literal_binds=True ) def test_limit_offset_compound_select_literal_binds(self): stmt = select([1]).union(select([2])).limit(5).offset(6) self.assert_compile( stmt, "SELECT 1 UNION SELECT 2 LIMIT 5 OFFSET 6", literal_binds=True, ) def test_select_precol_compile_ordering(self): s1 = select([column("x")]).select_from(text("a")).limit(5).as_scalar() s2 = select([s1]).limit(10) class MyCompiler(compiler.SQLCompiler): def get_select_precolumns(self, select, **kw): result = "" if select._limit: result += "FIRST %s " % self.process( literal(select._limit), **kw ) if select._offset: result += "SKIP %s " % self.process( literal(select._offset), **kw ) return result def limit_clause(self, select, **kw): return "" dialect = default.DefaultDialect() dialect.statement_compiler = MyCompiler dialect.paramstyle = "qmark" dialect.positional = True self.assert_compile( s2, "SELECT FIRST ? (SELECT FIRST ? x FROM a) AS anon_1", checkpositional=(10, 5), dialect=dialect, ) def test_from_subquery(self): """tests placing select statements in the column clause of another select, for the purposes of selecting from the exported columns of that select.""" s = select([table1], table1.c.name == "jack") self.assert_compile( select([s], s.c.myid == 7), "SELECT myid, name, description FROM " "(SELECT mytable.myid AS myid, " "mytable.name AS name, mytable.description AS description " "FROM mytable " "WHERE mytable.name = :name_1) WHERE myid = :myid_1", ) sq = select([table1]) self.assert_compile( sq.select(), "SELECT myid, name, description FROM " "(SELECT mytable.myid AS myid, " "mytable.name AS name, mytable.description " "AS description FROM mytable)", ) sq = select([table1]).alias("sq") self.assert_compile( sq.select(sq.c.myid == 7), "SELECT sq.myid, sq.name, sq.description FROM " "(SELECT mytable.myid AS myid, mytable.name AS name, " "mytable.description AS description FROM mytable) AS sq " "WHERE sq.myid = :myid_1", ) sq = select( [table1, table2], and_(table1.c.myid == 7, table2.c.otherid == table1.c.myid), use_labels=True, ).alias("sq") sqstring = ( "SELECT mytable.myid AS mytable_myid, mytable.name AS " "mytable_name, mytable.description AS mytable_description, " "myothertable.otherid AS myothertable_otherid, " "myothertable.othername AS myothertable_othername FROM " "mytable, myothertable WHERE mytable.myid = :myid_1 AND " "myothertable.otherid = mytable.myid" ) self.assert_compile( sq.select(), "SELECT sq.mytable_myid, sq.mytable_name, " "sq.mytable_description, sq.myothertable_otherid, " "sq.myothertable_othername FROM (%s) AS sq" % sqstring, ) sq2 = select([sq], use_labels=True).alias("sq2") self.assert_compile( sq2.select(), "SELECT sq2.sq_mytable_myid, sq2.sq_mytable_name, " "sq2.sq_mytable_description, sq2.sq_myothertable_otherid, " "sq2.sq_myothertable_othername FROM " "(SELECT sq.mytable_myid AS " "sq_mytable_myid, sq.mytable_name AS sq_mytable_name, " "sq.mytable_description AS sq_mytable_description, " "sq.myothertable_otherid AS sq_myothertable_otherid, " "sq.myothertable_othername AS sq_myothertable_othername " "FROM (%s) AS sq) AS sq2" % sqstring, ) def test_select_from_clauselist(self): self.assert_compile( select([ClauseList(column("a"), column("b"))]).select_from( text("sometable") ), "SELECT a, b FROM sometable", ) def test_use_labels(self): self.assert_compile( select([table1.c.myid == 5], use_labels=True), "SELECT mytable.myid = :myid_1 AS anon_1 FROM mytable", ) self.assert_compile( select([func.foo()], use_labels=True), "SELECT foo() AS foo_1" ) # this is native_boolean=False for default dialect self.assert_compile( select([not_(True)], use_labels=True), "SELECT :param_1 = 0 AS anon_1", ) self.assert_compile( select([cast("data", Integer)], use_labels=True), "SELECT CAST(:param_1 AS INTEGER) AS anon_1", ) self.assert_compile( select( [func.sum(func.lala(table1.c.myid).label("foo")).label("bar")] ), "SELECT sum(lala(mytable.myid)) AS bar FROM mytable", ) self.assert_compile( select([keyed]), "SELECT keyed.x, keyed.y" ", keyed.z FROM keyed" ) self.assert_compile( select([keyed]).apply_labels(), "SELECT keyed.x AS keyed_x, keyed.y AS " "keyed_y, keyed.z AS keyed_z FROM keyed", ) def test_paramstyles(self): stmt = text("select :foo, :bar, :bat from sometable") self.assert_compile( stmt, "select ?, ?, ? from sometable", dialect=default.DefaultDialect(paramstyle="qmark"), ) self.assert_compile( stmt, "select :foo, :bar, :bat from sometable", dialect=default.DefaultDialect(paramstyle="named"), ) self.assert_compile( stmt, "select %s, %s, %s from sometable", dialect=default.DefaultDialect(paramstyle="format"), ) self.assert_compile( stmt, "select :1, :2, :3 from sometable", dialect=default.DefaultDialect(paramstyle="numeric"), ) self.assert_compile( stmt, "select %(foo)s, %(bar)s, %(bat)s from sometable", dialect=default.DefaultDialect(paramstyle="pyformat"), ) def test_anon_param_name_on_keys(self): self.assert_compile( keyed.insert(), "INSERT INTO keyed (x, y, z) VALUES (%(colx)s, %(coly)s, %(z)s)", dialect=default.DefaultDialect(paramstyle="pyformat"), ) self.assert_compile( keyed.c.coly == 5, "keyed.y = %(coly_1)s", checkparams={"coly_1": 5}, dialect=default.DefaultDialect(paramstyle="pyformat"), ) def test_dupe_columns(self): """test that deduping is performed against clause element identity, not rendered result.""" self.assert_compile( select([column("a"), column("a"), column("a")]), "SELECT a, a, a", dialect=default.DefaultDialect(), ) c = column("a") self.assert_compile( select([c, c, c]), "SELECT a", dialect=default.DefaultDialect() ) a, b = column("a"), column("b") self.assert_compile( select([a, b, b, b, a, a]), "SELECT a, b", dialect=default.DefaultDialect(), ) # using alternate keys. a, b, c = ( Column("a", Integer, key="b"), Column("b", Integer), Column("c", Integer, key="a"), ) self.assert_compile( select([a, b, c, a, b, c]), "SELECT a, b, c", dialect=default.DefaultDialect(), ) self.assert_compile( select([bindparam("a"), bindparam("b"), bindparam("c")]), "SELECT :a AS anon_1, :b AS anon_2, :c AS anon_3", dialect=default.DefaultDialect(paramstyle="named"), ) self.assert_compile( select([bindparam("a"), bindparam("b"), bindparam("c")]), "SELECT ? AS anon_1, ? AS anon_2, ? AS anon_3", dialect=default.DefaultDialect(paramstyle="qmark"), ) self.assert_compile( select([column("a"), column("a"), column("a")]), "SELECT a, a, a" ) s = select([bindparam("a"), bindparam("b"), bindparam("c")]) s = s.compile(dialect=default.DefaultDialect(paramstyle="qmark")) eq_(s.positiontup, ["a", "b", "c"]) def test_nested_label_targeting(self): """test nested anonymous label generation. """ s1 = table1.select() s2 = s1.alias() s3 = select([s2], use_labels=True) s4 = s3.alias() s5 = select([s4], use_labels=True) self.assert_compile( s5, "SELECT anon_1.anon_2_myid AS " "anon_1_anon_2_myid, anon_1.anon_2_name AS " "anon_1_anon_2_name, anon_1.anon_2_descript" "ion AS anon_1_anon_2_description FROM " "(SELECT anon_2.myid AS anon_2_myid, " "anon_2.name AS anon_2_name, " "anon_2.description AS anon_2_description " "FROM (SELECT mytable.myid AS myid, " "mytable.name AS name, mytable.description " "AS description FROM mytable) AS anon_2) " "AS anon_1", ) def test_nested_label_targeting_keyed(self): s1 = keyed.select() s2 = s1.alias() s3 = select([s2], use_labels=True) self.assert_compile( s3, "SELECT anon_1.x AS anon_1_x, " "anon_1.y AS anon_1_y, " "anon_1.z AS anon_1_z FROM " "(SELECT keyed.x AS x, keyed.y " "AS y, keyed.z AS z FROM keyed) AS anon_1", ) s4 = s3.alias() s5 = select([s4], use_labels=True) self.assert_compile( s5, "SELECT anon_1.anon_2_x AS anon_1_anon_2_x, " "anon_1.anon_2_y AS anon_1_anon_2_y, " "anon_1.anon_2_z AS anon_1_anon_2_z " "FROM (SELECT anon_2.x AS anon_2_x, " "anon_2.y AS anon_2_y, " "anon_2.z AS anon_2_z FROM " "(SELECT keyed.x AS x, keyed.y AS y, keyed.z " "AS z FROM keyed) AS anon_2) AS anon_1", ) def test_exists(self): s = select([table1.c.myid]).where(table1.c.myid == 5) self.assert_compile( exists(s), "EXISTS (SELECT mytable.myid FROM mytable " "WHERE mytable.myid = :myid_1)", ) self.assert_compile( exists(s.as_scalar()), "EXISTS (SELECT mytable.myid FROM mytable " "WHERE mytable.myid = :myid_1)", ) self.assert_compile( exists([table1.c.myid], table1.c.myid == 5).select(), "SELECT EXISTS (SELECT mytable.myid FROM " "mytable WHERE mytable.myid = :myid_1) AS anon_1", params={"mytable_myid": 5}, ) self.assert_compile( select([table1, exists([1], from_obj=table2)]), "SELECT mytable.myid, mytable.name, " "mytable.description, EXISTS (SELECT 1 " "FROM myothertable) AS anon_1 FROM mytable", params={}, ) self.assert_compile( select([table1, exists([1], from_obj=table2).label("foo")]), "SELECT mytable.myid, mytable.name, " "mytable.description, EXISTS (SELECT 1 " "FROM myothertable) AS foo FROM mytable", params={}, ) self.assert_compile( table1.select( exists() .where(table2.c.otherid == table1.c.myid) .correlate(table1) ), "SELECT mytable.myid, mytable.name, " "mytable.description FROM mytable WHERE " "EXISTS (SELECT * FROM myothertable WHERE " "myothertable.otherid = mytable.myid)", ) self.assert_compile( table1.select( exists() .where(table2.c.otherid == table1.c.myid) .correlate(table1) ), "SELECT mytable.myid, mytable.name, " "mytable.description FROM mytable WHERE " "EXISTS (SELECT * FROM myothertable WHERE " "myothertable.otherid = mytable.myid)", ) self.assert_compile( table1.select( exists() .where(table2.c.otherid == table1.c.myid) .correlate(table1) ).replace_selectable(table2, table2.alias()), "SELECT mytable.myid, mytable.name, " "mytable.description FROM mytable WHERE " "EXISTS (SELECT * FROM myothertable AS " "myothertable_1 WHERE myothertable_1.otheri" "d = mytable.myid)", ) self.assert_compile( table1.select( exists() .where(table2.c.otherid == table1.c.myid) .correlate(table1) ) .select_from( table1.join(table2, table1.c.myid == table2.c.otherid) ) .replace_selectable(table2, table2.alias()), "SELECT mytable.myid, mytable.name, " "mytable.description FROM mytable JOIN " "myothertable AS myothertable_1 ON " "mytable.myid = myothertable_1.otherid " "WHERE EXISTS (SELECT * FROM myothertable " "AS myothertable_1 WHERE " "myothertable_1.otherid = mytable.myid)", ) self.assert_compile( select( [ or_( exists().where(table2.c.otherid == "foo"), exists().where(table2.c.otherid == "bar"), ) ] ), "SELECT (EXISTS (SELECT * FROM myothertable " "WHERE myothertable.otherid = :otherid_1)) " "OR (EXISTS (SELECT * FROM myothertable WHERE " "myothertable.otherid = :otherid_2)) AS anon_1", ) self.assert_compile( select([exists([1])]), "SELECT EXISTS (SELECT 1) AS anon_1" ) self.assert_compile( select([~exists([1])]), "SELECT NOT (EXISTS (SELECT 1)) AS anon_1" ) self.assert_compile( select([~(~exists([1]))]), "SELECT NOT (NOT (EXISTS (SELECT 1))) AS anon_1", ) def test_where_subquery(self): s = select( [addresses.c.street], addresses.c.user_id == users.c.user_id, correlate=True, ).alias("s") # don't correlate in a FROM list self.assert_compile( select([users, s.c.street], from_obj=s), "SELECT users.user_id, users.user_name, " "users.password, s.street FROM users, " "(SELECT addresses.street AS street FROM " "addresses, users WHERE addresses.user_id = " "users.user_id) AS s", ) self.assert_compile( table1.select( table1.c.myid == select([table1.c.myid], table1.c.name == "jack") ), "SELECT mytable.myid, mytable.name, " "mytable.description FROM mytable WHERE " "mytable.myid = (SELECT mytable.myid FROM " "mytable WHERE mytable.name = :name_1)", ) self.assert_compile( table1.select( table1.c.myid == select( [table2.c.otherid], table1.c.name == table2.c.othername ) ), "SELECT mytable.myid, mytable.name, " "mytable.description FROM mytable WHERE " "mytable.myid = (SELECT " "myothertable.otherid FROM myothertable " "WHERE mytable.name = myothertable.othernam" "e)", ) self.assert_compile( table1.select(exists([1], table2.c.otherid == table1.c.myid)), "SELECT mytable.myid, mytable.name, " "mytable.description FROM mytable WHERE " "EXISTS (SELECT 1 FROM myothertable WHERE " "myothertable.otherid = mytable.myid)", ) talias = table1.alias("ta") s = subquery( "sq2", [talias], exists([1], table2.c.otherid == talias.c.myid) ) self.assert_compile( select([s, table1]), "SELECT sq2.myid, sq2.name, " "sq2.description, mytable.myid, " "mytable.name, mytable.description FROM " "(SELECT ta.myid AS myid, ta.name AS name, " "ta.description AS description FROM " "mytable AS ta WHERE EXISTS (SELECT 1 FROM " "myothertable WHERE myothertable.otherid = " "ta.myid)) AS sq2, mytable", ) # test constructing the outer query via append_column(), which # occurs in the ORM's Query object s = select( [], exists([1], table2.c.otherid == table1.c.myid), from_obj=table1 ) s.append_column(table1) self.assert_compile( s, "SELECT mytable.myid, mytable.name, " "mytable.description FROM mytable WHERE " "EXISTS (SELECT 1 FROM myothertable WHERE " "myothertable.otherid = mytable.myid)", ) def test_orderby_subquery(self): self.assert_compile( table1.select( order_by=[ select( [table2.c.otherid], table1.c.myid == table2.c.otherid ) ] ), "SELECT mytable.myid, mytable.name, " "mytable.description FROM mytable ORDER BY " "(SELECT myothertable.otherid FROM " "myothertable WHERE mytable.myid = " "myothertable.otherid)", ) self.assert_compile( table1.select( order_by=[ desc( select( [table2.c.otherid], table1.c.myid == table2.c.otherid, ) ) ] ), "SELECT mytable.myid, mytable.name, " "mytable.description FROM mytable ORDER BY " "(SELECT myothertable.otherid FROM " "myothertable WHERE mytable.myid = " "myothertable.otherid) DESC", ) def test_scalar_select(self): assert_raises_message( exc.InvalidRequestError, r"Select objects don't have a type\. Call as_scalar\(\) " r"on this Select object to return a 'scalar' " r"version of this Select\.", func.coalesce, select([table1.c.myid]), ) s = select([table1.c.myid], correlate=False).as_scalar() self.assert_compile( select([table1, s]), "SELECT mytable.myid, mytable.name, " "mytable.description, (SELECT mytable.myid " "FROM mytable) AS anon_1 FROM mytable", ) s = select([table1.c.myid]).as_scalar() self.assert_compile( select([table2, s]), "SELECT myothertable.otherid, " "myothertable.othername, (SELECT " "mytable.myid FROM mytable) AS anon_1 FROM " "myothertable", ) s = select([table1.c.myid]).correlate(None).as_scalar() self.assert_compile( select([table1, s]), "SELECT mytable.myid, mytable.name, " "mytable.description, (SELECT mytable.myid " "FROM mytable) AS anon_1 FROM mytable", ) s = select([table1.c.myid]).as_scalar() s2 = s.where(table1.c.myid == 5) self.assert_compile( s2, "(SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)", ) self.assert_compile(s, "(SELECT mytable.myid FROM mytable)") # test that aliases use as_scalar() when used in an explicitly # scalar context s = select([table1.c.myid]).alias() self.assert_compile( select([table1.c.myid]).where(table1.c.myid == s), "SELECT mytable.myid FROM mytable WHERE " "mytable.myid = (SELECT mytable.myid FROM " "mytable)", ) self.assert_compile( select([table1.c.myid]).where(s > table1.c.myid), "SELECT mytable.myid FROM mytable WHERE " "mytable.myid < (SELECT mytable.myid FROM " "mytable)", ) s = select([table1.c.myid]).as_scalar() self.assert_compile( select([table2, s]), "SELECT myothertable.otherid, " "myothertable.othername, (SELECT " "mytable.myid FROM mytable) AS anon_1 FROM " "myothertable", ) # test expressions against scalar selects self.assert_compile( select([s - literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) " "- :param_1 AS anon_1", ) self.assert_compile( select([select([table1.c.name]).as_scalar() + literal("x")]), "SELECT (SELECT mytable.name FROM mytable) " "|| :param_1 AS anon_1", ) self.assert_compile( select([s > literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) " "> :param_1 AS anon_1", ) self.assert_compile( select([select([table1.c.name]).label("foo")]), "SELECT (SELECT mytable.name FROM mytable) " "AS foo", ) # scalar selects should not have any attributes on their 'c' or # 'columns' attribute s = select([table1.c.myid]).as_scalar() try: s.c.foo except exc.InvalidRequestError as err: assert ( str(err) == "Scalar Select expression has no columns; use this " "object directly within a column-level expression." ) try: s.columns.foo except exc.InvalidRequestError as err: assert ( str(err) == "Scalar Select expression has no columns; use this " "object directly within a column-level expression." ) zips = table( "zips", column("zipcode"), column("latitude"), column("longitude") ) places = table("places", column("id"), column("nm")) zipcode = "12345" qlat = ( select([zips.c.latitude], zips.c.zipcode == zipcode) .correlate(None) .as_scalar() ) qlng = ( select([zips.c.longitude], zips.c.zipcode == zipcode) .correlate(None) .as_scalar() ) q = select( [ places.c.id, places.c.nm, zips.c.zipcode, func.latlondist(qlat, qlng).label("dist"), ], zips.c.zipcode == zipcode, order_by=["dist", places.c.nm], ) self.assert_compile( q, "SELECT places.id, places.nm, " "zips.zipcode, latlondist((SELECT " "zips.latitude FROM zips WHERE " "zips.zipcode = :zipcode_1), (SELECT " "zips.longitude FROM zips WHERE " "zips.zipcode = :zipcode_2)) AS dist FROM " "places, zips WHERE zips.zipcode = " ":zipcode_3 ORDER BY dist, places.nm", ) zalias = zips.alias("main_zip") qlat = select( [zips.c.latitude], zips.c.zipcode == zalias.c.zipcode ).as_scalar() qlng = select( [zips.c.longitude], zips.c.zipcode == zalias.c.zipcode ).as_scalar() q = select( [ places.c.id, places.c.nm, zalias.c.zipcode, func.latlondist(qlat, qlng).label("dist"), ], order_by=["dist", places.c.nm], ) self.assert_compile( q, "SELECT places.id, places.nm, " "main_zip.zipcode, latlondist((SELECT " "zips.latitude FROM zips WHERE " "zips.zipcode = main_zip.zipcode), (SELECT " "zips.longitude FROM zips WHERE " "zips.zipcode = main_zip.zipcode)) AS dist " "FROM places, zips AS main_zip ORDER BY " "dist, places.nm", ) a1 = table2.alias("t2alias") s1 = select([a1.c.otherid], table1.c.myid == a1.c.otherid).as_scalar() j1 = table1.join(table2, table1.c.myid == table2.c.otherid) s2 = select([table1, s1], from_obj=j1) self.assert_compile( s2, "SELECT mytable.myid, mytable.name, " "mytable.description, (SELECT " "t2alias.otherid FROM myothertable AS " "t2alias WHERE mytable.myid = " "t2alias.otherid) AS anon_1 FROM mytable " "JOIN myothertable ON mytable.myid = " "myothertable.otherid", ) def test_label_comparison_one(self): x = func.lala(table1.c.myid).label("foo") self.assert_compile( select([x], x == 5), "SELECT lala(mytable.myid) AS foo FROM " "mytable WHERE lala(mytable.myid) = " ":param_1", ) def test_label_comparison_two(self): self.assert_compile( label("bar", column("foo", type_=String)) + "foo", "foo || :param_1", ) def test_order_by_labels_enabled(self): lab1 = (table1.c.myid + 12).label("foo") lab2 = func.somefunc(table1.c.name).label("bar") dialect = default.DefaultDialect() self.assert_compile( select([lab1, lab2]).order_by(lab1, desc(lab2)), "SELECT mytable.myid + :myid_1 AS foo, " "somefunc(mytable.name) AS bar FROM mytable " "ORDER BY foo, bar DESC", dialect=dialect, ) # the function embedded label renders as the function self.assert_compile( select([lab1, lab2]).order_by(func.hoho(lab1), desc(lab2)), "SELECT mytable.myid + :myid_1 AS foo, " "somefunc(mytable.name) AS bar FROM mytable " "ORDER BY hoho(mytable.myid + :myid_1), bar DESC", dialect=dialect, ) # binary expressions render as the expression without labels self.assert_compile( select([lab1, lab2]).order_by(lab1 + "test"), "SELECT mytable.myid + :myid_1 AS foo, " "somefunc(mytable.name) AS bar FROM mytable " "ORDER BY mytable.myid + :myid_1 + :param_1", dialect=dialect, ) # labels within functions in the columns clause render # with the expression self.assert_compile( select([lab1, func.foo(lab1)]).order_by(lab1, func.foo(lab1)), "SELECT mytable.myid + :myid_1 AS foo, " "foo(mytable.myid + :myid_1) AS foo_1 FROM mytable " "ORDER BY foo, foo(mytable.myid + :myid_1)", dialect=dialect, ) lx = (table1.c.myid + table1.c.myid).label("lx") ly = (func.lower(table1.c.name) + table1.c.description).label("ly") self.assert_compile( select([lx, ly]).order_by(lx, ly.desc()), "SELECT mytable.myid + mytable.myid AS lx, " "lower(mytable.name) || mytable.description AS ly " "FROM mytable ORDER BY lx, ly DESC", dialect=dialect, ) # expression isn't actually the same thing (even though label is) self.assert_compile( select([lab1, lab2]).order_by( table1.c.myid.label("foo"), desc(table1.c.name.label("bar")) ), "SELECT mytable.myid + :myid_1 AS foo, " "somefunc(mytable.name) AS bar FROM mytable " "ORDER BY mytable.myid, mytable.name DESC", dialect=dialect, ) # it's also an exact match, not aliased etc. self.assert_compile( select([lab1, lab2]).order_by( desc(table1.alias().c.name.label("bar")) ), "SELECT mytable.myid + :myid_1 AS foo, " "somefunc(mytable.name) AS bar FROM mytable " "ORDER BY mytable_1.name DESC", dialect=dialect, ) # but! it's based on lineage lab2_lineage = lab2.element._clone() self.assert_compile( select([lab1, lab2]).order_by(desc(lab2_lineage.label("bar"))), "SELECT mytable.myid + :myid_1 AS foo, " "somefunc(mytable.name) AS bar FROM mytable " "ORDER BY bar DESC", dialect=dialect, ) # here, 'name' is implicitly available, but w/ #3882 we don't # want to render a name that isn't specifically a Label elsewhere # in the query self.assert_compile( select([table1.c.myid]).order_by(table1.c.name.label("name")), "SELECT mytable.myid FROM mytable ORDER BY mytable.name", ) # as well as if it doesn't match self.assert_compile( select([table1.c.myid]).order_by( func.lower(table1.c.name).label("name") ), "SELECT mytable.myid FROM mytable ORDER BY lower(mytable.name)", ) def test_order_by_labels_disabled(self): lab1 = (table1.c.myid + 12).label("foo") lab2 = func.somefunc(table1.c.name).label("bar") dialect = default.DefaultDialect() dialect.supports_simple_order_by_label = False self.assert_compile( select([lab1, lab2]).order_by(lab1, desc(lab2)), "SELECT mytable.myid + :myid_1 AS foo, " "somefunc(mytable.name) AS bar FROM mytable " "ORDER BY mytable.myid + :myid_1, somefunc(mytable.name) DESC", dialect=dialect, ) self.assert_compile( select([lab1, lab2]).order_by(func.hoho(lab1), desc(lab2)), "SELECT mytable.myid + :myid_1 AS foo, " "somefunc(mytable.name) AS bar FROM mytable " "ORDER BY hoho(mytable.myid + :myid_1), " "somefunc(mytable.name) DESC", dialect=dialect, ) def test_no_group_by_labels(self): lab1 = (table1.c.myid + 12).label("foo") lab2 = func.somefunc(table1.c.name).label("bar") dialect = default.DefaultDialect() self.assert_compile( select([lab1, lab2]).group_by(lab1, lab2), "SELECT mytable.myid + :myid_1 AS foo, somefunc(mytable.name) " "AS bar FROM mytable GROUP BY mytable.myid + :myid_1, " "somefunc(mytable.name)", dialect=dialect, ) def test_conjunctions(self): a, b, c = text("a"), text("b"), text("c") x = and_(a, b, c) assert isinstance(x.type, Boolean) assert str(x) == "a AND b AND c" self.assert_compile( select([x.label("foo")]), "SELECT a AND b AND c AS foo" ) self.assert_compile( and_( table1.c.myid == 12, table1.c.name == "asdf", table2.c.othername == "foo", text("sysdate() = today()"), ), "mytable.myid = :myid_1 AND mytable.name = :name_1 " "AND myothertable.othername = " ":othername_1 AND sysdate() = today()", ) self.assert_compile( and_( table1.c.myid == 12, or_( table2.c.othername == "asdf", table2.c.othername == "foo", table2.c.otherid == 9, ), text("sysdate() = today()"), ), "mytable.myid = :myid_1 AND (myothertable.othername = " ":othername_1 OR myothertable.othername = :othername_2 OR " "myothertable.otherid = :otherid_1) AND sysdate() = " "today()", checkparams={ "othername_1": "asdf", "othername_2": "foo", "otherid_1": 9, "myid_1": 12, }, ) # test a generator self.assert_compile( and_( conj for conj in [table1.c.myid == 12, table1.c.name == "asdf"] ), "mytable.myid = :myid_1 AND mytable.name = :name_1", ) def test_nested_conjunctions_short_circuit(self): """test that empty or_(), and_() conjunctions are collapsed by an enclosing conjunction.""" t = table("t", column("x")) self.assert_compile( select([t]).where(and_(t.c.x == 5, or_(and_(or_(t.c.x == 7))))), "SELECT t.x FROM t WHERE t.x = :x_1 AND t.x = :x_2", ) self.assert_compile( select([t]).where(and_(or_(t.c.x == 12, and_(or_(t.c.x == 8))))), "SELECT t.x FROM t WHERE t.x = :x_1 OR t.x = :x_2", ) self.assert_compile( select([t]).where( and_( or_( or_(t.c.x == 12), and_(or_(), or_(and_(t.c.x == 8)), and_()), ) ) ), "SELECT t.x FROM t WHERE t.x = :x_1 OR t.x = :x_2", ) def test_true_short_circuit(self): t = table("t", column("x")) self.assert_compile( select([t]).where(true()), "SELECT t.x FROM t WHERE 1 = 1", dialect=default.DefaultDialect(supports_native_boolean=False), ) self.assert_compile( select([t]).where(true()), "SELECT t.x FROM t WHERE true", dialect=default.DefaultDialect(supports_native_boolean=True), ) self.assert_compile( select([t]), "SELECT t.x FROM t", dialect=default.DefaultDialect(supports_native_boolean=True), ) def test_distinct(self): self.assert_compile( select([table1.c.myid.distinct()]), "SELECT DISTINCT mytable.myid FROM mytable", ) self.assert_compile( select([distinct(table1.c.myid)]), "SELECT DISTINCT mytable.myid FROM mytable", ) self.assert_compile( select([table1.c.myid]).distinct(), "SELECT DISTINCT mytable.myid FROM mytable", ) self.assert_compile( select([func.count(table1.c.myid.distinct())]), "SELECT count(DISTINCT mytable.myid) AS count_1 FROM mytable", ) self.assert_compile( select([func.count(distinct(table1.c.myid))]), "SELECT count(DISTINCT mytable.myid) AS count_1 FROM mytable", ) def test_where_empty(self): self.assert_compile( select([table1.c.myid]).where(and_()), "SELECT mytable.myid FROM mytable", ) self.assert_compile( select([table1.c.myid]).where(or_()), "SELECT mytable.myid FROM mytable", ) def test_multiple_col_binds(self): self.assert_compile( select( [literal_column("*")], or_( table1.c.myid == 12, table1.c.myid == "asdf", table1.c.myid == "foo", ), ), "SELECT * FROM mytable WHERE mytable.myid = :myid_1 " "OR mytable.myid = :myid_2 OR mytable.myid = :myid_3", ) def test_order_by_nulls(self): self.assert_compile( table2.select( order_by=[ table2.c.otherid, table2.c.othername.desc().nullsfirst(), ] ), "SELECT myothertable.otherid, myothertable.othername FROM " "myothertable ORDER BY myothertable.otherid, " "myothertable.othername DESC NULLS FIRST", ) self.assert_compile( table2.select( order_by=[ table2.c.otherid, table2.c.othername.desc().nullslast(), ] ), "SELECT myothertable.otherid, myothertable.othername FROM " "myothertable ORDER BY myothertable.otherid, " "myothertable.othername DESC NULLS LAST", ) self.assert_compile( table2.select( order_by=[ table2.c.otherid.nullslast(), table2.c.othername.desc().nullsfirst(), ] ), "SELECT myothertable.otherid, myothertable.othername FROM " "myothertable ORDER BY myothertable.otherid NULLS LAST, " "myothertable.othername DESC NULLS FIRST", ) self.assert_compile( table2.select( order_by=[ table2.c.otherid.nullsfirst(), table2.c.othername.desc(), ] ), "SELECT myothertable.otherid, myothertable.othername FROM " "myothertable ORDER BY myothertable.otherid NULLS FIRST, " "myothertable.othername DESC", ) self.assert_compile( table2.select( order_by=[ table2.c.otherid.nullsfirst(), table2.c.othername.desc().nullslast(), ] ), "SELECT myothertable.otherid, myothertable.othername FROM " "myothertable ORDER BY myothertable.otherid NULLS FIRST, " "myothertable.othername DESC NULLS LAST", ) def test_orderby_groupby(self): self.assert_compile( table2.select( order_by=[table2.c.otherid, asc(table2.c.othername)] ), "SELECT myothertable.otherid, myothertable.othername FROM " "myothertable ORDER BY myothertable.otherid, " "myothertable.othername ASC", ) self.assert_compile( table2.select( order_by=[table2.c.otherid, table2.c.othername.desc()] ), "SELECT myothertable.otherid, myothertable.othername FROM " "myothertable ORDER BY myothertable.otherid, " "myothertable.othername DESC", ) # generative order_by self.assert_compile( table2.select() .order_by(table2.c.otherid) .order_by(table2.c.othername.desc()), "SELECT myothertable.otherid, myothertable.othername FROM " "myothertable ORDER BY myothertable.otherid, " "myothertable.othername DESC", ) self.assert_compile( table2.select() .order_by(table2.c.otherid) .order_by(table2.c.othername.desc()) .order_by(None), "SELECT myothertable.otherid, myothertable.othername " "FROM myothertable", ) self.assert_compile( select( [table2.c.othername, func.count(table2.c.otherid)], group_by=[table2.c.othername], ), "SELECT myothertable.othername, " "count(myothertable.otherid) AS count_1 " "FROM myothertable GROUP BY myothertable.othername", ) # generative group by self.assert_compile( select( [table2.c.othername, func.count(table2.c.otherid)] ).group_by(table2.c.othername), "SELECT myothertable.othername, " "count(myothertable.otherid) AS count_1 " "FROM myothertable GROUP BY myothertable.othername", ) self.assert_compile( select([table2.c.othername, func.count(table2.c.otherid)]) .group_by(table2.c.othername) .group_by(None), "SELECT myothertable.othername, " "count(myothertable.otherid) AS count_1 " "FROM myothertable", ) self.assert_compile( select( [table2.c.othername, func.count(table2.c.otherid)], group_by=[table2.c.othername], order_by=[table2.c.othername], ), "SELECT myothertable.othername, " "count(myothertable.otherid) AS count_1 " "FROM myothertable " "GROUP BY myothertable.othername ORDER BY myothertable.othername", ) def test_custom_order_by_clause(self): class CustomCompiler(PGCompiler): def order_by_clause(self, select, **kw): return ( super(CustomCompiler, self).order_by_clause(select, **kw) + " CUSTOMIZED" ) class CustomDialect(PGDialect): name = "custom" statement_compiler = CustomCompiler stmt = select([table1.c.myid]).order_by(table1.c.myid) self.assert_compile( stmt, "SELECT mytable.myid FROM mytable ORDER BY " "mytable.myid CUSTOMIZED", dialect=CustomDialect(), ) def test_custom_group_by_clause(self): class CustomCompiler(PGCompiler): def group_by_clause(self, select, **kw): return ( super(CustomCompiler, self).group_by_clause(select, **kw) + " CUSTOMIZED" ) class CustomDialect(PGDialect): name = "custom" statement_compiler = CustomCompiler stmt = select([table1.c.myid]).group_by(table1.c.myid) self.assert_compile( stmt, "SELECT mytable.myid FROM mytable GROUP BY " "mytable.myid CUSTOMIZED", dialect=CustomDialect(), ) def test_for_update(self): self.assert_compile( table1.select(table1.c.myid == 7).with_for_update(), "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE", ) # not supported by dialect, should just use update self.assert_compile( table1.select(table1.c.myid == 7).with_for_update(nowait=True), "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE", ) assert_raises_message( exc.ArgumentError, "Unknown for_update argument: 'unknown_mode'", table1.select, table1.c.myid == 7, for_update="unknown_mode", ) def test_alias(self): # test the alias for a table1. column names stay the same, # table name "changes" to "foo". self.assert_compile( select([table1.alias("foo")]), "SELECT foo.myid, foo.name, foo.description FROM mytable AS foo", ) for dialect in (oracle.dialect(),): self.assert_compile( select([table1.alias("foo")]), "SELECT foo.myid, foo.name, foo.description FROM mytable foo", dialect=dialect, ) self.assert_compile( select([table1.alias()]), "SELECT mytable_1.myid, mytable_1.name, mytable_1.description " "FROM mytable AS mytable_1", ) # create a select for a join of two tables. use_labels # means the column names will have labels tablename_columnname, # which become the column keys accessible off the Selectable object. # also, only use one column from the second table and all columns # from the first table1. q = select( [table1, table2.c.otherid], table1.c.myid == table2.c.otherid, use_labels=True, ) # make an alias of the "selectable". column names # stay the same (i.e. the labels), table name "changes" to "t2view". a = alias(q, "t2view") # select from that alias, also using labels. two levels of labels # should produce two underscores. # also, reference the column "mytable_myid" off of the t2view alias. self.assert_compile( a.select(a.c.mytable_myid == 9, use_labels=True), "SELECT t2view.mytable_myid AS t2view_mytable_myid, " "t2view.mytable_name " "AS t2view_mytable_name, " "t2view.mytable_description AS t2view_mytable_description, " "t2view.myothertable_otherid AS t2view_myothertable_otherid FROM " "(SELECT mytable.myid AS mytable_myid, " "mytable.name AS mytable_name, " "mytable.description AS mytable_description, " "myothertable.otherid AS " "myothertable_otherid FROM mytable, myothertable " "WHERE mytable.myid = " "myothertable.otherid) AS t2view " "WHERE t2view.mytable_myid = :mytable_myid_1", ) def test_prefix(self): self.assert_compile( table1.select() .prefix_with("SQL_CALC_FOUND_ROWS") .prefix_with("SQL_SOME_WEIRD_MYSQL_THING"), "SELECT SQL_CALC_FOUND_ROWS SQL_SOME_WEIRD_MYSQL_THING " "mytable.myid, mytable.name, mytable.description FROM mytable", ) def test_prefix_dialect_specific(self): self.assert_compile( table1.select() .prefix_with("SQL_CALC_FOUND_ROWS", dialect="sqlite") .prefix_with("SQL_SOME_WEIRD_MYSQL_THING", dialect="mysql"), "SELECT SQL_SOME_WEIRD_MYSQL_THING " "mytable.myid, mytable.name, mytable.description FROM mytable", dialect=mysql.dialect(), ) def test_render_binds_as_literal(self): """test a compiler that renders binds inline into SQL in the columns clause.""" dialect = default.DefaultDialect() class Compiler(dialect.statement_compiler): ansi_bind_rules = True dialect.statement_compiler = Compiler self.assert_compile( select([literal("someliteral")]), "SELECT 'someliteral' AS anon_1", dialect=dialect, ) self.assert_compile( select([table1.c.myid + 3]), "SELECT mytable.myid + 3 AS anon_1 FROM mytable", dialect=dialect, ) self.assert_compile( select([table1.c.myid.in_([4, 5, 6])]), "SELECT mytable.myid IN (4, 5, 6) AS anon_1 FROM mytable", dialect=dialect, ) self.assert_compile( select([func.mod(table1.c.myid, 5)]), "SELECT mod(mytable.myid, 5) AS mod_1 FROM mytable", dialect=dialect, ) self.assert_compile( select([literal("foo").in_([])]), "SELECT 1 != 1 AS anon_1", dialect=dialect, ) self.assert_compile( select([literal(util.b("foo"))]), "SELECT 'foo' AS anon_1", dialect=dialect, ) # test callable self.assert_compile( select([table1.c.myid == bindparam("foo", callable_=lambda: 5)]), "SELECT mytable.myid = 5 AS anon_1 FROM mytable", dialect=dialect, ) empty_in_dialect = default.DefaultDialect(empty_in_strategy="dynamic") empty_in_dialect.statement_compiler = Compiler assert_raises_message( exc.CompileError, "Bind parameter 'foo' without a " "renderable value not allowed here.", bindparam("foo").in_([]).compile, dialect=empty_in_dialect, ) def test_collate(self): # columns clause self.assert_compile( select([column("x").collate("bar")]), "SELECT x COLLATE bar AS anon_1", ) # WHERE clause self.assert_compile( select([column("x")]).where(column("x").collate("bar") == "foo"), "SELECT x WHERE (x COLLATE bar) = :param_1", ) # ORDER BY clause self.assert_compile( select([column("x")]).order_by(column("x").collate("bar")), "SELECT x ORDER BY x COLLATE bar", ) def test_literal(self): self.assert_compile( select([literal("foo")]), "SELECT :param_1 AS anon_1" ) self.assert_compile( select([literal("foo") + literal("bar")], from_obj=[table1]), "SELECT :param_1 || :param_2 AS anon_1 FROM mytable", ) def test_calculated_columns(self): value_tbl = table( "values", column("id", Integer), column("val1", Float), column("val2", Float), ) self.assert_compile( select( [ value_tbl.c.id, (value_tbl.c.val2 - value_tbl.c.val1) / value_tbl.c.val1, ] ), "SELECT values.id, (values.val2 - values.val1) " "/ values.val1 AS anon_1 FROM values", ) self.assert_compile( select( [value_tbl.c.id], (value_tbl.c.val2 - value_tbl.c.val1) / value_tbl.c.val1 > 2.0, ), "SELECT values.id FROM values WHERE " "(values.val2 - values.val1) / values.val1 > :param_1", ) self.assert_compile( select( [value_tbl.c.id], value_tbl.c.val1 / (value_tbl.c.val2 - value_tbl.c.val1) / value_tbl.c.val1 > 2.0, ), "SELECT values.id FROM values WHERE " "(values.val1 / (values.val2 - values.val1)) " "/ values.val1 > :param_1", ) def test_percent_chars(self): t = table( "table%name", column("percent%"), column("%(oneofthese)s"), column("spaces % more spaces"), ) self.assert_compile( t.select(use_labels=True), """SELECT "table%name"."percent%" AS "table%name_percent%", """ """"table%name"."%(oneofthese)s" AS """ """"table%name_%(oneofthese)s", """ """"table%name"."spaces % more spaces" AS """ """"table%name_spaces % """ '''more spaces" FROM "table%name"''', ) def test_joins(self): self.assert_compile( join(table2, table1, table1.c.myid == table2.c.otherid).select(), "SELECT myothertable.otherid, myothertable.othername, " "mytable.myid, mytable.name, mytable.description FROM " "myothertable JOIN mytable ON mytable.myid = myothertable.otherid", ) self.assert_compile( select( [table1], from_obj=[ join(table1, table2, table1.c.myid == table2.c.otherid) ], ), "SELECT mytable.myid, mytable.name, mytable.description FROM " "mytable JOIN myothertable ON mytable.myid = myothertable.otherid", ) self.assert_compile( select( [ join( join( table1, table2, table1.c.myid == table2.c.otherid ), table3, table1.c.myid == table3.c.userid, ) ] ), "SELECT mytable.myid, mytable.name, mytable.description, " "myothertable.otherid, myothertable.othername, " "thirdtable.userid, " "thirdtable.otherstuff FROM mytable JOIN myothertable " "ON mytable.myid =" " myothertable.otherid JOIN thirdtable ON " "mytable.myid = thirdtable.userid", ) self.assert_compile( join( users, addresses, users.c.user_id == addresses.c.user_id ).select(), "SELECT users.user_id, users.user_name, users.password, " "addresses.address_id, addresses.user_id, addresses.street, " "addresses.city, addresses.state, addresses.zip " "FROM users JOIN addresses " "ON users.user_id = addresses.user_id", ) self.assert_compile( select( [table1, table2, table3], from_obj=[ join( table1, table2, table1.c.myid == table2.c.otherid ).outerjoin(table3, table1.c.myid == table3.c.userid) ], ), "SELECT mytable.myid, mytable.name, mytable.description, " "myothertable.otherid, myothertable.othername, " "thirdtable.userid," " thirdtable.otherstuff FROM mytable " "JOIN myothertable ON mytable.myid " "= myothertable.otherid LEFT OUTER JOIN thirdtable " "ON mytable.myid =" " thirdtable.userid", ) self.assert_compile( select( [table1, table2, table3], from_obj=[ outerjoin( table1, join( table2, table3, table2.c.otherid == table3.c.userid ), table1.c.myid == table2.c.otherid, ) ], ), "SELECT mytable.myid, mytable.name, mytable.description, " "myothertable.otherid, myothertable.othername, " "thirdtable.userid," " thirdtable.otherstuff FROM mytable LEFT OUTER JOIN " "(myothertable " "JOIN thirdtable ON myothertable.otherid = " "thirdtable.userid) ON " "mytable.myid = myothertable.otherid", ) query = select( [table1, table2], or_( table1.c.name == "fred", table1.c.myid == 10, table2.c.othername != "jack", text("EXISTS (select yay from foo where boo = lar)"), ), from_obj=[ outerjoin(table1, table2, table1.c.myid == table2.c.otherid) ], ) self.assert_compile( query, "SELECT mytable.myid, mytable.name, mytable.description, " "myothertable.otherid, myothertable.othername " "FROM mytable LEFT OUTER JOIN myothertable ON mytable.myid = " "myothertable.otherid WHERE mytable.name = :name_1 OR " "mytable.myid = :myid_1 OR myothertable.othername != :othername_1 " "OR EXISTS (select yay from foo where boo = lar)", ) def test_full_outer_join(self): for spec in [ join(table1, table2, table1.c.myid == table2.c.otherid, full=True), outerjoin( table1, table2, table1.c.myid == table2.c.otherid, full=True ), table1.join(table2, table1.c.myid == table2.c.otherid, full=True), table1.outerjoin( table2, table1.c.myid == table2.c.otherid, full=True ), ]: stmt = select([table1]).select_from(spec) self.assert_compile( stmt, "SELECT mytable.myid, mytable.name, mytable.description FROM " "mytable FULL OUTER JOIN myothertable " "ON mytable.myid = myothertable.otherid", ) def test_compound_selects(self): assert_raises_message( exc.ArgumentError, "All selectables passed to CompoundSelect " "must have identical numbers of columns; " "select #1 has 2 columns, select #2 has 3", union, table3.select(), table1.select(), ) x = union( select([table1], table1.c.myid == 5), select([table1], table1.c.myid == 12), order_by=[table1.c.myid], ) self.assert_compile( x, "SELECT mytable.myid, mytable.name, " "mytable.description " "FROM mytable WHERE " "mytable.myid = :myid_1 UNION " "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable WHERE mytable.myid = :myid_2 " "ORDER BY mytable.myid", ) x = union(select([table1]), select([table1])) x = union(x, select([table1])) self.assert_compile( x, "(SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable UNION SELECT mytable.myid, mytable.name, " "mytable.description FROM mytable) UNION SELECT mytable.myid," " mytable.name, mytable.description FROM mytable", ) u1 = union( select([table1.c.myid, table1.c.name]), select([table2]), select([table3]), ) self.assert_compile( u1, "SELECT mytable.myid, mytable.name " "FROM mytable UNION SELECT myothertable.otherid, " "myothertable.othername FROM myothertable " "UNION SELECT thirdtable.userid, thirdtable.otherstuff " "FROM thirdtable", ) assert u1.corresponding_column(table2.c.otherid) is u1.c.myid self.assert_compile( union( select([table1.c.myid, table1.c.name]), select([table2]), order_by=["myid"], offset=10, limit=5, ), "SELECT mytable.myid, mytable.name " "FROM mytable UNION SELECT myothertable.otherid, " "myothertable.othername " "FROM myothertable ORDER BY myid " # note table name is omitted "LIMIT :param_1 OFFSET :param_2", {"param_1": 5, "param_2": 10}, ) self.assert_compile( union( select( [ table1.c.myid, table1.c.name, func.max(table1.c.description), ], table1.c.name == "name2", group_by=[table1.c.myid, table1.c.name], ), table1.select(table1.c.name == "name1"), ), "SELECT mytable.myid, mytable.name, " "max(mytable.description) AS max_1 " "FROM mytable WHERE mytable.name = :name_1 " "GROUP BY mytable.myid, " "mytable.name UNION SELECT mytable.myid, mytable.name, " "mytable.description " "FROM mytable WHERE mytable.name = :name_2", ) self.assert_compile( union( select([literal(100).label("value")]), select([literal(200).label("value")]), ), "SELECT :param_1 AS value UNION SELECT :param_2 AS value", ) self.assert_compile( union_all( select([table1.c.myid]), union(select([table2.c.otherid]), select([table3.c.userid])), ), "SELECT mytable.myid FROM mytable UNION ALL " "(SELECT myothertable.otherid FROM myothertable UNION " "SELECT thirdtable.userid FROM thirdtable)", ) s = select([column("foo"), column("bar")]) self.assert_compile( union(s.order_by("foo"), s.order_by("bar")), "(SELECT foo, bar ORDER BY foo) UNION " "(SELECT foo, bar ORDER BY bar)", ) self.assert_compile( union( s.order_by("foo").self_group(), s.order_by("bar").limit(10).self_group(), ), "(SELECT foo, bar ORDER BY foo) UNION (SELECT foo, " "bar ORDER BY bar LIMIT :param_1)", {"param_1": 10}, ) def test_compound_grouping(self): s = select([column("foo"), column("bar")]).select_from(text("bat")) self.assert_compile( union(union(union(s, s), s), s), "((SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat) " "UNION SELECT foo, bar FROM bat) UNION SELECT foo, bar FROM bat", ) self.assert_compile( union(s, s, s, s), "SELECT foo, bar FROM bat UNION SELECT foo, bar " "FROM bat UNION SELECT foo, bar FROM bat " "UNION SELECT foo, bar FROM bat", ) self.assert_compile( union(s, union(s, union(s, s))), "SELECT foo, bar FROM bat UNION (SELECT foo, bar FROM bat " "UNION (SELECT foo, bar FROM bat " "UNION SELECT foo, bar FROM bat))", ) self.assert_compile( select([s.alias()]), "SELECT anon_1.foo, anon_1.bar FROM " "(SELECT foo, bar FROM bat) AS anon_1", ) self.assert_compile( select([union(s, s).alias()]), "SELECT anon_1.foo, anon_1.bar FROM " "(SELECT foo, bar FROM bat UNION " "SELECT foo, bar FROM bat) AS anon_1", ) self.assert_compile( select([except_(s, s).alias()]), "SELECT anon_1.foo, anon_1.bar FROM " "(SELECT foo, bar FROM bat EXCEPT " "SELECT foo, bar FROM bat) AS anon_1", ) # this query sqlite specifically chokes on self.assert_compile( union(except_(s, s), s), "(SELECT foo, bar FROM bat EXCEPT SELECT foo, bar FROM bat) " "UNION SELECT foo, bar FROM bat", ) self.assert_compile( union(s, except_(s, s)), "SELECT foo, bar FROM bat " "UNION (SELECT foo, bar FROM bat EXCEPT SELECT foo, bar FROM bat)", ) # this solves it self.assert_compile( union(except_(s, s).alias().select(), s), "SELECT anon_1.foo, anon_1.bar FROM " "(SELECT foo, bar FROM bat EXCEPT " "SELECT foo, bar FROM bat) AS anon_1 " "UNION SELECT foo, bar FROM bat", ) self.assert_compile( except_(union(s, s), union(s, s)), "(SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat) " "EXCEPT (SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat)", ) s2 = union(s, s) s3 = union(s2, s2) self.assert_compile( s3, "(SELECT foo, bar FROM bat " "UNION SELECT foo, bar FROM bat) " "UNION (SELECT foo, bar FROM bat " "UNION SELECT foo, bar FROM bat)", ) self.assert_compile( union(intersect(s, s), intersect(s, s)), "(SELECT foo, bar FROM bat INTERSECT SELECT foo, bar FROM bat) " "UNION (SELECT foo, bar FROM bat INTERSECT " "SELECT foo, bar FROM bat)", ) # tests for [ticket:2528] # sqlite hates all of these. self.assert_compile( union(s.limit(1), s.offset(2)), "(SELECT foo, bar FROM bat LIMIT :param_1) " "UNION (SELECT foo, bar FROM bat LIMIT -1 OFFSET :param_2)", ) self.assert_compile( union(s.order_by(column("bar")), s.offset(2)), "(SELECT foo, bar FROM bat ORDER BY bar) " "UNION (SELECT foo, bar FROM bat LIMIT -1 OFFSET :param_1)", ) self.assert_compile( union(s.limit(1).alias("a"), s.limit(2).alias("b")), "(SELECT foo, bar FROM bat LIMIT :param_1) " "UNION (SELECT foo, bar FROM bat LIMIT :param_2)", ) self.assert_compile( union(s.limit(1).self_group(), s.limit(2).self_group()), "(SELECT foo, bar FROM bat LIMIT :param_1) " "UNION (SELECT foo, bar FROM bat LIMIT :param_2)", ) self.assert_compile( union(s.limit(1), s.limit(2).offset(3)).alias().select(), "SELECT anon_1.foo, anon_1.bar FROM " "((SELECT foo, bar FROM bat LIMIT :param_1) " "UNION (SELECT foo, bar FROM bat LIMIT :param_2 OFFSET :param_3)) " "AS anon_1", ) # this version works for SQLite self.assert_compile( union(s.limit(1).alias().select(), s.offset(2).alias().select()), "SELECT anon_1.foo, anon_1.bar " "FROM (SELECT foo, bar FROM bat" " LIMIT :param_1) AS anon_1 " "UNION SELECT anon_2.foo, anon_2.bar " "FROM (SELECT foo, bar " "FROM bat" " LIMIT -1 OFFSET :param_2) AS anon_2", ) def test_binds(self): for ( stmt, expected_named_stmt, expected_positional_stmt, expected_default_params_dict, expected_default_params_list, test_param_dict, expected_test_params_dict, expected_test_params_list, ) in [ ( select( [table1, table2], and_( table1.c.myid == table2.c.otherid, table1.c.name == bindparam("mytablename"), ), ), "SELECT mytable.myid, mytable.name, mytable.description, " "myothertable.otherid, myothertable.othername FROM mytable, " "myothertable WHERE mytable.myid = myothertable.otherid " "AND mytable.name = :mytablename", "SELECT mytable.myid, mytable.name, mytable.description, " "myothertable.otherid, myothertable.othername FROM mytable, " "myothertable WHERE mytable.myid = myothertable.otherid AND " "mytable.name = ?", {"mytablename": None}, [None], {"mytablename": 5}, {"mytablename": 5}, [5], ), ( select( [table1], or_( table1.c.myid == bindparam("myid"), table2.c.otherid == bindparam("myid"), ), ), "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable, myothertable WHERE mytable.myid = :myid " "OR myothertable.otherid = :myid", "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable, myothertable WHERE mytable.myid = ? " "OR myothertable.otherid = ?", {"myid": None}, [None, None], {"myid": 5}, {"myid": 5}, [5, 5], ), ( text( "SELECT mytable.myid, mytable.name, " "mytable.description FROM " "mytable, myothertable WHERE mytable.myid = :myid OR " "myothertable.otherid = :myid" ), "SELECT mytable.myid, mytable.name, mytable.description FROM " "mytable, myothertable WHERE mytable.myid = :myid OR " "myothertable.otherid = :myid", "SELECT mytable.myid, mytable.name, mytable.description FROM " "mytable, myothertable WHERE mytable.myid = ? OR " "myothertable.otherid = ?", {"myid": None}, [None, None], {"myid": 5}, {"myid": 5}, [5, 5], ), ( select( [table1], or_( table1.c.myid == bindparam("myid", unique=True), table2.c.otherid == bindparam("myid", unique=True), ), ), "SELECT mytable.myid, mytable.name, mytable.description FROM " "mytable, myothertable WHERE mytable.myid = " ":myid_1 OR myothertable.otherid = :myid_2", "SELECT mytable.myid, mytable.name, mytable.description FROM " "mytable, myothertable WHERE mytable.myid = ? " "OR myothertable.otherid = ?", {"myid_1": None, "myid_2": None}, [None, None], {"myid_1": 5, "myid_2": 6}, {"myid_1": 5, "myid_2": 6}, [5, 6], ), ( bindparam("test", type_=String, required=False) + text("'hi'"), ":test || 'hi'", "? || 'hi'", {"test": None}, [None], {}, {"test": None}, [None], ), ( # testing select.params() here - bindparam() objects # must get required flag set to False select( [table1], or_( table1.c.myid == bindparam("myid"), table2.c.otherid == bindparam("myotherid"), ), ).params({"myid": 8, "myotherid": 7}), "SELECT mytable.myid, mytable.name, mytable.description FROM " "mytable, myothertable WHERE mytable.myid = " ":myid OR myothertable.otherid = :myotherid", "SELECT mytable.myid, mytable.name, mytable.description FROM " "mytable, myothertable WHERE mytable.myid = " "? OR myothertable.otherid = ?", {"myid": 8, "myotherid": 7}, [8, 7], {"myid": 5}, {"myid": 5, "myotherid": 7}, [5, 7], ), ( select( [table1], or_( table1.c.myid == bindparam("myid", value=7, unique=True), table2.c.otherid == bindparam("myid", value=8, unique=True), ), ), "SELECT mytable.myid, mytable.name, mytable.description FROM " "mytable, myothertable WHERE mytable.myid = " ":myid_1 OR myothertable.otherid = :myid_2", "SELECT mytable.myid, mytable.name, mytable.description FROM " "mytable, myothertable WHERE mytable.myid = " "? OR myothertable.otherid = ?", {"myid_1": 7, "myid_2": 8}, [7, 8], {"myid_1": 5, "myid_2": 6}, {"myid_1": 5, "myid_2": 6}, [5, 6], ), ]: self.assert_compile( stmt, expected_named_stmt, params=expected_default_params_dict ) self.assert_compile( stmt, expected_positional_stmt, dialect=sqlite.dialect() ) nonpositional = stmt.compile() positional = stmt.compile(dialect=sqlite.dialect()) pp = positional.params eq_( [pp[k] for k in positional.positiontup], expected_default_params_list, ) eq_( nonpositional.construct_params(test_param_dict), expected_test_params_dict, ) pp = positional.construct_params(test_param_dict) eq_( [pp[k] for k in positional.positiontup], expected_test_params_list, ) # check that params() doesn't modify original statement s = select( [table1], or_( table1.c.myid == bindparam("myid"), table2.c.otherid == bindparam("myotherid"), ), ) s2 = s.params({"myid": 8, "myotherid": 7}) s3 = s2.params({"myid": 9}) assert s.compile().params == {"myid": None, "myotherid": None} assert s2.compile().params == {"myid": 8, "myotherid": 7} assert s3.compile().params == {"myid": 9, "myotherid": 7} # test using same 'unique' param object twice in one compile s = select([table1.c.myid]).where(table1.c.myid == 12).as_scalar() s2 = select([table1, s], table1.c.myid == s) self.assert_compile( s2, "SELECT mytable.myid, mytable.name, mytable.description, " "(SELECT mytable.myid FROM mytable WHERE mytable.myid = " ":myid_1) AS anon_1 FROM mytable WHERE mytable.myid = " "(SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)", ) positional = s2.compile(dialect=sqlite.dialect()) pp = positional.params assert [pp[k] for k in positional.positiontup] == [12, 12] # check that conflicts with "unique" params are caught s = select( [table1], or_(table1.c.myid == 7, table1.c.myid == bindparam("myid_1")), ) assert_raises_message( exc.CompileError, "conflicts with unique bind parameter " "of the same name", str, s, ) s = select( [table1], or_( table1.c.myid == 7, table1.c.myid == 8, table1.c.myid == bindparam("myid_1"), ), ) assert_raises_message( exc.CompileError, "conflicts with unique bind parameter " "of the same name", str, s, ) def _test_binds_no_hash_collision(self): """test that construct_params doesn't corrupt dict due to hash collisions""" total_params = 100000 in_clause = [":in%d" % i for i in range(total_params)] params = dict(("in%d" % i, i) for i in range(total_params)) t = text("text clause %s" % ", ".join(in_clause)) eq_(len(t.bindparams), total_params) c = t.compile() pp = c.construct_params(params) eq_(len(set(pp)), total_params, "%s %s" % (len(set(pp)), len(pp))) eq_(len(set(pp.values())), total_params) def test_bind_as_col(self): t = table("foo", column("id")) s = select([t, literal("lala").label("hoho")]) self.assert_compile(s, "SELECT foo.id, :param_1 AS hoho FROM foo") assert [str(c) for c in s.c] == ["id", "hoho"] def test_bind_callable(self): expr = column("x") == bindparam("key", callable_=lambda: 12) self.assert_compile(expr, "x = :key", {"x": 12}) def test_bind_params_missing(self): assert_raises_message( exc.InvalidRequestError, r"A value is required for bind parameter 'x'", select([table1]) .where( and_( table1.c.myid == bindparam("x", required=True), table1.c.name == bindparam("y", required=True), ) ) .compile() .construct_params, params=dict(y=5), ) assert_raises_message( exc.InvalidRequestError, r"A value is required for bind parameter 'x'", select([table1]) .where(table1.c.myid == bindparam("x", required=True)) .compile() .construct_params, ) assert_raises_message( exc.InvalidRequestError, r"A value is required for bind parameter 'x', " "in parameter group 2", select([table1]) .where( and_( table1.c.myid == bindparam("x", required=True), table1.c.name == bindparam("y", required=True), ) ) .compile() .construct_params, params=dict(y=5), _group_number=2, ) assert_raises_message( exc.InvalidRequestError, r"A value is required for bind parameter 'x', " "in parameter group 2", select([table1]) .where(table1.c.myid == bindparam("x", required=True)) .compile() .construct_params, _group_number=2, ) def test_tuple(self): self.assert_compile( tuple_(table1.c.myid, table1.c.name).in_([(1, "foo"), (5, "bar")]), "(mytable.myid, mytable.name) IN " "((:param_1, :param_2), (:param_3, :param_4))", ) self.assert_compile( tuple_(table1.c.myid, table1.c.name).in_( [tuple_(table2.c.otherid, table2.c.othername)] ), "(mytable.myid, mytable.name) IN " "((myothertable.otherid, myothertable.othername))", ) self.assert_compile( tuple_(table1.c.myid, table1.c.name).in_( select([table2.c.otherid, table2.c.othername]) ), "(mytable.myid, mytable.name) IN (SELECT " "myothertable.otherid, myothertable.othername FROM myothertable)", ) def test_expanding_parameter(self): self.assert_compile( tuple_(table1.c.myid, table1.c.name).in_( bindparam("foo", expanding=True) ), "(mytable.myid, mytable.name) IN ([EXPANDING_foo])", ) self.assert_compile( table1.c.myid.in_(bindparam("foo", expanding=True)), "mytable.myid IN ([EXPANDING_foo])", ) def test_cast(self): tbl = table( "casttest", column("id", Integer), column("v1", Float), column("v2", Float), column("ts", TIMESTAMP), ) def check_results(dialect, expected_results, literal): eq_( len(expected_results), 5, "Incorrect number of expected results", ) eq_( str(cast(tbl.c.v1, Numeric).compile(dialect=dialect)), "CAST(casttest.v1 AS %s)" % expected_results[0], ) eq_( str(tbl.c.v1.cast(Numeric).compile(dialect=dialect)), "CAST(casttest.v1 AS %s)" % expected_results[0], ) eq_( str(cast(tbl.c.v1, Numeric(12, 9)).compile(dialect=dialect)), "CAST(casttest.v1 AS %s)" % expected_results[1], ) eq_( str(cast(tbl.c.ts, Date).compile(dialect=dialect)), "CAST(casttest.ts AS %s)" % expected_results[2], ) eq_( str(cast(1234, Text).compile(dialect=dialect)), "CAST(%s AS %s)" % (literal, expected_results[3]), ) eq_( str(cast("test", String(20)).compile(dialect=dialect)), "CAST(%s AS %s)" % (literal, expected_results[4]), ) # fixme: shoving all of this dialect-specific stuff in one test # is now officially completely ridiculous AND non-obviously omits # coverage on other dialects. sel = select([tbl, cast(tbl.c.v1, Numeric)]).compile( dialect=dialect ) if isinstance(dialect, type(mysql.dialect())): eq_( str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, " "casttest.ts, " "CAST(casttest.v1 AS DECIMAL) AS anon_1 \nFROM casttest", ) else: eq_( str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, " "casttest.ts, CAST(casttest.v1 AS NUMERIC) AS " "anon_1 \nFROM casttest", ) # first test with PostgreSQL engine check_results( postgresql.dialect(), ["NUMERIC", "NUMERIC(12, 9)", "DATE", "TEXT", "VARCHAR(20)"], "%(param_1)s", ) # then the Oracle engine check_results( oracle.dialect(), ["NUMERIC", "NUMERIC(12, 9)", "DATE", "CLOB", "VARCHAR2(20 CHAR)"], ":param_1", ) # then the sqlite engine check_results( sqlite.dialect(), ["NUMERIC", "NUMERIC(12, 9)", "DATE", "TEXT", "VARCHAR(20)"], "?", ) # then the MySQL engine check_results( mysql.dialect(), ["DECIMAL", "DECIMAL(12, 9)", "DATE", "CHAR", "CHAR(20)"], "%s", ) self.assert_compile( cast(text("NULL"), Integer), "CAST(NULL AS INTEGER)", dialect=sqlite.dialect(), ) self.assert_compile( cast(null(), Integer), "CAST(NULL AS INTEGER)", dialect=sqlite.dialect(), ) self.assert_compile( cast(literal_column("NULL"), Integer), "CAST(NULL AS INTEGER)", dialect=sqlite.dialect(), ) def test_over(self): self.assert_compile(func.row_number().over(), "row_number() OVER ()") self.assert_compile( func.row_number().over( order_by=[table1.c.name, table1.c.description] ), "row_number() OVER (ORDER BY mytable.name, mytable.description)", ) self.assert_compile( func.row_number().over( partition_by=[table1.c.name, table1.c.description] ), "row_number() OVER (PARTITION BY mytable.name, " "mytable.description)", ) self.assert_compile( func.row_number().over( partition_by=[table1.c.name], order_by=[table1.c.description] ), "row_number() OVER (PARTITION BY mytable.name " "ORDER BY mytable.description)", ) self.assert_compile( func.row_number().over( partition_by=table1.c.name, order_by=table1.c.description ), "row_number() OVER (PARTITION BY mytable.name " "ORDER BY mytable.description)", ) self.assert_compile( func.row_number().over( partition_by=table1.c.name, order_by=[table1.c.name, table1.c.description], ), "row_number() OVER (PARTITION BY mytable.name " "ORDER BY mytable.name, mytable.description)", ) self.assert_compile( func.row_number().over( partition_by=[], order_by=[table1.c.name, table1.c.description] ), "row_number() OVER (ORDER BY mytable.name, mytable.description)", ) self.assert_compile( func.row_number().over( partition_by=[table1.c.name, table1.c.description], order_by=[] ), "row_number() OVER (PARTITION BY mytable.name, " "mytable.description)", ) self.assert_compile( func.row_number().over(partition_by=[], order_by=[]), "row_number() OVER ()", ) self.assert_compile( select( [ func.row_number() .over(order_by=table1.c.description) .label("foo") ] ), "SELECT row_number() OVER (ORDER BY mytable.description) " "AS foo FROM mytable", ) # test from_obj generation. # from func: self.assert_compile( select( [func.max(table1.c.name).over(partition_by=["description"])] ), "SELECT max(mytable.name) OVER (PARTITION BY mytable.description) " "AS anon_1 FROM mytable", ) # from partition_by self.assert_compile( select([func.row_number().over(partition_by=[table1.c.name])]), "SELECT row_number() OVER (PARTITION BY mytable.name) " "AS anon_1 FROM mytable", ) # from order_by self.assert_compile( select([func.row_number().over(order_by=table1.c.name)]), "SELECT row_number() OVER (ORDER BY mytable.name) " "AS anon_1 FROM mytable", ) # this tests that _from_objects # concantenates OK self.assert_compile( select([column("x") + over(func.foo())]), "SELECT x + foo() OVER () AS anon_1", ) # test a reference to a label that in the referecned selectable; # this resolves expr = (table1.c.myid + 5).label("sum") stmt = select([expr]).alias() self.assert_compile( select([stmt.c.sum, func.row_number().over(order_by=stmt.c.sum)]), "SELECT anon_1.sum, row_number() OVER (ORDER BY anon_1.sum) " "AS anon_2 FROM (SELECT mytable.myid + :myid_1 AS sum " "FROM mytable) AS anon_1", ) # test a reference to a label that's at the same level as the OVER # in the columns clause; doesn't resolve expr = (table1.c.myid + 5).label("sum") self.assert_compile( select([expr, func.row_number().over(order_by=expr)]), "SELECT mytable.myid + :myid_1 AS sum, " "row_number() OVER " "(ORDER BY mytable.myid + :myid_1) AS anon_1 FROM mytable", ) def test_over_framespec(self): expr = table1.c.myid self.assert_compile( select([func.row_number().over(order_by=expr, rows=(0, None))]), "SELECT row_number() OVER " "(ORDER BY mytable.myid ROWS BETWEEN CURRENT " "ROW AND UNBOUNDED FOLLOWING)" " AS anon_1 FROM mytable", ) self.assert_compile( select([func.row_number().over(order_by=expr, rows=(None, None))]), "SELECT row_number() OVER " "(ORDER BY mytable.myid ROWS BETWEEN UNBOUNDED " "PRECEDING AND UNBOUNDED FOLLOWING)" " AS anon_1 FROM mytable", ) self.assert_compile( select([func.row_number().over(order_by=expr, range_=(None, 0))]), "SELECT row_number() OVER " "(ORDER BY mytable.myid RANGE BETWEEN " "UNBOUNDED PRECEDING AND CURRENT ROW)" " AS anon_1 FROM mytable", ) self.assert_compile( select([func.row_number().over(order_by=expr, range_=(-5, 10))]), "SELECT row_number() OVER " "(ORDER BY mytable.myid RANGE BETWEEN " ":param_1 PRECEDING AND :param_2 FOLLOWING)" " AS anon_1 FROM mytable", checkparams={"param_1": 5, "param_2": 10}, ) self.assert_compile( select([func.row_number().over(order_by=expr, range_=(1, 10))]), "SELECT row_number() OVER " "(ORDER BY mytable.myid RANGE BETWEEN " ":param_1 FOLLOWING AND :param_2 FOLLOWING)" " AS anon_1 FROM mytable", checkparams={"param_1": 1, "param_2": 10}, ) self.assert_compile( select([func.row_number().over(order_by=expr, range_=(-10, -1))]), "SELECT row_number() OVER " "(ORDER BY mytable.myid RANGE BETWEEN " ":param_1 PRECEDING AND :param_2 PRECEDING)" " AS anon_1 FROM mytable", checkparams={"param_1": 10, "param_2": 1}, ) def test_over_invalid_framespecs(self): assert_raises_message( exc.ArgumentError, "Integer or None expected for range value", func.row_number().over, range_=("foo", 8), ) assert_raises_message( exc.ArgumentError, "Integer or None expected for range value", func.row_number().over, range_=(-5, "foo"), ) assert_raises_message( exc.ArgumentError, "'range_' and 'rows' are mutually exclusive", func.row_number().over, range_=(-5, 8), rows=(-2, 5), ) def test_over_within_group(self): from sqlalchemy import within_group stmt = select( [ table1.c.myid, within_group( func.percentile_cont(0.5), table1.c.name.desc() ).over( range_=(1, 2), partition_by=table1.c.name, order_by=table1.c.myid, ), ] ) eq_ignore_whitespace( str(stmt), "SELECT mytable.myid, percentile_cont(:percentile_cont_1) " "WITHIN GROUP (ORDER BY mytable.name DESC) " "OVER (PARTITION BY mytable.name ORDER BY mytable.myid " "RANGE BETWEEN :param_1 FOLLOWING AND :param_2 FOLLOWING) " "AS anon_1 FROM mytable", ) stmt = select( [ table1.c.myid, within_group( func.percentile_cont(0.5), table1.c.name.desc() ).over( rows=(1, 2), partition_by=table1.c.name, order_by=table1.c.myid, ), ] ) eq_ignore_whitespace( str(stmt), "SELECT mytable.myid, percentile_cont(:percentile_cont_1) " "WITHIN GROUP (ORDER BY mytable.name DESC) " "OVER (PARTITION BY mytable.name ORDER BY mytable.myid " "ROWS BETWEEN :param_1 FOLLOWING AND :param_2 FOLLOWING) " "AS anon_1 FROM mytable", ) def test_date_between(self): import datetime table = Table("dt", metadata, Column("date", Date)) self.assert_compile( table.select( table.c.date.between( datetime.date(2006, 6, 1), datetime.date(2006, 6, 5) ) ), "SELECT dt.date FROM dt WHERE dt.date BETWEEN :date_1 AND :date_2", checkparams={ "date_1": datetime.date(2006, 6, 1), "date_2": datetime.date(2006, 6, 5), }, ) self.assert_compile( table.select( sql.between( table.c.date, datetime.date(2006, 6, 1), datetime.date(2006, 6, 5), ) ), "SELECT dt.date FROM dt WHERE dt.date BETWEEN :date_1 AND :date_2", checkparams={ "date_1": datetime.date(2006, 6, 1), "date_2": datetime.date(2006, 6, 5), }, ) def test_delayed_col_naming(self): my_str = Column(String) sel1 = select([my_str]) assert_raises_message( exc.InvalidRequestError, "Cannot initialize a sub-selectable with this Column", lambda: sel1.c, ) # calling label or as_scalar doesn't compile # anything. sel2 = select([func.substr(my_str, 2, 3)]).label("my_substr") assert_raises_message( exc.CompileError, "Cannot compile Column object until its 'name' is assigned.", sel2.compile, dialect=default.DefaultDialect(), ) sel3 = select([my_str]).as_scalar() assert_raises_message( exc.CompileError, "Cannot compile Column object until its 'name' is assigned.", sel3.compile, dialect=default.DefaultDialect(), ) my_str.name = "foo" self.assert_compile(sel1, "SELECT foo") self.assert_compile( sel2, "(SELECT substr(foo, :substr_2, :substr_3) AS substr_1)" ) self.assert_compile(sel3, "(SELECT foo)") def test_naming(self): # TODO: the part where we check c.keys() are not "compile" tests, they # belong probably in test_selectable, or some broken up # version of that suite f1 = func.hoho(table1.c.name) s1 = select( [ table1.c.myid, table1.c.myid.label("foobar"), f1, func.lala(table1.c.name).label("gg"), ] ) eq_(list(s1.c.keys()), ["myid", "foobar", str(f1), "gg"]) meta = MetaData() t1 = Table("mytable", meta, Column("col1", Integer)) exprs = ( table1.c.myid == 12, func.hoho(table1.c.myid), cast(table1.c.name, Numeric), literal("x"), ) for col, key, expr, lbl in ( (table1.c.name, "name", "mytable.name", None), (exprs[0], str(exprs[0]), "mytable.myid = :myid_1", "anon_1"), (exprs[1], str(exprs[1]), "hoho(mytable.myid)", "hoho_1"), ( exprs[2], str(exprs[2]), "CAST(mytable.name AS NUMERIC)", "anon_1", ), (t1.c.col1, "col1", "mytable.col1", None), ( column("some wacky thing"), "some wacky thing", '"some wacky thing"', "", ), (exprs[3], exprs[3].key, ":param_1", "anon_1"), ): if getattr(col, "table", None) is not None: t = col.table else: t = table1 s1 = select([col], from_obj=t) assert list(s1.c.keys()) == [key], list(s1.c.keys()) if lbl: self.assert_compile( s1, "SELECT %s AS %s FROM mytable" % (expr, lbl) ) else: self.assert_compile(s1, "SELECT %s FROM mytable" % (expr,)) s1 = select([s1]) if lbl: self.assert_compile( s1, "SELECT %s FROM (SELECT %s AS %s FROM mytable)" % (lbl, expr, lbl), ) elif col.table is not None: # sqlite rule labels subquery columns self.assert_compile( s1, "SELECT %s FROM (SELECT %s AS %s FROM mytable)" % (key, expr, key), ) else: self.assert_compile( s1, "SELECT %s FROM (SELECT %s FROM mytable)" % (expr, expr), ) def test_hints(self): s = select([table1.c.myid]).with_hint(table1, "test hint %(name)s") s2 = ( select([table1.c.myid]) .with_hint(table1, "index(%(name)s idx)", "oracle") .with_hint(table1, "WITH HINT INDEX idx", "sybase") ) a1 = table1.alias() s3 = select([a1.c.myid]).with_hint(a1, "index(%(name)s hint)") subs4 = ( select([table1, table2]) .select_from( table1.join(table2, table1.c.myid == table2.c.otherid) ) .with_hint(table1, "hint1") ) s4 = ( select([table3]) .select_from( table3.join(subs4, subs4.c.othername == table3.c.otherstuff) ) .with_hint(table3, "hint3") ) t1 = table("QuotedName", column("col1")) s6 = ( select([t1.c.col1]) .where(t1.c.col1 > 10) .with_hint(t1, "%(name)s idx1") ) a2 = t1.alias("SomeName") s7 = ( select([a2.c.col1]) .where(a2.c.col1 > 10) .with_hint(a2, "%(name)s idx1") ) mysql_d, oracle_d, sybase_d = ( mysql.dialect(), oracle.dialect(), sybase.dialect(), ) for stmt, dialect, expected in [ (s, mysql_d, "SELECT mytable.myid FROM mytable test hint mytable"), ( s, oracle_d, "SELECT /*+ test hint mytable */ mytable.myid FROM mytable", ), ( s, sybase_d, "SELECT mytable.myid FROM mytable test hint mytable", ), (s2, mysql_d, "SELECT mytable.myid FROM mytable"), ( s2, oracle_d, "SELECT /*+ index(mytable idx) */ mytable.myid FROM mytable", ), ( s2, sybase_d, "SELECT mytable.myid FROM mytable WITH HINT INDEX idx", ), ( s3, mysql_d, "SELECT mytable_1.myid FROM mytable AS mytable_1 " "index(mytable_1 hint)", ), ( s3, oracle_d, "SELECT /*+ index(mytable_1 hint) */ mytable_1.myid FROM " "mytable mytable_1", ), ( s3, sybase_d, "SELECT mytable_1.myid FROM mytable AS mytable_1 " "index(mytable_1 hint)", ), ( s4, mysql_d, "SELECT thirdtable.userid, thirdtable.otherstuff " "FROM thirdtable " "hint3 INNER JOIN (SELECT mytable.myid, mytable.name, " "mytable.description, myothertable.otherid, " "myothertable.othername FROM mytable hint1 INNER " "JOIN myothertable ON mytable.myid = myothertable.otherid) " "ON othername = thirdtable.otherstuff", ), ( s4, sybase_d, "SELECT thirdtable.userid, thirdtable.otherstuff " "FROM thirdtable " "hint3 JOIN (SELECT mytable.myid, mytable.name, " "mytable.description, myothertable.otherid, " "myothertable.othername FROM mytable hint1 " "JOIN myothertable ON mytable.myid = myothertable.otherid) " "ON othername = thirdtable.otherstuff", ), ( s4, oracle_d, "SELECT /*+ hint3 */ thirdtable.userid, thirdtable.otherstuff " "FROM thirdtable JOIN (SELECT /*+ hint1 */ mytable.myid," " mytable.name, mytable.description, myothertable.otherid," " myothertable.othername FROM mytable JOIN myothertable ON" " mytable.myid = myothertable.otherid) ON othername =" " thirdtable.otherstuff", ), # TODO: figure out dictionary ordering solution here # (s5, oracle_d, # "SELECT /*+ hint3 */ /*+ hint1 */ thirdtable.userid, " # "thirdtable.otherstuff " # "FROM thirdtable JOIN (SELECT mytable.myid," # " mytable.name, mytable.description, myothertable.otherid," # " myothertable.othername FROM mytable JOIN myothertable ON" # " mytable.myid = myothertable.otherid) ON othername =" # " thirdtable.otherstuff"), ( s6, oracle_d, """SELECT /*+ "QuotedName" idx1 */ "QuotedName".col1 """ """FROM "QuotedName" WHERE "QuotedName".col1 > :col1_1""", ), ( s7, oracle_d, """SELECT /*+ "SomeName" idx1 */ "SomeName".col1 FROM """ """"QuotedName" "SomeName" WHERE "SomeName".col1 > :col1_1""", ), ]: self.assert_compile(stmt, expected, dialect=dialect) def test_statement_hints(self): stmt = ( select([table1.c.myid]) .with_statement_hint("test hint one") .with_statement_hint("test hint two", "mysql") ) self.assert_compile( stmt, "SELECT mytable.myid FROM mytable test hint one" ) self.assert_compile( stmt, "SELECT mytable.myid FROM mytable test hint one test hint two", dialect="mysql", ) def test_literal_as_text_fromstring(self): self.assert_compile(and_(text("a"), text("b")), "a AND b") def test_literal_as_text_nonstring_raise(self): assert_raises(exc.ArgumentError, and_, ("a",), ("b",)) class UnsupportedTest(fixtures.TestBase): def test_unsupported_element_str_visit_name(self): from sqlalchemy.sql.expression import ClauseElement class SomeElement(ClauseElement): __visit_name__ = "some_element" assert_raises_message( exc.UnsupportedCompilationError, r"Compiler ", SomeElement().compile, ) def test_unsupported_element_meth_visit_name(self): from sqlalchemy.sql.expression import ClauseElement class SomeElement(ClauseElement): @classmethod def __visit_name__(cls): return "some_element" assert_raises_message( exc.UnsupportedCompilationError, r"Compiler ", SomeElement().compile, ) def test_unsupported_operator(self): from sqlalchemy.sql.expression import BinaryExpression def myop(x, y): pass binary = BinaryExpression(column("foo"), column("bar"), myop) assert_raises_message( exc.UnsupportedCompilationError, r"Compiler " = :param_1') def test_cte(self): # stringify of these was supported anyway by defaultdialect. stmt = select([table1.c.myid]).cte() stmt = select([stmt]) eq_ignore_whitespace( str(stmt), "WITH anon_1 AS (SELECT mytable.myid AS myid FROM mytable) " "SELECT anon_1.myid FROM anon_1", ) def test_returning(self): stmt = table1.insert().returning(table1.c.myid) eq_ignore_whitespace( str(stmt), "INSERT INTO mytable (myid, name, description) " "VALUES (:myid, :name, :description) RETURNING mytable.myid", ) def test_array_index(self): stmt = select([column("foo", types.ARRAY(Integer))[5]]) eq_ignore_whitespace(str(stmt), "SELECT foo[:foo_1] AS anon_1") def test_unknown_type(self): class MyType(types.TypeEngine): __visit_name__ = "mytype" stmt = select([cast(table1.c.myid, MyType)]) eq_ignore_whitespace( str(stmt), "SELECT CAST(mytable.myid AS MyType) AS anon_1 FROM mytable", ) def test_within_group(self): # stringify of these was supported anyway by defaultdialect. from sqlalchemy import within_group stmt = select( [ table1.c.myid, within_group(func.percentile_cont(0.5), table1.c.name.desc()), ] ) eq_ignore_whitespace( str(stmt), "SELECT mytable.myid, percentile_cont(:percentile_cont_1) " "WITHIN GROUP (ORDER BY mytable.name DESC) AS anon_1 FROM mytable", ) class KwargPropagationTest(fixtures.TestBase): @classmethod def setup_class(cls): from sqlalchemy.sql.expression import ColumnClause, TableClause class CatchCol(ColumnClause): pass class CatchTable(TableClause): pass cls.column = CatchCol("x") cls.table = CatchTable("y") cls.criterion = cls.column == CatchCol("y") @compiles(CatchCol) def compile_col(element, compiler, **kw): assert "canary" in kw return compiler.visit_column(element) @compiles(CatchTable) def compile_table(element, compiler, **kw): assert "canary" in kw return compiler.visit_table(element) def _do_test(self, element): d = default.DefaultDialect() d.statement_compiler(d, element, compile_kwargs={"canary": True}) def test_binary(self): self._do_test(self.column == 5) def test_select(self): s = ( select([self.column]) .select_from(self.table) .where(self.column == self.criterion) .order_by(self.column) ) self._do_test(s) def test_case(self): c = case([(self.criterion, self.column)], else_=self.column) self._do_test(c) def test_cast(self): c = cast(self.column, Integer) self._do_test(c) class ExecutionOptionsTest(fixtures.TestBase): def test_non_dml(self): stmt = table1.select() compiled = stmt.compile() eq_(compiled.execution_options, {}) def test_dml(self): stmt = table1.insert() compiled = stmt.compile() eq_(compiled.execution_options, {"autocommit": True}) def test_embedded_element_true_to_none(self): stmt = table1.insert().cte() eq_(stmt._execution_options, {"autocommit": True}) s2 = select([table1]).select_from(stmt) eq_(s2._execution_options, {}) compiled = s2.compile() eq_(compiled.execution_options, {"autocommit": True}) def test_embedded_element_true_to_false(self): stmt = table1.insert().cte() eq_(stmt._execution_options, {"autocommit": True}) s2 = ( select([table1]) .select_from(stmt) .execution_options(autocommit=False) ) eq_(s2._execution_options, {"autocommit": False}) compiled = s2.compile() eq_(compiled.execution_options, {"autocommit": False}) class DDLTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = "default" def _illegal_type_fixture(self): class MyType(types.TypeEngine): pass @compiles(MyType) def compile_(element, compiler, **kw): raise exc.CompileError("Couldn't compile type") return MyType def test_reraise_of_column_spec_issue(self): MyType = self._illegal_type_fixture() t1 = Table("t", MetaData(), Column("x", MyType())) assert_raises_message( exc.CompileError, r"\(in table 't', column 'x'\): Couldn't compile type", schema.CreateTable(t1).compile, ) def test_reraise_of_column_spec_issue_unicode(self): MyType = self._illegal_type_fixture() t1 = Table("t", MetaData(), Column(u("méil"), MyType())) assert_raises_message( exc.CompileError, u(r"\(in table 't', column 'méil'\): Couldn't compile type"), schema.CreateTable(t1).compile, ) def test_system_flag(self): m = MetaData() t = Table( "t", m, Column("x", Integer), Column("y", Integer, system=True), Column("z", Integer), ) self.assert_compile( schema.CreateTable(t), "CREATE TABLE t (x INTEGER, z INTEGER)" ) m2 = MetaData() t2 = t.tometadata(m2) self.assert_compile( schema.CreateTable(t2), "CREATE TABLE t (x INTEGER, z INTEGER)" ) def test_composite_pk_constraint_autoinc_first_implicit(self): m = MetaData() t = Table( "t", m, Column("a", Integer, primary_key=True), Column("b", Integer, primary_key=True, autoincrement=True), ) self.assert_compile( schema.CreateTable(t), "CREATE TABLE t (" "a INTEGER NOT NULL, " "b INTEGER NOT NULL, " "PRIMARY KEY (b, a))", ) def test_composite_pk_constraint_maintains_order_explicit(self): m = MetaData() t = Table( "t", m, Column("a", Integer), Column("b", Integer, autoincrement=True), schema.PrimaryKeyConstraint("a", "b"), ) self.assert_compile( schema.CreateTable(t), "CREATE TABLE t (" "a INTEGER NOT NULL, " "b INTEGER NOT NULL, " "PRIMARY KEY (a, b))", ) def test_create_table_suffix(self): class MyDialect(default.DefaultDialect): class MyCompiler(compiler.DDLCompiler): def create_table_suffix(self, table): return "SOME SUFFIX" ddl_compiler = MyCompiler m = MetaData() t1 = Table("t1", m, Column("q", Integer)) self.assert_compile( schema.CreateTable(t1), "CREATE TABLE t1 SOME SUFFIX (q INTEGER)", dialect=MyDialect(), ) def test_table_no_cols(self): m = MetaData() t1 = Table("t1", m) self.assert_compile(schema.CreateTable(t1), "CREATE TABLE t1 ()") def test_table_no_cols_w_constraint(self): m = MetaData() t1 = Table("t1", m, CheckConstraint("a = 1")) self.assert_compile( schema.CreateTable(t1), "CREATE TABLE t1 (CHECK (a = 1))" ) def test_table_one_col_w_constraint(self): m = MetaData() t1 = Table("t1", m, Column("q", Integer), CheckConstraint("a = 1")) self.assert_compile( schema.CreateTable(t1), "CREATE TABLE t1 (q INTEGER, CHECK (a = 1))", ) def test_schema_translate_map_table(self): m = MetaData() t1 = Table("t1", m, Column("q", Integer)) t2 = Table("t2", m, Column("q", Integer), schema="foo") t3 = Table("t3", m, Column("q", Integer), schema="bar") schema_translate_map = {None: "z", "bar": None, "foo": "bat"} self.assert_compile( schema.CreateTable(t1), "CREATE TABLE z.t1 (q INTEGER)", schema_translate_map=schema_translate_map, ) self.assert_compile( schema.CreateTable(t2), "CREATE TABLE bat.t2 (q INTEGER)", schema_translate_map=schema_translate_map, ) self.assert_compile( schema.CreateTable(t3), "CREATE TABLE t3 (q INTEGER)", schema_translate_map=schema_translate_map, ) def test_schema_translate_map_sequence(self): s1 = schema.Sequence("s1") s2 = schema.Sequence("s2", schema="foo") s3 = schema.Sequence("s3", schema="bar") schema_translate_map = {None: "z", "bar": None, "foo": "bat"} self.assert_compile( schema.CreateSequence(s1), "CREATE SEQUENCE z.s1", schema_translate_map=schema_translate_map, ) self.assert_compile( schema.CreateSequence(s2), "CREATE SEQUENCE bat.s2", schema_translate_map=schema_translate_map, ) self.assert_compile( schema.CreateSequence(s3), "CREATE SEQUENCE s3", schema_translate_map=schema_translate_map, ) class SchemaTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = "default" def test_select(self): self.assert_compile( table4.select(), "SELECT remote_owner.remotetable.rem_id, " "remote_owner.remotetable.datatype_id," " remote_owner.remotetable.value " "FROM remote_owner.remotetable", ) self.assert_compile( table4.select( and_(table4.c.datatype_id == 7, table4.c.value == "hi") ), "SELECT remote_owner.remotetable.rem_id, " "remote_owner.remotetable.datatype_id," " remote_owner.remotetable.value " "FROM remote_owner.remotetable WHERE " "remote_owner.remotetable.datatype_id = :datatype_id_1 AND" " remote_owner.remotetable.value = :value_1", ) s = table4.select( and_(table4.c.datatype_id == 7, table4.c.value == "hi"), use_labels=True, ) self.assert_compile( s, "SELECT remote_owner.remotetable.rem_id AS" " remote_owner_remotetable_rem_id, " "remote_owner.remotetable.datatype_id AS" " remote_owner_remotetable_datatype_id, " "remote_owner.remotetable.value " "AS remote_owner_remotetable_value FROM " "remote_owner.remotetable WHERE " "remote_owner.remotetable.datatype_id = :datatype_id_1 AND " "remote_owner.remotetable.value = :value_1", ) # multi-part schema name self.assert_compile( table5.select(), 'SELECT "dbo.remote_owner".remotetable.rem_id, ' '"dbo.remote_owner".remotetable.datatype_id, ' '"dbo.remote_owner".remotetable.value ' 'FROM "dbo.remote_owner".remotetable', ) # multi-part schema name labels - convert '.' to '_' self.assert_compile( table5.select(use_labels=True), 'SELECT "dbo.remote_owner".remotetable.rem_id AS' " dbo_remote_owner_remotetable_rem_id, " '"dbo.remote_owner".remotetable.datatype_id' " AS dbo_remote_owner_remotetable_datatype_id," ' "dbo.remote_owner".remotetable.value AS ' "dbo_remote_owner_remotetable_value FROM" ' "dbo.remote_owner".remotetable', ) def test_schema_translate_select(self): m = MetaData() table1 = Table( "mytable", m, Column("myid", Integer), Column("name", String), Column("description", String), ) schema_translate_map = {"remote_owner": "foob", None: "bar"} self.assert_compile( table1.select().where(table1.c.name == "hi"), "SELECT bar.mytable.myid, bar.mytable.name, " "bar.mytable.description FROM bar.mytable " "WHERE bar.mytable.name = :name_1", schema_translate_map=schema_translate_map, ) self.assert_compile( table4.select().where(table4.c.value == "hi"), "SELECT foob.remotetable.rem_id, foob.remotetable.datatype_id, " "foob.remotetable.value FROM foob.remotetable " "WHERE foob.remotetable.value = :value_1", schema_translate_map=schema_translate_map, ) schema_translate_map = {"remote_owner": "foob"} self.assert_compile( select([table1, table4]).select_from( join(table1, table4, table1.c.myid == table4.c.rem_id) ), "SELECT mytable.myid, mytable.name, mytable.description, " "foob.remotetable.rem_id, foob.remotetable.datatype_id, " "foob.remotetable.value FROM mytable JOIN foob.remotetable " "ON mytable.myid = foob.remotetable.rem_id", schema_translate_map=schema_translate_map, ) def test_schema_translate_aliases(self): schema_translate_map = {None: "bar"} m = MetaData() table1 = Table( "mytable", m, Column("myid", Integer), Column("name", String), Column("description", String), ) table2 = Table( "myothertable", m, Column("otherid", Integer), Column("othername", String), ) alias = table1.alias() stmt = ( select([table2, alias]) .select_from(table2.join(alias, table2.c.otherid == alias.c.myid)) .where(alias.c.name == "foo") ) self.assert_compile( stmt, "SELECT bar.myothertable.otherid, bar.myothertable.othername, " "mytable_1.myid, mytable_1.name, mytable_1.description " "FROM bar.myothertable JOIN bar.mytable AS mytable_1 " "ON bar.myothertable.otherid = mytable_1.myid " "WHERE mytable_1.name = :name_1", schema_translate_map=schema_translate_map, ) def test_schema_translate_crud(self): schema_translate_map = {"remote_owner": "foob", None: "bar"} m = MetaData() table1 = Table( "mytable", m, Column("myid", Integer), Column("name", String), Column("description", String), ) self.assert_compile( table1.insert().values(description="foo"), "INSERT INTO bar.mytable (description) VALUES (:description)", schema_translate_map=schema_translate_map, ) self.assert_compile( table1.update() .where(table1.c.name == "hi") .values(description="foo"), "UPDATE bar.mytable SET description=:description " "WHERE bar.mytable.name = :name_1", schema_translate_map=schema_translate_map, ) self.assert_compile( table1.delete().where(table1.c.name == "hi"), "DELETE FROM bar.mytable WHERE bar.mytable.name = :name_1", schema_translate_map=schema_translate_map, ) self.assert_compile( table4.insert().values(value="there"), "INSERT INTO foob.remotetable (value) VALUES (:value)", schema_translate_map=schema_translate_map, ) self.assert_compile( table4.update() .where(table4.c.value == "hi") .values(value="there"), "UPDATE foob.remotetable SET value=:value " "WHERE foob.remotetable.value = :value_1", schema_translate_map=schema_translate_map, ) self.assert_compile( table4.delete().where(table4.c.value == "hi"), "DELETE FROM foob.remotetable WHERE " "foob.remotetable.value = :value_1", schema_translate_map=schema_translate_map, ) def test_alias(self): a = alias(table4, "remtable") self.assert_compile( a.select(a.c.datatype_id == 7), "SELECT remtable.rem_id, remtable.datatype_id, " "remtable.value FROM" " remote_owner.remotetable AS remtable " "WHERE remtable.datatype_id = :datatype_id_1", ) def test_update(self): self.assert_compile( table4.update( table4.c.value == "test", values={table4.c.datatype_id: 12} ), "UPDATE remote_owner.remotetable SET datatype_id=:datatype_id " "WHERE remote_owner.remotetable.value = :value_1", ) def test_insert(self): self.assert_compile( table4.insert(values=(2, 5, "test")), "INSERT INTO remote_owner.remotetable " "(rem_id, datatype_id, value) VALUES " "(:rem_id, :datatype_id, :value)", ) class CorrelateTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = "default" def test_dont_overcorrelate(self): self.assert_compile( select([table1], from_obj=[table1, table1.select()]), "SELECT mytable.myid, mytable.name, " "mytable.description FROM mytable, (SELECT " "mytable.myid AS myid, mytable.name AS " "name, mytable.description AS description " "FROM mytable)", ) def _fixture(self): t1 = table("t1", column("a")) t2 = table("t2", column("a")) return t1, t2, select([t1]).where(t1.c.a == t2.c.a) def _assert_where_correlated(self, stmt): self.assert_compile( stmt, "SELECT t2.a FROM t2 WHERE t2.a = " "(SELECT t1.a FROM t1 WHERE t1.a = t2.a)", ) def _assert_where_all_correlated(self, stmt): self.assert_compile( stmt, "SELECT t1.a, t2.a FROM t1, t2 WHERE t2.a = " "(SELECT t1.a WHERE t1.a = t2.a)", ) # note there's no more "backwards" correlation after # we've done #2746 # def _assert_where_backwards_correlated(self, stmt): # self.assert_compile( # stmt, # "SELECT t2.a FROM t2 WHERE t2.a = " # "(SELECT t1.a FROM t2 WHERE t1.a = t2.a)") # def _assert_column_backwards_correlated(self, stmt): # self.assert_compile(stmt, # "SELECT t2.a, (SELECT t1.a FROM t2 WHERE t1.a = t2.a) " # "AS anon_1 FROM t2") def _assert_column_correlated(self, stmt): self.assert_compile( stmt, "SELECT t2.a, (SELECT t1.a FROM t1 WHERE t1.a = t2.a) " "AS anon_1 FROM t2", ) def _assert_column_all_correlated(self, stmt): self.assert_compile( stmt, "SELECT t1.a, t2.a, " "(SELECT t1.a WHERE t1.a = t2.a) AS anon_1 FROM t1, t2", ) def _assert_having_correlated(self, stmt): self.assert_compile( stmt, "SELECT t2.a FROM t2 HAVING t2.a = " "(SELECT t1.a FROM t1 WHERE t1.a = t2.a)", ) def _assert_from_uncorrelated(self, stmt): self.assert_compile( stmt, "SELECT t2.a, anon_1.a FROM t2, " "(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a) AS anon_1", ) def _assert_from_all_uncorrelated(self, stmt): self.assert_compile( stmt, "SELECT t1.a, t2.a, anon_1.a FROM t1, t2, " "(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a) AS anon_1", ) def _assert_where_uncorrelated(self, stmt): self.assert_compile( stmt, "SELECT t2.a FROM t2 WHERE t2.a = " "(SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a)", ) def _assert_column_uncorrelated(self, stmt): self.assert_compile( stmt, "SELECT t2.a, (SELECT t1.a FROM t1, t2 " "WHERE t1.a = t2.a) AS anon_1 FROM t2", ) def _assert_having_uncorrelated(self, stmt): self.assert_compile( stmt, "SELECT t2.a FROM t2 HAVING t2.a = " "(SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a)", ) def _assert_where_single_full_correlated(self, stmt): self.assert_compile( stmt, "SELECT t1.a FROM t1 WHERE t1.a = (SELECT t1.a)" ) def test_correlate_semiauto_where(self): t1, t2, s1 = self._fixture() self._assert_where_correlated( select([t2]).where(t2.c.a == s1.correlate(t2)) ) def test_correlate_semiauto_column(self): t1, t2, s1 = self._fixture() self._assert_column_correlated( select([t2, s1.correlate(t2).as_scalar()]) ) def test_correlate_semiauto_from(self): t1, t2, s1 = self._fixture() self._assert_from_uncorrelated(select([t2, s1.correlate(t2).alias()])) def test_correlate_semiauto_having(self): t1, t2, s1 = self._fixture() self._assert_having_correlated( select([t2]).having(t2.c.a == s1.correlate(t2)) ) def test_correlate_except_inclusion_where(self): t1, t2, s1 = self._fixture() self._assert_where_correlated( select([t2]).where(t2.c.a == s1.correlate_except(t1)) ) def test_correlate_except_exclusion_where(self): t1, t2, s1 = self._fixture() self._assert_where_uncorrelated( select([t2]).where(t2.c.a == s1.correlate_except(t2)) ) def test_correlate_except_inclusion_column(self): t1, t2, s1 = self._fixture() self._assert_column_correlated( select([t2, s1.correlate_except(t1).as_scalar()]) ) def test_correlate_except_exclusion_column(self): t1, t2, s1 = self._fixture() self._assert_column_uncorrelated( select([t2, s1.correlate_except(t2).as_scalar()]) ) def test_correlate_except_inclusion_from(self): t1, t2, s1 = self._fixture() self._assert_from_uncorrelated( select([t2, s1.correlate_except(t1).alias()]) ) def test_correlate_except_exclusion_from(self): t1, t2, s1 = self._fixture() self._assert_from_uncorrelated( select([t2, s1.correlate_except(t2).alias()]) ) def test_correlate_except_none(self): t1, t2, s1 = self._fixture() self._assert_where_all_correlated( select([t1, t2]).where(t2.c.a == s1.correlate_except(None)) ) def test_correlate_except_having(self): t1, t2, s1 = self._fixture() self._assert_having_correlated( select([t2]).having(t2.c.a == s1.correlate_except(t1)) ) def test_correlate_auto_where(self): t1, t2, s1 = self._fixture() self._assert_where_correlated(select([t2]).where(t2.c.a == s1)) def test_correlate_auto_column(self): t1, t2, s1 = self._fixture() self._assert_column_correlated(select([t2, s1.as_scalar()])) def test_correlate_auto_from(self): t1, t2, s1 = self._fixture() self._assert_from_uncorrelated(select([t2, s1.alias()])) def test_correlate_auto_having(self): t1, t2, s1 = self._fixture() self._assert_having_correlated(select([t2]).having(t2.c.a == s1)) def test_correlate_disabled_where(self): t1, t2, s1 = self._fixture() self._assert_where_uncorrelated( select([t2]).where(t2.c.a == s1.correlate(None)) ) def test_correlate_disabled_column(self): t1, t2, s1 = self._fixture() self._assert_column_uncorrelated( select([t2, s1.correlate(None).as_scalar()]) ) def test_correlate_disabled_from(self): t1, t2, s1 = self._fixture() self._assert_from_uncorrelated( select([t2, s1.correlate(None).alias()]) ) def test_correlate_disabled_having(self): t1, t2, s1 = self._fixture() self._assert_having_uncorrelated( select([t2]).having(t2.c.a == s1.correlate(None)) ) def test_correlate_all_where(self): t1, t2, s1 = self._fixture() self._assert_where_all_correlated( select([t1, t2]).where(t2.c.a == s1.correlate(t1, t2)) ) def test_correlate_all_column(self): t1, t2, s1 = self._fixture() self._assert_column_all_correlated( select([t1, t2, s1.correlate(t1, t2).as_scalar()]) ) def test_correlate_all_from(self): t1, t2, s1 = self._fixture() self._assert_from_all_uncorrelated( select([t1, t2, s1.correlate(t1, t2).alias()]) ) def test_correlate_where_all_unintentional(self): t1, t2, s1 = self._fixture() assert_raises_message( exc.InvalidRequestError, "returned no FROM clauses due to auto-correlation", select([t1, t2]).where(t2.c.a == s1).compile, ) def test_correlate_from_all_ok(self): t1, t2, s1 = self._fixture() self.assert_compile( select([t1, t2, s1]), "SELECT t1.a, t2.a, a FROM t1, t2, " "(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a)", ) def test_correlate_auto_where_singlefrom(self): t1, t2, s1 = self._fixture() s = select([t1.c.a]) s2 = select([t1]).where(t1.c.a == s) self.assert_compile( s2, "SELECT t1.a FROM t1 WHERE t1.a = " "(SELECT t1.a FROM t1)" ) def test_correlate_semiauto_where_singlefrom(self): t1, t2, s1 = self._fixture() s = select([t1.c.a]) s2 = select([t1]).where(t1.c.a == s.correlate(t1)) self._assert_where_single_full_correlated(s2) def test_correlate_except_semiauto_where_singlefrom(self): t1, t2, s1 = self._fixture() s = select([t1.c.a]) s2 = select([t1]).where(t1.c.a == s.correlate_except(t2)) self._assert_where_single_full_correlated(s2) def test_correlate_alone_noeffect(self): # new as of #2668 t1, t2, s1 = self._fixture() self.assert_compile( s1.correlate(t1, t2), "SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a" ) def test_correlate_except_froms(self): # new as of #2748 t1 = table("t1", column("a")) t2 = table("t2", column("a"), column("b")) s = select([t2.c.b]).where(t1.c.a == t2.c.a) s = s.correlate_except(t2).alias("s") s2 = select([func.foo(s.c.b)]).as_scalar() s3 = select([t1], order_by=s2) self.assert_compile( s3, "SELECT t1.a FROM t1 ORDER BY " "(SELECT foo(s.b) AS foo_1 FROM " "(SELECT t2.b AS b FROM t2 WHERE t1.a = t2.a) AS s)", ) def test_multilevel_froms_correlation(self): # new as of #2748 p = table("parent", column("id")) c = table("child", column("id"), column("parent_id"), column("pos")) s = ( c.select() .where(c.c.parent_id == p.c.id) .order_by(c.c.pos) .limit(1) ) s = s.correlate(p) s = exists().select_from(s).where(s.c.id == 1) s = select([p]).where(s) self.assert_compile( s, "SELECT parent.id FROM parent WHERE EXISTS (SELECT * " "FROM (SELECT child.id AS id, child.parent_id AS parent_id, " "child.pos AS pos FROM child WHERE child.parent_id = parent.id " "ORDER BY child.pos LIMIT :param_1) WHERE id = :id_1)", ) def test_no_contextless_correlate_except(self): # new as of #2748 t1 = table("t1", column("x")) t2 = table("t2", column("y")) t3 = table("t3", column("z")) s = ( select([t1]) .where(t1.c.x == t2.c.y) .where(t2.c.y == t3.c.z) .correlate_except(t1) ) self.assert_compile( s, "SELECT t1.x FROM t1, t2, t3 WHERE t1.x = t2.y AND t2.y = t3.z" ) def test_multilevel_implicit_correlation_disabled(self): # test that implicit correlation with multilevel WHERE correlation # behaves like 0.8.1, 0.7 (i.e. doesn't happen) t1 = table("t1", column("x")) t2 = table("t2", column("y")) t3 = table("t3", column("z")) s = select([t1.c.x]).where(t1.c.x == t2.c.y) s2 = select([t3.c.z]).where(t3.c.z == s.as_scalar()) s3 = select([t1]).where(t1.c.x == s2.as_scalar()) self.assert_compile( s3, "SELECT t1.x FROM t1 " "WHERE t1.x = (SELECT t3.z " "FROM t3 " "WHERE t3.z = (SELECT t1.x " "FROM t1, t2 " "WHERE t1.x = t2.y))", ) def test_from_implicit_correlation_disabled(self): # test that implicit correlation with immediate and # multilevel FROM clauses behaves like 0.8.1 (i.e. doesn't happen) t1 = table("t1", column("x")) t2 = table("t2", column("y")) s = select([t1.c.x]).where(t1.c.x == t2.c.y) s2 = select([t2, s]) s3 = select([t1, s2]) self.assert_compile( s3, "SELECT t1.x, y, x FROM t1, " "(SELECT t2.y AS y, x FROM t2, " "(SELECT t1.x AS x FROM t1, t2 WHERE t1.x = t2.y))", ) class CoercionTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = default.DefaultDialect(supports_native_boolean=True) def _fixture(self): m = MetaData() return Table("foo", m, Column("id", Integer)) bool_table = table("t", column("x", Boolean)) def test_coerce_bool_where(self): self.assert_compile( select([self.bool_table]).where(self.bool_table.c.x), "SELECT t.x FROM t WHERE t.x", ) def test_coerce_bool_where_non_native(self): self.assert_compile( select([self.bool_table]).where(self.bool_table.c.x), "SELECT t.x FROM t WHERE t.x = 1", dialect=default.DefaultDialect(supports_native_boolean=False), ) self.assert_compile( select([self.bool_table]).where(~self.bool_table.c.x), "SELECT t.x FROM t WHERE t.x = 0", dialect=default.DefaultDialect(supports_native_boolean=False), ) def test_null_constant(self): self.assert_compile(_literal_as_text(None), "NULL") def test_false_constant(self): self.assert_compile(_literal_as_text(False), "false") def test_true_constant(self): self.assert_compile(_literal_as_text(True), "true") def test_val_and_false(self): t = self._fixture() self.assert_compile(and_(t.c.id == 1, False), "false") def test_val_and_true_coerced(self): t = self._fixture() self.assert_compile(and_(t.c.id == 1, True), "foo.id = :id_1") def test_val_is_null_coerced(self): t = self._fixture() self.assert_compile(and_(t.c.id == None), "foo.id IS NULL") # noqa def test_val_and_None(self): t = self._fixture() self.assert_compile(and_(t.c.id == 1, None), "foo.id = :id_1 AND NULL") def test_None_and_val(self): t = self._fixture() self.assert_compile(and_(None, t.c.id == 1), "NULL AND foo.id = :id_1") def test_None_and_nothing(self): # current convention is None in and_() # returns None May want # to revise this at some point. self.assert_compile(and_(None), "NULL") def test_val_and_null(self): t = self._fixture() self.assert_compile( and_(t.c.id == 1, null()), "foo.id = :id_1 AND NULL" ) class ResultMapTest(fixtures.TestBase): """test the behavior of the 'entry stack' and the determination when the result_map needs to be populated. """ def test_compound_populates(self): t = Table("t", MetaData(), Column("a", Integer), Column("b", Integer)) stmt = select([t]).union(select([t])) comp = stmt.compile() eq_( comp._create_result_map(), { "a": ("a", (t.c.a, "a", "a"), t.c.a.type), "b": ("b", (t.c.b, "b", "b"), t.c.b.type), }, ) def test_compound_not_toplevel_doesnt_populate(self): t = Table("t", MetaData(), Column("a", Integer), Column("b", Integer)) subq = select([t]).union(select([t])) stmt = select([t.c.a]).select_from(t.join(subq, t.c.a == subq.c.a)) comp = stmt.compile() eq_( comp._create_result_map(), {"a": ("a", (t.c.a, "a", "a"), t.c.a.type)}, ) def test_compound_only_top_populates(self): t = Table("t", MetaData(), Column("a", Integer), Column("b", Integer)) stmt = select([t.c.a]).union(select([t.c.b])) comp = stmt.compile() eq_( comp._create_result_map(), {"a": ("a", (t.c.a, "a", "a"), t.c.a.type)}, ) def test_label_plus_element(self): t = Table("t", MetaData(), Column("a", Integer)) l1 = t.c.a.label("bar") tc = type_coerce(t.c.a, String) stmt = select([t.c.a, l1, tc]) comp = stmt.compile() tc_anon_label = comp._create_result_map()["anon_1"][1][0] eq_( comp._create_result_map(), { "a": ("a", (t.c.a, "a", "a"), t.c.a.type), "bar": ("bar", (l1, "bar"), l1.type), "anon_1": ( "%%(%d anon)s" % id(tc), (tc_anon_label, "anon_1", tc), tc.type, ), }, ) def test_label_conflict_union(self): t1 = Table( "t1", MetaData(), Column("a", Integer), Column("b", Integer) ) t2 = Table("t2", MetaData(), Column("t1_a", Integer)) union = select([t2]).union(select([t2])).alias() t1_alias = t1.alias() stmt = ( select([t1, t1_alias]) .select_from(t1.join(union, t1.c.a == union.c.t1_a)) .apply_labels() ) comp = stmt.compile() eq_( set(comp._create_result_map()), set(["t1_1_b", "t1_1_a", "t1_a", "t1_b"]), ) is_(comp._create_result_map()["t1_a"][1][2], t1.c.a) def test_insert_with_select_values(self): astring = Column("a", String) aint = Column("a", Integer) m = MetaData() Table("t1", m, astring) t2 = Table("t2", m, aint) stmt = t2.insert().values(a=select([astring])).returning(aint) comp = stmt.compile(dialect=postgresql.dialect()) eq_( comp._create_result_map(), {"a": ("a", (aint, "a", "a"), aint.type)}, ) def test_insert_from_select(self): astring = Column("a", String) aint = Column("a", Integer) m = MetaData() Table("t1", m, astring) t2 = Table("t2", m, aint) stmt = ( t2.insert().from_select(["a"], select([astring])).returning(aint) ) comp = stmt.compile(dialect=postgresql.dialect()) eq_( comp._create_result_map(), {"a": ("a", (aint, "a", "a"), aint.type)}, ) def test_nested_api(self): from sqlalchemy.engine.result import ResultMetaData stmt2 = select([table2]) stmt1 = select([table1]).select_from(stmt2) contexts = {} int_ = Integer() class MyCompiler(compiler.SQLCompiler): def visit_select(self, stmt, *arg, **kw): if stmt is stmt2: with self._nested_result() as nested: contexts[stmt2] = nested text = super(MyCompiler, self).visit_select(stmt2) self._add_to_result_map("k1", "k1", (1, 2, 3), int_) else: text = super(MyCompiler, self).visit_select( stmt, *arg, **kw ) self._add_to_result_map("k2", "k2", (3, 4, 5), int_) return text comp = MyCompiler(default.DefaultDialect(), stmt1) eq_( ResultMetaData._create_result_map(contexts[stmt2][0]), { "otherid": ( "otherid", (table2.c.otherid, "otherid", "otherid"), table2.c.otherid.type, ), "othername": ( "othername", (table2.c.othername, "othername", "othername"), table2.c.othername.type, ), "k1": ("k1", (1, 2, 3), int_), }, ) eq_( comp._create_result_map(), { "myid": ( "myid", (table1.c.myid, "myid", "myid"), table1.c.myid.type, ), "k2": ("k2", (3, 4, 5), int_), "name": ( "name", (table1.c.name, "name", "name"), table1.c.name.type, ), "description": ( "description", (table1.c.description, "description", "description"), table1.c.description.type, ), }, ) def test_select_wraps_for_translate_ambiguity(self): # test for issue #3657 t = table("a", column("x"), column("y"), column("z")) l1, l2, l3 = t.c.z.label("a"), t.c.x.label("b"), t.c.x.label("c") orig = [t.c.x, t.c.y, l1, l2, l3] stmt = select(orig) wrapped = stmt._generate() wrapped = wrapped.column( func.ROW_NUMBER().over(order_by=t.c.z) ).alias() wrapped_again = select([c for c in wrapped.c]) compiled = wrapped_again.compile( compile_kwargs={"select_wraps_for": stmt} ) proxied = [obj[0] for (k, n, obj, type_) in compiled._result_columns] for orig_obj, proxied_obj in zip(orig, proxied): is_(orig_obj, proxied_obj) def test_select_wraps_for_translate_ambiguity_dupe_cols(self): # test for issue #3657 t = table("a", column("x"), column("y"), column("z")) l1, l2, l3 = t.c.z.label("a"), t.c.x.label("b"), t.c.x.label("c") orig = [t.c.x, t.c.y, l1, l2, l3] # create the statement with some duplicate columns. right now # the behavior is that these redundant columns are deduped. stmt = select([t.c.x, t.c.y, l1, t.c.y, l2, t.c.x, l3]) # so the statement has 7 inner columns... eq_(len(list(stmt.inner_columns)), 7) # but only exposes 5 of them, the other two are dupes of x and y eq_(len(stmt.c), 5) # and when it generates a SELECT it will also render only 5 eq_(len(stmt._columns_plus_names), 5) wrapped = stmt._generate() wrapped = wrapped.column( func.ROW_NUMBER().over(order_by=t.c.z) ).alias() # so when we wrap here we're going to have only 5 columns wrapped_again = select([c for c in wrapped.c]) # so the compiler logic that matches up the "wrapper" to the # "select_wraps_for" can't use inner_columns to match because # these collections are not the same compiled = wrapped_again.compile( compile_kwargs={"select_wraps_for": stmt} ) proxied = [obj[0] for (k, n, obj, type_) in compiled._result_columns] for orig_obj, proxied_obj in zip(orig, proxied): is_(orig_obj, proxied_obj)