diff options
author | Federico Caselli <cfederico87@gmail.com> | 2021-09-14 23:38:00 +0200 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2021-11-26 10:14:44 -0500 |
commit | 5eb407f84bdabdbcd68975dbf76dc4c0809d7373 (patch) | |
tree | 0d37ab4b9c28d8a0fa6cefdcc1933d52ffd9a599 /lib/sqlalchemy/dialects/postgresql/_psycopg_common.py | |
parent | 8ddb3ef165d0c2d6d7167bb861bb349e68b5e8df (diff) | |
download | sqlalchemy-5eb407f84bdabdbcd68975dbf76dc4c0809d7373.tar.gz |
Added support for ``psycopg`` dialect.
Both sync and async versions are supported.
Fixes: #6842
Change-Id: I57751c5028acebfc6f9c43572562405453a2f2a4
Diffstat (limited to 'lib/sqlalchemy/dialects/postgresql/_psycopg_common.py')
-rw-r--r-- | lib/sqlalchemy/dialects/postgresql/_psycopg_common.py | 188 |
1 files changed, 188 insertions, 0 deletions
diff --git a/lib/sqlalchemy/dialects/postgresql/_psycopg_common.py b/lib/sqlalchemy/dialects/postgresql/_psycopg_common.py new file mode 100644 index 000000000..d82d5f009 --- /dev/null +++ b/lib/sqlalchemy/dialects/postgresql/_psycopg_common.py @@ -0,0 +1,188 @@ +import decimal + +from .array import ARRAY as PGARRAY +from .base import _DECIMAL_TYPES +from .base import _FLOAT_TYPES +from .base import _INT_TYPES +from .base import PGDialect +from .base import PGExecutionContext +from .base import UUID +from .hstore import HSTORE +from ... import exc +from ... import processors +from ... import types as sqltypes +from ... import util + +_server_side_id = util.counter() + + +class _PsycopgNumeric(sqltypes.Numeric): + def bind_processor(self, dialect): + return None + + def result_processor(self, dialect, coltype): + if self.asdecimal: + if coltype in _FLOAT_TYPES: + return processors.to_decimal_processor_factory( + decimal.Decimal, self._effective_decimal_return_scale + ) + elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES: + # psycopg returns Decimal natively for 1700 + return None + else: + raise exc.InvalidRequestError( + "Unknown PG numeric type: %d" % coltype + ) + else: + if coltype in _FLOAT_TYPES: + # psycopg returns float natively for 701 + return None + elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES: + return processors.to_float + else: + raise exc.InvalidRequestError( + "Unknown PG numeric type: %d" % coltype + ) + + +class _PsycopgHStore(HSTORE): + def bind_processor(self, dialect): + if dialect._has_native_hstore: + return None + else: + return super(_PsycopgHStore, self).bind_processor(dialect) + + def result_processor(self, dialect, coltype): + if dialect._has_native_hstore: + return None + else: + return super(_PsycopgHStore, self).result_processor( + dialect, coltype + ) + + +class _PsycopgUUID(UUID): + def bind_processor(self, dialect): + return None + + def result_processor(self, dialect, coltype): + if not self.as_uuid and dialect.use_native_uuid: + + def process(value): + if value is not None: + value = str(value) + return value + + return process + + +class _PsycopgARRAY(PGARRAY): + render_bind_cast = True + + +class _PGExecutionContext_common_psycopg(PGExecutionContext): + def create_server_side_cursor(self): + # use server-side cursors: + # psycopg + # https://www.psycopg.org/psycopg3/docs/advanced/cursors.html#server-side-cursors + # psycopg2 + # https://www.psycopg.org/docs/usage.html#server-side-cursors + ident = "c_%s_%s" % (hex(id(self))[2:], hex(_server_side_id())[2:]) + return self._dbapi_connection.cursor(ident) + + +class _PGDialect_common_psycopg(PGDialect): + supports_statement_cache = True + supports_server_side_cursors = True + + default_paramstyle = "pyformat" + + _has_native_hstore = True + + colspecs = util.update_copy( + PGDialect.colspecs, + { + sqltypes.Numeric: _PsycopgNumeric, + HSTORE: _PsycopgHStore, + UUID: _PsycopgUUID, + sqltypes.ARRAY: _PsycopgARRAY, + }, + ) + + def __init__( + self, + client_encoding=None, + use_native_hstore=True, + use_native_uuid=True, + **kwargs + ): + PGDialect.__init__(self, **kwargs) + if not use_native_hstore: + self._has_native_hstore = False + self.use_native_hstore = use_native_hstore + self.use_native_uuid = use_native_uuid + self.client_encoding = client_encoding + + def create_connect_args(self, url): + opts = url.translate_connect_args(username="user", database="dbname") + + is_multihost = False + if "host" in url.query: + is_multihost = isinstance(url.query["host"], (list, tuple)) + + if opts: + if "port" in opts: + opts["port"] = int(opts["port"]) + opts.update(url.query) + if is_multihost: + opts["host"] = ",".join(url.query["host"]) + # send individual dbname, user, password, host, port + # parameters to psycopg2.connect() + return ([], opts) + elif url.query: + # any other connection arguments, pass directly + opts.update(url.query) + if is_multihost: + opts["host"] = ",".join(url.query["host"]) + return ([], opts) + else: + # no connection arguments whatsoever; psycopg2.connect() + # requires that "dsn" be present as a blank string. + return ([""], opts) + + def get_isolation_level_values(self, dbapi_conn): + return ( + "AUTOCOMMIT", + "READ COMMITTED", + "READ UNCOMMITTED", + "REPEATABLE READ", + "SERIALIZABLE", + ) + + def set_deferrable(self, connection, value): + connection.deferrable = value + + def get_deferrable(self, connection): + return connection.deferrable + + def _do_autocommit(self, connection, value): + connection.autocommit = value + + def do_ping(self, dbapi_connection): + cursor = None + try: + self._do_autocommit(dbapi_connection, True) + cursor = dbapi_connection.cursor() + try: + cursor.execute(self._dialect_specific_select_one) + finally: + cursor.close() + if not dbapi_connection.closed: + self._do_autocommit(dbapi_connection, False) + except self.dbapi.Error as err: + if self.is_disconnect(err, dbapi_connection, cursor): + return False + else: + raise + else: + return True |