diff options
-rw-r--r-- | CHANGES | 6 | ||||
-rw-r--r-- | README.rst | 2 | ||||
-rw-r--r-- | redis/__init__.py | 2 | ||||
-rwxr-xr-x | redis/client.py | 21 | ||||
-rwxr-xr-x | redis/connection.py | 71 | ||||
-rw-r--r-- | redis/selector.py | 2 | ||||
-rw-r--r-- | redis/sentinel.py | 12 |
7 files changed, 52 insertions, 64 deletions
@@ -1,3 +1,9 @@ +* 3.2.2 (in development) + * Resolve a race condition with the PubSubWorkerThread. #1150 + * Cleanup socket read error messages. Thanks Vic Yu. #1159 + * Cleanup the Connection's selector correctly. Thanks Bruce Merry. #1153 +* 3.2.1 + * Fix SentinelConnectionPool to work in multiprocess/forked environments. * 3.2.0 * Added support for `select.poll` to test whether data can be read on a socket. This should allow for significantly more connections to @@ -582,7 +582,7 @@ application. >>> p = r.pubsub(ignore_subscribe_messages=True) >>> p.subscribe('my-channel') >>> p.get_message() # hides the subscribe message and returns None - >>> r.publish('my-channel') + >>> r.publish('my-channel', 'my data') 1 >>> p.get_message() {'channel': 'my-channel', 'data': 'my data', 'pattern': None, 'type': 'message'} diff --git a/redis/__init__.py b/redis/__init__.py index 7c5355e..b74c403 100644 --- a/redis/__init__.py +++ b/redis/__init__.py @@ -29,7 +29,7 @@ def int_or_str(value): return value -__version__ = '3.2.0' +__version__ = '3.2.1' VERSION = tuple(map(int_or_str, __version__.split('.'))) __all__ = [ diff --git a/redis/client.py b/redis/client.py index f246071..07df19a 100755 --- a/redis/client.py +++ b/redis/client.py @@ -3122,13 +3122,11 @@ class PubSub(object): """ if args: args = list_or_args(args[0], args[1:]) - retval = self.execute_command('PUNSUBSCRIBE', *args) - if args: patterns = self._normalize_keys(dict.fromkeys(args)) else: patterns = self.patterns self.pending_unsubscribe_patterns.update(patterns) - return retval + return self.execute_command('PUNSUBSCRIBE', *args) def subscribe(self, *args, **kwargs): """ @@ -3158,13 +3156,11 @@ class PubSub(object): """ if args: args = list_or_args(args[0], args[1:]) - retval = self.execute_command('UNSUBSCRIBE', *args) - if args: channels = self._normalize_keys(dict.fromkeys(args)) else: channels = self.channels self.pending_unsubscribe_channels.update(channels) - return retval + return self.execute_command('UNSUBSCRIBE', *args) def listen(self): "Listen for messages on channels this client has been subscribed to" @@ -3221,6 +3217,7 @@ class PubSub(object): 'channel': response[1], 'data': response[2] } + # if this is an unsubscribe message, remove it from memory if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES: if message_type == 'punsubscribe': @@ -3280,18 +3277,16 @@ class PubSubWorkerThread(threading.Thread): self._running = True pubsub = self.pubsub sleep_time = self.sleep_time - while pubsub.subscribed: + while self._running: pubsub.get_message(ignore_subscribe_messages=True, timeout=sleep_time) pubsub.close() - self._running = False def stop(self): - # stopping simply unsubscribes from all channels and patterns. - # the unsubscribe responses that are generated will short circuit - # the loop in run(), calling pubsub.close() to clean up the connection - self.pubsub.unsubscribe() - self.pubsub.punsubscribe() + # trip the flag so the run loop exits. the run loop will + # close the pubsub connection, which disconnects the socket + # and returns the connection to the pool. + self._running = False class Pipeline(Redis): diff --git a/redis/connection.py b/redis/connection.py index beeba30..eff88b0 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -177,26 +177,19 @@ class SocketBuffer(object): buf.seek(self.bytes_written) marker = 0 - try: - while True: - data = recv(self._sock, socket_read_size) - # an empty string indicates the server shutdown the socket - if isinstance(data, bytes) and len(data) == 0: - raise socket.error(SERVER_CLOSED_CONNECTION_ERROR) - buf.write(data) - data_length = len(data) - self.bytes_written += data_length - marker += data_length - - if length is not None and length > marker: - continue - break - except socket.timeout: - raise TimeoutError("Timeout reading from socket") - except socket.error: - e = sys.exc_info()[1] - raise ConnectionError("Error while reading from socket: %s" % - (e.args,)) + while True: + data = recv(self._sock, socket_read_size) + # an empty string indicates the server shutdown the socket + if isinstance(data, bytes) and len(data) == 0: + raise socket.error(SERVER_CLOSED_CONNECTION_ERROR) + buf.write(data) + data_length = len(data) + self.bytes_written += data_length + marker += data_length + + if length is not None and length > marker: + continue + break def read(self, length): length = length + 2 # make sure to read the \r\n terminator @@ -391,22 +384,15 @@ class HiredisParser(BaseParser): response = self._reader.gets() socket_read_size = self.socket_read_size while response is False: - try: - if HIREDIS_USE_BYTE_BUFFER: - bufflen = recv_into(self._sock, self._buffer) - if bufflen == 0: - raise socket.error(SERVER_CLOSED_CONNECTION_ERROR) - else: - buffer = recv(self._sock, socket_read_size) - # an empty string indicates the server shutdown the socket - if not isinstance(buffer, bytes) or len(buffer) == 0: - raise socket.error(SERVER_CLOSED_CONNECTION_ERROR) - except socket.timeout: - raise TimeoutError("Timeout reading from socket") - except socket.error: - e = sys.exc_info()[1] - raise ConnectionError("Error while reading from socket: %s" % - (e.args,)) + if HIREDIS_USE_BYTE_BUFFER: + bufflen = recv_into(self._sock, self._buffer) + if bufflen == 0: + raise socket.error(SERVER_CLOSED_CONNECTION_ERROR) + else: + buffer = recv(self._sock, socket_read_size) + # an empty string indicates the server shutdown the socket + if not isinstance(buffer, bytes) or len(buffer) == 0: + raise socket.error(SERVER_CLOSED_CONNECTION_ERROR) if HIREDIS_USE_BYTE_BUFFER: self._reader.feed(self._buffer, 0, bufflen) else: @@ -460,6 +446,7 @@ class Connection(object): self.retry_on_timeout = retry_on_timeout self.encoder = Encoder(encoding, encoding_errors, decode_responses) self._sock = None + self._selector = None self._parser = parser_class(socket_read_size=socket_read_size) self._description_args = { 'host': self.host, @@ -581,6 +568,9 @@ class Connection(object): self._parser.on_disconnect() if self._sock is None: return + if self._selector is not None: + self._selector.close() + self._selector = None try: if os.getpid() == self.pid: self._sock.shutdown(socket.SHUT_RDWR) @@ -635,6 +625,15 @@ class Connection(object): "Read the response from a previously sent command" try: response = self._parser.read_response() + except socket.timeout: + self.disconnect() + raise TimeoutError("Timeout reading from %s:%s" % + (self.host, self.port)) + except socket.error: + self.disconnect() + e = sys.exc_info()[1] + raise ConnectionError("Error while reading from %s:%s : %s" % + (self.host, self.port, e.args)) except: # noqa: E722 self.disconnect() raise diff --git a/redis/selector.py b/redis/selector.py index 8c0ab8a..bce84a5 100644 --- a/redis/selector.py +++ b/redis/selector.py @@ -131,7 +131,7 @@ if hasattr(select, 'poll'): """ for poller in (self.read_poller, self.ready_poller): try: - self.read_poller.unregister(self.sock) + poller.unregister(self.sock) except (KeyError, ValueError): # KeyError is raised if somehow the socket was not # registered diff --git a/redis/sentinel.py b/redis/sentinel.py index 9df2997..11263d2 100644 --- a/redis/sentinel.py +++ b/redis/sentinel.py @@ -1,4 +1,3 @@ -import os import random import weakref @@ -125,17 +124,6 @@ class SentinelConnectionPool(ConnectionPool): pass raise SlaveNotFoundError('No slave found for %r' % (self.service_name)) - def _checkpid(self): - if self.pid != os.getpid(): - self.disconnect() - self.reset() - self.__init__(self.service_name, self.sentinel_manager, - is_master=self.is_master, - check_connection=self.check_connection, - connection_class=self.connection_class, - max_connections=self.max_connections, - **self.connection_kwargs) - class Sentinel(object): """ |