diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/dialect/postgresql/test_compiler.py | 26 | ||||
-rw-r--r-- | test/sql/test_compare.py | 10 | ||||
-rw-r--r-- | test/sql/test_cte.py | 152 |
3 files changed, 181 insertions, 7 deletions
diff --git a/test/dialect/postgresql/test_compiler.py b/test/dialect/postgresql/test_compiler.py index 84901d178..77b4242f1 100644 --- a/test/dialect/postgresql/test_compiler.py +++ b/test/dialect/postgresql/test_compiler.py @@ -2594,7 +2594,7 @@ class InsertOnConflictTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( stmt, "WITH i_upsert AS " - "(INSERT INTO mytable (name) VALUES (%(name)s) " + "(INSERT INTO mytable (name) VALUES (%(param_1)s) " "ON CONFLICT (name, description) " "WHERE description != %(description_1)s " "DO UPDATE SET name = excluded.name " @@ -2603,6 +2603,30 @@ class InsertOnConflictTest(fixtures.TestBase, AssertsCompiledSQL): "FROM i_upsert", ) + def test_combined_with_cte(self): + t = table("t", column("c1"), column("c2")) + + delete_statement_cte = t.delete().where(t.c.c1 < 1).cte("deletions") + + insert_stmt = insert(t).values([{"c1": 1, "c2": 2}]) + update_stmt = insert_stmt.on_conflict_do_update( + index_elements=[t.c.c1], + set_={ + col.name: col + for col in insert_stmt.excluded + if col.name in ("c1", "c2") + }, + ).add_cte(delete_statement_cte) + + self.assert_compile( + update_stmt, + "WITH deletions AS (DELETE FROM t WHERE t.c1 < %(c1_1)s) " + "INSERT INTO t (c1, c2) VALUES (%(c1_m0)s, %(c2_m0)s) " + "ON CONFLICT (c1) DO UPDATE SET c1 = excluded.c1, " + "c2 = excluded.c2", + checkparams={"c1_m0": 1, "c2_m0": 2, "c1_1": 1}, + ) + def test_quote_raw_string_col(self): t = table("t", column("FancyName"), column("other name")) diff --git a/test/sql/test_compare.py b/test/sql/test_compare.py index 365ed52b2..188d9337e 100644 --- a/test/sql/test_compare.py +++ b/test/sql/test_compare.py @@ -491,6 +491,16 @@ class CoreFixtures(object): select(table_a.c.a).join(table_c, table_a.c.a == table_c.c.x), ), lambda: ( + select(table_a.c.a), + select(table_a.c.a).add_cte(table_b.insert().cte()), + table_a.insert(), + table_a.delete(), + table_a.update(), + table_a.insert().add_cte(table_b.insert().cte()), + table_a.delete().add_cte(table_b.insert().cte()), + table_a.update().add_cte(table_b.insert().cte()), + ), + lambda: ( select(table_a.c.a).cte(), select(table_a.c.a).cte(recursive=True), select(table_a.c.a).cte(name="some_cte", recursive=True), diff --git a/test/sql/test_cte.py b/test/sql/test_cte.py index 01186c340..e8a8a3150 100644 --- a/test/sql/test_cte.py +++ b/test/sql/test_cte.py @@ -1015,8 +1015,8 @@ class CTETest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( insert, - "WITH upsert AS (UPDATE orders SET amount=:amount, " - "product=:product, quantity=:quantity " + "WITH upsert AS (UPDATE orders SET amount=:param_5, " + "product=:param_6, quantity=:param_7 " "WHERE orders.region = :region_1 " "RETURNING orders.region, orders.amount, " "orders.product, orders.quantity) " @@ -1025,6 +1025,16 @@ class CTETest(fixtures.TestBase, AssertsCompiledSQL): ":param_3 AS anon_3, :param_4 AS anon_4 WHERE NOT (EXISTS " "(SELECT upsert.region, upsert.amount, upsert.product, " "upsert.quantity FROM upsert))", + checkparams={ + "param_1": "Region1", + "param_2": 1.0, + "param_3": "Product1", + "param_4": 1, + "param_5": 1.0, + "param_6": "Product1", + "param_7": 1, + "region_1": "Region1", + }, ) eq_(insert.compile().isinsert, True) @@ -1106,9 +1116,10 @@ class CTETest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( stmt.select(), - "WITH anon_1 AS (UPDATE orders SET region=:region " + "WITH anon_1 AS (UPDATE orders SET region=:param_1 " "WHERE orders.region = :region_1 RETURNING orders.region) " "SELECT anon_1.region FROM anon_1", + checkparams={"param_1": "y", "region_1": "x"}, ) eq_(stmt.select().compile().isupdate, False) @@ -1122,8 +1133,9 @@ class CTETest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( stmt.select(), "WITH anon_1 AS (INSERT INTO orders (region) " - "VALUES (:region) RETURNING orders.region) " + "VALUES (:param_1) RETURNING orders.region) " "SELECT anon_1.region FROM anon_1", + checkparams={"param_1": "y"}, ) eq_(stmt.select().compile().isinsert, False) @@ -1196,10 +1208,11 @@ class CTETest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( stmt, "WITH t AS " - "(UPDATE products SET price=:price " + "(UPDATE products SET price=:param_1 " "RETURNING products.id, products.price) " "SELECT t.id, t.price " "FROM t", + checkparams={"param_1": "someprice"}, ) eq_(stmt.compile().isupdate, False) @@ -1257,10 +1270,11 @@ class CTETest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( stmt, "WITH pd AS " - "(INSERT INTO products (id, price) VALUES (:id, :price) " + "(INSERT INTO products (id, price) VALUES (:param_1, :param_2) " "RETURNING products.id, products.price) " "SELECT pd.id, pd.price " "FROM pd", + checkparams={"param_1": 1, "param_2": 27.0}, ) eq_(stmt.compile().isinsert, False) @@ -1353,6 +1367,132 @@ class CTETest(fixtures.TestBase, AssertsCompiledSQL): ) eq_(stmt.compile().isdelete, True) + def test_select_uses_independent_cte(self): + products = table("products", column("id"), column("price")) + + upd_cte = ( + products.update().values(price=10).where(products.c.price > 50) + ).cte() + + stmt = products.select().where(products.c.price < 45).add_cte(upd_cte) + + self.assert_compile( + stmt, + "WITH anon_1 AS (UPDATE products SET price=:param_1 " + "WHERE products.price > :price_1) " + "SELECT products.id, products.price " + "FROM products WHERE products.price < :price_2", + checkparams={"param_1": 10, "price_1": 50, "price_2": 45}, + ) + + def test_insert_uses_independent_cte(self): + products = table("products", column("id"), column("price")) + + upd_cte = ( + products.update().values(price=10).where(products.c.price > 50) + ).cte() + + stmt = ( + products.insert().values({"id": 1, "price": 20}).add_cte(upd_cte) + ) + + self.assert_compile( + stmt, + "WITH anon_1 AS (UPDATE products SET price=:param_1 " + "WHERE products.price > :price_1) " + "INSERT INTO products (id, price) VALUES (:id, :price)", + checkparams={"id": 1, "price": 20, "param_1": 10, "price_1": 50}, + ) + + def test_update_uses_independent_cte(self): + products = table("products", column("id"), column("price")) + + upd_cte = ( + products.update().values(price=10).where(products.c.price > 50) + ).cte() + + stmt = ( + products.update() + .values(price=5) + .where(products.c.price < 50) + .add_cte(upd_cte) + ) + + self.assert_compile( + stmt, + "WITH anon_1 AS (UPDATE products SET price=:param_1 " + "WHERE products.price > :price_1) UPDATE products " + "SET price=:price WHERE products.price < :price_2", + checkparams={ + "param_1": 10, + "price": 5, + "price_1": 50, + "price_2": 50, + }, + ) + + def test_update_w_insert_independent_cte(self): + products = table("products", column("id"), column("price")) + + ins_cte = (products.insert().values({"id": 1, "price": 10})).cte() + + stmt = ( + products.update() + .values(price=5) + .where(products.c.price < 50) + .add_cte(ins_cte) + ) + + self.assert_compile( + stmt, + "WITH anon_1 AS (INSERT INTO products (id, price) " + "VALUES (:param_1, :param_2)) " + "UPDATE products SET price=:price WHERE products.price < :price_1", + checkparams={ + "price": 5, + "param_1": 1, + "param_2": 10, + "price_1": 50, + }, + ) + + def test_delete_uses_independent_cte(self): + products = table("products", column("id"), column("price")) + + upd_cte = ( + products.update().values(price=10).where(products.c.price > 50) + ).cte() + + stmt = products.delete().where(products.c.price < 45).add_cte(upd_cte) + + self.assert_compile( + stmt, + "WITH anon_1 AS (UPDATE products SET price=:param_1 " + "WHERE products.price > :price_1) " + "DELETE FROM products WHERE products.price < :price_2", + checkparams={"param_1": 10, "price_1": 50, "price_2": 45}, + ) + + def test_independent_cte_can_be_referenced(self): + products = table("products", column("id"), column("price")) + + cte = products.select().cte("pd") + + stmt = ( + products.update() + .where(products.c.price == cte.c.price) + .add_cte(cte) + ) + + self.assert_compile( + stmt, + "WITH pd AS " + "(SELECT products.id AS id, products.price AS price " + "FROM products) " + "UPDATE products SET id=:id, price=:price FROM pd " + "WHERE products.price = pd.price", + ) + def test_standalone_function(self): a = table("a", column("x")) a_stmt = select(a) |