summaryrefslogtreecommitdiff
path: root/test/sql/test_compiler.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/sql/test_compiler.py')
-rw-r--r--test/sql/test_compiler.py645
1 files changed, 246 insertions, 399 deletions
diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py
index 3b8aed23f..9cd893c1a 100644
--- a/test/sql/test_compiler.py
+++ b/test/sql/test_compiler.py
@@ -14,8 +14,8 @@ from sqlalchemy.testing import eq_, is_, assert_raises, assert_raises_message
from sqlalchemy import testing
from sqlalchemy.testing import fixtures, AssertsCompiledSQL
from sqlalchemy import Integer, String, MetaData, Table, Column, select, \
- func, not_, cast, text, tuple_, exists, delete, update, bindparam,\
- insert, literal, and_, null, type_coerce, alias, or_, literal_column,\
+ func, not_, cast, text, tuple_, exists, update, bindparam,\
+ literal, and_, null, type_coerce, alias, or_, literal_column,\
Float, TIMESTAMP, Numeric, Date, Text, collate, union, except_,\
intersect, union_all, Boolean, distinct, join, outerjoin, asc, desc,\
over, subquery, case
@@ -87,6 +87,7 @@ keyed = Table('keyed', metadata,
Column('z', Integer),
)
+
class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = 'default'
@@ -424,35 +425,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"AS z FROM keyed) AS anon_2) AS anon_1"
)
- def test_dont_overcorrelate(self):
- self.assert_compile(select([table1], from_obj=[table1,
- table1.select()]),
- "SELECT mytable.myid, mytable.name, "
- "mytable.description FROM mytable, (SELECT "
- "mytable.myid AS myid, mytable.name AS "
- "name, mytable.description AS description "
- "FROM mytable)")
-
- def test_full_correlate(self):
- # intentional
- t = table('t', column('a'), column('b'))
- s = select([t.c.a]).where(t.c.a == 1).correlate(t).as_scalar()
-
- s2 = select([t.c.a, s])
- self.assert_compile(s2,
- "SELECT t.a, (SELECT t.a WHERE t.a = :a_1) AS anon_1 FROM t")
-
- # unintentional
- t2 = table('t2', column('c'), column('d'))
- s = select([t.c.a]).where(t.c.a == t2.c.d).as_scalar()
- s2 = select([t, t2, s])
- assert_raises(exc.InvalidRequestError, str, s2)
-
- # intentional again
- s = s.correlate(t, t2)
- s2 = select([t, t2, s])
- self.assert_compile(s, "SELECT t.a WHERE t.a = t2.d")
-
def test_exists(self):
s = select([table1.c.myid]).where(table1.c.myid == 5)
@@ -2239,14 +2211,14 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
assert_raises_message(
exc.CompileError,
- "Cannot compile Column object until it's 'name' is assigned.",
+ "Cannot compile Column object until its 'name' is assigned.",
str, sel2
)
sel3 = select([my_str]).as_scalar()
assert_raises_message(
exc.CompileError,
- "Cannot compile Column object until it's 'name' is assigned.",
+ "Cannot compile Column object until its 'name' is assigned.",
str, sel3
)
@@ -2488,326 +2460,6 @@ class KwargPropagationTest(fixtures.TestBase):
class CRUDTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = 'default'
- def test_insert(self):
- # generic insert, will create bind params for all columns
- self.assert_compile(insert(table1),
- "INSERT INTO mytable (myid, name, description) "
- "VALUES (:myid, :name, :description)")
-
- # insert with user-supplied bind params for specific columns,
- # cols provided literally
- self.assert_compile(
- insert(table1, {
- table1.c.myid: bindparam('userid'),
- table1.c.name: bindparam('username')}),
- "INSERT INTO mytable (myid, name) VALUES (:userid, :username)")
-
- # insert with user-supplied bind params for specific columns, cols
- # provided as strings
- self.assert_compile(
- insert(table1, dict(myid=3, name='jack')),
- "INSERT INTO mytable (myid, name) VALUES (:myid, :name)"
- )
-
- # test with a tuple of params instead of named
- self.assert_compile(
- insert(table1, (3, 'jack', 'mydescription')),
- "INSERT INTO mytable (myid, name, description) VALUES "
- "(:myid, :name, :description)",
- checkparams={
- 'myid': 3, 'name': 'jack', 'description': 'mydescription'}
- )
-
- self.assert_compile(
- insert(table1, values={
- table1.c.myid: bindparam('userid')
- }).values(
- {table1.c.name: bindparam('username')}),
- "INSERT INTO mytable (myid, name) VALUES (:userid, :username)"
- )
-
- self.assert_compile(
- insert(table1, values=dict(myid=func.lala())),
- "INSERT INTO mytable (myid) VALUES (lala())")
-
- def test_insert_prefix(self):
- stmt = table1.insert().prefix_with("A", "B", dialect="mysql").\
- prefix_with("C", "D")
- self.assert_compile(stmt,
- "INSERT A B C D INTO mytable (myid, name, description) "
- "VALUES (%s, %s, %s)", dialect=mysql.dialect()
- )
- self.assert_compile(stmt,
- "INSERT C D INTO mytable (myid, name, description) "
- "VALUES (:myid, :name, :description)")
-
- def test_inline_default_insert(self):
- metadata = MetaData()
- table = Table('sometable', metadata,
- Column('id', Integer, primary_key=True),
- Column('foo', Integer, default=func.foobar()))
- self.assert_compile(
- table.insert(values={}, inline=True),
- "INSERT INTO sometable (foo) VALUES (foobar())")
- self.assert_compile(
- table.insert(inline=True),
- "INSERT INTO sometable (foo) VALUES (foobar())", params={})
-
- def test_insert_returning_not_in_default(self):
- stmt = table1.insert().returning(table1.c.myid)
- assert_raises_message(
- exc.CompileError,
- "RETURNING is not supported by this dialect's statement compiler.",
- stmt.compile
- )
-
- def test_empty_insert_default(self):
- stmt = table1.insert().values({}) # hide from 2to3
- self.assert_compile(stmt, "INSERT INTO mytable () VALUES ()")
-
- def test_empty_insert_default_values(self):
- stmt = table1.insert().values({}) # hide from 2to3
- dialect = default.DefaultDialect()
- dialect.supports_empty_insert = dialect.supports_default_values = True
- self.assert_compile(stmt, "INSERT INTO mytable DEFAULT VALUES",
- dialect=dialect)
-
- def test_empty_insert_not_supported(self):
- stmt = table1.insert().values({}) # hide from 2to3
- dialect = default.DefaultDialect()
- dialect.supports_empty_insert = dialect.supports_default_values = False
- assert_raises_message(
- exc.CompileError,
- "The 'default' dialect with current database version "
- "settings does not support empty inserts.",
- stmt.compile, dialect=dialect
- )
-
- def test_multivalues_insert_not_supported(self):
- stmt = table1.insert().values([{"myid": 1}, {"myid": 2}])
- dialect = default.DefaultDialect()
- assert_raises_message(
- exc.CompileError,
- "The 'default' dialect with current database version settings "
- "does not support in-place multirow inserts.",
- stmt.compile, dialect=dialect
- )
-
- def test_multivalues_insert_named(self):
- stmt = table1.insert().\
- values([{"myid": 1, "name": 'a', "description": 'b'},
- {"myid": 2, "name": 'c', "description": 'd'},
- {"myid": 3, "name": 'e', "description": 'f'}
- ])
-
- result = "INSERT INTO mytable (myid, name, description) VALUES " \
- "(:myid_0, :name_0, :description_0), " \
- "(:myid_1, :name_1, :description_1), " \
- "(:myid_2, :name_2, :description_2)"
-
- dialect = default.DefaultDialect()
- dialect.supports_multivalues_insert = True
- self.assert_compile(stmt, result,
- checkparams={
- 'description_2': 'f', 'name_2': 'e',
- 'name_0': 'a', 'name_1': 'c', 'myid_2': 3,
- 'description_0': 'b', 'myid_0': 1,
- 'myid_1': 2, 'description_1': 'd'
- },
- dialect=dialect)
-
- def test_multivalues_insert_positional(self):
- stmt = table1.insert().\
- values([{"myid": 1, "name": 'a', "description": 'b'},
- {"myid": 2, "name": 'c', "description": 'd'},
- {"myid": 3, "name": 'e', "description": 'f'}
- ])
-
- result = "INSERT INTO mytable (myid, name, description) VALUES " \
- "(%s, %s, %s), " \
- "(%s, %s, %s), " \
- "(%s, %s, %s)" \
-
- dialect = default.DefaultDialect()
- dialect.supports_multivalues_insert = True
- dialect.paramstyle = "format"
- dialect.positional = True
- self.assert_compile(stmt, result,
- checkpositional=(1, 'a', 'b', 2, 'c', 'd', 3, 'e', 'f'),
- dialect=dialect)
-
- def test_multirow_inline_default_insert(self):
- metadata = MetaData()
- table = Table('sometable', metadata,
- Column('id', Integer, primary_key=True),
- Column('data', String),
- Column('foo', Integer, default=func.foobar()))
-
- stmt = table.insert().\
- values([
- {"id": 1, "data": "data1"},
- {"id": 2, "data": "data2", "foo": "plainfoo"},
- {"id": 3, "data": "data3"},
- ])
- result = "INSERT INTO sometable (id, data, foo) VALUES "\
- "(%(id_0)s, %(data_0)s, foobar()), "\
- "(%(id_1)s, %(data_1)s, %(foo_1)s), "\
- "(%(id_2)s, %(data_2)s, foobar())"
-
- self.assert_compile(stmt, result,
- checkparams={'data_2': 'data3', 'id_0': 1, 'id_2': 3,
- 'foo_1': 'plainfoo', 'data_1': 'data2',
- 'id_1': 2, 'data_0': 'data1'},
- dialect=postgresql.dialect())
-
- def test_multirow_server_default_insert(self):
- metadata = MetaData()
- table = Table('sometable', metadata,
- Column('id', Integer, primary_key=True),
- Column('data', String),
- Column('foo', Integer, server_default=func.foobar()))
-
- stmt = table.insert().\
- values([
- {"id": 1, "data": "data1"},
- {"id": 2, "data": "data2", "foo": "plainfoo"},
- {"id": 3, "data": "data3"},
- ])
- result = "INSERT INTO sometable (id, data) VALUES "\
- "(%(id_0)s, %(data_0)s), "\
- "(%(id_1)s, %(data_1)s), "\
- "(%(id_2)s, %(data_2)s)"
-
- self.assert_compile(stmt, result,
- checkparams={'data_2': 'data3', 'id_0': 1, 'id_2': 3,
- 'data_1': 'data2',
- 'id_1': 2, 'data_0': 'data1'},
- dialect=postgresql.dialect())
-
- stmt = table.insert().\
- values([
- {"id": 1, "data": "data1", "foo": "plainfoo"},
- {"id": 2, "data": "data2"},
- {"id": 3, "data": "data3", "foo": "otherfoo"},
- ])
-
- # note the effect here is that the first set of params
- # takes effect for the rest of them, when one is absent
- result = "INSERT INTO sometable (id, data, foo) VALUES "\
- "(%(id_0)s, %(data_0)s, %(foo_0)s), "\
- "(%(id_1)s, %(data_1)s, %(foo_0)s), "\
- "(%(id_2)s, %(data_2)s, %(foo_2)s)"
-
- self.assert_compile(stmt, result,
- checkparams={'data_2': 'data3', 'id_0': 1, 'id_2': 3,
- 'data_1': 'data2',
- "foo_0": "plainfoo",
- "foo_2": "otherfoo",
- 'id_1': 2, 'data_0': 'data1'},
- dialect=postgresql.dialect())
-
- def test_update(self):
- self.assert_compile(
- update(table1, table1.c.myid == 7),
- "UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1",
- params={table1.c.name: 'fred'})
- self.assert_compile(
- table1.update().where(table1.c.myid == 7).
- values({table1.c.myid: 5}),
- "UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1",
- checkparams={'myid': 5, 'myid_1': 7})
- self.assert_compile(
- update(table1, table1.c.myid == 7),
- "UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1",
- params={'name': 'fred'})
- self.assert_compile(
- update(table1, values={table1.c.name: table1.c.myid}),
- "UPDATE mytable SET name=mytable.myid")
- self.assert_compile(
- update(table1,
- whereclause=table1.c.name == bindparam('crit'),
- values={table1.c.name: 'hi'}),
- "UPDATE mytable SET name=:name WHERE mytable.name = :crit",
- params={'crit': 'notthere'},
- checkparams={'crit': 'notthere', 'name': 'hi'})
- self.assert_compile(
- update(table1, table1.c.myid == 12,
- values={table1.c.name: table1.c.myid}),
- "UPDATE mytable SET name=mytable.myid, description="
- ":description WHERE mytable.myid = :myid_1",
- params={'description': 'test'},
- checkparams={'description': 'test', 'myid_1': 12})
- self.assert_compile(
- update(table1, table1.c.myid == 12,
- values={table1.c.myid: 9}),
- "UPDATE mytable SET myid=:myid, description=:description "
- "WHERE mytable.myid = :myid_1",
- params={'myid_1': 12, 'myid': 9, 'description': 'test'})
- self.assert_compile(
- update(table1, table1.c.myid == 12),
- "UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1",
- params={'myid': 18}, checkparams={'myid': 18, 'myid_1': 12})
- s = table1.update(table1.c.myid == 12, values={table1.c.name: 'lala'})
- c = s.compile(column_keys=['id', 'name'])
- self.assert_compile(
- update(table1, table1.c.myid == 12,
- values={table1.c.name: table1.c.myid}
- ).values({table1.c.name: table1.c.name + 'foo'}),
- "UPDATE mytable SET name=(mytable.name || :name_1), "
- "description=:description WHERE mytable.myid = :myid_1",
- params={'description': 'test'})
- eq_(str(s), str(c))
-
- self.assert_compile(update(table1,
- (table1.c.myid == func.hoho(4)) &
- (table1.c.name == literal('foo') +
- table1.c.name + literal('lala')),
- values={
- table1.c.name: table1.c.name + "lala",
- table1.c.myid: func.do_stuff(table1.c.myid, literal('hoho'))
- }), "UPDATE mytable SET myid=do_stuff(mytable.myid, :param_1), "
- "name=(mytable.name || :name_1) "
- "WHERE mytable.myid = hoho(:hoho_1) "
- "AND mytable.name = :param_2 || "
- "mytable.name || :param_3")
-
- def test_update_prefix(self):
- stmt = table1.update().prefix_with("A", "B", dialect="mysql").\
- prefix_with("C", "D")
- self.assert_compile(stmt,
- "UPDATE A B C D mytable SET myid=%s, name=%s, description=%s",
- dialect=mysql.dialect()
- )
- self.assert_compile(stmt,
- "UPDATE C D mytable SET myid=:myid, name=:name, "
- "description=:description")
-
- def test_aliased_update(self):
- talias1 = table1.alias('t1')
- self.assert_compile(
- update(talias1, talias1.c.myid == 7),
- "UPDATE mytable AS t1 SET name=:name WHERE t1.myid = :myid_1",
- params={table1.c.name: 'fred'})
- self.assert_compile(
- update(talias1, table1.c.myid == 7),
- "UPDATE mytable AS t1 SET name=:name FROM "
- "mytable WHERE mytable.myid = :myid_1",
- params={table1.c.name: 'fred'})
-
- def test_update_to_expression(self):
- """test update from an expression.
-
- this logic is triggered currently by a left side that doesn't
- have a key. The current supported use case is updating the index
- of a Postgresql ARRAY type.
-
- """
- expr = func.foo(table1.c.myid)
- assert not hasattr(expr, "key")
- self.assert_compile(
- table1.update().values({expr: 'bar'}),
- "UPDATE mytable SET foo(myid)=:param_1"
- )
def test_correlated_update(self):
# test against a straight text subquery
@@ -2880,51 +2532,6 @@ class CRUDTest(fixtures.TestBase, AssertsCompiledSQL):
"AND myothertable.othername = mytable_1.name",
dialect=mssql.dialect())
- def test_delete(self):
- self.assert_compile(
- delete(table1, table1.c.myid == 7),
- "DELETE FROM mytable WHERE mytable.myid = :myid_1")
- self.assert_compile(
- table1.delete().where(table1.c.myid == 7),
- "DELETE FROM mytable WHERE mytable.myid = :myid_1")
- self.assert_compile(
- table1.delete().where(table1.c.myid == 7).\
- where(table1.c.name == 'somename'),
- "DELETE FROM mytable WHERE mytable.myid = :myid_1 "
- "AND mytable.name = :name_1")
-
- def test_delete_prefix(self):
- stmt = table1.delete().prefix_with("A", "B", dialect="mysql").\
- prefix_with("C", "D")
- self.assert_compile(stmt,
- "DELETE A B C D FROM mytable",
- dialect=mysql.dialect()
- )
- self.assert_compile(stmt,
- "DELETE C D FROM mytable")
-
- def test_aliased_delete(self):
- talias1 = table1.alias('t1')
- self.assert_compile(
- delete(talias1).where(talias1.c.myid == 7),
- "DELETE FROM mytable AS t1 WHERE t1.myid = :myid_1")
-
- def test_correlated_delete(self):
- # test a non-correlated WHERE clause
- s = select([table2.c.othername], table2.c.otherid == 7)
- u = delete(table1, table1.c.name == s)
- self.assert_compile(u, "DELETE FROM mytable WHERE mytable.name = "
- "(SELECT myothertable.othername FROM myothertable "
- "WHERE myothertable.otherid = :otherid_1)")
-
- # test one that is actually correlated...
- s = select([table2.c.othername], table2.c.otherid == table1.c.myid)
- u = table1.delete(table1.c.name == s)
- self.assert_compile(u,
- "DELETE FROM mytable WHERE mytable.name = (SELECT "
- "myothertable.othername FROM myothertable WHERE "
- "myothertable.otherid = mytable.myid)")
-
def test_binds_that_match_columns(self):
"""test bind params named after column names
replace the normal SET/VALUES generation."""
@@ -3189,6 +2796,246 @@ class SchemaTest(fixtures.TestBase, AssertsCompiledSQL):
"(:rem_id, :datatype_id, :value)")
+class CorrelateTest(fixtures.TestBase, AssertsCompiledSQL):
+ __dialect__ = 'default'
+
+ def test_dont_overcorrelate(self):
+ self.assert_compile(select([table1], from_obj=[table1,
+ table1.select()]),
+ "SELECT mytable.myid, mytable.name, "
+ "mytable.description FROM mytable, (SELECT "
+ "mytable.myid AS myid, mytable.name AS "
+ "name, mytable.description AS description "
+ "FROM mytable)")
+
+ def _fixture(self):
+ t1 = table('t1', column('a'))
+ t2 = table('t2', column('a'))
+ return t1, t2, select([t1]).where(t1.c.a == t2.c.a)
+
+ def _assert_where_correlated(self, stmt):
+ self.assert_compile(
+ stmt,
+ "SELECT t2.a FROM t2 WHERE t2.a = "
+ "(SELECT t1.a FROM t1 WHERE t1.a = t2.a)")
+
+ def _assert_where_all_correlated(self, stmt):
+ self.assert_compile(
+ stmt,
+ "SELECT t1.a, t2.a FROM t1, t2 WHERE t2.a = "
+ "(SELECT t1.a WHERE t1.a = t2.a)")
+
+ def _assert_where_backwards_correlated(self, stmt):
+ self.assert_compile(
+ stmt,
+ "SELECT t2.a FROM t2 WHERE t2.a = "
+ "(SELECT t1.a FROM t2 WHERE t1.a = t2.a)")
+
+ def _assert_column_correlated(self, stmt):
+ self.assert_compile(stmt,
+ "SELECT t2.a, (SELECT t1.a FROM t1 WHERE t1.a = t2.a) "
+ "AS anon_1 FROM t2")
+
+ def _assert_column_all_correlated(self, stmt):
+ self.assert_compile(stmt,
+ "SELECT t1.a, t2.a, "
+ "(SELECT t1.a WHERE t1.a = t2.a) AS anon_1 FROM t1, t2")
+
+ def _assert_column_backwards_correlated(self, stmt):
+ self.assert_compile(stmt,
+ "SELECT t2.a, (SELECT t1.a FROM t2 WHERE t1.a = t2.a) "
+ "AS anon_1 FROM t2")
+
+ def _assert_having_correlated(self, stmt):
+ self.assert_compile(stmt,
+ "SELECT t2.a FROM t2 HAVING t2.a = "
+ "(SELECT t1.a FROM t1 WHERE t1.a = t2.a)")
+
+ def _assert_from_uncorrelated(self, stmt):
+ self.assert_compile(stmt,
+ "SELECT t2.a, anon_1.a FROM t2, "
+ "(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a) AS anon_1")
+
+ def _assert_from_all_uncorrelated(self, stmt):
+ self.assert_compile(stmt,
+ "SELECT t1.a, t2.a, anon_1.a FROM t1, t2, "
+ "(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a) AS anon_1")
+
+ def _assert_where_uncorrelated(self, stmt):
+ self.assert_compile(stmt,
+ "SELECT t2.a FROM t2 WHERE t2.a = "
+ "(SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a)")
+
+ def _assert_column_uncorrelated(self, stmt):
+ self.assert_compile(stmt,
+ "SELECT t2.a, (SELECT t1.a FROM t1, t2 "
+ "WHERE t1.a = t2.a) AS anon_1 FROM t2")
+
+ def _assert_having_uncorrelated(self, stmt):
+ self.assert_compile(stmt,
+ "SELECT t2.a FROM t2 HAVING t2.a = "
+ "(SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a)")
+
+ def _assert_where_single_full_correlated(self, stmt):
+ self.assert_compile(stmt,
+ "SELECT t1.a FROM t1 WHERE t1.a = (SELECT t1.a)")
+
+ def test_correlate_semiauto_where(self):
+ t1, t2, s1 = self._fixture()
+ self._assert_where_correlated(
+ select([t2]).where(t2.c.a == s1.correlate(t2)))
+
+ def test_correlate_semiauto_column(self):
+ t1, t2, s1 = self._fixture()
+ self._assert_column_correlated(
+ select([t2, s1.correlate(t2).as_scalar()]))
+
+ def test_correlate_semiauto_from(self):
+ t1, t2, s1 = self._fixture()
+ self._assert_from_uncorrelated(
+ select([t2, s1.correlate(t2).alias()]))
+
+ def test_correlate_semiauto_having(self):
+ t1, t2, s1 = self._fixture()
+ self._assert_having_correlated(
+ select([t2]).having(t2.c.a == s1.correlate(t2)))
+
+ def test_correlate_except_inclusion_where(self):
+ t1, t2, s1 = self._fixture()
+ self._assert_where_correlated(
+ select([t2]).where(t2.c.a == s1.correlate_except(t1)))
+
+ def test_correlate_except_exclusion_where(self):
+ t1, t2, s1 = self._fixture()
+ self._assert_where_backwards_correlated(
+ select([t2]).where(t2.c.a == s1.correlate_except(t2)))
+
+ def test_correlate_except_inclusion_column(self):
+ t1, t2, s1 = self._fixture()
+ self._assert_column_correlated(
+ select([t2, s1.correlate_except(t1).as_scalar()]))
+
+ def test_correlate_except_exclusion_column(self):
+ t1, t2, s1 = self._fixture()
+ self._assert_column_backwards_correlated(
+ select([t2, s1.correlate_except(t2).as_scalar()]))
+
+ def test_correlate_except_inclusion_from(self):
+ t1, t2, s1 = self._fixture()
+ self._assert_from_uncorrelated(
+ select([t2, s1.correlate_except(t1).alias()]))
+
+ def test_correlate_except_exclusion_from(self):
+ t1, t2, s1 = self._fixture()
+ self._assert_from_uncorrelated(
+ select([t2, s1.correlate_except(t2).alias()]))
+
+ def test_correlate_except_having(self):
+ t1, t2, s1 = self._fixture()
+ self._assert_having_correlated(
+ select([t2]).having(t2.c.a == s1.correlate_except(t1)))
+
+ def test_correlate_auto_where(self):
+ t1, t2, s1 = self._fixture()
+ self._assert_where_correlated(
+ select([t2]).where(t2.c.a == s1))
+
+ def test_correlate_auto_column(self):
+ t1, t2, s1 = self._fixture()
+ self._assert_column_correlated(
+ select([t2, s1.as_scalar()]))
+
+ def test_correlate_auto_from(self):
+ t1, t2, s1 = self._fixture()
+ self._assert_from_uncorrelated(
+ select([t2, s1.alias()]))
+
+ def test_correlate_auto_having(self):
+ t1, t2, s1 = self._fixture()
+ self._assert_having_correlated(
+ select([t2]).having(t2.c.a == s1))
+
+ def test_correlate_disabled_where(self):
+ t1, t2, s1 = self._fixture()
+ self._assert_where_uncorrelated(
+ select([t2]).where(t2.c.a == s1.correlate(None)))
+
+ def test_correlate_disabled_column(self):
+ t1, t2, s1 = self._fixture()
+ self._assert_column_uncorrelated(
+ select([t2, s1.correlate(None).as_scalar()]))
+
+ def test_correlate_disabled_from(self):
+ t1, t2, s1 = self._fixture()
+ self._assert_from_uncorrelated(
+ select([t2, s1.correlate(None).alias()]))
+
+ def test_correlate_disabled_having(self):
+ t1, t2, s1 = self._fixture()
+ self._assert_having_uncorrelated(
+ select([t2]).having(t2.c.a == s1.correlate(None)))
+
+ def test_correlate_all_where(self):
+ t1, t2, s1 = self._fixture()
+ self._assert_where_all_correlated(
+ select([t1, t2]).where(t2.c.a == s1.correlate(t1, t2)))
+
+ def test_correlate_all_column(self):
+ t1, t2, s1 = self._fixture()
+ self._assert_column_all_correlated(
+ select([t1, t2, s1.correlate(t1, t2).as_scalar()]))
+
+ def test_correlate_all_from(self):
+ t1, t2, s1 = self._fixture()
+ self._assert_from_all_uncorrelated(
+ select([t1, t2, s1.correlate(t1, t2).alias()]))
+
+ def test_correlate_where_all_unintentional(self):
+ t1, t2, s1 = self._fixture()
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "returned no FROM clauses due to auto-correlation",
+ select([t1, t2]).where(t2.c.a == s1).compile
+ )
+
+ def test_correlate_from_all_ok(self):
+ t1, t2, s1 = self._fixture()
+ self.assert_compile(
+ select([t1, t2, s1]),
+ "SELECT t1.a, t2.a, a FROM t1, t2, "
+ "(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a)"
+ )
+
+ def test_correlate_auto_where_singlefrom(self):
+ t1, t2, s1 = self._fixture()
+ s = select([t1.c.a])
+ s2 = select([t1]).where(t1.c.a == s)
+ self.assert_compile(s2,
+ "SELECT t1.a FROM t1 WHERE t1.a = "
+ "(SELECT t1.a FROM t1)")
+
+ def test_correlate_semiauto_where_singlefrom(self):
+ t1, t2, s1 = self._fixture()
+
+ s = select([t1.c.a])
+
+ s2 = select([t1]).where(t1.c.a == s.correlate(t1))
+ self._assert_where_single_full_correlated(s2)
+
+ def test_correlate_except_semiauto_where_singlefrom(self):
+ t1, t2, s1 = self._fixture()
+
+ s = select([t1.c.a])
+
+ s2 = select([t1]).where(t1.c.a == s.correlate_except(t2))
+ self._assert_where_single_full_correlated(s2)
+
+ def test_correlate_alone_noeffect(self):
+ # new as of #2668
+ t1, t2, s1 = self._fixture()
+ self.assert_compile(s1.correlate(t1, t2),
+ "SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a")
+
class CoercionTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = 'default'
@@ -3315,4 +3162,4 @@ class ResultMapTest(fixtures.TestBase):
)
is_(
comp.result_map['t1_a'][1][2], t1.c.a
- ) \ No newline at end of file
+ )