diff options
Diffstat (limited to 'test/sql/test_compiler.py')
-rw-r--r-- | test/sql/test_compiler.py | 645 |
1 files changed, 246 insertions, 399 deletions
diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py index 3b8aed23f..9cd893c1a 100644 --- a/test/sql/test_compiler.py +++ b/test/sql/test_compiler.py @@ -14,8 +14,8 @@ from sqlalchemy.testing import eq_, is_, assert_raises, assert_raises_message from sqlalchemy import testing from sqlalchemy.testing import fixtures, AssertsCompiledSQL from sqlalchemy import Integer, String, MetaData, Table, Column, select, \ - func, not_, cast, text, tuple_, exists, delete, update, bindparam,\ - insert, literal, and_, null, type_coerce, alias, or_, literal_column,\ + func, not_, cast, text, tuple_, exists, update, bindparam,\ + literal, and_, null, type_coerce, alias, or_, literal_column,\ Float, TIMESTAMP, Numeric, Date, Text, collate, union, except_,\ intersect, union_all, Boolean, distinct, join, outerjoin, asc, desc,\ over, subquery, case @@ -87,6 +87,7 @@ keyed = Table('keyed', metadata, Column('z', Integer), ) + class SelectTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = 'default' @@ -424,35 +425,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "AS z FROM keyed) AS anon_2) AS anon_1" ) - def test_dont_overcorrelate(self): - self.assert_compile(select([table1], from_obj=[table1, - table1.select()]), - "SELECT mytable.myid, mytable.name, " - "mytable.description FROM mytable, (SELECT " - "mytable.myid AS myid, mytable.name AS " - "name, mytable.description AS description " - "FROM mytable)") - - def test_full_correlate(self): - # intentional - t = table('t', column('a'), column('b')) - s = select([t.c.a]).where(t.c.a == 1).correlate(t).as_scalar() - - s2 = select([t.c.a, s]) - self.assert_compile(s2, - "SELECT t.a, (SELECT t.a WHERE t.a = :a_1) AS anon_1 FROM t") - - # unintentional - t2 = table('t2', column('c'), column('d')) - s = select([t.c.a]).where(t.c.a == t2.c.d).as_scalar() - s2 = select([t, t2, s]) - assert_raises(exc.InvalidRequestError, str, s2) - - # intentional again - s = s.correlate(t, t2) - s2 = select([t, t2, s]) - self.assert_compile(s, "SELECT t.a WHERE t.a = t2.d") - def test_exists(self): s = select([table1.c.myid]).where(table1.c.myid == 5) @@ -2239,14 +2211,14 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): assert_raises_message( exc.CompileError, - "Cannot compile Column object until it's 'name' is assigned.", + "Cannot compile Column object until its 'name' is assigned.", str, sel2 ) sel3 = select([my_str]).as_scalar() assert_raises_message( exc.CompileError, - "Cannot compile Column object until it's 'name' is assigned.", + "Cannot compile Column object until its 'name' is assigned.", str, sel3 ) @@ -2488,326 +2460,6 @@ class KwargPropagationTest(fixtures.TestBase): class CRUDTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = 'default' - def test_insert(self): - # generic insert, will create bind params for all columns - self.assert_compile(insert(table1), - "INSERT INTO mytable (myid, name, description) " - "VALUES (:myid, :name, :description)") - - # insert with user-supplied bind params for specific columns, - # cols provided literally - self.assert_compile( - insert(table1, { - table1.c.myid: bindparam('userid'), - table1.c.name: bindparam('username')}), - "INSERT INTO mytable (myid, name) VALUES (:userid, :username)") - - # insert with user-supplied bind params for specific columns, cols - # provided as strings - self.assert_compile( - insert(table1, dict(myid=3, name='jack')), - "INSERT INTO mytable (myid, name) VALUES (:myid, :name)" - ) - - # test with a tuple of params instead of named - self.assert_compile( - insert(table1, (3, 'jack', 'mydescription')), - "INSERT INTO mytable (myid, name, description) VALUES " - "(:myid, :name, :description)", - checkparams={ - 'myid': 3, 'name': 'jack', 'description': 'mydescription'} - ) - - self.assert_compile( - insert(table1, values={ - table1.c.myid: bindparam('userid') - }).values( - {table1.c.name: bindparam('username')}), - "INSERT INTO mytable (myid, name) VALUES (:userid, :username)" - ) - - self.assert_compile( - insert(table1, values=dict(myid=func.lala())), - "INSERT INTO mytable (myid) VALUES (lala())") - - def test_insert_prefix(self): - stmt = table1.insert().prefix_with("A", "B", dialect="mysql").\ - prefix_with("C", "D") - self.assert_compile(stmt, - "INSERT A B C D INTO mytable (myid, name, description) " - "VALUES (%s, %s, %s)", dialect=mysql.dialect() - ) - self.assert_compile(stmt, - "INSERT C D INTO mytable (myid, name, description) " - "VALUES (:myid, :name, :description)") - - def test_inline_default_insert(self): - metadata = MetaData() - table = Table('sometable', metadata, - Column('id', Integer, primary_key=True), - Column('foo', Integer, default=func.foobar())) - self.assert_compile( - table.insert(values={}, inline=True), - "INSERT INTO sometable (foo) VALUES (foobar())") - self.assert_compile( - table.insert(inline=True), - "INSERT INTO sometable (foo) VALUES (foobar())", params={}) - - def test_insert_returning_not_in_default(self): - stmt = table1.insert().returning(table1.c.myid) - assert_raises_message( - exc.CompileError, - "RETURNING is not supported by this dialect's statement compiler.", - stmt.compile - ) - - def test_empty_insert_default(self): - stmt = table1.insert().values({}) # hide from 2to3 - self.assert_compile(stmt, "INSERT INTO mytable () VALUES ()") - - def test_empty_insert_default_values(self): - stmt = table1.insert().values({}) # hide from 2to3 - dialect = default.DefaultDialect() - dialect.supports_empty_insert = dialect.supports_default_values = True - self.assert_compile(stmt, "INSERT INTO mytable DEFAULT VALUES", - dialect=dialect) - - def test_empty_insert_not_supported(self): - stmt = table1.insert().values({}) # hide from 2to3 - dialect = default.DefaultDialect() - dialect.supports_empty_insert = dialect.supports_default_values = False - assert_raises_message( - exc.CompileError, - "The 'default' dialect with current database version " - "settings does not support empty inserts.", - stmt.compile, dialect=dialect - ) - - def test_multivalues_insert_not_supported(self): - stmt = table1.insert().values([{"myid": 1}, {"myid": 2}]) - dialect = default.DefaultDialect() - assert_raises_message( - exc.CompileError, - "The 'default' dialect with current database version settings " - "does not support in-place multirow inserts.", - stmt.compile, dialect=dialect - ) - - def test_multivalues_insert_named(self): - stmt = table1.insert().\ - values([{"myid": 1, "name": 'a', "description": 'b'}, - {"myid": 2, "name": 'c', "description": 'd'}, - {"myid": 3, "name": 'e', "description": 'f'} - ]) - - result = "INSERT INTO mytable (myid, name, description) VALUES " \ - "(:myid_0, :name_0, :description_0), " \ - "(:myid_1, :name_1, :description_1), " \ - "(:myid_2, :name_2, :description_2)" - - dialect = default.DefaultDialect() - dialect.supports_multivalues_insert = True - self.assert_compile(stmt, result, - checkparams={ - 'description_2': 'f', 'name_2': 'e', - 'name_0': 'a', 'name_1': 'c', 'myid_2': 3, - 'description_0': 'b', 'myid_0': 1, - 'myid_1': 2, 'description_1': 'd' - }, - dialect=dialect) - - def test_multivalues_insert_positional(self): - stmt = table1.insert().\ - values([{"myid": 1, "name": 'a', "description": 'b'}, - {"myid": 2, "name": 'c', "description": 'd'}, - {"myid": 3, "name": 'e', "description": 'f'} - ]) - - result = "INSERT INTO mytable (myid, name, description) VALUES " \ - "(%s, %s, %s), " \ - "(%s, %s, %s), " \ - "(%s, %s, %s)" \ - - dialect = default.DefaultDialect() - dialect.supports_multivalues_insert = True - dialect.paramstyle = "format" - dialect.positional = True - self.assert_compile(stmt, result, - checkpositional=(1, 'a', 'b', 2, 'c', 'd', 3, 'e', 'f'), - dialect=dialect) - - def test_multirow_inline_default_insert(self): - metadata = MetaData() - table = Table('sometable', metadata, - Column('id', Integer, primary_key=True), - Column('data', String), - Column('foo', Integer, default=func.foobar())) - - stmt = table.insert().\ - values([ - {"id": 1, "data": "data1"}, - {"id": 2, "data": "data2", "foo": "plainfoo"}, - {"id": 3, "data": "data3"}, - ]) - result = "INSERT INTO sometable (id, data, foo) VALUES "\ - "(%(id_0)s, %(data_0)s, foobar()), "\ - "(%(id_1)s, %(data_1)s, %(foo_1)s), "\ - "(%(id_2)s, %(data_2)s, foobar())" - - self.assert_compile(stmt, result, - checkparams={'data_2': 'data3', 'id_0': 1, 'id_2': 3, - 'foo_1': 'plainfoo', 'data_1': 'data2', - 'id_1': 2, 'data_0': 'data1'}, - dialect=postgresql.dialect()) - - def test_multirow_server_default_insert(self): - metadata = MetaData() - table = Table('sometable', metadata, - Column('id', Integer, primary_key=True), - Column('data', String), - Column('foo', Integer, server_default=func.foobar())) - - stmt = table.insert().\ - values([ - {"id": 1, "data": "data1"}, - {"id": 2, "data": "data2", "foo": "plainfoo"}, - {"id": 3, "data": "data3"}, - ]) - result = "INSERT INTO sometable (id, data) VALUES "\ - "(%(id_0)s, %(data_0)s), "\ - "(%(id_1)s, %(data_1)s), "\ - "(%(id_2)s, %(data_2)s)" - - self.assert_compile(stmt, result, - checkparams={'data_2': 'data3', 'id_0': 1, 'id_2': 3, - 'data_1': 'data2', - 'id_1': 2, 'data_0': 'data1'}, - dialect=postgresql.dialect()) - - stmt = table.insert().\ - values([ - {"id": 1, "data": "data1", "foo": "plainfoo"}, - {"id": 2, "data": "data2"}, - {"id": 3, "data": "data3", "foo": "otherfoo"}, - ]) - - # note the effect here is that the first set of params - # takes effect for the rest of them, when one is absent - result = "INSERT INTO sometable (id, data, foo) VALUES "\ - "(%(id_0)s, %(data_0)s, %(foo_0)s), "\ - "(%(id_1)s, %(data_1)s, %(foo_0)s), "\ - "(%(id_2)s, %(data_2)s, %(foo_2)s)" - - self.assert_compile(stmt, result, - checkparams={'data_2': 'data3', 'id_0': 1, 'id_2': 3, - 'data_1': 'data2', - "foo_0": "plainfoo", - "foo_2": "otherfoo", - 'id_1': 2, 'data_0': 'data1'}, - dialect=postgresql.dialect()) - - def test_update(self): - self.assert_compile( - update(table1, table1.c.myid == 7), - "UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1", - params={table1.c.name: 'fred'}) - self.assert_compile( - table1.update().where(table1.c.myid == 7). - values({table1.c.myid: 5}), - "UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1", - checkparams={'myid': 5, 'myid_1': 7}) - self.assert_compile( - update(table1, table1.c.myid == 7), - "UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1", - params={'name': 'fred'}) - self.assert_compile( - update(table1, values={table1.c.name: table1.c.myid}), - "UPDATE mytable SET name=mytable.myid") - self.assert_compile( - update(table1, - whereclause=table1.c.name == bindparam('crit'), - values={table1.c.name: 'hi'}), - "UPDATE mytable SET name=:name WHERE mytable.name = :crit", - params={'crit': 'notthere'}, - checkparams={'crit': 'notthere', 'name': 'hi'}) - self.assert_compile( - update(table1, table1.c.myid == 12, - values={table1.c.name: table1.c.myid}), - "UPDATE mytable SET name=mytable.myid, description=" - ":description WHERE mytable.myid = :myid_1", - params={'description': 'test'}, - checkparams={'description': 'test', 'myid_1': 12}) - self.assert_compile( - update(table1, table1.c.myid == 12, - values={table1.c.myid: 9}), - "UPDATE mytable SET myid=:myid, description=:description " - "WHERE mytable.myid = :myid_1", - params={'myid_1': 12, 'myid': 9, 'description': 'test'}) - self.assert_compile( - update(table1, table1.c.myid == 12), - "UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1", - params={'myid': 18}, checkparams={'myid': 18, 'myid_1': 12}) - s = table1.update(table1.c.myid == 12, values={table1.c.name: 'lala'}) - c = s.compile(column_keys=['id', 'name']) - self.assert_compile( - update(table1, table1.c.myid == 12, - values={table1.c.name: table1.c.myid} - ).values({table1.c.name: table1.c.name + 'foo'}), - "UPDATE mytable SET name=(mytable.name || :name_1), " - "description=:description WHERE mytable.myid = :myid_1", - params={'description': 'test'}) - eq_(str(s), str(c)) - - self.assert_compile(update(table1, - (table1.c.myid == func.hoho(4)) & - (table1.c.name == literal('foo') + - table1.c.name + literal('lala')), - values={ - table1.c.name: table1.c.name + "lala", - table1.c.myid: func.do_stuff(table1.c.myid, literal('hoho')) - }), "UPDATE mytable SET myid=do_stuff(mytable.myid, :param_1), " - "name=(mytable.name || :name_1) " - "WHERE mytable.myid = hoho(:hoho_1) " - "AND mytable.name = :param_2 || " - "mytable.name || :param_3") - - def test_update_prefix(self): - stmt = table1.update().prefix_with("A", "B", dialect="mysql").\ - prefix_with("C", "D") - self.assert_compile(stmt, - "UPDATE A B C D mytable SET myid=%s, name=%s, description=%s", - dialect=mysql.dialect() - ) - self.assert_compile(stmt, - "UPDATE C D mytable SET myid=:myid, name=:name, " - "description=:description") - - def test_aliased_update(self): - talias1 = table1.alias('t1') - self.assert_compile( - update(talias1, talias1.c.myid == 7), - "UPDATE mytable AS t1 SET name=:name WHERE t1.myid = :myid_1", - params={table1.c.name: 'fred'}) - self.assert_compile( - update(talias1, table1.c.myid == 7), - "UPDATE mytable AS t1 SET name=:name FROM " - "mytable WHERE mytable.myid = :myid_1", - params={table1.c.name: 'fred'}) - - def test_update_to_expression(self): - """test update from an expression. - - this logic is triggered currently by a left side that doesn't - have a key. The current supported use case is updating the index - of a Postgresql ARRAY type. - - """ - expr = func.foo(table1.c.myid) - assert not hasattr(expr, "key") - self.assert_compile( - table1.update().values({expr: 'bar'}), - "UPDATE mytable SET foo(myid)=:param_1" - ) def test_correlated_update(self): # test against a straight text subquery @@ -2880,51 +2532,6 @@ class CRUDTest(fixtures.TestBase, AssertsCompiledSQL): "AND myothertable.othername = mytable_1.name", dialect=mssql.dialect()) - def test_delete(self): - self.assert_compile( - delete(table1, table1.c.myid == 7), - "DELETE FROM mytable WHERE mytable.myid = :myid_1") - self.assert_compile( - table1.delete().where(table1.c.myid == 7), - "DELETE FROM mytable WHERE mytable.myid = :myid_1") - self.assert_compile( - table1.delete().where(table1.c.myid == 7).\ - where(table1.c.name == 'somename'), - "DELETE FROM mytable WHERE mytable.myid = :myid_1 " - "AND mytable.name = :name_1") - - def test_delete_prefix(self): - stmt = table1.delete().prefix_with("A", "B", dialect="mysql").\ - prefix_with("C", "D") - self.assert_compile(stmt, - "DELETE A B C D FROM mytable", - dialect=mysql.dialect() - ) - self.assert_compile(stmt, - "DELETE C D FROM mytable") - - def test_aliased_delete(self): - talias1 = table1.alias('t1') - self.assert_compile( - delete(talias1).where(talias1.c.myid == 7), - "DELETE FROM mytable AS t1 WHERE t1.myid = :myid_1") - - def test_correlated_delete(self): - # test a non-correlated WHERE clause - s = select([table2.c.othername], table2.c.otherid == 7) - u = delete(table1, table1.c.name == s) - self.assert_compile(u, "DELETE FROM mytable WHERE mytable.name = " - "(SELECT myothertable.othername FROM myothertable " - "WHERE myothertable.otherid = :otherid_1)") - - # test one that is actually correlated... - s = select([table2.c.othername], table2.c.otherid == table1.c.myid) - u = table1.delete(table1.c.name == s) - self.assert_compile(u, - "DELETE FROM mytable WHERE mytable.name = (SELECT " - "myothertable.othername FROM myothertable WHERE " - "myothertable.otherid = mytable.myid)") - def test_binds_that_match_columns(self): """test bind params named after column names replace the normal SET/VALUES generation.""" @@ -3189,6 +2796,246 @@ class SchemaTest(fixtures.TestBase, AssertsCompiledSQL): "(:rem_id, :datatype_id, :value)") +class CorrelateTest(fixtures.TestBase, AssertsCompiledSQL): + __dialect__ = 'default' + + def test_dont_overcorrelate(self): + self.assert_compile(select([table1], from_obj=[table1, + table1.select()]), + "SELECT mytable.myid, mytable.name, " + "mytable.description FROM mytable, (SELECT " + "mytable.myid AS myid, mytable.name AS " + "name, mytable.description AS description " + "FROM mytable)") + + def _fixture(self): + t1 = table('t1', column('a')) + t2 = table('t2', column('a')) + return t1, t2, select([t1]).where(t1.c.a == t2.c.a) + + def _assert_where_correlated(self, stmt): + self.assert_compile( + stmt, + "SELECT t2.a FROM t2 WHERE t2.a = " + "(SELECT t1.a FROM t1 WHERE t1.a = t2.a)") + + def _assert_where_all_correlated(self, stmt): + self.assert_compile( + stmt, + "SELECT t1.a, t2.a FROM t1, t2 WHERE t2.a = " + "(SELECT t1.a WHERE t1.a = t2.a)") + + def _assert_where_backwards_correlated(self, stmt): + self.assert_compile( + stmt, + "SELECT t2.a FROM t2 WHERE t2.a = " + "(SELECT t1.a FROM t2 WHERE t1.a = t2.a)") + + def _assert_column_correlated(self, stmt): + self.assert_compile(stmt, + "SELECT t2.a, (SELECT t1.a FROM t1 WHERE t1.a = t2.a) " + "AS anon_1 FROM t2") + + def _assert_column_all_correlated(self, stmt): + self.assert_compile(stmt, + "SELECT t1.a, t2.a, " + "(SELECT t1.a WHERE t1.a = t2.a) AS anon_1 FROM t1, t2") + + def _assert_column_backwards_correlated(self, stmt): + self.assert_compile(stmt, + "SELECT t2.a, (SELECT t1.a FROM t2 WHERE t1.a = t2.a) " + "AS anon_1 FROM t2") + + def _assert_having_correlated(self, stmt): + self.assert_compile(stmt, + "SELECT t2.a FROM t2 HAVING t2.a = " + "(SELECT t1.a FROM t1 WHERE t1.a = t2.a)") + + def _assert_from_uncorrelated(self, stmt): + self.assert_compile(stmt, + "SELECT t2.a, anon_1.a FROM t2, " + "(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a) AS anon_1") + + def _assert_from_all_uncorrelated(self, stmt): + self.assert_compile(stmt, + "SELECT t1.a, t2.a, anon_1.a FROM t1, t2, " + "(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a) AS anon_1") + + def _assert_where_uncorrelated(self, stmt): + self.assert_compile(stmt, + "SELECT t2.a FROM t2 WHERE t2.a = " + "(SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a)") + + def _assert_column_uncorrelated(self, stmt): + self.assert_compile(stmt, + "SELECT t2.a, (SELECT t1.a FROM t1, t2 " + "WHERE t1.a = t2.a) AS anon_1 FROM t2") + + def _assert_having_uncorrelated(self, stmt): + self.assert_compile(stmt, + "SELECT t2.a FROM t2 HAVING t2.a = " + "(SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a)") + + def _assert_where_single_full_correlated(self, stmt): + self.assert_compile(stmt, + "SELECT t1.a FROM t1 WHERE t1.a = (SELECT t1.a)") + + def test_correlate_semiauto_where(self): + t1, t2, s1 = self._fixture() + self._assert_where_correlated( + select([t2]).where(t2.c.a == s1.correlate(t2))) + + def test_correlate_semiauto_column(self): + t1, t2, s1 = self._fixture() + self._assert_column_correlated( + select([t2, s1.correlate(t2).as_scalar()])) + + def test_correlate_semiauto_from(self): + t1, t2, s1 = self._fixture() + self._assert_from_uncorrelated( + select([t2, s1.correlate(t2).alias()])) + + def test_correlate_semiauto_having(self): + t1, t2, s1 = self._fixture() + self._assert_having_correlated( + select([t2]).having(t2.c.a == s1.correlate(t2))) + + def test_correlate_except_inclusion_where(self): + t1, t2, s1 = self._fixture() + self._assert_where_correlated( + select([t2]).where(t2.c.a == s1.correlate_except(t1))) + + def test_correlate_except_exclusion_where(self): + t1, t2, s1 = self._fixture() + self._assert_where_backwards_correlated( + select([t2]).where(t2.c.a == s1.correlate_except(t2))) + + def test_correlate_except_inclusion_column(self): + t1, t2, s1 = self._fixture() + self._assert_column_correlated( + select([t2, s1.correlate_except(t1).as_scalar()])) + + def test_correlate_except_exclusion_column(self): + t1, t2, s1 = self._fixture() + self._assert_column_backwards_correlated( + select([t2, s1.correlate_except(t2).as_scalar()])) + + def test_correlate_except_inclusion_from(self): + t1, t2, s1 = self._fixture() + self._assert_from_uncorrelated( + select([t2, s1.correlate_except(t1).alias()])) + + def test_correlate_except_exclusion_from(self): + t1, t2, s1 = self._fixture() + self._assert_from_uncorrelated( + select([t2, s1.correlate_except(t2).alias()])) + + def test_correlate_except_having(self): + t1, t2, s1 = self._fixture() + self._assert_having_correlated( + select([t2]).having(t2.c.a == s1.correlate_except(t1))) + + def test_correlate_auto_where(self): + t1, t2, s1 = self._fixture() + self._assert_where_correlated( + select([t2]).where(t2.c.a == s1)) + + def test_correlate_auto_column(self): + t1, t2, s1 = self._fixture() + self._assert_column_correlated( + select([t2, s1.as_scalar()])) + + def test_correlate_auto_from(self): + t1, t2, s1 = self._fixture() + self._assert_from_uncorrelated( + select([t2, s1.alias()])) + + def test_correlate_auto_having(self): + t1, t2, s1 = self._fixture() + self._assert_having_correlated( + select([t2]).having(t2.c.a == s1)) + + def test_correlate_disabled_where(self): + t1, t2, s1 = self._fixture() + self._assert_where_uncorrelated( + select([t2]).where(t2.c.a == s1.correlate(None))) + + def test_correlate_disabled_column(self): + t1, t2, s1 = self._fixture() + self._assert_column_uncorrelated( + select([t2, s1.correlate(None).as_scalar()])) + + def test_correlate_disabled_from(self): + t1, t2, s1 = self._fixture() + self._assert_from_uncorrelated( + select([t2, s1.correlate(None).alias()])) + + def test_correlate_disabled_having(self): + t1, t2, s1 = self._fixture() + self._assert_having_uncorrelated( + select([t2]).having(t2.c.a == s1.correlate(None))) + + def test_correlate_all_where(self): + t1, t2, s1 = self._fixture() + self._assert_where_all_correlated( + select([t1, t2]).where(t2.c.a == s1.correlate(t1, t2))) + + def test_correlate_all_column(self): + t1, t2, s1 = self._fixture() + self._assert_column_all_correlated( + select([t1, t2, s1.correlate(t1, t2).as_scalar()])) + + def test_correlate_all_from(self): + t1, t2, s1 = self._fixture() + self._assert_from_all_uncorrelated( + select([t1, t2, s1.correlate(t1, t2).alias()])) + + def test_correlate_where_all_unintentional(self): + t1, t2, s1 = self._fixture() + assert_raises_message( + exc.InvalidRequestError, + "returned no FROM clauses due to auto-correlation", + select([t1, t2]).where(t2.c.a == s1).compile + ) + + def test_correlate_from_all_ok(self): + t1, t2, s1 = self._fixture() + self.assert_compile( + select([t1, t2, s1]), + "SELECT t1.a, t2.a, a FROM t1, t2, " + "(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a)" + ) + + def test_correlate_auto_where_singlefrom(self): + t1, t2, s1 = self._fixture() + s = select([t1.c.a]) + s2 = select([t1]).where(t1.c.a == s) + self.assert_compile(s2, + "SELECT t1.a FROM t1 WHERE t1.a = " + "(SELECT t1.a FROM t1)") + + def test_correlate_semiauto_where_singlefrom(self): + t1, t2, s1 = self._fixture() + + s = select([t1.c.a]) + + s2 = select([t1]).where(t1.c.a == s.correlate(t1)) + self._assert_where_single_full_correlated(s2) + + def test_correlate_except_semiauto_where_singlefrom(self): + t1, t2, s1 = self._fixture() + + s = select([t1.c.a]) + + s2 = select([t1]).where(t1.c.a == s.correlate_except(t2)) + self._assert_where_single_full_correlated(s2) + + def test_correlate_alone_noeffect(self): + # new as of #2668 + t1, t2, s1 = self._fixture() + self.assert_compile(s1.correlate(t1, t2), + "SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a") + class CoercionTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = 'default' @@ -3315,4 +3162,4 @@ class ResultMapTest(fixtures.TestBase): ) is_( comp.result_map['t1_a'][1][2], t1.c.a - )
\ No newline at end of file + ) |