summaryrefslogtreecommitdiff
path: root/redis/cluster.py
diff options
context:
space:
mode:
authorGreg Melton <gmelton@gmail.com>2022-08-02 01:08:40 -0700
committerGitHub <noreply@github.com>2022-08-02 11:08:40 +0300
commitfd9fea6bc07bf0970a5a42c5ec1788272446910c (patch)
tree93d2abbbd235e932fd67bdc325b62519e37bbd85 /redis/cluster.py
parentda9d903249d6a5ed1ef8bf66d9e9408a1ab7cd31 (diff)
downloadredis-py-fd9fea6bc07bf0970a5a42c5ec1788272446910c.tar.gz
ClusterPipeline Doesn't Handle ConnectionError for Dead Hosts (#2225)
* adds a retry that forces the nodes_manager to reinitialize if cluster pipleline connect to the one of the mapped nodes * fix line length error * add trailing comma * move appending cmd * updates changes * trigger build * fix linting errors
Diffstat (limited to 'redis/cluster.py')
-rw-r--r--redis/cluster.py69
1 files changed, 44 insertions, 25 deletions
diff --git a/redis/cluster.py b/redis/cluster.py
index f5844fd..cee578b 100644
--- a/redis/cluster.py
+++ b/redis/cluster.py
@@ -1897,34 +1897,53 @@ class ClusterPipeline(RedisCluster):
# we figure out the slot number that command maps to, then from
# the slot determine the node.
for c in attempt:
- # refer to our internal node -> slot table that
- # tells us where a given
- # command should route to.
- passed_targets = c.options.pop("target_nodes", None)
- if passed_targets and not self._is_nodes_flag(passed_targets):
- target_nodes = self._parse_target_nodes(passed_targets)
- else:
- target_nodes = self._determine_nodes(*c.args, node_flag=passed_targets)
- if not target_nodes:
+ connection_error_retry_counter = 0
+ while True:
+ # refer to our internal node -> slot table that
+ # tells us where a given command should route to.
+ # (it might be possible we have a cached node that no longer
+ # exists in the cluster, which is why we do this in a loop)
+ passed_targets = c.options.pop("target_nodes", None)
+ if passed_targets and not self._is_nodes_flag(passed_targets):
+ target_nodes = self._parse_target_nodes(passed_targets)
+ else:
+ target_nodes = self._determine_nodes(
+ *c.args, node_flag=passed_targets
+ )
+ if not target_nodes:
+ raise RedisClusterException(
+ f"No targets were found to execute {c.args} command on"
+ )
+ if len(target_nodes) > 1:
raise RedisClusterException(
- f"No targets were found to execute {c.args} command on"
+ f"Too many targets for command {c.args}"
)
- if len(target_nodes) > 1:
- raise RedisClusterException(f"Too many targets for command {c.args}")
-
- node = target_nodes[0]
- # now that we know the name of the node
- # ( it's just a string in the form of host:port )
- # we can build a list of commands for each node.
- node_name = node.name
- if node_name not in nodes:
- redis_node = self.get_redis_connection(node)
- connection = get_connection(redis_node, c.args)
- nodes[node_name] = NodeCommands(
- redis_node.parse_response, redis_node.connection_pool, connection
- )
- nodes[node_name].append(c)
+ node = target_nodes[0]
+
+ # now that we know the name of the node
+ # ( it's just a string in the form of host:port )
+ # we can build a list of commands for each node.
+ node_name = node.name
+ if node_name not in nodes:
+ redis_node = self.get_redis_connection(node)
+ try:
+ connection = get_connection(redis_node, c.args)
+ except ConnectionError:
+ connection_error_retry_counter += 1
+ if connection_error_retry_counter < 5:
+ # reinitialize the node -> slot table
+ self.nodes_manager.initialize()
+ continue
+ else:
+ raise
+ nodes[node_name] = NodeCommands(
+ redis_node.parse_response,
+ redis_node.connection_pool,
+ connection,
+ )
+ nodes[node_name].append(c)
+ break
# send the commands in sequence.
# we write to all the open sockets for each node first,