summaryrefslogtreecommitdiff
path: root/src/db.c
diff options
context:
space:
mode:
authorguybe7 <guy.benoish@redislabs.com>2022-10-18 18:50:02 +0200
committerGitHub <noreply@github.com>2022-10-18 19:50:02 +0300
commitb57fd01064428ab388c9d9038a617a52488a447b (patch)
tree82bcfcf7355292dcdbd8da8d56b149cedb10da56 /src/db.c
parentb43f254813025e3deea6ef65126ea2bad49af857 (diff)
downloadredis-b57fd01064428ab388c9d9038a617a52488a447b.tar.gz
Blocked module clients should be aware when a key is deleted (#11310)
The use case is a module that wants to implement a blocking command on a key that necessarily exists and wants to unblock the client in case the key is deleted (much like what we implemented for XREADGROUP in #10306) New module API: * RedisModule_BlockClientOnKeysWithFlags Flags: * REDISMODULE_BLOCK_UNBLOCK_NONE * REDISMODULE_BLOCK_UNBLOCK_DELETED ### Detailed description of code changes blocked.c: 1. Both module and stream functions are called whether the key exists or not, regardless of its type. We do that in order to allow modules/stream to unblock the client in case the key is no longer present or has changed type (the behavior for streams didn't change, just code that moved into serveClientsBlockedOnStreamKey) 2. Make sure afterCommand is called in serveClientsBlockedOnKeyByModule, in order to propagate actions from moduleTryServeClientBlockedOnKey. 3. handleClientsBlockedOnKeys: call propagatePendingCommands directly after lookupKeyReadWithFlags to prevent a possible lazy-expire DEL from being mixed with any command propagated by the preceding functions. 4. blockForKeys: Caller can specifiy that it wants to be awakened if key is deleted. Minor optimizations (use dictAddRaw). 5. signalKeyAsReady became signalKeyAsReadyLogic which can take a boolean in case the key is deleted. It will only signal if there's at least one client that awaits key deletion (to save calls to handleClientsBlockedOnKeys). Minor optimizations (use dictAddRaw) db.c: 1. scanDatabaseForDeletedStreams is now scanDatabaseForDeletedKeys and will signalKeyAsReady for any key that was removed from the database or changed type. It is the responsibility of the code in blocked.c to ignore or act on deleted/type-changed keys. 2. Use the new signalDeletedKeyAsReady where needed blockedonkey.c + tcl: 1. Added test of new capabilities (FSL.BPOPGT now requires the key to exist in order to work)
Diffstat (limited to 'src/db.c')
-rw-r--r--src/db.c38
1 files changed, 18 insertions, 20 deletions
diff --git a/src/db.c b/src/db.c
index 63705cd01..58febe0d9 100644
--- a/src/db.c
+++ b/src/db.c
@@ -232,9 +232,8 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) {
* overwrite as two steps of unlink+add, so we still need to call the unlink
* callback of the module. */
moduleNotifyKeyUnlink(key,old,db->id);
- /* We want to try to unblock any client using a blocking XREADGROUP */
- if (old->type == OBJ_STREAM)
- signalKeyAsReady(db,key,old->type);
+ /* We want to try to unblock any module clients or clients using a blocking XREADGROUP */
+ signalDeletedKeyAsReady(db,key,old->type);
dictSetVal(db->dict, de, val);
if (server.lazyfree_lazy_server_del) {
@@ -325,9 +324,8 @@ static int dbGenericDelete(redisDb *db, robj *key, int async) {
robj *val = dictGetVal(de);
/* Tells the module that the key has been unlinked from the database. */
moduleNotifyKeyUnlink(key,val,db->id);
- /* We want to try to unblock any client using a blocking XREADGROUP */
- if (val->type == OBJ_STREAM)
- signalKeyAsReady(db,key,val->type);
+ /* We want to try to unblock any module clients or clients using a blocking XREADGROUP */
+ signalDeletedKeyAsReady(db,key,val->type);
if (async) {
freeObjAsync(key, val, db->id);
dictSetVal(db->dict, de, NULL);
@@ -568,7 +566,7 @@ void signalFlushedDb(int dbid, int async) {
}
for (int j = startdb; j <= enddb; j++) {
- scanDatabaseForDeletedStreams(&server.db[j], NULL);
+ scanDatabaseForDeletedKeys(&server.db[j], NULL);
touchAllWatchedKeysInDb(&server.db[j], NULL);
}
@@ -1350,32 +1348,32 @@ void scanDatabaseForReadyKeys(redisDb *db) {
/* Since we are unblocking XREADGROUP clients in the event the
* key was deleted/overwritten we must do the same in case the
* database was flushed/swapped. */
-void scanDatabaseForDeletedStreams(redisDb *emptied, redisDb *replaced_with) {
- /* Optimization: If no clients are in type BLOCKED_STREAM,
- * we can skip this loop. */
- if (!server.blocked_clients_by_type[BLOCKED_STREAM]) return;
-
+void scanDatabaseForDeletedKeys(redisDb *emptied, redisDb *replaced_with) {
dictEntry *de;
dictIterator *di = dictGetSafeIterator(emptied->blocking_keys);
while((de = dictNext(di)) != NULL) {
robj *key = dictGetKey(de);
- int was_stream = 0, is_stream = 0;
+ int existed = 0, exists = 0;
+ int original_type = -1, curr_type = -1;
dictEntry *kde = dictFind(emptied->dict, key->ptr);
if (kde) {
robj *value = dictGetVal(kde);
- was_stream = value->type == OBJ_STREAM;
+ original_type = value->type;
+ existed = 1;
}
+
if (replaced_with) {
dictEntry *kde = dictFind(replaced_with->dict, key->ptr);
if (kde) {
robj *value = dictGetVal(kde);
- is_stream = value->type == OBJ_STREAM;
+ curr_type = value->type;
+ exists = 1;
}
}
/* We want to try to unblock any client using a blocking XREADGROUP */
- if (was_stream && !is_stream)
- signalKeyAsReady(emptied, key, OBJ_STREAM);
+ if ((existed && !exists) || original_type != curr_type)
+ signalDeletedKeyAsReady(emptied, key, original_type);
}
dictReleaseIterator(di);
}
@@ -1401,8 +1399,8 @@ int dbSwapDatabases(int id1, int id2) {
touchAllWatchedKeysInDb(db2, db1);
/* Try to unblock any XREADGROUP clients if the key no longer exists. */
- scanDatabaseForDeletedStreams(db1, db2);
- scanDatabaseForDeletedStreams(db2, db1);
+ scanDatabaseForDeletedKeys(db1, db2);
+ scanDatabaseForDeletedKeys(db2, db1);
/* Swap hash tables. Note that we don't swap blocking_keys,
* ready_keys and watched_keys, since we want clients to
@@ -1451,7 +1449,7 @@ void swapMainDbWithTempDb(redisDb *tempDb) {
touchAllWatchedKeysInDb(activedb, newdb);
/* Try to unblock any XREADGROUP clients if the key no longer exists. */
- scanDatabaseForDeletedStreams(activedb, newdb);
+ scanDatabaseForDeletedKeys(activedb, newdb);
/* Swap hash tables. Note that we don't swap blocking_keys,
* ready_keys and watched_keys, since clients