summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/engine/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/engine/base.py')
-rw-r--r--lib/sqlalchemy/engine/base.py217
1 files changed, 208 insertions, 9 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index 2b9cf602a..1b07acab5 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -12,6 +12,7 @@ import typing
from typing import Any
from typing import Callable
from typing import cast
+from typing import Iterable
from typing import Iterator
from typing import List
from typing import Mapping
@@ -29,6 +30,7 @@ from .interfaces import BindTyping
from .interfaces import ConnectionEventsTarget
from .interfaces import DBAPICursor
from .interfaces import ExceptionContext
+from .interfaces import ExecuteStyle
from .interfaces import ExecutionContext
from .util import _distill_params_20
from .util import _distill_raw_params
@@ -438,6 +440,20 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
:ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
describing the ORM version of ``yield_per``
+ :param insertmanyvalues_page_size: number of rows to format into an
+ INSERT statement when the statement uses "insertmanyvalues" mode,
+ which is a paged form of bulk insert that is used for many backends
+ when using :term:`executemany` execution typically in conjunction
+ with RETURNING. Defaults to 1000. May also be modified on a
+ per-engine basis using the
+ :paramref:`_sa.create_engine.insertmanyvalues_page_size` parameter.
+
+ .. versionadded:: 2.0
+
+ .. seealso::
+
+ :ref:`engine_insertmanyvalues`
+
:param schema_translate_map: Available on: :class:`_engine.Connection`,
:class:`_engine.Engine`, :class:`_sql.Executable`.
@@ -1795,8 +1811,39 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
context.pre_exec()
+ if context.execute_style is ExecuteStyle.INSERTMANYVALUES:
+ return self._exec_insertmany_context(
+ dialect,
+ context,
+ )
+ else:
+ return self._exec_single_context(
+ dialect, context, statement, parameters
+ )
+
+ def _exec_single_context(
+ self,
+ dialect: Dialect,
+ context: ExecutionContext,
+ statement: Union[str, Compiled],
+ parameters: Optional[_AnyMultiExecuteParams],
+ ) -> CursorResult[Any]:
+ """continue the _execute_context() method for a single DBAPI
+ cursor.execute() or cursor.executemany() call.
+
+ """
if dialect.bind_typing is BindTyping.SETINPUTSIZES:
- context._set_input_sizes()
+ generic_setinputsizes = context._prepare_set_input_sizes()
+
+ if generic_setinputsizes:
+ try:
+ dialect.do_set_input_sizes(
+ context.cursor, generic_setinputsizes, context
+ )
+ except BaseException as e:
+ self._handle_dbapi_exception(
+ e, str(statement), parameters, None, context
+ )
cursor, str_statement, parameters = (
context.cursor,
@@ -1840,13 +1887,13 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
)
else:
self._log_info(
- "[%s] [SQL parameters hidden due to hide_parameters=True]"
- % (stats,)
+ "[%s] [SQL parameters hidden due to hide_parameters=True]",
+ stats,
)
evt_handled: bool = False
try:
- if context.executemany:
+ if context.execute_style is ExecuteStyle.EXECUTEMANY:
effective_parameters = cast(
"_CoreMultiExecuteParams", effective_parameters
)
@@ -1862,7 +1909,10 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
break
if not evt_handled:
self.dialect.do_executemany(
- cursor, str_statement, effective_parameters, context
+ cursor,
+ str_statement,
+ effective_parameters,
+ context,
)
elif not effective_parameters and context.no_parameters:
if self.dialect._has_events:
@@ -1914,6 +1964,151 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
return result
+ def _exec_insertmany_context(
+ self,
+ dialect: Dialect,
+ context: ExecutionContext,
+ ) -> CursorResult[Any]:
+ """continue the _execute_context() method for an "insertmanyvalues"
+ operation, which will invoke DBAPI
+ cursor.execute() one or more times with individual log and
+ event hook calls.
+
+ """
+
+ if dialect.bind_typing is BindTyping.SETINPUTSIZES:
+ generic_setinputsizes = context._prepare_set_input_sizes()
+ else:
+ generic_setinputsizes = None
+
+ cursor, str_statement, parameters = (
+ context.cursor,
+ context.statement,
+ context.parameters,
+ )
+
+ effective_parameters = parameters
+
+ engine_events = self._has_events or self.engine._has_events
+ if self.dialect._has_events:
+ do_execute_dispatch: Iterable[
+ Any
+ ] = self.dialect.dispatch.do_execute
+ else:
+ do_execute_dispatch = ()
+
+ if self._echo:
+ stats = context._get_cache_stats() + " (insertmanyvalues)"
+ for (
+ sub_stmt,
+ sub_params,
+ setinputsizes,
+ batchnum,
+ totalbatches,
+ ) in dialect._deliver_insertmanyvalues_batches(
+ cursor,
+ str_statement,
+ effective_parameters,
+ generic_setinputsizes,
+ context,
+ ):
+
+ if setinputsizes:
+ try:
+ dialect.do_set_input_sizes(
+ context.cursor, setinputsizes, context
+ )
+ except BaseException as e:
+ self._handle_dbapi_exception(
+ e,
+ sql_util._long_statement(sub_stmt),
+ sub_params,
+ None,
+ context,
+ )
+
+ if engine_events:
+ for fn in self.dispatch.before_cursor_execute:
+ sub_stmt, sub_params = fn(
+ self,
+ cursor,
+ sub_stmt,
+ sub_params,
+ context,
+ True,
+ )
+
+ if self._echo:
+
+ self._log_info(sql_util._long_statement(sub_stmt))
+
+ if batchnum > 1:
+ stats = (
+ f"insertmanyvalues batch {batchnum} "
+ f"of {totalbatches}"
+ )
+
+ if not self.engine.hide_parameters:
+ self._log_info(
+ "[%s] %r",
+ stats,
+ sql_util._repr_params(
+ sub_params,
+ batches=10,
+ ismulti=False,
+ ),
+ )
+ else:
+ self._log_info(
+ "[%s] [SQL parameters hidden due to "
+ "hide_parameters=True]",
+ stats,
+ )
+
+ try:
+ for fn in do_execute_dispatch:
+ if fn(
+ cursor,
+ sub_stmt,
+ sub_params,
+ context,
+ ):
+ break
+ else:
+ dialect.do_execute(cursor, sub_stmt, sub_params, context)
+
+ except BaseException as e:
+ self._handle_dbapi_exception(
+ e,
+ sql_util._long_statement(sub_stmt),
+ sub_params,
+ cursor,
+ context,
+ is_sub_exec=True,
+ )
+
+ if engine_events:
+ self.dispatch.after_cursor_execute(
+ self,
+ cursor,
+ str_statement,
+ effective_parameters,
+ context,
+ context.executemany,
+ )
+
+ try:
+ context.post_exec()
+
+ result = context._setup_result_proxy()
+
+ except BaseException as e:
+ self._handle_dbapi_exception(
+ e, str_statement, effective_parameters, cursor, context
+ )
+
+ return result
+
def _cursor_execute(
self,
cursor: DBAPICursor,
@@ -1983,6 +2178,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
parameters: Optional[_AnyExecuteParams],
cursor: Optional[DBAPICursor],
context: Optional[ExecutionContext],
+ is_sub_exec: bool = False,
) -> NoReturn:
exc_info = sys.exc_info()
@@ -2001,6 +2197,11 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
invalidate_pool_on_disconnect = not is_exit_exception
+ ismulti: bool = (
+ not is_sub_exec and context.executemany
+ if context is not None
+ else False
+ )
if self._reentrant_error:
raise exc.DBAPIError.instance(
statement,
@@ -2009,7 +2210,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
self.dialect.loaded_dbapi.Error,
hide_parameters=self.engine.hide_parameters,
dialect=self.dialect,
- ismulti=context.executemany if context is not None else None,
+ ismulti=ismulti,
).with_traceback(exc_info[2]) from e
self._reentrant_error = True
try:
@@ -2030,9 +2231,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
hide_parameters=self.engine.hide_parameters,
connection_invalidated=self._is_disconnect,
dialect=self.dialect,
- ismulti=context.executemany
- if context is not None
- else None,
+ ismulti=ismulti,
)
else:
sqlalchemy_exception = None