summaryrefslogtreecommitdiff
path: root/redis/client.py
diff options
context:
space:
mode:
authorAndy McCurdy <andy@andymccurdy.com>2011-07-11 00:14:39 -0700
committerAndy McCurdy <andy@andymccurdy.com>2011-07-11 00:14:39 -0700
commit53c928d44acd3d1fbcb3896cadad0bde4671987a (patch)
tree52ce6641f8136fc0ae972d82f37db44ace96f2ae /redis/client.py
parent24b0f17a80b51ed2d7f7e1ca139428f27bf642c9 (diff)
downloadredis-py-53c928d44acd3d1fbcb3896cadad0bde4671987a.tar.gz
WATCH and UNWATCH have been broken since 2.4 because of connection pooling. This fix moves WATCH and UNWATCH to the Pipeline class, where they belong and tests to prove they work.
Diffstat (limited to 'redis/client.py')
-rw-r--r--redis/client.py92
1 files changed, 85 insertions, 7 deletions
diff --git a/redis/client.py b/redis/client.py
index 93ff9a9..8b8e7a8 100644
--- a/redis/client.py
+++ b/redis/client.py
@@ -503,13 +503,13 @@ class Redis(object):
"""
Watches the values at keys ``names``, or None if the key doesn't exist
"""
- return self.execute_command('WATCH', *names)
+ warnings.warn(DeprecationWarning('Call WATCH from a Pipeline object'))
def unwatch(self):
"""
Unwatches the value at key ``name``, or None of the key doesn't exist
"""
- return self.execute_command('UNWATCH')
+ warnings.warn(DeprecationWarning('Call UNWATCH from a Pipeline object'))
#### LIST COMMANDS ####
def blpop(self, keys, timeout=0):
@@ -1177,17 +1177,61 @@ class Pipeline(Redis):
def __init__(self, connection_pool, response_callbacks, transaction,
shard_hint):
self.connection_pool = connection_pool
+ self.connection = None
self.response_callbacks = response_callbacks
self.transaction = transaction
self.shard_hint = shard_hint
+
+ self._real_exec = self.default_execute_command
+ self._pipe_exec = self.pipeline_execute_command
self.reset()
+ def _get_watch(self):
+ return self._watching
+
+ def _set_watch(self, value):
+ self._watching = value
+ self.execute_command = value and self._real_exec or self._pipe_exec
+
+ watching = property(_get_watch, _set_watch)
+
def reset(self):
self.command_stack = []
+ self.watching = False
if self.transaction:
self.execute_command('MULTI')
+ if self.connection:
+ self.connection_pool.release(self.connection)
+ self.connection = None
- def execute_command(self, *args, **options):
+ def multi(self):
+ """
+ Start a transactional block of the pipeline after WATCH commands
+ are issued. End the transactional block with `execute`.
+ """
+ self.execute_command = self._pipe_exec
+
+ def default_execute_command(self, *args, **options):
+ """
+ Execute a command, but don't auto-retry on a ConnectionError. Used
+ when issuing WATCH or subsequent commands retrieving their values
+ but before MULTI is called.
+ """
+ command_name = args[0]
+ conn = self.connection
+ # if this is the first call, we need a connection
+ if not conn:
+ conn = self.connection_pool.get_connection(command_name,
+ self.shard_hint)
+ self.connection = conn
+ try:
+ conn.send_command(*args)
+ return self.parse_response(conn, command_name, **options)
+ except ConnectionError:
+ self.reset()
+ raise
+
+ def pipeline_execute_command(self, *args, **options):
"""
Stage a command to be executed when execute() is next called
@@ -1221,7 +1265,7 @@ class Pipeline(Redis):
if len(response) != len(commands):
raise ResponseError("Wrong number of response items from "
- "pipeline execution")
+ "pipeline execution")
# We have to run response callbacks manually
data = []
for r, cmd in izip(response, commands):
@@ -1249,16 +1293,50 @@ class Pipeline(Redis):
else:
execute = self._execute_pipeline
stack = self.command_stack
- self.reset()
- conn = self.connection_pool.get_connection('MULTI', self.shard_hint)
+ conn = self.connection or \
+ self.connection_pool.get_connection('MULTI', self.shard_hint)
try:
return execute(conn, stack)
except ConnectionError:
conn.disconnect()
+ # if we watching a variable, the watch is no longer valid since
+ # this conncetion has died.
+ if self.watching:
+ raise WatchError("Watched variable changed.")
return execute(conn, stack)
finally:
- self.connection_pool.release(conn)
+ self.reset()
+
+ def watch(self, *names):
+ """
+ Watches the values at keys ``names``
+ """
+ if not self.transaction:
+ raise RedisError("Can only WATCH when using transactions")
+ # if more than 'MULTI' is in the command_stack, we can't WATCH anymore
+ if self.watching and len(self.command_stack) > 1:
+ raise RedisError("Can only WATCH before issuing pipeline commands")
+ self.watching = True
+ return self.execute_command('WATCH', *names)
+ def unwatch(self):
+ """
+ Unwatches all previously specified keys
+ """
+ if not self.transaction:
+ raise RedisError("Can only UNWATCH when using transactions")
+ # if more than 'MULTI' is in the command_stack, we can't UNWATCH anymore
+ if self.watching:
+ if len(self.command_stack) > 1:
+ raise RedisError("Can only UNWATCH before issuing "
+ "pipeline commands")
+ response = self.execute_command('UNWATCH')
+ else:
+ response = True
+ # it's safe to reset() here because we are no longer bound to a
+ # single connection and we're sure the command stack is empty.
+ self.reset()
+ return response
class LockError(RedisError):
"Errors thrown from the Lock"