From 6da80865be650344964e9497a9edcb68ff102ee8 Mon Sep 17 00:00:00 2001
From: Bar Shaul <88437685+barshaul@users.noreply.github.com>
Date: Thu, 23 Jun 2022 14:34:08 +0300
Subject: Reuse the old nodes' connections when a cluster topology refresh is
 being done (#2235)

* A fix was made to reuse the old nodes' connections when a cluster topology refresh is being done
* Fixed RedisCluster to immediately raise AuthenticationError
* Updated CHANGES
* Fixed cluster async bgsave test to ignore "bgsave already in progress" error
* Fixed linters
---
 CHANGES                            |  2 ++
 redis/cluster.py                   | 37 ++++++++++++++++++++++++++++-----------
 tests/test_asyncio/test_cluster.py | 10 +++++++---
 tests/test_cluster.py              | 40 ++++++++++++++++++++++++++++++++++++++++
 4 files changed, 75 insertions(+), 14 deletions(-)

diff --git a/CHANGES b/CHANGES
index 2678218..4afc7ba 100644
--- a/CHANGES
+++ b/CHANGES
@@ -11,6 +11,8 @@
     * Fix broken connection writer lock-up for asyncio (#2065)
     * Fix auth bug when provided with no username (#2086)
     * Fix missing ClusterPipeline._lock (#2189)
+    * Fix reusing the old nodes' connections when cluster topology refresh is being done
+    * Fix RedisCluster to immediately raise AuthenticationError without a retry
 * 4.1.3 (Feb 8, 2022)
     * Fix flushdb and flushall (#1926)
diff --git a/redis/cluster.py b/redis/cluster.py
index 8e4c654..1737ec7 100644
--- a/redis/cluster.py
+++ b/redis/cluster.py
@@ -14,6 +14,7 @@ from redis.connection import ConnectionPool, DefaultParser, Encoder, parse_url
 from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
 from redis.exceptions import (
     AskError,
+    AuthenticationError,
     BusyLoadingError,
     ClusterCrossSlotError,
     ClusterDownError,
@@ -1113,7 +1114,7 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
                     )
                 return response

-            except (RedisClusterException, BusyLoadingError) as e:
+            except (RedisClusterException, BusyLoadingError, AuthenticationError) as e:
                 log.exception(type(e))
                 raise
             except (ConnectionError, TimeoutError) as e:
@@ -1134,6 +1135,7 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
                 else:
                     # Hard force of reinitialize of the node/slots setup
                     # and try again with the new setup
+                    target_node.redis_connection = None
                     self.nodes_manager.initialize()
                     raise
             except MovedError as e:
@@ -1443,6 +1445,21 @@ class NodesManager:
             r = Redis(host=host, port=port, **kwargs)
         return r

+    def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
+        node_name = get_node_name(host, port)
+        # check if we already have this node in the tmp_nodes_cache
+        target_node = tmp_nodes_cache.get(node_name)
+        if target_node is None:
+            # before creating a new cluster node, check if the cluster node already
+            # exists in the current nodes cache and has a valid connection so we can
+            # reuse it
+            target_node = self.nodes_cache.get(node_name)
+            if target_node is None or target_node.redis_connection is None:
+                # create new cluster node for this cluster
+                target_node = ClusterNode(host, port, role)
+
+        return target_node
+
     def initialize(self):
         """
         Initializes the nodes cache, slots cache and redis connections.
@@ -1521,14 +1538,14 @@ class NodesManager:

             for slot in cluster_slots:
                 primary_node = slot[2]
-                host = primary_node[0]
+                host = str_if_bytes(primary_node[0])
                 if host == "":
                     host = startup_node.host
                 port = int(primary_node[1])

-                target_node = tmp_nodes_cache.get(get_node_name(host, port))
-                if target_node is None:
-                    target_node = ClusterNode(host, port, PRIMARY)
+                target_node = self._get_or_create_cluster_node(
+                    host, port, PRIMARY, tmp_nodes_cache
+                )
                 # add this node to the nodes cache
                 tmp_nodes_cache[target_node.name] = target_node

@@ -1539,14 +1556,12 @@ class NodesManager:
                         replica_nodes = [slot[j] for j in range(3, len(slot))]

                         for replica_node in replica_nodes:
-                            host = replica_node[0]
+                            host = str_if_bytes(replica_node[0])
                             port = replica_node[1]

-                            target_replica_node = tmp_nodes_cache.get(
-                                get_node_name(host, port)
+                            target_replica_node = self._get_or_create_cluster_node(
+                                host, port, REPLICA, tmp_nodes_cache
                             )
-                            if target_replica_node is None:
-                                target_replica_node = ClusterNode(host, port, REPLICA)
                             tmp_slots[i].append(target_replica_node)
                             # add this node to the nodes cache
                             tmp_nodes_cache[
@@ -1598,7 +1613,7 @@ class NodesManager:

         # Set the default node
         self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
         # Populate the startup nodes with all discovered nodes
-        self.populate_startup_nodes(self.nodes_cache.values())
+        self.startup_nodes = tmp_nodes_cache
         # If initialize was called after a MovedError, clear it
         self._moved_exception = None
diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py
index e5ec026..f4ea5cd 100644
--- a/tests/test_asyncio/test_cluster.py
+++ b/tests/test_asyncio/test_cluster.py
@@ -1059,9 +1059,13 @@ class TestClusterRedisCommands:

     @skip_if_redis_enterprise()
     async def test_bgsave(self, r: RedisCluster) -> None:
-        assert await r.bgsave()
-        await asyncio.sleep(0.3)
-        assert await r.bgsave(True)
+        try:
+            assert await r.bgsave()
+            await asyncio.sleep(0.3)
+            assert await r.bgsave(True)
+        except ResponseError as e:
+            if "Background save already in progress" not in e.__str__():
+                raise

     async def test_info(self, r: RedisCluster) -> None:
         # Map keys to same slot
diff --git a/tests/test_cluster.py b/tests/test_cluster.py
index d1568ef..438ef73 100644
--- a/tests/test_cluster.py
+++ b/tests/test_cluster.py
@@ -29,6 +29,7 @@ from redis.exceptions import (
     RedisClusterException,
     RedisError,
     ResponseError,
+    TimeoutError,
 )
 from redis.utils import str_if_bytes
 from tests.test_pubsub import wait_for_message
@@ -651,6 +652,45 @@ class TestRedisClusterObj:
         else:
             raise e

+    def test_timeout_error_topology_refresh_reuse_connections(self, r):
+        """
+        By mocking TIMEOUT errors, we'll force the cluster topology to be reinitialized,
+        and then ensure that only the impacted connection is replaced
+        """
+        node = r.get_node_from_key("key")
+        r.set("key", "value")
+        node_conn_origin = {}
+        for n in r.get_nodes():
+            node_conn_origin[n.name] = n.redis_connection
+        real_func = r.get_redis_connection(node).parse_response
+
+        class counter:
+            def __init__(self, val=0):
+                self.val = int(val)
+
+        count = counter(0)
+        with patch.object(Redis, "parse_response") as parse_response:
+
+            def moved_redirect_effect(connection, *args, **options):
+                # raise a timeout for 5 times so we'll need to reinitialize the topology
+                if count.val >= 5:
+                    parse_response.side_effect = real_func
+                count.val += 1
+                raise TimeoutError()
+
+            parse_response.side_effect = moved_redirect_effect
+            assert r.get("key") == b"value"
+            for node_name, conn in node_conn_origin.items():
+                if node_name == node.name:
+                    # The old redis connection of the timed out node should have been
+                    # deleted and replaced
+                    assert conn != r.get_redis_connection(node)
+                else:
+                    # other nodes' redis connections should have been reused during the
+                    # topology refresh
+                    cur_node = r.get_node(node_name=node_name)
+                    assert conn == r.get_redis_connection(cur_node)
+

 @pytest.mark.onlycluster
 class TestClusterRedisCommands:
--
cgit v1.2.1