summaryrefslogtreecommitdiff
path: root/test/sql/test_generative.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/sql/test_generative.py')
-rw-r--r--test/sql/test_generative.py520
1 files changed, 294 insertions, 226 deletions
diff --git a/test/sql/test_generative.py b/test/sql/test_generative.py
index 8a366f757..51a8a77cc 100644
--- a/test/sql/test_generative.py
+++ b/test/sql/test_generative.py
@@ -1,6 +1,8 @@
-from sqlalchemy import *
from sqlalchemy.sql import table, column, ClauseElement, operators
-from sqlalchemy.sql.expression import _clone, _from_objects
+from sqlalchemy.sql.expression import _clone, _from_objects
+from sqlalchemy import func, select, Integer, Table, \
+ Column, MetaData, extract, String, bindparam, tuple_, and_, union, text,\
+ case, ForeignKey
from sqlalchemy.testing import fixtures, AssertsExecutionResults, \
AssertsCompiledSQL
from sqlalchemy import testing
@@ -10,7 +12,11 @@ from sqlalchemy import exc
from sqlalchemy.sql import util as sql_util
from sqlalchemy.testing import eq_, is_, assert_raises, assert_raises_message
+A = B = t1 = t2 = t3 = table1 = table2 = table3 = table4 = None
+
+
class TraversalTest(fixtures.TestBase, AssertsExecutionResults):
+
"""test ClauseVisitor's traversal, particularly its
ability to copy and modify a ClauseElement in place."""
@@ -83,7 +89,7 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults):
struct = B(a1, A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3"))
struct2 = B(a1, A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3"))
struct3 = B(a1, A("expr2"), B(A("expr1b"),
- A("expr2bmodified")), A("expr3"))
+ A("expr2bmodified")), A("expr3"))
assert a1.is_other(a1)
assert struct.is_other(struct)
@@ -94,11 +100,13 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults):
def test_clone(self):
struct = B(A("expr1"), A("expr2"), B(A("expr1b"),
- A("expr2b")), A("expr3"))
+ A("expr2b")), A("expr3"))
class Vis(CloningVisitor):
+
def visit_a(self, a):
pass
+
def visit_b(self, b):
pass
@@ -109,11 +117,13 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults):
def test_no_clone(self):
struct = B(A("expr1"), A("expr2"), B(A("expr1b"),
- A("expr2b")), A("expr3"))
+ A("expr2b")), A("expr3"))
class Vis(ClauseVisitor):
+
def visit_a(self, a):
pass
+
def visit_b(self, b):
pass
@@ -124,16 +134,18 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults):
def test_change_in_place(self):
struct = B(A("expr1"), A("expr2"), B(A("expr1b"),
- A("expr2b")), A("expr3"))
+ A("expr2b")), A("expr3"))
struct2 = B(A("expr1"), A("expr2modified"), B(A("expr1b"),
- A("expr2b")), A("expr3"))
+ A("expr2b")), A("expr3"))
struct3 = B(A("expr1"), A("expr2"), B(A("expr1b"),
- A("expr2bmodified")), A("expr3"))
+ A("expr2bmodified")), A("expr3"))
class Vis(CloningVisitor):
+
def visit_a(self, a):
if a.expr == "expr2":
a.expr = "expr2modified"
+
def visit_b(self, b):
pass
@@ -144,9 +156,11 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults):
assert struct2 == s2
class Vis2(CloningVisitor):
+
def visit_a(self, a):
if a.expr == "expr2b":
a.expr = "expr2bmodified"
+
def visit_b(self, b):
pass
@@ -169,11 +183,14 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults):
set(ClauseVisitor().iterate(bin))
assert set(ClauseVisitor().iterate(bin)) == set([foo, bar, bin])
+
class BinaryEndpointTraversalTest(fixtures.TestBase):
+
"""test the special binary product visit"""
def _assert_traversal(self, expr, expected):
canary = []
+
def visit(binary, l, r):
canary.append((binary.operator, l, r))
print(binary.operator, l, r)
@@ -206,8 +223,8 @@ class BinaryEndpointTraversalTest(fixtures.TestBase):
expr = tuple_(
a, b, b1 == tuple_(b1a, b1b == d), c
) > tuple_(
- func.go(e + f)
- )
+ func.go(e + f)
+ )
self._assert_traversal(
expr,
[
@@ -265,7 +282,9 @@ class BinaryEndpointTraversalTest(fixtures.TestBase):
]
)
+
class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
+
"""test copy-in-place behavior of various ClauseElements."""
__dialect__ = 'default'
@@ -274,19 +293,19 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
def setup_class(cls):
global t1, t2, t3
t1 = table("table1",
- column("col1"),
- column("col2"),
- column("col3"),
- )
+ column("col1"),
+ column("col2"),
+ column("col3"),
+ )
t2 = table("table2",
- column("col1"),
- column("col2"),
- column("col3"),
- )
+ column("col1"),
+ column("col2"),
+ column("col3"),
+ )
t3 = Table('table3', MetaData(),
- Column('col1', Integer),
- Column('col2', Integer)
- )
+ Column('col1', Integer),
+ Column('col2', Integer)
+ )
def test_binary(self):
clause = t1.c.col2 == t2.c.col2
@@ -295,18 +314,19 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
def test_binary_anon_label_quirk(self):
t = table('t1', column('col1'))
-
f = t.c.col1 * 5
self.assert_compile(select([f]),
- "SELECT t1.col1 * :col1_1 AS anon_1 FROM t1")
+ "SELECT t1.col1 * :col1_1 AS anon_1 FROM t1")
f.anon_label
a = t.alias()
f = sql_util.ClauseAdapter(a).traverse(f)
- self.assert_compile(select([f]),
- "SELECT t1_1.col1 * :col1_1 AS anon_1 FROM t1 AS t1_1")
+ self.assert_compile(
+ select(
+ [f]),
+ "SELECT t1_1.col1 * :col1_1 AS anon_1 FROM t1 AS t1_1")
def test_join(self):
clause = t1.join(t2, t1.c.col2 == t2.c.col2)
@@ -314,6 +334,7 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
assert str(clause) == str(CloningVisitor().traverse(clause))
class Vis(CloningVisitor):
+
def visit_binary(self, binary):
binary.right = t2.c.col3
@@ -422,10 +443,12 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
def test_text(self):
clause = text(
- "select * from table where foo=:bar",
- bindparams=[bindparam('bar')])
+ "select * from table where foo=:bar",
+ bindparams=[bindparam('bar')])
c1 = str(clause)
+
class Vis(CloningVisitor):
+
def visit_textclause(self, text):
text.text = text.text + " SOME MODIFIER=:lala"
text._bindparams['lala'] = bindparam('lala')
@@ -440,7 +463,9 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
s2 = select([t1])
s2_assert = str(s2)
s3_assert = str(select([t1], t1.c.col2 == 7))
+
class Vis(CloningVisitor):
+
def visit_select(self, select):
select.append_whereclause(t1.c.col2 == 7)
s3 = Vis().traverse(s2)
@@ -448,14 +473,18 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
assert str(s2) == s2_assert
print(str(s2))
print(str(s3))
+
class Vis(ClauseVisitor):
+
def visit_select(self, select):
select.append_whereclause(t1.c.col2 == 7)
Vis().traverse(s2)
assert str(s2) == s3_assert
s4_assert = str(select([t1], and_(t1.c.col2 == 7, t1.c.col3 == 9)))
+
class Vis(CloningVisitor):
+
def visit_select(self, select):
select.append_whereclause(t1.c.col3 == 9)
s4 = Vis().traverse(s3)
@@ -465,7 +494,9 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
assert str(s3) == s3_assert
s5_assert = str(select([t1], and_(t1.c.col2 == 7, t1.c.col1 == 9)))
+
class Vis(CloningVisitor):
+
def visit_binary(self, binary):
if binary.left is t1.c.col3:
binary.left = t1.c.col1
@@ -495,8 +526,8 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
u2 = u.params(id_param=7)
u3 = u.params(id_param=10)
assert str(u) == str(u2) == str(u3)
- assert u2.compile().params == {'id_param':7}
- assert u3.compile().params == {'id_param':10}
+ assert u2.compile().params == {'id_param': 7}
+ assert u3.compile().params == {'id_param': 10}
def test_in(self):
expr = t1.c.col1.in_(['foo', 'bar'])
@@ -510,9 +541,9 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
def test_adapt_union(self):
u = union(
- t1.select().where(t1.c.col1 == 4),
- t1.select().where(t1.c.col1 == 5)
- ).alias()
+ t1.select().where(t1.c.col1 == 4),
+ t1.select().where(t1.c.col1 == 5)
+ ).alias()
assert sql_util.ClauseAdapter(u).traverse(t1) is u
@@ -524,8 +555,8 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
s2 = CloningVisitor().traverse(s).alias()
s3 = select([s], s.c.col2 == s2.c.col2)
- self.assert_compile(s3,
- "SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM "
+ self.assert_compile(
+ s3, "SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM "
"(SELECT table1.col1 AS col1, table1.col2 AS col2, "
"table1.col3 AS col3 FROM table1 WHERE table1.col1 = :param_1) "
"AS anon_1, "
@@ -536,8 +567,8 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
s = select([t1], t1.c.col1 == 4).alias()
s2 = CloningVisitor().traverse(s).alias()
s3 = select([s], s.c.col2 == s2.c.col2)
- self.assert_compile(s3,
- "SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM "
+ self.assert_compile(
+ s3, "SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM "
"(SELECT table1.col1 AS col1, table1.col2 AS col2, "
"table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_1) "
"AS anon_1, "
@@ -547,25 +578,26 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
def test_extract(self):
s = select([extract('foo', t1.c.col1).label('col1')])
- self.assert_compile(s,
- "SELECT EXTRACT(foo FROM table1.col1) AS col1 FROM table1")
+ self.assert_compile(
+ s,
+ "SELECT EXTRACT(foo FROM table1.col1) AS col1 FROM table1")
s2 = CloningVisitor().traverse(s).alias()
s3 = select([s2.c.col1])
- self.assert_compile(s,
- "SELECT EXTRACT(foo FROM table1.col1) AS col1 FROM table1")
+ self.assert_compile(
+ s,
+ "SELECT EXTRACT(foo FROM table1.col1) AS col1 FROM table1")
self.assert_compile(s3,
- "SELECT anon_1.col1 FROM (SELECT EXTRACT(foo FROM "
- "table1.col1) AS col1 FROM table1) AS anon_1")
-
+ "SELECT anon_1.col1 FROM (SELECT EXTRACT(foo FROM "
+ "table1.col1) AS col1 FROM table1) AS anon_1")
@testing.emits_warning('.*replaced by another column with the same key')
def test_alias(self):
subq = t2.select().alias('subq')
s = select([t1.c.col1, subq.c.col1],
- from_obj=[t1, subq,
- t1.join(subq, t1.c.col1 == subq.c.col2)]
- )
+ from_obj=[t1, subq,
+ t1.join(subq, t1.c.col1 == subq.c.col2)]
+ )
orig = str(s)
s2 = CloningVisitor().traverse(s)
assert orig == str(s) == str(s2)
@@ -581,27 +613,28 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
subq = subq.alias('subq')
s = select([t1.c.col1, subq.c.col1],
- from_obj=[t1, subq,
- t1.join(subq, t1.c.col1 == subq.c.col2)]
- )
+ from_obj=[t1, subq,
+ t1.join(subq, t1.c.col1 == subq.c.col2)]
+ )
s5 = CloningVisitor().traverse(s)
assert orig == str(s) == str(s5)
def test_correlated_select(self):
s = select(['*'], t1.c.col1 == t2.c.col1,
- from_obj=[t1, t2]).correlate(t2)
+ from_obj=[t1, t2]).correlate(t2)
class Vis(CloningVisitor):
+
def visit_select(self, select):
select.append_whereclause(t1.c.col2 == 7)
self.assert_compile(
- select([t2]).where(t2.c.col1 == Vis().traverse(s)),
- "SELECT table2.col1, table2.col2, table2.col3 "
- "FROM table2 WHERE table2.col1 = "
- "(SELECT * FROM table1 WHERE table1.col1 = table2.col1 "
- "AND table1.col2 = :col2_1)"
- )
+ select([t2]).where(t2.c.col1 == Vis().traverse(s)),
+ "SELECT table2.col1, table2.col2, table2.col3 "
+ "FROM table2 WHERE table2.col1 = "
+ "(SELECT * FROM table1 WHERE table1.col1 = table2.col1 "
+ "AND table1.col2 = :col2_1)"
+ )
def test_this_thing(self):
s = select([t1]).where(t1.c.col1 == 'foo').alias()
@@ -626,44 +659,42 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
s = select([1], t1.c.col1 == t1a.c.col1, from_obj=t1a).correlate(t1a)
s = select([t1]).where(t1.c.col1 == s)
- self.assert_compile(s,
- "SELECT table1.col1, table1.col2, table1.col3 FROM table1 "
+ self.assert_compile(
+ s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1 "
"WHERE table1.col1 = "
"(SELECT 1 FROM table1, table1 AS table1_1 "
- "WHERE table1.col1 = table1_1.col1)"
- )
+ "WHERE table1.col1 = table1_1.col1)")
s = CloningVisitor().traverse(s)
- self.assert_compile(s,
- "SELECT table1.col1, table1.col2, table1.col3 FROM table1 "
+ self.assert_compile(
+ s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1 "
"WHERE table1.col1 = "
"(SELECT 1 FROM table1, table1 AS table1_1 "
- "WHERE table1.col1 = table1_1.col1)")
+ "WHERE table1.col1 = table1_1.col1)")
def test_select_fromtwice_two(self):
s = select([t1]).where(t1.c.col1 == 'foo').alias()
s2 = select([1], t1.c.col1 == s.c.col1, from_obj=s).correlate(t1)
s3 = select([t1]).where(t1.c.col1 == s2)
- self.assert_compile(s3,
- "SELECT table1.col1, table1.col2, table1.col3 "
- "FROM table1 WHERE table1.col1 = "
- "(SELECT 1 FROM "
- "(SELECT table1.col1 AS col1, table1.col2 AS col2, "
- "table1.col3 AS col3 FROM table1 "
- "WHERE table1.col1 = :col1_1) "
- "AS anon_1 WHERE table1.col1 = anon_1.col1)"
- )
+ self.assert_compile(
+ s3, "SELECT table1.col1, table1.col2, table1.col3 "
+ "FROM table1 WHERE table1.col1 = "
+ "(SELECT 1 FROM "
+ "(SELECT table1.col1 AS col1, table1.col2 AS col2, "
+ "table1.col3 AS col3 FROM table1 "
+ "WHERE table1.col1 = :col1_1) "
+ "AS anon_1 WHERE table1.col1 = anon_1.col1)")
s4 = ReplacingCloningVisitor().traverse(s3)
- self.assert_compile(s4,
- "SELECT table1.col1, table1.col2, table1.col3 "
- "FROM table1 WHERE table1.col1 = "
- "(SELECT 1 FROM "
- "(SELECT table1.col1 AS col1, table1.col2 AS col2, "
- "table1.col3 AS col3 FROM table1 "
- "WHERE table1.col1 = :col1_1) "
- "AS anon_1 WHERE table1.col1 = anon_1.col1)"
- )
+ self.assert_compile(
+ s4, "SELECT table1.col1, table1.col2, table1.col3 "
+ "FROM table1 WHERE table1.col1 = "
+ "(SELECT 1 FROM "
+ "(SELECT table1.col1 AS col1, table1.col2 AS col2, "
+ "table1.col3 AS col3 FROM table1 "
+ "WHERE table1.col1 = :col1_1) "
+ "AS anon_1 WHERE table1.col1 = anon_1.col1)")
+
class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = 'default'
@@ -672,15 +703,15 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):
def setup_class(cls):
global t1, t2
t1 = table("table1",
- column("col1"),
- column("col2"),
- column("col3"),
- )
+ column("col1"),
+ column("col2"),
+ column("col3"),
+ )
t2 = table("table2",
- column("col1"),
- column("col2"),
- column("col3"),
- )
+ column("col1"),
+ column("col2"),
+ column("col3"),
+ )
def test_correlation_on_clone(self):
t1alias = t1.alias('t1alias')
@@ -698,9 +729,9 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):
s = vis.traverse(s)
assert t2alias not in s._froms # not present because it's been
- # cloned
+ # cloned
assert t1alias in s._froms # present because the adapter placed
- # it there
+ # it there
# correlate list on "s" needs to take into account the full
# _cloned_set for each element in _froms when correlating
@@ -710,7 +741,7 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):
't2alias.col1 = (SELECT * FROM table1 AS '
't1alias)')
s = select(['*'], from_obj=[t1alias,
- t2alias]).correlate(t2alias).as_scalar()
+ t2alias]).correlate(t2alias).as_scalar()
self.assert_compile(select(['*'], t2alias.c.col1 == s),
'SELECT * FROM table2 AS t2alias WHERE '
't2alias.col1 = (SELECT * FROM table1 AS '
@@ -806,63 +837,76 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):
t1alias = t1.alias('t1alias')
vis = sql_util.ClauseAdapter(t1alias)
self.assert_compile(vis.traverse(select(['*'], t1.c.col1
- == t2.c.col2)),
+ == t2.c.col2)),
'SELECT * FROM table1 AS t1alias, table2 '
'WHERE t1alias.col1 = table2.col2')
def test_table_to_alias_5(self):
t1alias = t1.alias('t1alias')
vis = sql_util.ClauseAdapter(t1alias)
- self.assert_compile(vis.traverse(select(['*'], t1.c.col1
- == t2.c.col2, from_obj=[t1, t2])),
- 'SELECT * FROM table1 AS t1alias, table2 '
- 'WHERE t1alias.col1 = table2.col2')
+ self.assert_compile(
+ vis.traverse(
+ select(
+ ['*'],
+ t1.c.col1 == t2.c.col2,
+ from_obj=[
+ t1,
+ t2])),
+ 'SELECT * FROM table1 AS t1alias, table2 '
+ 'WHERE t1alias.col1 = table2.col2')
def test_table_to_alias_6(self):
t1alias = t1.alias('t1alias')
vis = sql_util.ClauseAdapter(t1alias)
self.assert_compile(
- select([t1alias, t2]).where(t1alias.c.col1 ==
- vis.traverse(select(['*'],
- t1.c.col1 == t2.c.col2,
- from_obj=[t1, t2]).correlate(t1))),
- "SELECT t1alias.col1, t1alias.col2, t1alias.col3, "
- "table2.col1, table2.col2, table2.col3 "
- "FROM table1 AS t1alias, table2 WHERE t1alias.col1 = "
- "(SELECT * FROM table2 WHERE t1alias.col1 = table2.col2)"
- )
+ select([t1alias, t2]).where(
+ t1alias.c.col1 == vis.traverse(
+ select(['*'], t1.c.col1 == t2.c.col2, from_obj=[t1, t2]).
+ correlate(t1)
+ )
+ ),
+ "SELECT t1alias.col1, t1alias.col2, t1alias.col3, "
+ "table2.col1, table2.col2, table2.col3 "
+ "FROM table1 AS t1alias, table2 WHERE t1alias.col1 = "
+ "(SELECT * FROM table2 WHERE t1alias.col1 = table2.col2)"
+ )
def test_table_to_alias_7(self):
t1alias = t1.alias('t1alias')
vis = sql_util.ClauseAdapter(t1alias)
self.assert_compile(
- select([t1alias, t2]).where(t1alias.c.col1 ==
- vis.traverse(select(['*'],
- t1.c.col1 == t2.c.col2,
- from_obj=[t1, t2]).correlate(t2))),
- "SELECT t1alias.col1, t1alias.col2, t1alias.col3, "
- "table2.col1, table2.col2, table2.col3 "
- "FROM table1 AS t1alias, table2 "
- "WHERE t1alias.col1 = "
- "(SELECT * FROM table1 AS t1alias "
- "WHERE t1alias.col1 = table2.col2)")
+ select([t1alias, t2]).
+ where(t1alias.c.col1 == vis.traverse(
+ select(['*'], t1.c.col1 == t2.c.col2, from_obj=[t1, t2]).
+ correlate(t2))),
+ "SELECT t1alias.col1, t1alias.col2, t1alias.col3, "
+ "table2.col1, table2.col2, table2.col3 "
+ "FROM table1 AS t1alias, table2 "
+ "WHERE t1alias.col1 = "
+ "(SELECT * FROM table1 AS t1alias "
+ "WHERE t1alias.col1 = table2.col2)")
def test_table_to_alias_8(self):
t1alias = t1.alias('t1alias')
vis = sql_util.ClauseAdapter(t1alias)
- self.assert_compile(vis.traverse(case([(t1.c.col1 == 5,
- t1.c.col2)], else_=t1.c.col1)),
- 'CASE WHEN (t1alias.col1 = :col1_1) THEN '
- 't1alias.col2 ELSE t1alias.col1 END')
+ self.assert_compile(
+ vis.traverse(case([(t1.c.col1 == 5, t1.c.col2)], else_=t1.c.col1)),
+ 'CASE WHEN (t1alias.col1 = :col1_1) THEN '
+ 't1alias.col2 ELSE t1alias.col1 END')
def test_table_to_alias_9(self):
t1alias = t1.alias('t1alias')
vis = sql_util.ClauseAdapter(t1alias)
- self.assert_compile(vis.traverse(case([(5, t1.c.col2)],
- value=t1.c.col1, else_=t1.c.col1)),
- 'CASE t1alias.col1 WHEN :param_1 THEN '
- 't1alias.col2 ELSE t1alias.col1 END')
-
+ self.assert_compile(
+ vis.traverse(
+ case(
+ [
+ (5,
+ t1.c.col2)],
+ value=t1.c.col1,
+ else_=t1.c.col1)),
+ 'CASE t1alias.col1 WHEN :param_1 THEN '
+ 't1alias.col2 ELSE t1alias.col1 END')
def test_table_to_alias_10(self):
s = select(['*'], from_obj=[t1]).alias('foo')
@@ -893,7 +937,7 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):
'table1 AS t1alias')
assert list(_from_objects(ff)) == [t1alias]
- #def test_table_to_alias_2(self):
+ # def test_table_to_alias_2(self):
# TODO: self.assert_compile(vis.traverse(select([func.count(t1.c
# .col1).l abel('foo')]), clone=True), "SELECT
# count(t1alias.col1) AS foo FROM table1 AS t1alias")
@@ -904,7 +948,7 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):
t2alias = t2.alias('t2alias')
vis.chain(sql_util.ClauseAdapter(t2alias))
self.assert_compile(vis.traverse(select(['*'], t1.c.col1
- == t2.c.col2)),
+ == t2.c.col2)),
'SELECT * FROM table1 AS t1alias, table2 '
'AS t2alias WHERE t1alias.col1 = '
't2alias.col2')
@@ -914,11 +958,17 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):
vis = sql_util.ClauseAdapter(t1alias)
t2alias = t2.alias('t2alias')
vis.chain(sql_util.ClauseAdapter(t2alias))
- self.assert_compile(vis.traverse(select(['*'], t1.c.col1
- == t2.c.col2, from_obj=[t1, t2])),
- 'SELECT * FROM table1 AS t1alias, table2 '
- 'AS t2alias WHERE t1alias.col1 = '
- 't2alias.col2')
+ self.assert_compile(
+ vis.traverse(
+ select(
+ ['*'],
+ t1.c.col1 == t2.c.col2,
+ from_obj=[
+ t1,
+ t2])),
+ 'SELECT * FROM table1 AS t1alias, table2 '
+ 'AS t2alias WHERE t1alias.col1 = '
+ 't2alias.col2')
def test_table_to_alias_16(self):
t1alias = t1.alias('t1alias')
@@ -927,18 +977,18 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):
vis.chain(sql_util.ClauseAdapter(t2alias))
self.assert_compile(
select([t1alias, t2alias]).where(
- t1alias.c.col1 ==
- vis.traverse(select(['*'],
- t1.c.col1 == t2.c.col2,
- from_obj=[t1, t2]).correlate(t1))
- ),
- "SELECT t1alias.col1, t1alias.col2, t1alias.col3, "
- "t2alias.col1, t2alias.col2, t2alias.col3 "
- "FROM table1 AS t1alias, table2 AS t2alias "
- "WHERE t1alias.col1 = "
- "(SELECT * FROM table2 AS t2alias "
- "WHERE t1alias.col1 = t2alias.col2)"
- )
+ t1alias.c.col1 ==
+ vis.traverse(select(['*'],
+ t1.c.col1 == t2.c.col2,
+ from_obj=[t1, t2]).correlate(t1))
+ ),
+ "SELECT t1alias.col1, t1alias.col2, t1alias.col3, "
+ "t2alias.col1, t2alias.col2, t2alias.col3 "
+ "FROM table1 AS t1alias, table2 AS t2alias "
+ "WHERE t1alias.col1 = "
+ "(SELECT * FROM table2 AS t2alias "
+ "WHERE t1alias.col1 = t2alias.col2)"
+ )
def test_table_to_alias_17(self):
t1alias = t1.alias('t1alias')
@@ -946,31 +996,35 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):
t2alias = t2.alias('t2alias')
vis.chain(sql_util.ClauseAdapter(t2alias))
self.assert_compile(
- t2alias.select().where(t2alias.c.col2 ==
- vis.traverse(select(['*'],
+ t2alias.select().where(
+ t2alias.c.col2 == vis.traverse(
+ select(
+ ['*'],
t1.c.col1 == t2.c.col2,
- from_obj=[t1, t2]).correlate(t2))),
- 'SELECT t2alias.col1, t2alias.col2, t2alias.col3 '
- 'FROM table2 AS t2alias WHERE t2alias.col2 = '
- '(SELECT * FROM table1 AS t1alias WHERE '
- 't1alias.col1 = t2alias.col2)')
+ from_obj=[
+ t1,
+ t2]).correlate(t2))),
+ 'SELECT t2alias.col1, t2alias.col2, t2alias.col3 '
+ 'FROM table2 AS t2alias WHERE t2alias.col2 = '
+ '(SELECT * FROM table1 AS t1alias WHERE '
+ 't1alias.col1 = t2alias.col2)')
def test_include_exclude(self):
m = MetaData()
a = Table('a', m,
- Column('id', Integer, primary_key=True),
- Column('xxx_id', Integer,
- ForeignKey('a.id', name='adf', use_alter=True)
- )
- )
+ Column('id', Integer, primary_key=True),
+ Column('xxx_id', Integer,
+ ForeignKey('a.id', name='adf', use_alter=True)
+ )
+ )
e = (a.c.id == a.c.xxx_id)
assert str(e) == "a.id = a.xxx_id"
b = a.alias()
- e = sql_util.ClauseAdapter( b, include= set([ a.c.id ]),
- equivalents= { a.c.id: set([ a.c.id]) }
- ).traverse( e)
+ e = sql_util.ClauseAdapter(b, include=set([a.c.id]),
+ equivalents={a.c.id: set([a.c.id])}
+ ).traverse(e)
assert str(e) == "a_1.id = a.xxx_id"
@@ -983,8 +1037,8 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):
# force a recursion overflow, by linking a.c.x<->c.c.x, and
# asking for a nonexistent col. corresponding_column should prevent
# endless depth.
- adapt = sql_util.ClauseAdapter(b,
- equivalents={a.c.x: set([c.c.x]), c.c.x: set([a.c.x])})
+ adapt = sql_util.ClauseAdapter(
+ b, equivalents={a.c.x: set([c.c.x]), c.c.x: set([a.c.x])})
assert adapt._corresponding_column(a.c.x, False) is None
def test_multilevel_equivalents(self):
@@ -997,28 +1051,28 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):
# two levels of indirection from c.x->b.x->a.x, requires recursive
# corresponding_column call
- adapt = sql_util.ClauseAdapter(alias,
- equivalents={b.c.x: set([a.c.x]), c.c.x: set([b.c.x])})
+ adapt = sql_util.ClauseAdapter(
+ alias, equivalents={b.c.x: set([a.c.x]), c.c.x: set([b.c.x])})
assert adapt._corresponding_column(a.c.x, False) is alias.c.x
assert adapt._corresponding_column(c.c.x, False) is alias.c.x
def test_join_to_alias(self):
metadata = MetaData()
a = Table('a', metadata,
- Column('id', Integer, primary_key=True))
+ Column('id', Integer, primary_key=True))
b = Table('b', metadata,
- Column('id', Integer, primary_key=True),
- Column('aid', Integer, ForeignKey('a.id')),
- )
+ Column('id', Integer, primary_key=True),
+ Column('aid', Integer, ForeignKey('a.id')),
+ )
c = Table('c', metadata,
- Column('id', Integer, primary_key=True),
- Column('bid', Integer, ForeignKey('b.id')),
- )
+ Column('id', Integer, primary_key=True),
+ Column('bid', Integer, ForeignKey('b.id')),
+ )
d = Table('d', metadata,
- Column('id', Integer, primary_key=True),
- Column('aid', Integer, ForeignKey('a.id')),
- )
+ Column('id', Integer, primary_key=True),
+ Column('aid', Integer, ForeignKey('a.id')),
+ )
j1 = a.outerjoin(b)
j2 = select([j1], use_labels=True)
@@ -1054,7 +1108,6 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):
assert not t1.is_derived_from(select([t1]))
assert t1.alias().is_derived_from(t1)
-
s1 = select([t1, t2]).alias('foo')
s2 = select([s1]).limit(5).offset(10).alias()
assert s2.is_derived_from(s1)
@@ -1108,13 +1161,13 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):
'LIMIT :param_1 OFFSET :param_2) AS anon_1 '
'LEFT OUTER JOIN table1 AS bar ON '
'anon_1.col1 = bar.col1', {'param_1': 5,
- 'param_2': 10})
+ 'param_2': 10})
def test_functions(self):
self.assert_compile(
- sql_util.ClauseAdapter(t1.alias()).\
- traverse(func.count(t1.c.col1)),
- 'count(table1_1.col1)')
+ sql_util.ClauseAdapter(t1.alias()).
+ traverse(func.count(t1.c.col1)),
+ 'count(table1_1.col1)')
s = select([func.count(t1.c.col1)])
self.assert_compile(sql_util.ClauseAdapter(t1.alias()).traverse(s),
'SELECT count(table1_1.col1) AS count_1 '
@@ -1123,20 +1176,20 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):
def test_recursive(self):
metadata = MetaData()
a = Table('a', metadata,
- Column('id', Integer, primary_key=True))
+ Column('id', Integer, primary_key=True))
b = Table('b', metadata,
- Column('id', Integer, primary_key=True),
- Column('aid', Integer, ForeignKey('a.id')),
- )
+ Column('id', Integer, primary_key=True),
+ Column('aid', Integer, ForeignKey('a.id')),
+ )
c = Table('c', metadata,
- Column('id', Integer, primary_key=True),
- Column('bid', Integer, ForeignKey('b.id')),
- )
+ Column('id', Integer, primary_key=True),
+ Column('bid', Integer, ForeignKey('b.id')),
+ )
d = Table('d', metadata,
- Column('id', Integer, primary_key=True),
- Column('aid', Integer, ForeignKey('a.id')),
- )
+ Column('id', Integer, primary_key=True),
+ Column('aid', Integer, ForeignKey('a.id')),
+ )
u = union(
a.join(b).select().apply_labels(),
@@ -1144,9 +1197,9 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):
).alias()
self.assert_compile(
- sql_util.ClauseAdapter(u).\
- traverse(select([c.c.bid]).where(c.c.bid == u.c.b_aid)),
- "SELECT c.bid "\
+ sql_util.ClauseAdapter(u).
+ traverse(select([c.c.bid]).where(c.c.bid == u.c.b_aid)),
+ "SELECT c.bid "
"FROM c, (SELECT a.id AS a_id, b.id AS b_id, b.aid AS b_aid "
"FROM a JOIN b ON a.id = b.aid UNION SELECT a.id AS a_id, d.id "
"AS d_id, d.aid AS d_aid "
@@ -1154,6 +1207,7 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):
"WHERE c.bid = anon_1.b_aid"
)
+
class SpliceJoinsTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = 'default'
@@ -1165,13 +1219,19 @@ class SpliceJoinsTest(fixtures.TestBase, AssertsCompiledSQL):
return table(name, column('col1'), column('col2'),
column('col3'))
- table1, table2, table3, table4 = [_table(name) for name in
- ('table1', 'table2', 'table3', 'table4')]
+ table1, table2, table3, table4 = [
+ _table(name) for name in (
+ 'table1', 'table2', 'table3', 'table4')]
def test_splice(self):
t1, t2, t3, t4 = table1, table2, table1.alias(), table2.alias()
- j = t1.join(t2, t1.c.col1 == t2.c.col1).join(t3, t2.c.col1
- == t3.c.col1).join(t4, t4.c.col1 == t1.c.col1)
+ j = t1.join(
+ t2,
+ t1.c.col1 == t2.c.col1).join(
+ t3,
+ t2.c.col1 == t3.c.col1).join(
+ t4,
+ t4.c.col1 == t1.c.col1)
s = select([t1]).where(t1.c.col2 < 5).alias()
self.assert_compile(sql_util.splice_joins(s, j),
'(SELECT table1.col1 AS col1, table1.col2 '
@@ -1204,8 +1264,11 @@ class SpliceJoinsTest(fixtures.TestBase, AssertsCompiledSQL):
def test_splice_2(self):
t2a = table2.alias()
t3a = table3.alias()
- j1 = table1.join(t2a, table1.c.col1 == t2a.c.col1).join(t3a,
- t2a.c.col2 == t3a.c.col2)
+ j1 = table1.join(
+ t2a,
+ table1.c.col1 == t2a.c.col1).join(
+ t3a,
+ t2a.c.col2 == t3a.c.col2)
t2b = table4.alias()
j2 = table1.join(t2b, table1.c.col3 == t2b.c.col3)
self.assert_compile(sql_util.splice_joins(table1, j1),
@@ -1216,16 +1279,21 @@ class SpliceJoinsTest(fixtures.TestBase, AssertsCompiledSQL):
self.assert_compile(sql_util.splice_joins(table1, j2),
'table1 JOIN table4 AS table4_1 ON '
'table1.col3 = table4_1.col3')
- self.assert_compile(sql_util.splice_joins(sql_util.splice_joins(table1,
- j1), j2),
- 'table1 JOIN table2 AS table2_1 ON '
- 'table1.col1 = table2_1.col1 JOIN table3 '
- 'AS table3_1 ON table2_1.col2 = '
- 'table3_1.col2 JOIN table4 AS table4_1 ON '
- 'table1.col3 = table4_1.col3')
+ self.assert_compile(
+ sql_util.splice_joins(
+ sql_util.splice_joins(
+ table1,
+ j1),
+ j2),
+ 'table1 JOIN table2 AS table2_1 ON '
+ 'table1.col1 = table2_1.col1 JOIN table3 '
+ 'AS table3_1 ON table2_1.col2 = '
+ 'table3_1.col2 JOIN table4 AS table4_1 ON '
+ 'table1.col3 = table4_1.col3')
class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
+
"""tests the generative capability of Select"""
__dialect__ = 'default'
@@ -1234,16 +1302,15 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
def setup_class(cls):
global t1, t2
t1 = table("table1",
- column("col1"),
- column("col2"),
- column("col3"),
- )
+ column("col1"),
+ column("col2"),
+ column("col3"),
+ )
t2 = table("table2",
- column("col1"),
- column("col2"),
- column("col3"),
- )
-
+ column("col1"),
+ column("col2"),
+ column("col3"),
+ )
def test_columns(self):
s = t1.select()
@@ -1275,7 +1342,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
'SELECT table1.col1, table1.col2, '
'table1.col3 FROM table1')
-
def test_prefixes(self):
s = t1.select()
self.assert_compile(s,
@@ -1308,7 +1374,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
assert_raises(
exc.ArgumentError,
select().execution_options,
- isolation_level='READ_COMMITTED'
+ isolation_level='READ_COMMITTED'
)
# this feature not available yet
@@ -1325,7 +1391,9 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
s = text('select 42', execution_options=dict(foo='bar'))
assert s._execution_options == dict(foo='bar')
+
class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL):
+
"""Tests the generative capability of Insert, Update"""
__dialect__ = 'default'
@@ -1336,15 +1404,15 @@ class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL):
def setup_class(cls):
global t1, t2
t1 = table("table1",
- column("col1"),
- column("col2"),
- column("col3"),
- )
+ column("col1"),
+ column("col2"),
+ column("col3"),
+ )
t2 = table("table2",
- column("col1"),
- column("col2"),
- column("col3"),
- )
+ column("col1"),
+ column("col2"),
+ column("col3"),
+ )
def test_prefixes(self):
i = t1.insert()
@@ -1396,9 +1464,9 @@ class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL):
eq_(i.parameters, None)
i = i.values([(5, 6, 7), (8, 9, 10)])
eq_(i.parameters, [
- {"col1": 5, "col2": 6, "col3": 7},
- {"col1": 8, "col2": 9, "col3": 10},
- ]
+ {"col1": 5, "col2": 6, "col3": 7},
+ {"col1": 8, "col2": 9, "col3": 10},
+ ]
)
def test_inline_values_single(self):