summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRandall Leeds <randall.leeds@gmail.com>2011-07-11 20:02:56 -0700
committerRandall Leeds <randall.leeds@gmail.com>2011-07-11 20:02:56 -0700
commit7a40f7da6e1bf812e94b94d1863321cc792015dd (patch)
tree416286b3cfc7ef5c59b77d4570cf0671c59b3fba
parent9e918fdb481aef700561592121322a3a82072f72 (diff)
parent4fb889deed3dd1921b8e39faaf07a67dd518e84d (diff)
downloadredis-py-7a40f7da6e1bf812e94b94d1863321cc792015dd.tar.gz
Merge remote-tracking branch 'wolever/watch' into watch_fixes
-rw-r--r--redis/client.py57
-rw-r--r--tests/pipeline.py25
2 files changed, 48 insertions, 34 deletions
diff --git a/redis/client.py b/redis/client.py
index a58dc9e..b68c1df 100644
--- a/redis/client.py
+++ b/redis/client.py
@@ -1210,20 +1210,9 @@ class Pipeline(Redis):
self.transaction = transaction
self.shard_hint = shard_hint
- self._real_exec = self.default_execute_command
- self._pipe_exec = self.pipeline_execute_command
- self._watching = False
+ self.watching = False
self.reset()
- def _get_watch(self):
- return self._watching
-
- def _set_watch(self, value):
- self._watching = value
- self.execute_command = value and self._real_exec or self._pipe_exec
-
- watching = property(_get_watch, _set_watch)
-
def reset(self):
self.command_stack = []
# make sure to reset the connection state in the event that we were
@@ -1231,7 +1220,7 @@ class Pipeline(Redis):
if self.watching and self.connection:
try:
# call this manually since our unwatch or
- # default_execute_command methods can call reset()
+ # immediate_execute_command methods can call reset()
self.connection.send_command('UNWATCH')
self.connection.read_response()
except ConnectionError:
@@ -1239,8 +1228,7 @@ class Pipeline(Redis):
self.connection.disconnect()
# clean up the other instance attributes
self.watching = False
- if self.transaction:
- self.execute_command('MULTI')
+ self.explicit_transaction = False
# we can safely return the connection to the pool here since we're
# sure we're no longer WATCHing anything
if self.connection:
@@ -1252,13 +1240,23 @@ class Pipeline(Redis):
Start a transactional block of the pipeline after WATCH commands
are issued. End the transactional block with `execute`.
"""
- self.execute_command = self._pipe_exec
+ if self.explicit_transaction:
+ raise RedisError('Cannot issue nested calls to MULTI')
+ if self.command_stack:
+ raise RedisError('Commands without an initial WATCH have already '
+ 'been issued')
+ self.explicit_transaction = True
- def default_execute_command(self, *args, **options):
+ def execute_command(self, *args, **kwargs):
+ if self.watching and not self.explicit_transaction:
+ return self.immediate_execute_command(*args, **kwargs)
+ return self.pipeline_execute_command(*args, **kwargs)
+
+ def immediate_execute_command(self, *args, **options):
"""
- Execute a command, but don't auto-retry on a ConnectionError. Used
- when issuing WATCH or subsequent commands retrieving their values
- but before MULTI is called.
+ Execute a command immediately, but don't auto-retry on a
+ ConnectionError. Used when issuing WATCH or subsequent commands
+ retrieving their values but before MULTI is called.
"""
command_name = args[0]
conn = self.connection
@@ -1330,12 +1328,12 @@ class Pipeline(Redis):
def execute(self):
"Execute all the commands in the current pipeline"
- if self.transaction:
- self.execute_command('EXEC')
+ stack = self.command_stack
+ if self.transaction or self.explicit_transaction:
+ stack = [(('MULTI' ,), {})] + stack + [(('EXEC', ), {})]
execute = self._execute_transaction
else:
execute = self._execute_pipeline
- stack = self.command_stack
conn = self.connection or \
self.connection_pool.get_connection('MULTI', self.shard_hint)
try:
@@ -1354,11 +1352,8 @@ class Pipeline(Redis):
"""
Watches the values at keys ``names``
"""
- if not self.transaction:
- raise RedisError("Can only WATCH when using transactions")
- # if more than 'MULTI' is in the command_stack, we can't WATCH anymore
- if self.watching and len(self.command_stack) > 1:
- raise RedisError("Can only WATCH before issuing pipeline commands")
+ if self.explicit_transaction:
+ raise RedisError('Cannot issue a WATCH after a MULTI')
self.watching = True
return self.execute_command('WATCH', *names)
@@ -1366,13 +1361,7 @@ class Pipeline(Redis):
"""
Unwatches all previously specified keys
"""
- if not self.transaction:
- raise RedisError("Can only UNWATCH when using transactions")
- # if more than 'MULTI' is in the command_stack, we can't UNWATCH anymore
if self.watching:
- if len(self.command_stack) > 1:
- raise RedisError("Can only UNWATCH before issuing "
- "pipeline commands")
response = self.execute_command('UNWATCH')
else:
response = True
diff --git a/tests/pipeline.py b/tests/pipeline.py
index ee3b3c5..82ca4bc 100644
--- a/tests/pipeline.py
+++ b/tests/pipeline.py
@@ -32,6 +32,31 @@ class PipelineTestCase(unittest.TestCase):
self.assertEquals(self.client['b'], 'b1')
self.assertEquals(self.client['c'], 'c1')
+ def test_pipeline_no_transaction_watch(self):
+ self.client.set('a', 0)
+
+ pipe = self.client.pipeline(transaction=False)
+ pipe.watch('a')
+ a = pipe.get('a')
+
+ pipe.multi()
+ pipe.set('a', int(a) + 1)
+ result = pipe.execute()
+ self.assertEquals(result, [True])
+
+ def test_pipeline_no_transaction_watch_failure(self):
+ self.client.set('a', 0)
+
+ pipe = self.client.pipeline(transaction=False)
+ pipe.watch('a')
+ a = pipe.get('a')
+
+ self.client.set('a', 'bad')
+
+ pipe.multi()
+ pipe.set('a', int(a) + 1)
+ self.assertRaises(redis.WatchError, pipe.execute)
+
def test_invalid_command_in_pipeline(self):
# all commands but the invalid one should be excuted correctly
self.client['c'] = 'a'