summaryrefslogtreecommitdiff
path: root/redis/asyncio/cluster.py
diff options
context:
space:
mode:
Diffstat (limited to 'redis/asyncio/cluster.py')
-rw-r--r--redis/asyncio/cluster.py22
1 files changed, 21 insertions, 1 deletions
diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py
index d5a38b2..e0e77c7 100644
--- a/redis/asyncio/cluster.py
+++ b/redis/asyncio/cluster.py
@@ -517,6 +517,8 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
async def _determine_nodes(
self, command: str, *args: Any, node_flag: Optional[str] = None
) -> List["ClusterNode"]:
+ # Determine which nodes should be executed the command on.
+ # Returns a list of target nodes.
if not node_flag:
# get the nodes group for this command if it was predefined
node_flag = self.command_flags.get(command)
@@ -654,6 +656,12 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
for _ in range(execute_attempts):
if self._initialize:
await self.initialize()
+ if (
+ len(target_nodes) == 1
+ and target_nodes[0] == self.get_default_node()
+ ):
+ # Replace the default cluster node
+ self.replace_default_node()
try:
if not target_nodes_specified:
# Determine the nodes to execute the command on
@@ -1450,7 +1458,6 @@ class ClusterPipeline(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterComm
)
if len(target_nodes) > 1:
raise RedisClusterException(f"Too many targets for command {cmd.args}")
-
node = target_nodes[0]
if node.name not in nodes:
nodes[node.name] = (node, [])
@@ -1487,6 +1494,19 @@ class ClusterPipeline(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterComm
result.args = (msg,) + result.args[1:]
raise result
+ default_node = nodes.get(client.get_default_node().name)
+ if default_node is not None:
+ # This pipeline execution used the default node, check if we need
+ # to replace it.
+ # Note: when the error is raised we'll reset the default node in the
+ # caller function.
+ for cmd in default_node[1]:
+ # Check if it has a command that failed with a relevant
+ # exception
+ if type(cmd.result) in self.__class__.ERRORS_ALLOW_RETRY:
+ client.replace_default_node()
+ break
+
return [cmd.result for cmd in stack]
def _split_command_across_slots(