diff options
author | Andy McCurdy <andy@andymccurdy.com> | 2019-04-18 23:59:04 -0700 |
---|---|---|
committer | Andy McCurdy <andy@andymccurdy.com> | 2019-04-18 23:59:04 -0700 |
commit | 29879bb670eacf95893f77ade61b1a2971e9064d (patch) | |
tree | fc2bf37fddc523a1f20870a389c27afdf863d46f | |
parent | 99c589946f27a30f733768684e428744bdb018a8 (diff) | |
download | redis-py-29879bb670eacf95893f77ade61b1a2971e9064d.tar.gz |
Fix PubSubWorkerThread race condition
Resolves a race condition found in the PubSubWorkerThread. Prior to this
change is was possible to receive the server's resonse to an unsubscribe
message prior to the channel/pattern being added to the
pending_unsubscribe set.
This also improves PubSubWorkerThread.stop so that the run function is
stopped immediately after the next iteration.
fixes #1150
-rwxr-xr-x | redis/client.py | 21 |
1 files changed, 8 insertions, 13 deletions
diff --git a/redis/client.py b/redis/client.py index 24120d2..de9cf88 100755 --- a/redis/client.py +++ b/redis/client.py @@ -3073,13 +3073,11 @@ class PubSub(object): """ if args: args = list_or_args(args[0], args[1:]) - retval = self.execute_command('PUNSUBSCRIBE', *args) - if args: patterns = self._normalize_keys(dict.fromkeys(args)) else: patterns = self.patterns self.pending_unsubscribe_patterns.update(patterns) - return retval + return self.execute_command('PUNSUBSCRIBE', *args) def subscribe(self, *args, **kwargs): """ @@ -3109,13 +3107,11 @@ class PubSub(object): """ if args: args = list_or_args(args[0], args[1:]) - retval = self.execute_command('UNSUBSCRIBE', *args) - if args: channels = self._normalize_keys(dict.fromkeys(args)) else: channels = self.channels self.pending_unsubscribe_channels.update(channels) - return retval + return self.execute_command('UNSUBSCRIBE', *args) def listen(self): "Listen for messages on channels this client has been subscribed to" @@ -3172,6 +3168,7 @@ class PubSub(object): 'channel': response[1], 'data': response[2] } + # if this is an unsubscribe message, remove it from memory if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES: if message_type == 'punsubscribe': @@ -3231,18 +3228,16 @@ class PubSubWorkerThread(threading.Thread): self._running = True pubsub = self.pubsub sleep_time = self.sleep_time - while pubsub.subscribed: + while self._running: pubsub.get_message(ignore_subscribe_messages=True, timeout=sleep_time) pubsub.close() - self._running = False def stop(self): - # stopping simply unsubscribes from all channels and patterns. - # the unsubscribe responses that are generated will short circuit - # the loop in run(), calling pubsub.close() to clean up the connection - self.pubsub.unsubscribe() - self.pubsub.punsubscribe() + # trip the flag so the run loop exits. the run loop will + # close the pubsub connection, which disconnects the socket + # and returns the connection to the pool. + self._running = False class Pipeline(Redis): |