diff options
author | Federico Caselli <cfederico87@gmail.com> | 2021-09-14 23:38:00 +0200 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2021-11-26 10:14:44 -0500 |
commit | 5eb407f84bdabdbcd68975dbf76dc4c0809d7373 (patch) | |
tree | 0d37ab4b9c28d8a0fa6cefdcc1933d52ffd9a599 /lib/sqlalchemy/dialects/postgresql | |
parent | 8ddb3ef165d0c2d6d7167bb861bb349e68b5e8df (diff) | |
download | sqlalchemy-5eb407f84bdabdbcd68975dbf76dc4c0809d7373.tar.gz |
Added support for ``psycopg`` dialect.
Both sync and async versions are supported.
Fixes: #6842
Change-Id: I57751c5028acebfc6f9c43572562405453a2f2a4
Diffstat (limited to 'lib/sqlalchemy/dialects/postgresql')
-rw-r--r-- | lib/sqlalchemy/dialects/postgresql/__init__.py | 8 | ||||
-rw-r--r-- | lib/sqlalchemy/dialects/postgresql/_psycopg_common.py | 188 | ||||
-rw-r--r-- | lib/sqlalchemy/dialects/postgresql/asyncpg.py | 1 | ||||
-rw-r--r-- | lib/sqlalchemy/dialects/postgresql/base.py | 8 | ||||
-rw-r--r-- | lib/sqlalchemy/dialects/postgresql/psycopg.py | 641 | ||||
-rw-r--r-- | lib/sqlalchemy/dialects/postgresql/psycopg2.py | 163 |
6 files changed, 852 insertions, 157 deletions
diff --git a/lib/sqlalchemy/dialects/postgresql/__init__.py b/lib/sqlalchemy/dialects/postgresql/__init__.py index 08b05dc74..b0af5395e 100644 --- a/lib/sqlalchemy/dialects/postgresql/__init__.py +++ b/lib/sqlalchemy/dialects/postgresql/__init__.py @@ -4,9 +4,12 @@ # # This module is part of SQLAlchemy and is released under # the MIT License: https://www.opensource.org/licenses/mit-license.php +from types import ModuleType + from . import asyncpg # noqa from . import base from . import pg8000 # noqa +from . import psycopg # noqa from . import psycopg2 # noqa from . import psycopg2cffi # noqa from .array import All @@ -58,6 +61,11 @@ from .ranges import TSRANGE from .ranges import TSTZRANGE from ...util import compat +# Alias psycopg also as psycopg_asnyc +psycopg_async = type( + "psycopg_asnyc", (ModuleType,), {"dialect": psycopg.dialect_async} +) + base.dialect = dialect = psycopg2.dialect diff --git a/lib/sqlalchemy/dialects/postgresql/_psycopg_common.py b/lib/sqlalchemy/dialects/postgresql/_psycopg_common.py new file mode 100644 index 000000000..d82d5f009 --- /dev/null +++ b/lib/sqlalchemy/dialects/postgresql/_psycopg_common.py @@ -0,0 +1,188 @@ +import decimal + +from .array import ARRAY as PGARRAY +from .base import _DECIMAL_TYPES +from .base import _FLOAT_TYPES +from .base import _INT_TYPES +from .base import PGDialect +from .base import PGExecutionContext +from .base import UUID +from .hstore import HSTORE +from ... import exc +from ... import processors +from ... import types as sqltypes +from ... import util + +_server_side_id = util.counter() + + +class _PsycopgNumeric(sqltypes.Numeric): + def bind_processor(self, dialect): + return None + + def result_processor(self, dialect, coltype): + if self.asdecimal: + if coltype in _FLOAT_TYPES: + return processors.to_decimal_processor_factory( + decimal.Decimal, self._effective_decimal_return_scale + ) + elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES: + # psycopg returns Decimal natively for 1700 + return None + else: + raise exc.InvalidRequestError( + "Unknown PG numeric type: %d" % coltype + ) + else: + if coltype in _FLOAT_TYPES: + # psycopg returns float natively for 701 + return None + elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES: + return processors.to_float + else: + raise exc.InvalidRequestError( + "Unknown PG numeric type: %d" % coltype + ) + + +class _PsycopgHStore(HSTORE): + def bind_processor(self, dialect): + if dialect._has_native_hstore: + return None + else: + return super(_PsycopgHStore, self).bind_processor(dialect) + + def result_processor(self, dialect, coltype): + if dialect._has_native_hstore: + return None + else: + return super(_PsycopgHStore, self).result_processor( + dialect, coltype + ) + + +class _PsycopgUUID(UUID): + def bind_processor(self, dialect): + return None + + def result_processor(self, dialect, coltype): + if not self.as_uuid and dialect.use_native_uuid: + + def process(value): + if value is not None: + value = str(value) + return value + + return process + + +class _PsycopgARRAY(PGARRAY): + render_bind_cast = True + + +class _PGExecutionContext_common_psycopg(PGExecutionContext): + def create_server_side_cursor(self): + # use server-side cursors: + # psycopg + # https://www.psycopg.org/psycopg3/docs/advanced/cursors.html#server-side-cursors + # psycopg2 + # https://www.psycopg.org/docs/usage.html#server-side-cursors + ident = "c_%s_%s" % (hex(id(self))[2:], hex(_server_side_id())[2:]) + return self._dbapi_connection.cursor(ident) + + +class _PGDialect_common_psycopg(PGDialect): + supports_statement_cache = True + supports_server_side_cursors = True + + default_paramstyle = "pyformat" + + _has_native_hstore = True + + colspecs = util.update_copy( + PGDialect.colspecs, + { + sqltypes.Numeric: _PsycopgNumeric, + HSTORE: _PsycopgHStore, + UUID: _PsycopgUUID, + sqltypes.ARRAY: _PsycopgARRAY, + }, + ) + + def __init__( + self, + client_encoding=None, + use_native_hstore=True, + use_native_uuid=True, + **kwargs + ): + PGDialect.__init__(self, **kwargs) + if not use_native_hstore: + self._has_native_hstore = False + self.use_native_hstore = use_native_hstore + self.use_native_uuid = use_native_uuid + self.client_encoding = client_encoding + + def create_connect_args(self, url): + opts = url.translate_connect_args(username="user", database="dbname") + + is_multihost = False + if "host" in url.query: + is_multihost = isinstance(url.query["host"], (list, tuple)) + + if opts: + if "port" in opts: + opts["port"] = int(opts["port"]) + opts.update(url.query) + if is_multihost: + opts["host"] = ",".join(url.query["host"]) + # send individual dbname, user, password, host, port + # parameters to psycopg2.connect() + return ([], opts) + elif url.query: + # any other connection arguments, pass directly + opts.update(url.query) + if is_multihost: + opts["host"] = ",".join(url.query["host"]) + return ([], opts) + else: + # no connection arguments whatsoever; psycopg2.connect() + # requires that "dsn" be present as a blank string. + return ([""], opts) + + def get_isolation_level_values(self, dbapi_conn): + return ( + "AUTOCOMMIT", + "READ COMMITTED", + "READ UNCOMMITTED", + "REPEATABLE READ", + "SERIALIZABLE", + ) + + def set_deferrable(self, connection, value): + connection.deferrable = value + + def get_deferrable(self, connection): + return connection.deferrable + + def _do_autocommit(self, connection, value): + connection.autocommit = value + + def do_ping(self, dbapi_connection): + cursor = None + try: + self._do_autocommit(dbapi_connection, True) + cursor = dbapi_connection.cursor() + try: + cursor.execute(self._dialect_specific_select_one) + finally: + cursor.close() + if not dbapi_connection.closed: + self._do_autocommit(dbapi_connection, False) + except self.dbapi.Error as err: + if self.is_disconnect(err, dbapi_connection, cursor): + return False + else: + raise + else: + return True diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py index 5ef9df800..1fdb46b6f 100644 --- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py +++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py @@ -553,7 +553,6 @@ class AsyncAdapt_asyncpg_ss_cursor(AsyncAdapt_asyncpg_cursor): class AsyncAdapt_asyncpg_connection(AdaptedConnection): __slots__ = ( "dbapi", - "_connection", "isolation_level", "_isolation_setting", "readonly", diff --git a/lib/sqlalchemy/dialects/postgresql/base.py b/lib/sqlalchemy/dialects/postgresql/base.py index 008398865..d1d881dc3 100644 --- a/lib/sqlalchemy/dialects/postgresql/base.py +++ b/lib/sqlalchemy/dialects/postgresql/base.py @@ -1701,7 +1701,7 @@ class UUID(sqltypes.TypeEngine): or as Python uuid objects. The UUID type is currently known to work within the prominent DBAPI - drivers supported by SQLAlchemy including psycopg2, pg8000 and + drivers supported by SQLAlchemy including psycopg, psycopg2, pg8000 and asyncpg. Support for other DBAPI drivers may be incomplete or non-present. """ @@ -1992,6 +1992,12 @@ class ENUM(sqltypes.NativeForEmulated, sqltypes.Enum): self.connection.execute(DropEnumType(enum)) + def get_dbapi_type(self, dbapi): + """dont return dbapi.STRING for ENUM in PostgreSQL, since that's + a different type""" + + return None + def _check_for_name_in_memos(self, checkfirst, kw): """Look in the 'ddl runner' for 'memos', then note our name in that collection. diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg.py b/lib/sqlalchemy/dialects/postgresql/psycopg.py new file mode 100644 index 000000000..c2017c975 --- /dev/null +++ b/lib/sqlalchemy/dialects/postgresql/psycopg.py @@ -0,0 +1,641 @@ +# postgresql/psycopg2.py +# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +r""" +.. dialect:: postgresql+psycopg + :name: psycopg (a.k.a. psycopg 3) + :dbapi: psycopg + :connectstring: postgresql+psycopg://user:password@host:port/dbname[?key=value&key=value...] + :url: https://pypi.org/project/psycopg/ + +``psycopg`` is the package and module name for version 3 of the ``psycopg`` +database driver, formerly known as ``psycopg2``. This driver is different +enough from its ``psycopg2`` predecessor that SQLAlchemy supports it +via a totally separate dialect; support for ``psycopg2`` is expected to remain +for as long as that package continues to function for modern Python versions, +and also remains the default dialect for the ``postgresql://`` dialect +series. + +The SQLAlchemy ``psycopg`` dialect provides both a sync and an async +implementation under the same dialect name. The proper version is +selected depending on how the engine is created: + +* calling :func:`_sa.create_engine` with ``postgresql+psycopg://...`` will + automatically select the sync version, e.g.:: + + from sqlalchemy import create_engine + sync_engine = create_engine("postgresql+psycopg://scott:tiger@localhost/test") + +* calling :func:`_asyncio.create_async_engine` with + ``postgresql+psycopg://...`` will automatically select the async version, + e.g.:: + + from sqlalchemy.ext.asyncio import create_async_engine + asyncio_engine = create_async_engine("postgresql+psycopg://scott:tiger@localhost/test") + +The asyncio version of the dialect may also be specified explicitly using the +``psycopg_async`` suffix, as:: + + from sqlalchemy.ext.asyncio import create_async_engine + asyncio_engine = create_async_engine("postgresql+psycopg_async://scott:tiger@localhost/test") + +The ``psycopg`` dialect has the same API features as that of ``psycopg2``, +with the exeption of the "fast executemany" helpers. The "fast executemany" +helpers are expected to be generalized and ported to ``psycopg`` before the final +release of SQLAlchemy 2.0, however. + + +.. seealso:: + + :ref:`postgresql_psycopg2` - The SQLAlchemy ``psycopg`` + dialect shares most of its behavior with the ``psycopg2`` dialect. + Further documentation is available there. + +""" # noqa +import logging +import re + +from ._psycopg_common import _PGDialect_common_psycopg +from ._psycopg_common import _PGExecutionContext_common_psycopg +from ._psycopg_common import _PsycopgUUID +from .base import INTERVAL +from .base import PGCompiler +from .base import PGIdentifierPreparer +from .base import UUID +from .json import JSON +from .json import JSONB +from .json import JSONPathType +from ... import pool +from ... import types as sqltypes +from ... import util +from ...engine import AdaptedConnection +from ...util.concurrency import await_fallback +from ...util.concurrency import await_only + +logger = logging.getLogger("sqlalchemy.dialects.postgresql") + + +class _PGString(sqltypes.String): + render_bind_cast = True + + +class _PGJSON(JSON): + render_bind_cast = True + + def bind_processor(self, dialect): + return self._make_bind_processor(None, dialect._psycopg_Json) + + def result_processor(self, dialect, coltype): + return None + + +class _PGJSONB(JSONB): + render_bind_cast = True + + def bind_processor(self, dialect): + return self._make_bind_processor(None, dialect._psycopg_Jsonb) + + def result_processor(self, dialect, coltype): + return None + + +class _PGJSONIntIndexType(sqltypes.JSON.JSONIntIndexType): + __visit_name__ = "json_int_index" + + render_bind_cast = True + + +class _PGJSONStrIndexType(sqltypes.JSON.JSONStrIndexType): + __visit_name__ = "json_str_index" + + render_bind_cast = True + + +class _PGJSONPathType(JSONPathType): + pass + + +class _PGUUID(_PsycopgUUID): + render_bind_cast = True + + +class _PGInterval(INTERVAL): + render_bind_cast = True + + +class _PGTimeStamp(sqltypes.DateTime): + render_bind_cast = True + + +class _PGDate(sqltypes.Date): + render_bind_cast = True + + +class _PGTime(sqltypes.Time): + render_bind_cast = True + + +class _PGInteger(sqltypes.Integer): + render_bind_cast = True + + +class _PGSmallInteger(sqltypes.SmallInteger): + render_bind_cast = True + + +class _PGNullType(sqltypes.NullType): + render_bind_cast = True + + +class _PGBigInteger(sqltypes.BigInteger): + render_bind_cast = True + + +class _PGBoolean(sqltypes.Boolean): + render_bind_cast = True + + +class PGExecutionContext_psycopg(_PGExecutionContext_common_psycopg): + pass + + +class PGCompiler_psycopg(PGCompiler): + pass + + +class PGIdentifierPreparer_psycopg(PGIdentifierPreparer): + pass + + +def _log_notices(diagnostic): + logger.info("%s: %s", diagnostic.severity, diagnostic.message_primary) + + +class PGDialect_psycopg(_PGDialect_common_psycopg): + driver = "psycopg" + + supports_statement_cache = True + supports_server_side_cursors = True + default_paramstyle = "pyformat" + supports_sane_multi_rowcount = True + + execution_ctx_cls = PGExecutionContext_psycopg + statement_compiler = PGCompiler_psycopg + preparer = PGIdentifierPreparer_psycopg + psycopg_version = (0, 0) + + _has_native_hstore = True + + colspecs = util.update_copy( + _PGDialect_common_psycopg.colspecs, + { + sqltypes.String: _PGString, + JSON: _PGJSON, + sqltypes.JSON: _PGJSON, + JSONB: _PGJSONB, + sqltypes.JSON.JSONPathType: _PGJSONPathType, + sqltypes.JSON.JSONIntIndexType: _PGJSONIntIndexType, + sqltypes.JSON.JSONStrIndexType: _PGJSONStrIndexType, + UUID: _PGUUID, + sqltypes.Interval: _PGInterval, + INTERVAL: _PGInterval, + sqltypes.Date: _PGDate, + sqltypes.DateTime: _PGTimeStamp, + sqltypes.Time: _PGTime, + sqltypes.Integer: _PGInteger, + sqltypes.SmallInteger: _PGSmallInteger, + sqltypes.BigInteger: _PGBigInteger, + }, + ) + + def __init__(self, **kwargs): + super().__init__(**kwargs) + + if self.dbapi: + m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", self.dbapi.__version__) + if m: + self.psycopg_version = tuple( + int(x) for x in m.group(1, 2, 3) if x is not None + ) + + if self.psycopg_version < (3, 0, 2): + raise ImportError( + "psycopg version 3.0.2 or higher is required." + ) + + from psycopg.adapt import AdaptersMap + + self._psycopg_adapters_map = adapters_map = AdaptersMap( + self.dbapi.adapters + ) + + if self._json_deserializer: + from psycopg.types.json import set_json_loads + + set_json_loads(self._json_deserializer, adapters_map) + + if self._json_serializer: + from psycopg.types.json import set_json_dumps + + set_json_dumps(self._json_serializer, adapters_map) + + def create_connect_args(self, url): + # see https://github.com/psycopg/psycopg/issues/83 + cargs, cparams = super().create_connect_args(url) + + cparams["context"] = self._psycopg_adapters_map + if self.client_encoding is not None: + cparams["client_encoding"] = self.client_encoding + return cargs, cparams + + def _type_info_fetch(self, connection, name): + from psycopg.types import TypeInfo + + return TypeInfo.fetch(connection.connection, name) + + def initialize(self, connection): + super().initialize(connection) + + # PGDialect.initialize() checks server version for <= 8.2 and sets + # this flag to False if so + if not self.full_returning: + self.insert_executemany_returning = False + + # HSTORE can't be registered until we have a connection so that + # we can look up its OID, so we set up this adapter in + # initialize() + if self.use_native_hstore: + info = self._type_info_fetch(connection, "hstore") + self._has_native_hstore = info is not None + if self._has_native_hstore: + from psycopg.types.hstore import register_hstore + + # register the adapter for connections made subsequent to + # this one + register_hstore(info, self._psycopg_adapters_map) + + # register the adapter for this connection + register_hstore(info, connection.connection) + + @classmethod + def dbapi(cls): + import psycopg + + return psycopg + + @classmethod + def get_async_dialect_cls(cls, url): + return PGDialectAsync_psycopg + + @util.memoized_property + def _isolation_lookup(self): + return { + "READ COMMITTED": self.dbapi.IsolationLevel.READ_COMMITTED, + "READ UNCOMMITTED": self.dbapi.IsolationLevel.READ_UNCOMMITTED, + "REPEATABLE READ": self.dbapi.IsolationLevel.REPEATABLE_READ, + "SERIALIZABLE": self.dbapi.IsolationLevel.SERIALIZABLE, + } + + @util.memoized_property + def _psycopg_Json(self): + from psycopg.types import json + + return json.Json + + @util.memoized_property + def _psycopg_Jsonb(self): + from psycopg.types import json + + return json.Jsonb + + @util.memoized_property + def _psycopg_TransactionStatus(self): + from psycopg.pq import TransactionStatus + + return TransactionStatus + + def _do_isolation_level(self, connection, autocommit, isolation_level): + connection.autocommit = autocommit + connection.isolation_level = isolation_level + + def get_isolation_level(self, dbapi_connection): + if hasattr(dbapi_connection, "dbapi_connection"): + dbapi_connection = dbapi_connection.dbapi_connection + + status_before = dbapi_connection.info.transaction_status + value = super().get_isolation_level(dbapi_connection) + + # don't rely on psycopg providing enum symbols, compare with + # eq/ne + if status_before == self._psycopg_TransactionStatus.IDLE: + dbapi_connection.rollback() + return value + + def set_isolation_level(self, connection, level): + connection = getattr(connection, "dbapi_connection", connection) + if level == "AUTOCOMMIT": + self._do_isolation_level( + connection, autocommit=True, isolation_level=None + ) + else: + self._do_isolation_level( + connection, + autocommit=False, + isolation_level=self._isolation_lookup[level], + ) + + def set_readonly(self, connection, value): + connection.read_only = value + + def get_readonly(self, connection): + return connection.read_only + + def on_connect(self): + def notices(conn): + conn.add_notice_handler(_log_notices) + + fns = [notices] + + if self.isolation_level is not None: + + def on_connect(conn): + self.set_isolation_level(conn, self.isolation_level) + + fns.append(on_connect) + + # fns always has the notices function + def on_connect(conn): + for fn in fns: + fn(conn) + + return on_connect + + def is_disconnect(self, e, connection, cursor): + if isinstance(e, self.dbapi.Error) and connection is not None: + if connection.closed or connection.broken: + return True + return False + + def _do_prepared_twophase(self, connection, command, recover=False): + dbapi_conn = connection.connection.dbapi_connection + if ( + recover + # don't rely on psycopg providing enum symbols, compare with + # eq/ne + or dbapi_conn.info.transaction_status + != self._psycopg_TransactionStatus.IDLE + ): + dbapi_conn.rollback() + before = dbapi_conn.autocommit + try: + self._do_autocommit(dbapi_conn, True) + dbapi_conn.execute(command) + finally: + self._do_autocommit(dbapi_conn, before) + + def do_rollback_twophase( + self, connection, xid, is_prepared=True, recover=False + ): + if is_prepared: + self._do_prepared_twophase( + connection, f"ROLLBACK PREPARED '{xid}'", recover=recover + ) + else: + self.do_rollback(connection.connection) + + def do_commit_twophase( + self, connection, xid, is_prepared=True, recover=False + ): + if is_prepared: + self._do_prepared_twophase( + connection, f"COMMIT PREPARED '{xid}'", recover=recover + ) + else: + self.do_commit(connection.connection) + + +class AsyncAdapt_psycopg_cursor: + __slots__ = ("_cursor", "await_", "_rows") + + _psycopg_ExecStatus = None + + def __init__(self, cursor, await_) -> None: + self._cursor = cursor + self.await_ = await_ + self._rows = [] + + def __getattr__(self, name): + return getattr(self._cursor, name) + + @property + def arraysize(self): + return self._cursor.arraysize + + @arraysize.setter + def arraysize(self, value): + self._cursor.arraysize = value + + def close(self): + self._rows.clear() + # Normal cursor just call _close() in a non-sync way. + self._cursor._close() + + def execute(self, query, params=None, **kw): + result = self.await_(self._cursor.execute(query, params, **kw)) + # sqlalchemy result is not async, so need to pull all rows here + res = self._cursor.pgresult + + # don't rely on psycopg providing enum symbols, compare with + # eq/ne + if res and res.status == self._psycopg_ExecStatus.TUPLES_OK: + rows = self.await_(self._cursor.fetchall()) + if not isinstance(rows, list): + self._rows = list(rows) + else: + self._rows = rows + return result + + def executemany(self, query, params_seq): + return self.await_(self._cursor.executemany(query, params_seq)) + + def __iter__(self): + # TODO: try to avoid pop(0) on a list + while self._rows: + yield self._rows.pop(0) + + def fetchone(self): + if self._rows: + # TODO: try to avoid pop(0) on a list + return self._rows.pop(0) + else: + return None + + def fetchmany(self, size=None): + if size is None: + size = self._cursor.arraysize + + retval = self._rows[0:size] + self._rows = self._rows[size:] + return retval + + def fetchall(self): + retval = self._rows + self._rows = [] + return retval + + +class AsyncAdapt_psycopg_ss_cursor(AsyncAdapt_psycopg_cursor): + def execute(self, query, params=None, **kw): + self.await_(self._cursor.execute(query, params, **kw)) + return self + + def close(self): + self.await_(self._cursor.close()) + + def fetchone(self): + return self.await_(self._cursor.fetchone()) + + def fetchmany(self, size=0): + return self.await_(self._cursor.fetchmany(size)) + + def fetchall(self): + return self.await_(self._cursor.fetchall()) + + def __iter__(self): + iterator = self._cursor.__aiter__() + while True: + try: + yield self.await_(iterator.__anext__()) + except StopAsyncIteration: + break + + +class AsyncAdapt_psycopg_connection(AdaptedConnection): + __slots__ = () + await_ = staticmethod(await_only) + + def __init__(self, connection) -> None: + self._connection = connection + + def __getattr__(self, name): + return getattr(self._connection, name) + + def execute(self, query, params=None, **kw): + cursor = self.await_(self._connection.execute(query, params, **kw)) + return AsyncAdapt_psycopg_cursor(cursor, self.await_) + + def cursor(self, *args, **kw): + cursor = self._connection.cursor(*args, **kw) + if hasattr(cursor, "name"): + return AsyncAdapt_psycopg_ss_cursor(cursor, self.await_) + else: + return AsyncAdapt_psycopg_cursor(cursor, self.await_) + + def commit(self): + self.await_(self._connection.commit()) + + def rollback(self): + self.await_(self._connection.rollback()) + + def close(self): + self.await_(self._connection.close()) + + @property + def autocommit(self): + return self._connection.autocommit + + @autocommit.setter + def autocommit(self, value): + self.set_autocommit(value) + + def set_autocommit(self, value): + self.await_(self._connection.set_autocommit(value)) + + def set_isolation_level(self, value): + self.await_(self._connection.set_isolation_level(value)) + + def set_read_only(self, value): + self.await_(self._connection.set_read_only(value)) + + def set_deferrable(self, value): + self.await_(self._connection.set_deferrable(value)) + + +class AsyncAdaptFallback_psycopg_connection(AsyncAdapt_psycopg_connection): + __slots__ = () + await_ = staticmethod(await_fallback) + + +class PsycopgAdaptDBAPI: + def __init__(self, psycopg) -> None: + self.psycopg = psycopg + + for k, v in self.psycopg.__dict__.items(): + if k != "connect": + self.__dict__[k] = v + + def connect(self, *arg, **kw): + async_fallback = kw.pop("async_fallback", False) + if util.asbool(async_fallback): + return AsyncAdaptFallback_psycopg_connection( + await_fallback( + self.psycopg.AsyncConnection.connect(*arg, **kw) + ) + ) + else: + return AsyncAdapt_psycopg_connection( + await_only(self.psycopg.AsyncConnection.connect(*arg, **kw)) + ) + + +class PGDialectAsync_psycopg(PGDialect_psycopg): + is_async = True + supports_statement_cache = True + + @classmethod + def dbapi(cls): + import psycopg + from psycopg.pq import ExecStatus + + AsyncAdapt_psycopg_cursor._psycopg_ExecStatus = ExecStatus + + return PsycopgAdaptDBAPI(psycopg) + + @classmethod + def get_pool_class(cls, url): + + async_fallback = url.query.get("async_fallback", False) + + if util.asbool(async_fallback): + return pool.FallbackAsyncAdaptedQueuePool + else: + return pool.AsyncAdaptedQueuePool + + def _type_info_fetch(self, connection, name): + from psycopg.types import TypeInfo + + adapted = connection.connection + return adapted.await_(TypeInfo.fetch(adapted._connection, name)) + + def _do_isolation_level(self, connection, autocommit, isolation_level): + connection.set_autocommit(autocommit) + connection.set_isolation_level(isolation_level) + + def _do_autocommit(self, connection, value): + connection.set_autocommit(value) + + def set_readonly(self, connection, value): + connection.set_read_only(value) + + def set_deferrable(self, connection, value): + connection.set_deferrable(value) + + def get_driver_connection(self, connection): + return connection._connection + + +dialect = PGDialect_psycopg +dialect_async = PGDialectAsync_psycopg diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2.py b/lib/sqlalchemy/dialects/postgresql/psycopg2.py index 39b6e0ed1..3d9f90a29 100644 --- a/lib/sqlalchemy/dialects/postgresql/psycopg2.py +++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py @@ -11,6 +11,8 @@ r""" :connectstring: postgresql+psycopg2://user:password@host:port/dbname[?key=value&key=value...] :url: https://pypi.org/project/psycopg2/ +.. _psycopg2_toplevel: + psycopg2 Connect Arguments -------------------------- @@ -442,25 +444,15 @@ which may be more performant. """ # noqa import collections.abc as collections_abc -import decimal import logging import re -from uuid import UUID as _python_UUID -from .array import ARRAY as PGARRAY -from .base import _DECIMAL_TYPES -from .base import _FLOAT_TYPES -from .base import _INT_TYPES +from ._psycopg_common import _PGDialect_common_psycopg +from ._psycopg_common import _PGExecutionContext_common_psycopg from .base import PGCompiler -from .base import PGDialect -from .base import PGExecutionContext from .base import PGIdentifierPreparer -from .base import UUID -from .hstore import HSTORE from .json import JSON from .json import JSONB -from ... import exc -from ... import processors from ... import types as sqltypes from ... import util from ...engine import cursor as _cursor @@ -469,53 +461,6 @@ from ...engine import cursor as _cursor logger = logging.getLogger("sqlalchemy.dialects.postgresql") -class _PGNumeric(sqltypes.Numeric): - def bind_processor(self, dialect): - return None - - def result_processor(self, dialect, coltype): - if self.asdecimal: - if coltype in _FLOAT_TYPES: - return processors.to_decimal_processor_factory( - decimal.Decimal, self._effective_decimal_return_scale - ) - elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES: - # pg8000 returns Decimal natively for 1700 - return None - else: - raise exc.InvalidRequestError( - "Unknown PG numeric type: %d" % coltype - ) - else: - if coltype in _FLOAT_TYPES: - # pg8000 returns float natively for 701 - return None - elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES: - return processors.to_float - else: - raise exc.InvalidRequestError( - "Unknown PG numeric type: %d" % coltype - ) - - -class _PGHStore(HSTORE): - def bind_processor(self, dialect): - if dialect._has_native_hstore: - return None - else: - return super(_PGHStore, self).bind_processor(dialect) - - def result_processor(self, dialect, coltype): - if dialect._has_native_hstore: - return None - else: - return super(_PGHStore, self).result_processor(dialect, coltype) - - -class _PGARRAY(PGARRAY): - render_bind_cast = True - - class _PGJSON(JSON): def result_processor(self, dialect, coltype): return None @@ -526,40 +471,9 @@ class _PGJSONB(JSONB): return None -class _PGUUID(UUID): - def bind_processor(self, dialect): - if not self.as_uuid and dialect.use_native_uuid: - - def process(value): - if value is not None: - value = _python_UUID(value) - return value - - return process - - def result_processor(self, dialect, coltype): - if not self.as_uuid and dialect.use_native_uuid: - - def process(value): - if value is not None: - value = str(value) - return value - - return process - - -_server_side_id = util.counter() - - -class PGExecutionContext_psycopg2(PGExecutionContext): +class PGExecutionContext_psycopg2(_PGExecutionContext_common_psycopg): _psycopg2_fetched_rows = None - def create_server_side_cursor(self): - # use server-side cursors: - # https://lists.initd.org/pipermail/psycopg/2007-January/005251.html - ident = "c_%s_%s" % (hex(id(self))[2:], hex(_server_side_id())[2:]) - return self._dbapi_connection.cursor(ident) - def post_exec(self): if ( self._psycopg2_fetched_rows @@ -614,7 +528,7 @@ EXECUTEMANY_VALUES_PLUS_BATCH = util.symbol( ) -class PGDialect_psycopg2(PGDialect): +class PGDialect_psycopg2(_PGDialect_common_psycopg): driver = "psycopg2" supports_statement_cache = True @@ -631,34 +545,22 @@ class PGDialect_psycopg2(PGDialect): _has_native_hstore = True colspecs = util.update_copy( - PGDialect.colspecs, + _PGDialect_common_psycopg.colspecs, { - sqltypes.Numeric: _PGNumeric, - HSTORE: _PGHStore, JSON: _PGJSON, sqltypes.JSON: _PGJSON, JSONB: _PGJSONB, - UUID: _PGUUID, - sqltypes.ARRAY: _PGARRAY, }, ) def __init__( self, - client_encoding=None, - use_native_hstore=True, - use_native_uuid=True, executemany_mode="values_only", executemany_batch_page_size=100, executemany_values_page_size=1000, **kwargs ): - PGDialect.__init__(self, **kwargs) - if not use_native_hstore: - self._has_native_hstore = False - self.use_native_hstore = use_native_hstore - self.use_native_uuid = use_native_uuid - self.client_encoding = client_encoding + _PGDialect_common_psycopg.__init__(self, **kwargs) # Parse executemany_mode argument, allowing it to be only one of the # symbol names @@ -737,9 +639,6 @@ class PGDialect_psycopg2(PGDialect): "SERIALIZABLE": extensions.ISOLATION_LEVEL_SERIALIZABLE, } - def get_isolation_level_values(self, dbapi_conn): - return list(self._isolation_lookup) - def set_isolation_level(self, connection, level): connection.set_isolation_level(self._isolation_lookup[level]) @@ -755,25 +654,6 @@ class PGDialect_psycopg2(PGDialect): def get_deferrable(self, connection): return connection.deferrable - def do_ping(self, dbapi_connection): - cursor = None - try: - dbapi_connection.autocommit = True - cursor = dbapi_connection.cursor() - try: - cursor.execute(self._dialect_specific_select_one) - finally: - cursor.close() - if not dbapi_connection.closed: - dbapi_connection.autocommit = False - except self.dbapi.Error as err: - if self.is_disconnect(err, dbapi_connection, cursor): - return False - else: - raise - else: - return True - def on_connect(self): extras = self._psycopg2_extras @@ -911,33 +791,6 @@ class PGDialect_psycopg2(PGDialect): else: return None - def create_connect_args(self, url): - opts = url.translate_connect_args(username="user") - - is_multihost = False - if "host" in url.query: - is_multihost = isinstance(url.query["host"], (list, tuple)) - - if opts: - if "port" in opts: - opts["port"] = int(opts["port"]) - opts.update(url.query) - if is_multihost: - opts["host"] = ",".join(url.query["host"]) - # send individual dbname, user, password, host, port - # parameters to psycopg2.connect() - return ([], opts) - elif url.query: - # any other connection arguments, pass directly - opts.update(url.query) - if is_multihost: - opts["host"] = ",".join(url.query["host"]) - return ([], opts) - else: - # no connection arguments whatsoever; psycopg2.connect() - # requires that "dsn" be present as a blank string. - return ([""], opts) - def is_disconnect(self, e, connection, cursor): if isinstance(e, self.dbapi.Error): # check the "closed" flag. this might not be |