diff options
author | Guy Benoish <guy.benoish@redislabs.com> | 2020-04-22 16:05:57 +0300 |
---|---|---|
committer | Guy Benoish <guy.benoish@redislabs.com> | 2020-04-28 13:03:43 +0300 |
commit | 1e2aee39192c8839170743d324c799860dc38c8c (patch) | |
tree | 2bf7ee4df9ceca919386c219476160307d149bab /src/t_stream.c | |
parent | 3e738c8a6c0ad87a0086a46fac3c0505f5190cbc (diff) | |
download | redis-1e2aee39192c8839170743d324c799860dc38c8c.tar.gz |
Extend XINFO STREAM output
Introducing XINFO STREAM <key> FULL
Diffstat (limited to 'src/t_stream.c')
-rw-r--r-- | src/t_stream.c | 226 |
1 files changed, 192 insertions, 34 deletions
diff --git a/src/t_stream.c b/src/t_stream.c index 155167af9..508c22225 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -2475,16 +2475,200 @@ void xtrimCommand(client *c) { addReplyLongLong(c,deleted); } +/* Helper function for xinfoCommand. + * Handles the variants of XINFO STREAM */ +void xinfoReplyWithStreamInfo(client *c, stream *s) { + int full = 1; + long long count = 0; + robj **optv = c->argv + 3; /* Options start after XINFO STREAM <key> */ + int optc = c->argc - 3; + + /* Parse options. */ + if (optc == 0) { + full = 0; + } else { + /* Valid options are [FULL] or [FULL COUNT <count>] */ + if (optc != 1 && optc != 3) { + addReplySubcommandSyntaxError(c); + return; + } + + /* First option must be "FULL" */ + if (strcasecmp(optv[0]->ptr,"full")) { + addReplySubcommandSyntaxError(c); + return; + } + + if (optc == 3) { + /* First option must be "FULL" */ + if (strcasecmp(optv[1]->ptr,"count")) { + addReplySubcommandSyntaxError(c); + return; + } + if (getLongLongFromObjectOrReply(c,optv[2],&count,NULL) == C_ERR) + return; + if (count < 0) count = 0; + } + } + + addReplyMapLen(c,full ? 6 : 7); + addReplyBulkCString(c,"length"); + addReplyLongLong(c,s->length); + addReplyBulkCString(c,"radix-tree-keys"); + addReplyLongLong(c,raxSize(s->rax)); + addReplyBulkCString(c,"radix-tree-nodes"); + addReplyLongLong(c,s->rax->numnodes); + addReplyBulkCString(c,"last-generated-id"); + addReplyStreamID(c,&s->last_id); + + if (!full) { + /* XINFO STREAM <key> */ + + addReplyBulkCString(c,"groups"); + addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0); + + /* To emit the first/last entry we use streamReplyWithRange(). */ + int emitted; + streamID start, end; + start.ms = start.seq = 0; + end.ms = end.seq = UINT64_MAX; + addReplyBulkCString(c,"first-entry"); + emitted = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL, + STREAM_RWR_RAWENTRIES,NULL); + if (!emitted) addReplyNull(c); + addReplyBulkCString(c,"last-entry"); + emitted = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL, + STREAM_RWR_RAWENTRIES,NULL); + if (!emitted) addReplyNull(c); + } else { + /* XINFO STREAM <key> FULL [COUNT <count>] */ + + /* Stream entries */ + addReplyBulkCString(c,"entries"); + streamReplyWithRange(c,s,NULL,NULL,count,0,NULL,NULL,0,NULL); + + /* Consumer groups */ + addReplyBulkCString(c,"groups"); + if (s->cgroups == NULL) { + addReplyArrayLen(c,0); + } else { + addReplyArrayLen(c,raxSize(s->cgroups)); + raxIterator ri_cgroups; + raxStart(&ri_cgroups,s->cgroups); + raxSeek(&ri_cgroups,"^",NULL,0); + while(raxNext(&ri_cgroups)) { + streamCG *cg = ri_cgroups.data; + addReplyMapLen(c,5); + + /* Name */ + addReplyBulkCString(c,"name"); + addReplyBulkCBuffer(c,ri_cgroups.key,ri_cgroups.key_len); + + /* Last delivered ID */ + addReplyBulkCString(c,"last-delivered-id"); + addReplyStreamID(c,&cg->last_id); + + /* Group PEL count */ + addReplyBulkCString(c,"pel-count"); + addReplyLongLong(c,raxSize(cg->pel)); + + /* Group PEL */ + addReplyBulkCString(c,"pending"); + long long arraylen_cg_pel = 0; + void *arrayptr_cg_pel = addReplyDeferredLen(c); + raxIterator ri_cg_pel; + raxStart(&ri_cg_pel,cg->pel); + raxSeek(&ri_cg_pel,"^",NULL,0); + while(raxNext(&ri_cg_pel) && (!count || arraylen_cg_pel < count)) { + streamNACK *nack = ri_cg_pel.data; + addReplyArrayLen(c,4); + + /* Entry ID. */ + streamID id; + streamDecodeID(ri_cg_pel.key,&id); + addReplyStreamID(c,&id); + + /* Consumer name. */ + addReplyBulkCBuffer(c,nack->consumer->name, + sdslen(nack->consumer->name)); + + /* Last delivery. */ + addReplyLongLong(c,nack->delivery_time); + + /* Number of deliveries. */ + addReplyLongLong(c,nack->delivery_count); + + arraylen_cg_pel++; + } + setDeferredArrayLen(c,arrayptr_cg_pel,arraylen_cg_pel); + raxStop(&ri_cg_pel); + + /* Consumers */ + addReplyBulkCString(c,"consumers"); + addReplyArrayLen(c,raxSize(cg->consumers)); + raxIterator ri_consumers; + raxStart(&ri_consumers,cg->consumers); + raxSeek(&ri_consumers,"^",NULL,0); + while(raxNext(&ri_consumers)) { + streamConsumer *consumer = ri_consumers.data; + addReplyMapLen(c,4); + + /* Consumer name */ + addReplyBulkCString(c,"name"); + addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name)); + + /* Seen-time */ + addReplyBulkCString(c,"seen-time"); + addReplyLongLong(c,consumer->seen_time); + + /* Consumer PEL count */ + addReplyBulkCString(c,"pel-count"); + addReplyLongLong(c,raxSize(consumer->pel)); + + /* Consumer PEL */ + addReplyBulkCString(c,"pending"); + long long arraylen_cpel = 0; + void *arrayptr_cpel = addReplyDeferredLen(c); + raxIterator ri_cpel; + raxStart(&ri_cpel,consumer->pel); + raxSeek(&ri_cpel,"^",NULL,0); + while(raxNext(&ri_cpel) && (!count || arraylen_cpel < count)) { + streamNACK *nack = ri_cpel.data; + addReplyArrayLen(c,3); + + /* Entry ID. */ + streamID id; + streamDecodeID(ri_cpel.key,&id); + addReplyStreamID(c,&id); + + /* Last delivery. */ + addReplyLongLong(c,nack->delivery_time); + + /* Number of deliveries. */ + addReplyLongLong(c,nack->delivery_count); + + arraylen_cpel++; + } + setDeferredArrayLen(c,arrayptr_cpel,arraylen_cpel); + raxStop(&ri_cpel); + } + raxStop(&ri_consumers); + } + raxStop(&ri_cgroups); + } + } +} + /* XINFO CONSUMERS <key> <group> * XINFO GROUPS <key> - * XINFO STREAM <key> + * XINFO STREAM <key> [FULL [COUNT <count>]] * XINFO HELP. */ void xinfoCommand(client *c) { const char *help[] = { -"CONSUMERS <key> <groupname> -- Show consumer groups of group <groupname>.", -"GROUPS <key> -- Show the stream consumer groups.", -"STREAM <key> -- Show information about the stream.", -"HELP -- Print this help.", +"CONSUMERS <key> <groupname> -- Show consumer groups of group <groupname>.", +"GROUPS <key> -- Show the stream consumer groups.", +"STREAM <key> [FULL [COUNT <count>]] -- Show information about the stream.", +"HELP -- Print this help.", NULL }; stream *s = NULL; @@ -2564,36 +2748,10 @@ NULL addReplyStreamID(c,&cg->last_id); } raxStop(&ri); - } else if (!strcasecmp(opt,"STREAM") && c->argc == 3) { - /* XINFO STREAM <key> (or the alias XINFO <key>). */ - addReplyMapLen(c,7); - addReplyBulkCString(c,"length"); - addReplyLongLong(c,s->length); - addReplyBulkCString(c,"radix-tree-keys"); - addReplyLongLong(c,raxSize(s->rax)); - addReplyBulkCString(c,"radix-tree-nodes"); - addReplyLongLong(c,s->rax->numnodes); - addReplyBulkCString(c,"groups"); - addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0); - addReplyBulkCString(c,"last-generated-id"); - addReplyStreamID(c,&s->last_id); - - /* To emit the first/last entry we us the streamReplyWithRange() - * API. */ - int count; - streamID start, end; - start.ms = start.seq = 0; - end.ms = end.seq = UINT64_MAX; - addReplyBulkCString(c,"first-entry"); - count = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL, - STREAM_RWR_RAWENTRIES,NULL); - if (!count) addReplyNull(c); - addReplyBulkCString(c,"last-entry"); - count = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL, - STREAM_RWR_RAWENTRIES,NULL); - if (!count) addReplyNull(c); + } else if (!strcasecmp(opt,"STREAM")) { + /* XINFO STREAM <key> [FULL [COUNT <count>]]. */ + xinfoReplyWithStreamInfo(c,s); } else { addReplySubcommandSyntaxError(c); } } - |