summaryrefslogtreecommitdiff
path: root/redis/client.py
diff options
context:
space:
mode:
authorRandall Leeds <randall.leeds@gmail.com>2011-07-07 01:26:47 -0700
committerRandall Leeds <randall.leeds@gmail.com>2011-07-07 01:29:39 -0700
commitfa3fac9e6e950f40dbd1e6b273dcedcc1c3cd557 (patch)
tree65abd73bc31e77bd89c3c929f745d2047430de52 /redis/client.py
parent194d29687dbb110d619b30274945832b2cab14d1 (diff)
downloadredis-py-fa3fac9e6e950f40dbd1e6b273dcedcc1c3cd557.tar.gz
fix #149 - hold connection for duration of WATCH
Diffstat (limited to 'redis/client.py')
-rw-r--r--redis/client.py32
1 files changed, 23 insertions, 9 deletions
diff --git a/redis/client.py b/redis/client.py
index 93ff9a9..9af1e58 100644
--- a/redis/client.py
+++ b/redis/client.py
@@ -181,6 +181,7 @@ class Redis(object):
})
connection_pool = ConnectionPool(**kwargs)
self.connection_pool = connection_pool
+ self.connection = None
self.response_callbacks = self.__class__.RESPONSE_CALLBACKS.copy()
@@ -196,11 +197,14 @@ class Redis(object):
atomic, pipelines are useful for reducing the back-and-forth overhead
between the client and server.
"""
+ connection = self.connection
+ self.connection = None
return Pipeline(
self.connection_pool,
self.response_callbacks,
transaction,
- shard_hint)
+ shard_hint,
+ connection)
def lock(self, name, timeout=None, sleep=0.1):
"""
@@ -222,14 +226,15 @@ class Redis(object):
subscribe to channels and listen for messages that get published to
them.
"""
- return PubSub(self.connection_pool, shard_hint)
+ return PubSub(self.connection_pool, shard_hint, self.connection)
#### COMMAND EXECUTION AND PROTOCOL PARSING ####
def execute_command(self, *args, **options):
"Execute a command and return a parsed response"
pool = self.connection_pool
command_name = args[0]
- connection = pool.get_connection(command_name, **options)
+ connection = self.connection or \
+ pool.get_connection(command_name, **options)
try:
connection.send_command(*args)
return self.parse_response(connection, command_name, **options)
@@ -238,7 +243,8 @@ class Redis(object):
connection.send_command(*args)
return self.parse_response(connection, command_name, **options)
finally:
- pool.release(connection)
+ if not self.connection:
+ pool.release(connection)
def parse_response(self, connection, command_name, **options):
"Parses a response from the Redis server"
@@ -503,13 +509,18 @@ class Redis(object):
"""
Watches the values at keys ``names``, or None if the key doesn't exist
"""
+ self.connection = self.connection or \
+ self.connection_pool.get_connection('WATCH', *names)
return self.execute_command('WATCH', *names)
def unwatch(self):
"""
Unwatches the value at key ``name``, or None of the key doesn't exist
"""
- return self.execute_command('UNWATCH')
+ try:
+ return self.execute_command('UNWATCH')
+ finally:
+ self.connection = None
#### LIST COMMANDS ####
def blpop(self, keys, timeout=0):
@@ -1039,10 +1050,10 @@ class PubSub(object):
until a message arrives on one of the subscribed channels. That message
will be returned and it's safe to start listening again.
"""
- def __init__(self, connection_pool, shard_hint=None):
+ def __init__(self, connection_pool, shard_hint=None, connection=None):
self.connection_pool = connection_pool
self.shard_hint = shard_hint
- self.connection = None
+ self.connection = connection
self.channels = set()
self.patterns = set()
self.subscription_count = 0
@@ -1175,8 +1186,9 @@ class Pipeline(Redis):
on a key of a different datatype.
"""
def __init__(self, connection_pool, response_callbacks, transaction,
- shard_hint):
+ shard_hint, connection=None):
self.connection_pool = connection_pool
+ self.connection = connection
self.response_callbacks = response_callbacks
self.transaction = transaction
self.shard_hint = shard_hint
@@ -1250,13 +1262,15 @@ class Pipeline(Redis):
execute = self._execute_pipeline
stack = self.command_stack
self.reset()
- conn = self.connection_pool.get_connection('MULTI', self.shard_hint)
+ conn = self.connection or \
+ self.connection_pool.get_connection('MULTI', self.shard_hint)
try:
return execute(conn, stack)
except ConnectionError:
conn.disconnect()
return execute(conn, stack)
finally:
+ self.connection = None
self.connection_pool.release(conn)