summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorguybe7 <guy.benoish@redislabs.com>2021-01-06 10:34:27 +0200
committerGitHub <noreply@github.com>2021-01-06 10:34:27 +0200
commit714e103ac317bfa179b0a132c0f78d4ddc84a435 (patch)
treefbeabf367b4aadc98398cf151a6945034baae290 /tests
parent595ecd5f4be39eeec71fb07f687b2d6b7cf5c20c (diff)
downloadredis-714e103ac317bfa179b0a132c0f78d4ddc84a435.tar.gz
Add XAUTOCLAIM (#7973)
New command: XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT <count>] [JUSTID] The purpose is to claim entries from a stale consumer without the usual XPENDING+XCLAIM combo which takes two round trips. The syntax for XAUTOCLAIM is similar to scan: A cursor is returned (streamID) by each call and should be used as start for the next call. 0-0 means the scan is complete. This PR extends the deferred reply mechanism for any bulk string (not just counts) This PR carries some unrelated test code changes: - Renames the term "client" into "consumer" in the stream-cgroups test - And also changes DEBUG SLEEP into "after" Co-authored-by: Oran Agra <oran@redislabs.com>
Diffstat (limited to 'tests')
-rw-r--r--tests/unit/type/stream-cgroups.tcl197
1 files changed, 151 insertions, 46 deletions
diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl
index 91dc2245e..f8de0741d 100644
--- a/tests/unit/type/stream-cgroups.tcl
+++ b/tests/unit/type/stream-cgroups.tcl
@@ -27,7 +27,7 @@ start_server {
# and not the element "foo bar" which was pre existing in the
# stream (see previous test)
set reply [
- r XREADGROUP GROUP mygroup client-1 STREAMS mystream ">"
+ r XREADGROUP GROUP mygroup consumer-1 STREAMS mystream ">"
]
assert {[llength [lindex $reply 0 1]] == 2}
lindex $reply 0 1 0 1
@@ -39,13 +39,13 @@ start_server {
r XADD mystream * d 4
# Read a few elements using a different consumer name
set reply [
- r XREADGROUP GROUP mygroup client-2 STREAMS mystream ">"
+ r XREADGROUP GROUP mygroup consumer-2 STREAMS mystream ">"
]
assert {[llength [lindex $reply 0 1]] == 2}
assert {[lindex $reply 0 1 0 1] eq {c 3}}
- set r1 [r XREADGROUP GROUP mygroup client-1 COUNT 10 STREAMS mystream 0]
- set r2 [r XREADGROUP GROUP mygroup client-2 COUNT 10 STREAMS mystream 0]
+ set r1 [r XREADGROUP GROUP mygroup consumer-1 COUNT 10 STREAMS mystream 0]
+ set r2 [r XREADGROUP GROUP mygroup consumer-2 COUNT 10 STREAMS mystream 0]
assert {[lindex $r1 0 1 0 1] eq {a 1}}
assert {[lindex $r2 0 1 0 1] eq {c 3}}
}
@@ -56,9 +56,9 @@ start_server {
for {set j 0} {$j < 4} {incr j} {
set item [lindex $pending $j]
if {$j < 2} {
- set owner client-1
+ set owner consumer-1
} else {
- set owner client-2
+ set owner consumer-2
}
assert {[lindex $item 1] eq $owner}
assert {[lindex $item 1] eq $owner}
@@ -66,7 +66,7 @@ start_server {
}
test {XPENDING can return single consumer items} {
- set pending [r XPENDING mystream mygroup - + 10 client-1]
+ set pending [r XPENDING mystream mygroup - + 10 consumer-1]
assert {[llength $pending] == 2}
}
@@ -77,9 +77,9 @@ start_server {
test {XPENDING with IDLE} {
after 20
- set pending [r XPENDING mystream mygroup IDLE 99999999 - + 10 client-1]
+ set pending [r XPENDING mystream mygroup IDLE 99999999 - + 10 consumer-1]
assert {[llength $pending] == 0}
- set pending [r XPENDING mystream mygroup IDLE 1 - + 10 client-1]
+ set pending [r XPENDING mystream mygroup IDLE 1 - + 10 consumer-1]
assert {[llength $pending] == 2}
set pending [r XPENDING mystream mygroup IDLE 99999999 - + 10]
assert {[llength $pending] == 0}
@@ -101,12 +101,12 @@ start_server {
}
}
- test {XACK is able to remove items from the client/group PEL} {
- set pending [r XPENDING mystream mygroup - + 10 client-1]
+ test {XACK is able to remove items from the consumer/group PEL} {
+ set pending [r XPENDING mystream mygroup - + 10 consumer-1]
set id1 [lindex $pending 0 0]
set id2 [lindex $pending 1 0]
assert {[r XACK mystream mygroup $id1] eq 1}
- set pending [r XPENDING mystream mygroup - + 10 client-1]
+ set pending [r XPENDING mystream mygroup - + 10 consumer-1]
assert {[llength $pending] == 1}
set id [lindex $pending 0 0]
assert {$id eq $id2}
@@ -242,52 +242,52 @@ start_server {
set id3 [r XADD mystream * c 3]
r XGROUP CREATE mystream mygroup 0
- # Client 1 reads item 1 from the stream without acknowledgements.
- # Client 2 then claims pending item 1 from the PEL of client 1
+ # Consumer 1 reads item 1 from the stream without acknowledgements.
+ # Consumer 2 then claims pending item 1 from the PEL of consumer 1
set reply [
- r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream >
+ r XREADGROUP GROUP mygroup consumer1 count 1 STREAMS mystream >
]
assert {[llength [lindex $reply 0 1 0 1]] == 2}
assert {[lindex $reply 0 1 0 1] eq {a 1}}
# make sure the entry is present in both the gorup, and the right consumer
assert {[llength [r XPENDING mystream mygroup - + 10]] == 1}
- assert {[llength [r XPENDING mystream mygroup - + 10 client1]] == 1}
- assert {[llength [r XPENDING mystream mygroup - + 10 client2]] == 0}
+ assert {[llength [r XPENDING mystream mygroup - + 10 consumer1]] == 1}
+ assert {[llength [r XPENDING mystream mygroup - + 10 consumer2]] == 0}
- r debug sleep 0.2
+ after 200
set reply [
- r XCLAIM mystream mygroup client2 10 $id1
+ r XCLAIM mystream mygroup consumer2 10 $id1
]
assert {[llength [lindex $reply 0 1]] == 2}
assert {[lindex $reply 0 1] eq {a 1}}
# make sure the entry is present in both the gorup, and the right consumer
assert {[llength [r XPENDING mystream mygroup - + 10]] == 1}
- assert {[llength [r XPENDING mystream mygroup - + 10 client1]] == 0}
- assert {[llength [r XPENDING mystream mygroup - + 10 client2]] == 1}
+ assert {[llength [r XPENDING mystream mygroup - + 10 consumer1]] == 0}
+ assert {[llength [r XPENDING mystream mygroup - + 10 consumer2]] == 1}
- # Client 1 reads another 2 items from stream
- r XREADGROUP GROUP mygroup client1 count 2 STREAMS mystream >
- r debug sleep 0.2
+ # Consumer 1 reads another 2 items from stream
+ r XREADGROUP GROUP mygroup consumer1 count 2 STREAMS mystream >
+ after 200
- # Delete item 2 from the stream. Now client 1 has PEL that contains
- # only item 3. Try to use client 2 to claim the deleted item 2
- # from the PEL of client 1, this should return nil
+ # Delete item 2 from the stream. Now consumer 1 has PEL that contains
+ # only item 3. Try to use consumer 2 to claim the deleted item 2
+ # from the PEL of consumer 1, this should return nil
r XDEL mystream $id2
set reply [
- r XCLAIM mystream mygroup client2 10 $id2
+ r XCLAIM mystream mygroup consumer2 10 $id2
]
assert {[llength $reply] == 1}
assert_equal "" [lindex $reply 0]
- # Delete item 3 from the stream. Now client 1 has PEL that is empty.
- # Try to use client 2 to claim the deleted item 3 from the PEL
- # of client 1, this should return nil
- r debug sleep 0.2
+ # Delete item 3 from the stream. Now consumer 1 has PEL that is empty.
+ # Try to use consumer 2 to claim the deleted item 3 from the PEL
+ # of consumer 1, this should return nil
+ after 200
r XDEL mystream $id3
set reply [
- r XCLAIM mystream mygroup client2 10 $id3
+ r XCLAIM mystream mygroup consumer2 10 $id3
]
assert {[llength $reply] == 1}
assert_equal "" [lindex $reply 0]
@@ -301,16 +301,16 @@ start_server {
set id3 [r XADD mystream * c 3]
r XGROUP CREATE mystream mygroup 0
- # Client 1 reads item 1 from the stream without acknowledgements.
- # Client 2 then claims pending item 1 from the PEL of client 1
+ # Consumer 1 reads item 1 from the stream without acknowledgements.
+ # Consumer 2 then claims pending item 1 from the PEL of consumer 1
set reply [
- r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream >
+ r XREADGROUP GROUP mygroup consumer1 count 1 STREAMS mystream >
]
assert {[llength [lindex $reply 0 1 0 1]] == 2}
assert {[lindex $reply 0 1 0 1] eq {a 1}}
- r debug sleep 0.2
+ after 200
set reply [
- r XCLAIM mystream mygroup client2 10 $id1
+ r XCLAIM mystream mygroup consumer2 10 $id1
]
assert {[llength [lindex $reply 0 1]] == 2}
assert {[lindex $reply 0 1] eq {a 1}}
@@ -321,10 +321,10 @@ start_server {
assert {[llength [lindex $reply 0]] == 4}
assert {[lindex $reply 0 3] == 2}
- # Client 3 then claims pending item 1 from the PEL of client 2 using JUSTID
- r debug sleep 0.2
+ # Consumer 3 then claims pending item 1 from the PEL of consumer 2 using JUSTID
+ after 200
set reply [
- r XCLAIM mystream mygroup client3 10 $id1 JUSTID
+ r XCLAIM mystream mygroup consumer3 10 $id1 JUSTID
]
assert {[llength $reply] == 1}
assert {[lindex $reply 0] eq $id1}
@@ -344,17 +344,122 @@ start_server {
set id3 [r XADD mystream * c 3]
r XGROUP CREATE mystream mygroup 0
- set reply [r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream >]
+ set reply [r XREADGROUP GROUP mygroup consumer1 count 1 STREAMS mystream >]
assert {[llength [lindex $reply 0 1 0 1]] == 2}
assert {[lindex $reply 0 1 0 1] eq {a 1}}
- r debug sleep 0.2
+ after 200
# re-claim with the same consumer that already has it
- assert {[llength [r XCLAIM mystream mygroup client1 10 $id1]] == 1}
+ assert {[llength [r XCLAIM mystream mygroup consumer1 10 $id1]] == 1}
# make sure the entry is still in the PEL
set reply [r XPENDING mystream mygroup - + 10]
assert {[llength $reply] == 1}
- assert {[lindex $reply 0 1] eq {client1}}
+ assert {[lindex $reply 0 1] eq {consumer1}}
+ }
+
+ test {XAUTOCLAIM can claim PEL items from another consumer} {
+ # Add 3 items into the stream, and create a consumer group
+ r del mystream
+ set id1 [r XADD mystream * a 1]
+ set id2 [r XADD mystream * b 2]
+ set id3 [r XADD mystream * c 3]
+ r XGROUP CREATE mystream mygroup 0
+
+ # Consumer 1 reads item 1 from the stream without acknowledgements.
+ # Consumer 2 then claims pending item 1 from the PEL of consumer 1
+ set reply [r XREADGROUP GROUP mygroup consumer1 count 1 STREAMS mystream >]
+ assert_equal [llength [lindex $reply 0 1 0 1]] 2
+ assert_equal [lindex $reply 0 1 0 1] {a 1}
+ after 200
+ set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - COUNT 1]
+ assert_equal [llength $reply] 2
+ assert_equal [lindex $reply 0] $id1
+ assert_equal [llength [lindex $reply 1]] 1
+ assert_equal [llength [lindex $reply 1 0]] 2
+ assert_equal [llength [lindex $reply 1 0 1]] 2
+ assert_equal [lindex $reply 1 0 1] {a 1}
+
+ # Consumer 1 reads another 2 items from stream
+ r XREADGROUP GROUP mygroup consumer1 count 2 STREAMS mystream >
+
+ # For min-idle-time
+ after 200
+
+ # Delete item 2 from the stream. Now consumer 1 has PEL that contains
+ # only item 3. Try to use consumer 2 to claim the deleted item 2
+ # from the PEL of consumer 1, this should return nil
+ r XDEL mystream $id2
+ set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - COUNT 2]
+ # id1 is self-claimed here but not id2 ('count' was set to 2)
+ assert_equal [llength $reply] 2
+ assert_equal [lindex $reply 0] $id2
+ assert_equal [llength [lindex $reply 1]] 2
+ assert_equal [llength [lindex $reply 1 0]] 2
+ assert_equal [llength [lindex $reply 1 0 1]] 2
+ assert_equal [lindex $reply 1 0 1] {a 1}
+ assert_equal [lindex $reply 1 1] ""
+
+ # Delete item 3 from the stream. Now consumer 1 has PEL that is empty.
+ # Try to use consumer 2 to claim the deleted item 3 from the PEL
+ # of consumer 1, this should return nil
+ after 200
+ r XDEL mystream $id3
+ set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - JUSTID]
+ # id1 is self-claimed here but not id2 and id3 ('count' is default 100)
+
+ # we also test the JUSTID modifier here. note that, when using JUSTID,
+ # deleted entries are returned in reply (consistent with XCLAIM).
+
+ assert_equal [llength $reply] 2
+ assert_equal [lindex $reply 0] "0-0"
+ assert_equal [llength [lindex $reply 1]] 3
+ assert_equal [lindex $reply 1 0] $id1
+ assert_equal [lindex $reply 1 1] $id2
+ assert_equal [lindex $reply 1 2] $id3
+ }
+
+ test {XAUTOCLAIM as an iterator} {
+ # Add 5 items into the stream, and create a consumer group
+ r del mystream
+ set id1 [r XADD mystream * a 1]
+ set id2 [r XADD mystream * b 2]
+ set id3 [r XADD mystream * c 3]
+ set id4 [r XADD mystream * d 4]
+ set id5 [r XADD mystream * e 5]
+ r XGROUP CREATE mystream mygroup 0
+
+ # Read 5 messages into consumer1
+ r XREADGROUP GROUP mygroup consumer1 count 90 STREAMS mystream >
+
+ # For min-idle-time
+ after 200
+
+ # Claim 2 entries
+ set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - COUNT 2]
+ assert_equal [llength $reply] 2
+ set cursor [lindex $reply 0]
+ assert_equal $cursor $id2
+ assert_equal [llength [lindex $reply 1]] 2
+ assert_equal [llength [lindex $reply 1 0 1]] 2
+ assert_equal [lindex $reply 1 0 1] {a 1}
+
+ # Claim 2 more entries
+ set reply [r XAUTOCLAIM mystream mygroup consumer2 10 ($cursor COUNT 2]
+ assert_equal [llength $reply] 2
+ set cursor [lindex $reply 0]
+ assert_equal $cursor $id4
+ assert_equal [llength [lindex $reply 1]] 2
+ assert_equal [llength [lindex $reply 1 0 1]] 2
+ assert_equal [lindex $reply 1 0 1] {c 3}
+
+ # Claim last entry
+ set reply [r XAUTOCLAIM mystream mygroup consumer2 10 ($cursor COUNT 2]
+ assert_equal [llength $reply] 2
+ set cursor [lindex $reply 0]
+ assert_equal $cursor {0-0}
+ assert_equal [llength [lindex $reply 1]] 1
+ assert_equal [llength [lindex $reply 1 0 1]] 2
+ assert_equal [lindex $reply 1 0 1] {e 5}
}
test {XINFO FULL output} {
@@ -477,7 +582,7 @@ start_server {
assert {$curr_grpinfo == $grpinfo}
set n_consumers [lindex $grpinfo 3]
- # Bob should be created only when there will be new data for this client
+ # Bob should be created only when there will be new data for this consumer
assert_equal $n_consumers 2
set reply [r xinfo consumers mystream mygroup]
set consumer_info [lindex $reply 0]