diff options
Diffstat (limited to 'oslo_db/sqlalchemy')
-rw-r--r-- | oslo_db/sqlalchemy/compat/__init__.py | 38 | ||||
-rw-r--r-- | oslo_db/sqlalchemy/enginefacade.py | 24 | ||||
-rw-r--r-- | oslo_db/sqlalchemy/engines.py | 81 | ||||
-rw-r--r-- | oslo_db/sqlalchemy/exc_filters.py | 27 | ||||
-rw-r--r-- | oslo_db/sqlalchemy/provision.py | 33 | ||||
-rw-r--r-- | oslo_db/sqlalchemy/test_fixtures.py | 2 | ||||
-rw-r--r-- | oslo_db/sqlalchemy/update_match.py | 4 | ||||
-rw-r--r-- | oslo_db/sqlalchemy/utils.py | 47 |
8 files changed, 166 insertions, 90 deletions
diff --git a/oslo_db/sqlalchemy/compat/__init__.py b/oslo_db/sqlalchemy/compat/__init__.py new file mode 100644 index 0000000..d209207 --- /dev/null +++ b/oslo_db/sqlalchemy/compat/__init__.py @@ -0,0 +1,38 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_utils import versionutils + +from sqlalchemy import __version__ + + +_vers = versionutils.convert_version_to_tuple(__version__) +sqla_2 = _vers >= (2, ) + +native_pre_ping_event_support = _vers >= (2, 0, 5) + + +def dialect_from_exception_context(ctx): + if sqla_2: + # SQLAlchemy 2.0 still has context.engine, however if the + # exception context is called in the context of a ping handler, + # engine is not present. need to use dialect instead + return ctx.dialect + else: + return ctx.engine.dialect + + +def driver_connection(connection): + if sqla_2: + return connection.connection.driver_connection + else: + return connection.connection.connection diff --git a/oslo_db/sqlalchemy/enginefacade.py b/oslo_db/sqlalchemy/enginefacade.py index 3a316b3..39fb061 100644 --- a/oslo_db/sqlalchemy/enginefacade.py +++ b/oslo_db/sqlalchemy/enginefacade.py @@ -164,7 +164,6 @@ class _TransactionFactory(object): } self._maker_cfg = { 'expire_on_commit': _Default(False), - '__autocommit': False, } self._transaction_ctx_cfg = { 'rollback_reader_sessions': False, @@ -468,7 +467,6 @@ class _TransactionFactory(object): def _maker_args_for_conf(self, conf): maker_args = self._args_for_conf(self._maker_cfg, conf) - maker_args['autocommit'] = maker_args.pop('__autocommit') return maker_args def dispose_pool(self): @@ -1238,9 +1236,6 @@ class LegacyEngineFacade(object): :param sqlite_fk: enable foreign keys in SQLite :type sqlite_fk: bool - :param autocommit: use autocommit mode for created Session instances - :type autocommit: bool - :param expire_on_commit: expire session objects on commit :type expire_on_commit: bool @@ -1282,22 +1277,14 @@ class LegacyEngineFacade(object): """ def __init__(self, sql_connection, slave_connection=None, - sqlite_fk=False, autocommit=False, - expire_on_commit=False, _conf=None, _factory=None, **kwargs): + sqlite_fk=False, expire_on_commit=False, _conf=None, + _factory=None, **kwargs): warnings.warn( "EngineFacade is deprecated; please use " "oslo_db.sqlalchemy.enginefacade", warning.OsloDBDeprecationWarning, stacklevel=2) - if autocommit is True: - warnings.warn( - 'autocommit support will be removed in SQLAlchemy 2.0 and ' - 'should not be relied on; please rework your code to remove ' - 'reliance on this feature', - warning.OsloDBDeprecationWarning, - stacklevel=2) - if _factory: self._factory = _factory else: @@ -1305,7 +1292,6 @@ class LegacyEngineFacade(object): self._factory.configure( sqlite_fk=sqlite_fk, - __autocommit=autocommit, expire_on_commit=expire_on_commit, **kwargs ) @@ -1371,7 +1357,7 @@ class LegacyEngineFacade(object): @classmethod def from_config(cls, conf, - sqlite_fk=False, autocommit=False, expire_on_commit=False): + sqlite_fk=False, expire_on_commit=False): """Initialize EngineFacade using oslo.config config instance options. :param conf: oslo.config config instance @@ -1380,9 +1366,6 @@ class LegacyEngineFacade(object): :param sqlite_fk: enable foreign keys in SQLite :type sqlite_fk: bool - :param autocommit: use autocommit mode for created Session instances - :type autocommit: bool - :param expire_on_commit: expire session objects on commit :type expire_on_commit: bool @@ -1391,5 +1374,4 @@ class LegacyEngineFacade(object): return cls( None, sqlite_fk=sqlite_fk, - autocommit=autocommit, expire_on_commit=expire_on_commit, _conf=conf) diff --git a/oslo_db/sqlalchemy/engines.py b/oslo_db/sqlalchemy/engines.py index 31dabf6..7c36c8a 100644 --- a/oslo_db/sqlalchemy/engines.py +++ b/oslo_db/sqlalchemy/engines.py @@ -16,6 +16,7 @@ """Core SQLAlchemy connectivity routines. """ +import functools import itertools import logging import os @@ -29,10 +30,11 @@ import sqlalchemy from sqlalchemy import event from sqlalchemy import exc from sqlalchemy import pool -from sqlalchemy.sql.expression import select +from sqlalchemy import select from oslo_db import exception +from oslo_db.sqlalchemy import compat from oslo_db.sqlalchemy import exc_filters from oslo_db.sqlalchemy import ndb from oslo_db.sqlalchemy import utils @@ -57,6 +59,13 @@ def _connect_ping_listener(connection, branch): Ping the server at transaction begin and transparently reconnect if a disconnect exception occurs. + + This listener is used up until SQLAlchemy 2.0.5. At 2.0.5, we use the + ``pool_pre_ping`` parameter instead of this event handler. + + Note the current test suite in test_exc_filters still **tests** this + handler using all SQLAlchemy versions including 2.0.5 and greater. + """ if branch: return @@ -94,6 +103,14 @@ def _connect_ping_listener(connection, branch): connection.rollback() +# SQLAlchemy 2.0 is compatible here, however oslo.db's test suite +# raises for all deprecation errors, so we have to check for 2.0 +# and wrap out a parameter that is deprecated +if compat.sqla_2: + _connect_ping_listener = functools.partial( + _connect_ping_listener, branch=False) + + def _setup_logging(connection_debug=0): """setup_logging function maps SQL debug level to Python log level. @@ -181,15 +198,18 @@ def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None, json_deserializer=None, connection_parameters=None): """Return a new SQLAlchemy engine.""" - url = sqlalchemy.engine.url.make_url(sql_connection) + url = utils.make_url(sql_connection) if connection_parameters: url = _extend_url_parameters(url, connection_parameters) _vet_url(url) + _native_pre_ping = compat.native_pre_ping_event_support + engine_args = { - "pool_recycle": connection_recycle_time, + 'pool_recycle': connection_recycle_time, + 'pool_pre_ping': _native_pre_ping, 'connect_args': {}, 'logging_name': logging_name } @@ -198,11 +218,13 @@ def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None, _init_connection_args( url, engine_args, - max_pool_size=max_pool_size, - max_overflow=max_overflow, - pool_timeout=pool_timeout, - json_serializer=json_serializer, - json_deserializer=json_deserializer, + dict( + max_pool_size=max_pool_size, + max_overflow=max_overflow, + pool_timeout=pool_timeout, + json_serializer=json_serializer, + json_deserializer=json_deserializer, + ) ) engine = sqlalchemy.create_engine(url, **engine_args) @@ -223,8 +245,10 @@ def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None, # register alternate exception handler exc_filters.register_engine(engine) - # register engine connect handler - event.listen(engine, "engine_connect", _connect_ping_listener) + if not _native_pre_ping: + # register engine connect handler. + + event.listen(engine, "engine_connect", _connect_ping_listener) # initial connect + test # NOTE(viktors): the current implementation of _test_connection() @@ -237,9 +261,16 @@ def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None, @utils.dispatch_for_dialect('*', multiple=True) -def _init_connection_args( - url, engine_args, - max_pool_size=None, max_overflow=None, pool_timeout=None, **kw): +def _init_connection_args(url, engine_args, kw): + + # (zzzeek) kw is passed by reference rather than as **kw so that the + # init_connection_args routines can modify the contents of what + # will be passed to create_engine, including removing arguments that + # don't apply. This allows things such as replacing QueuePool with + # NUllPool, for example, as the latter pool would reject these parameters. + max_pool_size = kw.get("max_pool_size", None) + max_overflow = kw.get("max_overflow", None) + pool_timeout = kw.get("pool_timeout", None) pool_class = url.get_dialect().get_pool_class(url) if issubclass(pool_class, pool.QueuePool): @@ -252,17 +283,25 @@ def _init_connection_args( @_init_connection_args.dispatch_for("sqlite") -def _init_connection_args(url, engine_args, **kw): +def _init_connection_args(url, engine_args, kw): pool_class = url.get_dialect().get_pool_class(url) - # singletonthreadpool is used for :memory: connections; - # replace it with StaticPool. if issubclass(pool_class, pool.SingletonThreadPool): + # singletonthreadpool is used for :memory: connections; + # replace it with StaticPool. engine_args["poolclass"] = pool.StaticPool engine_args['connect_args']['check_same_thread'] = False + elif issubclass(pool_class, pool.QueuePool): + # SQLAlchemy 2.0 uses QueuePool for sqlite file DBs; put NullPool + # back to avoid compatibility issues + kw.pop("max_pool_size", None) + kw.pop("max_overflow", None) + engine_args.pop("max_pool_size", None) + engine_args.pop("max_overflow", None) + engine_args["poolclass"] = pool.NullPool @_init_connection_args.dispatch_for("postgresql") -def _init_connection_args(url, engine_args, **kw): +def _init_connection_args(url, engine_args, kw): if 'client_encoding' not in url.query: # Set encoding using engine_args instead of connect_args since # it's supported for PostgreSQL 8.*. More details at: @@ -273,13 +312,13 @@ def _init_connection_args(url, engine_args, **kw): @_init_connection_args.dispatch_for("mysql") -def _init_connection_args(url, engine_args, **kw): +def _init_connection_args(url, engine_args, kw): if 'charset' not in url.query: engine_args['connect_args']['charset'] = 'utf8' @_init_connection_args.dispatch_for("mysql+mysqlconnector") -def _init_connection_args(url, engine_args, **kw): +def _init_connection_args(url, engine_args, kw): # mysqlconnector engine (<1.0) incorrectly defaults to # raise_on_warnings=True # https://bitbucket.org/zzzeek/sqlalchemy/issue/2515 @@ -288,8 +327,7 @@ def _init_connection_args(url, engine_args, **kw): @_init_connection_args.dispatch_for("mysql+mysqldb") -@_init_connection_args.dispatch_for("mysql+oursql") -def _init_connection_args(url, engine_args, **kw): +def _init_connection_args(url, engine_args, kw): # Those drivers require use_unicode=0 to avoid performance drop due # to internal usage of Python unicode objects in the driver # http://docs.sqlalchemy.org/en/rel_0_9/dialects/mysql.html @@ -444,7 +482,6 @@ def _add_process_guards(engine): "database connection, " "which is being discarded and recreated.", {"newproc": pid, "orig": connection_record.info['pid']}) - connection_record.connection = connection_proxy.connection = None raise exc.DisconnectionError( "Connection record belongs to pid %s, " "attempting to check out in pid %s" % diff --git a/oslo_db/sqlalchemy/exc_filters.py b/oslo_db/sqlalchemy/exc_filters.py index e578987..420b5c7 100644 --- a/oslo_db/sqlalchemy/exc_filters.py +++ b/oslo_db/sqlalchemy/exc_filters.py @@ -20,7 +20,7 @@ from sqlalchemy import event from sqlalchemy import exc as sqla_exc from oslo_db import exception - +from oslo_db.sqlalchemy import compat LOG = logging.getLogger(__name__) @@ -377,6 +377,7 @@ def _raise_operational_errors_directly_filter(operational_error, def _is_db_connection_error(operational_error, match, engine_name, is_disconnect): """Detect the exception as indicating a recoverable error on connect.""" + raise exception.DBConnectionError(operational_error) @@ -423,13 +424,14 @@ def handler(context): more specific exception class are attempted first. """ - def _dialect_registries(engine): - if engine.dialect.name in _registry: - yield _registry[engine.dialect.name] + def _dialect_registries(dialect): + if dialect.name in _registry: + yield _registry[dialect.name] if '*' in _registry: yield _registry['*'] - for per_dialect in _dialect_registries(context.engine): + dialect = compat.dialect_from_exception_context(context) + for per_dialect in _dialect_registries(dialect): for exc in ( context.sqlalchemy_exception, context.original_exception): @@ -443,7 +445,7 @@ def handler(context): fn( exc, match, - context.engine.dialect.name, + dialect.name, context.is_disconnect) except exception.DBError as dbe: if ( @@ -460,6 +462,19 @@ def handler(context): if isinstance( dbe, exception.DBConnectionError): context.is_disconnect = True + + # new in 2.0.5 + if ( + hasattr(context, "is_pre_ping") and + context.is_pre_ping + ): + # if this is a pre-ping, need to + # integrate with the built + # in pre-ping handler that doesnt know + # about DBConnectionError, just needs + # the updated status + return None + return dbe diff --git a/oslo_db/sqlalchemy/provision.py b/oslo_db/sqlalchemy/provision.py index 21eb90a..a6cc527 100644 --- a/oslo_db/sqlalchemy/provision.py +++ b/oslo_db/sqlalchemy/provision.py @@ -24,7 +24,6 @@ import re import string import sqlalchemy -from sqlalchemy.engine import url as sa_url from sqlalchemy import schema from sqlalchemy import sql import testresources @@ -242,7 +241,6 @@ class Backend(object): :raises: ``BackendNotAvailable`` if the backend is not available. """ - if not self.verified: try: eng = self._ensure_backend_available(self.url) @@ -259,7 +257,7 @@ class Backend(object): @classmethod def _ensure_backend_available(cls, url): - url = sa_url.make_url(str(url)) + url = utils.make_url(url) try: eng = sqlalchemy.create_engine(url) except ImportError as i_e: @@ -362,7 +360,7 @@ class Backend(object): ] for url_str in configured_urls: - url = sa_url.make_url(url_str) + url = utils.make_url(url_str) m = re.match(r'([^+]+?)(?:\+(.+))?$', url.drivername) database_type = m.group(1) Backend.backends_by_database_type[database_type] = \ @@ -494,8 +492,7 @@ class BackendImpl(object, metaclass=abc.ABCMeta): then emit a command to switch to the named database. """ - - url = sa_url.make_url(str(base_url)) + url = utils.make_url(base_url) # TODO(zzzeek): remove hasattr() conditional in favor of "url.set()" # when SQLAlchemy 1.4 is the minimum version in requirements @@ -516,16 +513,14 @@ class MySQLBackendImpl(BackendImpl): return "mysql+pymysql://openstack_citest:openstack_citest@localhost/" def create_named_database(self, engine, ident, conditional=False): - with engine.connect() as conn: + with engine.begin() as conn: if not conditional or not self.database_exists(conn, ident): - with conn.begin(): - conn.exec_driver_sql("CREATE DATABASE %s" % ident) + conn.exec_driver_sql("CREATE DATABASE %s" % ident) def drop_named_database(self, engine, ident, conditional=False): - with engine.connect() as conn: + with engine.begin() as conn: if not conditional or self.database_exists(conn, ident): - with conn.begin(): - conn.exec_driver_sql("DROP DATABASE %s" % ident) + conn.exec_driver_sql("DROP DATABASE %s" % ident) def database_exists(self, engine, ident): s = sql.text("SHOW DATABASES LIKE :ident") @@ -571,7 +566,7 @@ class SQLiteBackendImpl(BackendImpl): def provisioned_database_url(self, base_url, ident): if base_url.database: - return sa_url.make_url("sqlite:////tmp/%s.db" % ident) + return utils.make_url("sqlite:////tmp/%s.db" % ident) else: return base_url @@ -586,19 +581,17 @@ class PostgresqlBackendImpl(BackendImpl): isolation_level="AUTOCOMMIT", ) as conn: if not conditional or not self.database_exists(conn, ident): - with conn.begin(): - conn.exec_driver_sql("CREATE DATABASE %s" % ident) + conn.exec_driver_sql("CREATE DATABASE %s" % ident) def drop_named_database(self, engine, ident, conditional=False): with engine.connect().execution_options( isolation_level="AUTOCOMMIT", ) as conn: self._close_out_database_users(conn, ident) - with conn.begin(): - if conditional: - conn.exec_driver_sql("DROP DATABASE IF EXISTS %s" % ident) - else: - conn.exec_driver_sql("DROP DATABASE %s" % ident) + if conditional: + conn.exec_driver_sql("DROP DATABASE IF EXISTS %s" % ident) + else: + conn.exec_driver_sql("DROP DATABASE %s" % ident) def drop_additional_objects(self, conn): enums = [e['name'] for e in sqlalchemy.inspect(conn).get_enums()] diff --git a/oslo_db/sqlalchemy/test_fixtures.py b/oslo_db/sqlalchemy/test_fixtures.py index 8b69d3f..f7157c0 100644 --- a/oslo_db/sqlalchemy/test_fixtures.py +++ b/oslo_db/sqlalchemy/test_fixtures.py @@ -360,7 +360,7 @@ class AdHocDbFixture(SimpleDbFixture): """ def __init__(self, url=None): if url: - self.url = provision.sa_url.make_url(str(url)) + self.url = utils.make_url(url) driver = self.url.get_backend_name() else: driver = None diff --git a/oslo_db/sqlalchemy/update_match.py b/oslo_db/sqlalchemy/update_match.py index 559aa78..c2dd8d6 100644 --- a/oslo_db/sqlalchemy/update_match.py +++ b/oslo_db/sqlalchemy/update_match.py @@ -393,8 +393,8 @@ def update_returning_pk(query, values, surrogate_key): mapper = inspect(entity).mapper session = query.session - bind = session.connection(mapper=mapper) - if bind.dialect.implicit_returning: + bind = session.connection(bind_arguments=dict(mapper=mapper)) + if bind.dialect.name == "postgresql": pk_strategy = _pk_strategy_returning elif bind.dialect.name == 'mysql' and \ len(mapper.primary_key) == 1 and \ diff --git a/oslo_db/sqlalchemy/utils.py b/oslo_db/sqlalchemy/utils.py index ba0a607..58b2486 100644 --- a/oslo_db/sqlalchemy/utils.py +++ b/oslo_db/sqlalchemy/utils.py @@ -213,7 +213,7 @@ def paginate_query(query, model, limit, sort_keys, marker=None, null_order_by_stmt = { "": None, "nullsfirst": sort_key_attr.is_(None), - "nullslast": sort_key_attr.isnot(None), + "nullslast": sort_key_attr.is_not(None), }[null_sort_dir] except KeyError: raise ValueError(_("Unknown sort direction, " @@ -1016,26 +1016,29 @@ def suspend_fk_constraints_for_col_alter( yield else: with engine.connect() as conn: - insp = inspect(conn) - fks = [] - for ref_table_name in referents: - for fk in insp.get_foreign_keys(ref_table_name): - if not fk.get('name'): - raise AssertionError("foreign key hasn't a name.") - if fk['referred_table'] == table_name and \ - column_name in fk['referred_columns']: - fk['source_table'] = ref_table_name - if 'options' not in fk: - fk['options'] = {} - fks.append(fk) - - ctx = MigrationContext.configure(conn) - op = Operations(ctx) - with conn.begin(): + insp = inspect(conn) + fks = [] + for ref_table_name in referents: + for fk in insp.get_foreign_keys(ref_table_name): + if not fk.get('name'): + raise AssertionError("foreign key hasn't a name.") + if fk['referred_table'] == table_name and \ + column_name in fk['referred_columns']: + fk['source_table'] = ref_table_name + if 'options' not in fk: + fk['options'] = {} + fks.append(fk) + + ctx = MigrationContext.configure(conn) + op = Operations(ctx) + for fk in fks: op.drop_constraint( - fk['name'], fk['source_table'], type_="foreignkey") + fk['name'], + fk['source_table'], + type_="foreignkey", + ) yield @@ -1051,3 +1054,11 @@ def suspend_fk_constraints_for_col_alter( deferrable=fk['options'].get('deferrable'), initially=fk['options'].get('initially'), ) + + +def make_url(target): + """Return a ``url.URL`` object""" + if isinstance(target, (str, sa_url.URL)): + return sa_url.make_url(target) + else: + return sa_url.make_url(str(target)) |