From f1a3038f480ee1965928cdcd1dc0c47347f270bc Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Tue, 23 Jun 2020 16:21:04 -0400 Subject: Default psycopg2 executemany mode to "values_only" The psycopg2 dialect now defaults to using the very performant ``execute_values()`` psycopg2 extension for compiled INSERT statements, and also impements RETURNING support when this extension is used. This allows INSERT statements that even include an autoincremented SERIAL or IDENTITY value to run very fast while still being able to return the newly generated primary key values. The ORM will then integrate this new feature in a separate change. Implements RETURNING for insert with executemany Adds support to return_defaults() mode and inserted_primary_key to support mutiple INSERTed rows, via return_defauls_rows and inserted_primary_key_rows accessors. within default execution context, new cached compiler getters are used to fetch primary keys from rows inserted_primary_key now returns a plain tuple. this is not yet a row-like object however this can be added. Adds distinct "values_only" and "batch" modes, as "values" has a lot of benefits but "batch" breaks cursor.rowcount psycopg2 minimum version 2.7 so we can remove the large number of checks for very old versions of psycopg2 simplify tests to no longer distinguish between native and non-native json Fixes: #5401 Change-Id: Ic08fd3423d4c5d16ca50994460c0c234868bd61c --- lib/sqlalchemy/dialects/postgresql/psycopg2.py | 251 +++++++++++++------------ 1 file changed, 128 insertions(+), 123 deletions(-) (limited to 'lib/sqlalchemy/dialects/postgresql/psycopg2.py') diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2.py b/lib/sqlalchemy/dialects/postgresql/psycopg2.py index a9408bcb0..6364838a6 100644 --- a/lib/sqlalchemy/dialects/postgresql/psycopg2.py +++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py @@ -61,9 +61,6 @@ psycopg2-specific keyword arguments which are accepted by :ref:`psycopg2_executemany_mode` -* ``use_batch_mode``: this is the previous setting used to affect "executemany" - mode and is now deprecated. - Unix Domain Connections ------------------------ @@ -155,66 +152,82 @@ Modern versions of psycopg2 include a feature known as `_, which have been shown in benchmarking to improve psycopg2's executemany() performance, primarily with INSERT statements, by multiple orders of magnitude. -SQLAlchemy allows this extension to be used for all ``executemany()`` style -calls invoked by an :class:`_engine.Engine` -when used with :ref:`multiple parameter -sets `, which includes the use of this feature both by the -Core as well as by the ORM for inserts of objects with non-autogenerated -primary key values, by adding the ``executemany_mode`` flag to -:func:`_sa.create_engine`:: +SQLAlchemy internally makes use of these extensions for ``executemany()`` style +calls, which correspond to lists of parameters being passed to +:meth:`_engine.Connection.execute` as detailed in :ref:`multiple parameter +sets `. The ORM also uses this mode internally whenever +possible. + +The two available extensions on the psycopg2 side are the ``execute_values()`` +and ``execute_batch()`` functions. The psycopg2 dialect defaults to using the +``execute_values()`` extension for all qualifying INSERT statements. + +.. versionchanged:: 1.4 The psycopg2 dialect now defaults to a new mode + ``"values_only"`` for ``executemany_mode``, which allows an order of + magnitude performance improvement for INSERT statements, but does not + include "batch" mode for UPDATE and DELETE statements which removes the + ability of ``cursor.rowcount`` to function correctly. + +The use of these extensions is controlled by the ``executemany_mode`` flag +which may be passed to :func:`_sa.create_engine`:: engine = create_engine( "postgresql+psycopg2://scott:tiger@host/dbname", - executemany_mode='batch') - + executemany_mode='values_plus_batch') -.. versionchanged:: 1.3.7 - the ``use_batch_mode`` flag has been superseded - by a new parameter ``executemany_mode`` which provides support both for - psycopg2's ``execute_batch`` helper as well as the ``execute_values`` - helper. Possible options for ``executemany_mode`` include: -* ``None`` - By default, psycopg2's extensions are not used, and the usual - ``cursor.executemany()`` method is used when invoking batches of statements. +* ``values_only`` - this is the default value. the psycopg2 execute_values() + extension is used for qualifying INSERT statements, which rewrites the INSERT + to include multiple VALUES clauses so that many parameter sets can be + inserted with one statement. -* ``'batch'`` - Uses ``psycopg2.extras.execute_batch`` so that multiple copies + .. versionadded:: 1.4 Added ``"values_only"`` setting for ``executemany_mode`` + which is also now the default. + +* ``None`` - No psycopg2 extensions are not used, and the usual + ``cursor.executemany()`` method is used when invoking statements with + multiple parameter sets. + +* ``'batch'`` - Uses ``psycopg2.extras.execute_batch`` for all qualifying + INSERT, UPDATE and DELETE statements, so that multiple copies of a SQL query, each one corresponding to a parameter set passed to ``executemany()``, are joined into a single SQL string separated by a - semicolon. This is the same behavior as was provided by the - ``use_batch_mode=True`` flag. - -* ``'values'``- For Core :func:`_expression.insert` - constructs only (including those - emitted by the ORM automatically), the ``psycopg2.extras.execute_values`` - extension is used so that multiple parameter sets are grouped into a single - INSERT statement and joined together with multiple VALUES expressions. This - method requires that the string text of the VALUES clause inside the - INSERT statement is manipulated, so is only supported with a compiled - :func:`_expression.insert` construct where the format is predictable. - For all other - constructs, including plain textual INSERT statements not rendered by the - SQLAlchemy expression language compiler, the - ``psycopg2.extras.execute_batch`` method is used. It is therefore important - to note that **"values" mode implies that "batch" mode is also used for - all statements for which "values" mode does not apply**. - -For both strategies, the ``executemany_batch_page_size`` and -``executemany_values_page_size`` arguments control how many parameter sets -should be represented in each execution. Because "values" mode implies a -fallback down to "batch" mode for non-INSERT statements, there are two -independent page size arguments. For each, the default value of ``None`` means -to use psycopg2's defaults, which at the time of this writing are quite low at -100. For the ``execute_values`` method, a number as high as 10000 may prove -to be performant, whereas for ``execute_batch``, as the number represents -full statements repeated, a number closer to the default of 100 is likely -more appropriate:: + semicolon. When using this mode, the :attr:`_engine.CursorResult.rowcount` + attribute will not contain a value for executemany-style executions. + +* ``'values_plus_batch'``- ``execute_values`` is used for qualifying INSERT + statements, ``execute_batch`` is used for UPDATE and DELETE. + When using this mode, the :attr:`_engine.CursorResult.rowcount` + attribute will not contain a value for executemany-style executions against + UPDATE and DELETE statements. + +By "qualifying statements", we mean that the statement being executed +must be a Core :func:`_expression.insert`, :func:`_expression.update` +or :func:`_expression.delete` construct, and not a plain textual SQL +string or one constructed using :func:`_expression.text`. When using the +ORM, all insert/update/delete statements used by the ORM flush process +are qualifying. + +The "page size" for the "values" and "batch" strategies can be affected +by using the ``executemany_batch_page_size`` and +``executemany_values_page_size`` engine parameters. These +control how many parameter sets +should be represented in each execution. The "values" page size defaults +to 1000, which is different that psycopg2's default. The "batch" page +size defaults to 100. These can be affected by passing new values to +:func:`_engine.create_engine`:: engine = create_engine( "postgresql+psycopg2://scott:tiger@host/dbname", executemany_mode='values', executemany_values_page_size=10000, executemany_batch_page_size=500) +.. versionchanged:: 1.4 + + The default for ``executemany_values_page_size`` is now 1000, up from + 100. .. seealso:: @@ -223,10 +236,6 @@ more appropriate:: object to execute statements in such a way as to make use of the DBAPI ``.executemany()`` method. -.. versionchanged:: 1.3.7 - Added support for - ``psycopg2.extras.execute_values``. The ``use_batch_mode`` flag is - superseded by the ``executemany_mode`` flag. - .. _psycopg2_unicode: @@ -474,6 +483,7 @@ from ... import exc from ... import processors from ... import types as sqltypes from ... import util +from ...engine import cursor as _cursor from ...util import collections_abc try: @@ -546,18 +556,12 @@ class _PGHStore(HSTORE): class _PGJSON(JSON): def result_processor(self, dialect, coltype): - if dialect._has_native_json: - return None - else: - return super(_PGJSON, self).result_processor(dialect, coltype) + return None class _PGJSONB(JSONB): def result_processor(self, dialect, coltype): - if dialect._has_native_jsonb: - return None - else: - return super(_PGJSONB, self).result_processor(dialect, coltype) + return None class _PGUUID(UUID): @@ -586,6 +590,8 @@ _server_side_id = util.counter() class PGExecutionContext_psycopg2(PGExecutionContext): + _psycopg2_fetched_rows = None + def create_server_side_cursor(self): # use server-side cursors: # http://lists.initd.org/pipermail/psycopg/2007-January/005251.html @@ -593,6 +599,22 @@ class PGExecutionContext_psycopg2(PGExecutionContext): return self._dbapi_connection.cursor(ident) def post_exec(self): + if ( + self._psycopg2_fetched_rows + and self.compiled + and self.compiled.returning + ): + # psycopg2 execute_values will provide for a real cursor where + # cursor.description works correctly. however, it executes the + # INSERT statement multiple times for multiple pages of rows, so + # while this cursor also supports calling .fetchall() directly, in + # order to get the list of all rows inserted across multiple pages, + # we have to retrieve the aggregated list from the execute_values() + # function directly. + strat_cls = _cursor.FullyBufferedCursorFetchStrategy + self.cursor_fetch_strategy = strat_cls( + self.cursor, initial_buffer=self._psycopg2_fetched_rows + ) self._log_notices(self.cursor) def _log_notices(self, cursor): @@ -621,9 +643,13 @@ class PGIdentifierPreparer_psycopg2(PGIdentifierPreparer): pass -EXECUTEMANY_DEFAULT = util.symbol("executemany_default") -EXECUTEMANY_BATCH = util.symbol("executemany_batch") -EXECUTEMANY_VALUES = util.symbol("executemany_values") +EXECUTEMANY_DEFAULT = util.symbol("executemany_default", canonical=0) +EXECUTEMANY_BATCH = util.symbol("executemany_batch", canonical=1) +EXECUTEMANY_VALUES = util.symbol("executemany_values", canonical=2) +EXECUTEMANY_VALUES_PLUS_BATCH = util.symbol( + "executemany_values_plus_batch", + canonical=EXECUTEMANY_BATCH | EXECUTEMANY_VALUES, +) class PGDialect_psycopg2(PGDialect): @@ -641,17 +667,7 @@ class PGDialect_psycopg2(PGDialect): preparer = PGIdentifierPreparer_psycopg2 psycopg2_version = (0, 0) - FEATURE_VERSION_MAP = dict( - native_json=(2, 5), - native_jsonb=(2, 5, 4), - sane_multi_rowcount=(2, 0, 9), - array_oid=(2, 4, 3), - hstore_adapter=(2, 4), - ) - - _has_native_hstore = False - _has_native_json = False - _has_native_jsonb = False + _has_native_hstore = True engine_config_types = PGDialect.engine_config_types.union( {"use_native_unicode": util.asbool} @@ -671,13 +687,6 @@ class PGDialect_psycopg2(PGDialect): }, ) - @util.deprecated_params( - use_batch_mode=( - "1.3.7", - "The psycopg2 use_batch_mode flag is superseded by " - "executemany_mode='batch'", - ) - ) def __init__( self, server_side_cursors=False, @@ -685,15 +694,16 @@ class PGDialect_psycopg2(PGDialect): client_encoding=None, use_native_hstore=True, use_native_uuid=True, - executemany_mode=None, - executemany_batch_page_size=None, - executemany_values_page_size=None, - use_batch_mode=None, + executemany_mode="values_only", + executemany_batch_page_size=100, + executemany_values_page_size=1000, **kwargs ): PGDialect.__init__(self, **kwargs) self.server_side_cursors = server_side_cursors self.use_native_unicode = use_native_unicode + if not use_native_hstore: + self._has_native_hstore = False self.use_native_hstore = use_native_hstore self.use_native_uuid = use_native_uuid self.supports_unicode_binds = use_native_unicode @@ -706,12 +716,14 @@ class PGDialect_psycopg2(PGDialect): { EXECUTEMANY_DEFAULT: [None], EXECUTEMANY_BATCH: ["batch"], - EXECUTEMANY_VALUES: ["values"], + EXECUTEMANY_VALUES: ["values_only"], + EXECUTEMANY_VALUES_PLUS_BATCH: ["values_plus_batch", "values"], }, "executemany_mode", ) - if use_batch_mode: - self.executemany_mode = EXECUTEMANY_BATCH + + if self.executemany_mode & EXECUTEMANY_VALUES: + self.insert_executemany_returning = True self.executemany_batch_page_size = executemany_batch_page_size self.executemany_values_page_size = executemany_values_page_size @@ -723,24 +735,21 @@ class PGDialect_psycopg2(PGDialect): int(x) for x in m.group(1, 2, 3) if x is not None ) + if self.psycopg2_version < (2, 7): + raise ImportError( + "psycopg2 version 2.7 or higher is required." + ) + def initialize(self, connection): super(PGDialect_psycopg2, self).initialize(connection) self._has_native_hstore = ( self.use_native_hstore and self._hstore_oids(connection.connection) is not None ) - self._has_native_json = ( - self.psycopg2_version >= self.FEATURE_VERSION_MAP["native_json"] - ) - self._has_native_jsonb = ( - self.psycopg2_version >= self.FEATURE_VERSION_MAP["native_jsonb"] - ) # http://initd.org/psycopg/docs/news.html#what-s-new-in-psycopg-2-0-9 - self.supports_sane_multi_rowcount = ( - self.psycopg2_version - >= self.FEATURE_VERSION_MAP["sane_multi_rowcount"] - and self.executemany_mode is EXECUTEMANY_DEFAULT + self.supports_sane_multi_rowcount = not ( + self.executemany_mode & EXECUTEMANY_BATCH ) @classmethod @@ -830,11 +839,7 @@ class PGDialect_psycopg2(PGDialect): kw = {"oid": oid} if util.py2k: kw["unicode"] = True - if ( - self.psycopg2_version - >= self.FEATURE_VERSION_MAP["array_oid"] - ): - kw["array_oid"] = array_oid + kw["array_oid"] = array_oid extras.register_hstore(conn, **kw) fns.append(on_connect) @@ -842,14 +847,12 @@ class PGDialect_psycopg2(PGDialect): if self.dbapi and self._json_deserializer: def on_connect(conn): - if self._has_native_json: - extras.register_default_json( - conn, loads=self._json_deserializer - ) - if self._has_native_jsonb: - extras.register_default_jsonb( - conn, loads=self._json_deserializer - ) + extras.register_default_json( + conn, loads=self._json_deserializer + ) + extras.register_default_jsonb( + conn, loads=self._json_deserializer + ) fns.append(on_connect) @@ -864,12 +867,8 @@ class PGDialect_psycopg2(PGDialect): return None def do_executemany(self, cursor, statement, parameters, context=None): - if self.executemany_mode is EXECUTEMANY_DEFAULT: - cursor.executemany(statement, parameters) - return - if ( - self.executemany_mode is EXECUTEMANY_VALUES + self.executemany_mode & EXECUTEMANY_VALUES and context and context.isinsert and context.compiled.insert_single_values_expr @@ -893,15 +892,17 @@ class PGDialect_psycopg2(PGDialect): kwargs = {"page_size": self.executemany_values_page_size} else: kwargs = {} - self._psycopg2_extras().execute_values( + xtras = self._psycopg2_extras() + context._psycopg2_fetched_rows = xtras.execute_values( cursor, statement, parameters, template=executemany_values, + fetch=bool(context.compiled.returning), **kwargs ) - else: + elif self.executemany_mode & EXECUTEMANY_BATCH: if self.executemany_batch_page_size: kwargs = {"page_size": self.executemany_batch_page_size} else: @@ -909,15 +910,19 @@ class PGDialect_psycopg2(PGDialect): self._psycopg2_extras().execute_batch( cursor, statement, parameters, **kwargs ) + else: + cursor.executemany(statement, parameters) @util.memoized_instancemethod def _hstore_oids(self, conn): - if self.psycopg2_version >= self.FEATURE_VERSION_MAP["hstore_adapter"]: - extras = self._psycopg2_extras() - oids = extras.HstoreAdapter.get_oids(conn) - if oids is not None and oids[0]: - return oids[0:2] - return None + extras = self._psycopg2_extras() + if hasattr(conn, "connection"): + conn = conn.connection + oids = extras.HstoreAdapter.get_oids(conn) + if oids is not None and oids[0]: + return oids[0:2] + else: + return None def create_connect_args(self, url): opts = url.translate_connect_args(username="user") -- cgit v1.2.1