diff options
Diffstat (limited to 'redis/client.py')
-rw-r--r-- | redis/client.py | 70 |
1 files changed, 57 insertions, 13 deletions
diff --git a/redis/client.py b/redis/client.py index d0f7516..f5a3447 100644 --- a/redis/client.py +++ b/redis/client.py @@ -225,7 +225,7 @@ class Redis(threading.local): self.connection.disconnect() return self._execute_command(command_name, command, **options) - def _parse_response(self, command_name): + def _parse_response(self, command_name, catch_errors): conn = self.connection response = conn.read().strip() if not response: @@ -261,13 +261,27 @@ class Redis(threading.local): length = int(response) if length == -1: return None - return [self._parse_response(command_name) for i in range(length)] + if not catch_errors: + return [self._parse_response(command_name, catch_errors) + for i in range(length)] + else: + # for pipelines, we need to read everything, including response errors. + # otherwise we'd completely mess up the receive buffer + data = [] + for i in range(length): + try: + data.append( + self._parse_response(command_name, catch_errors) + ) + except Exception, e: + data.append(e) + return data raise InvalidResponse("Unknown response type for: %s" % command_name) - def parse_response(self, command_name, **options): + def parse_response(self, command_name, catch_errors=False, **options): "Parses a response from the Redis server" - response = self._parse_response(command_name) + response = self._parse_response(command_name, catch_errors) if command_name in self.RESPONSE_CALLBACKS: return self.RESPONSE_CALLBACKS[command_name](response, **options) return response @@ -886,16 +900,27 @@ class Pipeline(Redis): in one transmission. This is convenient for batch processing, such as saving all the values in a list to Redis. - Note that pipelining does *not* guarantee all the commands will be executed - together atomically, nor does it guarantee any transactional consistency. - If the third command in the batch fails, the first two will still have been - executed and "committed" + All commands executed within a pipeline are wrapped with MULTI and EXEC + calls. This guarantees all commands executed in the pipeline will be + executed atomically. + + Any command raising an exception does *not* halt the execution of + subsequent commands in the pipeline. Instead, the exception is caught + and its instance is placed into the response list returned by execute(). + Code iterating over the response list should be able to deal with an + instance of an exception as a potential value. In general, these will be + ResponseError exceptions, such as those raised when issuing a command + on a key of a different datatype. """ def __init__(self, connection, charset, errors): self.connection = connection - self.command_stack = [] self.encoding = charset self.errors = errors + self.reset() + + def reset(self): + self.command_stack = [] + self.format_inline('MULTI') def execute_command(self, command_name, command, **options): """ @@ -921,15 +946,34 @@ class Pipeline(Redis): return self def _execute(self, commands): - for _, command, options in commands: + for name, command, options in commands: self.connection.send(command, self) - return [self.parse_response(name, **options) - for name, _, options in commands] + # we only care about the last item in the response, which should be + # the EXEC command + for i in range(len(commands)-1): + _ = self.parse_response('_') + # tell the response parse to catch errors and return them as + # part of the response + response = self.parse_response('_', catch_errors=True) + # don't return the results of the MULTI or EXEC command + commands = [(c[0], c[2]) for c in commands[1:-1]] + if len(response) != len(commands): + raise ResponseError("Wrong number of response items from " + "pipline execution") + # Run any callbacks for the commands run in the pipeline + data = [] + for r, cmd in zip(response, commands): + if not isinstance(r, Exception): + if cmd[0] in self.RESPONSE_CALLBACKS: + r = self.RESPONSE_CALLBACKS[cmd[0]](r, **cmd[1]) + data.append(r) + return data def execute(self): "Execute all the commands in the current pipeline" + self.format_inline('EXEC') stack = self.command_stack - self.command_stack = [] + self.reset() try: return self._execute(stack) except ConnectionError: |