summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2018-01-19 17:18:06 +0100
committerantirez <antirez@gmail.com>2018-03-15 12:54:10 +0100
commit86fe8fde2098f912f345e8c9385e678bba8fd2b2 (patch)
tree796e44a2bc04f186829d9f2ef669840543ca018d
parentccdae09046e05dae4d2e23a8a98a3ab7305c8d76 (diff)
downloadredis-86fe8fde2098f912f345e8c9385e678bba8fd2b2.tar.gz
CG: consumer lookup + initial streamReplyWithRange() work to supprot CG.
-rw-r--r--src/blocked.c4
-rw-r--r--src/stream.h4
-rw-r--r--src/t_stream.c44
3 files changed, 44 insertions, 8 deletions
diff --git a/src/blocked.c b/src/blocked.c
index d560a8f38..371f243bb 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -326,7 +326,9 @@ void handleClientsBlockedOnKeys(void) {
addReplyMultiBulkLen(receiver,2);
addReplyBulk(receiver,rl->key);
streamReplyWithRange(receiver,s,&start,NULL,
- receiver->bpop.xread_count,0);
+ receiver->bpop.xread_count,0,
+ NULL,
+ NULL);
}
}
}
diff --git a/src/stream.h b/src/stream.h
index 4b9e68885..91bdbee5d 100644
--- a/src/stream.h
+++ b/src/stream.h
@@ -64,7 +64,7 @@ typedef struct streamCG {
/* A specific consumer in a consumer group. */
typedef struct streamConsumer {
mstime_t seen_time; /* Last time this consumer was active. */
- sds *name; /* Consumer name. This is how the consumer
+ sds name; /* Consumer name. This is how the consumer
will be identified in the consumer group
protocol. Case sensitive. */
rax *pel; /* Consumer specific pending entries list: all
@@ -89,7 +89,7 @@ struct client;
stream *streamNew(void);
void freeStream(stream *s);
-size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count, int rev);
+size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer);
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev);
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
diff --git a/src/t_stream.c b/src/t_stream.c
index 65a926ae1..c51dc94cb 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -42,6 +42,7 @@
void streamFreeCG(streamCG *cg);
streamCG *streamLookupCG(stream *s, sds groupname);
+streamConsumer *streamLookupConsumer(streamCG *cg, sds name);
/* -----------------------------------------------------------------------
* Low level stream encoding: a radix tree of listpacks.
@@ -659,8 +660,19 @@ void streamIteratorStop(streamIterator *si) {
* receive is between start and end inclusive, if 'count' is non zero, no more
* than 'count' elemnets are sent. The 'end' pointer can be NULL to mean that
* we want all the elements from 'start' till the end of the stream. If 'rev'
- * is non zero, elements are produced in reversed order from end to start. */
-size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev) {
+ * is non zero, elements are produced in reversed order from end to start.
+ *
+ * If group and consumer are not NULL, the function performs additional work:
+ * 1. It updates the last delivered ID in the group in case we are
+ * sending IDs greater than the current last ID.
+ * 2. If the requested IDs are already assigned to some other consumer, the
+ * function will not return it to the client.
+ * 3. An entry in the pending list will be created for every entry delivered
+ * for the first time to this consumer. This is only performed if
+ * consumer != NULL, so in order to implement the XREADGROUP NOACK option
+ * no consumer is passed to this function.
+ */
+size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer) {
void *arraylen_ptr = addDeferredMultiBulkLength(c);
size_t arraylen = 0;
streamIterator si;
@@ -903,7 +915,7 @@ void xrangeGenericCommand(client *c, int rev) {
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
|| checkType(c,o,OBJ_STREAM)) return;
s = o->ptr;
- streamReplyWithRange(c,s,&startid,&endid,count,rev);
+ streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL);
}
/* XRANGE key start end [COUNT <n>] */
@@ -1063,13 +1075,18 @@ void xreadCommand(client *c) {
* so start from the next ID, since we want only messages with
* IDs greater than start. */
streamID start = *gt;
- start.seq++; /* Can't overflow, it's an uint64_t */
+ start.seq++; /* uint64_t can't overflow in this context. */
/* Emit the two elements sub-array consisting of the name
* of the stream and the data we extracted from it. */
addReplyMultiBulkLen(c,2);
addReplyBulk(c,c->argv[i+streams_arg]);
- streamReplyWithRange(c,s,&start,NULL,count,0);
+ streamConsumer *consumer = NULL;
+ if (groups) consumer = streamLookupConsumer(groups[i],
+ consumername->ptr);
+ streamReplyWithRange(c,s,&start,NULL,count,0,
+ groups ? groups[i] : NULL,
+ consumer);
}
}
@@ -1118,6 +1135,7 @@ void streamNotAckedFree(streamNotAcked *na) {
}
void streamConsumerFree(streamConsumer *sc) {
+ zfree(sc->name);
zfree(sc);
}
@@ -1154,6 +1172,22 @@ streamCG *streamLookupCG(stream *s, sds groupname) {
return (cg == raxNotFound) ? NULL : cg;
}
+/* Lookup the consumer with the specified name in the group 'cg': if the
+ * consumer does not exist it is automatically created as a side effect
+ * of calling this function, otherwise its last seen time is updated and
+ * the existing consumer reference returned. */
+streamConsumer *streamLookupConsumer(streamCG *cg, sds name) {
+ streamConsumer *c = raxFind(cg->consumers,(unsigned char*)name,
+ sdslen(name));
+ if (c == raxNotFound) {
+ c = zmalloc(sizeof(*c));
+ c->name = sdsdup(name);
+ c->pel = raxNew();
+ }
+ c->seen_time = mstime();
+ return c;
+}
+
/* -----------------------------------------------------------------------
* Consumer groups commands
* ----------------------------------------------------------------------- */