summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2018-01-26 11:57:19 +0100
committerantirez <antirez@gmail.com>2018-03-15 12:54:10 +0100
commitb65fe09bb8d00d9edf07f29274e0405a9d802fe3 (patch)
treefae5697b203a7e30335e6f69082ecfa6a4ca8799
parent5ad29325fed710011236763b3b73256ef8319103 (diff)
downloadredis-b65fe09bb8d00d9edf07f29274e0405a9d802fe3.tar.gz
CG: Now XREADGROUP + blocking operations work.
-rw-r--r--src/blocked.c30
-rw-r--r--src/stream.h2
-rw-r--r--src/t_stream.c18
3 files changed, 42 insertions, 8 deletions
diff --git a/src/blocked.c b/src/blocked.c
index 0bbbe6c6a..8f472f2b8 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -313,9 +313,31 @@ void handleClientsBlockedOnKeys(void) {
{
streamID start = *gt;
start.seq++; /* Can't overflow, it's an uint64_t */
+
+ /* If we blocked in the context of a consumer
+ * group, we need to resolve the group and
+ * consumer here. */
+ streamCG *group = NULL;
+ streamConsumer *consumer = NULL;
+ if (receiver->bpop.xread_group) {
+ group = streamLookupCG(s,
+ receiver->bpop.xread_group->ptr);
+ /* In theory if the group is not found we
+ * just perform the read without the group,
+ * but actually when the group, or the key
+ * itself is deleted (triggering the removal
+ * of the group), we check for blocked clients
+ * and send them an error. */
+ }
+ if (group) {
+ consumer = streamLookupConsumer(group,
+ receiver->bpop.xread_consumer->ptr);
+ }
+
/* Note that after we unblock the client, 'gt'
- * is no longer valid, so we must do it after
- * we copied the ID into the 'start' variable. */
+ * and other receiver->bpop stuff are no longer
+ * valid, so we must do the setup above before
+ * this call. */
unblockClient(receiver);
/* Emit the two elements sub-array consisting of
@@ -326,8 +348,8 @@ void handleClientsBlockedOnKeys(void) {
addReplyMultiBulkLen(receiver,2);
addReplyBulk(receiver,rl->key);
streamReplyWithRange(receiver,s,&start,NULL,
- receiver->bpop.xread_count,0,
- NULL,NULL,0);
+ receiver->bpop.xread_count,
+ 0, group, consumer, 0);
}
}
}
diff --git a/src/stream.h b/src/stream.h
index 917392076..908e9ff72 100644
--- a/src/stream.h
+++ b/src/stream.h
@@ -94,5 +94,7 @@ void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamI
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
void streamIteratorStop(streamIterator *si);
+streamCG *streamLookupCG(stream *s, sds groupname);
+streamConsumer *streamLookupConsumer(streamCG *cg, sds name);
#endif
diff --git a/src/t_stream.c b/src/t_stream.c
index 3f87b5764..f4babd3fc 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -41,8 +41,6 @@
#define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */
void streamFreeCG(streamCG *cg);
-streamCG *streamLookupCG(stream *s, sds groupname);
-streamConsumer *streamLookupConsumer(streamCG *cg, sds name);
streamNACK *streamCreateNACK(streamConsumer *consumer);
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamCG *group, streamConsumer *consumer);
@@ -1242,8 +1240,20 @@ void xreadCommand(client *c) {
* in case the ID provided is too low, we do not want the server to
* block just to serve this client a huge stream of messages. */
c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT;
- c->bpop.xread_group = groupname;
- c->bpop.xread_consumer = consumername;
+
+ /* If this is a XREADGROUP + GROUP we need to remember for which
+ * group and consumer name we are blocking, so later when one of the
+ * keys receive more data, we can call streamReplyWithRange() passing
+ * the right arguments. */
+ if (groupname) {
+ incrRefCount(groupname);
+ incrRefCount(consumername);
+ c->bpop.xread_group = groupname;
+ c->bpop.xread_consumer = consumername;
+ } else {
+ c->bpop.xread_group = NULL;
+ c->bpop.xread_consumer = NULL;
+ }
goto cleanup;
}