summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2018-03-13 16:54:23 +0100
committerantirez <antirez@gmail.com>2018-03-15 12:54:10 +0100
commit66143616153eeeb2067b8960221c30110eb39aae (patch)
tree9e4c35cd85d8159b854fc0161abc360afb034ffc
parentd7d8cd0b2fa0523b575504d4800c2f1f05dd1a90 (diff)
downloadredis-66143616153eeeb2067b8960221c30110eb39aae.tar.gz
CG: XINFO STREAM.
-rw-r--r--src/t_stream.c32
1 files changed, 29 insertions, 3 deletions
diff --git a/src/t_stream.c b/src/t_stream.c
index ad6d1c79a..3b2ddc2e4 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -1871,7 +1871,7 @@ NULL
char *opt = c->argc > 2 ? c->argv[2]->ptr : "STREAM"; /* Subcommand. */
/* Lookup the key now, this is common for all the subcommands but HELP. */
- if (c->argc >= 3 && strcasecmp(opt,"HELP")) {
+ if (c->argc >= 2 && strcasecmp(opt,"HELP")) {
robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
if (o == NULL) return;
s = o->ptr;
@@ -1880,7 +1880,6 @@ NULL
/* Dispatch the different subcommands. */
if (!strcasecmp(opt,"CONSUMERS") && c->argc == 4) {
/* XINFO <key> CONSUMERS <group>. */
-
streamCG *cg = streamLookupCG(s,c->argv[3]->ptr);
if (cg == NULL) {
addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' "
@@ -1910,7 +1909,6 @@ NULL
raxStop(&ri);
} else if (!strcasecmp(opt,"GROUPS") && c->argc == 3) {
/* XINFO <key> GROUPS. */
-
if (s->cgroups == NULL) {
addReplyMultiBulkLen(c,0);
return;
@@ -1931,6 +1929,34 @@ NULL
addReplyLongLong(c,raxSize(cg->pel));
}
raxStop(&ri);
+ } else if (c->argc == 2 ||
+ (!strcasecmp(opt,"STREAM") && c->argc == 3))
+ {
+ /* XINFO <key> STREAM (or the alias XINFO <key>). */
+ addReplyMultiBulkLen(c,12);
+ addReplyStatus(c,"length");
+ addReplyLongLong(c,s->length);
+ addReplyStatus(c,"radix-tree-keys");
+ addReplyLongLong(c,raxSize(s->rax));
+ addReplyStatus(c,"radix-tree-nodes");
+ addReplyLongLong(c,s->rax->numnodes);
+ addReplyStatus(c,"groups");
+ addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0);
+
+ /* To emit the first/last entry we us the streamReplyWithRange()
+ * API. */
+ int count;
+ streamID start, end;
+ start.ms = start.seq = 0;
+ end.ms = end.seq = UINT64_MAX;
+ addReplyStatus(c,"first-entry");
+ count = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL,
+ STREAM_RWR_RAWENTRIES);
+ if (!count) addReply(c,shared.nullbulk);
+ addReplyStatus(c,"last-entry");
+ count = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL,
+ STREAM_RWR_RAWENTRIES);
+ if (!count) addReply(c,shared.nullbulk);
} else if (!strcasecmp(opt,"HELP")) {
addReplyHelp(c, help);
} else {