# postgresql/pg8000.py
# Copyright (C) 2005-2018 the SQLAlchemy authors and contributors
#
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
"""
.. dialect:: postgresql+pg8000
    :name: pg8000
    :dbapi: pg8000
    :connectstring: \
postgresql+pg8000://user:password@host:port/dbname[?key=value&key=value...]
    :url: https://pythonhosted.org/pg8000/


.. _pg8000_unicode:

Unicode
-------

pg8000 will encode / decode string values between it and the server using the
PostgreSQL ``client_encoding`` parameter; by default this is the value in
the ``postgresql.conf`` file, which often defaults to ``SQL_ASCII``.
Typically, this can be changed to ``utf-8``, as a more useful default::

    #client_encoding = sql_ascii # actually, defaults to database
                                 # encoding
    client_encoding = utf8

The ``client_encoding`` can be overridden for a session by executing the SQL:

SET CLIENT_ENCODING TO 'utf8';

SQLAlchemy will execute this SQL on all new connections based on the value
passed to :func:`.create_engine` using the ``client_encoding`` parameter::

    engine = create_engine(
        "postgresql+pg8000://user:pass@host/dbname", client_encoding='utf8')


.. _pg8000_isolation_level:

pg8000 Transaction Isolation Level
-------------------------------------

The pg8000 dialect offers the same isolation level settings as that
of the :ref:`psycopg2 <psycopg2_isolation_level>` dialect:

* ``READ COMMITTED``
* ``READ UNCOMMITTED``
* ``REPEATABLE READ``
* ``SERIALIZABLE``
* ``AUTOCOMMIT``

.. versionadded:: 0.9.5 support for AUTOCOMMIT isolation level when using
   pg8000.

.. seealso::

    :ref:`postgresql_isolation_level`

    :ref:`psycopg2_isolation_level`


"""
from ... import util, exc
import decimal
from ... import processors
from ... import types as sqltypes
from .base import (
    PGDialect,
    PGCompiler,
    PGIdentifierPreparer,
    PGExecutionContext,
    _DECIMAL_TYPES,
    _FLOAT_TYPES,
    _INT_TYPES,
    UUID,
)
import re
from sqlalchemy.dialects.postgresql.json import JSON
from ...sql.elements import quoted_name

try:
    from uuid import UUID as _python_UUID
except ImportError:
    _python_UUID = None


class _PGNumeric(sqltypes.Numeric):
    """Numeric type whose result conversion depends on the column type code."""

    def result_processor(self, dialect, coltype):
        """Return the converter for result values, or None if not needed.

        :param dialect: the dialect in use (not consulted here).
        :param coltype: type code of the result column; it is checked
         against ``_FLOAT_TYPES``, ``_DECIMAL_TYPES`` and ``_INT_TYPES``.
        :raises InvalidRequestError: if ``coltype`` is in none of those
         groups.
        """
        if self.asdecimal:
            if coltype in _FLOAT_TYPES:
                return processors.to_decimal_processor_factory(
                    decimal.Decimal, self._effective_decimal_return_scale
                )
            elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
                # pg8000 returns Decimal natively for 1700
                return None
            else:
                raise exc.InvalidRequestError(
                    "Unknown PG numeric type: %d" % coltype
                )
        else:
            if coltype in _FLOAT_TYPES:
                # pg8000 returns float natively for 701
                return None
            elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
                return processors.to_float
            else:
                raise exc.InvalidRequestError(
                    "Unknown PG numeric type: %d" % coltype
                )


class _PGNumericNoBind(_PGNumeric):
    """Variant of :class:`_PGNumeric` that applies no bind processing."""

    def bind_processor(self, dialect):
        """Return None so that bound values are passed through unchanged."""
        return None


class _PGJSON(JSON):
    """JSON type that skips result processing on newer pg8000 versions."""

    def result_processor(self, dialect, coltype):
        """Return None for pg8000 newer than 1.10.1, else the base processor.

        The version is read from ``dialect._dbapi_version``.
        """
        if dialect._dbapi_version > (1, 10, 1):
            return None  # Has native JSON
        else:
            return super(_PGJSON, self).result_processor(dialect, coltype)


class _PGUUID(UUID):
    """UUID type that converts to / from strings when ``as_uuid`` is false."""

    def bind_processor(self, dialect):
        """Return a converter from bound values to Python UUID objects.

        Only applies when ``as_uuid`` is false; otherwise None is returned
        implicitly and no conversion takes place.
        """
        if not self.as_uuid:

            def process(value):
                if value is not None:
                    value = _python_UUID(value)
                return value

            return process

    def result_processor(self, dialect, coltype):
        """Return a converter from result values to ``str``.

        Only applies when ``as_uuid`` is false; otherwise None is returned
        implicitly and no conversion takes place.
        """
        if not self.as_uuid:

            def process(value):
                if value is not None:
                    value = str(value)
                return value

            return process


class PGExecutionContext_pg8000(PGExecutionContext):
    """Execution context for pg8000; adds nothing to the base class."""

    pass


class PGCompiler_pg8000(PGCompiler):
    """Statement compiler that doubles ``%`` signs in the rendered SQL."""

    def visit_mod_binary(self, binary, operator, **kw):
        """Render a modulus expression using ``%%`` as the operator."""
        return (
            self.process(binary.left, **kw)
            + " %% "
            + self.process(binary.right, **kw)
        )

    def post_process_text(self, text):
        """Double every ``%`` in ``text``.

        A warning is emitted when ``text`` already contains ``%%``, since
        those signs are doubled again.
        """
        if "%%" in text:
            util.warn(
                "The SQLAlchemy postgresql dialect "
                "now automatically escapes '%' in text() "
                "expressions to '%%'."
            )
        return text.replace("%", "%%")


class PGIdentifierPreparer_pg8000(PGIdentifierPreparer):
    """Identifier preparer that also doubles ``%`` signs in identifiers."""

    def _escape_identifier(self, value):
        """Escape the quote character in ``value``, then double each ``%``."""
        value = value.replace(self.escape_quote, self.escape_to_quote)
        return value.replace("%", "%%")


class PGDialect_pg8000(PGDialect):
    """PostgreSQL dialect for the pg8000 DBAPI."""

    driver = "pg8000"

    supports_unicode_statements = True

    supports_unicode_binds = True

    default_paramstyle = "format"
    supports_sane_multi_rowcount = True
    execution_ctx_cls = PGExecutionContext_pg8000
    statement_compiler = PGCompiler_pg8000
    preparer = PGIdentifierPreparer_pg8000
    description_encoding = "use_encoding"

    colspecs = util.update_copy(
        PGDialect.colspecs,
        {
            sqltypes.Numeric: _PGNumericNoBind,
            sqltypes.Float: _PGNumeric,
            JSON: _PGJSON,
            sqltypes.JSON: _PGJSON,
            UUID: _PGUUID,
        },
    )

    def __init__(self, client_encoding=None, **kwargs):
        """Construct the dialect.

        :param client_encoding: optional encoding name; when not None it is
         applied to each new connection by :meth:`on_connect`.
        :param kwargs: passed through to ``PGDialect.__init__``.
        """
        PGDialect.__init__(self, **kwargs)
        self.client_encoding = client_encoding

    def initialize(self, connection):
        """Set version-dependent flags, then run the base initialization."""
        self.supports_sane_multi_rowcount = self._dbapi_version >= (1, 9, 14)
        super(PGDialect_pg8000, self).initialize(connection)

    @util.memoized_property
    def _dbapi_version(self):
        """The DBAPI version as a tuple of ints.

        Built from each run of digits found in ``dbapi.__version__``.
        ``(99, 99, 99)`` is returned when there is no DBAPI or it has no
        ``__version__`` attribute.
        """
        if self.dbapi and hasattr(self.dbapi, "__version__"):
            return tuple(
                [
                    int(x)
                    for x in re.findall(
                        r"(\d+)(?:[-\.]?|$)", self.dbapi.__version__
                    )
                ]
            )
        else:
            return (99, 99, 99)

    @classmethod
    def dbapi(cls):
        """Import and return the ``pg8000`` module."""
        return __import__("pg8000")

    def create_connect_args(self, url):
        """Build the ``(args, kwargs)`` pair used to connect.

        The keyword arguments come from
        ``url.translate_connect_args(username="user")``, with ``port``
        converted to ``int`` when present, and are then updated with
        ``url.query``.  The positional argument list is always empty.
        """
        opts = url.translate_connect_args(username="user")
        if "port" in opts:
            opts["port"] = int(opts["port"])
        opts.update(url.query)
        return ([], opts)

    def is_disconnect(self, e, connection, cursor):
        """Return True if the text of ``e`` reports a closed connection."""
        return "connection is closed" in str(e)

    def set_isolation_level(self, connection, level):
        """Apply an isolation level, or AUTOCOMMIT, to a connection.

        :param connection: a DBAPI connection, or a wrapper that exposes
         one as its ``connection`` attribute.
        :param level: isolation level name; underscores are treated as
         spaces.
        :raises ArgumentError: if ``level`` is neither ``AUTOCOMMIT`` nor a
         member of ``self._isolation_lookup``.
        """
        level = level.replace("_", " ")

        # adjust for ConnectionFairy possibly being present
        if hasattr(connection, "connection"):
            connection = connection.connection

        if level == "AUTOCOMMIT":
            connection.autocommit = True
        elif level in self._isolation_lookup:
            connection.autocommit = False
            cursor = connection.cursor()
            try:
                cursor.execute(
                    "SET SESSION CHARACTERISTICS AS TRANSACTION "
                    "ISOLATION LEVEL %s" % level
                )
                cursor.execute("COMMIT")
            finally:
                # close the cursor even when one of the statements fails
                cursor.close()
        else:
            raise exc.ArgumentError(
                "Invalid value '%s' for isolation_level. "
                "Valid isolation levels for %s are %s or AUTOCOMMIT"
                % (level, self.name, ", ".join(self._isolation_lookup))
            )

    def set_client_encoding(self, connection, client_encoding):
        """Execute ``SET CLIENT_ENCODING`` on a connection and commit.

        :param connection: a DBAPI connection, or a wrapper that exposes
         one as its ``connection`` attribute.
        :param client_encoding: encoding name.  It is placed into the SQL
         text as-is, so it has to come from trusted configuration.
        """
        # adjust for ConnectionFairy possibly being present
        if hasattr(connection, "connection"):
            connection = connection.connection

        cursor = connection.cursor()
        try:
            cursor.execute("SET CLIENT_ENCODING TO '" + client_encoding + "'")
            cursor.execute("COMMIT")
        finally:
            # close the cursor even when one of the statements fails
            cursor.close()

    def do_begin_twophase(self, connection, xid):
        """Call ``tpc_begin`` on the DBAPI connection with ``(0, xid, "")``."""
        connection.connection.tpc_begin((0, xid, ""))

    def do_prepare_twophase(self, connection, xid):
        """Call ``tpc_prepare`` on the DBAPI connection; ``xid`` is unused."""
        connection.connection.tpc_prepare()

    def do_rollback_twophase(
        self, connection, xid, is_prepared=True, recover=False
    ):
        """Call ``tpc_rollback`` on the DBAPI connection with ``(0, xid, "")``.

        ``is_prepared`` and ``recover`` are accepted but not used.
        """
        connection.connection.tpc_rollback((0, xid, ""))

    def do_commit_twophase(
        self, connection, xid, is_prepared=True, recover=False
    ):
        """Call ``tpc_commit`` on the DBAPI connection with ``(0, xid, "")``.

        ``is_prepared`` and ``recover`` are accepted but not used.
        """
        connection.connection.tpc_commit((0, xid, ""))

    def do_recover_twophase(self, connection):
        """Return element 1 of every row reported by ``tpc_recover``."""
        return [row[1] for row in connection.connection.tpc_recover()]

    def on_connect(self):
        """Return a callable that sets up each new DBAPI connection.

        The callable always registers ``quoted_name`` in the connection's
        ``py_types`` using the entry for ``util.text_type``.  It then
        applies ``client_encoding`` and ``isolation_level``, in that order,
        for whichever of the two is not None.
        """
        fns = []

        def register_quoted_name(conn):
            conn.py_types[quoted_name] = conn.py_types[util.text_type]

        fns.append(register_quoted_name)

        if self.client_encoding is not None:

            def apply_client_encoding(conn):
                self.set_client_encoding(conn, self.client_encoding)

            fns.append(apply_client_encoding)

        if self.isolation_level is not None:

            def apply_isolation_level(conn):
                self.set_isolation_level(conn, self.isolation_level)

            fns.append(apply_isolation_level)

        # fns always holds at least register_quoted_name, so there is
        # always a hook to return
        def on_connect(conn):
            for fn in fns:
                fn(conn)

        return on_connect


dialect = PGDialect_pg8000