diff options
author | Guy Benoish <guy.benoish@redislabs.com> | 2019-12-26 12:19:24 +0530 |
---|---|---|
committer | antirez <antirez@gmail.com> | 2020-03-05 16:26:27 +0100 |
commit | 7f3fcedb8ca14a557074bf679f77a6eef97b9b81 (patch) | |
tree | 630a961f80a7c0d018942bea09c550383b5a295f | |
parent | f93b2fa524cbaab1d8b0ab4d48bae8d4017fecee (diff) | |
download | redis-7f3fcedb8ca14a557074bf679f77a6eef97b9b81.tar.gz |
Blocking XREAD[GROUP] should always reply with valid data (or timeout)
This commit solves the following bug:
127.0.0.1:6379> XGROUP CREATE x grp $ MKSTREAM
OK
127.0.0.1:6379> XADD x 666 f v
"666-0"
127.0.0.1:6379> XREADGROUP GROUP grp Alice BLOCK 0 STREAMS x >
1) 1) "x"
2) 1) 1) "666-0"
2) 1) "f"
2) "v"
127.0.0.1:6379> XADD x 667 f v
"667-0"
127.0.0.1:6379> XDEL x 667
(integer) 1
127.0.0.1:6379> XREADGROUP GROUP grp Alice BLOCK 0 STREAMS x >
1) 1) "x"
2) (empty array)
The root cause is that we use s->last_id in streamCompareID
while we should use the last *valid* ID
-rw-r--r-- | src/t_stream.c | 29 | ||||
-rw-r--r-- | tests/unit/type/stream-cgroups.tcl | 14 | ||||
-rw-r--r-- | tests/unit/type/stream.tcl | 11 |
3 files changed, 44 insertions, 10 deletions
diff --git a/src/t_stream.c b/src/t_stream.c index 5cfab8bf5..ad3e60408 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -791,6 +791,16 @@ int streamDeleteItem(stream *s, streamID *id) { return deleted; } +/* Get the last valid (non-tombstone) streamID of 's'. */ +void streamLastValidID(stream *s, streamID *maxid) +{ + streamIterator si; + streamIteratorStart(&si,s,NULL,NULL,1); + int64_t numfields; + streamIteratorGetID(&si,maxid,&numfields); + streamIteratorStop(&si); +} + /* Emit a reply in the client output buffer by formatting a Stream ID * in the standard <ms>-<seq> format, using the simple string protocol * of REPL. */ @@ -1518,20 +1528,23 @@ void xreadCommand(client *c) { { serve_synchronously = 1; serve_history = 1; - } else { + } else if (s->length) { /* We also want to serve a consumer in a consumer group * synchronously in case the group top item delivered is smaller * than what the stream has inside. */ - streamID *last = &groups[i]->last_id; - if (s->length && (streamCompareID(&s->last_id, last) > 0)) { + streamID maxid, *last = &groups[i]->last_id; + streamLastValidID(s, &maxid); + if (streamCompareID(&maxid, last) > 0) { serve_synchronously = 1; *gt = *last; } } - } else { + } else if (s->length) { /* For consumers without a group, we serve synchronously if we can * actually provide at least one item from the stream. */ - if (s->length && (streamCompareID(&s->last_id, gt) > 0)) { + streamID maxid; + streamLastValidID(s, &maxid); + if (streamCompareID(&maxid, gt) > 0) { serve_synchronously = 1; } } @@ -1880,11 +1893,7 @@ void xsetidCommand(client *c) { * item, otherwise the fundamental ID monotonicity assumption is violated. */ if (s->length > 0) { streamID maxid; - streamIterator si; - streamIteratorStart(&si,s,NULL,NULL,1); - int64_t numfields; - streamIteratorGetID(&si,&maxid,&numfields); - streamIteratorStop(&si); + streamLastValidID(s,&maxid); if (streamCompareID(&id,&maxid) < 0) { addReplyError(c,"The ID specified in XSETID is smaller than the " diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index 34d4061c2..a59e168ef 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -147,6 +147,20 @@ start_server { assert {[lindex $res 0 1 1] == {2-0 {field1 B}}} } + test {Blocking XREADGROUP will not reply with an empty array} { + r del mystream + r XGROUP CREATE mystream mygroup $ MKSTREAM + r XADD mystream 666 f v + set res [r XREADGROUP GROUP mygroup Alice BLOCK 10 STREAMS mystream ">"] + assert {[lindex $res 0 1 0] == {666-0 {f v}}} + r XADD mystream 667 f2 v2 + r XDEL mystream 667 + set rd [redis_deferring_client] + $rd XREADGROUP GROUP mygroup Alice BLOCK 10 STREAMS mystream ">" + after 20 + assert {[$rd read] == {}} ;# before the fix, client didn't even block, but was served synchronously with {mystream {}} + } + test {XCLAIM can claim PEL items from another consumer} { # Add 3 items into the stream, and create a consumer group r del mystream diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl index 9840e3b74..5de9f0571 100644 --- a/tests/unit/type/stream.tcl +++ b/tests/unit/type/stream.tcl @@ -191,6 +191,17 @@ start_server { assert {[lindex $res 0 1 0 1] eq {old abcd1234}} } + test {Blocking XREAD will not reply with an empty array} { + r del s1 + r XADD s1 666 f v + r XADD s1 667 f2 v2 + r XDEL s1 667 + set rd [redis_deferring_client] + $rd XREAD BLOCK 10 STREAMS s1 666 + after 20 + assert {[$rd read] == {}} ;# before the fix, client didn't even block, but was served synchronously with {s1 {}} + } + test "XREAD: XADD + DEL should not awake client" { set rd [redis_deferring_client] r del s1 |