path: root/redis/asyncio/cluster.py
author     Bar Shaul <88437685+barshaul@users.noreply.github.com>    2022-11-10 13:16:49 +0200
committer  GitHub <noreply@github.com>    2022-11-10 13:16:49 +0200
commit     67214cc3eaa7890c87e45550b8320779f954094b (patch)
tree       3bca8b8913224255bdf72de79265ca0441cecb1c /redis/asyncio/cluster.py
parent     bb06ccd52924800ac501d17c8a42038c8e5c5770 (diff)
download   redis-py-67214cc3eaa7890c87e45550b8320779f954094b.tar.gz
Failover handling improvements for RedisCluster and Async RedisCluster (#2377)
* Cluster&AsyncCluster: Removed handling of timeouts/connection errors within the cluster loop, fixed "cannot pickle '_thread.lock' object" bug, added client's side failover handling improvements
* Fixed linters
* Type fixes
* Added to CHANGES
* Added getter and setter for the client's retry object and added more tests
* Fixed linters
* Fixed test
* Fixed test_client_kill test
* Changed get_default_backoff to default_backoff, removed retry_on_error and connection_error_retry_attempts from RedisCluster, default retry changed to no retries
* Fixing linters
* Reverting deletion of connection_error_retry_attempts to maintain backward compatibility
* Updating retry object for existing and new connections
* Changed the default value of reinitialize_steps from 10 to 5
* fix review comments

Co-authored-by: Chayim <chayim@users.noreply.github.com>
Co-authored-by: dvora-h <dvora.heller@redis.com>
Co-authored-by: dvora-h <67596500+dvora-h@users.noreply.github.com>
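For orientation, the constructor knobs touched by this commit can be summarized in one hedged sketch; the values shown are the new defaults, and host/port are illustrative rather than taken from the diff:

    from redis.asyncio.cluster import RedisCluster

    rc = RedisCluster(
        host="localhost",
        port=16379,
        reinitialize_steps=5,               # default lowered from 10 to 5
        cluster_error_retry_attempts=3,     # cluster-level retries, unchanged
        connection_error_retry_attempts=3,  # was 5; now seeds the default Retry object
    )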
Diffstat (limited to 'redis/asyncio/cluster.py')
-rw-r--r--  redis/asyncio/cluster.py  121
1 file changed, 63 insertions, 58 deletions
diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py
index 57aafbd..d5a38b2 100644
--- a/redis/asyncio/cluster.py
+++ b/redis/asyncio/cluster.py
@@ -26,6 +26,8 @@ from redis.asyncio.connection import (
)
from redis.asyncio.lock import Lock
from redis.asyncio.parser import CommandsParser
+from redis.asyncio.retry import Retry
+from redis.backoff import default_backoff
from redis.client import EMPTY_RESPONSE, NEVER_DECODE, AbstractRedis
from redis.cluster import (
PIPELINE_BLOCKED_COMMANDS,
@@ -110,10 +112,10 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
:param startup_nodes:
| :class:`~.ClusterNode` to use as a startup node
:param require_full_coverage:
- | When set to ``False``: the client will not require a full coverage of the
- slots. However, if not all slots are covered, and at least one node has
- ``cluster-require-full-coverage`` set to ``yes``, the server will throw a
- :class:`~.ClusterDownError` for some key-based commands.
+ | When set to ``False``: the client will not require a full coverage of
+ the slots. However, if not all slots are covered, and at least one node
+ has ``cluster-require-full-coverage`` set to ``yes``, the server will throw
+ a :class:`~.ClusterDownError` for some key-based commands.
| When set to ``True``: all slots must be covered to construct the cluster
client. If not all slots are covered, :class:`~.RedisClusterException` will be
thrown.
@@ -136,7 +138,10 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
or :class:`~.ConnectionError` or :class:`~.ClusterDownError` are encountered
:param connection_error_retry_attempts:
| Number of times to retry before reinitializing when :class:`~.TimeoutError`
- or :class:`~.ConnectionError` are encountered
+ or :class:`~.ConnectionError` are encountered.
+ The default backoff strategy will be set if a Retry object is not passed (see
+ default_backoff in backoff.py). To change it, pass a custom Retry object
+ using the "retry" keyword.
:param max_connections:
| Maximum number of connections per node. If there are no free connections & the
maximum number of connections are already created, a
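To make the documented retry behaviour concrete, here is a hedged usage sketch; the host/port and the ExponentialBackoff parameters are illustrative assumptions, not part of the diff:

    from redis.asyncio.cluster import RedisCluster
    from redis.asyncio.retry import Retry
    from redis.backoff import ExponentialBackoff

    # No "retry" passed: the client builds Retry(default_backoff(), connection_error_retry_attempts)
    rc_default = RedisCluster(host="localhost", port=16379)

    # Custom strategy: pass a Retry object via the "retry" keyword
    rc_custom = RedisCluster(
        host="localhost",
        port=16379,
        retry=Retry(ExponentialBackoff(cap=1.0, base=0.01), retries=5),
    )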
@@ -214,9 +219,9 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
startup_nodes: Optional[List["ClusterNode"]] = None,
require_full_coverage: bool = True,
read_from_replicas: bool = False,
- reinitialize_steps: int = 10,
+ reinitialize_steps: int = 5,
cluster_error_retry_attempts: int = 3,
- connection_error_retry_attempts: int = 5,
+ connection_error_retry_attempts: int = 3,
max_connections: int = 2**31,
# Client related kwargs
db: Union[str, int] = 0,
@@ -235,6 +240,8 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
socket_keepalive: bool = False,
socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
socket_timeout: Optional[float] = None,
+ retry: Optional["Retry"] = None,
+ retry_on_error: Optional[List[Exception]] = None,
# SSL related kwargs
ssl: bool = False,
ssl_ca_certs: Optional[str] = None,
@@ -282,6 +289,7 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
"socket_keepalive": socket_keepalive,
"socket_keepalive_options": socket_keepalive_options,
"socket_timeout": socket_timeout,
+ "retry": retry,
}
if ssl:
@@ -302,6 +310,18 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
# Call our on_connect function to configure READONLY mode
kwargs["redis_connect_func"] = self.on_connect
+ self.retry = retry
+ if retry or retry_on_error or connection_error_retry_attempts > 0:
+ # Set a retry object for all cluster nodes
+ self.retry = retry or Retry(
+ default_backoff(), connection_error_retry_attempts
+ )
+ if not retry_on_error:
+ # Default errors for retrying
+ retry_on_error = [ConnectionError, TimeoutError]
+ self.retry.update_supported_errors(retry_on_error)
+ kwargs.update({"retry": self.retry})
+
kwargs["response_callbacks"] = self.__class__.RESPONSE_CALLBACKS.copy()
self.connection_kwargs = kwargs
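When no retry object is supplied, the block above builds one roughly equivalent to this sketch (a paraphrase of the diff, not extra behaviour):

    from redis.asyncio.retry import Retry
    from redis.backoff import default_backoff
    from redis.exceptions import ConnectionError, TimeoutError

    connection_error_retry_attempts = 3  # the new default
    retry = Retry(default_backoff(), connection_error_retry_attempts)
    retry.update_supported_errors([ConnectionError, TimeoutError])
    # The object is stored in connection_kwargs, so every ClusterNode
    # connection created later shares the same retry policy.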
@@ -323,7 +343,6 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
self.reinitialize_steps = reinitialize_steps
self.cluster_error_retry_attempts = cluster_error_retry_attempts
self.connection_error_retry_attempts = connection_error_retry_attempts
-
self.reinitialize_counter = 0
self.commands_parser = CommandsParser()
self.node_flags = self.__class__.NODE_FLAGS.copy()
@@ -481,6 +500,16 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
"""Get the kwargs passed to :class:`~redis.asyncio.connection.Connection`."""
return self.connection_kwargs
+ def get_retry(self) -> Optional["Retry"]:
+ return self.retry
+
+ def set_retry(self, retry: "Retry") -> None:
+ self.retry = retry
+ for node in self.get_nodes():
+ node.connection_kwargs.update({"retry": retry})
+ for conn in node._connections:
+ conn.retry = retry
+
def set_response_callback(self, command: str, callback: ResponseCallbackT) -> None:
"""Set a custom response callback."""
self.response_callbacks[command] = callback
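A brief usage sketch for the new accessors, assuming rc is an already-constructed async cluster client:

    from redis.asyncio.retry import Retry
    from redis.backoff import ConstantBackoff

    current = rc.get_retry()  # policy shared by all nodes
    rc.set_retry(Retry(ConstantBackoff(0.25), retries=3))
    # set_retry updates connection_kwargs on every node and patches the
    # retry attribute of connections that already exist.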
@@ -618,9 +647,11 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
if passed_targets and not self._is_node_flag(passed_targets):
target_nodes = self._parse_target_nodes(passed_targets)
target_nodes_specified = True
- retry_attempts = 1
+ retry_attempts = 0
- for _ in range(retry_attempts):
+ # Add one for the first execution
+ execute_attempts = 1 + retry_attempts
+ for _ in range(execute_attempts):
if self._initialize:
await self.initialize()
try:
@@ -658,17 +689,14 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
)
return dict(zip(keys, values))
except Exception as e:
- if type(e) in self.__class__.ERRORS_ALLOW_RETRY:
- # The nodes and slots cache were reinitialized.
+ if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
+ # The nodes and slots cache should be reinitialized.
# Try again with the new cluster setup.
- exception = e
+ retry_attempts -= 1
+ continue
else:
- # All other errors should be raised.
- raise
-
- # If it fails the configured number of times then raise exception back
- # to caller of this method
- raise exception
+ # raise the exception
+ raise e
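With the reworked loop, cluster-level retries are governed solely by cluster_error_retry_attempts and apply only when no explicit target nodes were passed; a hedged illustration (key names and values are assumptions):

    import asyncio
    from redis.asyncio.cluster import RedisCluster

    async def demo() -> None:
        rc = RedisCluster(host="localhost", port=16379, cluster_error_retry_attempts=5)
        # May be re-executed up to 5 times after ConnectionError/TimeoutError/
        # ClusterDownError, with the slots cache reinitialized between attempts.
        await rc.set("foo", "bar")
        # Explicit target nodes: executed once; errors are raised immediately.
        await rc.ping(target_nodes=RedisCluster.ALL_NODES)
        await rc.close()

    asyncio.run(demo())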
async def _execute_command(
self, target_node: "ClusterNode", *args: Union[KeyT, EncodableT], **kwargs: Any
@@ -676,7 +704,6 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
asking = moved = False
redirect_addr = None
ttl = self.RedisClusterRequestTTL
- connection_error_retry_counter = 0
while ttl > 0:
ttl -= 1
@@ -695,25 +722,18 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
moved = False
return await target_node.execute_command(*args, **kwargs)
- except BusyLoadingError:
+ except (BusyLoadingError, MaxConnectionsError):
+ raise
+ except (ConnectionError, TimeoutError):
+ # Connection retries are handled by the node's
+ # Retry object.
+ # Remove the failed node from the startup nodes before we try
+ # to reinitialize the cluster
+ self.nodes_manager.startup_nodes.pop(target_node.name, None)
+ # Force a reinitialization of the node/slots setup
+ # and try again with the new setup
+ await self.close()
raise
- except (ConnectionError, TimeoutError) as e:
- # Give the node 0.25 seconds to get back up and retry again with the
- # same node and configuration. After the defined number of attempts, try
- # to reinitialize the cluster and try again.
- connection_error_retry_counter += 1
- if (
- connection_error_retry_counter
- < self.connection_error_retry_attempts
- ):
- await asyncio.sleep(0.25)
- else:
- if isinstance(e, MaxConnectionsError):
- raise
- # Hard force of reinitialize of the node/slots setup
- # and try again with the new setup
- await self.close()
- raise
except ClusterDownError:
# ClusterDownError can occur during a failover and to get
# self-healed, we will try to reinitialize the cluster layout
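The practical effect of the rewritten ConnectionError/TimeoutError branch, sketched from a caller's perspective once both the node-level Retry and the cluster-level retries are exhausted (rc and the key name are assumptions):

    from redis.exceptions import ConnectionError, TimeoutError

    async def read_with_fallback(rc) -> None:
        try:
            await rc.get("foo")
        except (ConnectionError, TimeoutError):
            # The failed node was removed from startup_nodes and the client was
            # closed, so the next command triggers a fresh topology refresh.
            ...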
@@ -1145,26 +1165,11 @@ class NodesManager:
)
cluster_slots = await startup_node.execute_command("CLUSTER SLOTS")
startup_nodes_reachable = True
- except (ConnectionError, TimeoutError) as e:
+ except Exception as e:
+ # Try the next startup node.
+ # The exception is saved and raised only if we have no more nodes.
exception = e
continue
- except ResponseError as e:
- # Isn't a cluster connection, so it won't parse these
- # exceptions automatically
- message = e.__str__()
- if "CLUSTERDOWN" in message or "MASTERDOWN" in message:
- continue
- else:
- raise RedisClusterException(
- 'ERROR sending "cluster slots" command to redis '
- f"server: {startup_node}. error: {message}"
- )
- except Exception as e:
- message = e.__str__()
- raise RedisClusterException(
- 'ERROR sending "cluster slots" command to redis '
- f"server {startup_node.name}. error: {message}"
- )
# CLUSTER SLOTS command results in the following output:
# [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
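Since any failure against a startup node now simply moves on to the next one, supplying several startup nodes is the natural way to make initialization robust; a hedged sketch with illustrative hostnames:

    from redis.asyncio.cluster import RedisCluster, ClusterNode

    # Each node is asked for CLUSTER SLOTS in turn; the saved exception is
    # raised only if every startup node fails.
    rc = RedisCluster(startup_nodes=[
        ClusterNode("node1.example.com", 6379),
        ClusterNode("node2.example.com", 6379),
    ])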
@@ -1245,8 +1250,8 @@ class NodesManager:
if not startup_nodes_reachable:
raise RedisClusterException(
- "Redis Cluster cannot be connected. Please provide at least "
- "one reachable node. "
+ f"Redis Cluster cannot be connected. Please provide at least "
+ f"one reachable node: {str(exception)}"
) from exception
# Check if the slots are not fully covered