diff options
author | Randall Leeds <randall@meebo-inc.com> | 2011-07-08 16:37:22 -0700 |
---|---|---|
committer | Randall Leeds <randall@meebo-inc.com> | 2011-07-08 16:37:22 -0700 |
commit | 82ca44f2572fdd04a63fa91158f4b6f6435527bc (patch) | |
tree | 2c426a830757c3e2e4e0dd05bf450fd55000af64 | |
parent | f8e73233da33356d3d16124593905b360c306e74 (diff) | |
parent | afb33372ef8ce680b43980b352a8686bcc363bdb (diff) | |
download | redis-py-82ca44f2572fdd04a63fa91158f4b6f6435527bc.tar.gz |
Merge remote-tracking branch 'wolever/threadsafe_transactions' into watch_fixes
Conflicts:
redis/client.py
-rw-r--r-- | redis/client.py | 154 |
1 files changed, 120 insertions, 34 deletions
diff --git a/redis/client.py b/redis/client.py index 9af1e58..310f8f8 100644 --- a/redis/client.py +++ b/redis/client.py @@ -181,7 +181,6 @@ class Redis(object): }) connection_pool = ConnectionPool(**kwargs) self.connection_pool = connection_pool - self.connection = None self.response_callbacks = self.__class__.RESPONSE_CALLBACKS.copy() @@ -197,14 +196,11 @@ class Redis(object): atomic, pipelines are useful for reducing the back-and-forth overhead between the client and server. """ - connection = self.connection - self.connection = None return Pipeline( self.connection_pool, self.response_callbacks, transaction, - shard_hint, - connection) + shard_hint) def lock(self, name, timeout=None, sleep=0.1): """ @@ -226,15 +222,34 @@ class Redis(object): subscribe to channels and listen for messages that get published to them. """ - return PubSub(self.connection_pool, shard_hint, self.connection) + return PubSub(self.connection_pool, shard_hint) + + def connection(self): + """ + Returns an instance of ``RedisSingleConnection`` which is bound to one + connection, allowing transactional commands to run in a thread-safe + manner. + + Note that, unlike ``Redis``, ``RedisSingleConnection`` may raise a + ``ConnectionError`` which should be handled by the caller. + + >>> with redis.connection() as cxn: + ... cxn.watch('foo') + ... old_foo = cxn.get('foo') + ... cxn.multi() + ... cxn.set('foo', old_foo + 1) + ... cxn.execute() + ... + >>> + """ + return RedisConnection(connection_pool=self.connection_pool) #### COMMAND EXECUTION AND PROTOCOL PARSING #### def execute_command(self, *args, **options): "Execute a command and return a parsed response" pool = self.connection_pool command_name = args[0] - connection = self.connection or \ - pool.get_connection(command_name, **options) + connection = pool.get_connection(command_name, **options) try: connection.send_command(*args) return self.parse_response(connection, command_name, **options) @@ -243,8 +258,7 @@ class Redis(object): connection.send_command(*args) return self.parse_response(connection, command_name, **options) finally: - if not self.connection: - pool.release(connection) + pool.release(connection) def parse_response(self, connection, command_name, **options): "Parses a response from the Redis server" @@ -505,23 +519,6 @@ class Redis(object): "Returns the type of key ``name``" return self.execute_command('TYPE', name) - def watch(self, *names): - """ - Watches the values at keys ``names``, or None if the key doesn't exist - """ - self.connection = self.connection or \ - self.connection_pool.get_connection('WATCH', *names) - return self.execute_command('WATCH', *names) - - def unwatch(self): - """ - Unwatches the value at key ``name``, or None of the key doesn't exist - """ - try: - return self.execute_command('UNWATCH') - finally: - self.connection = None - #### LIST COMMANDS #### def blpop(self, keys, timeout=0): """ @@ -1050,10 +1047,10 @@ class PubSub(object): until a message arrives on one of the subscribed channels. That message will be returned and it's safe to start listening again. """ - def __init__(self, connection_pool, shard_hint=None, connection=None): + def __init__(self, connection_pool, shard_hint=None): self.connection_pool = connection_pool self.shard_hint = shard_hint - self.connection = connection + self.connection = None self.channels = set() self.patterns = set() self.subscription_count = 0 @@ -1186,9 +1183,8 @@ class Pipeline(Redis): on a key of a different datatype. """ def __init__(self, connection_pool, response_callbacks, transaction, - shard_hint, connection=None): + shard_hint): self.connection_pool = connection_pool - self.connection = connection self.response_callbacks = response_callbacks self.transaction = transaction self.shard_hint = shard_hint @@ -1262,18 +1258,108 @@ class Pipeline(Redis): execute = self._execute_pipeline stack = self.command_stack self.reset() - conn = self.connection or \ - self.connection_pool.get_connection('MULTI', self.shard_hint) + conn = self.connection_pool.get_connection('MULTI', self.shard_hint) try: return execute(conn, stack) except ConnectionError: conn.disconnect() return execute(conn, stack) finally: - self.connection = None self.connection_pool.release(conn) +class RedisConnection(Redis): + """ + A ``Redis`` which is bound to one single connection, allowing transactional + commands to be run in a thread-safe manner. + + Note that, unlike ``Redis``, ``RedisConnection`` may raise a + ``ConnectionError`` which should be handled by the caller. + + See also: ``Redis.connection()``. + """ + + connection = None + + def get_connection(self, command_name, options): + if self.connection is None: + # XXX: how is the 'command_name' used? + self.connection = self.connection_pool.get_connection(command_name, + **options) + return self.connection + + def execute_command(self, *args, **options): + """ + Execute a command and return a parsed response. + + Note: unlike Redis.execute_command, this may raise a + ``ConnectionError``, which should be handled by the calling code. + """ + + command_name = args[0] + connection = self.get_connection(command_name, options) + connection.send_command(*args) + return self.parse_response(connection, command_name, **options) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + + def close(self): + "If a connection exists, return it to the connection pool." + if self.connection is not None: + # XXX: some logic could be added here to only call ``discard`` if + # ``multi`` or ``watch`` were issued. + self.discard() + self.connection_pool.release(self.connection) + self.connection = None + + def pipeline(self, transaction=True, shard_hint=None): + # XXX: I don't think pipelines make any sense on a connection which is + # "bound" like this. Am I wrong in this? + raise Exception("not done yet") + + def watch(self, *names): + """ + Watches the values at keys ``names``, or None if the key doesn't exist + """ + return self.execute_command('WATCH', *names) + + def unwatch(self): + """ + Unwatches the all watched keys. + """ + return self.execute_command('UNWATCH') + + def multi(self): + """ + Marks the start of a transaction block. + + All further commands will return None until ``execute`` is called. + """ + self.execute_command('MULTI') + + def execute(self): + """ + Executes all commands which have been executed since the last ``multi``. + + Returns a list of each command's result. + """ + self.execute_command('EXEC') + # XXX: Need to collect the results and return them + # XXX: update the docs to note that the command is 'execute' not 'exec'. + raise Exception("not done yet") + + def discard(self): + """ + Discards all commands which have been executed since the last ``multi``. + """ + self.execute_command('DISCARD') + + + class LockError(RedisError): "Errors thrown from the Lock" pass |