summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/dialects/postgresql/asyncpg.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/dialects/postgresql/asyncpg.py')
-rw-r--r--lib/sqlalchemy/dialects/postgresql/asyncpg.py178
1 files changed, 157 insertions, 21 deletions
diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
index 6b7e78266..a79469f2e 100644
--- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py
+++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
@@ -45,6 +45,57 @@ in conjunction with :func:`_sa.create_engine`::
``json_deserializer`` when creating the engine with
:func:`create_engine` or :func:`create_async_engine`.
+
+.. _asyncpg_prepared_statement_cache:
+
+Prepared Statement Cache
+--------------------------
+
+The asyncpg SQLAlchemy dialect makes use of ``asyncpg.connection.prepare()``
+for all statements. The prepared statement objects are cached after
+construction which appears to grant a 10% or more performance improvement for
+statement invocation. The cache is on a per-DBAPI connection basis, which
+means that the primary storage for prepared statements is within DBAPI
+connections pooled within the connection pool. The size of this cache
+defaults to 100 statements per DBAPI connection and may be adjusted using the
+``prepared_statement_cache_size`` DBAPI argument (note that while this argument
+is implemented by SQLAlchemy, it is part of the DBAPI emulation portion of the
+asyncpg dialect, and is therefore handled as a DBAPI argument, not a dialect
+argument)::
+
+
+ engine = create_async_engine("postgresql+asyncpg://user:pass@hostname/dbname?prepared_statement_cache_size=500")
+
+To disable the prepared statement cache, use a value of zero::
+
+ engine = create_async_engine("postgresql+asyncpg://user:pass@hostname/dbname?prepared_statement_cache_size=0")
+
+.. versionadded:: 1.4.0b2 Added ``prepared_statement_cache_size`` for asyncpg.
+
+
+.. warning:: The ``asyncpg`` database driver necessarily uses caches for
+ PostgreSQL type OIDs, which become stale when custom PostgreSQL datatypes
+ such as ``ENUM`` objects are changed via DDL operations. Additionally,
+ prepared statements themselves, which are optionally cached by SQLAlchemy's
+ dialect as described above, may also become "stale" when DDL has been
+ emitted to the PostgreSQL database which modifies the tables or other
+ objects involved in a particular prepared statement.
+
+ The SQLAlchemy asyncpg dialect will invalidate these caches within its local
+ process when statements that represent DDL are emitted on a local
+ connection, but this is only controllable within a single Python process /
+ database engine. If DDL changes are made from other database engines
+ and/or processes, a running application may encounter asyncpg exceptions
+ ``InvalidCachedStatementError`` and/or ``InternalServerError("cache lookup
+ failed for type <oid>")`` if it refers to pooled database connections which
+ operated upon the previous structures. When the driver raises these
+ exceptions, the SQLAlchemy asyncpg dialect recovers by clearing its internal
+ caches as well as those of the asyncpg driver. It cannot, however, prevent
+ the exceptions from being raised in the first place if the cached prepared
+ statements or asyncpg type caches have gone stale, nor can it retry the
+ statement, as the PostgreSQL transaction is invalidated when these errors
+ occur.
+
""" # noqa
import collections
@@ -52,6 +103,7 @@ import decimal
import itertools
import json as _py_json
import re
+import time
from . import json
from .base import _DECIMAL_TYPES
@@ -235,9 +287,23 @@ class AsyncpgOID(OID):
class PGExecutionContext_asyncpg(PGExecutionContext):
+ def handle_dbapi_exception(self, e):
+ if isinstance(
+ e,
+ (
+ self.dialect.dbapi.InvalidCachedStatementError,
+ self.dialect.dbapi.InternalServerError,
+ ),
+ ):
+ self.dialect._invalidate_schema_cache()
+
def pre_exec(self):
if self.isddl:
- self._dbapi_connection.reset_schema_state()
+ self.dialect._invalidate_schema_cache()
+
+ self.cursor._invalidate_schema_cache_asof = (
+ self.dialect._invalidate_schema_cache_asof
+ )
if not self.compiled:
return
@@ -269,6 +335,7 @@ class AsyncAdapt_asyncpg_cursor:
"rowcount",
"_inputsizes",
"_cursor",
+ "_invalidate_schema_cache_asof",
)
server_side = False
@@ -282,6 +349,7 @@ class AsyncAdapt_asyncpg_cursor:
self.arraysize = 1
self.rowcount = -1
self._inputsizes = None
+ self._invalidate_schema_cache_asof = 0
def close(self):
self._rows[:] = []
@@ -302,25 +370,25 @@ class AsyncAdapt_asyncpg_cursor:
)
async def _prepare_and_execute(self, operation, parameters):
- # TODO: I guess cache these in an LRU cache, or see if we can
- # use some asyncpg concept
-
- # TODO: would be nice to support the dollar numeric thing
- # directly, this is much easier for now
if not self._adapt_connection._started:
await self._adapt_connection._start_transaction()
params = self._parameters()
+
+ # TODO: would be nice to support the dollar numeric thing
+ # directly, this is much easier for now
operation = re.sub(r"\?", lambda m: next(params), operation)
+
try:
- prepared_stmt = await self._connection.prepare(operation)
+ prepared_stmt, attributes = await self._adapt_connection._prepare(
+ operation, self._invalidate_schema_cache_asof
+ )
- attributes = prepared_stmt.get_attributes()
if attributes:
self.description = [
(attr.name, attr.type.oid, None, None, None, None, None)
- for attr in prepared_stmt.get_attributes()
+ for attr in attributes
]
else:
self.description = None
@@ -350,15 +418,21 @@ class AsyncAdapt_asyncpg_cursor:
self._handle_exception(error)
def executemany(self, operation, seq_of_parameters):
- if not self._adapt_connection._started:
- self._adapt_connection.await_(
- self._adapt_connection._start_transaction()
+ adapt_connection = self._adapt_connection
+
+ adapt_connection.await_(
+ adapt_connection._check_type_cache_invalidation(
+ self._invalidate_schema_cache_asof
)
+ )
+
+ if not adapt_connection._started:
+ adapt_connection.await_(adapt_connection._start_transaction())
params = self._parameters()
operation = re.sub(r"\?", lambda m: next(params), operation)
try:
- return self._adapt_connection.await_(
+ return adapt_connection.await_(
self._connection.executemany(operation, seq_of_parameters)
)
except Exception as error:
@@ -485,11 +559,13 @@ class AsyncAdapt_asyncpg_connection:
"deferrable",
"_transaction",
"_started",
+ "_prepared_statement_cache",
+ "_invalidate_schema_cache_asof",
)
await_ = staticmethod(await_only)
- def __init__(self, dbapi, connection):
+ def __init__(self, dbapi, connection, prepared_statement_cache_size=100):
self.dbapi = dbapi
self._connection = connection
self.isolation_level = self._isolation_setting = "read_committed"
@@ -497,6 +573,46 @@ class AsyncAdapt_asyncpg_connection:
self.deferrable = False
self._transaction = None
self._started = False
+ self._invalidate_schema_cache_asof = time.time()
+
+ if prepared_statement_cache_size:
+ self._prepared_statement_cache = util.LRUCache(
+ prepared_statement_cache_size
+ )
+ else:
+ self._prepared_statement_cache = None
+
+ async def _check_type_cache_invalidation(self, invalidate_timestamp):
+ if invalidate_timestamp > self._invalidate_schema_cache_asof:
+ await self._connection.reload_schema_state()
+ self._invalidate_schema_cache_asof = invalidate_timestamp
+
+ async def _prepare(self, operation, invalidate_timestamp):
+ await self._check_type_cache_invalidation(invalidate_timestamp)
+
+ cache = self._prepared_statement_cache
+ if cache is None:
+ prepared_stmt = await self._connection.prepare(operation)
+ attributes = prepared_stmt.get_attributes()
+ return prepared_stmt, attributes
+
+ # asyncpg uses a type cache for the "attributes" which seems to go
+ # stale independently of the PreparedStatement itself, so place that
+ # collection in the cache as well.
+ if operation in cache:
+ prepared_stmt, attributes, cached_timestamp = cache[operation]
+
+ # prepared statements themselves also go stale for certain DDL
+ # changes such as size of a VARCHAR changing, so there is also
+ # a cross-connection invalidation timestamp
+ if cached_timestamp > invalidate_timestamp:
+ return prepared_stmt, attributes
+
+ prepared_stmt = await self._connection.prepare(operation)
+ attributes = prepared_stmt.get_attributes()
+ cache[operation] = (prepared_stmt, attributes, time.time())
+
+ return prepared_stmt, attributes
def _handle_exception(self, error):
if not isinstance(error, AsyncAdapt_asyncpg_dbapi.Error):
@@ -551,9 +667,6 @@ class AsyncAdapt_asyncpg_connection:
else:
return AsyncAdapt_asyncpg_cursor(self)
- def reset_schema_state(self):
- self.await_(self._connection.reload_schema_state())
-
def rollback(self):
if self._started:
self.await_(self._transaction.rollback())
@@ -586,16 +699,20 @@ class AsyncAdapt_asyncpg_dbapi:
def connect(self, *arg, **kw):
async_fallback = kw.pop("async_fallback", False)
-
+ prepared_statement_cache_size = kw.pop(
+ "prepared_statement_cache_size", 100
+ )
if util.asbool(async_fallback):
return AsyncAdaptFallback_asyncpg_connection(
self,
await_fallback(self.asyncpg.connect(*arg, **kw)),
+ prepared_statement_cache_size=prepared_statement_cache_size,
)
else:
return AsyncAdapt_asyncpg_connection(
self,
await_only(self.asyncpg.connect(*arg, **kw)),
+ prepared_statement_cache_size=prepared_statement_cache_size,
)
class Error(Exception):
@@ -628,15 +745,29 @@ class AsyncAdapt_asyncpg_dbapi:
class NotSupportedError(DatabaseError):
pass
+ class InternalServerError(InternalError):
+ pass
+
+ class InvalidCachedStatementError(NotSupportedError):
+ def __init__(self, message):
+ super(
+ AsyncAdapt_asyncpg_dbapi.InvalidCachedStatementError, self
+ ).__init__(
+ message + " (SQLAlchemy asyncpg dialect will now invalidate "
+ "all prepared caches in response to this exception)",
+ )
+
@util.memoized_property
def _asyncpg_error_translate(self):
import asyncpg
return {
- asyncpg.exceptions.IntegrityConstraintViolationError: self.IntegrityError, # noqa
+ asyncpg.exceptions.IntegrityConstraintViolationError: self.IntegrityError, # noqa: E501
asyncpg.exceptions.PostgresError: self.Error,
asyncpg.exceptions.SyntaxOrAccessError: self.ProgrammingError,
asyncpg.exceptions.InterfaceError: self.InterfaceError,
+ asyncpg.exceptions.InvalidCachedStatementError: self.InvalidCachedStatementError, # noqa: E501
+ asyncpg.exceptions.InternalServerError: self.InternalServerError,
}
def Binary(self, value):
@@ -730,6 +861,10 @@ class PGDialect_asyncpg(PGDialect):
},
)
is_async = True
+ _invalidate_schema_cache_asof = 0
+
+ def _invalidate_schema_cache(self):
+ self._invalidate_schema_cache_asof = time.time()
@util.memoized_property
def _dbapi_version(self):
@@ -787,9 +922,10 @@ class PGDialect_asyncpg(PGDialect):
def create_connect_args(self, url):
opts = url.translate_connect_args(username="user")
- if "port" in opts:
- opts["port"] = int(opts["port"])
+
opts.update(url.query)
+ util.coerce_kw_type(opts, "prepared_statement_cache_size", int)
+ util.coerce_kw_type(opts, "port", int)
return ([], opts)
@classmethod