summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/t_stream.c33
1 files changed, 31 insertions, 2 deletions
diff --git a/src/t_stream.c b/src/t_stream.c
index 7944ab398..ad6d1c79a 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -1863,6 +1863,7 @@ void xinfoCommand(client *c) {
"<key> CONSUMERS <groupname> -- Show consumer groups of group <groupname>.",
"<key> GROUPS -- Show the stream consumer groups.",
"<key> STREAM -- Show information about the stream.",
+"<key> (without subcommand) -- Alias for <key> STREAM.",
"<key> HELP -- Prints this help.",
NULL
};
@@ -1878,6 +1879,8 @@ NULL
/* Dispatch the different subcommands. */
if (!strcasecmp(opt,"CONSUMERS") && c->argc == 4) {
+ /* XINFO <key> CONSUMERS <group>. */
+
streamCG *cg = streamLookupCG(s,c->argv[3]->ptr);
if (cg == NULL) {
addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' "
@@ -1896,16 +1899,42 @@ NULL
mstime_t idle = now - consumer->seen_time;
if (idle < 0) idle = 0;
- addReplyMultiBulkLen(c,3);
+ addReplyMultiBulkLen(c,6);
+ addReplyStatus(c,"name");
addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name));
+ addReplyStatus(c,"pending");
addReplyLongLong(c,raxSize(consumer->pel));
+ addReplyStatus(c,"idle");
addReplyLongLong(c,idle);
}
raxStop(&ri);
+ } else if (!strcasecmp(opt,"GROUPS") && c->argc == 3) {
+ /* XINFO <key> GROUPS. */
+
+ if (s->cgroups == NULL) {
+ addReplyMultiBulkLen(c,0);
+ return;
+ }
+
+ addReplyMultiBulkLen(c,raxSize(s->cgroups));
+ raxIterator ri;
+ raxStart(&ri,s->cgroups);
+ raxSeek(&ri,"^",NULL,0);
+ while(raxNext(&ri)) {
+ streamCG *cg = ri.data;
+ addReplyMultiBulkLen(c,6);
+ addReplyStatus(c,"name");
+ addReplyBulkCBuffer(c,ri.key,ri.key_len);
+ addReplyStatus(c,"consumers");
+ addReplyLongLong(c,raxSize(cg->consumers));
+ addReplyStatus(c,"pending");
+ addReplyLongLong(c,raxSize(cg->pel));
+ }
+ raxStop(&ri);
} else if (!strcasecmp(opt,"HELP")) {
addReplyHelp(c, help);
} else {
- addReply(c,shared.syntaxerr);
+ addReplyError(c,"syntax error, try 'XINFO anykey HELP'");
}
}