summaryrefslogtreecommitdiff
path: root/redis/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'redis/client.py')
-rw-r--r--redis/client.py68
1 files changed, 48 insertions, 20 deletions
diff --git a/redis/client.py b/redis/client.py
index 96fe53b..42a01dd 100644
--- a/redis/client.py
+++ b/redis/client.py
@@ -1,6 +1,7 @@
from __future__ import with_statement
-from itertools import starmap
+from itertools import chain, starmap
import datetime
+import sys
import warnings
import time as mod_time
from redis._compat import (b, izip, imap, iteritems, dictkeys, dictvalues,
@@ -940,9 +941,16 @@ class StrictRedis(object):
"Remove and return a random member of set ``name``"
return self.execute_command('SPOP', name)
- def srandmember(self, name):
- "Return a random member of set ``name``"
- return self.execute_command('SRANDMEMBER', name)
+ def srandmember(self, name, number=None):
+ """
+ If ``number`` is None, returns a random member of set ``name``.
+
+ If ``number`` is supplied, returns a list of ``number`` random
+ memebers of set ``name``. Note this is only available when running
+ Redis 2.6+.
+ """
+ args = number and [number] or []
+ return self.execute_command('SRANDMEMBER', name, *args)
def srem(self, name, *values):
"Remove ``values`` from set ``name``"
@@ -1642,27 +1650,40 @@ class BasePipeline(object):
self.command_stack.append((args, options))
return self
- def _execute_transaction(self, connection, commands):
+ def _execute_transaction(self, connection, commands, raise_on_error):
+ cmds = chain([(('MULTI', ), {})], commands, [(('EXEC', ), {})])
all_cmds = SYM_EMPTY.join(
starmap(connection.pack_command,
- [args for args, options in commands]))
+ [args for args, options in cmds]))
connection.send_packed_command(all_cmds)
- # we don't care about the multi/exec any longer
- commands = commands[1:-1]
- # parse off the response for MULTI and all commands prior to EXEC.
- # the only data we care about is the response the EXEC
- # which is the last command
- for i in range(len(commands) + 1):
- self.parse_response(connection, '_')
+ # parse off the response for MULTI
+ self.parse_response(connection, '_')
+ # and all the other commands
+ errors = []
+ for i, _ in enumerate(commands):
+ try:
+ self.parse_response(connection, '_')
+ except ResponseError:
+ errors.append((i, sys.exc_info()[1]))
+
# parse the EXEC.
response = self.parse_response(connection, '_')
if response is None:
raise WatchError("Watched variable changed.")
+ # put any parse errors into the response
+ for i, e in errors:
+ response.insert(i, e)
+
if len(response) != len(commands):
raise ResponseError("Wrong number of response items from "
"pipeline execution")
+
+ # find any errors in the response and raise if necessary
+ if raise_on_error:
+ self.raise_first_error(response)
+
# We have to run response callbacks manually
data = []
for r, cmd in izip(response, commands):
@@ -1674,14 +1695,22 @@ class BasePipeline(object):
data.append(r)
return data
- def _execute_pipeline(self, connection, commands):
+ def _execute_pipeline(self, connection, commands, raise_on_error):
# build up all commands into a single request to increase network perf
all_cmds = SYM_EMPTY.join(
starmap(connection.pack_command,
[args for args, options in commands]))
connection.send_packed_command(all_cmds)
- return [self.parse_response(connection, args[0], **options)
- for args, options in commands]
+ response = [self.parse_response(connection, args[0], **options)
+ for args, options in commands]
+ if raise_on_error:
+ self.raise_first_error(response)
+ return response
+
+ def raise_first_error(self, response):
+ for r in response:
+ if isinstance(r, ResponseError):
+ raise r
def parse_response(self, connection, command_name, **options):
result = StrictRedis.parse_response(
@@ -1703,13 +1732,12 @@ class BasePipeline(object):
if not exist:
immediate('SCRIPT', 'LOAD', s.script, **{'parse': 'LOAD'})
- def execute(self):
+ def execute(self, raise_on_error=True):
"Execute all the commands in the current pipeline"
if self.scripts:
self.load_scripts()
stack = self.command_stack
if self.transaction or self.explicit_transaction:
- stack = [(('MULTI', ), {})] + stack + [(('EXEC', ), {})]
execute = self._execute_transaction
else:
execute = self._execute_pipeline
@@ -1723,7 +1751,7 @@ class BasePipeline(object):
self.connection = conn
try:
- return execute(conn, stack)
+ return execute(conn, stack, raise_on_error)
except ConnectionError:
conn.disconnect()
# if we were watching a variable, the watch is no longer valid
@@ -1736,7 +1764,7 @@ class BasePipeline(object):
"one or more keys")
# otherwise, it's safe to retry since the transaction isn't
# predicated on any state
- return execute(conn, stack)
+ return execute(conn, stack, raise_on_error)
finally:
self.reset()