summaryrefslogtreecommitdiff
path: root/src/t_stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/t_stream.c')
-rw-r--r--src/t_stream.c29
1 files changed, 28 insertions, 1 deletions
diff --git a/src/t_stream.c b/src/t_stream.c
index f991765eb..197b7d4f7 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -818,6 +818,28 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
return deleted;
}
+/* Trims a stream by length. Returns the number of deleted items. */
+int64_t streamTrimByLength(stream *s, long long maxlen, int approx) {
+ streamAddTrimArgs args = {
+ .trim_strategy = TRIM_STRATEGY_MAXLEN,
+ .approx_trim = approx,
+ .limit = approx ? 100 * server.stream_node_max_entries : 0,
+ .maxlen = maxlen
+ };
+ return streamTrim(s, &args);
+}
+
+/* Trims a stream by minimum ID. Returns the number of deleted items. */
+int64_t streamTrimByID(stream *s, streamID minid, int approx) {
+ streamAddTrimArgs args = {
+ .trim_strategy = TRIM_STRATEGY_MINID,
+ .approx_trim = approx,
+ .limit = approx ? 100 * server.stream_node_max_entries : 0,
+ .minid = minid
+ };
+ return streamTrim(s, &args);
+}
+
/* Parse the arguements of XADD/XTRIM.
*
* See streamAddTrimArgs for more details about the arguments handled.
@@ -1625,7 +1647,7 @@ robj *streamTypeLookupWriteOrCreate(client *c, robj *key, int no_create) {
* treated as an invalid ID.
*
* If 'c' is set to NULL, no reply is sent to the client. */
-int streamGenericParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int strict) {
+int streamGenericParseIDOrReply(client *c, const robj *o, streamID *id, uint64_t missing_seq, int strict) {
char buf[128];
if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid;
memcpy(buf,o->ptr,sdslen(o->ptr)+1);
@@ -1661,6 +1683,11 @@ invalid:
return C_ERR;
}
+/* Wrapper for streamGenericParseIDOrReply() used by module API. */
+int streamParseID(const robj *o, streamID *id) {
+ return streamGenericParseIDOrReply(NULL, o, id, 0, 0);
+}
+
/* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to
* 0, to be used when - and + are acceptable IDs. */
int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) {