summaryrefslogtreecommitdiff
path: root/oslo_db/sqlalchemy
diff options
context:
space:
mode:
Diffstat (limited to 'oslo_db/sqlalchemy')
-rw-r--r--oslo_db/sqlalchemy/compat/__init__.py38
-rw-r--r--oslo_db/sqlalchemy/enginefacade.py24
-rw-r--r--oslo_db/sqlalchemy/engines.py81
-rw-r--r--oslo_db/sqlalchemy/exc_filters.py27
-rw-r--r--oslo_db/sqlalchemy/provision.py33
-rw-r--r--oslo_db/sqlalchemy/test_fixtures.py2
-rw-r--r--oslo_db/sqlalchemy/update_match.py4
-rw-r--r--oslo_db/sqlalchemy/utils.py47
8 files changed, 166 insertions, 90 deletions
diff --git a/oslo_db/sqlalchemy/compat/__init__.py b/oslo_db/sqlalchemy/compat/__init__.py
new file mode 100644
index 0000000..d209207
--- /dev/null
+++ b/oslo_db/sqlalchemy/compat/__init__.py
@@ -0,0 +1,38 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslo_utils import versionutils
+
+from sqlalchemy import __version__
+
+
+_vers = versionutils.convert_version_to_tuple(__version__)
+sqla_2 = _vers >= (2, )
+
+native_pre_ping_event_support = _vers >= (2, 0, 5)
+
+
+def dialect_from_exception_context(ctx):
+ if sqla_2:
+ # SQLAlchemy 2.0 still has context.engine, however if the
+ # exception context is called in the context of a ping handler,
+        # engine is not present; need to use dialect instead
+ return ctx.dialect
+ else:
+ return ctx.engine.dialect
+
+
+def driver_connection(connection):
+ if sqla_2:
+ return connection.connection.driver_connection
+ else:
+ return connection.connection.connection
diff --git a/oslo_db/sqlalchemy/enginefacade.py b/oslo_db/sqlalchemy/enginefacade.py
index 3a316b3..39fb061 100644
--- a/oslo_db/sqlalchemy/enginefacade.py
+++ b/oslo_db/sqlalchemy/enginefacade.py
@@ -164,7 +164,6 @@ class _TransactionFactory(object):
}
self._maker_cfg = {
'expire_on_commit': _Default(False),
- '__autocommit': False,
}
self._transaction_ctx_cfg = {
'rollback_reader_sessions': False,
@@ -468,7 +467,6 @@ class _TransactionFactory(object):
def _maker_args_for_conf(self, conf):
maker_args = self._args_for_conf(self._maker_cfg, conf)
- maker_args['autocommit'] = maker_args.pop('__autocommit')
return maker_args
def dispose_pool(self):
@@ -1238,9 +1236,6 @@ class LegacyEngineFacade(object):
:param sqlite_fk: enable foreign keys in SQLite
:type sqlite_fk: bool
- :param autocommit: use autocommit mode for created Session instances
- :type autocommit: bool
-
:param expire_on_commit: expire session objects on commit
:type expire_on_commit: bool
@@ -1282,22 +1277,14 @@ class LegacyEngineFacade(object):
"""
def __init__(self, sql_connection, slave_connection=None,
- sqlite_fk=False, autocommit=False,
- expire_on_commit=False, _conf=None, _factory=None, **kwargs):
+ sqlite_fk=False, expire_on_commit=False, _conf=None,
+ _factory=None, **kwargs):
warnings.warn(
"EngineFacade is deprecated; please use "
"oslo_db.sqlalchemy.enginefacade",
warning.OsloDBDeprecationWarning,
stacklevel=2)
- if autocommit is True:
- warnings.warn(
- 'autocommit support will be removed in SQLAlchemy 2.0 and '
- 'should not be relied on; please rework your code to remove '
- 'reliance on this feature',
- warning.OsloDBDeprecationWarning,
- stacklevel=2)
-
if _factory:
self._factory = _factory
else:
@@ -1305,7 +1292,6 @@ class LegacyEngineFacade(object):
self._factory.configure(
sqlite_fk=sqlite_fk,
- __autocommit=autocommit,
expire_on_commit=expire_on_commit,
**kwargs
)
@@ -1371,7 +1357,7 @@ class LegacyEngineFacade(object):
@classmethod
def from_config(cls, conf,
- sqlite_fk=False, autocommit=False, expire_on_commit=False):
+ sqlite_fk=False, expire_on_commit=False):
"""Initialize EngineFacade using oslo.config config instance options.
:param conf: oslo.config config instance
@@ -1380,9 +1366,6 @@ class LegacyEngineFacade(object):
:param sqlite_fk: enable foreign keys in SQLite
:type sqlite_fk: bool
- :param autocommit: use autocommit mode for created Session instances
- :type autocommit: bool
-
:param expire_on_commit: expire session objects on commit
:type expire_on_commit: bool
@@ -1391,5 +1374,4 @@ class LegacyEngineFacade(object):
return cls(
None,
sqlite_fk=sqlite_fk,
- autocommit=autocommit,
expire_on_commit=expire_on_commit, _conf=conf)
diff --git a/oslo_db/sqlalchemy/engines.py b/oslo_db/sqlalchemy/engines.py
index 31dabf6..7c36c8a 100644
--- a/oslo_db/sqlalchemy/engines.py
+++ b/oslo_db/sqlalchemy/engines.py
@@ -16,6 +16,7 @@
"""Core SQLAlchemy connectivity routines.
"""
+import functools
import itertools
import logging
import os
@@ -29,10 +30,11 @@ import sqlalchemy
from sqlalchemy import event
from sqlalchemy import exc
from sqlalchemy import pool
-from sqlalchemy.sql.expression import select
+from sqlalchemy import select
from oslo_db import exception
+from oslo_db.sqlalchemy import compat
from oslo_db.sqlalchemy import exc_filters
from oslo_db.sqlalchemy import ndb
from oslo_db.sqlalchemy import utils
@@ -57,6 +59,13 @@ def _connect_ping_listener(connection, branch):
Ping the server at transaction begin and transparently reconnect
if a disconnect exception occurs.
+
+ This listener is used up until SQLAlchemy 2.0.5. At 2.0.5, we use the
+ ``pool_pre_ping`` parameter instead of this event handler.
+
+ Note the current test suite in test_exc_filters still **tests** this
+ handler using all SQLAlchemy versions including 2.0.5 and greater.
+
"""
if branch:
return
@@ -94,6 +103,14 @@ def _connect_ping_listener(connection, branch):
connection.rollback()
+# SQLAlchemy 2.0 is compatible here, however oslo.db's test suite
+# raises for all deprecation errors, so we have to check for 2.0
+# and wrap out a parameter that is deprecated
+if compat.sqla_2:
+ _connect_ping_listener = functools.partial(
+ _connect_ping_listener, branch=False)
+
+
def _setup_logging(connection_debug=0):
"""setup_logging function maps SQL debug level to Python log level.
@@ -181,15 +198,18 @@ def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
json_deserializer=None, connection_parameters=None):
"""Return a new SQLAlchemy engine."""
- url = sqlalchemy.engine.url.make_url(sql_connection)
+ url = utils.make_url(sql_connection)
if connection_parameters:
url = _extend_url_parameters(url, connection_parameters)
_vet_url(url)
+ _native_pre_ping = compat.native_pre_ping_event_support
+
engine_args = {
- "pool_recycle": connection_recycle_time,
+ 'pool_recycle': connection_recycle_time,
+ 'pool_pre_ping': _native_pre_ping,
'connect_args': {},
'logging_name': logging_name
}
@@ -198,11 +218,13 @@ def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
_init_connection_args(
url, engine_args,
- max_pool_size=max_pool_size,
- max_overflow=max_overflow,
- pool_timeout=pool_timeout,
- json_serializer=json_serializer,
- json_deserializer=json_deserializer,
+ dict(
+ max_pool_size=max_pool_size,
+ max_overflow=max_overflow,
+ pool_timeout=pool_timeout,
+ json_serializer=json_serializer,
+ json_deserializer=json_deserializer,
+ )
)
engine = sqlalchemy.create_engine(url, **engine_args)
@@ -223,8 +245,10 @@ def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
# register alternate exception handler
exc_filters.register_engine(engine)
- # register engine connect handler
- event.listen(engine, "engine_connect", _connect_ping_listener)
+ if not _native_pre_ping:
+ # register engine connect handler.
+
+ event.listen(engine, "engine_connect", _connect_ping_listener)
# initial connect + test
# NOTE(viktors): the current implementation of _test_connection()
@@ -237,9 +261,16 @@ def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
@utils.dispatch_for_dialect('*', multiple=True)
-def _init_connection_args(
- url, engine_args,
- max_pool_size=None, max_overflow=None, pool_timeout=None, **kw):
+def _init_connection_args(url, engine_args, kw):
+
+ # (zzzeek) kw is passed by reference rather than as **kw so that the
+ # init_connection_args routines can modify the contents of what
+ # will be passed to create_engine, including removing arguments that
+ # don't apply. This allows things such as replacing QueuePool with
+ # NUllPool, for example, as the latter pool would reject these parameters.
+ max_pool_size = kw.get("max_pool_size", None)
+ max_overflow = kw.get("max_overflow", None)
+ pool_timeout = kw.get("pool_timeout", None)
pool_class = url.get_dialect().get_pool_class(url)
if issubclass(pool_class, pool.QueuePool):
@@ -252,17 +283,25 @@ def _init_connection_args(
@_init_connection_args.dispatch_for("sqlite")
-def _init_connection_args(url, engine_args, **kw):
+def _init_connection_args(url, engine_args, kw):
pool_class = url.get_dialect().get_pool_class(url)
- # singletonthreadpool is used for :memory: connections;
- # replace it with StaticPool.
if issubclass(pool_class, pool.SingletonThreadPool):
+ # singletonthreadpool is used for :memory: connections;
+ # replace it with StaticPool.
engine_args["poolclass"] = pool.StaticPool
engine_args['connect_args']['check_same_thread'] = False
+ elif issubclass(pool_class, pool.QueuePool):
+ # SQLAlchemy 2.0 uses QueuePool for sqlite file DBs; put NullPool
+ # back to avoid compatibility issues
+ kw.pop("max_pool_size", None)
+ kw.pop("max_overflow", None)
+ engine_args.pop("max_pool_size", None)
+ engine_args.pop("max_overflow", None)
+ engine_args["poolclass"] = pool.NullPool
@_init_connection_args.dispatch_for("postgresql")
-def _init_connection_args(url, engine_args, **kw):
+def _init_connection_args(url, engine_args, kw):
if 'client_encoding' not in url.query:
# Set encoding using engine_args instead of connect_args since
# it's supported for PostgreSQL 8.*. More details at:
@@ -273,13 +312,13 @@ def _init_connection_args(url, engine_args, **kw):
@_init_connection_args.dispatch_for("mysql")
-def _init_connection_args(url, engine_args, **kw):
+def _init_connection_args(url, engine_args, kw):
if 'charset' not in url.query:
engine_args['connect_args']['charset'] = 'utf8'
@_init_connection_args.dispatch_for("mysql+mysqlconnector")
-def _init_connection_args(url, engine_args, **kw):
+def _init_connection_args(url, engine_args, kw):
# mysqlconnector engine (<1.0) incorrectly defaults to
# raise_on_warnings=True
# https://bitbucket.org/zzzeek/sqlalchemy/issue/2515
@@ -288,8 +327,7 @@ def _init_connection_args(url, engine_args, **kw):
@_init_connection_args.dispatch_for("mysql+mysqldb")
-@_init_connection_args.dispatch_for("mysql+oursql")
-def _init_connection_args(url, engine_args, **kw):
+def _init_connection_args(url, engine_args, kw):
# Those drivers require use_unicode=0 to avoid performance drop due
# to internal usage of Python unicode objects in the driver
# http://docs.sqlalchemy.org/en/rel_0_9/dialects/mysql.html
@@ -444,7 +482,6 @@ def _add_process_guards(engine):
"database connection, "
"which is being discarded and recreated.",
{"newproc": pid, "orig": connection_record.info['pid']})
- connection_record.connection = connection_proxy.connection = None
raise exc.DisconnectionError(
"Connection record belongs to pid %s, "
"attempting to check out in pid %s" %
diff --git a/oslo_db/sqlalchemy/exc_filters.py b/oslo_db/sqlalchemy/exc_filters.py
index e578987..420b5c7 100644
--- a/oslo_db/sqlalchemy/exc_filters.py
+++ b/oslo_db/sqlalchemy/exc_filters.py
@@ -20,7 +20,7 @@ from sqlalchemy import event
from sqlalchemy import exc as sqla_exc
from oslo_db import exception
-
+from oslo_db.sqlalchemy import compat
LOG = logging.getLogger(__name__)
@@ -377,6 +377,7 @@ def _raise_operational_errors_directly_filter(operational_error,
def _is_db_connection_error(operational_error, match, engine_name,
is_disconnect):
"""Detect the exception as indicating a recoverable error on connect."""
+
raise exception.DBConnectionError(operational_error)
@@ -423,13 +424,14 @@ def handler(context):
more specific exception class are attempted first.
"""
- def _dialect_registries(engine):
- if engine.dialect.name in _registry:
- yield _registry[engine.dialect.name]
+ def _dialect_registries(dialect):
+ if dialect.name in _registry:
+ yield _registry[dialect.name]
if '*' in _registry:
yield _registry['*']
- for per_dialect in _dialect_registries(context.engine):
+ dialect = compat.dialect_from_exception_context(context)
+ for per_dialect in _dialect_registries(dialect):
for exc in (
context.sqlalchemy_exception,
context.original_exception):
@@ -443,7 +445,7 @@ def handler(context):
fn(
exc,
match,
- context.engine.dialect.name,
+ dialect.name,
context.is_disconnect)
except exception.DBError as dbe:
if (
@@ -460,6 +462,19 @@ def handler(context):
if isinstance(
dbe, exception.DBConnectionError):
context.is_disconnect = True
+
+ # new in 2.0.5
+ if (
+ hasattr(context, "is_pre_ping") and
+ context.is_pre_ping
+ ):
+ # if this is a pre-ping, need to
+ # integrate with the built
+                        # in pre-ping handler that doesn't know
+ # about DBConnectionError, just needs
+ # the updated status
+ return None
+
return dbe
diff --git a/oslo_db/sqlalchemy/provision.py b/oslo_db/sqlalchemy/provision.py
index 21eb90a..a6cc527 100644
--- a/oslo_db/sqlalchemy/provision.py
+++ b/oslo_db/sqlalchemy/provision.py
@@ -24,7 +24,6 @@ import re
import string
import sqlalchemy
-from sqlalchemy.engine import url as sa_url
from sqlalchemy import schema
from sqlalchemy import sql
import testresources
@@ -242,7 +241,6 @@ class Backend(object):
:raises: ``BackendNotAvailable`` if the backend is not available.
"""
-
if not self.verified:
try:
eng = self._ensure_backend_available(self.url)
@@ -259,7 +257,7 @@ class Backend(object):
@classmethod
def _ensure_backend_available(cls, url):
- url = sa_url.make_url(str(url))
+ url = utils.make_url(url)
try:
eng = sqlalchemy.create_engine(url)
except ImportError as i_e:
@@ -362,7 +360,7 @@ class Backend(object):
]
for url_str in configured_urls:
- url = sa_url.make_url(url_str)
+ url = utils.make_url(url_str)
m = re.match(r'([^+]+?)(?:\+(.+))?$', url.drivername)
database_type = m.group(1)
Backend.backends_by_database_type[database_type] = \
@@ -494,8 +492,7 @@ class BackendImpl(object, metaclass=abc.ABCMeta):
then emit a command to switch to the named database.
"""
-
- url = sa_url.make_url(str(base_url))
+ url = utils.make_url(base_url)
# TODO(zzzeek): remove hasattr() conditional in favor of "url.set()"
# when SQLAlchemy 1.4 is the minimum version in requirements
@@ -516,16 +513,14 @@ class MySQLBackendImpl(BackendImpl):
return "mysql+pymysql://openstack_citest:openstack_citest@localhost/"
def create_named_database(self, engine, ident, conditional=False):
- with engine.connect() as conn:
+ with engine.begin() as conn:
if not conditional or not self.database_exists(conn, ident):
- with conn.begin():
- conn.exec_driver_sql("CREATE DATABASE %s" % ident)
+ conn.exec_driver_sql("CREATE DATABASE %s" % ident)
def drop_named_database(self, engine, ident, conditional=False):
- with engine.connect() as conn:
+ with engine.begin() as conn:
if not conditional or self.database_exists(conn, ident):
- with conn.begin():
- conn.exec_driver_sql("DROP DATABASE %s" % ident)
+ conn.exec_driver_sql("DROP DATABASE %s" % ident)
def database_exists(self, engine, ident):
s = sql.text("SHOW DATABASES LIKE :ident")
@@ -571,7 +566,7 @@ class SQLiteBackendImpl(BackendImpl):
def provisioned_database_url(self, base_url, ident):
if base_url.database:
- return sa_url.make_url("sqlite:////tmp/%s.db" % ident)
+ return utils.make_url("sqlite:////tmp/%s.db" % ident)
else:
return base_url
@@ -586,19 +581,17 @@ class PostgresqlBackendImpl(BackendImpl):
isolation_level="AUTOCOMMIT",
) as conn:
if not conditional or not self.database_exists(conn, ident):
- with conn.begin():
- conn.exec_driver_sql("CREATE DATABASE %s" % ident)
+ conn.exec_driver_sql("CREATE DATABASE %s" % ident)
def drop_named_database(self, engine, ident, conditional=False):
with engine.connect().execution_options(
isolation_level="AUTOCOMMIT",
) as conn:
self._close_out_database_users(conn, ident)
- with conn.begin():
- if conditional:
- conn.exec_driver_sql("DROP DATABASE IF EXISTS %s" % ident)
- else:
- conn.exec_driver_sql("DROP DATABASE %s" % ident)
+ if conditional:
+ conn.exec_driver_sql("DROP DATABASE IF EXISTS %s" % ident)
+ else:
+ conn.exec_driver_sql("DROP DATABASE %s" % ident)
def drop_additional_objects(self, conn):
enums = [e['name'] for e in sqlalchemy.inspect(conn).get_enums()]
diff --git a/oslo_db/sqlalchemy/test_fixtures.py b/oslo_db/sqlalchemy/test_fixtures.py
index 8b69d3f..f7157c0 100644
--- a/oslo_db/sqlalchemy/test_fixtures.py
+++ b/oslo_db/sqlalchemy/test_fixtures.py
@@ -360,7 +360,7 @@ class AdHocDbFixture(SimpleDbFixture):
"""
def __init__(self, url=None):
if url:
- self.url = provision.sa_url.make_url(str(url))
+ self.url = utils.make_url(url)
driver = self.url.get_backend_name()
else:
driver = None
diff --git a/oslo_db/sqlalchemy/update_match.py b/oslo_db/sqlalchemy/update_match.py
index 559aa78..c2dd8d6 100644
--- a/oslo_db/sqlalchemy/update_match.py
+++ b/oslo_db/sqlalchemy/update_match.py
@@ -393,8 +393,8 @@ def update_returning_pk(query, values, surrogate_key):
mapper = inspect(entity).mapper
session = query.session
- bind = session.connection(mapper=mapper)
- if bind.dialect.implicit_returning:
+ bind = session.connection(bind_arguments=dict(mapper=mapper))
+ if bind.dialect.name == "postgresql":
pk_strategy = _pk_strategy_returning
elif bind.dialect.name == 'mysql' and \
len(mapper.primary_key) == 1 and \
diff --git a/oslo_db/sqlalchemy/utils.py b/oslo_db/sqlalchemy/utils.py
index ba0a607..58b2486 100644
--- a/oslo_db/sqlalchemy/utils.py
+++ b/oslo_db/sqlalchemy/utils.py
@@ -213,7 +213,7 @@ def paginate_query(query, model, limit, sort_keys, marker=None,
null_order_by_stmt = {
"": None,
"nullsfirst": sort_key_attr.is_(None),
- "nullslast": sort_key_attr.isnot(None),
+ "nullslast": sort_key_attr.is_not(None),
}[null_sort_dir]
except KeyError:
raise ValueError(_("Unknown sort direction, "
@@ -1016,26 +1016,29 @@ def suspend_fk_constraints_for_col_alter(
yield
else:
with engine.connect() as conn:
- insp = inspect(conn)
- fks = []
- for ref_table_name in referents:
- for fk in insp.get_foreign_keys(ref_table_name):
- if not fk.get('name'):
- raise AssertionError("foreign key hasn't a name.")
- if fk['referred_table'] == table_name and \
- column_name in fk['referred_columns']:
- fk['source_table'] = ref_table_name
- if 'options' not in fk:
- fk['options'] = {}
- fks.append(fk)
-
- ctx = MigrationContext.configure(conn)
- op = Operations(ctx)
-
with conn.begin():
+ insp = inspect(conn)
+ fks = []
+ for ref_table_name in referents:
+ for fk in insp.get_foreign_keys(ref_table_name):
+ if not fk.get('name'):
+ raise AssertionError("foreign key hasn't a name.")
+ if fk['referred_table'] == table_name and \
+ column_name in fk['referred_columns']:
+ fk['source_table'] = ref_table_name
+ if 'options' not in fk:
+ fk['options'] = {}
+ fks.append(fk)
+
+ ctx = MigrationContext.configure(conn)
+ op = Operations(ctx)
+
for fk in fks:
op.drop_constraint(
- fk['name'], fk['source_table'], type_="foreignkey")
+ fk['name'],
+ fk['source_table'],
+ type_="foreignkey",
+ )
yield
@@ -1051,3 +1054,11 @@ def suspend_fk_constraints_for_col_alter(
deferrable=fk['options'].get('deferrable'),
initially=fk['options'].get('initially'),
)
+
+
+def make_url(target):
+ """Return a ``url.URL`` object"""
+ if isinstance(target, (str, sa_url.URL)):
+ return sa_url.make_url(target)
+ else:
+ return sa_url.make_url(str(target))