summaryrefslogtreecommitdiff
path: root/redis/client.py
diff options
context:
space:
mode:
authorKonstantin Merenkov <kmerenkov@gmail.com>2010-04-01 02:14:03 +0400
committerKonstantin Merenkov <kmerenkov@gmail.com>2010-04-01 02:14:03 +0400
commit9a4ee548b8066214fec0cbbb40fcd9812c3d17c7 (patch)
treeb777cc23fb8dfe2d48d5c483040fde14d77f45c7 /redis/client.py
parente2772738e249ea47e5424627eca28f8689ab1dd8 (diff)
downloadredis-py-9a4ee548b8066214fec0cbbb40fcd9812c3d17c7.tar.gz
Move towards binary-safeness of keys and values.
* Dropped support of all protocols except multi-bulk the only protocol that yet to be released redis 2.0 is able understand * As a side-effect there is an 'execute_command' function exposed to the library user. As for now it is pretty useless :) * Added (only) two tests
Diffstat (limited to 'redis/client.py')
-rw-r--r--redis/client.py245
1 files changed, 119 insertions, 126 deletions
diff --git a/redis/client.py b/redis/client.py
index ee3f87a..f9357d3 100644
--- a/redis/client.py
+++ b/redis/client.py
@@ -249,28 +249,44 @@ class Redis(threading.local):
def pipeline(self):
return Pipeline(self.connection, self.encoding, self.errors)
+
#### COMMAND EXECUTION AND PROTOCOL PARSING ####
def _execute_command(self, command_name, command, **options):
+ "Sends the command to the Redis server and returns it's response"
subscription_command = command_name in self.SUBSCRIPTION_COMMANDS
if self.subscribed and not subscription_command:
raise RedisError("Cannot issue commands other than SUBSCRIBE and "
"UNSUBSCRIBE while channels are open")
- self.connection.send(command, self)
- if subscription_command:
- return None
- return self.parse_response(command_name, **options)
-
- def execute_command(self, command_name, command, **options):
- "Sends the command to the Redis server and returns it's response"
try:
- return self._execute_command(command_name, command, **options)
+ self.connection.send(command, self)
+ response = self.parse_response(command_name, **options)
+ if subscription_command:
+ return None
+ return response
except ConnectionError:
self.connection.disconnect()
- return self._execute_command(command_name, command, **options)
+ self.connection.send(command, self)
+ response = self.parse_response(command_name, **options)
+ if subscription_command:
+ return None
+ return response
+
+ def execute_command(self, *args, **options):
+ "Sends command to redis server and returns response"
+ cmd_count = len(args)
+ cmds = []
+ for i in args:
+ enc_value = self.encode(i)
+ cmds.append('$%s\r\n%s\r\n' % (len(enc_value), enc_value))
+ return self._execute_command(
+ args[0],
+ '*%s\r\n%s' % (cmd_count, ''.join(cmds)),
+ **options
+ )
def _parse_response(self, command_name, catch_errors):
conn = self.connection
- response = conn.read().strip()
+ response = conn.read()[:-2] # strip last two characters (\r\n)
if not response:
self.connection.disconnect()
raise ConnectionError("Socket closed on remote end")
@@ -338,34 +354,6 @@ class Redis(threading.local):
# not a string or unicode, attempt to convert to a string
return str(value)
- def format_inline(self, *args, **options):
- "Formats a request with the inline protocol"
- cmd = '%s\r\n' % ' '.join([self.encode(a) for a in args])
- return self.execute_command(args[0], cmd, **options)
-
- def format_bulk(self, *args, **options):
- "Formats a request with the bulk protocol"
- bulk_value = self.encode(args[-1])
- cmd = '%s %s\r\n%s\r\n' % (
- ' '.join([self.encode(a) for a in args[:-1]]),
- len(bulk_value),
- bulk_value,
- )
- return self.execute_command(args[0], cmd, **options)
-
- def format_multi_bulk(self, *args, **options):
- "Formats the request with the multi-bulk protocol"
- cmd_count = len(args)
- cmds = []
- for i in args:
- enc_value = self.encode(i)
- cmds.append('$%s\r\n%s\r\n' % (len(enc_value), enc_value))
- return self.execute_command(
- args[0],
- '*%s\r\n%s' % (cmd_count, ''.join(cmds)),
- **options
- )
-
#### CONNECTION HANDLING ####
def get_connection(self, host, port, db, password, socket_timeout):
"Returns a connection object"
@@ -384,9 +372,9 @@ class Redis(threading.local):
the appropriate database.
"""
if self.connection.password:
- if not self.format_inline('AUTH', self.connection.password):
+ if not self.execute_command('AUTH', self.connection.password):
raise AuthenticationError("Invalid Password")
- self.format_inline('SELECT', self.connection.db)
+ self.execute_command('SELECT', self.connection.db)
def select(self, db, host=None, port=None, password=None,
socket_timeout=None):
@@ -419,15 +407,15 @@ class Redis(threading.local):
Tell the Redis server to save its data to disk. Unlike save(),
this method is asynchronous and returns immediately.
"""
- return self.format_inline('BGSAVE')
+ return self.execute_command('BGSAVE')
def dbsize(self):
"Returns the number of keys in the current database"
- return self.format_inline('DBSIZE')
+ return self.execute_command('DBSIZE')
def delete(self, *names):
"Delete one or more keys specified by ``names``"
- return self.format_inline('DEL', *names)
+ return self.execute_command('DEL', *names)
__delitem__ = delete
def flush(self, all_dbs=False):
@@ -440,33 +428,33 @@ class Redis(threading.local):
def flushall(self):
"Delete all keys in all databases on the current host"
- return self.format_inline('FLUSHALL')
+ return self.execute_command('FLUSHALL')
def flushdb(self):
"Delete all keys in the current database"
- return self.format_inline('FLUSHDB')
+ return self.execute_command('FLUSHDB')
def info(self):
"Returns a dictionary containing information about the Redis server"
- return self.format_inline('INFO')
+ return self.execute_command('INFO')
def lastsave(self):
"""
Return a Python datetime object representing the last time the
Redis database was saved to disk
"""
- return self.format_inline('LASTSAVE')
+ return self.execute_command('LASTSAVE')
def ping(self):
"Ping the Redis server"
- return self.format_inline('PING')
+ return self.execute_command('PING')
def save(self):
"""
Tell the Redis server to save its data to disk,
blocking until the save is complete
"""
- return self.format_inline('SAVE')
+ return self.execute_command('SAVE')
#### BASIC KEY COMMANDS ####
def decr(self, name, amount=1):
@@ -474,22 +462,22 @@ class Redis(threading.local):
Decrements the value of ``key`` by ``amount``. If no key exists,
the value will be initialized as 0 - ``amount``
"""
- return self.format_inline('DECRBY', name, amount)
+ return self.execute_command('DECRBY', name, amount)
def exists(self, name):
"Returns a boolean indicating whether key ``name`` exists"
- return self.format_inline('EXISTS', name)
+ return self.execute_command('EXISTS', name)
__contains__ = exists
def expire(self, name, time):
"Set an expire on key ``name`` for ``time`` seconds"
- return self.format_inline('EXPIRE', name, time)
+ return self.execute_command('EXPIRE', name, time)
def get(self, name):
"""
Return the value at key ``name``, or None of the key doesn't exist
"""
- return self.format_inline('GET', name)
+ return self.execute_command('GET', name)
__getitem__ = get
def getset(self, name, value):
@@ -497,18 +485,18 @@ class Redis(threading.local):
Set the value at key ``name`` to ``value`` if key doesn't exist
Return the value at key ``name`` atomically
"""
- return self.format_bulk('GETSET', name, value)
+ return self.execute_command('GETSET', name, value)
def incr(self, name, amount=1):
"""
Increments the value of ``key`` by ``amount``. If no key exists,
the value will be initialized as ``amount``
"""
- return self.format_inline('INCRBY', name, amount)
+ return self.execute_command('INCRBY', name, amount)
def keys(self, pattern='*'):
"Returns a list of keys matching ``pattern``"
- return self.format_inline('KEYS', pattern)
+ return self.execute_command('KEYS', pattern)
def mget(self, keys, *args):
"""
@@ -517,13 +505,13 @@ class Redis(threading.local):
* Passing *args to this method has been deprecated *
"""
keys = list_or_args('mget', keys, args)
- return self.format_inline('MGET', *keys)
+ return self.execute_command('MGET', *keys)
def mset(self, mapping):
"Sets each key in the ``mapping`` dict to its corresponding value"
items = []
[items.extend(pair) for pair in mapping.iteritems()]
- return self.format_multi_bulk('MSET', *items)
+ return self.execute_command('MSET', *items)
def msetnx(self, mapping):
"""
@@ -532,15 +520,15 @@ class Redis(threading.local):
"""
items = []
[items.extend(pair) for pair in mapping.iteritems()]
- return self.format_multi_bulk('MSETNX', *items)
+ return self.execute_command('MSETNX', *items)
def move(self, name, db):
"Moves the key ``name`` to a different Redis database ``db``"
- return self.format_inline('MOVE', name, db)
+ return self.execute_command('MOVE', name, db)
def randomkey(self):
"Returns the name of a random key"
- return self.format_inline('RANDOMKEY')
+ return self.execute_command('RANDOMKEY')
def rename(self, src, dst, **kwargs):
"""
@@ -557,11 +545,11 @@ class Redis(threading.local):
"use Redis.renamenx instead"))
if kwargs['preserve']:
return self.renamenx(src, dst)
- return self.format_inline('RENAME', src, dst)
+ return self.execute_command('RENAME', src, dst)
def renamenx(self, src, dst):
"Rename key ``src`` to ``dst`` if ``dst`` doesn't already exist"
- return self.format_inline('RENAMENX', src, dst)
+ return self.execute_command('RENAMENX', src, dst)
def set(self, name, value, **kwargs):
@@ -587,20 +575,20 @@ class Redis(threading.local):
"use Redis.setnx() instead"))
if kwargs['preserve']:
return self.setnx(name, value)
- return self.format_bulk('SET', name, value)
+ return self.execute_command('SET', name, value)
__setitem__ = set
def setnx(self, name, value):
"Set the value of key ``name`` to ``value`` if key doesn't exist"
- return self.format_bulk('SETNX', name, value)
+ return self.execute_command('SETNX', name, value)
def ttl(self, name):
"Returns the number of seconds until the key ``name`` will expire"
- return self.format_inline('TTL', name)
+ return self.execute_command('TTL', name)
def type(self, name):
"Returns the type of key ``name``"
- return self.format_inline('TYPE', name)
+ return self.execute_command('TYPE', name)
#### LIST COMMANDS ####
@@ -617,7 +605,7 @@ class Redis(threading.local):
"""
keys = list(keys)
keys.append(timeout)
- return self.format_inline('BLPOP', *keys)
+ return self.execute_command('BLPOP', *keys)
def brpop(self, keys, timeout=0):
"""
@@ -632,7 +620,7 @@ class Redis(threading.local):
"""
keys = list(keys)
keys.append(timeout)
- return self.format_inline('BRPOP', *keys)
+ return self.execute_command('BRPOP', *keys)
def lindex(self, name, index):
"""
@@ -641,19 +629,19 @@ class Redis(threading.local):
Negative indexes are supported and will return an item at the
end of the list
"""
- return self.format_inline('LINDEX', name, index)
+ return self.execute_command('LINDEX', name, index)
def llen(self, name):
"Return the length of the list ``name``"
- return self.format_inline('LLEN', name)
+ return self.execute_command('LLEN', name)
def lpop(self, name):
"Remove and return the first item of the list ``name``"
- return self.format_inline('LPOP', name)
+ return self.execute_command('LPOP', name)
def lpush(self, name, value):
"Push ``value`` onto the head of the list ``name``"
- return self.format_bulk('LPUSH', name, value)
+ return self.execute_command('LPUSH', name, value)
def lrange(self, name, start, end):
"""
@@ -663,7 +651,7 @@ class Redis(threading.local):
``start`` and ``end`` can be negative numbers just like
Python slicing notation
"""
- return self.format_inline('LRANGE', name, start, end)
+ return self.execute_command('LRANGE', name, start, end)
def lrem(self, name, value, num=0):
"""
@@ -671,11 +659,11 @@ class Redis(threading.local):
If ``num`` is 0, then all occurrences will be removed
"""
- return self.format_bulk('LREM', name, num, value)
+ return self.execute_command('LREM', name, num, value)
def lset(self, name, index, value):
"Set ``position`` of list ``name`` to ``value``"
- return self.format_bulk('LSET', name, index, value)
+ return self.execute_command('LSET', name, index, value)
def ltrim(self, name, start, end):
"""
@@ -685,7 +673,7 @@ class Redis(threading.local):
``start`` and ``end`` can be negative numbers just like
Python slicing notation
"""
- return self.format_inline('LTRIM', name, start, end)
+ return self.execute_command('LTRIM', name, start, end)
def pop(self, name, tail=False):
"""
@@ -717,18 +705,18 @@ class Redis(threading.local):
def rpop(self, name):
"Remove and return the last item of the list ``name``"
- return self.format_inline('RPOP', name)
+ return self.execute_command('RPOP', name)
def rpoplpush(self, src, dst):
"""
RPOP a value off of the ``src`` list and atomically LPUSH it
on to the ``dst`` list. Returns the value.
"""
- return self.format_inline('RPOPLPUSH', src, dst)
+ return self.execute_command('RPOPLPUSH', src, dst)
def rpush(self, name, value):
"Push ``value`` onto the tail of the list ``name``"
- return self.format_bulk('RPUSH', name, value)
+ return self.execute_command('RPUSH', name, value)
def sort(self, name, start=None, num=None, by=None, get=None,
desc=False, alpha=False, store=None):
@@ -757,33 +745,38 @@ class Redis(threading.local):
pieces = [name]
if by is not None:
- pieces.append('BY %s' % by)
+ pieces.append('BY')
+ pieces.append(by)
if start is not None and num is not None:
- pieces.append('LIMIT %s %s' % (start, num))
+ pieces.append('LIMIT')
+ pieces.append(start)
+ pieces.append(num)
if get is not None:
- pieces.append('GET %s' % get)
+ pieces.append('GET')
+ pieces.append(get)
if desc:
pieces.append('DESC')
if alpha:
pieces.append('ALPHA')
if store is not None:
- pieces.append('STORE %s' % store)
- return self.format_inline('SORT', *pieces)
+ pieces.append('STORE')
+ pieces.append(store)
+ return self.execute_command('SORT', *pieces)
#### SET COMMANDS ####
def sadd(self, name, value):
"Add ``value`` to set ``name``"
- return self.format_bulk('SADD', name, value)
+ return self.execute_command('SADD', name, value)
def scard(self, name):
"Return the number of elements in set ``name``"
- return self.format_inline('SCARD', name)
+ return self.execute_command('SCARD', name)
def sdiff(self, keys, *args):
"Return the difference of sets specified by ``keys``"
keys = list_or_args('sdiff', keys, args)
- return self.format_inline('SDIFF', *keys)
+ return self.execute_command('SDIFF', *keys)
def sdiffstore(self, dest, keys, *args):
"""
@@ -791,12 +784,12 @@ class Redis(threading.local):
set named ``dest``. Returns the number of keys in the new set.
"""
keys = list_or_args('sdiffstore', keys, args)
- return self.format_inline('SDIFFSTORE', dest, *keys)
+ return self.execute_command('SDIFFSTORE', dest, *keys)
def sinter(self, keys, *args):
"Return the intersection of sets specified by ``keys``"
keys = list_or_args('sinter', keys, args)
- return self.format_inline('SINTER', *keys)
+ return self.execute_command('SINTER', *keys)
def sinterstore(self, dest, keys, *args):
"""
@@ -804,36 +797,36 @@ class Redis(threading.local):
set named ``dest``. Returns the number of keys in the new set.
"""
keys = list_or_args('sinterstore', keys, args)
- return self.format_inline('SINTERSTORE', dest, *keys)
+ return self.execute_command('SINTERSTORE', dest, *keys)
def sismember(self, name, value):
"Return a boolean indicating if ``value`` is a member of set ``name``"
- return self.format_bulk('SISMEMBER', name, value)
+ return self.execute_command('SISMEMBER', name, value)
def smembers(self, name):
"Return all members of the set ``name``"
- return self.format_inline('SMEMBERS', name)
+ return self.execute_command('SMEMBERS', name)
def smove(self, src, dst, value):
"Move ``value`` from set ``src`` to set ``dst`` atomically"
- return self.format_bulk('SMOVE', src, dst, value)
+ return self.execute_command('SMOVE', src, dst, value)
def spop(self, name):
"Remove and return a random member of set ``name``"
- return self.format_inline('SPOP', name)
+ return self.execute_command('SPOP', name)
def srandmember(self, name):
"Return a random member of set ``name``"
- return self.format_inline('SRANDMEMBER', name)
+ return self.execute_command('SRANDMEMBER', name)
def srem(self, name, value):
"Remove ``value`` from set ``name``"
- return self.format_bulk('SREM', name, value)
+ return self.execute_command('SREM', name, value)
def sunion(self, keys, *args):
"Return the union of sets specifiued by ``keys``"
keys = list_or_args('sunion', keys, args)
- return self.format_inline('SUNION', *keys)
+ return self.execute_command('SUNION', *keys)
def sunionstore(self, dest, keys, *args):
"""
@@ -841,17 +834,17 @@ class Redis(threading.local):
set named ``dest``. Returns the number of keys in the new set.
"""
keys = list_or_args('sunionstore', keys, args)
- return self.format_inline('SUNIONSTORE', dest, *keys)
+ return self.execute_command('SUNIONSTORE', dest, *keys)
#### SORTED SET COMMANDS ####
def zadd(self, name, value, score):
"Add member ``value`` with score ``score`` to sorted set ``name``"
- return self.format_bulk('ZADD', name, score, value)
+ return self.execute_command('ZADD', name, score, value)
def zcard(self, name):
"Return the number of elements in the sorted set ``name``"
- return self.format_inline('ZCARD', name)
+ return self.execute_command('ZCARD', name)
def zincr(self, key, member, value=1):
"This has been deprecated, use zincrby instead"
@@ -862,7 +855,7 @@ class Redis(threading.local):
def zincrby(self, name, value, amount=1):
"Increment the score of ``value`` in sorted set ``name`` by ``amount``"
- return self.format_bulk('ZINCRBY', name, amount, value)
+ return self.execute_command('ZINCRBY', name, amount, value)
def zrange(self, name, start, end, desc=False, withscores=False):
"""
@@ -881,7 +874,7 @@ class Redis(threading.local):
pieces = ['ZRANGE', name, start, end]
if withscores:
pieces.append('withscores')
- return self.format_inline(*pieces, **{'withscores': withscores})
+ return self.execute_command(*pieces, **{'withscores': withscores})
def zrangebyscore(self, name, min, max,
start=None, num=None, withscores=False):
@@ -902,25 +895,25 @@ class Redis(threading.local):
pieces.extend(['LIMIT', start, num])
if withscores:
pieces.append('withscores')
- return self.format_inline(*pieces, **{'withscores': withscores})
+ return self.execute_command(*pieces, **{'withscores': withscores})
def zrank(self, name, value):
"""
Returns a 0-based value indicating the rank of ``value`` in sorted set
``name``
"""
- return self.format_bulk('ZRANK', name, value)
+ return self.execute_command('ZRANK', name, value)
def zrem(self, name, value):
"Remove member ``value`` from sorted set ``name``"
- return self.format_bulk('ZREM', name, value)
+ return self.execute_command('ZREM', name, value)
def zremrangebyscore(self, name, min, max):
"""
Remove all elements in the sorted set ``name`` with scores
between ``min`` and ``max``
"""
- return self.format_inline('ZREMRANGEBYSCORE', name, min, max)
+ return self.execute_command('ZREMRANGEBYSCORE', name, min, max)
def zrevrange(self, name, start, num, withscores=False):
"""
@@ -935,59 +928,59 @@ class Redis(threading.local):
pieces = ['ZREVRANGE', name, start, num]
if withscores:
pieces.append('withscores')
- return self.format_inline(*pieces, **{'withscores': withscores})
+ return self.execute_command(*pieces, **{'withscores': withscores})
def zrevrank(self, name, value):
"""
Returns a 0-based value indicating the descending rank of
``value`` in sorted set ``name``
"""
- return self.format_bulk('ZREVRANK', name, value)
+ return self.execute_command('ZREVRANK', name, value)
def zscore(self, name, value):
"Return the score of element ``value`` in sorted set ``name``"
- return self.format_bulk('ZSCORE', name, value)
+ return self.execute_command('ZSCORE', name, value)
#### HASH COMMANDS ####
def hdel(self, name, key):
"Delete ``key`` from hash ``name``"
- return self.format_bulk('HDEL', name, key)
+ return self.execute_command('HDEL', name, key)
def hexists(self, name, key):
"Returns a boolean indicating if ``key`` exists within hash ``name``"
- return self.format_bulk('HEXISTS', name, key)
+ return self.execute_command('HEXISTS', name, key)
def hget(self, name, key):
"Return the value of ``key`` within the hash ``name``"
- return self.format_bulk('HGET', name, key)
+ return self.execute_command('HGET', name, key)
def hgetall(self, name):
"Return a Python dict of the hash's name/value pairs"
- return self.format_inline('HGETALL', name)
+ return self.execute_command('HGETALL', name)
def hincrby(self, name, key, amount=1):
"Increment the value of ``key`` in hash ``name`` by ``amount``"
- return self.format_inline('HINCRBY', name, key, amount)
+ return self.execute_command('HINCRBY', name, key, amount)
def hkeys(self, name):
"Return the list of keys within hash ``name``"
- return self.format_inline('HKEYS', name)
+ return self.execute_command('HKEYS', name)
def hlen(self, name):
"Return the number of elements in hash ``name``"
- return self.format_inline('HLEN', name)
+ return self.execute_command('HLEN', name)
def hset(self, name, key, value):
"""
Set ``key`` to ``value`` within hash ``name``
Returns 1 if HSET created a new field, otherwise 0
"""
- return self.format_multi_bulk('HSET', name, key, value)
+ return self.execute_command('HSET', name, key, value)
def hvals(self, name):
"Return the list of values within hash ``name``"
- return self.format_inline('HVALS', name)
+ return self.execute_command('HVALS', name)
# channels
@@ -995,7 +988,7 @@ class Redis(threading.local):
"Subscribe to ``channels``, waiting for messages to be published"
if isinstance(channels, basestring):
channels = [channels]
- response = self.format_inline('SUBSCRIBE', *channels)
+ response = self.execute_command('SUBSCRIBE', *channels)
# this is *after* the SUBSCRIBE in order to allow for lazy and broken
# connections that need to issue AUTH and SELECT commands
self.subscribed = True
@@ -1005,14 +998,14 @@ class Redis(threading.local):
"Unsubscribe to ``channels``. If empty, unsubscribe from all channels"
if isinstance(channels, basestring):
channels = [channels]
- return self.format_inline('UNSUBSCRIBE', *channels)
+ return self.execute_command('UNSUBSCRIBE', *channels)
def publish(self, channel, message):
"""
Publish ``message`` on ``channel``.
Returns the number of subscribers the message was delivered to.
"""
- return self.format_bulk('PUBLISH', channel, message)
+ return self.execute_command('PUBLISH', channel, message)
def listen(self):
"Listen for messages on channels this client has been subscribed to"
@@ -1051,9 +1044,9 @@ class Pipeline(Redis):
def reset(self):
self.command_stack = []
- self.format_inline('MULTI')
+ self.execute_command('MULTI')
- def execute_command(self, command_name, command, **options):
+ def _execute_command(self, command_name, command, **options):
"""
Stage a command to be executed when execute() is next called
@@ -1070,7 +1063,7 @@ class Pipeline(Redis):
# _setup_connection(). run these commands immediately without
# buffering them.
if command_name in ('AUTH', 'SELECT'):
- return super(Pipeline, self).execute_command(
+ return super(Pipeline, self)._execute_command(
command_name, command, **options)
else:
self.command_stack.append((command_name, command, options))
@@ -1103,7 +1096,7 @@ class Pipeline(Redis):
def execute(self):
"Execute all the commands in the current pipeline"
- self.format_inline('EXEC')
+ self.execute_command('EXEC')
stack = self.command_stack
self.reset()
try: