summaryrefslogtreecommitdiff
path: root/redis/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'redis/client.py')
-rw-r--r--redis/client.py92
1 files changed, 85 insertions, 7 deletions
diff --git a/redis/client.py b/redis/client.py
index 93ff9a9..8b8e7a8 100644
--- a/redis/client.py
+++ b/redis/client.py
@@ -503,13 +503,13 @@ class Redis(object):
"""
Watches the values at keys ``names``, or None if the key doesn't exist
"""
- return self.execute_command('WATCH', *names)
+ warnings.warn(DeprecationWarning('Call WATCH from a Pipeline object'))
def unwatch(self):
"""
Unwatches the value at key ``name``, or None of the key doesn't exist
"""
- return self.execute_command('UNWATCH')
+ warnings.warn(DeprecationWarning('Call UNWATCH from a Pipeline object'))
#### LIST COMMANDS ####
def blpop(self, keys, timeout=0):
@@ -1177,17 +1177,61 @@ class Pipeline(Redis):
def __init__(self, connection_pool, response_callbacks, transaction,
shard_hint):
self.connection_pool = connection_pool
+ self.connection = None
self.response_callbacks = response_callbacks
self.transaction = transaction
self.shard_hint = shard_hint
+
+ self._real_exec = self.default_execute_command
+ self._pipe_exec = self.pipeline_execute_command
self.reset()
+ def _get_watch(self):
+ return self._watching
+
+ def _set_watch(self, value):
+ self._watching = value
+ self.execute_command = value and self._real_exec or self._pipe_exec
+
+ watching = property(_get_watch, _set_watch)
+
def reset(self):
self.command_stack = []
+ self.watching = False
if self.transaction:
self.execute_command('MULTI')
+ if self.connection:
+ self.connection_pool.release(self.connection)
+ self.connection = None
- def execute_command(self, *args, **options):
+ def multi(self):
+ """
+ Start a transactional block of the pipeline after WATCH commands
+ are issued. End the transactional block with `execute`.
+ """
+ self.execute_command = self._pipe_exec
+
+ def default_execute_command(self, *args, **options):
+ """
+ Execute a command, but don't auto-retry on a ConnectionError. Used
+ when issuing WATCH or subsequent commands retrieving their values
+ but before MULTI is called.
+ """
+ command_name = args[0]
+ conn = self.connection
+ # if this is the first call, we need a connection
+ if not conn:
+ conn = self.connection_pool.get_connection(command_name,
+ self.shard_hint)
+ self.connection = conn
+ try:
+ conn.send_command(*args)
+ return self.parse_response(conn, command_name, **options)
+ except ConnectionError:
+ self.reset()
+ raise
+
+ def pipeline_execute_command(self, *args, **options):
"""
Stage a command to be executed when execute() is next called
@@ -1221,7 +1265,7 @@ class Pipeline(Redis):
if len(response) != len(commands):
raise ResponseError("Wrong number of response items from "
- "pipeline execution")
+ "pipeline execution")
# We have to run response callbacks manually
data = []
for r, cmd in izip(response, commands):
@@ -1249,16 +1293,50 @@ class Pipeline(Redis):
else:
execute = self._execute_pipeline
stack = self.command_stack
- self.reset()
- conn = self.connection_pool.get_connection('MULTI', self.shard_hint)
+ conn = self.connection or \
+ self.connection_pool.get_connection('MULTI', self.shard_hint)
try:
return execute(conn, stack)
except ConnectionError:
conn.disconnect()
+ # if we watching a variable, the watch is no longer valid since
+ # this conncetion has died.
+ if self.watching:
+ raise WatchError("Watched variable changed.")
return execute(conn, stack)
finally:
- self.connection_pool.release(conn)
+ self.reset()
+
+ def watch(self, *names):
+ """
+ Watches the values at keys ``names``
+ """
+ if not self.transaction:
+ raise RedisError("Can only WATCH when using transactions")
+ # if more than 'MULTI' is in the command_stack, we can't WATCH anymore
+ if self.watching and len(self.command_stack) > 1:
+ raise RedisError("Can only WATCH before issuing pipeline commands")
+ self.watching = True
+ return self.execute_command('WATCH', *names)
+ def unwatch(self):
+ """
+ Unwatches all previously specified keys
+ """
+ if not self.transaction:
+ raise RedisError("Can only UNWATCH when using transactions")
+ # if more than 'MULTI' is in the command_stack, we can't UNWATCH anymore
+ if self.watching:
+ if len(self.command_stack) > 1:
+ raise RedisError("Can only UNWATCH before issuing "
+ "pipeline commands")
+ response = self.execute_command('UNWATCH')
+ else:
+ response = True
+ # it's safe to reset() here because we are no longer bound to a
+ # single connection and we're sure the command stack is empty.
+ self.reset()
+ return response
class LockError(RedisError):
"Errors thrown from the Lock"