Diffstat (limited to 'lib/sqlalchemy/dialects/postgresql/psycopg2.py')
-rw-r--r--  lib/sqlalchemy/dialects/postgresql/psycopg2.py | 251
1 file changed, 128 insertions(+), 123 deletions(-)
diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2.py b/lib/sqlalchemy/dialects/postgresql/psycopg2.py
index a9408bcb0..6364838a6 100644
--- a/lib/sqlalchemy/dialects/postgresql/psycopg2.py
+++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py
@@ -61,9 +61,6 @@ psycopg2-specific keyword arguments which are accepted by
:ref:`psycopg2_executemany_mode`
-* ``use_batch_mode``: this is the previous setting used to affect "executemany"
- mode and is now deprecated.
-
Unix Domain Connections
------------------------
@@ -155,66 +152,82 @@ Modern versions of psycopg2 include a feature known as
<http://initd.org/psycopg/docs/extras.html#fast-execution-helpers>`_, which
have been shown in benchmarking to improve psycopg2's executemany()
performance, primarily with INSERT statements, by multiple orders of magnitude.
-SQLAlchemy allows this extension to be used for all ``executemany()`` style
-calls invoked by an :class:`_engine.Engine`
-when used with :ref:`multiple parameter
-sets <execute_multiple>`, which includes the use of this feature both by the
-Core as well as by the ORM for inserts of objects with non-autogenerated
-primary key values, by adding the ``executemany_mode`` flag to
-:func:`_sa.create_engine`::
+SQLAlchemy internally makes use of these extensions for ``executemany()`` style
+calls, which correspond to lists of parameters being passed to
+:meth:`_engine.Connection.execute` as detailed in :ref:`multiple parameter
+sets <execute_multiple>`. The ORM also uses this mode internally whenever
+possible.
+
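
As a hedged illustration of what such a call looks like (the connection URL and
the ``user_account`` table here are hypothetical, not part of this patch), an
"executemany" style execution at the Core level is simply a list of parameter
dictionaries passed to :meth:`_engine.Connection.execute`::

    from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine

    metadata = MetaData()
    # hypothetical table used only for this sketch
    user_table = Table(
        "user_account",
        metadata,
        Column("id", Integer, primary_key=True),
        Column("name", String(50)),
    )

    engine = create_engine("postgresql+psycopg2://scott:tiger@host/dbname")
    metadata.create_all(engine)

    with engine.begin() as conn:
        # a list of parameter sets triggers the DBAPI "executemany" path,
        # where the psycopg2 fast execution helpers can be used
        conn.execute(
            user_table.insert(),
            [{"name": "spongebob"}, {"name": "sandy"}, {"name": "patrick"}],
        )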
+The two available extensions on the psycopg2 side are the ``execute_values()``
+and ``execute_batch()`` functions. The psycopg2 dialect defaults to using the
+``execute_values()`` extension for all qualifying INSERT statements.
+
+.. versionchanged:: 1.4 The psycopg2 dialect now defaults to a new mode
+ ``"values_only"`` for ``executemany_mode``, which allows an order of
+ magnitude performance improvement for INSERT statements, but does not
+ include "batch" mode for UPDATE and DELETE statements, since that mode
+ removes the ability of ``cursor.rowcount`` to function correctly.
+
+The use of these extensions is controlled by the ``executemany_mode`` flag
+which may be passed to :func:`_sa.create_engine`::
engine = create_engine(
"postgresql+psycopg2://scott:tiger@host/dbname",
- executemany_mode='batch')
-
+ executemany_mode='values_plus_batch')
-.. versionchanged:: 1.3.7 - the ``use_batch_mode`` flag has been superseded
- by a new parameter ``executemany_mode`` which provides support both for
- psycopg2's ``execute_batch`` helper as well as the ``execute_values``
- helper.
Possible options for ``executemany_mode`` include:
-* ``None`` - By default, psycopg2's extensions are not used, and the usual
- ``cursor.executemany()`` method is used when invoking batches of statements.
+* ``values_only`` - this is the default value. The psycopg2 ``execute_values()``
+ extension is used for qualifying INSERT statements, which rewrites the INSERT
+ to include multiple VALUES clauses so that many parameter sets can be
+ inserted with one statement.
-* ``'batch'`` - Uses ``psycopg2.extras.execute_batch`` so that multiple copies
+ .. versionadded:: 1.4 Added ``"values_only"`` setting for ``executemany_mode``
+ which is also now the default.
+
+* ``None`` - No psycopg2 extensions are used, and the usual
+ ``cursor.executemany()`` method is used when invoking statements with
+ multiple parameter sets.
+
+* ``'batch'`` - Uses ``psycopg2.extras.execute_batch`` for all qualifying
+ INSERT, UPDATE and DELETE statements, so that multiple copies
of a SQL query, each one corresponding to a parameter set passed to
``executemany()``, are joined into a single SQL string separated by a
- semicolon. This is the same behavior as was provided by the
- ``use_batch_mode=True`` flag.
-
-* ``'values'``- For Core :func:`_expression.insert`
- constructs only (including those
- emitted by the ORM automatically), the ``psycopg2.extras.execute_values``
- extension is used so that multiple parameter sets are grouped into a single
- INSERT statement and joined together with multiple VALUES expressions. This
- method requires that the string text of the VALUES clause inside the
- INSERT statement is manipulated, so is only supported with a compiled
- :func:`_expression.insert` construct where the format is predictable.
- For all other
- constructs, including plain textual INSERT statements not rendered by the
- SQLAlchemy expression language compiler, the
- ``psycopg2.extras.execute_batch`` method is used. It is therefore important
- to note that **"values" mode implies that "batch" mode is also used for
- all statements for which "values" mode does not apply**.
-
-For both strategies, the ``executemany_batch_page_size`` and
-``executemany_values_page_size`` arguments control how many parameter sets
-should be represented in each execution. Because "values" mode implies a
-fallback down to "batch" mode for non-INSERT statements, there are two
-independent page size arguments. For each, the default value of ``None`` means
-to use psycopg2's defaults, which at the time of this writing are quite low at
-100. For the ``execute_values`` method, a number as high as 10000 may prove
-to be performant, whereas for ``execute_batch``, as the number represents
-full statements repeated, a number closer to the default of 100 is likely
-more appropriate::
+ semicolon. When using this mode, the :attr:`_engine.CursorResult.rowcount`
+ attribute will not contain a value for executemany-style executions.
+
+* ``'values_plus_batch'`` - ``execute_values`` is used for qualifying INSERT
+ statements, ``execute_batch`` is used for UPDATE and DELETE.
+ When using this mode, the :attr:`_engine.CursorResult.rowcount`
+ attribute will not contain a value for executemany-style executions against
+ UPDATE and DELETE statements.
+
+By "qualifying statements", we mean that the statement being executed
+must be a Core :func:`_expression.insert`, :func:`_expression.update`
+or :func:`_expression.delete` construct, and not a plain textual SQL
+string or one constructed using :func:`_expression.text`. When using the
+ORM, all insert/update/delete statements used by the ORM flush process
+are qualifying.
+
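To make "qualifying" concrete, a minimal sketch (the engine URL, table and
values are hypothetical): the first execution below uses a compiled Core
construct and is eligible for the fast execution helpers, while the textual
form, under the default ``values_only`` mode, falls through to plain
``cursor.executemany()``::

    from sqlalchemy import Column, Integer, MetaData, Table, create_engine, text

    metadata = MetaData()
    my_table = Table("my_table", metadata, Column("x", Integer))  # hypothetical
    engine = create_engine("postgresql+psycopg2://scott:tiger@host/dbname")
    metadata.create_all(engine)

    params = [{"x": 1}, {"x": 2}, {"x": 3}]

    with engine.begin() as conn:
        # qualifying: a compiled Core insert() construct; under the default
        # executemany_mode="values_only" this is rewritten by execute_values()
        conn.execute(my_table.insert(), params)

        # not qualifying: a textual statement with multiple parameter sets
        # uses the plain cursor.executemany() code path under "values_only"
        conn.execute(text("INSERT INTO my_table (x) VALUES (:x)"), params)
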
+The "page size" for the "values" and "batch" strategies can be controlled
+using the ``executemany_batch_page_size`` and
+``executemany_values_page_size`` engine parameters. These
+control how many parameter sets
+should be represented in each execution. The "values" page size defaults
+to 1000, which is different from psycopg2's default of 100. The "batch"
+page size defaults to 100. These can be set by passing new values to
+:func:`_engine.create_engine`::
engine = create_engine(
"postgresql+psycopg2://scott:tiger@host/dbname",
executemany_mode='values_plus_batch',
executemany_values_page_size=10000, executemany_batch_page_size=500)
+.. versionchanged:: 1.4
+
+ The default for ``executemany_values_page_size`` is now 1000, up from
+ 100.
.. seealso::
@@ -223,10 +236,6 @@ more appropriate::
object to execute statements in such a way as to make
use of the DBAPI ``.executemany()`` method.
-.. versionchanged:: 1.3.7 - Added support for
- ``psycopg2.extras.execute_values``. The ``use_batch_mode`` flag is
- superseded by the ``executemany_mode`` flag.
-
.. _psycopg2_unicode:
@@ -474,6 +483,7 @@ from ... import exc
from ... import processors
from ... import types as sqltypes
from ... import util
+from ...engine import cursor as _cursor
from ...util import collections_abc
try:
@@ -546,18 +556,12 @@ class _PGHStore(HSTORE):
class _PGJSON(JSON):
def result_processor(self, dialect, coltype):
- if dialect._has_native_json:
- return None
- else:
- return super(_PGJSON, self).result_processor(dialect, coltype)
+ return None
class _PGJSONB(JSONB):
def result_processor(self, dialect, coltype):
- if dialect._has_native_jsonb:
- return None
- else:
- return super(_PGJSONB, self).result_processor(dialect, coltype)
+ return None
class _PGUUID(UUID):
@@ -586,6 +590,8 @@ _server_side_id = util.counter()
class PGExecutionContext_psycopg2(PGExecutionContext):
+ _psycopg2_fetched_rows = None
+
def create_server_side_cursor(self):
# use server-side cursors:
# http://lists.initd.org/pipermail/psycopg/2007-January/005251.html
@@ -593,6 +599,22 @@ class PGExecutionContext_psycopg2(PGExecutionContext):
return self._dbapi_connection.cursor(ident)
def post_exec(self):
+ if (
+ self._psycopg2_fetched_rows
+ and self.compiled
+ and self.compiled.returning
+ ):
+ # psycopg2 execute_values will provide for a real cursor where
+ # cursor.description works correctly. however, it executes the
+ # INSERT statement multiple times for multiple pages of rows, so
+ # while this cursor also supports calling .fetchall() directly, in
+ # order to get the list of all rows inserted across multiple pages,
+ # we have to retrieve the aggregated list from the execute_values()
+ # function directly.
+ strat_cls = _cursor.FullyBufferedCursorFetchStrategy
+ self.cursor_fetch_strategy = strat_cls(
+ self.cursor, initial_buffer=self._psycopg2_fetched_rows
+ )
self._log_notices(self.cursor)
def _log_notices(self, cursor):
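
The buffering idea in ``post_exec()`` can be sketched in isolation; this is
illustrative only and does not reproduce ``FullyBufferedCursorFetchStrategy``
itself, just the notion of serving rows that ``execute_values(fetch=True)``
already aggregated rather than reading them from the cursor::

    class PrefetchedRows:
        """Toy stand-in for a fully buffered fetch strategy."""

        def __init__(self, rows):
            # rows already aggregated across pages by execute_values(fetch=True)
            self._rows = list(rows)

        def fetchone(self):
            return self._rows.pop(0) if self._rows else None

        def fetchall(self):
            rows, self._rows = self._rows, []
            return rows


    buffered = PrefetchedRows([(1,), (2,), (3,)])
    assert buffered.fetchone() == (1,)
    assert buffered.fetchall() == [(2,), (3,)]
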
@@ -621,9 +643,13 @@ class PGIdentifierPreparer_psycopg2(PGIdentifierPreparer):
pass
-EXECUTEMANY_DEFAULT = util.symbol("executemany_default")
-EXECUTEMANY_BATCH = util.symbol("executemany_batch")
-EXECUTEMANY_VALUES = util.symbol("executemany_values")
+EXECUTEMANY_DEFAULT = util.symbol("executemany_default", canonical=0)
+EXECUTEMANY_BATCH = util.symbol("executemany_batch", canonical=1)
+EXECUTEMANY_VALUES = util.symbol("executemany_values", canonical=2)
+EXECUTEMANY_VALUES_PLUS_BATCH = util.symbol(
+ "executemany_values_plus_batch",
+ canonical=EXECUTEMANY_BATCH | EXECUTEMANY_VALUES,
+)
class PGDialect_psycopg2(PGDialect):
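
The ``canonical`` values above turn the mode symbols into bit flags, so a
single bitwise test covers both the ``values_only`` and ``values_plus_batch``
settings. A standalone sketch with plain integers standing in for the
``util.symbol`` objects (which are int subclasses and compare the same way)::

    EXECUTEMANY_DEFAULT = 0
    EXECUTEMANY_BATCH = 1
    EXECUTEMANY_VALUES = 2
    EXECUTEMANY_VALUES_PLUS_BATCH = EXECUTEMANY_BATCH | EXECUTEMANY_VALUES  # == 3

    # "values_only" enables execute_values() but not execute_batch()
    assert EXECUTEMANY_VALUES & EXECUTEMANY_VALUES
    assert not (EXECUTEMANY_VALUES & EXECUTEMANY_BATCH)

    # "values_plus_batch" enables both, so either bitwise test succeeds
    assert EXECUTEMANY_VALUES_PLUS_BATCH & EXECUTEMANY_VALUES
    assert EXECUTEMANY_VALUES_PLUS_BATCH & EXECUTEMANY_BATCH

    # executemany_mode=None maps to canonical 0, so neither extension is used
    assert not (EXECUTEMANY_DEFAULT & EXECUTEMANY_VALUES)
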
@@ -641,17 +667,7 @@ class PGDialect_psycopg2(PGDialect):
preparer = PGIdentifierPreparer_psycopg2
psycopg2_version = (0, 0)
- FEATURE_VERSION_MAP = dict(
- native_json=(2, 5),
- native_jsonb=(2, 5, 4),
- sane_multi_rowcount=(2, 0, 9),
- array_oid=(2, 4, 3),
- hstore_adapter=(2, 4),
- )
-
- _has_native_hstore = False
- _has_native_json = False
- _has_native_jsonb = False
+ _has_native_hstore = True
engine_config_types = PGDialect.engine_config_types.union(
{"use_native_unicode": util.asbool}
@@ -671,13 +687,6 @@ class PGDialect_psycopg2(PGDialect):
},
)
- @util.deprecated_params(
- use_batch_mode=(
- "1.3.7",
- "The psycopg2 use_batch_mode flag is superseded by "
- "executemany_mode='batch'",
- )
- )
def __init__(
self,
server_side_cursors=False,
@@ -685,15 +694,16 @@ class PGDialect_psycopg2(PGDialect):
client_encoding=None,
use_native_hstore=True,
use_native_uuid=True,
- executemany_mode=None,
- executemany_batch_page_size=None,
- executemany_values_page_size=None,
- use_batch_mode=None,
+ executemany_mode="values_only",
+ executemany_batch_page_size=100,
+ executemany_values_page_size=1000,
**kwargs
):
PGDialect.__init__(self, **kwargs)
self.server_side_cursors = server_side_cursors
self.use_native_unicode = use_native_unicode
+ if not use_native_hstore:
+ self._has_native_hstore = False
self.use_native_hstore = use_native_hstore
self.use_native_uuid = use_native_uuid
self.supports_unicode_binds = use_native_unicode
@@ -706,12 +716,14 @@ class PGDialect_psycopg2(PGDialect):
{
EXECUTEMANY_DEFAULT: [None],
EXECUTEMANY_BATCH: ["batch"],
- EXECUTEMANY_VALUES: ["values"],
+ EXECUTEMANY_VALUES: ["values_only"],
+ EXECUTEMANY_VALUES_PLUS_BATCH: ["values_plus_batch", "values"],
},
"executemany_mode",
)
- if use_batch_mode:
- self.executemany_mode = EXECUTEMANY_BATCH
+
+ if self.executemany_mode & EXECUTEMANY_VALUES:
+ self.insert_executemany_returning = True
self.executemany_batch_page_size = executemany_batch_page_size
self.executemany_values_page_size = executemany_values_page_size
@@ -723,24 +735,21 @@ class PGDialect_psycopg2(PGDialect):
int(x) for x in m.group(1, 2, 3) if x is not None
)
+ if self.psycopg2_version < (2, 7):
+ raise ImportError(
+ "psycopg2 version 2.7 or higher is required."
+ )
+
def initialize(self, connection):
super(PGDialect_psycopg2, self).initialize(connection)
self._has_native_hstore = (
self.use_native_hstore
and self._hstore_oids(connection.connection) is not None
)
- self._has_native_json = (
- self.psycopg2_version >= self.FEATURE_VERSION_MAP["native_json"]
- )
- self._has_native_jsonb = (
- self.psycopg2_version >= self.FEATURE_VERSION_MAP["native_jsonb"]
- )
# http://initd.org/psycopg/docs/news.html#what-s-new-in-psycopg-2-0-9
- self.supports_sane_multi_rowcount = (
- self.psycopg2_version
- >= self.FEATURE_VERSION_MAP["sane_multi_rowcount"]
- and self.executemany_mode is EXECUTEMANY_DEFAULT
+ self.supports_sane_multi_rowcount = not (
+ self.executemany_mode & EXECUTEMANY_BATCH
)
@classmethod
@@ -830,11 +839,7 @@ class PGDialect_psycopg2(PGDialect):
kw = {"oid": oid}
if util.py2k:
kw["unicode"] = True
- if (
- self.psycopg2_version
- >= self.FEATURE_VERSION_MAP["array_oid"]
- ):
- kw["array_oid"] = array_oid
+ kw["array_oid"] = array_oid
extras.register_hstore(conn, **kw)
fns.append(on_connect)
@@ -842,14 +847,12 @@ class PGDialect_psycopg2(PGDialect):
if self.dbapi and self._json_deserializer:
def on_connect(conn):
- if self._has_native_json:
- extras.register_default_json(
- conn, loads=self._json_deserializer
- )
- if self._has_native_jsonb:
- extras.register_default_jsonb(
- conn, loads=self._json_deserializer
- )
+ extras.register_default_json(
+ conn, loads=self._json_deserializer
+ )
+ extras.register_default_jsonb(
+ conn, loads=self._json_deserializer
+ )
fns.append(on_connect)
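
For context on how the ``_json_deserializer`` hook above gets populated, a
hedged sketch (the URL is hypothetical): passing ``json_deserializer`` to
:func:`_sa.create_engine` causes the dialect to register it with psycopg2 for
both ``json`` and ``jsonb`` result values::

    import json

    from sqlalchemy import create_engine

    def _loads(value):
        # applied by psycopg2 to both json and jsonb column values
        return json.loads(value, parse_float=str)

    engine = create_engine(
        "postgresql+psycopg2://scott:tiger@host/dbname",
        json_deserializer=_loads,
    )
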
@@ -864,12 +867,8 @@ class PGDialect_psycopg2(PGDialect):
return None
def do_executemany(self, cursor, statement, parameters, context=None):
- if self.executemany_mode is EXECUTEMANY_DEFAULT:
- cursor.executemany(statement, parameters)
- return
-
if (
- self.executemany_mode is EXECUTEMANY_VALUES
+ self.executemany_mode & EXECUTEMANY_VALUES
and context
and context.isinsert
and context.compiled.insert_single_values_expr
@@ -893,15 +892,17 @@ class PGDialect_psycopg2(PGDialect):
kwargs = {"page_size": self.executemany_values_page_size}
else:
kwargs = {}
- self._psycopg2_extras().execute_values(
+ xtras = self._psycopg2_extras()
+ context._psycopg2_fetched_rows = xtras.execute_values(
cursor,
statement,
parameters,
template=executemany_values,
+ fetch=bool(context.compiled.returning),
**kwargs
)
- else:
+ elif self.executemany_mode & EXECUTEMANY_BATCH:
if self.executemany_batch_page_size:
kwargs = {"page_size": self.executemany_batch_page_size}
else:
@@ -909,15 +910,19 @@ class PGDialect_psycopg2(PGDialect):
self._psycopg2_extras().execute_batch(
cursor, statement, parameters, **kwargs
)
+ else:
+ cursor.executemany(statement, parameters)
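
For reference, the underlying psycopg2 calls that the branches above dispatch
to look roughly like the following when used directly (the DSN and table ``t``
are hypothetical; ``fetch=True`` requires psycopg2 2.8 or higher)::

    import psycopg2
    from psycopg2 import extras

    conn = psycopg2.connect("dbname=test user=scott password=tiger host=host")
    with conn, conn.cursor() as cur:
        # execute_values(): the INSERT is rewritten with multiple VALUES
        # groups per round trip; fetch=True aggregates RETURNING rows across
        # pages, which is what the dialect captures above
        rows = extras.execute_values(
            cur,
            "INSERT INTO t (a, b) VALUES %s RETURNING id",
            [(1, "x"), (2, "y"), (3, "z")],
            template="(%s, %s)",
            page_size=1000,
            fetch=True,
        )

        # execute_batch(): many statements joined with semicolons per round
        # trip; no usable rowcount, matching the caveat in the docs above
        extras.execute_batch(
            cur,
            "UPDATE t SET b = %s WHERE a = %s",
            [("xx", 1), ("yy", 2)],
            page_size=100,
        )
    conn.close()
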
@util.memoized_instancemethod
def _hstore_oids(self, conn):
- if self.psycopg2_version >= self.FEATURE_VERSION_MAP["hstore_adapter"]:
- extras = self._psycopg2_extras()
- oids = extras.HstoreAdapter.get_oids(conn)
- if oids is not None and oids[0]:
- return oids[0:2]
- return None
+ extras = self._psycopg2_extras()
+ if hasattr(conn, "connection"):
+ conn = conn.connection
+ oids = extras.HstoreAdapter.get_oids(conn)
+ if oids is not None and oids[0]:
+ return oids[0:2]
+ else:
+ return None
def create_connect_args(self, url):
opts = url.translate_connect_args(username="user")