diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2023-05-05 09:16:10 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2023-05-05 09:20:58 -0400 |
| commit | dc60e7a7d35a470c09ce590f37e949ff8e8cdcde (patch) | |
| tree | 1b4a8eb5701ea62575fc975aa0a1700c553fb03a /test/orm/dml | |
| parent | db9a2caa43f0e8539bd1b3d8a2522f8018903605 (diff) | |
| download | sqlalchemy-dc60e7a7d35a470c09ce590f37e949ff8e8cdcde.tar.gz | |
add explicit step to set populate_existing for bulk insert
Fixed issue in new :ref:`orm_queryguide_upsert_returning` feature where the
``populate_existing`` execution option was not being propagated to the
loading option, preventing existing attributes from being refreshed
in-place.
Fixes: #9746
Change-Id: I3efcab644e2b5874c6b265d5313f353c051db629
Diffstat (limited to 'test/orm/dml')
| -rw-r--r-- | test/orm/dml/test_bulk_statements.py | 71 |
1 files changed, 71 insertions, 0 deletions
diff --git a/test/orm/dml/test_bulk_statements.py b/test/orm/dml/test_bulk_statements.py index ab03b251d..af50ea045 100644 --- a/test/orm/dml/test_bulk_statements.py +++ b/test/orm/dml/test_bulk_statements.py @@ -302,6 +302,77 @@ class InsertStmtTest(testing.AssertsExecutionResults, fixtures.TestBase): else: eq_(result.first(), (10, expected_qs[0])) + @testing.variation("populate_existing", [True, False]) + @testing.requires.provisioned_upsert + def test_upsert_populate_existing(self, decl_base, populate_existing): + """test #9742""" + + class Employee(ComparableEntity, decl_base): + __tablename__ = "employee" + + uuid: Mapped[uuid.UUID] = mapped_column(primary_key=True) + user_name: Mapped[str] = mapped_column(nullable=False) + + decl_base.metadata.create_all(testing.db) + s = fixture_session() + + uuid1 = uuid.uuid4() + uuid2 = uuid.uuid4() + e1 = Employee(uuid=uuid1, user_name="e1 old name") + e2 = Employee(uuid=uuid2, user_name="e2 old name") + s.add_all([e1, e2]) + s.flush() + + stmt = provision.upsert( + config, + Employee, + (Employee,), + set_lambda=lambda inserted: {"user_name": inserted.user_name}, + ).values( + [ + dict(uuid=uuid1, user_name="e1 new name"), + dict(uuid=uuid2, user_name="e2 new name"), + ] + ) + if populate_existing: + rows = s.scalars( + stmt, execution_options={"populate_existing": True} + ) + # SPECIAL: before we actually receive the returning rows, + # the existing objects have not been updated yet + eq_(e1.user_name, "e1 old name") + eq_(e2.user_name, "e2 old name") + + eq_( + set(rows), + { + Employee(uuid=uuid1, user_name="e1 new name"), + Employee(uuid=uuid2, user_name="e2 new name"), + }, + ) + + # now they are updated + eq_(e1.user_name, "e1 new name") + eq_(e2.user_name, "e2 new name") + else: + # no populate existing + rows = s.scalars(stmt) + eq_(e1.user_name, "e1 old name") + eq_(e2.user_name, "e2 old name") + eq_( + set(rows), + { + Employee(uuid=uuid1, user_name="e1 old name"), + Employee(uuid=uuid2, user_name="e2 old name"), + }, + ) + eq_(e1.user_name, "e1 old name") + eq_(e2.user_name, "e2 old name") + s.commit() + s.expire_all() + eq_(e1.user_name, "e1 new name") + eq_(e2.user_name, "e2 new name") + class UpdateStmtTest(fixtures.TestBase): __backend__ = True |
