summaryrefslogtreecommitdiff
path: root/redis/cluster.py
diff options
context:
space:
mode:
authorBar Shaul <88437685+barshaul@users.noreply.github.com>2022-06-23 14:34:08 +0300
committerGitHub <noreply@github.com>2022-06-23 14:34:08 +0300
commit6da80865be650344964e9497a9edcb68ff102ee8 (patch)
tree9fa3159f0575df6f854d272307b2f1e311212cf7 /redis/cluster.py
parent23fd3273ba4dbee35585f53208dc044112dd391f (diff)
downloadredis-py-6da80865be650344964e9497a9edcb68ff102ee8.tar.gz
Reuse the old nodes' connections when a cluster topology refresh is being done (#2235)
* A fix was made to reuse the old nodes' connections when a cluster topology refresh is being done * Fixed RedisCluster to immediately raise AuthenticationError * Updated CHANGES * Fixed cluster async bgsave test to ignore "bgsave already in progress" error * Fixed linters
Diffstat (limited to 'redis/cluster.py')
-rw-r--r--redis/cluster.py37
1 files changed, 26 insertions, 11 deletions
diff --git a/redis/cluster.py b/redis/cluster.py
index 8e4c654..1737ec7 100644
--- a/redis/cluster.py
+++ b/redis/cluster.py
@@ -14,6 +14,7 @@ from redis.connection import ConnectionPool, DefaultParser, Encoder, parse_url
from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
from redis.exceptions import (
AskError,
+ AuthenticationError,
BusyLoadingError,
ClusterCrossSlotError,
ClusterDownError,
@@ -1113,7 +1114,7 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
)
return response
- except (RedisClusterException, BusyLoadingError) as e:
+ except (RedisClusterException, BusyLoadingError, AuthenticationError) as e:
log.exception(type(e))
raise
except (ConnectionError, TimeoutError) as e:
@@ -1134,6 +1135,7 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
else:
# Hard force of reinitialize of the node/slots setup
# and try again with the new setup
+ target_node.redis_connection = None
self.nodes_manager.initialize()
raise
except MovedError as e:
@@ -1443,6 +1445,21 @@ class NodesManager:
r = Redis(host=host, port=port, **kwargs)
return r
+ def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
+ node_name = get_node_name(host, port)
+ # check if we already have this node in the tmp_nodes_cache
+ target_node = tmp_nodes_cache.get(node_name)
+ if target_node is None:
+ # before creating a new cluster node, check if the cluster node already
+ # exists in the current nodes cache and has a valid connection so we can
+ # reuse it
+ target_node = self.nodes_cache.get(node_name)
+ if target_node is None or target_node.redis_connection is None:
+ # create new cluster node for this cluster
+ target_node = ClusterNode(host, port, role)
+
+ return target_node
+
def initialize(self):
"""
Initializes the nodes cache, slots cache and redis connections.
@@ -1521,14 +1538,14 @@ class NodesManager:
for slot in cluster_slots:
primary_node = slot[2]
- host = primary_node[0]
+ host = str_if_bytes(primary_node[0])
if host == "":
host = startup_node.host
port = int(primary_node[1])
- target_node = tmp_nodes_cache.get(get_node_name(host, port))
- if target_node is None:
- target_node = ClusterNode(host, port, PRIMARY)
+ target_node = self._get_or_create_cluster_node(
+ host, port, PRIMARY, tmp_nodes_cache
+ )
# add this node to the nodes cache
tmp_nodes_cache[target_node.name] = target_node
@@ -1539,14 +1556,12 @@ class NodesManager:
replica_nodes = [slot[j] for j in range(3, len(slot))]
for replica_node in replica_nodes:
- host = replica_node[0]
+ host = str_if_bytes(replica_node[0])
port = replica_node[1]
- target_replica_node = tmp_nodes_cache.get(
- get_node_name(host, port)
+ target_replica_node = self._get_or_create_cluster_node(
+ host, port, REPLICA, tmp_nodes_cache
)
- if target_replica_node is None:
- target_replica_node = ClusterNode(host, port, REPLICA)
tmp_slots[i].append(target_replica_node)
# add this node to the nodes cache
tmp_nodes_cache[
@@ -1598,7 +1613,7 @@ class NodesManager:
# Set the default node
self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
# Populate the startup nodes with all discovered nodes
- self.populate_startup_nodes(self.nodes_cache.values())
+ self.startup_nodes = tmp_nodes_cache
# If initialize was called after a MovedError, clear it
self._moved_exception = None