diff options
author | andy <andy@whiskeymedia.com> | 2011-07-17 15:15:15 -0700 |
---|---|---|
committer | andy <andy@whiskeymedia.com> | 2011-07-17 15:15:15 -0700 |
commit | 2e185683e310513d4efdcf9ec212115383f03aff (patch) | |
tree | 2b23bb4d72d3c45d7e3a07065d5e8cfdb7583f3c | |
parent | 2d428eca210154d07ab4260fdb1cccf14954295e (diff) | |
download | redis-py-2e185683e310513d4efdcf9ec212115383f03aff.tar.gz |
Pipelines can now be used as Context Managers. Thanks David Wolever. Fixes #160
-rw-r--r-- | CHANGES | 9 | ||||
-rw-r--r-- | README.md | 11 | ||||
-rw-r--r-- | redis/client.py | 6 | ||||
-rw-r--r-- | tests/pipeline.py | 152 |
4 files changed, 94 insertions, 84 deletions
@@ -1,6 +1,9 @@ -* 2.4.7 (in developmen) - * Fixed a bug where some connections we're getting released back to the - connection pool during pipelines. +* 2.4.7 (in development) + * Fixed a bug where some connections were not getting released back to the + connection pool after pipeline execution. + * Pipelines can now be used as context managers. This is the preferred way + of use to ensure that connections get cleaned up properly. Thanks + David Wolever. * 2.4.6 * Variadic arguments for SADD, SREM, ZREN, HDEL, LPUSH, and RPUSH. Thanks Raphaƫl Vinot. @@ -198,11 +198,12 @@ could do something like this: ... # our best bet is to just retry. ... continue -Note that, because the `Pipeline` must bind to a single connection for the -duration of a `watch`, care must be taken to ensure that he connection is -returned to the connection pool by calling the `reset()` method. If the -`Pipeline` is used as a context manager (as in the example above) `reset()` -will be called automatically... But it can also be called manually, like this: +Note that, because the Pipeline must bind to a single connection for the +duration of a WATCH, care must be taken to ensure that he connection is +returned to the connection pool by calling the reset() method. If the +Pipeline is used as a context manager (as in the example above) reset() +will be called automatically. Of course you can do this the manual way as by +explicity calling reset(): >>> pipe = r.pipeline() >>> while 1: diff --git a/redis/client.py b/redis/client.py index 10e5060..9ab467f 100644 --- a/redis/client.py +++ b/redis/client.py @@ -1196,6 +1196,12 @@ class Pipeline(Redis): self.watching = False self.reset() + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.reset() + def reset(self): self.command_stack = [] # make sure to reset the connection state in the event that we were diff --git a/tests/pipeline.py b/tests/pipeline.py index 82ca4bc..b199f1c 100644 --- a/tests/pipeline.py +++ b/tests/pipeline.py @@ -10,112 +10,112 @@ class PipelineTestCase(unittest.TestCase): self.client.flushdb() def test_pipeline(self): - pipe = self.client.pipeline() - pipe.set('a', 'a1').get('a').zadd('z', z1=1).zadd('z', z2=4) - pipe.zincrby('z', 'z1').zrange('z', 0, 5, withscores=True) - self.assertEquals(pipe.execute(), - [ - True, - 'a1', - True, - True, - 2.0, - [('z1', 2.0), ('z2', 4)], - ] - ) + with self.client.pipeline() as pipe: + pipe.set('a', 'a1').get('a').zadd('z', z1=1).zadd('z', z2=4) + pipe.zincrby('z', 'z1').zrange('z', 0, 5, withscores=True) + self.assertEquals(pipe.execute(), + [ + True, + 'a1', + True, + True, + 2.0, + [('z1', 2.0), ('z2', 4)], + ] + ) def test_pipeline_no_transaction(self): - pipe = self.client.pipeline(transaction=False) - pipe.set('a', 'a1').set('b', 'b1').set('c', 'c1') - self.assertEquals(pipe.execute(), [True, True, True]) - self.assertEquals(self.client['a'], 'a1') - self.assertEquals(self.client['b'], 'b1') - self.assertEquals(self.client['c'], 'c1') + with self.client.pipeline(transaction=False) as pipe: + pipe.set('a', 'a1').set('b', 'b1').set('c', 'c1') + self.assertEquals(pipe.execute(), [True, True, True]) + self.assertEquals(self.client['a'], 'a1') + self.assertEquals(self.client['b'], 'b1') + self.assertEquals(self.client['c'], 'c1') def test_pipeline_no_transaction_watch(self): self.client.set('a', 0) - pipe = self.client.pipeline(transaction=False) - pipe.watch('a') - a = pipe.get('a') + with self.client.pipeline(transaction=False) as pipe: + pipe.watch('a') + a = pipe.get('a') - pipe.multi() - pipe.set('a', int(a) + 1) - result = pipe.execute() - self.assertEquals(result, [True]) + pipe.multi() + pipe.set('a', int(a) + 1) + result = pipe.execute() + self.assertEquals(result, [True]) def test_pipeline_no_transaction_watch_failure(self): self.client.set('a', 0) - pipe = self.client.pipeline(transaction=False) - pipe.watch('a') - a = pipe.get('a') + with self.client.pipeline(transaction=False) as pipe: + pipe.watch('a') + a = pipe.get('a') - self.client.set('a', 'bad') + self.client.set('a', 'bad') - pipe.multi() - pipe.set('a', int(a) + 1) - self.assertRaises(redis.WatchError, pipe.execute) + pipe.multi() + pipe.set('a', int(a) + 1) + self.assertRaises(redis.WatchError, pipe.execute) def test_invalid_command_in_pipeline(self): # all commands but the invalid one should be excuted correctly self.client['c'] = 'a' - pipe = self.client.pipeline() - pipe.set('a', 1).set('b', 2).lpush('c', 3).set('d', 4) - result = pipe.execute() - - self.assertEquals(result[0], True) - self.assertEquals(self.client['a'], '1') - self.assertEquals(result[1], True) - self.assertEquals(self.client['b'], '2') - # we can't lpush to a key that's a string value, so this should - # be a ResponseError exception - self.assert_(isinstance(result[2], redis.ResponseError)) - self.assertEquals(self.client['c'], 'a') - self.assertEquals(result[3], True) - self.assertEquals(self.client['d'], '4') - - # make sure the pipe was restored to a working state - self.assertEquals(pipe.set('z', 'zzz').execute(), [True]) - self.assertEquals(self.client['z'], 'zzz') + with self.client.pipeline() as pipe: + pipe.set('a', 1).set('b', 2).lpush('c', 3).set('d', 4) + result = pipe.execute() + + self.assertEquals(result[0], True) + self.assertEquals(self.client['a'], '1') + self.assertEquals(result[1], True) + self.assertEquals(self.client['b'], '2') + # we can't lpush to a key that's a string value, so this should + # be a ResponseError exception + self.assert_(isinstance(result[2], redis.ResponseError)) + self.assertEquals(self.client['c'], 'a') + self.assertEquals(result[3], True) + self.assertEquals(self.client['d'], '4') + + # make sure the pipe was restored to a working state + self.assertEquals(pipe.set('z', 'zzz').execute(), [True]) + self.assertEquals(self.client['z'], 'zzz') def test_watch_succeed(self): self.client.set('a', 1) self.client.set('b', 2) - pipe = self.client.pipeline() - pipe.watch('a', 'b') - self.assertEquals(pipe.watching, True) - a = pipe.get('a') - b = pipe.get('b') - self.assertEquals(a, '1') - self.assertEquals(b, '2') - pipe.multi() + with self.client.pipeline() as pipe: + pipe.watch('a', 'b') + self.assertEquals(pipe.watching, True) + a = pipe.get('a') + b = pipe.get('b') + self.assertEquals(a, '1') + self.assertEquals(b, '2') + pipe.multi() - pipe.set('c', 3) - self.assertEquals(pipe.execute(), [True]) - self.assertEquals(pipe.watching, False) + pipe.set('c', 3) + self.assertEquals(pipe.execute(), [True]) + self.assertEquals(pipe.watching, False) def test_watch_failure(self): self.client.set('a', 1) self.client.set('b', 2) - pipe = self.client.pipeline() - pipe.watch('a', 'b') - self.client.set('b', 3) - pipe.multi() - pipe.get('a') - self.assertRaises(redis.WatchError, pipe.execute) - self.assertEquals(pipe.watching, False) + with self.client.pipeline() as pipe: + pipe.watch('a', 'b') + self.client.set('b', 3) + pipe.multi() + pipe.get('a') + self.assertRaises(redis.WatchError, pipe.execute) + self.assertEquals(pipe.watching, False) def test_unwatch(self): self.client.set('a', 1) self.client.set('b', 2) - pipe = self.client.pipeline() - pipe.watch('a', 'b') - self.client.set('b', 3) - pipe.unwatch() - self.assertEquals(pipe.watching, False) - pipe.get('a') - self.assertEquals(pipe.execute(), ['1']) + with self.client.pipeline() as pipe: + pipe.watch('a', 'b') + self.client.set('b', 3) + pipe.unwatch() + self.assertEquals(pipe.watching, False) + pipe.get('a') + self.assertEquals(pipe.execute(), ['1']) |