summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2018-01-25 16:39:49 +0100
committerantirez <antirez@gmail.com>2018-03-15 12:54:10 +0100
commit388c69fe4efc9f686aaf26af189c6f3de6bae405 (patch)
tree2757412190a359b6e5990564763cc9539604cb87
parent5bbd117c290755d796f17be81fe22a4460ba9b80 (diff)
downloadredis-388c69fe4efc9f686aaf26af189c6f3de6bae405.tar.gz
CG: XACK implementation.
-rw-r--r--src/server.c1
-rw-r--r--src/server.h1
-rw-r--r--src/t_stream.c58
3 files changed, 56 insertions, 4 deletions
diff --git a/src/server.c b/src/server.c
index aa8d17e6e..c39acfce2 100644
--- a/src/server.c
+++ b/src/server.c
@@ -309,6 +309,7 @@ struct redisCommand redisCommandTable[] = {
{"xread",xreadCommand,-3,"rs",0,xreadGetKeys,1,1,1,0,0},
{"xreadgroup",xreadCommand,-3,"ws",0,xreadGetKeys,1,1,1,0,0},
{"xgroup",xgroupCommand,-2,"wm",0,NULL,2,2,1,0,0},
+ {"xack",xackCommand,-3,"wF",0,NULL,1,1,1,0,0},
{"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
{"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
{"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0}
diff --git a/src/server.h b/src/server.h
index d4f989264..d2ef36983 100644
--- a/src/server.h
+++ b/src/server.h
@@ -2026,6 +2026,7 @@ void xrevrangeCommand(client *c);
void xlenCommand(client *c);
void xreadCommand(client *c);
void xgroupCommand(client *c);
+void xackCommand(client *c);
#if defined(__GNUC__)
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));
diff --git a/src/t_stream.c b/src/t_stream.c
index 4cc1bb020..8de4dab67 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -1393,12 +1393,62 @@ NULL
}
}
-/* XPENDING <key> [<start> <stop>]. */
+/* XACK <key> <group> <id> <id> ... <id>
+ *
+ * Acknowledge a message as processed. In practical terms we just check the
+ * pendine entries list (PEL) of the group, and delete the PEL entry both from
+ * the group and the consumer (pending messages are referenced in both places).
+ *
+ * Return value of the command is the number of messages successfully
+ * acknowledged, that is, the IDs we were actually able to resolve in the PEL.
+ */
+void xackCommand(client *c) {
+ streamCG *group;
+ robj *o = lookupKeyRead(c->db,c->argv[1]);
+ if (o) {
+ if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */
+ group = streamLookupCG(o->ptr,c->argv[2]->ptr);
+ }
+
+ /* No key or group? Nothing to ack. */
+ if (o == NULL || group == NULL) {
+ addReply(c,shared.cone);
+ return;
+ }
-/* XCLAIM <key> <group-name> <consumer-name> <min-idle-time> <ID-1> <ID-2> ...*/
+ int acknowledged = 0;
+ for (int j = 3; j < c->argc; j++) {
+ streamID id;
+ unsigned char buf[sizeof(streamID)];
+ if (streamParseIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
+ streamEncodeID(buf,&id);
+
+ /* Lookup the ID in the group PEL: it will have a reference to the
+ * NACK structure that will have a reference to the consumer, so that
+ * we are able to remove the entry from both PELs. */
+ streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));
+ if (nack != raxNotFound) {
+ raxRemove(group->pel,buf,sizeof(buf),NULL);
+ raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
+ streamFreeNACK(nack);
+ acknowledged++;
+ }
+ }
+ addReplyLongLong(c,acknowledged);
+}
+
+/* XPENDING <key> <group> [<start> <stop>]
+ *
+ * If start and stop are omitted, the command just outputs information about
+ * the amount of pending messages for the key/group pair, together with
+ * the minimum and maxium ID of pending messages.
+ *
+ * If start and stop are provided instead, the pending messages are returned
+ * with informations about the current owner, number of deliveries and last
+ * delivery time and so forth. */
-/* XACK <stream-key> */
+/* XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ...*/
/* XREAD-GROUP will be implemented by xreadGenericCommand() */
-/* XINFO <key> [CONSUMERS|GROUPS|STREAM]. STREAM is the default */
+/* XINFO <key> [CONSUMERS group|GROUPS|STREAM]. STREAM is the default */