author     Chayim <chayim@users.noreply.github.com>   2021-10-28 09:57:03 +0300
committer  GitHub <noreply@github.com>                2021-10-28 09:57:03 +0300
commit     eaa56b7d721182541bab087a6d61304c778f7ea9 (patch)
tree       9ff017a9ef6f4fc9da152951a9939ef5297f8969 /redis
parent     1f11f8c4ecbc2227c28077b9e9764e543c38d0a5 (diff)
download   redis-py-eaa56b7d721182541bab087a6d61304c778f7ea9.tar.gz
redis timeseries support (#1652)
Diffstat (limited to 'redis')
-rw-r--r--  redis/commands/helpers.py                14
-rw-r--r--  redis/commands/json/__init__.py           4
-rw-r--r--  redis/commands/redismodules.py           16
-rw-r--r--  redis/commands/timeseries/__init__.py    61
-rw-r--r--  redis/commands/timeseries/commands.py   775
-rw-r--r--  redis/commands/timeseries/info.py        82
-rw-r--r--  redis/commands/timeseries/utils.py       49
7 files changed, 996 insertions(+), 5 deletions(-)
diff --git a/redis/commands/helpers.py b/redis/commands/helpers.py
index b012621..a92c025 100644
--- a/redis/commands/helpers.py
+++ b/redis/commands/helpers.py
@@ -23,3 +23,17 @@ def nativestr(x):
def delist(x):
"""Given a list of binaries, return the stringified version."""
return [nativestr(obj) for obj in x]
+
+
+def parse_to_list(response):
+    """Optimistically parse the response to a list."""
+ res = []
+ for item in response:
+ try:
+ res.append(int(item))
+ except ValueError:
+ res.append(nativestr(item))
+ except TypeError:
+ res.append(None)
+ return res
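
A minimal sketch of how the new helper behaves; the reply values below are made up for illustration. Integers are kept as ints, byte strings are decoded via nativestr, and anything that cannot be converted at all (the TypeError branch) becomes None:

from redis.commands.helpers import parse_to_list

# Hypothetical raw module reply mixing an integer, a byte string and a nil.
raw = [b"42", b"sensor:temperature", None]
print(parse_to_list(raw))  # [42, 'sensor:temperature', None]
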
diff --git a/redis/commands/json/__init__.py b/redis/commands/json/__init__.py
index 2e26de3..9783705 100644
--- a/redis/commands/json/__init__.py
+++ b/redis/commands/json/__init__.py
@@ -1,12 +1,8 @@
-# from typing import Optional
from json import JSONDecoder, JSONEncoder
-# # from redis.client import Redis
-
from .helpers import bulk_of_jsons
from ..helpers import nativestr, delist
from .commands import JSONCommands
-# from ..feature import AbstractFeature
class JSON(JSONCommands):
diff --git a/redis/commands/redismodules.py b/redis/commands/redismodules.py
index 3ecce29..457a69e 100644
--- a/redis/commands/redismodules.py
+++ b/redis/commands/redismodules.py
@@ -27,8 +27,22 @@ class RedisModuleCommands:
try:
modversion = self.loaded_modules['search']
except IndexError:
- raise ModuleError("rejson is not a loaded in the redis instance.")
+            raise ModuleError("search is not loaded in the redis instance.")
from .search import Search
s = Search(client=self, version=modversion, index_name=index_name)
return s
+
+ def ts(self, index_name="idx"):
+ """Access the timeseries namespace, providing support for
+ redis timeseries data.
+ """
+ try:
+ modversion = self.loaded_modules['timeseries']
+        except KeyError:
+            raise ModuleError("timeseries is not loaded in "
+                              "the redis instance.")
+
+ from .timeseries import TimeSeries
+ s = TimeSeries(client=self, version=modversion, index_name=index_name)
+ return s
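
A short usage sketch of the new ts() entry point, assuming a locally running Redis server with the RedisTimeSeries module loaded (otherwise ModuleError is raised, as above) and a client class that mixes in RedisModuleCommands, as redis-py's Redis does; the key name is invented:

import redis

r = redis.Redis(host="localhost", port=6379)
ts = r.ts()  # TimeSeries namespace bound to this client
ts.create("sensor:1:temp", retention_msecs=60000)
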
diff --git a/redis/commands/timeseries/__init__.py b/redis/commands/timeseries/__init__.py
new file mode 100644
index 0000000..db9c3a5
--- /dev/null
+++ b/redis/commands/timeseries/__init__.py
@@ -0,0 +1,61 @@
+from redis.client import bool_ok
+
+from .utils import (
+ parse_range,
+ parse_get,
+ parse_m_range,
+ parse_m_get,
+)
+from .info import TSInfo
+from ..helpers import parse_to_list
+from .commands import (
+ ALTER_CMD,
+ CREATE_CMD,
+ CREATERULE_CMD,
+ DELETERULE_CMD,
+ DEL_CMD,
+ GET_CMD,
+ INFO_CMD,
+ MGET_CMD,
+ MRANGE_CMD,
+ MREVRANGE_CMD,
+ QUERYINDEX_CMD,
+ RANGE_CMD,
+ REVRANGE_CMD,
+ TimeSeriesCommands,
+)
+
+
+class TimeSeries(TimeSeriesCommands):
+ """
+    This class wraps a redis-py `Redis` client and implements RedisTimeSeries
+    commands (prefixed with "TS.").
+    The client allows you to interact with RedisTimeSeries and use all of its
+    functionality.
+ """
+
+ def __init__(self, client=None, version=None, **kwargs):
+ """Create a new RedisTimeSeries client."""
+ # Set the module commands' callbacks
+ MODULE_CALLBACKS = {
+ CREATE_CMD: bool_ok,
+ ALTER_CMD: bool_ok,
+ CREATERULE_CMD: bool_ok,
+ DEL_CMD: int,
+ DELETERULE_CMD: bool_ok,
+ RANGE_CMD: parse_range,
+ REVRANGE_CMD: parse_range,
+ MRANGE_CMD: parse_m_range,
+ MREVRANGE_CMD: parse_m_range,
+ GET_CMD: parse_get,
+ MGET_CMD: parse_m_get,
+ INFO_CMD: TSInfo,
+ QUERYINDEX_CMD: parse_to_list,
+ }
+
+ self.client = client
+ self.execute_command = client.execute_command
+ self.MODULE_VERSION = version
+
+ for k in MODULE_CALLBACKS:
+ self.client.set_response_callback(k, MODULE_CALLBACKS[k])
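
Because the constructor installs these callbacks on the wrapped client, replies arrive already parsed. A minimal sketch of wiring the class up by hand (normally the ts() helper in redismodules.py does this); the version number is hypothetical and would normally come from the client's loaded_modules mapping:

import redis
from redis.commands.timeseries import TimeSeries

r = redis.Redis()
ts = TimeSeries(client=r, version=10400)  # version shown only for illustration

ts.create("sensor:2:temp")  # bool_ok callback    -> True
ts.get("sensor:2:temp")     # parse_get callback  -> (timestamp, value), or None if empty
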
diff --git a/redis/commands/timeseries/commands.py b/redis/commands/timeseries/commands.py
new file mode 100644
index 0000000..3b9ee0f
--- /dev/null
+++ b/redis/commands/timeseries/commands.py
@@ -0,0 +1,775 @@
+from redis.exceptions import DataError
+
+
+ADD_CMD = "TS.ADD"
+ALTER_CMD = "TS.ALTER"
+CREATERULE_CMD = "TS.CREATERULE"
+CREATE_CMD = "TS.CREATE"
+DECRBY_CMD = "TS.DECRBY"
+DELETERULE_CMD = "TS.DELETERULE"
+DEL_CMD = "TS.DEL"
+GET_CMD = "TS.GET"
+INCRBY_CMD = "TS.INCRBY"
+INFO_CMD = "TS.INFO"
+MADD_CMD = "TS.MADD"
+MGET_CMD = "TS.MGET"
+MRANGE_CMD = "TS.MRANGE"
+MREVRANGE_CMD = "TS.MREVRANGE"
+QUERYINDEX_CMD = "TS.QUERYINDEX"
+RANGE_CMD = "TS.RANGE"
+REVRANGE_CMD = "TS.REVRANGE"
+
+
+class TimeSeriesCommands:
+ """RedisTimeSeries Commands."""
+
+ def create(self, key, **kwargs):
+ """
+ Create a new time-series.
+ For more information see
+ `TS.CREATE <https://oss.redis.com/redistimeseries/master/commands/#tscreate>`_. # noqa
+
+ Args:
+
+ key:
+ time-series key
+ retention_msecs:
+ Maximum age for samples compared to last event time (in milliseconds).
+ If None or 0 is passed then the series is not trimmed at all.
+ uncompressed:
+ Since RedisTimeSeries v1.2, both timestamps and values are
+ compressed by default.
+ Adding this flag will keep data in an uncompressed form.
+            Compression not only saves memory but usually improves
+            performance due to a lower number of memory accesses.
+ labels:
+ Set of label-value pairs that represent metadata labels of the key.
+ chunk_size:
+            Each time-series uses chunks of memory of a fixed size for
+            time-series samples.
+ You can alter the default TSDB chunk size by passing the
+ chunk_size argument (in Bytes).
+ duplicate_policy:
+ Since RedisTimeSeries v1.4 you can specify the duplicate sample policy
+            (configure what to do on duplicate samples).
+ Can be one of:
+ - 'block': an error will occur for any out of order sample.
+ - 'first': ignore the new value.
+ - 'last': override with latest value.
+ - 'min': only override if the value is lower than the existing value.
+ - 'max': only override if the value is higher than the existing value.
+ When this is not set, the server-wide default will be used.
+ """
+ retention_msecs = kwargs.get("retention_msecs", None)
+ uncompressed = kwargs.get("uncompressed", False)
+ labels = kwargs.get("labels", {})
+ chunk_size = kwargs.get("chunk_size", None)
+ duplicate_policy = kwargs.get("duplicate_policy", None)
+ params = [key]
+ self._appendRetention(params, retention_msecs)
+ self._appendUncompressed(params, uncompressed)
+ self._appendChunkSize(params, chunk_size)
+ self._appendDuplicatePolicy(params, CREATE_CMD, duplicate_policy)
+ self._appendLabels(params, labels)
+
+ return self.execute_command(CREATE_CMD, *params)
+
+ def alter(self, key, **kwargs):
+ """
+        Update the retention, labels, and duplicate policy of an existing key.
+ For more information see
+ `TS.ALTER <https://oss.redis.com/redistimeseries/master/commands/#tsalter>`_. # noqa
+
+ The parameters are the same as TS.CREATE.
+ """
+ retention_msecs = kwargs.get("retention_msecs", None)
+ labels = kwargs.get("labels", {})
+ duplicate_policy = kwargs.get("duplicate_policy", None)
+ params = [key]
+ self._appendRetention(params, retention_msecs)
+ self._appendDuplicatePolicy(params, ALTER_CMD, duplicate_policy)
+ self._appendLabels(params, labels)
+
+ return self.execute_command(ALTER_CMD, *params)
+
+ def add(self, key, timestamp, value, **kwargs):
+ """
+ Append (or create and append) a new sample to the series.
+ For more information see
+ `TS.ADD <https://oss.redis.com/redistimeseries/master/commands/#tsadd>`_. # noqa
+
+ Args:
+
+ key:
+ time-series key
+ timestamp:
+ Timestamp of the sample. * can be used for automatic timestamp (using the system clock).
+ value:
+ Numeric data value of the sample
+ retention_msecs:
+ Maximum age for samples compared to last event time (in milliseconds).
+ If None or 0 is passed then the series is not trimmed at all.
+ uncompressed:
+ Since RedisTimeSeries v1.2, both timestamps and values are compressed by default.
+ Adding this flag will keep data in an uncompressed form. Compression not only saves
+            memory but usually improves performance due to a lower number of memory accesses.
+ labels:
+ Set of label-value pairs that represent metadata labels of the key.
+ chunk_size:
+            Each time-series uses chunks of memory of a fixed size for time-series samples.
+ You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes).
+ duplicate_policy:
+ Since RedisTimeSeries v1.4 you can specify the duplicate sample policy
+ (Configure what to do on duplicate sample).
+ Can be one of:
+ - 'block': an error will occur for any out of order sample.
+ - 'first': ignore the new value.
+ - 'last': override with latest value.
+ - 'min': only override if the value is lower than the existing value.
+ - 'max': only override if the value is higher than the existing value.
+ When this is not set, the server-wide default will be used.
+ """
+ retention_msecs = kwargs.get("retention_msecs", None)
+ uncompressed = kwargs.get("uncompressed", False)
+ labels = kwargs.get("labels", {})
+ chunk_size = kwargs.get("chunk_size", None)
+ duplicate_policy = kwargs.get("duplicate_policy", None)
+ params = [key, timestamp, value]
+ self._appendRetention(params, retention_msecs)
+ self._appendUncompressed(params, uncompressed)
+ self._appendChunkSize(params, chunk_size)
+ self._appendDuplicatePolicy(params, ADD_CMD, duplicate_policy)
+ self._appendLabels(params, labels)
+
+ return self.execute_command(ADD_CMD, *params)
+
+ def madd(self, ktv_tuples):
+ """
+ Append (or create and append) a new `value` to series
+ `key` with `timestamp`.
+        Expects a list of `tuples` as (`key`, `timestamp`, `value`).
+ Return value is an array with timestamps of insertions.
+ For more information see
+ `TS.MADD <https://oss.redis.com/redistimeseries/master/commands/#tsmadd>`_. # noqa
+ """
+ params = []
+ for ktv in ktv_tuples:
+ for item in ktv:
+ params.append(item)
+
+ return self.execute_command(MADD_CMD, *params)
+
+ def incrby(self, key, value, **kwargs):
+ """
+        Increment (or create a time-series and increment) the latest
+        sample of a series.
+ This command can be used as a counter or gauge that automatically gets
+ history as a time series.
+ For more information see
+ `TS.INCRBY <https://oss.redis.com/redistimeseries/master/commands/#tsincrbytsdecrby>`_. # noqa
+
+ Args:
+
+ key:
+ time-series key
+ value:
+ Numeric data value of the sample
+ timestamp:
+ Timestamp of the sample. None can be used for automatic timestamp (using the system clock).
+ retention_msecs:
+ Maximum age for samples compared to last event time (in milliseconds).
+ If None or 0 is passed then the series is not trimmed at all.
+ uncompressed:
+ Since RedisTimeSeries v1.2, both timestamps and values are compressed by default.
+ Adding this flag will keep data in an uncompressed form. Compression not only saves
+            memory but usually improves performance due to a lower number of memory accesses.
+ labels:
+ Set of label-value pairs that represent metadata labels of the key.
+ chunk_size:
+            Each time-series uses chunks of memory of a fixed size for time-series samples.
+ You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes).
+ """
+ timestamp = kwargs.get("timestamp", None)
+ retention_msecs = kwargs.get("retention_msecs", None)
+ uncompressed = kwargs.get("uncompressed", False)
+ labels = kwargs.get("labels", {})
+ chunk_size = kwargs.get("chunk_size", None)
+ params = [key, value]
+ self._appendTimestamp(params, timestamp)
+ self._appendRetention(params, retention_msecs)
+ self._appendUncompressed(params, uncompressed)
+ self._appendChunkSize(params, chunk_size)
+ self._appendLabels(params, labels)
+
+ return self.execute_command(INCRBY_CMD, *params)
+
+ def decrby(self, key, value, **kwargs):
+ """
+        Decrement (or create a time-series and decrement) the
+        latest sample of a series.
+ This command can be used as a counter or gauge that
+ automatically gets history as a time series.
+ For more information see
+ `TS.DECRBY <https://oss.redis.com/redistimeseries/master/commands/#tsincrbytsdecrby>`_. # noqa
+
+ Args:
+
+ key:
+ time-series key
+ value:
+ Numeric data value of the sample
+ timestamp:
+ Timestamp of the sample. None can be used for automatic
+ timestamp (using the system clock).
+ retention_msecs:
+ Maximum age for samples compared to last event time (in milliseconds).
+ If None or 0 is passed then the series is not trimmed at all.
+ uncompressed:
+ Since RedisTimeSeries v1.2, both timestamps and values are
+ compressed by default.
+ Adding this flag will keep data in an uncompressed form.
+            Compression not only saves memory but usually improves
+            performance due to a lower number of memory accesses.
+ labels:
+ Set of label-value pairs that represent metadata labels of the key.
+ chunk_size:
+            Each time-series uses chunks of memory of a fixed size for time-series samples.
+ You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes).
+ """
+ timestamp = kwargs.get("timestamp", None)
+ retention_msecs = kwargs.get("retention_msecs", None)
+ uncompressed = kwargs.get("uncompressed", False)
+ labels = kwargs.get("labels", {})
+ chunk_size = kwargs.get("chunk_size", None)
+ params = [key, value]
+ self._appendTimestamp(params, timestamp)
+ self._appendRetention(params, retention_msecs)
+ self._appendUncompressed(params, uncompressed)
+ self._appendChunkSize(params, chunk_size)
+ self._appendLabels(params, labels)
+
+ return self.execute_command(DECRBY_CMD, *params)
+
+ def delete(self, key, from_time, to_time):
+ """
+        Delete data in the range of start and end timestamps for a given
+        time-series.
+        The given timestamp interval is closed (inclusive), meaning samples
+        at the start and end timestamps will also be deleted.
+        Return the number of samples deleted.
+ For more information see
+ `TS.DEL <https://oss.redis.com/redistimeseries/master/commands/#tsdel>`_. # noqa
+
+ Args:
+
+ key:
+ time-series key.
+ from_time:
+ Start timestamp for the range deletion.
+ to_time:
+ End timestamp for the range deletion.
+ """
+ return self.execute_command(DEL_CMD, key, from_time, to_time)
+
+ def createrule(
+ self,
+ source_key,
+ dest_key,
+ aggregation_type,
+ bucket_size_msec
+ ):
+ """
+        Create a compaction rule from values added to `source_key` into `dest_key`,
+        aggregating over buckets of `bucket_size_msec` milliseconds, where
+        `aggregation_type` can be one of [`avg`, `sum`, `min`, `max`, `range`,
+        `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`].
+ For more information see
+ `TS.CREATERULE <https://oss.redis.com/redistimeseries/master/commands/#tscreaterule>`_. # noqa
+ """
+ params = [source_key, dest_key]
+ self._appendAggregation(params, aggregation_type, bucket_size_msec)
+
+ return self.execute_command(CREATERULE_CMD, *params)
+
+ def deleterule(self, source_key, dest_key):
+ """
+ Delete a compaction rule.
+ For more information see
+ `TS.DELETERULE <https://oss.redis.com/redistimeseries/master/commands/#tsdeleterule>`_. # noqa
+ """
+ return self.execute_command(DELETERULE_CMD, source_key, dest_key)
+
+ def __range_params(
+ self,
+ key,
+ from_time,
+ to_time,
+ count,
+ aggregation_type,
+ bucket_size_msec,
+ filter_by_ts,
+ filter_by_min_value,
+ filter_by_max_value,
+ align,
+ ):
+ """Create TS.RANGE and TS.REVRANGE arguments."""
+ params = [key, from_time, to_time]
+ self._appendFilerByTs(params, filter_by_ts)
+ self._appendFilerByValue(
+ params,
+ filter_by_min_value,
+ filter_by_max_value
+ )
+ self._appendCount(params, count)
+ self._appendAlign(params, align)
+ self._appendAggregation(params, aggregation_type, bucket_size_msec)
+
+ return params
+
+ def range(
+ self,
+ key,
+ from_time,
+ to_time,
+ count=None,
+ aggregation_type=None,
+ bucket_size_msec=0,
+ filter_by_ts=None,
+ filter_by_min_value=None,
+ filter_by_max_value=None,
+ align=None,
+ ):
+ """
+        Query a range in forward direction for a specific time-series.
+ For more information see
+ `TS.RANGE <https://oss.redis.com/redistimeseries/master/commands/#tsrangetsrevrange>`_. # noqa
+
+ Args:
+
+ key:
+ Key name for timeseries.
+ from_time:
+            Start timestamp for the range query. `-` can be used to express
+            the minimum possible timestamp (0).
+        to_time:
+            End timestamp for the range query. `+` can be used to express the
+            maximum possible timestamp.
+ count:
+ Optional maximum number of returned results.
+ aggregation_type:
+ Optional aggregation type. Can be one of
+ [`avg`, `sum`, `min`, `max`, `range`, `count`,
+ `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`]
+ bucket_size_msec:
+ Time bucket for aggregation in milliseconds.
+ filter_by_ts:
+ List of timestamps to filter the result by specific timestamps.
+ filter_by_min_value:
+            Filter result by minimum value (must mention also
+            filter_by_max_value).
+        filter_by_max_value:
+            Filter result by maximum value (must mention also
+            filter_by_min_value).
+ align:
+ Timestamp for alignment control for aggregation.
+ """
+ params = self.__range_params(
+ key,
+ from_time,
+ to_time,
+ count,
+ aggregation_type,
+ bucket_size_msec,
+ filter_by_ts,
+ filter_by_min_value,
+ filter_by_max_value,
+ align,
+ )
+ return self.execute_command(RANGE_CMD, *params)
+
+ def revrange(
+ self,
+ key,
+ from_time,
+ to_time,
+ count=None,
+ aggregation_type=None,
+ bucket_size_msec=0,
+ filter_by_ts=None,
+ filter_by_min_value=None,
+ filter_by_max_value=None,
+ align=None,
+ ):
+ """
+ Query a range in reverse direction for a specific time-series.
+ For more information see
+ `TS.REVRANGE <https://oss.redis.com/redistimeseries/master/commands/#tsrangetsrevrange>`_. # noqa
+
+ **Note**: This command is only available since RedisTimeSeries >= v1.4
+
+ Args:
+
+ key:
+ Key name for timeseries.
+ from_time:
+            Start timestamp for the range query. `-` can be used to express the minimum possible timestamp (0).
+        to_time:
+            End timestamp for the range query. `+` can be used to express the maximum possible timestamp.
+ count:
+ Optional maximum number of returned results.
+ aggregation_type:
+ Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, `range`, `count`,
+ `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`]
+ bucket_size_msec:
+ Time bucket for aggregation in milliseconds.
+ filter_by_ts:
+ List of timestamps to filter the result by specific timestamps.
+ filter_by_min_value:
+ Filter result by minimum value (must mention also filter_by_max_value).
+ filter_by_max_value:
+ Filter result by maximum value (must mention also filter_by_min_value).
+ align:
+ Timestamp for alignment control for aggregation.
+ """
+ params = self.__range_params(
+ key,
+ from_time,
+ to_time,
+ count,
+ aggregation_type,
+ bucket_size_msec,
+ filter_by_ts,
+ filter_by_min_value,
+ filter_by_max_value,
+ align,
+ )
+ return self.execute_command(REVRANGE_CMD, *params)
+
+ def __mrange_params(
+ self,
+ aggregation_type,
+ bucket_size_msec,
+ count,
+ filters,
+ from_time,
+ to_time,
+ with_labels,
+ filter_by_ts,
+ filter_by_min_value,
+ filter_by_max_value,
+ groupby,
+ reduce,
+ select_labels,
+ align,
+ ):
+ """Create TS.MRANGE and TS.MREVRANGE arguments."""
+ params = [from_time, to_time]
+ self._appendFilerByTs(params, filter_by_ts)
+ self._appendFilerByValue(
+ params,
+ filter_by_min_value,
+ filter_by_max_value
+ )
+ self._appendCount(params, count)
+ self._appendAlign(params, align)
+ self._appendAggregation(params, aggregation_type, bucket_size_msec)
+ self._appendWithLabels(params, with_labels, select_labels)
+ params.extend(["FILTER"])
+ params += filters
+ self._appendGroupbyReduce(params, groupby, reduce)
+ return params
+
+ def mrange(
+ self,
+ from_time,
+ to_time,
+ filters,
+ count=None,
+ aggregation_type=None,
+ bucket_size_msec=0,
+ with_labels=False,
+ filter_by_ts=None,
+ filter_by_min_value=None,
+ filter_by_max_value=None,
+ groupby=None,
+ reduce=None,
+ select_labels=None,
+ align=None,
+ ):
+ """
+ Query a range across multiple time-series by filters in forward direction.
+ For more information see
+ `TS.MRANGE <https://oss.redis.com/redistimeseries/master/commands/#tsmrangetsmrevrange>`_. # noqa
+
+ Args:
+
+ from_time:
+ Start timestamp for the range query. `-` can be used to
+ express the minimum possible timestamp (0).
+ to_time:
+ End timestamp for range query, `+` can be used to express
+ the maximum possible timestamp.
+ filters:
+            Filter to match the time-series labels.
+ count:
+ Optional maximum number of returned results.
+ aggregation_type:
+ Optional aggregation type. Can be one of
+ [`avg`, `sum`, `min`, `max`, `range`, `count`,
+ `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`]
+ bucket_size_msec:
+ Time bucket for aggregation in milliseconds.
+ with_labels:
+ Include in the reply the label-value pairs that represent metadata
+ labels of the time-series.
+            If this argument is not set, by default an empty list will be
+            returned in the labels position of the reply.
+ filter_by_ts:
+ List of timestamps to filter the result by specific timestamps.
+ filter_by_min_value:
+ Filter result by minimum value (must mention also
+ filter_by_max_value).
+ filter_by_max_value:
+ Filter result by maximum value (must mention also
+ filter_by_min_value).
+ groupby:
+            Group the results by this label (must also specify reduce).
+        reduce:
+            Reducer function applied to each group. Can be one
+            of [`sum`, `min`, `max`].
+ select_labels:
+ Include in the reply only a subset of the key-value
+ pair labels of a series.
+ align:
+ Timestamp for alignment control for aggregation.
+ """
+ params = self.__mrange_params(
+ aggregation_type,
+ bucket_size_msec,
+ count,
+ filters,
+ from_time,
+ to_time,
+ with_labels,
+ filter_by_ts,
+ filter_by_min_value,
+ filter_by_max_value,
+ groupby,
+ reduce,
+ select_labels,
+ align,
+ )
+
+ return self.execute_command(MRANGE_CMD, *params)
+
+ def mrevrange(
+ self,
+ from_time,
+ to_time,
+ filters,
+ count=None,
+ aggregation_type=None,
+ bucket_size_msec=0,
+ with_labels=False,
+ filter_by_ts=None,
+ filter_by_min_value=None,
+ filter_by_max_value=None,
+ groupby=None,
+ reduce=None,
+ select_labels=None,
+ align=None,
+ ):
+ """
+ Query a range across multiple time-series by filters in reverse direction.
+ For more information see
+ `TS.MREVRANGE <https://oss.redis.com/redistimeseries/master/commands/#tsmrangetsmrevrange>`_. # noqa
+
+ Args:
+
+ from_time:
+            Start timestamp for the range query. `-` can be used to express
+            the minimum possible timestamp (0).
+        to_time:
+            End timestamp for the range query. `+` can be used to express
+            the maximum possible timestamp.
+ filters:
+ Filter to match the time-series labels.
+ count:
+ Optional maximum number of returned results.
+ aggregation_type:
+ Optional aggregation type. Can be one of
+ [`avg`, `sum`, `min`, `max`, `range`, `count`,
+ `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`]
+ bucket_size_msec:
+ Time bucket for aggregation in milliseconds.
+ with_labels:
+            Include in the reply the label-value pairs that represent
+            metadata labels of the time-series.
+            If this argument is not set, by default an empty list will be
+            returned in the labels position of the reply.
+ filter_by_ts:
+ List of timestamps to filter the result by specific timestamps.
+ filter_by_min_value:
+            Filter result by minimum value (must mention also
+            filter_by_max_value).
+        filter_by_max_value:
+            Filter result by maximum value (must mention also
+            filter_by_min_value).
+ groupby:
+            Group the results by this label (must also specify reduce).
+        reduce:
+            Reducer function applied to each group. Can be one
+            of [`sum`, `min`, `max`].
+ select_labels:
+ Include in the reply only a subset of the key-value pair
+ labels of a series.
+ align:
+ Timestamp for alignment control for aggregation.
+ """
+ params = self.__mrange_params(
+ aggregation_type,
+ bucket_size_msec,
+ count,
+ filters,
+ from_time,
+ to_time,
+ with_labels,
+ filter_by_ts,
+ filter_by_min_value,
+ filter_by_max_value,
+ groupby,
+ reduce,
+ select_labels,
+ align,
+ )
+
+ return self.execute_command(MREVRANGE_CMD, *params)
+
+ def get(self, key):
+ """ # noqa
+ Get the last sample of `key`.
+ For more information see `TS.GET <https://oss.redis.com/redistimeseries/master/commands/#tsget>`_.
+ """
+ return self.execute_command(GET_CMD, key)
+
+ def mget(self, filters, with_labels=False):
+ """ # noqa
+ Get the last samples matching the specific `filter`.
+ For more information see `TS.MGET <https://oss.redis.com/redistimeseries/master/commands/#tsmget>`_.
+ """
+ params = []
+ self._appendWithLabels(params, with_labels)
+ params.extend(["FILTER"])
+ params += filters
+ return self.execute_command(MGET_CMD, *params)
+
+ def info(self, key):
+ """ # noqa
+ Get information of `key`.
+ For more information see `TS.INFO <https://oss.redis.com/redistimeseries/master/commands/#tsinfo>`_.
+ """
+ return self.execute_command(INFO_CMD, key)
+
+ def queryindex(self, filters):
+ """ # noqa
+ Get all the keys matching the `filter` list.
+ For more information see `TS.QUERYINDEX <https://oss.redis.com/redistimeseries/master/commands/#tsqueryindex>`_.
+ """
+ return self.execute_command(QUERYINDEX_CMD, *filters)
+
+ @staticmethod
+ def _appendUncompressed(params, uncompressed):
+ """Append UNCOMPRESSED tag to params."""
+ if uncompressed:
+ params.extend(["UNCOMPRESSED"])
+
+ @staticmethod
+ def _appendWithLabels(params, with_labels, select_labels=None):
+ """Append labels behavior to params."""
+ if with_labels and select_labels:
+ raise DataError(
+ "with_labels and select_labels cannot be provided together."
+ )
+
+ if with_labels:
+ params.extend(["WITHLABELS"])
+ if select_labels:
+ params.extend(["SELECTED_LABELS", *select_labels])
+
+ @staticmethod
+ def _appendGroupbyReduce(params, groupby, reduce):
+ """Append GROUPBY REDUCE property to params."""
+ if groupby is not None and reduce is not None:
+ params.extend(["GROUPBY", groupby, "REDUCE", reduce.upper()])
+
+ @staticmethod
+ def _appendRetention(params, retention):
+ """Append RETENTION property to params."""
+ if retention is not None:
+ params.extend(["RETENTION", retention])
+
+ @staticmethod
+ def _appendLabels(params, labels):
+ """Append LABELS property to params."""
+ if labels:
+ params.append("LABELS")
+ for k, v in labels.items():
+ params.extend([k, v])
+
+ @staticmethod
+ def _appendCount(params, count):
+ """Append COUNT property to params."""
+ if count is not None:
+ params.extend(["COUNT", count])
+
+ @staticmethod
+ def _appendTimestamp(params, timestamp):
+ """Append TIMESTAMP property to params."""
+ if timestamp is not None:
+ params.extend(["TIMESTAMP", timestamp])
+
+ @staticmethod
+ def _appendAlign(params, align):
+ """Append ALIGN property to params."""
+ if align is not None:
+ params.extend(["ALIGN", align])
+
+ @staticmethod
+ def _appendAggregation(params, aggregation_type, bucket_size_msec):
+ """Append AGGREGATION property to params."""
+ if aggregation_type is not None:
+ params.append("AGGREGATION")
+ params.extend([aggregation_type, bucket_size_msec])
+
+ @staticmethod
+ def _appendChunkSize(params, chunk_size):
+ """Append CHUNK_SIZE property to params."""
+ if chunk_size is not None:
+ params.extend(["CHUNK_SIZE", chunk_size])
+
+ @staticmethod
+ def _appendDuplicatePolicy(params, command, duplicate_policy):
+ """Append DUPLICATE_POLICY property to params on CREATE
+ and ON_DUPLICATE on ADD.
+ """
+ if duplicate_policy is not None:
+ if command == "TS.ADD":
+ params.extend(["ON_DUPLICATE", duplicate_policy])
+ else:
+ params.extend(["DUPLICATE_POLICY", duplicate_policy])
+
+ @staticmethod
+ def _appendFilerByTs(params, ts_list):
+ """Append FILTER_BY_TS property to params."""
+ if ts_list is not None:
+ params.extend(["FILTER_BY_TS", *ts_list])
+
+ @staticmethod
+ def _appendFilerByValue(params, min_value, max_value):
+ """Append FILTER_BY_VALUE property to params."""
+ if min_value is not None and max_value is not None:
+ params.extend(["FILTER_BY_VALUE", min_value, max_value])
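
Taken together, a rough end-to-end sketch of the command surface added in this file, assuming a server with RedisTimeSeries loaded; key names, labels, timestamps and values are invented for illustration:

import redis

r = redis.Redis()
ts = r.ts()

# Compressed series with a one-hour retention and metadata labels.
ts.create("sensor:3:temp", retention_msecs=3600000,
          labels={"room": "kitchen", "unit": "celsius"})

# Append samples with explicit millisecond timestamps
# ("*" could be used instead for a server-assigned timestamp).
ts.add("sensor:3:temp", 1635400000000, 21.5)
ts.madd([("sensor:3:temp", 1635400060000, 21.7),
         ("sensor:3:temp", 1635400120000, 21.9)])

# Counter-style series driven by incrby/decrby.
ts.incrby("sensor:3:hits", 1)

# Compaction rule: downsample into 5-minute averages.
ts.create("sensor:3:temp:avg")
ts.createrule("sensor:3:temp", "sensor:3:temp:avg", "avg", 300000)

# Range over the whole series ("-" to "+") with per-minute averages.
ts.range("sensor:3:temp", "-", "+",
         aggregation_type="avg", bucket_size_msec=60000)

# Cross-series queries by label filter.
ts.mrange("-", "+", filters=["unit=celsius"], with_labels=True)
ts.mrange("-", "+", filters=["unit=celsius"], groupby="room", reduce="max")

ts.get("sensor:3:temp")          # last sample of one series
ts.queryindex(["room=kitchen"])  # keys matching a filter
ts.delete("sensor:3:temp", 1635400000000, 1635400060000)  # inclusive range delete
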
diff --git a/redis/commands/timeseries/info.py b/redis/commands/timeseries/info.py
new file mode 100644
index 0000000..3b89503
--- /dev/null
+++ b/redis/commands/timeseries/info.py
@@ -0,0 +1,82 @@
+from .utils import list_to_dict
+from ..helpers import nativestr
+
+
+class TSInfo(object):
+ """
+ Hold information and statistics on the time-series.
+    Can be created using the ``info`` method, which wraps the ``TS.INFO`` command
+    https://oss.redis.com/redistimeseries/commands/#tsinfo.
+ """
+
+ rules = []
+ labels = []
+ sourceKey = None
+ chunk_count = None
+ memory_usage = None
+ total_samples = None
+ retention_msecs = None
+ last_time_stamp = None
+ first_time_stamp = None
+
+ max_samples_per_chunk = None
+ chunk_size = None
+ duplicate_policy = None
+
+ def __init__(self, args):
+ """
+ Hold information and statistics on the time-series.
+
+ The supported params that can be passed as args:
+
+ rules:
+ A list of compaction rules of the time series.
+ sourceKey:
+ Key name for source time series in case the current series
+ is a target of a rule.
+ chunkCount:
+ Number of Memory Chunks used for the time series.
+ memoryUsage:
+ Total number of bytes allocated for the time series.
+ totalSamples:
+ Total number of samples in the time series.
+ labels:
+ A list of label-value pairs that represent the metadata
+ labels of the time series.
+ retentionTime:
+ Retention time, in milliseconds, for the time series.
+ lastTimestamp:
+ Last timestamp present in the time series.
+ firstTimestamp:
+ First timestamp present in the time series.
+ maxSamplesPerChunk:
+ Deprecated.
+ chunkSize:
+ Amount of memory, in bytes, allocated for data.
+ duplicatePolicy:
+ Policy that will define handling of duplicate samples.
+
+            You can read more about it at
+ https://oss.redis.com/redistimeseries/configuration/#duplicate_policy
+ """
+ response = dict(zip(map(nativestr, args[::2]), args[1::2]))
+ self.rules = response["rules"]
+ self.source_key = response["sourceKey"]
+ self.chunk_count = response["chunkCount"]
+ self.memory_usage = response["memoryUsage"]
+ self.total_samples = response["totalSamples"]
+ self.labels = list_to_dict(response["labels"])
+ self.retention_msecs = response["retentionTime"]
+        self.last_time_stamp = response["lastTimestamp"]
+ self.first_time_stamp = response["firstTimestamp"]
+ if "maxSamplesPerChunk" in response:
+ self.max_samples_per_chunk = response["maxSamplesPerChunk"]
+ self.chunk_size = (
+ self.max_samples_per_chunk * 16
+ ) # backward compatible changes
+ if "chunkSize" in response:
+ self.chunk_size = response["chunkSize"]
+ if "duplicatePolicy" in response:
+ self.duplicate_policy = response["duplicatePolicy"]
+            if isinstance(self.duplicate_policy, bytes):
+ self.duplicate_policy = self.duplicate_policy.decode()
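
A brief sketch of reading the parsed object, continuing the keys from the earlier sketches; the attribute names follow the fields assigned in __init__ above:

info = ts.info("sensor:3:temp")
print(info.total_samples)     # number of samples in the series
print(info.retention_msecs)   # retention window in milliseconds
print(info.labels)            # {'room': 'kitchen', 'unit': 'celsius'}
print(info.duplicate_policy)  # e.g. None when the server-wide default applies
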
diff --git a/redis/commands/timeseries/utils.py b/redis/commands/timeseries/utils.py
new file mode 100644
index 0000000..c33b7c5
--- /dev/null
+++ b/redis/commands/timeseries/utils.py
@@ -0,0 +1,49 @@
+from ..helpers import nativestr
+
+
+def list_to_dict(aList):
+ return {
+ nativestr(aList[i][0]): nativestr(aList[i][1])
+ for i in range(len(aList))}
+
+
+def parse_range(response):
+ """Parse range response. Used by TS.RANGE and TS.REVRANGE."""
+ return [tuple((r[0], float(r[1]))) for r in response]
+
+
+def parse_m_range(response):
+ """Parse multi range response. Used by TS.MRANGE and TS.MREVRANGE."""
+ res = []
+ for item in response:
+ res.append(
+ {nativestr(item[0]):
+ [list_to_dict(item[1]), parse_range(item[2])]})
+ return sorted(res, key=lambda d: list(d.keys()))
+
+
+def parse_get(response):
+ """Parse get response. Used by TS.GET."""
+ if not response:
+ return None
+ return int(response[0]), float(response[1])
+
+
+def parse_m_get(response):
+ """Parse multi get response. Used by TS.MGET."""
+ res = []
+ for item in response:
+ if not item[2]:
+ res.append(
+ {nativestr(item[0]): [list_to_dict(item[1]), None, None]})
+ else:
+ res.append(
+ {
+ nativestr(item[0]): [
+ list_to_dict(item[1]),
+ int(item[2][0]),
+ float(item[2][1]),
+ ]
+ }
+ )
+ return sorted(res, key=lambda d: list(d.keys()))
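
These parsers only reshape raw replies. A minimal sketch of the shapes involved, using hand-written reply values purely for illustration:

from redis.commands.timeseries.utils import parse_range, parse_m_get

# Raw TS.RANGE reply: a list of [timestamp, value-as-bytes] pairs.
raw_range = [[1635400000000, b"21.5"], [1635400060000, b"21.7"]]
print(parse_range(raw_range))
# [(1635400000000, 21.5), (1635400060000, 21.7)]

# Raw TS.MGET reply: one [key, labels, [timestamp, value]] entry per series.
raw_mget = [[b"sensor:3:temp", [[b"room", b"kitchen"]], [1635400000000, b"21.5"]]]
print(parse_m_get(raw_mget))
# [{'sensor:3:temp': [{'room': 'kitchen'}, 1635400000000, 21.5]}]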