summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGES5
-rw-r--r--redis/__init__.py2
-rw-r--r--redis/_compat.py4
-rwxr-xr-xredis/connection.py155
-rw-r--r--redis/selector.py196
-rw-r--r--tests/test_connection_pool.py4
-rw-r--r--tests/test_pubsub.py13
-rw-r--r--tests/test_selector.py122
8 files changed, 128 insertions, 373 deletions
diff --git a/CHANGES b/CHANGES
index 5256faf..e2d06c1 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,4 +1,4 @@
-* 3.2.2 (in development)
+* 3.3.0 (in development)
* Resolve a race condition with the PubSubWorkerThread. #1150
* Cleanup socket read error messages. Thanks Vic Yu. #1159
* Cleanup the Connection's selector correctly. Thanks Bruce Merry. #1153
@@ -17,6 +17,9 @@
cause the connection to be disconnected and cleaned up appropriately.
#923
* Add READONLY and READWRITE commands. Thanks @theodesp. #1114
+ * Remove selectors in favor of nonblocking sockets. Selectors had
+ issues in some environments including eventlet and gevent. This should
+ resolve those issues with no other side effects.
* 3.2.1
* Fix SentinelConnectionPool to work in multiprocess/forked environments.
* 3.2.0
diff --git a/redis/__init__.py b/redis/__init__.py
index b74c403..2135fd8 100644
--- a/redis/__init__.py
+++ b/redis/__init__.py
@@ -29,7 +29,7 @@ def int_or_str(value):
return value
-__version__ = '3.2.1'
+__version__ = '3.3.dev2'
VERSION = tuple(map(int_or_str, __version__.split('.')))
__all__ = [
diff --git a/redis/_compat.py b/redis/_compat.py
index bde6fb6..d70af2a 100644
--- a/redis/_compat.py
+++ b/redis/_compat.py
@@ -1,12 +1,12 @@
"""Internal module for Python 2 backwards compatibility."""
import errno
+import socket
import sys
# For Python older than 3.5, retry EINTR.
if sys.version_info[0] < 3 or (sys.version_info[0] == 3 and
sys.version_info[1] < 5):
# Adapted from https://bugs.python.org/review/23863/patch/14532/54418
- import socket
import time
# Wrapper for handling interruptable system calls.
@@ -100,6 +100,7 @@ if sys.version_info[0] < 3:
basestring = basestring
unicode = unicode
long = long
+ BlockingIOError = socket.error
else:
from urllib.parse import parse_qs, unquote, urlparse
from string import ascii_letters
@@ -129,6 +130,7 @@ else:
unicode = str
safe_unicode = str
long = int
+ BlockingIOError = BlockingIOError
try: # Python 3
from queue import LifoQueue, Empty, Full
diff --git a/redis/connection.py b/redis/connection.py
index 88286c8..7d4301a 100755
--- a/redis/connection.py
+++ b/redis/connection.py
@@ -1,5 +1,6 @@
from __future__ import unicode_literals
from distutils.version import StrictVersion
+from errno import EWOULDBLOCK
from itertools import chain
import io
import os
@@ -17,7 +18,7 @@ except ImportError:
from redis._compat import (xrange, imap, byte_to_chr, unicode, long,
nativestr, basestring, iteritems,
LifoQueue, Empty, Full, urlparse, parse_qs,
- recv, recv_into, unquote)
+ recv, recv_into, unquote, BlockingIOError)
from redis.exceptions import (
DataError,
RedisError,
@@ -31,7 +32,6 @@ from redis.exceptions import (
ExecAbortError,
ReadOnlyError
)
-from redis.selector import DefaultSelector
from redis.utils import HIREDIS_AVAILABLE
if HIREDIS_AVAILABLE:
import hiredis
@@ -61,6 +61,8 @@ SYM_EMPTY = b''
SERVER_CLOSED_CONNECTION_ERROR = "Connection closed by server."
+SENTINEL = object()
+
class Encoder(object):
"Encode strings to bytes and decode bytes to strings"
@@ -126,9 +128,10 @@ class BaseParser(object):
class SocketBuffer(object):
- def __init__(self, socket, socket_read_size):
+ def __init__(self, socket, socket_read_size, socket_timeout):
self._sock = socket
self.socket_read_size = socket_read_size
+ self.socket_timeout = socket_timeout
self._buffer = io.BytesIO()
# number of bytes written to the buffer from the socket
self.bytes_written = 0
@@ -139,25 +142,51 @@ class SocketBuffer(object):
def length(self):
return self.bytes_written - self.bytes_read
- def _read_from_socket(self, length=None):
+ def _read_from_socket(self, length=None, timeout=SENTINEL,
+ raise_on_timeout=True):
+ sock = self._sock
socket_read_size = self.socket_read_size
buf = self._buffer
buf.seek(self.bytes_written)
marker = 0
+ custom_timeout = timeout is not SENTINEL
- while True:
- data = recv(self._sock, socket_read_size)
- # an empty string indicates the server shutdown the socket
- if isinstance(data, bytes) and len(data) == 0:
- raise socket.error(SERVER_CLOSED_CONNECTION_ERROR)
- buf.write(data)
- data_length = len(data)
- self.bytes_written += data_length
- marker += data_length
-
- if length is not None and length > marker:
- continue
- break
+ try:
+ if custom_timeout:
+ sock.settimeout(timeout)
+ while True:
+ data = recv(self._sock, socket_read_size)
+ # an empty string indicates the server shutdown the socket
+ if isinstance(data, bytes) and len(data) == 0:
+ raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
+ buf.write(data)
+ data_length = len(data)
+ self.bytes_written += data_length
+ marker += data_length
+
+ if length is not None and length > marker:
+ continue
+ return True
+ except BlockingIOError as ex:
+ # if we're in nonblocking mode and the recv raises a
+ # blocking error, simply return False indicating that
+ # there's no data to be read. otherwise raise the
+ # original exception.
+ if raise_on_timeout or ex.errno != EWOULDBLOCK:
+ raise
+ return False
+ except socket.timeout:
+ if raise_on_timeout:
+ raise
+ return False
+ finally:
+ if custom_timeout:
+ sock.settimeout(self.socket_timeout)
+
+ def can_read(self, timeout):
+ return bool(self.length) or \
+ self._read_from_socket(timeout=timeout,
+ raise_on_timeout=False)
def read(self, length):
length = length + 2 # make sure to read the \r\n terminator
@@ -233,7 +262,9 @@ class PythonParser(BaseParser):
def on_connect(self, connection):
"Called when the socket connects"
self._sock = connection._sock
- self._buffer = SocketBuffer(self._sock, self.socket_read_size)
+ self._buffer = SocketBuffer(self._sock,
+ self.socket_read_size,
+ connection.socket_timeout)
self.encoder = connection.encoder
def on_disconnect(self):
@@ -244,8 +275,8 @@ class PythonParser(BaseParser):
self._buffer = None
self.encoder = None
- def can_read(self):
- return self._buffer and bool(self._buffer.length)
+ def can_read(self, timeout):
+ return self._buffer and self._buffer.can_read(timeout)
def read_response(self):
response = self._buffer.readline()
@@ -312,6 +343,7 @@ class HiredisParser(BaseParser):
def on_connect(self, connection):
self._sock = connection._sock
+ self._socket_timeout = connection.socket_timeout
kwargs = {
'protocolError': InvalidResponse,
'replyError': self.parse_error,
@@ -333,13 +365,52 @@ class HiredisParser(BaseParser):
self._reader = None
self._next_response = False
- def can_read(self):
+ def can_read(self, timeout):
if not self._reader:
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
if self._next_response is False:
self._next_response = self._reader.gets()
- return self._next_response is not False
+ if self._next_response is False:
+ return self.read_from_socket(timeout=timeout,
+ raise_on_timeout=False)
+ return True
+
+ def read_from_socket(self, timeout=SENTINEL, raise_on_timeout=True):
+ sock = self._sock
+ custom_timeout = timeout is not SENTINEL
+ try:
+ if custom_timeout:
+ sock.settimeout(timeout)
+ if HIREDIS_USE_BYTE_BUFFER:
+ bufflen = recv_into(self._sock, self._buffer)
+ if bufflen == 0:
+ raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
+ self._reader.feed(self._buffer, 0, bufflen)
+ else:
+ buffer = recv(self._sock, self.socket_read_size)
+ # an empty string indicates the server shutdown the socket
+ if not isinstance(buffer, bytes) or len(buffer) == 0:
+ raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
+ self._reader.feed(buffer)
+ # data was read from the socket and added to the buffer.
+ # return True to indicate that data was read.
+ return True
+ except BlockingIOError as ex:
+ # if we're in nonblocking mode and the recv raises a
+ # blocking error, simply return False indicating that
+ # there's no data to be read. otherwise raise the
+ # original exception.
+ if raise_on_timeout or ex.errno != EWOULDBLOCK:
+ raise
+ return False
+ except socket.timeout:
+ if not raise_on_timeout:
+ raise
+ return False
+ finally:
+ if custom_timeout:
+ sock.settimeout(self._socket_timeout)
def read_response(self):
if not self._reader:
@@ -352,21 +423,8 @@ class HiredisParser(BaseParser):
return response
response = self._reader.gets()
- socket_read_size = self.socket_read_size
while response is False:
- if HIREDIS_USE_BYTE_BUFFER:
- bufflen = recv_into(self._sock, self._buffer)
- if bufflen == 0:
- raise socket.error(SERVER_CLOSED_CONNECTION_ERROR)
- else:
- buffer = recv(self._sock, socket_read_size)
- # an empty string indicates the server shutdown the socket
- if not isinstance(buffer, bytes) or len(buffer) == 0:
- raise socket.error(SERVER_CLOSED_CONNECTION_ERROR)
- if HIREDIS_USE_BYTE_BUFFER:
- self._reader.feed(self._buffer, 0, bufflen)
- else:
- self._reader.feed(buffer)
+ self.read_from_socket()
response = self._reader.gets()
# if an older version of hiredis is installed, we need to attempt
# to convert ResponseErrors to their appropriate types.
@@ -416,7 +474,6 @@ class Connection(object):
self.retry_on_timeout = retry_on_timeout
self.encoder = Encoder(encoding, encoding_errors, decode_responses)
self._sock = None
- self._selector = None
self._parser = parser_class(socket_read_size=socket_read_size)
self._description_args = {
'host': self.host,
@@ -454,7 +511,6 @@ class Connection(object):
raise ConnectionError(self._error_message(e))
self._sock = sock
- self._selector = DefaultSelector(sock)
try:
self.on_connect()
except RedisError:
@@ -538,9 +594,6 @@ class Connection(object):
self._parser.on_disconnect()
if self._sock is None:
return
- if self._selector is not None:
- self._selector.close()
- self._selector = None
try:
if os.getpid() == self.pid:
self._sock.shutdown(socket.SHUT_RDWR)
@@ -585,11 +638,7 @@ class Connection(object):
if not sock:
self.connect()
sock = self._sock
- return self._parser.can_read() or self._selector.can_read(timeout)
-
- def is_ready_for_command(self):
- "Check if the connection is ready for a command"
- return self._selector.is_ready_for_command()
+ return self._parser.can_read(timeout)
def read_response(self):
"Read the response from a previously sent command"
@@ -963,10 +1012,13 @@ class ConnectionPool(object):
# a command. if not, the connection was either returned to the
# pool before all data has been read or the socket has been
# closed. either way, reconnect and verify everything is good.
- if not connection.is_ready_for_command():
+ try:
+ if connection.can_read():
+ raise ConnectionError('Connection has data')
+ except ConnectionError:
connection.disconnect()
connection.connect()
- if not connection.is_ready_for_command():
+ if connection.can_read():
raise ConnectionError('Connection not ready')
except: # noqa: E722
# release the connection back to the pool so that we don't leak it
@@ -1111,10 +1163,13 @@ class BlockingConnectionPool(ConnectionPool):
# a command. if not, the connection was either returned to the
# pool before all data has been read or the socket has been
# closed. either way, reconnect and verify everything is good.
- if not connection.is_ready_for_command():
+ try:
+ if connection.can_read():
+ raise ConnectionError('Connection has data')
+ except ConnectionError:
connection.disconnect()
connection.connect()
- if not connection.is_ready_for_command():
+ if connection.can_read():
raise ConnectionError('Connection not ready')
except: # noqa: E722
# release the connection back to the pool so that we don't leak it
diff --git a/redis/selector.py b/redis/selector.py
deleted file mode 100644
index bce84a5..0000000
--- a/redis/selector.py
+++ /dev/null
@@ -1,196 +0,0 @@
-import errno
-import select
-from redis.exceptions import RedisError
-
-
-_DEFAULT_SELECTOR = None
-
-
-class BaseSelector(object):
- """
- Base class for all Selectors
- """
- def __init__(self, sock):
- self.sock = sock
-
- def can_read(self, timeout=0):
- """
- Return True if data is ready to be read from the socket,
- otherwise False.
-
- This doesn't guarentee that the socket is still connected, just that
- there is data to read.
-
- Automatically retries EINTR errors based on PEP 475.
- """
- while True:
- try:
- return self.check_can_read(timeout)
- except (select.error, IOError) as ex:
- if self.errno_from_exception(ex) == errno.EINTR:
- continue
- return False
-
- def is_ready_for_command(self, timeout=0):
- """
- Return True if the socket is ready to send a command,
- otherwise False.
-
- Automatically retries EINTR errors based on PEP 475.
- """
- while True:
- try:
- return self.check_is_ready_for_command(timeout)
- except (select.error, IOError) as ex:
- if self.errno_from_exception(ex) == errno.EINTR:
- continue
- return False
-
- def check_can_read(self, timeout):
- """
- Perform the can_read check. Subclasses should implement this.
- """
- raise NotImplementedError
-
- def check_is_ready_for_command(self, timeout):
- """
- Perform the is_ready_for_command check. Subclasses should
- implement this.
- """
- raise NotImplementedError
-
- def close(self):
- """
- Close the selector.
- """
- self.sock = None
-
- def errno_from_exception(self, ex):
- """
- Get the error number from an exception
- """
- if hasattr(ex, 'errno'):
- return ex.errno
- elif ex.args:
- return ex.args[0]
- else:
- return None
-
-
-if hasattr(select, 'select'):
- class SelectSelector(BaseSelector):
- """
- A select-based selector that should work on most platforms.
-
- This is the worst poll strategy and should only be used if no other
- option is available.
- """
- def check_can_read(self, timeout):
- """
- Return True if data is ready to be read from the socket,
- otherwise False.
-
- This doesn't guarentee that the socket is still connected, just
- that there is data to read.
- """
- return bool(select.select([self.sock], [], [], timeout)[0])
-
- def check_is_ready_for_command(self, timeout):
- """
- Return True if the socket is ready to send a command,
- otherwise False.
- """
- r, w, e = select.select([self.sock], [self.sock], [self.sock],
- timeout)
- return bool(w and not r and not e)
-
-
-if hasattr(select, 'poll'):
- class PollSelector(BaseSelector):
- """
- A poll-based selector that should work on (almost?) all versions
- of Unix
- """
- READ_MASK = select.POLLIN | select.POLLPRI
- ERROR_MASK = select.POLLERR | select.POLLHUP
- WRITE_MASK = select.POLLOUT
-
- _READ_POLLER_MASK = READ_MASK | ERROR_MASK
- _READY_POLLER_MASK = READ_MASK | ERROR_MASK | WRITE_MASK
-
- def __init__(self, sock):
- super(PollSelector, self).__init__(sock)
- self.read_poller = select.poll()
- self.read_poller.register(sock, self._READ_POLLER_MASK)
- self.ready_poller = select.poll()
- self.ready_poller.register(sock, self._READY_POLLER_MASK)
-
- def close(self):
- """
- Close the selector.
- """
- for poller in (self.read_poller, self.ready_poller):
- try:
- poller.unregister(self.sock)
- except (KeyError, ValueError):
- # KeyError is raised if somehow the socket was not
- # registered
- # ValueError is raised if the socket's file descriptor is
- # negative.
- # In either case, we can't do anything better than to
- # remove the reference to the poller.
- pass
- self.read_poller = None
- self.ready_poller = None
- self.sock = None
-
- def check_can_read(self, timeout=0):
- """
- Return True if data is ready to be read from the socket,
- otherwise False.
-
- This doesn't guarentee that the socket is still connected, just
- that there is data to read.
- """
- timeout = int(timeout * 1000)
- events = self.read_poller.poll(timeout)
- return bool(events and events[0][1] & self.READ_MASK)
-
- def check_is_ready_for_command(self, timeout=0):
- """
- Return True if the socket is ready to send a command,
- otherwise False
- """
- timeout = timeout * 1000
- events = self.ready_poller.poll(timeout)
- return bool(events and events[0][1] == self.WRITE_MASK)
-
-
-def has_selector(selector):
- "Determine if the current platform has the selector available"
- try:
- if selector == 'poll':
- # the select module offers the poll selector even if the platform
- # doesn't support it. Attempt to poll for nothing to make sure
- # poll is available
- p = select.poll()
- p.poll(0)
- else:
- # the other selectors will fail when instantiated
- getattr(select, selector)().close()
- return True
- except (OSError, AttributeError):
- return False
-
-
-def DefaultSelector(sock):
- "Return the best selector for the platform"
- global _DEFAULT_SELECTOR
- if _DEFAULT_SELECTOR is None:
- if has_selector('poll'):
- _DEFAULT_SELECTOR = PollSelector
- elif hasattr(select, 'select'):
- _DEFAULT_SELECTOR = SelectSelector
- else:
- raise RedisError('Platform does not support any selectors')
- return _DEFAULT_SELECTOR(sock)
diff --git a/tests/test_connection_pool.py b/tests/test_connection_pool.py
index f258411..2aea1e4 100644
--- a/tests/test_connection_pool.py
+++ b/tests/test_connection_pool.py
@@ -19,8 +19,8 @@ class DummyConnection(object):
def connect(self):
pass
- def is_ready_for_command(self):
- return True
+ def can_read(self):
+ return False
class TestConnectionPool(object):
diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py
index fc91abf..7f94b4a 100644
--- a/tests/test_pubsub.py
+++ b/tests/test_pubsub.py
@@ -490,3 +490,16 @@ class TestPubSubPings(object):
assert wait_for_message(p) == make_message(type='pong', channel=None,
data='hello world',
pattern=None)
+
+
+class TestPubSubConnectionKilled(object):
+
+ @skip_if_server_version_lt('3.0.0')
+ def test_connection_error_raised_when_connection_dies(self, r):
+ p = r.pubsub(ignore_subscribe_messages=True)
+ p.subscribe('foo')
+ for client in r.client_list():
+ if client['cmd'] == 'subscribe':
+ r.client_kill_filter(_id=client['id'])
+ with pytest.raises(ConnectionError):
+ wait_for_message(p)
diff --git a/tests/test_selector.py b/tests/test_selector.py
deleted file mode 100644
index 07bd6dc..0000000
--- a/tests/test_selector.py
+++ /dev/null
@@ -1,122 +0,0 @@
-import pytest
-import time
-from redis import selector
-
-_SELECTORS = (
- 'SelectSelector',
- 'PollSelector',
-)
-
-
-@pytest.mark.parametrize('selector_name', _SELECTORS)
-class TestSelector(object):
-
- @pytest.fixture()
- def selector_patch(self, selector_name, request):
- "A fixture to patch the DefaultSelector with each selector"
- if not hasattr(selector, selector_name):
- pytest.skip('selector %s unavailable' % selector_name)
- default_selector = selector._DEFAULT_SELECTOR
-
- def revert_selector():
- selector._DEFAULT_SELECTOR = default_selector
- request.addfinalizer(revert_selector)
-
- selector._DEFAULT_SELECTOR = getattr(selector, selector_name)
-
- def kill_connection(self, connection, r):
- "Helper that tells the redis server to kill `connection`"
- # set a name for the connection so that we can identify and kill it
- connection.send_command('client', 'setname', 'redis-py-1')
- assert connection.read_response() == b'OK'
-
- # find the client based on its name and kill it
- for client in r.client_list():
- if client['name'] == 'redis-py-1':
- assert r.client_kill(client['addr'])
- break
- else:
- assert False, 'Client redis-py-1 not found in client list'
-
- def test_can_read(self, selector_patch, r):
- c = r.connection_pool.get_connection('_')
-
- # a fresh connection should not be readable
- assert not c.can_read()
-
- c.send_command('PING')
- # a connection should be readable when a response is available
- # note that we supply a timeout here to make sure the server has
- # a chance to respond
- assert c.can_read(1.0)
-
- assert c.read_response() == b'PONG'
-
- # once the response is read, the connection is no longer readable
- assert not c.can_read()
-
- def test_is_ready_for_command(self, selector_patch, r):
- c = r.connection_pool.get_connection('_')
-
- # a fresh connection should be ready for a new command
- assert c.is_ready_for_command()
-
- c.send_command('PING')
- # once the server replies with a response, the selector should report
- # that the connection is no longer ready since there is data that
- # can be read. note that we need to wait for the server to respond
- wait_until = time.time() + 2
- while time.time() < wait_until:
- if not c.is_ready_for_command():
- break
- time.sleep(0.01)
-
- assert not c.is_ready_for_command()
-
- assert c.read_response() == b'PONG'
-
- # once the response is read, the connection should be ready again
- assert c.is_ready_for_command()
-
- def test_killed_connection_no_longer_ready(self, selector_patch, r):
- "A connection that becomes disconnected is no longer ready"
- c = r.connection_pool.get_connection('_')
- # the connection should start as ready
- assert c.is_ready_for_command()
-
- self.kill_connection(c, r)
-
- # the selector should immediately report that the socket is no
- # longer ready
- assert not c.is_ready_for_command()
-
- def test_pool_restores_killed_connection(self, selector_patch, r2):
- """
- The ConnectionPool only returns healthy connecdtions, even if the
- connection was killed while idle in the pool.
- """
- # r2 provides two separate clients/connection pools
- r = r2[0]
- c = r.connection_pool.get_connection('_')
- c._test_client = True
- # the connection should start as ready
- assert c.is_ready_for_command()
-
- # release the connection back to the pool
- r.connection_pool.release(c)
-
- # kill the connection that is now idle in the pool
- # use the second redis client/pool instance run the kill command
- # such that it doesn't manipulate the primary connection pool
- self.kill_connection(c, r2[1])
-
- assert not c.is_ready_for_command()
-
- # retrieving the connection from the pool should provide us with
- # the same connection we were previously using and it should now
- # be ready for a command
- c2 = r.connection_pool.get_connection('_')
- assert c2 == c
- assert c2._test_client is True
-
- assert c.is_ready_for_command()