diff options
author | antirez <antirez@gmail.com> | 2018-06-15 13:14:57 +0200 |
---|---|---|
committer | antirez <antirez@gmail.com> | 2019-06-18 12:31:10 +0200 |
commit | e9bb30fd859ed4e9e3e6434207dedbc251086858 (patch) | |
tree | 0ac8972fefe6911dee8c7376e14c611fcc105e1c /src/db.c | |
parent | fd0ee469ab165d0e005e9fe1fca1c4f5c604cd56 (diff) | |
download | redis-new-keyspace.tar.gz |
Experimental: new keyspace and expire algorithm.new-keyspace
This is an alpha quality implementation of a new keyspace representation
and a new expire algorithm for Redis.
This work is described here:
https://gist.github.com/antirez/b2eb293819666ee104c7fcad71986eb7
Diffstat (limited to 'src/db.c')
-rw-r--r-- | src/db.c | 334 |
1 files changed, 122 insertions, 212 deletions
@@ -38,24 +38,26 @@ * C-level DB API *----------------------------------------------------------------------------*/ -int keyIsExpired(redisDb *db, robj *key); +int keyIsExpired(rkey *key); /* Update LFU when an object is accessed. * Firstly, decrement the counter if the decrement time is reached. * Then logarithmically increment the counter, and update the access time. */ -void updateLFU(robj *val) { - unsigned long counter = LFUDecrAndReturn(val); +void updateLFU(rkey *key) { + unsigned long counter = LFUDecrAndReturn(key); counter = LFULogIncr(counter); - val->lru = (LFUGetTimeInMinutes()<<8) | counter; + key->lru = (LFUGetTimeInMinutes()<<8) | counter; } /* Low level key lookup API, not actually called directly from commands * implementations that should instead rely on lookupKeyRead(), * lookupKeyWrite() and lookupKeyReadWithFlags(). */ -robj *lookupKey(redisDb *db, robj *key, int flags) { - dictEntry *de = dictFind(db->dict,key->ptr); +robj *lookupKey(redisDb *db, robj *keyname, rkey **keyptr, int flags) { + dictEntry *de = dictFind(db->dict,keyname->ptr); if (de) { + rkey *key = dictGetKey(de); robj *val = dictGetVal(de); + if (keyptr) *keyptr = key; /* Update the access time for the ageing algorithm. * Don't do it if we have a saving child, as this will trigger @@ -65,9 +67,9 @@ robj *lookupKey(redisDb *db, robj *key, int flags) { !(flags & LOOKUP_NOTOUCH)) { if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { - updateLFU(val); + updateLFU(key); } else { - val->lru = LRU_CLOCK(); + key->lru = LRU_CLOCK(); } } return val; @@ -98,16 +100,18 @@ robj *lookupKey(redisDb *db, robj *key, int flags) { * for read operations. Even if the key expiry is master-driven, we can * correctly report a key is expired on slaves even if the master is lagging * expiring our key via DELs in the replication link. */ -robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) { +robj *lookupKeyReadWithFlags(redisDb *db, robj *keyname, rkey **keyptr, int flags) { robj *val; + rkey *keyobj; - if (expireIfNeeded(db,key) == 1) { + val = lookupKey(db,keyname,&keyobj,flags); + if (val && expireIfNeeded(db,keyname,keyobj) == 1) { /* Key expired. If we are in the context of a master, expireIfNeeded() * returns 0 only when the key does not exist at all, so it's safe * to return NULL ASAP. */ if (server.masterhost == NULL) { server.stat_keyspace_misses++; - notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id); + notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", keyname, db->id); return NULL; } @@ -129,24 +133,24 @@ robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) { server.current_client->cmd->flags & CMD_READONLY) { server.stat_keyspace_misses++; - notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id); + notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", keyname, db->id); return NULL; } } - val = lookupKey(db,key,flags); if (val == NULL) { server.stat_keyspace_misses++; - notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id); + notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", keyname, db->id); } else server.stat_keyspace_hits++; + if (keyptr) *keyptr = keyobj; return val; } /* Like lookupKeyReadWithFlags(), but does not use any flag, which is the * common case. */ -robj *lookupKeyRead(redisDb *db, robj *key) { - return lookupKeyReadWithFlags(db,key,LOOKUP_NONE); +robj *lookupKeyRead(redisDb *db, robj *keyname, rkey **keyptr) { + return lookupKeyReadWithFlags(db,keyname,keyptr,LOOKUP_NONE); } /* Lookup a key for write operations, and as a side effect, if needed, expires @@ -154,36 +158,64 @@ robj *lookupKeyRead(redisDb *db, robj *key) { * * Returns the linked value object if the key exists or NULL if the key * does not exist in the specified DB. */ -robj *lookupKeyWrite(redisDb *db, robj *key) { - expireIfNeeded(db,key); - return lookupKey(db,key,LOOKUP_NONE); +robj *lookupKeyWrite(redisDb *db, robj *keyname, rkey **keyptr) { + rkey *key; + robj *val = lookupKey(db,keyname,&key,LOOKUP_NONE); + if (val && expireIfNeeded(db,keyname,key)) return NULL; + if (keyptr) *keyptr = key; + return val; } -robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) { - robj *o = lookupKeyRead(c->db, key); +robj *lookupKeyReadOrReply(client *c, robj *keyname, rkey **keyptr, robj *reply) { + robj *o = lookupKeyRead(c->db, keyname, keyptr); if (!o) addReply(c,reply); return o; } -robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) { - robj *o = lookupKeyWrite(c->db, key); +robj *lookupKeyWriteOrReply(client *c, robj *keyname, rkey **keyptr, robj *reply) { + robj *o = lookupKeyWrite(c->db, keyname, keyptr); if (!o) addReply(c,reply); return o; } +/* Allocate and return a key object. Key objects are used to represent + * keys in the main Redis dictionaries that associate key names to + * value objects. */ +rkey *createKey(const char *keyname, size_t keylen) { + rkey *k = zcalloc(sizeof(*k) + keylen + 1); + k->len = keylen; + memcpy(k->name,keyname,keylen); + k->name[keylen] = '\0'; + return k; +} + +void freeKey(rkey *key) { + zfree(key); +} + /* Add the key to the DB. It's up to the caller to increment the reference * counter of the value if needed. * * The program is aborted if the key already exists. */ -void dbAdd(redisDb *db, robj *key, robj *val) { - sds copy = sdsdup(key->ptr); - int retval = dictAdd(db->dict, copy, val); +rkey *dbAdd(redisDb *db, robj *key, robj *val) { + rkey *ko = createKey(key->ptr,sdslen(key->ptr)); + int retval = dictAdd(db->dict, ko, val); serverAssertWithInfo(NULL,key,retval == DICT_OK); if (val->type == OBJ_LIST || val->type == OBJ_ZSET) signalKeyAsReady(db, key); if (server.cluster_enabled) slotToKeyAdd(key); + + + /* Set the LRU to the current lruclock (minutes resolution), or + * alternatively the LFU counter. */ + if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { + ko->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL; + } else { + ko->lru = LRU_CLOCK(); + } + return ko; } /* Overwrite an existing key with a new value. Incrementing the reference @@ -191,15 +223,13 @@ void dbAdd(redisDb *db, robj *key, robj *val) { * This function does not modify the expire time of the existing key. * * The program is aborted if the key was not already present. */ -void dbOverwrite(redisDb *db, robj *key, robj *val) { - dictEntry *de = dictFind(db->dict,key->ptr); +rkey *dbOverwrite(redisDb *db, robj *keyname, robj *val) { + dictEntry *de = dictFind(db->dict,keyname->ptr); - serverAssertWithInfo(NULL,key,de != NULL); + serverAssertWithInfo(NULL,keyname,de != NULL); dictEntry auxentry = *de; robj *old = dictGetVal(de); - if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { - val->lru = old->lru; - } + rkey *key = dictGetKey(de); dictSetVal(db->dict, de, val); if (server.lazyfree_lazy_server_del) { @@ -208,6 +238,7 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) { } dictFreeVal(db->dict, &auxentry); + return key; } /* High level Set operation. This function can be used in order to set @@ -217,16 +248,20 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) { * 2) clients WATCHing for the destination key notified. * 3) The expire time of the key is reset (the key is made persistent). * - * All the new keys in the database should be created via this interface. */ -void setKey(redisDb *db, robj *key, robj *val) { - if (lookupKeyWrite(db,key) == NULL) { - dbAdd(db,key,val); + * All the new keys in the database should be created via this interface. + * The function returns a pointer to the key object representing the key + * in the database dictionary. */ +rkey *setKey(redisDb *db, robj *keyname, robj *val) { + rkey *key; + if (lookupKeyWrite(db,keyname,&key) == NULL) { + key = dbAdd(db,keyname,val); } else { - dbOverwrite(db,key,val); + key = dbOverwrite(db,keyname,val); } incrRefCount(val); removeExpire(db,key); - signalModifiedKey(db,key); + signalModifiedKey(db,keyname); + return key; } int dbExists(redisDb *db, robj *key) { @@ -240,18 +275,18 @@ int dbExists(redisDb *db, robj *key) { robj *dbRandomKey(redisDb *db) { dictEntry *de; int maxtries = 100; - int allvolatile = dictSize(db->dict) == dictSize(db->expires); + int allvolatile = dictSize(db->dict) == raxSize(db->expires); while(1) { - sds key; - robj *keyobj; + rkey *key; + robj *keyname; de = dictGetFairRandomKey(db->dict); if (de == NULL) return NULL; key = dictGetKey(de); - keyobj = createStringObject(key,sdslen(key)); - if (dictFind(db->expires,key)) { + keyname = createStringObject(key->name,key->len); + if (key->flags & KEY_FLAG_EXPIRE) { if (allvolatile && server.masterhost && --maxtries == 0) { /* If the DB is composed only of keys with an expire set, * it could happen that all the keys are already logically @@ -261,28 +296,27 @@ robj *dbRandomKey(redisDb *db) { * To prevent the infinite loop we do some tries, but if there * are the conditions for an infinite loop, eventually we * return a key name that may be already expired. */ - return keyobj; + return keyname; } - if (expireIfNeeded(db,keyobj)) { - decrRefCount(keyobj); + if (expireIfNeeded(db,keyname,key)) { + decrRefCount(keyname); continue; /* search for another key. This expired. */ } } - return keyobj; + return keyname; } } -/* Delete a key, value, and associated expiration entry if any, from the DB */ -int dbSyncDelete(redisDb *db, robj *key) { - /* Deleting an entry from the expires dict will not free the sds of - * the key, because it is shared with the main dictionary. */ - if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr); - if (dictDelete(db->dict,key->ptr) == DICT_OK) { - if (server.cluster_enabled) slotToKeyDel(key); - return 1; - } else { - return 0; - } +/* Delete a key, value, and associated expiration entry if any, from the DB. + * The function returns 1 is they existed and was removed, 0 otherwise. */ +int dbSyncDelete(redisDb *db, robj *keyname) { + dictEntry *de = dictUnlink(db->dict,keyname->ptr); + if (de == NULL) return 0; + rkey *key = dictGetKey(de); + if (key->flags & KEY_FLAG_EXPIRE) removeExpireFromTree(db,key); + if (server.cluster_enabled) slotToKeyDel(keyname); + dictFreeUnlinkedEntry(db->dict,de); + return 1; } /* This is a wrapper whose behavior depends on the Redis lazy free @@ -312,7 +346,7 @@ int dbDelete(redisDb *db, robj *key) { * The object 'o' is what the caller already obtained by looking up 'key' * in 'db', the usage pattern looks like this: * - * o = lookupKeyWrite(db,key); + * o = lookupKeyWrite(db,key,NULL); * if (checkType(c,o,OBJ_STRING)) return; * o = dbUnshareStringValue(db,key,o); * @@ -367,7 +401,7 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) { emptyDbAsync(&server.db[j]); } else { dictEmpty(server.db[j].dict,callback); - dictEmpty(server.db[j].expires,callback); + raxFreeWithCallback(server.db[j].expires,callback); } } if (server.cluster_enabled) { @@ -471,7 +505,7 @@ void delGenericCommand(client *c, int lazy) { int numdel = 0, j; for (j = 1; j < c->argc; j++) { - expireIfNeeded(c->db,c->argv[j]); + expireIfNeededByName(c->db,c->argv[j]); int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) : dbSyncDelete(c->db,c->argv[j]); if (deleted) { @@ -500,7 +534,7 @@ void existsCommand(client *c) { int j; for (j = 1; j < c->argc; j++) { - if (lookupKeyRead(c->db,c->argv[j])) count++; + if (lookupKeyRead(c->db,c->argv[j],NULL)) count++; } addReplyLongLong(c,count); } @@ -546,16 +580,13 @@ void keysCommand(client *c) { di = dictGetSafeIterator(c->db->dict); allkeys = (pattern[0] == '*' && pattern[1] == '\0'); while((de = dictNext(di)) != NULL) { - sds key = dictGetKey(de); - robj *keyobj; + rkey *key = dictGetKey(de); - if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) { - keyobj = createStringObject(key,sdslen(key)); - if (!keyIsExpired(c->db,keyobj)) { - addReplyBulk(c,keyobj); + if (allkeys || stringmatchlen(pattern,plen,key->name,key->len,0)) { + if (!keyIsExpired(key)) { + addReplyBulkCBuffer(c,key->name,key->len); numkeys++; } - decrRefCount(keyobj); } } dictReleaseIterator(di); @@ -571,8 +602,8 @@ void scanCallback(void *privdata, const dictEntry *de) { robj *key, *val = NULL; if (o == NULL) { - sds sdskey = dictGetKey(de); - key = createStringObject(sdskey, sdslen(sdskey)); + rkey *ko = dictGetKey(de); + key = createStringObject(ko->name, ko->len); } else if (o->type == OBJ_SET) { sds keysds = dictGetKey(de); key = createStringObject(keysds,sdslen(keysds)); @@ -760,7 +791,12 @@ void scanGenericCommand(client *c, robj *o, unsigned long cursor) { } /* Filter element if it is an expired key. */ - if (!filter && o == NULL && expireIfNeeded(c->db, kobj)) filter = 1; + if (!filter && + o == NULL && + expireIfNeededByName(c->db,kobj)) + { + filter = 1; + } /* Remove the element and its associted value if needed. */ if (filter) { @@ -819,7 +855,7 @@ void typeCommand(client *c) { robj *o; char *type; - o = lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH); + o = lookupKeyReadWithFlags(c->db,c->argv[1],NULL,LOOKUP_NOTOUCH); if (o == NULL) { type = "none"; } else { @@ -870,6 +906,7 @@ void shutdownCommand(client *c) { void renameGenericCommand(client *c, int nx) { robj *o; + rkey *key; long long expire; int samekey = 0; @@ -877,7 +914,7 @@ void renameGenericCommand(client *c, int nx) { * if the key exists, however we still return an error on unexisting key. */ if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) samekey = 1; - if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL) + if ((o = lookupKeyWriteOrReply(c,c->argv[1],&key,shared.nokeyerr)) == NULL) return; if (samekey) { @@ -886,8 +923,8 @@ void renameGenericCommand(client *c, int nx) { } incrRefCount(o); - expire = getExpire(c->db,c->argv[1]); - if (lookupKeyWrite(c->db,c->argv[2]) != NULL) { + expire = getExpire(key); + if (lookupKeyWrite(c->db,c->argv[2],NULL) != NULL) { if (nx) { decrRefCount(o); addReply(c,shared.czero); @@ -897,8 +934,8 @@ void renameGenericCommand(client *c, int nx) { * with the same name. */ dbDelete(c->db,c->argv[2]); } - dbAdd(c->db,c->argv[2],o); - if (expire != -1) setExpire(c,c->db,c->argv[2],expire); + key = dbAdd(c->db,c->argv[2],o); + if (expire != -1) setExpire(c,c->db,key,expire); dbDelete(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[2]); @@ -951,20 +988,21 @@ void moveCommand(client *c) { } /* Check if the element exists and get a reference */ - o = lookupKeyWrite(c->db,c->argv[1]); + rkey *key; + o = lookupKeyWrite(c->db,c->argv[1],&key); if (!o) { addReply(c,shared.czero); return; } - expire = getExpire(c->db,c->argv[1]); + expire = getExpire(key); /* Return zero if the key already exists in the target DB */ - if (lookupKeyWrite(dst,c->argv[1]) != NULL) { + if (lookupKeyWrite(dst,c->argv[1],NULL) != NULL) { addReply(c,shared.czero); return; } - dbAdd(dst,c->argv[1],o); - if (expire != -1) setExpire(c,dst,c->argv[1],expire); + key = dbAdd(dst,c->argv[1],o); + if (expire != -1) setExpire(c,dst,key,expire); incrRefCount(o); /* OK! key moved, free the entry in the source DB */ @@ -982,7 +1020,7 @@ void scanDatabaseForReadyLists(redisDb *db) { dictIterator *di = dictGetSafeIterator(db->blocking_keys); while((de = dictNext(di)) != NULL) { robj *key = dictGetKey(de); - robj *value = lookupKey(db,key,LOOKUP_NOTOUCH); + robj *value = lookupKey(db,key,NULL,LOOKUP_NOTOUCH); if (value && (value->type == OBJ_LIST || value->type == OBJ_STREAM || value->type == OBJ_ZSET)) @@ -1060,134 +1098,6 @@ void swapdbCommand(client *c) { } } -/*----------------------------------------------------------------------------- - * Expires API - *----------------------------------------------------------------------------*/ - -int removeExpire(redisDb *db, robj *key) { - /* An expire may only be removed if there is a corresponding entry in the - * main dict. Otherwise, the key will never be freed. */ - serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL); - return dictDelete(db->expires,key->ptr) == DICT_OK; -} - -/* Set an expire to the specified key. If the expire is set in the context - * of an user calling a command 'c' is the client, otherwise 'c' is set - * to NULL. The 'when' parameter is the absolute unix time in milliseconds - * after which the key will no longer be considered valid. */ -void setExpire(client *c, redisDb *db, robj *key, long long when) { - dictEntry *kde, *de; - - /* Reuse the sds from the main dict in the expire dict */ - kde = dictFind(db->dict,key->ptr); - serverAssertWithInfo(NULL,key,kde != NULL); - de = dictAddOrFind(db->expires,dictGetKey(kde)); - dictSetSignedIntegerVal(de,when); - - int writable_slave = server.masterhost && server.repl_slave_ro == 0; - if (c && writable_slave && !(c->flags & CLIENT_MASTER)) - rememberSlaveKeyWithExpire(db,key); -} - -/* Return the expire time of the specified key, or -1 if no expire - * is associated with this key (i.e. the key is non volatile) */ -long long getExpire(redisDb *db, robj *key) { - dictEntry *de; - - /* No expire? return ASAP */ - if (dictSize(db->expires) == 0 || - (de = dictFind(db->expires,key->ptr)) == NULL) return -1; - - /* The entry was found in the expire dict, this means it should also - * be present in the main dict (safety check). */ - serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL); - return dictGetSignedIntegerVal(de); -} - -/* Propagate expires into slaves and the AOF file. - * When a key expires in the master, a DEL operation for this key is sent - * to all the slaves and the AOF file if enabled. - * - * This way the key expiry is centralized in one place, and since both - * AOF and the master->slave link guarantee operation ordering, everything - * will be consistent even if we allow write operations against expiring - * keys. */ -void propagateExpire(redisDb *db, robj *key, int lazy) { - robj *argv[2]; - - argv[0] = lazy ? shared.unlink : shared.del; - argv[1] = key; - incrRefCount(argv[0]); - incrRefCount(argv[1]); - - if (server.aof_state != AOF_OFF) - feedAppendOnlyFile(server.delCommand,db->id,argv,2); - replicationFeedSlaves(server.slaves,db->id,argv,2); - - decrRefCount(argv[0]); - decrRefCount(argv[1]); -} - -/* Check if the key is expired. */ -int keyIsExpired(redisDb *db, robj *key) { - mstime_t when = getExpire(db,key); - - if (when < 0) return 0; /* No expire for this key */ - - /* Don't expire anything while loading. It will be done later. */ - if (server.loading) return 0; - - /* If we are in the context of a Lua script, we pretend that time is - * blocked to when the Lua script started. This way a key can expire - * only the first time it is accessed and not in the middle of the - * script execution, making propagation to slaves / AOF consistent. - * See issue #1525 on Github for more information. */ - mstime_t now = server.lua_caller ? server.lua_time_start : mstime(); - - return now > when; -} - -/* This function is called when we are going to perform some operation - * in a given key, but such key may be already logically expired even if - * it still exists in the database. The main way this function is called - * is via lookupKey*() family of functions. - * - * The behavior of the function depends on the replication role of the - * instance, because slave instances do not expire keys, they wait - * for DELs from the master for consistency matters. However even - * slaves will try to have a coherent return value for the function, - * so that read commands executed in the slave side will be able to - * behave like if the key is expired even if still present (because the - * master has yet to propagate the DEL). - * - * In masters as a side effect of finding a key which is expired, such - * key will be evicted from the database. Also this may trigger the - * propagation of a DEL/UNLINK command in AOF / replication stream. - * - * The return value of the function is 0 if the key is still valid, - * otherwise the function returns 1 if the key is expired. */ -int expireIfNeeded(redisDb *db, robj *key) { - if (!keyIsExpired(db,key)) return 0; - - /* If we are running in the context of a slave, instead of - * evicting the expired key from the database, we return ASAP: - * the slave key expiration is controlled by the master that will - * send us synthesized DEL operations for expired keys. - * - * Still we try to return the right information to the caller, - * that is, 0 if we think the key should be still valid, 1 if - * we think the key is expired at this time. */ - if (server.masterhost != NULL) return 1; - - /* Delete the key */ - server.stat_expiredkeys++; - propagateExpire(db,key,server.lazyfree_lazy_expire); - notifyKeyspaceEvent(NOTIFY_EXPIRED, - "expired",key,db->id); - return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) : - dbSyncDelete(db,key); -} - /* ----------------------------------------------------------------------------- * API to get key arguments from commands * ---------------------------------------------------------------------------*/ |