|  |  |  |
|---|---|---|
| author | Chayim <chayim@users.noreply.github.com> | 2021-12-02 18:34:36 +0200 |
| committer | GitHub <noreply@github.com> | 2021-12-02 18:34:36 +0200 |
| commit | f5d5610f0b77468bc84c3c9764a5d86ee7883410 (patch) |  |
| tree | 1c985280231667e1ad9cede848915b50a02940d7 /redis/commands |  |
| parent | c0b38584dc48f821606150d7965dca88c402192b (diff) |  |
| parent | d4a9825a72e1b7715d79ce8134e678d9ef537dce (diff) |  |
| download | redis-py-f5d5610f0b77468bc84c3c9764a5d86ee7883410.tar.gz |  |
Merge branch 'master' into ROLE
Diffstat (limited to 'redis/commands')
32 files changed, 3239 insertions, 1681 deletions
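
The bulk of this merge adds RedisBloom module command classes (BF, CF, CMS, TOPK, TDIGEST) under redis/commands/bf, as shown in the diff below. As a rough orientation before the raw diff, here is a minimal, hypothetical usage sketch of the Bloom-filter methods introduced in BFCommands; it assumes a redis-py client with this merge applied, a Redis server running the RedisBloom module, and that the `r.bf()` accessor is exposed via RedisModuleCommands (imported in redis/commands/__init__.py but not shown in this diff).

```python
# Sketch only (not part of the diff): exercising the new BFCommands methods
# from redis/commands/bf/commands.py. Assumes RedisBloom is loaded on the
# server and that redis.Redis exposes the module client via r.bf().
import redis

r = redis.Redis(host="localhost", port=6379)
bf = r.bf()  # assumed accessor returning the BFBloom wrapper shown below

# BF.RESERVE: ~1% false-positive rate, initial capacity of 1000 items
bf.create("bf:users", errorRate=0.01, capacity=1000)

bf.add("bf:users", "alice")            # BF.ADD
bf.madd("bf:users", "bob", "carol")    # BF.MADD

print(bf.exists("bf:users", "alice"))    # 1 -> probably present
print(bf.exists("bf:users", "mallory"))  # 0 -> definitely absent

info = bf.info("bf:users")               # parsed into BFInfo (see bf/info.py)
print(info.capacity, info.insertedNum)
```

The other module classes in this diff (CFBloom, CMSBloom, TOPKBloom, TDigestBloom) follow the same pattern: each wires its command constants to response callbacks and delegates to client.execute_command.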
diff --git a/redis/commands/__init__.py b/redis/commands/__init__.py index a4728d0..07fa7f1 100644 --- a/redis/commands/__init__.py +++ b/redis/commands/__init__.py @@ -1,4 +1,4 @@ -from .cluster import ClusterCommands +from .cluster import RedisClusterCommands from .core import CoreCommands from .helpers import list_or_args from .parser import CommandsParser @@ -6,10 +6,10 @@ from .redismodules import RedisModuleCommands from .sentinel import SentinelCommands __all__ = [ - 'ClusterCommands', - 'CommandsParser', - 'CoreCommands', - 'list_or_args', - 'RedisModuleCommands', - 'SentinelCommands' + "RedisClusterCommands", + "CommandsParser", + "CoreCommands", + "list_or_args", + "RedisModuleCommands", + "SentinelCommands", ] diff --git a/redis/commands/bf/__init__.py b/redis/commands/bf/__init__.py new file mode 100644 index 0000000..f34e11d --- /dev/null +++ b/redis/commands/bf/__init__.py @@ -0,0 +1,204 @@ +from redis.client import bool_ok + +from ..helpers import parse_to_list +from .commands import * # noqa +from .info import BFInfo, CFInfo, CMSInfo, TDigestInfo, TopKInfo + + +class AbstractBloom(object): + """ + The client allows to interact with RedisBloom and use all of + it's functionality. + + - BF for Bloom Filter + - CF for Cuckoo Filter + - CMS for Count-Min Sketch + - TOPK for TopK Data Structure + - TDIGEST for estimate rank statistics + """ + + @staticmethod + def appendItems(params, items): + """Append ITEMS to params.""" + params.extend(["ITEMS"]) + params += items + + @staticmethod + def appendError(params, error): + """Append ERROR to params.""" + if error is not None: + params.extend(["ERROR", error]) + + @staticmethod + def appendCapacity(params, capacity): + """Append CAPACITY to params.""" + if capacity is not None: + params.extend(["CAPACITY", capacity]) + + @staticmethod + def appendExpansion(params, expansion): + """Append EXPANSION to params.""" + if expansion is not None: + params.extend(["EXPANSION", expansion]) + + @staticmethod + def appendNoScale(params, noScale): + """Append NONSCALING tag to params.""" + if noScale is not None: + params.extend(["NONSCALING"]) + + @staticmethod + def appendWeights(params, weights): + """Append WEIGHTS to params.""" + if len(weights) > 0: + params.append("WEIGHTS") + params += weights + + @staticmethod + def appendNoCreate(params, noCreate): + """Append NOCREATE tag to params.""" + if noCreate is not None: + params.extend(["NOCREATE"]) + + @staticmethod + def appendItemsAndIncrements(params, items, increments): + """Append pairs of items and increments to params.""" + for i in range(len(items)): + params.append(items[i]) + params.append(increments[i]) + + @staticmethod + def appendValuesAndWeights(params, items, weights): + """Append pairs of items and weights to params.""" + for i in range(len(items)): + params.append(items[i]) + params.append(weights[i]) + + @staticmethod + def appendMaxIterations(params, max_iterations): + """Append MAXITERATIONS to params.""" + if max_iterations is not None: + params.extend(["MAXITERATIONS", max_iterations]) + + @staticmethod + def appendBucketSize(params, bucket_size): + """Append BUCKETSIZE to params.""" + if bucket_size is not None: + params.extend(["BUCKETSIZE", bucket_size]) + + +class CMSBloom(CMSCommands, AbstractBloom): + def __init__(self, client, **kwargs): + """Create a new RedisBloom client.""" + # Set the module commands' callbacks + MODULE_CALLBACKS = { + CMS_INITBYDIM: bool_ok, + CMS_INITBYPROB: bool_ok, + # CMS_INCRBY: spaceHolder, + # CMS_QUERY: spaceHolder, + CMS_MERGE: 
bool_ok, + CMS_INFO: CMSInfo, + } + + self.client = client + self.commandmixin = CMSCommands + self.execute_command = client.execute_command + + for k, v in MODULE_CALLBACKS.items(): + self.client.set_response_callback(k, v) + + +class TOPKBloom(TOPKCommands, AbstractBloom): + def __init__(self, client, **kwargs): + """Create a new RedisBloom client.""" + # Set the module commands' callbacks + MODULE_CALLBACKS = { + TOPK_RESERVE: bool_ok, + TOPK_ADD: parse_to_list, + TOPK_INCRBY: parse_to_list, + # TOPK_QUERY: spaceHolder, + # TOPK_COUNT: spaceHolder, + TOPK_LIST: parse_to_list, + TOPK_INFO: TopKInfo, + } + + self.client = client + self.commandmixin = TOPKCommands + self.execute_command = client.execute_command + + for k, v in MODULE_CALLBACKS.items(): + self.client.set_response_callback(k, v) + + +class CFBloom(CFCommands, AbstractBloom): + def __init__(self, client, **kwargs): + """Create a new RedisBloom client.""" + # Set the module commands' callbacks + MODULE_CALLBACKS = { + CF_RESERVE: bool_ok, + # CF_ADD: spaceHolder, + # CF_ADDNX: spaceHolder, + # CF_INSERT: spaceHolder, + # CF_INSERTNX: spaceHolder, + # CF_EXISTS: spaceHolder, + # CF_DEL: spaceHolder, + # CF_COUNT: spaceHolder, + # CF_SCANDUMP: spaceHolder, + # CF_LOADCHUNK: spaceHolder, + CF_INFO: CFInfo, + } + + self.client = client + self.commandmixin = CFCommands + self.execute_command = client.execute_command + + for k, v in MODULE_CALLBACKS.items(): + self.client.set_response_callback(k, v) + + +class TDigestBloom(TDigestCommands, AbstractBloom): + def __init__(self, client, **kwargs): + """Create a new RedisBloom client.""" + # Set the module commands' callbacks + MODULE_CALLBACKS = { + TDIGEST_CREATE: bool_ok, + # TDIGEST_RESET: bool_ok, + # TDIGEST_ADD: spaceHolder, + # TDIGEST_MERGE: spaceHolder, + TDIGEST_CDF: float, + TDIGEST_QUANTILE: float, + TDIGEST_MIN: float, + TDIGEST_MAX: float, + TDIGEST_INFO: TDigestInfo, + } + + self.client = client + self.commandmixin = TDigestCommands + self.execute_command = client.execute_command + + for k, v in MODULE_CALLBACKS.items(): + self.client.set_response_callback(k, v) + + +class BFBloom(BFCommands, AbstractBloom): + def __init__(self, client, **kwargs): + """Create a new RedisBloom client.""" + # Set the module commands' callbacks + MODULE_CALLBACKS = { + BF_RESERVE: bool_ok, + # BF_ADD: spaceHolder, + # BF_MADD: spaceHolder, + # BF_INSERT: spaceHolder, + # BF_EXISTS: spaceHolder, + # BF_MEXISTS: spaceHolder, + # BF_SCANDUMP: spaceHolder, + # BF_LOADCHUNK: spaceHolder, + BF_INFO: BFInfo, + } + + self.client = client + self.commandmixin = BFCommands + self.execute_command = client.execute_command + + for k, v in MODULE_CALLBACKS.items(): + self.client.set_response_callback(k, v) diff --git a/redis/commands/bf/commands.py b/redis/commands/bf/commands.py new file mode 100644 index 0000000..3c8bf7f --- /dev/null +++ b/redis/commands/bf/commands.py @@ -0,0 +1,494 @@ +from redis.client import NEVER_DECODE +from redis.exceptions import ModuleError +from redis.utils import HIREDIS_AVAILABLE + +BF_RESERVE = "BF.RESERVE" +BF_ADD = "BF.ADD" +BF_MADD = "BF.MADD" +BF_INSERT = "BF.INSERT" +BF_EXISTS = "BF.EXISTS" +BF_MEXISTS = "BF.MEXISTS" +BF_SCANDUMP = "BF.SCANDUMP" +BF_LOADCHUNK = "BF.LOADCHUNK" +BF_INFO = "BF.INFO" + +CF_RESERVE = "CF.RESERVE" +CF_ADD = "CF.ADD" +CF_ADDNX = "CF.ADDNX" +CF_INSERT = "CF.INSERT" +CF_INSERTNX = "CF.INSERTNX" +CF_EXISTS = "CF.EXISTS" +CF_DEL = "CF.DEL" +CF_COUNT = "CF.COUNT" +CF_SCANDUMP = "CF.SCANDUMP" +CF_LOADCHUNK = "CF.LOADCHUNK" +CF_INFO = "CF.INFO" + 
+CMS_INITBYDIM = "CMS.INITBYDIM" +CMS_INITBYPROB = "CMS.INITBYPROB" +CMS_INCRBY = "CMS.INCRBY" +CMS_QUERY = "CMS.QUERY" +CMS_MERGE = "CMS.MERGE" +CMS_INFO = "CMS.INFO" + +TOPK_RESERVE = "TOPK.RESERVE" +TOPK_ADD = "TOPK.ADD" +TOPK_INCRBY = "TOPK.INCRBY" +TOPK_QUERY = "TOPK.QUERY" +TOPK_COUNT = "TOPK.COUNT" +TOPK_LIST = "TOPK.LIST" +TOPK_INFO = "TOPK.INFO" + +TDIGEST_CREATE = "TDIGEST.CREATE" +TDIGEST_RESET = "TDIGEST.RESET" +TDIGEST_ADD = "TDIGEST.ADD" +TDIGEST_MERGE = "TDIGEST.MERGE" +TDIGEST_CDF = "TDIGEST.CDF" +TDIGEST_QUANTILE = "TDIGEST.QUANTILE" +TDIGEST_MIN = "TDIGEST.MIN" +TDIGEST_MAX = "TDIGEST.MAX" +TDIGEST_INFO = "TDIGEST.INFO" + + +class BFCommands: + """RedisBloom commands.""" + + # region Bloom Filter Functions + def create(self, key, errorRate, capacity, expansion=None, noScale=None): + """ + Create a new Bloom Filter `key` with desired probability of false positives + `errorRate` expected entries to be inserted as `capacity`. + Default expansion value is 2. By default, filter is auto-scaling. + For more information see `BF.RESERVE <https://oss.redis.com/redisbloom/master/Bloom_Commands/#bfreserve>`_. + """ # noqa + params = [key, errorRate, capacity] + self.appendExpansion(params, expansion) + self.appendNoScale(params, noScale) + return self.execute_command(BF_RESERVE, *params) + + def add(self, key, item): + """ + Add to a Bloom Filter `key` an `item`. + For more information see `BF.ADD <https://oss.redis.com/redisbloom/master/Bloom_Commands/#bfadd>`_. + """ # noqa + params = [key, item] + return self.execute_command(BF_ADD, *params) + + def madd(self, key, *items): + """ + Add to a Bloom Filter `key` multiple `items`. + For more information see `BF.MADD <https://oss.redis.com/redisbloom/master/Bloom_Commands/#bfmadd>`_. + """ # noqa + params = [key] + params += items + return self.execute_command(BF_MADD, *params) + + def insert( + self, + key, + items, + capacity=None, + error=None, + noCreate=None, + expansion=None, + noScale=None, + ): + """ + Add to a Bloom Filter `key` multiple `items`. + + If `nocreate` remain `None` and `key` does not exist, a new Bloom Filter + `key` will be created with desired probability of false positives `errorRate` + and expected entries to be inserted as `size`. + For more information see `BF.INSERT <https://oss.redis.com/redisbloom/master/Bloom_Commands/#bfinsert>`_. + """ # noqa + params = [key] + self.appendCapacity(params, capacity) + self.appendError(params, error) + self.appendExpansion(params, expansion) + self.appendNoCreate(params, noCreate) + self.appendNoScale(params, noScale) + self.appendItems(params, items) + + return self.execute_command(BF_INSERT, *params) + + def exists(self, key, item): + """ + Check whether an `item` exists in Bloom Filter `key`. + For more information see `BF.EXISTS <https://oss.redis.com/redisbloom/master/Bloom_Commands/#bfexists>`_. + """ # noqa + params = [key, item] + return self.execute_command(BF_EXISTS, *params) + + def mexists(self, key, *items): + """ + Check whether `items` exist in Bloom Filter `key`. + For more information see `BF.MEXISTS <https://oss.redis.com/redisbloom/master/Bloom_Commands/#bfmexists>`_. + """ # noqa + params = [key] + params += items + return self.execute_command(BF_MEXISTS, *params) + + def scandump(self, key, iter): + """ + Begin an incremental save of the bloom filter `key`. + + This is useful for large bloom filters which cannot fit into the normal SAVE and RESTORE model. + The first time this command is called, the value of `iter` should be 0. 
+ This command will return successive (iter, data) pairs until (0, NULL) to indicate completion. + For more information see `BF.SCANDUMP <https://oss.redis.com/redisbloom/master/Bloom_Commands/#bfscandump>`_. + """ # noqa + if HIREDIS_AVAILABLE: + raise ModuleError("This command cannot be used when hiredis is available.") + + params = [key, iter] + options = {} + options[NEVER_DECODE] = [] + return self.execute_command(BF_SCANDUMP, *params, **options) + + def loadchunk(self, key, iter, data): + """ + Restore a filter previously saved using SCANDUMP. + + See the SCANDUMP command for example usage. + This command will overwrite any bloom filter stored under key. + Ensure that the bloom filter will not be modified between invocations. + For more information see `BF.LOADCHUNK <https://oss.redis.com/redisbloom/master/Bloom_Commands/#bfloadchunk>`_. + """ # noqa + params = [key, iter, data] + return self.execute_command(BF_LOADCHUNK, *params) + + def info(self, key): + """ + Return capacity, size, number of filters, number of items inserted, and expansion rate. + For more information see `BF.INFO <https://oss.redis.com/redisbloom/master/Bloom_Commands/#bfinfo>`_. + """ # noqa + return self.execute_command(BF_INFO, key) + + +class CFCommands: + + # region Cuckoo Filter Functions + def create( + self, key, capacity, expansion=None, bucket_size=None, max_iterations=None + ): + """ + Create a new Cuckoo Filter `key` an initial `capacity` items. + For more information see `CF.RESERVE <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfreserve>`_. + """ # noqa + params = [key, capacity] + self.appendExpansion(params, expansion) + self.appendBucketSize(params, bucket_size) + self.appendMaxIterations(params, max_iterations) + return self.execute_command(CF_RESERVE, *params) + + def add(self, key, item): + """ + Add an `item` to a Cuckoo Filter `key`. + For more information see `CF.ADD <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfadd>`_. + """ # noqa + params = [key, item] + return self.execute_command(CF_ADD, *params) + + def addnx(self, key, item): + """ + Add an `item` to a Cuckoo Filter `key` only if item does not yet exist. + Command might be slower that `add`. + For more information see `CF.ADDNX <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfaddnx>`_. + """ # noqa + params = [key, item] + return self.execute_command(CF_ADDNX, *params) + + def insert(self, key, items, capacity=None, nocreate=None): + """ + Add multiple `items` to a Cuckoo Filter `key`, allowing the filter + to be created with a custom `capacity` if it does not yet exist. + `items` must be provided as a list. + For more information see `CF.INSERT <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfinsert>`_. + """ # noqa + params = [key] + self.appendCapacity(params, capacity) + self.appendNoCreate(params, nocreate) + self.appendItems(params, items) + return self.execute_command(CF_INSERT, *params) + + def insertnx(self, key, items, capacity=None, nocreate=None): + """ + Add multiple `items` to a Cuckoo Filter `key` only if they do not exist yet, + allowing the filter to be created with a custom `capacity` if it does not yet exist. + `items` must be provided as a list. + For more information see `CF.INSERTNX <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfinsertnx>`_. 
+ """ # noqa + params = [key] + self.appendCapacity(params, capacity) + self.appendNoCreate(params, nocreate) + self.appendItems(params, items) + return self.execute_command(CF_INSERTNX, *params) + + def exists(self, key, item): + """ + Check whether an `item` exists in Cuckoo Filter `key`. + For more information see `CF.EXISTS <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfexists>`_. + """ # noqa + params = [key, item] + return self.execute_command(CF_EXISTS, *params) + + def delete(self, key, item): + """ + Delete `item` from `key`. + For more information see `CF.DEL <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfdel>`_. + """ # noqa + params = [key, item] + return self.execute_command(CF_DEL, *params) + + def count(self, key, item): + """ + Return the number of times an `item` may be in the `key`. + For more information see `CF.COUNT <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfcount>`_. + """ # noqa + params = [key, item] + return self.execute_command(CF_COUNT, *params) + + def scandump(self, key, iter): + """ + Begin an incremental save of the Cuckoo filter `key`. + + This is useful for large Cuckoo filters which cannot fit into the normal + SAVE and RESTORE model. + The first time this command is called, the value of `iter` should be 0. + This command will return successive (iter, data) pairs until + (0, NULL) to indicate completion. + For more information see `CF.SCANDUMP <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfscandump>`_. + """ # noqa + params = [key, iter] + return self.execute_command(CF_SCANDUMP, *params) + + def loadchunk(self, key, iter, data): + """ + Restore a filter previously saved using SCANDUMP. See the SCANDUMP command for example usage. + + This command will overwrite any Cuckoo filter stored under key. + Ensure that the Cuckoo filter will not be modified between invocations. + For more information see `CF.LOADCHUNK <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfloadchunk>`_. + """ # noqa + params = [key, iter, data] + return self.execute_command(CF_LOADCHUNK, *params) + + def info(self, key): + """ + Return size, number of buckets, number of filter, number of items inserted, + number of items deleted, bucket size, expansion rate, and max iteration. + For more information see `CF.INFO <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfinfo>`_. + """ # noqa + return self.execute_command(CF_INFO, key) + + +class TOPKCommands: + def reserve(self, key, k, width, depth, decay): + """ + Create a new Top-K Filter `key` with desired probability of false + positives `errorRate` expected entries to be inserted as `size`. + For more information see `TOPK.RESERVE <https://oss.redis.com/redisbloom/master/TopK_Commands/#topkreserve>`_. + """ # noqa + params = [key, k, width, depth, decay] + return self.execute_command(TOPK_RESERVE, *params) + + def add(self, key, *items): + """ + Add one `item` or more to a Top-K Filter `key`. + For more information see `TOPK.ADD <https://oss.redis.com/redisbloom/master/TopK_Commands/#topkadd>`_. + """ # noqa + params = [key] + params += items + return self.execute_command(TOPK_ADD, *params) + + def incrby(self, key, items, increments): + """ + Add/increase `items` to a Top-K Sketch `key` by ''increments''. + Both `items` and `increments` are lists. + For more information see `TOPK.INCRBY <https://oss.redis.com/redisbloom/master/TopK_Commands/#topkincrby>`_. 
+ + Example: + + >>> topkincrby('A', ['foo'], [1]) + """ # noqa + params = [key] + self.appendItemsAndIncrements(params, items, increments) + return self.execute_command(TOPK_INCRBY, *params) + + def query(self, key, *items): + """ + Check whether one `item` or more is a Top-K item at `key`. + For more information see `TOPK.QUERY <https://oss.redis.com/redisbloom/master/TopK_Commands/#topkquery>`_. + """ # noqa + params = [key] + params += items + return self.execute_command(TOPK_QUERY, *params) + + def count(self, key, *items): + """ + Return count for one `item` or more from `key`. + For more information see `TOPK.COUNT <https://oss.redis.com/redisbloom/master/TopK_Commands/#topkcount>`_. + """ # noqa + params = [key] + params += items + return self.execute_command(TOPK_COUNT, *params) + + def list(self, key, withcount=False): + """ + Return full list of items in Top-K list of `key`. + If `withcount` set to True, return full list of items + with probabilistic count in Top-K list of `key`. + For more information see `TOPK.LIST <https://oss.redis.com/redisbloom/master/TopK_Commands/#topklist>`_. + """ # noqa + params = [key] + if withcount: + params.append("WITHCOUNT") + return self.execute_command(TOPK_LIST, *params) + + def info(self, key): + """ + Return k, width, depth and decay values of `key`. + For more information see `TOPK.INFO <https://oss.redis.com/redisbloom/master/TopK_Commands/#topkinfo>`_. + """ # noqa + return self.execute_command(TOPK_INFO, key) + + +class TDigestCommands: + def create(self, key, compression): + """ + Allocate the memory and initialize the t-digest. + For more information see `TDIGEST.CREATE <https://oss.redis.com/redisbloom/master/TDigest_Commands/#tdigestcreate>`_. + """ # noqa + params = [key, compression] + return self.execute_command(TDIGEST_CREATE, *params) + + def reset(self, key): + """ + Reset the sketch `key` to zero - empty out the sketch and re-initialize it. + For more information see `TDIGEST.RESET <https://oss.redis.com/redisbloom/master/TDigest_Commands/#tdigestreset>`_. + """ # noqa + return self.execute_command(TDIGEST_RESET, key) + + def add(self, key, values, weights): + """ + Add one or more samples (value with weight) to a sketch `key`. + Both `values` and `weights` are lists. + For more information see `TDIGEST.ADD <https://oss.redis.com/redisbloom/master/TDigest_Commands/#tdigestadd>`_. + + Example: + + >>> tdigestadd('A', [1500.0], [1.0]) + """ # noqa + params = [key] + self.appendValuesAndWeights(params, values, weights) + return self.execute_command(TDIGEST_ADD, *params) + + def merge(self, toKey, fromKey): + """ + Merge all of the values from 'fromKey' to 'toKey' sketch. + For more information see `TDIGEST.MERGE <https://oss.redis.com/redisbloom/master/TDigest_Commands/#tdigestmerge>`_. + """ # noqa + params = [toKey, fromKey] + return self.execute_command(TDIGEST_MERGE, *params) + + def min(self, key): + """ + Return minimum value from the sketch `key`. Will return DBL_MAX if the sketch is empty. + For more information see `TDIGEST.MIN <https://oss.redis.com/redisbloom/master/TDigest_Commands/#tdigestmin>`_. + """ # noqa + return self.execute_command(TDIGEST_MIN, key) + + def max(self, key): + """ + Return maximum value from the sketch `key`. Will return DBL_MIN if the sketch is empty. + For more information see `TDIGEST.MAX <https://oss.redis.com/redisbloom/master/TDigest_Commands/#tdigestmax>`_. 
+ """ # noqa + return self.execute_command(TDIGEST_MAX, key) + + def quantile(self, key, quantile): + """ + Return double value estimate of the cutoff such that a specified fraction of the data + added to this TDigest would be less than or equal to the cutoff. + For more information see `TDIGEST.QUANTILE <https://oss.redis.com/redisbloom/master/TDigest_Commands/#tdigestquantile>`_. + """ # noqa + params = [key, quantile] + return self.execute_command(TDIGEST_QUANTILE, *params) + + def cdf(self, key, value): + """ + Return double fraction of all points added which are <= value. + For more information see `TDIGEST.CDF <https://oss.redis.com/redisbloom/master/TDigest_Commands/#tdigestcdf>`_. + """ # noqa + params = [key, value] + return self.execute_command(TDIGEST_CDF, *params) + + def info(self, key): + """ + Return Compression, Capacity, Merged Nodes, Unmerged Nodes, Merged Weight, Unmerged Weight + and Total Compressions. + For more information see `TDIGEST.INFO <https://oss.redis.com/redisbloom/master/TDigest_Commands/#tdigestinfo>`_. + """ # noqa + return self.execute_command(TDIGEST_INFO, key) + + +class CMSCommands: + + # region Count-Min Sketch Functions + def initbydim(self, key, width, depth): + """ + Initialize a Count-Min Sketch `key` to dimensions (`width`, `depth`) specified by user. + For more information see `CMS.INITBYDIM <https://oss.redis.com/redisbloom/master/CountMinSketch_Commands/#cmsinitbydim>`_. + """ # noqa + params = [key, width, depth] + return self.execute_command(CMS_INITBYDIM, *params) + + def initbyprob(self, key, error, probability): + """ + Initialize a Count-Min Sketch `key` to characteristics (`error`, `probability`) specified by user. + For more information see `CMS.INITBYPROB <https://oss.redis.com/redisbloom/master/CountMinSketch_Commands/#cmsinitbyprob>`_. + """ # noqa + params = [key, error, probability] + return self.execute_command(CMS_INITBYPROB, *params) + + def incrby(self, key, items, increments): + """ + Add/increase `items` to a Count-Min Sketch `key` by ''increments''. + Both `items` and `increments` are lists. + For more information see `CMS.INCRBY <https://oss.redis.com/redisbloom/master/CountMinSketch_Commands/#cmsincrby>`_. + + Example: + + >>> cmsincrby('A', ['foo'], [1]) + """ # noqa + params = [key] + self.appendItemsAndIncrements(params, items, increments) + return self.execute_command(CMS_INCRBY, *params) + + def query(self, key, *items): + """ + Return count for an `item` from `key`. Multiple items can be queried with one call. + For more information see `CMS.QUERY <https://oss.redis.com/redisbloom/master/CountMinSketch_Commands/#cmsquery>`_. + """ # noqa + params = [key] + params += items + return self.execute_command(CMS_QUERY, *params) + + def merge(self, destKey, numKeys, srcKeys, weights=[]): + """ + Merge `numKeys` of sketches into `destKey`. Sketches specified in `srcKeys`. + All sketches must have identical width and depth. + `Weights` can be used to multiply certain sketches. Default weight is 1. + Both `srcKeys` and `weights` are lists. + For more information see `CMS.MERGE <https://oss.redis.com/redisbloom/master/CountMinSketch_Commands/#cmsmerge>`_. + """ # noqa + params = [destKey, numKeys] + params += srcKeys + self.appendWeights(params, weights) + return self.execute_command(CMS_MERGE, *params) + + def info(self, key): + """ + Return width, depth and total count of the sketch. + For more information see `CMS.INFO <https://oss.redis.com/redisbloom/master/CountMinSketch_Commands/#cmsinfo>`_. 
+ """ # noqa + return self.execute_command(CMS_INFO, key) diff --git a/redis/commands/bf/info.py b/redis/commands/bf/info.py new file mode 100644 index 0000000..24c5419 --- /dev/null +++ b/redis/commands/bf/info.py @@ -0,0 +1,85 @@ +from ..helpers import nativestr + + +class BFInfo(object): + capacity = None + size = None + filterNum = None + insertedNum = None + expansionRate = None + + def __init__(self, args): + response = dict(zip(map(nativestr, args[::2]), args[1::2])) + self.capacity = response["Capacity"] + self.size = response["Size"] + self.filterNum = response["Number of filters"] + self.insertedNum = response["Number of items inserted"] + self.expansionRate = response["Expansion rate"] + + +class CFInfo(object): + size = None + bucketNum = None + filterNum = None + insertedNum = None + deletedNum = None + bucketSize = None + expansionRate = None + maxIteration = None + + def __init__(self, args): + response = dict(zip(map(nativestr, args[::2]), args[1::2])) + self.size = response["Size"] + self.bucketNum = response["Number of buckets"] + self.filterNum = response["Number of filters"] + self.insertedNum = response["Number of items inserted"] + self.deletedNum = response["Number of items deleted"] + self.bucketSize = response["Bucket size"] + self.expansionRate = response["Expansion rate"] + self.maxIteration = response["Max iterations"] + + +class CMSInfo(object): + width = None + depth = None + count = None + + def __init__(self, args): + response = dict(zip(map(nativestr, args[::2]), args[1::2])) + self.width = response["width"] + self.depth = response["depth"] + self.count = response["count"] + + +class TopKInfo(object): + k = None + width = None + depth = None + decay = None + + def __init__(self, args): + response = dict(zip(map(nativestr, args[::2]), args[1::2])) + self.k = response["k"] + self.width = response["width"] + self.depth = response["depth"] + self.decay = response["decay"] + + +class TDigestInfo(object): + compression = None + capacity = None + mergedNodes = None + unmergedNodes = None + mergedWeight = None + unmergedWeight = None + totalCompressions = None + + def __init__(self, args): + response = dict(zip(map(nativestr, args[::2]), args[1::2])) + self.compression = response["Compression"] + self.capacity = response["Capacity"] + self.mergedNodes = response["Merged nodes"] + self.unmergedNodes = response["Unmerged nodes"] + self.mergedWeight = response["Merged weight"] + self.unmergedWeight = response["Unmerged weight"] + self.totalCompressions = response["Total compressions"] diff --git a/redis/commands/cluster.py b/redis/commands/cluster.py index e6b0a08..5d0e804 100644 --- a/redis/commands/cluster.py +++ b/redis/commands/cluster.py @@ -1,10 +1,7 @@ -from redis.exceptions import ( - ConnectionError, - DataError, - RedisError, -) from redis.crc import key_slot -from .core import DataAccessCommands +from redis.exceptions import RedisClusterException, RedisError + +from .core import ACLCommands, DataAccessCommands, ManagementCommands, PubSubCommands from .helpers import list_or_args @@ -36,6 +33,7 @@ class ClusterMultiKeyCommands: """ from redis.client import EMPTY_RESPONSE + options = {} if not args: options[EMPTY_RESPONSE] = [] @@ -50,8 +48,7 @@ class ClusterMultiKeyCommands: # We must make sure that the keys are returned in order all_results = {} for slot_keys in slots_to_keys.values(): - slot_values = self.execute_command( - 'MGET', *slot_keys, **options) + slot_values = self.execute_command("MGET", *slot_keys, **options) slot_results = dict(zip(slot_keys, 
slot_values)) all_results.update(slot_results) @@ -83,7 +80,7 @@ class ClusterMultiKeyCommands: # the results (one result per slot) res = [] for pairs in slots_to_pairs.values(): - res.append(self.execute_command('MSET', *pairs)) + res.append(self.execute_command("MSET", *pairs)) return res @@ -108,7 +105,7 @@ class ClusterMultiKeyCommands: whole cluster. The keys are first split up into slots and then an EXISTS command is sent for every slot """ - return self._split_command_across_slots('EXISTS', *keys) + return self._split_command_across_slots("EXISTS", *keys) def delete(self, *keys): """ @@ -119,7 +116,7 @@ class ClusterMultiKeyCommands: Non-existant keys are ignored. Returns the number of keys that were deleted. """ - return self._split_command_across_slots('DEL', *keys) + return self._split_command_across_slots("DEL", *keys) def touch(self, *keys): """ @@ -132,7 +129,7 @@ class ClusterMultiKeyCommands: Non-existant keys are ignored. Returns the number of keys that were touched. """ - return self._split_command_across_slots('TOUCH', *keys) + return self._split_command_across_slots("TOUCH", *keys) def unlink(self, *keys): """ @@ -144,600 +141,91 @@ class ClusterMultiKeyCommands: Non-existant keys are ignored. Returns the number of keys that were unlinked. """ - return self._split_command_across_slots('UNLINK', *keys) + return self._split_command_across_slots("UNLINK", *keys) -class ClusterManagementCommands: +class ClusterManagementCommands(ManagementCommands): """ - Redis Cluster management commands - - Commands with the 'target_nodes' argument can be executed on specified - nodes. By default, if target_nodes is not specified, the command will be - executed on the default cluster node. - - :param :target_nodes: type can be one of the followings: - - nodes flag: 'all', 'primaries', 'replicas', 'random' - - 'ClusterNode' - - 'list(ClusterNodes)' - - 'dict(any:clusterNodes)' + A class for Redis Cluster management commands - for example: - primary = r.get_primaries()[0] - r.bgsave(target_nodes=primary) - r.bgsave(target_nodes='primaries') + The class inherits from Redis's core ManagementCommands class and do the + required adjustments to work with cluster mode """ - def bgsave(self, schedule=True, target_nodes=None): - """ - Tell the Redis server to save its data to disk. Unlike save(), - this method is asynchronous and returns immediately. - """ - pieces = [] - if schedule: - pieces.append("SCHEDULE") - return self.execute_command('BGSAVE', - *pieces, - target_nodes=target_nodes) - - def client_getname(self, target_nodes=None): - """ - Returns the current connection name from all nodes. - The result will be a dictionary with the IP and - connection name. - """ - return self.execute_command('CLIENT GETNAME', - target_nodes=target_nodes) - - def client_getredir(self, target_nodes=None): - """Returns the ID (an integer) of the client to whom we are - redirecting tracking notifications. - - see: https://redis.io/commands/client-getredir - """ - return self.execute_command('CLIENT GETREDIR', - target_nodes=target_nodes) - - def client_id(self, target_nodes=None): - """Returns the current connection id""" - return self.execute_command('CLIENT ID', - target_nodes=target_nodes) - - def client_info(self, target_nodes=None): - """ - Returns information and statistics about the current - client connection. 
- """ - return self.execute_command('CLIENT INFO', - target_nodes=target_nodes) - - def client_kill_filter(self, _id=None, _type=None, addr=None, - skipme=None, laddr=None, user=None, - target_nodes=None): - """ - Disconnects client(s) using a variety of filter options - :param id: Kills a client by its unique ID field - :param type: Kills a client by type where type is one of 'normal', - 'master', 'slave' or 'pubsub' - :param addr: Kills a client by its 'address:port' - :param skipme: If True, then the client calling the command - will not get killed even if it is identified by one of the filter - options. If skipme is not provided, the server defaults to skipme=True - :param laddr: Kills a client by its 'local (bind) address:port' - :param user: Kills a client for a specific user name - """ - args = [] - if _type is not None: - client_types = ('normal', 'master', 'slave', 'pubsub') - if str(_type).lower() not in client_types: - raise DataError(f"CLIENT KILL type must be one of {client_types!r}") - args.extend((b'TYPE', _type)) - if skipme is not None: - if not isinstance(skipme, bool): - raise DataError("CLIENT KILL skipme must be a bool") - if skipme: - args.extend((b'SKIPME', b'YES')) - else: - args.extend((b'SKIPME', b'NO')) - if _id is not None: - args.extend((b'ID', _id)) - if addr is not None: - args.extend((b'ADDR', addr)) - if laddr is not None: - args.extend((b'LADDR', laddr)) - if user is not None: - args.extend((b'USER', user)) - if not args: - raise DataError("CLIENT KILL <filter> <value> ... ... <filter> " - "<value> must specify at least one filter") - return self.execute_command('CLIENT KILL', *args, - target_nodes=target_nodes) - - def client_kill(self, address, target_nodes=None): - "Disconnects the client at ``address`` (ip:port)" - return self.execute_command('CLIENT KILL', address, - target_nodes=target_nodes) - - def client_list(self, _type=None, target_nodes=None): - """ - Returns a list of currently connected clients to the entire cluster. - If type of client specified, only that type will be returned. - :param _type: optional. one of the client types (normal, master, - replica, pubsub) - """ - if _type is not None: - client_types = ('normal', 'master', 'replica', 'pubsub') - if str(_type).lower() not in client_types: - raise DataError(f"CLIENT LIST _type must be one of {client_types!r}") - return self.execute_command('CLIENT LIST', - b'TYPE', - _type, - target_noes=target_nodes) - return self.execute_command('CLIENT LIST', - target_nodes=target_nodes) - - def client_pause(self, timeout, target_nodes=None): - """ - Suspend all the Redis clients for the specified amount of time - :param timeout: milliseconds to pause clients - """ - if not isinstance(timeout, int): - raise DataError("CLIENT PAUSE timeout must be an integer") - return self.execute_command('CLIENT PAUSE', str(timeout), - target_nodes=target_nodes) - - def client_reply(self, reply, target_nodes=None): - """Enable and disable redis server replies. - ``reply`` Must be ON OFF or SKIP, - ON - The default most with server replies to commands - OFF - Disable server responses to commands - SKIP - Skip the response of the immediately following command. - - Note: When setting OFF or SKIP replies, you will need a client object - with a timeout specified in seconds, and will need to catch the - TimeoutError. - The test_client_reply unit test illustrates this, and - conftest.py has a client with a timeout. 
- See https://redis.io/commands/client-reply - """ - replies = ['ON', 'OFF', 'SKIP'] - if reply not in replies: - raise DataError(f'CLIENT REPLY must be one of {replies!r}') - return self.execute_command("CLIENT REPLY", reply, - target_nodes=target_nodes) - - def client_setname(self, name, target_nodes=None): - "Sets the current connection name" - return self.execute_command('CLIENT SETNAME', name, - target_nodes=target_nodes) - - def client_trackinginfo(self, target_nodes=None): - """ - Returns the information about the current client connection's - use of the server assisted client side cache. - See https://redis.io/commands/client-trackinginfo - """ - return self.execute_command('CLIENT TRACKINGINFO', - target_nodes=target_nodes) - - def client_unblock(self, client_id, error=False, target_nodes=None): - """ - Unblocks a connection by its client id. - If ``error`` is True, unblocks the client with a special error message. - If ``error`` is False (default), the client is unblocked using the - regular timeout mechanism. - """ - args = ['CLIENT UNBLOCK', int(client_id)] - if error: - args.append(b'ERROR') - return self.execute_command(*args, target_nodes=target_nodes) - def client_unpause(self, target_nodes=None): - """ - Unpause all redis clients - """ - return self.execute_command('CLIENT UNPAUSE', - target_nodes=target_nodes) - - def command(self, target_nodes=None): - """ - Returns dict reply of details about all Redis commands. - """ - return self.execute_command('COMMAND', target_nodes=target_nodes) - - def command_count(self, target_nodes=None): - """ - Returns Integer reply of number of total commands in this Redis server. - """ - return self.execute_command('COMMAND COUNT', target_nodes=target_nodes) - - def config_get(self, pattern="*", target_nodes=None): - """ - Return a dictionary of configuration based on the ``pattern`` - """ - return self.execute_command('CONFIG GET', - pattern, - target_nodes=target_nodes) - - def config_resetstat(self, target_nodes=None): - """Reset runtime statistics""" - return self.execute_command('CONFIG RESETSTAT', - target_nodes=target_nodes) - - def config_rewrite(self, target_nodes=None): - """ - Rewrite config file with the minimal change to reflect running config. - """ - return self.execute_command('CONFIG REWRITE', - target_nodes=target_nodes) - - def config_set(self, name, value, target_nodes=None): - "Set config item ``name`` with ``value``" - return self.execute_command('CONFIG SET', - name, - value, - target_nodes=target_nodes) - - def dbsize(self, target_nodes=None): - """ - Sums the number of keys in the target nodes' DB. - - :target_nodes: 'ClusterNode' or 'list(ClusterNodes)' - The node/s to execute the command on - """ - return self.execute_command('DBSIZE', - target_nodes=target_nodes) - - def debug_object(self, key): - raise NotImplementedError( - "DEBUG OBJECT is intentionally not implemented in the client." - ) - - def debug_segfault(self): - raise NotImplementedError( - "DEBUG SEGFAULT is intentionally not implemented in the client." - ) + def slaveof(self, *args, **kwargs): + raise RedisClusterException("SLAVEOF is not supported in cluster mode") - def echo(self, value, target_nodes): - """Echo the string back from the server""" - return self.execute_command('ECHO', value, - target_nodes=target_nodes) + def replicaof(self, *args, **kwargs): + raise RedisClusterException("REPLICAOF is not supported in cluster" " mode") - def flushall(self, asynchronous=False, target_nodes=None): - """ - Delete all keys in the database. 
- In cluster mode this method is the same as flushdb + def swapdb(self, *args, **kwargs): + raise RedisClusterException("SWAPDB is not supported in cluster" " mode") - ``asynchronous`` indicates whether the operation is - executed asynchronously by the server. - """ - args = [] - if asynchronous: - args.append(b'ASYNC') - return self.execute_command('FLUSHALL', - *args, - target_nodes=target_nodes) - - def flushdb(self, asynchronous=False, target_nodes=None): - """ - Delete all keys in the database. - ``asynchronous`` indicates whether the operation is - executed asynchronously by the server. - """ - args = [] - if asynchronous: - args.append(b'ASYNC') - return self.execute_command('FLUSHDB', - *args, - target_nodes=target_nodes) - - def info(self, section=None, target_nodes=None): - """ - Returns a dictionary containing information about the Redis server - - The ``section`` option can be used to select a specific section - of information - - The section option is not supported by older versions of Redis Server, - and will generate ResponseError - """ - if section is None: - return self.execute_command('INFO', - target_nodes=target_nodes) - else: - return self.execute_command('INFO', - section, - target_nodes=target_nodes) - - def keys(self, pattern='*', target_nodes=None): - "Returns a list of keys matching ``pattern``" - return self.execute_command('KEYS', pattern, target_nodes=target_nodes) - - def lastsave(self, target_nodes=None): - """ - Return a Python datetime object representing the last time the - Redis database was saved to disk - """ - return self.execute_command('LASTSAVE', - target_nodes=target_nodes) +class ClusterDataAccessCommands(DataAccessCommands): + """ + A class for Redis Cluster Data Access Commands - def memory_doctor(self): - raise NotImplementedError( - "MEMORY DOCTOR is intentionally not implemented in the client." - ) + The class inherits from Redis's core DataAccessCommand class and do the + required adjustments to work with cluster mode + """ - def memory_help(self): - raise NotImplementedError( - "MEMORY HELP is intentionally not implemented in the client." + def stralgo( + self, + algo, + value1, + value2, + specific_argument="strings", + len=False, + idx=False, + minmatchlen=None, + withmatchlen=False, + **kwargs, + ): + target_nodes = kwargs.pop("target_nodes", None) + if specific_argument == "strings" and target_nodes is None: + target_nodes = "default-node" + kwargs.update({"target_nodes": target_nodes}) + return super().stralgo( + algo, + value1, + value2, + specific_argument, + len, + idx, + minmatchlen, + withmatchlen, + **kwargs, ) - def memory_malloc_stats(self, target_nodes=None): - """Return an internal statistics report from the memory allocator.""" - return self.execute_command('MEMORY MALLOC-STATS', - target_nodes=target_nodes) - def memory_purge(self, target_nodes=None): - """Attempts to purge dirty pages for reclamation by allocator""" - return self.execute_command('MEMORY PURGE', - target_nodes=target_nodes) - - def memory_stats(self, target_nodes=None): - """Return a dictionary of memory stats""" - return self.execute_command('MEMORY STATS', - target_nodes=target_nodes) - - def memory_usage(self, key, samples=None): - """ - Return the total memory usage for key, its value and associated - administrative overheads. - - For nested data structures, ``samples`` is the number of elements to - sample. If left unspecified, the server's default is 5. Use 0 to sample - all elements. 
- """ - args = [] - if isinstance(samples, int): - args.extend([b'SAMPLES', samples]) - return self.execute_command('MEMORY USAGE', key, *args) - - def object(self, infotype, key): - """Return the encoding, idletime, or refcount about the key""" - return self.execute_command('OBJECT', infotype, key, infotype=infotype) - - def ping(self, target_nodes=None): - """ - Ping the cluster's servers. - If no target nodes are specified, sent to all nodes and returns True if - the ping was successful across all nodes. - """ - return self.execute_command('PING', - target_nodes=target_nodes) - - def randomkey(self, target_nodes=None): - """ - Returns the name of a random key" - """ - return self.execute_command('RANDOMKEY', target_nodes=target_nodes) - - def save(self, target_nodes=None): - """ - Tell the Redis server to save its data to disk, - blocking until the save is complete - """ - return self.execute_command('SAVE', target_nodes=target_nodes) - - def scan(self, cursor=0, match=None, count=None, _type=None, - target_nodes=None): - """ - Incrementally return lists of key names. Also return a cursor - indicating the scan position. - - ``match`` allows for filtering the keys by pattern - - ``count`` provides a hint to Redis about the number of keys to - return per batch. - - ``_type`` filters the returned values by a particular Redis type. - Stock Redis instances allow for the following types: - HASH, LIST, SET, STREAM, STRING, ZSET - Additionally, Redis modules can expose other types as well. - """ - pieces = [cursor] - if match is not None: - pieces.extend([b'MATCH', match]) - if count is not None: - pieces.extend([b'COUNT', count]) - if _type is not None: - pieces.extend([b'TYPE', _type]) - return self.execute_command('SCAN', *pieces, target_nodes=target_nodes) - - def scan_iter(self, match=None, count=None, _type=None, target_nodes=None): - """ - Make an iterator using the SCAN command so that the client doesn't - need to remember the cursor position. - - ``match`` allows for filtering the keys by pattern - - ``count`` provides a hint to Redis about the number of keys to - return per batch. - - ``_type`` filters the returned values by a particular Redis type. - Stock Redis instances allow for the following types: - HASH, LIST, SET, STREAM, STRING, ZSET - Additionally, Redis modules can expose other types as well. - """ - cursor = '0' - while cursor != 0: - cursor, data = self.scan(cursor=cursor, match=match, - count=count, _type=_type, - target_nodes=target_nodes) - yield from data - - def shutdown(self, save=False, nosave=False, target_nodes=None): - """Shutdown the Redis server. If Redis has persistence configured, - data will be flushed before shutdown. If the "save" option is set, - a data flush will be attempted even if there is no persistence - configured. If the "nosave" option is set, no data flush will be - attempted. The "save" and "nosave" options cannot both be set. - """ - if save and nosave: - raise DataError('SHUTDOWN save and nosave cannot both be set') - args = ['SHUTDOWN'] - if save: - args.append('SAVE') - if nosave: - args.append('NOSAVE') - try: - self.execute_command(*args, target_nodes=target_nodes) - except ConnectionError: - # a ConnectionError here is expected - return - raise RedisError("SHUTDOWN seems to have failed.") - - def slowlog_get(self, num=None, target_nodes=None): - """ - Get the entries from the slowlog. If ``num`` is specified, get the - most recent ``num`` items. 
- """ - args = ['SLOWLOG GET'] - if num is not None: - args.append(num) - - return self.execute_command(*args, - target_nodes=target_nodes) - - def slowlog_len(self, target_nodes=None): - "Get the number of items in the slowlog" - return self.execute_command('SLOWLOG LEN', - target_nodes=target_nodes) - - def slowlog_reset(self, target_nodes=None): - "Remove all items in the slowlog" - return self.execute_command('SLOWLOG RESET', - target_nodes=target_nodes) - - def stralgo(self, algo, value1, value2, specific_argument='strings', - len=False, idx=False, minmatchlen=None, withmatchlen=False, - target_nodes=None): - """ - Implements complex algorithms that operate on strings. - Right now the only algorithm implemented is the LCS algorithm - (longest common substring). However new algorithms could be - implemented in the future. - - ``algo`` Right now must be LCS - ``value1`` and ``value2`` Can be two strings or two keys - ``specific_argument`` Specifying if the arguments to the algorithm - will be keys or strings. strings is the default. - ``len`` Returns just the len of the match. - ``idx`` Returns the match positions in each string. - ``minmatchlen`` Restrict the list of matches to the ones of a given - minimal length. Can be provided only when ``idx`` set to True. - ``withmatchlen`` Returns the matches with the len of the match. - Can be provided only when ``idx`` set to True. - """ - # check validity - supported_algo = ['LCS'] - if algo not in supported_algo: - supported_algos_str = ', '.join(supported_algo) - raise DataError(f"The supported algorithms are: {supported_algos_str}") - if specific_argument not in ['keys', 'strings']: - raise DataError("specific_argument can be only keys or strings") - if len and idx: - raise DataError("len and idx cannot be provided together.") - - pieces = [algo, specific_argument.upper(), value1, value2] - if len: - pieces.append(b'LEN') - if idx: - pieces.append(b'IDX') - try: - int(minmatchlen) - pieces.extend([b'MINMATCHLEN', minmatchlen]) - except TypeError: - pass - if withmatchlen: - pieces.append(b'WITHMATCHLEN') - if specific_argument == 'strings' and target_nodes is None: - target_nodes = 'default-node' - return self.execute_command('STRALGO', *pieces, len=len, idx=idx, - minmatchlen=minmatchlen, - withmatchlen=withmatchlen, - target_nodes=target_nodes) - - def time(self, target_nodes=None): - """ - Returns the server time as a 2-item tuple of ints: - (seconds since epoch, microseconds into this second). - """ - return self.execute_command('TIME', target_nodes=target_nodes) - - def wait(self, num_replicas, timeout, target_nodes=None): - """ - Redis synchronous replication - That returns the number of replicas that processed the query when - we finally have at least ``num_replicas``, or when the ``timeout`` was - reached. - - If more than one target node are passed the result will be summed up - """ - return self.execute_command('WAIT', num_replicas, - timeout, - target_nodes=target_nodes) - - -class ClusterPubSubCommands: - """ - Redis PubSub commands for RedisCluster use. - see https://redis.io/topics/pubsub +class RedisClusterCommands( + ClusterMultiKeyCommands, + ClusterManagementCommands, + ACLCommands, + PubSubCommands, + ClusterDataAccessCommands, +): """ - def publish(self, channel, message, target_nodes=None): - """ - Publish ``message`` on ``channel``. - Returns the number of subscribers the message was delivered to. 
- """ - return self.execute_command('PUBLISH', channel, message, - target_nodes=target_nodes) + A class for all Redis Cluster commands - def pubsub_channels(self, pattern='*', target_nodes=None): - """ - Return a list of channels that have at least one subscriber - """ - return self.execute_command('PUBSUB CHANNELS', pattern, - target_nodes=target_nodes) + For key-based commands, the target node(s) will be internally determined + by the keys' hash slot. + Non-key-based commands can be executed with the 'target_nodes' argument to + target specific nodes. By default, if target_nodes is not specified, the + command will be executed on the default cluster node. - def pubsub_numpat(self, target_nodes=None): - """ - Returns the number of subscriptions to patterns - """ - return self.execute_command('PUBSUB NUMPAT', target_nodes=target_nodes) - - def pubsub_numsub(self, *args, target_nodes=None): - """ - Return a list of (channel, number of subscribers) tuples - for each channel given in ``*args`` - """ - return self.execute_command('PUBSUB NUMSUB', *args, - target_nodes=target_nodes) - - -class ClusterCommands(ClusterManagementCommands, ClusterMultiKeyCommands, - ClusterPubSubCommands, DataAccessCommands): - """ - Redis Cluster commands - - Commands with the 'target_nodes' argument can be executed on specified - nodes. By default, if target_nodes is not specified, the command will be - executed on the default cluster node. :param :target_nodes: type can be one of the followings: - - nodes flag: 'all', 'primaries', 'replicas', 'random' + - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM - 'ClusterNode' - 'list(ClusterNodes)' - 'dict(any:clusterNodes)' for example: - r.cluster_info(target_nodes='all') + r.cluster_info(target_nodes=RedisCluster.ALL_NODES) """ + def cluster_addslots(self, target_node, *slots): """ Assign new hash slots to receiving node. Sends to specified node. @@ -745,22 +233,23 @@ class ClusterCommands(ClusterManagementCommands, ClusterMultiKeyCommands, :target_node: 'ClusterNode' The node to execute the command on """ - return self.execute_command('CLUSTER ADDSLOTS', *slots, - target_nodes=target_node) + return self.execute_command( + "CLUSTER ADDSLOTS", *slots, target_nodes=target_node + ) def cluster_countkeysinslot(self, slot_id): """ Return the number of local keys in the specified hash slot Send to node based on specified slot_id """ - return self.execute_command('CLUSTER COUNTKEYSINSLOT', slot_id) + return self.execute_command("CLUSTER COUNTKEYSINSLOT", slot_id) def cluster_count_failure_report(self, node_id): """ Return the number of failure reports active for a given node Sends to a random node """ - return self.execute_command('CLUSTER COUNT-FAILURE-REPORTS', node_id) + return self.execute_command("CLUSTER COUNT-FAILURE-REPORTS", node_id) def cluster_delslots(self, *slots): """ @@ -769,10 +258,7 @@ class ClusterCommands(ClusterManagementCommands, ClusterMultiKeyCommands, Returns a list of the results for each processed slot. 
""" - return [ - self.execute_command('CLUSTER DELSLOTS', slot) - for slot in slots - ] + return [self.execute_command("CLUSTER DELSLOTS", slot) for slot in slots] def cluster_failover(self, target_node, option=None): """ @@ -783,15 +269,16 @@ class ClusterCommands(ClusterManagementCommands, ClusterMultiKeyCommands, The node to execute the command on """ if option: - if option.upper() not in ['FORCE', 'TAKEOVER']: + if option.upper() not in ["FORCE", "TAKEOVER"]: raise RedisError( - f'Invalid option for CLUSTER FAILOVER command: {option}') + f"Invalid option for CLUSTER FAILOVER command: {option}" + ) else: - return self.execute_command('CLUSTER FAILOVER', option, - target_nodes=target_node) + return self.execute_command( + "CLUSTER FAILOVER", option, target_nodes=target_node + ) else: - return self.execute_command('CLUSTER FAILOVER', - target_nodes=target_node) + return self.execute_command("CLUSTER FAILOVER", target_nodes=target_node) def cluster_info(self, target_nodes=None): """ @@ -799,22 +286,23 @@ class ClusterCommands(ClusterManagementCommands, ClusterMultiKeyCommands, The command will be sent to a random node in the cluster if no target node is specified. """ - return self.execute_command('CLUSTER INFO', target_nodes=target_nodes) + return self.execute_command("CLUSTER INFO", target_nodes=target_nodes) def cluster_keyslot(self, key): """ Returns the hash slot of the specified key Sends to random node in the cluster """ - return self.execute_command('CLUSTER KEYSLOT', key) + return self.execute_command("CLUSTER KEYSLOT", key) def cluster_meet(self, host, port, target_nodes=None): """ Force a node cluster to handshake with another node. Sends to specified node. """ - return self.execute_command('CLUSTER MEET', host, port, - target_nodes=target_nodes) + return self.execute_command( + "CLUSTER MEET", host, port, target_nodes=target_nodes + ) def cluster_nodes(self): """ @@ -822,14 +310,15 @@ class ClusterCommands(ClusterManagementCommands, ClusterMultiKeyCommands, Sends to random node in the cluster """ - return self.execute_command('CLUSTER NODES') + return self.execute_command("CLUSTER NODES") def cluster_replicate(self, target_nodes, node_id): """ Reconfigure a node as a slave of the specified master node """ - return self.execute_command('CLUSTER REPLICATE', node_id, - target_nodes=target_nodes) + return self.execute_command( + "CLUSTER REPLICATE", node_id, target_nodes=target_nodes + ) def cluster_reset(self, soft=True, target_nodes=None): """ @@ -838,29 +327,29 @@ class ClusterCommands(ClusterManagementCommands, ClusterMultiKeyCommands, If 'soft' is True then it will send 'SOFT' argument If 'soft' is False then it will send 'HARD' argument """ - return self.execute_command('CLUSTER RESET', - b'SOFT' if soft else b'HARD', - target_nodes=target_nodes) + return self.execute_command( + "CLUSTER RESET", b"SOFT" if soft else b"HARD", target_nodes=target_nodes + ) def cluster_save_config(self, target_nodes=None): """ Forces the node to save cluster state on disk """ - return self.execute_command('CLUSTER SAVECONFIG', - target_nodes=target_nodes) + return self.execute_command("CLUSTER SAVECONFIG", target_nodes=target_nodes) def cluster_get_keys_in_slot(self, slot, num_keys): """ Returns the number of keys in the specified cluster slot """ - return self.execute_command('CLUSTER GETKEYSINSLOT', slot, num_keys) + return self.execute_command("CLUSTER GETKEYSINSLOT", slot, num_keys) def cluster_set_config_epoch(self, epoch, target_nodes=None): """ Set the configuration epoch in a new node 
""" - return self.execute_command('CLUSTER SET-CONFIG-EPOCH', epoch, - target_nodes=target_nodes) + return self.execute_command( + "CLUSTER SET-CONFIG-EPOCH", epoch, target_nodes=target_nodes + ) def cluster_setslot(self, target_node, node_id, slot_id, state): """ @@ -869,47 +358,48 @@ class ClusterCommands(ClusterManagementCommands, ClusterMultiKeyCommands, :target_node: 'ClusterNode' The node to execute the command on """ - if state.upper() in ('IMPORTING', 'NODE', 'MIGRATING'): - return self.execute_command('CLUSTER SETSLOT', slot_id, state, - node_id, target_nodes=target_node) - elif state.upper() == 'STABLE': - raise RedisError('For "stable" state please use ' - 'cluster_setslot_stable') + if state.upper() in ("IMPORTING", "NODE", "MIGRATING"): + return self.execute_command( + "CLUSTER SETSLOT", slot_id, state, node_id, target_nodes=target_node + ) + elif state.upper() == "STABLE": + raise RedisError('For "stable" state please use ' "cluster_setslot_stable") else: - raise RedisError(f'Invalid slot state: {state}') + raise RedisError(f"Invalid slot state: {state}") def cluster_setslot_stable(self, slot_id): """ Clears migrating / importing state from the slot. It determines by it self what node the slot is in and sends it there. """ - return self.execute_command('CLUSTER SETSLOT', slot_id, 'STABLE') + return self.execute_command("CLUSTER SETSLOT", slot_id, "STABLE") def cluster_replicas(self, node_id, target_nodes=None): """ Provides a list of replica nodes replicating from the specified primary target node. """ - return self.execute_command('CLUSTER REPLICAS', node_id, - target_nodes=target_nodes) + return self.execute_command( + "CLUSTER REPLICAS", node_id, target_nodes=target_nodes + ) def cluster_slots(self, target_nodes=None): """ Get array of Cluster slot to node mappings """ - return self.execute_command('CLUSTER SLOTS', target_nodes=target_nodes) + return self.execute_command("CLUSTER SLOTS", target_nodes=target_nodes) def readonly(self, target_nodes=None): """ Enables read queries. The command will be sent to the default cluster node if target_nodes is not specified. - """ - if target_nodes == 'replicas' or target_nodes == 'all': + """ + if target_nodes == "replicas" or target_nodes == "all": # read_from_replicas will only be enabled if the READONLY command # is sent to all replicas self.read_from_replicas = True - return self.execute_command('READONLY', target_nodes=target_nodes) + return self.execute_command("READONLY", target_nodes=target_nodes) def readwrite(self, target_nodes=None): """ @@ -919,4 +409,4 @@ class ClusterCommands(ClusterManagementCommands, ClusterMultiKeyCommands, """ # Reset read from replicas flag self.read_from_replicas = False - return self.execute_command('READWRITE', target_nodes=target_nodes) + return self.execute_command("READWRITE", target_nodes=target_nodes) diff --git a/redis/commands/core.py b/redis/commands/core.py index b769847..48e5324 100644 --- a/redis/commands/core.py +++ b/redis/commands/core.py @@ -1,15 +1,11 @@ import datetime +import hashlib import time import warnings -import hashlib + +from redis.exceptions import ConnectionError, DataError, NoScriptError, RedisError from .helpers import list_or_args -from redis.exceptions import ( - ConnectionError, - DataError, - NoScriptError, - RedisError, -) class ACLCommands: @@ -17,7 +13,8 @@ class ACLCommands: Redis Access Control List (ACL) commands. 
see: https://redis.io/topics/acl """ - def acl_cat(self, category=None): + + def acl_cat(self, category=None, **kwargs): """ Returns a list of categories or commands within a category. @@ -28,17 +25,17 @@ class ACLCommands: For more information check https://redis.io/commands/acl-cat """ pieces = [category] if category else [] - return self.execute_command('ACL CAT', *pieces) + return self.execute_command("ACL CAT", *pieces, **kwargs) - def acl_deluser(self, *username): + def acl_deluser(self, *username, **kwargs): """ Delete the ACL for the specified ``username``s For more information check https://redis.io/commands/acl-deluser """ - return self.execute_command('ACL DELUSER', *username) + return self.execute_command("ACL DELUSER", *username, **kwargs) - def acl_genpass(self, bits=None): + def acl_genpass(self, bits=None, **kwargs): """Generate a random password value. If ``bits`` is supplied then use this number of bits, rounded to the next multiple of 4. @@ -51,11 +48,12 @@ class ACLCommands: if b < 0 or b > 4096: raise ValueError except ValueError: - raise DataError('genpass optionally accepts a bits argument, ' - 'between 0 and 4096.') - return self.execute_command('ACL GENPASS', *pieces) + raise DataError( + "genpass optionally accepts a bits argument, " "between 0 and 4096." + ) + return self.execute_command("ACL GENPASS", *pieces, **kwargs) - def acl_getuser(self, username): + def acl_getuser(self, username, **kwargs): """ Get the ACL details for the specified ``username``. @@ -63,25 +61,25 @@ class ACLCommands: For more information check https://redis.io/commands/acl-getuser """ - return self.execute_command('ACL GETUSER', username) + return self.execute_command("ACL GETUSER", username, **kwargs) - def acl_help(self): + def acl_help(self, **kwargs): """The ACL HELP command returns helpful text describing the different subcommands. For more information check https://redis.io/commands/acl-help """ - return self.execute_command('ACL HELP') + return self.execute_command("ACL HELP", **kwargs) - def acl_list(self): + def acl_list(self, **kwargs): """ Return a list of all ACLs on the server For more information check https://redis.io/commands/acl-list """ - return self.execute_command('ACL LIST') + return self.execute_command("ACL LIST", **kwargs) - def acl_log(self, count=None): + def acl_log(self, count=None, **kwargs): """ Get ACL logs as a list. :param int count: Get logs[0:count]. @@ -92,23 +90,22 @@ class ACLCommands: args = [] if count is not None: if not isinstance(count, int): - raise DataError('ACL LOG count must be an ' - 'integer') + raise DataError("ACL LOG count must be an " "integer") args.append(count) - return self.execute_command('ACL LOG', *args) + return self.execute_command("ACL LOG", *args, **kwargs) - def acl_log_reset(self): + def acl_log_reset(self, **kwargs): """ Reset ACL logs. :rtype: Boolean. For more information check https://redis.io/commands/acl-log """ - args = [b'RESET'] - return self.execute_command('ACL LOG', *args) + args = [b"RESET"] + return self.execute_command("ACL LOG", *args, **kwargs) - def acl_load(self): + def acl_load(self, **kwargs): """ Load ACL rules from the configured ``aclfile``. @@ -117,9 +114,9 @@ class ACLCommands: For more information check https://redis.io/commands/acl-load """ - return self.execute_command('ACL LOAD') + return self.execute_command("ACL LOAD", **kwargs) - def acl_save(self): + def acl_save(self, **kwargs): """ Save ACL rules to the configured ``aclfile``. 
@@ -128,12 +125,23 @@ class ACLCommands: For more information check https://redis.io/commands/acl-save """ - return self.execute_command('ACL SAVE') - - def acl_setuser(self, username, enabled=False, nopass=False, - passwords=None, hashed_passwords=None, categories=None, - commands=None, keys=None, reset=False, reset_keys=False, - reset_passwords=False): + return self.execute_command("ACL SAVE", **kwargs) + + def acl_setuser( + self, + username, + enabled=False, + nopass=False, + passwords=None, + hashed_passwords=None, + categories=None, + commands=None, + keys=None, + reset=False, + reset_keys=False, + reset_passwords=False, + **kwargs, + ): """ Create or update an ACL user. @@ -195,26 +203,27 @@ class ACLCommands: For more information check https://redis.io/commands/acl-setuser """ - encoder = self.connection_pool.get_encoder() + encoder = self.get_encoder() pieces = [username] if reset: - pieces.append(b'reset') + pieces.append(b"reset") if reset_keys: - pieces.append(b'resetkeys') + pieces.append(b"resetkeys") if reset_passwords: - pieces.append(b'resetpass') + pieces.append(b"resetpass") if enabled: - pieces.append(b'on') + pieces.append(b"on") else: - pieces.append(b'off') + pieces.append(b"off") if (passwords or hashed_passwords) and nopass: - raise DataError('Cannot set \'nopass\' and supply ' - '\'passwords\' or \'hashed_passwords\'') + raise DataError( + "Cannot set 'nopass' and supply " "'passwords' or 'hashed_passwords'" + ) if passwords: # as most users will have only one password, allow remove_passwords @@ -222,13 +231,15 @@ class ACLCommands: passwords = list_or_args(passwords, []) for i, password in enumerate(passwords): password = encoder.encode(password) - if password.startswith(b'+'): - pieces.append(b'>%s' % password[1:]) - elif password.startswith(b'-'): - pieces.append(b'<%s' % password[1:]) + if password.startswith(b"+"): + pieces.append(b">%s" % password[1:]) + elif password.startswith(b"-"): + pieces.append(b"<%s" % password[1:]) else: - raise DataError(f'Password {i} must be prefixed with a ' - f'"+" to add or a "-" to remove') + raise DataError( + f"Password {i} must be prefixed with a " + f'"+" to add or a "-" to remove' + ) if hashed_passwords: # as most users will have only one password, allow remove_passwords @@ -236,29 +247,31 @@ class ACLCommands: hashed_passwords = list_or_args(hashed_passwords, []) for i, hashed_password in enumerate(hashed_passwords): hashed_password = encoder.encode(hashed_password) - if hashed_password.startswith(b'+'): - pieces.append(b'#%s' % hashed_password[1:]) - elif hashed_password.startswith(b'-'): - pieces.append(b'!%s' % hashed_password[1:]) + if hashed_password.startswith(b"+"): + pieces.append(b"#%s" % hashed_password[1:]) + elif hashed_password.startswith(b"-"): + pieces.append(b"!%s" % hashed_password[1:]) else: - raise DataError(f'Hashed password {i} must be prefixed with a ' - f'"+" to add or a "-" to remove') + raise DataError( + f"Hashed password {i} must be prefixed with a " + f'"+" to add or a "-" to remove' + ) if nopass: - pieces.append(b'nopass') + pieces.append(b"nopass") if categories: for category in categories: category = encoder.encode(category) # categories can be prefixed with one of (+@, +, -@, -) - if category.startswith(b'+@'): + if category.startswith(b"+@"): pieces.append(category) - elif category.startswith(b'+'): - pieces.append(b'+@%s' % category[1:]) - elif category.startswith(b'-@'): + elif category.startswith(b"+"): + pieces.append(b"+@%s" % category[1:]) + elif category.startswith(b"-@"): 
pieces.append(category) - elif category.startswith(b'-'): - pieces.append(b'-@%s' % category[1:]) + elif category.startswith(b"-"): + pieces.append(b"-@%s" % category[1:]) else: raise DataError( f'Category "{encoder.decode(category, force=True)}" ' @@ -267,7 +280,7 @@ class ACLCommands: if commands: for cmd in commands: cmd = encoder.encode(cmd) - if not cmd.startswith(b'+') and not cmd.startswith(b'-'): + if not cmd.startswith(b"+") and not cmd.startswith(b"-"): raise DataError( f'Command "{encoder.decode(cmd, force=True)}" ' 'must be prefixed with "+" or "-"' @@ -277,37 +290,38 @@ class ACLCommands: if keys: for key in keys: key = encoder.encode(key) - pieces.append(b'~%s' % key) + pieces.append(b"~%s" % key) - return self.execute_command('ACL SETUSER', *pieces) + return self.execute_command("ACL SETUSER", *pieces, **kwargs) - def acl_users(self): + def acl_users(self, **kwargs): """Returns a list of all registered users on the server. For more information check https://redis.io/commands/acl-users """ - return self.execute_command('ACL USERS') + return self.execute_command("ACL USERS", **kwargs) - def acl_whoami(self): + def acl_whoami(self, **kwargs): """Get the username for the current connection For more information check https://redis.io/commands/acl-whoami """ - return self.execute_command('ACL WHOAMI') + return self.execute_command("ACL WHOAMI", **kwargs) class ManagementCommands: """ Redis management commands """ - def bgrewriteaof(self): + + def bgrewriteaof(self, **kwargs): """Tell the Redis server to rewrite the AOF file from data in memory. For more information check https://redis.io/commands/bgrewriteaof """ - return self.execute_command('BGREWRITEAOF') + return self.execute_command("BGREWRITEAOF", **kwargs) - def bgsave(self, schedule=True): + def bgsave(self, schedule=True, **kwargs): """ Tell the Redis server to save its data to disk. Unlike save(), this method is asynchronous and returns immediately. @@ -317,25 +331,35 @@ class ManagementCommands: pieces = [] if schedule: pieces.append("SCHEDULE") - return self.execute_command('BGSAVE', *pieces) + return self.execute_command("BGSAVE", *pieces, **kwargs) def role(self): """ Provide information on the role of a Redis instance in the context of replication, by returning if the instance is currently a master, slave, or sentinel. 
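The acl_setuser() hunk above mostly re-wraps the argument handling, but it also shows the prefix conventions the client expects for passwords, categories, commands and keys. A short sketch under the assumption of a standalone redis.Redis client (the client construction and user name are not part of this diff):

import redis

r = redis.Redis(host="localhost", port=6379)  # assumed client
r.acl_setuser(
    "reporting",
    enabled=True,
    passwords=["+s3cret"],             # "+" adds a password, "-" removes one
    categories=["+@read", "-@admin"],  # category rules take the +@ / -@ prefixes
    commands=["+get", "-flushdb"],     # commands must start with "+" or "-"
    keys=["report:*"],                 # each key pattern is sent as ~pattern
)
print(r.acl_getuser("reporting"))
print(r.acl_whoami())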
+ + For more information check https://redis.io/commands/role """ return self.execute_command('ROLE') - def client_kill(self, address): + def client_kill(self, address, **kwargs): """Disconnects the client at ``address`` (ip:port) For more information check https://redis.io/commands/client-kill """ - return self.execute_command('CLIENT KILL', address) + return self.execute_command("CLIENT KILL", address, **kwargs) - def client_kill_filter(self, _id=None, _type=None, addr=None, - skipme=None, laddr=None, user=None): + def client_kill_filter( + self, + _id=None, + _type=None, + addr=None, + skipme=None, + laddr=None, + user=None, + **kwargs, + ): """ Disconnects client(s) using a variety of filter options :param id: Kills a client by its unique ID field @@ -350,40 +374,42 @@ class ManagementCommands: """ args = [] if _type is not None: - client_types = ('normal', 'master', 'slave', 'pubsub') + client_types = ("normal", "master", "slave", "pubsub") if str(_type).lower() not in client_types: raise DataError(f"CLIENT KILL type must be one of {client_types!r}") - args.extend((b'TYPE', _type)) + args.extend((b"TYPE", _type)) if skipme is not None: if not isinstance(skipme, bool): raise DataError("CLIENT KILL skipme must be a bool") if skipme: - args.extend((b'SKIPME', b'YES')) + args.extend((b"SKIPME", b"YES")) else: - args.extend((b'SKIPME', b'NO')) + args.extend((b"SKIPME", b"NO")) if _id is not None: - args.extend((b'ID', _id)) + args.extend((b"ID", _id)) if addr is not None: - args.extend((b'ADDR', addr)) + args.extend((b"ADDR", addr)) if laddr is not None: - args.extend((b'LADDR', laddr)) + args.extend((b"LADDR", laddr)) if user is not None: - args.extend((b'USER', user)) + args.extend((b"USER", user)) if not args: - raise DataError("CLIENT KILL <filter> <value> ... ... <filter> " - "<value> must specify at least one filter") - return self.execute_command('CLIENT KILL', *args) + raise DataError( + "CLIENT KILL <filter> <value> ... ... <filter> " + "<value> must specify at least one filter" + ) + return self.execute_command("CLIENT KILL", *args, **kwargs) - def client_info(self): + def client_info(self, **kwargs): """ Returns information and statistics about the current client connection. For more information check https://redis.io/commands/client-info """ - return self.execute_command('CLIENT INFO') + return self.execute_command("CLIENT INFO", **kwargs) - def client_list(self, _type=None, client_id=[]): + def client_list(self, _type=None, client_id=[], **kwargs): """ Returns a list of currently connected clients. If type of client specified, only that type will be returned. 
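As the validation above shows, client_kill_filter() requires at least one filter and restricts _type to normal/master/slave/pubsub, while client_list() accepts a type filter and a list of client IDs. A rough sketch (client construction and addresses are made up):

import redis

r = redis.Redis()                                  # assumed client
print(r.client_list(_type="replica"))              # only replica connections
r.client_kill_filter(laddr="127.0.0.1:6379")       # kill by local address
r.client_kill_filter(_type="pubsub", skipme=True)  # keep the current connection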
@@ -395,36 +421,36 @@ class ManagementCommands: """ args = [] if _type is not None: - client_types = ('normal', 'master', 'replica', 'pubsub') + client_types = ("normal", "master", "replica", "pubsub") if str(_type).lower() not in client_types: raise DataError(f"CLIENT LIST _type must be one of {client_types!r}") - args.append(b'TYPE') + args.append(b"TYPE") args.append(_type) if not isinstance(client_id, list): raise DataError("client_id must be a list") if client_id != []: args.append(b"ID") - args.append(' '.join(client_id)) - return self.execute_command('CLIENT LIST', *args) + args.append(" ".join(client_id)) + return self.execute_command("CLIENT LIST", *args, **kwargs) - def client_getname(self): + def client_getname(self, **kwargs): """ Returns the current connection name For more information check https://redis.io/commands/client-getname """ - return self.execute_command('CLIENT GETNAME') + return self.execute_command("CLIENT GETNAME", **kwargs) - def client_getredir(self): + def client_getredir(self, **kwargs): """ Returns the ID (an integer) of the client to whom we are redirecting tracking notifications. see: https://redis.io/commands/client-getredir """ - return self.execute_command('CLIENT GETREDIR') + return self.execute_command("CLIENT GETREDIR", **kwargs) - def client_reply(self, reply): + def client_reply(self, reply, **kwargs): """ Enable and disable redis server replies. ``reply`` Must be ON OFF or SKIP, @@ -440,37 +466,37 @@ class ManagementCommands: See https://redis.io/commands/client-reply """ - replies = ['ON', 'OFF', 'SKIP'] + replies = ["ON", "OFF", "SKIP"] if reply not in replies: - raise DataError(f'CLIENT REPLY must be one of {replies!r}') - return self.execute_command("CLIENT REPLY", reply) + raise DataError(f"CLIENT REPLY must be one of {replies!r}") + return self.execute_command("CLIENT REPLY", reply, **kwargs) - def client_id(self): + def client_id(self, **kwargs): """ Returns the current connection id For more information check https://redis.io/commands/client-id """ - return self.execute_command('CLIENT ID') + return self.execute_command("CLIENT ID", **kwargs) - def client_trackinginfo(self): + def client_trackinginfo(self, **kwargs): """ Returns the information about the current client connection's use of the server assisted client side cache. See https://redis.io/commands/client-trackinginfo """ - return self.execute_command('CLIENT TRACKINGINFO') + return self.execute_command("CLIENT TRACKINGINFO", **kwargs) - def client_setname(self, name): + def client_setname(self, name, **kwargs): """ Sets the current connection name For more information check https://redis.io/commands/client-setname """ - return self.execute_command('CLIENT SETNAME', name) + return self.execute_command("CLIENT SETNAME", name, **kwargs) - def client_unblock(self, client_id, error=False): + def client_unblock(self, client_id, error=False, **kwargs): """ Unblocks a connection by its client id. If ``error`` is True, unblocks the client with a special error message. 
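The connection helpers in the preceding hunks (CLIENT SETNAME / GETNAME / ID / UNBLOCK) now all accept **kwargs but otherwise behave as before. A quick sketch, assuming a standalone client:

import redis

r = redis.Redis()                    # assumed client
r.client_setname("reporting-worker")
print(r.client_getname())            # b'reporting-worker' (bytes by default)
print(r.client_id())
r.client_unblock(r.client_id())      # returns 0 here; shows the call shape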
@@ -479,12 +505,12 @@ class ManagementCommands: For more information check https://redis.io/commands/client-unblock """ - args = ['CLIENT UNBLOCK', int(client_id)] + args = ["CLIENT UNBLOCK", int(client_id)] if error: - args.append(b'ERROR') - return self.execute_command(*args) + args.append(b"ERROR") + return self.execute_command(*args, **kwargs) - def client_pause(self, timeout): + def client_pause(self, timeout, **kwargs): """ Suspend all the Redis clients for the specified amount of time :param timeout: milliseconds to pause clients @@ -493,91 +519,80 @@ class ManagementCommands: """ if not isinstance(timeout, int): raise DataError("CLIENT PAUSE timeout must be an integer") - return self.execute_command('CLIENT PAUSE', str(timeout)) + return self.execute_command("CLIENT PAUSE", str(timeout), **kwargs) - def client_unpause(self): + def client_unpause(self, **kwargs): """ Unpause all redis clients For more information check https://redis.io/commands/client-unpause """ - return self.execute_command('CLIENT UNPAUSE') - - def command_info(self): - raise NotImplementedError( - "COMMAND INFO is intentionally not implemented in the client." - ) - - def command_count(self): - return self.execute_command('COMMAND COUNT') + return self.execute_command("CLIENT UNPAUSE", **kwargs) - def readwrite(self): + def command(self, **kwargs): """ - Disables read queries for a connection to a Redis Cluster slave node. + Returns dict reply of details about all Redis commands. - For more information check https://redis.io/commands/readwrite + For more information check https://redis.io/commands/command """ - return self.execute_command('READWRITE') + return self.execute_command("COMMAND", **kwargs) - def readonly(self): - """ - Enables read queries for a connection to a Redis Cluster replica node. + def command_info(self, **kwargs): + raise NotImplementedError( + "COMMAND INFO is intentionally not implemented in the client." + ) - For more information check https://redis.io/commands/readonly - """ - return self.execute_command('READONLY') + def command_count(self, **kwargs): + return self.execute_command("COMMAND COUNT", **kwargs) - def config_get(self, pattern="*"): + def config_get(self, pattern="*", **kwargs): """ Return a dictionary of configuration based on the ``pattern`` For more information check https://redis.io/commands/config-get """ - return self.execute_command('CONFIG GET', pattern) + return self.execute_command("CONFIG GET", pattern, **kwargs) - def config_set(self, name, value): + def config_set(self, name, value, **kwargs): """Set config item ``name`` with ``value`` For more information check https://redis.io/commands/config-set """ - return self.execute_command('CONFIG SET', name, value) + return self.execute_command("CONFIG SET", name, value, **kwargs) - def config_resetstat(self): + def config_resetstat(self, **kwargs): """ Reset runtime statistics For more information check https://redis.io/commands/config-resetstat """ - return self.execute_command('CONFIG RESETSTAT') + return self.execute_command("CONFIG RESETSTAT", **kwargs) - def config_rewrite(self): + def config_rewrite(self, **kwargs): """ Rewrite config file with the minimal change to reflect running config. 
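The recurring functional change in these management hunks is the added **kwargs pass-through, which lets the cluster client forward routing options such as target_nodes; against a standalone server the calls look as before. A sketch with an assumed redis.Redis client:

import redis

r = redis.Redis()                                 # assumed client
r.config_set("maxmemory-policy", "allkeys-lru")
print(r.config_get("maxmemory*"))                 # dict of matching config entries
print(r.command_count())                          # number of commands the server knows
r.client_pause(1000)                              # pause all clients for 1000 ms
r.client_unpause()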
For more information check https://redis.io/commands/config-rewrite """ - return self.execute_command('CONFIG REWRITE') - - def cluster(self, cluster_arg, *args): - return self.execute_command(f'CLUSTER {cluster_arg.upper()}', *args) + return self.execute_command("CONFIG REWRITE", **kwargs) - def dbsize(self): + def dbsize(self, **kwargs): """ Returns the number of keys in the current database For more information check https://redis.io/commands/dbsize """ - return self.execute_command('DBSIZE') + return self.execute_command("DBSIZE", **kwargs) - def debug_object(self, key): + def debug_object(self, key, **kwargs): """ Returns version specific meta information about a given key For more information check https://redis.io/commands/debug-object """ - return self.execute_command('DEBUG OBJECT', key) + return self.execute_command("DEBUG OBJECT", key, **kwargs) - def debug_segfault(self): + def debug_segfault(self, **kwargs): raise NotImplementedError( """ DEBUG SEGFAULT is intentionally not implemented in the client. @@ -586,15 +601,15 @@ class ManagementCommands: """ ) - def echo(self, value): + def echo(self, value, **kwargs): """ Echo the string back from the server For more information check https://redis.io/commands/echo """ - return self.execute_command('ECHO', value) + return self.execute_command("ECHO", value, **kwargs) - def flushall(self, asynchronous=False): + def flushall(self, asynchronous=False, **kwargs): """ Delete all keys in all databases on the current host. @@ -605,10 +620,10 @@ class ManagementCommands: """ args = [] if asynchronous: - args.append(b'ASYNC') - return self.execute_command('FLUSHALL', *args) + args.append(b"ASYNC") + return self.execute_command("FLUSHALL", *args, **kwargs) - def flushdb(self, asynchronous=False): + def flushdb(self, asynchronous=False, **kwargs): """ Delete all keys in the current database. 
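The DBSIZE / ECHO / FLUSH* hunks above only change quoting and add **kwargs; FLUSHALL and FLUSHDB still take the asynchronous flag that appends ASYNC. A small sketch (client construction assumed):

import redis

r = redis.Redis()              # assumed client
print(r.dbsize())              # number of keys in the current database
print(r.echo("hello"))         # round-trips a value through the server
r.flushdb(asynchronous=True)   # FLUSHDB ASYNC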
@@ -619,18 +634,18 @@ class ManagementCommands: """ args = [] if asynchronous: - args.append(b'ASYNC') - return self.execute_command('FLUSHDB', *args) + args.append(b"ASYNC") + return self.execute_command("FLUSHDB", *args, **kwargs) - def swapdb(self, first, second): + def swapdb(self, first, second, **kwargs): """ Swap two databases For more information check https://redis.io/commands/swapdb """ - return self.execute_command('SWAPDB', first, second) + return self.execute_command("SWAPDB", first, second, **kwargs) - def info(self, section=None): + def info(self, section=None, **kwargs): """ Returns a dictionary containing information about the Redis server @@ -643,32 +658,42 @@ class ManagementCommands: For more information check https://redis.io/commands/info """ if section is None: - return self.execute_command('INFO') + return self.execute_command("INFO", **kwargs) else: - return self.execute_command('INFO', section) + return self.execute_command("INFO", section, **kwargs) - def lastsave(self): + def lastsave(self, **kwargs): """ Return a Python datetime object representing the last time the Redis database was saved to disk For more information check https://redis.io/commands/lastsave """ - return self.execute_command('LASTSAVE') + return self.execute_command("LASTSAVE", **kwargs) - def lolwut(self, *version_numbers): + def lolwut(self, *version_numbers, **kwargs): """ Get the Redis version and a piece of generative computer art See: https://redis.io/commands/lolwut """ if version_numbers: - return self.execute_command('LOLWUT VERSION', *version_numbers) + return self.execute_command("LOLWUT VERSION", *version_numbers, **kwargs) else: - return self.execute_command('LOLWUT') - - def migrate(self, host, port, keys, destination_db, timeout, - copy=False, replace=False, auth=None): + return self.execute_command("LOLWUT", **kwargs) + + def migrate( + self, + host, + port, + keys, + destination_db, + timeout, + copy=False, + replace=False, + auth=None, + **kwargs, + ): """ Migrate 1 or more keys from the current Redis server to a different server specified by the ``host``, ``port`` and ``destination_db``. @@ -690,27 +715,30 @@ class ManagementCommands: """ keys = list_or_args(keys, []) if not keys: - raise DataError('MIGRATE requires at least one key') + raise DataError("MIGRATE requires at least one key") pieces = [] if copy: - pieces.append(b'COPY') + pieces.append(b"COPY") if replace: - pieces.append(b'REPLACE') + pieces.append(b"REPLACE") if auth: - pieces.append(b'AUTH') + pieces.append(b"AUTH") pieces.append(auth) - pieces.append(b'KEYS') + pieces.append(b"KEYS") pieces.extend(keys) - return self.execute_command('MIGRATE', host, port, '', destination_db, - timeout, *pieces) + return self.execute_command( + "MIGRATE", host, port, "", destination_db, timeout, *pieces, **kwargs + ) - def object(self, infotype, key): + def object(self, infotype, key, **kwargs): """ Return the encoding, idletime, or refcount about the key """ - return self.execute_command('OBJECT', infotype, key, infotype=infotype) + return self.execute_command( + "OBJECT", infotype, key, infotype=infotype, **kwargs + ) - def memory_doctor(self): + def memory_doctor(self, **kwargs): raise NotImplementedError( """ MEMORY DOCTOR is intentionally not implemented in the client. @@ -719,7 +747,7 @@ class ManagementCommands: """ ) - def memory_help(self): + def memory_help(self, **kwargs): raise NotImplementedError( """ MEMORY HELP is intentionally not implemented in the client. 
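As wired above, migrate() always uses the multi-key form of MIGRATE (an empty key argument followed by a KEYS list). A sketch with made-up target host and port; the client construction is also assumed:

import redis

r = redis.Redis()                        # assumed source client
r.migrate(
    "10.0.0.2", 6380,                    # made-up destination host/port
    keys=["session:1", "session:2"],
    destination_db=0,
    timeout=5000,                        # milliseconds
    replace=True,                        # REPLACE existing keys on the target
)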
@@ -728,23 +756,23 @@ class ManagementCommands: """ ) - def memory_stats(self): + def memory_stats(self, **kwargs): """ Return a dictionary of memory stats For more information check https://redis.io/commands/memory-stats """ - return self.execute_command('MEMORY STATS') + return self.execute_command("MEMORY STATS", **kwargs) - def memory_malloc_stats(self): + def memory_malloc_stats(self, **kwargs): """ Return an internal statistics report from the memory allocator. See: https://redis.io/commands/memory-malloc-stats """ - return self.execute_command('MEMORY MALLOC-STATS') + return self.execute_command("MEMORY MALLOC-STATS", **kwargs) - def memory_usage(self, key, samples=None): + def memory_usage(self, key, samples=None, **kwargs): """ Return the total memory usage for key, its value and associated administrative overheads. @@ -757,34 +785,34 @@ class ManagementCommands: """ args = [] if isinstance(samples, int): - args.extend([b'SAMPLES', samples]) - return self.execute_command('MEMORY USAGE', key, *args) + args.extend([b"SAMPLES", samples]) + return self.execute_command("MEMORY USAGE", key, *args, **kwargs) - def memory_purge(self): + def memory_purge(self, **kwargs): """ Attempts to purge dirty pages for reclamation by allocator For more information check https://redis.io/commands/memory-purge """ - return self.execute_command('MEMORY PURGE') + return self.execute_command("MEMORY PURGE", **kwargs) - def ping(self): + def ping(self, **kwargs): """ Ping the Redis server For more information check https://redis.io/commands/ping """ - return self.execute_command('PING') + return self.execute_command("PING", **kwargs) - def quit(self): + def quit(self, **kwargs): """ Ask the server to close the connection. For more information check https://redis.io/commands/quit """ - return self.execute_command('QUIT') + return self.execute_command("QUIT", **kwargs) - def replicaof(self, *args): + def replicaof(self, *args, **kwargs): """ Update the replication settings of a redis replica, on the fly. Examples of valid arguments include: @@ -793,18 +821,18 @@ class ManagementCommands: For more information check https://redis.io/commands/replicaof """ - return self.execute_command('REPLICAOF', *args) + return self.execute_command("REPLICAOF", *args, **kwargs) - def save(self): + def save(self, **kwargs): """ Tell the Redis server to save its data to disk, blocking until the save is complete For more information check https://redis.io/commands/save """ - return self.execute_command('SAVE') + return self.execute_command("SAVE", **kwargs) - def shutdown(self, save=False, nosave=False): + def shutdown(self, save=False, nosave=False, **kwargs): """Shutdown the Redis server. If Redis has persistence configured, data will be flushed before shutdown. 
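The memory introspection helpers above keep their shapes; memory_usage() only adds SAMPLES when an integer is passed. A short sketch (client and key name assumed):

import redis

r = redis.Redis()                         # assumed client
r.set("big", "x" * 10_000)
print(r.memory_usage("big", samples=0))   # SAMPLES 0 = inspect every element
print(r.memory_stats())                   # allocator-level statistics
r.memory_purge()                          # ask the allocator to release dirty pages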
If the "save" option is set, a data flush will be attempted even if there is no persistence @@ -814,20 +842,20 @@ class ManagementCommands: For more information check https://redis.io/commands/shutdown """ if save and nosave: - raise DataError('SHUTDOWN save and nosave cannot both be set') - args = ['SHUTDOWN'] + raise DataError("SHUTDOWN save and nosave cannot both be set") + args = ["SHUTDOWN"] if save: - args.append('SAVE') + args.append("SAVE") if nosave: - args.append('NOSAVE') + args.append("NOSAVE") try: - self.execute_command(*args) + self.execute_command(*args, **kwargs) except ConnectionError: # a ConnectionError here is expected return raise RedisError("SHUTDOWN seems to have failed.") - def slaveof(self, host=None, port=None): + def slaveof(self, host=None, port=None, **kwargs): """ Set the server to be a replicated slave of the instance identified by the ``host`` and ``port``. If called without arguments, the @@ -836,49 +864,48 @@ class ManagementCommands: For more information check https://redis.io/commands/slaveof """ if host is None and port is None: - return self.execute_command('SLAVEOF', b'NO', b'ONE') - return self.execute_command('SLAVEOF', host, port) + return self.execute_command("SLAVEOF", b"NO", b"ONE", **kwargs) + return self.execute_command("SLAVEOF", host, port, **kwargs) - def slowlog_get(self, num=None): + def slowlog_get(self, num=None, **kwargs): """ Get the entries from the slowlog. If ``num`` is specified, get the most recent ``num`` items. For more information check https://redis.io/commands/slowlog-get """ - args = ['SLOWLOG GET'] + args = ["SLOWLOG GET"] if num is not None: args.append(num) - decode_responses = self.connection_pool.connection_kwargs.get( - 'decode_responses', False) - return self.execute_command(*args, decode_responses=decode_responses) + decode_responses = self.get_connection_kwargs().get("decode_responses", False) + return self.execute_command(*args, decode_responses=decode_responses, **kwargs) - def slowlog_len(self): + def slowlog_len(self, **kwargs): """ Get the number of items in the slowlog For more information check https://redis.io/commands/slowlog-len """ - return self.execute_command('SLOWLOG LEN') + return self.execute_command("SLOWLOG LEN", **kwargs) - def slowlog_reset(self): + def slowlog_reset(self, **kwargs): """ Remove all items in the slowlog For more information check https://redis.io/commands/slowlog-reset """ - return self.execute_command('SLOWLOG RESET') + return self.execute_command("SLOWLOG RESET", **kwargs) - def time(self): + def time(self, **kwargs): """ Returns the server time as a 2-item tuple of ints: (seconds since epoch, microseconds into this second). For more information check https://redis.io/commands/time """ - return self.execute_command('TIME') + return self.execute_command("TIME", **kwargs) - def wait(self, num_replicas, timeout): + def wait(self, num_replicas, timeout, **kwargs): """ Redis synchronous replication That returns the number of replicas that processed the query when @@ -887,13 +914,14 @@ class ManagementCommands: For more information check https://redis.io/commands/wait """ - return self.execute_command('WAIT', num_replicas, timeout) + return self.execute_command("WAIT", num_replicas, timeout, **kwargs) class BasicKeyCommands: """ Redis basic key-based commands """ + def append(self, key, value): """ Appends the string ``value`` to the value at ``key``. 
If ``key`` @@ -902,7 +930,7 @@ class BasicKeyCommands: For more information check https://redis.io/commands/append """ - return self.execute_command('APPEND', key, value) + return self.execute_command("APPEND", key, value) def bitcount(self, key, start=None, end=None): """ @@ -915,10 +943,9 @@ class BasicKeyCommands: if start is not None and end is not None: params.append(start) params.append(end) - elif (start is not None and end is None) or \ - (end is not None and start is None): + elif (start is not None and end is None) or (end is not None and start is None): raise DataError("Both start and end must be specified") - return self.execute_command('BITCOUNT', *params) + return self.execute_command("BITCOUNT", *params) def bitfield(self, key, default_overflow=None): """ @@ -936,7 +963,7 @@ class BasicKeyCommands: For more information check https://redis.io/commands/bitop """ - return self.execute_command('BITOP', operation, dest, *keys) + return self.execute_command("BITOP", operation, dest, *keys) def bitpos(self, key, bit, start=None, end=None): """ @@ -948,7 +975,7 @@ class BasicKeyCommands: For more information check https://redis.io/commands/bitpos """ if bit not in (0, 1): - raise DataError('bit must be 0 or 1') + raise DataError("bit must be 0 or 1") params = [key, bit] start is not None and params.append(start) @@ -956,9 +983,8 @@ class BasicKeyCommands: if start is not None and end is not None: params.append(end) elif start is None and end is not None: - raise DataError("start argument is not set, " - "when end is specified") - return self.execute_command('BITPOS', *params) + raise DataError("start argument is not set, " "when end is specified") + return self.execute_command("BITPOS", *params) def copy(self, source, destination, destination_db=None, replace=False): """ @@ -978,7 +1004,7 @@ class BasicKeyCommands: params.extend(["DB", destination_db]) if replace: params.append("REPLACE") - return self.execute_command('COPY', *params) + return self.execute_command("COPY", *params) def decr(self, name, amount=1): """ @@ -998,13 +1024,13 @@ class BasicKeyCommands: For more information check https://redis.io/commands/decrby """ - return self.execute_command('DECRBY', name, amount) + return self.execute_command("DECRBY", name, amount) def delete(self, *names): """ Delete one or more keys specified by ``names`` """ - return self.execute_command('DEL', *names) + return self.execute_command("DEL", *names) def __delitem__(self, name): self.delete(name) @@ -1017,9 +1043,10 @@ class BasicKeyCommands: For more information check https://redis.io/commands/dump """ from redis.client import NEVER_DECODE + options = {} options[NEVER_DECODE] = [] - return self.execute_command('DUMP', name, **options) + return self.execute_command("DUMP", name, **options) def exists(self, *names): """ @@ -1027,7 +1054,8 @@ class BasicKeyCommands: For more information check https://redis.io/commands/exists """ - return self.execute_command('EXISTS', *names) + return self.execute_command("EXISTS", *names) + __contains__ = exists def expire(self, name, time): @@ -1039,7 +1067,7 @@ class BasicKeyCommands: """ if isinstance(time, datetime.timedelta): time = int(time.total_seconds()) - return self.execute_command('EXPIRE', name, time) + return self.execute_command("EXPIRE", name, time) def expireat(self, name, when): """ @@ -1050,7 +1078,7 @@ class BasicKeyCommands: """ if isinstance(when, datetime.datetime): when = int(time.mktime(when.timetuple())) - return self.execute_command('EXPIREAT', name, when) + return 
self.execute_command("EXPIREAT", name, when) def get(self, name): """ @@ -1058,7 +1086,7 @@ class BasicKeyCommands: For more information check https://redis.io/commands/get """ - return self.execute_command('GET', name) + return self.execute_command("GET", name) def getdel(self, name): """ @@ -1069,10 +1097,9 @@ class BasicKeyCommands: For more information check https://redis.io/commands/getdel """ - return self.execute_command('GETDEL', name) + return self.execute_command("GETDEL", name) - def getex(self, name, - ex=None, px=None, exat=None, pxat=None, persist=False): + def getex(self, name, ex=None, px=None, exat=None, pxat=None, persist=False): """ Get the value of key and optionally set its expiration. GETEX is similar to GET, but is a write command with @@ -1096,38 +1123,40 @@ class BasicKeyCommands: opset = {ex, px, exat, pxat} if len(opset) > 2 or len(opset) > 1 and persist: - raise DataError("``ex``, ``px``, ``exat``, ``pxat``, " - "and ``persist`` are mutually exclusive.") + raise DataError( + "``ex``, ``px``, ``exat``, ``pxat``, " + "and ``persist`` are mutually exclusive." + ) pieces = [] # similar to set command if ex is not None: - pieces.append('EX') + pieces.append("EX") if isinstance(ex, datetime.timedelta): ex = int(ex.total_seconds()) pieces.append(ex) if px is not None: - pieces.append('PX') + pieces.append("PX") if isinstance(px, datetime.timedelta): px = int(px.total_seconds() * 1000) pieces.append(px) # similar to pexpireat command if exat is not None: - pieces.append('EXAT') + pieces.append("EXAT") if isinstance(exat, datetime.datetime): s = int(exat.microsecond / 1000000) exat = int(time.mktime(exat.timetuple())) + s pieces.append(exat) if pxat is not None: - pieces.append('PXAT') + pieces.append("PXAT") if isinstance(pxat, datetime.datetime): ms = int(pxat.microsecond / 1000) pxat = int(time.mktime(pxat.timetuple())) * 1000 + ms pieces.append(pxat) if persist: - pieces.append('PERSIST') + pieces.append("PERSIST") - return self.execute_command('GETEX', name, *pieces) + return self.execute_command("GETEX", name, *pieces) def __getitem__(self, name): """ @@ -1145,7 +1174,7 @@ class BasicKeyCommands: For more information check https://redis.io/commands/getbit """ - return self.execute_command('GETBIT', name, offset) + return self.execute_command("GETBIT", name, offset) def getrange(self, key, start, end): """ @@ -1154,7 +1183,7 @@ class BasicKeyCommands: For more information check https://redis.io/commands/getrange """ - return self.execute_command('GETRANGE', key, start, end) + return self.execute_command("GETRANGE", key, start, end) def getset(self, name, value): """ @@ -1166,7 +1195,7 @@ class BasicKeyCommands: For more information check https://redis.io/commands/getset """ - return self.execute_command('GETSET', name, value) + return self.execute_command("GETSET", name, value) def incr(self, name, amount=1): """ @@ -1186,7 +1215,7 @@ class BasicKeyCommands: """ # An alias for ``incr()``, because it is already implemented # as INCRBY redis command. 
- return self.execute_command('INCRBY', name, amount) + return self.execute_command("INCRBY", name, amount) def incrbyfloat(self, name, amount=1.0): """ @@ -1195,15 +1224,15 @@ class BasicKeyCommands: For more information check https://redis.io/commands/incrbyfloat """ - return self.execute_command('INCRBYFLOAT', name, amount) + return self.execute_command("INCRBYFLOAT", name, amount) - def keys(self, pattern='*'): + def keys(self, pattern="*", **kwargs): """ Returns a list of keys matching ``pattern`` For more information check https://redis.io/commands/keys """ - return self.execute_command('KEYS', pattern) + return self.execute_command("KEYS", pattern, **kwargs) def lmove(self, first_list, second_list, src="LEFT", dest="RIGHT"): """ @@ -1216,8 +1245,7 @@ class BasicKeyCommands: params = [first_list, second_list, src, dest] return self.execute_command("LMOVE", *params) - def blmove(self, first_list, second_list, timeout, - src="LEFT", dest="RIGHT"): + def blmove(self, first_list, second_list, timeout, src="LEFT", dest="RIGHT"): """ Blocking version of lmove. @@ -1233,11 +1261,12 @@ class BasicKeyCommands: For more information check https://redis.io/commands/mget """ from redis.client import EMPTY_RESPONSE + args = list_or_args(keys, args) options = {} if not args: options[EMPTY_RESPONSE] = [] - return self.execute_command('MGET', *args, **options) + return self.execute_command("MGET", *args, **options) def mset(self, mapping): """ @@ -1250,7 +1279,7 @@ class BasicKeyCommands: items = [] for pair in mapping.items(): items.extend(pair) - return self.execute_command('MSET', *items) + return self.execute_command("MSET", *items) def msetnx(self, mapping): """ @@ -1264,7 +1293,7 @@ class BasicKeyCommands: items = [] for pair in mapping.items(): items.extend(pair) - return self.execute_command('MSETNX', *items) + return self.execute_command("MSETNX", *items) def move(self, name, db): """ @@ -1272,7 +1301,7 @@ class BasicKeyCommands: For more information check https://redis.io/commands/move """ - return self.execute_command('MOVE', name, db) + return self.execute_command("MOVE", name, db) def persist(self, name): """ @@ -1280,7 +1309,7 @@ class BasicKeyCommands: For more information check https://redis.io/commands/persist """ - return self.execute_command('PERSIST', name) + return self.execute_command("PERSIST", name) def pexpire(self, name, time): """ @@ -1292,7 +1321,7 @@ class BasicKeyCommands: """ if isinstance(time, datetime.timedelta): time = int(time.total_seconds() * 1000) - return self.execute_command('PEXPIRE', name, time) + return self.execute_command("PEXPIRE", name, time) def pexpireat(self, name, when): """ @@ -1305,7 +1334,7 @@ class BasicKeyCommands: if isinstance(when, datetime.datetime): ms = int(when.microsecond / 1000) when = int(time.mktime(when.timetuple())) * 1000 + ms - return self.execute_command('PEXPIREAT', name, when) + return self.execute_command("PEXPIREAT", name, when) def psetex(self, name, time_ms, value): """ @@ -1317,7 +1346,7 @@ class BasicKeyCommands: """ if isinstance(time_ms, datetime.timedelta): time_ms = int(time_ms.total_seconds() * 1000) - return self.execute_command('PSETEX', name, time_ms, value) + return self.execute_command("PSETEX", name, time_ms, value) def pttl(self, name): """ @@ -1325,7 +1354,7 @@ class BasicKeyCommands: For more information check https://redis.io/commands/pttl """ - return self.execute_command('PTTL', name) + return self.execute_command("PTTL", name) def hrandfield(self, key, count=None, withvalues=False): """ @@ -1349,13 
+1378,13 @@ class BasicKeyCommands: return self.execute_command("HRANDFIELD", key, *params) - def randomkey(self): + def randomkey(self, **kwargs): """ Returns the name of a random key For more information check https://redis.io/commands/randomkey """ - return self.execute_command('RANDOMKEY') + return self.execute_command("RANDOMKEY", **kwargs) def rename(self, src, dst): """ @@ -1363,7 +1392,7 @@ class BasicKeyCommands: For more information check https://redis.io/commands/rename """ - return self.execute_command('RENAME', src, dst) + return self.execute_command("RENAME", src, dst) def renamenx(self, src, dst): """ @@ -1371,10 +1400,18 @@ class BasicKeyCommands: For more information check https://redis.io/commands/renamenx """ - return self.execute_command('RENAMENX', src, dst) + return self.execute_command("RENAMENX", src, dst) - def restore(self, name, ttl, value, replace=False, absttl=False, - idletime=None, frequency=None): + def restore( + self, + name, + ttl, + value, + replace=False, + absttl=False, + idletime=None, + frequency=None, + ): """ Create a key using the provided serialized value, previously obtained using DUMP. @@ -1396,28 +1433,38 @@ class BasicKeyCommands: """ params = [name, ttl, value] if replace: - params.append('REPLACE') + params.append("REPLACE") if absttl: - params.append('ABSTTL') + params.append("ABSTTL") if idletime is not None: - params.append('IDLETIME') + params.append("IDLETIME") try: params.append(int(idletime)) except ValueError: raise DataError("idletimemust be an integer") if frequency is not None: - params.append('FREQ') + params.append("FREQ") try: params.append(int(frequency)) except ValueError: raise DataError("frequency must be an integer") - return self.execute_command('RESTORE', *params) - - def set(self, name, value, - ex=None, px=None, nx=False, xx=False, keepttl=False, get=False, - exat=None, pxat=None): + return self.execute_command("RESTORE", *params) + + def set( + self, + name, + value, + ex=None, + px=None, + nx=False, + xx=False, + keepttl=False, + get=False, + exat=None, + pxat=None, + ): """ Set the value at key ``name`` to ``value`` @@ -1449,7 +1496,7 @@ class BasicKeyCommands: pieces = [name, value] options = {} if ex is not None: - pieces.append('EX') + pieces.append("EX") if isinstance(ex, datetime.timedelta): pieces.append(int(ex.total_seconds())) elif isinstance(ex, int): @@ -1457,7 +1504,7 @@ class BasicKeyCommands: else: raise DataError("ex must be datetime.timedelta or int") if px is not None: - pieces.append('PX') + pieces.append("PX") if isinstance(px, datetime.timedelta): pieces.append(int(px.total_seconds() * 1000)) elif isinstance(px, int): @@ -1465,30 +1512,30 @@ class BasicKeyCommands: else: raise DataError("px must be datetime.timedelta or int") if exat is not None: - pieces.append('EXAT') + pieces.append("EXAT") if isinstance(exat, datetime.datetime): s = int(exat.microsecond / 1000000) exat = int(time.mktime(exat.timetuple())) + s pieces.append(exat) if pxat is not None: - pieces.append('PXAT') + pieces.append("PXAT") if isinstance(pxat, datetime.datetime): ms = int(pxat.microsecond / 1000) pxat = int(time.mktime(pxat.timetuple())) * 1000 + ms pieces.append(pxat) if keepttl: - pieces.append('KEEPTTL') + pieces.append("KEEPTTL") if nx: - pieces.append('NX') + pieces.append("NX") if xx: - pieces.append('XX') + pieces.append("XX") if get: - pieces.append('GET') + pieces.append("GET") options["get"] = True - return self.execute_command('SET', *pieces, **options) + return self.execute_command("SET", *pieces, **options) 
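The set() hunk just closed shows how the keyword options map onto the SET flags (EX/PX/EXAT/PXAT/KEEPTTL/NX/XX/GET), with ex restricted to an int or timedelta. A sketch, client and key names assumed:

import redis

r = redis.Redis()                                  # assumed client
r.set("lock:job42", "worker-1", nx=True, ex=30)    # SET ... NX EX 30
old = r.set("lock:job42", "worker-2", xx=True, get=True, keepttl=True)
print(old)                                         # previous value, returned via GET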
def __setitem__(self, name, value): self.set(name, value) @@ -1501,7 +1548,7 @@ class BasicKeyCommands: For more information check https://redis.io/commands/setbit """ value = value and 1 or 0 - return self.execute_command('SETBIT', name, offset, value) + return self.execute_command("SETBIT", name, offset, value) def setex(self, name, time, value): """ @@ -1513,7 +1560,7 @@ class BasicKeyCommands: """ if isinstance(time, datetime.timedelta): time = int(time.total_seconds()) - return self.execute_command('SETEX', name, time, value) + return self.execute_command("SETEX", name, time, value) def setnx(self, name, value): """ @@ -1521,7 +1568,7 @@ class BasicKeyCommands: For more information check https://redis.io/commands/setnx """ - return self.execute_command('SETNX', name, value) + return self.execute_command("SETNX", name, value) def setrange(self, name, offset, value): """ @@ -1536,10 +1583,20 @@ class BasicKeyCommands: For more information check https://redis.io/commands/setrange """ - return self.execute_command('SETRANGE', name, offset, value) - - def stralgo(self, algo, value1, value2, specific_argument='strings', - len=False, idx=False, minmatchlen=None, withmatchlen=False): + return self.execute_command("SETRANGE", name, offset, value) + + def stralgo( + self, + algo, + value1, + value2, + specific_argument="strings", + len=False, + idx=False, + minmatchlen=None, + withmatchlen=False, + **kwargs, + ): """ Implements complex algorithms that operate on strings. Right now the only algorithm implemented is the LCS algorithm @@ -1560,31 +1617,37 @@ class BasicKeyCommands: For more information check https://redis.io/commands/stralgo """ # check validity - supported_algo = ['LCS'] + supported_algo = ["LCS"] if algo not in supported_algo: - supported_algos_str = ', '.join(supported_algo) + supported_algos_str = ", ".join(supported_algo) raise DataError(f"The supported algorithms are: {supported_algos_str}") - if specific_argument not in ['keys', 'strings']: + if specific_argument not in ["keys", "strings"]: raise DataError("specific_argument can be only keys or strings") if len and idx: raise DataError("len and idx cannot be provided together.") pieces = [algo, specific_argument.upper(), value1, value2] if len: - pieces.append(b'LEN') + pieces.append(b"LEN") if idx: - pieces.append(b'IDX') + pieces.append(b"IDX") try: int(minmatchlen) - pieces.extend([b'MINMATCHLEN', minmatchlen]) + pieces.extend([b"MINMATCHLEN", minmatchlen]) except TypeError: pass if withmatchlen: - pieces.append(b'WITHMATCHLEN') - - return self.execute_command('STRALGO', *pieces, len=len, idx=idx, - minmatchlen=minmatchlen, - withmatchlen=withmatchlen) + pieces.append(b"WITHMATCHLEN") + + return self.execute_command( + "STRALGO", + *pieces, + len=len, + idx=idx, + minmatchlen=minmatchlen, + withmatchlen=withmatchlen, + **kwargs, + ) def strlen(self, name): """ @@ -1592,14 +1655,14 @@ class BasicKeyCommands: For more information check https://redis.io/commands/strlen """ - return self.execute_command('STRLEN', name) + return self.execute_command("STRLEN", name) def substr(self, name, start, end=-1): """ Return a substring of the string at key ``name``. ``start`` and ``end`` are 0-based integers specifying the portion of the string to return. 
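stralgo() above only supports the LCS algorithm and builds the IDX/MINMATCHLEN/WITHMATCHLEN pieces shown in the hunk. A sketch assuming a Redis 6.x server (where STRALGO is available) and an assumed client:

import redis

r = redis.Redis()                         # assumed client
result = r.stralgo(
    "LCS", "ohmytext", "mynewtext",       # illustrative strings
    specific_argument="strings",
    idx=True, minmatchlen=4, withmatchlen=True,
)
print(result)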
""" - return self.execute_command('SUBSTR', name, start, end) + return self.execute_command("SUBSTR", name, start, end) def touch(self, *args): """ @@ -1608,7 +1671,7 @@ class BasicKeyCommands: For more information check https://redis.io/commands/touch """ - return self.execute_command('TOUCH', *args) + return self.execute_command("TOUCH", *args) def ttl(self, name): """ @@ -1616,7 +1679,7 @@ class BasicKeyCommands: For more information check https://redis.io/commands/ttl """ - return self.execute_command('TTL', name) + return self.execute_command("TTL", name) def type(self, name): """ @@ -1624,7 +1687,7 @@ class BasicKeyCommands: For more information check https://redis.io/commands/type """ - return self.execute_command('TYPE', name) + return self.execute_command("TYPE", name) def watch(self, *names): """ @@ -1632,7 +1695,7 @@ class BasicKeyCommands: For more information check https://redis.io/commands/type """ - warnings.warn(DeprecationWarning('Call WATCH from a Pipeline object')) + warnings.warn(DeprecationWarning("Call WATCH from a Pipeline object")) def unwatch(self): """ @@ -1640,8 +1703,7 @@ class BasicKeyCommands: For more information check https://redis.io/commands/unwatch """ - warnings.warn( - DeprecationWarning('Call UNWATCH from a Pipeline object')) + warnings.warn(DeprecationWarning("Call UNWATCH from a Pipeline object")) def unlink(self, *names): """ @@ -1649,7 +1711,7 @@ class BasicKeyCommands: For more information check https://redis.io/commands/unlink """ - return self.execute_command('UNLINK', *names) + return self.execute_command("UNLINK", *names) class ListCommands: @@ -1657,6 +1719,7 @@ class ListCommands: Redis commands for List data type. see: https://redis.io/topics/data-types#lists """ + def blpop(self, keys, timeout=0): """ LPOP a value off of the first non-empty list @@ -1674,7 +1737,7 @@ class ListCommands: timeout = 0 keys = list_or_args(keys, None) keys.append(timeout) - return self.execute_command('BLPOP', *keys) + return self.execute_command("BLPOP", *keys) def brpop(self, keys, timeout=0): """ @@ -1693,7 +1756,7 @@ class ListCommands: timeout = 0 keys = list_or_args(keys, None) keys.append(timeout) - return self.execute_command('BRPOP', *keys) + return self.execute_command("BRPOP", *keys) def brpoplpush(self, src, dst, timeout=0): """ @@ -1708,7 +1771,7 @@ class ListCommands: """ if timeout is None: timeout = 0 - return self.execute_command('BRPOPLPUSH', src, dst, timeout) + return self.execute_command("BRPOPLPUSH", src, dst, timeout) def lindex(self, name, index): """ @@ -1719,7 +1782,7 @@ class ListCommands: For more information check https://redis.io/commands/lindex """ - return self.execute_command('LINDEX', name, index) + return self.execute_command("LINDEX", name, index) def linsert(self, name, where, refvalue, value): """ @@ -1731,7 +1794,7 @@ class ListCommands: For more information check https://redis.io/commands/linsert """ - return self.execute_command('LINSERT', name, where, refvalue, value) + return self.execute_command("LINSERT", name, where, refvalue, value) def llen(self, name): """ @@ -1739,7 +1802,7 @@ class ListCommands: For more information check https://redis.io/commands/llen """ - return self.execute_command('LLEN', name) + return self.execute_command("LLEN", name) def lpop(self, name, count=None): """ @@ -1752,9 +1815,9 @@ class ListCommands: For more information check https://redis.io/commands/lpop """ if count is not None: - return self.execute_command('LPOP', name, count) + return self.execute_command("LPOP", name, count) else: - 
return self.execute_command('LPOP', name) + return self.execute_command("LPOP", name) def lpush(self, name, *values): """ @@ -1762,7 +1825,7 @@ class ListCommands: For more information check https://redis.io/commands/lpush """ - return self.execute_command('LPUSH', name, *values) + return self.execute_command("LPUSH", name, *values) def lpushx(self, name, *values): """ @@ -1770,7 +1833,7 @@ class ListCommands: For more information check https://redis.io/commands/lpushx """ - return self.execute_command('LPUSHX', name, *values) + return self.execute_command("LPUSHX", name, *values) def lrange(self, name, start, end): """ @@ -1782,7 +1845,7 @@ class ListCommands: For more information check https://redis.io/commands/lrange """ - return self.execute_command('LRANGE', name, start, end) + return self.execute_command("LRANGE", name, start, end) def lrem(self, name, count, value): """ @@ -1796,7 +1859,7 @@ class ListCommands: For more information check https://redis.io/commands/lrem """ - return self.execute_command('LREM', name, count, value) + return self.execute_command("LREM", name, count, value) def lset(self, name, index, value): """ @@ -1804,7 +1867,7 @@ class ListCommands: For more information check https://redis.io/commands/lset """ - return self.execute_command('LSET', name, index, value) + return self.execute_command("LSET", name, index, value) def ltrim(self, name, start, end): """ @@ -1816,7 +1879,7 @@ class ListCommands: For more information check https://redis.io/commands/ltrim """ - return self.execute_command('LTRIM', name, start, end) + return self.execute_command("LTRIM", name, start, end) def rpop(self, name, count=None): """ @@ -1829,9 +1892,9 @@ class ListCommands: For more information check https://redis.io/commands/rpop """ if count is not None: - return self.execute_command('RPOP', name, count) + return self.execute_command("RPOP", name, count) else: - return self.execute_command('RPOP', name) + return self.execute_command("RPOP", name) def rpoplpush(self, src, dst): """ @@ -1840,7 +1903,7 @@ class ListCommands: For more information check https://redis.io/commands/rpoplpush """ - return self.execute_command('RPOPLPUSH', src, dst) + return self.execute_command("RPOPLPUSH", src, dst) def rpush(self, name, *values): """ @@ -1848,7 +1911,7 @@ class ListCommands: For more information check https://redis.io/commands/rpush """ - return self.execute_command('RPUSH', name, *values) + return self.execute_command("RPUSH", name, *values) def rpushx(self, name, value): """ @@ -1856,7 +1919,7 @@ class ListCommands: For more information check https://redis.io/commands/rpushx """ - return self.execute_command('RPUSHX', name, value) + return self.execute_command("RPUSHX", name, value) def lpos(self, name, value, rank=None, count=None, maxlen=None): """ @@ -1886,18 +1949,28 @@ class ListCommands: """ pieces = [name, value] if rank is not None: - pieces.extend(['RANK', rank]) + pieces.extend(["RANK", rank]) if count is not None: - pieces.extend(['COUNT', count]) + pieces.extend(["COUNT", count]) if maxlen is not None: - pieces.extend(['MAXLEN', maxlen]) - - return self.execute_command('LPOS', *pieces) - - def sort(self, name, start=None, num=None, by=None, get=None, - desc=False, alpha=False, store=None, groups=False): + pieces.extend(["MAXLEN", maxlen]) + + return self.execute_command("LPOS", *pieces) + + def sort( + self, + name, + start=None, + num=None, + by=None, + get=None, + desc=False, + alpha=False, + store=None, + groups=False, + ): """ Sort and return the list, set or sorted set 
at ``name``. @@ -1923,39 +1996,40 @@ class ListCommands: For more information check https://redis.io/commands/sort """ - if (start is not None and num is None) or \ - (num is not None and start is None): + if (start is not None and num is None) or (num is not None and start is None): raise DataError("``start`` and ``num`` must both be specified") pieces = [name] if by is not None: - pieces.extend([b'BY', by]) + pieces.extend([b"BY", by]) if start is not None and num is not None: - pieces.extend([b'LIMIT', start, num]) + pieces.extend([b"LIMIT", start, num]) if get is not None: # If get is a string assume we want to get a single value. # Otherwise assume it's an interable and we want to get multiple # values. We can't just iterate blindly because strings are # iterable. if isinstance(get, (bytes, str)): - pieces.extend([b'GET', get]) + pieces.extend([b"GET", get]) else: for g in get: - pieces.extend([b'GET', g]) + pieces.extend([b"GET", g]) if desc: - pieces.append(b'DESC') + pieces.append(b"DESC") if alpha: - pieces.append(b'ALPHA') + pieces.append(b"ALPHA") if store is not None: - pieces.extend([b'STORE', store]) + pieces.extend([b"STORE", store]) if groups: if not get or isinstance(get, (bytes, str)) or len(get) < 2: - raise DataError('when using "groups" the "get" argument ' - 'must be specified and contain at least ' - 'two keys') + raise DataError( + 'when using "groups" the "get" argument ' + "must be specified and contain at least " + "two keys" + ) - options = {'groups': len(get) if groups else None} - return self.execute_command('SORT', *pieces, **options) + options = {"groups": len(get) if groups else None} + return self.execute_command("SORT", *pieces, **options) class ScanCommands: @@ -1963,7 +2037,8 @@ class ScanCommands: Redis SCAN commands. see: https://redis.io/commands/scan """ - def scan(self, cursor=0, match=None, count=None, _type=None): + + def scan(self, cursor=0, match=None, count=None, _type=None, **kwargs): """ Incrementally return lists of key names. Also return a cursor indicating the scan position. @@ -1982,14 +2057,14 @@ class ScanCommands: """ pieces = [cursor] if match is not None: - pieces.extend([b'MATCH', match]) + pieces.extend([b"MATCH", match]) if count is not None: - pieces.extend([b'COUNT', count]) + pieces.extend([b"COUNT", count]) if _type is not None: - pieces.extend([b'TYPE', _type]) - return self.execute_command('SCAN', *pieces) + pieces.extend([b"TYPE", _type]) + return self.execute_command("SCAN", *pieces, **kwargs) - def scan_iter(self, match=None, count=None, _type=None): + def scan_iter(self, match=None, count=None, _type=None, **kwargs): """ Make an iterator using the SCAN command so that the client doesn't need to remember the cursor position. @@ -2004,10 +2079,11 @@ class ScanCommands: HASH, LIST, SET, STREAM, STRING, ZSET Additionally, Redis modules can expose other types as well. 
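The sort() hunk above maps its keywords onto BY/LIMIT/GET/DESC/ALPHA/STORE and, with groups=True, regroups a multi-key GET into tuples. A sketch with assumed client and illustrative keys:

import redis

r = redis.Redis()                                  # assumed client
r.rpush("ids", 3, 1, 2)
r.mset({"weight_1": 10, "weight_2": 5, "weight_3": 1,
        "name_1": "a", "name_2": "b", "name_3": "c"})
rows = r.sort("ids", by="weight_*", get=["#", "name_*"], groups=True)
print(rows)                                        # [(id, name), ...] pairs
r.sort("ids", desc=True, store="ids:sorted")       # STORE keeps the result server-side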
""" - cursor = '0' + cursor = "0" while cursor != 0: - cursor, data = self.scan(cursor=cursor, match=match, - count=count, _type=_type) + cursor, data = self.scan( + cursor=cursor, match=match, count=count, _type=_type, **kwargs + ) yield from data def sscan(self, name, cursor=0, match=None, count=None): @@ -2023,10 +2099,10 @@ class ScanCommands: """ pieces = [name, cursor] if match is not None: - pieces.extend([b'MATCH', match]) + pieces.extend([b"MATCH", match]) if count is not None: - pieces.extend([b'COUNT', count]) - return self.execute_command('SSCAN', *pieces) + pieces.extend([b"COUNT", count]) + return self.execute_command("SSCAN", *pieces) def sscan_iter(self, name, match=None, count=None): """ @@ -2037,10 +2113,9 @@ class ScanCommands: ``count`` allows for hint the minimum number of returns """ - cursor = '0' + cursor = "0" while cursor != 0: - cursor, data = self.sscan(name, cursor=cursor, - match=match, count=count) + cursor, data = self.sscan(name, cursor=cursor, match=match, count=count) yield from data def hscan(self, name, cursor=0, match=None, count=None): @@ -2056,10 +2131,10 @@ class ScanCommands: """ pieces = [name, cursor] if match is not None: - pieces.extend([b'MATCH', match]) + pieces.extend([b"MATCH", match]) if count is not None: - pieces.extend([b'COUNT', count]) - return self.execute_command('HSCAN', *pieces) + pieces.extend([b"COUNT", count]) + return self.execute_command("HSCAN", *pieces) def hscan_iter(self, name, match=None, count=None): """ @@ -2070,14 +2145,12 @@ class ScanCommands: ``count`` allows for hint the minimum number of returns """ - cursor = '0' + cursor = "0" while cursor != 0: - cursor, data = self.hscan(name, cursor=cursor, - match=match, count=count) + cursor, data = self.hscan(name, cursor=cursor, match=match, count=count) yield from data.items() - def zscan(self, name, cursor=0, match=None, count=None, - score_cast_func=float): + def zscan(self, name, cursor=0, match=None, count=None, score_cast_func=float): """ Incrementally return lists of elements in a sorted set. Also return a cursor indicating the scan position. @@ -2092,14 +2165,13 @@ class ScanCommands: """ pieces = [name, cursor] if match is not None: - pieces.extend([b'MATCH', match]) + pieces.extend([b"MATCH", match]) if count is not None: - pieces.extend([b'COUNT', count]) - options = {'score_cast_func': score_cast_func} - return self.execute_command('ZSCAN', *pieces, **options) + pieces.extend([b"COUNT", count]) + options = {"score_cast_func": score_cast_func} + return self.execute_command("ZSCAN", *pieces, **options) - def zscan_iter(self, name, match=None, count=None, - score_cast_func=float): + def zscan_iter(self, name, match=None, count=None, score_cast_func=float): """ Make an iterator using the ZSCAN command so that the client doesn't need to remember the cursor position. @@ -2110,11 +2182,15 @@ class ScanCommands: ``score_cast_func`` a callable used to cast the score return value """ - cursor = '0' + cursor = "0" while cursor != 0: - cursor, data = self.zscan(name, cursor=cursor, match=match, - count=count, - score_cast_func=score_cast_func) + cursor, data = self.zscan( + name, + cursor=cursor, + match=match, + count=count, + score_cast_func=score_cast_func, + ) yield from data @@ -2123,13 +2199,14 @@ class SetCommands: Redis commands for Set data type. 
see: https://redis.io/topics/data-types#sets """ + def sadd(self, name, *values): """ Add ``value(s)`` to set ``name`` For more information check https://redis.io/commands/sadd """ - return self.execute_command('SADD', name, *values) + return self.execute_command("SADD", name, *values) def scard(self, name): """ @@ -2137,7 +2214,7 @@ class SetCommands: For more information check https://redis.io/commands/scard """ - return self.execute_command('SCARD', name) + return self.execute_command("SCARD", name) def sdiff(self, keys, *args): """ @@ -2146,7 +2223,7 @@ class SetCommands: For more information check https://redis.io/commands/sdiff """ args = list_or_args(keys, args) - return self.execute_command('SDIFF', *args) + return self.execute_command("SDIFF", *args) def sdiffstore(self, dest, keys, *args): """ @@ -2156,7 +2233,7 @@ class SetCommands: For more information check https://redis.io/commands/sdiffstore """ args = list_or_args(keys, args) - return self.execute_command('SDIFFSTORE', dest, *args) + return self.execute_command("SDIFFSTORE", dest, *args) def sinter(self, keys, *args): """ @@ -2165,7 +2242,7 @@ class SetCommands: For more information check https://redis.io/commands/sinter """ args = list_or_args(keys, args) - return self.execute_command('SINTER', *args) + return self.execute_command("SINTER", *args) def sinterstore(self, dest, keys, *args): """ @@ -2175,7 +2252,7 @@ class SetCommands: For more information check https://redis.io/commands/sinterstore """ args = list_or_args(keys, args) - return self.execute_command('SINTERSTORE', dest, *args) + return self.execute_command("SINTERSTORE", dest, *args) def sismember(self, name, value): """ @@ -2183,7 +2260,7 @@ class SetCommands: For more information check https://redis.io/commands/sismember """ - return self.execute_command('SISMEMBER', name, value) + return self.execute_command("SISMEMBER", name, value) def smembers(self, name): """ @@ -2191,7 +2268,7 @@ class SetCommands: For more information check https://redis.io/commands/smembers """ - return self.execute_command('SMEMBERS', name) + return self.execute_command("SMEMBERS", name) def smismember(self, name, values, *args): """ @@ -2201,7 +2278,7 @@ class SetCommands: For more information check https://redis.io/commands/smismember """ args = list_or_args(values, args) - return self.execute_command('SMISMEMBER', name, *args) + return self.execute_command("SMISMEMBER", name, *args) def smove(self, src, dst, value): """ @@ -2209,7 +2286,7 @@ class SetCommands: For more information check https://redis.io/commands/smove """ - return self.execute_command('SMOVE', src, dst, value) + return self.execute_command("SMOVE", src, dst, value) def spop(self, name, count=None): """ @@ -2218,7 +2295,7 @@ class SetCommands: For more information check https://redis.io/commands/spop """ args = (count is not None) and [count] or [] - return self.execute_command('SPOP', name, *args) + return self.execute_command("SPOP", name, *args) def srandmember(self, name, number=None): """ @@ -2231,7 +2308,7 @@ class SetCommands: For more information check https://redis.io/commands/srandmember """ args = (number is not None) and [number] or [] - return self.execute_command('SRANDMEMBER', name, *args) + return self.execute_command("SRANDMEMBER", name, *args) def srem(self, name, *values): """ @@ -2239,7 +2316,7 @@ class SetCommands: For more information check https://redis.io/commands/srem """ - return self.execute_command('SREM', name, *values) + return self.execute_command("SREM", name, *values) def 
sunion(self, keys, *args): """ @@ -2248,7 +2325,7 @@ class SetCommands: For more information check https://redis.io/commands/sunion """ args = list_or_args(keys, args) - return self.execute_command('SUNION', *args) + return self.execute_command("SUNION", *args) def sunionstore(self, dest, keys, *args): """ @@ -2258,7 +2335,7 @@ class SetCommands: For more information check https://redis.io/commands/sunionstore """ args = list_or_args(keys, args) - return self.execute_command('SUNIONSTORE', dest, *args) + return self.execute_command("SUNIONSTORE", dest, *args) class StreamCommands: @@ -2266,6 +2343,7 @@ class StreamCommands: Redis commands for Stream data type. see: https://redis.io/topics/streams-intro """ + def xack(self, name, groupname, *ids): """ Acknowledges the successful processing of one or more messages. @@ -2275,10 +2353,19 @@ class StreamCommands: For more information check https://redis.io/commands/xack """ - return self.execute_command('XACK', name, groupname, *ids) + return self.execute_command("XACK", name, groupname, *ids) - def xadd(self, name, fields, id='*', maxlen=None, approximate=True, - nomkstream=False, minid=None, limit=None): + def xadd( + self, + name, + fields, + id="*", + maxlen=None, + approximate=True, + nomkstream=False, + minid=None, + limit=None, + ): """ Add to a stream. name: name of the stream @@ -2296,34 +2383,43 @@ class StreamCommands: """ pieces = [] if maxlen is not None and minid is not None: - raise DataError("Only one of ```maxlen``` or ```minid``` " - "may be specified") + raise DataError( + "Only one of ```maxlen``` or ```minid``` " "may be specified" + ) if maxlen is not None: if not isinstance(maxlen, int) or maxlen < 1: - raise DataError('XADD maxlen must be a positive integer') - pieces.append(b'MAXLEN') + raise DataError("XADD maxlen must be a positive integer") + pieces.append(b"MAXLEN") if approximate: - pieces.append(b'~') + pieces.append(b"~") pieces.append(str(maxlen)) if minid is not None: - pieces.append(b'MINID') + pieces.append(b"MINID") if approximate: - pieces.append(b'~') + pieces.append(b"~") pieces.append(minid) if limit is not None: - pieces.extend([b'LIMIT', limit]) + pieces.extend([b"LIMIT", limit]) if nomkstream: - pieces.append(b'NOMKSTREAM') + pieces.append(b"NOMKSTREAM") pieces.append(id) if not isinstance(fields, dict) or len(fields) == 0: - raise DataError('XADD fields must be a non-empty dict') + raise DataError("XADD fields must be a non-empty dict") for pair in fields.items(): pieces.extend(pair) - return self.execute_command('XADD', name, *pieces) - - def xautoclaim(self, name, groupname, consumername, min_idle_time, - start_id=0, count=None, justid=False): + return self.execute_command("XADD", name, *pieces) + + def xautoclaim( + self, + name, + groupname, + consumername, + min_idle_time, + start_id=0, + count=None, + justid=False, + ): """ Transfers ownership of pending stream entries that match the specified criteria. 
Conceptually, equivalent to calling XPENDING and then XCLAIM, @@ -2344,8 +2440,9 @@ class StreamCommands: """ try: if int(min_idle_time) < 0: - raise DataError("XAUTOCLAIM min_idle_time must be a non" - "negative integer") + raise DataError( + "XAUTOCLAIM min_idle_time must be a non" "negative integer" + ) except TypeError: pass @@ -2355,18 +2452,28 @@ class StreamCommands: try: if int(count) < 0: raise DataError("XPENDING count must be a integer >= 0") - pieces.extend([b'COUNT', count]) + pieces.extend([b"COUNT", count]) except TypeError: pass if justid: - pieces.append(b'JUSTID') - kwargs['parse_justid'] = True - - return self.execute_command('XAUTOCLAIM', *pieces, **kwargs) - - def xclaim(self, name, groupname, consumername, min_idle_time, message_ids, - idle=None, time=None, retrycount=None, force=False, - justid=False): + pieces.append(b"JUSTID") + kwargs["parse_justid"] = True + + return self.execute_command("XAUTOCLAIM", *pieces, **kwargs) + + def xclaim( + self, + name, + groupname, + consumername, + min_idle_time, + message_ids, + idle=None, + time=None, + retrycount=None, + force=False, + justid=False, + ): """ Changes the ownership of a pending message. name: name of the stream. @@ -2392,11 +2499,12 @@ class StreamCommands: For more information check https://redis.io/commands/xclaim """ if not isinstance(min_idle_time, int) or min_idle_time < 0: - raise DataError("XCLAIM min_idle_time must be a non negative " - "integer") + raise DataError("XCLAIM min_idle_time must be a non negative " "integer") if not isinstance(message_ids, (list, tuple)) or not message_ids: - raise DataError("XCLAIM message_ids must be a non empty list or " - "tuple of message IDs to claim") + raise DataError( + "XCLAIM message_ids must be a non empty list or " + "tuple of message IDs to claim" + ) kwargs = {} pieces = [name, groupname, consumername, str(min_idle_time)] @@ -2405,26 +2513,26 @@ class StreamCommands: if idle is not None: if not isinstance(idle, int): raise DataError("XCLAIM idle must be an integer") - pieces.extend((b'IDLE', str(idle))) + pieces.extend((b"IDLE", str(idle))) if time is not None: if not isinstance(time, int): raise DataError("XCLAIM time must be an integer") - pieces.extend((b'TIME', str(time))) + pieces.extend((b"TIME", str(time))) if retrycount is not None: if not isinstance(retrycount, int): raise DataError("XCLAIM retrycount must be an integer") - pieces.extend((b'RETRYCOUNT', str(retrycount))) + pieces.extend((b"RETRYCOUNT", str(retrycount))) if force: if not isinstance(force, bool): raise DataError("XCLAIM force must be a boolean") - pieces.append(b'FORCE') + pieces.append(b"FORCE") if justid: if not isinstance(justid, bool): raise DataError("XCLAIM justid must be a boolean") - pieces.append(b'JUSTID') - kwargs['parse_justid'] = True - return self.execute_command('XCLAIM', *pieces, **kwargs) + pieces.append(b"JUSTID") + kwargs["parse_justid"] = True + return self.execute_command("XCLAIM", *pieces, **kwargs) def xdel(self, name, *ids): """ @@ -2434,9 +2542,9 @@ class StreamCommands: For more information check https://redis.io/commands/xdel """ - return self.execute_command('XDEL', name, *ids) + return self.execute_command("XDEL", name, *ids) - def xgroup_create(self, name, groupname, id='$', mkstream=False): + def xgroup_create(self, name, groupname, id="$", mkstream=False): """ Create a new consumer group associated with a stream. name: name of the stream. 
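# Editor's note: sketch tying together XADD, XGROUP CREATE and XAUTOCLAIM as
# wrapped above. Assumes a local Redis >= 6.2 (XAUTOCLAIM); the stream, group
# and consumer names are examples only.
import redis

r = redis.Redis(decode_responses=True)

# Append an entry, keeping roughly the last 1000 via approximate MAXLEN (~).
r.xadd("events", {"type": "click", "page": "/home"}, maxlen=1000)

# Create a consumer group starting at new messages; MKSTREAM creates the
# stream if it does not exist yet.
try:
    r.xgroup_create("events", "workers", id="$", mkstream=True)
except redis.ResponseError:
    pass  # group already exists

# Re-assign entries that have been pending for more than 60s to consumer "c1".
print(r.xautoclaim("events", "workers", "c1", min_idle_time=60000))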
@@ -2445,9 +2553,9 @@ class StreamCommands: For more information check https://redis.io/commands/xgroup-create """ - pieces = ['XGROUP CREATE', name, groupname, id] + pieces = ["XGROUP CREATE", name, groupname, id] if mkstream: - pieces.append(b'MKSTREAM') + pieces.append(b"MKSTREAM") return self.execute_command(*pieces) def xgroup_delconsumer(self, name, groupname, consumername): @@ -2461,8 +2569,7 @@ class StreamCommands: For more information check https://redis.io/commands/xgroup-delconsumer """ - return self.execute_command('XGROUP DELCONSUMER', name, groupname, - consumername) + return self.execute_command("XGROUP DELCONSUMER", name, groupname, consumername) def xgroup_destroy(self, name, groupname): """ @@ -2472,7 +2579,7 @@ class StreamCommands: For more information check https://redis.io/commands/xgroup-destroy """ - return self.execute_command('XGROUP DESTROY', name, groupname) + return self.execute_command("XGROUP DESTROY", name, groupname) def xgroup_createconsumer(self, name, groupname, consumername): """ @@ -2485,8 +2592,9 @@ class StreamCommands: See: https://redis.io/commands/xgroup-createconsumer """ - return self.execute_command('XGROUP CREATECONSUMER', name, groupname, - consumername) + return self.execute_command( + "XGROUP CREATECONSUMER", name, groupname, consumername + ) def xgroup_setid(self, name, groupname, id): """ @@ -2497,7 +2605,7 @@ class StreamCommands: For more information check https://redis.io/commands/xgroup-setid """ - return self.execute_command('XGROUP SETID', name, groupname, id) + return self.execute_command("XGROUP SETID", name, groupname, id) def xinfo_consumers(self, name, groupname): """ @@ -2507,7 +2615,7 @@ class StreamCommands: For more information check https://redis.io/commands/xinfo-consumers """ - return self.execute_command('XINFO CONSUMERS', name, groupname) + return self.execute_command("XINFO CONSUMERS", name, groupname) def xinfo_groups(self, name): """ @@ -2516,7 +2624,7 @@ class StreamCommands: For more information check https://redis.io/commands/xinfo-groups """ - return self.execute_command('XINFO GROUPS', name) + return self.execute_command("XINFO GROUPS", name) def xinfo_stream(self, name, full=False): """ @@ -2529,9 +2637,9 @@ class StreamCommands: pieces = [name] options = {} if full: - pieces.append(b'FULL') - options = {'full': full} - return self.execute_command('XINFO STREAM', *pieces, **options) + pieces.append(b"FULL") + options = {"full": full} + return self.execute_command("XINFO STREAM", *pieces, **options) def xlen(self, name): """ @@ -2539,7 +2647,7 @@ class StreamCommands: For more information check https://redis.io/commands/xlen """ - return self.execute_command('XLEN', name) + return self.execute_command("XLEN", name) def xpending(self, name, groupname): """ @@ -2549,11 +2657,18 @@ class StreamCommands: For more information check https://redis.io/commands/xpending """ - return self.execute_command('XPENDING', name, groupname) + return self.execute_command("XPENDING", name, groupname) - def xpending_range(self, name, groupname, idle=None, - min=None, max=None, count=None, - consumername=None): + def xpending_range( + self, + name, + groupname, + idle=None, + min=None, + max=None, + count=None, + consumername=None, + ): """ Returns information about pending messages, in a range. 
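# Editor's note: introspection sketch for the XINFO / XPENDING wrappers above;
# it assumes the hypothetical "events" stream and "workers" group from the
# previous sketch already exist.
import redis

r = redis.Redis(decode_responses=True)

print(r.xlen("events"))                        # number of entries
print(r.xinfo_stream("events"))                # stream metadata
print(r.xinfo_groups("events"))                # consumer groups on the stream
print(r.xinfo_consumers("events", "workers"))  # consumers in one group
print(r.xpending("events", "workers"))         # summary of pending entries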
@@ -2568,20 +2683,24 @@ class StreamCommands: """ if {min, max, count} == {None}: if idle is not None or consumername is not None: - raise DataError("if XPENDING is provided with idle time" - " or consumername, it must be provided" - " with min, max and count parameters") + raise DataError( + "if XPENDING is provided with idle time" + " or consumername, it must be provided" + " with min, max and count parameters" + ) return self.xpending(name, groupname) pieces = [name, groupname] if min is None or max is None or count is None: - raise DataError("XPENDING must be provided with min, max " - "and count parameters, or none of them.") + raise DataError( + "XPENDING must be provided with min, max " + "and count parameters, or none of them." + ) # idle try: if int(idle) < 0: raise DataError("XPENDING idle must be a integer >= 0") - pieces.extend(['IDLE', idle]) + pieces.extend(["IDLE", idle]) except TypeError: pass # count @@ -2595,9 +2714,9 @@ class StreamCommands: if consumername: pieces.append(consumername) - return self.execute_command('XPENDING', *pieces, parse_detail=True) + return self.execute_command("XPENDING", *pieces, parse_detail=True) - def xrange(self, name, min='-', max='+', count=None): + def xrange(self, name, min="-", max="+", count=None): """ Read stream values within an interval. name: name of the stream. @@ -2613,11 +2732,11 @@ class StreamCommands: pieces = [min, max] if count is not None: if not isinstance(count, int) or count < 1: - raise DataError('XRANGE count must be a positive integer') - pieces.append(b'COUNT') + raise DataError("XRANGE count must be a positive integer") + pieces.append(b"COUNT") pieces.append(str(count)) - return self.execute_command('XRANGE', name, *pieces) + return self.execute_command("XRANGE", name, *pieces) def xread(self, streams, count=None, block=None): """ @@ -2633,24 +2752,25 @@ class StreamCommands: pieces = [] if block is not None: if not isinstance(block, int) or block < 0: - raise DataError('XREAD block must be a non-negative integer') - pieces.append(b'BLOCK') + raise DataError("XREAD block must be a non-negative integer") + pieces.append(b"BLOCK") pieces.append(str(block)) if count is not None: if not isinstance(count, int) or count < 1: - raise DataError('XREAD count must be a positive integer') - pieces.append(b'COUNT') + raise DataError("XREAD count must be a positive integer") + pieces.append(b"COUNT") pieces.append(str(count)) if not isinstance(streams, dict) or len(streams) == 0: - raise DataError('XREAD streams must be a non empty dict') - pieces.append(b'STREAMS') + raise DataError("XREAD streams must be a non empty dict") + pieces.append(b"STREAMS") keys, values = zip(*streams.items()) pieces.extend(keys) pieces.extend(values) - return self.execute_command('XREAD', *pieces) + return self.execute_command("XREAD", *pieces) - def xreadgroup(self, groupname, consumername, streams, count=None, - block=None, noack=False): + def xreadgroup( + self, groupname, consumername, streams, count=None, block=None, noack=False + ): """ Read from a stream via a consumer group. groupname: name of the consumer group. 
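# Editor's note: reading sketch for XRANGE / XREAD as wrapped above. "events"
# is the illustrative stream used earlier; "$" means "only entries added after
# this call".
import redis

r = redis.Redis(decode_responses=True)

# All entries between two IDs (here: the whole stream), capped at 10.
print(r.xrange("events", min="-", max="+", count=10))

# Block up to 5 seconds waiting for new entries on one or more streams.
print(r.xread({"events": "$"}, count=10, block=5000))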
@@ -2664,28 +2784,27 @@ class StreamCommands: For more information check https://redis.io/commands/xreadgroup """ - pieces = [b'GROUP', groupname, consumername] + pieces = [b"GROUP", groupname, consumername] if count is not None: if not isinstance(count, int) or count < 1: raise DataError("XREADGROUP count must be a positive integer") - pieces.append(b'COUNT') + pieces.append(b"COUNT") pieces.append(str(count)) if block is not None: if not isinstance(block, int) or block < 0: - raise DataError("XREADGROUP block must be a non-negative " - "integer") - pieces.append(b'BLOCK') + raise DataError("XREADGROUP block must be a non-negative " "integer") + pieces.append(b"BLOCK") pieces.append(str(block)) if noack: - pieces.append(b'NOACK') + pieces.append(b"NOACK") if not isinstance(streams, dict) or len(streams) == 0: - raise DataError('XREADGROUP streams must be a non empty dict') - pieces.append(b'STREAMS') + raise DataError("XREADGROUP streams must be a non empty dict") + pieces.append(b"STREAMS") pieces.extend(streams.keys()) pieces.extend(streams.values()) - return self.execute_command('XREADGROUP', *pieces) + return self.execute_command("XREADGROUP", *pieces) - def xrevrange(self, name, max='+', min='-', count=None): + def xrevrange(self, name, max="+", min="-", count=None): """ Read stream values within an interval, in reverse order. name: name of the stream @@ -2701,14 +2820,13 @@ class StreamCommands: pieces = [max, min] if count is not None: if not isinstance(count, int) or count < 1: - raise DataError('XREVRANGE count must be a positive integer') - pieces.append(b'COUNT') + raise DataError("XREVRANGE count must be a positive integer") + pieces.append(b"COUNT") pieces.append(str(count)) - return self.execute_command('XREVRANGE', name, *pieces) + return self.execute_command("XREVRANGE", name, *pieces) - def xtrim(self, name, maxlen=None, approximate=True, minid=None, - limit=None): + def xtrim(self, name, maxlen=None, approximate=True, minid=None, limit=None): """ Trims old messages from a stream. name: name of the stream. @@ -2723,15 +2841,14 @@ class StreamCommands: """ pieces = [] if maxlen is not None and minid is not None: - raise DataError("Only one of ``maxlen`` or ``minid`` " - "may be specified") + raise DataError("Only one of ``maxlen`` or ``minid`` " "may be specified") if maxlen is not None: - pieces.append(b'MAXLEN') + pieces.append(b"MAXLEN") if minid is not None: - pieces.append(b'MINID') + pieces.append(b"MINID") if approximate: - pieces.append(b'~') + pieces.append(b"~") if maxlen is not None: pieces.append(maxlen) if minid is not None: @@ -2740,7 +2857,7 @@ class StreamCommands: pieces.append(b"LIMIT") pieces.append(limit) - return self.execute_command('XTRIM', name, *pieces) + return self.execute_command("XTRIM", name, *pieces) class SortedSetCommands: @@ -2748,8 +2865,10 @@ class SortedSetCommands: Redis commands for Sorted Sets data type. see: https://redis.io/topics/data-types-intro#redis-sorted-sets """ - def zadd(self, name, mapping, nx=False, xx=False, ch=False, incr=False, - gt=None, lt=None): + + def zadd( + self, name, mapping, nx=False, xx=False, ch=False, incr=False, gt=None, lt=None + ): """ Set any number of element-name, score pairs to the key ``name``. Pairs are specified as a dict of element-names keys to score values. 
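# Editor's note: consumer-group read/ack sketch for XREADGROUP plus a trim via
# XTRIM, reusing the hypothetical "events"/"workers"/"c1" names from above.
import redis

r = redis.Redis(decode_responses=True)

# ">" asks for messages never delivered to any consumer in the group.
entries = r.xreadgroup("workers", "c1", {"events": ">"}, count=10, block=2000) or []
for stream_name, messages in entries:
    for message_id, fields in messages:
        print(message_id, fields)
        r.xack("events", "workers", message_id)

# Drop old entries, keeping approximately the newest 500.
r.xtrim("events", maxlen=500, approximate=True)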
@@ -2788,30 +2907,32 @@ class SortedSetCommands: if nx and xx: raise DataError("ZADD allows either 'nx' or 'xx', not both") if incr and len(mapping) != 1: - raise DataError("ZADD option 'incr' only works when passing a " - "single element/score pair") + raise DataError( + "ZADD option 'incr' only works when passing a " + "single element/score pair" + ) if nx is True and (gt is not None or lt is not None): raise DataError("Only one of 'nx', 'lt', or 'gr' may be defined.") pieces = [] options = {} if nx: - pieces.append(b'NX') + pieces.append(b"NX") if xx: - pieces.append(b'XX') + pieces.append(b"XX") if ch: - pieces.append(b'CH') + pieces.append(b"CH") if incr: - pieces.append(b'INCR') - options['as_score'] = True + pieces.append(b"INCR") + options["as_score"] = True if gt: - pieces.append(b'GT') + pieces.append(b"GT") if lt: - pieces.append(b'LT') + pieces.append(b"LT") for pair in mapping.items(): pieces.append(pair[1]) pieces.append(pair[0]) - return self.execute_command('ZADD', name, *pieces, **options) + return self.execute_command("ZADD", name, *pieces, **options) def zcard(self, name): """ @@ -2819,7 +2940,7 @@ class SortedSetCommands: For more information check https://redis.io/commands/zcard """ - return self.execute_command('ZCARD', name) + return self.execute_command("ZCARD", name) def zcount(self, name, min, max): """ @@ -2828,7 +2949,7 @@ class SortedSetCommands: For more information check https://redis.io/commands/zcount """ - return self.execute_command('ZCOUNT', name, min, max) + return self.execute_command("ZCOUNT", name, min, max) def zdiff(self, keys, withscores=False): """ @@ -2858,7 +2979,7 @@ class SortedSetCommands: For more information check https://redis.io/commands/zincrby """ - return self.execute_command('ZINCRBY', name, amount, value) + return self.execute_command("ZINCRBY", name, amount, value) def zinter(self, keys, aggregate=None, withscores=False): """ @@ -2872,8 +2993,7 @@ class SortedSetCommands: For more information check https://redis.io/commands/zinter """ - return self._zaggregate('ZINTER', None, keys, aggregate, - withscores=withscores) + return self._zaggregate("ZINTER", None, keys, aggregate, withscores=withscores) def zinterstore(self, dest, keys, aggregate=None): """ @@ -2887,7 +3007,7 @@ class SortedSetCommands: For more information check https://redis.io/commands/zinterstore """ - return self._zaggregate('ZINTERSTORE', dest, keys, aggregate) + return self._zaggregate("ZINTERSTORE", dest, keys, aggregate) def zlexcount(self, name, min, max): """ @@ -2896,7 +3016,7 @@ class SortedSetCommands: For more information check https://redis.io/commands/zlexcount """ - return self.execute_command('ZLEXCOUNT', name, min, max) + return self.execute_command("ZLEXCOUNT", name, min, max) def zpopmax(self, name, count=None): """ @@ -2906,10 +3026,8 @@ class SortedSetCommands: For more information check https://redis.io/commands/zpopmax """ args = (count is not None) and [count] or [] - options = { - 'withscores': True - } - return self.execute_command('ZPOPMAX', name, *args, **options) + options = {"withscores": True} + return self.execute_command("ZPOPMAX", name, *args, **options) def zpopmin(self, name, count=None): """ @@ -2919,10 +3037,8 @@ class SortedSetCommands: For more information check https://redis.io/commands/zpopmin """ args = (count is not None) and [count] or [] - options = { - 'withscores': True - } - return self.execute_command('ZPOPMIN', name, *args, **options) + options = {"withscores": True} + return self.execute_command("ZPOPMIN", name, 
*args, **options) def zrandmember(self, key, count=None, withscores=False): """ @@ -2965,7 +3081,7 @@ class SortedSetCommands: timeout = 0 keys = list_or_args(keys, None) keys.append(timeout) - return self.execute_command('BZPOPMAX', *keys) + return self.execute_command("BZPOPMAX", *keys) def bzpopmin(self, keys, timeout=0): """ @@ -2984,43 +3100,63 @@ class SortedSetCommands: timeout = 0 keys = list_or_args(keys, None) keys.append(timeout) - return self.execute_command('BZPOPMIN', *keys) - - def _zrange(self, command, dest, name, start, end, desc=False, - byscore=False, bylex=False, withscores=False, - score_cast_func=float, offset=None, num=None): + return self.execute_command("BZPOPMIN", *keys) + + def _zrange( + self, + command, + dest, + name, + start, + end, + desc=False, + byscore=False, + bylex=False, + withscores=False, + score_cast_func=float, + offset=None, + num=None, + ): if byscore and bylex: - raise DataError("``byscore`` and ``bylex`` can not be " - "specified together.") - if (offset is not None and num is None) or \ - (num is not None and offset is None): + raise DataError( + "``byscore`` and ``bylex`` can not be " "specified together." + ) + if (offset is not None and num is None) or (num is not None and offset is None): raise DataError("``offset`` and ``num`` must both be specified.") if bylex and withscores: - raise DataError("``withscores`` not supported in combination " - "with ``bylex``.") + raise DataError( + "``withscores`` not supported in combination " "with ``bylex``." + ) pieces = [command] if dest: pieces.append(dest) pieces.extend([name, start, end]) if byscore: - pieces.append('BYSCORE') + pieces.append("BYSCORE") if bylex: - pieces.append('BYLEX') + pieces.append("BYLEX") if desc: - pieces.append('REV') + pieces.append("REV") if offset is not None and num is not None: - pieces.extend(['LIMIT', offset, num]) + pieces.extend(["LIMIT", offset, num]) if withscores: - pieces.append('WITHSCORES') - options = { - 'withscores': withscores, - 'score_cast_func': score_cast_func - } + pieces.append("WITHSCORES") + options = {"withscores": withscores, "score_cast_func": score_cast_func} return self.execute_command(*pieces, **options) - def zrange(self, name, start, end, desc=False, withscores=False, - score_cast_func=float, byscore=False, bylex=False, - offset=None, num=None): + def zrange( + self, + name, + start, + end, + desc=False, + withscores=False, + score_cast_func=float, + byscore=False, + bylex=False, + offset=None, + num=None, + ): """ Return a range of values from sorted set ``name`` between ``start`` and ``end`` sorted in ascending order. 
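# Editor's note: sorted-set sketch for ZADD and the pop/inspect helpers above;
# key and member names are illustrative.
import redis

r = redis.Redis(decode_responses=True)

r.zadd("leaderboard", {"ada": 100, "bob": 80, "eve": 95})

# GT: only update an existing member if the new score is greater.
r.zadd("leaderboard", {"bob": 90}, gt=True)

print(r.zcard("leaderboard"))              # 3
print(r.zcount("leaderboard", 90, 100))    # members scored between 90 and 100
print(r.zincrby("leaderboard", 5, "eve"))  # new score for "eve"
print(r.zpopmax("leaderboard"))            # highest (member, score) pair
print(r.zrandmember("leaderboard", 2, withscores=True))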
@@ -3051,16 +3187,25 @@ class SortedSetCommands: """ # Need to support ``desc`` also when using old redis version # because it was supported in 3.5.3 (of redis-py) - if not byscore and not bylex and (offset is None and num is None) \ - and desc: - return self.zrevrange(name, start, end, withscores, - score_cast_func) - - return self._zrange('ZRANGE', None, name, start, end, desc, byscore, - bylex, withscores, score_cast_func, offset, num) + if not byscore and not bylex and (offset is None and num is None) and desc: + return self.zrevrange(name, start, end, withscores, score_cast_func) + + return self._zrange( + "ZRANGE", + None, + name, + start, + end, + desc, + byscore, + bylex, + withscores, + score_cast_func, + offset, + num, + ) - def zrevrange(self, name, start, end, withscores=False, - score_cast_func=float): + def zrevrange(self, name, start, end, withscores=False, score_cast_func=float): """ Return a range of values from sorted set ``name`` between ``start`` and ``end`` sorted in descending order. @@ -3074,18 +3219,24 @@ class SortedSetCommands: For more information check https://redis.io/commands/zrevrange """ - pieces = ['ZREVRANGE', name, start, end] + pieces = ["ZREVRANGE", name, start, end] if withscores: - pieces.append(b'WITHSCORES') - options = { - 'withscores': withscores, - 'score_cast_func': score_cast_func - } + pieces.append(b"WITHSCORES") + options = {"withscores": withscores, "score_cast_func": score_cast_func} return self.execute_command(*pieces, **options) - def zrangestore(self, dest, name, start, end, - byscore=False, bylex=False, desc=False, - offset=None, num=None): + def zrangestore( + self, + dest, + name, + start, + end, + byscore=False, + bylex=False, + desc=False, + offset=None, + num=None, + ): """ Stores in ``dest`` the result of a range of values from sorted set ``name`` between ``start`` and ``end`` sorted in ascending order. 
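# Editor's note: range-query sketch for ZRANGE / ZREVRANGE as wrapped above,
# reusing the hypothetical "leaderboard" key.
import redis

r = redis.Redis(decode_responses=True)

# Plain rank-based range, ascending, with scores.
print(r.zrange("leaderboard", 0, -1, withscores=True))

# Score-based range with LIMIT-style paging; offset and num must be given together.
print(r.zrange("leaderboard", 0, 100, byscore=True, offset=0, num=2))

# Descending by rank; on older servers this falls back to ZREVRANGE.
print(r.zrange("leaderboard", 0, -1, desc=True, withscores=True))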
@@ -3109,8 +3260,20 @@ class SortedSetCommands: For more information check https://redis.io/commands/zrangestore """ - return self._zrange('ZRANGESTORE', dest, name, start, end, desc, - byscore, bylex, False, None, offset, num) + return self._zrange( + "ZRANGESTORE", + dest, + name, + start, + end, + desc, + byscore, + bylex, + False, + None, + offset, + num, + ) def zrangebylex(self, name, min, max, start=None, num=None): """ @@ -3122,12 +3285,11 @@ class SortedSetCommands: For more information check https://redis.io/commands/zrangebylex """ - if (start is not None and num is None) or \ - (num is not None and start is None): + if (start is not None and num is None) or (num is not None and start is None): raise DataError("``start`` and ``num`` must both be specified") - pieces = ['ZRANGEBYLEX', name, min, max] + pieces = ["ZRANGEBYLEX", name, min, max] if start is not None and num is not None: - pieces.extend([b'LIMIT', start, num]) + pieces.extend([b"LIMIT", start, num]) return self.execute_command(*pieces) def zrevrangebylex(self, name, max, min, start=None, num=None): @@ -3140,16 +3302,23 @@ class SortedSetCommands: For more information check https://redis.io/commands/zrevrangebylex """ - if (start is not None and num is None) or \ - (num is not None and start is None): + if (start is not None and num is None) or (num is not None and start is None): raise DataError("``start`` and ``num`` must both be specified") - pieces = ['ZREVRANGEBYLEX', name, max, min] + pieces = ["ZREVRANGEBYLEX", name, max, min] if start is not None and num is not None: - pieces.extend(['LIMIT', start, num]) + pieces.extend(["LIMIT", start, num]) return self.execute_command(*pieces) - def zrangebyscore(self, name, min, max, start=None, num=None, - withscores=False, score_cast_func=float): + def zrangebyscore( + self, + name, + min, + max, + start=None, + num=None, + withscores=False, + score_cast_func=float, + ): """ Return a range of values from the sorted set ``name`` with scores between ``min`` and ``max``. @@ -3164,22 +3333,26 @@ class SortedSetCommands: For more information check https://redis.io/commands/zrangebyscore """ - if (start is not None and num is None) or \ - (num is not None and start is None): + if (start is not None and num is None) or (num is not None and start is None): raise DataError("``start`` and ``num`` must both be specified") - pieces = ['ZRANGEBYSCORE', name, min, max] + pieces = ["ZRANGEBYSCORE", name, min, max] if start is not None and num is not None: - pieces.extend(['LIMIT', start, num]) + pieces.extend(["LIMIT", start, num]) if withscores: - pieces.append('WITHSCORES') - options = { - 'withscores': withscores, - 'score_cast_func': score_cast_func - } + pieces.append("WITHSCORES") + options = {"withscores": withscores, "score_cast_func": score_cast_func} return self.execute_command(*pieces, **options) - def zrevrangebyscore(self, name, max, min, start=None, num=None, - withscores=False, score_cast_func=float): + def zrevrangebyscore( + self, + name, + max, + min, + start=None, + num=None, + withscores=False, + score_cast_func=float, + ): """ Return a range of values from the sorted set ``name`` with scores between ``min`` and ``max`` in descending order. 
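# Editor's note: sketch for ZRANGESTORE and the BYLEX/BYSCORE variants above;
# destination and source key names are examples.
import redis

r = redis.Redis(decode_responses=True)

# Store the two best scores of "leaderboard" into "top2" (Redis >= 6.2).
r.zrangestore("top2", "leaderboard", 0, 1, desc=True)
print(r.zrange("top2", 0, -1, withscores=True))

# Lexicographic range over same-score members: "[" is inclusive, "-"/"+" are open ends.
r.zadd("names", {"alice": 0, "bob": 0, "carol": 0})
print(r.zrangebylex("names", "[a", "[c"))

# Score range with paging and scores attached.
print(r.zrangebyscore("leaderboard", "-inf", "+inf", start=0, num=2,
                      withscores=True))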
@@ -3194,18 +3367,14 @@ class SortedSetCommands: For more information check https://redis.io/commands/zrevrangebyscore """ - if (start is not None and num is None) or \ - (num is not None and start is None): + if (start is not None and num is None) or (num is not None and start is None): raise DataError("``start`` and ``num`` must both be specified") - pieces = ['ZREVRANGEBYSCORE', name, max, min] + pieces = ["ZREVRANGEBYSCORE", name, max, min] if start is not None and num is not None: - pieces.extend(['LIMIT', start, num]) + pieces.extend(["LIMIT", start, num]) if withscores: - pieces.append('WITHSCORES') - options = { - 'withscores': withscores, - 'score_cast_func': score_cast_func - } + pieces.append("WITHSCORES") + options = {"withscores": withscores, "score_cast_func": score_cast_func} return self.execute_command(*pieces, **options) def zrank(self, name, value): @@ -3215,7 +3384,7 @@ class SortedSetCommands: For more information check https://redis.io/commands/zrank """ - return self.execute_command('ZRANK', name, value) + return self.execute_command("ZRANK", name, value) def zrem(self, name, *values): """ @@ -3223,7 +3392,7 @@ class SortedSetCommands: For more information check https://redis.io/commands/zrem """ - return self.execute_command('ZREM', name, *values) + return self.execute_command("ZREM", name, *values) def zremrangebylex(self, name, min, max): """ @@ -3234,7 +3403,7 @@ class SortedSetCommands: For more information check https://redis.io/commands/zremrangebylex """ - return self.execute_command('ZREMRANGEBYLEX', name, min, max) + return self.execute_command("ZREMRANGEBYLEX", name, min, max) def zremrangebyrank(self, name, min, max): """ @@ -3245,7 +3414,7 @@ class SortedSetCommands: For more information check https://redis.io/commands/zremrangebyrank """ - return self.execute_command('ZREMRANGEBYRANK', name, min, max) + return self.execute_command("ZREMRANGEBYRANK", name, min, max) def zremrangebyscore(self, name, min, max): """ @@ -3254,7 +3423,7 @@ class SortedSetCommands: For more information check https://redis.io/commands/zremrangebyscore """ - return self.execute_command('ZREMRANGEBYSCORE', name, min, max) + return self.execute_command("ZREMRANGEBYSCORE", name, min, max) def zrevrank(self, name, value): """ @@ -3263,7 +3432,7 @@ class SortedSetCommands: For more information check https://redis.io/commands/zrevrank """ - return self.execute_command('ZREVRANK', name, value) + return self.execute_command("ZREVRANK", name, value) def zscore(self, name, value): """ @@ -3271,7 +3440,7 @@ class SortedSetCommands: For more information check https://redis.io/commands/zscore """ - return self.execute_command('ZSCORE', name, value) + return self.execute_command("ZSCORE", name, value) def zunion(self, keys, aggregate=None, withscores=False): """ @@ -3282,8 +3451,7 @@ class SortedSetCommands: For more information check https://redis.io/commands/zunion """ - return self._zaggregate('ZUNION', None, keys, aggregate, - withscores=withscores) + return self._zaggregate("ZUNION", None, keys, aggregate, withscores=withscores) def zunionstore(self, dest, keys, aggregate=None): """ @@ -3293,7 +3461,7 @@ class SortedSetCommands: For more information check https://redis.io/commands/zunionstore """ - return self._zaggregate('ZUNIONSTORE', dest, keys, aggregate) + return self._zaggregate("ZUNIONSTORE", dest, keys, aggregate) def zmscore(self, key, members): """ @@ -3307,12 +3475,11 @@ class SortedSetCommands: For more information check https://redis.io/commands/zmscore """ if not members: - 
raise DataError('ZMSCORE members must be a non-empty list') + raise DataError("ZMSCORE members must be a non-empty list") pieces = [key] + members - return self.execute_command('ZMSCORE', *pieces) + return self.execute_command("ZMSCORE", *pieces) - def _zaggregate(self, command, dest, keys, aggregate=None, - **options): + def _zaggregate(self, command, dest, keys, aggregate=None, **options): pieces = [command] if dest is not None: pieces.append(dest) @@ -3323,16 +3490,16 @@ class SortedSetCommands: weights = None pieces.extend(keys) if weights: - pieces.append(b'WEIGHTS') + pieces.append(b"WEIGHTS") pieces.extend(weights) if aggregate: - if aggregate.upper() in ['SUM', 'MIN', 'MAX']: - pieces.append(b'AGGREGATE') + if aggregate.upper() in ["SUM", "MIN", "MAX"]: + pieces.append(b"AGGREGATE") pieces.append(aggregate) else: raise DataError("aggregate can be sum, min or max.") - if options.get('withscores', False): - pieces.append(b'WITHSCORES') + if options.get("withscores", False): + pieces.append(b"WITHSCORES") return self.execute_command(*pieces, **options) @@ -3341,13 +3508,14 @@ class HyperlogCommands: Redis commands of HyperLogLogs data type. see: https://redis.io/topics/data-types-intro#hyperloglogs """ + def pfadd(self, name, *values): """ Adds the specified elements to the specified HyperLogLog. For more information check https://redis.io/commands/pfadd """ - return self.execute_command('PFADD', name, *values) + return self.execute_command("PFADD", name, *values) def pfcount(self, *sources): """ @@ -3356,7 +3524,7 @@ class HyperlogCommands: For more information check https://redis.io/commands/pfcount """ - return self.execute_command('PFCOUNT', *sources) + return self.execute_command("PFCOUNT", *sources) def pfmerge(self, dest, *sources): """ @@ -3364,7 +3532,7 @@ class HyperlogCommands: For more information check https://redis.io/commands/pfmerge """ - return self.execute_command('PFMERGE', dest, *sources) + return self.execute_command("PFMERGE", dest, *sources) class HashCommands: @@ -3372,13 +3540,14 @@ class HashCommands: Redis commands for Hash data type. 
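# Editor's note: sketch for the aggregate/score helpers and the HyperLogLog
# commands above; all key names are illustrative.
import redis

r = redis.Redis(decode_responses=True)

r.zadd("week1", {"ada": 3, "bob": 5})
r.zadd("week2", {"ada": 4, "eve": 2})

# Union with MAX aggregation; passing a dict lets each key carry a WEIGHT.
r.zunionstore("total", {"week1": 1, "week2": 1}, aggregate="MAX")
print(r.zmscore("total", ["ada", "bob", "missing"]))  # None for absent members

# HyperLogLog: approximate distinct counts.
r.pfadd("visitors:mon", "u1", "u2", "u3")
r.pfadd("visitors:tue", "u2", "u4")
r.pfmerge("visitors:week", "visitors:mon", "visitors:tue")
print(r.pfcount("visitors:week"))  # roughly 4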
see: https://redis.io/topics/data-types-intro#redis-hashes """ + def hdel(self, name, *keys): """ Delete ``keys`` from hash ``name`` For more information check https://redis.io/commands/hdel """ - return self.execute_command('HDEL', name, *keys) + return self.execute_command("HDEL", name, *keys) def hexists(self, name, key): """ @@ -3386,7 +3555,7 @@ class HashCommands: For more information check https://redis.io/commands/hexists """ - return self.execute_command('HEXISTS', name, key) + return self.execute_command("HEXISTS", name, key) def hget(self, name, key): """ @@ -3394,7 +3563,7 @@ class HashCommands: For more information check https://redis.io/commands/hget """ - return self.execute_command('HGET', name, key) + return self.execute_command("HGET", name, key) def hgetall(self, name): """ @@ -3402,7 +3571,7 @@ class HashCommands: For more information check https://redis.io/commands/hgetall """ - return self.execute_command('HGETALL', name) + return self.execute_command("HGETALL", name) def hincrby(self, name, key, amount=1): """ @@ -3410,7 +3579,7 @@ class HashCommands: For more information check https://redis.io/commands/hincrby """ - return self.execute_command('HINCRBY', name, key, amount) + return self.execute_command("HINCRBY", name, key, amount) def hincrbyfloat(self, name, key, amount=1.0): """ @@ -3418,7 +3587,7 @@ class HashCommands: For more information check https://redis.io/commands/hincrbyfloat """ - return self.execute_command('HINCRBYFLOAT', name, key, amount) + return self.execute_command("HINCRBYFLOAT", name, key, amount) def hkeys(self, name): """ @@ -3426,7 +3595,7 @@ class HashCommands: For more information check https://redis.io/commands/hkeys """ - return self.execute_command('HKEYS', name) + return self.execute_command("HKEYS", name) def hlen(self, name): """ @@ -3434,7 +3603,7 @@ class HashCommands: For more information check https://redis.io/commands/hlen """ - return self.execute_command('HLEN', name) + return self.execute_command("HLEN", name) def hset(self, name, key=None, value=None, mapping=None): """ @@ -3454,7 +3623,7 @@ class HashCommands: for pair in mapping.items(): items.extend(pair) - return self.execute_command('HSET', name, *items) + return self.execute_command("HSET", name, *items) def hsetnx(self, name, key, value): """ @@ -3463,7 +3632,7 @@ class HashCommands: For more information check https://redis.io/commands/hsetnx """ - return self.execute_command('HSETNX', name, key, value) + return self.execute_command("HSETNX", name, key, value) def hmset(self, name, mapping): """ @@ -3473,8 +3642,8 @@ class HashCommands: For more information check https://redis.io/commands/hmset """ warnings.warn( - f'{self.__class__.__name__}.hmset() is deprecated. ' - f'Use {self.__class__.__name__}.hset() instead.', + f"{self.__class__.__name__}.hmset() is deprecated. 
" + f"Use {self.__class__.__name__}.hset() instead.", DeprecationWarning, stacklevel=2, ) @@ -3483,7 +3652,7 @@ class HashCommands: items = [] for pair in mapping.items(): items.extend(pair) - return self.execute_command('HMSET', name, *items) + return self.execute_command("HMSET", name, *items) def hmget(self, name, keys, *args): """ @@ -3492,7 +3661,7 @@ class HashCommands: For more information check https://redis.io/commands/hmget """ args = list_or_args(keys, args) - return self.execute_command('HMGET', name, *args) + return self.execute_command("HMGET", name, *args) def hvals(self, name): """ @@ -3500,7 +3669,7 @@ class HashCommands: For more information check https://redis.io/commands/hvals """ - return self.execute_command('HVALS', name) + return self.execute_command("HVALS", name) def hstrlen(self, name, key): """ @@ -3509,7 +3678,7 @@ class HashCommands: For more information check https://redis.io/commands/hstrlen """ - return self.execute_command('HSTRLEN', name, key) + return self.execute_command("HSTRLEN", name, key) class PubSubCommands: @@ -3517,39 +3686,40 @@ class PubSubCommands: Redis PubSub commands. see https://redis.io/topics/pubsub """ - def publish(self, channel, message): + + def publish(self, channel, message, **kwargs): """ Publish ``message`` on ``channel``. Returns the number of subscribers the message was delivered to. For more information check https://redis.io/commands/publish """ - return self.execute_command('PUBLISH', channel, message) + return self.execute_command("PUBLISH", channel, message, **kwargs) - def pubsub_channels(self, pattern='*'): + def pubsub_channels(self, pattern="*", **kwargs): """ Return a list of channels that have at least one subscriber For more information check https://redis.io/commands/pubsub-channels """ - return self.execute_command('PUBSUB CHANNELS', pattern) + return self.execute_command("PUBSUB CHANNELS", pattern, **kwargs) - def pubsub_numpat(self): + def pubsub_numpat(self, **kwargs): """ Returns the number of subscriptions to patterns For more information check https://redis.io/commands/pubsub-numpat """ - return self.execute_command('PUBSUB NUMPAT') + return self.execute_command("PUBSUB NUMPAT", **kwargs) - def pubsub_numsub(self, *args): + def pubsub_numsub(self, *args, **kwargs): """ Return a list of (channel, number of subscribers) tuples for each channel given in ``*args`` For more information check https://redis.io/commands/pubsub-numsub """ - return self.execute_command('PUBSUB NUMSUB', *args) + return self.execute_command("PUBSUB NUMSUB", *args, **kwargs) class ScriptCommands: @@ -3557,6 +3727,7 @@ class ScriptCommands: Redis Lua script commands. 
see: https://redis.com/ebook/part-3-next-steps/chapter-11-scripting-redis-with-lua/ """ + def eval(self, script, numkeys, *keys_and_args): """ Execute the Lua ``script``, specifying the ``numkeys`` the script @@ -3568,7 +3739,7 @@ class ScriptCommands: For more information check https://redis.io/commands/eval """ - return self.execute_command('EVAL', script, numkeys, *keys_and_args) + return self.execute_command("EVAL", script, numkeys, *keys_and_args) def evalsha(self, sha, numkeys, *keys_and_args): """ @@ -3582,7 +3753,7 @@ class ScriptCommands: For more information check https://redis.io/commands/evalsha """ - return self.execute_command('EVALSHA', sha, numkeys, *keys_and_args) + return self.execute_command("EVALSHA", sha, numkeys, *keys_and_args) def script_exists(self, *args): """ @@ -3592,7 +3763,7 @@ class ScriptCommands: For more information check https://redis.io/commands/script-exists """ - return self.execute_command('SCRIPT EXISTS', *args) + return self.execute_command("SCRIPT EXISTS", *args) def script_debug(self, *args): raise NotImplementedError( @@ -3608,14 +3779,16 @@ class ScriptCommands: # Redis pre 6 had no sync_type. if sync_type not in ["SYNC", "ASYNC", None]: - raise DataError("SCRIPT FLUSH defaults to SYNC in redis > 6.2, or " - "accepts SYNC/ASYNC. For older versions, " - "of redis leave as None.") + raise DataError( + "SCRIPT FLUSH defaults to SYNC in redis > 6.2, or " + "accepts SYNC/ASYNC. For older versions, " + "of redis leave as None." + ) if sync_type is None: pieces = [] else: pieces = [sync_type] - return self.execute_command('SCRIPT FLUSH', *pieces) + return self.execute_command("SCRIPT FLUSH", *pieces) def script_kill(self): """ @@ -3623,7 +3796,7 @@ class ScriptCommands: For more information check https://redis.io/commands/script-kill """ - return self.execute_command('SCRIPT KILL') + return self.execute_command("SCRIPT KILL") def script_load(self, script): """ @@ -3631,7 +3804,7 @@ class ScriptCommands: For more information check https://redis.io/commands/script-load """ - return self.execute_command('SCRIPT LOAD', script) + return self.execute_command("SCRIPT LOAD", script) def register_script(self, script): """ @@ -3648,6 +3821,7 @@ class GeoCommands: Redis Geospatial commands. 
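# Editor's note: Pub/Sub and Lua scripting sketch for the commands above; the
# channel name and the Lua snippet are illustrative only.
import redis

r = redis.Redis(decode_responses=True)

# PUBLISH returns how many subscribers received the message.
print(r.publish("alerts", "disk almost full"))
print(r.pubsub_channels("*"))
print(r.pubsub_numsub("alerts"))

# register_script returns a callable that uses EVALSHA and falls back to EVAL.
incr_by = r.register_script("return redis.call('INCRBY', KEYS[1], ARGV[1])")
print(incr_by(keys=["counter"], args=[5]))

# Or call EVAL directly, stating how many of the trailing arguments are keys.
print(r.eval("return redis.call('GET', KEYS[1])", 1, "counter"))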
see: https://redis.com/redis-best-practices/indexing-patterns/geospatial/ """ + def geoadd(self, name, values, nx=False, xx=False, ch=False): """ Add the specified geospatial items to the specified key identified @@ -3672,17 +3846,16 @@ class GeoCommands: if nx and xx: raise DataError("GEOADD allows either 'nx' or 'xx', not both") if len(values) % 3 != 0: - raise DataError("GEOADD requires places with lon, lat and name" - " values") + raise DataError("GEOADD requires places with lon, lat and name" " values") pieces = [name] if nx: - pieces.append('NX') + pieces.append("NX") if xx: - pieces.append('XX') + pieces.append("XX") if ch: - pieces.append('CH') + pieces.append("CH") pieces.extend(values) - return self.execute_command('GEOADD', *pieces) + return self.execute_command("GEOADD", *pieces) def geodist(self, name, place1, place2, unit=None): """ @@ -3694,11 +3867,11 @@ class GeoCommands: For more information check https://redis.io/commands/geodist """ pieces = [name, place1, place2] - if unit and unit not in ('m', 'km', 'mi', 'ft'): + if unit and unit not in ("m", "km", "mi", "ft"): raise DataError("GEODIST invalid unit") elif unit: pieces.append(unit) - return self.execute_command('GEODIST', *pieces) + return self.execute_command("GEODIST", *pieces) def geohash(self, name, *values): """ @@ -3707,7 +3880,7 @@ class GeoCommands: For more information check https://redis.io/commands/geohash """ - return self.execute_command('GEOHASH', name, *values) + return self.execute_command("GEOHASH", name, *values) def geopos(self, name, *values): """ @@ -3717,11 +3890,24 @@ class GeoCommands: For more information check https://redis.io/commands/geopos """ - return self.execute_command('GEOPOS', name, *values) - - def georadius(self, name, longitude, latitude, radius, unit=None, - withdist=False, withcoord=False, withhash=False, count=None, - sort=None, store=None, store_dist=None, any=False): + return self.execute_command("GEOPOS", name, *values) + + def georadius( + self, + name, + longitude, + latitude, + radius, + unit=None, + withdist=False, + withcoord=False, + withhash=False, + count=None, + sort=None, + store=None, + store_dist=None, + any=False, + ): """ Return the members of the specified key identified by the ``name`` argument which are within the borders of the area specified @@ -3752,17 +3938,38 @@ class GeoCommands: For more information check https://redis.io/commands/georadius """ - return self._georadiusgeneric('GEORADIUS', - name, longitude, latitude, radius, - unit=unit, withdist=withdist, - withcoord=withcoord, withhash=withhash, - count=count, sort=sort, store=store, - store_dist=store_dist, any=any) + return self._georadiusgeneric( + "GEORADIUS", + name, + longitude, + latitude, + radius, + unit=unit, + withdist=withdist, + withcoord=withcoord, + withhash=withhash, + count=count, + sort=sort, + store=store, + store_dist=store_dist, + any=any, + ) - def georadiusbymember(self, name, member, radius, unit=None, - withdist=False, withcoord=False, withhash=False, - count=None, sort=None, store=None, store_dist=None, - any=False): + def georadiusbymember( + self, + name, + member, + radius, + unit=None, + withdist=False, + withcoord=False, + withhash=False, + count=None, + sort=None, + store=None, + store_dist=None, + any=False, + ): """ This command is exactly like ``georadius`` with the sole difference that instead of taking, as the center of the area to query, a longitude @@ -3771,61 +3978,85 @@ class GeoCommands: For more information check https://redis.io/commands/georadiusbymember 
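# Editor's note: geo sketch for GEOADD / GEODIST / GEOPOS / GEORADIUS as
# wrapped above; the coordinates are rough city locations and purely illustrative.
import redis

r = redis.Redis(decode_responses=True)

# GEOADD takes a flat sequence of (longitude, latitude, member) triples.
r.geoadd("cities", [13.361389, 38.115556, "Palermo",
                    15.087269, 37.502669, "Catania"])

print(r.geodist("cities", "Palermo", "Catania", unit="km"))
print(r.geopos("cities", "Palermo"))
print(r.georadius("cities", 15, 37, 200, unit="km", withdist=True, sort="ASC"))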
""" - return self._georadiusgeneric('GEORADIUSBYMEMBER', - name, member, radius, unit=unit, - withdist=withdist, withcoord=withcoord, - withhash=withhash, count=count, - sort=sort, store=store, - store_dist=store_dist, any=any) + return self._georadiusgeneric( + "GEORADIUSBYMEMBER", + name, + member, + radius, + unit=unit, + withdist=withdist, + withcoord=withcoord, + withhash=withhash, + count=count, + sort=sort, + store=store, + store_dist=store_dist, + any=any, + ) def _georadiusgeneric(self, command, *args, **kwargs): pieces = list(args) - if kwargs['unit'] and kwargs['unit'] not in ('m', 'km', 'mi', 'ft'): + if kwargs["unit"] and kwargs["unit"] not in ("m", "km", "mi", "ft"): raise DataError("GEORADIUS invalid unit") - elif kwargs['unit']: - pieces.append(kwargs['unit']) + elif kwargs["unit"]: + pieces.append(kwargs["unit"]) else: - pieces.append('m',) + pieces.append( + "m", + ) - if kwargs['any'] and kwargs['count'] is None: + if kwargs["any"] and kwargs["count"] is None: raise DataError("``any`` can't be provided without ``count``") for arg_name, byte_repr in ( - ('withdist', 'WITHDIST'), - ('withcoord', 'WITHCOORD'), - ('withhash', 'WITHHASH')): + ("withdist", "WITHDIST"), + ("withcoord", "WITHCOORD"), + ("withhash", "WITHHASH"), + ): if kwargs[arg_name]: pieces.append(byte_repr) - if kwargs['count'] is not None: - pieces.extend(['COUNT', kwargs['count']]) - if kwargs['any']: - pieces.append('ANY') + if kwargs["count"] is not None: + pieces.extend(["COUNT", kwargs["count"]]) + if kwargs["any"]: + pieces.append("ANY") - if kwargs['sort']: - if kwargs['sort'] == 'ASC': - pieces.append('ASC') - elif kwargs['sort'] == 'DESC': - pieces.append('DESC') + if kwargs["sort"]: + if kwargs["sort"] == "ASC": + pieces.append("ASC") + elif kwargs["sort"] == "DESC": + pieces.append("DESC") else: raise DataError("GEORADIUS invalid sort") - if kwargs['store'] and kwargs['store_dist']: - raise DataError("GEORADIUS store and store_dist cant be set" - " together") + if kwargs["store"] and kwargs["store_dist"]: + raise DataError("GEORADIUS store and store_dist cant be set" " together") - if kwargs['store']: - pieces.extend([b'STORE', kwargs['store']]) + if kwargs["store"]: + pieces.extend([b"STORE", kwargs["store"]]) - if kwargs['store_dist']: - pieces.extend([b'STOREDIST', kwargs['store_dist']]) + if kwargs["store_dist"]: + pieces.extend([b"STOREDIST", kwargs["store_dist"]]) return self.execute_command(command, *pieces, **kwargs) - def geosearch(self, name, member=None, longitude=None, latitude=None, - unit='m', radius=None, width=None, height=None, sort=None, - count=None, any=False, withcoord=False, - withdist=False, withhash=False): + def geosearch( + self, + name, + member=None, + longitude=None, + latitude=None, + unit="m", + radius=None, + width=None, + height=None, + sort=None, + count=None, + any=False, + withcoord=False, + withdist=False, + withhash=False, + ): """ Return the members of specified key identified by the ``name`` argument, which are within the borders of the @@ -3861,19 +4092,42 @@ class GeoCommands: For more information check https://redis.io/commands/geosearch """ - return self._geosearchgeneric('GEOSEARCH', - name, member=member, longitude=longitude, - latitude=latitude, unit=unit, - radius=radius, width=width, - height=height, sort=sort, count=count, - any=any, withcoord=withcoord, - withdist=withdist, withhash=withhash, - store=None, store_dist=None) + return self._geosearchgeneric( + "GEOSEARCH", + name, + member=member, + longitude=longitude, + latitude=latitude, + 
unit=unit, + radius=radius, + width=width, + height=height, + sort=sort, + count=count, + any=any, + withcoord=withcoord, + withdist=withdist, + withhash=withhash, + store=None, + store_dist=None, + ) - def geosearchstore(self, dest, name, member=None, longitude=None, - latitude=None, unit='m', radius=None, width=None, - height=None, sort=None, count=None, any=False, - storedist=False): + def geosearchstore( + self, + dest, + name, + member=None, + longitude=None, + latitude=None, + unit="m", + radius=None, + width=None, + height=None, + sort=None, + count=None, + any=False, + storedist=False, + ): """ This command is like GEOSEARCH, but stores the result in ``dest``. By default, it stores the results in the destination @@ -3884,74 +4138,86 @@ class GeoCommands: For more information check https://redis.io/commands/geosearchstore """ - return self._geosearchgeneric('GEOSEARCHSTORE', - dest, name, member=member, - longitude=longitude, latitude=latitude, - unit=unit, radius=radius, width=width, - height=height, sort=sort, count=count, - any=any, withcoord=None, - withdist=None, withhash=None, - store=None, store_dist=storedist) + return self._geosearchgeneric( + "GEOSEARCHSTORE", + dest, + name, + member=member, + longitude=longitude, + latitude=latitude, + unit=unit, + radius=radius, + width=width, + height=height, + sort=sort, + count=count, + any=any, + withcoord=None, + withdist=None, + withhash=None, + store=None, + store_dist=storedist, + ) def _geosearchgeneric(self, command, *args, **kwargs): pieces = list(args) # FROMMEMBER or FROMLONLAT - if kwargs['member'] is None: - if kwargs['longitude'] is None or kwargs['latitude'] is None: - raise DataError("GEOSEARCH must have member or" - " longitude and latitude") - if kwargs['member']: - if kwargs['longitude'] or kwargs['latitude']: - raise DataError("GEOSEARCH member and longitude or latitude" - " cant be set together") - pieces.extend([b'FROMMEMBER', kwargs['member']]) - if kwargs['longitude'] and kwargs['latitude']: - pieces.extend([b'FROMLONLAT', - kwargs['longitude'], kwargs['latitude']]) + if kwargs["member"] is None: + if kwargs["longitude"] is None or kwargs["latitude"] is None: + raise DataError( + "GEOSEARCH must have member or" " longitude and latitude" + ) + if kwargs["member"]: + if kwargs["longitude"] or kwargs["latitude"]: + raise DataError( + "GEOSEARCH member and longitude or latitude" " cant be set together" + ) + pieces.extend([b"FROMMEMBER", kwargs["member"]]) + if kwargs["longitude"] and kwargs["latitude"]: + pieces.extend([b"FROMLONLAT", kwargs["longitude"], kwargs["latitude"]]) # BYRADIUS or BYBOX - if kwargs['radius'] is None: - if kwargs['width'] is None or kwargs['height'] is None: - raise DataError("GEOSEARCH must have radius or" - " width and height") - if kwargs['unit'] is None: + if kwargs["radius"] is None: + if kwargs["width"] is None or kwargs["height"] is None: + raise DataError("GEOSEARCH must have radius or" " width and height") + if kwargs["unit"] is None: raise DataError("GEOSEARCH must have unit") - if kwargs['unit'].lower() not in ('m', 'km', 'mi', 'ft'): + if kwargs["unit"].lower() not in ("m", "km", "mi", "ft"): raise DataError("GEOSEARCH invalid unit") - if kwargs['radius']: - if kwargs['width'] or kwargs['height']: - raise DataError("GEOSEARCH radius and width or height" - " cant be set together") - pieces.extend([b'BYRADIUS', kwargs['radius'], kwargs['unit']]) - if kwargs['width'] and kwargs['height']: - pieces.extend([b'BYBOX', - kwargs['width'], kwargs['height'], kwargs['unit']]) + if 
kwargs["radius"]: + if kwargs["width"] or kwargs["height"]: + raise DataError( + "GEOSEARCH radius and width or height" " cant be set together" + ) + pieces.extend([b"BYRADIUS", kwargs["radius"], kwargs["unit"]]) + if kwargs["width"] and kwargs["height"]: + pieces.extend([b"BYBOX", kwargs["width"], kwargs["height"], kwargs["unit"]]) # sort - if kwargs['sort']: - if kwargs['sort'].upper() == 'ASC': - pieces.append(b'ASC') - elif kwargs['sort'].upper() == 'DESC': - pieces.append(b'DESC') + if kwargs["sort"]: + if kwargs["sort"].upper() == "ASC": + pieces.append(b"ASC") + elif kwargs["sort"].upper() == "DESC": + pieces.append(b"DESC") else: raise DataError("GEOSEARCH invalid sort") # count any - if kwargs['count']: - pieces.extend([b'COUNT', kwargs['count']]) - if kwargs['any']: - pieces.append(b'ANY') - elif kwargs['any']: - raise DataError("GEOSEARCH ``any`` can't be provided " - "without count") + if kwargs["count"]: + pieces.extend([b"COUNT", kwargs["count"]]) + if kwargs["any"]: + pieces.append(b"ANY") + elif kwargs["any"]: + raise DataError("GEOSEARCH ``any`` can't be provided " "without count") # other properties for arg_name, byte_repr in ( - ('withdist', b'WITHDIST'), - ('withcoord', b'WITHCOORD'), - ('withhash', b'WITHHASH'), - ('store_dist', b'STOREDIST')): + ("withdist", b"WITHDIST"), + ("withcoord", b"WITHCOORD"), + ("withhash", b"WITHHASH"), + ("store_dist", b"STOREDIST"), + ): if kwargs[arg_name]: pieces.append(byte_repr) @@ -3963,6 +4229,7 @@ class ModuleCommands: Redis Module commands. see: https://redis.io/topics/modules-intro """ + def module_load(self, path, *args): """ Loads the module from ``path``. @@ -3971,7 +4238,7 @@ class ModuleCommands: For more information check https://redis.io/commands/module-load """ - return self.execute_command('MODULE LOAD', path, *args) + return self.execute_command("MODULE LOAD", path, *args) def module_unload(self, name): """ @@ -3980,7 +4247,7 @@ class ModuleCommands: For more information check https://redis.io/commands/module-unload """ - return self.execute_command('MODULE UNLOAD', name) + return self.execute_command("MODULE UNLOAD", name) def module_list(self): """ @@ -3989,7 +4256,7 @@ class ModuleCommands: For more information check https://redis.io/commands/module-list """ - return self.execute_command('MODULE LIST') + return self.execute_command("MODULE LIST") def command_info(self): raise NotImplementedError( @@ -3997,13 +4264,13 @@ class ModuleCommands: ) def command_count(self): - return self.execute_command('COMMAND COUNT') + return self.execute_command("COMMAND COUNT") def command_getkeys(self, *args): - return self.execute_command('COMMAND GETKEYS', *args) + return self.execute_command("COMMAND GETKEYS", *args) def command(self): - return self.execute_command('COMMAND') + return self.execute_command("COMMAND") class Script: @@ -4030,6 +4297,7 @@ class Script: args = tuple(keys) + tuple(args) # make sure the Redis server knows about the script from redis.client import Pipeline + if isinstance(client, Pipeline): # Make sure the pipeline can register the script before executing. client.scripts.add(self) @@ -4047,6 +4315,7 @@ class BitFieldOperation: """ Command builder for BITFIELD commands. 
""" + def __init__(self, client, key, default_overflow=None): self.client = client self.key = key @@ -4058,7 +4327,7 @@ class BitFieldOperation: Reset the state of the instance to when it was constructed """ self.operations = [] - self._last_overflow = 'WRAP' + self._last_overflow = "WRAP" self.overflow(self._default_overflow or self._last_overflow) def overflow(self, overflow): @@ -4071,7 +4340,7 @@ class BitFieldOperation: overflow = overflow.upper() if overflow != self._last_overflow: self._last_overflow = overflow - self.operations.append(('OVERFLOW', overflow)) + self.operations.append(("OVERFLOW", overflow)) return self def incrby(self, fmt, offset, increment, overflow=None): @@ -4091,7 +4360,7 @@ class BitFieldOperation: if overflow is not None: self.overflow(overflow) - self.operations.append(('INCRBY', fmt, offset, increment)) + self.operations.append(("INCRBY", fmt, offset, increment)) return self def get(self, fmt, offset): @@ -4104,7 +4373,7 @@ class BitFieldOperation: fmt='u8', offset='#2', the offset will be 16. :returns: a :py:class:`BitFieldOperation` instance. """ - self.operations.append(('GET', fmt, offset)) + self.operations.append(("GET", fmt, offset)) return self def set(self, fmt, offset, value): @@ -4118,12 +4387,12 @@ class BitFieldOperation: :param int value: value to set at the given position. :returns: a :py:class:`BitFieldOperation` instance. """ - self.operations.append(('SET', fmt, offset, value)) + self.operations.append(("SET", fmt, offset, value)) return self @property def command(self): - cmd = ['BITFIELD', self.key] + cmd = ["BITFIELD", self.key] for ops in self.operations: cmd.extend(ops) return cmd @@ -4140,19 +4409,57 @@ class BitFieldOperation: return self.client.execute_command(*command) -class DataAccessCommands(BasicKeyCommands, ListCommands, - ScanCommands, SetCommands, StreamCommands, - SortedSetCommands, - HyperlogCommands, HashCommands, GeoCommands, - ): +class ClusterCommands: + """ + Class for Redis Cluster commands + """ + + def cluster(self, cluster_arg, *args, **kwargs): + return self.execute_command(f"CLUSTER {cluster_arg.upper()}", *args, **kwargs) + + def readwrite(self, **kwargs): + """ + Disables read queries for a connection to a Redis Cluster slave node. + + For more information check https://redis.io/commands/readwrite + """ + return self.execute_command("READWRITE", **kwargs) + + def readonly(self, **kwargs): + """ + Enables read queries for a connection to a Redis Cluster replica node. + + For more information check https://redis.io/commands/readonly + """ + return self.execute_command("READONLY", **kwargs) + + +class DataAccessCommands( + BasicKeyCommands, + HyperlogCommands, + HashCommands, + GeoCommands, + ListCommands, + ScanCommands, + SetCommands, + StreamCommands, + SortedSetCommands, +): """ A class containing all of the implemented data access redis commands. This class is to be used as a mixin. """ -class CoreCommands(ACLCommands, DataAccessCommands, ManagementCommands, - ModuleCommands, PubSubCommands, ScriptCommands): +class CoreCommands( + ACLCommands, + ClusterCommands, + DataAccessCommands, + ManagementCommands, + ModuleCommands, + PubSubCommands, + ScriptCommands, +): """ A class containing all of the implemented redis commands. This class is to be used as a mixin. 
diff --git a/redis/commands/graph/__init__.py b/redis/commands/graph/__init__.py new file mode 100644 index 0000000..7b9972a --- /dev/null +++ b/redis/commands/graph/__init__.py @@ -0,0 +1,162 @@ +from ..helpers import quote_string, random_string, stringify_param_value +from .commands import GraphCommands +from .edge import Edge # noqa +from .node import Node # noqa +from .path import Path # noqa + + +class Graph(GraphCommands): + """ + Graph, collection of nodes and edges. + """ + + def __init__(self, client, name=random_string()): + """ + Create a new graph. + """ + self.NAME = name # Graph key + self.client = client + self.execute_command = client.execute_command + + self.nodes = {} + self.edges = [] + self._labels = [] # List of node labels. + self._properties = [] # List of properties. + self._relationshipTypes = [] # List of relation types. + self.version = 0 # Graph version + + @property + def name(self): + return self.NAME + + def _clear_schema(self): + self._labels = [] + self._properties = [] + self._relationshipTypes = [] + + def _refresh_schema(self): + self._clear_schema() + self._refresh_labels() + self._refresh_relations() + self._refresh_attributes() + + def _refresh_labels(self): + lbls = self.labels() + + # Unpack data. + self._labels = [None] * len(lbls) + for i, l in enumerate(lbls): + self._labels[i] = l[0] + + def _refresh_relations(self): + rels = self.relationshipTypes() + + # Unpack data. + self._relationshipTypes = [None] * len(rels) + for i, r in enumerate(rels): + self._relationshipTypes[i] = r[0] + + def _refresh_attributes(self): + props = self.propertyKeys() + + # Unpack data. + self._properties = [None] * len(props) + for i, p in enumerate(props): + self._properties[i] = p[0] + + def get_label(self, idx): + """ + Returns a label by it's index + + Args: + + idx: + The index of the label + """ + try: + label = self._labels[idx] + except IndexError: + # Refresh labels. + self._refresh_labels() + label = self._labels[idx] + return label + + def get_relation(self, idx): + """ + Returns a relationship type by it's index + + Args: + + idx: + The index of the relation + """ + try: + relationship_type = self._relationshipTypes[idx] + except IndexError: + # Refresh relationship types. + self._refresh_relations() + relationship_type = self._relationshipTypes[idx] + return relationship_type + + def get_property(self, idx): + """ + Returns a property by it's index + + Args: + + idx: + The index of the property + """ + try: + propertie = self._properties[idx] + except IndexError: + # Refresh properties. + self._refresh_attributes() + propertie = self._properties[idx] + return propertie + + def add_node(self, node): + """ + Adds a node to the graph. + """ + if node.alias is None: + node.alias = random_string() + self.nodes[node.alias] = node + + def add_edge(self, edge): + """ + Adds an edge to the graph. + """ + if not (self.nodes[edge.src_node.alias] and self.nodes[edge.dest_node.alias]): + raise AssertionError("Both edge's end must be in the graph") + + self.edges.append(edge) + + def _build_params_header(self, params): + if not isinstance(params, dict): + raise TypeError("'params' must be a dict") + # Header starts with "CYPHER" + params_header = "CYPHER " + for key, value in params.items(): + params_header += str(key) + "=" + stringify_param_value(value) + " " + return params_header + + # Procedures. 
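The Graph container defined above is normally used together with the Node and Edge classes added later in this diff; a minimal usage sketch, assuming a server with the RedisGraph module loaded on localhost:6379 and an illustrative graph name and data set:

import redis
from redis.commands.graph import Edge, Node

r = redis.Redis(host="localhost", port=6379)  # connection details are illustrative
g = r.graph("social")                         # Graph accessor from redismodules.py

# Build a small graph locally, then flush it to the server in one CREATE query.
john = Node(label="person", properties={"name": "John", "age": 33})
japan = Node(label="country", properties={"name": "Japan"})
g.add_node(john)
g.add_node(japan)
g.add_edge(Edge(john, "visited", japan, properties={"purpose": "pleasure"}))
g.commit()

# Parameterized query; params are rendered into a CYPHER header by
# _build_params_header() before the query is issued with GRAPH.QUERY.
res = g.query(
    "MATCH (p:person {name: $name})-[:visited]->(c:country) RETURN c.name",
    params={"name": "John"},
)
print(res.result_set)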
+ def call_procedure(self, procedure, *args, read_only=False, **kwagrs): + args = [quote_string(arg) for arg in args] + q = f"CALL {procedure}({','.join(args)})" + + y = kwagrs.get("y", None) + if y: + q += f" YIELD {','.join(y)}" + + return self.query(q, read_only=read_only) + + def labels(self): + return self.call_procedure("db.labels", read_only=True).result_set + + def relationshipTypes(self): + return self.call_procedure("db.relationshipTypes", read_only=True).result_set + + def propertyKeys(self): + return self.call_procedure("db.propertyKeys", read_only=True).result_set diff --git a/redis/commands/graph/commands.py b/redis/commands/graph/commands.py new file mode 100644 index 0000000..f0c1d68 --- /dev/null +++ b/redis/commands/graph/commands.py @@ -0,0 +1,200 @@ +from redis import DataError +from redis.exceptions import ResponseError + +from .exceptions import VersionMismatchException +from .query_result import QueryResult + + +class GraphCommands: + def commit(self): + """ + Create entire graph. + For more information see `CREATE <https://oss.redis.com/redisgraph/master/commands/#create>`_. # noqa + """ + if len(self.nodes) == 0 and len(self.edges) == 0: + return None + + query = "CREATE " + for _, node in self.nodes.items(): + query += str(node) + "," + + query += ",".join([str(edge) for edge in self.edges]) + + # Discard leading comma. + if query[-1] == ",": + query = query[:-1] + + return self.query(query) + + def query(self, q, params=None, timeout=None, read_only=False, profile=False): + """ + Executes a query against the graph. + For more information see `GRAPH.QUERY <https://oss.redis.com/redisgraph/master/commands/#graphquery>`_. # noqa + + Args: + + ------- + q : + The query. + params : dict + Query parameters. + timeout : int + Maximum runtime for read queries in milliseconds. + read_only : bool + Executes a readonly query if set to True. + profile : bool + Return details on results produced by and time + spent in each operation. + """ + + # maintain original 'q' + query = q + + # handle query parameters + if params is not None: + query = self._build_params_header(params) + query + + # construct query command + # ask for compact result-set format + # specify known graph version + if profile: + cmd = "GRAPH.PROFILE" + else: + cmd = "GRAPH.RO_QUERY" if read_only else "GRAPH.QUERY" + command = [cmd, self.name, query, "--compact"] + + # include timeout is specified + if timeout: + if not isinstance(timeout, int): + raise Exception("Timeout argument must be a positive integer") + command += ["timeout", timeout] + + # issue query + try: + response = self.execute_command(*command) + return QueryResult(self, response, profile) + except ResponseError as e: + if "wrong number of arguments" in str(e): + print( + "Note: RedisGraph Python requires server version 2.2.8 or above" + ) # noqa + if "unknown command" in str(e) and read_only: + # `GRAPH.RO_QUERY` is unavailable in older versions. + return self.query(q, params, timeout, read_only=False) + raise e + except VersionMismatchException as e: + # client view over the graph schema is out of sync + # set client version and refresh local schema + self.version = e.version + self._refresh_schema() + # re-issue query + return self.query(q, params, timeout, read_only) + + def merge(self, pattern): + """ + Merge pattern. + For more information see `MERGE <https://oss.redis.com/redisgraph/master/commands/#merge>`_. 
# noqa + """ + query = "MERGE " + query += str(pattern) + + return self.query(query) + + def delete(self): + """ + Deletes graph. + For more information see `DELETE <https://oss.redis.com/redisgraph/master/commands/#delete>`_. # noqa + """ + self._clear_schema() + return self.execute_command("GRAPH.DELETE", self.name) + + # declared here, to override the built in redis.db.flush() + def flush(self): + """ + Commit the graph and reset the edges and the nodes to zero length. + """ + self.commit() + self.nodes = {} + self.edges = [] + + def explain(self, query, params=None): + """ + Get the execution plan for given query, + Returns an array of operations. + For more information see `GRAPH.EXPLAIN <https://oss.redis.com/redisgraph/master/commands/#graphexplain>`_. # noqa + + Args: + + ------- + query: + The query that will be executed. + params: dict + Query parameters. + """ + if params is not None: + query = self._build_params_header(params) + query + + plan = self.execute_command("GRAPH.EXPLAIN", self.name, query) + return "\n".join(plan) + + def bulk(self, **kwargs): + """Internal only. Not supported.""" + raise NotImplementedError( + "GRAPH.BULK is internal only. " + "Use https://github.com/redisgraph/redisgraph-bulk-loader." + ) + + def profile(self, query): + """ + Execute a query and produce an execution plan augmented with metrics + for each operation's execution. Return a string representation of a + query execution plan, with details on results produced by and time + spent in each operation. + For more information see `GRAPH.PROFILE <https://oss.redis.com/redisgraph/master/commands/#graphprofile>`_. # noqa + """ + return self.query(query, profile=True) + + def slowlog(self): + """ + Get a list containing up to 10 of the slowest queries issued + against the given graph ID. + For more information see `GRAPH.SLOWLOG <https://oss.redis.com/redisgraph/master/commands/#graphslowlog>`_. # noqa + + Each item in the list has the following structure: + 1. A unix timestamp at which the log entry was processed. + 2. The issued command. + 3. The issued query. + 4. The amount of time needed for its execution, in milliseconds. + """ + return self.execute_command("GRAPH.SLOWLOG", self.name) + + def config(self, name, value=None, set=False): + """ + Retrieve or update a RedisGraph configuration. + For more information see `GRAPH.CONFIG <https://oss.redis.com/redisgraph/master/commands/#graphconfig>`_. # noqa + + Args: + + name : str + The name of the configuration + value : + The value we want to ser (can be used only when `set` is on) + set : bool + Turn on to set a configuration. Default behavior is get. + """ + params = ["SET" if set else "GET", name] + if value is not None: + if set: + params.append(value) + else: + raise DataError( + "``value`` can be provided only when ``set`` is True" + ) # noqa + return self.execute_command("GRAPH.CONFIG", *params) + + def list_keys(self): + """ + Lists all graph keys in the keyspace. + For more information see `GRAPH.LIST <https://oss.redis.com/redisgraph/master/commands/#graphlist>`_. # noqa + """ + return self.execute_command("GRAPH.LIST") diff --git a/redis/commands/graph/edge.py b/redis/commands/graph/edge.py new file mode 100644 index 0000000..b334293 --- /dev/null +++ b/redis/commands/graph/edge.py @@ -0,0 +1,87 @@ +from ..helpers import quote_string +from .node import Node + + +class Edge: + """ + An edge connecting two nodes. + """ + + def __init__(self, src_node, relation, dest_node, edge_id=None, properties=None): + """ + Create a new edge. 
+ """ + if src_node is None or dest_node is None: + # NOTE(bors-42): It makes sense to change AssertionError to + # ValueError here + raise AssertionError("Both src_node & dest_node must be provided") + + self.id = edge_id + self.relation = relation or "" + self.properties = properties or {} + self.src_node = src_node + self.dest_node = dest_node + + def toString(self): + res = "" + if self.properties: + props = ",".join( + key + ":" + str(quote_string(val)) + for key, val in sorted(self.properties.items()) + ) + res += "{" + props + "}" + + return res + + def __str__(self): + # Source node. + if isinstance(self.src_node, Node): + res = str(self.src_node) + else: + res = "()" + + # Edge + res += "-[" + if self.relation: + res += ":" + self.relation + if self.properties: + props = ",".join( + key + ":" + str(quote_string(val)) + for key, val in sorted(self.properties.items()) + ) + res += "{" + props + "}" + res += "]->" + + # Dest node. + if isinstance(self.dest_node, Node): + res += str(self.dest_node) + else: + res += "()" + + return res + + def __eq__(self, rhs): + # Quick positive check, if both IDs are set. + if self.id is not None and rhs.id is not None and self.id == rhs.id: + return True + + # Source and destination nodes should match. + if self.src_node != rhs.src_node: + return False + + if self.dest_node != rhs.dest_node: + return False + + # Relation should match. + if self.relation != rhs.relation: + return False + + # Quick check for number of properties. + if len(self.properties) != len(rhs.properties): + return False + + # Compare properties. + if self.properties != rhs.properties: + return False + + return True diff --git a/redis/commands/graph/exceptions.py b/redis/commands/graph/exceptions.py new file mode 100644 index 0000000..4bbac10 --- /dev/null +++ b/redis/commands/graph/exceptions.py @@ -0,0 +1,3 @@ +class VersionMismatchException(Exception): + def __init__(self, version): + self.version = version diff --git a/redis/commands/graph/node.py b/redis/commands/graph/node.py new file mode 100644 index 0000000..47e4eeb --- /dev/null +++ b/redis/commands/graph/node.py @@ -0,0 +1,84 @@ +from ..helpers import quote_string + + +class Node: + """ + A node within the graph. + """ + + def __init__(self, node_id=None, alias=None, label=None, properties=None): + """ + Create a new node. 
+ """ + self.id = node_id + self.alias = alias + if isinstance(label, list): + label = [inner_label for inner_label in label if inner_label != ""] + + if ( + label is None + or label == "" + or (isinstance(label, list) and len(label) == 0) + ): + self.label = None + self.labels = None + elif isinstance(label, str): + self.label = label + self.labels = [label] + elif isinstance(label, list) and all( + [isinstance(inner_label, str) for inner_label in label] + ): + self.label = label[0] + self.labels = label + else: + raise AssertionError( + "label should be either None, " "string or a list of strings" + ) + + self.properties = properties or {} + + def toString(self): + res = "" + if self.properties: + props = ",".join( + key + ":" + str(quote_string(val)) + for key, val in sorted(self.properties.items()) + ) + res += "{" + props + "}" + + return res + + def __str__(self): + res = "(" + if self.alias: + res += self.alias + if self.labels: + res += ":" + ":".join(self.labels) + if self.properties: + props = ",".join( + key + ":" + str(quote_string(val)) + for key, val in sorted(self.properties.items()) + ) + res += "{" + props + "}" + res += ")" + + return res + + def __eq__(self, rhs): + # Quick positive check, if both IDs are set. + if self.id is not None and rhs.id is not None and self.id != rhs.id: + return False + + # Label should match. + if self.label != rhs.label: + return False + + # Quick check for number of properties. + if len(self.properties) != len(rhs.properties): + return False + + # Compare properties. + if self.properties != rhs.properties: + return False + + return True diff --git a/redis/commands/graph/path.py b/redis/commands/graph/path.py new file mode 100644 index 0000000..6f2214a --- /dev/null +++ b/redis/commands/graph/path.py @@ -0,0 +1,74 @@ +from .edge import Edge +from .node import Node + + +class Path: + def __init__(self, nodes, edges): + if not (isinstance(nodes, list) and isinstance(edges, list)): + raise TypeError("nodes and edges must be list") + + self._nodes = nodes + self._edges = edges + self.append_type = Node + + @classmethod + def new_empty_path(cls): + return cls([], []) + + def nodes(self): + return self._nodes + + def edges(self): + return self._edges + + def get_node(self, index): + return self._nodes[index] + + def get_relationship(self, index): + return self._edges[index] + + def first_node(self): + return self._nodes[0] + + def last_node(self): + return self._nodes[-1] + + def edge_count(self): + return len(self._edges) + + def nodes_count(self): + return len(self._nodes) + + def add_node(self, node): + if not isinstance(node, self.append_type): + raise AssertionError("Add Edge before adding Node") + self._nodes.append(node) + self.append_type = Edge + return self + + def add_edge(self, edge): + if not isinstance(edge, self.append_type): + raise AssertionError("Add Node before adding Edge") + self._edges.append(edge) + self.append_type = Node + return self + + def __eq__(self, other): + return self.nodes() == other.nodes() and self.edges() == other.edges() + + def __str__(self): + res = "<" + edge_count = self.edge_count() + for i in range(0, edge_count): + node_id = self.get_node(i).id + res += "(" + str(node_id) + ")" + edge = self.get_relationship(i) + res += ( + "-[" + str(int(edge.id)) + "]->" + if edge.src_node == node_id + else "<-[" + str(int(edge.id)) + "]-" + ) + node_id = self.get_node(edge_count).id + res += "(" + str(node_id) + ")" + res += ">" + return res diff --git a/redis/commands/graph/query_result.py 
b/redis/commands/graph/query_result.py new file mode 100644 index 0000000..e9d9f4d --- /dev/null +++ b/redis/commands/graph/query_result.py @@ -0,0 +1,362 @@ +from collections import OrderedDict + +# from prettytable import PrettyTable +from redis import ResponseError + +from .edge import Edge +from .exceptions import VersionMismatchException +from .node import Node +from .path import Path + +LABELS_ADDED = "Labels added" +NODES_CREATED = "Nodes created" +NODES_DELETED = "Nodes deleted" +RELATIONSHIPS_DELETED = "Relationships deleted" +PROPERTIES_SET = "Properties set" +RELATIONSHIPS_CREATED = "Relationships created" +INDICES_CREATED = "Indices created" +INDICES_DELETED = "Indices deleted" +CACHED_EXECUTION = "Cached execution" +INTERNAL_EXECUTION_TIME = "internal execution time" + +STATS = [ + LABELS_ADDED, + NODES_CREATED, + PROPERTIES_SET, + RELATIONSHIPS_CREATED, + NODES_DELETED, + RELATIONSHIPS_DELETED, + INDICES_CREATED, + INDICES_DELETED, + CACHED_EXECUTION, + INTERNAL_EXECUTION_TIME, +] + + +class ResultSetColumnTypes: + COLUMN_UNKNOWN = 0 + COLUMN_SCALAR = 1 + COLUMN_NODE = 2 # Unused as of RedisGraph v2.1.0, retained for backwards compatibility. # noqa + COLUMN_RELATION = 3 # Unused as of RedisGraph v2.1.0, retained for backwards compatibility. # noqa + + +class ResultSetScalarTypes: + VALUE_UNKNOWN = 0 + VALUE_NULL = 1 + VALUE_STRING = 2 + VALUE_INTEGER = 3 + VALUE_BOOLEAN = 4 + VALUE_DOUBLE = 5 + VALUE_ARRAY = 6 + VALUE_EDGE = 7 + VALUE_NODE = 8 + VALUE_PATH = 9 + VALUE_MAP = 10 + VALUE_POINT = 11 + + +class QueryResult: + def __init__(self, graph, response, profile=False): + """ + A class that represents a result of the query operation. + + Args: + + graph: + The graph on which the query was executed. + response: + The response from the server. + profile: + A boolean indicating if the query command was "GRAPH.PROFILE" + """ + self.graph = graph + self.header = [] + self.result_set = [] + + # in case of an error an exception will be raised + self._check_for_errors(response) + + if len(response) == 1: + self.parse_statistics(response[0]) + elif profile: + self.parse_profile(response) + else: + # start by parsing statistics, matches the one we have + self.parse_statistics(response[-1]) # Last element. + self.parse_results(response) + + def _check_for_errors(self, response): + if isinstance(response[0], ResponseError): + error = response[0] + if str(error) == "version mismatch": + version = response[1] + error = VersionMismatchException(version) + raise error + + # If we encountered a run-time error, the last response + # element will be an exception + if isinstance(response[-1], ResponseError): + raise response[-1] + + def parse_results(self, raw_result_set): + self.header = self.parse_header(raw_result_set) + + # Empty header. + if len(self.header) == 0: + return + + self.result_set = self.parse_records(raw_result_set) + + def parse_statistics(self, raw_statistics): + self.statistics = {} + + # decode statistics + for idx, stat in enumerate(raw_statistics): + if isinstance(stat, bytes): + raw_statistics[idx] = stat.decode() + + for s in STATS: + v = self._get_value(s, raw_statistics) + if v is not None: + self.statistics[s] = v + + def parse_header(self, raw_result_set): + # An array of column name/column type pairs. 
+ header = raw_result_set[0] + return header + + def parse_records(self, raw_result_set): + records = [] + result_set = raw_result_set[1] + for row in result_set: + record = [] + for idx, cell in enumerate(row): + if self.header[idx][0] == ResultSetColumnTypes.COLUMN_SCALAR: # noqa + record.append(self.parse_scalar(cell)) + elif self.header[idx][0] == ResultSetColumnTypes.COLUMN_NODE: # noqa + record.append(self.parse_node(cell)) + elif ( + self.header[idx][0] == ResultSetColumnTypes.COLUMN_RELATION + ): # noqa + record.append(self.parse_edge(cell)) + else: + print("Unknown column type.\n") + records.append(record) + + return records + + def parse_entity_properties(self, props): + # [[name, value type, value] X N] + properties = {} + for prop in props: + prop_name = self.graph.get_property(prop[0]) + prop_value = self.parse_scalar(prop[1:]) + properties[prop_name] = prop_value + + return properties + + def parse_string(self, cell): + if isinstance(cell, bytes): + return cell.decode() + elif not isinstance(cell, str): + return str(cell) + else: + return cell + + def parse_node(self, cell): + # Node ID (integer), + # [label string offset (integer)], + # [[name, value type, value] X N] + + node_id = int(cell[0]) + labels = None + if len(cell[1]) > 0: + labels = [] + for inner_label in cell[1]: + labels.append(self.graph.get_label(inner_label)) + properties = self.parse_entity_properties(cell[2]) + return Node(node_id=node_id, label=labels, properties=properties) + + def parse_edge(self, cell): + # Edge ID (integer), + # reltype string offset (integer), + # src node ID offset (integer), + # dest node ID offset (integer), + # [[name, value, value type] X N] + + edge_id = int(cell[0]) + relation = self.graph.get_relation(cell[1]) + src_node_id = int(cell[2]) + dest_node_id = int(cell[3]) + properties = self.parse_entity_properties(cell[4]) + return Edge( + src_node_id, relation, dest_node_id, edge_id=edge_id, properties=properties + ) + + def parse_path(self, cell): + nodes = self.parse_scalar(cell[0]) + edges = self.parse_scalar(cell[1]) + return Path(nodes, edges) + + def parse_map(self, cell): + m = OrderedDict() + n_entries = len(cell) + + # A map is an array of key value pairs. + # 1. key (string) + # 2. 
array: (value type, value) + for i in range(0, n_entries, 2): + key = self.parse_string(cell[i]) + m[key] = self.parse_scalar(cell[i + 1]) + + return m + + def parse_point(self, cell): + p = {} + # A point is received an array of the form: [latitude, longitude] + # It is returned as a map of the form: {"latitude": latitude, "longitude": longitude} # noqa + p["latitude"] = float(cell[0]) + p["longitude"] = float(cell[1]) + return p + + def parse_scalar(self, cell): + scalar_type = int(cell[0]) + value = cell[1] + scalar = None + + if scalar_type == ResultSetScalarTypes.VALUE_NULL: + scalar = None + + elif scalar_type == ResultSetScalarTypes.VALUE_STRING: + scalar = self.parse_string(value) + + elif scalar_type == ResultSetScalarTypes.VALUE_INTEGER: + scalar = int(value) + + elif scalar_type == ResultSetScalarTypes.VALUE_BOOLEAN: + value = value.decode() if isinstance(value, bytes) else value + if value == "true": + scalar = True + elif value == "false": + scalar = False + else: + print("Unknown boolean type\n") + + elif scalar_type == ResultSetScalarTypes.VALUE_DOUBLE: + scalar = float(value) + + elif scalar_type == ResultSetScalarTypes.VALUE_ARRAY: + # array variable is introduced only for readability + scalar = array = value + for i in range(len(array)): + scalar[i] = self.parse_scalar(array[i]) + + elif scalar_type == ResultSetScalarTypes.VALUE_NODE: + scalar = self.parse_node(value) + + elif scalar_type == ResultSetScalarTypes.VALUE_EDGE: + scalar = self.parse_edge(value) + + elif scalar_type == ResultSetScalarTypes.VALUE_PATH: + scalar = self.parse_path(value) + + elif scalar_type == ResultSetScalarTypes.VALUE_MAP: + scalar = self.parse_map(value) + + elif scalar_type == ResultSetScalarTypes.VALUE_POINT: + scalar = self.parse_point(value) + + elif scalar_type == ResultSetScalarTypes.VALUE_UNKNOWN: + print("Unknown scalar type\n") + + return scalar + + def parse_profile(self, response): + self.result_set = [x[0 : x.index(",")].strip() for x in response] + + # """Prints the data from the query response: + # 1. First row result_set contains the columns names. + # Thus the first row in PrettyTable will contain the + # columns. + # 2. The row after that will contain the data returned, + # or 'No Data returned' if there is none. + # 3. Prints the statistics of the query. 
+ # """ + + # def pretty_print(self): + # if not self.is_empty(): + # header = [col[1] for col in self.header] + # tbl = PrettyTable(header) + + # for row in self.result_set: + # record = [] + # for idx, cell in enumerate(row): + # if type(cell) is Node: + # record.append(cell.toString()) + # elif type(cell) is Edge: + # record.append(cell.toString()) + # else: + # record.append(cell) + # tbl.add_row(record) + + # if len(self.result_set) == 0: + # tbl.add_row(['No data returned.']) + + # print(str(tbl) + '\n') + + # for stat in self.statistics: + # print("%s %s" % (stat, self.statistics[stat])) + + def is_empty(self): + return len(self.result_set) == 0 + + @staticmethod + def _get_value(prop, statistics): + for stat in statistics: + if prop in stat: + return float(stat.split(": ")[1].split(" ")[0]) + + return None + + def _get_stat(self, stat): + return self.statistics[stat] if stat in self.statistics else 0 + + @property + def labels_added(self): + return self._get_stat(LABELS_ADDED) + + @property + def nodes_created(self): + return self._get_stat(NODES_CREATED) + + @property + def nodes_deleted(self): + return self._get_stat(NODES_DELETED) + + @property + def properties_set(self): + return self._get_stat(PROPERTIES_SET) + + @property + def relationships_created(self): + return self._get_stat(RELATIONSHIPS_CREATED) + + @property + def relationships_deleted(self): + return self._get_stat(RELATIONSHIPS_DELETED) + + @property + def indices_created(self): + return self._get_stat(INDICES_CREATED) + + @property + def indices_deleted(self): + return self._get_stat(INDICES_DELETED) + + @property + def cached_execution(self): + return self._get_stat(CACHED_EXECUTION) == 1 + + @property + def run_time_ms(self): + return self._get_stat(INTERNAL_EXECUTION_TIME) diff --git a/redis/commands/helpers.py b/redis/commands/helpers.py index dc5705b..afb4f9f 100644 --- a/redis/commands/helpers.py +++ b/redis/commands/helpers.py @@ -1,3 +1,4 @@ +import copy import random import string @@ -22,7 +23,7 @@ def list_or_args(keys, args): def nativestr(x): """Return the decoded binary string, or a string, depending on type.""" r = x.decode("utf-8", "replace") if isinstance(x, bytes) else x - if r == 'null': + if r == "null": return return r @@ -58,14 +59,14 @@ def parse_list_to_dict(response): res = {} for i in range(0, len(response), 2): if isinstance(response[i], list): - res['Child iterators'].append(parse_list_to_dict(response[i])) - elif isinstance(response[i+1], list): - res['Child iterators'] = [parse_list_to_dict(response[i+1])] + res["Child iterators"].append(parse_list_to_dict(response[i])) + elif isinstance(response[i + 1], list): + res["Child iterators"] = [parse_list_to_dict(response[i + 1])] else: try: - res[response[i]] = float(response[i+1]) + res[response[i]] = float(response[i + 1]) except (TypeError, ValueError): - res[response[i]] = response[i+1] + res[response[i]] = response[i + 1] return res @@ -114,3 +115,40 @@ def quote_string(v): v = v.replace('"', '\\"') return f'"{v}"' + + +def decodeDictKeys(obj): + """Decode the keys of the given dictionary with utf-8.""" + newobj = copy.copy(obj) + for k in obj.keys(): + if isinstance(k, bytes): + newobj[k.decode("utf-8")] = newobj[k] + newobj.pop(k) + return newobj + + +def stringify_param_value(value): + """ + Turn a parameter value into a string suitable for the params header of + a Cypher command. + You may pass any value that would be accepted by `json.dumps()`. + + Ways in which output differs from that of `str()`: + * Strings are quoted. 
+ * None --> "null". + * In dictionaries, keys are _not_ quoted. + + :param value: The parameter value to be turned into a string. + :return: string + """ + + if isinstance(value, str): + return quote_string(value) + elif value is None: + return "null" + elif isinstance(value, (list, tuple)): + return f'[{",".join(map(stringify_param_value, value))}]' + elif isinstance(value, dict): + return f'{{{",".join(f"{k}:{stringify_param_value(v)}" for k, v in value.items())}}}' # noqa + else: + return str(value) diff --git a/redis/commands/json/__init__.py b/redis/commands/json/__init__.py index d634dbd..12c0648 100644 --- a/redis/commands/json/__init__.py +++ b/redis/commands/json/__init__.py @@ -1,12 +1,10 @@ -from json import JSONDecoder, JSONEncoder, JSONDecodeError +from json import JSONDecodeError, JSONDecoder, JSONEncoder + +import redis -from .decoders import ( - decode_list, - bulk_of_jsons, -) from ..helpers import nativestr from .commands import JSONCommands -import redis +from .decoders import bulk_of_jsons, decode_list class JSON(JSONCommands): diff --git a/redis/commands/json/commands.py b/redis/commands/json/commands.py index 1affaaf..e7f07b6 100644 --- a/redis/commands/json/commands.py +++ b/redis/commands/json/commands.py @@ -1,8 +1,10 @@ -from .path import Path -from .decoders import decode_dict_keys from deprecated import deprecated + from redis.exceptions import DataError +from .decoders import decode_dict_keys +from .path import Path + class JSONCommands: """json commands.""" @@ -29,8 +31,7 @@ class JSONCommands: For more information: https://oss.redis.com/redisjson/commands/#jsonarrindex """ # noqa return self.execute_command( - "JSON.ARRINDEX", name, str(path), self._encode(scalar), - start, stop + "JSON.ARRINDEX", name, str(path), self._encode(scalar), start, stop ) def arrinsert(self, name, path, index, *args): @@ -66,8 +67,7 @@ class JSONCommands: For more information: https://oss.redis.com/redisjson/commands/#jsonarrtrim """ # noqa - return self.execute_command("JSON.ARRTRIM", name, str(path), - start, stop) + return self.execute_command("JSON.ARRTRIM", name, str(path), start, stop) def type(self, name, path=Path.rootPath()): """Get the type of the JSON value under ``path`` from key ``name``. @@ -109,7 +109,7 @@ class JSONCommands: "JSON.NUMINCRBY", name, str(path), self._encode(number) ) - @deprecated(version='4.0.0', reason='deprecated since redisjson 1.0.0') + @deprecated(version="4.0.0", reason="deprecated since redisjson 1.0.0") def nummultby(self, name, path, number): """Multiply the numeric (integer or floating point) JSON value under ``path`` at key ``name`` with the provided ``number``. @@ -218,7 +218,7 @@ class JSONCommands: ``name``. 
For more information: https://oss.redis.com/redisjson/commands/#jsonstrlen - """ # noqa + """ # noqa pieces = [name] if path is not None: pieces.append(str(path)) @@ -240,9 +240,7 @@ class JSONCommands: For more information: https://oss.redis.com/redisjson/commands/#jsonstrappend """ # noqa pieces = [name, str(path), self._encode(value)] - return self.execute_command( - "JSON.STRAPPEND", *pieces - ) + return self.execute_command("JSON.STRAPPEND", *pieces) def debug(self, subcommand, key=None, path=Path.rootPath()): """Return the memory usage in bytes of a value under ``path`` from @@ -252,8 +250,7 @@ class JSONCommands: """ # noqa valid_subcommands = ["MEMORY", "HELP"] if subcommand not in valid_subcommands: - raise DataError("The only valid subcommands are ", - str(valid_subcommands)) + raise DataError("The only valid subcommands are ", str(valid_subcommands)) pieces = [subcommand] if subcommand == "MEMORY": if key is None: @@ -262,17 +259,20 @@ class JSONCommands: pieces.append(str(path)) return self.execute_command("JSON.DEBUG", *pieces) - @deprecated(version='4.0.0', - reason='redisjson-py supported this, call get directly.') + @deprecated( + version="4.0.0", reason="redisjson-py supported this, call get directly." + ) def jsonget(self, *args, **kwargs): return self.get(*args, **kwargs) - @deprecated(version='4.0.0', - reason='redisjson-py supported this, call get directly.') + @deprecated( + version="4.0.0", reason="redisjson-py supported this, call get directly." + ) def jsonmget(self, *args, **kwargs): return self.mget(*args, **kwargs) - @deprecated(version='4.0.0', - reason='redisjson-py supported this, call get directly.') + @deprecated( + version="4.0.0", reason="redisjson-py supported this, call get directly." + ) def jsonset(self, *args, **kwargs): return self.set(*args, **kwargs) diff --git a/redis/commands/json/decoders.py b/redis/commands/json/decoders.py index b19395c..b938471 100644 --- a/redis/commands/json/decoders.py +++ b/redis/commands/json/decoders.py @@ -1,6 +1,7 @@ -from ..helpers import nativestr -import re import copy +import re + +from ..helpers import nativestr def bulk_of_jsons(d): @@ -33,7 +34,7 @@ def unstring(obj): One can't simply call int/float in a try/catch because there is a semantic difference between (for example) 15.0 and 15. """ - floatreg = '^\\d+.\\d+$' + floatreg = "^\\d+.\\d+$" match = re.findall(floatreg, obj) if match != []: return float(match[0]) diff --git a/redis/commands/parser.py b/redis/commands/parser.py index 26b190c..dadf3c6 100644 --- a/redis/commands/parser.py +++ b/redis/commands/parser.py @@ -1,7 +1,4 @@ -from redis.exceptions import ( - RedisError, - ResponseError -) +from redis.exceptions import RedisError, ResponseError from redis.utils import str_if_bytes @@ -13,6 +10,7 @@ class CommandsParser: 'movablekeys', and these commands' keys are determined by the command 'COMMAND GETKEYS'. 
""" + def __init__(self, redis_connection): self.initialized = False self.commands = {} @@ -51,20 +49,24 @@ class CommandsParser: ) command = self.commands.get(cmd_name) - if 'movablekeys' in command['flags']: + if "movablekeys" in command["flags"]: keys = self._get_moveable_keys(redis_conn, *args) - elif 'pubsub' in command['flags']: + elif "pubsub" in command["flags"]: keys = self._get_pubsub_keys(*args) else: - if command['step_count'] == 0 and command['first_key_pos'] == 0 \ - and command['last_key_pos'] == 0: + if ( + command["step_count"] == 0 + and command["first_key_pos"] == 0 + and command["last_key_pos"] == 0 + ): # The command doesn't have keys in it return None - last_key_pos = command['last_key_pos'] + last_key_pos = command["last_key_pos"] if last_key_pos < 0: last_key_pos = len(args) - abs(last_key_pos) - keys_pos = list(range(command['first_key_pos'], last_key_pos + 1, - command['step_count'])) + keys_pos = list( + range(command["first_key_pos"], last_key_pos + 1, command["step_count"]) + ) keys = [args[pos] for pos in keys_pos] return keys @@ -77,11 +79,13 @@ class CommandsParser: pieces = pieces + cmd_name.split() pieces = pieces + list(args[1:]) try: - keys = redis_conn.execute_command('COMMAND GETKEYS', *pieces) + keys = redis_conn.execute_command("COMMAND GETKEYS", *pieces) except ResponseError as e: message = e.__str__() - if 'Invalid arguments' in message or \ - 'The command has no key arguments' in message: + if ( + "Invalid arguments" in message + or "The command has no key arguments" in message + ): return None else: raise e @@ -99,18 +103,17 @@ class CommandsParser: return None args = [str_if_bytes(arg) for arg in args] command = args[0].upper() - if command == 'PUBSUB': + if command == "PUBSUB": # the second argument is a part of the command name, e.g. # ['PUBSUB', 'NUMSUB', 'foo']. pubsub_type = args[1].upper() - if pubsub_type in ['CHANNELS', 'NUMSUB']: + if pubsub_type in ["CHANNELS", "NUMSUB"]: keys = args[2:] - elif command in ['SUBSCRIBE', 'PSUBSCRIBE', 'UNSUBSCRIBE', - 'PUNSUBSCRIBE']: + elif command in ["SUBSCRIBE", "PSUBSCRIBE", "UNSUBSCRIBE", "PUNSUBSCRIBE"]: # format example: # SUBSCRIBE channel [channel ...] keys = list(args[1:]) - elif command == 'PUBLISH': + elif command == "PUBLISH": # format example: # PUBLISH channel message keys = [args[1]] diff --git a/redis/commands/redismodules.py b/redis/commands/redismodules.py index 5f629fb..eafd650 100644 --- a/redis/commands/redismodules.py +++ b/redis/commands/redismodules.py @@ -1,4 +1,4 @@ -from json import JSONEncoder, JSONDecoder +from json import JSONDecoder, JSONEncoder class RedisModuleCommands: @@ -7,21 +7,18 @@ class RedisModuleCommands: """ def json(self, encoder=JSONEncoder(), decoder=JSONDecoder()): - """Access the json namespace, providing support for redis json. - """ + """Access the json namespace, providing support for redis json.""" from .json import JSON - jj = JSON( - client=self, - encoder=encoder, - decoder=decoder) + + jj = JSON(client=self, encoder=encoder, decoder=decoder) return jj def ft(self, index_name="idx"): - """Access the search namespace, providing support for redis search. 
- """ + """Access the search namespace, providing support for redis search.""" from .search import Search + s = Search(client=self, index_name=index_name) return s @@ -31,5 +28,56 @@ class RedisModuleCommands: """ from .timeseries import TimeSeries + s = TimeSeries(client=self) return s + + def bf(self): + """Access the bloom namespace.""" + + from .bf import BFBloom + + bf = BFBloom(client=self) + return bf + + def cf(self): + """Access the bloom namespace.""" + + from .bf import CFBloom + + cf = CFBloom(client=self) + return cf + + def cms(self): + """Access the bloom namespace.""" + + from .bf import CMSBloom + + cms = CMSBloom(client=self) + return cms + + def topk(self): + """Access the bloom namespace.""" + + from .bf import TOPKBloom + + topk = TOPKBloom(client=self) + return topk + + def tdigest(self): + """Access the bloom namespace.""" + + from .bf import TDigestBloom + + tdigest = TDigestBloom(client=self) + return tdigest + + def graph(self, index_name="idx"): + """Access the timeseries namespace, providing support for + redis timeseries data. + """ + + from .graph import Graph + + g = Graph(client=self, name=index_name) + return g diff --git a/redis/commands/search/__init__.py b/redis/commands/search/__init__.py index a30cebe..94bc037 100644 --- a/redis/commands/search/__init__.py +++ b/redis/commands/search/__init__.py @@ -35,7 +35,7 @@ class Search(SearchCommands): replace=False, partial=False, no_create=False, - **fields + **fields, ): """ Add a document to the batch query @@ -49,7 +49,7 @@ class Search(SearchCommands): replace=replace, partial=partial, no_create=no_create, - **fields + **fields, ) self.current_chunk += 1 self.total += 1 diff --git a/redis/commands/search/aggregation.py b/redis/commands/search/aggregation.py index b1ac6b0..061e69c 100644 --- a/redis/commands/search/aggregation.py +++ b/redis/commands/search/aggregation.py @@ -82,75 +82,6 @@ class Desc(SortDirection): DIRSTRING = "DESC" -class Group: - """ - This object automatically created in the `AggregateRequest.group_by()` - """ - - def __init__(self, fields, reducers): - if not reducers: - raise ValueError("Need at least one reducer") - - fields = [fields] if isinstance(fields, str) else fields - reducers = [reducers] if isinstance(reducers, Reducer) else reducers - - self.fields = fields - self.reducers = reducers - self.limit = Limit() - - def build_args(self): - ret = ["GROUPBY", str(len(self.fields))] - ret.extend(self.fields) - for reducer in self.reducers: - ret += ["REDUCE", reducer.NAME, str(len(reducer.args))] - ret.extend(reducer.args) - if reducer._alias is not None: - ret += ["AS", reducer._alias] - return ret - - -class Projection: - """ - This object automatically created in the `AggregateRequest.apply()` - """ - - def __init__(self, projector, alias=None): - self.alias = alias - self.projector = projector - - def build_args(self): - ret = ["APPLY", self.projector] - if self.alias is not None: - ret += ["AS", self.alias] - - return ret - - -class SortBy: - """ - This object automatically created in the `AggregateRequest.sort_by()` - """ - - def __init__(self, fields, max=0): - self.fields = fields - self.max = max - - def build_args(self): - fields_args = [] - for f in self.fields: - if isinstance(f, SortDirection): - fields_args += [f.field, f.DIRSTRING] - else: - fields_args += [f] - - ret = ["SORTBY", str(len(fields_args))] - ret.extend(fields_args) - if self.max > 0: - ret += ["MAX", str(self.max)] - - return ret - - class AggregateRequest: """ Aggregation request which can be passed 
to `Client.aggregate`. @@ -172,6 +103,7 @@ class AggregateRequest: self._query = query self._aggregateplan = [] self._loadfields = [] + self._loadall = False self._limit = Limit() self._max = 0 self._with_schema = False @@ -185,9 +117,13 @@ class AggregateRequest: ### Parameters - - **fields**: One or more fields in the format of `@field` + - **fields**: If fields not specified, all the fields will be loaded. + Otherwise, fields should be given in the format of `@field`. """ - self._loadfields.extend(fields) + if fields: + self._loadfields.extend(fields) + else: + self._loadall = True return self def group_by(self, fields, *reducers): @@ -202,9 +138,17 @@ class AggregateRequest: - **reducers**: One or more reducers. Reducers may be found in the `aggregation` module. """ - group = Group(fields, reducers) - self._aggregateplan.extend(group.build_args()) + fields = [fields] if isinstance(fields, str) else fields + reducers = [reducers] if isinstance(reducers, Reducer) else reducers + ret = ["GROUPBY", str(len(fields)), *fields] + for reducer in reducers: + ret += ["REDUCE", reducer.NAME, str(len(reducer.args))] + ret.extend(reducer.args) + if reducer._alias is not None: + ret += ["AS", reducer._alias] + + self._aggregateplan.extend(ret) return self def apply(self, **kwexpr): @@ -218,8 +162,10 @@ class AggregateRequest: expression itself, for example `apply(square_root="sqrt(@foo)")` """ for alias, expr in kwexpr.items(): - projection = Projection(expr, alias) - self._aggregateplan.extend(projection.build_args()) + ret = ["APPLY", expr] + if alias is not None: + ret += ["AS", alias] + self._aggregateplan.extend(ret) return self @@ -265,8 +211,7 @@ class AggregateRequest: `sort_by()` instead. """ - limit = Limit(offset, num) - self._limit = limit + self._limit = Limit(offset, num) return self def sort_by(self, *fields, **kwargs): @@ -300,10 +245,20 @@ class AggregateRequest: if isinstance(fields, (str, SortDirection)): fields = [fields] + fields_args = [] + for f in fields: + if isinstance(f, SortDirection): + fields_args += [f.field, f.DIRSTRING] + else: + fields_args += [f] + + ret = ["SORTBY", str(len(fields_args))] + ret.extend(fields_args) max = kwargs.get("max", 0) - sortby = SortBy(fields, max) + if max > 0: + ret += ["MAX", str(max)] - self._aggregateplan.extend(sortby.build_args()) + self._aggregateplan.extend(ret) return self def filter(self, expressions): @@ -358,7 +313,10 @@ class AggregateRequest: if self._cursor: ret += self._cursor - if self._loadfields: + if self._loadall: + ret.append("LOAD") + ret.append("*") + elif self._loadfields: ret.append("LOAD") ret.append(str(len(self._loadfields))) ret.extend(self._loadfields) diff --git a/redis/commands/search/commands.py b/redis/commands/search/commands.py index 553bc39..4ec6fc9 100644 --- a/redis/commands/search/commands.py +++ b/redis/commands/search/commands.py @@ -1,13 +1,13 @@ import itertools import time -from .document import Document -from .result import Result -from .query import Query +from ..helpers import parse_to_dict from ._util import to_string from .aggregation import AggregateRequest, AggregateResult, Cursor +from .document import Document +from .query import Query +from .result import Result from .suggestion import SuggestionParser -from ..helpers import parse_to_dict NUMERIC = "NUMERIC" @@ -148,7 +148,7 @@ class SearchCommands: partial=False, language=None, no_create=False, - **fields + **fields, ): """ Internal add_document used for both batch and single doc indexing @@ -211,7 +211,7 @@ class SearchCommands: 
partial=False, language=None, no_create=False, - **fields + **fields, ): """ Add a single document to the index. @@ -253,7 +253,7 @@ class SearchCommands: partial=partial, language=language, no_create=no_create, - **fields + **fields, ) def add_document_hash( @@ -274,7 +274,7 @@ class SearchCommands: - **replace**: if True, and the document already is in the index, we perform an update and reindex the document - **language**: Specify the language used for document tokenization. - + For more information: https://oss.redis.com/redisearch/Commands/#ftaddhash """ # noqa return self._add_document_hash( @@ -294,7 +294,7 @@ class SearchCommands: - **delete_actual_document**: if set to True, RediSearch also delete the actual document if it is in the index - + For more information: https://oss.redis.com/redisearch/Commands/#ftdel """ # noqa args = [DEL_CMD, self.index_name, doc_id] @@ -453,7 +453,7 @@ class SearchCommands: cmd = [PROFILE_CMD, self.index_name, ""] if limited: cmd.append("LIMITED") - cmd.append('QUERY') + cmd.append("QUERY") if isinstance(query, AggregateRequest): cmd[2] = "AGGREGATE" @@ -462,19 +462,20 @@ class SearchCommands: cmd[2] = "SEARCH" cmd += query.get_args() else: - raise ValueError("Must provide AggregateRequest object or " - "Query object.") + raise ValueError("Must provide AggregateRequest object or " "Query object.") res = self.execute_command(*cmd) if isinstance(query, AggregateRequest): result = self._get_AggregateResult(res[0], query, query._cursor) else: - result = Result(res[0], - not query._no_content, - duration=(time.time() - st) * 1000.0, - has_payload=query._with_payloads, - with_scores=query._with_scores,) + result = Result( + res[0], + not query._no_content, + duration=(time.time() - st) * 1000.0, + has_payload=query._with_payloads, + with_scores=query._with_scores, + ) return result, parse_to_dict(res[1]) @@ -535,8 +536,7 @@ class SearchCommands: # ] # } corrections[_correction[1]] = [ - {"score": _item[0], "suggestion": _item[1]} - for _item in _correction[2] + {"score": _item[0], "suggestion": _item[1]} for _item in _correction[2] ] return corrections @@ -704,8 +704,7 @@ class SearchCommands: return self.execute_command(SUGDEL_COMMAND, key, string) def sugget( - self, key, prefix, fuzzy=False, num=10, with_scores=False, - with_payloads=False + self, key, prefix, fuzzy=False, num=10, with_scores=False, with_payloads=False ): """ Get a list of suggestions from the AutoCompleter, for a given prefix. @@ -769,7 +768,7 @@ class SearchCommands: If set to true, we do not scan and index. terms : The terms. 
- + For more information: https://oss.redis.com/redisearch/Commands/#ftsynupdate """ # noqa cmd = [SYNUPDATE_CMD, self.index_name, groupid] diff --git a/redis/commands/search/field.py b/redis/commands/search/field.py index 076c872..69e3908 100644 --- a/redis/commands/search/field.py +++ b/redis/commands/search/field.py @@ -9,8 +9,7 @@ class Field: NOINDEX = "NOINDEX" AS = "AS" - def __init__(self, name, args=[], sortable=False, - no_index=False, as_name=None): + def __init__(self, name, args=[], sortable=False, no_index=False, as_name=None): self.name = name self.args = args self.args_suffix = list() @@ -47,8 +46,7 @@ class TextField(Field): def __init__( self, name, weight=1.0, no_stem=False, phonetic_matcher=None, **kwargs ): - Field.__init__(self, name, - args=[Field.TEXT, Field.WEIGHT, weight], **kwargs) + Field.__init__(self, name, args=[Field.TEXT, Field.WEIGHT, weight], **kwargs) if no_stem: Field.append_arg(self, self.NOSTEM) diff --git a/redis/commands/search/query.py b/redis/commands/search/query.py index 5534f7b..2bb8347 100644 --- a/redis/commands/search/query.py +++ b/redis/commands/search/query.py @@ -62,11 +62,9 @@ class Query: def _mk_field_list(self, fields): if not fields: return [] - return \ - [fields] if isinstance(fields, str) else list(fields) + return [fields] if isinstance(fields, str) else list(fields) - def summarize(self, fields=None, context_len=None, - num_frags=None, sep=None): + def summarize(self, fields=None, context_len=None, num_frags=None, sep=None): """ Return an abridged format of the field, containing only the segments of the field which contain the matching term(s). @@ -300,8 +298,7 @@ class NumericFilter(Filter): INF = "+inf" NEG_INF = "-inf" - def __init__(self, field, minval, maxval, minExclusive=False, - maxExclusive=False): + def __init__(self, field, minval, maxval, minExclusive=False, maxExclusive=False): args = [ minval if not minExclusive else f"({minval}", maxval if not maxExclusive else f"({maxval}", diff --git a/redis/commands/search/querystring.py b/redis/commands/search/querystring.py index ffba542..1da0387 100644 --- a/redis/commands/search/querystring.py +++ b/redis/commands/search/querystring.py @@ -15,8 +15,7 @@ def between(a, b, inclusive_min=True, inclusive_max=True): """ Indicate that value is a numeric range """ - return RangeValue(a, b, inclusive_min=inclusive_min, - inclusive_max=inclusive_max) + return RangeValue(a, b, inclusive_min=inclusive_min, inclusive_max=inclusive_max) def equal(n): @@ -200,9 +199,7 @@ class Node: return [BaseNode(f"@{key}:{vals[0].to_string()}")] if not vals[0].combinable: return [BaseNode(f"@{key}:{v.to_string()}") for v in vals] - s = BaseNode( - f"@{key}:({self.JOINSTR.join(v.to_string() for v in vals)})" - ) + s = BaseNode(f"@{key}:({self.JOINSTR.join(v.to_string() for v in vals)})") return [s] @classmethod diff --git a/redis/commands/search/result.py b/redis/commands/search/result.py index 57ba53d..5f4aca6 100644 --- a/redis/commands/search/result.py +++ b/redis/commands/search/result.py @@ -1,5 +1,5 @@ -from .document import Document from ._util import to_string +from .document import Document class Result: diff --git a/redis/commands/search/suggestion.py b/redis/commands/search/suggestion.py index 6d295a6..5d1eba6 100644 --- a/redis/commands/search/suggestion.py +++ b/redis/commands/search/suggestion.py @@ -46,8 +46,6 @@ class SuggestionParser: def __iter__(self): for i in range(0, len(self._sugs), self.sugsize): ss = self._sugs[i] - score = float(self._sugs[i + self._scoreidx]) \ - if 
self.with_scores else 1.0 - payload = self._sugs[i + self._payloadidx] \ - if self.with_payloads else None + score = float(self._sugs[i + self._scoreidx]) if self.with_scores else 1.0 + payload = self._sugs[i + self._payloadidx] if self.with_payloads else None yield Suggestion(ss, score, payload) diff --git a/redis/commands/sentinel.py b/redis/commands/sentinel.py index 1f02984..a9b06c2 100644 --- a/redis/commands/sentinel.py +++ b/redis/commands/sentinel.py @@ -9,41 +9,39 @@ class SentinelCommands: def sentinel(self, *args): "Redis Sentinel's SENTINEL command." - warnings.warn( - DeprecationWarning('Use the individual sentinel_* methods')) + warnings.warn(DeprecationWarning("Use the individual sentinel_* methods")) def sentinel_get_master_addr_by_name(self, service_name): "Returns a (host, port) pair for the given ``service_name``" - return self.execute_command('SENTINEL GET-MASTER-ADDR-BY-NAME', - service_name) + return self.execute_command("SENTINEL GET-MASTER-ADDR-BY-NAME", service_name) def sentinel_master(self, service_name): "Returns a dictionary containing the specified masters state." - return self.execute_command('SENTINEL MASTER', service_name) + return self.execute_command("SENTINEL MASTER", service_name) def sentinel_masters(self): "Returns a list of dictionaries containing each master's state." - return self.execute_command('SENTINEL MASTERS') + return self.execute_command("SENTINEL MASTERS") def sentinel_monitor(self, name, ip, port, quorum): "Add a new master to Sentinel to be monitored" - return self.execute_command('SENTINEL MONITOR', name, ip, port, quorum) + return self.execute_command("SENTINEL MONITOR", name, ip, port, quorum) def sentinel_remove(self, name): "Remove a master from Sentinel's monitoring" - return self.execute_command('SENTINEL REMOVE', name) + return self.execute_command("SENTINEL REMOVE", name) def sentinel_sentinels(self, service_name): "Returns a list of sentinels for ``service_name``" - return self.execute_command('SENTINEL SENTINELS', service_name) + return self.execute_command("SENTINEL SENTINELS", service_name) def sentinel_set(self, name, option, value): "Set Sentinel monitoring parameters for a given master" - return self.execute_command('SENTINEL SET', name, option, value) + return self.execute_command("SENTINEL SET", name, option, value) def sentinel_slaves(self, service_name): "Returns a list of slaves for ``service_name``" - return self.execute_command('SENTINEL SLAVES', service_name) + return self.execute_command("SENTINEL SLAVES", service_name) def sentinel_reset(self, pattern): """ @@ -54,7 +52,7 @@ class SentinelCommands: failover in progress), and removes every slave and sentinel already discovered and associated with the master. """ - return self.execute_command('SENTINEL RESET', pattern, once=True) + return self.execute_command("SENTINEL RESET", pattern, once=True) def sentinel_failover(self, new_master_name): """ @@ -63,7 +61,7 @@ class SentinelCommands: configuration will be published so that the other Sentinels will update their configurations). """ - return self.execute_command('SENTINEL FAILOVER', new_master_name) + return self.execute_command("SENTINEL FAILOVER", new_master_name) def sentinel_ckquorum(self, new_master_name): """ @@ -74,9 +72,7 @@ class SentinelCommands: This command should be used in monitoring systems to check if a Sentinel deployment is ok. 
""" - return self.execute_command('SENTINEL CKQUORUM', - new_master_name, - once=True) + return self.execute_command("SENTINEL CKQUORUM", new_master_name, once=True) def sentinel_flushconfig(self): """ @@ -94,4 +90,4 @@ class SentinelCommands: This command works even if the previous configuration file is completely missing. """ - return self.execute_command('SENTINEL FLUSHCONFIG') + return self.execute_command("SENTINEL FLUSHCONFIG") diff --git a/redis/commands/timeseries/__init__.py b/redis/commands/timeseries/__init__.py index 5ce538f..5b1f151 100644 --- a/redis/commands/timeseries/__init__.py +++ b/redis/commands/timeseries/__init__.py @@ -1,19 +1,12 @@ import redis.client -from .utils import ( - parse_range, - parse_get, - parse_m_range, - parse_m_get, -) -from .info import TSInfo from ..helpers import parse_to_list from .commands import ( ALTER_CMD, CREATE_CMD, CREATERULE_CMD, - DELETERULE_CMD, DEL_CMD, + DELETERULE_CMD, GET_CMD, INFO_CMD, MGET_CMD, @@ -24,6 +17,8 @@ from .commands import ( REVRANGE_CMD, TimeSeriesCommands, ) +from .info import TSInfo +from .utils import parse_get, parse_m_get, parse_m_range, parse_range class TimeSeries(TimeSeriesCommands): diff --git a/redis/commands/timeseries/commands.py b/redis/commands/timeseries/commands.py index 460ba76..c86e0b9 100644 --- a/redis/commands/timeseries/commands.py +++ b/redis/commands/timeseries/commands.py @@ -1,6 +1,5 @@ from redis.exceptions import DataError - ADD_CMD = "TS.ADD" ALTER_CMD = "TS.ALTER" CREATERULE_CMD = "TS.CREATERULE" @@ -58,7 +57,7 @@ class TimeSeriesCommands: - 'min': only override if the value is lower than the existing value. - 'max': only override if the value is higher than the existing value. When this is not set, the server-wide default will be used. - + For more information: https://oss.redis.com/redistimeseries/commands/#tscreate """ # noqa retention_msecs = kwargs.get("retention_msecs", None) @@ -81,7 +80,7 @@ class TimeSeriesCommands: For more information see The parameters are the same as TS.CREATE. - + For more information: https://oss.redis.com/redistimeseries/commands/#tsalter """ # noqa retention_msecs = kwargs.get("retention_msecs", None) @@ -129,7 +128,7 @@ class TimeSeriesCommands: - 'min': only override if the value is lower than the existing value. - 'max': only override if the value is higher than the existing value. When this is not set, the server-wide default will be used. - + For more information: https://oss.redis.com/redistimeseries/master/commands/#tsadd """ # noqa retention_msecs = kwargs.get("retention_msecs", None) @@ -276,13 +275,7 @@ class TimeSeriesCommands: """ # noqa return self.execute_command(DEL_CMD, key, from_time, to_time) - def createrule( - self, - source_key, - dest_key, - aggregation_type, - bucket_size_msec - ): + def createrule(self, source_key, dest_key, aggregation_type, bucket_size_msec): """ Create a compaction rule from values added to `source_key` into `dest_key`. 
Aggregating for `bucket_size_msec` where an `aggregation_type` can be @@ -321,11 +314,7 @@ class TimeSeriesCommands: """Create TS.RANGE and TS.REVRANGE arguments.""" params = [key, from_time, to_time] self._appendFilerByTs(params, filter_by_ts) - self._appendFilerByValue( - params, - filter_by_min_value, - filter_by_max_value - ) + self._appendFilerByValue(params, filter_by_min_value, filter_by_max_value) self._appendCount(params, count) self._appendAlign(params, align) self._appendAggregation(params, aggregation_type, bucket_size_msec) @@ -471,11 +460,7 @@ class TimeSeriesCommands: """Create TS.MRANGE and TS.MREVRANGE arguments.""" params = [from_time, to_time] self._appendFilerByTs(params, filter_by_ts) - self._appendFilerByValue( - params, - filter_by_min_value, - filter_by_max_value - ) + self._appendFilerByValue(params, filter_by_min_value, filter_by_max_value) self._appendCount(params, count) self._appendAlign(params, align) self._appendAggregation(params, aggregation_type, bucket_size_msec) @@ -654,7 +639,7 @@ class TimeSeriesCommands: return self.execute_command(MREVRANGE_CMD, *params) def get(self, key): - """ # noqa + """# noqa Get the last sample of `key`. For more information: https://oss.redis.com/redistimeseries/master/commands/#tsget @@ -662,7 +647,7 @@ class TimeSeriesCommands: return self.execute_command(GET_CMD, key) def mget(self, filters, with_labels=False): - """ # noqa + """# noqa Get the last samples matching the specific `filter`. For more information: https://oss.redis.com/redistimeseries/master/commands/#tsmget @@ -674,7 +659,7 @@ class TimeSeriesCommands: return self.execute_command(MGET_CMD, *params) def info(self, key): - """ # noqa + """# noqa Get information of `key`. For more information: https://oss.redis.com/redistimeseries/master/commands/#tsinfo @@ -682,7 +667,7 @@ class TimeSeriesCommands: return self.execute_command(INFO_CMD, key) def queryindex(self, filters): - """ # noqa + """# noqa Get all the keys matching the `filter` list. For more information: https://oss.redis.com/redistimeseries/master/commands/#tsqueryindex diff --git a/redis/commands/timeseries/info.py b/redis/commands/timeseries/info.py index 2b8acd1..fba7f09 100644 --- a/redis/commands/timeseries/info.py +++ b/redis/commands/timeseries/info.py @@ -1,5 +1,5 @@ -from .utils import list_to_dict from ..helpers import nativestr +from .utils import list_to_dict class TSInfo: diff --git a/redis/commands/timeseries/utils.py b/redis/commands/timeseries/utils.py index c33b7c5..c49b040 100644 --- a/redis/commands/timeseries/utils.py +++ b/redis/commands/timeseries/utils.py @@ -2,9 +2,7 @@ from ..helpers import nativestr def list_to_dict(aList): - return { - nativestr(aList[i][0]): nativestr(aList[i][1]) - for i in range(len(aList))} + return {nativestr(aList[i][0]): nativestr(aList[i][1]) for i in range(len(aList))} def parse_range(response): @@ -16,9 +14,7 @@ def parse_m_range(response): """Parse multi range response. Used by TS.MRANGE and TS.MREVRANGE.""" res = [] for item in response: - res.append( - {nativestr(item[0]): - [list_to_dict(item[1]), parse_range(item[2])]}) + res.append({nativestr(item[0]): [list_to_dict(item[1]), parse_range(item[2])]}) return sorted(res, key=lambda d: list(d.keys())) @@ -34,8 +30,7 @@ def parse_m_get(response): res = [] for item in response: if not item[2]: - res.append( - {nativestr(item[0]): [list_to_dict(item[1]), None, None]}) + res.append({nativestr(item[0]): [list_to_dict(item[1]), None, None]}) else: res.append( { |