diff options
author | Bar Shaul <88437685+barshaul@users.noreply.github.com> | 2022-12-01 13:16:26 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-12-01 13:16:26 +0200 |
commit | 2c121552faf0d39267969b62ce0c3276391b37cc (patch) | |
tree | 5f65a13f146bd9a04c363ae7fe637537f7e48104 /redis/cluster.py | |
parent | f4d07dddba55a73df6b015b363d2ea7c96716ae5 (diff) | |
download | redis-py-2c121552faf0d39267969b62ce0c3276391b37cc.tar.gz |
Added a replacement for the default cluster node in the event of failure. (#2463)
Diffstat (limited to 'redis/cluster.py')
-rw-r--r-- | redis/cluster.py | 45 |
1 files changed, 43 insertions, 2 deletions
diff --git a/redis/cluster.py b/redis/cluster.py index 91deaea..0b2c4f1 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -379,6 +379,30 @@ class AbstractRedisCluster: ERRORS_ALLOW_RETRY = (ConnectionError, TimeoutError, ClusterDownError) + def replace_default_node(self, target_node: "ClusterNode" = None) -> None: + """Replace the default cluster node. + A random cluster node will be chosen if target_node isn't passed, and primaries + will be prioritized. The default node will not be changed if there are no other + nodes in the cluster. + + Args: + target_node (ClusterNode, optional): Target node to replace the default + node. Defaults to None. + """ + if target_node: + self.nodes_manager.default_node = target_node + else: + curr_node = self.get_default_node() + primaries = [node for node in self.get_primaries() if node != curr_node] + if primaries: + # Choose a primary if the cluster contains different primaries + self.nodes_manager.default_node = random.choice(primaries) + else: + # Otherwise, hoose a primary if the cluster contains different primaries + replicas = [node for node in self.get_replicas() if node != curr_node] + if replicas: + self.nodes_manager.default_node = random.choice(replicas) + class RedisCluster(AbstractRedisCluster, RedisClusterCommands): @classmethod @@ -811,7 +835,9 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands): """Set a custom Response Callback""" self.cluster_response_callbacks[command] = callback - def _determine_nodes(self, *args, **kwargs): + def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]: + # Determine which nodes should be executed the command on. + # Returns a list of target nodes. command = args[0].upper() if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags: command = f"{args[0]} {args[1]}".upper() @@ -990,6 +1016,7 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands): dict<Any, ClusterNode> """ target_nodes_specified = False + is_default_node = False target_nodes = None passed_targets = kwargs.pop("target_nodes", None) if passed_targets is not None and not self._is_nodes_flag(passed_targets): @@ -1020,12 +1047,20 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands): raise RedisClusterException( f"No targets were found to execute {args} command on" ) + if ( + len(target_nodes) == 1 + and target_nodes[0] == self.get_default_node() + ): + is_default_node = True for node in target_nodes: res[node.name] = self._execute_command(node, *args, **kwargs) # Return the processed result return self._process_result(args[0], res, **kwargs) except Exception as e: if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY: + if is_default_node: + # Replace the default cluster node + self.replace_default_node() # The nodes and slots cache were reinitialized. # Try again with the new cluster setup. retry_attempts -= 1 @@ -1883,7 +1918,7 @@ class ClusterPipeline(RedisCluster): # if we have to run through it again, we only retry # the commands that failed. attempt = sorted(stack, key=lambda x: x.position) - + is_default_node = False # build a list of node objects based on node names we need to nodes = {} @@ -1913,6 +1948,8 @@ class ClusterPipeline(RedisCluster): ) node = target_nodes[0] + if node == self.get_default_node(): + is_default_node = True # now that we know the name of the node # ( it's just a string in the form of host:port ) @@ -1926,6 +1963,8 @@ class ClusterPipeline(RedisCluster): # Connection retries are being handled in the node's # Retry object. Reinitialize the node -> slot table. self.nodes_manager.initialize() + if is_default_node: + self.replace_default_node() raise nodes[node_name] = NodeCommands( redis_node.parse_response, @@ -2007,6 +2046,8 @@ class ClusterPipeline(RedisCluster): self.reinitialize_counter += 1 if self._should_reinitialized(): self.nodes_manager.initialize() + if is_default_node: + self.replace_default_node() for c in attempt: try: # send each command individually like we |