diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-07-18 15:08:37 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-09-24 11:15:32 -0400 |
commit | 2bcc97da424eef7db9a5d02f81d02344925415ee (patch) | |
tree | 13d4f04bc7dd40a0207f86aa2fc3a3b49e065674 /lib/sqlalchemy/engine/base.py | |
parent | 332188e5680574368001ded52eb0a9d259ecdef5 (diff) | |
download | sqlalchemy-2bcc97da424eef7db9a5d02f81d02344925415ee.tar.gz |
implement batched INSERT..VALUES () () for executemany
the feature is enabled for all built in backends
when RETURNING is used,
except for Oracle that doesn't need it, and on
psycopg2 and mssql+pyodbc it is used for all INSERT statements,
not just those that use RETURNING.
third party dialects would need to opt in to the new feature
by setting use_insertmanyvalues to True.
Also adds dialect-level guards against using returning
with executemany where we dont have an implementation to
suit it. execute single w/ returning still defers to the
server without us checking.
Fixes: #6047
Fixes: #7907
Change-Id: I3936d3c00003f02e322f2e43fb949d0e6e568304
Diffstat (limited to 'lib/sqlalchemy/engine/base.py')
-rw-r--r-- | lib/sqlalchemy/engine/base.py | 217 |
1 files changed, 208 insertions, 9 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index 2b9cf602a..1b07acab5 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -12,6 +12,7 @@ import typing from typing import Any from typing import Callable from typing import cast +from typing import Iterable from typing import Iterator from typing import List from typing import Mapping @@ -29,6 +30,7 @@ from .interfaces import BindTyping from .interfaces import ConnectionEventsTarget from .interfaces import DBAPICursor from .interfaces import ExceptionContext +from .interfaces import ExecuteStyle from .interfaces import ExecutionContext from .util import _distill_params_20 from .util import _distill_raw_params @@ -438,6 +440,20 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel` describing the ORM version of ``yield_per`` + :param insertmanyvalues_page_size: number of rows to format into an + INSERT statement when the statement uses "insertmanyvalues" mode, + which is a paged form of bulk insert that is used for many backends + when using :term:`executemany` execution typically in conjunction + with RETURNING. Defaults to 1000. May also be modified on a + per-engine basis using the + :paramref:`_sa.create_engine.insertmanyvalues_page_size` parameter. + + .. versionadded:: 2.0 + + .. seealso:: + + :ref:`engine_insertmanyvalues` + :param schema_translate_map: Available on: :class:`_engine.Connection`, :class:`_engine.Engine`, :class:`_sql.Executable`. @@ -1795,8 +1811,39 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): context.pre_exec() + if context.execute_style is ExecuteStyle.INSERTMANYVALUES: + return self._exec_insertmany_context( + dialect, + context, + ) + else: + return self._exec_single_context( + dialect, context, statement, parameters + ) + + def _exec_single_context( + self, + dialect: Dialect, + context: ExecutionContext, + statement: Union[str, Compiled], + parameters: Optional[_AnyMultiExecuteParams], + ) -> CursorResult[Any]: + """continue the _execute_context() method for a single DBAPI + cursor.execute() or cursor.executemany() call. + + """ if dialect.bind_typing is BindTyping.SETINPUTSIZES: - context._set_input_sizes() + generic_setinputsizes = context._prepare_set_input_sizes() + + if generic_setinputsizes: + try: + dialect.do_set_input_sizes( + context.cursor, generic_setinputsizes, context + ) + except BaseException as e: + self._handle_dbapi_exception( + e, str(statement), parameters, None, context + ) cursor, str_statement, parameters = ( context.cursor, @@ -1840,13 +1887,13 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): ) else: self._log_info( - "[%s] [SQL parameters hidden due to hide_parameters=True]" - % (stats,) + "[%s] [SQL parameters hidden due to hide_parameters=True]", + stats, ) evt_handled: bool = False try: - if context.executemany: + if context.execute_style is ExecuteStyle.EXECUTEMANY: effective_parameters = cast( "_CoreMultiExecuteParams", effective_parameters ) @@ -1862,7 +1909,10 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): break if not evt_handled: self.dialect.do_executemany( - cursor, str_statement, effective_parameters, context + cursor, + str_statement, + effective_parameters, + context, ) elif not effective_parameters and context.no_parameters: if self.dialect._has_events: @@ -1914,6 +1964,151 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): return result + def _exec_insertmany_context( + self, + dialect: Dialect, + context: ExecutionContext, + ) -> CursorResult[Any]: + """continue the _execute_context() method for an "insertmanyvalues" + operation, which will invoke DBAPI + cursor.execute() one or more times with individual log and + event hook calls. + + """ + + if dialect.bind_typing is BindTyping.SETINPUTSIZES: + generic_setinputsizes = context._prepare_set_input_sizes() + else: + generic_setinputsizes = None + + cursor, str_statement, parameters = ( + context.cursor, + context.statement, + context.parameters, + ) + + effective_parameters = parameters + + engine_events = self._has_events or self.engine._has_events + if self.dialect._has_events: + do_execute_dispatch: Iterable[ + Any + ] = self.dialect.dispatch.do_execute + else: + do_execute_dispatch = () + + if self._echo: + stats = context._get_cache_stats() + " (insertmanyvalues)" + for ( + sub_stmt, + sub_params, + setinputsizes, + batchnum, + totalbatches, + ) in dialect._deliver_insertmanyvalues_batches( + cursor, + str_statement, + effective_parameters, + generic_setinputsizes, + context, + ): + + if setinputsizes: + try: + dialect.do_set_input_sizes( + context.cursor, setinputsizes, context + ) + except BaseException as e: + self._handle_dbapi_exception( + e, + sql_util._long_statement(sub_stmt), + sub_params, + None, + context, + ) + + if engine_events: + for fn in self.dispatch.before_cursor_execute: + sub_stmt, sub_params = fn( + self, + cursor, + sub_stmt, + sub_params, + context, + True, + ) + + if self._echo: + + self._log_info(sql_util._long_statement(sub_stmt)) + + if batchnum > 1: + stats = ( + f"insertmanyvalues batch {batchnum} " + f"of {totalbatches}" + ) + + if not self.engine.hide_parameters: + self._log_info( + "[%s] %r", + stats, + sql_util._repr_params( + sub_params, + batches=10, + ismulti=False, + ), + ) + else: + self._log_info( + "[%s] [SQL parameters hidden due to " + "hide_parameters=True]", + stats, + ) + + try: + for fn in do_execute_dispatch: + if fn( + cursor, + sub_stmt, + sub_params, + context, + ): + break + else: + dialect.do_execute(cursor, sub_stmt, sub_params, context) + + except BaseException as e: + self._handle_dbapi_exception( + e, + sql_util._long_statement(sub_stmt), + sub_params, + cursor, + context, + is_sub_exec=True, + ) + + if engine_events: + self.dispatch.after_cursor_execute( + self, + cursor, + str_statement, + effective_parameters, + context, + context.executemany, + ) + + try: + context.post_exec() + + result = context._setup_result_proxy() + + except BaseException as e: + self._handle_dbapi_exception( + e, str_statement, effective_parameters, cursor, context + ) + + return result + def _cursor_execute( self, cursor: DBAPICursor, @@ -1983,6 +2178,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): parameters: Optional[_AnyExecuteParams], cursor: Optional[DBAPICursor], context: Optional[ExecutionContext], + is_sub_exec: bool = False, ) -> NoReturn: exc_info = sys.exc_info() @@ -2001,6 +2197,11 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): invalidate_pool_on_disconnect = not is_exit_exception + ismulti: bool = ( + not is_sub_exec and context.executemany + if context is not None + else False + ) if self._reentrant_error: raise exc.DBAPIError.instance( statement, @@ -2009,7 +2210,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): self.dialect.loaded_dbapi.Error, hide_parameters=self.engine.hide_parameters, dialect=self.dialect, - ismulti=context.executemany if context is not None else None, + ismulti=ismulti, ).with_traceback(exc_info[2]) from e self._reentrant_error = True try: @@ -2030,9 +2231,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): hide_parameters=self.engine.hide_parameters, connection_invalidated=self._is_disconnect, dialect=self.dialect, - ismulti=context.executemany - if context is not None - else None, + ismulti=ismulti, ) else: sqlalchemy_exception = None |