diff options
author | Randall Leeds <randall.leeds@gmail.com> | 2011-07-11 19:22:13 -0700 |
---|---|---|
committer | Randall Leeds <randall.leeds@gmail.com> | 2011-07-11 19:25:19 -0700 |
commit | 4bc9b77431955ba317318410ce70f9b38367ee3a (patch) | |
tree | 9d29a6503b8097646215c207880ea01e8cff3f9a | |
parent | 82ca44f2572fdd04a63fa91158f4b6f6435527bc (diff) | |
parent | 964196837ef7870b858f5a53c388eec13a059d51 (diff) | |
download | redis-py-4bc9b77431955ba317318410ce70f9b38367ee3a.tar.gz |
Merge remote-tracking branch 'andymccurdy/watch' into watch_fixes
Conflicts:
redis/client.py
tests/server_commands.py
-rw-r--r-- | CHANGES | 11 | ||||
-rw-r--r-- | redis/client.py | 235 | ||||
-rw-r--r-- | redis/connection.py | 6 | ||||
-rw-r--r-- | tests/pipeline.py | 54 | ||||
-rw-r--r-- | tests/server_commands.py | 6 |
5 files changed, 193 insertions, 119 deletions
@@ -1,6 +1,17 @@ * 2.4.6 (in development) * Variadic arguments for SADD, SREM, ZREN, HDEL, LPUSH, and RPUSH. Thanks Raphaƫl Vinot. + * Fixed an error in the Hiredis parser that occasionally caused the + socket connection to become corrupted and unusable. This became noticeable + once connection pools started to be used. + * ZRANGE, ZREVRANGE, ZRANGEBYSCORE, and ZREVRANGEBYSCORE now take an + additional optional argument, score_cast_func, which is a callable used + to cast the score value in the return type. The default is float. + * Removed the PUBLISH method from the PubSub class. Connections that are + [P]SUBSCRIBEd cannot issue PUBLISH commands, so it doesn't make to have + it here. + * Pipelines now contain WATCH and UNWATCH. Calling WATCH or UNWATCH from + the base client class will result in a deprecation warning. * 2.4.5 * The PythonParser now works better when reading zero length strings. * 2.4.4 diff --git a/redis/client.py b/redis/client.py index 310f8f8..21f490e 100644 --- a/redis/client.py +++ b/redis/client.py @@ -81,8 +81,9 @@ def zset_score_pairs(response, **options): """ if not response or not options['withscores']: return response + score_cast_func = options.get('score_cast_func', float) it = iter(response) - return zip(it, imap(float, it)) + return zip(it, imap(score_cast_func, it)) def int_or_none(response): if response is None: @@ -519,6 +520,18 @@ class Redis(object): "Returns the type of key ``name``" return self.execute_command('TYPE', name) + def watch(self, *names): + """ + Watches the values at keys ``names``, or None if the key doesn't exist + """ + warnings.warn(DeprecationWarning('Call WATCH from a Pipeline object')) + + def unwatch(self): + """ + Unwatches the value at key ``name``, or None of the key doesn't exist + """ + warnings.warn(DeprecationWarning('Call UNWATCH from a Pipeline object')) + #### LIST COMMANDS #### def blpop(self, keys, timeout=0): """ @@ -829,27 +842,31 @@ class Redis(object): """ return self._zaggregate('ZINTERSTORE', dest, keys, aggregate) - def zrange(self, name, start, end, desc=False, withscores=False): + def zrange(self, name, start, end, desc=False, withscores=False, + score_cast_func=float): """ Return a range of values from sorted set ``name`` between ``start`` and ``end`` sorted in ascending order. ``start`` and ``end`` can be negative, indicating the end of the range. - ``desc`` indicates to sort in descending order. + ``desc`` a boolean indicating whether to sort the results descendingly ``withscores`` indicates to return the scores along with the values. The return type is a list of (value, score) pairs + + ``score_cast_func`` a callable used to cast the score return value """ if desc: return self.zrevrange(name, start, end, withscores) pieces = ['ZRANGE', name, start, end] if withscores: pieces.append('withscores') - return self.execute_command(*pieces, **{'withscores': withscores}) + options = {'withscores': withscores, 'score_cast_func': score_cast_func} + return self.execute_command(*pieces, **options) def zrangebyscore(self, name, min, max, - start=None, num=None, withscores=False): + start=None, num=None, withscores=False, score_cast_func=float): """ Return a range of values from the sorted set ``name`` with scores between ``min`` and ``max``. @@ -859,6 +876,8 @@ class Redis(object): ``withscores`` indicates to return the scores along with the values. The return type is a list of (value, score) pairs + + `score_cast_func`` a callable used to cast the score return value """ if (start is not None and num is None) or \ (num is not None and start is None): @@ -868,7 +887,8 @@ class Redis(object): pieces.extend(['LIMIT', start, num]) if withscores: pieces.append('withscores') - return self.execute_command(*pieces, **{'withscores': withscores}) + options = {'withscores': withscores, 'score_cast_func': score_cast_func} + return self.execute_command(*pieces, **options) def zrank(self, name, value): """ @@ -897,7 +917,8 @@ class Redis(object): """ return self.execute_command('ZREMRANGEBYSCORE', name, min, max) - def zrevrange(self, name, start, num, withscores=False): + def zrevrange(self, name, start, num, withscores=False, + score_cast_func=float): """ Return a range of values from sorted set ``name`` between ``start`` and ``num`` sorted in descending order. @@ -906,14 +927,17 @@ class Redis(object): ``withscores`` indicates to return the scores along with the values The return type is a list of (value, score) pairs + + ``score_cast_func`` a callable used to cast the score return value """ pieces = ['ZREVRANGE', name, start, num] if withscores: pieces.append('withscores') - return self.execute_command(*pieces, **{'withscores': withscores}) + options = {'withscores': withscores, 'score_cast_func': score_cast_func} + return self.execute_command(*pieces, **options) def zrevrangebyscore(self, name, max, min, - start=None, num=None, withscores=False): + start=None, num=None, withscores=False, score_cast_func=float): """ Return a range of values from the sorted set ``name`` with scores between ``min`` and ``max`` in descending order. @@ -923,6 +947,8 @@ class Redis(object): ``withscores`` indicates to return the scores along with the values. The return type is a list of (value, score) pairs + + ``score_cast_func`` a callable used to cast the score return value """ if (start is not None and num is None) or \ (num is not None and start is None): @@ -932,7 +958,8 @@ class Redis(object): pieces.extend(['LIMIT', start, num]) if withscores: pieces.append('withscores') - return self.execute_command(*pieces, **{'withscores': withscores}) + options = {'withscores': withscores, 'score_cast_func': score_cast_func} + return self.execute_command(*pieces, **options) def zrevrank(self, name, value): """ @@ -1136,13 +1163,6 @@ class PubSub(object): pass return self.execute_command('UNSUBSCRIBE', *channels) - def publish(self, channel, message): - """ - Publish ``message`` on ``channel``. - Returns the number of subscribers the message was delivered to. - """ - return self.execute_command('PUBLISH', channel, message) - def listen(self): "Listen for messages on channels this client has been subscribed to" while self.subscription_count: @@ -1185,17 +1205,76 @@ class Pipeline(Redis): def __init__(self, connection_pool, response_callbacks, transaction, shard_hint): self.connection_pool = connection_pool + self.connection = None self.response_callbacks = response_callbacks self.transaction = transaction self.shard_hint = shard_hint + + self._real_exec = self.default_execute_command + self._pipe_exec = self.pipeline_execute_command + self._watching = False self.reset() + def _get_watch(self): + return self._watching + + def _set_watch(self, value): + self._watching = value + self.execute_command = value and self._real_exec or self._pipe_exec + + watching = property(_get_watch, _set_watch) + def reset(self): self.command_stack = [] + # make sure to reset the connection state in the event that we were + # watching something + if self.watching and self.connection: + try: + # call this manually since our unwatch or + # default_execute_command methods can call reset() + self.connection.send_command('UNWATCH') + self.connection.read_response() + except ConnectionError: + # disconnect will also remove any previous WATCHes + self.connection.disconnect() + # clean up the other instance attributes + self.watching = False if self.transaction: self.execute_command('MULTI') + # we can safely return the connection to the pool here since we're + # sure we're no longer WATCHing anything + if self.connection: + self.connection_pool.release(self.connection) + self.connection = None - def execute_command(self, *args, **options): + def multi(self): + """ + Start a transactional block of the pipeline after WATCH commands + are issued. End the transactional block with `execute`. + """ + self.execute_command = self._pipe_exec + + def default_execute_command(self, *args, **options): + """ + Execute a command, but don't auto-retry on a ConnectionError. Used + when issuing WATCH or subsequent commands retrieving their values + but before MULTI is called. + """ + command_name = args[0] + conn = self.connection + # if this is the first call, we need a connection + if not conn: + conn = self.connection_pool.get_connection(command_name, + self.shard_hint) + self.connection = conn + try: + conn.send_command(*args) + return self.parse_response(conn, command_name, **options) + except ConnectionError: + self.reset() + raise + + def pipeline_execute_command(self, *args, **options): """ Stage a command to be executed when execute() is next called @@ -1229,7 +1308,7 @@ class Pipeline(Redis): if len(response) != len(commands): raise ResponseError("Wrong number of response items from " - "pipeline execution") + "pipeline execution") # We have to run response callbacks manually data = [] for r, cmd in izip(response, commands): @@ -1242,7 +1321,7 @@ class Pipeline(Redis): return data def _execute_pipeline(self, connection, commands): - # build up all commands into a single request to increase network perf + # build up all commands into a single request to increase network perf all_cmds = ''.join(starmap(connection.pack_command, [args for args, options in commands])) connection.send_packed_command(all_cmds) @@ -1257,108 +1336,50 @@ class Pipeline(Redis): else: execute = self._execute_pipeline stack = self.command_stack - self.reset() - conn = self.connection_pool.get_connection('MULTI', self.shard_hint) + conn = self.connection or \ + self.connection_pool.get_connection('MULTI', self.shard_hint) try: return execute(conn, stack) except ConnectionError: conn.disconnect() + # if we watching a variable, the watch is no longer valid since + # this conncetion has died. + if self.watching: + raise WatchError("Watched variable changed.") return execute(conn, stack) finally: - self.connection_pool.release(conn) - - -class RedisConnection(Redis): - """ - A ``Redis`` which is bound to one single connection, allowing transactional - commands to be run in a thread-safe manner. - - Note that, unlike ``Redis``, ``RedisConnection`` may raise a - ``ConnectionError`` which should be handled by the caller. - - See also: ``Redis.connection()``. - """ - - connection = None - - def get_connection(self, command_name, options): - if self.connection is None: - # XXX: how is the 'command_name' used? - self.connection = self.connection_pool.get_connection(command_name, - **options) - return self.connection - - def execute_command(self, *args, **options): - """ - Execute a command and return a parsed response. - - Note: unlike Redis.execute_command, this may raise a - ``ConnectionError``, which should be handled by the calling code. - """ - - command_name = args[0] - connection = self.get_connection(command_name, options) - connection.send_command(*args) - return self.parse_response(connection, command_name, **options) - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, traceback): - self.close() - - def close(self): - "If a connection exists, return it to the connection pool." - if self.connection is not None: - # XXX: some logic could be added here to only call ``discard`` if - # ``multi`` or ``watch`` were issued. - self.discard() - self.connection_pool.release(self.connection) - self.connection = None - - def pipeline(self, transaction=True, shard_hint=None): - # XXX: I don't think pipelines make any sense on a connection which is - # "bound" like this. Am I wrong in this? - raise Exception("not done yet") + self.reset() def watch(self, *names): """ - Watches the values at keys ``names``, or None if the key doesn't exist + Watches the values at keys ``names`` """ + if not self.transaction: + raise RedisError("Can only WATCH when using transactions") + # if more than 'MULTI' is in the command_stack, we can't WATCH anymore + if self.watching and len(self.command_stack) > 1: + raise RedisError("Can only WATCH before issuing pipeline commands") + self.watching = True return self.execute_command('WATCH', *names) def unwatch(self): """ - Unwatches the all watched keys. - """ - return self.execute_command('UNWATCH') - - def multi(self): - """ - Marks the start of a transaction block. - - All further commands will return None until ``execute`` is called. - """ - self.execute_command('MULTI') - - def execute(self): + Unwatches all previously specified keys """ - Executes all commands which have been executed since the last ``multi``. - - Returns a list of each command's result. - """ - self.execute_command('EXEC') - # XXX: Need to collect the results and return them - # XXX: update the docs to note that the command is 'execute' not 'exec'. - raise Exception("not done yet") - - def discard(self): - """ - Discards all commands which have been executed since the last ``multi``. - """ - self.execute_command('DISCARD') - - + if not self.transaction: + raise RedisError("Can only UNWATCH when using transactions") + # if more than 'MULTI' is in the command_stack, we can't UNWATCH anymore + if self.watching: + if len(self.command_stack) > 1: + raise RedisError("Can only UNWATCH before issuing " + "pipeline commands") + response = self.execute_command('UNWATCH') + else: + response = True + # it's safe to reset() here because we are no longer bound to a + # single connection and we're sure the command stack is empty. + self.reset() + return response class LockError(RedisError): "Errors thrown from the Lock" diff --git a/redis/connection.py b/redis/connection.py index 2b48e32..ff7f277 100644 --- a/redis/connection.py +++ b/redis/connection.py @@ -89,9 +89,9 @@ class HiredisParser(object): if not buffer: raise ConnectionError("Socket closed on remote end") self._reader.feed(buffer) - # if the data received doesn't end with \r\n, then there's more in - # the socket - if not buffer.endswith('\r\n'): + # proactively, but not conclusively, check if more data is in the + # buffer. if the data received doesn't end with \n, there's more. + if not buffer.endswith('\n'): continue response = self._reader.gets() return response diff --git a/tests/pipeline.py b/tests/pipeline.py index 6eda4f2..ee3b3c5 100644 --- a/tests/pipeline.py +++ b/tests/pipeline.py @@ -24,6 +24,14 @@ class PipelineTestCase(unittest.TestCase): ] ) + def test_pipeline_no_transaction(self): + pipe = self.client.pipeline(transaction=False) + pipe.set('a', 'a1').set('b', 'b1').set('c', 'c1') + self.assertEquals(pipe.execute(), [True, True, True]) + self.assertEquals(self.client['a'], 'a1') + self.assertEquals(self.client['b'], 'b1') + self.assertEquals(self.client['c'], 'c1') + def test_invalid_command_in_pipeline(self): # all commands but the invalid one should be excuted correctly self.client['c'] = 'a' @@ -46,11 +54,43 @@ class PipelineTestCase(unittest.TestCase): self.assertEquals(pipe.set('z', 'zzz').execute(), [True]) self.assertEquals(self.client['z'], 'zzz') - def test_pipeline_no_transaction(self): - pipe = self.client.pipeline(transaction=False) - pipe.set('a', 'a1').set('b', 'b1').set('c', 'c1') - self.assertEquals(pipe.execute(), [True, True, True]) - self.assertEquals(self.client['a'], 'a1') - self.assertEquals(self.client['b'], 'b1') - self.assertEquals(self.client['c'], 'c1') + def test_watch_succeed(self): + self.client.set('a', 1) + self.client.set('b', 2) + + pipe = self.client.pipeline() + pipe.watch('a', 'b') + self.assertEquals(pipe.watching, True) + a = pipe.get('a') + b = pipe.get('b') + self.assertEquals(a, '1') + self.assertEquals(b, '2') + pipe.multi() + + pipe.set('c', 3) + self.assertEquals(pipe.execute(), [True]) + self.assertEquals(pipe.watching, False) + def test_watch_failure(self): + self.client.set('a', 1) + self.client.set('b', 2) + + pipe = self.client.pipeline() + pipe.watch('a', 'b') + self.client.set('b', 3) + pipe.multi() + pipe.get('a') + self.assertRaises(redis.WatchError, pipe.execute) + self.assertEquals(pipe.watching, False) + + def test_unwatch(self): + self.client.set('a', 1) + self.client.set('b', 2) + + pipe = self.client.pipeline() + pipe.watch('a', 'b') + self.client.set('b', 3) + pipe.unwatch() + self.assertEquals(pipe.watching, False) + pipe.get('a') + self.assertEquals(pipe.execute(), ['1']) diff --git a/tests/server_commands.py b/tests/server_commands.py index c7b58ee..6e9fe99 100644 --- a/tests/server_commands.py +++ b/tests/server_commands.py @@ -800,7 +800,6 @@ class ServerCommandsTestCase(unittest.TestCase): [('a3', 20), ('a1', 23)] ) - def test_zrange(self): # key is not a zset self.client['a'] = 'a' @@ -814,10 +813,13 @@ class ServerCommandsTestCase(unittest.TestCase): [('a1', 1.0), ('a2', 2.0)]) self.assertEquals(self.client.zrange('a', 1, 2, withscores=True), [('a2', 2.0), ('a3', 3.0)]) + # test a custom score casting function returns the correct value + self.assertEquals( + self.client.zrange('a', 0, 1, withscores=True, score_cast_func=int), + [('a1', 1), ('a2', 2)]) # a non existant key should return empty list self.assertEquals(self.client.zrange('b', 0, 1, withscores=True), []) - def test_zrangebyscore(self): # key is not a zset self.client['a'] = 'a' |