summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2018-03-07 16:08:06 +0100
committerantirez <antirez@gmail.com>2018-03-15 12:54:10 +0100
commit0cf6b1e3ae303214f0fdc73890cac2db41a75225 (patch)
tree16fa123f9d7bc652054aa45b54772ab4e2607c89
parent67eeeb0b1084a72f2eca296235f732bb8d1d472c (diff)
downloadredis-0cf6b1e3ae303214f0fdc73890cac2db41a75225.tar.gz
CG: XINFO CONSUMERS implemented.
-rw-r--r--src/server.c1
-rw-r--r--src/server.h1
-rw-r--r--src/t_stream.c51
3 files changed, 53 insertions, 0 deletions
diff --git a/src/server.c b/src/server.c
index 8c8300439..6f06a33a0 100644
--- a/src/server.c
+++ b/src/server.c
@@ -312,6 +312,7 @@ struct redisCommand redisCommandTable[] = {
{"xack",xackCommand,-3,"wF",0,NULL,1,1,1,0,0},
{"xpending",xpendingCommand,-3,"r",0,NULL,1,1,1,0,0},
{"xclaim",xclaimCommand,-5,"wF",0,NULL,1,1,1,0,0},
+ {"xinfo",xinfoCommand,-2,"r",0,NULL,1,1,1,0,0},
{"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
{"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
{"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0}
diff --git a/src/server.h b/src/server.h
index 76aaab88f..046e84b45 100644
--- a/src/server.h
+++ b/src/server.h
@@ -2029,6 +2029,7 @@ void xgroupCommand(client *c);
void xackCommand(client *c);
void xpendingCommand(client *c);
void xclaimCommand(client *c);
+void xinfoCommand(client *c);
#if defined(__GNUC__)
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));
diff --git a/src/t_stream.c b/src/t_stream.c
index 3b7d022a1..7944ab398 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -1858,3 +1858,54 @@ void xclaimCommand(client *c) {
}
/* XINFO <key> [CONSUMERS group|GROUPS|STREAM]. STREAM is the default */
+void xinfoCommand(client *c) {
+ const char *help[] = {
+"<key> CONSUMERS <groupname> -- Show consumer groups of group <groupname>.",
+"<key> GROUPS -- Show the stream consumer groups.",
+"<key> STREAM -- Show information about the stream.",
+"<key> HELP -- Prints this help.",
+NULL
+ };
+ stream *s = NULL;
+ char *opt = c->argc > 2 ? c->argv[2]->ptr : "STREAM"; /* Subcommand. */
+
+ /* Lookup the key now, this is common for all the subcommands but HELP. */
+ if (c->argc >= 3 && strcasecmp(opt,"HELP")) {
+ robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
+ if (o == NULL) return;
+ s = o->ptr;
+ }
+
+ /* Dispatch the different subcommands. */
+ if (!strcasecmp(opt,"CONSUMERS") && c->argc == 4) {
+ streamCG *cg = streamLookupCG(s,c->argv[3]->ptr);
+ if (cg == NULL) {
+ addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' "
+ "for key name '%s'",
+ c->argv[3]->ptr, c->argv[1]->ptr);
+ return;
+ }
+
+ addReplyMultiBulkLen(c,raxSize(cg->consumers));
+ raxIterator ri;
+ raxStart(&ri,cg->consumers);
+ raxSeek(&ri,"^",NULL,0);
+ mstime_t now = mstime();
+ while(raxNext(&ri)) {
+ streamConsumer *consumer = ri.data;
+ mstime_t idle = now - consumer->seen_time;
+ if (idle < 0) idle = 0;
+
+ addReplyMultiBulkLen(c,3);
+ addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name));
+ addReplyLongLong(c,raxSize(consumer->pel));
+ addReplyLongLong(c,idle);
+ }
+ raxStop(&ri);
+ } else if (!strcasecmp(opt,"HELP")) {
+ addReplyHelp(c, help);
+ } else {
+ addReply(c,shared.syntaxerr);
+ }
+}
+