diff options
Diffstat (limited to 'redis/connection.py')
-rwxr-xr-x | redis/connection.py | 129 |
1 files changed, 47 insertions, 82 deletions
diff --git a/redis/connection.py b/redis/connection.py index 22d3902..a29f9b2 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -1,7 +1,8 @@ -from __future__ import unicode_literals from distutils.version import StrictVersion from itertools import chain from time import time +from queue import LifoQueue, Empty, Full +from urllib.parse import parse_qs, unquote, urlparse import errno import io import os @@ -9,11 +10,6 @@ import socket import threading import warnings -from redis._compat import (xrange, imap, unicode, long, - nativestr, basestring, iteritems, - LifoQueue, Empty, Full, urlparse, parse_qs, - recv, recv_into, unquote, BlockingIOError, - sendall, shutdown, ssl_wrap_socket) from redis.exceptions import ( AuthenticationError, AuthenticationWrongNumberOfArgsError, @@ -31,7 +27,7 @@ from redis.exceptions import ( TimeoutError, ModuleError, ) -from redis.utils import HIREDIS_AVAILABLE +from redis.utils import HIREDIS_AVAILABLE, str_if_bytes try: import ssl @@ -50,16 +46,6 @@ if ssl_available: else: NONBLOCKING_EXCEPTION_ERROR_NUMBERS[ssl.SSLError] = 2 -# In Python 2.7 a socket.error is raised for a nonblocking read. -# The _compat module aliases BlockingIOError to socket.error to be -# Python 2/3 compatible. -# However this means that all socket.error exceptions need to be handled -# properly within these exception handlers. -# We need to make sure socket.error is included in these handlers and -# provide a dummy error number that will never match a real exception. -if socket.error not in NONBLOCKING_EXCEPTION_ERROR_NUMBERS: - NONBLOCKING_EXCEPTION_ERROR_NUMBERS[socket.error] = -999999 - NONBLOCKING_EXCEPTIONS = tuple(NONBLOCKING_EXCEPTION_ERROR_NUMBERS.keys()) if HIREDIS_AVAILABLE: @@ -101,7 +87,7 @@ MODULE_EXPORTS_DATA_TYPES_ERROR = "Error unloading module: the module " \ "types, can't unload" -class Encoder(object): +class Encoder: "Encode strings to bytes-like and decode bytes-like to strings" def __init__(self, encoding, encoding_errors, decode_responses): @@ -117,17 +103,14 @@ class Encoder(object): # special case bool since it is a subclass of int raise DataError("Invalid input of type: 'bool'. Convert to a " "bytes, string, int or float first.") - elif isinstance(value, float): + elif isinstance(value, (int, float)): value = repr(value).encode() - elif isinstance(value, (int, long)): - # python 2 repr() on longs is '123L', so use str() instead - value = str(value).encode() - elif not isinstance(value, basestring): + elif not isinstance(value, str): # a value we don't know how to deal with. throw an error typename = type(value).__name__ raise DataError("Invalid input of type: '%s'. Convert to a " "bytes, string, int or float first." % typename) - if isinstance(value, unicode): + if isinstance(value, str): value = value.encode(self.encoding, self.encoding_errors) return value @@ -141,7 +124,7 @@ class Encoder(object): return value -class BaseParser(object): +class BaseParser: EXCEPTION_CLASSES = { 'ERR': { 'max number of clients reached': ConnectionError, @@ -180,7 +163,7 @@ class BaseParser(object): return ResponseError(response) -class SocketBuffer(object): +class SocketBuffer: def __init__(self, socket, socket_read_size, socket_timeout): self._sock = socket self.socket_read_size = socket_read_size @@ -208,7 +191,7 @@ class SocketBuffer(object): if custom_timeout: sock.settimeout(timeout) while True: - data = recv(self._sock, socket_read_size) + data = self._sock.recv(socket_read_size) # an empty string indicates the server shutdown the socket if isinstance(data, bytes) and len(data) == 0: raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) @@ -345,7 +328,7 @@ class PythonParser(BaseParser): # server returned an error if byte == b'-': - response = nativestr(response) + response = response.decode('utf-8', errors='replace') error = self.parse_error(response) # if the error is a ConnectionError, raise immediately so the user # is notified @@ -361,7 +344,7 @@ class PythonParser(BaseParser): pass # int value elif byte == b':': - response = long(response) + response = int(response) # bulk response elif byte == b'$': length = int(response) @@ -373,7 +356,7 @@ class PythonParser(BaseParser): length = int(response) if length == -1: return None - response = [self.read_response() for i in xrange(length)] + response = [self.read_response() for i in range(length)] if isinstance(response, bytes): response = self.encoder.decode(response) return response @@ -437,12 +420,12 @@ class HiredisParser(BaseParser): if custom_timeout: sock.settimeout(timeout) if HIREDIS_USE_BYTE_BUFFER: - bufflen = recv_into(self._sock, self._buffer) + bufflen = self._sock.recv_into(self._buffer) if bufflen == 0: raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) self._reader.feed(self._buffer, 0, bufflen) else: - buffer = recv(self._sock, self.socket_read_size) + buffer = self._sock.recv(self.socket_read_size) # an empty string indicates the server shutdown the socket if not isinstance(buffer, bytes) or len(buffer) == 0: raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) @@ -507,7 +490,7 @@ else: DefaultParser = PythonParser -class Connection(object): +class Connection: "Manages TCP communication to and from a Redis server" def __init__(self, host='localhost', port=6379, db=0, password=None, @@ -606,7 +589,7 @@ class Connection(object): # TCP_KEEPALIVE if self.socket_keepalive: sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - for k, v in iteritems(self.socket_keepalive_options): + for k, v in self.socket_keepalive_options.items(): sock.setsockopt(socket.IPPROTO_TCP, k, v) # set the socket_connect_timeout before we connect @@ -619,14 +602,14 @@ class Connection(object): sock.settimeout(self.socket_timeout) return sock - except socket.error as _: + except OSError as _: err = _ if sock is not None: sock.close() if err is not None: raise err - raise socket.error("socket.getaddrinfo returned an empty list") + raise OSError("socket.getaddrinfo returned an empty list") def _error_message(self, exception): # args for socket.error can either be (errno, "message") @@ -662,19 +645,19 @@ class Connection(object): self.send_command('AUTH', self.password, check_health=False) auth_response = self.read_response() - if nativestr(auth_response) != 'OK': + if str_if_bytes(auth_response) != 'OK': raise AuthenticationError('Invalid Username or Password') # if a client_name is given, set it if self.client_name: self.send_command('CLIENT', 'SETNAME', self.client_name) - if nativestr(self.read_response()) != 'OK': + if str_if_bytes(self.read_response()) != 'OK': raise ConnectionError('Error setting client name') # if a database is specified, switch to it if self.db: self.send_command('SELECT', self.db) - if nativestr(self.read_response()) != 'OK': + if str_if_bytes(self.read_response()) != 'OK': raise ConnectionError('Invalid Database') def disconnect(self): @@ -684,9 +667,9 @@ class Connection(object): return try: if os.getpid() == self.pid: - shutdown(self._sock, socket.SHUT_RDWR) + self._sock.shutdown(socket.SHUT_RDWR) self._sock.close() - except socket.error: + except OSError: pass self._sock = None @@ -695,13 +678,13 @@ class Connection(object): if self.health_check_interval and time() > self.next_health_check: try: self.send_command('PING', check_health=False) - if nativestr(self.read_response()) != 'PONG': + if str_if_bytes(self.read_response()) != 'PONG': raise ConnectionError( 'Bad response from PING health check') except (ConnectionError, TimeoutError): self.disconnect() self.send_command('PING', check_health=False) - if nativestr(self.read_response()) != 'PONG': + if str_if_bytes(self.read_response()) != 'PONG': raise ConnectionError( 'Bad response from PING health check') @@ -716,7 +699,7 @@ class Connection(object): if isinstance(command, str): command = [command] for item in command: - sendall(self._sock, item) + self._sock.sendall(item) except socket.timeout: self.disconnect() raise TimeoutError("Timeout writing to socket") @@ -777,7 +760,7 @@ class Connection(object): # arguments to be sent separately, so split the first argument # manually. These arguments should be bytestrings so that they are # not encoded. - if isinstance(args[0], unicode): + if isinstance(args[0], str): args = tuple(args[0].encode().split()) + args[1:] elif b' ' in args[0]: args = tuple(args[0].split()) + args[1:] @@ -785,7 +768,7 @@ class Connection(object): buff = SYM_EMPTY.join((SYM_STAR, str(len(args)).encode(), SYM_CRLF)) buffer_cutoff = self._buffer_cutoff - for arg in imap(self.encoder.encode, args): + for arg in map(self.encoder.encode, args): # to avoid large string mallocs, chunk the command into the # output list if we're sending large values or memoryviews arg_length = len(arg) @@ -838,13 +821,13 @@ class SSLConnection(Connection): if not ssl_available: raise RedisError("Python wasn't built with SSL support") - super(SSLConnection, self).__init__(**kwargs) + super().__init__(**kwargs) self.keyfile = ssl_keyfile self.certfile = ssl_certfile if ssl_cert_reqs is None: ssl_cert_reqs = ssl.CERT_NONE - elif isinstance(ssl_cert_reqs, basestring): + elif isinstance(ssl_cert_reqs, str): CERT_REQS = { 'none': ssl.CERT_NONE, 'optional': ssl.CERT_OPTIONAL, @@ -861,27 +844,16 @@ class SSLConnection(Connection): def _connect(self): "Wrap the socket with SSL support" - sock = super(SSLConnection, self)._connect() - if hasattr(ssl, "create_default_context"): - context = ssl.create_default_context() - context.check_hostname = self.check_hostname - context.verify_mode = self.cert_reqs - if self.certfile and self.keyfile: - context.load_cert_chain(certfile=self.certfile, - keyfile=self.keyfile) - if self.ca_certs: - context.load_verify_locations(self.ca_certs) - sock = ssl_wrap_socket(context, sock, server_hostname=self.host) - else: - # In case this code runs in a version which is older than 2.7.9, - # we want to fall back to old code - sock = ssl_wrap_socket(ssl, - sock, - cert_reqs=self.cert_reqs, - keyfile=self.keyfile, - certfile=self.certfile, - ca_certs=self.ca_certs) - return sock + sock = super()._connect() + context = ssl.create_default_context() + context.check_hostname = self.check_hostname + context.verify_mode = self.cert_reqs + if self.certfile and self.keyfile: + context.load_cert_chain(certfile=self.certfile, + keyfile=self.keyfile) + if self.ca_certs: + context.load_verify_locations(self.ca_certs) + return context.wrap_socket(sock, server_hostname=self.host) class UnixDomainSocketConnection(Connection): @@ -941,7 +913,7 @@ FALSE_STRINGS = ('0', 'F', 'FALSE', 'N', 'NO') def to_bool(value): if value is None or value == '': return None - if isinstance(value, basestring) and value.upper() in FALSE_STRINGS: + if isinstance(value, str) and value.upper() in FALSE_STRINGS: return False return bool(value) @@ -957,7 +929,7 @@ URL_QUERY_ARGUMENT_PARSERS = { } -class ConnectionPool(object): +class ConnectionPool: """ Create a connection pool. ``If max_connections`` is set, then this object raises :py:class:`~redis.ConnectionError` when the pool's @@ -1019,7 +991,7 @@ class ConnectionPool(object): url = urlparse(url) url_options = {} - for name, value in iteritems(parse_qs(url.query)): + for name, value in parse_qs(url.query).items(): if value and len(value) > 0: parser = URL_QUERY_ARGUMENT_PARSERS.get(name) if parser: @@ -1096,7 +1068,7 @@ class ConnectionPool(object): def __init__(self, connection_class=Connection, max_connections=None, **connection_kwargs): max_connections = max_connections or 2 ** 31 - if not isinstance(max_connections, (int, long)) or max_connections < 0: + if not isinstance(max_connections, int) or max_connections < 0: raise ValueError('"max_connections" must be a positive integer') self.connection_class = connection_class @@ -1173,14 +1145,7 @@ class ConnectionPool(object): # that time it is assumed that the child is deadlocked and a # redis.ChildDeadlockedError error is raised. if self.pid != os.getpid(): - # python 2.7 doesn't support a timeout option to lock.acquire() - # we have to mimic lock timeouts ourselves. - timeout_at = time() + 5 - acquired = False - while time() < timeout_at: - acquired = self._fork_lock.acquire(False) - if acquired: - break + acquired = self._fork_lock.acquire(timeout=5) if not acquired: raise ChildDeadlockedError # reset() the instance for the new process if another thread @@ -1323,7 +1288,7 @@ class BlockingConnectionPool(ConnectionPool): self.queue_class = queue_class self.timeout = timeout - super(BlockingConnectionPool, self).__init__( + super().__init__( connection_class=connection_class, max_connections=max_connections, **connection_kwargs) |