summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGuy Benoish <guy.benoish@redislabs.com>2019-12-26 12:19:24 +0530
committerantirez <antirez@gmail.com>2020-03-05 16:26:27 +0100
commit7f3fcedb8ca14a557074bf679f77a6eef97b9b81 (patch)
tree630a961f80a7c0d018942bea09c550383b5a295f
parentf93b2fa524cbaab1d8b0ab4d48bae8d4017fecee (diff)
downloadredis-7f3fcedb8ca14a557074bf679f77a6eef97b9b81.tar.gz
Blocking XREAD[GROUP] should always reply with valid data (or timeout)
This commit solves the following bug: 127.0.0.1:6379> XGROUP CREATE x grp $ MKSTREAM OK 127.0.0.1:6379> XADD x 666 f v "666-0" 127.0.0.1:6379> XREADGROUP GROUP grp Alice BLOCK 0 STREAMS x > 1) 1) "x" 2) 1) 1) "666-0" 2) 1) "f" 2) "v" 127.0.0.1:6379> XADD x 667 f v "667-0" 127.0.0.1:6379> XDEL x 667 (integer) 1 127.0.0.1:6379> XREADGROUP GROUP grp Alice BLOCK 0 STREAMS x > 1) 1) "x" 2) (empty array) The root cause is that we use s->last_id in streamCompareID while we should use the last *valid* ID
-rw-r--r--src/t_stream.c29
-rw-r--r--tests/unit/type/stream-cgroups.tcl14
-rw-r--r--tests/unit/type/stream.tcl11
3 files changed, 44 insertions, 10 deletions
diff --git a/src/t_stream.c b/src/t_stream.c
index 5cfab8bf5..ad3e60408 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -791,6 +791,16 @@ int streamDeleteItem(stream *s, streamID *id) {
return deleted;
}
+/* Get the last valid (non-tombstone) streamID of 's'. */
+void streamLastValidID(stream *s, streamID *maxid)
+{
+ streamIterator si;
+ streamIteratorStart(&si,s,NULL,NULL,1);
+ int64_t numfields;
+ streamIteratorGetID(&si,maxid,&numfields);
+ streamIteratorStop(&si);
+}
+
/* Emit a reply in the client output buffer by formatting a Stream ID
* in the standard <ms>-<seq> format, using the simple string protocol
* of REPL. */
@@ -1518,20 +1528,23 @@ void xreadCommand(client *c) {
{
serve_synchronously = 1;
serve_history = 1;
- } else {
+ } else if (s->length) {
/* We also want to serve a consumer in a consumer group
* synchronously in case the group top item delivered is smaller
* than what the stream has inside. */
- streamID *last = &groups[i]->last_id;
- if (s->length && (streamCompareID(&s->last_id, last) > 0)) {
+ streamID maxid, *last = &groups[i]->last_id;
+ streamLastValidID(s, &maxid);
+ if (streamCompareID(&maxid, last) > 0) {
serve_synchronously = 1;
*gt = *last;
}
}
- } else {
+ } else if (s->length) {
/* For consumers without a group, we serve synchronously if we can
* actually provide at least one item from the stream. */
- if (s->length && (streamCompareID(&s->last_id, gt) > 0)) {
+ streamID maxid;
+ streamLastValidID(s, &maxid);
+ if (streamCompareID(&maxid, gt) > 0) {
serve_synchronously = 1;
}
}
@@ -1880,11 +1893,7 @@ void xsetidCommand(client *c) {
* item, otherwise the fundamental ID monotonicity assumption is violated. */
if (s->length > 0) {
streamID maxid;
- streamIterator si;
- streamIteratorStart(&si,s,NULL,NULL,1);
- int64_t numfields;
- streamIteratorGetID(&si,&maxid,&numfields);
- streamIteratorStop(&si);
+ streamLastValidID(s,&maxid);
if (streamCompareID(&id,&maxid) < 0) {
addReplyError(c,"The ID specified in XSETID is smaller than the "
diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl
index 34d4061c2..a59e168ef 100644
--- a/tests/unit/type/stream-cgroups.tcl
+++ b/tests/unit/type/stream-cgroups.tcl
@@ -147,6 +147,20 @@ start_server {
assert {[lindex $res 0 1 1] == {2-0 {field1 B}}}
}
+ test {Blocking XREADGROUP will not reply with an empty array} {
+ r del mystream
+ r XGROUP CREATE mystream mygroup $ MKSTREAM
+ r XADD mystream 666 f v
+ set res [r XREADGROUP GROUP mygroup Alice BLOCK 10 STREAMS mystream ">"]
+ assert {[lindex $res 0 1 0] == {666-0 {f v}}}
+ r XADD mystream 667 f2 v2
+ r XDEL mystream 667
+ set rd [redis_deferring_client]
+ $rd XREADGROUP GROUP mygroup Alice BLOCK 10 STREAMS mystream ">"
+ after 20
+ assert {[$rd read] == {}} ;# before the fix, client didn't even block, but was served synchronously with {mystream {}}
+ }
+
test {XCLAIM can claim PEL items from another consumer} {
# Add 3 items into the stream, and create a consumer group
r del mystream
diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl
index 9840e3b74..5de9f0571 100644
--- a/tests/unit/type/stream.tcl
+++ b/tests/unit/type/stream.tcl
@@ -191,6 +191,17 @@ start_server {
assert {[lindex $res 0 1 0 1] eq {old abcd1234}}
}
+ test {Blocking XREAD will not reply with an empty array} {
+ r del s1
+ r XADD s1 666 f v
+ r XADD s1 667 f2 v2
+ r XDEL s1 667
+ set rd [redis_deferring_client]
+ $rd XREAD BLOCK 10 STREAMS s1 666
+ after 20
+ assert {[$rd read] == {}} ;# before the fix, client didn't even block, but was served synchronously with {s1 {}}
+ }
+
test "XREAD: XADD + DEL should not awake client" {
set rd [redis_deferring_client]
r del s1