summaryrefslogtreecommitdiff
path: root/tests/integration
diff options
context:
space:
mode:
authorItamar Haber <itamar@redis.com>2022-02-23 22:34:58 +0200
committerGitHub <noreply@github.com>2022-02-23 22:34:58 +0200
commitc81c7f51c38de6dff5ffc55b5184061b84c7ea5f (patch)
treec0a397c1a69990cfbef8b9766472c489f1f14f7e /tests/integration
parentb857928ba7b1d40ef96de7b94bf115e2e59e3075 (diff)
downloadredis-c81c7f51c38de6dff5ffc55b5184061b84c7ea5f.tar.gz
Add stream consumer group lag tracking and reporting (#9127)
Adds the ability to track the lag of a consumer group (CG), that is, the number of entries yet-to-be-delivered from the stream. The proposed constant-time solution is in the spirit of "best-effort." Partially addresses #8737. ## Description of approach We add a new "entries_added" property to the stream. This starts at 0 for a new stream and is incremented by 1 with every `XADD`. It is essentially an all-time counter of the entries added to the stream. Given the stream's length and this counter value, we can trivially find the logical "entries_added" counter of the first ID if and only if the stream is contiguous. A fragmented stream contains one or more tombstones generated by `XDEL`s. The new "xdel_max_id" stream property tracks the latest tombstone. The CG also tracks its last delivered ID's as an "entries_read" counter and increments it independently when delivering new messages, unless the this read counter is invalid (-1 means invalid offset). When the CG's counter is available, the reported lag is the difference between added and read counters. Lastly, this also adds a "first_id" field to the stream structure in order to make looking it up cheaper in most cases. ## Limitations There are two cases in which the mechanism isn't able to track the lag. In these cases, `XINFO` replies with `null` in the "lag" field. The first case is when a CG is created with an arbitrary last delivered ID, that isn't "0-0", nor the first or the last entries of the stream. In this case, it is impossible to obtain a valid read counter (short of an O(N) operation). The second case is when there are one or more tombstones fragmenting the stream's entries range. In both cases, given enough time and assuming that the consumers are active (reading and lacking) and advancing, the CG should be able to catch up with the tip of the stream and report zero lag. Once that's achieved, lag tracking would resume as normal (until the next tombstone is set). ## API changes * `XGROUP CREATE` added with the optional named argument `[ENTRIESREAD entries-read]` for explicitly specifying the new CG's counter. * `XGROUP SETID` added with an optional positional argument `[ENTRIESREAD entries-read]` for specifying the CG's counter. * `XINFO` reports the maximal tombstone ID, the recorded first entry ID, and total number of entries added to the stream. * `XINFO` reports the current lag and logical read counter of CGs. * `XSETID` is an internal command that's used in replication/aof. It has been added with the optional positional arguments `[ENTRIESADDED entries-added] [MAXDELETEDID max-deleted-entry-id]` for propagating the CG's offset and maximal tombstone ID of the stream. ## The generic unsolved problem The current stream implementation doesn't provide an efficient way to obtain the approximate/exact size of a range of entries. While it could've been nice to have that ability (#5813) in general, let alone specifically in the context of CGs, the risk and complexities involved in such implementation are in all likelihood prohibitive. ## A refactoring note The `streamGetEdgeID` has been refactored to accommodate both the existing seek of any entry as well as seeking non-deleted entries (the addition of the `skip_tombstones` argument). Furthermore, this refactoring also migrated the seek logic to use the `streamIterator` (rather than `raxIterator`) that was, in turn, extended with the `skip_tombstones` Boolean struct field to control the emission of these. Co-authored-by: Guy Benoish <guy.benoish@redislabs.com> Co-authored-by: Oran Agra <oran@redislabs.com>
Diffstat (limited to 'tests/integration')
-rw-r--r--tests/integration/corrupt-dump.tcl7
1 files changed, 3 insertions, 4 deletions
diff --git a/tests/integration/corrupt-dump.tcl b/tests/integration/corrupt-dump.tcl
index 86c7dd246..d2491306a 100644
--- a/tests/integration/corrupt-dump.tcl
+++ b/tests/integration/corrupt-dump.tcl
@@ -193,9 +193,8 @@ test {corrupt payload: listpack invalid size header} {
test {corrupt payload: listpack too long entry len} {
start_server [list overrides [list loglevel verbose use-exit-on-panic yes crash-memcheck-enabled no] ] {
r config set sanitize-dump-payload no
- r restore key 0 "\x0F\x01\x10\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x02\x40\x55\x55\x00\x00\x00\x0F\x00\x01\x01\x00\x01\x02\x01\x88\x31\x00\x00\x00\x00\x00\x00\x00\x09\x88\x32\x00\x00\x00\x00\x00\x00\x00\x09\x00\x01\x00\x01\x00\x01\x00\x01\x02\x02\x89\x31\x00\x00\x00\x00\x00\x00\x00\x09\x88\x61\x00\x00\x00\x00\x00\x00\x00\x09\x88\x32\x00\x00\x00\x00\x00\x00\x00\x09\x88\x62\x00\x00\x00\x00\x00\x00\x00\x09\x08\x01\xFF\x0A\x01\x00\x00\x09\x00\x40\x63\xC9\x37\x03\xA2\xE5\x68"
catch {
- r xinfo stream key full
+ r restore key 0 "\x0F\x01\x10\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x02\x40\x55\x55\x00\x00\x00\x0F\x00\x01\x01\x00\x01\x02\x01\x88\x31\x00\x00\x00\x00\x00\x00\x00\x09\x88\x32\x00\x00\x00\x00\x00\x00\x00\x09\x00\x01\x00\x01\x00\x01\x00\x01\x02\x02\x89\x31\x00\x00\x00\x00\x00\x00\x00\x09\x88\x61\x00\x00\x00\x00\x00\x00\x00\x09\x88\x32\x00\x00\x00\x00\x00\x00\x00\x09\x88\x62\x00\x00\x00\x00\x00\x00\x00\x09\x08\x01\xFF\x0A\x01\x00\x00\x09\x00\x40\x63\xC9\x37\x03\xA2\xE5\x68"
} err
assert_equal [count_log_message 0 "crashed by signal"] 0
assert_equal [count_log_message 0 "ASSERTION FAILED"] 1
@@ -205,9 +204,9 @@ test {corrupt payload: listpack too long entry len} {
test {corrupt payload: listpack very long entry len} {
start_server [list overrides [list loglevel verbose use-exit-on-panic yes crash-memcheck-enabled no] ] {
r config set sanitize-dump-payload no
- r restore key 0 "\x0F\x01\x10\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x02\x40\x55\x55\x00\x00\x00\x0F\x00\x01\x01\x00\x01\x02\x01\x88\x31\x00\x00\x00\x00\x00\x00\x00\x09\x88\x32\x00\x00\x00\x00\x00\x00\x00\x09\x00\x01\x00\x01\x00\x01\x00\x01\x02\x02\x88\x31\x00\x00\x00\x00\x00\x00\x00\x09\x88\x61\x00\x00\x00\x00\x00\x00\x00\x09\x88\x32\x00\x00\x00\x00\x00\x00\x00\x09\x9C\x62\x00\x00\x00\x00\x00\x00\x00\x09\x08\x01\xFF\x0A\x01\x00\x00\x09\x00\x63\x6F\x42\x8E\x7C\xB5\xA2\x9D"
catch {
- r xinfo stream key full
+ # This will catch migrated payloads from v6.2.x
+ r restore key 0 "\x0F\x01\x10\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x02\x40\x55\x55\x00\x00\x00\x0F\x00\x01\x01\x00\x01\x02\x01\x88\x31\x00\x00\x00\x00\x00\x00\x00\x09\x88\x32\x00\x00\x00\x00\x00\x00\x00\x09\x00\x01\x00\x01\x00\x01\x00\x01\x02\x02\x88\x31\x00\x00\x00\x00\x00\x00\x00\x09\x88\x61\x00\x00\x00\x00\x00\x00\x00\x09\x88\x32\x00\x00\x00\x00\x00\x00\x00\x09\x9C\x62\x00\x00\x00\x00\x00\x00\x00\x09\x08\x01\xFF\x0A\x01\x00\x00\x09\x00\x63\x6F\x42\x8E\x7C\xB5\xA2\x9D"
} err
assert_equal [count_log_message 0 "crashed by signal"] 0
assert_equal [count_log_message 0 "ASSERTION FAILED"] 1