summaryrefslogtreecommitdiff
path: root/redis/cluster.py
diff options
context:
space:
mode:
authorBar Shaul <88437685+barshaul@users.noreply.github.com>2022-11-10 13:16:49 +0200
committerGitHub <noreply@github.com>2022-11-10 13:16:49 +0200
commit67214cc3eaa7890c87e45550b8320779f954094b (patch)
tree3bca8b8913224255bdf72de79265ca0441cecb1c /redis/cluster.py
parentbb06ccd52924800ac501d17c8a42038c8e5c5770 (diff)
downloadredis-py-67214cc3eaa7890c87e45550b8320779f954094b.tar.gz
Failover handling improvements for RedisCluster and Async RedisCluster (#2377)
* Cluster&AsyncCluster: Removed handling of timeouts/connection errors within the cluster loop, fixed "cannot pickle '_thread.lock' object" bug, added client's side failover handling improvements * Fixed linters * Type fixes * Added to CHANGES * Added getter and setter for the client's retry object and added more tests * Fixed linters * Fixed test * Fixed test_client_kill test * Changed get_default_backoff to default_backoff, removed retry_on_error and connection_error_retry_attempts from RedisCluster, default retry changed to no retries * Fixing linters * Reverting deletion of connection_error_retry_attempts to maintain backward compatibility * Updating retry object for existing and new connections * Changed the default value of reinitialize_steps from 10 to 5 * fix review comments Co-authored-by: Chayim <chayim@users.noreply.github.com> Co-authored-by: dvora-h <dvora.heller@redis.com> Co-authored-by: dvora-h <67596500+dvora-h@users.noreply.github.com>
Diffstat (limited to 'redis/cluster.py')
-rw-r--r--redis/cluster.py187
1 files changed, 88 insertions, 99 deletions
diff --git a/redis/cluster.py b/redis/cluster.py
index 027fe40..91deaea 100644
--- a/redis/cluster.py
+++ b/redis/cluster.py
@@ -1,12 +1,12 @@
-import copy
import random
import socket
import sys
import threading
import time
from collections import OrderedDict
-from typing import Any, Callable, Dict, Tuple, Union
+from typing import Any, Callable, Dict, List, Optional, Tuple, Union
+from redis.backoff import default_backoff
from redis.client import CaseInsensitiveDict, PubSub, Redis, parse_scan
from redis.commands import READ_COMMANDS, CommandsParser, RedisClusterCommands
from redis.connection import ConnectionPool, DefaultParser, Encoder, parse_url
@@ -29,6 +29,7 @@ from redis.exceptions import (
TryAgainError,
)
from redis.lock import Lock
+from redis.retry import Retry
from redis.utils import (
dict_merge,
list_keys_to_dict,
@@ -426,27 +427,28 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
def __init__(
self,
- host=None,
- port=6379,
- startup_nodes=None,
- cluster_error_retry_attempts=3,
- require_full_coverage=False,
- reinitialize_steps=10,
- read_from_replicas=False,
- dynamic_startup_nodes=True,
- url=None,
+ host: Optional[str] = None,
+ port: int = 6379,
+ startup_nodes: Optional[List["ClusterNode"]] = None,
+ cluster_error_retry_attempts: int = 3,
+ retry: Optional["Retry"] = None,
+ require_full_coverage: bool = False,
+ reinitialize_steps: int = 5,
+ read_from_replicas: bool = False,
+ dynamic_startup_nodes: bool = True,
+ url: Optional[str] = None,
**kwargs,
):
"""
Initialize a new RedisCluster client.
- :startup_nodes: 'list[ClusterNode]'
+ :param startup_nodes:
List of nodes from which initial bootstrapping can be done
- :host: 'str'
+ :param host:
Can be used to point to a startup node
- :port: 'int'
+ :param port:
Can be used to point to a startup node
- :require_full_coverage: 'bool'
+ :param require_full_coverage:
When set to False (default value): the client will not require a
full coverage of the slots. However, if not all slots are covered,
and at least one node has 'cluster-require-full-coverage' set to
@@ -456,12 +458,12 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
When set to True: all slots must be covered to construct the
cluster client. If not all slots are covered, RedisClusterException
will be thrown.
- :read_from_replicas: 'bool'
+ :param read_from_replicas:
Enable read from replicas in READONLY mode. You can read possibly
stale data.
When set to true, read commands will be assigned between the
primary and its replications in a Round-Robin manner.
- :dynamic_startup_nodes: 'bool'
+ :param dynamic_startup_nodes:
Set the RedisCluster's startup nodes to all of the discovered nodes.
If true (default value), the cluster's discovered nodes will be used to
determine the cluster nodes-slots mapping in the next topology refresh.
@@ -469,10 +471,11 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
listed in the CLUSTER SLOTS output.
If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
specific IP addresses, it is best to set it to false.
- :cluster_error_retry_attempts: 'int'
- Retry command execution attempts when encountering ClusterDownError
- or ConnectionError
- :reinitialize_steps: 'int'
+ :param cluster_error_retry_attempts:
+ Number of times to retry before raising an error when
+ :class:`~.TimeoutError` or :class:`~.ConnectionError` or
+ :class:`~.ClusterDownError` are encountered
+ :param reinitialize_steps:
Specifies the number of MOVED errors that need to occur before
reinitializing the whole cluster topology. If a MOVED error occurs
and the cluster does not need to be reinitialized on this current
@@ -540,6 +543,11 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
self.user_on_connect_func = kwargs.pop("redis_connect_func", None)
kwargs.update({"redis_connect_func": self.on_connect})
kwargs = cleanup_kwargs(**kwargs)
+ if retry:
+ self.retry = retry
+ kwargs.update({"retry": self.retry})
+ else:
+ kwargs.update({"retry": Retry(default_backoff(), 0)})
self.encoder = Encoder(
kwargs.get("encoding", "utf-8"),
@@ -666,6 +674,14 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
self.nodes_manager.default_node = node
return True
+ def get_retry(self) -> Optional["Retry"]:
+ return self.retry
+
+ def set_retry(self, retry: "Retry") -> None:
+ self.retry = retry
+ for node in self.get_nodes():
+ node.redis_connection.set_retry(retry)
+
def monitor(self, target_node=None):
"""
Returns a Monitor object for the specified target node.
@@ -986,12 +1002,13 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
# nodes were passed to this function, we cannot retry the command
# execution since the nodes may not be valid anymore after the tables
# were reinitialized. So in case of passed target nodes,
- # retry_attempts will be set to 1.
+ # retry_attempts will be set to 0.
retry_attempts = (
- 1 if target_nodes_specified else self.cluster_error_retry_attempts
+ 0 if target_nodes_specified else self.cluster_error_retry_attempts
)
- exception = None
- for _ in range(0, retry_attempts):
+ # Add one for the first execution
+ execute_attempts = 1 + retry_attempts
+ for _ in range(execute_attempts):
try:
res = {}
if not target_nodes_specified:
@@ -1008,18 +1025,15 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
# Return the processed result
return self._process_result(args[0], res, **kwargs)
except Exception as e:
- if type(e) in self.__class__.ERRORS_ALLOW_RETRY:
+ if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
# The nodes and slots cache were reinitialized.
# Try again with the new cluster setup.
- exception = e
+ retry_attempts -= 1
+ continue
else:
- # All other errors should be raised.
+ # raise the exception
raise e
- # If it fails the configured number of times then raise exception back
- # to caller of this method
- raise exception
-
def _execute_command(self, target_node, *args, **kwargs):
"""
Send a command to a node in the cluster
@@ -1031,7 +1045,6 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
asking = False
moved = False
ttl = int(self.RedisClusterRequestTTL)
- connection_error_retry_counter = 0
while ttl > 0:
ttl -= 1
@@ -1064,25 +1077,21 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
except AuthenticationError:
raise
except (ConnectionError, TimeoutError) as e:
+ # Connection retries are being handled in the node's
+ # Retry object.
# ConnectionError can also be raised if we couldn't get a
# connection from the pool before timing out, so check that
# this is an actual connection before attempting to disconnect.
if connection is not None:
connection.disconnect()
- connection_error_retry_counter += 1
-
- # Give the node 0.25 seconds to get back up and retry again
- # with same node and configuration. After 5 attempts then try
- # to reinitialize the cluster and see if the nodes
- # configuration has changed or not
- if connection_error_retry_counter < 5:
- time.sleep(0.25)
- else:
- # Hard force of reinitialize of the node/slots setup
- # and try again with the new setup
- target_node.redis_connection = None
- self.nodes_manager.initialize()
- raise e
+
+ # Remove the failed node from the startup nodes before we try
+ # to reinitialize the cluster
+ self.nodes_manager.startup_nodes.pop(target_node.name, None)
+ # Reset the cluster node's connection
+ target_node.redis_connection = None
+ self.nodes_manager.initialize()
+ raise e
except MovedError as e:
# First, we will try to patch the slots/nodes cache with the
# redirected node output and try again. If MovedError exceeds
@@ -1406,17 +1415,15 @@ class NodesManager:
startup_nodes_reachable = False
fully_covered = False
kwargs = self.connection_kwargs
+ exception = None
for startup_node in self.startup_nodes.values():
try:
if startup_node.redis_connection:
r = startup_node.redis_connection
else:
- # Create a new Redis connection and let Redis decode the
- # responses so we won't need to handle that
- copy_kwargs = copy.deepcopy(kwargs)
- copy_kwargs.update({"decode_responses": True, "encoding": "utf-8"})
+ # Create a new Redis connection
r = self.create_redis_node(
- startup_node.host, startup_node.port, **copy_kwargs
+ startup_node.host, startup_node.port, **kwargs
)
self.startup_nodes[startup_node.name].redis_connection = r
# Make sure cluster mode is enabled on this node
@@ -1426,25 +1433,11 @@ class NodesManager:
)
cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS"))
startup_nodes_reachable = True
- except (ConnectionError, TimeoutError):
- continue
- except ResponseError as e:
- # Isn't a cluster connection, so it won't parse these
- # exceptions automatically
- message = e.__str__()
- if "CLUSTERDOWN" in message or "MASTERDOWN" in message:
- continue
- else:
- raise RedisClusterException(
- 'ERROR sending "cluster slots" command to redis '
- f"server: {startup_node}. error: {message}"
- )
except Exception as e:
- message = e.__str__()
- raise RedisClusterException(
- 'ERROR sending "cluster slots" command to redis '
- f"server {startup_node.name}. error: {message}"
- )
+ # Try the next startup node.
+ # The exception is saved and raised only if we have no more nodes.
+ exception = e
+ continue
# CLUSTER SLOTS command results in the following output:
# [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
@@ -1514,9 +1507,9 @@ class NodesManager:
if not startup_nodes_reachable:
raise RedisClusterException(
- "Redis Cluster cannot be connected. Please provide at least "
- "one reachable node. "
- )
+ f"Redis Cluster cannot be connected. Please provide at least "
+ f"one reachable node: {str(exception)}"
+ ) from exception
# Create Redis connections to all nodes
self.create_redis_connections(list(tmp_nodes_cache.values()))
@@ -1699,14 +1692,14 @@ class ClusterPipeline(RedisCluster):
def __init__(
self,
- nodes_manager,
- commands_parser,
- result_callbacks=None,
- cluster_response_callbacks=None,
- startup_nodes=None,
- read_from_replicas=False,
- cluster_error_retry_attempts=5,
- reinitialize_steps=10,
+ nodes_manager: "NodesManager",
+ commands_parser: "CommandsParser",
+ result_callbacks: Optional[Dict[str, Callable]] = None,
+ cluster_response_callbacks: Optional[Dict[str, Callable]] = None,
+ startup_nodes: Optional[List["ClusterNode"]] = None,
+ read_from_replicas: bool = False,
+ cluster_error_retry_attempts: int = 3,
+ reinitialize_steps: int = 5,
lock=None,
**kwargs,
):
@@ -1858,22 +1851,22 @@ class ClusterPipeline(RedisCluster):
"""
if not stack:
return []
-
- for _ in range(0, self.cluster_error_retry_attempts):
+ retry_attempts = self.cluster_error_retry_attempts
+ while True:
try:
return self._send_cluster_commands(
stack,
raise_on_error=raise_on_error,
allow_redirections=allow_redirections,
)
- except ClusterDownError:
- # Try again with the new cluster setup. All other errors
- # should be raised.
- pass
-
- # If it fails the configured number of times then raise
- # exception back to caller of this method
- raise ClusterDownError("CLUSTERDOWN error. Unable to rebuild the cluster")
+ except (ClusterDownError, ConnectionError) as e:
+ if retry_attempts > 0:
+ # Try again with the new cluster setup. All other errors
+ # should be raised.
+ retry_attempts -= 1
+ pass
+ else:
+ raise e
def _send_cluster_commands(
self, stack, raise_on_error=True, allow_redirections=True
@@ -1898,7 +1891,6 @@ class ClusterPipeline(RedisCluster):
# we figure out the slot number that command maps to, then from
# the slot determine the node.
for c in attempt:
- connection_error_retry_counter = 0
while True:
# refer to our internal node -> slot table that
# tells us where a given command should route to.
@@ -1931,13 +1923,10 @@ class ClusterPipeline(RedisCluster):
try:
connection = get_connection(redis_node, c.args)
except ConnectionError:
- connection_error_retry_counter += 1
- if connection_error_retry_counter < 5:
- # reinitialize the node -> slot table
- self.nodes_manager.initialize()
- continue
- else:
- raise
+ # Connection retries are being handled in the node's
+ # Retry object. Reinitialize the node -> slot table.
+ self.nodes_manager.initialize()
+ raise
nodes[node_name] = NodeCommands(
redis_node.parse_response,
redis_node.connection_pool,