summaryrefslogtreecommitdiff
path: root/redis/connection.py
diff options
context:
space:
mode:
Diffstat (limited to 'redis/connection.py')
-rwxr-xr-xredis/connection.py129
1 files changed, 47 insertions, 82 deletions
diff --git a/redis/connection.py b/redis/connection.py
index 22d3902..a29f9b2 100755
--- a/redis/connection.py
+++ b/redis/connection.py
@@ -1,7 +1,8 @@
-from __future__ import unicode_literals
from distutils.version import StrictVersion
from itertools import chain
from time import time
+from queue import LifoQueue, Empty, Full
+from urllib.parse import parse_qs, unquote, urlparse
import errno
import io
import os
@@ -9,11 +10,6 @@ import socket
import threading
import warnings
-from redis._compat import (xrange, imap, unicode, long,
- nativestr, basestring, iteritems,
- LifoQueue, Empty, Full, urlparse, parse_qs,
- recv, recv_into, unquote, BlockingIOError,
- sendall, shutdown, ssl_wrap_socket)
from redis.exceptions import (
AuthenticationError,
AuthenticationWrongNumberOfArgsError,
@@ -31,7 +27,7 @@ from redis.exceptions import (
TimeoutError,
ModuleError,
)
-from redis.utils import HIREDIS_AVAILABLE
+from redis.utils import HIREDIS_AVAILABLE, str_if_bytes
try:
import ssl
@@ -50,16 +46,6 @@ if ssl_available:
else:
NONBLOCKING_EXCEPTION_ERROR_NUMBERS[ssl.SSLError] = 2
-# In Python 2.7 a socket.error is raised for a nonblocking read.
-# The _compat module aliases BlockingIOError to socket.error to be
-# Python 2/3 compatible.
-# However this means that all socket.error exceptions need to be handled
-# properly within these exception handlers.
-# We need to make sure socket.error is included in these handlers and
-# provide a dummy error number that will never match a real exception.
-if socket.error not in NONBLOCKING_EXCEPTION_ERROR_NUMBERS:
- NONBLOCKING_EXCEPTION_ERROR_NUMBERS[socket.error] = -999999
-
NONBLOCKING_EXCEPTIONS = tuple(NONBLOCKING_EXCEPTION_ERROR_NUMBERS.keys())
if HIREDIS_AVAILABLE:
@@ -101,7 +87,7 @@ MODULE_EXPORTS_DATA_TYPES_ERROR = "Error unloading module: the module " \
"types, can't unload"
-class Encoder(object):
+class Encoder:
"Encode strings to bytes-like and decode bytes-like to strings"
def __init__(self, encoding, encoding_errors, decode_responses):
@@ -117,17 +103,14 @@ class Encoder(object):
# special case bool since it is a subclass of int
raise DataError("Invalid input of type: 'bool'. Convert to a "
"bytes, string, int or float first.")
- elif isinstance(value, float):
+ elif isinstance(value, (int, float)):
value = repr(value).encode()
- elif isinstance(value, (int, long)):
- # python 2 repr() on longs is '123L', so use str() instead
- value = str(value).encode()
- elif not isinstance(value, basestring):
+ elif not isinstance(value, str):
# a value we don't know how to deal with. throw an error
typename = type(value).__name__
raise DataError("Invalid input of type: '%s'. Convert to a "
"bytes, string, int or float first." % typename)
- if isinstance(value, unicode):
+ if isinstance(value, str):
value = value.encode(self.encoding, self.encoding_errors)
return value
@@ -141,7 +124,7 @@ class Encoder(object):
return value
-class BaseParser(object):
+class BaseParser:
EXCEPTION_CLASSES = {
'ERR': {
'max number of clients reached': ConnectionError,
@@ -180,7 +163,7 @@ class BaseParser(object):
return ResponseError(response)
-class SocketBuffer(object):
+class SocketBuffer:
def __init__(self, socket, socket_read_size, socket_timeout):
self._sock = socket
self.socket_read_size = socket_read_size
@@ -208,7 +191,7 @@ class SocketBuffer(object):
if custom_timeout:
sock.settimeout(timeout)
while True:
- data = recv(self._sock, socket_read_size)
+ data = self._sock.recv(socket_read_size)
# an empty string indicates the server shutdown the socket
if isinstance(data, bytes) and len(data) == 0:
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
@@ -345,7 +328,7 @@ class PythonParser(BaseParser):
# server returned an error
if byte == b'-':
- response = nativestr(response)
+ response = response.decode('utf-8', errors='replace')
error = self.parse_error(response)
# if the error is a ConnectionError, raise immediately so the user
# is notified
@@ -361,7 +344,7 @@ class PythonParser(BaseParser):
pass
# int value
elif byte == b':':
- response = long(response)
+ response = int(response)
# bulk response
elif byte == b'$':
length = int(response)
@@ -373,7 +356,7 @@ class PythonParser(BaseParser):
length = int(response)
if length == -1:
return None
- response = [self.read_response() for i in xrange(length)]
+ response = [self.read_response() for i in range(length)]
if isinstance(response, bytes):
response = self.encoder.decode(response)
return response
@@ -437,12 +420,12 @@ class HiredisParser(BaseParser):
if custom_timeout:
sock.settimeout(timeout)
if HIREDIS_USE_BYTE_BUFFER:
- bufflen = recv_into(self._sock, self._buffer)
+ bufflen = self._sock.recv_into(self._buffer)
if bufflen == 0:
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
self._reader.feed(self._buffer, 0, bufflen)
else:
- buffer = recv(self._sock, self.socket_read_size)
+ buffer = self._sock.recv(self.socket_read_size)
# an empty string indicates the server shutdown the socket
if not isinstance(buffer, bytes) or len(buffer) == 0:
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
@@ -507,7 +490,7 @@ else:
DefaultParser = PythonParser
-class Connection(object):
+class Connection:
"Manages TCP communication to and from a Redis server"
def __init__(self, host='localhost', port=6379, db=0, password=None,
@@ -606,7 +589,7 @@ class Connection(object):
# TCP_KEEPALIVE
if self.socket_keepalive:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
- for k, v in iteritems(self.socket_keepalive_options):
+ for k, v in self.socket_keepalive_options.items():
sock.setsockopt(socket.IPPROTO_TCP, k, v)
# set the socket_connect_timeout before we connect
@@ -619,14 +602,14 @@ class Connection(object):
sock.settimeout(self.socket_timeout)
return sock
- except socket.error as _:
+ except OSError as _:
err = _
if sock is not None:
sock.close()
if err is not None:
raise err
- raise socket.error("socket.getaddrinfo returned an empty list")
+ raise OSError("socket.getaddrinfo returned an empty list")
def _error_message(self, exception):
# args for socket.error can either be (errno, "message")
@@ -662,19 +645,19 @@ class Connection(object):
self.send_command('AUTH', self.password, check_health=False)
auth_response = self.read_response()
- if nativestr(auth_response) != 'OK':
+ if str_if_bytes(auth_response) != 'OK':
raise AuthenticationError('Invalid Username or Password')
# if a client_name is given, set it
if self.client_name:
self.send_command('CLIENT', 'SETNAME', self.client_name)
- if nativestr(self.read_response()) != 'OK':
+ if str_if_bytes(self.read_response()) != 'OK':
raise ConnectionError('Error setting client name')
# if a database is specified, switch to it
if self.db:
self.send_command('SELECT', self.db)
- if nativestr(self.read_response()) != 'OK':
+ if str_if_bytes(self.read_response()) != 'OK':
raise ConnectionError('Invalid Database')
def disconnect(self):
@@ -684,9 +667,9 @@ class Connection(object):
return
try:
if os.getpid() == self.pid:
- shutdown(self._sock, socket.SHUT_RDWR)
+ self._sock.shutdown(socket.SHUT_RDWR)
self._sock.close()
- except socket.error:
+ except OSError:
pass
self._sock = None
@@ -695,13 +678,13 @@ class Connection(object):
if self.health_check_interval and time() > self.next_health_check:
try:
self.send_command('PING', check_health=False)
- if nativestr(self.read_response()) != 'PONG':
+ if str_if_bytes(self.read_response()) != 'PONG':
raise ConnectionError(
'Bad response from PING health check')
except (ConnectionError, TimeoutError):
self.disconnect()
self.send_command('PING', check_health=False)
- if nativestr(self.read_response()) != 'PONG':
+ if str_if_bytes(self.read_response()) != 'PONG':
raise ConnectionError(
'Bad response from PING health check')
@@ -716,7 +699,7 @@ class Connection(object):
if isinstance(command, str):
command = [command]
for item in command:
- sendall(self._sock, item)
+ self._sock.sendall(item)
except socket.timeout:
self.disconnect()
raise TimeoutError("Timeout writing to socket")
@@ -777,7 +760,7 @@ class Connection(object):
# arguments to be sent separately, so split the first argument
# manually. These arguments should be bytestrings so that they are
# not encoded.
- if isinstance(args[0], unicode):
+ if isinstance(args[0], str):
args = tuple(args[0].encode().split()) + args[1:]
elif b' ' in args[0]:
args = tuple(args[0].split()) + args[1:]
@@ -785,7 +768,7 @@ class Connection(object):
buff = SYM_EMPTY.join((SYM_STAR, str(len(args)).encode(), SYM_CRLF))
buffer_cutoff = self._buffer_cutoff
- for arg in imap(self.encoder.encode, args):
+ for arg in map(self.encoder.encode, args):
# to avoid large string mallocs, chunk the command into the
# output list if we're sending large values or memoryviews
arg_length = len(arg)
@@ -838,13 +821,13 @@ class SSLConnection(Connection):
if not ssl_available:
raise RedisError("Python wasn't built with SSL support")
- super(SSLConnection, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.keyfile = ssl_keyfile
self.certfile = ssl_certfile
if ssl_cert_reqs is None:
ssl_cert_reqs = ssl.CERT_NONE
- elif isinstance(ssl_cert_reqs, basestring):
+ elif isinstance(ssl_cert_reqs, str):
CERT_REQS = {
'none': ssl.CERT_NONE,
'optional': ssl.CERT_OPTIONAL,
@@ -861,27 +844,16 @@ class SSLConnection(Connection):
def _connect(self):
"Wrap the socket with SSL support"
- sock = super(SSLConnection, self)._connect()
- if hasattr(ssl, "create_default_context"):
- context = ssl.create_default_context()
- context.check_hostname = self.check_hostname
- context.verify_mode = self.cert_reqs
- if self.certfile and self.keyfile:
- context.load_cert_chain(certfile=self.certfile,
- keyfile=self.keyfile)
- if self.ca_certs:
- context.load_verify_locations(self.ca_certs)
- sock = ssl_wrap_socket(context, sock, server_hostname=self.host)
- else:
- # In case this code runs in a version which is older than 2.7.9,
- # we want to fall back to old code
- sock = ssl_wrap_socket(ssl,
- sock,
- cert_reqs=self.cert_reqs,
- keyfile=self.keyfile,
- certfile=self.certfile,
- ca_certs=self.ca_certs)
- return sock
+ sock = super()._connect()
+ context = ssl.create_default_context()
+ context.check_hostname = self.check_hostname
+ context.verify_mode = self.cert_reqs
+ if self.certfile and self.keyfile:
+ context.load_cert_chain(certfile=self.certfile,
+ keyfile=self.keyfile)
+ if self.ca_certs:
+ context.load_verify_locations(self.ca_certs)
+ return context.wrap_socket(sock, server_hostname=self.host)
class UnixDomainSocketConnection(Connection):
@@ -941,7 +913,7 @@ FALSE_STRINGS = ('0', 'F', 'FALSE', 'N', 'NO')
def to_bool(value):
if value is None or value == '':
return None
- if isinstance(value, basestring) and value.upper() in FALSE_STRINGS:
+ if isinstance(value, str) and value.upper() in FALSE_STRINGS:
return False
return bool(value)
@@ -957,7 +929,7 @@ URL_QUERY_ARGUMENT_PARSERS = {
}
-class ConnectionPool(object):
+class ConnectionPool:
"""
Create a connection pool. ``If max_connections`` is set, then this
object raises :py:class:`~redis.ConnectionError` when the pool's
@@ -1019,7 +991,7 @@ class ConnectionPool(object):
url = urlparse(url)
url_options = {}
- for name, value in iteritems(parse_qs(url.query)):
+ for name, value in parse_qs(url.query).items():
if value and len(value) > 0:
parser = URL_QUERY_ARGUMENT_PARSERS.get(name)
if parser:
@@ -1096,7 +1068,7 @@ class ConnectionPool(object):
def __init__(self, connection_class=Connection, max_connections=None,
**connection_kwargs):
max_connections = max_connections or 2 ** 31
- if not isinstance(max_connections, (int, long)) or max_connections < 0:
+ if not isinstance(max_connections, int) or max_connections < 0:
raise ValueError('"max_connections" must be a positive integer')
self.connection_class = connection_class
@@ -1173,14 +1145,7 @@ class ConnectionPool(object):
# that time it is assumed that the child is deadlocked and a
# redis.ChildDeadlockedError error is raised.
if self.pid != os.getpid():
- # python 2.7 doesn't support a timeout option to lock.acquire()
- # we have to mimic lock timeouts ourselves.
- timeout_at = time() + 5
- acquired = False
- while time() < timeout_at:
- acquired = self._fork_lock.acquire(False)
- if acquired:
- break
+ acquired = self._fork_lock.acquire(timeout=5)
if not acquired:
raise ChildDeadlockedError
# reset() the instance for the new process if another thread
@@ -1323,7 +1288,7 @@ class BlockingConnectionPool(ConnectionPool):
self.queue_class = queue_class
self.timeout = timeout
- super(BlockingConnectionPool, self).__init__(
+ super().__init__(
connection_class=connection_class,
max_connections=max_connections,
**connection_kwargs)