summaryrefslogtreecommitdiff
path: root/test/orm/dml
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2023-05-05 09:16:10 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2023-05-05 09:20:58 -0400
commitdc60e7a7d35a470c09ce590f37e949ff8e8cdcde (patch)
tree1b4a8eb5701ea62575fc975aa0a1700c553fb03a /test/orm/dml
parentdb9a2caa43f0e8539bd1b3d8a2522f8018903605 (diff)
downloadsqlalchemy-dc60e7a7d35a470c09ce590f37e949ff8e8cdcde.tar.gz
add explicit step to set populate_existing for bulk insert
Fixed issue in new :ref:`orm_queryguide_upsert_returning` feature where the ``populate_existing`` execution option was not being propagated to the loading option, preventing existing attributes from being refreshed in-place. Fixes: #9746 Change-Id: I3efcab644e2b5874c6b265d5313f353c051db629
Diffstat (limited to 'test/orm/dml')
-rw-r--r--test/orm/dml/test_bulk_statements.py71
1 files changed, 71 insertions, 0 deletions
diff --git a/test/orm/dml/test_bulk_statements.py b/test/orm/dml/test_bulk_statements.py
index ab03b251d..af50ea045 100644
--- a/test/orm/dml/test_bulk_statements.py
+++ b/test/orm/dml/test_bulk_statements.py
@@ -302,6 +302,77 @@ class InsertStmtTest(testing.AssertsExecutionResults, fixtures.TestBase):
else:
eq_(result.first(), (10, expected_qs[0]))
+ @testing.variation("populate_existing", [True, False])
+ @testing.requires.provisioned_upsert
+ def test_upsert_populate_existing(self, decl_base, populate_existing):
+ """test #9742"""
+
+ class Employee(ComparableEntity, decl_base):
+ __tablename__ = "employee"
+
+ uuid: Mapped[uuid.UUID] = mapped_column(primary_key=True)
+ user_name: Mapped[str] = mapped_column(nullable=False)
+
+ decl_base.metadata.create_all(testing.db)
+ s = fixture_session()
+
+ uuid1 = uuid.uuid4()
+ uuid2 = uuid.uuid4()
+ e1 = Employee(uuid=uuid1, user_name="e1 old name")
+ e2 = Employee(uuid=uuid2, user_name="e2 old name")
+ s.add_all([e1, e2])
+ s.flush()
+
+ stmt = provision.upsert(
+ config,
+ Employee,
+ (Employee,),
+ set_lambda=lambda inserted: {"user_name": inserted.user_name},
+ ).values(
+ [
+ dict(uuid=uuid1, user_name="e1 new name"),
+ dict(uuid=uuid2, user_name="e2 new name"),
+ ]
+ )
+ if populate_existing:
+ rows = s.scalars(
+ stmt, execution_options={"populate_existing": True}
+ )
+ # SPECIAL: before we actually receive the returning rows,
+ # the existing objects have not been updated yet
+ eq_(e1.user_name, "e1 old name")
+ eq_(e2.user_name, "e2 old name")
+
+ eq_(
+ set(rows),
+ {
+ Employee(uuid=uuid1, user_name="e1 new name"),
+ Employee(uuid=uuid2, user_name="e2 new name"),
+ },
+ )
+
+ # now they are updated
+ eq_(e1.user_name, "e1 new name")
+ eq_(e2.user_name, "e2 new name")
+ else:
+ # no populate existing
+ rows = s.scalars(stmt)
+ eq_(e1.user_name, "e1 old name")
+ eq_(e2.user_name, "e2 old name")
+ eq_(
+ set(rows),
+ {
+ Employee(uuid=uuid1, user_name="e1 old name"),
+ Employee(uuid=uuid2, user_name="e2 old name"),
+ },
+ )
+ eq_(e1.user_name, "e1 old name")
+ eq_(e2.user_name, "e2 old name")
+ s.commit()
+ s.expire_all()
+ eq_(e1.user_name, "e1 new name")
+ eq_(e2.user_name, "e2 new name")
+
class UpdateStmtTest(fixtures.TestBase):
__backend__ = True