From 3347888bfa19f9e82a71ae6dc13a4837c87ea893 Mon Sep 17 00:00:00 2001 From: Bar Shaul <88437685+barshaul@users.noreply.github.com> Date: Thu, 23 Dec 2021 12:18:02 +0200 Subject: Retry on error exception and timeout fixes (#1821) --- redis/cluster.py | 101 +++++++++++++++++++++++++++---------------------------- 1 file changed, 49 insertions(+), 52 deletions(-) (limited to 'redis/cluster.py') diff --git a/redis/cluster.py b/redis/cluster.py index 0c2fc71..5707a9d 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -17,6 +17,7 @@ from redis.exceptions import ( ClusterCrossSlotError, ClusterDownError, ClusterError, + ConnectionError, DataError, MasterDownError, MovedError, @@ -374,6 +375,12 @@ class RedisCluster(RedisClusterCommands): ), ) + ERRORS_ALLOW_RETRY = ( + ConnectionError, + TimeoutError, + ClusterDownError, + ) + def __init__( self, host=None, @@ -385,8 +392,6 @@ class RedisCluster(RedisClusterCommands): reinitialize_steps=10, read_from_replicas=False, url=None, - retry_on_timeout=False, - retry=None, **kwargs, ): """ @@ -417,11 +422,6 @@ class RedisCluster(RedisClusterCommands): :cluster_error_retry_attempts: 'int' Retry command execution attempts when encountering ClusterDownError or ConnectionError - :retry_on_timeout: 'bool' - To specify a retry policy, first set `retry_on_timeout` to `True` - then set `retry` to a valid `Retry` object - :retry: 'Retry' - a `Retry` object :reinitialize_steps: 'int' Specifies the number of MOVED errors that need to occur before reinitializing the whole cluster topology. If a MOVED error occurs @@ -452,9 +452,6 @@ class RedisCluster(RedisClusterCommands): "Argument 'db' is not possible to use in cluster mode" ) - if retry_on_timeout: - kwargs.update({"retry_on_timeout": retry_on_timeout, "retry": retry}) - # Get the startup node/s from_url = False if url is not None: @@ -850,7 +847,7 @@ class RedisCluster(RedisClusterCommands): def execute_command(self, *args, **kwargs): """ - Wrapper for ClusterDownError and ConnectionError error handling. + Wrapper for ERRORS_ALLOW_RETRY error handling. It will try the number of times specified by the config option "self.cluster_error_retry_attempts" which defaults to 3 unless manually @@ -865,18 +862,19 @@ class RedisCluster(RedisClusterCommands): dict """ target_nodes_specified = False - target_nodes = kwargs.pop("target_nodes", None) - if target_nodes is not None and not self._is_nodes_flag(target_nodes): - target_nodes = self._parse_target_nodes(target_nodes) + target_nodes = None + passed_targets = kwargs.pop("target_nodes", None) + if passed_targets is not None and not self._is_nodes_flag(passed_targets): + target_nodes = self._parse_target_nodes(passed_targets) target_nodes_specified = True - # If ClusterDownError/ConnectionError were thrown, the nodes - # and slots cache were reinitialized. We will retry executing the - # command with the updated cluster setup only when the target nodes - # can be determined again with the new cache tables. Therefore, - # when target nodes were passed to this function, we cannot retry - # the command execution since the nodes may not be valid anymore - # after the tables were reinitialized. So in case of passed target - # nodes, retry_attempts will be set to 1. + # If an error that allows retrying was thrown, the nodes and slots + # cache were reinitialized. We will retry executing the command with + # the updated cluster setup only when the target nodes can be + # determined again with the new cache tables. Therefore, when target + # nodes were passed to this function, we cannot retry the command + # execution since the nodes may not be valid anymore after the tables + # were reinitialized. So in case of passed target nodes, + # retry_attempts will be set to 1. retry_attempts = ( 1 if target_nodes_specified else self.cluster_error_retry_attempts ) @@ -887,7 +885,7 @@ class RedisCluster(RedisClusterCommands): if not target_nodes_specified: # Determine the nodes to execute the command on target_nodes = self._determine_nodes( - *args, **kwargs, nodes_flag=target_nodes + *args, **kwargs, nodes_flag=passed_targets ) if not target_nodes: raise RedisClusterException( @@ -897,11 +895,14 @@ class RedisCluster(RedisClusterCommands): res[node.name] = self._execute_command(node, *args, **kwargs) # Return the processed result return self._process_result(args[0], res, **kwargs) - except (ClusterDownError, ConnectionError) as e: - # The nodes and slots cache were reinitialized. - # Try again with the new cluster setup. All other errors - # should be raised. - exception = e + except BaseException as e: + if type(e) in RedisCluster.ERRORS_ALLOW_RETRY: + # The nodes and slots cache were reinitialized. + # Try again with the new cluster setup. + exception = e + else: + # All other errors should be raised. + raise e # If it fails the configured number of times then raise exception back # to caller of this method @@ -953,11 +954,11 @@ class RedisCluster(RedisClusterCommands): ) return response - except (RedisClusterException, BusyLoadingError): - log.exception("RedisClusterException || BusyLoadingError") + except (RedisClusterException, BusyLoadingError) as e: + log.exception(type(e)) raise - except ConnectionError: - log.exception("ConnectionError") + except (ConnectionError, TimeoutError) as e: + log.exception(type(e)) # ConnectionError can also be raised if we couldn't get a # connection from the pool before timing out, so check that # this is an actual connection before attempting to disconnect. @@ -976,13 +977,6 @@ class RedisCluster(RedisClusterCommands): # and try again with the new setup self.nodes_manager.initialize() raise - except TimeoutError: - log.exception("TimeoutError") - if connection is not None: - connection.disconnect() - - if ttl < self.RedisClusterRequestTTL / 2: - time.sleep(0.05) except MovedError as e: # First, we will try to patch the slots/nodes cache with the # redirected node output and try again. If MovedError exceeds @@ -1016,7 +1010,7 @@ class RedisCluster(RedisClusterCommands): # ClusterDownError can occur during a failover and to get # self-healed, we will try to reinitialize the cluster layout # and retry executing the command - time.sleep(0.05) + time.sleep(0.25) self.nodes_manager.initialize() raise e except ResponseError as e: @@ -1342,7 +1336,7 @@ class NodesManager: raise RedisClusterException( "Cluster mode is not enabled on this node" ) - cluster_slots = r.execute_command("CLUSTER SLOTS") + cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS")) startup_nodes_reachable = True except (ConnectionError, TimeoutError) as e: msg = e.__str__ @@ -1631,21 +1625,20 @@ class ClusterPubSub(PubSub): return self.node.redis_connection -ERRORS_ALLOW_RETRY = ( - ConnectionError, - TimeoutError, - MovedError, - AskError, - TryAgainError, -) - - class ClusterPipeline(RedisCluster): """ Support for Redis pipeline in cluster mode """ + ERRORS_ALLOW_RETRY = ( + ConnectionError, + TimeoutError, + MovedError, + AskError, + TryAgainError, + ) + def __init__( self, nodes_manager, @@ -1653,7 +1646,7 @@ class ClusterPipeline(RedisCluster): cluster_response_callbacks=None, startup_nodes=None, read_from_replicas=False, - cluster_error_retry_attempts=3, + cluster_error_retry_attempts=5, reinitialize_steps=10, **kwargs, ): @@ -1915,7 +1908,11 @@ class ClusterPipeline(RedisCluster): # collect all the commands we are allowed to retry. # (MOVED, ASK, or connection errors or timeout errors) attempt = sorted( - (c for c in attempt if isinstance(c.result, ERRORS_ALLOW_RETRY)), + ( + c + for c in attempt + if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY) + ), key=lambda x: x.position, ) if attempt and allow_redirections: -- cgit v1.2.1