summaryrefslogtreecommitdiff
path: root/src/t_stream.c
diff options
context:
space:
mode:
authorValentino Geron <valentino@redislabs.com>2020-04-21 20:55:43 +0300
committerValentino Geron <valentino@redislabs.com>2020-04-22 17:03:51 +0300
commit6fd2d7cfee05c7493a0c50e9ef462b8183035ca1 (patch)
treeb55952f2fc129e3d0e43cb041b90dd6c3fe21d99 /src/t_stream.c
parentc49fb47fbe0086dc9e44b8ac39128beb7ec1bf3a (diff)
downloadredis-6fd2d7cfee05c7493a0c50e9ef462b8183035ca1.tar.gz
XREADGROUP with NOACK should propagate only one XGROUP SETID command
Diffstat (limited to 'src/t_stream.c')
-rw-r--r--src/t_stream.c20
1 files changed, 13 insertions, 7 deletions
diff --git a/src/t_stream.c b/src/t_stream.c
index 155167af9..2dffdbcdf 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -935,6 +935,8 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
streamIterator si;
int64_t numfields;
streamID id;
+ int propagate_last_id = 0;
+ int noack = flags & STREAM_RWR_NOACK;
/* If the client is asking for some history, we serve it using a
* different function, so that we return entries *solely* from its
@@ -950,12 +952,14 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
arraylen_ptr = addReplyDeferredLen(c);
streamIteratorStart(&si,s,start,end,rev);
while(streamIteratorGetID(&si,&id,&numfields)) {
- int propagate_last_id = 0;
-
/* Update the group last_id if needed. */
if (group && streamCompareID(&id,&group->last_id) > 0) {
group->last_id = id;
- propagate_last_id = 1;
+ /* Group last id should be propagated only if NOACK was
+ * specified, otherwise the last id would be included
+ * in XCLAIM. */
+ if (noack)
+ propagate_last_id = 1;
}
/* Emit a two elements array for each item. The first is
@@ -983,7 +987,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
* XGROUP SETID command. So if we find that there is already
* a NACK for the entry, we need to associate it to the new
* consumer. */
- if (group && !(flags & STREAM_RWR_NOACK)) {
+ if (group && !noack) {
unsigned char buf[sizeof(streamID)];
streamEncodeID(buf,&id);
@@ -1020,14 +1024,16 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack);
decrRefCount(idarg);
}
- } else {
- if (propagate_last_id)
- streamPropagateGroupID(c,spi->keyname,group,spi->groupname);
}
arraylen++;
if (count && count == arraylen) break;
}
+
+ if (spi && propagate_last_id) {
+ streamPropagateGroupID(c,spi->keyname,group,spi->groupname);
+ }
+
streamIteratorStop(&si);
if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen);
return arraylen;