Diffstat (limited to 'test/sql/test_insert_exec.py')
-rw-r--r--  test/sql/test_insert_exec.py  239
1 file changed, 239 insertions, 0 deletions
diff --git a/test/sql/test_insert_exec.py b/test/sql/test_insert_exec.py
index b6945813e..4ce093156 100644
--- a/test/sql/test_insert_exec.py
+++ b/test/sql/test_insert_exec.py
@@ -1,4 +1,7 @@
+import itertools
+
from sqlalchemy import and_
+from sqlalchemy import event
from sqlalchemy import exc
from sqlalchemy import ForeignKey
from sqlalchemy import func
@@ -10,8 +13,10 @@ from sqlalchemy import sql
from sqlalchemy import String
from sqlalchemy import testing
from sqlalchemy import VARCHAR
+from sqlalchemy.engine import cursor as _cursor
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
+from sqlalchemy.testing import expect_raises_message
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import mock
@@ -712,3 +717,237 @@ class TableInsertTest(fixtures.TablesTest):
table=t,
parameters=dict(id=None, data="data", x=5),
)
+
+
+class InsertManyValuesTest(fixtures.RemovesEvents, fixtures.TablesTest):
+ __backend__ = True
+ __requires__ = ("insertmanyvalues",)
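+    # the "insertmanyvalues" requirement targets dialects where SQLAlchemy
+    # batches an executemany() INSERT..RETURNING into pages of multi-row
+    # VALUES clauses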
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "data",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("x", String(50)),
+ Column("y", String(50)),
+ Column("z", Integer, server_default="5"),
+ )
+
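+        # a table with non-ASCII name and column names, exercising the
+        # batched INSERT path with unicode identifiers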
+ Table(
+ "Unitéble2",
+ metadata,
+ Column("méil", Integer, primary_key=True),
+ Column("\u6e2c\u8a66", Integer),
+ )
+
+ def test_insert_unicode_keys(self, connection):
+ table = self.tables["Unitéble2"]
+
+ stmt = table.insert().returning(table.c["méil"])
+
+ connection.execute(
+ stmt,
+ [
+ {"méil": 1, "\u6e2c\u8a66": 1},
+ {"méil": 2, "\u6e2c\u8a66": 2},
+ {"méil": 3, "\u6e2c\u8a66": 3},
+ ],
+ )
+
+ eq_(connection.execute(table.select()).all(), [(1, 1), (2, 2), (3, 3)])
+
+ def test_insert_returning_values(self, connection):
+ t = self.tables.data
+
+ conn = connection
+ page_size = conn.dialect.insertmanyvalues_page_size or 100
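+        # fall back to 100 in case the dialect does not report a page size,
+        # so the test data still spans multiple pages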
+ data = [
+ {"x": "x%d" % i, "y": "y%d" % i}
+ for i in range(1, page_size * 2 + 27)
+ ]
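+        # two full pages plus a partial page, so this executemany is split
+        # across three batched INSERT..RETURNING statements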
+ result = conn.execute(t.insert().returning(t.c.x, t.c.y), data)
+
+ eq_([tup[0] for tup in result.cursor.description], ["x", "y"])
+ eq_(result.keys(), ["x", "y"])
+ assert t.c.x in result.keys()
+ assert t.c.id not in result.keys()
+ assert not result._soft_closed
+ assert isinstance(
+ result.cursor_strategy,
+ _cursor.FullyBufferedCursorFetchStrategy,
+ )
+ assert not result.closed
+ eq_(result.mappings().all(), data)
+
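+        # fully consuming the buffered result soft-closes it and releases
+        # the underlying cursor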
+ assert result._soft_closed
+ # assert result.closed
+ assert result.cursor is None
+
+ def test_insert_returning_preexecute_pk(self, metadata, connection):
+ counter = itertools.count(1)
+
+ t = Table(
+ "t",
+            metadata,
+ Column(
+ "id",
+ Integer,
+ primary_key=True,
+ default=lambda: next(counter),
+ ),
+ Column("data", Integer),
+ )
+ metadata.create_all(connection)
+
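+        # the Python-side default is evaluated per row before the INSERT
+        # runs, so primary key values are available without RETURNING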
+ result = connection.execute(
+ t.insert().return_defaults(),
+ [{"data": 1}, {"data": 2}, {"data": 3}],
+ )
+
+ eq_(result.inserted_primary_key_rows, [(1,), (2,), (3,)])
+
+ def test_insert_returning_defaults(self, connection):
+ t = self.tables.data
+
+ conn = connection
+
+ result = conn.execute(t.insert(), {"x": "x0", "y": "y0"})
+ first_pk = result.inserted_primary_key[0]
+
+ page_size = conn.dialect.insertmanyvalues_page_size or 100
+ total_rows = page_size * 5 + 27
+ data = [{"x": "x%d" % i, "y": "y%d" % i} for i in range(1, total_rows)]
+ result = conn.execute(t.insert().returning(t.c.id, t.c.z), data)
+
+ eq_(
+ result.all(),
+ [(pk, 5) for pk in range(1 + first_pk, total_rows + first_pk)],
+ )
+
+ def test_insert_return_pks_default_values(self, connection):
+        """test sending multiple empty rows into an INSERT and getting
+        primary key values back.
+
+        This has to use a format that indicates at least one DEFAULT in
+        multiple parameter sets, i.e. "INSERT INTO table (anycol) VALUES
+        (DEFAULT), (DEFAULT), (DEFAULT), ... RETURNING col".
+
+        If the database doesn't support this format (like SQLite and mssql),
+        the statement is actually run that many times on the cursor. This
+        is much less efficient, but still more efficient than the previous
+        approach, where we'd run the statement that many times anyway.
+
+        There are ways to make it work for those databases; on SQLite, for
+        example, we can use "INSERT INTO table (pk_col) VALUES (NULL)
+        RETURNING pk_col", but that assumes an autoincrement pk_col, and
+        it's not clear how this could be produced generically.
+
+ """
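+        # illustrative only: on a dialect that accepts multiple DEFAULT
+        # entries, the paged statement looks roughly like
+        #   INSERT INTO data (anycol) VALUES (DEFAULT), (DEFAULT), ...
+        #   RETURNING id
+        # while other dialects fall back to one execution per parameter set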
+ t = self.tables.data
+
+ conn = connection
+
+ result = conn.execute(t.insert(), {"x": "x0", "y": "y0"})
+ first_pk = result.inserted_primary_key[0]
+
+ page_size = conn.dialect.insertmanyvalues_page_size or 100
+ total_rows = page_size * 2 + 27
+        data = [{} for _ in range(1, total_rows)]
+ result = conn.execute(t.insert().returning(t.c.id), data)
+
+ eq_(
+ result.all(),
+ [(pk,) for pk in range(1 + first_pk, total_rows + first_pk)],
+ )
+
+ @testing.combinations(None, 100, 329, argnames="batchsize")
+ @testing.combinations(
+ "engine",
+ "conn_execution_option",
+ "exec_execution_option",
+ "stmt_execution_option",
+ argnames="paramtype",
+ )
+ def test_page_size_adjustment(self, testing_engine, batchsize, paramtype):
+
+ t = self.tables.data
+
+ if paramtype == "engine" and batchsize is not None:
+ e = testing_engine(
+ options={
+ "insertmanyvalues_page_size": batchsize,
+ },
+ )
+
+            # for sqlite, which doesn't have independent connections, this
+            # new engine points to a separate database; re-create the table
+ if not testing.requires.independent_connections.enabled:
+ t.create(e, checkfirst=True)
+ else:
+ e = testing.db
+
+ totalnum = 1275
+ data = [{"x": "x%d" % i, "y": "y%d" % i} for i in range(1, totalnum)]
+
+ insert_count = 0
+
+ with e.begin() as conn:
+
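+            # count INSERT statements as they reach the cursor; each
+            # insertmanyvalues page produces one cursor execution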
+ @event.listens_for(conn, "before_cursor_execute")
+ def go(conn, cursor, statement, parameters, context, executemany):
+ nonlocal insert_count
+ if statement.startswith("INSERT"):
+ insert_count += 1
+
+ stmt = t.insert()
+ if batchsize is None or paramtype == "engine":
+ conn.execute(stmt.returning(t.c.id), data)
+ elif paramtype == "conn_execution_option":
+ conn = conn.execution_options(
+ insertmanyvalues_page_size=batchsize
+ )
+ conn.execute(stmt.returning(t.c.id), data)
+ elif paramtype == "stmt_execution_option":
+ stmt = stmt.execution_options(
+ insertmanyvalues_page_size=batchsize
+ )
+ conn.execute(stmt.returning(t.c.id), data)
+ elif paramtype == "exec_execution_option":
+ conn.execute(
+ stmt.returning(t.c.id),
+ data,
+ execution_options=dict(
+ insertmanyvalues_page_size=batchsize
+ ),
+ )
+ else:
+ assert False
+
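+        # the default page size is 1000; expect one INSERT statement per
+        # page of parameter sets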
+ assert_batchsize = batchsize or 1000
+ eq_(
+ insert_count,
+ totalnum // assert_batchsize
+ + (1 if totalnum % assert_batchsize else 0),
+ )
+
+ def test_disabled(self, testing_engine):
+
+ e = testing_engine(
+ options={"use_insertmanyvalues": False},
+ share_pool=True,
+ transfer_staticpool=True,
+ )
+ totalnum = 1275
+ data = [{"x": "x%d" % i, "y": "y%d" % i} for i in range(1, totalnum)]
+
+ t = self.tables.data
+
+ with e.begin() as conn:
+ stmt = t.insert()
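+            # executemany combined with RETURNING cannot be batched when
+            # use_insertmanyvalues is disabled, so this is rejected up front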
+ with expect_raises_message(
+ exc.StatementError,
+ "with current server capabilities does not support "
+ "INSERT..RETURNING when executemany",
+ ):
+ conn.execute(stmt.returning(t.c.id), data)