diff options
author | Andy McCurdy <andy@andymccurdy.com> | 2019-07-23 16:54:26 -0700 |
---|---|---|
committer | Andy McCurdy <andy@andymccurdy.com> | 2019-07-28 12:39:13 -0700 |
commit | f60b2b07caba276b9308340b8ea06e5844f3f0ab (patch) | |
tree | 797b3b5b1ade73092bca425200b0e9a65217f3a5 /redis/connection.py | |
parent | 0984b102264b2600a6534ad8fef6f4cab44b4ecc (diff) | |
download | redis-py-f60b2b07caba276b9308340b8ea06e5844f3f0ab.tar.gz |
PING/PONG health checks
The `Redis` class and the `ConnectionPool` class now support the
"health_check_interval=N" option. By default N=0, which turns off health
checks. `N` should be an integer, and when greater than 0, ensures that
a health check is performed just before command execution anytime the
underlying connection has been idle for more than N seconds. A health
check is a full PING/PONG round trip to the Redis server.
If a health check encounters a ConnectionError or TimeoutError, the connection
is disconnected and reconnected and the health check is retried exactly once.
Any error during the retry is raised to the caller. Health check retries
are not governed by any other options such as `retry_on_timeout`. In systems
where idle times are common, these health checks are the intended way to
reconnect to the Redis server without harming any user data.
When this option is enabled for PubSub connections, calling `get_message()` or
`listen()` will send a health check anytime a message has not been read on
the PubSub connection for `health_check_interval` seconds. Users should
call `get_message()` or `listen()` at least every `health_check_interval`
seconds in order to keep the connection open.
Diffstat (limited to 'redis/connection.py')
-rwxr-xr-x | redis/connection.py | 61 |
1 files changed, 47 insertions, 14 deletions
diff --git a/redis/connection.py b/redis/connection.py index 7d4301a..b60a9fd 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -2,6 +2,7 @@ from __future__ import unicode_literals from distutils.version import StrictVersion from errno import EWOULDBLOCK from itertools import chain +from time import time import io import os import socket @@ -20,17 +21,17 @@ from redis._compat import (xrange, imap, byte_to_chr, unicode, long, LifoQueue, Empty, Full, urlparse, parse_qs, recv, recv_into, unquote, BlockingIOError) from redis.exceptions import ( - DataError, - RedisError, - ConnectionError, - TimeoutError, + AuthenticationError, BusyLoadingError, - ResponseError, + ConnectionError, + DataError, + ExecAbortError, InvalidResponse, - AuthenticationError, NoScriptError, - ExecAbortError, - ReadOnlyError + ReadOnlyError, + RedisError, + ResponseError, + TimeoutError, ) from redis.utils import HIREDIS_AVAILABLE if HIREDIS_AVAILABLE: @@ -460,7 +461,8 @@ class Connection(object): socket_keepalive=False, socket_keepalive_options=None, socket_type=0, retry_on_timeout=False, encoding='utf-8', encoding_errors='strict', decode_responses=False, - parser_class=DefaultParser, socket_read_size=65536): + parser_class=DefaultParser, socket_read_size=65536, + health_check_interval=0): self.pid = os.getpid() self.host = host self.port = int(port) @@ -472,6 +474,8 @@ class Connection(object): self.socket_keepalive_options = socket_keepalive_options or {} self.socket_type = socket_type self.retry_on_timeout = retry_on_timeout + self.health_check_interval = health_check_interval + self.next_health_check = 0 self.encoder = Encoder(encoding, encoding_errors, decode_responses) self._sock = None self._parser = parser_class(socket_read_size=socket_read_size) @@ -579,7 +583,9 @@ class Connection(object): # if a password is specified, authenticate if self.password: - self.send_command('AUTH', self.password) + # avoid checking health here -- PING will fail if we try + # to check the health prior to the AUTH + self.send_command('AUTH', self.password, check_health=False) if nativestr(self.read_response()) != 'OK': raise AuthenticationError('Invalid Password') @@ -602,10 +608,28 @@ class Connection(object): pass self._sock = None - def send_packed_command(self, command): + def check_health(self): + "Check the health of the connection with a PING/PONG" + if self.health_check_interval and time() > self.next_health_check: + try: + self.send_command('PING', check_health=False) + if nativestr(self.read_response()) != 'PONG': + raise ConnectionError( + 'Bad response from PING health check') + except (ConnectionError, TimeoutError) as ex: + self.disconnect() + self.send_command('PING', check_health=False) + if nativestr(self.read_response()) != 'PONG': + raise ConnectionError( + 'Bad response from PING health check') + + def send_packed_command(self, command, check_health=True): "Send an already packed command to the Redis server" if not self._sock: self.connect() + # guard against health check recurrsion + if check_health: + self.check_health() try: if isinstance(command, str): command = [command] @@ -628,9 +652,10 @@ class Connection(object): self.disconnect() raise - def send_command(self, *args): + def send_command(self, *args, **kwargs): "Pack and send a command to the Redis server" - self.send_packed_command(self.pack_command(*args)) + self.send_packed_command(self.pack_command(*args), + check_health=kwargs.get('check_health', True)) def can_read(self, timeout=0): "Poll the socket to see if there's data that can be read." @@ -656,6 +681,10 @@ class Connection(object): except: # noqa: E722 self.disconnect() raise + + if self.health_check_interval: + self.next_health_check = time() + self.health_check_interval + if isinstance(response, ResponseError): raise response return response @@ -777,13 +806,16 @@ class UnixDomainSocketConnection(Connection): socket_timeout=None, encoding='utf-8', encoding_errors='strict', decode_responses=False, retry_on_timeout=False, - parser_class=DefaultParser, socket_read_size=65536): + parser_class=DefaultParser, socket_read_size=65536, + health_check_interval=0): self.pid = os.getpid() self.path = path self.db = db self.password = password self.socket_timeout = socket_timeout self.retry_on_timeout = retry_on_timeout + self.health_check_interval = health_check_interval + self.next_health_check = 0 self.encoder = Encoder(encoding, encoding_errors, decode_responses) self._sock = None self._parser = parser_class(socket_read_size=socket_read_size) @@ -829,6 +861,7 @@ URL_QUERY_ARGUMENT_PARSERS = { 'socket_keepalive': to_bool, 'retry_on_timeout': to_bool, 'max_connections': int, + 'health_check_interval': int, } |