summaryrefslogtreecommitdiff
path: root/src/t_stream.c
diff options
context:
space:
mode:
authorItamar Haber <itamar@redis.com>2022-02-23 22:34:58 +0200
committerGitHub <noreply@github.com>2022-02-23 22:34:58 +0200
commitc81c7f51c38de6dff5ffc55b5184061b84c7ea5f (patch)
treec0a397c1a69990cfbef8b9766472c489f1f14f7e /src/t_stream.c
parentb857928ba7b1d40ef96de7b94bf115e2e59e3075 (diff)
downloadredis-c81c7f51c38de6dff5ffc55b5184061b84c7ea5f.tar.gz
Add stream consumer group lag tracking and reporting (#9127)
Adds the ability to track the lag of a consumer group (CG), that is, the number of entries yet-to-be-delivered from the stream. The proposed constant-time solution is in the spirit of "best-effort." Partially addresses #8737. ## Description of approach We add a new "entries_added" property to the stream. This starts at 0 for a new stream and is incremented by 1 with every `XADD`. It is essentially an all-time counter of the entries added to the stream. Given the stream's length and this counter value, we can trivially find the logical "entries_added" counter of the first ID if and only if the stream is contiguous. A fragmented stream contains one or more tombstones generated by `XDEL`s. The new "xdel_max_id" stream property tracks the latest tombstone. The CG also tracks its last delivered ID's as an "entries_read" counter and increments it independently when delivering new messages, unless the this read counter is invalid (-1 means invalid offset). When the CG's counter is available, the reported lag is the difference between added and read counters. Lastly, this also adds a "first_id" field to the stream structure in order to make looking it up cheaper in most cases. ## Limitations There are two cases in which the mechanism isn't able to track the lag. In these cases, `XINFO` replies with `null` in the "lag" field. The first case is when a CG is created with an arbitrary last delivered ID, that isn't "0-0", nor the first or the last entries of the stream. In this case, it is impossible to obtain a valid read counter (short of an O(N) operation). The second case is when there are one or more tombstones fragmenting the stream's entries range. In both cases, given enough time and assuming that the consumers are active (reading and lacking) and advancing, the CG should be able to catch up with the tip of the stream and report zero lag. Once that's achieved, lag tracking would resume as normal (until the next tombstone is set). ## API changes * `XGROUP CREATE` added with the optional named argument `[ENTRIESREAD entries-read]` for explicitly specifying the new CG's counter. * `XGROUP SETID` added with an optional positional argument `[ENTRIESREAD entries-read]` for specifying the CG's counter. * `XINFO` reports the maximal tombstone ID, the recorded first entry ID, and total number of entries added to the stream. * `XINFO` reports the current lag and logical read counter of CGs. * `XSETID` is an internal command that's used in replication/aof. It has been added with the optional positional arguments `[ENTRIESADDED entries-added] [MAXDELETEDID max-deleted-entry-id]` for propagating the CG's offset and maximal tombstone ID of the stream. ## The generic unsolved problem The current stream implementation doesn't provide an efficient way to obtain the approximate/exact size of a range of entries. While it could've been nice to have that ability (#5813) in general, let alone specifically in the context of CGs, the risk and complexities involved in such implementation are in all likelihood prohibitive. ## A refactoring note The `streamGetEdgeID` has been refactored to accommodate both the existing seek of any entry as well as seeking non-deleted entries (the addition of the `skip_tombstones` argument). Furthermore, this refactoring also migrated the seek logic to use the `streamIterator` (rather than `raxIterator`) that was, in turn, extended with the `skip_tombstones` Boolean struct field to control the emission of these. Co-authored-by: Guy Benoish <guy.benoish@redislabs.com> Co-authored-by: Oran Agra <oran@redislabs.com>
Diffstat (limited to 'src/t_stream.c')
-rw-r--r--src/t_stream.c417
1 files changed, 342 insertions, 75 deletions
diff --git a/src/t_stream.c b/src/t_stream.c
index 9ff6a17ac..408c0199a 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -68,8 +68,13 @@ stream *streamNew(void) {
stream *s = zmalloc(sizeof(*s));
s->rax = raxNew();
s->length = 0;
+ s->first_id.ms = 0;
+ s->first_id.seq = 0;
s->last_id.ms = 0;
s->last_id.seq = 0;
+ s->max_deleted_entry_id.seq = 0;
+ s->max_deleted_entry_id.ms = 0;
+ s->entries_added = 0;
s->cgroups = NULL; /* Created on demand to save memory when not used. */
return s;
}
@@ -184,7 +189,10 @@ robj *streamDup(robj *o) {
new_lp, NULL);
}
new_s->length = s->length;
+ new_s->first_id = s->first_id;
new_s->last_id = s->last_id;
+ new_s->max_deleted_entry_id = s->max_deleted_entry_id;
+ new_s->entries_added = s->entries_added;
raxStop(&ri);
if (s->cgroups == NULL) return sobj;
@@ -196,7 +204,8 @@ robj *streamDup(robj *o) {
while (raxNext(&ri_cgroups)) {
streamCG *cg = ri_cgroups.data;
streamCG *new_cg = streamCreateCG(new_s, (char *)ri_cgroups.key,
- ri_cgroups.key_len, &cg->last_id);
+ ri_cgroups.key_len, &cg->last_id,
+ cg->entries_read);
serverAssert(new_cg != NULL);
@@ -378,37 +387,21 @@ int streamCompareID(streamID *a, streamID *b) {
return 0;
}
-void streamGetEdgeID(stream *s, int first, streamID *edge_id)
+/* Retrieves the ID of the stream edge entry. An edge is either the first or
+ * the last ID in the stream, and may be a tombstone. To filter out tombstones,
+ * set the'skip_tombstones' argument to 1. */
+void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id)
{
- raxIterator ri;
- raxStart(&ri, s->rax);
- int empty;
- if (first) {
- raxSeek(&ri, "^", NULL, 0);
- empty = !raxNext(&ri);
- } else {
- raxSeek(&ri, "$", NULL, 0);
- empty = !raxPrev(&ri);
- }
-
- if (empty) {
- /* Stream is empty, mark edge ID as lowest/highest possible. */
- edge_id->ms = first ? UINT64_MAX : 0;
- edge_id->seq = first ? UINT64_MAX : 0;
- raxStop(&ri);
- return;
+ streamIterator si;
+ int64_t numfields;
+ streamIteratorStart(&si,s,NULL,NULL,!first);
+ si.skip_tombstones = skip_tombstones;
+ int found = streamIteratorGetID(&si,edge_id,&numfields);
+ if (!found) {
+ streamID min_id = {0, 0}, max_id = {UINT64_MAX, UINT64_MAX};
+ *edge_id = first ? max_id : min_id;
}
- unsigned char *lp = ri.data;
-
- /* Read the master ID from the radix tree key. */
- streamID master_id;
- streamDecodeID(ri.key, &master_id);
-
- /* Construct edge ID. */
- lpGetEdgeStreamID(lp, first, &master_id, edge_id);
-
- raxStop(&ri);
}
/* Adds a new item into the stream 's' having the specified number of
@@ -664,7 +657,9 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
if (ri.data != lp)
raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
s->length++;
+ s->entries_added++;
s->last_id = id;
+ if (s->length == 1) s->first_id = id;
if (added_id) *added_id = id;
return C_OK;
}
@@ -842,7 +837,7 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
}
deleted += deleted_from_lp;
- /* Now we the entries/deleted counters. */
+ /* Now we update the entries/deleted counters. */
p = lpFirst(lp);
lp = lpReplaceInteger(lp,&p,entries-deleted_from_lp);
p = lpNext(lp,p); /* Skip deleted field. */
@@ -864,8 +859,16 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
break; /* If we are here, there was enough to delete in the current
node, so no need to go to the next node. */
}
-
raxStop(&ri);
+
+ /* Update the stream's first ID after the trimming. */
+ if (s->length == 0) {
+ s->first_id.ms = 0;
+ s->first_id.seq = 0;
+ } else if (deleted) {
+ streamGetEdgeID(s,1,1,&s->first_id);
+ }
+
return deleted;
}
@@ -1089,9 +1092,10 @@ void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamI
}
}
si->stream = s;
- si->lp = NULL; /* There is no current listpack right now. */
+ si->lp = NULL; /* There is no current listpack right now. */
si->lp_ele = NULL; /* Current listpack cursor. */
- si->rev = rev; /* Direction, if non-zero reversed, from end to start. */
+ si->rev = rev; /* Direction, if non-zero reversed, from end to start. */
+ si->skip_tombstones = 1; /* By default tombstones aren't emitted. */
}
/* Return 1 and store the current item ID at 'id' if there are still
@@ -1189,10 +1193,10 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
serverAssert(*numfields>=0);
/* If current >= start, and the entry is not marked as
- * deleted, emit it. */
+ * deleted or tombstones are included, emit it. */
if (!si->rev) {
if (memcmp(buf,si->start_key,sizeof(streamID)) >= 0 &&
- !(flags & STREAM_ITEM_FLAG_DELETED))
+ (!si->skip_tombstones || !(flags & STREAM_ITEM_FLAG_DELETED)))
{
if (memcmp(buf,si->end_key,sizeof(streamID)) > 0)
return 0; /* We are already out of range. */
@@ -1203,7 +1207,7 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
}
} else {
if (memcmp(buf,si->end_key,sizeof(streamID)) <= 0 &&
- !(flags & STREAM_ITEM_FLAG_DELETED))
+ (!si->skip_tombstones || !(flags & STREAM_ITEM_FLAG_DELETED)))
{
if (memcmp(buf,si->start_key,sizeof(streamID)) < 0)
return 0; /* We are already out of range. */
@@ -1270,7 +1274,7 @@ void streamIteratorRemoveEntry(streamIterator *si, streamID *current) {
int64_t aux;
/* We do not really delete the entry here. Instead we mark it as
- * deleted flagging it, and also incrementing the count of the
+ * deleted by flagging it, and also incrementing the count of the
* deleted entries in the listpack header.
*
* We start flagging: */
@@ -1314,7 +1318,7 @@ void streamIteratorRemoveEntry(streamIterator *si, streamID *current) {
streamIteratorStop(si);
streamIteratorStart(si,si->stream,&start,&end,si->rev);
- /* TODO: perform a garbage collection here if the ration between
+ /* TODO: perform a garbage collection here if the ratio between
* deleted and valid goes over a certain limit. */
}
@@ -1386,6 +1390,148 @@ robj *createObjectFromStreamID(streamID *id) {
id->ms,id->seq));
}
+/* Returns non-zero if the ID is 0-0. */
+int streamIDEqZero(streamID *id) {
+ return !(id->ms || id->seq);
+}
+
+/* A helper that returns non-zero if the range from 'start' to `end`
+ * contains a tombstone.
+ *
+ * NOTE: this assumes that the caller had verified that 'start' is less than
+ * 's->last_id'. */
+int streamRangeHasTombstones(stream *s, streamID *start, streamID *end) {
+ streamID start_id, end_id;
+
+ if (!s->length || streamIDEqZero(&s->max_deleted_entry_id)) {
+ /* The stream is empty or has no tombstones. */
+ return 0;
+ }
+
+ if (streamCompareID(&s->first_id,&s->max_deleted_entry_id) > 0) {
+ /* The latest tombstone is before the first entry. */
+ return 0;
+ }
+
+ if (start) {
+ start_id = *start;
+ } else {
+ start_id.ms = 0;
+ start_id.seq = 0;
+ }
+
+ if (end) {
+ end_id = *end;
+ } else {
+ end_id.ms = UINT64_MAX;
+ end_id.seq = UINT64_MAX;
+ }
+
+ if (streamCompareID(&start_id,&s->max_deleted_entry_id) <= 0 &&
+ streamCompareID(&s->max_deleted_entry_id,&end_id) <= 0)
+ {
+ /* start_id <= max_deleted_entry_id <= end_id: The range does include a tombstone. */
+ return 1;
+ }
+
+ /* The range doesn't includes a tombstone. */
+ return 0;
+}
+
+/* Replies with a consumer group's current lag, that is the number of messages
+ * in the stream that are yet to be delivered. In case that the lag isn't
+ * available due to fragmentation, the reply to the client is a null. */
+void streamReplyWithCGLag(client *c, stream *s, streamCG *cg) {
+ int valid = 0;
+ long long lag = 0;
+
+ if (!s->entries_added) {
+ /* The lag of a newly-initialized stream is 0. */
+ lag = 0;
+ valid = 1;
+ } else if (cg->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s,&cg->last_id,NULL)) {
+ /* No fragmentation ahead means that the group's logical reads counter
+ * is valid for performing the lag calculation. */
+ lag = (long long)s->entries_added - cg->entries_read;
+ valid = 1;
+ } else {
+ /* Attempt to retrieve the group's last ID logical read counter. */
+ long long entries_read = streamEstimateDistanceFromFirstEverEntry(s,&cg->last_id);
+ if (entries_read != SCG_INVALID_ENTRIES_READ) {
+ /* A valid counter was obtained. */
+ lag = (long long)s->entries_added - entries_read;
+ valid = 1;
+ }
+ }
+
+ if (valid) {
+ addReplyLongLong(c,lag);
+ } else {
+ addReplyNull(c);
+ }
+}
+
+/* This function returns a value that is the ID's logical read counter, or its
+ * distance (the number of entries) from the first entry ever to have been added
+ * to the stream.
+ *
+ * A counter is returned only in one of the following cases:
+ * 1. The ID is the same as the stream's last ID. In this case, the returned
+ * is the same as the stream's entries_added counter.
+ * 2. The ID equals that of the currently first entry in the stream, and the
+ * stream has no tombstones. The returned value, in this case, is the result
+ * of subtracting the stream's length from its added_entries, incremented by
+ * one.
+ * 3. The ID less than the stream's first current entry's ID, and there are no
+ * tombstones. Here the estimated counter is the result of subtracting the
+ * stream's length from its added_entries.
+ * 4. The stream's added_entries is zero, meaning that no entries were ever
+ * added.
+ *
+ * The special return value of ULLONG_MAX signals that the counter's value isn't
+ * obtainable. It is returned in these cases:
+ * 1. The provided ID, if it even exists, is somewhere between the stream's
+ * current first and last entries' IDs, or in the future.
+ * 2. The stream contains one or more tombstones. */
+long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id) {
+ /* The counter of any ID in an empty, never-before-used stream is 0. */
+ if (!s->entries_added) {
+ return 0;
+ }
+
+ /* In the empty stream, if the ID is smaller or equal to the last ID,
+ * it can set to the current added_entries value. */
+ if (!s->length && streamCompareID(id,&s->last_id) < 1) {
+ return s->entries_added;
+ }
+
+ int cmp_last = streamCompareID(id,&s->last_id);
+ if (cmp_last == 0) {
+ /* Return the exact counter of the last entry in the stream. */
+ return s->entries_added;
+ } else if (cmp_last > 0) {
+ /* The counter of a future ID is unknown. */
+ return SCG_INVALID_ENTRIES_READ;
+ }
+
+ int cmp_id_first = streamCompareID(id,&s->first_id);
+ int cmp_xdel_first = streamCompareID(&s->max_deleted_entry_id,&s->first_id);
+ if (streamIDEqZero(&s->max_deleted_entry_id) || cmp_xdel_first < 0) {
+ /* There's definitely no fragmentation ahead. */
+ if (cmp_id_first < 0) {
+ /* Return the estimated counter. */
+ return s->entries_added - s->length;
+ } else if (cmp_id_first == 0) {
+ /* Return the exact counter of the first entry in the stream. */
+ return s->entries_added - s->length + 1;
+ }
+ }
+
+ /* The ID is either before an XDEL that fragments the stream or an arbitrary
+ * ID. Either case, so we can't make a prediction. */
+ return SCG_INVALID_ENTRIES_READ;
+}
+
/* As a result of an explicit XCLAIM or XREADGROUP command, new entries
* are created in the pending list of the stream and consumers. We need
* to propagate this changes in the form of XCLAIM commands. */
@@ -1425,19 +1571,22 @@ void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupnam
* that was consumed by XREADGROUP with the NOACK option: in that case we can't
* propagate the last ID just using the XCLAIM LASTID option, so we emit
*
- * XGROUP SETID <key> <groupname> <id>
+ * XGROUP SETID <key> <groupname> <id> ENTRIESREAD <entries_read>
*/
void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupname) {
- robj *argv[5];
+ robj *argv[7];
argv[0] = shared.xgroup;
argv[1] = shared.setid;
argv[2] = key;
argv[3] = groupname;
argv[4] = createObjectFromStreamID(&group->last_id);
+ argv[5] = shared.entriesread;
+ argv[6] = createStringObjectFromLongLong(group->entries_read);
- alsoPropagate(c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL);
+ alsoPropagate(c->db->id,argv,7,PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(argv[4]);
+ decrRefCount(argv[6]);
}
/* We need this when we want to propagate creation of consumer that was created
@@ -1476,6 +1625,10 @@ void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds
* function will not return it to the client.
* 3. An entry in the pending list will be created for every entry delivered
* for the first time to this consumer.
+ * 4. The group's read counter is incremented if it is already valid and there
+ * are no future tombstones, or is invalidated (set to 0) otherwise. If the
+ * counter is invalid to begin with, we try to obtain it for the last
+ * delivered ID.
*
* The behavior may be modified passing non-zero flags:
*
@@ -1532,6 +1685,15 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
while(streamIteratorGetID(&si,&id,&numfields)) {
/* Update the group last_id if needed. */
if (group && streamCompareID(&id,&group->last_id) > 0) {
+ if (group->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s,&id,NULL)) {
+ /* A valid counter and no future tombstones mean we can
+ * increment the read counter to keep tracking the group's
+ * progress. */
+ group->entries_read++;
+ } else if (s->entries_added) {
+ /* The group's counter may be invalid, so we try to obtain it. */
+ group->entries_read = streamEstimateDistanceFromFirstEverEntry(s,&id);
+ }
group->last_id = id;
/* Group last ID should be propagated only if NOACK was
* specified, otherwise the last id will be included
@@ -1805,7 +1967,7 @@ void streamRewriteTrimArgument(client *c, stream *s, int trim_strategy, int idx)
arg = createStringObjectFromLongLong(s->length);
} else {
streamID first_id;
- streamGetEdgeID(s, 1, &first_id);
+ streamGetEdgeID(s,1,0,&first_id);
arg = createObjectFromStreamID(&first_id);
}
@@ -2298,10 +2460,10 @@ void streamFreeConsumer(streamConsumer *sc) {
}
/* Create a new consumer group in the context of the stream 's', having the
- * specified name and last server ID. If a consumer group with the same name
- * already existed NULL is returned, otherwise the pointer to the consumer
- * group is returned. */
-streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id) {
+ * specified name, last server ID and reads counter. If a consumer group with
+ * the same name already exists NULL is returned, otherwise the pointer to the
+ * consumer group is returned. */
+streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read) {
if (s->cgroups == NULL) s->cgroups = raxNew();
if (raxFind(s->cgroups,(unsigned char*)name,namelen) != raxNotFound)
return NULL;
@@ -2310,6 +2472,7 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id) {
cg->pel = raxNew();
cg->consumers = raxNew();
cg->last_id = *id;
+ cg->entries_read = entries_read;
raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL);
return cg;
}
@@ -2389,8 +2552,8 @@ void streamDelConsumer(streamCG *cg, streamConsumer *consumer) {
* Consumer groups commands
* ----------------------------------------------------------------------- */
-/* XGROUP CREATE <key> <groupname> <id or $> [MKSTREAM]
- * XGROUP SETID <key> <groupname> <id or $>
+/* XGROUP CREATE <key> <groupname> <id or $> [MKSTREAM] [ENTRIESADDED count]
+ * XGROUP SETID <key> <groupname> <id or $> [ENTRIESADDED count]
* XGROUP DESTROY <key> <groupname>
* XGROUP CREATECONSUMER <key> <groupname> <consumer>
* XGROUP DELCONSUMER <key> <groupname> <consumername> */
@@ -2400,21 +2563,33 @@ void xgroupCommand(client *c) {
streamCG *cg = NULL;
char *opt = c->argv[1]->ptr; /* Subcommand name. */
int mkstream = 0;
+ long long entries_read = SCG_INVALID_ENTRIES_READ;
robj *o;
- /* CREATE has an MKSTREAM option that creates the stream if it
- * does not exist. */
- if (c->argc == 6 && !strcasecmp(opt,"CREATE")) {
- if (strcasecmp(c->argv[5]->ptr,"MKSTREAM")) {
- addReplySubcommandSyntaxError(c);
- return;
- }
- mkstream = 1;
- grpname = c->argv[3]->ptr;
- }
-
/* Everything but the "HELP" option requires a key and group name. */
if (c->argc >= 4) {
+ /* Parse optional arguments for CREATE and SETID */
+ int i = 5;
+ int create_subcmd = !strcasecmp(opt,"CREATE");
+ int setid_subcmd = !strcasecmp(opt,"SETID");
+ while (i < c->argc) {
+ if (create_subcmd && !strcasecmp(c->argv[i]->ptr,"MKSTREAM")) {
+ mkstream = 1;
+ i++;
+ } else if ((create_subcmd || setid_subcmd) && !strcasecmp(c->argv[i]->ptr,"ENTRIESREAD") && i + 1 < c->argc) {
+ if (getLongLongFromObjectOrReply(c,c->argv[i+1],&entries_read,NULL) != C_OK)
+ return;
+ if (entries_read < 0 && entries_read != SCG_INVALID_ENTRIES_READ) {
+ addReplyError(c,"value for ENTRIESREAD must be positive or -1");
+ return;
+ }
+ i += 2;
+ } else {
+ addReplySubcommandSyntaxError(c);
+ return;
+ }
+ }
+
o = lookupKeyWrite(c->db,c->argv[2]);
if (o) {
if (checkType(c,o,OBJ_STREAM)) return;
@@ -2454,18 +2629,20 @@ void xgroupCommand(client *c) {
" Create a new consumer group. Options are:",
" * MKSTREAM",
" Create the empty stream if it does not exist.",
+" * ENTRIESREAD entries_read",
+" Set the group's entries_read counter (internal use)."
"CREATECONSUMER <key> <groupname> <consumer>",
" Create a new consumer in the specified group.",
"DELCONSUMER <key> <groupname> <consumer>",
" Remove the specified consumer.",
"DESTROY <key> <groupname>",
" Remove the specified group.",
-"SETID <key> <groupname> <id|$>",
-" Set the current group ID.",
+"SETID <key> <groupname> <id|$> [ENTRIESREAD entries_read]",
+" Set the current group ID and entries_read counter.",
NULL
};
addReplyHelp(c, help);
- } else if (!strcasecmp(opt,"CREATE") && (c->argc == 5 || c->argc == 6)) {
+ } else if (!strcasecmp(opt,"CREATE") && (c->argc >= 5 && c->argc <= 8)) {
streamID id;
if (!strcmp(c->argv[4]->ptr,"$")) {
if (s) {
@@ -2487,7 +2664,7 @@ NULL
signalModifiedKey(c,c->db,c->argv[2]);
}
- streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id);
+ streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id,entries_read);
if (cg) {
addReply(c,shared.ok);
server.dirty++;
@@ -2496,7 +2673,7 @@ NULL
} else {
addReplyError(c,"-BUSYGROUP Consumer Group name already exists");
}
- } else if (!strcasecmp(opt,"SETID") && c->argc == 5) {
+ } else if (!strcasecmp(opt,"SETID") && (c->argc == 5 || c->argc == 7)) {
streamID id;
if (!strcmp(c->argv[4]->ptr,"$")) {
id = s->last_id;
@@ -2504,6 +2681,7 @@ NULL
return;
}
cg->last_id = id;
+ cg->entries_read = entries_read;
addReply(c,shared.ok);
server.dirty++;
notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-setid",c->argv[2],c->db->id);
@@ -2542,16 +2720,46 @@ NULL
}
}
-/* XSETID <stream> <id>
+/* XSETID <stream> <id> [ENTRIESADDED entries_added] [MAXDELETEDID max_deleted_entry_id]
*
- * Set the internal "last ID" of a stream. */
+ * Set the internal "last ID", "added entries" and "maximal deleted entry ID"
+ * of a stream. */
void xsetidCommand(client *c) {
+ streamID id, max_xdel_id = {0, 0};
+ long long entries_added = -1;
+
+ if (streamParseStrictIDOrReply(c,c->argv[2],&id,0,NULL) != C_OK)
+ return;
+
+ int i = 3;
+ while (i < c->argc) {
+ int moreargs = (c->argc-1) - i; /* Number of additional arguments. */
+ char *opt = c->argv[i]->ptr;
+ if (!strcasecmp(opt,"ENTRIESADDED") && moreargs) {
+ if (getLongLongFromObjectOrReply(c,c->argv[i+1],&entries_added,NULL) != C_OK) {
+ return;
+ } else if (entries_added < 0) {
+ addReplyError(c,"entries_added must be positive");
+ return;
+ }
+ i += 2;
+ } else if (!strcasecmp(opt,"MAXDELETEDID") && moreargs) {
+ if (streamParseStrictIDOrReply(c,c->argv[i+1],&max_xdel_id,0,NULL) != C_OK) {
+ return;
+ } else if (streamCompareID(&id,&max_xdel_id) < 0) {
+ addReplyError(c,"The ID specified in XSETID is smaller than the provided max_deleted_entry_id");
+ return;
+ }
+ i += 2;
+ } else {
+ addReplyErrorObject(c,shared.syntaxerr);
+ return;
+ }
+ }
+
robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
if (o == NULL || checkType(c,o,OBJ_STREAM)) return;
-
stream *s = o->ptr;
- streamID id;
- if (streamParseStrictIDOrReply(c,c->argv[2],&id,0,NULL) != C_OK) return;
/* If the stream has at least one item, we want to check that the user
* is setting a last ID that is equal or greater than the current top
@@ -2561,12 +2769,22 @@ void xsetidCommand(client *c) {
streamLastValidID(s,&maxid);
if (streamCompareID(&id,&maxid) < 0) {
- addReplyError(c,"The ID specified in XSETID is smaller than the "
- "target stream top item");
+ addReplyError(c,"The ID specified in XSETID is smaller than the target stream top item");
+ return;
+ }
+
+ /* If an entries_added was provided, it can't be lower than the length. */
+ if (entries_added != -1 && s->length > (uint64_t)entries_added) {
+ addReplyError(c,"The entries_added specified in XSETID is smaller than the target stream length");
return;
}
}
+
s->last_id = id;
+ if (entries_added != -1)
+ s->entries_added = entries_added;
+ if (!streamIDEqZero(&max_xdel_id))
+ s->max_deleted_entry_id = max_xdel_id;
addReply(c,shared.ok);
server.dirty++;
notifyKeyspaceEvent(NOTIFY_STREAM,"xsetid",c->argv[1],c->db->id);
@@ -3289,8 +3507,31 @@ void xdelCommand(client *c) {
/* Actually apply the command. */
int deleted = 0;
+ int first_entry = 0;
for (int j = 2; j < c->argc; j++) {
- deleted += streamDeleteItem(s,&ids[j-2]);
+ streamID *id = &ids[j-2];
+ if (streamDeleteItem(s,id)) {
+ /* We want to know if the first entry in the stream was deleted
+ * so we can later set the new one. */
+ if (streamCompareID(id,&s->first_id) == 0) {
+ first_entry = 1;
+ }
+ /* Update the stream's maximal tombstone if needed. */
+ if (streamCompareID(id,&s->max_deleted_entry_id) > 0) {
+ s->max_deleted_entry_id = *id;
+ }
+ deleted++;
+ };
+ }
+
+ /* Update the stream's first ID. */
+ if (deleted) {
+ if (s->length == 0) {
+ s->first_id.ms = 0;
+ s->first_id.seq = 0;
+ } else if (first_entry) {
+ streamGetEdgeID(s,1,1,&s->first_id);
+ }
}
/* Propagate the write if needed. */
@@ -3398,7 +3639,7 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) {
}
}
- addReplyMapLen(c,full ? 6 : 7);
+ addReplyMapLen(c,full ? 9 : 10);
addReplyBulkCString(c,"length");
addReplyLongLong(c,s->length);
addReplyBulkCString(c,"radix-tree-keys");
@@ -3407,6 +3648,12 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) {
addReplyLongLong(c,s->rax->numnodes);
addReplyBulkCString(c,"last-generated-id");
addReplyStreamID(c,&s->last_id);
+ addReplyBulkCString(c,"max-deleted-entry-id");
+ addReplyStreamID(c,&s->max_deleted_entry_id);
+ addReplyBulkCString(c,"entries-added");
+ addReplyLongLong(c,s->entries_added);
+ addReplyBulkCString(c,"recorded-first-entry-id");
+ addReplyStreamID(c,&s->first_id);
if (!full) {
/* XINFO STREAM <key> */
@@ -3445,7 +3692,7 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) {
raxSeek(&ri_cgroups,"^",NULL,0);
while(raxNext(&ri_cgroups)) {
streamCG *cg = ri_cgroups.data;
- addReplyMapLen(c,5);
+ addReplyMapLen(c,7);
/* Name */
addReplyBulkCString(c,"name");
@@ -3455,6 +3702,18 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) {
addReplyBulkCString(c,"last-delivered-id");
addReplyStreamID(c,&cg->last_id);
+ /* Read counter of the last delivered ID */
+ addReplyBulkCString(c,"entries-read");
+ if (cg->entries_read != SCG_INVALID_ENTRIES_READ) {
+ addReplyLongLong(c,cg->entries_read);
+ } else {
+ addReplyNull(c);
+ }
+
+ /* Group lag */
+ addReplyBulkCString(c,"lag");
+ streamReplyWithCGLag(c,s,cg);
+
/* Group PEL count */
addReplyBulkCString(c,"pel-count");
addReplyLongLong(c,raxSize(cg->pel));
@@ -3632,7 +3891,7 @@ NULL
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
streamCG *cg = ri.data;
- addReplyMapLen(c,4);
+ addReplyMapLen(c,6);
addReplyBulkCString(c,"name");
addReplyBulkCBuffer(c,ri.key,ri.key_len);
addReplyBulkCString(c,"consumers");
@@ -3641,6 +3900,14 @@ NULL
addReplyLongLong(c,raxSize(cg->pel));
addReplyBulkCString(c,"last-delivered-id");
addReplyStreamID(c,&cg->last_id);
+ addReplyBulkCString(c,"entries-read");
+ if (cg->entries_read != SCG_INVALID_ENTRIES_READ) {
+ addReplyLongLong(c,cg->entries_read);
+ } else {
+ addReplyNull(c);
+ }
+ addReplyBulkCString(c,"lag");
+ streamReplyWithCGLag(c,s,cg);
}
raxStop(&ri);
} else if (!strcasecmp(opt,"STREAM")) {