path: root/redis/commands.py
author      Chayim <chayim@users.noreply.github.com>    2021-10-25 17:06:04 +0300
committer   GitHub <noreply@github.com>                 2021-10-25 17:06:04 +0300
commit      3946da29d7e451a20289fb6e282516fa24e402af (patch)
tree        25cf4b73b4e00d66c75288790616ea882823e2b7 /redis/commands.py
parent      0ef4c0711693b4b313ce97261214bd151d8261d5 (diff)
download    redis-py-3946da29d7e451a20289fb6e282516fa24e402af.tar.gz
redisjson support (#1636)
Diffstat (limited to 'redis/commands.py')
-rw-r--r--  redis/commands.py  3511
1 file changed, 0 insertions, 3511 deletions
diff --git a/redis/commands.py b/redis/commands.py
deleted file mode 100644
index 2697e78..0000000
--- a/redis/commands.py
+++ /dev/null
@@ -1,3511 +0,0 @@
-import datetime
-import time
-import warnings
-import hashlib
-
-from redis.exceptions import (
- ConnectionError,
- DataError,
- NoScriptError,
- RedisError,
-)
-
-
-def list_or_args(keys, args):
- # returns a single new list combining keys and args
- try:
- iter(keys)
- # a string or bytes instance can be iterated, but indicates
- # keys wasn't passed as a list
- if isinstance(keys, (bytes, str)):
- keys = [keys]
- else:
- keys = list(keys)
- except TypeError:
- keys = [keys]
- if args:
- keys.extend(args)
- return keys
-
-
-class Commands:
- """
- A class containing all of the implemented redis commands. This class is
- to be used as a mixin.
- """
-
- # SERVER INFORMATION
-
- # ACL methods
- def acl_cat(self, category=None):
- """
- Returns a list of categories or commands within a category.
-
- If ``category`` is not supplied, returns a list of all categories.
- If ``category`` is supplied, returns a list of all commands within
- that category.
- """
- pieces = [category] if category else []
- return self.execute_command('ACL CAT', *pieces)
-
- def acl_deluser(self, *username):
- "Delete the ACL for the specified ``username``s"
- return self.execute_command('ACL DELUSER', *username)
-
- def acl_genpass(self, bits=None):
- """Generate a random password value.
- If ``bits`` is supplied then use this number of bits, rounded to
- the next multiple of 4.
- See: https://redis.io/commands/acl-genpass
- """
- pieces = []
- if bits is not None:
- try:
- b = int(bits)
- if b < 0 or b > 4096:
- raise ValueError
- except ValueError:
- raise DataError('genpass optionally accepts a bits argument, '
- 'between 0 and 4096.')
- pieces.append(b)
- return self.execute_command('ACL GENPASS', *pieces)
-
- def acl_getuser(self, username):
- """
- Get the ACL details for the specified ``username``.
-
- If ``username`` does not exist, return None
- """
- return self.execute_command('ACL GETUSER', username)
-
- def acl_help(self):
- """The ACL HELP command returns helpful text describing
- the different subcommands.
- """
- return self.execute_command('ACL HELP')
-
- def acl_list(self):
- "Return a list of all ACLs on the server"
- return self.execute_command('ACL LIST')
-
- def acl_log(self, count=None):
- """
- Get ACL logs as a list.
- :param int count: Get logs[0:count].
- :rtype: List.
- """
- args = []
- if count is not None:
- if not isinstance(count, int):
- raise DataError('ACL LOG count must be an '
- 'integer')
- args.append(count)
-
- return self.execute_command('ACL LOG', *args)
-
- def acl_log_reset(self):
- """
- Reset ACL logs.
- :rtype: Boolean.
- """
- args = [b'RESET']
- return self.execute_command('ACL LOG', *args)
-
- def acl_load(self):
- """
- Load ACL rules from the configured ``aclfile``.
-
- Note that the server must be configured with the ``aclfile``
- directive to be able to load ACL rules from an aclfile.
- """
- return self.execute_command('ACL LOAD')
-
- def acl_save(self):
- """
- Save ACL rules to the configured ``aclfile``.
-
- Note that the server must be configured with the ``aclfile``
- directive to be able to save ACL rules to an aclfile.
- """
- return self.execute_command('ACL SAVE')
-
- def acl_setuser(self, username, enabled=False, nopass=False,
- passwords=None, hashed_passwords=None, categories=None,
- commands=None, keys=None, reset=False, reset_keys=False,
- reset_passwords=False):
- """
- Create or update an ACL user.
-
- Create or update the ACL for ``username``. If the user already exists,
- the existing ACL is completely overwritten and replaced with the
- specified values.
-
- ``enabled`` is a boolean indicating whether the user should be allowed
- to authenticate or not. Defaults to ``False``.
-
- ``nopass`` is a boolean indicating whether the user can authenticate
- without a password. This cannot be True if ``passwords`` are also
- specified.
-
- ``passwords`` if specified is a list of plain text passwords
- to add to or remove from the user. Each password must be prefixed with
- a '+' to add or a '-' to remove. For convenience, the value of
- ``passwords`` can be a simple prefixed string when adding or
- removing a single password.
-
- ``hashed_passwords`` if specified is a list of SHA-256 hashed passwords
- to add to or remove from the user. Each hashed password must be
- prefixed with a '+' to add or a '-' to remove. For convenience,
- the value of ``hashed_passwords`` can be a simple prefixed string when
- adding or removing a single password.
-
- ``categories`` if specified is a list of strings representing category
- permissions. Each string must be prefixed with either a '+' to add the
- category permission or a '-' to remove the category permission.
-
- ``commands`` if specified is a list of strings representing command
- permissions. Each string must be prefixed with either a '+' to add the
- command permission or a '-' to remove the command permission.
-
- ``keys`` if specified is a list of key patterns to grant the user
- access to. Key patterns allow '*' to support wildcard matching. For
- example, '*' grants access to all keys while 'cache:*' grants access
- to all keys that are prefixed with 'cache:'. ``keys`` should not be
- prefixed with a '~'.
-
- ``reset`` is a boolean indicating whether the user should be fully
- reset prior to applying the new ACL. Setting this to True will
- remove all existing passwords, flags and privileges from the user and
- then apply the specified rules. If this is False, the user's existing
- passwords, flags and privileges will be kept and any new specified
- rules will be applied on top.
-
- ``reset_keys`` is a boolean indicating whether the user's key
- permissions should be reset prior to applying any new key permissions
- specified in ``keys``. If this is False, the user's existing
- key permissions will be kept and any new specified key permissions
- will be applied on top.
-
- ``reset_passwords`` is a boolean indicating whether to remove all
- existing passwords and the 'nopass' flag from the user prior to
- applying any new passwords specified in 'passwords' or
- 'hashed_passwords'. If this is False, the user's existing passwords
- and 'nopass' status will be kept and any new specified passwords
- or hashed_passwords will be applied on top.
- """
- encoder = self.connection_pool.get_encoder()
- pieces = [username]
-
- if reset:
- pieces.append(b'reset')
-
- if reset_keys:
- pieces.append(b'resetkeys')
-
- if reset_passwords:
- pieces.append(b'resetpass')
-
- if enabled:
- pieces.append(b'on')
- else:
- pieces.append(b'off')
-
- if (passwords or hashed_passwords) and nopass:
- raise DataError('Cannot set \'nopass\' and supply '
- '\'passwords\' or \'hashed_passwords\'')
-
- if passwords:
- # as most users will have only one password, allow passwords
- # to be specified as a simple string or a list
- passwords = list_or_args(passwords, [])
- for i, password in enumerate(passwords):
- password = encoder.encode(password)
- if password.startswith(b'+'):
- pieces.append(b'>%s' % password[1:])
- elif password.startswith(b'-'):
- pieces.append(b'<%s' % password[1:])
- else:
- raise DataError('Password %d must be prefixed with a '
- '"+" to add or a "-" to remove' % i)
-
- if hashed_passwords:
- # as most users will have only one password, allow hashed_passwords
- # to be specified as a simple string or a list
- hashed_passwords = list_or_args(hashed_passwords, [])
- for i, hashed_password in enumerate(hashed_passwords):
- hashed_password = encoder.encode(hashed_password)
- if hashed_password.startswith(b'+'):
- pieces.append(b'#%s' % hashed_password[1:])
- elif hashed_password.startswith(b'-'):
- pieces.append(b'!%s' % hashed_password[1:])
- else:
- raise DataError('Hashed password %d must be prefixed '
- 'with a "+" to add or a "-" to remove' % i)
-
- if nopass:
- pieces.append(b'nopass')
-
- if categories:
- for category in categories:
- category = encoder.encode(category)
- # categories can be prefixed with one of (+@, +, -@, -)
- if category.startswith(b'+@'):
- pieces.append(category)
- elif category.startswith(b'+'):
- pieces.append(b'+@%s' % category[1:])
- elif category.startswith(b'-@'):
- pieces.append(category)
- elif category.startswith(b'-'):
- pieces.append(b'-@%s' % category[1:])
- else:
- raise DataError('Category "%s" must be prefixed with '
- '"+" or "-"'
- % encoder.decode(category, force=True))
- if commands:
- for cmd in commands:
- cmd = encoder.encode(cmd)
- if not cmd.startswith(b'+') and not cmd.startswith(b'-'):
- raise DataError('Command "%s" must be prefixed with '
- '"+" or "-"'
- % encoder.decode(cmd, force=True))
- pieces.append(cmd)
-
- if keys:
- for key in keys:
- key = encoder.encode(key)
- pieces.append(b'~%s' % key)
-
- return self.execute_command('ACL SETUSER', *pieces)
-
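- # Illustrative usage sketch, not part of the original module: assuming a
- # connected client ``r``, a read-only user limited to ``cache:*`` keys
- # could be created roughly like this:
- #
- #   r.acl_setuser('cache-reader', enabled=True,
- #                 passwords=['+a-strong-password'],
- #                 categories=['+@read'],
- #                 keys=['cache:*'])
-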
- def acl_users(self):
- "Returns a list of all registered users on the server."
- return self.execute_command('ACL USERS')
-
- def acl_whoami(self):
- "Get the username for the current connection"
- return self.execute_command('ACL WHOAMI')
-
- def bgrewriteaof(self):
- "Tell the Redis server to rewrite the AOF file from data in memory."
- return self.execute_command('BGREWRITEAOF')
-
- def bgsave(self, schedule=True):
- """
- Tell the Redis server to save its data to disk. Unlike save(),
- this method is asynchronous and returns immediately.
- """
- pieces = []
- if schedule:
- pieces.append("SCHEDULE")
- return self.execute_command('BGSAVE', *pieces)
-
- def client_kill(self, address):
- "Disconnects the client at ``address`` (ip:port)"
- return self.execute_command('CLIENT KILL', address)
-
- def client_kill_filter(self, _id=None, _type=None, addr=None,
- skipme=None, laddr=None, user=None):
- """
- Disconnects client(s) using a variety of filter options
- :param _id: Kills a client by its unique ID field
- :param _type: Kills a client by type where type is one of 'normal',
- 'master', 'slave' or 'pubsub'
- :param addr: Kills a client by its 'address:port'
- :param skipme: If True, then the client calling the command
- will not get killed even if it is identified by one of the filter
- options. If skipme is not provided, the server defaults to skipme=True
- :param laddr: Kills a client by its 'local (bind) address:port'
- :param user: Kills a client for a specific user name
- """
- args = []
- if _type is not None:
- client_types = ('normal', 'master', 'slave', 'pubsub')
- if str(_type).lower() not in client_types:
- raise DataError("CLIENT KILL type must be one of %r" % (
- client_types,))
- args.extend((b'TYPE', _type))
- if skipme is not None:
- if not isinstance(skipme, bool):
- raise DataError("CLIENT KILL skipme must be a bool")
- if skipme:
- args.extend((b'SKIPME', b'YES'))
- else:
- args.extend((b'SKIPME', b'NO'))
- if _id is not None:
- args.extend((b'ID', _id))
- if addr is not None:
- args.extend((b'ADDR', addr))
- if laddr is not None:
- args.extend((b'LADDR', laddr))
- if user is not None:
- args.extend((b'USER', user))
- if not args:
- raise DataError("CLIENT KILL <filter> <value> ... ... <filter> "
- "<value> must specify at least one filter")
- return self.execute_command('CLIENT KILL', *args)
-
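- # Illustrative usage sketch, not part of the original module: assuming a
- # connected client ``r``, clients can be disconnected by filter, e.g.:
- #
- #   r.client_kill_filter(_id='12345')           # kill by client id
- #   r.client_kill_filter(laddr='1.2.3.4:6379')  # kill by local address
-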
- def client_info(self):
- """
- Returns information and statistics about the current
- client connection.
- """
- return self.execute_command('CLIENT INFO')
-
- def client_list(self, _type=None, client_id=[]):
- """
- Returns a list of currently connected clients.
- If a client ``_type`` is specified, only clients of that type are
- returned.
- :param _type: optional. one of the client types (normal, master,
- replica, pubsub)
- :param client_id: optional. a list of client ids
- """
- args = []
- if _type is not None:
- client_types = ('normal', 'master', 'replica', 'pubsub')
- if str(_type).lower() not in client_types:
- raise DataError("CLIENT LIST _type must be one of %r" % (
- client_types,))
- args.append(b'TYPE')
- args.append(_type)
- if not isinstance(client_id, list):
- raise DataError("client_id must be a list")
- if client_id != []:
- args.append(b"ID")
- args.append(' '.join(client_id))
- return self.execute_command('CLIENT LIST', *args)
-
- def client_getname(self):
- """Returns the current connection name"""
- return self.execute_command('CLIENT GETNAME')
-
- def client_getredir(self):
- """Returns the ID (an integer) of the client to whom we are
- redirecting tracking notifications.
-
- see: https://redis.io/commands/client-getredir
- """
- return self.execute_command('CLIENT GETREDIR')
-
- def client_reply(self, reply):
- """Enable and disable redis server replies.
- ``reply`` Must be ON, OFF or SKIP.
- ON - The default mode in which the server replies to commands
- OFF - Disable server responses to commands
- SKIP - Skip the response of the immediately following command.
-
- Note: When setting OFF or SKIP replies, you will need a client object
- with a timeout specified in seconds, and will need to catch the
- TimeoutError.
- The test_client_reply unit test illustrates this, and
- conftest.py has a client with a timeout.
- See https://redis.io/commands/client-reply
- """
- replies = ['ON', 'OFF', 'SKIP']
- if reply not in replies:
- raise DataError('CLIENT REPLY must be one of %r' % replies)
- return self.execute_command("CLIENT REPLY", reply)
-
- def client_id(self):
- """Returns the current connection id"""
- return self.execute_command('CLIENT ID')
-
- def client_trackinginfo(self):
- """
- Returns the information about the current client connection's
- use of the server assisted client side cache.
- See https://redis.io/commands/client-trackinginfo
- """
- return self.execute_command('CLIENT TRACKINGINFO')
-
- def client_setname(self, name):
- "Sets the current connection name"
- return self.execute_command('CLIENT SETNAME', name)
-
- def client_unblock(self, client_id, error=False):
- """
- Unblocks a connection by its client id.
- If ``error`` is True, unblocks the client with a special error message.
- If ``error`` is False (default), the client is unblocked using the
- regular timeout mechanism.
- """
- args = ['CLIENT UNBLOCK', int(client_id)]
- if error:
- args.append(b'ERROR')
- return self.execute_command(*args)
-
- def client_pause(self, timeout):
- """
- Suspend all the Redis clients for the specified amount of time
- :param timeout: milliseconds to pause clients
- """
- if not isinstance(timeout, int):
- raise DataError("CLIENT PAUSE timeout must be an integer")
- return self.execute_command('CLIENT PAUSE', str(timeout))
-
- def client_unpause(self):
- """
- Unpause all redis clients
- """
- return self.execute_command('CLIENT UNPAUSE')
-
- def readwrite(self):
- """
- Disables read queries for a connection to a Redis Cluster slave node.
- """
- return self.execute_command('READWRITE')
-
- def readonly(self):
- """
- Enables read queries for a connection to a Redis Cluster replica node.
- """
- return self.execute_command('READONLY')
-
- def config_get(self, pattern="*"):
- """Return a dictionary of configuration based on the ``pattern``"""
- return self.execute_command('CONFIG GET', pattern)
-
- def config_set(self, name, value):
- "Set config item ``name`` with ``value``"
- return self.execute_command('CONFIG SET', name, value)
-
- def config_resetstat(self):
- """Reset runtime statistics"""
- return self.execute_command('CONFIG RESETSTAT')
-
- def config_rewrite(self):
- """
- Rewrite config file with the minimal change to reflect running config.
- """
- return self.execute_command('CONFIG REWRITE')
-
- def dbsize(self):
- """Returns the number of keys in the current database"""
- return self.execute_command('DBSIZE')
-
- def debug_object(self, key):
- """Returns version specific meta information about a given key"""
- return self.execute_command('DEBUG OBJECT', key)
-
- def debug_segfault(self):
- raise NotImplementedError(
- "DEBUG SEGFAULT is intentionally not implemented in the client."
- )
-
- def echo(self, value):
- """Echo the string back from the server"""
- return self.execute_command('ECHO', value)
-
- def flushall(self, asynchronous=False):
- """
- Delete all keys in all databases on the current host.
-
- ``asynchronous`` indicates whether the operation is
- executed asynchronously by the server.
- """
- args = []
- if asynchronous:
- args.append(b'ASYNC')
- return self.execute_command('FLUSHALL', *args)
-
- def flushdb(self, asynchronous=False):
- """
- Delete all keys in the current database.
-
- ``asynchronous`` indicates whether the operation is
- executed asynchronously by the server.
- """
- args = []
- if asynchronous:
- args.append(b'ASYNC')
- return self.execute_command('FLUSHDB', *args)
-
- def swapdb(self, first, second):
- "Swap two databases"
- return self.execute_command('SWAPDB', first, second)
-
- def info(self, section=None):
- """
- Returns a dictionary containing information about the Redis server
-
- The ``section`` option can be used to select a specific section
- of information
-
- The ``section`` option is not supported by older versions of Redis
- Server and will generate a ResponseError
- """
- if section is None:
- return self.execute_command('INFO')
- else:
- return self.execute_command('INFO', section)
-
- def lastsave(self):
- """
- Return a Python datetime object representing the last time the
- Redis database was saved to disk
- """
- return self.execute_command('LASTSAVE')
-
- def lolwut(self, *version_numbers):
- """
- Get the Redis version and a piece of generative computer art
- See: https://redis.io/commands/lolwut
- """
- if version_numbers:
- return self.execute_command('LOLWUT VERSION', *version_numbers)
- else:
- return self.execute_command('LOLWUT')
-
- def migrate(self, host, port, keys, destination_db, timeout,
- copy=False, replace=False, auth=None):
- """
- Migrate 1 or more keys from the current Redis server to a different
- server specified by the ``host``, ``port`` and ``destination_db``.
-
- The ``timeout``, specified in milliseconds, indicates the maximum
- time the connection between the two servers can be idle before the
- command is interrupted.
-
- If ``copy`` is True, the specified ``keys`` are NOT deleted from
- the source server.
-
- If ``replace`` is True, this operation will overwrite the keys
- on the destination server if they exist.
-
- If ``auth`` is specified, authenticate to the destination server with
- the password provided.
- """
- keys = list_or_args(keys, [])
- if not keys:
- raise DataError('MIGRATE requires at least one key')
- pieces = []
- if copy:
- pieces.append(b'COPY')
- if replace:
- pieces.append(b'REPLACE')
- if auth:
- pieces.append(b'AUTH')
- pieces.append(auth)
- pieces.append(b'KEYS')
- pieces.extend(keys)
- return self.execute_command('MIGRATE', host, port, '', destination_db,
- timeout, *pieces)
-
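- # Illustrative usage sketch, not part of the original module: assuming a
- # connected client ``r``, two keys could be copied to another instance
- # without deleting them locally:
- #
- #   r.migrate('10.0.0.2', 6379, ['key1', 'key2'], destination_db=0,
- #             timeout=1000, copy=True, replace=True)
-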
- def object(self, infotype, key):
- """Return the encoding, idletime, or refcount about the key"""
- return self.execute_command('OBJECT', infotype, key, infotype=infotype)
-
- def memory_doctor(self):
- raise NotImplementedError(
- "MEMORY DOCTOR is intentionally not implemented in the client."
- )
-
- def memory_help(self):
- raise NotImplementedError(
- "MEMORY HELP is intentionally not implemented in the client."
- )
-
- def memory_stats(self):
- """Return a dictionary of memory stats"""
- return self.execute_command('MEMORY STATS')
-
- def memory_malloc_stats(self):
- """Return an internal statistics report from the memory allocator."""
- return self.execute_command('MEMORY MALLOC-STATS')
-
- def memory_usage(self, key, samples=None):
- """
- Return the total memory usage for key, its value and associated
- administrative overheads.
-
- For nested data structures, ``samples`` is the number of elements to
- sample. If left unspecified, the server's default is 5. Use 0 to sample
- all elements.
- """
- args = []
- if isinstance(samples, int):
- args.extend([b'SAMPLES', samples])
- return self.execute_command('MEMORY USAGE', key, *args)
-
- def memory_purge(self):
- """Attempts to purge dirty pages for reclamation by allocator"""
- return self.execute_command('MEMORY PURGE')
-
- def ping(self):
- """Ping the Redis server"""
- return self.execute_command('PING')
-
- def quit(self):
- """
- Ask the server to close the connection.
- https://redis.io/commands/quit
- """
- return self.execute_command('QUIT')
-
- def save(self):
- """
- Tell the Redis server to save its data to disk,
- blocking until the save is complete
- """
- return self.execute_command('SAVE')
-
- def shutdown(self, save=False, nosave=False):
- """Shutdown the Redis server. If Redis has persistence configured,
- data will be flushed before shutdown. If the "save" option is set,
- a data flush will be attempted even if there is no persistence
- configured. If the "nosave" option is set, no data flush will be
- attempted. The "save" and "nosave" options cannot both be set.
- """
- if save and nosave:
- raise DataError('SHUTDOWN save and nosave cannot both be set')
- args = ['SHUTDOWN']
- if save:
- args.append('SAVE')
- if nosave:
- args.append('NOSAVE')
- try:
- self.execute_command(*args)
- except ConnectionError:
- # a ConnectionError here is expected
- return
- raise RedisError("SHUTDOWN seems to have failed.")
-
- def slaveof(self, host=None, port=None):
- """
- Set the server to be a replicated slave of the instance identified
- by the ``host`` and ``port``. If called without arguments, the
- instance is promoted to a master instead.
- """
- if host is None and port is None:
- return self.execute_command('SLAVEOF', b'NO', b'ONE')
- return self.execute_command('SLAVEOF', host, port)
-
- def slowlog_get(self, num=None):
- """
- Get the entries from the slowlog. If ``num`` is specified, get the
- most recent ``num`` items.
- """
- args = ['SLOWLOG GET']
- if num is not None:
- args.append(num)
- decode_responses = self.connection_pool.connection_kwargs.get(
- 'decode_responses', False)
- return self.execute_command(*args, decode_responses=decode_responses)
-
- def slowlog_len(self):
- "Get the number of items in the slowlog"
- return self.execute_command('SLOWLOG LEN')
-
- def slowlog_reset(self):
- "Remove all items in the slowlog"
- return self.execute_command('SLOWLOG RESET')
-
- def time(self):
- """
- Returns the server time as a 2-item tuple of ints:
- (seconds since epoch, microseconds into this second).
- """
- return self.execute_command('TIME')
-
- def wait(self, num_replicas, timeout):
- """
- Redis synchronous replication.
- Returns the number of replicas that processed the query once we finally
- have at least ``num_replicas``, or once the ``timeout`` is reached.
- """
- return self.execute_command('WAIT', num_replicas, timeout)
-
- # BASIC KEY COMMANDS
- def append(self, key, value):
- """
- Appends the string ``value`` to the value at ``key``. If ``key``
- doesn't already exist, create it with a value of ``value``.
- Returns the new length of the value at ``key``.
- """
- return self.execute_command('APPEND', key, value)
-
- def bitcount(self, key, start=None, end=None):
- """
- Returns the count of set bits in the value of ``key``. Optional
- ``start`` and ``end`` parameters indicate which bytes to consider
- """
- params = [key]
- if start is not None and end is not None:
- params.append(start)
- params.append(end)
- elif (start is not None and end is None) or \
- (end is not None and start is None):
- raise DataError("Both start and end must be specified")
- return self.execute_command('BITCOUNT', *params)
-
- def bitfield(self, key, default_overflow=None):
- """
- Return a BitFieldOperation instance to conveniently construct one or
- more bitfield operations on ``key``.
- """
- return BitFieldOperation(self, key, default_overflow=default_overflow)
-
- def bitop(self, operation, dest, *keys):
- """
- Perform a bitwise operation using ``operation`` between ``keys`` and
- store the result in ``dest``.
- """
- return self.execute_command('BITOP', operation, dest, *keys)
-
- def bitpos(self, key, bit, start=None, end=None):
- """
- Return the position of the first bit set to 1 or 0 in a string.
- ``start`` and ``end`` defines search range. The range is interpreted
- as a range of bytes and not a range of bits, so start=0 and end=2
- means to look at the first three bytes.
- """
- if bit not in (0, 1):
- raise DataError('bit must be 0 or 1')
- params = [key, bit]
-
- if start is not None:
- params.append(start)
-
- if start is not None and end is not None:
- params.append(end)
- elif start is None and end is not None:
- raise DataError("start argument is not set "
- "when end is specified")
- return self.execute_command('BITPOS', *params)
-
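- # Illustrative usage sketch, not part of the original module: the range is
- # interpreted in bytes, so with a connected client ``r``, searching the
- # first three bytes of ``mykey`` for the first set bit looks like:
- #
- #   r.bitpos('mykey', 1, start=0, end=2)
-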
- def copy(self, source, destination, destination_db=None, replace=False):
- """
- Copy the value stored in the ``source`` key to the ``destination`` key.
-
- ``destination_db`` an alternative destination database. By default,
- the ``destination`` key is created in the source Redis database.
-
- ``replace`` whether the ``destination`` key should be removed before
- copying the value to it. By default, the value is not copied if
- the ``destination`` key already exists.
- """
- params = [source, destination]
- if destination_db is not None:
- params.extend(["DB", destination_db])
- if replace:
- params.append("REPLACE")
- return self.execute_command('COPY', *params)
-
- def decr(self, name, amount=1):
- """
- Decrements the value of ``key`` by ``amount``. If no key exists,
- the value will be initialized as 0 - ``amount``
- """
- # ``decr()`` is an alias for ``decrby()``, because Redis already
- # implements this as the DECRBY command.
- return self.decrby(name, amount)
-
- def decrby(self, name, amount=1):
- """
- Decrements the value of ``key`` by ``amount``. If no key exists,
- the value will be initialized as 0 - ``amount``
- """
- return self.execute_command('DECRBY', name, amount)
-
- def delete(self, *names):
- "Delete one or more keys specified by ``names``"
- return self.execute_command('DEL', *names)
-
- def __delitem__(self, name):
- self.delete(name)
-
- def dump(self, name):
- """
- Return a serialized version of the value stored at the specified key.
- If key does not exist a nil bulk reply is returned.
- """
- return self.execute_command('DUMP', name)
-
- def exists(self, *names):
- "Returns the number of ``names`` that exist"
- return self.execute_command('EXISTS', *names)
- __contains__ = exists
-
- def expire(self, name, time):
- """
- Set an expire flag on key ``name`` for ``time`` seconds. ``time``
- can be represented by an integer or a Python timedelta object.
- """
- if isinstance(time, datetime.timedelta):
- time = int(time.total_seconds())
- return self.execute_command('EXPIRE', name, time)
-
- def expireat(self, name, when):
- """
- Set an expire flag on key ``name``. ``when`` can be represented
- as an integer indicating unix time or a Python datetime object.
- """
- if isinstance(when, datetime.datetime):
- when = int(time.mktime(when.timetuple()))
- return self.execute_command('EXPIREAT', name, when)
-
- def get(self, name):
- """
- Return the value at key ``name``, or None if the key doesn't exist
- """
- return self.execute_command('GET', name)
-
- def getdel(self, name):
- """
- Get the value at key ``name`` and delete the key. This command
- is similar to GET, except for the fact that it also deletes
- the key on success (if and only if the key's value type
- is a string).
- """
- return self.execute_command('GETDEL', name)
-
- def getex(self, name,
- ex=None, px=None, exat=None, pxat=None, persist=False):
- """
- Get the value of key and optionally set its expiration.
- GETEX is similar to GET, but is a write command with
- additional options. All time parameters can be given as
- datetime.timedelta or integers.
-
- ``ex`` sets an expire flag on key ``name`` for ``ex`` seconds.
-
- ``px`` sets an expire flag on key ``name`` for ``px`` milliseconds.
-
- ``exat`` sets an expire flag on key ``name``, where ``exat`` is a
- unix timestamp in seconds.
-
- ``pxat`` sets an expire flag on key ``name``, where ``pxat`` is a
- unix timestamp in milliseconds.
-
- ``persist`` remove the time to live associated with ``name``.
- """
-
- opset = set([ex, px, exat, pxat])
- if len(opset) > 2 or len(opset) > 1 and persist:
- raise DataError("``ex``, ``px``, ``exat``, ``pxat``, "
- "and ``persist`` are mutually exclusive.")
-
- pieces = []
- # similar to set command
- if ex is not None:
- pieces.append('EX')
- if isinstance(ex, datetime.timedelta):
- ex = int(ex.total_seconds())
- pieces.append(ex)
- if px is not None:
- pieces.append('PX')
- if isinstance(px, datetime.timedelta):
- px = int(px.total_seconds() * 1000)
- pieces.append(px)
- # similar to pexpireat command
- if exat is not None:
- pieces.append('EXAT')
- if isinstance(exat, datetime.datetime):
- s = int(exat.microsecond / 1000000)
- exat = int(time.mktime(exat.timetuple())) + s
- pieces.append(exat)
- if pxat is not None:
- pieces.append('PXAT')
- if isinstance(pxat, datetime.datetime):
- ms = int(pxat.microsecond / 1000)
- pxat = int(time.mktime(pxat.timetuple())) * 1000 + ms
- pieces.append(pxat)
- if persist:
- pieces.append('PERSIST')
-
- return self.execute_command('GETEX', name, *pieces)
-
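- # Illustrative usage sketch, not part of the original module: with a
- # connected client ``r``, a value can be read while adjusting its TTL:
- #
- #   r.getex('session', ex=60)         # read and reset expiry to 60 seconds
- #   r.getex('session', persist=True)  # read and drop the expiry entirely
-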
- def __getitem__(self, name):
- """
- Return the value at key ``name``, raises a KeyError if the key
- doesn't exist.
- """
- value = self.get(name)
- if value is not None:
- return value
- raise KeyError(name)
-
- def getbit(self, name, offset):
- "Returns a boolean indicating the value of ``offset`` in ``name``"
- return self.execute_command('GETBIT', name, offset)
-
- def getrange(self, key, start, end):
- """
- Returns the substring of the string value stored at ``key``,
- determined by the offsets ``start`` and ``end`` (both are inclusive)
- """
- return self.execute_command('GETRANGE', key, start, end)
-
- def getset(self, name, value):
- """
- Sets the value at key ``name`` to ``value``
- and returns the old value at key ``name`` atomically.
-
- As per Redis 6.2, GETSET is considered deprecated.
- Please use SET with GET parameter in new code.
- """
- return self.execute_command('GETSET', name, value)
-
- def incr(self, name, amount=1):
- """
- Increments the value of ``key`` by ``amount``. If no key exists,
- the value will be initialized as ``amount``
- """
- return self.incrby(name, amount)
-
- def incrby(self, name, amount=1):
- """
- Increments the value of ``key`` by ``amount``. If no key exists,
- the value will be initialized as ``amount``
- """
- # ``incr()`` is an alias for this method, because Redis already
- # implements this as the INCRBY command.
- return self.execute_command('INCRBY', name, amount)
-
- def incrbyfloat(self, name, amount=1.0):
- """
- Increments the value at key ``name`` by floating ``amount``.
- If no key exists, the value will be initialized as ``amount``
- """
- return self.execute_command('INCRBYFLOAT', name, amount)
-
- def keys(self, pattern='*'):
- "Returns a list of keys matching ``pattern``"
- return self.execute_command('KEYS', pattern)
-
- def lmove(self, first_list, second_list, src="LEFT", dest="RIGHT"):
- """
- Atomically returns and removes the first/last element of a list,
- pushing it as the first/last element on the destination list.
- Returns the element being popped and pushed.
- """
- params = [first_list, second_list, src, dest]
- return self.execute_command("LMOVE", *params)
-
- def blmove(self, first_list, second_list, timeout,
- src="LEFT", dest="RIGHT"):
- """
- Blocking version of lmove.
- """
- params = [first_list, second_list, src, dest, timeout]
- return self.execute_command("BLMOVE", *params)
-
- def mget(self, keys, *args):
- """
- Returns a list of values ordered identically to ``keys``
- """
- from redis.client import EMPTY_RESPONSE
- args = list_or_args(keys, args)
- options = {}
- if not args:
- options[EMPTY_RESPONSE] = []
- return self.execute_command('MGET', *args, **options)
-
- def mset(self, mapping):
- """
- Sets key/values based on a mapping. Mapping is a dictionary of
- key/value pairs. Both keys and values should be strings or types that
- can be cast to a string via str().
- """
- items = []
- for pair in mapping.items():
- items.extend(pair)
- return self.execute_command('MSET', *items)
-
- def msetnx(self, mapping):
- """
- Sets key/values based on a mapping if none of the keys are already set.
- Mapping is a dictionary of key/value pairs. Both keys and values
- should be strings or types that can be cast to a string via str().
- Returns a boolean indicating if the operation was successful.
- """
- items = []
- for pair in mapping.items():
- items.extend(pair)
- return self.execute_command('MSETNX', *items)
-
- def move(self, name, db):
- "Moves the key ``name`` to a different Redis database ``db``"
- return self.execute_command('MOVE', name, db)
-
- def persist(self, name):
- "Removes an expiration on ``name``"
- return self.execute_command('PERSIST', name)
-
- def pexpire(self, name, time):
- """
- Set an expire flag on key ``name`` for ``time`` milliseconds.
- ``time`` can be represented by an integer or a Python timedelta
- object.
- """
- if isinstance(time, datetime.timedelta):
- time = int(time.total_seconds() * 1000)
- return self.execute_command('PEXPIRE', name, time)
-
- def pexpireat(self, name, when):
- """
- Set an expire flag on key ``name``. ``when`` can be represented
- as an integer representing unix time in milliseconds (unix time * 1000)
- or a Python datetime object.
- """
- if isinstance(when, datetime.datetime):
- ms = int(when.microsecond / 1000)
- when = int(time.mktime(when.timetuple())) * 1000 + ms
- return self.execute_command('PEXPIREAT', name, when)
-
- def psetex(self, name, time_ms, value):
- """
- Set the value of key ``name`` to ``value`` that expires in ``time_ms``
- milliseconds. ``time_ms`` can be represented by an integer or a Python
- timedelta object
- """
- if isinstance(time_ms, datetime.timedelta):
- time_ms = int(time_ms.total_seconds() * 1000)
- return self.execute_command('PSETEX', name, time_ms, value)
-
- def pttl(self, name):
- "Returns the number of milliseconds until the key ``name`` will expire"
- return self.execute_command('PTTL', name)
-
- def hrandfield(self, key, count=None, withvalues=False):
- """
- Return a random field from the hash value stored at key.
-
- count: if the argument is positive, return an array of distinct fields.
- If called with a negative count, the behavior changes and the command
- is allowed to return the same field multiple times. In this case,
- the number of returned fields is the absolute value of the
- specified count.
- withvalues: The optional WITHVALUES modifier changes the reply so it
- includes the respective values of the randomly selected hash fields.
- """
- params = []
- if count is not None:
- params.append(count)
- if withvalues:
- params.append("WITHVALUES")
-
- return self.execute_command("HRANDFIELD", key, *params)
-
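- # Illustrative usage sketch, not part of the original module, with a
- # connected client ``r``:
- #
- #   r.hrandfield('myhash')             # one random field
- #   r.hrandfield('myhash', count=2)    # two distinct fields
- #   r.hrandfield('myhash', count=-5)   # five fields, repeats allowed
- #   r.hrandfield('myhash', count=2, withvalues=True)  # field/value pairs
-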
- def randomkey(self):
- """Returns the name of a random key"""
- return self.execute_command('RANDOMKEY')
-
- def rename(self, src, dst):
- """
- Rename key ``src`` to ``dst``
- """
- return self.execute_command('RENAME', src, dst)
-
- def renamenx(self, src, dst):
- """Rename key ``src`` to ``dst`` if ``dst`` doesn't already exist"""
- return self.execute_command('RENAMENX', src, dst)
-
- def restore(self, name, ttl, value, replace=False, absttl=False,
- idletime=None, frequency=None):
- """
- Create a key using the provided serialized value, previously obtained
- using DUMP.
-
- ``replace`` allows an existing key on ``name`` to be overridden. If
- it's not specified an error is raised on collision.
-
- ``absttl`` if True, specified ``ttl`` should represent an absolute Unix
- timestamp in milliseconds in which the key will expire. (Redis 5.0 or
- greater).
-
- ``idletime`` Used for eviction, this is the number of seconds the
- key must be idle, prior to execution.
-
- ``frequency`` Used for eviction, this is the frequency counter of
- the object stored at the key, prior to execution.
- """
- params = [name, ttl, value]
- if replace:
- params.append('REPLACE')
- if absttl:
- params.append('ABSTTL')
- if idletime is not None:
- params.append('IDLETIME')
- try:
- params.append(int(idletime))
- except ValueError:
- raise DataError("idletimemust be an integer")
-
- if frequency is not None:
- params.append('FREQ')
- try:
- params.append(int(frequency))
- except ValueError:
- raise DataError("frequency must be an integer")
-
- return self.execute_command('RESTORE', *params)
-
- def set(self, name, value,
- ex=None, px=None, nx=False, xx=False, keepttl=False, get=False,
- exat=None, pxat=None):
- """
- Set the value at key ``name`` to ``value``
-
- ``ex`` sets an expire flag on key ``name`` for ``ex`` seconds.
-
- ``px`` sets an expire flag on key ``name`` for ``px`` milliseconds.
-
- ``nx`` if set to True, set the value at key ``name`` to ``value`` only
- if it does not exist.
-
- ``xx`` if set to True, set the value at key ``name`` to ``value`` only
- if it already exists.
-
- ``keepttl`` if True, retain the time to live associated with the key.
- (Available since Redis 6.0)
-
- ``get`` if True, set the value at key ``name`` to ``value`` and return
- the old value stored at key, or None if the key did not exist.
- (Available since Redis 6.2)
-
- ``exat`` sets an expire flag on key ``name``, where ``exat`` is a
- unix timestamp in seconds.
-
- ``pxat`` sets an expire flag on key ``name``, where ``pxat`` is a
- unix timestamp in milliseconds.
- """
- pieces = [name, value]
- options = {}
- if ex is not None:
- pieces.append('EX')
- if isinstance(ex, datetime.timedelta):
- ex = int(ex.total_seconds())
- if isinstance(ex, int):
- pieces.append(ex)
- else:
- raise DataError("ex must be datetime.timedelta or int")
- if px is not None:
- pieces.append('PX')
- if isinstance(px, datetime.timedelta):
- px = int(px.total_seconds() * 1000)
- if isinstance(px, int):
- pieces.append(px)
- else:
- raise DataError("px must be datetime.timedelta or int")
- if exat is not None:
- pieces.append('EXAT')
- if isinstance(exat, datetime.datetime):
- s = int(exat.microsecond / 1000000)
- exat = int(time.mktime(exat.timetuple())) + s
- pieces.append(exat)
- if pxat is not None:
- pieces.append('PXAT')
- if isinstance(pxat, datetime.datetime):
- ms = int(pxat.microsecond / 1000)
- pxat = int(time.mktime(pxat.timetuple())) * 1000 + ms
- pieces.append(pxat)
- if keepttl:
- pieces.append('KEEPTTL')
-
- if nx:
- pieces.append('NX')
- if xx:
- pieces.append('XX')
-
- if get:
- pieces.append('GET')
- options["get"] = True
-
- return self.execute_command('SET', *pieces, **options)
-
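- # Illustrative usage sketch, not part of the original module: with a
- # connected client ``r``, the expiry and existence flags combine, e.g.:
- #
- #   r.set('lock', 'token', nx=True, ex=10)        # only if absent, 10s TTL
- #   r.set('counter', '1', xx=True, keepttl=True)  # only if present, keep TTL
- #   r.set('greeting', 'hi', get=True)             # return old value (6.2+)
-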
- def __setitem__(self, name, value):
- self.set(name, value)
-
- def setbit(self, name, offset, value):
- """
- Flag the ``offset`` in ``name`` as ``value``. Returns a boolean
- indicating the previous value of ``offset``.
- """
- value = int(bool(value))
- return self.execute_command('SETBIT', name, offset, value)
-
- def setex(self, name, time, value):
- """
- Set the value of key ``name`` to ``value`` that expires in ``time``
- seconds. ``time`` can be represented by an integer or a Python
- timedelta object.
- """
- if isinstance(time, datetime.timedelta):
- time = int(time.total_seconds())
- return self.execute_command('SETEX', name, time, value)
-
- def setnx(self, name, value):
- "Set the value of key ``name`` to ``value`` if key doesn't exist"
- return self.execute_command('SETNX', name, value)
-
- def setrange(self, name, offset, value):
- """
- Overwrite bytes in the value of ``name`` starting at ``offset`` with
- ``value``. If ``offset`` plus the length of ``value`` exceeds the
- length of the original value, the new value will be larger than before.
- If ``offset`` exceeds the length of the original value, null bytes
- will be used to pad between the end of the previous value and the start
- of what's being injected.
-
- Returns the length of the new string.
- """
- return self.execute_command('SETRANGE', name, offset, value)
-
- def stralgo(self, algo, value1, value2, specific_argument='strings',
- len=False, idx=False, minmatchlen=None, withmatchlen=False):
- """
- Implements complex algorithms that operate on strings.
- Right now the only algorithm implemented is the LCS algorithm
- (longest common substring). However new algorithms could be
- implemented in the future.
-
- ``algo`` Right now must be LCS
- ``value1`` and ``value2`` Can be two strings or two keys
- ``specific_argument`` Specifying if the arguments to the algorithm
- will be keys or strings. strings is the default.
- ``len`` Returns just the len of the match.
- ``idx`` Returns the match positions in each string.
- ``minmatchlen`` Restrict the list of matches to the ones of a given
- minimal length. Can be provided only when ``idx`` is set to True.
- ``withmatchlen`` Returns the matches with the len of the match.
- Can be provided only when ``idx`` is set to True.
- """
- # check validity
- supported_algo = ['LCS']
- if algo not in supported_algo:
- raise DataError("The supported algorithms are: %s"
- % (', '.join(supported_algo)))
- if specific_argument not in ['keys', 'strings']:
- raise DataError("specific_argument can be only"
- " keys or strings")
- if len and idx:
- raise DataError("len and idx cannot be provided together.")
-
- pieces = [algo, specific_argument.upper(), value1, value2]
- if len:
- pieces.append(b'LEN')
- if idx:
- pieces.append(b'IDX')
- try:
- int(minmatchlen)
- pieces.extend([b'MINMATCHLEN', minmatchlen])
- except TypeError:
- pass
- if withmatchlen:
- pieces.append(b'WITHMATCHLEN')
-
- return self.execute_command('STRALGO', *pieces, len=len, idx=idx,
- minmatchlen=minmatchlen,
- withmatchlen=withmatchlen)
-
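- # Illustrative usage sketch, not part of the original module: with a
- # connected client ``r``, LCS can be run on literal strings or on keys:
- #
- #   r.stralgo('LCS', 'ohmytext', 'mynewtext')
- #   r.stralgo('LCS', 'key1', 'key2', specific_argument='keys',
- #             idx=True, minmatchlen=4, withmatchlen=True)
-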
- def strlen(self, name):
- "Return the number of bytes stored in the value of ``name``"
- return self.execute_command('STRLEN', name)
-
- def substr(self, name, start, end=-1):
- """
- Return a substring of the string at key ``name``. ``start`` and ``end``
- are 0-based integers specifying the portion of the string to return.
- """
- return self.execute_command('SUBSTR', name, start, end)
-
- def touch(self, *args):
- """
- Alters the last access time of a key(s) ``*args``. A key is ignored
- if it does not exist.
- """
- return self.execute_command('TOUCH', *args)
-
- def ttl(self, name):
- "Returns the number of seconds until the key ``name`` will expire"
- return self.execute_command('TTL', name)
-
- def type(self, name):
- "Returns the type of key ``name``"
- return self.execute_command('TYPE', name)
-
- def watch(self, *names):
- """
- Watches the values at keys ``names``, or None if the key doesn't exist
- """
- warnings.warn(DeprecationWarning('Call WATCH from a Pipeline object'))
-
- def unwatch(self):
- """
- Unwatches the value at key ``name``, or None if the key doesn't exist
- """
- warnings.warn(
- DeprecationWarning('Call UNWATCH from a Pipeline object'))
-
- def unlink(self, *names):
- "Unlink one or more keys specified by ``names``"
- return self.execute_command('UNLINK', *names)
-
- # LIST COMMANDS
- def blpop(self, keys, timeout=0):
- """
- LPOP a value off of the first non-empty list
- named in the ``keys`` list.
-
- If none of the lists in ``keys`` has a value to LPOP, then block
- for ``timeout`` seconds, or until a value gets pushed on to one
- of the lists.
-
- If timeout is 0, then block indefinitely.
- """
- if timeout is None:
- timeout = 0
- keys = list_or_args(keys, None)
- keys.append(timeout)
- return self.execute_command('BLPOP', *keys)
-
- def brpop(self, keys, timeout=0):
- """
- RPOP a value off of the first non-empty list
- named in the ``keys`` list.
-
- If none of the lists in ``keys`` has a value to RPOP, then block
- for ``timeout`` seconds, or until a value gets pushed on to one
- of the lists.
-
- If timeout is 0, then block indefinitely.
- """
- if timeout is None:
- timeout = 0
- keys = list_or_args(keys, None)
- keys.append(timeout)
- return self.execute_command('BRPOP', *keys)
-
- def brpoplpush(self, src, dst, timeout=0):
- """
- Pop a value off the tail of ``src``, push it on the head of ``dst``
- and then return it.
-
- This command blocks until a value is in ``src`` or until ``timeout``
- seconds elapse, whichever is first. A ``timeout`` value of 0 blocks
- forever.
- """
- if timeout is None:
- timeout = 0
- return self.execute_command('BRPOPLPUSH', src, dst, timeout)
-
- def lindex(self, name, index):
- """
- Return the item from list ``name`` at position ``index``
-
- Negative indexes are supported and will return an item at the
- end of the list
- """
- return self.execute_command('LINDEX', name, index)
-
- def linsert(self, name, where, refvalue, value):
- """
- Insert ``value`` in list ``name`` either immediately before or after
- [``where``] ``refvalue``
-
- Returns the new length of the list on success or -1 if ``refvalue``
- is not in the list.
- """
- return self.execute_command('LINSERT', name, where, refvalue, value)
-
- def llen(self, name):
- "Return the length of the list ``name``"
- return self.execute_command('LLEN', name)
-
- def lpop(self, name, count=None):
- """
- Removes and returns the first elements of the list ``name``.
-
- By default, the command pops a single element from the beginning of
- the list. When provided with the optional ``count`` argument, the reply
- will consist of up to count elements, depending on the list's length.
- """
- if count is not None:
- return self.execute_command('LPOP', name, count)
- else:
- return self.execute_command('LPOP', name)
-
- def lpush(self, name, *values):
- "Push ``values`` onto the head of the list ``name``"
- return self.execute_command('LPUSH', name, *values)
-
- def lpushx(self, name, *values):
- "Push ``value`` onto the head of the list ``name`` if ``name`` exists"
- return self.execute_command('LPUSHX', name, *values)
-
- def lrange(self, name, start, end):
- """
- Return a slice of the list ``name`` between
- position ``start`` and ``end``
-
- ``start`` and ``end`` can be negative numbers just like
- Python slicing notation
- """
- return self.execute_command('LRANGE', name, start, end)
-
- def lrem(self, name, count, value):
- """
- Remove the first ``count`` occurrences of elements equal to ``value``
- from the list stored at ``name``.
-
- The count argument influences the operation in the following ways:
- count > 0: Remove elements equal to value moving from head to tail.
- count < 0: Remove elements equal to value moving from tail to head.
- count = 0: Remove all elements equal to value.
- """
- return self.execute_command('LREM', name, count, value)
-
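- # Illustrative usage sketch, not part of the original module: with a
- # connected client ``r``, the sign of ``count`` controls the direction:
- #
- #   r.lrem('mylist', 2, 'a')    # remove the first two 'a' values
- #   r.lrem('mylist', -1, 'a')   # remove the last 'a' value
- #   r.lrem('mylist', 0, 'a')    # remove every 'a' value
-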
- def lset(self, name, index, value):
- "Set ``position`` of list ``name`` to ``value``"
- return self.execute_command('LSET', name, index, value)
-
- def ltrim(self, name, start, end):
- """
- Trim the list ``name``, removing all values not within the slice
- between ``start`` and ``end``
-
- ``start`` and ``end`` can be negative numbers just like
- Python slicing notation
- """
- return self.execute_command('LTRIM', name, start, end)
-
- def rpop(self, name, count=None):
- """
- Removes and returns the last elements of the list ``name``.
-
- By default, the command pops a single element from the end of the list.
- When provided with the optional ``count`` argument, the reply will
- consist of up to count elements, depending on the list's length.
- """
- if count is not None:
- return self.execute_command('RPOP', name, count)
- else:
- return self.execute_command('RPOP', name)
-
- def rpoplpush(self, src, dst):
- """
- RPOP a value off of the ``src`` list and atomically LPUSH it
- on to the ``dst`` list. Returns the value.
- """
- return self.execute_command('RPOPLPUSH', src, dst)
-
- def rpush(self, name, *values):
- "Push ``values`` onto the tail of the list ``name``"
- return self.execute_command('RPUSH', name, *values)
-
- def rpushx(self, name, value):
- "Push ``value`` onto the tail of the list ``name`` if ``name`` exists"
- return self.execute_command('RPUSHX', name, value)
-
- def lpos(self, name, value, rank=None, count=None, maxlen=None):
- """
- Get position of ``value`` within the list ``name``
-
- If specified, ``rank`` indicates the "rank" of the first element to
- return in case there are multiple copies of ``value`` in the list.
- By default, LPOS returns the position of the first occurrence of
- ``value`` in the list. When ``rank`` is 2, LPOS returns the position of
- the second ``value`` in the list. If ``rank`` is negative, LPOS
- searches the list in reverse. For example, -1 would return the
- position of the last occurrence of ``value`` and -2 would return the
- position of the next to last occurrence of ``value``.
-
- If specified, ``count`` indicates that LPOS should return a list of
- up to ``count`` positions. A ``count`` of 2 would return a list of
- up to 2 positions. A ``count`` of 0 returns a list of all positions
- matching ``value``. When ``count`` is specified but ``value``
- does not exist in the list, an empty list is returned.
-
- If specified, ``maxlen`` indicates the maximum number of list
- elements to scan. A ``maxlen`` of 1000 will only return the
- position(s) of items within the first 1000 entries in the list.
- A ``maxlen`` of 0 (the default) will scan the entire list.
- """
- pieces = [name, value]
- if rank is not None:
- pieces.extend(['RANK', rank])
-
- if count is not None:
- pieces.extend(['COUNT', count])
-
- if maxlen is not None:
- pieces.extend(['MAXLEN', maxlen])
-
- return self.execute_command('LPOS', *pieces)
-
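- # Illustrative usage sketch, not part of the original module, with a
- # connected client ``r``:
- #
- #   r.lpos('mylist', 'c')            # position of the first 'c'
- #   r.lpos('mylist', 'c', rank=-1)   # position of the last 'c'
- #   r.lpos('mylist', 'c', count=0)   # every position of 'c'
-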
- def sort(self, name, start=None, num=None, by=None, get=None,
- desc=False, alpha=False, store=None, groups=False):
- """
- Sort and return the list, set or sorted set at ``name``.
-
- ``start`` and ``num`` allow for paging through the sorted data
-
- ``by`` allows using an external key to weight and sort the items.
- Use an "*" to indicate where in the key the item value is located
-
- ``get`` allows for returning items from external keys rather than the
- sorted data itself. Use an "*" to indicate where in the key
- the item value is located
-
- ``desc`` allows for reversing the sort
-
- ``alpha`` allows for sorting lexicographically rather than numerically
-
- ``store`` allows for storing the result of the sort into
- the key ``store``
-
- ``groups`` if set to True and if ``get`` contains at least two
- elements, sort will return a list of tuples, each containing the
- values fetched from the arguments to ``get``.
-
- """
- if (start is not None and num is None) or \
- (num is not None and start is None):
- raise DataError("``start`` and ``num`` must both be specified")
-
- pieces = [name]
- if by is not None:
- pieces.extend([b'BY', by])
- if start is not None and num is not None:
- pieces.extend([b'LIMIT', start, num])
- if get is not None:
- # If get is a string assume we want to get a single value.
- # Otherwise assume it's an iterable and we want to get multiple
- # values. We can't just iterate blindly because strings are
- # iterable.
- if isinstance(get, (bytes, str)):
- pieces.extend([b'GET', get])
- else:
- for g in get:
- pieces.extend([b'GET', g])
- if desc:
- pieces.append(b'DESC')
- if alpha:
- pieces.append(b'ALPHA')
- if store is not None:
- pieces.extend([b'STORE', store])
- if groups:
- if not get or isinstance(get, (bytes, str)) or len(get) < 2:
- raise DataError('when using "groups" the "get" argument '
- 'must be specified and contain at least '
- 'two keys')
-
- options = {'groups': len(get) if groups else None}
- return self.execute_command('SORT', *pieces, **options)
-
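- # Illustrative usage sketch, not part of the original module: with a
- # connected client ``r``, sorting by an external weight key and fetching
- # related data per element could look like:
- #
- #   r.sort('mylist', by='weight_*', get=['object_*', '#'],
- #          desc=True, groups=True)
-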
- # SCAN COMMANDS
- def scan(self, cursor=0, match=None, count=None, _type=None):
- """
- Incrementally return lists of key names. Also return a cursor
- indicating the scan position.
-
- ``match`` allows for filtering the keys by pattern
-
- ``count`` provides a hint to Redis about the number of keys to
- return per batch.
-
- ``_type`` filters the returned values by a particular Redis type.
- Stock Redis instances allow for the following types:
- HASH, LIST, SET, STREAM, STRING, ZSET
- Additionally, Redis modules can expose other types as well.
- """
- pieces = [cursor]
- if match is not None:
- pieces.extend([b'MATCH', match])
- if count is not None:
- pieces.extend([b'COUNT', count])
- if _type is not None:
- pieces.extend([b'TYPE', _type])
- return self.execute_command('SCAN', *pieces)
-
- def scan_iter(self, match=None, count=None, _type=None):
- """
- Make an iterator using the SCAN command so that the client doesn't
- need to remember the cursor position.
-
- ``match`` allows for filtering the keys by pattern
-
- ``count`` provides a hint to Redis about the number of keys to
- return per batch.
-
- ``_type`` filters the returned values by a particular Redis type.
- Stock Redis instances allow for the following types:
- HASH, LIST, SET, STREAM, STRING, ZSET
- Additionally, Redis modules can expose other types as well.
- """
- cursor = '0'
- while cursor != 0:
- cursor, data = self.scan(cursor=cursor, match=match,
- count=count, _type=_type)
- yield from data
-
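- # Illustrative usage sketch, not part of the original module: with a
- # connected client ``r``, keys can be iterated without tracking the
- # cursor by hand:
- #
- #   for key in r.scan_iter(match='user:*', count=100):
- #       print(key)
-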
- def sscan(self, name, cursor=0, match=None, count=None):
- """
- Incrementally return lists of elements in a set. Also return a cursor
- indicating the scan position.
-
- ``match`` allows for filtering the keys by pattern
-
- ``count`` provides a hint to Redis about the number of elements to
- return per batch
- """
- pieces = [name, cursor]
- if match is not None:
- pieces.extend([b'MATCH', match])
- if count is not None:
- pieces.extend([b'COUNT', count])
- return self.execute_command('SSCAN', *pieces)
-
- def sscan_iter(self, name, match=None, count=None):
- """
- Make an iterator using the SSCAN command so that the client doesn't
- need to remember the cursor position.
-
- ``match`` allows for filtering the keys by pattern
-
- ``count`` provides a hint to Redis about the number of elements to
- return per batch
- """
- cursor = '0'
- while cursor != 0:
- cursor, data = self.sscan(name, cursor=cursor,
- match=match, count=count)
- yield from data
-
- def hscan(self, name, cursor=0, match=None, count=None):
- """
- Incrementally return key/value slices in a hash. Also return a cursor
- indicating the scan position.
-
- ``match`` allows for filtering the keys by pattern
-
- ``count`` provides a hint to Redis about the number of elements to
- return per batch
- """
- pieces = [name, cursor]
- if match is not None:
- pieces.extend([b'MATCH', match])
- if count is not None:
- pieces.extend([b'COUNT', count])
- return self.execute_command('HSCAN', *pieces)
-
- def hscan_iter(self, name, match=None, count=None):
- """
- Make an iterator using the HSCAN command so that the client doesn't
- need to remember the cursor position.
-
- ``match`` allows for filtering the keys by pattern
-
- ``count`` provides a hint to Redis about the number of elements to
- return per batch
- """
- cursor = '0'
- while cursor != 0:
- cursor, data = self.hscan(name, cursor=cursor,
- match=match, count=count)
- yield from data.items()
-
- def zscan(self, name, cursor=0, match=None, count=None,
- score_cast_func=float):
- """
- Incrementally return lists of elements in a sorted set. Also return a
- cursor indicating the scan position.
-
- ``match`` allows for filtering the keys by pattern
-
- ``count`` provides a hint to Redis about the number of elements to
- return per batch
-
- ``score_cast_func`` a callable used to cast the score return value
- """
- pieces = [name, cursor]
- if match is not None:
- pieces.extend([b'MATCH', match])
- if count is not None:
- pieces.extend([b'COUNT', count])
- options = {'score_cast_func': score_cast_func}
- return self.execute_command('ZSCAN', *pieces, **options)
-
- def zscan_iter(self, name, match=None, count=None,
- score_cast_func=float):
- """
- Make an iterator using the ZSCAN command so that the client doesn't
- need to remember the cursor position.
-
- ``match`` allows for filtering the keys by pattern
-
- ``count`` provides a hint to Redis about the number of elements to
- return per batch
-
- ``score_cast_func`` a callable used to cast the score return value
- """
- cursor = '0'
- while cursor != 0:
- cursor, data = self.zscan(name, cursor=cursor, match=match,
- count=count,
- score_cast_func=score_cast_func)
- yield from data
-
- # SET COMMANDS
- def sadd(self, name, *values):
- """Add ``value(s)`` to set ``name``"""
- return self.execute_command('SADD', name, *values)
-
- def scard(self, name):
- """Return the number of elements in set ``name``"""
- return self.execute_command('SCARD', name)
-
- def sdiff(self, keys, *args):
- """Return the difference of sets specified by ``keys``"""
- args = list_or_args(keys, args)
- return self.execute_command('SDIFF', *args)
-
- def sdiffstore(self, dest, keys, *args):
- """
- Store the difference of sets specified by ``keys`` into a new
- set named ``dest``. Returns the number of keys in the new set.
- """
- args = list_or_args(keys, args)
- return self.execute_command('SDIFFSTORE', dest, *args)
-
- def sinter(self, keys, *args):
- """Return the intersection of sets specified by ``keys``"""
- args = list_or_args(keys, args)
- return self.execute_command('SINTER', *args)
-
- def sinterstore(self, dest, keys, *args):
- """
- Store the intersection of sets specified by ``keys`` into a new
- set named ``dest``. Returns the number of keys in the new set.
- """
- args = list_or_args(keys, args)
- return self.execute_command('SINTERSTORE', dest, *args)
-
- def sismember(self, name, value):
- """
- Return a boolean indicating if ``value`` is a member of set ``name``
- """
- return self.execute_command('SISMEMBER', name, value)
-
- def smembers(self, name):
- """Return all members of the set ``name``"""
- return self.execute_command('SMEMBERS', name)
-
- def smove(self, src, dst, value):
- """Move ``value`` from set ``src`` to set ``dst`` atomically"""
- return self.execute_command('SMOVE', src, dst, value)
-
- def spop(self, name, count=None):
- "Remove and return a random member of set ``name``"
- args = (count is not None) and [count] or []
- return self.execute_command('SPOP', name, *args)
-
- def srandmember(self, name, number=None):
- """
- If ``number`` is None, returns a random member of set ``name``.
-
- If ``number`` is supplied, returns a list of ``number`` random
- members of set ``name``. Note this is only available when running
- Redis 2.6+.
- """
- args = (number is not None) and [number] or []
- return self.execute_command('SRANDMEMBER', name, *args)
-
- def srem(self, name, *values):
- "Remove ``values`` from set ``name``"
- return self.execute_command('SREM', name, *values)
-
- def sunion(self, keys, *args):
- "Return the union of sets specified by ``keys``"
- args = list_or_args(keys, args)
- return self.execute_command('SUNION', *args)
-
- def sunionstore(self, dest, keys, *args):
- """
- Store the union of sets specified by ``keys`` into a new
- set named ``dest``. Returns the number of keys in the new set.
- """
- args = list_or_args(keys, args)
- return self.execute_command('SUNIONSTORE', dest, *args)
-
- # STREAMS COMMANDS
- def xack(self, name, groupname, *ids):
- """
- Acknowledges the successful processing of one or more messages.
- name: name of the stream.
- groupname: name of the consumer group.
- *ids: message ids to acknowledge.
- """
- return self.execute_command('XACK', name, groupname, *ids)
-
- def xadd(self, name, fields, id='*', maxlen=None, approximate=True,
- nomkstream=False, minid=None, limit=None):
- """
- Add to a stream.
- name: name of the stream
- fields: dict of field/value pairs to insert into the stream
- id: Location to insert this record. By default it is appended.
- maxlen: truncate old stream members beyond this size.
- Can't be specified with minid.
- approximate: actual stream length may be slightly more than maxlen
- nomkstream: when set to true, do not create the stream if it does not exist
- minid: the minimum id in the stream to query.
- Can't be specified with maxlen.
- limit: specifies the maximum number of entries to retrieve
- """
- pieces = []
- if maxlen is not None and minid is not None:
- raise DataError("Only one of ```maxlen``` or ```minid``` "
- "may be specified")
-
- if maxlen is not None:
- if not isinstance(maxlen, int) or maxlen < 1:
- raise DataError('XADD maxlen must be a positive integer')
- pieces.append(b'MAXLEN')
- if approximate:
- pieces.append(b'~')
- pieces.append(str(maxlen))
- if minid is not None:
- pieces.append(b'MINID')
- if approximate:
- pieces.append(b'~')
- pieces.append(minid)
- if limit is not None:
- pieces.extend([b'LIMIT', limit])
- if nomkstream:
- pieces.append(b'NOMKSTREAM')
- pieces.append(id)
- if not isinstance(fields, dict) or len(fields) == 0:
- raise DataError('XADD fields must be a non-empty dict')
- for pair in fields.items():
- pieces.extend(pair)
- return self.execute_command('XADD', name, *pieces)
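- # Usage sketch (illustrative, assumes a hypothetical client ``r``):
- #   r.xadd('events', {'type': 'login', 'user': 'alice'}, maxlen=10000)
- #   # appends an entry and keeps the stream near 10000 entries
- #   # (approximate trimming, since ``approximate`` defaults to True).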
-
- def xautoclaim(self, name, groupname, consumername, min_idle_time,
- start_id=0, count=None, justid=False):
- """
- Transfers ownership of pending stream entries that match the specified
- criteria. Conceptually, equivalent to calling XPENDING and then XCLAIM,
- but provides a more straightforward way to deal with message delivery
- failures via SCAN-like semantics.
- name: name of the stream.
- groupname: name of the consumer group.
- consumername: name of a consumer that claims the message.
- min_idle_time: only claim messages that have been idle for at least
- this many milliseconds.
- start_id: filter messages with equal or greater ID.
- count: optional integer, upper limit of the number of entries that the
- command attempts to claim. Set to 100 by default.
- justid: optional boolean, false by default. Return just an array of IDs
- of messages successfully claimed, without returning the actual message
- """
- try:
- if int(min_idle_time) < 0:
- raise DataError("XAUTOCLAIM min_idle_time must be a non"
- "negative integer")
- except TypeError:
- pass
-
- kwargs = {}
- pieces = [name, groupname, consumername, min_idle_time, start_id]
-
- try:
- if int(count) < 0:
- raise DataError("XPENDING count must be a integer >= 0")
- pieces.extend([b'COUNT', count])
- except TypeError:
- pass
- if justid:
- pieces.append(b'JUSTID')
- kwargs['parse_justid'] = True
-
- return self.execute_command('XAUTOCLAIM', *pieces, **kwargs)
-
- def xclaim(self, name, groupname, consumername, min_idle_time, message_ids,
- idle=None, time=None, retrycount=None, force=False,
- justid=False):
- """
- Changes the ownership of a pending message.
- name: name of the stream.
- groupname: name of the consumer group.
- consumername: name of a consumer that claims the message.
- min_idle_time: only claim messages that have been idle for at least
- this many milliseconds.
- message_ids: non-empty list or tuple of message IDs to claim
- idle: optional. Set the idle time (last time it was delivered) of the
- message in ms
- time: optional integer. This is the same as idle but instead of a
- relative amount of milliseconds, it sets the idle time to a specific
- Unix time (in milliseconds).
- retrycount: optional integer. set the retry counter to the specified
- value. This counter is incremented every time a message is delivered
- again.
- force: optional boolean, false by default. Creates the pending message
- entry in the PEL even if certain specified IDs are not already in the
- PEL assigned to a different client.
- justid: optional boolean, false by default. Return just an array of IDs
- of messages successfully claimed, without returning the actual message
- """
- if not isinstance(min_idle_time, int) or min_idle_time < 0:
- raise DataError("XCLAIM min_idle_time must be a non negative "
- "integer")
- if not isinstance(message_ids, (list, tuple)) or not message_ids:
- raise DataError("XCLAIM message_ids must be a non empty list or "
- "tuple of message IDs to claim")
-
- kwargs = {}
- pieces = [name, groupname, consumername, str(min_idle_time)]
- pieces.extend(list(message_ids))
-
- if idle is not None:
- if not isinstance(idle, int):
- raise DataError("XCLAIM idle must be an integer")
- pieces.extend((b'IDLE', str(idle)))
- if time is not None:
- if not isinstance(time, int):
- raise DataError("XCLAIM time must be an integer")
- pieces.extend((b'TIME', str(time)))
- if retrycount is not None:
- if not isinstance(retrycount, int):
- raise DataError("XCLAIM retrycount must be an integer")
- pieces.extend((b'RETRYCOUNT', str(retrycount)))
-
- if force:
- if not isinstance(force, bool):
- raise DataError("XCLAIM force must be a boolean")
- pieces.append(b'FORCE')
- if justid:
- if not isinstance(justid, bool):
- raise DataError("XCLAIM justid must be a boolean")
- pieces.append(b'JUSTID')
- kwargs['parse_justid'] = True
- return self.execute_command('XCLAIM', *pieces, **kwargs)
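- # Usage sketch (illustrative, assumes a hypothetical client ``r``,
- # stream 'events' and group 'grp'):
- #   r.xclaim('events', 'grp', 'worker-2', min_idle_time=60000,
- #            message_ids=['1526569495631-0'], justid=True)
- #   # reassigns entries idle for at least 60s to worker-2, returning IDs only.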
-
- def xdel(self, name, *ids):
- """
- Deletes one or more messages from a stream.
- name: name of the stream.
- *ids: message ids to delete.
- """
- return self.execute_command('XDEL', name, *ids)
-
- def xgroup_create(self, name, groupname, id='$', mkstream=False):
- """
- Create a new consumer group associated with a stream.
- name: name of the stream.
- groupname: name of the consumer group.
- id: ID of the last item in the stream to consider already delivered.
- mkstream: a boolean indicating whether to create the stream if it does
- not exist.
- """
- pieces = ['XGROUP CREATE', name, groupname, id]
- if mkstream:
- pieces.append(b'MKSTREAM')
- return self.execute_command(*pieces)
-
- def xgroup_delconsumer(self, name, groupname, consumername):
- """
- Remove a specific consumer from a consumer group.
- Returns the number of pending messages that the consumer had before it
- was deleted.
- name: name of the stream.
- groupname: name of the consumer group.
- consumername: name of consumer to delete
- """
- return self.execute_command('XGROUP DELCONSUMER', name, groupname,
- consumername)
-
- def xgroup_destroy(self, name, groupname):
- """
- Destroy a consumer group.
- name: name of the stream.
- groupname: name of the consumer group.
- """
- return self.execute_command('XGROUP DESTROY', name, groupname)
-
- def xgroup_createconsumer(self, name, groupname, consumername):
- """
- Consumers in a consumer group are auto-created every time a new
- consumer name is mentioned by some command.
- They can be explicitly created by using this command.
- name: name of the stream.
- groupname: name of the consumer group.
- consumername: name of consumer to create.
- """
- return self.execute_command('XGROUP CREATECONSUMER', name, groupname,
- consumername)
-
- def xgroup_setid(self, name, groupname, id):
- """
- Set the consumer group last delivered ID to something else.
- name: name of the stream.
- groupname: name of the consumer group.
- id: ID of the last item in the stream to consider already delivered.
- """
- return self.execute_command('XGROUP SETID', name, groupname, id)
-
- def xinfo_consumers(self, name, groupname):
- """
- Returns general information about the consumers in the group.
- name: name of the stream.
- groupname: name of the consumer group.
- """
- return self.execute_command('XINFO CONSUMERS', name, groupname)
-
- def xinfo_groups(self, name):
- """
- Returns general information about the consumer groups of the stream.
- name: name of the stream.
- """
- return self.execute_command('XINFO GROUPS', name)
-
- def xinfo_stream(self, name, full=False):
- """
- Returns general information about the stream.
- name: name of the stream.
- full: optional boolean, false by default. Return full summary
- """
- pieces = [name]
- options = {}
- if full:
- pieces.append(b'FULL')
- options = {'full': full}
- return self.execute_command('XINFO STREAM', *pieces, **options)
-
- def xlen(self, name):
- """
- Returns the number of elements in a given stream.
- """
- return self.execute_command('XLEN', name)
-
- def xpending(self, name, groupname):
- """
- Returns information about pending messages of a group.
- name: name of the stream.
- groupname: name of the consumer group.
- """
- return self.execute_command('XPENDING', name, groupname)
-
- def xpending_range(self, name, groupname, idle=None,
- min=None, max=None, count=None,
- consumername=None):
- """
- Returns information about pending messages, in a range.
-
- name: name of the stream.
- groupname: name of the consumer group.
- idle: available from version 6.2. filter entries by their
- idle-time, given in milliseconds (optional).
- min: minimum stream ID.
- max: maximum stream ID.
- count: number of messages to return
- consumername: name of a consumer to filter by (optional).
-
- """
- if {min, max, count} == {None}:
- if idle is not None or consumername is not None:
- raise DataError("if XPENDING is provided with idle time"
- " or consumername, it must be provided"
- " with min, max and count parameters")
- return self.xpending(name, groupname)
-
- pieces = [name, groupname]
- if min is None or max is None or count is None:
- raise DataError("XPENDING must be provided with min, max "
- "and count parameters, or none of them.")
- # idle
- try:
- if int(idle) < 0:
- raise DataError("XPENDING idle must be a integer >= 0")
- pieces.extend(['IDLE', idle])
- except TypeError:
- pass
- # count
- try:
- if int(count) < 0:
- raise DataError("XPENDING count must be a integer >= 0")
- pieces.extend([min, max, count])
- except TypeError:
- pass
- # consumername
- if consumername:
- pieces.append(consumername)
-
- return self.execute_command('XPENDING', *pieces, parse_detail=True)
-
- def xrange(self, name, min='-', max='+', count=None):
- """
- Read stream values within an interval.
- name: name of the stream.
- min: first stream ID. defaults to '-',
- meaning the earliest available.
- max: last stream ID. defaults to '+',
- meaning the latest available.
- count: if set, only return this many items, beginning with the
- earliest available.
- """
- pieces = [min, max]
- if count is not None:
- if not isinstance(count, int) or count < 1:
- raise DataError('XRANGE count must be a positive integer')
- pieces.append(b'COUNT')
- pieces.append(str(count))
-
- return self.execute_command('XRANGE', name, *pieces)
-
- def xread(self, streams, count=None, block=None):
- """
- Block and monitor multiple streams for new data.
- streams: a dict of stream names to stream IDs, where
- IDs indicate the last ID already seen.
- count: if set, only return this many items, beginning with the
- earliest available.
- block: number of milliseconds to wait, if nothing already present.
- """
- pieces = []
- if block is not None:
- if not isinstance(block, int) or block < 0:
- raise DataError('XREAD block must be a non-negative integer')
- pieces.append(b'BLOCK')
- pieces.append(str(block))
- if count is not None:
- if not isinstance(count, int) or count < 1:
- raise DataError('XREAD count must be a positive integer')
- pieces.append(b'COUNT')
- pieces.append(str(count))
- if not isinstance(streams, dict) or len(streams) == 0:
- raise DataError('XREAD streams must be a non empty dict')
- pieces.append(b'STREAMS')
- keys, values = zip(*streams.items())
- pieces.extend(keys)
- pieces.extend(values)
- return self.execute_command('XREAD', *pieces)
-
- def xreadgroup(self, groupname, consumername, streams, count=None,
- block=None, noack=False):
- """
- Read from a stream via a consumer group.
- groupname: name of the consumer group.
- consumername: name of the requesting consumer.
- streams: a dict of stream names to stream IDs, where
- IDs indicate the last ID already seen.
- count: if set, only return this many items, beginning with the
- earliest available.
- block: number of milliseconds to wait, if nothing already present.
- noack: do not add messages to the PEL
- """
- pieces = [b'GROUP', groupname, consumername]
- if count is not None:
- if not isinstance(count, int) or count < 1:
- raise DataError("XREADGROUP count must be a positive integer")
- pieces.append(b'COUNT')
- pieces.append(str(count))
- if block is not None:
- if not isinstance(block, int) or block < 0:
- raise DataError("XREADGROUP block must be a non-negative "
- "integer")
- pieces.append(b'BLOCK')
- pieces.append(str(block))
- if noack:
- pieces.append(b'NOACK')
- if not isinstance(streams, dict) or len(streams) == 0:
- raise DataError('XREADGROUP streams must be a non empty dict')
- pieces.append(b'STREAMS')
- pieces.extend(streams.keys())
- pieces.extend(streams.values())
- return self.execute_command('XREADGROUP', *pieces)
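- # Usage sketch of a consumer-group read/ack loop (illustrative, assumes a
- # hypothetical client ``r`` and an existing group 'grp' on stream 'events'):
- #   resp = r.xreadgroup('grp', 'worker-1', {'events': '>'}, count=10,
- #                       block=5000)
- #   # process the returned entries, then acknowledge them, e.g.:
- #   #   r.xack('events', 'grp', entry_id)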
-
- def xrevrange(self, name, max='+', min='-', count=None):
- """
- Read stream values within an interval, in reverse order.
- name: name of the stream
- max: first stream ID. defaults to '+',
- meaning the latest available.
- min: last stream ID. defaults to '-',
- meaning the earliest available.
- count: if set, only return this many items, beginning with the
- latest available.
- """
- pieces = [max, min]
- if count is not None:
- if not isinstance(count, int) or count < 1:
- raise DataError('XREVRANGE count must be a positive integer')
- pieces.append(b'COUNT')
- pieces.append(str(count))
-
- return self.execute_command('XREVRANGE', name, *pieces)
-
- def xtrim(self, name, maxlen=None, approximate=True, minid=None,
- limit=None):
- """
- Trims old messages from a stream.
- name: name of the stream.
- maxlen: truncate old stream messages beyond this size.
- Can't be specified with minid.
- approximate: actual stream length may be slightly more than maxlen
- minid: the minimum id in the stream to query
- Can't be specified with maxlen.
- limit: specifies the maximum number of entries to retrieve
- """
- pieces = []
- if maxlen is not None and minid is not None:
- raise DataError("Only one of ``maxlen`` or ``minid`` "
- "may be specified")
-
- if maxlen is not None:
- pieces.append(b'MAXLEN')
- if minid is not None:
- pieces.append(b'MINID')
- if approximate:
- pieces.append(b'~')
- if maxlen is not None:
- pieces.append(maxlen)
- if minid is not None:
- pieces.append(minid)
- if limit is not None:
- pieces.append(b"LIMIT")
- pieces.append(limit)
-
- return self.execute_command('XTRIM', name, *pieces)
-
- # SORTED SET COMMANDS
- def zadd(self, name, mapping, nx=False, xx=False, ch=False, incr=False,
- gt=None, lt=None):
- """
- Set any number of element-name, score pairs to the key ``name``. Pairs
- are specified as a dict mapping element names to score values.
-
- ``nx`` forces ZADD to only create new elements and not to update
- scores for elements that already exist.
-
- ``xx`` forces ZADD to only update scores of elements that already
- exist. New elements will not be added.
-
- ``ch`` modifies the return value to be the numbers of elements changed.
- Changed elements include new elements that were added and elements
- whose scores changed.
-
- ``incr`` modifies ZADD to behave like ZINCRBY. In this mode only a
- single element/score pair can be specified and the score is the amount
- the existing score will be incremented by. When using this mode the
- return value of ZADD will be the new score of the element.
-
- ``LT`` Only update existing elements if the new score is less than
- the current score. This flag doesn't prevent adding new elements.
-
- ``GT`` Only update existing elements if the new score is greater than
- the current score. This flag doesn't prevent adding new elements.
-
- The return value of ZADD varies based on the mode specified. With no
- options, ZADD returns the number of new elements added to the sorted
- set.
-
- ``NX``, ``LT``, and ``GT`` are mutually exclusive options.
- See: https://redis.io/commands/ZADD
- """
- if not mapping:
- raise DataError("ZADD requires at least one element/score pair")
- if nx and xx:
- raise DataError("ZADD allows either 'nx' or 'xx', not both")
- if incr and len(mapping) != 1:
- raise DataError("ZADD option 'incr' only works when passing a "
- "single element/score pair")
- if nx is True and (gt is not None or lt is not None):
- raise DataError("Only one of 'nx', 'lt', or 'gr' may be defined.")
-
- pieces = []
- options = {}
- if nx:
- pieces.append(b'NX')
- if xx:
- pieces.append(b'XX')
- if ch:
- pieces.append(b'CH')
- if incr:
- pieces.append(b'INCR')
- options['as_score'] = True
- if gt:
- pieces.append(b'GT')
- if lt:
- pieces.append(b'LT')
- for pair in mapping.items():
- pieces.append(pair[1])
- pieces.append(pair[0])
- return self.execute_command('ZADD', name, *pieces, **options)
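- # Usage sketch (illustrative, assumes a hypothetical client ``r``):
- #   r.zadd('scores', {'alice': 10, 'bob': 7})    # add or update members
- #   r.zadd('scores', {'alice': 1}, incr=True)    # ZINCRBY-style, returns new score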
-
- def zcard(self, name):
- "Return the number of elements in the sorted set ``name``"
- return self.execute_command('ZCARD', name)
-
- def zcount(self, name, min, max):
- """
- Returns the number of elements in the sorted set at key ``name`` with
- a score between ``min`` and ``max``.
- """
- return self.execute_command('ZCOUNT', name, min, max)
-
- def zdiff(self, keys, withscores=False):
- """
- Returns the difference between the first and all successive input
- sorted sets provided in ``keys``.
- """
- pieces = [len(keys), *keys]
- if withscores:
- pieces.append("WITHSCORES")
- return self.execute_command("ZDIFF", *pieces)
-
- def zdiffstore(self, dest, keys):
- """
- Computes the difference between the first and all successive input
- sorted sets provided in ``keys`` and stores the result in ``dest``.
- """
- pieces = [len(keys), *keys]
- return self.execute_command("ZDIFFSTORE", dest, *pieces)
-
- def zincrby(self, name, amount, value):
- "Increment the score of ``value`` in sorted set ``name`` by ``amount``"
- return self.execute_command('ZINCRBY', name, amount, value)
-
- def zinter(self, keys, aggregate=None, withscores=False):
- """
- Return the intersect of multiple sorted sets specified by ``keys``.
- With the ``aggregate`` option, it is possible to specify how the
- results of the union are aggregated. This option defaults to SUM,
- where the score of an element is summed across the inputs where it
- exists. When this option is set to either MIN or MAX, the resulting
- set will contain the minimum or maximum score of an element across
- the inputs where it exists.
- """
- return self._zaggregate('ZINTER', None, keys, aggregate,
- withscores=withscores)
-
- def zinterstore(self, dest, keys, aggregate=None):
- """
- Intersect multiple sorted sets specified by ``keys`` into a new
- sorted set, ``dest``. Scores in the destination will be aggregated
- based on the ``aggregate``. This option defaults to SUM, where the
- score of an element is summed across the inputs where it exists.
- When this option is set to either MIN or MAX, the resulting set will
- contain the minimum or maximum score of an element across the inputs
- where it exists.
- """
- return self._zaggregate('ZINTERSTORE', dest, keys, aggregate)
-
- def zlexcount(self, name, min, max):
- """
- Return the number of items in the sorted set ``name`` between the
- lexicographical range ``min`` and ``max``.
- """
- return self.execute_command('ZLEXCOUNT', name, min, max)
-
- def zpopmax(self, name, count=None):
- """
- Remove and return up to ``count`` members with the highest scores
- from the sorted set ``name``.
- """
- args = (count is not None) and [count] or []
- options = {
- 'withscores': True
- }
- return self.execute_command('ZPOPMAX', name, *args, **options)
-
- def zpopmin(self, name, count=None):
- """
- Remove and return up to ``count`` members with the lowest scores
- from the sorted set ``name``.
- """
- args = (count is not None) and [count] or []
- options = {
- 'withscores': True
- }
- return self.execute_command('ZPOPMIN', name, *args, **options)
-
- def zrandmember(self, key, count=None, withscores=False):
- """
- Return a random element from the sorted set value stored at key.
-
- ``count`` if the argument is positive, return an array of distinct
- elements. If called with a negative count, the behavior changes and
- the command is allowed to return the same element multiple times.
- In this case, the number of returned elements is the absolute value
- of the specified count.
-
- ``withscores`` The optional WITHSCORES modifier changes the reply so it
- includes the respective scores of the randomly selected elements from
- the sorted set.
- """
- params = []
- if count is not None:
- params.append(count)
- if withscores:
- params.append("WITHSCORES")
-
- return self.execute_command("ZRANDMEMBER", key, *params)
-
- def bzpopmax(self, keys, timeout=0):
- """
- ZPOPMAX a value off of the first non-empty sorted set
- named in the ``keys`` list.
-
- If none of the sorted sets in ``keys`` has a value to ZPOPMAX,
- then block for ``timeout`` seconds, or until a member gets added
- to one of the sorted sets.
-
- If timeout is 0, then block indefinitely.
- """
- if timeout is None:
- timeout = 0
- keys = list_or_args(keys, None)
- keys.append(timeout)
- return self.execute_command('BZPOPMAX', *keys)
-
- def bzpopmin(self, keys, timeout=0):
- """
- ZPOPMIN a value off of the first non-empty sorted set
- named in the ``keys`` list.
-
- If none of the sorted sets in ``keys`` has a value to ZPOPMIN,
- then block for ``timeout`` seconds, or until a member gets added
- to one of the sorted sets.
-
- If timeout is 0, then block indefinitely.
- """
- if timeout is None:
- timeout = 0
- keys = list_or_args(keys, None)
- keys.append(timeout)
- return self.execute_command('BZPOPMIN', *keys)
-
- def _zrange(self, command, dest, name, start, end, desc=False,
- byscore=False, bylex=False, withscores=False,
- score_cast_func=float, offset=None, num=None):
- if byscore and bylex:
- raise DataError("``byscore`` and ``bylex`` can not be "
- "specified together.")
- if (offset is not None and num is None) or \
- (num is not None and offset is None):
- raise DataError("``offset`` and ``num`` must both be specified.")
- if bylex and withscores:
- raise DataError("``withscores`` not supported in combination "
- "with ``bylex``.")
- pieces = [command]
- if dest:
- pieces.append(dest)
- pieces.extend([name, start, end])
- if byscore:
- pieces.append('BYSCORE')
- if bylex:
- pieces.append('BYLEX')
- if desc:
- pieces.append('REV')
- if offset is not None and num is not None:
- pieces.extend(['LIMIT', offset, num])
- if withscores:
- pieces.append('WITHSCORES')
- options = {
- 'withscores': withscores,
- 'score_cast_func': score_cast_func
- }
- return self.execute_command(*pieces, **options)
-
- def zrange(self, name, start, end, desc=False, withscores=False,
- score_cast_func=float, byscore=False, bylex=False,
- offset=None, num=None):
- """
- Return a range of values from sorted set ``name`` between
- ``start`` and ``end`` sorted in ascending order.
-
- ``start`` and ``end`` can be negative, indicating the end of the range.
-
- ``desc`` a boolean indicating whether to sort the results in reversed
- order.
-
- ``withscores`` indicates to return the scores along with the values.
- The return type is a list of (value, score) pairs.
-
- ``score_cast_func`` a callable used to cast the score return value.
-
- ``byscore`` when set to True, returns the range of elements from the
- sorted set with scores between ``start`` and ``end`` (inclusive).
-
- ``bylex`` when set to True, returns the range of elements from the
- sorted set between the ``start`` and ``end`` lexicographical closed
- range intervals.
- Valid ``start`` and ``end`` must start with ( or [, in order to specify
- whether the range interval is exclusive or inclusive, respectively.
-
- If ``offset`` and ``num`` are specified, then return a slice of the range.
- Can't be provided when using ``bylex``.
- """
- return self._zrange('ZRANGE', None, name, start, end, desc, byscore,
- bylex, withscores, score_cast_func, offset, num)
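- # Usage sketch (illustrative, assumes a hypothetical client ``r``):
- #   r.zrange('scores', 0, -1, withscores=True)                  # by rank
- #   r.zrange('scores', 5, 20, byscore=True, offset=0, num=10)   # by score + LIMIT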
-
- def zrevrange(self, name, start, end, withscores=False,
- score_cast_func=float):
- """
- Return a range of values from sorted set ``name`` between
- ``start`` and ``end`` sorted in descending order.
-
- ``start`` and ``end`` can be negative, indicating the end of the range.
-
- ``withscores`` indicates to return the scores along with the values
- The return type is a list of (value, score) pairs
-
- ``score_cast_func`` a callable used to cast the score return value
- """
- return self.zrange(name, start, end, desc=True,
- withscores=withscores,
- score_cast_func=score_cast_func)
-
- def zrangestore(self, dest, name, start, end,
- byscore=False, bylex=False, desc=False,
- offset=None, num=None):
- """
- Stores in ``dest`` the result of a range of values from sorted set
- ``name`` between ``start`` and ``end`` sorted in ascending order.
-
- ``start`` and ``end`` can be negative, indicating the end of the range.
-
- ``byscore`` when set to True, returns the range of elements from the
- sorted set with scores between ``start`` and ``end`` (inclusive).
-
- ``bylex`` when set to True, returns the range of elements from the
- sorted set between the ``start`` and ``end`` lexicographical closed
- range intervals.
- Valid ``start`` and ``end`` must start with ( or [, in order to specify
- whether the range interval is exclusive or inclusive, respectively.
-
- ``desc`` a boolean indicating whether to sort the results in reversed
- order.
-
- If ``offset`` and ``num`` are specified, then return a slice of the range.
- Can't be provided when using ``bylex``.
- """
- return self._zrange('ZRANGESTORE', dest, name, start, end, desc,
- byscore, bylex, False, None, offset, num)
-
- def zrangebylex(self, name, min, max, start=None, num=None):
- """
- Return the lexicographical range of values from sorted set ``name``
- between ``min`` and ``max``.
-
- If ``start`` and ``num`` are specified, then return a slice of the
- range.
- """
- return self.zrange(name, min, max, bylex=True, offset=start, num=num)
-
- def zrevrangebylex(self, name, max, min, start=None, num=None):
- """
- Return the reversed lexicographical range of values from sorted set
- ``name`` between ``max`` and ``min``.
-
- If ``start`` and ``num`` are specified, then return a slice of the
- range.
- """
- return self.zrange(name, max, min, desc=True,
- bylex=True, offset=start, num=num)
-
- def zrangebyscore(self, name, min, max, start=None, num=None,
- withscores=False, score_cast_func=float):
- """
- Return a range of values from the sorted set ``name`` with scores
- between ``min`` and ``max``.
-
- If ``start`` and ``num`` are specified, then return a slice
- of the range.
-
- ``withscores`` indicates to return the scores along with the values.
- The return type is a list of (value, score) pairs
-
- ``score_cast_func`` a callable used to cast the score return value
- """
- return self.zrange(name, min, max, byscore=True,
- offset=start, num=num,
- withscores=withscores,
- score_cast_func=score_cast_func)
-
- def zrevrangebyscore(self, name, max, min, start=None, num=None,
- withscores=False, score_cast_func=float):
- """
- Return a range of values from the sorted set ``name`` with scores
- between ``min`` and ``max`` in descending order.
-
- If ``start`` and ``num`` are specified, then return a slice
- of the range.
-
- ``withscores`` indicates to return the scores along with the values.
- The return type is a list of (value, score) pairs
-
- ``score_cast_func`` a callable used to cast the score return value
- """
- return self.zrange(name, max, min, desc=True,
- byscore=True, offset=start,
- num=num, withscores=withscores,
- score_cast_func=score_cast_func)
-
- def zrank(self, name, value):
- """
- Returns a 0-based value indicating the rank of ``value`` in sorted set
- ``name``
- """
- return self.execute_command('ZRANK', name, value)
-
- def zrem(self, name, *values):
- "Remove member ``values`` from sorted set ``name``"
- return self.execute_command('ZREM', name, *values)
-
- def zremrangebylex(self, name, min, max):
- """
- Remove all elements in the sorted set ``name`` between the
- lexicographical range specified by ``min`` and ``max``.
-
- Returns the number of elements removed.
- """
- return self.execute_command('ZREMRANGEBYLEX', name, min, max)
-
- def zremrangebyrank(self, name, min, max):
- """
- Remove all elements in the sorted set ``name`` with ranks between
- ``min`` and ``max``. Values are 0-based, ordered from smallest score
- to largest. Values can be negative indicating the highest scores.
- Returns the number of elements removed
- """
- return self.execute_command('ZREMRANGEBYRANK', name, min, max)
-
- def zremrangebyscore(self, name, min, max):
- """
- Remove all elements in the sorted set ``name`` with scores
- between ``min`` and ``max``. Returns the number of elements removed.
- """
- return self.execute_command('ZREMRANGEBYSCORE', name, min, max)
-
- def zrevrank(self, name, value):
- """
- Returns a 0-based value indicating the descending rank of
- ``value`` in sorted set ``name``
- """
- return self.execute_command('ZREVRANK', name, value)
-
- def zscore(self, name, value):
- "Return the score of element ``value`` in sorted set ``name``"
- return self.execute_command('ZSCORE', name, value)
-
- def zunion(self, keys, aggregate=None, withscores=False):
- """
- Return the union of multiple sorted sets specified by ``keys``.
- ``keys`` can be provided as dictionary of keys and their weights.
- Scores will be aggregated based on the ``aggregate``, or SUM if
- none is provided.
- """
- return self._zaggregate('ZUNION', None, keys, aggregate,
- withscores=withscores)
-
- def zunionstore(self, dest, keys, aggregate=None):
- """
- Union multiple sorted sets specified by ``keys`` into
- a new sorted set, ``dest``. Scores in the destination will be
- aggregated based on the ``aggregate``, or SUM if none is provided.
- """
- return self._zaggregate('ZUNIONSTORE', dest, keys, aggregate)
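- # Usage sketch (illustrative, assumes a hypothetical client ``r``):
- #   r.zunionstore('combined', {'scores:a': 1, 'scores:b': 2}, aggregate='MAX')
- #   # dict values are passed as WEIGHTS; MAX keeps the larger weighted score
- #   # per member instead of summing them.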
-
- def zmscore(self, key, members):
- """
- Returns the scores associated with the specified members
- in the sorted set stored at key.
- ``members`` should be a list of member names.
- The return type is a list of scores.
- If a member does not exist, None is returned
- at the corresponding position.
- """
- if not members:
- raise DataError('ZMSCORE members must be a non-empty list')
- pieces = [key] + members
- return self.execute_command('ZMSCORE', *pieces)
-
- def _zaggregate(self, command, dest, keys, aggregate=None,
- **options):
- pieces = [command]
- if dest is not None:
- pieces.append(dest)
- pieces.append(len(keys))
- if isinstance(keys, dict):
- keys, weights = keys.keys(), keys.values()
- else:
- weights = None
- pieces.extend(keys)
- if weights:
- pieces.append(b'WEIGHTS')
- pieces.extend(weights)
- if aggregate:
- if aggregate.upper() in ['SUM', 'MIN', 'MAX']:
- pieces.append(b'AGGREGATE')
- pieces.append(aggregate)
- else:
- raise DataError("aggregate can be sum, min or max.")
- if options.get('withscores', False):
- pieces.append(b'WITHSCORES')
- return self.execute_command(*pieces, **options)
-
- # HYPERLOGLOG COMMANDS
- def pfadd(self, name, *values):
- "Adds the specified elements to the specified HyperLogLog."
- return self.execute_command('PFADD', name, *values)
-
- def pfcount(self, *sources):
- """
- Return the approximated cardinality of
- the set observed by the HyperLogLog at key(s).
- """
- return self.execute_command('PFCOUNT', *sources)
-
- def pfmerge(self, dest, *sources):
- "Merge N different HyperLogLogs into a single one."
- return self.execute_command('PFMERGE', dest, *sources)
-
- # HASH COMMANDS
- def hdel(self, name, *keys):
- "Delete ``keys`` from hash ``name``"
- return self.execute_command('HDEL', name, *keys)
-
- def hexists(self, name, key):
- "Returns a boolean indicating if ``key`` exists within hash ``name``"
- return self.execute_command('HEXISTS', name, key)
-
- def hget(self, name, key):
- "Return the value of ``key`` within the hash ``name``"
- return self.execute_command('HGET', name, key)
-
- def hgetall(self, name):
- "Return a Python dict of the hash's name/value pairs"
- return self.execute_command('HGETALL', name)
-
- def hincrby(self, name, key, amount=1):
- "Increment the value of ``key`` in hash ``name`` by ``amount``"
- return self.execute_command('HINCRBY', name, key, amount)
-
- def hincrbyfloat(self, name, key, amount=1.0):
- """
- Increment the value of ``key`` in hash ``name`` by floating point ``amount``
- """
- return self.execute_command('HINCRBYFLOAT', name, key, amount)
-
- def hkeys(self, name):
- "Return the list of keys within hash ``name``"
- return self.execute_command('HKEYS', name)
-
- def hlen(self, name):
- "Return the number of elements in hash ``name``"
- return self.execute_command('HLEN', name)
-
- def hset(self, name, key=None, value=None, mapping=None):
- """
- Set ``key`` to ``value`` within hash ``name``,
- ``mapping`` accepts a dict of key/value pairs that will be
- added to hash ``name``.
- Returns the number of fields that were added.
- """
- if key is None and not mapping:
- raise DataError("'hset' with no key value pairs")
- items = []
- if key is not None:
- items.extend((key, value))
- if mapping:
- for pair in mapping.items():
- items.extend(pair)
-
- return self.execute_command('HSET', name, *items)
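- # Usage sketch (illustrative, assumes a hypothetical client ``r``):
- #   r.hset('user:1000', 'name', 'alice')
- #   r.hset('user:1000', mapping={'age': 30, 'email': 'alice@example.com'})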
-
- def hsetnx(self, name, key, value):
- """
- Set ``key`` to ``value`` within hash ``name`` if ``key`` does not
- exist. Returns 1 if HSETNX created a field, otherwise 0.
- """
- return self.execute_command('HSETNX', name, key, value)
-
- def hmset(self, name, mapping):
- """
- Set key to value within hash ``name`` for each corresponding
- key and value from the ``mapping`` dict.
- """
- warnings.warn(
- '%s.hmset() is deprecated. Use %s.hset() instead.'
- % (self.__class__.__name__, self.__class__.__name__),
- DeprecationWarning,
- stacklevel=2,
- )
- if not mapping:
- raise DataError("'hmset' with 'mapping' of length 0")
- items = []
- for pair in mapping.items():
- items.extend(pair)
- return self.execute_command('HMSET', name, *items)
-
- def hmget(self, name, keys, *args):
- "Returns a list of values ordered identically to ``keys``"
- args = list_or_args(keys, args)
- return self.execute_command('HMGET', name, *args)
-
- def hvals(self, name):
- "Return the list of values within hash ``name``"
- return self.execute_command('HVALS', name)
-
- def hstrlen(self, name, key):
- """
- Return the number of bytes stored in the value of ``key``
- within hash ``name``
- """
- return self.execute_command('HSTRLEN', name, key)
-
- def publish(self, channel, message):
- """
- Publish ``message`` on ``channel``.
- Returns the number of subscribers the message was delivered to.
- """
- return self.execute_command('PUBLISH', channel, message)
-
- def pubsub_channels(self, pattern='*'):
- """
- Return a list of channels that have at least one subscriber
- """
- return self.execute_command('PUBSUB CHANNELS', pattern)
-
- def pubsub_numpat(self):
- """
- Returns the number of subscriptions to patterns
- """
- return self.execute_command('PUBSUB NUMPAT')
-
- def pubsub_numsub(self, *args):
- """
- Return a list of (channel, number of subscribers) tuples
- for each channel given in ``*args``
- """
- return self.execute_command('PUBSUB NUMSUB', *args)
-
- def cluster(self, cluster_arg, *args):
- return self.execute_command('CLUSTER %s' % cluster_arg.upper(), *args)
-
- def replicaof(self, *args):
- """
- Update the replication settings of a redis replica, on the fly.
- Examples of valid arguments include:
- NO ONE (set no replication)
- host port (set to the host and port of a redis server)
- see: https://redis.io/commands/replicaof
- """
- return self.execute_command('REPLICAOF', *args)
-
- def eval(self, script, numkeys, *keys_and_args):
- """
- Execute the Lua ``script``, specifying the ``numkeys`` the script
- will touch and the key names and argument values in ``keys_and_args``.
- Returns the result of the script.
-
- In practice, use the object returned by ``register_script``. This
- function exists purely for Redis API completion.
- """
- return self.execute_command('EVAL', script, numkeys, *keys_and_args)
-
- def evalsha(self, sha, numkeys, *keys_and_args):
- """
- Use the ``sha`` to execute a Lua script already registered via EVAL
- or SCRIPT LOAD. Specify the ``numkeys`` the script will touch and the
- key names and argument values in ``keys_and_args``. Returns the result
- of the script.
-
- In practice, use the object returned by ``register_script``. This
- function exists purely for Redis API completion.
- """
- return self.execute_command('EVALSHA', sha, numkeys, *keys_and_args)
-
- def script_exists(self, *args):
- """
- Check if a script exists in the script cache by specifying the SHAs of
- each script as ``args``. Returns a list of boolean values indicating
- whether each script already exists in the cache.
- """
- return self.execute_command('SCRIPT EXISTS', *args)
-
- def script_debug(self, *args):
- raise NotImplementedError(
- "SCRIPT DEBUG is intentionally not implemented in the client."
- )
-
- def script_flush(self, sync_type=None):
- """Flush all scripts from the script cache.
- ``sync_type`` is by default SYNC (synchronous) but it can also be
- ASYNC.
- See: https://redis.io/commands/script-flush
- """
-
- # Redis pre 6 had no sync_type.
- if sync_type not in ["SYNC", "ASYNC", None]:
- raise DataError("SCRIPT FLUSH defaults to SYNC in redis > 6.2, or "
- "accepts SYNC/ASYNC. For older versions, "
- "of redis leave as None.")
- if sync_type is None:
- pieces = []
- else:
- pieces = [sync_type]
- return self.execute_command('SCRIPT FLUSH', *pieces)
-
- def script_kill(self):
- "Kill the currently executing Lua script"
- return self.execute_command('SCRIPT KILL')
-
- def script_load(self, script):
- "Load a Lua ``script`` into the script cache. Returns the SHA."
- return self.execute_command('SCRIPT LOAD', script)
-
- def register_script(self, script):
- """
- Register a Lua ``script`` specifying the ``keys`` it will touch.
- Returns a Script object that is callable and hides the complexity of
- dealing with scripts, keys, and shas. This is the preferred way to work
- with Lua scripts.
- """
- return Script(self, script)
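- # Usage sketch (illustrative, assumes a hypothetical client ``r``):
- #   incr_by = r.register_script("return redis.call('INCRBY', KEYS[1], ARGV[1])")
- #   incr_by(keys=['counter'], args=[5])
- #   # runs via EVALSHA and transparently re-loads the script on NoScriptError.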
-
- # GEO COMMANDS
- def geoadd(self, name, values, nx=False, xx=False, ch=False):
- """
- Add the specified geospatial items to the specified key identified
- by the ``name`` argument. The Geospatial items are given as ordered
- members of the ``values`` argument, each item or place is formed by
- the triad longitude, latitude and name.
-
- Note: You can use ZREM to remove elements.
-
- ``nx`` forces ZADD to only create new elements and not to update
- scores for elements that already exist.
-
- ``xx`` forces ZADD to only update scores of elements that already
- exist. New elements will not be added.
-
- ``ch`` modifies the return value to be the numbers of elements changed.
- Changed elements include new elements that were added and elements
- whose scores changed.
- """
- if nx and xx:
- raise DataError("GEOADD allows either 'nx' or 'xx', not both")
- if len(values) % 3 != 0:
- raise DataError("GEOADD requires places with lon, lat and name"
- " values")
- pieces = [name]
- if nx:
- pieces.append('NX')
- if xx:
- pieces.append('XX')
- if ch:
- pieces.append('CH')
- pieces.extend(values)
- return self.execute_command('GEOADD', *pieces)
-
- def geodist(self, name, place1, place2, unit=None):
- """
- Return the distance between ``place1`` and ``place2`` members of the
- ``name`` key.
- The units must be one of the following: m, km, mi, ft. By default
- meters are used.
- """
- pieces = [name, place1, place2]
- if unit and unit not in ('m', 'km', 'mi', 'ft'):
- raise DataError("GEODIST invalid unit")
- elif unit:
- pieces.append(unit)
- return self.execute_command('GEODIST', *pieces)
-
- def geohash(self, name, *values):
- """
- Return the geo hash string for each item of ``values`` members of
- the specified key identified by the ``name`` argument.
- """
- return self.execute_command('GEOHASH', name, *values)
-
- def geopos(self, name, *values):
- """
- Return the positions of each item of ``values`` as members of
- the specified key identified by the ``name`` argument. Each position
- is represented by the pairs lon and lat.
- """
- return self.execute_command('GEOPOS', name, *values)
-
- def georadius(self, name, longitude, latitude, radius, unit=None,
- withdist=False, withcoord=False, withhash=False, count=None,
- sort=None, store=None, store_dist=None, any=False):
- """
- Return the members of the specified key identified by the
- ``name`` argument which are within the borders of the area specified
- with the ``latitude`` and ``longitude`` location and the maximum
- distance from the center specified by the ``radius`` value.
-
- The units must be one of the following: m, km, mi, ft. Meters are used by default.
-
- ``withdist`` indicates to return the distances of each place.
-
- ``withcoord`` indicates to return the latitude and longitude of
- each place.
-
- ``withhash`` indicates to return the geohash string of each place.
-
- ``count`` indicates to return the number of elements up to N.
-
- ``sort`` indicates to return the places in a sorted way, ASC for
- nearest to farthest and DESC for farthest to nearest.
-
- ``store`` indicates to save the places names in a sorted set named
- with a specific key, each element of the destination sorted set is
- populated with the score got from the original geo sorted set.
-
- ``store_dist`` indicates to save the places names in a sorted set
- named with a specific key, instead of ``store`` the sorted set
- destination score is set with the distance.
- """
- return self._georadiusgeneric('GEORADIUS',
- name, longitude, latitude, radius,
- unit=unit, withdist=withdist,
- withcoord=withcoord, withhash=withhash,
- count=count, sort=sort, store=store,
- store_dist=store_dist, any=any)
-
- def georadiusbymember(self, name, member, radius, unit=None,
- withdist=False, withcoord=False, withhash=False,
- count=None, sort=None, store=None, store_dist=None,
- any=False):
- """
- This command is exactly like ``georadius`` with the sole difference
- that instead of taking, as the center of the area to query, a longitude
- and latitude value, it takes the name of a member already existing
- inside the geospatial index represented by the sorted set.
- """
- return self._georadiusgeneric('GEORADIUSBYMEMBER',
- name, member, radius, unit=unit,
- withdist=withdist, withcoord=withcoord,
- withhash=withhash, count=count,
- sort=sort, store=store,
- store_dist=store_dist, any=any)
-
- def _georadiusgeneric(self, command, *args, **kwargs):
- pieces = list(args)
- if kwargs['unit'] and kwargs['unit'] not in ('m', 'km', 'mi', 'ft'):
- raise DataError("GEORADIUS invalid unit")
- elif kwargs['unit']:
- pieces.append(kwargs['unit'])
- else:
- pieces.append('m',)
-
- if kwargs['any'] and kwargs['count'] is None:
- raise DataError("``any`` can't be provided without ``count``")
-
- for arg_name, byte_repr in (
- ('withdist', 'WITHDIST'),
- ('withcoord', 'WITHCOORD'),
- ('withhash', 'WITHHASH')):
- if kwargs[arg_name]:
- pieces.append(byte_repr)
-
- if kwargs['count'] is not None:
- pieces.extend(['COUNT', kwargs['count']])
- if kwargs['any']:
- pieces.append('ANY')
-
- if kwargs['sort']:
- if kwargs['sort'] == 'ASC':
- pieces.append('ASC')
- elif kwargs['sort'] == 'DESC':
- pieces.append('DESC')
- else:
- raise DataError("GEORADIUS invalid sort")
-
- if kwargs['store'] and kwargs['store_dist']:
- raise DataError("GEORADIUS store and store_dist cant be set"
- " together")
-
- if kwargs['store']:
- pieces.extend([b'STORE', kwargs['store']])
-
- if kwargs['store_dist']:
- pieces.extend([b'STOREDIST', kwargs['store_dist']])
-
- return self.execute_command(command, *pieces, **kwargs)
-
- def geosearch(self, name, member=None, longitude=None, latitude=None,
- unit='m', radius=None, width=None, height=None, sort=None,
- count=None, any=False, withcoord=False,
- withdist=False, withhash=False):
- """
- Return the members of specified key identified by the
- ``name`` argument, which are within the borders of the
- area specified by a given shape. This command extends the
- GEORADIUS command, so in addition to searching within circular
- areas, it supports searching within rectangular areas.
- This command should be used in place of the deprecated
- GEORADIUS and GEORADIUSBYMEMBER commands.
- ``member`` Use the position of the given existing
- member in the sorted set. Can't be given with ``longitude``
- and ``latitude``.
- ``longitude`` and ``latitude`` Use the position given by
- these coordinates. Can't be given with ``member``
- ``radius`` Similar to GEORADIUS, search inside circular
- area according to the given radius. Can't be given with
- ``height`` and ``width``.
- ``height`` and ``width`` Search inside an axis-aligned
- rectangle, determined by the given height and width.
- Can't be given with ``radius``
- ``unit`` must be one of the following : m, km, mi, ft.
- `m` for meters (the default value), `km` for kilometers,
- `mi` for miles and `ft` for feet.
- ``sort`` indicates to return the places in a sorted way,
- ASC for nearest to farthest and DESC for farthest to nearest.
- ``count`` limit the results to the first count matching items.
- ``any`` if set to True, the command will return as soon as
- enough matches are found. Can't be provided without ``count``
- ``withdist`` indicates to return the distances of each place.
- ``withcoord`` indicates to return the latitude and longitude of
- each place.
- ``withhash`` indicates to return the geohash string of each place.
- """
-
- return self._geosearchgeneric('GEOSEARCH',
- name, member=member, longitude=longitude,
- latitude=latitude, unit=unit,
- radius=radius, width=width,
- height=height, sort=sort, count=count,
- any=any, withcoord=withcoord,
- withdist=withdist, withhash=withhash,
- store=None, store_dist=None)
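- # Usage sketch (illustrative, assumes a hypothetical client ``r``):
- #   r.geoadd('shops', [34.78, 32.08, 'shop:1'])   # lon, lat, name triad
- #   r.geosearch('shops', longitude=34.78, latitude=32.08,
- #               radius=5, unit='km', withdist=True, sort='ASC')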
-
- def geosearchstore(self, dest, name, member=None, longitude=None,
- latitude=None, unit='m', radius=None, width=None,
- height=None, sort=None, count=None, any=False,
- storedist=False):
- """
- This command is like GEOSEARCH, but stores the result in
- ``dest``. By default, it stores the results in the destination
- sorted set with their geospatial information.
- If ``storedist`` is set to True, the command will store the
- items in a sorted set populated with their distance from the
- center of the circle or box, as a floating-point number.
- """
- return self._geosearchgeneric('GEOSEARCHSTORE',
- dest, name, member=member,
- longitude=longitude, latitude=latitude,
- unit=unit, radius=radius, width=width,
- height=height, sort=sort, count=count,
- any=any, withcoord=None,
- withdist=None, withhash=None,
- store=None, store_dist=storedist)
-
- def _geosearchgeneric(self, command, *args, **kwargs):
- pieces = list(args)
-
- # FROMMEMBER or FROMLONLAT
- if kwargs['member'] is None:
- if kwargs['longitude'] is None or kwargs['latitude'] is None:
- raise DataError("GEOSEARCH must have member or"
- " longitude and latitude")
- if kwargs['member']:
- if kwargs['longitude'] or kwargs['latitude']:
- raise DataError("GEOSEARCH member and longitude or latitude"
- " cant be set together")
- pieces.extend([b'FROMMEMBER', kwargs['member']])
- if kwargs['longitude'] and kwargs['latitude']:
- pieces.extend([b'FROMLONLAT',
- kwargs['longitude'], kwargs['latitude']])
-
- # BYRADIUS or BYBOX
- if kwargs['radius'] is None:
- if kwargs['width'] is None or kwargs['height'] is None:
- raise DataError("GEOSEARCH must have radius or"
- " width and height")
- if kwargs['unit'] is None:
- raise DataError("GEOSEARCH must have unit")
- if kwargs['unit'].lower() not in ('m', 'km', 'mi', 'ft'):
- raise DataError("GEOSEARCH invalid unit")
- if kwargs['radius']:
- if kwargs['width'] or kwargs['height']:
- raise DataError("GEOSEARCH radius and width or height"
- " cant be set together")
- pieces.extend([b'BYRADIUS', kwargs['radius'], kwargs['unit']])
- if kwargs['width'] and kwargs['height']:
- pieces.extend([b'BYBOX',
- kwargs['width'], kwargs['height'], kwargs['unit']])
-
- # sort
- if kwargs['sort']:
- if kwargs['sort'].upper() == 'ASC':
- pieces.append(b'ASC')
- elif kwargs['sort'].upper() == 'DESC':
- pieces.append(b'DESC')
- else:
- raise DataError("GEOSEARCH invalid sort")
-
- # count any
- if kwargs['count']:
- pieces.extend([b'COUNT', kwargs['count']])
- if kwargs['any']:
- pieces.append(b'ANY')
- elif kwargs['any']:
- raise DataError("GEOSEARCH ``any`` can't be provided "
- "without count")
-
- # other properties
- for arg_name, byte_repr in (
- ('withdist', b'WITHDIST'),
- ('withcoord', b'WITHCOORD'),
- ('withhash', b'WITHHASH'),
- ('store_dist', b'STOREDIST')):
- if kwargs[arg_name]:
- pieces.append(byte_repr)
-
- return self.execute_command(command, *pieces, **kwargs)
-
- # MODULE COMMANDS
- def module_load(self, path, *args):
- """
- Loads the module from ``path``.
- Passes all ``*args`` to the module, during loading.
- Raises ``ModuleError`` if a module is not found at ``path``.
- """
- return self.execute_command('MODULE LOAD', path, *args)
-
- def module_unload(self, name):
- """
- Unloads the module ``name``.
- Raises ``ModuleError`` if ``name`` is not in loaded modules.
- """
- return self.execute_command('MODULE UNLOAD', name)
-
- def module_list(self):
- """
- Returns a list of dictionaries containing the name and version of
- all loaded modules.
- """
- return self.execute_command('MODULE LIST')
-
- def command_info(self):
- raise NotImplementedError(
- "COMMAND INFO is intentionally not implemented in the client."
- )
-
- def command_count(self):
- return self.execute_command('COMMAND COUNT')
-
-
-class Script:
- "An executable Lua script object returned by ``register_script``"
-
- def __init__(self, registered_client, script):
- self.registered_client = registered_client
- self.script = script
- # Precalculate and store the SHA1 hex digest of the script.
-
- if isinstance(script, str):
- # We need the encoding from the client in order to generate an
- # accurate byte representation of the script
- encoder = registered_client.connection_pool.get_encoder()
- script = encoder.encode(script)
- self.sha = hashlib.sha1(script).hexdigest()
-
- def __call__(self, keys=[], args=[], client=None):
- "Execute the script, passing any required ``args``"
- if client is None:
- client = self.registered_client
- args = tuple(keys) + tuple(args)
- # make sure the Redis server knows about the script
- from redis.client import Pipeline
- if isinstance(client, Pipeline):
- # Make sure the pipeline can register the script before executing.
- client.scripts.add(self)
- try:
- return client.evalsha(self.sha, len(keys), *args)
- except NoScriptError:
- # Maybe the client is pointed to a different server than the client
- # that created this instance?
- # Overwrite the sha just in case there was a discrepancy.
- self.sha = client.script_load(self.script)
- return client.evalsha(self.sha, len(keys), *args)
-
-
-class BitFieldOperation:
- """
- Command builder for BITFIELD commands.
- """
- def __init__(self, client, key, default_overflow=None):
- self.client = client
- self.key = key
- self._default_overflow = default_overflow
- self.reset()
-
- def reset(self):
- """
- Reset the state of the instance to when it was constructed
- """
- self.operations = []
- self._last_overflow = 'WRAP'
- self.overflow(self._default_overflow or self._last_overflow)
-
- def overflow(self, overflow):
- """
- Update the overflow algorithm of successive INCRBY operations
- :param overflow: Overflow algorithm, one of WRAP, SAT, FAIL. See the
- Redis docs for descriptions of these algorithms.
- :returns: a :py:class:`BitFieldOperation` instance.
- """
- overflow = overflow.upper()
- if overflow != self._last_overflow:
- self._last_overflow = overflow
- self.operations.append(('OVERFLOW', overflow))
- return self
-
- def incrby(self, fmt, offset, increment, overflow=None):
- """
- Increment a bitfield by a given amount.
- :param fmt: format-string for the bitfield being updated, e.g. 'u8'
- for an unsigned 8-bit integer.
- :param offset: offset (in number of bits). If prefixed with a
- '#', this is an offset multiplier, e.g. given the arguments
- fmt='u8', offset='#2', the offset will be 16.
- :param int increment: value to increment the bitfield by.
- :param str overflow: overflow algorithm. Defaults to WRAP, but other
- acceptable values are SAT and FAIL. See the Redis docs for
- descriptions of these algorithms.
- :returns: a :py:class:`BitFieldOperation` instance.
- """
- if overflow is not None:
- self.overflow(overflow)
-
- self.operations.append(('INCRBY', fmt, offset, increment))
- return self
-
- def get(self, fmt, offset):
- """
- Get the value of a given bitfield.
- :param fmt: format-string for the bitfield being read, e.g. 'u8' for
- an unsigned 8-bit integer.
- :param offset: offset (in number of bits). If prefixed with a
- '#', this is an offset multiplier, e.g. given the arguments
- fmt='u8', offset='#2', the offset will be 16.
- :returns: a :py:class:`BitFieldOperation` instance.
- """
- self.operations.append(('GET', fmt, offset))
- return self
-
- def set(self, fmt, offset, value):
- """
- Set the value of a given bitfield.
- :param fmt: format-string for the bitfield being read, e.g. 'u8' for
- an unsigned 8-bit integer.
- :param offset: offset (in number of bits). If prefixed with a
- '#', this is an offset multiplier, e.g. given the arguments
- fmt='u8', offset='#2', the offset will be 16.
- :param int value: value to set at the given position.
- :returns: a :py:class:`BitFieldOperation` instance.
- """
- self.operations.append(('SET', fmt, offset, value))
- return self
-
- @property
- def command(self):
- cmd = ['BITFIELD', self.key]
- for ops in self.operations:
- cmd.extend(ops)
- return cmd
-
- def execute(self):
- """
- Execute the operation(s) in a single BITFIELD command. The return value
- is a list of values corresponding to each operation. If the client
- used to create this instance was a pipeline, the list of values
- will be present in the result of the pipeline's execute() call.
- """
- command = self.command
- self.reset()
- return self.client.execute_command(*command)
-
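-# A hedged end-to-end sketch of BitFieldOperation, assuming a redis.Redis()
-# client ``r`` and the key name 'example' (both illustrative). Operations
-# chain, and a single BITFIELD command flushes them all:
-#
-#     bf = r.bitfield('example', default_overflow='SAT')
-#     bf.set('u8', 0, 255).incrby('u8', 0, 10).get('u8', 0)
-#     bf.execute()   # e.g. [0, 255, 255] -- SAT clamps the increment at 255
-#
-# If ``r`` is a pipeline, execute() only queues the BITFIELD command and the
-# three values show up in the result of the pipeline's own execute() call.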
-
-class SentinelCommands:
- """
- A class containing the commands specific to Redis Sentinel. This class is
- to be used as a mixin.
- """
-
- def sentinel(self, *args):
- "Redis Sentinel's SENTINEL command."
- warnings.warn(
- DeprecationWarning('Use the individual sentinel_* methods'))
-
- def sentinel_get_master_addr_by_name(self, service_name):
- "Returns a (host, port) pair for the given ``service_name``"
- return self.execute_command('SENTINEL GET-MASTER-ADDR-BY-NAME',
- service_name)
-
- def sentinel_master(self, service_name):
- "Returns a dictionary containing the specified masters state."
- return self.execute_command('SENTINEL MASTER', service_name)
-
- def sentinel_masters(self):
- "Returns a list of dictionaries containing each master's state."
- return self.execute_command('SENTINEL MASTERS')
-
- def sentinel_monitor(self, name, ip, port, quorum):
- "Add a new master to Sentinel to be monitored"
- return self.execute_command('SENTINEL MONITOR', name, ip, port, quorum)
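-
- # Hedged example of registering a master and reading its state back; the
- # service name and address below are illustrative, and ``sentinel`` is
- # assumed to be a client connected to a Sentinel instance:
- #
- #     sentinel.sentinel_monitor('mymaster', '127.0.0.1', 6379, 2)
- #     sentinel.sentinel_master('mymaster')   # dict with this master's state
- #     sentinel.sentinel_get_master_addr_by_name('mymaster')  # ('127.0.0.1', 6379)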
-
- def sentinel_remove(self, name):
- "Remove a master from Sentinel's monitoring"
- return self.execute_command('SENTINEL REMOVE', name)
-
- def sentinel_sentinels(self, service_name):
- "Returns a list of sentinels for ``service_name``"
- return self.execute_command('SENTINEL SENTINELS', service_name)
-
- def sentinel_set(self, name, option, value):
- "Set Sentinel monitoring parameters for a given master"
- return self.execute_command('SENTINEL SET', name, option, value)
-
- def sentinel_slaves(self, service_name):
- "Returns a list of slaves for ``service_name``"
- return self.execute_command('SENTINEL SLAVES', service_name)
-
- def sentinel_reset(self, pattern):
- """
- This command will reset all the masters with a matching name.
- The pattern argument is a glob-style pattern.
-
- The reset process clears any previous state in a master (including a
- failover in progress), and removes every slave and sentinel already
- discovered and associated with the master.
- """
- return self.execute_command('SENTINEL RESET', pattern, once=True)
-
- def sentinel_failover(self, new_master_name):
- """
- Force a failover as if the master were not reachable, and without
- asking other Sentinels for agreement (however, a new version of the
- configuration will be published so that the other Sentinels will
- update their configurations).
- """
- return self.execute_command('SENTINEL FAILOVER', new_master_name)
-
- def sentinel_ckquorum(self, new_master_name):
- """
- Check if the current Sentinel configuration is able to reach the
- quorum needed to fail over a master, and the majority needed to
- authorize the failover.
-
- This command should be used in monitoring systems to check if a
- Sentinel deployment is ok.
- """
- return self.execute_command('SENTINEL CKQUORUM',
- new_master_name,
- once=True)
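-
- # A small monitoring-check sketch built on sentinel_ckquorum, as suggested
- # above. The service name is illustrative, and catching ResponseError is an
- # assumption: Sentinel reports an unreachable quorum as a command error.
- #
- #     from redis.exceptions import ResponseError
- #     try:
- #         sentinel.sentinel_ckquorum('mymaster')
- #         healthy = True
- #     except ResponseError:
- #         healthy = False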
-
- def sentinel_flushconfig(self):
- """
- Force Sentinel to rewrite its configuration on disk, including the
- current Sentinel state.
-
- Normally Sentinel rewrites the configuration every time something
- changes in its state (in the context of the subset of the state which
- is persisted on disk across restarts).
- However, sometimes the configuration file can be lost because of
- operation errors, disk failures, package upgrade scripts or
- configuration managers. In those cases a way to force Sentinel to
- rewrite the configuration file is handy.
-
- This command works even if the previous configuration file is
- completely missing.
- """
- return self.execute_command('SENTINEL FLUSHCONFIG')