diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/server.c | 1 | ||||
-rw-r--r-- | src/server.h | 1 | ||||
-rw-r--r-- | src/t_stream.c | 58 |
3 files changed, 56 insertions, 4 deletions
diff --git a/src/server.c b/src/server.c index aa8d17e6e..c39acfce2 100644 --- a/src/server.c +++ b/src/server.c @@ -309,6 +309,7 @@ struct redisCommand redisCommandTable[] = { {"xread",xreadCommand,-3,"rs",0,xreadGetKeys,1,1,1,0,0}, {"xreadgroup",xreadCommand,-3,"ws",0,xreadGetKeys,1,1,1,0,0}, {"xgroup",xgroupCommand,-2,"wm",0,NULL,2,2,1,0,0}, + {"xack",xackCommand,-3,"wF",0,NULL,1,1,1,0,0}, {"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0}, {"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0}, {"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0} diff --git a/src/server.h b/src/server.h index d4f989264..d2ef36983 100644 --- a/src/server.h +++ b/src/server.h @@ -2026,6 +2026,7 @@ void xrevrangeCommand(client *c); void xlenCommand(client *c); void xreadCommand(client *c); void xgroupCommand(client *c); +void xackCommand(client *c); #if defined(__GNUC__) void *calloc(size_t count, size_t size) __attribute__ ((deprecated)); diff --git a/src/t_stream.c b/src/t_stream.c index 4cc1bb020..8de4dab67 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1393,12 +1393,62 @@ NULL } } -/* XPENDING <key> [<start> <stop>]. */ +/* XACK <key> <group> <id> <id> ... <id> + * + * Acknowledge a message as processed. In practical terms we just check the + * pendine entries list (PEL) of the group, and delete the PEL entry both from + * the group and the consumer (pending messages are referenced in both places). + * + * Return value of the command is the number of messages successfully + * acknowledged, that is, the IDs we were actually able to resolve in the PEL. + */ +void xackCommand(client *c) { + streamCG *group; + robj *o = lookupKeyRead(c->db,c->argv[1]); + if (o) { + if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */ + group = streamLookupCG(o->ptr,c->argv[2]->ptr); + } + + /* No key or group? Nothing to ack. */ + if (o == NULL || group == NULL) { + addReply(c,shared.cone); + return; + } -/* XCLAIM <key> <group-name> <consumer-name> <min-idle-time> <ID-1> <ID-2> ...*/ + int acknowledged = 0; + for (int j = 3; j < c->argc; j++) { + streamID id; + unsigned char buf[sizeof(streamID)]; + if (streamParseIDOrReply(c,c->argv[j],&id,0) != C_OK) return; + streamEncodeID(buf,&id); + + /* Lookup the ID in the group PEL: it will have a reference to the + * NACK structure that will have a reference to the consumer, so that + * we are able to remove the entry from both PELs. */ + streamNACK *nack = raxFind(group->pel,buf,sizeof(buf)); + if (nack != raxNotFound) { + raxRemove(group->pel,buf,sizeof(buf),NULL); + raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); + streamFreeNACK(nack); + acknowledged++; + } + } + addReplyLongLong(c,acknowledged); +} + +/* XPENDING <key> <group> [<start> <stop>] + * + * If start and stop are omitted, the command just outputs information about + * the amount of pending messages for the key/group pair, together with + * the minimum and maxium ID of pending messages. + * + * If start and stop are provided instead, the pending messages are returned + * with informations about the current owner, number of deliveries and last + * delivery time and so forth. */ -/* XACK <stream-key> */ +/* XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ...*/ /* XREAD-GROUP will be implemented by xreadGenericCommand() */ -/* XINFO <key> [CONSUMERS|GROUPS|STREAM]. STREAM is the default */ +/* XINFO <key> [CONSUMERS group|GROUPS|STREAM]. STREAM is the default */ |