diff options
author | Konstantin Merenkov <kmerenkov@gmail.com> | 2010-04-01 02:14:03 +0400 |
---|---|---|
committer | Konstantin Merenkov <kmerenkov@gmail.com> | 2010-04-01 02:14:03 +0400 |
commit | 9a4ee548b8066214fec0cbbb40fcd9812c3d17c7 (patch) | |
tree | b777cc23fb8dfe2d48d5c483040fde14d77f45c7 /redis/client.py | |
parent | e2772738e249ea47e5424627eca28f8689ab1dd8 (diff) | |
download | redis-py-9a4ee548b8066214fec0cbbb40fcd9812c3d17c7.tar.gz |
Move towards binary-safeness of keys and values.
* Dropped support of all protocols except multi-bulk
the only protocol that yet to be released redis 2.0 is able
understand
* As a side-effect there is an 'execute_command' function exposed
to the library user. As for now it is pretty useless :)
* Added (only) two tests
Diffstat (limited to 'redis/client.py')
-rw-r--r-- | redis/client.py | 245 |
1 files changed, 119 insertions, 126 deletions
diff --git a/redis/client.py b/redis/client.py index ee3f87a..f9357d3 100644 --- a/redis/client.py +++ b/redis/client.py @@ -249,28 +249,44 @@ class Redis(threading.local): def pipeline(self): return Pipeline(self.connection, self.encoding, self.errors) + #### COMMAND EXECUTION AND PROTOCOL PARSING #### def _execute_command(self, command_name, command, **options): + "Sends the command to the Redis server and returns it's response" subscription_command = command_name in self.SUBSCRIPTION_COMMANDS if self.subscribed and not subscription_command: raise RedisError("Cannot issue commands other than SUBSCRIBE and " "UNSUBSCRIBE while channels are open") - self.connection.send(command, self) - if subscription_command: - return None - return self.parse_response(command_name, **options) - - def execute_command(self, command_name, command, **options): - "Sends the command to the Redis server and returns it's response" try: - return self._execute_command(command_name, command, **options) + self.connection.send(command, self) + response = self.parse_response(command_name, **options) + if subscription_command: + return None + return response except ConnectionError: self.connection.disconnect() - return self._execute_command(command_name, command, **options) + self.connection.send(command, self) + response = self.parse_response(command_name, **options) + if subscription_command: + return None + return response + + def execute_command(self, *args, **options): + "Sends command to redis server and returns response" + cmd_count = len(args) + cmds = [] + for i in args: + enc_value = self.encode(i) + cmds.append('$%s\r\n%s\r\n' % (len(enc_value), enc_value)) + return self._execute_command( + args[0], + '*%s\r\n%s' % (cmd_count, ''.join(cmds)), + **options + ) def _parse_response(self, command_name, catch_errors): conn = self.connection - response = conn.read().strip() + response = conn.read()[:-2] # strip last two characters (\r\n) if not response: self.connection.disconnect() raise ConnectionError("Socket closed on remote end") @@ -338,34 +354,6 @@ class Redis(threading.local): # not a string or unicode, attempt to convert to a string return str(value) - def format_inline(self, *args, **options): - "Formats a request with the inline protocol" - cmd = '%s\r\n' % ' '.join([self.encode(a) for a in args]) - return self.execute_command(args[0], cmd, **options) - - def format_bulk(self, *args, **options): - "Formats a request with the bulk protocol" - bulk_value = self.encode(args[-1]) - cmd = '%s %s\r\n%s\r\n' % ( - ' '.join([self.encode(a) for a in args[:-1]]), - len(bulk_value), - bulk_value, - ) - return self.execute_command(args[0], cmd, **options) - - def format_multi_bulk(self, *args, **options): - "Formats the request with the multi-bulk protocol" - cmd_count = len(args) - cmds = [] - for i in args: - enc_value = self.encode(i) - cmds.append('$%s\r\n%s\r\n' % (len(enc_value), enc_value)) - return self.execute_command( - args[0], - '*%s\r\n%s' % (cmd_count, ''.join(cmds)), - **options - ) - #### CONNECTION HANDLING #### def get_connection(self, host, port, db, password, socket_timeout): "Returns a connection object" @@ -384,9 +372,9 @@ class Redis(threading.local): the appropriate database. """ if self.connection.password: - if not self.format_inline('AUTH', self.connection.password): + if not self.execute_command('AUTH', self.connection.password): raise AuthenticationError("Invalid Password") - self.format_inline('SELECT', self.connection.db) + self.execute_command('SELECT', self.connection.db) def select(self, db, host=None, port=None, password=None, socket_timeout=None): @@ -419,15 +407,15 @@ class Redis(threading.local): Tell the Redis server to save its data to disk. Unlike save(), this method is asynchronous and returns immediately. """ - return self.format_inline('BGSAVE') + return self.execute_command('BGSAVE') def dbsize(self): "Returns the number of keys in the current database" - return self.format_inline('DBSIZE') + return self.execute_command('DBSIZE') def delete(self, *names): "Delete one or more keys specified by ``names``" - return self.format_inline('DEL', *names) + return self.execute_command('DEL', *names) __delitem__ = delete def flush(self, all_dbs=False): @@ -440,33 +428,33 @@ class Redis(threading.local): def flushall(self): "Delete all keys in all databases on the current host" - return self.format_inline('FLUSHALL') + return self.execute_command('FLUSHALL') def flushdb(self): "Delete all keys in the current database" - return self.format_inline('FLUSHDB') + return self.execute_command('FLUSHDB') def info(self): "Returns a dictionary containing information about the Redis server" - return self.format_inline('INFO') + return self.execute_command('INFO') def lastsave(self): """ Return a Python datetime object representing the last time the Redis database was saved to disk """ - return self.format_inline('LASTSAVE') + return self.execute_command('LASTSAVE') def ping(self): "Ping the Redis server" - return self.format_inline('PING') + return self.execute_command('PING') def save(self): """ Tell the Redis server to save its data to disk, blocking until the save is complete """ - return self.format_inline('SAVE') + return self.execute_command('SAVE') #### BASIC KEY COMMANDS #### def decr(self, name, amount=1): @@ -474,22 +462,22 @@ class Redis(threading.local): Decrements the value of ``key`` by ``amount``. If no key exists, the value will be initialized as 0 - ``amount`` """ - return self.format_inline('DECRBY', name, amount) + return self.execute_command('DECRBY', name, amount) def exists(self, name): "Returns a boolean indicating whether key ``name`` exists" - return self.format_inline('EXISTS', name) + return self.execute_command('EXISTS', name) __contains__ = exists def expire(self, name, time): "Set an expire on key ``name`` for ``time`` seconds" - return self.format_inline('EXPIRE', name, time) + return self.execute_command('EXPIRE', name, time) def get(self, name): """ Return the value at key ``name``, or None of the key doesn't exist """ - return self.format_inline('GET', name) + return self.execute_command('GET', name) __getitem__ = get def getset(self, name, value): @@ -497,18 +485,18 @@ class Redis(threading.local): Set the value at key ``name`` to ``value`` if key doesn't exist Return the value at key ``name`` atomically """ - return self.format_bulk('GETSET', name, value) + return self.execute_command('GETSET', name, value) def incr(self, name, amount=1): """ Increments the value of ``key`` by ``amount``. If no key exists, the value will be initialized as ``amount`` """ - return self.format_inline('INCRBY', name, amount) + return self.execute_command('INCRBY', name, amount) def keys(self, pattern='*'): "Returns a list of keys matching ``pattern``" - return self.format_inline('KEYS', pattern) + return self.execute_command('KEYS', pattern) def mget(self, keys, *args): """ @@ -517,13 +505,13 @@ class Redis(threading.local): * Passing *args to this method has been deprecated * """ keys = list_or_args('mget', keys, args) - return self.format_inline('MGET', *keys) + return self.execute_command('MGET', *keys) def mset(self, mapping): "Sets each key in the ``mapping`` dict to its corresponding value" items = [] [items.extend(pair) for pair in mapping.iteritems()] - return self.format_multi_bulk('MSET', *items) + return self.execute_command('MSET', *items) def msetnx(self, mapping): """ @@ -532,15 +520,15 @@ class Redis(threading.local): """ items = [] [items.extend(pair) for pair in mapping.iteritems()] - return self.format_multi_bulk('MSETNX', *items) + return self.execute_command('MSETNX', *items) def move(self, name, db): "Moves the key ``name`` to a different Redis database ``db``" - return self.format_inline('MOVE', name, db) + return self.execute_command('MOVE', name, db) def randomkey(self): "Returns the name of a random key" - return self.format_inline('RANDOMKEY') + return self.execute_command('RANDOMKEY') def rename(self, src, dst, **kwargs): """ @@ -557,11 +545,11 @@ class Redis(threading.local): "use Redis.renamenx instead")) if kwargs['preserve']: return self.renamenx(src, dst) - return self.format_inline('RENAME', src, dst) + return self.execute_command('RENAME', src, dst) def renamenx(self, src, dst): "Rename key ``src`` to ``dst`` if ``dst`` doesn't already exist" - return self.format_inline('RENAMENX', src, dst) + return self.execute_command('RENAMENX', src, dst) def set(self, name, value, **kwargs): @@ -587,20 +575,20 @@ class Redis(threading.local): "use Redis.setnx() instead")) if kwargs['preserve']: return self.setnx(name, value) - return self.format_bulk('SET', name, value) + return self.execute_command('SET', name, value) __setitem__ = set def setnx(self, name, value): "Set the value of key ``name`` to ``value`` if key doesn't exist" - return self.format_bulk('SETNX', name, value) + return self.execute_command('SETNX', name, value) def ttl(self, name): "Returns the number of seconds until the key ``name`` will expire" - return self.format_inline('TTL', name) + return self.execute_command('TTL', name) def type(self, name): "Returns the type of key ``name``" - return self.format_inline('TYPE', name) + return self.execute_command('TYPE', name) #### LIST COMMANDS #### @@ -617,7 +605,7 @@ class Redis(threading.local): """ keys = list(keys) keys.append(timeout) - return self.format_inline('BLPOP', *keys) + return self.execute_command('BLPOP', *keys) def brpop(self, keys, timeout=0): """ @@ -632,7 +620,7 @@ class Redis(threading.local): """ keys = list(keys) keys.append(timeout) - return self.format_inline('BRPOP', *keys) + return self.execute_command('BRPOP', *keys) def lindex(self, name, index): """ @@ -641,19 +629,19 @@ class Redis(threading.local): Negative indexes are supported and will return an item at the end of the list """ - return self.format_inline('LINDEX', name, index) + return self.execute_command('LINDEX', name, index) def llen(self, name): "Return the length of the list ``name``" - return self.format_inline('LLEN', name) + return self.execute_command('LLEN', name) def lpop(self, name): "Remove and return the first item of the list ``name``" - return self.format_inline('LPOP', name) + return self.execute_command('LPOP', name) def lpush(self, name, value): "Push ``value`` onto the head of the list ``name``" - return self.format_bulk('LPUSH', name, value) + return self.execute_command('LPUSH', name, value) def lrange(self, name, start, end): """ @@ -663,7 +651,7 @@ class Redis(threading.local): ``start`` and ``end`` can be negative numbers just like Python slicing notation """ - return self.format_inline('LRANGE', name, start, end) + return self.execute_command('LRANGE', name, start, end) def lrem(self, name, value, num=0): """ @@ -671,11 +659,11 @@ class Redis(threading.local): If ``num`` is 0, then all occurrences will be removed """ - return self.format_bulk('LREM', name, num, value) + return self.execute_command('LREM', name, num, value) def lset(self, name, index, value): "Set ``position`` of list ``name`` to ``value``" - return self.format_bulk('LSET', name, index, value) + return self.execute_command('LSET', name, index, value) def ltrim(self, name, start, end): """ @@ -685,7 +673,7 @@ class Redis(threading.local): ``start`` and ``end`` can be negative numbers just like Python slicing notation """ - return self.format_inline('LTRIM', name, start, end) + return self.execute_command('LTRIM', name, start, end) def pop(self, name, tail=False): """ @@ -717,18 +705,18 @@ class Redis(threading.local): def rpop(self, name): "Remove and return the last item of the list ``name``" - return self.format_inline('RPOP', name) + return self.execute_command('RPOP', name) def rpoplpush(self, src, dst): """ RPOP a value off of the ``src`` list and atomically LPUSH it on to the ``dst`` list. Returns the value. """ - return self.format_inline('RPOPLPUSH', src, dst) + return self.execute_command('RPOPLPUSH', src, dst) def rpush(self, name, value): "Push ``value`` onto the tail of the list ``name``" - return self.format_bulk('RPUSH', name, value) + return self.execute_command('RPUSH', name, value) def sort(self, name, start=None, num=None, by=None, get=None, desc=False, alpha=False, store=None): @@ -757,33 +745,38 @@ class Redis(threading.local): pieces = [name] if by is not None: - pieces.append('BY %s' % by) + pieces.append('BY') + pieces.append(by) if start is not None and num is not None: - pieces.append('LIMIT %s %s' % (start, num)) + pieces.append('LIMIT') + pieces.append(start) + pieces.append(num) if get is not None: - pieces.append('GET %s' % get) + pieces.append('GET') + pieces.append(get) if desc: pieces.append('DESC') if alpha: pieces.append('ALPHA') if store is not None: - pieces.append('STORE %s' % store) - return self.format_inline('SORT', *pieces) + pieces.append('STORE') + pieces.append(store) + return self.execute_command('SORT', *pieces) #### SET COMMANDS #### def sadd(self, name, value): "Add ``value`` to set ``name``" - return self.format_bulk('SADD', name, value) + return self.execute_command('SADD', name, value) def scard(self, name): "Return the number of elements in set ``name``" - return self.format_inline('SCARD', name) + return self.execute_command('SCARD', name) def sdiff(self, keys, *args): "Return the difference of sets specified by ``keys``" keys = list_or_args('sdiff', keys, args) - return self.format_inline('SDIFF', *keys) + return self.execute_command('SDIFF', *keys) def sdiffstore(self, dest, keys, *args): """ @@ -791,12 +784,12 @@ class Redis(threading.local): set named ``dest``. Returns the number of keys in the new set. """ keys = list_or_args('sdiffstore', keys, args) - return self.format_inline('SDIFFSTORE', dest, *keys) + return self.execute_command('SDIFFSTORE', dest, *keys) def sinter(self, keys, *args): "Return the intersection of sets specified by ``keys``" keys = list_or_args('sinter', keys, args) - return self.format_inline('SINTER', *keys) + return self.execute_command('SINTER', *keys) def sinterstore(self, dest, keys, *args): """ @@ -804,36 +797,36 @@ class Redis(threading.local): set named ``dest``. Returns the number of keys in the new set. """ keys = list_or_args('sinterstore', keys, args) - return self.format_inline('SINTERSTORE', dest, *keys) + return self.execute_command('SINTERSTORE', dest, *keys) def sismember(self, name, value): "Return a boolean indicating if ``value`` is a member of set ``name``" - return self.format_bulk('SISMEMBER', name, value) + return self.execute_command('SISMEMBER', name, value) def smembers(self, name): "Return all members of the set ``name``" - return self.format_inline('SMEMBERS', name) + return self.execute_command('SMEMBERS', name) def smove(self, src, dst, value): "Move ``value`` from set ``src`` to set ``dst`` atomically" - return self.format_bulk('SMOVE', src, dst, value) + return self.execute_command('SMOVE', src, dst, value) def spop(self, name): "Remove and return a random member of set ``name``" - return self.format_inline('SPOP', name) + return self.execute_command('SPOP', name) def srandmember(self, name): "Return a random member of set ``name``" - return self.format_inline('SRANDMEMBER', name) + return self.execute_command('SRANDMEMBER', name) def srem(self, name, value): "Remove ``value`` from set ``name``" - return self.format_bulk('SREM', name, value) + return self.execute_command('SREM', name, value) def sunion(self, keys, *args): "Return the union of sets specifiued by ``keys``" keys = list_or_args('sunion', keys, args) - return self.format_inline('SUNION', *keys) + return self.execute_command('SUNION', *keys) def sunionstore(self, dest, keys, *args): """ @@ -841,17 +834,17 @@ class Redis(threading.local): set named ``dest``. Returns the number of keys in the new set. """ keys = list_or_args('sunionstore', keys, args) - return self.format_inline('SUNIONSTORE', dest, *keys) + return self.execute_command('SUNIONSTORE', dest, *keys) #### SORTED SET COMMANDS #### def zadd(self, name, value, score): "Add member ``value`` with score ``score`` to sorted set ``name``" - return self.format_bulk('ZADD', name, score, value) + return self.execute_command('ZADD', name, score, value) def zcard(self, name): "Return the number of elements in the sorted set ``name``" - return self.format_inline('ZCARD', name) + return self.execute_command('ZCARD', name) def zincr(self, key, member, value=1): "This has been deprecated, use zincrby instead" @@ -862,7 +855,7 @@ class Redis(threading.local): def zincrby(self, name, value, amount=1): "Increment the score of ``value`` in sorted set ``name`` by ``amount``" - return self.format_bulk('ZINCRBY', name, amount, value) + return self.execute_command('ZINCRBY', name, amount, value) def zrange(self, name, start, end, desc=False, withscores=False): """ @@ -881,7 +874,7 @@ class Redis(threading.local): pieces = ['ZRANGE', name, start, end] if withscores: pieces.append('withscores') - return self.format_inline(*pieces, **{'withscores': withscores}) + return self.execute_command(*pieces, **{'withscores': withscores}) def zrangebyscore(self, name, min, max, start=None, num=None, withscores=False): @@ -902,25 +895,25 @@ class Redis(threading.local): pieces.extend(['LIMIT', start, num]) if withscores: pieces.append('withscores') - return self.format_inline(*pieces, **{'withscores': withscores}) + return self.execute_command(*pieces, **{'withscores': withscores}) def zrank(self, name, value): """ Returns a 0-based value indicating the rank of ``value`` in sorted set ``name`` """ - return self.format_bulk('ZRANK', name, value) + return self.execute_command('ZRANK', name, value) def zrem(self, name, value): "Remove member ``value`` from sorted set ``name``" - return self.format_bulk('ZREM', name, value) + return self.execute_command('ZREM', name, value) def zremrangebyscore(self, name, min, max): """ Remove all elements in the sorted set ``name`` with scores between ``min`` and ``max`` """ - return self.format_inline('ZREMRANGEBYSCORE', name, min, max) + return self.execute_command('ZREMRANGEBYSCORE', name, min, max) def zrevrange(self, name, start, num, withscores=False): """ @@ -935,59 +928,59 @@ class Redis(threading.local): pieces = ['ZREVRANGE', name, start, num] if withscores: pieces.append('withscores') - return self.format_inline(*pieces, **{'withscores': withscores}) + return self.execute_command(*pieces, **{'withscores': withscores}) def zrevrank(self, name, value): """ Returns a 0-based value indicating the descending rank of ``value`` in sorted set ``name`` """ - return self.format_bulk('ZREVRANK', name, value) + return self.execute_command('ZREVRANK', name, value) def zscore(self, name, value): "Return the score of element ``value`` in sorted set ``name``" - return self.format_bulk('ZSCORE', name, value) + return self.execute_command('ZSCORE', name, value) #### HASH COMMANDS #### def hdel(self, name, key): "Delete ``key`` from hash ``name``" - return self.format_bulk('HDEL', name, key) + return self.execute_command('HDEL', name, key) def hexists(self, name, key): "Returns a boolean indicating if ``key`` exists within hash ``name``" - return self.format_bulk('HEXISTS', name, key) + return self.execute_command('HEXISTS', name, key) def hget(self, name, key): "Return the value of ``key`` within the hash ``name``" - return self.format_bulk('HGET', name, key) + return self.execute_command('HGET', name, key) def hgetall(self, name): "Return a Python dict of the hash's name/value pairs" - return self.format_inline('HGETALL', name) + return self.execute_command('HGETALL', name) def hincrby(self, name, key, amount=1): "Increment the value of ``key`` in hash ``name`` by ``amount``" - return self.format_inline('HINCRBY', name, key, amount) + return self.execute_command('HINCRBY', name, key, amount) def hkeys(self, name): "Return the list of keys within hash ``name``" - return self.format_inline('HKEYS', name) + return self.execute_command('HKEYS', name) def hlen(self, name): "Return the number of elements in hash ``name``" - return self.format_inline('HLEN', name) + return self.execute_command('HLEN', name) def hset(self, name, key, value): """ Set ``key`` to ``value`` within hash ``name`` Returns 1 if HSET created a new field, otherwise 0 """ - return self.format_multi_bulk('HSET', name, key, value) + return self.execute_command('HSET', name, key, value) def hvals(self, name): "Return the list of values within hash ``name``" - return self.format_inline('HVALS', name) + return self.execute_command('HVALS', name) # channels @@ -995,7 +988,7 @@ class Redis(threading.local): "Subscribe to ``channels``, waiting for messages to be published" if isinstance(channels, basestring): channels = [channels] - response = self.format_inline('SUBSCRIBE', *channels) + response = self.execute_command('SUBSCRIBE', *channels) # this is *after* the SUBSCRIBE in order to allow for lazy and broken # connections that need to issue AUTH and SELECT commands self.subscribed = True @@ -1005,14 +998,14 @@ class Redis(threading.local): "Unsubscribe to ``channels``. If empty, unsubscribe from all channels" if isinstance(channels, basestring): channels = [channels] - return self.format_inline('UNSUBSCRIBE', *channels) + return self.execute_command('UNSUBSCRIBE', *channels) def publish(self, channel, message): """ Publish ``message`` on ``channel``. Returns the number of subscribers the message was delivered to. """ - return self.format_bulk('PUBLISH', channel, message) + return self.execute_command('PUBLISH', channel, message) def listen(self): "Listen for messages on channels this client has been subscribed to" @@ -1051,9 +1044,9 @@ class Pipeline(Redis): def reset(self): self.command_stack = [] - self.format_inline('MULTI') + self.execute_command('MULTI') - def execute_command(self, command_name, command, **options): + def _execute_command(self, command_name, command, **options): """ Stage a command to be executed when execute() is next called @@ -1070,7 +1063,7 @@ class Pipeline(Redis): # _setup_connection(). run these commands immediately without # buffering them. if command_name in ('AUTH', 'SELECT'): - return super(Pipeline, self).execute_command( + return super(Pipeline, self)._execute_command( command_name, command, **options) else: self.command_stack.append((command_name, command, options)) @@ -1103,7 +1096,7 @@ class Pipeline(Redis): def execute(self): "Execute all the commands in the current pipeline" - self.format_inline('EXEC') + self.execute_command('EXEC') stack = self.command_stack self.reset() try: |