diff options
author | Valentino Geron <valentino@redislabs.com> | 2020-04-21 20:55:43 +0300 |
---|---|---|
committer | Valentino Geron <valentino@redislabs.com> | 2020-04-22 17:03:51 +0300 |
commit | 6fd2d7cfee05c7493a0c50e9ef462b8183035ca1 (patch) | |
tree | b55952f2fc129e3d0e43cb041b90dd6c3fe21d99 /src/t_stream.c | |
parent | c49fb47fbe0086dc9e44b8ac39128beb7ec1bf3a (diff) | |
download | redis-6fd2d7cfee05c7493a0c50e9ef462b8183035ca1.tar.gz |
XREADGROUP with NOACK should propagate only one XGROUP SETID command
Diffstat (limited to 'src/t_stream.c')
-rw-r--r-- | src/t_stream.c | 20 |
1 files changed, 13 insertions, 7 deletions
diff --git a/src/t_stream.c b/src/t_stream.c index 155167af9..2dffdbcdf 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -935,6 +935,8 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end streamIterator si; int64_t numfields; streamID id; + int propagate_last_id = 0; + int noack = flags & STREAM_RWR_NOACK; /* If the client is asking for some history, we serve it using a * different function, so that we return entries *solely* from its @@ -950,12 +952,14 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end arraylen_ptr = addReplyDeferredLen(c); streamIteratorStart(&si,s,start,end,rev); while(streamIteratorGetID(&si,&id,&numfields)) { - int propagate_last_id = 0; - /* Update the group last_id if needed. */ if (group && streamCompareID(&id,&group->last_id) > 0) { group->last_id = id; - propagate_last_id = 1; + /* Group last id should be propagated only if NOACK was + * specified, otherwise the last id would be included + * in XCLAIM. */ + if (noack) + propagate_last_id = 1; } /* Emit a two elements array for each item. The first is @@ -983,7 +987,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end * XGROUP SETID command. So if we find that there is already * a NACK for the entry, we need to associate it to the new * consumer. */ - if (group && !(flags & STREAM_RWR_NOACK)) { + if (group && !noack) { unsigned char buf[sizeof(streamID)]; streamEncodeID(buf,&id); @@ -1020,14 +1024,16 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack); decrRefCount(idarg); } - } else { - if (propagate_last_id) - streamPropagateGroupID(c,spi->keyname,group,spi->groupname); } arraylen++; if (count && count == arraylen) break; } + + if (spi && propagate_last_id) { + streamPropagateGroupID(c,spi->keyname,group,spi->groupname); + } + streamIteratorStop(&si); if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen); return arraylen; |