summaryrefslogtreecommitdiff
path: root/src/t_stream.c
diff options
context:
space:
mode:
authorguybe7 <guy.benoish@redislabs.com>2021-01-06 10:34:27 +0200
committerGitHub <noreply@github.com>2021-01-06 10:34:27 +0200
commit714e103ac317bfa179b0a132c0f78d4ddc84a435 (patch)
treefbeabf367b4aadc98398cf151a6945034baae290 /src/t_stream.c
parent595ecd5f4be39eeec71fb07f687b2d6b7cf5c20c (diff)
downloadredis-714e103ac317bfa179b0a132c0f78d4ddc84a435.tar.gz
Add XAUTOCLAIM (#7973)
New command: XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT <count>] [JUSTID] The purpose is to claim entries from a stale consumer without the usual XPENDING+XCLAIM combo which takes two round trips. The syntax for XAUTOCLAIM is similar to scan: A cursor is returned (streamID) by each call and should be used as start for the next call. 0-0 means the scan is complete. This PR extends the deferred reply mechanism for any bulk string (not just counts) This PR carries some unrelated test code changes: - Renames the term "client" into "consumer" in the stream-cgroups test - And also changes DEBUG SLEEP into "after" Co-authored-by: Oran Agra <oran@redislabs.com>
Diffstat (limited to 'src/t_stream.c')
-rw-r--r--src/t_stream.c159
1 files changed, 159 insertions, 0 deletions
diff --git a/src/t_stream.c b/src/t_stream.c
index 46918bae8..d085a8948 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -960,6 +960,11 @@ void addReplyStreamID(client *c, streamID *id) {
addReplyBulkSds(c,replyid);
}
+void setDeferredReplyStreamID(client *c, void *dr, streamID *id) {
+ sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq);
+ setDeferredReplyBulkSds(c, dr, replyid);
+}
+
/* Similar to the above function, but just creates an object, usually useful
* for replication purposes to create arguments. */
robj *createObjectFromStreamID(streamID *id) {
@@ -2666,6 +2671,160 @@ cleanup:
if (ids != static_ids) zfree(ids);
}
+/* XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT <count>] [JUSTID]
+ *
+ * Gets ownership of one or multiple messages in the Pending Entries List
+ * of a given stream consumer group.
+ *
+ * For each PEL entry, if its idle time greater or equal to <min-idle-time>,
+ * then the message new owner becomes the specified <consumer>.
+ * If the minimum idle time specified is zero, messages are claimed
+ * regardless of their idle time.
+ *
+ * This command creates the consumer as side effect if it does not yet
+ * exists. Moreover the command reset the idle time of the message to 0.
+ *
+ * The command returns an array of messages that the user
+ * successfully claimed, so that the caller is able to understand
+ * what messages it is now in charge of. */
+void xautoclaimCommand(client *c) {
+ streamCG *group = NULL;
+ robj *o = lookupKeyRead(c->db,c->argv[1]);
+ long long minidle; /* Minimum idle time argument, in milliseconds. */
+ long count = 100; /* Maximum entries to claim. */
+ streamID startid;
+ int startex;
+ int justid = 0;
+
+ /* Parse idle/start/end/count arguments ASAP if needed, in order to report
+ * syntax errors before any other error. */
+ if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle,"Invalid min-idle-time argument for XAUTOCLAIM") != C_OK)
+ return;
+ if (minidle < 0) minidle = 0;
+
+ if (streamParseIntervalIDOrReply(c,c->argv[5],&startid,&startex,0) != C_OK)
+ return;
+ if (startex && streamIncrID(&startid) != C_OK) {
+ addReplyError(c,"invalid start ID for the interval");
+ return;
+ }
+
+ int j = 6; /* options start at argv[6] */
+ while(j < c->argc) {
+ int moreargs = (c->argc-1) - j; /* Number of additional arguments. */
+ char *opt = c->argv[j]->ptr;
+ if (!strcasecmp(opt,"COUNT") && moreargs) {
+ if (getPositiveLongFromObjectOrReply(c,c->argv[j+1],&count,NULL) != C_OK)
+ return;
+ if (count == 0) {
+ addReplyError(c,"COUNT must be > 0");
+ return;
+ }
+ j++;
+ } else if (!strcasecmp(opt,"JUSTID")) {
+ justid = 1;
+ } else {
+ addReplyErrorObject(c,shared.syntaxerr);
+ return;
+ }
+ j++;
+ }
+
+ if (o) {
+ if (checkType(c,o,OBJ_STREAM))
+ return; /* Type error. */
+ group = streamLookupCG(o->ptr,c->argv[2]->ptr);
+ }
+
+ /* No key or group? Send an error given that the group creation
+ * is mandatory. */
+ if (o == NULL || group == NULL) {
+ addReplyErrorFormat(c,"-NOGROUP No such key '%s' or consumer group '%s'",
+ (char*)c->argv[1]->ptr,
+ (char*)c->argv[2]->ptr);
+ return;
+ }
+
+ /* Do the actual claiming. */
+ streamConsumer *consumer = NULL;
+ long long attempts = count*10;
+
+ addReplyArrayLen(c, 2);
+ void *endidptr = addReplyDeferredLen(c);
+ void *arraylenptr = addReplyDeferredLen(c);
+
+ unsigned char startkey[sizeof(streamID)];
+ streamEncodeID(startkey,&startid);
+ raxIterator ri;
+ raxStart(&ri,group->pel);
+ raxSeek(&ri,">=",startkey,sizeof(startkey));
+ size_t arraylen = 0;
+ mstime_t now = mstime();
+ while (attempts-- && count && raxNext(&ri)) {
+ streamNACK *nack = ri.data;
+
+ if (minidle) {
+ mstime_t this_idle = now - nack->delivery_time;
+ if (this_idle < minidle)
+ continue;
+ }
+
+ streamID id;
+ streamDecodeID(ri.key, &id);
+
+ if (consumer == NULL)
+ consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE,NULL);
+ if (nack->consumer != consumer) {
+ /* Remove the entry from the old consumer.
+ * Note that nack->consumer is NULL if we created the
+ * NACK above because of the FORCE option. */
+ if (nack->consumer)
+ raxRemove(nack->consumer->pel,ri.key,ri.key_len,NULL);
+ }
+
+ /* Update the consumer and idle time. */
+ nack->delivery_time = now;
+ nack->delivery_count++;
+
+ if (nack->consumer != consumer) {
+ /* Add the entry in the new consumer local PEL. */
+ raxInsert(consumer->pel,ri.key,ri.key_len,nack,NULL);
+ nack->consumer = consumer;
+ }
+
+ /* Send the reply for this entry. */
+ if (justid) {
+ addReplyStreamID(c,&id);
+ } else {
+ size_t emitted =
+ streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,
+ STREAM_RWR_RAWENTRIES,NULL);
+ if (!emitted)
+ addReplyNull(c);
+ }
+ arraylen++;
+ count--;
+
+ /* Propagate this change. */
+ robj *idstr = createObjectFromStreamID(&id);
+ streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],idstr,nack);
+ decrRefCount(idstr);
+ server.dirty++;
+ }
+
+ streamID endid;
+ if (raxEOF(&ri)) {
+ endid.ms = endid.seq = 0;
+ } else {
+ streamDecodeID(ri.key, &endid);
+ }
+ raxStop(&ri);
+
+ setDeferredArrayLen(c,arraylenptr,arraylen);
+ setDeferredReplyStreamID(c,endidptr,&endid);
+
+ preventCommandPropagation(c);
+}
/* XDEL <key> [<ID1> <ID2> ... <IDN>]
*