diff options
author | Andy McCurdy <andy@andymccurdy.com> | 2010-03-30 02:43:35 -0700 |
---|---|---|
committer | Andy McCurdy <andy@andymccurdy.com> | 2010-03-30 02:43:35 -0700 |
commit | b58a08b955ea87f72b76a0cc9d8155110f24cb8e (patch) | |
tree | c0cf55054497253efc2d3c272720a692598e2546 | |
parent | c148ede374784c9220ebc992676667254c443e75 (diff) | |
download | redis-py-b58a08b955ea87f72b76a0cc9d8155110f24cb8e.tar.gz |
added support for subscribe, unsubscribe, publish. tests incoming. pattern is:
redis.subscribe('my_channel')
for msg in redis.listen():
channel, data = msg['channel'], msg['message']
...
listen() is a generator that will generate new messages as they come in. it will continue to block until all channels are unsubscribed.
-rw-r--r-- | redis/client.py | 48 |
1 files changed, 46 insertions, 2 deletions
diff --git a/redis/client.py b/redis/client.py index b75dd7f..5d82348 100644 --- a/redis/client.py +++ b/redis/client.py @@ -170,7 +170,6 @@ def zset_score_pairs(response, **options): return response return zip(response[::2], map(float, response[1::2])) - class Redis(threading.local): """ Implementation of the Redis protocol. @@ -218,6 +217,9 @@ class Redis(threading.local): 'TTL': lambda r: r != -1 and r or None, } ) + + # commands that should NOT pull data off the network buffer when executed + SUBSCRIPTION_COMMANDS = set(['SUBSCRIBE', 'UNSUBSCRIBE']) def __init__(self, host='localhost', port=6379, db=0, password=None, socket_timeout=None, @@ -225,6 +227,7 @@ class Redis(threading.local): self.encoding = charset self.errors = errors self.connection = None + self.subscribed = False self.select(db, host, port, password, socket_timeout) #### Legacty accessors of connection information #### @@ -245,7 +248,13 @@ class Redis(threading.local): #### COMMAND EXECUTION AND PROTOCOL PARSING #### def _execute_command(self, command_name, command, **options): + subscription_command = command_name in self.SUBSCRIPTION_COMMANDS + if self.subscribed and not subscription_command: + raise RedisError("Cannot issue commands other than SUBSCRIBE and " + "UNSUBSCRIBE while channels are open") self.connection.send(command, self) + if subscription_command: + return None return self.parse_response(command_name, **options) def execute_command(self, command_name, command, **options): @@ -976,7 +985,42 @@ class Redis(threading.local): def hvals(self, name): "Return the list of values within hash ``name``" return self.format_inline('HVALS', name) - + + + # channels + def subscribe(self, channels): + "Subscribe to ``channels``, waiting for messages to be published" + if isinstance(channels, basestring): + channels = [channels] + response = self.format_inline('SUBSCRIBE', *channels) + # this is *after* the SUBSCRIBE in order to allow for lazy and broken + # connections that need to issue AUTH and SELECT commands + self.subscribed = True + return response + + def unsubscribe(self, channels=[]): + "Unsubscribe to ``channels``. If empty, unsubscribe from all channels" + if isinstance(channels, basestring): + channels = [channels] + return self.format_inline('UNSUBSCRIBE', *channels) + + def publish(self, channel, message): + """ + Publish ``message`` on ``channel``. + Returns the number of subscribers the message was delivered to. + """ + return self.format_bulk('PUBLISH', channel, message) + + def listen(self): + "Listen for messages on channels this client has been subscribed to" + while self.subscribed: + r = self.parse_response('LISTEN') + message = {'type': r[0], 'channel': r[1], 'message': r[2]} + yield message + if message['type'] == 'unsubscribe' and message['message'] == 0: + self.subscribed = False + + class Pipeline(Redis): """ Pipelines provide a way to transmit multiple commands to the Redis server |