summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/engine/base.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2022-07-18 15:08:37 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2022-09-24 11:15:32 -0400
commit2bcc97da424eef7db9a5d02f81d02344925415ee (patch)
tree13d4f04bc7dd40a0207f86aa2fc3a3b49e065674 /lib/sqlalchemy/engine/base.py
parent332188e5680574368001ded52eb0a9d259ecdef5 (diff)
downloadsqlalchemy-2bcc97da424eef7db9a5d02f81d02344925415ee.tar.gz
implement batched INSERT..VALUES () () for executemany
the feature is enabled for all built in backends when RETURNING is used, except for Oracle that doesn't need it, and on psycopg2 and mssql+pyodbc it is used for all INSERT statements, not just those that use RETURNING. third party dialects would need to opt in to the new feature by setting use_insertmanyvalues to True. Also adds dialect-level guards against using returning with executemany where we dont have an implementation to suit it. execute single w/ returning still defers to the server without us checking. Fixes: #6047 Fixes: #7907 Change-Id: I3936d3c00003f02e322f2e43fb949d0e6e568304
Diffstat (limited to 'lib/sqlalchemy/engine/base.py')
-rw-r--r--lib/sqlalchemy/engine/base.py217
1 files changed, 208 insertions, 9 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index 2b9cf602a..1b07acab5 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -12,6 +12,7 @@ import typing
from typing import Any
from typing import Callable
from typing import cast
+from typing import Iterable
from typing import Iterator
from typing import List
from typing import Mapping
@@ -29,6 +30,7 @@ from .interfaces import BindTyping
from .interfaces import ConnectionEventsTarget
from .interfaces import DBAPICursor
from .interfaces import ExceptionContext
+from .interfaces import ExecuteStyle
from .interfaces import ExecutionContext
from .util import _distill_params_20
from .util import _distill_raw_params
@@ -438,6 +440,20 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
:ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
describing the ORM version of ``yield_per``
+ :param insertmanyvalues_page_size: number of rows to format into an
+ INSERT statement when the statement uses "insertmanyvalues" mode,
+ which is a paged form of bulk insert that is used for many backends
+ when using :term:`executemany` execution typically in conjunction
+ with RETURNING. Defaults to 1000. May also be modified on a
+ per-engine basis using the
+ :paramref:`_sa.create_engine.insertmanyvalues_page_size` parameter.
+
+ .. versionadded:: 2.0
+
+ .. seealso::
+
+ :ref:`engine_insertmanyvalues`
+
:param schema_translate_map: Available on: :class:`_engine.Connection`,
:class:`_engine.Engine`, :class:`_sql.Executable`.
@@ -1795,8 +1811,39 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
context.pre_exec()
+ if context.execute_style is ExecuteStyle.INSERTMANYVALUES:
+ return self._exec_insertmany_context(
+ dialect,
+ context,
+ )
+ else:
+ return self._exec_single_context(
+ dialect, context, statement, parameters
+ )
+
+ def _exec_single_context(
+ self,
+ dialect: Dialect,
+ context: ExecutionContext,
+ statement: Union[str, Compiled],
+ parameters: Optional[_AnyMultiExecuteParams],
+ ) -> CursorResult[Any]:
+ """continue the _execute_context() method for a single DBAPI
+ cursor.execute() or cursor.executemany() call.
+
+ """
if dialect.bind_typing is BindTyping.SETINPUTSIZES:
- context._set_input_sizes()
+ generic_setinputsizes = context._prepare_set_input_sizes()
+
+ if generic_setinputsizes:
+ try:
+ dialect.do_set_input_sizes(
+ context.cursor, generic_setinputsizes, context
+ )
+ except BaseException as e:
+ self._handle_dbapi_exception(
+ e, str(statement), parameters, None, context
+ )
cursor, str_statement, parameters = (
context.cursor,
@@ -1840,13 +1887,13 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
)
else:
self._log_info(
- "[%s] [SQL parameters hidden due to hide_parameters=True]"
- % (stats,)
+ "[%s] [SQL parameters hidden due to hide_parameters=True]",
+ stats,
)
evt_handled: bool = False
try:
- if context.executemany:
+ if context.execute_style is ExecuteStyle.EXECUTEMANY:
effective_parameters = cast(
"_CoreMultiExecuteParams", effective_parameters
)
@@ -1862,7 +1909,10 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
break
if not evt_handled:
self.dialect.do_executemany(
- cursor, str_statement, effective_parameters, context
+ cursor,
+ str_statement,
+ effective_parameters,
+ context,
)
elif not effective_parameters and context.no_parameters:
if self.dialect._has_events:
@@ -1914,6 +1964,151 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
return result
+ def _exec_insertmany_context(
+ self,
+ dialect: Dialect,
+ context: ExecutionContext,
+ ) -> CursorResult[Any]:
+ """continue the _execute_context() method for an "insertmanyvalues"
+ operation, which will invoke DBAPI
+ cursor.execute() one or more times with individual log and
+ event hook calls.
+
+ """
+
+ if dialect.bind_typing is BindTyping.SETINPUTSIZES:
+ generic_setinputsizes = context._prepare_set_input_sizes()
+ else:
+ generic_setinputsizes = None
+
+ cursor, str_statement, parameters = (
+ context.cursor,
+ context.statement,
+ context.parameters,
+ )
+
+ effective_parameters = parameters
+
+ engine_events = self._has_events or self.engine._has_events
+ if self.dialect._has_events:
+ do_execute_dispatch: Iterable[
+ Any
+ ] = self.dialect.dispatch.do_execute
+ else:
+ do_execute_dispatch = ()
+
+ if self._echo:
+ stats = context._get_cache_stats() + " (insertmanyvalues)"
+ for (
+ sub_stmt,
+ sub_params,
+ setinputsizes,
+ batchnum,
+ totalbatches,
+ ) in dialect._deliver_insertmanyvalues_batches(
+ cursor,
+ str_statement,
+ effective_parameters,
+ generic_setinputsizes,
+ context,
+ ):
+
+ if setinputsizes:
+ try:
+ dialect.do_set_input_sizes(
+ context.cursor, setinputsizes, context
+ )
+ except BaseException as e:
+ self._handle_dbapi_exception(
+ e,
+ sql_util._long_statement(sub_stmt),
+ sub_params,
+ None,
+ context,
+ )
+
+ if engine_events:
+ for fn in self.dispatch.before_cursor_execute:
+ sub_stmt, sub_params = fn(
+ self,
+ cursor,
+ sub_stmt,
+ sub_params,
+ context,
+ True,
+ )
+
+ if self._echo:
+
+ self._log_info(sql_util._long_statement(sub_stmt))
+
+ if batchnum > 1:
+ stats = (
+ f"insertmanyvalues batch {batchnum} "
+ f"of {totalbatches}"
+ )
+
+ if not self.engine.hide_parameters:
+ self._log_info(
+ "[%s] %r",
+ stats,
+ sql_util._repr_params(
+ sub_params,
+ batches=10,
+ ismulti=False,
+ ),
+ )
+ else:
+ self._log_info(
+ "[%s] [SQL parameters hidden due to "
+ "hide_parameters=True]",
+ stats,
+ )
+
+ try:
+ for fn in do_execute_dispatch:
+ if fn(
+ cursor,
+ sub_stmt,
+ sub_params,
+ context,
+ ):
+ break
+ else:
+ dialect.do_execute(cursor, sub_stmt, sub_params, context)
+
+ except BaseException as e:
+ self._handle_dbapi_exception(
+ e,
+ sql_util._long_statement(sub_stmt),
+ sub_params,
+ cursor,
+ context,
+ is_sub_exec=True,
+ )
+
+ if engine_events:
+ self.dispatch.after_cursor_execute(
+ self,
+ cursor,
+ str_statement,
+ effective_parameters,
+ context,
+ context.executemany,
+ )
+
+ try:
+ context.post_exec()
+
+ result = context._setup_result_proxy()
+
+ except BaseException as e:
+ self._handle_dbapi_exception(
+ e, str_statement, effective_parameters, cursor, context
+ )
+
+ return result
+
def _cursor_execute(
self,
cursor: DBAPICursor,
@@ -1983,6 +2178,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
parameters: Optional[_AnyExecuteParams],
cursor: Optional[DBAPICursor],
context: Optional[ExecutionContext],
+ is_sub_exec: bool = False,
) -> NoReturn:
exc_info = sys.exc_info()
@@ -2001,6 +2197,11 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
invalidate_pool_on_disconnect = not is_exit_exception
+ ismulti: bool = (
+ not is_sub_exec and context.executemany
+ if context is not None
+ else False
+ )
if self._reentrant_error:
raise exc.DBAPIError.instance(
statement,
@@ -2009,7 +2210,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
self.dialect.loaded_dbapi.Error,
hide_parameters=self.engine.hide_parameters,
dialect=self.dialect,
- ismulti=context.executemany if context is not None else None,
+ ismulti=ismulti,
).with_traceback(exc_info[2]) from e
self._reentrant_error = True
try:
@@ -2030,9 +2231,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
hide_parameters=self.engine.hide_parameters,
connection_invalidated=self._is_disconnect,
dialect=self.dialect,
- ismulti=context.executemany
- if context is not None
- else None,
+ ismulti=ismulti,
)
else:
sqlalchemy_exception = None