summaryrefslogtreecommitdiff
path: root/test/sql/test_insert_exec.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2022-07-18 15:08:37 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2022-09-24 11:15:32 -0400
commit2bcc97da424eef7db9a5d02f81d02344925415ee (patch)
tree13d4f04bc7dd40a0207f86aa2fc3a3b49e065674 /test/sql/test_insert_exec.py
parent332188e5680574368001ded52eb0a9d259ecdef5 (diff)
downloadsqlalchemy-2bcc97da424eef7db9a5d02f81d02344925415ee.tar.gz
implement batched INSERT..VALUES () () for executemany
the feature is enabled for all built in backends when RETURNING is used, except for Oracle that doesn't need it, and on psycopg2 and mssql+pyodbc it is used for all INSERT statements, not just those that use RETURNING. third party dialects would need to opt in to the new feature by setting use_insertmanyvalues to True. Also adds dialect-level guards against using returning with executemany where we dont have an implementation to suit it. execute single w/ returning still defers to the server without us checking. Fixes: #6047 Fixes: #7907 Change-Id: I3936d3c00003f02e322f2e43fb949d0e6e568304
Diffstat (limited to 'test/sql/test_insert_exec.py')
-rw-r--r--test/sql/test_insert_exec.py239
1 file changed, 239 insertions, 0 deletions
diff --git a/test/sql/test_insert_exec.py b/test/sql/test_insert_exec.py
index b6945813e..4ce093156 100644
--- a/test/sql/test_insert_exec.py
+++ b/test/sql/test_insert_exec.py
@@ -1,4 +1,7 @@
+import itertools
+
from sqlalchemy import and_
+from sqlalchemy import event
from sqlalchemy import exc
from sqlalchemy import ForeignKey
from sqlalchemy import func
@@ -10,8 +13,10 @@ from sqlalchemy import sql
from sqlalchemy import String
from sqlalchemy import testing
from sqlalchemy import VARCHAR
+from sqlalchemy.engine import cursor as _cursor
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
+from sqlalchemy.testing import expect_raises_message
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import mock
@@ -712,3 +717,237 @@ class TableInsertTest(fixtures.TablesTest):
table=t,
parameters=dict(id=None, data="data", x=5),
)
+
+
+class InsertManyValuesTest(fixtures.RemovesEvents, fixtures.TablesTest):
+ __backend__ = True
+ __requires__ = ("insertmanyvalues",)
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "data",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("x", String(50)),
+ Column("y", String(50)),
+ Column("z", Integer, server_default="5"),
+ )
+
+ Table(
+ "Unitéble2",
+ metadata,
+ Column("méil", Integer, primary_key=True),
+ Column("\u6e2c\u8a66", Integer),
+ )
+
+ def test_insert_unicode_keys(self, connection):
+ table = self.tables["Unitéble2"]
+
+ stmt = table.insert().returning(table.c["méil"])
+
+ connection.execute(
+ stmt,
+ [
+ {"méil": 1, "\u6e2c\u8a66": 1},
+ {"méil": 2, "\u6e2c\u8a66": 2},
+ {"méil": 3, "\u6e2c\u8a66": 3},
+ ],
+ )
+
+ eq_(connection.execute(table.select()).all(), [(1, 1), (2, 2), (3, 3)])
+
+ def test_insert_returning_values(self, connection):
+ t = self.tables.data
+
+ conn = connection
+ page_size = conn.dialect.insertmanyvalues_page_size or 100
+ data = [
+ {"x": "x%d" % i, "y": "y%d" % i}
+ for i in range(1, page_size * 2 + 27)
+ ]
+ result = conn.execute(t.insert().returning(t.c.x, t.c.y), data)
+
+ eq_([tup[0] for tup in result.cursor.description], ["x", "y"])
+ eq_(result.keys(), ["x", "y"])
+ assert t.c.x in result.keys()
+ assert t.c.id not in result.keys()
+ assert not result._soft_closed
+ assert isinstance(
+ result.cursor_strategy,
+ _cursor.FullyBufferedCursorFetchStrategy,
+ )
+ assert not result.closed
+ eq_(result.mappings().all(), data)
+
+ assert result._soft_closed
+ # assert result.closed
+ assert result.cursor is None
+
+ def test_insert_returning_preexecute_pk(self, metadata, connection):
+ counter = itertools.count(1)
+
+ t = Table(
+ "t",
+ self.metadata,
+ Column(
+ "id",
+ Integer,
+ primary_key=True,
+ default=lambda: next(counter),
+ ),
+ Column("data", Integer),
+ )
+ metadata.create_all(connection)
+
+ result = connection.execute(
+ t.insert().return_defaults(),
+ [{"data": 1}, {"data": 2}, {"data": 3}],
+ )
+
+ eq_(result.inserted_primary_key_rows, [(1,), (2,), (3,)])
+
+ def test_insert_returning_defaults(self, connection):
+ t = self.tables.data
+
+ conn = connection
+
+ result = conn.execute(t.insert(), {"x": "x0", "y": "y0"})
+ first_pk = result.inserted_primary_key[0]
+
+ page_size = conn.dialect.insertmanyvalues_page_size or 100
+ total_rows = page_size * 5 + 27
+ data = [{"x": "x%d" % i, "y": "y%d" % i} for i in range(1, total_rows)]
+ result = conn.execute(t.insert().returning(t.c.id, t.c.z), data)
+
+ eq_(
+ result.all(),
+ [(pk, 5) for pk in range(1 + first_pk, total_rows + first_pk)],
+ )
+
+ def test_insert_return_pks_default_values(self, connection):
+ """test sending multiple, empty rows into an INSERT and getting primary
+ key values back.
+
+ This has to use a format that indicates at least one DEFAULT in
+ multiple parameter sets, i.e. "INSERT INTO table (anycol) VALUES
+ (DEFAULT) (DEFAULT) (DEFAULT) ... RETURNING col"
+
+        if the database doesn't support this (like SQLite, mssql), it
+ actually runs the statement that many times on the cursor.
+ This is much less efficient, but is still more efficient than
+ how it worked previously where we'd run the statement that many
+ times anyway.
+
+ There's ways to make it work for those, such as on SQLite
+ we can use "INSERT INTO table (pk_col) VALUES (NULL) RETURNING pk_col",
+ but that assumes an autoincrement pk_col, not clear how this
+ could be produced generically.
+
+ """
+ t = self.tables.data
+
+ conn = connection
+
+ result = conn.execute(t.insert(), {"x": "x0", "y": "y0"})
+ first_pk = result.inserted_primary_key[0]
+
+ page_size = conn.dialect.insertmanyvalues_page_size or 100
+ total_rows = page_size * 2 + 27
+ data = [{} for i in range(1, total_rows)]
+ result = conn.execute(t.insert().returning(t.c.id), data)
+
+ eq_(
+ result.all(),
+ [(pk,) for pk in range(1 + first_pk, total_rows + first_pk)],
+ )
+
+ @testing.combinations(None, 100, 329, argnames="batchsize")
+ @testing.combinations(
+ "engine",
+ "conn_execution_option",
+ "exec_execution_option",
+ "stmt_execution_option",
+ argnames="paramtype",
+ )
+ def test_page_size_adjustment(self, testing_engine, batchsize, paramtype):
+
+ t = self.tables.data
+
+ if paramtype == "engine" and batchsize is not None:
+ e = testing_engine(
+ options={
+ "insertmanyvalues_page_size": batchsize,
+ },
+ )
+
+ # sqlite, since this is a new engine, re-create the table
+ if not testing.requires.independent_connections.enabled:
+ t.create(e, checkfirst=True)
+ else:
+ e = testing.db
+
+ totalnum = 1275
+ data = [{"x": "x%d" % i, "y": "y%d" % i} for i in range(1, totalnum)]
+
+ insert_count = 0
+
+ with e.begin() as conn:
+
+ @event.listens_for(conn, "before_cursor_execute")
+ def go(conn, cursor, statement, parameters, context, executemany):
+ nonlocal insert_count
+ if statement.startswith("INSERT"):
+ insert_count += 1
+
+ stmt = t.insert()
+ if batchsize is None or paramtype == "engine":
+ conn.execute(stmt.returning(t.c.id), data)
+ elif paramtype == "conn_execution_option":
+ conn = conn.execution_options(
+ insertmanyvalues_page_size=batchsize
+ )
+ conn.execute(stmt.returning(t.c.id), data)
+ elif paramtype == "stmt_execution_option":
+ stmt = stmt.execution_options(
+ insertmanyvalues_page_size=batchsize
+ )
+ conn.execute(stmt.returning(t.c.id), data)
+ elif paramtype == "exec_execution_option":
+ conn.execute(
+ stmt.returning(t.c.id),
+ data,
+ execution_options=dict(
+ insertmanyvalues_page_size=batchsize
+ ),
+ )
+ else:
+ assert False
+
+ assert_batchsize = batchsize or 1000
+ eq_(
+ insert_count,
+ totalnum // assert_batchsize
+ + (1 if totalnum % assert_batchsize else 0),
+ )
+
+ def test_disabled(self, testing_engine):
+
+ e = testing_engine(
+ options={"use_insertmanyvalues": False},
+ share_pool=True,
+ transfer_staticpool=True,
+ )
+ totalnum = 1275
+ data = [{"x": "x%d" % i, "y": "y%d" % i} for i in range(1, totalnum)]
+
+ t = self.tables.data
+
+ with e.begin() as conn:
+ stmt = t.insert()
+ with expect_raises_message(
+ exc.StatementError,
+ "with current server capabilities does not support "
+ "INSERT..RETURNING when executemany",
+ ):
+ conn.execute(stmt.returning(t.c.id), data)