summaryrefslogtreecommitdiff
path: root/redis/commands/timeseries/commands.py
diff options
context:
space:
mode:
authordvora-h <67596500+dvora-h@users.noreply.github.com>2022-07-24 15:39:02 +0300
committerGitHub <noreply@github.com>2022-07-24 15:39:02 +0300
commit912acccb610b182fa69420e2c79bb5bc06d94703 (patch)
tree8a270f6f3f9217c5e140c16e374c3b35a0a32059 /redis/commands/timeseries/commands.py
parentae171d16e173c367256b1da42f66947fd3c6d1ea (diff)
downloadredis-py-912acccb610b182fa69420e2c79bb5bc06d94703.tar.gz
Add support for TIMESERIES 1.8 (#2296)
* Add support for timeseries 1.8 * fix info * linters * linters * fix info test * type hints * linters
Diffstat (limited to 'redis/commands/timeseries/commands.py')
-rw-r--r--redis/commands/timeseries/commands.py668
1 files changed, 398 insertions, 270 deletions
diff --git a/redis/commands/timeseries/commands.py b/redis/commands/timeseries/commands.py
index 3a30c24..13e3cdf 100644
--- a/redis/commands/timeseries/commands.py
+++ b/redis/commands/timeseries/commands.py
@@ -1,4 +1,7 @@
+from typing import Dict, List, Optional, Tuple, Union
+
from redis.exceptions import DataError
+from redis.typing import KeyT, Number
ADD_CMD = "TS.ADD"
ALTER_CMD = "TS.ALTER"
@@ -22,7 +25,15 @@ REVRANGE_CMD = "TS.REVRANGE"
class TimeSeriesCommands:
"""RedisTimeSeries Commands."""
- def create(self, key, **kwargs):
+ def create(
+ self,
+ key: KeyT,
+ retention_msecs: Optional[int] = None,
+ uncompressed: Optional[bool] = False,
+ labels: Optional[Dict[str, str]] = None,
+ chunk_size: Optional[int] = None,
+ duplicate_policy: Optional[str] = None,
+ ):
"""
Create a new time-series.
@@ -31,40 +42,26 @@ class TimeSeriesCommands:
key:
time-series key
retention_msecs:
- Maximum age for samples compared to last event time (in milliseconds).
+ Maximum age for samples compared to highest reported timestamp (in milliseconds).
If None or 0 is passed then the series is not trimmed at all.
uncompressed:
- Since RedisTimeSeries v1.2, both timestamps and values are
- compressed by default.
- Adding this flag will keep data in an uncompressed form.
- Compression not only saves
- memory but usually improve performance due to lower number
- of memory accesses.
+ Changes data storage from compressed (by default) to uncompressed
labels:
Set of label-value pairs that represent metadata labels of the key.
chunk_size:
- Each time-serie uses chunks of memory of fixed size for
- time series samples.
- You can alter the default TSDB chunk size by passing the
- chunk_size argument (in Bytes).
+ Memory size, in bytes, allocated for each data chunk.
+ Must be a multiple of 8 in the range [128 .. 1048576].
duplicate_policy:
- Since RedisTimeSeries v1.4 you can specify the duplicate sample policy
- ( Configure what to do on duplicate sample. )
+ Policy for handling multiple samples with identical timestamps.
Can be one of:
- 'block': an error will occur for any out of order sample.
- 'first': ignore the new value.
- 'last': override with latest value.
- 'min': only override if the value is lower than the existing value.
- 'max': only override if the value is higher than the existing value.
- When this is not set, the server-wide default will be used.
- For more information: https://oss.redis.com/redistimeseries/commands/#tscreate
+ For more information: https://redis.io/commands/ts.create/
""" # noqa
- retention_msecs = kwargs.get("retention_msecs", None)
- uncompressed = kwargs.get("uncompressed", False)
- labels = kwargs.get("labels", {})
- chunk_size = kwargs.get("chunk_size", None)
- duplicate_policy = kwargs.get("duplicate_policy", None)
params = [key]
self._append_retention(params, retention_msecs)
self._append_uncompressed(params, uncompressed)
@@ -74,29 +71,62 @@ class TimeSeriesCommands:
return self.execute_command(CREATE_CMD, *params)
- def alter(self, key, **kwargs):
+ def alter(
+ self,
+ key: KeyT,
+ retention_msecs: Optional[int] = None,
+ labels: Optional[Dict[str, str]] = None,
+ chunk_size: Optional[int] = None,
+ duplicate_policy: Optional[str] = None,
+ ):
"""
- Update the retention, labels of an existing key.
- For more information see
+ Update the retention, chunk size, duplicate policy, and labels of an existing
+ time series.
+
+ Args:
- The parameters are the same as TS.CREATE.
+ key:
+ time-series key
+ retention_msecs:
+ Maximum retention period, compared to maximal existing timestamp (in milliseconds).
+ If None or 0 is passed then the series is not trimmed at all.
+ labels:
+ Set of label-value pairs that represent metadata labels of the key.
+ chunk_size:
+ Memory size, in bytes, allocated for each data chunk.
+ Must be a multiple of 8 in the range [128 .. 1048576].
+ duplicate_policy:
+ Policy for handling multiple samples with identical timestamps.
+ Can be one of:
+ - 'block': an error will occur for any out of order sample.
+ - 'first': ignore the new value.
+ - 'last': override with latest value.
+ - 'min': only override if the value is lower than the existing value.
+ - 'max': only override if the value is higher than the existing value.
- For more information: https://oss.redis.com/redistimeseries/commands/#tsalter
+ For more information: https://redis.io/commands/ts.alter/
""" # noqa
- retention_msecs = kwargs.get("retention_msecs", None)
- labels = kwargs.get("labels", {})
- duplicate_policy = kwargs.get("duplicate_policy", None)
params = [key]
self._append_retention(params, retention_msecs)
+ self._append_chunk_size(params, chunk_size)
self._append_duplicate_policy(params, ALTER_CMD, duplicate_policy)
self._append_labels(params, labels)
return self.execute_command(ALTER_CMD, *params)
- def add(self, key, timestamp, value, **kwargs):
+ def add(
+ self,
+ key: KeyT,
+ timestamp: Union[int, str],
+ value: Number,
+ retention_msecs: Optional[int] = None,
+ uncompressed: Optional[bool] = False,
+ labels: Optional[Dict[str, str]] = None,
+ chunk_size: Optional[int] = None,
+ duplicate_policy: Optional[str] = None,
+ ):
"""
- Append (or create and append) a new sample to the series.
- For more information see
+ Append (or create and append) a new sample to a time series.
Args:
@@ -107,35 +137,26 @@ class TimeSeriesCommands:
value:
Numeric data value of the sample
retention_msecs:
- Maximum age for samples compared to last event time (in milliseconds).
+ Maximum retention period, compared to maximal existing timestamp (in milliseconds).
If None or 0 is passed then the series is not trimmed at all.
uncompressed:
- Since RedisTimeSeries v1.2, both timestamps and values are compressed by default.
- Adding this flag will keep data in an uncompressed form. Compression not only saves
- memory but usually improve performance due to lower number of memory accesses.
+ Changes data storage from compressed (by default) to uncompressed
labels:
Set of label-value pairs that represent metadata labels of the key.
chunk_size:
- Each time-serie uses chunks of memory of fixed size for time series samples.
- You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes).
+ Memory size, in bytes, allocated for each data chunk.
+ Must be a multiple of 8 in the range [128 .. 1048576].
duplicate_policy:
- Since RedisTimeSeries v1.4 you can specify the duplicate sample policy
- (Configure what to do on duplicate sample).
+ Policy for handling multiple samples with identical timestamps.
Can be one of:
- 'block': an error will occur for any out of order sample.
- 'first': ignore the new value.
- 'last': override with latest value.
- 'min': only override if the value is lower than the existing value.
- 'max': only override if the value is higher than the existing value.
- When this is not set, the server-wide default will be used.
- For more information: https://oss.redis.com/redistimeseries/master/commands/#tsadd
+ For more information: https://redis.io/commands/ts.add/
""" # noqa
- retention_msecs = kwargs.get("retention_msecs", None)
- uncompressed = kwargs.get("uncompressed", False)
- labels = kwargs.get("labels", {})
- chunk_size = kwargs.get("chunk_size", None)
- duplicate_policy = kwargs.get("duplicate_policy", None)
params = [key, timestamp, value]
self._append_retention(params, retention_msecs)
self._append_uncompressed(params, uncompressed)
@@ -145,28 +166,34 @@ class TimeSeriesCommands:
return self.execute_command(ADD_CMD, *params)
- def madd(self, ktv_tuples):
+ def madd(self, ktv_tuples: List[Tuple[KeyT, Union[int, str], Number]]):
"""
Append (or create and append) a new `value` to series
`key` with `timestamp`.
Expects a list of `tuples` as (`key`,`timestamp`, `value`).
Return value is an array with timestamps of insertions.
- For more information: https://oss.redis.com/redistimeseries/master/commands/#tsmadd
+ For more information: https://redis.io/commands/ts.madd/
""" # noqa
params = []
for ktv in ktv_tuples:
- for item in ktv:
- params.append(item)
+ params.extend(ktv)
return self.execute_command(MADD_CMD, *params)
- def incrby(self, key, value, **kwargs):
+ def incrby(
+ self,
+ key: KeyT,
+ value: Number,
+ timestamp: Optional[Union[int, str]] = None,
+ retention_msecs: Optional[int] = None,
+ uncompressed: Optional[bool] = False,
+ labels: Optional[Dict[str, str]] = None,
+ chunk_size: Optional[int] = None,
+ ):
"""
- Increment (or create an time-series and increment) the latest
- sample's of a series.
- This command can be used as a counter or gauge that automatically gets
- history as a time series.
+ Increment (or create a time series and increment) the latest sample of a series.
+ This command can be used as a counter or gauge that automatically gets history as a time series.
Args:
@@ -175,27 +202,19 @@ class TimeSeriesCommands:
value:
Numeric data value of the sample
timestamp:
- Timestamp of the sample. None can be used for automatic timestamp (using the system clock).
+ Timestamp of the sample. * can be used for automatic timestamp (using the system clock).
retention_msecs:
Maximum age for samples compared to last event time (in milliseconds).
If None or 0 is passed then the series is not trimmed at all.
uncompressed:
- Since RedisTimeSeries v1.2, both timestamps and values are compressed by default.
- Adding this flag will keep data in an uncompressed form. Compression not only saves
- memory but usually improve performance due to lower number of memory accesses.
+ Changes data storage from compressed (by default) to uncompressed
labels:
Set of label-value pairs that represent metadata labels of the key.
chunk_size:
- Each time-series uses chunks of memory of fixed size for time series samples.
- You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes).
+ Memory size, in bytes, allocated for each data chunk.
- For more information: https://oss.redis.com/redistimeseries/master/commands/#tsincrbytsdecrby
+ For more information: https://redis.io/commands/ts.incrby/
""" # noqa
- timestamp = kwargs.get("timestamp", None)
- retention_msecs = kwargs.get("retention_msecs", None)
- uncompressed = kwargs.get("uncompressed", False)
- labels = kwargs.get("labels", {})
- chunk_size = kwargs.get("chunk_size", None)
params = [key, value]
self._append_timestamp(params, timestamp)
self._append_retention(params, retention_msecs)
@@ -205,12 +224,19 @@ class TimeSeriesCommands:
return self.execute_command(INCRBY_CMD, *params)
- def decrby(self, key, value, **kwargs):
+ def decrby(
+ self,
+ key: KeyT,
+ value: Number,
+ timestamp: Optional[Union[int, str]] = None,
+ retention_msecs: Optional[int] = None,
+ uncompressed: Optional[bool] = False,
+ labels: Optional[Dict[str, str]] = None,
+ chunk_size: Optional[int] = None,
+ ):
"""
- Decrement (or create an time-series and decrement) the
- latest sample's of a series.
- This command can be used as a counter or gauge that
- automatically gets history as a time series.
+ Decrement (or create a time series and decrement) the latest sample of a series.
+ This command can be used as a counter or gauge that automatically gets history as a time series.
Args:
@@ -219,31 +245,19 @@ class TimeSeriesCommands:
value:
Numeric data value of the sample
timestamp:
- Timestamp of the sample. None can be used for automatic
- timestamp (using the system clock).
+ Timestamp of the sample. * can be used for automatic timestamp (using the system clock).
retention_msecs:
Maximum age for samples compared to last event time (in milliseconds).
If None or 0 is passed then the series is not trimmed at all.
uncompressed:
- Since RedisTimeSeries v1.2, both timestamps and values are
- compressed by default.
- Adding this flag will keep data in an uncompressed form.
- Compression not only saves
- memory but usually improve performance due to lower number
- of memory accesses.
+ Changes data storage from compressed (by default) to uncompressed
labels:
Set of label-value pairs that represent metadata labels of the key.
chunk_size:
- Each time-series uses chunks of memory of fixed size for time series samples.
- You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes).
+ Memory size, in bytes, allocated for each data chunk.
- For more information: https://oss.redis.com/redistimeseries/master/commands/#tsincrbytsdecrby
+ For more information: https://redis.io/commands/ts.decrby/
""" # noqa
- timestamp = kwargs.get("timestamp", None)
- retention_msecs = kwargs.get("retention_msecs", None)
- uncompressed = kwargs.get("uncompressed", False)
- labels = kwargs.get("labels", {})
- chunk_size = kwargs.get("chunk_size", None)
params = [key, value]
self._append_timestamp(params, timestamp)
self._append_retention(params, retention_msecs)
@@ -253,14 +267,9 @@ class TimeSeriesCommands:
return self.execute_command(DECRBY_CMD, *params)
- def delete(self, key, from_time, to_time):
+ def delete(self, key: KeyT, from_time: int, to_time: int):
"""
- Delete data points for a given timeseries and interval range
- in the form of start and end delete timestamps.
- The given timestamp interval is closed (inclusive), meaning start
- and end data points will also be deleted.
- Return the count for deleted items.
- For more information see
+ Delete all samples between two timestamps for a given time series.
Args:
@@ -271,68 +280,98 @@ class TimeSeriesCommands:
to_time:
End timestamp for the range deletion.
- For more information: https://oss.redis.com/redistimeseries/master/commands/#tsdel
+ For more information: https://redis.io/commands/ts.del/
""" # noqa
return self.execute_command(DEL_CMD, key, from_time, to_time)
- def createrule(self, source_key, dest_key, aggregation_type, bucket_size_msec):
+ def createrule(
+ self,
+ source_key: KeyT,
+ dest_key: KeyT,
+ aggregation_type: str,
+ bucket_size_msec: int,
+ align_timestamp: Optional[int] = None,
+ ):
"""
Create a compaction rule from values added to `source_key` into `dest_key`.
- Aggregating for `bucket_size_msec` where an `aggregation_type` can be
- [`avg`, `sum`, `min`, `max`, `range`, `count`, `first`, `last`,
- `std.p`, `std.s`, `var.p`, `var.s`]
- For more information: https://oss.redis.com/redistimeseries/master/commands/#tscreaterule
+ Args:
+
+ source_key:
+ Key name for source time series
+ dest_key:
+ Key name for destination (compacted) time series
+ aggregation_type:
+ Aggregation type: One of the following:
+ [`avg`, `sum`, `min`, `max`, `range`, `count`, `first`, `last`, `std.p`,
+ `std.s`, `var.p`, `var.s`, `twa`]
+ bucket_size_msec:
+ Duration of each bucket, in milliseconds
+ align_timestamp:
+ Assure that there is a bucket that starts at exactly align_timestamp and
+ align all other buckets accordingly.
+
+ For more information: https://redis.io/commands/ts.createrule/
""" # noqa
params = [source_key, dest_key]
self._append_aggregation(params, aggregation_type, bucket_size_msec)
+ if align_timestamp is not None:
+ params.append(align_timestamp)
return self.execute_command(CREATERULE_CMD, *params)
- def deleterule(self, source_key, dest_key):
+ def deleterule(self, source_key: KeyT, dest_key: KeyT):
"""
- Delete a compaction rule.
- For more information see
+ Delete a compaction rule from `source_key` to `dest_key`.
- For more information: https://oss.redis.com/redistimeseries/master/commands/#tsdeleterule
+ For more information: https://redis.io/commands/ts.deleterule/
""" # noqa
return self.execute_command(DELETERULE_CMD, source_key, dest_key)
def __range_params(
self,
- key,
- from_time,
- to_time,
- count,
- aggregation_type,
- bucket_size_msec,
- filter_by_ts,
- filter_by_min_value,
- filter_by_max_value,
- align,
+ key: KeyT,
+ from_time: Union[int, str],
+ to_time: Union[int, str],
+ count: Optional[int],
+ aggregation_type: Optional[str],
+ bucket_size_msec: Optional[int],
+ filter_by_ts: Optional[List[int]],
+ filter_by_min_value: Optional[int],
+ filter_by_max_value: Optional[int],
+ align: Optional[Union[int, str]],
+ latest: Optional[bool],
+ bucket_timestamp: Optional[str],
+ empty: Optional[bool],
):
"""Create TS.RANGE and TS.REVRANGE arguments."""
params = [key, from_time, to_time]
+ self._append_latest(params, latest)
self._append_filer_by_ts(params, filter_by_ts)
self._append_filer_by_value(params, filter_by_min_value, filter_by_max_value)
self._append_count(params, count)
self._append_align(params, align)
self._append_aggregation(params, aggregation_type, bucket_size_msec)
+ self._append_bucket_timestamp(params, bucket_timestamp)
+ self._append_empty(params, empty)
return params
def range(
self,
- key,
- from_time,
- to_time,
- count=None,
- aggregation_type=None,
- bucket_size_msec=0,
- filter_by_ts=None,
- filter_by_min_value=None,
- filter_by_max_value=None,
- align=None,
+ key: KeyT,
+ from_time: Union[int, str],
+ to_time: Union[int, str],
+ count: Optional[int] = None,
+ aggregation_type: Optional[str] = None,
+ bucket_size_msec: Optional[int] = 0,
+ filter_by_ts: Optional[List[int]] = None,
+ filter_by_min_value: Optional[int] = None,
+ filter_by_max_value: Optional[int] = None,
+ align: Optional[Union[int, str]] = None,
+ latest: Optional[bool] = False,
+ bucket_timestamp: Optional[str] = None,
+ empty: Optional[bool] = False,
):
"""
Query a range in forward direction for a specific time-series.
@@ -342,31 +381,34 @@ class TimeSeriesCommands:
key:
Key name for timeseries.
from_time:
- Start timestamp for the range query. - can be used to express
- the minimum possible timestamp (0).
+ Start timestamp for the range query. - can be used to express the minimum possible timestamp (0).
to_time:
- End timestamp for range query, + can be used to express the
- maximum possible timestamp.
+ End timestamp for range query, + can be used to express the maximum possible timestamp.
count:
- Optional maximum number of returned results.
+ Limits the number of returned samples.
aggregation_type:
- Optional aggregation type. Can be one of
- [`avg`, `sum`, `min`, `max`, `range`, `count`,
- `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`]
+ Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`,
+ `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, `twa`]
bucket_size_msec:
Time bucket for aggregation in milliseconds.
filter_by_ts:
List of timestamps to filter the result by specific timestamps.
filter_by_min_value:
- Filter result by minimum value (must mention also filter
- by_max_value).
+ Filter result by minimum value (must mention also filter by_max_value).
filter_by_max_value:
- Filter result by maximum value (must mention also filter
- by_min_value).
+ Filter result by maximum value (must mention also filter by_min_value).
align:
Timestamp for alignment control for aggregation.
-
- For more information: https://oss.redis.com/redistimeseries/master/commands/#tsrangetsrevrange
+ latest:
+ Used when a time series is a compaction, reports the compacted value of the
+ latest possibly partial bucket
+ bucket_timestamp:
+ Controls how bucket timestamps are reported. Can be one of [`-`, `low`, `+`,
+ `high`, `~`, `mid`].
+ empty:
+ Reports aggregations for empty buckets.
+
+ For more information: https://redis.io/commands/ts.range/
""" # noqa
params = self.__range_params(
key,
@@ -379,21 +421,27 @@ class TimeSeriesCommands:
filter_by_min_value,
filter_by_max_value,
align,
+ latest,
+ bucket_timestamp,
+ empty,
)
return self.execute_command(RANGE_CMD, *params)
def revrange(
self,
- key,
- from_time,
- to_time,
- count=None,
- aggregation_type=None,
- bucket_size_msec=0,
- filter_by_ts=None,
- filter_by_min_value=None,
- filter_by_max_value=None,
- align=None,
+ key: KeyT,
+ from_time: Union[int, str],
+ to_time: Union[int, str],
+ count: Optional[int] = None,
+ aggregation_type: Optional[str] = None,
+ bucket_size_msec: Optional[int] = 0,
+ filter_by_ts: Optional[List[int]] = None,
+ filter_by_min_value: Optional[int] = None,
+ filter_by_max_value: Optional[int] = None,
+ align: Optional[Union[int, str]] = None,
+ latest: Optional[bool] = False,
+ bucket_timestamp: Optional[str] = None,
+ empty: Optional[bool] = False,
):
"""
Query a range in reverse direction for a specific time-series.
@@ -409,10 +457,10 @@ class TimeSeriesCommands:
to_time:
End timestamp for range query, + can be used to express the maximum possible timestamp.
count:
- Optional maximum number of returned results.
+ Limits the number of returned samples.
aggregation_type:
- Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, `range`, `count`,
- `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`]
+ Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`,
+ `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, `twa`]
bucket_size_msec:
Time bucket for aggregation in milliseconds.
filter_by_ts:
@@ -423,8 +471,16 @@ class TimeSeriesCommands:
Filter result by maximum value (must mention also filter_by_min_value).
align:
Timestamp for alignment control for aggregation.
-
- For more information: https://oss.redis.com/redistimeseries/master/commands/#tsrangetsrevrange
+ latest:
+ Used when a time series is a compaction, reports the compacted value of the
+ latest possibly partial bucket
+ bucket_timestamp:
+ Controls how bucket timestamps are reported. Can be one of [`-`, `low`, `+`,
+ `high`, `~`, `mid`].
+ empty:
+ Reports aggregations for empty buckets.
+
+ For more information: https://redis.io/commands/ts.revrange/
""" # noqa
params = self.__range_params(
key,
@@ -437,34 +493,43 @@ class TimeSeriesCommands:
filter_by_min_value,
filter_by_max_value,
align,
+ latest,
+ bucket_timestamp,
+ empty,
)
return self.execute_command(REVRANGE_CMD, *params)
def __mrange_params(
self,
- aggregation_type,
- bucket_size_msec,
- count,
- filters,
- from_time,
- to_time,
- with_labels,
- filter_by_ts,
- filter_by_min_value,
- filter_by_max_value,
- groupby,
- reduce,
- select_labels,
- align,
+ aggregation_type: Optional[str],
+ bucket_size_msec: Optional[int],
+ count: Optional[int],
+ filters: List[str],
+ from_time: Union[int, str],
+ to_time: Union[int, str],
+ with_labels: Optional[bool],
+ filter_by_ts: Optional[List[int]],
+ filter_by_min_value: Optional[int],
+ filter_by_max_value: Optional[int],
+ groupby: Optional[str],
+ reduce: Optional[str],
+ select_labels: Optional[List[str]],
+ align: Optional[Union[int, str]],
+ latest: Optional[bool],
+ bucket_timestamp: Optional[str],
+ empty: Optional[bool],
):
"""Create TS.MRANGE and TS.MREVRANGE arguments."""
params = [from_time, to_time]
+ self._append_latest(params, latest)
self._append_filer_by_ts(params, filter_by_ts)
self._append_filer_by_value(params, filter_by_min_value, filter_by_max_value)
+ self._append_with_labels(params, with_labels, select_labels)
self._append_count(params, count)
self._append_align(params, align)
self._append_aggregation(params, aggregation_type, bucket_size_msec)
- self._append_with_labels(params, with_labels, select_labels)
+ self._append_bucket_timestamp(params, bucket_timestamp)
+ self._append_empty(params, empty)
params.extend(["FILTER"])
params += filters
self._append_groupby_reduce(params, groupby, reduce)
@@ -472,20 +537,23 @@ class TimeSeriesCommands:
def mrange(
self,
- from_time,
- to_time,
- filters,
- count=None,
- aggregation_type=None,
- bucket_size_msec=0,
- with_labels=False,
- filter_by_ts=None,
- filter_by_min_value=None,
- filter_by_max_value=None,
- groupby=None,
- reduce=None,
- select_labels=None,
- align=None,
+ from_time: Union[int, str],
+ to_time: Union[int, str],
+ filters: List[str],
+ count: Optional[int] = None,
+ aggregation_type: Optional[str] = None,
+ bucket_size_msec: Optional[int] = 0,
+ with_labels: Optional[bool] = False,
+ filter_by_ts: Optional[List[int]] = None,
+ filter_by_min_value: Optional[int] = None,
+ filter_by_max_value: Optional[int] = None,
+ groupby: Optional[str] = None,
+ reduce: Optional[str] = None,
+ select_labels: Optional[List[str]] = None,
+ align: Optional[Union[int, str]] = None,
+ latest: Optional[bool] = False,
+ bucket_timestamp: Optional[str] = None,
+ empty: Optional[bool] = False,
):
"""
Query a range across multiple time-series by filters in forward direction.
@@ -493,46 +561,45 @@ class TimeSeriesCommands:
Args:
from_time:
- Start timestamp for the range query. `-` can be used to
- express the minimum possible timestamp (0).
+ Start timestamp for the range query. `-` can be used to express the minimum possible timestamp (0).
to_time:
- End timestamp for range query, `+` can be used to express
- the maximum possible timestamp.
+ End timestamp for range query, `+` can be used to express the maximum possible timestamp.
filters:
filter to match the time-series labels.
count:
- Optional maximum number of returned results.
+ Limits the number of returned samples.
aggregation_type:
- Optional aggregation type. Can be one of
- [`avg`, `sum`, `min`, `max`, `range`, `count`,
- `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`]
+ Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`,
+ `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, `twa`]
bucket_size_msec:
Time bucket for aggregation in milliseconds.
with_labels:
- Include in the reply the label-value pairs that represent metadata
- labels of the time-series.
- If this argument is not set, by default, an empty Array will be
- replied on the labels array position.
+ Include in the reply all label-value pairs representing metadata labels of the time series.
filter_by_ts:
List of timestamps to filter the result by specific timestamps.
filter_by_min_value:
- Filter result by minimum value (must mention also
- filter_by_max_value).
+ Filter result by minimum value (must mention also filter_by_max_value).
filter_by_max_value:
- Filter result by maximum value (must mention also
- filter_by_min_value).
+ Filter result by maximum value (must mention also filter_by_min_value).
groupby:
Grouping by fields the results (must mention also reduce).
reduce:
- Applying reducer functions on each group. Can be one
- of [`sum`, `min`, `max`].
+ Applying reducer functions on each group. Can be one of [`avg`, `sum`, `min`,
+ `max`, `range`, `count`, `std.p`, `std.s`, `var.p`, `var.s`].
select_labels:
- Include in the reply only a subset of the key-value
- pair labels of a series.
+ Include in the reply only a subset of the key-value pair labels of a series.
align:
Timestamp for alignment control for aggregation.
-
- For more information: https://oss.redis.com/redistimeseries/master/commands/#tsmrangetsmrevrange
+ latest:
+ Used when a time series is a compaction, reports the compacted
+ value of the latest possibly partial bucket
+ bucket_timestamp:
+ Controls how bucket timestamps are reported. Can be one of [`-`, `low`, `+`,
+ `high`, `~`, `mid`].
+ empty:
+ Reports aggregations for empty buckets.
+
+ For more information: https://redis.io/commands/ts.mrange/
""" # noqa
params = self.__mrange_params(
aggregation_type,
@@ -549,26 +616,32 @@ class TimeSeriesCommands:
reduce,
select_labels,
align,
+ latest,
+ bucket_timestamp,
+ empty,
)
return self.execute_command(MRANGE_CMD, *params)
def mrevrange(
self,
- from_time,
- to_time,
- filters,
- count=None,
- aggregation_type=None,
- bucket_size_msec=0,
- with_labels=False,
- filter_by_ts=None,
- filter_by_min_value=None,
- filter_by_max_value=None,
- groupby=None,
- reduce=None,
- select_labels=None,
- align=None,
+ from_time: Union[int, str],
+ to_time: Union[int, str],
+ filters: List[str],
+ count: Optional[int] = None,
+ aggregation_type: Optional[str] = None,
+ bucket_size_msec: Optional[int] = 0,
+ with_labels: Optional[bool] = False,
+ filter_by_ts: Optional[List[int]] = None,
+ filter_by_min_value: Optional[int] = None,
+ filter_by_max_value: Optional[int] = None,
+ groupby: Optional[str] = None,
+ reduce: Optional[str] = None,
+ select_labels: Optional[List[str]] = None,
+ align: Optional[Union[int, str]] = None,
+ latest: Optional[bool] = False,
+ bucket_timestamp: Optional[str] = None,
+ empty: Optional[bool] = False,
):
"""
Query a range across multiple time-series by filters in reverse direction.
@@ -576,48 +649,45 @@ class TimeSeriesCommands:
Args:
from_time:
- Start timestamp for the range query. - can be used to express
- the minimum possible timestamp (0).
+ Start timestamp for the range query. - can be used to express the minimum possible timestamp (0).
to_time:
- End timestamp for range query, + can be used to express
- the maximum possible timestamp.
+ End timestamp for range query, + can be used to express the maximum possible timestamp.
filters:
Filter to match the time-series labels.
count:
- Optional maximum number of returned results.
+ Limits the number of returned samples.
aggregation_type:
- Optional aggregation type. Can be one of
- [`avg`, `sum`, `min`, `max`, `range`, `count`,
- `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`]
+ Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`,
+ `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, `twa`]
bucket_size_msec:
Time bucket for aggregation in milliseconds.
with_labels:
- Include in the reply the label-value pairs that represent
- metadata labels
- of the time-series.
- If this argument is not set, by default, an empty Array
- will be replied
- on the labels array position.
+ Include in the reply all label-value pairs representing metadata labels of the time series.
filter_by_ts:
List of timestamps to filter the result by specific timestamps.
filter_by_min_value:
- Filter result by minimum value (must mention also filter
- by_max_value).
+ Filter result by minimum value (must mention also filter_by_max_value).
filter_by_max_value:
- Filter result by maximum value (must mention also filter
- by_min_value).
+ Filter result by maximum value (must mention also filter_by_min_value).
groupby:
Grouping by fields the results (must mention also reduce).
reduce:
- Applying reducer functions on each group. Can be one
- of [`sum`, `min`, `max`].
+ Applying reducer functions on each group. Can be one of [`avg`, `sum`, `min`,
+ `max`, `range`, `count`, `std.p`, `std.s`, `var.p`, `var.s`].
select_labels:
- Include in the reply only a subset of the key-value pair
- labels of a series.
+ Include in the reply only a subset of the key-value pair labels of a series.
align:
Timestamp for alignment control for aggregation.
-
- For more information: https://oss.redis.com/redistimeseries/master/commands/#tsmrangetsmrevrange
+ latest:
+ Used when a time series is a compaction, reports the compacted
+ value of the latest possibly partial bucket
+ bucket_timestamp:
+ Controls how bucket timestamps are reported. Can be one of [`-`, `low`, `+`,
+ `high`, `~`, `mid`].
+ empty:
+ Reports aggregations for empty buckets.
+
+ For more information: https://redis.io/commands/ts.mrevrange/
""" # noqa
params = self.__mrange_params(
aggregation_type,
@@ -634,54 +704,85 @@ class TimeSeriesCommands:
reduce,
select_labels,
align,
+ latest,
+ bucket_timestamp,
+ empty,
)
return self.execute_command(MREVRANGE_CMD, *params)
def get(self, key: KeyT, latest: Optional[bool] = False):
    """# noqa
    Return the last sample of the time series stored at `key`.

    When `latest` is set and the series is a compaction, the compacted
    value of the latest (possibly partial) bucket is reported.

    For more information: https://redis.io/commands/ts.get/
    """  # noqa
    pieces = [key]
    # LATEST must precede any other optional arguments of TS.GET.
    self._append_latest(pieces, latest)
    return self.execute_command(GET_CMD, *pieces)
def mget(
    self,
    filters: List[str],
    with_labels: Optional[bool] = False,
    select_labels: Optional[List[str]] = None,
    latest: Optional[bool] = False,
):
    """# noqa
    Return the last samples of all series matching the given `filters`.

    Args:

    filters:
        Filter to match the time-series labels.
    with_labels:
        Include in the reply all label-value pairs representing metadata
        labels of the time series.
    select_labels:
        Include in the reply only a subset of the key-value pair labels of a series.
    latest:
        Used when a time series is a compaction, reports the compacted
        value of the latest possibly partial bucket

    For more information: https://redis.io/commands/ts.mget/
    """  # noqa
    pieces = []
    # Argument order required by TS.MGET: LATEST, label selection, FILTER.
    self._append_latest(pieces, latest)
    self._append_with_labels(pieces, with_labels, select_labels)
    pieces.append("FILTER")
    pieces.extend(filters)
    return self.execute_command(MGET_CMD, *pieces)
def info(self, key: KeyT):
    """# noqa
    Return information and statistics for the time series stored at `key`.

    For more information: https://redis.io/commands/ts.info/
    """  # noqa
    return self.execute_command(INFO_CMD, key)
def queryindex(self, filters: List[str]):
    """# noqa
    Get all time series keys matching the `filter` list.

    For more information: https://redis.io/commands/ts.queryindex/
    """  # noqa
    return self.execute_command(QUERYINDEX_CMD, *filters)
@staticmethod
- def _append_uncompressed(params, uncompressed):
+ def _append_uncompressed(params: List[str], uncompressed: Optional[bool]):
"""Append UNCOMPRESSED tag to params."""
if uncompressed:
params.extend(["UNCOMPRESSED"])
@staticmethod
- def _append_with_labels(params, with_labels, select_labels=None):
+ def _append_with_labels(
+ params: List[str],
+ with_labels: Optional[bool],
+ select_labels: Optional[List[str]],
+ ):
"""Append labels behavior to params."""
if with_labels and select_labels:
raise DataError(
@@ -694,19 +795,21 @@ class TimeSeriesCommands:
params.extend(["SELECTED_LABELS", *select_labels])
@staticmethod
- def _append_groupby_reduce(params, groupby, reduce):
+ def _append_groupby_reduce(
+ params: List[str], groupby: Optional[str], reduce: Optional[str]
+ ):
"""Append GROUPBY REDUCE property to params."""
if groupby is not None and reduce is not None:
params.extend(["GROUPBY", groupby, "REDUCE", reduce.upper()])
@staticmethod
- def _append_retention(params, retention):
+ def _append_retention(params: List[str], retention: Optional[int]):
"""Append RETENTION property to params."""
if retention is not None:
params.extend(["RETENTION", retention])
@staticmethod
- def _append_labels(params, labels):
+ def _append_labels(params: List[str], labels: Optional[List[str]]):
"""Append LABELS property to params."""
if labels:
params.append("LABELS")
@@ -714,38 +817,43 @@ class TimeSeriesCommands:
params.extend([k, v])
@staticmethod
- def _append_count(params, count):
+ def _append_count(params: List[str], count: Optional[int]):
"""Append COUNT property to params."""
if count is not None:
params.extend(["COUNT", count])
@staticmethod
- def _append_timestamp(params, timestamp):
+ def _append_timestamp(params: List[str], timestamp: Optional[int]):
"""Append TIMESTAMP property to params."""
if timestamp is not None:
params.extend(["TIMESTAMP", timestamp])
@staticmethod
- def _append_align(params, align):
+ def _append_align(params: List[str], align: Optional[Union[int, str]]):
"""Append ALIGN property to params."""
if align is not None:
params.extend(["ALIGN", align])
@staticmethod
- def _append_aggregation(params, aggregation_type, bucket_size_msec):
+ def _append_aggregation(
+ params: List[str],
+ aggregation_type: Optional[str],
+ bucket_size_msec: Optional[int],
+ ):
"""Append AGGREGATION property to params."""
if aggregation_type is not None:
- params.append("AGGREGATION")
- params.extend([aggregation_type, bucket_size_msec])
+ params.extend(["AGGREGATION", aggregation_type, bucket_size_msec])
@staticmethod
- def _append_chunk_size(params, chunk_size):
+ def _append_chunk_size(params: List[str], chunk_size: Optional[int]):
"""Append CHUNK_SIZE property to params."""
if chunk_size is not None:
params.extend(["CHUNK_SIZE", chunk_size])
@staticmethod
- def _append_duplicate_policy(params, command, duplicate_policy):
+ def _append_duplicate_policy(
+ params: List[str], command: Optional[str], duplicate_policy: Optional[str]
+ ):
"""Append DUPLICATE_POLICY property to params on CREATE
and ON_DUPLICATE on ADD.
"""
@@ -756,13 +864,33 @@ class TimeSeriesCommands:
params.extend(["DUPLICATE_POLICY", duplicate_policy])
@staticmethod
- def _append_filer_by_ts(params, ts_list):
+ def _append_filer_by_ts(params: List[str], ts_list: Optional[List[int]]):
"""Append FILTER_BY_TS property to params."""
if ts_list is not None:
params.extend(["FILTER_BY_TS", *ts_list])
@staticmethod
- def _append_filer_by_value(params, min_value, max_value):
+ def _append_filer_by_value(
+ params: List[str], min_value: Optional[int], max_value: Optional[int]
+ ):
"""Append FILTER_BY_VALUE property to params."""
if min_value is not None and max_value is not None:
params.extend(["FILTER_BY_VALUE", min_value, max_value])
+
+ @staticmethod
+ def _append_latest(params: List[str], latest: Optional[bool]):
+ """Append LATEST property to params."""
+ if latest:
+ params.append("LATEST")
+
+ @staticmethod
+ def _append_bucket_timestamp(params: List[str], bucket_timestamp: Optional[str]):
+ """Append BUCKET_TIMESTAMP property to params."""
+ if bucket_timestamp is not None:
+ params.extend(["BUCKETTIMESTAMP", bucket_timestamp])
+
+ @staticmethod
+ def _append_empty(params: List[str], empty: Optional[bool]):
+ """Append EMPTY property to params."""
+ if empty:
+ params.append("EMPTY")