diff options
author | Bar Shaul <88437685+barshaul@users.noreply.github.com> | 2022-06-23 14:34:08 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-06-23 14:34:08 +0300 |
commit | 6da80865be650344964e9497a9edcb68ff102ee8 (patch) | |
tree | 9fa3159f0575df6f854d272307b2f1e311212cf7 /redis/cluster.py | |
parent | 23fd3273ba4dbee35585f53208dc044112dd391f (diff) | |
download | redis-py-6da80865be650344964e9497a9edcb68ff102ee8.tar.gz |
Reuse the old nodes' connections when a cluster topology refresh is being done (#2235)
* A fix was made to reuse the old nodes' connections when a cluster topology refresh is being done
* Fixed RedisCluster to immediately raise AuthenticationError
* Updated CHANGES
* Fixed cluster async bgsave test to ignore "bgsave already in progress" error
* Fixed linters
Diffstat (limited to 'redis/cluster.py')
-rw-r--r-- | redis/cluster.py | 37 |
1 files changed, 26 insertions, 11 deletions
diff --git a/redis/cluster.py b/redis/cluster.py index 8e4c654..1737ec7 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -14,6 +14,7 @@ from redis.connection import ConnectionPool, DefaultParser, Encoder, parse_url from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot from redis.exceptions import ( AskError, + AuthenticationError, BusyLoadingError, ClusterCrossSlotError, ClusterDownError, @@ -1113,7 +1114,7 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands): ) return response - except (RedisClusterException, BusyLoadingError) as e: + except (RedisClusterException, BusyLoadingError, AuthenticationError) as e: log.exception(type(e)) raise except (ConnectionError, TimeoutError) as e: @@ -1134,6 +1135,7 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands): else: # Hard force of reinitialize of the node/slots setup # and try again with the new setup + target_node.redis_connection = None self.nodes_manager.initialize() raise except MovedError as e: @@ -1443,6 +1445,21 @@ class NodesManager: r = Redis(host=host, port=port, **kwargs) return r + def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache): + node_name = get_node_name(host, port) + # check if we already have this node in the tmp_nodes_cache + target_node = tmp_nodes_cache.get(node_name) + if target_node is None: + # before creating a new cluster node, check if the cluster node already + # exists in the current nodes cache and has a valid connection so we can + # reuse it + target_node = self.nodes_cache.get(node_name) + if target_node is None or target_node.redis_connection is None: + # create new cluster node for this cluster + target_node = ClusterNode(host, port, role) + + return target_node + def initialize(self): """ Initializes the nodes cache, slots cache and redis connections. @@ -1521,14 +1538,14 @@ class NodesManager: for slot in cluster_slots: primary_node = slot[2] - host = primary_node[0] + host = str_if_bytes(primary_node[0]) if host == "": host = startup_node.host port = int(primary_node[1]) - target_node = tmp_nodes_cache.get(get_node_name(host, port)) - if target_node is None: - target_node = ClusterNode(host, port, PRIMARY) + target_node = self._get_or_create_cluster_node( + host, port, PRIMARY, tmp_nodes_cache + ) # add this node to the nodes cache tmp_nodes_cache[target_node.name] = target_node @@ -1539,14 +1556,12 @@ class NodesManager: replica_nodes = [slot[j] for j in range(3, len(slot))] for replica_node in replica_nodes: - host = replica_node[0] + host = str_if_bytes(replica_node[0]) port = replica_node[1] - target_replica_node = tmp_nodes_cache.get( - get_node_name(host, port) + target_replica_node = self._get_or_create_cluster_node( + host, port, REPLICA, tmp_nodes_cache ) - if target_replica_node is None: - target_replica_node = ClusterNode(host, port, REPLICA) tmp_slots[i].append(target_replica_node) # add this node to the nodes cache tmp_nodes_cache[ @@ -1598,7 +1613,7 @@ class NodesManager: # Set the default node self.default_node = self.get_nodes_by_server_type(PRIMARY)[0] # Populate the startup nodes with all discovered nodes - self.populate_startup_nodes(self.nodes_cache.values()) + self.startup_nodes = tmp_nodes_cache # If initialize was called after a MovedError, clear it self._moved_exception = None |