diff options
author | guybe7 <guy.benoish@redislabs.com> | 2022-10-18 18:50:02 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-10-18 19:50:02 +0300 |
commit | b57fd01064428ab388c9d9038a617a52488a447b (patch) | |
tree | 82bcfcf7355292dcdbd8da8d56b149cedb10da56 /src/db.c | |
parent | b43f254813025e3deea6ef65126ea2bad49af857 (diff) | |
download | redis-b57fd01064428ab388c9d9038a617a52488a447b.tar.gz |
Blocked module clients should be aware when a key is deleted (#11310)
The use case is a module that wants to implement a blocking command on a key that
necessarily exists and wants to unblock the client in case the key is deleted (much like
what we implemented for XREADGROUP in #10306)
New module API:
* RedisModule_BlockClientOnKeysWithFlags
Flags:
* REDISMODULE_BLOCK_UNBLOCK_NONE
* REDISMODULE_BLOCK_UNBLOCK_DELETED
### Detailed description of code changes
blocked.c:
1. Both module and stream functions are called whether the key exists or not, regardless of
its type. We do that in order to allow modules/stream to unblock the client in case the key
is no longer present or has changed type (the behavior for streams didn't change, just code
that moved into serveClientsBlockedOnStreamKey)
2. Make sure afterCommand is called in serveClientsBlockedOnKeyByModule, in order to propagate
actions from moduleTryServeClientBlockedOnKey.
3. handleClientsBlockedOnKeys: call propagatePendingCommands directly after lookupKeyReadWithFlags
to prevent a possible lazy-expire DEL from being mixed with any command propagated by the
preceding functions.
4. blockForKeys: Caller can specifiy that it wants to be awakened if key is deleted.
Minor optimizations (use dictAddRaw).
5. signalKeyAsReady became signalKeyAsReadyLogic which can take a boolean in case the key is deleted.
It will only signal if there's at least one client that awaits key deletion (to save calls to
handleClientsBlockedOnKeys).
Minor optimizations (use dictAddRaw)
db.c:
1. scanDatabaseForDeletedStreams is now scanDatabaseForDeletedKeys and will signalKeyAsReady
for any key that was removed from the database or changed type. It is the responsibility of the code
in blocked.c to ignore or act on deleted/type-changed keys.
2. Use the new signalDeletedKeyAsReady where needed
blockedonkey.c + tcl:
1. Added test of new capabilities (FSL.BPOPGT now requires the key to exist in order to work)
Diffstat (limited to 'src/db.c')
-rw-r--r-- | src/db.c | 38 |
1 files changed, 18 insertions, 20 deletions
@@ -232,9 +232,8 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) { * overwrite as two steps of unlink+add, so we still need to call the unlink * callback of the module. */ moduleNotifyKeyUnlink(key,old,db->id); - /* We want to try to unblock any client using a blocking XREADGROUP */ - if (old->type == OBJ_STREAM) - signalKeyAsReady(db,key,old->type); + /* We want to try to unblock any module clients or clients using a blocking XREADGROUP */ + signalDeletedKeyAsReady(db,key,old->type); dictSetVal(db->dict, de, val); if (server.lazyfree_lazy_server_del) { @@ -325,9 +324,8 @@ static int dbGenericDelete(redisDb *db, robj *key, int async) { robj *val = dictGetVal(de); /* Tells the module that the key has been unlinked from the database. */ moduleNotifyKeyUnlink(key,val,db->id); - /* We want to try to unblock any client using a blocking XREADGROUP */ - if (val->type == OBJ_STREAM) - signalKeyAsReady(db,key,val->type); + /* We want to try to unblock any module clients or clients using a blocking XREADGROUP */ + signalDeletedKeyAsReady(db,key,val->type); if (async) { freeObjAsync(key, val, db->id); dictSetVal(db->dict, de, NULL); @@ -568,7 +566,7 @@ void signalFlushedDb(int dbid, int async) { } for (int j = startdb; j <= enddb; j++) { - scanDatabaseForDeletedStreams(&server.db[j], NULL); + scanDatabaseForDeletedKeys(&server.db[j], NULL); touchAllWatchedKeysInDb(&server.db[j], NULL); } @@ -1350,32 +1348,32 @@ void scanDatabaseForReadyKeys(redisDb *db) { /* Since we are unblocking XREADGROUP clients in the event the * key was deleted/overwritten we must do the same in case the * database was flushed/swapped. */ -void scanDatabaseForDeletedStreams(redisDb *emptied, redisDb *replaced_with) { - /* Optimization: If no clients are in type BLOCKED_STREAM, - * we can skip this loop. */ - if (!server.blocked_clients_by_type[BLOCKED_STREAM]) return; - +void scanDatabaseForDeletedKeys(redisDb *emptied, redisDb *replaced_with) { dictEntry *de; dictIterator *di = dictGetSafeIterator(emptied->blocking_keys); while((de = dictNext(di)) != NULL) { robj *key = dictGetKey(de); - int was_stream = 0, is_stream = 0; + int existed = 0, exists = 0; + int original_type = -1, curr_type = -1; dictEntry *kde = dictFind(emptied->dict, key->ptr); if (kde) { robj *value = dictGetVal(kde); - was_stream = value->type == OBJ_STREAM; + original_type = value->type; + existed = 1; } + if (replaced_with) { dictEntry *kde = dictFind(replaced_with->dict, key->ptr); if (kde) { robj *value = dictGetVal(kde); - is_stream = value->type == OBJ_STREAM; + curr_type = value->type; + exists = 1; } } /* We want to try to unblock any client using a blocking XREADGROUP */ - if (was_stream && !is_stream) - signalKeyAsReady(emptied, key, OBJ_STREAM); + if ((existed && !exists) || original_type != curr_type) + signalDeletedKeyAsReady(emptied, key, original_type); } dictReleaseIterator(di); } @@ -1401,8 +1399,8 @@ int dbSwapDatabases(int id1, int id2) { touchAllWatchedKeysInDb(db2, db1); /* Try to unblock any XREADGROUP clients if the key no longer exists. */ - scanDatabaseForDeletedStreams(db1, db2); - scanDatabaseForDeletedStreams(db2, db1); + scanDatabaseForDeletedKeys(db1, db2); + scanDatabaseForDeletedKeys(db2, db1); /* Swap hash tables. Note that we don't swap blocking_keys, * ready_keys and watched_keys, since we want clients to @@ -1451,7 +1449,7 @@ void swapMainDbWithTempDb(redisDb *tempDb) { touchAllWatchedKeysInDb(activedb, newdb); /* Try to unblock any XREADGROUP clients if the key no longer exists. */ - scanDatabaseForDeletedStreams(activedb, newdb); + scanDatabaseForDeletedKeys(activedb, newdb); /* Swap hash tables. Note that we don't swap blocking_keys, * ready_keys and watched_keys, since clients |