diff options
author | antirez <antirez@gmail.com> | 2018-02-21 10:51:43 +0100 |
---|---|---|
committer | antirez <antirez@gmail.com> | 2018-03-15 12:54:10 +0100 |
commit | 8727b4845bc1b8eb5cf6e725aad3329d4b4f9fe5 (patch) | |
tree | 3130bdf459052c3da17b59aadfee695891e006b8 | |
parent | 09e3b3b97588e9847fc016dbad489bbe8b46aa39 (diff) | |
download | redis-8727b4845bc1b8eb5cf6e725aad3329d4b4f9fe5.tar.gz |
CG: XCLAIM, use minidle and fix array len.
-rw-r--r-- | src/t_stream.c | 22 |
1 files changed, 18 insertions, 4 deletions
diff --git a/src/t_stream.c b/src/t_stream.c index e4cafd6ae..c97bf7145 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1602,7 +1602,8 @@ void xpendingCommand(client *c) { * * If the message ID (among the specified ones) exists, and its idle * time greater or equal to <min-idle-time>, then the message new owner - * becomes the specified <consumer>. + * becomes the specified <consumer>. If the minimum idle time specified + * is zero, messages are claimed regardless of their idle time. * * All the messages that cannot be found inside the pending entires list * are ignored, but in case the FORCE option is used. In that case we @@ -1688,7 +1689,7 @@ void xclaimCommand(client *c) { /* If we stopped because some IDs cannot be parsed, perhaps they * are trailing options. */ - time_t now = 0; + time_t now = mstime(); for (; j < c->argc; j++) { int moreargs = (c->argc-1) - j; /* Number of additional arguments. */ char *opt = c->argv[j]->ptr; @@ -1701,7 +1702,6 @@ void xclaimCommand(client *c) { if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime, "Invalid IDLE option argument for XCLAIM") != C_OK) return; - now = mstime(); deliverytime = now - deliverytime; } else if (!strcasecmp(opt,"TIME") && moreargs) { j++; @@ -1720,7 +1720,14 @@ void xclaimCommand(client *c) { } if (deliverytime != -1) { - now = (now == 0) ? mstime() : now; + /* If a delivery time was passed, either with IDLE or TIME, we + * do some sanity check on it, and set the deliverytime to now + * (which is a sane choice usually) if the value is bogus. + * + * We could raise an error here, but it's not a sensible choice + * because the client may use it's local clock to compute the + * time, and in case of desynchronizations to fail is not a good + * idea most of the times. */ if (deliverytime < 0 || deliverytime > now) deliverytime = now; } @@ -1737,6 +1744,12 @@ void xclaimCommand(client *c) { /* Lookup the ID in the group PEL. */ streamNACK *nack = raxFind(group->pel,buf,sizeof(buf)); if (nack != raxNotFound) { + /* We need to check if the minimum idle time requested + * by the caller is satisfied by this entry. */ + if (minidle) { + mstime_t this_idle = now - nack->delivery_time; + if (this_idle < minidle) continue; + } /* Remove the entry from the old consumer. */ raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); /* Update the consumer. */ @@ -1750,6 +1763,7 @@ void xclaimCommand(client *c) { streamReplyWithRange(c,o->ptr,&id,NULL,1,0,NULL,NULL, STREAM_RWR_RAWENTRIES); } + arraylen++; } } setDeferredMultiBulkLength(c,arraylenptr,arraylen); |