diff options
-rw-r--r-- | doc/source/usage.rst | 93 | ||||
-rw-r--r-- | oslo_db/exception.py | 43 | ||||
-rw-r--r-- | oslo_db/sqlalchemy/enginefacade.py | 995 | ||||
-rw-r--r-- | oslo_db/sqlalchemy/engines.py | 413 | ||||
-rw-r--r-- | oslo_db/sqlalchemy/orm.py | 66 | ||||
-rw-r--r-- | oslo_db/sqlalchemy/session.py | 874 | ||||
-rw-r--r-- | oslo_db/sqlalchemy/test_base.py | 7 | ||||
-rw-r--r-- | oslo_db/tests/old_import_api/sqlalchemy/test_exc_filters.py | 8 | ||||
-rw-r--r-- | oslo_db/tests/old_import_api/sqlalchemy/test_sqlalchemy.py | 50 | ||||
-rw-r--r-- | oslo_db/tests/sqlalchemy/test_enginefacade.py | 1646 | ||||
-rw-r--r-- | oslo_db/tests/sqlalchemy/test_exc_filters.py | 8 | ||||
-rw-r--r-- | oslo_db/tests/sqlalchemy/test_sqlalchemy.py | 31 |
12 files changed, 3397 insertions, 837 deletions
diff --git a/doc/source/usage.rst b/doc/source/usage.rst index f352ee9..11c8c2a 100644 --- a/doc/source/usage.rst +++ b/doc/source/usage.rst @@ -7,26 +7,91 @@ To use oslo.db in a project: Session Handling ================ +Session handling is achieved using the :mod:`oslo_db.sqlalchemy.enginefacade` +system. This module presents a function decorator as well as a +context manager approach to delivering :class:`.Session` as well as +:class:`.Connection` objects to a function or block. + +Both calling styles require the use of a context object. This object may +be of any class, though when used with the decorator form, requires +special instrumentation. + +The context manager form is as follows: + .. code:: python - from oslo.config import cfg - from oslo.db.sqlalchemy import session as db_session - _FACADE = None + from oslo_db.sqlalchemy import enginefacade - def _create_facade_lazily(): - global _FACADE - if _FACADE is None: - _FACADE = db_session.EngineFacade.from_config(cfg.CONF) - return _FACADE - def get_engine(): - facade = _create_facade_lazily() - return facade.get_engine() + class MyContext(object): + "User-defined context class." + + + def some_reader_api_function(context): + with enginefacade.reader.using(context) as session: + return session.query(SomeClass).all() + + + def some_writer_api_function(context, x, y): + with enginefacade.writer.using(context) as session: + session.add(SomeClass(x, y)) + + + def run_some_database_calls(): + context = MyContext() + + results = some_reader_api_function(context) + some_writer_api_function(context, 5, 10) + + +The decorator form accesses attributes off the user-defined context +directly; the context must be decorated with the +:func:`oslo_db.sqlalchemy.enginefacade.transaction_context_provider` +decorator. Each function must receive the context as the first +positional argument: + +.. code:: python + + + from oslo_db.sqlalchemy import enginefacade + + @enginefacade.transaction_context_provider + class MyContext(object): + "User-defined context class." + + @enginefacade.reader + def some_reader_api_function(context): + return context.session.query(SomeClass).all() + + + @enginefacade.writer + def some_writer_api_function(context, x, y): + context.session.add(SomeClass(x, y)) + + + def run_some_database_calls(): + context = MyContext() + + results = some_reader_api_function(context) + some_writer_api_function(context, 5, 10) + +The scope of transaction and connectivity for both approaches is managed +transparently. The configuration for the connection comes from the standard +:obj:`oslo_config.cfg.CONF` collection. Additional configurations can be +established for the enginefacade using the +:func:`oslo_db.sqlalchemy.enginefacade.configure` function, before any use of +the database begins: + +.. code:: python + + from oslo_db.sqlalchemy import enginefacade - def get_session(**kwargs): - facade = _create_facade_lazily() - return facade.get_session(**kwargs) + enginefacade.configure( + sqlite_fk=True, + max_retries=5, + mysql_sql_mode='ANSI' + ) Base class for models usage diff --git a/oslo_db/exception.py b/oslo_db/exception.py index 506006c..c1620e1 100644 --- a/oslo_db/exception.py +++ b/oslo_db/exception.py @@ -205,3 +205,46 @@ class RetryRequest(Exception): """ def __init__(self, inner_exc): self.inner_exc = inner_exc + + +class NoEngineContextEstablished(AttributeError): + """Error raised for non-present enginefacade attribute access. + + + This applies to the ``session`` and ``connection`` attributes + of a user-defined context and/or RequestContext object, when they + are accessed outside of the scope of an enginefacade decorator + or context manager. + + The exception is a subclass of AttributeError so that + normal Python missing attribute behaviors are maintained, such + as support for ``getattr(context, 'session', None)``. + + + """ + + +class NotSupportedWarning(Warning): + """Warn that an argument or call that was passed is not supported. + + This subclasses Warning so that it can be filtered as a distinct + category. + + .. seealso:: + + https://docs.python.org/2/library/warnings.html + + """ + + +class OsloDBDeprecationWarning(DeprecationWarning): + """Issued per usage of a deprecated API. + + This subclasses DeprecationWarning so that it can be filtered as a distinct + category. + + .. seealso:: + + https://docs.python.org/2/library/warnings.html + + """ diff --git a/oslo_db/sqlalchemy/enginefacade.py b/oslo_db/sqlalchemy/enginefacade.py new file mode 100644 index 0000000..c21f39a --- /dev/null +++ b/oslo_db/sqlalchemy/enginefacade.py @@ -0,0 +1,995 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import contextlib +import functools +import operator +import threading +import warnings + +from oslo_config import cfg +from oslo_context import context as oslo_context + +from oslo_db import exception +from oslo_db import options +from oslo_db.sqlalchemy import engines +from oslo_db.sqlalchemy import orm + + +class _symbol(object): + """represent a fixed symbol.""" + + __slots__ = 'name', + + def __init__(self, name): + self.name = name + + def __repr__(self): + return "symbol(%r)" % self.name + + +_ASYNC_READER = _symbol('ASYNC_READER') +"""Represent the transaction state of "async reader". + +This state indicates that the transaction is a read-only and is +safe to use on an asynchronously updated slave database. + +""" + +_READER = _symbol('READER') +"""Represent the transaction state of "reader". + +This state indicates that the transaction is a read-only and is +only safe to use on a synchronously updated slave database; otherwise +the master database should be used. + +""" + + +_WRITER = _symbol('WRITER') +"""Represent the transaction state of "writer". + +This state indicates that the transaction writes data and +should be directed at the master database. + +""" + + +class _Default(object): + """Mark a value as a default value. + + A value in the local configuration dictionary wrapped with + _Default() will not take precedence over a value that is specified + in cfg.CONF. Values that are set after the fact using configure() + will supersede those in cfg.CONF. + + """ + + __slots__ = 'value', + + _notset = _symbol("NOTSET") + + def __init__(self, value=_notset): + self.value = value + + @classmethod + def resolve(cls, value): + if isinstance(value, _Default): + v = value.value + if v is cls._notset: + return None + else: + return v + else: + return value + + @classmethod + def resolve_w_conf(cls, value, conf_namespace, key): + if isinstance(value, _Default): + v = getattr(conf_namespace, key, value.value) + if v is cls._notset: + return None + else: + return v + else: + return value + + @classmethod + def is_set(cls, value): + return not isinstance(value, _Default) or \ + value.value is not cls._notset + + @classmethod + def is_set_w_conf(cls, value, conf_namespace, key): + return not isinstance(value, _Default) or \ + value.value is not cls._notset or \ + hasattr(conf_namespace, key) + + +class _TransactionFactory(object): + """A factory for :class:`._TransactionContext` objects. + + By default, there is just one of these, set up + based on CONF, however instance-level :class:`._TransactionFactory` + objects can be made, as is the case with the + :class:`._TestTransactionFactory` subclass used by the oslo.db test suite. + + """ + def __init__(self): + self._url_cfg = { + 'connection': _Default(), + 'slave_connection': _Default(), + } + self._engine_cfg = { + 'sqlite_fk': _Default(False), + 'mysql_sql_mode': _Default('TRADITIONAL'), + 'idle_timeout': _Default(3600), + 'connection_debug': _Default(0), + 'max_pool_size': _Default(), + 'max_overflow': _Default(), + 'pool_timeout': _Default(), + 'sqlite_synchronous': _Default(True), + 'connection_trace': _Default(False), + 'max_retries': _Default(10), + 'retry_interval': _Default(10), + 'thread_checkin': _Default(True) + } + self._maker_cfg = { + 'expire_on_commit': _Default(False), + '__autocommit': True + } + self._transaction_ctx_cfg = { + 'rollback_reader_sessions': False, + } + self._facade_cfg = { + 'synchronous_reader': True + } + + # other options that are defined in oslo.db.options.database_opts + # but do not apply to the standard enginefacade arguments + # (most seem to apply to api.DBAPI). + self._ignored_cfg = dict( + (k, _Default(None)) for k in [ + 'db_max_retries', 'db_inc_retry_interval', + 'use_db_reconnect', + 'db_retry_interval', 'min_pool_size', + 'db_max_retry_interval', + 'sqlite_db', 'backend']) + + self._started = False + self._legacy_facade = None + self._start_lock = threading.Lock() + + def configure_defaults(self, **kw): + """Apply default configurational options. + + This method can only be called before any specific + transaction-beginning methods have been called. + + Configurational options are within a fixed set of keys, and fall + under three categories: URL configuration, engine configuration, + and session configuration. Each key given will be tested against + these three configuration sets to see which one is applicable; if + it is not applicable to any set, an exception is raised. + + The configurational options given here act as **defaults** + when the :class:`._TransactionFactory` is configured using + a :class:`.oslo.config.cfg.ConfigOpts` object; the options + present within the :class:`.oslo.config.cfg.ConfigOpts` **take + precedence** versus the arguments passed here. By default, + the :class:`._TransactionFactory` loads in the configuration from + :data:`oslo.config.cfg.CONF`, after applying the + :data:`oslo.db.options.database_opts` configurational defaults to it. + + .. seealso:: + + :meth:`._TransactionFactory.configure` + + """ + self._configure(True, kw) + + def configure(self, **kw): + """Apply configurational options. + + This method can only be called before any specific + transaction-beginning methods have been called. + + Behavior here is the same as that of + :meth:`._TransactionFactory.configure_defaults`, + with the exception that values specified here will **supersede** those + setup in the :class:`.oslo.config.cfg.ConfigOpts` options. + + .. seealso:: + + :meth:`._TransactionFactory.configure_defaults` + + """ + self._configure(False, kw) + + def _configure(self, as_defaults, kw): + + if self._started: + raise TypeError("this TransactionFactory is already started") + not_supported = [] + for k, v in kw.items(): + for dict_ in ( + self._url_cfg, self._engine_cfg, + self._maker_cfg, self._ignored_cfg, + self._facade_cfg, self._transaction_ctx_cfg): + if k in dict_: + dict_[k] = _Default(v) if as_defaults else v + break + else: + not_supported.append(k) + + if not_supported: + # would like to raise ValueError here, but there are just + # too many unrecognized (obsolete?) configuration options + # coming in from projects + warnings.warn( + "Configuration option(s) %r not supported" % + sorted(not_supported), + exception.NotSupportedWarning + ) + + def get_legacy_facade(self): + """Return a :class:`.LegacyEngineFacade` for this factory. + + This facade will make use of the same engine and sessionmaker + as this factory, however will not share the same transaction context; + the legacy facade continues to work the old way of returning + a new Session each time get_session() is called. + + """ + if not self._legacy_facade: + self._legacy_facade = LegacyEngineFacade(None, _factory=self) + if not self._started: + self._start() + + return self._legacy_facade + + def _create_connection(self, mode): + if not self._started: + self._start() + if mode is _WRITER: + return self._writer_engine.connect() + elif self.synchronous_reader or mode is _ASYNC_READER: + return self._reader_engine.connect() + else: + return self._writer_engine.connect() + + def _create_session(self, mode, bind=None): + if not self._started: + self._start() + kw = {} + # don't pass 'bind' if bind is None; the sessionmaker + # already has a bind to the engine. + if bind: + kw['bind'] = bind + if mode is _WRITER: + return self._writer_maker(**kw) + elif self.synchronous_reader or mode is _ASYNC_READER: + return self._reader_maker(**kw) + else: + return self._writer_maker(**kw) + + def _args_for_conf(self, default_cfg, conf): + if conf is None: + return dict( + (key, _Default.resolve(value)) + for key, value in default_cfg.items() + if _Default.is_set(value) + ) + else: + return dict( + (key, _Default.resolve_w_conf(value, conf.database, key)) + for key, value in default_cfg.items() + if _Default.is_set_w_conf(value, conf.database, key) + ) + + def _url_args_for_conf(self, conf): + return self._args_for_conf(self._url_cfg, conf) + + def _engine_args_for_conf(self, conf): + return self._args_for_conf(self._engine_cfg, conf) + + def _maker_args_for_conf(self, conf): + return self._args_for_conf(self._maker_cfg, conf) + + def _start(self, conf=False, connection=None, slave_connection=None): + with self._start_lock: + # self._started has been checked on the outside + # when _start() was called. Within the lock, + # check the flag once more to detect the case where + # the start process proceeded while this thread was waiting + # for the lock. + if self._started: + return + self._started = True + if conf is False: + conf = cfg.CONF + + # perform register_opts() local to actually using + # the cfg.CONF to maintain exact compatibility with + # the EngineFacade design. This can be changed if needed. + if conf is not None: + conf.register_opts(options.database_opts, 'database') + + url_args = self._url_args_for_conf(conf) + if connection: + url_args['connection'] = connection + if slave_connection: + url_args['slave_connection'] = slave_connection + engine_args = self._engine_args_for_conf(conf) + maker_args = self._maker_args_for_conf(conf) + maker_args['autocommit'] = maker_args.pop('__autocommit') + + self._writer_engine, self._writer_maker = \ + self._setup_for_connection( + url_args['connection'], + engine_args, maker_args) + + if url_args.get('slave_connection'): + self._reader_engine, self._reader_maker = \ + self._setup_for_connection( + url_args['slave_connection'], + engine_args, maker_args) + else: + self._reader_engine, self._reader_maker = \ + self._writer_engine, self._writer_maker + + self.synchronous_reader = self._facade_cfg['synchronous_reader'] + + def _setup_for_connection( + self, sql_connection, engine_kwargs, maker_kwargs): + engine = engines.create_engine( + sql_connection=sql_connection, **engine_kwargs) + sessionmaker = orm.get_maker(engine=engine, **maker_kwargs) + return engine, sessionmaker + + +class _TestTransactionFactory(_TransactionFactory): + """A :class:`._TransactionFactory` used by test suites. + + This is a :class:`._TransactionFactory` that can be directly injected + with an existing engine and sessionmaker. + + Note that while this is used by oslo.db's own tests of + the enginefacade system, it is also exported for use by + the test suites of other projects, first as an element of the + oslo_db.sqlalchemy.test_base module, and secondly may be used by + external test suites directly. + + Includes a feature to inject itself temporarily as the factory + within the global :class:`._TransactionContextManager`. + + """ + def __init__(self, engine, maker, apply_global, synchronous_reader): + self._reader_engine = self._writer_engine = engine + self._reader_maker = self._writer_maker = maker + self._started = True + self._legacy_facade = None + self.synchronous_reader = synchronous_reader + + self._facade_cfg = _context_manager._factory._facade_cfg + self._transaction_ctx_cfg = \ + _context_manager._factory._transaction_ctx_cfg + if apply_global: + self.existing_factory = _context_manager._factory + _context_manager._root_factory = self + + def dispose_global(self): + _context_manager._root_factory = self.existing_factory + + +class _TransactionContext(object): + """Represent a single database transaction in progress.""" + + def __init__( + self, factory, global_factory=None, + rollback_reader_sessions=False): + """Construct a new :class:`.TransactionContext`. + + :param factory: the :class:`.TransactionFactory` which will + serve as a source of connectivity. + + :param global_factory: the "global" factory which will be used + by the global ``_context_manager`` for new ``_TransactionContext`` + objects created under this one. When left as None the actual + "global" factory is used. + + :param rollback_reader_sessions: if True, a :class:`.Session` object + will have its :meth:`.Session.rollback` method invoked at the end + of a ``@reader`` block, actively rolling back the transaction and + expiring the objects within, before the :class:`.Session` moves + on to be closed, which has the effect of releasing connection + resources back to the connection pool and detaching all objects. + If False, the :class:`.Session` is + not affected at the end of a ``@reader`` block; the underlying + connection referred to by this :class:`.Session` will still + be released in the enclosing context via the :meth:`.Session.close` + method, which still ensures that the DBAPI connection is rolled + back, however the objects associated with the :class:`.Session` + retain their database-persisted contents after they are detached. + + .. seealso:: + + http://docs.sqlalchemy.org/en/rel_0_9/glossary.html#term-released\ + SQLAlchemy documentation on what "releasing resources" means. + + """ + self.factory = factory + self.global_factory = global_factory + self.mode = None + self.session = None + self.connection = None + self.transaction = None + kw = self.factory._transaction_ctx_cfg + self.rollback_reader_sessions = kw['rollback_reader_sessions'] + + @contextlib.contextmanager + def _connection(self, savepoint=False): + if self.connection is None: + try: + if self.session is not None: + # use existing session, which is outer to us + self.connection = self.session.connection() + if savepoint: + with self.connection.begin_nested(): + yield self.connection + else: + yield self.connection + else: + # is outermost + self.connection = self.factory._create_connection( + mode=self.mode) + self.transaction = self.connection.begin() + try: + yield self.connection + self._end_connection_transaction(self.transaction) + except Exception: + self.transaction.rollback() + # TODO(zzzeek) do we need save_and_reraise() here, + # or do newer eventlets not have issues? we are using + # raw "raise" in many other places in oslo.db already + # (and one six.reraise()). + raise + finally: + self.transaction = None + self.connection.close() + finally: + self.connection = None + + else: + # use existing connection, which is outer to us + if savepoint: + with self.connection.begin_nested(): + yield self.connection + else: + yield self.connection + + @contextlib.contextmanager + def _session(self, savepoint=False): + if self.session is None: + self.session = self.factory._create_session( + bind=self.connection, mode=self.mode) + try: + self.session.begin() + yield self.session + self._end_session_transaction(self.session) + except Exception: + self.session.rollback() + # TODO(zzzeek) do we need save_and_reraise() here, + # or do newer eventlets not have issues? we are using + # raw "raise" in many other places in oslo.db already + # (and one six.reraise()). + raise + finally: + self.session.close() + self.session = None + else: + # use existing session, which is outer to us + if savepoint: + with self.session.begin_nested(): + yield self.session + else: + yield self.session + + def _end_session_transaction(self, session): + if self.mode is _WRITER: + session.commit() + elif self.rollback_reader_sessions: + session.rollback() + # In the absense of calling session.rollback(), + # the next call is session.close(). This releases all + # objects from the session into the detached state, and + # releases the connection as well; the connection when returned + # to the pool is either rolled back in any case, or closed fully. + + def _end_connection_transaction(self, transaction): + if self.mode is _WRITER: + transaction.commit() + else: + transaction.rollback() + + def _produce_block(self, mode, connection, savepoint): + if mode is _WRITER: + self._writer() + elif mode is _ASYNC_READER: + self._async_reader() + else: + self._reader() + if connection: + return self._connection(savepoint) + else: + return self._session(savepoint) + + def _writer(self): + if self.mode is None: + self.mode = _WRITER + elif self.mode is _READER: + raise TypeError( + "Can't upgrade a READER transaction " + "to a WRITER mid-transaction") + elif self.mode is _ASYNC_READER: + raise TypeError( + "Can't upgrade an ASYNC_READER transaction " + "to a WRITER mid-transaction") + + def _reader(self): + if self.mode is None: + self.mode = _READER + elif self.mode is _ASYNC_READER: + raise TypeError( + "Can't upgrade an ASYNC_READER transaction " + "to a READER mid-transaction") + + def _async_reader(self): + if self.mode is None: + self.mode = _ASYNC_READER + + +class _TransactionContextTLocal(threading.local): + def __deepcopy__(self, memo): + return self + + +class _TransactionContextManager(object): + """Provide context-management and decorator patterns for transactions. + + This object integrates user-defined "context" objects with the + :class:`._TransactionContext` class, on behalf of a + contained :class:`._TransactionFactory`. + + """ + + def __init__( + self, root=None, + mode=None, + independent=False, + savepoint=False, + connection=False, + replace_global_factory=None, + _is_global_manager=False): + + if root is None: + self._root = self + self._root_factory = _TransactionFactory() + else: + self._root = root + + self._replace_global_factory = replace_global_factory + self._is_global_manager = _is_global_manager + self._mode = mode + self._independent = independent + self._savepoint = savepoint + if self._savepoint and self._independent: + raise TypeError( + "setting savepoint and independent makes no sense.") + self._connection = connection + + @property + def _factory(self): + """The :class:`._TransactionFactory` associated with this context.""" + return self._root._root_factory + + def configure(self, **kw): + """Apply configurational options to the factory. + + This method can only be called before any specific + transaction-beginning methods have been called. + + + """ + self._factory.configure(**kw) + + @property + def replace(self): + """Modifier to replace the global transaction factory with this one.""" + return self._clone(replace_global_factory=self._factory) + + @property + def writer(self): + """Modifier to set the transaction to WRITER.""" + return self._clone(mode=_WRITER) + + @property + def reader(self): + """Modifier to set the transaction to READER.""" + return self._clone(mode=_READER) + + @property + def independent(self): + """Modifier to start a transaction independent from any enclosing.""" + return self._clone(independent=True) + + @property + def savepoint(self): + """Modifier to start a SAVEPOINT if a transaction already exists.""" + return self._clone(savepoint=True) + + @property + def connection(self): + """Modifier to return a core Connection object instead of Session.""" + return self._clone(connection=True) + + @property + def async(self): + """Modifier to set a READER operation to ASYNC_READER.""" + + if self._mode is _WRITER: + raise TypeError("Setting async on a WRITER makes no sense") + return self._clone(mode=_ASYNC_READER) + + def using(self, context): + """Provide a context manager block that will use the given context.""" + return self._transaction_scope(context) + + def __call__(self, fn): + """Decorate a function.""" + + @functools.wraps(fn) + def wrapper(*args, **kwargs): + context = args[0] + + with self._transaction_scope(context): + return fn(*args, **kwargs) + + return wrapper + + def _clone(self, **kw): + default_kw = { + "independent": self._independent, + "mode": self._mode, + "connection": self._connection + } + default_kw.update(kw) + return _TransactionContextManager(root=self._root, **default_kw) + + @contextlib.contextmanager + def _transaction_scope(self, context): + new_transaction = self._independent + transaction_contexts_by_thread = \ + _transaction_contexts_by_thread(context) + + current = restore = getattr( + transaction_contexts_by_thread, "current", None) + + use_factory = self._factory + global_factory = None + + if self._replace_global_factory: + use_factory = global_factory = self._replace_global_factory + elif current is not None and current.global_factory: + global_factory = current.global_factory + + if self._root._is_global_manager: + use_factory = global_factory + + if current is not None and ( + new_transaction or current.factory is not use_factory + ): + current = None + + if current is None: + current = transaction_contexts_by_thread.current = \ + _TransactionContext( + use_factory, global_factory=global_factory, + **use_factory._transaction_ctx_cfg) + + try: + if self._mode is not None: + with current._produce_block( + mode=self._mode, + connection=self._connection, + savepoint=self._savepoint) as resource: + yield resource + else: + yield + finally: + if restore is None: + del transaction_contexts_by_thread.current + elif current is not restore: + transaction_contexts_by_thread.current = restore + + +def _context_descriptor(attr=None): + getter = operator.attrgetter(attr) + + def _property_for_context(context): + try: + transaction_context = context.transaction_ctx + except exception.NoEngineContextEstablished: + raise exception.NoEngineContextEstablished( + "No TransactionContext is established for " + "this %s object within the current thread; " + "the %r attribute is unavailable." + % (context, attr) + ) + else: + return getter(transaction_context) + return property(_property_for_context) + + +def _transaction_ctx_for_context(context): + by_thread = _transaction_contexts_by_thread(context) + try: + return by_thread.current + except AttributeError: + raise exception.NoEngineContextEstablished( + "No TransactionContext is established for " + "this %s object within the current thread. " + % context + ) + + +def _transaction_contexts_by_thread(context): + transaction_contexts_by_thread = getattr( + context, '_enginefacade_context', None) + if transaction_contexts_by_thread is None: + transaction_contexts_by_thread = \ + context._enginefacade_context = _TransactionContextTLocal() + + return transaction_contexts_by_thread + + +def transaction_context_provider(klass): + """Decorate a class with ``session`` and ``connection`` attributes.""" + + setattr( + klass, + 'transaction_ctx', + property(_transaction_ctx_for_context)) + + # Graft transaction context attributes as context properties + for attr in ('session', 'connection', 'transaction'): + setattr(klass, attr, _context_descriptor(attr)) + + return klass + + +# apply the context descriptors to oslo.context.RequestContext +transaction_context_provider(oslo_context.RequestContext) + + +_context_manager = _TransactionContextManager(_is_global_manager=True) +"""default context manager.""" + + +def transaction_context(): + """Construct a local transaction context. + + """ + return _TransactionContextManager() + + +def configure(**kw): + """Apply configurational options to the global factory. + + This method can only be called before any specific transaction-beginning + methods have been called. + + .. seealso:: + + :meth:`._TransactionFactory.configure` + + """ + _context_manager._factory.configure(**kw) + + +def get_legacy_facade(): + """Return a :class:`.LegacyEngineFacade` for the global factory. + + This facade will make use of the same engine and sessionmaker + as this factory, however will not share the same transaction context; + the legacy facade continues to work the old way of returning + a new Session each time get_session() is called. + + """ + return _context_manager._factory.get_legacy_facade() + + +reader = _context_manager.reader +"""The global 'reader' starting point.""" + + +writer = _context_manager.writer +"""The global 'writer' starting point.""" + + +class LegacyEngineFacade(object): + """A helper class for removing of global engine instances from oslo.db. + + .. deprecated:: + + EngineFacade is deprecated. Please use + oslo.db.sqlalchemy.enginefacade for new development. + + As a library, oslo.db can't decide where to store/when to create engine + and sessionmaker instances, so this must be left for a target application. + + On the other hand, in order to simplify the adoption of oslo.db changes, + we'll provide a helper class, which creates engine and sessionmaker + on its instantiation and provides get_engine()/get_session() methods + that are compatible with corresponding utility functions that currently + exist in target projects, e.g. in Nova. + + engine/sessionmaker instances will still be global (and they are meant to + be global), but they will be stored in the app context, rather that in the + oslo.db context. + + Two important things to remember: + + 1. An Engine instance is effectively a pool of DB connections, so it's + meant to be shared (and it's thread-safe). + 2. A Session instance is not meant to be shared and represents a DB + transactional context (i.e. it's not thread-safe). sessionmaker is + a factory of sessions. + + """ + def __init__(self, sql_connection, slave_connection=None, + sqlite_fk=False, autocommit=True, + expire_on_commit=False, _conf=None, _factory=None, **kwargs): + """Initialize engine and sessionmaker instances. + + :param sql_connection: the connection string for the database to use + :type sql_connection: string + + :param slave_connection: the connection string for the 'slave' database + to use. If not provided, the master database + will be used for all operations. Note: this + is meant to be used for offloading of read + operations to asynchronously replicated slaves + to reduce the load on the master database. + :type slave_connection: string + + :param sqlite_fk: enable foreign keys in SQLite + :type sqlite_fk: bool + + :param autocommit: use autocommit mode for created Session instances + :type autocommit: bool + + :param expire_on_commit: expire session objects on commit + :type expire_on_commit: bool + + Keyword arguments: + + :keyword mysql_sql_mode: the SQL mode to be used for MySQL sessions. + (defaults to TRADITIONAL) + :keyword idle_timeout: timeout before idle sql connections are reaped + (defaults to 3600) + :keyword connection_debug: verbosity of SQL debugging information. + -1=Off, 0=None, 100=Everything (defaults + to 0) + :keyword max_pool_size: maximum number of SQL connections to keep open + in a pool (defaults to SQLAlchemy settings) + :keyword max_overflow: if set, use this value for max_overflow with + sqlalchemy (defaults to SQLAlchemy settings) + :keyword pool_timeout: if set, use this value for pool_timeout with + sqlalchemy (defaults to SQLAlchemy settings) + :keyword sqlite_synchronous: if True, SQLite uses synchronous mode + (defaults to True) + :keyword connection_trace: add python stack traces to SQL as comment + strings (defaults to False) + :keyword max_retries: maximum db connection retries during startup. + (setting -1 implies an infinite retry count) + (defaults to 10) + :keyword retry_interval: interval between retries of opening a sql + connection (defaults to 10) + :keyword thread_checkin: boolean that indicates that between each + engine checkin event a sleep(0) will occur to + allow other greenthreads to run (defaults to + True) + """ + warnings.warn( + "EngineFacade is deprecated; please use " + "oslo.db.sqlalchemy.enginefacade", + exception.OsloDBDeprecationWarning, + stacklevel=2) + + if _factory: + self._factory = _factory + else: + self._factory = _TransactionFactory() + + self._factory.configure( + sqlite_fk=sqlite_fk, + __autocommit=autocommit, + expire_on_commit=expire_on_commit, + **kwargs + ) + # make sure passed-in urls are favored over that + # of config + self._factory._start( + _conf, connection=sql_connection, + slave_connection=slave_connection) + + def get_engine(self, use_slave=False): + """Get the engine instance (note, that it's shared). + + :param use_slave: if possible, use 'slave' database for this engine. + If the connection string for the slave database + wasn't provided, 'master' engine will be returned. + (defaults to False) + :type use_slave: bool + + """ + if use_slave: + return self._factory._reader_engine + else: + return self._factory._writer_engine + + def get_session(self, use_slave=False, **kwargs): + """Get a Session instance. + + :param use_slave: if possible, use 'slave' database connection for + this session. If the connection string for the + slave database wasn't provided, a session bound + to the 'master' engine will be returned. + (defaults to False) + :type use_slave: bool + + Keyword arugments will be passed to a sessionmaker instance as is (if + passed, they will override the ones used when the sessionmaker instance + was created). See SQLAlchemy Session docs for details. + + """ + if use_slave: + return self._factory._reader_maker(**kwargs) + else: + return self._factory._writer_maker(**kwargs) + + @classmethod + def from_config(cls, conf, + sqlite_fk=False, autocommit=True, expire_on_commit=False): + """Initialize EngineFacade using oslo.config config instance options. + + :param conf: oslo.config config instance + :type conf: oslo.config.cfg.ConfigOpts + + :param sqlite_fk: enable foreign keys in SQLite + :type sqlite_fk: bool + + :param autocommit: use autocommit mode for created Session instances + :type autocommit: bool + + :param expire_on_commit: expire session objects on commit + :type expire_on_commit: bool + + """ + + return cls( + None, + sqlite_fk=sqlite_fk, + autocommit=autocommit, + expire_on_commit=expire_on_commit, _conf=conf) diff --git a/oslo_db/sqlalchemy/engines.py b/oslo_db/sqlalchemy/engines.py new file mode 100644 index 0000000..6e041a2 --- /dev/null +++ b/oslo_db/sqlalchemy/engines.py @@ -0,0 +1,413 @@ +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Core SQLAlchemy connectivity routines. +""" + +import itertools +import logging +import os +import re +import time + +import six +import sqlalchemy +from sqlalchemy import event +from sqlalchemy import exc +from sqlalchemy import pool +from sqlalchemy.sql.expression import select + +from oslo_db._i18n import _LW +from oslo_db import exception + +from oslo_db.sqlalchemy import exc_filters +from oslo_db.sqlalchemy import utils + +LOG = logging.getLogger(__name__) + + +def _thread_yield(dbapi_con, con_record): + """Ensure other greenthreads get a chance to be executed. + + If we use eventlet.monkey_patch(), eventlet.greenthread.sleep(0) will + execute instead of time.sleep(0). + Force a context switch. With common database backends (eg MySQLdb and + sqlite), there is no implicit yield caused by network I/O since they are + implemented by C libraries that eventlet cannot monkey patch. + """ + time.sleep(0) + + +def _connect_ping_listener(connection, branch): + """Ping the server at connection startup. + + Ping the server at transaction begin and transparently reconnect + if a disconnect exception occurs. + """ + if branch: + return + + # turn off "close with result". This can also be accomplished + # by branching the connection, however just setting the flag is + # more performant and also doesn't get involved with some + # connection-invalidation awkardness that occurs (see + # https://bitbucket.org/zzzeek/sqlalchemy/issue/3215/) + save_should_close_with_result = connection.should_close_with_result + connection.should_close_with_result = False + try: + # run a SELECT 1. use a core select() so that + # any details like that needed by Oracle, DB2 etc. are handled. + connection.scalar(select([1])) + except exception.DBConnectionError: + # catch DBConnectionError, which is raised by the filter + # system. + # disconnect detected. The connection is now + # "invalid", but the pool should be ready to return + # new connections assuming they are good now. + # run the select again to re-validate the Connection. + connection.scalar(select([1])) + finally: + connection.should_close_with_result = save_should_close_with_result + + +def _setup_logging(connection_debug=0): + """setup_logging function maps SQL debug level to Python log level. + + Connection_debug is a verbosity of SQL debugging information. + 0=None(default value), + 1=Processed only messages with WARNING level or higher + 50=Processed only messages with INFO level or higher + 100=Processed only messages with DEBUG level + """ + if connection_debug >= 0: + logger = logging.getLogger('sqlalchemy.engine') + if connection_debug >= 100: + logger.setLevel(logging.DEBUG) + elif connection_debug >= 50: + logger.setLevel(logging.INFO) + else: + logger.setLevel(logging.WARNING) + + +def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None, + idle_timeout=3600, + connection_debug=0, max_pool_size=None, max_overflow=None, + pool_timeout=None, sqlite_synchronous=True, + connection_trace=False, max_retries=10, retry_interval=10, + thread_checkin=True, logging_name=None): + """Return a new SQLAlchemy engine.""" + + url = sqlalchemy.engine.url.make_url(sql_connection) + + engine_args = { + "pool_recycle": idle_timeout, + 'convert_unicode': True, + 'connect_args': {}, + 'logging_name': logging_name + } + + _setup_logging(connection_debug) + + _init_connection_args( + url, engine_args, + sqlite_fk=sqlite_fk, + max_pool_size=max_pool_size, + max_overflow=max_overflow, + pool_timeout=pool_timeout + ) + + engine = sqlalchemy.create_engine(url, **engine_args) + + _init_events( + engine, + mysql_sql_mode=mysql_sql_mode, + sqlite_synchronous=sqlite_synchronous, + sqlite_fk=sqlite_fk, + thread_checkin=thread_checkin, + connection_trace=connection_trace + ) + + # register alternate exception handler + exc_filters.register_engine(engine) + + # register engine connect handler + event.listen(engine, "engine_connect", _connect_ping_listener) + + # initial connect + test + # NOTE(viktors): the current implementation of _test_connection() + # does nothing, if max_retries == 0, so we can skip it + if max_retries: + test_conn = _test_connection(engine, max_retries, retry_interval) + test_conn.close() + + return engine + + +@utils.dispatch_for_dialect('*', multiple=True) +def _init_connection_args( + url, engine_args, + max_pool_size=None, max_overflow=None, pool_timeout=None, **kw): + + pool_class = url.get_dialect().get_pool_class(url) + if issubclass(pool_class, pool.QueuePool): + if max_pool_size is not None: + engine_args['pool_size'] = max_pool_size + if max_overflow is not None: + engine_args['max_overflow'] = max_overflow + if pool_timeout is not None: + engine_args['pool_timeout'] = pool_timeout + + +@_init_connection_args.dispatch_for("sqlite") +def _init_connection_args(url, engine_args, **kw): + pool_class = url.get_dialect().get_pool_class(url) + # singletonthreadpool is used for :memory: connections; + # replace it with StaticPool. + if issubclass(pool_class, pool.SingletonThreadPool): + engine_args["poolclass"] = pool.StaticPool + engine_args['connect_args']['check_same_thread'] = False + + +@_init_connection_args.dispatch_for("postgresql") +def _init_connection_args(url, engine_args, **kw): + if 'client_encoding' not in url.query: + # Set encoding using engine_args instead of connect_args since + # it's supported for PostgreSQL 8.*. More details at: + # http://docs.sqlalchemy.org/en/rel_0_9/dialects/postgresql.html + engine_args['client_encoding'] = 'utf8' + + +@_init_connection_args.dispatch_for("mysql") +def _init_connection_args(url, engine_args, **kw): + if 'charset' not in url.query: + engine_args['connect_args']['charset'] = 'utf8' + + +@_init_connection_args.dispatch_for("mysql+mysqlconnector") +def _init_connection_args(url, engine_args, **kw): + # mysqlconnector engine (<1.0) incorrectly defaults to + # raise_on_warnings=True + # https://bitbucket.org/zzzeek/sqlalchemy/issue/2515 + if 'raise_on_warnings' not in url.query: + engine_args['connect_args']['raise_on_warnings'] = False + + +@_init_connection_args.dispatch_for("mysql+mysqldb") +@_init_connection_args.dispatch_for("mysql+oursql") +def _init_connection_args(url, engine_args, **kw): + # Those drivers require use_unicode=0 to avoid performance drop due + # to internal usage of Python unicode objects in the driver + # http://docs.sqlalchemy.org/en/rel_0_9/dialects/mysql.html + if 'use_unicode' not in url.query: + engine_args['connect_args']['use_unicode'] = 0 + + +@utils.dispatch_for_dialect('*', multiple=True) +def _init_events(engine, thread_checkin=True, connection_trace=False, **kw): + """Set up event listeners for all database backends.""" + + _add_process_guards(engine) + + if connection_trace: + _add_trace_comments(engine) + + if thread_checkin: + sqlalchemy.event.listen(engine, 'checkin', _thread_yield) + + +@_init_events.dispatch_for("mysql") +def _init_events(engine, mysql_sql_mode=None, **kw): + """Set up event listeners for MySQL.""" + + if mysql_sql_mode is not None: + @sqlalchemy.event.listens_for(engine, "connect") + def _set_session_sql_mode(dbapi_con, connection_rec): + cursor = dbapi_con.cursor() + cursor.execute("SET SESSION sql_mode = %s", [mysql_sql_mode]) + + @sqlalchemy.event.listens_for(engine, "first_connect") + def _check_effective_sql_mode(dbapi_con, connection_rec): + if mysql_sql_mode is not None: + _set_session_sql_mode(dbapi_con, connection_rec) + + cursor = dbapi_con.cursor() + cursor.execute("SHOW VARIABLES LIKE 'sql_mode'") + realmode = cursor.fetchone() + + if realmode is None: + LOG.warning(_LW('Unable to detect effective SQL mode')) + else: + realmode = realmode[1] + LOG.debug('MySQL server mode set to %s', realmode) + if 'TRADITIONAL' not in realmode.upper() and \ + 'STRICT_ALL_TABLES' not in realmode.upper(): + LOG.warning( + _LW( + "MySQL SQL mode is '%s', " + "consider enabling TRADITIONAL or STRICT_ALL_TABLES"), + realmode) + + +@_init_events.dispatch_for("sqlite") +def _init_events(engine, sqlite_synchronous=True, sqlite_fk=False, **kw): + """Set up event listeners for SQLite. + + This includes several settings made on connections as they are + created, as well as transactional control extensions. + + """ + + def regexp(expr, item): + reg = re.compile(expr) + return reg.search(six.text_type(item)) is not None + + @sqlalchemy.event.listens_for(engine, "connect") + def _sqlite_connect_events(dbapi_con, con_record): + + # Add REGEXP functionality on SQLite connections + dbapi_con.create_function('regexp', 2, regexp) + + if not sqlite_synchronous: + # Switch sqlite connections to non-synchronous mode + dbapi_con.execute("PRAGMA synchronous = OFF") + + # Disable pysqlite's emitting of the BEGIN statement entirely. + # Also stops it from emitting COMMIT before any DDL. + # below, we emit BEGIN ourselves. + # see http://docs.sqlalchemy.org/en/rel_0_9/dialects/\ + # sqlite.html#serializable-isolation-savepoints-transactional-ddl + dbapi_con.isolation_level = None + + if sqlite_fk: + # Ensures that the foreign key constraints are enforced in SQLite. + dbapi_con.execute('pragma foreign_keys=ON') + + @sqlalchemy.event.listens_for(engine, "begin") + def _sqlite_emit_begin(conn): + # emit our own BEGIN, checking for existing + # transactional state + if 'in_transaction' not in conn.info: + conn.execute("BEGIN") + conn.info['in_transaction'] = True + + @sqlalchemy.event.listens_for(engine, "rollback") + @sqlalchemy.event.listens_for(engine, "commit") + def _sqlite_end_transaction(conn): + # remove transactional marker + conn.info.pop('in_transaction', None) + + +def _test_connection(engine, max_retries, retry_interval): + if max_retries == -1: + attempts = itertools.count() + else: + attempts = six.moves.range(max_retries) + # See: http://legacy.python.org/dev/peps/pep-3110/#semantic-changes for + # why we are not using 'de' directly (it can be removed from the local + # scope). + de_ref = None + for attempt in attempts: + try: + return engine.connect() + except exception.DBConnectionError as de: + msg = _LW('SQL connection failed. %s attempts left.') + LOG.warning(msg, max_retries - attempt) + time.sleep(retry_interval) + de_ref = de + else: + if de_ref is not None: + six.reraise(type(de_ref), de_ref) + + +def _add_process_guards(engine): + """Add multiprocessing guards. + + Forces a connection to be reconnected if it is detected + as having been shared to a sub-process. + + """ + + @sqlalchemy.event.listens_for(engine, "connect") + def connect(dbapi_connection, connection_record): + connection_record.info['pid'] = os.getpid() + + @sqlalchemy.event.listens_for(engine, "checkout") + def checkout(dbapi_connection, connection_record, connection_proxy): + pid = os.getpid() + if connection_record.info['pid'] != pid: + LOG.debug(_LW( + "Parent process %(orig)s forked (%(newproc)s) with an open " + "database connection, " + "which is being discarded and recreated."), + {"newproc": pid, "orig": connection_record.info['pid']}) + connection_record.connection = connection_proxy.connection = None + raise exc.DisconnectionError( + "Connection record belongs to pid %s, " + "attempting to check out in pid %s" % + (connection_record.info['pid'], pid) + ) + + +def _add_trace_comments(engine): + """Add trace comments. + + Augment statements with a trace of the immediate calling code + for a given statement. + """ + + import os + import sys + import traceback + target_paths = set([ + os.path.dirname(sys.modules['oslo_db'].__file__), + os.path.dirname(sys.modules['sqlalchemy'].__file__) + ]) + skip_paths = set([ + os.path.dirname(sys.modules['oslo_db.tests'].__file__), + ]) + + @sqlalchemy.event.listens_for(engine, "before_cursor_execute", retval=True) + def before_cursor_execute(conn, cursor, statement, parameters, context, + executemany): + + # NOTE(zzzeek) - if different steps per DB dialect are desirable + # here, switch out on engine.name for now. + stack = traceback.extract_stack() + our_line = None + + for idx, (filename, line, method, function) in enumerate(stack): + for tgt in skip_paths: + if filename.startswith(tgt): + break + else: + for tgt in target_paths: + if filename.startswith(tgt): + our_line = idx + break + if our_line: + break + + if our_line: + trace = "; ".join( + "File: %s (%s) %s" % ( + line[0], line[1], line[2] + ) + # include three lines of context. + for line in stack[our_line - 3:our_line] + + ) + statement = "%s -- %s" % (statement, trace) + + return statement, parameters diff --git a/oslo_db/sqlalchemy/orm.py b/oslo_db/sqlalchemy/orm.py new file mode 100644 index 0000000..decd8c8 --- /dev/null +++ b/oslo_db/sqlalchemy/orm.py @@ -0,0 +1,66 @@ +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""SQLAlchemy ORM connectivity and query structures. +""" + +from oslo_utils import timeutils +import sqlalchemy.orm +from sqlalchemy.sql.expression import literal_column + +from oslo_db.sqlalchemy import update_match + + +class Query(sqlalchemy.orm.query.Query): + """Subclass of sqlalchemy.query with soft_delete() method.""" + def soft_delete(self, synchronize_session='evaluate'): + return self.update({'deleted': literal_column('id'), + 'updated_at': literal_column('updated_at'), + 'deleted_at': timeutils.utcnow()}, + synchronize_session=synchronize_session) + + def update_returning_pk(self, values, surrogate_key): + """Perform an UPDATE, returning the primary key of the matched row. + + This is a method-version of + oslo_db.sqlalchemy.update_match.update_returning_pk(); see that + function for usage details. + + """ + return update_match.update_returning_pk(self, values, surrogate_key) + + def update_on_match(self, specimen, surrogate_key, values, **kw): + """Emit an UPDATE statement matching the given specimen. + + This is a method-version of + oslo_db.sqlalchemy.update_match.update_on_match(); see that function + for usage details. + + """ + return update_match.update_on_match( + self, specimen, surrogate_key, values, **kw) + + +class Session(sqlalchemy.orm.session.Session): + """oslo.db-specific Session subclass.""" + + +def get_maker(engine, autocommit=True, expire_on_commit=False): + """Return a SQLAlchemy sessionmaker using the given engine.""" + return sqlalchemy.orm.sessionmaker(bind=engine, + class_=Session, + autocommit=autocommit, + expire_on_commit=expire_on_commit, + query_cls=Query) diff --git a/oslo_db/sqlalchemy/session.py b/oslo_db/sqlalchemy/session.py index 0dae851..9155176 100644 --- a/oslo_db/sqlalchemy/session.py +++ b/oslo_db/sqlalchemy/session.py @@ -18,96 +18,71 @@ Recommended ways to use sessions within this framework: -* Don't use them explicitly; this is like running with ``AUTOCOMMIT=1``. - `model_query()` will implicitly use a session when called without one - supplied. This is the ideal situation because it will allow queries - to be automatically retried if the database connection is interrupted. - - .. note:: Automatic retry will be enabled in a future patch. - - It is generally fine to issue several queries in a row like this. Even though - they may be run in separate transactions and/or separate sessions, each one - will see the data from the prior calls. If needed, undo- or rollback-like - functionality should be handled at a logical level. For an example, look at - the code around quotas and `reservation_rollback()`. - - Examples: +* Use the ``enginefacade`` system for connectivity, session and + transaction management: .. code-block:: python + from oslo.db.sqlalchemy import enginefacade + + @enginefacade.reader def get_foo(context, foo): - return (model_query(context, models.Foo). + return (model_query(models.Foo, context.session). filter_by(foo=foo). first()) + @enginefacade.writer def update_foo(context, id, newfoo): - (model_query(context, models.Foo). + (model_query(models.Foo, context.session). filter_by(id=id). update({'foo': newfoo})) + @enginefacade.writer def create_foo(context, values): foo_ref = models.Foo() foo_ref.update(values) - foo_ref.save() + foo_ref.save(context.session) return foo_ref - -* Within the scope of a single method, keep all the reads and writes within - the context managed by a single session. In this way, the session's - `__exit__` handler will take care of calling `flush()` and `commit()` for - you. If using this approach, you should not explicitly call `flush()` or - `commit()`. Any error within the context of the session will cause the - session to emit a `ROLLBACK`. Database errors like `IntegrityError` will be - raised in `session`'s `__exit__` handler, and any try/except within the - context managed by `session` will not be triggered. And catching other - non-database errors in the session will not trigger the ROLLBACK, so - exception handlers should always be outside the session, unless the - developer wants to do a partial commit on purpose. If the connection is - dropped before this is possible, the database will implicitly roll back the - transaction. + In the above system, transactions are committed automatically, and + are shared among all dependent database methods. Ensure + that methods which "write" data are enclosed within @writer blocks. .. note:: Statements in the session scope will not be automatically retried. - If you create models within the session, they need to be added, but you +* If you create models within the session, they need to be added, but you do not need to call `model.save()`: .. code-block:: python + @enginefacade.writer def create_many_foo(context, foos): - session = sessionmaker() - with session.begin(): - for foo in foos: - foo_ref = models.Foo() - foo_ref.update(foo) - session.add(foo_ref) + for foo in foos: + foo_ref = models.Foo() + foo_ref.update(foo) + context.session.add(foo_ref) + @enginefacade.writer def update_bar(context, foo_id, newbar): - session = sessionmaker() - with session.begin(): - foo_ref = (model_query(context, models.Foo, session). - filter_by(id=foo_id). - first()) - (model_query(context, models.Bar, session). - filter_by(id=foo_ref['bar_id']). - update({'bar': newbar})) - - .. note:: `update_bar` is a trivially simple example of using - ``with session.begin``. Whereas `create_many_foo` is a good example of - when a transaction is needed, it is always best to use as few queries as - possible. - - The two queries in `update_bar` can be better expressed using a single query - which avoids the need for an explicit transaction. It can be expressed like - so: + foo_ref = (model_query(models.Foo, context.session). + filter_by(id=foo_id). + first()) + (model_query(models.Bar, context.session). + filter_by(id=foo_ref['bar_id']). + update({'bar': newbar})) + + The two queries in `update_bar` can alternatively be expressed using + a single query, which may be more efficient depending on scenario: .. code-block:: python + @enginefacade.writer def update_bar(context, foo_id, newbar): - subq = (model_query(context, models.Foo.id). + subq = (model_query(models.Foo.id, context.session). filter_by(id=foo_id). limit(1). subquery()) - (model_query(context, models.Bar). + (model_query(models.Bar, context.session). filter_by(id=subq.as_scalar()). update({'bar': newbar})) @@ -119,87 +94,54 @@ Recommended ways to use sessions within this framework: WHERE id=(SELECT bar_id FROM foo WHERE id = ${foo_id} LIMIT 1); .. note:: `create_duplicate_foo` is a trivially simple example of catching an - exception while using ``with session.begin``. Here create two duplicate + exception while using a savepoint. Here we create two duplicate instances with same primary key, must catch the exception out of context managed by a single session: .. code-block:: python + @enginefacade.writer def create_duplicate_foo(context): foo1 = models.Foo() foo2 = models.Foo() foo1.id = foo2.id = 1 - session = sessionmaker() try: - with session.begin(): + with context.session.begin_nested(): session.add(foo1) session.add(foo2) except exception.DBDuplicateEntry as e: handle_error(e) -* Passing an active session between methods. Sessions should only be passed - to private methods. The private method must use a subtransaction; otherwise - SQLAlchemy will throw an error when you call `session.begin()` on an existing - transaction. Public methods should not accept a session parameter and should - not be involved in sessions within the caller's scope. - - Note that this incurs more overhead in SQLAlchemy than the above means - due to nesting transactions, and it is not possible to implicitly retry - failed database operations when using this approach. - - This also makes code somewhat more difficult to read and debug, because a - single database transaction spans more than one method. Error handling - becomes less clear in this situation. When this is needed for code clarity, - it should be clearly documented. +* The enginefacade system eliminates the need to decide when sessions need + to be passed between methods. All methods should instead share a common + context object; the enginefacade system will maintain the transaction + across method calls. .. code-block:: python - def myfunc(foo): - session = sessionmaker() - with session.begin(): - # do some database things - bar = _private_func(foo, session) + @enginefacade.writer + def myfunc(context, foo): + # do some database things + bar = _private_func(context, foo) return bar - def _private_func(foo, session=None): - if not session: - session = sessionmaker() - with session.begin(subtransaction=True): + def _private_func(context, foo): + with enginefacade.using_writer(context) as session: # do some other database things + session.add(SomeObject()) return bar -There are some things which it is best to avoid: - -* Don't keep a transaction open any longer than necessary. - - This means that your ``with session.begin()`` block should be as short - as possible, while still containing all the related calls for that - transaction. - * Avoid ``with_lockmode('UPDATE')`` when possible. - In MySQL/InnoDB, when a ``SELECT ... FOR UPDATE`` query does not match - any rows, it will take a gap-lock. This is a form of write-lock on the - "gap" where no rows exist, and prevents any other writes to that space. - This can effectively prevent any INSERT into a table by locking the gap - at the end of the index. Similar problems will occur if the SELECT FOR UPDATE - has an overly broad WHERE clause, or doesn't properly use an index. - - One idea proposed at ODS Fall '12 was to use a normal SELECT to test the - number of rows matching a query, and if only one row is returned, - then issue the SELECT FOR UPDATE. - - The better long-term solution is to use - ``INSERT .. ON DUPLICATE KEY UPDATE``. - However, this can not be done until the "deleted" columns are removed and - proper UNIQUE constraints are added to the tables. - + FOR UPDATE is not compatible with MySQL/Galera. Instead, an "opportunistic" + approach should be used, such that if an UPDATE fails, the entire + transaction should be retried. The @wrap_db_retry decorator is one + such system that can be used to achieve this. Enabling soft deletes: -* To use/enable soft-deletes, the `SoftDeleteMixin` must be added - to your model class. For example: +* To use/enable soft-deletes, `SoftDeleteMixin` may be used. For example: .. code-block:: python @@ -209,698 +151,46 @@ Enabling soft deletes: Efficient use of soft deletes: -* There are two possible ways to mark a record as deleted: - `model.soft_delete()` and `query.soft_delete()`. - - The `model.soft_delete()` method works with a single already-fetched entry. - `query.soft_delete()` makes only one db request for all entries that - correspond to the query. - -* In almost all cases you should use `query.soft_delete()`. Some examples: +* While there is a ``model.soft_delete()`` method, prefer + ``query.soft_delete()``. Some examples: .. code-block:: python - def soft_delete_bar(): - count = model_query(BarModel).find(some_condition).soft_delete() + @enginefacade.writer + def soft_delete_bar(context): + # synchronize_session=False will prevent the ORM from attempting + # to search the Session for instances matching the DELETE; + # this is typically not necessary for small operations. + count = model_query(BarModel, context.session).\\ + find(some_condition).soft_delete(synchronize_session=False) if count == 0: raise Exception("0 entries were soft deleted") - def complex_soft_delete_with_synchronization_bar(session=None): - if session is None: - session = sessionmaker() - with session.begin(subtransactions=True): - count = (model_query(BarModel). - find(some_condition). - soft_delete(synchronize_session=True)) - # Here synchronize_session is required, because we - # don't know what is going on in outer session. - if count == 0: - raise Exception("0 entries were soft deleted") - -* There is only one situation where `model.soft_delete()` is appropriate: when - you fetch a single record, work with it, and mark it as deleted in the same - transaction. - - .. code-block:: python - - def soft_delete_bar_model(): - session = sessionmaker() - with session.begin(): - bar_ref = model_query(BarModel).find(some_condition).first() - # Work with bar_ref - bar_ref.soft_delete(session=session) - - However, if you need to work with all entries that correspond to query and - then soft delete them you should use the `query.soft_delete()` method: + @enginefacade.writer + def complex_soft_delete_with_synchronization_bar(context): + # use synchronize_session='evaluate' when you'd like to attempt + # to update the state of the Session to match that of the DELETE. + # This is potentially helpful if the operation is complex and + # continues to work with instances that were loaded, though + # not usually needed. + count = (model_query(BarModel, context.session). + find(some_condition). + soft_delete(synchronize_session='evaulate')) + if count == 0: + raise Exception("0 entries were soft deleted") - .. code-block:: python - def soft_delete_multi_models(): - session = sessionmaker() - with session.begin(): - query = (model_query(BarModel, session=session). - find(some_condition)) - model_refs = query.all() - # Work with model_refs - query.soft_delete(synchronize_session=False) - # synchronize_session=False should be set if there is no outer - # session and these entries are not used after this. - - When working with many rows, it is very important to use query.soft_delete, - which issues a single query. Using `model.soft_delete()`, as in the following - example, is very inefficient. +""" - .. code-block:: python +from oslo_db.sqlalchemy import enginefacade +from oslo_db.sqlalchemy import engines +from oslo_db.sqlalchemy import orm - for bar_ref in bar_refs: - bar_ref.soft_delete(session=session) - # This will produce count(bar_refs) db requests. +EngineFacade = enginefacade.LegacyEngineFacade +create_engine = engines.create_engine +get_maker = orm.get_maker +Query = orm.Query +Session = orm.Session -""" -import itertools -import logging -import os -import re -import time - -from oslo_utils import timeutils -import six -from sqlalchemy import event -from sqlalchemy import exc -import sqlalchemy.orm -from sqlalchemy import pool -from sqlalchemy.sql.expression import literal_column -from sqlalchemy.sql.expression import select - -from oslo_db._i18n import _LW -from oslo_db import exception -from oslo_db import options -from oslo_db.sqlalchemy import exc_filters -from oslo_db.sqlalchemy import update_match -from oslo_db.sqlalchemy import utils - -LOG = logging.getLogger(__name__) - - -def _thread_yield(dbapi_con, con_record): - """Ensure other greenthreads get a chance to be executed. - - If we use eventlet.monkey_patch(), eventlet.greenthread.sleep(0) will - execute instead of time.sleep(0). - Force a context switch. With common database backends (eg MySQLdb and - sqlite), there is no implicit yield caused by network I/O since they are - implemented by C libraries that eventlet cannot monkey patch. - """ - time.sleep(0) - - -def _connect_ping_listener(connection, branch): - """Ping the server at connection startup. - - Ping the server at transaction begin and transparently reconnect - if a disconnect exception occurs. - """ - if branch: - return - - # turn off "close with result". This can also be accomplished - # by branching the connection, however just setting the flag is - # more performant and also doesn't get involved with some - # connection-invalidation awkardness that occurs (see - # https://bitbucket.org/zzzeek/sqlalchemy/issue/3215/) - save_should_close_with_result = connection.should_close_with_result - connection.should_close_with_result = False - try: - # run a SELECT 1. use a core select() so that - # any details like that needed by Oracle, DB2 etc. are handled. - connection.scalar(select([1])) - except exception.DBConnectionError: - # catch DBConnectionError, which is raised by the filter - # system. - # disconnect detected. The connection is now - # "invalid", but the pool should be ready to return - # new connections assuming they are good now. - # run the select again to re-validate the Connection. - connection.scalar(select([1])) - finally: - connection.should_close_with_result = save_should_close_with_result - - -def _setup_logging(connection_debug=0): - """setup_logging function maps SQL debug level to Python log level. - - Connection_debug is a verbosity of SQL debugging information. - 0=None(default value), - 1=Processed only messages with WARNING level or higher - 50=Processed only messages with INFO level or higher - 100=Processed only messages with DEBUG level - """ - if connection_debug >= 0: - logger = logging.getLogger('sqlalchemy.engine') - if connection_debug >= 100: - logger.setLevel(logging.DEBUG) - elif connection_debug >= 50: - logger.setLevel(logging.INFO) - else: - logger.setLevel(logging.WARNING) - - -def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None, - idle_timeout=3600, - connection_debug=0, max_pool_size=None, max_overflow=None, - pool_timeout=None, sqlite_synchronous=True, - connection_trace=False, max_retries=10, retry_interval=10, - thread_checkin=True, logging_name=None): - """Return a new SQLAlchemy engine.""" - - url = sqlalchemy.engine.url.make_url(sql_connection) - - engine_args = { - "pool_recycle": idle_timeout, - 'convert_unicode': True, - 'connect_args': {}, - 'logging_name': logging_name - } - - _setup_logging(connection_debug) - - _init_connection_args( - url, engine_args, - sqlite_fk=sqlite_fk, - max_pool_size=max_pool_size, - max_overflow=max_overflow, - pool_timeout=pool_timeout - ) - - engine = sqlalchemy.create_engine(url, **engine_args) - - _init_events( - engine, - mysql_sql_mode=mysql_sql_mode, - sqlite_synchronous=sqlite_synchronous, - sqlite_fk=sqlite_fk, - thread_checkin=thread_checkin, - connection_trace=connection_trace - ) - - # register alternate exception handler - exc_filters.register_engine(engine) - - # register engine connect handler - event.listen(engine, "engine_connect", _connect_ping_listener) - - # initial connect + test - # NOTE(viktors): the current implementation of _test_connection() - # does nothing, if max_retries == 0, so we can skip it - if max_retries: - test_conn = _test_connection(engine, max_retries, retry_interval) - test_conn.close() - - return engine - - -@utils.dispatch_for_dialect('*', multiple=True) -def _init_connection_args( - url, engine_args, - max_pool_size=None, max_overflow=None, pool_timeout=None, **kw): - - pool_class = url.get_dialect().get_pool_class(url) - if issubclass(pool_class, pool.QueuePool): - if max_pool_size is not None: - engine_args['pool_size'] = max_pool_size - if max_overflow is not None: - engine_args['max_overflow'] = max_overflow - if pool_timeout is not None: - engine_args['pool_timeout'] = pool_timeout - - -@_init_connection_args.dispatch_for("sqlite") -def _init_connection_args(url, engine_args, **kw): - pool_class = url.get_dialect().get_pool_class(url) - # singletonthreadpool is used for :memory: connections; - # replace it with StaticPool. - if issubclass(pool_class, pool.SingletonThreadPool): - engine_args["poolclass"] = pool.StaticPool - engine_args['connect_args']['check_same_thread'] = False - - -@_init_connection_args.dispatch_for("postgresql") -def _init_connection_args(url, engine_args, **kw): - if 'client_encoding' not in url.query: - # Set encoding using engine_args instead of connect_args since - # it's supported for PostgreSQL 8.*. More details at: - # http://docs.sqlalchemy.org/en/rel_0_9/dialects/postgresql.html - engine_args['client_encoding'] = 'utf8' - - -@_init_connection_args.dispatch_for("mysql") -def _init_connection_args(url, engine_args, **kw): - if 'charset' not in url.query: - engine_args['connect_args']['charset'] = 'utf8' - - -@_init_connection_args.dispatch_for("mysql+mysqlconnector") -def _init_connection_args(url, engine_args, **kw): - # mysqlconnector engine (<1.0) incorrectly defaults to - # raise_on_warnings=True - # https://bitbucket.org/zzzeek/sqlalchemy/issue/2515 - if 'raise_on_warnings' not in url.query: - engine_args['connect_args']['raise_on_warnings'] = False - - -@_init_connection_args.dispatch_for("mysql+mysqldb") -@_init_connection_args.dispatch_for("mysql+oursql") -def _init_connection_args(url, engine_args, **kw): - # Those drivers require use_unicode=0 to avoid performance drop due - # to internal usage of Python unicode objects in the driver - # http://docs.sqlalchemy.org/en/rel_0_9/dialects/mysql.html - if 'use_unicode' not in url.query: - engine_args['connect_args']['use_unicode'] = 0 - - -@utils.dispatch_for_dialect('*', multiple=True) -def _init_events(engine, thread_checkin=True, connection_trace=False, **kw): - """Set up event listeners for all database backends.""" - - _add_process_guards(engine) - - if connection_trace: - _add_trace_comments(engine) - - if thread_checkin: - sqlalchemy.event.listen(engine, 'checkin', _thread_yield) - - -@_init_events.dispatch_for("mysql") -def _init_events(engine, mysql_sql_mode=None, **kw): - """Set up event listeners for MySQL.""" - - if mysql_sql_mode is not None: - @sqlalchemy.event.listens_for(engine, "connect") - def _set_session_sql_mode(dbapi_con, connection_rec): - cursor = dbapi_con.cursor() - cursor.execute("SET SESSION sql_mode = %s", [mysql_sql_mode]) - - @sqlalchemy.event.listens_for(engine, "first_connect") - def _check_effective_sql_mode(dbapi_con, connection_rec): - if mysql_sql_mode is not None: - _set_session_sql_mode(dbapi_con, connection_rec) - - cursor = dbapi_con.cursor() - cursor.execute("SHOW VARIABLES LIKE 'sql_mode'") - realmode = cursor.fetchone() - - if realmode is None: - LOG.warning(_LW('Unable to detect effective SQL mode')) - else: - realmode = realmode[1] - LOG.debug('MySQL server mode set to %s', realmode) - if 'TRADITIONAL' not in realmode.upper() and \ - 'STRICT_ALL_TABLES' not in realmode.upper(): - LOG.warning( - _LW( - "MySQL SQL mode is '%s', " - "consider enabling TRADITIONAL or STRICT_ALL_TABLES"), - realmode) - - -@_init_events.dispatch_for("sqlite") -def _init_events(engine, sqlite_synchronous=True, sqlite_fk=False, **kw): - """Set up event listeners for SQLite. - - This includes several settings made on connections as they are - created, as well as transactional control extensions. - - """ - - def regexp(expr, item): - reg = re.compile(expr) - return reg.search(six.text_type(item)) is not None - - @sqlalchemy.event.listens_for(engine, "connect") - def _sqlite_connect_events(dbapi_con, con_record): - - # Add REGEXP functionality on SQLite connections - dbapi_con.create_function('regexp', 2, regexp) - - if not sqlite_synchronous: - # Switch sqlite connections to non-synchronous mode - dbapi_con.execute("PRAGMA synchronous = OFF") - - # Disable pysqlite's emitting of the BEGIN statement entirely. - # Also stops it from emitting COMMIT before any DDL. - # below, we emit BEGIN ourselves. - # see http://docs.sqlalchemy.org/en/rel_0_9/dialects/\ - # sqlite.html#serializable-isolation-savepoints-transactional-ddl - dbapi_con.isolation_level = None - - if sqlite_fk: - # Ensures that the foreign key constraints are enforced in SQLite. - dbapi_con.execute('pragma foreign_keys=ON') - - @sqlalchemy.event.listens_for(engine, "begin") - def _sqlite_emit_begin(conn): - # emit our own BEGIN, checking for existing - # transactional state - if 'in_transaction' not in conn.info: - conn.execute("BEGIN") - conn.info['in_transaction'] = True - - @sqlalchemy.event.listens_for(engine, "rollback") - @sqlalchemy.event.listens_for(engine, "commit") - def _sqlite_end_transaction(conn): - # remove transactional marker - conn.info.pop('in_transaction', None) - - -def _test_connection(engine, max_retries, retry_interval): - if max_retries == -1: - attempts = itertools.count() - else: - attempts = six.moves.range(max_retries) - # See: http://legacy.python.org/dev/peps/pep-3110/#semantic-changes for - # why we are not using 'de' directly (it can be removed from the local - # scope). - de_ref = None - for attempt in attempts: - try: - return engine.connect() - except exception.DBConnectionError as de: - msg = _LW('SQL connection failed. %s attempts left.') - LOG.warning(msg, max_retries - attempt) - time.sleep(retry_interval) - de_ref = de - else: - if de_ref is not None: - six.reraise(type(de_ref), de_ref) - - -class Query(sqlalchemy.orm.query.Query): - """Subclass of sqlalchemy.query with soft_delete() method.""" - def soft_delete(self, synchronize_session='evaluate'): - return self.update({'deleted': literal_column('id'), - 'updated_at': literal_column('updated_at'), - 'deleted_at': timeutils.utcnow()}, - synchronize_session=synchronize_session) - - def update_returning_pk(self, values, surrogate_key): - """Perform an UPDATE, returning the primary key of the matched row. - - This is a method-version of - oslo_db.sqlalchemy.update_match.update_returning_pk(); see that - function for usage details. - - """ - return update_match.update_returning_pk(self, values, surrogate_key) - - def update_on_match(self, specimen, surrogate_key, values, **kw): - """Emit an UPDATE statement matching the given specimen. - - This is a method-version of - oslo_db.sqlalchemy.update_match.update_on_match(); see that function - for usage details. - - """ - return update_match.update_on_match( - self, specimen, surrogate_key, values, **kw) - - -class Session(sqlalchemy.orm.session.Session): - """Custom Session class to avoid SqlAlchemy Session monkey patching.""" - - -def get_maker(engine, autocommit=True, expire_on_commit=False): - """Return a SQLAlchemy sessionmaker using the given engine.""" - return sqlalchemy.orm.sessionmaker(bind=engine, - class_=Session, - autocommit=autocommit, - expire_on_commit=expire_on_commit, - query_cls=Query) - - -def _add_process_guards(engine): - """Add multiprocessing guards. - - Forces a connection to be reconnected if it is detected - as having been shared to a sub-process. - - """ - - @sqlalchemy.event.listens_for(engine, "connect") - def connect(dbapi_connection, connection_record): - connection_record.info['pid'] = os.getpid() - - @sqlalchemy.event.listens_for(engine, "checkout") - def checkout(dbapi_connection, connection_record, connection_proxy): - pid = os.getpid() - if connection_record.info['pid'] != pid: - LOG.debug(_LW( - "Parent process %(orig)s forked (%(newproc)s) with an open " - "database connection, " - "which is being discarded and recreated."), - {"newproc": pid, "orig": connection_record.info['pid']}) - connection_record.connection = connection_proxy.connection = None - raise exc.DisconnectionError( - "Connection record belongs to pid %s, " - "attempting to check out in pid %s" % - (connection_record.info['pid'], pid) - ) - - -def _add_trace_comments(engine): - """Add trace comments. - - Augment statements with a trace of the immediate calling code - for a given statement. - """ - - import os - import sys - import traceback - target_paths = set([ - os.path.dirname(sys.modules['oslo_db'].__file__), - os.path.dirname(sys.modules['sqlalchemy'].__file__) - ]) - - @sqlalchemy.event.listens_for(engine, "before_cursor_execute", retval=True) - def before_cursor_execute(conn, cursor, statement, parameters, context, - executemany): - - # NOTE(zzzeek) - if different steps per DB dialect are desirable - # here, switch out on engine.name for now. - stack = traceback.extract_stack() - our_line = None - for idx, (filename, line, method, function) in enumerate(stack): - for tgt in target_paths: - if filename.startswith(tgt): - our_line = idx - break - if our_line: - break - - if our_line: - trace = "; ".join( - "File: %s (%s) %s" % ( - line[0], line[1], line[2] - ) - # include three lines of context. - for line in stack[our_line - 3:our_line] - - ) - statement = "%s -- %s" % (statement, trace) - - return statement, parameters - - -class EngineFacade(object): - """A helper class for removing of global engine instances from oslo.db. - - As a library, oslo.db can't decide where to store/when to create engine - and sessionmaker instances, so this must be left for a target application. - - On the other hand, in order to simplify the adoption of oslo.db changes, - we'll provide a helper class, which creates engine and sessionmaker - on its instantiation and provides get_engine()/get_session() methods - that are compatible with corresponding utility functions that currently - exist in target projects, e.g. in Nova. - - engine/sessionmaker instances will still be global (and they are meant to - be global), but they will be stored in the app context, rather that in the - oslo.db context. - - Note: using of this helper is completely optional and you are encouraged to - integrate engine/sessionmaker instances into your apps any way you like - (e.g. one might want to bind a session to a request context). Two important - things to remember: - - 1. An Engine instance is effectively a pool of DB connections, so it's - meant to be shared (and it's thread-safe). - 2. A Session instance is not meant to be shared and represents a DB - transactional context (i.e. it's not thread-safe). sessionmaker is - a factory of sessions. - - """ - - def __init__(self, sql_connection, slave_connection=None, - sqlite_fk=False, autocommit=True, - expire_on_commit=False, **kwargs): - """Initialize engine and sessionmaker instances. - - :param sql_connection: the connection string for the database to use - :type sql_connection: string - - :param slave_connection: the connection string for the 'slave' database - to use. If not provided, the master database - will be used for all operations. Note: this - is meant to be used for offloading of read - operations to asynchronously replicated slaves - to reduce the load on the master database. - :type slave_connection: string - - :param sqlite_fk: enable foreign keys in SQLite - :type sqlite_fk: bool - - :param autocommit: use autocommit mode for created Session instances - :type autocommit: bool - - :param expire_on_commit: expire session objects on commit - :type expire_on_commit: bool - - Keyword arguments: - - :keyword mysql_sql_mode: the SQL mode to be used for MySQL sessions. - (defaults to TRADITIONAL) - :keyword idle_timeout: timeout before idle sql connections are reaped - (defaults to 3600) - :keyword connection_debug: verbosity of SQL debugging information. - -1=Off, 0=None, 100=Everything (defaults - to 0) - :keyword max_pool_size: maximum number of SQL connections to keep open - in a pool (defaults to SQLAlchemy settings) - :keyword max_overflow: if set, use this value for max_overflow with - sqlalchemy (defaults to SQLAlchemy settings) - :keyword pool_timeout: if set, use this value for pool_timeout with - sqlalchemy (defaults to SQLAlchemy settings) - :keyword sqlite_synchronous: if True, SQLite uses synchronous mode - (defaults to True) - :keyword connection_trace: add python stack traces to SQL as comment - strings (defaults to False) - :keyword max_retries: maximum db connection retries during startup. - (setting -1 implies an infinite retry count) - (defaults to 10) - :keyword retry_interval: interval between retries of opening a sql - connection (defaults to 10) - :keyword thread_checkin: boolean that indicates that between each - engine checkin event a sleep(0) will occur to - allow other greenthreads to run (defaults to - True) - """ - - super(EngineFacade, self).__init__() - - engine_kwargs = { - 'sqlite_fk': sqlite_fk, - 'mysql_sql_mode': kwargs.get('mysql_sql_mode', 'TRADITIONAL'), - 'idle_timeout': kwargs.get('idle_timeout', 3600), - 'connection_debug': kwargs.get('connection_debug', 0), - 'max_pool_size': kwargs.get('max_pool_size'), - 'max_overflow': kwargs.get('max_overflow'), - 'pool_timeout': kwargs.get('pool_timeout'), - 'sqlite_synchronous': kwargs.get('sqlite_synchronous', True), - 'connection_trace': kwargs.get('connection_trace', False), - 'max_retries': kwargs.get('max_retries', 10), - 'retry_interval': kwargs.get('retry_interval', 10), - 'thread_checkin': kwargs.get('thread_checkin', True) - } - maker_kwargs = { - 'autocommit': autocommit, - 'expire_on_commit': expire_on_commit - } - - self._engine = create_engine(sql_connection=sql_connection, - **engine_kwargs) - self._session_maker = get_maker(engine=self._engine, - **maker_kwargs) - if slave_connection: - self._slave_engine = create_engine(sql_connection=slave_connection, - **engine_kwargs) - self._slave_session_maker = get_maker(engine=self._slave_engine, - **maker_kwargs) - else: - self._slave_engine = None - self._slave_session_maker = None - - def get_engine(self, use_slave=False): - """Get the engine instance (note, that it's shared). - - :param use_slave: if possible, use 'slave' database for this engine. - If the connection string for the slave database - wasn't provided, 'master' engine will be returned. - (defaults to False) - :type use_slave: bool - - """ - - if use_slave and self._slave_engine: - return self._slave_engine - - return self._engine - - def get_session(self, use_slave=False, **kwargs): - """Get a Session instance. - - :param use_slave: if possible, use 'slave' database connection for - this session. If the connection string for the - slave database wasn't provided, a session bound - to the 'master' engine will be returned. - (defaults to False) - :type use_slave: bool - - Keyword arugments will be passed to a sessionmaker instance as is (if - passed, they will override the ones used when the sessionmaker instance - was created). See SQLAlchemy Session docs for details. - - """ - - if use_slave and self._slave_session_maker: - return self._slave_session_maker(**kwargs) - - return self._session_maker(**kwargs) - - @classmethod - def from_config(cls, conf, - sqlite_fk=False, autocommit=True, expire_on_commit=False): - """Initialize EngineFacade using oslo.config config instance options. - - :param conf: oslo.config config instance - :type conf: oslo.config.cfg.ConfigOpts - - :param sqlite_fk: enable foreign keys in SQLite - :type sqlite_fk: bool - - :param autocommit: use autocommit mode for created Session instances - :type autocommit: bool - - :param expire_on_commit: expire session objects on commit - :type expire_on_commit: bool - - """ - - conf.register_opts(options.database_opts, 'database') - - return cls(sql_connection=conf.database.connection, - slave_connection=conf.database.slave_connection, - sqlite_fk=sqlite_fk, - autocommit=autocommit, - expire_on_commit=expire_on_commit, - mysql_sql_mode=conf.database.mysql_sql_mode, - idle_timeout=conf.database.idle_timeout, - connection_debug=conf.database.connection_debug, - max_pool_size=conf.database.max_pool_size, - max_overflow=conf.database.max_overflow, - pool_timeout=conf.database.pool_timeout, - sqlite_synchronous=conf.database.sqlite_synchronous, - connection_trace=conf.database.connection_trace, - max_retries=conf.database.max_retries, - retry_interval=conf.database.retry_interval) +__all__ = ["EngineFacade", "create_engine", "get_maker", "Query", "Session"] diff --git a/oslo_db/sqlalchemy/test_base.py b/oslo_db/sqlalchemy/test_base.py index 362f685..fe36df8 100644 --- a/oslo_db/sqlalchemy/test_base.py +++ b/oslo_db/sqlalchemy/test_base.py @@ -30,6 +30,7 @@ from oslo_utils import reflection import six from oslo_db import exception +from oslo_db.sqlalchemy import enginefacade from oslo_db.sqlalchemy import provision from oslo_db.sqlalchemy import session @@ -78,9 +79,15 @@ class DbFixture(fixtures.Fixture): else: self.test.engine = self.test.db.engine self.test.sessionmaker = session.get_maker(self.test.engine) + self.addCleanup(setattr, self.test, 'sessionmaker', None) self.addCleanup(setattr, self.test, 'engine', None) + self.test.enginefacade = enginefacade._TestTransactionFactory( + self.test.engine, self.test.sessionmaker, apply_global=True, + synchronous_reader=True) + self.addCleanup(self.test.enginefacade.dispose_global) + class DbTestCase(test_base.BaseTestCase): """Base class for testing of DB code. diff --git a/oslo_db/tests/old_import_api/sqlalchemy/test_exc_filters.py b/oslo_db/tests/old_import_api/sqlalchemy/test_exc_filters.py index 0b1bb89..7738254 100644 --- a/oslo_db/tests/old_import_api/sqlalchemy/test_exc_filters.py +++ b/oslo_db/tests/old_import_api/sqlalchemy/test_exc_filters.py @@ -25,8 +25,8 @@ from sqlalchemy.orm import mapper from oslo.db import exception from oslo.db.sqlalchemy import exc_filters from oslo.db.sqlalchemy import test_base -from oslo_db.sqlalchemy import session as private_session -from oslo_db.tests.old_import_api import utils as test_utils +from oslo_db.sqlalchemy import engines +from oslo_db.tests import utils as test_utils _TABLE_NAME = '__tmp__test__tmp__' @@ -720,7 +720,7 @@ class TestDBDisconnected(TestsExceptionFilter): engine = self.engine event.listen( - engine, "engine_connect", private_session._connect_ping_listener) + engine, "engine_connect", engines._connect_ping_listener) real_do_execute = engine.dialect.do_execute counter = itertools.count(1) @@ -816,7 +816,7 @@ class TestDBConnectRetry(TestsExceptionFilter): with self._dbapi_fixture(dialect_name): with mock.patch.object(engine.dialect, "connect", cant_connect): - return private_session._test_connection(engine, retries, .01) + return engines._test_connection(engine, retries, .01) def test_connect_no_retries(self): conn = self._run_test( diff --git a/oslo_db/tests/old_import_api/sqlalchemy/test_sqlalchemy.py b/oslo_db/tests/old_import_api/sqlalchemy/test_sqlalchemy.py index d1f10b7..1aa8e59 100644 --- a/oslo_db/tests/old_import_api/sqlalchemy/test_sqlalchemy.py +++ b/oslo_db/tests/old_import_api/sqlalchemy/test_sqlalchemy.py @@ -29,11 +29,11 @@ from sqlalchemy import Integer, String from sqlalchemy.ext.declarative import declarative_base from oslo.db import exception +from oslo.db import options as db_options from oslo.db.sqlalchemy import models from oslo.db.sqlalchemy import session from oslo.db.sqlalchemy import test_base -from oslo_db import options as db_options -from oslo_db.sqlalchemy import session as private_session +from oslo_db.sqlalchemy import engines BASE = declarative_base() @@ -300,8 +300,8 @@ class EngineFacadeTestCase(oslo_test.BaseTestCase): self.assertFalse(ses.autocommit) self.assertTrue(ses.expire_on_commit) - @mock.patch('oslo_db.sqlalchemy.session.get_maker') - @mock.patch('oslo_db.sqlalchemy.session.create_engine') + @mock.patch('oslo_db.sqlalchemy.orm.get_maker') + @mock.patch('oslo_db.sqlalchemy.engines.create_engine') def test_creation_from_config(self, create_engine, get_maker): conf = cfg.ConfigOpts() conf.register_opts(db_options.database_opts, group='database') @@ -339,6 +339,42 @@ class EngineFacadeTestCase(oslo_test.BaseTestCase): autocommit=False, expire_on_commit=True) + @mock.patch('oslo_db.sqlalchemy.orm.get_maker') + @mock.patch('oslo_db.sqlalchemy.engines.create_engine') + def test_passed_in_url_overrides_conf(self, create_engine, get_maker): + conf = cfg.ConfigOpts() + conf.register_opts(db_options.database_opts, group='database') + + overrides = { + 'connection': 'sqlite:///conf_db_setting', + 'connection_debug': 100, + 'max_pool_size': 10, + 'mysql_sql_mode': 'TRADITIONAL', + } + for optname, optvalue in overrides.items(): + conf.set_override(optname, optvalue, group='database') + + session.EngineFacade( + "sqlite:///override_sql", + **dict(conf.database.items()) + ) + + create_engine.assert_called_once_with( + sql_connection='sqlite:///override_sql', + connection_debug=100, + max_pool_size=10, + mysql_sql_mode='TRADITIONAL', + sqlite_fk=False, + idle_timeout=mock.ANY, + retry_interval=mock.ANY, + max_retries=mock.ANY, + max_overflow=mock.ANY, + connection_trace=mock.ANY, + sqlite_synchronous=mock.ANY, + pool_timeout=mock.ANY, + thread_checkin=mock.ANY, + ) + def test_slave_connection(self): paths = self.create_tempfiles([('db.master', ''), ('db.slave', '')], ext='') @@ -483,9 +519,7 @@ class MysqlConnectTest(test_base.MySQLOpportunisticTestCase): ) ) ): - private_session._init_events.dispatch_on_drivername("mysql")( - test_engine - ) + engines._init_events.dispatch_on_drivername("mysql")(test_engine) test_engine.raw_connection() self.assertIn('Unable to detect effective SQL mode', @@ -553,7 +587,7 @@ class PatchStacktraceTest(test_base.DbTestCase): with mock.patch("traceback.extract_stack", side_effect=extract_stack): - private_session._add_trace_comments(engine) + engines._add_trace_comments(engine) conn = engine.connect() with mock.patch.object(engine.dialect, "do_execute") as mock_exec: diff --git a/oslo_db/tests/sqlalchemy/test_enginefacade.py b/oslo_db/tests/sqlalchemy/test_enginefacade.py new file mode 100644 index 0000000..a222565 --- /dev/null +++ b/oslo_db/tests/sqlalchemy/test_enginefacade.py @@ -0,0 +1,1646 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +import contextlib +import copy +import warnings + +import mock +from oslo_config import cfg +from oslo_context import context as oslo_context +from oslotest import base as oslo_test_base +from sqlalchemy import Column +from sqlalchemy import Integer +from sqlalchemy import MetaData +from sqlalchemy.orm import mapper +from sqlalchemy import select +from sqlalchemy import String +from sqlalchemy import Table + +from oslo_db import exception +from oslo_db import options +from oslo_db.sqlalchemy import enginefacade +from oslo_db.sqlalchemy import engines as oslo_engines +from oslo_db.sqlalchemy import orm +from oslo_db.sqlalchemy import test_base + + +class SingletonOnName(mock.MagicMock): + def __init__(self, the_name, **kw): + super(SingletonOnName, self).__init__( + __eq__=lambda self, other: other._assert_name == self._assert_name, + _assert_name=the_name, + **kw + ) + + def __deepcopy__(self, memo): + return self + + +class SingletonConnection(SingletonOnName): + def __init__(self, **kw): + super(SingletonConnection, self).__init__( + "connection", **kw) + + +class SingletonEngine(SingletonOnName): + def __init__(self, connection, **kw): + super(SingletonEngine, self).__init__( + "engine", + connect=mock.Mock(return_value=connection), + url=connection, + _assert_connection=connection, + **kw + ) + + +class NonDecoratedContext(object): + """a Context object that's not run through transaction_context_provider.""" + + +class AssertDataSource(collections.namedtuple( + "AssertDataSource", ["writer", "reader", "async_reader"])): + + def element_for_writer(self, const): + if const is enginefacade._WRITER: + return self.writer + elif const is enginefacade._READER: + return self.reader + elif const is enginefacade._ASYNC_READER: + return self.async_reader + else: + assert False, "Unknown constant: %s" % const + + +class MockFacadeTest(oslo_test_base.BaseTestCase): + """test by applying mocks to internal call-points. + + This applies mocks to + oslo.db.sqlalchemy.engines.create_engine() and + oslo.db.sqlalchemy.orm.get_maker(), then mocking a + _TransactionFactory into + oslo.db.sqlalchemy.enginefacade._context_manager._root_factory. + + Various scenarios are run against the enginefacade functions, and the + exact calls made against the mock create_engine(), get_maker(), and + associated objects are tested exactly against expected calls. + + """ + + synchronous_reader = True + + engine_uri = 'some_connection' + slave_uri = None + + def setUp(self): + super(MockFacadeTest, self).setUp() + + writer_conn = SingletonConnection() + writer_engine = SingletonEngine(writer_conn) + writer_session = mock.Mock( + connection=mock.Mock(return_value=writer_conn)) + writer_maker = mock.Mock(return_value=writer_session) + + if self.slave_uri: + async_reader_conn = SingletonConnection() + async_reader_engine = SingletonEngine(async_reader_conn) + async_reader_session = mock.Mock( + connection=mock.Mock(return_value=async_reader_conn)) + async_reader_maker = mock.Mock(return_value=async_reader_session) + + else: + async_reader_conn = writer_conn + async_reader_engine = writer_engine + async_reader_session = writer_session + async_reader_maker = writer_maker + + if self.synchronous_reader: + reader_conn = async_reader_conn + reader_engine = async_reader_engine + reader_session = async_reader_session + reader_maker = async_reader_maker + else: + reader_conn = writer_conn + reader_engine = writer_engine + reader_session = writer_session + reader_maker = writer_maker + + self.connections = AssertDataSource( + writer_conn, reader_conn, async_reader_conn + ) + self.engines = AssertDataSource( + writer_engine, reader_engine, async_reader_engine + ) + self.sessions = AssertDataSource( + writer_session, reader_session, async_reader_session + ) + self.makers = AssertDataSource( + writer_maker, reader_maker, async_reader_maker + ) + + def get_maker(engine, **kw): + if engine is writer_engine: + return self.makers.writer + elif engine is reader_engine: + return self.makers.reader + elif engine is async_reader_engine: + return self.makers.async_reader + else: + assert False + + session_patch = mock.patch.object( + orm, "get_maker", + side_effect=get_maker) + self.get_maker = session_patch.start() + self.addCleanup(session_patch.stop) + + def create_engine(sql_connection, **kw): + if sql_connection == self.engine_uri: + return self.engines.writer + elif sql_connection == self.slave_uri: + return self.engines.async_reader + else: + assert False + + engine_patch = mock.patch.object( + oslo_engines, "create_engine", side_effect=create_engine) + + self.create_engine = engine_patch.start() + self.addCleanup(engine_patch.stop) + + self.factory = enginefacade._TransactionFactory() + self.factory.configure( + synchronous_reader=self.synchronous_reader + ) + + self.factory.configure( + connection=self.engine_uri, + slave_connection=self.slave_uri + ) + + facade_patcher = mock.patch.object( + enginefacade._context_manager, "_root_factory", self.factory) + facade_patcher.start() + self.addCleanup(facade_patcher.stop) + + def _assert_ctx_connection(self, context, connection): + self.assertIs(context.connection, connection) + + def _assert_ctx_session(self, context, session): + self.assertIs(context.session, session) + + def _assert_non_decorated_ctx_connection(self, context, connection): + transaction_ctx = enginefacade._transaction_ctx_for_context(context) + self.assertIs(transaction_ctx.connection, connection) + + def _assert_non_decorated_ctx_session(self, context, session): + transaction_ctx = enginefacade._transaction_ctx_for_context(context) + self.assertIs(transaction_ctx.session, session) + + @contextlib.contextmanager + def _assert_engines(self): + """produce a mock series of engine calls. + + These are expected to match engine-related calls established + by the test subject. + + """ + + writer_conn = SingletonConnection() + writer_engine = SingletonEngine(writer_conn) + if self.slave_uri: + async_reader_conn = SingletonConnection() + async_reader_engine = SingletonEngine(async_reader_conn) + else: + async_reader_conn = writer_conn + async_reader_engine = writer_engine + + if self.synchronous_reader: + reader_engine = async_reader_engine + else: + reader_engine = writer_engine + + engines = AssertDataSource( + writer_engine, reader_engine, async_reader_engine) + + def create_engine(sql_connection, **kw): + if sql_connection == self.engine_uri: + return engines.writer + elif sql_connection == self.slave_uri: + return engines.async_reader + else: + assert False + + engine_factory = mock.Mock(side_effect=create_engine) + engine_factory( + sql_connection=self.engine_uri, + **dict((k, mock.ANY) for k in self.factory._engine_cfg.keys()) + ) + if self.slave_uri: + engine_factory( + sql_connection=self.slave_uri, + **dict((k, mock.ANY) for k in self.factory._engine_cfg.keys()) + ) + + yield AssertDataSource( + writer_engine, reader_engine, async_reader_engine + ) + + self.assertEqual( + engine_factory.mock_calls, + self.create_engine.mock_calls + ) + + for sym in [ + enginefacade._WRITER, enginefacade._READER, + enginefacade._ASYNC_READER + ]: + self.assertEqual( + engines.element_for_writer(sym).mock_calls, + self.engines.element_for_writer(sym).mock_calls + ) + + def _assert_async_reader_connection(self, engines, session=None): + return self._assert_connection( + engines, enginefacade._ASYNC_READER, session) + + def _assert_reader_connection(self, engines, session=None): + return self._assert_connection(engines, enginefacade._READER, session) + + def _assert_writer_connection(self, engines, session=None): + return self._assert_connection(engines, enginefacade._WRITER, session) + + @contextlib.contextmanager + def _assert_connection(self, engines, writer, session=None): + """produce a mock series of connection calls. + + These are expected to match connection-related calls established + by the test subject. + + """ + if session: + connection = session.connection() + yield connection + else: + connection = engines.element_for_writer(writer).connect() + trans = connection.begin() + yield connection + if writer is enginefacade._WRITER: + trans.commit() + else: + trans.rollback() + connection.close() + + self.assertEqual( + connection.mock_calls, + self.connections.element_for_writer(writer).mock_calls) + + @contextlib.contextmanager + def _assert_makers(self, engines): + + writer_session = mock.Mock(connection=mock.Mock( + return_value=engines.writer._assert_connection) + ) + writer_maker = mock.Mock(return_value=writer_session) + + if self.slave_uri: + async_reader_session = mock.Mock(connection=mock.Mock( + return_value=engines.async_reader._assert_connection) + ) + async_reader_maker = mock.Mock(return_value=async_reader_session) + else: + async_reader_session = writer_session + async_reader_maker = writer_maker + + if self.synchronous_reader: + reader_maker = async_reader_maker + else: + reader_maker = writer_maker + + makers = AssertDataSource( + writer_maker, + reader_maker, + async_reader_maker, + ) + + def get_maker(engine, **kw): + if engine is engines.writer: + return makers.writer + elif engine is engines.reader: + return makers.reader + elif engine is engines.async_reader: + return makers.async_reader + else: + assert False + + maker_factories = mock.Mock(side_effect=get_maker) + + maker_factories( + autocommit=True, engine=engines.writer, + expire_on_commit=False) + if self.slave_uri: + maker_factories( + autocommit=True, engine=engines.async_reader, + expire_on_commit=False) + + yield makers + + self.assertEqual( + maker_factories.mock_calls, + self.get_maker.mock_calls) + + for sym in [ + enginefacade._WRITER, enginefacade._READER, + enginefacade._ASYNC_READER + ]: + self.assertEqual( + makers.element_for_writer(sym).mock_calls, + self.makers.element_for_writer(sym).mock_calls) + + def _assert_async_reader_session( + self, makers, connection=None, assert_calls=True): + return self._assert_session( + makers, enginefacade._ASYNC_READER, connection, assert_calls) + + def _assert_reader_session( + self, makers, connection=None, assert_calls=True): + return self._assert_session( + makers, enginefacade._READER, + connection, assert_calls) + + def _assert_writer_session( + self, makers, connection=None, assert_calls=True): + return self._assert_session( + makers, enginefacade._WRITER, + connection, assert_calls) + + @contextlib.contextmanager + def _assert_session( + self, makers, writer, connection=None, assert_calls=True): + """produce a mock series of session calls. + + These are expected to match session-related calls established + by the test subject. + + """ + + if connection: + session = makers.element_for_writer(writer)(bind=connection) + else: + session = makers.element_for_writer(writer)() + session.begin() + yield session + if writer is enginefacade._WRITER: + session.commit() + elif enginefacade.\ + _context_manager._factory._transaction_ctx_cfg[ + 'rollback_reader_sessions']: + session.rollback() + session.close() + + if assert_calls: + self.assertEqual( + session.mock_calls, + self.sessions.element_for_writer(writer).mock_calls) + + def test_session_reader_decorator(self): + context = oslo_context.RequestContext() + + @enginefacade.reader + def go(context): + context.session.execute("test") + go(context) + + with self._assert_engines() as engines: + with self._assert_makers(engines) as makers: + with self._assert_reader_session(makers) as session: + session.execute("test") + + def test_connection_reader_decorator(self): + context = oslo_context.RequestContext() + + @enginefacade.reader.connection + def go(context): + context.connection.execute("test") + go(context) + + with self._assert_engines() as engines: + with self._assert_reader_connection(engines) as connection: + connection.execute("test") + + def test_session_reader_nested_in_connection_reader(self): + context = oslo_context.RequestContext() + + @enginefacade.reader.connection + def go1(context): + context.connection.execute("test1") + go2(context) + + @enginefacade.reader + def go2(context): + context.session.execute("test2") + go1(context) + + with self._assert_engines() as engines: + with self._assert_reader_connection(engines) as connection: + connection.execute("test1") + with self._assert_makers(engines) as makers: + with self._assert_reader_session( + makers, connection) as session: + session.execute("test2") + + def test_connection_reader_nested_in_session_reader(self): + context = oslo_context.RequestContext() + + @enginefacade.reader + def go1(context): + context.session.execute("test1") + go2(context) + + @enginefacade.reader.connection + def go2(context): + context.connection.execute("test2") + + go1(context) + + with self._assert_engines() as engines: + with self._assert_makers(engines) as makers: + with self._assert_reader_session(makers) as session: + session.execute("test1") + with self._assert_reader_connection( + engines, session) as connection: + connection.execute("test2") + + def test_session_reader_decorator_nested(self): + context = oslo_context.RequestContext() + + @enginefacade.reader + def go1(context): + context.session.execute("test1") + go2(context) + + @enginefacade.reader + def go2(context): + context.session.execute("test2") + go1(context) + + with self._assert_engines() as engines: + with self._assert_makers(engines) as makers: + with self._assert_reader_session(makers) as session: + session.execute("test1") + session.execute("test2") + + def test_reader_nested_in_writer_ok(self): + context = oslo_context.RequestContext() + + @enginefacade.writer + def go1(context): + context.session.execute("test1") + go2(context) + + @enginefacade.reader + def go2(context): + context.session.execute("test2") + + go1(context) + with self._assert_engines() as engines: + with self._assert_makers(engines) as makers: + with self._assert_writer_session(makers) as session: + session.execute("test1") + session.execute("test2") + + def test_writer_nested_in_reader_raises(self): + context = oslo_context.RequestContext() + + @enginefacade.reader + def go1(context): + context.session.execute("test1") + go2(context) + + @enginefacade.writer + def go2(context): + context.session.execute("test2") + + exc = self.assertRaises( + TypeError, go1, context + ) + self.assertEqual( + "Can't upgrade a READER " + "transaction to a WRITER mid-transaction", + exc.args[0] + ) + + def test_async_on_writer_raises(self): + exc = self.assertRaises( + TypeError, getattr, enginefacade.writer, "async" + ) + self.assertEqual( + "Setting async on a WRITER makes no sense", + exc.args[0] + ) + + def test_savepoint_and_independent_raises(self): + exc = self.assertRaises( + TypeError, getattr, enginefacade.writer.independent, "savepoint" + ) + self.assertEqual( + "setting savepoint and independent makes no sense.", + exc.args[0] + ) + + def test_reader_nested_in_async_reader_raises(self): + context = oslo_context.RequestContext() + + @enginefacade.reader.async + def go1(context): + context.session.execute("test1") + go2(context) + + @enginefacade.reader + def go2(context): + context.session.execute("test2") + + exc = self.assertRaises( + TypeError, go1, context + ) + self.assertEqual( + "Can't upgrade an ASYNC_READER transaction " + "to a READER mid-transaction", + exc.args[0] + ) + + def test_writer_nested_in_async_reader_raises(self): + context = oslo_context.RequestContext() + + @enginefacade.reader.async + def go1(context): + context.session.execute("test1") + go2(context) + + @enginefacade.writer + def go2(context): + context.session.execute("test2") + + exc = self.assertRaises( + TypeError, go1, context + ) + self.assertEqual( + "Can't upgrade an ASYNC_READER transaction to a " + "WRITER mid-transaction", + exc.args[0] + ) + + def test_reader_then_writer_ok(self): + context = oslo_context.RequestContext() + + @enginefacade.reader + def go1(context): + context.session.execute("test1") + + @enginefacade.writer + def go2(context): + context.session.execute("test2") + + go1(context) + go2(context) + + with self._assert_engines() as engines: + with self._assert_makers(engines) as makers: + with self._assert_reader_session( + makers, assert_calls=False) as session: + session.execute("test1") + with self._assert_writer_session(makers) as session: + session.execute("test2") + + def test_async_reader_then_reader_ok(self): + context = oslo_context.RequestContext() + + @enginefacade.reader.async + def go1(context): + context.session.execute("test1") + + @enginefacade.reader + def go2(context): + context.session.execute("test2") + + go1(context) + go2(context) + + with self._assert_engines() as engines: + with self._assert_makers(engines) as makers: + with self._assert_async_reader_session( + makers, assert_calls=False) as session: + session.execute("test1") + with self._assert_reader_session(makers) as session: + session.execute("test2") + + def test_using_reader(self): + context = oslo_context.RequestContext() + + with enginefacade.reader.using(context) as session: + self._assert_ctx_session(context, session) + session.execute("test1") + + with self._assert_engines() as engines: + with self._assert_makers(engines) as makers: + with self._assert_reader_session(makers) as session: + session.execute("test1") + + def test_using_reader_rollback_reader_session(self): + enginefacade.configure(rollback_reader_sessions=True) + + context = oslo_context.RequestContext() + + with enginefacade.reader.using(context) as session: + self._assert_ctx_session(context, session) + session.execute("test1") + + with self._assert_engines() as engines: + with self._assert_makers(engines) as makers: + with self._assert_reader_session(makers) as session: + session.execute("test1") + + def test_using_writer(self): + context = oslo_context.RequestContext() + + with enginefacade.writer.using(context) as session: + self._assert_ctx_session(context, session) + session.execute("test1") + + with self._assert_engines() as engines: + with self._assert_makers(engines) as makers: + with self._assert_writer_session(makers) as session: + session.execute("test1") + + def test_using_writer_no_descriptors(self): + context = NonDecoratedContext() + + with enginefacade.writer.using(context) as session: + self._assert_non_decorated_ctx_session(context, session) + session.execute("test1") + + with self._assert_engines() as engines: + with self._assert_makers(engines) as makers: + with self._assert_writer_session(makers) as session: + session.execute("test1") + + def test_using_writer_connection_no_descriptors(self): + context = NonDecoratedContext() + + with enginefacade.writer.connection.using(context) as connection: + self._assert_non_decorated_ctx_connection(context, connection) + connection.execute("test1") + + with self._assert_engines() as engines: + with self._assert_writer_connection(engines) as conn: + conn.execute("test1") + + def test_using_reader_connection(self): + context = oslo_context.RequestContext() + + with enginefacade.reader.connection.using(context) as connection: + self._assert_ctx_connection(context, connection) + connection.execute("test1") + + with self._assert_engines() as engines: + with self._assert_reader_connection(engines) as conn: + conn.execute("test1") + + def test_using_writer_connection(self): + context = oslo_context.RequestContext() + + with enginefacade.writer.connection.using(context) as connection: + self._assert_ctx_connection(context, connection) + connection.execute("test1") + + with self._assert_engines() as engines: + with self._assert_writer_connection(engines) as conn: + conn.execute("test1") + + def test_context_copied_using_existing_writer_connection(self): + context = oslo_context.RequestContext() + + with enginefacade.writer.connection.using(context) as connection: + self._assert_ctx_connection(context, connection) + connection.execute("test1") + + ctx2 = copy.deepcopy(context) + + with enginefacade.reader.connection.using(ctx2) as conn2: + self.assertIs(conn2, connection) + self._assert_ctx_connection(ctx2, conn2) + + conn2.execute("test2") + + with self._assert_engines() as engines: + with self._assert_writer_connection(engines) as conn: + conn.execute("test1") + conn.execute("test2") + + def test_context_nodesc_copied_using_existing_writer_connection(self): + context = NonDecoratedContext() + + with enginefacade.writer.connection.using(context) as connection: + self._assert_non_decorated_ctx_connection(context, connection) + connection.execute("test1") + + ctx2 = copy.deepcopy(context) + + with enginefacade.reader.connection.using(ctx2) as conn2: + self.assertIs(conn2, connection) + self._assert_non_decorated_ctx_connection(ctx2, conn2) + + conn2.execute("test2") + + with self._assert_engines() as engines: + with self._assert_writer_connection(engines) as conn: + conn.execute("test1") + conn.execute("test2") + + def test_session_context_exception(self): + context = oslo_context.RequestContext() + exc = self.assertRaises( + exception.NoEngineContextEstablished, + getattr, context, 'session' + ) + + self.assertRegexpMatches( + exc.args[0], + "No TransactionContext is established for " + "this .*RequestContext.* object within the current " + "thread; the 'session' attribute is unavailable." + ) + + def test_session_context_getattr(self): + context = oslo_context.RequestContext() + self.assertIsNone(getattr(context, 'session', None)) + + def test_connection_context_exception(self): + context = oslo_context.RequestContext() + exc = self.assertRaises( + exception.NoEngineContextEstablished, + getattr, context, 'connection' + ) + + self.assertRegexpMatches( + exc.args[0], + "No TransactionContext is established for " + "this .*RequestContext.* object within the current " + "thread; the 'connection' attribute is unavailable." + ) + + def test_connection_context_getattr(self): + context = oslo_context.RequestContext() + self.assertIsNone(getattr(context, 'connection', None)) + + def test_transaction_context_exception(self): + context = oslo_context.RequestContext() + exc = self.assertRaises( + exception.NoEngineContextEstablished, + getattr, context, 'transaction' + ) + + self.assertRegexpMatches( + exc.args[0], + "No TransactionContext is established for " + "this .*RequestContext.* object within the current " + "thread; the 'transaction' attribute is unavailable." + ) + + def test_transaction_context_getattr(self): + context = oslo_context.RequestContext() + self.assertIsNone(getattr(context, 'transaction', None)) + + def test_trans_ctx_context_exception(self): + context = oslo_context.RequestContext() + exc = self.assertRaises( + exception.NoEngineContextEstablished, + getattr, context, 'transaction_ctx' + ) + + self.assertRegexpMatches( + exc.args[0], + "No TransactionContext is established for " + "this .*RequestContext.* object within the current " + "thread." + ) + + def test_trans_ctx_context_getattr(self): + context = oslo_context.RequestContext() + self.assertIsNone(getattr(context, 'transaction_ctx', None)) + + def test_multiple_factories(self): + """Test that the instrumentation applied to a context class is + + independent of a specific _TransactionContextManager + / _TransactionFactory. + + """ + mgr1 = enginefacade.transaction_context() + mgr1.configure( + connection=self.engine_uri, + slave_connection=self.slave_uri + ) + mgr2 = enginefacade.transaction_context() + mgr2.configure( + connection=self.engine_uri, + slave_connection=self.slave_uri + ) + + context = oslo_context.RequestContext() + + self.assertRaises( + exception.NoEngineContextEstablished, + getattr, context, 'session' + ) + with mgr1.writer.using(context): + self.assertIs(context.transaction_ctx.factory, mgr1._factory) + self.assertIsNot(context.transaction_ctx.factory, mgr2._factory) + self.assertIsNotNone(context.session) + + self.assertRaises( + exception.NoEngineContextEstablished, + getattr, context, 'session' + ) + with mgr2.writer.using(context): + self.assertIsNot(context.transaction_ctx.factory, mgr1._factory) + self.assertIs(context.transaction_ctx.factory, mgr2._factory) + self.assertIsNotNone(context.session) + + def test_multiple_factories_nested(self): + """Test that the instrumentation applied to a context class supports + + nested calls among multiple _TransactionContextManager objects. + + """ + mgr1 = enginefacade.transaction_context() + mgr1.configure( + connection=self.engine_uri, + slave_connection=self.slave_uri + ) + mgr2 = enginefacade.transaction_context() + mgr2.configure( + connection=self.engine_uri, + slave_connection=self.slave_uri + ) + + context = oslo_context.RequestContext() + + with mgr1.writer.using(context): + self.assertIs(context.transaction_ctx.factory, mgr1._factory) + self.assertIsNot(context.transaction_ctx.factory, mgr2._factory) + + with mgr2.writer.using(context): + self.assertIsNot( + context.transaction_ctx.factory, mgr1._factory) + self.assertIs(context.transaction_ctx.factory, mgr2._factory) + self.assertIsNotNone(context.session) + + # mgr1 is restored + self.assertIs(context.transaction_ctx.factory, mgr1._factory) + self.assertIsNot(context.transaction_ctx.factory, mgr2._factory) + self.assertIsNotNone(context.session) + + self.assertRaises( + exception.NoEngineContextEstablished, + getattr, context, 'transaction_ctx' + ) + + +class SynchronousReaderWSlaveMockFacadeTest(MockFacadeTest): + synchronous_reader = True + + engine_uri = 'some_connection' + slave_uri = 'some_slave_connection' + + +class AsyncReaderWSlaveMockFacadeTest(MockFacadeTest): + synchronous_reader = False + + engine_uri = 'some_connection' + slave_uri = 'some_slave_connection' + + +class LegacyIntegrationtest(test_base.DbTestCase): + + def test_legacy_integration(self): + legacy_facade = enginefacade.get_legacy_facade() + self.assertTrue( + legacy_facade.get_engine() is + enginefacade._context_manager._factory._writer_engine + ) + + self.assertTrue( + enginefacade.get_legacy_facade() is legacy_facade + ) + + +class ThreadingTest(test_base.DbTestCase): + """Test copying on new threads using real connections and sessions.""" + + def _assert_ctx_connection(self, context, connection): + self.assertIs(context.connection, connection) + + def _assert_ctx_session(self, context, session): + self.assertIs(context.session, session) + + def _patch_thread_ident(self): + self.ident = 1 + + test_instance = self + + class MockThreadingLocal(object): + def __init__(self): + self.__dict__['state'] = collections.defaultdict(dict) + + def __deepcopy__(self, memo): + return self + + def __getattr__(self, key): + ns = self.state[test_instance.ident] + try: + return ns[key] + except KeyError: + raise AttributeError(key) + + def __setattr__(self, key, value): + ns = self.state[test_instance.ident] + ns[key] = value + + def __delattr__(self, key): + ns = self.state[test_instance.ident] + try: + del ns[key] + except KeyError: + raise AttributeError(key) + + return mock.patch.object( + enginefacade, "_TransactionContextTLocal", MockThreadingLocal) + + def test_thread_ctxmanager_writer(self): + context = oslo_context.RequestContext() + + with self._patch_thread_ident(): + with enginefacade.writer.using(context) as session: + self._assert_ctx_session(context, session) + + self.ident = 2 + + with enginefacade.reader.using(context) as sess2: + # new session + self.assertIsNot(sess2, session) + + # thread local shows the new session + self._assert_ctx_session(context, sess2) + + self.ident = 1 + + with enginefacade.reader.using(context) as sess3: + self.assertIs(sess3, session) + self._assert_ctx_session(context, session) + + def test_thread_ctxmanager_writer_connection(self): + context = oslo_context.RequestContext() + + with self._patch_thread_ident(): + with enginefacade.writer.connection.using(context) as conn: + self._assert_ctx_connection(context, conn) + + self.ident = 2 + + with enginefacade.reader.connection.using(context) as conn2: + # new connection + self.assertIsNot(conn2, conn) + + # thread local shows the new connection + self._assert_ctx_connection(context, conn2) + + with enginefacade.reader.connection.using( + context) as conn3: + # we still get the right connection even though + # this context is not the "copied" context + self.assertIsNot(conn3, conn) + self.assertIs(conn3, conn2) + + self.ident = 1 + + with enginefacade.reader.connection.using(context) as conn3: + self.assertIs(conn3, conn) + self._assert_ctx_connection(context, conn) + + def test_thread_ctxmanager_switch_styles(self): + + @enginefacade.writer.connection + def go_one(context): + self.assertIsNone(context.session) + self.assertIsNotNone(context.connection) + + self.ident = 2 + go_two(context) + + self.ident = 1 + self.assertIsNone(context.session) + self.assertIsNotNone(context.connection) + + @enginefacade.reader + def go_two(context): + self.assertIsNone(context.connection) + self.assertIsNotNone(context.session) + + context = oslo_context.RequestContext() + with self._patch_thread_ident(): + go_one(context) + + def test_thread_decorator_writer(self): + sessions = set() + + @enginefacade.writer + def go_one(context): + sessions.add(context.session) + + self.ident = 2 + go_two(context) + + self.ident = 1 + + go_three(context) + + @enginefacade.reader + def go_two(context): + assert context.session not in sessions + + @enginefacade.reader + def go_three(context): + assert context.session in sessions + + context = oslo_context.RequestContext() + with self._patch_thread_ident(): + go_one(context) + + def test_thread_decorator_writer_connection(self): + connections = set() + + @enginefacade.writer.connection + def go_one(context): + connections.add(context.connection) + + self.ident = 2 + go_two(context) + + self.ident = 1 + + go_three(context) + + @enginefacade.reader.connection + def go_two(context): + assert context.connection not in connections + + @enginefacade.reader + def go_three(context): + assert context.connection in connections + + context = oslo_context.RequestContext() + with self._patch_thread_ident(): + go_one(context) + + +class LiveFacadeTest(test_base.DbTestCase): + """test using live SQL with test-provisioned databases. + + Several of these tests require that multiple transactions run + simultaenously; as the default SQLite :memory: connection can't achieve + this, opportunistic test implementations against MySQL and PostgreSQL are + supplied. + + """ + + def setUp(self): + super(LiveFacadeTest, self).setUp() + + metadata = MetaData() + user_table = Table( + 'user', metadata, + Column('id', Integer, primary_key=True), + Column('name', String(30)), + mysql_engine='InnoDB' + ) + self.user_table = user_table + metadata.create_all(self.engine) + self.addCleanup(metadata.drop_all, self.engine) + + class User(object): + def __init__(self, name): + self.name = name + + mapper(User, user_table) + self.User = User + + def _assert_ctx_connection(self, context, connection): + self.assertIs(context.connection, connection) + + def _assert_ctx_session(self, context, session): + self.assertIs(context.session, session) + + def test_transaction_committed(self): + context = oslo_context.RequestContext() + + with enginefacade.writer.using(context) as session: + session.add(self.User(name="u1")) + + session = self.sessionmaker(autocommit=True) + self.assertEqual( + "u1", + session.query(self.User.name).scalar() + ) + + def test_transaction_rollback(self): + context = oslo_context.RequestContext() + + class MyException(Exception): + pass + + @enginefacade.writer + def go(context): + context.session.add(self.User(name="u1")) + context.session.flush() + raise MyException("a test") + + self.assertRaises(MyException, go, context) + + session = self.sessionmaker(autocommit=True) + self.assertEqual( + None, + session.query(self.User.name).scalar() + ) + + def test_context_deepcopy_on_session(self): + context = oslo_context.RequestContext() + with enginefacade.writer.using(context) as session: + + ctx2 = copy.deepcopy(context) + self._assert_ctx_session(ctx2, session) + + with enginefacade.writer.using(ctx2) as s2: + self.assertIs(session, s2) + self._assert_ctx_session(ctx2, s2) + + s2.add(self.User(name="u1")) + s2.flush() + + session = self.sessionmaker(autocommit=True) + self.assertEqual( + "u1", + session.query(self.User.name).scalar() + ) + + def test_context_deepcopy_on_connection(self): + context = oslo_context.RequestContext() + with enginefacade.writer.connection.using(context) as conn: + + ctx2 = copy.deepcopy(context) + self._assert_ctx_connection(ctx2, conn) + + with enginefacade.writer.connection.using(ctx2) as conn2: + self.assertIs(conn, conn2) + self._assert_ctx_connection(ctx2, conn2) + + conn2.execute(self.user_table.insert().values(name="u1")) + + self._assert_ctx_connection(ctx2, conn2) + + session = self.sessionmaker(autocommit=True) + self.assertEqual( + "u1", + session.query(self.User.name).scalar() + ) + + @test_base.backend_specific("postgresql", "mysql") + def test_external_session_transaction(self): + context = oslo_context.RequestContext() + with enginefacade.writer.using(context) as session: + session.add(self.User(name="u1")) + session.flush() + + with enginefacade.writer.independent.using(context) as s2: + # transaction() uses a new session + self.assertIsNot(s2, session) + self._assert_ctx_session(context, s2) + + # rows within a distinct transaction + s2.add(self.User(name="u2")) + + # it also takes over the global enginefacade + # within the context + with enginefacade.writer.using(context) as s3: + self.assertIs(s3, s2) + s3.add(self.User(name="u3")) + + self._assert_ctx_session(context, session) + + # rollback the "outer" transaction + session.rollback() + + # add more state on the "outer" transaction + session.begin() + session.add(self.User(name="u4")) + + session = self.sessionmaker(autocommit=True) + + # inner transction + second part of "outer" transaction were committed + self.assertEqual( + [("u2",), ("u3",), ("u4", )], + session.query( + self.User.name).order_by(self.User.name).all() + ) + + def test_savepoint_transaction_decorator(self): + context = oslo_context.RequestContext() + + @enginefacade.writer + def go1(context): + session = context.session + session.add(self.User(name="u1")) + session.flush() + + try: + go2(context) + except Exception: + pass + + go3(context) + + session.add(self.User(name="u4")) + + @enginefacade.writer.savepoint + def go2(context): + session = context.session + session.add(self.User(name="u2")) + raise Exception("nope") + + @enginefacade.writer.savepoint + def go3(context): + session = context.session + session.add(self.User(name="u3")) + + go1(context) + + session = self.sessionmaker(autocommit=True) + + # inner transction + second part of "outer" transaction were committed + self.assertEqual( + [("u1",), ("u3",), ("u4", )], + session.query( + self.User.name).order_by(self.User.name).all() + ) + + def test_savepoint_transaction(self): + context = oslo_context.RequestContext() + + with enginefacade.writer.using(context) as session: + session.add(self.User(name="u1")) + session.flush() + + try: + with enginefacade.writer.savepoint.using(context) as session: + session.add(self.User(name="u2")) + raise Exception("nope") + except Exception: + pass + + with enginefacade.writer.savepoint.using(context) as session: + session.add(self.User(name="u3")) + + session.add(self.User(name="u4")) + + session = self.sessionmaker(autocommit=True) + + # inner transction + second part of "outer" transaction were committed + self.assertEqual( + [("u1",), ("u3",), ("u4", )], + session.query( + self.User.name).order_by(self.User.name).all() + ) + + @test_base.backend_specific("postgresql", "mysql") + def test_external_session_transaction_decorator(self): + context = oslo_context.RequestContext() + + @enginefacade.writer + def go1(context): + session = context.session + session.add(self.User(name="u1")) + session.flush() + + go2(context, session) + + self._assert_ctx_session(context, session) + + # rollback the "outer" transaction + session.rollback() + + # add more state on the "outer" transaction + session.begin() + session.add(self.User(name="u4")) + + @enginefacade.writer.independent + def go2(context, session): + s2 = context.session + # uses a new session + self.assertIsNot(s2, session) + self._assert_ctx_session(context, s2) + + # rows within a distinct transaction + s2.add(self.User(name="u2")) + + # it also takes over the global enginefacade + # within the context + with enginefacade.writer.using(context) as s3: + self.assertIs(s3, s2) + s3.add(self.User(name="u3")) + + go1(context) + + session = self.sessionmaker(autocommit=True) + + # inner transction + second part of "outer" transaction were committed + self.assertEqual( + [("u2",), ("u3",), ("u4", )], + session.query( + self.User.name).order_by(self.User.name).all() + ) + + @test_base.backend_specific("postgresql", "mysql") + def test_external_connection_transaction(self): + context = oslo_context.RequestContext() + with enginefacade.writer.connection.using(context) as connection: + connection.execute(self.user_table.insert().values(name="u1")) + + # transaction() uses a new Connection + with enginefacade.writer.independent.connection.\ + using(context) as c2: + self.assertIsNot(c2, connection) + self._assert_ctx_connection(context, c2) + + # rows within a distinct transaction + c2.execute(self.user_table.insert().values(name="u2")) + + # it also takes over the global enginefacade + # within the context + with enginefacade.writer.connection.using(context) as c3: + self.assertIs(c2, c3) + c3.execute(self.user_table.insert().values(name="u3")) + self._assert_ctx_connection(context, connection) + + # rollback the "outer" transaction + transaction_ctx = context.transaction_ctx + transaction_ctx.transaction.rollback() + transaction_ctx.transaction = connection.begin() + + # add more state on the "outer" transaction + connection.execute(self.user_table.insert().values(name="u4")) + + session = self.sessionmaker(autocommit=True) + self.assertEqual( + [("u2",), ("u3",), ("u4", )], + session.query( + self.User.name).order_by(self.User.name).all() + ) + + @test_base.backend_specific("postgresql", "mysql") + def test_external_writer_in_reader(self): + context = oslo_context.RequestContext() + with enginefacade.reader.using(context) as session: + ping = session.scalar(select([1])) + self.assertEqual(1, ping) + + # we're definitely a reader + @enginefacade.writer + def go(ctx): + pass + exc = self.assertRaises(TypeError, go, context) + self.assertEqual( + "Can't upgrade a READER transaction to a " + "WRITER mid-transaction", + exc.args[0]) + + # but we can do a writer on a new transaction + with enginefacade.writer.independent.using(context) as sess2: + self.assertIsNot(sess2, session) + self._assert_ctx_session(context, sess2) + + session.add(self.User(name="u1_nocommit")) + sess2.add(self.User(name="u1_commit")) + + user = session.query(self.User).first() + self.assertEqual("u1_commit", user.name) + + session = self.sessionmaker(autocommit=True) + self.assertEqual( + session.query( + self.User.name).order_by(self.User.name).all(), + [("u1_commit",)] + ) + + def test_replace_scope(self): + # "timeout" is an argument accepted by + # the pysqlite dialect, which we set here to ensure + # that even in an all-sqlite test, we test that the URL + # is different in the context we are looking for + alt_connection = "sqlite:///?timeout=90" + + alt_mgr1 = enginefacade.transaction_context() + alt_mgr1.configure( + connection=alt_connection, + ) + + @enginefacade.writer + def go1(context): + s1 = context.session + self.assertEqual( + s1.bind.url, + enginefacade._context_manager._factory._writer_engine.url) + self.assertIs( + s1.bind, + enginefacade._context_manager._factory._writer_engine) + self.assertEqual(s1.bind.url, self.engine.url) + + with alt_mgr1.replace.using(context): + go2(context) + + go4(context) + + @enginefacade.writer + def go2(context): + s2 = context.session + + # factory is not replaced globally... + self.assertIsNot( + enginefacade._context_manager._factory._writer_engine, + alt_mgr1._factory._writer_engine + ) + + # but it is replaced for us + self.assertIs(s2.bind, alt_mgr1._factory._writer_engine) + self.assertEqual( + str(s2.bind.url), alt_connection) + + go3(context) + + @enginefacade.reader + def go3(context): + s3 = context.session + + # in a call of a call, we still have the alt URL + self.assertIs(s3.bind, alt_mgr1._factory._writer_engine) + self.assertEqual( + str(s3.bind.url), alt_connection) + + @enginefacade.writer + def go4(context): + s4 = context.session + + # outside the "replace" context, all is back to normal + self.assertIs(s4.bind, self.engine) + self.assertEqual( + s4.bind.url, self.engine.url) + + context = oslo_context.RequestContext() + go1(context) + self.assertIsNot( + enginefacade._context_manager._factory._writer_engine, + alt_mgr1._factory._writer_engine + ) + + def test_replace_scope_only_global_eng(self): + # "timeout" is an argument accepted by + # the pysqlite dialect, which we set here to ensure + # that even in an all-sqlite test, we test that the URL + # is different in the context we are looking for + alt_connection1 = "sqlite:///?timeout=90" + + alt_mgr1 = enginefacade.transaction_context() + alt_mgr1.configure( + connection=alt_connection1, + ) + + alt_connection2 = "sqlite:///?timeout=120" + + alt_mgr2 = enginefacade.transaction_context() + alt_mgr2.configure( + connection=alt_connection2, + ) + + @enginefacade.writer + def go1(context): + s1 = context.session + # global engine + self.assertEqual(s1.bind.url, self.engine.url) + + # now replace global engine... + with alt_mgr1.replace.using(context): + go2(context) + + # and back + go6(context) + + @enginefacade.writer + def go2(context): + s2 = context.session + + # we have the replace-the-global engine + self.assertEqual(str(s2.bind.url), alt_connection1) + self.assertIs(s2.bind, alt_mgr1._factory._writer_engine) + + go3(context) + + @alt_mgr2.writer + def go3(context): + s3 = context.session + + # we don't use the global engine in the first place. + # make sure our own factory still used. + self.assertEqual(str(s3.bind.url), alt_connection2) + self.assertIs(s3.bind, alt_mgr2._factory._writer_engine) + + go4(context) + + @enginefacade.writer + def go4(context): + s4 = context.session + + # we *do* use the global, so we still want the replacement. + self.assertEqual(str(s4.bind.url), alt_connection1) + self.assertIs(s4.bind, alt_mgr1._factory._writer_engine) + + @enginefacade.writer + def go5(context): + s5 = context.session + + # ...and here also + self.assertEqual(str(s5.bind.url), alt_connection1) + self.assertIs(s5.bind, alt_mgr1._factory._writer_engine) + + @enginefacade.writer + def go6(context): + s6 = context.session + + # ...but not here! + self.assertEqual(str(s6.bind.url), str(self.engine.url)) + self.assertIs(s6.bind, self.engine) + + context = oslo_context.RequestContext() + go1(context) + + +class MySQLLiveFacadeTest( + test_base.MySQLOpportunisticTestCase, LiveFacadeTest): + pass + + +class PGLiveFacadeTest( + test_base.PostgreSQLOpportunisticTestCase, LiveFacadeTest): + pass + + +class ConfigOptionsTest(oslo_test_base.BaseTestCase): + def test_all_options(self): + """test that everything in CONF.database.iteritems() is accepted. + + There's a handful of options in oslo.db.options that seem to have + no meaning, but need to be accepted. In particular, Cinder and + maybe others are doing exactly this call. + + """ + + factory = enginefacade._TransactionFactory() + cfg.CONF.register_opts(options.database_opts, 'database') + factory.configure(**dict(cfg.CONF.database.items())) + + def test_options_not_supported(self): + factory = enginefacade._TransactionFactory() + + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + + factory.configure(fake1='x', idle_timeout=200, wrong2='y') + + self.assertEqual(1, len(w)) + self.assertTrue( + issubclass(w[-1].category, exception.NotSupportedWarning)) + self.assertEqual( + "Configuration option(s) ['fake1', 'wrong2'] not supported", + str(w[-1].message) + ) + +# TODO(zzzeek): test configuration options, e.g. like +# test_sqlalchemy->test_creation_from_config diff --git a/oslo_db/tests/sqlalchemy/test_exc_filters.py b/oslo_db/tests/sqlalchemy/test_exc_filters.py index bafb59e..34313ce 100644 --- a/oslo_db/tests/sqlalchemy/test_exc_filters.py +++ b/oslo_db/tests/sqlalchemy/test_exc_filters.py @@ -23,8 +23,8 @@ from sqlalchemy import event from sqlalchemy.orm import mapper from oslo_db import exception +from oslo_db.sqlalchemy import engines from oslo_db.sqlalchemy import exc_filters -from oslo_db.sqlalchemy import session from oslo_db.sqlalchemy import test_base from oslo_db.tests import utils as test_utils @@ -784,7 +784,7 @@ class TestDBDisconnected(TestsExceptionFilter): dialect_name, exception, num_disconnects, is_disconnect=True): engine = self.engine - event.listen(engine, "engine_connect", session._connect_ping_listener) + event.listen(engine, "engine_connect", engines._connect_ping_listener) real_do_execute = engine.dialect.do_execute counter = itertools.count(1) @@ -895,7 +895,7 @@ class TestDBConnectRetry(TestsExceptionFilter): with self._dbapi_fixture(dialect_name): with mock.patch.object(engine.dialect, "connect", cant_connect): - return session._test_connection(engine, retries, .01) + return engines._test_connection(engine, retries, .01) def test_connect_no_retries(self): conn = self._run_test( @@ -967,7 +967,7 @@ class TestDBConnectPingWrapping(TestsExceptionFilter): def setUp(self): super(TestDBConnectPingWrapping, self).setUp() event.listen( - self.engine, "engine_connect", session._connect_ping_listener) + self.engine, "engine_connect", engines._connect_ping_listener) @contextlib.contextmanager def _fixture( diff --git a/oslo_db/tests/sqlalchemy/test_sqlalchemy.py b/oslo_db/tests/sqlalchemy/test_sqlalchemy.py index 516acb6..02b0af5 100644 --- a/oslo_db/tests/sqlalchemy/test_sqlalchemy.py +++ b/oslo_db/tests/sqlalchemy/test_sqlalchemy.py @@ -31,6 +31,7 @@ from sqlalchemy.ext.declarative import declarative_base from oslo_db import exception from oslo_db import options as db_options +from oslo_db.sqlalchemy import engines from oslo_db.sqlalchemy import models from oslo_db.sqlalchemy import session from oslo_db.sqlalchemy import test_base @@ -312,8 +313,8 @@ class EngineFacadeTestCase(oslo_test.BaseTestCase): self.assertFalse(ses.autocommit) self.assertTrue(ses.expire_on_commit) - @mock.patch('oslo_db.sqlalchemy.session.get_maker') - @mock.patch('oslo_db.sqlalchemy.session.create_engine') + @mock.patch('oslo_db.sqlalchemy.orm.get_maker') + @mock.patch('oslo_db.sqlalchemy.engines.create_engine') def test_creation_from_config(self, create_engine, get_maker): conf = cfg.ConfigOpts() conf.register_opts(db_options.database_opts, group='database') @@ -495,7 +496,7 @@ class MysqlConnectTest(test_base.MySQLOpportunisticTestCase): ) ) ): - session._init_events.dispatch_on_drivername("mysql")(test_engine) + engines._init_events.dispatch_on_drivername("mysql")(test_engine) test_engine.raw_connection() self.assertIn('Unable to detect effective SQL mode', @@ -556,7 +557,7 @@ class CreateEngineTest(oslo_test.BaseTestCase): self.args = {'connect_args': {}} def test_queuepool_args(self): - session._init_connection_args( + engines._init_connection_args( url.make_url("mysql://u:p@host/test"), self.args, max_pool_size=10, max_overflow=10) self.assertEqual(self.args['pool_size'], 10) @@ -564,7 +565,7 @@ class CreateEngineTest(oslo_test.BaseTestCase): def test_sqlite_memory_pool_args(self): for _url in ("sqlite://", "sqlite:///:memory:"): - session._init_connection_args( + engines._init_connection_args( url.make_url(_url), self.args, max_pool_size=10, max_overflow=10) @@ -581,7 +582,7 @@ class CreateEngineTest(oslo_test.BaseTestCase): self.assertTrue('poolclass' in self.args) def test_sqlite_file_pool_args(self): - session._init_connection_args( + engines._init_connection_args( url.make_url("sqlite:///somefile.db"), self.args, max_pool_size=10, max_overflow=10) @@ -597,37 +598,37 @@ class CreateEngineTest(oslo_test.BaseTestCase): self.assertTrue('poolclass' not in self.args) def test_mysql_connect_args_default(self): - session._init_connection_args( + engines._init_connection_args( url.make_url("mysql://u:p@host/test"), self.args) self.assertEqual(self.args['connect_args'], {'charset': 'utf8', 'use_unicode': 0}) def test_mysql_oursql_connect_args_default(self): - session._init_connection_args( + engines._init_connection_args( url.make_url("mysql+oursql://u:p@host/test"), self.args) self.assertEqual(self.args['connect_args'], {'charset': 'utf8', 'use_unicode': 0}) def test_mysql_mysqldb_connect_args_default(self): - session._init_connection_args( + engines._init_connection_args( url.make_url("mysql+mysqldb://u:p@host/test"), self.args) self.assertEqual(self.args['connect_args'], {'charset': 'utf8', 'use_unicode': 0}) def test_postgresql_connect_args_default(self): - session._init_connection_args( + engines._init_connection_args( url.make_url("postgresql://u:p@host/test"), self.args) self.assertEqual(self.args['client_encoding'], 'utf8') self.assertFalse(self.args['connect_args']) def test_mysqlconnector_raise_on_warnings_default(self): - session._init_connection_args( + engines._init_connection_args( url.make_url("mysql+mysqlconnector://u:p@host/test"), self.args) self.assertEqual(self.args['connect_args']['raise_on_warnings'], False) def test_mysqlconnector_raise_on_warnings_override(self): - session._init_connection_args( + engines._init_connection_args( url.make_url( "mysql+mysqlconnector://u:p@host/test" "?raise_on_warnings=true"), @@ -639,12 +640,12 @@ class CreateEngineTest(oslo_test.BaseTestCase): def test_thread_checkin(self): with mock.patch("sqlalchemy.event.listens_for"): with mock.patch("sqlalchemy.event.listen") as listen_evt: - session._init_events.dispatch_on_drivername( + engines._init_events.dispatch_on_drivername( "sqlite")(mock.Mock()) self.assertEqual( listen_evt.mock_calls[0][1][-1], - session._thread_yield + engines._thread_yield ) @@ -693,7 +694,7 @@ class PatchStacktraceTest(test_base.DbTestCase): with mock.patch("traceback.extract_stack", side_effect=extract_stack): - session._add_trace_comments(engine) + engines._add_trace_comments(engine) conn = engine.connect() orig_do_exec = engine.dialect.do_execute with mock.patch.object(engine.dialect, "do_execute") as mock_exec: |