summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy McCurdy <andy@andymccurdy.com>2020-05-20 14:27:21 -0700
committerAndy McCurdy <andy@andymccurdy.com>2020-06-01 13:22:44 -0700
commite06af2a0f648d4241930479884533bc53cc9fdfb (patch)
tree47d666006546c6c296eaacf63f90282800717bb0
parent7973bd94f9cd6058e7d0dc60b6bc19cdb9faf06b (diff)
downloadredis-py-e06af2a0f648d4241930479884533bc53cc9fdfb.tar.gz
SentinelConnectionPool plays better with threaded applications.
Prevent the pool from closing sockets on connections that are actively in use by other threads when the master address changes. Connections returned to the pool that are still connected to the old master will be disconnected gracefully. Fixes #1345
-rw-r--r--CHANGES2
-rwxr-xr-xredis/connection.py49
-rw-r--r--redis/sentinel.py16
3 files changed, 53 insertions, 14 deletions
diff --git a/CHANGES b/CHANGES
index 21047ab..98f3f73 100644
--- a/CHANGES
+++ b/CHANGES
@@ -2,6 +2,8 @@
* Restore try/except clauses to __del__ methods. These will be removed
in 4.0 when more explicit resource management if enforced. #1339
* Update the master_address when Sentinels promote a new master. #847
+ * Update SentinelConnectionPool to not forcefully disconnect other in-use
+ connections which can negatively affect threaded applications. #1345
* 3.5.2 (May 14, 2020)
* Tune the locking in ConnectionPool.get_connection so that the lock is
not held while waiting for the socket to establish and validate the
diff --git a/redis/connection.py b/redis/connection.py
index 5c8fd3c..e3c9b66 100755
--- a/redis/connection.py
+++ b/redis/connection.py
@@ -1230,18 +1230,43 @@ class ConnectionPool(object):
"Releases the connection back to the pool"
self._checkpid()
with self._lock:
- if connection.pid != self.pid:
+ try:
+ self._in_use_connections.remove(connection)
+ except KeyError:
+ # Gracefully fail when a connection is returned to this pool
+ # that the pool doesn't actually own
+ pass
+
+ if self.owns_connection(connection):
+ self._available_connections.append(connection)
+ else:
+ # pool doesn't own this connection. do not add it back
+ # to the pool and decrement the count so that another
+ # connection can take its place if needed
+ self._created_connections -= 1
+ connection.disconnect()
return
- self._in_use_connections.remove(connection)
- self._available_connections.append(connection)
- def disconnect(self):
- "Disconnects all connections in the pool"
+ def owns_connection(self, connection):
+ return connection.pid == self.pid
+
+ def disconnect(self, inuse_connections=True):
+ """
+ Disconnects connections in the pool
+
+ If ``inuse_connections`` is True, disconnect connections that are
+ current in use, potentially by other threads. Otherwise only disconnect
+ connections that are idle in the pool.
+ """
self._checkpid()
with self._lock:
- all_conns = chain(self._available_connections,
- self._in_use_connections)
- for connection in all_conns:
+ if inuse_connections:
+ connections = chain(self._available_connections,
+ self._in_use_connections)
+ else:
+ connections = self._available_connections
+
+ for connection in connections:
connection.disconnect()
@@ -1375,7 +1400,13 @@ class BlockingConnectionPool(ConnectionPool):
"Releases the connection back to the pool."
# Make sure we haven't changed process.
self._checkpid()
- if connection.pid != self.pid:
+ if not self.owns_connection(connection):
+ # pool doesn't own this connection. do not add it back
+ # to the pool. instead add a None value which is a placeholder
+ # that will cause the pool to recreate the connection if
+ # its needed.
+ connection.disconnect()
+ self.pool.put_nowait(None)
return
# Put the connection back into the pool.
diff --git a/redis/sentinel.py b/redis/sentinel.py
index 3857c88..2b212ea 100644
--- a/redis/sentinel.py
+++ b/redis/sentinel.py
@@ -95,16 +95,22 @@ class SentinelConnectionPool(ConnectionPool):
self.master_address = None
self.slave_rr_counter = None
+ def owns_connection(self, connection):
+ check = not self.is_master or \
+ (self.is_master and
+ self.master_address == (connection.host, connection.port))
+ parent = super(SentinelConnectionPool, self)
+ return check and parent.owns_connection(connection)
+
def get_master_address(self):
master_address = self.sentinel_manager.discover_master(
self.service_name)
if self.is_master:
- if self.master_address is None:
- self.master_address = master_address
- elif master_address != self.master_address:
- # Master address changed, disconnect all clients in this pool
+ if self.master_address != master_address:
self.master_address = master_address
- self.disconnect()
+ # disconnect any idle connections so that they reconnect
+ # to the new master the next time that they are used.
+ self.disconnect(inuse_connections=False)
return master_address
def rotate_slaves(self):