diff options
author | Andy McCurdy <andy@andymccurdy.com> | 2011-06-06 19:30:42 -0700 |
---|---|---|
committer | Andy McCurdy <andy@andymccurdy.com> | 2011-06-06 19:30:42 -0700 |
commit | f200f6fee3f3e9d410942014cdd34850f6ecd8e8 (patch) | |
tree | 834373c9c16423f89a23d673cb3a61d4ba5fa6ec /redis/client.py | |
parent | fc41ca2d30f972f3360526802da2885c576c233c (diff) | |
download | redis-py-f200f6fee3f3e9d410942014cdd34850f6ecd8e8.tar.gz |
get a connection within the execute() method of the pipeline so we can respond to a ConnectionError. Fix for #139
Diffstat (limited to 'redis/client.py')
-rw-r--r-- | redis/client.py | 94 |
1 files changed, 45 insertions, 49 deletions
diff --git a/redis/client.py b/redis/client.py index 14dde9e..b9ddb4f 100644 --- a/redis/client.py +++ b/redis/client.py @@ -1202,52 +1202,44 @@ class Pipeline(Redis): self.command_stack.append((args, options)) return self - def _execute_transaction(self, commands): - conn = self.connection_pool.get_connection('MULTI', self.shard_hint) - try: - all_cmds = ''.join(starmap(conn.pack_command, - [args for args, options in commands])) - conn.send_packed_command(all_cmds) - # we don't care about the multi/exec any longer - commands = commands[1:-1] - # parse off the response for MULTI and all commands prior to EXEC. - # the only data we care about is the response the EXEC - # which is the last command - for i in range(len(commands)+1): - _ = self.parse_response(conn, '_') - # parse the EXEC. - response = self.parse_response(conn, '_') - - if response is None: - raise WatchError("Watched variable changed.") - - if len(response) != len(commands): - raise ResponseError("Wrong number of response items from " - "pipeline execution") - # We have to run response callbacks manually - data = [] - for r, cmd in izip(response, commands): - if not isinstance(r, Exception): - args, options = cmd - command_name = args[0] - if command_name in self.response_callbacks: - r = self.response_callbacks[command_name](r, **options) - data.append(r) - return data - finally: - self.connection_pool.release(conn) - - def _execute_pipeline(self, commands): - # build up all commands into a single request to increase network perf - conn = self.connection_pool.get_connection('MULTI', self.shard_hint) - try: - all_cmds = ''.join(starmap(conn.pack_command, - [args for args, options in commands])) - conn.send_packed_command(all_cmds) - return [self.parse_response(conn, args[0], **options) - for args, options in commands] - finally: - self.connection_pool.release(conn) + def _execute_transaction(self, connection, commands): + all_cmds = ''.join(starmap(connection.pack_command, + [args for args, options in commands])) + connection.send_packed_command(all_cmds) + # we don't care about the multi/exec any longer + commands = commands[1:-1] + # parse off the response for MULTI and all commands prior to EXEC. + # the only data we care about is the response the EXEC + # which is the last command + for i in range(len(commands)+1): + self.parse_response(connection, '_') + # parse the EXEC. + response = self.parse_response(connection, '_') + + if response is None: + raise WatchError("Watched variable changed.") + + if len(response) != len(commands): + raise ResponseError("Wrong number of response items from " + "pipeline execution") + # We have to run response callbacks manually + data = [] + for r, cmd in izip(response, commands): + if not isinstance(r, Exception): + args, options = cmd + command_name = args[0] + if command_name in self.response_callbacks: + r = self.response_callbacks[command_name](r, **options) + data.append(r) + return data + + def _execute_pipeline(self, connection, commands): + # build up all commands into a single request to increase network perf + all_cmds = ''.join(starmap(connection.pack_command, + [args for args, options in commands])) + connection.send_packed_command(all_cmds) + return [self.parse_response(connection, args[0], **options) + for args, options in commands] def execute(self): "Execute all the commands in the current pipeline" @@ -1258,11 +1250,15 @@ class Pipeline(Redis): execute = self._execute_pipeline stack = self.command_stack self.reset() + conn = self.connection_pool.get_connection('MULTI', self.shard_hint) try: - return execute(stack) + return execute(conn, stack) except ConnectionError: - connection.disconnect() - return execute(stack) + conn.disconnect() + return execute(conn, stack) + finally: + self.connection_pool.release(conn) + class LockError(RedisError): "Errors thrown from the Lock" |