summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorguybe7 <guy.benoish@redislabs.com>2021-01-08 18:13:25 +0200
committerGitHub <noreply@github.com>2021-01-08 18:13:25 +0200
commit814aad65f16ed27cb68c4177e4f8b61d7edb31e1 (patch)
tree5129ce067e8e80d86ecc3f0f65067a2933e36b8a
parent5843a45d01f76c55288abe00c29931a2520fe521 (diff)
downloadredis-814aad65f16ed27cb68c4177e4f8b61d7edb31e1.tar.gz
XADD and XTRIM, Trim by MINID, and new LIMIT argument (#8169)
This PR adds another trimming strategy to XADD and XTRIM named MINID (complements the existing MAXLEN). It also adds a new LIMIT argument that allows incremental trimming by repeated calls (rather than all at once). This provides the ability to trim all records older than a certain ID (which makes it possible for the user to trim by age too). Example: XTRIM mystream MINID ~ 1608540753 will trim entries with id < 1608540753, but might not trim all (because of the ~ modifier) The purpose is to ease the use of streams. many users use streams as logs and the common case is wanting a log of the last X seconds rather than a log that contains maximum X entries (new MINID vs existing MAXLEN) The new LIMIT modifier is only supported when the trim strategy uses ~. i.e. when the user asked for exact trimming, it all happens in one go (no possibility for incremental trimming). However, when ~ is provided, we trim full rax nodes, up to the limit number of records. The default limit is 100*stream_node_max_entries (used when LIMIT is not provided). I.e. this is a behavior change (even if the existing MAXLEN strategy is used). An explicit limit of 0 means unlimited (but note that it's not the default). Other changes: Refactor arg parsing code for XADD and XTRIM to use common code.
-rw-r--r--src/t_stream.c579
-rw-r--r--tests/unit/type/stream.tcl140
2 files changed, 569 insertions, 150 deletions
diff --git a/src/t_stream.c b/src/t_stream.c
index d085a8948..f991765eb 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -46,6 +46,8 @@
void streamFreeCG(streamCG *cg);
void streamFreeNACK(streamNACK *na);
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer);
+int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq);
+int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq);
/* -----------------------------------------------------------------------
* Low level stream encoding: a radix tree of listpacks.
@@ -282,6 +284,65 @@ static inline int64_t lpGetIntegerIfValid(unsigned char *ele, int *valid) {
#define lpGetInteger(ele) lpGetIntegerIfValid(ele, NULL)
+/* Get an edge streamID of a given listpack.
+ * 'master_id' is an input param, used to build the 'edge_id' output param */
+int lpGetEdgeStreamID(unsigned char *lp, int first, streamID *master_id, streamID *edge_id)
+{
+ if (lp == NULL)
+ return 0;
+
+ unsigned char *lp_ele;
+
+ /* We need to seek either the first or the last entry depending
+ * on the direction of the iteration. */
+ if (first) {
+ /* Get the master fields count. */
+ lp_ele = lpFirst(lp); /* Seek items count */
+ lp_ele = lpNext(lp, lp_ele); /* Seek deleted count. */
+ lp_ele = lpNext(lp, lp_ele); /* Seek num fields. */
+ int64_t master_fields_count = lpGetInteger(lp_ele);
+ lp_ele = lpNext(lp, lp_ele); /* Seek first field. */
+
+ /* If we are iterating in normal order, skip the master fields
+ * to seek the first actual entry. */
+ for (int64_t i = 0; i < master_fields_count; i++)
+ lp_ele = lpNext(lp, lp_ele);
+
+ /* If we are going forward, skip the previous entry's
+ * lp-count field (or in case of the master entry, the zero
+ * term field) */
+ lp_ele = lpNext(lp, lp_ele);
+ if (lp_ele == NULL)
+ return 0;
+ } else {
+ /* If we are iterating in reverse direction, just seek the
+ * last part of the last entry in the listpack (that is, the
+ * fields count). */
+ lp_ele = lpLast(lp);
+
+ /* If we are going backward, read the number of elements this
+ * entry is composed of, and jump backward N times to seek
+ * its start. */
+ int64_t lp_count = lpGetInteger(lp_ele);
+ if (lp_count == 0) /* We reached the master entry. */
+ return 0;
+
+ while (lp_count--)
+ lp_ele = lpPrev(lp, lp_ele);
+ }
+
+ lp_ele = lpNext(lp, lp_ele); /* Seek ID (lp_ele currently points to 'flags'). */
+
+ /* Get the ID: it is encoded as difference between the master
+ * ID and this entry ID. */
+ streamID id = *master_id;
+ id.ms += lpGetInteger(lp_ele);
+ lp_ele = lpNext(lp, lp_ele);
+ id.seq += lpGetInteger(lp_ele);
+ *edge_id = id;
+ return 1;
+}
+
/* Debugging function to log the full content of a listpack. Useful
* for development and debugging. */
void streamLogListpackContent(unsigned char *lp) {
@@ -325,6 +386,39 @@ int streamCompareID(streamID *a, streamID *b) {
return 0;
}
+void streamGetEdgeID(stream *s, int first, streamID *edge_id)
+{
+ raxIterator ri;
+ raxStart(&ri, s->rax);
+ int empty;
+ if (first) {
+ raxSeek(&ri, "^", NULL, 0);
+ empty = !raxNext(&ri);
+ } else {
+ raxSeek(&ri, "$", NULL, 0);
+ empty = !raxPrev(&ri);
+ }
+
+ if (empty) {
+ /* Stream is empty, mark edge ID as lowest/highest possible. */
+ edge_id->ms = first ? UINT64_MAX : 0;
+ edge_id->seq = first ? UINT64_MAX : 0;
+ raxStop(&ri);
+ return;
+ }
+
+ unsigned char *lp = ri.data;
+
+ /* Read the master ID from the radix tree key. */
+ streamID master_id;
+ streamDecodeID(ri.key, &master_id);
+
+ /* Construct edge ID. */
+ lpGetEdgeStreamID(lp, first, &master_id, edge_id);
+
+ raxStop(&ri);
+}
+
/* Adds a new item into the stream 's' having the specified number of
* field-value pairs as specified in 'numfields' and stored into 'argv'.
* Returns the new entry ID populating the 'added_id' structure.
@@ -525,35 +619,96 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
return C_OK;
}
-/* Trim the stream 's' to have no more than maxlen elements, and return the
+typedef struct {
+ /* XADD options */
+ streamID id; /* User-provided ID, for XADD only. */
+ int id_given; /* Was an ID different than "*" specified? for XADD only. */
+ int no_mkstream; /* if set to 1 do not create new stream */
+
+ /* XADD + XTRIM common options */
+ int trim_strategy; /* TRIM_STRATEGY_* */
+ int trim_strategy_arg_idx; /* Index of the count in MAXLEN/MINID, for rewriting. */
+ int approx_trim; /* If 1 only delete whole radix tree nodes, so
+ * the trim argument is not applied verbatim. */
+ long long limit; /* Maximum amount of entries to trim. If 0, no limitation
+ * on the amount of trimming work is enforced. */
+ /* TRIM_STRATEGY_MAXLEN options */
+ long long maxlen; /* After trimming, leave stream at this length . */
+ /* TRIM_STRATEGY_MINID options */
+ streamID minid; /* Trim by ID (No stream entries with ID < 'minid' will remain) */
+} streamAddTrimArgs;
+
+#define TRIM_STRATEGY_NONE 0
+#define TRIM_STRATEGY_MAXLEN 1
+#define TRIM_STRATEGY_MINID 2
+
+/* Trim the stream 's' according to args->trim_strategy, and return the
* number of elements removed from the stream. The 'approx' option, if non-zero,
* specifies that the trimming must be performed in a approximated way in
* order to maximize performances. This means that the stream may contain
- * more elements than 'maxlen', and elements are only removed if we can remove
+ * entries with IDs < 'id' in case of MINID (or more elements than 'maxlen'
+ * in case of MAXLEN), and elements are only removed if we can remove
* a *whole* node of the radix tree. The elements are removed from the head
* of the stream (older elements).
*
* The function may return zero if:
*
- * 1) The stream is already shorter or equal to the specified max length.
- * 2) The 'approx' option is true and the head node had not enough elements
- * to be deleted, leaving the stream with a number of elements >= maxlen.
+ * 1) The minimal entry ID of the stream is already < 'id' (MINID); or
+ * 2) The stream is already shorter or equal to the specified max length (MAXLEN); or
+ * 3) The 'approx' option is true and the head node did not have enough elements
+ * to be deleted.
+ *
+ * args->limit is the maximum number of entries to delete. The purpose is to
+ * prevent this function from taking to long.
+ * If 'limit' is 0 then we do not limit the number of deleted entries.
+ * Much like the 'approx', if 'limit' is smaller than the number of entries
+ * that should be trimmed, there is a chance we will still have entries with
+ * IDs < 'id' (or number of elements >= maxlen in case of MAXLEN).
*/
-int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
- if (s->length <= maxlen) return 0;
+int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
+ size_t maxlen = args->maxlen;
+ streamID *id = &args->minid;
+ int approx = args->approx_trim;
+ int64_t limit = args->limit;
+ int trim_strategy = args->trim_strategy;
+
+ if (trim_strategy == TRIM_STRATEGY_NONE)
+ return 0;
raxIterator ri;
raxStart(&ri,s->rax);
raxSeek(&ri,"^",NULL,0);
int64_t deleted = 0;
- while(s->length > maxlen && raxNext(&ri)) {
+ while (raxNext(&ri)) {
+ /* Check if we exceeded the amount of work we could do */
+ if (limit && deleted >= limit)
+ break;
+
+ if (trim_strategy == TRIM_STRATEGY_MAXLEN && s->length <= maxlen)
+ break;
+
unsigned char *lp = ri.data, *p = lpFirst(lp);
int64_t entries = lpGetInteger(p);
- /* Check if we can remove the whole node, and still have at
- * least maxlen elements. */
- if (s->length - entries >= maxlen) {
+ /* Check if we can remove the whole node. */
+ int remove_node;
+ streamID master_id = {0}; /* For MINID */
+ if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
+ remove_node = s->length - entries >= maxlen;
+ } else {
+ /* Read the master ID from the radix tree key. */
+ streamDecodeID(ri.key, &master_id);
+
+ /* Read last ID. */
+ streamID last_id;
+ lpGetEdgeStreamID(lp, 0, &master_id, &last_id);
+
+ /* We can remove the entire node id its last ID < 'id' */
+ remove_node = streamCompareID(&last_id, id) < 0;
+ }
+
+ if (remove_node) {
lpFree(lp);
raxRemove(s->rax,ri.key,ri.key_len,NULL);
raxSeek(&ri,">=",ri.key,ri.key_len);
@@ -566,19 +721,15 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
* stop here. */
if (approx) break;
- /* Otherwise, we have to mark single entries inside the listpack
- * as deleted. We start by updating the entries/deleted counters. */
- int64_t to_delete = s->length - maxlen;
- serverAssert(to_delete < entries);
- lp = lpReplaceInteger(lp,&p,entries-to_delete);
- p = lpNext(lp,p); /* Seek deleted field. */
- int64_t marked_deleted = lpGetInteger(p);
- lp = lpReplaceInteger(lp,&p,marked_deleted+to_delete);
- p = lpNext(lp,p); /* Seek num-of-fields in the master entry. */
+ /* Now we have to trim entries from within 'lp' */
+ int64_t deleted_from_lp = 0;
+
+ p = lpNext(lp, p); /* Skip deleted field. */
+ p = lpNext(lp, p); /* Skip num-of-fields in the master entry. */
/* Skip all the master fields. */
int64_t master_fields_count = lpGetInteger(p);
- p = lpNext(lp,p); /* Seek the first field. */
+ p = lpNext(lp,p); /* Skip the first field. */
for (int64_t j = 0; j < master_fields_count; j++)
p = lpNext(lp,p); /* Skip all master fields. */
p = lpNext(lp,p); /* Skip the zero master entry terminator. */
@@ -586,37 +737,72 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
/* 'p' is now pointing to the first entry inside the listpack.
* We have to run entry after entry, marking entries as deleted
* if they are already not deleted. */
- while(p) {
+ while (p) {
+ /* We keep a copy of p (which point to flags part) in order to
+ * update it after (and if) we actually remove the entry */
+ unsigned char *pcopy = p;
+
int flags = lpGetInteger(p);
+ p = lpNext(lp, p); /* Skip flags. */
int to_skip;
- /* Mark the entry as deleted. */
- if (!(flags & STREAM_ITEM_FLAG_DELETED)) {
- flags |= STREAM_ITEM_FLAG_DELETED;
- lp = lpReplaceInteger(lp,&p,flags);
- deleted++;
- s->length--;
- if (s->length <= maxlen) break; /* Enough entries deleted. */
+ int ms_delta = lpGetInteger(p);
+ p = lpNext(lp, p); /* Skip ID ms delta */
+ int seq_delta = lpGetInteger(p);
+ p = lpNext(lp, p); /* Skip ID seq delta */
+
+ streamID currid = {0}; /* For MINID */
+ if (trim_strategy == TRIM_STRATEGY_MINID) {
+ currid.ms = master_id.ms + ms_delta;
+ currid.seq = master_id.seq + seq_delta;
+ }
+
+ int stop;
+ if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
+ stop = s->length <= maxlen;
+ } else {
+ /* Following IDs will definitely be greater because the rax
+ * tree is sorted, no point of continuing. */
+ stop = streamCompareID(&currid, id) >= 0;
}
+ if (stop)
+ break;
- p = lpNext(lp,p); /* Skip ID ms delta. */
- p = lpNext(lp,p); /* Skip ID seq delta. */
- p = lpNext(lp,p); /* Seek num-fields or values (if compressed). */
if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
to_skip = master_fields_count;
} else {
- to_skip = lpGetInteger(p);
- to_skip = 1+(to_skip*2);
+ to_skip = lpGetInteger(p); /* Get num-fields. */
+ p = lpNext(lp,p); /* Skip num-fields. */
+ to_skip *= 2; /* Fields and values. */
}
while(to_skip--) p = lpNext(lp,p); /* Skip the whole entry. */
p = lpNext(lp,p); /* Skip the final lp-count field. */
+
+ /* Mark the entry as deleted. */
+ if (!(flags & STREAM_ITEM_FLAG_DELETED)) {
+ intptr_t delta = p - lp;
+ flags |= STREAM_ITEM_FLAG_DELETED;
+ lp = lpReplaceInteger(lp, &pcopy, flags);
+ deleted_from_lp++;
+ s->length--;
+ p = lp + delta;
+ }
}
+ deleted += deleted_from_lp;
+
+ /* Now we the entries/deleted counters. */
+ p = lpFirst(lp);
+ lp = lpReplaceInteger(lp,&p,entries-deleted_from_lp);
+ p = lpNext(lp,p); /* Skip deleted field. */
+ int64_t marked_deleted = lpGetInteger(p);
+ lp = lpReplaceInteger(lp,&p,marked_deleted+deleted_from_lp);
+ p = lpNext(lp,p); /* Skip num-of-fields in the master entry. */
/* Here we should perform garbage collection in case at this point
* there are too many entries deleted inside the listpack. */
- entries -= to_delete;
- marked_deleted += to_delete;
+ entries -= deleted_from_lp;
+ marked_deleted += deleted_from_lp;
if (entries + marked_deleted > 10 && marked_deleted > entries/2) {
/* TODO: perform a garbage collection. */
}
@@ -632,6 +818,142 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
return deleted;
}
+/* Parse the arguements of XADD/XTRIM.
+ *
+ * See streamAddTrimArgs for more details about the arguments handled.
+ *
+ * This function returns the position of the ID argument (relevant only to XADD).
+ * On error -1 is returned and a reply is sent. */
+static int streamParseAddOrTrimArgsOrReply(client *c, streamAddTrimArgs *args, int xadd) {
+ /* Initialize arguments to defaults */
+ memset(args, 0, sizeof(*args));
+
+ /* Parse options. */
+ int i = 2; /* This is the first argument position where we could
+ find an option, or the ID. */
+ int limit_given = 0;
+ for (; i < c->argc; i++) {
+ int moreargs = (c->argc-1) - i; /* Number of additional arguments. */
+ char *opt = c->argv[i]->ptr;
+ if (xadd && opt[0] == '*' && opt[1] == '\0') {
+ /* This is just a fast path for the common case of auto-ID
+ * creation. */
+ break;
+ } else if (!strcasecmp(opt,"maxlen") && moreargs) {
+ if (args->trim_strategy != TRIM_STRATEGY_NONE) {
+ addReplyError(c,"syntax error, MAXLEN and MINID options at the same time are not compatible");
+ return -1;
+ }
+ args->approx_trim = 0;
+ char *next = c->argv[i+1]->ptr;
+ /* Check for the form MAXLEN ~ <count>. */
+ if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
+ args->approx_trim = 1;
+ i++;
+ } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
+ i++;
+ }
+ if (getLongLongFromObjectOrReply(c,c->argv[i+1],&args->maxlen,NULL)
+ != C_OK) return -1;
+
+ if (args->maxlen < 0) {
+ addReplyError(c,"The MAXLEN argument must be >= 0.");
+ return -1;
+ }
+ i++;
+ args->trim_strategy = TRIM_STRATEGY_MAXLEN;
+ args->trim_strategy_arg_idx = i;
+ } else if (!strcasecmp(opt,"minid") && moreargs) {
+ if (args->trim_strategy != TRIM_STRATEGY_NONE) {
+ addReplyError(c,"syntax error, MAXLEN and MINID options at the same time are not compatible");
+ return -1;
+ }
+ args->approx_trim = 0;
+ char *next = c->argv[i+1]->ptr;
+ /* Check for the form MINID ~ <id>|<age>. */
+ if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
+ args->approx_trim = 1;
+ i++;
+ } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
+ i++;
+ }
+
+ if (streamParseStrictIDOrReply(c,c->argv[i+1],&args->minid,0) != C_OK)
+ return -1;
+
+ i++;
+ args->trim_strategy = TRIM_STRATEGY_MINID;
+ args->trim_strategy_arg_idx = i;
+ } else if (!strcasecmp(opt,"limit") && moreargs) {
+ /* Note about LIMIT: If it was not provided by the caller we set
+ * it to 100*server.stream_node_max_entries, and that's to prevent the
+ * trimming from taking too long, on the expense of not deleting entries
+ * that should be trimmed.
+ * If user wanted exact trimming (i.e. no '~') we never limit the number
+ * of trimmed entries */
+ if (getLongLongFromObjectOrReply(c,c->argv[i+1],&args->limit,NULL) != C_OK)
+ return -1;
+
+ if (args->limit < 0) {
+ addReplyError(c,"The LIMIT argument must be >= 0.");
+ return -1;
+ }
+ limit_given = 1;
+ i++;
+ } else if (xadd && !strcasecmp(opt,"nomkstream")) {
+ args->no_mkstream = 1;
+ } else if (xadd) {
+ /* If we are here is a syntax error or a valid ID. */
+ if (streamParseStrictIDOrReply(c,c->argv[i],&args->id,0) != C_OK)
+ return -1;
+ args->id_given = 1;
+ break;
+ } else {
+ addReplyErrorObject(c,shared.syntaxerr);
+ return -1;
+ }
+ }
+
+ if (args->limit && args->trim_strategy == TRIM_STRATEGY_NONE) {
+ addReplyError(c,"syntax error, LIMIT cannot be used without specifying a trimming strategy");
+ return -1;
+ }
+
+ if (!xadd && args->trim_strategy == TRIM_STRATEGY_NONE) {
+ addReplyError(c,"syntax error, XTRIM must be called with a trimming strategy");
+ return -1;
+ }
+
+ if (c == server.master || c->id == CLIENT_ID_AOF) {
+ /* If command cam from master or from AOF we must not enforce maxnodes
+ * (The maxlen/minid argument was re-written to make sure there's no
+ * inconsistency). */
+ args->limit = 0;
+ } else {
+ /* We need to set the limit (only if we got '~') */
+ if (limit_given) {
+ if (!args->approx_trim) {
+ /* LIMIT was provided without ~ */
+ addReplyError(c,"syntax error, LIMIT cannot be used without the special ~ option");
+ return -1;
+ }
+ } else {
+ /* User didn't provide LIMIT, we must set it. */
+
+ if (args->approx_trim) {
+ /* In order to prevent from trimming to do too much work and cause
+ * latency spikes we limit the amount of work it can do */
+ args->limit = 100 * server.stream_node_max_entries; /* Maximum 100 rax nodes. */
+ } else {
+ /* No LIMIT for exact trimming */
+ args->limit = 0;
+ }
+ }
+ }
+
+ return i;
+}
+
/* Initialize the stream iterator, so that we can call iterating functions
* to get the next items. This requires a corresponding streamIteratorStop()
* at the end. The 'rev' parameter controls the direction. If it's zero the
@@ -1375,68 +1697,36 @@ int streamParseIntervalIDOrReply(client *c, robj *o, streamID *id, int *exclude,
return C_OK;
}
-/* We propagate MAXLEN ~ <count> as MAXLEN = <resulting-len-of-stream>
- * otherwise trimming is no longer determinsitic on replicas / AOF. */
-void streamRewriteApproxMaxlen(client *c, stream *s, int maxlen_arg_idx) {
- robj *maxlen_obj = createStringObjectFromLongLong(s->length);
+void streamRewriteApproxSpecifier(client *c, int idx) {
robj *equal_obj = createStringObject("=",1);
+ rewriteClientCommandArgument(c,idx,equal_obj);
+ decrRefCount(equal_obj);
+}
- rewriteClientCommandArgument(c,maxlen_arg_idx,maxlen_obj);
- rewriteClientCommandArgument(c,maxlen_arg_idx-1,equal_obj);
+/* We propagate MAXLEN/MINID ~ <count> as MAXLEN/MINID = <resulting-len-of-stream>
+ * otherwise trimming is no longer deterministic on replicas / AOF. */
+void streamRewriteTrimArgument(client *c, stream *s, int trim_strategy, int idx) {
+ robj *arg;
+ if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
+ arg = createStringObjectFromLongLong(s->length);
+ } else {
+ streamID first_id;
+ streamGetEdgeID(s, 1, &first_id);
+ arg = createObjectFromStreamID(&first_id);
+ }
- decrRefCount(equal_obj);
- decrRefCount(maxlen_obj);
+ rewriteClientCommandArgument(c,idx,arg);
+ decrRefCount(arg);
}
-/* XADD key [MAXLEN [~|=] <count>] [NOMKSTREAM] <ID or *> [field value] [field value] ... */
+/* XADD key [(MAXLEN [~|=] <count> | MINID [~|=] <id>) [LIMIT <entries>]] [NOMKSTREAM] <ID or *> [field value] [field value] ... */
void xaddCommand(client *c) {
- streamID id;
- int id_given = 0; /* Was an ID different than "*" specified? */
- long long maxlen = -1; /* If left to -1 no trimming is performed. */
- int approx_maxlen = 0; /* If 1 only delete whole radix tree nodes, so
- the maximum length is not applied verbatim. */
- int maxlen_arg_idx = 0; /* Index of the count in MAXLEN, for rewriting. */
- int no_mkstream = 0; /* if set to 1 do not create new stream */
-
/* Parse options. */
- int i = 2; /* This is the first argument position where we could
- find an option, or the ID. */
- for (; i < c->argc; i++) {
- int moreargs = (c->argc-1) - i; /* Number of additional arguments. */
- char *opt = c->argv[i]->ptr;
- if (opt[0] == '*' && opt[1] == '\0') {
- /* This is just a fast path for the common case of auto-ID
- * creation. */
- break;
- } else if (!strcasecmp(opt,"maxlen") && moreargs) {
- approx_maxlen = 0;
- char *next = c->argv[i+1]->ptr;
- /* Check for the form MAXLEN ~ <count>. */
- if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
- approx_maxlen = 1;
- i++;
- } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
- i++;
- }
- if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)
- != C_OK) return;
-
- if (maxlen < 0) {
- addReplyError(c,"The MAXLEN argument must be >= 0.");
- return;
- }
- i++;
- maxlen_arg_idx = i;
- } else if (!strcasecmp(opt,"nomkstream")) {
- no_mkstream = 1;
- } else {
- /* If we are here is a syntax error or a valid ID. */
- if (streamParseStrictIDOrReply(c,c->argv[i],&id,0) != C_OK) return;
- id_given = 1;
- break;
- }
- }
- int field_pos = i+1;
+ streamAddTrimArgs parsed_args;
+ int idpos = streamParseAddOrTrimArgsOrReply(c, &parsed_args, 1);
+ if (idpos < 0)
+ return; /* streamParseAddOrTrimArgsOrReply already replied. */
+ int field_pos = idpos+1; /* The ID is always one argument before the first field */
/* Check arity. */
if ((c->argc - field_pos) < 2 || ((c->argc-field_pos) % 2) == 1) {
@@ -1447,7 +1737,9 @@ void xaddCommand(client *c) {
/* Return ASAP if minimal ID (0-0) was given so we avoid possibly creating
* a new stream and have streamAppendItem fail, leaving an empty key in the
* database. */
- if (id_given && id.ms == 0 && id.seq == 0) {
+ if (parsed_args.id_given &&
+ parsed_args.id.ms == 0 && parsed_args.id.seq == 0)
+ {
addReplyError(c,"The ID specified in XADD must be greater than 0-0");
return;
}
@@ -1455,7 +1747,7 @@ void xaddCommand(client *c) {
/* Lookup the stream at key. */
robj *o;
stream *s;
- if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1],no_mkstream)) == NULL) return;
+ if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1],parsed_args.no_mkstream)) == NULL) return;
s = o->ptr;
/* Return ASAP if the stream has reached the last possible ID */
@@ -1466,8 +1758,9 @@ void xaddCommand(client *c) {
}
/* Append using the low level function and return the ID. */
+ streamID id;
if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2,
- &id, id_given ? &id : NULL)
+ &id, parsed_args.id_given ? &parsed_args.id : NULL)
== C_ERR)
{
addReplyError(c,"The ID specified in XADD is equal or smaller than the "
@@ -1480,18 +1773,26 @@ void xaddCommand(client *c) {
notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);
server.dirty++;
- if (maxlen >= 0) {
- /* Notify xtrim event if needed. */
- if (streamTrimByLength(s,maxlen,approx_maxlen)) {
+ /* Trim if needed. */
+ if (parsed_args.trim_strategy != TRIM_STRATEGY_NONE) {
+ if (streamTrim(s, &parsed_args)) {
notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
}
- if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx);
+ if (parsed_args.approx_trim) {
+ /* In case our trimming was limited (by LIMIT or by ~) we must
+ * re-write the relevant trim argument to make sure there will be
+ * no inconsistencies in AOF loading or in the replica.
+ * It's enough to check only args->approx because there is no
+ * way LIMIT is given without the ~ option. */
+ streamRewriteApproxSpecifier(c,parsed_args.trim_strategy_arg_idx-1);
+ streamRewriteTrimArgument(c,s,parsed_args.trim_strategy,parsed_args.trim_strategy_arg_idx);
+ }
}
/* Let's rewrite the ID argument with the one actually generated for
* AOF/replication propagation. */
robj *idarg = createObjectFromStreamID(&id);
- rewriteClientCommandArgument(c,i,idarg);
+ rewriteClientCommandArgument(c,idpos,idarg);
decrRefCount(idarg);
/* We need to signal to blocked clients that there is new data on this
@@ -2871,14 +3172,25 @@ cleanup:
*
* List of options:
*
+ * Trim strategies:
+ *
* MAXLEN [~|=] <count> -- Trim so that the stream will be capped at
* the specified length. Use ~ before the
* count in order to demand approximated trimming
* (like XADD MAXLEN option).
+ * MINID [~|=] <id> -- Trim so that the stream will not contain entries
+ * with IDs smaller than 'id'. Use ~ before the
+ * count in order to demand approximated trimming
+ * (like XADD MINID option).
+ *
+ * Other options:
+ *
+ * LIMIT <entries> -- The maximum number of entries to trim.
+ * 0 means unlimited. Unless specified, it is set
+ * to a default of 100*server.stream_node_max_entries,
+ * and that's in order to keep the trimming time sane.
+ * Has meaning only if `~` was provided.
*/
-
-#define TRIM_STRATEGY_NONE 0
-#define TRIM_STRATEGY_MAXLEN 1
void xtrimCommand(client *c) {
robj *o;
@@ -2889,58 +3201,27 @@ void xtrimCommand(client *c) {
stream *s = o->ptr;
/* Argument parsing. */
- int trim_strategy = TRIM_STRATEGY_NONE;
- long long maxlen = -1; /* If left to -1 no trimming is performed. */
- int approx_maxlen = 0; /* If 1 only delete whole radix tree nodes, so
- the maxium length is not applied verbatim. */
- int maxlen_arg_idx = 0; /* Index of the count in MAXLEN, for rewriting. */
-
- /* Parse options. */
- int i = 2; /* Start of options. */
- for (; i < c->argc; i++) {
- int moreargs = (c->argc-1) - i; /* Number of additional arguments. */
- char *opt = c->argv[i]->ptr;
- if (!strcasecmp(opt,"maxlen") && moreargs) {
- approx_maxlen = 0;
- trim_strategy = TRIM_STRATEGY_MAXLEN;
- char *next = c->argv[i+1]->ptr;
- /* Check for the form MAXLEN ~ <count>. */
- if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
- approx_maxlen = 1;
- i++;
- } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
- i++;
- }
- if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)
- != C_OK) return;
-
- if (maxlen < 0) {
- addReplyError(c,"The MAXLEN argument must be >= 0.");
- return;
- }
- i++;
- maxlen_arg_idx = i;
- } else {
- addReplyErrorObject(c,shared.syntaxerr);
- return;
- }
- }
+ streamAddTrimArgs parsed_args;
+ if (streamParseAddOrTrimArgsOrReply(c, &parsed_args, 1) < 0)
+ return; /* streamParseAddOrTrimArgsOrReply already replied. */
/* Perform the trimming. */
- int64_t deleted = 0;
- if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
- deleted = streamTrimByLength(s,maxlen,approx_maxlen);
- } else {
- addReplyError(c,"XTRIM called without an option to trim the stream");
- return;
- }
-
- /* Propagate the write if needed. */
+ int64_t deleted = streamTrim(s, &parsed_args);
if (deleted) {
- signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
+ if (parsed_args.approx_trim) {
+ /* In case our trimming was limited (by LIMIT or by ~) we must
+ * re-write the relevant trim argument to make sure there will be
+ * no inconsistencies in AOF loading or in the replica.
+ * It's enough to check only args->approx because there is no
+ * way LIMIT is given without the ~ option. */
+ streamRewriteApproxSpecifier(c,parsed_args.trim_strategy_arg_idx-1);
+ streamRewriteTrimArgument(c,s,parsed_args.trim_strategy,parsed_args.trim_strategy_arg_idx);
+ }
+
+ /* Propagate the write. */
+ signalModifiedKey(c, c->db,c->argv[1]);
server.dirty += deleted;
- if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx);
}
addReplyLongLong(c,deleted);
}
diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl
index 63cc697c2..a89a65299 100644
--- a/tests/unit/type/stream.tcl
+++ b/tests/unit/type/stream.tcl
@@ -94,6 +94,7 @@ start_server {
r XADD mystream MAXLEN 5 * yitem $j
}
}
+ assert {[r xlen mystream] == 5}
set res [r xrange mystream - +]
set expected 995
foreach r $res {
@@ -138,6 +139,39 @@ start_server {
assert_equal [lindex $items 1 1] {item 2 value b}
}
+ test {XADD with MINID option} {
+ r DEL mystream
+ for {set j 1} {$j < 1001} {incr j} {
+ set minid 1000
+ if {$j >= 5} {
+ set minid [expr {$j-5}]
+ }
+ if {rand() < 0.9} {
+ r XADD mystream MINID $minid $j xitem $j
+ } else {
+ r XADD mystream MINID $minid $j yitem $j
+ }
+ }
+ assert {[r xlen mystream] == 6}
+ set res [r xrange mystream - +]
+ set expected 995
+ foreach r $res {
+ assert {[lindex $r 1 1] == $expected}
+ incr expected
+ }
+ }
+
+ test {XTRIM with MINID option} {
+ r DEL mystream
+ r XADD mystream 1-0 f v
+ r XADD mystream 2-0 f v
+ r XADD mystream 3-0 f v
+ r XADD mystream 4-0 f v
+ r XADD mystream 5-0 f v
+ r XTRIM mystream MINID = 3-0
+ assert_equal [r XRANGE mystream - +] {{3-0 {f v}} {4-0 {f v}} {5-0 {f v}}}
+ }
+
test {XADD mass insertion and XLEN} {
r DEL mystream
r multi
@@ -448,7 +482,49 @@ start_server {
assert {[r XLEN mystream] == 400}
}
-
+ test {XADD with LIMIT consecutive calls} {
+ r del mystream
+ r config set stream-node-max-entries 10
+ for {set j 0} {$j < 100} {incr j} {
+ r XADD mystream * xitem v
+ }
+ r XADD mystream MAXLEN ~ 55 LIMIT 30 * xitem v
+ assert {[r xlen mystream] == 71}
+ r XADD mystream MAXLEN ~ 55 LIMIT 30 * xitem v
+ assert {[r xlen mystream] == 62}
+ r config set stream-node-max-entries 100
+ }
+
+ test {XTRIM with ~ is limited} {
+ r del mystream
+ r config set stream-node-max-entries 1
+ for {set j 0} {$j < 102} {incr j} {
+ r XADD mystream * xitem v
+ }
+ r XTRIM mystream MAXLEN ~ 1
+ assert {[r xlen mystream] == 2}
+ r config set stream-node-max-entries 100
+ }
+
+ test {XTRIM without ~ is not limited} {
+ r del mystream
+ r config set stream-node-max-entries 1
+ for {set j 0} {$j < 102} {incr j} {
+ r XADD mystream * xitem v
+ }
+ r XTRIM mystream MAXLEN 1
+ assert {[r xlen mystream] == 1}
+ r config set stream-node-max-entries 100
+ }
+
+ test {XTRIM without ~ and with LIMIT} {
+ r del mystream
+ r config set stream-node-max-entries 1
+ for {set j 0} {$j < 102} {incr j} {
+ r XADD mystream * xitem v
+ }
+ assert_error ERR* {r XTRIM mystream MAXLEN 1 LIMIT 30}
+ }
}
start_server {tags {"stream"} overrides {appendonly yes}} {
@@ -467,6 +543,22 @@ start_server {tags {"stream"} overrides {appendonly yes}} {
}
start_server {tags {"stream"} overrides {appendonly yes}} {
+ test {XADD with MINID > lastid can propagate correctly} {
+ for {set j 0} {$j < 100} {incr j} {
+ set id [expr {$j+1}]
+ r XADD mystream $id xitem v
+ }
+ r XADD mystream MINID 1 * xitem v
+ incr j
+ assert {[r xlen mystream] == $j}
+ r debug loadaof
+ r XADD mystream * xitem v
+ incr j
+ assert {[r xlen mystream] == $j}
+ }
+}
+
+start_server {tags {"stream"} overrides {appendonly yes}} {
test {XADD with ~ MAXLEN can propagate correctly} {
for {set j 0} {$j < 100} {incr j} {
r XADD mystream * xitem v
@@ -483,6 +575,52 @@ start_server {tags {"stream"} overrides {appendonly yes}} {
}
start_server {tags {"stream"} overrides {appendonly yes stream-node-max-entries 10}} {
+ test {XADD with ~ MAXLEN and LIMIT can propagate correctly} {
+ for {set j 0} {$j < 100} {incr j} {
+ r XADD mystream * xitem v
+ }
+ r XADD mystream MAXLEN ~ 55 LIMIT 30 * xitem v
+ assert {[r xlen mystream] == 71}
+ r config set stream-node-max-entries 1
+ r debug loadaof
+ r XADD mystream * xitem v
+ assert {[r xlen mystream] == 72}
+ }
+}
+
+start_server {tags {"stream"} overrides {appendonly yes}} {
+ test {XADD with ~ MINID can propagate correctly} {
+ for {set j 0} {$j < 100} {incr j} {
+ set id [expr {$j+1}]
+ r XADD mystream $id xitem v
+ }
+ r XADD mystream MINID ~ $j * xitem v
+ incr j
+ assert {[r xlen mystream] == $j}
+ r config set stream-node-max-entries 1
+ r debug loadaof
+ r XADD mystream * xitem v
+ incr j
+ assert {[r xlen mystream] == $j}
+ }
+}
+
+start_server {tags {"stream"} overrides {appendonly yes stream-node-max-entries 10}} {
+ test {XADD with ~ MINID and LIMIT can propagate correctly} {
+ for {set j 0} {$j < 100} {incr j} {
+ set id [expr {$j+1}]
+ r XADD mystream $id xitem v
+ }
+ r XADD mystream MINID ~ 55 LIMIT 30 * xitem v
+ assert {[r xlen mystream] == 71}
+ r config set stream-node-max-entries 1
+ r debug loadaof
+ r XADD mystream * xitem v
+ assert {[r xlen mystream] == 72}
+ }
+}
+
+start_server {tags {"stream"} overrides {appendonly yes stream-node-max-entries 10}} {
test {XTRIM with ~ MAXLEN can propagate correctly} {
for {set j 0} {$j < 100} {incr j} {
r XADD mystream * xitem v