diff options
Diffstat (limited to 'redis/client.py')
-rw-r--r-- | redis/client.py | 92 |
1 files changed, 85 insertions, 7 deletions
diff --git a/redis/client.py b/redis/client.py index 93ff9a9..8b8e7a8 100644 --- a/redis/client.py +++ b/redis/client.py @@ -503,13 +503,13 @@ class Redis(object): """ Watches the values at keys ``names``, or None if the key doesn't exist """ - return self.execute_command('WATCH', *names) + warnings.warn(DeprecationWarning('Call WATCH from a Pipeline object')) def unwatch(self): """ Unwatches the value at key ``name``, or None of the key doesn't exist """ - return self.execute_command('UNWATCH') + warnings.warn(DeprecationWarning('Call UNWATCH from a Pipeline object')) #### LIST COMMANDS #### def blpop(self, keys, timeout=0): @@ -1177,17 +1177,61 @@ class Pipeline(Redis): def __init__(self, connection_pool, response_callbacks, transaction, shard_hint): self.connection_pool = connection_pool + self.connection = None self.response_callbacks = response_callbacks self.transaction = transaction self.shard_hint = shard_hint + + self._real_exec = self.default_execute_command + self._pipe_exec = self.pipeline_execute_command self.reset() + def _get_watch(self): + return self._watching + + def _set_watch(self, value): + self._watching = value + self.execute_command = value and self._real_exec or self._pipe_exec + + watching = property(_get_watch, _set_watch) + def reset(self): self.command_stack = [] + self.watching = False if self.transaction: self.execute_command('MULTI') + if self.connection: + self.connection_pool.release(self.connection) + self.connection = None - def execute_command(self, *args, **options): + def multi(self): + """ + Start a transactional block of the pipeline after WATCH commands + are issued. End the transactional block with `execute`. + """ + self.execute_command = self._pipe_exec + + def default_execute_command(self, *args, **options): + """ + Execute a command, but don't auto-retry on a ConnectionError. Used + when issuing WATCH or subsequent commands retrieving their values + but before MULTI is called. + """ + command_name = args[0] + conn = self.connection + # if this is the first call, we need a connection + if not conn: + conn = self.connection_pool.get_connection(command_name, + self.shard_hint) + self.connection = conn + try: + conn.send_command(*args) + return self.parse_response(conn, command_name, **options) + except ConnectionError: + self.reset() + raise + + def pipeline_execute_command(self, *args, **options): """ Stage a command to be executed when execute() is next called @@ -1221,7 +1265,7 @@ class Pipeline(Redis): if len(response) != len(commands): raise ResponseError("Wrong number of response items from " - "pipeline execution") + "pipeline execution") # We have to run response callbacks manually data = [] for r, cmd in izip(response, commands): @@ -1249,16 +1293,50 @@ class Pipeline(Redis): else: execute = self._execute_pipeline stack = self.command_stack - self.reset() - conn = self.connection_pool.get_connection('MULTI', self.shard_hint) + conn = self.connection or \ + self.connection_pool.get_connection('MULTI', self.shard_hint) try: return execute(conn, stack) except ConnectionError: conn.disconnect() + # if we watching a variable, the watch is no longer valid since + # this conncetion has died. + if self.watching: + raise WatchError("Watched variable changed.") return execute(conn, stack) finally: - self.connection_pool.release(conn) + self.reset() + + def watch(self, *names): + """ + Watches the values at keys ``names`` + """ + if not self.transaction: + raise RedisError("Can only WATCH when using transactions") + # if more than 'MULTI' is in the command_stack, we can't WATCH anymore + if self.watching and len(self.command_stack) > 1: + raise RedisError("Can only WATCH before issuing pipeline commands") + self.watching = True + return self.execute_command('WATCH', *names) + def unwatch(self): + """ + Unwatches all previously specified keys + """ + if not self.transaction: + raise RedisError("Can only UNWATCH when using transactions") + # if more than 'MULTI' is in the command_stack, we can't UNWATCH anymore + if self.watching: + if len(self.command_stack) > 1: + raise RedisError("Can only UNWATCH before issuing " + "pipeline commands") + response = self.execute_command('UNWATCH') + else: + response = True + # it's safe to reset() here because we are no longer bound to a + # single connection and we're sure the command stack is empty. + self.reset() + return response class LockError(RedisError): "Errors thrown from the Lock" |