summaryrefslogtreecommitdiff
path: root/test/sql/test_insert_exec.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2023-04-05 11:58:52 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2023-04-21 11:30:40 -0400
commitcf6872d3bdf1a8a9613e853694acc2b1e6f06f51 (patch)
tree3a4ee41ab8b48aea7ac1e275c2f553763ec28dad /test/sql/test_insert_exec.py
parent63f51491c5f0cb22883c800a065d7c4b4c54774e (diff)
downloadsqlalchemy-cf6872d3bdf1a8a9613e853694acc2b1e6f06f51.tar.gz
add deterministic imv returning ordering using sentinel columns
Repaired a major shortcoming which was identified in the :ref:`engine_insertmanyvalues` performance optimization feature first introduced in the 2.0 series. This was a continuation of the change in 2.0.9 which disabled the SQL Server version of the feature due to a reliance in the ORM on apparent row ordering that is not guaranteed to take place. The fix applies new logic to all "insertmanyvalues" operations, which takes effect when a new parameter :paramref:`_dml.Insert.returning.sort_by_parameter_order` on the :meth:`_dml.Insert.returning` or :meth:`_dml.UpdateBase.return_defaults` methods, that through a combination of alternate SQL forms, direct correspondence of client side parameters, and in some cases downgrading to running row-at-a-time, will apply sorting to each batch of returned rows using correspondence to primary key or other unique values in each row which can be correlated to the input data. Performance impact is expected to be minimal as nearly all common primary key scenarios are suitable for parameter-ordered batching to be achieved for all backends other than SQLite, while "row-at-a-time" mode operates with a bare minimum of Python overhead compared to the very heavyweight approaches used in the 1.x series. For SQLite, there is no difference in performance when "row-at-a-time" mode is used. It's anticipated that with an efficient "row-at-a-time" INSERT with RETURNING batching capability, the "insertmanyvalues" feature can be later be more easily generalized to third party backends that include RETURNING support but not necessarily easy ways to guarantee a correspondence with parameter order. Fixes: #9618 References: #9603 Change-Id: I1d79353f5f19638f752936ba1c35e4dc235a8b7c
Diffstat (limited to 'test/sql/test_insert_exec.py')
-rw-r--r--test/sql/test_insert_exec.py1625
1 files changed, 1624 insertions, 1 deletions
diff --git a/test/sql/test_insert_exec.py b/test/sql/test_insert_exec.py
index 3b5a1856c..f545671e7 100644
--- a/test/sql/test_insert_exec.py
+++ b/test/sql/test_insert_exec.py
@@ -1,11 +1,19 @@
+import contextlib
+import functools
import itertools
+import uuid
from sqlalchemy import and_
+from sqlalchemy import ARRAY
from sqlalchemy import bindparam
+from sqlalchemy import DateTime
from sqlalchemy import event
from sqlalchemy import exc
from sqlalchemy import ForeignKey
from sqlalchemy import func
+from sqlalchemy import Identity
+from sqlalchemy import insert
+from sqlalchemy import insert_sentinel
from sqlalchemy import INT
from sqlalchemy import Integer
from sqlalchemy import literal
@@ -14,16 +22,22 @@ from sqlalchemy import Sequence
from sqlalchemy import sql
from sqlalchemy import String
from sqlalchemy import testing
+from sqlalchemy import TypeDecorator
+from sqlalchemy import Uuid
from sqlalchemy import VARCHAR
from sqlalchemy.engine import cursor as _cursor
+from sqlalchemy.sql.compiler import InsertmanyvaluesSentinelOpts
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import config
from sqlalchemy.testing import eq_
+from sqlalchemy.testing import expect_raises
from sqlalchemy.testing import expect_raises_message
+from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import mock
from sqlalchemy.testing import provision
+from sqlalchemy.testing.fixtures import insertmanyvalues_fixture
from sqlalchemy.testing.provision import normalize_sequence
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
@@ -924,7 +938,7 @@ class InsertManyValuesTest(fixtures.RemovesEvents, fixtures.TablesTest):
config,
data,
(data,),
- lambda inserted: {"x": inserted.x + " upserted"},
+ set_lambda=lambda inserted: {"x": inserted.x + " upserted"},
)
result = connection.execute(stmt, upsert_data)
@@ -1169,3 +1183,1612 @@ class InsertManyValuesTest(fixtures.RemovesEvents, fixtures.TablesTest):
"INSERT..RETURNING when executemany",
):
conn.execute(stmt.returning(t.c.id), data)
+
+
+class IMVSentinelTest(fixtures.TestBase):
+ __backend__ = True
+
+ __requires__ = ("insert_returning",)
+
+ def _expect_downgrade_warnings(
+ self,
+ *,
+ warn_for_downgrades,
+ sort_by_parameter_order,
+ separate_sentinel=False,
+ server_autoincrement=False,
+ client_side_pk=False,
+ autoincrement_is_sequence=False,
+ connection=None,
+ ):
+
+ if connection:
+ dialect = connection.dialect
+ else:
+ dialect = testing.db.dialect
+
+ if (
+ sort_by_parameter_order
+ and warn_for_downgrades
+ and dialect.use_insertmanyvalues
+ ):
+
+ if (
+ not separate_sentinel
+ and (
+ server_autoincrement
+ and (
+ not (
+ dialect.insertmanyvalues_implicit_sentinel # noqa: E501
+ & InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT
+ )
+ or (
+ autoincrement_is_sequence
+ and not (
+ dialect.insertmanyvalues_implicit_sentinel # noqa: E501
+ & InsertmanyvaluesSentinelOpts.SEQUENCE
+ )
+ )
+ )
+ )
+ or (
+ not separate_sentinel
+ and not server_autoincrement
+ and not client_side_pk
+ )
+ ):
+ return expect_warnings(
+ "Batches were downgraded",
+ raise_on_any_unexpected=True,
+ )
+
+ return contextlib.nullcontext()
+
+ @testing.variation
+ def sort_by_parameter_order(self):
+ return [True, False]
+
+ @testing.variation
+ def warn_for_downgrades(self):
+ return [True, False]
+
+ @testing.variation
+ def randomize_returning(self):
+ return [True, False]
+
+ @testing.requires.insertmanyvalues
+ def test_fixture_randomizing(self, connection, metadata):
+ t = Table(
+ "t",
+ metadata,
+ Column("id", Integer, Identity(), primary_key=True),
+ Column("data", String(50)),
+ )
+ metadata.create_all(connection)
+
+ insertmanyvalues_fixture(connection, randomize_rows=True)
+
+ results = set()
+
+ for i in range(15):
+ result = connection.execute(
+ insert(t).returning(t.c.data, sort_by_parameter_order=False),
+ [{"data": "d1"}, {"data": "d2"}, {"data": "d3"}],
+ )
+
+ hashed_result = tuple(result.all())
+ results.add(hashed_result)
+ if len(results) > 1:
+ return
+ else:
+ assert False, "got same order every time for 15 tries"
+
+ @testing.only_on("postgresql>=13")
+ @testing.variation("downgrade", [True, False])
+ def test_fixture_downgraded(self, connection, metadata, downgrade):
+ t = Table(
+ "t",
+ metadata,
+ Column(
+ "id",
+ Uuid(),
+ server_default=func.gen_random_uuid(),
+ primary_key=True,
+ ),
+ Column("data", String(50)),
+ )
+ metadata.create_all(connection)
+
+ r1 = connection.execute(
+ insert(t).returning(t.c.data, sort_by_parameter_order=True),
+ [{"data": "d1"}, {"data": "d2"}, {"data": "d3"}],
+ )
+ eq_(r1.all(), [("d1",), ("d2",), ("d3",)])
+
+ if downgrade:
+ insertmanyvalues_fixture(connection, warn_on_downgraded=True)
+
+ with self._expect_downgrade_warnings(
+ warn_for_downgrades=True,
+ sort_by_parameter_order=True,
+ ):
+ connection.execute(
+ insert(t).returning(
+ t.c.data, sort_by_parameter_order=True
+ ),
+ [{"data": "d1"}, {"data": "d2"}, {"data": "d3"}],
+ )
+ else:
+ # run a plain test to help ensure the fixture doesn't leak to
+ # other tests
+ r1 = connection.execute(
+ insert(t).returning(t.c.data, sort_by_parameter_order=True),
+ [{"data": "d1"}, {"data": "d2"}, {"data": "d3"}],
+ )
+ eq_(r1.all(), [("d1",), ("d2",), ("d3",)])
+
+ @testing.variation(
+ "sequence_type",
+ [
+ ("sequence", testing.requires.sequences),
+ ("identity", testing.requires.identity_columns),
+ ],
+ )
+ @testing.variation("increment", ["positive", "negative", "implicit"])
+ @testing.variation("explicit_sentinel", [True, False])
+ def test_invalid_identities(
+ self,
+ metadata,
+ connection,
+ warn_for_downgrades,
+ randomize_returning,
+ sort_by_parameter_order,
+ sequence_type: testing.Variation,
+ increment: testing.Variation,
+ explicit_sentinel,
+ ):
+ if sequence_type.sequence:
+ seq_cls = functools.partial(Sequence, name="t1_id_seq")
+ elif sequence_type.identity:
+ seq_cls = Identity
+ else:
+ sequence_type.fail()
+
+ if increment.implicit:
+ sequence = seq_cls(start=1)
+ elif increment.positive:
+ sequence = seq_cls(start=1, increment=1)
+ elif increment.negative:
+ sequence = seq_cls(start=-1, increment=-1)
+ else:
+ increment.fail()
+
+ t1 = Table(
+ "t1",
+ metadata,
+ Column(
+ "id",
+ Integer,
+ sequence,
+ primary_key=True,
+ insert_sentinel=bool(explicit_sentinel),
+ ),
+ Column("data", String(50)),
+ )
+ metadata.create_all(connection)
+
+ fixtures.insertmanyvalues_fixture(
+ connection,
+ randomize_rows=bool(randomize_returning),
+ warn_on_downgraded=bool(warn_for_downgrades),
+ )
+
+ stmt = insert(t1).returning(
+ t1.c.id,
+ t1.c.data,
+ sort_by_parameter_order=bool(sort_by_parameter_order),
+ )
+ data = [{"data": f"d{i}"} for i in range(10)]
+
+ use_imv = testing.db.dialect.use_insertmanyvalues
+ if (
+ use_imv
+ and increment.negative
+ and explicit_sentinel
+ and sort_by_parameter_order
+ ):
+ with expect_raises_message(
+ exc.InvalidRequestError,
+ rf"Can't use "
+ rf"{'SEQUENCE' if sequence_type.sequence else 'IDENTITY'} "
+ rf"default with negative increment",
+ ):
+ connection.execute(stmt, data)
+ return
+ elif (
+ use_imv
+ and explicit_sentinel
+ and sort_by_parameter_order
+ and sequence_type.sequence
+ and not (
+ testing.db.dialect.insertmanyvalues_implicit_sentinel
+ & InsertmanyvaluesSentinelOpts.SEQUENCE
+ )
+ ):
+ with expect_raises_message(
+ exc.InvalidRequestError,
+ r"Column t1.id can't be explicitly marked as a sentinel "
+ r"column .* as the particular type of default generation",
+ ):
+ connection.execute(stmt, data)
+ return
+
+ with self._expect_downgrade_warnings(
+ warn_for_downgrades=warn_for_downgrades,
+ sort_by_parameter_order=sort_by_parameter_order,
+ server_autoincrement=not increment.negative,
+ autoincrement_is_sequence=sequence_type.sequence,
+ ):
+ result = connection.execute(stmt, data)
+
+ if sort_by_parameter_order:
+ coll = list
+ else:
+ coll = set
+
+ if increment.negative:
+ expected_data = [(-1 - i, f"d{i}") for i in range(10)]
+ else:
+ expected_data = [(i + 1, f"d{i}") for i in range(10)]
+
+ eq_(
+ coll(result),
+ coll(expected_data),
+ )
+
+ @testing.combinations(
+ Integer(),
+ String(50),
+ (ARRAY(Integer()), testing.requires.array_type),
+ DateTime(),
+ Uuid(),
+ argnames="datatype",
+ )
+ def test_inserts_w_all_nulls(
+ self, connection, metadata, sort_by_parameter_order, datatype
+ ):
+ """this test is geared towards the INSERT..SELECT VALUES case,
+ where if the VALUES have all NULL for some column, PostgreSQL assumes
+ the datatype must be TEXT and throws for other table datatypes. So an
+ additional layer of casts is applied to the SELECT p0,p1, p2... part of
+ the statement for all datatypes unconditionally. Even though the VALUES
+ clause also has bind casts for selected datatypes, this NULL handling
+ is needed even for simple datatypes. We'd prefer not to render bind
+ casts for all possible datatypes as that affects other kinds of
+ statements as well and also is very verbose for insertmanyvalues.
+
+
+ """
+ t = Table(
+ "t",
+ metadata,
+ Column("id", Integer, Identity(), primary_key=True),
+ Column("data", datatype),
+ )
+ metadata.create_all(connection)
+ result = connection.execute(
+ insert(t).returning(
+ t.c.id,
+ sort_by_parameter_order=bool(sort_by_parameter_order),
+ ),
+ [{"data": None}, {"data": None}, {"data": None}],
+ )
+ eq_(set(result), {(1,), (2,), (3,)})
+
+ @testing.variation("pk_type", ["autoinc", "clientside"])
+ @testing.variation("add_sentinel", ["none", "clientside", "sentinel"])
+ def test_imv_w_additional_values(
+ self,
+ metadata,
+ connection,
+ sort_by_parameter_order,
+ pk_type: testing.Variation,
+ randomize_returning,
+ warn_for_downgrades,
+ add_sentinel,
+ ):
+ if pk_type.autoinc:
+ pk_col = Column("id", Integer(), Identity(), primary_key=True)
+ elif pk_type.clientside:
+ pk_col = Column("id", Uuid(), default=uuid.uuid4, primary_key=True)
+ else:
+ pk_type.fail()
+
+ if add_sentinel.clientside:
+ extra_col = insert_sentinel(
+ "sentinel", type_=Uuid(), default=uuid.uuid4
+ )
+ elif add_sentinel.sentinel:
+ extra_col = insert_sentinel("sentinel")
+ else:
+ extra_col = Column("sentinel", Integer())
+
+ t1 = Table(
+ "t1",
+ metadata,
+ pk_col,
+ Column("data", String(30)),
+ Column("moredata", String(30)),
+ extra_col,
+ Column(
+ "has_server_default",
+ String(50),
+ server_default="some_server_default",
+ ),
+ )
+ metadata.create_all(connection)
+
+ fixtures.insertmanyvalues_fixture(
+ connection,
+ randomize_rows=bool(randomize_returning),
+ warn_on_downgraded=bool(warn_for_downgrades),
+ )
+
+ stmt = (
+ insert(t1)
+ .values(moredata="more data")
+ .returning(
+ t1.c.data,
+ t1.c.moredata,
+ t1.c.has_server_default,
+ sort_by_parameter_order=bool(sort_by_parameter_order),
+ )
+ )
+ data = [{"data": f"d{i}"} for i in range(10)]
+
+ with self._expect_downgrade_warnings(
+ warn_for_downgrades=warn_for_downgrades,
+ sort_by_parameter_order=sort_by_parameter_order,
+ separate_sentinel=not add_sentinel.none,
+ server_autoincrement=pk_type.autoinc,
+ client_side_pk=pk_type.clientside,
+ ):
+ result = connection.execute(stmt, data)
+
+ if sort_by_parameter_order:
+ coll = list
+ else:
+ coll = set
+
+ eq_(
+ coll(result),
+ coll(
+ [
+ (f"d{i}", "more data", "some_server_default")
+ for i in range(10)
+ ]
+ ),
+ )
+
+ def test_sentinel_incorrect_rowcount(
+ self, metadata, connection, sort_by_parameter_order
+ ):
+ """test assertions to ensure sentinel values don't have duplicates"""
+
+ uuids = [uuid.uuid4() for i in range(10)]
+
+ # make some dupes
+ uuids[3] = uuids[5]
+ uuids[9] = uuids[5]
+
+ t1 = Table(
+ "data",
+ metadata,
+ Column("id", Integer, Identity(), primary_key=True),
+ Column("data", String(50)),
+ insert_sentinel(
+ "uuids",
+ Uuid(),
+ default=functools.partial(next, iter(uuids)),
+ ),
+ )
+
+ metadata.create_all(connection)
+
+ stmt = insert(t1).returning(
+ t1.c.data,
+ t1.c.uuids,
+ sort_by_parameter_order=bool(sort_by_parameter_order),
+ )
+ data = [{"data": f"d{i}"} for i in range(10)]
+
+ if testing.db.dialect.use_insertmanyvalues and sort_by_parameter_order:
+ with expect_raises_message(
+ exc.InvalidRequestError,
+ "Sentinel-keyed result set did not produce correct "
+ "number of rows 10; produced 8.",
+ ):
+ connection.execute(stmt, data)
+ else:
+ result = connection.execute(stmt, data)
+ eq_(
+ set(result.all()),
+ {(f"d{i}", uuids[i]) for i in range(10)},
+ )
+
+ @testing.variation("resolve_sentinel_values", [True, False])
+ def test_sentinel_cant_match_keys(
+ self,
+ metadata,
+ connection,
+ sort_by_parameter_order,
+ resolve_sentinel_values,
+ ):
+ """test assertions to ensure sentinel values passed in parameter
+ structures can be identified when they come back in cursor.fetchall().
+
+ Values that are further modified by the database driver or by
+ SQL expressions (as in the case below) before being INSERTed
+ won't match coming back out, so datatypes need to implement
+ _sentinel_value_resolver() if this is the case.
+
+ """
+
+ class UnsymmetricDataType(TypeDecorator):
+ cache_ok = True
+ impl = String
+
+ def bind_expression(self, bindparam):
+ return func.lower(bindparam)
+
+ if resolve_sentinel_values:
+
+ def _sentinel_value_resolver(self, dialect):
+ def fix_sentinels(value):
+ return value.lower()
+
+ return fix_sentinels
+
+ t1 = Table(
+ "data",
+ metadata,
+ Column("id", Integer, Identity(), primary_key=True),
+ Column("data", String(50)),
+ insert_sentinel("unsym", UnsymmetricDataType(10)),
+ )
+
+ metadata.create_all(connection)
+
+ stmt = insert(t1).returning(
+ t1.c.data,
+ t1.c.unsym,
+ sort_by_parameter_order=bool(sort_by_parameter_order),
+ )
+ data = [{"data": f"d{i}", "unsym": f"UPPER_d{i}"} for i in range(10)]
+
+ if (
+ testing.db.dialect.use_insertmanyvalues
+ and sort_by_parameter_order
+ and not resolve_sentinel_values
+ ):
+ with expect_raises_message(
+ exc.InvalidRequestError,
+ r"Can't match sentinel values in result set to parameter "
+ r"sets; key 'UPPER_d.' was not found.",
+ ):
+ connection.execute(stmt, data)
+ else:
+ result = connection.execute(stmt, data)
+ eq_(
+ set(result.all()),
+ {(f"d{i}", f"upper_d{i}") for i in range(10)},
+ )
+
+ @testing.variation("add_insert_sentinel", [True, False])
+ def test_sentinel_insert_default_pk_only(
+ self,
+ metadata,
+ connection,
+ sort_by_parameter_order,
+ add_insert_sentinel,
+ ):
+ t1 = Table(
+ "data",
+ metadata,
+ Column(
+ "id",
+ Integer,
+ Identity(),
+ insert_sentinel=bool(add_insert_sentinel),
+ primary_key=True,
+ ),
+ Column("data", String(50)),
+ )
+
+ metadata.create_all(connection)
+
+ fixtures.insertmanyvalues_fixture(
+ connection, randomize_rows=True, warn_on_downgraded=False
+ )
+
+ stmt = insert(t1).returning(
+ t1.c.id,
+ sort_by_parameter_order=bool(sort_by_parameter_order),
+ )
+ data = [{} for i in range(3)]
+
+ if (
+ testing.db.dialect.use_insertmanyvalues
+ and add_insert_sentinel
+ and sort_by_parameter_order
+ and not (
+ testing.db.dialect.insertmanyvalues_implicit_sentinel
+ & InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT
+ )
+ ):
+ with expect_raises_message(
+ exc.InvalidRequestError,
+ "Column data.id can't be explicitly marked as a "
+ f"sentinel column when using the {testing.db.dialect.name} "
+ "dialect",
+ ):
+ connection.execute(stmt, data)
+ return
+ else:
+ result = connection.execute(stmt, data)
+
+ if sort_by_parameter_order:
+ # if we used a client side default function, or we had no sentinel
+ # at all, we're sorted
+ coll = list
+ else:
+ # otherwise we are not, we randomized the order in any case
+ coll = set
+
+ eq_(
+ coll(result),
+ coll(
+ [
+ (1,),
+ (2,),
+ (3,),
+ ]
+ ),
+ )
+
+ @testing.only_on("postgresql>=13")
+ @testing.variation("default_type", ["server_side", "client_side"])
+ @testing.variation("add_insert_sentinel", [True, False])
+ def test_no_sentinel_on_non_int_ss_function(
+ self,
+ metadata,
+ connection,
+ add_insert_sentinel,
+ default_type,
+ sort_by_parameter_order,
+ ):
+
+ t1 = Table(
+ "data",
+ metadata,
+ Column(
+ "id",
+ Uuid(),
+ server_default=func.gen_random_uuid()
+ if default_type.server_side
+ else None,
+ default=uuid.uuid4 if default_type.client_side else None,
+ primary_key=True,
+ insert_sentinel=bool(add_insert_sentinel),
+ ),
+ Column("data", String(50)),
+ )
+
+ metadata.create_all(connection)
+
+ fixtures.insertmanyvalues_fixture(
+ connection, randomize_rows=True, warn_on_downgraded=False
+ )
+
+ stmt = insert(t1).returning(
+ t1.c.data,
+ sort_by_parameter_order=bool(sort_by_parameter_order),
+ )
+ data = [
+ {"data": "d1"},
+ {"data": "d2"},
+ {"data": "d3"},
+ ]
+
+ if (
+ default_type.server_side
+ and add_insert_sentinel
+ and sort_by_parameter_order
+ ):
+ with expect_raises_message(
+ exc.InvalidRequestError,
+ r"Column data.id can't be a sentinel column because it uses "
+ r"an explicit server side default that's not the Identity\(\)",
+ ):
+ connection.execute(stmt, data)
+ return
+ else:
+ result = connection.execute(stmt, data)
+
+ if sort_by_parameter_order:
+ # if we used a client side default function, or we had no sentinel
+ # at all, we're sorted
+ coll = list
+ else:
+ # otherwise we are not, we randomized the order in any case
+ coll = set
+
+ eq_(
+ coll(result),
+ coll(
+ [
+ ("d1",),
+ ("d2",),
+ ("d3",),
+ ]
+ ),
+ )
+
+ @testing.variation(
+ "pk_type",
+ [
+ ("plain_autoinc", testing.requires.autoincrement_without_sequence),
+ ("sequence", testing.requires.sequences),
+ ("identity", testing.requires.identity_columns),
+ ],
+ )
+ @testing.variation(
+ "sentinel",
+ [
+ "none", # passes because we automatically downgrade
+ # for no sentinel col
+ "implicit_not_omitted",
+ "implicit_omitted",
+ "explicit",
+ "explicit_but_nullable",
+ "default_uuid",
+ "default_string_uuid",
+ ("identity", testing.requires.multiple_identity_columns),
+ ("sequence", testing.requires.sequences),
+ ],
+ )
+ def test_sentinel_col_configurations(
+ self,
+ pk_type: testing.Variation,
+ sentinel: testing.Variation,
+ sort_by_parameter_order,
+ randomize_returning,
+ metadata,
+ connection,
+ ):
+
+ if pk_type.plain_autoinc:
+ pk_col = Column("id", Integer, primary_key=True)
+ elif pk_type.sequence:
+ pk_col = Column(
+ "id",
+ Integer,
+ Sequence("result_id_seq", start=1),
+ primary_key=True,
+ )
+ elif pk_type.identity:
+ pk_col = Column("id", Integer, Identity(), primary_key=True)
+ else:
+ pk_type.fail()
+
+ if sentinel.implicit_not_omitted or sentinel.implicit_omitted:
+ _sentinel = insert_sentinel(
+ "sentinel",
+ omit_from_statements=bool(sentinel.implicit_omitted),
+ )
+ elif sentinel.explicit:
+ _sentinel = Column(
+ "some_uuid", Uuid(), nullable=False, insert_sentinel=True
+ )
+ elif sentinel.explicit_but_nullable:
+ _sentinel = Column("some_uuid", Uuid(), insert_sentinel=True)
+ elif sentinel.default_uuid or sentinel.default_string_uuid:
+ _sentinel = Column(
+ "some_uuid",
+ Uuid(native_uuid=bool(sentinel.default_uuid)),
+ insert_sentinel=True,
+ default=uuid.uuid4,
+ )
+ elif sentinel.identity:
+ _sentinel = Column(
+ "some_identity",
+ Integer,
+ Identity(),
+ insert_sentinel=True,
+ )
+ elif sentinel.sequence:
+ _sentinel = Column(
+ "some_identity",
+ Integer,
+ Sequence("some_id_seq", start=1),
+ insert_sentinel=True,
+ )
+ else:
+ _sentinel = Column("some_uuid", Uuid())
+
+ t = Table("t", metadata, pk_col, Column("data", String(50)), _sentinel)
+
+ metadata.create_all(connection)
+
+ fixtures.insertmanyvalues_fixture(
+ connection,
+ randomize_rows=bool(randomize_returning),
+ warn_on_downgraded=True,
+ )
+
+ stmt = insert(t).returning(
+ pk_col,
+ t.c.data,
+ sort_by_parameter_order=bool(sort_by_parameter_order),
+ )
+ if sentinel.explicit:
+ data = [
+ {"data": f"d{i}", "some_uuid": uuid.uuid4()}
+ for i in range(150)
+ ]
+ else:
+ data = [{"data": f"d{i}"} for i in range(150)]
+
+ expect_sentinel_use = (
+ sort_by_parameter_order
+ and testing.db.dialect.insert_returning
+ and testing.db.dialect.use_insertmanyvalues
+ )
+
+ if sentinel.explicit_but_nullable and expect_sentinel_use:
+ with expect_raises_message(
+ exc.InvalidRequestError,
+ "Column t.some_uuid has been marked as a sentinel column "
+ "with no default generation function; it at least needs to "
+ "be marked nullable=False",
+ ):
+ connection.execute(stmt, data)
+ return
+
+ elif (
+ expect_sentinel_use
+ and sentinel.sequence
+ and not (
+ testing.db.dialect.insertmanyvalues_implicit_sentinel
+ & InsertmanyvaluesSentinelOpts.SEQUENCE
+ )
+ ):
+ with expect_raises_message(
+ exc.InvalidRequestError,
+ "Column t.some_identity can't be explicitly marked as a "
+ f"sentinel column when using the {testing.db.dialect.name} "
+ "dialect",
+ ):
+ connection.execute(stmt, data)
+ return
+
+ elif (
+ sentinel.none
+ and expect_sentinel_use
+ and stmt.compile(
+ dialect=testing.db.dialect
+ )._get_sentinel_column_for_table(t)
+ is None
+ ):
+ with expect_warnings(
+ "Batches were downgraded for sorted INSERT",
+ raise_on_any_unexpected=True,
+ ):
+ result = connection.execute(stmt, data)
+ else:
+ result = connection.execute(stmt, data)
+
+ if sort_by_parameter_order:
+ eq_(list(result), [(i + 1, f"d{i}") for i in range(150)])
+ else:
+ eq_(set(result), {(i + 1, f"d{i}") for i in range(150)})
+
+ @testing.variation(
+ "return_type", ["include_sentinel", "default_only", "return_defaults"]
+ )
+ @testing.variation("add_sentinel_flag_to_col", [True, False])
+ def test_sentinel_on_non_autoinc_primary_key(
+ self,
+ metadata,
+ connection,
+ return_type: testing.Variation,
+ sort_by_parameter_order,
+ randomize_returning,
+ add_sentinel_flag_to_col,
+ ):
+ uuids = [uuid.uuid4() for i in range(10)]
+ _some_uuids = iter(uuids)
+
+ t1 = Table(
+ "data",
+ metadata,
+ Column(
+ "id",
+ Uuid(),
+ default=functools.partial(next, _some_uuids),
+ primary_key=True,
+ insert_sentinel=bool(add_sentinel_flag_to_col),
+ ),
+ Column("data", String(50)),
+ Column(
+ "has_server_default",
+ String(30),
+ server_default="some_server_default",
+ ),
+ )
+
+ fixtures.insertmanyvalues_fixture(
+ connection,
+ randomize_rows=bool(randomize_returning),
+ warn_on_downgraded=True,
+ )
+
+ if sort_by_parameter_order:
+ collection_cls = list
+ else:
+ collection_cls = set
+
+ metadata.create_all(connection)
+
+ if sort_by_parameter_order:
+ kw = {"sort_by_parameter_order": True}
+ else:
+ kw = {}
+
+ if return_type.include_sentinel:
+ stmt = t1.insert().returning(
+ t1.c.id, t1.c.data, t1.c.has_server_default, **kw
+ )
+ elif return_type.default_only:
+ stmt = t1.insert().returning(
+ t1.c.data, t1.c.has_server_default, **kw
+ )
+ elif return_type.return_defaults:
+ stmt = t1.insert().return_defaults(**kw)
+
+ else:
+ return_type.fail()
+
+ r = connection.execute(
+ stmt,
+ [{"data": f"d{i}"} for i in range(1, 6)],
+ )
+
+ if return_type.include_sentinel:
+ eq_(r.keys(), ["id", "data", "has_server_default"])
+ eq_(
+ collection_cls(r),
+ collection_cls(
+ [
+ (uuids[i], f"d{i+1}", "some_server_default")
+ for i in range(5)
+ ]
+ ),
+ )
+ elif return_type.default_only:
+ eq_(r.keys(), ["data", "has_server_default"])
+ eq_(
+ collection_cls(r),
+ collection_cls(
+ [
+ (
+ f"d{i+1}",
+ "some_server_default",
+ )
+ for i in range(5)
+ ]
+ ),
+ )
+ elif return_type.return_defaults:
+ eq_(r.keys(), ["has_server_default"])
+ eq_(r.inserted_primary_key_rows, [(uuids[i],) for i in range(5)])
+ eq_(
+ r.returned_defaults_rows,
+ [
+ ("some_server_default",),
+ ("some_server_default",),
+ ("some_server_default",),
+ ("some_server_default",),
+ ("some_server_default",),
+ ],
+ )
+ eq_(r.all(), [])
+ else:
+ return_type.fail()
+
+ def test_client_composite_pk(
+ self,
+ metadata,
+ connection,
+ randomize_returning,
+ sort_by_parameter_order,
+ warn_for_downgrades,
+ ):
+ uuids = [uuid.uuid4() for i in range(10)]
+
+ t1 = Table(
+ "data",
+ metadata,
+ Column(
+ "id1",
+ Uuid(),
+ default=functools.partial(next, iter(uuids)),
+ primary_key=True,
+ ),
+ Column(
+ "id2",
+ # note this is testing that plain populated PK cols
+ # also qualify as sentinels since they have to be there
+ String(30),
+ primary_key=True,
+ ),
+ Column("data", String(50)),
+ Column(
+ "has_server_default",
+ String(30),
+ server_default="some_server_default",
+ ),
+ )
+ metadata.create_all(connection)
+
+ fixtures.insertmanyvalues_fixture(
+ connection,
+ randomize_rows=bool(randomize_returning),
+ warn_on_downgraded=bool(warn_for_downgrades),
+ )
+
+ result = connection.execute(
+ insert(t1).returning(
+ t1.c.id1,
+ t1.c.id2,
+ t1.c.data,
+ t1.c.has_server_default,
+ sort_by_parameter_order=bool(sort_by_parameter_order),
+ ),
+ [{"id2": f"id{i}", "data": f"d{i}"} for i in range(10)],
+ )
+
+ if sort_by_parameter_order:
+ coll = list
+ else:
+ coll = set
+
+ eq_(
+ coll(result),
+ coll(
+ [
+ (uuids[i], f"id{i}", f"d{i}", "some_server_default")
+ for i in range(10)
+ ]
+ ),
+ )
+
+ @testing.variation("add_sentinel", [True, False])
+ @testing.variation(
+ "set_identity", [(True, testing.requires.identity_columns), False]
+ )
+ def test_no_pk(
+ self,
+ metadata,
+ connection,
+ randomize_returning,
+ sort_by_parameter_order,
+ warn_for_downgrades,
+ add_sentinel,
+ set_identity,
+ ):
+ if set_identity:
+ id_col = Column("id", Integer(), Identity())
+ else:
+ id_col = Column("id", Integer())
+
+ uuids = [uuid.uuid4() for i in range(10)]
+
+ sentinel_col = Column(
+ "unique_id",
+ Uuid,
+ default=functools.partial(next, iter(uuids)),
+ insert_sentinel=bool(add_sentinel),
+ )
+ t1 = Table(
+ "nopk",
+ metadata,
+ id_col,
+ Column("data", String(50)),
+ sentinel_col,
+ Column(
+ "has_server_default",
+ String(30),
+ server_default="some_server_default",
+ ),
+ )
+ metadata.create_all(connection)
+
+ fixtures.insertmanyvalues_fixture(
+ connection,
+ randomize_rows=bool(randomize_returning),
+ warn_on_downgraded=bool(warn_for_downgrades),
+ )
+
+ stmt = insert(t1).returning(
+ t1.c.id,
+ t1.c.data,
+ t1.c.has_server_default,
+ sort_by_parameter_order=bool(sort_by_parameter_order),
+ )
+ if not set_identity:
+ data = [{"id": i + 1, "data": f"d{i}"} for i in range(10)]
+ else:
+ data = [{"data": f"d{i}"} for i in range(10)]
+
+ with self._expect_downgrade_warnings(
+ warn_for_downgrades=warn_for_downgrades,
+ sort_by_parameter_order=sort_by_parameter_order,
+ separate_sentinel=add_sentinel,
+ ):
+ result = connection.execute(stmt, data)
+
+ if sort_by_parameter_order:
+ coll = list
+ else:
+ coll = set
+
+ eq_(
+ coll(result),
+ coll([(i + 1, f"d{i}", "some_server_default") for i in range(10)]),
+ )
+
+ @testing.variation("add_sentinel_to_col", [True, False])
+ @testing.variation(
+ "set_autoincrement", [True, (False, testing.skip_if("mariadb"))]
+ )
+ def test_hybrid_client_composite_pk(
+ self,
+ metadata,
+ connection,
+ randomize_returning,
+ sort_by_parameter_order,
+ warn_for_downgrades,
+ add_sentinel_to_col,
+ set_autoincrement,
+ ):
+ """test a pk that is part server generated part client generated.
+
+ The server generated col by itself can be the sentinel. if it's
+ part of the PK and is autoincrement=True then it is automatically
+ used as such. if not, there's a graceful downgrade.
+
+ """
+
+ t1 = Table(
+ "data",
+ metadata,
+ Column(
+ "idint",
+ Integer,
+ Identity(),
+ autoincrement=True if set_autoincrement else "auto",
+ primary_key=True,
+ insert_sentinel=bool(add_sentinel_to_col),
+ ),
+ Column(
+ "idstr",
+ String(30),
+ primary_key=True,
+ ),
+ Column("data", String(50)),
+ Column(
+ "has_server_default",
+ String(30),
+ server_default="some_server_default",
+ ),
+ )
+
+ no_autoincrement = (
+ not testing.requires.supports_autoincrement_w_composite_pk.enabled # noqa: E501
+ )
+ if set_autoincrement and no_autoincrement:
+ with expect_raises_message(
+ exc.CompileError,
+ r".*SQLite does not support autoincrement for "
+ "composite primary keys",
+ ):
+ metadata.create_all(connection)
+ return
+ else:
+
+ metadata.create_all(connection)
+
+ fixtures.insertmanyvalues_fixture(
+ connection,
+ randomize_rows=bool(randomize_returning),
+ warn_on_downgraded=bool(warn_for_downgrades),
+ )
+
+ stmt = insert(t1).returning(
+ t1.c.idint,
+ t1.c.idstr,
+ t1.c.data,
+ t1.c.has_server_default,
+ sort_by_parameter_order=bool(sort_by_parameter_order),
+ )
+
+ if no_autoincrement:
+ data = [
+ {"idint": i + 1, "idstr": f"id{i}", "data": f"d{i}"}
+ for i in range(10)
+ ]
+ else:
+ data = [{"idstr": f"id{i}", "data": f"d{i}"} for i in range(10)]
+
+ if (
+ testing.db.dialect.use_insertmanyvalues
+ and add_sentinel_to_col
+ and sort_by_parameter_order
+ and not (
+ testing.db.dialect.insertmanyvalues_implicit_sentinel
+ & InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT
+ )
+ ):
+ with expect_raises_message(
+ exc.InvalidRequestError,
+ "Column data.idint can't be explicitly marked as a sentinel "
+ "column when using the sqlite dialect",
+ ):
+ result = connection.execute(stmt, data)
+ return
+
+ with self._expect_downgrade_warnings(
+ warn_for_downgrades=warn_for_downgrades,
+ sort_by_parameter_order=sort_by_parameter_order,
+ separate_sentinel=not set_autoincrement and add_sentinel_to_col,
+ server_autoincrement=set_autoincrement,
+ ):
+ result = connection.execute(stmt, data)
+
+ if sort_by_parameter_order:
+ coll = list
+ else:
+ coll = set
+
+ eq_(
+ coll(result),
+ coll(
+ [
+ (i + 1, f"id{i}", f"d{i}", "some_server_default")
+ for i in range(10)
+ ]
+ ),
+ )
+
+ @testing.variation("composite_pk", [True, False])
+ @testing.only_on(
+ [
+ "+psycopg",
+ "+psycopg2",
+ "+pysqlite",
+ "+mysqlclient",
+ "+cx_oracle",
+ "+oracledb",
+ ]
+ )
+ def test_failure_mode_if_i_dont_send_value(
+ self, metadata, connection, sort_by_parameter_order, composite_pk
+ ):
+ """test that we get a regular integrity error if a required
+ PK value was not sent, that is, imv does not get in the way
+
+ """
+ t1 = Table(
+ "data",
+ metadata,
+ Column("id", String(30), primary_key=True),
+ Column("data", String(50)),
+ Column(
+ "has_server_default",
+ String(30),
+ server_default="some_server_default",
+ ),
+ )
+ if composite_pk:
+ t1.append_column(Column("uid", Uuid(), default=uuid.uuid4))
+
+ metadata.create_all(connection)
+
+ with expect_warnings(
+ r".*but has no Python-side or server-side default ",
+ raise_on_any_unexpected=True,
+ ):
+ with expect_raises(exc.IntegrityError):
+ connection.execute(
+ insert(t1).returning(
+ t1.c.id,
+ t1.c.data,
+ t1.c.has_server_default,
+ sort_by_parameter_order=bool(sort_by_parameter_order),
+ ),
+ [{"data": f"d{i}"} for i in range(10)],
+ )
+
+ @testing.variation("add_sentinel_flag_to_col", [True, False])
+ @testing.variation(
+ "return_type", ["include_sentinel", "default_only", "return_defaults"]
+ )
+ @testing.variation(
+ "sentinel_type",
+ [
+ ("autoincrement", testing.requires.autoincrement_without_sequence),
+ "identity",
+ "sequence",
+ ],
+ )
+ def test_implicit_autoincrement_sentinel(
+ self,
+ metadata,
+ connection,
+ return_type: testing.Variation,
+ sort_by_parameter_order,
+ randomize_returning,
+ sentinel_type,
+ add_sentinel_flag_to_col,
+ ):
+
+ if sentinel_type.identity:
+ sentinel_args = [Identity()]
+ elif sentinel_type.sequence:
+ sentinel_args = [Sequence("id_seq", start=1)]
+ else:
+ sentinel_args = []
+ t1 = Table(
+ "data",
+ metadata,
+ Column(
+ "id",
+ Integer,
+ *sentinel_args,
+ primary_key=True,
+ insert_sentinel=bool(add_sentinel_flag_to_col),
+ ),
+ Column("data", String(50)),
+ Column(
+ "has_server_default",
+ String(30),
+ server_default="some_server_default",
+ ),
+ )
+
+ fixtures.insertmanyvalues_fixture(
+ connection,
+ randomize_rows=bool(randomize_returning),
+ warn_on_downgraded=False,
+ )
+
+ if sort_by_parameter_order:
+ collection_cls = list
+ else:
+ collection_cls = set
+
+ metadata.create_all(connection)
+
+ if sort_by_parameter_order:
+ kw = {"sort_by_parameter_order": True}
+ else:
+ kw = {}
+
+ if return_type.include_sentinel:
+ stmt = t1.insert().returning(
+ t1.c.id, t1.c.data, t1.c.has_server_default, **kw
+ )
+ elif return_type.default_only:
+ stmt = t1.insert().returning(
+ t1.c.data, t1.c.has_server_default, **kw
+ )
+ elif return_type.return_defaults:
+ stmt = t1.insert().return_defaults(**kw)
+
+ else:
+ return_type.fail()
+
+ if (
+ testing.db.dialect.use_insertmanyvalues
+ and add_sentinel_flag_to_col
+ and sort_by_parameter_order
+ and (
+ not (
+ testing.db.dialect.insertmanyvalues_implicit_sentinel
+ & InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT
+ )
+ or (
+ # currently a SQL Server case, we dont yet render a
+ # syntax for SQL Server sequence w/ deterministic
+ # ordering. The INSERT..SELECT could be restructured
+ # further to support this at a later time however
+ # sequences with SQL Server are very unusual.
+ sentinel_type.sequence
+ and not (
+ testing.db.dialect.insertmanyvalues_implicit_sentinel
+ & InsertmanyvaluesSentinelOpts.SEQUENCE
+ )
+ )
+ )
+ ):
+ with expect_raises_message(
+ exc.InvalidRequestError,
+ "Column data.id can't be explicitly marked as a "
+ f"sentinel column when using the {testing.db.dialect.name} "
+ "dialect",
+ ):
+ connection.execute(
+ stmt,
+ [{"data": f"d{i}"} for i in range(1, 6)],
+ )
+ return
+ else:
+ r = connection.execute(
+ stmt,
+ [{"data": f"d{i}"} for i in range(1, 6)],
+ )
+
+ if return_type.include_sentinel:
+ eq_(r.keys(), ["id", "data", "has_server_default"])
+ eq_(
+ collection_cls(r),
+ collection_cls(
+ [(i, f"d{i}", "some_server_default") for i in range(1, 6)]
+ ),
+ )
+ elif return_type.default_only:
+ eq_(r.keys(), ["data", "has_server_default"])
+ eq_(
+ collection_cls(r),
+ collection_cls(
+ [(f"d{i}", "some_server_default") for i in range(1, 6)]
+ ),
+ )
+ elif return_type.return_defaults:
+ eq_(r.keys(), ["id", "has_server_default"])
+ eq_(
+ collection_cls(r.inserted_primary_key_rows),
+ collection_cls([(i + 1,) for i in range(5)]),
+ )
+ eq_(
+ collection_cls(r.returned_defaults_rows),
+ collection_cls(
+ [
+ (
+ 1,
+ "some_server_default",
+ ),
+ (
+ 2,
+ "some_server_default",
+ ),
+ (
+ 3,
+ "some_server_default",
+ ),
+ (
+ 4,
+ "some_server_default",
+ ),
+ (
+ 5,
+ "some_server_default",
+ ),
+ ]
+ ),
+ )
+ eq_(r.all(), [])
+ else:
+ return_type.fail()
+
+ @testing.variation("pk_type", ["serverside", "clientside"])
+ @testing.variation(
+ "sentinel_type",
+ [
+ "use_pk",
+ ("use_pk_explicit", testing.skip_if("sqlite")),
+ "separate_uuid",
+ "separate_sentinel",
+ ],
+ )
+ @testing.requires.provisioned_upsert
+ def test_upsert_downgrades(
+ self,
+ metadata,
+ connection,
+ pk_type: testing.Variation,
+ sort_by_parameter_order,
+ randomize_returning,
+ sentinel_type,
+ warn_for_downgrades,
+ ):
+ if pk_type.serverside:
+ pk_col = Column(
+ "id",
+ Integer(),
+ primary_key=True,
+ insert_sentinel=bool(sentinel_type.use_pk_explicit),
+ )
+ elif pk_type.clientside:
+ pk_col = Column(
+ "id",
+ Uuid(),
+ default=uuid.uuid4,
+ primary_key=True,
+ insert_sentinel=bool(sentinel_type.use_pk_explicit),
+ )
+ else:
+ pk_type.fail()
+
+ if sentinel_type.separate_uuid:
+ extra_col = Column(
+ "sent_col",
+ Uuid(),
+ default=uuid.uuid4,
+ insert_sentinel=True,
+ nullable=False,
+ )
+ elif sentinel_type.separate_sentinel:
+ extra_col = insert_sentinel("sent_col")
+ else:
+ extra_col = Column("sent_col", Integer)
+
+ t1 = Table(
+ "upsert_table",
+ metadata,
+ pk_col,
+ Column("data", String(50)),
+ extra_col,
+ Column(
+ "has_server_default",
+ String(30),
+ server_default="some_server_default",
+ ),
+ )
+ metadata.create_all(connection)
+
+ result = connection.execute(
+ insert(t1).returning(
+ t1.c.id, t1.c.data, sort_by_parameter_order=True
+ ),
+ [{"data": "d1"}, {"data": "d2"}],
+ )
+ d1d2 = list(result)
+
+ if pk_type.serverside:
+ new_ids = [10, 15, 3]
+ elif pk_type.clientside:
+ new_ids = [uuid.uuid4() for i in range(3)]
+ else:
+ pk_type.fail()
+
+ upsert_data = [
+ {"id": d1d2[0][0], "data": "d1 new"},
+ {"id": new_ids[0], "data": "d10"},
+ {"id": new_ids[1], "data": "d15"},
+ {"id": d1d2[1][0], "data": "d2 new"},
+ {"id": new_ids[2], "data": "d3"},
+ ]
+
+ fixtures.insertmanyvalues_fixture(
+ connection,
+ randomize_rows=bool(randomize_returning),
+ warn_on_downgraded=bool(warn_for_downgrades),
+ )
+
+ stmt = provision.upsert(
+ config,
+ t1,
+ (t1.c.data, t1.c.has_server_default),
+ set_lambda=lambda inserted: {
+ "data": inserted.data + " upserted",
+ },
+ sort_by_parameter_order=bool(sort_by_parameter_order),
+ )
+
+ with self._expect_downgrade_warnings(
+ warn_for_downgrades=warn_for_downgrades,
+ sort_by_parameter_order=sort_by_parameter_order,
+ ):
+ result = connection.execute(stmt, upsert_data)
+
+ expected_data = [
+ ("d1 new upserted", "some_server_default"),
+ ("d10", "some_server_default"),
+ ("d15", "some_server_default"),
+ ("d2 new upserted", "some_server_default"),
+ ("d3", "some_server_default"),
+ ]
+ if sort_by_parameter_order:
+ coll = list
+ else:
+ coll = set
+
+ eq_(coll(result), coll(expected_data))
+
+ def test_auto_downgraded_non_mvi_dialect(
+ self,
+ metadata,
+ testing_engine,
+ randomize_returning,
+ warn_for_downgrades,
+ sort_by_parameter_order,
+ ):
+ """Accommodate the case of the dialect that supports RETURNING, but
+ does not support "multi values INSERT" syntax.
+
+ These dialects should still provide insertmanyvalues/returning
+ support, using downgraded batching.
+
+ For now, we are still keeping this entire thing "opt in" by requiring
+ that use_insertmanyvalues=True, which means we can't simplify the
+ ORM by not worrying about dialects where ordering is available or
+ not.
+
+ However, dialects that use RETURNING, but don't support INSERT VALUES
+ (..., ..., ...) can set themselves up like this::
+
+ class MyDialect(DefaultDialect):
+ use_insertmanyvalues = True
+ supports_multivalues_insert = False
+
+ This test runs for everyone **including** Oracle, where we
+ exercise Oracle using "insertmanyvalues" without "multivalues_insert".
+
+ """
+ engine = testing_engine()
+ engine.connect().close()
+
+ engine.dialect.supports_multivalues_insert = False
+ engine.dialect.use_insertmanyvalues = True
+
+ uuids = [uuid.uuid4() for i in range(10)]
+
+ t1 = Table(
+ "t1",
+ metadata,
+ Column("id", Uuid(), default=functools.partial(next, iter(uuids))),
+ Column("data", String(50)),
+ )
+ metadata.create_all(engine)
+
+ with engine.connect() as conn:
+
+ fixtures.insertmanyvalues_fixture(
+ conn,
+ randomize_rows=bool(randomize_returning),
+ warn_on_downgraded=bool(warn_for_downgrades),
+ )
+
+ stmt = insert(t1).returning(
+ t1.c.id,
+ t1.c.data,
+ sort_by_parameter_order=bool(sort_by_parameter_order),
+ )
+ data = [{"data": f"d{i}"} for i in range(10)]
+
+ with self._expect_downgrade_warnings(
+ warn_for_downgrades=warn_for_downgrades,
+ sort_by_parameter_order=True, # will warn even if not sorted
+ connection=conn,
+ ):
+ result = conn.execute(stmt, data)
+
+ expected_data = [(uuids[i], f"d{i}") for i in range(10)]
+ if sort_by_parameter_order:
+ coll = list
+ else:
+ coll = set
+
+ eq_(coll(result), coll(expected_data))