diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2015-02-17 14:28:50 -0500 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2015-03-12 12:45:31 -0400 |
commit | ebbf23d6490159f0c8f4f35a08fb4b80371cd080 (patch) | |
tree | 0d354c120bb7c7378fb694c7ffe454cac308086e /oslo_db | |
parent | ab20754db71e55b79b9e71e36ad86d9befc89a92 (diff) | |
download | oslo-db-ebbf23d6490159f0c8f4f35a08fb4b80371cd080.tar.gz |
Add process guards + invalidate to the connection pool
It's not safe for a database TCP connection to be
shared to a child process, as this is a file descriptor
which will maintain its state on both sides. Applications
such as Cinder which spin up multiprocessing subprocesses
at startup time are subject to race conditions as a result.
Instead of requiring that engines be explicitly prepared
within a child process, we can detect and accommodate
this situation in the connection pool itself, by tracking
the originating pid of a connection, and if it changes
on checkout, by invalidating.
Change-Id: If116f7b7140b3eba064d8147e5f637a05beb1cd8
Diffstat (limited to 'oslo_db')
-rw-r--r-- | oslo_db/sqlalchemy/session.py | 33 | ||||
-rw-r--r-- | oslo_db/tests/sqlalchemy/test_sqlalchemy.py | 28 |
2 files changed, 61 insertions, 0 deletions
diff --git a/oslo_db/sqlalchemy/session.py b/oslo_db/sqlalchemy/session.py index 7e33075..6a0355d 100644 --- a/oslo_db/sqlalchemy/session.py +++ b/oslo_db/sqlalchemy/session.py @@ -280,11 +280,13 @@ Efficient use of soft deletes: import itertools import logging +import os import re import time from oslo_utils import timeutils import six +from sqlalchemy import exc import sqlalchemy.orm from sqlalchemy import pool from sqlalchemy.sql.expression import literal_column @@ -476,6 +478,8 @@ def _init_connection_args(url, engine_args, **kw): def _init_events(engine, thread_checkin=True, connection_trace=False, **kw): """Set up event listeners for all database backends.""" + _add_process_guards(engine) + if connection_trace: _add_trace_comments(engine) @@ -609,6 +613,35 @@ def get_maker(engine, autocommit=True, expire_on_commit=False): query_cls=Query) +def _add_process_guards(engine): + """Add multiprocessing guards. + + Forces a connection to be reconnected if it is detected + as having been shared to a sub-process. + + """ + + @sqlalchemy.event.listens_for(engine, "connect") + def connect(dbapi_connection, connection_record): + connection_record.info['pid'] = os.getpid() + + @sqlalchemy.event.listens_for(engine, "checkout") + def checkout(dbapi_connection, connection_record, connection_proxy): + pid = os.getpid() + if connection_record.info['pid'] != pid: + LOG.debug(_LW( + "Parent process %(orig)s forked (%(newproc)s) with an open " + "database connection, " + "which is being discarded and recreated."), + {"newproc": pid, "orig": connection_record.info['pid']}) + connection_record.connection = connection_proxy.connection = None + raise exc.DisconnectionError( + "Connection record belongs to pid %s, " + "attempting to check out in pid %s" % + (connection_record.info['pid'], pid) + ) + + def _add_trace_comments(engine): """Add trace comments. diff --git a/oslo_db/tests/sqlalchemy/test_sqlalchemy.py b/oslo_db/tests/sqlalchemy/test_sqlalchemy.py index 24aeb22..433affe 100644 --- a/oslo_db/tests/sqlalchemy/test_sqlalchemy.py +++ b/oslo_db/tests/sqlalchemy/test_sqlalchemy.py @@ -648,6 +648,34 @@ class CreateEngineTest(oslo_test.BaseTestCase): ) +class ProcessGuardTest(test_base.DbTestCase): + def test_process_guard(self): + self.engine.dispose() + + def get_parent_pid(): + return 4 + + def get_child_pid(): + return 5 + + with mock.patch("os.getpid", get_parent_pid): + with self.engine.connect() as conn: + dbapi_id = id(conn.connection.connection) + + with mock.patch("os.getpid", get_child_pid): + with self.engine.connect() as conn: + new_dbapi_id = id(conn.connection.connection) + + self.assertNotEqual(dbapi_id, new_dbapi_id) + + # ensure it doesn't trip again + with mock.patch("os.getpid", get_child_pid): + with self.engine.connect() as conn: + newer_dbapi_id = id(conn.connection.connection) + + self.assertEqual(new_dbapi_id, newer_dbapi_id) + + class PatchStacktraceTest(test_base.DbTestCase): def test_trace(self): |