summaryrefslogtreecommitdiff
path: root/redis/cluster.py
diff options
context:
space:
mode:
Diffstat (limited to 'redis/cluster.py')
-rw-r--r--redis/cluster.py69
1 files changed, 44 insertions, 25 deletions
diff --git a/redis/cluster.py b/redis/cluster.py
index f5844fd..cee578b 100644
--- a/redis/cluster.py
+++ b/redis/cluster.py
@@ -1897,34 +1897,53 @@ class ClusterPipeline(RedisCluster):
# we figure out the slot number that command maps to, then from
# the slot determine the node.
for c in attempt:
- # refer to our internal node -> slot table that
- # tells us where a given
- # command should route to.
- passed_targets = c.options.pop("target_nodes", None)
- if passed_targets and not self._is_nodes_flag(passed_targets):
- target_nodes = self._parse_target_nodes(passed_targets)
- else:
- target_nodes = self._determine_nodes(*c.args, node_flag=passed_targets)
- if not target_nodes:
+ connection_error_retry_counter = 0
+ while True:
+ # refer to our internal node -> slot table that
+ # tells us where a given command should route to.
+ # (it might be possible we have a cached node that no longer
+ # exists in the cluster, which is why we do this in a loop)
+ passed_targets = c.options.pop("target_nodes", None)
+ if passed_targets and not self._is_nodes_flag(passed_targets):
+ target_nodes = self._parse_target_nodes(passed_targets)
+ else:
+ target_nodes = self._determine_nodes(
+ *c.args, node_flag=passed_targets
+ )
+ if not target_nodes:
+ raise RedisClusterException(
+ f"No targets were found to execute {c.args} command on"
+ )
+ if len(target_nodes) > 1:
raise RedisClusterException(
- f"No targets were found to execute {c.args} command on"
+ f"Too many targets for command {c.args}"
)
- if len(target_nodes) > 1:
- raise RedisClusterException(f"Too many targets for command {c.args}")
-
- node = target_nodes[0]
- # now that we know the name of the node
- # ( it's just a string in the form of host:port )
- # we can build a list of commands for each node.
- node_name = node.name
- if node_name not in nodes:
- redis_node = self.get_redis_connection(node)
- connection = get_connection(redis_node, c.args)
- nodes[node_name] = NodeCommands(
- redis_node.parse_response, redis_node.connection_pool, connection
- )
- nodes[node_name].append(c)
+ node = target_nodes[0]
+
+ # now that we know the name of the node
+ # ( it's just a string in the form of host:port )
+ # we can build a list of commands for each node.
+ node_name = node.name
+ if node_name not in nodes:
+ redis_node = self.get_redis_connection(node)
+ try:
+ connection = get_connection(redis_node, c.args)
+ except ConnectionError:
+ connection_error_retry_counter += 1
+ if connection_error_retry_counter < 5:
+ # reinitialize the node -> slot table
+ self.nodes_manager.initialize()
+ continue
+ else:
+ raise
+ nodes[node_name] = NodeCommands(
+ redis_node.parse_response,
+ redis_node.connection_pool,
+ connection,
+ )
+ nodes[node_name].append(c)
+ break
# send the commands in sequence.
# we write to all the open sockets for each node first,