diff options
author | Randall Leeds <randall.leeds@gmail.com> | 2011-07-07 01:26:47 -0700 |
---|---|---|
committer | Randall Leeds <randall.leeds@gmail.com> | 2011-07-07 01:29:39 -0700 |
commit | fa3fac9e6e950f40dbd1e6b273dcedcc1c3cd557 (patch) | |
tree | 65abd73bc31e77bd89c3c929f745d2047430de52 /redis/client.py | |
parent | 194d29687dbb110d619b30274945832b2cab14d1 (diff) | |
download | redis-py-fa3fac9e6e950f40dbd1e6b273dcedcc1c3cd557.tar.gz |
fix #149 - hold connection for duration of WATCH
Diffstat (limited to 'redis/client.py')
-rw-r--r-- | redis/client.py | 32 |
1 files changed, 23 insertions, 9 deletions
diff --git a/redis/client.py b/redis/client.py index 93ff9a9..9af1e58 100644 --- a/redis/client.py +++ b/redis/client.py @@ -181,6 +181,7 @@ class Redis(object): }) connection_pool = ConnectionPool(**kwargs) self.connection_pool = connection_pool + self.connection = None self.response_callbacks = self.__class__.RESPONSE_CALLBACKS.copy() @@ -196,11 +197,14 @@ class Redis(object): atomic, pipelines are useful for reducing the back-and-forth overhead between the client and server. """ + connection = self.connection + self.connection = None return Pipeline( self.connection_pool, self.response_callbacks, transaction, - shard_hint) + shard_hint, + connection) def lock(self, name, timeout=None, sleep=0.1): """ @@ -222,14 +226,15 @@ class Redis(object): subscribe to channels and listen for messages that get published to them. """ - return PubSub(self.connection_pool, shard_hint) + return PubSub(self.connection_pool, shard_hint, self.connection) #### COMMAND EXECUTION AND PROTOCOL PARSING #### def execute_command(self, *args, **options): "Execute a command and return a parsed response" pool = self.connection_pool command_name = args[0] - connection = pool.get_connection(command_name, **options) + connection = self.connection or \ + pool.get_connection(command_name, **options) try: connection.send_command(*args) return self.parse_response(connection, command_name, **options) @@ -238,7 +243,8 @@ class Redis(object): connection.send_command(*args) return self.parse_response(connection, command_name, **options) finally: - pool.release(connection) + if not self.connection: + pool.release(connection) def parse_response(self, connection, command_name, **options): "Parses a response from the Redis server" @@ -503,13 +509,18 @@ class Redis(object): """ Watches the values at keys ``names``, or None if the key doesn't exist """ + self.connection = self.connection or \ + self.connection_pool.get_connection('WATCH', *names) return self.execute_command('WATCH', *names) def unwatch(self): """ Unwatches the value at key ``name``, or None of the key doesn't exist """ - return self.execute_command('UNWATCH') + try: + return self.execute_command('UNWATCH') + finally: + self.connection = None #### LIST COMMANDS #### def blpop(self, keys, timeout=0): @@ -1039,10 +1050,10 @@ class PubSub(object): until a message arrives on one of the subscribed channels. That message will be returned and it's safe to start listening again. """ - def __init__(self, connection_pool, shard_hint=None): + def __init__(self, connection_pool, shard_hint=None, connection=None): self.connection_pool = connection_pool self.shard_hint = shard_hint - self.connection = None + self.connection = connection self.channels = set() self.patterns = set() self.subscription_count = 0 @@ -1175,8 +1186,9 @@ class Pipeline(Redis): on a key of a different datatype. """ def __init__(self, connection_pool, response_callbacks, transaction, - shard_hint): + shard_hint, connection=None): self.connection_pool = connection_pool + self.connection = connection self.response_callbacks = response_callbacks self.transaction = transaction self.shard_hint = shard_hint @@ -1250,13 +1262,15 @@ class Pipeline(Redis): execute = self._execute_pipeline stack = self.command_stack self.reset() - conn = self.connection_pool.get_connection('MULTI', self.shard_hint) + conn = self.connection or \ + self.connection_pool.get_connection('MULTI', self.shard_hint) try: return execute(conn, stack) except ConnectionError: conn.disconnect() return execute(conn, stack) finally: + self.connection = None self.connection_pool.release(conn) |