summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorguybe7 <guy.benoish@redislabs.com>2021-01-06 10:34:27 +0200
committerGitHub <noreply@github.com>2021-01-06 10:34:27 +0200
commit714e103ac317bfa179b0a132c0f78d4ddc84a435 (patch)
treefbeabf367b4aadc98398cf151a6945034baae290
parent595ecd5f4be39eeec71fb07f687b2d6b7cf5c20c (diff)
downloadredis-714e103ac317bfa179b0a132c0f78d4ddc84a435.tar.gz
Add XAUTOCLAIM (#7973)
New command: XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT <count>] [JUSTID] The purpose is to claim entries from a stale consumer without the usual XPENDING+XCLAIM combo which takes two round trips. The syntax for XAUTOCLAIM is similar to scan: A cursor is returned (streamID) by each call and should be used as start for the next call. 0-0 means the scan is complete. This PR extends the deferred reply mechanism for any bulk string (not just counts) This PR carries some unrelated test code changes: - Renames the term "client" into "consumer" in the stream-cgroups test - And also changes DEBUG SLEEP into "after" Co-authored-by: Oran Agra <oran@redislabs.com>
-rw-r--r--src/networking.c44
-rw-r--r--src/server.c4
-rw-r--r--src/server.h2
-rw-r--r--src/t_stream.c159
-rw-r--r--tests/unit/type/stream-cgroups.tcl197
5 files changed, 347 insertions, 59 deletions
diff --git a/src/networking.c b/src/networking.c
index 77b25d0b7..29e5605d9 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -559,13 +559,9 @@ void *addReplyDeferredLen(client *c) {
return listLast(c->reply);
}
-/* Populate the length object and try gluing it to the next chunk. */
-void setDeferredAggregateLen(client *c, void *node, long length, char prefix) {
- serverAssert(length >= 0);
+void setDeferredReply(client *c, void *node, const char *s, size_t length) {
listNode *ln = (listNode*)node;
clientReplyBlock *next;
- char lenstr[128];
- size_t lenstr_len = sprintf(lenstr, "%c%ld\r\n", prefix, length);
/* Abort when *node is NULL: when the client should not accept writes
* we return NULL in addReplyDeferredLen() */
@@ -583,25 +579,39 @@ void setDeferredAggregateLen(client *c, void *node, long length, char prefix) {
* - It has enough room already allocated
* - And not too large (avoid large memmove) */
if (ln->next != NULL && (next = listNodeValue(ln->next)) &&
- next->size - next->used >= lenstr_len &&
- next->used < PROTO_REPLY_CHUNK_BYTES * 4) {
- memmove(next->buf + lenstr_len, next->buf, next->used);
- memcpy(next->buf, lenstr, lenstr_len);
- next->used += lenstr_len;
+ next->size - next->used >= length &&
+ next->used < PROTO_REPLY_CHUNK_BYTES * 4)
+ {
+ memmove(next->buf + length, next->buf, next->used);
+ memcpy(next->buf, s, length);
+ next->used += length;
listDelNode(c->reply,ln);
} else {
/* Create a new node */
- clientReplyBlock *buf = zmalloc(lenstr_len + sizeof(clientReplyBlock));
+ clientReplyBlock *buf = zmalloc(length + sizeof(clientReplyBlock));
/* Take over the allocation's internal fragmentation */
buf->size = zmalloc_usable_size(buf) - sizeof(clientReplyBlock);
- buf->used = lenstr_len;
- memcpy(buf->buf, lenstr, lenstr_len);
+ buf->used = length;
+ memcpy(buf->buf, s, length);
listNodeValue(ln) = buf;
c->reply_bytes += buf->size;
}
asyncCloseClientOnOutputBufferLimitReached(c);
}
+/* Populate the length object and try gluing it to the next chunk. */
+void setDeferredAggregateLen(client *c, void *node, long length, char prefix) {
+ serverAssert(length >= 0);
+
+ /* Abort when *node is NULL: when the client should not accept writes
+ * we return NULL in addReplyDeferredLen() */
+ if (node == NULL) return;
+
+ char lenstr[128];
+ size_t lenstr_len = sprintf(lenstr, "%c%ld\r\n", prefix, length);
+ setDeferredReply(c, node, lenstr, lenstr_len);
+}
+
void setDeferredArrayLen(client *c, void *node, long length) {
setDeferredAggregateLen(c,node,length,'*');
}
@@ -797,6 +807,14 @@ void addReplyBulkSds(client *c, sds s) {
addReply(c,shared.crlf);
}
+/* Set sds to a deferred reply (for symmetry with addReplyBulkSds it also frees the sds) */
+void setDeferredReplyBulkSds(client *c, void *node, sds s) {
+ sds reply = sdscatprintf(sdsempty(), "$%d\r\n%s\r\n", (unsigned)sdslen(s), s);
+ setDeferredReply(c, node, reply, sdslen(reply));
+ sdsfree(reply);
+ sdsfree(s);
+}
+
/* Add a C null term string as bulk reply */
void addReplyBulkCString(client *c, const char *s) {
if (s == NULL) {
diff --git a/src/server.c b/src/server.c
index 56f7484b1..dc5d52af8 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1027,6 +1027,10 @@ struct redisCommand redisCommandTable[] = {
"write random fast @stream",
0,NULL,1,1,1,0,0,0},
+ {"xautoclaim",xautoclaimCommand,-6,
+ "write random fast @stream",
+ 0,NULL,1,1,1,0,0,0},
+
{"xinfo",xinfoCommand,-2,
"read-only random @stream",
0,NULL,2,2,1,0,0,0},
diff --git a/src/server.h b/src/server.h
index d79bf5b23..8b98f803d 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1759,6 +1759,7 @@ void addReplyBulkLongLong(client *c, long long ll);
void addReply(client *c, robj *obj);
void addReplySds(client *c, sds s);
void addReplyBulkSds(client *c, sds s);
+void setDeferredReplyBulkSds(client *c, void *node, sds s);
void addReplyErrorObject(client *c, robj *err);
void addReplyErrorSds(client *c, sds err);
void addReplyError(client *c, const char *err);
@@ -2584,6 +2585,7 @@ void xsetidCommand(client *c);
void xackCommand(client *c);
void xpendingCommand(client *c);
void xclaimCommand(client *c);
+void xautoclaimCommand(client *c);
void xinfoCommand(client *c);
void xdelCommand(client *c);
void xtrimCommand(client *c);
diff --git a/src/t_stream.c b/src/t_stream.c
index 46918bae8..d085a8948 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -960,6 +960,11 @@ void addReplyStreamID(client *c, streamID *id) {
addReplyBulkSds(c,replyid);
}
+void setDeferredReplyStreamID(client *c, void *dr, streamID *id) {
+ sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq);
+ setDeferredReplyBulkSds(c, dr, replyid);
+}
+
/* Similar to the above function, but just creates an object, usually useful
* for replication purposes to create arguments. */
robj *createObjectFromStreamID(streamID *id) {
@@ -2666,6 +2671,160 @@ cleanup:
if (ids != static_ids) zfree(ids);
}
+/* XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT <count>] [JUSTID]
+ *
+ * Gets ownership of one or multiple messages in the Pending Entries List
+ * of a given stream consumer group.
+ *
+ * For each PEL entry, if its idle time greater or equal to <min-idle-time>,
+ * then the message new owner becomes the specified <consumer>.
+ * If the minimum idle time specified is zero, messages are claimed
+ * regardless of their idle time.
+ *
+ * This command creates the consumer as side effect if it does not yet
+ * exists. Moreover the command reset the idle time of the message to 0.
+ *
+ * The command returns an array of messages that the user
+ * successfully claimed, so that the caller is able to understand
+ * what messages it is now in charge of. */
+void xautoclaimCommand(client *c) {
+ streamCG *group = NULL;
+ robj *o = lookupKeyRead(c->db,c->argv[1]);
+ long long minidle; /* Minimum idle time argument, in milliseconds. */
+ long count = 100; /* Maximum entries to claim. */
+ streamID startid;
+ int startex;
+ int justid = 0;
+
+ /* Parse idle/start/end/count arguments ASAP if needed, in order to report
+ * syntax errors before any other error. */
+ if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle,"Invalid min-idle-time argument for XAUTOCLAIM") != C_OK)
+ return;
+ if (minidle < 0) minidle = 0;
+
+ if (streamParseIntervalIDOrReply(c,c->argv[5],&startid,&startex,0) != C_OK)
+ return;
+ if (startex && streamIncrID(&startid) != C_OK) {
+ addReplyError(c,"invalid start ID for the interval");
+ return;
+ }
+
+ int j = 6; /* options start at argv[6] */
+ while(j < c->argc) {
+ int moreargs = (c->argc-1) - j; /* Number of additional arguments. */
+ char *opt = c->argv[j]->ptr;
+ if (!strcasecmp(opt,"COUNT") && moreargs) {
+ if (getPositiveLongFromObjectOrReply(c,c->argv[j+1],&count,NULL) != C_OK)
+ return;
+ if (count == 0) {
+ addReplyError(c,"COUNT must be > 0");
+ return;
+ }
+ j++;
+ } else if (!strcasecmp(opt,"JUSTID")) {
+ justid = 1;
+ } else {
+ addReplyErrorObject(c,shared.syntaxerr);
+ return;
+ }
+ j++;
+ }
+
+ if (o) {
+ if (checkType(c,o,OBJ_STREAM))
+ return; /* Type error. */
+ group = streamLookupCG(o->ptr,c->argv[2]->ptr);
+ }
+
+ /* No key or group? Send an error given that the group creation
+ * is mandatory. */
+ if (o == NULL || group == NULL) {
+ addReplyErrorFormat(c,"-NOGROUP No such key '%s' or consumer group '%s'",
+ (char*)c->argv[1]->ptr,
+ (char*)c->argv[2]->ptr);
+ return;
+ }
+
+ /* Do the actual claiming. */
+ streamConsumer *consumer = NULL;
+ long long attempts = count*10;
+
+ addReplyArrayLen(c, 2);
+ void *endidptr = addReplyDeferredLen(c);
+ void *arraylenptr = addReplyDeferredLen(c);
+
+ unsigned char startkey[sizeof(streamID)];
+ streamEncodeID(startkey,&startid);
+ raxIterator ri;
+ raxStart(&ri,group->pel);
+ raxSeek(&ri,">=",startkey,sizeof(startkey));
+ size_t arraylen = 0;
+ mstime_t now = mstime();
+ while (attempts-- && count && raxNext(&ri)) {
+ streamNACK *nack = ri.data;
+
+ if (minidle) {
+ mstime_t this_idle = now - nack->delivery_time;
+ if (this_idle < minidle)
+ continue;
+ }
+
+ streamID id;
+ streamDecodeID(ri.key, &id);
+
+ if (consumer == NULL)
+ consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE,NULL);
+ if (nack->consumer != consumer) {
+ /* Remove the entry from the old consumer.
+ * Note that nack->consumer is NULL if we created the
+ * NACK above because of the FORCE option. */
+ if (nack->consumer)
+ raxRemove(nack->consumer->pel,ri.key,ri.key_len,NULL);
+ }
+
+ /* Update the consumer and idle time. */
+ nack->delivery_time = now;
+ nack->delivery_count++;
+
+ if (nack->consumer != consumer) {
+ /* Add the entry in the new consumer local PEL. */
+ raxInsert(consumer->pel,ri.key,ri.key_len,nack,NULL);
+ nack->consumer = consumer;
+ }
+
+ /* Send the reply for this entry. */
+ if (justid) {
+ addReplyStreamID(c,&id);
+ } else {
+ size_t emitted =
+ streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,
+ STREAM_RWR_RAWENTRIES,NULL);
+ if (!emitted)
+ addReplyNull(c);
+ }
+ arraylen++;
+ count--;
+
+ /* Propagate this change. */
+ robj *idstr = createObjectFromStreamID(&id);
+ streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],idstr,nack);
+ decrRefCount(idstr);
+ server.dirty++;
+ }
+
+ streamID endid;
+ if (raxEOF(&ri)) {
+ endid.ms = endid.seq = 0;
+ } else {
+ streamDecodeID(ri.key, &endid);
+ }
+ raxStop(&ri);
+
+ setDeferredArrayLen(c,arraylenptr,arraylen);
+ setDeferredReplyStreamID(c,endidptr,&endid);
+
+ preventCommandPropagation(c);
+}
/* XDEL <key> [<ID1> <ID2> ... <IDN>]
*
diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl
index 91dc2245e..f8de0741d 100644
--- a/tests/unit/type/stream-cgroups.tcl
+++ b/tests/unit/type/stream-cgroups.tcl
@@ -27,7 +27,7 @@ start_server {
# and not the element "foo bar" which was pre existing in the
# stream (see previous test)
set reply [
- r XREADGROUP GROUP mygroup client-1 STREAMS mystream ">"
+ r XREADGROUP GROUP mygroup consumer-1 STREAMS mystream ">"
]
assert {[llength [lindex $reply 0 1]] == 2}
lindex $reply 0 1 0 1
@@ -39,13 +39,13 @@ start_server {
r XADD mystream * d 4
# Read a few elements using a different consumer name
set reply [
- r XREADGROUP GROUP mygroup client-2 STREAMS mystream ">"
+ r XREADGROUP GROUP mygroup consumer-2 STREAMS mystream ">"
]
assert {[llength [lindex $reply 0 1]] == 2}
assert {[lindex $reply 0 1 0 1] eq {c 3}}
- set r1 [r XREADGROUP GROUP mygroup client-1 COUNT 10 STREAMS mystream 0]
- set r2 [r XREADGROUP GROUP mygroup client-2 COUNT 10 STREAMS mystream 0]
+ set r1 [r XREADGROUP GROUP mygroup consumer-1 COUNT 10 STREAMS mystream 0]
+ set r2 [r XREADGROUP GROUP mygroup consumer-2 COUNT 10 STREAMS mystream 0]
assert {[lindex $r1 0 1 0 1] eq {a 1}}
assert {[lindex $r2 0 1 0 1] eq {c 3}}
}
@@ -56,9 +56,9 @@ start_server {
for {set j 0} {$j < 4} {incr j} {
set item [lindex $pending $j]
if {$j < 2} {
- set owner client-1
+ set owner consumer-1
} else {
- set owner client-2
+ set owner consumer-2
}
assert {[lindex $item 1] eq $owner}
assert {[lindex $item 1] eq $owner}
@@ -66,7 +66,7 @@ start_server {
}
test {XPENDING can return single consumer items} {
- set pending [r XPENDING mystream mygroup - + 10 client-1]
+ set pending [r XPENDING mystream mygroup - + 10 consumer-1]
assert {[llength $pending] == 2}
}
@@ -77,9 +77,9 @@ start_server {
test {XPENDING with IDLE} {
after 20
- set pending [r XPENDING mystream mygroup IDLE 99999999 - + 10 client-1]
+ set pending [r XPENDING mystream mygroup IDLE 99999999 - + 10 consumer-1]
assert {[llength $pending] == 0}
- set pending [r XPENDING mystream mygroup IDLE 1 - + 10 client-1]
+ set pending [r XPENDING mystream mygroup IDLE 1 - + 10 consumer-1]
assert {[llength $pending] == 2}
set pending [r XPENDING mystream mygroup IDLE 99999999 - + 10]
assert {[llength $pending] == 0}
@@ -101,12 +101,12 @@ start_server {
}
}
- test {XACK is able to remove items from the client/group PEL} {
- set pending [r XPENDING mystream mygroup - + 10 client-1]
+ test {XACK is able to remove items from the consumer/group PEL} {
+ set pending [r XPENDING mystream mygroup - + 10 consumer-1]
set id1 [lindex $pending 0 0]
set id2 [lindex $pending 1 0]
assert {[r XACK mystream mygroup $id1] eq 1}
- set pending [r XPENDING mystream mygroup - + 10 client-1]
+ set pending [r XPENDING mystream mygroup - + 10 consumer-1]
assert {[llength $pending] == 1}
set id [lindex $pending 0 0]
assert {$id eq $id2}
@@ -242,52 +242,52 @@ start_server {
set id3 [r XADD mystream * c 3]
r XGROUP CREATE mystream mygroup 0
- # Client 1 reads item 1 from the stream without acknowledgements.
- # Client 2 then claims pending item 1 from the PEL of client 1
+ # Consumer 1 reads item 1 from the stream without acknowledgements.
+ # Consumer 2 then claims pending item 1 from the PEL of consumer 1
set reply [
- r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream >
+ r XREADGROUP GROUP mygroup consumer1 count 1 STREAMS mystream >
]
assert {[llength [lindex $reply 0 1 0 1]] == 2}
assert {[lindex $reply 0 1 0 1] eq {a 1}}
# make sure the entry is present in both the gorup, and the right consumer
assert {[llength [r XPENDING mystream mygroup - + 10]] == 1}
- assert {[llength [r XPENDING mystream mygroup - + 10 client1]] == 1}
- assert {[llength [r XPENDING mystream mygroup - + 10 client2]] == 0}
+ assert {[llength [r XPENDING mystream mygroup - + 10 consumer1]] == 1}
+ assert {[llength [r XPENDING mystream mygroup - + 10 consumer2]] == 0}
- r debug sleep 0.2
+ after 200
set reply [
- r XCLAIM mystream mygroup client2 10 $id1
+ r XCLAIM mystream mygroup consumer2 10 $id1
]
assert {[llength [lindex $reply 0 1]] == 2}
assert {[lindex $reply 0 1] eq {a 1}}
# make sure the entry is present in both the gorup, and the right consumer
assert {[llength [r XPENDING mystream mygroup - + 10]] == 1}
- assert {[llength [r XPENDING mystream mygroup - + 10 client1]] == 0}
- assert {[llength [r XPENDING mystream mygroup - + 10 client2]] == 1}
+ assert {[llength [r XPENDING mystream mygroup - + 10 consumer1]] == 0}
+ assert {[llength [r XPENDING mystream mygroup - + 10 consumer2]] == 1}
- # Client 1 reads another 2 items from stream
- r XREADGROUP GROUP mygroup client1 count 2 STREAMS mystream >
- r debug sleep 0.2
+ # Consumer 1 reads another 2 items from stream
+ r XREADGROUP GROUP mygroup consumer1 count 2 STREAMS mystream >
+ after 200
- # Delete item 2 from the stream. Now client 1 has PEL that contains
- # only item 3. Try to use client 2 to claim the deleted item 2
- # from the PEL of client 1, this should return nil
+ # Delete item 2 from the stream. Now consumer 1 has PEL that contains
+ # only item 3. Try to use consumer 2 to claim the deleted item 2
+ # from the PEL of consumer 1, this should return nil
r XDEL mystream $id2
set reply [
- r XCLAIM mystream mygroup client2 10 $id2
+ r XCLAIM mystream mygroup consumer2 10 $id2
]
assert {[llength $reply] == 1}
assert_equal "" [lindex $reply 0]
- # Delete item 3 from the stream. Now client 1 has PEL that is empty.
- # Try to use client 2 to claim the deleted item 3 from the PEL
- # of client 1, this should return nil
- r debug sleep 0.2
+ # Delete item 3 from the stream. Now consumer 1 has PEL that is empty.
+ # Try to use consumer 2 to claim the deleted item 3 from the PEL
+ # of consumer 1, this should return nil
+ after 200
r XDEL mystream $id3
set reply [
- r XCLAIM mystream mygroup client2 10 $id3
+ r XCLAIM mystream mygroup consumer2 10 $id3
]
assert {[llength $reply] == 1}
assert_equal "" [lindex $reply 0]
@@ -301,16 +301,16 @@ start_server {
set id3 [r XADD mystream * c 3]
r XGROUP CREATE mystream mygroup 0
- # Client 1 reads item 1 from the stream without acknowledgements.
- # Client 2 then claims pending item 1 from the PEL of client 1
+ # Consumer 1 reads item 1 from the stream without acknowledgements.
+ # Consumer 2 then claims pending item 1 from the PEL of consumer 1
set reply [
- r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream >
+ r XREADGROUP GROUP mygroup consumer1 count 1 STREAMS mystream >
]
assert {[llength [lindex $reply 0 1 0 1]] == 2}
assert {[lindex $reply 0 1 0 1] eq {a 1}}
- r debug sleep 0.2
+ after 200
set reply [
- r XCLAIM mystream mygroup client2 10 $id1
+ r XCLAIM mystream mygroup consumer2 10 $id1
]
assert {[llength [lindex $reply 0 1]] == 2}
assert {[lindex $reply 0 1] eq {a 1}}
@@ -321,10 +321,10 @@ start_server {
assert {[llength [lindex $reply 0]] == 4}
assert {[lindex $reply 0 3] == 2}
- # Client 3 then claims pending item 1 from the PEL of client 2 using JUSTID
- r debug sleep 0.2
+ # Consumer 3 then claims pending item 1 from the PEL of consumer 2 using JUSTID
+ after 200
set reply [
- r XCLAIM mystream mygroup client3 10 $id1 JUSTID
+ r XCLAIM mystream mygroup consumer3 10 $id1 JUSTID
]
assert {[llength $reply] == 1}
assert {[lindex $reply 0] eq $id1}
@@ -344,17 +344,122 @@ start_server {
set id3 [r XADD mystream * c 3]
r XGROUP CREATE mystream mygroup 0
- set reply [r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream >]
+ set reply [r XREADGROUP GROUP mygroup consumer1 count 1 STREAMS mystream >]
assert {[llength [lindex $reply 0 1 0 1]] == 2}
assert {[lindex $reply 0 1 0 1] eq {a 1}}
- r debug sleep 0.2
+ after 200
# re-claim with the same consumer that already has it
- assert {[llength [r XCLAIM mystream mygroup client1 10 $id1]] == 1}
+ assert {[llength [r XCLAIM mystream mygroup consumer1 10 $id1]] == 1}
# make sure the entry is still in the PEL
set reply [r XPENDING mystream mygroup - + 10]
assert {[llength $reply] == 1}
- assert {[lindex $reply 0 1] eq {client1}}
+ assert {[lindex $reply 0 1] eq {consumer1}}
+ }
+
+ test {XAUTOCLAIM can claim PEL items from another consumer} {
+ # Add 3 items into the stream, and create a consumer group
+ r del mystream
+ set id1 [r XADD mystream * a 1]
+ set id2 [r XADD mystream * b 2]
+ set id3 [r XADD mystream * c 3]
+ r XGROUP CREATE mystream mygroup 0
+
+ # Consumer 1 reads item 1 from the stream without acknowledgements.
+ # Consumer 2 then claims pending item 1 from the PEL of consumer 1
+ set reply [r XREADGROUP GROUP mygroup consumer1 count 1 STREAMS mystream >]
+ assert_equal [llength [lindex $reply 0 1 0 1]] 2
+ assert_equal [lindex $reply 0 1 0 1] {a 1}
+ after 200
+ set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - COUNT 1]
+ assert_equal [llength $reply] 2
+ assert_equal [lindex $reply 0] $id1
+ assert_equal [llength [lindex $reply 1]] 1
+ assert_equal [llength [lindex $reply 1 0]] 2
+ assert_equal [llength [lindex $reply 1 0 1]] 2
+ assert_equal [lindex $reply 1 0 1] {a 1}
+
+ # Consumer 1 reads another 2 items from stream
+ r XREADGROUP GROUP mygroup consumer1 count 2 STREAMS mystream >
+
+ # For min-idle-time
+ after 200
+
+ # Delete item 2 from the stream. Now consumer 1 has PEL that contains
+ # only item 3. Try to use consumer 2 to claim the deleted item 2
+ # from the PEL of consumer 1, this should return nil
+ r XDEL mystream $id2
+ set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - COUNT 2]
+ # id1 is self-claimed here but not id2 ('count' was set to 2)
+ assert_equal [llength $reply] 2
+ assert_equal [lindex $reply 0] $id2
+ assert_equal [llength [lindex $reply 1]] 2
+ assert_equal [llength [lindex $reply 1 0]] 2
+ assert_equal [llength [lindex $reply 1 0 1]] 2
+ assert_equal [lindex $reply 1 0 1] {a 1}
+ assert_equal [lindex $reply 1 1] ""
+
+ # Delete item 3 from the stream. Now consumer 1 has PEL that is empty.
+ # Try to use consumer 2 to claim the deleted item 3 from the PEL
+ # of consumer 1, this should return nil
+ after 200
+ r XDEL mystream $id3
+ set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - JUSTID]
+ # id1 is self-claimed here but not id2 and id3 ('count' is default 100)
+
+ # we also test the JUSTID modifier here. note that, when using JUSTID,
+ # deleted entries are returned in reply (consistent with XCLAIM).
+
+ assert_equal [llength $reply] 2
+ assert_equal [lindex $reply 0] "0-0"
+ assert_equal [llength [lindex $reply 1]] 3
+ assert_equal [lindex $reply 1 0] $id1
+ assert_equal [lindex $reply 1 1] $id2
+ assert_equal [lindex $reply 1 2] $id3
+ }
+
+ test {XAUTOCLAIM as an iterator} {
+ # Add 5 items into the stream, and create a consumer group
+ r del mystream
+ set id1 [r XADD mystream * a 1]
+ set id2 [r XADD mystream * b 2]
+ set id3 [r XADD mystream * c 3]
+ set id4 [r XADD mystream * d 4]
+ set id5 [r XADD mystream * e 5]
+ r XGROUP CREATE mystream mygroup 0
+
+ # Read 5 messages into consumer1
+ r XREADGROUP GROUP mygroup consumer1 count 90 STREAMS mystream >
+
+ # For min-idle-time
+ after 200
+
+ # Claim 2 entries
+ set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - COUNT 2]
+ assert_equal [llength $reply] 2
+ set cursor [lindex $reply 0]
+ assert_equal $cursor $id2
+ assert_equal [llength [lindex $reply 1]] 2
+ assert_equal [llength [lindex $reply 1 0 1]] 2
+ assert_equal [lindex $reply 1 0 1] {a 1}
+
+ # Claim 2 more entries
+ set reply [r XAUTOCLAIM mystream mygroup consumer2 10 ($cursor COUNT 2]
+ assert_equal [llength $reply] 2
+ set cursor [lindex $reply 0]
+ assert_equal $cursor $id4
+ assert_equal [llength [lindex $reply 1]] 2
+ assert_equal [llength [lindex $reply 1 0 1]] 2
+ assert_equal [lindex $reply 1 0 1] {c 3}
+
+ # Claim last entry
+ set reply [r XAUTOCLAIM mystream mygroup consumer2 10 ($cursor COUNT 2]
+ assert_equal [llength $reply] 2
+ set cursor [lindex $reply 0]
+ assert_equal $cursor {0-0}
+ assert_equal [llength [lindex $reply 1]] 1
+ assert_equal [llength [lindex $reply 1 0 1]] 2
+ assert_equal [lindex $reply 1 0 1] {e 5}
}
test {XINFO FULL output} {
@@ -477,7 +582,7 @@ start_server {
assert {$curr_grpinfo == $grpinfo}
set n_consumers [lindex $grpinfo 3]
- # Bob should be created only when there will be new data for this client
+ # Bob should be created only when there will be new data for this consumer
assert_equal $n_consumers 2
set reply [r xinfo consumers mystream mygroup]
set consumer_info [lindex $reply 0]