summaryrefslogtreecommitdiff
path: root/redis/cluster.py
diff options
context:
space:
mode:
authorBar Shaul <88437685+barshaul@users.noreply.github.com>2022-12-01 13:16:26 +0200
committerGitHub <noreply@github.com>2022-12-01 13:16:26 +0200
commit2c121552faf0d39267969b62ce0c3276391b37cc (patch)
tree5f65a13f146bd9a04c363ae7fe637537f7e48104 /redis/cluster.py
parentf4d07dddba55a73df6b015b363d2ea7c96716ae5 (diff)
downloadredis-py-2c121552faf0d39267969b62ce0c3276391b37cc.tar.gz
Added a replacement for the default cluster node in the event of failure. (#2463)
Diffstat (limited to 'redis/cluster.py')
-rw-r--r--redis/cluster.py45
1 files changed, 43 insertions, 2 deletions
diff --git a/redis/cluster.py b/redis/cluster.py
index 91deaea..0b2c4f1 100644
--- a/redis/cluster.py
+++ b/redis/cluster.py
@@ -379,6 +379,30 @@ class AbstractRedisCluster:
ERRORS_ALLOW_RETRY = (ConnectionError, TimeoutError, ClusterDownError)
+ def replace_default_node(self, target_node: "ClusterNode" = None) -> None:
+ """Replace the default cluster node.
+ A random cluster node will be chosen if target_node isn't passed, and primaries
+ will be prioritized. The default node will not be changed if there are no other
+ nodes in the cluster.
+
+ Args:
+ target_node (ClusterNode, optional): Target node to replace the default
+ node. Defaults to None.
+ """
+ if target_node:
+ self.nodes_manager.default_node = target_node
+ else:
+ curr_node = self.get_default_node()
+ primaries = [node for node in self.get_primaries() if node != curr_node]
+ if primaries:
+ # Choose a primary if the cluster contains different primaries
+ self.nodes_manager.default_node = random.choice(primaries)
+ else:
+ # Otherwise, hoose a primary if the cluster contains different primaries
+ replicas = [node for node in self.get_replicas() if node != curr_node]
+ if replicas:
+ self.nodes_manager.default_node = random.choice(replicas)
+
class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
@classmethod
@@ -811,7 +835,9 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
"""Set a custom Response Callback"""
self.cluster_response_callbacks[command] = callback
- def _determine_nodes(self, *args, **kwargs):
+ def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]:
+ # Determine which nodes should be executed the command on.
+ # Returns a list of target nodes.
command = args[0].upper()
if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
command = f"{args[0]} {args[1]}".upper()
@@ -990,6 +1016,7 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
dict<Any, ClusterNode>
"""
target_nodes_specified = False
+ is_default_node = False
target_nodes = None
passed_targets = kwargs.pop("target_nodes", None)
if passed_targets is not None and not self._is_nodes_flag(passed_targets):
@@ -1020,12 +1047,20 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
raise RedisClusterException(
f"No targets were found to execute {args} command on"
)
+ if (
+ len(target_nodes) == 1
+ and target_nodes[0] == self.get_default_node()
+ ):
+ is_default_node = True
for node in target_nodes:
res[node.name] = self._execute_command(node, *args, **kwargs)
# Return the processed result
return self._process_result(args[0], res, **kwargs)
except Exception as e:
if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
+ if is_default_node:
+ # Replace the default cluster node
+ self.replace_default_node()
# The nodes and slots cache were reinitialized.
# Try again with the new cluster setup.
retry_attempts -= 1
@@ -1883,7 +1918,7 @@ class ClusterPipeline(RedisCluster):
# if we have to run through it again, we only retry
# the commands that failed.
attempt = sorted(stack, key=lambda x: x.position)
-
+ is_default_node = False
# build a list of node objects based on node names we need to
nodes = {}
@@ -1913,6 +1948,8 @@ class ClusterPipeline(RedisCluster):
)
node = target_nodes[0]
+ if node == self.get_default_node():
+ is_default_node = True
# now that we know the name of the node
# ( it's just a string in the form of host:port )
@@ -1926,6 +1963,8 @@ class ClusterPipeline(RedisCluster):
# Connection retries are being handled in the node's
# Retry object. Reinitialize the node -> slot table.
self.nodes_manager.initialize()
+ if is_default_node:
+ self.replace_default_node()
raise
nodes[node_name] = NodeCommands(
redis_node.parse_response,
@@ -2007,6 +2046,8 @@ class ClusterPipeline(RedisCluster):
self.reinitialize_counter += 1
if self._should_reinitialized():
self.nodes_manager.initialize()
+ if is_default_node:
+ self.replace_default_node()
for c in attempt:
try:
# send each command individually like we