summaryrefslogtreecommitdiff
path: root/redis/connection.py
diff options
context:
space:
mode:
authorAndy McCurdy <andy@andymccurdy.com>2019-07-23 16:54:26 -0700
committerAndy McCurdy <andy@andymccurdy.com>2019-07-28 12:39:13 -0700
commitf60b2b07caba276b9308340b8ea06e5844f3f0ab (patch)
tree797b3b5b1ade73092bca425200b0e9a65217f3a5 /redis/connection.py
parent0984b102264b2600a6534ad8fef6f4cab44b4ecc (diff)
downloadredis-py-f60b2b07caba276b9308340b8ea06e5844f3f0ab.tar.gz
PING/PONG health checks
The `Redis` class and the `ConnectionPool` class now support the "health_check_interval=N" option. By default N=0, which turns off health checks. `N` should be an integer, and when greater than 0, ensures that a health check is performed just before command execution anytime the underlying connection has been idle for more than N seconds. A health check is a full PING/PONG round trip to the Redis server. If a health check encounters a ConnectionError or TimeoutError, the connection is disconnected and reconnected and the health check is retried exactly once. Any error during the retry is raised to the caller. Health check retries are not governed by any other options such as `retry_on_timeout`. In systems where idle times are common, these health checks are the intended way to reconnect to the Redis server without harming any user data. When this option is enabled for PubSub connections, calling `get_message()` or `listen()` will send a health check anytime a message has not been read on the PubSub connection for `health_check_interval` seconds. Users should call `get_message()` or `listen()` at least every `health_check_interval` seconds in order to keep the connection open.
Diffstat (limited to 'redis/connection.py')
-rwxr-xr-xredis/connection.py61
1 files changed, 47 insertions, 14 deletions
diff --git a/redis/connection.py b/redis/connection.py
index 7d4301a..b60a9fd 100755
--- a/redis/connection.py
+++ b/redis/connection.py
@@ -2,6 +2,7 @@ from __future__ import unicode_literals
from distutils.version import StrictVersion
from errno import EWOULDBLOCK
from itertools import chain
+from time import time
import io
import os
import socket
@@ -20,17 +21,17 @@ from redis._compat import (xrange, imap, byte_to_chr, unicode, long,
LifoQueue, Empty, Full, urlparse, parse_qs,
recv, recv_into, unquote, BlockingIOError)
from redis.exceptions import (
- DataError,
- RedisError,
- ConnectionError,
- TimeoutError,
+ AuthenticationError,
BusyLoadingError,
- ResponseError,
+ ConnectionError,
+ DataError,
+ ExecAbortError,
InvalidResponse,
- AuthenticationError,
NoScriptError,
- ExecAbortError,
- ReadOnlyError
+ ReadOnlyError,
+ RedisError,
+ ResponseError,
+ TimeoutError,
)
from redis.utils import HIREDIS_AVAILABLE
if HIREDIS_AVAILABLE:
@@ -460,7 +461,8 @@ class Connection(object):
socket_keepalive=False, socket_keepalive_options=None,
socket_type=0, retry_on_timeout=False, encoding='utf-8',
encoding_errors='strict', decode_responses=False,
- parser_class=DefaultParser, socket_read_size=65536):
+ parser_class=DefaultParser, socket_read_size=65536,
+ health_check_interval=0):
self.pid = os.getpid()
self.host = host
self.port = int(port)
@@ -472,6 +474,8 @@ class Connection(object):
self.socket_keepalive_options = socket_keepalive_options or {}
self.socket_type = socket_type
self.retry_on_timeout = retry_on_timeout
+ self.health_check_interval = health_check_interval
+ self.next_health_check = 0
self.encoder = Encoder(encoding, encoding_errors, decode_responses)
self._sock = None
self._parser = parser_class(socket_read_size=socket_read_size)
@@ -579,7 +583,9 @@ class Connection(object):
# if a password is specified, authenticate
if self.password:
- self.send_command('AUTH', self.password)
+ # avoid checking health here -- PING will fail if we try
+ # to check the health prior to the AUTH
+ self.send_command('AUTH', self.password, check_health=False)
if nativestr(self.read_response()) != 'OK':
raise AuthenticationError('Invalid Password')
@@ -602,10 +608,28 @@ class Connection(object):
pass
self._sock = None
- def send_packed_command(self, command):
+ def check_health(self):
+ "Check the health of the connection with a PING/PONG"
+ if self.health_check_interval and time() > self.next_health_check:
+ try:
+ self.send_command('PING', check_health=False)
+ if nativestr(self.read_response()) != 'PONG':
+ raise ConnectionError(
+ 'Bad response from PING health check')
+ except (ConnectionError, TimeoutError) as ex:
+ self.disconnect()
+ self.send_command('PING', check_health=False)
+ if nativestr(self.read_response()) != 'PONG':
+ raise ConnectionError(
+ 'Bad response from PING health check')
+
+ def send_packed_command(self, command, check_health=True):
"Send an already packed command to the Redis server"
if not self._sock:
self.connect()
+ # guard against health check recurrsion
+ if check_health:
+ self.check_health()
try:
if isinstance(command, str):
command = [command]
@@ -628,9 +652,10 @@ class Connection(object):
self.disconnect()
raise
- def send_command(self, *args):
+ def send_command(self, *args, **kwargs):
"Pack and send a command to the Redis server"
- self.send_packed_command(self.pack_command(*args))
+ self.send_packed_command(self.pack_command(*args),
+ check_health=kwargs.get('check_health', True))
def can_read(self, timeout=0):
"Poll the socket to see if there's data that can be read."
@@ -656,6 +681,10 @@ class Connection(object):
except: # noqa: E722
self.disconnect()
raise
+
+ if self.health_check_interval:
+ self.next_health_check = time() + self.health_check_interval
+
if isinstance(response, ResponseError):
raise response
return response
@@ -777,13 +806,16 @@ class UnixDomainSocketConnection(Connection):
socket_timeout=None, encoding='utf-8',
encoding_errors='strict', decode_responses=False,
retry_on_timeout=False,
- parser_class=DefaultParser, socket_read_size=65536):
+ parser_class=DefaultParser, socket_read_size=65536,
+ health_check_interval=0):
self.pid = os.getpid()
self.path = path
self.db = db
self.password = password
self.socket_timeout = socket_timeout
self.retry_on_timeout = retry_on_timeout
+ self.health_check_interval = health_check_interval
+ self.next_health_check = 0
self.encoder = Encoder(encoding, encoding_errors, decode_responses)
self._sock = None
self._parser = parser_class(socket_read_size=socket_read_size)
@@ -829,6 +861,7 @@ URL_QUERY_ARGUMENT_PARSERS = {
'socket_keepalive': to_bool,
'retry_on_timeout': to_bool,
'max_connections': int,
+ 'health_check_interval': int,
}