diff options
-rw-r--r-- | CHANGES | 5 | ||||
-rw-r--r-- | redis/__init__.py | 2 | ||||
-rw-r--r-- | redis/_compat.py | 4 | ||||
-rwxr-xr-x | redis/connection.py | 155 | ||||
-rw-r--r-- | redis/selector.py | 196 | ||||
-rw-r--r-- | tests/test_connection_pool.py | 4 | ||||
-rw-r--r-- | tests/test_pubsub.py | 13 | ||||
-rw-r--r-- | tests/test_selector.py | 122 |
8 files changed, 128 insertions, 373 deletions
@@ -1,4 +1,4 @@ -* 3.2.2 (in development) +* 3.3.0 (in development) * Resolve a race condition with the PubSubWorkerThread. #1150 * Cleanup socket read error messages. Thanks Vic Yu. #1159 * Cleanup the Connection's selector correctly. Thanks Bruce Merry. #1153 @@ -17,6 +17,9 @@ cause the connection to be disconnected and cleaned up appropriately. #923 * Add READONLY and READWRITE commands. Thanks @theodesp. #1114 + * Remove selectors in favor of nonblocking sockets. Selectors had + issues in some environments including eventlet and gevent. This should + resolve those issues with no other side effects. * 3.2.1 * Fix SentinelConnectionPool to work in multiprocess/forked environments. * 3.2.0 diff --git a/redis/__init__.py b/redis/__init__.py index b74c403..2135fd8 100644 --- a/redis/__init__.py +++ b/redis/__init__.py @@ -29,7 +29,7 @@ def int_or_str(value): return value -__version__ = '3.2.1' +__version__ = '3.3.dev2' VERSION = tuple(map(int_or_str, __version__.split('.'))) __all__ = [ diff --git a/redis/_compat.py b/redis/_compat.py index bde6fb6..d70af2a 100644 --- a/redis/_compat.py +++ b/redis/_compat.py @@ -1,12 +1,12 @@ """Internal module for Python 2 backwards compatibility.""" import errno +import socket import sys # For Python older than 3.5, retry EINTR. if sys.version_info[0] < 3 or (sys.version_info[0] == 3 and sys.version_info[1] < 5): # Adapted from https://bugs.python.org/review/23863/patch/14532/54418 - import socket import time # Wrapper for handling interruptable system calls. @@ -100,6 +100,7 @@ if sys.version_info[0] < 3: basestring = basestring unicode = unicode long = long + BlockingIOError = socket.error else: from urllib.parse import parse_qs, unquote, urlparse from string import ascii_letters @@ -129,6 +130,7 @@ else: unicode = str safe_unicode = str long = int + BlockingIOError = BlockingIOError try: # Python 3 from queue import LifoQueue, Empty, Full diff --git a/redis/connection.py b/redis/connection.py index 88286c8..7d4301a 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -1,5 +1,6 @@ from __future__ import unicode_literals from distutils.version import StrictVersion +from errno import EWOULDBLOCK from itertools import chain import io import os @@ -17,7 +18,7 @@ except ImportError: from redis._compat import (xrange, imap, byte_to_chr, unicode, long, nativestr, basestring, iteritems, LifoQueue, Empty, Full, urlparse, parse_qs, - recv, recv_into, unquote) + recv, recv_into, unquote, BlockingIOError) from redis.exceptions import ( DataError, RedisError, @@ -31,7 +32,6 @@ from redis.exceptions import ( ExecAbortError, ReadOnlyError ) -from redis.selector import DefaultSelector from redis.utils import HIREDIS_AVAILABLE if HIREDIS_AVAILABLE: import hiredis @@ -61,6 +61,8 @@ SYM_EMPTY = b'' SERVER_CLOSED_CONNECTION_ERROR = "Connection closed by server." +SENTINEL = object() + class Encoder(object): "Encode strings to bytes and decode bytes to strings" @@ -126,9 +128,10 @@ class BaseParser(object): class SocketBuffer(object): - def __init__(self, socket, socket_read_size): + def __init__(self, socket, socket_read_size, socket_timeout): self._sock = socket self.socket_read_size = socket_read_size + self.socket_timeout = socket_timeout self._buffer = io.BytesIO() # number of bytes written to the buffer from the socket self.bytes_written = 0 @@ -139,25 +142,51 @@ class SocketBuffer(object): def length(self): return self.bytes_written - self.bytes_read - def _read_from_socket(self, length=None): + def _read_from_socket(self, length=None, timeout=SENTINEL, + raise_on_timeout=True): + sock = self._sock socket_read_size = self.socket_read_size buf = self._buffer buf.seek(self.bytes_written) marker = 0 + custom_timeout = timeout is not SENTINEL - while True: - data = recv(self._sock, socket_read_size) - # an empty string indicates the server shutdown the socket - if isinstance(data, bytes) and len(data) == 0: - raise socket.error(SERVER_CLOSED_CONNECTION_ERROR) - buf.write(data) - data_length = len(data) - self.bytes_written += data_length - marker += data_length - - if length is not None and length > marker: - continue - break + try: + if custom_timeout: + sock.settimeout(timeout) + while True: + data = recv(self._sock, socket_read_size) + # an empty string indicates the server shutdown the socket + if isinstance(data, bytes) and len(data) == 0: + raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) + buf.write(data) + data_length = len(data) + self.bytes_written += data_length + marker += data_length + + if length is not None and length > marker: + continue + return True + except BlockingIOError as ex: + # if we're in nonblocking mode and the recv raises a + # blocking error, simply return False indicating that + # there's no data to be read. otherwise raise the + # original exception. + if raise_on_timeout or ex.errno != EWOULDBLOCK: + raise + return False + except socket.timeout: + if raise_on_timeout: + raise + return False + finally: + if custom_timeout: + sock.settimeout(self.socket_timeout) + + def can_read(self, timeout): + return bool(self.length) or \ + self._read_from_socket(timeout=timeout, + raise_on_timeout=False) def read(self, length): length = length + 2 # make sure to read the \r\n terminator @@ -233,7 +262,9 @@ class PythonParser(BaseParser): def on_connect(self, connection): "Called when the socket connects" self._sock = connection._sock - self._buffer = SocketBuffer(self._sock, self.socket_read_size) + self._buffer = SocketBuffer(self._sock, + self.socket_read_size, + connection.socket_timeout) self.encoder = connection.encoder def on_disconnect(self): @@ -244,8 +275,8 @@ class PythonParser(BaseParser): self._buffer = None self.encoder = None - def can_read(self): - return self._buffer and bool(self._buffer.length) + def can_read(self, timeout): + return self._buffer and self._buffer.can_read(timeout) def read_response(self): response = self._buffer.readline() @@ -312,6 +343,7 @@ class HiredisParser(BaseParser): def on_connect(self, connection): self._sock = connection._sock + self._socket_timeout = connection.socket_timeout kwargs = { 'protocolError': InvalidResponse, 'replyError': self.parse_error, @@ -333,13 +365,52 @@ class HiredisParser(BaseParser): self._reader = None self._next_response = False - def can_read(self): + def can_read(self, timeout): if not self._reader: raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) if self._next_response is False: self._next_response = self._reader.gets() - return self._next_response is not False + if self._next_response is False: + return self.read_from_socket(timeout=timeout, + raise_on_timeout=False) + return True + + def read_from_socket(self, timeout=SENTINEL, raise_on_timeout=True): + sock = self._sock + custom_timeout = timeout is not SENTINEL + try: + if custom_timeout: + sock.settimeout(timeout) + if HIREDIS_USE_BYTE_BUFFER: + bufflen = recv_into(self._sock, self._buffer) + if bufflen == 0: + raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) + self._reader.feed(self._buffer, 0, bufflen) + else: + buffer = recv(self._sock, self.socket_read_size) + # an empty string indicates the server shutdown the socket + if not isinstance(buffer, bytes) or len(buffer) == 0: + raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) + self._reader.feed(buffer) + # data was read from the socket and added to the buffer. + # return True to indicate that data was read. + return True + except BlockingIOError as ex: + # if we're in nonblocking mode and the recv raises a + # blocking error, simply return False indicating that + # there's no data to be read. otherwise raise the + # original exception. + if raise_on_timeout or ex.errno != EWOULDBLOCK: + raise + return False + except socket.timeout: + if not raise_on_timeout: + raise + return False + finally: + if custom_timeout: + sock.settimeout(self._socket_timeout) def read_response(self): if not self._reader: @@ -352,21 +423,8 @@ class HiredisParser(BaseParser): return response response = self._reader.gets() - socket_read_size = self.socket_read_size while response is False: - if HIREDIS_USE_BYTE_BUFFER: - bufflen = recv_into(self._sock, self._buffer) - if bufflen == 0: - raise socket.error(SERVER_CLOSED_CONNECTION_ERROR) - else: - buffer = recv(self._sock, socket_read_size) - # an empty string indicates the server shutdown the socket - if not isinstance(buffer, bytes) or len(buffer) == 0: - raise socket.error(SERVER_CLOSED_CONNECTION_ERROR) - if HIREDIS_USE_BYTE_BUFFER: - self._reader.feed(self._buffer, 0, bufflen) - else: - self._reader.feed(buffer) + self.read_from_socket() response = self._reader.gets() # if an older version of hiredis is installed, we need to attempt # to convert ResponseErrors to their appropriate types. @@ -416,7 +474,6 @@ class Connection(object): self.retry_on_timeout = retry_on_timeout self.encoder = Encoder(encoding, encoding_errors, decode_responses) self._sock = None - self._selector = None self._parser = parser_class(socket_read_size=socket_read_size) self._description_args = { 'host': self.host, @@ -454,7 +511,6 @@ class Connection(object): raise ConnectionError(self._error_message(e)) self._sock = sock - self._selector = DefaultSelector(sock) try: self.on_connect() except RedisError: @@ -538,9 +594,6 @@ class Connection(object): self._parser.on_disconnect() if self._sock is None: return - if self._selector is not None: - self._selector.close() - self._selector = None try: if os.getpid() == self.pid: self._sock.shutdown(socket.SHUT_RDWR) @@ -585,11 +638,7 @@ class Connection(object): if not sock: self.connect() sock = self._sock - return self._parser.can_read() or self._selector.can_read(timeout) - - def is_ready_for_command(self): - "Check if the connection is ready for a command" - return self._selector.is_ready_for_command() + return self._parser.can_read(timeout) def read_response(self): "Read the response from a previously sent command" @@ -963,10 +1012,13 @@ class ConnectionPool(object): # a command. if not, the connection was either returned to the # pool before all data has been read or the socket has been # closed. either way, reconnect and verify everything is good. - if not connection.is_ready_for_command(): + try: + if connection.can_read(): + raise ConnectionError('Connection has data') + except ConnectionError: connection.disconnect() connection.connect() - if not connection.is_ready_for_command(): + if connection.can_read(): raise ConnectionError('Connection not ready') except: # noqa: E722 # release the connection back to the pool so that we don't leak it @@ -1111,10 +1163,13 @@ class BlockingConnectionPool(ConnectionPool): # a command. if not, the connection was either returned to the # pool before all data has been read or the socket has been # closed. either way, reconnect and verify everything is good. - if not connection.is_ready_for_command(): + try: + if connection.can_read(): + raise ConnectionError('Connection has data') + except ConnectionError: connection.disconnect() connection.connect() - if not connection.is_ready_for_command(): + if connection.can_read(): raise ConnectionError('Connection not ready') except: # noqa: E722 # release the connection back to the pool so that we don't leak it diff --git a/redis/selector.py b/redis/selector.py deleted file mode 100644 index bce84a5..0000000 --- a/redis/selector.py +++ /dev/null @@ -1,196 +0,0 @@ -import errno -import select -from redis.exceptions import RedisError - - -_DEFAULT_SELECTOR = None - - -class BaseSelector(object): - """ - Base class for all Selectors - """ - def __init__(self, sock): - self.sock = sock - - def can_read(self, timeout=0): - """ - Return True if data is ready to be read from the socket, - otherwise False. - - This doesn't guarentee that the socket is still connected, just that - there is data to read. - - Automatically retries EINTR errors based on PEP 475. - """ - while True: - try: - return self.check_can_read(timeout) - except (select.error, IOError) as ex: - if self.errno_from_exception(ex) == errno.EINTR: - continue - return False - - def is_ready_for_command(self, timeout=0): - """ - Return True if the socket is ready to send a command, - otherwise False. - - Automatically retries EINTR errors based on PEP 475. - """ - while True: - try: - return self.check_is_ready_for_command(timeout) - except (select.error, IOError) as ex: - if self.errno_from_exception(ex) == errno.EINTR: - continue - return False - - def check_can_read(self, timeout): - """ - Perform the can_read check. Subclasses should implement this. - """ - raise NotImplementedError - - def check_is_ready_for_command(self, timeout): - """ - Perform the is_ready_for_command check. Subclasses should - implement this. - """ - raise NotImplementedError - - def close(self): - """ - Close the selector. - """ - self.sock = None - - def errno_from_exception(self, ex): - """ - Get the error number from an exception - """ - if hasattr(ex, 'errno'): - return ex.errno - elif ex.args: - return ex.args[0] - else: - return None - - -if hasattr(select, 'select'): - class SelectSelector(BaseSelector): - """ - A select-based selector that should work on most platforms. - - This is the worst poll strategy and should only be used if no other - option is available. - """ - def check_can_read(self, timeout): - """ - Return True if data is ready to be read from the socket, - otherwise False. - - This doesn't guarentee that the socket is still connected, just - that there is data to read. - """ - return bool(select.select([self.sock], [], [], timeout)[0]) - - def check_is_ready_for_command(self, timeout): - """ - Return True if the socket is ready to send a command, - otherwise False. - """ - r, w, e = select.select([self.sock], [self.sock], [self.sock], - timeout) - return bool(w and not r and not e) - - -if hasattr(select, 'poll'): - class PollSelector(BaseSelector): - """ - A poll-based selector that should work on (almost?) all versions - of Unix - """ - READ_MASK = select.POLLIN | select.POLLPRI - ERROR_MASK = select.POLLERR | select.POLLHUP - WRITE_MASK = select.POLLOUT - - _READ_POLLER_MASK = READ_MASK | ERROR_MASK - _READY_POLLER_MASK = READ_MASK | ERROR_MASK | WRITE_MASK - - def __init__(self, sock): - super(PollSelector, self).__init__(sock) - self.read_poller = select.poll() - self.read_poller.register(sock, self._READ_POLLER_MASK) - self.ready_poller = select.poll() - self.ready_poller.register(sock, self._READY_POLLER_MASK) - - def close(self): - """ - Close the selector. - """ - for poller in (self.read_poller, self.ready_poller): - try: - poller.unregister(self.sock) - except (KeyError, ValueError): - # KeyError is raised if somehow the socket was not - # registered - # ValueError is raised if the socket's file descriptor is - # negative. - # In either case, we can't do anything better than to - # remove the reference to the poller. - pass - self.read_poller = None - self.ready_poller = None - self.sock = None - - def check_can_read(self, timeout=0): - """ - Return True if data is ready to be read from the socket, - otherwise False. - - This doesn't guarentee that the socket is still connected, just - that there is data to read. - """ - timeout = int(timeout * 1000) - events = self.read_poller.poll(timeout) - return bool(events and events[0][1] & self.READ_MASK) - - def check_is_ready_for_command(self, timeout=0): - """ - Return True if the socket is ready to send a command, - otherwise False - """ - timeout = timeout * 1000 - events = self.ready_poller.poll(timeout) - return bool(events and events[0][1] == self.WRITE_MASK) - - -def has_selector(selector): - "Determine if the current platform has the selector available" - try: - if selector == 'poll': - # the select module offers the poll selector even if the platform - # doesn't support it. Attempt to poll for nothing to make sure - # poll is available - p = select.poll() - p.poll(0) - else: - # the other selectors will fail when instantiated - getattr(select, selector)().close() - return True - except (OSError, AttributeError): - return False - - -def DefaultSelector(sock): - "Return the best selector for the platform" - global _DEFAULT_SELECTOR - if _DEFAULT_SELECTOR is None: - if has_selector('poll'): - _DEFAULT_SELECTOR = PollSelector - elif hasattr(select, 'select'): - _DEFAULT_SELECTOR = SelectSelector - else: - raise RedisError('Platform does not support any selectors') - return _DEFAULT_SELECTOR(sock) diff --git a/tests/test_connection_pool.py b/tests/test_connection_pool.py index f258411..2aea1e4 100644 --- a/tests/test_connection_pool.py +++ b/tests/test_connection_pool.py @@ -19,8 +19,8 @@ class DummyConnection(object): def connect(self): pass - def is_ready_for_command(self): - return True + def can_read(self): + return False class TestConnectionPool(object): diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index fc91abf..7f94b4a 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -490,3 +490,16 @@ class TestPubSubPings(object): assert wait_for_message(p) == make_message(type='pong', channel=None, data='hello world', pattern=None) + + +class TestPubSubConnectionKilled(object): + + @skip_if_server_version_lt('3.0.0') + def test_connection_error_raised_when_connection_dies(self, r): + p = r.pubsub(ignore_subscribe_messages=True) + p.subscribe('foo') + for client in r.client_list(): + if client['cmd'] == 'subscribe': + r.client_kill_filter(_id=client['id']) + with pytest.raises(ConnectionError): + wait_for_message(p) diff --git a/tests/test_selector.py b/tests/test_selector.py deleted file mode 100644 index 07bd6dc..0000000 --- a/tests/test_selector.py +++ /dev/null @@ -1,122 +0,0 @@ -import pytest -import time -from redis import selector - -_SELECTORS = ( - 'SelectSelector', - 'PollSelector', -) - - -@pytest.mark.parametrize('selector_name', _SELECTORS) -class TestSelector(object): - - @pytest.fixture() - def selector_patch(self, selector_name, request): - "A fixture to patch the DefaultSelector with each selector" - if not hasattr(selector, selector_name): - pytest.skip('selector %s unavailable' % selector_name) - default_selector = selector._DEFAULT_SELECTOR - - def revert_selector(): - selector._DEFAULT_SELECTOR = default_selector - request.addfinalizer(revert_selector) - - selector._DEFAULT_SELECTOR = getattr(selector, selector_name) - - def kill_connection(self, connection, r): - "Helper that tells the redis server to kill `connection`" - # set a name for the connection so that we can identify and kill it - connection.send_command('client', 'setname', 'redis-py-1') - assert connection.read_response() == b'OK' - - # find the client based on its name and kill it - for client in r.client_list(): - if client['name'] == 'redis-py-1': - assert r.client_kill(client['addr']) - break - else: - assert False, 'Client redis-py-1 not found in client list' - - def test_can_read(self, selector_patch, r): - c = r.connection_pool.get_connection('_') - - # a fresh connection should not be readable - assert not c.can_read() - - c.send_command('PING') - # a connection should be readable when a response is available - # note that we supply a timeout here to make sure the server has - # a chance to respond - assert c.can_read(1.0) - - assert c.read_response() == b'PONG' - - # once the response is read, the connection is no longer readable - assert not c.can_read() - - def test_is_ready_for_command(self, selector_patch, r): - c = r.connection_pool.get_connection('_') - - # a fresh connection should be ready for a new command - assert c.is_ready_for_command() - - c.send_command('PING') - # once the server replies with a response, the selector should report - # that the connection is no longer ready since there is data that - # can be read. note that we need to wait for the server to respond - wait_until = time.time() + 2 - while time.time() < wait_until: - if not c.is_ready_for_command(): - break - time.sleep(0.01) - - assert not c.is_ready_for_command() - - assert c.read_response() == b'PONG' - - # once the response is read, the connection should be ready again - assert c.is_ready_for_command() - - def test_killed_connection_no_longer_ready(self, selector_patch, r): - "A connection that becomes disconnected is no longer ready" - c = r.connection_pool.get_connection('_') - # the connection should start as ready - assert c.is_ready_for_command() - - self.kill_connection(c, r) - - # the selector should immediately report that the socket is no - # longer ready - assert not c.is_ready_for_command() - - def test_pool_restores_killed_connection(self, selector_patch, r2): - """ - The ConnectionPool only returns healthy connecdtions, even if the - connection was killed while idle in the pool. - """ - # r2 provides two separate clients/connection pools - r = r2[0] - c = r.connection_pool.get_connection('_') - c._test_client = True - # the connection should start as ready - assert c.is_ready_for_command() - - # release the connection back to the pool - r.connection_pool.release(c) - - # kill the connection that is now idle in the pool - # use the second redis client/pool instance run the kill command - # such that it doesn't manipulate the primary connection pool - self.kill_connection(c, r2[1]) - - assert not c.is_ready_for_command() - - # retrieving the connection from the pool should provide us with - # the same connection we were previously using and it should now - # be ready for a command - c2 = r.connection_pool.get_connection('_') - assert c2 == c - assert c2._test_client is True - - assert c.is_ready_for_command() |