diff options
Diffstat (limited to 'redis/asyncio/cluster.py')
-rw-r--r-- | redis/asyncio/cluster.py | 22 |
1 files changed, 21 insertions, 1 deletions
diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index d5a38b2..e0e77c7 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -517,6 +517,8 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand async def _determine_nodes( self, command: str, *args: Any, node_flag: Optional[str] = None ) -> List["ClusterNode"]: + # Determine which nodes should be executed the command on. + # Returns a list of target nodes. if not node_flag: # get the nodes group for this command if it was predefined node_flag = self.command_flags.get(command) @@ -654,6 +656,12 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand for _ in range(execute_attempts): if self._initialize: await self.initialize() + if ( + len(target_nodes) == 1 + and target_nodes[0] == self.get_default_node() + ): + # Replace the default cluster node + self.replace_default_node() try: if not target_nodes_specified: # Determine the nodes to execute the command on @@ -1450,7 +1458,6 @@ class ClusterPipeline(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterComm ) if len(target_nodes) > 1: raise RedisClusterException(f"Too many targets for command {cmd.args}") - node = target_nodes[0] if node.name not in nodes: nodes[node.name] = (node, []) @@ -1487,6 +1494,19 @@ class ClusterPipeline(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterComm result.args = (msg,) + result.args[1:] raise result + default_node = nodes.get(client.get_default_node().name) + if default_node is not None: + # This pipeline execution used the default node, check if we need + # to replace it. + # Note: when the error is raised we'll reset the default node in the + # caller function. + for cmd in default_node[1]: + # Check if it has a command that failed with a relevant + # exception + if type(cmd.result) in self.__class__.ERRORS_ALLOW_RETRY: + client.replace_default_node() + break + return [cmd.result for cmd in stack] def _split_command_across_slots( |