summary refs log tree commit diff
diff options
context:
space:
mode:
author Bar Shaul <88437685+barshaul@users.noreply.github.com> 2021-12-23 12:18:02 +0200
committer GitHub <noreply@github.com> 2021-12-23 12:18:02 +0200
commit 3347888bfa19f9e82a71ae6dc13a4837c87ea893 (patch)
tree a56e496b40a906b5e403442206cdabfda37c9eca
parent d6cb997bc7b96f4e646a497a89f9466c97aeffef (diff)
download redis-py-3347888bfa19f9e82a71ae6dc13a4837c87ea893.tar.gz
Retry on error exception and timeout fixes (#1821)
-rw-r--r-- redis/cluster.py 101
-rw-r--r-- tests/test_cluster.py 39
2 files changed, 58 insertions, 82 deletions
diff --git a/redis/cluster.py b/redis/cluster.py
index 0c2fc71..5707a9d 100644
--- a/redis/cluster.py
+++ b/redis/cluster.py
@@ -17,6 +17,7 @@ from redis.exceptions import (
ClusterCrossSlotError,
ClusterDownError,
ClusterError,
+ ConnectionError,
DataError,
MasterDownError,
MovedError,
@@ -374,6 +375,12 @@ class RedisCluster(RedisClusterCommands):
),
)
+ ERRORS_ALLOW_RETRY = (
+ ConnectionError,
+ TimeoutError,
+ ClusterDownError,
+ )
+
def __init__(
self,
host=None,
@@ -385,8 +392,6 @@ class RedisCluster(RedisClusterCommands):
reinitialize_steps=10,
read_from_replicas=False,
url=None,
- retry_on_timeout=False,
- retry=None,
**kwargs,
):
"""
@@ -417,11 +422,6 @@ class RedisCluster(RedisClusterCommands):
:cluster_error_retry_attempts: 'int'
Retry command execution attempts when encountering ClusterDownError
or ConnectionError
- :retry_on_timeout: 'bool'
- To specify a retry policy, first set `retry_on_timeout` to `True`
- then set `retry` to a valid `Retry` object
- :retry: 'Retry'
- a `Retry` object
:reinitialize_steps: 'int'
Specifies the number of MOVED errors that need to occur before
reinitializing the whole cluster topology. If a MOVED error occurs
@@ -452,9 +452,6 @@ class RedisCluster(RedisClusterCommands):
"Argument 'db' is not possible to use in cluster mode"
)
- if retry_on_timeout:
- kwargs.update({"retry_on_timeout": retry_on_timeout, "retry": retry})
-
# Get the startup node/s
from_url = False
if url is not None:
@@ -850,7 +847,7 @@ class RedisCluster(RedisClusterCommands):
def execute_command(self, *args, **kwargs):
"""
- Wrapper for ClusterDownError and ConnectionError error handling.
+ Wrapper for ERRORS_ALLOW_RETRY error handling.
It will try the number of times specified by the config option
"self.cluster_error_retry_attempts" which defaults to 3 unless manually
@@ -865,18 +862,19 @@ class RedisCluster(RedisClusterCommands):
dict<Any, ClusterNode>
"""
target_nodes_specified = False
- target_nodes = kwargs.pop("target_nodes", None)
- if target_nodes is not None and not self._is_nodes_flag(target_nodes):
- target_nodes = self._parse_target_nodes(target_nodes)
+ target_nodes = None
+ passed_targets = kwargs.pop("target_nodes", None)
+ if passed_targets is not None and not self._is_nodes_flag(passed_targets):
+ target_nodes = self._parse_target_nodes(passed_targets)
target_nodes_specified = True
- # If ClusterDownError/ConnectionError were thrown, the nodes
- # and slots cache were reinitialized. We will retry executing the
- # command with the updated cluster setup only when the target nodes
- # can be determined again with the new cache tables. Therefore,
- # when target nodes were passed to this function, we cannot retry
- # the command execution since the nodes may not be valid anymore
- # after the tables were reinitialized. So in case of passed target
- # nodes, retry_attempts will be set to 1.
+ # If an error that allows retrying was thrown, the nodes and slots
+ # cache were reinitialized. We will retry executing the command with
+ # the updated cluster setup only when the target nodes can be
+ # determined again with the new cache tables. Therefore, when target
+ # nodes were passed to this function, we cannot retry the command
+ # execution since the nodes may not be valid anymore after the tables
+ # were reinitialized. So in case of passed target nodes,
+ # retry_attempts will be set to 1.
retry_attempts = (
1 if target_nodes_specified else self.cluster_error_retry_attempts
)
@@ -887,7 +885,7 @@ class RedisCluster(RedisClusterCommands):
if not target_nodes_specified:
# Determine the nodes to execute the command on
target_nodes = self._determine_nodes(
- *args, **kwargs, nodes_flag=target_nodes
+ *args, **kwargs, nodes_flag=passed_targets
)
if not target_nodes:
raise RedisClusterException(
@@ -897,11 +895,14 @@ class RedisCluster(RedisClusterCommands):
res[node.name] = self._execute_command(node, *args, **kwargs)
# Return the processed result
return self._process_result(args[0], res, **kwargs)
- except (ClusterDownError, ConnectionError) as e:
- # The nodes and slots cache were reinitialized.
- # Try again with the new cluster setup. All other errors
- # should be raised.
- exception = e
+ except BaseException as e:
+ if type(e) in RedisCluster.ERRORS_ALLOW_RETRY:
+ # The nodes and slots cache were reinitialized.
+ # Try again with the new cluster setup.
+ exception = e
+ else:
+ # All other errors should be raised.
+ raise e
# If it fails the configured number of times then raise exception back
# to caller of this method
@@ -953,11 +954,11 @@ class RedisCluster(RedisClusterCommands):
)
return response
- except (RedisClusterException, BusyLoadingError):
- log.exception("RedisClusterException || BusyLoadingError")
+ except (RedisClusterException, BusyLoadingError) as e:
+ log.exception(type(e))
raise
- except ConnectionError:
- log.exception("ConnectionError")
+ except (ConnectionError, TimeoutError) as e:
+ log.exception(type(e))
# ConnectionError can also be raised if we couldn't get a
# connection from the pool before timing out, so check that
# this is an actual connection before attempting to disconnect.
@@ -976,13 +977,6 @@ class RedisCluster(RedisClusterCommands):
# and try again with the new setup
self.nodes_manager.initialize()
raise
- except TimeoutError:
- log.exception("TimeoutError")
- if connection is not None:
- connection.disconnect()
-
- if ttl < self.RedisClusterRequestTTL / 2:
- time.sleep(0.05)
except MovedError as e:
# First, we will try to patch the slots/nodes cache with the
# redirected node output and try again. If MovedError exceeds
@@ -1016,7 +1010,7 @@ class RedisCluster(RedisClusterCommands):
# ClusterDownError can occur during a failover and to get
# self-healed, we will try to reinitialize the cluster layout
# and retry executing the command
- time.sleep(0.05)
+ time.sleep(0.25)
self.nodes_manager.initialize()
raise e
except ResponseError as e:
@@ -1342,7 +1336,7 @@ class NodesManager:
raise RedisClusterException(
"Cluster mode is not enabled on this node"
)
- cluster_slots = r.execute_command("CLUSTER SLOTS")
+ cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS"))
startup_nodes_reachable = True
except (ConnectionError, TimeoutError) as e:
msg = e.__str__
@@ -1631,21 +1625,20 @@ class ClusterPubSub(PubSub):
return self.node.redis_connection
-ERRORS_ALLOW_RETRY = (
- ConnectionError,
- TimeoutError,
- MovedError,
- AskError,
- TryAgainError,
-)
-
-
class ClusterPipeline(RedisCluster):
"""
Support for Redis pipeline
in cluster mode
"""
+ ERRORS_ALLOW_RETRY = (
+ ConnectionError,
+ TimeoutError,
+ MovedError,
+ AskError,
+ TryAgainError,
+ )
+
def __init__(
self,
nodes_manager,
@@ -1653,7 +1646,7 @@ class ClusterPipeline(RedisCluster):
cluster_response_callbacks=None,
startup_nodes=None,
read_from_replicas=False,
- cluster_error_retry_attempts=3,
+ cluster_error_retry_attempts=5,
reinitialize_steps=10,
**kwargs,
):
@@ -1915,7 +1908,11 @@ class ClusterPipeline(RedisCluster):
# collect all the commands we are allowed to retry.
# (MOVED, ASK, or connection errors or timeout errors)
attempt = sorted(
- (c for c in attempt if isinstance(c.result, ERRORS_ALLOW_RETRY)),
+ (
+ c
+ for c in attempt
+ if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY)
+ ),
key=lambda x: x.position,
)
if attempt and allow_redirections:
diff --git a/tests/test_cluster.py b/tests/test_cluster.py
index e1c8c34..496ed98 100644
--- a/tests/test_cluster.py
+++ b/tests/test_cluster.py
@@ -22,6 +22,7 @@ from redis.crc import key_slot
from redis.exceptions import (
AskError,
ClusterDownError,
+ ConnectionError,
DataError,
MovedError,
NoPermissionError,
@@ -555,46 +556,24 @@ class TestRedisClusterObj:
for node in r.get_primaries():
assert node in nodes
- def test_cluster_down_overreaches_retry_attempts(self):
+ @pytest.mark.parametrize("error", RedisCluster.ERRORS_ALLOW_RETRY)
+ def test_cluster_down_overreaches_retry_attempts(self, error):
"""
- When ClusterDownError is thrown, test that we retry executing the
- command as many times as configured in cluster_error_retry_attempts
+ When error that allows retry is thrown, test that we retry executing
+ the command as many times as configured in cluster_error_retry_attempts
and then raise the exception
"""
with patch.object(RedisCluster, "_execute_command") as execute_command:
- def raise_cluster_down_error(target_node, *args, **kwargs):
+ def raise_error(target_node, *args, **kwargs):
execute_command.failed_calls += 1
- raise ClusterDownError(
- "CLUSTERDOWN The cluster is down. Use CLUSTER INFO for "
- "more information"
- )
+ raise error("mocked error")
- execute_command.side_effect = raise_cluster_down_error
+ execute_command.side_effect = raise_error
rc = get_mocked_redis_client(host=default_host, port=default_port)
- with pytest.raises(ClusterDownError):
- rc.get("bar")
- assert execute_command.failed_calls == rc.cluster_error_retry_attempts
-
- def test_connection_error_overreaches_retry_attempts(self):
- """
- When ConnectionError is thrown, test that we retry executing the
- command as many times as configured in cluster_error_retry_attempts
- and then raise the exception
- """
- with patch.object(RedisCluster, "_execute_command") as execute_command:
-
- def raise_conn_error(target_node, *args, **kwargs):
- execute_command.failed_calls += 1
- raise ConnectionError()
-
- execute_command.side_effect = raise_conn_error
-
- rc = get_mocked_redis_client(host=default_host, port=default_port)
-
- with pytest.raises(ConnectionError):
+ with pytest.raises(error):
rc.get("bar")
assert execute_command.failed_calls == rc.cluster_error_retry_attempts