diff options
author | guybe7 <guy.benoish@redislabs.com> | 2021-01-06 10:34:27 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-06 10:34:27 +0200 |
commit | 714e103ac317bfa179b0a132c0f78d4ddc84a435 (patch) | |
tree | fbeabf367b4aadc98398cf151a6945034baae290 /tests | |
parent | 595ecd5f4be39eeec71fb07f687b2d6b7cf5c20c (diff) | |
download | redis-714e103ac317bfa179b0a132c0f78d4ddc84a435.tar.gz |
Add XAUTOCLAIM (#7973)
New command: XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT <count>] [JUSTID]
The purpose is to claim entries from a stale consumer without the usual
XPENDING+XCLAIM combo which takes two round trips.
The syntax for XAUTOCLAIM is similar to scan: A cursor is returned (streamID)
by each call and should be used as start for the next call. 0-0 means the scan is complete.
This PR extends the deferred reply mechanism for any bulk string (not just counts)
This PR carries some unrelated test code changes:
- Renames the term "client" into "consumer" in the stream-cgroups test
- And also changes DEBUG SLEEP into "after"
Co-authored-by: Oran Agra <oran@redislabs.com>
Diffstat (limited to 'tests')
-rw-r--r-- | tests/unit/type/stream-cgroups.tcl | 197 |
1 files changed, 151 insertions, 46 deletions
diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index 91dc2245e..f8de0741d 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -27,7 +27,7 @@ start_server { # and not the element "foo bar" which was pre existing in the # stream (see previous test) set reply [ - r XREADGROUP GROUP mygroup client-1 STREAMS mystream ">" + r XREADGROUP GROUP mygroup consumer-1 STREAMS mystream ">" ] assert {[llength [lindex $reply 0 1]] == 2} lindex $reply 0 1 0 1 @@ -39,13 +39,13 @@ start_server { r XADD mystream * d 4 # Read a few elements using a different consumer name set reply [ - r XREADGROUP GROUP mygroup client-2 STREAMS mystream ">" + r XREADGROUP GROUP mygroup consumer-2 STREAMS mystream ">" ] assert {[llength [lindex $reply 0 1]] == 2} assert {[lindex $reply 0 1 0 1] eq {c 3}} - set r1 [r XREADGROUP GROUP mygroup client-1 COUNT 10 STREAMS mystream 0] - set r2 [r XREADGROUP GROUP mygroup client-2 COUNT 10 STREAMS mystream 0] + set r1 [r XREADGROUP GROUP mygroup consumer-1 COUNT 10 STREAMS mystream 0] + set r2 [r XREADGROUP GROUP mygroup consumer-2 COUNT 10 STREAMS mystream 0] assert {[lindex $r1 0 1 0 1] eq {a 1}} assert {[lindex $r2 0 1 0 1] eq {c 3}} } @@ -56,9 +56,9 @@ start_server { for {set j 0} {$j < 4} {incr j} { set item [lindex $pending $j] if {$j < 2} { - set owner client-1 + set owner consumer-1 } else { - set owner client-2 + set owner consumer-2 } assert {[lindex $item 1] eq $owner} assert {[lindex $item 1] eq $owner} @@ -66,7 +66,7 @@ start_server { } test {XPENDING can return single consumer items} { - set pending [r XPENDING mystream mygroup - + 10 client-1] + set pending [r XPENDING mystream mygroup - + 10 consumer-1] assert {[llength $pending] == 2} } @@ -77,9 +77,9 @@ start_server { test {XPENDING with IDLE} { after 20 - set pending [r XPENDING mystream mygroup IDLE 99999999 - + 10 client-1] + set pending [r XPENDING mystream mygroup IDLE 99999999 - + 10 consumer-1] assert {[llength $pending] == 0} - set pending [r XPENDING mystream mygroup IDLE 1 - + 10 client-1] + set pending [r XPENDING mystream mygroup IDLE 1 - + 10 consumer-1] assert {[llength $pending] == 2} set pending [r XPENDING mystream mygroup IDLE 99999999 - + 10] assert {[llength $pending] == 0} @@ -101,12 +101,12 @@ start_server { } } - test {XACK is able to remove items from the client/group PEL} { - set pending [r XPENDING mystream mygroup - + 10 client-1] + test {XACK is able to remove items from the consumer/group PEL} { + set pending [r XPENDING mystream mygroup - + 10 consumer-1] set id1 [lindex $pending 0 0] set id2 [lindex $pending 1 0] assert {[r XACK mystream mygroup $id1] eq 1} - set pending [r XPENDING mystream mygroup - + 10 client-1] + set pending [r XPENDING mystream mygroup - + 10 consumer-1] assert {[llength $pending] == 1} set id [lindex $pending 0 0] assert {$id eq $id2} @@ -242,52 +242,52 @@ start_server { set id3 [r XADD mystream * c 3] r XGROUP CREATE mystream mygroup 0 - # Client 1 reads item 1 from the stream without acknowledgements. - # Client 2 then claims pending item 1 from the PEL of client 1 + # Consumer 1 reads item 1 from the stream without acknowledgements. + # Consumer 2 then claims pending item 1 from the PEL of consumer 1 set reply [ - r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream > + r XREADGROUP GROUP mygroup consumer1 count 1 STREAMS mystream > ] assert {[llength [lindex $reply 0 1 0 1]] == 2} assert {[lindex $reply 0 1 0 1] eq {a 1}} # make sure the entry is present in both the gorup, and the right consumer assert {[llength [r XPENDING mystream mygroup - + 10]] == 1} - assert {[llength [r XPENDING mystream mygroup - + 10 client1]] == 1} - assert {[llength [r XPENDING mystream mygroup - + 10 client2]] == 0} + assert {[llength [r XPENDING mystream mygroup - + 10 consumer1]] == 1} + assert {[llength [r XPENDING mystream mygroup - + 10 consumer2]] == 0} - r debug sleep 0.2 + after 200 set reply [ - r XCLAIM mystream mygroup client2 10 $id1 + r XCLAIM mystream mygroup consumer2 10 $id1 ] assert {[llength [lindex $reply 0 1]] == 2} assert {[lindex $reply 0 1] eq {a 1}} # make sure the entry is present in both the gorup, and the right consumer assert {[llength [r XPENDING mystream mygroup - + 10]] == 1} - assert {[llength [r XPENDING mystream mygroup - + 10 client1]] == 0} - assert {[llength [r XPENDING mystream mygroup - + 10 client2]] == 1} + assert {[llength [r XPENDING mystream mygroup - + 10 consumer1]] == 0} + assert {[llength [r XPENDING mystream mygroup - + 10 consumer2]] == 1} - # Client 1 reads another 2 items from stream - r XREADGROUP GROUP mygroup client1 count 2 STREAMS mystream > - r debug sleep 0.2 + # Consumer 1 reads another 2 items from stream + r XREADGROUP GROUP mygroup consumer1 count 2 STREAMS mystream > + after 200 - # Delete item 2 from the stream. Now client 1 has PEL that contains - # only item 3. Try to use client 2 to claim the deleted item 2 - # from the PEL of client 1, this should return nil + # Delete item 2 from the stream. Now consumer 1 has PEL that contains + # only item 3. Try to use consumer 2 to claim the deleted item 2 + # from the PEL of consumer 1, this should return nil r XDEL mystream $id2 set reply [ - r XCLAIM mystream mygroup client2 10 $id2 + r XCLAIM mystream mygroup consumer2 10 $id2 ] assert {[llength $reply] == 1} assert_equal "" [lindex $reply 0] - # Delete item 3 from the stream. Now client 1 has PEL that is empty. - # Try to use client 2 to claim the deleted item 3 from the PEL - # of client 1, this should return nil - r debug sleep 0.2 + # Delete item 3 from the stream. Now consumer 1 has PEL that is empty. + # Try to use consumer 2 to claim the deleted item 3 from the PEL + # of consumer 1, this should return nil + after 200 r XDEL mystream $id3 set reply [ - r XCLAIM mystream mygroup client2 10 $id3 + r XCLAIM mystream mygroup consumer2 10 $id3 ] assert {[llength $reply] == 1} assert_equal "" [lindex $reply 0] @@ -301,16 +301,16 @@ start_server { set id3 [r XADD mystream * c 3] r XGROUP CREATE mystream mygroup 0 - # Client 1 reads item 1 from the stream without acknowledgements. - # Client 2 then claims pending item 1 from the PEL of client 1 + # Consumer 1 reads item 1 from the stream without acknowledgements. + # Consumer 2 then claims pending item 1 from the PEL of consumer 1 set reply [ - r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream > + r XREADGROUP GROUP mygroup consumer1 count 1 STREAMS mystream > ] assert {[llength [lindex $reply 0 1 0 1]] == 2} assert {[lindex $reply 0 1 0 1] eq {a 1}} - r debug sleep 0.2 + after 200 set reply [ - r XCLAIM mystream mygroup client2 10 $id1 + r XCLAIM mystream mygroup consumer2 10 $id1 ] assert {[llength [lindex $reply 0 1]] == 2} assert {[lindex $reply 0 1] eq {a 1}} @@ -321,10 +321,10 @@ start_server { assert {[llength [lindex $reply 0]] == 4} assert {[lindex $reply 0 3] == 2} - # Client 3 then claims pending item 1 from the PEL of client 2 using JUSTID - r debug sleep 0.2 + # Consumer 3 then claims pending item 1 from the PEL of consumer 2 using JUSTID + after 200 set reply [ - r XCLAIM mystream mygroup client3 10 $id1 JUSTID + r XCLAIM mystream mygroup consumer3 10 $id1 JUSTID ] assert {[llength $reply] == 1} assert {[lindex $reply 0] eq $id1} @@ -344,17 +344,122 @@ start_server { set id3 [r XADD mystream * c 3] r XGROUP CREATE mystream mygroup 0 - set reply [r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream >] + set reply [r XREADGROUP GROUP mygroup consumer1 count 1 STREAMS mystream >] assert {[llength [lindex $reply 0 1 0 1]] == 2} assert {[lindex $reply 0 1 0 1] eq {a 1}} - r debug sleep 0.2 + after 200 # re-claim with the same consumer that already has it - assert {[llength [r XCLAIM mystream mygroup client1 10 $id1]] == 1} + assert {[llength [r XCLAIM mystream mygroup consumer1 10 $id1]] == 1} # make sure the entry is still in the PEL set reply [r XPENDING mystream mygroup - + 10] assert {[llength $reply] == 1} - assert {[lindex $reply 0 1] eq {client1}} + assert {[lindex $reply 0 1] eq {consumer1}} + } + + test {XAUTOCLAIM can claim PEL items from another consumer} { + # Add 3 items into the stream, and create a consumer group + r del mystream + set id1 [r XADD mystream * a 1] + set id2 [r XADD mystream * b 2] + set id3 [r XADD mystream * c 3] + r XGROUP CREATE mystream mygroup 0 + + # Consumer 1 reads item 1 from the stream without acknowledgements. + # Consumer 2 then claims pending item 1 from the PEL of consumer 1 + set reply [r XREADGROUP GROUP mygroup consumer1 count 1 STREAMS mystream >] + assert_equal [llength [lindex $reply 0 1 0 1]] 2 + assert_equal [lindex $reply 0 1 0 1] {a 1} + after 200 + set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - COUNT 1] + assert_equal [llength $reply] 2 + assert_equal [lindex $reply 0] $id1 + assert_equal [llength [lindex $reply 1]] 1 + assert_equal [llength [lindex $reply 1 0]] 2 + assert_equal [llength [lindex $reply 1 0 1]] 2 + assert_equal [lindex $reply 1 0 1] {a 1} + + # Consumer 1 reads another 2 items from stream + r XREADGROUP GROUP mygroup consumer1 count 2 STREAMS mystream > + + # For min-idle-time + after 200 + + # Delete item 2 from the stream. Now consumer 1 has PEL that contains + # only item 3. Try to use consumer 2 to claim the deleted item 2 + # from the PEL of consumer 1, this should return nil + r XDEL mystream $id2 + set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - COUNT 2] + # id1 is self-claimed here but not id2 ('count' was set to 2) + assert_equal [llength $reply] 2 + assert_equal [lindex $reply 0] $id2 + assert_equal [llength [lindex $reply 1]] 2 + assert_equal [llength [lindex $reply 1 0]] 2 + assert_equal [llength [lindex $reply 1 0 1]] 2 + assert_equal [lindex $reply 1 0 1] {a 1} + assert_equal [lindex $reply 1 1] "" + + # Delete item 3 from the stream. Now consumer 1 has PEL that is empty. + # Try to use consumer 2 to claim the deleted item 3 from the PEL + # of consumer 1, this should return nil + after 200 + r XDEL mystream $id3 + set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - JUSTID] + # id1 is self-claimed here but not id2 and id3 ('count' is default 100) + + # we also test the JUSTID modifier here. note that, when using JUSTID, + # deleted entries are returned in reply (consistent with XCLAIM). + + assert_equal [llength $reply] 2 + assert_equal [lindex $reply 0] "0-0" + assert_equal [llength [lindex $reply 1]] 3 + assert_equal [lindex $reply 1 0] $id1 + assert_equal [lindex $reply 1 1] $id2 + assert_equal [lindex $reply 1 2] $id3 + } + + test {XAUTOCLAIM as an iterator} { + # Add 5 items into the stream, and create a consumer group + r del mystream + set id1 [r XADD mystream * a 1] + set id2 [r XADD mystream * b 2] + set id3 [r XADD mystream * c 3] + set id4 [r XADD mystream * d 4] + set id5 [r XADD mystream * e 5] + r XGROUP CREATE mystream mygroup 0 + + # Read 5 messages into consumer1 + r XREADGROUP GROUP mygroup consumer1 count 90 STREAMS mystream > + + # For min-idle-time + after 200 + + # Claim 2 entries + set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - COUNT 2] + assert_equal [llength $reply] 2 + set cursor [lindex $reply 0] + assert_equal $cursor $id2 + assert_equal [llength [lindex $reply 1]] 2 + assert_equal [llength [lindex $reply 1 0 1]] 2 + assert_equal [lindex $reply 1 0 1] {a 1} + + # Claim 2 more entries + set reply [r XAUTOCLAIM mystream mygroup consumer2 10 ($cursor COUNT 2] + assert_equal [llength $reply] 2 + set cursor [lindex $reply 0] + assert_equal $cursor $id4 + assert_equal [llength [lindex $reply 1]] 2 + assert_equal [llength [lindex $reply 1 0 1]] 2 + assert_equal [lindex $reply 1 0 1] {c 3} + + # Claim last entry + set reply [r XAUTOCLAIM mystream mygroup consumer2 10 ($cursor COUNT 2] + assert_equal [llength $reply] 2 + set cursor [lindex $reply 0] + assert_equal $cursor {0-0} + assert_equal [llength [lindex $reply 1]] 1 + assert_equal [llength [lindex $reply 1 0 1]] 2 + assert_equal [lindex $reply 1 0 1] {e 5} } test {XINFO FULL output} { @@ -477,7 +582,7 @@ start_server { assert {$curr_grpinfo == $grpinfo} set n_consumers [lindex $grpinfo 3] - # Bob should be created only when there will be new data for this client + # Bob should be created only when there will be new data for this consumer assert_equal $n_consumers 2 set reply [r xinfo consumers mystream mygroup] set consumer_info [lindex $reply 0] |