diff options
Diffstat (limited to 'tests/unit/type/stream-cgroups.tcl')
-rw-r--r-- | tests/unit/type/stream-cgroups.tcl | 197 |
1 files changed, 151 insertions, 46 deletions
diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index 91dc2245e..f8de0741d 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -27,7 +27,7 @@ start_server { # and not the element "foo bar" which was pre existing in the # stream (see previous test) set reply [ - r XREADGROUP GROUP mygroup client-1 STREAMS mystream ">" + r XREADGROUP GROUP mygroup consumer-1 STREAMS mystream ">" ] assert {[llength [lindex $reply 0 1]] == 2} lindex $reply 0 1 0 1 @@ -39,13 +39,13 @@ start_server { r XADD mystream * d 4 # Read a few elements using a different consumer name set reply [ - r XREADGROUP GROUP mygroup client-2 STREAMS mystream ">" + r XREADGROUP GROUP mygroup consumer-2 STREAMS mystream ">" ] assert {[llength [lindex $reply 0 1]] == 2} assert {[lindex $reply 0 1 0 1] eq {c 3}} - set r1 [r XREADGROUP GROUP mygroup client-1 COUNT 10 STREAMS mystream 0] - set r2 [r XREADGROUP GROUP mygroup client-2 COUNT 10 STREAMS mystream 0] + set r1 [r XREADGROUP GROUP mygroup consumer-1 COUNT 10 STREAMS mystream 0] + set r2 [r XREADGROUP GROUP mygroup consumer-2 COUNT 10 STREAMS mystream 0] assert {[lindex $r1 0 1 0 1] eq {a 1}} assert {[lindex $r2 0 1 0 1] eq {c 3}} } @@ -56,9 +56,9 @@ start_server { for {set j 0} {$j < 4} {incr j} { set item [lindex $pending $j] if {$j < 2} { - set owner client-1 + set owner consumer-1 } else { - set owner client-2 + set owner consumer-2 } assert {[lindex $item 1] eq $owner} assert {[lindex $item 1] eq $owner} @@ -66,7 +66,7 @@ start_server { } test {XPENDING can return single consumer items} { - set pending [r XPENDING mystream mygroup - + 10 client-1] + set pending [r XPENDING mystream mygroup - + 10 consumer-1] assert {[llength $pending] == 2} } @@ -77,9 +77,9 @@ start_server { test {XPENDING with IDLE} { after 20 - set pending [r XPENDING mystream mygroup IDLE 99999999 - + 10 client-1] + set pending [r XPENDING mystream mygroup IDLE 99999999 - + 10 consumer-1] assert {[llength $pending] == 0} - set pending [r XPENDING mystream mygroup IDLE 1 - + 10 client-1] + set pending [r XPENDING mystream mygroup IDLE 1 - + 10 consumer-1] assert {[llength $pending] == 2} set pending [r XPENDING mystream mygroup IDLE 99999999 - + 10] assert {[llength $pending] == 0} @@ -101,12 +101,12 @@ start_server { } } - test {XACK is able to remove items from the client/group PEL} { - set pending [r XPENDING mystream mygroup - + 10 client-1] + test {XACK is able to remove items from the consumer/group PEL} { + set pending [r XPENDING mystream mygroup - + 10 consumer-1] set id1 [lindex $pending 0 0] set id2 [lindex $pending 1 0] assert {[r XACK mystream mygroup $id1] eq 1} - set pending [r XPENDING mystream mygroup - + 10 client-1] + set pending [r XPENDING mystream mygroup - + 10 consumer-1] assert {[llength $pending] == 1} set id [lindex $pending 0 0] assert {$id eq $id2} @@ -242,52 +242,52 @@ start_server { set id3 [r XADD mystream * c 3] r XGROUP CREATE mystream mygroup 0 - # Client 1 reads item 1 from the stream without acknowledgements. - # Client 2 then claims pending item 1 from the PEL of client 1 + # Consumer 1 reads item 1 from the stream without acknowledgements. + # Consumer 2 then claims pending item 1 from the PEL of consumer 1 set reply [ - r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream > + r XREADGROUP GROUP mygroup consumer1 count 1 STREAMS mystream > ] assert {[llength [lindex $reply 0 1 0 1]] == 2} assert {[lindex $reply 0 1 0 1] eq {a 1}} # make sure the entry is present in both the gorup, and the right consumer assert {[llength [r XPENDING mystream mygroup - + 10]] == 1} - assert {[llength [r XPENDING mystream mygroup - + 10 client1]] == 1} - assert {[llength [r XPENDING mystream mygroup - + 10 client2]] == 0} + assert {[llength [r XPENDING mystream mygroup - + 10 consumer1]] == 1} + assert {[llength [r XPENDING mystream mygroup - + 10 consumer2]] == 0} - r debug sleep 0.2 + after 200 set reply [ - r XCLAIM mystream mygroup client2 10 $id1 + r XCLAIM mystream mygroup consumer2 10 $id1 ] assert {[llength [lindex $reply 0 1]] == 2} assert {[lindex $reply 0 1] eq {a 1}} # make sure the entry is present in both the gorup, and the right consumer assert {[llength [r XPENDING mystream mygroup - + 10]] == 1} - assert {[llength [r XPENDING mystream mygroup - + 10 client1]] == 0} - assert {[llength [r XPENDING mystream mygroup - + 10 client2]] == 1} + assert {[llength [r XPENDING mystream mygroup - + 10 consumer1]] == 0} + assert {[llength [r XPENDING mystream mygroup - + 10 consumer2]] == 1} - # Client 1 reads another 2 items from stream - r XREADGROUP GROUP mygroup client1 count 2 STREAMS mystream > - r debug sleep 0.2 + # Consumer 1 reads another 2 items from stream + r XREADGROUP GROUP mygroup consumer1 count 2 STREAMS mystream > + after 200 - # Delete item 2 from the stream. Now client 1 has PEL that contains - # only item 3. Try to use client 2 to claim the deleted item 2 - # from the PEL of client 1, this should return nil + # Delete item 2 from the stream. Now consumer 1 has PEL that contains + # only item 3. Try to use consumer 2 to claim the deleted item 2 + # from the PEL of consumer 1, this should return nil r XDEL mystream $id2 set reply [ - r XCLAIM mystream mygroup client2 10 $id2 + r XCLAIM mystream mygroup consumer2 10 $id2 ] assert {[llength $reply] == 1} assert_equal "" [lindex $reply 0] - # Delete item 3 from the stream. Now client 1 has PEL that is empty. - # Try to use client 2 to claim the deleted item 3 from the PEL - # of client 1, this should return nil - r debug sleep 0.2 + # Delete item 3 from the stream. Now consumer 1 has PEL that is empty. + # Try to use consumer 2 to claim the deleted item 3 from the PEL + # of consumer 1, this should return nil + after 200 r XDEL mystream $id3 set reply [ - r XCLAIM mystream mygroup client2 10 $id3 + r XCLAIM mystream mygroup consumer2 10 $id3 ] assert {[llength $reply] == 1} assert_equal "" [lindex $reply 0] @@ -301,16 +301,16 @@ start_server { set id3 [r XADD mystream * c 3] r XGROUP CREATE mystream mygroup 0 - # Client 1 reads item 1 from the stream without acknowledgements. - # Client 2 then claims pending item 1 from the PEL of client 1 + # Consumer 1 reads item 1 from the stream without acknowledgements. + # Consumer 2 then claims pending item 1 from the PEL of consumer 1 set reply [ - r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream > + r XREADGROUP GROUP mygroup consumer1 count 1 STREAMS mystream > ] assert {[llength [lindex $reply 0 1 0 1]] == 2} assert {[lindex $reply 0 1 0 1] eq {a 1}} - r debug sleep 0.2 + after 200 set reply [ - r XCLAIM mystream mygroup client2 10 $id1 + r XCLAIM mystream mygroup consumer2 10 $id1 ] assert {[llength [lindex $reply 0 1]] == 2} assert {[lindex $reply 0 1] eq {a 1}} @@ -321,10 +321,10 @@ start_server { assert {[llength [lindex $reply 0]] == 4} assert {[lindex $reply 0 3] == 2} - # Client 3 then claims pending item 1 from the PEL of client 2 using JUSTID - r debug sleep 0.2 + # Consumer 3 then claims pending item 1 from the PEL of consumer 2 using JUSTID + after 200 set reply [ - r XCLAIM mystream mygroup client3 10 $id1 JUSTID + r XCLAIM mystream mygroup consumer3 10 $id1 JUSTID ] assert {[llength $reply] == 1} assert {[lindex $reply 0] eq $id1} @@ -344,17 +344,122 @@ start_server { set id3 [r XADD mystream * c 3] r XGROUP CREATE mystream mygroup 0 - set reply [r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream >] + set reply [r XREADGROUP GROUP mygroup consumer1 count 1 STREAMS mystream >] assert {[llength [lindex $reply 0 1 0 1]] == 2} assert {[lindex $reply 0 1 0 1] eq {a 1}} - r debug sleep 0.2 + after 200 # re-claim with the same consumer that already has it - assert {[llength [r XCLAIM mystream mygroup client1 10 $id1]] == 1} + assert {[llength [r XCLAIM mystream mygroup consumer1 10 $id1]] == 1} # make sure the entry is still in the PEL set reply [r XPENDING mystream mygroup - + 10] assert {[llength $reply] == 1} - assert {[lindex $reply 0 1] eq {client1}} + assert {[lindex $reply 0 1] eq {consumer1}} + } + + test {XAUTOCLAIM can claim PEL items from another consumer} { + # Add 3 items into the stream, and create a consumer group + r del mystream + set id1 [r XADD mystream * a 1] + set id2 [r XADD mystream * b 2] + set id3 [r XADD mystream * c 3] + r XGROUP CREATE mystream mygroup 0 + + # Consumer 1 reads item 1 from the stream without acknowledgements. + # Consumer 2 then claims pending item 1 from the PEL of consumer 1 + set reply [r XREADGROUP GROUP mygroup consumer1 count 1 STREAMS mystream >] + assert_equal [llength [lindex $reply 0 1 0 1]] 2 + assert_equal [lindex $reply 0 1 0 1] {a 1} + after 200 + set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - COUNT 1] + assert_equal [llength $reply] 2 + assert_equal [lindex $reply 0] $id1 + assert_equal [llength [lindex $reply 1]] 1 + assert_equal [llength [lindex $reply 1 0]] 2 + assert_equal [llength [lindex $reply 1 0 1]] 2 + assert_equal [lindex $reply 1 0 1] {a 1} + + # Consumer 1 reads another 2 items from stream + r XREADGROUP GROUP mygroup consumer1 count 2 STREAMS mystream > + + # For min-idle-time + after 200 + + # Delete item 2 from the stream. Now consumer 1 has PEL that contains + # only item 3. Try to use consumer 2 to claim the deleted item 2 + # from the PEL of consumer 1, this should return nil + r XDEL mystream $id2 + set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - COUNT 2] + # id1 is self-claimed here but not id2 ('count' was set to 2) + assert_equal [llength $reply] 2 + assert_equal [lindex $reply 0] $id2 + assert_equal [llength [lindex $reply 1]] 2 + assert_equal [llength [lindex $reply 1 0]] 2 + assert_equal [llength [lindex $reply 1 0 1]] 2 + assert_equal [lindex $reply 1 0 1] {a 1} + assert_equal [lindex $reply 1 1] "" + + # Delete item 3 from the stream. Now consumer 1 has PEL that is empty. + # Try to use consumer 2 to claim the deleted item 3 from the PEL + # of consumer 1, this should return nil + after 200 + r XDEL mystream $id3 + set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - JUSTID] + # id1 is self-claimed here but not id2 and id3 ('count' is default 100) + + # we also test the JUSTID modifier here. note that, when using JUSTID, + # deleted entries are returned in reply (consistent with XCLAIM). + + assert_equal [llength $reply] 2 + assert_equal [lindex $reply 0] "0-0" + assert_equal [llength [lindex $reply 1]] 3 + assert_equal [lindex $reply 1 0] $id1 + assert_equal [lindex $reply 1 1] $id2 + assert_equal [lindex $reply 1 2] $id3 + } + + test {XAUTOCLAIM as an iterator} { + # Add 5 items into the stream, and create a consumer group + r del mystream + set id1 [r XADD mystream * a 1] + set id2 [r XADD mystream * b 2] + set id3 [r XADD mystream * c 3] + set id4 [r XADD mystream * d 4] + set id5 [r XADD mystream * e 5] + r XGROUP CREATE mystream mygroup 0 + + # Read 5 messages into consumer1 + r XREADGROUP GROUP mygroup consumer1 count 90 STREAMS mystream > + + # For min-idle-time + after 200 + + # Claim 2 entries + set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - COUNT 2] + assert_equal [llength $reply] 2 + set cursor [lindex $reply 0] + assert_equal $cursor $id2 + assert_equal [llength [lindex $reply 1]] 2 + assert_equal [llength [lindex $reply 1 0 1]] 2 + assert_equal [lindex $reply 1 0 1] {a 1} + + # Claim 2 more entries + set reply [r XAUTOCLAIM mystream mygroup consumer2 10 ($cursor COUNT 2] + assert_equal [llength $reply] 2 + set cursor [lindex $reply 0] + assert_equal $cursor $id4 + assert_equal [llength [lindex $reply 1]] 2 + assert_equal [llength [lindex $reply 1 0 1]] 2 + assert_equal [lindex $reply 1 0 1] {c 3} + + # Claim last entry + set reply [r XAUTOCLAIM mystream mygroup consumer2 10 ($cursor COUNT 2] + assert_equal [llength $reply] 2 + set cursor [lindex $reply 0] + assert_equal $cursor {0-0} + assert_equal [llength [lindex $reply 1]] 1 + assert_equal [llength [lindex $reply 1 0 1]] 2 + assert_equal [lindex $reply 1 0 1] {e 5} } test {XINFO FULL output} { @@ -477,7 +582,7 @@ start_server { assert {$curr_grpinfo == $grpinfo} set n_consumers [lindex $grpinfo 3] - # Bob should be created only when there will be new data for this client + # Bob should be created only when there will be new data for this consumer assert_equal $n_consumers 2 set reply [r xinfo consumers mystream mygroup] set consumer_info [lindex $reply 0] |