diff options
author | Andy McCurdy <andy@andymccurdy.com> | 2020-01-30 15:05:12 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-30 15:05:12 -0800 |
commit | 4287963d1e51e0bd9b1b78d78981d67e2c0213db (patch) | |
tree | 1ff6263847763d71554d6e68e2d466995689b177 | |
parent | 09a17eaca6b0972a6446e3132e8024099d271f24 (diff) | |
download | redis-py-4287963d1e51e0bd9b1b78d78981d67e2c0213db.tar.gz |
better thread-safety for ConnectionPool (#1270)
Better thread and fork safety for ConnectionPool and BlockingConnectionPool
-rw-r--r-- | CHANGES | 8 | ||||
-rw-r--r-- | redis/__init__.py | 26 | ||||
-rwxr-xr-x | redis/connection.py | 163 | ||||
-rw-r--r-- | redis/exceptions.py | 5 | ||||
-rw-r--r-- | tests/test_multiprocessing.py | 2 |
5 files changed, 155 insertions, 49 deletions
@@ -20,6 +20,14 @@ for backwards compatibility. #1196 * Slightly optimized command packing. Thanks @Deneby67. #1255 * Added support for the TYPE argument to SCAN. Thanks @netocp. #1220 + * Better thread and fork safety in ConnectionPool and + BlockingConnectionPool. Added better locking to synchronize critical + sections rather than relying on CPython-specific implementation details + relating to atomic operations. Adjusted how the pools identify and + deal with a fork. Added a ChildDeadlockedError exception that is + raised by child processes in the very unlikely chance that a deadlock + is encountered. Thanks @gmbnomis, @mdellweg, @yht804421715. #1270, + #1138, #1178, #906, #1262 * 3.3.11 * Further fix for the SSLError -> TimeoutError mapping to work on obscure releases of Python 2.7. diff --git a/redis/__init__.py b/redis/__init__.py index 5539ce0..afc59e9 100644 --- a/redis/__init__.py +++ b/redis/__init__.py @@ -10,6 +10,7 @@ from redis.utils import from_url from redis.exceptions import ( AuthenticationError, BusyLoadingError, + ChildDeadlockedError, ConnectionError, DataError, InvalidResponse, @@ -33,9 +34,24 @@ __version__ = '3.3.11' VERSION = tuple(map(int_or_str, __version__.split('.'))) __all__ = [ - 'Redis', 'StrictRedis', 'ConnectionPool', 'BlockingConnectionPool', - 'Connection', 'SSLConnection', 'UnixDomainSocketConnection', 'from_url', - 'AuthenticationError', 'BusyLoadingError', 'ConnectionError', 'DataError', - 'InvalidResponse', 'PubSubError', 'ReadOnlyError', 'RedisError', - 'ResponseError', 'TimeoutError', 'WatchError' + 'AuthenticationError', + 'BlockingConnectionPool', + 'BusyLoadingError', + 'ChildDeadlockedError', + 'Connection', + 'ConnectionError', + 'ConnectionPool', + 'DataError', + 'from_url', + 'InvalidResponse', + 'PubSubError', + 'ReadOnlyError', + 'Redis', + 'RedisError', + 'ResponseError', + 'SSLConnection', + 'StrictRedis', + 'TimeoutError', + 'UnixDomainSocketConnection', + 'WatchError', ] diff --git a/redis/connection.py b/redis/connection.py index b27bf21..7efb0a2 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -18,6 +18,7 @@ from redis._compat import (xrange, imap, byte_to_chr, unicode, long, from redis.exceptions import ( AuthenticationError, BusyLoadingError, + ChildDeadlockedError, ConnectionError, DataError, ExecAbortError, @@ -1069,6 +1070,15 @@ class ConnectionPool(object): self.connection_kwargs = connection_kwargs self.max_connections = max_connections + # a lock to protect the critical section in _checkpid(). + # this lock is acquired when the process id changes, such as + # after a fork. during this time, multiple threads in the child + # process could attempt to acquire this lock. the first thread + # to acquire the lock will reset the data structures and lock + # object of this pool. subsequent threads acquiring this lock + # will notice the first thread already did the work and simply + # release the lock. + self._fork_lock = threading.Lock() self.reset() def __repr__(self): @@ -1084,50 +1094,107 @@ class ConnectionPool(object): ) def reset(self): - self.pid = os.getpid() + self._lock = threading.RLock() self._created_connections = 0 self._available_connections = [] self._in_use_connections = set() - self._check_lock = threading.Lock() + + # this must be the last operation in this method. while reset() is + # called when holding _fork_lock, other threads in this process + # can call _checkpid() which compares self.pid and os.getpid() without + # holding any lock (for performance reasons). keeping this assignment + # as the last operation ensures that those other threads will also + # notice a pid difference and block waiting for the first thread to + # release _fork_lock. when each of these threads eventually acquire + # _fork_lock, they will notice that another thread already called + # reset() and they will immediately release _fork_lock and continue on. + self.pid = os.getpid() def _checkpid(self): + # _checkpid() attempts to keep ConnectionPool fork-safe on modern + # systems. this is called by all ConnectionPool methods that + # manipulate the pool's state such as get_connection() and release(). + # + # _checkpid() determines whether the process has forked by comparing + # the current process id to the process id saved on the ConnectionPool + # instance. if these values are the same, _checkpid() simply returns. + # + # when the process ids differ, _checkpid() assumes that the process + # has forked and that we're now running in the child process. the child + # process cannot use the parent's file descriptors (e.g., sockets). + # therefore, when _checkpid() sees the process id change, it calls + # reset() in order to reinitialize the child's ConnectionPool. this + # will cause the child to make all new connection objects. + # + # _checkpid() is protected by self._fork_lock to ensure that multiple + # threads in the child process do not call reset() multiple times. + # + # there is an extremely small chance this could fail in the following + # scenario: + # 1. process A calls _checkpid() for the first time and acquires + # self._fork_lock. + # 2. while holding self._fork_lock, process A forks (the fork() + # could happen in a different thread owned by process A) + # 3. process B (the forked child process) inherits the + # ConnectionPool's state from the parent. that state includes + # a locked _fork_lock. process B will not be notified when + # process A releases the _fork_lock and will thus never be + # able to acquire the _fork_lock. + # + # to mitigate this possible deadlock, _checkpid() will only wait 5 + # seconds to acquire _fork_lock. if _fork_lock cannot be acquired in + # that time it is assumed that the child is deadlocked and a + # redis.ChildDeadlockedError error is raised. if self.pid != os.getpid(): - with self._check_lock: - if self.pid == os.getpid(): - # another thread already did the work while we waited - # on the lock. - return - self.reset() + # python 2.7 doesn't support a timeout option to lock.acquire() + # we have to mimic lock timeouts ourselves. + timeout_at = time() + 5 + acquired = False + while time() < timeout_at: + acquired = self._fork_lock.acquire(False) + if acquired: + break + if not acquired: + raise ChildDeadlockedError + # reset() the instance for the new process if another thread + # hasn't already done so + try: + if self.pid != os.getpid(): + self.reset() + finally: + self._fork_lock.release() def get_connection(self, command_name, *keys, **options): "Get a connection from the pool" self._checkpid() - try: - connection = self._available_connections.pop() - except IndexError: - connection = self.make_connection() - self._in_use_connections.add(connection) - try: - # ensure this connection is connected to Redis - connection.connect() - # connections that the pool provides should be ready to send - # a command. if not, the connection was either returned to the - # pool before all data has been read or the socket has been - # closed. either way, reconnect and verify everything is good. + with self._lock: try: - if connection.can_read(): - raise ConnectionError('Connection has data') - except ConnectionError: - connection.disconnect() + connection = self._available_connections.pop() + except IndexError: + connection = self.make_connection() + self._in_use_connections.add(connection) + try: + # ensure this connection is connected to Redis connection.connect() - if connection.can_read(): - raise ConnectionError('Connection not ready') - except: # noqa: E722 - # release the connection back to the pool so that we don't leak it - self.release(connection) - raise - - return connection + # connections that the pool provides should be ready to send + # a command. if not, the connection was either returned to the + # pool before all data has been read or the socket has been + # closed. either way, reconnect and verify everything is good. + try: + if connection.can_read(): + raise ConnectionError('Connection has data') + except ConnectionError: + connection.disconnect() + connection.connect() + if connection.can_read(): + raise ConnectionError('Connection not ready') + except: # noqa: E722 + # release the connection back to the pool so that we don't + # leak it + self.release(connection) + raise + + return connection def get_encoder(self): "Return an encoder based on encoding settings" @@ -1148,18 +1215,20 @@ class ConnectionPool(object): def release(self, connection): "Releases the connection back to the pool" self._checkpid() - if connection.pid != self.pid: - return - self._in_use_connections.remove(connection) - self._available_connections.append(connection) + with self._lock: + if connection.pid != self.pid: + return + self._in_use_connections.remove(connection) + self._available_connections.append(connection) def disconnect(self): "Disconnects all connections in the pool" self._checkpid() - all_conns = chain(self._available_connections, - self._in_use_connections) - for connection in all_conns: - connection.disconnect() + with self._lock: + all_conns = chain(self._available_connections, + self._in_use_connections) + for connection in all_conns: + connection.disconnect() class BlockingConnectionPool(ConnectionPool): @@ -1207,9 +1276,6 @@ class BlockingConnectionPool(ConnectionPool): **connection_kwargs) def reset(self): - self.pid = os.getpid() - self._check_lock = threading.Lock() - # Create and fill up a thread safe queue with ``None`` values. self.pool = self.queue_class(self.max_connections) while True: @@ -1222,6 +1288,17 @@ class BlockingConnectionPool(ConnectionPool): # disconnect them later. self._connections = [] + # this must be the last operation in this method. while reset() is + # called when holding _fork_lock, other threads in this process + # can call _checkpid() which compares self.pid and os.getpid() without + # holding any lock (for performance reasons). keeping this assignment + # as the last operation ensures that those other threads will also + # notice a pid difference and block waiting for the first thread to + # release _fork_lock. when each of these threads eventually acquire + # _fork_lock, they will notice that another thread already called + # reset() and they will immediately release _fork_lock and continue on. + self.pid = os.getpid() + def make_connection(self): "Make a fresh connection." connection = self.connection_class(**self.connection_kwargs) diff --git a/redis/exceptions.py b/redis/exceptions.py index 9a1852a..6f47024 100644 --- a/redis/exceptions.py +++ b/redis/exceptions.py @@ -67,3 +67,8 @@ class LockError(RedisError, ValueError): class LockNotOwnedError(LockError): "Error trying to extend or release a lock that is (no longer) owned" pass + + +class ChildDeadlockedError(Exception): + "Error indicating that a child process is deadlocked after a fork()" + pass diff --git a/tests/test_multiprocessing.py b/tests/test_multiprocessing.py index 018914c..3f81606 100644 --- a/tests/test_multiprocessing.py +++ b/tests/test_multiprocessing.py @@ -22,7 +22,7 @@ class TestMultiprocessing(object): # See issue #1085 for details. # use a multi-connection client as that's the only type that is - # actuall fork/process-safe + # actually fork/process-safe @pytest.fixture() def r(self, request): return _get_client( |