diff options
author | Bar Shaul <88437685+barshaul@users.noreply.github.com> | 2022-11-10 13:16:49 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-10 13:16:49 +0200 |
commit | 67214cc3eaa7890c87e45550b8320779f954094b (patch) | |
tree | 3bca8b8913224255bdf72de79265ca0441cecb1c /redis | |
parent | bb06ccd52924800ac501d17c8a42038c8e5c5770 (diff) | |
download | redis-py-67214cc3eaa7890c87e45550b8320779f954094b.tar.gz |
Failover handling improvements for RedisCluster and Async RedisCluster (#2377)
* Cluster&AsyncCluster: Removed handling of timeouts/connection errors within the cluster loop, fixed "cannot pickle '_thread.lock' object" bug, added client's side failover handling improvements
* Fixed linters
* Type fixes
* Added to CHANGES
* Added getter and setter for the client's retry object and added more tests
* Fixed linters
* Fixed test
* Fixed test_client_kill test
* Changed get_default_backoff to default_backoff, removed retry_on_error and connection_error_retry_attempts from RedisCluster, default retry changed to no retries
* Fixing linters
* Reverting deletion of connection_error_retry_attempts to maintain backward compatibility
* Updating retry object for existing and new connections
* Changed the default value of reinitialize_steps from 10 to 5
* fix review comments
Co-authored-by: Chayim <chayim@users.noreply.github.com>
Co-authored-by: dvora-h <dvora.heller@redis.com>
Co-authored-by: dvora-h <67596500+dvora-h@users.noreply.github.com>
Diffstat (limited to 'redis')
-rw-r--r-- | redis/__init__.py | 2 | ||||
-rw-r--r-- | redis/asyncio/__init__.py | 2 | ||||
-rw-r--r-- | redis/asyncio/client.py | 7 | ||||
-rw-r--r-- | redis/asyncio/cluster.py | 121 | ||||
-rw-r--r-- | redis/asyncio/connection.py | 8 | ||||
-rw-r--r-- | redis/backoff.py | 17 | ||||
-rwxr-xr-x | redis/client.py | 8 | ||||
-rw-r--r-- | redis/cluster.py | 187 | ||||
-rwxr-xr-x | redis/connection.py | 9 |
9 files changed, 198 insertions, 163 deletions
diff --git a/redis/__init__.py b/redis/__init__.py index 5201fe2..6503ac3 100644 --- a/redis/__init__.py +++ b/redis/__init__.py @@ -1,5 +1,6 @@ import sys +from redis.backoff import default_backoff from redis.client import Redis, StrictRedis from redis.cluster import RedisCluster from redis.connection import ( @@ -66,6 +67,7 @@ __all__ = [ "CredentialProvider", "DataError", "from_url", + "default_backoff", "InvalidResponse", "PubSubError", "ReadOnlyError", diff --git a/redis/asyncio/__init__.py b/redis/asyncio/__init__.py index 598791a..bf90dde 100644 --- a/redis/asyncio/__init__.py +++ b/redis/asyncio/__init__.py @@ -15,6 +15,7 @@ from redis.asyncio.sentinel import ( SentinelManagedSSLConnection, ) from redis.asyncio.utils import from_url +from redis.backoff import default_backoff from redis.exceptions import ( AuthenticationError, AuthenticationWrongNumberOfArgsError, @@ -43,6 +44,7 @@ __all__ = [ "ConnectionPool", "DataError", "from_url", + "default_backoff", "InvalidResponse", "PubSubError", "ReadOnlyError", diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index c085571..e0ed85e 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -276,6 +276,13 @@ class Redis( """Get the connection's key-word arguments""" return self.connection_pool.connection_kwargs + def get_retry(self) -> Optional["Retry"]: + return self.get_connection_kwargs().get("retry") + + def set_retry(self, retry: "Retry") -> None: + self.get_connection_kwargs().update({"retry": retry}) + self.connection_pool.set_retry(retry) + def load_external_module(self, funcname, func): """ This function can be used to add externally defined redis modules, diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 57aafbd..d5a38b2 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -26,6 +26,8 @@ from redis.asyncio.connection import ( ) from redis.asyncio.lock import Lock from redis.asyncio.parser import CommandsParser +from redis.asyncio.retry import Retry +from redis.backoff import default_backoff from redis.client import EMPTY_RESPONSE, NEVER_DECODE, AbstractRedis from redis.cluster import ( PIPELINE_BLOCKED_COMMANDS, @@ -110,10 +112,10 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand :param startup_nodes: | :class:`~.ClusterNode` to used as a startup node :param require_full_coverage: - | When set to ``False``: the client will not require a full coverage of the - slots. However, if not all slots are covered, and at least one node has - ``cluster-require-full-coverage`` set to ``yes``, the server will throw a - :class:`~.ClusterDownError` for some key-based commands. + | When set to ``False``: the client will not require a full coverage of + the slots. However, if not all slots are covered, and at least one node + has ``cluster-require-full-coverage`` set to ``yes``, the server will throw + a :class:`~.ClusterDownError` for some key-based commands. | When set to ``True``: all slots must be covered to construct the cluster client. If not all slots are covered, :class:`~.RedisClusterException` will be thrown. @@ -136,7 +138,10 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand or :class:`~.ConnectionError` or :class:`~.ClusterDownError` are encountered :param connection_error_retry_attempts: | Number of times to retry before reinitializing when :class:`~.TimeoutError` - or :class:`~.ConnectionError` are encountered + or :class:`~.ConnectionError` are encountered. + The default backoff strategy will be set if Retry object is not passed (see + default_backoff in backoff.py). To change it, pass a custom Retry object + using the "retry" keyword. :param max_connections: | Maximum number of connections per node. If there are no free connections & the maximum number of connections are already created, a @@ -214,9 +219,9 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand startup_nodes: Optional[List["ClusterNode"]] = None, require_full_coverage: bool = True, read_from_replicas: bool = False, - reinitialize_steps: int = 10, + reinitialize_steps: int = 5, cluster_error_retry_attempts: int = 3, - connection_error_retry_attempts: int = 5, + connection_error_retry_attempts: int = 3, max_connections: int = 2**31, # Client related kwargs db: Union[str, int] = 0, @@ -235,6 +240,8 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand socket_keepalive: bool = False, socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None, socket_timeout: Optional[float] = None, + retry: Optional["Retry"] = None, + retry_on_error: Optional[List[Exception]] = None, # SSL related kwargs ssl: bool = False, ssl_ca_certs: Optional[str] = None, @@ -282,6 +289,7 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand "socket_keepalive": socket_keepalive, "socket_keepalive_options": socket_keepalive_options, "socket_timeout": socket_timeout, + "retry": retry, } if ssl: @@ -302,6 +310,18 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand # Call our on_connect function to configure READONLY mode kwargs["redis_connect_func"] = self.on_connect + self.retry = retry + if retry or retry_on_error or connection_error_retry_attempts > 0: + # Set a retry object for all cluster nodes + self.retry = retry or Retry( + default_backoff(), connection_error_retry_attempts + ) + if not retry_on_error: + # Default errors for retrying + retry_on_error = [ConnectionError, TimeoutError] + self.retry.update_supported_errors(retry_on_error) + kwargs.update({"retry": self.retry}) + kwargs["response_callbacks"] = self.__class__.RESPONSE_CALLBACKS.copy() self.connection_kwargs = kwargs @@ -323,7 +343,6 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand self.reinitialize_steps = reinitialize_steps self.cluster_error_retry_attempts = cluster_error_retry_attempts self.connection_error_retry_attempts = connection_error_retry_attempts - self.reinitialize_counter = 0 self.commands_parser = CommandsParser() self.node_flags = self.__class__.NODE_FLAGS.copy() @@ -481,6 +500,16 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand """Get the kwargs passed to :class:`~redis.asyncio.connection.Connection`.""" return self.connection_kwargs + def get_retry(self) -> Optional["Retry"]: + return self.retry + + def set_retry(self, retry: "Retry") -> None: + self.retry = retry + for node in self.get_nodes(): + node.connection_kwargs.update({"retry": retry}) + for conn in node._connections: + conn.retry = retry + def set_response_callback(self, command: str, callback: ResponseCallbackT) -> None: """Set a custom response callback.""" self.response_callbacks[command] = callback @@ -618,9 +647,11 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand if passed_targets and not self._is_node_flag(passed_targets): target_nodes = self._parse_target_nodes(passed_targets) target_nodes_specified = True - retry_attempts = 1 + retry_attempts = 0 - for _ in range(retry_attempts): + # Add one for the first execution + execute_attempts = 1 + retry_attempts + for _ in range(execute_attempts): if self._initialize: await self.initialize() try: @@ -658,17 +689,14 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand ) return dict(zip(keys, values)) except Exception as e: - if type(e) in self.__class__.ERRORS_ALLOW_RETRY: - # The nodes and slots cache were reinitialized. + if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY: + # The nodes and slots cache were should be reinitialized. # Try again with the new cluster setup. - exception = e + retry_attempts -= 1 + continue else: - # All other errors should be raised. - raise - - # If it fails the configured number of times then raise exception back - # to caller of this method - raise exception + # raise the exception + raise e async def _execute_command( self, target_node: "ClusterNode", *args: Union[KeyT, EncodableT], **kwargs: Any @@ -676,7 +704,6 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand asking = moved = False redirect_addr = None ttl = self.RedisClusterRequestTTL - connection_error_retry_counter = 0 while ttl > 0: ttl -= 1 @@ -695,25 +722,18 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand moved = False return await target_node.execute_command(*args, **kwargs) - except BusyLoadingError: + except (BusyLoadingError, MaxConnectionsError): + raise + except (ConnectionError, TimeoutError): + # Connection retries are being handled in the node's + # Retry object. + # Remove the failed node from the startup nodes before we try + # to reinitialize the cluster + self.nodes_manager.startup_nodes.pop(target_node.name, None) + # Hard force of reinitialize of the node/slots setup + # and try again with the new setup + await self.close() raise - except (ConnectionError, TimeoutError) as e: - # Give the node 0.25 seconds to get back up and retry again with the - # same node and configuration. After the defined number of attempts, try - # to reinitialize the cluster and try again. - connection_error_retry_counter += 1 - if ( - connection_error_retry_counter - < self.connection_error_retry_attempts - ): - await asyncio.sleep(0.25) - else: - if isinstance(e, MaxConnectionsError): - raise - # Hard force of reinitialize of the node/slots setup - # and try again with the new setup - await self.close() - raise except ClusterDownError: # ClusterDownError can occur during a failover and to get # self-healed, we will try to reinitialize the cluster layout @@ -1145,26 +1165,11 @@ class NodesManager: ) cluster_slots = await startup_node.execute_command("CLUSTER SLOTS") startup_nodes_reachable = True - except (ConnectionError, TimeoutError) as e: + except Exception as e: + # Try the next startup node. + # The exception is saved and raised only if we have no more nodes. exception = e continue - except ResponseError as e: - # Isn't a cluster connection, so it won't parse these - # exceptions automatically - message = e.__str__() - if "CLUSTERDOWN" in message or "MASTERDOWN" in message: - continue - else: - raise RedisClusterException( - 'ERROR sending "cluster slots" command to redis ' - f"server: {startup_node}. error: {message}" - ) - except Exception as e: - message = e.__str__() - raise RedisClusterException( - 'ERROR sending "cluster slots" command to redis ' - f"server {startup_node.name}. error: {message}" - ) # CLUSTER SLOTS command results in the following output: # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]] @@ -1245,8 +1250,8 @@ class NodesManager: if not startup_nodes_reachable: raise RedisClusterException( - "Redis Cluster cannot be connected. Please provide at least " - "one reachable node. " + f"Redis Cluster cannot be connected. Please provide at least " + f"one reachable node: {str(exception)}" ) from exception # Check if the slots are not fully covered diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index df066c4..4f19153 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -497,7 +497,7 @@ class Connection: retry_on_error.append(socket.timeout) retry_on_error.append(asyncio.TimeoutError) self.retry_on_error = retry_on_error - if retry_on_error: + if retry or retry_on_error: if not retry: self.retry = Retry(NoBackoff(), 1) else: @@ -1445,6 +1445,12 @@ class ConnectionPool: if exc: raise exc + def set_retry(self, retry: "Retry") -> None: + for conn in self._available_connections: + conn.retry = retry + for conn in self._in_use_connections: + conn.retry = retry + class BlockingConnectionPool(ConnectionPool): """ diff --git a/redis/backoff.py b/redis/backoff.py index 5ccdb91..c62e760 100644 --- a/redis/backoff.py +++ b/redis/backoff.py @@ -1,6 +1,11 @@ import random from abc import ABC, abstractmethod +# Maximum backoff between each retry in seconds +DEFAULT_CAP = 0.512 +# Minimum backoff between each retry in seconds +DEFAULT_BASE = 0.008 + class AbstractBackoff(ABC): """Backoff interface""" @@ -40,7 +45,7 @@ class NoBackoff(ConstantBackoff): class ExponentialBackoff(AbstractBackoff): """Exponential backoff upon failure""" - def __init__(self, cap, base): + def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE): """ `cap`: maximum backoff time in seconds `base`: base backoff time in seconds @@ -55,7 +60,7 @@ class ExponentialBackoff(AbstractBackoff): class FullJitterBackoff(AbstractBackoff): """Full jitter backoff upon failure""" - def __init__(self, cap, base): + def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE): """ `cap`: maximum backoff time in seconds `base`: base backoff time in seconds @@ -70,7 +75,7 @@ class FullJitterBackoff(AbstractBackoff): class EqualJitterBackoff(AbstractBackoff): """Equal jitter backoff upon failure""" - def __init__(self, cap, base): + def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE): """ `cap`: maximum backoff time in seconds `base`: base backoff time in seconds @@ -86,7 +91,7 @@ class EqualJitterBackoff(AbstractBackoff): class DecorrelatedJitterBackoff(AbstractBackoff): """Decorrelated jitter backoff upon failure""" - def __init__(self, cap, base): + def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE): """ `cap`: maximum backoff time in seconds `base`: base backoff time in seconds @@ -103,3 +108,7 @@ class DecorrelatedJitterBackoff(AbstractBackoff): temp = random.uniform(self._base, max_backoff) self._previous_backoff = min(self._cap, temp) return self._previous_backoff + + +def default_backoff(): + return EqualJitterBackoff() diff --git a/redis/client.py b/redis/client.py index 8356ba7..ed857c8 100755 --- a/redis/client.py +++ b/redis/client.py @@ -26,6 +26,7 @@ from redis.exceptions import ( WatchError, ) from redis.lock import Lock +from redis.retry import Retry from redis.utils import safe_str, str_if_bytes SYM_EMPTY = b"" @@ -1047,6 +1048,13 @@ class Redis(AbstractRedis, RedisModuleCommands, CoreCommands, SentinelCommands): """Get the connection's key-word arguments""" return self.connection_pool.connection_kwargs + def get_retry(self) -> Optional["Retry"]: + return self.get_connection_kwargs().get("retry") + + def set_retry(self, retry: "Retry") -> None: + self.get_connection_kwargs().update({"retry": retry}) + self.connection_pool.set_retry(retry) + def set_response_callback(self, command, callback): """Set a custom Response Callback""" self.response_callbacks[command] = callback diff --git a/redis/cluster.py b/redis/cluster.py index 027fe40..91deaea 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -1,12 +1,12 @@ -import copy import random import socket import sys import threading import time from collections import OrderedDict -from typing import Any, Callable, Dict, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Tuple, Union +from redis.backoff import default_backoff from redis.client import CaseInsensitiveDict, PubSub, Redis, parse_scan from redis.commands import READ_COMMANDS, CommandsParser, RedisClusterCommands from redis.connection import ConnectionPool, DefaultParser, Encoder, parse_url @@ -29,6 +29,7 @@ from redis.exceptions import ( TryAgainError, ) from redis.lock import Lock +from redis.retry import Retry from redis.utils import ( dict_merge, list_keys_to_dict, @@ -426,27 +427,28 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands): def __init__( self, - host=None, - port=6379, - startup_nodes=None, - cluster_error_retry_attempts=3, - require_full_coverage=False, - reinitialize_steps=10, - read_from_replicas=False, - dynamic_startup_nodes=True, - url=None, + host: Optional[str] = None, + port: int = 6379, + startup_nodes: Optional[List["ClusterNode"]] = None, + cluster_error_retry_attempts: int = 3, + retry: Optional["Retry"] = None, + require_full_coverage: bool = False, + reinitialize_steps: int = 5, + read_from_replicas: bool = False, + dynamic_startup_nodes: bool = True, + url: Optional[str] = None, **kwargs, ): """ Initialize a new RedisCluster client. - :startup_nodes: 'list[ClusterNode]' + :param startup_nodes: List of nodes from which initial bootstrapping can be done - :host: 'str' + :param host: Can be used to point to a startup node - :port: 'int' + :param port: Can be used to point to a startup node - :require_full_coverage: 'bool' + :param require_full_coverage: When set to False (default value): the client will not require a full coverage of the slots. However, if not all slots are covered, and at least one node has 'cluster-require-full-coverage' set to @@ -456,12 +458,12 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands): When set to True: all slots must be covered to construct the cluster client. If not all slots are covered, RedisClusterException will be thrown. - :read_from_replicas: 'bool' + :param read_from_replicas: Enable read from replicas in READONLY mode. You can read possibly stale data. When set to true, read commands will be assigned between the primary and its replications in a Round-Robin manner. - :dynamic_startup_nodes: 'bool' + :param dynamic_startup_nodes: Set the RedisCluster's startup nodes to all of the discovered nodes. If true (default value), the cluster's discovered nodes will be used to determine the cluster nodes-slots mapping in the next topology refresh. @@ -469,10 +471,11 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands): listed in the CLUSTER SLOTS output. If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists specific IP addresses, it is best to set it to false. - :cluster_error_retry_attempts: 'int' - Retry command execution attempts when encountering ClusterDownError - or ConnectionError - :reinitialize_steps: 'int' + :param cluster_error_retry_attempts: + Number of times to retry before raising an error when + :class:`~.TimeoutError` or :class:`~.ConnectionError` or + :class:`~.ClusterDownError` are encountered + :param reinitialize_steps: Specifies the number of MOVED errors that need to occur before reinitializing the whole cluster topology. If a MOVED error occurs and the cluster does not need to be reinitialized on this current @@ -540,6 +543,11 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands): self.user_on_connect_func = kwargs.pop("redis_connect_func", None) kwargs.update({"redis_connect_func": self.on_connect}) kwargs = cleanup_kwargs(**kwargs) + if retry: + self.retry = retry + kwargs.update({"retry": self.retry}) + else: + kwargs.update({"retry": Retry(default_backoff(), 0)}) self.encoder = Encoder( kwargs.get("encoding", "utf-8"), @@ -666,6 +674,14 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands): self.nodes_manager.default_node = node return True + def get_retry(self) -> Optional["Retry"]: + return self.retry + + def set_retry(self, retry: "Retry") -> None: + self.retry = retry + for node in self.get_nodes(): + node.redis_connection.set_retry(retry) + def monitor(self, target_node=None): """ Returns a Monitor object for the specified target node. @@ -986,12 +1002,13 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands): # nodes were passed to this function, we cannot retry the command # execution since the nodes may not be valid anymore after the tables # were reinitialized. So in case of passed target nodes, - # retry_attempts will be set to 1. + # retry_attempts will be set to 0. retry_attempts = ( - 1 if target_nodes_specified else self.cluster_error_retry_attempts + 0 if target_nodes_specified else self.cluster_error_retry_attempts ) - exception = None - for _ in range(0, retry_attempts): + # Add one for the first execution + execute_attempts = 1 + retry_attempts + for _ in range(execute_attempts): try: res = {} if not target_nodes_specified: @@ -1008,18 +1025,15 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands): # Return the processed result return self._process_result(args[0], res, **kwargs) except Exception as e: - if type(e) in self.__class__.ERRORS_ALLOW_RETRY: + if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY: # The nodes and slots cache were reinitialized. # Try again with the new cluster setup. - exception = e + retry_attempts -= 1 + continue else: - # All other errors should be raised. + # raise the exception raise e - # If it fails the configured number of times then raise exception back - # to caller of this method - raise exception - def _execute_command(self, target_node, *args, **kwargs): """ Send a command to a node in the cluster @@ -1031,7 +1045,6 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands): asking = False moved = False ttl = int(self.RedisClusterRequestTTL) - connection_error_retry_counter = 0 while ttl > 0: ttl -= 1 @@ -1064,25 +1077,21 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands): except AuthenticationError: raise except (ConnectionError, TimeoutError) as e: + # Connection retries are being handled in the node's + # Retry object. # ConnectionError can also be raised if we couldn't get a # connection from the pool before timing out, so check that # this is an actual connection before attempting to disconnect. if connection is not None: connection.disconnect() - connection_error_retry_counter += 1 - - # Give the node 0.25 seconds to get back up and retry again - # with same node and configuration. After 5 attempts then try - # to reinitialize the cluster and see if the nodes - # configuration has changed or not - if connection_error_retry_counter < 5: - time.sleep(0.25) - else: - # Hard force of reinitialize of the node/slots setup - # and try again with the new setup - target_node.redis_connection = None - self.nodes_manager.initialize() - raise e + + # Remove the failed node from the startup nodes before we try + # to reinitialize the cluster + self.nodes_manager.startup_nodes.pop(target_node.name, None) + # Reset the cluster node's connection + target_node.redis_connection = None + self.nodes_manager.initialize() + raise e except MovedError as e: # First, we will try to patch the slots/nodes cache with the # redirected node output and try again. If MovedError exceeds @@ -1406,17 +1415,15 @@ class NodesManager: startup_nodes_reachable = False fully_covered = False kwargs = self.connection_kwargs + exception = None for startup_node in self.startup_nodes.values(): try: if startup_node.redis_connection: r = startup_node.redis_connection else: - # Create a new Redis connection and let Redis decode the - # responses so we won't need to handle that - copy_kwargs = copy.deepcopy(kwargs) - copy_kwargs.update({"decode_responses": True, "encoding": "utf-8"}) + # Create a new Redis connection r = self.create_redis_node( - startup_node.host, startup_node.port, **copy_kwargs + startup_node.host, startup_node.port, **kwargs ) self.startup_nodes[startup_node.name].redis_connection = r # Make sure cluster mode is enabled on this node @@ -1426,25 +1433,11 @@ class NodesManager: ) cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS")) startup_nodes_reachable = True - except (ConnectionError, TimeoutError): - continue - except ResponseError as e: - # Isn't a cluster connection, so it won't parse these - # exceptions automatically - message = e.__str__() - if "CLUSTERDOWN" in message or "MASTERDOWN" in message: - continue - else: - raise RedisClusterException( - 'ERROR sending "cluster slots" command to redis ' - f"server: {startup_node}. error: {message}" - ) except Exception as e: - message = e.__str__() - raise RedisClusterException( - 'ERROR sending "cluster slots" command to redis ' - f"server {startup_node.name}. error: {message}" - ) + # Try the next startup node. + # The exception is saved and raised only if we have no more nodes. + exception = e + continue # CLUSTER SLOTS command results in the following output: # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]] @@ -1514,9 +1507,9 @@ class NodesManager: if not startup_nodes_reachable: raise RedisClusterException( - "Redis Cluster cannot be connected. Please provide at least " - "one reachable node. " - ) + f"Redis Cluster cannot be connected. Please provide at least " + f"one reachable node: {str(exception)}" + ) from exception # Create Redis connections to all nodes self.create_redis_connections(list(tmp_nodes_cache.values())) @@ -1699,14 +1692,14 @@ class ClusterPipeline(RedisCluster): def __init__( self, - nodes_manager, - commands_parser, - result_callbacks=None, - cluster_response_callbacks=None, - startup_nodes=None, - read_from_replicas=False, - cluster_error_retry_attempts=5, - reinitialize_steps=10, + nodes_manager: "NodesManager", + commands_parser: "CommandsParser", + result_callbacks: Optional[Dict[str, Callable]] = None, + cluster_response_callbacks: Optional[Dict[str, Callable]] = None, + startup_nodes: Optional[List["ClusterNode"]] = None, + read_from_replicas: bool = False, + cluster_error_retry_attempts: int = 3, + reinitialize_steps: int = 5, lock=None, **kwargs, ): @@ -1858,22 +1851,22 @@ class ClusterPipeline(RedisCluster): """ if not stack: return [] - - for _ in range(0, self.cluster_error_retry_attempts): + retry_attempts = self.cluster_error_retry_attempts + while True: try: return self._send_cluster_commands( stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections, ) - except ClusterDownError: - # Try again with the new cluster setup. All other errors - # should be raised. - pass - - # If it fails the configured number of times then raise - # exception back to caller of this method - raise ClusterDownError("CLUSTERDOWN error. Unable to rebuild the cluster") + except (ClusterDownError, ConnectionError) as e: + if retry_attempts > 0: + # Try again with the new cluster setup. All other errors + # should be raised. + retry_attempts -= 1 + pass + else: + raise e def _send_cluster_commands( self, stack, raise_on_error=True, allow_redirections=True @@ -1898,7 +1891,6 @@ class ClusterPipeline(RedisCluster): # we figure out the slot number that command maps to, then from # the slot determine the node. for c in attempt: - connection_error_retry_counter = 0 while True: # refer to our internal node -> slot table that # tells us where a given command should route to. @@ -1931,13 +1923,10 @@ class ClusterPipeline(RedisCluster): try: connection = get_connection(redis_node, c.args) except ConnectionError: - connection_error_retry_counter += 1 - if connection_error_retry_counter < 5: - # reinitialize the node -> slot table - self.nodes_manager.initialize() - continue - else: - raise + # Connection retries are being handled in the node's + # Retry object. Reinitialize the node -> slot table. + self.nodes_manager.initialize() + raise nodes[node_name] = NodeCommands( redis_node.parse_response, redis_node.connection_pool, diff --git a/redis/connection.py b/redis/connection.py index a2b0074..9c5b536 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -540,7 +540,7 @@ class Connection: # Add TimeoutError to the errors list to retry on retry_on_error.append(TimeoutError) self.retry_on_error = retry_on_error - if retry_on_error: + if retry or retry_on_error: if retry is None: self.retry = Retry(NoBackoff(), 1) else: @@ -1467,6 +1467,13 @@ class ConnectionPool: for connection in connections: connection.disconnect() + def set_retry(self, retry: "Retry") -> None: + self.connection_kwargs.update({"retry": retry}) + for conn in self._available_connections: + conn.retry = retry + for conn in self._in_use_connections: + conn.retry = retry + class BlockingConnectionPool(ConnectionPool): """ |