summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorandy <andy@whiskeymedia.com>2011-07-12 12:37:42 -0700
committerandy <andy@whiskeymedia.com>2011-07-12 12:37:42 -0700
commit5c68cabecba2a2c2c02a7689def0f5cb9a11e6af (patch)
treecb2f6f1587404c14236c51839071f175882f92d2
parent3b40192740b94f1e5a6bf45ecae30cf6a8cb9f07 (diff)
downloadredis-py-watch.tar.gz
Attempt our best to retry pipeline connection failures where they make sense.watch
-rw-r--r--redis/client.py33
1 files changed, 21 insertions, 12 deletions
diff --git a/redis/client.py b/redis/client.py
index 6f65c48..123f225 100644
--- a/redis/client.py
+++ b/redis/client.py
@@ -1231,15 +1231,17 @@ class Pipeline(Redis):
self.explicit_transaction = True
def execute_command(self, *args, **kwargs):
- if self.watching and not self.explicit_transaction:
+ if (self.watching or args[0] == 'WATCH') and \
+ not self.explicit_transaction:
return self.immediate_execute_command(*args, **kwargs)
return self.pipeline_execute_command(*args, **kwargs)
def immediate_execute_command(self, *args, **options):
"""
Execute a command immediately, but don't auto-retry on a
- ConnectionError. Used when issuing WATCH or subsequent commands
- retrieving their values but before MULTI is called.
+ ConnectionError if we're already WATCHing a variable. Used when
+ issuing WATCH or subsequent commands retrieving their values but before
+ MULTI is called.
"""
command_name = args[0]
conn = self.connection
@@ -1252,6 +1254,12 @@ class Pipeline(Redis):
conn.send_command(*args)
return self.parse_response(conn, command_name, **options)
except ConnectionError:
+ conn.disconnect()
+ # if we're not already watching, we can safely retry the command
+ # assuming it was a connection timeout
+ if not self.watching:
+ conn.send_command(*args)
+ return self.parse_response(conn, command_name, **options)
self.reset()
raise
@@ -1312,9 +1320,9 @@ class Pipeline(Redis):
def parse_response(self, connection, command_name, **options):
result = super(Pipeline, self).parse_response(
connection, command_name, **options)
- if command_name in self.__class__.UNWATCH_COMMANDS:
+ if command_name in self.UNWATCH_COMMANDS:
self.watching = False
- if command_name == 'WATCH':
+ elif command_name == 'WATCH':
self.watching = True
return result
@@ -1333,9 +1341,14 @@ class Pipeline(Redis):
except ConnectionError:
conn.disconnect()
# if we were watching a variable, the watch is no longer valid since
- # this connection has died.
+ # this connection has died. raise a WatchError, which indicates
+ # the user should retry his transaction. If this is more than a
+ # complete failure, the WATCH that the user next issue will fail,
+ # propegating the real ConnectionError
if self.watching:
raise WatchError("Watched variable changed.")
+ # otherwise, it's safe to retry since the transaction isn't
+ # predicated on any state
return execute(conn, stack)
finally:
self.reset()
@@ -1346,18 +1359,14 @@ class Pipeline(Redis):
"""
if self.explicit_transaction:
raise RedisError('Cannot issue a WATCH after a MULTI')
- self.watching = True
return self.execute_command('WATCH', *names)
def unwatch(self):
"""
Unwatches all previously specified keys
"""
- if self.watching:
- response = self.execute_command('UNWATCH')
- else:
- response = True
- return response
+ return self.watching and self.execute_command('UNWATCH') or True
+
class LockError(RedisError):
"Errors thrown from the Lock"