diff options
Diffstat (limited to 'test/sql/test_functions.py')
-rw-r--r-- | test/sql/test_functions.py | 496 |
1 files changed, 496 insertions, 0 deletions
diff --git a/test/sql/test_functions.py b/test/sql/test_functions.py index 19562dade..e460a90cb 100644 --- a/test/sql/test_functions.py +++ b/test/sql/test_functions.py @@ -5,12 +5,15 @@ import decimal from sqlalchemy import ARRAY from sqlalchemy import bindparam from sqlalchemy import Boolean +from sqlalchemy import cast from sqlalchemy import Column from sqlalchemy import Date from sqlalchemy import DateTime from sqlalchemy import extract +from sqlalchemy import Float from sqlalchemy import func from sqlalchemy import Integer +from sqlalchemy import JSON from sqlalchemy import literal from sqlalchemy import literal_column from sqlalchemy import Numeric @@ -20,6 +23,8 @@ from sqlalchemy import sql from sqlalchemy import String from sqlalchemy import Table from sqlalchemy import testing +from sqlalchemy import Text +from sqlalchemy import true from sqlalchemy import types as sqltypes from sqlalchemy import util from sqlalchemy.dialects import mysql @@ -1150,3 +1155,494 @@ class RegisterTest(fixtures.TestBase, AssertsCompiledSQL): assert "not_registered_func" not in functions._registry["_default"] assert isinstance(func.not_registered_func_child().type, Integer) + + +class TableValuedCompileTest(fixtures.TestBase, AssertsCompiledSQL): + """test the full set of functions as FROM developed in [ticket:3566]""" + + __dialect__ = "default_enhanced" + + def test_aggregate_scalar_over_table_valued(self): + test = table("test", column("id"), column("data", JSON)) + + elem = ( + func.json_array_elements_text(test.c.data["key"]) + .table_valued("value") + .alias("elem") + ) + + maxdepth = select(func.max(cast(elem.c.value, Float))).label( + "maxdepth" + ) + + stmt = select(test.c.id.label("test_id"), maxdepth).order_by( + "maxdepth" + ) + + self.assert_compile( + stmt, + "SELECT test.id AS test_id, " + "(SELECT max(CAST(elem.value AS FLOAT)) AS max_1 " + "FROM json_array_elements_text(test.data[:data_1]) AS elem) " + "AS maxdepth " + "FROM test ORDER BY maxdepth", + ) + + def test_scalar_table_valued(self): + assets_transactions = table( + "assets_transactions", column("id"), column("contents", JSON) + ) + + stmt = select( + assets_transactions.c.id, + func.jsonb_each( + assets_transactions.c.contents + ).scalar_table_valued("key"), + func.jsonb_each( + assets_transactions.c.contents + ).scalar_table_valued("value"), + ) + self.assert_compile( + stmt, + "SELECT assets_transactions.id, " + "(jsonb_each(assets_transactions.contents)).key, " + "(jsonb_each(assets_transactions.contents)).value " + "FROM assets_transactions", + ) + + def test_table_valued_one(self): + assets_transactions = table( + "assets_transactions", column("id"), column("contents", JSON) + ) + + jb = func.jsonb_each(assets_transactions.c.contents).table_valued( + "key", "value" + ) + + stmt = select(assets_transactions.c.id, jb.c.key, jb.c.value).join( + jb, true() + ) + + self.assert_compile( + stmt, + "SELECT assets_transactions.id, anon_1.key, anon_1.value " + "FROM assets_transactions " + "JOIN jsonb_each(assets_transactions.contents) AS anon_1 ON true", + ) + + def test_table_valued_two(self): + """ + SELECT vi.id, vv.value + FROM value_ids() AS vi JOIN values AS vv ON vv.id = vi.id + + """ + + values = table( + "values", + column( + "id", + Integer, + ), + column("value", String), + ) + vi = func.value_ids().table_valued(column("id", Integer)).alias("vi") + vv = values.alias("vv") + + stmt = select(vi.c.id, vv.c.value).select_from( # noqa + vi.join(vv, vv.c.id == vi.c.id) + ) + self.assert_compile( + stmt, + "SELECT vi.id, vv.value FROM value_ids() AS vi " + "JOIN values AS vv ON vv.id = vi.id", + ) + + def test_table_as_table_valued(self): + a = table( + "a", + column("id"), + column("x"), + column("y"), + ) + + stmt = select(func.row_to_json(a.table_valued())) + + self.assert_compile( + stmt, "SELECT row_to_json(a) AS row_to_json_1 FROM a" + ) + + def test_subquery_as_table_valued(self): + """ + SELECT row_to_json(anon_1) AS row_to_json_1 + FROM (SELECT a.id AS id, a.x AS x, a.y AS y + FROM a) AS anon_1 + + """ + + a = table( + "a", + column("id"), + column("x"), + column("y"), + ) + + stmt = select(func.row_to_json(a.select().subquery().table_valued())) + + self.assert_compile( + stmt, + "SELECT row_to_json(anon_1) AS row_to_json_1 FROM " + "(SELECT a.id AS id, a.x AS x, a.y AS y FROM a) AS anon_1", + ) + + def test_scalar_subquery(self): + + a = table( + "a", + column("id"), + column("x"), + column("y"), + ) + + stmt = select(func.row_to_json(a.select().scalar_subquery())) + + self.assert_compile( + stmt, + "SELECT row_to_json((SELECT a.id, a.x, a.y FROM a)) " + "AS row_to_json_1", + ) + + def test_named_with_ordinality(self): + """ + SELECT a.id AS a_id, a.refs AS a_refs, + unnested.unnested AS unnested_unnested, + unnested.ordinality AS unnested_ordinality, + b.id AS b_id, b.ref AS b_ref + FROM a LEFT OUTER JOIN unnest(a.refs) + `WITH ORDINALITY AS unnested(unnested, ordinality) ON true + LEFT OUTER JOIN b ON unnested.unnested = b.ref + + """ # noqa 501 + + a = table("a", column("id"), column("refs")) + b = table("b", column("id"), column("ref")) + + unnested = ( + func.unnest(a.c.refs) + .table_valued("unnested", with_ordinality="ordinality") + .render_derived() + .alias("unnested") + ) + + stmt = ( + select( + a.c.id, a.c.refs, unnested.c.unnested, unnested.c.ordinality + ) + .outerjoin(unnested, true()) + .outerjoin( + b, + unnested.c.unnested == b.c.ref, + ) + ) + self.assert_compile( + stmt, + "SELECT a.id, a.refs, unnested.unnested, unnested.ordinality " + "FROM a " + "LEFT OUTER JOIN unnest(a.refs) " + "WITH ORDINALITY AS unnested(unnested, ordinality) ON true " + "LEFT OUTER JOIN b ON unnested.unnested = b.ref", + ) + + def test_star_with_ordinality(self): + """ + SELECT * FROM generate_series(4,1,-1) WITH ORDINALITY; + """ + + stmt = select("*").select_from( # noqa + func.generate_series(4, 1, -1).table_valued( + with_ordinality="ordinality" + ) + ) + self.assert_compile( + stmt, + "SELECT * FROM generate_series" + "(:generate_series_1, :generate_series_2, :generate_series_3) " + "WITH ORDINALITY AS anon_1", + ) + + def test_json_object_keys_with_ordinality(self): + """ + SELECT * FROM json_object_keys('{"a1":"1","a2":"2","a3":"3"}') + WITH ORDINALITY AS t(keys, n); + """ + stmt = select("*").select_from( + func.json_object_keys( + literal({"a1": "1", "a2": "2", "a3": "3"}, type_=JSON) + ) + .table_valued("keys", with_ordinality="n") + .render_derived() + .alias("t") + ) + + self.assert_compile( + stmt, + "SELECT * FROM json_object_keys(:param_1) " + "WITH ORDINALITY AS t(keys, n)", + ) + + def test_alias_column(self): + """ + + :: + + SELECT x, y + FROM + generate_series(:generate_series_1, :generate_series_2) AS x, + generate_series(:generate_series_3, :generate_series_4) AS y + + """ + + x = func.generate_series(1, 2).alias("x") + y = func.generate_series(3, 4).alias("y") + stmt = select(x.column, y.column) + + self.assert_compile( + stmt, + "SELECT x, y FROM " + "generate_series(:generate_series_1, :generate_series_2) AS x, " + "generate_series(:generate_series_3, :generate_series_4) AS y", + ) + + def test_column_valued_one(self): + fn = func.unnest(["one", "two", "three", "four"]).column_valued() + + stmt = select(fn) + + self.assert_compile( + stmt, "SELECT anon_1 FROM unnest(:unnest_1) AS anon_1" + ) + + def test_column_valued_two(self): + """ + + :: + + SELECT x, y + FROM + generate_series(:generate_series_1, :generate_series_2) AS x, + generate_series(:generate_series_3, :generate_series_4) AS y + + """ + + x = func.generate_series(1, 2).column_valued("x") + y = func.generate_series(3, 4).column_valued("y") + stmt = select(x, y) + + self.assert_compile( + stmt, + "SELECT x, y FROM " + "generate_series(:generate_series_1, :generate_series_2) AS x, " + "generate_series(:generate_series_3, :generate_series_4) AS y", + ) + + def test_column_valued_subquery(self): + x = func.generate_series(1, 2).column_valued("x") + y = func.generate_series(3, 4).column_valued("y") + subq = select(x, y).subquery() + stmt = select(subq).where(subq.c.x > 2) + + self.assert_compile( + stmt, + "SELECT anon_1.x, anon_1.y FROM " + "(SELECT x, y FROM " + "generate_series(:generate_series_1, :generate_series_2) AS x, " + "generate_series(:generate_series_3, :generate_series_4) AS y" + ") AS anon_1 " + "WHERE anon_1.x > :x_1", + ) + + @testing.combinations((True,), (False,)) + def test_render_derived_with_lateral(self, apply_alias_after_lateral): + """ + # this is the "record" type + + SELECT + table1.user_id AS table1_user_id, + table2.name AS table2_name, + jsonb_table.name AS jsonb_table_name, + count(jsonb_table.time) AS count_1 + FROM table1 + JOIN table2 ON table1.user_id = table2.id + JOIN LATERAL jsonb_to_recordset(table1.jsonb) + AS jsonb_table(name TEXT, time FLOAT) ON true + WHERE table2.route_id = %(route_id_1)s + AND jsonb_table.name IN (%(name_1)s, %(name_2)s, %(name_3)s) + GROUP BY table1.user_id, table2.name, jsonb_table.name + ORDER BY table2.name + + """ # noqa + + table1 = table("table1", column("user_id"), column("jsonb")) + table2 = table( + "table2", column("id"), column("name"), column("route_id") + ) + jsonb_table = func.jsonb_to_recordset(table1.c.jsonb).table_valued( + column("name", Text), column("time", Float) + ) + + # I'm a little concerned about the naming, that lateral() and + # alias() both make a new name unconditionally. lateral() already + # works this way, so try to just make sure .alias() after the + # fact works too + if apply_alias_after_lateral: + jsonb_table = ( + jsonb_table.render_derived(with_types=True) + .lateral() + .alias("jsonb_table") + ) + else: + jsonb_table = jsonb_table.render_derived(with_types=True).lateral( + "jsonb_table" + ) + + stmt = ( + select( + table1.c.user_id, + table2.c.name, + jsonb_table.c.name.label("jsonb_table_name"), + func.count(jsonb_table.c.time), + ) + .select_from(table1) + .join(table2, table1.c.user_id == table2.c.id) + .join(jsonb_table, true()) + .where(table2.c.route_id == 5) + .where(jsonb_table.c.name.in_(["n1", "n2", "n3"])) + .group_by(table1.c.user_id, table2.c.name, jsonb_table.c.name) + .order_by(table2.c.name) + ) + + self.assert_compile( + stmt, + "SELECT table1.user_id, table2.name, " + "jsonb_table.name AS jsonb_table_name, " + "count(jsonb_table.time) AS count_1 " + "FROM table1 " + "JOIN table2 ON table1.user_id = table2.id " + "JOIN LATERAL jsonb_to_recordset(table1.jsonb) " + "AS jsonb_table(name TEXT, time FLOAT) ON true " + "WHERE table2.route_id = 5 " + "AND jsonb_table.name IN ('n1', 'n2', 'n3') " + "GROUP BY table1.user_id, table2.name, jsonb_table.name " + "ORDER BY table2.name", + literal_binds=True, + render_postcompile=True, + ) + + def test_function_alias(self): + """ + :: + + SELECT result_elem -> 'Field' as field + FROM "check" AS check_, json_array_elements( + ( + SELECT check_inside.response -> 'Results' + FROM "check" as check_inside + WHERE check_inside.id = check_.id + ) + ) AS result_elem + WHERE result_elem ->> 'Name' = 'FooBar' + + """ + check = table("check", column("id"), column("response", JSON)) + + check_inside = check.alias("check_inside") + check_outside = check.alias("_check") + + subq = ( + select(check_inside.c.response["Results"]) + .where(check_inside.c.id == check_outside.c.id) + .scalar_subquery() + ) + + fn = func.json_array_elements(subq, type_=JSON).alias("result_elem") + + stmt = ( + select(fn.column["Field"].label("field")) + .where(fn.column["Name"] == "FooBar") + .select_from(check_outside) + ) + + self.assert_compile( + stmt, + "SELECT result_elem[:result_elem_1] AS field " + "FROM json_array_elements(" + "(SELECT check_inside.response[:response_1] AS anon_1 " + 'FROM "check" AS check_inside ' + "WHERE check_inside.id = _check.id)" + ') AS result_elem, "check" AS _check ' + "WHERE result_elem[:result_elem_2] = :param_1", + ) + + def test_named_table_valued(self): + + fn = ( + func.json_to_recordset( # noqa + '[{"a":1,"b":"foo"},{"a":"2","c":"bar"}]' + ) + .table_valued(column("a", Integer), column("b", String)) + .render_derived(with_types=True) + ) + + stmt = select(fn.c.a, fn.c.b) + + self.assert_compile( + stmt, + "SELECT anon_1.a, anon_1.b " + "FROM json_to_recordset(:json_to_recordset_1) " + "AS anon_1(a INTEGER, b VARCHAR)", + ) + + def test_named_table_valued_subquery(self): + + fn = ( + func.json_to_recordset( # noqa + '[{"a":1,"b":"foo"},{"a":"2","c":"bar"}]' + ) + .table_valued(column("a", Integer), column("b", String)) + .render_derived(with_types=True) + ) + + stmt = select(fn.c.a, fn.c.b).subquery() + + stmt = select(stmt) + + self.assert_compile( + stmt, + "SELECT anon_1.a, anon_1.b FROM " + "(SELECT anon_2.a AS a, anon_2.b AS b " + "FROM json_to_recordset(:json_to_recordset_1) " + "AS anon_2(a INTEGER, b VARCHAR)" + ") AS anon_1", + ) + + def test_named_table_valued_alias(self): + + """select * from json_to_recordset + ('[{"a":1,"b":"foo"},{"a":"2","c":"bar"}]') as x(a int, b text);""" + + fn = ( + func.json_to_recordset( # noqa + '[{"a":1,"b":"foo"},{"a":"2","c":"bar"}]' + ) + .table_valued(column("a", Integer), column("b", String)) + .render_derived(with_types=True) + .alias("jbr") + ) + + stmt = select(fn.c.a, fn.c.b) + + self.assert_compile( + stmt, + "SELECT jbr.a, jbr.b " + "FROM json_to_recordset(:json_to_recordset_1) " + "AS jbr(a INTEGER, b VARCHAR)", + ) |