diff options
author | Andy McCurdy <andy@andymccurdy.com> | 2015-01-02 10:48:49 -0800 |
---|---|---|
committer | Andy McCurdy <andy@andymccurdy.com> | 2015-01-02 10:48:49 -0800 |
commit | 69ce3269dfcb87e972ef29a84772d781d1613c9c (patch) | |
tree | 83902c36e772b5c4b5e2b289ecae2615ad6a2def /redis/client.py | |
parent | fa714e08ef59d37b145616cb60939eedafda0bd4 (diff) | |
download | redis-py-69ce3269dfcb87e972ef29a84772d781d1613c9c.tar.gz |
better handling of the pub/sub workerthread's stop behavior
properly unsubscribes and cleans up the connection now. also allows
callbacks to call thread.stop() without errors.
Diffstat (limited to 'redis/client.py')
-rwxr-xr-x | redis/client.py | 49 |
1 files changed, 29 insertions, 20 deletions
diff --git a/redis/client.py b/redis/client.py index e444571..cf44340 100755 --- a/redis/client.py +++ b/redis/client.py @@ -2288,30 +2288,39 @@ class PubSub(object): for pattern, handler in iteritems(self.patterns): if handler is None: raise PubSubError("Pattern: '%s' has no handler registered") - pubsub = self - - class WorkerThread(threading.Thread): - def __init__(self, *args, **kwargs): - super(WorkerThread, self).__init__(*args, **kwargs) - self._running = False - - def run(self): - if self._running: - return - self._running = True - while self._running and pubsub.subscribed: - pubsub.get_message(ignore_subscribe_messages=True, - timeout=sleep_time) - - def stop(self): - self._running = False - self.join() - - thread = WorkerThread() + + thread = PubSubWorkerThread(self, sleep_time) thread.start() return thread +class PubSubWorkerThread(threading.Thread): + def __init__(self, pubsub, sleep_time): + super(PubSubWorkerThread, self).__init__() + self.pubsub = pubsub + self.sleep_time = sleep_time + self._running = False + + def run(self): + if self._running: + return + self._running = True + pubsub = self.pubsub + sleep_time = self.sleep_time + while pubsub.subscribed: + pubsub.get_message(ignore_subscribe_messages=True, + timeout=sleep_time) + pubsub.close() + self._running = False + + def stop(self): + # stopping simply unsubscribes from all channels and patterns. + # the unsubscribe responses that are generated will short circuit + # the loop in run(), calling pubsub.close() to clean up the connection + self.pubsub.unsubscribe() + self.pubsub.punsubscribe() + + class BasePipeline(object): """ Pipelines provide a way to transmit multiple commands to the Redis server |