diff options
Diffstat (limited to 'src/db.c')
-rw-r--r-- | src/db.c | 77 |
1 files changed, 53 insertions, 24 deletions
@@ -218,8 +218,13 @@ int dbAddRDBLoad(redisDb *db, sds key, robj *val) { * count of the new value is up to the caller. * This function does not modify the expire time of the existing key. * + * The 'overwrite' flag is an indication whether this is done as part of a + * complete replacement of their key, which can be thought as a deletion and + * replacement (in which case we need to emit deletion signals), or just an + * update of a value of an existing key (when false). + * * The program is aborted if the key was not already present. */ -void dbOverwrite(redisDb *db, robj *key, robj *val) { +static void dbSetValue(redisDb *db, robj *key, robj *val, int overwrite) { dictEntry *de = dictFind(db->dict,key->ptr); serverAssertWithInfo(NULL,key,de != NULL); @@ -228,12 +233,22 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) { if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { val->lru = old->lru; } - /* Although the key is not really deleted from the database, we regard - * overwrite as two steps of unlink+add, so we still need to call the unlink - * callback of the module. */ - moduleNotifyKeyUnlink(key,old,db->id); - /* We want to try to unblock any module clients or clients using a blocking XREADGROUP */ - signalDeletedKeyAsReady(db,key,old->type); + if (overwrite) { + /* RM_StringDMA may call dbUnshareStringValue which may free val, so we + * need to incr to retain old */ + incrRefCount(old); + /* Although the key is not really deleted from the database, we regard + * overwrite as two steps of unlink+add, so we still need to call the unlink + * callback of the module. */ + moduleNotifyKeyUnlink(key,old,db->id,DB_FLAG_KEY_OVERWRITE); + /* We want to try to unblock any module clients or clients using a blocking XREADGROUP */ + signalDeletedKeyAsReady(db,key,old->type); + decrRefCount(old); + /* Because of RM_StringDMA, old may be changed, so we need get old again */ + old = dictGetVal(de); + /* Entry in auxentry may be changed, so we need update auxentry */ + auxentry = *de; + } dictSetVal(db->dict, de, val); if (server.lazyfree_lazy_server_del) { @@ -244,6 +259,12 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) { dictFreeVal(db->dict, &auxentry); } +/* Replace an existing key with a new value, we just replace value and don't + * emit any events */ +void dbReplaceValue(redisDb *db, robj *key, robj *val) { + dbSetValue(db, key, val, 0); +} + /* High level Set operation. This function can be used in order to set * a key, whatever it was existing or not, to a new object. * @@ -268,7 +289,7 @@ void setKey(client *c, redisDb *db, robj *key, robj *val, int flags) { if (!keyfound) { dbAdd(db,key,val); } else { - dbOverwrite(db,key,val); + dbSetValue(db,key,val,1); } incrRefCount(val); if (!(flags & SETKEY_KEEPTTL)) removeExpire(db,key); @@ -315,23 +336,33 @@ robj *dbRandomKey(redisDb *db) { } /* Helper for sync and async delete. */ -static int dbGenericDelete(redisDb *db, robj *key, int async) { - /* Deleting an entry from the expires dict will not free the sds of - * the key, because it is shared with the main dictionary. */ - if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr); - dictEntry *de = dictUnlink(db->dict,key->ptr); +int dbGenericDelete(redisDb *db, robj *key, int async, int flags) { + dictEntry **plink; + int table; + dictEntry *de = dictTwoPhaseUnlinkFind(db->dict,key->ptr,&plink,&table); if (de) { robj *val = dictGetVal(de); + /* RM_StringDMA may call dbUnshareStringValue which may free val, so we + * need to incr to retain val */ + incrRefCount(val); /* Tells the module that the key has been unlinked from the database. */ - moduleNotifyKeyUnlink(key,val,db->id); + moduleNotifyKeyUnlink(key,val,db->id,flags); /* We want to try to unblock any module clients or clients using a blocking XREADGROUP */ signalDeletedKeyAsReady(db,key,val->type); + /* We should call decr before freeObjAsync. If not, the refcount may be + * greater than 1, so freeObjAsync doesn't work */ + decrRefCount(val); if (async) { - freeObjAsync(key, val, db->id); + /* Because of dbUnshareStringValue, the val in de may change. */ + freeObjAsync(key, dictGetVal(de), db->id); dictSetVal(db->dict, de, NULL); } if (server.cluster_enabled) slotToKeyDelEntry(de, db); - dictFreeUnlinkedEntry(db->dict,de); + + /* Deleting an entry from the expires dict will not free the sds of + * the key, because it is shared with the main dictionary. */ + if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr); + dictTwoPhaseUnlinkFree(db->dict,de,plink,table); return 1; } else { return 0; @@ -340,19 +371,19 @@ static int dbGenericDelete(redisDb *db, robj *key, int async) { /* Delete a key, value, and associated expiration entry if any, from the DB */ int dbSyncDelete(redisDb *db, robj *key) { - return dbGenericDelete(db, key, 0); + return dbGenericDelete(db, key, 0, DB_FLAG_KEY_DELETED); } /* Delete a key, value, and associated expiration entry if any, from the DB. If * the value consists of many allocations, it may be freed asynchronously. */ int dbAsyncDelete(redisDb *db, robj *key) { - return dbGenericDelete(db, key, 1); + return dbGenericDelete(db, key, 1, DB_FLAG_KEY_DELETED); } /* This is a wrapper whose behavior depends on the Redis lazy free * configuration. Deletes the key synchronously or asynchronously. */ int dbDelete(redisDb *db, robj *key) { - return dbGenericDelete(db, key, server.lazyfree_lazy_server_del); + return dbGenericDelete(db, key, server.lazyfree_lazy_server_del, DB_FLAG_KEY_DELETED); } /* Prepare the string object stored at 'key' to be modified destructively @@ -388,7 +419,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) { robj *decoded = getDecodedObject(o); o = createRawStringObject(decoded->ptr, sdslen(decoded->ptr)); decrRefCount(decoded); - dbOverwrite(db,key,o); + dbReplaceValue(db,key,o); } return o; } @@ -1561,10 +1592,7 @@ long long getExpire(redisDb *db, robj *key) { void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj) { mstime_t expire_latency; latencyStartMonitor(expire_latency); - if (server.lazyfree_lazy_expire) - dbAsyncDelete(db,keyobj); - else - dbSyncDelete(db,keyobj); + dbGenericDelete(db,keyobj,server.lazyfree_lazy_expire,DB_FLAG_KEY_EXPIRED); latencyEndMonitor(expire_latency); latencyAddSampleIfNeeded("expire-del",expire_latency); notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",keyobj,db->id); @@ -1657,6 +1685,7 @@ int keyIsExpired(redisDb *db, robj *key) { * The return value of the function is 0 if the key is still valid, * otherwise the function returns 1 if the key is expired. */ int expireIfNeeded(redisDb *db, robj *key, int flags) { + if (server.lazy_expire_disabled) return 0; if (!keyIsExpired(db,key)) return 0; /* If we are running in the context of a replica, instead of |