summaryrefslogtreecommitdiff
path: root/test/ext
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2022-08-07 12:14:19 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2022-09-24 11:18:01 -0400
commita8029f5a7e3e376ec57f1614ab0294b717d53c05 (patch)
tree84b1a3b3a6d3f4c9d6e8054f9cdfa190344436cb /test/ext
parent2bcc97da424eef7db9a5d02f81d02344925415ee (diff)
downloadsqlalchemy-a8029f5a7e3e376ec57f1614ab0294b717d53c05.tar.gz
ORM bulk insert via execute
* ORM Insert now includes "bulk" mode that will run essentially the same process as session.bulk_insert_mappings; interprets the given list of values as ORM attributes for key names * ORM UPDATE has a similar feature, without RETURNING support, for session.bulk_update_mappings * Added support for upserts to do RETURNING ORM objects as well * ORM UPDATE/DELETE with list of parameters + WHERE criteria is not implemented; use connection * ORM UPDATE/DELETE defaults to "auto" synchronize_session; use fetch if RETURNING is present, evaluate if not, as "fetch" is much more efficient (no expired object SELECT problem) and less error prone if RETURNING is available UPDATE: however this is inefficient! please continue to use evaluate for simple cases, auto can move to fetch if criteria not evaluable * "Evaluate" criteria will now not preemptively unexpire and SELECT attributes that were individually expired. Instead, if evaluation of the criteria indicates that the necessary attrs were expired, we expire the object completely (delete) or expire the SET attrs unconditionally (update). This keeps the object in the same unloaded state where it will refresh those attrs on the next pass, for this generally unusual case. (originally #5664) * Core change! update/delete rowcount comes from len(rows) if RETURNING was used. SQLite at least otherwise did not support this. adjusted test_rowcount accordingly * ORM DELETE with a list of parameters at all is also not implemented as this would imply "bulk", and there is no bulk_delete_mappings (could be, but we don't have that) * ORM insert().values() with single or multi-values translates key names based on ORM attribute names * ORM returning() implemented for insert, update, delete; explicit returning clauses now interpret rows in an ORM context, with support for qualifying loader options as well * session.bulk_insert_mappings() assigns polymorphic identity if not set. 
* explicit RETURNING + synchronize_session='fetch' is now supported with UPDATE and DELETE. * expanded return_defaults() to work with DELETE also. * added support for composite attributes to be present in the dictionaries used by bulk_insert_mappings and bulk_update_mappings, which is also the new ORM bulk insert/update feature, that will expand the composite values into their individual mapped attributes the way they'd be on a mapped instance. * bulk UPDATE supports "synchronize_session=evaluate", which is the default. this does not apply to session.bulk_update_mappings, just the new version * both bulk UPDATE and bulk INSERT, the latter with or without RETURNING, support *heterogeneous* parameter sets. session.bulk_insert/update_mappings did this, so this feature is maintained. now cursor result can be both horizontally and vertically spliced :) This is now a long story with a lot of options, which in itself is a problem to be able to document all of this in some way that makes sense. raising exceptions for use cases we haven't supported is pretty important here too, the tradition of letting unsupported things just not work is likely not a good idea at this point, though there are still many cases that aren't easily avoidable Fixes: #8360 Fixes: #7864 Fixes: #7865 Change-Id: Idf28379f8705e403a3c6a937f6a798a042ef2540
Diffstat (limited to 'test/ext')
-rw-r--r--test/ext/test_horizontal_shard.py212
-rw-r--r--test/ext/test_hybrid.py35
2 files changed, 74 insertions, 173 deletions
diff --git a/test/ext/test_horizontal_shard.py b/test/ext/test_horizontal_shard.py
index 7cc6a6f79..667f4bfb0 100644
--- a/test/ext/test_horizontal_shard.py
+++ b/test/ext/test_horizontal_shard.py
@@ -465,7 +465,11 @@ class ShardTest:
t = get_tokyo(sess2)
eq_(t.city, tokyo.city)
- def test_bulk_update_synchronize_evaluate(self):
+ @testing.combinations(
+ "fetch", "evaluate", "auto", argnames="synchronize_session"
+ )
+ @testing.combinations(True, False, argnames="legacy")
+ def test_orm_update_synchronize(self, synchronize_session, legacy):
sess = self._fixture_data()
eq_(
@@ -476,33 +480,25 @@ class ShardTest:
temps = sess.query(Report).all()
eq_(set(t.temperature for t in temps), {80.0, 75.0, 85.0})
- sess.query(Report).filter(Report.temperature >= 80).update(
- {"temperature": Report.temperature + 6},
- synchronize_session="evaluate",
- )
-
- eq_(
- set(row.temperature for row in sess.query(Report.temperature)),
- {86.0, 75.0, 91.0},
- )
-
- # test synchronize session as well
- eq_(set(t.temperature for t in temps), {86.0, 75.0, 91.0})
-
- def test_bulk_update_synchronize_fetch(self):
- sess = self._fixture_data()
-
- eq_(
- set(row.temperature for row in sess.query(Report.temperature)),
- {80.0, 75.0, 85.0},
- )
+ if legacy:
+ sess.query(Report).filter(Report.temperature >= 80).update(
+ {"temperature": Report.temperature + 6},
+ synchronize_session=synchronize_session,
+ )
+ else:
+ sess.execute(
+ update(Report)
+ .filter(Report.temperature >= 80)
+ .values(temperature=Report.temperature + 6)
+ .execution_options(synchronize_session=synchronize_session)
+ )
- temps = sess.query(Report).all()
- eq_(set(t.temperature for t in temps), {80.0, 75.0, 85.0})
+ # test synchronize session
+ def go():
+ eq_(set(t.temperature for t in temps), {86.0, 75.0, 91.0})
- sess.query(Report).filter(Report.temperature >= 80).update(
- {"temperature": Report.temperature + 6},
- synchronize_session="fetch",
+ self.assert_sql_count(
+ sess._ShardedSession__binds["north_america"], go, 0
)
eq_(
@@ -510,165 +506,41 @@ class ShardTest:
{86.0, 75.0, 91.0},
)
- # test synchronize session as well
- eq_(set(t.temperature for t in temps), {86.0, 75.0, 91.0})
-
- def test_bulk_delete_synchronize_evaluate(self):
- sess = self._fixture_data()
-
- temps = sess.query(Report).all()
- eq_(set(t.temperature for t in temps), {80.0, 75.0, 85.0})
-
- sess.query(Report).filter(Report.temperature >= 80).delete(
- synchronize_session="evaluate"
- )
-
- eq_(
- set(row.temperature for row in sess.query(Report.temperature)),
- {75.0},
- )
-
- # test synchronize session as well
- for t in temps:
- assert inspect(t).deleted is (t.temperature >= 80)
-
- def test_bulk_delete_synchronize_fetch(self):
+ @testing.combinations(
+ "fetch", "evaluate", "auto", argnames="synchronize_session"
+ )
+ @testing.combinations(True, False, argnames="legacy")
+ def test_orm_delete_synchronize(self, synchronize_session, legacy):
sess = self._fixture_data()
temps = sess.query(Report).all()
eq_(set(t.temperature for t in temps), {80.0, 75.0, 85.0})
- sess.query(Report).filter(Report.temperature >= 80).delete(
- synchronize_session="fetch"
- )
-
- eq_(
- set(row.temperature for row in sess.query(Report.temperature)),
- {75.0},
- )
-
- # test synchronize session as well
- for t in temps:
- assert inspect(t).deleted is (t.temperature >= 80)
-
- def test_bulk_update_future_synchronize_evaluate(self):
- sess = self._fixture_data()
-
- eq_(
- set(
- row.temperature
- for row in sess.execute(select(Report.temperature))
- ),
- {80.0, 75.0, 85.0},
- )
-
- temps = sess.execute(select(Report)).scalars().all()
- eq_(set(t.temperature for t in temps), {80.0, 75.0, 85.0})
-
- sess.execute(
- update(Report)
- .filter(Report.temperature >= 80)
- .values(
- {"temperature": Report.temperature + 6},
+ if legacy:
+ sess.query(Report).filter(Report.temperature >= 80).delete(
+ synchronize_session=synchronize_session
)
- .execution_options(synchronize_session="evaluate")
- )
-
- eq_(
- set(
- row.temperature
- for row in sess.execute(select(Report.temperature))
- ),
- {86.0, 75.0, 91.0},
- )
-
- # test synchronize session as well
- eq_(set(t.temperature for t in temps), {86.0, 75.0, 91.0})
-
- def test_bulk_update_future_synchronize_fetch(self):
- sess = self._fixture_data()
-
- eq_(
- set(
- row.temperature
- for row in sess.execute(select(Report.temperature))
- ),
- {80.0, 75.0, 85.0},
- )
-
- temps = sess.execute(select(Report)).scalars().all()
- eq_(set(t.temperature for t in temps), {80.0, 75.0, 85.0})
-
- # MARKMARK
- # omitting the criteria so that the UPDATE affects three out of
- # four shards
- sess.execute(
- update(Report)
- .values(
- {"temperature": Report.temperature + 6},
+ else:
+ sess.execute(
+ delete(Report)
+ .filter(Report.temperature >= 80)
+ .execution_options(synchronize_session=synchronize_session)
)
- .execution_options(synchronize_session="fetch")
- )
-
- eq_(
- set(
- row.temperature
- for row in sess.execute(select(Report.temperature))
- ),
- {86.0, 81.0, 91.0},
- )
-
- # test synchronize session as well
- eq_(set(t.temperature for t in temps), {86.0, 81.0, 91.0})
-
- def test_bulk_delete_future_synchronize_evaluate(self):
- sess = self._fixture_data()
-
- temps = sess.execute(select(Report)).scalars().all()
- eq_(set(t.temperature for t in temps), {80.0, 75.0, 85.0})
-
- sess.execute(
- delete(Report)
- .filter(Report.temperature >= 80)
- .execution_options(synchronize_session="evaluate")
- )
- eq_(
- set(
- row.temperature
- for row in sess.execute(select(Report.temperature))
- ),
- {75.0},
- )
-
- # test synchronize session as well
- for t in temps:
- assert inspect(t).deleted is (t.temperature >= 80)
-
- def test_bulk_delete_future_synchronize_fetch(self):
- sess = self._fixture_data()
-
- temps = sess.execute(select(Report)).scalars().all()
- eq_(set(t.temperature for t in temps), {80.0, 75.0, 85.0})
+ def go():
+ # test synchronize session
+ for t in temps:
+ assert inspect(t).deleted is (t.temperature >= 80)
- sess.execute(
- delete(Report)
- .filter(Report.temperature >= 80)
- .execution_options(synchronize_session="fetch")
+ self.assert_sql_count(
+ sess._ShardedSession__binds["north_america"], go, 0
)
eq_(
- set(
- row.temperature
- for row in sess.execute(select(Report.temperature))
- ),
+ set(row.temperature for row in sess.query(Report.temperature)),
{75.0},
)
- # test synchronize session as well
- for t in temps:
- assert inspect(t).deleted is (t.temperature >= 80)
-
class DistinctEngineShardTest(ShardTest, fixtures.MappedTest):
def _init_dbs(self):
diff --git a/test/ext/test_hybrid.py b/test/ext/test_hybrid.py
index de5f89b25..0cba8f3a1 100644
--- a/test/ext/test_hybrid.py
+++ b/test/ext/test_hybrid.py
@@ -3,6 +3,7 @@ from decimal import Decimal
from sqlalchemy import exc
from sqlalchemy import ForeignKey
from sqlalchemy import func
+from sqlalchemy import insert
from sqlalchemy import inspect
from sqlalchemy import Integer
from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL
@@ -1017,15 +1018,43 @@ class BulkUpdateTest(fixtures.DeclarativeMappedTest, AssertsCompiledSQL):
params={"first_name": "Dr."},
)
- def test_update_expr(self):
+ @testing.combinations("attr", "str", "kwarg", argnames="keytype")
+ def test_update_expr(self, keytype):
Person = self.classes.Person
- statement = update(Person).values({Person.name: "Dr. No"})
+ if keytype == "attr":
+ statement = update(Person).values({Person.name: "Dr. No"})
+ elif keytype == "str":
+ statement = update(Person).values({"name": "Dr. No"})
+ elif keytype == "kwarg":
+ statement = update(Person).values(name="Dr. No")
+ else:
+ assert False
self.assert_compile(
statement,
"UPDATE person SET first_name=:first_name, last_name=:last_name",
- params={"first_name": "Dr.", "last_name": "No"},
+ checkparams={"first_name": "Dr.", "last_name": "No"},
+ )
+
+ @testing.combinations("attr", "str", "kwarg", argnames="keytype")
+ def test_insert_expr(self, keytype):
+ Person = self.classes.Person
+
+ if keytype == "attr":
+ statement = insert(Person).values({Person.name: "Dr. No"})
+ elif keytype == "str":
+ statement = insert(Person).values({"name": "Dr. No"})
+ elif keytype == "kwarg":
+ statement = insert(Person).values(name="Dr. No")
+ else:
+ assert False
+
+ self.assert_compile(
+ statement,
+ "INSERT INTO person (first_name, last_name) VALUES "
+ "(:first_name, :last_name)",
+ checkparams={"first_name": "Dr.", "last_name": "No"},
)
# these tests all run two UPDATES to assert that caching is not