summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2018-02-21 10:51:43 +0100
committerantirez <antirez@gmail.com>2018-03-15 12:54:10 +0100
commit8727b4845bc1b8eb5cf6e725aad3329d4b4f9fe5 (patch)
tree3130bdf459052c3da17b59aadfee695891e006b8
parent09e3b3b97588e9847fc016dbad489bbe8b46aa39 (diff)
downloadredis-8727b4845bc1b8eb5cf6e725aad3329d4b4f9fe5.tar.gz
CG: XCLAIM, use minidle and fix array len.
-rw-r--r--src/t_stream.c22
1 files changed, 18 insertions, 4 deletions
diff --git a/src/t_stream.c b/src/t_stream.c
index e4cafd6ae..c97bf7145 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -1602,7 +1602,8 @@ void xpendingCommand(client *c) {
*
* If the message ID (among the specified ones) exists, and its idle
* time greater or equal to <min-idle-time>, then the message new owner
- * becomes the specified <consumer>.
+ * becomes the specified <consumer>. If the minimum idle time specified
+ * is zero, messages are claimed regardless of their idle time.
*
* All the messages that cannot be found inside the pending entires list
* are ignored, but in case the FORCE option is used. In that case we
@@ -1688,7 +1689,7 @@ void xclaimCommand(client *c) {
/* If we stopped because some IDs cannot be parsed, perhaps they
* are trailing options. */
- time_t now = 0;
+ time_t now = mstime();
for (; j < c->argc; j++) {
int moreargs = (c->argc-1) - j; /* Number of additional arguments. */
char *opt = c->argv[j]->ptr;
@@ -1701,7 +1702,6 @@ void xclaimCommand(client *c) {
if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime,
"Invalid IDLE option argument for XCLAIM")
!= C_OK) return;
- now = mstime();
deliverytime = now - deliverytime;
} else if (!strcasecmp(opt,"TIME") && moreargs) {
j++;
@@ -1720,7 +1720,14 @@ void xclaimCommand(client *c) {
}
if (deliverytime != -1) {
- now = (now == 0) ? mstime() : now;
+ /* If a delivery time was passed, either with IDLE or TIME, we
+ * do some sanity check on it, and set the deliverytime to now
+ * (which is a sane choice usually) if the value is bogus.
+ *
+ * We could raise an error here, but it's not a sensible choice
+ * because the client may use it's local clock to compute the
+ * time, and in case of desynchronizations to fail is not a good
+ * idea most of the times. */
if (deliverytime < 0 || deliverytime > now) deliverytime = now;
}
@@ -1737,6 +1744,12 @@ void xclaimCommand(client *c) {
/* Lookup the ID in the group PEL. */
streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));
if (nack != raxNotFound) {
+ /* We need to check if the minimum idle time requested
+ * by the caller is satisfied by this entry. */
+ if (minidle) {
+ mstime_t this_idle = now - nack->delivery_time;
+ if (this_idle < minidle) continue;
+ }
/* Remove the entry from the old consumer. */
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
/* Update the consumer. */
@@ -1750,6 +1763,7 @@ void xclaimCommand(client *c) {
streamReplyWithRange(c,o->ptr,&id,NULL,1,0,NULL,NULL,
STREAM_RWR_RAWENTRIES);
}
+ arraylen++;
}
}
setDeferredMultiBulkLength(c,arraylenptr,arraylen);