diff options
Diffstat (limited to 'redis/client.py')
-rwxr-xr-x | redis/client.py | 49 |
1 files changed, 29 insertions, 20 deletions
diff --git a/redis/client.py b/redis/client.py index e444571..cf44340 100755 --- a/redis/client.py +++ b/redis/client.py @@ -2288,30 +2288,39 @@ class PubSub(object): for pattern, handler in iteritems(self.patterns): if handler is None: raise PubSubError("Pattern: '%s' has no handler registered") - pubsub = self - - class WorkerThread(threading.Thread): - def __init__(self, *args, **kwargs): - super(WorkerThread, self).__init__(*args, **kwargs) - self._running = False - - def run(self): - if self._running: - return - self._running = True - while self._running and pubsub.subscribed: - pubsub.get_message(ignore_subscribe_messages=True, - timeout=sleep_time) - - def stop(self): - self._running = False - self.join() - - thread = WorkerThread() + + thread = PubSubWorkerThread(self, sleep_time) thread.start() return thread +class PubSubWorkerThread(threading.Thread): + def __init__(self, pubsub, sleep_time): + super(PubSubWorkerThread, self).__init__() + self.pubsub = pubsub + self.sleep_time = sleep_time + self._running = False + + def run(self): + if self._running: + return + self._running = True + pubsub = self.pubsub + sleep_time = self.sleep_time + while pubsub.subscribed: + pubsub.get_message(ignore_subscribe_messages=True, + timeout=sleep_time) + pubsub.close() + self._running = False + + def stop(self): + # stopping simply unsubscribes from all channels and patterns. + # the unsubscribe responses that are generated will short circuit + # the loop in run(), calling pubsub.close() to clean up the connection + self.pubsub.unsubscribe() + self.pubsub.punsubscribe() + + class BasePipeline(object): """ Pipelines provide a way to transmit multiple commands to the Redis server |