diff options
author | Andy McCurdy <andy@andymccurdy.com> | 2010-03-31 19:20:42 -0700 |
---|---|---|
committer | Andy McCurdy <andy@andymccurdy.com> | 2010-03-31 19:20:42 -0700 |
commit | 74d4666a4190541ee11da3ec629a60b9b1ce6d9d (patch) | |
tree | a16d0d50866365f990df3f8e6af9fe7c4d94d4e7 /redis/client.py | |
parent | b33a6fc9351e19fb058eb5302d1f185ec50c75c6 (diff) | |
download | redis-py-74d4666a4190541ee11da3ec629a60b9b1ce6d9d.tar.gz |
Pipeines can not optionally be transactions (wrapped in MULTI/EXEC) or not by passing the transaction parameter. This fixes #23.
Diffstat (limited to 'redis/client.py')
-rw-r--r-- | redis/client.py | 64 |
1 files changed, 46 insertions, 18 deletions
diff --git a/redis/client.py b/redis/client.py index 496e28f..1c0e8fa 100644 --- a/redis/client.py +++ b/redis/client.py @@ -3,6 +3,7 @@ import errno import socket import threading import warnings +from itertools import chain from redis.exceptions import ConnectionError, ResponseError, InvalidResponse from redis.exceptions import RedisError, AuthenticationError @@ -246,8 +247,20 @@ class Redis(threading.local): return self.connection.db db = property(_get_db) - def pipeline(self): - return Pipeline(self.connection, self.encoding, self.errors) + def pipeline(self, transaction=True): + """ + Return a new pipeline object that can queue multiple commands for + later execution. ``transaction`` indicates whether all commands + should be executed atomically. Apart from multiple atomic operations, + pipelines are useful for batch loading of data as they reduce the + number of back and forth network operations between client and server. + """ + return Pipeline( + self.connection, + transaction, + self.encoding, + self.errors + ) #### COMMAND EXECUTION AND PROTOCOL PARSING #### @@ -1032,8 +1045,9 @@ class Pipeline(Redis): ResponseError exceptions, such as those raised when issuing a command on a key of a different datatype. """ - def __init__(self, connection, charset, errors): + def __init__(self, connection, transaction, charset, errors): self.connection = connection + self.transaction = transaction self.encoding = charset self.errors = errors self.subscribed = False # NOTE not in use, but necessary @@ -1041,7 +1055,6 @@ class Pipeline(Redis): def reset(self): self.command_stack = [] - self.execute_command('MULTI') def _execute_command(self, command_name, command, **options): """ @@ -1066,19 +1079,20 @@ class Pipeline(Redis): self.command_stack.append((command_name, command, options)) return self - def _execute(self, commands): - # build up all commands into a single request to increase network perf - all_cmds = ''.join([c for _1, c, _2 in commands]) + def _execute_transaction(self, commands): + # wrap the commands in MULTI ... EXEC statements to indicate an + # atomic operation + all_cmds = ''.join([c for _1, c, _2 in chain( + (('', 'MULTI\r\n', ''),), + commands, + (('', 'EXEC\r\n', ''),) + )]) self.connection.send(all_cmds, self) - # we only care about the last item in the response, which should be - # the EXEC command - for i in range(len(commands)-1): + # parse off the response for MULTI and all commands prior to EXEC + for i in range(len(commands)+1): _ = self.parse_response('_') - # tell the response parse to catch errors and return them as - # part of the response + # parse the EXEC. we want errors returned as items in the response response = self.parse_response('_', catch_errors=True) - # don't return the results of the MULTI or EXEC command - commands = [(c[0], c[2]) for c in commands[1:-1]] if len(response) != len(commands): raise ResponseError("Wrong number of response items from " "pipline execution") @@ -1087,20 +1101,34 @@ class Pipeline(Redis): for r, cmd in zip(response, commands): if not isinstance(r, Exception): if cmd[0] in self.RESPONSE_CALLBACKS: - r = self.RESPONSE_CALLBACKS[cmd[0]](r, **cmd[1]) + r = self.RESPONSE_CALLBACKS[cmd[0]](r, **cmd[2]) data.append(r) return data + def _execute_pipeline(self, commands): + # build up all commands into a single request to increase network perf + all_cmds = ''.join([c for _1, c, _2 in commands]) + self.connection.send(all_cmds, self) + data = [] + for command_name, _, options in commands: + data.append( + self.parse_response(command_name, catch_errors=True, **options) + ) + return data + def execute(self): "Execute all the commands in the current pipeline" - self.execute_command('EXEC') stack = self.command_stack self.reset() + if self.transaction: + execute = self._execute_transaction + else: + execute = self._execute_pipeline try: - return self._execute(stack) + return execute(stack) except ConnectionError: self.connection.disconnect() - return self._execute(stack) + return execute(stack) def select(self, *args, **kwargs): raise RedisError("Cannot select a different database from a pipeline") |