summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2014-12-01 19:39:15 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2015-06-04 11:43:39 -0400
commitfdbd928b1fdf0334e1740e565ab8206fff54eaa6 (patch)
treef4ae7b4c94ed32ce9e1f80391f8c0f2c20305c79
parent42dc93608f527406534ea8b10a76556eb92fd9dd (diff)
downloadoslo-db-fdbd928b1fdf0334e1740e565ab8206fff54eaa6.tar.gz
Implement new oslo.db.sqlalchemy.enginefacade module
This module presents a replacement for the EngineFacade system. At the center is the oslo.db.sqlalchemy.enginefacade module, which when imported, provides decorators and context managers which perform all database and ORM connectivity functions transparently. The docstrings as well as the blueprint provide an introduction. The patch includes a refactoring of sqlalchemy/session.py into three dependent modules engines.py, orm.py and enginefacade.py. This is to maintain a non-cyclical import structure as well as to maintain the import behavior of oslo.db overall, as some projects such as glance currently have dependencies on this structure. There is also a slimming down and attempt at modernizing some very old documentation in session.py. The enginefacade system should be preferred moving forward. Implements: blueprint make-enginefacade-a-facade Change-Id: I9a3d0c26bb727eb2c0bd823b9a12fde57cc7c9c3
-rw-r--r--doc/source/usage.rst93
-rw-r--r--oslo_db/exception.py43
-rw-r--r--oslo_db/sqlalchemy/enginefacade.py995
-rw-r--r--oslo_db/sqlalchemy/engines.py413
-rw-r--r--oslo_db/sqlalchemy/orm.py66
-rw-r--r--oslo_db/sqlalchemy/session.py874
-rw-r--r--oslo_db/sqlalchemy/test_base.py7
-rw-r--r--oslo_db/tests/old_import_api/sqlalchemy/test_exc_filters.py8
-rw-r--r--oslo_db/tests/old_import_api/sqlalchemy/test_sqlalchemy.py50
-rw-r--r--oslo_db/tests/sqlalchemy/test_enginefacade.py1646
-rw-r--r--oslo_db/tests/sqlalchemy/test_exc_filters.py8
-rw-r--r--oslo_db/tests/sqlalchemy/test_sqlalchemy.py31
12 files changed, 3397 insertions, 837 deletions
diff --git a/doc/source/usage.rst b/doc/source/usage.rst
index f352ee9..11c8c2a 100644
--- a/doc/source/usage.rst
+++ b/doc/source/usage.rst
@@ -7,26 +7,91 @@ To use oslo.db in a project:
Session Handling
================
+Session handling is achieved using the :mod:`oslo_db.sqlalchemy.enginefacade`
+system. This module presents a function decorator as well as a
+context manager approach to delivering :class:`.Session` as well as
+:class:`.Connection` objects to a function or block.
+
+Both calling styles require the use of a context object. This object may
+be of any class, though when used with the decorator form, requires
+special instrumentation.
+
+The context manager form is as follows:
+
.. code:: python
- from oslo.config import cfg
- from oslo.db.sqlalchemy import session as db_session
- _FACADE = None
+ from oslo_db.sqlalchemy import enginefacade
- def _create_facade_lazily():
- global _FACADE
- if _FACADE is None:
- _FACADE = db_session.EngineFacade.from_config(cfg.CONF)
- return _FACADE
- def get_engine():
- facade = _create_facade_lazily()
- return facade.get_engine()
+ class MyContext(object):
+ "User-defined context class."
+
+
+ def some_reader_api_function(context):
+ with enginefacade.reader.using(context) as session:
+ return session.query(SomeClass).all()
+
+
+ def some_writer_api_function(context, x, y):
+ with enginefacade.writer.using(context) as session:
+ session.add(SomeClass(x, y))
+
+
+ def run_some_database_calls():
+ context = MyContext()
+
+ results = some_reader_api_function(context)
+ some_writer_api_function(context, 5, 10)
+
+
+The decorator form accesses attributes off the user-defined context
+directly; the context must be decorated with the
+:func:`oslo_db.sqlalchemy.enginefacade.transaction_context_provider`
+decorator. Each function must receive the context as the first
+positional argument:
+
+.. code:: python
+
+
+ from oslo_db.sqlalchemy import enginefacade
+
+ @enginefacade.transaction_context_provider
+ class MyContext(object):
+ "User-defined context class."
+
+ @enginefacade.reader
+ def some_reader_api_function(context):
+ return context.session.query(SomeClass).all()
+
+
+ @enginefacade.writer
+ def some_writer_api_function(context, x, y):
+ context.session.add(SomeClass(x, y))
+
+
+ def run_some_database_calls():
+ context = MyContext()
+
+ results = some_reader_api_function(context)
+ some_writer_api_function(context, 5, 10)
+
+The scope of transaction and connectivity for both approaches is managed
+transparently. The configuration for the connection comes from the standard
+:obj:`oslo_config.cfg.CONF` collection. Additional configurations can be
+established for the enginefacade using the
+:func:`oslo_db.sqlalchemy.enginefacade.configure` function, before any use of
+the database begins:
+
+.. code:: python
+
+ from oslo_db.sqlalchemy import enginefacade
- def get_session(**kwargs):
- facade = _create_facade_lazily()
- return facade.get_session(**kwargs)
+ enginefacade.configure(
+ sqlite_fk=True,
+ max_retries=5,
+ mysql_sql_mode='ANSI'
+ )
Base class for models usage
diff --git a/oslo_db/exception.py b/oslo_db/exception.py
index 506006c..c1620e1 100644
--- a/oslo_db/exception.py
+++ b/oslo_db/exception.py
@@ -205,3 +205,46 @@ class RetryRequest(Exception):
"""
def __init__(self, inner_exc):
self.inner_exc = inner_exc
+
+
+class NoEngineContextEstablished(AttributeError):
+ """Error raised for non-present enginefacade attribute access.
+
+
+ This applies to the ``session`` and ``connection`` attributes
+ of a user-defined context and/or RequestContext object, when they
+ are accessed outside of the scope of an enginefacade decorator
+ or context manager.
+
+ The exception is a subclass of AttributeError so that
+ normal Python missing attribute behaviors are maintained, such
+ as support for ``getattr(context, 'session', None)``.
+
+
+ """
+
+
+class NotSupportedWarning(Warning):
+ """Warn that an argument or call that was passed is not supported.
+
+ This subclasses Warning so that it can be filtered as a distinct
+ category.
+
+ .. seealso::
+
+ https://docs.python.org/2/library/warnings.html
+
+ """
+
+
+class OsloDBDeprecationWarning(DeprecationWarning):
+ """Issued per usage of a deprecated API.
+
+ This subclasses DeprecationWarning so that it can be filtered as a distinct
+ category.
+
+ .. seealso::
+
+ https://docs.python.org/2/library/warnings.html
+
+ """
diff --git a/oslo_db/sqlalchemy/enginefacade.py b/oslo_db/sqlalchemy/enginefacade.py
new file mode 100644
index 0000000..c21f39a
--- /dev/null
+++ b/oslo_db/sqlalchemy/enginefacade.py
@@ -0,0 +1,995 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+import contextlib
+import functools
+import operator
+import threading
+import warnings
+
+from oslo_config import cfg
+from oslo_context import context as oslo_context
+
+from oslo_db import exception
+from oslo_db import options
+from oslo_db.sqlalchemy import engines
+from oslo_db.sqlalchemy import orm
+
+
+class _symbol(object):
+ """represent a fixed symbol."""
+
+ __slots__ = 'name',
+
+ def __init__(self, name):
+ self.name = name
+
+ def __repr__(self):
+ return "symbol(%r)" % self.name
+
+
+_ASYNC_READER = _symbol('ASYNC_READER')
+"""Represent the transaction state of "async reader".
+
+This state indicates that the transaction is a read-only and is
+safe to use on an asynchronously updated slave database.
+
+"""
+
+_READER = _symbol('READER')
+"""Represent the transaction state of "reader".
+
+This state indicates that the transaction is a read-only and is
+only safe to use on a synchronously updated slave database; otherwise
+the master database should be used.
+
+"""
+
+
+_WRITER = _symbol('WRITER')
+"""Represent the transaction state of "writer".
+
+This state indicates that the transaction writes data and
+should be directed at the master database.
+
+"""
+
+
+class _Default(object):
+ """Mark a value as a default value.
+
+ A value in the local configuration dictionary wrapped with
+ _Default() will not take precedence over a value that is specified
+ in cfg.CONF. Values that are set after the fact using configure()
+ will supersede those in cfg.CONF.
+
+ """
+
+ __slots__ = 'value',
+
+ _notset = _symbol("NOTSET")
+
+ def __init__(self, value=_notset):
+ self.value = value
+
+ @classmethod
+ def resolve(cls, value):
+ if isinstance(value, _Default):
+ v = value.value
+ if v is cls._notset:
+ return None
+ else:
+ return v
+ else:
+ return value
+
+ @classmethod
+ def resolve_w_conf(cls, value, conf_namespace, key):
+ if isinstance(value, _Default):
+ v = getattr(conf_namespace, key, value.value)
+ if v is cls._notset:
+ return None
+ else:
+ return v
+ else:
+ return value
+
+ @classmethod
+ def is_set(cls, value):
+ return not isinstance(value, _Default) or \
+ value.value is not cls._notset
+
+ @classmethod
+ def is_set_w_conf(cls, value, conf_namespace, key):
+ return not isinstance(value, _Default) or \
+ value.value is not cls._notset or \
+ hasattr(conf_namespace, key)
+
+
+class _TransactionFactory(object):
+ """A factory for :class:`._TransactionContext` objects.
+
+ By default, there is just one of these, set up
+ based on CONF, however instance-level :class:`._TransactionFactory`
+ objects can be made, as is the case with the
+ :class:`._TestTransactionFactory` subclass used by the oslo.db test suite.
+
+ """
+ def __init__(self):
+ self._url_cfg = {
+ 'connection': _Default(),
+ 'slave_connection': _Default(),
+ }
+ self._engine_cfg = {
+ 'sqlite_fk': _Default(False),
+ 'mysql_sql_mode': _Default('TRADITIONAL'),
+ 'idle_timeout': _Default(3600),
+ 'connection_debug': _Default(0),
+ 'max_pool_size': _Default(),
+ 'max_overflow': _Default(),
+ 'pool_timeout': _Default(),
+ 'sqlite_synchronous': _Default(True),
+ 'connection_trace': _Default(False),
+ 'max_retries': _Default(10),
+ 'retry_interval': _Default(10),
+ 'thread_checkin': _Default(True)
+ }
+ self._maker_cfg = {
+ 'expire_on_commit': _Default(False),
+ '__autocommit': True
+ }
+ self._transaction_ctx_cfg = {
+ 'rollback_reader_sessions': False,
+ }
+ self._facade_cfg = {
+ 'synchronous_reader': True
+ }
+
+ # other options that are defined in oslo.db.options.database_opts
+ # but do not apply to the standard enginefacade arguments
+ # (most seem to apply to api.DBAPI).
+ self._ignored_cfg = dict(
+ (k, _Default(None)) for k in [
+ 'db_max_retries', 'db_inc_retry_interval',
+ 'use_db_reconnect',
+ 'db_retry_interval', 'min_pool_size',
+ 'db_max_retry_interval',
+ 'sqlite_db', 'backend'])
+
+ self._started = False
+ self._legacy_facade = None
+ self._start_lock = threading.Lock()
+
+ def configure_defaults(self, **kw):
+ """Apply default configurational options.
+
+ This method can only be called before any specific
+ transaction-beginning methods have been called.
+
+ Configurational options are within a fixed set of keys, and fall
+ under three categories: URL configuration, engine configuration,
+ and session configuration. Each key given will be tested against
+ these three configuration sets to see which one is applicable; if
+ it is not applicable to any set, an exception is raised.
+
+ The configurational options given here act as **defaults**
+ when the :class:`._TransactionFactory` is configured using
+ a :class:`.oslo.config.cfg.ConfigOpts` object; the options
+ present within the :class:`.oslo.config.cfg.ConfigOpts` **take
+ precedence** versus the arguments passed here. By default,
+ the :class:`._TransactionFactory` loads in the configuration from
+ :data:`oslo.config.cfg.CONF`, after applying the
+ :data:`oslo.db.options.database_opts` configurational defaults to it.
+
+ .. seealso::
+
+ :meth:`._TransactionFactory.configure`
+
+ """
+ self._configure(True, kw)
+
+ def configure(self, **kw):
+ """Apply configurational options.
+
+ This method can only be called before any specific
+ transaction-beginning methods have been called.
+
+ Behavior here is the same as that of
+ :meth:`._TransactionFactory.configure_defaults`,
+ with the exception that values specified here will **supersede** those
+ setup in the :class:`.oslo.config.cfg.ConfigOpts` options.
+
+ .. seealso::
+
+ :meth:`._TransactionFactory.configure_defaults`
+
+ """
+ self._configure(False, kw)
+
+ def _configure(self, as_defaults, kw):
+
+ if self._started:
+ raise TypeError("this TransactionFactory is already started")
+ not_supported = []
+ for k, v in kw.items():
+ for dict_ in (
+ self._url_cfg, self._engine_cfg,
+ self._maker_cfg, self._ignored_cfg,
+ self._facade_cfg, self._transaction_ctx_cfg):
+ if k in dict_:
+ dict_[k] = _Default(v) if as_defaults else v
+ break
+ else:
+ not_supported.append(k)
+
+ if not_supported:
+ # would like to raise ValueError here, but there are just
+ # too many unrecognized (obsolete?) configuration options
+ # coming in from projects
+ warnings.warn(
+ "Configuration option(s) %r not supported" %
+ sorted(not_supported),
+ exception.NotSupportedWarning
+ )
+
+ def get_legacy_facade(self):
+ """Return a :class:`.LegacyEngineFacade` for this factory.
+
+ This facade will make use of the same engine and sessionmaker
+ as this factory, however will not share the same transaction context;
+ the legacy facade continues to work the old way of returning
+ a new Session each time get_session() is called.
+
+ """
+ if not self._legacy_facade:
+ self._legacy_facade = LegacyEngineFacade(None, _factory=self)
+ if not self._started:
+ self._start()
+
+ return self._legacy_facade
+
+ def _create_connection(self, mode):
+ if not self._started:
+ self._start()
+ if mode is _WRITER:
+ return self._writer_engine.connect()
+ elif self.synchronous_reader or mode is _ASYNC_READER:
+ return self._reader_engine.connect()
+ else:
+ return self._writer_engine.connect()
+
+ def _create_session(self, mode, bind=None):
+ if not self._started:
+ self._start()
+ kw = {}
+ # don't pass 'bind' if bind is None; the sessionmaker
+ # already has a bind to the engine.
+ if bind:
+ kw['bind'] = bind
+ if mode is _WRITER:
+ return self._writer_maker(**kw)
+ elif self.synchronous_reader or mode is _ASYNC_READER:
+ return self._reader_maker(**kw)
+ else:
+ return self._writer_maker(**kw)
+
+ def _args_for_conf(self, default_cfg, conf):
+ if conf is None:
+ return dict(
+ (key, _Default.resolve(value))
+ for key, value in default_cfg.items()
+ if _Default.is_set(value)
+ )
+ else:
+ return dict(
+ (key, _Default.resolve_w_conf(value, conf.database, key))
+ for key, value in default_cfg.items()
+ if _Default.is_set_w_conf(value, conf.database, key)
+ )
+
+ def _url_args_for_conf(self, conf):
+ return self._args_for_conf(self._url_cfg, conf)
+
+ def _engine_args_for_conf(self, conf):
+ return self._args_for_conf(self._engine_cfg, conf)
+
+ def _maker_args_for_conf(self, conf):
+ return self._args_for_conf(self._maker_cfg, conf)
+
+ def _start(self, conf=False, connection=None, slave_connection=None):
+ with self._start_lock:
+ # self._started has been checked on the outside
+ # when _start() was called. Within the lock,
+ # check the flag once more to detect the case where
+ # the start process proceeded while this thread was waiting
+ # for the lock.
+ if self._started:
+ return
+ self._started = True
+ if conf is False:
+ conf = cfg.CONF
+
+ # perform register_opts() local to actually using
+ # the cfg.CONF to maintain exact compatibility with
+ # the EngineFacade design. This can be changed if needed.
+ if conf is not None:
+ conf.register_opts(options.database_opts, 'database')
+
+ url_args = self._url_args_for_conf(conf)
+ if connection:
+ url_args['connection'] = connection
+ if slave_connection:
+ url_args['slave_connection'] = slave_connection
+ engine_args = self._engine_args_for_conf(conf)
+ maker_args = self._maker_args_for_conf(conf)
+ maker_args['autocommit'] = maker_args.pop('__autocommit')
+
+ self._writer_engine, self._writer_maker = \
+ self._setup_for_connection(
+ url_args['connection'],
+ engine_args, maker_args)
+
+ if url_args.get('slave_connection'):
+ self._reader_engine, self._reader_maker = \
+ self._setup_for_connection(
+ url_args['slave_connection'],
+ engine_args, maker_args)
+ else:
+ self._reader_engine, self._reader_maker = \
+ self._writer_engine, self._writer_maker
+
+ self.synchronous_reader = self._facade_cfg['synchronous_reader']
+
+ def _setup_for_connection(
+ self, sql_connection, engine_kwargs, maker_kwargs):
+ engine = engines.create_engine(
+ sql_connection=sql_connection, **engine_kwargs)
+ sessionmaker = orm.get_maker(engine=engine, **maker_kwargs)
+ return engine, sessionmaker
+
+
+class _TestTransactionFactory(_TransactionFactory):
+ """A :class:`._TransactionFactory` used by test suites.
+
+ This is a :class:`._TransactionFactory` that can be directly injected
+ with an existing engine and sessionmaker.
+
+ Note that while this is used by oslo.db's own tests of
+ the enginefacade system, it is also exported for use by
+ the test suites of other projects, first as an element of the
+ oslo_db.sqlalchemy.test_base module, and secondly may be used by
+ external test suites directly.
+
+ Includes a feature to inject itself temporarily as the factory
+ within the global :class:`._TransactionContextManager`.
+
+ """
+ def __init__(self, engine, maker, apply_global, synchronous_reader):
+ self._reader_engine = self._writer_engine = engine
+ self._reader_maker = self._writer_maker = maker
+ self._started = True
+ self._legacy_facade = None
+ self.synchronous_reader = synchronous_reader
+
+ self._facade_cfg = _context_manager._factory._facade_cfg
+ self._transaction_ctx_cfg = \
+ _context_manager._factory._transaction_ctx_cfg
+ if apply_global:
+ self.existing_factory = _context_manager._factory
+ _context_manager._root_factory = self
+
+ def dispose_global(self):
+ _context_manager._root_factory = self.existing_factory
+
+
+class _TransactionContext(object):
+ """Represent a single database transaction in progress."""
+
+ def __init__(
+ self, factory, global_factory=None,
+ rollback_reader_sessions=False):
+ """Construct a new :class:`.TransactionContext`.
+
+ :param factory: the :class:`.TransactionFactory` which will
+ serve as a source of connectivity.
+
+ :param global_factory: the "global" factory which will be used
+ by the global ``_context_manager`` for new ``_TransactionContext``
+ objects created under this one. When left as None the actual
+ "global" factory is used.
+
+ :param rollback_reader_sessions: if True, a :class:`.Session` object
+ will have its :meth:`.Session.rollback` method invoked at the end
+ of a ``@reader`` block, actively rolling back the transaction and
+ expiring the objects within, before the :class:`.Session` moves
+ on to be closed, which has the effect of releasing connection
+ resources back to the connection pool and detaching all objects.
+ If False, the :class:`.Session` is
+ not affected at the end of a ``@reader`` block; the underlying
+ connection referred to by this :class:`.Session` will still
+ be released in the enclosing context via the :meth:`.Session.close`
+ method, which still ensures that the DBAPI connection is rolled
+ back, however the objects associated with the :class:`.Session`
+ retain their database-persisted contents after they are detached.
+
+ .. seealso::
+
+ http://docs.sqlalchemy.org/en/rel_0_9/glossary.html#term-released\
+ SQLAlchemy documentation on what "releasing resources" means.
+
+ """
+ self.factory = factory
+ self.global_factory = global_factory
+ self.mode = None
+ self.session = None
+ self.connection = None
+ self.transaction = None
+ kw = self.factory._transaction_ctx_cfg
+ self.rollback_reader_sessions = kw['rollback_reader_sessions']
+
+ @contextlib.contextmanager
+ def _connection(self, savepoint=False):
+ if self.connection is None:
+ try:
+ if self.session is not None:
+ # use existing session, which is outer to us
+ self.connection = self.session.connection()
+ if savepoint:
+ with self.connection.begin_nested():
+ yield self.connection
+ else:
+ yield self.connection
+ else:
+ # is outermost
+ self.connection = self.factory._create_connection(
+ mode=self.mode)
+ self.transaction = self.connection.begin()
+ try:
+ yield self.connection
+ self._end_connection_transaction(self.transaction)
+ except Exception:
+ self.transaction.rollback()
+ # TODO(zzzeek) do we need save_and_reraise() here,
+ # or do newer eventlets not have issues? we are using
+ # raw "raise" in many other places in oslo.db already
+ # (and one six.reraise()).
+ raise
+ finally:
+ self.transaction = None
+ self.connection.close()
+ finally:
+ self.connection = None
+
+ else:
+ # use existing connection, which is outer to us
+ if savepoint:
+ with self.connection.begin_nested():
+ yield self.connection
+ else:
+ yield self.connection
+
+ @contextlib.contextmanager
+ def _session(self, savepoint=False):
+ if self.session is None:
+ self.session = self.factory._create_session(
+ bind=self.connection, mode=self.mode)
+ try:
+ self.session.begin()
+ yield self.session
+ self._end_session_transaction(self.session)
+ except Exception:
+ self.session.rollback()
+ # TODO(zzzeek) do we need save_and_reraise() here,
+ # or do newer eventlets not have issues? we are using
+ # raw "raise" in many other places in oslo.db already
+ # (and one six.reraise()).
+ raise
+ finally:
+ self.session.close()
+ self.session = None
+ else:
+ # use existing session, which is outer to us
+ if savepoint:
+ with self.session.begin_nested():
+ yield self.session
+ else:
+ yield self.session
+
+ def _end_session_transaction(self, session):
+ if self.mode is _WRITER:
+ session.commit()
+ elif self.rollback_reader_sessions:
+ session.rollback()
+ # In the absense of calling session.rollback(),
+ # the next call is session.close(). This releases all
+ # objects from the session into the detached state, and
+ # releases the connection as well; the connection when returned
+ # to the pool is either rolled back in any case, or closed fully.
+
+ def _end_connection_transaction(self, transaction):
+ if self.mode is _WRITER:
+ transaction.commit()
+ else:
+ transaction.rollback()
+
+ def _produce_block(self, mode, connection, savepoint):
+ if mode is _WRITER:
+ self._writer()
+ elif mode is _ASYNC_READER:
+ self._async_reader()
+ else:
+ self._reader()
+ if connection:
+ return self._connection(savepoint)
+ else:
+ return self._session(savepoint)
+
+ def _writer(self):
+ if self.mode is None:
+ self.mode = _WRITER
+ elif self.mode is _READER:
+ raise TypeError(
+ "Can't upgrade a READER transaction "
+ "to a WRITER mid-transaction")
+ elif self.mode is _ASYNC_READER:
+ raise TypeError(
+ "Can't upgrade an ASYNC_READER transaction "
+ "to a WRITER mid-transaction")
+
+ def _reader(self):
+ if self.mode is None:
+ self.mode = _READER
+ elif self.mode is _ASYNC_READER:
+ raise TypeError(
+ "Can't upgrade an ASYNC_READER transaction "
+ "to a READER mid-transaction")
+
+ def _async_reader(self):
+ if self.mode is None:
+ self.mode = _ASYNC_READER
+
+
+class _TransactionContextTLocal(threading.local):
+ def __deepcopy__(self, memo):
+ return self
+
+
+class _TransactionContextManager(object):
+ """Provide context-management and decorator patterns for transactions.
+
+ This object integrates user-defined "context" objects with the
+ :class:`._TransactionContext` class, on behalf of a
+ contained :class:`._TransactionFactory`.
+
+ """
+
+ def __init__(
+ self, root=None,
+ mode=None,
+ independent=False,
+ savepoint=False,
+ connection=False,
+ replace_global_factory=None,
+ _is_global_manager=False):
+
+ if root is None:
+ self._root = self
+ self._root_factory = _TransactionFactory()
+ else:
+ self._root = root
+
+ self._replace_global_factory = replace_global_factory
+ self._is_global_manager = _is_global_manager
+ self._mode = mode
+ self._independent = independent
+ self._savepoint = savepoint
+ if self._savepoint and self._independent:
+ raise TypeError(
+ "setting savepoint and independent makes no sense.")
+ self._connection = connection
+
+ @property
+ def _factory(self):
+ """The :class:`._TransactionFactory` associated with this context."""
+ return self._root._root_factory
+
+ def configure(self, **kw):
+ """Apply configurational options to the factory.
+
+ This method can only be called before any specific
+ transaction-beginning methods have been called.
+
+
+ """
+ self._factory.configure(**kw)
+
+ @property
+ def replace(self):
+ """Modifier to replace the global transaction factory with this one."""
+ return self._clone(replace_global_factory=self._factory)
+
+ @property
+ def writer(self):
+ """Modifier to set the transaction to WRITER."""
+ return self._clone(mode=_WRITER)
+
+ @property
+ def reader(self):
+ """Modifier to set the transaction to READER."""
+ return self._clone(mode=_READER)
+
+ @property
+ def independent(self):
+ """Modifier to start a transaction independent from any enclosing."""
+ return self._clone(independent=True)
+
+ @property
+ def savepoint(self):
+ """Modifier to start a SAVEPOINT if a transaction already exists."""
+ return self._clone(savepoint=True)
+
+ @property
+ def connection(self):
+ """Modifier to return a core Connection object instead of Session."""
+ return self._clone(connection=True)
+
+ @property
+ def async(self):
+ """Modifier to set a READER operation to ASYNC_READER."""
+
+ if self._mode is _WRITER:
+ raise TypeError("Setting async on a WRITER makes no sense")
+ return self._clone(mode=_ASYNC_READER)
+
+ def using(self, context):
+ """Provide a context manager block that will use the given context."""
+ return self._transaction_scope(context)
+
+ def __call__(self, fn):
+ """Decorate a function."""
+
+ @functools.wraps(fn)
+ def wrapper(*args, **kwargs):
+ context = args[0]
+
+ with self._transaction_scope(context):
+ return fn(*args, **kwargs)
+
+ return wrapper
+
+ def _clone(self, **kw):
+ default_kw = {
+ "independent": self._independent,
+ "mode": self._mode,
+ "connection": self._connection
+ }
+ default_kw.update(kw)
+ return _TransactionContextManager(root=self._root, **default_kw)
+
+ @contextlib.contextmanager
+ def _transaction_scope(self, context):
+ new_transaction = self._independent
+ transaction_contexts_by_thread = \
+ _transaction_contexts_by_thread(context)
+
+ current = restore = getattr(
+ transaction_contexts_by_thread, "current", None)
+
+ use_factory = self._factory
+ global_factory = None
+
+ if self._replace_global_factory:
+ use_factory = global_factory = self._replace_global_factory
+ elif current is not None and current.global_factory:
+ global_factory = current.global_factory
+
+ if self._root._is_global_manager:
+ use_factory = global_factory
+
+ if current is not None and (
+ new_transaction or current.factory is not use_factory
+ ):
+ current = None
+
+ if current is None:
+ current = transaction_contexts_by_thread.current = \
+ _TransactionContext(
+ use_factory, global_factory=global_factory,
+ **use_factory._transaction_ctx_cfg)
+
+ try:
+ if self._mode is not None:
+ with current._produce_block(
+ mode=self._mode,
+ connection=self._connection,
+ savepoint=self._savepoint) as resource:
+ yield resource
+ else:
+ yield
+ finally:
+ if restore is None:
+ del transaction_contexts_by_thread.current
+ elif current is not restore:
+ transaction_contexts_by_thread.current = restore
+
+
+def _context_descriptor(attr=None):
+ getter = operator.attrgetter(attr)
+
+ def _property_for_context(context):
+ try:
+ transaction_context = context.transaction_ctx
+ except exception.NoEngineContextEstablished:
+ raise exception.NoEngineContextEstablished(
+ "No TransactionContext is established for "
+ "this %s object within the current thread; "
+ "the %r attribute is unavailable."
+ % (context, attr)
+ )
+ else:
+ return getter(transaction_context)
+ return property(_property_for_context)
+
+
+def _transaction_ctx_for_context(context):
+ by_thread = _transaction_contexts_by_thread(context)
+ try:
+ return by_thread.current
+ except AttributeError:
+ raise exception.NoEngineContextEstablished(
+ "No TransactionContext is established for "
+ "this %s object within the current thread. "
+ % context
+ )
+
+
+def _transaction_contexts_by_thread(context):
+ transaction_contexts_by_thread = getattr(
+ context, '_enginefacade_context', None)
+ if transaction_contexts_by_thread is None:
+ transaction_contexts_by_thread = \
+ context._enginefacade_context = _TransactionContextTLocal()
+
+ return transaction_contexts_by_thread
+
+
+def transaction_context_provider(klass):
+ """Decorate a class with ``session`` and ``connection`` attributes."""
+
+ setattr(
+ klass,
+ 'transaction_ctx',
+ property(_transaction_ctx_for_context))
+
+ # Graft transaction context attributes as context properties
+ for attr in ('session', 'connection', 'transaction'):
+ setattr(klass, attr, _context_descriptor(attr))
+
+ return klass
+
+
+# apply the context descriptors to oslo.context.RequestContext
+transaction_context_provider(oslo_context.RequestContext)
+
+
+_context_manager = _TransactionContextManager(_is_global_manager=True)
+"""default context manager."""
+
+
+def transaction_context():
+ """Construct a local transaction context.
+
+ """
+ return _TransactionContextManager()
+
+
+def configure(**kw):
+ """Apply configurational options to the global factory.
+
+ This method can only be called before any specific transaction-beginning
+ methods have been called.
+
+ .. seealso::
+
+ :meth:`._TransactionFactory.configure`
+
+ """
+ _context_manager._factory.configure(**kw)
+
+
+def get_legacy_facade():
+ """Return a :class:`.LegacyEngineFacade` for the global factory.
+
+ This facade will make use of the same engine and sessionmaker
+ as this factory, however will not share the same transaction context;
+ the legacy facade continues to work the old way of returning
+ a new Session each time get_session() is called.
+
+ """
+ return _context_manager._factory.get_legacy_facade()
+
+
+reader = _context_manager.reader
+"""The global 'reader' starting point."""
+
+
+writer = _context_manager.writer
+"""The global 'writer' starting point."""
+
+
+class LegacyEngineFacade(object):
+ """A helper class for removing of global engine instances from oslo.db.
+
+ .. deprecated::
+
+ EngineFacade is deprecated. Please use
+ oslo.db.sqlalchemy.enginefacade for new development.
+
+ As a library, oslo.db can't decide where to store/when to create engine
+ and sessionmaker instances, so this must be left for a target application.
+
+ On the other hand, in order to simplify the adoption of oslo.db changes,
+ we'll provide a helper class, which creates engine and sessionmaker
+ on its instantiation and provides get_engine()/get_session() methods
+ that are compatible with corresponding utility functions that currently
+ exist in target projects, e.g. in Nova.
+
+ engine/sessionmaker instances will still be global (and they are meant to
+ be global), but they will be stored in the app context, rather that in the
+ oslo.db context.
+
+ Two important things to remember:
+
+ 1. An Engine instance is effectively a pool of DB connections, so it's
+ meant to be shared (and it's thread-safe).
+ 2. A Session instance is not meant to be shared and represents a DB
+ transactional context (i.e. it's not thread-safe). sessionmaker is
+ a factory of sessions.
+
+ """
+ def __init__(self, sql_connection, slave_connection=None,
+ sqlite_fk=False, autocommit=True,
+ expire_on_commit=False, _conf=None, _factory=None, **kwargs):
+ """Initialize engine and sessionmaker instances.
+
+ :param sql_connection: the connection string for the database to use
+ :type sql_connection: string
+
+ :param slave_connection: the connection string for the 'slave' database
+ to use. If not provided, the master database
+ will be used for all operations. Note: this
+ is meant to be used for offloading of read
+ operations to asynchronously replicated slaves
+ to reduce the load on the master database.
+ :type slave_connection: string
+
+ :param sqlite_fk: enable foreign keys in SQLite
+ :type sqlite_fk: bool
+
+ :param autocommit: use autocommit mode for created Session instances
+ :type autocommit: bool
+
+ :param expire_on_commit: expire session objects on commit
+ :type expire_on_commit: bool
+
+ Keyword arguments:
+
+ :keyword mysql_sql_mode: the SQL mode to be used for MySQL sessions.
+ (defaults to TRADITIONAL)
+ :keyword idle_timeout: timeout before idle sql connections are reaped
+ (defaults to 3600)
+ :keyword connection_debug: verbosity of SQL debugging information.
+ -1=Off, 0=None, 100=Everything (defaults
+ to 0)
+ :keyword max_pool_size: maximum number of SQL connections to keep open
+ in a pool (defaults to SQLAlchemy settings)
+ :keyword max_overflow: if set, use this value for max_overflow with
+ sqlalchemy (defaults to SQLAlchemy settings)
+ :keyword pool_timeout: if set, use this value for pool_timeout with
+ sqlalchemy (defaults to SQLAlchemy settings)
+ :keyword sqlite_synchronous: if True, SQLite uses synchronous mode
+ (defaults to True)
+ :keyword connection_trace: add python stack traces to SQL as comment
+ strings (defaults to False)
+ :keyword max_retries: maximum db connection retries during startup.
+ (setting -1 implies an infinite retry count)
+ (defaults to 10)
+ :keyword retry_interval: interval between retries of opening a sql
+ connection (defaults to 10)
+ :keyword thread_checkin: boolean that indicates that between each
+ engine checkin event a sleep(0) will occur to
+ allow other greenthreads to run (defaults to
+ True)
+ """
+ warnings.warn(
+ "EngineFacade is deprecated; please use "
+ "oslo.db.sqlalchemy.enginefacade",
+ exception.OsloDBDeprecationWarning,
+ stacklevel=2)
+
+ if _factory:
+ self._factory = _factory
+ else:
+ self._factory = _TransactionFactory()
+
+ self._factory.configure(
+ sqlite_fk=sqlite_fk,
+ __autocommit=autocommit,
+ expire_on_commit=expire_on_commit,
+ **kwargs
+ )
+ # make sure passed-in urls are favored over that
+ # of config
+ self._factory._start(
+ _conf, connection=sql_connection,
+ slave_connection=slave_connection)
+
+ def get_engine(self, use_slave=False):
+ """Get the engine instance (note, that it's shared).
+
+ :param use_slave: if possible, use 'slave' database for this engine.
+ If the connection string for the slave database
+ wasn't provided, 'master' engine will be returned.
+ (defaults to False)
+ :type use_slave: bool
+
+ """
+ if use_slave:
+ return self._factory._reader_engine
+ else:
+ return self._factory._writer_engine
+
+ def get_session(self, use_slave=False, **kwargs):
+ """Get a Session instance.
+
+ :param use_slave: if possible, use 'slave' database connection for
+ this session. If the connection string for the
+ slave database wasn't provided, a session bound
+ to the 'master' engine will be returned.
+ (defaults to False)
+ :type use_slave: bool
+
+ Keyword arugments will be passed to a sessionmaker instance as is (if
+ passed, they will override the ones used when the sessionmaker instance
+ was created). See SQLAlchemy Session docs for details.
+
+ """
+ if use_slave:
+ return self._factory._reader_maker(**kwargs)
+ else:
+ return self._factory._writer_maker(**kwargs)
+
+ @classmethod
+ def from_config(cls, conf,
+ sqlite_fk=False, autocommit=True, expire_on_commit=False):
+ """Initialize EngineFacade using oslo.config config instance options.
+
+ :param conf: oslo.config config instance
+ :type conf: oslo.config.cfg.ConfigOpts
+
+ :param sqlite_fk: enable foreign keys in SQLite
+ :type sqlite_fk: bool
+
+ :param autocommit: use autocommit mode for created Session instances
+ :type autocommit: bool
+
+ :param expire_on_commit: expire session objects on commit
+ :type expire_on_commit: bool
+
+ """
+
+ return cls(
+ None,
+ sqlite_fk=sqlite_fk,
+ autocommit=autocommit,
+ expire_on_commit=expire_on_commit, _conf=conf)
diff --git a/oslo_db/sqlalchemy/engines.py b/oslo_db/sqlalchemy/engines.py
new file mode 100644
index 0000000..6e041a2
--- /dev/null
+++ b/oslo_db/sqlalchemy/engines.py
@@ -0,0 +1,413 @@
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""Core SQLAlchemy connectivity routines.
+"""
+
+import itertools
+import logging
+import os
+import re
+import time
+
+import six
+import sqlalchemy
+from sqlalchemy import event
+from sqlalchemy import exc
+from sqlalchemy import pool
+from sqlalchemy.sql.expression import select
+
+from oslo_db._i18n import _LW
+from oslo_db import exception
+
+from oslo_db.sqlalchemy import exc_filters
+from oslo_db.sqlalchemy import utils
+
+LOG = logging.getLogger(__name__)
+
+
+def _thread_yield(dbapi_con, con_record):
+ """Ensure other greenthreads get a chance to be executed.
+
+ If we use eventlet.monkey_patch(), eventlet.greenthread.sleep(0) will
+ execute instead of time.sleep(0).
+ Force a context switch. With common database backends (eg MySQLdb and
+ sqlite), there is no implicit yield caused by network I/O since they are
+ implemented by C libraries that eventlet cannot monkey patch.
+ """
+ time.sleep(0)
+
+
+def _connect_ping_listener(connection, branch):
+ """Ping the server at connection startup.
+
+ Ping the server at transaction begin and transparently reconnect
+ if a disconnect exception occurs.
+ """
+ if branch:
+ return
+
+ # turn off "close with result". This can also be accomplished
+ # by branching the connection, however just setting the flag is
+ # more performant and also doesn't get involved with some
+ # connection-invalidation awkardness that occurs (see
+ # https://bitbucket.org/zzzeek/sqlalchemy/issue/3215/)
+ save_should_close_with_result = connection.should_close_with_result
+ connection.should_close_with_result = False
+ try:
+ # run a SELECT 1. use a core select() so that
+ # any details like that needed by Oracle, DB2 etc. are handled.
+ connection.scalar(select([1]))
+ except exception.DBConnectionError:
+ # catch DBConnectionError, which is raised by the filter
+ # system.
+ # disconnect detected. The connection is now
+ # "invalid", but the pool should be ready to return
+ # new connections assuming they are good now.
+ # run the select again to re-validate the Connection.
+ connection.scalar(select([1]))
+ finally:
+ connection.should_close_with_result = save_should_close_with_result
+
+
+def _setup_logging(connection_debug=0):
+ """setup_logging function maps SQL debug level to Python log level.
+
+ Connection_debug is a verbosity of SQL debugging information.
+ 0=None(default value),
+ 1=Processed only messages with WARNING level or higher
+ 50=Processed only messages with INFO level or higher
+ 100=Processed only messages with DEBUG level
+ """
+ if connection_debug >= 0:
+ logger = logging.getLogger('sqlalchemy.engine')
+ if connection_debug >= 100:
+ logger.setLevel(logging.DEBUG)
+ elif connection_debug >= 50:
+ logger.setLevel(logging.INFO)
+ else:
+ logger.setLevel(logging.WARNING)
+
+
+def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
+ idle_timeout=3600,
+ connection_debug=0, max_pool_size=None, max_overflow=None,
+ pool_timeout=None, sqlite_synchronous=True,
+ connection_trace=False, max_retries=10, retry_interval=10,
+ thread_checkin=True, logging_name=None):
+ """Return a new SQLAlchemy engine."""
+
+ url = sqlalchemy.engine.url.make_url(sql_connection)
+
+ engine_args = {
+ "pool_recycle": idle_timeout,
+ 'convert_unicode': True,
+ 'connect_args': {},
+ 'logging_name': logging_name
+ }
+
+ _setup_logging(connection_debug)
+
+ _init_connection_args(
+ url, engine_args,
+ sqlite_fk=sqlite_fk,
+ max_pool_size=max_pool_size,
+ max_overflow=max_overflow,
+ pool_timeout=pool_timeout
+ )
+
+ engine = sqlalchemy.create_engine(url, **engine_args)
+
+ _init_events(
+ engine,
+ mysql_sql_mode=mysql_sql_mode,
+ sqlite_synchronous=sqlite_synchronous,
+ sqlite_fk=sqlite_fk,
+ thread_checkin=thread_checkin,
+ connection_trace=connection_trace
+ )
+
+ # register alternate exception handler
+ exc_filters.register_engine(engine)
+
+ # register engine connect handler
+ event.listen(engine, "engine_connect", _connect_ping_listener)
+
+ # initial connect + test
+ # NOTE(viktors): the current implementation of _test_connection()
+ # does nothing, if max_retries == 0, so we can skip it
+ if max_retries:
+ test_conn = _test_connection(engine, max_retries, retry_interval)
+ test_conn.close()
+
+ return engine
+
+
+@utils.dispatch_for_dialect('*', multiple=True)
+def _init_connection_args(
+ url, engine_args,
+ max_pool_size=None, max_overflow=None, pool_timeout=None, **kw):
+
+ pool_class = url.get_dialect().get_pool_class(url)
+ if issubclass(pool_class, pool.QueuePool):
+ if max_pool_size is not None:
+ engine_args['pool_size'] = max_pool_size
+ if max_overflow is not None:
+ engine_args['max_overflow'] = max_overflow
+ if pool_timeout is not None:
+ engine_args['pool_timeout'] = pool_timeout
+
+
+@_init_connection_args.dispatch_for("sqlite")
+def _init_connection_args(url, engine_args, **kw):
+ pool_class = url.get_dialect().get_pool_class(url)
+ # singletonthreadpool is used for :memory: connections;
+ # replace it with StaticPool.
+ if issubclass(pool_class, pool.SingletonThreadPool):
+ engine_args["poolclass"] = pool.StaticPool
+ engine_args['connect_args']['check_same_thread'] = False
+
+
+@_init_connection_args.dispatch_for("postgresql")
+def _init_connection_args(url, engine_args, **kw):
+ if 'client_encoding' not in url.query:
+ # Set encoding using engine_args instead of connect_args since
+ # it's supported for PostgreSQL 8.*. More details at:
+ # http://docs.sqlalchemy.org/en/rel_0_9/dialects/postgresql.html
+ engine_args['client_encoding'] = 'utf8'
+
+
+@_init_connection_args.dispatch_for("mysql")
+def _init_connection_args(url, engine_args, **kw):
+ if 'charset' not in url.query:
+ engine_args['connect_args']['charset'] = 'utf8'
+
+
+@_init_connection_args.dispatch_for("mysql+mysqlconnector")
+def _init_connection_args(url, engine_args, **kw):
+ # mysqlconnector engine (<1.0) incorrectly defaults to
+ # raise_on_warnings=True
+ # https://bitbucket.org/zzzeek/sqlalchemy/issue/2515
+ if 'raise_on_warnings' not in url.query:
+ engine_args['connect_args']['raise_on_warnings'] = False
+
+
+@_init_connection_args.dispatch_for("mysql+mysqldb")
+@_init_connection_args.dispatch_for("mysql+oursql")
+def _init_connection_args(url, engine_args, **kw):
+ # Those drivers require use_unicode=0 to avoid performance drop due
+ # to internal usage of Python unicode objects in the driver
+ # http://docs.sqlalchemy.org/en/rel_0_9/dialects/mysql.html
+ if 'use_unicode' not in url.query:
+ engine_args['connect_args']['use_unicode'] = 0
+
+
+@utils.dispatch_for_dialect('*', multiple=True)
+def _init_events(engine, thread_checkin=True, connection_trace=False, **kw):
+ """Set up event listeners for all database backends."""
+
+ _add_process_guards(engine)
+
+ if connection_trace:
+ _add_trace_comments(engine)
+
+ if thread_checkin:
+ sqlalchemy.event.listen(engine, 'checkin', _thread_yield)
+
+
+@_init_events.dispatch_for("mysql")
+def _init_events(engine, mysql_sql_mode=None, **kw):
+ """Set up event listeners for MySQL."""
+
+ if mysql_sql_mode is not None:
+ @sqlalchemy.event.listens_for(engine, "connect")
+ def _set_session_sql_mode(dbapi_con, connection_rec):
+ cursor = dbapi_con.cursor()
+ cursor.execute("SET SESSION sql_mode = %s", [mysql_sql_mode])
+
+ @sqlalchemy.event.listens_for(engine, "first_connect")
+ def _check_effective_sql_mode(dbapi_con, connection_rec):
+ if mysql_sql_mode is not None:
+ _set_session_sql_mode(dbapi_con, connection_rec)
+
+ cursor = dbapi_con.cursor()
+ cursor.execute("SHOW VARIABLES LIKE 'sql_mode'")
+ realmode = cursor.fetchone()
+
+ if realmode is None:
+ LOG.warning(_LW('Unable to detect effective SQL mode'))
+ else:
+ realmode = realmode[1]
+ LOG.debug('MySQL server mode set to %s', realmode)
+ if 'TRADITIONAL' not in realmode.upper() and \
+ 'STRICT_ALL_TABLES' not in realmode.upper():
+ LOG.warning(
+ _LW(
+ "MySQL SQL mode is '%s', "
+ "consider enabling TRADITIONAL or STRICT_ALL_TABLES"),
+ realmode)
+
+
+@_init_events.dispatch_for("sqlite")
+def _init_events(engine, sqlite_synchronous=True, sqlite_fk=False, **kw):
+ """Set up event listeners for SQLite.
+
+ This includes several settings made on connections as they are
+ created, as well as transactional control extensions.
+
+ """
+
+ def regexp(expr, item):
+ reg = re.compile(expr)
+ return reg.search(six.text_type(item)) is not None
+
+ @sqlalchemy.event.listens_for(engine, "connect")
+ def _sqlite_connect_events(dbapi_con, con_record):
+
+ # Add REGEXP functionality on SQLite connections
+ dbapi_con.create_function('regexp', 2, regexp)
+
+ if not sqlite_synchronous:
+ # Switch sqlite connections to non-synchronous mode
+ dbapi_con.execute("PRAGMA synchronous = OFF")
+
+ # Disable pysqlite's emitting of the BEGIN statement entirely.
+ # Also stops it from emitting COMMIT before any DDL.
+ # below, we emit BEGIN ourselves.
+ # see http://docs.sqlalchemy.org/en/rel_0_9/dialects/\
+ # sqlite.html#serializable-isolation-savepoints-transactional-ddl
+ dbapi_con.isolation_level = None
+
+ if sqlite_fk:
+ # Ensures that the foreign key constraints are enforced in SQLite.
+ dbapi_con.execute('pragma foreign_keys=ON')
+
+ @sqlalchemy.event.listens_for(engine, "begin")
+ def _sqlite_emit_begin(conn):
+ # emit our own BEGIN, checking for existing
+ # transactional state
+ if 'in_transaction' not in conn.info:
+ conn.execute("BEGIN")
+ conn.info['in_transaction'] = True
+
+ @sqlalchemy.event.listens_for(engine, "rollback")
+ @sqlalchemy.event.listens_for(engine, "commit")
+ def _sqlite_end_transaction(conn):
+ # remove transactional marker
+ conn.info.pop('in_transaction', None)
+
+
+def _test_connection(engine, max_retries, retry_interval):
+ if max_retries == -1:
+ attempts = itertools.count()
+ else:
+ attempts = six.moves.range(max_retries)
+ # See: http://legacy.python.org/dev/peps/pep-3110/#semantic-changes for
+ # why we are not using 'de' directly (it can be removed from the local
+ # scope).
+ de_ref = None
+ for attempt in attempts:
+ try:
+ return engine.connect()
+ except exception.DBConnectionError as de:
+ msg = _LW('SQL connection failed. %s attempts left.')
+ LOG.warning(msg, max_retries - attempt)
+ time.sleep(retry_interval)
+ de_ref = de
+ else:
+ if de_ref is not None:
+ six.reraise(type(de_ref), de_ref)
+
+
+def _add_process_guards(engine):
+ """Add multiprocessing guards.
+
+ Forces a connection to be reconnected if it is detected
+ as having been shared to a sub-process.
+
+ """
+
+ @sqlalchemy.event.listens_for(engine, "connect")
+ def connect(dbapi_connection, connection_record):
+ connection_record.info['pid'] = os.getpid()
+
+ @sqlalchemy.event.listens_for(engine, "checkout")
+ def checkout(dbapi_connection, connection_record, connection_proxy):
+ pid = os.getpid()
+ if connection_record.info['pid'] != pid:
+ LOG.debug(_LW(
+ "Parent process %(orig)s forked (%(newproc)s) with an open "
+ "database connection, "
+ "which is being discarded and recreated."),
+ {"newproc": pid, "orig": connection_record.info['pid']})
+ connection_record.connection = connection_proxy.connection = None
+ raise exc.DisconnectionError(
+ "Connection record belongs to pid %s, "
+ "attempting to check out in pid %s" %
+ (connection_record.info['pid'], pid)
+ )
+
+
+def _add_trace_comments(engine):
+ """Add trace comments.
+
+ Augment statements with a trace of the immediate calling code
+ for a given statement.
+ """
+
+ import os
+ import sys
+ import traceback
+ target_paths = set([
+ os.path.dirname(sys.modules['oslo_db'].__file__),
+ os.path.dirname(sys.modules['sqlalchemy'].__file__)
+ ])
+ skip_paths = set([
+ os.path.dirname(sys.modules['oslo_db.tests'].__file__),
+ ])
+
+ @sqlalchemy.event.listens_for(engine, "before_cursor_execute", retval=True)
+ def before_cursor_execute(conn, cursor, statement, parameters, context,
+ executemany):
+
+ # NOTE(zzzeek) - if different steps per DB dialect are desirable
+ # here, switch out on engine.name for now.
+ stack = traceback.extract_stack()
+ our_line = None
+
+ for idx, (filename, line, method, function) in enumerate(stack):
+ for tgt in skip_paths:
+ if filename.startswith(tgt):
+ break
+ else:
+ for tgt in target_paths:
+ if filename.startswith(tgt):
+ our_line = idx
+ break
+ if our_line:
+ break
+
+ if our_line:
+ trace = "; ".join(
+ "File: %s (%s) %s" % (
+ line[0], line[1], line[2]
+ )
+ # include three lines of context.
+ for line in stack[our_line - 3:our_line]
+
+ )
+ statement = "%s -- %s" % (statement, trace)
+
+ return statement, parameters
diff --git a/oslo_db/sqlalchemy/orm.py b/oslo_db/sqlalchemy/orm.py
new file mode 100644
index 0000000..decd8c8
--- /dev/null
+++ b/oslo_db/sqlalchemy/orm.py
@@ -0,0 +1,66 @@
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""SQLAlchemy ORM connectivity and query structures.
+"""
+
+from oslo_utils import timeutils
+import sqlalchemy.orm
+from sqlalchemy.sql.expression import literal_column
+
+from oslo_db.sqlalchemy import update_match
+
+
+class Query(sqlalchemy.orm.query.Query):
+ """Subclass of sqlalchemy.query with soft_delete() method."""
+ def soft_delete(self, synchronize_session='evaluate'):
+ return self.update({'deleted': literal_column('id'),
+ 'updated_at': literal_column('updated_at'),
+ 'deleted_at': timeutils.utcnow()},
+ synchronize_session=synchronize_session)
+
+ def update_returning_pk(self, values, surrogate_key):
+ """Perform an UPDATE, returning the primary key of the matched row.
+
+ This is a method-version of
+ oslo_db.sqlalchemy.update_match.update_returning_pk(); see that
+ function for usage details.
+
+ """
+ return update_match.update_returning_pk(self, values, surrogate_key)
+
+ def update_on_match(self, specimen, surrogate_key, values, **kw):
+ """Emit an UPDATE statement matching the given specimen.
+
+ This is a method-version of
+ oslo_db.sqlalchemy.update_match.update_on_match(); see that function
+ for usage details.
+
+ """
+ return update_match.update_on_match(
+ self, specimen, surrogate_key, values, **kw)
+
+
+class Session(sqlalchemy.orm.session.Session):
+ """oslo.db-specific Session subclass."""
+
+
+def get_maker(engine, autocommit=True, expire_on_commit=False):
+ """Return a SQLAlchemy sessionmaker using the given engine."""
+ return sqlalchemy.orm.sessionmaker(bind=engine,
+ class_=Session,
+ autocommit=autocommit,
+ expire_on_commit=expire_on_commit,
+ query_cls=Query)
diff --git a/oslo_db/sqlalchemy/session.py b/oslo_db/sqlalchemy/session.py
index 0dae851..9155176 100644
--- a/oslo_db/sqlalchemy/session.py
+++ b/oslo_db/sqlalchemy/session.py
@@ -18,96 +18,71 @@
Recommended ways to use sessions within this framework:
-* Don't use them explicitly; this is like running with ``AUTOCOMMIT=1``.
- `model_query()` will implicitly use a session when called without one
- supplied. This is the ideal situation because it will allow queries
- to be automatically retried if the database connection is interrupted.
-
- .. note:: Automatic retry will be enabled in a future patch.
-
- It is generally fine to issue several queries in a row like this. Even though
- they may be run in separate transactions and/or separate sessions, each one
- will see the data from the prior calls. If needed, undo- or rollback-like
- functionality should be handled at a logical level. For an example, look at
- the code around quotas and `reservation_rollback()`.
-
- Examples:
+* Use the ``enginefacade`` system for connectivity, session and
+ transaction management:
.. code-block:: python
+ from oslo.db.sqlalchemy import enginefacade
+
+ @enginefacade.reader
def get_foo(context, foo):
- return (model_query(context, models.Foo).
+ return (model_query(models.Foo, context.session).
filter_by(foo=foo).
first())
+ @enginefacade.writer
def update_foo(context, id, newfoo):
- (model_query(context, models.Foo).
+ (model_query(models.Foo, context.session).
filter_by(id=id).
update({'foo': newfoo}))
+ @enginefacade.writer
def create_foo(context, values):
foo_ref = models.Foo()
foo_ref.update(values)
- foo_ref.save()
+ foo_ref.save(context.session)
return foo_ref
-
-* Within the scope of a single method, keep all the reads and writes within
- the context managed by a single session. In this way, the session's
- `__exit__` handler will take care of calling `flush()` and `commit()` for
- you. If using this approach, you should not explicitly call `flush()` or
- `commit()`. Any error within the context of the session will cause the
- session to emit a `ROLLBACK`. Database errors like `IntegrityError` will be
- raised in `session`'s `__exit__` handler, and any try/except within the
- context managed by `session` will not be triggered. And catching other
- non-database errors in the session will not trigger the ROLLBACK, so
- exception handlers should always be outside the session, unless the
- developer wants to do a partial commit on purpose. If the connection is
- dropped before this is possible, the database will implicitly roll back the
- transaction.
+ In the above system, transactions are committed automatically, and
+ are shared among all dependent database methods. Ensure
+ that methods which "write" data are enclosed within @writer blocks.
.. note:: Statements in the session scope will not be automatically retried.
- If you create models within the session, they need to be added, but you
+* If you create models within the session, they need to be added, but you
do not need to call `model.save()`:
.. code-block:: python
+ @enginefacade.writer
def create_many_foo(context, foos):
- session = sessionmaker()
- with session.begin():
- for foo in foos:
- foo_ref = models.Foo()
- foo_ref.update(foo)
- session.add(foo_ref)
+ for foo in foos:
+ foo_ref = models.Foo()
+ foo_ref.update(foo)
+ context.session.add(foo_ref)
+ @enginefacade.writer
def update_bar(context, foo_id, newbar):
- session = sessionmaker()
- with session.begin():
- foo_ref = (model_query(context, models.Foo, session).
- filter_by(id=foo_id).
- first())
- (model_query(context, models.Bar, session).
- filter_by(id=foo_ref['bar_id']).
- update({'bar': newbar}))
-
- .. note:: `update_bar` is a trivially simple example of using
- ``with session.begin``. Whereas `create_many_foo` is a good example of
- when a transaction is needed, it is always best to use as few queries as
- possible.
-
- The two queries in `update_bar` can be better expressed using a single query
- which avoids the need for an explicit transaction. It can be expressed like
- so:
+ foo_ref = (model_query(models.Foo, context.session).
+ filter_by(id=foo_id).
+ first())
+ (model_query(models.Bar, context.session).
+ filter_by(id=foo_ref['bar_id']).
+ update({'bar': newbar}))
+
+ The two queries in `update_bar` can alternatively be expressed using
+ a single query, which may be more efficient depending on scenario:
.. code-block:: python
+ @enginefacade.writer
def update_bar(context, foo_id, newbar):
- subq = (model_query(context, models.Foo.id).
+ subq = (model_query(models.Foo.id, context.session).
filter_by(id=foo_id).
limit(1).
subquery())
- (model_query(context, models.Bar).
+ (model_query(models.Bar, context.session).
filter_by(id=subq.as_scalar()).
update({'bar': newbar}))
@@ -119,87 +94,54 @@ Recommended ways to use sessions within this framework:
WHERE id=(SELECT bar_id FROM foo WHERE id = ${foo_id} LIMIT 1);
.. note:: `create_duplicate_foo` is a trivially simple example of catching an
- exception while using ``with session.begin``. Here create two duplicate
+ exception while using a savepoint. Here we create two duplicate
instances with same primary key, must catch the exception out of context
managed by a single session:
.. code-block:: python
+ @enginefacade.writer
def create_duplicate_foo(context):
foo1 = models.Foo()
foo2 = models.Foo()
foo1.id = foo2.id = 1
- session = sessionmaker()
try:
- with session.begin():
+ with context.session.begin_nested():
session.add(foo1)
session.add(foo2)
except exception.DBDuplicateEntry as e:
handle_error(e)
-* Passing an active session between methods. Sessions should only be passed
- to private methods. The private method must use a subtransaction; otherwise
- SQLAlchemy will throw an error when you call `session.begin()` on an existing
- transaction. Public methods should not accept a session parameter and should
- not be involved in sessions within the caller's scope.
-
- Note that this incurs more overhead in SQLAlchemy than the above means
- due to nesting transactions, and it is not possible to implicitly retry
- failed database operations when using this approach.
-
- This also makes code somewhat more difficult to read and debug, because a
- single database transaction spans more than one method. Error handling
- becomes less clear in this situation. When this is needed for code clarity,
- it should be clearly documented.
+* The enginefacade system eliminates the need to decide when sessions need
+ to be passed between methods. All methods should instead share a common
+ context object; the enginefacade system will maintain the transaction
+ across method calls.
.. code-block:: python
- def myfunc(foo):
- session = sessionmaker()
- with session.begin():
- # do some database things
- bar = _private_func(foo, session)
+ @enginefacade.writer
+ def myfunc(context, foo):
+ # do some database things
+ bar = _private_func(context, foo)
return bar
- def _private_func(foo, session=None):
- if not session:
- session = sessionmaker()
- with session.begin(subtransaction=True):
+ def _private_func(context, foo):
+ with enginefacade.using_writer(context) as session:
# do some other database things
+ session.add(SomeObject())
return bar
-There are some things which it is best to avoid:
-
-* Don't keep a transaction open any longer than necessary.
-
- This means that your ``with session.begin()`` block should be as short
- as possible, while still containing all the related calls for that
- transaction.
-
* Avoid ``with_lockmode('UPDATE')`` when possible.
- In MySQL/InnoDB, when a ``SELECT ... FOR UPDATE`` query does not match
- any rows, it will take a gap-lock. This is a form of write-lock on the
- "gap" where no rows exist, and prevents any other writes to that space.
- This can effectively prevent any INSERT into a table by locking the gap
- at the end of the index. Similar problems will occur if the SELECT FOR UPDATE
- has an overly broad WHERE clause, or doesn't properly use an index.
-
- One idea proposed at ODS Fall '12 was to use a normal SELECT to test the
- number of rows matching a query, and if only one row is returned,
- then issue the SELECT FOR UPDATE.
-
- The better long-term solution is to use
- ``INSERT .. ON DUPLICATE KEY UPDATE``.
- However, this can not be done until the "deleted" columns are removed and
- proper UNIQUE constraints are added to the tables.
-
+ FOR UPDATE is not compatible with MySQL/Galera. Instead, an "opportunistic"
+ approach should be used, such that if an UPDATE fails, the entire
+ transaction should be retried. The @wrap_db_retry decorator is one
+ such system that can be used to achieve this.
Enabling soft deletes:
-* To use/enable soft-deletes, the `SoftDeleteMixin` must be added
- to your model class. For example:
+* To use/enable soft-deletes, `SoftDeleteMixin` may be used. For example:
.. code-block:: python
@@ -209,698 +151,46 @@ Enabling soft deletes:
Efficient use of soft deletes:
-* There are two possible ways to mark a record as deleted:
- `model.soft_delete()` and `query.soft_delete()`.
-
- The `model.soft_delete()` method works with a single already-fetched entry.
- `query.soft_delete()` makes only one db request for all entries that
- correspond to the query.
-
-* In almost all cases you should use `query.soft_delete()`. Some examples:
+* While there is a ``model.soft_delete()`` method, prefer
+ ``query.soft_delete()``. Some examples:
.. code-block:: python
- def soft_delete_bar():
- count = model_query(BarModel).find(some_condition).soft_delete()
+ @enginefacade.writer
+ def soft_delete_bar(context):
+ # synchronize_session=False will prevent the ORM from attempting
+ # to search the Session for instances matching the DELETE;
+ # this is typically not necessary for small operations.
+ count = model_query(BarModel, context.session).\\
+ find(some_condition).soft_delete(synchronize_session=False)
if count == 0:
raise Exception("0 entries were soft deleted")
- def complex_soft_delete_with_synchronization_bar(session=None):
- if session is None:
- session = sessionmaker()
- with session.begin(subtransactions=True):
- count = (model_query(BarModel).
- find(some_condition).
- soft_delete(synchronize_session=True))
- # Here synchronize_session is required, because we
- # don't know what is going on in outer session.
- if count == 0:
- raise Exception("0 entries were soft deleted")
-
-* There is only one situation where `model.soft_delete()` is appropriate: when
- you fetch a single record, work with it, and mark it as deleted in the same
- transaction.
-
- .. code-block:: python
-
- def soft_delete_bar_model():
- session = sessionmaker()
- with session.begin():
- bar_ref = model_query(BarModel).find(some_condition).first()
- # Work with bar_ref
- bar_ref.soft_delete(session=session)
-
- However, if you need to work with all entries that correspond to query and
- then soft delete them you should use the `query.soft_delete()` method:
+ @enginefacade.writer
+ def complex_soft_delete_with_synchronization_bar(context):
+ # use synchronize_session='evaluate' when you'd like to attempt
+ # to update the state of the Session to match that of the DELETE.
+ # This is potentially helpful if the operation is complex and
+ # continues to work with instances that were loaded, though
+ # not usually needed.
+ count = (model_query(BarModel, context.session).
+ find(some_condition).
+ soft_delete(synchronize_session='evaulate'))
+ if count == 0:
+ raise Exception("0 entries were soft deleted")
- .. code-block:: python
- def soft_delete_multi_models():
- session = sessionmaker()
- with session.begin():
- query = (model_query(BarModel, session=session).
- find(some_condition))
- model_refs = query.all()
- # Work with model_refs
- query.soft_delete(synchronize_session=False)
- # synchronize_session=False should be set if there is no outer
- # session and these entries are not used after this.
-
- When working with many rows, it is very important to use query.soft_delete,
- which issues a single query. Using `model.soft_delete()`, as in the following
- example, is very inefficient.
+"""
- .. code-block:: python
+from oslo_db.sqlalchemy import enginefacade
+from oslo_db.sqlalchemy import engines
+from oslo_db.sqlalchemy import orm
- for bar_ref in bar_refs:
- bar_ref.soft_delete(session=session)
- # This will produce count(bar_refs) db requests.
+EngineFacade = enginefacade.LegacyEngineFacade
+create_engine = engines.create_engine
+get_maker = orm.get_maker
+Query = orm.Query
+Session = orm.Session
-"""
-import itertools
-import logging
-import os
-import re
-import time
-
-from oslo_utils import timeutils
-import six
-from sqlalchemy import event
-from sqlalchemy import exc
-import sqlalchemy.orm
-from sqlalchemy import pool
-from sqlalchemy.sql.expression import literal_column
-from sqlalchemy.sql.expression import select
-
-from oslo_db._i18n import _LW
-from oslo_db import exception
-from oslo_db import options
-from oslo_db.sqlalchemy import exc_filters
-from oslo_db.sqlalchemy import update_match
-from oslo_db.sqlalchemy import utils
-
-LOG = logging.getLogger(__name__)
-
-
-def _thread_yield(dbapi_con, con_record):
- """Ensure other greenthreads get a chance to be executed.
-
- If we use eventlet.monkey_patch(), eventlet.greenthread.sleep(0) will
- execute instead of time.sleep(0).
- Force a context switch. With common database backends (eg MySQLdb and
- sqlite), there is no implicit yield caused by network I/O since they are
- implemented by C libraries that eventlet cannot monkey patch.
- """
- time.sleep(0)
-
-
-def _connect_ping_listener(connection, branch):
- """Ping the server at connection startup.
-
- Ping the server at transaction begin and transparently reconnect
- if a disconnect exception occurs.
- """
- if branch:
- return
-
- # turn off "close with result". This can also be accomplished
- # by branching the connection, however just setting the flag is
- # more performant and also doesn't get involved with some
- # connection-invalidation awkardness that occurs (see
- # https://bitbucket.org/zzzeek/sqlalchemy/issue/3215/)
- save_should_close_with_result = connection.should_close_with_result
- connection.should_close_with_result = False
- try:
- # run a SELECT 1. use a core select() so that
- # any details like that needed by Oracle, DB2 etc. are handled.
- connection.scalar(select([1]))
- except exception.DBConnectionError:
- # catch DBConnectionError, which is raised by the filter
- # system.
- # disconnect detected. The connection is now
- # "invalid", but the pool should be ready to return
- # new connections assuming they are good now.
- # run the select again to re-validate the Connection.
- connection.scalar(select([1]))
- finally:
- connection.should_close_with_result = save_should_close_with_result
-
-
-def _setup_logging(connection_debug=0):
- """setup_logging function maps SQL debug level to Python log level.
-
- Connection_debug is a verbosity of SQL debugging information.
- 0=None(default value),
- 1=Processed only messages with WARNING level or higher
- 50=Processed only messages with INFO level or higher
- 100=Processed only messages with DEBUG level
- """
- if connection_debug >= 0:
- logger = logging.getLogger('sqlalchemy.engine')
- if connection_debug >= 100:
- logger.setLevel(logging.DEBUG)
- elif connection_debug >= 50:
- logger.setLevel(logging.INFO)
- else:
- logger.setLevel(logging.WARNING)
-
-
-def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
- idle_timeout=3600,
- connection_debug=0, max_pool_size=None, max_overflow=None,
- pool_timeout=None, sqlite_synchronous=True,
- connection_trace=False, max_retries=10, retry_interval=10,
- thread_checkin=True, logging_name=None):
- """Return a new SQLAlchemy engine."""
-
- url = sqlalchemy.engine.url.make_url(sql_connection)
-
- engine_args = {
- "pool_recycle": idle_timeout,
- 'convert_unicode': True,
- 'connect_args': {},
- 'logging_name': logging_name
- }
-
- _setup_logging(connection_debug)
-
- _init_connection_args(
- url, engine_args,
- sqlite_fk=sqlite_fk,
- max_pool_size=max_pool_size,
- max_overflow=max_overflow,
- pool_timeout=pool_timeout
- )
-
- engine = sqlalchemy.create_engine(url, **engine_args)
-
- _init_events(
- engine,
- mysql_sql_mode=mysql_sql_mode,
- sqlite_synchronous=sqlite_synchronous,
- sqlite_fk=sqlite_fk,
- thread_checkin=thread_checkin,
- connection_trace=connection_trace
- )
-
- # register alternate exception handler
- exc_filters.register_engine(engine)
-
- # register engine connect handler
- event.listen(engine, "engine_connect", _connect_ping_listener)
-
- # initial connect + test
- # NOTE(viktors): the current implementation of _test_connection()
- # does nothing, if max_retries == 0, so we can skip it
- if max_retries:
- test_conn = _test_connection(engine, max_retries, retry_interval)
- test_conn.close()
-
- return engine
-
-
-@utils.dispatch_for_dialect('*', multiple=True)
-def _init_connection_args(
- url, engine_args,
- max_pool_size=None, max_overflow=None, pool_timeout=None, **kw):
-
- pool_class = url.get_dialect().get_pool_class(url)
- if issubclass(pool_class, pool.QueuePool):
- if max_pool_size is not None:
- engine_args['pool_size'] = max_pool_size
- if max_overflow is not None:
- engine_args['max_overflow'] = max_overflow
- if pool_timeout is not None:
- engine_args['pool_timeout'] = pool_timeout
-
-
-@_init_connection_args.dispatch_for("sqlite")
-def _init_connection_args(url, engine_args, **kw):
- pool_class = url.get_dialect().get_pool_class(url)
- # singletonthreadpool is used for :memory: connections;
- # replace it with StaticPool.
- if issubclass(pool_class, pool.SingletonThreadPool):
- engine_args["poolclass"] = pool.StaticPool
- engine_args['connect_args']['check_same_thread'] = False
-
-
-@_init_connection_args.dispatch_for("postgresql")
-def _init_connection_args(url, engine_args, **kw):
- if 'client_encoding' not in url.query:
- # Set encoding using engine_args instead of connect_args since
- # it's supported for PostgreSQL 8.*. More details at:
- # http://docs.sqlalchemy.org/en/rel_0_9/dialects/postgresql.html
- engine_args['client_encoding'] = 'utf8'
-
-
-@_init_connection_args.dispatch_for("mysql")
-def _init_connection_args(url, engine_args, **kw):
- if 'charset' not in url.query:
- engine_args['connect_args']['charset'] = 'utf8'
-
-
-@_init_connection_args.dispatch_for("mysql+mysqlconnector")
-def _init_connection_args(url, engine_args, **kw):
- # mysqlconnector engine (<1.0) incorrectly defaults to
- # raise_on_warnings=True
- # https://bitbucket.org/zzzeek/sqlalchemy/issue/2515
- if 'raise_on_warnings' not in url.query:
- engine_args['connect_args']['raise_on_warnings'] = False
-
-
-@_init_connection_args.dispatch_for("mysql+mysqldb")
-@_init_connection_args.dispatch_for("mysql+oursql")
-def _init_connection_args(url, engine_args, **kw):
- # Those drivers require use_unicode=0 to avoid performance drop due
- # to internal usage of Python unicode objects in the driver
- # http://docs.sqlalchemy.org/en/rel_0_9/dialects/mysql.html
- if 'use_unicode' not in url.query:
- engine_args['connect_args']['use_unicode'] = 0
-
-
-@utils.dispatch_for_dialect('*', multiple=True)
-def _init_events(engine, thread_checkin=True, connection_trace=False, **kw):
- """Set up event listeners for all database backends."""
-
- _add_process_guards(engine)
-
- if connection_trace:
- _add_trace_comments(engine)
-
- if thread_checkin:
- sqlalchemy.event.listen(engine, 'checkin', _thread_yield)
-
-
-@_init_events.dispatch_for("mysql")
-def _init_events(engine, mysql_sql_mode=None, **kw):
- """Set up event listeners for MySQL."""
-
- if mysql_sql_mode is not None:
- @sqlalchemy.event.listens_for(engine, "connect")
- def _set_session_sql_mode(dbapi_con, connection_rec):
- cursor = dbapi_con.cursor()
- cursor.execute("SET SESSION sql_mode = %s", [mysql_sql_mode])
-
- @sqlalchemy.event.listens_for(engine, "first_connect")
- def _check_effective_sql_mode(dbapi_con, connection_rec):
- if mysql_sql_mode is not None:
- _set_session_sql_mode(dbapi_con, connection_rec)
-
- cursor = dbapi_con.cursor()
- cursor.execute("SHOW VARIABLES LIKE 'sql_mode'")
- realmode = cursor.fetchone()
-
- if realmode is None:
- LOG.warning(_LW('Unable to detect effective SQL mode'))
- else:
- realmode = realmode[1]
- LOG.debug('MySQL server mode set to %s', realmode)
- if 'TRADITIONAL' not in realmode.upper() and \
- 'STRICT_ALL_TABLES' not in realmode.upper():
- LOG.warning(
- _LW(
- "MySQL SQL mode is '%s', "
- "consider enabling TRADITIONAL or STRICT_ALL_TABLES"),
- realmode)
-
-
-@_init_events.dispatch_for("sqlite")
-def _init_events(engine, sqlite_synchronous=True, sqlite_fk=False, **kw):
- """Set up event listeners for SQLite.
-
- This includes several settings made on connections as they are
- created, as well as transactional control extensions.
-
- """
-
- def regexp(expr, item):
- reg = re.compile(expr)
- return reg.search(six.text_type(item)) is not None
-
- @sqlalchemy.event.listens_for(engine, "connect")
- def _sqlite_connect_events(dbapi_con, con_record):
-
- # Add REGEXP functionality on SQLite connections
- dbapi_con.create_function('regexp', 2, regexp)
-
- if not sqlite_synchronous:
- # Switch sqlite connections to non-synchronous mode
- dbapi_con.execute("PRAGMA synchronous = OFF")
-
- # Disable pysqlite's emitting of the BEGIN statement entirely.
- # Also stops it from emitting COMMIT before any DDL.
- # below, we emit BEGIN ourselves.
- # see http://docs.sqlalchemy.org/en/rel_0_9/dialects/\
- # sqlite.html#serializable-isolation-savepoints-transactional-ddl
- dbapi_con.isolation_level = None
-
- if sqlite_fk:
- # Ensures that the foreign key constraints are enforced in SQLite.
- dbapi_con.execute('pragma foreign_keys=ON')
-
- @sqlalchemy.event.listens_for(engine, "begin")
- def _sqlite_emit_begin(conn):
- # emit our own BEGIN, checking for existing
- # transactional state
- if 'in_transaction' not in conn.info:
- conn.execute("BEGIN")
- conn.info['in_transaction'] = True
-
- @sqlalchemy.event.listens_for(engine, "rollback")
- @sqlalchemy.event.listens_for(engine, "commit")
- def _sqlite_end_transaction(conn):
- # remove transactional marker
- conn.info.pop('in_transaction', None)
-
-
-def _test_connection(engine, max_retries, retry_interval):
- if max_retries == -1:
- attempts = itertools.count()
- else:
- attempts = six.moves.range(max_retries)
- # See: http://legacy.python.org/dev/peps/pep-3110/#semantic-changes for
- # why we are not using 'de' directly (it can be removed from the local
- # scope).
- de_ref = None
- for attempt in attempts:
- try:
- return engine.connect()
- except exception.DBConnectionError as de:
- msg = _LW('SQL connection failed. %s attempts left.')
- LOG.warning(msg, max_retries - attempt)
- time.sleep(retry_interval)
- de_ref = de
- else:
- if de_ref is not None:
- six.reraise(type(de_ref), de_ref)
-
-
-class Query(sqlalchemy.orm.query.Query):
- """Subclass of sqlalchemy.query with soft_delete() method."""
- def soft_delete(self, synchronize_session='evaluate'):
- return self.update({'deleted': literal_column('id'),
- 'updated_at': literal_column('updated_at'),
- 'deleted_at': timeutils.utcnow()},
- synchronize_session=synchronize_session)
-
- def update_returning_pk(self, values, surrogate_key):
- """Perform an UPDATE, returning the primary key of the matched row.
-
- This is a method-version of
- oslo_db.sqlalchemy.update_match.update_returning_pk(); see that
- function for usage details.
-
- """
- return update_match.update_returning_pk(self, values, surrogate_key)
-
- def update_on_match(self, specimen, surrogate_key, values, **kw):
- """Emit an UPDATE statement matching the given specimen.
-
- This is a method-version of
- oslo_db.sqlalchemy.update_match.update_on_match(); see that function
- for usage details.
-
- """
- return update_match.update_on_match(
- self, specimen, surrogate_key, values, **kw)
-
-
-class Session(sqlalchemy.orm.session.Session):
- """Custom Session class to avoid SqlAlchemy Session monkey patching."""
-
-
-def get_maker(engine, autocommit=True, expire_on_commit=False):
- """Return a SQLAlchemy sessionmaker using the given engine."""
- return sqlalchemy.orm.sessionmaker(bind=engine,
- class_=Session,
- autocommit=autocommit,
- expire_on_commit=expire_on_commit,
- query_cls=Query)
-
-
-def _add_process_guards(engine):
- """Add multiprocessing guards.
-
- Forces a connection to be reconnected if it is detected
- as having been shared to a sub-process.
-
- """
-
- @sqlalchemy.event.listens_for(engine, "connect")
- def connect(dbapi_connection, connection_record):
- connection_record.info['pid'] = os.getpid()
-
- @sqlalchemy.event.listens_for(engine, "checkout")
- def checkout(dbapi_connection, connection_record, connection_proxy):
- pid = os.getpid()
- if connection_record.info['pid'] != pid:
- LOG.debug(_LW(
- "Parent process %(orig)s forked (%(newproc)s) with an open "
- "database connection, "
- "which is being discarded and recreated."),
- {"newproc": pid, "orig": connection_record.info['pid']})
- connection_record.connection = connection_proxy.connection = None
- raise exc.DisconnectionError(
- "Connection record belongs to pid %s, "
- "attempting to check out in pid %s" %
- (connection_record.info['pid'], pid)
- )
-
-
-def _add_trace_comments(engine):
- """Add trace comments.
-
- Augment statements with a trace of the immediate calling code
- for a given statement.
- """
-
- import os
- import sys
- import traceback
- target_paths = set([
- os.path.dirname(sys.modules['oslo_db'].__file__),
- os.path.dirname(sys.modules['sqlalchemy'].__file__)
- ])
-
- @sqlalchemy.event.listens_for(engine, "before_cursor_execute", retval=True)
- def before_cursor_execute(conn, cursor, statement, parameters, context,
- executemany):
-
- # NOTE(zzzeek) - if different steps per DB dialect are desirable
- # here, switch out on engine.name for now.
- stack = traceback.extract_stack()
- our_line = None
- for idx, (filename, line, method, function) in enumerate(stack):
- for tgt in target_paths:
- if filename.startswith(tgt):
- our_line = idx
- break
- if our_line:
- break
-
- if our_line:
- trace = "; ".join(
- "File: %s (%s) %s" % (
- line[0], line[1], line[2]
- )
- # include three lines of context.
- for line in stack[our_line - 3:our_line]
-
- )
- statement = "%s -- %s" % (statement, trace)
-
- return statement, parameters
-
-
-class EngineFacade(object):
- """A helper class for removing of global engine instances from oslo.db.
-
- As a library, oslo.db can't decide where to store/when to create engine
- and sessionmaker instances, so this must be left for a target application.
-
- On the other hand, in order to simplify the adoption of oslo.db changes,
- we'll provide a helper class, which creates engine and sessionmaker
- on its instantiation and provides get_engine()/get_session() methods
- that are compatible with corresponding utility functions that currently
- exist in target projects, e.g. in Nova.
-
- engine/sessionmaker instances will still be global (and they are meant to
- be global), but they will be stored in the app context, rather that in the
- oslo.db context.
-
- Note: using of this helper is completely optional and you are encouraged to
- integrate engine/sessionmaker instances into your apps any way you like
- (e.g. one might want to bind a session to a request context). Two important
- things to remember:
-
- 1. An Engine instance is effectively a pool of DB connections, so it's
- meant to be shared (and it's thread-safe).
- 2. A Session instance is not meant to be shared and represents a DB
- transactional context (i.e. it's not thread-safe). sessionmaker is
- a factory of sessions.
-
- """
-
- def __init__(self, sql_connection, slave_connection=None,
- sqlite_fk=False, autocommit=True,
- expire_on_commit=False, **kwargs):
- """Initialize engine and sessionmaker instances.
-
- :param sql_connection: the connection string for the database to use
- :type sql_connection: string
-
- :param slave_connection: the connection string for the 'slave' database
- to use. If not provided, the master database
- will be used for all operations. Note: this
- is meant to be used for offloading of read
- operations to asynchronously replicated slaves
- to reduce the load on the master database.
- :type slave_connection: string
-
- :param sqlite_fk: enable foreign keys in SQLite
- :type sqlite_fk: bool
-
- :param autocommit: use autocommit mode for created Session instances
- :type autocommit: bool
-
- :param expire_on_commit: expire session objects on commit
- :type expire_on_commit: bool
-
- Keyword arguments:
-
- :keyword mysql_sql_mode: the SQL mode to be used for MySQL sessions.
- (defaults to TRADITIONAL)
- :keyword idle_timeout: timeout before idle sql connections are reaped
- (defaults to 3600)
- :keyword connection_debug: verbosity of SQL debugging information.
- -1=Off, 0=None, 100=Everything (defaults
- to 0)
- :keyword max_pool_size: maximum number of SQL connections to keep open
- in a pool (defaults to SQLAlchemy settings)
- :keyword max_overflow: if set, use this value for max_overflow with
- sqlalchemy (defaults to SQLAlchemy settings)
- :keyword pool_timeout: if set, use this value for pool_timeout with
- sqlalchemy (defaults to SQLAlchemy settings)
- :keyword sqlite_synchronous: if True, SQLite uses synchronous mode
- (defaults to True)
- :keyword connection_trace: add python stack traces to SQL as comment
- strings (defaults to False)
- :keyword max_retries: maximum db connection retries during startup.
- (setting -1 implies an infinite retry count)
- (defaults to 10)
- :keyword retry_interval: interval between retries of opening a sql
- connection (defaults to 10)
- :keyword thread_checkin: boolean that indicates that between each
- engine checkin event a sleep(0) will occur to
- allow other greenthreads to run (defaults to
- True)
- """
-
- super(EngineFacade, self).__init__()
-
- engine_kwargs = {
- 'sqlite_fk': sqlite_fk,
- 'mysql_sql_mode': kwargs.get('mysql_sql_mode', 'TRADITIONAL'),
- 'idle_timeout': kwargs.get('idle_timeout', 3600),
- 'connection_debug': kwargs.get('connection_debug', 0),
- 'max_pool_size': kwargs.get('max_pool_size'),
- 'max_overflow': kwargs.get('max_overflow'),
- 'pool_timeout': kwargs.get('pool_timeout'),
- 'sqlite_synchronous': kwargs.get('sqlite_synchronous', True),
- 'connection_trace': kwargs.get('connection_trace', False),
- 'max_retries': kwargs.get('max_retries', 10),
- 'retry_interval': kwargs.get('retry_interval', 10),
- 'thread_checkin': kwargs.get('thread_checkin', True)
- }
- maker_kwargs = {
- 'autocommit': autocommit,
- 'expire_on_commit': expire_on_commit
- }
-
- self._engine = create_engine(sql_connection=sql_connection,
- **engine_kwargs)
- self._session_maker = get_maker(engine=self._engine,
- **maker_kwargs)
- if slave_connection:
- self._slave_engine = create_engine(sql_connection=slave_connection,
- **engine_kwargs)
- self._slave_session_maker = get_maker(engine=self._slave_engine,
- **maker_kwargs)
- else:
- self._slave_engine = None
- self._slave_session_maker = None
-
- def get_engine(self, use_slave=False):
- """Get the engine instance (note, that it's shared).
-
- :param use_slave: if possible, use 'slave' database for this engine.
- If the connection string for the slave database
- wasn't provided, 'master' engine will be returned.
- (defaults to False)
- :type use_slave: bool
-
- """
-
- if use_slave and self._slave_engine:
- return self._slave_engine
-
- return self._engine
-
- def get_session(self, use_slave=False, **kwargs):
- """Get a Session instance.
-
- :param use_slave: if possible, use 'slave' database connection for
- this session. If the connection string for the
- slave database wasn't provided, a session bound
- to the 'master' engine will be returned.
- (defaults to False)
- :type use_slave: bool
-
- Keyword arugments will be passed to a sessionmaker instance as is (if
- passed, they will override the ones used when the sessionmaker instance
- was created). See SQLAlchemy Session docs for details.
-
- """
-
- if use_slave and self._slave_session_maker:
- return self._slave_session_maker(**kwargs)
-
- return self._session_maker(**kwargs)
-
- @classmethod
- def from_config(cls, conf,
- sqlite_fk=False, autocommit=True, expire_on_commit=False):
- """Initialize EngineFacade using oslo.config config instance options.
-
- :param conf: oslo.config config instance
- :type conf: oslo.config.cfg.ConfigOpts
-
- :param sqlite_fk: enable foreign keys in SQLite
- :type sqlite_fk: bool
-
- :param autocommit: use autocommit mode for created Session instances
- :type autocommit: bool
-
- :param expire_on_commit: expire session objects on commit
- :type expire_on_commit: bool
-
- """
-
- conf.register_opts(options.database_opts, 'database')
-
- return cls(sql_connection=conf.database.connection,
- slave_connection=conf.database.slave_connection,
- sqlite_fk=sqlite_fk,
- autocommit=autocommit,
- expire_on_commit=expire_on_commit,
- mysql_sql_mode=conf.database.mysql_sql_mode,
- idle_timeout=conf.database.idle_timeout,
- connection_debug=conf.database.connection_debug,
- max_pool_size=conf.database.max_pool_size,
- max_overflow=conf.database.max_overflow,
- pool_timeout=conf.database.pool_timeout,
- sqlite_synchronous=conf.database.sqlite_synchronous,
- connection_trace=conf.database.connection_trace,
- max_retries=conf.database.max_retries,
- retry_interval=conf.database.retry_interval)
+__all__ = ["EngineFacade", "create_engine", "get_maker", "Query", "Session"]
diff --git a/oslo_db/sqlalchemy/test_base.py b/oslo_db/sqlalchemy/test_base.py
index fbe25a8..2ff9a55 100644
--- a/oslo_db/sqlalchemy/test_base.py
+++ b/oslo_db/sqlalchemy/test_base.py
@@ -28,6 +28,7 @@ import os
import six
from oslo_db import exception
+from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import provision
from oslo_db.sqlalchemy import session
from oslo_db.sqlalchemy import utils
@@ -72,9 +73,15 @@ class DbFixture(fixtures.Fixture):
else:
self.test.engine = self.test.db.engine
self.test.sessionmaker = session.get_maker(self.test.engine)
+
self.addCleanup(setattr, self.test, 'sessionmaker', None)
self.addCleanup(setattr, self.test, 'engine', None)
+ self.test.enginefacade = enginefacade._TestTransactionFactory(
+ self.test.engine, self.test.sessionmaker, apply_global=True,
+ synchronous_reader=True)
+ self.addCleanup(self.test.enginefacade.dispose_global)
+
class DbTestCase(test_base.BaseTestCase):
"""Base class for testing of DB code.
diff --git a/oslo_db/tests/old_import_api/sqlalchemy/test_exc_filters.py b/oslo_db/tests/old_import_api/sqlalchemy/test_exc_filters.py
index 0b1bb89..7738254 100644
--- a/oslo_db/tests/old_import_api/sqlalchemy/test_exc_filters.py
+++ b/oslo_db/tests/old_import_api/sqlalchemy/test_exc_filters.py
@@ -25,8 +25,8 @@ from sqlalchemy.orm import mapper
from oslo.db import exception
from oslo.db.sqlalchemy import exc_filters
from oslo.db.sqlalchemy import test_base
-from oslo_db.sqlalchemy import session as private_session
-from oslo_db.tests.old_import_api import utils as test_utils
+from oslo_db.sqlalchemy import engines
+from oslo_db.tests import utils as test_utils
_TABLE_NAME = '__tmp__test__tmp__'
@@ -720,7 +720,7 @@ class TestDBDisconnected(TestsExceptionFilter):
engine = self.engine
event.listen(
- engine, "engine_connect", private_session._connect_ping_listener)
+ engine, "engine_connect", engines._connect_ping_listener)
real_do_execute = engine.dialect.do_execute
counter = itertools.count(1)
@@ -816,7 +816,7 @@ class TestDBConnectRetry(TestsExceptionFilter):
with self._dbapi_fixture(dialect_name):
with mock.patch.object(engine.dialect, "connect", cant_connect):
- return private_session._test_connection(engine, retries, .01)
+ return engines._test_connection(engine, retries, .01)
def test_connect_no_retries(self):
conn = self._run_test(
diff --git a/oslo_db/tests/old_import_api/sqlalchemy/test_sqlalchemy.py b/oslo_db/tests/old_import_api/sqlalchemy/test_sqlalchemy.py
index d1f10b7..1aa8e59 100644
--- a/oslo_db/tests/old_import_api/sqlalchemy/test_sqlalchemy.py
+++ b/oslo_db/tests/old_import_api/sqlalchemy/test_sqlalchemy.py
@@ -29,11 +29,11 @@ from sqlalchemy import Integer, String
from sqlalchemy.ext.declarative import declarative_base
from oslo.db import exception
+from oslo.db import options as db_options
from oslo.db.sqlalchemy import models
from oslo.db.sqlalchemy import session
from oslo.db.sqlalchemy import test_base
-from oslo_db import options as db_options
-from oslo_db.sqlalchemy import session as private_session
+from oslo_db.sqlalchemy import engines
BASE = declarative_base()
@@ -300,8 +300,8 @@ class EngineFacadeTestCase(oslo_test.BaseTestCase):
self.assertFalse(ses.autocommit)
self.assertTrue(ses.expire_on_commit)
- @mock.patch('oslo_db.sqlalchemy.session.get_maker')
- @mock.patch('oslo_db.sqlalchemy.session.create_engine')
+ @mock.patch('oslo_db.sqlalchemy.orm.get_maker')
+ @mock.patch('oslo_db.sqlalchemy.engines.create_engine')
def test_creation_from_config(self, create_engine, get_maker):
conf = cfg.ConfigOpts()
conf.register_opts(db_options.database_opts, group='database')
@@ -339,6 +339,42 @@ class EngineFacadeTestCase(oslo_test.BaseTestCase):
autocommit=False,
expire_on_commit=True)
+ @mock.patch('oslo_db.sqlalchemy.orm.get_maker')
+ @mock.patch('oslo_db.sqlalchemy.engines.create_engine')
+ def test_passed_in_url_overrides_conf(self, create_engine, get_maker):
+ conf = cfg.ConfigOpts()
+ conf.register_opts(db_options.database_opts, group='database')
+
+ overrides = {
+ 'connection': 'sqlite:///conf_db_setting',
+ 'connection_debug': 100,
+ 'max_pool_size': 10,
+ 'mysql_sql_mode': 'TRADITIONAL',
+ }
+ for optname, optvalue in overrides.items():
+ conf.set_override(optname, optvalue, group='database')
+
+ session.EngineFacade(
+ "sqlite:///override_sql",
+ **dict(conf.database.items())
+ )
+
+ create_engine.assert_called_once_with(
+ sql_connection='sqlite:///override_sql',
+ connection_debug=100,
+ max_pool_size=10,
+ mysql_sql_mode='TRADITIONAL',
+ sqlite_fk=False,
+ idle_timeout=mock.ANY,
+ retry_interval=mock.ANY,
+ max_retries=mock.ANY,
+ max_overflow=mock.ANY,
+ connection_trace=mock.ANY,
+ sqlite_synchronous=mock.ANY,
+ pool_timeout=mock.ANY,
+ thread_checkin=mock.ANY,
+ )
+
def test_slave_connection(self):
paths = self.create_tempfiles([('db.master', ''), ('db.slave', '')],
ext='')
@@ -483,9 +519,7 @@ class MysqlConnectTest(test_base.MySQLOpportunisticTestCase):
)
)
):
- private_session._init_events.dispatch_on_drivername("mysql")(
- test_engine
- )
+ engines._init_events.dispatch_on_drivername("mysql")(test_engine)
test_engine.raw_connection()
self.assertIn('Unable to detect effective SQL mode',
@@ -553,7 +587,7 @@ class PatchStacktraceTest(test_base.DbTestCase):
with mock.patch("traceback.extract_stack", side_effect=extract_stack):
- private_session._add_trace_comments(engine)
+ engines._add_trace_comments(engine)
conn = engine.connect()
with mock.patch.object(engine.dialect, "do_execute") as mock_exec:
diff --git a/oslo_db/tests/sqlalchemy/test_enginefacade.py b/oslo_db/tests/sqlalchemy/test_enginefacade.py
new file mode 100644
index 0000000..a222565
--- /dev/null
+++ b/oslo_db/tests/sqlalchemy/test_enginefacade.py
@@ -0,0 +1,1646 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import collections
+import contextlib
+import copy
+import warnings
+
+import mock
+from oslo_config import cfg
+from oslo_context import context as oslo_context
+from oslotest import base as oslo_test_base
+from sqlalchemy import Column
+from sqlalchemy import Integer
+from sqlalchemy import MetaData
+from sqlalchemy.orm import mapper
+from sqlalchemy import select
+from sqlalchemy import String
+from sqlalchemy import Table
+
+from oslo_db import exception
+from oslo_db import options
+from oslo_db.sqlalchemy import enginefacade
+from oslo_db.sqlalchemy import engines as oslo_engines
+from oslo_db.sqlalchemy import orm
+from oslo_db.sqlalchemy import test_base
+
+
+class SingletonOnName(mock.MagicMock):
+ def __init__(self, the_name, **kw):
+ super(SingletonOnName, self).__init__(
+ __eq__=lambda self, other: other._assert_name == self._assert_name,
+ _assert_name=the_name,
+ **kw
+ )
+
+ def __deepcopy__(self, memo):
+ return self
+
+
+class SingletonConnection(SingletonOnName):
+ def __init__(self, **kw):
+ super(SingletonConnection, self).__init__(
+ "connection", **kw)
+
+
+class SingletonEngine(SingletonOnName):
+ def __init__(self, connection, **kw):
+ super(SingletonEngine, self).__init__(
+ "engine",
+ connect=mock.Mock(return_value=connection),
+ url=connection,
+ _assert_connection=connection,
+ **kw
+ )
+
+
+class NonDecoratedContext(object):
+ """a Context object that's not run through transaction_context_provider."""
+
+
+class AssertDataSource(collections.namedtuple(
+ "AssertDataSource", ["writer", "reader", "async_reader"])):
+
+ def element_for_writer(self, const):
+ if const is enginefacade._WRITER:
+ return self.writer
+ elif const is enginefacade._READER:
+ return self.reader
+ elif const is enginefacade._ASYNC_READER:
+ return self.async_reader
+ else:
+ assert False, "Unknown constant: %s" % const
+
+
+class MockFacadeTest(oslo_test_base.BaseTestCase):
+ """test by applying mocks to internal call-points.
+
+ This applies mocks to
+ oslo.db.sqlalchemy.engines.create_engine() and
+ oslo.db.sqlalchemy.orm.get_maker(), then mocking a
+ _TransactionFactory into
+ oslo.db.sqlalchemy.enginefacade._context_manager._root_factory.
+
+ Various scenarios are run against the enginefacade functions, and the
+ exact calls made against the mock create_engine(), get_maker(), and
+ associated objects are tested exactly against expected calls.
+
+ """
+
+ synchronous_reader = True
+
+ engine_uri = 'some_connection'
+ slave_uri = None
+
+ def setUp(self):
+ super(MockFacadeTest, self).setUp()
+
+ writer_conn = SingletonConnection()
+ writer_engine = SingletonEngine(writer_conn)
+ writer_session = mock.Mock(
+ connection=mock.Mock(return_value=writer_conn))
+ writer_maker = mock.Mock(return_value=writer_session)
+
+ if self.slave_uri:
+ async_reader_conn = SingletonConnection()
+ async_reader_engine = SingletonEngine(async_reader_conn)
+ async_reader_session = mock.Mock(
+ connection=mock.Mock(return_value=async_reader_conn))
+ async_reader_maker = mock.Mock(return_value=async_reader_session)
+
+ else:
+ async_reader_conn = writer_conn
+ async_reader_engine = writer_engine
+ async_reader_session = writer_session
+ async_reader_maker = writer_maker
+
+ if self.synchronous_reader:
+ reader_conn = async_reader_conn
+ reader_engine = async_reader_engine
+ reader_session = async_reader_session
+ reader_maker = async_reader_maker
+ else:
+ reader_conn = writer_conn
+ reader_engine = writer_engine
+ reader_session = writer_session
+ reader_maker = writer_maker
+
+ self.connections = AssertDataSource(
+ writer_conn, reader_conn, async_reader_conn
+ )
+ self.engines = AssertDataSource(
+ writer_engine, reader_engine, async_reader_engine
+ )
+ self.sessions = AssertDataSource(
+ writer_session, reader_session, async_reader_session
+ )
+ self.makers = AssertDataSource(
+ writer_maker, reader_maker, async_reader_maker
+ )
+
+ def get_maker(engine, **kw):
+ if engine is writer_engine:
+ return self.makers.writer
+ elif engine is reader_engine:
+ return self.makers.reader
+ elif engine is async_reader_engine:
+ return self.makers.async_reader
+ else:
+ assert False
+
+ session_patch = mock.patch.object(
+ orm, "get_maker",
+ side_effect=get_maker)
+ self.get_maker = session_patch.start()
+ self.addCleanup(session_patch.stop)
+
+ def create_engine(sql_connection, **kw):
+ if sql_connection == self.engine_uri:
+ return self.engines.writer
+ elif sql_connection == self.slave_uri:
+ return self.engines.async_reader
+ else:
+ assert False
+
+ engine_patch = mock.patch.object(
+ oslo_engines, "create_engine", side_effect=create_engine)
+
+ self.create_engine = engine_patch.start()
+ self.addCleanup(engine_patch.stop)
+
+ self.factory = enginefacade._TransactionFactory()
+ self.factory.configure(
+ synchronous_reader=self.synchronous_reader
+ )
+
+ self.factory.configure(
+ connection=self.engine_uri,
+ slave_connection=self.slave_uri
+ )
+
+ facade_patcher = mock.patch.object(
+ enginefacade._context_manager, "_root_factory", self.factory)
+ facade_patcher.start()
+ self.addCleanup(facade_patcher.stop)
+
+ def _assert_ctx_connection(self, context, connection):
+ self.assertIs(context.connection, connection)
+
+ def _assert_ctx_session(self, context, session):
+ self.assertIs(context.session, session)
+
+ def _assert_non_decorated_ctx_connection(self, context, connection):
+ transaction_ctx = enginefacade._transaction_ctx_for_context(context)
+ self.assertIs(transaction_ctx.connection, connection)
+
+ def _assert_non_decorated_ctx_session(self, context, session):
+ transaction_ctx = enginefacade._transaction_ctx_for_context(context)
+ self.assertIs(transaction_ctx.session, session)
+
+ @contextlib.contextmanager
+ def _assert_engines(self):
+ """produce a mock series of engine calls.
+
+ These are expected to match engine-related calls established
+ by the test subject.
+
+ """
+
+ writer_conn = SingletonConnection()
+ writer_engine = SingletonEngine(writer_conn)
+ if self.slave_uri:
+ async_reader_conn = SingletonConnection()
+ async_reader_engine = SingletonEngine(async_reader_conn)
+ else:
+ async_reader_conn = writer_conn
+ async_reader_engine = writer_engine
+
+ if self.synchronous_reader:
+ reader_engine = async_reader_engine
+ else:
+ reader_engine = writer_engine
+
+ engines = AssertDataSource(
+ writer_engine, reader_engine, async_reader_engine)
+
+ def create_engine(sql_connection, **kw):
+ if sql_connection == self.engine_uri:
+ return engines.writer
+ elif sql_connection == self.slave_uri:
+ return engines.async_reader
+ else:
+ assert False
+
+ engine_factory = mock.Mock(side_effect=create_engine)
+ engine_factory(
+ sql_connection=self.engine_uri,
+ **dict((k, mock.ANY) for k in self.factory._engine_cfg.keys())
+ )
+ if self.slave_uri:
+ engine_factory(
+ sql_connection=self.slave_uri,
+ **dict((k, mock.ANY) for k in self.factory._engine_cfg.keys())
+ )
+
+ yield AssertDataSource(
+ writer_engine, reader_engine, async_reader_engine
+ )
+
+ self.assertEqual(
+ engine_factory.mock_calls,
+ self.create_engine.mock_calls
+ )
+
+ for sym in [
+ enginefacade._WRITER, enginefacade._READER,
+ enginefacade._ASYNC_READER
+ ]:
+ self.assertEqual(
+ engines.element_for_writer(sym).mock_calls,
+ self.engines.element_for_writer(sym).mock_calls
+ )
+
+ def _assert_async_reader_connection(self, engines, session=None):
+ return self._assert_connection(
+ engines, enginefacade._ASYNC_READER, session)
+
+ def _assert_reader_connection(self, engines, session=None):
+ return self._assert_connection(engines, enginefacade._READER, session)
+
+ def _assert_writer_connection(self, engines, session=None):
+ return self._assert_connection(engines, enginefacade._WRITER, session)
+
+ @contextlib.contextmanager
+ def _assert_connection(self, engines, writer, session=None):
+ """produce a mock series of connection calls.
+
+ These are expected to match connection-related calls established
+ by the test subject.
+
+ """
+ if session:
+ connection = session.connection()
+ yield connection
+ else:
+ connection = engines.element_for_writer(writer).connect()
+ trans = connection.begin()
+ yield connection
+ if writer is enginefacade._WRITER:
+ trans.commit()
+ else:
+ trans.rollback()
+ connection.close()
+
+ self.assertEqual(
+ connection.mock_calls,
+ self.connections.element_for_writer(writer).mock_calls)
+
+ @contextlib.contextmanager
+ def _assert_makers(self, engines):
+
+ writer_session = mock.Mock(connection=mock.Mock(
+ return_value=engines.writer._assert_connection)
+ )
+ writer_maker = mock.Mock(return_value=writer_session)
+
+ if self.slave_uri:
+ async_reader_session = mock.Mock(connection=mock.Mock(
+ return_value=engines.async_reader._assert_connection)
+ )
+ async_reader_maker = mock.Mock(return_value=async_reader_session)
+ else:
+ async_reader_session = writer_session
+ async_reader_maker = writer_maker
+
+ if self.synchronous_reader:
+ reader_maker = async_reader_maker
+ else:
+ reader_maker = writer_maker
+
+ makers = AssertDataSource(
+ writer_maker,
+ reader_maker,
+ async_reader_maker,
+ )
+
+ def get_maker(engine, **kw):
+ if engine is engines.writer:
+ return makers.writer
+ elif engine is engines.reader:
+ return makers.reader
+ elif engine is engines.async_reader:
+ return makers.async_reader
+ else:
+ assert False
+
+ maker_factories = mock.Mock(side_effect=get_maker)
+
+ maker_factories(
+ autocommit=True, engine=engines.writer,
+ expire_on_commit=False)
+ if self.slave_uri:
+ maker_factories(
+ autocommit=True, engine=engines.async_reader,
+ expire_on_commit=False)
+
+ yield makers
+
+ self.assertEqual(
+ maker_factories.mock_calls,
+ self.get_maker.mock_calls)
+
+ for sym in [
+ enginefacade._WRITER, enginefacade._READER,
+ enginefacade._ASYNC_READER
+ ]:
+ self.assertEqual(
+ makers.element_for_writer(sym).mock_calls,
+ self.makers.element_for_writer(sym).mock_calls)
+
+ def _assert_async_reader_session(
+ self, makers, connection=None, assert_calls=True):
+ return self._assert_session(
+ makers, enginefacade._ASYNC_READER, connection, assert_calls)
+
+ def _assert_reader_session(
+ self, makers, connection=None, assert_calls=True):
+ return self._assert_session(
+ makers, enginefacade._READER,
+ connection, assert_calls)
+
+ def _assert_writer_session(
+ self, makers, connection=None, assert_calls=True):
+ return self._assert_session(
+ makers, enginefacade._WRITER,
+ connection, assert_calls)
+
+ @contextlib.contextmanager
+ def _assert_session(
+ self, makers, writer, connection=None, assert_calls=True):
+ """produce a mock series of session calls.
+
+ These are expected to match session-related calls established
+ by the test subject.
+
+ """
+
+ if connection:
+ session = makers.element_for_writer(writer)(bind=connection)
+ else:
+ session = makers.element_for_writer(writer)()
+ session.begin()
+ yield session
+ if writer is enginefacade._WRITER:
+ session.commit()
+ elif enginefacade.\
+ _context_manager._factory._transaction_ctx_cfg[
+ 'rollback_reader_sessions']:
+ session.rollback()
+ session.close()
+
+ if assert_calls:
+ self.assertEqual(
+ session.mock_calls,
+ self.sessions.element_for_writer(writer).mock_calls)
+
+ def test_session_reader_decorator(self):
+ context = oslo_context.RequestContext()
+
+ @enginefacade.reader
+ def go(context):
+ context.session.execute("test")
+ go(context)
+
+ with self._assert_engines() as engines:
+ with self._assert_makers(engines) as makers:
+ with self._assert_reader_session(makers) as session:
+ session.execute("test")
+
+ def test_connection_reader_decorator(self):
+ context = oslo_context.RequestContext()
+
+ @enginefacade.reader.connection
+ def go(context):
+ context.connection.execute("test")
+ go(context)
+
+ with self._assert_engines() as engines:
+ with self._assert_reader_connection(engines) as connection:
+ connection.execute("test")
+
+ def test_session_reader_nested_in_connection_reader(self):
+ context = oslo_context.RequestContext()
+
+ @enginefacade.reader.connection
+ def go1(context):
+ context.connection.execute("test1")
+ go2(context)
+
+ @enginefacade.reader
+ def go2(context):
+ context.session.execute("test2")
+ go1(context)
+
+ with self._assert_engines() as engines:
+ with self._assert_reader_connection(engines) as connection:
+ connection.execute("test1")
+ with self._assert_makers(engines) as makers:
+ with self._assert_reader_session(
+ makers, connection) as session:
+ session.execute("test2")
+
+ def test_connection_reader_nested_in_session_reader(self):
+ context = oslo_context.RequestContext()
+
+ @enginefacade.reader
+ def go1(context):
+ context.session.execute("test1")
+ go2(context)
+
+ @enginefacade.reader.connection
+ def go2(context):
+ context.connection.execute("test2")
+
+ go1(context)
+
+ with self._assert_engines() as engines:
+ with self._assert_makers(engines) as makers:
+ with self._assert_reader_session(makers) as session:
+ session.execute("test1")
+ with self._assert_reader_connection(
+ engines, session) as connection:
+ connection.execute("test2")
+
+ def test_session_reader_decorator_nested(self):
+ context = oslo_context.RequestContext()
+
+ @enginefacade.reader
+ def go1(context):
+ context.session.execute("test1")
+ go2(context)
+
+ @enginefacade.reader
+ def go2(context):
+ context.session.execute("test2")
+ go1(context)
+
+ with self._assert_engines() as engines:
+ with self._assert_makers(engines) as makers:
+ with self._assert_reader_session(makers) as session:
+ session.execute("test1")
+ session.execute("test2")
+
+ def test_reader_nested_in_writer_ok(self):
+ context = oslo_context.RequestContext()
+
+ @enginefacade.writer
+ def go1(context):
+ context.session.execute("test1")
+ go2(context)
+
+ @enginefacade.reader
+ def go2(context):
+ context.session.execute("test2")
+
+ go1(context)
+ with self._assert_engines() as engines:
+ with self._assert_makers(engines) as makers:
+ with self._assert_writer_session(makers) as session:
+ session.execute("test1")
+ session.execute("test2")
+
+ def test_writer_nested_in_reader_raises(self):
+ context = oslo_context.RequestContext()
+
+ @enginefacade.reader
+ def go1(context):
+ context.session.execute("test1")
+ go2(context)
+
+ @enginefacade.writer
+ def go2(context):
+ context.session.execute("test2")
+
+ exc = self.assertRaises(
+ TypeError, go1, context
+ )
+ self.assertEqual(
+ "Can't upgrade a READER "
+ "transaction to a WRITER mid-transaction",
+ exc.args[0]
+ )
+
+ def test_async_on_writer_raises(self):
+ exc = self.assertRaises(
+ TypeError, getattr, enginefacade.writer, "async"
+ )
+ self.assertEqual(
+ "Setting async on a WRITER makes no sense",
+ exc.args[0]
+ )
+
+ def test_savepoint_and_independent_raises(self):
+ exc = self.assertRaises(
+ TypeError, getattr, enginefacade.writer.independent, "savepoint"
+ )
+ self.assertEqual(
+ "setting savepoint and independent makes no sense.",
+ exc.args[0]
+ )
+
+ def test_reader_nested_in_async_reader_raises(self):
+ context = oslo_context.RequestContext()
+
+ @enginefacade.reader.async
+ def go1(context):
+ context.session.execute("test1")
+ go2(context)
+
+ @enginefacade.reader
+ def go2(context):
+ context.session.execute("test2")
+
+ exc = self.assertRaises(
+ TypeError, go1, context
+ )
+ self.assertEqual(
+ "Can't upgrade an ASYNC_READER transaction "
+ "to a READER mid-transaction",
+ exc.args[0]
+ )
+
+ def test_writer_nested_in_async_reader_raises(self):
+ context = oslo_context.RequestContext()
+
+ @enginefacade.reader.async
+ def go1(context):
+ context.session.execute("test1")
+ go2(context)
+
+ @enginefacade.writer
+ def go2(context):
+ context.session.execute("test2")
+
+ exc = self.assertRaises(
+ TypeError, go1, context
+ )
+ self.assertEqual(
+ "Can't upgrade an ASYNC_READER transaction to a "
+ "WRITER mid-transaction",
+ exc.args[0]
+ )
+
+ def test_reader_then_writer_ok(self):
+ context = oslo_context.RequestContext()
+
+ @enginefacade.reader
+ def go1(context):
+ context.session.execute("test1")
+
+ @enginefacade.writer
+ def go2(context):
+ context.session.execute("test2")
+
+ go1(context)
+ go2(context)
+
+ with self._assert_engines() as engines:
+ with self._assert_makers(engines) as makers:
+ with self._assert_reader_session(
+ makers, assert_calls=False) as session:
+ session.execute("test1")
+ with self._assert_writer_session(makers) as session:
+ session.execute("test2")
+
+ def test_async_reader_then_reader_ok(self):
+ context = oslo_context.RequestContext()
+
+ @enginefacade.reader.async
+ def go1(context):
+ context.session.execute("test1")
+
+ @enginefacade.reader
+ def go2(context):
+ context.session.execute("test2")
+
+ go1(context)
+ go2(context)
+
+ with self._assert_engines() as engines:
+ with self._assert_makers(engines) as makers:
+ with self._assert_async_reader_session(
+ makers, assert_calls=False) as session:
+ session.execute("test1")
+ with self._assert_reader_session(makers) as session:
+ session.execute("test2")
+
+ def test_using_reader(self):
+ context = oslo_context.RequestContext()
+
+ with enginefacade.reader.using(context) as session:
+ self._assert_ctx_session(context, session)
+ session.execute("test1")
+
+ with self._assert_engines() as engines:
+ with self._assert_makers(engines) as makers:
+ with self._assert_reader_session(makers) as session:
+ session.execute("test1")
+
+ def test_using_reader_rollback_reader_session(self):
+ enginefacade.configure(rollback_reader_sessions=True)
+
+ context = oslo_context.RequestContext()
+
+ with enginefacade.reader.using(context) as session:
+ self._assert_ctx_session(context, session)
+ session.execute("test1")
+
+ with self._assert_engines() as engines:
+ with self._assert_makers(engines) as makers:
+ with self._assert_reader_session(makers) as session:
+ session.execute("test1")
+
+ def test_using_writer(self):
+ context = oslo_context.RequestContext()
+
+ with enginefacade.writer.using(context) as session:
+ self._assert_ctx_session(context, session)
+ session.execute("test1")
+
+ with self._assert_engines() as engines:
+ with self._assert_makers(engines) as makers:
+ with self._assert_writer_session(makers) as session:
+ session.execute("test1")
+
+ def test_using_writer_no_descriptors(self):
+ context = NonDecoratedContext()
+
+ with enginefacade.writer.using(context) as session:
+ self._assert_non_decorated_ctx_session(context, session)
+ session.execute("test1")
+
+ with self._assert_engines() as engines:
+ with self._assert_makers(engines) as makers:
+ with self._assert_writer_session(makers) as session:
+ session.execute("test1")
+
+ def test_using_writer_connection_no_descriptors(self):
+ context = NonDecoratedContext()
+
+ with enginefacade.writer.connection.using(context) as connection:
+ self._assert_non_decorated_ctx_connection(context, connection)
+ connection.execute("test1")
+
+ with self._assert_engines() as engines:
+ with self._assert_writer_connection(engines) as conn:
+ conn.execute("test1")
+
+ def test_using_reader_connection(self):
+ context = oslo_context.RequestContext()
+
+ with enginefacade.reader.connection.using(context) as connection:
+ self._assert_ctx_connection(context, connection)
+ connection.execute("test1")
+
+ with self._assert_engines() as engines:
+ with self._assert_reader_connection(engines) as conn:
+ conn.execute("test1")
+
+ def test_using_writer_connection(self):
+ context = oslo_context.RequestContext()
+
+ with enginefacade.writer.connection.using(context) as connection:
+ self._assert_ctx_connection(context, connection)
+ connection.execute("test1")
+
+ with self._assert_engines() as engines:
+ with self._assert_writer_connection(engines) as conn:
+ conn.execute("test1")
+
+ def test_context_copied_using_existing_writer_connection(self):
+ context = oslo_context.RequestContext()
+
+ with enginefacade.writer.connection.using(context) as connection:
+ self._assert_ctx_connection(context, connection)
+ connection.execute("test1")
+
+ ctx2 = copy.deepcopy(context)
+
+ with enginefacade.reader.connection.using(ctx2) as conn2:
+ self.assertIs(conn2, connection)
+ self._assert_ctx_connection(ctx2, conn2)
+
+ conn2.execute("test2")
+
+ with self._assert_engines() as engines:
+ with self._assert_writer_connection(engines) as conn:
+ conn.execute("test1")
+ conn.execute("test2")
+
+ def test_context_nodesc_copied_using_existing_writer_connection(self):
+ context = NonDecoratedContext()
+
+ with enginefacade.writer.connection.using(context) as connection:
+ self._assert_non_decorated_ctx_connection(context, connection)
+ connection.execute("test1")
+
+ ctx2 = copy.deepcopy(context)
+
+ with enginefacade.reader.connection.using(ctx2) as conn2:
+ self.assertIs(conn2, connection)
+ self._assert_non_decorated_ctx_connection(ctx2, conn2)
+
+ conn2.execute("test2")
+
+ with self._assert_engines() as engines:
+ with self._assert_writer_connection(engines) as conn:
+ conn.execute("test1")
+ conn.execute("test2")
+
+ def test_session_context_exception(self):
+ context = oslo_context.RequestContext()
+ exc = self.assertRaises(
+ exception.NoEngineContextEstablished,
+ getattr, context, 'session'
+ )
+
+ self.assertRegexpMatches(
+ exc.args[0],
+ "No TransactionContext is established for "
+ "this .*RequestContext.* object within the current "
+ "thread; the 'session' attribute is unavailable."
+ )
+
+ def test_session_context_getattr(self):
+ context = oslo_context.RequestContext()
+ self.assertIsNone(getattr(context, 'session', None))
+
+ def test_connection_context_exception(self):
+ context = oslo_context.RequestContext()
+ exc = self.assertRaises(
+ exception.NoEngineContextEstablished,
+ getattr, context, 'connection'
+ )
+
+ self.assertRegexpMatches(
+ exc.args[0],
+ "No TransactionContext is established for "
+ "this .*RequestContext.* object within the current "
+ "thread; the 'connection' attribute is unavailable."
+ )
+
+ def test_connection_context_getattr(self):
+ context = oslo_context.RequestContext()
+ self.assertIsNone(getattr(context, 'connection', None))
+
+ def test_transaction_context_exception(self):
+ context = oslo_context.RequestContext()
+ exc = self.assertRaises(
+ exception.NoEngineContextEstablished,
+ getattr, context, 'transaction'
+ )
+
+ self.assertRegexpMatches(
+ exc.args[0],
+ "No TransactionContext is established for "
+ "this .*RequestContext.* object within the current "
+ "thread; the 'transaction' attribute is unavailable."
+ )
+
+ def test_transaction_context_getattr(self):
+ context = oslo_context.RequestContext()
+ self.assertIsNone(getattr(context, 'transaction', None))
+
+ def test_trans_ctx_context_exception(self):
+ context = oslo_context.RequestContext()
+ exc = self.assertRaises(
+ exception.NoEngineContextEstablished,
+ getattr, context, 'transaction_ctx'
+ )
+
+ self.assertRegexpMatches(
+ exc.args[0],
+ "No TransactionContext is established for "
+ "this .*RequestContext.* object within the current "
+ "thread."
+ )
+
+ def test_trans_ctx_context_getattr(self):
+ context = oslo_context.RequestContext()
+ self.assertIsNone(getattr(context, 'transaction_ctx', None))
+
+ def test_multiple_factories(self):
+ """Test that the instrumentation applied to a context class is
+
+ independent of a specific _TransactionContextManager
+ / _TransactionFactory.
+
+ """
+ mgr1 = enginefacade.transaction_context()
+ mgr1.configure(
+ connection=self.engine_uri,
+ slave_connection=self.slave_uri
+ )
+ mgr2 = enginefacade.transaction_context()
+ mgr2.configure(
+ connection=self.engine_uri,
+ slave_connection=self.slave_uri
+ )
+
+ context = oslo_context.RequestContext()
+
+ self.assertRaises(
+ exception.NoEngineContextEstablished,
+ getattr, context, 'session'
+ )
+ with mgr1.writer.using(context):
+ self.assertIs(context.transaction_ctx.factory, mgr1._factory)
+ self.assertIsNot(context.transaction_ctx.factory, mgr2._factory)
+ self.assertIsNotNone(context.session)
+
+ self.assertRaises(
+ exception.NoEngineContextEstablished,
+ getattr, context, 'session'
+ )
+ with mgr2.writer.using(context):
+ self.assertIsNot(context.transaction_ctx.factory, mgr1._factory)
+ self.assertIs(context.transaction_ctx.factory, mgr2._factory)
+ self.assertIsNotNone(context.session)
+
+ def test_multiple_factories_nested(self):
+ """Test that the instrumentation applied to a context class supports
+
+ nested calls among multiple _TransactionContextManager objects.
+
+ """
+ mgr1 = enginefacade.transaction_context()
+ mgr1.configure(
+ connection=self.engine_uri,
+ slave_connection=self.slave_uri
+ )
+ mgr2 = enginefacade.transaction_context()
+ mgr2.configure(
+ connection=self.engine_uri,
+ slave_connection=self.slave_uri
+ )
+
+ context = oslo_context.RequestContext()
+
+ with mgr1.writer.using(context):
+ self.assertIs(context.transaction_ctx.factory, mgr1._factory)
+ self.assertIsNot(context.transaction_ctx.factory, mgr2._factory)
+
+ with mgr2.writer.using(context):
+ self.assertIsNot(
+ context.transaction_ctx.factory, mgr1._factory)
+ self.assertIs(context.transaction_ctx.factory, mgr2._factory)
+ self.assertIsNotNone(context.session)
+
+ # mgr1 is restored
+ self.assertIs(context.transaction_ctx.factory, mgr1._factory)
+ self.assertIsNot(context.transaction_ctx.factory, mgr2._factory)
+ self.assertIsNotNone(context.session)
+
+ self.assertRaises(
+ exception.NoEngineContextEstablished,
+ getattr, context, 'transaction_ctx'
+ )
+
+
+class SynchronousReaderWSlaveMockFacadeTest(MockFacadeTest):
+ synchronous_reader = True
+
+ engine_uri = 'some_connection'
+ slave_uri = 'some_slave_connection'
+
+
+class AsyncReaderWSlaveMockFacadeTest(MockFacadeTest):
+ synchronous_reader = False
+
+ engine_uri = 'some_connection'
+ slave_uri = 'some_slave_connection'
+
+
+class LegacyIntegrationtest(test_base.DbTestCase):
+
+ def test_legacy_integration(self):
+ legacy_facade = enginefacade.get_legacy_facade()
+ self.assertTrue(
+ legacy_facade.get_engine() is
+ enginefacade._context_manager._factory._writer_engine
+ )
+
+ self.assertTrue(
+ enginefacade.get_legacy_facade() is legacy_facade
+ )
+
+
+class ThreadingTest(test_base.DbTestCase):
+ """Test copying on new threads using real connections and sessions."""
+
+ def _assert_ctx_connection(self, context, connection):
+ self.assertIs(context.connection, connection)
+
+ def _assert_ctx_session(self, context, session):
+ self.assertIs(context.session, session)
+
+ def _patch_thread_ident(self):
+ self.ident = 1
+
+ test_instance = self
+
+ class MockThreadingLocal(object):
+ def __init__(self):
+ self.__dict__['state'] = collections.defaultdict(dict)
+
+ def __deepcopy__(self, memo):
+ return self
+
+ def __getattr__(self, key):
+ ns = self.state[test_instance.ident]
+ try:
+ return ns[key]
+ except KeyError:
+ raise AttributeError(key)
+
+ def __setattr__(self, key, value):
+ ns = self.state[test_instance.ident]
+ ns[key] = value
+
+ def __delattr__(self, key):
+ ns = self.state[test_instance.ident]
+ try:
+ del ns[key]
+ except KeyError:
+ raise AttributeError(key)
+
+ return mock.patch.object(
+ enginefacade, "_TransactionContextTLocal", MockThreadingLocal)
+
+ def test_thread_ctxmanager_writer(self):
+ context = oslo_context.RequestContext()
+
+ with self._patch_thread_ident():
+ with enginefacade.writer.using(context) as session:
+ self._assert_ctx_session(context, session)
+
+ self.ident = 2
+
+ with enginefacade.reader.using(context) as sess2:
+ # new session
+ self.assertIsNot(sess2, session)
+
+ # thread local shows the new session
+ self._assert_ctx_session(context, sess2)
+
+ self.ident = 1
+
+ with enginefacade.reader.using(context) as sess3:
+ self.assertIs(sess3, session)
+ self._assert_ctx_session(context, session)
+
+ def test_thread_ctxmanager_writer_connection(self):
+ context = oslo_context.RequestContext()
+
+ with self._patch_thread_ident():
+ with enginefacade.writer.connection.using(context) as conn:
+ self._assert_ctx_connection(context, conn)
+
+ self.ident = 2
+
+ with enginefacade.reader.connection.using(context) as conn2:
+ # new connection
+ self.assertIsNot(conn2, conn)
+
+ # thread local shows the new connection
+ self._assert_ctx_connection(context, conn2)
+
+ with enginefacade.reader.connection.using(
+ context) as conn3:
+ # we still get the right connection even though
+ # this context is not the "copied" context
+ self.assertIsNot(conn3, conn)
+ self.assertIs(conn3, conn2)
+
+ self.ident = 1
+
+ with enginefacade.reader.connection.using(context) as conn3:
+ self.assertIs(conn3, conn)
+ self._assert_ctx_connection(context, conn)
+
+ def test_thread_ctxmanager_switch_styles(self):
+
+ @enginefacade.writer.connection
+ def go_one(context):
+ self.assertIsNone(context.session)
+ self.assertIsNotNone(context.connection)
+
+ self.ident = 2
+ go_two(context)
+
+ self.ident = 1
+ self.assertIsNone(context.session)
+ self.assertIsNotNone(context.connection)
+
+ @enginefacade.reader
+ def go_two(context):
+ self.assertIsNone(context.connection)
+ self.assertIsNotNone(context.session)
+
+ context = oslo_context.RequestContext()
+ with self._patch_thread_ident():
+ go_one(context)
+
+ def test_thread_decorator_writer(self):
+ sessions = set()
+
+ @enginefacade.writer
+ def go_one(context):
+ sessions.add(context.session)
+
+ self.ident = 2
+ go_two(context)
+
+ self.ident = 1
+
+ go_three(context)
+
+ @enginefacade.reader
+ def go_two(context):
+ assert context.session not in sessions
+
+ @enginefacade.reader
+ def go_three(context):
+ assert context.session in sessions
+
+ context = oslo_context.RequestContext()
+ with self._patch_thread_ident():
+ go_one(context)
+
+ def test_thread_decorator_writer_connection(self):
+ connections = set()
+
+ @enginefacade.writer.connection
+ def go_one(context):
+ connections.add(context.connection)
+
+ self.ident = 2
+ go_two(context)
+
+ self.ident = 1
+
+ go_three(context)
+
+ @enginefacade.reader.connection
+ def go_two(context):
+ assert context.connection not in connections
+
+ @enginefacade.reader
+ def go_three(context):
+ assert context.connection in connections
+
+ context = oslo_context.RequestContext()
+ with self._patch_thread_ident():
+ go_one(context)
+
+
+class LiveFacadeTest(test_base.DbTestCase):
+ """test using live SQL with test-provisioned databases.
+
+ Several of these tests require that multiple transactions run
+ simultaenously; as the default SQLite :memory: connection can't achieve
+ this, opportunistic test implementations against MySQL and PostgreSQL are
+ supplied.
+
+ """
+
+ def setUp(self):
+ super(LiveFacadeTest, self).setUp()
+
+ metadata = MetaData()
+ user_table = Table(
+ 'user', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('name', String(30)),
+ mysql_engine='InnoDB'
+ )
+ self.user_table = user_table
+ metadata.create_all(self.engine)
+ self.addCleanup(metadata.drop_all, self.engine)
+
+ class User(object):
+ def __init__(self, name):
+ self.name = name
+
+ mapper(User, user_table)
+ self.User = User
+
+ def _assert_ctx_connection(self, context, connection):
+ self.assertIs(context.connection, connection)
+
+ def _assert_ctx_session(self, context, session):
+ self.assertIs(context.session, session)
+
+ def test_transaction_committed(self):
+ context = oslo_context.RequestContext()
+
+ with enginefacade.writer.using(context) as session:
+ session.add(self.User(name="u1"))
+
+ session = self.sessionmaker(autocommit=True)
+ self.assertEqual(
+ "u1",
+ session.query(self.User.name).scalar()
+ )
+
+ def test_transaction_rollback(self):
+ context = oslo_context.RequestContext()
+
+ class MyException(Exception):
+ pass
+
+ @enginefacade.writer
+ def go(context):
+ context.session.add(self.User(name="u1"))
+ context.session.flush()
+ raise MyException("a test")
+
+ self.assertRaises(MyException, go, context)
+
+ session = self.sessionmaker(autocommit=True)
+ self.assertEqual(
+ None,
+ session.query(self.User.name).scalar()
+ )
+
+ def test_context_deepcopy_on_session(self):
+ context = oslo_context.RequestContext()
+ with enginefacade.writer.using(context) as session:
+
+ ctx2 = copy.deepcopy(context)
+ self._assert_ctx_session(ctx2, session)
+
+ with enginefacade.writer.using(ctx2) as s2:
+ self.assertIs(session, s2)
+ self._assert_ctx_session(ctx2, s2)
+
+ s2.add(self.User(name="u1"))
+ s2.flush()
+
+ session = self.sessionmaker(autocommit=True)
+ self.assertEqual(
+ "u1",
+ session.query(self.User.name).scalar()
+ )
+
+ def test_context_deepcopy_on_connection(self):
+ context = oslo_context.RequestContext()
+ with enginefacade.writer.connection.using(context) as conn:
+
+ ctx2 = copy.deepcopy(context)
+ self._assert_ctx_connection(ctx2, conn)
+
+ with enginefacade.writer.connection.using(ctx2) as conn2:
+ self.assertIs(conn, conn2)
+ self._assert_ctx_connection(ctx2, conn2)
+
+ conn2.execute(self.user_table.insert().values(name="u1"))
+
+ self._assert_ctx_connection(ctx2, conn2)
+
+ session = self.sessionmaker(autocommit=True)
+ self.assertEqual(
+ "u1",
+ session.query(self.User.name).scalar()
+ )
+
+ @test_base.backend_specific("postgresql", "mysql")
+ def test_external_session_transaction(self):
+ context = oslo_context.RequestContext()
+ with enginefacade.writer.using(context) as session:
+ session.add(self.User(name="u1"))
+ session.flush()
+
+ with enginefacade.writer.independent.using(context) as s2:
+ # transaction() uses a new session
+ self.assertIsNot(s2, session)
+ self._assert_ctx_session(context, s2)
+
+ # rows within a distinct transaction
+ s2.add(self.User(name="u2"))
+
+ # it also takes over the global enginefacade
+ # within the context
+ with enginefacade.writer.using(context) as s3:
+ self.assertIs(s3, s2)
+ s3.add(self.User(name="u3"))
+
+ self._assert_ctx_session(context, session)
+
+ # rollback the "outer" transaction
+ session.rollback()
+
+ # add more state on the "outer" transaction
+ session.begin()
+ session.add(self.User(name="u4"))
+
+ session = self.sessionmaker(autocommit=True)
+
+ # inner transction + second part of "outer" transaction were committed
+ self.assertEqual(
+ [("u2",), ("u3",), ("u4", )],
+ session.query(
+ self.User.name).order_by(self.User.name).all()
+ )
+
+ def test_savepoint_transaction_decorator(self):
+ context = oslo_context.RequestContext()
+
+ @enginefacade.writer
+ def go1(context):
+ session = context.session
+ session.add(self.User(name="u1"))
+ session.flush()
+
+ try:
+ go2(context)
+ except Exception:
+ pass
+
+ go3(context)
+
+ session.add(self.User(name="u4"))
+
+ @enginefacade.writer.savepoint
+ def go2(context):
+ session = context.session
+ session.add(self.User(name="u2"))
+ raise Exception("nope")
+
+ @enginefacade.writer.savepoint
+ def go3(context):
+ session = context.session
+ session.add(self.User(name="u3"))
+
+ go1(context)
+
+ session = self.sessionmaker(autocommit=True)
+
+ # inner transction + second part of "outer" transaction were committed
+ self.assertEqual(
+ [("u1",), ("u3",), ("u4", )],
+ session.query(
+ self.User.name).order_by(self.User.name).all()
+ )
+
+ def test_savepoint_transaction(self):
+ context = oslo_context.RequestContext()
+
+ with enginefacade.writer.using(context) as session:
+ session.add(self.User(name="u1"))
+ session.flush()
+
+ try:
+ with enginefacade.writer.savepoint.using(context) as session:
+ session.add(self.User(name="u2"))
+ raise Exception("nope")
+ except Exception:
+ pass
+
+ with enginefacade.writer.savepoint.using(context) as session:
+ session.add(self.User(name="u3"))
+
+ session.add(self.User(name="u4"))
+
+ session = self.sessionmaker(autocommit=True)
+
+ # inner transction + second part of "outer" transaction were committed
+ self.assertEqual(
+ [("u1",), ("u3",), ("u4", )],
+ session.query(
+ self.User.name).order_by(self.User.name).all()
+ )
+
+ @test_base.backend_specific("postgresql", "mysql")
+ def test_external_session_transaction_decorator(self):
+ context = oslo_context.RequestContext()
+
+ @enginefacade.writer
+ def go1(context):
+ session = context.session
+ session.add(self.User(name="u1"))
+ session.flush()
+
+ go2(context, session)
+
+ self._assert_ctx_session(context, session)
+
+ # rollback the "outer" transaction
+ session.rollback()
+
+ # add more state on the "outer" transaction
+ session.begin()
+ session.add(self.User(name="u4"))
+
+ @enginefacade.writer.independent
+ def go2(context, session):
+ s2 = context.session
+ # uses a new session
+ self.assertIsNot(s2, session)
+ self._assert_ctx_session(context, s2)
+
+ # rows within a distinct transaction
+ s2.add(self.User(name="u2"))
+
+ # it also takes over the global enginefacade
+ # within the context
+ with enginefacade.writer.using(context) as s3:
+ self.assertIs(s3, s2)
+ s3.add(self.User(name="u3"))
+
+ go1(context)
+
+ session = self.sessionmaker(autocommit=True)
+
+ # inner transction + second part of "outer" transaction were committed
+ self.assertEqual(
+ [("u2",), ("u3",), ("u4", )],
+ session.query(
+ self.User.name).order_by(self.User.name).all()
+ )
+
+ @test_base.backend_specific("postgresql", "mysql")
+ def test_external_connection_transaction(self):
+ context = oslo_context.RequestContext()
+ with enginefacade.writer.connection.using(context) as connection:
+ connection.execute(self.user_table.insert().values(name="u1"))
+
+ # transaction() uses a new Connection
+ with enginefacade.writer.independent.connection.\
+ using(context) as c2:
+ self.assertIsNot(c2, connection)
+ self._assert_ctx_connection(context, c2)
+
+ # rows within a distinct transaction
+ c2.execute(self.user_table.insert().values(name="u2"))
+
+ # it also takes over the global enginefacade
+ # within the context
+ with enginefacade.writer.connection.using(context) as c3:
+ self.assertIs(c2, c3)
+ c3.execute(self.user_table.insert().values(name="u3"))
+ self._assert_ctx_connection(context, connection)
+
+ # rollback the "outer" transaction
+ transaction_ctx = context.transaction_ctx
+ transaction_ctx.transaction.rollback()
+ transaction_ctx.transaction = connection.begin()
+
+ # add more state on the "outer" transaction
+ connection.execute(self.user_table.insert().values(name="u4"))
+
+ session = self.sessionmaker(autocommit=True)
+ self.assertEqual(
+ [("u2",), ("u3",), ("u4", )],
+ session.query(
+ self.User.name).order_by(self.User.name).all()
+ )
+
+ @test_base.backend_specific("postgresql", "mysql")
+ def test_external_writer_in_reader(self):
+ context = oslo_context.RequestContext()
+ with enginefacade.reader.using(context) as session:
+ ping = session.scalar(select([1]))
+ self.assertEqual(1, ping)
+
+ # we're definitely a reader
+ @enginefacade.writer
+ def go(ctx):
+ pass
+ exc = self.assertRaises(TypeError, go, context)
+ self.assertEqual(
+ "Can't upgrade a READER transaction to a "
+ "WRITER mid-transaction",
+ exc.args[0])
+
+ # but we can do a writer on a new transaction
+ with enginefacade.writer.independent.using(context) as sess2:
+ self.assertIsNot(sess2, session)
+ self._assert_ctx_session(context, sess2)
+
+ session.add(self.User(name="u1_nocommit"))
+ sess2.add(self.User(name="u1_commit"))
+
+ user = session.query(self.User).first()
+ self.assertEqual("u1_commit", user.name)
+
+ session = self.sessionmaker(autocommit=True)
+ self.assertEqual(
+ session.query(
+ self.User.name).order_by(self.User.name).all(),
+ [("u1_commit",)]
+ )
+
+ def test_replace_scope(self):
+ # "timeout" is an argument accepted by
+ # the pysqlite dialect, which we set here to ensure
+ # that even in an all-sqlite test, we test that the URL
+ # is different in the context we are looking for
+ alt_connection = "sqlite:///?timeout=90"
+
+ alt_mgr1 = enginefacade.transaction_context()
+ alt_mgr1.configure(
+ connection=alt_connection,
+ )
+
+ @enginefacade.writer
+ def go1(context):
+ s1 = context.session
+ self.assertEqual(
+ s1.bind.url,
+ enginefacade._context_manager._factory._writer_engine.url)
+ self.assertIs(
+ s1.bind,
+ enginefacade._context_manager._factory._writer_engine)
+ self.assertEqual(s1.bind.url, self.engine.url)
+
+ with alt_mgr1.replace.using(context):
+ go2(context)
+
+ go4(context)
+
+ @enginefacade.writer
+ def go2(context):
+ s2 = context.session
+
+ # factory is not replaced globally...
+ self.assertIsNot(
+ enginefacade._context_manager._factory._writer_engine,
+ alt_mgr1._factory._writer_engine
+ )
+
+ # but it is replaced for us
+ self.assertIs(s2.bind, alt_mgr1._factory._writer_engine)
+ self.assertEqual(
+ str(s2.bind.url), alt_connection)
+
+ go3(context)
+
+ @enginefacade.reader
+ def go3(context):
+ s3 = context.session
+
+ # in a call of a call, we still have the alt URL
+ self.assertIs(s3.bind, alt_mgr1._factory._writer_engine)
+ self.assertEqual(
+ str(s3.bind.url), alt_connection)
+
+ @enginefacade.writer
+ def go4(context):
+ s4 = context.session
+
+ # outside the "replace" context, all is back to normal
+ self.assertIs(s4.bind, self.engine)
+ self.assertEqual(
+ s4.bind.url, self.engine.url)
+
+ context = oslo_context.RequestContext()
+ go1(context)
+ self.assertIsNot(
+ enginefacade._context_manager._factory._writer_engine,
+ alt_mgr1._factory._writer_engine
+ )
+
+ def test_replace_scope_only_global_eng(self):
+ # "timeout" is an argument accepted by
+ # the pysqlite dialect, which we set here to ensure
+ # that even in an all-sqlite test, we test that the URL
+ # is different in the context we are looking for
+ alt_connection1 = "sqlite:///?timeout=90"
+
+ alt_mgr1 = enginefacade.transaction_context()
+ alt_mgr1.configure(
+ connection=alt_connection1,
+ )
+
+ alt_connection2 = "sqlite:///?timeout=120"
+
+ alt_mgr2 = enginefacade.transaction_context()
+ alt_mgr2.configure(
+ connection=alt_connection2,
+ )
+
+ @enginefacade.writer
+ def go1(context):
+ s1 = context.session
+ # global engine
+ self.assertEqual(s1.bind.url, self.engine.url)
+
+ # now replace global engine...
+ with alt_mgr1.replace.using(context):
+ go2(context)
+
+ # and back
+ go6(context)
+
+ @enginefacade.writer
+ def go2(context):
+ s2 = context.session
+
+ # we have the replace-the-global engine
+ self.assertEqual(str(s2.bind.url), alt_connection1)
+ self.assertIs(s2.bind, alt_mgr1._factory._writer_engine)
+
+ go3(context)
+
+ @alt_mgr2.writer
+ def go3(context):
+ s3 = context.session
+
+ # we don't use the global engine in the first place.
+ # make sure our own factory still used.
+ self.assertEqual(str(s3.bind.url), alt_connection2)
+ self.assertIs(s3.bind, alt_mgr2._factory._writer_engine)
+
+ go4(context)
+
+ @enginefacade.writer
+ def go4(context):
+ s4 = context.session
+
+ # we *do* use the global, so we still want the replacement.
+ self.assertEqual(str(s4.bind.url), alt_connection1)
+ self.assertIs(s4.bind, alt_mgr1._factory._writer_engine)
+
+ @enginefacade.writer
+ def go5(context):
+ s5 = context.session
+
+ # ...and here also
+ self.assertEqual(str(s5.bind.url), alt_connection1)
+ self.assertIs(s5.bind, alt_mgr1._factory._writer_engine)
+
+ @enginefacade.writer
+ def go6(context):
+ s6 = context.session
+
+ # ...but not here!
+ self.assertEqual(str(s6.bind.url), str(self.engine.url))
+ self.assertIs(s6.bind, self.engine)
+
+ context = oslo_context.RequestContext()
+ go1(context)
+
+
+class MySQLLiveFacadeTest(
+ test_base.MySQLOpportunisticTestCase, LiveFacadeTest):
+ pass
+
+
+class PGLiveFacadeTest(
+ test_base.PostgreSQLOpportunisticTestCase, LiveFacadeTest):
+ pass
+
+
+class ConfigOptionsTest(oslo_test_base.BaseTestCase):
+ def test_all_options(self):
+ """test that everything in CONF.database.iteritems() is accepted.
+
+ There's a handful of options in oslo.db.options that seem to have
+ no meaning, but need to be accepted. In particular, Cinder and
+ maybe others are doing exactly this call.
+
+ """
+
+ factory = enginefacade._TransactionFactory()
+ cfg.CONF.register_opts(options.database_opts, 'database')
+ factory.configure(**dict(cfg.CONF.database.items()))
+
+ def test_options_not_supported(self):
+ factory = enginefacade._TransactionFactory()
+
+ with warnings.catch_warnings(record=True) as w:
+ warnings.simplefilter("always")
+
+ factory.configure(fake1='x', idle_timeout=200, wrong2='y')
+
+ self.assertEqual(1, len(w))
+ self.assertTrue(
+ issubclass(w[-1].category, exception.NotSupportedWarning))
+ self.assertEqual(
+ "Configuration option(s) ['fake1', 'wrong2'] not supported",
+ str(w[-1].message)
+ )
+
+# TODO(zzzeek): test configuration options, e.g. like
+# test_sqlalchemy->test_creation_from_config
diff --git a/oslo_db/tests/sqlalchemy/test_exc_filters.py b/oslo_db/tests/sqlalchemy/test_exc_filters.py
index bafb59e..34313ce 100644
--- a/oslo_db/tests/sqlalchemy/test_exc_filters.py
+++ b/oslo_db/tests/sqlalchemy/test_exc_filters.py
@@ -23,8 +23,8 @@ from sqlalchemy import event
from sqlalchemy.orm import mapper
from oslo_db import exception
+from oslo_db.sqlalchemy import engines
from oslo_db.sqlalchemy import exc_filters
-from oslo_db.sqlalchemy import session
from oslo_db.sqlalchemy import test_base
from oslo_db.tests import utils as test_utils
@@ -784,7 +784,7 @@ class TestDBDisconnected(TestsExceptionFilter):
dialect_name, exception, num_disconnects, is_disconnect=True):
engine = self.engine
- event.listen(engine, "engine_connect", session._connect_ping_listener)
+ event.listen(engine, "engine_connect", engines._connect_ping_listener)
real_do_execute = engine.dialect.do_execute
counter = itertools.count(1)
@@ -895,7 +895,7 @@ class TestDBConnectRetry(TestsExceptionFilter):
with self._dbapi_fixture(dialect_name):
with mock.patch.object(engine.dialect, "connect", cant_connect):
- return session._test_connection(engine, retries, .01)
+ return engines._test_connection(engine, retries, .01)
def test_connect_no_retries(self):
conn = self._run_test(
@@ -967,7 +967,7 @@ class TestDBConnectPingWrapping(TestsExceptionFilter):
def setUp(self):
super(TestDBConnectPingWrapping, self).setUp()
event.listen(
- self.engine, "engine_connect", session._connect_ping_listener)
+ self.engine, "engine_connect", engines._connect_ping_listener)
@contextlib.contextmanager
def _fixture(
diff --git a/oslo_db/tests/sqlalchemy/test_sqlalchemy.py b/oslo_db/tests/sqlalchemy/test_sqlalchemy.py
index 516acb6..02b0af5 100644
--- a/oslo_db/tests/sqlalchemy/test_sqlalchemy.py
+++ b/oslo_db/tests/sqlalchemy/test_sqlalchemy.py
@@ -31,6 +31,7 @@ from sqlalchemy.ext.declarative import declarative_base
from oslo_db import exception
from oslo_db import options as db_options
+from oslo_db.sqlalchemy import engines
from oslo_db.sqlalchemy import models
from oslo_db.sqlalchemy import session
from oslo_db.sqlalchemy import test_base
@@ -312,8 +313,8 @@ class EngineFacadeTestCase(oslo_test.BaseTestCase):
self.assertFalse(ses.autocommit)
self.assertTrue(ses.expire_on_commit)
- @mock.patch('oslo_db.sqlalchemy.session.get_maker')
- @mock.patch('oslo_db.sqlalchemy.session.create_engine')
+ @mock.patch('oslo_db.sqlalchemy.orm.get_maker')
+ @mock.patch('oslo_db.sqlalchemy.engines.create_engine')
def test_creation_from_config(self, create_engine, get_maker):
conf = cfg.ConfigOpts()
conf.register_opts(db_options.database_opts, group='database')
@@ -495,7 +496,7 @@ class MysqlConnectTest(test_base.MySQLOpportunisticTestCase):
)
)
):
- session._init_events.dispatch_on_drivername("mysql")(test_engine)
+ engines._init_events.dispatch_on_drivername("mysql")(test_engine)
test_engine.raw_connection()
self.assertIn('Unable to detect effective SQL mode',
@@ -556,7 +557,7 @@ class CreateEngineTest(oslo_test.BaseTestCase):
self.args = {'connect_args': {}}
def test_queuepool_args(self):
- session._init_connection_args(
+ engines._init_connection_args(
url.make_url("mysql://u:p@host/test"), self.args,
max_pool_size=10, max_overflow=10)
self.assertEqual(self.args['pool_size'], 10)
@@ -564,7 +565,7 @@ class CreateEngineTest(oslo_test.BaseTestCase):
def test_sqlite_memory_pool_args(self):
for _url in ("sqlite://", "sqlite:///:memory:"):
- session._init_connection_args(
+ engines._init_connection_args(
url.make_url(_url), self.args,
max_pool_size=10, max_overflow=10)
@@ -581,7 +582,7 @@ class CreateEngineTest(oslo_test.BaseTestCase):
self.assertTrue('poolclass' in self.args)
def test_sqlite_file_pool_args(self):
- session._init_connection_args(
+ engines._init_connection_args(
url.make_url("sqlite:///somefile.db"), self.args,
max_pool_size=10, max_overflow=10)
@@ -597,37 +598,37 @@ class CreateEngineTest(oslo_test.BaseTestCase):
self.assertTrue('poolclass' not in self.args)
def test_mysql_connect_args_default(self):
- session._init_connection_args(
+ engines._init_connection_args(
url.make_url("mysql://u:p@host/test"), self.args)
self.assertEqual(self.args['connect_args'],
{'charset': 'utf8', 'use_unicode': 0})
def test_mysql_oursql_connect_args_default(self):
- session._init_connection_args(
+ engines._init_connection_args(
url.make_url("mysql+oursql://u:p@host/test"), self.args)
self.assertEqual(self.args['connect_args'],
{'charset': 'utf8', 'use_unicode': 0})
def test_mysql_mysqldb_connect_args_default(self):
- session._init_connection_args(
+ engines._init_connection_args(
url.make_url("mysql+mysqldb://u:p@host/test"), self.args)
self.assertEqual(self.args['connect_args'],
{'charset': 'utf8', 'use_unicode': 0})
def test_postgresql_connect_args_default(self):
- session._init_connection_args(
+ engines._init_connection_args(
url.make_url("postgresql://u:p@host/test"), self.args)
self.assertEqual(self.args['client_encoding'], 'utf8')
self.assertFalse(self.args['connect_args'])
def test_mysqlconnector_raise_on_warnings_default(self):
- session._init_connection_args(
+ engines._init_connection_args(
url.make_url("mysql+mysqlconnector://u:p@host/test"),
self.args)
self.assertEqual(self.args['connect_args']['raise_on_warnings'], False)
def test_mysqlconnector_raise_on_warnings_override(self):
- session._init_connection_args(
+ engines._init_connection_args(
url.make_url(
"mysql+mysqlconnector://u:p@host/test"
"?raise_on_warnings=true"),
@@ -639,12 +640,12 @@ class CreateEngineTest(oslo_test.BaseTestCase):
def test_thread_checkin(self):
with mock.patch("sqlalchemy.event.listens_for"):
with mock.patch("sqlalchemy.event.listen") as listen_evt:
- session._init_events.dispatch_on_drivername(
+ engines._init_events.dispatch_on_drivername(
"sqlite")(mock.Mock())
self.assertEqual(
listen_evt.mock_calls[0][1][-1],
- session._thread_yield
+ engines._thread_yield
)
@@ -693,7 +694,7 @@ class PatchStacktraceTest(test_base.DbTestCase):
with mock.patch("traceback.extract_stack", side_effect=extract_stack):
- session._add_trace_comments(engine)
+ engines._add_trace_comments(engine)
conn = engine.connect()
orig_do_exec = engine.dialect.do_execute
with mock.patch.object(engine.dialect, "do_execute") as mock_exec: