diff options
author | Greg Melton <gmelton@gmail.com> | 2022-08-02 01:08:40 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-08-02 11:08:40 +0300 |
commit | fd9fea6bc07bf0970a5a42c5ec1788272446910c (patch) | |
tree | 93d2abbbd235e932fd67bdc325b62519e37bbd85 /redis/cluster.py | |
parent | da9d903249d6a5ed1ef8bf66d9e9408a1ab7cd31 (diff) | |
download | redis-py-fd9fea6bc07bf0970a5a42c5ec1788272446910c.tar.gz |
ClusterPipeline Doesn't Handle ConnectionError for Dead Hosts (#2225)
* adds a retry that forces the nodes_manager to reinitialize if cluster pipleline connect to the one of the mapped nodes
* fix line length error
* add trailing comma
* move appending cmd
* updates changes
* trigger build
* fix linting errors
Diffstat (limited to 'redis/cluster.py')
-rw-r--r-- | redis/cluster.py | 69 |
1 files changed, 44 insertions, 25 deletions
diff --git a/redis/cluster.py b/redis/cluster.py index f5844fd..cee578b 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -1897,34 +1897,53 @@ class ClusterPipeline(RedisCluster): # we figure out the slot number that command maps to, then from # the slot determine the node. for c in attempt: - # refer to our internal node -> slot table that - # tells us where a given - # command should route to. - passed_targets = c.options.pop("target_nodes", None) - if passed_targets and not self._is_nodes_flag(passed_targets): - target_nodes = self._parse_target_nodes(passed_targets) - else: - target_nodes = self._determine_nodes(*c.args, node_flag=passed_targets) - if not target_nodes: + connection_error_retry_counter = 0 + while True: + # refer to our internal node -> slot table that + # tells us where a given command should route to. + # (it might be possible we have a cached node that no longer + # exists in the cluster, which is why we do this in a loop) + passed_targets = c.options.pop("target_nodes", None) + if passed_targets and not self._is_nodes_flag(passed_targets): + target_nodes = self._parse_target_nodes(passed_targets) + else: + target_nodes = self._determine_nodes( + *c.args, node_flag=passed_targets + ) + if not target_nodes: + raise RedisClusterException( + f"No targets were found to execute {c.args} command on" + ) + if len(target_nodes) > 1: raise RedisClusterException( - f"No targets were found to execute {c.args} command on" + f"Too many targets for command {c.args}" ) - if len(target_nodes) > 1: - raise RedisClusterException(f"Too many targets for command {c.args}") - - node = target_nodes[0] - # now that we know the name of the node - # ( it's just a string in the form of host:port ) - # we can build a list of commands for each node. - node_name = node.name - if node_name not in nodes: - redis_node = self.get_redis_connection(node) - connection = get_connection(redis_node, c.args) - nodes[node_name] = NodeCommands( - redis_node.parse_response, redis_node.connection_pool, connection - ) - nodes[node_name].append(c) + node = target_nodes[0] + + # now that we know the name of the node + # ( it's just a string in the form of host:port ) + # we can build a list of commands for each node. + node_name = node.name + if node_name not in nodes: + redis_node = self.get_redis_connection(node) + try: + connection = get_connection(redis_node, c.args) + except ConnectionError: + connection_error_retry_counter += 1 + if connection_error_retry_counter < 5: + # reinitialize the node -> slot table + self.nodes_manager.initialize() + continue + else: + raise + nodes[node_name] = NodeCommands( + redis_node.parse_response, + redis_node.connection_pool, + connection, + ) + nodes[node_name].append(c) + break # send the commands in sequence. # we write to all the open sockets for each node first, |