summaryrefslogtreecommitdiff
path: root/redis/connection.py
diff options
context:
space:
mode:
Diffstat (limited to 'redis/connection.py')
-rwxr-xr-xredis/connection.py155
1 files changed, 105 insertions, 50 deletions
diff --git a/redis/connection.py b/redis/connection.py
index 88286c8..7d4301a 100755
--- a/redis/connection.py
+++ b/redis/connection.py
@@ -1,5 +1,6 @@
from __future__ import unicode_literals
from distutils.version import StrictVersion
+from errno import EWOULDBLOCK
from itertools import chain
import io
import os
@@ -17,7 +18,7 @@ except ImportError:
from redis._compat import (xrange, imap, byte_to_chr, unicode, long,
nativestr, basestring, iteritems,
LifoQueue, Empty, Full, urlparse, parse_qs,
- recv, recv_into, unquote)
+ recv, recv_into, unquote, BlockingIOError)
from redis.exceptions import (
DataError,
RedisError,
@@ -31,7 +32,6 @@ from redis.exceptions import (
ExecAbortError,
ReadOnlyError
)
-from redis.selector import DefaultSelector
from redis.utils import HIREDIS_AVAILABLE
if HIREDIS_AVAILABLE:
import hiredis
@@ -61,6 +61,8 @@ SYM_EMPTY = b''
SERVER_CLOSED_CONNECTION_ERROR = "Connection closed by server."
+SENTINEL = object()
+
class Encoder(object):
"Encode strings to bytes and decode bytes to strings"
@@ -126,9 +128,10 @@ class BaseParser(object):
class SocketBuffer(object):
- def __init__(self, socket, socket_read_size):
+ def __init__(self, socket, socket_read_size, socket_timeout):
self._sock = socket
self.socket_read_size = socket_read_size
+ self.socket_timeout = socket_timeout
self._buffer = io.BytesIO()
# number of bytes written to the buffer from the socket
self.bytes_written = 0
@@ -139,25 +142,51 @@ class SocketBuffer(object):
def length(self):
return self.bytes_written - self.bytes_read
- def _read_from_socket(self, length=None):
+ def _read_from_socket(self, length=None, timeout=SENTINEL,
+ raise_on_timeout=True):
+ sock = self._sock
socket_read_size = self.socket_read_size
buf = self._buffer
buf.seek(self.bytes_written)
marker = 0
+ custom_timeout = timeout is not SENTINEL
- while True:
- data = recv(self._sock, socket_read_size)
- # an empty string indicates the server shutdown the socket
- if isinstance(data, bytes) and len(data) == 0:
- raise socket.error(SERVER_CLOSED_CONNECTION_ERROR)
- buf.write(data)
- data_length = len(data)
- self.bytes_written += data_length
- marker += data_length
-
- if length is not None and length > marker:
- continue
- break
+ try:
+ if custom_timeout:
+ sock.settimeout(timeout)
+ while True:
+ data = recv(self._sock, socket_read_size)
+ # an empty string indicates the server shutdown the socket
+ if isinstance(data, bytes) and len(data) == 0:
+ raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
+ buf.write(data)
+ data_length = len(data)
+ self.bytes_written += data_length
+ marker += data_length
+
+ if length is not None and length > marker:
+ continue
+ return True
+ except BlockingIOError as ex:
+ # if we're in nonblocking mode and the recv raises a
+ # blocking error, simply return False indicating that
+ # there's no data to be read. otherwise raise the
+ # original exception.
+ if raise_on_timeout or ex.errno != EWOULDBLOCK:
+ raise
+ return False
+ except socket.timeout:
+ if raise_on_timeout:
+ raise
+ return False
+ finally:
+ if custom_timeout:
+ sock.settimeout(self.socket_timeout)
+
+ def can_read(self, timeout):
+ return bool(self.length) or \
+ self._read_from_socket(timeout=timeout,
+ raise_on_timeout=False)
def read(self, length):
length = length + 2 # make sure to read the \r\n terminator
@@ -233,7 +262,9 @@ class PythonParser(BaseParser):
def on_connect(self, connection):
"Called when the socket connects"
self._sock = connection._sock
- self._buffer = SocketBuffer(self._sock, self.socket_read_size)
+ self._buffer = SocketBuffer(self._sock,
+ self.socket_read_size,
+ connection.socket_timeout)
self.encoder = connection.encoder
def on_disconnect(self):
@@ -244,8 +275,8 @@ class PythonParser(BaseParser):
self._buffer = None
self.encoder = None
- def can_read(self):
- return self._buffer and bool(self._buffer.length)
+ def can_read(self, timeout):
+ return self._buffer and self._buffer.can_read(timeout)
def read_response(self):
response = self._buffer.readline()
@@ -312,6 +343,7 @@ class HiredisParser(BaseParser):
def on_connect(self, connection):
self._sock = connection._sock
+ self._socket_timeout = connection.socket_timeout
kwargs = {
'protocolError': InvalidResponse,
'replyError': self.parse_error,
@@ -333,13 +365,52 @@ class HiredisParser(BaseParser):
self._reader = None
self._next_response = False
- def can_read(self):
+ def can_read(self, timeout):
if not self._reader:
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
if self._next_response is False:
self._next_response = self._reader.gets()
- return self._next_response is not False
+ if self._next_response is False:
+ return self.read_from_socket(timeout=timeout,
+ raise_on_timeout=False)
+ return True
+
+ def read_from_socket(self, timeout=SENTINEL, raise_on_timeout=True):
+ sock = self._sock
+ custom_timeout = timeout is not SENTINEL
+ try:
+ if custom_timeout:
+ sock.settimeout(timeout)
+ if HIREDIS_USE_BYTE_BUFFER:
+ bufflen = recv_into(self._sock, self._buffer)
+ if bufflen == 0:
+ raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
+ self._reader.feed(self._buffer, 0, bufflen)
+ else:
+ buffer = recv(self._sock, self.socket_read_size)
+ # an empty string indicates the server shutdown the socket
+ if not isinstance(buffer, bytes) or len(buffer) == 0:
+ raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
+ self._reader.feed(buffer)
+ # data was read from the socket and added to the buffer.
+ # return True to indicate that data was read.
+ return True
+ except BlockingIOError as ex:
+ # if we're in nonblocking mode and the recv raises a
+ # blocking error, simply return False indicating that
+ # there's no data to be read. otherwise raise the
+ # original exception.
+ if raise_on_timeout or ex.errno != EWOULDBLOCK:
+ raise
+ return False
+ except socket.timeout:
+ if not raise_on_timeout:
+ raise
+ return False
+ finally:
+ if custom_timeout:
+ sock.settimeout(self._socket_timeout)
def read_response(self):
if not self._reader:
@@ -352,21 +423,8 @@ class HiredisParser(BaseParser):
return response
response = self._reader.gets()
- socket_read_size = self.socket_read_size
while response is False:
- if HIREDIS_USE_BYTE_BUFFER:
- bufflen = recv_into(self._sock, self._buffer)
- if bufflen == 0:
- raise socket.error(SERVER_CLOSED_CONNECTION_ERROR)
- else:
- buffer = recv(self._sock, socket_read_size)
- # an empty string indicates the server shutdown the socket
- if not isinstance(buffer, bytes) or len(buffer) == 0:
- raise socket.error(SERVER_CLOSED_CONNECTION_ERROR)
- if HIREDIS_USE_BYTE_BUFFER:
- self._reader.feed(self._buffer, 0, bufflen)
- else:
- self._reader.feed(buffer)
+ self.read_from_socket()
response = self._reader.gets()
# if an older version of hiredis is installed, we need to attempt
# to convert ResponseErrors to their appropriate types.
@@ -416,7 +474,6 @@ class Connection(object):
self.retry_on_timeout = retry_on_timeout
self.encoder = Encoder(encoding, encoding_errors, decode_responses)
self._sock = None
- self._selector = None
self._parser = parser_class(socket_read_size=socket_read_size)
self._description_args = {
'host': self.host,
@@ -454,7 +511,6 @@ class Connection(object):
raise ConnectionError(self._error_message(e))
self._sock = sock
- self._selector = DefaultSelector(sock)
try:
self.on_connect()
except RedisError:
@@ -538,9 +594,6 @@ class Connection(object):
self._parser.on_disconnect()
if self._sock is None:
return
- if self._selector is not None:
- self._selector.close()
- self._selector = None
try:
if os.getpid() == self.pid:
self._sock.shutdown(socket.SHUT_RDWR)
@@ -585,11 +638,7 @@ class Connection(object):
if not sock:
self.connect()
sock = self._sock
- return self._parser.can_read() or self._selector.can_read(timeout)
-
- def is_ready_for_command(self):
- "Check if the connection is ready for a command"
- return self._selector.is_ready_for_command()
+ return self._parser.can_read(timeout)
def read_response(self):
"Read the response from a previously sent command"
@@ -963,10 +1012,13 @@ class ConnectionPool(object):
# a command. if not, the connection was either returned to the
# pool before all data has been read or the socket has been
# closed. either way, reconnect and verify everything is good.
- if not connection.is_ready_for_command():
+ try:
+ if connection.can_read():
+ raise ConnectionError('Connection has data')
+ except ConnectionError:
connection.disconnect()
connection.connect()
- if not connection.is_ready_for_command():
+ if connection.can_read():
raise ConnectionError('Connection not ready')
except: # noqa: E722
# release the connection back to the pool so that we don't leak it
@@ -1111,10 +1163,13 @@ class BlockingConnectionPool(ConnectionPool):
# a command. if not, the connection was either returned to the
# pool before all data has been read or the socket has been
# closed. either way, reconnect and verify everything is good.
- if not connection.is_ready_for_command():
+ try:
+ if connection.can_read():
+ raise ConnectionError('Connection has data')
+ except ConnectionError:
connection.disconnect()
connection.connect()
- if not connection.is_ready_for_command():
+ if connection.can_read():
raise ConnectionError('Connection not ready')
except: # noqa: E722
# release the connection back to the pool so that we don't leak it