diff options
author | guybe7 <guy.benoish@redislabs.com> | 2022-10-18 18:50:02 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-10-18 19:50:02 +0300 |
commit | b57fd01064428ab388c9d9038a617a52488a447b (patch) | |
tree | 82bcfcf7355292dcdbd8da8d56b149cedb10da56 /tests | |
parent | b43f254813025e3deea6ef65126ea2bad49af857 (diff) | |
download | redis-b57fd01064428ab388c9d9038a617a52488a447b.tar.gz |
Blocked module clients should be aware when a key is deleted (#11310)
The use case is a module that wants to implement a blocking command on a key that
necessarily exists and wants to unblock the client in case the key is deleted (much like
what we implemented for XREADGROUP in #10306)
New module API:
* RedisModule_BlockClientOnKeysWithFlags
Flags:
* REDISMODULE_BLOCK_UNBLOCK_NONE
* REDISMODULE_BLOCK_UNBLOCK_DELETED
### Detailed description of code changes
blocked.c:
1. Both module and stream functions are called whether the key exists or not, regardless of
its type. We do that in order to allow modules/stream to unblock the client in case the key
is no longer present or has changed type (the behavior for streams didn't change, just code
that moved into serveClientsBlockedOnStreamKey)
2. Make sure afterCommand is called in serveClientsBlockedOnKeyByModule, in order to propagate
actions from moduleTryServeClientBlockedOnKey.
3. handleClientsBlockedOnKeys: call propagatePendingCommands directly after lookupKeyReadWithFlags
to prevent a possible lazy-expire DEL from being mixed with any command propagated by the
preceding functions.
4. blockForKeys: Caller can specifiy that it wants to be awakened if key is deleted.
Minor optimizations (use dictAddRaw).
5. signalKeyAsReady became signalKeyAsReadyLogic which can take a boolean in case the key is deleted.
It will only signal if there's at least one client that awaits key deletion (to save calls to
handleClientsBlockedOnKeys).
Minor optimizations (use dictAddRaw)
db.c:
1. scanDatabaseForDeletedStreams is now scanDatabaseForDeletedKeys and will signalKeyAsReady
for any key that was removed from the database or changed type. It is the responsibility of the code
in blocked.c to ignore or act on deleted/type-changed keys.
2. Use the new signalDeletedKeyAsReady where needed
blockedonkey.c + tcl:
1. Added test of new capabilities (FSL.BPOPGT now requires the key to exist in order to work)
Diffstat (limited to 'tests')
-rw-r--r-- | tests/modules/blockonkeys.c | 17 | ||||
-rw-r--r-- | tests/unit/moduleapi/blockonkeys.tcl | 115 |
2 files changed, 127 insertions, 5 deletions
diff --git a/tests/modules/blockonkeys.c b/tests/modules/blockonkeys.c index 1aa576489..9b6c5e60b 100644 --- a/tests/modules/blockonkeys.c +++ b/tests/modules/blockonkeys.c @@ -5,6 +5,8 @@ #include <assert.h> #include <unistd.h> +#define UNUSED(V) ((void) V) + #define LIST_SIZE 1024 typedef struct { @@ -174,7 +176,7 @@ int bpopgt_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int arg fsl_t *fsl; if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0) || !fsl) - return REDISMODULE_ERR; + return RedisModule_ReplyWithError(ctx,"UNBLOCKED key no longer exists"); if (fsl->list[fsl->length-1] <= *pgt) return REDISMODULE_ERR; @@ -212,12 +214,17 @@ int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1)) return REDISMODULE_OK; - if (!fsl || fsl->list[fsl->length-1] <= gt) { + if (!fsl) + return RedisModule_ReplyWithError(ctx,"ERR key must exist"); + + if (fsl->list[fsl->length-1] <= gt) { /* We use malloc so the tests in blockedonkeys.tcl can check for memory leaks */ long long *pgt = RedisModule_Alloc(sizeof(long long)); *pgt = gt; - RedisModule_BlockClientOnKeys(ctx, bpopgt_reply_callback, bpopgt_timeout_callback, - bpopgt_free_privdata, timeout, &argv[1], 1, pgt); + RedisModule_BlockClientOnKeysWithFlags( + ctx, bpopgt_reply_callback, bpopgt_timeout_callback, + bpopgt_free_privdata, timeout, &argv[1], 1, pgt, + REDISMODULE_BLOCK_UNBLOCK_DELETED); } else { RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); } @@ -469,7 +476,7 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) .aof_rewrite = fsl_aofrw, .mem_usage = NULL, .free = fsl_free, - .digest = NULL + .digest = NULL, }; fsltype = RedisModule_CreateDataType(ctx, "fsltype_t", 0, &tm); diff --git a/tests/unit/moduleapi/blockonkeys.tcl b/tests/unit/moduleapi/blockonkeys.tcl index 094bcc0c0..50b130ba2 100644 --- a/tests/unit/moduleapi/blockonkeys.tcl +++ b/tests/unit/moduleapi/blockonkeys.tcl @@ -106,6 +106,7 @@ start_server {tags {"modules"}} { test {Module client blocked on keys (with metadata): Blocked, case 2} { r del k + r fsl.push k 32 set rd [redis_deferring_client] $rd fsl.bpopgt k 35 0 ;# wait until clients are actually blocked @@ -121,8 +122,120 @@ start_server {tags {"modules"}} { assert_equal {36} [$rd read] } + test {Module client blocked on keys (with metadata): Blocked, DEL} { + r del k + r fsl.push k 32 + set rd [redis_deferring_client] + $rd fsl.bpopgt k 35 0 + ;# wait until clients are actually blocked + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Clients are not blocked" + } + r del k + assert_error {*UNBLOCKED key no longer exists*} {$rd read} + } + + test {Module client blocked on keys (with metadata): Blocked, FLUSHALL} { + r del k + r fsl.push k 32 + set rd [redis_deferring_client] + $rd fsl.bpopgt k 35 0 + ;# wait until clients are actually blocked + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Clients are not blocked" + } + r flushall + assert_error {*UNBLOCKED key no longer exists*} {$rd read} + } + + test {Module client blocked on keys (with metadata): Blocked, SWAPDB, no key} { + r select 9 + r del k + r fsl.push k 32 + set rd [redis_deferring_client] + $rd fsl.bpopgt k 35 0 + ;# wait until clients are actually blocked + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Clients are not blocked" + } + r swapdb 0 9 + assert_error {*UNBLOCKED key no longer exists*} {$rd read} + } + + test {Module client blocked on keys (with metadata): Blocked, SWAPDB, key exists, case 1} { + ;# Key exists on other db, but wrong type + r flushall + r select 9 + r fsl.push k 32 + r select 0 + r lpush k 38 + r select 9 + set rd [redis_deferring_client] + $rd fsl.bpopgt k 35 0 + ;# wait until clients are actually blocked + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Clients are not blocked" + } + r swapdb 0 9 + assert_error {*UNBLOCKED key no longer exists*} {$rd read} + r select 9 + } + + test {Module client blocked on keys (with metadata): Blocked, SWAPDB, key exists, case 2} { + ;# Key exists on other db, with the right type, but the value doesn't allow to unblock + r flushall + r select 9 + r fsl.push k 32 + r select 0 + r fsl.push k 34 + r select 9 + set rd [redis_deferring_client] + $rd fsl.bpopgt k 35 0 + ;# wait until clients are actually blocked + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Clients are not blocked" + } + r swapdb 0 9 + assert_equal {1} [s 0 blocked_clients] + r fsl.push k 38 + assert_equal {38} [$rd read] + r select 9 + } + + test {Module client blocked on keys (with metadata): Blocked, SWAPDB, key exists, case 3} { + ;# Key exists on other db, with the right type, the value allows to unblock + r flushall + r select 9 + r fsl.push k 32 + r select 0 + r fsl.push k 38 + r select 9 + set rd [redis_deferring_client] + $rd fsl.bpopgt k 35 0 + ;# wait until clients are actually blocked + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Clients are not blocked" + } + r swapdb 0 9 + assert_equal {38} [$rd read] + r select 9 + } + test {Module client blocked on keys (with metadata): Blocked, CLIENT KILL} { r del k + r fsl.push k 32 set rd [redis_deferring_client] $rd client id set cid [$rd read] @@ -138,6 +251,7 @@ start_server {tags {"modules"}} { test {Module client blocked on keys (with metadata): Blocked, CLIENT UNBLOCK TIMEOUT} { r del k + r fsl.push k 32 set rd [redis_deferring_client] $rd client id set cid [$rd read] @@ -154,6 +268,7 @@ start_server {tags {"modules"}} { test {Module client blocked on keys (with metadata): Blocked, CLIENT UNBLOCK ERROR} { r del k + r fsl.push k 32 set rd [redis_deferring_client] $rd client id set cid [$rd read] |