diff options
-rw-r--r-- | redis/client.py | 70 | ||||
-rw-r--r-- | tests/pipeline.py | 30 |
2 files changed, 87 insertions, 13 deletions
diff --git a/redis/client.py b/redis/client.py index d0f7516..f5a3447 100644 --- a/redis/client.py +++ b/redis/client.py @@ -225,7 +225,7 @@ class Redis(threading.local): self.connection.disconnect() return self._execute_command(command_name, command, **options) - def _parse_response(self, command_name): + def _parse_response(self, command_name, catch_errors): conn = self.connection response = conn.read().strip() if not response: @@ -261,13 +261,27 @@ class Redis(threading.local): length = int(response) if length == -1: return None - return [self._parse_response(command_name) for i in range(length)] + if not catch_errors: + return [self._parse_response(command_name, catch_errors) + for i in range(length)] + else: + # for pipelines, we need to read everything, including response errors. + # otherwise we'd completely mess up the receive buffer + data = [] + for i in range(length): + try: + data.append( + self._parse_response(command_name, catch_errors) + ) + except Exception, e: + data.append(e) + return data raise InvalidResponse("Unknown response type for: %s" % command_name) - def parse_response(self, command_name, **options): + def parse_response(self, command_name, catch_errors=False, **options): "Parses a response from the Redis server" - response = self._parse_response(command_name) + response = self._parse_response(command_name, catch_errors) if command_name in self.RESPONSE_CALLBACKS: return self.RESPONSE_CALLBACKS[command_name](response, **options) return response @@ -886,16 +900,27 @@ class Pipeline(Redis): in one transmission. This is convenient for batch processing, such as saving all the values in a list to Redis. - Note that pipelining does *not* guarantee all the commands will be executed - together atomically, nor does it guarantee any transactional consistency. - If the third command in the batch fails, the first two will still have been - executed and "committed" + All commands executed within a pipeline are wrapped with MULTI and EXEC + calls. This guarantees all commands executed in the pipeline will be + executed atomically. + + Any command raising an exception does *not* halt the execution of + subsequent commands in the pipeline. Instead, the exception is caught + and its instance is placed into the response list returned by execute(). + Code iterating over the response list should be able to deal with an + instance of an exception as a potential value. In general, these will be + ResponseError exceptions, such as those raised when issuing a command + on a key of a different datatype. """ def __init__(self, connection, charset, errors): self.connection = connection - self.command_stack = [] self.encoding = charset self.errors = errors + self.reset() + + def reset(self): + self.command_stack = [] + self.format_inline('MULTI') def execute_command(self, command_name, command, **options): """ @@ -921,15 +946,34 @@ class Pipeline(Redis): return self def _execute(self, commands): - for _, command, options in commands: + for name, command, options in commands: self.connection.send(command, self) - return [self.parse_response(name, **options) - for name, _, options in commands] + # we only care about the last item in the response, which should be + # the EXEC command + for i in range(len(commands)-1): + _ = self.parse_response('_') + # tell the response parse to catch errors and return them as + # part of the response + response = self.parse_response('_', catch_errors=True) + # don't return the results of the MULTI or EXEC command + commands = [(c[0], c[2]) for c in commands[1:-1]] + if len(response) != len(commands): + raise ResponseError("Wrong number of response items from " + "pipline execution") + # Run any callbacks for the commands run in the pipeline + data = [] + for r, cmd in zip(response, commands): + if not isinstance(r, Exception): + if cmd[0] in self.RESPONSE_CALLBACKS: + r = self.RESPONSE_CALLBACKS[cmd[0]](r, **cmd[1]) + data.append(r) + return data def execute(self): "Execute all the commands in the current pipeline" + self.format_inline('EXEC') stack = self.command_stack - self.command_stack = [] + self.reset() try: return self._execute(stack) except ConnectionError: diff --git a/tests/pipeline.py b/tests/pipeline.py index cd130d1..da85e55 100644 --- a/tests/pipeline.py +++ b/tests/pipeline.py @@ -23,3 +23,33 @@ class PipelineTestCase(unittest.TestCase): [('z1', 2.0), ('z2', 4)] ] ) + + def test_invalid_command_in_pipeline(self): + # all commands but the invalid one should be excuted correctly + self.client['c'] = 'a' + pipe = self.client.pipeline() + pipe.set('a', 1).set('b', 2).lpush('c', 3).set('d', 4) + result = pipe.execute() + + self.assertEquals(result[0], True) + self.assertEquals(self.client['a'], '1') + self.assertEquals(result[1], True) + self.assertEquals(self.client['b'], '2') + # we can't lpush to a key that's a string value, so this should + # be a ResponseError exception + self.assert_(isinstance(result[2], redis.ResponseError)) + self.assertEquals(self.client['c'], 'a') + self.assertEquals(result[3], True) + self.assertEquals(self.client['d'], '4') + + # make sure the pipe was restored to a working state + self.assertEquals(pipe.set('z', 'zzz').execute(), [True]) + self.assertEquals(self.client['z'], 'zzz') + + def test_pipe_cannot_select(self): + pipe = self.client.pipeline() + self.assertRaises(redis.RedisError, + pipe.select, 'localhost', 6379, db=9) + + + |