diff options
Diffstat (limited to 'lib/sqlalchemy/engine/base.py')
-rw-r--r-- | lib/sqlalchemy/engine/base.py | 217 |
1 files changed, 208 insertions, 9 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index 2b9cf602a..1b07acab5 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -12,6 +12,7 @@ import typing from typing import Any from typing import Callable from typing import cast +from typing import Iterable from typing import Iterator from typing import List from typing import Mapping @@ -29,6 +30,7 @@ from .interfaces import BindTyping from .interfaces import ConnectionEventsTarget from .interfaces import DBAPICursor from .interfaces import ExceptionContext +from .interfaces import ExecuteStyle from .interfaces import ExecutionContext from .util import _distill_params_20 from .util import _distill_raw_params @@ -438,6 +440,20 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel` describing the ORM version of ``yield_per`` + :param insertmanyvalues_page_size: number of rows to format into an + INSERT statement when the statement uses "insertmanyvalues" mode, + which is a paged form of bulk insert that is used for many backends + when using :term:`executemany` execution typically in conjunction + with RETURNING. Defaults to 1000. May also be modified on a + per-engine basis using the + :paramref:`_sa.create_engine.insertmanyvalues_page_size` parameter. + + .. versionadded:: 2.0 + + .. seealso:: + + :ref:`engine_insertmanyvalues` + :param schema_translate_map: Available on: :class:`_engine.Connection`, :class:`_engine.Engine`, :class:`_sql.Executable`. @@ -1795,8 +1811,39 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): context.pre_exec() + if context.execute_style is ExecuteStyle.INSERTMANYVALUES: + return self._exec_insertmany_context( + dialect, + context, + ) + else: + return self._exec_single_context( + dialect, context, statement, parameters + ) + + def _exec_single_context( + self, + dialect: Dialect, + context: ExecutionContext, + statement: Union[str, Compiled], + parameters: Optional[_AnyMultiExecuteParams], + ) -> CursorResult[Any]: + """continue the _execute_context() method for a single DBAPI + cursor.execute() or cursor.executemany() call. + + """ if dialect.bind_typing is BindTyping.SETINPUTSIZES: - context._set_input_sizes() + generic_setinputsizes = context._prepare_set_input_sizes() + + if generic_setinputsizes: + try: + dialect.do_set_input_sizes( + context.cursor, generic_setinputsizes, context + ) + except BaseException as e: + self._handle_dbapi_exception( + e, str(statement), parameters, None, context + ) cursor, str_statement, parameters = ( context.cursor, @@ -1840,13 +1887,13 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): ) else: self._log_info( - "[%s] [SQL parameters hidden due to hide_parameters=True]" - % (stats,) + "[%s] [SQL parameters hidden due to hide_parameters=True]", + stats, ) evt_handled: bool = False try: - if context.executemany: + if context.execute_style is ExecuteStyle.EXECUTEMANY: effective_parameters = cast( "_CoreMultiExecuteParams", effective_parameters ) @@ -1862,7 +1909,10 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): break if not evt_handled: self.dialect.do_executemany( - cursor, str_statement, effective_parameters, context + cursor, + str_statement, + effective_parameters, + context, ) elif not effective_parameters and context.no_parameters: if self.dialect._has_events: @@ -1914,6 +1964,151 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): return result + def _exec_insertmany_context( + self, + dialect: Dialect, + context: ExecutionContext, + ) -> CursorResult[Any]: + """continue the _execute_context() method for an "insertmanyvalues" + operation, which will invoke DBAPI + cursor.execute() one or more times with individual log and + event hook calls. + + """ + + if dialect.bind_typing is BindTyping.SETINPUTSIZES: + generic_setinputsizes = context._prepare_set_input_sizes() + else: + generic_setinputsizes = None + + cursor, str_statement, parameters = ( + context.cursor, + context.statement, + context.parameters, + ) + + effective_parameters = parameters + + engine_events = self._has_events or self.engine._has_events + if self.dialect._has_events: + do_execute_dispatch: Iterable[ + Any + ] = self.dialect.dispatch.do_execute + else: + do_execute_dispatch = () + + if self._echo: + stats = context._get_cache_stats() + " (insertmanyvalues)" + for ( + sub_stmt, + sub_params, + setinputsizes, + batchnum, + totalbatches, + ) in dialect._deliver_insertmanyvalues_batches( + cursor, + str_statement, + effective_parameters, + generic_setinputsizes, + context, + ): + + if setinputsizes: + try: + dialect.do_set_input_sizes( + context.cursor, setinputsizes, context + ) + except BaseException as e: + self._handle_dbapi_exception( + e, + sql_util._long_statement(sub_stmt), + sub_params, + None, + context, + ) + + if engine_events: + for fn in self.dispatch.before_cursor_execute: + sub_stmt, sub_params = fn( + self, + cursor, + sub_stmt, + sub_params, + context, + True, + ) + + if self._echo: + + self._log_info(sql_util._long_statement(sub_stmt)) + + if batchnum > 1: + stats = ( + f"insertmanyvalues batch {batchnum} " + f"of {totalbatches}" + ) + + if not self.engine.hide_parameters: + self._log_info( + "[%s] %r", + stats, + sql_util._repr_params( + sub_params, + batches=10, + ismulti=False, + ), + ) + else: + self._log_info( + "[%s] [SQL parameters hidden due to " + "hide_parameters=True]", + stats, + ) + + try: + for fn in do_execute_dispatch: + if fn( + cursor, + sub_stmt, + sub_params, + context, + ): + break + else: + dialect.do_execute(cursor, sub_stmt, sub_params, context) + + except BaseException as e: + self._handle_dbapi_exception( + e, + sql_util._long_statement(sub_stmt), + sub_params, + cursor, + context, + is_sub_exec=True, + ) + + if engine_events: + self.dispatch.after_cursor_execute( + self, + cursor, + str_statement, + effective_parameters, + context, + context.executemany, + ) + + try: + context.post_exec() + + result = context._setup_result_proxy() + + except BaseException as e: + self._handle_dbapi_exception( + e, str_statement, effective_parameters, cursor, context + ) + + return result + def _cursor_execute( self, cursor: DBAPICursor, @@ -1983,6 +2178,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): parameters: Optional[_AnyExecuteParams], cursor: Optional[DBAPICursor], context: Optional[ExecutionContext], + is_sub_exec: bool = False, ) -> NoReturn: exc_info = sys.exc_info() @@ -2001,6 +2197,11 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): invalidate_pool_on_disconnect = not is_exit_exception + ismulti: bool = ( + not is_sub_exec and context.executemany + if context is not None + else False + ) if self._reentrant_error: raise exc.DBAPIError.instance( statement, @@ -2009,7 +2210,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): self.dialect.loaded_dbapi.Error, hide_parameters=self.engine.hide_parameters, dialect=self.dialect, - ismulti=context.executemany if context is not None else None, + ismulti=ismulti, ).with_traceback(exc_info[2]) from e self._reentrant_error = True try: @@ -2030,9 +2231,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): hide_parameters=self.engine.hide_parameters, connection_invalidated=self._is_disconnect, dialect=self.dialect, - ismulti=context.executemany - if context is not None - else None, + ismulti=ismulti, ) else: sqlalchemy_exception = None |