diff options
author | Huang Zhw <huang_zhw@126.com> | 2022-11-30 17:56:36 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-30 11:56:36 +0200 |
commit | c81813148b71cd4be686402ef69f628f67dbb8c4 (patch) | |
tree | 54a5f6bc0701c97db66cb1cebe85fecfa3181429 /src/db.c | |
parent | 7dfd7b9197bbe216912049eebecbda3f1684925e (diff) | |
download | redis-c81813148b71cd4be686402ef69f628f67dbb8c4.tar.gz |
Add a special notification unlink available only for modules (#9406)
Add a new module event `RedisModule_Event_Key`, this event is fired
when a key is removed from the keyspace.
The event includes an open key that can be used for reading the key before
it is removed. Modules can also extract the key-name, and use RM_Open
or RM_Call to access key from within that event, but shouldn't modify anything
from within this event.
The following sub events are available:
- `REDISMODULE_SUBEVENT_KEY_DELETED`
- `REDISMODULE_SUBEVENT_KEY_EXPIRED`
- `REDISMODULE_SUBEVENT_KEY_EVICTED`
- `REDISMODULE_SUBEVENT_KEY_OVERWRITE`
The data pointer can be casted to a RedisModuleKeyInfo structure
with the following fields:
```
RedisModuleKey *key; // Opened Key
```
### internals
* We also add two dict functions:
`dictTwoPhaseUnlinkFind` finds an element from the table, also get the plink of the entry.
The entry is returned if the element is found. The user should later call `dictTwoPhaseUnlinkFree`
with it in order to unlink and release it. Otherwise if the key is not found, NULL is returned.
These two functions should be used in pair. `dictTwoPhaseUnlinkFind` pauses rehash and
`dictTwoPhaseUnlinkFree` resumes rehash.
* We change `dbOverwrite` to `dbReplaceValue` which just replaces the value of the key and
doesn't fire any events. The "overwrite" part (which emits events) is just when called from `setKey`,
the other places that called dbOverwrite were ones that just update the value in-place (INCR*, SPOP,
and dbUnshareStringValue). This should not have any real impact since `moduleNotifyKeyUnlink` and
`signalDeletedKeyAsReady` wouldn't have mattered in these cases anyway (i.e. module keys and
stream keys didn't have direct calls to dbOverwrite)
* since we allow doing RM_OpenKey from withing these callbacks, we temporarily disable lazy expiry.
* We also temporarily disable lazy expiry when we are in unlink/unlink2 callback and keyspace
notification callback.
* Move special definitions to the top of redismodule.h
This is needed to resolve compilation errors with RedisModuleKeyInfoV1
that carries a RedisModuleKey member.
Co-authored-by: Oran Agra <oran@redislabs.com>
Diffstat (limited to 'src/db.c')
-rw-r--r-- | src/db.c | 77 |
1 files changed, 53 insertions, 24 deletions
@@ -218,8 +218,13 @@ int dbAddRDBLoad(redisDb *db, sds key, robj *val) { * count of the new value is up to the caller. * This function does not modify the expire time of the existing key. * + * The 'overwrite' flag is an indication whether this is done as part of a + * complete replacement of their key, which can be thought as a deletion and + * replacement (in which case we need to emit deletion signals), or just an + * update of a value of an existing key (when false). + * * The program is aborted if the key was not already present. */ -void dbOverwrite(redisDb *db, robj *key, robj *val) { +static void dbSetValue(redisDb *db, robj *key, robj *val, int overwrite) { dictEntry *de = dictFind(db->dict,key->ptr); serverAssertWithInfo(NULL,key,de != NULL); @@ -228,12 +233,22 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) { if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { val->lru = old->lru; } - /* Although the key is not really deleted from the database, we regard - * overwrite as two steps of unlink+add, so we still need to call the unlink - * callback of the module. */ - moduleNotifyKeyUnlink(key,old,db->id); - /* We want to try to unblock any module clients or clients using a blocking XREADGROUP */ - signalDeletedKeyAsReady(db,key,old->type); + if (overwrite) { + /* RM_StringDMA may call dbUnshareStringValue which may free val, so we + * need to incr to retain old */ + incrRefCount(old); + /* Although the key is not really deleted from the database, we regard + * overwrite as two steps of unlink+add, so we still need to call the unlink + * callback of the module. */ + moduleNotifyKeyUnlink(key,old,db->id,DB_FLAG_KEY_OVERWRITE); + /* We want to try to unblock any module clients or clients using a blocking XREADGROUP */ + signalDeletedKeyAsReady(db,key,old->type); + decrRefCount(old); + /* Because of RM_StringDMA, old may be changed, so we need get old again */ + old = dictGetVal(de); + /* Entry in auxentry may be changed, so we need update auxentry */ + auxentry = *de; + } dictSetVal(db->dict, de, val); if (server.lazyfree_lazy_server_del) { @@ -244,6 +259,12 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) { dictFreeVal(db->dict, &auxentry); } +/* Replace an existing key with a new value, we just replace value and don't + * emit any events */ +void dbReplaceValue(redisDb *db, robj *key, robj *val) { + dbSetValue(db, key, val, 0); +} + /* High level Set operation. This function can be used in order to set * a key, whatever it was existing or not, to a new object. * @@ -268,7 +289,7 @@ void setKey(client *c, redisDb *db, robj *key, robj *val, int flags) { if (!keyfound) { dbAdd(db,key,val); } else { - dbOverwrite(db,key,val); + dbSetValue(db,key,val,1); } incrRefCount(val); if (!(flags & SETKEY_KEEPTTL)) removeExpire(db,key); @@ -315,23 +336,33 @@ robj *dbRandomKey(redisDb *db) { } /* Helper for sync and async delete. */ -static int dbGenericDelete(redisDb *db, robj *key, int async) { - /* Deleting an entry from the expires dict will not free the sds of - * the key, because it is shared with the main dictionary. */ - if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr); - dictEntry *de = dictUnlink(db->dict,key->ptr); +int dbGenericDelete(redisDb *db, robj *key, int async, int flags) { + dictEntry **plink; + int table; + dictEntry *de = dictTwoPhaseUnlinkFind(db->dict,key->ptr,&plink,&table); if (de) { robj *val = dictGetVal(de); + /* RM_StringDMA may call dbUnshareStringValue which may free val, so we + * need to incr to retain val */ + incrRefCount(val); /* Tells the module that the key has been unlinked from the database. */ - moduleNotifyKeyUnlink(key,val,db->id); + moduleNotifyKeyUnlink(key,val,db->id,flags); /* We want to try to unblock any module clients or clients using a blocking XREADGROUP */ signalDeletedKeyAsReady(db,key,val->type); + /* We should call decr before freeObjAsync. If not, the refcount may be + * greater than 1, so freeObjAsync doesn't work */ + decrRefCount(val); if (async) { - freeObjAsync(key, val, db->id); + /* Because of dbUnshareStringValue, the val in de may change. */ + freeObjAsync(key, dictGetVal(de), db->id); dictSetVal(db->dict, de, NULL); } if (server.cluster_enabled) slotToKeyDelEntry(de, db); - dictFreeUnlinkedEntry(db->dict,de); + + /* Deleting an entry from the expires dict will not free the sds of + * the key, because it is shared with the main dictionary. */ + if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr); + dictTwoPhaseUnlinkFree(db->dict,de,plink,table); return 1; } else { return 0; @@ -340,19 +371,19 @@ static int dbGenericDelete(redisDb *db, robj *key, int async) { /* Delete a key, value, and associated expiration entry if any, from the DB */ int dbSyncDelete(redisDb *db, robj *key) { - return dbGenericDelete(db, key, 0); + return dbGenericDelete(db, key, 0, DB_FLAG_KEY_DELETED); } /* Delete a key, value, and associated expiration entry if any, from the DB. If * the value consists of many allocations, it may be freed asynchronously. */ int dbAsyncDelete(redisDb *db, robj *key) { - return dbGenericDelete(db, key, 1); + return dbGenericDelete(db, key, 1, DB_FLAG_KEY_DELETED); } /* This is a wrapper whose behavior depends on the Redis lazy free * configuration. Deletes the key synchronously or asynchronously. */ int dbDelete(redisDb *db, robj *key) { - return dbGenericDelete(db, key, server.lazyfree_lazy_server_del); + return dbGenericDelete(db, key, server.lazyfree_lazy_server_del, DB_FLAG_KEY_DELETED); } /* Prepare the string object stored at 'key' to be modified destructively @@ -388,7 +419,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) { robj *decoded = getDecodedObject(o); o = createRawStringObject(decoded->ptr, sdslen(decoded->ptr)); decrRefCount(decoded); - dbOverwrite(db,key,o); + dbReplaceValue(db,key,o); } return o; } @@ -1561,10 +1592,7 @@ long long getExpire(redisDb *db, robj *key) { void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj) { mstime_t expire_latency; latencyStartMonitor(expire_latency); - if (server.lazyfree_lazy_expire) - dbAsyncDelete(db,keyobj); - else - dbSyncDelete(db,keyobj); + dbGenericDelete(db,keyobj,server.lazyfree_lazy_expire,DB_FLAG_KEY_EXPIRED); latencyEndMonitor(expire_latency); latencyAddSampleIfNeeded("expire-del",expire_latency); notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",keyobj,db->id); @@ -1657,6 +1685,7 @@ int keyIsExpired(redisDb *db, robj *key) { * The return value of the function is 0 if the key is still valid, * otherwise the function returns 1 if the key is expired. */ int expireIfNeeded(redisDb *db, robj *key, int flags) { + if (server.lazy_expire_disabled) return 0; if (!keyIsExpired(db,key)) return 0; /* If we are running in the context of a replica, instead of |