summaryrefslogtreecommitdiff
path: root/src/t_stream.c
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2018-01-29 18:23:26 +0100
committerantirez <antirez@gmail.com>2018-03-15 12:54:10 +0100
commitf3708af7f9ea6b3a097d2b33ee7c28624b753215 (patch)
tree83edf079ad11c929b8134a0581bfe947c7c8841d /src/t_stream.c
parent1bc31666daa7f59a8a3473e5a57a296351d4f985 (diff)
downloadredis-f3708af7f9ea6b3a097d2b33ee7c28624b753215.tar.gz
CG: XPENDING with start/stop/count variant implemented.
Diffstat (limited to 'src/t_stream.c')
-rw-r--r--src/t_stream.c58
1 files changed, 55 insertions, 3 deletions
diff --git a/src/t_stream.c b/src/t_stream.c
index 0ca1cea03..c659a7017 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -1447,7 +1447,7 @@ void xackCommand(client *c) {
addReplyLongLong(c,acknowledged);
}
-/* XPENDING <key> <group> [<start> <stop> <max>] [<consumer>]
+/* XPENDING <key> <group> [<start> <stop> <count>] [<consumer>]
*
* If start and stop are omitted, the command just outputs information about
* the amount of pending messages for the key/group pair, together with
@@ -1462,6 +1462,8 @@ void xpendingCommand(client *c) {
robj *key = c->argv[1];
robj *groupname = c->argv[2];
robj *consumername = (c->argc == 7) ? c->argv[6] : NULL;
+ streamID startid, endid;
+ long long count;
/* Start and stop, and the consumer, can be omitted. */
if (c->argc != 3 && c->argc != 6 && c->argc != 7) {
@@ -1469,6 +1471,17 @@ void xpendingCommand(client *c) {
return;
}
+ /* Parse start/end/count arguments ASAP if needed, in order to report
+ * syntax errors before any other error. */
+ if (c->argc >= 6) {
+ if (getLongLongFromObjectOrReply(c,c->argv[5],&count,NULL) == C_ERR)
+ return;
+ if (streamParseIDOrReply(c,c->argv[3],&startid,0) == C_ERR)
+ return;
+ if (streamParseIDOrReply(c,c->argv[4],&endid,UINT64_MAX) == C_ERR)
+ return;
+ }
+
/* Lookup the key and the group inside the stream. */
robj *o = lookupKeyRead(c->db,c->argv[1]);
streamCG *group;
@@ -1494,8 +1507,6 @@ void xpendingCommand(client *c) {
addReply(c,shared.nullbulk); /* End. */
addReply(c,shared.nullmultibulk); /* Clients. */
} else {
- streamID startid,endid;
-
/* Start. */
raxIterator ri;
raxStart(&ri,group->pel);
@@ -1530,6 +1541,47 @@ void xpendingCommand(client *c) {
}
/* XPENDING <key> <group> <start> <stop> <count> [<consumer>] variant. */
else {
+ streamConsumer *consumer = consumername ?
+ streamLookupConsumer(group,consumername->ptr):
+ NULL;
+ rax *pel = consumer ? consumer->pel : group->pel;
+ unsigned char startkey[sizeof(streamID)];
+ unsigned char endkey[sizeof(streamID)];
+ raxIterator ri;
+ mstime_t now = mstime();
+
+ streamEncodeID(startkey,&startid);
+ streamEncodeID(endkey,&endid);
+ raxStart(&ri,pel);
+ raxSeek(&ri,">=",startkey,sizeof(startkey));
+ void *arraylen_ptr = addDeferredMultiBulkLength(c);
+ size_t arraylen = 0;
+
+ while(raxNext(&ri) && memcmp(ri.key,endkey,ri.key_len) <= 0) {
+ streamNACK *nack = ri.data;
+
+ arraylen++;
+ addReplyMultiBulkLen(c,4);
+
+ /* Entry ID. */
+ streamID id;
+ streamDecodeID(ri.key,&id);
+ addReplyStreamID(c,&id);
+
+ /* Consumer name. */
+ addReplyBulkCBuffer(c,nack->consumer->name,
+ sdslen(nack->consumer->name));
+
+ /* Milliseconds elapsed since last delivery. */
+ mstime_t elapsed = now - nack->delivery_time;
+ if (elapsed < 0) elapsed = 0;
+ addReplyLongLong(c,elapsed);
+
+ /* Number of deliveries. */
+ addReplyLongLong(c,nack->delivery_count);
+ }
+ raxStop(&ri);
+ setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
}
}