diff options
Diffstat (limited to 'lib/sqlalchemy/dialects/postgresql/asyncpg.py')
-rw-r--r-- | lib/sqlalchemy/dialects/postgresql/asyncpg.py | 178 |
1 file changed, 157 insertions, 21 deletions
diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py index 6b7e78266..a79469f2e 100644 --- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py +++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py @@ -45,6 +45,57 @@ in conjunction with :func:`_sa.create_engine`:: ``json_deserializer`` when creating the engine with :func:`create_engine` or :func:`create_async_engine`. + +.. _asyncpg_prepared_statement_cache: + +Prepared Statement Cache +-------------------------- + +The asyncpg SQLAlchemy dialect makes use of ``asyncpg.connection.prepare()`` +for all statements. The prepared statement objects are cached after +construction, which appears to grant a 10% or more performance improvement for +statement invocation. The cache is on a per-DBAPI connection basis, which +means that the primary storage for prepared statements is within DBAPI +connections pooled within the connection pool. The size of this cache +defaults to 100 statements per DBAPI connection and may be adjusted using the +``prepared_statement_cache_size`` DBAPI argument (note that while this argument +is implemented by SQLAlchemy, it is part of the DBAPI emulation portion of the +asyncpg dialect, and is therefore handled as a DBAPI argument, not a dialect +argument):: + + + engine = create_async_engine("postgresql+asyncpg://user:pass@hostname/dbname?prepared_statement_cache_size=500") + +To disable the prepared statement cache, use a value of zero:: + + engine = create_async_engine("postgresql+asyncpg://user:pass@hostname/dbname?prepared_statement_cache_size=0") + +.. versionadded:: 1.4.0b2 Added ``prepared_statement_cache_size`` for asyncpg. + + +.. warning:: The ``asyncpg`` database driver necessarily uses caches for + PostgreSQL type OIDs, which become stale when custom PostgreSQL datatypes + such as ``ENUM`` objects are changed via DDL operations. 
Additionally, + prepared statements themselves, which are optionally cached by SQLAlchemy's + driver as described above, may also become "stale" when DDL has been emitted + to the PostgreSQL database which modifies the tables or other objects + involved in a particular prepared statement. + + The SQLAlchemy asyncpg dialect will invalidate these caches within its local + process when statements that represent DDL are emitted on a local + connection, but this is only controllable within a single Python process / + database engine. If DDL changes are made from other database engines + and/or processes, a running application may encounter the asyncpg exceptions + ``InvalidCachedStatementError`` and/or ``InternalServerError("cache lookup + failed for type <oid>")`` if it refers to pooled database connections which + operated upon the previous structures. The SQLAlchemy asyncpg dialect will + recover from these error cases when the driver raises these exceptions by + clearing its internal caches as well as those of the asyncpg driver in + response to them, but cannot prevent them from being raised in the first + place if the cached prepared statement or asyncpg type caches have gone + stale, nor can it retry the statement as the PostgreSQL transaction is + invalidated when these errors occur. + """ # noqa import collections @@ -52,6 +103,7 @@ import decimal import itertools import json as _py_json import re +import time from . 
import json from .base import _DECIMAL_TYPES @@ -235,9 +287,23 @@ class AsyncpgOID(OID): class PGExecutionContext_asyncpg(PGExecutionContext): + def handle_dbapi_exception(self, e): + if isinstance( + e, + ( + self.dialect.dbapi.InvalidCachedStatementError, + self.dialect.dbapi.InternalServerError, + ), + ): + self.dialect._invalidate_schema_cache() + def pre_exec(self): if self.isddl: - self._dbapi_connection.reset_schema_state() + self.dialect._invalidate_schema_cache() + + self.cursor._invalidate_schema_cache_asof = ( + self.dialect._invalidate_schema_cache_asof + ) if not self.compiled: return @@ -269,6 +335,7 @@ class AsyncAdapt_asyncpg_cursor: "rowcount", "_inputsizes", "_cursor", + "_invalidate_schema_cache_asof", ) server_side = False @@ -282,6 +349,7 @@ class AsyncAdapt_asyncpg_cursor: self.arraysize = 1 self.rowcount = -1 self._inputsizes = None + self._invalidate_schema_cache_asof = 0 def close(self): self._rows[:] = [] @@ -302,25 +370,25 @@ class AsyncAdapt_asyncpg_cursor: ) async def _prepare_and_execute(self, operation, parameters): - # TODO: I guess cache these in an LRU cache, or see if we can - # use some asyncpg concept - - # TODO: would be nice to support the dollar numeric thing - # directly, this is much easier for now if not self._adapt_connection._started: await self._adapt_connection._start_transaction() params = self._parameters() + + # TODO: would be nice to support the dollar numeric thing + # directly, this is much easier for now operation = re.sub(r"\?", lambda m: next(params), operation) + try: - prepared_stmt = await self._connection.prepare(operation) + prepared_stmt, attributes = await self._adapt_connection._prepare( + operation, self._invalidate_schema_cache_asof + ) - attributes = prepared_stmt.get_attributes() if attributes: self.description = [ (attr.name, attr.type.oid, None, None, None, None, None) - for attr in prepared_stmt.get_attributes() + for attr in attributes ] else: self.description = None @@ -350,15 +418,21 @@ class 
AsyncAdapt_asyncpg_cursor: self._handle_exception(error) def executemany(self, operation, seq_of_parameters): - if not self._adapt_connection._started: - self._adapt_connection.await_( - self._adapt_connection._start_transaction() + adapt_connection = self._adapt_connection + + adapt_connection.await_( + adapt_connection._check_type_cache_invalidation( + self._invalidate_schema_cache_asof ) + ) + + if not adapt_connection._started: + adapt_connection.await_(adapt_connection._start_transaction()) params = self._parameters() operation = re.sub(r"\?", lambda m: next(params), operation) try: - return self._adapt_connection.await_( + return adapt_connection.await_( self._connection.executemany(operation, seq_of_parameters) ) except Exception as error: @@ -485,11 +559,13 @@ class AsyncAdapt_asyncpg_connection: "deferrable", "_transaction", "_started", + "_prepared_statement_cache", + "_invalidate_schema_cache_asof", ) await_ = staticmethod(await_only) - def __init__(self, dbapi, connection): + def __init__(self, dbapi, connection, prepared_statement_cache_size=100): self.dbapi = dbapi self._connection = connection self.isolation_level = self._isolation_setting = "read_committed" @@ -497,6 +573,46 @@ class AsyncAdapt_asyncpg_connection: self.deferrable = False self._transaction = None self._started = False + self._invalidate_schema_cache_asof = time.time() + + if prepared_statement_cache_size: + self._prepared_statement_cache = util.LRUCache( + prepared_statement_cache_size + ) + else: + self._prepared_statement_cache = None + + async def _check_type_cache_invalidation(self, invalidate_timestamp): + if invalidate_timestamp > self._invalidate_schema_cache_asof: + await self._connection.reload_schema_state() + self._invalidate_schema_cache_asof = invalidate_timestamp + + async def _prepare(self, operation, invalidate_timestamp): + await self._check_type_cache_invalidation(invalidate_timestamp) + + cache = self._prepared_statement_cache + if cache is None: + prepared_stmt = 
await self._connection.prepare(operation) + attributes = prepared_stmt.get_attributes() + return prepared_stmt, attributes + + # asyncpg uses a type cache for the "attributes" which seems to go + # stale independently of the PreparedStatement itself, so place that + # collection in the cache as well. + if operation in cache: + prepared_stmt, attributes, cached_timestamp = cache[operation] + + # preparedstatements themselves also go stale for certain DDL + # changes such as size of a VARCHAR changing, so there is also + # a cross-connection invalidation timestamp + if cached_timestamp > invalidate_timestamp: + return prepared_stmt, attributes + + prepared_stmt = await self._connection.prepare(operation) + attributes = prepared_stmt.get_attributes() + cache[operation] = (prepared_stmt, attributes, time.time()) + + return prepared_stmt, attributes def _handle_exception(self, error): if not isinstance(error, AsyncAdapt_asyncpg_dbapi.Error): @@ -551,9 +667,6 @@ class AsyncAdapt_asyncpg_connection: else: return AsyncAdapt_asyncpg_cursor(self) - def reset_schema_state(self): - self.await_(self._connection.reload_schema_state()) - def rollback(self): if self._started: self.await_(self._transaction.rollback()) @@ -586,16 +699,20 @@ class AsyncAdapt_asyncpg_dbapi: def connect(self, *arg, **kw): async_fallback = kw.pop("async_fallback", False) - + prepared_statement_cache_size = kw.pop( + "prepared_statement_cache_size", 100 + ) if util.asbool(async_fallback): return AsyncAdaptFallback_asyncpg_connection( self, await_fallback(self.asyncpg.connect(*arg, **kw)), + prepared_statement_cache_size=prepared_statement_cache_size, ) else: return AsyncAdapt_asyncpg_connection( self, await_only(self.asyncpg.connect(*arg, **kw)), + prepared_statement_cache_size=prepared_statement_cache_size, ) class Error(Exception): @@ -628,15 +745,29 @@ class AsyncAdapt_asyncpg_dbapi: class NotSupportedError(DatabaseError): pass + class InternalServerError(InternalError): + pass + + class 
InvalidCachedStatementError(NotSupportedError): + def __init__(self, message): + super( + AsyncAdapt_asyncpg_dbapi.InvalidCachedStatementError, self + ).__init__( + message + " (SQLAlchemy asyncpg dialect will now invalidate " + "all prepared caches in response to this exception)", + ) + @util.memoized_property def _asyncpg_error_translate(self): import asyncpg return { - asyncpg.exceptions.IntegrityConstraintViolationError: self.IntegrityError, # noqa + asyncpg.exceptions.IntegrityConstraintViolationError: self.IntegrityError, # noqa: E501 asyncpg.exceptions.PostgresError: self.Error, asyncpg.exceptions.SyntaxOrAccessError: self.ProgrammingError, asyncpg.exceptions.InterfaceError: self.InterfaceError, + asyncpg.exceptions.InvalidCachedStatementError: self.InvalidCachedStatementError, # noqa: E501 + asyncpg.exceptions.InternalServerError: self.InternalServerError, } def Binary(self, value): @@ -730,6 +861,10 @@ class PGDialect_asyncpg(PGDialect): }, ) is_async = True + _invalidate_schema_cache_asof = 0 + + def _invalidate_schema_cache(self): + self._invalidate_schema_cache_asof = time.time() @util.memoized_property def _dbapi_version(self): @@ -787,9 +922,10 @@ class PGDialect_asyncpg(PGDialect): def create_connect_args(self, url): opts = url.translate_connect_args(username="user") - if "port" in opts: - opts["port"] = int(opts["port"]) + opts.update(url.query) + util.coerce_kw_type(opts, "prepared_statement_cache_size", int) + util.coerce_kw_type(opts, "port", int) return ([], opts) @classmethod |