diff options
Diffstat (limited to 'test/sql/test_generative.py')
-rw-r--r-- | test/sql/test_generative.py | 110 |
1 files changed, 55 insertions, 55 deletions
diff --git a/test/sql/test_generative.py b/test/sql/test_generative.py index e129f69c4..627736370 100644 --- a/test/sql/test_generative.py +++ b/test/sql/test_generative.py @@ -27,7 +27,7 @@ class TraversalTest(TestBase, AssertsExecutionResults): return other is self __hash__ = ClauseElement.__hash__ - + def __eq__(self, other): return other.expr == self.expr @@ -100,7 +100,7 @@ class TraversalTest(TestBase, AssertsExecutionResults): s2 = vis.traverse(struct) assert struct == s2 assert not struct.is_other(s2) - + def test_no_clone(self): struct = B(A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3")) @@ -151,9 +151,9 @@ class TraversalTest(TestBase, AssertsExecutionResults): class CustomObj(Column): pass - + assert CustomObj.__visit_name__ == Column.__visit_name__ == 'column' - + foo, bar = CustomObj('foo', String), CustomObj('bar', String) bin = foo == bar s = set(ClauseVisitor().iterate(bin)) @@ -193,7 +193,7 @@ class ClauseTest(TestBase, AssertsCompiledSQL): f = sql_util.ClauseAdapter(a).traverse(f) self.assert_compile(select([f]), "SELECT t1_1.col1 * :col1_1 AS anon_1 FROM t1 AS t1_1") - + def test_join(self): clause = t1.join(t2, t1.c.col2==t2.c.col2) c1 = str(clause) @@ -206,20 +206,20 @@ class ClauseTest(TestBase, AssertsCompiledSQL): clause2 = Vis().traverse(clause) assert c1 == str(clause) assert str(clause2) == str(t1.join(t2, t1.c.col2==t2.c.col3)) - + def test_aliased_column_adapt(self): clause = t1.select() - + aliased = t1.select().alias() aliased2 = t1.alias() adapter = sql_util.ColumnAdapter(aliased) - + f = select([ adapter.columns[c] for c in aliased2.c ]).select_from(aliased) - + s = select([aliased2]).select_from(aliased) eq_(str(s), str(f)) @@ -230,8 +230,8 @@ class ClauseTest(TestBase, AssertsCompiledSQL): str(select([func.count(aliased2.c.col1)]).select_from(aliased)), str(f) ) - - + + def test_text(self): clause = text("select * from table where foo=:bar", bindparams=[bindparam('bar')]) c1 = str(clause) @@ -288,7 +288,7 @@ class ClauseTest(TestBase, AssertsCompiledSQL): print str(s5) assert str(s5) == s5_assert assert str(s4) == s4_assert - + def test_union(self): u = union(t1.select(), t2.select()) u2 = CloningVisitor().traverse(u) @@ -300,27 +300,27 @@ class ClauseTest(TestBase, AssertsCompiledSQL): u2 = CloningVisitor().traverse(u) assert str(u) == str(u2) assert [str(c) for c in u2.c] == cols - + s1 = select([t1], t1.c.col1 == bindparam('id_param')) s2 = select([t2]) u = union(s1, s2) - + u2 = u.params(id_param=7) u3 = u.params(id_param=10) assert str(u) == str(u2) == str(u3) assert u2.compile().params == {'id_param':7} assert u3.compile().params == {'id_param':10} - + def test_in(self): expr = t1.c.col1.in_(['foo', 'bar']) expr2 = CloningVisitor().traverse(expr) assert str(expr) == str(expr2) - + def test_adapt_union(self): u = union(t1.select().where(t1.c.col1==4), t1.select().where(t1.c.col1==5)).alias() - + assert sql_util.ClauseAdapter(u).traverse(t1) is u - + def test_binds(self): """test that unique bindparams change their name upon clone() to prevent conflicts""" @@ -340,17 +340,17 @@ class ClauseTest(TestBase, AssertsCompiledSQL): "table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_1) AS anon_1, "\ "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_2) AS anon_2 "\ "WHERE anon_1.col2 = anon_2.col2") - + def test_extract(self): s = select([extract('foo', t1.c.col1).label('col1')]) self.assert_compile(s, "SELECT EXTRACT(foo FROM table1.col1) AS col1 FROM table1") - + s2 = CloningVisitor().traverse(s).alias() s3 = select([s2.c.col1]) self.assert_compile(s, "SELECT EXTRACT(foo FROM table1.col1) AS col1 FROM table1") self.assert_compile(s3, "SELECT anon_1.col1 FROM (SELECT EXTRACT(foo FROM table1.col1) AS col1 FROM table1) AS anon_1") - - + + @testing.emits_warning('.*replaced by another column with the same key') def test_alias(self): subq = t2.select().alias('subq') @@ -372,7 +372,7 @@ class ClauseTest(TestBase, AssertsCompiledSQL): s = select([t1.c.col1, subq.c.col1], from_obj=[t1, subq, t1.join(subq, t1.c.col1==subq.c.col2)]) s5 = CloningVisitor().traverse(s) assert orig == str(s) == str(s5) - + def test_correlated_select(self): s = select(['*'], t1.c.col1==t2.c.col1, from_obj=[t1, t2]).correlate(t2) class Vis(CloningVisitor): @@ -380,32 +380,32 @@ class ClauseTest(TestBase, AssertsCompiledSQL): select.append_whereclause(t1.c.col2==7) self.assert_compile(Vis().traverse(s), "SELECT * FROM table1 WHERE table1.col1 = table2.col1 AND table1.col2 = :col2_1") - + def test_this_thing(self): s = select([t1]).where(t1.c.col1=='foo').alias() s2 = select([s.c.col1]) - + self.assert_compile(s2, "SELECT anon_1.col1 FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_1) AS anon_1") t1a = t1.alias() s2 = sql_util.ClauseAdapter(t1a).traverse(s2) self.assert_compile(s2, "SELECT anon_1.col1 FROM (SELECT table1_1.col1 AS col1, table1_1.col2 AS col2, table1_1.col3 AS col3 FROM table1 AS table1_1 WHERE table1_1.col1 = :col1_1) AS anon_1") - + def test_select_fromtwice(self): t1a = t1.alias() - + s = select([1], t1.c.col1==t1a.c.col1, from_obj=t1a).correlate(t1) self.assert_compile(s, "SELECT 1 FROM table1 AS table1_1 WHERE table1.col1 = table1_1.col1") - + s = CloningVisitor().traverse(s) self.assert_compile(s, "SELECT 1 FROM table1 AS table1_1 WHERE table1.col1 = table1_1.col1") - + s = select([t1]).where(t1.c.col1=='foo').alias() - + s2 = select([1], t1.c.col1==s.c.col1, from_obj=s).correlate(t1) self.assert_compile(s2, "SELECT 1 FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_1) AS anon_1 WHERE table1.col1 = anon_1.col1") s2 = ReplacingCloningVisitor().traverse(s2) self.assert_compile(s2, "SELECT 1 FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_1) AS anon_1 WHERE table1.col1 = anon_1.col1") - + class ClauseAdapterTest(TestBase, AssertsCompiledSQL): @classmethod def setup_class(cls): @@ -446,7 +446,7 @@ class ClauseAdapterTest(TestBase, AssertsCompiledSQL): self.assert_compile(select(['*'], t2alias.c.col1==s), "SELECT * FROM table2 AS t2alias WHERE t2alias.col1 = (SELECT * FROM table1 AS t1alias)") s = CloningVisitor().traverse(s) self.assert_compile(select(['*'], t2alias.c.col1==s), "SELECT * FROM table2 AS t2alias WHERE t2alias.col1 = (SELECT * FROM table1 AS t1alias)") - + s = select(['*']).where(t1.c.col1==t2.c.col1).as_scalar() self.assert_compile(select([t1.c.col1, s]), "SELECT table1.col1, (SELECT * FROM table2 WHERE table1.col1 = table2.col1) AS anon_1 FROM table1") vis = sql_util.ClauseAdapter(t1alias) @@ -478,7 +478,7 @@ class ClauseAdapterTest(TestBase, AssertsCompiledSQL): j1 = addresses.join(ualias, addresses.c.user_id==ualias.c.id) self.assert_compile(sql_util.ClauseAdapter(j1).traverse(s), "SELECT count(addresses.id) AS count_1 FROM addresses WHERE users_1.id = addresses.user_id") - + def test_table_to_alias(self): t1alias = t1.alias('t1alias') @@ -543,7 +543,7 @@ class ClauseAdapterTest(TestBase, AssertsCompiledSQL): a = Table('a', m, Column('x', Integer), Column('y', Integer)) b = Table('b', m, Column('x', Integer), Column('y', Integer)) c = Table('c', m, Column('x', Integer), Column('y', Integer)) - + # force a recursion overflow, by linking a.c.x<->c.c.x, and # asking for a nonexistent col. corresponding_column should prevent # endless depth. @@ -557,13 +557,13 @@ class ClauseAdapterTest(TestBase, AssertsCompiledSQL): c = Table('c', m, Column('x', Integer), Column('y', Integer)) alias = select([a]).select_from(a.join(b, a.c.x==b.c.x)).alias() - + # two levels of indirection from c.x->b.x->a.x, requires recursive # corresponding_column call adapt = sql_util.ClauseAdapter(alias, equivalents= {b.c.x: set([ a.c.x]), c.c.x:set([b.c.x])}) assert adapt._corresponding_column(a.c.x, False) is alias.c.x assert adapt._corresponding_column(c.c.x, False) is alias.c.x - + def test_join_to_alias(self): metadata = MetaData() a = Table('a', metadata, @@ -642,13 +642,13 @@ class ClauseAdapterTest(TestBase, AssertsCompiledSQL): "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1) AS foo LIMIT :param_1 OFFSET :param_2) AS anon_1 "\ "LEFT OUTER JOIN table1 AS bar ON anon_1.col1 = bar.col1", {'param_1':5, 'param_2':10}) - + def test_functions(self): self.assert_compile(sql_util.ClauseAdapter(t1.alias()).traverse(func.count(t1.c.col1)), "count(table1_1.col1)") s = select([func.count(t1.c.col1)]) self.assert_compile(sql_util.ClauseAdapter(t1.alias()).traverse(s), "SELECT count(table1_1.col1) AS count_1 FROM table1 AS table1_1") - + def test_recursive(self): metadata = MetaData() a = Table('a', metadata, @@ -670,8 +670,8 @@ class ClauseAdapterTest(TestBase, AssertsCompiledSQL): u = union( a.join(b).select().apply_labels(), a.join(d).select().apply_labels() - ).alias() - + ).alias() + self.assert_compile( sql_util.ClauseAdapter(u).traverse(select([c.c.bid]).where(c.c.bid==u.c.b_aid)), "SELECT c.bid "\ @@ -687,16 +687,16 @@ class SpliceJoinsTest(TestBase, AssertsCompiledSQL): global table1, table2, table3, table4 def _table(name): return table(name, column("col1"), column("col2"),column("col3")) - - table1, table2, table3, table4 = [_table(name) for name in ("table1", "table2", "table3", "table4")] + + table1, table2, table3, table4 = [_table(name) for name in ("table1", "table2", "table3", "table4")] def test_splice(self): (t1, t2, t3, t4) = (table1, table2, table1.alias(), table2.alias()) - + j = t1.join(t2, t1.c.col1==t2.c.col1).join(t3, t2.c.col1==t3.c.col1).join(t4, t4.c.col1==t1.c.col1) - + s = select([t1]).where(t1.c.col2<5).alias() - + self.assert_compile(sql_util.splice_joins(s, j), "(SELECT table1.col1 AS col1, table1.col2 AS col2, "\ "table1.col3 AS col3 FROM table1 WHERE table1.col2 < :col2_1) AS anon_1 "\ @@ -705,12 +705,12 @@ class SpliceJoinsTest(TestBase, AssertsCompiledSQL): def test_stop_on(self): (t1, t2, t3) = (table1, table2, table3) - + j1= t1.join(t2, t1.c.col1==t2.c.col1) j2 = j1.join(t3, t2.c.col1==t3.c.col1) - + s = select([t1]).select_from(j1).alias() - + self.assert_compile(sql_util.splice_joins(s, j2), "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 JOIN table2 "\ "ON table1.col1 = table2.col1) AS anon_1 JOIN table2 ON anon_1.col1 = table2.col1 JOIN table3 "\ @@ -720,27 +720,27 @@ class SpliceJoinsTest(TestBase, AssertsCompiledSQL): self.assert_compile(sql_util.splice_joins(s, j2, j1), "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 "\ "JOIN table2 ON table1.col1 = table2.col1) AS anon_1 JOIN table3 ON table2.col1 = table3.col1") - + def test_splice_2(self): t2a = table2.alias() t3a = table3.alias() j1 = table1.join(t2a, table1.c.col1==t2a.c.col1).join(t3a, t2a.c.col2==t3a.c.col2) - + t2b = table4.alias() j2 = table1.join(t2b, table1.c.col3==t2b.c.col3) - + self.assert_compile(sql_util.splice_joins(table1, j1), "table1 JOIN table2 AS table2_1 ON table1.col1 = table2_1.col1 "\ "JOIN table3 AS table3_1 ON table2_1.col2 = table3_1.col2") - + self.assert_compile(sql_util.splice_joins(table1, j2), "table1 JOIN table4 AS table4_1 ON table1.col3 = table4_1.col3") self.assert_compile(sql_util.splice_joins(sql_util.splice_joins(table1, j1), j2), "table1 JOIN table2 AS table2_1 ON table1.col1 = table2_1.col1 "\ "JOIN table3 AS table3_1 ON table2_1.col2 = table3_1.col2 "\ "JOIN table4 AS table4_1 ON table1.col3 = table4_1.col3") - - + + class SelectTest(TestBase, AssertsCompiledSQL): """tests the generative capability of Select""" @@ -838,7 +838,7 @@ class SelectTest(TestBase, AssertsCompiledSQL): assert s._execution_options == dict(foo='bar') # s2 should have its execution_options based on s, though. assert s2._execution_options == dict(foo='bar', bar='baz') - + # this feature not available yet def _NOTYET_test_execution_options_in_text(self): s = text('select 42', execution_options=dict(foo='bar')) |