From 67214cc3eaa7890c87e45550b8320779f954094b Mon Sep 17 00:00:00 2001 From: Bar Shaul <88437685+barshaul@users.noreply.github.com> Date: Thu, 10 Nov 2022 13:16:49 +0200 Subject: Failover handling improvements for RedisCluster and Async RedisCluster (#2377) * Cluster&AsyncCluster: Removed handling of timeouts/connection errors within the cluster loop, fixed "cannot pickle '_thread.lock' object" bug, added client's side failover handling improvements * Fixed linters * Type fixes * Added to CHANGES * Added getter and setter for the client's retry object and added more tests * Fixed linters * Fixed test * Fixed test_client_kill test * Changed get_default_backoff to default_backoff, removed retry_on_error and connection_error_retry_attempts from RedisCluster, default retry changed to no retries * Fixing linters * Reverting deletion of connection_error_retry_attempts to maintain backward compatibility * Updating retry object for existing and new connections * Changed the default value of reinitialize_steps from 10 to 5 * fix review comments Co-authored-by: Chayim Co-authored-by: dvora-h Co-authored-by: dvora-h <67596500+dvora-h@users.noreply.github.com> --- redis/cluster.py | 187 ++++++++++++++++++++++++++----------------------------- 1 file changed, 88 insertions(+), 99 deletions(-) (limited to 'redis/cluster.py') diff --git a/redis/cluster.py b/redis/cluster.py index 027fe40..91deaea 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -1,12 +1,12 @@ -import copy import random import socket import sys import threading import time from collections import OrderedDict -from typing import Any, Callable, Dict, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Tuple, Union +from redis.backoff import default_backoff from redis.client import CaseInsensitiveDict, PubSub, Redis, parse_scan from redis.commands import READ_COMMANDS, CommandsParser, RedisClusterCommands from redis.connection import ConnectionPool, DefaultParser, Encoder, parse_url @@ -29,6 +29,7 @@ from redis.exceptions import ( TryAgainError, ) from redis.lock import Lock +from redis.retry import Retry from redis.utils import ( dict_merge, list_keys_to_dict, @@ -426,27 +427,28 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands): def __init__( self, - host=None, - port=6379, - startup_nodes=None, - cluster_error_retry_attempts=3, - require_full_coverage=False, - reinitialize_steps=10, - read_from_replicas=False, - dynamic_startup_nodes=True, - url=None, + host: Optional[str] = None, + port: int = 6379, + startup_nodes: Optional[List["ClusterNode"]] = None, + cluster_error_retry_attempts: int = 3, + retry: Optional["Retry"] = None, + require_full_coverage: bool = False, + reinitialize_steps: int = 5, + read_from_replicas: bool = False, + dynamic_startup_nodes: bool = True, + url: Optional[str] = None, **kwargs, ): """ Initialize a new RedisCluster client. - :startup_nodes: 'list[ClusterNode]' + :param startup_nodes: List of nodes from which initial bootstrapping can be done - :host: 'str' + :param host: Can be used to point to a startup node - :port: 'int' + :param port: Can be used to point to a startup node - :require_full_coverage: 'bool' + :param require_full_coverage: When set to False (default value): the client will not require a full coverage of the slots. However, if not all slots are covered, and at least one node has 'cluster-require-full-coverage' set to @@ -456,12 +458,12 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands): When set to True: all slots must be covered to construct the cluster client. If not all slots are covered, RedisClusterException will be thrown. - :read_from_replicas: 'bool' + :param read_from_replicas: Enable read from replicas in READONLY mode. You can read possibly stale data. When set to true, read commands will be assigned between the primary and its replications in a Round-Robin manner. - :dynamic_startup_nodes: 'bool' + :param dynamic_startup_nodes: Set the RedisCluster's startup nodes to all of the discovered nodes. If true (default value), the cluster's discovered nodes will be used to determine the cluster nodes-slots mapping in the next topology refresh. @@ -469,10 +471,11 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands): listed in the CLUSTER SLOTS output. If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists specific IP addresses, it is best to set it to false. - :cluster_error_retry_attempts: 'int' - Retry command execution attempts when encountering ClusterDownError - or ConnectionError - :reinitialize_steps: 'int' + :param cluster_error_retry_attempts: + Number of times to retry before raising an error when + :class:`~.TimeoutError` or :class:`~.ConnectionError` or + :class:`~.ClusterDownError` are encountered + :param reinitialize_steps: Specifies the number of MOVED errors that need to occur before reinitializing the whole cluster topology. If a MOVED error occurs and the cluster does not need to be reinitialized on this current @@ -540,6 +543,11 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands): self.user_on_connect_func = kwargs.pop("redis_connect_func", None) kwargs.update({"redis_connect_func": self.on_connect}) kwargs = cleanup_kwargs(**kwargs) + if retry: + self.retry = retry + kwargs.update({"retry": self.retry}) + else: + kwargs.update({"retry": Retry(default_backoff(), 0)}) self.encoder = Encoder( kwargs.get("encoding", "utf-8"), @@ -666,6 +674,14 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands): self.nodes_manager.default_node = node return True + def get_retry(self) -> Optional["Retry"]: + return self.retry + + def set_retry(self, retry: "Retry") -> None: + self.retry = retry + for node in self.get_nodes(): + node.redis_connection.set_retry(retry) + def monitor(self, target_node=None): """ Returns a Monitor object for the specified target node. @@ -986,12 +1002,13 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands): # nodes were passed to this function, we cannot retry the command # execution since the nodes may not be valid anymore after the tables # were reinitialized. So in case of passed target nodes, - # retry_attempts will be set to 1. + # retry_attempts will be set to 0. retry_attempts = ( - 1 if target_nodes_specified else self.cluster_error_retry_attempts + 0 if target_nodes_specified else self.cluster_error_retry_attempts ) - exception = None - for _ in range(0, retry_attempts): + # Add one for the first execution + execute_attempts = 1 + retry_attempts + for _ in range(execute_attempts): try: res = {} if not target_nodes_specified: @@ -1008,18 +1025,15 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands): # Return the processed result return self._process_result(args[0], res, **kwargs) except Exception as e: - if type(e) in self.__class__.ERRORS_ALLOW_RETRY: + if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY: # The nodes and slots cache were reinitialized. # Try again with the new cluster setup. - exception = e + retry_attempts -= 1 + continue else: - # All other errors should be raised. + # raise the exception raise e - # If it fails the configured number of times then raise exception back - # to caller of this method - raise exception - def _execute_command(self, target_node, *args, **kwargs): """ Send a command to a node in the cluster @@ -1031,7 +1045,6 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands): asking = False moved = False ttl = int(self.RedisClusterRequestTTL) - connection_error_retry_counter = 0 while ttl > 0: ttl -= 1 @@ -1064,25 +1077,21 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands): except AuthenticationError: raise except (ConnectionError, TimeoutError) as e: + # Connection retries are being handled in the node's + # Retry object. # ConnectionError can also be raised if we couldn't get a # connection from the pool before timing out, so check that # this is an actual connection before attempting to disconnect. if connection is not None: connection.disconnect() - connection_error_retry_counter += 1 - - # Give the node 0.25 seconds to get back up and retry again - # with same node and configuration. After 5 attempts then try - # to reinitialize the cluster and see if the nodes - # configuration has changed or not - if connection_error_retry_counter < 5: - time.sleep(0.25) - else: - # Hard force of reinitialize of the node/slots setup - # and try again with the new setup - target_node.redis_connection = None - self.nodes_manager.initialize() - raise e + + # Remove the failed node from the startup nodes before we try + # to reinitialize the cluster + self.nodes_manager.startup_nodes.pop(target_node.name, None) + # Reset the cluster node's connection + target_node.redis_connection = None + self.nodes_manager.initialize() + raise e except MovedError as e: # First, we will try to patch the slots/nodes cache with the # redirected node output and try again. If MovedError exceeds @@ -1406,17 +1415,15 @@ class NodesManager: startup_nodes_reachable = False fully_covered = False kwargs = self.connection_kwargs + exception = None for startup_node in self.startup_nodes.values(): try: if startup_node.redis_connection: r = startup_node.redis_connection else: - # Create a new Redis connection and let Redis decode the - # responses so we won't need to handle that - copy_kwargs = copy.deepcopy(kwargs) - copy_kwargs.update({"decode_responses": True, "encoding": "utf-8"}) + # Create a new Redis connection r = self.create_redis_node( - startup_node.host, startup_node.port, **copy_kwargs + startup_node.host, startup_node.port, **kwargs ) self.startup_nodes[startup_node.name].redis_connection = r # Make sure cluster mode is enabled on this node @@ -1426,25 +1433,11 @@ class NodesManager: ) cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS")) startup_nodes_reachable = True - except (ConnectionError, TimeoutError): - continue - except ResponseError as e: - # Isn't a cluster connection, so it won't parse these - # exceptions automatically - message = e.__str__() - if "CLUSTERDOWN" in message or "MASTERDOWN" in message: - continue - else: - raise RedisClusterException( - 'ERROR sending "cluster slots" command to redis ' - f"server: {startup_node}. error: {message}" - ) except Exception as e: - message = e.__str__() - raise RedisClusterException( - 'ERROR sending "cluster slots" command to redis ' - f"server {startup_node.name}. error: {message}" - ) + # Try the next startup node. + # The exception is saved and raised only if we have no more nodes. + exception = e + continue # CLUSTER SLOTS command results in the following output: # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]] @@ -1514,9 +1507,9 @@ class NodesManager: if not startup_nodes_reachable: raise RedisClusterException( - "Redis Cluster cannot be connected. Please provide at least " - "one reachable node. " - ) + f"Redis Cluster cannot be connected. Please provide at least " + f"one reachable node: {str(exception)}" + ) from exception # Create Redis connections to all nodes self.create_redis_connections(list(tmp_nodes_cache.values())) @@ -1699,14 +1692,14 @@ class ClusterPipeline(RedisCluster): def __init__( self, - nodes_manager, - commands_parser, - result_callbacks=None, - cluster_response_callbacks=None, - startup_nodes=None, - read_from_replicas=False, - cluster_error_retry_attempts=5, - reinitialize_steps=10, + nodes_manager: "NodesManager", + commands_parser: "CommandsParser", + result_callbacks: Optional[Dict[str, Callable]] = None, + cluster_response_callbacks: Optional[Dict[str, Callable]] = None, + startup_nodes: Optional[List["ClusterNode"]] = None, + read_from_replicas: bool = False, + cluster_error_retry_attempts: int = 3, + reinitialize_steps: int = 5, lock=None, **kwargs, ): @@ -1858,22 +1851,22 @@ class ClusterPipeline(RedisCluster): """ if not stack: return [] - - for _ in range(0, self.cluster_error_retry_attempts): + retry_attempts = self.cluster_error_retry_attempts + while True: try: return self._send_cluster_commands( stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections, ) - except ClusterDownError: - # Try again with the new cluster setup. All other errors - # should be raised. - pass - - # If it fails the configured number of times then raise - # exception back to caller of this method - raise ClusterDownError("CLUSTERDOWN error. Unable to rebuild the cluster") + except (ClusterDownError, ConnectionError) as e: + if retry_attempts > 0: + # Try again with the new cluster setup. All other errors + # should be raised. + retry_attempts -= 1 + pass + else: + raise e def _send_cluster_commands( self, stack, raise_on_error=True, allow_redirections=True @@ -1898,7 +1891,6 @@ class ClusterPipeline(RedisCluster): # we figure out the slot number that command maps to, then from # the slot determine the node. for c in attempt: - connection_error_retry_counter = 0 while True: # refer to our internal node -> slot table that # tells us where a given command should route to. @@ -1931,13 +1923,10 @@ class ClusterPipeline(RedisCluster): try: connection = get_connection(redis_node, c.args) except ConnectionError: - connection_error_retry_counter += 1 - if connection_error_retry_counter < 5: - # reinitialize the node -> slot table - self.nodes_manager.initialize() - continue - else: - raise + # Connection retries are being handled in the node's + # Retry object. Reinitialize the node -> slot table. + self.nodes_manager.initialize() + raise nodes[node_name] = NodeCommands( redis_node.parse_response, redis_node.connection_pool, -- cgit v1.2.1