summaryrefslogtreecommitdiff
path: root/redis/client.py
diff options
context:
space:
mode:
authorAndy McCurdy <andy@andymccurdy.com>2011-06-06 19:30:42 -0700
committerAndy McCurdy <andy@andymccurdy.com>2011-06-06 19:30:42 -0700
commitf200f6fee3f3e9d410942014cdd34850f6ecd8e8 (patch)
tree834373c9c16423f89a23d673cb3a61d4ba5fa6ec /redis/client.py
parentfc41ca2d30f972f3360526802da2885c576c233c (diff)
downloadredis-py-f200f6fee3f3e9d410942014cdd34850f6ecd8e8.tar.gz
get a connection within the execute() method of the pipeline so we can respond to a ConnectionError. Fix for #139
Diffstat (limited to 'redis/client.py')
-rw-r--r--redis/client.py94
1 files changed, 45 insertions, 49 deletions
diff --git a/redis/client.py b/redis/client.py
index 14dde9e..b9ddb4f 100644
--- a/redis/client.py
+++ b/redis/client.py
@@ -1202,52 +1202,44 @@ class Pipeline(Redis):
self.command_stack.append((args, options))
return self
- def _execute_transaction(self, commands):
- conn = self.connection_pool.get_connection('MULTI', self.shard_hint)
- try:
- all_cmds = ''.join(starmap(conn.pack_command,
- [args for args, options in commands]))
- conn.send_packed_command(all_cmds)
- # we don't care about the multi/exec any longer
- commands = commands[1:-1]
- # parse off the response for MULTI and all commands prior to EXEC.
- # the only data we care about is the response the EXEC
- # which is the last command
- for i in range(len(commands)+1):
- _ = self.parse_response(conn, '_')
- # parse the EXEC.
- response = self.parse_response(conn, '_')
-
- if response is None:
- raise WatchError("Watched variable changed.")
-
- if len(response) != len(commands):
- raise ResponseError("Wrong number of response items from "
- "pipeline execution")
- # We have to run response callbacks manually
- data = []
- for r, cmd in izip(response, commands):
- if not isinstance(r, Exception):
- args, options = cmd
- command_name = args[0]
- if command_name in self.response_callbacks:
- r = self.response_callbacks[command_name](r, **options)
- data.append(r)
- return data
- finally:
- self.connection_pool.release(conn)
-
- def _execute_pipeline(self, commands):
- # build up all commands into a single request to increase network perf
- conn = self.connection_pool.get_connection('MULTI', self.shard_hint)
- try:
- all_cmds = ''.join(starmap(conn.pack_command,
- [args for args, options in commands]))
- conn.send_packed_command(all_cmds)
- return [self.parse_response(conn, args[0], **options)
- for args, options in commands]
- finally:
- self.connection_pool.release(conn)
+ def _execute_transaction(self, connection, commands):
+ all_cmds = ''.join(starmap(connection.pack_command,
+ [args for args, options in commands]))
+ connection.send_packed_command(all_cmds)
+ # we don't care about the multi/exec any longer
+ commands = commands[1:-1]
+ # parse off the response for MULTI and all commands prior to EXEC.
+ # the only data we care about is the response the EXEC
+ # which is the last command
+ for i in range(len(commands)+1):
+ self.parse_response(connection, '_')
+ # parse the EXEC.
+ response = self.parse_response(connection, '_')
+
+ if response is None:
+ raise WatchError("Watched variable changed.")
+
+ if len(response) != len(commands):
+ raise ResponseError("Wrong number of response items from "
+ "pipeline execution")
+ # We have to run response callbacks manually
+ data = []
+ for r, cmd in izip(response, commands):
+ if not isinstance(r, Exception):
+ args, options = cmd
+ command_name = args[0]
+ if command_name in self.response_callbacks:
+ r = self.response_callbacks[command_name](r, **options)
+ data.append(r)
+ return data
+
+ def _execute_pipeline(self, connection, commands):
+ # build up all commands into a single request to increase network perf
+ all_cmds = ''.join(starmap(connection.pack_command,
+ [args for args, options in commands]))
+ connection.send_packed_command(all_cmds)
+ return [self.parse_response(connection, args[0], **options)
+ for args, options in commands]
def execute(self):
"Execute all the commands in the current pipeline"
@@ -1258,11 +1250,15 @@ class Pipeline(Redis):
execute = self._execute_pipeline
stack = self.command_stack
self.reset()
+ conn = self.connection_pool.get_connection('MULTI', self.shard_hint)
try:
- return execute(stack)
+ return execute(conn, stack)
except ConnectionError:
- connection.disconnect()
- return execute(stack)
+ conn.disconnect()
+ return execute(conn, stack)
+ finally:
+ self.connection_pool.release(conn)
+
class LockError(RedisError):
"Errors thrown from the Lock"