From 388c69fe4efc9f686aaf26af189c6f3de6bae405 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 25 Jan 2018 16:39:49 +0100 Subject: CG: XACK implementation. --- src/server.c | 1 + src/server.h | 1 + src/t_stream.c | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 56 insertions(+), 4 deletions(-) diff --git a/src/server.c b/src/server.c index aa8d17e6e..c39acfce2 100644 --- a/src/server.c +++ b/src/server.c @@ -309,6 +309,7 @@ struct redisCommand redisCommandTable[] = { {"xread",xreadCommand,-3,"rs",0,xreadGetKeys,1,1,1,0,0}, {"xreadgroup",xreadCommand,-3,"ws",0,xreadGetKeys,1,1,1,0,0}, {"xgroup",xgroupCommand,-2,"wm",0,NULL,2,2,1,0,0}, + {"xack",xackCommand,-3,"wF",0,NULL,1,1,1,0,0}, {"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0}, {"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0}, {"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0} diff --git a/src/server.h b/src/server.h index d4f989264..d2ef36983 100644 --- a/src/server.h +++ b/src/server.h @@ -2026,6 +2026,7 @@ void xrevrangeCommand(client *c); void xlenCommand(client *c); void xreadCommand(client *c); void xgroupCommand(client *c); +void xackCommand(client *c); #if defined(__GNUC__) void *calloc(size_t count, size_t size) __attribute__ ((deprecated)); diff --git a/src/t_stream.c b/src/t_stream.c index 4cc1bb020..8de4dab67 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1393,12 +1393,62 @@ NULL } } -/* XPENDING [ ]. */ +/* XACK ... + * + * Acknowledge a message as processed. In practical terms we just check the + * pendine entries list (PEL) of the group, and delete the PEL entry both from + * the group and the consumer (pending messages are referenced in both places). + * + * Return value of the command is the number of messages successfully + * acknowledged, that is, the IDs we were actually able to resolve in the PEL. + */ +void xackCommand(client *c) { + streamCG *group; + robj *o = lookupKeyRead(c->db,c->argv[1]); + if (o) { + if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */ + group = streamLookupCG(o->ptr,c->argv[2]->ptr); + } + + /* No key or group? Nothing to ack. */ + if (o == NULL || group == NULL) { + addReply(c,shared.cone); + return; + } -/* XCLAIM ...*/ + int acknowledged = 0; + for (int j = 3; j < c->argc; j++) { + streamID id; + unsigned char buf[sizeof(streamID)]; + if (streamParseIDOrReply(c,c->argv[j],&id,0) != C_OK) return; + streamEncodeID(buf,&id); + + /* Lookup the ID in the group PEL: it will have a reference to the + * NACK structure that will have a reference to the consumer, so that + * we are able to remove the entry from both PELs. */ + streamNACK *nack = raxFind(group->pel,buf,sizeof(buf)); + if (nack != raxNotFound) { + raxRemove(group->pel,buf,sizeof(buf),NULL); + raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); + streamFreeNACK(nack); + acknowledged++; + } + } + addReplyLongLong(c,acknowledged); +} + +/* XPENDING [ ] + * + * If start and stop are omitted, the command just outputs information about + * the amount of pending messages for the key/group pair, together with + * the minimum and maxium ID of pending messages. + * + * If start and stop are provided instead, the pending messages are returned + * with informations about the current owner, number of deliveries and last + * delivery time and so forth. */ -/* XACK */ +/* XCLAIM ...*/ /* XREAD-GROUP will be implemented by xreadGenericCommand() */ -/* XINFO [CONSUMERS|GROUPS|STREAM]. STREAM is the default */ +/* XINFO [CONSUMERS group|GROUPS|STREAM]. STREAM is the default */ -- cgit v1.2.1