diff options
author | nbraun-amazon <85549956+nbraun-amazon@users.noreply.github.com> | 2021-08-18 12:06:09 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-08-18 12:06:09 +0300 |
commit | e19a76c58f2a998d86e51c5a2a0f1db37563efce (patch) | |
tree | 876614bb653f6df4006ab64cece4078d0355f067 /redis/client.py | |
parent | b96af52e012bc002df97c4a82a5e4ad389cea3f3 (diff) | |
download | redis-py-e19a76c58f2a998d86e51c5a2a0f1db37563efce.tar.gz |
Add retry mechanism with backoff (#1494)
Diffstat (limited to 'redis/client.py')
-rwxr-xr-x | redis/client.py | 177 |
1 files changed, 110 insertions, 67 deletions
diff --git a/redis/client.py b/redis/client.py index 741c2d0..ab9246d 100755 --- a/redis/client.py +++ b/redis/client.py @@ -1,4 +1,5 @@ from itertools import chain +import copy import datetime import hashlib import re @@ -758,7 +759,13 @@ class Redis(Commands, object): ssl_cert_reqs='required', ssl_ca_certs=None, ssl_check_hostname=False, max_connections=None, single_connection_client=False, - health_check_interval=0, client_name=None, username=None): + health_check_interval=0, client_name=None, username=None, + retry=None): + """ + Initialize a new Redis client. + To specify a retry policy, first set `retry_on_timeout` to `True` + then set `retry` to a valid `Retry` object + """ if not connection_pool: if charset is not None: warnings.warn(DeprecationWarning( @@ -778,6 +785,7 @@ class Redis(Commands, object): 'encoding_errors': encoding_errors, 'decode_responses': decode_responses, 'retry_on_timeout': retry_on_timeout, + 'retry': copy.deepcopy(retry), 'max_connections': max_connections, 'health_check_interval': health_check_interval, 'client_name': client_name @@ -940,21 +948,41 @@ class Redis(Commands, object): self.connection = None self.connection_pool.release(conn) + def _send_command_parse_response(self, + conn, + command_name, + *args, + **options): + """ + Send a command and parse the response + """ + conn.send_command(*args) + return self.parse_response(conn, command_name, **options) + + def _disconnect_raise(self, conn, error): + """ + Close the connection and raise an exception + if retry_on_timeout is not set or the error + is not a TimeoutError + """ + conn.disconnect() + if not (conn.retry_on_timeout and isinstance(error, TimeoutError)): + raise error + # COMMAND EXECUTION AND PROTOCOL PARSING def execute_command(self, *args, **options): "Execute a command and return a parsed response" pool = self.connection_pool command_name = args[0] conn = self.connection or pool.get_connection(command_name, **options) + try: - conn.send_command(*args) - return self.parse_response(conn, command_name, **options) - except (ConnectionError, TimeoutError) as e: - conn.disconnect() - if not (conn.retry_on_timeout and isinstance(e, TimeoutError)): - raise - conn.send_command(*args) - return self.parse_response(conn, command_name, **options) + return conn.retry.call_with_retry( + lambda: self._send_command_parse_response(conn, + command_name, + *args, + **options), + lambda error: self._disconnect_raise(conn, error)) finally: if not self.connection: pool.release(conn) @@ -1142,24 +1170,31 @@ class PubSub: kwargs = {'check_health': not self.subscribed} self._execute(connection, connection.send_command, *args, **kwargs) - def _execute(self, connection, command, *args, **kwargs): - try: - return command(*args, **kwargs) - except (ConnectionError, TimeoutError) as e: - connection.disconnect() - if not (connection.retry_on_timeout and - isinstance(e, TimeoutError)): - raise - # Connect manually here. If the Redis server is down, this will - # fail and raise a ConnectionError as desired. - connection.connect() - # the ``on_connect`` callback should haven been called by the - # connection to resubscribe us to any channels and patterns we were - # previously listening to - return command(*args, **kwargs) + def _disconnect_raise_connect(self, conn, error): + """ + Close the connection and raise an exception + if retry_on_timeout is not set or the error + is not a TimeoutError. Otherwise, try to reconnect + """ + conn.disconnect() + if not (conn.retry_on_timeout and isinstance(error, TimeoutError)): + raise error + conn.connect() + + def _execute(self, conn, command, *args, **kwargs): + """ + Connect manually upon disconnection. If the Redis server is down, + this will fail and raise a ConnectionError as desired. + After reconnection, the ``on_connect`` callback should have been + called by the # connection to resubscribe us to any channels and + patterns we were previously listening to + """ + return conn.retry.call_with_retry( + lambda: command(*args, **kwargs), + lambda error: self._disconnect_raise_connect(conn, error)) def parse_response(self, block=True, timeout=0): - "Parse the response from a publish/subscribe command" + """Parse the response from a publish/subscribe command""" conn = self.connection if conn is None: raise RuntimeError( @@ -1499,6 +1534,27 @@ class Pipeline(Redis): return self.immediate_execute_command(*args, **kwargs) return self.pipeline_execute_command(*args, **kwargs) + def _disconnect_reset_raise(self, conn, error): + """ + Close the connection, reset watching state and + raise an exception if we were watching, + retry_on_timeout is not set, + or the error is not a TimeoutError + """ + conn.disconnect() + # if we were already watching a variable, the watch is no longer + # valid since this connection has died. raise a WatchError, which + # indicates the user should retry this transaction. + if self.watching: + self.reset() + raise WatchError("A ConnectionError occurred on while " + "watching one or more keys") + # if retry_on_timeout is not set, or the error is not + # a TimeoutError, raise it + if not (conn.retry_on_timeout and isinstance(error, TimeoutError)): + self.reset() + raise + def immediate_execute_command(self, *args, **options): """ Execute a command immediately, but don't auto-retry on a @@ -1513,33 +1569,13 @@ class Pipeline(Redis): conn = self.connection_pool.get_connection(command_name, self.shard_hint) self.connection = conn - try: - conn.send_command(*args) - return self.parse_response(conn, command_name, **options) - except (ConnectionError, TimeoutError) as e: - conn.disconnect() - # if we were already watching a variable, the watch is no longer - # valid since this connection has died. raise a WatchError, which - # indicates the user should retry this transaction. - if self.watching: - self.reset() - raise WatchError("A ConnectionError occurred on while " - "watching one or more keys") - # if retry_on_timeout is not set, or the error is not - # a TimeoutError, raise it - if not (conn.retry_on_timeout and isinstance(e, TimeoutError)): - self.reset() - raise - - # retry_on_timeout is set, this is a TimeoutError and we are not - # already WATCHing any variables. retry the command. - try: - conn.send_command(*args) - return self.parse_response(conn, command_name, **options) - except (ConnectionError, TimeoutError): - # a subsequent failure should simply be raised - self.reset() - raise + + return conn.retry.call_with_retry( + lambda: self._send_command_parse_response(conn, + command_name, + *args, + **options), + lambda error: self._disconnect_reset_raise(conn, error)) def pipeline_execute_command(self, *args, **options): """ @@ -1672,6 +1708,25 @@ class Pipeline(Redis): if not exist: s.sha = immediate('SCRIPT LOAD', s.script) + def _disconnect_raise_reset(self, conn, error): + """ + Close the connection, raise an exception if we were watching, + and raise an exception if retry_on_timeout is not set, + or the error is not a TimeoutError + """ + conn.disconnect() + # if we were watching a variable, the watch is no longer valid + # since this connection has died. raise a WatchError, which + # indicates the user should retry this transaction. + if self.watching: + raise WatchError("A ConnectionError occurred on while " + "watching one or more keys") + # if retry_on_timeout is not set, or the error is not + # a TimeoutError, raise it + if not (conn.retry_on_timeout and isinstance(error, TimeoutError)): + self.reset() + raise + def execute(self, raise_on_error=True): "Execute all the commands in the current pipeline" stack = self.command_stack @@ -1693,21 +1748,9 @@ class Pipeline(Redis): self.connection = conn try: - return execute(conn, stack, raise_on_error) - except (ConnectionError, TimeoutError) as e: - conn.disconnect() - # if we were watching a variable, the watch is no longer valid - # since this connection has died. raise a WatchError, which - # indicates the user should retry this transaction. - if self.watching: - raise WatchError("A ConnectionError occurred on while " - "watching one or more keys") - # if retry_on_timeout is not set, or the error is not - # a TimeoutError, raise it - if not (conn.retry_on_timeout and isinstance(e, TimeoutError)): - raise - # retry a TimeoutError when retry_on_timeout is set - return execute(conn, stack, raise_on_error) + return conn.retry.call_with_retry( + lambda: execute(conn, stack, raise_on_error), + lambda error: self._disconnect_raise_reset(conn, error)) finally: self.reset() |