summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2018-02-16 17:25:35 +0100
committerantirez <antirez@gmail.com>2018-03-15 12:54:10 +0100
commit0a6780e560e9077326eab1bd9dd07e3ccc44b8dc (patch)
tree618fb5e573dbc9d5b42eb0613db9bb5b8485481b
parent00a29b1a81f75dce175ac2199a7e1e9806a476dc (diff)
downloadredis-0a6780e560e9077326eab1bd9dd07e3ccc44b8dc.tar.gz
CG: XCLAIM initial draft.
-rw-r--r--src/t_stream.c166
1 files changed, 163 insertions, 3 deletions
diff --git a/src/t_stream.c b/src/t_stream.c
index 872005949..dc6ac8c61 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -1153,7 +1153,7 @@ void xreadCommand(client *c) {
if (o == NULL ||
(group = streamLookupCG(o->ptr,groupname->ptr)) == NULL)
{
- addReplyErrorFormat(c, "No such key '%s' or consumer "
+ addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer "
"group '%s' in XREADGROUP with GROUP "
"option",
key->ptr,groupname->ptr);
@@ -1491,7 +1491,7 @@ void xpendingCommand(client *c) {
if (o == NULL ||
(group = streamLookupCG(o->ptr,groupname->ptr)) == NULL)
{
- addReplyErrorFormat(c, "No such key '%s' or consumer "
+ addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer "
"group '%s'",
key->ptr,groupname->ptr);
return;
@@ -1593,7 +1593,167 @@ void xpendingCommand(client *c) {
}
}
-/* XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ...*/
+/* XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2>
+ * [IDLE <milliseconds>] [TIME <mstime>] [RETRYCOUNT <count>]
+ * [FORCE] [JUSTID]
+ *
+ * Gets ownership of one or multiple messages in the Pending Entries List
+ * of a given stream consumer group.
+ *
+ * If the message ID (among the specified ones) exists, and its idle
+ * time greater or equal to <min-idle-time>, then the message new owner
+ * becomes the specified <consumer>.
+ *
+ * All the messages that cannot be found inside the pending entires list
+ * are ignored, but in case the FORCE option is used. In that case we
+ * create the NACK (representing a not yet acknowledged message) entry in
+ * the consumer group PEL.
+ *
+ * This command creates the consumer as side effect if it does not yet
+ * exists.
+ *
+ * The options at the end can be used in order to specify more attributes
+ * to set in the representation of the pending message:
+ *
+ * 1. IDLE <ms>:
+ * Set the idle time (last time it was delivered) of the message.
+ * If IDLE is not specified, an IDLE of 0 is assumed, that is,
+ * the time count is reset because the message has now a new
+ * owner trying to process it.
+ *
+ * 2. TIME <ms-unix-time>:
+ * This is the same as IDLE but instead of a relative amount of
+ * milliseconds, it sets the idle time to a specific unix time
+ * (in milliseconds). This is useful in order to rewrite the AOF
+ * file generating XCLAIM commands.
+ *
+ * 3. RETRYCOUNT <count>:
+ * Set the retry counter to the specified value. This counter is
+ * incremented every time a message is delivered again. Normally
+ * XCLAIM does not alter this counter, which is just served to clients
+ * when the XPENDING command is called: this way clients can detect
+ * anomalies, like messages that are never processed for some reason
+ * after a big number of delivery attempts.
+ *
+ * 4. FORCE:
+ * Creates the pending message entry in the PEL even if certain
+ * specified IDs are not already in the PEL assigned to a different
+ * client.
+ *
+ * 5. JUSTID:
+ * Return just an array of IDs of messages successfully claimed,
+ * without returning the actual message.
+ *
+ * The command returns an array of messages that the user
+ * successfully claimed, so that the caller is able to understand
+ * what messages it is now in charge of. */
+void xclaimCommand(client *c) {
+ streamCG *group;
+ robj *o = lookupKeyRead(c->db,c->argv[1]);
+ long long minidle; /* Minimum idle time argument. */
+ long long retrycount = -1; /* -1 means RETRYCOUNT option not given. */
+ mstime_t deliverytime = -1; /* -1 means IDLE/TIME options not given. */
+ int force = 0;
+ int justid = 0;
+
+ if (o) {
+ if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */
+ group = streamLookupCG(o->ptr,c->argv[2]->ptr);
+ }
+
+ /* No key or group? Send an error given that the group creation
+ * is mandatory. */
+ if (o == NULL || group == NULL) {
+ addReplyErrorFormat(c,"-NOGROUP No such key '%s' or "
+ "consumer group '%s'", c->argv[1]->ptr,
+ c->argv[2]->ptr);
+ return;
+ }
+
+ if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle,
+ "Invalid min-idle-time argument for XCLAIM")
+ != C_OK) return;
+ if (minidle < 0) minidle = 0;
+
+ /* Start parsing the IDs, so that we abort ASAP if there is a syntax
+ * error: the return value of this command cannot be an error in case
+ * the client successfully claimed some message, so it should be
+ * executed in a "all or nothing" fashion. */
+ int j;
+ for (j = 4; j < c->argc; j++) {
+ streamID id;
+ if (streamParseIDOrReply(NULL,c->argv[j],&id,0) != C_OK) break;
+ }
+ int last_id_arg = j-1; /* Next time we iterate the IDs we now the range. */
+
+ /* If we stopped because some IDs cannot be parsed, perhaps they
+ * are trailing options. */
+ time_t now = 0;
+ for (; j < c->argc; j++) {
+ int moreargs = (c->argc-1) - j; /* Number of additional arguments. */
+ char *opt = c->argv[j]->ptr;
+ if (!strcasecmp(opt,"FORCE")) {
+ force = 1;
+ } else if (!strcasecmp(opt,"JUSTID")) {
+ justid = 1;
+ } else if (!strcasecmp(opt,"IDLE") && moreargs) {
+ j++;
+ if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime,
+ "Invalid IDLE option argument for XCLAIM")
+ != C_OK) return;
+ now = mstime();
+ deliverytime = now - deliverytime;
+ } else if (!strcasecmp(opt,"TIME") && moreargs) {
+ j++;
+ if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime,
+ "Invalid IDLE option argument for XCLAIM")
+ != C_OK) return;
+ } else if (!strcasecmp(opt,"RETRYCOUNT") && moreargs) {
+ j++;
+ if (getLongLongFromObjectOrReply(c,c->argv[j],&retrycount,
+ "Invalid IDLE option argument for XCLAIM")
+ != C_OK) return;
+ } else {
+ addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt);
+ return;
+ }
+ }
+
+ if (deliverytime != -1) {
+ now = (now == 0) ? mstime() : now;
+ if (deliverytime < 0 || deliverytime > now) deliverytime = now;
+ }
+
+ /* Do the actual claiming. */
+ streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr,1);
+ void *arraylenptr = addDeferredMultiBulkLength(c);
+ size_t arraylen = 0;
+ for (int j = 5; j <= last_id_arg; j++) {
+ streamID id;
+ unsigned char buf[sizeof(streamID)];
+ if (streamParseIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
+ streamEncodeID(buf,&id);
+
+ /* Lookup the ID in the group PEL. */
+ streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));
+ if (nack != raxNotFound) {
+ /* Remove the entry from the old consumer. */
+ raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
+ /* Update the consumer. */
+ nack->consumer = consumer;
+ /* Add the entry in the new cosnumer local PEL. */
+ raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
+ /* Send the reply for this entry. */
+ if (justid) {
+ addReplyStreamID(c,&id);
+ } else {
+ streamReplyWithRange(c,o->ptr,&id,NULL,1,0,NULL,NULL,
+ STREAM_RWR_RAWENTRIES);
+ }
+ }
+ }
+ setDeferredMultiBulkLength(c,arraylenptr,arraylen);
+}
/* XREAD-GROUP will be implemented by xreadGenericCommand() */