summaryrefslogtreecommitdiff
path: root/redis/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'redis/client.py')
-rw-r--r--redis/client.py48
1 files changed, 46 insertions, 2 deletions
diff --git a/redis/client.py b/redis/client.py
index b75dd7f..5d82348 100644
--- a/redis/client.py
+++ b/redis/client.py
@@ -170,7 +170,6 @@ def zset_score_pairs(response, **options):
return response
return zip(response[::2], map(float, response[1::2]))
-
class Redis(threading.local):
"""
Implementation of the Redis protocol.
@@ -218,6 +217,9 @@ class Redis(threading.local):
'TTL': lambda r: r != -1 and r or None,
}
)
+
+ # commands that should NOT pull data off the network buffer when executed
+ SUBSCRIPTION_COMMANDS = set(['SUBSCRIBE', 'UNSUBSCRIBE'])
def __init__(self, host='localhost', port=6379,
db=0, password=None, socket_timeout=None,
@@ -225,6 +227,7 @@ class Redis(threading.local):
self.encoding = charset
self.errors = errors
self.connection = None
+ self.subscribed = False
self.select(db, host, port, password, socket_timeout)
#### Legacty accessors of connection information ####
@@ -245,7 +248,13 @@ class Redis(threading.local):
#### COMMAND EXECUTION AND PROTOCOL PARSING ####
def _execute_command(self, command_name, command, **options):
+ subscription_command = command_name in self.SUBSCRIPTION_COMMANDS
+ if self.subscribed and not subscription_command:
+ raise RedisError("Cannot issue commands other than SUBSCRIBE and "
+ "UNSUBSCRIBE while channels are open")
self.connection.send(command, self)
+ if subscription_command:
+ return None
return self.parse_response(command_name, **options)
def execute_command(self, command_name, command, **options):
@@ -976,7 +985,42 @@ class Redis(threading.local):
def hvals(self, name):
"Return the list of values within hash ``name``"
return self.format_inline('HVALS', name)
-
+
+
+ # channels
+ def subscribe(self, channels):
+ "Subscribe to ``channels``, waiting for messages to be published"
+ if isinstance(channels, basestring):
+ channels = [channels]
+ response = self.format_inline('SUBSCRIBE', *channels)
+ # this is *after* the SUBSCRIBE in order to allow for lazy and broken
+ # connections that need to issue AUTH and SELECT commands
+ self.subscribed = True
+ return response
+
+ def unsubscribe(self, channels=[]):
+ "Unsubscribe to ``channels``. If empty, unsubscribe from all channels"
+ if isinstance(channels, basestring):
+ channels = [channels]
+ return self.format_inline('UNSUBSCRIBE', *channels)
+
+ def publish(self, channel, message):
+ """
+ Publish ``message`` on ``channel``.
+ Returns the number of subscribers the message was delivered to.
+ """
+ return self.format_bulk('PUBLISH', channel, message)
+
+ def listen(self):
+ "Listen for messages on channels this client has been subscribed to"
+ while self.subscribed:
+ r = self.parse_response('LISTEN')
+ message = {'type': r[0], 'channel': r[1], 'message': r[2]}
+ yield message
+ if message['type'] == 'unsubscribe' and message['message'] == 0:
+ self.subscribed = False
+
+
class Pipeline(Redis):
"""
Pipelines provide a way to transmit multiple commands to the Redis server