summaryrefslogtreecommitdiff
path: root/src/t_stream.c
diff options
context:
space:
mode:
authorGuy Benoish <guy.benoish@redislabs.com>2020-04-22 16:05:57 +0300
committerGuy Benoish <guy.benoish@redislabs.com>2020-04-28 13:03:43 +0300
commit1e2aee39192c8839170743d324c799860dc38c8c (patch)
tree2bf7ee4df9ceca919386c219476160307d149bab /src/t_stream.c
parent3e738c8a6c0ad87a0086a46fac3c0505f5190cbc (diff)
downloadredis-1e2aee39192c8839170743d324c799860dc38c8c.tar.gz
Extend XINFO STREAM output
Introducing XINFO STREAM <key> FULL
Diffstat (limited to 'src/t_stream.c')
-rw-r--r--src/t_stream.c226
1 files changed, 192 insertions, 34 deletions
diff --git a/src/t_stream.c b/src/t_stream.c
index 155167af9..508c22225 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -2475,16 +2475,200 @@ void xtrimCommand(client *c) {
addReplyLongLong(c,deleted);
}
+/* Helper function for xinfoCommand.
+ * Handles the variants of XINFO STREAM */
+void xinfoReplyWithStreamInfo(client *c, stream *s) {
+ int full = 1;
+ long long count = 0;
+ robj **optv = c->argv + 3; /* Options start after XINFO STREAM <key> */
+ int optc = c->argc - 3;
+
+ /* Parse options. */
+ if (optc == 0) {
+ full = 0;
+ } else {
+ /* Valid options are [FULL] or [FULL COUNT <count>] */
+ if (optc != 1 && optc != 3) {
+ addReplySubcommandSyntaxError(c);
+ return;
+ }
+
+ /* First option must be "FULL" */
+ if (strcasecmp(optv[0]->ptr,"full")) {
+ addReplySubcommandSyntaxError(c);
+ return;
+ }
+
+ if (optc == 3) {
+ /* First option must be "FULL" */
+ if (strcasecmp(optv[1]->ptr,"count")) {
+ addReplySubcommandSyntaxError(c);
+ return;
+ }
+ if (getLongLongFromObjectOrReply(c,optv[2],&count,NULL) == C_ERR)
+ return;
+ if (count < 0) count = 0;
+ }
+ }
+
+ addReplyMapLen(c,full ? 6 : 7);
+ addReplyBulkCString(c,"length");
+ addReplyLongLong(c,s->length);
+ addReplyBulkCString(c,"radix-tree-keys");
+ addReplyLongLong(c,raxSize(s->rax));
+ addReplyBulkCString(c,"radix-tree-nodes");
+ addReplyLongLong(c,s->rax->numnodes);
+ addReplyBulkCString(c,"last-generated-id");
+ addReplyStreamID(c,&s->last_id);
+
+ if (!full) {
+ /* XINFO STREAM <key> */
+
+ addReplyBulkCString(c,"groups");
+ addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0);
+
+ /* To emit the first/last entry we use streamReplyWithRange(). */
+ int emitted;
+ streamID start, end;
+ start.ms = start.seq = 0;
+ end.ms = end.seq = UINT64_MAX;
+ addReplyBulkCString(c,"first-entry");
+ emitted = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL,
+ STREAM_RWR_RAWENTRIES,NULL);
+ if (!emitted) addReplyNull(c);
+ addReplyBulkCString(c,"last-entry");
+ emitted = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL,
+ STREAM_RWR_RAWENTRIES,NULL);
+ if (!emitted) addReplyNull(c);
+ } else {
+ /* XINFO STREAM <key> FULL [COUNT <count>] */
+
+ /* Stream entries */
+ addReplyBulkCString(c,"entries");
+ streamReplyWithRange(c,s,NULL,NULL,count,0,NULL,NULL,0,NULL);
+
+ /* Consumer groups */
+ addReplyBulkCString(c,"groups");
+ if (s->cgroups == NULL) {
+ addReplyArrayLen(c,0);
+ } else {
+ addReplyArrayLen(c,raxSize(s->cgroups));
+ raxIterator ri_cgroups;
+ raxStart(&ri_cgroups,s->cgroups);
+ raxSeek(&ri_cgroups,"^",NULL,0);
+ while(raxNext(&ri_cgroups)) {
+ streamCG *cg = ri_cgroups.data;
+ addReplyMapLen(c,5);
+
+ /* Name */
+ addReplyBulkCString(c,"name");
+ addReplyBulkCBuffer(c,ri_cgroups.key,ri_cgroups.key_len);
+
+ /* Last delivered ID */
+ addReplyBulkCString(c,"last-delivered-id");
+ addReplyStreamID(c,&cg->last_id);
+
+ /* Group PEL count */
+ addReplyBulkCString(c,"pel-count");
+ addReplyLongLong(c,raxSize(cg->pel));
+
+ /* Group PEL */
+ addReplyBulkCString(c,"pending");
+ long long arraylen_cg_pel = 0;
+ void *arrayptr_cg_pel = addReplyDeferredLen(c);
+ raxIterator ri_cg_pel;
+ raxStart(&ri_cg_pel,cg->pel);
+ raxSeek(&ri_cg_pel,"^",NULL,0);
+ while(raxNext(&ri_cg_pel) && (!count || arraylen_cg_pel < count)) {
+ streamNACK *nack = ri_cg_pel.data;
+ addReplyArrayLen(c,4);
+
+ /* Entry ID. */
+ streamID id;
+ streamDecodeID(ri_cg_pel.key,&id);
+ addReplyStreamID(c,&id);
+
+ /* Consumer name. */
+ addReplyBulkCBuffer(c,nack->consumer->name,
+ sdslen(nack->consumer->name));
+
+ /* Last delivery. */
+ addReplyLongLong(c,nack->delivery_time);
+
+ /* Number of deliveries. */
+ addReplyLongLong(c,nack->delivery_count);
+
+ arraylen_cg_pel++;
+ }
+ setDeferredArrayLen(c,arrayptr_cg_pel,arraylen_cg_pel);
+ raxStop(&ri_cg_pel);
+
+ /* Consumers */
+ addReplyBulkCString(c,"consumers");
+ addReplyArrayLen(c,raxSize(cg->consumers));
+ raxIterator ri_consumers;
+ raxStart(&ri_consumers,cg->consumers);
+ raxSeek(&ri_consumers,"^",NULL,0);
+ while(raxNext(&ri_consumers)) {
+ streamConsumer *consumer = ri_consumers.data;
+ addReplyMapLen(c,4);
+
+ /* Consumer name */
+ addReplyBulkCString(c,"name");
+ addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name));
+
+ /* Seen-time */
+ addReplyBulkCString(c,"seen-time");
+ addReplyLongLong(c,consumer->seen_time);
+
+ /* Consumer PEL count */
+ addReplyBulkCString(c,"pel-count");
+ addReplyLongLong(c,raxSize(consumer->pel));
+
+ /* Consumer PEL */
+ addReplyBulkCString(c,"pending");
+ long long arraylen_cpel = 0;
+ void *arrayptr_cpel = addReplyDeferredLen(c);
+ raxIterator ri_cpel;
+ raxStart(&ri_cpel,consumer->pel);
+ raxSeek(&ri_cpel,"^",NULL,0);
+ while(raxNext(&ri_cpel) && (!count || arraylen_cpel < count)) {
+ streamNACK *nack = ri_cpel.data;
+ addReplyArrayLen(c,3);
+
+ /* Entry ID. */
+ streamID id;
+ streamDecodeID(ri_cpel.key,&id);
+ addReplyStreamID(c,&id);
+
+ /* Last delivery. */
+ addReplyLongLong(c,nack->delivery_time);
+
+ /* Number of deliveries. */
+ addReplyLongLong(c,nack->delivery_count);
+
+ arraylen_cpel++;
+ }
+ setDeferredArrayLen(c,arrayptr_cpel,arraylen_cpel);
+ raxStop(&ri_cpel);
+ }
+ raxStop(&ri_consumers);
+ }
+ raxStop(&ri_cgroups);
+ }
+ }
+}
+
/* XINFO CONSUMERS <key> <group>
* XINFO GROUPS <key>
- * XINFO STREAM <key>
+ * XINFO STREAM <key> [FULL [COUNT <count>]]
* XINFO HELP. */
void xinfoCommand(client *c) {
const char *help[] = {
-"CONSUMERS <key> <groupname> -- Show consumer groups of group <groupname>.",
-"GROUPS <key> -- Show the stream consumer groups.",
-"STREAM <key> -- Show information about the stream.",
-"HELP -- Print this help.",
+"CONSUMERS <key> <groupname> -- Show consumer groups of group <groupname>.",
+"GROUPS <key> -- Show the stream consumer groups.",
+"STREAM <key> [FULL [COUNT <count>]] -- Show information about the stream.",
+"HELP -- Print this help.",
NULL
};
stream *s = NULL;
@@ -2564,36 +2748,10 @@ NULL
addReplyStreamID(c,&cg->last_id);
}
raxStop(&ri);
- } else if (!strcasecmp(opt,"STREAM") && c->argc == 3) {
- /* XINFO STREAM <key> (or the alias XINFO <key>). */
- addReplyMapLen(c,7);
- addReplyBulkCString(c,"length");
- addReplyLongLong(c,s->length);
- addReplyBulkCString(c,"radix-tree-keys");
- addReplyLongLong(c,raxSize(s->rax));
- addReplyBulkCString(c,"radix-tree-nodes");
- addReplyLongLong(c,s->rax->numnodes);
- addReplyBulkCString(c,"groups");
- addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0);
- addReplyBulkCString(c,"last-generated-id");
- addReplyStreamID(c,&s->last_id);
-
- /* To emit the first/last entry we us the streamReplyWithRange()
- * API. */
- int count;
- streamID start, end;
- start.ms = start.seq = 0;
- end.ms = end.seq = UINT64_MAX;
- addReplyBulkCString(c,"first-entry");
- count = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL,
- STREAM_RWR_RAWENTRIES,NULL);
- if (!count) addReplyNull(c);
- addReplyBulkCString(c,"last-entry");
- count = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL,
- STREAM_RWR_RAWENTRIES,NULL);
- if (!count) addReplyNull(c);
+ } else if (!strcasecmp(opt,"STREAM")) {
+ /* XINFO STREAM <key> [FULL [COUNT <count>]]. */
+ xinfoReplyWithStreamInfo(c,s);
} else {
addReplySubcommandSyntaxError(c);
}
}
-