summaryrefslogtreecommitdiff
path: root/redis/client.py
diff options
context:
space:
mode:
authorAndy McCurdy <andy@andymccurdy.com>2015-01-02 10:48:49 -0800
committerAndy McCurdy <andy@andymccurdy.com>2015-01-02 10:48:49 -0800
commit69ce3269dfcb87e972ef29a84772d781d1613c9c (patch)
tree83902c36e772b5c4b5e2b289ecae2615ad6a2def /redis/client.py
parentfa714e08ef59d37b145616cb60939eedafda0bd4 (diff)
downloadredis-py-69ce3269dfcb87e972ef29a84772d781d1613c9c.tar.gz
better handling of the pub/sub workerthread's stop behavior
properly unsubscribes and cleans up the connection now. also allows callbacks to call thread.stop() without errors.
Diffstat (limited to 'redis/client.py')
-rwxr-xr-xredis/client.py49
1 files changed, 29 insertions, 20 deletions
diff --git a/redis/client.py b/redis/client.py
index e444571..cf44340 100755
--- a/redis/client.py
+++ b/redis/client.py
@@ -2288,30 +2288,39 @@ class PubSub(object):
for pattern, handler in iteritems(self.patterns):
if handler is None:
raise PubSubError("Pattern: '%s' has no handler registered")
- pubsub = self
-
- class WorkerThread(threading.Thread):
- def __init__(self, *args, **kwargs):
- super(WorkerThread, self).__init__(*args, **kwargs)
- self._running = False
-
- def run(self):
- if self._running:
- return
- self._running = True
- while self._running and pubsub.subscribed:
- pubsub.get_message(ignore_subscribe_messages=True,
- timeout=sleep_time)
-
- def stop(self):
- self._running = False
- self.join()
-
- thread = WorkerThread()
+
+ thread = PubSubWorkerThread(self, sleep_time)
thread.start()
return thread
+class PubSubWorkerThread(threading.Thread):
+ def __init__(self, pubsub, sleep_time):
+ super(PubSubWorkerThread, self).__init__()
+ self.pubsub = pubsub
+ self.sleep_time = sleep_time
+ self._running = False
+
+ def run(self):
+ if self._running:
+ return
+ self._running = True
+ pubsub = self.pubsub
+ sleep_time = self.sleep_time
+ while pubsub.subscribed:
+ pubsub.get_message(ignore_subscribe_messages=True,
+ timeout=sleep_time)
+ pubsub.close()
+ self._running = False
+
+ def stop(self):
+ # stopping simply unsubscribes from all channels and patterns.
+ # the unsubscribe responses that are generated will short circuit
+ # the loop in run(), calling pubsub.close() to clean up the connection
+ self.pubsub.unsubscribe()
+ self.pubsub.punsubscribe()
+
+
class BasePipeline(object):
"""
Pipelines provide a way to transmit multiple commands to the Redis server