diff options
Diffstat (limited to 'oslo_db/sqlalchemy/engines.py')
-rw-r--r-- | oslo_db/sqlalchemy/engines.py | 65 |
1 files changed, 46 insertions, 19 deletions
diff --git a/oslo_db/sqlalchemy/engines.py b/oslo_db/sqlalchemy/engines.py index 4634ce5..146d189 100644 --- a/oslo_db/sqlalchemy/engines.py +++ b/oslo_db/sqlalchemy/engines.py @@ -16,6 +16,7 @@ """Core SQLAlchemy connectivity routines. """ +import functools import itertools import logging import os @@ -29,10 +30,11 @@ import sqlalchemy from sqlalchemy import event from sqlalchemy import exc from sqlalchemy import pool -from sqlalchemy.sql.expression import select +from sqlalchemy import select from oslo_db import exception +from oslo_db.sqlalchemy import compat from oslo_db.sqlalchemy import exc_filters from oslo_db.sqlalchemy import ndb from oslo_db.sqlalchemy import utils @@ -57,6 +59,7 @@ def _connect_ping_listener(connection, branch): Ping the server at transaction begin and transparently reconnect if a disconnect exception occurs. + """ if branch: return @@ -94,6 +97,14 @@ def _connect_ping_listener(connection, branch): connection.rollback() +# SQLAlchemy 2.0 is compatible here, however oslo.db's test suite +# raises for all deprecation errors, so we have to check for 2.0 +# and wrap out a parameter that is deprecated +if compat.sqla_2: + _connect_ping_listener = functools.partial( + _connect_ping_listener, branch=False) + + def _setup_logging(connection_debug=0): """setup_logging function maps SQL debug level to Python log level. @@ -189,7 +200,7 @@ def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None, _vet_url(url) engine_args = { - "pool_recycle": connection_recycle_time, + 'pool_recycle': connection_recycle_time, 'connect_args': {}, 'logging_name': logging_name } @@ -198,11 +209,13 @@ def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None, _init_connection_args( url, engine_args, - max_pool_size=max_pool_size, - max_overflow=max_overflow, - pool_timeout=pool_timeout, - json_serializer=json_serializer, - json_deserializer=json_deserializer, + dict( + max_pool_size=max_pool_size, + max_overflow=max_overflow, + pool_timeout=pool_timeout, + json_serializer=json_serializer, + json_deserializer=json_deserializer, + ) ) engine = sqlalchemy.create_engine(url, **engine_args) @@ -224,6 +237,7 @@ def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None, exc_filters.register_engine(engine) # register engine connect handler + event.listen(engine, "engine_connect", _connect_ping_listener) # initial connect + test @@ -237,9 +251,16 @@ def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None, @utils.dispatch_for_dialect('*', multiple=True) -def _init_connection_args( - url, engine_args, - max_pool_size=None, max_overflow=None, pool_timeout=None, **kw): +def _init_connection_args(url, engine_args, kw): + + # (zzzeek) kw is passed by reference rather than as **kw so that the + # init_connection_args routines can modify the contents of what + # will be passed to create_engine, including removing arguments that + # don't apply. This allows things such as replacing QueuePool with + # NUllPool, for example, as the latter pool would reject these parameters. + max_pool_size = kw.get("max_pool_size", None) + max_overflow = kw.get("max_overflow", None) + pool_timeout = kw.get("pool_timeout", None) pool_class = url.get_dialect().get_pool_class(url) if issubclass(pool_class, pool.QueuePool): @@ -252,17 +273,25 @@ def _init_connection_args( @_init_connection_args.dispatch_for("sqlite") -def _init_connection_args(url, engine_args, **kw): +def _init_connection_args(url, engine_args, kw): pool_class = url.get_dialect().get_pool_class(url) - # singletonthreadpool is used for :memory: connections; - # replace it with StaticPool. if issubclass(pool_class, pool.SingletonThreadPool): + # singletonthreadpool is used for :memory: connections; + # replace it with StaticPool. engine_args["poolclass"] = pool.StaticPool engine_args['connect_args']['check_same_thread'] = False + elif issubclass(pool_class, pool.QueuePool): + # SQLAlchemy 2.0 uses QueuePool for sqlite file DBs; put NullPool + # back to avoid compatibility issues + kw.pop("max_pool_size", None) + kw.pop("max_overflow", None) + engine_args.pop("max_pool_size", None) + engine_args.pop("max_overflow", None) + engine_args["poolclass"] = pool.NullPool @_init_connection_args.dispatch_for("postgresql") -def _init_connection_args(url, engine_args, **kw): +def _init_connection_args(url, engine_args, kw): if 'client_encoding' not in url.query: # Set encoding using engine_args instead of connect_args since # it's supported for PostgreSQL 8.*. More details at: @@ -273,13 +302,13 @@ def _init_connection_args(url, engine_args, **kw): @_init_connection_args.dispatch_for("mysql") -def _init_connection_args(url, engine_args, **kw): +def _init_connection_args(url, engine_args, kw): if 'charset' not in url.query: engine_args['connect_args']['charset'] = 'utf8' @_init_connection_args.dispatch_for("mysql+mysqlconnector") -def _init_connection_args(url, engine_args, **kw): +def _init_connection_args(url, engine_args, kw): # mysqlconnector engine (<1.0) incorrectly defaults to # raise_on_warnings=True # https://bitbucket.org/zzzeek/sqlalchemy/issue/2515 @@ -288,8 +317,7 @@ def _init_connection_args(url, engine_args, **kw): @_init_connection_args.dispatch_for("mysql+mysqldb") -@_init_connection_args.dispatch_for("mysql+oursql") -def _init_connection_args(url, engine_args, **kw): +def _init_connection_args(url, engine_args, kw): # Those drivers require use_unicode=0 to avoid performance drop due # to internal usage of Python unicode objects in the driver # http://docs.sqlalchemy.org/en/rel_0_9/dialects/mysql.html @@ -444,7 +472,6 @@ def _add_process_guards(engine): "database connection, " "which is being discarded and recreated.", {"newproc": pid, "orig": connection_record.info['pid']}) - connection_record.connection = connection_proxy.connection = None raise exc.DisconnectionError( "Connection record belongs to pid %s, " "attempting to check out in pid %s" % |