diff options
Diffstat (limited to 'test/sql/test_generative.py')
-rw-r--r-- | test/sql/test_generative.py | 520 |
1 files changed, 294 insertions, 226 deletions
diff --git a/test/sql/test_generative.py b/test/sql/test_generative.py index 8a366f757..51a8a77cc 100644 --- a/test/sql/test_generative.py +++ b/test/sql/test_generative.py @@ -1,6 +1,8 @@ -from sqlalchemy import * from sqlalchemy.sql import table, column, ClauseElement, operators -from sqlalchemy.sql.expression import _clone, _from_objects +from sqlalchemy.sql.expression import _clone, _from_objects +from sqlalchemy import func, select, Integer, Table, \ + Column, MetaData, extract, String, bindparam, tuple_, and_, union, text,\ + case, ForeignKey from sqlalchemy.testing import fixtures, AssertsExecutionResults, \ AssertsCompiledSQL from sqlalchemy import testing @@ -10,7 +12,11 @@ from sqlalchemy import exc from sqlalchemy.sql import util as sql_util from sqlalchemy.testing import eq_, is_, assert_raises, assert_raises_message +A = B = t1 = t2 = t3 = table1 = table2 = table3 = table4 = None + + class TraversalTest(fixtures.TestBase, AssertsExecutionResults): + """test ClauseVisitor's traversal, particularly its ability to copy and modify a ClauseElement in place.""" @@ -83,7 +89,7 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults): struct = B(a1, A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3")) struct2 = B(a1, A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3")) struct3 = B(a1, A("expr2"), B(A("expr1b"), - A("expr2bmodified")), A("expr3")) + A("expr2bmodified")), A("expr3")) assert a1.is_other(a1) assert struct.is_other(struct) @@ -94,11 +100,13 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults): def test_clone(self): struct = B(A("expr1"), A("expr2"), B(A("expr1b"), - A("expr2b")), A("expr3")) + A("expr2b")), A("expr3")) class Vis(CloningVisitor): + def visit_a(self, a): pass + def visit_b(self, b): pass @@ -109,11 +117,13 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults): def test_no_clone(self): struct = B(A("expr1"), A("expr2"), B(A("expr1b"), - A("expr2b")), A("expr3")) + A("expr2b")), A("expr3")) class Vis(ClauseVisitor): + def visit_a(self, a): pass + def visit_b(self, b): pass @@ -124,16 +134,18 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults): def test_change_in_place(self): struct = B(A("expr1"), A("expr2"), B(A("expr1b"), - A("expr2b")), A("expr3")) + A("expr2b")), A("expr3")) struct2 = B(A("expr1"), A("expr2modified"), B(A("expr1b"), - A("expr2b")), A("expr3")) + A("expr2b")), A("expr3")) struct3 = B(A("expr1"), A("expr2"), B(A("expr1b"), - A("expr2bmodified")), A("expr3")) + A("expr2bmodified")), A("expr3")) class Vis(CloningVisitor): + def visit_a(self, a): if a.expr == "expr2": a.expr = "expr2modified" + def visit_b(self, b): pass @@ -144,9 +156,11 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults): assert struct2 == s2 class Vis2(CloningVisitor): + def visit_a(self, a): if a.expr == "expr2b": a.expr = "expr2bmodified" + def visit_b(self, b): pass @@ -169,11 +183,14 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults): set(ClauseVisitor().iterate(bin)) assert set(ClauseVisitor().iterate(bin)) == set([foo, bar, bin]) + class BinaryEndpointTraversalTest(fixtures.TestBase): + """test the special binary product visit""" def _assert_traversal(self, expr, expected): canary = [] + def visit(binary, l, r): canary.append((binary.operator, l, r)) print(binary.operator, l, r) @@ -206,8 +223,8 @@ class BinaryEndpointTraversalTest(fixtures.TestBase): expr = tuple_( a, b, b1 == tuple_(b1a, b1b == d), c ) > tuple_( - func.go(e + f) - ) + func.go(e + f) + ) self._assert_traversal( expr, [ @@ -265,7 +282,9 @@ class BinaryEndpointTraversalTest(fixtures.TestBase): ] ) + class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): + """test copy-in-place behavior of various ClauseElements.""" __dialect__ = 'default' @@ -274,19 +293,19 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): def setup_class(cls): global t1, t2, t3 t1 = table("table1", - column("col1"), - column("col2"), - column("col3"), - ) + column("col1"), + column("col2"), + column("col3"), + ) t2 = table("table2", - column("col1"), - column("col2"), - column("col3"), - ) + column("col1"), + column("col2"), + column("col3"), + ) t3 = Table('table3', MetaData(), - Column('col1', Integer), - Column('col2', Integer) - ) + Column('col1', Integer), + Column('col2', Integer) + ) def test_binary(self): clause = t1.c.col2 == t2.c.col2 @@ -295,18 +314,19 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): def test_binary_anon_label_quirk(self): t = table('t1', column('col1')) - f = t.c.col1 * 5 self.assert_compile(select([f]), - "SELECT t1.col1 * :col1_1 AS anon_1 FROM t1") + "SELECT t1.col1 * :col1_1 AS anon_1 FROM t1") f.anon_label a = t.alias() f = sql_util.ClauseAdapter(a).traverse(f) - self.assert_compile(select([f]), - "SELECT t1_1.col1 * :col1_1 AS anon_1 FROM t1 AS t1_1") + self.assert_compile( + select( + [f]), + "SELECT t1_1.col1 * :col1_1 AS anon_1 FROM t1 AS t1_1") def test_join(self): clause = t1.join(t2, t1.c.col2 == t2.c.col2) @@ -314,6 +334,7 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): assert str(clause) == str(CloningVisitor().traverse(clause)) class Vis(CloningVisitor): + def visit_binary(self, binary): binary.right = t2.c.col3 @@ -422,10 +443,12 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): def test_text(self): clause = text( - "select * from table where foo=:bar", - bindparams=[bindparam('bar')]) + "select * from table where foo=:bar", + bindparams=[bindparam('bar')]) c1 = str(clause) + class Vis(CloningVisitor): + def visit_textclause(self, text): text.text = text.text + " SOME MODIFIER=:lala" text._bindparams['lala'] = bindparam('lala') @@ -440,7 +463,9 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): s2 = select([t1]) s2_assert = str(s2) s3_assert = str(select([t1], t1.c.col2 == 7)) + class Vis(CloningVisitor): + def visit_select(self, select): select.append_whereclause(t1.c.col2 == 7) s3 = Vis().traverse(s2) @@ -448,14 +473,18 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): assert str(s2) == s2_assert print(str(s2)) print(str(s3)) + class Vis(ClauseVisitor): + def visit_select(self, select): select.append_whereclause(t1.c.col2 == 7) Vis().traverse(s2) assert str(s2) == s3_assert s4_assert = str(select([t1], and_(t1.c.col2 == 7, t1.c.col3 == 9))) + class Vis(CloningVisitor): + def visit_select(self, select): select.append_whereclause(t1.c.col3 == 9) s4 = Vis().traverse(s3) @@ -465,7 +494,9 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): assert str(s3) == s3_assert s5_assert = str(select([t1], and_(t1.c.col2 == 7, t1.c.col1 == 9))) + class Vis(CloningVisitor): + def visit_binary(self, binary): if binary.left is t1.c.col3: binary.left = t1.c.col1 @@ -495,8 +526,8 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): u2 = u.params(id_param=7) u3 = u.params(id_param=10) assert str(u) == str(u2) == str(u3) - assert u2.compile().params == {'id_param':7} - assert u3.compile().params == {'id_param':10} + assert u2.compile().params == {'id_param': 7} + assert u3.compile().params == {'id_param': 10} def test_in(self): expr = t1.c.col1.in_(['foo', 'bar']) @@ -510,9 +541,9 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): def test_adapt_union(self): u = union( - t1.select().where(t1.c.col1 == 4), - t1.select().where(t1.c.col1 == 5) - ).alias() + t1.select().where(t1.c.col1 == 4), + t1.select().where(t1.c.col1 == 5) + ).alias() assert sql_util.ClauseAdapter(u).traverse(t1) is u @@ -524,8 +555,8 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): s2 = CloningVisitor().traverse(s).alias() s3 = select([s], s.c.col2 == s2.c.col2) - self.assert_compile(s3, - "SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM " + self.assert_compile( + s3, "SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM " "(SELECT table1.col1 AS col1, table1.col2 AS col2, " "table1.col3 AS col3 FROM table1 WHERE table1.col1 = :param_1) " "AS anon_1, " @@ -536,8 +567,8 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): s = select([t1], t1.c.col1 == 4).alias() s2 = CloningVisitor().traverse(s).alias() s3 = select([s], s.c.col2 == s2.c.col2) - self.assert_compile(s3, - "SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM " + self.assert_compile( + s3, "SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM " "(SELECT table1.col1 AS col1, table1.col2 AS col2, " "table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_1) " "AS anon_1, " @@ -547,25 +578,26 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): def test_extract(self): s = select([extract('foo', t1.c.col1).label('col1')]) - self.assert_compile(s, - "SELECT EXTRACT(foo FROM table1.col1) AS col1 FROM table1") + self.assert_compile( + s, + "SELECT EXTRACT(foo FROM table1.col1) AS col1 FROM table1") s2 = CloningVisitor().traverse(s).alias() s3 = select([s2.c.col1]) - self.assert_compile(s, - "SELECT EXTRACT(foo FROM table1.col1) AS col1 FROM table1") + self.assert_compile( + s, + "SELECT EXTRACT(foo FROM table1.col1) AS col1 FROM table1") self.assert_compile(s3, - "SELECT anon_1.col1 FROM (SELECT EXTRACT(foo FROM " - "table1.col1) AS col1 FROM table1) AS anon_1") - + "SELECT anon_1.col1 FROM (SELECT EXTRACT(foo FROM " + "table1.col1) AS col1 FROM table1) AS anon_1") @testing.emits_warning('.*replaced by another column with the same key') def test_alias(self): subq = t2.select().alias('subq') s = select([t1.c.col1, subq.c.col1], - from_obj=[t1, subq, - t1.join(subq, t1.c.col1 == subq.c.col2)] - ) + from_obj=[t1, subq, + t1.join(subq, t1.c.col1 == subq.c.col2)] + ) orig = str(s) s2 = CloningVisitor().traverse(s) assert orig == str(s) == str(s2) @@ -581,27 +613,28 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): subq = subq.alias('subq') s = select([t1.c.col1, subq.c.col1], - from_obj=[t1, subq, - t1.join(subq, t1.c.col1 == subq.c.col2)] - ) + from_obj=[t1, subq, + t1.join(subq, t1.c.col1 == subq.c.col2)] + ) s5 = CloningVisitor().traverse(s) assert orig == str(s) == str(s5) def test_correlated_select(self): s = select(['*'], t1.c.col1 == t2.c.col1, - from_obj=[t1, t2]).correlate(t2) + from_obj=[t1, t2]).correlate(t2) class Vis(CloningVisitor): + def visit_select(self, select): select.append_whereclause(t1.c.col2 == 7) self.assert_compile( - select([t2]).where(t2.c.col1 == Vis().traverse(s)), - "SELECT table2.col1, table2.col2, table2.col3 " - "FROM table2 WHERE table2.col1 = " - "(SELECT * FROM table1 WHERE table1.col1 = table2.col1 " - "AND table1.col2 = :col2_1)" - ) + select([t2]).where(t2.c.col1 == Vis().traverse(s)), + "SELECT table2.col1, table2.col2, table2.col3 " + "FROM table2 WHERE table2.col1 = " + "(SELECT * FROM table1 WHERE table1.col1 = table2.col1 " + "AND table1.col2 = :col2_1)" + ) def test_this_thing(self): s = select([t1]).where(t1.c.col1 == 'foo').alias() @@ -626,44 +659,42 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): s = select([1], t1.c.col1 == t1a.c.col1, from_obj=t1a).correlate(t1a) s = select([t1]).where(t1.c.col1 == s) - self.assert_compile(s, - "SELECT table1.col1, table1.col2, table1.col3 FROM table1 " + self.assert_compile( + s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1 " "WHERE table1.col1 = " "(SELECT 1 FROM table1, table1 AS table1_1 " - "WHERE table1.col1 = table1_1.col1)" - ) + "WHERE table1.col1 = table1_1.col1)") s = CloningVisitor().traverse(s) - self.assert_compile(s, - "SELECT table1.col1, table1.col2, table1.col3 FROM table1 " + self.assert_compile( + s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1 " "WHERE table1.col1 = " "(SELECT 1 FROM table1, table1 AS table1_1 " - "WHERE table1.col1 = table1_1.col1)") + "WHERE table1.col1 = table1_1.col1)") def test_select_fromtwice_two(self): s = select([t1]).where(t1.c.col1 == 'foo').alias() s2 = select([1], t1.c.col1 == s.c.col1, from_obj=s).correlate(t1) s3 = select([t1]).where(t1.c.col1 == s2) - self.assert_compile(s3, - "SELECT table1.col1, table1.col2, table1.col3 " - "FROM table1 WHERE table1.col1 = " - "(SELECT 1 FROM " - "(SELECT table1.col1 AS col1, table1.col2 AS col2, " - "table1.col3 AS col3 FROM table1 " - "WHERE table1.col1 = :col1_1) " - "AS anon_1 WHERE table1.col1 = anon_1.col1)" - ) + self.assert_compile( + s3, "SELECT table1.col1, table1.col2, table1.col3 " + "FROM table1 WHERE table1.col1 = " + "(SELECT 1 FROM " + "(SELECT table1.col1 AS col1, table1.col2 AS col2, " + "table1.col3 AS col3 FROM table1 " + "WHERE table1.col1 = :col1_1) " + "AS anon_1 WHERE table1.col1 = anon_1.col1)") s4 = ReplacingCloningVisitor().traverse(s3) - self.assert_compile(s4, - "SELECT table1.col1, table1.col2, table1.col3 " - "FROM table1 WHERE table1.col1 = " - "(SELECT 1 FROM " - "(SELECT table1.col1 AS col1, table1.col2 AS col2, " - "table1.col3 AS col3 FROM table1 " - "WHERE table1.col1 = :col1_1) " - "AS anon_1 WHERE table1.col1 = anon_1.col1)" - ) + self.assert_compile( + s4, "SELECT table1.col1, table1.col2, table1.col3 " + "FROM table1 WHERE table1.col1 = " + "(SELECT 1 FROM " + "(SELECT table1.col1 AS col1, table1.col2 AS col2, " + "table1.col3 AS col3 FROM table1 " + "WHERE table1.col1 = :col1_1) " + "AS anon_1 WHERE table1.col1 = anon_1.col1)") + class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = 'default' @@ -672,15 +703,15 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): def setup_class(cls): global t1, t2 t1 = table("table1", - column("col1"), - column("col2"), - column("col3"), - ) + column("col1"), + column("col2"), + column("col3"), + ) t2 = table("table2", - column("col1"), - column("col2"), - column("col3"), - ) + column("col1"), + column("col2"), + column("col3"), + ) def test_correlation_on_clone(self): t1alias = t1.alias('t1alias') @@ -698,9 +729,9 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): s = vis.traverse(s) assert t2alias not in s._froms # not present because it's been - # cloned + # cloned assert t1alias in s._froms # present because the adapter placed - # it there + # it there # correlate list on "s" needs to take into account the full # _cloned_set for each element in _froms when correlating @@ -710,7 +741,7 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): 't2alias.col1 = (SELECT * FROM table1 AS ' 't1alias)') s = select(['*'], from_obj=[t1alias, - t2alias]).correlate(t2alias).as_scalar() + t2alias]).correlate(t2alias).as_scalar() self.assert_compile(select(['*'], t2alias.c.col1 == s), 'SELECT * FROM table2 AS t2alias WHERE ' 't2alias.col1 = (SELECT * FROM table1 AS ' @@ -806,63 +837,76 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): t1alias = t1.alias('t1alias') vis = sql_util.ClauseAdapter(t1alias) self.assert_compile(vis.traverse(select(['*'], t1.c.col1 - == t2.c.col2)), + == t2.c.col2)), 'SELECT * FROM table1 AS t1alias, table2 ' 'WHERE t1alias.col1 = table2.col2') def test_table_to_alias_5(self): t1alias = t1.alias('t1alias') vis = sql_util.ClauseAdapter(t1alias) - self.assert_compile(vis.traverse(select(['*'], t1.c.col1 - == t2.c.col2, from_obj=[t1, t2])), - 'SELECT * FROM table1 AS t1alias, table2 ' - 'WHERE t1alias.col1 = table2.col2') + self.assert_compile( + vis.traverse( + select( + ['*'], + t1.c.col1 == t2.c.col2, + from_obj=[ + t1, + t2])), + 'SELECT * FROM table1 AS t1alias, table2 ' + 'WHERE t1alias.col1 = table2.col2') def test_table_to_alias_6(self): t1alias = t1.alias('t1alias') vis = sql_util.ClauseAdapter(t1alias) self.assert_compile( - select([t1alias, t2]).where(t1alias.c.col1 == - vis.traverse(select(['*'], - t1.c.col1 == t2.c.col2, - from_obj=[t1, t2]).correlate(t1))), - "SELECT t1alias.col1, t1alias.col2, t1alias.col3, " - "table2.col1, table2.col2, table2.col3 " - "FROM table1 AS t1alias, table2 WHERE t1alias.col1 = " - "(SELECT * FROM table2 WHERE t1alias.col1 = table2.col2)" - ) + select([t1alias, t2]).where( + t1alias.c.col1 == vis.traverse( + select(['*'], t1.c.col1 == t2.c.col2, from_obj=[t1, t2]). + correlate(t1) + ) + ), + "SELECT t1alias.col1, t1alias.col2, t1alias.col3, " + "table2.col1, table2.col2, table2.col3 " + "FROM table1 AS t1alias, table2 WHERE t1alias.col1 = " + "(SELECT * FROM table2 WHERE t1alias.col1 = table2.col2)" + ) def test_table_to_alias_7(self): t1alias = t1.alias('t1alias') vis = sql_util.ClauseAdapter(t1alias) self.assert_compile( - select([t1alias, t2]).where(t1alias.c.col1 == - vis.traverse(select(['*'], - t1.c.col1 == t2.c.col2, - from_obj=[t1, t2]).correlate(t2))), - "SELECT t1alias.col1, t1alias.col2, t1alias.col3, " - "table2.col1, table2.col2, table2.col3 " - "FROM table1 AS t1alias, table2 " - "WHERE t1alias.col1 = " - "(SELECT * FROM table1 AS t1alias " - "WHERE t1alias.col1 = table2.col2)") + select([t1alias, t2]). + where(t1alias.c.col1 == vis.traverse( + select(['*'], t1.c.col1 == t2.c.col2, from_obj=[t1, t2]). + correlate(t2))), + "SELECT t1alias.col1, t1alias.col2, t1alias.col3, " + "table2.col1, table2.col2, table2.col3 " + "FROM table1 AS t1alias, table2 " + "WHERE t1alias.col1 = " + "(SELECT * FROM table1 AS t1alias " + "WHERE t1alias.col1 = table2.col2)") def test_table_to_alias_8(self): t1alias = t1.alias('t1alias') vis = sql_util.ClauseAdapter(t1alias) - self.assert_compile(vis.traverse(case([(t1.c.col1 == 5, - t1.c.col2)], else_=t1.c.col1)), - 'CASE WHEN (t1alias.col1 = :col1_1) THEN ' - 't1alias.col2 ELSE t1alias.col1 END') + self.assert_compile( + vis.traverse(case([(t1.c.col1 == 5, t1.c.col2)], else_=t1.c.col1)), + 'CASE WHEN (t1alias.col1 = :col1_1) THEN ' + 't1alias.col2 ELSE t1alias.col1 END') def test_table_to_alias_9(self): t1alias = t1.alias('t1alias') vis = sql_util.ClauseAdapter(t1alias) - self.assert_compile(vis.traverse(case([(5, t1.c.col2)], - value=t1.c.col1, else_=t1.c.col1)), - 'CASE t1alias.col1 WHEN :param_1 THEN ' - 't1alias.col2 ELSE t1alias.col1 END') - + self.assert_compile( + vis.traverse( + case( + [ + (5, + t1.c.col2)], + value=t1.c.col1, + else_=t1.c.col1)), + 'CASE t1alias.col1 WHEN :param_1 THEN ' + 't1alias.col2 ELSE t1alias.col1 END') def test_table_to_alias_10(self): s = select(['*'], from_obj=[t1]).alias('foo') @@ -893,7 +937,7 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): 'table1 AS t1alias') assert list(_from_objects(ff)) == [t1alias] - #def test_table_to_alias_2(self): + # def test_table_to_alias_2(self): # TODO: self.assert_compile(vis.traverse(select([func.count(t1.c # .col1).l abel('foo')]), clone=True), "SELECT # count(t1alias.col1) AS foo FROM table1 AS t1alias") @@ -904,7 +948,7 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): t2alias = t2.alias('t2alias') vis.chain(sql_util.ClauseAdapter(t2alias)) self.assert_compile(vis.traverse(select(['*'], t1.c.col1 - == t2.c.col2)), + == t2.c.col2)), 'SELECT * FROM table1 AS t1alias, table2 ' 'AS t2alias WHERE t1alias.col1 = ' 't2alias.col2') @@ -914,11 +958,17 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): vis = sql_util.ClauseAdapter(t1alias) t2alias = t2.alias('t2alias') vis.chain(sql_util.ClauseAdapter(t2alias)) - self.assert_compile(vis.traverse(select(['*'], t1.c.col1 - == t2.c.col2, from_obj=[t1, t2])), - 'SELECT * FROM table1 AS t1alias, table2 ' - 'AS t2alias WHERE t1alias.col1 = ' - 't2alias.col2') + self.assert_compile( + vis.traverse( + select( + ['*'], + t1.c.col1 == t2.c.col2, + from_obj=[ + t1, + t2])), + 'SELECT * FROM table1 AS t1alias, table2 ' + 'AS t2alias WHERE t1alias.col1 = ' + 't2alias.col2') def test_table_to_alias_16(self): t1alias = t1.alias('t1alias') @@ -927,18 +977,18 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): vis.chain(sql_util.ClauseAdapter(t2alias)) self.assert_compile( select([t1alias, t2alias]).where( - t1alias.c.col1 == - vis.traverse(select(['*'], - t1.c.col1 == t2.c.col2, - from_obj=[t1, t2]).correlate(t1)) - ), - "SELECT t1alias.col1, t1alias.col2, t1alias.col3, " - "t2alias.col1, t2alias.col2, t2alias.col3 " - "FROM table1 AS t1alias, table2 AS t2alias " - "WHERE t1alias.col1 = " - "(SELECT * FROM table2 AS t2alias " - "WHERE t1alias.col1 = t2alias.col2)" - ) + t1alias.c.col1 == + vis.traverse(select(['*'], + t1.c.col1 == t2.c.col2, + from_obj=[t1, t2]).correlate(t1)) + ), + "SELECT t1alias.col1, t1alias.col2, t1alias.col3, " + "t2alias.col1, t2alias.col2, t2alias.col3 " + "FROM table1 AS t1alias, table2 AS t2alias " + "WHERE t1alias.col1 = " + "(SELECT * FROM table2 AS t2alias " + "WHERE t1alias.col1 = t2alias.col2)" + ) def test_table_to_alias_17(self): t1alias = t1.alias('t1alias') @@ -946,31 +996,35 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): t2alias = t2.alias('t2alias') vis.chain(sql_util.ClauseAdapter(t2alias)) self.assert_compile( - t2alias.select().where(t2alias.c.col2 == - vis.traverse(select(['*'], + t2alias.select().where( + t2alias.c.col2 == vis.traverse( + select( + ['*'], t1.c.col1 == t2.c.col2, - from_obj=[t1, t2]).correlate(t2))), - 'SELECT t2alias.col1, t2alias.col2, t2alias.col3 ' - 'FROM table2 AS t2alias WHERE t2alias.col2 = ' - '(SELECT * FROM table1 AS t1alias WHERE ' - 't1alias.col1 = t2alias.col2)') + from_obj=[ + t1, + t2]).correlate(t2))), + 'SELECT t2alias.col1, t2alias.col2, t2alias.col3 ' + 'FROM table2 AS t2alias WHERE t2alias.col2 = ' + '(SELECT * FROM table1 AS t1alias WHERE ' + 't1alias.col1 = t2alias.col2)') def test_include_exclude(self): m = MetaData() a = Table('a', m, - Column('id', Integer, primary_key=True), - Column('xxx_id', Integer, - ForeignKey('a.id', name='adf', use_alter=True) - ) - ) + Column('id', Integer, primary_key=True), + Column('xxx_id', Integer, + ForeignKey('a.id', name='adf', use_alter=True) + ) + ) e = (a.c.id == a.c.xxx_id) assert str(e) == "a.id = a.xxx_id" b = a.alias() - e = sql_util.ClauseAdapter( b, include= set([ a.c.id ]), - equivalents= { a.c.id: set([ a.c.id]) } - ).traverse( e) + e = sql_util.ClauseAdapter(b, include=set([a.c.id]), + equivalents={a.c.id: set([a.c.id])} + ).traverse(e) assert str(e) == "a_1.id = a.xxx_id" @@ -983,8 +1037,8 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): # force a recursion overflow, by linking a.c.x<->c.c.x, and # asking for a nonexistent col. corresponding_column should prevent # endless depth. - adapt = sql_util.ClauseAdapter(b, - equivalents={a.c.x: set([c.c.x]), c.c.x: set([a.c.x])}) + adapt = sql_util.ClauseAdapter( + b, equivalents={a.c.x: set([c.c.x]), c.c.x: set([a.c.x])}) assert adapt._corresponding_column(a.c.x, False) is None def test_multilevel_equivalents(self): @@ -997,28 +1051,28 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): # two levels of indirection from c.x->b.x->a.x, requires recursive # corresponding_column call - adapt = sql_util.ClauseAdapter(alias, - equivalents={b.c.x: set([a.c.x]), c.c.x: set([b.c.x])}) + adapt = sql_util.ClauseAdapter( + alias, equivalents={b.c.x: set([a.c.x]), c.c.x: set([b.c.x])}) assert adapt._corresponding_column(a.c.x, False) is alias.c.x assert adapt._corresponding_column(c.c.x, False) is alias.c.x def test_join_to_alias(self): metadata = MetaData() a = Table('a', metadata, - Column('id', Integer, primary_key=True)) + Column('id', Integer, primary_key=True)) b = Table('b', metadata, - Column('id', Integer, primary_key=True), - Column('aid', Integer, ForeignKey('a.id')), - ) + Column('id', Integer, primary_key=True), + Column('aid', Integer, ForeignKey('a.id')), + ) c = Table('c', metadata, - Column('id', Integer, primary_key=True), - Column('bid', Integer, ForeignKey('b.id')), - ) + Column('id', Integer, primary_key=True), + Column('bid', Integer, ForeignKey('b.id')), + ) d = Table('d', metadata, - Column('id', Integer, primary_key=True), - Column('aid', Integer, ForeignKey('a.id')), - ) + Column('id', Integer, primary_key=True), + Column('aid', Integer, ForeignKey('a.id')), + ) j1 = a.outerjoin(b) j2 = select([j1], use_labels=True) @@ -1054,7 +1108,6 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): assert not t1.is_derived_from(select([t1])) assert t1.alias().is_derived_from(t1) - s1 = select([t1, t2]).alias('foo') s2 = select([s1]).limit(5).offset(10).alias() assert s2.is_derived_from(s1) @@ -1108,13 +1161,13 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): 'LIMIT :param_1 OFFSET :param_2) AS anon_1 ' 'LEFT OUTER JOIN table1 AS bar ON ' 'anon_1.col1 = bar.col1', {'param_1': 5, - 'param_2': 10}) + 'param_2': 10}) def test_functions(self): self.assert_compile( - sql_util.ClauseAdapter(t1.alias()).\ - traverse(func.count(t1.c.col1)), - 'count(table1_1.col1)') + sql_util.ClauseAdapter(t1.alias()). + traverse(func.count(t1.c.col1)), + 'count(table1_1.col1)') s = select([func.count(t1.c.col1)]) self.assert_compile(sql_util.ClauseAdapter(t1.alias()).traverse(s), 'SELECT count(table1_1.col1) AS count_1 ' @@ -1123,20 +1176,20 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): def test_recursive(self): metadata = MetaData() a = Table('a', metadata, - Column('id', Integer, primary_key=True)) + Column('id', Integer, primary_key=True)) b = Table('b', metadata, - Column('id', Integer, primary_key=True), - Column('aid', Integer, ForeignKey('a.id')), - ) + Column('id', Integer, primary_key=True), + Column('aid', Integer, ForeignKey('a.id')), + ) c = Table('c', metadata, - Column('id', Integer, primary_key=True), - Column('bid', Integer, ForeignKey('b.id')), - ) + Column('id', Integer, primary_key=True), + Column('bid', Integer, ForeignKey('b.id')), + ) d = Table('d', metadata, - Column('id', Integer, primary_key=True), - Column('aid', Integer, ForeignKey('a.id')), - ) + Column('id', Integer, primary_key=True), + Column('aid', Integer, ForeignKey('a.id')), + ) u = union( a.join(b).select().apply_labels(), @@ -1144,9 +1197,9 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): ).alias() self.assert_compile( - sql_util.ClauseAdapter(u).\ - traverse(select([c.c.bid]).where(c.c.bid == u.c.b_aid)), - "SELECT c.bid "\ + sql_util.ClauseAdapter(u). + traverse(select([c.c.bid]).where(c.c.bid == u.c.b_aid)), + "SELECT c.bid " "FROM c, (SELECT a.id AS a_id, b.id AS b_id, b.aid AS b_aid " "FROM a JOIN b ON a.id = b.aid UNION SELECT a.id AS a_id, d.id " "AS d_id, d.aid AS d_aid " @@ -1154,6 +1207,7 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): "WHERE c.bid = anon_1.b_aid" ) + class SpliceJoinsTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = 'default' @@ -1165,13 +1219,19 @@ class SpliceJoinsTest(fixtures.TestBase, AssertsCompiledSQL): return table(name, column('col1'), column('col2'), column('col3')) - table1, table2, table3, table4 = [_table(name) for name in - ('table1', 'table2', 'table3', 'table4')] + table1, table2, table3, table4 = [ + _table(name) for name in ( + 'table1', 'table2', 'table3', 'table4')] def test_splice(self): t1, t2, t3, t4 = table1, table2, table1.alias(), table2.alias() - j = t1.join(t2, t1.c.col1 == t2.c.col1).join(t3, t2.c.col1 - == t3.c.col1).join(t4, t4.c.col1 == t1.c.col1) + j = t1.join( + t2, + t1.c.col1 == t2.c.col1).join( + t3, + t2.c.col1 == t3.c.col1).join( + t4, + t4.c.col1 == t1.c.col1) s = select([t1]).where(t1.c.col2 < 5).alias() self.assert_compile(sql_util.splice_joins(s, j), '(SELECT table1.col1 AS col1, table1.col2 ' @@ -1204,8 +1264,11 @@ class SpliceJoinsTest(fixtures.TestBase, AssertsCompiledSQL): def test_splice_2(self): t2a = table2.alias() t3a = table3.alias() - j1 = table1.join(t2a, table1.c.col1 == t2a.c.col1).join(t3a, - t2a.c.col2 == t3a.c.col2) + j1 = table1.join( + t2a, + table1.c.col1 == t2a.c.col1).join( + t3a, + t2a.c.col2 == t3a.c.col2) t2b = table4.alias() j2 = table1.join(t2b, table1.c.col3 == t2b.c.col3) self.assert_compile(sql_util.splice_joins(table1, j1), @@ -1216,16 +1279,21 @@ class SpliceJoinsTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile(sql_util.splice_joins(table1, j2), 'table1 JOIN table4 AS table4_1 ON ' 'table1.col3 = table4_1.col3') - self.assert_compile(sql_util.splice_joins(sql_util.splice_joins(table1, - j1), j2), - 'table1 JOIN table2 AS table2_1 ON ' - 'table1.col1 = table2_1.col1 JOIN table3 ' - 'AS table3_1 ON table2_1.col2 = ' - 'table3_1.col2 JOIN table4 AS table4_1 ON ' - 'table1.col3 = table4_1.col3') + self.assert_compile( + sql_util.splice_joins( + sql_util.splice_joins( + table1, + j1), + j2), + 'table1 JOIN table2 AS table2_1 ON ' + 'table1.col1 = table2_1.col1 JOIN table3 ' + 'AS table3_1 ON table2_1.col2 = ' + 'table3_1.col2 JOIN table4 AS table4_1 ON ' + 'table1.col3 = table4_1.col3') class SelectTest(fixtures.TestBase, AssertsCompiledSQL): + """tests the generative capability of Select""" __dialect__ = 'default' @@ -1234,16 +1302,15 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): def setup_class(cls): global t1, t2 t1 = table("table1", - column("col1"), - column("col2"), - column("col3"), - ) + column("col1"), + column("col2"), + column("col3"), + ) t2 = table("table2", - column("col1"), - column("col2"), - column("col3"), - ) - + column("col1"), + column("col2"), + column("col3"), + ) def test_columns(self): s = t1.select() @@ -1275,7 +1342,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): 'SELECT table1.col1, table1.col2, ' 'table1.col3 FROM table1') - def test_prefixes(self): s = t1.select() self.assert_compile(s, @@ -1308,7 +1374,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): assert_raises( exc.ArgumentError, select().execution_options, - isolation_level='READ_COMMITTED' + isolation_level='READ_COMMITTED' ) # this feature not available yet @@ -1325,7 +1391,9 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): s = text('select 42', execution_options=dict(foo='bar')) assert s._execution_options == dict(foo='bar') + class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL): + """Tests the generative capability of Insert, Update""" __dialect__ = 'default' @@ -1336,15 +1404,15 @@ class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL): def setup_class(cls): global t1, t2 t1 = table("table1", - column("col1"), - column("col2"), - column("col3"), - ) + column("col1"), + column("col2"), + column("col3"), + ) t2 = table("table2", - column("col1"), - column("col2"), - column("col3"), - ) + column("col1"), + column("col2"), + column("col3"), + ) def test_prefixes(self): i = t1.insert() @@ -1396,9 +1464,9 @@ class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL): eq_(i.parameters, None) i = i.values([(5, 6, 7), (8, 9, 10)]) eq_(i.parameters, [ - {"col1": 5, "col2": 6, "col3": 7}, - {"col1": 8, "col2": 9, "col3": 10}, - ] + {"col1": 5, "col2": 6, "col3": 7}, + {"col1": 8, "col2": 9, "col3": 10}, + ] ) def test_inline_values_single(self): |