summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRandall Leeds <randall@meebo-inc.com>2011-07-08 16:37:22 -0700
committerRandall Leeds <randall@meebo-inc.com>2011-07-08 16:37:22 -0700
commit82ca44f2572fdd04a63fa91158f4b6f6435527bc (patch)
tree2c426a830757c3e2e4e0dd05bf450fd55000af64
parentf8e73233da33356d3d16124593905b360c306e74 (diff)
parentafb33372ef8ce680b43980b352a8686bcc363bdb (diff)
downloadredis-py-82ca44f2572fdd04a63fa91158f4b6f6435527bc.tar.gz
Merge remote-tracking branch 'wolever/threadsafe_transactions' into watch_fixes
Conflicts: redis/client.py
-rw-r--r--redis/client.py154
1 files changed, 120 insertions, 34 deletions
diff --git a/redis/client.py b/redis/client.py
index 9af1e58..310f8f8 100644
--- a/redis/client.py
+++ b/redis/client.py
@@ -181,7 +181,6 @@ class Redis(object):
})
connection_pool = ConnectionPool(**kwargs)
self.connection_pool = connection_pool
- self.connection = None
self.response_callbacks = self.__class__.RESPONSE_CALLBACKS.copy()
@@ -197,14 +196,11 @@ class Redis(object):
atomic, pipelines are useful for reducing the back-and-forth overhead
between the client and server.
"""
- connection = self.connection
- self.connection = None
return Pipeline(
self.connection_pool,
self.response_callbacks,
transaction,
- shard_hint,
- connection)
+ shard_hint)
def lock(self, name, timeout=None, sleep=0.1):
"""
@@ -226,15 +222,34 @@ class Redis(object):
subscribe to channels and listen for messages that get published to
them.
"""
- return PubSub(self.connection_pool, shard_hint, self.connection)
+ return PubSub(self.connection_pool, shard_hint)
+
+ def connection(self):
+ """
+ Returns an instance of ``RedisSingleConnection`` which is bound to one
+ connection, allowing transactional commands to run in a thread-safe
+ manner.
+
+ Note that, unlike ``Redis``, ``RedisSingleConnection`` may raise a
+ ``ConnectionError`` which should be handled by the caller.
+
+ >>> with redis.connection() as cxn:
+ ... cxn.watch('foo')
+ ... old_foo = cxn.get('foo')
+ ... cxn.multi()
+ ... cxn.set('foo', old_foo + 1)
+ ... cxn.execute()
+ ...
+ >>>
+ """
+ return RedisConnection(connection_pool=self.connection_pool)
#### COMMAND EXECUTION AND PROTOCOL PARSING ####
def execute_command(self, *args, **options):
"Execute a command and return a parsed response"
pool = self.connection_pool
command_name = args[0]
- connection = self.connection or \
- pool.get_connection(command_name, **options)
+ connection = pool.get_connection(command_name, **options)
try:
connection.send_command(*args)
return self.parse_response(connection, command_name, **options)
@@ -243,8 +258,7 @@ class Redis(object):
connection.send_command(*args)
return self.parse_response(connection, command_name, **options)
finally:
- if not self.connection:
- pool.release(connection)
+ pool.release(connection)
def parse_response(self, connection, command_name, **options):
"Parses a response from the Redis server"
@@ -505,23 +519,6 @@ class Redis(object):
"Returns the type of key ``name``"
return self.execute_command('TYPE', name)
- def watch(self, *names):
- """
- Watches the values at keys ``names``, or None if the key doesn't exist
- """
- self.connection = self.connection or \
- self.connection_pool.get_connection('WATCH', *names)
- return self.execute_command('WATCH', *names)
-
- def unwatch(self):
- """
- Unwatches the value at key ``name``, or None of the key doesn't exist
- """
- try:
- return self.execute_command('UNWATCH')
- finally:
- self.connection = None
-
#### LIST COMMANDS ####
def blpop(self, keys, timeout=0):
"""
@@ -1050,10 +1047,10 @@ class PubSub(object):
until a message arrives on one of the subscribed channels. That message
will be returned and it's safe to start listening again.
"""
- def __init__(self, connection_pool, shard_hint=None, connection=None):
+ def __init__(self, connection_pool, shard_hint=None):
self.connection_pool = connection_pool
self.shard_hint = shard_hint
- self.connection = connection
+ self.connection = None
self.channels = set()
self.patterns = set()
self.subscription_count = 0
@@ -1186,9 +1183,8 @@ class Pipeline(Redis):
on a key of a different datatype.
"""
def __init__(self, connection_pool, response_callbacks, transaction,
- shard_hint, connection=None):
+ shard_hint):
self.connection_pool = connection_pool
- self.connection = connection
self.response_callbacks = response_callbacks
self.transaction = transaction
self.shard_hint = shard_hint
@@ -1262,18 +1258,108 @@ class Pipeline(Redis):
execute = self._execute_pipeline
stack = self.command_stack
self.reset()
- conn = self.connection or \
- self.connection_pool.get_connection('MULTI', self.shard_hint)
+ conn = self.connection_pool.get_connection('MULTI', self.shard_hint)
try:
return execute(conn, stack)
except ConnectionError:
conn.disconnect()
return execute(conn, stack)
finally:
- self.connection = None
self.connection_pool.release(conn)
+class RedisConnection(Redis):
+ """
+ A ``Redis`` which is bound to one single connection, allowing transactional
+ commands to be run in a thread-safe manner.
+
+ Note that, unlike ``Redis``, ``RedisConnection`` may raise a
+ ``ConnectionError`` which should be handled by the caller.
+
+ See also: ``Redis.connection()``.
+ """
+
+ connection = None
+
+ def get_connection(self, command_name, options):
+ if self.connection is None:
+ # XXX: how is the 'command_name' used?
+ self.connection = self.connection_pool.get_connection(command_name,
+ **options)
+ return self.connection
+
+ def execute_command(self, *args, **options):
+ """
+ Execute a command and return a parsed response.
+
+ Note: unlike Redis.execute_command, this may raise a
+ ``ConnectionError``, which should be handled by the calling code.
+ """
+
+ command_name = args[0]
+ connection = self.get_connection(command_name, options)
+ connection.send_command(*args)
+ return self.parse_response(connection, command_name, **options)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.close()
+
+ def close(self):
+ "If a connection exists, return it to the connection pool."
+ if self.connection is not None:
+ # XXX: some logic could be added here to only call ``discard`` if
+ # ``multi`` or ``watch`` were issued.
+ self.discard()
+ self.connection_pool.release(self.connection)
+ self.connection = None
+
+ def pipeline(self, transaction=True, shard_hint=None):
+ # XXX: I don't think pipelines make any sense on a connection which is
+ # "bound" like this. Am I wrong in this?
+ raise Exception("not done yet")
+
+ def watch(self, *names):
+ """
+ Watches the values at keys ``names``, or None if the key doesn't exist
+ """
+ return self.execute_command('WATCH', *names)
+
+ def unwatch(self):
+ """
+ Unwatches the all watched keys.
+ """
+ return self.execute_command('UNWATCH')
+
+ def multi(self):
+ """
+ Marks the start of a transaction block.
+
+ All further commands will return None until ``execute`` is called.
+ """
+ self.execute_command('MULTI')
+
+ def execute(self):
+ """
+ Executes all commands which have been executed since the last ``multi``.
+
+ Returns a list of each command's result.
+ """
+ self.execute_command('EXEC')
+ # XXX: Need to collect the results and return them
+ # XXX: update the docs to note that the command is 'execute' not 'exec'.
+ raise Exception("not done yet")
+
+ def discard(self):
+ """
+ Discards all commands which have been executed since the last ``multi``.
+ """
+ self.execute_command('DISCARD')
+
+
+
class LockError(RedisError):
"Errors thrown from the Lock"
pass