diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2018-06-14 22:17:00 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2018-06-18 09:12:19 -0400 |
commit | 3619edcb8aa3ceef2a44925b85315fc0e90c5982 (patch) | |
tree | ef01478a8de04a145675ad442006ff6cb52dfafe /lib/sqlalchemy/testing/suite/test_cte.py | |
parent | da5323c2fae39aab45d305f723a73483563b2307 (diff) | |
download | sqlalchemy-3619edcb8aa3ceef2a44925b85315fc0e90c5982.tar.gz |
render WITH clause after INSERT for INSERT..SELECT on Oracle, MySQL
Fixed INSERT FROM SELECT with CTEs for the Oracle and MySQL dialects, where
the CTE was being placed above the entire statement as is typical with
other databases, however Oracle and MariaDB 10.2 wants the CTE underneath
the "INSERT" segment. Note that the Oracle and MySQL dialects don't yet
work when a CTE is applied to a subquery inside of an UPDATE or DELETE
statement, as the CTE is still applied to the top rather than inside the
subquery.
Also adds test suite support CTEs against backends.
Change-Id: I8ac337104d5c546dd4f0cd305632ffb56ac8bf90
Fixes: #4275
Fixes: #4230
Diffstat (limited to 'lib/sqlalchemy/testing/suite/test_cte.py')
-rw-r--r-- | lib/sqlalchemy/testing/suite/test_cte.py | 193 |
1 files changed, 193 insertions, 0 deletions
diff --git a/lib/sqlalchemy/testing/suite/test_cte.py b/lib/sqlalchemy/testing/suite/test_cte.py new file mode 100644 index 000000000..cc72278e6 --- /dev/null +++ b/lib/sqlalchemy/testing/suite/test_cte.py @@ -0,0 +1,193 @@ +from .. import fixtures, config +from ..assertions import eq_ + +from sqlalchemy import Integer, String, select +from sqlalchemy import ForeignKey +from sqlalchemy import testing + +from ..schema import Table, Column + + +class CTETest(fixtures.TablesTest): + __backend__ = True + __requires__ = 'ctes', + + run_inserts = 'each' + run_deletes = 'each' + + @classmethod + def define_tables(cls, metadata): + Table("some_table", metadata, + Column('id', Integer, primary_key=True), + Column('data', String(50)), + Column("parent_id", ForeignKey("some_table.id"))) + + Table("some_other_table", metadata, + Column('id', Integer, primary_key=True), + Column('data', String(50)), + Column("parent_id", Integer)) + + @classmethod + def insert_data(cls): + config.db.execute( + cls.tables.some_table.insert(), + [ + {"id": 1, "data": "d1", "parent_id": None}, + {"id": 2, "data": "d2", "parent_id": 1}, + {"id": 3, "data": "d3", "parent_id": 1}, + {"id": 4, "data": "d4", "parent_id": 3}, + {"id": 5, "data": "d5", "parent_id": 3} + ] + ) + + def test_select_nonrecursive_round_trip(self): + some_table = self.tables.some_table + + with config.db.connect() as conn: + cte = select([some_table]).where( + some_table.c.data.in_(["d2", "d3", "d4"])).cte("some_cte") + result = conn.execute( + select([cte.c.data]).where(cte.c.data.in_(["d4", "d5"])) + ) + eq_(result.fetchall(), [("d4", )]) + + def test_select_recursive_round_trip(self): + some_table = self.tables.some_table + + with config.db.connect() as conn: + cte = select([some_table]).where( + some_table.c.data.in_(["d2", "d3", "d4"])).cte( + "some_cte", recursive=True) + + cte_alias = cte.alias("c1") + st1 = some_table.alias() + # note that SQL Server requires this to be UNION ALL, + # can't be UNION + cte = cte.union_all( + select([st1]).where(st1.c.id == cte_alias.c.parent_id) + ) + result = conn.execute( + select([cte.c.data]).where( + cte.c.data != "d2").order_by(cte.c.data.desc()) + ) + eq_( + result.fetchall(), + [('d4',), ('d3',), ('d3',), ('d1',), ('d1',), ('d1',)] + ) + + def test_insert_from_select_round_trip(self): + some_table = self.tables.some_table + some_other_table = self.tables.some_other_table + + with config.db.connect() as conn: + cte = select([some_table]).where( + some_table.c.data.in_(["d2", "d3", "d4"]) + ).cte("some_cte") + conn.execute( + some_other_table.insert().from_select( + ["id", "data", "parent_id"], + select([cte]) + ) + ) + eq_( + conn.execute( + select([some_other_table]).order_by(some_other_table.c.id) + ).fetchall(), + [(2, "d2", 1), (3, "d3", 1), (4, "d4", 3)] + ) + + @testing.requires.ctes_with_update_delete + @testing.requires.update_from + def test_update_from_round_trip(self): + some_table = self.tables.some_table + some_other_table = self.tables.some_other_table + + with config.db.connect() as conn: + conn.execute( + some_other_table.insert().from_select( + ['id', 'data', 'parent_id'], + select([some_table]) + ) + ) + + cte = select([some_table]).where( + some_table.c.data.in_(["d2", "d3", "d4"]) + ).cte("some_cte") + conn.execute( + some_other_table.update().values(parent_id=5).where( + some_other_table.c.data == cte.c.data + ) + ) + eq_( + conn.execute( + select([some_other_table]).order_by(some_other_table.c.id) + ).fetchall(), + [ + (1, "d1", None), (2, "d2", 5), + (3, "d3", 5), (4, "d4", 5), (5, "d5", 3) + ] + ) + + @testing.requires.ctes_with_update_delete + @testing.requires.delete_from + def test_delete_from_round_trip(self): + some_table = self.tables.some_table + some_other_table = self.tables.some_other_table + + with config.db.connect() as conn: + conn.execute( + some_other_table.insert().from_select( + ['id', 'data', 'parent_id'], + select([some_table]) + ) + ) + + cte = select([some_table]).where( + some_table.c.data.in_(["d2", "d3", "d4"]) + ).cte("some_cte") + conn.execute( + some_other_table.delete().where( + some_other_table.c.data == cte.c.data + ) + ) + eq_( + conn.execute( + select([some_other_table]).order_by(some_other_table.c.id) + ).fetchall(), + [ + (1, "d1", None), (5, "d5", 3) + ] + ) + + @testing.requires.ctes_with_update_delete + def test_delete_scalar_subq_round_trip(self): + + some_table = self.tables.some_table + some_other_table = self.tables.some_other_table + + with config.db.connect() as conn: + conn.execute( + some_other_table.insert().from_select( + ['id', 'data', 'parent_id'], + select([some_table]) + ) + ) + + cte = select([some_table]).where( + some_table.c.data.in_(["d2", "d3", "d4"]) + ).cte("some_cte") + conn.execute( + some_other_table.delete().where( + some_other_table.c.data == + select([cte.c.data]).where( + cte.c.id == some_other_table.c.id) + ) + ) + eq_( + conn.execute( + select([some_other_table]).order_by(some_other_table.c.id) + ).fetchall(), + [ + (1, "d1", None), (5, "d5", 3) + ] + ) |