summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2018-04-19 16:25:29 +0200
committerantirez <antirez@gmail.com>2018-04-19 16:25:29 +0200
commite6b0e8d9ec4561a07864358af8d2d4e81ac413fc (patch)
tree13861df2fe24f9abd765a9c75af370b9661fed52
parent19ae809458b545bf320122c83a4554e3e434aa0f (diff)
downloadredis-e6b0e8d9ec4561a07864358af8d2d4e81ac413fc.tar.gz
Streams: XTRIM command added.
-rw-r--r--src/server.c1
-rw-r--r--src/server.h1
-rw-r--r--src/t_stream.c68
3 files changed, 70 insertions, 0 deletions
diff --git a/src/server.c b/src/server.c
index 2cc6b6f57..404429beb 100644
--- a/src/server.c
+++ b/src/server.c
@@ -314,6 +314,7 @@ struct redisCommand redisCommandTable[] = {
{"xclaim",xclaimCommand,-5,"wF",0,NULL,1,1,1,0,0},
{"xinfo",xinfoCommand,-2,"r",0,NULL,2,2,1,0,0},
{"xdel",xdelCommand,-2,"wF",0,NULL,1,1,1,0,0},
+ {"xtrim",xtrimCommand,-2,"wF",0,NULL,1,1,1,0,0},
{"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
{"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
{"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0}
diff --git a/src/server.h b/src/server.h
index 66581eaa5..172e99c8a 100644
--- a/src/server.h
+++ b/src/server.h
@@ -2054,6 +2054,7 @@ void xpendingCommand(client *c);
void xclaimCommand(client *c);
void xinfoCommand(client *c);
void xdelCommand(client *c);
+void xtrimCommand(client *c);
#if defined(__GNUC__)
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));
diff --git a/src/t_stream.c b/src/t_stream.c
index ad47941a2..1e46c6367 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -2024,6 +2024,74 @@ void xdelCommand(client *c) {
server.dirty += deleted;
addReplyLongLong(c,deleted);
}
+
+/* General form: XTRIM <key> [... options ...]
+ *
+ * List of options:
+ *
+ * MAXLEN [~] <count> -- Trim so that the stream will be capped at
+ * the specified length. Use ~ before the
+ * count in order to demand approximated trimming
+ * (like XADD MAXLEN option).
+ */
+
+#define TRIM_STRATEGY_NONE 0
+#define TRIM_STRATEGY_MAXLEN 1
+void xtrimCommand(client *c) {
+ robj *o;
+
+ /* If the key does not exist, we are ok returning zero, that is, the
+ * number of elements removed from the stream. */
+ if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL
+ || checkType(c,o,OBJ_STREAM)) return;
+ stream *s = o->ptr;
+
+ /* Argument parsing. */
+ int trim_strategy = TRIM_STRATEGY_NONE;
+ long long maxlen = 0; /* 0 means no maximum length. */
+ int approx_maxlen = 0; /* If 1 only delete whole radix tree nodes, so
+ the maxium length is not applied verbatim. */
+
+ /* Parse options. */
+ int i = 2; /* Start of options. */
+ for (; i < c->argc; i++) {
+ int moreargs = (c->argc-1) - i; /* Number of additional arguments. */
+ char *opt = c->argv[i]->ptr;
+ if (!strcasecmp(opt,"maxlen") && moreargs) {
+ trim_strategy = TRIM_STRATEGY_MAXLEN;
+ char *next = c->argv[i+1]->ptr;
+ /* Check for the form MAXLEN ~ <count>. */
+ if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
+ approx_maxlen = 1;
+ i++;
+ }
+ if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)
+ != C_OK) return;
+ i++;
+ } else {
+ addReply(c,shared.syntaxerr);
+ return;
+ }
+ }
+
+ /* Perform the trimming. */
+ int64_t deleted = 0;
+ if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
+ deleted = streamTrimByLength(s,maxlen,approx_maxlen);
+ } else {
+ addReplyError(c,"XTRIM called without an option to trim the stream");
+ return;
+ }
+
+ /* Propagate the write if needed. */
+ if (deleted) {
+ signalModifiedKey(c->db,c->argv[1]);
+ notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
+ server.dirty += deleted;
+ }
+ addReplyLongLong(c,deleted);
+}
+
/* XINFO CONSUMERS key group
* XINFO GROUPS <key>
* XINFO STREAM <key>