diff options
author | antirez <antirez@gmail.com> | 2018-03-07 16:08:06 +0100 |
---|---|---|
committer | antirez <antirez@gmail.com> | 2018-03-15 12:54:10 +0100 |
commit | 0cf6b1e3ae303214f0fdc73890cac2db41a75225 (patch) | |
tree | 16fa123f9d7bc652054aa45b54772ab4e2607c89 | |
parent | 67eeeb0b1084a72f2eca296235f732bb8d1d472c (diff) | |
download | redis-0cf6b1e3ae303214f0fdc73890cac2db41a75225.tar.gz |
CG: XINFO CONSUMERS implemented.
-rw-r--r-- | src/server.c | 1 | ||||
-rw-r--r-- | src/server.h | 1 | ||||
-rw-r--r-- | src/t_stream.c | 51 |
3 files changed, 53 insertions, 0 deletions
diff --git a/src/server.c b/src/server.c index 8c8300439..6f06a33a0 100644 --- a/src/server.c +++ b/src/server.c @@ -312,6 +312,7 @@ struct redisCommand redisCommandTable[] = { {"xack",xackCommand,-3,"wF",0,NULL,1,1,1,0,0}, {"xpending",xpendingCommand,-3,"r",0,NULL,1,1,1,0,0}, {"xclaim",xclaimCommand,-5,"wF",0,NULL,1,1,1,0,0}, + {"xinfo",xinfoCommand,-2,"r",0,NULL,1,1,1,0,0}, {"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0}, {"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0}, {"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0} diff --git a/src/server.h b/src/server.h index 76aaab88f..046e84b45 100644 --- a/src/server.h +++ b/src/server.h @@ -2029,6 +2029,7 @@ void xgroupCommand(client *c); void xackCommand(client *c); void xpendingCommand(client *c); void xclaimCommand(client *c); +void xinfoCommand(client *c); #if defined(__GNUC__) void *calloc(size_t count, size_t size) __attribute__ ((deprecated)); diff --git a/src/t_stream.c b/src/t_stream.c index 3b7d022a1..7944ab398 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1858,3 +1858,54 @@ void xclaimCommand(client *c) { } /* XINFO <key> [CONSUMERS group|GROUPS|STREAM]. STREAM is the default */ +void xinfoCommand(client *c) { + const char *help[] = { +"<key> CONSUMERS <groupname> -- Show consumer groups of group <groupname>.", +"<key> GROUPS -- Show the stream consumer groups.", +"<key> STREAM -- Show information about the stream.", +"<key> HELP -- Prints this help.", +NULL + }; + stream *s = NULL; + char *opt = c->argc > 2 ? c->argv[2]->ptr : "STREAM"; /* Subcommand. */ + + /* Lookup the key now, this is common for all the subcommands but HELP. */ + if (c->argc >= 3 && strcasecmp(opt,"HELP")) { + robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr); + if (o == NULL) return; + s = o->ptr; + } + + /* Dispatch the different subcommands. */ + if (!strcasecmp(opt,"CONSUMERS") && c->argc == 4) { + streamCG *cg = streamLookupCG(s,c->argv[3]->ptr); + if (cg == NULL) { + addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' " + "for key name '%s'", + c->argv[3]->ptr, c->argv[1]->ptr); + return; + } + + addReplyMultiBulkLen(c,raxSize(cg->consumers)); + raxIterator ri; + raxStart(&ri,cg->consumers); + raxSeek(&ri,"^",NULL,0); + mstime_t now = mstime(); + while(raxNext(&ri)) { + streamConsumer *consumer = ri.data; + mstime_t idle = now - consumer->seen_time; + if (idle < 0) idle = 0; + + addReplyMultiBulkLen(c,3); + addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name)); + addReplyLongLong(c,raxSize(consumer->pel)); + addReplyLongLong(c,idle); + } + raxStop(&ri); + } else if (!strcasecmp(opt,"HELP")) { + addReplyHelp(c, help); + } else { + addReply(c,shared.syntaxerr); + } +} + |