diff options
author | Randall Leeds <randall.leeds@gmail.com> | 2011-07-11 20:02:56 -0700 |
---|---|---|
committer | Randall Leeds <randall.leeds@gmail.com> | 2011-07-11 20:02:56 -0700 |
commit | 7a40f7da6e1bf812e94b94d1863321cc792015dd (patch) | |
tree | 416286b3cfc7ef5c59b77d4570cf0671c59b3fba | |
parent | 9e918fdb481aef700561592121322a3a82072f72 (diff) | |
parent | 4fb889deed3dd1921b8e39faaf07a67dd518e84d (diff) | |
download | redis-py-7a40f7da6e1bf812e94b94d1863321cc792015dd.tar.gz |
Merge remote-tracking branch 'wolever/watch' into watch_fixes
-rw-r--r-- | redis/client.py | 57 | ||||
-rw-r--r-- | tests/pipeline.py | 25 |
2 files changed, 48 insertions, 34 deletions
diff --git a/redis/client.py b/redis/client.py index a58dc9e..b68c1df 100644 --- a/redis/client.py +++ b/redis/client.py @@ -1210,20 +1210,9 @@ class Pipeline(Redis): self.transaction = transaction self.shard_hint = shard_hint - self._real_exec = self.default_execute_command - self._pipe_exec = self.pipeline_execute_command - self._watching = False + self.watching = False self.reset() - def _get_watch(self): - return self._watching - - def _set_watch(self, value): - self._watching = value - self.execute_command = value and self._real_exec or self._pipe_exec - - watching = property(_get_watch, _set_watch) - def reset(self): self.command_stack = [] # make sure to reset the connection state in the event that we were @@ -1231,7 +1220,7 @@ class Pipeline(Redis): if self.watching and self.connection: try: # call this manually since our unwatch or - # default_execute_command methods can call reset() + # immediate_execute_command methods can call reset() self.connection.send_command('UNWATCH') self.connection.read_response() except ConnectionError: @@ -1239,8 +1228,7 @@ class Pipeline(Redis): self.connection.disconnect() # clean up the other instance attributes self.watching = False - if self.transaction: - self.execute_command('MULTI') + self.explicit_transaction = False # we can safely return the connection to the pool here since we're # sure we're no longer WATCHing anything if self.connection: @@ -1252,13 +1240,23 @@ class Pipeline(Redis): Start a transactional block of the pipeline after WATCH commands are issued. End the transactional block with `execute`. """ - self.execute_command = self._pipe_exec + if self.explicit_transaction: + raise RedisError('Cannot issue nested calls to MULTI') + if self.command_stack: + raise RedisError('Commands without an initial WATCH have already ' + 'been issued') + self.explicit_transaction = True - def default_execute_command(self, *args, **options): + def execute_command(self, *args, **kwargs): + if self.watching and not self.explicit_transaction: + return self.immediate_execute_command(*args, **kwargs) + return self.pipeline_execute_command(*args, **kwargs) + + def immediate_execute_command(self, *args, **options): """ - Execute a command, but don't auto-retry on a ConnectionError. Used - when issuing WATCH or subsequent commands retrieving their values - but before MULTI is called. + Execute a command immediately, but don't auto-retry on a + ConnectionError. Used when issuing WATCH or subsequent commands + retrieving their values but before MULTI is called. """ command_name = args[0] conn = self.connection @@ -1330,12 +1328,12 @@ class Pipeline(Redis): def execute(self): "Execute all the commands in the current pipeline" - if self.transaction: - self.execute_command('EXEC') + stack = self.command_stack + if self.transaction or self.explicit_transaction: + stack = [(('MULTI' ,), {})] + stack + [(('EXEC', ), {})] execute = self._execute_transaction else: execute = self._execute_pipeline - stack = self.command_stack conn = self.connection or \ self.connection_pool.get_connection('MULTI', self.shard_hint) try: @@ -1354,11 +1352,8 @@ class Pipeline(Redis): """ Watches the values at keys ``names`` """ - if not self.transaction: - raise RedisError("Can only WATCH when using transactions") - # if more than 'MULTI' is in the command_stack, we can't WATCH anymore - if self.watching and len(self.command_stack) > 1: - raise RedisError("Can only WATCH before issuing pipeline commands") + if self.explicit_transaction: + raise RedisError('Cannot issue a WATCH after a MULTI') self.watching = True return self.execute_command('WATCH', *names) @@ -1366,13 +1361,7 @@ class Pipeline(Redis): """ Unwatches all previously specified keys """ - if not self.transaction: - raise RedisError("Can only UNWATCH when using transactions") - # if more than 'MULTI' is in the command_stack, we can't UNWATCH anymore if self.watching: - if len(self.command_stack) > 1: - raise RedisError("Can only UNWATCH before issuing " - "pipeline commands") response = self.execute_command('UNWATCH') else: response = True diff --git a/tests/pipeline.py b/tests/pipeline.py index ee3b3c5..82ca4bc 100644 --- a/tests/pipeline.py +++ b/tests/pipeline.py @@ -32,6 +32,31 @@ class PipelineTestCase(unittest.TestCase): self.assertEquals(self.client['b'], 'b1') self.assertEquals(self.client['c'], 'c1') + def test_pipeline_no_transaction_watch(self): + self.client.set('a', 0) + + pipe = self.client.pipeline(transaction=False) + pipe.watch('a') + a = pipe.get('a') + + pipe.multi() + pipe.set('a', int(a) + 1) + result = pipe.execute() + self.assertEquals(result, [True]) + + def test_pipeline_no_transaction_watch_failure(self): + self.client.set('a', 0) + + pipe = self.client.pipeline(transaction=False) + pipe.watch('a') + a = pipe.get('a') + + self.client.set('a', 'bad') + + pipe.multi() + pipe.set('a', int(a) + 1) + self.assertRaises(redis.WatchError, pipe.execute) + def test_invalid_command_in_pipeline(self): # all commands but the invalid one should be excuted correctly self.client['c'] = 'a' |