summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/testing/suite/test_cte.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2018-06-14 22:17:00 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2018-06-18 09:12:19 -0400
commit3619edcb8aa3ceef2a44925b85315fc0e90c5982 (patch)
treeef01478a8de04a145675ad442006ff6cb52dfafe /lib/sqlalchemy/testing/suite/test_cte.py
parentda5323c2fae39aab45d305f723a73483563b2307 (diff)
downloadsqlalchemy-3619edcb8aa3ceef2a44925b85315fc0e90c5982.tar.gz
render WITH clause after INSERT for INSERT..SELECT on Oracle, MySQL
Fixed INSERT FROM SELECT with CTEs for the Oracle and MySQL dialects, where the CTE was being placed above the entire statement as is typical with other databases, however Oracle and MariaDB 10.2 wants the CTE underneath the "INSERT" segment. Note that the Oracle and MySQL dialects don't yet work when a CTE is applied to a subquery inside of an UPDATE or DELETE statement, as the CTE is still applied to the top rather than inside the subquery. Also adds test suite support CTEs against backends. Change-Id: I8ac337104d5c546dd4f0cd305632ffb56ac8bf90 Fixes: #4275 Fixes: #4230
Diffstat (limited to 'lib/sqlalchemy/testing/suite/test_cte.py')
-rw-r--r--lib/sqlalchemy/testing/suite/test_cte.py193
1 files changed, 193 insertions, 0 deletions
diff --git a/lib/sqlalchemy/testing/suite/test_cte.py b/lib/sqlalchemy/testing/suite/test_cte.py
new file mode 100644
index 000000000..cc72278e6
--- /dev/null
+++ b/lib/sqlalchemy/testing/suite/test_cte.py
@@ -0,0 +1,193 @@
+from .. import fixtures, config
+from ..assertions import eq_
+
+from sqlalchemy import Integer, String, select
+from sqlalchemy import ForeignKey
+from sqlalchemy import testing
+
+from ..schema import Table, Column
+
+
+class CTETest(fixtures.TablesTest):
+ __backend__ = True
+ __requires__ = 'ctes',
+
+ run_inserts = 'each'
+ run_deletes = 'each'
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table("some_table", metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', String(50)),
+ Column("parent_id", ForeignKey("some_table.id")))
+
+ Table("some_other_table", metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', String(50)),
+ Column("parent_id", Integer))
+
+ @classmethod
+ def insert_data(cls):
+ config.db.execute(
+ cls.tables.some_table.insert(),
+ [
+ {"id": 1, "data": "d1", "parent_id": None},
+ {"id": 2, "data": "d2", "parent_id": 1},
+ {"id": 3, "data": "d3", "parent_id": 1},
+ {"id": 4, "data": "d4", "parent_id": 3},
+ {"id": 5, "data": "d5", "parent_id": 3}
+ ]
+ )
+
+ def test_select_nonrecursive_round_trip(self):
+ some_table = self.tables.some_table
+
+ with config.db.connect() as conn:
+ cte = select([some_table]).where(
+ some_table.c.data.in_(["d2", "d3", "d4"])).cte("some_cte")
+ result = conn.execute(
+ select([cte.c.data]).where(cte.c.data.in_(["d4", "d5"]))
+ )
+ eq_(result.fetchall(), [("d4", )])
+
+ def test_select_recursive_round_trip(self):
+ some_table = self.tables.some_table
+
+ with config.db.connect() as conn:
+ cte = select([some_table]).where(
+ some_table.c.data.in_(["d2", "d3", "d4"])).cte(
+ "some_cte", recursive=True)
+
+ cte_alias = cte.alias("c1")
+ st1 = some_table.alias()
+ # note that SQL Server requires this to be UNION ALL,
+ # can't be UNION
+ cte = cte.union_all(
+ select([st1]).where(st1.c.id == cte_alias.c.parent_id)
+ )
+ result = conn.execute(
+ select([cte.c.data]).where(
+ cte.c.data != "d2").order_by(cte.c.data.desc())
+ )
+ eq_(
+ result.fetchall(),
+ [('d4',), ('d3',), ('d3',), ('d1',), ('d1',), ('d1',)]
+ )
+
+ def test_insert_from_select_round_trip(self):
+ some_table = self.tables.some_table
+ some_other_table = self.tables.some_other_table
+
+ with config.db.connect() as conn:
+ cte = select([some_table]).where(
+ some_table.c.data.in_(["d2", "d3", "d4"])
+ ).cte("some_cte")
+ conn.execute(
+ some_other_table.insert().from_select(
+ ["id", "data", "parent_id"],
+ select([cte])
+ )
+ )
+ eq_(
+ conn.execute(
+ select([some_other_table]).order_by(some_other_table.c.id)
+ ).fetchall(),
+ [(2, "d2", 1), (3, "d3", 1), (4, "d4", 3)]
+ )
+
+ @testing.requires.ctes_with_update_delete
+ @testing.requires.update_from
+ def test_update_from_round_trip(self):
+ some_table = self.tables.some_table
+ some_other_table = self.tables.some_other_table
+
+ with config.db.connect() as conn:
+ conn.execute(
+ some_other_table.insert().from_select(
+ ['id', 'data', 'parent_id'],
+ select([some_table])
+ )
+ )
+
+ cte = select([some_table]).where(
+ some_table.c.data.in_(["d2", "d3", "d4"])
+ ).cte("some_cte")
+ conn.execute(
+ some_other_table.update().values(parent_id=5).where(
+ some_other_table.c.data == cte.c.data
+ )
+ )
+ eq_(
+ conn.execute(
+ select([some_other_table]).order_by(some_other_table.c.id)
+ ).fetchall(),
+ [
+ (1, "d1", None), (2, "d2", 5),
+ (3, "d3", 5), (4, "d4", 5), (5, "d5", 3)
+ ]
+ )
+
+ @testing.requires.ctes_with_update_delete
+ @testing.requires.delete_from
+ def test_delete_from_round_trip(self):
+ some_table = self.tables.some_table
+ some_other_table = self.tables.some_other_table
+
+ with config.db.connect() as conn:
+ conn.execute(
+ some_other_table.insert().from_select(
+ ['id', 'data', 'parent_id'],
+ select([some_table])
+ )
+ )
+
+ cte = select([some_table]).where(
+ some_table.c.data.in_(["d2", "d3", "d4"])
+ ).cte("some_cte")
+ conn.execute(
+ some_other_table.delete().where(
+ some_other_table.c.data == cte.c.data
+ )
+ )
+ eq_(
+ conn.execute(
+ select([some_other_table]).order_by(some_other_table.c.id)
+ ).fetchall(),
+ [
+ (1, "d1", None), (5, "d5", 3)
+ ]
+ )
+
+ @testing.requires.ctes_with_update_delete
+ def test_delete_scalar_subq_round_trip(self):
+
+ some_table = self.tables.some_table
+ some_other_table = self.tables.some_other_table
+
+ with config.db.connect() as conn:
+ conn.execute(
+ some_other_table.insert().from_select(
+ ['id', 'data', 'parent_id'],
+ select([some_table])
+ )
+ )
+
+ cte = select([some_table]).where(
+ some_table.c.data.in_(["d2", "d3", "d4"])
+ ).cte("some_cte")
+ conn.execute(
+ some_other_table.delete().where(
+ some_other_table.c.data ==
+ select([cte.c.data]).where(
+ cte.c.id == some_other_table.c.id)
+ )
+ )
+ eq_(
+ conn.execute(
+ select([some_other_table]).order_by(some_other_table.c.id)
+ ).fetchall(),
+ [
+ (1, "d1", None), (5, "d5", 3)
+ ]
+ )