diff options
author | Avital Fine <79420960+AvitalFineRedis@users.noreply.github.com> | 2021-12-01 19:03:18 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-12-01 20:03:18 +0200 |
commit | 42101fc383829bb179a266420132d3f862861972 (patch) | |
tree | a13d70d174de9f68282d8a7f38cd3b4a26d464b4 | |
parent | d4252277a9dafed5af34b3f40ed7a57fc952d273 (diff) | |
download | redis-py-42101fc383829bb179a266420132d3f862861972.tar.gz |
Integrate RedisBloom support (#1683)
Co-authored-by: Chayim I. Kirshen <c@kirshen.com>
-rw-r--r-- | redis/commands/bf/__init__.py | 204 | ||||
-rw-r--r-- | redis/commands/bf/commands.py | 494 | ||||
-rw-r--r-- | redis/commands/bf/info.py | 85 | ||||
-rw-r--r-- | redis/commands/redismodules.py | 40 | ||||
-rw-r--r-- | setup.py | 1 | ||||
-rw-r--r-- | tests/test_bloom.py | 383 | ||||
-rw-r--r-- | tox.ini | 1 |
7 files changed, 1208 insertions, 0 deletions
diff --git a/redis/commands/bf/__init__.py b/redis/commands/bf/__init__.py new file mode 100644 index 0000000..f34e11d --- /dev/null +++ b/redis/commands/bf/__init__.py @@ -0,0 +1,204 @@ +from redis.client import bool_ok + +from ..helpers import parse_to_list +from .commands import * # noqa +from .info import BFInfo, CFInfo, CMSInfo, TDigestInfo, TopKInfo + + +class AbstractBloom(object): + """ + The client allows to interact with RedisBloom and use all of + it's functionality. + + - BF for Bloom Filter + - CF for Cuckoo Filter + - CMS for Count-Min Sketch + - TOPK for TopK Data Structure + - TDIGEST for estimate rank statistics + """ + + @staticmethod + def appendItems(params, items): + """Append ITEMS to params.""" + params.extend(["ITEMS"]) + params += items + + @staticmethod + def appendError(params, error): + """Append ERROR to params.""" + if error is not None: + params.extend(["ERROR", error]) + + @staticmethod + def appendCapacity(params, capacity): + """Append CAPACITY to params.""" + if capacity is not None: + params.extend(["CAPACITY", capacity]) + + @staticmethod + def appendExpansion(params, expansion): + """Append EXPANSION to params.""" + if expansion is not None: + params.extend(["EXPANSION", expansion]) + + @staticmethod + def appendNoScale(params, noScale): + """Append NONSCALING tag to params.""" + if noScale is not None: + params.extend(["NONSCALING"]) + + @staticmethod + def appendWeights(params, weights): + """Append WEIGHTS to params.""" + if len(weights) > 0: + params.append("WEIGHTS") + params += weights + + @staticmethod + def appendNoCreate(params, noCreate): + """Append NOCREATE tag to params.""" + if noCreate is not None: + params.extend(["NOCREATE"]) + + @staticmethod + def appendItemsAndIncrements(params, items, increments): + """Append pairs of items and increments to params.""" + for i in range(len(items)): + params.append(items[i]) + params.append(increments[i]) + + @staticmethod + def appendValuesAndWeights(params, items, weights): + """Append pairs of items and weights to params.""" + for i in range(len(items)): + params.append(items[i]) + params.append(weights[i]) + + @staticmethod + def appendMaxIterations(params, max_iterations): + """Append MAXITERATIONS to params.""" + if max_iterations is not None: + params.extend(["MAXITERATIONS", max_iterations]) + + @staticmethod + def appendBucketSize(params, bucket_size): + """Append BUCKETSIZE to params.""" + if bucket_size is not None: + params.extend(["BUCKETSIZE", bucket_size]) + + +class CMSBloom(CMSCommands, AbstractBloom): + def __init__(self, client, **kwargs): + """Create a new RedisBloom client.""" + # Set the module commands' callbacks + MODULE_CALLBACKS = { + CMS_INITBYDIM: bool_ok, + CMS_INITBYPROB: bool_ok, + # CMS_INCRBY: spaceHolder, + # CMS_QUERY: spaceHolder, + CMS_MERGE: bool_ok, + CMS_INFO: CMSInfo, + } + + self.client = client + self.commandmixin = CMSCommands + self.execute_command = client.execute_command + + for k, v in MODULE_CALLBACKS.items(): + self.client.set_response_callback(k, v) + + +class TOPKBloom(TOPKCommands, AbstractBloom): + def __init__(self, client, **kwargs): + """Create a new RedisBloom client.""" + # Set the module commands' callbacks + MODULE_CALLBACKS = { + TOPK_RESERVE: bool_ok, + TOPK_ADD: parse_to_list, + TOPK_INCRBY: parse_to_list, + # TOPK_QUERY: spaceHolder, + # TOPK_COUNT: spaceHolder, + TOPK_LIST: parse_to_list, + TOPK_INFO: TopKInfo, + } + + self.client = client + self.commandmixin = TOPKCommands + self.execute_command = client.execute_command + + for k, v in MODULE_CALLBACKS.items(): + self.client.set_response_callback(k, v) + + +class CFBloom(CFCommands, AbstractBloom): + def __init__(self, client, **kwargs): + """Create a new RedisBloom client.""" + # Set the module commands' callbacks + MODULE_CALLBACKS = { + CF_RESERVE: bool_ok, + # CF_ADD: spaceHolder, + # CF_ADDNX: spaceHolder, + # CF_INSERT: spaceHolder, + # CF_INSERTNX: spaceHolder, + # CF_EXISTS: spaceHolder, + # CF_DEL: spaceHolder, + # CF_COUNT: spaceHolder, + # CF_SCANDUMP: spaceHolder, + # CF_LOADCHUNK: spaceHolder, + CF_INFO: CFInfo, + } + + self.client = client + self.commandmixin = CFCommands + self.execute_command = client.execute_command + + for k, v in MODULE_CALLBACKS.items(): + self.client.set_response_callback(k, v) + + +class TDigestBloom(TDigestCommands, AbstractBloom): + def __init__(self, client, **kwargs): + """Create a new RedisBloom client.""" + # Set the module commands' callbacks + MODULE_CALLBACKS = { + TDIGEST_CREATE: bool_ok, + # TDIGEST_RESET: bool_ok, + # TDIGEST_ADD: spaceHolder, + # TDIGEST_MERGE: spaceHolder, + TDIGEST_CDF: float, + TDIGEST_QUANTILE: float, + TDIGEST_MIN: float, + TDIGEST_MAX: float, + TDIGEST_INFO: TDigestInfo, + } + + self.client = client + self.commandmixin = TDigestCommands + self.execute_command = client.execute_command + + for k, v in MODULE_CALLBACKS.items(): + self.client.set_response_callback(k, v) + + +class BFBloom(BFCommands, AbstractBloom): + def __init__(self, client, **kwargs): + """Create a new RedisBloom client.""" + # Set the module commands' callbacks + MODULE_CALLBACKS = { + BF_RESERVE: bool_ok, + # BF_ADD: spaceHolder, + # BF_MADD: spaceHolder, + # BF_INSERT: spaceHolder, + # BF_EXISTS: spaceHolder, + # BF_MEXISTS: spaceHolder, + # BF_SCANDUMP: spaceHolder, + # BF_LOADCHUNK: spaceHolder, + BF_INFO: BFInfo, + } + + self.client = client + self.commandmixin = BFCommands + self.execute_command = client.execute_command + + for k, v in MODULE_CALLBACKS.items(): + self.client.set_response_callback(k, v) diff --git a/redis/commands/bf/commands.py b/redis/commands/bf/commands.py new file mode 100644 index 0000000..3c8bf7f --- /dev/null +++ b/redis/commands/bf/commands.py @@ -0,0 +1,494 @@ +from redis.client import NEVER_DECODE +from redis.exceptions import ModuleError +from redis.utils import HIREDIS_AVAILABLE + +BF_RESERVE = "BF.RESERVE" +BF_ADD = "BF.ADD" +BF_MADD = "BF.MADD" +BF_INSERT = "BF.INSERT" +BF_EXISTS = "BF.EXISTS" +BF_MEXISTS = "BF.MEXISTS" +BF_SCANDUMP = "BF.SCANDUMP" +BF_LOADCHUNK = "BF.LOADCHUNK" +BF_INFO = "BF.INFO" + +CF_RESERVE = "CF.RESERVE" +CF_ADD = "CF.ADD" +CF_ADDNX = "CF.ADDNX" +CF_INSERT = "CF.INSERT" +CF_INSERTNX = "CF.INSERTNX" +CF_EXISTS = "CF.EXISTS" +CF_DEL = "CF.DEL" +CF_COUNT = "CF.COUNT" +CF_SCANDUMP = "CF.SCANDUMP" +CF_LOADCHUNK = "CF.LOADCHUNK" +CF_INFO = "CF.INFO" + +CMS_INITBYDIM = "CMS.INITBYDIM" +CMS_INITBYPROB = "CMS.INITBYPROB" +CMS_INCRBY = "CMS.INCRBY" +CMS_QUERY = "CMS.QUERY" +CMS_MERGE = "CMS.MERGE" +CMS_INFO = "CMS.INFO" + +TOPK_RESERVE = "TOPK.RESERVE" +TOPK_ADD = "TOPK.ADD" +TOPK_INCRBY = "TOPK.INCRBY" +TOPK_QUERY = "TOPK.QUERY" +TOPK_COUNT = "TOPK.COUNT" +TOPK_LIST = "TOPK.LIST" +TOPK_INFO = "TOPK.INFO" + +TDIGEST_CREATE = "TDIGEST.CREATE" +TDIGEST_RESET = "TDIGEST.RESET" +TDIGEST_ADD = "TDIGEST.ADD" +TDIGEST_MERGE = "TDIGEST.MERGE" +TDIGEST_CDF = "TDIGEST.CDF" +TDIGEST_QUANTILE = "TDIGEST.QUANTILE" +TDIGEST_MIN = "TDIGEST.MIN" +TDIGEST_MAX = "TDIGEST.MAX" +TDIGEST_INFO = "TDIGEST.INFO" + + +class BFCommands: + """RedisBloom commands.""" + + # region Bloom Filter Functions + def create(self, key, errorRate, capacity, expansion=None, noScale=None): + """ + Create a new Bloom Filter `key` with desired probability of false positives + `errorRate` expected entries to be inserted as `capacity`. + Default expansion value is 2. By default, filter is auto-scaling. + For more information see `BF.RESERVE <https://oss.redis.com/redisbloom/master/Bloom_Commands/#bfreserve>`_. + """ # noqa + params = [key, errorRate, capacity] + self.appendExpansion(params, expansion) + self.appendNoScale(params, noScale) + return self.execute_command(BF_RESERVE, *params) + + def add(self, key, item): + """ + Add to a Bloom Filter `key` an `item`. + For more information see `BF.ADD <https://oss.redis.com/redisbloom/master/Bloom_Commands/#bfadd>`_. + """ # noqa + params = [key, item] + return self.execute_command(BF_ADD, *params) + + def madd(self, key, *items): + """ + Add to a Bloom Filter `key` multiple `items`. + For more information see `BF.MADD <https://oss.redis.com/redisbloom/master/Bloom_Commands/#bfmadd>`_. + """ # noqa + params = [key] + params += items + return self.execute_command(BF_MADD, *params) + + def insert( + self, + key, + items, + capacity=None, + error=None, + noCreate=None, + expansion=None, + noScale=None, + ): + """ + Add to a Bloom Filter `key` multiple `items`. + + If `nocreate` remain `None` and `key` does not exist, a new Bloom Filter + `key` will be created with desired probability of false positives `errorRate` + and expected entries to be inserted as `size`. + For more information see `BF.INSERT <https://oss.redis.com/redisbloom/master/Bloom_Commands/#bfinsert>`_. + """ # noqa + params = [key] + self.appendCapacity(params, capacity) + self.appendError(params, error) + self.appendExpansion(params, expansion) + self.appendNoCreate(params, noCreate) + self.appendNoScale(params, noScale) + self.appendItems(params, items) + + return self.execute_command(BF_INSERT, *params) + + def exists(self, key, item): + """ + Check whether an `item` exists in Bloom Filter `key`. + For more information see `BF.EXISTS <https://oss.redis.com/redisbloom/master/Bloom_Commands/#bfexists>`_. + """ # noqa + params = [key, item] + return self.execute_command(BF_EXISTS, *params) + + def mexists(self, key, *items): + """ + Check whether `items` exist in Bloom Filter `key`. + For more information see `BF.MEXISTS <https://oss.redis.com/redisbloom/master/Bloom_Commands/#bfmexists>`_. + """ # noqa + params = [key] + params += items + return self.execute_command(BF_MEXISTS, *params) + + def scandump(self, key, iter): + """ + Begin an incremental save of the bloom filter `key`. + + This is useful for large bloom filters which cannot fit into the normal SAVE and RESTORE model. + The first time this command is called, the value of `iter` should be 0. + This command will return successive (iter, data) pairs until (0, NULL) to indicate completion. + For more information see `BF.SCANDUMP <https://oss.redis.com/redisbloom/master/Bloom_Commands/#bfscandump>`_. + """ # noqa + if HIREDIS_AVAILABLE: + raise ModuleError("This command cannot be used when hiredis is available.") + + params = [key, iter] + options = {} + options[NEVER_DECODE] = [] + return self.execute_command(BF_SCANDUMP, *params, **options) + + def loadchunk(self, key, iter, data): + """ + Restore a filter previously saved using SCANDUMP. + + See the SCANDUMP command for example usage. + This command will overwrite any bloom filter stored under key. + Ensure that the bloom filter will not be modified between invocations. + For more information see `BF.LOADCHUNK <https://oss.redis.com/redisbloom/master/Bloom_Commands/#bfloadchunk>`_. + """ # noqa + params = [key, iter, data] + return self.execute_command(BF_LOADCHUNK, *params) + + def info(self, key): + """ + Return capacity, size, number of filters, number of items inserted, and expansion rate. + For more information see `BF.INFO <https://oss.redis.com/redisbloom/master/Bloom_Commands/#bfinfo>`_. + """ # noqa + return self.execute_command(BF_INFO, key) + + +class CFCommands: + + # region Cuckoo Filter Functions + def create( + self, key, capacity, expansion=None, bucket_size=None, max_iterations=None + ): + """ + Create a new Cuckoo Filter `key` an initial `capacity` items. + For more information see `CF.RESERVE <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfreserve>`_. + """ # noqa + params = [key, capacity] + self.appendExpansion(params, expansion) + self.appendBucketSize(params, bucket_size) + self.appendMaxIterations(params, max_iterations) + return self.execute_command(CF_RESERVE, *params) + + def add(self, key, item): + """ + Add an `item` to a Cuckoo Filter `key`. + For more information see `CF.ADD <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfadd>`_. + """ # noqa + params = [key, item] + return self.execute_command(CF_ADD, *params) + + def addnx(self, key, item): + """ + Add an `item` to a Cuckoo Filter `key` only if item does not yet exist. + Command might be slower that `add`. + For more information see `CF.ADDNX <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfaddnx>`_. + """ # noqa + params = [key, item] + return self.execute_command(CF_ADDNX, *params) + + def insert(self, key, items, capacity=None, nocreate=None): + """ + Add multiple `items` to a Cuckoo Filter `key`, allowing the filter + to be created with a custom `capacity` if it does not yet exist. + `items` must be provided as a list. + For more information see `CF.INSERT <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfinsert>`_. + """ # noqa + params = [key] + self.appendCapacity(params, capacity) + self.appendNoCreate(params, nocreate) + self.appendItems(params, items) + return self.execute_command(CF_INSERT, *params) + + def insertnx(self, key, items, capacity=None, nocreate=None): + """ + Add multiple `items` to a Cuckoo Filter `key` only if they do not exist yet, + allowing the filter to be created with a custom `capacity` if it does not yet exist. + `items` must be provided as a list. + For more information see `CF.INSERTNX <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfinsertnx>`_. + """ # noqa + params = [key] + self.appendCapacity(params, capacity) + self.appendNoCreate(params, nocreate) + self.appendItems(params, items) + return self.execute_command(CF_INSERTNX, *params) + + def exists(self, key, item): + """ + Check whether an `item` exists in Cuckoo Filter `key`. + For more information see `CF.EXISTS <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfexists>`_. + """ # noqa + params = [key, item] + return self.execute_command(CF_EXISTS, *params) + + def delete(self, key, item): + """ + Delete `item` from `key`. + For more information see `CF.DEL <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfdel>`_. + """ # noqa + params = [key, item] + return self.execute_command(CF_DEL, *params) + + def count(self, key, item): + """ + Return the number of times an `item` may be in the `key`. + For more information see `CF.COUNT <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfcount>`_. + """ # noqa + params = [key, item] + return self.execute_command(CF_COUNT, *params) + + def scandump(self, key, iter): + """ + Begin an incremental save of the Cuckoo filter `key`. + + This is useful for large Cuckoo filters which cannot fit into the normal + SAVE and RESTORE model. + The first time this command is called, the value of `iter` should be 0. + This command will return successive (iter, data) pairs until + (0, NULL) to indicate completion. + For more information see `CF.SCANDUMP <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfscandump>`_. + """ # noqa + params = [key, iter] + return self.execute_command(CF_SCANDUMP, *params) + + def loadchunk(self, key, iter, data): + """ + Restore a filter previously saved using SCANDUMP. See the SCANDUMP command for example usage. + + This command will overwrite any Cuckoo filter stored under key. + Ensure that the Cuckoo filter will not be modified between invocations. + For more information see `CF.LOADCHUNK <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfloadchunk>`_. + """ # noqa + params = [key, iter, data] + return self.execute_command(CF_LOADCHUNK, *params) + + def info(self, key): + """ + Return size, number of buckets, number of filter, number of items inserted, + number of items deleted, bucket size, expansion rate, and max iteration. + For more information see `CF.INFO <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfinfo>`_. + """ # noqa + return self.execute_command(CF_INFO, key) + + +class TOPKCommands: + def reserve(self, key, k, width, depth, decay): + """ + Create a new Top-K Filter `key` with desired probability of false + positives `errorRate` expected entries to be inserted as `size`. + For more information see `TOPK.RESERVE <https://oss.redis.com/redisbloom/master/TopK_Commands/#topkreserve>`_. + """ # noqa + params = [key, k, width, depth, decay] + return self.execute_command(TOPK_RESERVE, *params) + + def add(self, key, *items): + """ + Add one `item` or more to a Top-K Filter `key`. + For more information see `TOPK.ADD <https://oss.redis.com/redisbloom/master/TopK_Commands/#topkadd>`_. + """ # noqa + params = [key] + params += items + return self.execute_command(TOPK_ADD, *params) + + def incrby(self, key, items, increments): + """ + Add/increase `items` to a Top-K Sketch `key` by ''increments''. + Both `items` and `increments` are lists. + For more information see `TOPK.INCRBY <https://oss.redis.com/redisbloom/master/TopK_Commands/#topkincrby>`_. + + Example: + + >>> topkincrby('A', ['foo'], [1]) + """ # noqa + params = [key] + self.appendItemsAndIncrements(params, items, increments) + return self.execute_command(TOPK_INCRBY, *params) + + def query(self, key, *items): + """ + Check whether one `item` or more is a Top-K item at `key`. + For more information see `TOPK.QUERY <https://oss.redis.com/redisbloom/master/TopK_Commands/#topkquery>`_. + """ # noqa + params = [key] + params += items + return self.execute_command(TOPK_QUERY, *params) + + def count(self, key, *items): + """ + Return count for one `item` or more from `key`. + For more information see `TOPK.COUNT <https://oss.redis.com/redisbloom/master/TopK_Commands/#topkcount>`_. + """ # noqa + params = [key] + params += items + return self.execute_command(TOPK_COUNT, *params) + + def list(self, key, withcount=False): + """ + Return full list of items in Top-K list of `key`. + If `withcount` set to True, return full list of items + with probabilistic count in Top-K list of `key`. + For more information see `TOPK.LIST <https://oss.redis.com/redisbloom/master/TopK_Commands/#topklist>`_. + """ # noqa + params = [key] + if withcount: + params.append("WITHCOUNT") + return self.execute_command(TOPK_LIST, *params) + + def info(self, key): + """ + Return k, width, depth and decay values of `key`. + For more information see `TOPK.INFO <https://oss.redis.com/redisbloom/master/TopK_Commands/#topkinfo>`_. + """ # noqa + return self.execute_command(TOPK_INFO, key) + + +class TDigestCommands: + def create(self, key, compression): + """ + Allocate the memory and initialize the t-digest. + For more information see `TDIGEST.CREATE <https://oss.redis.com/redisbloom/master/TDigest_Commands/#tdigestcreate>`_. + """ # noqa + params = [key, compression] + return self.execute_command(TDIGEST_CREATE, *params) + + def reset(self, key): + """ + Reset the sketch `key` to zero - empty out the sketch and re-initialize it. + For more information see `TDIGEST.RESET <https://oss.redis.com/redisbloom/master/TDigest_Commands/#tdigestreset>`_. + """ # noqa + return self.execute_command(TDIGEST_RESET, key) + + def add(self, key, values, weights): + """ + Add one or more samples (value with weight) to a sketch `key`. + Both `values` and `weights` are lists. + For more information see `TDIGEST.ADD <https://oss.redis.com/redisbloom/master/TDigest_Commands/#tdigestadd>`_. + + Example: + + >>> tdigestadd('A', [1500.0], [1.0]) + """ # noqa + params = [key] + self.appendValuesAndWeights(params, values, weights) + return self.execute_command(TDIGEST_ADD, *params) + + def merge(self, toKey, fromKey): + """ + Merge all of the values from 'fromKey' to 'toKey' sketch. + For more information see `TDIGEST.MERGE <https://oss.redis.com/redisbloom/master/TDigest_Commands/#tdigestmerge>`_. + """ # noqa + params = [toKey, fromKey] + return self.execute_command(TDIGEST_MERGE, *params) + + def min(self, key): + """ + Return minimum value from the sketch `key`. Will return DBL_MAX if the sketch is empty. + For more information see `TDIGEST.MIN <https://oss.redis.com/redisbloom/master/TDigest_Commands/#tdigestmin>`_. + """ # noqa + return self.execute_command(TDIGEST_MIN, key) + + def max(self, key): + """ + Return maximum value from the sketch `key`. Will return DBL_MIN if the sketch is empty. + For more information see `TDIGEST.MAX <https://oss.redis.com/redisbloom/master/TDigest_Commands/#tdigestmax>`_. + """ # noqa + return self.execute_command(TDIGEST_MAX, key) + + def quantile(self, key, quantile): + """ + Return double value estimate of the cutoff such that a specified fraction of the data + added to this TDigest would be less than or equal to the cutoff. + For more information see `TDIGEST.QUANTILE <https://oss.redis.com/redisbloom/master/TDigest_Commands/#tdigestquantile>`_. + """ # noqa + params = [key, quantile] + return self.execute_command(TDIGEST_QUANTILE, *params) + + def cdf(self, key, value): + """ + Return double fraction of all points added which are <= value. + For more information see `TDIGEST.CDF <https://oss.redis.com/redisbloom/master/TDigest_Commands/#tdigestcdf>`_. + """ # noqa + params = [key, value] + return self.execute_command(TDIGEST_CDF, *params) + + def info(self, key): + """ + Return Compression, Capacity, Merged Nodes, Unmerged Nodes, Merged Weight, Unmerged Weight + and Total Compressions. + For more information see `TDIGEST.INFO <https://oss.redis.com/redisbloom/master/TDigest_Commands/#tdigestinfo>`_. + """ # noqa + return self.execute_command(TDIGEST_INFO, key) + + +class CMSCommands: + + # region Count-Min Sketch Functions + def initbydim(self, key, width, depth): + """ + Initialize a Count-Min Sketch `key` to dimensions (`width`, `depth`) specified by user. + For more information see `CMS.INITBYDIM <https://oss.redis.com/redisbloom/master/CountMinSketch_Commands/#cmsinitbydim>`_. + """ # noqa + params = [key, width, depth] + return self.execute_command(CMS_INITBYDIM, *params) + + def initbyprob(self, key, error, probability): + """ + Initialize a Count-Min Sketch `key` to characteristics (`error`, `probability`) specified by user. + For more information see `CMS.INITBYPROB <https://oss.redis.com/redisbloom/master/CountMinSketch_Commands/#cmsinitbyprob>`_. + """ # noqa + params = [key, error, probability] + return self.execute_command(CMS_INITBYPROB, *params) + + def incrby(self, key, items, increments): + """ + Add/increase `items` to a Count-Min Sketch `key` by ''increments''. + Both `items` and `increments` are lists. + For more information see `CMS.INCRBY <https://oss.redis.com/redisbloom/master/CountMinSketch_Commands/#cmsincrby>`_. + + Example: + + >>> cmsincrby('A', ['foo'], [1]) + """ # noqa + params = [key] + self.appendItemsAndIncrements(params, items, increments) + return self.execute_command(CMS_INCRBY, *params) + + def query(self, key, *items): + """ + Return count for an `item` from `key`. Multiple items can be queried with one call. + For more information see `CMS.QUERY <https://oss.redis.com/redisbloom/master/CountMinSketch_Commands/#cmsquery>`_. + """ # noqa + params = [key] + params += items + return self.execute_command(CMS_QUERY, *params) + + def merge(self, destKey, numKeys, srcKeys, weights=[]): + """ + Merge `numKeys` of sketches into `destKey`. Sketches specified in `srcKeys`. + All sketches must have identical width and depth. + `Weights` can be used to multiply certain sketches. Default weight is 1. + Both `srcKeys` and `weights` are lists. + For more information see `CMS.MERGE <https://oss.redis.com/redisbloom/master/CountMinSketch_Commands/#cmsmerge>`_. + """ # noqa + params = [destKey, numKeys] + params += srcKeys + self.appendWeights(params, weights) + return self.execute_command(CMS_MERGE, *params) + + def info(self, key): + """ + Return width, depth and total count of the sketch. + For more information see `CMS.INFO <https://oss.redis.com/redisbloom/master/CountMinSketch_Commands/#cmsinfo>`_. + """ # noqa + return self.execute_command(CMS_INFO, key) diff --git a/redis/commands/bf/info.py b/redis/commands/bf/info.py new file mode 100644 index 0000000..24c5419 --- /dev/null +++ b/redis/commands/bf/info.py @@ -0,0 +1,85 @@ +from ..helpers import nativestr + + +class BFInfo(object): + capacity = None + size = None + filterNum = None + insertedNum = None + expansionRate = None + + def __init__(self, args): + response = dict(zip(map(nativestr, args[::2]), args[1::2])) + self.capacity = response["Capacity"] + self.size = response["Size"] + self.filterNum = response["Number of filters"] + self.insertedNum = response["Number of items inserted"] + self.expansionRate = response["Expansion rate"] + + +class CFInfo(object): + size = None + bucketNum = None + filterNum = None + insertedNum = None + deletedNum = None + bucketSize = None + expansionRate = None + maxIteration = None + + def __init__(self, args): + response = dict(zip(map(nativestr, args[::2]), args[1::2])) + self.size = response["Size"] + self.bucketNum = response["Number of buckets"] + self.filterNum = response["Number of filters"] + self.insertedNum = response["Number of items inserted"] + self.deletedNum = response["Number of items deleted"] + self.bucketSize = response["Bucket size"] + self.expansionRate = response["Expansion rate"] + self.maxIteration = response["Max iterations"] + + +class CMSInfo(object): + width = None + depth = None + count = None + + def __init__(self, args): + response = dict(zip(map(nativestr, args[::2]), args[1::2])) + self.width = response["width"] + self.depth = response["depth"] + self.count = response["count"] + + +class TopKInfo(object): + k = None + width = None + depth = None + decay = None + + def __init__(self, args): + response = dict(zip(map(nativestr, args[::2]), args[1::2])) + self.k = response["k"] + self.width = response["width"] + self.depth = response["depth"] + self.decay = response["decay"] + + +class TDigestInfo(object): + compression = None + capacity = None + mergedNodes = None + unmergedNodes = None + mergedWeight = None + unmergedWeight = None + totalCompressions = None + + def __init__(self, args): + response = dict(zip(map(nativestr, args[::2]), args[1::2])) + self.compression = response["Compression"] + self.capacity = response["Capacity"] + self.mergedNodes = response["Merged nodes"] + self.unmergedNodes = response["Unmerged nodes"] + self.mergedWeight = response["Merged weight"] + self.unmergedWeight = response["Unmerged weight"] + self.totalCompressions = response["Total compressions"] diff --git a/redis/commands/redismodules.py b/redis/commands/redismodules.py index e5ace63..eafd650 100644 --- a/redis/commands/redismodules.py +++ b/redis/commands/redismodules.py @@ -32,6 +32,46 @@ class RedisModuleCommands: s = TimeSeries(client=self) return s + def bf(self): + """Access the bloom namespace.""" + + from .bf import BFBloom + + bf = BFBloom(client=self) + return bf + + def cf(self): + """Access the bloom namespace.""" + + from .bf import CFBloom + + cf = CFBloom(client=self) + return cf + + def cms(self): + """Access the bloom namespace.""" + + from .bf import CMSBloom + + cms = CMSBloom(client=self) + return cms + + def topk(self): + """Access the bloom namespace.""" + + from .bf import TOPKBloom + + topk = TOPKBloom(client=self) + return topk + + def tdigest(self): + """Access the bloom namespace.""" + + from .bf import TDigestBloom + + tdigest = TDigestBloom(client=self) + return tdigest + def graph(self, index_name="idx"): """Access the timeseries namespace, providing support for redis timeseries data. @@ -15,6 +15,7 @@ setup( include=[ "redis", "redis.commands", + "redis.commands.bf", "redis.commands.json", "redis.commands.search", "redis.commands.timeseries", diff --git a/tests/test_bloom.py b/tests/test_bloom.py new file mode 100644 index 0000000..8936584 --- /dev/null +++ b/tests/test_bloom.py @@ -0,0 +1,383 @@ +import pytest + +import redis.commands.bf +from redis.exceptions import ModuleError, RedisError +from redis.utils import HIREDIS_AVAILABLE + + +def intlist(obj): + return [int(v) for v in obj] + + +@pytest.fixture +def client(modclient): + assert isinstance(modclient.bf(), redis.commands.bf.BFBloom) + assert isinstance(modclient.cf(), redis.commands.bf.CFBloom) + assert isinstance(modclient.cms(), redis.commands.bf.CMSBloom) + assert isinstance(modclient.tdigest(), redis.commands.bf.TDigestBloom) + assert isinstance(modclient.topk(), redis.commands.bf.TOPKBloom) + + modclient.flushdb() + return modclient + + +@pytest.mark.redismod +def test_create(client): + """Test CREATE/RESERVE calls""" + assert client.bf().create("bloom", 0.01, 1000) + assert client.bf().create("bloom_e", 0.01, 1000, expansion=1) + assert client.bf().create("bloom_ns", 0.01, 1000, noScale=True) + assert client.cf().create("cuckoo", 1000) + assert client.cf().create("cuckoo_e", 1000, expansion=1) + assert client.cf().create("cuckoo_bs", 1000, bucket_size=4) + assert client.cf().create("cuckoo_mi", 1000, max_iterations=10) + assert client.cms().initbydim("cmsDim", 100, 5) + assert client.cms().initbyprob("cmsProb", 0.01, 0.01) + assert client.topk().reserve("topk", 5, 100, 5, 0.9) + assert client.tdigest().create("tDigest", 100) + + +# region Test Bloom Filter +@pytest.mark.redismod +def test_bf_add(client): + assert client.bf().create("bloom", 0.01, 1000) + assert 1 == client.bf().add("bloom", "foo") + assert 0 == client.bf().add("bloom", "foo") + assert [0] == intlist(client.bf().madd("bloom", "foo")) + assert [0, 1] == client.bf().madd("bloom", "foo", "bar") + assert [0, 0, 1] == client.bf().madd("bloom", "foo", "bar", "baz") + assert 1 == client.bf().exists("bloom", "foo") + assert 0 == client.bf().exists("bloom", "noexist") + assert [1, 0] == intlist(client.bf().mexists("bloom", "foo", "noexist")) + + +@pytest.mark.redismod +def test_bf_insert(client): + assert client.bf().create("bloom", 0.01, 1000) + assert [1] == intlist(client.bf().insert("bloom", ["foo"])) + assert [0, 1] == intlist(client.bf().insert("bloom", ["foo", "bar"])) + assert [1] == intlist(client.bf().insert("captest", ["foo"], capacity=10)) + assert [1] == intlist(client.bf().insert("errtest", ["foo"], error=0.01)) + assert 1 == client.bf().exists("bloom", "foo") + assert 0 == client.bf().exists("bloom", "noexist") + assert [1, 0] == intlist(client.bf().mexists("bloom", "foo", "noexist")) + info = client.bf().info("bloom") + assert 2 == info.insertedNum + assert 1000 == info.capacity + assert 1 == info.filterNum + + +@pytest.mark.redismod +def test_bf_scandump_and_loadchunk(client): + # Store a filter + client.bf().create("myBloom", "0.0001", "1000") + + # test is probabilistic and might fail. It is OK to change variables if + # certain to not break anything + def do_verify(): + res = 0 + for x in range(1000): + client.bf().add("myBloom", x) + rv = client.bf().exists("myBloom", x) + assert rv + rv = client.bf().exists("myBloom", f"nonexist_{x}") + res += rv == x + assert res < 5 + + do_verify() + cmds = [] + if HIREDIS_AVAILABLE: + with pytest.raises(ModuleError): + cur = client.bf().scandump("myBloom", 0) + return + + cur = client.bf().scandump("myBloom", 0) + first = cur[0] + cmds.append(cur) + + while True: + cur = client.bf().scandump("myBloom", first) + first = cur[0] + if first == 0: + break + else: + cmds.append(cur) + prev_info = client.bf().execute_command("bf.debug", "myBloom") + + # Remove the filter + client.bf().client.delete("myBloom") + + # Now, load all the commands: + for cmd in cmds: + client.bf().loadchunk("myBloom", *cmd) + + cur_info = client.bf().execute_command("bf.debug", "myBloom") + assert prev_info == cur_info + do_verify() + + client.bf().client.delete("myBloom") + client.bf().create("myBloom", "0.0001", "10000000") + + +@pytest.mark.redismod +def test_bf_info(client): + expansion = 4 + # Store a filter + client.bf().create("nonscaling", "0.0001", "1000", noScale=True) + info = client.bf().info("nonscaling") + assert info.expansionRate is None + + client.bf().create("expanding", "0.0001", "1000", expansion=expansion) + info = client.bf().info("expanding") + assert info.expansionRate == 4 + + try: + # noScale mean no expansion + client.bf().create( + "myBloom", "0.0001", "1000", expansion=expansion, noScale=True + ) + assert False + except RedisError: + assert True + + +# region Test Cuckoo Filter +@pytest.mark.redismod +def test_cf_add_and_insert(client): + assert client.cf().create("cuckoo", 1000) + assert client.cf().add("cuckoo", "filter") + assert not client.cf().addnx("cuckoo", "filter") + assert 1 == client.cf().addnx("cuckoo", "newItem") + assert [1] == client.cf().insert("captest", ["foo"]) + assert [1] == client.cf().insert("captest", ["foo"], capacity=1000) + assert [1] == client.cf().insertnx("captest", ["bar"]) + assert [1] == client.cf().insertnx("captest", ["food"], nocreate="1") + assert [0, 0, 1] == client.cf().insertnx("captest", ["foo", "bar", "baz"]) + assert [0] == client.cf().insertnx("captest", ["bar"], capacity=1000) + assert [1] == client.cf().insert("empty1", ["foo"], capacity=1000) + assert [1] == client.cf().insertnx("empty2", ["bar"], capacity=1000) + info = client.cf().info("captest") + assert 5 == info.insertedNum + assert 0 == info.deletedNum + assert 1 == info.filterNum + + +@pytest.mark.redismod +def test_cf_exists_and_del(client): + assert client.cf().create("cuckoo", 1000) + assert client.cf().add("cuckoo", "filter") + assert client.cf().exists("cuckoo", "filter") + assert not client.cf().exists("cuckoo", "notexist") + assert 1 == client.cf().count("cuckoo", "filter") + assert 0 == client.cf().count("cuckoo", "notexist") + assert client.cf().delete("cuckoo", "filter") + assert 0 == client.cf().count("cuckoo", "filter") + + +# region Test Count-Min Sketch +@pytest.mark.redismod +def test_cms(client): + assert client.cms().initbydim("dim", 1000, 5) + assert client.cms().initbyprob("prob", 0.01, 0.01) + assert client.cms().incrby("dim", ["foo"], [5]) + assert [0] == client.cms().query("dim", "notexist") + assert [5] == client.cms().query("dim", "foo") + assert [10, 15] == client.cms().incrby("dim", ["foo", "bar"], [5, 15]) + assert [10, 15] == client.cms().query("dim", "foo", "bar") + info = client.cms().info("dim") + assert 1000 == info.width + assert 5 == info.depth + assert 25 == info.count + + +@pytest.mark.redismod +def test_cms_merge(client): + assert client.cms().initbydim("A", 1000, 5) + assert client.cms().initbydim("B", 1000, 5) + assert client.cms().initbydim("C", 1000, 5) + assert client.cms().incrby("A", ["foo", "bar", "baz"], [5, 3, 9]) + assert client.cms().incrby("B", ["foo", "bar", "baz"], [2, 3, 1]) + assert [5, 3, 9] == client.cms().query("A", "foo", "bar", "baz") + assert [2, 3, 1] == client.cms().query("B", "foo", "bar", "baz") + assert client.cms().merge("C", 2, ["A", "B"]) + assert [7, 6, 10] == client.cms().query("C", "foo", "bar", "baz") + assert client.cms().merge("C", 2, ["A", "B"], ["1", "2"]) + assert [9, 9, 11] == client.cms().query("C", "foo", "bar", "baz") + assert client.cms().merge("C", 2, ["A", "B"], ["2", "3"]) + assert [16, 15, 21] == client.cms().query("C", "foo", "bar", "baz") + + +# endregion + + +# region Test Top-K +@pytest.mark.redismod +def test_topk(client): + # test list with empty buckets + assert client.topk().reserve("topk", 3, 50, 4, 0.9) + assert [ + None, + None, + None, + "A", + "C", + "D", + None, + None, + "E", + None, + "B", + "C", + None, + None, + None, + "D", + None, + ] == client.topk().add( + "topk", + "A", + "B", + "C", + "D", + "E", + "A", + "A", + "B", + "C", + "G", + "D", + "B", + "D", + "A", + "E", + "E", + 1, + ) + assert [1, 1, 0, 0, 1, 0, 0] == client.topk().query( + "topk", "A", "B", "C", "D", "E", "F", "G" + ) + assert [4, 3, 2, 3, 3, 0, 1] == client.topk().count( + "topk", "A", "B", "C", "D", "E", "F", "G" + ) + + # test full list + assert client.topk().reserve("topklist", 3, 50, 3, 0.9) + assert client.topk().add( + "topklist", + "A", + "B", + "C", + "D", + "E", + "A", + "A", + "B", + "C", + "G", + "D", + "B", + "D", + "A", + "E", + "E", + ) + assert ["A", "B", "E"] == client.topk().list("topklist") + assert ["A", 4, "B", 3, "E", 3] == client.topk().list("topklist", withcount=True) + info = client.topk().info("topklist") + assert 3 == info.k + assert 50 == info.width + assert 3 == info.depth + assert 0.9 == round(float(info.decay), 1) + + +@pytest.mark.redismod +def test_topk_incrby(client): + client.flushdb() + assert client.topk().reserve("topk", 3, 10, 3, 1) + assert [None, None, None] == client.topk().incrby( + "topk", ["bar", "baz", "42"], [3, 6, 2] + ) + assert [None, "bar"] == client.topk().incrby("topk", ["42", "xyzzy"], [8, 4]) + assert [3, 6, 10, 4, 0] == client.topk().count( + "topk", "bar", "baz", "42", "xyzzy", 4 + ) + + +# region Test T-Digest +@pytest.mark.redismod +def test_tdigest_reset(client): + assert client.tdigest().create("tDigest", 10) + # reset on empty histogram + assert client.tdigest().reset("tDigest") + # insert data-points into sketch + assert client.tdigest().add("tDigest", list(range(10)), [1.0] * 10) + + assert client.tdigest().reset("tDigest") + # assert we have 0 unmerged nodes + assert 0 == client.tdigest().info("tDigest").unmergedNodes + + +@pytest.mark.redismod +def test_tdigest_merge(client): + assert client.tdigest().create("to-tDigest", 10) + assert client.tdigest().create("from-tDigest", 10) + # insert data-points into sketch + assert client.tdigest().add("from-tDigest", [1.0] * 10, [1.0] * 10) + assert client.tdigest().add("to-tDigest", [2.0] * 10, [10.0] * 10) + # merge from-tdigest into to-tdigest + assert client.tdigest().merge("to-tDigest", "from-tDigest") + # we should now have 110 weight on to-histogram + info = client.tdigest().info("to-tDigest") + total_weight_to = float(info.mergedWeight) + float(info.unmergedWeight) + assert 110 == total_weight_to + + +@pytest.mark.redismod +def test_tdigest_min_and_max(client): + assert client.tdigest().create("tDigest", 100) + # insert data-points into sketch + assert client.tdigest().add("tDigest", [1, 2, 3], [1.0] * 3) + # min/max + assert 3 == client.tdigest().max("tDigest") + assert 1 == client.tdigest().min("tDigest") + + +@pytest.mark.redismod +def test_tdigest_quantile(client): + assert client.tdigest().create("tDigest", 500) + # insert data-points into sketch + assert client.tdigest().add( + "tDigest", list([x * 0.01 for x in range(1, 10000)]), [1.0] * 10000 + ) + # assert min min/max have same result as quantile 0 and 1 + assert client.tdigest().max("tDigest") == client.tdigest().quantile("tDigest", 1.0) + assert client.tdigest().min("tDigest") == client.tdigest().quantile("tDigest", 0.0) + + assert 1.0 == round(client.tdigest().quantile("tDigest", 0.01), 2) + assert 99.0 == round(client.tdigest().quantile("tDigest", 0.99), 2) + + +@pytest.mark.redismod +def test_tdigest_cdf(client): + assert client.tdigest().create("tDigest", 100) + # insert data-points into sketch + assert client.tdigest().add("tDigest", list(range(1, 10)), [1.0] * 10) + assert 0.1 == round(client.tdigest().cdf("tDigest", 1.0), 1) + assert 0.9 == round(client.tdigest().cdf("tDigest", 9.0), 1) + + +# @pytest.mark.redismod +# def test_pipeline(client): +# pipeline = client.bf().pipeline() +# assert not client.bf().execute_command("get pipeline") +# +# assert client.bf().create("pipeline", 0.01, 1000) +# for i in range(100): +# pipeline.add("pipeline", i) +# for i in range(100): +# assert not (client.bf().exists("pipeline", i)) +# +# pipeline.execute() +# +# for i in range(100): +# assert client.bf().exists("pipeline", i) @@ -170,6 +170,7 @@ exclude = venv*, whitelist.py ignore = + F405 W503 E203 E126 |