diff options
Diffstat (limited to 'redis/client.py')
-rw-r--r-- | redis/client.py | 68 |
1 files changed, 48 insertions, 20 deletions
diff --git a/redis/client.py b/redis/client.py index 96fe53b..42a01dd 100644 --- a/redis/client.py +++ b/redis/client.py @@ -1,6 +1,7 @@ from __future__ import with_statement -from itertools import starmap +from itertools import chain, starmap import datetime +import sys import warnings import time as mod_time from redis._compat import (b, izip, imap, iteritems, dictkeys, dictvalues, @@ -940,9 +941,16 @@ class StrictRedis(object): "Remove and return a random member of set ``name``" return self.execute_command('SPOP', name) - def srandmember(self, name): - "Return a random member of set ``name``" - return self.execute_command('SRANDMEMBER', name) + def srandmember(self, name, number=None): + """ + If ``number`` is None, returns a random member of set ``name``. + + If ``number`` is supplied, returns a list of ``number`` random + memebers of set ``name``. Note this is only available when running + Redis 2.6+. + """ + args = number and [number] or [] + return self.execute_command('SRANDMEMBER', name, *args) def srem(self, name, *values): "Remove ``values`` from set ``name``" @@ -1642,27 +1650,40 @@ class BasePipeline(object): self.command_stack.append((args, options)) return self - def _execute_transaction(self, connection, commands): + def _execute_transaction(self, connection, commands, raise_on_error): + cmds = chain([(('MULTI', ), {})], commands, [(('EXEC', ), {})]) all_cmds = SYM_EMPTY.join( starmap(connection.pack_command, - [args for args, options in commands])) + [args for args, options in cmds])) connection.send_packed_command(all_cmds) - # we don't care about the multi/exec any longer - commands = commands[1:-1] - # parse off the response for MULTI and all commands prior to EXEC. - # the only data we care about is the response the EXEC - # which is the last command - for i in range(len(commands) + 1): - self.parse_response(connection, '_') + # parse off the response for MULTI + self.parse_response(connection, '_') + # and all the other commands + errors = [] + for i, _ in enumerate(commands): + try: + self.parse_response(connection, '_') + except ResponseError: + errors.append((i, sys.exc_info()[1])) + # parse the EXEC. response = self.parse_response(connection, '_') if response is None: raise WatchError("Watched variable changed.") + # put any parse errors into the response + for i, e in errors: + response.insert(i, e) + if len(response) != len(commands): raise ResponseError("Wrong number of response items from " "pipeline execution") + + # find any errors in the response and raise if necessary + if raise_on_error: + self.raise_first_error(response) + # We have to run response callbacks manually data = [] for r, cmd in izip(response, commands): @@ -1674,14 +1695,22 @@ class BasePipeline(object): data.append(r) return data - def _execute_pipeline(self, connection, commands): + def _execute_pipeline(self, connection, commands, raise_on_error): # build up all commands into a single request to increase network perf all_cmds = SYM_EMPTY.join( starmap(connection.pack_command, [args for args, options in commands])) connection.send_packed_command(all_cmds) - return [self.parse_response(connection, args[0], **options) - for args, options in commands] + response = [self.parse_response(connection, args[0], **options) + for args, options in commands] + if raise_on_error: + self.raise_first_error(response) + return response + + def raise_first_error(self, response): + for r in response: + if isinstance(r, ResponseError): + raise r def parse_response(self, connection, command_name, **options): result = StrictRedis.parse_response( @@ -1703,13 +1732,12 @@ class BasePipeline(object): if not exist: immediate('SCRIPT', 'LOAD', s.script, **{'parse': 'LOAD'}) - def execute(self): + def execute(self, raise_on_error=True): "Execute all the commands in the current pipeline" if self.scripts: self.load_scripts() stack = self.command_stack if self.transaction or self.explicit_transaction: - stack = [(('MULTI', ), {})] + stack + [(('EXEC', ), {})] execute = self._execute_transaction else: execute = self._execute_pipeline @@ -1723,7 +1751,7 @@ class BasePipeline(object): self.connection = conn try: - return execute(conn, stack) + return execute(conn, stack, raise_on_error) except ConnectionError: conn.disconnect() # if we were watching a variable, the watch is no longer valid @@ -1736,7 +1764,7 @@ class BasePipeline(object): "one or more keys") # otherwise, it's safe to retry since the transaction isn't # predicated on any state - return execute(conn, stack) + return execute(conn, stack, raise_on_error) finally: self.reset() |