diff options
Diffstat (limited to 'redis/client.py')
-rwxr-xr-x | redis/client.py | 21 |
1 files changed, 8 insertions, 13 deletions
diff --git a/redis/client.py b/redis/client.py index 24120d2..de9cf88 100755 --- a/redis/client.py +++ b/redis/client.py @@ -3073,13 +3073,11 @@ class PubSub(object): """ if args: args = list_or_args(args[0], args[1:]) - retval = self.execute_command('PUNSUBSCRIBE', *args) - if args: patterns = self._normalize_keys(dict.fromkeys(args)) else: patterns = self.patterns self.pending_unsubscribe_patterns.update(patterns) - return retval + return self.execute_command('PUNSUBSCRIBE', *args) def subscribe(self, *args, **kwargs): """ @@ -3109,13 +3107,11 @@ class PubSub(object): """ if args: args = list_or_args(args[0], args[1:]) - retval = self.execute_command('UNSUBSCRIBE', *args) - if args: channels = self._normalize_keys(dict.fromkeys(args)) else: channels = self.channels self.pending_unsubscribe_channels.update(channels) - return retval + return self.execute_command('UNSUBSCRIBE', *args) def listen(self): "Listen for messages on channels this client has been subscribed to" @@ -3172,6 +3168,7 @@ class PubSub(object): 'channel': response[1], 'data': response[2] } + # if this is an unsubscribe message, remove it from memory if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES: if message_type == 'punsubscribe': @@ -3231,18 +3228,16 @@ class PubSubWorkerThread(threading.Thread): self._running = True pubsub = self.pubsub sleep_time = self.sleep_time - while pubsub.subscribed: + while self._running: pubsub.get_message(ignore_subscribe_messages=True, timeout=sleep_time) pubsub.close() - self._running = False def stop(self): - # stopping simply unsubscribes from all channels and patterns. - # the unsubscribe responses that are generated will short circuit - # the loop in run(), calling pubsub.close() to clean up the connection - self.pubsub.unsubscribe() - self.pubsub.punsubscribe() + # trip the flag so the run loop exits. the run loop will + # close the pubsub connection, which disconnects the socket + # and returns the connection to the pool. + self._running = False class Pipeline(Redis): |