diff options
author | guybe7 <guy.benoish@redislabs.com> | 2021-01-06 10:34:27 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-06 10:34:27 +0200 |
commit | 714e103ac317bfa179b0a132c0f78d4ddc84a435 (patch) | |
tree | fbeabf367b4aadc98398cf151a6945034baae290 /src/t_stream.c | |
parent | 595ecd5f4be39eeec71fb07f687b2d6b7cf5c20c (diff) | |
download | redis-714e103ac317bfa179b0a132c0f78d4ddc84a435.tar.gz |
Add XAUTOCLAIM (#7973)
New command: XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT <count>] [JUSTID]
The purpose is to claim entries from a stale consumer without the usual
XPENDING+XCLAIM combo which takes two round trips.
The syntax for XAUTOCLAIM is similar to scan: A cursor is returned (streamID)
by each call and should be used as start for the next call. 0-0 means the scan is complete.
This PR extends the deferred reply mechanism for any bulk string (not just counts)
This PR carries some unrelated test code changes:
- Renames the term "client" into "consumer" in the stream-cgroups test
- And also changes DEBUG SLEEP into "after"
Co-authored-by: Oran Agra <oran@redislabs.com>
Diffstat (limited to 'src/t_stream.c')
-rw-r--r-- | src/t_stream.c | 159 |
1 files changed, 159 insertions, 0 deletions
diff --git a/src/t_stream.c b/src/t_stream.c index 46918bae8..d085a8948 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -960,6 +960,11 @@ void addReplyStreamID(client *c, streamID *id) { addReplyBulkSds(c,replyid); } +void setDeferredReplyStreamID(client *c, void *dr, streamID *id) { + sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq); + setDeferredReplyBulkSds(c, dr, replyid); +} + /* Similar to the above function, but just creates an object, usually useful * for replication purposes to create arguments. */ robj *createObjectFromStreamID(streamID *id) { @@ -2666,6 +2671,160 @@ cleanup: if (ids != static_ids) zfree(ids); } +/* XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT <count>] [JUSTID] + * + * Gets ownership of one or multiple messages in the Pending Entries List + * of a given stream consumer group. + * + * For each PEL entry, if its idle time greater or equal to <min-idle-time>, + * then the message new owner becomes the specified <consumer>. + * If the minimum idle time specified is zero, messages are claimed + * regardless of their idle time. + * + * This command creates the consumer as side effect if it does not yet + * exists. Moreover the command reset the idle time of the message to 0. + * + * The command returns an array of messages that the user + * successfully claimed, so that the caller is able to understand + * what messages it is now in charge of. */ +void xautoclaimCommand(client *c) { + streamCG *group = NULL; + robj *o = lookupKeyRead(c->db,c->argv[1]); + long long minidle; /* Minimum idle time argument, in milliseconds. */ + long count = 100; /* Maximum entries to claim. */ + streamID startid; + int startex; + int justid = 0; + + /* Parse idle/start/end/count arguments ASAP if needed, in order to report + * syntax errors before any other error. */ + if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle,"Invalid min-idle-time argument for XAUTOCLAIM") != C_OK) + return; + if (minidle < 0) minidle = 0; + + if (streamParseIntervalIDOrReply(c,c->argv[5],&startid,&startex,0) != C_OK) + return; + if (startex && streamIncrID(&startid) != C_OK) { + addReplyError(c,"invalid start ID for the interval"); + return; + } + + int j = 6; /* options start at argv[6] */ + while(j < c->argc) { + int moreargs = (c->argc-1) - j; /* Number of additional arguments. */ + char *opt = c->argv[j]->ptr; + if (!strcasecmp(opt,"COUNT") && moreargs) { + if (getPositiveLongFromObjectOrReply(c,c->argv[j+1],&count,NULL) != C_OK) + return; + if (count == 0) { + addReplyError(c,"COUNT must be > 0"); + return; + } + j++; + } else if (!strcasecmp(opt,"JUSTID")) { + justid = 1; + } else { + addReplyErrorObject(c,shared.syntaxerr); + return; + } + j++; + } + + if (o) { + if (checkType(c,o,OBJ_STREAM)) + return; /* Type error. */ + group = streamLookupCG(o->ptr,c->argv[2]->ptr); + } + + /* No key or group? Send an error given that the group creation + * is mandatory. */ + if (o == NULL || group == NULL) { + addReplyErrorFormat(c,"-NOGROUP No such key '%s' or consumer group '%s'", + (char*)c->argv[1]->ptr, + (char*)c->argv[2]->ptr); + return; + } + + /* Do the actual claiming. */ + streamConsumer *consumer = NULL; + long long attempts = count*10; + + addReplyArrayLen(c, 2); + void *endidptr = addReplyDeferredLen(c); + void *arraylenptr = addReplyDeferredLen(c); + + unsigned char startkey[sizeof(streamID)]; + streamEncodeID(startkey,&startid); + raxIterator ri; + raxStart(&ri,group->pel); + raxSeek(&ri,">=",startkey,sizeof(startkey)); + size_t arraylen = 0; + mstime_t now = mstime(); + while (attempts-- && count && raxNext(&ri)) { + streamNACK *nack = ri.data; + + if (minidle) { + mstime_t this_idle = now - nack->delivery_time; + if (this_idle < minidle) + continue; + } + + streamID id; + streamDecodeID(ri.key, &id); + + if (consumer == NULL) + consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE,NULL); + if (nack->consumer != consumer) { + /* Remove the entry from the old consumer. + * Note that nack->consumer is NULL if we created the + * NACK above because of the FORCE option. */ + if (nack->consumer) + raxRemove(nack->consumer->pel,ri.key,ri.key_len,NULL); + } + + /* Update the consumer and idle time. */ + nack->delivery_time = now; + nack->delivery_count++; + + if (nack->consumer != consumer) { + /* Add the entry in the new consumer local PEL. */ + raxInsert(consumer->pel,ri.key,ri.key_len,nack,NULL); + nack->consumer = consumer; + } + + /* Send the reply for this entry. */ + if (justid) { + addReplyStreamID(c,&id); + } else { + size_t emitted = + streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL, + STREAM_RWR_RAWENTRIES,NULL); + if (!emitted) + addReplyNull(c); + } + arraylen++; + count--; + + /* Propagate this change. */ + robj *idstr = createObjectFromStreamID(&id); + streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],idstr,nack); + decrRefCount(idstr); + server.dirty++; + } + + streamID endid; + if (raxEOF(&ri)) { + endid.ms = endid.seq = 0; + } else { + streamDecodeID(ri.key, &endid); + } + raxStop(&ri); + + setDeferredArrayLen(c,arraylenptr,arraylen); + setDeferredReplyStreamID(c,endidptr,&endid); + + preventCommandPropagation(c); +} /* XDEL <key> [<ID1> <ID2> ... <IDN>] * |