-rw-r--r--  doc/source/usage.rst                                            93
-rw-r--r--  oslo_db/exception.py                                            43
-rw-r--r--  oslo_db/sqlalchemy/enginefacade.py                             995
-rw-r--r--  oslo_db/sqlalchemy/engines.py                                  413
-rw-r--r--  oslo_db/sqlalchemy/orm.py                                       66
-rw-r--r--  oslo_db/sqlalchemy/session.py                                  874
-rw-r--r--  oslo_db/sqlalchemy/test_base.py                                  7
-rw-r--r--  oslo_db/tests/old_import_api/sqlalchemy/test_exc_filters.py      8
-rw-r--r--  oslo_db/tests/old_import_api/sqlalchemy/test_sqlalchemy.py      50
-rw-r--r--  oslo_db/tests/sqlalchemy/test_enginefacade.py                 1646
-rw-r--r--  oslo_db/tests/sqlalchemy/test_exc_filters.py                     8
-rw-r--r--  oslo_db/tests/sqlalchemy/test_sqlalchemy.py                     31
12 files changed, 3397 insertions, 837 deletions
diff --git a/doc/source/usage.rst b/doc/source/usage.rst
index f352ee9..11c8c2a 100644
--- a/doc/source/usage.rst
+++ b/doc/source/usage.rst
@@ -7,26 +7,91 @@ To use oslo.db in a project:
Session Handling
================
+Session handling is achieved using the :mod:`oslo_db.sqlalchemy.enginefacade`
+system. This module provides both a function-decorator and a
+context-manager approach to delivering :class:`.Session` and
+:class:`.Connection` objects to a function or block.
+
+Both calling styles require the use of a context object. This object may
+be of any class, though when used with the decorator form, it requires
+special instrumentation.
+
+The context manager form is as follows:
+
.. code:: python
- from oslo.config import cfg
- from oslo.db.sqlalchemy import session as db_session
- _FACADE = None
+ from oslo_db.sqlalchemy import enginefacade
- def _create_facade_lazily():
- global _FACADE
- if _FACADE is None:
- _FACADE = db_session.EngineFacade.from_config(cfg.CONF)
- return _FACADE
- def get_engine():
- facade = _create_facade_lazily()
- return facade.get_engine()
+ class MyContext(object):
+ "User-defined context class."
+
+
+ def some_reader_api_function(context):
+ with enginefacade.reader.using(context) as session:
+ return session.query(SomeClass).all()
+
+
+ def some_writer_api_function(context, x, y):
+ with enginefacade.writer.using(context) as session:
+ session.add(SomeClass(x, y))
+
+
+ def run_some_database_calls():
+ context = MyContext()
+
+ results = some_reader_api_function(context)
+ some_writer_api_function(context, 5, 10)
+
+
+The decorator form accesses attributes off the user-defined context
+directly; the context must be decorated with the
+:func:`oslo_db.sqlalchemy.enginefacade.transaction_context_provider`
+decorator. Each function must receive the context as the first
+positional argument:
+
+.. code:: python
+
+
+ from oslo_db.sqlalchemy import enginefacade
+
+ @enginefacade.transaction_context_provider
+ class MyContext(object):
+ "User-defined context class."
+
+ @enginefacade.reader
+ def some_reader_api_function(context):
+ return context.session.query(SomeClass).all()
+
+
+ @enginefacade.writer
+ def some_writer_api_function(context, x, y):
+ context.session.add(SomeClass(x, y))
+
+
+ def run_some_database_calls():
+ context = MyContext()
+
+ results = some_reader_api_function(context)
+ some_writer_api_function(context, 5, 10)
+
+The scope of transaction and connectivity for both approaches is managed
+transparently. The configuration for the connection comes from the standard
+:obj:`oslo_config.cfg.CONF` collection. Additional configurations can be
+established for the enginefacade using the
+:func:`oslo_db.sqlalchemy.enginefacade.configure` function, before any use of
+the database begins:
+
+.. code:: python
+
+ from oslo_db.sqlalchemy import enginefacade
- def get_session(**kwargs):
- facade = _create_facade_lazily()
- return facade.get_session(**kwargs)
+ enginefacade.configure(
+ sqlite_fk=True,
+ max_retries=5,
+ mysql_sql_mode='ANSI'
+ )
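+
+Outside of a ``reader`` or ``writer`` scope, accessing ``context.session``
+or ``context.connection`` raises
+:class:`oslo_db.exception.NoEngineContextEstablished`. Because this is an
+``AttributeError`` subclass, a guarded lookup remains possible; a minimal
+sketch:
+
+.. code:: python
+
+ session = getattr(context, 'session', None)
+ if session is None:
+ # no enginefacade scope is active on this thread
+ ...
+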
Base class for models usage
diff --git a/oslo_db/exception.py b/oslo_db/exception.py
index 506006c..c1620e1 100644
--- a/oslo_db/exception.py
+++ b/oslo_db/exception.py
@@ -205,3 +205,46 @@ class RetryRequest(Exception):
"""
def __init__(self, inner_exc):
self.inner_exc = inner_exc
+
+
+class NoEngineContextEstablished(AttributeError):
+ """Error raised for non-present enginefacade attribute access.
+
+
+ This applies to the ``session`` and ``connection`` attributes
+ of a user-defined context and/or RequestContext object, when they
+ are accessed outside of the scope of an enginefacade decorator
+ or context manager.
+
+ The exception is a subclass of AttributeError so that
+ normal Python missing attribute behaviors are maintained, such
+ as support for ``getattr(context, 'session', None)``.
+
+
+ """
+
+
+class NotSupportedWarning(Warning):
+ """Warn that an argument or call that was passed is not supported.
+
+ This subclasses Warning so that it can be filtered as a distinct
+ category.
+
+ .. seealso::
+
+ https://docs.python.org/2/library/warnings.html
+
+ """
+
+
+class OsloDBDeprecationWarning(DeprecationWarning):
+ """Issued per usage of a deprecated API.
+
+ This subclasses DeprecationWarning so that it can be filtered as a distinct
+ category.
+
+ .. seealso::
+
+ https://docs.python.org/2/library/warnings.html
+
+ """
diff --git a/oslo_db/sqlalchemy/enginefacade.py b/oslo_db/sqlalchemy/enginefacade.py
new file mode 100644
index 0000000..c21f39a
--- /dev/null
+++ b/oslo_db/sqlalchemy/enginefacade.py
@@ -0,0 +1,995 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+import contextlib
+import functools
+import operator
+import threading
+import warnings
+
+from oslo_config import cfg
+from oslo_context import context as oslo_context
+
+from oslo_db import exception
+from oslo_db import options
+from oslo_db.sqlalchemy import engines
+from oslo_db.sqlalchemy import orm
+
+
+class _symbol(object):
+ """represent a fixed symbol."""
+
+ __slots__ = 'name',
+
+ def __init__(self, name):
+ self.name = name
+
+ def __repr__(self):
+ return "symbol(%r)" % self.name
+
+
+_ASYNC_READER = _symbol('ASYNC_READER')
+"""Represent the transaction state of "async reader".
+
+This state indicates that the transaction is read-only and is
+safe to use on an asynchronously updated slave database.
+
+"""
+
+_READER = _symbol('READER')
+"""Represent the transaction state of "reader".
+
+This state indicates that the transaction is read-only and is
+only safe to use on a synchronously updated slave database; otherwise
+the master database should be used.
+
+"""
+
+
+_WRITER = _symbol('WRITER')
+"""Represent the transaction state of "writer".
+
+This state indicates that the transaction writes data and
+should be directed at the master database.
+
+"""
+
+
+class _Default(object):
+ """Mark a value as a default value.
+
+ A value in the local configuration dictionary wrapped with
+ _Default() will not take precedence over a value that is specified
+ in cfg.CONF. Values that are set after the fact using configure()
+ will supersede those in cfg.CONF.
+
+ """
+
+ __slots__ = 'value',
+
+ _notset = _symbol("NOTSET")
+
+ def __init__(self, value=_notset):
+ self.value = value
+
+ @classmethod
+ def resolve(cls, value):
+ if isinstance(value, _Default):
+ v = value.value
+ if v is cls._notset:
+ return None
+ else:
+ return v
+ else:
+ return value
+
+ @classmethod
+ def resolve_w_conf(cls, value, conf_namespace, key):
+ if isinstance(value, _Default):
+ v = getattr(conf_namespace, key, value.value)
+ if v is cls._notset:
+ return None
+ else:
+ return v
+ else:
+ return value
+
+ @classmethod
+ def is_set(cls, value):
+ return not isinstance(value, _Default) or \
+ value.value is not cls._notset
+
+ @classmethod
+ def is_set_w_conf(cls, value, conf_namespace, key):
+ return not isinstance(value, _Default) or \
+ value.value is not cls._notset or \
+ hasattr(conf_namespace, key)
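+
+# Illustration of the _Default resolution rules above (the values are
+# hypothetical):
+#
+#   _Default.resolve(_Default(10))  # -> 10
+#   _Default.resolve(_Default())    # -> None; NOTSET resolves to None
+#   _Default.resolve(25)            # -> 25; a non-default passes through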
+
+
+class _TransactionFactory(object):
+ """A factory for :class:`._TransactionContext` objects.
+
+ By default, there is just one of these, set up
+ based on CONF, however instance-level :class:`._TransactionFactory`
+ objects can be made, as is the case with the
+ :class:`._TestTransactionFactory` subclass used by the oslo.db test suite.
+
+ """
+ def __init__(self):
+ self._url_cfg = {
+ 'connection': _Default(),
+ 'slave_connection': _Default(),
+ }
+ self._engine_cfg = {
+ 'sqlite_fk': _Default(False),
+ 'mysql_sql_mode': _Default('TRADITIONAL'),
+ 'idle_timeout': _Default(3600),
+ 'connection_debug': _Default(0),
+ 'max_pool_size': _Default(),
+ 'max_overflow': _Default(),
+ 'pool_timeout': _Default(),
+ 'sqlite_synchronous': _Default(True),
+ 'connection_trace': _Default(False),
+ 'max_retries': _Default(10),
+ 'retry_interval': _Default(10),
+ 'thread_checkin': _Default(True)
+ }
+ self._maker_cfg = {
+ 'expire_on_commit': _Default(False),
+ '__autocommit': True
+ }
+ self._transaction_ctx_cfg = {
+ 'rollback_reader_sessions': False,
+ }
+ self._facade_cfg = {
+ 'synchronous_reader': True
+ }
+
+ # other options that are defined in oslo.db.options.database_opts
+ # but do not apply to the standard enginefacade arguments
+ # (most seem to apply to api.DBAPI).
+ self._ignored_cfg = dict(
+ (k, _Default(None)) for k in [
+ 'db_max_retries', 'db_inc_retry_interval',
+ 'use_db_reconnect',
+ 'db_retry_interval', 'min_pool_size',
+ 'db_max_retry_interval',
+ 'sqlite_db', 'backend'])
+
+ self._started = False
+ self._legacy_facade = None
+ self._start_lock = threading.Lock()
+
+ def configure_defaults(self, **kw):
+ """Apply default configurational options.
+
+ This method can only be called before any specific
+ transaction-beginning methods have been called.
+
+ Configurational options are within a fixed set of keys, and fall
+ under three categories: URL configuration, engine configuration,
+ and session configuration. Each key given will be tested against
+ these three configuration sets to see which one is applicable; if
+ it is not applicable to any set, an exception is raised.
+
+ The configurational options given here act as **defaults**
+ when the :class:`._TransactionFactory` is configured using
+ a :class:`.oslo.config.cfg.ConfigOpts` object; the options
+ present within the :class:`.oslo.config.cfg.ConfigOpts` **take
+ precedence** versus the arguments passed here. By default,
+ the :class:`._TransactionFactory` loads in the configuration from
+ :data:`oslo.config.cfg.CONF`, after applying the
+ :data:`oslo.db.options.database_opts` configurational defaults to it.
+
+ .. seealso::
+
+ :meth:`._TransactionFactory.configure`
+
+ """
+ self._configure(True, kw)
+
+ def configure(self, **kw):
+ """Apply configurational options.
+
+ This method can only be called before any specific
+ transaction-beginning methods have been called.
+
+ Behavior here is the same as that of
+ :meth:`._TransactionFactory.configure_defaults`,
+ with the exception that values specified here will **supersede** those
+ setup in the :class:`.oslo.config.cfg.ConfigOpts` options.
+
+ .. seealso::
+
+ :meth:`._TransactionFactory.configure_defaults`
+
+ """
+ self._configure(False, kw)
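+
+# Hypothetical illustration of the precedence rules described above,
+# assuming cfg.CONF supplies a max_retries value:
+#
+#   factory = _TransactionFactory()
+#   factory.configure_defaults(max_retries=3)  # yields to cfg.CONF
+#   factory.configure(max_retries=5)           # supersedes cfg.CONF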
+
+ def _configure(self, as_defaults, kw):
+
+ if self._started:
+ raise TypeError("this TransactionFactory is already started")
+ not_supported = []
+ for k, v in kw.items():
+ for dict_ in (
+ self._url_cfg, self._engine_cfg,
+ self._maker_cfg, self._ignored_cfg,
+ self._facade_cfg, self._transaction_ctx_cfg):
+ if k in dict_:
+ dict_[k] = _Default(v) if as_defaults else v
+ break
+ else:
+ not_supported.append(k)
+
+ if not_supported:
+ # would like to raise ValueError here, but there are just
+ # too many unrecognized (obsolete?) configuration options
+ # coming in from projects
+ warnings.warn(
+ "Configuration option(s) %r not supported" %
+ sorted(not_supported),
+ exception.NotSupportedWarning
+ )
+
+ def get_legacy_facade(self):
+ """Return a :class:`.LegacyEngineFacade` for this factory.
+
+ This facade will make use of the same engine and sessionmaker
+ as this factory, however will not share the same transaction context;
+ the legacy facade continues to work the old way of returning
+ a new Session each time get_session() is called.
+
+ """
+ if not self._legacy_facade:
+ self._legacy_facade = LegacyEngineFacade(None, _factory=self)
+ if not self._started:
+ self._start()
+
+ return self._legacy_facade
+
+ def _create_connection(self, mode):
+ if not self._started:
+ self._start()
+ if mode is _WRITER:
+ return self._writer_engine.connect()
+ elif self.synchronous_reader or mode is _ASYNC_READER:
+ return self._reader_engine.connect()
+ else:
+ return self._writer_engine.connect()
+
+ def _create_session(self, mode, bind=None):
+ if not self._started:
+ self._start()
+ kw = {}
+ # don't pass 'bind' if bind is None; the sessionmaker
+ # already has a bind to the engine.
+ if bind:
+ kw['bind'] = bind
+ if mode is _WRITER:
+ return self._writer_maker(**kw)
+ elif self.synchronous_reader or mode is _ASYNC_READER:
+ return self._reader_maker(**kw)
+ else:
+ return self._writer_maker(**kw)
+
+ def _args_for_conf(self, default_cfg, conf):
+ if conf is None:
+ return dict(
+ (key, _Default.resolve(value))
+ for key, value in default_cfg.items()
+ if _Default.is_set(value)
+ )
+ else:
+ return dict(
+ (key, _Default.resolve_w_conf(value, conf.database, key))
+ for key, value in default_cfg.items()
+ if _Default.is_set_w_conf(value, conf.database, key)
+ )
+
+ def _url_args_for_conf(self, conf):
+ return self._args_for_conf(self._url_cfg, conf)
+
+ def _engine_args_for_conf(self, conf):
+ return self._args_for_conf(self._engine_cfg, conf)
+
+ def _maker_args_for_conf(self, conf):
+ return self._args_for_conf(self._maker_cfg, conf)
+
+ def _start(self, conf=False, connection=None, slave_connection=None):
+ with self._start_lock:
+ # self._started has been checked on the outside
+ # when _start() was called. Within the lock,
+ # check the flag once more to detect the case where
+ # the start process proceeded while this thread was waiting
+ # for the lock.
+ if self._started:
+ return
+ self._started = True
+ if conf is False:
+ conf = cfg.CONF
+
+ # perform register_opts() locally, at the point where cfg.CONF
+ # is actually used, to maintain exact compatibility with
+ # the EngineFacade design. This can be changed if needed.
+ if conf is not None:
+ conf.register_opts(options.database_opts, 'database')
+
+ url_args = self._url_args_for_conf(conf)
+ if connection:
+ url_args['connection'] = connection
+ if slave_connection:
+ url_args['slave_connection'] = slave_connection
+ engine_args = self._engine_args_for_conf(conf)
+ maker_args = self._maker_args_for_conf(conf)
+ maker_args['autocommit'] = maker_args.pop('__autocommit')
+
+ self._writer_engine, self._writer_maker = \
+ self._setup_for_connection(
+ url_args['connection'],
+ engine_args, maker_args)
+
+ if url_args.get('slave_connection'):
+ self._reader_engine, self._reader_maker = \
+ self._setup_for_connection(
+ url_args['slave_connection'],
+ engine_args, maker_args)
+ else:
+ self._reader_engine, self._reader_maker = \
+ self._writer_engine, self._writer_maker
+
+ self.synchronous_reader = self._facade_cfg['synchronous_reader']
+
+ def _setup_for_connection(
+ self, sql_connection, engine_kwargs, maker_kwargs):
+ engine = engines.create_engine(
+ sql_connection=sql_connection, **engine_kwargs)
+ sessionmaker = orm.get_maker(engine=engine, **maker_kwargs)
+ return engine, sessionmaker
+
+
+class _TestTransactionFactory(_TransactionFactory):
+ """A :class:`._TransactionFactory` used by test suites.
+
+ This is a :class:`._TransactionFactory` that can be directly injected
+ with an existing engine and sessionmaker.
+
+ Note that while this is used by oslo.db's own tests of
+ the enginefacade system, it is also exported for use by
+ the test suites of other projects, first as an element of the
+ oslo_db.sqlalchemy.test_base module, and secondly may be used by
+ external test suites directly.
+
+ Includes a feature to inject itself temporarily as the factory
+ within the global :class:`._TransactionContextManager`.
+
+ """
+ def __init__(self, engine, maker, apply_global, synchronous_reader):
+ self._reader_engine = self._writer_engine = engine
+ self._reader_maker = self._writer_maker = maker
+ self._started = True
+ self._legacy_facade = None
+ self.synchronous_reader = synchronous_reader
+
+ self._facade_cfg = _context_manager._factory._facade_cfg
+ self._transaction_ctx_cfg = \
+ _context_manager._factory._transaction_ctx_cfg
+ if apply_global:
+ self.existing_factory = _context_manager._factory
+ _context_manager._root_factory = self
+
+ def dispose_global(self):
+ _context_manager._root_factory = self.existing_factory
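+
+# Sketch of use by an external test suite (`engine` and `maker` are
+# assumed test fixtures):
+#
+#   factory = _TestTransactionFactory(engine, maker, apply_global=True,
+#                                     synchronous_reader=True)
+#   try:
+#       ...  # run tests against the injected engine
+#   finally:
+#       factory.dispose_global()  # restore the previous global factory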
+
+
+class _TransactionContext(object):
+ """Represent a single database transaction in progress."""
+
+ def __init__(
+ self, factory, global_factory=None,
+ rollback_reader_sessions=False):
+ """Construct a new :class:`.TransactionContext`.
+
+ :param factory: the :class:`.TransactionFactory` which will
+ serve as a source of connectivity.
+
+ :param global_factory: the "global" factory which will be used
+ by the global ``_context_manager`` for new ``_TransactionContext``
+ objects created under this one. When left as None the actual
+ "global" factory is used.
+
+ :param rollback_reader_sessions: if True, a :class:`.Session` object
+ will have its :meth:`.Session.rollback` method invoked at the end
+ of a ``@reader`` block, actively rolling back the transaction and
+ expiring the objects within, before the :class:`.Session` moves
+ on to be closed, which has the effect of releasing connection
+ resources back to the connection pool and detaching all objects.
+ If False, the :class:`.Session` is
+ not affected at the end of a ``@reader`` block; the underlying
+ connection referred to by this :class:`.Session` will still
+ be released in the enclosing context via the :meth:`.Session.close`
+ method, which still ensures that the DBAPI connection is rolled
+ back, however the objects associated with the :class:`.Session`
+ retain their database-persisted contents after they are detached.
+
+ .. seealso::
+
+ http://docs.sqlalchemy.org/en/rel_0_9/glossary.html#term-released\
+ SQLAlchemy documentation on what "releasing resources" means.
+
+ """
+ self.factory = factory
+ self.global_factory = global_factory
+ self.mode = None
+ self.session = None
+ self.connection = None
+ self.transaction = None
+ kw = self.factory._transaction_ctx_cfg
+ self.rollback_reader_sessions = kw['rollback_reader_sessions']
+
+ @contextlib.contextmanager
+ def _connection(self, savepoint=False):
+ if self.connection is None:
+ try:
+ if self.session is not None:
+ # use existing session, which is outer to us
+ self.connection = self.session.connection()
+ if savepoint:
+ with self.connection.begin_nested():
+ yield self.connection
+ else:
+ yield self.connection
+ else:
+ # is outermost
+ self.connection = self.factory._create_connection(
+ mode=self.mode)
+ self.transaction = self.connection.begin()
+ try:
+ yield self.connection
+ self._end_connection_transaction(self.transaction)
+ except Exception:
+ self.transaction.rollback()
+ # TODO(zzzeek) do we need save_and_reraise() here,
+ # or do newer eventlets not have issues? we are using
+ # raw "raise" in many other places in oslo.db already
+ # (and one six.reraise()).
+ raise
+ finally:
+ self.transaction = None
+ self.connection.close()
+ finally:
+ self.connection = None
+
+ else:
+ # use existing connection, which is outer to us
+ if savepoint:
+ with self.connection.begin_nested():
+ yield self.connection
+ else:
+ yield self.connection
+
+ @contextlib.contextmanager
+ def _session(self, savepoint=False):
+ if self.session is None:
+ self.session = self.factory._create_session(
+ bind=self.connection, mode=self.mode)
+ try:
+ self.session.begin()
+ yield self.session
+ self._end_session_transaction(self.session)
+ except Exception:
+ self.session.rollback()
+ # TODO(zzzeek) do we need save_and_reraise() here,
+ # or do newer eventlets not have issues? we are using
+ # raw "raise" in many other places in oslo.db already
+ # (and one six.reraise()).
+ raise
+ finally:
+ self.session.close()
+ self.session = None
+ else:
+ # use existing session, which is outer to us
+ if savepoint:
+ with self.session.begin_nested():
+ yield self.session
+ else:
+ yield self.session
+
+ def _end_session_transaction(self, session):
+ if self.mode is _WRITER:
+ session.commit()
+ elif self.rollback_reader_sessions:
+ session.rollback()
+ # In the absence of calling session.rollback(),
+ # the next call is session.close(). This releases all
+ # objects from the session into the detached state, and
+ # releases the connection as well; when returned to the pool,
+ # the connection is in any case rolled back or fully closed.
+
+ def _end_connection_transaction(self, transaction):
+ if self.mode is _WRITER:
+ transaction.commit()
+ else:
+ transaction.rollback()
+
+ def _produce_block(self, mode, connection, savepoint):
+ if mode is _WRITER:
+ self._writer()
+ elif mode is _ASYNC_READER:
+ self._async_reader()
+ else:
+ self._reader()
+ if connection:
+ return self._connection(savepoint)
+ else:
+ return self._session(savepoint)
+
+ def _writer(self):
+ if self.mode is None:
+ self.mode = _WRITER
+ elif self.mode is _READER:
+ raise TypeError(
+ "Can't upgrade a READER transaction "
+ "to a WRITER mid-transaction")
+ elif self.mode is _ASYNC_READER:
+ raise TypeError(
+ "Can't upgrade an ASYNC_READER transaction "
+ "to a WRITER mid-transaction")
+
+ def _reader(self):
+ if self.mode is None:
+ self.mode = _READER
+ elif self.mode is _ASYNC_READER:
+ raise TypeError(
+ "Can't upgrade an ASYNC_READER transaction "
+ "to a READER mid-transaction")
+
+ def _async_reader(self):
+ if self.mode is None:
+ self.mode = _ASYNC_READER
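+
+ # Summary of the transitions enforced above: a context's mode is fixed
+ # by its first block; READER and ASYNC_READER can never be upgraded to
+ # WRITER mid-transaction, ASYNC_READER cannot become READER, and
+ # reader-style blocks nested inside a WRITER are permitted.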
+
+
+class _TransactionContextTLocal(threading.local):
+ def __deepcopy__(self, memo):
+ return self
+
+
+class _TransactionContextManager(object):
+ """Provide context-management and decorator patterns for transactions.
+
+ This object integrates user-defined "context" objects with the
+ :class:`._TransactionContext` class, on behalf of a
+ contained :class:`._TransactionFactory`.
+
+ """
+
+ def __init__(
+ self, root=None,
+ mode=None,
+ independent=False,
+ savepoint=False,
+ connection=False,
+ replace_global_factory=None,
+ _is_global_manager=False):
+
+ if root is None:
+ self._root = self
+ self._root_factory = _TransactionFactory()
+ else:
+ self._root = root
+
+ self._replace_global_factory = replace_global_factory
+ self._is_global_manager = _is_global_manager
+ self._mode = mode
+ self._independent = independent
+ self._savepoint = savepoint
+ if self._savepoint and self._independent:
+ raise TypeError(
+ "setting savepoint and independent makes no sense.")
+ self._connection = connection
+
+ @property
+ def _factory(self):
+ """The :class:`._TransactionFactory` associated with this context."""
+ return self._root._root_factory
+
+ def configure(self, **kw):
+ """Apply configurational options to the factory.
+
+ This method can only be called before any specific
+ transaction-beginning methods have been called.
+
+
+ """
+ self._factory.configure(**kw)
+
+ @property
+ def replace(self):
+ """Modifier to replace the global transaction factory with this one."""
+ return self._clone(replace_global_factory=self._factory)
+
+ @property
+ def writer(self):
+ """Modifier to set the transaction to WRITER."""
+ return self._clone(mode=_WRITER)
+
+ @property
+ def reader(self):
+ """Modifier to set the transaction to READER."""
+ return self._clone(mode=_READER)
+
+ @property
+ def independent(self):
+ """Modifier to start a transaction independent from any enclosing."""
+ return self._clone(independent=True)
+
+ @property
+ def savepoint(self):
+ """Modifier to start a SAVEPOINT if a transaction already exists."""
+ return self._clone(savepoint=True)
+
+ @property
+ def connection(self):
+ """Modifier to return a core Connection object instead of Session."""
+ return self._clone(connection=True)
+
+ @property
+ def async(self):
+ """Modifier to set a READER operation to ASYNC_READER."""
+
+ if self._mode is _WRITER:
+ raise TypeError("Setting async on a WRITER makes no sense")
+ return self._clone(mode=_ASYNC_READER)
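+
+ # Modifiers chain by cloning; hypothetical usage:
+ #
+ #   @enginefacade.reader.async
+ #   def get_stats(context):
+ #       ...  # may be routed to an asynchronously updated slave
+ #
+ #   with enginefacade.writer.savepoint.using(context) as session:
+ #       ...  # SAVEPOINT within an enclosing WRITER transaction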
+
+ def using(self, context):
+ """Provide a context manager block that will use the given context."""
+ return self._transaction_scope(context)
+
+ def __call__(self, fn):
+ """Decorate a function."""
+
+ @functools.wraps(fn)
+ def wrapper(*args, **kwargs):
+ context = args[0]
+
+ with self._transaction_scope(context):
+ return fn(*args, **kwargs)
+
+ return wrapper
+
+ def _clone(self, **kw):
+ default_kw = {
+ "independent": self._independent,
+ "mode": self._mode,
+ "connection": self._connection
+ }
+ default_kw.update(kw)
+ return _TransactionContextManager(root=self._root, **default_kw)
+
+ @contextlib.contextmanager
+ def _transaction_scope(self, context):
+ new_transaction = self._independent
+ transaction_contexts_by_thread = \
+ _transaction_contexts_by_thread(context)
+
+ current = restore = getattr(
+ transaction_contexts_by_thread, "current", None)
+
+ use_factory = self._factory
+ global_factory = None
+
+ if self._replace_global_factory:
+ use_factory = global_factory = self._replace_global_factory
+ elif current is not None and current.global_factory:
+ global_factory = current.global_factory
+
+ if self._root._is_global_manager:
+ use_factory = global_factory
+
+ if current is not None and (
+ new_transaction or current.factory is not use_factory
+ ):
+ current = None
+
+ if current is None:
+ current = transaction_contexts_by_thread.current = \
+ _TransactionContext(
+ use_factory, global_factory=global_factory,
+ **use_factory._transaction_ctx_cfg)
+
+ try:
+ if self._mode is not None:
+ with current._produce_block(
+ mode=self._mode,
+ connection=self._connection,
+ savepoint=self._savepoint) as resource:
+ yield resource
+ else:
+ yield
+ finally:
+ if restore is None:
+ del transaction_contexts_by_thread.current
+ elif current is not restore:
+ transaction_contexts_by_thread.current = restore
+
+
+def _context_descriptor(attr=None):
+ getter = operator.attrgetter(attr)
+
+ def _property_for_context(context):
+ try:
+ transaction_context = context.transaction_ctx
+ except exception.NoEngineContextEstablished:
+ raise exception.NoEngineContextEstablished(
+ "No TransactionContext is established for "
+ "this %s object within the current thread; "
+ "the %r attribute is unavailable."
+ % (context, attr)
+ )
+ else:
+ return getter(transaction_context)
+ return property(_property_for_context)
+
+
+def _transaction_ctx_for_context(context):
+ by_thread = _transaction_contexts_by_thread(context)
+ try:
+ return by_thread.current
+ except AttributeError:
+ raise exception.NoEngineContextEstablished(
+ "No TransactionContext is established for "
+ "this %s object within the current thread. "
+ % context
+ )
+
+
+def _transaction_contexts_by_thread(context):
+ transaction_contexts_by_thread = getattr(
+ context, '_enginefacade_context', None)
+ if transaction_contexts_by_thread is None:
+ transaction_contexts_by_thread = \
+ context._enginefacade_context = _TransactionContextTLocal()
+
+ return transaction_contexts_by_thread
+
+
+def transaction_context_provider(klass):
+ """Decorate a class with ``session`` and ``connection`` attributes."""
+
+ setattr(
+ klass,
+ 'transaction_ctx',
+ property(_transaction_ctx_for_context))
+
+ # Graft transaction context attributes as context properties
+ for attr in ('session', 'connection', 'transaction'):
+ setattr(klass, attr, _context_descriptor(attr))
+
+ return klass
+
+
+# apply the context descriptors to oslo.context.RequestContext
+transaction_context_provider(oslo_context.RequestContext)
+
+
+_context_manager = _TransactionContextManager(_is_global_manager=True)
+"""default context manager."""
+
+
+def transaction_context():
+ """Construct a local transaction context.
+
+ """
+ return _TransactionContextManager()
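+
+# A local manager carries its own factory and configuration, separate
+# from the global one; a sketch (the connection URL and the Thing model
+# are illustrative):
+#
+#   main_context_manager = transaction_context()
+#   main_context_manager.configure(connection='sqlite://')
+#
+#   @main_context_manager.writer
+#   def create_thing(context, name):
+#       context.session.add(Thing(name=name))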
+
+
+def configure(**kw):
+ """Apply configurational options to the global factory.
+
+ This method can only be called before any specific transaction-beginning
+ methods have been called.
+
+ .. seealso::
+
+ :meth:`._TransactionFactory.configure`
+
+ """
+ _context_manager._factory.configure(**kw)
+
+
+def get_legacy_facade():
+ """Return a :class:`.LegacyEngineFacade` for the global factory.
+
+ This facade will make use of the same engine and sessionmaker
+ as this factory; however, it will not share the same transaction context;
+ the legacy facade continues to work the old way of returning
+ a new Session each time get_session() is called.
+
+ """
+ return _context_manager._factory.get_legacy_facade()
+
+
+reader = _context_manager.reader
+"""The global 'reader' starting point."""
+
+
+writer = _context_manager.writer
+"""The global 'writer' starting point."""
+
+
+class LegacyEngineFacade(object):
+ """A helper class for removing of global engine instances from oslo.db.
+
+ .. deprecated::
+
+ EngineFacade is deprecated. Please use
+ oslo.db.sqlalchemy.enginefacade for new development.
+
+ As a library, oslo.db can't decide where to store/when to create engine
+ and sessionmaker instances, so this must be left for a target application.
+
+ On the other hand, in order to simplify the adoption of oslo.db changes,
+ we'll provide a helper class, which creates engine and sessionmaker
+ on its instantiation and provides get_engine()/get_session() methods
+ that are compatible with corresponding utility functions that currently
+ exist in target projects, e.g. in Nova.
+
+ engine/sessionmaker instances will still be global (and they are meant to
+ be global), but they will be stored in the app context, rather than in the
+ oslo.db context.
+
+ Two important things to remember:
+
+ 1. An Engine instance is effectively a pool of DB connections, so it's
+ meant to be shared (and it's thread-safe).
+ 2. A Session instance is not meant to be shared and represents a DB
+ transactional context (i.e. it's not thread-safe). sessionmaker is
+ a factory of sessions.
+
+ """
+ def __init__(self, sql_connection, slave_connection=None,
+ sqlite_fk=False, autocommit=True,
+ expire_on_commit=False, _conf=None, _factory=None, **kwargs):
+ """Initialize engine and sessionmaker instances.
+
+ :param sql_connection: the connection string for the database to use
+ :type sql_connection: string
+
+ :param slave_connection: the connection string for the 'slave' database
+ to use. If not provided, the master database
+ will be used for all operations. Note: this
+ is meant to be used for offloading of read
+ operations to asynchronously replicated slaves
+ to reduce the load on the master database.
+ :type slave_connection: string
+
+ :param sqlite_fk: enable foreign keys in SQLite
+ :type sqlite_fk: bool
+
+ :param autocommit: use autocommit mode for created Session instances
+ :type autocommit: bool
+
+ :param expire_on_commit: expire session objects on commit
+ :type expire_on_commit: bool
+
+ Keyword arguments:
+
+ :keyword mysql_sql_mode: the SQL mode to be used for MySQL sessions.
+ (defaults to TRADITIONAL)
+ :keyword idle_timeout: timeout before idle sql connections are reaped
+ (defaults to 3600)
+ :keyword connection_debug: verbosity of SQL debugging information.
+ -1=Off, 0=None, 100=Everything (defaults
+ to 0)
+ :keyword max_pool_size: maximum number of SQL connections to keep open
+ in a pool (defaults to SQLAlchemy settings)
+ :keyword max_overflow: if set, use this value for max_overflow with
+ sqlalchemy (defaults to SQLAlchemy settings)
+ :keyword pool_timeout: if set, use this value for pool_timeout with
+ sqlalchemy (defaults to SQLAlchemy settings)
+ :keyword sqlite_synchronous: if True, SQLite uses synchronous mode
+ (defaults to True)
+ :keyword connection_trace: add python stack traces to SQL as comment
+ strings (defaults to False)
+ :keyword max_retries: maximum db connection retries during startup.
+ (setting -1 implies an infinite retry count)
+ (defaults to 10)
+ :keyword retry_interval: interval between retries of opening a sql
+ connection (defaults to 10)
+ :keyword thread_checkin: boolean that indicates that between each
+ engine checkin event a sleep(0) will occur to
+ allow other greenthreads to run (defaults to
+ True)
+ """
+ warnings.warn(
+ "EngineFacade is deprecated; please use "
+ "oslo.db.sqlalchemy.enginefacade",
+ exception.OsloDBDeprecationWarning,
+ stacklevel=2)
+
+ if _factory:
+ self._factory = _factory
+ else:
+ self._factory = _TransactionFactory()
+
+ self._factory.configure(
+ sqlite_fk=sqlite_fk,
+ __autocommit=autocommit,
+ expire_on_commit=expire_on_commit,
+ **kwargs
+ )
+ # make sure passed-in urls are favored over those
+ # from the config
+ self._factory._start(
+ _conf, connection=sql_connection,
+ slave_connection=slave_connection)
+
+ def get_engine(self, use_slave=False):
+ """Get the engine instance (note, that it's shared).
+
+ :param use_slave: if possible, use 'slave' database for this engine.
+ If the connection string for the slave database
+ wasn't provided, 'master' engine will be returned.
+ (defaults to False)
+ :type use_slave: bool
+
+ """
+ if use_slave:
+ return self._factory._reader_engine
+ else:
+ return self._factory._writer_engine
+
+ def get_session(self, use_slave=False, **kwargs):
+ """Get a Session instance.
+
+ :param use_slave: if possible, use 'slave' database connection for
+ this session. If the connection string for the
+ slave database wasn't provided, a session bound
+ to the 'master' engine will be returned.
+ (defaults to False)
+ :type use_slave: bool
+
+ Keyword arguments will be passed to a sessionmaker instance as-is (if
+ passed, they will override the ones used when the sessionmaker instance
+ was created). See SQLAlchemy Session docs for details.
+
+ """
+ if use_slave:
+ return self._factory._reader_maker(**kwargs)
+ else:
+ return self._factory._writer_maker(**kwargs)
+
+ @classmethod
+ def from_config(cls, conf,
+ sqlite_fk=False, autocommit=True, expire_on_commit=False):
+ """Initialize EngineFacade using oslo.config config instance options.
+
+ :param conf: oslo.config config instance
+ :type conf: oslo.config.cfg.ConfigOpts
+
+ :param sqlite_fk: enable foreign keys in SQLite
+ :type sqlite_fk: bool
+
+ :param autocommit: use autocommit mode for created Session instances
+ :type autocommit: bool
+
+ :param expire_on_commit: expire session objects on commit
+ :type expire_on_commit: bool
+
+ """
+
+ return cls(
+ None,
+ sqlite_fk=sqlite_fk,
+ autocommit=autocommit,
+ expire_on_commit=expire_on_commit, _conf=conf)
diff --git a/oslo_db/sqlalchemy/engines.py b/oslo_db/sqlalchemy/engines.py
new file mode 100644
index 0000000..6e041a2
--- /dev/null
+++ b/oslo_db/sqlalchemy/engines.py
@@ -0,0 +1,413 @@
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""Core SQLAlchemy connectivity routines.
+"""
+
+import itertools
+import logging
+import os
+import re
+import time
+
+import six
+import sqlalchemy
+from sqlalchemy import event
+from sqlalchemy import exc
+from sqlalchemy import pool
+from sqlalchemy.sql.expression import select
+
+from oslo_db._i18n import _LW
+from oslo_db import exception
+
+from oslo_db.sqlalchemy import exc_filters
+from oslo_db.sqlalchemy import utils
+
+LOG = logging.getLogger(__name__)
+
+
+def _thread_yield(dbapi_con, con_record):
+ """Ensure other greenthreads get a chance to be executed.
+
+ If we use eventlet.monkey_patch(), eventlet.greenthread.sleep(0) will
+ execute instead of time.sleep(0).
+ Force a context switch. With common database backends (eg MySQLdb and
+ sqlite), there is no implicit yield caused by network I/O since they are
+ implemented by C libraries that eventlet cannot monkey patch.
+ """
+ time.sleep(0)
+
+
+def _connect_ping_listener(connection, branch):
+ """Ping the server at connection startup.
+
+ Ping the server at transaction begin and transparently reconnect
+ if a disconnect exception occurs.
+ """
+ if branch:
+ return
+
+ # turn off "close with result". This can also be accomplished
+ # by branching the connection, however just setting the flag is
+ # more performant and also doesn't get involved with some
+ # connection-invalidation awkwardness that occurs (see
+ # https://bitbucket.org/zzzeek/sqlalchemy/issue/3215/)
+ save_should_close_with_result = connection.should_close_with_result
+ connection.should_close_with_result = False
+ try:
+ # run a SELECT 1. use a core select() so that
+ # any details like that needed by Oracle, DB2 etc. are handled.
+ connection.scalar(select([1]))
+ except exception.DBConnectionError:
+ # catch DBConnectionError, which is raised by the filter
+ # system.
+ # disconnect detected. The connection is now
+ # "invalid", but the pool should be ready to return
+ # new connections assuming they are good now.
+ # run the select again to re-validate the Connection.
+ connection.scalar(select([1]))
+ finally:
+ connection.should_close_with_result = save_should_close_with_result
+
+
+def _setup_logging(connection_debug=0):
+ """setup_logging function maps SQL debug level to Python log level.
+
+ Connection_debug is a verbosity of SQL debugging information.
+ 0=None(default value),
+ 1=Processed only messages with WARNING level or higher
+ 50=Processed only messages with INFO level or higher
+ 100=Processed only messages with DEBUG level
+ """
+ if connection_debug >= 0:
+ logger = logging.getLogger('sqlalchemy.engine')
+ if connection_debug >= 100:
+ logger.setLevel(logging.DEBUG)
+ elif connection_debug >= 50:
+ logger.setLevel(logging.INFO)
+ else:
+ logger.setLevel(logging.WARNING)
+
+
+def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
+ idle_timeout=3600,
+ connection_debug=0, max_pool_size=None, max_overflow=None,
+ pool_timeout=None, sqlite_synchronous=True,
+ connection_trace=False, max_retries=10, retry_interval=10,
+ thread_checkin=True, logging_name=None):
+ """Return a new SQLAlchemy engine."""
+
+ url = sqlalchemy.engine.url.make_url(sql_connection)
+
+ engine_args = {
+ "pool_recycle": idle_timeout,
+ 'convert_unicode': True,
+ 'connect_args': {},
+ 'logging_name': logging_name
+ }
+
+ _setup_logging(connection_debug)
+
+ _init_connection_args(
+ url, engine_args,
+ sqlite_fk=sqlite_fk,
+ max_pool_size=max_pool_size,
+ max_overflow=max_overflow,
+ pool_timeout=pool_timeout
+ )
+
+ engine = sqlalchemy.create_engine(url, **engine_args)
+
+ _init_events(
+ engine,
+ mysql_sql_mode=mysql_sql_mode,
+ sqlite_synchronous=sqlite_synchronous,
+ sqlite_fk=sqlite_fk,
+ thread_checkin=thread_checkin,
+ connection_trace=connection_trace
+ )
+
+ # register alternate exception handler
+ exc_filters.register_engine(engine)
+
+ # register engine connect handler
+ event.listen(engine, "engine_connect", _connect_ping_listener)
+
+ # initial connect + test
+ # NOTE(viktors): the current implementation of _test_connection()
+ # does nothing, if max_retries == 0, so we can skip it
+ if max_retries:
+ test_conn = _test_connection(engine, max_retries, retry_interval)
+ test_conn.close()
+
+ return engine
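+
+# Minimal usage sketch (the URL and options are illustrative):
+#
+#   engine = create_engine('sqlite://', sqlite_fk=True, max_retries=1)
+#   with engine.connect() as conn:
+#       conn.scalar(select([1]))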
+
+
+@utils.dispatch_for_dialect('*', multiple=True)
+def _init_connection_args(
+ url, engine_args,
+ max_pool_size=None, max_overflow=None, pool_timeout=None, **kw):
+
+ pool_class = url.get_dialect().get_pool_class(url)
+ if issubclass(pool_class, pool.QueuePool):
+ if max_pool_size is not None:
+ engine_args['pool_size'] = max_pool_size
+ if max_overflow is not None:
+ engine_args['max_overflow'] = max_overflow
+ if pool_timeout is not None:
+ engine_args['pool_timeout'] = pool_timeout
+
+
+@_init_connection_args.dispatch_for("sqlite")
+def _init_connection_args(url, engine_args, **kw):
+ pool_class = url.get_dialect().get_pool_class(url)
+ # SingletonThreadPool is used for :memory: connections;
+ # replace it with StaticPool.
+ if issubclass(pool_class, pool.SingletonThreadPool):
+ engine_args["poolclass"] = pool.StaticPool
+ engine_args['connect_args']['check_same_thread'] = False
+
+
+@_init_connection_args.dispatch_for("postgresql")
+def _init_connection_args(url, engine_args, **kw):
+ if 'client_encoding' not in url.query:
+ # Set encoding using engine_args instead of connect_args since
+ # it's supported for PostgreSQL 8.*. More details at:
+ # http://docs.sqlalchemy.org/en/rel_0_9/dialects/postgresql.html
+ engine_args['client_encoding'] = 'utf8'
+
+
+@_init_connection_args.dispatch_for("mysql")
+def _init_connection_args(url, engine_args, **kw):
+ if 'charset' not in url.query:
+ engine_args['connect_args']['charset'] = 'utf8'
+
+
+@_init_connection_args.dispatch_for("mysql+mysqlconnector")
+def _init_connection_args(url, engine_args, **kw):
+ # mysqlconnector engine (<1.0) incorrectly defaults to
+ # raise_on_warnings=True
+ # https://bitbucket.org/zzzeek/sqlalchemy/issue/2515
+ if 'raise_on_warnings' not in url.query:
+ engine_args['connect_args']['raise_on_warnings'] = False
+
+
+@_init_connection_args.dispatch_for("mysql+mysqldb")
+@_init_connection_args.dispatch_for("mysql+oursql")
+def _init_connection_args(url, engine_args, **kw):
+ # These drivers require use_unicode=0 to avoid a performance drop due
+ # to internal usage of Python unicode objects in the driver
+ # http://docs.sqlalchemy.org/en/rel_0_9/dialects/mysql.html
+ if 'use_unicode' not in url.query:
+ engine_args['connect_args']['use_unicode'] = 0
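+
+# Note on dispatch: the '*' variant above is registered with
+# multiple=True, so every matching handler runs for a given URL; e.g. a
+# 'mysql+mysqldb://' URL invokes the generic, 'mysql' and
+# 'mysql+mysqldb' handlers in turn.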
+
+
+@utils.dispatch_for_dialect('*', multiple=True)
+def _init_events(engine, thread_checkin=True, connection_trace=False, **kw):
+ """Set up event listeners for all database backends."""
+
+ _add_process_guards(engine)
+
+ if connection_trace:
+ _add_trace_comments(engine)
+
+ if thread_checkin:
+ sqlalchemy.event.listen(engine, 'checkin', _thread_yield)
+
+
+@_init_events.dispatch_for("mysql")
+def _init_events(engine, mysql_sql_mode=None, **kw):
+ """Set up event listeners for MySQL."""
+
+ if mysql_sql_mode is not None:
+ @sqlalchemy.event.listens_for(engine, "connect")
+ def _set_session_sql_mode(dbapi_con, connection_rec):
+ cursor = dbapi_con.cursor()
+ cursor.execute("SET SESSION sql_mode = %s", [mysql_sql_mode])
+
+ @sqlalchemy.event.listens_for(engine, "first_connect")
+ def _check_effective_sql_mode(dbapi_con, connection_rec):
+ if mysql_sql_mode is not None:
+ _set_session_sql_mode(dbapi_con, connection_rec)
+
+ cursor = dbapi_con.cursor()
+ cursor.execute("SHOW VARIABLES LIKE 'sql_mode'")
+ realmode = cursor.fetchone()
+
+ if realmode is None:
+ LOG.warning(_LW('Unable to detect effective SQL mode'))
+ else:
+ realmode = realmode[1]
+ LOG.debug('MySQL server mode set to %s', realmode)
+ if 'TRADITIONAL' not in realmode.upper() and \
+ 'STRICT_ALL_TABLES' not in realmode.upper():
+ LOG.warning(
+ _LW(
+ "MySQL SQL mode is '%s', "
+ "consider enabling TRADITIONAL or STRICT_ALL_TABLES"),
+ realmode)
+
+
+@_init_events.dispatch_for("sqlite")
+def _init_events(engine, sqlite_synchronous=True, sqlite_fk=False, **kw):
+ """Set up event listeners for SQLite.
+
+ This includes several settings made on connections as they are
+ created, as well as transactional control extensions.
+
+ """
+
+ def regexp(expr, item):
+ reg = re.compile(expr)
+ return reg.search(six.text_type(item)) is not None
+
+ @sqlalchemy.event.listens_for(engine, "connect")
+ def _sqlite_connect_events(dbapi_con, con_record):
+
+ # Add REGEXP functionality on SQLite connections
+ dbapi_con.create_function('regexp', 2, regexp)
+
+ if not sqlite_synchronous:
+ # Switch sqlite connections to non-synchronous mode
+ dbapi_con.execute("PRAGMA synchronous = OFF")
+
+ # Disable pysqlite's emitting of the BEGIN statement entirely.
+ # Also stops it from emitting COMMIT before any DDL.
+ # below, we emit BEGIN ourselves.
+ # see http://docs.sqlalchemy.org/en/rel_0_9/dialects/\
+ # sqlite.html#serializable-isolation-savepoints-transactional-ddl
+ dbapi_con.isolation_level = None
+
+ if sqlite_fk:
+ # Ensures that the foreign key constraints are enforced in SQLite.
+ dbapi_con.execute('pragma foreign_keys=ON')
+
+ @sqlalchemy.event.listens_for(engine, "begin")
+ def _sqlite_emit_begin(conn):
+ # emit our own BEGIN, checking for existing
+ # transactional state
+ if 'in_transaction' not in conn.info:
+ conn.execute("BEGIN")
+ conn.info['in_transaction'] = True
+
+ @sqlalchemy.event.listens_for(engine, "rollback")
+ @sqlalchemy.event.listens_for(engine, "commit")
+ def _sqlite_end_transaction(conn):
+ # remove transactional marker
+ conn.info.pop('in_transaction', None)
+
+
+def _test_connection(engine, max_retries, retry_interval):
+ if max_retries == -1:
+ attempts = itertools.count()
+ else:
+ attempts = six.moves.range(max_retries)
+ # See: http://legacy.python.org/dev/peps/pep-3110/#semantic-changes for
+ # why we are not using 'de' directly (it can be removed from the local
+ # scope).
+ de_ref = None
+ for attempt in attempts:
+ try:
+ return engine.connect()
+ except exception.DBConnectionError as de:
+ msg = _LW('SQL connection failed. %s attempts left.')
+ LOG.warning(msg, max_retries - attempt)
+ time.sleep(retry_interval)
+ de_ref = de
+ else:
+ if de_ref is not None:
+ six.reraise(type(de_ref), de_ref)
+
+
+def _add_process_guards(engine):
+ """Add multiprocessing guards.
+
+ Forces a connection to be reconnected if it is detected
+ as having been shared to a sub-process.
+
+ """
+
+ @sqlalchemy.event.listens_for(engine, "connect")
+ def connect(dbapi_connection, connection_record):
+ connection_record.info['pid'] = os.getpid()
+
+ @sqlalchemy.event.listens_for(engine, "checkout")
+ def checkout(dbapi_connection, connection_record, connection_proxy):
+ pid = os.getpid()
+ if connection_record.info['pid'] != pid:
+ LOG.debug(
+ "Parent process %(orig)s forked (%(newproc)s) with an open "
+ "database connection, "
+ "which is being discarded and recreated.",
+ {"newproc": pid, "orig": connection_record.info['pid']})
+ connection_record.connection = connection_proxy.connection = None
+ raise exc.DisconnectionError(
+ "Connection record belongs to pid %s, "
+ "attempting to check out in pid %s" %
+ (connection_record.info['pid'], pid)
+ )
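+
+# The failure mode this guards against, in brief: a parent process
+# creates the engine and forks; a child then checks out the inherited
+# DBAPI connection. The 'checkout' hook raises DisconnectionError so
+# the pool reconnects rather than sharing one socket across processes.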
+
+
+def _add_trace_comments(engine):
+ """Add trace comments.
+
+ Augment statements with a trace of the immediate calling code
+ for a given statement.
+ """
+
+ import sys
+ import traceback
+ target_paths = set([
+ os.path.dirname(sys.modules['oslo_db'].__file__),
+ os.path.dirname(sys.modules['sqlalchemy'].__file__)
+ ])
+ skip_paths = set([
+ os.path.dirname(sys.modules['oslo_db.tests'].__file__),
+ ])
+
+ @sqlalchemy.event.listens_for(engine, "before_cursor_execute", retval=True)
+ def before_cursor_execute(conn, cursor, statement, parameters, context,
+ executemany):
+
+ # NOTE(zzzeek) - if different steps per DB dialect are desirable
+ # here, switch out on engine.name for now.
+ stack = traceback.extract_stack()
+ our_line = None
+
+ for idx, (filename, lineno, fn_name, code_line) in enumerate(stack):
+ for tgt in skip_paths:
+ if filename.startswith(tgt):
+ break
+ else:
+ for tgt in target_paths:
+ if filename.startswith(tgt):
+ our_line = idx
+ break
+ if our_line:
+ break
+
+ if our_line:
+ trace = "; ".join(
+ "File: %s (%s) %s" % (
+ line[0], line[1], line[2]
+ )
+ # include three lines of context.
+ for line in stack[our_line - 3:our_line]
+
+ )
+ statement = "%s -- %s" % (statement, trace)
+
+ return statement, parameters
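+
+# The resulting statement resembles the following (paths and line
+# numbers are illustrative):
+#
+#   SELECT * FROM foo -- File: nova/db/api.py (42) get_foo; File: ...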
diff --git a/oslo_db/sqlalchemy/orm.py b/oslo_db/sqlalchemy/orm.py
new file mode 100644
index 0000000..decd8c8
--- /dev/null
+++ b/oslo_db/sqlalchemy/orm.py
@@ -0,0 +1,66 @@
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""SQLAlchemy ORM connectivity and query structures.
+"""
+
+from oslo_utils import timeutils
+import sqlalchemy.orm
+from sqlalchemy.sql.expression import literal_column
+
+from oslo_db.sqlalchemy import update_match
+
+
+class Query(sqlalchemy.orm.query.Query):
+ """Subclass of sqlalchemy.query with soft_delete() method."""
+ def soft_delete(self, synchronize_session='evaluate'):
+ return self.update({'deleted': literal_column('id'),
+ 'updated_at': literal_column('updated_at'),
+ 'deleted_at': timeutils.utcnow()},
+ synchronize_session=synchronize_session)
+
+ def update_returning_pk(self, values, surrogate_key):
+ """Perform an UPDATE, returning the primary key of the matched row.
+
+ This is a method-version of
+ oslo_db.sqlalchemy.update_match.update_returning_pk(); see that
+ function for usage details.
+
+ """
+ return update_match.update_returning_pk(self, values, surrogate_key)
+
+ def update_on_match(self, specimen, surrogate_key, values, **kw):
+ """Emit an UPDATE statement matching the given specimen.
+
+ This is a method-version of
+ oslo_db.sqlalchemy.update_match.update_on_match(); see that function
+ for usage details.
+
+ """
+ return update_match.update_on_match(
+ self, specimen, surrogate_key, values, **kw)
+
+
+class Session(sqlalchemy.orm.session.Session):
+ """oslo.db-specific Session subclass."""
+
+
+def get_maker(engine, autocommit=True, expire_on_commit=False):
+ """Return a SQLAlchemy sessionmaker using the given engine."""
+ return sqlalchemy.orm.sessionmaker(bind=engine,
+ class_=Session,
+ autocommit=autocommit,
+ expire_on_commit=expire_on_commit,
+ query_cls=Query)
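+
+# Sketch of direct use (`engine` is assumed to exist and BarModel is a
+# hypothetical SoftDeleteMixin model):
+#
+#   maker = get_maker(engine)
+#   session = maker()
+#   session.query(BarModel).filter_by(name='x').soft_delete()
+#   session.commit()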
diff --git a/oslo_db/sqlalchemy/session.py b/oslo_db/sqlalchemy/session.py
index 0dae851..9155176 100644
--- a/oslo_db/sqlalchemy/session.py
+++ b/oslo_db/sqlalchemy/session.py
@@ -18,96 +18,71 @@
Recommended ways to use sessions within this framework:
-* Don't use them explicitly; this is like running with ``AUTOCOMMIT=1``.
- `model_query()` will implicitly use a session when called without one
- supplied. This is the ideal situation because it will allow queries
- to be automatically retried if the database connection is interrupted.
-
- .. note:: Automatic retry will be enabled in a future patch.
-
- It is generally fine to issue several queries in a row like this. Even though
- they may be run in separate transactions and/or separate sessions, each one
- will see the data from the prior calls. If needed, undo- or rollback-like
- functionality should be handled at a logical level. For an example, look at
- the code around quotas and `reservation_rollback()`.
-
- Examples:
+* Use the ``enginefacade`` system for connectivity, session and
+ transaction management:
.. code-block:: python
+ from oslo_db.sqlalchemy import enginefacade
+
+ @enginefacade.reader
def get_foo(context, foo):
- return (model_query(context, models.Foo).
+ return (model_query(models.Foo, context.session).
filter_by(foo=foo).
first())
+ @enginefacade.writer
def update_foo(context, id, newfoo):
- (model_query(context, models.Foo).
+ (model_query(models.Foo, context.session).
filter_by(id=id).
update({'foo': newfoo}))
+ @enginefacade.writer
def create_foo(context, values):
foo_ref = models.Foo()
foo_ref.update(values)
- foo_ref.save()
+ foo_ref.save(context.session)
return foo_ref
-
-* Within the scope of a single method, keep all the reads and writes within
- the context managed by a single session. In this way, the session's
- `__exit__` handler will take care of calling `flush()` and `commit()` for
- you. If using this approach, you should not explicitly call `flush()` or
- `commit()`. Any error within the context of the session will cause the
- session to emit a `ROLLBACK`. Database errors like `IntegrityError` will be
- raised in `session`'s `__exit__` handler, and any try/except within the
- context managed by `session` will not be triggered. And catching other
- non-database errors in the session will not trigger the ROLLBACK, so
- exception handlers should always be outside the session, unless the
- developer wants to do a partial commit on purpose. If the connection is
- dropped before this is possible, the database will implicitly roll back the
- transaction.
+ In the above system, transactions are committed automatically, and
+ are shared among all dependent database methods. Ensure
+ that methods which "write" data are enclosed within @writer blocks.
.. note:: Statements in the session scope will not be automatically retried.
- If you create models within the session, they need to be added, but you
+* If you create models within the session, they need to be added, but you
do not need to call `model.save()`:
.. code-block:: python
+ @enginefacade.writer
def create_many_foo(context, foos):
- session = sessionmaker()
- with session.begin():
- for foo in foos:
- foo_ref = models.Foo()
- foo_ref.update(foo)
- session.add(foo_ref)
+ for foo in foos:
+ foo_ref = models.Foo()
+ foo_ref.update(foo)
+ context.session.add(foo_ref)
+ @enginefacade.writer
def update_bar(context, foo_id, newbar):
- session = sessionmaker()
- with session.begin():
- foo_ref = (model_query(context, models.Foo, session).
- filter_by(id=foo_id).
- first())
- (model_query(context, models.Bar, session).
- filter_by(id=foo_ref['bar_id']).
- update({'bar': newbar}))
-
- .. note:: `update_bar` is a trivially simple example of using
- ``with session.begin``. Whereas `create_many_foo` is a good example of
- when a transaction is needed, it is always best to use as few queries as
- possible.
-
- The two queries in `update_bar` can be better expressed using a single query
- which avoids the need for an explicit transaction. It can be expressed like
- so:
+ foo_ref = (model_query(models.Foo, context.session).
+ filter_by(id=foo_id).
+ first())
+ (model_query(models.Bar, context.session).
+ filter_by(id=foo_ref['bar_id']).
+ update({'bar': newbar}))
+
+ The two queries in `update_bar` can alternatively be expressed using
+ a single query, which may be more efficient depending on scenario:
.. code-block:: python
+ @enginefacade.writer
def update_bar(context, foo_id, newbar):
- subq = (model_query(context, models.Foo.id).
+ subq = (model_query(models.Foo.id, context.session).
filter_by(id=foo_id).
limit(1).
subquery())
- (model_query(context, models.Bar).
+ (model_query(models.Bar, context.session).
filter_by(id=subq.as_scalar()).
update({'bar': newbar}))
@@ -119,87 +94,54 @@ Recommended ways to use sessions within this framework:
WHERE id=(SELECT bar_id FROM foo WHERE id = ${foo_id} LIMIT 1);
.. note:: `create_duplicate_foo` is a trivially simple example of catching an
- exception while using ``with session.begin``. Here create two duplicate
+ exception while using a savepoint. Here we create two duplicate
- instances with same primary key, must catch the exception out of context
- managed by a single session:
+ instances with the same primary key and must catch the exception
+ outside of the context managed by a single session:
.. code-block:: python
+ @enginefacade.writer
def create_duplicate_foo(context):
foo1 = models.Foo()
foo2 = models.Foo()
foo1.id = foo2.id = 1
- session = sessionmaker()
try:
- with session.begin():
+ with context.session.begin_nested():
- session.add(foo1)
- session.add(foo2)
+ context.session.add(foo1)
+ context.session.add(foo2)
except exception.DBDuplicateEntry as e:
handle_error(e)
-* Passing an active session between methods. Sessions should only be passed
- to private methods. The private method must use a subtransaction; otherwise
- SQLAlchemy will throw an error when you call `session.begin()` on an existing
- transaction. Public methods should not accept a session parameter and should
- not be involved in sessions within the caller's scope.
-
- Note that this incurs more overhead in SQLAlchemy than the above means
- due to nesting transactions, and it is not possible to implicitly retry
- failed database operations when using this approach.
-
- This also makes code somewhat more difficult to read and debug, because a
- single database transaction spans more than one method. Error handling
- becomes less clear in this situation. When this is needed for code clarity,
- it should be clearly documented.
+* The enginefacade system eliminates the need to decide when sessions need
+  to be passed between methods. All methods should instead share a common
+  context object; enginefacade maintains the transaction across method
+  calls, as in the example below.
.. code-block:: python
- def myfunc(foo):
- session = sessionmaker()
- with session.begin():
- # do some database things
- bar = _private_func(foo, session)
+ @enginefacade.writer
+ def myfunc(context, foo):
+ # do some database things
+ bar = _private_func(context, foo)
return bar
-    def _private_func(foo, session=None):
-        if not session:
-            session = sessionmaker()
-        with session.begin(subtransaction=True):
-            # do some other database things
-        return bar
+    def _private_func(context, foo):
+        with enginefacade.writer.using(context) as session:
+            # do some other database things; this continues the
+            # writer transaction started in myfunc()
+            bar = SomeObject()
+            session.add(bar)
+        return bar
-There are some things which it is best to avoid:
-
-* Don't keep a transaction open any longer than necessary.
-
- This means that your ``with session.begin()`` block should be as short
- as possible, while still containing all the related calls for that
- transaction.
-
* Avoid ``with_lockmode('UPDATE')`` when possible.
- In MySQL/InnoDB, when a ``SELECT ... FOR UPDATE`` query does not match
- any rows, it will take a gap-lock. This is a form of write-lock on the
- "gap" where no rows exist, and prevents any other writes to that space.
- This can effectively prevent any INSERT into a table by locking the gap
- at the end of the index. Similar problems will occur if the SELECT FOR UPDATE
- has an overly broad WHERE clause, or doesn't properly use an index.
-
- One idea proposed at ODS Fall '12 was to use a normal SELECT to test the
- number of rows matching a query, and if only one row is returned,
- then issue the SELECT FOR UPDATE.
-
- The better long-term solution is to use
- ``INSERT .. ON DUPLICATE KEY UPDATE``.
- However, this can not be done until the "deleted" columns are removed and
- proper UNIQUE constraints are added to the tables.
-
+  ``SELECT ... FOR UPDATE`` does not provide the expected locking behavior
+  with MySQL/Galera, as Galera does not replicate row locks across nodes.
+  Instead, an "opportunistic" approach should be used: if an UPDATE fails,
+  the entire transaction should be retried. The ``@wrap_db_retry``
+  decorator is one such system that can be used to achieve this, as
+  sketched below.
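+
+  A minimal sketch (the keyword arguments shown are illustrative; see
+  ``oslo_db.api.wrap_db_retry`` for the supported options): the retry
+  decorator is applied outside the transaction decorator so that each
+  attempt begins a brand-new transaction:
+
+  .. code-block:: python
+
+      from oslo_db import api as oslo_db_api
+
+      @oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
+      @enginefacade.writer
+      def update_bar_opportunistically(context, foo_id, newbar):
+          # a DBDeadlock raised here causes the whole function, and
+          # therefore the whole transaction, to be re-invoked up to
+          # max_retries times.
+          (model_query(models.Bar, context.session).
+           filter_by(id=foo_id).
+           update({'bar': newbar}))
+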
Enabling soft deletes:
-* To use/enable soft-deletes, the `SoftDeleteMixin` must be added
- to your model class. For example:
+* To use/enable soft-deletes, add `SoftDeleteMixin` to your model class.
+  For example:
.. code-block:: python
@@ -209,698 +151,46 @@ Enabling soft deletes:
Efficient use of soft deletes:
-* There are two possible ways to mark a record as deleted:
- `model.soft_delete()` and `query.soft_delete()`.
-
- The `model.soft_delete()` method works with a single already-fetched entry.
- `query.soft_delete()` makes only one db request for all entries that
- correspond to the query.
-
-* In almost all cases you should use `query.soft_delete()`. Some examples:
+* While there is a ``model.soft_delete()`` method, prefer
+ ``query.soft_delete()``. Some examples:
.. code-block:: python
- def soft_delete_bar():
- count = model_query(BarModel).find(some_condition).soft_delete()
+ @enginefacade.writer
+ def soft_delete_bar(context):
+            # synchronize_session=False prevents the ORM from searching
+            # the Session for instances matching the DELETE; this search
+            # is typically not necessary for simple bulk operations.
+            count = (model_query(BarModel, context.session).
+                     find(some_condition).
+                     soft_delete(synchronize_session=False))
if count == 0:
raise Exception("0 entries were soft deleted")
- def complex_soft_delete_with_synchronization_bar(session=None):
- if session is None:
- session = sessionmaker()
- with session.begin(subtransactions=True):
- count = (model_query(BarModel).
- find(some_condition).
- soft_delete(synchronize_session=True))
- # Here synchronize_session is required, because we
- # don't know what is going on in outer session.
- if count == 0:
- raise Exception("0 entries were soft deleted")
-
-* There is only one situation where `model.soft_delete()` is appropriate: when
- you fetch a single record, work with it, and mark it as deleted in the same
- transaction.
-
- .. code-block:: python
-
- def soft_delete_bar_model():
- session = sessionmaker()
- with session.begin():
- bar_ref = model_query(BarModel).find(some_condition).first()
- # Work with bar_ref
- bar_ref.soft_delete(session=session)
-
- However, if you need to work with all entries that correspond to query and
- then soft delete them you should use the `query.soft_delete()` method:
+ @enginefacade.writer
+ def complex_soft_delete_with_synchronization_bar(context):
+ # use synchronize_session='evaluate' when you'd like to attempt
+ # to update the state of the Session to match that of the DELETE.
+ # This is potentially helpful if the operation is complex and
+ # continues to work with instances that were loaded, though
+ # not usually needed.
+ count = (model_query(BarModel, context.session).
+ find(some_condition).
+                     soft_delete(synchronize_session='evaluate'))
+ if count == 0:
+ raise Exception("0 entries were soft deleted")
- .. code-block:: python
- def soft_delete_multi_models():
- session = sessionmaker()
- with session.begin():
- query = (model_query(BarModel, session=session).
- find(some_condition))
- model_refs = query.all()
- # Work with model_refs
- query.soft_delete(synchronize_session=False)
- # synchronize_session=False should be set if there is no outer
- # session and these entries are not used after this.
-
- When working with many rows, it is very important to use query.soft_delete,
- which issues a single query. Using `model.soft_delete()`, as in the following
- example, is very inefficient.
+"""
- .. code-block:: python
+from oslo_db.sqlalchemy import enginefacade
+from oslo_db.sqlalchemy import engines
+from oslo_db.sqlalchemy import orm
- for bar_ref in bar_refs:
- bar_ref.soft_delete(session=session)
- # This will produce count(bar_refs) db requests.
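+# Compatibility shims: the implementations formerly defined in this
+# module now live in the enginefacade, engines and orm modules; these
+# aliases keep existing imports of oslo_db.sqlalchemy.session working.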
+EngineFacade = enginefacade.LegacyEngineFacade
+create_engine = engines.create_engine
+get_maker = orm.get_maker
+Query = orm.Query
+Session = orm.Session
-"""
-import itertools
-import logging
-import os
-import re
-import time
-
-from oslo_utils import timeutils
-import six
-from sqlalchemy import event
-from sqlalchemy import exc
-import sqlalchemy.orm
-from sqlalchemy import pool
-from sqlalchemy.sql.expression import literal_column
-from sqlalchemy.sql.expression import select
-
-from oslo_db._i18n import _LW
-from oslo_db import exception
-from oslo_db import options
-from oslo_db.sqlalchemy import exc_filters
-from oslo_db.sqlalchemy import update_match
-from oslo_db.sqlalchemy import utils
-
-LOG = logging.getLogger(__name__)
-
-
-def _thread_yield(dbapi_con, con_record):
- """Ensure other greenthreads get a chance to be executed.
-
- If we use eventlet.monkey_patch(), eventlet.greenthread.sleep(0) will
- execute instead of time.sleep(0).
- Force a context switch. With common database backends (eg MySQLdb and
- sqlite), there is no implicit yield caused by network I/O since they are
- implemented by C libraries that eventlet cannot monkey patch.
- """
- time.sleep(0)
-
-
-def _connect_ping_listener(connection, branch):
- """Ping the server at connection startup.
-
- Ping the server at transaction begin and transparently reconnect
- if a disconnect exception occurs.
- """
- if branch:
- return
-
- # turn off "close with result". This can also be accomplished
- # by branching the connection, however just setting the flag is
- # more performant and also doesn't get involved with some
- # connection-invalidation awkardness that occurs (see
- # https://bitbucket.org/zzzeek/sqlalchemy/issue/3215/)
- save_should_close_with_result = connection.should_close_with_result
- connection.should_close_with_result = False
- try:
- # run a SELECT 1. use a core select() so that
- # any details like that needed by Oracle, DB2 etc. are handled.
- connection.scalar(select([1]))
- except exception.DBConnectionError:
- # catch DBConnectionError, which is raised by the filter
- # system.
- # disconnect detected. The connection is now
- # "invalid", but the pool should be ready to return
- # new connections assuming they are good now.
- # run the select again to re-validate the Connection.
- connection.scalar(select([1]))
- finally:
- connection.should_close_with_result = save_should_close_with_result
-
-
-def _setup_logging(connection_debug=0):
- """setup_logging function maps SQL debug level to Python log level.
-
- Connection_debug is a verbosity of SQL debugging information.
- 0=None(default value),
- 1=Processed only messages with WARNING level or higher
- 50=Processed only messages with INFO level or higher
- 100=Processed only messages with DEBUG level
- """
- if connection_debug >= 0:
- logger = logging.getLogger('sqlalchemy.engine')
- if connection_debug >= 100:
- logger.setLevel(logging.DEBUG)
- elif connection_debug >= 50:
- logger.setLevel(logging.INFO)
- else:
- logger.setLevel(logging.WARNING)
-
-
-def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
- idle_timeout=3600,
- connection_debug=0, max_pool_size=None, max_overflow=None,
- pool_timeout=None, sqlite_synchronous=True,
- connection_trace=False, max_retries=10, retry_interval=10,
- thread_checkin=True, logging_name=None):
- """Return a new SQLAlchemy engine."""
-
- url = sqlalchemy.engine.url.make_url(sql_connection)
-
- engine_args = {
- "pool_recycle": idle_timeout,
- 'convert_unicode': True,
- 'connect_args': {},
- 'logging_name': logging_name
- }
-
- _setup_logging(connection_debug)
-
- _init_connection_args(
- url, engine_args,
- sqlite_fk=sqlite_fk,
- max_pool_size=max_pool_size,
- max_overflow=max_overflow,
- pool_timeout=pool_timeout
- )
-
- engine = sqlalchemy.create_engine(url, **engine_args)
-
- _init_events(
- engine,
- mysql_sql_mode=mysql_sql_mode,
- sqlite_synchronous=sqlite_synchronous,
- sqlite_fk=sqlite_fk,
- thread_checkin=thread_checkin,
- connection_trace=connection_trace
- )
-
- # register alternate exception handler
- exc_filters.register_engine(engine)
-
- # register engine connect handler
- event.listen(engine, "engine_connect", _connect_ping_listener)
-
- # initial connect + test
- # NOTE(viktors): the current implementation of _test_connection()
- # does nothing, if max_retries == 0, so we can skip it
- if max_retries:
- test_conn = _test_connection(engine, max_retries, retry_interval)
- test_conn.close()
-
- return engine
-
-
-@utils.dispatch_for_dialect('*', multiple=True)
-def _init_connection_args(
- url, engine_args,
- max_pool_size=None, max_overflow=None, pool_timeout=None, **kw):
-
- pool_class = url.get_dialect().get_pool_class(url)
- if issubclass(pool_class, pool.QueuePool):
- if max_pool_size is not None:
- engine_args['pool_size'] = max_pool_size
- if max_overflow is not None:
- engine_args['max_overflow'] = max_overflow
- if pool_timeout is not None:
- engine_args['pool_timeout'] = pool_timeout
-
-
-@_init_connection_args.dispatch_for("sqlite")
-def _init_connection_args(url, engine_args, **kw):
- pool_class = url.get_dialect().get_pool_class(url)
- # singletonthreadpool is used for :memory: connections;
- # replace it with StaticPool.
- if issubclass(pool_class, pool.SingletonThreadPool):
- engine_args["poolclass"] = pool.StaticPool
- engine_args['connect_args']['check_same_thread'] = False
-
-
-@_init_connection_args.dispatch_for("postgresql")
-def _init_connection_args(url, engine_args, **kw):
- if 'client_encoding' not in url.query:
- # Set encoding using engine_args instead of connect_args since
- # it's supported for PostgreSQL 8.*. More details at:
- # http://docs.sqlalchemy.org/en/rel_0_9/dialects/postgresql.html
- engine_args['client_encoding'] = 'utf8'
-
-
-@_init_connection_args.dispatch_for("mysql")
-def _init_connection_args(url, engine_args, **kw):
- if 'charset' not in url.query:
- engine_args['connect_args']['charset'] = 'utf8'
-
-
-@_init_connection_args.dispatch_for("mysql+mysqlconnector")
-def _init_connection_args(url, engine_args, **kw):
- # mysqlconnector engine (<1.0) incorrectly defaults to
- # raise_on_warnings=True
- # https://bitbucket.org/zzzeek/sqlalchemy/issue/2515
- if 'raise_on_warnings' not in url.query:
- engine_args['connect_args']['raise_on_warnings'] = False
-
-
-@_init_connection_args.dispatch_for("mysql+mysqldb")
-@_init_connection_args.dispatch_for("mysql+oursql")
-def _init_connection_args(url, engine_args, **kw):
- # Those drivers require use_unicode=0 to avoid performance drop due
- # to internal usage of Python unicode objects in the driver
- # http://docs.sqlalchemy.org/en/rel_0_9/dialects/mysql.html
- if 'use_unicode' not in url.query:
- engine_args['connect_args']['use_unicode'] = 0
-
-
-@utils.dispatch_for_dialect('*', multiple=True)
-def _init_events(engine, thread_checkin=True, connection_trace=False, **kw):
- """Set up event listeners for all database backends."""
-
- _add_process_guards(engine)
-
- if connection_trace:
- _add_trace_comments(engine)
-
- if thread_checkin:
- sqlalchemy.event.listen(engine, 'checkin', _thread_yield)
-
-
-@_init_events.dispatch_for("mysql")
-def _init_events(engine, mysql_sql_mode=None, **kw):
- """Set up event listeners for MySQL."""
-
- if mysql_sql_mode is not None:
- @sqlalchemy.event.listens_for(engine, "connect")
- def _set_session_sql_mode(dbapi_con, connection_rec):
- cursor = dbapi_con.cursor()
- cursor.execute("SET SESSION sql_mode = %s", [mysql_sql_mode])
-
- @sqlalchemy.event.listens_for(engine, "first_connect")
- def _check_effective_sql_mode(dbapi_con, connection_rec):
- if mysql_sql_mode is not None:
- _set_session_sql_mode(dbapi_con, connection_rec)
-
- cursor = dbapi_con.cursor()
- cursor.execute("SHOW VARIABLES LIKE 'sql_mode'")
- realmode = cursor.fetchone()
-
- if realmode is None:
- LOG.warning(_LW('Unable to detect effective SQL mode'))
- else:
- realmode = realmode[1]
- LOG.debug('MySQL server mode set to %s', realmode)
- if 'TRADITIONAL' not in realmode.upper() and \
- 'STRICT_ALL_TABLES' not in realmode.upper():
- LOG.warning(
- _LW(
- "MySQL SQL mode is '%s', "
- "consider enabling TRADITIONAL or STRICT_ALL_TABLES"),
- realmode)
-
-
-@_init_events.dispatch_for("sqlite")
-def _init_events(engine, sqlite_synchronous=True, sqlite_fk=False, **kw):
- """Set up event listeners for SQLite.
-
- This includes several settings made on connections as they are
- created, as well as transactional control extensions.
-
- """
-
- def regexp(expr, item):
- reg = re.compile(expr)
- return reg.search(six.text_type(item)) is not None
-
- @sqlalchemy.event.listens_for(engine, "connect")
- def _sqlite_connect_events(dbapi_con, con_record):
-
- # Add REGEXP functionality on SQLite connections
- dbapi_con.create_function('regexp', 2, regexp)
-
- if not sqlite_synchronous:
- # Switch sqlite connections to non-synchronous mode
- dbapi_con.execute("PRAGMA synchronous = OFF")
-
- # Disable pysqlite's emitting of the BEGIN statement entirely.
- # Also stops it from emitting COMMIT before any DDL.
- # below, we emit BEGIN ourselves.
- # see http://docs.sqlalchemy.org/en/rel_0_9/dialects/\
- # sqlite.html#serializable-isolation-savepoints-transactional-ddl
- dbapi_con.isolation_level = None
-
- if sqlite_fk:
- # Ensures that the foreign key constraints are enforced in SQLite.
- dbapi_con.execute('pragma foreign_keys=ON')
-
- @sqlalchemy.event.listens_for(engine, "begin")
- def _sqlite_emit_begin(conn):
- # emit our own BEGIN, checking for existing
- # transactional state
- if 'in_transaction' not in conn.info:
- conn.execute("BEGIN")
- conn.info['in_transaction'] = True
-
- @sqlalchemy.event.listens_for(engine, "rollback")
- @sqlalchemy.event.listens_for(engine, "commit")
- def _sqlite_end_transaction(conn):
- # remove transactional marker
- conn.info.pop('in_transaction', None)
-
-
-def _test_connection(engine, max_retries, retry_interval):
- if max_retries == -1:
- attempts = itertools.count()
- else:
- attempts = six.moves.range(max_retries)
- # See: http://legacy.python.org/dev/peps/pep-3110/#semantic-changes for
- # why we are not using 'de' directly (it can be removed from the local
- # scope).
- de_ref = None
- for attempt in attempts:
- try:
- return engine.connect()
- except exception.DBConnectionError as de:
- msg = _LW('SQL connection failed. %s attempts left.')
- LOG.warning(msg, max_retries - attempt)
- time.sleep(retry_interval)
- de_ref = de
- else:
- if de_ref is not None:
- six.reraise(type(de_ref), de_ref)
-
-
-class Query(sqlalchemy.orm.query.Query):
- """Subclass of sqlalchemy.query with soft_delete() method."""
- def soft_delete(self, synchronize_session='evaluate'):
- return self.update({'deleted': literal_column('id'),
- 'updated_at': literal_column('updated_at'),
- 'deleted_at': timeutils.utcnow()},
- synchronize_session=synchronize_session)
-
- def update_returning_pk(self, values, surrogate_key):
- """Perform an UPDATE, returning the primary key of the matched row.
-
- This is a method-version of
- oslo_db.sqlalchemy.update_match.update_returning_pk(); see that
- function for usage details.
-
- """
- return update_match.update_returning_pk(self, values, surrogate_key)
-
- def update_on_match(self, specimen, surrogate_key, values, **kw):
- """Emit an UPDATE statement matching the given specimen.
-
- This is a method-version of
- oslo_db.sqlalchemy.update_match.update_on_match(); see that function
- for usage details.
-
- """
- return update_match.update_on_match(
- self, specimen, surrogate_key, values, **kw)
-
-
-class Session(sqlalchemy.orm.session.Session):
- """Custom Session class to avoid SqlAlchemy Session monkey patching."""
-
-
-def get_maker(engine, autocommit=True, expire_on_commit=False):
- """Return a SQLAlchemy sessionmaker using the given engine."""
- return sqlalchemy.orm.sessionmaker(bind=engine,
- class_=Session,
- autocommit=autocommit,
- expire_on_commit=expire_on_commit,
- query_cls=Query)
-
-
-def _add_process_guards(engine):
- """Add multiprocessing guards.
-
- Forces a connection to be reconnected if it is detected
- as having been shared to a sub-process.
-
- """
-
- @sqlalchemy.event.listens_for(engine, "connect")
- def connect(dbapi_connection, connection_record):
- connection_record.info['pid'] = os.getpid()
-
- @sqlalchemy.event.listens_for(engine, "checkout")
- def checkout(dbapi_connection, connection_record, connection_proxy):
- pid = os.getpid()
- if connection_record.info['pid'] != pid:
- LOG.debug(_LW(
- "Parent process %(orig)s forked (%(newproc)s) with an open "
- "database connection, "
- "which is being discarded and recreated."),
- {"newproc": pid, "orig": connection_record.info['pid']})
- connection_record.connection = connection_proxy.connection = None
- raise exc.DisconnectionError(
- "Connection record belongs to pid %s, "
- "attempting to check out in pid %s" %
- (connection_record.info['pid'], pid)
- )
-
-
-def _add_trace_comments(engine):
- """Add trace comments.
-
- Augment statements with a trace of the immediate calling code
- for a given statement.
- """
-
- import os
- import sys
- import traceback
- target_paths = set([
- os.path.dirname(sys.modules['oslo_db'].__file__),
- os.path.dirname(sys.modules['sqlalchemy'].__file__)
- ])
-
- @sqlalchemy.event.listens_for(engine, "before_cursor_execute", retval=True)
- def before_cursor_execute(conn, cursor, statement, parameters, context,
- executemany):
-
- # NOTE(zzzeek) - if different steps per DB dialect are desirable
- # here, switch out on engine.name for now.
- stack = traceback.extract_stack()
- our_line = None
- for idx, (filename, line, method, function) in enumerate(stack):
- for tgt in target_paths:
- if filename.startswith(tgt):
- our_line = idx
- break
- if our_line:
- break
-
- if our_line:
- trace = "; ".join(
- "File: %s (%s) %s" % (
- line[0], line[1], line[2]
- )
- # include three lines of context.
- for line in stack[our_line - 3:our_line]
-
- )
- statement = "%s -- %s" % (statement, trace)
-
- return statement, parameters
-
-
-class EngineFacade(object):
- """A helper class for removing of global engine instances from oslo.db.
-
- As a library, oslo.db can't decide where to store/when to create engine
- and sessionmaker instances, so this must be left for a target application.
-
- On the other hand, in order to simplify the adoption of oslo.db changes,
- we'll provide a helper class, which creates engine and sessionmaker
- on its instantiation and provides get_engine()/get_session() methods
- that are compatible with corresponding utility functions that currently
- exist in target projects, e.g. in Nova.
-
- engine/sessionmaker instances will still be global (and they are meant to
- be global), but they will be stored in the app context, rather that in the
- oslo.db context.
-
- Note: using of this helper is completely optional and you are encouraged to
- integrate engine/sessionmaker instances into your apps any way you like
- (e.g. one might want to bind a session to a request context). Two important
- things to remember:
-
- 1. An Engine instance is effectively a pool of DB connections, so it's
- meant to be shared (and it's thread-safe).
- 2. A Session instance is not meant to be shared and represents a DB
- transactional context (i.e. it's not thread-safe). sessionmaker is
- a factory of sessions.
-
- """
-
- def __init__(self, sql_connection, slave_connection=None,
- sqlite_fk=False, autocommit=True,
- expire_on_commit=False, **kwargs):
- """Initialize engine and sessionmaker instances.
-
- :param sql_connection: the connection string for the database to use
- :type sql_connection: string
-
- :param slave_connection: the connection string for the 'slave' database
- to use. If not provided, the master database
- will be used for all operations. Note: this
- is meant to be used for offloading of read
- operations to asynchronously replicated slaves
- to reduce the load on the master database.
- :type slave_connection: string
-
- :param sqlite_fk: enable foreign keys in SQLite
- :type sqlite_fk: bool
-
- :param autocommit: use autocommit mode for created Session instances
- :type autocommit: bool
-
- :param expire_on_commit: expire session objects on commit
- :type expire_on_commit: bool
-
- Keyword arguments:
-
- :keyword mysql_sql_mode: the SQL mode to be used for MySQL sessions.
- (defaults to TRADITIONAL)
- :keyword idle_timeout: timeout before idle sql connections are reaped
- (defaults to 3600)
- :keyword connection_debug: verbosity of SQL debugging information.
- -1=Off, 0=None, 100=Everything (defaults
- to 0)
- :keyword max_pool_size: maximum number of SQL connections to keep open
- in a pool (defaults to SQLAlchemy settings)
- :keyword max_overflow: if set, use this value for max_overflow with
- sqlalchemy (defaults to SQLAlchemy settings)
- :keyword pool_timeout: if set, use this value for pool_timeout with
- sqlalchemy (defaults to SQLAlchemy settings)
- :keyword sqlite_synchronous: if True, SQLite uses synchronous mode
- (defaults to True)
- :keyword connection_trace: add python stack traces to SQL as comment
- strings (defaults to False)
- :keyword max_retries: maximum db connection retries during startup.
- (setting -1 implies an infinite retry count)
- (defaults to 10)
- :keyword retry_interval: interval between retries of opening a sql
- connection (defaults to 10)
- :keyword thread_checkin: boolean that indicates that between each
- engine checkin event a sleep(0) will occur to
- allow other greenthreads to run (defaults to
- True)
- """
-
- super(EngineFacade, self).__init__()
-
- engine_kwargs = {
- 'sqlite_fk': sqlite_fk,
- 'mysql_sql_mode': kwargs.get('mysql_sql_mode', 'TRADITIONAL'),
- 'idle_timeout': kwargs.get('idle_timeout', 3600),
- 'connection_debug': kwargs.get('connection_debug', 0),
- 'max_pool_size': kwargs.get('max_pool_size'),
- 'max_overflow': kwargs.get('max_overflow'),
- 'pool_timeout': kwargs.get('pool_timeout'),
- 'sqlite_synchronous': kwargs.get('sqlite_synchronous', True),
- 'connection_trace': kwargs.get('connection_trace', False),
- 'max_retries': kwargs.get('max_retries', 10),
- 'retry_interval': kwargs.get('retry_interval', 10),
- 'thread_checkin': kwargs.get('thread_checkin', True)
- }
- maker_kwargs = {
- 'autocommit': autocommit,
- 'expire_on_commit': expire_on_commit
- }
-
- self._engine = create_engine(sql_connection=sql_connection,
- **engine_kwargs)
- self._session_maker = get_maker(engine=self._engine,
- **maker_kwargs)
- if slave_connection:
- self._slave_engine = create_engine(sql_connection=slave_connection,
- **engine_kwargs)
- self._slave_session_maker = get_maker(engine=self._slave_engine,
- **maker_kwargs)
- else:
- self._slave_engine = None
- self._slave_session_maker = None
-
- def get_engine(self, use_slave=False):
- """Get the engine instance (note, that it's shared).
-
- :param use_slave: if possible, use 'slave' database for this engine.
- If the connection string for the slave database
- wasn't provided, 'master' engine will be returned.
- (defaults to False)
- :type use_slave: bool
-
- """
-
- if use_slave and self._slave_engine:
- return self._slave_engine
-
- return self._engine
-
- def get_session(self, use_slave=False, **kwargs):
- """Get a Session instance.
-
- :param use_slave: if possible, use 'slave' database connection for
- this session. If the connection string for the
- slave database wasn't provided, a session bound
- to the 'master' engine will be returned.
- (defaults to False)
- :type use_slave: bool
-
- Keyword arugments will be passed to a sessionmaker instance as is (if
- passed, they will override the ones used when the sessionmaker instance
- was created). See SQLAlchemy Session docs for details.
-
- """
-
- if use_slave and self._slave_session_maker:
- return self._slave_session_maker(**kwargs)
-
- return self._session_maker(**kwargs)
-
- @classmethod
- def from_config(cls, conf,
- sqlite_fk=False, autocommit=True, expire_on_commit=False):
- """Initialize EngineFacade using oslo.config config instance options.
-
- :param conf: oslo.config config instance
- :type conf: oslo.config.cfg.ConfigOpts
-
- :param sqlite_fk: enable foreign keys in SQLite
- :type sqlite_fk: bool
-
- :param autocommit: use autocommit mode for created Session instances
- :type autocommit: bool
-
- :param expire_on_commit: expire session objects on commit
- :type expire_on_commit: bool
-
- """
-
- conf.register_opts(options.database_opts, 'database')
-
- return cls(sql_connection=conf.database.connection,
- slave_connection=conf.database.slave_connection,
- sqlite_fk=sqlite_fk,
- autocommit=autocommit,
- expire_on_commit=expire_on_commit,
- mysql_sql_mode=conf.database.mysql_sql_mode,
- idle_timeout=conf.database.idle_timeout,
- connection_debug=conf.database.connection_debug,
- max_pool_size=conf.database.max_pool_size,
- max_overflow=conf.database.max_overflow,
- pool_timeout=conf.database.pool_timeout,
- sqlite_synchronous=conf.database.sqlite_synchronous,
- connection_trace=conf.database.connection_trace,
- max_retries=conf.database.max_retries,
- retry_interval=conf.database.retry_interval)
+__all__ = ["EngineFacade", "create_engine", "get_maker", "Query", "Session"]
diff --git a/oslo_db/sqlalchemy/test_base.py b/oslo_db/sqlalchemy/test_base.py
index fbe25a8..2ff9a55 100644
--- a/oslo_db/sqlalchemy/test_base.py
+++ b/oslo_db/sqlalchemy/test_base.py
@@ -28,6 +28,7 @@ import os
import six
from oslo_db import exception
+from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import provision
from oslo_db.sqlalchemy import session
from oslo_db.sqlalchemy import utils
@@ -72,9 +73,15 @@ class DbFixture(fixtures.Fixture):
else:
self.test.engine = self.test.db.engine
self.test.sessionmaker = session.get_maker(self.test.engine)
+
self.addCleanup(setattr, self.test, 'sessionmaker', None)
self.addCleanup(setattr, self.test, 'engine', None)
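+
+        # Install a _TestTransactionFactory as the global enginefacade
+        # factory so that enginefacade-using code under test
+        # transparently runs against this fixture's engine and
+        # sessionmaker; dispose_global() restores the prior factory on
+        # cleanup.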
+ self.test.enginefacade = enginefacade._TestTransactionFactory(
+ self.test.engine, self.test.sessionmaker, apply_global=True,
+ synchronous_reader=True)
+ self.addCleanup(self.test.enginefacade.dispose_global)
+
class DbTestCase(test_base.BaseTestCase):
"""Base class for testing of DB code.
diff --git a/oslo_db/tests/old_import_api/sqlalchemy/test_exc_filters.py b/oslo_db/tests/old_import_api/sqlalchemy/test_exc_filters.py
index 0b1bb89..7738254 100644
--- a/oslo_db/tests/old_import_api/sqlalchemy/test_exc_filters.py
+++ b/oslo_db/tests/old_import_api/sqlalchemy/test_exc_filters.py
@@ -25,8 +25,8 @@ from sqlalchemy.orm import mapper
from oslo.db import exception
from oslo.db.sqlalchemy import exc_filters
from oslo.db.sqlalchemy import test_base
-from oslo_db.sqlalchemy import session as private_session
-from oslo_db.tests.old_import_api import utils as test_utils
+from oslo_db.sqlalchemy import engines
+from oslo_db.tests import utils as test_utils
_TABLE_NAME = '__tmp__test__tmp__'
@@ -720,7 +720,7 @@ class TestDBDisconnected(TestsExceptionFilter):
engine = self.engine
event.listen(
- engine, "engine_connect", private_session._connect_ping_listener)
+ engine, "engine_connect", engines._connect_ping_listener)
real_do_execute = engine.dialect.do_execute
counter = itertools.count(1)
@@ -816,7 +816,7 @@ class TestDBConnectRetry(TestsExceptionFilter):
with self._dbapi_fixture(dialect_name):
with mock.patch.object(engine.dialect, "connect", cant_connect):
- return private_session._test_connection(engine, retries, .01)
+ return engines._test_connection(engine, retries, .01)
def test_connect_no_retries(self):
conn = self._run_test(
diff --git a/oslo_db/tests/old_import_api/sqlalchemy/test_sqlalchemy.py b/oslo_db/tests/old_import_api/sqlalchemy/test_sqlalchemy.py
index d1f10b7..1aa8e59 100644
--- a/oslo_db/tests/old_import_api/sqlalchemy/test_sqlalchemy.py
+++ b/oslo_db/tests/old_import_api/sqlalchemy/test_sqlalchemy.py
@@ -29,11 +29,11 @@ from sqlalchemy import Integer, String
from sqlalchemy.ext.declarative import declarative_base
from oslo.db import exception
+from oslo.db import options as db_options
from oslo.db.sqlalchemy import models
from oslo.db.sqlalchemy import session
from oslo.db.sqlalchemy import test_base
-from oslo_db import options as db_options
-from oslo_db.sqlalchemy import session as private_session
+from oslo_db.sqlalchemy import engines
BASE = declarative_base()
@@ -300,8 +300,8 @@ class EngineFacadeTestCase(oslo_test.BaseTestCase):
self.assertFalse(ses.autocommit)
self.assertTrue(ses.expire_on_commit)
- @mock.patch('oslo_db.sqlalchemy.session.get_maker')
- @mock.patch('oslo_db.sqlalchemy.session.create_engine')
+ @mock.patch('oslo_db.sqlalchemy.orm.get_maker')
+ @mock.patch('oslo_db.sqlalchemy.engines.create_engine')
def test_creation_from_config(self, create_engine, get_maker):
conf = cfg.ConfigOpts()
conf.register_opts(db_options.database_opts, group='database')
@@ -339,6 +339,42 @@ class EngineFacadeTestCase(oslo_test.BaseTestCase):
autocommit=False,
expire_on_commit=True)
+ @mock.patch('oslo_db.sqlalchemy.orm.get_maker')
+ @mock.patch('oslo_db.sqlalchemy.engines.create_engine')
+ def test_passed_in_url_overrides_conf(self, create_engine, get_maker):
+ conf = cfg.ConfigOpts()
+ conf.register_opts(db_options.database_opts, group='database')
+
+ overrides = {
+ 'connection': 'sqlite:///conf_db_setting',
+ 'connection_debug': 100,
+ 'max_pool_size': 10,
+ 'mysql_sql_mode': 'TRADITIONAL',
+ }
+ for optname, optvalue in overrides.items():
+ conf.set_override(optname, optvalue, group='database')
+
+ session.EngineFacade(
+ "sqlite:///override_sql",
+ **dict(conf.database.items())
+ )
+
+ create_engine.assert_called_once_with(
+ sql_connection='sqlite:///override_sql',
+ connection_debug=100,
+ max_pool_size=10,
+ mysql_sql_mode='TRADITIONAL',
+ sqlite_fk=False,
+ idle_timeout=mock.ANY,
+ retry_interval=mock.ANY,
+ max_retries=mock.ANY,
+ max_overflow=mock.ANY,
+ connection_trace=mock.ANY,
+ sqlite_synchronous=mock.ANY,
+ pool_timeout=mock.ANY,
+ thread_checkin=mock.ANY,
+ )
+
def test_slave_connection(self):
paths = self.create_tempfiles([('db.master', ''), ('db.slave', '')],
ext='')
@@ -483,9 +519,7 @@ class MysqlConnectTest(test_base.MySQLOpportunisticTestCase):
)
)
):
- private_session._init_events.dispatch_on_drivername("mysql")(
- test_engine
- )
+ engines._init_events.dispatch_on_drivername("mysql")(test_engine)
test_engine.raw_connection()
self.assertIn('Unable to detect effective SQL mode',
@@ -553,7 +587,7 @@ class PatchStacktraceTest(test_base.DbTestCase):
with mock.patch("traceback.extract_stack", side_effect=extract_stack):
- private_session._add_trace_comments(engine)
+ engines._add_trace_comments(engine)
conn = engine.connect()
with mock.patch.object(engine.dialect, "do_execute") as mock_exec:
diff --git a/oslo_db/tests/sqlalchemy/test_enginefacade.py b/oslo_db/tests/sqlalchemy/test_enginefacade.py
new file mode 100644
index 0000000..a222565
--- /dev/null
+++ b/oslo_db/tests/sqlalchemy/test_enginefacade.py
@@ -0,0 +1,1646 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import collections
+import contextlib
+import copy
+import warnings
+
+import mock
+from oslo_config import cfg
+from oslo_context import context as oslo_context
+from oslotest import base as oslo_test_base
+from sqlalchemy import Column
+from sqlalchemy import Integer
+from sqlalchemy import MetaData
+from sqlalchemy.orm import mapper
+from sqlalchemy import select
+from sqlalchemy import String
+from sqlalchemy import Table
+
+from oslo_db import exception
+from oslo_db import options
+from oslo_db.sqlalchemy import enginefacade
+from oslo_db.sqlalchemy import engines as oslo_engines
+from oslo_db.sqlalchemy import orm
+from oslo_db.sqlalchemy import test_base
+
+
+class SingletonOnName(mock.MagicMock):
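+    """A MagicMock that compares equal to any mock with the same role name.
+
+    Equality is keyed off ``_assert_name`` (e.g. "connection",
+    "engine"), and deepcopy returns the same object, so expected and
+    actual mock call chains can be compared directly.
+    """
+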
+ def __init__(self, the_name, **kw):
+ super(SingletonOnName, self).__init__(
+ __eq__=lambda self, other: other._assert_name == self._assert_name,
+ _assert_name=the_name,
+ **kw
+ )
+
+ def __deepcopy__(self, memo):
+ return self
+
+
+class SingletonConnection(SingletonOnName):
+ def __init__(self, **kw):
+ super(SingletonConnection, self).__init__(
+ "connection", **kw)
+
+
+class SingletonEngine(SingletonOnName):
+ def __init__(self, connection, **kw):
+ super(SingletonEngine, self).__init__(
+ "engine",
+ connect=mock.Mock(return_value=connection),
+ url=connection,
+ _assert_connection=connection,
+ **kw
+ )
+
+
+class NonDecoratedContext(object):
+ """a Context object that's not run through transaction_context_provider."""
+
+
+class AssertDataSource(collections.namedtuple(
+ "AssertDataSource", ["writer", "reader", "async_reader"])):
+
+ def element_for_writer(self, const):
+ if const is enginefacade._WRITER:
+ return self.writer
+ elif const is enginefacade._READER:
+ return self.reader
+ elif const is enginefacade._ASYNC_READER:
+ return self.async_reader
+ else:
+ assert False, "Unknown constant: %s" % const
+
+
+class MockFacadeTest(oslo_test_base.BaseTestCase):
+ """test by applying mocks to internal call-points.
+
+ This applies mocks to
+ oslo.db.sqlalchemy.engines.create_engine() and
+    oslo.db.sqlalchemy.orm.get_maker(), then patches a
+ _TransactionFactory into
+ oslo.db.sqlalchemy.enginefacade._context_manager._root_factory.
+
+    Various scenarios are run against the enginefacade functions, and
+    the calls made against the mock create_engine(), get_maker(), and
+    associated objects are compared exactly against the expected calls.
+
+ """
+
+ synchronous_reader = True
+
+ engine_uri = 'some_connection'
+ slave_uri = None
+
+ def setUp(self):
+ super(MockFacadeTest, self).setUp()
+
+ writer_conn = SingletonConnection()
+ writer_engine = SingletonEngine(writer_conn)
+ writer_session = mock.Mock(
+ connection=mock.Mock(return_value=writer_conn))
+ writer_maker = mock.Mock(return_value=writer_session)
+
+ if self.slave_uri:
+ async_reader_conn = SingletonConnection()
+ async_reader_engine = SingletonEngine(async_reader_conn)
+ async_reader_session = mock.Mock(
+ connection=mock.Mock(return_value=async_reader_conn))
+ async_reader_maker = mock.Mock(return_value=async_reader_session)
+
+ else:
+ async_reader_conn = writer_conn
+ async_reader_engine = writer_engine
+ async_reader_session = writer_session
+ async_reader_maker = writer_maker
+
+ if self.synchronous_reader:
+ reader_conn = async_reader_conn
+ reader_engine = async_reader_engine
+ reader_session = async_reader_session
+ reader_maker = async_reader_maker
+ else:
+ reader_conn = writer_conn
+ reader_engine = writer_engine
+ reader_session = writer_session
+ reader_maker = writer_maker
+
+ self.connections = AssertDataSource(
+ writer_conn, reader_conn, async_reader_conn
+ )
+ self.engines = AssertDataSource(
+ writer_engine, reader_engine, async_reader_engine
+ )
+ self.sessions = AssertDataSource(
+ writer_session, reader_session, async_reader_session
+ )
+ self.makers = AssertDataSource(
+ writer_maker, reader_maker, async_reader_maker
+ )
+
+ def get_maker(engine, **kw):
+ if engine is writer_engine:
+ return self.makers.writer
+ elif engine is reader_engine:
+ return self.makers.reader
+ elif engine is async_reader_engine:
+ return self.makers.async_reader
+ else:
+ assert False
+
+ session_patch = mock.patch.object(
+ orm, "get_maker",
+ side_effect=get_maker)
+ self.get_maker = session_patch.start()
+ self.addCleanup(session_patch.stop)
+
+ def create_engine(sql_connection, **kw):
+ if sql_connection == self.engine_uri:
+ return self.engines.writer
+ elif sql_connection == self.slave_uri:
+ return self.engines.async_reader
+ else:
+ assert False
+
+ engine_patch = mock.patch.object(
+ oslo_engines, "create_engine", side_effect=create_engine)
+
+ self.create_engine = engine_patch.start()
+ self.addCleanup(engine_patch.stop)
+
+ self.factory = enginefacade._TransactionFactory()
+ self.factory.configure(
+ synchronous_reader=self.synchronous_reader
+ )
+
+ self.factory.configure(
+ connection=self.engine_uri,
+ slave_connection=self.slave_uri
+ )
+
+ facade_patcher = mock.patch.object(
+ enginefacade._context_manager, "_root_factory", self.factory)
+ facade_patcher.start()
+ self.addCleanup(facade_patcher.stop)
+
+ def _assert_ctx_connection(self, context, connection):
+ self.assertIs(context.connection, connection)
+
+ def _assert_ctx_session(self, context, session):
+ self.assertIs(context.session, session)
+
+ def _assert_non_decorated_ctx_connection(self, context, connection):
+ transaction_ctx = enginefacade._transaction_ctx_for_context(context)
+ self.assertIs(transaction_ctx.connection, connection)
+
+ def _assert_non_decorated_ctx_session(self, context, session):
+ transaction_ctx = enginefacade._transaction_ctx_for_context(context)
+ self.assertIs(transaction_ctx.session, session)
+
+ @contextlib.contextmanager
+ def _assert_engines(self):
+ """produce a mock series of engine calls.
+
+ These are expected to match engine-related calls established
+ by the test subject.
+
+ """
+
+ writer_conn = SingletonConnection()
+ writer_engine = SingletonEngine(writer_conn)
+ if self.slave_uri:
+ async_reader_conn = SingletonConnection()
+ async_reader_engine = SingletonEngine(async_reader_conn)
+ else:
+ async_reader_conn = writer_conn
+ async_reader_engine = writer_engine
+
+ if self.synchronous_reader:
+ reader_engine = async_reader_engine
+ else:
+ reader_engine = writer_engine
+
+ engines = AssertDataSource(
+ writer_engine, reader_engine, async_reader_engine)
+
+ def create_engine(sql_connection, **kw):
+ if sql_connection == self.engine_uri:
+ return engines.writer
+ elif sql_connection == self.slave_uri:
+ return engines.async_reader
+ else:
+ assert False
+
+ engine_factory = mock.Mock(side_effect=create_engine)
+ engine_factory(
+ sql_connection=self.engine_uri,
+ **dict((k, mock.ANY) for k in self.factory._engine_cfg.keys())
+ )
+ if self.slave_uri:
+ engine_factory(
+ sql_connection=self.slave_uri,
+ **dict((k, mock.ANY) for k in self.factory._engine_cfg.keys())
+ )
+
+ yield AssertDataSource(
+ writer_engine, reader_engine, async_reader_engine
+ )
+
+ self.assertEqual(
+ engine_factory.mock_calls,
+ self.create_engine.mock_calls
+ )
+
+ for sym in [
+ enginefacade._WRITER, enginefacade._READER,
+ enginefacade._ASYNC_READER
+ ]:
+ self.assertEqual(
+ engines.element_for_writer(sym).mock_calls,
+ self.engines.element_for_writer(sym).mock_calls
+ )
+
+ def _assert_async_reader_connection(self, engines, session=None):
+ return self._assert_connection(
+ engines, enginefacade._ASYNC_READER, session)
+
+ def _assert_reader_connection(self, engines, session=None):
+ return self._assert_connection(engines, enginefacade._READER, session)
+
+ def _assert_writer_connection(self, engines, session=None):
+ return self._assert_connection(engines, enginefacade._WRITER, session)
+
+ @contextlib.contextmanager
+ def _assert_connection(self, engines, writer, session=None):
+ """produce a mock series of connection calls.
+
+ These are expected to match connection-related calls established
+ by the test subject.
+
+ """
+ if session:
+ connection = session.connection()
+ yield connection
+ else:
+ connection = engines.element_for_writer(writer).connect()
+ trans = connection.begin()
+ yield connection
+ if writer is enginefacade._WRITER:
+ trans.commit()
+ else:
+ trans.rollback()
+ connection.close()
+
+ self.assertEqual(
+ connection.mock_calls,
+ self.connections.element_for_writer(writer).mock_calls)
+
+ @contextlib.contextmanager
+ def _assert_makers(self, engines):
+
+ writer_session = mock.Mock(connection=mock.Mock(
+ return_value=engines.writer._assert_connection)
+ )
+ writer_maker = mock.Mock(return_value=writer_session)
+
+ if self.slave_uri:
+ async_reader_session = mock.Mock(connection=mock.Mock(
+ return_value=engines.async_reader._assert_connection)
+ )
+ async_reader_maker = mock.Mock(return_value=async_reader_session)
+ else:
+ async_reader_session = writer_session
+ async_reader_maker = writer_maker
+
+ if self.synchronous_reader:
+ reader_maker = async_reader_maker
+ else:
+ reader_maker = writer_maker
+
+ makers = AssertDataSource(
+ writer_maker,
+ reader_maker,
+ async_reader_maker,
+ )
+
+ def get_maker(engine, **kw):
+ if engine is engines.writer:
+ return makers.writer
+ elif engine is engines.reader:
+ return makers.reader
+ elif engine is engines.async_reader:
+ return makers.async_reader
+ else:
+ assert False
+
+ maker_factories = mock.Mock(side_effect=get_maker)
+
+ maker_factories(
+ autocommit=True, engine=engines.writer,
+ expire_on_commit=False)
+ if self.slave_uri:
+ maker_factories(
+ autocommit=True, engine=engines.async_reader,
+ expire_on_commit=False)
+
+ yield makers
+
+ self.assertEqual(
+ maker_factories.mock_calls,
+ self.get_maker.mock_calls)
+
+ for sym in [
+ enginefacade._WRITER, enginefacade._READER,
+ enginefacade._ASYNC_READER
+ ]:
+ self.assertEqual(
+ makers.element_for_writer(sym).mock_calls,
+ self.makers.element_for_writer(sym).mock_calls)
+
+ def _assert_async_reader_session(
+ self, makers, connection=None, assert_calls=True):
+ return self._assert_session(
+ makers, enginefacade._ASYNC_READER, connection, assert_calls)
+
+ def _assert_reader_session(
+ self, makers, connection=None, assert_calls=True):
+ return self._assert_session(
+ makers, enginefacade._READER,
+ connection, assert_calls)
+
+ def _assert_writer_session(
+ self, makers, connection=None, assert_calls=True):
+ return self._assert_session(
+ makers, enginefacade._WRITER,
+ connection, assert_calls)
+
+ @contextlib.contextmanager
+ def _assert_session(
+ self, makers, writer, connection=None, assert_calls=True):
+ """produce a mock series of session calls.
+
+ These are expected to match session-related calls established
+ by the test subject.
+
+ """
+
+ if connection:
+ session = makers.element_for_writer(writer)(bind=connection)
+ else:
+ session = makers.element_for_writer(writer)()
+ session.begin()
+ yield session
+ if writer is enginefacade._WRITER:
+ session.commit()
+ elif enginefacade.\
+ _context_manager._factory._transaction_ctx_cfg[
+ 'rollback_reader_sessions']:
+ session.rollback()
+ session.close()
+
+ if assert_calls:
+ self.assertEqual(
+ session.mock_calls,
+ self.sessions.element_for_writer(writer).mock_calls)
+
+ def test_session_reader_decorator(self):
+ context = oslo_context.RequestContext()
+
+ @enginefacade.reader
+ def go(context):
+ context.session.execute("test")
+ go(context)
+
+ with self._assert_engines() as engines:
+ with self._assert_makers(engines) as makers:
+ with self._assert_reader_session(makers) as session:
+ session.execute("test")
+
+ def test_connection_reader_decorator(self):
+ context = oslo_context.RequestContext()
+
+ @enginefacade.reader.connection
+ def go(context):
+ context.connection.execute("test")
+ go(context)
+
+ with self._assert_engines() as engines:
+ with self._assert_reader_connection(engines) as connection:
+ connection.execute("test")
+
+ def test_session_reader_nested_in_connection_reader(self):
+ context = oslo_context.RequestContext()
+
+ @enginefacade.reader.connection
+ def go1(context):
+ context.connection.execute("test1")
+ go2(context)
+
+ @enginefacade.reader
+ def go2(context):
+ context.session.execute("test2")
+ go1(context)
+
+ with self._assert_engines() as engines:
+ with self._assert_reader_connection(engines) as connection:
+ connection.execute("test1")
+ with self._assert_makers(engines) as makers:
+ with self._assert_reader_session(
+ makers, connection) as session:
+ session.execute("test2")
+
+ def test_connection_reader_nested_in_session_reader(self):
+ context = oslo_context.RequestContext()
+
+ @enginefacade.reader
+ def go1(context):
+ context.session.execute("test1")
+ go2(context)
+
+ @enginefacade.reader.connection
+ def go2(context):
+ context.connection.execute("test2")
+
+ go1(context)
+
+ with self._assert_engines() as engines:
+ with self._assert_makers(engines) as makers:
+ with self._assert_reader_session(makers) as session:
+ session.execute("test1")
+ with self._assert_reader_connection(
+ engines, session) as connection:
+ connection.execute("test2")
+
+ def test_session_reader_decorator_nested(self):
+ context = oslo_context.RequestContext()
+
+ @enginefacade.reader
+ def go1(context):
+ context.session.execute("test1")
+ go2(context)
+
+ @enginefacade.reader
+ def go2(context):
+ context.session.execute("test2")
+ go1(context)
+
+ with self._assert_engines() as engines:
+ with self._assert_makers(engines) as makers:
+ with self._assert_reader_session(makers) as session:
+ session.execute("test1")
+ session.execute("test2")
+
+ def test_reader_nested_in_writer_ok(self):
+ context = oslo_context.RequestContext()
+
+ @enginefacade.writer
+ def go1(context):
+ context.session.execute("test1")
+ go2(context)
+
+ @enginefacade.reader
+ def go2(context):
+ context.session.execute("test2")
+
+ go1(context)
+ with self._assert_engines() as engines:
+ with self._assert_makers(engines) as makers:
+ with self._assert_writer_session(makers) as session:
+ session.execute("test1")
+ session.execute("test2")
+
+ def test_writer_nested_in_reader_raises(self):
+ context = oslo_context.RequestContext()
+
+ @enginefacade.reader
+ def go1(context):
+ context.session.execute("test1")
+ go2(context)
+
+ @enginefacade.writer
+ def go2(context):
+ context.session.execute("test2")
+
+ exc = self.assertRaises(
+ TypeError, go1, context
+ )
+ self.assertEqual(
+ "Can't upgrade a READER "
+ "transaction to a WRITER mid-transaction",
+ exc.args[0]
+ )
+
+ def test_async_on_writer_raises(self):
+ exc = self.assertRaises(
+ TypeError, getattr, enginefacade.writer, "async"
+ )
+ self.assertEqual(
+ "Setting async on a WRITER makes no sense",
+ exc.args[0]
+ )
+
+ def test_savepoint_and_independent_raises(self):
+ exc = self.assertRaises(
+ TypeError, getattr, enginefacade.writer.independent, "savepoint"
+ )
+ self.assertEqual(
+ "setting savepoint and independent makes no sense.",
+ exc.args[0]
+ )
+
+ def test_reader_nested_in_async_reader_raises(self):
+ context = oslo_context.RequestContext()
+
+ @enginefacade.reader.async
+ def go1(context):
+ context.session.execute("test1")
+ go2(context)
+
+ @enginefacade.reader
+ def go2(context):
+ context.session.execute("test2")
+
+ exc = self.assertRaises(
+ TypeError, go1, context
+ )
+ self.assertEqual(
+ "Can't upgrade an ASYNC_READER transaction "
+ "to a READER mid-transaction",
+ exc.args[0]
+ )
+
+ def test_writer_nested_in_async_reader_raises(self):
+ context = oslo_context.RequestContext()
+
+ @enginefacade.reader.async
+ def go1(context):
+ context.session.execute("test1")
+ go2(context)
+
+ @enginefacade.writer
+ def go2(context):
+ context.session.execute("test2")
+
+ exc = self.assertRaises(
+ TypeError, go1, context
+ )
+ self.assertEqual(
+ "Can't upgrade an ASYNC_READER transaction to a "
+ "WRITER mid-transaction",
+ exc.args[0]
+ )
+
+ def test_reader_then_writer_ok(self):
+ context = oslo_context.RequestContext()
+
+ @enginefacade.reader
+ def go1(context):
+ context.session.execute("test1")
+
+ @enginefacade.writer
+ def go2(context):
+ context.session.execute("test2")
+
+ go1(context)
+ go2(context)
+
+ with self._assert_engines() as engines:
+ with self._assert_makers(engines) as makers:
+ with self._assert_reader_session(
+ makers, assert_calls=False) as session:
+ session.execute("test1")
+ with self._assert_writer_session(makers) as session:
+ session.execute("test2")
+
+ def test_async_reader_then_reader_ok(self):
+ context = oslo_context.RequestContext()
+
+ @enginefacade.reader.async
+ def go1(context):
+ context.session.execute("test1")
+
+ @enginefacade.reader
+ def go2(context):
+ context.session.execute("test2")
+
+ go1(context)
+ go2(context)
+
+ with self._assert_engines() as engines:
+ with self._assert_makers(engines) as makers:
+ with self._assert_async_reader_session(
+ makers, assert_calls=False) as session:
+ session.execute("test1")
+ with self._assert_reader_session(makers) as session:
+ session.execute("test2")
+
+ def test_using_reader(self):
+ context = oslo_context.RequestContext()
+
+ with enginefacade.reader.using(context) as session:
+ self._assert_ctx_session(context, session)
+ session.execute("test1")
+
+ with self._assert_engines() as engines:
+ with self._assert_makers(engines) as makers:
+ with self._assert_reader_session(makers) as session:
+ session.execute("test1")
+
+ def test_using_reader_rollback_reader_session(self):
+ enginefacade.configure(rollback_reader_sessions=True)
+
+ context = oslo_context.RequestContext()
+
+ with enginefacade.reader.using(context) as session:
+ self._assert_ctx_session(context, session)
+ session.execute("test1")
+
+ with self._assert_engines() as engines:
+ with self._assert_makers(engines) as makers:
+ with self._assert_reader_session(makers) as session:
+ session.execute("test1")
+
+ def test_using_writer(self):
+ context = oslo_context.RequestContext()
+
+ with enginefacade.writer.using(context) as session:
+ self._assert_ctx_session(context, session)
+ session.execute("test1")
+
+ with self._assert_engines() as engines:
+ with self._assert_makers(engines) as makers:
+ with self._assert_writer_session(makers) as session:
+ session.execute("test1")
+
+ def test_using_writer_no_descriptors(self):
+ context = NonDecoratedContext()
+
+ with enginefacade.writer.using(context) as session:
+ self._assert_non_decorated_ctx_session(context, session)
+ session.execute("test1")
+
+ with self._assert_engines() as engines:
+ with self._assert_makers(engines) as makers:
+ with self._assert_writer_session(makers) as session:
+ session.execute("test1")
+
+ def test_using_writer_connection_no_descriptors(self):
+ context = NonDecoratedContext()
+
+ with enginefacade.writer.connection.using(context) as connection:
+ self._assert_non_decorated_ctx_connection(context, connection)
+ connection.execute("test1")
+
+ with self._assert_engines() as engines:
+ with self._assert_writer_connection(engines) as conn:
+ conn.execute("test1")
+
+ def test_using_reader_connection(self):
+ context = oslo_context.RequestContext()
+
+ with enginefacade.reader.connection.using(context) as connection:
+ self._assert_ctx_connection(context, connection)
+ connection.execute("test1")
+
+ with self._assert_engines() as engines:
+ with self._assert_reader_connection(engines) as conn:
+ conn.execute("test1")
+
+ def test_using_writer_connection(self):
+ context = oslo_context.RequestContext()
+
+ with enginefacade.writer.connection.using(context) as connection:
+ self._assert_ctx_connection(context, connection)
+ connection.execute("test1")
+
+ with self._assert_engines() as engines:
+ with self._assert_writer_connection(engines) as conn:
+ conn.execute("test1")
+
+ def test_context_copied_using_existing_writer_connection(self):
+ context = oslo_context.RequestContext()
+
+ with enginefacade.writer.connection.using(context) as connection:
+ self._assert_ctx_connection(context, connection)
+ connection.execute("test1")
+
+ ctx2 = copy.deepcopy(context)
+
+ with enginefacade.reader.connection.using(ctx2) as conn2:
+ self.assertIs(conn2, connection)
+ self._assert_ctx_connection(ctx2, conn2)
+
+ conn2.execute("test2")
+
+ with self._assert_engines() as engines:
+ with self._assert_writer_connection(engines) as conn:
+ conn.execute("test1")
+ conn.execute("test2")
+
+ def test_context_nodesc_copied_using_existing_writer_connection(self):
+ context = NonDecoratedContext()
+
+ with enginefacade.writer.connection.using(context) as connection:
+ self._assert_non_decorated_ctx_connection(context, connection)
+ connection.execute("test1")
+
+ ctx2 = copy.deepcopy(context)
+
+ with enginefacade.reader.connection.using(ctx2) as conn2:
+ self.assertIs(conn2, connection)
+ self._assert_non_decorated_ctx_connection(ctx2, conn2)
+
+ conn2.execute("test2")
+
+ with self._assert_engines() as engines:
+ with self._assert_writer_connection(engines) as conn:
+ conn.execute("test1")
+ conn.execute("test2")
+
+ def test_session_context_exception(self):
+ context = oslo_context.RequestContext()
+ exc = self.assertRaises(
+ exception.NoEngineContextEstablished,
+ getattr, context, 'session'
+ )
+
+ self.assertRegexpMatches(
+ exc.args[0],
+ "No TransactionContext is established for "
+ "this .*RequestContext.* object within the current "
+ "thread; the 'session' attribute is unavailable."
+ )
+
+ def test_session_context_getattr(self):
+ context = oslo_context.RequestContext()
+ self.assertIsNone(getattr(context, 'session', None))
+
+ def test_connection_context_exception(self):
+ context = oslo_context.RequestContext()
+ exc = self.assertRaises(
+ exception.NoEngineContextEstablished,
+ getattr, context, 'connection'
+ )
+
+ self.assertRegexpMatches(
+ exc.args[0],
+ "No TransactionContext is established for "
+ "this .*RequestContext.* object within the current "
+ "thread; the 'connection' attribute is unavailable."
+ )
+
+ def test_connection_context_getattr(self):
+ context = oslo_context.RequestContext()
+ self.assertIsNone(getattr(context, 'connection', None))
+
+ def test_transaction_context_exception(self):
+ context = oslo_context.RequestContext()
+ exc = self.assertRaises(
+ exception.NoEngineContextEstablished,
+ getattr, context, 'transaction'
+ )
+
+ self.assertRegexpMatches(
+ exc.args[0],
+ "No TransactionContext is established for "
+ "this .*RequestContext.* object within the current "
+ "thread; the 'transaction' attribute is unavailable."
+ )
+
+ def test_transaction_context_getattr(self):
+ context = oslo_context.RequestContext()
+ self.assertIsNone(getattr(context, 'transaction', None))
+
+ def test_trans_ctx_context_exception(self):
+ context = oslo_context.RequestContext()
+ exc = self.assertRaises(
+ exception.NoEngineContextEstablished,
+ getattr, context, 'transaction_ctx'
+ )
+
+ self.assertRegexpMatches(
+ exc.args[0],
+ "No TransactionContext is established for "
+ "this .*RequestContext.* object within the current "
+ "thread."
+ )
+
+ def test_trans_ctx_context_getattr(self):
+ context = oslo_context.RequestContext()
+ self.assertIsNone(getattr(context, 'transaction_ctx', None))
+
+ def test_multiple_factories(self):
+ """Test that the instrumentation applied to a context class is
+
+ independent of a specific _TransactionContextManager
+ / _TransactionFactory.
+
+ """
+ mgr1 = enginefacade.transaction_context()
+ mgr1.configure(
+ connection=self.engine_uri,
+ slave_connection=self.slave_uri
+ )
+ mgr2 = enginefacade.transaction_context()
+ mgr2.configure(
+ connection=self.engine_uri,
+ slave_connection=self.slave_uri
+ )
+
+ context = oslo_context.RequestContext()
+
+ self.assertRaises(
+ exception.NoEngineContextEstablished,
+ getattr, context, 'session'
+ )
+ with mgr1.writer.using(context):
+ self.assertIs(context.transaction_ctx.factory, mgr1._factory)
+ self.assertIsNot(context.transaction_ctx.factory, mgr2._factory)
+ self.assertIsNotNone(context.session)
+
+ self.assertRaises(
+ exception.NoEngineContextEstablished,
+ getattr, context, 'session'
+ )
+ with mgr2.writer.using(context):
+ self.assertIsNot(context.transaction_ctx.factory, mgr1._factory)
+ self.assertIs(context.transaction_ctx.factory, mgr2._factory)
+ self.assertIsNotNone(context.session)
+
+    def test_multiple_factories_nested(self):
+        """Test that context instrumentation supports nesting.
+
+        Nested calls among multiple _TransactionContextManager objects
+        each see their own factory; the outer factory is restored on exit.
+        """
+ mgr1 = enginefacade.transaction_context()
+ mgr1.configure(
+ connection=self.engine_uri,
+ slave_connection=self.slave_uri
+ )
+ mgr2 = enginefacade.transaction_context()
+ mgr2.configure(
+ connection=self.engine_uri,
+ slave_connection=self.slave_uri
+ )
+
+ context = oslo_context.RequestContext()
+
+ with mgr1.writer.using(context):
+ self.assertIs(context.transaction_ctx.factory, mgr1._factory)
+ self.assertIsNot(context.transaction_ctx.factory, mgr2._factory)
+
+ with mgr2.writer.using(context):
+ self.assertIsNot(
+ context.transaction_ctx.factory, mgr1._factory)
+ self.assertIs(context.transaction_ctx.factory, mgr2._factory)
+ self.assertIsNotNone(context.session)
+
+ # mgr1 is restored
+ self.assertIs(context.transaction_ctx.factory, mgr1._factory)
+ self.assertIsNot(context.transaction_ctx.factory, mgr2._factory)
+ self.assertIsNotNone(context.session)
+
+ self.assertRaises(
+ exception.NoEngineContextEstablished,
+ getattr, context, 'transaction_ctx'
+ )
+
+
+class SynchronousReaderWSlaveMockFacadeTest(MockFacadeTest):
+ synchronous_reader = True
+
+ engine_uri = 'some_connection'
+ slave_uri = 'some_slave_connection'
+
+
+class AsyncReaderWSlaveMockFacadeTest(MockFacadeTest):
+ synchronous_reader = False
+
+ engine_uri = 'some_connection'
+ slave_uri = 'some_slave_connection'
+
+
+class LegacyIntegrationTest(test_base.DbTestCase):
+
+ def test_legacy_integration(self):
+ legacy_facade = enginefacade.get_legacy_facade()
+        self.assertIs(
+            legacy_facade.get_engine(),
+            enginefacade._context_manager._factory._writer_engine
+        )
+
+        self.assertIs(
+            enginefacade.get_legacy_facade(), legacy_facade
+        )
+
+
+class ThreadingTest(test_base.DbTestCase):
+ """Test copying on new threads using real connections and sessions."""
+
+ def _assert_ctx_connection(self, context, connection):
+ self.assertIs(context.connection, connection)
+
+ def _assert_ctx_session(self, context, session):
+ self.assertIs(context.session, session)
+
+ def _patch_thread_ident(self):
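+        """Swap the facade's thread-local storage for an ident-keyed mock.
+
+        Tests simulate switching threads by assigning self.ident,
+        without having to spawn real threads.
+        """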
+ self.ident = 1
+
+ test_instance = self
+
+ class MockThreadingLocal(object):
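+            """Stand-in for threading.local, namespaced by test ident.
+
+            All state lives in a dict keyed by test_instance.ident;
+            __deepcopy__ returns self so copied contexts share storage.
+            """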
+ def __init__(self):
+ self.__dict__['state'] = collections.defaultdict(dict)
+
+ def __deepcopy__(self, memo):
+ return self
+
+ def __getattr__(self, key):
+ ns = self.state[test_instance.ident]
+ try:
+ return ns[key]
+ except KeyError:
+ raise AttributeError(key)
+
+ def __setattr__(self, key, value):
+ ns = self.state[test_instance.ident]
+ ns[key] = value
+
+ def __delattr__(self, key):
+ ns = self.state[test_instance.ident]
+ try:
+ del ns[key]
+ except KeyError:
+ raise AttributeError(key)
+
+ return mock.patch.object(
+ enginefacade, "_TransactionContextTLocal", MockThreadingLocal)
+
+ def test_thread_ctxmanager_writer(self):
+ context = oslo_context.RequestContext()
+
+ with self._patch_thread_ident():
+ with enginefacade.writer.using(context) as session:
+ self._assert_ctx_session(context, session)
+
+ self.ident = 2
+
+ with enginefacade.reader.using(context) as sess2:
+ # new session
+ self.assertIsNot(sess2, session)
+
+ # thread local shows the new session
+ self._assert_ctx_session(context, sess2)
+
+ self.ident = 1
+
+ with enginefacade.reader.using(context) as sess3:
+ self.assertIs(sess3, session)
+ self._assert_ctx_session(context, session)
+
+ def test_thread_ctxmanager_writer_connection(self):
+ context = oslo_context.RequestContext()
+
+ with self._patch_thread_ident():
+ with enginefacade.writer.connection.using(context) as conn:
+ self._assert_ctx_connection(context, conn)
+
+ self.ident = 2
+
+ with enginefacade.reader.connection.using(context) as conn2:
+ # new connection
+ self.assertIsNot(conn2, conn)
+
+ # thread local shows the new connection
+ self._assert_ctx_connection(context, conn2)
+
+ with enginefacade.reader.connection.using(
+ context) as conn3:
+ # we still get the right connection even though
+ # this context is not the "copied" context
+ self.assertIsNot(conn3, conn)
+ self.assertIs(conn3, conn2)
+
+ self.ident = 1
+
+ with enginefacade.reader.connection.using(context) as conn3:
+ self.assertIs(conn3, conn)
+ self._assert_ctx_connection(context, conn)
+
+ def test_thread_ctxmanager_switch_styles(self):
+
+ @enginefacade.writer.connection
+ def go_one(context):
+ self.assertIsNone(context.session)
+ self.assertIsNotNone(context.connection)
+
+ self.ident = 2
+ go_two(context)
+
+ self.ident = 1
+ self.assertIsNone(context.session)
+ self.assertIsNotNone(context.connection)
+
+ @enginefacade.reader
+ def go_two(context):
+ self.assertIsNone(context.connection)
+ self.assertIsNotNone(context.session)
+
+ context = oslo_context.RequestContext()
+ with self._patch_thread_ident():
+ go_one(context)
+
+ def test_thread_decorator_writer(self):
+ sessions = set()
+
+ @enginefacade.writer
+ def go_one(context):
+ sessions.add(context.session)
+
+ self.ident = 2
+ go_two(context)
+
+ self.ident = 1
+
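+            # back on the original "thread", the writer's session applies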
+ go_three(context)
+
+ @enginefacade.reader
+ def go_two(context):
+ assert context.session not in sessions
+
+ @enginefacade.reader
+ def go_three(context):
+ assert context.session in sessions
+
+ context = oslo_context.RequestContext()
+ with self._patch_thread_ident():
+ go_one(context)
+
+ def test_thread_decorator_writer_connection(self):
+ connections = set()
+
+ @enginefacade.writer.connection
+ def go_one(context):
+ connections.add(context.connection)
+
+ self.ident = 2
+ go_two(context)
+
+ self.ident = 1
+
+ go_three(context)
+
+ @enginefacade.reader.connection
+ def go_two(context):
+ assert context.connection not in connections
+
+ @enginefacade.reader
+ def go_three(context):
+ assert context.connection in connections
+
+ context = oslo_context.RequestContext()
+ with self._patch_thread_ident():
+ go_one(context)
+
+
+class LiveFacadeTest(test_base.DbTestCase):
+ """test using live SQL with test-provisioned databases.
+
+ Several of these tests require that multiple transactions run
+ simultaenously; as the default SQLite :memory: connection can't achieve
+ this, opportunistic test implementations against MySQL and PostgreSQL are
+ supplied.
+
+ """
+
+ def setUp(self):
+ super(LiveFacadeTest, self).setUp()
+
+ metadata = MetaData()
+ user_table = Table(
+ 'user', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('name', String(30)),
+ mysql_engine='InnoDB'
+ )
+ self.user_table = user_table
+ metadata.create_all(self.engine)
+ self.addCleanup(metadata.drop_all, self.engine)
+
+ class User(object):
+ def __init__(self, name):
+ self.name = name
+
+ mapper(User, user_table)
+ self.User = User
+
+ def _assert_ctx_connection(self, context, connection):
+ self.assertIs(context.connection, connection)
+
+ def _assert_ctx_session(self, context, session):
+ self.assertIs(context.session, session)
+
+ def test_transaction_committed(self):
+ context = oslo_context.RequestContext()
+
+ with enginefacade.writer.using(context) as session:
+ session.add(self.User(name="u1"))
+
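+        # verify the committed row via a separate session outside the facade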
+ session = self.sessionmaker(autocommit=True)
+ self.assertEqual(
+ "u1",
+ session.query(self.User.name).scalar()
+ )
+
+ def test_transaction_rollback(self):
+ context = oslo_context.RequestContext()
+
+ class MyException(Exception):
+ pass
+
+ @enginefacade.writer
+ def go(context):
+ context.session.add(self.User(name="u1"))
+ context.session.flush()
+ raise MyException("a test")
+
+ self.assertRaises(MyException, go, context)
+
+ session = self.sessionmaker(autocommit=True)
+        self.assertIsNone(
+            session.query(self.User.name).scalar()
+        )
+
+ def test_context_deepcopy_on_session(self):
+ context = oslo_context.RequestContext()
+ with enginefacade.writer.using(context) as session:
+
+ ctx2 = copy.deepcopy(context)
+ self._assert_ctx_session(ctx2, session)
+
+ with enginefacade.writer.using(ctx2) as s2:
+ self.assertIs(session, s2)
+ self._assert_ctx_session(ctx2, s2)
+
+ s2.add(self.User(name="u1"))
+ s2.flush()
+
+ session = self.sessionmaker(autocommit=True)
+ self.assertEqual(
+ "u1",
+ session.query(self.User.name).scalar()
+ )
+
+ def test_context_deepcopy_on_connection(self):
+ context = oslo_context.RequestContext()
+ with enginefacade.writer.connection.using(context) as conn:
+
+ ctx2 = copy.deepcopy(context)
+ self._assert_ctx_connection(ctx2, conn)
+
+ with enginefacade.writer.connection.using(ctx2) as conn2:
+ self.assertIs(conn, conn2)
+ self._assert_ctx_connection(ctx2, conn2)
+
+ conn2.execute(self.user_table.insert().values(name="u1"))
+
+ self._assert_ctx_connection(ctx2, conn2)
+
+ session = self.sessionmaker(autocommit=True)
+ self.assertEqual(
+ "u1",
+ session.query(self.User.name).scalar()
+ )
+
+ @test_base.backend_specific("postgresql", "mysql")
+ def test_external_session_transaction(self):
+ context = oslo_context.RequestContext()
+ with enginefacade.writer.using(context) as session:
+ session.add(self.User(name="u1"))
+ session.flush()
+
+ with enginefacade.writer.independent.using(context) as s2:
+                # the independent modifier uses a new session
+ self.assertIsNot(s2, session)
+ self._assert_ctx_session(context, s2)
+
+ # rows within a distinct transaction
+ s2.add(self.User(name="u2"))
+
+ # it also takes over the global enginefacade
+ # within the context
+ with enginefacade.writer.using(context) as s3:
+ self.assertIs(s3, s2)
+ s3.add(self.User(name="u3"))
+
+ self._assert_ctx_session(context, session)
+
+ # rollback the "outer" transaction
+ session.rollback()
+
+ # add more state on the "outer" transaction
+ session.begin()
+ session.add(self.User(name="u4"))
+
+ session = self.sessionmaker(autocommit=True)
+
+        # inner transaction + second part of "outer" transaction committed
+ self.assertEqual(
+ [("u2",), ("u3",), ("u4", )],
+ session.query(
+ self.User.name).order_by(self.User.name).all()
+ )
+
+ def test_savepoint_transaction_decorator(self):
+ context = oslo_context.RequestContext()
+
+ @enginefacade.writer
+ def go1(context):
+ session = context.session
+ session.add(self.User(name="u1"))
+ session.flush()
+
+ try:
+ go2(context)
+ except Exception:
+ pass
+
+ go3(context)
+
+ session.add(self.User(name="u4"))
+
+ @enginefacade.writer.savepoint
+ def go2(context):
+ session = context.session
+ session.add(self.User(name="u2"))
+ raise Exception("nope")
+
+ @enginefacade.writer.savepoint
+ def go3(context):
+ session = context.session
+ session.add(self.User(name="u3"))
+
+ go1(context)
+
+ session = self.sessionmaker(autocommit=True)
+
+        # the rolled-back savepoint (u2) is absent; everything else committed
+ self.assertEqual(
+ [("u1",), ("u3",), ("u4", )],
+ session.query(
+ self.User.name).order_by(self.User.name).all()
+ )
+
+ def test_savepoint_transaction(self):
+ context = oslo_context.RequestContext()
+
+ with enginefacade.writer.using(context) as session:
+ session.add(self.User(name="u1"))
+ session.flush()
+
+ try:
+ with enginefacade.writer.savepoint.using(context) as session:
+ session.add(self.User(name="u2"))
+ raise Exception("nope")
+ except Exception:
+ pass
+
+ with enginefacade.writer.savepoint.using(context) as session:
+ session.add(self.User(name="u3"))
+
+ session.add(self.User(name="u4"))
+
+ session = self.sessionmaker(autocommit=True)
+
+        # the rolled-back savepoint (u2) is absent; everything else committed
+ self.assertEqual(
+ [("u1",), ("u3",), ("u4", )],
+ session.query(
+ self.User.name).order_by(self.User.name).all()
+ )
+
+ @test_base.backend_specific("postgresql", "mysql")
+ def test_external_session_transaction_decorator(self):
+ context = oslo_context.RequestContext()
+
+ @enginefacade.writer
+ def go1(context):
+ session = context.session
+ session.add(self.User(name="u1"))
+ session.flush()
+
+ go2(context, session)
+
+ self._assert_ctx_session(context, session)
+
+ # rollback the "outer" transaction
+ session.rollback()
+
+ # add more state on the "outer" transaction
+ session.begin()
+ session.add(self.User(name="u4"))
+
+ @enginefacade.writer.independent
+ def go2(context, session):
+ s2 = context.session
+ # uses a new session
+ self.assertIsNot(s2, session)
+ self._assert_ctx_session(context, s2)
+
+ # rows within a distinct transaction
+ s2.add(self.User(name="u2"))
+
+ # it also takes over the global enginefacade
+ # within the context
+ with enginefacade.writer.using(context) as s3:
+ self.assertIs(s3, s2)
+ s3.add(self.User(name="u3"))
+
+ go1(context)
+
+ session = self.sessionmaker(autocommit=True)
+
+        # inner transaction + second part of "outer" transaction committed
+ self.assertEqual(
+ [("u2",), ("u3",), ("u4", )],
+ session.query(
+ self.User.name).order_by(self.User.name).all()
+ )
+
+ @test_base.backend_specific("postgresql", "mysql")
+ def test_external_connection_transaction(self):
+ context = oslo_context.RequestContext()
+ with enginefacade.writer.connection.using(context) as connection:
+ connection.execute(self.user_table.insert().values(name="u1"))
+
+            # the independent modifier uses a new Connection
+            with enginefacade.writer.independent.connection.using(
+                    context) as c2:
+ self.assertIsNot(c2, connection)
+ self._assert_ctx_connection(context, c2)
+
+ # rows within a distinct transaction
+ c2.execute(self.user_table.insert().values(name="u2"))
+
+ # it also takes over the global enginefacade
+ # within the context
+ with enginefacade.writer.connection.using(context) as c3:
+ self.assertIs(c2, c3)
+ c3.execute(self.user_table.insert().values(name="u3"))
+ self._assert_ctx_connection(context, connection)
+
+ # rollback the "outer" transaction
+ transaction_ctx = context.transaction_ctx
+ transaction_ctx.transaction.rollback()
+ transaction_ctx.transaction = connection.begin()
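+            # the facade will commit this replacement transaction on exit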
+
+ # add more state on the "outer" transaction
+ connection.execute(self.user_table.insert().values(name="u4"))
+
+ session = self.sessionmaker(autocommit=True)
+ self.assertEqual(
+ [("u2",), ("u3",), ("u4", )],
+ session.query(
+ self.User.name).order_by(self.User.name).all()
+ )
+
+ @test_base.backend_specific("postgresql", "mysql")
+ def test_external_writer_in_reader(self):
+ context = oslo_context.RequestContext()
+ with enginefacade.reader.using(context) as session:
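+            # issue a query so the reader transaction is actually begun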
+ ping = session.scalar(select([1]))
+ self.assertEqual(1, ping)
+
+ # we're definitely a reader
+ @enginefacade.writer
+ def go(ctx):
+ pass
+ exc = self.assertRaises(TypeError, go, context)
+ self.assertEqual(
+ "Can't upgrade a READER transaction to a "
+ "WRITER mid-transaction",
+ exc.args[0])
+
+ # but we can do a writer on a new transaction
+ with enginefacade.writer.independent.using(context) as sess2:
+ self.assertIsNot(sess2, session)
+ self._assert_ctx_session(context, sess2)
+
+ session.add(self.User(name="u1_nocommit"))
+ sess2.add(self.User(name="u1_commit"))
+
+ user = session.query(self.User).first()
+ self.assertEqual("u1_commit", user.name)
+
+ session = self.sessionmaker(autocommit=True)
+        self.assertEqual(
+            [("u1_commit",)],
+            session.query(
+                self.User.name).order_by(self.User.name).all()
+        )
+
+ def test_replace_scope(self):
+ # "timeout" is an argument accepted by
+ # the pysqlite dialect, which we set here to ensure
+ # that even in an all-sqlite test, we test that the URL
+ # is different in the context we are looking for
+ alt_connection = "sqlite:///?timeout=90"
+
+ alt_mgr1 = enginefacade.transaction_context()
+ alt_mgr1.configure(
+ connection=alt_connection,
+ )
+
+ @enginefacade.writer
+ def go1(context):
+ s1 = context.session
+ self.assertEqual(
+ s1.bind.url,
+ enginefacade._context_manager._factory._writer_engine.url)
+ self.assertIs(
+ s1.bind,
+ enginefacade._context_manager._factory._writer_engine)
+ self.assertEqual(s1.bind.url, self.engine.url)
+
+ with alt_mgr1.replace.using(context):
+ go2(context)
+
+ go4(context)
+
+ @enginefacade.writer
+ def go2(context):
+ s2 = context.session
+
+ # factory is not replaced globally...
+ self.assertIsNot(
+ enginefacade._context_manager._factory._writer_engine,
+ alt_mgr1._factory._writer_engine
+ )
+
+ # but it is replaced for us
+ self.assertIs(s2.bind, alt_mgr1._factory._writer_engine)
+ self.assertEqual(
+ str(s2.bind.url), alt_connection)
+
+ go3(context)
+
+ @enginefacade.reader
+ def go3(context):
+ s3 = context.session
+
+ # in a call of a call, we still have the alt URL
+ self.assertIs(s3.bind, alt_mgr1._factory._writer_engine)
+ self.assertEqual(
+ str(s3.bind.url), alt_connection)
+
+ @enginefacade.writer
+ def go4(context):
+ s4 = context.session
+
+ # outside the "replace" context, all is back to normal
+ self.assertIs(s4.bind, self.engine)
+ self.assertEqual(
+ s4.bind.url, self.engine.url)
+
+ context = oslo_context.RequestContext()
+ go1(context)
+ self.assertIsNot(
+ enginefacade._context_manager._factory._writer_engine,
+ alt_mgr1._factory._writer_engine
+ )
+
+ def test_replace_scope_only_global_eng(self):
+ # "timeout" is an argument accepted by
+ # the pysqlite dialect, which we set here to ensure
+ # that even in an all-sqlite test, we test that the URL
+ # is different in the context we are looking for
+ alt_connection1 = "sqlite:///?timeout=90"
+
+ alt_mgr1 = enginefacade.transaction_context()
+ alt_mgr1.configure(
+ connection=alt_connection1,
+ )
+
+ alt_connection2 = "sqlite:///?timeout=120"
+
+ alt_mgr2 = enginefacade.transaction_context()
+ alt_mgr2.configure(
+ connection=alt_connection2,
+ )
+
+ @enginefacade.writer
+ def go1(context):
+ s1 = context.session
+ # global engine
+ self.assertEqual(s1.bind.url, self.engine.url)
+
+ # now replace global engine...
+ with alt_mgr1.replace.using(context):
+ go2(context)
+
+ # and back
+ go6(context)
+
+ @enginefacade.writer
+ def go2(context):
+ s2 = context.session
+
+ # we have the replace-the-global engine
+ self.assertEqual(str(s2.bind.url), alt_connection1)
+ self.assertIs(s2.bind, alt_mgr1._factory._writer_engine)
+
+ go3(context)
+
+ @alt_mgr2.writer
+ def go3(context):
+ s3 = context.session
+
+            # this manager doesn't use the global engine in the first
+            # place; make sure our own factory is still used
+ self.assertEqual(str(s3.bind.url), alt_connection2)
+ self.assertIs(s3.bind, alt_mgr2._factory._writer_engine)
+
+ go4(context)
+
+ @enginefacade.writer
+ def go4(context):
+ s4 = context.session
+
+ # we *do* use the global, so we still want the replacement.
+ self.assertEqual(str(s4.bind.url), alt_connection1)
+ self.assertIs(s4.bind, alt_mgr1._factory._writer_engine)
+
+ @enginefacade.writer
+ def go5(context):
+ s5 = context.session
+
+ # ...and here also
+ self.assertEqual(str(s5.bind.url), alt_connection1)
+ self.assertIs(s5.bind, alt_mgr1._factory._writer_engine)
+
+ @enginefacade.writer
+ def go6(context):
+ s6 = context.session
+
+ # ...but not here!
+ self.assertEqual(str(s6.bind.url), str(self.engine.url))
+ self.assertIs(s6.bind, self.engine)
+
+ context = oslo_context.RequestContext()
+ go1(context)
+
+
+class MySQLLiveFacadeTest(
+ test_base.MySQLOpportunisticTestCase, LiveFacadeTest):
+ pass
+
+
+class PGLiveFacadeTest(
+ test_base.PostgreSQLOpportunisticTestCase, LiveFacadeTest):
+ pass
+
+
+class ConfigOptionsTest(oslo_test_base.BaseTestCase):
+ def test_all_options(self):
+ """test that everything in CONF.database.iteritems() is accepted.
+
+ There's a handful of options in oslo.db.options that seem to have
+ no meaning, but need to be accepted. In particular, Cinder and
+ maybe others are doing exactly this call.
+
+ """
+
+ factory = enginefacade._TransactionFactory()
+ cfg.CONF.register_opts(options.database_opts, 'database')
+ factory.configure(**dict(cfg.CONF.database.items()))
+
+ def test_options_not_supported(self):
+ factory = enginefacade._TransactionFactory()
+
+ with warnings.catch_warnings(record=True) as w:
+ warnings.simplefilter("always")
+
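+            # idle_timeout is a real option; only the two fakes should warn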
+ factory.configure(fake1='x', idle_timeout=200, wrong2='y')
+
+ self.assertEqual(1, len(w))
+ self.assertTrue(
+ issubclass(w[-1].category, exception.NotSupportedWarning))
+ self.assertEqual(
+ "Configuration option(s) ['fake1', 'wrong2'] not supported",
+ str(w[-1].message)
+ )
+
+# TODO(zzzeek): test configuration options, e.g. like
+# test_sqlalchemy->test_creation_from_config
diff --git a/oslo_db/tests/sqlalchemy/test_exc_filters.py b/oslo_db/tests/sqlalchemy/test_exc_filters.py
index bafb59e..34313ce 100644
--- a/oslo_db/tests/sqlalchemy/test_exc_filters.py
+++ b/oslo_db/tests/sqlalchemy/test_exc_filters.py
@@ -23,8 +23,8 @@ from sqlalchemy import event
from sqlalchemy.orm import mapper
from oslo_db import exception
+from oslo_db.sqlalchemy import engines
from oslo_db.sqlalchemy import exc_filters
-from oslo_db.sqlalchemy import session
from oslo_db.sqlalchemy import test_base
from oslo_db.tests import utils as test_utils
@@ -784,7 +784,7 @@ class TestDBDisconnected(TestsExceptionFilter):
dialect_name, exception, num_disconnects, is_disconnect=True):
engine = self.engine
- event.listen(engine, "engine_connect", session._connect_ping_listener)
+ event.listen(engine, "engine_connect", engines._connect_ping_listener)
real_do_execute = engine.dialect.do_execute
counter = itertools.count(1)
@@ -895,7 +895,7 @@ class TestDBConnectRetry(TestsExceptionFilter):
with self._dbapi_fixture(dialect_name):
with mock.patch.object(engine.dialect, "connect", cant_connect):
- return session._test_connection(engine, retries, .01)
+ return engines._test_connection(engine, retries, .01)
def test_connect_no_retries(self):
conn = self._run_test(
@@ -967,7 +967,7 @@ class TestDBConnectPingWrapping(TestsExceptionFilter):
def setUp(self):
super(TestDBConnectPingWrapping, self).setUp()
event.listen(
- self.engine, "engine_connect", session._connect_ping_listener)
+ self.engine, "engine_connect", engines._connect_ping_listener)
@contextlib.contextmanager
def _fixture(
diff --git a/oslo_db/tests/sqlalchemy/test_sqlalchemy.py b/oslo_db/tests/sqlalchemy/test_sqlalchemy.py
index 516acb6..02b0af5 100644
--- a/oslo_db/tests/sqlalchemy/test_sqlalchemy.py
+++ b/oslo_db/tests/sqlalchemy/test_sqlalchemy.py
@@ -31,6 +31,7 @@ from sqlalchemy.ext.declarative import declarative_base
from oslo_db import exception
from oslo_db import options as db_options
+from oslo_db.sqlalchemy import engines
from oslo_db.sqlalchemy import models
from oslo_db.sqlalchemy import session
from oslo_db.sqlalchemy import test_base
@@ -312,8 +313,8 @@ class EngineFacadeTestCase(oslo_test.BaseTestCase):
self.assertFalse(ses.autocommit)
self.assertTrue(ses.expire_on_commit)
- @mock.patch('oslo_db.sqlalchemy.session.get_maker')
- @mock.patch('oslo_db.sqlalchemy.session.create_engine')
+ @mock.patch('oslo_db.sqlalchemy.orm.get_maker')
+ @mock.patch('oslo_db.sqlalchemy.engines.create_engine')
def test_creation_from_config(self, create_engine, get_maker):
conf = cfg.ConfigOpts()
conf.register_opts(db_options.database_opts, group='database')
@@ -495,7 +496,7 @@ class MysqlConnectTest(test_base.MySQLOpportunisticTestCase):
)
)
):
- session._init_events.dispatch_on_drivername("mysql")(test_engine)
+ engines._init_events.dispatch_on_drivername("mysql")(test_engine)
test_engine.raw_connection()
self.assertIn('Unable to detect effective SQL mode',
@@ -556,7 +557,7 @@ class CreateEngineTest(oslo_test.BaseTestCase):
self.args = {'connect_args': {}}
def test_queuepool_args(self):
- session._init_connection_args(
+ engines._init_connection_args(
url.make_url("mysql://u:p@host/test"), self.args,
max_pool_size=10, max_overflow=10)
self.assertEqual(self.args['pool_size'], 10)
@@ -564,7 +565,7 @@ class CreateEngineTest(oslo_test.BaseTestCase):
def test_sqlite_memory_pool_args(self):
for _url in ("sqlite://", "sqlite:///:memory:"):
- session._init_connection_args(
+ engines._init_connection_args(
url.make_url(_url), self.args,
max_pool_size=10, max_overflow=10)
@@ -581,7 +582,7 @@ class CreateEngineTest(oslo_test.BaseTestCase):
self.assertTrue('poolclass' in self.args)
def test_sqlite_file_pool_args(self):
- session._init_connection_args(
+ engines._init_connection_args(
url.make_url("sqlite:///somefile.db"), self.args,
max_pool_size=10, max_overflow=10)
@@ -597,37 +598,37 @@ class CreateEngineTest(oslo_test.BaseTestCase):
self.assertTrue('poolclass' not in self.args)
def test_mysql_connect_args_default(self):
- session._init_connection_args(
+ engines._init_connection_args(
url.make_url("mysql://u:p@host/test"), self.args)
self.assertEqual(self.args['connect_args'],
{'charset': 'utf8', 'use_unicode': 0})
def test_mysql_oursql_connect_args_default(self):
- session._init_connection_args(
+ engines._init_connection_args(
url.make_url("mysql+oursql://u:p@host/test"), self.args)
self.assertEqual(self.args['connect_args'],
{'charset': 'utf8', 'use_unicode': 0})
def test_mysql_mysqldb_connect_args_default(self):
- session._init_connection_args(
+ engines._init_connection_args(
url.make_url("mysql+mysqldb://u:p@host/test"), self.args)
self.assertEqual(self.args['connect_args'],
{'charset': 'utf8', 'use_unicode': 0})
def test_postgresql_connect_args_default(self):
- session._init_connection_args(
+ engines._init_connection_args(
url.make_url("postgresql://u:p@host/test"), self.args)
self.assertEqual(self.args['client_encoding'], 'utf8')
self.assertFalse(self.args['connect_args'])
def test_mysqlconnector_raise_on_warnings_default(self):
- session._init_connection_args(
+ engines._init_connection_args(
url.make_url("mysql+mysqlconnector://u:p@host/test"),
self.args)
self.assertEqual(self.args['connect_args']['raise_on_warnings'], False)
def test_mysqlconnector_raise_on_warnings_override(self):
- session._init_connection_args(
+ engines._init_connection_args(
url.make_url(
"mysql+mysqlconnector://u:p@host/test"
"?raise_on_warnings=true"),
@@ -639,12 +640,12 @@ class CreateEngineTest(oslo_test.BaseTestCase):
def test_thread_checkin(self):
with mock.patch("sqlalchemy.event.listens_for"):
with mock.patch("sqlalchemy.event.listen") as listen_evt:
- session._init_events.dispatch_on_drivername(
+ engines._init_events.dispatch_on_drivername(
"sqlite")(mock.Mock())
self.assertEqual(
listen_evt.mock_calls[0][1][-1],
- session._thread_yield
+ engines._thread_yield
)
@@ -693,7 +694,7 @@ class PatchStacktraceTest(test_base.DbTestCase):
with mock.patch("traceback.extract_stack", side_effect=extract_stack):
- session._add_trace_comments(engine)
+ engines._add_trace_comments(engine)
conn = engine.connect()
orig_do_exec = engine.dialect.do_execute
with mock.patch.object(engine.dialect, "do_execute") as mock_exec: