diff options
Diffstat (limited to 'test/sql/test_insert_exec.py')
-rw-r--r-- | test/sql/test_insert_exec.py | 239 |
1 files changed, 239 insertions, 0 deletions
diff --git a/test/sql/test_insert_exec.py b/test/sql/test_insert_exec.py index b6945813e..4ce093156 100644 --- a/test/sql/test_insert_exec.py +++ b/test/sql/test_insert_exec.py @@ -1,4 +1,7 @@ +import itertools + from sqlalchemy import and_ +from sqlalchemy import event from sqlalchemy import exc from sqlalchemy import ForeignKey from sqlalchemy import func @@ -10,8 +13,10 @@ from sqlalchemy import sql from sqlalchemy import String from sqlalchemy import testing from sqlalchemy import VARCHAR +from sqlalchemy.engine import cursor as _cursor from sqlalchemy.testing import assert_raises_message from sqlalchemy.testing import eq_ +from sqlalchemy.testing import expect_raises_message from sqlalchemy.testing import fixtures from sqlalchemy.testing import is_ from sqlalchemy.testing import mock @@ -712,3 +717,237 @@ class TableInsertTest(fixtures.TablesTest): table=t, parameters=dict(id=None, data="data", x=5), ) + + +class InsertManyValuesTest(fixtures.RemovesEvents, fixtures.TablesTest): + __backend__ = True + __requires__ = ("insertmanyvalues",) + + @classmethod + def define_tables(cls, metadata): + Table( + "data", + metadata, + Column("id", Integer, primary_key=True), + Column("x", String(50)), + Column("y", String(50)), + Column("z", Integer, server_default="5"), + ) + + Table( + "Unitéble2", + metadata, + Column("méil", Integer, primary_key=True), + Column("\u6e2c\u8a66", Integer), + ) + + def test_insert_unicode_keys(self, connection): + table = self.tables["Unitéble2"] + + stmt = table.insert().returning(table.c["méil"]) + + connection.execute( + stmt, + [ + {"méil": 1, "\u6e2c\u8a66": 1}, + {"méil": 2, "\u6e2c\u8a66": 2}, + {"méil": 3, "\u6e2c\u8a66": 3}, + ], + ) + + eq_(connection.execute(table.select()).all(), [(1, 1), (2, 2), (3, 3)]) + + def test_insert_returning_values(self, connection): + t = self.tables.data + + conn = connection + page_size = conn.dialect.insertmanyvalues_page_size or 100 + data = [ + {"x": "x%d" % i, "y": "y%d" % i} + for i in range(1, page_size * 2 + 27) + ] + result = conn.execute(t.insert().returning(t.c.x, t.c.y), data) + + eq_([tup[0] for tup in result.cursor.description], ["x", "y"]) + eq_(result.keys(), ["x", "y"]) + assert t.c.x in result.keys() + assert t.c.id not in result.keys() + assert not result._soft_closed + assert isinstance( + result.cursor_strategy, + _cursor.FullyBufferedCursorFetchStrategy, + ) + assert not result.closed + eq_(result.mappings().all(), data) + + assert result._soft_closed + # assert result.closed + assert result.cursor is None + + def test_insert_returning_preexecute_pk(self, metadata, connection): + counter = itertools.count(1) + + t = Table( + "t", + self.metadata, + Column( + "id", + Integer, + primary_key=True, + default=lambda: next(counter), + ), + Column("data", Integer), + ) + metadata.create_all(connection) + + result = connection.execute( + t.insert().return_defaults(), + [{"data": 1}, {"data": 2}, {"data": 3}], + ) + + eq_(result.inserted_primary_key_rows, [(1,), (2,), (3,)]) + + def test_insert_returning_defaults(self, connection): + t = self.tables.data + + conn = connection + + result = conn.execute(t.insert(), {"x": "x0", "y": "y0"}) + first_pk = result.inserted_primary_key[0] + + page_size = conn.dialect.insertmanyvalues_page_size or 100 + total_rows = page_size * 5 + 27 + data = [{"x": "x%d" % i, "y": "y%d" % i} for i in range(1, total_rows)] + result = conn.execute(t.insert().returning(t.c.id, t.c.z), data) + + eq_( + result.all(), + [(pk, 5) for pk in range(1 + first_pk, total_rows + first_pk)], + ) + + def test_insert_return_pks_default_values(self, connection): + """test sending multiple, empty rows into an INSERT and getting primary + key values back. + + This has to use a format that indicates at least one DEFAULT in + multiple parameter sets, i.e. "INSERT INTO table (anycol) VALUES + (DEFAULT) (DEFAULT) (DEFAULT) ... RETURNING col" + + if the database doesnt support this (like SQLite, mssql), it + actually runs the statement that many times on the cursor. + This is much less efficient, but is still more efficient than + how it worked previously where we'd run the statement that many + times anyway. + + There's ways to make it work for those, such as on SQLite + we can use "INSERT INTO table (pk_col) VALUES (NULL) RETURNING pk_col", + but that assumes an autoincrement pk_col, not clear how this + could be produced generically. + + """ + t = self.tables.data + + conn = connection + + result = conn.execute(t.insert(), {"x": "x0", "y": "y0"}) + first_pk = result.inserted_primary_key[0] + + page_size = conn.dialect.insertmanyvalues_page_size or 100 + total_rows = page_size * 2 + 27 + data = [{} for i in range(1, total_rows)] + result = conn.execute(t.insert().returning(t.c.id), data) + + eq_( + result.all(), + [(pk,) for pk in range(1 + first_pk, total_rows + first_pk)], + ) + + @testing.combinations(None, 100, 329, argnames="batchsize") + @testing.combinations( + "engine", + "conn_execution_option", + "exec_execution_option", + "stmt_execution_option", + argnames="paramtype", + ) + def test_page_size_adjustment(self, testing_engine, batchsize, paramtype): + + t = self.tables.data + + if paramtype == "engine" and batchsize is not None: + e = testing_engine( + options={ + "insertmanyvalues_page_size": batchsize, + }, + ) + + # sqlite, since this is a new engine, re-create the table + if not testing.requires.independent_connections.enabled: + t.create(e, checkfirst=True) + else: + e = testing.db + + totalnum = 1275 + data = [{"x": "x%d" % i, "y": "y%d" % i} for i in range(1, totalnum)] + + insert_count = 0 + + with e.begin() as conn: + + @event.listens_for(conn, "before_cursor_execute") + def go(conn, cursor, statement, parameters, context, executemany): + nonlocal insert_count + if statement.startswith("INSERT"): + insert_count += 1 + + stmt = t.insert() + if batchsize is None or paramtype == "engine": + conn.execute(stmt.returning(t.c.id), data) + elif paramtype == "conn_execution_option": + conn = conn.execution_options( + insertmanyvalues_page_size=batchsize + ) + conn.execute(stmt.returning(t.c.id), data) + elif paramtype == "stmt_execution_option": + stmt = stmt.execution_options( + insertmanyvalues_page_size=batchsize + ) + conn.execute(stmt.returning(t.c.id), data) + elif paramtype == "exec_execution_option": + conn.execute( + stmt.returning(t.c.id), + data, + execution_options=dict( + insertmanyvalues_page_size=batchsize + ), + ) + else: + assert False + + assert_batchsize = batchsize or 1000 + eq_( + insert_count, + totalnum // assert_batchsize + + (1 if totalnum % assert_batchsize else 0), + ) + + def test_disabled(self, testing_engine): + + e = testing_engine( + options={"use_insertmanyvalues": False}, + share_pool=True, + transfer_staticpool=True, + ) + totalnum = 1275 + data = [{"x": "x%d" % i, "y": "y%d" % i} for i in range(1, totalnum)] + + t = self.tables.data + + with e.begin() as conn: + stmt = t.insert() + with expect_raises_message( + exc.StatementError, + "with current server capabilities does not support " + "INSERT..RETURNING when executemany", + ): + conn.execute(stmt.returning(t.c.id), data) |