summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGES2
-rw-r--r--redis/cluster.py69
2 files changed, 46 insertions, 25 deletions
diff --git a/CHANGES b/CHANGES
index 2abf627..d2033ca 100644
--- a/CHANGES
+++ b/CHANGES
@@ -16,6 +16,8 @@
* Added dynaminc_startup_nodes configuration to RedisCluster
* Fix reusing the old nodes' connections when cluster topology refresh is being done
* Fix RedisCluster to immediately raise AuthenticationError without a retry
+ * ClusterPipeline Doesn't Handle ConnectionError for Dead Hosts (#2225)
+
* 4.1.3 (Feb 8, 2022)
* Fix flushdb and flushall (#1926)
* Add redis5 and redis4 dockers (#1871)
diff --git a/redis/cluster.py b/redis/cluster.py
index f5844fd..cee578b 100644
--- a/redis/cluster.py
+++ b/redis/cluster.py
@@ -1897,34 +1897,53 @@ class ClusterPipeline(RedisCluster):
# we figure out the slot number that command maps to, then from
# the slot determine the node.
for c in attempt:
- # refer to our internal node -> slot table that
- # tells us where a given
- # command should route to.
- passed_targets = c.options.pop("target_nodes", None)
- if passed_targets and not self._is_nodes_flag(passed_targets):
- target_nodes = self._parse_target_nodes(passed_targets)
- else:
- target_nodes = self._determine_nodes(*c.args, node_flag=passed_targets)
- if not target_nodes:
+ connection_error_retry_counter = 0
+ while True:
+ # refer to our internal node -> slot table that
+ # tells us where a given command should route to.
+ # (it might be possible we have a cached node that no longer
+ # exists in the cluster, which is why we do this in a loop)
+ passed_targets = c.options.pop("target_nodes", None)
+ if passed_targets and not self._is_nodes_flag(passed_targets):
+ target_nodes = self._parse_target_nodes(passed_targets)
+ else:
+ target_nodes = self._determine_nodes(
+ *c.args, node_flag=passed_targets
+ )
+ if not target_nodes:
+ raise RedisClusterException(
+ f"No targets were found to execute {c.args} command on"
+ )
+ if len(target_nodes) > 1:
raise RedisClusterException(
- f"No targets were found to execute {c.args} command on"
+ f"Too many targets for command {c.args}"
)
- if len(target_nodes) > 1:
- raise RedisClusterException(f"Too many targets for command {c.args}")
-
- node = target_nodes[0]
- # now that we know the name of the node
- # ( it's just a string in the form of host:port )
- # we can build a list of commands for each node.
- node_name = node.name
- if node_name not in nodes:
- redis_node = self.get_redis_connection(node)
- connection = get_connection(redis_node, c.args)
- nodes[node_name] = NodeCommands(
- redis_node.parse_response, redis_node.connection_pool, connection
- )
- nodes[node_name].append(c)
+ node = target_nodes[0]
+
+ # now that we know the name of the node
+ # ( it's just a string in the form of host:port )
+ # we can build a list of commands for each node.
+ node_name = node.name
+ if node_name not in nodes:
+ redis_node = self.get_redis_connection(node)
+ try:
+ connection = get_connection(redis_node, c.args)
+ except ConnectionError:
+ connection_error_retry_counter += 1
+ if connection_error_retry_counter < 5:
+ # reinitialize the node -> slot table
+ self.nodes_manager.initialize()
+ continue
+ else:
+ raise
+ nodes[node_name] = NodeCommands(
+ redis_node.parse_response,
+ redis_node.connection_pool,
+ connection,
+ )
+ nodes[node_name].append(c)
+ break
# send the commands in sequence.
# we write to all the open sockets for each node first,