summaryrefslogtreecommitdiff
path: root/redis/client.py
diff options
context:
space:
mode:
authornbraun-amazon <85549956+nbraun-amazon@users.noreply.github.com>2021-08-18 12:06:09 +0300
committerGitHub <noreply@github.com>2021-08-18 12:06:09 +0300
commite19a76c58f2a998d86e51c5a2a0f1db37563efce (patch)
tree876614bb653f6df4006ab64cece4078d0355f067 /redis/client.py
parentb96af52e012bc002df97c4a82a5e4ad389cea3f3 (diff)
downloadredis-py-e19a76c58f2a998d86e51c5a2a0f1db37563efce.tar.gz
Add retry mechanism with backoff (#1494)
Diffstat (limited to 'redis/client.py')
-rwxr-xr-xredis/client.py177
1 files changed, 110 insertions, 67 deletions
diff --git a/redis/client.py b/redis/client.py
index 741c2d0..ab9246d 100755
--- a/redis/client.py
+++ b/redis/client.py
@@ -1,4 +1,5 @@
from itertools import chain
+import copy
import datetime
import hashlib
import re
@@ -758,7 +759,13 @@ class Redis(Commands, object):
ssl_cert_reqs='required', ssl_ca_certs=None,
ssl_check_hostname=False,
max_connections=None, single_connection_client=False,
- health_check_interval=0, client_name=None, username=None):
+ health_check_interval=0, client_name=None, username=None,
+ retry=None):
+ """
+ Initialize a new Redis client.
+ To specify a retry policy, first set `retry_on_timeout` to `True`
+ then set `retry` to a valid `Retry` object
+ """
if not connection_pool:
if charset is not None:
warnings.warn(DeprecationWarning(
@@ -778,6 +785,7 @@ class Redis(Commands, object):
'encoding_errors': encoding_errors,
'decode_responses': decode_responses,
'retry_on_timeout': retry_on_timeout,
+ 'retry': copy.deepcopy(retry),
'max_connections': max_connections,
'health_check_interval': health_check_interval,
'client_name': client_name
@@ -940,21 +948,41 @@ class Redis(Commands, object):
self.connection = None
self.connection_pool.release(conn)
+ def _send_command_parse_response(self,
+ conn,
+ command_name,
+ *args,
+ **options):
+ """
+ Send a command and parse the response
+ """
+ conn.send_command(*args)
+ return self.parse_response(conn, command_name, **options)
+
+ def _disconnect_raise(self, conn, error):
+ """
+ Close the connection and raise an exception
+ if retry_on_timeout is not set or the error
+ is not a TimeoutError
+ """
+ conn.disconnect()
+ if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
+ raise error
+
# COMMAND EXECUTION AND PROTOCOL PARSING
def execute_command(self, *args, **options):
"Execute a command and return a parsed response"
pool = self.connection_pool
command_name = args[0]
conn = self.connection or pool.get_connection(command_name, **options)
+
try:
- conn.send_command(*args)
- return self.parse_response(conn, command_name, **options)
- except (ConnectionError, TimeoutError) as e:
- conn.disconnect()
- if not (conn.retry_on_timeout and isinstance(e, TimeoutError)):
- raise
- conn.send_command(*args)
- return self.parse_response(conn, command_name, **options)
+ return conn.retry.call_with_retry(
+ lambda: self._send_command_parse_response(conn,
+ command_name,
+ *args,
+ **options),
+ lambda error: self._disconnect_raise(conn, error))
finally:
if not self.connection:
pool.release(conn)
@@ -1142,24 +1170,31 @@ class PubSub:
kwargs = {'check_health': not self.subscribed}
self._execute(connection, connection.send_command, *args, **kwargs)
- def _execute(self, connection, command, *args, **kwargs):
- try:
- return command(*args, **kwargs)
- except (ConnectionError, TimeoutError) as e:
- connection.disconnect()
- if not (connection.retry_on_timeout and
- isinstance(e, TimeoutError)):
- raise
- # Connect manually here. If the Redis server is down, this will
- # fail and raise a ConnectionError as desired.
- connection.connect()
- # the ``on_connect`` callback should haven been called by the
- # connection to resubscribe us to any channels and patterns we were
- # previously listening to
- return command(*args, **kwargs)
+ def _disconnect_raise_connect(self, conn, error):
+ """
+ Close the connection and raise an exception
+ if retry_on_timeout is not set or the error
+ is not a TimeoutError. Otherwise, try to reconnect
+ """
+ conn.disconnect()
+ if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
+ raise error
+ conn.connect()
+
+ def _execute(self, conn, command, *args, **kwargs):
+ """
+ Connect manually upon disconnection. If the Redis server is down,
+ this will fail and raise a ConnectionError as desired.
+ After reconnection, the ``on_connect`` callback should have been
+ called by the # connection to resubscribe us to any channels and
+ patterns we were previously listening to
+ """
+ return conn.retry.call_with_retry(
+ lambda: command(*args, **kwargs),
+ lambda error: self._disconnect_raise_connect(conn, error))
def parse_response(self, block=True, timeout=0):
- "Parse the response from a publish/subscribe command"
+ """Parse the response from a publish/subscribe command"""
conn = self.connection
if conn is None:
raise RuntimeError(
@@ -1499,6 +1534,27 @@ class Pipeline(Redis):
return self.immediate_execute_command(*args, **kwargs)
return self.pipeline_execute_command(*args, **kwargs)
+ def _disconnect_reset_raise(self, conn, error):
+ """
+ Close the connection, reset watching state and
+ raise an exception if we were watching,
+ retry_on_timeout is not set,
+ or the error is not a TimeoutError
+ """
+ conn.disconnect()
+ # if we were already watching a variable, the watch is no longer
+ # valid since this connection has died. raise a WatchError, which
+ # indicates the user should retry this transaction.
+ if self.watching:
+ self.reset()
+ raise WatchError("A ConnectionError occurred on while "
+ "watching one or more keys")
+ # if retry_on_timeout is not set, or the error is not
+ # a TimeoutError, raise it
+ if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
+ self.reset()
+ raise
+
def immediate_execute_command(self, *args, **options):
"""
Execute a command immediately, but don't auto-retry on a
@@ -1513,33 +1569,13 @@ class Pipeline(Redis):
conn = self.connection_pool.get_connection(command_name,
self.shard_hint)
self.connection = conn
- try:
- conn.send_command(*args)
- return self.parse_response(conn, command_name, **options)
- except (ConnectionError, TimeoutError) as e:
- conn.disconnect()
- # if we were already watching a variable, the watch is no longer
- # valid since this connection has died. raise a WatchError, which
- # indicates the user should retry this transaction.
- if self.watching:
- self.reset()
- raise WatchError("A ConnectionError occurred on while "
- "watching one or more keys")
- # if retry_on_timeout is not set, or the error is not
- # a TimeoutError, raise it
- if not (conn.retry_on_timeout and isinstance(e, TimeoutError)):
- self.reset()
- raise
-
- # retry_on_timeout is set, this is a TimeoutError and we are not
- # already WATCHing any variables. retry the command.
- try:
- conn.send_command(*args)
- return self.parse_response(conn, command_name, **options)
- except (ConnectionError, TimeoutError):
- # a subsequent failure should simply be raised
- self.reset()
- raise
+
+ return conn.retry.call_with_retry(
+ lambda: self._send_command_parse_response(conn,
+ command_name,
+ *args,
+ **options),
+ lambda error: self._disconnect_reset_raise(conn, error))
def pipeline_execute_command(self, *args, **options):
"""
@@ -1672,6 +1708,25 @@ class Pipeline(Redis):
if not exist:
s.sha = immediate('SCRIPT LOAD', s.script)
+ def _disconnect_raise_reset(self, conn, error):
+ """
+ Close the connection, raise an exception if we were watching,
+ and raise an exception if retry_on_timeout is not set,
+ or the error is not a TimeoutError
+ """
+ conn.disconnect()
+ # if we were watching a variable, the watch is no longer valid
+ # since this connection has died. raise a WatchError, which
+ # indicates the user should retry this transaction.
+ if self.watching:
+ raise WatchError("A ConnectionError occurred on while "
+ "watching one or more keys")
+ # if retry_on_timeout is not set, or the error is not
+ # a TimeoutError, raise it
+ if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
+ self.reset()
+ raise
+
def execute(self, raise_on_error=True):
"Execute all the commands in the current pipeline"
stack = self.command_stack
@@ -1693,21 +1748,9 @@ class Pipeline(Redis):
self.connection = conn
try:
- return execute(conn, stack, raise_on_error)
- except (ConnectionError, TimeoutError) as e:
- conn.disconnect()
- # if we were watching a variable, the watch is no longer valid
- # since this connection has died. raise a WatchError, which
- # indicates the user should retry this transaction.
- if self.watching:
- raise WatchError("A ConnectionError occurred on while "
- "watching one or more keys")
- # if retry_on_timeout is not set, or the error is not
- # a TimeoutError, raise it
- if not (conn.retry_on_timeout and isinstance(e, TimeoutError)):
- raise
- # retry a TimeoutError when retry_on_timeout is set
- return execute(conn, stack, raise_on_error)
+ return conn.retry.call_with_retry(
+ lambda: execute(conn, stack, raise_on_error),
+ lambda error: self._disconnect_raise_reset(conn, error))
finally:
self.reset()