diff options
Diffstat (limited to 'redis/connection.py')
-rwxr-xr-x | redis/connection.py | 155 |
1 files changed, 105 insertions, 50 deletions
diff --git a/redis/connection.py b/redis/connection.py index 88286c8..7d4301a 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -1,5 +1,6 @@ from __future__ import unicode_literals from distutils.version import StrictVersion +from errno import EWOULDBLOCK from itertools import chain import io import os @@ -17,7 +18,7 @@ except ImportError: from redis._compat import (xrange, imap, byte_to_chr, unicode, long, nativestr, basestring, iteritems, LifoQueue, Empty, Full, urlparse, parse_qs, - recv, recv_into, unquote) + recv, recv_into, unquote, BlockingIOError) from redis.exceptions import ( DataError, RedisError, @@ -31,7 +32,6 @@ from redis.exceptions import ( ExecAbortError, ReadOnlyError ) -from redis.selector import DefaultSelector from redis.utils import HIREDIS_AVAILABLE if HIREDIS_AVAILABLE: import hiredis @@ -61,6 +61,8 @@ SYM_EMPTY = b'' SERVER_CLOSED_CONNECTION_ERROR = "Connection closed by server." +SENTINEL = object() + class Encoder(object): "Encode strings to bytes and decode bytes to strings" @@ -126,9 +128,10 @@ class BaseParser(object): class SocketBuffer(object): - def __init__(self, socket, socket_read_size): + def __init__(self, socket, socket_read_size, socket_timeout): self._sock = socket self.socket_read_size = socket_read_size + self.socket_timeout = socket_timeout self._buffer = io.BytesIO() # number of bytes written to the buffer from the socket self.bytes_written = 0 @@ -139,25 +142,51 @@ class SocketBuffer(object): def length(self): return self.bytes_written - self.bytes_read - def _read_from_socket(self, length=None): + def _read_from_socket(self, length=None, timeout=SENTINEL, + raise_on_timeout=True): + sock = self._sock socket_read_size = self.socket_read_size buf = self._buffer buf.seek(self.bytes_written) marker = 0 + custom_timeout = timeout is not SENTINEL - while True: - data = recv(self._sock, socket_read_size) - # an empty string indicates the server shutdown the socket - if isinstance(data, bytes) and len(data) == 0: - raise socket.error(SERVER_CLOSED_CONNECTION_ERROR) - buf.write(data) - data_length = len(data) - self.bytes_written += data_length - marker += data_length - - if length is not None and length > marker: - continue - break + try: + if custom_timeout: + sock.settimeout(timeout) + while True: + data = recv(self._sock, socket_read_size) + # an empty string indicates the server shutdown the socket + if isinstance(data, bytes) and len(data) == 0: + raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) + buf.write(data) + data_length = len(data) + self.bytes_written += data_length + marker += data_length + + if length is not None and length > marker: + continue + return True + except BlockingIOError as ex: + # if we're in nonblocking mode and the recv raises a + # blocking error, simply return False indicating that + # there's no data to be read. otherwise raise the + # original exception. + if raise_on_timeout or ex.errno != EWOULDBLOCK: + raise + return False + except socket.timeout: + if raise_on_timeout: + raise + return False + finally: + if custom_timeout: + sock.settimeout(self.socket_timeout) + + def can_read(self, timeout): + return bool(self.length) or \ + self._read_from_socket(timeout=timeout, + raise_on_timeout=False) def read(self, length): length = length + 2 # make sure to read the \r\n terminator @@ -233,7 +262,9 @@ class PythonParser(BaseParser): def on_connect(self, connection): "Called when the socket connects" self._sock = connection._sock - self._buffer = SocketBuffer(self._sock, self.socket_read_size) + self._buffer = SocketBuffer(self._sock, + self.socket_read_size, + connection.socket_timeout) self.encoder = connection.encoder def on_disconnect(self): @@ -244,8 +275,8 @@ class PythonParser(BaseParser): self._buffer = None self.encoder = None - def can_read(self): - return self._buffer and bool(self._buffer.length) + def can_read(self, timeout): + return self._buffer and self._buffer.can_read(timeout) def read_response(self): response = self._buffer.readline() @@ -312,6 +343,7 @@ class HiredisParser(BaseParser): def on_connect(self, connection): self._sock = connection._sock + self._socket_timeout = connection.socket_timeout kwargs = { 'protocolError': InvalidResponse, 'replyError': self.parse_error, @@ -333,13 +365,52 @@ class HiredisParser(BaseParser): self._reader = None self._next_response = False - def can_read(self): + def can_read(self, timeout): if not self._reader: raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) if self._next_response is False: self._next_response = self._reader.gets() - return self._next_response is not False + if self._next_response is False: + return self.read_from_socket(timeout=timeout, + raise_on_timeout=False) + return True + + def read_from_socket(self, timeout=SENTINEL, raise_on_timeout=True): + sock = self._sock + custom_timeout = timeout is not SENTINEL + try: + if custom_timeout: + sock.settimeout(timeout) + if HIREDIS_USE_BYTE_BUFFER: + bufflen = recv_into(self._sock, self._buffer) + if bufflen == 0: + raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) + self._reader.feed(self._buffer, 0, bufflen) + else: + buffer = recv(self._sock, self.socket_read_size) + # an empty string indicates the server shutdown the socket + if not isinstance(buffer, bytes) or len(buffer) == 0: + raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) + self._reader.feed(buffer) + # data was read from the socket and added to the buffer. + # return True to indicate that data was read. + return True + except BlockingIOError as ex: + # if we're in nonblocking mode and the recv raises a + # blocking error, simply return False indicating that + # there's no data to be read. otherwise raise the + # original exception. + if raise_on_timeout or ex.errno != EWOULDBLOCK: + raise + return False + except socket.timeout: + if not raise_on_timeout: + raise + return False + finally: + if custom_timeout: + sock.settimeout(self._socket_timeout) def read_response(self): if not self._reader: @@ -352,21 +423,8 @@ class HiredisParser(BaseParser): return response response = self._reader.gets() - socket_read_size = self.socket_read_size while response is False: - if HIREDIS_USE_BYTE_BUFFER: - bufflen = recv_into(self._sock, self._buffer) - if bufflen == 0: - raise socket.error(SERVER_CLOSED_CONNECTION_ERROR) - else: - buffer = recv(self._sock, socket_read_size) - # an empty string indicates the server shutdown the socket - if not isinstance(buffer, bytes) or len(buffer) == 0: - raise socket.error(SERVER_CLOSED_CONNECTION_ERROR) - if HIREDIS_USE_BYTE_BUFFER: - self._reader.feed(self._buffer, 0, bufflen) - else: - self._reader.feed(buffer) + self.read_from_socket() response = self._reader.gets() # if an older version of hiredis is installed, we need to attempt # to convert ResponseErrors to their appropriate types. @@ -416,7 +474,6 @@ class Connection(object): self.retry_on_timeout = retry_on_timeout self.encoder = Encoder(encoding, encoding_errors, decode_responses) self._sock = None - self._selector = None self._parser = parser_class(socket_read_size=socket_read_size) self._description_args = { 'host': self.host, @@ -454,7 +511,6 @@ class Connection(object): raise ConnectionError(self._error_message(e)) self._sock = sock - self._selector = DefaultSelector(sock) try: self.on_connect() except RedisError: @@ -538,9 +594,6 @@ class Connection(object): self._parser.on_disconnect() if self._sock is None: return - if self._selector is not None: - self._selector.close() - self._selector = None try: if os.getpid() == self.pid: self._sock.shutdown(socket.SHUT_RDWR) @@ -585,11 +638,7 @@ class Connection(object): if not sock: self.connect() sock = self._sock - return self._parser.can_read() or self._selector.can_read(timeout) - - def is_ready_for_command(self): - "Check if the connection is ready for a command" - return self._selector.is_ready_for_command() + return self._parser.can_read(timeout) def read_response(self): "Read the response from a previously sent command" @@ -963,10 +1012,13 @@ class ConnectionPool(object): # a command. if not, the connection was either returned to the # pool before all data has been read or the socket has been # closed. either way, reconnect and verify everything is good. - if not connection.is_ready_for_command(): + try: + if connection.can_read(): + raise ConnectionError('Connection has data') + except ConnectionError: connection.disconnect() connection.connect() - if not connection.is_ready_for_command(): + if connection.can_read(): raise ConnectionError('Connection not ready') except: # noqa: E722 # release the connection back to the pool so that we don't leak it @@ -1111,10 +1163,13 @@ class BlockingConnectionPool(ConnectionPool): # a command. if not, the connection was either returned to the # pool before all data has been read or the socket has been # closed. either way, reconnect and verify everything is good. - if not connection.is_ready_for_command(): + try: + if connection.can_read(): + raise ConnectionError('Connection has data') + except ConnectionError: connection.disconnect() connection.connect() - if not connection.is_ready_for_command(): + if connection.can_read(): raise ConnectionError('Connection not ready') except: # noqa: E722 # release the connection back to the pool so that we don't leak it |