From d4f81ebdba7eadcb89b91377fb9091ef7e0513b7 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 2 Mar 2018 17:24:00 +0100 Subject: CG: XGROUP DELCONSUMER implemented. --- src/t_stream.c | 47 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) diff --git a/src/t_stream.c b/src/t_stream.c index ed3544374..a2404244c 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1351,6 +1351,33 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) { return consumer; } +/* Delete the consumer specified in the consumer group 'cg'. The consumer + * may have pending messages: they are removed from the PEL, and the number + * of pending messages "lost" is returned. */ +uint64_t streamDelConsumer(streamCG *cg, sds name) { + streamConsumer *consumer = streamLookupConsumer(cg,name,0); + if (consumer == NULL) return 0; + + uint64_t retval = raxSize(consumer->pel); + + /* Iterate all the consumer pending messages, deleting every corresponding + * entry from the global entry. */ + raxIterator ri; + raxStart(&ri,consumer->pel); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + streamNACK *nack = ri.data; + raxRemove(cg->pel,ri.key,ri.key_len,NULL); + streamFreeNACK(nack); + } + raxStop(&ri); + + /* Deallocate the consumer. */ + raxRemove(cg->consumers,(unsigned char*)name,sdslen(name),NULL); + streamFreeConsumer(consumer); + return retval; +} + /* ----------------------------------------------------------------------- * Consumer groups commands * ----------------------------------------------------------------------- */ @@ -1370,6 +1397,8 @@ NULL }; stream *s = NULL; sds grpname = NULL; + streamCG *cg = NULL; + char *opt = c->argv[1]->ptr; /* Subcommand name. */ /* Lookup the key now, this is common for all the subcommands but HELP. */ if (c->argc >= 4) { @@ -1377,9 +1406,21 @@ NULL if (o == NULL) return; s = o->ptr; grpname = c->argv[3]->ptr; + + /* Certain subcommands require the group to exist. */ + if ((cg = streamLookupCG(s,grpname)) == NULL && + (!strcasecmp(opt,"SETID") || + !strcasecmp(opt,"DELGROUP") || + !strcasecmp(opt,"DELCONSUMER"))) + { + addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' " + "for key name '%s'", + grpname, c->argv[2]->ptr); + return; + } } - char *opt = c->argv[1]->ptr; + /* Dispatch the different subcommands. */ if (!strcasecmp(opt,"CREATE") && c->argc == 5) { streamID id; if (!strcmp(c->argv[4]->ptr,"$")) { @@ -1397,6 +1438,10 @@ NULL } else if (!strcasecmp(opt,"SETID") && c->argc == 5) { } else if (!strcasecmp(opt,"DELGROUP") && c->argc == 4) { } else if (!strcasecmp(opt,"DELCONSUMER") && c->argc == 5) { + /* Delete the consumer and returns the number of pending messages + * that were yet associated with such a consumer. */ + long long pending = streamDelConsumer(cg,c->argv[4]->ptr); + addReplyLongLong(c,pending); } else if (!strcasecmp(opt,"HELP")) { addReplyHelp(c, help); } else { -- cgit v1.2.1