summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy McCurdy <andy@andymccurdy.com>2019-04-18 23:59:04 -0700
committerAndy McCurdy <andy@andymccurdy.com>2019-04-18 23:59:04 -0700
commit29879bb670eacf95893f77ade61b1a2971e9064d (patch)
treefc2bf37fddc523a1f20870a389c27afdf863d46f
parent99c589946f27a30f733768684e428744bdb018a8 (diff)
downloadredis-py-29879bb670eacf95893f77ade61b1a2971e9064d.tar.gz
Fix PubSubWorkerThread race condition
Resolves a race condition found in the PubSubWorkerThread. Prior to this change is was possible to receive the server's resonse to an unsubscribe message prior to the channel/pattern being added to the pending_unsubscribe set. This also improves PubSubWorkerThread.stop so that the run function is stopped immediately after the next iteration. fixes #1150
-rwxr-xr-xredis/client.py21
1 files changed, 8 insertions, 13 deletions
diff --git a/redis/client.py b/redis/client.py
index 24120d2..de9cf88 100755
--- a/redis/client.py
+++ b/redis/client.py
@@ -3073,13 +3073,11 @@ class PubSub(object):
"""
if args:
args = list_or_args(args[0], args[1:])
- retval = self.execute_command('PUNSUBSCRIBE', *args)
- if args:
patterns = self._normalize_keys(dict.fromkeys(args))
else:
patterns = self.patterns
self.pending_unsubscribe_patterns.update(patterns)
- return retval
+ return self.execute_command('PUNSUBSCRIBE', *args)
def subscribe(self, *args, **kwargs):
"""
@@ -3109,13 +3107,11 @@ class PubSub(object):
"""
if args:
args = list_or_args(args[0], args[1:])
- retval = self.execute_command('UNSUBSCRIBE', *args)
- if args:
channels = self._normalize_keys(dict.fromkeys(args))
else:
channels = self.channels
self.pending_unsubscribe_channels.update(channels)
- return retval
+ return self.execute_command('UNSUBSCRIBE', *args)
def listen(self):
"Listen for messages on channels this client has been subscribed to"
@@ -3172,6 +3168,7 @@ class PubSub(object):
'channel': response[1],
'data': response[2]
}
+
# if this is an unsubscribe message, remove it from memory
if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES:
if message_type == 'punsubscribe':
@@ -3231,18 +3228,16 @@ class PubSubWorkerThread(threading.Thread):
self._running = True
pubsub = self.pubsub
sleep_time = self.sleep_time
- while pubsub.subscribed:
+ while self._running:
pubsub.get_message(ignore_subscribe_messages=True,
timeout=sleep_time)
pubsub.close()
- self._running = False
def stop(self):
- # stopping simply unsubscribes from all channels and patterns.
- # the unsubscribe responses that are generated will short circuit
- # the loop in run(), calling pubsub.close() to clean up the connection
- self.pubsub.unsubscribe()
- self.pubsub.punsubscribe()
+ # trip the flag so the run loop exits. the run loop will
+ # close the pubsub connection, which disconnects the socket
+ # and returns the connection to the pool.
+ self._running = False
class Pipeline(Redis):