summaryrefslogtreecommitdiff
path: root/redis/connection.py
diff options
context:
space:
mode:
Diffstat (limited to 'redis/connection.py')
-rwxr-xr-xredis/connection.py63
1 files changed, 37 insertions, 26 deletions
diff --git a/redis/connection.py b/redis/connection.py
index cb9acb4..6ff3650 100755
--- a/redis/connection.py
+++ b/redis/connection.py
@@ -1,4 +1,4 @@
-from distutils.version import LooseVersion
+from packaging.version import Version
from itertools import chain
from time import time
from queue import LifoQueue, Empty, Full
@@ -9,9 +9,9 @@ import io
import os
import socket
import threading
-import warnings
import weakref
+from redis.backoff import NoBackoff
from redis.exceptions import (
AuthenticationError,
AuthenticationWrongNumberOfArgsError,
@@ -29,9 +29,9 @@ from redis.exceptions import (
TimeoutError,
ModuleError,
)
-from redis.utils import HIREDIS_AVAILABLE, str_if_bytes
-from redis.backoff import NoBackoff
+
from redis.retry import Retry
+from redis.utils import HIREDIS_AVAILABLE, str_if_bytes
try:
import ssl
@@ -55,26 +55,18 @@ NONBLOCKING_EXCEPTIONS = tuple(NONBLOCKING_EXCEPTION_ERROR_NUMBERS.keys())
if HIREDIS_AVAILABLE:
import hiredis
- hiredis_version = LooseVersion(hiredis.__version__)
+ hiredis_version = Version(hiredis.__version__)
HIREDIS_SUPPORTS_CALLABLE_ERRORS = \
- hiredis_version >= LooseVersion('0.1.3')
+ hiredis_version >= Version('0.1.3')
HIREDIS_SUPPORTS_BYTE_BUFFER = \
- hiredis_version >= LooseVersion('0.1.4')
+ hiredis_version >= Version('0.1.4')
HIREDIS_SUPPORTS_ENCODING_ERRORS = \
- hiredis_version >= LooseVersion('1.0.0')
-
- if not HIREDIS_SUPPORTS_BYTE_BUFFER:
- msg = ("redis-py works best with hiredis >= 0.1.4. You're running "
- "hiredis %s. Please consider upgrading." % hiredis.__version__)
- warnings.warn(msg)
+ hiredis_version >= Version('1.0.0')
HIREDIS_USE_BYTE_BUFFER = True
# only use byte buffer if hiredis supports it
if not HIREDIS_SUPPORTS_BYTE_BUFFER:
HIREDIS_USE_BYTE_BUFFER = False
-else:
- msg = "redis-py works best with hiredis. Please consider installing"
- warnings.warn(msg)
SYM_STAR = b'*'
SYM_DOLLAR = b'$'
@@ -323,7 +315,7 @@ class PythonParser(BaseParser):
def can_read(self, timeout):
return self._buffer and self._buffer.can_read(timeout)
- def read_response(self):
+ def read_response(self, disable_decoding=False):
raw = self._buffer.readline()
if not raw:
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
@@ -363,8 +355,9 @@ class PythonParser(BaseParser):
length = int(response)
if length == -1:
return None
- response = [self.read_response() for i in range(length)]
- if isinstance(response, bytes):
+ response = [self.read_response(disable_decoding=disable_decoding)
+ for i in range(length)]
+ if isinstance(response, bytes) and disable_decoding is False:
response = self.encoder.decode(response)
return response
@@ -458,7 +451,7 @@ class HiredisParser(BaseParser):
if custom_timeout:
sock.settimeout(self._socket_timeout)
- def read_response(self):
+ def read_response(self, disable_decoding=False):
if not self._reader:
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
@@ -507,7 +500,7 @@ class Connection:
encoding_errors='strict', decode_responses=False,
parser_class=DefaultParser, socket_read_size=65536,
health_check_interval=0, client_name=None, username=None,
- retry=None):
+ retry=None, redis_connect_func=None):
"""
Initialize a new Connection.
To specify a retry policy, first set `retry_on_timeout` to `True`
@@ -537,8 +530,10 @@ class Connection:
self.health_check_interval = health_check_interval
self.next_health_check = 0
self.encoder = Encoder(encoding, encoding_errors, decode_responses)
+ self.redis_connect_func = redis_connect_func
self._sock = None
- self._parser = parser_class(socket_read_size=socket_read_size)
+ self._socket_read_size = socket_read_size
+ self.set_parser(parser_class)
self._connect_callbacks = []
self._buffer_cutoff = 6000
@@ -568,6 +563,14 @@ class Connection:
def clear_connect_callbacks(self):
self._connect_callbacks = []
+ def set_parser(self, parser_class):
+ """
+ Creates a new instance of parser_class with socket size:
+ _socket_read_size and assigns it to the parser for the connection
+ :param parser_class: The required parser class
+ """
+ self._parser = parser_class(socket_read_size=self._socket_read_size)
+
def connect(self):
"Connects to the Redis server if not already connected"
if self._sock:
@@ -581,7 +584,12 @@ class Connection:
self._sock = sock
try:
- self.on_connect()
+ if self.redis_connect_func is None:
+ # Use the default on_connect function
+ self.on_connect()
+ else:
+ # Use the passed function redis_connect_func
+ self.redis_connect_func(self)
except RedisError:
# clean up after any error in on_connect
self.disconnect()
@@ -751,10 +759,12 @@ class Connection:
self.connect()
return self._parser.can_read(timeout)
- def read_response(self):
+ def read_response(self, disable_decoding=False):
"""Read the response from a previously sent command"""
try:
- response = self._parser.read_response()
+ response = self._parser.read_response(
+ disable_decoding=disable_decoding
+ )
except socket.timeout:
self.disconnect()
raise TimeoutError("Timeout reading from %s:%s" %
@@ -912,7 +922,8 @@ class UnixDomainSocketConnection(Connection):
self.next_health_check = 0
self.encoder = Encoder(encoding, encoding_errors, decode_responses)
self._sock = None
- self._parser = parser_class(socket_read_size=socket_read_size)
+ self._socket_read_size = socket_read_size
+ self.set_parser(parser_class)
self._connect_callbacks = []
self._buffer_cutoff = 6000