summaryrefslogtreecommitdiff
path: root/oslo_db
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2015-02-17 14:28:50 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2015-03-12 12:45:31 -0400
commitebbf23d6490159f0c8f4f35a08fb4b80371cd080 (patch)
tree0d354c120bb7c7378fb694c7ffe454cac308086e /oslo_db
parentab20754db71e55b79b9e71e36ad86d9befc89a92 (diff)
downloadoslo-db-ebbf23d6490159f0c8f4f35a08fb4b80371cd080.tar.gz
Add process guards + invalidate to the connection pool
It's not safe for a database TCP connection to be shared to a child process, as this is a file descriptor which will maintain its state on both sides. Applications such as Cinder which spin up multiprocessing subprocesses at startup time are subject to race conditions as a result. Instead of requiring that engines be explicitly prepared within a child process, we can detect and accommodate this situation in the connection pool itself, by tracking the originating pid of a connection, and if it changes on checkout, by invalidating. Change-Id: If116f7b7140b3eba064d8147e5f637a05beb1cd8
Diffstat (limited to 'oslo_db')
-rw-r--r--oslo_db/sqlalchemy/session.py33
-rw-r--r--oslo_db/tests/sqlalchemy/test_sqlalchemy.py28
2 files changed, 61 insertions, 0 deletions
diff --git a/oslo_db/sqlalchemy/session.py b/oslo_db/sqlalchemy/session.py
index 7e33075..6a0355d 100644
--- a/oslo_db/sqlalchemy/session.py
+++ b/oslo_db/sqlalchemy/session.py
@@ -280,11 +280,13 @@ Efficient use of soft deletes:
import itertools
import logging
+import os
import re
import time
from oslo_utils import timeutils
import six
+from sqlalchemy import exc
import sqlalchemy.orm
from sqlalchemy import pool
from sqlalchemy.sql.expression import literal_column
@@ -476,6 +478,8 @@ def _init_connection_args(url, engine_args, **kw):
def _init_events(engine, thread_checkin=True, connection_trace=False, **kw):
"""Set up event listeners for all database backends."""
+ _add_process_guards(engine)
+
if connection_trace:
_add_trace_comments(engine)
@@ -609,6 +613,35 @@ def get_maker(engine, autocommit=True, expire_on_commit=False):
query_cls=Query)
+def _add_process_guards(engine):
+ """Add multiprocessing guards.
+
+ Forces a connection to be reconnected if it is detected
+ as having been shared to a sub-process.
+
+ """
+
+ @sqlalchemy.event.listens_for(engine, "connect")
+ def connect(dbapi_connection, connection_record):
+ connection_record.info['pid'] = os.getpid()
+
+ @sqlalchemy.event.listens_for(engine, "checkout")
+ def checkout(dbapi_connection, connection_record, connection_proxy):
+ pid = os.getpid()
+ if connection_record.info['pid'] != pid:
+ LOG.debug(_LW(
+ "Parent process %(orig)s forked (%(newproc)s) with an open "
+ "database connection, "
+ "which is being discarded and recreated."),
+ {"newproc": pid, "orig": connection_record.info['pid']})
+ connection_record.connection = connection_proxy.connection = None
+ raise exc.DisconnectionError(
+ "Connection record belongs to pid %s, "
+ "attempting to check out in pid %s" %
+ (connection_record.info['pid'], pid)
+ )
+
+
def _add_trace_comments(engine):
"""Add trace comments.
diff --git a/oslo_db/tests/sqlalchemy/test_sqlalchemy.py b/oslo_db/tests/sqlalchemy/test_sqlalchemy.py
index 24aeb22..433affe 100644
--- a/oslo_db/tests/sqlalchemy/test_sqlalchemy.py
+++ b/oslo_db/tests/sqlalchemy/test_sqlalchemy.py
@@ -648,6 +648,34 @@ class CreateEngineTest(oslo_test.BaseTestCase):
)
+class ProcessGuardTest(test_base.DbTestCase):
+ def test_process_guard(self):
+ self.engine.dispose()
+
+ def get_parent_pid():
+ return 4
+
+ def get_child_pid():
+ return 5
+
+ with mock.patch("os.getpid", get_parent_pid):
+ with self.engine.connect() as conn:
+ dbapi_id = id(conn.connection.connection)
+
+ with mock.patch("os.getpid", get_child_pid):
+ with self.engine.connect() as conn:
+ new_dbapi_id = id(conn.connection.connection)
+
+ self.assertNotEqual(dbapi_id, new_dbapi_id)
+
+ # ensure it doesn't trip again
+ with mock.patch("os.getpid", get_child_pid):
+ with self.engine.connect() as conn:
+ newer_dbapi_id = id(conn.connection.connection)
+
+ self.assertEqual(new_dbapi_id, newer_dbapi_id)
+
+
class PatchStacktraceTest(test_base.DbTestCase):
def test_trace(self):