summaryrefslogtreecommitdiff
path: root/src/t_stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/t_stream.c')
-rw-r--r--src/t_stream.c32
1 files changed, 29 insertions, 3 deletions
diff --git a/src/t_stream.c b/src/t_stream.c
index ad6d1c79a..3b2ddc2e4 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -1871,7 +1871,7 @@ NULL
char *opt = c->argc > 2 ? c->argv[2]->ptr : "STREAM"; /* Subcommand. */
/* Lookup the key now, this is common for all the subcommands but HELP. */
- if (c->argc >= 3 && strcasecmp(opt,"HELP")) {
+ if (c->argc >= 2 && strcasecmp(opt,"HELP")) {
robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
if (o == NULL) return;
s = o->ptr;
@@ -1880,7 +1880,6 @@ NULL
/* Dispatch the different subcommands. */
if (!strcasecmp(opt,"CONSUMERS") && c->argc == 4) {
/* XINFO <key> CONSUMERS <group>. */
-
streamCG *cg = streamLookupCG(s,c->argv[3]->ptr);
if (cg == NULL) {
addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' "
@@ -1910,7 +1909,6 @@ NULL
raxStop(&ri);
} else if (!strcasecmp(opt,"GROUPS") && c->argc == 3) {
/* XINFO <key> GROUPS. */
-
if (s->cgroups == NULL) {
addReplyMultiBulkLen(c,0);
return;
@@ -1931,6 +1929,34 @@ NULL
addReplyLongLong(c,raxSize(cg->pel));
}
raxStop(&ri);
+ } else if (c->argc == 2 ||
+ (!strcasecmp(opt,"STREAM") && c->argc == 3))
+ {
+ /* XINFO <key> STREAM (or the alias XINFO <key>). */
+ addReplyMultiBulkLen(c,12);
+ addReplyStatus(c,"length");
+ addReplyLongLong(c,s->length);
+ addReplyStatus(c,"radix-tree-keys");
+ addReplyLongLong(c,raxSize(s->rax));
+ addReplyStatus(c,"radix-tree-nodes");
+ addReplyLongLong(c,s->rax->numnodes);
+ addReplyStatus(c,"groups");
+ addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0);
+
+ /* To emit the first/last entry we us the streamReplyWithRange()
+ * API. */
+ int count;
+ streamID start, end;
+ start.ms = start.seq = 0;
+ end.ms = end.seq = UINT64_MAX;
+ addReplyStatus(c,"first-entry");
+ count = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL,
+ STREAM_RWR_RAWENTRIES);
+ if (!count) addReply(c,shared.nullbulk);
+ addReplyStatus(c,"last-entry");
+ count = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL,
+ STREAM_RWR_RAWENTRIES);
+ if (!count) addReply(c,shared.nullbulk);
} else if (!strcasecmp(opt,"HELP")) {
addReplyHelp(c, help);
} else {