summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGES4
-rw-r--r--README.rst2
-rwxr-xr-xredis/client.py21
-rwxr-xr-xredis/connection.py71
-rw-r--r--redis/selector.py2
5 files changed, 49 insertions, 51 deletions
diff --git a/CHANGES b/CHANGES
index 8306675..a3d98f5 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,3 +1,7 @@
+* 3.2.2 (in development)
+ * Resolve a race condition with the PubSubWorkerThread. #1150
+ * Cleanup socket read error messages. Thanks Vic Yu. #1159
+ * Cleanup the Connection's selector correctly. Thanks Bruce Merry. #1153
* 3.2.1
* Fix SentinelConnectionPool to work in multiprocess/forked environments.
* 3.2.0
diff --git a/README.rst b/README.rst
index 383b731..8726d39 100644
--- a/README.rst
+++ b/README.rst
@@ -582,7 +582,7 @@ application.
>>> p = r.pubsub(ignore_subscribe_messages=True)
>>> p.subscribe('my-channel')
>>> p.get_message() # hides the subscribe message and returns None
- >>> r.publish('my-channel')
+ >>> r.publish('my-channel', 'my data')
1
>>> p.get_message()
{'channel': 'my-channel', 'data': 'my data', 'pattern': None, 'type': 'message'}
diff --git a/redis/client.py b/redis/client.py
index 7a3a7c8..1d73d32 100755
--- a/redis/client.py
+++ b/redis/client.py
@@ -3075,13 +3075,11 @@ class PubSub(object):
"""
if args:
args = list_or_args(args[0], args[1:])
- retval = self.execute_command('PUNSUBSCRIBE', *args)
- if args:
patterns = self._normalize_keys(dict.fromkeys(args))
else:
patterns = self.patterns
self.pending_unsubscribe_patterns.update(patterns)
- return retval
+ return self.execute_command('PUNSUBSCRIBE', *args)
def subscribe(self, *args, **kwargs):
"""
@@ -3111,13 +3109,11 @@ class PubSub(object):
"""
if args:
args = list_or_args(args[0], args[1:])
- retval = self.execute_command('UNSUBSCRIBE', *args)
- if args:
channels = self._normalize_keys(dict.fromkeys(args))
else:
channels = self.channels
self.pending_unsubscribe_channels.update(channels)
- return retval
+ return self.execute_command('UNSUBSCRIBE', *args)
def listen(self):
"Listen for messages on channels this client has been subscribed to"
@@ -3174,6 +3170,7 @@ class PubSub(object):
'channel': response[1],
'data': response[2]
}
+
# if this is an unsubscribe message, remove it from memory
if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES:
if message_type == 'punsubscribe':
@@ -3233,18 +3230,16 @@ class PubSubWorkerThread(threading.Thread):
self._running = True
pubsub = self.pubsub
sleep_time = self.sleep_time
- while pubsub.subscribed:
+ while self._running:
pubsub.get_message(ignore_subscribe_messages=True,
timeout=sleep_time)
pubsub.close()
- self._running = False
def stop(self):
- # stopping simply unsubscribes from all channels and patterns.
- # the unsubscribe responses that are generated will short circuit
- # the loop in run(), calling pubsub.close() to clean up the connection
- self.pubsub.unsubscribe()
- self.pubsub.punsubscribe()
+ # trip the flag so the run loop exits. the run loop will
+ # close the pubsub connection, which disconnects the socket
+ # and returns the connection to the pool.
+ self._running = False
class Pipeline(Redis):
diff --git a/redis/connection.py b/redis/connection.py
index beeba30..eff88b0 100755
--- a/redis/connection.py
+++ b/redis/connection.py
@@ -177,26 +177,19 @@ class SocketBuffer(object):
buf.seek(self.bytes_written)
marker = 0
- try:
- while True:
- data = recv(self._sock, socket_read_size)
- # an empty string indicates the server shutdown the socket
- if isinstance(data, bytes) and len(data) == 0:
- raise socket.error(SERVER_CLOSED_CONNECTION_ERROR)
- buf.write(data)
- data_length = len(data)
- self.bytes_written += data_length
- marker += data_length
-
- if length is not None and length > marker:
- continue
- break
- except socket.timeout:
- raise TimeoutError("Timeout reading from socket")
- except socket.error:
- e = sys.exc_info()[1]
- raise ConnectionError("Error while reading from socket: %s" %
- (e.args,))
+ while True:
+ data = recv(self._sock, socket_read_size)
+ # an empty string indicates the server shutdown the socket
+ if isinstance(data, bytes) and len(data) == 0:
+ raise socket.error(SERVER_CLOSED_CONNECTION_ERROR)
+ buf.write(data)
+ data_length = len(data)
+ self.bytes_written += data_length
+ marker += data_length
+
+ if length is not None and length > marker:
+ continue
+ break
def read(self, length):
length = length + 2 # make sure to read the \r\n terminator
@@ -391,22 +384,15 @@ class HiredisParser(BaseParser):
response = self._reader.gets()
socket_read_size = self.socket_read_size
while response is False:
- try:
- if HIREDIS_USE_BYTE_BUFFER:
- bufflen = recv_into(self._sock, self._buffer)
- if bufflen == 0:
- raise socket.error(SERVER_CLOSED_CONNECTION_ERROR)
- else:
- buffer = recv(self._sock, socket_read_size)
- # an empty string indicates the server shutdown the socket
- if not isinstance(buffer, bytes) or len(buffer) == 0:
- raise socket.error(SERVER_CLOSED_CONNECTION_ERROR)
- except socket.timeout:
- raise TimeoutError("Timeout reading from socket")
- except socket.error:
- e = sys.exc_info()[1]
- raise ConnectionError("Error while reading from socket: %s" %
- (e.args,))
+ if HIREDIS_USE_BYTE_BUFFER:
+ bufflen = recv_into(self._sock, self._buffer)
+ if bufflen == 0:
+ raise socket.error(SERVER_CLOSED_CONNECTION_ERROR)
+ else:
+ buffer = recv(self._sock, socket_read_size)
+ # an empty string indicates the server shutdown the socket
+ if not isinstance(buffer, bytes) or len(buffer) == 0:
+ raise socket.error(SERVER_CLOSED_CONNECTION_ERROR)
if HIREDIS_USE_BYTE_BUFFER:
self._reader.feed(self._buffer, 0, bufflen)
else:
@@ -460,6 +446,7 @@ class Connection(object):
self.retry_on_timeout = retry_on_timeout
self.encoder = Encoder(encoding, encoding_errors, decode_responses)
self._sock = None
+ self._selector = None
self._parser = parser_class(socket_read_size=socket_read_size)
self._description_args = {
'host': self.host,
@@ -581,6 +568,9 @@ class Connection(object):
self._parser.on_disconnect()
if self._sock is None:
return
+ if self._selector is not None:
+ self._selector.close()
+ self._selector = None
try:
if os.getpid() == self.pid:
self._sock.shutdown(socket.SHUT_RDWR)
@@ -635,6 +625,15 @@ class Connection(object):
"Read the response from a previously sent command"
try:
response = self._parser.read_response()
+ except socket.timeout:
+ self.disconnect()
+ raise TimeoutError("Timeout reading from %s:%s" %
+ (self.host, self.port))
+ except socket.error:
+ self.disconnect()
+ e = sys.exc_info()[1]
+ raise ConnectionError("Error while reading from %s:%s : %s" %
+ (self.host, self.port, e.args))
except: # noqa: E722
self.disconnect()
raise
diff --git a/redis/selector.py b/redis/selector.py
index 8c0ab8a..bce84a5 100644
--- a/redis/selector.py
+++ b/redis/selector.py
@@ -131,7 +131,7 @@ if hasattr(select, 'poll'):
"""
for poller in (self.read_poller, self.ready_poller):
try:
- self.read_poller.unregister(self.sock)
+ poller.unregister(self.sock)
except (KeyError, ValueError):
# KeyError is raised if somehow the socket was not
# registered