diff options
author | Bar Shaul <88437685+barshaul@users.noreply.github.com> | 2022-11-10 13:16:49 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-10 13:16:49 +0200 |
commit | 67214cc3eaa7890c87e45550b8320779f954094b (patch) | |
tree | 3bca8b8913224255bdf72de79265ca0441cecb1c /redis/asyncio/cluster.py | |
parent | bb06ccd52924800ac501d17c8a42038c8e5c5770 (diff) | |
download | redis-py-67214cc3eaa7890c87e45550b8320779f954094b.tar.gz |
Failover handling improvements for RedisCluster and Async RedisCluster (#2377)
* Cluster&AsyncCluster: Removed handling of timeouts/connection errors within the cluster loop, fixed "cannot pickle '_thread.lock' object" bug, added client's side failover handling improvements
* Fixed linters
* Type fixes
* Added to CHANGES
* Added getter and setter for the client's retry object and added more tests
* Fixed linters
* Fixed test
* Fixed test_client_kill test
* Changed get_default_backoff to default_backoff, removed retry_on_error and connection_error_retry_attempts from RedisCluster, default retry changed to no retries
* Fixing linters
* Reverting deletion of connection_error_retry_attempts to maintain backward compatibility
* Updating retry object for existing and new connections
* Changed the default value of reinitialize_steps from 10 to 5
* fix review comments
Co-authored-by: Chayim <chayim@users.noreply.github.com>
Co-authored-by: dvora-h <dvora.heller@redis.com>
Co-authored-by: dvora-h <67596500+dvora-h@users.noreply.github.com>
Diffstat (limited to 'redis/asyncio/cluster.py')
-rw-r--r-- | redis/asyncio/cluster.py | 121 |
1 files changed, 63 insertions, 58 deletions
diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 57aafbd..d5a38b2 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -26,6 +26,8 @@ from redis.asyncio.connection import ( ) from redis.asyncio.lock import Lock from redis.asyncio.parser import CommandsParser +from redis.asyncio.retry import Retry +from redis.backoff import default_backoff from redis.client import EMPTY_RESPONSE, NEVER_DECODE, AbstractRedis from redis.cluster import ( PIPELINE_BLOCKED_COMMANDS, @@ -110,10 +112,10 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand :param startup_nodes: | :class:`~.ClusterNode` to used as a startup node :param require_full_coverage: - | When set to ``False``: the client will not require a full coverage of the - slots. However, if not all slots are covered, and at least one node has - ``cluster-require-full-coverage`` set to ``yes``, the server will throw a - :class:`~.ClusterDownError` for some key-based commands. + | When set to ``False``: the client will not require a full coverage of + the slots. However, if not all slots are covered, and at least one node + has ``cluster-require-full-coverage`` set to ``yes``, the server will throw + a :class:`~.ClusterDownError` for some key-based commands. | When set to ``True``: all slots must be covered to construct the cluster client. If not all slots are covered, :class:`~.RedisClusterException` will be thrown. @@ -136,7 +138,10 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand or :class:`~.ConnectionError` or :class:`~.ClusterDownError` are encountered :param connection_error_retry_attempts: | Number of times to retry before reinitializing when :class:`~.TimeoutError` - or :class:`~.ConnectionError` are encountered + or :class:`~.ConnectionError` are encountered. + The default backoff strategy will be set if Retry object is not passed (see + default_backoff in backoff.py). To change it, pass a custom Retry object + using the "retry" keyword. :param max_connections: | Maximum number of connections per node. If there are no free connections & the maximum number of connections are already created, a @@ -214,9 +219,9 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand startup_nodes: Optional[List["ClusterNode"]] = None, require_full_coverage: bool = True, read_from_replicas: bool = False, - reinitialize_steps: int = 10, + reinitialize_steps: int = 5, cluster_error_retry_attempts: int = 3, - connection_error_retry_attempts: int = 5, + connection_error_retry_attempts: int = 3, max_connections: int = 2**31, # Client related kwargs db: Union[str, int] = 0, @@ -235,6 +240,8 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand socket_keepalive: bool = False, socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None, socket_timeout: Optional[float] = None, + retry: Optional["Retry"] = None, + retry_on_error: Optional[List[Exception]] = None, # SSL related kwargs ssl: bool = False, ssl_ca_certs: Optional[str] = None, @@ -282,6 +289,7 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand "socket_keepalive": socket_keepalive, "socket_keepalive_options": socket_keepalive_options, "socket_timeout": socket_timeout, + "retry": retry, } if ssl: @@ -302,6 +310,18 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand # Call our on_connect function to configure READONLY mode kwargs["redis_connect_func"] = self.on_connect + self.retry = retry + if retry or retry_on_error or connection_error_retry_attempts > 0: + # Set a retry object for all cluster nodes + self.retry = retry or Retry( + default_backoff(), connection_error_retry_attempts + ) + if not retry_on_error: + # Default errors for retrying + retry_on_error = [ConnectionError, TimeoutError] + self.retry.update_supported_errors(retry_on_error) + kwargs.update({"retry": self.retry}) + kwargs["response_callbacks"] = self.__class__.RESPONSE_CALLBACKS.copy() self.connection_kwargs = kwargs @@ -323,7 +343,6 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand self.reinitialize_steps = reinitialize_steps self.cluster_error_retry_attempts = cluster_error_retry_attempts self.connection_error_retry_attempts = connection_error_retry_attempts - self.reinitialize_counter = 0 self.commands_parser = CommandsParser() self.node_flags = self.__class__.NODE_FLAGS.copy() @@ -481,6 +500,16 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand """Get the kwargs passed to :class:`~redis.asyncio.connection.Connection`.""" return self.connection_kwargs + def get_retry(self) -> Optional["Retry"]: + return self.retry + + def set_retry(self, retry: "Retry") -> None: + self.retry = retry + for node in self.get_nodes(): + node.connection_kwargs.update({"retry": retry}) + for conn in node._connections: + conn.retry = retry + def set_response_callback(self, command: str, callback: ResponseCallbackT) -> None: """Set a custom response callback.""" self.response_callbacks[command] = callback @@ -618,9 +647,11 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand if passed_targets and not self._is_node_flag(passed_targets): target_nodes = self._parse_target_nodes(passed_targets) target_nodes_specified = True - retry_attempts = 1 + retry_attempts = 0 - for _ in range(retry_attempts): + # Add one for the first execution + execute_attempts = 1 + retry_attempts + for _ in range(execute_attempts): if self._initialize: await self.initialize() try: @@ -658,17 +689,14 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand ) return dict(zip(keys, values)) except Exception as e: - if type(e) in self.__class__.ERRORS_ALLOW_RETRY: - # The nodes and slots cache were reinitialized. + if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY: + # The nodes and slots cache were should be reinitialized. # Try again with the new cluster setup. - exception = e + retry_attempts -= 1 + continue else: - # All other errors should be raised. - raise - - # If it fails the configured number of times then raise exception back - # to caller of this method - raise exception + # raise the exception + raise e async def _execute_command( self, target_node: "ClusterNode", *args: Union[KeyT, EncodableT], **kwargs: Any @@ -676,7 +704,6 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand asking = moved = False redirect_addr = None ttl = self.RedisClusterRequestTTL - connection_error_retry_counter = 0 while ttl > 0: ttl -= 1 @@ -695,25 +722,18 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand moved = False return await target_node.execute_command(*args, **kwargs) - except BusyLoadingError: + except (BusyLoadingError, MaxConnectionsError): + raise + except (ConnectionError, TimeoutError): + # Connection retries are being handled in the node's + # Retry object. + # Remove the failed node from the startup nodes before we try + # to reinitialize the cluster + self.nodes_manager.startup_nodes.pop(target_node.name, None) + # Hard force of reinitialize of the node/slots setup + # and try again with the new setup + await self.close() raise - except (ConnectionError, TimeoutError) as e: - # Give the node 0.25 seconds to get back up and retry again with the - # same node and configuration. After the defined number of attempts, try - # to reinitialize the cluster and try again. - connection_error_retry_counter += 1 - if ( - connection_error_retry_counter - < self.connection_error_retry_attempts - ): - await asyncio.sleep(0.25) - else: - if isinstance(e, MaxConnectionsError): - raise - # Hard force of reinitialize of the node/slots setup - # and try again with the new setup - await self.close() - raise except ClusterDownError: # ClusterDownError can occur during a failover and to get # self-healed, we will try to reinitialize the cluster layout @@ -1145,26 +1165,11 @@ class NodesManager: ) cluster_slots = await startup_node.execute_command("CLUSTER SLOTS") startup_nodes_reachable = True - except (ConnectionError, TimeoutError) as e: + except Exception as e: + # Try the next startup node. + # The exception is saved and raised only if we have no more nodes. exception = e continue - except ResponseError as e: - # Isn't a cluster connection, so it won't parse these - # exceptions automatically - message = e.__str__() - if "CLUSTERDOWN" in message or "MASTERDOWN" in message: - continue - else: - raise RedisClusterException( - 'ERROR sending "cluster slots" command to redis ' - f"server: {startup_node}. error: {message}" - ) - except Exception as e: - message = e.__str__() - raise RedisClusterException( - 'ERROR sending "cluster slots" command to redis ' - f"server {startup_node.name}. error: {message}" - ) # CLUSTER SLOTS command results in the following output: # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]] @@ -1245,8 +1250,8 @@ class NodesManager: if not startup_nodes_reachable: raise RedisClusterException( - "Redis Cluster cannot be connected. Please provide at least " - "one reachable node. " + f"Redis Cluster cannot be connected. Please provide at least " + f"one reachable node: {str(exception)}" ) from exception # Check if the slots are not fully covered |