summaryrefslogtreecommitdiff
path: root/redis/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'redis/client.py')
-rw-r--r--redis/client.py70
1 files changed, 57 insertions, 13 deletions
diff --git a/redis/client.py b/redis/client.py
index d0f7516..f5a3447 100644
--- a/redis/client.py
+++ b/redis/client.py
@@ -225,7 +225,7 @@ class Redis(threading.local):
self.connection.disconnect()
return self._execute_command(command_name, command, **options)
- def _parse_response(self, command_name):
+ def _parse_response(self, command_name, catch_errors):
conn = self.connection
response = conn.read().strip()
if not response:
@@ -261,13 +261,27 @@ class Redis(threading.local):
length = int(response)
if length == -1:
return None
- return [self._parse_response(command_name) for i in range(length)]
+ if not catch_errors:
+ return [self._parse_response(command_name, catch_errors)
+ for i in range(length)]
+ else:
+ # for pipelines, we need to read everything, including response errors.
+ # otherwise we'd completely mess up the receive buffer
+ data = []
+ for i in range(length):
+ try:
+ data.append(
+ self._parse_response(command_name, catch_errors)
+ )
+ except Exception, e:
+ data.append(e)
+ return data
raise InvalidResponse("Unknown response type for: %s" % command_name)
- def parse_response(self, command_name, **options):
+ def parse_response(self, command_name, catch_errors=False, **options):
"Parses a response from the Redis server"
- response = self._parse_response(command_name)
+ response = self._parse_response(command_name, catch_errors)
if command_name in self.RESPONSE_CALLBACKS:
return self.RESPONSE_CALLBACKS[command_name](response, **options)
return response
@@ -886,16 +900,27 @@ class Pipeline(Redis):
in one transmission. This is convenient for batch processing, such as
saving all the values in a list to Redis.
- Note that pipelining does *not* guarantee all the commands will be executed
- together atomically, nor does it guarantee any transactional consistency.
- If the third command in the batch fails, the first two will still have been
- executed and "committed"
+ All commands executed within a pipeline are wrapped with MULTI and EXEC
+ calls. This guarantees all commands executed in the pipeline will be
+ executed atomically.
+
+ Any command raising an exception does *not* halt the execution of
+ subsequent commands in the pipeline. Instead, the exception is caught
+ and its instance is placed into the response list returned by execute().
+ Code iterating over the response list should be able to deal with an
+ instance of an exception as a potential value. In general, these will be
+ ResponseError exceptions, such as those raised when issuing a command
+ on a key of a different datatype.
"""
def __init__(self, connection, charset, errors):
self.connection = connection
- self.command_stack = []
self.encoding = charset
self.errors = errors
+ self.reset()
+
+ def reset(self):
+ self.command_stack = []
+ self.format_inline('MULTI')
def execute_command(self, command_name, command, **options):
"""
@@ -921,15 +946,34 @@ class Pipeline(Redis):
return self
def _execute(self, commands):
- for _, command, options in commands:
+ for name, command, options in commands:
self.connection.send(command, self)
- return [self.parse_response(name, **options)
- for name, _, options in commands]
+ # we only care about the last item in the response, which should be
+ # the EXEC command
+ for i in range(len(commands)-1):
+ _ = self.parse_response('_')
+ # tell the response parse to catch errors and return them as
+ # part of the response
+ response = self.parse_response('_', catch_errors=True)
+ # don't return the results of the MULTI or EXEC command
+ commands = [(c[0], c[2]) for c in commands[1:-1]]
+ if len(response) != len(commands):
+ raise ResponseError("Wrong number of response items from "
+ "pipline execution")
+ # Run any callbacks for the commands run in the pipeline
+ data = []
+ for r, cmd in zip(response, commands):
+ if not isinstance(r, Exception):
+ if cmd[0] in self.RESPONSE_CALLBACKS:
+ r = self.RESPONSE_CALLBACKS[cmd[0]](r, **cmd[1])
+ data.append(r)
+ return data
def execute(self):
"Execute all the commands in the current pipeline"
+ self.format_inline('EXEC')
stack = self.command_stack
- self.command_stack = []
+ self.reset()
try:
return self._execute(stack)
except ConnectionError: