summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Andy McCurdy <andy@andymccurdy.com> 2020-01-30 15:05:12 -0800
committer: GitHub <noreply@github.com> 2020-01-30 15:05:12 -0800
commit: 4287963d1e51e0bd9b1b78d78981d67e2c0213db (patch)
tree: 1ff6263847763d71554d6e68e2d466995689b177
parent: 09a17eaca6b0972a6446e3132e8024099d271f24 (diff)
download: redis-py-4287963d1e51e0bd9b1b78d78981d67e2c0213db.tar.gz
better thread-safety for ConnectionPool (#1270)
Better thread and fork safety for ConnectionPool and BlockingConnectionPool
-rw-r--r-- CHANGES 8
-rw-r--r-- redis/__init__.py 26
-rwxr-xr-x redis/connection.py 163
-rw-r--r-- redis/exceptions.py 5
-rw-r--r-- tests/test_multiprocessing.py 2
5 files changed, 155 insertions, 49 deletions
diff --git a/CHANGES b/CHANGES
index c200eea..9b69339 100644
--- a/CHANGES
+++ b/CHANGES
@@ -20,6 +20,14 @@
for backwards compatibility. #1196
* Slightly optimized command packing. Thanks @Deneby67. #1255
* Added support for the TYPE argument to SCAN. Thanks @netocp. #1220
+ * Better thread and fork safety in ConnectionPool and
+ BlockingConnectionPool. Added better locking to synchronize critical
+ sections rather than relying on CPython-specific implementation details
+ relating to atomic operations. Adjusted how the pools identify and
+ deal with a fork. Added a ChildDeadlockedError exception that is
+ raised by child processes in the very unlikely chance that a deadlock
+ is encountered. Thanks @gmbnomis, @mdellweg, @yht804421715. #1270,
+ #1138, #1178, #906, #1262
* 3.3.11
* Further fix for the SSLError -> TimeoutError mapping to work
on obscure releases of Python 2.7.
diff --git a/redis/__init__.py b/redis/__init__.py
index 5539ce0..afc59e9 100644
--- a/redis/__init__.py
+++ b/redis/__init__.py
@@ -10,6 +10,7 @@ from redis.utils import from_url
from redis.exceptions import (
AuthenticationError,
BusyLoadingError,
+ ChildDeadlockedError,
ConnectionError,
DataError,
InvalidResponse,
@@ -33,9 +34,24 @@ __version__ = '3.3.11'
VERSION = tuple(map(int_or_str, __version__.split('.')))
__all__ = [
- 'Redis', 'StrictRedis', 'ConnectionPool', 'BlockingConnectionPool',
- 'Connection', 'SSLConnection', 'UnixDomainSocketConnection', 'from_url',
- 'AuthenticationError', 'BusyLoadingError', 'ConnectionError', 'DataError',
- 'InvalidResponse', 'PubSubError', 'ReadOnlyError', 'RedisError',
- 'ResponseError', 'TimeoutError', 'WatchError'
+ 'AuthenticationError',
+ 'BlockingConnectionPool',
+ 'BusyLoadingError',
+ 'ChildDeadlockedError',
+ 'Connection',
+ 'ConnectionError',
+ 'ConnectionPool',
+ 'DataError',
+ 'from_url',
+ 'InvalidResponse',
+ 'PubSubError',
+ 'ReadOnlyError',
+ 'Redis',
+ 'RedisError',
+ 'ResponseError',
+ 'SSLConnection',
+ 'StrictRedis',
+ 'TimeoutError',
+ 'UnixDomainSocketConnection',
+ 'WatchError',
]
diff --git a/redis/connection.py b/redis/connection.py
index b27bf21..7efb0a2 100755
--- a/redis/connection.py
+++ b/redis/connection.py
@@ -18,6 +18,7 @@ from redis._compat import (xrange, imap, byte_to_chr, unicode, long,
from redis.exceptions import (
AuthenticationError,
BusyLoadingError,
+ ChildDeadlockedError,
ConnectionError,
DataError,
ExecAbortError,
@@ -1069,6 +1070,15 @@ class ConnectionPool(object):
self.connection_kwargs = connection_kwargs
self.max_connections = max_connections
+ # a lock to protect the critical section in _checkpid().
+ # this lock is acquired when the process id changes, such as
+ # after a fork. during this time, multiple threads in the child
+ # process could attempt to acquire this lock. the first thread
+ # to acquire the lock will reset the data structures and lock
+ # object of this pool. subsequent threads acquiring this lock
+ # will notice the first thread already did the work and simply
+ # release the lock.
+ self._fork_lock = threading.Lock()
self.reset()
def __repr__(self):
@@ -1084,50 +1094,107 @@ class ConnectionPool(object):
)
def reset(self):
- self.pid = os.getpid()
+ self._lock = threading.RLock()
self._created_connections = 0
self._available_connections = []
self._in_use_connections = set()
- self._check_lock = threading.Lock()
+
+ # this must be the last operation in this method. while reset() is
+ # called when holding _fork_lock, other threads in this process
+ # can call _checkpid() which compares self.pid and os.getpid() without
+ # holding any lock (for performance reasons). keeping this assignment
+ # as the last operation ensures that those other threads will also
+ # notice a pid difference and block waiting for the first thread to
+ # release _fork_lock. when each of these threads eventually acquire
+ # _fork_lock, they will notice that another thread already called
+ # reset() and they will immediately release _fork_lock and continue on.
+ self.pid = os.getpid()
def _checkpid(self):
+ # _checkpid() attempts to keep ConnectionPool fork-safe on modern
+ # systems. this is called by all ConnectionPool methods that
+ # manipulate the pool's state such as get_connection() and release().
+ #
+ # _checkpid() determines whether the process has forked by comparing
+ # the current process id to the process id saved on the ConnectionPool
+ # instance. if these values are the same, _checkpid() simply returns.
+ #
+ # when the process ids differ, _checkpid() assumes that the process
+ # has forked and that we're now running in the child process. the child
+ # process cannot use the parent's file descriptors (e.g., sockets).
+ # therefore, when _checkpid() sees the process id change, it calls
+ # reset() in order to reinitialize the child's ConnectionPool. this
+ # will cause the child to make all new connection objects.
+ #
+ # _checkpid() is protected by self._fork_lock to ensure that multiple
+ # threads in the child process do not call reset() multiple times.
+ #
+ # there is an extremely small chance this could fail in the following
+ # scenario:
+ # 1. process A calls _checkpid() for the first time and acquires
+ # self._fork_lock.
+ # 2. while holding self._fork_lock, process A forks (the fork()
+ # could happen in a different thread owned by process A)
+ # 3. process B (the forked child process) inherits the
+ # ConnectionPool's state from the parent. that state includes
+ # a locked _fork_lock. process B will not be notified when
+ # process A releases the _fork_lock and will thus never be
+ # able to acquire the _fork_lock.
+ #
+ # to mitigate this possible deadlock, _checkpid() will only wait 5
+ # seconds to acquire _fork_lock. if _fork_lock cannot be acquired in
+ # that time it is assumed that the child is deadlocked and a
+ # redis.ChildDeadlockedError error is raised.
if self.pid != os.getpid():
- with self._check_lock:
- if self.pid == os.getpid():
- # another thread already did the work while we waited
- # on the lock.
- return
- self.reset()
+ # python 2.7 doesn't support a timeout option to lock.acquire()
+ # we have to mimic lock timeouts ourselves.
+ timeout_at = time() + 5
+ acquired = False
+ while time() < timeout_at:
+ acquired = self._fork_lock.acquire(False)
+ if acquired:
+ break
+ if not acquired:
+ raise ChildDeadlockedError
+ # reset() the instance for the new process if another thread
+ # hasn't already done so
+ try:
+ if self.pid != os.getpid():
+ self.reset()
+ finally:
+ self._fork_lock.release()
def get_connection(self, command_name, *keys, **options):
"Get a connection from the pool"
self._checkpid()
- try:
- connection = self._available_connections.pop()
- except IndexError:
- connection = self.make_connection()
- self._in_use_connections.add(connection)
- try:
- # ensure this connection is connected to Redis
- connection.connect()
- # connections that the pool provides should be ready to send
- # a command. if not, the connection was either returned to the
- # pool before all data has been read or the socket has been
- # closed. either way, reconnect and verify everything is good.
+ with self._lock:
try:
- if connection.can_read():
- raise ConnectionError('Connection has data')
- except ConnectionError:
- connection.disconnect()
+ connection = self._available_connections.pop()
+ except IndexError:
+ connection = self.make_connection()
+ self._in_use_connections.add(connection)
+ try:
+ # ensure this connection is connected to Redis
connection.connect()
- if connection.can_read():
- raise ConnectionError('Connection not ready')
- except: # noqa: E722
- # release the connection back to the pool so that we don't leak it
- self.release(connection)
- raise
-
- return connection
+ # connections that the pool provides should be ready to send
+ # a command. if not, the connection was either returned to the
+ # pool before all data has been read or the socket has been
+ # closed. either way, reconnect and verify everything is good.
+ try:
+ if connection.can_read():
+ raise ConnectionError('Connection has data')
+ except ConnectionError:
+ connection.disconnect()
+ connection.connect()
+ if connection.can_read():
+ raise ConnectionError('Connection not ready')
+ except: # noqa: E722
+ # release the connection back to the pool so that we don't
+ # leak it
+ self.release(connection)
+ raise
+
+ return connection
def get_encoder(self):
"Return an encoder based on encoding settings"
@@ -1148,18 +1215,20 @@ class ConnectionPool(object):
def release(self, connection):
"Releases the connection back to the pool"
self._checkpid()
- if connection.pid != self.pid:
- return
- self._in_use_connections.remove(connection)
- self._available_connections.append(connection)
+ with self._lock:
+ if connection.pid != self.pid:
+ return
+ self._in_use_connections.remove(connection)
+ self._available_connections.append(connection)
def disconnect(self):
"Disconnects all connections in the pool"
self._checkpid()
- all_conns = chain(self._available_connections,
- self._in_use_connections)
- for connection in all_conns:
- connection.disconnect()
+ with self._lock:
+ all_conns = chain(self._available_connections,
+ self._in_use_connections)
+ for connection in all_conns:
+ connection.disconnect()
class BlockingConnectionPool(ConnectionPool):
@@ -1207,9 +1276,6 @@ class BlockingConnectionPool(ConnectionPool):
**connection_kwargs)
def reset(self):
- self.pid = os.getpid()
- self._check_lock = threading.Lock()
-
# Create and fill up a thread safe queue with ``None`` values.
self.pool = self.queue_class(self.max_connections)
while True:
@@ -1222,6 +1288,17 @@ class BlockingConnectionPool(ConnectionPool):
# disconnect them later.
self._connections = []
+ # this must be the last operation in this method. while reset() is
+ # called when holding _fork_lock, other threads in this process
+ # can call _checkpid() which compares self.pid and os.getpid() without
+ # holding any lock (for performance reasons). keeping this assignment
+ # as the last operation ensures that those other threads will also
+ # notice a pid difference and block waiting for the first thread to
+ # release _fork_lock. when each of these threads eventually acquire
+ # _fork_lock, they will notice that another thread already called
+ # reset() and they will immediately release _fork_lock and continue on.
+ self.pid = os.getpid()
+
def make_connection(self):
"Make a fresh connection."
connection = self.connection_class(**self.connection_kwargs)
diff --git a/redis/exceptions.py b/redis/exceptions.py
index 9a1852a..6f47024 100644
--- a/redis/exceptions.py
+++ b/redis/exceptions.py
@@ -67,3 +67,8 @@ class LockError(RedisError, ValueError):
class LockNotOwnedError(LockError):
"Error trying to extend or release a lock that is (no longer) owned"
pass
+
+
+class ChildDeadlockedError(Exception):
+ "Error indicating that a child process is deadlocked after a fork()"
+ pass
diff --git a/tests/test_multiprocessing.py b/tests/test_multiprocessing.py
index 018914c..3f81606 100644
--- a/tests/test_multiprocessing.py
+++ b/tests/test_multiprocessing.py
@@ -22,7 +22,7 @@ class TestMultiprocessing(object):
# See issue #1085 for details.
# use a multi-connection client as that's the only type that is
- # actuall fork/process-safe
+ # actually fork/process-safe
@pytest.fixture()
def r(self, request):
return _get_client(