summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2018-03-02 17:24:00 +0100
committerantirez <antirez@gmail.com>2018-03-15 12:54:10 +0100
commitd4f81ebdba7eadcb89b91377fb9091ef7e0513b7 (patch)
tree82c35e24b70a788a890abdefa88e9549b8ebf772
parent9b423ae2378b9234f100f5c1617ba18fba9db1a3 (diff)
downloadredis-d4f81ebdba7eadcb89b91377fb9091ef7e0513b7.tar.gz
CG: XGROUP DELCONSUMER implemented.
-rw-r--r--src/t_stream.c47
1 files changed, 46 insertions, 1 deletions
diff --git a/src/t_stream.c b/src/t_stream.c
index ed3544374..a2404244c 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -1351,6 +1351,33 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) {
return consumer;
}
+/* Delete the consumer specified in the consumer group 'cg'. The consumer
+ * may have pending messages: they are removed from the PEL, and the number
+ * of pending messages "lost" is returned. */
+uint64_t streamDelConsumer(streamCG *cg, sds name) {
+ streamConsumer *consumer = streamLookupConsumer(cg,name,0);
+ if (consumer == NULL) return 0;
+
+ uint64_t retval = raxSize(consumer->pel);
+
+ /* Iterate all the consumer pending messages, deleting every corresponding
+ * entry from the global entry. */
+ raxIterator ri;
+ raxStart(&ri,consumer->pel);
+ raxSeek(&ri,"^",NULL,0);
+ while(raxNext(&ri)) {
+ streamNACK *nack = ri.data;
+ raxRemove(cg->pel,ri.key,ri.key_len,NULL);
+ streamFreeNACK(nack);
+ }
+ raxStop(&ri);
+
+ /* Deallocate the consumer. */
+ raxRemove(cg->consumers,(unsigned char*)name,sdslen(name),NULL);
+ streamFreeConsumer(consumer);
+ return retval;
+}
+
/* -----------------------------------------------------------------------
* Consumer groups commands
* ----------------------------------------------------------------------- */
@@ -1370,6 +1397,8 @@ NULL
};
stream *s = NULL;
sds grpname = NULL;
+ streamCG *cg = NULL;
+ char *opt = c->argv[1]->ptr; /* Subcommand name. */
/* Lookup the key now, this is common for all the subcommands but HELP. */
if (c->argc >= 4) {
@@ -1377,9 +1406,21 @@ NULL
if (o == NULL) return;
s = o->ptr;
grpname = c->argv[3]->ptr;
+
+ /* Certain subcommands require the group to exist. */
+ if ((cg = streamLookupCG(s,grpname)) == NULL &&
+ (!strcasecmp(opt,"SETID") ||
+ !strcasecmp(opt,"DELGROUP") ||
+ !strcasecmp(opt,"DELCONSUMER")))
+ {
+ addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' "
+ "for key name '%s'",
+ grpname, c->argv[2]->ptr);
+ return;
+ }
}
- char *opt = c->argv[1]->ptr;
+ /* Dispatch the different subcommands. */
if (!strcasecmp(opt,"CREATE") && c->argc == 5) {
streamID id;
if (!strcmp(c->argv[4]->ptr,"$")) {
@@ -1397,6 +1438,10 @@ NULL
} else if (!strcasecmp(opt,"SETID") && c->argc == 5) {
} else if (!strcasecmp(opt,"DELGROUP") && c->argc == 4) {
} else if (!strcasecmp(opt,"DELCONSUMER") && c->argc == 5) {
+ /* Delete the consumer and returns the number of pending messages
+ * that were yet associated with such a consumer. */
+ long long pending = streamDelConsumer(cg,c->argv[4]->ptr);
+ addReplyLongLong(c,pending);
} else if (!strcasecmp(opt,"HELP")) {
addReplyHelp(c, help);
} else {