Diffstat (limited to 'lib/sqlalchemy/dialects/postgresql/psycopg2.py')
-rw-r--r-- | lib/sqlalchemy/dialects/postgresql/psycopg2.py | 251
1 file changed, 128 insertions(+), 123 deletions(-)
diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2.py b/lib/sqlalchemy/dialects/postgresql/psycopg2.py
index a9408bcb0..6364838a6 100644
--- a/lib/sqlalchemy/dialects/postgresql/psycopg2.py
+++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py
@@ -61,9 +61,6 @@ psycopg2-specific keyword arguments which are accepted by
 
       :ref:`psycopg2_executemany_mode`
 
-* ``use_batch_mode``: this is the previous setting used to affect "executemany"
-  mode and is now deprecated.
-
 
 Unix Domain Connections
 ------------------------
@@ -155,66 +152,82 @@ Modern versions of psycopg2 include a feature known as
 <http://initd.org/psycopg/docs/extras.html#fast-execution-helpers>`_, which
 have been shown in benchmarking to improve psycopg2's executemany()
 performance, primarily with INSERT statements, by multiple orders of magnitude.
-SQLAlchemy allows this extension to be used for all ``executemany()`` style
-calls invoked by an :class:`_engine.Engine`
-when used with :ref:`multiple parameter
-sets <execute_multiple>`, which includes the use of this feature both by the
-Core as well as by the ORM for inserts of objects with non-autogenerated
-primary key values, by adding the ``executemany_mode`` flag to
-:func:`_sa.create_engine`::
+SQLAlchemy internally makes use of these extensions for ``executemany()`` style
+calls, which correspond to lists of parameters being passed to
+:meth:`_engine.Connection.execute` as detailed in :ref:`multiple parameter
+sets <execute_multiple>`.  The ORM also uses this mode internally whenever
+possible.
+
+The two available extensions on the psycopg2 side are the ``execute_values()``
+and ``execute_batch()`` functions.  The psycopg2 dialect defaults to using the
+``execute_values()`` extension for all qualifying INSERT statements.
+
+.. versionchanged:: 1.4 The psycopg2 dialect now defaults to a new mode
+   ``"values_only"`` for ``executemany_mode``, which allows an order of
+   magnitude performance improvement for INSERT statements, but does not
+   include "batch" mode for UPDATE and DELETE statements which removes the
+   ability of ``cursor.rowcount`` to function correctly.
+
+The use of these extensions is controlled by the ``executemany_mode`` flag
+which may be passed to :func:`_sa.create_engine`::
 
     engine = create_engine(
         "postgresql+psycopg2://scott:tiger@host/dbname",
-        executemany_mode='batch')
-
+        executemany_mode='values_plus_batch')
 
-.. versionchanged:: 1.3.7 - the ``use_batch_mode`` flag has been superseded
-   by a new parameter ``executemany_mode`` which provides support both for
-   psycopg2's ``execute_batch`` helper as well as the ``execute_values``
-   helper.
 
 Possible options for ``executemany_mode`` include:
 
-* ``None`` - By default, psycopg2's extensions are not used, and the usual
-  ``cursor.executemany()`` method is used when invoking batches of statements.
+* ``values_only`` - this is the default value.  The psycopg2 execute_values()
+  extension is used for qualifying INSERT statements, which rewrites the INSERT
+  to include multiple VALUES clauses so that many parameter sets can be
+  inserted with one statement.
 
-* ``'batch'`` - Uses ``psycopg2.extras.execute_batch`` so that multiple copies
+  .. versionadded:: 1.4 Added ``"values_only"`` setting for ``executemany_mode``
+     which is also now the default.
+
+* ``None`` - No psycopg2 extensions are used, and the usual
+  ``cursor.executemany()`` method is used when invoking statements with
+  multiple parameter sets.
+
+* ``'batch'`` - Uses ``psycopg2.extras.execute_batch`` for all qualifying
+  INSERT, UPDATE and DELETE statements, so that multiple copies
   of a SQL query, each one corresponding to a parameter set passed to
   ``executemany()``, are joined into a single SQL string separated by a
-  semicolon. This is the same behavior as was provided by the
-  ``use_batch_mode=True`` flag.
-
-* ``'values'``- For Core :func:`_expression.insert`
-  constructs only (including those
-  emitted by the ORM automatically), the ``psycopg2.extras.execute_values``
-  extension is used so that multiple parameter sets are grouped into a single
-  INSERT statement and joined together with multiple VALUES expressions. This
-  method requires that the string text of the VALUES clause inside the
-  INSERT statement is manipulated, so is only supported with a compiled
-  :func:`_expression.insert` construct where the format is predictable.
-  For all other
-  constructs, including plain textual INSERT statements not rendered by the
-  SQLAlchemy expression language compiler, the
-  ``psycopg2.extras.execute_batch`` method is used. It is therefore important
-  to note that **"values" mode implies that "batch" mode is also used for
-  all statements for which "values" mode does not apply**.
-
-For both strategies, the ``executemany_batch_page_size`` and
-``executemany_values_page_size`` arguments control how many parameter sets
-should be represented in each execution. Because "values" mode implies a
-fallback down to "batch" mode for non-INSERT statements, there are two
-independent page size arguments. For each, the default value of ``None`` means
-to use psycopg2's defaults, which at the time of this writing are quite low at
-100. For the ``execute_values`` method, a number as high as 10000 may prove
-to be performant, whereas for ``execute_batch``, as the number represents
-full statements repeated, a number closer to the default of 100 is likely
-more appropriate::
+  semicolon.  When using this mode, the :attr:`_engine.CursorResult.rowcount`
+  attribute will not contain a value for executemany-style executions.
+
+* ``'values_plus_batch'`` - ``execute_values`` is used for qualifying INSERT
+  statements, ``execute_batch`` is used for UPDATE and DELETE.
+  When using this mode, the :attr:`_engine.CursorResult.rowcount`
+  attribute will not contain a value for executemany-style executions against
+  UPDATE and DELETE statements.
+
+By "qualifying statements", we mean that the statement being executed
+must be a Core :func:`_expression.insert`, :func:`_expression.update`
+or :func:`_expression.delete` construct, and not a plain textual SQL
+string or one constructed using :func:`_expression.text`.  When using the
+ORM, all insert/update/delete statements used by the ORM flush process
+are qualifying.
+
+The "page size" for the "values" and "batch" strategies can be affected
+by using the ``executemany_batch_page_size`` and
+``executemany_values_page_size`` engine parameters.  These
+control how many parameter sets
+should be represented in each execution.  The "values" page size defaults
+to 1000, which is different than psycopg2's default.  The "batch" page
+size defaults to 100.  These can be affected by passing new values to
+:func:`_engine.create_engine`::
 
     engine = create_engine(
         "postgresql+psycopg2://scott:tiger@host/dbname",
         executemany_mode='values',
         executemany_values_page_size=10000, executemany_batch_page_size=500)
 
+.. versionchanged:: 1.4
+
+   The default for ``executemany_values_page_size`` is now 1000, up from
+   100.
+
 .. seealso::
@@ -223,10 +236,6 @@ more appropriate::
     object to execute statements in such a way as to make use of the DBAPI
     ``.executemany()`` method.
 
-.. versionchanged:: 1.3.7 - Added support for
-   ``psycopg2.extras.execute_values``. The ``use_batch_mode`` flag is
-   superseded by the ``executemany_mode`` flag.
-
 
 .. _psycopg2_unicode:
@@ -474,6 +483,7 @@ from ... import exc
 from ... import processors
 from ... import types as sqltypes
 from ... import util
+from ...engine import cursor as _cursor
 from ...util import collections_abc
 
 try:
@@ -546,18 +556,12 @@ class _PGHStore(HSTORE):
 
 class _PGJSON(JSON):
     def result_processor(self, dialect, coltype):
-        if dialect._has_native_json:
-            return None
-        else:
-            return super(_PGJSON, self).result_processor(dialect, coltype)
+        return None
 
 
 class _PGJSONB(JSONB):
     def result_processor(self, dialect, coltype):
-        if dialect._has_native_jsonb:
-            return None
-        else:
-            return super(_PGJSONB, self).result_processor(dialect, coltype)
+        return None
 
 
 class _PGUUID(UUID):
@@ -586,6 +590,8 @@ _server_side_id = util.counter()
 
 
 class PGExecutionContext_psycopg2(PGExecutionContext):
+    _psycopg2_fetched_rows = None
+
     def create_server_side_cursor(self):
         # use server-side cursors:
         # http://lists.initd.org/pipermail/psycopg/2007-January/005251.html
@@ -593,6 +599,22 @@ class PGExecutionContext_psycopg2(PGExecutionContext):
         return self._dbapi_connection.cursor(ident)
 
     def post_exec(self):
+        if (
+            self._psycopg2_fetched_rows
+            and self.compiled
+            and self.compiled.returning
+        ):
+            # psycopg2 execute_values will provide for a real cursor where
+            # cursor.description works correctly. however, it executes the
+            # INSERT statement multiple times for multiple pages of rows, so
+            # while this cursor also supports calling .fetchall() directly, in
+            # order to get the list of all rows inserted across multiple pages,
+            # we have to retrieve the aggregated list from the execute_values()
+            # function directly.
+            strat_cls = _cursor.FullyBufferedCursorFetchStrategy
+            self.cursor_fetch_strategy = strat_cls(
+                self.cursor, initial_buffer=self._psycopg2_fetched_rows
+            )
         self._log_notices(self.cursor)
 
     def _log_notices(self, cursor):
@@ -621,9 +643,13 @@ class PGIdentifierPreparer_psycopg2(PGIdentifierPreparer):
     pass
 
 
-EXECUTEMANY_DEFAULT = util.symbol("executemany_default")
-EXECUTEMANY_BATCH = util.symbol("executemany_batch")
-EXECUTEMANY_VALUES = util.symbol("executemany_values")
+EXECUTEMANY_DEFAULT = util.symbol("executemany_default", canonical=0)
+EXECUTEMANY_BATCH = util.symbol("executemany_batch", canonical=1)
+EXECUTEMANY_VALUES = util.symbol("executemany_values", canonical=2)
+EXECUTEMANY_VALUES_PLUS_BATCH = util.symbol(
+    "executemany_values_plus_batch",
+    canonical=EXECUTEMANY_BATCH | EXECUTEMANY_VALUES,
+)
 
 
 class PGDialect_psycopg2(PGDialect):
@@ -641,17 +667,7 @@ class PGDialect_psycopg2(PGDialect):
     preparer = PGIdentifierPreparer_psycopg2
     psycopg2_version = (0, 0)
 
-    FEATURE_VERSION_MAP = dict(
-        native_json=(2, 5),
-        native_jsonb=(2, 5, 4),
-        sane_multi_rowcount=(2, 0, 9),
-        array_oid=(2, 4, 3),
-        hstore_adapter=(2, 4),
-    )
-
-    _has_native_hstore = False
-    _has_native_json = False
-    _has_native_jsonb = False
+    _has_native_hstore = True
 
     engine_config_types = PGDialect.engine_config_types.union(
         {"use_native_unicode": util.asbool}
     )
@@ -671,13 +687,6 @@ class PGDialect_psycopg2(PGDialect):
         },
     )
 
-    @util.deprecated_params(
-        use_batch_mode=(
-            "1.3.7",
-            "The psycopg2 use_batch_mode flag is superseded by "
-            "executemany_mode='batch'",
-        )
-    )
     def __init__(
         self,
         server_side_cursors=False,
@@ -685,15 +694,16 @@ class PGDialect_psycopg2(PGDialect):
         client_encoding=None,
         use_native_hstore=True,
         use_native_uuid=True,
-        executemany_mode=None,
-        executemany_batch_page_size=None,
-        executemany_values_page_size=None,
-        use_batch_mode=None,
+        executemany_mode="values_only",
+        executemany_batch_page_size=100,
+        executemany_values_page_size=1000,
        **kwargs
     ):
         PGDialect.__init__(self, **kwargs)
         self.server_side_cursors = server_side_cursors
         self.use_native_unicode = use_native_unicode
+        if not use_native_hstore:
+            self._has_native_hstore = False
         self.use_native_hstore = use_native_hstore
         self.use_native_uuid = use_native_uuid
         self.supports_unicode_binds = use_native_unicode
@@ -706,12 +716,14 @@ class PGDialect_psycopg2(PGDialect):
             {
                 EXECUTEMANY_DEFAULT: [None],
                 EXECUTEMANY_BATCH: ["batch"],
-                EXECUTEMANY_VALUES: ["values"],
+                EXECUTEMANY_VALUES: ["values_only"],
+                EXECUTEMANY_VALUES_PLUS_BATCH: ["values_plus_batch", "values"],
             },
             "executemany_mode",
         )
-        if use_batch_mode:
-            self.executemany_mode = EXECUTEMANY_BATCH
+
+        if self.executemany_mode & EXECUTEMANY_VALUES:
+            self.insert_executemany_returning = True
 
         self.executemany_batch_page_size = executemany_batch_page_size
         self.executemany_values_page_size = executemany_values_page_size
@@ -723,24 +735,21 @@ class PGDialect_psycopg2(PGDialect):
                     int(x) for x in m.group(1, 2, 3) if x is not None
                 )
 
+        if self.psycopg2_version < (2, 7):
+            raise ImportError(
+                "psycopg2 version 2.7 or higher is required."
+            )
+
     def initialize(self, connection):
         super(PGDialect_psycopg2, self).initialize(connection)
         self._has_native_hstore = (
             self.use_native_hstore
             and self._hstore_oids(connection.connection) is not None
         )
-        self._has_native_json = (
-            self.psycopg2_version >= self.FEATURE_VERSION_MAP["native_json"]
-        )
-        self._has_native_jsonb = (
-            self.psycopg2_version >= self.FEATURE_VERSION_MAP["native_jsonb"]
-        )
 
         # http://initd.org/psycopg/docs/news.html#what-s-new-in-psycopg-2-0-9
-        self.supports_sane_multi_rowcount = (
-            self.psycopg2_version
-            >= self.FEATURE_VERSION_MAP["sane_multi_rowcount"]
-            and self.executemany_mode is EXECUTEMANY_DEFAULT
+        self.supports_sane_multi_rowcount = not (
+            self.executemany_mode & EXECUTEMANY_BATCH
         )
 
     @classmethod
@@ -830,11 +839,7 @@ class PGDialect_psycopg2(PGDialect):
                     kw = {"oid": oid}
                     if util.py2k:
                         kw["unicode"] = True
-                    if (
-                        self.psycopg2_version
-                        >= self.FEATURE_VERSION_MAP["array_oid"]
-                    ):
-                        kw["array_oid"] = array_oid
+                    kw["array_oid"] = array_oid
                     extras.register_hstore(conn, **kw)
 
             fns.append(on_connect)
@@ -842,14 +847,12 @@ class PGDialect_psycopg2(PGDialect):
         if self.dbapi and self._json_deserializer:
 
             def on_connect(conn):
-                if self._has_native_json:
-                    extras.register_default_json(
-                        conn, loads=self._json_deserializer
-                    )
-                if self._has_native_jsonb:
-                    extras.register_default_jsonb(
-                        conn, loads=self._json_deserializer
-                    )
+                extras.register_default_json(
+                    conn, loads=self._json_deserializer
+                )
+                extras.register_default_jsonb(
+                    conn, loads=self._json_deserializer
+                )
 
             fns.append(on_connect)
@@ -864,12 +867,8 @@ class PGDialect_psycopg2(PGDialect):
             return None
 
     def do_executemany(self, cursor, statement, parameters, context=None):
-        if self.executemany_mode is EXECUTEMANY_DEFAULT:
-            cursor.executemany(statement, parameters)
-            return
-
         if (
-            self.executemany_mode is EXECUTEMANY_VALUES
+            self.executemany_mode & EXECUTEMANY_VALUES
             and context
             and context.isinsert
            and context.compiled.insert_single_values_expr
@@ -893,15 +892,17 @@ class PGDialect_psycopg2(PGDialect):
                 kwargs = {"page_size": self.executemany_values_page_size}
             else:
                 kwargs = {}
-            self._psycopg2_extras().execute_values(
+            xtras = self._psycopg2_extras()
+            context._psycopg2_fetched_rows = xtras.execute_values(
                 cursor,
                 statement,
                 parameters,
                 template=executemany_values,
+                fetch=bool(context.compiled.returning),
                 **kwargs
             )
 
-        else:
+        elif self.executemany_mode & EXECUTEMANY_BATCH:
             if self.executemany_batch_page_size:
                 kwargs = {"page_size": self.executemany_batch_page_size}
             else:
@@ -909,15 +910,19 @@
             self._psycopg2_extras().execute_batch(
                 cursor, statement, parameters, **kwargs
             )
+        else:
+            cursor.executemany(statement, parameters)
 
     @util.memoized_instancemethod
     def _hstore_oids(self, conn):
-        if self.psycopg2_version >= self.FEATURE_VERSION_MAP["hstore_adapter"]:
-            extras = self._psycopg2_extras()
-            oids = extras.HstoreAdapter.get_oids(conn)
-            if oids is not None and oids[0]:
-                return oids[0:2]
-        return None
+        extras = self._psycopg2_extras()
+        if hasattr(conn, "connection"):
+            conn = conn.connection
+        oids = extras.HstoreAdapter.get_oids(conn)
+        if oids is not None and oids[0]:
+            return oids[0:2]
+        else:
+            return None
 
     def create_connect_args(self, url):
         opts = url.translate_connect_args(username="user")
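
Usage note: the following is a minimal sketch, not part of the patch above, showing the kind of executemany-style execution that the new ``executemany_mode`` handling applies to.  The connection URL and the ``notes`` table are hypothetical, and SQLAlchemy 1.4 with psycopg2 2.7 or higher is assumed::

    # minimal illustrative sketch -- the database URL and the "notes" table
    # are hypothetical and not taken from the patch above
    from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine

    engine = create_engine(
        "postgresql+psycopg2://scott:tiger@host/dbname",
        executemany_mode="values_only",     # the new 1.4 default, shown explicitly
        executemany_values_page_size=1000,  # rows per INSERT .. VALUES page
    )

    metadata = MetaData()
    notes = Table(
        "notes",
        metadata,
        Column("id", Integer, primary_key=True),
        Column("body", String(100)),
    )

    with engine.begin() as conn:
        metadata.create_all(conn)

        # a list of parameter dictionaries triggers the "executemany" code
        # path; because notes.insert() is a Core insert() construct it is a
        # "qualifying statement", so the rows are sent through psycopg2's
        # execute_values() in pages of executemany_values_page_size rows
        conn.execute(
            notes.insert(),
            [{"body": "note %d" % i} for i in range(10000)],
        )

With ``executemany_mode='values_plus_batch'``, UPDATE and DELETE statements executed the same way would additionally use ``execute_batch()``, at the cost of ``CursorResult.rowcount`` for those executions; a plain ``text()`` statement is not "qualifying" and falls through to ``cursor.executemany()``.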