author     Avital Fine <79420960+AvitalFineRedis@users.noreply.github.com>   2021-12-01 19:03:18 +0100
committer  GitHub <noreply@github.com>                                       2021-12-01 20:03:18 +0200
commit     42101fc383829bb179a266420132d3f862861972 (patch)
tree       a13d70d174de9f68282d8a7f38cd3b4a26d464b4
parent     d4252277a9dafed5af34b3f40ed7a57fc952d273 (diff)
download   redis-py-42101fc383829bb179a266420132d3f862861972.tar.gz
Integrate RedisBloom support (#1683)
Co-authored-by: Chayim I. Kirshen <c@kirshen.com>
-rw-r--r--  redis/commands/bf/__init__.py   | 204
-rw-r--r--  redis/commands/bf/commands.py   | 494
-rw-r--r--  redis/commands/bf/info.py       |  85
-rw-r--r--  redis/commands/redismodules.py  |  40
-rw-r--r--  setup.py                        |   1
-rw-r--r--  tests/test_bloom.py             | 383
-rw-r--r--  tox.ini                         |   1
7 files changed, 1208 insertions, 0 deletions
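
For orientation, here is a minimal usage sketch of the namespaces this commit wires up, based on the accessors added in redis/commands/redismodules.py and the tests below. It assumes a reachable Redis server with the RedisBloom module loaded; the key names are only illustrative.

    import redis

    r = redis.Redis()                             # assumes RedisBloom is loaded on this server
    r.bf().create("bf:demo", 0.01, 1000)          # BF.RESERVE, parsed by bool_ok -> True
    r.bf().add("bf:demo", "foo")                  # BF.ADD -> 1
    r.bf().exists("bf:demo", "foo")               # BF.EXISTS -> 1
    r.cf().create("cf:demo", 1000)                # CF.RESERVE
    r.cms().initbydim("cms:demo", 1000, 5)        # CMS.INITBYDIM
    r.topk().reserve("topk:demo", 3, 50, 4, 0.9)  # TOPK.RESERVE
    r.tdigest().create("td:demo", 100)            # TDIGEST.CREATE
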
diff --git a/redis/commands/bf/__init__.py b/redis/commands/bf/__init__.py
new file mode 100644
index 0000000..f34e11d
--- /dev/null
+++ b/redis/commands/bf/__init__.py
@@ -0,0 +1,204 @@
+from redis.client import bool_ok
+
+from ..helpers import parse_to_list
+from .commands import * # noqa
+from .info import BFInfo, CFInfo, CMSInfo, TDigestInfo, TopKInfo
+
+
+class AbstractBloom(object):
+ """
+ The client allows you to interact with RedisBloom and use all of
+ its functionality.
+
+ - BF for Bloom Filter
+ - CF for Cuckoo Filter
+ - CMS for Count-Min Sketch
+ - TOPK for TopK Data Structure
+ - TDIGEST for estimating rank statistics
+ """
+
+ @staticmethod
+ def appendItems(params, items):
+ """Append ITEMS to params."""
+ params.extend(["ITEMS"])
+ params += items
+
+ @staticmethod
+ def appendError(params, error):
+ """Append ERROR to params."""
+ if error is not None:
+ params.extend(["ERROR", error])
+
+ @staticmethod
+ def appendCapacity(params, capacity):
+ """Append CAPACITY to params."""
+ if capacity is not None:
+ params.extend(["CAPACITY", capacity])
+
+ @staticmethod
+ def appendExpansion(params, expansion):
+ """Append EXPANSION to params."""
+ if expansion is not None:
+ params.extend(["EXPANSION", expansion])
+
+ @staticmethod
+ def appendNoScale(params, noScale):
+ """Append NONSCALING tag to params."""
+ if noScale is not None:
+ params.extend(["NONSCALING"])
+
+ @staticmethod
+ def appendWeights(params, weights):
+ """Append WEIGHTS to params."""
+ if len(weights) > 0:
+ params.append("WEIGHTS")
+ params += weights
+
+ @staticmethod
+ def appendNoCreate(params, noCreate):
+ """Append NOCREATE tag to params."""
+ if noCreate is not None:
+ params.extend(["NOCREATE"])
+
+ @staticmethod
+ def appendItemsAndIncrements(params, items, increments):
+ """Append pairs of items and increments to params."""
+ for i in range(len(items)):
+ params.append(items[i])
+ params.append(increments[i])
+
+ @staticmethod
+ def appendValuesAndWeights(params, items, weights):
+ """Append pairs of items and weights to params."""
+ for i in range(len(items)):
+ params.append(items[i])
+ params.append(weights[i])
+
+ @staticmethod
+ def appendMaxIterations(params, max_iterations):
+ """Append MAXITERATIONS to params."""
+ if max_iterations is not None:
+ params.extend(["MAXITERATIONS", max_iterations])
+
+ @staticmethod
+ def appendBucketSize(params, bucket_size):
+ """Append BUCKETSIZE to params."""
+ if bucket_size is not None:
+ params.extend(["BUCKETSIZE", bucket_size])
+
+
+class CMSBloom(CMSCommands, AbstractBloom):
+ def __init__(self, client, **kwargs):
+ """Create a new RedisBloom client."""
+ # Set the module commands' callbacks
+ MODULE_CALLBACKS = {
+ CMS_INITBYDIM: bool_ok,
+ CMS_INITBYPROB: bool_ok,
+ # CMS_INCRBY: spaceHolder,
+ # CMS_QUERY: spaceHolder,
+ CMS_MERGE: bool_ok,
+ CMS_INFO: CMSInfo,
+ }
+
+ self.client = client
+ self.commandmixin = CMSCommands
+ self.execute_command = client.execute_command
+
+ for k, v in MODULE_CALLBACKS.items():
+ self.client.set_response_callback(k, v)
+
+
+class TOPKBloom(TOPKCommands, AbstractBloom):
+ def __init__(self, client, **kwargs):
+ """Create a new RedisBloom client."""
+ # Set the module commands' callbacks
+ MODULE_CALLBACKS = {
+ TOPK_RESERVE: bool_ok,
+ TOPK_ADD: parse_to_list,
+ TOPK_INCRBY: parse_to_list,
+ # TOPK_QUERY: spaceHolder,
+ # TOPK_COUNT: spaceHolder,
+ TOPK_LIST: parse_to_list,
+ TOPK_INFO: TopKInfo,
+ }
+
+ self.client = client
+ self.commandmixin = TOPKCommands
+ self.execute_command = client.execute_command
+
+ for k, v in MODULE_CALLBACKS.items():
+ self.client.set_response_callback(k, v)
+
+
+class CFBloom(CFCommands, AbstractBloom):
+ def __init__(self, client, **kwargs):
+ """Create a new RedisBloom client."""
+ # Set the module commands' callbacks
+ MODULE_CALLBACKS = {
+ CF_RESERVE: bool_ok,
+ # CF_ADD: spaceHolder,
+ # CF_ADDNX: spaceHolder,
+ # CF_INSERT: spaceHolder,
+ # CF_INSERTNX: spaceHolder,
+ # CF_EXISTS: spaceHolder,
+ # CF_DEL: spaceHolder,
+ # CF_COUNT: spaceHolder,
+ # CF_SCANDUMP: spaceHolder,
+ # CF_LOADCHUNK: spaceHolder,
+ CF_INFO: CFInfo,
+ }
+
+ self.client = client
+ self.commandmixin = CFCommands
+ self.execute_command = client.execute_command
+
+ for k, v in MODULE_CALLBACKS.items():
+ self.client.set_response_callback(k, v)
+
+
+class TDigestBloom(TDigestCommands, AbstractBloom):
+ def __init__(self, client, **kwargs):
+ """Create a new RedisBloom client."""
+ # Set the module commands' callbacks
+ MODULE_CALLBACKS = {
+ TDIGEST_CREATE: bool_ok,
+ # TDIGEST_RESET: bool_ok,
+ # TDIGEST_ADD: spaceHolder,
+ # TDIGEST_MERGE: spaceHolder,
+ TDIGEST_CDF: float,
+ TDIGEST_QUANTILE: float,
+ TDIGEST_MIN: float,
+ TDIGEST_MAX: float,
+ TDIGEST_INFO: TDigestInfo,
+ }
+
+ self.client = client
+ self.commandmixin = TDigestCommands
+ self.execute_command = client.execute_command
+
+ for k, v in MODULE_CALLBACKS.items():
+ self.client.set_response_callback(k, v)
+
+
+class BFBloom(BFCommands, AbstractBloom):
+ def __init__(self, client, **kwargs):
+ """Create a new RedisBloom client."""
+ # Set the module commands' callbacks
+ MODULE_CALLBACKS = {
+ BF_RESERVE: bool_ok,
+ # BF_ADD: spaceHolder,
+ # BF_MADD: spaceHolder,
+ # BF_INSERT: spaceHolder,
+ # BF_EXISTS: spaceHolder,
+ # BF_MEXISTS: spaceHolder,
+ # BF_SCANDUMP: spaceHolder,
+ # BF_LOADCHUNK: spaceHolder,
+ BF_INFO: BFInfo,
+ }
+
+ self.client = client
+ self.commandmixin = BFCommands
+ self.execute_command = client.execute_command
+
+ for k, v in MODULE_CALLBACKS.items():
+ self.client.set_response_callback(k, v)
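
Each of the classes above is a thin wrapper that registers response callbacks on the shared client and reuses its execute_command, so module replies come back parsed. A small sketch of the effect, under the same server assumptions as the sketch above:

    from redis import Redis
    from redis.commands.bf import CMSBloom

    r = Redis()
    cms = CMSBloom(r)                     # equivalent to r.cms()
    cms.initbydim("cms:sketch", 1000, 5)  # CMS.INITBYDIM reply parsed by bool_ok -> True
    info = cms.info("cms:sketch")         # CMS.INFO reply parsed into a CMSInfo object
    print(info.width, info.depth, info.count)
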
diff --git a/redis/commands/bf/commands.py b/redis/commands/bf/commands.py
new file mode 100644
index 0000000..3c8bf7f
--- /dev/null
+++ b/redis/commands/bf/commands.py
@@ -0,0 +1,494 @@
+from redis.client import NEVER_DECODE
+from redis.exceptions import ModuleError
+from redis.utils import HIREDIS_AVAILABLE
+
+BF_RESERVE = "BF.RESERVE"
+BF_ADD = "BF.ADD"
+BF_MADD = "BF.MADD"
+BF_INSERT = "BF.INSERT"
+BF_EXISTS = "BF.EXISTS"
+BF_MEXISTS = "BF.MEXISTS"
+BF_SCANDUMP = "BF.SCANDUMP"
+BF_LOADCHUNK = "BF.LOADCHUNK"
+BF_INFO = "BF.INFO"
+
+CF_RESERVE = "CF.RESERVE"
+CF_ADD = "CF.ADD"
+CF_ADDNX = "CF.ADDNX"
+CF_INSERT = "CF.INSERT"
+CF_INSERTNX = "CF.INSERTNX"
+CF_EXISTS = "CF.EXISTS"
+CF_DEL = "CF.DEL"
+CF_COUNT = "CF.COUNT"
+CF_SCANDUMP = "CF.SCANDUMP"
+CF_LOADCHUNK = "CF.LOADCHUNK"
+CF_INFO = "CF.INFO"
+
+CMS_INITBYDIM = "CMS.INITBYDIM"
+CMS_INITBYPROB = "CMS.INITBYPROB"
+CMS_INCRBY = "CMS.INCRBY"
+CMS_QUERY = "CMS.QUERY"
+CMS_MERGE = "CMS.MERGE"
+CMS_INFO = "CMS.INFO"
+
+TOPK_RESERVE = "TOPK.RESERVE"
+TOPK_ADD = "TOPK.ADD"
+TOPK_INCRBY = "TOPK.INCRBY"
+TOPK_QUERY = "TOPK.QUERY"
+TOPK_COUNT = "TOPK.COUNT"
+TOPK_LIST = "TOPK.LIST"
+TOPK_INFO = "TOPK.INFO"
+
+TDIGEST_CREATE = "TDIGEST.CREATE"
+TDIGEST_RESET = "TDIGEST.RESET"
+TDIGEST_ADD = "TDIGEST.ADD"
+TDIGEST_MERGE = "TDIGEST.MERGE"
+TDIGEST_CDF = "TDIGEST.CDF"
+TDIGEST_QUANTILE = "TDIGEST.QUANTILE"
+TDIGEST_MIN = "TDIGEST.MIN"
+TDIGEST_MAX = "TDIGEST.MAX"
+TDIGEST_INFO = "TDIGEST.INFO"
+
+
+class BFCommands:
+ """RedisBloom commands."""
+
+ # region Bloom Filter Functions
+ def create(self, key, errorRate, capacity, expansion=None, noScale=None):
+ """
+ Create a new Bloom Filter `key` with a desired probability of false positives
+ `errorRate` and expected entries to be inserted as `capacity`.
+ The default expansion value is 2. By default, the filter is auto-scaling.
+ For more information see `BF.RESERVE <https://oss.redis.com/redisbloom/master/Bloom_Commands/#bfreserve>`_.
+ """ # noqa
+ params = [key, errorRate, capacity]
+ self.appendExpansion(params, expansion)
+ self.appendNoScale(params, noScale)
+ return self.execute_command(BF_RESERVE, *params)
+
+ def add(self, key, item):
+ """
+ Add an `item` to a Bloom Filter `key`.
+ For more information see `BF.ADD <https://oss.redis.com/redisbloom/master/Bloom_Commands/#bfadd>`_.
+ """ # noqa
+ params = [key, item]
+ return self.execute_command(BF_ADD, *params)
+
+ def madd(self, key, *items):
+ """
+ Add multiple `items` to a Bloom Filter `key`.
+ For more information see `BF.MADD <https://oss.redis.com/redisbloom/master/Bloom_Commands/#bfmadd>`_.
+ """ # noqa
+ params = [key]
+ params += items
+ return self.execute_command(BF_MADD, *params)
+
+ def insert(
+ self,
+ key,
+ items,
+ capacity=None,
+ error=None,
+ noCreate=None,
+ expansion=None,
+ noScale=None,
+ ):
+ """
+ Add multiple `items` to a Bloom Filter `key`.
+
+ If `noCreate` remains `None` and `key` does not exist, a new Bloom Filter
+ `key` will be created with the desired probability of false positives `error`
+ and the expected number of entries to be inserted as `capacity`.
+ For more information see `BF.INSERT <https://oss.redis.com/redisbloom/master/Bloom_Commands/#bfinsert>`_.
+ """ # noqa
+ params = [key]
+ self.appendCapacity(params, capacity)
+ self.appendError(params, error)
+ self.appendExpansion(params, expansion)
+ self.appendNoCreate(params, noCreate)
+ self.appendNoScale(params, noScale)
+ self.appendItems(params, items)
+
+ return self.execute_command(BF_INSERT, *params)
+
+ def exists(self, key, item):
+ """
+ Check whether an `item` exists in Bloom Filter `key`.
+ For more information see `BF.EXISTS <https://oss.redis.com/redisbloom/master/Bloom_Commands/#bfexists>`_.
+ """ # noqa
+ params = [key, item]
+ return self.execute_command(BF_EXISTS, *params)
+
+ def mexists(self, key, *items):
+ """
+ Check whether `items` exist in Bloom Filter `key`.
+ For more information see `BF.MEXISTS <https://oss.redis.com/redisbloom/master/Bloom_Commands/#bfmexists>`_.
+ """ # noqa
+ params = [key]
+ params += items
+ return self.execute_command(BF_MEXISTS, *params)
+
+ def scandump(self, key, iter):
+ """
+ Begin an incremental save of the bloom filter `key`.
+
+ This is useful for large bloom filters which cannot fit into the normal SAVE and RESTORE model.
+ The first time this command is called, the value of `iter` should be 0.
+ This command will return successive (iter, data) pairs until (0, NULL) to indicate completion.
+ For more information see `BF.SCANDUMP <https://oss.redis.com/redisbloom/master/Bloom_Commands/#bfscandump>`_.
+ """ # noqa
+ if HIREDIS_AVAILABLE:
+ raise ModuleError("This command cannot be used when hiredis is available.")
+
+ params = [key, iter]
+ options = {}
+ options[NEVER_DECODE] = []
+ return self.execute_command(BF_SCANDUMP, *params, **options)
+
+ def loadchunk(self, key, iter, data):
+ """
+ Restore a filter previously saved using SCANDUMP.
+
+ See the SCANDUMP command for example usage.
+ This command will overwrite any bloom filter stored under key.
+ Ensure that the bloom filter will not be modified between invocations.
+ For more information see `BF.LOADCHUNK <https://oss.redis.com/redisbloom/master/Bloom_Commands/#bfloadchunk>`_.
+ """ # noqa
+ params = [key, iter, data]
+ return self.execute_command(BF_LOADCHUNK, *params)
+
+ def info(self, key):
+ """
+ Return capacity, size, number of filters, number of items inserted, and expansion rate.
+ For more information see `BF.INFO <https://oss.redis.com/redisbloom/master/Bloom_Commands/#bfinfo>`_.
+ """ # noqa
+ return self.execute_command(BF_INFO, key)
+
+
+class CFCommands:
+
+ # region Cuckoo Filter Functions
+ def create(
+ self, key, capacity, expansion=None, bucket_size=None, max_iterations=None
+ ):
+ """
+ Create a new Cuckoo Filter `key` with an initial `capacity` of items.
+ For more information see `CF.RESERVE <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfreserve>`_.
+ """ # noqa
+ params = [key, capacity]
+ self.appendExpansion(params, expansion)
+ self.appendBucketSize(params, bucket_size)
+ self.appendMaxIterations(params, max_iterations)
+ return self.execute_command(CF_RESERVE, *params)
+
+ def add(self, key, item):
+ """
+ Add an `item` to a Cuckoo Filter `key`.
+ For more information see `CF.ADD <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfadd>`_.
+ """ # noqa
+ params = [key, item]
+ return self.execute_command(CF_ADD, *params)
+
+ def addnx(self, key, item):
+ """
+ Add an `item` to a Cuckoo Filter `key` only if item does not yet exist.
+ This command may be slower than `add`.
+ For more information see `CF.ADDNX <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfaddnx>`_.
+ """ # noqa
+ params = [key, item]
+ return self.execute_command(CF_ADDNX, *params)
+
+ def insert(self, key, items, capacity=None, nocreate=None):
+ """
+ Add multiple `items` to a Cuckoo Filter `key`, allowing the filter
+ to be created with a custom `capacity` if it does not yet exist.
+ `items` must be provided as a list.
+ For more information see `CF.INSERT <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfinsert>`_.
+ """ # noqa
+ params = [key]
+ self.appendCapacity(params, capacity)
+ self.appendNoCreate(params, nocreate)
+ self.appendItems(params, items)
+ return self.execute_command(CF_INSERT, *params)
+
+ def insertnx(self, key, items, capacity=None, nocreate=None):
+ """
+ Add multiple `items` to a Cuckoo Filter `key` only if they do not exist yet,
+ allowing the filter to be created with a custom `capacity` if it does not yet exist.
+ `items` must be provided as a list.
+ For more information see `CF.INSERTNX <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfinsertnx>`_.
+ """ # noqa
+ params = [key]
+ self.appendCapacity(params, capacity)
+ self.appendNoCreate(params, nocreate)
+ self.appendItems(params, items)
+ return self.execute_command(CF_INSERTNX, *params)
+
+ def exists(self, key, item):
+ """
+ Check whether an `item` exists in Cuckoo Filter `key`.
+ For more information see `CF.EXISTS <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfexists>`_.
+ """ # noqa
+ params = [key, item]
+ return self.execute_command(CF_EXISTS, *params)
+
+ def delete(self, key, item):
+ """
+ Delete `item` from `key`.
+ For more information see `CF.DEL <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfdel>`_.
+ """ # noqa
+ params = [key, item]
+ return self.execute_command(CF_DEL, *params)
+
+ def count(self, key, item):
+ """
+ Return the number of times an `item` may be in the `key`.
+ For more information see `CF.COUNT <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfcount>`_.
+ """ # noqa
+ params = [key, item]
+ return self.execute_command(CF_COUNT, *params)
+
+ def scandump(self, key, iter):
+ """
+ Begin an incremental save of the Cuckoo filter `key`.
+
+ This is useful for large Cuckoo filters which cannot fit into the normal
+ SAVE and RESTORE model.
+ The first time this command is called, the value of `iter` should be 0.
+ This command will return successive (iter, data) pairs until
+ (0, NULL) to indicate completion.
+ For more information see `CF.SCANDUMP <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfscandump>`_.
+ """ # noqa
+ params = [key, iter]
+ return self.execute_command(CF_SCANDUMP, *params)
+
+ def loadchunk(self, key, iter, data):
+ """
+ Restore a filter previously saved using SCANDUMP. See the SCANDUMP command for example usage.
+
+ This command will overwrite any Cuckoo filter stored under key.
+ Ensure that the Cuckoo filter will not be modified between invocations.
+ For more information see `CF.LOADCHUNK <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfloadchunk>`_.
+ """ # noqa
+ params = [key, iter, data]
+ return self.execute_command(CF_LOADCHUNK, *params)
+
+ def info(self, key):
+ """
+ Return size, number of buckets, number of filters, number of items inserted,
+ number of items deleted, bucket size, expansion rate, and max iterations.
+ For more information see `CF.INFO <https://oss.redis.com/redisbloom/master/Cuckoo_Commands/#cfinfo>`_.
+ """ # noqa
+ return self.execute_command(CF_INFO, key)
+
+
+class TOPKCommands:
+ def reserve(self, key, k, width, depth, decay):
+ """
+ Create a new Top-K Filter `key` that tracks the `k` most frequent items,
+ using a sketch with the given `width` and `depth` and counter `decay` rate.
+ For more information see `TOPK.RESERVE <https://oss.redis.com/redisbloom/master/TopK_Commands/#topkreserve>`_.
+ """ # noqa
+ params = [key, k, width, depth, decay]
+ return self.execute_command(TOPK_RESERVE, *params)
+
+ def add(self, key, *items):
+ """
+ Add one or more `items` to a Top-K Filter `key`.
+ For more information see `TOPK.ADD <https://oss.redis.com/redisbloom/master/TopK_Commands/#topkadd>`_.
+ """ # noqa
+ params = [key]
+ params += items
+ return self.execute_command(TOPK_ADD, *params)
+
+ def incrby(self, key, items, increments):
+ """
+ Add/increase `items` in a Top-K Sketch `key` by the given `increments`.
+ Both `items` and `increments` are lists.
+ For more information see `TOPK.INCRBY <https://oss.redis.com/redisbloom/master/TopK_Commands/#topkincrby>`_.
+
+ Example:
+
+ >>> client.topk().incrby('A', ['foo'], [1])
+ """ # noqa
+ params = [key]
+ self.appendItemsAndIncrements(params, items, increments)
+ return self.execute_command(TOPK_INCRBY, *params)
+
+ def query(self, key, *items):
+ """
+ Check whether one or more `items` are among the Top-K items at `key`.
+ For more information see `TOPK.QUERY <https://oss.redis.com/redisbloom/master/TopK_Commands/#topkquery>`_.
+ """ # noqa
+ params = [key]
+ params += items
+ return self.execute_command(TOPK_QUERY, *params)
+
+ def count(self, key, *items):
+ """
+ Return the count for one or more `items` from `key`.
+ For more information see `TOPK.COUNT <https://oss.redis.com/redisbloom/master/TopK_Commands/#topkcount>`_.
+ """ # noqa
+ params = [key]
+ params += items
+ return self.execute_command(TOPK_COUNT, *params)
+
+ def list(self, key, withcount=False):
+ """
+ Return the full list of items in the Top-K list of `key`.
+ If `withcount` is set to True, also return the probabilistic count
+ of each item in the Top-K list of `key`.
+ For more information see `TOPK.LIST <https://oss.redis.com/redisbloom/master/TopK_Commands/#topklist>`_.
+ """ # noqa
+ params = [key]
+ if withcount:
+ params.append("WITHCOUNT")
+ return self.execute_command(TOPK_LIST, *params)
+
+ def info(self, key):
+ """
+ Return k, width, depth and decay values of `key`.
+ For more information see `TOPK.INFO <https://oss.redis.com/redisbloom/master/TopK_Commands/#topkinfo>`_.
+ """ # noqa
+ return self.execute_command(TOPK_INFO, key)
+
+
+class TDigestCommands:
+ def create(self, key, compression):
+ """
+ Allocate the memory and initialize the t-digest.
+ For more information see `TDIGEST.CREATE <https://oss.redis.com/redisbloom/master/TDigest_Commands/#tdigestcreate>`_.
+ """ # noqa
+ params = [key, compression]
+ return self.execute_command(TDIGEST_CREATE, *params)
+
+ def reset(self, key):
+ """
+ Reset the sketch `key` to zero - empty out the sketch and re-initialize it.
+ For more information see `TDIGEST.RESET <https://oss.redis.com/redisbloom/master/TDigest_Commands/#tdigestreset>`_.
+ """ # noqa
+ return self.execute_command(TDIGEST_RESET, key)
+
+ def add(self, key, values, weights):
+ """
+ Add one or more samples (value with weight) to a sketch `key`.
+ Both `values` and `weights` are lists.
+ For more information see `TDIGEST.ADD <https://oss.redis.com/redisbloom/master/TDigest_Commands/#tdigestadd>`_.
+
+ Example:
+
+ >>> client.tdigest().add('A', [1500.0], [1.0])
+ """ # noqa
+ params = [key]
+ self.appendValuesAndWeights(params, values, weights)
+ return self.execute_command(TDIGEST_ADD, *params)
+
+ def merge(self, toKey, fromKey):
+ """
+ Merge all of the values from the 'fromKey' sketch into the 'toKey' sketch.
+ For more information see `TDIGEST.MERGE <https://oss.redis.com/redisbloom/master/TDigest_Commands/#tdigestmerge>`_.
+ """ # noqa
+ params = [toKey, fromKey]
+ return self.execute_command(TDIGEST_MERGE, *params)
+
+ def min(self, key):
+ """
+ Return minimum value from the sketch `key`. Will return DBL_MAX if the sketch is empty.
+ For more information see `TDIGEST.MIN <https://oss.redis.com/redisbloom/master/TDigest_Commands/#tdigestmin>`_.
+ """ # noqa
+ return self.execute_command(TDIGEST_MIN, key)
+
+ def max(self, key):
+ """
+ Return maximum value from the sketch `key`. Will return DBL_MIN if the sketch is empty.
+ For more information see `TDIGEST.MAX <https://oss.redis.com/redisbloom/master/TDigest_Commands/#tdigestmax>`_.
+ """ # noqa
+ return self.execute_command(TDIGEST_MAX, key)
+
+ def quantile(self, key, quantile):
+ """
+ Return double value estimate of the cutoff such that a specified fraction of the data
+ added to this TDigest would be less than or equal to the cutoff.
+ For more information see `TDIGEST.QUANTILE <https://oss.redis.com/redisbloom/master/TDigest_Commands/#tdigestquantile>`_.
+ """ # noqa
+ params = [key, quantile]
+ return self.execute_command(TDIGEST_QUANTILE, *params)
+
+ def cdf(self, key, value):
+ """
+ Return the fraction of all points added which are <= `value`.
+ For more information see `TDIGEST.CDF <https://oss.redis.com/redisbloom/master/TDigest_Commands/#tdigestcdf>`_.
+ """ # noqa
+ params = [key, value]
+ return self.execute_command(TDIGEST_CDF, *params)
+
+ def info(self, key):
+ """
+ Return Compression, Capacity, Merged Nodes, Unmerged Nodes, Merged Weight, Unmerged Weight
+ and Total Compressions.
+ For more information see `TDIGEST.INFO <https://oss.redis.com/redisbloom/master/TDigest_Commands/#tdigestinfo>`_.
+ """ # noqa
+ return self.execute_command(TDIGEST_INFO, key)
+
+
+class CMSCommands:
+
+ # region Count-Min Sketch Functions
+ def initbydim(self, key, width, depth):
+ """
+ Initialize a Count-Min Sketch `key` to dimensions (`width`, `depth`) specified by user.
+ For more information see `CMS.INITBYDIM <https://oss.redis.com/redisbloom/master/CountMinSketch_Commands/#cmsinitbydim>`_.
+ """ # noqa
+ params = [key, width, depth]
+ return self.execute_command(CMS_INITBYDIM, *params)
+
+ def initbyprob(self, key, error, probability):
+ """
+ Initialize a Count-Min Sketch `key` to characteristics (`error`, `probability`) specified by user.
+ For more information see `CMS.INITBYPROB <https://oss.redis.com/redisbloom/master/CountMinSketch_Commands/#cmsinitbyprob>`_.
+ """ # noqa
+ params = [key, error, probability]
+ return self.execute_command(CMS_INITBYPROB, *params)
+
+ def incrby(self, key, items, increments):
+ """
+ Add/increase `items` in a Count-Min Sketch `key` by the given `increments`.
+ Both `items` and `increments` are lists.
+ For more information see `CMS.INCRBY <https://oss.redis.com/redisbloom/master/CountMinSketch_Commands/#cmsincrby>`_.
+
+ Example:
+
+ >>> client.cms().incrby('A', ['foo'], [1])
+ """ # noqa
+ params = [key]
+ self.appendItemsAndIncrements(params, items, increments)
+ return self.execute_command(CMS_INCRBY, *params)
+
+ def query(self, key, *items):
+ """
+ Return count for an `item` from `key`. Multiple items can be queried with one call.
+ For more information see `CMS.QUERY <https://oss.redis.com/redisbloom/master/CountMinSketch_Commands/#cmsquery>`_.
+ """ # noqa
+ params = [key]
+ params += items
+ return self.execute_command(CMS_QUERY, *params)
+
+ def merge(self, destKey, numKeys, srcKeys, weights=[]):
+ """
+ Merge `numKeys` sketches, specified in `srcKeys`, into `destKey`.
+ All sketches must have identical width and depth.
+ `weights` can be used to multiply certain sketches. The default weight is 1.
+ Both `srcKeys` and `weights` are lists.
+ For more information see `CMS.MERGE <https://oss.redis.com/redisbloom/master/CountMinSketch_Commands/#cmsmerge>`_.
+ """ # noqa
+ params = [destKey, numKeys]
+ params += srcKeys
+ self.appendWeights(params, weights)
+ return self.execute_command(CMS_MERGE, *params)
+
+ def info(self, key):
+ """
+ Return width, depth and total count of the sketch.
+ For more information see `CMS.INFO <https://oss.redis.com/redisbloom/master/CountMinSketch_Commands/#cmsinfo>`_.
+ """ # noqa
+ return self.execute_command(CMS_INFO, key)
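
The least obvious API in this file is the scandump/loadchunk pair. A sketch of the intended round trip, following the docstrings and tests/test_bloom.py below; `r` is a redis.Redis client as in the earlier sketch, and note that bf().scandump raises ModuleError when hiredis is installed:

    chunks = []
    it = 0
    while True:
        it, data = r.bf().scandump("bf:demo", it)  # successive (iter, data) pairs
        if it == 0:                                # iter == 0 signals completion
            break
        chunks.append((it, data))

    r.delete("bf:demo")                            # or target a different server
    for it, data in chunks:
        r.bf().loadchunk("bf:demo", it, data)      # restore the saved filter
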
diff --git a/redis/commands/bf/info.py b/redis/commands/bf/info.py
new file mode 100644
index 0000000..24c5419
--- /dev/null
+++ b/redis/commands/bf/info.py
@@ -0,0 +1,85 @@
+from ..helpers import nativestr
+
+
+class BFInfo(object):
+ capacity = None
+ size = None
+ filterNum = None
+ insertedNum = None
+ expansionRate = None
+
+ def __init__(self, args):
+ response = dict(zip(map(nativestr, args[::2]), args[1::2]))
+ self.capacity = response["Capacity"]
+ self.size = response["Size"]
+ self.filterNum = response["Number of filters"]
+ self.insertedNum = response["Number of items inserted"]
+ self.expansionRate = response["Expansion rate"]
+
+
+class CFInfo(object):
+ size = None
+ bucketNum = None
+ filterNum = None
+ insertedNum = None
+ deletedNum = None
+ bucketSize = None
+ expansionRate = None
+ maxIteration = None
+
+ def __init__(self, args):
+ response = dict(zip(map(nativestr, args[::2]), args[1::2]))
+ self.size = response["Size"]
+ self.bucketNum = response["Number of buckets"]
+ self.filterNum = response["Number of filters"]
+ self.insertedNum = response["Number of items inserted"]
+ self.deletedNum = response["Number of items deleted"]
+ self.bucketSize = response["Bucket size"]
+ self.expansionRate = response["Expansion rate"]
+ self.maxIteration = response["Max iterations"]
+
+
+class CMSInfo(object):
+ width = None
+ depth = None
+ count = None
+
+ def __init__(self, args):
+ response = dict(zip(map(nativestr, args[::2]), args[1::2]))
+ self.width = response["width"]
+ self.depth = response["depth"]
+ self.count = response["count"]
+
+
+class TopKInfo(object):
+ k = None
+ width = None
+ depth = None
+ decay = None
+
+ def __init__(self, args):
+ response = dict(zip(map(nativestr, args[::2]), args[1::2]))
+ self.k = response["k"]
+ self.width = response["width"]
+ self.depth = response["depth"]
+ self.decay = response["decay"]
+
+
+class TDigestInfo(object):
+ compression = None
+ capacity = None
+ mergedNodes = None
+ unmergedNodes = None
+ mergedWeight = None
+ unmergedWeight = None
+ totalCompressions = None
+
+ def __init__(self, args):
+ response = dict(zip(map(nativestr, args[::2]), args[1::2]))
+ self.compression = response["Compression"]
+ self.capacity = response["Capacity"]
+ self.mergedNodes = response["Merged nodes"]
+ self.unmergedNodes = response["Unmerged nodes"]
+ self.mergedWeight = response["Merged weight"]
+ self.unmergedWeight = response["Unmerged weight"]
+ self.totalCompressions = response["Total compressions"]
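
These Info classes simply turn the flat field/value replies of the *.INFO commands into attribute access. A short sketch of reading them, under the same assumptions as earlier:

    r.bf().create("bf:stats", 0.01, 1000)
    r.bf().add("bf:stats", "foo")
    info = r.bf().info("bf:stats")   # a BFInfo instance
    print(info.capacity)             # 1000
    print(info.insertedNum)          # 1
    print(info.filterNum)            # 1
    print(info.expansionRate)        # 2 by default; None when noScale was used
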
diff --git a/redis/commands/redismodules.py b/redis/commands/redismodules.py
index e5ace63..eafd650 100644
--- a/redis/commands/redismodules.py
+++ b/redis/commands/redismodules.py
@@ -32,6 +32,46 @@ class RedisModuleCommands:
s = TimeSeries(client=self)
return s
+ def bf(self):
+ """Access the bloom filter namespace."""
+
+ from .bf import BFBloom
+
+ bf = BFBloom(client=self)
+ return bf
+
+ def cf(self):
+ """Access the cuckoo filter namespace."""
+
+ from .bf import CFBloom
+
+ cf = CFBloom(client=self)
+ return cf
+
+ def cms(self):
+ """Access the count-min sketch namespace."""
+
+ from .bf import CMSBloom
+
+ cms = CMSBloom(client=self)
+ return cms
+
+ def topk(self):
+ """Access the top-k namespace."""
+
+ from .bf import TOPKBloom
+
+ topk = TOPKBloom(client=self)
+ return topk
+
+ def tdigest(self):
+ """Access the t-digest namespace."""
+
+ from .bf import TDigestBloom
+
+ tdigest = TDigestBloom(client=self)
+ return tdigest
+
def graph(self, index_name="idx"):
"""Access the graph namespace, providing support for
redis graph data.
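
Each accessor above builds a fresh wrapper around the same underlying connection, so the namespaces can be mixed freely on one client. For example, under the same assumptions as the earlier sketches:

    cf = r.cf()                    # a new CFBloom sharing r's connection
    cf.create("cf:nx", 1000)       # CF.RESERVE
    cf.addnx("cf:nx", "item")      # CF.ADDNX -> 1 (added only if absent)
    cf.addnx("cf:nx", "item")      # -> 0 on the second attempt
    cf.exists("cf:nx", "item")     # CF.EXISTS -> 1
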
diff --git a/setup.py b/setup.py
index d830801..58d753f 100644
--- a/setup.py
+++ b/setup.py
@@ -15,6 +15,7 @@ setup(
include=[
"redis",
"redis.commands",
+ "redis.commands.bf",
"redis.commands.json",
"redis.commands.search",
"redis.commands.timeseries",
diff --git a/tests/test_bloom.py b/tests/test_bloom.py
new file mode 100644
index 0000000..8936584
--- /dev/null
+++ b/tests/test_bloom.py
@@ -0,0 +1,383 @@
+import pytest
+
+import redis.commands.bf
+from redis.exceptions import ModuleError, RedisError
+from redis.utils import HIREDIS_AVAILABLE
+
+
+def intlist(obj):
+ return [int(v) for v in obj]
+
+
+@pytest.fixture
+def client(modclient):
+ assert isinstance(modclient.bf(), redis.commands.bf.BFBloom)
+ assert isinstance(modclient.cf(), redis.commands.bf.CFBloom)
+ assert isinstance(modclient.cms(), redis.commands.bf.CMSBloom)
+ assert isinstance(modclient.tdigest(), redis.commands.bf.TDigestBloom)
+ assert isinstance(modclient.topk(), redis.commands.bf.TOPKBloom)
+
+ modclient.flushdb()
+ return modclient
+
+
+@pytest.mark.redismod
+def test_create(client):
+ """Test CREATE/RESERVE calls"""
+ assert client.bf().create("bloom", 0.01, 1000)
+ assert client.bf().create("bloom_e", 0.01, 1000, expansion=1)
+ assert client.bf().create("bloom_ns", 0.01, 1000, noScale=True)
+ assert client.cf().create("cuckoo", 1000)
+ assert client.cf().create("cuckoo_e", 1000, expansion=1)
+ assert client.cf().create("cuckoo_bs", 1000, bucket_size=4)
+ assert client.cf().create("cuckoo_mi", 1000, max_iterations=10)
+ assert client.cms().initbydim("cmsDim", 100, 5)
+ assert client.cms().initbyprob("cmsProb", 0.01, 0.01)
+ assert client.topk().reserve("topk", 5, 100, 5, 0.9)
+ assert client.tdigest().create("tDigest", 100)
+
+
+# region Test Bloom Filter
+@pytest.mark.redismod
+def test_bf_add(client):
+ assert client.bf().create("bloom", 0.01, 1000)
+ assert 1 == client.bf().add("bloom", "foo")
+ assert 0 == client.bf().add("bloom", "foo")
+ assert [0] == intlist(client.bf().madd("bloom", "foo"))
+ assert [0, 1] == client.bf().madd("bloom", "foo", "bar")
+ assert [0, 0, 1] == client.bf().madd("bloom", "foo", "bar", "baz")
+ assert 1 == client.bf().exists("bloom", "foo")
+ assert 0 == client.bf().exists("bloom", "noexist")
+ assert [1, 0] == intlist(client.bf().mexists("bloom", "foo", "noexist"))
+
+
+@pytest.mark.redismod
+def test_bf_insert(client):
+ assert client.bf().create("bloom", 0.01, 1000)
+ assert [1] == intlist(client.bf().insert("bloom", ["foo"]))
+ assert [0, 1] == intlist(client.bf().insert("bloom", ["foo", "bar"]))
+ assert [1] == intlist(client.bf().insert("captest", ["foo"], capacity=10))
+ assert [1] == intlist(client.bf().insert("errtest", ["foo"], error=0.01))
+ assert 1 == client.bf().exists("bloom", "foo")
+ assert 0 == client.bf().exists("bloom", "noexist")
+ assert [1, 0] == intlist(client.bf().mexists("bloom", "foo", "noexist"))
+ info = client.bf().info("bloom")
+ assert 2 == info.insertedNum
+ assert 1000 == info.capacity
+ assert 1 == info.filterNum
+
+
+@pytest.mark.redismod
+def test_bf_scandump_and_loadchunk(client):
+ # Store a filter
+ client.bf().create("myBloom", "0.0001", "1000")
+
+ # The test is probabilistic and might fail. It is OK to change variables
+ # if you are certain that doing so will not break anything.
+ def do_verify():
+ res = 0
+ for x in range(1000):
+ client.bf().add("myBloom", x)
+ rv = client.bf().exists("myBloom", x)
+ assert rv
+ rv = client.bf().exists("myBloom", f"nonexist_{x}")
+ res += rv
+ assert res < 5
+
+ do_verify()
+ cmds = []
+ if HIREDIS_AVAILABLE:
+ with pytest.raises(ModuleError):
+ cur = client.bf().scandump("myBloom", 0)
+ return
+
+ cur = client.bf().scandump("myBloom", 0)
+ first = cur[0]
+ cmds.append(cur)
+
+ while True:
+ cur = client.bf().scandump("myBloom", first)
+ first = cur[0]
+ if first == 0:
+ break
+ else:
+ cmds.append(cur)
+ prev_info = client.bf().execute_command("bf.debug", "myBloom")
+
+ # Remove the filter
+ client.bf().client.delete("myBloom")
+
+ # Now, load all the commands:
+ for cmd in cmds:
+ client.bf().loadchunk("myBloom", *cmd)
+
+ cur_info = client.bf().execute_command("bf.debug", "myBloom")
+ assert prev_info == cur_info
+ do_verify()
+
+ client.bf().client.delete("myBloom")
+ client.bf().create("myBloom", "0.0001", "10000000")
+
+
+@pytest.mark.redismod
+def test_bf_info(client):
+ expansion = 4
+ # Store a filter
+ client.bf().create("nonscaling", "0.0001", "1000", noScale=True)
+ info = client.bf().info("nonscaling")
+ assert info.expansionRate is None
+
+ client.bf().create("expanding", "0.0001", "1000", expansion=expansion)
+ info = client.bf().info("expanding")
+ assert info.expansionRate == 4
+
+ try:
+ # noScale means no expansion
+ client.bf().create(
+ "myBloom", "0.0001", "1000", expansion=expansion, noScale=True
+ )
+ assert False
+ except RedisError:
+ assert True
+
+
+# region Test Cuckoo Filter
+@pytest.mark.redismod
+def test_cf_add_and_insert(client):
+ assert client.cf().create("cuckoo", 1000)
+ assert client.cf().add("cuckoo", "filter")
+ assert not client.cf().addnx("cuckoo", "filter")
+ assert 1 == client.cf().addnx("cuckoo", "newItem")
+ assert [1] == client.cf().insert("captest", ["foo"])
+ assert [1] == client.cf().insert("captest", ["foo"], capacity=1000)
+ assert [1] == client.cf().insertnx("captest", ["bar"])
+ assert [1] == client.cf().insertnx("captest", ["food"], nocreate="1")
+ assert [0, 0, 1] == client.cf().insertnx("captest", ["foo", "bar", "baz"])
+ assert [0] == client.cf().insertnx("captest", ["bar"], capacity=1000)
+ assert [1] == client.cf().insert("empty1", ["foo"], capacity=1000)
+ assert [1] == client.cf().insertnx("empty2", ["bar"], capacity=1000)
+ info = client.cf().info("captest")
+ assert 5 == info.insertedNum
+ assert 0 == info.deletedNum
+ assert 1 == info.filterNum
+
+
+@pytest.mark.redismod
+def test_cf_exists_and_del(client):
+ assert client.cf().create("cuckoo", 1000)
+ assert client.cf().add("cuckoo", "filter")
+ assert client.cf().exists("cuckoo", "filter")
+ assert not client.cf().exists("cuckoo", "notexist")
+ assert 1 == client.cf().count("cuckoo", "filter")
+ assert 0 == client.cf().count("cuckoo", "notexist")
+ assert client.cf().delete("cuckoo", "filter")
+ assert 0 == client.cf().count("cuckoo", "filter")
+
+
+# region Test Count-Min Sketch
+@pytest.mark.redismod
+def test_cms(client):
+ assert client.cms().initbydim("dim", 1000, 5)
+ assert client.cms().initbyprob("prob", 0.01, 0.01)
+ assert client.cms().incrby("dim", ["foo"], [5])
+ assert [0] == client.cms().query("dim", "notexist")
+ assert [5] == client.cms().query("dim", "foo")
+ assert [10, 15] == client.cms().incrby("dim", ["foo", "bar"], [5, 15])
+ assert [10, 15] == client.cms().query("dim", "foo", "bar")
+ info = client.cms().info("dim")
+ assert 1000 == info.width
+ assert 5 == info.depth
+ assert 25 == info.count
+
+
+@pytest.mark.redismod
+def test_cms_merge(client):
+ assert client.cms().initbydim("A", 1000, 5)
+ assert client.cms().initbydim("B", 1000, 5)
+ assert client.cms().initbydim("C", 1000, 5)
+ assert client.cms().incrby("A", ["foo", "bar", "baz"], [5, 3, 9])
+ assert client.cms().incrby("B", ["foo", "bar", "baz"], [2, 3, 1])
+ assert [5, 3, 9] == client.cms().query("A", "foo", "bar", "baz")
+ assert [2, 3, 1] == client.cms().query("B", "foo", "bar", "baz")
+ assert client.cms().merge("C", 2, ["A", "B"])
+ assert [7, 6, 10] == client.cms().query("C", "foo", "bar", "baz")
+ assert client.cms().merge("C", 2, ["A", "B"], ["1", "2"])
+ assert [9, 9, 11] == client.cms().query("C", "foo", "bar", "baz")
+ assert client.cms().merge("C", 2, ["A", "B"], ["2", "3"])
+ assert [16, 15, 21] == client.cms().query("C", "foo", "bar", "baz")
+
+
+# endregion
+
+
+# region Test Top-K
+@pytest.mark.redismod
+def test_topk(client):
+ # test list with empty buckets
+ assert client.topk().reserve("topk", 3, 50, 4, 0.9)
+ assert [
+ None,
+ None,
+ None,
+ "A",
+ "C",
+ "D",
+ None,
+ None,
+ "E",
+ None,
+ "B",
+ "C",
+ None,
+ None,
+ None,
+ "D",
+ None,
+ ] == client.topk().add(
+ "topk",
+ "A",
+ "B",
+ "C",
+ "D",
+ "E",
+ "A",
+ "A",
+ "B",
+ "C",
+ "G",
+ "D",
+ "B",
+ "D",
+ "A",
+ "E",
+ "E",
+ 1,
+ )
+ assert [1, 1, 0, 0, 1, 0, 0] == client.topk().query(
+ "topk", "A", "B", "C", "D", "E", "F", "G"
+ )
+ assert [4, 3, 2, 3, 3, 0, 1] == client.topk().count(
+ "topk", "A", "B", "C", "D", "E", "F", "G"
+ )
+
+ # test full list
+ assert client.topk().reserve("topklist", 3, 50, 3, 0.9)
+ assert client.topk().add(
+ "topklist",
+ "A",
+ "B",
+ "C",
+ "D",
+ "E",
+ "A",
+ "A",
+ "B",
+ "C",
+ "G",
+ "D",
+ "B",
+ "D",
+ "A",
+ "E",
+ "E",
+ )
+ assert ["A", "B", "E"] == client.topk().list("topklist")
+ assert ["A", 4, "B", 3, "E", 3] == client.topk().list("topklist", withcount=True)
+ info = client.topk().info("topklist")
+ assert 3 == info.k
+ assert 50 == info.width
+ assert 3 == info.depth
+ assert 0.9 == round(float(info.decay), 1)
+
+
+@pytest.mark.redismod
+def test_topk_incrby(client):
+ client.flushdb()
+ assert client.topk().reserve("topk", 3, 10, 3, 1)
+ assert [None, None, None] == client.topk().incrby(
+ "topk", ["bar", "baz", "42"], [3, 6, 2]
+ )
+ assert [None, "bar"] == client.topk().incrby("topk", ["42", "xyzzy"], [8, 4])
+ assert [3, 6, 10, 4, 0] == client.topk().count(
+ "topk", "bar", "baz", "42", "xyzzy", 4
+ )
+
+
+# region Test T-Digest
+@pytest.mark.redismod
+def test_tdigest_reset(client):
+ assert client.tdigest().create("tDigest", 10)
+ # reset on empty histogram
+ assert client.tdigest().reset("tDigest")
+ # insert data-points into sketch
+ assert client.tdigest().add("tDigest", list(range(10)), [1.0] * 10)
+
+ assert client.tdigest().reset("tDigest")
+ # assert we have 0 unmerged nodes
+ assert 0 == client.tdigest().info("tDigest").unmergedNodes
+
+
+@pytest.mark.redismod
+def test_tdigest_merge(client):
+ assert client.tdigest().create("to-tDigest", 10)
+ assert client.tdigest().create("from-tDigest", 10)
+ # insert data-points into sketch
+ assert client.tdigest().add("from-tDigest", [1.0] * 10, [1.0] * 10)
+ assert client.tdigest().add("to-tDigest", [2.0] * 10, [10.0] * 10)
+ # merge from-tdigest into to-tdigest
+ assert client.tdigest().merge("to-tDigest", "from-tDigest")
+ # we should now have 110 weight on to-histogram
+ info = client.tdigest().info("to-tDigest")
+ total_weight_to = float(info.mergedWeight) + float(info.unmergedWeight)
+ assert 110 == total_weight_to
+
+
+@pytest.mark.redismod
+def test_tdigest_min_and_max(client):
+ assert client.tdigest().create("tDigest", 100)
+ # insert data-points into sketch
+ assert client.tdigest().add("tDigest", [1, 2, 3], [1.0] * 3)
+ # min/max
+ assert 3 == client.tdigest().max("tDigest")
+ assert 1 == client.tdigest().min("tDigest")
+
+
+@pytest.mark.redismod
+def test_tdigest_quantile(client):
+ assert client.tdigest().create("tDigest", 500)
+ # insert data-points into sketch
+ assert client.tdigest().add(
+ "tDigest", list([x * 0.01 for x in range(1, 10000)]), [1.0] * 10000
+ )
+ # assert that min/max give the same result as quantile 0 and 1
+ assert client.tdigest().max("tDigest") == client.tdigest().quantile("tDigest", 1.0)
+ assert client.tdigest().min("tDigest") == client.tdigest().quantile("tDigest", 0.0)
+
+ assert 1.0 == round(client.tdigest().quantile("tDigest", 0.01), 2)
+ assert 99.0 == round(client.tdigest().quantile("tDigest", 0.99), 2)
+
+
+@pytest.mark.redismod
+def test_tdigest_cdf(client):
+ assert client.tdigest().create("tDigest", 100)
+ # insert data-points into sketch
+ assert client.tdigest().add("tDigest", list(range(1, 10)), [1.0] * 10)
+ assert 0.1 == round(client.tdigest().cdf("tDigest", 1.0), 1)
+ assert 0.9 == round(client.tdigest().cdf("tDigest", 9.0), 1)
+
+
+# @pytest.mark.redismod
+# def test_pipeline(client):
+# pipeline = client.bf().pipeline()
+# assert not client.bf().execute_command("get pipeline")
+#
+# assert client.bf().create("pipeline", 0.01, 1000)
+# for i in range(100):
+# pipeline.add("pipeline", i)
+# for i in range(100):
+# assert not (client.bf().exists("pipeline", i))
+#
+# pipeline.execute()
+#
+# for i in range(100):
+# assert client.bf().exists("pipeline", i)
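
One call signature worth spelling out from the tests above is cms().merge, which takes the number of source keys, the list of source keys, and an optional list of weights. A sketch mirroring test_cms_merge, same client assumptions as before:

    cms = r.cms()
    cms.initbydim("cms:a", 1000, 5)
    cms.initbydim("cms:b", 1000, 5)
    cms.initbydim("cms:dst", 1000, 5)
    cms.incrby("cms:a", ["foo"], [5])
    cms.incrby("cms:b", ["foo"], [2])
    cms.merge("cms:dst", 2, ["cms:a", "cms:b"])              # equal weights -> count 5 + 2
    cms.merge("cms:dst", 2, ["cms:a", "cms:b"], ["1", "2"])  # weighted -> 1*5 + 2*2
    cms.query("cms:dst", "foo")                              # -> [9]
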
diff --git a/tox.ini b/tox.ini
index 0ccc9bb..f4eaedc 100644
--- a/tox.ini
+++ b/tox.ini
@@ -170,6 +170,7 @@ exclude =
venv*,
whitelist.py
ignore =
+ F405
W503
E203
E126