diff options
Diffstat (limited to 'test/sql/test_compiler.py')
-rw-r--r-- | test/sql/test_compiler.py | 135 |
1 files changed, 127 insertions, 8 deletions
diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py index 3e6b87351..4b143c150 100644 --- a/test/sql/test_compiler.py +++ b/test/sql/test_compiler.py @@ -238,6 +238,22 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): checkparams=params ) + def test_limit_offset_select_literal_binds(self): + stmt = select([1]).limit(5).offset(6) + self.assert_compile( + stmt, + "SELECT 1 LIMIT 5 OFFSET 6", + literal_binds=True + ) + + def test_limit_offset_compound_select_literal_binds(self): + stmt = select([1]).union(select([2])).limit(5).offset(6) + self.assert_compile( + stmt, + "SELECT 1 UNION SELECT 2 LIMIT 5 OFFSET 6", + literal_binds=True + ) + def test_select_precol_compile_ordering(self): s1 = select([column('x')]).select_from(text('a')).limit(5).as_scalar() s2 = select([s1]).limit(10) @@ -419,6 +435,19 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): dialect=default.DefaultDialect(paramstyle='pyformat') ) + def test_anon_param_name_on_keys(self): + self.assert_compile( + keyed.insert(), + "INSERT INTO keyed (x, y, z) VALUES (%(colx)s, %(coly)s, %(z)s)", + dialect=default.DefaultDialect(paramstyle='pyformat') + ) + self.assert_compile( + keyed.c.coly == 5, + "keyed.y = %(coly_1)s", + checkparams={'coly_1': 5}, + dialect=default.DefaultDialect(paramstyle='pyformat') + ) + def test_dupe_columns(self): """test that deduping is performed against clause element identity, not rendered result.""" @@ -2411,7 +2440,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): """SELECT /*+ "QuotedName" idx1 */ "QuotedName".col1 """ """FROM "QuotedName" WHERE "QuotedName".col1 > :col1_1"""), (s7, oracle_d, - """SELECT /*+ SomeName idx1 */ "SomeName".col1 FROM """ + """SELECT /*+ "SomeName" idx1 */ "SomeName".col1 FROM """ """"QuotedName" "SomeName" WHERE "SomeName".col1 > :col1_1"""), ]: self.assert_compile( @@ -3351,7 +3380,7 @@ class ResultMapTest(fixtures.TestBase): stmt = select([t]).union(select([t])) comp = stmt.compile() eq_( - comp.result_map, + comp._create_result_map(), {'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type), 'b': ('b', (t.c.b, 'b', 'b'), t.c.b.type)} ) @@ -3362,7 +3391,7 @@ class ResultMapTest(fixtures.TestBase): stmt = select([t.c.a]).select_from(t.join(subq, t.c.a == subq.c.a)) comp = stmt.compile() eq_( - comp.result_map, + comp._create_result_map(), {'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type)} ) @@ -3371,7 +3400,7 @@ class ResultMapTest(fixtures.TestBase): stmt = select([t.c.a]).union(select([t.c.b])) comp = stmt.compile() eq_( - comp.result_map, + comp._create_result_map(), {'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type)}, ) @@ -3381,9 +3410,9 @@ class ResultMapTest(fixtures.TestBase): tc = type_coerce(t.c.a, String) stmt = select([t.c.a, l1, tc]) comp = stmt.compile() - tc_anon_label = comp.result_map['a_1'][1][0] + tc_anon_label = comp._create_result_map()['a_1'][1][0] eq_( - comp.result_map, + comp._create_result_map(), { 'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type), 'bar': ('bar', (l1, 'bar'), l1.type), @@ -3402,9 +3431,99 @@ class ResultMapTest(fixtures.TestBase): t1.join(union, t1.c.a == union.c.t1_a)).apply_labels() comp = stmt.compile() eq_( - set(comp.result_map), + set(comp._create_result_map()), set(['t1_1_b', 't1_1_a', 't1_a', 't1_b']) ) is_( - comp.result_map['t1_a'][1][2], t1.c.a + comp._create_result_map()['t1_a'][1][2], t1.c.a ) + + def test_insert_with_select_values(self): + astring = Column('a', String) + aint = Column('a', Integer) + m = MetaData() + Table('t1', m, astring) + t2 = Table('t2', m, aint) + + stmt = t2.insert().values(a=select([astring])).returning(aint) + comp = stmt.compile(dialect=postgresql.dialect()) + eq_( + comp._create_result_map(), + {'a': ('a', (aint, 'a', 'a'), aint.type)} + ) + + def test_insert_from_select(self): + astring = Column('a', String) + aint = Column('a', Integer) + m = MetaData() + Table('t1', m, astring) + t2 = Table('t2', m, aint) + + stmt = t2.insert().from_select(['a'], select([astring])).\ + returning(aint) + comp = stmt.compile(dialect=postgresql.dialect()) + eq_( + comp._create_result_map(), + {'a': ('a', (aint, 'a', 'a'), aint.type)} + ) + + def test_nested_api(self): + from sqlalchemy.engine.result import ResultMetaData + stmt2 = select([table2]) + + stmt1 = select([table1]).select_from(stmt2) + + contexts = {} + + int_ = Integer() + + class MyCompiler(compiler.SQLCompiler): + def visit_select(self, stmt, *arg, **kw): + + if stmt is stmt2: + with self._nested_result() as nested: + contexts[stmt2] = nested + text = super(MyCompiler, self).visit_select(stmt2) + self._add_to_result_map("k1", "k1", (1, 2, 3), int_) + else: + text = super(MyCompiler, self).visit_select( + stmt, *arg, **kw) + self._add_to_result_map("k2", "k2", (3, 4, 5), int_) + return text + + comp = MyCompiler(default.DefaultDialect(), stmt1) + + eq_( + ResultMetaData._create_result_map(contexts[stmt2][0]), + { + 'otherid': ( + 'otherid', + (table2.c.otherid, 'otherid', 'otherid'), + table2.c.otherid.type), + 'othername': ( + 'othername', + (table2.c.othername, 'othername', 'othername'), + table2.c.othername.type), + 'k1': ('k1', (1, 2, 3), int_) + } + ) + eq_( + comp._create_result_map(), + { + 'myid': ( + 'myid', + (table1.c.myid, 'myid', 'myid'), table1.c.myid.type + ), + 'k2': ('k2', (3, 4, 5), int_), + 'name': ( + 'name', (table1.c.name, 'name', 'name'), + table1.c.name.type), + 'description': ( + 'description', + (table1.c.description, 'description', 'description'), + table1.c.description.type)} + ) + + + + |