summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2018-01-23 18:52:24 +0100
committerantirez <antirez@gmail.com>2018-03-15 12:54:10 +0100
commit41809fd969cb763c0323124aefaaaa2f5ab77839 (patch)
treedd63cee43f261cc9c8381c9378abc4d9608c7f74
parent1ffb6723f52a1473385b7284ec0e53733083ddab (diff)
downloadredis-41809fd969cb763c0323124aefaaaa2f5ab77839.tar.gz
CG: creation of NACK entries in PELs.
-rw-r--r--src/blocked.c3
-rw-r--r--src/stream.h10
-rw-r--r--src/t_stream.c86
3 files changed, 75 insertions, 24 deletions
diff --git a/src/blocked.c b/src/blocked.c
index 371f243bb..0bbbe6c6a 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -327,8 +327,7 @@ void handleClientsBlockedOnKeys(void) {
addReplyBulk(receiver,rl->key);
streamReplyWithRange(receiver,s,&start,NULL,
receiver->bpop.xread_count,0,
- NULL,
- NULL);
+ NULL,NULL,0);
}
}
}
diff --git a/src/stream.h b/src/stream.h
index 91bdbee5d..fa6947482 100644
--- a/src/stream.h
+++ b/src/stream.h
@@ -55,7 +55,7 @@ typedef struct streamCG {
the NOACK option) that was yet not acknowledged
as processed. The key of the radix tree is the
ID as a 64 bit big endian number, while the
- associated value is a streamNotAcked structure.*/
+ associated value is a streamNACK structure.*/
rax *consumers; /* A radix tree representing the consumers by name
and their associated representation in the form
of streamConsumer structures. */
@@ -71,25 +71,25 @@ typedef struct streamConsumer {
the pending messages delivered to this
consumer not yet acknowledged. Keys are
big endian message IDs, while values are
- the same streamNotAcked structure referenced
+ the same streamNACK structure referenced
in the "pel" of the conumser group structure
itself, so the value is shared. */
} streamConsumer;
/* Pending (yet not acknowledged) message in a consumer group. */
-typedef struct streamNotAcked {
+typedef struct streamNACK {
mstime_t delivery_time; /* Last time this message was delivered. */
uint64_t delivery_count; /* Number of times this message was delivered.*/
streamConsumer *consumer; /* The consumer this message was delivered to
in the last delivery. */
-} streamNotAcked;
+} streamNACK;
/* Prototypes of exported APIs. */
struct client;
stream *streamNew(void);
void freeStream(stream *s);
-size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer);
+size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int noack);
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev);
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
diff --git a/src/t_stream.c b/src/t_stream.c
index 4d9c45548..605d3251b 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -43,6 +43,7 @@
void streamFreeCG(streamCG *cg);
streamCG *streamLookupCG(stream *s, sds groupname);
streamConsumer *streamLookupConsumer(streamCG *cg, sds name);
+streamNACK *streamCreateNACK(streamConsumer *consumer);
/* -----------------------------------------------------------------------
* Low level stream encoding: a radix tree of listpacks.
@@ -665,6 +666,18 @@ void streamIteratorStop(streamIterator *si) {
raxStop(&si->ri);
}
+/* This is an helper function for streamReplyWithRange() when called with
+ * group and consumer arguments, but with a range that is referring to already
+ * delivered messages. In this case we just emit messages that are already
+ * in the history of the conusmer, fetching the IDs from its PEL.
+ *
+ * Note that this function does not have a 'rev' argument because it's not
+ * possible to iterate in reverse using a group. Basically this function
+ * is only called as a result of the XREADGROUP command. */
+size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamCG *group, streamConsumer *consumer) {
+ /* TODO: update the last time delivery and delivery count. */
+}
+
/* Send the specified range to the client 'c'. The range the client will
* receive is between start and end inclusive, if 'count' is non zero, no more
* than 'count' elemnets are sent. The 'end' pointer can be NULL to mean that
@@ -678,24 +691,27 @@ void streamIteratorStop(streamIterator *si) {
* function will not return it to the client.
* 3. An entry in the pending list will be created for every entry delivered
* for the first time to this consumer. This is only performed if
- * consumer != NULL, so in order to implement the XREADGROUP NOACK option
- * no consumer is passed to this function.
+ * 'noack' is non-zero.
*/
-size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer) {
+size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int noack) {
void *arraylen_ptr = addDeferredMultiBulkLength(c);
size_t arraylen = 0;
streamIterator si;
int64_t numfields;
streamID id;
- /* If a group was passed, as an optimization we check if the range
- * specified is about messages that were never delivered. This is true if
- * the 'start' range is greater than the current last_id in the stream.
- * In that case, there is no need to check if the messages may already
- * have another owner before delivering the message. This speeds up
- * the processing significantly. */
- int newmessages = group != NULL &&
- streamCompareID(start,&group->last_id) > 0;
+ /* If a group was passed, we check if the request is about messages
+ * never delivered so far (normally this happens when ">" ID is passed).
+ *
+ * If instead the client is asking for some history, we serve it
+ * using a different function, so that we return entries *solely*
+ * from its own PEL. This ensures each consumer will always and only
+ * see the history of messages delivered to it and not yet confirmed
+ * as delivered. */
+ if (group && streamCompareID(start,&group->last_id) <= 0) {
+ return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count,
+ group,consumer);
+ }
streamIteratorStart(&si,s,start,end,rev);
while(streamIteratorGetID(&si,&id,&numfields)) {
@@ -720,6 +736,22 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
}
arraylen++;
if (count && count == arraylen) break;
+
+ /* If a group is passed, we need to create an entry in the
+ * PEL (pending entries list) of this group *and* this consumer.
+ * Note that we are sure about the fact the message is not already
+ * associated with some other consumer, because if we reached this
+ * loop the IDs the user is requesting are greater than any message
+ * delivered for this group. */
+ if (group && !noack) {
+ unsigned char buf[sizeof(streamID)];
+ streamEncodeID(buf,&id);
+ streamNACK *nack = streamCreateNACK(consumer);
+ int retval = 0;
+ retval += raxInsert(group->pel,buf,sizeof(buf),nack,NULL);
+ retval += raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
+ serverAssert(retval == 2); /* Make sure entry was inserted. */
+ }
}
streamIteratorStop(&si);
setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
@@ -937,7 +969,7 @@ void xrangeGenericCommand(client *c, int rev) {
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
|| checkType(c,o,OBJ_STREAM)) return;
s = o->ptr;
- streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL);
+ streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0);
}
/* XRANGE key start end [COUNT <n>] */
@@ -972,6 +1004,7 @@ void xreadCommand(client *c) {
long long count = 0;
int streams_count = 0;
int streams_arg = 0;
+ int noack = 0; /* True if NOACK option was specified. */
#define STREAMID_STATIC_VECTOR_LEN 8
streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
streamID *ids = static_ids;
@@ -1013,6 +1046,13 @@ void xreadCommand(client *c) {
groupname = c->argv[i+1];
consumername = c->argv[i+2];
i += 2;
+ } else if (!strcasecmp(o,"NOACK")) {
+ if (!xreadgroup) {
+ addReplyError(c,"The NOACK option is only supported by "
+ "XREADGROUP. You called XREAD instead.");
+ return;
+ }
+ noack = 1;
} else {
addReply(c,shared.syntaxerr);
return;
@@ -1109,7 +1149,7 @@ void xreadCommand(client *c) {
consumername->ptr);
streamReplyWithRange(c,s,&start,NULL,count,0,
groups ? groups[i] : NULL,
- consumer);
+ consumer, noack);
}
}
@@ -1153,7 +1193,19 @@ cleanup:
* Low level implementation of consumer groups
* ----------------------------------------------------------------------- */
-void streamNotAckedFree(streamNotAcked *na) {
+/* Create a NACK entry setting the delivery count to 1 and the delivery
+ * time to the current time. The NACK consumer will be set to the one
+ * specified as argument of the function. */
+streamNACK *streamCreateNACK(streamConsumer *consumer) {
+ streamNACK *nack = zmalloc(sizeof(*nack));
+ nack->delivery_time = mstime();
+ nack->delivery_count = 1;
+ nack->consumer = consumer;
+ return nack;
+}
+
+/* Free a NACK entry. */
+void streamFreeNACK(streamNACK *na) {
zfree(na);
}
@@ -1162,7 +1214,7 @@ void streamNotAckedFree(streamNotAcked *na) {
* nor will delete them from the stream, so when this function is called
* to delete a consumer, and not when the whole stream is destroyed, the caller
* should do some work before. */
-void streamConsumerFree(streamConsumer *sc) {
+void streamFreeConsumer(streamConsumer *sc) {
raxFree(sc->pel); /* No value free callback: the PEL entries are shared
between the consumer and the main stream PEL. */
sdsfree(sc->name);
@@ -1188,8 +1240,8 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID id) {
/* Free a consumer group and all its associated data. */
void streamFreeCG(streamCG *cg) {
- raxFreeWithCallback(cg->pel,(void(*)(void*))streamNotAckedFree);
- raxFreeWithCallback(cg->consumers,(void(*)(void*))streamConsumerFree);
+ raxFreeWithCallback(cg->pel,(void(*)(void*))streamFreeNACK);
+ raxFreeWithCallback(cg->consumers,(void(*)(void*))streamFreeConsumer);
zfree(cg);
}