summaryrefslogtreecommitdiff
path: root/oslo_db/sqlalchemy/session.py
diff options
context:
space:
mode:
Diffstat (limited to 'oslo_db/sqlalchemy/session.py')
-rw-r--r--oslo_db/sqlalchemy/session.py874
1 files changed, 82 insertions, 792 deletions
diff --git a/oslo_db/sqlalchemy/session.py b/oslo_db/sqlalchemy/session.py
index 0dae851..9155176 100644
--- a/oslo_db/sqlalchemy/session.py
+++ b/oslo_db/sqlalchemy/session.py
@@ -18,96 +18,71 @@
Recommended ways to use sessions within this framework:
-* Don't use them explicitly; this is like running with ``AUTOCOMMIT=1``.
- `model_query()` will implicitly use a session when called without one
- supplied. This is the ideal situation because it will allow queries
- to be automatically retried if the database connection is interrupted.
-
- .. note:: Automatic retry will be enabled in a future patch.
-
- It is generally fine to issue several queries in a row like this. Even though
- they may be run in separate transactions and/or separate sessions, each one
- will see the data from the prior calls. If needed, undo- or rollback-like
- functionality should be handled at a logical level. For an example, look at
- the code around quotas and `reservation_rollback()`.
-
- Examples:
+* Use the ``enginefacade`` system for connectivity, session and
+ transaction management:
.. code-block:: python
+ from oslo.db.sqlalchemy import enginefacade
+
+ @enginefacade.reader
def get_foo(context, foo):
- return (model_query(context, models.Foo).
+ return (model_query(models.Foo, context.session).
filter_by(foo=foo).
first())
+ @enginefacade.writer
def update_foo(context, id, newfoo):
- (model_query(context, models.Foo).
+ (model_query(models.Foo, context.session).
filter_by(id=id).
update({'foo': newfoo}))
+ @enginefacade.writer
def create_foo(context, values):
foo_ref = models.Foo()
foo_ref.update(values)
- foo_ref.save()
+ foo_ref.save(context.session)
return foo_ref
-
-* Within the scope of a single method, keep all the reads and writes within
- the context managed by a single session. In this way, the session's
- `__exit__` handler will take care of calling `flush()` and `commit()` for
- you. If using this approach, you should not explicitly call `flush()` or
- `commit()`. Any error within the context of the session will cause the
- session to emit a `ROLLBACK`. Database errors like `IntegrityError` will be
- raised in `session`'s `__exit__` handler, and any try/except within the
- context managed by `session` will not be triggered. And catching other
- non-database errors in the session will not trigger the ROLLBACK, so
- exception handlers should always be outside the session, unless the
- developer wants to do a partial commit on purpose. If the connection is
- dropped before this is possible, the database will implicitly roll back the
- transaction.
+ In the above system, transactions are committed automatically, and
+ are shared among all dependent database methods. Ensure
+ that methods which "write" data are enclosed within @writer blocks.
.. note:: Statements in the session scope will not be automatically retried.
- If you create models within the session, they need to be added, but you
+* If you create models within the session, they need to be added, but you
do not need to call `model.save()`:
.. code-block:: python
+ @enginefacade.writer
def create_many_foo(context, foos):
- session = sessionmaker()
- with session.begin():
- for foo in foos:
- foo_ref = models.Foo()
- foo_ref.update(foo)
- session.add(foo_ref)
+ for foo in foos:
+ foo_ref = models.Foo()
+ foo_ref.update(foo)
+ context.session.add(foo_ref)
+ @enginefacade.writer
def update_bar(context, foo_id, newbar):
- session = sessionmaker()
- with session.begin():
- foo_ref = (model_query(context, models.Foo, session).
- filter_by(id=foo_id).
- first())
- (model_query(context, models.Bar, session).
- filter_by(id=foo_ref['bar_id']).
- update({'bar': newbar}))
-
- .. note:: `update_bar` is a trivially simple example of using
- ``with session.begin``. Whereas `create_many_foo` is a good example of
- when a transaction is needed, it is always best to use as few queries as
- possible.
-
- The two queries in `update_bar` can be better expressed using a single query
- which avoids the need for an explicit transaction. It can be expressed like
- so:
+ foo_ref = (model_query(models.Foo, context.session).
+ filter_by(id=foo_id).
+ first())
+ (model_query(models.Bar, context.session).
+ filter_by(id=foo_ref['bar_id']).
+ update({'bar': newbar}))
+
+ The two queries in `update_bar` can alternatively be expressed using
+ a single query, which may be more efficient depending on scenario:
.. code-block:: python
+ @enginefacade.writer
def update_bar(context, foo_id, newbar):
- subq = (model_query(context, models.Foo.id).
+ subq = (model_query(models.Foo.id, context.session).
filter_by(id=foo_id).
limit(1).
subquery())
- (model_query(context, models.Bar).
+ (model_query(models.Bar, context.session).
filter_by(id=subq.as_scalar()).
update({'bar': newbar}))
@@ -119,87 +94,54 @@ Recommended ways to use sessions within this framework:
WHERE id=(SELECT bar_id FROM foo WHERE id = ${foo_id} LIMIT 1);
.. note:: `create_duplicate_foo` is a trivially simple example of catching an
- exception while using ``with session.begin``. Here create two duplicate
+ exception while using a savepoint. Here we create two duplicate
instances with same primary key, must catch the exception out of context
managed by a single session:
.. code-block:: python
+ @enginefacade.writer
def create_duplicate_foo(context):
foo1 = models.Foo()
foo2 = models.Foo()
foo1.id = foo2.id = 1
- session = sessionmaker()
try:
- with session.begin():
+ with context.session.begin_nested():
session.add(foo1)
session.add(foo2)
except exception.DBDuplicateEntry as e:
handle_error(e)
-* Passing an active session between methods. Sessions should only be passed
- to private methods. The private method must use a subtransaction; otherwise
- SQLAlchemy will throw an error when you call `session.begin()` on an existing
- transaction. Public methods should not accept a session parameter and should
- not be involved in sessions within the caller's scope.
-
- Note that this incurs more overhead in SQLAlchemy than the above means
- due to nesting transactions, and it is not possible to implicitly retry
- failed database operations when using this approach.
-
- This also makes code somewhat more difficult to read and debug, because a
- single database transaction spans more than one method. Error handling
- becomes less clear in this situation. When this is needed for code clarity,
- it should be clearly documented.
+* The enginefacade system eliminates the need to decide when sessions need
+ to be passed between methods. All methods should instead share a common
+ context object; the enginefacade system will maintain the transaction
+ across method calls.
.. code-block:: python
- def myfunc(foo):
- session = sessionmaker()
- with session.begin():
- # do some database things
- bar = _private_func(foo, session)
+ @enginefacade.writer
+ def myfunc(context, foo):
+ # do some database things
+ bar = _private_func(context, foo)
return bar
- def _private_func(foo, session=None):
- if not session:
- session = sessionmaker()
- with session.begin(subtransaction=True):
+ def _private_func(context, foo):
+ with enginefacade.using_writer(context) as session:
# do some other database things
+ session.add(SomeObject())
return bar
-There are some things which it is best to avoid:
-
-* Don't keep a transaction open any longer than necessary.
-
- This means that your ``with session.begin()`` block should be as short
- as possible, while still containing all the related calls for that
- transaction.
-
* Avoid ``with_lockmode('UPDATE')`` when possible.
- In MySQL/InnoDB, when a ``SELECT ... FOR UPDATE`` query does not match
- any rows, it will take a gap-lock. This is a form of write-lock on the
- "gap" where no rows exist, and prevents any other writes to that space.
- This can effectively prevent any INSERT into a table by locking the gap
- at the end of the index. Similar problems will occur if the SELECT FOR UPDATE
- has an overly broad WHERE clause, or doesn't properly use an index.
-
- One idea proposed at ODS Fall '12 was to use a normal SELECT to test the
- number of rows matching a query, and if only one row is returned,
- then issue the SELECT FOR UPDATE.
-
- The better long-term solution is to use
- ``INSERT .. ON DUPLICATE KEY UPDATE``.
- However, this can not be done until the "deleted" columns are removed and
- proper UNIQUE constraints are added to the tables.
-
+ FOR UPDATE is not compatible with MySQL/Galera. Instead, an "opportunistic"
+ approach should be used, such that if an UPDATE fails, the entire
+ transaction should be retried. The @wrap_db_retry decorator is one
+ such system that can be used to achieve this.
Enabling soft deletes:
-* To use/enable soft-deletes, the `SoftDeleteMixin` must be added
- to your model class. For example:
+* To use/enable soft-deletes, `SoftDeleteMixin` may be used. For example:
.. code-block:: python
@@ -209,698 +151,46 @@ Enabling soft deletes:
Efficient use of soft deletes:
-* There are two possible ways to mark a record as deleted:
- `model.soft_delete()` and `query.soft_delete()`.
-
- The `model.soft_delete()` method works with a single already-fetched entry.
- `query.soft_delete()` makes only one db request for all entries that
- correspond to the query.
-
-* In almost all cases you should use `query.soft_delete()`. Some examples:
+* While there is a ``model.soft_delete()`` method, prefer
+ ``query.soft_delete()``. Some examples:
.. code-block:: python
- def soft_delete_bar():
- count = model_query(BarModel).find(some_condition).soft_delete()
+ @enginefacade.writer
+ def soft_delete_bar(context):
+ # synchronize_session=False will prevent the ORM from attempting
+ # to search the Session for instances matching the DELETE;
+ # this is typically not necessary for small operations.
+ count = model_query(BarModel, context.session).\\
+ find(some_condition).soft_delete(synchronize_session=False)
if count == 0:
raise Exception("0 entries were soft deleted")
- def complex_soft_delete_with_synchronization_bar(session=None):
- if session is None:
- session = sessionmaker()
- with session.begin(subtransactions=True):
- count = (model_query(BarModel).
- find(some_condition).
- soft_delete(synchronize_session=True))
- # Here synchronize_session is required, because we
- # don't know what is going on in outer session.
- if count == 0:
- raise Exception("0 entries were soft deleted")
-
-* There is only one situation where `model.soft_delete()` is appropriate: when
- you fetch a single record, work with it, and mark it as deleted in the same
- transaction.
-
- .. code-block:: python
-
- def soft_delete_bar_model():
- session = sessionmaker()
- with session.begin():
- bar_ref = model_query(BarModel).find(some_condition).first()
- # Work with bar_ref
- bar_ref.soft_delete(session=session)
-
- However, if you need to work with all entries that correspond to query and
- then soft delete them you should use the `query.soft_delete()` method:
+ @enginefacade.writer
+ def complex_soft_delete_with_synchronization_bar(context):
+ # use synchronize_session='evaluate' when you'd like to attempt
+ # to update the state of the Session to match that of the DELETE.
+ # This is potentially helpful if the operation is complex and
+ # continues to work with instances that were loaded, though
+ # not usually needed.
+ count = (model_query(BarModel, context.session).
+ find(some_condition).
+ soft_delete(synchronize_session='evaulate'))
+ if count == 0:
+ raise Exception("0 entries were soft deleted")
- .. code-block:: python
- def soft_delete_multi_models():
- session = sessionmaker()
- with session.begin():
- query = (model_query(BarModel, session=session).
- find(some_condition))
- model_refs = query.all()
- # Work with model_refs
- query.soft_delete(synchronize_session=False)
- # synchronize_session=False should be set if there is no outer
- # session and these entries are not used after this.
-
- When working with many rows, it is very important to use query.soft_delete,
- which issues a single query. Using `model.soft_delete()`, as in the following
- example, is very inefficient.
+"""
- .. code-block:: python
+from oslo_db.sqlalchemy import enginefacade
+from oslo_db.sqlalchemy import engines
+from oslo_db.sqlalchemy import orm
- for bar_ref in bar_refs:
- bar_ref.soft_delete(session=session)
- # This will produce count(bar_refs) db requests.
+EngineFacade = enginefacade.LegacyEngineFacade
+create_engine = engines.create_engine
+get_maker = orm.get_maker
+Query = orm.Query
+Session = orm.Session
-"""
-import itertools
-import logging
-import os
-import re
-import time
-
-from oslo_utils import timeutils
-import six
-from sqlalchemy import event
-from sqlalchemy import exc
-import sqlalchemy.orm
-from sqlalchemy import pool
-from sqlalchemy.sql.expression import literal_column
-from sqlalchemy.sql.expression import select
-
-from oslo_db._i18n import _LW
-from oslo_db import exception
-from oslo_db import options
-from oslo_db.sqlalchemy import exc_filters
-from oslo_db.sqlalchemy import update_match
-from oslo_db.sqlalchemy import utils
-
-LOG = logging.getLogger(__name__)
-
-
-def _thread_yield(dbapi_con, con_record):
- """Ensure other greenthreads get a chance to be executed.
-
- If we use eventlet.monkey_patch(), eventlet.greenthread.sleep(0) will
- execute instead of time.sleep(0).
- Force a context switch. With common database backends (eg MySQLdb and
- sqlite), there is no implicit yield caused by network I/O since they are
- implemented by C libraries that eventlet cannot monkey patch.
- """
- time.sleep(0)
-
-
-def _connect_ping_listener(connection, branch):
- """Ping the server at connection startup.
-
- Ping the server at transaction begin and transparently reconnect
- if a disconnect exception occurs.
- """
- if branch:
- return
-
- # turn off "close with result". This can also be accomplished
- # by branching the connection, however just setting the flag is
- # more performant and also doesn't get involved with some
- # connection-invalidation awkardness that occurs (see
- # https://bitbucket.org/zzzeek/sqlalchemy/issue/3215/)
- save_should_close_with_result = connection.should_close_with_result
- connection.should_close_with_result = False
- try:
- # run a SELECT 1. use a core select() so that
- # any details like that needed by Oracle, DB2 etc. are handled.
- connection.scalar(select([1]))
- except exception.DBConnectionError:
- # catch DBConnectionError, which is raised by the filter
- # system.
- # disconnect detected. The connection is now
- # "invalid", but the pool should be ready to return
- # new connections assuming they are good now.
- # run the select again to re-validate the Connection.
- connection.scalar(select([1]))
- finally:
- connection.should_close_with_result = save_should_close_with_result
-
-
-def _setup_logging(connection_debug=0):
- """setup_logging function maps SQL debug level to Python log level.
-
- Connection_debug is a verbosity of SQL debugging information.
- 0=None(default value),
- 1=Processed only messages with WARNING level or higher
- 50=Processed only messages with INFO level or higher
- 100=Processed only messages with DEBUG level
- """
- if connection_debug >= 0:
- logger = logging.getLogger('sqlalchemy.engine')
- if connection_debug >= 100:
- logger.setLevel(logging.DEBUG)
- elif connection_debug >= 50:
- logger.setLevel(logging.INFO)
- else:
- logger.setLevel(logging.WARNING)
-
-
-def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
- idle_timeout=3600,
- connection_debug=0, max_pool_size=None, max_overflow=None,
- pool_timeout=None, sqlite_synchronous=True,
- connection_trace=False, max_retries=10, retry_interval=10,
- thread_checkin=True, logging_name=None):
- """Return a new SQLAlchemy engine."""
-
- url = sqlalchemy.engine.url.make_url(sql_connection)
-
- engine_args = {
- "pool_recycle": idle_timeout,
- 'convert_unicode': True,
- 'connect_args': {},
- 'logging_name': logging_name
- }
-
- _setup_logging(connection_debug)
-
- _init_connection_args(
- url, engine_args,
- sqlite_fk=sqlite_fk,
- max_pool_size=max_pool_size,
- max_overflow=max_overflow,
- pool_timeout=pool_timeout
- )
-
- engine = sqlalchemy.create_engine(url, **engine_args)
-
- _init_events(
- engine,
- mysql_sql_mode=mysql_sql_mode,
- sqlite_synchronous=sqlite_synchronous,
- sqlite_fk=sqlite_fk,
- thread_checkin=thread_checkin,
- connection_trace=connection_trace
- )
-
- # register alternate exception handler
- exc_filters.register_engine(engine)
-
- # register engine connect handler
- event.listen(engine, "engine_connect", _connect_ping_listener)
-
- # initial connect + test
- # NOTE(viktors): the current implementation of _test_connection()
- # does nothing, if max_retries == 0, so we can skip it
- if max_retries:
- test_conn = _test_connection(engine, max_retries, retry_interval)
- test_conn.close()
-
- return engine
-
-
-@utils.dispatch_for_dialect('*', multiple=True)
-def _init_connection_args(
- url, engine_args,
- max_pool_size=None, max_overflow=None, pool_timeout=None, **kw):
-
- pool_class = url.get_dialect().get_pool_class(url)
- if issubclass(pool_class, pool.QueuePool):
- if max_pool_size is not None:
- engine_args['pool_size'] = max_pool_size
- if max_overflow is not None:
- engine_args['max_overflow'] = max_overflow
- if pool_timeout is not None:
- engine_args['pool_timeout'] = pool_timeout
-
-
-@_init_connection_args.dispatch_for("sqlite")
-def _init_connection_args(url, engine_args, **kw):
- pool_class = url.get_dialect().get_pool_class(url)
- # singletonthreadpool is used for :memory: connections;
- # replace it with StaticPool.
- if issubclass(pool_class, pool.SingletonThreadPool):
- engine_args["poolclass"] = pool.StaticPool
- engine_args['connect_args']['check_same_thread'] = False
-
-
-@_init_connection_args.dispatch_for("postgresql")
-def _init_connection_args(url, engine_args, **kw):
- if 'client_encoding' not in url.query:
- # Set encoding using engine_args instead of connect_args since
- # it's supported for PostgreSQL 8.*. More details at:
- # http://docs.sqlalchemy.org/en/rel_0_9/dialects/postgresql.html
- engine_args['client_encoding'] = 'utf8'
-
-
-@_init_connection_args.dispatch_for("mysql")
-def _init_connection_args(url, engine_args, **kw):
- if 'charset' not in url.query:
- engine_args['connect_args']['charset'] = 'utf8'
-
-
-@_init_connection_args.dispatch_for("mysql+mysqlconnector")
-def _init_connection_args(url, engine_args, **kw):
- # mysqlconnector engine (<1.0) incorrectly defaults to
- # raise_on_warnings=True
- # https://bitbucket.org/zzzeek/sqlalchemy/issue/2515
- if 'raise_on_warnings' not in url.query:
- engine_args['connect_args']['raise_on_warnings'] = False
-
-
-@_init_connection_args.dispatch_for("mysql+mysqldb")
-@_init_connection_args.dispatch_for("mysql+oursql")
-def _init_connection_args(url, engine_args, **kw):
- # Those drivers require use_unicode=0 to avoid performance drop due
- # to internal usage of Python unicode objects in the driver
- # http://docs.sqlalchemy.org/en/rel_0_9/dialects/mysql.html
- if 'use_unicode' not in url.query:
- engine_args['connect_args']['use_unicode'] = 0
-
-
-@utils.dispatch_for_dialect('*', multiple=True)
-def _init_events(engine, thread_checkin=True, connection_trace=False, **kw):
- """Set up event listeners for all database backends."""
-
- _add_process_guards(engine)
-
- if connection_trace:
- _add_trace_comments(engine)
-
- if thread_checkin:
- sqlalchemy.event.listen(engine, 'checkin', _thread_yield)
-
-
-@_init_events.dispatch_for("mysql")
-def _init_events(engine, mysql_sql_mode=None, **kw):
- """Set up event listeners for MySQL."""
-
- if mysql_sql_mode is not None:
- @sqlalchemy.event.listens_for(engine, "connect")
- def _set_session_sql_mode(dbapi_con, connection_rec):
- cursor = dbapi_con.cursor()
- cursor.execute("SET SESSION sql_mode = %s", [mysql_sql_mode])
-
- @sqlalchemy.event.listens_for(engine, "first_connect")
- def _check_effective_sql_mode(dbapi_con, connection_rec):
- if mysql_sql_mode is not None:
- _set_session_sql_mode(dbapi_con, connection_rec)
-
- cursor = dbapi_con.cursor()
- cursor.execute("SHOW VARIABLES LIKE 'sql_mode'")
- realmode = cursor.fetchone()
-
- if realmode is None:
- LOG.warning(_LW('Unable to detect effective SQL mode'))
- else:
- realmode = realmode[1]
- LOG.debug('MySQL server mode set to %s', realmode)
- if 'TRADITIONAL' not in realmode.upper() and \
- 'STRICT_ALL_TABLES' not in realmode.upper():
- LOG.warning(
- _LW(
- "MySQL SQL mode is '%s', "
- "consider enabling TRADITIONAL or STRICT_ALL_TABLES"),
- realmode)
-
-
-@_init_events.dispatch_for("sqlite")
-def _init_events(engine, sqlite_synchronous=True, sqlite_fk=False, **kw):
- """Set up event listeners for SQLite.
-
- This includes several settings made on connections as they are
- created, as well as transactional control extensions.
-
- """
-
- def regexp(expr, item):
- reg = re.compile(expr)
- return reg.search(six.text_type(item)) is not None
-
- @sqlalchemy.event.listens_for(engine, "connect")
- def _sqlite_connect_events(dbapi_con, con_record):
-
- # Add REGEXP functionality on SQLite connections
- dbapi_con.create_function('regexp', 2, regexp)
-
- if not sqlite_synchronous:
- # Switch sqlite connections to non-synchronous mode
- dbapi_con.execute("PRAGMA synchronous = OFF")
-
- # Disable pysqlite's emitting of the BEGIN statement entirely.
- # Also stops it from emitting COMMIT before any DDL.
- # below, we emit BEGIN ourselves.
- # see http://docs.sqlalchemy.org/en/rel_0_9/dialects/\
- # sqlite.html#serializable-isolation-savepoints-transactional-ddl
- dbapi_con.isolation_level = None
-
- if sqlite_fk:
- # Ensures that the foreign key constraints are enforced in SQLite.
- dbapi_con.execute('pragma foreign_keys=ON')
-
- @sqlalchemy.event.listens_for(engine, "begin")
- def _sqlite_emit_begin(conn):
- # emit our own BEGIN, checking for existing
- # transactional state
- if 'in_transaction' not in conn.info:
- conn.execute("BEGIN")
- conn.info['in_transaction'] = True
-
- @sqlalchemy.event.listens_for(engine, "rollback")
- @sqlalchemy.event.listens_for(engine, "commit")
- def _sqlite_end_transaction(conn):
- # remove transactional marker
- conn.info.pop('in_transaction', None)
-
-
-def _test_connection(engine, max_retries, retry_interval):
- if max_retries == -1:
- attempts = itertools.count()
- else:
- attempts = six.moves.range(max_retries)
- # See: http://legacy.python.org/dev/peps/pep-3110/#semantic-changes for
- # why we are not using 'de' directly (it can be removed from the local
- # scope).
- de_ref = None
- for attempt in attempts:
- try:
- return engine.connect()
- except exception.DBConnectionError as de:
- msg = _LW('SQL connection failed. %s attempts left.')
- LOG.warning(msg, max_retries - attempt)
- time.sleep(retry_interval)
- de_ref = de
- else:
- if de_ref is not None:
- six.reraise(type(de_ref), de_ref)
-
-
-class Query(sqlalchemy.orm.query.Query):
- """Subclass of sqlalchemy.query with soft_delete() method."""
- def soft_delete(self, synchronize_session='evaluate'):
- return self.update({'deleted': literal_column('id'),
- 'updated_at': literal_column('updated_at'),
- 'deleted_at': timeutils.utcnow()},
- synchronize_session=synchronize_session)
-
- def update_returning_pk(self, values, surrogate_key):
- """Perform an UPDATE, returning the primary key of the matched row.
-
- This is a method-version of
- oslo_db.sqlalchemy.update_match.update_returning_pk(); see that
- function for usage details.
-
- """
- return update_match.update_returning_pk(self, values, surrogate_key)
-
- def update_on_match(self, specimen, surrogate_key, values, **kw):
- """Emit an UPDATE statement matching the given specimen.
-
- This is a method-version of
- oslo_db.sqlalchemy.update_match.update_on_match(); see that function
- for usage details.
-
- """
- return update_match.update_on_match(
- self, specimen, surrogate_key, values, **kw)
-
-
-class Session(sqlalchemy.orm.session.Session):
- """Custom Session class to avoid SqlAlchemy Session monkey patching."""
-
-
-def get_maker(engine, autocommit=True, expire_on_commit=False):
- """Return a SQLAlchemy sessionmaker using the given engine."""
- return sqlalchemy.orm.sessionmaker(bind=engine,
- class_=Session,
- autocommit=autocommit,
- expire_on_commit=expire_on_commit,
- query_cls=Query)
-
-
-def _add_process_guards(engine):
- """Add multiprocessing guards.
-
- Forces a connection to be reconnected if it is detected
- as having been shared to a sub-process.
-
- """
-
- @sqlalchemy.event.listens_for(engine, "connect")
- def connect(dbapi_connection, connection_record):
- connection_record.info['pid'] = os.getpid()
-
- @sqlalchemy.event.listens_for(engine, "checkout")
- def checkout(dbapi_connection, connection_record, connection_proxy):
- pid = os.getpid()
- if connection_record.info['pid'] != pid:
- LOG.debug(_LW(
- "Parent process %(orig)s forked (%(newproc)s) with an open "
- "database connection, "
- "which is being discarded and recreated."),
- {"newproc": pid, "orig": connection_record.info['pid']})
- connection_record.connection = connection_proxy.connection = None
- raise exc.DisconnectionError(
- "Connection record belongs to pid %s, "
- "attempting to check out in pid %s" %
- (connection_record.info['pid'], pid)
- )
-
-
-def _add_trace_comments(engine):
- """Add trace comments.
-
- Augment statements with a trace of the immediate calling code
- for a given statement.
- """
-
- import os
- import sys
- import traceback
- target_paths = set([
- os.path.dirname(sys.modules['oslo_db'].__file__),
- os.path.dirname(sys.modules['sqlalchemy'].__file__)
- ])
-
- @sqlalchemy.event.listens_for(engine, "before_cursor_execute", retval=True)
- def before_cursor_execute(conn, cursor, statement, parameters, context,
- executemany):
-
- # NOTE(zzzeek) - if different steps per DB dialect are desirable
- # here, switch out on engine.name for now.
- stack = traceback.extract_stack()
- our_line = None
- for idx, (filename, line, method, function) in enumerate(stack):
- for tgt in target_paths:
- if filename.startswith(tgt):
- our_line = idx
- break
- if our_line:
- break
-
- if our_line:
- trace = "; ".join(
- "File: %s (%s) %s" % (
- line[0], line[1], line[2]
- )
- # include three lines of context.
- for line in stack[our_line - 3:our_line]
-
- )
- statement = "%s -- %s" % (statement, trace)
-
- return statement, parameters
-
-
-class EngineFacade(object):
- """A helper class for removing of global engine instances from oslo.db.
-
- As a library, oslo.db can't decide where to store/when to create engine
- and sessionmaker instances, so this must be left for a target application.
-
- On the other hand, in order to simplify the adoption of oslo.db changes,
- we'll provide a helper class, which creates engine and sessionmaker
- on its instantiation and provides get_engine()/get_session() methods
- that are compatible with corresponding utility functions that currently
- exist in target projects, e.g. in Nova.
-
- engine/sessionmaker instances will still be global (and they are meant to
- be global), but they will be stored in the app context, rather that in the
- oslo.db context.
-
- Note: using of this helper is completely optional and you are encouraged to
- integrate engine/sessionmaker instances into your apps any way you like
- (e.g. one might want to bind a session to a request context). Two important
- things to remember:
-
- 1. An Engine instance is effectively a pool of DB connections, so it's
- meant to be shared (and it's thread-safe).
- 2. A Session instance is not meant to be shared and represents a DB
- transactional context (i.e. it's not thread-safe). sessionmaker is
- a factory of sessions.
-
- """
-
- def __init__(self, sql_connection, slave_connection=None,
- sqlite_fk=False, autocommit=True,
- expire_on_commit=False, **kwargs):
- """Initialize engine and sessionmaker instances.
-
- :param sql_connection: the connection string for the database to use
- :type sql_connection: string
-
- :param slave_connection: the connection string for the 'slave' database
- to use. If not provided, the master database
- will be used for all operations. Note: this
- is meant to be used for offloading of read
- operations to asynchronously replicated slaves
- to reduce the load on the master database.
- :type slave_connection: string
-
- :param sqlite_fk: enable foreign keys in SQLite
- :type sqlite_fk: bool
-
- :param autocommit: use autocommit mode for created Session instances
- :type autocommit: bool
-
- :param expire_on_commit: expire session objects on commit
- :type expire_on_commit: bool
-
- Keyword arguments:
-
- :keyword mysql_sql_mode: the SQL mode to be used for MySQL sessions.
- (defaults to TRADITIONAL)
- :keyword idle_timeout: timeout before idle sql connections are reaped
- (defaults to 3600)
- :keyword connection_debug: verbosity of SQL debugging information.
- -1=Off, 0=None, 100=Everything (defaults
- to 0)
- :keyword max_pool_size: maximum number of SQL connections to keep open
- in a pool (defaults to SQLAlchemy settings)
- :keyword max_overflow: if set, use this value for max_overflow with
- sqlalchemy (defaults to SQLAlchemy settings)
- :keyword pool_timeout: if set, use this value for pool_timeout with
- sqlalchemy (defaults to SQLAlchemy settings)
- :keyword sqlite_synchronous: if True, SQLite uses synchronous mode
- (defaults to True)
- :keyword connection_trace: add python stack traces to SQL as comment
- strings (defaults to False)
- :keyword max_retries: maximum db connection retries during startup.
- (setting -1 implies an infinite retry count)
- (defaults to 10)
- :keyword retry_interval: interval between retries of opening a sql
- connection (defaults to 10)
- :keyword thread_checkin: boolean that indicates that between each
- engine checkin event a sleep(0) will occur to
- allow other greenthreads to run (defaults to
- True)
- """
-
- super(EngineFacade, self).__init__()
-
- engine_kwargs = {
- 'sqlite_fk': sqlite_fk,
- 'mysql_sql_mode': kwargs.get('mysql_sql_mode', 'TRADITIONAL'),
- 'idle_timeout': kwargs.get('idle_timeout', 3600),
- 'connection_debug': kwargs.get('connection_debug', 0),
- 'max_pool_size': kwargs.get('max_pool_size'),
- 'max_overflow': kwargs.get('max_overflow'),
- 'pool_timeout': kwargs.get('pool_timeout'),
- 'sqlite_synchronous': kwargs.get('sqlite_synchronous', True),
- 'connection_trace': kwargs.get('connection_trace', False),
- 'max_retries': kwargs.get('max_retries', 10),
- 'retry_interval': kwargs.get('retry_interval', 10),
- 'thread_checkin': kwargs.get('thread_checkin', True)
- }
- maker_kwargs = {
- 'autocommit': autocommit,
- 'expire_on_commit': expire_on_commit
- }
-
- self._engine = create_engine(sql_connection=sql_connection,
- **engine_kwargs)
- self._session_maker = get_maker(engine=self._engine,
- **maker_kwargs)
- if slave_connection:
- self._slave_engine = create_engine(sql_connection=slave_connection,
- **engine_kwargs)
- self._slave_session_maker = get_maker(engine=self._slave_engine,
- **maker_kwargs)
- else:
- self._slave_engine = None
- self._slave_session_maker = None
-
- def get_engine(self, use_slave=False):
- """Get the engine instance (note, that it's shared).
-
- :param use_slave: if possible, use 'slave' database for this engine.
- If the connection string for the slave database
- wasn't provided, 'master' engine will be returned.
- (defaults to False)
- :type use_slave: bool
-
- """
-
- if use_slave and self._slave_engine:
- return self._slave_engine
-
- return self._engine
-
- def get_session(self, use_slave=False, **kwargs):
- """Get a Session instance.
-
- :param use_slave: if possible, use 'slave' database connection for
- this session. If the connection string for the
- slave database wasn't provided, a session bound
- to the 'master' engine will be returned.
- (defaults to False)
- :type use_slave: bool
-
- Keyword arugments will be passed to a sessionmaker instance as is (if
- passed, they will override the ones used when the sessionmaker instance
- was created). See SQLAlchemy Session docs for details.
-
- """
-
- if use_slave and self._slave_session_maker:
- return self._slave_session_maker(**kwargs)
-
- return self._session_maker(**kwargs)
-
- @classmethod
- def from_config(cls, conf,
- sqlite_fk=False, autocommit=True, expire_on_commit=False):
- """Initialize EngineFacade using oslo.config config instance options.
-
- :param conf: oslo.config config instance
- :type conf: oslo.config.cfg.ConfigOpts
-
- :param sqlite_fk: enable foreign keys in SQLite
- :type sqlite_fk: bool
-
- :param autocommit: use autocommit mode for created Session instances
- :type autocommit: bool
-
- :param expire_on_commit: expire session objects on commit
- :type expire_on_commit: bool
-
- """
-
- conf.register_opts(options.database_opts, 'database')
-
- return cls(sql_connection=conf.database.connection,
- slave_connection=conf.database.slave_connection,
- sqlite_fk=sqlite_fk,
- autocommit=autocommit,
- expire_on_commit=expire_on_commit,
- mysql_sql_mode=conf.database.mysql_sql_mode,
- idle_timeout=conf.database.idle_timeout,
- connection_debug=conf.database.connection_debug,
- max_pool_size=conf.database.max_pool_size,
- max_overflow=conf.database.max_overflow,
- pool_timeout=conf.database.pool_timeout,
- sqlite_synchronous=conf.database.sqlite_synchronous,
- connection_trace=conf.database.connection_trace,
- max_retries=conf.database.max_retries,
- retry_interval=conf.database.retry_interval)
+__all__ = ["EngineFacade", "create_engine", "get_maker", "Query", "Session"]