author Chayim <chayim@users.noreply.github.com> 2021-10-28 09:57:03 +0300
committer GitHub <noreply@github.com> 2021-10-28 09:57:03 +0300
commit eaa56b7d721182541bab087a6d61304c778f7ea9 (patch)
tree 9ff017a9ef6f4fc9da152951a9939ef5297f8969
parent 1f11f8c4ecbc2227c28077b9e9764e543c38d0a5 (diff)
download redis-py-eaa56b7d721182541bab087a6d61304c778f7ea9.tar.gz
redis timeseries support (#1652)
-rw-r--r--  redis/commands/helpers.py              14
-rw-r--r--  redis/commands/json/__init__.py         4
-rw-r--r--  redis/commands/redismodules.py         16
-rw-r--r--  redis/commands/timeseries/__init__.py  61
-rw-r--r--  redis/commands/timeseries/commands.py 775
-rw-r--r--  redis/commands/timeseries/info.py      82
-rw-r--r--  redis/commands/timeseries/utils.py     49
-rw-r--r--  setup.py                                1
-rw-r--r--  tests/test_timeseries.py              593
9 files changed, 1590 insertions, 5 deletions
diff --git a/redis/commands/helpers.py b/redis/commands/helpers.py
index b012621..a92c025 100644
--- a/redis/commands/helpers.py
+++ b/redis/commands/helpers.py
@@ -23,3 +23,17 @@ def nativestr(x):
def delist(x):
"""Given a list of binaries, return the stringified version."""
return [nativestr(obj) for obj in x]
+
+
+def parse_to_list(response):
+ """Optimistally parse the response to a list.
+ """
+ res = []
+ for item in response:
+ try:
+ res.append(int(item))
+ except ValueError:
+ res.append(nativestr(item))
+ except TypeError:
+ res.append(None)
+ return res
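+
+
+# A quick illustration of the fallback chain above (hypothetical input):
+# int() raises ValueError for non-numeric strings and TypeError for None,
+# so mixed replies parse as:
+#
+#   parse_to_list([b"1", b"label", None])  # -> [1, "label", None]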
diff --git a/redis/commands/json/__init__.py b/redis/commands/json/__init__.py
index 2e26de3..9783705 100644
--- a/redis/commands/json/__init__.py
+++ b/redis/commands/json/__init__.py
@@ -1,12 +1,8 @@
-# from typing import Optional
from json import JSONDecoder, JSONEncoder
-# # from redis.client import Redis
-
from .helpers import bulk_of_jsons
from ..helpers import nativestr, delist
from .commands import JSONCommands
-# from ..feature import AbstractFeature
class JSON(JSONCommands):
diff --git a/redis/commands/redismodules.py b/redis/commands/redismodules.py
index 3ecce29..457a69e 100644
--- a/redis/commands/redismodules.py
+++ b/redis/commands/redismodules.py
@@ -27,8 +27,22 @@ class RedisModuleCommands:
try:
modversion = self.loaded_modules['search']
except IndexError:
- raise ModuleError("rejson is not a loaded in the redis instance.")
+ raise ModuleError("search is not a loaded in the redis instance.")
from .search import Search
s = Search(client=self, version=modversion, index_name=index_name)
return s
+
+ def ts(self):
+ """Access the timeseries namespace, providing support for
+ redis timeseries data.
+ """
+ try:
+ modversion = self.loaded_modules['timeseries']
+ except KeyError:
+ raise ModuleError("timeseries is not a loaded in "
+ "the redis instance.")
+
+ from .timeseries import TimeSeries
+ s = TimeSeries(client=self, version=modversion)
+ return s
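+
+ # A minimal usage sketch (assumes a Redis server with the
+ # RedisTimeSeries module loaded; the key name is illustrative):
+ #
+ #   import redis
+ #   r = redis.Redis()
+ #   r.ts().create("sensor:temperature")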
diff --git a/redis/commands/timeseries/__init__.py b/redis/commands/timeseries/__init__.py
new file mode 100644
index 0000000..db9c3a5
--- /dev/null
+++ b/redis/commands/timeseries/__init__.py
@@ -0,0 +1,61 @@
+from redis.client import bool_ok
+
+from .utils import (
+ parse_range,
+ parse_get,
+ parse_m_range,
+ parse_m_get,
+)
+from .info import TSInfo
+from ..helpers import parse_to_list
+from .commands import (
+ ALTER_CMD,
+ CREATE_CMD,
+ CREATERULE_CMD,
+ DELETERULE_CMD,
+ DEL_CMD,
+ GET_CMD,
+ INFO_CMD,
+ MGET_CMD,
+ MRANGE_CMD,
+ MREVRANGE_CMD,
+ QUERYINDEX_CMD,
+ RANGE_CMD,
+ REVRANGE_CMD,
+ TimeSeriesCommands,
+)
+
+
+class TimeSeries(TimeSeriesCommands):
+ """
+ This class implements RedisTimeSeries's commands
+ (prefixed with "TS.").
+ The client allows you to interact with RedisTimeSeries and use all of
+ its functionality.
+ """
+
+ def __init__(self, client=None, version=None, **kwargs):
+ """Create a new RedisTimeSeries client."""
+ # Set the module commands' callbacks
+ MODULE_CALLBACKS = {
+ CREATE_CMD: bool_ok,
+ ALTER_CMD: bool_ok,
+ CREATERULE_CMD: bool_ok,
+ DEL_CMD: int,
+ DELETERULE_CMD: bool_ok,
+ RANGE_CMD: parse_range,
+ REVRANGE_CMD: parse_range,
+ MRANGE_CMD: parse_m_range,
+ MREVRANGE_CMD: parse_m_range,
+ GET_CMD: parse_get,
+ MGET_CMD: parse_m_get,
+ INFO_CMD: TSInfo,
+ QUERYINDEX_CMD: parse_to_list,
+ }
+
+ self.client = client
+ self.execute_command = client.execute_command
+ self.MODULE_VERSION = version
+
+ for k in MODULE_CALLBACKS:
+ self.client.set_response_callback(k, MODULE_CALLBACKS[k])
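+
+ # With these callbacks registered, raw module replies are parsed
+ # before being returned to the caller. For example (hypothetical
+ # client and key), a TS.RANGE reply arrives as (timestamp, float)
+ # tuples rather than raw bulk strings:
+ #
+ #   ts = redis.Redis().ts()
+ #   ts.range("key", 0, "+")  # -> [(1, 1.0), (2, 2.0)]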
diff --git a/redis/commands/timeseries/commands.py b/redis/commands/timeseries/commands.py
new file mode 100644
index 0000000..3b9ee0f
--- /dev/null
+++ b/redis/commands/timeseries/commands.py
@@ -0,0 +1,775 @@
+from redis.exceptions import DataError
+
+
+ADD_CMD = "TS.ADD"
+ALTER_CMD = "TS.ALTER"
+CREATERULE_CMD = "TS.CREATERULE"
+CREATE_CMD = "TS.CREATE"
+DECRBY_CMD = "TS.DECRBY"
+DELETERULE_CMD = "TS.DELETERULE"
+DEL_CMD = "TS.DEL"
+GET_CMD = "TS.GET"
+INCRBY_CMD = "TS.INCRBY"
+INFO_CMD = "TS.INFO"
+MADD_CMD = "TS.MADD"
+MGET_CMD = "TS.MGET"
+MRANGE_CMD = "TS.MRANGE"
+MREVRANGE_CMD = "TS.MREVRANGE"
+QUERYINDEX_CMD = "TS.QUERYINDEX"
+RANGE_CMD = "TS.RANGE"
+REVRANGE_CMD = "TS.REVRANGE"
+
+
+class TimeSeriesCommands:
+ """RedisTimeSeries Commands."""
+
+ def create(self, key, **kwargs):
+ """
+ Create a new time-series.
+ For more information see
+ `TS.CREATE <https://oss.redis.com/redistimeseries/master/commands/#tscreate>`_. # noqa
+
+ Args:
+
+ key:
+ time-series key
+ retention_msecs:
+ Maximum age for samples compared to last event time (in milliseconds).
+ If None or 0 is passed then the series is not trimmed at all.
+ uncompressed:
+ Since RedisTimeSeries v1.2, both timestamps and values are
+ compressed by default.
+ Adding this flag will keep data in an uncompressed form.
+ Compression not only saves
+ memory but usually improves performance due to a lower
+ number of memory accesses.
+ labels:
+ Set of label-value pairs that represent metadata labels of the key.
+ chunk_size:
+ Each time series uses chunks of memory of fixed size for
+ time series samples.
+ You can alter the default TSDB chunk size by passing the
+ chunk_size argument (in Bytes).
+ duplicate_policy:
+ Since RedisTimeSeries v1.4 you can specify the duplicate sample policy
+ (configure what to do on a duplicate sample).
+ Can be one of:
+ - 'block': an error will occur for any out-of-order sample.
+ - 'first': ignore the new value.
+ - 'last': override with latest value.
+ - 'min': only override if the value is lower than the existing value.
+ - 'max': only override if the value is higher than the existing value.
+ When this is not set, the server-wide default will be used.
+ """
+ retention_msecs = kwargs.get("retention_msecs", None)
+ uncompressed = kwargs.get("uncompressed", False)
+ labels = kwargs.get("labels", {})
+ chunk_size = kwargs.get("chunk_size", None)
+ duplicate_policy = kwargs.get("duplicate_policy", None)
+ params = [key]
+ self._appendRetention(params, retention_msecs)
+ self._appendUncompressed(params, uncompressed)
+ self._appendChunkSize(params, chunk_size)
+ self._appendDuplicatePolicy(params, CREATE_CMD, duplicate_policy)
+ self._appendLabels(params, labels)
+
+ return self.execute_command(CREATE_CMD, *params)
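+
+ # Example (hypothetical key and labels; assumes ``ts = redis.Redis().ts()``):
+ #
+ #   ts.create("temperature:3:11", retention_msecs=60000,
+ #             labels={"sensor_id": "3", "area_id": "11"},
+ #             duplicate_policy="last")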
+
+ def alter(self, key, **kwargs):
+ """
+ Update the retention and labels of an existing key.
+ For more information see
+ `TS.ALTER <https://oss.redis.com/redistimeseries/master/commands/#tsalter>`_. # noqa
+
+ The parameters are the same as TS.CREATE.
+ """
+ retention_msecs = kwargs.get("retention_msecs", None)
+ labels = kwargs.get("labels", {})
+ duplicate_policy = kwargs.get("duplicate_policy", None)
+ params = [key]
+ self._appendRetention(params, retention_msecs)
+ self._appendDuplicatePolicy(params, ALTER_CMD, duplicate_policy)
+ self._appendLabels(params, labels)
+
+ return self.execute_command(ALTER_CMD, *params)
+
+ def add(self, key, timestamp, value, **kwargs):
+ """
+ Append (or create and append) a new sample to the series.
+ For more information see
+ `TS.ADD <https://oss.redis.com/redistimeseries/master/commands/#tsadd>`_. # noqa
+
+ Args:
+
+ key:
+ time-series key
+ timestamp:
+ Timestamp of the sample. * can be used for automatic timestamp (using the system clock).
+ value:
+ Numeric data value of the sample
+ retention_msecs:
+ Maximum age for samples compared to last event time (in milliseconds).
+ If None or 0 is passed then the series is not trimmed at all.
+ uncompressed:
+ Since RedisTimeSeries v1.2, both timestamps and values are compressed by default.
+ Adding this flag will keep data in an uncompressed form. Compression not only saves
+ memory but usually improves performance due to a lower number of memory accesses.
+ labels:
+ Set of label-value pairs that represent metadata labels of the key.
+ chunk_size:
+ Each time series uses chunks of memory of fixed size for time series samples.
+ You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes).
+ duplicate_policy:
+ Since RedisTimeSeries v1.4 you can specify the duplicate sample policy
+ (Configure what to do on duplicate sample).
+ Can be one of:
+ - 'block': an error will occur for any out-of-order sample.
+ - 'first': ignore the new value.
+ - 'last': override with latest value.
+ - 'min': only override if the value is lower than the existing value.
+ - 'max': only override if the value is higher than the existing value.
+ When this is not set, the server-wide default will be used.
+ """
+ retention_msecs = kwargs.get("retention_msecs", None)
+ uncompressed = kwargs.get("uncompressed", False)
+ labels = kwargs.get("labels", {})
+ chunk_size = kwargs.get("chunk_size", None)
+ duplicate_policy = kwargs.get("duplicate_policy", None)
+ params = [key, timestamp, value]
+ self._appendRetention(params, retention_msecs)
+ self._appendUncompressed(params, uncompressed)
+ self._appendChunkSize(params, chunk_size)
+ self._appendDuplicatePolicy(params, ADD_CMD, duplicate_policy)
+ self._appendLabels(params, labels)
+
+ return self.execute_command(ADD_CMD, *params)
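+
+ # Example (hypothetical key): "*" lets the server timestamp the
+ # sample with its own clock; an explicit integer sets it directly:
+ #
+ #   ts.add("temperature:3:11", "*", 23.5)
+ #   ts.add("temperature:3:11", 1609459200000, 24.0)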
+
+ def madd(self, ktv_tuples):
+ """
+ Append (or create and append) a new `value` to series
+ `key` with `timestamp`.
+ Expects a list of `tuples` as (`key`, `timestamp`, `value`).
+ Return value is an array with timestamps of insertions.
+ For more information see
+ `TS.MADD <https://oss.redis.com/redistimeseries/master/commands/#tsmadd>`_. # noqa
+ """
+ params = []
+ for ktv in ktv_tuples:
+ for item in ktv:
+ params.append(item)
+
+ return self.execute_command(MADD_CMD, *params)
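+
+ # Example (hypothetical keys): three samples in one round trip; the
+ # reply is the list of timestamps written:
+ #
+ #   ts.madd([("a", 1, 5.0), ("a", 2, 10.0), ("b", 1, 1.5)])
+ #   # -> [1, 2, 1]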
+
+ def incrby(self, key, value, **kwargs):
+ """
+ Increment (or create a time series and increment) the latest
+ sample of a series.
+ This command can be used as a counter or gauge that automatically gets
+ history as a time series.
+ For more information see
+ `TS.INCRBY <https://oss.redis.com/redistimeseries/master/commands/#tsincrbytsdecrby>`_. # noqa
+
+ Args:
+
+ key:
+ time-series key
+ value:
+ Numeric data value of the sample
+ timestamp:
+ Timestamp of the sample. None can be used for automatic timestamp (using the system clock).
+ retention_msecs:
+ Maximum age for samples compared to last event time (in milliseconds).
+ If None or 0 is passed then the series is not trimmed at all.
+ uncompressed:
+ Since RedisTimeSeries v1.2, both timestamps and values are compressed by default.
+ Adding this flag will keep data in an uncompressed form. Compression not only saves
+ memory but usually improves performance due to a lower number of memory accesses.
+ labels:
+ Set of label-value pairs that represent metadata labels of the key.
+ chunk_size:
+ Each time-series uses chunks of memory of fixed size for time series samples.
+ You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes).
+ """
+ timestamp = kwargs.get("timestamp", None)
+ retention_msecs = kwargs.get("retention_msecs", None)
+ uncompressed = kwargs.get("uncompressed", False)
+ labels = kwargs.get("labels", {})
+ chunk_size = kwargs.get("chunk_size", None)
+ params = [key, value]
+ self._appendTimestamp(params, timestamp)
+ self._appendRetention(params, retention_msecs)
+ self._appendUncompressed(params, uncompressed)
+ self._appendChunkSize(params, chunk_size)
+ self._appendLabels(params, labels)
+
+ return self.execute_command(INCRBY_CMD, *params)
+
+ def decrby(self, key, value, **kwargs):
+ """
+ Decrement (or create a time series and decrement) the
+ latest sample of a series.
+ This command can be used as a counter or gauge that
+ automatically gets history as a time series.
+ For more information see
+ `TS.DECRBY <https://oss.redis.com/redistimeseries/master/commands/#tsincrbytsdecrby>`_. # noqa
+
+ Args:
+
+ key:
+ time-series key
+ value:
+ Numeric data value of the sample
+ timestamp:
+ Timestamp of the sample. None can be used for automatic
+ timestamp (using the system clock).
+ retention_msecs:
+ Maximum age for samples compared to last event time (in milliseconds).
+ If None or 0 is passed then the series is not trimmed at all.
+ uncompressed:
+ Since RedisTimeSeries v1.2, both timestamps and values are
+ compressed by default.
+ Adding this flag will keep data in an uncompressed form.
+ Compression not only saves
+ memory but usually improves performance due to a lower
+ number of memory accesses.
+ labels:
+ Set of label-value pairs that represent metadata labels of the key.
+ chunk_size:
+ Each time-series uses chunks of memory of fixed size for time series samples.
+ You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes).
+ """
+ timestamp = kwargs.get("timestamp", None)
+ retention_msecs = kwargs.get("retention_msecs", None)
+ uncompressed = kwargs.get("uncompressed", False)
+ labels = kwargs.get("labels", {})
+ chunk_size = kwargs.get("chunk_size", None)
+ params = [key, value]
+ self._appendTimestamp(params, timestamp)
+ self._appendRetention(params, retention_msecs)
+ self._appendUncompressed(params, uncompressed)
+ self._appendChunkSize(params, chunk_size)
+ self._appendLabels(params, labels)
+
+ return self.execute_command(DECRBY_CMD, *params)
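+
+ # Example (hypothetical counter key): incrby/decrby keep a running
+ # value while recording every change as a new sample:
+ #
+ #   ts.incrby("hits", 1)                  # server-clock timestamp
+ #   ts.incrby("hits", 5, timestamp=1000)
+ #   ts.decrby("hits", 2, timestamp=2000)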
+
+ def delete(self, key, from_time, to_time):
+ """
+ Delete data points for a given timeseries and interval range
+ in the form of start and end delete timestamps.
+ The given timestamp interval is closed (inclusive), meaning start
+ and end data points will also be deleted.
+ Return the count of deleted items.
+ For more information see
+ `TS.DEL <https://oss.redis.com/redistimeseries/master/commands/#tsdel>`_. # noqa
+
+ Args:
+
+ key:
+ time-series key.
+ from_time:
+ Start timestamp for the range deletion.
+ to_time:
+ End timestamp for the range deletion.
+ """
+ return self.execute_command(DEL_CMD, key, from_time, to_time)
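+
+ # Example (hypothetical key): drop all samples where
+ # 100 <= timestamp <= 200, returning the number deleted:
+ #
+ #   deleted = ts.delete("temperature:3:11", 100, 200)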
+
+ def createrule(
+ self,
+ source_key,
+ dest_key,
+ aggregation_type,
+ bucket_size_msec
+ ):
+ """
+ Create a compaction rule from values added to `source_key` into `dest_key`.
+ Aggregate over buckets of `bucket_size_msec` milliseconds, where
+ `aggregation_type` can be one of [`avg`, `sum`, `min`, `max`,
+ `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`].
+ For more information see
+ `TS.CREATERULE <https://oss.redis.com/redistimeseries/master/commands/#tscreaterule>`_. # noqa
+ """
+ params = [source_key, dest_key]
+ self._appendAggregation(params, aggregation_type, bucket_size_msec)
+
+ return self.execute_command(CREATERULE_CMD, *params)
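+
+ # Example (hypothetical keys): downsample raw readings into one
+ # averaged sample per 10-second bucket; the destination series
+ # must already exist:
+ #
+ #   ts.create("temp:raw")
+ #   ts.create("temp:avg")
+ #   ts.createrule("temp:raw", "temp:avg", "avg", 10000)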
+
+ def deleterule(self, source_key, dest_key):
+ """
+ Delete a compaction rule.
+ For more information see
+ `TS.DELETERULE <https://oss.redis.com/redistimeseries/master/commands/#tsdeleterule>`_. # noqa
+ """
+ return self.execute_command(DELETERULE_CMD, source_key, dest_key)
+
+ def __range_params(
+ self,
+ key,
+ from_time,
+ to_time,
+ count,
+ aggregation_type,
+ bucket_size_msec,
+ filter_by_ts,
+ filter_by_min_value,
+ filter_by_max_value,
+ align,
+ ):
+ """Create TS.RANGE and TS.REVRANGE arguments."""
+ params = [key, from_time, to_time]
+ self._appendFilerByTs(params, filter_by_ts)
+ self._appendFilerByValue(
+ params,
+ filter_by_min_value,
+ filter_by_max_value
+ )
+ self._appendCount(params, count)
+ self._appendAlign(params, align)
+ self._appendAggregation(params, aggregation_type, bucket_size_msec)
+
+ return params
+
+ def range(
+ self,
+ key,
+ from_time,
+ to_time,
+ count=None,
+ aggregation_type=None,
+ bucket_size_msec=0,
+ filter_by_ts=None,
+ filter_by_min_value=None,
+ filter_by_max_value=None,
+ align=None,
+ ):
+ """
+ Query a range in forward direction for a specific time series.
+ For more information see
+ `TS.RANGE <https://oss.redis.com/redistimeseries/master/commands/#tsrangetsrevrange>`_. # noqa
+
+ Args:
+
+ key:
+ Key name for timeseries.
+ from_time:
+ Start timestamp for the range query. `-` can be used to express
+ the minimum possible timestamp (0).
+ to_time:
+ End timestamp for the range query. `+` can be used to express
+ the maximum possible timestamp.
+ count:
+ Optional maximum number of returned results.
+ aggregation_type:
+ Optional aggregation type. Can be one of
+ [`avg`, `sum`, `min`, `max`, `range`, `count`,
+ `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`]
+ bucket_size_msec:
+ Time bucket for aggregation in milliseconds.
+ filter_by_ts:
+ List of timestamps to filter the result by specific timestamps.
+ filter_by_min_value:
+ Filter result by minimum value (must be used together with
+ filter_by_max_value).
+ filter_by_max_value:
+ Filter result by maximum value (must be used together with
+ filter_by_min_value).
+ align:
+ Timestamp for alignment control for aggregation.
+ """
+ params = self.__range_params(
+ key,
+ from_time,
+ to_time,
+ count,
+ aggregation_type,
+ bucket_size_msec,
+ filter_by_ts,
+ filter_by_min_value,
+ filter_by_max_value,
+ align,
+ )
+ return self.execute_command(RANGE_CMD, *params)
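+
+ # Example (hypothetical key): fetch the full series, then a bucketed
+ # average; "-" and "+" stand for the minimum and maximum timestamps:
+ #
+ #   ts.range("temp:raw", "-", "+")
+ #   ts.range("temp:raw", 0, 1000, aggregation_type="avg",
+ #            bucket_size_msec=200, count=5)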
+
+ def revrange(
+ self,
+ key,
+ from_time,
+ to_time,
+ count=None,
+ aggregation_type=None,
+ bucket_size_msec=0,
+ filter_by_ts=None,
+ filter_by_min_value=None,
+ filter_by_max_value=None,
+ align=None,
+ ):
+ """
+ Query a range in reverse direction for a specific time-series.
+ For more information see
+ `TS.REVRANGE <https://oss.redis.com/redistimeseries/master/commands/#tsrangetsrevrange>`_. # noqa
+
+ **Note**: This command is only available since RedisTimeSeries >= v1.4
+
+ Args:
+
+ key:
+ Key name for timeseries.
+ from_time:
+ Start timestamp for the range query. `-` can be used to express the minimum possible timestamp (0).
+ to_time:
+ End timestamp for the range query. `+` can be used to express the maximum possible timestamp.
+ count:
+ Optional maximum number of returned results.
+ aggregation_type:
+ Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, `range`, `count`,
+ `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`]
+ bucket_size_msec:
+ Time bucket for aggregation in milliseconds.
+ filter_by_ts:
+ List of timestamps to filter the result by specific timestamps.
+ filter_by_min_value:
+ Filter result by minimum value (must be used together with filter_by_max_value).
+ filter_by_max_value:
+ Filter result by maximum value (must be used together with filter_by_min_value).
+ align:
+ Timestamp for alignment control for aggregation.
+ """
+ params = self.__range_params(
+ key,
+ from_time,
+ to_time,
+ count,
+ aggregation_type,
+ bucket_size_msec,
+ filter_by_ts,
+ filter_by_min_value,
+ filter_by_max_value,
+ align,
+ )
+ return self.execute_command(REVRANGE_CMD, *params)
+
+ def __mrange_params(
+ self,
+ aggregation_type,
+ bucket_size_msec,
+ count,
+ filters,
+ from_time,
+ to_time,
+ with_labels,
+ filter_by_ts,
+ filter_by_min_value,
+ filter_by_max_value,
+ groupby,
+ reduce,
+ select_labels,
+ align,
+ ):
+ """Create TS.MRANGE and TS.MREVRANGE arguments."""
+ params = [from_time, to_time]
+ self._appendFilerByTs(params, filter_by_ts)
+ self._appendFilerByValue(
+ params,
+ filter_by_min_value,
+ filter_by_max_value
+ )
+ self._appendCount(params, count)
+ self._appendAlign(params, align)
+ self._appendAggregation(params, aggregation_type, bucket_size_msec)
+ self._appendWithLabels(params, with_labels, select_labels)
+ params.extend(["FILTER"])
+ params += filters
+ self._appendGroupbyReduce(params, groupby, reduce)
+ return params
+
+ def mrange(
+ self,
+ from_time,
+ to_time,
+ filters,
+ count=None,
+ aggregation_type=None,
+ bucket_size_msec=0,
+ with_labels=False,
+ filter_by_ts=None,
+ filter_by_min_value=None,
+ filter_by_max_value=None,
+ groupby=None,
+ reduce=None,
+ select_labels=None,
+ align=None,
+ ):
+ """
+ Query a range across multiple time-series by filters in forward direction.
+ For more information see
+ `TS.MRANGE <https://oss.redis.com/redistimeseries/master/commands/#tsmrangetsmrevrange>`_. # noqa
+
+ Args:
+
+ from_time:
+ Start timestamp for the range query. `-` can be used to
+ express the minimum possible timestamp (0).
+ to_time:
+ End timestamp for the range query. `+` can be used to express
+ the maximum possible timestamp.
+ filters:
+ Filter to match the time-series labels.
+ count:
+ Optional maximum number of returned results.
+ aggregation_type:
+ Optional aggregation type. Can be one of
+ [`avg`, `sum`, `min`, `max`, `range`, `count`,
+ `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`]
+ bucket_size_msec:
+ Time bucket for aggregation in milliseconds.
+ with_labels:
+ Include in the reply the label-value pairs that represent metadata
+ labels of the time-series.
+ If this argument is not set, an empty list is returned
+ in the labels position of the reply.
+ filter_by_ts:
+ List of timestamps to filter the result by specific timestamps.
+ filter_by_min_value:
+ Filter result by minimum value (must be used together with
+ filter_by_max_value).
+ filter_by_max_value:
+ Filter result by maximum value (must be used together with
+ filter_by_min_value).
+ groupby:
+ Group the results by fields (must be used together with reduce).
+ reduce:
+ Apply a reducer function on each group. Can be one
+ of [`sum`, `min`, `max`].
+ select_labels:
+ Include in the reply only a subset of the key-value
+ pair labels of a series.
+ align:
+ Timestamp for alignment control for aggregation.
+ """
+ params = self.__mrange_params(
+ aggregation_type,
+ bucket_size_msec,
+ count,
+ filters,
+ from_time,
+ to_time,
+ with_labels,
+ filter_by_ts,
+ filter_by_min_value,
+ filter_by_max_value,
+ groupby,
+ reduce,
+ select_labels,
+ align,
+ )
+
+ return self.execute_command(MRANGE_CMD, *params)
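+
+ # Example (hypothetical label): query every series labelled
+ # sensor_id=3, averaged into 10-second buckets, labels included:
+ #
+ #   ts.mrange(0, "+", filters=["sensor_id=3"],
+ #             aggregation_type="avg", bucket_size_msec=10000,
+ #             with_labels=True)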
+
+ def mrevrange(
+ self,
+ from_time,
+ to_time,
+ filters,
+ count=None,
+ aggregation_type=None,
+ bucket_size_msec=0,
+ with_labels=False,
+ filter_by_ts=None,
+ filter_by_min_value=None,
+ filter_by_max_value=None,
+ groupby=None,
+ reduce=None,
+ select_labels=None,
+ align=None,
+ ):
+ """
+ Query a range across multiple time-series by filters in reverse direction.
+ For more information see
+ `TS.MREVRANGE <https://oss.redis.com/redistimeseries/master/commands/#tsmrangetsmrevrange>`_. # noqa
+
+ Args:
+
+ from_time:
+ Start timestamp for the range query. `-` can be used to express
+ the minimum possible timestamp (0).
+ to_time:
+ End timestamp for the range query. `+` can be used to express
+ the maximum possible timestamp.
+ filters:
+ Filter to match the time-series labels.
+ count:
+ Optional maximum number of returned results.
+ aggregation_type:
+ Optional aggregation type. Can be one of
+ [`avg`, `sum`, `min`, `max`, `range`, `count`,
+ `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`]
+ bucket_size_msec:
+ Time bucket for aggregation in milliseconds.
+ with_labels:
+ Include in the reply the label-value pairs that represent
+ metadata labels of the time-series.
+ If this argument is not set, an empty list is returned
+ in the labels position of the reply.
+ filter_by_ts:
+ List of timestamps to filter the result by specific timestamps.
+ filter_by_min_value:
+ Filter result by minimum value (must be used together with
+ filter_by_max_value).
+ filter_by_max_value:
+ Filter result by maximum value (must be used together with
+ filter_by_min_value).
+ groupby:
+ Group the results by fields (must be used together with reduce).
+ reduce:
+ Apply a reducer function on each group. Can be one
+ of [`sum`, `min`, `max`].
+ select_labels:
+ Include in the reply only a subset of the key-value pair
+ labels of a series.
+ align:
+ Timestamp for alignment control for aggregation.
+ """
+ params = self.__mrange_params(
+ aggregation_type,
+ bucket_size_msec,
+ count,
+ filters,
+ from_time,
+ to_time,
+ with_labels,
+ filter_by_ts,
+ filter_by_min_value,
+ filter_by_max_value,
+ groupby,
+ reduce,
+ select_labels,
+ align,
+ )
+
+ return self.execute_command(MREVRANGE_CMD, *params)
+
+ def get(self, key):
+ """ # noqa
+ Get the last sample of `key`.
+ For more information see `TS.GET <https://oss.redis.com/redistimeseries/master/commands/#tsget>`_.
+ """
+ return self.execute_command(GET_CMD, key)
+
+ def mget(self, filters, with_labels=False):
+ """ # noqa
+ Get the last samples matching a specific `filter`.
+ For more information see `TS.MGET <https://oss.redis.com/redistimeseries/master/commands/#tsmget>`_.
+ """
+ params = []
+ self._appendWithLabels(params, with_labels)
+ params.extend(["FILTER"])
+ params += filters
+ return self.execute_command(MGET_CMD, *params)
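+
+ # Example (hypothetical filter): the parsed reply maps each matching
+ # key to [labels, timestamp, value] (see parse_m_get):
+ #
+ #   ts.mget(["sensor_id=3"], with_labels=True)
+ #   # -> [{"temperature:3:11": [{"sensor_id": "3"}, 1000, 23.5]}]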
+
+ def info(self, key):
+ """ # noqa
+ Get information about `key`.
+ For more information see `TS.INFO <https://oss.redis.com/redistimeseries/master/commands/#tsinfo>`_.
+ """
+ return self.execute_command(INFO_CMD, key)
+
+ def queryindex(self, filters):
+ """ # noqa
+ Get all the keys matching the `filter` list.
+ For more information see `TS.QUERYINDEX <https://oss.redis.com/redistimeseries/master/commands/#tsqueryindex>`_.
+ """
+ return self.execute_command(QUERYINDEX_CMD, *filters)
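+
+ # Example (hypothetical labels): list the keys of every series
+ # labelled Taste=That; the reply is parsed by parse_to_list:
+ #
+ #   ts.queryindex(["Taste=That"])  # -> [2]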
+
+ @staticmethod
+ def _appendUncompressed(params, uncompressed):
+ """Append UNCOMPRESSED tag to params."""
+ if uncompressed:
+ params.extend(["UNCOMPRESSED"])
+
+ @staticmethod
+ def _appendWithLabels(params, with_labels, select_labels=None):
+ """Append labels behavior to params."""
+ if with_labels and select_labels:
+ raise DataError(
+ "with_labels and select_labels cannot be provided together."
+ )
+
+ if with_labels:
+ params.extend(["WITHLABELS"])
+ if select_labels:
+ params.extend(["SELECTED_LABELS", *select_labels])
+
+ @staticmethod
+ def _appendGroupbyReduce(params, groupby, reduce):
+ """Append GROUPBY REDUCE property to params."""
+ if groupby is not None and reduce is not None:
+ params.extend(["GROUPBY", groupby, "REDUCE", reduce.upper()])
+
+ @staticmethod
+ def _appendRetention(params, retention):
+ """Append RETENTION property to params."""
+ if retention is not None:
+ params.extend(["RETENTION", retention])
+
+ @staticmethod
+ def _appendLabels(params, labels):
+ """Append LABELS property to params."""
+ if labels:
+ params.append("LABELS")
+ for k, v in labels.items():
+ params.extend([k, v])
+
+ @staticmethod
+ def _appendCount(params, count):
+ """Append COUNT property to params."""
+ if count is not None:
+ params.extend(["COUNT", count])
+
+ @staticmethod
+ def _appendTimestamp(params, timestamp):
+ """Append TIMESTAMP property to params."""
+ if timestamp is not None:
+ params.extend(["TIMESTAMP", timestamp])
+
+ @staticmethod
+ def _appendAlign(params, align):
+ """Append ALIGN property to params."""
+ if align is not None:
+ params.extend(["ALIGN", align])
+
+ @staticmethod
+ def _appendAggregation(params, aggregation_type, bucket_size_msec):
+ """Append AGGREGATION property to params."""
+ if aggregation_type is not None:
+ params.append("AGGREGATION")
+ params.extend([aggregation_type, bucket_size_msec])
+
+ @staticmethod
+ def _appendChunkSize(params, chunk_size):
+ """Append CHUNK_SIZE property to params."""
+ if chunk_size is not None:
+ params.extend(["CHUNK_SIZE", chunk_size])
+
+ @staticmethod
+ def _appendDuplicatePolicy(params, command, duplicate_policy):
+ """Append DUPLICATE_POLICY property to params on CREATE
+ and ON_DUPLICATE on ADD.
+ """
+ if duplicate_policy is not None:
+ if command == "TS.ADD":
+ params.extend(["ON_DUPLICATE", duplicate_policy])
+ else:
+ params.extend(["DUPLICATE_POLICY", duplicate_policy])
+
+ @staticmethod
+ def _appendFilerByTs(params, ts_list):
+ """Append FILTER_BY_TS property to params."""
+ if ts_list is not None:
+ params.extend(["FILTER_BY_TS", *ts_list])
+
+ @staticmethod
+ def _appendFilerByValue(params, min_value, max_value):
+ """Append FILTER_BY_VALUE property to params."""
+ if min_value is not None and max_value is not None:
+ params.extend(["FILTER_BY_VALUE", min_value, max_value])
diff --git a/redis/commands/timeseries/info.py b/redis/commands/timeseries/info.py
new file mode 100644
index 0000000..3b89503
--- /dev/null
+++ b/redis/commands/timeseries/info.py
@@ -0,0 +1,82 @@
+from .utils import list_to_dict
+from ..helpers import nativestr
+
+
+class TSInfo(object):
+ """
+ Hold information and statistics on the time-series.
+ Can be created using the ``info`` command:
+ https://oss.redis.com/redistimeseries/commands/#tsinfo.
+ """
+
+ rules = []
+ labels = []
+ source_key = None
+ chunk_count = None
+ memory_usage = None
+ total_samples = None
+ retention_msecs = None
+ last_time_stamp = None
+ first_time_stamp = None
+
+ max_samples_per_chunk = None
+ chunk_size = None
+ duplicate_policy = None
+
+ def __init__(self, args):
+ """
+ Hold information and statistics on the time-series.
+
+ The supported params that can be passed as args:
+
+ rules:
+ A list of compaction rules of the time series.
+ sourceKey:
+ Key name for source time series in case the current series
+ is a target of a rule.
+ chunkCount:
+ Number of Memory Chunks used for the time series.
+ memoryUsage:
+ Total number of bytes allocated for the time series.
+ totalSamples:
+ Total number of samples in the time series.
+ labels:
+ A list of label-value pairs that represent the metadata
+ labels of the time series.
+ retentionTime:
+ Retention time, in milliseconds, for the time series.
+ lastTimestamp:
+ Last timestamp present in the time series.
+ firstTimestamp:
+ First timestamp present in the time series.
+ maxSamplesPerChunk:
+ Deprecated.
+ chunkSize:
+ Amount of memory, in bytes, allocated for data.
+ duplicatePolicy:
+ Policy that will define handling of duplicate samples.
+
+ Can read more about on
+ https://oss.redis.com/redistimeseries/configuration/#duplicate_policy
+ """
+ response = dict(zip(map(nativestr, args[::2]), args[1::2]))
+ self.rules = response["rules"]
+ self.source_key = response["sourceKey"]
+ self.chunk_count = response["chunkCount"]
+ self.memory_usage = response["memoryUsage"]
+ self.total_samples = response["totalSamples"]
+ self.labels = list_to_dict(response["labels"])
+ self.retention_msecs = response["retentionTime"]
+ self.lastTimeStamp = response["lastTimestamp"]
+ self.first_time_stamp = response["firstTimestamp"]
+ if "maxSamplesPerChunk" in response:
+ self.max_samples_per_chunk = response["maxSamplesPerChunk"]
+ self.chunk_size = (
+ self.max_samples_per_chunk * 16
+ ) # backward compatible changes
+ if "chunkSize" in response:
+ self.chunk_size = response["chunkSize"]
+ if "duplicatePolicy" in response:
+ self.duplicate_policy = response["duplicatePolicy"]
+ if isinstance(self.duplicate_policy, bytes):
+ self.duplicate_policy = self.duplicate_policy.decode()
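+
+ # A short usage sketch (hypothetical key): the TS.INFO response
+ # callback builds a TSInfo object, so fields read as attributes:
+ #
+ #   info = redis.Redis().ts().info("temperature:3:11")
+ #   info.retention_msecs, info.total_samples, info.labels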
diff --git a/redis/commands/timeseries/utils.py b/redis/commands/timeseries/utils.py
new file mode 100644
index 0000000..c33b7c5
--- /dev/null
+++ b/redis/commands/timeseries/utils.py
@@ -0,0 +1,49 @@
+from ..helpers import nativestr
+
+
+def list_to_dict(aList):
+ return {nativestr(k): nativestr(v) for k, v in aList}
+
+
+def parse_range(response):
+ """Parse range response. Used by TS.RANGE and TS.REVRANGE."""
+ return [(r[0], float(r[1])) for r in response]
+
+
+def parse_m_range(response):
+ """Parse multi range response. Used by TS.MRANGE and TS.MREVRANGE."""
+ res = []
+ for item in response:
+ res.append(
+ {nativestr(item[0]):
+ [list_to_dict(item[1]), parse_range(item[2])]})
+ return sorted(res, key=lambda d: list(d.keys()))
+
+
+def parse_get(response):
+ """Parse get response. Used by TS.GET."""
+ if not response:
+ return None
+ return int(response[0]), float(response[1])
+
+
+def parse_m_get(response):
+ """Parse multi get response. Used by TS.MGET."""
+ res = []
+ for item in response:
+ if not item[2]:
+ res.append(
+ {nativestr(item[0]): [list_to_dict(item[1]), None, None]})
+ else:
+ res.append(
+ {
+ nativestr(item[0]): [
+ list_to_dict(item[1]),
+ int(item[2][0]),
+ float(item[2][1]),
+ ]
+ }
+ )
+ return sorted(res, key=lambda d: list(d.keys()))
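+
+
+# Shapes produced by these parsers, on hypothetical replies:
+#
+#   parse_range   -> [(timestamp, value), ...], e.g. [(1, 5.0)]
+#   parse_get     -> (timestamp, value), or None for an empty series
+#   parse_m_range -> [{key: [labels_dict, [(timestamp, value), ...]]}, ...]
+#   parse_m_get   -> [{key: [labels_dict, timestamp, value]}, ...]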
diff --git a/setup.py b/setup.py
index 9788d2e..d0c81b4 100644
--- a/setup.py
+++ b/setup.py
@@ -16,6 +16,7 @@ setup(
"redis.commands",
"redis.commands.json",
"redis.commands.search",
+ "redis.commands.timeseries",
]
),
url="https://github.com/redis/redis-py",
diff --git a/tests/test_timeseries.py b/tests/test_timeseries.py
new file mode 100644
index 0000000..b2df3fe
--- /dev/null
+++ b/tests/test_timeseries.py
@@ -0,0 +1,593 @@
+import pytest
+import time
+from time import sleep
+from .conftest import skip_ifmodversion_lt
+
+
+@pytest.fixture
+def client(modclient):
+ modclient.flushdb()
+ return modclient
+
+
+@pytest.mark.redismod
+def testCreate(client):
+ assert client.ts().create(1)
+ assert client.ts().create(2, retention_msecs=5)
+ assert client.ts().create(3, labels={"Redis": "Labs"})
+ assert client.ts().create(4, retention_msecs=20, labels={"Time": "Series"})
+ info = client.ts().info(4)
+ assert 20 == info.retention_msecs
+ assert "Series" == info.labels["Time"]
+
+ # Test for a chunk size of 128 Bytes
+ assert client.ts().create("time-serie-1", chunk_size=128)
+ info = client.ts().info("time-serie-1")
+ assert 128 == info.chunk_size
+
+
+@pytest.mark.redismod
+@skip_ifmodversion_lt("1.4.0", "timeseries")
+def testCreateDuplicatePolicy(client):
+ # Test for duplicate policy
+ for duplicate_policy in ["block", "last", "first", "min", "max"]:
+ ts_name = "time-serie-ooo-{0}".format(duplicate_policy)
+ assert client.ts().create(ts_name, duplicate_policy=duplicate_policy)
+ info = client.ts().info(ts_name)
+ assert duplicate_policy == info.duplicate_policy
+
+
+@pytest.mark.redismod
+def testAlter(client):
+ assert client.ts().create(1)
+ assert 0 == client.ts().info(1).retention_msecs
+ assert client.ts().alter(1, retention_msecs=10)
+ assert {} == client.ts().info(1).labels
+ assert 10 == client.ts().info(1).retention_msecs
+ assert client.ts().alter(1, labels={"Time": "Series"})
+ assert "Series" == client.ts().info(1).labels["Time"]
+ assert 10 == client.ts().info(1).retention_msecs
+
+
+@pytest.mark.redismod
+@skip_ifmodversion_lt("1.4.0", "timeseries")
+def testAlterDuplicatePolicy(client):
+ assert client.ts().create(1)
+ info = client.ts().info(1)
+ assert info.duplicate_policy is None
+ assert client.ts().alter(1, duplicate_policy="min")
+ info = client.ts().info(1)
+ assert "min" == info.duplicate_policy
+
+
+@pytest.mark.redismod
+def testAdd(client):
+ assert 1 == client.ts().add(1, 1, 1)
+ assert 2 == client.ts().add(2, 2, 3, retention_msecs=10)
+ assert 3 == client.ts().add(3, 3, 2, labels={"Redis": "Labs"})
+ assert 4 == client.ts().add(
+ 4, 4, 2, retention_msecs=10, labels={"Redis": "Labs", "Time": "Series"}
+ )
+ assert round(time.time()) == \
+ round(float(client.ts().add(5, "*", 1)) / 1000)
+
+ info = client.ts().info(4)
+ assert 10 == info.retention_msecs
+ assert "Labs" == info.labels["Redis"]
+
+ # Test for a chunk size of 128 Bytes on TS.ADD
+ assert client.ts().add("time-serie-1", 1, 10.0, chunk_size=128)
+ info = client.ts().info("time-serie-1")
+ assert 128 == info.chunk_size
+
+
+@pytest.mark.redismod
+@skip_ifmodversion_lt("1.4.0", "timeseries")
+def testAddDuplicatePolicy(client):
+ # Test for duplicate policy BLOCK
+ assert 1 == client.ts().add("time-serie-add-ooo-block", 1, 5.0)
+ with pytest.raises(Exception):
+ client.ts().add(
+ "time-serie-add-ooo-block",
+ 1,
+ 5.0,
+ duplicate_policy="block"
+ )
+
+ # Test for duplicate policy LAST
+ assert 1 == client.ts().add("time-serie-add-ooo-last", 1, 5.0)
+ assert 1 == client.ts().add(
+ "time-serie-add-ooo-last", 1, 10.0, duplicate_policy="last"
+ )
+ assert 10.0 == client.ts().get("time-serie-add-ooo-last")[1]
+
+ # Test for duplicate policy FIRST
+ assert 1 == client.ts().add("time-serie-add-ooo-first", 1, 5.0)
+ assert 1 == client.ts().add(
+ "time-serie-add-ooo-first", 1, 10.0, duplicate_policy="first"
+ )
+ assert 5.0 == client.ts().get("time-serie-add-ooo-first")[1]
+
+ # Test for duplicate policy MAX
+ assert 1 == client.ts().add("time-serie-add-ooo-max", 1, 5.0)
+ assert 1 == client.ts().add(
+ "time-serie-add-ooo-max", 1, 10.0, duplicate_policy="max"
+ )
+ assert 10.0 == client.ts().get("time-serie-add-ooo-max")[1]
+
+ # Test for duplicate policy MIN
+ assert 1 == client.ts().add("time-serie-add-ooo-min", 1, 5.0)
+ assert 1 == client.ts().add(
+ "time-serie-add-ooo-min", 1, 10.0, duplicate_policy="min"
+ )
+ assert 5.0 == client.ts().get("time-serie-add-ooo-min")[1]
+
+
+@pytest.mark.redismod
+def testMAdd(client):
+ client.ts().create("a")
+ assert [1, 2, 3] == \
+ client.ts().madd([("a", 1, 5), ("a", 2, 10), ("a", 3, 15)])
+
+
+@pytest.mark.redismod
+def testIncrbyDecrby(client):
+ for _ in range(100):
+ assert client.ts().incrby(1, 1)
+ sleep(0.001)
+ assert 100 == client.ts().get(1)[1]
+ for _ in range(100):
+ assert client.ts().decrby(1, 1)
+ sleep(0.001)
+ assert 0 == client.ts().get(1)[1]
+
+ assert client.ts().incrby(2, 1.5, timestamp=5)
+ assert (5, 1.5) == client.ts().get(2)
+ assert client.ts().incrby(2, 2.25, timestamp=7)
+ assert (7, 3.75) == client.ts().get(2)
+ assert client.ts().decrby(2, 1.5, timestamp=15)
+ assert (15, 2.25) == client.ts().get(2)
+
+ # Test for a chunk size of 128 Bytes on TS.INCRBY
+ assert client.ts().incrby("time-serie-1", 10, chunk_size=128)
+ info = client.ts().info("time-serie-1")
+ assert 128 == info.chunk_size
+
+ # Test for a chunk size of 128 Bytes on TS.DECRBY
+ assert client.ts().decrby("time-serie-2", 10, chunk_size=128)
+ info = client.ts().info("time-serie-2")
+ assert 128 == info.chunk_size
+
+
+@pytest.mark.redismod
+def testCreateAndDeleteRule(client):
+ # test rule creation
+ time = 100
+ client.ts().create(1)
+ client.ts().create(2)
+ client.ts().createrule(1, 2, "avg", 100)
+ for i in range(50):
+ client.ts().add(1, time + i * 2, 1)
+ client.ts().add(1, time + i * 2 + 1, 2)
+ client.ts().add(1, time * 2, 1.5)
+ assert round(client.ts().get(2)[1], 5) == 1.5
+ info = client.ts().info(1)
+ assert info.rules[0][1] == 100
+
+ # test rule deletion
+ client.ts().deleterule(1, 2)
+ info = client.ts().info(1)
+ assert not info.rules
+
+
+@pytest.mark.redismod
+@skip_ifmodversion_lt("99.99.99", "timeseries")
+def testDelRange(client):
+ # deleting from a non-existent key raises an error
+ with pytest.raises(Exception):
+ client.ts().delete("test", 0, 100)
+
+ for i in range(100):
+ client.ts().add(1, i, i % 7)
+ assert 22 == client.ts().delete(1, 0, 21)
+ assert [] == client.ts().range(1, 0, 21)
+ assert [(22, 1.0)] == client.ts().range(1, 22, 22)
+
+
+@pytest.mark.redismod
+def testRange(client):
+ for i in range(100):
+ client.ts().add(1, i, i % 7)
+ assert 100 == len(client.ts().range(1, 0, 200))
+ for i in range(100):
+ client.ts().add(1, i + 200, i % 7)
+ assert 200 == len(client.ts().range(1, 0, 500))
+ # last sample isn't returned
+ assert 20 == len(
+ client.ts().range(
+ 1,
+ 0,
+ 500,
+ aggregation_type="avg",
+ bucket_size_msec=10
+ )
+ )
+ assert 10 == len(client.ts().range(1, 0, 500, count=10))
+
+
+@pytest.mark.redismod
+@skip_ifmodversion_lt("99.99.99", "timeseries")
+def testRangeAdvanced(client):
+ for i in range(100):
+ client.ts().add(1, i, i % 7)
+ client.ts().add(1, i + 200, i % 7)
+
+ assert 2 == len(
+ client.ts().range(
+ 1,
+ 0,
+ 500,
+ filter_by_ts=[i for i in range(10, 20)],
+ filter_by_min_value=1,
+ filter_by_max_value=2,
+ )
+ )
+ assert [(0, 10.0), (10, 1.0)] == client.ts().range(
+ 1, 0, 10, aggregation_type="count", bucket_size_msec=10, align="+"
+ )
+ assert [(-5, 5.0), (5, 6.0)] == client.ts().range(
+ 1, 0, 10, aggregation_type="count", bucket_size_msec=10, align=5
+ )
+
+
+@pytest.mark.redismod
+@skip_ifmodversion_lt("99.99.99", "timeseries")
+def testRevRange(client):
+ for i in range(100):
+ client.ts().add(1, i, i % 7)
+ assert 100 == len(client.ts().range(1, 0, 200))
+ for i in range(100):
+ client.ts().add(1, i + 200, i % 7)
+ assert 200 == len(client.ts().range(1, 0, 500))
+ # first sample isn't returned
+ assert 20 == len(
+ client.ts().revrange(
+ 1,
+ 0,
+ 500,
+ aggregation_type="avg",
+ bucket_size_msec=10
+ )
+ )
+ assert 10 == len(client.ts().revrange(1, 0, 500, count=10))
+ assert 2 == len(
+ client.ts().revrange(
+ 1,
+ 0,
+ 500,
+ filter_by_ts=[i for i in range(10, 20)],
+ filter_by_min_value=1,
+ filter_by_max_value=2,
+ )
+ )
+ assert [(10, 1.0), (0, 10.0)] == client.ts().revrange(
+ 1, 0, 10, aggregation_type="count", bucket_size_msec=10, align="+"
+ )
+ assert [(1, 10.0), (-9, 1.0)] == client.ts().revrange(
+ 1, 0, 10, aggregation_type="count", bucket_size_msec=10, align=1
+ )
+
+
+@pytest.mark.redismod
+def testMultiRange(client):
+ client.ts().create(1, labels={"Test": "This", "team": "ny"})
+ client.ts().create(
+ 2,
+ labels={"Test": "This", "Taste": "That", "team": "sf"}
+ )
+ for i in range(100):
+ client.ts().add(1, i, i % 7)
+ client.ts().add(2, i, i % 11)
+
+ res = client.ts().mrange(0, 200, filters=["Test=This"])
+ assert 2 == len(res)
+ assert 100 == len(res[0]["1"][1])
+
+ res = client.ts().mrange(0, 200, filters=["Test=This"], count=10)
+ assert 10 == len(res[0]["1"][1])
+
+ for i in range(100):
+ client.ts().add(1, i + 200, i % 7)
+ res = client.ts().mrange(
+ 0,
+ 500,
+ filters=["Test=This"],
+ aggregation_type="avg",
+ bucket_size_msec=10
+ )
+ assert 2 == len(res)
+ assert 20 == len(res[0]["1"][1])
+
+ # test withlabels
+ assert {} == res[0]["1"][0]
+ res = client.ts().mrange(0, 200, filters=["Test=This"], with_labels=True)
+ assert {"Test": "This", "team": "ny"} == res[0]["1"][0]
+
+
+@pytest.mark.redismod
+@skip_ifmodversion_lt("99.99.99", "timeseries")
+def testMultiRangeAdvanced(client):
+ client.ts().create(1, labels={"Test": "This", "team": "ny"})
+ client.ts().create(
+ 2,
+ labels={"Test": "This", "Taste": "That", "team": "sf"}
+ )
+ for i in range(100):
+ client.ts().add(1, i, i % 7)
+ client.ts().add(2, i, i % 11)
+
+ # test with selected labels
+ res = client.ts().mrange(
+ 0,
+ 200,
+ filters=["Test=This"],
+ select_labels=["team"]
+ )
+ assert {"team": "ny"} == res[0]["1"][0]
+ assert {"team": "sf"} == res[1]["2"][0]
+
+ # test with filterby
+ res = client.ts().mrange(
+ 0,
+ 200,
+ filters=["Test=This"],
+ filter_by_ts=[i for i in range(10, 20)],
+ filter_by_min_value=1,
+ filter_by_max_value=2,
+ )
+ assert [(15, 1.0), (16, 2.0)] == res[0]["1"][1]
+
+ # test groupby
+ res = client.ts().mrange(
+ 0,
+ 3,
+ filters=["Test=This"],
+ groupby="Test",
+ reduce="sum"
+ )
+ assert [(0, 0.0), (1, 2.0), (2, 4.0), (3, 6.0)] == res[0]["Test=This"][1]
+ res = client.ts().mrange(
+ 0,
+ 3,
+ filters=["Test=This"],
+ groupby="Test",
+ reduce="max"
+ )
+ assert [(0, 0.0), (1, 1.0), (2, 2.0), (3, 3.0)] == res[0]["Test=This"][1]
+ res = client.ts().mrange(
+ 0,
+ 3,
+ filters=["Test=This"],
+ groupby="team",
+ reduce="min")
+ assert 2 == len(res)
+ assert [(0, 0.0), (1, 1.0), (2, 2.0), (3, 3.0)] == res[0]["team=ny"][1]
+ assert [(0, 0.0), (1, 1.0), (2, 2.0), (3, 3.0)] == res[1]["team=sf"][1]
+
+ # test align
+ res = client.ts().mrange(
+ 0,
+ 10,
+ filters=["team=ny"],
+ aggregation_type="count",
+ bucket_size_msec=10,
+ align="-",
+ )
+ assert [(0, 10.0), (10, 1.0)] == res[0]["1"][1]
+ res = client.ts().mrange(
+ 0,
+ 10,
+ filters=["team=ny"],
+ aggregation_type="count",
+ bucket_size_msec=10,
+ align=5,
+ )
+ assert [(-5, 5.0), (5, 6.0)] == res[0]["1"][1]
+
+
+@pytest.mark.redismod
+@skip_ifmodversion_lt("99.99.99", "timeseries")
+def testMultiReverseRange(client):
+ client.ts().create(1, labels={"Test": "This", "team": "ny"})
+ client.ts().create(
+ 2,
+ labels={"Test": "This", "Taste": "That", "team": "sf"}
+ )
+ for i in range(100):
+ client.ts().add(1, i, i % 7)
+ client.ts().add(2, i, i % 11)
+
+ res = client.ts().mrange(0, 200, filters=["Test=This"])
+ assert 2 == len(res)
+ assert 100 == len(res[0]["1"][1])
+
+ res = client.ts().mrange(0, 200, filters=["Test=This"], count=10)
+ assert 10 == len(res[0]["1"][1])
+
+ for i in range(100):
+ client.ts().add(1, i + 200, i % 7)
+ res = client.ts().mrevrange(
+ 0,
+ 500,
+ filters=["Test=This"],
+ aggregation_type="avg",
+ bucket_size_msec=10
+ )
+ assert 2 == len(res)
+ assert 20 == len(res[0]["1"][1])
+ assert {} == res[0]["1"][0]
+
+ # test withlabels
+ res = client.ts().mrevrange(
+ 0,
+ 200,
+ filters=["Test=This"],
+ with_labels=True
+ )
+ assert {"Test": "This", "team": "ny"} == res[0]["1"][0]
+
+ # test with selected labels
+ res = client.ts().mrevrange(
+ 0,
+ 200,
+ filters=["Test=This"], select_labels=["team"]
+ )
+ assert {"team": "ny"} == res[0]["1"][0]
+ assert {"team": "sf"} == res[1]["2"][0]
+
+ # test filterby
+ res = client.ts().mrevrange(
+ 0,
+ 200,
+ filters=["Test=This"],
+ filter_by_ts=[i for i in range(10, 20)],
+ filter_by_min_value=1,
+ filter_by_max_value=2,
+ )
+ assert [(16, 2.0), (15, 1.0)] == res[0]["1"][1]
+
+ # test groupby
+ res = client.ts().mrevrange(
+ 0, 3, filters=["Test=This"], groupby="Test", reduce="sum"
+ )
+ assert [(3, 6.0), (2, 4.0), (1, 2.0), (0, 0.0)] == res[0]["Test=This"][1]
+ res = client.ts().mrevrange(
+ 0, 3, filters=["Test=This"], groupby="Test", reduce="max"
+ )
+ assert [(3, 3.0), (2, 2.0), (1, 1.0), (0, 0.0)] == res[0]["Test=This"][1]
+ res = client.ts().mrevrange(
+ 0, 3, filters=["Test=This"], groupby="team", reduce="min"
+ )
+ assert 2 == len(res)
+ assert [(3, 3.0), (2, 2.0), (1, 1.0), (0, 0.0)] == res[0]["team=ny"][1]
+ assert [(3, 3.0), (2, 2.0), (1, 1.0), (0, 0.0)] == res[1]["team=sf"][1]
+
+ # test align
+ res = client.ts().mrevrange(
+ 0,
+ 10,
+ filters=["team=ny"],
+ aggregation_type="count",
+ bucket_size_msec=10,
+ align="-",
+ )
+ assert [(10, 1.0), (0, 10.0)] == res[0]["1"][1]
+ res = client.ts().mrevrange(
+ 0,
+ 10,
+ filters=["team=ny"],
+ aggregation_type="count",
+ bucket_size_msec=10,
+ align=1,
+ )
+ assert [(1, 10.0), (-9, 1.0)] == res[0]["1"][1]
+
+
+@pytest.mark.redismod
+def testGet(client):
+ name = "test"
+ client.ts().create(name)
+ assert client.ts().get(name) is None
+ client.ts().add(name, 2, 3)
+ assert 2 == client.ts().get(name)[0]
+ client.ts().add(name, 3, 4)
+ assert 4 == client.ts().get(name)[1]
+
+
+@pytest.mark.redismod
+def testMGet(client):
+ client.ts().create(1, labels={"Test": "This"})
+ client.ts().create(2, labels={"Test": "This", "Taste": "That"})
+ act_res = client.ts().mget(["Test=This"])
+ exp_res = [{"1": [{}, None, None]}, {"2": [{}, None, None]}]
+ assert act_res == exp_res
+ client.ts().add(1, "*", 15)
+ client.ts().add(2, "*", 25)
+ res = client.ts().mget(["Test=This"])
+ assert 15 == res[0]["1"][2]
+ assert 25 == res[1]["2"][2]
+ res = client.ts().mget(["Taste=That"])
+ assert 25 == res[0]["2"][2]
+
+ # test with_labels
+ assert {} == res[0]["2"][0]
+ res = client.ts().mget(["Taste=That"], with_labels=True)
+ assert {"Taste": "That", "Test": "This"} == res[0]["2"][0]
+
+
+@pytest.mark.redismod
+def testInfo(client):
+ client.ts().create(
+ 1,
+ retention_msecs=5,
+ labels={"currentLabel": "currentData"}
+ )
+ info = client.ts().info(1)
+ assert 5 == info.retention_msecs
+ assert info.labels["currentLabel"] == "currentData"
+
+
+@pytest.mark.redismod
+@skip_ifmodversion_lt("1.4.0", "timeseries")
+def testInfoDuplicatePolicy(client):
+ client.ts().create(
+ 1,
+ retention_msecs=5,
+ labels={"currentLabel": "currentData"}
+ )
+ info = client.ts().info(1)
+ assert info.duplicate_policy is None
+
+ client.ts().create("time-serie-2", duplicate_policy="min")
+ info = client.ts().info("time-serie-2")
+ assert "min" == info.duplicate_policy
+
+
+@pytest.mark.redismod
+def testQueryIndex(client):
+ client.ts().create(1, labels={"Test": "This"})
+ client.ts().create(2, labels={"Test": "This", "Taste": "That"})
+ assert 2 == len(client.ts().queryindex(["Test=This"]))
+ assert 1 == len(client.ts().queryindex(["Taste=That"]))
+ assert [2] == client.ts().queryindex(["Taste=That"])
+
+
+#
+# @pytest.mark.redismod
+# @pytest.mark.pipeline
+# def testPipeline(client):
+# pipeline = client.ts().pipeline()
+# pipeline.create("with_pipeline")
+# for i in range(100):
+# pipeline.add("with_pipeline", i, 1.1 * i)
+# pipeline.execute()
+
+# info = client.ts().info("with_pipeline")
+# assert info.last_time_stamp == 99
+# assert info.total_samples == 100
+# assert client.ts().get("with_pipeline")[1] == 99 * 1.1
+
+
+@pytest.mark.redismod
+def testUncompressed(client):
+ client.ts().create("compressed")
+ client.ts().create("uncompressed", uncompressed=True)
+ compressed_info = client.ts().info("compressed")
+ uncompressed_info = client.ts().info("uncompressed")
+ assert compressed_info.memory_usage != uncompressed_info.memory_usage