summaryrefslogtreecommitdiff
path: root/src/db.c
diff options
context:
space:
mode:
authorHuang Zhw <huang_zhw@126.com>2022-11-30 17:56:36 +0800
committerGitHub <noreply@github.com>2022-11-30 11:56:36 +0200
commitc81813148b71cd4be686402ef69f628f67dbb8c4 (patch)
tree54a5f6bc0701c97db66cb1cebe85fecfa3181429 /src/db.c
parent7dfd7b9197bbe216912049eebecbda3f1684925e (diff)
downloadredis-c81813148b71cd4be686402ef69f628f67dbb8c4.tar.gz
Add a special notification unlink available only for modules (#9406)
Add a new module event `RedisModule_Event_Key`, this event is fired when a key is removed from the keyspace. The event includes an open key that can be used for reading the key before it is removed. Modules can also extract the key-name, and use RM_Open or RM_Call to access key from within that event, but shouldn't modify anything from within this event. The following sub events are available: - `REDISMODULE_SUBEVENT_KEY_DELETED` - `REDISMODULE_SUBEVENT_KEY_EXPIRED` - `REDISMODULE_SUBEVENT_KEY_EVICTED` - `REDISMODULE_SUBEVENT_KEY_OVERWRITE` The data pointer can be casted to a RedisModuleKeyInfo structure with the following fields: ``` RedisModuleKey *key; // Opened Key ``` ### internals * We also add two dict functions: `dictTwoPhaseUnlinkFind` finds an element from the table, also get the plink of the entry. The entry is returned if the element is found. The user should later call `dictTwoPhaseUnlinkFree` with it in order to unlink and release it. Otherwise if the key is not found, NULL is returned. These two functions should be used in pair. `dictTwoPhaseUnlinkFind` pauses rehash and `dictTwoPhaseUnlinkFree` resumes rehash. * We change `dbOverwrite` to `dbReplaceValue` which just replaces the value of the key and doesn't fire any events. The "overwrite" part (which emits events) is just when called from `setKey`, the other places that called dbOverwrite were ones that just update the value in-place (INCR*, SPOP, and dbUnshareStringValue). This should not have any real impact since `moduleNotifyKeyUnlink` and `signalDeletedKeyAsReady` wouldn't have mattered in these cases anyway (i.e. module keys and stream keys didn't have direct calls to dbOverwrite) * since we allow doing RM_OpenKey from withing these callbacks, we temporarily disable lazy expiry. * We also temporarily disable lazy expiry when we are in unlink/unlink2 callback and keyspace notification callback. * Move special definitions to the top of redismodule.h This is needed to resolve compilation errors with RedisModuleKeyInfoV1 that carries a RedisModuleKey member. Co-authored-by: Oran Agra <oran@redislabs.com>
Diffstat (limited to 'src/db.c')
-rw-r--r--src/db.c77
1 files changed, 53 insertions, 24 deletions
diff --git a/src/db.c b/src/db.c
index 5ebd21e4b..265650126 100644
--- a/src/db.c
+++ b/src/db.c
@@ -218,8 +218,13 @@ int dbAddRDBLoad(redisDb *db, sds key, robj *val) {
* count of the new value is up to the caller.
* This function does not modify the expire time of the existing key.
*
+ * The 'overwrite' flag is an indication whether this is done as part of a
+ * complete replacement of their key, which can be thought as a deletion and
+ * replacement (in which case we need to emit deletion signals), or just an
+ * update of a value of an existing key (when false).
+ *
* The program is aborted if the key was not already present. */
-void dbOverwrite(redisDb *db, robj *key, robj *val) {
+static void dbSetValue(redisDb *db, robj *key, robj *val, int overwrite) {
dictEntry *de = dictFind(db->dict,key->ptr);
serverAssertWithInfo(NULL,key,de != NULL);
@@ -228,12 +233,22 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) {
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
val->lru = old->lru;
}
- /* Although the key is not really deleted from the database, we regard
- * overwrite as two steps of unlink+add, so we still need to call the unlink
- * callback of the module. */
- moduleNotifyKeyUnlink(key,old,db->id);
- /* We want to try to unblock any module clients or clients using a blocking XREADGROUP */
- signalDeletedKeyAsReady(db,key,old->type);
+ if (overwrite) {
+ /* RM_StringDMA may call dbUnshareStringValue which may free val, so we
+ * need to incr to retain old */
+ incrRefCount(old);
+ /* Although the key is not really deleted from the database, we regard
+ * overwrite as two steps of unlink+add, so we still need to call the unlink
+ * callback of the module. */
+ moduleNotifyKeyUnlink(key,old,db->id,DB_FLAG_KEY_OVERWRITE);
+ /* We want to try to unblock any module clients or clients using a blocking XREADGROUP */
+ signalDeletedKeyAsReady(db,key,old->type);
+ decrRefCount(old);
+ /* Because of RM_StringDMA, old may be changed, so we need get old again */
+ old = dictGetVal(de);
+ /* Entry in auxentry may be changed, so we need update auxentry */
+ auxentry = *de;
+ }
dictSetVal(db->dict, de, val);
if (server.lazyfree_lazy_server_del) {
@@ -244,6 +259,12 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) {
dictFreeVal(db->dict, &auxentry);
}
+/* Replace an existing key with a new value, we just replace value and don't
+ * emit any events */
+void dbReplaceValue(redisDb *db, robj *key, robj *val) {
+ dbSetValue(db, key, val, 0);
+}
+
/* High level Set operation. This function can be used in order to set
* a key, whatever it was existing or not, to a new object.
*
@@ -268,7 +289,7 @@ void setKey(client *c, redisDb *db, robj *key, robj *val, int flags) {
if (!keyfound) {
dbAdd(db,key,val);
} else {
- dbOverwrite(db,key,val);
+ dbSetValue(db,key,val,1);
}
incrRefCount(val);
if (!(flags & SETKEY_KEEPTTL)) removeExpire(db,key);
@@ -315,23 +336,33 @@ robj *dbRandomKey(redisDb *db) {
}
/* Helper for sync and async delete. */
-static int dbGenericDelete(redisDb *db, robj *key, int async) {
- /* Deleting an entry from the expires dict will not free the sds of
- * the key, because it is shared with the main dictionary. */
- if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
- dictEntry *de = dictUnlink(db->dict,key->ptr);
+int dbGenericDelete(redisDb *db, robj *key, int async, int flags) {
+ dictEntry **plink;
+ int table;
+ dictEntry *de = dictTwoPhaseUnlinkFind(db->dict,key->ptr,&plink,&table);
if (de) {
robj *val = dictGetVal(de);
+ /* RM_StringDMA may call dbUnshareStringValue which may free val, so we
+ * need to incr to retain val */
+ incrRefCount(val);
/* Tells the module that the key has been unlinked from the database. */
- moduleNotifyKeyUnlink(key,val,db->id);
+ moduleNotifyKeyUnlink(key,val,db->id,flags);
/* We want to try to unblock any module clients or clients using a blocking XREADGROUP */
signalDeletedKeyAsReady(db,key,val->type);
+ /* We should call decr before freeObjAsync. If not, the refcount may be
+ * greater than 1, so freeObjAsync doesn't work */
+ decrRefCount(val);
if (async) {
- freeObjAsync(key, val, db->id);
+ /* Because of dbUnshareStringValue, the val in de may change. */
+ freeObjAsync(key, dictGetVal(de), db->id);
dictSetVal(db->dict, de, NULL);
}
if (server.cluster_enabled) slotToKeyDelEntry(de, db);
- dictFreeUnlinkedEntry(db->dict,de);
+
+ /* Deleting an entry from the expires dict will not free the sds of
+ * the key, because it is shared with the main dictionary. */
+ if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
+ dictTwoPhaseUnlinkFree(db->dict,de,plink,table);
return 1;
} else {
return 0;
@@ -340,19 +371,19 @@ static int dbGenericDelete(redisDb *db, robj *key, int async) {
/* Delete a key, value, and associated expiration entry if any, from the DB */
int dbSyncDelete(redisDb *db, robj *key) {
- return dbGenericDelete(db, key, 0);
+ return dbGenericDelete(db, key, 0, DB_FLAG_KEY_DELETED);
}
/* Delete a key, value, and associated expiration entry if any, from the DB. If
* the value consists of many allocations, it may be freed asynchronously. */
int dbAsyncDelete(redisDb *db, robj *key) {
- return dbGenericDelete(db, key, 1);
+ return dbGenericDelete(db, key, 1, DB_FLAG_KEY_DELETED);
}
/* This is a wrapper whose behavior depends on the Redis lazy free
* configuration. Deletes the key synchronously or asynchronously. */
int dbDelete(redisDb *db, robj *key) {
- return dbGenericDelete(db, key, server.lazyfree_lazy_server_del);
+ return dbGenericDelete(db, key, server.lazyfree_lazy_server_del, DB_FLAG_KEY_DELETED);
}
/* Prepare the string object stored at 'key' to be modified destructively
@@ -388,7 +419,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
robj *decoded = getDecodedObject(o);
o = createRawStringObject(decoded->ptr, sdslen(decoded->ptr));
decrRefCount(decoded);
- dbOverwrite(db,key,o);
+ dbReplaceValue(db,key,o);
}
return o;
}
@@ -1561,10 +1592,7 @@ long long getExpire(redisDb *db, robj *key) {
void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj) {
mstime_t expire_latency;
latencyStartMonitor(expire_latency);
- if (server.lazyfree_lazy_expire)
- dbAsyncDelete(db,keyobj);
- else
- dbSyncDelete(db,keyobj);
+ dbGenericDelete(db,keyobj,server.lazyfree_lazy_expire,DB_FLAG_KEY_EXPIRED);
latencyEndMonitor(expire_latency);
latencyAddSampleIfNeeded("expire-del",expire_latency);
notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",keyobj,db->id);
@@ -1657,6 +1685,7 @@ int keyIsExpired(redisDb *db, robj *key) {
* The return value of the function is 0 if the key is still valid,
* otherwise the function returns 1 if the key is expired. */
int expireIfNeeded(redisDb *db, robj *key, int flags) {
+ if (server.lazy_expire_disabled) return 0;
if (!keyIsExpired(db,key)) return 0;
/* If we are running in the context of a replica, instead of