summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRandall Leeds <randall.leeds@gmail.com>2011-07-11 19:22:13 -0700
committerRandall Leeds <randall.leeds@gmail.com>2011-07-11 19:25:19 -0700
commit4bc9b77431955ba317318410ce70f9b38367ee3a (patch)
tree9d29a6503b8097646215c207880ea01e8cff3f9a
parent82ca44f2572fdd04a63fa91158f4b6f6435527bc (diff)
parent964196837ef7870b858f5a53c388eec13a059d51 (diff)
downloadredis-py-4bc9b77431955ba317318410ce70f9b38367ee3a.tar.gz
Merge remote-tracking branch 'andymccurdy/watch' into watch_fixes
Conflicts: redis/client.py tests/server_commands.py
-rw-r--r--CHANGES11
-rw-r--r--redis/client.py235
-rw-r--r--redis/connection.py6
-rw-r--r--tests/pipeline.py54
-rw-r--r--tests/server_commands.py6
5 files changed, 193 insertions, 119 deletions
diff --git a/CHANGES b/CHANGES
index 64797b2..72f1444 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,17 @@
* 2.4.6 (in development)
* Variadic arguments for SADD, SREM, ZREN, HDEL, LPUSH, and RPUSH. Thanks
Raphaƫl Vinot.
+ * Fixed an error in the Hiredis parser that occasionally caused the
+ socket connection to become corrupted and unusable. This became noticeable
+ once connection pools started to be used.
+ * ZRANGE, ZREVRANGE, ZRANGEBYSCORE, and ZREVRANGEBYSCORE now take an
+ additional optional argument, score_cast_func, which is a callable used
+ to cast the score value in the return type. The default is float.
+ * Removed the PUBLISH method from the PubSub class. Connections that are
+ [P]SUBSCRIBEd cannot issue PUBLISH commands, so it doesn't make to have
+ it here.
+ * Pipelines now contain WATCH and UNWATCH. Calling WATCH or UNWATCH from
+ the base client class will result in a deprecation warning.
* 2.4.5
* The PythonParser now works better when reading zero length strings.
* 2.4.4
diff --git a/redis/client.py b/redis/client.py
index 310f8f8..21f490e 100644
--- a/redis/client.py
+++ b/redis/client.py
@@ -81,8 +81,9 @@ def zset_score_pairs(response, **options):
"""
if not response or not options['withscores']:
return response
+ score_cast_func = options.get('score_cast_func', float)
it = iter(response)
- return zip(it, imap(float, it))
+ return zip(it, imap(score_cast_func, it))
def int_or_none(response):
if response is None:
@@ -519,6 +520,18 @@ class Redis(object):
"Returns the type of key ``name``"
return self.execute_command('TYPE', name)
+ def watch(self, *names):
+ """
+ Watches the values at keys ``names``, or None if the key doesn't exist
+ """
+ warnings.warn(DeprecationWarning('Call WATCH from a Pipeline object'))
+
+ def unwatch(self):
+ """
+ Unwatches the value at key ``name``, or None of the key doesn't exist
+ """
+ warnings.warn(DeprecationWarning('Call UNWATCH from a Pipeline object'))
+
#### LIST COMMANDS ####
def blpop(self, keys, timeout=0):
"""
@@ -829,27 +842,31 @@ class Redis(object):
"""
return self._zaggregate('ZINTERSTORE', dest, keys, aggregate)
- def zrange(self, name, start, end, desc=False, withscores=False):
+ def zrange(self, name, start, end, desc=False, withscores=False,
+ score_cast_func=float):
"""
Return a range of values from sorted set ``name`` between
``start`` and ``end`` sorted in ascending order.
``start`` and ``end`` can be negative, indicating the end of the range.
- ``desc`` indicates to sort in descending order.
+ ``desc`` a boolean indicating whether to sort the results descendingly
``withscores`` indicates to return the scores along with the values.
The return type is a list of (value, score) pairs
+
+ ``score_cast_func`` a callable used to cast the score return value
"""
if desc:
return self.zrevrange(name, start, end, withscores)
pieces = ['ZRANGE', name, start, end]
if withscores:
pieces.append('withscores')
- return self.execute_command(*pieces, **{'withscores': withscores})
+ options = {'withscores': withscores, 'score_cast_func': score_cast_func}
+ return self.execute_command(*pieces, **options)
def zrangebyscore(self, name, min, max,
- start=None, num=None, withscores=False):
+ start=None, num=None, withscores=False, score_cast_func=float):
"""
Return a range of values from the sorted set ``name`` with scores
between ``min`` and ``max``.
@@ -859,6 +876,8 @@ class Redis(object):
``withscores`` indicates to return the scores along with the values.
The return type is a list of (value, score) pairs
+
+ `score_cast_func`` a callable used to cast the score return value
"""
if (start is not None and num is None) or \
(num is not None and start is None):
@@ -868,7 +887,8 @@ class Redis(object):
pieces.extend(['LIMIT', start, num])
if withscores:
pieces.append('withscores')
- return self.execute_command(*pieces, **{'withscores': withscores})
+ options = {'withscores': withscores, 'score_cast_func': score_cast_func}
+ return self.execute_command(*pieces, **options)
def zrank(self, name, value):
"""
@@ -897,7 +917,8 @@ class Redis(object):
"""
return self.execute_command('ZREMRANGEBYSCORE', name, min, max)
- def zrevrange(self, name, start, num, withscores=False):
+ def zrevrange(self, name, start, num, withscores=False,
+ score_cast_func=float):
"""
Return a range of values from sorted set ``name`` between
``start`` and ``num`` sorted in descending order.
@@ -906,14 +927,17 @@ class Redis(object):
``withscores`` indicates to return the scores along with the values
The return type is a list of (value, score) pairs
+
+ ``score_cast_func`` a callable used to cast the score return value
"""
pieces = ['ZREVRANGE', name, start, num]
if withscores:
pieces.append('withscores')
- return self.execute_command(*pieces, **{'withscores': withscores})
+ options = {'withscores': withscores, 'score_cast_func': score_cast_func}
+ return self.execute_command(*pieces, **options)
def zrevrangebyscore(self, name, max, min,
- start=None, num=None, withscores=False):
+ start=None, num=None, withscores=False, score_cast_func=float):
"""
Return a range of values from the sorted set ``name`` with scores
between ``min`` and ``max`` in descending order.
@@ -923,6 +947,8 @@ class Redis(object):
``withscores`` indicates to return the scores along with the values.
The return type is a list of (value, score) pairs
+
+ ``score_cast_func`` a callable used to cast the score return value
"""
if (start is not None and num is None) or \
(num is not None and start is None):
@@ -932,7 +958,8 @@ class Redis(object):
pieces.extend(['LIMIT', start, num])
if withscores:
pieces.append('withscores')
- return self.execute_command(*pieces, **{'withscores': withscores})
+ options = {'withscores': withscores, 'score_cast_func': score_cast_func}
+ return self.execute_command(*pieces, **options)
def zrevrank(self, name, value):
"""
@@ -1136,13 +1163,6 @@ class PubSub(object):
pass
return self.execute_command('UNSUBSCRIBE', *channels)
- def publish(self, channel, message):
- """
- Publish ``message`` on ``channel``.
- Returns the number of subscribers the message was delivered to.
- """
- return self.execute_command('PUBLISH', channel, message)
-
def listen(self):
"Listen for messages on channels this client has been subscribed to"
while self.subscription_count:
@@ -1185,17 +1205,76 @@ class Pipeline(Redis):
def __init__(self, connection_pool, response_callbacks, transaction,
shard_hint):
self.connection_pool = connection_pool
+ self.connection = None
self.response_callbacks = response_callbacks
self.transaction = transaction
self.shard_hint = shard_hint
+
+ self._real_exec = self.default_execute_command
+ self._pipe_exec = self.pipeline_execute_command
+ self._watching = False
self.reset()
+ def _get_watch(self):
+ return self._watching
+
+ def _set_watch(self, value):
+ self._watching = value
+ self.execute_command = value and self._real_exec or self._pipe_exec
+
+ watching = property(_get_watch, _set_watch)
+
def reset(self):
self.command_stack = []
+ # make sure to reset the connection state in the event that we were
+ # watching something
+ if self.watching and self.connection:
+ try:
+ # call this manually since our unwatch or
+ # default_execute_command methods can call reset()
+ self.connection.send_command('UNWATCH')
+ self.connection.read_response()
+ except ConnectionError:
+ # disconnect will also remove any previous WATCHes
+ self.connection.disconnect()
+ # clean up the other instance attributes
+ self.watching = False
if self.transaction:
self.execute_command('MULTI')
+ # we can safely return the connection to the pool here since we're
+ # sure we're no longer WATCHing anything
+ if self.connection:
+ self.connection_pool.release(self.connection)
+ self.connection = None
- def execute_command(self, *args, **options):
+ def multi(self):
+ """
+ Start a transactional block of the pipeline after WATCH commands
+ are issued. End the transactional block with `execute`.
+ """
+ self.execute_command = self._pipe_exec
+
+ def default_execute_command(self, *args, **options):
+ """
+ Execute a command, but don't auto-retry on a ConnectionError. Used
+ when issuing WATCH or subsequent commands retrieving their values
+ but before MULTI is called.
+ """
+ command_name = args[0]
+ conn = self.connection
+ # if this is the first call, we need a connection
+ if not conn:
+ conn = self.connection_pool.get_connection(command_name,
+ self.shard_hint)
+ self.connection = conn
+ try:
+ conn.send_command(*args)
+ return self.parse_response(conn, command_name, **options)
+ except ConnectionError:
+ self.reset()
+ raise
+
+ def pipeline_execute_command(self, *args, **options):
"""
Stage a command to be executed when execute() is next called
@@ -1229,7 +1308,7 @@ class Pipeline(Redis):
if len(response) != len(commands):
raise ResponseError("Wrong number of response items from "
- "pipeline execution")
+ "pipeline execution")
# We have to run response callbacks manually
data = []
for r, cmd in izip(response, commands):
@@ -1242,7 +1321,7 @@ class Pipeline(Redis):
return data
def _execute_pipeline(self, connection, commands):
- # build up all commands into a single request to increase network perf
+ # build up all commands into a single request to increase network perf
all_cmds = ''.join(starmap(connection.pack_command,
[args for args, options in commands]))
connection.send_packed_command(all_cmds)
@@ -1257,108 +1336,50 @@ class Pipeline(Redis):
else:
execute = self._execute_pipeline
stack = self.command_stack
- self.reset()
- conn = self.connection_pool.get_connection('MULTI', self.shard_hint)
+ conn = self.connection or \
+ self.connection_pool.get_connection('MULTI', self.shard_hint)
try:
return execute(conn, stack)
except ConnectionError:
conn.disconnect()
+ # if we watching a variable, the watch is no longer valid since
+ # this conncetion has died.
+ if self.watching:
+ raise WatchError("Watched variable changed.")
return execute(conn, stack)
finally:
- self.connection_pool.release(conn)
-
-
-class RedisConnection(Redis):
- """
- A ``Redis`` which is bound to one single connection, allowing transactional
- commands to be run in a thread-safe manner.
-
- Note that, unlike ``Redis``, ``RedisConnection`` may raise a
- ``ConnectionError`` which should be handled by the caller.
-
- See also: ``Redis.connection()``.
- """
-
- connection = None
-
- def get_connection(self, command_name, options):
- if self.connection is None:
- # XXX: how is the 'command_name' used?
- self.connection = self.connection_pool.get_connection(command_name,
- **options)
- return self.connection
-
- def execute_command(self, *args, **options):
- """
- Execute a command and return a parsed response.
-
- Note: unlike Redis.execute_command, this may raise a
- ``ConnectionError``, which should be handled by the calling code.
- """
-
- command_name = args[0]
- connection = self.get_connection(command_name, options)
- connection.send_command(*args)
- return self.parse_response(connection, command_name, **options)
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_value, traceback):
- self.close()
-
- def close(self):
- "If a connection exists, return it to the connection pool."
- if self.connection is not None:
- # XXX: some logic could be added here to only call ``discard`` if
- # ``multi`` or ``watch`` were issued.
- self.discard()
- self.connection_pool.release(self.connection)
- self.connection = None
-
- def pipeline(self, transaction=True, shard_hint=None):
- # XXX: I don't think pipelines make any sense on a connection which is
- # "bound" like this. Am I wrong in this?
- raise Exception("not done yet")
+ self.reset()
def watch(self, *names):
"""
- Watches the values at keys ``names``, or None if the key doesn't exist
+ Watches the values at keys ``names``
"""
+ if not self.transaction:
+ raise RedisError("Can only WATCH when using transactions")
+ # if more than 'MULTI' is in the command_stack, we can't WATCH anymore
+ if self.watching and len(self.command_stack) > 1:
+ raise RedisError("Can only WATCH before issuing pipeline commands")
+ self.watching = True
return self.execute_command('WATCH', *names)
def unwatch(self):
"""
- Unwatches the all watched keys.
- """
- return self.execute_command('UNWATCH')
-
- def multi(self):
- """
- Marks the start of a transaction block.
-
- All further commands will return None until ``execute`` is called.
- """
- self.execute_command('MULTI')
-
- def execute(self):
+ Unwatches all previously specified keys
"""
- Executes all commands which have been executed since the last ``multi``.
-
- Returns a list of each command's result.
- """
- self.execute_command('EXEC')
- # XXX: Need to collect the results and return them
- # XXX: update the docs to note that the command is 'execute' not 'exec'.
- raise Exception("not done yet")
-
- def discard(self):
- """
- Discards all commands which have been executed since the last ``multi``.
- """
- self.execute_command('DISCARD')
-
-
+ if not self.transaction:
+ raise RedisError("Can only UNWATCH when using transactions")
+ # if more than 'MULTI' is in the command_stack, we can't UNWATCH anymore
+ if self.watching:
+ if len(self.command_stack) > 1:
+ raise RedisError("Can only UNWATCH before issuing "
+ "pipeline commands")
+ response = self.execute_command('UNWATCH')
+ else:
+ response = True
+ # it's safe to reset() here because we are no longer bound to a
+ # single connection and we're sure the command stack is empty.
+ self.reset()
+ return response
class LockError(RedisError):
"Errors thrown from the Lock"
diff --git a/redis/connection.py b/redis/connection.py
index 2b48e32..ff7f277 100644
--- a/redis/connection.py
+++ b/redis/connection.py
@@ -89,9 +89,9 @@ class HiredisParser(object):
if not buffer:
raise ConnectionError("Socket closed on remote end")
self._reader.feed(buffer)
- # if the data received doesn't end with \r\n, then there's more in
- # the socket
- if not buffer.endswith('\r\n'):
+ # proactively, but not conclusively, check if more data is in the
+ # buffer. if the data received doesn't end with \n, there's more.
+ if not buffer.endswith('\n'):
continue
response = self._reader.gets()
return response
diff --git a/tests/pipeline.py b/tests/pipeline.py
index 6eda4f2..ee3b3c5 100644
--- a/tests/pipeline.py
+++ b/tests/pipeline.py
@@ -24,6 +24,14 @@ class PipelineTestCase(unittest.TestCase):
]
)
+ def test_pipeline_no_transaction(self):
+ pipe = self.client.pipeline(transaction=False)
+ pipe.set('a', 'a1').set('b', 'b1').set('c', 'c1')
+ self.assertEquals(pipe.execute(), [True, True, True])
+ self.assertEquals(self.client['a'], 'a1')
+ self.assertEquals(self.client['b'], 'b1')
+ self.assertEquals(self.client['c'], 'c1')
+
def test_invalid_command_in_pipeline(self):
# all commands but the invalid one should be excuted correctly
self.client['c'] = 'a'
@@ -46,11 +54,43 @@ class PipelineTestCase(unittest.TestCase):
self.assertEquals(pipe.set('z', 'zzz').execute(), [True])
self.assertEquals(self.client['z'], 'zzz')
- def test_pipeline_no_transaction(self):
- pipe = self.client.pipeline(transaction=False)
- pipe.set('a', 'a1').set('b', 'b1').set('c', 'c1')
- self.assertEquals(pipe.execute(), [True, True, True])
- self.assertEquals(self.client['a'], 'a1')
- self.assertEquals(self.client['b'], 'b1')
- self.assertEquals(self.client['c'], 'c1')
+ def test_watch_succeed(self):
+ self.client.set('a', 1)
+ self.client.set('b', 2)
+
+ pipe = self.client.pipeline()
+ pipe.watch('a', 'b')
+ self.assertEquals(pipe.watching, True)
+ a = pipe.get('a')
+ b = pipe.get('b')
+ self.assertEquals(a, '1')
+ self.assertEquals(b, '2')
+ pipe.multi()
+
+ pipe.set('c', 3)
+ self.assertEquals(pipe.execute(), [True])
+ self.assertEquals(pipe.watching, False)
+ def test_watch_failure(self):
+ self.client.set('a', 1)
+ self.client.set('b', 2)
+
+ pipe = self.client.pipeline()
+ pipe.watch('a', 'b')
+ self.client.set('b', 3)
+ pipe.multi()
+ pipe.get('a')
+ self.assertRaises(redis.WatchError, pipe.execute)
+ self.assertEquals(pipe.watching, False)
+
+ def test_unwatch(self):
+ self.client.set('a', 1)
+ self.client.set('b', 2)
+
+ pipe = self.client.pipeline()
+ pipe.watch('a', 'b')
+ self.client.set('b', 3)
+ pipe.unwatch()
+ self.assertEquals(pipe.watching, False)
+ pipe.get('a')
+ self.assertEquals(pipe.execute(), ['1'])
diff --git a/tests/server_commands.py b/tests/server_commands.py
index c7b58ee..6e9fe99 100644
--- a/tests/server_commands.py
+++ b/tests/server_commands.py
@@ -800,7 +800,6 @@ class ServerCommandsTestCase(unittest.TestCase):
[('a3', 20), ('a1', 23)]
)
-
def test_zrange(self):
# key is not a zset
self.client['a'] = 'a'
@@ -814,10 +813,13 @@ class ServerCommandsTestCase(unittest.TestCase):
[('a1', 1.0), ('a2', 2.0)])
self.assertEquals(self.client.zrange('a', 1, 2, withscores=True),
[('a2', 2.0), ('a3', 3.0)])
+ # test a custom score casting function returns the correct value
+ self.assertEquals(
+ self.client.zrange('a', 0, 1, withscores=True, score_cast_func=int),
+ [('a1', 1), ('a2', 2)])
# a non existant key should return empty list
self.assertEquals(self.client.zrange('b', 0, 1, withscores=True), [])
-
def test_zrangebyscore(self):
# key is not a zset
self.client['a'] = 'a'