summaryrefslogtreecommitdiff
path: root/redis/client.py
diff options
context:
space:
mode:
authorRandall Leeds <randall.leeds@gmail.com>2011-07-11 19:22:13 -0700
committerRandall Leeds <randall.leeds@gmail.com>2011-07-11 19:25:19 -0700
commit4bc9b77431955ba317318410ce70f9b38367ee3a (patch)
tree9d29a6503b8097646215c207880ea01e8cff3f9a /redis/client.py
parent82ca44f2572fdd04a63fa91158f4b6f6435527bc (diff)
parent964196837ef7870b858f5a53c388eec13a059d51 (diff)
downloadredis-py-4bc9b77431955ba317318410ce70f9b38367ee3a.tar.gz
Merge remote-tracking branch 'andymccurdy/watch' into watch_fixes
Conflicts: redis/client.py tests/server_commands.py
Diffstat (limited to 'redis/client.py')
-rw-r--r--redis/client.py235
1 files changed, 128 insertions, 107 deletions
diff --git a/redis/client.py b/redis/client.py
index 310f8f8..21f490e 100644
--- a/redis/client.py
+++ b/redis/client.py
@@ -81,8 +81,9 @@ def zset_score_pairs(response, **options):
"""
if not response or not options['withscores']:
return response
+ score_cast_func = options.get('score_cast_func', float)
it = iter(response)
- return zip(it, imap(float, it))
+ return zip(it, imap(score_cast_func, it))
def int_or_none(response):
if response is None:
@@ -519,6 +520,18 @@ class Redis(object):
"Returns the type of key ``name``"
return self.execute_command('TYPE', name)
+ def watch(self, *names):
+ """
+ Watches the values at keys ``names``, or None if the key doesn't exist
+ """
+ warnings.warn(DeprecationWarning('Call WATCH from a Pipeline object'))
+
+ def unwatch(self):
+ """
+ Unwatches the value at key ``name``, or None of the key doesn't exist
+ """
+ warnings.warn(DeprecationWarning('Call UNWATCH from a Pipeline object'))
+
#### LIST COMMANDS ####
def blpop(self, keys, timeout=0):
"""
@@ -829,27 +842,31 @@ class Redis(object):
"""
return self._zaggregate('ZINTERSTORE', dest, keys, aggregate)
- def zrange(self, name, start, end, desc=False, withscores=False):
+ def zrange(self, name, start, end, desc=False, withscores=False,
+ score_cast_func=float):
"""
Return a range of values from sorted set ``name`` between
``start`` and ``end`` sorted in ascending order.
``start`` and ``end`` can be negative, indicating the end of the range.
- ``desc`` indicates to sort in descending order.
+ ``desc`` a boolean indicating whether to sort the results descendingly
``withscores`` indicates to return the scores along with the values.
The return type is a list of (value, score) pairs
+
+ ``score_cast_func`` a callable used to cast the score return value
"""
if desc:
return self.zrevrange(name, start, end, withscores)
pieces = ['ZRANGE', name, start, end]
if withscores:
pieces.append('withscores')
- return self.execute_command(*pieces, **{'withscores': withscores})
+ options = {'withscores': withscores, 'score_cast_func': score_cast_func}
+ return self.execute_command(*pieces, **options)
def zrangebyscore(self, name, min, max,
- start=None, num=None, withscores=False):
+ start=None, num=None, withscores=False, score_cast_func=float):
"""
Return a range of values from the sorted set ``name`` with scores
between ``min`` and ``max``.
@@ -859,6 +876,8 @@ class Redis(object):
``withscores`` indicates to return the scores along with the values.
The return type is a list of (value, score) pairs
+
+ `score_cast_func`` a callable used to cast the score return value
"""
if (start is not None and num is None) or \
(num is not None and start is None):
@@ -868,7 +887,8 @@ class Redis(object):
pieces.extend(['LIMIT', start, num])
if withscores:
pieces.append('withscores')
- return self.execute_command(*pieces, **{'withscores': withscores})
+ options = {'withscores': withscores, 'score_cast_func': score_cast_func}
+ return self.execute_command(*pieces, **options)
def zrank(self, name, value):
"""
@@ -897,7 +917,8 @@ class Redis(object):
"""
return self.execute_command('ZREMRANGEBYSCORE', name, min, max)
- def zrevrange(self, name, start, num, withscores=False):
+ def zrevrange(self, name, start, num, withscores=False,
+ score_cast_func=float):
"""
Return a range of values from sorted set ``name`` between
``start`` and ``num`` sorted in descending order.
@@ -906,14 +927,17 @@ class Redis(object):
``withscores`` indicates to return the scores along with the values
The return type is a list of (value, score) pairs
+
+ ``score_cast_func`` a callable used to cast the score return value
"""
pieces = ['ZREVRANGE', name, start, num]
if withscores:
pieces.append('withscores')
- return self.execute_command(*pieces, **{'withscores': withscores})
+ options = {'withscores': withscores, 'score_cast_func': score_cast_func}
+ return self.execute_command(*pieces, **options)
def zrevrangebyscore(self, name, max, min,
- start=None, num=None, withscores=False):
+ start=None, num=None, withscores=False, score_cast_func=float):
"""
Return a range of values from the sorted set ``name`` with scores
between ``min`` and ``max`` in descending order.
@@ -923,6 +947,8 @@ class Redis(object):
``withscores`` indicates to return the scores along with the values.
The return type is a list of (value, score) pairs
+
+ ``score_cast_func`` a callable used to cast the score return value
"""
if (start is not None and num is None) or \
(num is not None and start is None):
@@ -932,7 +958,8 @@ class Redis(object):
pieces.extend(['LIMIT', start, num])
if withscores:
pieces.append('withscores')
- return self.execute_command(*pieces, **{'withscores': withscores})
+ options = {'withscores': withscores, 'score_cast_func': score_cast_func}
+ return self.execute_command(*pieces, **options)
def zrevrank(self, name, value):
"""
@@ -1136,13 +1163,6 @@ class PubSub(object):
pass
return self.execute_command('UNSUBSCRIBE', *channels)
- def publish(self, channel, message):
- """
- Publish ``message`` on ``channel``.
- Returns the number of subscribers the message was delivered to.
- """
- return self.execute_command('PUBLISH', channel, message)
-
def listen(self):
"Listen for messages on channels this client has been subscribed to"
while self.subscription_count:
@@ -1185,17 +1205,76 @@ class Pipeline(Redis):
def __init__(self, connection_pool, response_callbacks, transaction,
shard_hint):
self.connection_pool = connection_pool
+ self.connection = None
self.response_callbacks = response_callbacks
self.transaction = transaction
self.shard_hint = shard_hint
+
+ self._real_exec = self.default_execute_command
+ self._pipe_exec = self.pipeline_execute_command
+ self._watching = False
self.reset()
+ def _get_watch(self):
+ return self._watching
+
+ def _set_watch(self, value):
+ self._watching = value
+ self.execute_command = value and self._real_exec or self._pipe_exec
+
+ watching = property(_get_watch, _set_watch)
+
def reset(self):
self.command_stack = []
+ # make sure to reset the connection state in the event that we were
+ # watching something
+ if self.watching and self.connection:
+ try:
+ # call this manually since our unwatch or
+ # default_execute_command methods can call reset()
+ self.connection.send_command('UNWATCH')
+ self.connection.read_response()
+ except ConnectionError:
+ # disconnect will also remove any previous WATCHes
+ self.connection.disconnect()
+ # clean up the other instance attributes
+ self.watching = False
if self.transaction:
self.execute_command('MULTI')
+ # we can safely return the connection to the pool here since we're
+ # sure we're no longer WATCHing anything
+ if self.connection:
+ self.connection_pool.release(self.connection)
+ self.connection = None
- def execute_command(self, *args, **options):
+ def multi(self):
+ """
+ Start a transactional block of the pipeline after WATCH commands
+ are issued. End the transactional block with `execute`.
+ """
+ self.execute_command = self._pipe_exec
+
+ def default_execute_command(self, *args, **options):
+ """
+ Execute a command, but don't auto-retry on a ConnectionError. Used
+ when issuing WATCH or subsequent commands retrieving their values
+ but before MULTI is called.
+ """
+ command_name = args[0]
+ conn = self.connection
+ # if this is the first call, we need a connection
+ if not conn:
+ conn = self.connection_pool.get_connection(command_name,
+ self.shard_hint)
+ self.connection = conn
+ try:
+ conn.send_command(*args)
+ return self.parse_response(conn, command_name, **options)
+ except ConnectionError:
+ self.reset()
+ raise
+
+ def pipeline_execute_command(self, *args, **options):
"""
Stage a command to be executed when execute() is next called
@@ -1229,7 +1308,7 @@ class Pipeline(Redis):
if len(response) != len(commands):
raise ResponseError("Wrong number of response items from "
- "pipeline execution")
+ "pipeline execution")
# We have to run response callbacks manually
data = []
for r, cmd in izip(response, commands):
@@ -1242,7 +1321,7 @@ class Pipeline(Redis):
return data
def _execute_pipeline(self, connection, commands):
- # build up all commands into a single request to increase network perf
+ # build up all commands into a single request to increase network perf
all_cmds = ''.join(starmap(connection.pack_command,
[args for args, options in commands]))
connection.send_packed_command(all_cmds)
@@ -1257,108 +1336,50 @@ class Pipeline(Redis):
else:
execute = self._execute_pipeline
stack = self.command_stack
- self.reset()
- conn = self.connection_pool.get_connection('MULTI', self.shard_hint)
+ conn = self.connection or \
+ self.connection_pool.get_connection('MULTI', self.shard_hint)
try:
return execute(conn, stack)
except ConnectionError:
conn.disconnect()
+ # if we watching a variable, the watch is no longer valid since
+ # this conncetion has died.
+ if self.watching:
+ raise WatchError("Watched variable changed.")
return execute(conn, stack)
finally:
- self.connection_pool.release(conn)
-
-
-class RedisConnection(Redis):
- """
- A ``Redis`` which is bound to one single connection, allowing transactional
- commands to be run in a thread-safe manner.
-
- Note that, unlike ``Redis``, ``RedisConnection`` may raise a
- ``ConnectionError`` which should be handled by the caller.
-
- See also: ``Redis.connection()``.
- """
-
- connection = None
-
- def get_connection(self, command_name, options):
- if self.connection is None:
- # XXX: how is the 'command_name' used?
- self.connection = self.connection_pool.get_connection(command_name,
- **options)
- return self.connection
-
- def execute_command(self, *args, **options):
- """
- Execute a command and return a parsed response.
-
- Note: unlike Redis.execute_command, this may raise a
- ``ConnectionError``, which should be handled by the calling code.
- """
-
- command_name = args[0]
- connection = self.get_connection(command_name, options)
- connection.send_command(*args)
- return self.parse_response(connection, command_name, **options)
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_value, traceback):
- self.close()
-
- def close(self):
- "If a connection exists, return it to the connection pool."
- if self.connection is not None:
- # XXX: some logic could be added here to only call ``discard`` if
- # ``multi`` or ``watch`` were issued.
- self.discard()
- self.connection_pool.release(self.connection)
- self.connection = None
-
- def pipeline(self, transaction=True, shard_hint=None):
- # XXX: I don't think pipelines make any sense on a connection which is
- # "bound" like this. Am I wrong in this?
- raise Exception("not done yet")
+ self.reset()
def watch(self, *names):
"""
- Watches the values at keys ``names``, or None if the key doesn't exist
+ Watches the values at keys ``names``
"""
+ if not self.transaction:
+ raise RedisError("Can only WATCH when using transactions")
+ # if more than 'MULTI' is in the command_stack, we can't WATCH anymore
+ if self.watching and len(self.command_stack) > 1:
+ raise RedisError("Can only WATCH before issuing pipeline commands")
+ self.watching = True
return self.execute_command('WATCH', *names)
def unwatch(self):
"""
- Unwatches the all watched keys.
- """
- return self.execute_command('UNWATCH')
-
- def multi(self):
- """
- Marks the start of a transaction block.
-
- All further commands will return None until ``execute`` is called.
- """
- self.execute_command('MULTI')
-
- def execute(self):
+ Unwatches all previously specified keys
"""
- Executes all commands which have been executed since the last ``multi``.
-
- Returns a list of each command's result.
- """
- self.execute_command('EXEC')
- # XXX: Need to collect the results and return them
- # XXX: update the docs to note that the command is 'execute' not 'exec'.
- raise Exception("not done yet")
-
- def discard(self):
- """
- Discards all commands which have been executed since the last ``multi``.
- """
- self.execute_command('DISCARD')
-
-
+ if not self.transaction:
+ raise RedisError("Can only UNWATCH when using transactions")
+ # if more than 'MULTI' is in the command_stack, we can't UNWATCH anymore
+ if self.watching:
+ if len(self.command_stack) > 1:
+ raise RedisError("Can only UNWATCH before issuing "
+ "pipeline commands")
+ response = self.execute_command('UNWATCH')
+ else:
+ response = True
+ # it's safe to reset() here because we are no longer bound to a
+ # single connection and we're sure the command stack is empty.
+ self.reset()
+ return response
class LockError(RedisError):
"Errors thrown from the Lock"