summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2018-01-24 18:51:23 +0100
committerantirez <antirez@gmail.com>2018-03-15 12:54:10 +0100
commitaa808394f64ce735311f63bea1a142d5fab9bc82 (patch)
tree6c55764fc5543fce465020c3678f75ba7623af1a
parentbbec4569a57df0ee14e64f64caa6bb0b12c7f9db (diff)
downloadredis-aa808394f64ce735311f63bea1a142d5fab9bc82.tar.gz
CG: first draft of streamReplyWithRangeFromConsumerPEL().
-rw-r--r--src/stream.h2
-rw-r--r--src/t_stream.c101
2 files changed, 81 insertions, 22 deletions
diff --git a/src/stream.h b/src/stream.h
index fa6947482..917392076 100644
--- a/src/stream.h
+++ b/src/stream.h
@@ -89,7 +89,7 @@ struct client;
stream *streamNew(void);
void freeStream(stream *s);
-size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int noack);
+size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags);
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev);
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
diff --git a/src/t_stream.c b/src/t_stream.c
index 1d1dd8943..0277f72a9 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -44,6 +44,7 @@ void streamFreeCG(streamCG *cg);
streamCG *streamLookupCG(stream *s, sds groupname);
streamConsumer *streamLookupConsumer(streamCG *cg, sds name);
streamNACK *streamCreateNACK(streamConsumer *consumer);
+size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamCG *group, streamConsumer *consumer);
/* -----------------------------------------------------------------------
* Low level stream encoding: a radix tree of listpacks.
@@ -666,16 +667,12 @@ void streamIteratorStop(streamIterator *si) {
raxStop(&si->ri);
}
-/* This is an helper function for streamReplyWithRange() when called with
- * group and consumer arguments, but with a range that is referring to already
- * delivered messages. In this case we just emit messages that are already
- * in the history of the conusmer, fetching the IDs from its PEL.
- *
- * Note that this function does not have a 'rev' argument because it's not
- * possible to iterate in reverse using a group. Basically this function
- * is only called as a result of the XREADGROUP command. */
-size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamCG *group, streamConsumer *consumer) {
- /* TODO: update the last time delivery and delivery count. */
+/* Emit a reply in the client output buffer by formatting a Stream ID
+ * in the standard <ms>-<seq> format, using the simple string protocol
+ * of REPL. */
+void addReplyStreamID(client *c, streamID *id) {
+ sds replyid = sdscatfmt(sdsempty(),"+%U-%U\r\n",id->ms,id->seq);
+ addReplySds(c,replyid);
}
/* Send the specified range to the client 'c'. The range the client will
@@ -690,11 +687,30 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start
* 2. If the requested IDs are already assigned to some other consumer, the
* function will not return it to the client.
* 3. An entry in the pending list will be created for every entry delivered
- * for the first time to this consumer. This is only performed if
- * 'noack' is non-zero.
+ * for the first time to this consumer.
+ *
+ * The behavior may be modified passing non-zero flags:
+ *
+ * STREAM_RWR_NOACK: Do not craete PEL entries, that is, the point "3" above
+ * is not performed.
+ * STREAM_RWR_RAWENTRIES: Do not emit array boundaries, but just the entries,
+ * and return the number of entries emitted as usually.
+ * This is used when the function is just used in order
+ * to emit data and there is some higher level logic.
+ *
+ * Note that this function is recursive in certian cases. When it's called
+ * with a non NULL group and consumer argument, it may call
+ * streamReplyWithRangeFromConsumerPEL() in order to get entries from the
+ * consumer pending entires list. However such a function will then call
+ * streamReplyWithRange() in order to emit single entries (found in the
+ * PEL by ID) to the client. This is the use case for the STREAM_RWR_RAWENTRIES
+ * flag.
*/
-size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int noack) {
- void *arraylen_ptr;
+#define STREAM_RWR_NOACK (1<<0) /* Do not create entries in the PEL. */
+#define STREAM_RWR_RAWENTRIES (1<<1) /* Do not emit protocol for array
+ boundaries, just the entries. */
+size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags) {
+ void *arraylen_ptr = NULL;
size_t arraylen = 0;
streamIterator si;
int64_t numfields;
@@ -713,7 +729,8 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
group,consumer);
}
- arraylen_ptr = addDeferredMultiBulkLength(c);
+ if (!(flags & STREAM_RWR_RAWENTRIES))
+ arraylen_ptr = addDeferredMultiBulkLength(c);
streamIteratorStart(&si,s,start,end,rev);
while(streamIteratorGetID(&si,&id,&numfields)) {
/* Update the group last_id if needed. */
@@ -722,9 +739,8 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
/* Emit a two elements array for each item. The first is
* the ID, the second is an array of field-value pairs. */
- sds replyid = sdscatfmt(sdsempty(),"+%U-%U\r\n",id.ms,id.seq);
addReplyMultiBulkLen(c,2);
- addReplySds(c,replyid);
+ addReplyStreamID(c,&id);
addReplyMultiBulkLen(c,numfields*2);
/* Emit the field-value pairs. */
@@ -744,7 +760,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
* associated with some other consumer, because if we reached this
* loop the IDs the user is requesting are greater than any message
* delivered for this group. */
- if (group && !noack) {
+ if (group && !(flags & STREAM_RWR_NOACK)) {
unsigned char buf[sizeof(streamID)];
streamEncodeID(buf,&id);
streamNACK *nack = streamCreateNACK(consumer);
@@ -755,10 +771,54 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
}
}
streamIteratorStop(&si);
- setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
+ if (arraylen_ptr) setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
return arraylen;
}
+/* This is an helper function for streamReplyWithRange() when called with
+ * group and consumer arguments, but with a range that is referring to already
+ * delivered messages. In this case we just emit messages that are already
+ * in the history of the conusmer, fetching the IDs from its PEL.
+ *
+ * Note that this function does not have a 'rev' argument because it's not
+ * possible to iterate in reverse using a group. Basically this function
+ * is only called as a result of the XREADGROUP command.
+ *
+ * This function is more expensive because it needs to inspect the PEL and then
+ * seek into the radix tree of the messages in order to emit the full message
+ * to the client. However clients only reach this code path when they are
+ * fetching the history of already retrieved messages, which is rare. */
+size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamCG *group, streamConsumer *consumer) {
+ raxIterator ri;
+ unsigned char startkey[sizeof(streamID)];
+ unsigned char endkey[sizeof(streamID)];
+ streamEncodeID(startkey,start);
+ if (end) streamEncodeID(endkey,start);
+
+ size_t arraylen = 0;
+ void *arraylen_ptr = addDeferredMultiBulkLength(c);
+ raxStart(&ri,consumer->pel);
+ raxSeek(&ri,">=",startkey,sizeof(startkey));
+ while(raxNext(&ri)) {
+ if (end && memcmp(end,ri.key,ri.key_len) > 0) break;
+ if (streamReplyWithRange(c,s,start,end,1,0,NULL,NULL,
+ STREAM_RWR_RAWENTRIES) == 0)
+ {
+ /* Note that we may have a not acknowledged entry in the PEL
+ * about a message that's no longer here because was removed
+ * by the user by other means. In that case we signal it emitting
+ * the ID but then a NULL entry for the fields. */
+ addReplyMultiBulkLen(c,2);
+ streamID id;
+ streamDecodeID(ri.key,&id);
+ addReplyStreamID(c,&id);
+ addReply(c,shared.nullmultibulk);
+ }
+ }
+ raxStop(&ri);
+ setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
+}
+
/* -----------------------------------------------------------------------
* Stream commands implementation
* ----------------------------------------------------------------------- */
@@ -904,8 +964,7 @@ void xaddCommand(client *c) {
"target stream top item");
return;
}
- sds reply = sdscatfmt(sdsempty(),"+%U-%U\r\n",id.ms,id.seq);
- addReplySds(c,reply);
+ addReplyStreamID(c,&id);
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);