From e9bb30fd859ed4e9e3e6434207dedbc251086858 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 15 Jun 2018 13:14:57 +0200 Subject: Experimental: new keyspace and expire algorithm. This is an alpha quality implementation of a new keyspace representation and a new expire algorithm for Redis. This work is described here: https://gist.github.com/antirez/b2eb293819666ee104c7fcad71986eb7 --- src/aof.c | 69 ++++---- src/bio.c | 2 +- src/bitops.c | 12 +- src/blocked.c | 2 +- src/cluster.c | 59 ++++--- src/config.c | 12 +- src/db.c | 334 ++++++++++++++----------------------- src/debug.c | 34 ++-- src/defrag.c | 2 +- src/dict.c | 37 +++-- src/dict.h | 43 ++++- src/evict.c | 39 +++-- src/expire.c | 452 +++++++++++++++++++++++++++++++++++--------------- src/geo.c | 10 +- src/gopher.c | 2 +- src/hyperloglog.c | 12 +- src/latency.c | 6 +- src/lazyfree.c | 30 ++-- src/module.c | 44 ++--- src/object.c | 53 +++--- src/rdb.c | 39 ++--- src/rdb.h | 5 +- src/redis-benchmark.c | 3 +- src/redis-cli.c | 12 +- src/sentinel.c | 12 +- src/server.c | 155 +++++++++++------ src/server.h | 71 +++++--- src/sort.c | 8 +- src/t_hash.c | 24 +-- src/t_list.c | 36 ++-- src/t_set.c | 32 ++-- src/t_stream.c | 28 ++-- src/t_string.c | 34 ++-- src/t_zset.c | 43 ++--- 34 files changed, 998 insertions(+), 758 deletions(-) diff --git a/src/aof.c b/src/aof.c index 4744847d2..b9ade76a4 100644 --- a/src/aof.c +++ b/src/aof.c @@ -923,7 +923,7 @@ int rioWriteBulkObject(rio *r, robj *obj) { /* Emit the commands needed to rebuild a list object. * The function returns 0 on error, 1 on success. 
*/ -int rewriteListObject(rio *r, robj *key, robj *o) { +int rewriteListObject(rio *r, rkey *key, robj *o) { long long count = 0, items = listTypeLength(o); if (o->encoding == OBJ_ENCODING_QUICKLIST) { @@ -937,7 +937,7 @@ int rewriteListObject(rio *r, robj *key, robj *o) { AOF_REWRITE_ITEMS_PER_CMD : items; if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0; if (rioWriteBulkString(r,"RPUSH",5) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkString(r,key->name,key->len) == 0) return 0; } if (entry.value) { @@ -957,7 +957,7 @@ int rewriteListObject(rio *r, robj *key, robj *o) { /* Emit the commands needed to rebuild a set object. * The function returns 0 on error, 1 on success. */ -int rewriteSetObject(rio *r, robj *key, robj *o) { +int rewriteSetObject(rio *r, rkey *key, robj *o) { long long count = 0, items = setTypeSize(o); if (o->encoding == OBJ_ENCODING_INTSET) { @@ -971,7 +971,7 @@ int rewriteSetObject(rio *r, robj *key, robj *o) { if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0; if (rioWriteBulkString(r,"SADD",4) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkString(r,key->name,key->len) == 0) return 0; } if (rioWriteBulkLongLong(r,llval) == 0) return 0; if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0; @@ -989,7 +989,7 @@ int rewriteSetObject(rio *r, robj *key, robj *o) { if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0; if (rioWriteBulkString(r,"SADD",4) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkString(r,key->name,key->len) == 0) return 0; } if (rioWriteBulkString(r,ele,sdslen(ele)) == 0) return 0; if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0; @@ -1004,7 +1004,7 @@ int rewriteSetObject(rio *r, robj *key, robj *o) { /* Emit the commands needed to rebuild a sorted set object. * The function returns 0 on error, 1 on success. 
*/ -int rewriteSortedSetObject(rio *r, robj *key, robj *o) { +int rewriteSortedSetObject(rio *r, rkey *key, robj *o) { long long count = 0, items = zsetLength(o); if (o->encoding == OBJ_ENCODING_ZIPLIST) { @@ -1030,7 +1030,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) { if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0; if (rioWriteBulkString(r,"ZADD",4) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkString(r,key->name,key->len) == 0) return 0; } if (rioWriteBulkDouble(r,score) == 0) return 0; if (vstr != NULL) { @@ -1057,7 +1057,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) { if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0; if (rioWriteBulkString(r,"ZADD",4) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkString(r,key->name,key->len) == 0) return 0; } if (rioWriteBulkDouble(r,*score) == 0) return 0; if (rioWriteBulkString(r,ele,sdslen(ele)) == 0) return 0; @@ -1099,7 +1099,7 @@ static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) { /* Emit the commands needed to rebuild a hash object. * The function returns 0 on error, 1 on success. */ -int rewriteHashObject(rio *r, robj *key, robj *o) { +int rewriteHashObject(rio *r, rkey *key, robj *o) { hashTypeIterator *hi; long long count = 0, items = hashTypeLength(o); @@ -1111,7 +1111,7 @@ int rewriteHashObject(rio *r, robj *key, robj *o) { if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0; if (rioWriteBulkString(r,"HMSET",5) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkString(r,key->name,key->len) == 0) return 0; } if (rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY) == 0) return 0; @@ -1140,14 +1140,14 @@ int rioWriteBulkStreamID(rio *r,streamID *id) { * add the message described by 'nack' having the id 'rawid', into the pending * list of the specified consumer. All this in the context of the specified * key and group. 
*/ -int rioWriteStreamPendingEntry(rio *r, robj *key, const char *groupname, size_t groupname_len, streamConsumer *consumer, unsigned char *rawid, streamNACK *nack) { +int rioWriteStreamPendingEntry(rio *r, rkey *key, const char *groupname, size_t groupname_len, streamConsumer *consumer, unsigned char *rawid, streamNACK *nack) { /* XCLAIM 0 TIME RETRYCOUNT JUSTID FORCE. */ streamID id; streamDecodeID(rawid,&id); if (rioWriteBulkCount(r,'*',12) == 0) return 0; if (rioWriteBulkString(r,"XCLAIM",6) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkString(r,key->name,key->len) == 0) return 0; if (rioWriteBulkString(r,groupname,groupname_len) == 0) return 0; if (rioWriteBulkString(r,consumer->name,sdslen(consumer->name)) == 0) return 0; if (rioWriteBulkString(r,"0",1) == 0) return 0; @@ -1163,7 +1163,7 @@ int rioWriteStreamPendingEntry(rio *r, robj *key, const char *groupname, size_t /* Emit the commands needed to rebuild a stream object. * The function returns 0 on error, 1 on success. */ -int rewriteStreamObject(rio *r, robj *key, robj *o) { +int rewriteStreamObject(rio *r, rkey *key, robj *o) { stream *s = o->ptr; streamIterator si; streamIteratorStart(&si,s,NULL,NULL,0); @@ -1179,7 +1179,7 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { /* Emit the XADD ...fields... command. */ if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0; if (rioWriteBulkString(r,"XADD",4) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkString(r,key->name,key->len) == 0) return 0; if (rioWriteBulkStreamID(r,&id) == 0) return 0; while(numfields--) { unsigned char *field, *value; @@ -1195,7 +1195,7 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { * for the Stream type. 
*/ if (rioWriteBulkCount(r,'*',7) == 0) return 0; if (rioWriteBulkString(r,"XADD",4) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkString(r,key->name,key->len) == 0) return 0; if (rioWriteBulkString(r,"MAXLEN",6) == 0) return 0; if (rioWriteBulkString(r,"0",1) == 0) return 0; if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0; @@ -1207,7 +1207,7 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { * in case of XDEL lastid. */ if (rioWriteBulkCount(r,'*',3) == 0) return 0; if (rioWriteBulkString(r,"XSETID",6) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkString(r,key->name,key->len) == 0) return 0; if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0; @@ -1222,7 +1222,7 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { if (rioWriteBulkCount(r,'*',5) == 0) return 0; if (rioWriteBulkString(r,"XGROUP",6) == 0) return 0; if (rioWriteBulkString(r,"CREATE",6) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkString(r,key->name,key->len) == 0) return 0; if (rioWriteBulkString(r,(char*)ri.key,ri.key_len) == 0) return 0; if (rioWriteBulkStreamID(r,&group->last_id) == 0) return 0; @@ -1262,12 +1262,14 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { /* Call the module type callback in order to rewrite a data type * that is exported by a module and is not handled by Redis itself. * The function returns 0 on error, 1 on success. 
*/ -int rewriteModuleObject(rio *r, robj *key, robj *o) { +int rewriteModuleObject(rio *r, rkey *key, robj *o) { RedisModuleIO io; moduleValue *mv = o->ptr; moduleType *mt = mv->type; - moduleInitIOContext(io,mt,r,key); - mt->aof_rewrite(&io,key,mv->value); + robj *keyname = createStringObject(key->name,key->len); + moduleInitIOContext(io,mt,r,keyname); + mt->aof_rewrite(&io,keyname,mv->value); + decrRefCount(keyname); if (io.ctx) { moduleFreeContext(io.ctx); zfree(io.ctx); @@ -1309,15 +1311,14 @@ int rewriteAppendOnlyFileRio(rio *aof) { /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { - sds keystr; - robj key, *o; + rkey *key; + robj *o; long long expiretime; - keystr = dictGetKey(de); + key = dictGetKey(de); o = dictGetVal(de); - initStaticStringObject(key,keystr); - expiretime = getExpire(db,&key); + expiretime = getExpire(key); /* Save the key and associated value */ if (o->type == OBJ_STRING) { @@ -1325,20 +1326,21 @@ int rewriteAppendOnlyFileRio(rio *aof) { char cmd[]="*3\r\n$3\r\nSET\r\n"; if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; /* Key and value */ - if (rioWriteBulkObject(aof,&key) == 0) goto werr; + if (rioWriteBulkString(aof,key->name,key->len) == 0) + goto werr; if (rioWriteBulkObject(aof,o) == 0) goto werr; } else if (o->type == OBJ_LIST) { - if (rewriteListObject(aof,&key,o) == 0) goto werr; + if (rewriteListObject(aof,key,o) == 0) goto werr; } else if (o->type == OBJ_SET) { - if (rewriteSetObject(aof,&key,o) == 0) goto werr; + if (rewriteSetObject(aof,key,o) == 0) goto werr; } else if (o->type == OBJ_ZSET) { - if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr; + if (rewriteSortedSetObject(aof,key,o) == 0) goto werr; } else if (o->type == OBJ_HASH) { - if (rewriteHashObject(aof,&key,o) == 0) goto werr; + if (rewriteHashObject(aof,key,o) == 0) goto werr; } else if (o->type == OBJ_STREAM) { - if (rewriteStreamObject(aof,&key,o) == 0) goto werr; + if (rewriteStreamObject(aof,key,o) == 0) goto werr; } else if 
(o->type == OBJ_MODULE) { - if (rewriteModuleObject(aof,&key,o) == 0) goto werr; + if (rewriteModuleObject(aof,key,o) == 0) goto werr; } else { serverPanic("Unknown object type"); } @@ -1346,7 +1348,8 @@ int rewriteAppendOnlyFileRio(rio *aof) { if (expiretime != -1) { char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n"; if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; - if (rioWriteBulkObject(aof,&key) == 0) goto werr; + if (rioWriteBulkString(aof,key->name,key->len) == 0) + goto werr; if (rioWriteBulkLongLong(aof,expiretime) == 0) goto werr; } /* Read some diff from the parent process from time to time. */ diff --git a/src/bio.c b/src/bio.c index 2af684570..749727e79 100644 --- a/src/bio.c +++ b/src/bio.c @@ -85,7 +85,7 @@ struct bio_job { void *bioProcessBackgroundJobs(void *arg); void lazyfreeFreeObjectFromBioThread(robj *o); -void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2); +void lazyfreeFreeDatabaseFromBioThread(dict *ht, rax *tree); void lazyfreeFreeSlotsMapFromBioThread(zskiplist *sl); /* Make sure we have enough stack to perform all the things we do in the diff --git a/src/bitops.c b/src/bitops.c index ee1ce0460..a51538516 100644 --- a/src/bitops.c +++ b/src/bitops.c @@ -477,7 +477,7 @@ int getBitfieldTypeFromArgument(client *c, robj *o, int *sign, int *bits) { * an error is sent to the client. 
*/ robj *lookupStringForBitCommand(client *c, size_t maxbit) { size_t byte = maxbit >> 3; - robj *o = lookupKeyWrite(c->db,c->argv[1]); + robj *o = lookupKeyWrite(c->db,c->argv[1],NULL); if (o == NULL) { o = createObject(OBJ_STRING,sdsnewlen(NULL, byte+1)); @@ -571,7 +571,7 @@ void getbitCommand(client *c) { if (getBitOffsetFromArgument(c,c->argv[2],&bitoffset,0,0) != C_OK) return; - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || + if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.czero)) == NULL || checkType(c,o,OBJ_STRING)) return; byte = bitoffset >> 3; @@ -625,7 +625,7 @@ void bitopCommand(client *c) { len = zmalloc(sizeof(long) * numkeys); objects = zmalloc(sizeof(robj*) * numkeys); for (j = 0; j < numkeys; j++) { - o = lookupKeyRead(c->db,c->argv[j+3]); + o = lookupKeyRead(c->db,c->argv[j+3],NULL); /* Handle non-existing keys as empty strings. */ if (o == NULL) { objects[j] = NULL; @@ -773,7 +773,7 @@ void bitcountCommand(client *c) { char llbuf[LONG_STR_SIZE]; /* Lookup, check for type, and return 0 for non existing keys. */ - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || + if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.czero)) == NULL || checkType(c,o,OBJ_STRING)) return; p = getObjectReadOnlyString(o,&strlen,llbuf); @@ -834,7 +834,7 @@ void bitposCommand(client *c) { /* If the key does not exist, from our point of view it is an infinite * array of 0 bits. If the user is looking for the fist clear bit return 0, * If the user is looking for the first set bit, return -1. */ - if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) { + if ((o = lookupKeyRead(c->db,c->argv[1],NULL)) == NULL) { addReplyLongLong(c, bit ? -1 : 0); return; } @@ -993,7 +993,7 @@ void bitfieldCommand(client *c) { if (readonly) { /* Lookup for read is ok if key doesn't exit, but errors * if it's not a string. 
*/ - o = lookupKeyRead(c->db,c->argv[1]); + o = lookupKeyRead(c->db,c->argv[1],NULL); if (o != NULL && checkType(c,o,OBJ_STRING)) { zfree(ops); return; diff --git a/src/blocked.c b/src/blocked.c index 1db657869..c4162e1e7 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -270,7 +270,7 @@ void handleClientsBlockedOnKeys(void) { dictDelete(rl->db->ready_keys,rl->key); /* Serve clients blocked on list key. */ - robj *o = lookupKeyWrite(rl->db,rl->key); + robj *o = lookupKeyWrite(rl->db,rl->key,NULL); if (o != NULL && o->type == OBJ_LIST) { dictEntry *de; diff --git a/src/cluster.c b/src/cluster.c index c85e3791d..505c3c757 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -4776,7 +4776,7 @@ NULL /* Generates a DUMP-format representation of the object 'o', adding it to the * io stream pointed by 'rio'. This function can't fail. */ -void createDumpPayload(rio *payload, robj *o, robj *key) { +void createDumpPayload(rio *payload, robj *o, rkey *key) { unsigned char buf[2]; uint64_t crc; @@ -4833,16 +4833,17 @@ int verifyDumpPayload(unsigned char *p, size_t len) { * complement of RESTORE and can be useful for different applications. */ void dumpCommand(client *c) { robj *o, *dumpobj; + rkey *key; rio payload; /* Check if the key is here. */ - if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) { + if ((o = lookupKeyRead(c->db,c->argv[1],&key)) == NULL) { addReplyNull(c); return; } /* Create the DUMP encoded representation. */ - createDumpPayload(&payload,o,c->argv[1]); + createDumpPayload(&payload,o,key); /* Transfer to the client */ dumpobj = createObject(OBJ_STRING,payload.io.buffer.ptr); @@ -4893,7 +4894,7 @@ void restoreCommand(client *c) { } /* Make sure this key does not already exist here... 
*/ - if (!replace && lookupKeyWrite(c->db,c->argv[1]) != NULL) { + if (!replace && lookupKeyWrite(c->db,c->argv[1],NULL) != NULL) { addReply(c,shared.busykeyerr); return; } @@ -4925,12 +4926,12 @@ void restoreCommand(client *c) { if (replace) dbDelete(c->db,c->argv[1]); /* Create the key and set the TTL if any */ - dbAdd(c->db,c->argv[1],obj); + rkey *key = dbAdd(c->db,c->argv[1],obj); if (ttl) { if (!absttl) ttl+=mstime(); - setExpire(c,c->db,c->argv[1],ttl); + setExpire(c,c->db,key,ttl); } - objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock); + objectSetLRUOrLFU(key,lfu_freq,lru_idle,lru_clock); signalModifiedKey(c->db,c->argv[1]); addReply(c,shared.ok); server.dirty++; @@ -5066,7 +5067,8 @@ void migrateCommand(client *c) { long timeout; long dbid; robj **ov = NULL; /* Objects to migrate. */ - robj **kv = NULL; /* Key names. */ + robj **knv = NULL; /* Key names. */ + rkey **kov = NULL; /* Key objects. */ robj **newargv = NULL; /* Used to rewrite the command as DEL ... keys ... */ rio cmd, payload; int may_retry = 1; @@ -5121,18 +5123,21 @@ void migrateCommand(client *c) { * this case, since often this is due to a normal condition like the key * expiring in the meantime. 
*/ ov = zrealloc(ov,sizeof(robj*)*num_keys); - kv = zrealloc(kv,sizeof(robj*)*num_keys); + knv = zrealloc(knv,sizeof(robj*)*num_keys); + kov = zrealloc(kov,sizeof(rkey*)*num_keys); int oi = 0; for (j = 0; j < num_keys; j++) { - if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) { - kv[oi] = c->argv[first_key+j]; + rkey *ko; + if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j],&ko)) != NULL) { + knv[oi] = c->argv[first_key+j]; + kov[oi] = ko; oi++; } } num_keys = oi; if (num_keys == 0) { - zfree(ov); zfree(kv); + zfree(ov); zfree(knv); zfree(kov); addReplySds(c,sdsnew("+NOKEY\r\n")); return; } @@ -5143,7 +5148,7 @@ try_again: /* Connect */ cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout); if (cs == NULL) { - zfree(ov); zfree(kv); + zfree(ov); zfree(knv); zfree(kov); return; /* error sent to the client by migrateGetSocket() */ } @@ -5173,7 +5178,7 @@ try_again: /* Create RESTORE payload and generate the protocol to call the command. */ for (j = 0; j < num_keys; j++) { long long ttl = 0; - long long expireat = getExpire(c->db,kv[j]); + long long expireat = getExpire(kov[j]); if (expireat != -1) { ttl = expireat-mstime(); @@ -5186,7 +5191,9 @@ try_again: /* Relocate valid (non expired) keys into the array in successive * positions to remove holes created by the keys that were present * in the first lookup but are now expired after the second lookup. */ - kv[non_expired++] = kv[j]; + knv[non_expired] = knv[j]; + kov[non_expired] = kov[j]; + non_expired++; serverAssertWithInfo(c,NULL, rioWriteBulkCount(&cmd,'*',replace ? 
5 : 4)); @@ -5196,14 +5203,14 @@ try_again: rioWriteBulkString(&cmd,"RESTORE-ASKING",14)); else serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7)); - serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j])); - serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr, - sdslen(kv[j]->ptr))); + serverAssertWithInfo(c,NULL,sdsEncodedObject(knv[j])); + serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,knv[j]->ptr, + sdslen(knv[j]->ptr))); serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl)); /* Emit the payload argument, that is the serialized object using * the DUMP format. */ - createDumpPayload(&payload,ov[j],kv[j]); + createDumpPayload(&payload,ov[j],kov[j]); serverAssertWithInfo(c,NULL, rioWriteBulkString(&cmd,payload.io.buffer.ptr, sdslen(payload.io.buffer.ptr))); @@ -5283,13 +5290,13 @@ try_again: } else { if (!copy) { /* No COPY option: remove the local key, signal the change. */ - dbDelete(c->db,kv[j]); - signalModifiedKey(c->db,kv[j]); + dbDelete(c->db,knv[j]); + signalModifiedKey(c->db,knv[j]); server.dirty++; /* Populate the argument vector to replace the old one. */ - newargv[del_idx++] = kv[j]; - incrRefCount(kv[j]); + newargv[del_idx++] = knv[j]; + incrRefCount(knv[j]); } } } @@ -5347,7 +5354,7 @@ try_again: } sdsfree(cmd.io.buffer.ptr); - zfree(ov); zfree(kv); zfree(newargv); + zfree(ov); zfree(knv); zfree(kov); zfree(newargv); return; /* On socket errors we try to close the cached socket and try again. @@ -5374,7 +5381,7 @@ socket_err: } /* Cleanup we want to do if no retry is attempted. */ - zfree(ov); zfree(kv); + zfree(ov); zfree(knv); zfree(kov); addReplySds(c, sdscatprintf(sdsempty(), "-IOERR error or timeout %s to target instance\r\n", @@ -5554,7 +5561,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in /* Migarting / Improrting slot? Count keys we don't have. 
*/ if ((migrating_slot || importing_slot) && - lookupKeyRead(&server.db[0],thiskey) == NULL) + lookupKeyRead(&server.db[0],thiskey,NULL) == NULL) { missing_keys++; } diff --git a/src/config.c b/src/config.c index 7f0e9af89..9e509b659 100644 --- a/src/config.c +++ b/src/config.c @@ -1650,19 +1650,23 @@ void dictListDestructor(void *privdata, void *val); void rewriteConfigSentinelOption(struct rewriteConfigState *state); dictType optionToLineDictType = { - dictSdsCaseHash, /* hash function */ + dictSdsCaseHash, /* lookup hash function */ + dictSdsCaseHash, /* store hash function */ NULL, /* key dup */ NULL, /* val dup */ - dictSdsKeyCaseCompare, /* key compare */ + dictSdsKeyCaseCompare, /* lookup key compare */ + dictSdsKeyCaseCompare, /* stored key compare */ dictSdsDestructor, /* key destructor */ dictListDestructor /* val destructor */ }; dictType optionSetDictType = { - dictSdsCaseHash, /* hash function */ + dictSdsCaseHash, /* lookup hash function */ + dictSdsCaseHash, /* store hash function */ NULL, /* key dup */ NULL, /* val dup */ - dictSdsKeyCaseCompare, /* key compare */ + dictSdsKeyCaseCompare, /* lookup key compare */ + dictSdsKeyCaseCompare, /* stored key compare */ dictSdsDestructor, /* key destructor */ NULL /* val destructor */ }; diff --git a/src/db.c b/src/db.c index b537a29a4..101eb1b8a 100644 --- a/src/db.c +++ b/src/db.c @@ -38,24 +38,26 @@ * C-level DB API *----------------------------------------------------------------------------*/ -int keyIsExpired(redisDb *db, robj *key); +int keyIsExpired(rkey *key); /* Update LFU when an object is accessed. * Firstly, decrement the counter if the decrement time is reached. * Then logarithmically increment the counter, and update the access time. 
*/ -void updateLFU(robj *val) { - unsigned long counter = LFUDecrAndReturn(val); +void updateLFU(rkey *key) { + unsigned long counter = LFUDecrAndReturn(key); counter = LFULogIncr(counter); - val->lru = (LFUGetTimeInMinutes()<<8) | counter; + key->lru = (LFUGetTimeInMinutes()<<8) | counter; } /* Low level key lookup API, not actually called directly from commands * implementations that should instead rely on lookupKeyRead(), * lookupKeyWrite() and lookupKeyReadWithFlags(). */ -robj *lookupKey(redisDb *db, robj *key, int flags) { - dictEntry *de = dictFind(db->dict,key->ptr); +robj *lookupKey(redisDb *db, robj *keyname, rkey **keyptr, int flags) { + dictEntry *de = dictFind(db->dict,keyname->ptr); if (de) { + rkey *key = dictGetKey(de); robj *val = dictGetVal(de); + if (keyptr) *keyptr = key; /* Update the access time for the ageing algorithm. * Don't do it if we have a saving child, as this will trigger @@ -65,9 +67,9 @@ robj *lookupKey(redisDb *db, robj *key, int flags) { !(flags & LOOKUP_NOTOUCH)) { if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { - updateLFU(val); + updateLFU(key); } else { - val->lru = LRU_CLOCK(); + key->lru = LRU_CLOCK(); } } return val; @@ -98,16 +100,18 @@ robj *lookupKey(redisDb *db, robj *key, int flags) { * for read operations. Even if the key expiry is master-driven, we can * correctly report a key is expired on slaves even if the master is lagging * expiring our key via DELs in the replication link. */ -robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) { +robj *lookupKeyReadWithFlags(redisDb *db, robj *keyname, rkey **keyptr, int flags) { robj *val; + rkey *keyobj; - if (expireIfNeeded(db,key) == 1) { + val = lookupKey(db,keyname,&keyobj,flags); + if (val && expireIfNeeded(db,keyname,keyobj) == 1) { /* Key expired. If we are in the context of a master, expireIfNeeded() * returns 0 only when the key does not exist at all, so it's safe * to return NULL ASAP. 
*/ if (server.masterhost == NULL) { server.stat_keyspace_misses++; - notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id); + notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", keyname, db->id); return NULL; } @@ -129,24 +133,24 @@ robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) { server.current_client->cmd->flags & CMD_READONLY) { server.stat_keyspace_misses++; - notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id); + notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", keyname, db->id); return NULL; } } - val = lookupKey(db,key,flags); if (val == NULL) { server.stat_keyspace_misses++; - notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id); + notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", keyname, db->id); } else server.stat_keyspace_hits++; + if (keyptr) *keyptr = keyobj; return val; } /* Like lookupKeyReadWithFlags(), but does not use any flag, which is the * common case. */ -robj *lookupKeyRead(redisDb *db, robj *key) { - return lookupKeyReadWithFlags(db,key,LOOKUP_NONE); +robj *lookupKeyRead(redisDb *db, robj *keyname, rkey **keyptr) { + return lookupKeyReadWithFlags(db,keyname,keyptr,LOOKUP_NONE); } /* Lookup a key for write operations, and as a side effect, if needed, expires @@ -154,36 +158,64 @@ robj *lookupKeyRead(redisDb *db, robj *key) { * * Returns the linked value object if the key exists or NULL if the key * does not exist in the specified DB. 
*/ -robj *lookupKeyWrite(redisDb *db, robj *key) { - expireIfNeeded(db,key); - return lookupKey(db,key,LOOKUP_NONE); +robj *lookupKeyWrite(redisDb *db, robj *keyname, rkey **keyptr) { + rkey *key; + robj *val = lookupKey(db,keyname,&key,LOOKUP_NONE); + if (val && expireIfNeeded(db,keyname,key)) return NULL; + if (keyptr) *keyptr = key; + return val; } -robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) { - robj *o = lookupKeyRead(c->db, key); +robj *lookupKeyReadOrReply(client *c, robj *keyname, rkey **keyptr, robj *reply) { + robj *o = lookupKeyRead(c->db, keyname, keyptr); if (!o) addReply(c,reply); return o; } -robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) { - robj *o = lookupKeyWrite(c->db, key); +robj *lookupKeyWriteOrReply(client *c, robj *keyname, rkey **keyptr, robj *reply) { + robj *o = lookupKeyWrite(c->db, keyname, keyptr); if (!o) addReply(c,reply); return o; } +/* Allocate and return a key object. Key objects are used to represent + * keys in the main Redis dictionaries that associate key names to + * value objects. */ +rkey *createKey(const char *keyname, size_t keylen) { + rkey *k = zcalloc(sizeof(*k) + keylen + 1); + k->len = keylen; + memcpy(k->name,keyname,keylen); + k->name[keylen] = '\0'; + return k; +} + +void freeKey(rkey *key) { + zfree(key); +} + /* Add the key to the DB. It's up to the caller to increment the reference * counter of the value if needed. * * The program is aborted if the key already exists. 
*/ -void dbAdd(redisDb *db, robj *key, robj *val) { - sds copy = sdsdup(key->ptr); - int retval = dictAdd(db->dict, copy, val); +rkey *dbAdd(redisDb *db, robj *key, robj *val) { + rkey *ko = createKey(key->ptr,sdslen(key->ptr)); + int retval = dictAdd(db->dict, ko, val); serverAssertWithInfo(NULL,key,retval == DICT_OK); if (val->type == OBJ_LIST || val->type == OBJ_ZSET) signalKeyAsReady(db, key); if (server.cluster_enabled) slotToKeyAdd(key); + + + /* Set the LRU to the current lruclock (minutes resolution), or + * alternatively the LFU counter. */ + if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { + ko->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL; + } else { + ko->lru = LRU_CLOCK(); + } + return ko; } /* Overwrite an existing key with a new value. Incrementing the reference @@ -191,15 +223,13 @@ void dbAdd(redisDb *db, robj *key, robj *val) { * This function does not modify the expire time of the existing key. * * The program is aborted if the key was not already present. */ -void dbOverwrite(redisDb *db, robj *key, robj *val) { - dictEntry *de = dictFind(db->dict,key->ptr); +rkey *dbOverwrite(redisDb *db, robj *keyname, robj *val) { + dictEntry *de = dictFind(db->dict,keyname->ptr); - serverAssertWithInfo(NULL,key,de != NULL); + serverAssertWithInfo(NULL,keyname,de != NULL); dictEntry auxentry = *de; robj *old = dictGetVal(de); - if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { - val->lru = old->lru; - } + rkey *key = dictGetKey(de); dictSetVal(db->dict, de, val); if (server.lazyfree_lazy_server_del) { @@ -208,6 +238,7 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) { } dictFreeVal(db->dict, &auxentry); + return key; } /* High level Set operation. This function can be used in order to set @@ -217,16 +248,20 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) { * 2) clients WATCHing for the destination key notified. * 3) The expire time of the key is reset (the key is made persistent). 
* - * All the new keys in the database should be created via this interface. */ -void setKey(redisDb *db, robj *key, robj *val) { - if (lookupKeyWrite(db,key) == NULL) { - dbAdd(db,key,val); + * All the new keys in the database should be created via this interface. + * The function returns a pointer to the key object representing the key + * in the database dictionary. */ +rkey *setKey(redisDb *db, robj *keyname, robj *val) { + rkey *key; + if (lookupKeyWrite(db,keyname,&key) == NULL) { + key = dbAdd(db,keyname,val); } else { - dbOverwrite(db,key,val); + key = dbOverwrite(db,keyname,val); } incrRefCount(val); removeExpire(db,key); - signalModifiedKey(db,key); + signalModifiedKey(db,keyname); + return key; } int dbExists(redisDb *db, robj *key) { @@ -240,18 +275,18 @@ int dbExists(redisDb *db, robj *key) { robj *dbRandomKey(redisDb *db) { dictEntry *de; int maxtries = 100; - int allvolatile = dictSize(db->dict) == dictSize(db->expires); + int allvolatile = dictSize(db->dict) == raxSize(db->expires); while(1) { - sds key; - robj *keyobj; + rkey *key; + robj *keyname; de = dictGetFairRandomKey(db->dict); if (de == NULL) return NULL; key = dictGetKey(de); - keyobj = createStringObject(key,sdslen(key)); - if (dictFind(db->expires,key)) { + keyname = createStringObject(key->name,key->len); + if (key->flags & KEY_FLAG_EXPIRE) { if (allvolatile && server.masterhost && --maxtries == 0) { /* If the DB is composed only of keys with an expire set, * it could happen that all the keys are already logically @@ -261,28 +296,27 @@ robj *dbRandomKey(redisDb *db) { * To prevent the infinite loop we do some tries, but if there * are the conditions for an infinite loop, eventually we * return a key name that may be already expired. */ - return keyobj; + return keyname; } - if (expireIfNeeded(db,keyobj)) { - decrRefCount(keyobj); + if (expireIfNeeded(db,keyname,key)) { + decrRefCount(keyname); continue; /* search for another key. This expired. 
*/ } } - return keyobj; + return keyname; } } -/* Delete a key, value, and associated expiration entry if any, from the DB */ -int dbSyncDelete(redisDb *db, robj *key) { - /* Deleting an entry from the expires dict will not free the sds of - * the key, because it is shared with the main dictionary. */ - if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr); - if (dictDelete(db->dict,key->ptr) == DICT_OK) { - if (server.cluster_enabled) slotToKeyDel(key); - return 1; - } else { - return 0; - } +/* Delete a key, value, and associated expiration entry if any, from the DB. + * The function returns 1 if the key existed and was removed, 0 otherwise. */ +int dbSyncDelete(redisDb *db, robj *keyname) { + dictEntry *de = dictUnlink(db->dict,keyname->ptr); + if (de == NULL) return 0; + rkey *key = dictGetKey(de); + if (key->flags & KEY_FLAG_EXPIRE) removeExpireFromTree(db,key); + if (server.cluster_enabled) slotToKeyDel(keyname); + dictFreeUnlinkedEntry(db->dict,de); + return 1; } /* This is a wrapper whose behavior depends on the Redis lazy free @@ -312,7 +346,7 @@ int dbDelete(redisDb *db, robj *key) { * The object 'o' is what the caller already obtained by looking up 'key' * in 'db', the usage pattern looks like this: * - * o = lookupKeyWrite(db,key); + * o = lookupKeyWrite(db,key,NULL); * if (checkType(c,o,OBJ_STRING)) return; * o = dbUnshareStringValue(db,key,o); * @@ -367,7 +401,7 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) { emptyDbAsync(&server.db[j]); } else { dictEmpty(server.db[j].dict,callback); - dictEmpty(server.db[j].expires,callback); + raxFreeWithCallback(server.db[j].expires,callback); } } if (server.cluster_enabled) { @@ -471,7 +505,7 @@ void delGenericCommand(client *c, int lazy) { int numdel = 0, j; for (j = 1; j < c->argc; j++) { - expireIfNeeded(c->db,c->argv[j]); + expireIfNeededByName(c->db,c->argv[j]); int deleted = lazy ? 
dbAsyncDelete(c->db,c->argv[j]) : dbSyncDelete(c->db,c->argv[j]); if (deleted) { @@ -500,7 +534,7 @@ void existsCommand(client *c) { int j; for (j = 1; j < c->argc; j++) { - if (lookupKeyRead(c->db,c->argv[j])) count++; + if (lookupKeyRead(c->db,c->argv[j],NULL)) count++; } addReplyLongLong(c,count); } @@ -546,16 +580,13 @@ void keysCommand(client *c) { di = dictGetSafeIterator(c->db->dict); allkeys = (pattern[0] == '*' && pattern[1] == '\0'); while((de = dictNext(di)) != NULL) { - sds key = dictGetKey(de); - robj *keyobj; + rkey *key = dictGetKey(de); - if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) { - keyobj = createStringObject(key,sdslen(key)); - if (!keyIsExpired(c->db,keyobj)) { - addReplyBulk(c,keyobj); + if (allkeys || stringmatchlen(pattern,plen,key->name,key->len,0)) { + if (!keyIsExpired(key)) { + addReplyBulkCBuffer(c,key->name,key->len); numkeys++; } - decrRefCount(keyobj); } } dictReleaseIterator(di); @@ -571,8 +602,8 @@ void scanCallback(void *privdata, const dictEntry *de) { robj *key, *val = NULL; if (o == NULL) { - sds sdskey = dictGetKey(de); - key = createStringObject(sdskey, sdslen(sdskey)); + rkey *ko = dictGetKey(de); + key = createStringObject(ko->name, ko->len); } else if (o->type == OBJ_SET) { sds keysds = dictGetKey(de); key = createStringObject(keysds,sdslen(keysds)); @@ -760,7 +791,12 @@ void scanGenericCommand(client *c, robj *o, unsigned long cursor) { } /* Filter element if it is an expired key. */ - if (!filter && o == NULL && expireIfNeeded(c->db, kobj)) filter = 1; + if (!filter && + o == NULL && + expireIfNeededByName(c->db,kobj)) + { + filter = 1; + } /* Remove the element and its associted value if needed. 
*/ if (filter) { @@ -819,7 +855,7 @@ void typeCommand(client *c) { robj *o; char *type; - o = lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH); + o = lookupKeyReadWithFlags(c->db,c->argv[1],NULL,LOOKUP_NOTOUCH); if (o == NULL) { type = "none"; } else { @@ -870,6 +906,7 @@ void shutdownCommand(client *c) { void renameGenericCommand(client *c, int nx) { robj *o; + rkey *key; long long expire; int samekey = 0; @@ -877,7 +914,7 @@ void renameGenericCommand(client *c, int nx) { * if the key exists, however we still return an error on unexisting key. */ if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) samekey = 1; - if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL) + if ((o = lookupKeyWriteOrReply(c,c->argv[1],&key,shared.nokeyerr)) == NULL) return; if (samekey) { @@ -886,8 +923,8 @@ void renameGenericCommand(client *c, int nx) { } incrRefCount(o); - expire = getExpire(c->db,c->argv[1]); - if (lookupKeyWrite(c->db,c->argv[2]) != NULL) { + expire = getExpire(key); + if (lookupKeyWrite(c->db,c->argv[2],NULL) != NULL) { if (nx) { decrRefCount(o); addReply(c,shared.czero); @@ -897,8 +934,8 @@ void renameGenericCommand(client *c, int nx) { * with the same name. 
*/ dbDelete(c->db,c->argv[2]); } - dbAdd(c->db,c->argv[2],o); - if (expire != -1) setExpire(c,c->db,c->argv[2],expire); + key = dbAdd(c->db,c->argv[2],o); + if (expire != -1) setExpire(c,c->db,key,expire); dbDelete(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[2]); @@ -951,20 +988,21 @@ void moveCommand(client *c) { } /* Check if the element exists and get a reference */ - o = lookupKeyWrite(c->db,c->argv[1]); + rkey *key; + o = lookupKeyWrite(c->db,c->argv[1],&key); if (!o) { addReply(c,shared.czero); return; } - expire = getExpire(c->db,c->argv[1]); + expire = getExpire(key); /* Return zero if the key already exists in the target DB */ - if (lookupKeyWrite(dst,c->argv[1]) != NULL) { + if (lookupKeyWrite(dst,c->argv[1],NULL) != NULL) { addReply(c,shared.czero); return; } - dbAdd(dst,c->argv[1],o); - if (expire != -1) setExpire(c,dst,c->argv[1],expire); + key = dbAdd(dst,c->argv[1],o); + if (expire != -1) setExpire(c,dst,key,expire); incrRefCount(o); /* OK! key moved, free the entry in the source DB */ @@ -982,7 +1020,7 @@ void scanDatabaseForReadyLists(redisDb *db) { dictIterator *di = dictGetSafeIterator(db->blocking_keys); while((de = dictNext(di)) != NULL) { robj *key = dictGetKey(de); - robj *value = lookupKey(db,key,LOOKUP_NOTOUCH); + robj *value = lookupKey(db,key,NULL,LOOKUP_NOTOUCH); if (value && (value->type == OBJ_LIST || value->type == OBJ_STREAM || value->type == OBJ_ZSET)) @@ -1060,134 +1098,6 @@ void swapdbCommand(client *c) { } } -/*----------------------------------------------------------------------------- - * Expires API - *----------------------------------------------------------------------------*/ - -int removeExpire(redisDb *db, robj *key) { - /* An expire may only be removed if there is a corresponding entry in the - * main dict. Otherwise, the key will never be freed. 
*/ - serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL); - return dictDelete(db->expires,key->ptr) == DICT_OK; -} - -/* Set an expire to the specified key. If the expire is set in the context - * of an user calling a command 'c' is the client, otherwise 'c' is set - * to NULL. The 'when' parameter is the absolute unix time in milliseconds - * after which the key will no longer be considered valid. */ -void setExpire(client *c, redisDb *db, robj *key, long long when) { - dictEntry *kde, *de; - - /* Reuse the sds from the main dict in the expire dict */ - kde = dictFind(db->dict,key->ptr); - serverAssertWithInfo(NULL,key,kde != NULL); - de = dictAddOrFind(db->expires,dictGetKey(kde)); - dictSetSignedIntegerVal(de,when); - - int writable_slave = server.masterhost && server.repl_slave_ro == 0; - if (c && writable_slave && !(c->flags & CLIENT_MASTER)) - rememberSlaveKeyWithExpire(db,key); -} - -/* Return the expire time of the specified key, or -1 if no expire - * is associated with this key (i.e. the key is non volatile) */ -long long getExpire(redisDb *db, robj *key) { - dictEntry *de; - - /* No expire? return ASAP */ - if (dictSize(db->expires) == 0 || - (de = dictFind(db->expires,key->ptr)) == NULL) return -1; - - /* The entry was found in the expire dict, this means it should also - * be present in the main dict (safety check). */ - serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL); - return dictGetSignedIntegerVal(de); -} - -/* Propagate expires into slaves and the AOF file. - * When a key expires in the master, a DEL operation for this key is sent - * to all the slaves and the AOF file if enabled. - * - * This way the key expiry is centralized in one place, and since both - * AOF and the master->slave link guarantee operation ordering, everything - * will be consistent even if we allow write operations against expiring - * keys. */ -void propagateExpire(redisDb *db, robj *key, int lazy) { - robj *argv[2]; - - argv[0] = lazy ? 
shared.unlink : shared.del; - argv[1] = key; - incrRefCount(argv[0]); - incrRefCount(argv[1]); - - if (server.aof_state != AOF_OFF) - feedAppendOnlyFile(server.delCommand,db->id,argv,2); - replicationFeedSlaves(server.slaves,db->id,argv,2); - - decrRefCount(argv[0]); - decrRefCount(argv[1]); -} - -/* Check if the key is expired. */ -int keyIsExpired(redisDb *db, robj *key) { - mstime_t when = getExpire(db,key); - - if (when < 0) return 0; /* No expire for this key */ - - /* Don't expire anything while loading. It will be done later. */ - if (server.loading) return 0; - - /* If we are in the context of a Lua script, we pretend that time is - * blocked to when the Lua script started. This way a key can expire - * only the first time it is accessed and not in the middle of the - * script execution, making propagation to slaves / AOF consistent. - * See issue #1525 on Github for more information. */ - mstime_t now = server.lua_caller ? server.lua_time_start : mstime(); - - return now > when; -} - -/* This function is called when we are going to perform some operation - * in a given key, but such key may be already logically expired even if - * it still exists in the database. The main way this function is called - * is via lookupKey*() family of functions. - * - * The behavior of the function depends on the replication role of the - * instance, because slave instances do not expire keys, they wait - * for DELs from the master for consistency matters. However even - * slaves will try to have a coherent return value for the function, - * so that read commands executed in the slave side will be able to - * behave like if the key is expired even if still present (because the - * master has yet to propagate the DEL). - * - * In masters as a side effect of finding a key which is expired, such - * key will be evicted from the database. Also this may trigger the - * propagation of a DEL/UNLINK command in AOF / replication stream. 
- * - * The return value of the function is 0 if the key is still valid, - * otherwise the function returns 1 if the key is expired. */ -int expireIfNeeded(redisDb *db, robj *key) { - if (!keyIsExpired(db,key)) return 0; - - /* If we are running in the context of a slave, instead of - * evicting the expired key from the database, we return ASAP: - * the slave key expiration is controlled by the master that will - * send us synthesized DEL operations for expired keys. - * - * Still we try to return the right information to the caller, - * that is, 0 if we think the key should be still valid, 1 if - * we think the key is expired at this time. */ - if (server.masterhost != NULL) return 1; - - /* Delete the key */ - server.stat_expiredkeys++; - propagateExpire(db,key,server.lazyfree_lazy_expire); - notifyKeyspaceEvent(NOTIFY_EXPIRED, - "expired",key,db->id); - return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) : - dbSyncDelete(db,key); -} - /* ----------------------------------------------------------------------------- * API to get key arguments from commands * ---------------------------------------------------------------------------*/ diff --git a/src/debug.c b/src/debug.c index 0c6b5630c..e4dc12f22 100644 --- a/src/debug.c +++ b/src/debug.c @@ -118,10 +118,10 @@ void mixStringObjectDigest(unsigned char *digest, robj *o) { * Note that this function does not reset the initial 'digest' passed, it * will continue mixing this object digest to anything that was already * present. 
*/ -void xorObjectDigest(redisDb *db, robj *keyobj, unsigned char *digest, robj *o) { +void xorObjectDigest(rkey *key, unsigned char *digest, robj *o) { uint32_t aux = htonl(o->type); mixDigest(digest,&aux,sizeof(aux)); - long long expiretime = getExpire(db,keyobj); + long long expiretime = getExpire(key); char buf[128]; /* Save the key and associated value */ @@ -277,21 +277,19 @@ void computeDatasetDigest(unsigned char *final) { /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { - sds key; - robj *keyobj, *o; + rkey *key; + robj *o; memset(digest,0,20); /* This key-val digest */ key = dictGetKey(de); - keyobj = createStringObject(key,sdslen(key)); - mixDigest(digest,key,sdslen(key)); + mixDigest(digest,key->name,key->len); o = dictGetVal(de); - xorObjectDigest(db,keyobj,digest,o); + xorObjectDigest(key,digest,o); /* We can finally xor the key-val digest to the final digest */ xorDigest(final,digest,20); - decrRefCount(keyobj); } dictReleaseIterator(di); } @@ -385,6 +383,7 @@ NULL addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) { dictEntry *de; + rkey *key; robj *val; char *strenc; @@ -392,6 +391,7 @@ NULL addReply(c,shared.nokeyerr); return; } + key = dictGetKey(de); val = dictGetVal(de); strenc = strEncoding(val->encoding); @@ -434,7 +434,7 @@ NULL "lru:%d lru_seconds_idle:%llu%s", (void*)val, val->refcount, strenc, rdbSavedObjectLen(val), - val->lru, estimateObjectIdleTime(val)/1000, extra); + key->lru, estimateObjectIdleTime(key)/1000, extra); } else if (!strcasecmp(c->argv[1]->ptr,"sdslen") && c->argc == 3) { dictEntry *de; robj *val; @@ -463,7 +463,7 @@ NULL } else if (!strcasecmp(c->argv[1]->ptr,"ziplist") && c->argc == 3) { robj *o; - if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nokeyerr)) + if ((o = objectCommandLookupOrReply(c,c->argv[2],NULL,shared.nokeyerr)) == NULL) return; if (o->encoding != OBJ_ENCODING_ZIPLIST) { @@ -489,7 +489,7 @@ NULL if (c->argc == 5) if 
(getLongFromObjectOrReply(c, c->argv[4], &valsize, NULL) != C_OK) return; - if (lookupKeyWrite(c->db,key) != NULL) { + if (lookupKeyWrite(c->db,key,NULL) != NULL) { decrRefCount(key); continue; } @@ -521,8 +521,10 @@ NULL for (int j = 2; j < c->argc; j++) { unsigned char digest[20]; memset(digest,0,20); /* Start with a clean result */ - robj *o = lookupKeyReadWithFlags(c->db,c->argv[j],LOOKUP_NOTOUCH); - if (o) xorObjectDigest(c->db,c->argv[j],digest,o); + rkey *key; + robj *o = lookupKeyReadWithFlags(c->db,c->argv[j],&key, + LOOKUP_NOTOUCH); + if (o) xorObjectDigest(key,digest,o); sds d = sdsempty(); for (int i = 0; i < 20; i++) d = sdscatprintf(d, "%02x",digest[i]); @@ -634,16 +636,12 @@ NULL dictGetStats(buf,sizeof(buf),server.db[dbid].dict); stats = sdscat(stats,buf); - stats = sdscatprintf(stats,"[Expires HT]\n"); - dictGetStats(buf,sizeof(buf),server.db[dbid].expires); - stats = sdscat(stats,buf); - addReplyBulkSds(c,stats); } else if (!strcasecmp(c->argv[1]->ptr,"htstats-key") && c->argc == 3) { robj *o; dict *ht = NULL; - if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nokeyerr)) + if ((o = objectCommandLookupOrReply(c,c->argv[2],NULL,shared.nokeyerr)) == NULL) return; /* Get the hash table reference from the object, if possible. 
*/ diff --git a/src/defrag.c b/src/defrag.c index ecf0255dc..c578dad7c 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -775,7 +775,7 @@ long defragKey(redisDb *db, dictEntry *de) { /* Dirty code: * I can't search in db->expires for that key after i already released * the pointer it holds it won't be able to do the string compare */ - uint64_t hash = dictGetHash(db->dict, de->key); + uint64_t hash = dictGetEntryHash(db->dict, de); replaceSateliteDictKeyPtrAndOrDefragDictEntry(db->expires, keysds, newsds, hash, &defragged); } diff --git a/src/dict.c b/src/dict.c index 106467ef7..15821213a 100644 --- a/src/dict.c +++ b/src/dict.c @@ -206,7 +206,7 @@ int dictRehash(dict *d, int n) { nextde = de->next; /* Get the index in the new hash table */ - h = dictHashKey(d, de->key) & d->ht[1].sizemask; + h = dictHashStoredKey(d, de->key) & d->ht[1].sizemask; de->next = d->ht[1].table[h]; d->ht[1].table[h] = de; d->ht[0].used--; @@ -299,7 +299,7 @@ dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing) /* Get the index of the new element, or -1 if * the element already exists. */ - if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1) + if ((index = _dictKeyIndex(d,key,dictHashStoredKey(d,key),existing)) == -1) return NULL; /* Allocate the memory and store the new entry. 
@@ -369,14 +369,14 @@ static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) { if (d->ht[0].used == 0 && d->ht[1].used == 0) return NULL; if (dictIsRehashing(d)) _dictRehashStep(d); - h = dictHashKey(d, key); + h = dictHashLookupKey(d, key); for (table = 0; table <= 1; table++) { idx = h & d->ht[table].sizemask; he = d->ht[table].table[idx]; prevHe = NULL; while(he) { - if (key==he->key || dictCompareKeys(d, key, he->key)) { + if (key==he->key || dictCompareLookupKeys(d, key, he->key)) { /* Unlink the element from the list */ if (prevHe) prevHe->next = he->next; @@ -480,12 +480,12 @@ dictEntry *dictFind(dict *d, const void *key) if (d->ht[0].used + d->ht[1].used == 0) return NULL; /* dict is empty */ if (dictIsRehashing(d)) _dictRehashStep(d); - h = dictHashKey(d, key); + h = dictHashLookupKey(d, key); for (table = 0; table <= 1; table++) { idx = h & d->ht[table].sizemask; he = d->ht[table].table[idx]; while(he) { - if (key==he->key || dictCompareKeys(d, key, he->key)) + if (key==he->key || dictCompareLookupKeys(d, key, he->key)) return he; he = he->next; } @@ -998,7 +998,7 @@ static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **e /* Search if this slot does not already contain the given key */ he = d->ht[table].table[idx]; while(he) { - if (key==he->key || dictCompareKeys(d, key, he->key)) { + if (key==he->key || dictCompareStoredKeys(d, key, he->key)) { if (existing) *existing = he; return -1; } @@ -1024,15 +1024,28 @@ void dictDisableResize(void) { dict_can_resize = 0; } +/* Compute the hash of the specified key (using the lookup hash function) + * and returns it to the caller. This is useful in order to later call + * dictFindEntryRefByPtrAndHash(). */ uint64_t dictGetHash(dict *d, const void *key) { - return dictHashKey(d, key); + return dictHashLookupKey(d, key); +} + +/* Compute the hash of the specified dict entry (using the stored keys hash + * function) and returns it to the caller. 
This is useful in order to later + * call dictFindEntryRefByPtrAndHash(). */ +uint64_t dictGetEntryHash(dict *d, const dictEntry *de) { + return dictHashStoredKey(d, de->key); } /* Finds the dictEntry reference by using pointer and pre-calculated hash. * oldkey is a dead pointer and should not be accessed. - * the hash value should be provided using dictGetHash. - * no string / key comparison is performed. - * return value is the reference to the dictEntry if found, or NULL if not found. */ + * the hash value should be provided using dictGetHash() or dictGetEntryHash() + * depending on the object type (if the one we lookup the hash table with, or + * the one stored in the dict entry). + * No string / key comparison is performed. + * Return value is the reference to the dictEntry if found, or NULL if not + * found. */ dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t hash) { dictEntry *he, **heref; unsigned long idx, table; @@ -1159,10 +1172,12 @@ void freeCallback(void *privdata, void *val) { } dictType BenchmarkDictType = { + hashCallback, hashCallback, NULL, NULL, compareCallback, + compareCallback, freeCallback, NULL }; diff --git a/src/dict.h b/src/dict.h index dec60f637..251062a53 100644 --- a/src/dict.h +++ b/src/dict.h @@ -44,6 +44,8 @@ /* Unused arguments generate annoying warnings... */ #define DICT_NOTUSED(V) ((void) V) +/* An entry (a key basically) as stored inside the hash table. Note that + * there is a 'next' pointer, since we use chaining to resolve conflicts. */ typedef struct dictEntry { void *key; union { @@ -55,11 +57,33 @@ typedef struct dictEntry { struct dictEntry *next; } dictEntry; +/* The hash table type. + * + * There are two hash functions both for key hashing and comparison, one set is + * used for looking up the key and the other set to store it. 
+ * + * Most of the time each set will be the same function pointer, but sometimes + * we want the ability to store a key in a given way inside the hash table, + * and lookup it in some other way without resorting to any kind of conversion. + * For instance the key may be stored as a structure also representing other + * things, but the lookup happens via just a pointer to a null terminated + * string. The dual hash design allows for such usage. In that case we'll have + * a lookupHashFunction that will expect a null terminated C string, and a + * storedHashFunction that will instead expect the structure. + * Similarly the two comparison functions will work differently. The + * lookupKeyCompare will treat the first argument as a pointer to a C string + * and the other as a structure (this way we can directly lookup the structure + * key using the C string). While the storedKeyCompare() will check if two + * pointers to the key in structure form are the same. + * + * Every actual hash table has a pointer to its type. */ typedef struct dictType { - uint64_t (*hashFunction)(const void *key); + uint64_t (*lookupHashFunction)(const void *key); + uint64_t (*storedHashFunction)(const void *key); void *(*keyDup)(void *privdata, const void *key); void *(*valDup)(void *privdata, const void *obj); - int (*keyCompare)(void *privdata, const void *key1, const void *key2); + int (*lookupKeyCompare)(void *privdata, const void *key1, const void *key2); + int (*storedKeyCompare)(void *privdata, const void *key1, const void *key2); void (*keyDestructor)(void *privdata, void *key); void (*valDestructor)(void *privdata, void *obj); } dictType; @@ -132,12 +156,18 @@ typedef void (dictScanBucketFunction)(void *privdata, dictEntry **bucketref); (entry)->key = (_key_); \ } while(0) -#define dictCompareKeys(d, key1, key2) \ - (((d)->type->keyCompare) ? 
\ - (d)->type->keyCompare((d)->privdata, key1, key2) : \ +#define dictCompareLookupKeys(d, key1, key2) \ + (((d)->type->lookupKeyCompare) ? \ + (d)->type->lookupKeyCompare((d)->privdata, key1, key2) : \ + (key1) == (key2)) + +#define dictCompareStoredKeys(d, key1, key2) \ + (((d)->type->storedKeyCompare) ? \ + (d)->type->storedKeyCompare((d)->privdata, key1, key2) : \ (key1) == (key2)) -#define dictHashKey(d, key) (d)->type->hashFunction(key) +#define dictHashLookupKey(d, key) (d)->type->lookupHashFunction(key) +#define dictHashStoredKey(d, key) (d)->type->storedHashFunction(key) #define dictGetKey(he) ((he)->key) #define dictGetVal(he) ((he)->v.val) #define dictGetSignedIntegerVal(he) ((he)->v.s64) @@ -180,6 +210,7 @@ void dictSetHashFunctionSeed(uint8_t *seed); uint8_t *dictGetHashFunctionSeed(void); unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, dictScanBucketFunction *bucketfn, void *privdata); uint64_t dictGetHash(dict *d, const void *key); +uint64_t dictGetEntryHash(dict *d, const dictEntry *de); dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t hash); /* Hash table types */ diff --git a/src/evict.c b/src/evict.c index 176f4c362..22f278645 100644 --- a/src/evict.c +++ b/src/evict.c @@ -87,12 +87,12 @@ unsigned int LRU_CLOCK(void) { /* Given an object returns the min number of milliseconds the object was never * requested, using an approximated LRU algorithm. 
*/ -unsigned long long estimateObjectIdleTime(robj *o) { +unsigned long long estimateObjectIdleTime(rkey *key) { unsigned long long lruclock = LRU_CLOCK(); - if (lruclock >= o->lru) { - return (lruclock - o->lru) * LRU_CLOCK_RESOLUTION; + if (lruclock >= key->lru) { + return (lruclock - key->lru) * LRU_CLOCK_RESOLUTION; } else { - return (lruclock + (LRU_CLOCK_MAX - o->lru)) * + return (lruclock + (LRU_CLOCK_MAX - key->lru)) * LRU_CLOCK_RESOLUTION; } } @@ -166,7 +166,7 @@ void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evic count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples); for (j = 0; j < count; j++) { unsigned long long idle; - sds key; + rkey *key; robj *o; dictEntry *de; @@ -185,7 +185,7 @@ void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evic * idle just because the code initially handled LRU, but is in fact * just a score where an higher score means better candidate. */ if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) { - idle = estimateObjectIdleTime(o); + idle = estimateObjectIdleTime(key); } else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { /* When we use an LRU policy, we sort the keys by idle time * so that we expire keys starting from greater idle time. @@ -194,10 +194,10 @@ void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evic * first. So inside the pool we put objects using the inverted * frequency subtracting the actual frequency to the maximum * frequency of 255. */ - idle = 255-LFUDecrAndReturn(o); + idle = 255-LFUDecrAndReturn(key); } else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) { /* In this case the sooner the expire the better. 
*/ - idle = ULLONG_MAX - (long)dictGetVal(de); + idle = key->expire; } else { serverPanic("Unknown eviction policy in evictionPoolPopulate()"); } @@ -243,9 +243,9 @@ void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evic * because allocating and deallocating this object is costly * (according to the profiler, not my fantasy. Remember: * premature optimizbla bla bla bla. */ - int klen = sdslen(key); + int klen = key->len; if (klen > EVPOOL_CACHED_SDS_SIZE) { - pool[k].key = sdsdup(key); + pool[k].key = sdsnewlen(key->name,key->len); } else { memcpy(pool[k].cached,key,klen+1); sdssetlen(pool[k].cached,klen); @@ -332,9 +332,9 @@ uint8_t LFULogIncr(uint8_t counter) { * This function is used in order to scan the dataset for the best object * to fit: as we check for the candidate, we incrementally decrement the * counter of the scanned objects if needed. */ -unsigned long LFUDecrAndReturn(robj *o) { - unsigned long ldt = o->lru >> 8; - unsigned long counter = o->lru & 255; +unsigned long LFUDecrAndReturn(rkey *k) { + unsigned long ldt = k->lru >> 8; + unsigned long counter = k->lru & 255; unsigned long num_periods = server.lfu_decay_time ? LFUTimeElapsed(ldt) / server.lfu_decay_time : 0; if (num_periods) counter = (num_periods > counter) ? 0 : counter - num_periods; @@ -488,8 +488,13 @@ int freeMemoryIfNeeded(void) { * every DB. */ for (i = 0; i < server.dbnum; i++) { db = server.db+i; + #if 0 dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ? db->dict : db->expires; + #else + dict = db->dict; + #warning "Implement the volatile policy correctly" + #endif if ((keys = dictSize(dict)) != 0) { evictionPoolPopulate(i, dict, db->dict, pool); total_keys += keys; @@ -506,8 +511,11 @@ int freeMemoryIfNeeded(void) { de = dictFind(server.db[pool[k].dbid].dict, pool[k].key); } else { + #warning "Implement the volatile policy correctly" + #if 0 de = dictFind(server.db[pool[k].dbid].expires, pool[k].key); + #endif } /* Remove the entry from the pool. 
*/ @@ -538,8 +546,13 @@ int freeMemoryIfNeeded(void) { for (i = 0; i < server.dbnum; i++) { j = (++next_db) % server.dbnum; db = server.db+j; + #if 0 dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ? db->dict : db->expires; + #else + #warning "Implement the volatile policy correctly" + dict = db->dict; + #endif if (dictSize(dict) != 0) { de = dictGetRandomKey(dict); bestkey = dictGetKey(de); diff --git a/src/expire.c b/src/expire.c index 0b92ee3fe..87c87bccd 100644 --- a/src/expire.c +++ b/src/expire.c @@ -38,9 +38,103 @@ * When keys are accessed they are expired on-access. However we need a * mechanism in order to ensure keys are eventually removed when expired even * if no access is performed on them. + * + * In order to accomplish this every key with an expire is represented in + * two data structures: + * + * 1. The main dictionary of keys, server.db[x]->dict, is a hash table that + * represents the keyspace of a given Redis database. The keys stored + * in the hash table are redisKey structures (typedef 'rkey'). When + * a key has an expire set, the key->flags have the KEY_FLAG_EXPIRE set, + * and the key->expire is populated with the milliseconds unix time at + * which the key will no longer be valid. + * + * 2. Redis also keeps a radix tree that is composed only of keys that have + * an expire set, lexicographically sorted by the expire time. Basically + * each key in the radix tree is composed as follows: + * + * [8 bytes expire unix time][8 bytes key object pointer] + * + * Such tree is stored in server.db[x]->expires. + * + * The first field, the unix time, is the same stored in the key->expire of + * the corresponding key in the hash table, however it is stored in big endian + * so that sorting the time lexicographically in the tree, will also make the + * tree sorted by numerical expire time (from the smallest unix time to the + * greatest one). 
+ * + * Then we store the key pointer, this time in native endianness, because how + * it is sorted does not matter, being after the unix time. If Redis is running + * as a 32 bit system, the last 4 bytes of the pointer are just zeroes, so + * we can assume a 16 bytes key in every architecture. Note that from the + * pointer we can retrieve the key name, lookup it in the main dictionary, and + * delete the key. + * + * On the other hand, when we modify the expire time of some key, we need to + * update the tree accordingly. At every expire cycle, what we need to do is + * conceptually very simple: we run the tree and expire keys as long as we + * find keys that are already logically expired (expire time < current time). + * *----------------------------------------------------------------------------*/ -/* Helper function for the activeExpireCycle() function. +#define EXPIRE_KEY_LEN 16 /* Key length in the radix tree of expires. */ + +/* Populate the buffer 'buf', that should be at least EXPIRE_KEY_LEN bytes, + * with the key to store such key in the expires radix tree. See the comment + * above to see the format. */ +void encodeExpireKey(unsigned char *buf, rkey *key) { + uint64_t expire = htonu64(key->expire); + uint64_t ptr = (uint64_t) key; /* The pointer may be 32 bit, cast to 64. */ + memcpy(buf,&expire,sizeof(expire)); + memcpy(buf+8,&ptr,sizeof(ptr)); +} + +/* This is the reverse of encodeExpireKey(): given the key will return a + * pointer to an rkey and the expire value. */ +void decodeExpireKey(unsigned char *buf, uint64_t *expireptr, rkey **keyptrptr) { + uint64_t expire; + uint64_t keyptr; + memcpy(&expire,buf,sizeof(expire)); + expire = ntohu64(expire); + memcpy(&keyptr,buf+8,sizeof(keyptr)); + *expireptr = expire; + *keyptrptr = (rkey*)(unsigned long)keyptr; +} + +/* Populate the expires radix tree with the specified key. 
*/ +void addExpireToTree(redisDb *db, rkey *key) { + unsigned char expirekey[EXPIRE_KEY_LEN]; + encodeExpireKey(expirekey,key); + int retval = raxTryInsert(db->expires,expirekey,EXPIRE_KEY_LEN,NULL,NULL); + serverAssert(retval != 0); +} + +/* Remove the specified key from the expires radix tree. */ +void removeExpireFromTree(redisDb *db, rkey *key) { + unsigned char expirekey[EXPIRE_KEY_LEN]; + encodeExpireKey(expirekey,key); + int retval = raxRemove(db->expires,expirekey,EXPIRE_KEY_LEN,NULL); + serverAssert(retval != 0); +} + +/* Delete a key that is found expired by the expiration cycle. We need to + * propagate the key too, send the notification event, and take a few + * stats. */ +void deleteExpiredKey(redisDb *db, rkey *key) { + robj *keyname = createStringObject(key->name,key->len); + + propagateExpire(db,keyname,server.lazyfree_lazy_expire); + if (server.lazyfree_lazy_expire) + dbAsyncDelete(db,keyname); + else + dbSyncDelete(db,keyname); + notifyKeyspaceEvent(NOTIFY_EXPIRED, + "expired",keyname,db->id); + decrRefCount(keyname); + server.stat_expiredkeys++; +} + +/* Helper function for the expireSlaveKeys() function. * This function will try to expire the key that is stored in the hash table * entry 'de' of the 'expires' hash table of a Redis database. * @@ -51,21 +145,10 @@ * * The parameter 'now' is the current time in milliseconds as is passed * to the function to avoid too many gettimeofday() syscalls. 
*/ -int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) { - long long t = dictGetSignedIntegerVal(de); +int activeExpireCycleTryExpire(redisDb *db, rkey *key, long long now) { + long long t = key->expire; if (now > t) { - sds key = dictGetKey(de); - robj *keyobj = createStringObject(key,sdslen(key)); - - propagateExpire(db,keyobj,server.lazyfree_lazy_expire); - if (server.lazyfree_lazy_expire) - dbAsyncDelete(db,keyobj); - else - dbSyncDelete(db,keyobj); - notifyKeyspaceEvent(NOTIFY_EXPIRED, - "expired",keyobj,db->id); - decrRefCount(keyobj); - server.stat_expiredkeys++; + deleteExpiredKey(db,key); return 1; } else { return 0; @@ -101,7 +184,8 @@ void activeExpireCycle(int type) { static int timelimit_exit = 0; /* Time limit hit in previous call? */ static long long last_fast_cycle = 0; /* When last fast cycle ran. */ - int j, iteration = 0; + int j; + unsigned long iteration = 0; int dbs_per_call = CRON_DBS_PER_CALL; long long start = ustime(), timelimit, elapsed; @@ -129,25 +213,20 @@ void activeExpireCycle(int type) { if (dbs_per_call > server.dbnum || timelimit_exit) dbs_per_call = server.dbnum; - /* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of CPU time - * per iteration. Since this function gets called with a frequency of - * server.hz times per second, the following is the max amount of - * microseconds we can spend in this function. */ + /* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of + * CPU time per iteration. Since this function gets called with a + * frequency of server.hz times per second, the following is the max + * amount of microseconds we can spend in this function. */ timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100; timelimit_exit = 0; if (timelimit <= 0) timelimit = 1; + /* If it's a fast cycle, override the time limit with our fixed + * time limit (defaults to 1 millisecond). 
*/ if (type == ACTIVE_EXPIRE_CYCLE_FAST) timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* in microseconds. */ - /* Accumulate some global stats as we expire keys, to have some idea - * about the number of keys that are already logically expired, but still - * existing inside the database. */ - long total_sampled = 0; - long total_expired = 0; - for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) { - int expired; redisDb *db = server.db+(current_db % server.dbnum); /* Increment the DB now so we are sure if we run out of time @@ -155,92 +234,59 @@ void activeExpireCycle(int type) { * distribute the time evenly across DBs. */ current_db++; - /* Continue to expire if at the end of the cycle more than 25% - * of the keys were expired. */ - do { - unsigned long num, slots; - long long now, ttl_sum; - int ttl_samples; + /* If there is nothing to expire try next DB ASAP, avoiding the + * cost of seeking the radix tree iterator. */ + if (raxSize(db->expires) == 0) continue; + + /* The main collection cycle. Run the tree and expire keys that + * are found to be already logically expired. */ + long long now = mstime(); + raxIterator ri; + raxStart(&ri,db->expires); + raxSeek(&ri,"^",NULL,0); + + /* Enter the loop expiring keys for this database. Inside this + * loop there are two stop conditions: + * + * 1. The time limit. + * 2. The loop will exit if in this DB there are no more keys + * that are logically expired. + * + * Moreover the loop naturally terminates when there are no longer + * elements in the radix tree. */ + while(raxNext(&ri)) { + rkey *key; + uint64_t expire; + decodeExpireKey(ri.key,&expire,&key); + + /* First stop condition: no keys to expire here. */ + if (expire >= (uint64_t)now) break; + + printf("DEL %.*s -> %llu\n", (int)key->len, key->name, expire); + deleteExpiredKey(db,key); + + /* Second stop condition: the time limit. */ iteration++; - - /* If there is nothing to expire try next DB ASAP. 
*/ - if ((num = dictSize(db->expires)) == 0) { - db->avg_ttl = 0; - break; - } - slots = dictSlots(db->expires); - now = mstime(); - - /* When there are less than 1% filled slots getting random - * keys is expensive, so stop here waiting for better times... - * The dictionary will be resized asap. */ - if (num && slots > DICT_HT_INITIAL_SIZE && - (num*100/slots < 1)) break; - - /* The main collection cycle. Sample random keys among keys - * with an expire set, checking for expired ones. */ - expired = 0; - ttl_sum = 0; - ttl_samples = 0; - - if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP) - num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP; - - while (num--) { - dictEntry *de; - long long ttl; - - if ((de = dictGetRandomKey(db->expires)) == NULL) break; - ttl = dictGetSignedIntegerVal(de)-now; - if (activeExpireCycleTryExpire(db,de,now)) expired++; - if (ttl > 0) { - /* We want the average TTL of keys yet not expired. */ - ttl_sum += ttl; - ttl_samples++; - } - total_sampled++; - } - total_expired += expired; - - /* Update the average TTL stats for this database. */ - if (ttl_samples) { - long long avg_ttl = ttl_sum/ttl_samples; - - /* Do a simple running average with a few samples. - * We just use the current estimate with a weight of 2% - * and the previous estimate with a weight of 98%. */ - if (db->avg_ttl == 0) db->avg_ttl = avg_ttl; - db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50); - } - - /* We can't block forever here even if there are many keys to - * expire. So after a given amount of milliseconds return to the - * caller waiting for the other active expire cycle. */ - if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */ - elapsed = ustime()-start; + if ((iteration & 0xff) == 0) { + now = ustime(); + elapsed = now-start; + now /= 1000; /* Convert back now to milliseconds. 
*/ if (elapsed > timelimit) { timelimit_exit = 1; + printf("LIMIT (%llu) type:%d [elapsed=%llu]\n", timelimit, type, elapsed); server.stat_expired_time_cap_reached_count++; break; } } - /* We don't repeat the cycle if there are less than 25% of keys - * found expired in the current DB. */ - } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4); + /* Reseek the iterator: the node we were on is now + * deleted. */ + raxSeek(&ri,"^",NULL,0); + } + raxStop(&ri); } elapsed = ustime()-start; latencyAddSampleIfNeeded("expire-cycle",elapsed/1000); - - /* Update our estimate of keys existing but yet to be expired. - * Running average with this sample accounting for 5%. */ - double current_perc; - if (total_sampled) { - current_perc = (double)total_expired/total_sampled; - } else - current_perc = 0; - server.stat_expired_stale_perc = (current_perc*0.05)+ - (server.stat_expired_stale_perc*0.95); } /*----------------------------------------------------------------------------- @@ -269,8 +315,8 @@ void activeExpireCycle(int type) { /* The dictionary where we remember key names and database ID of keys we may * want to expire from the slave. Since this function is not often used we * don't even care to initialize the database at startup. We'll do it once - * the feature is used the first time, that is, when rememberSlaveKeyWithExpire() - * is called. + * the feature is used the first time, that is, when the function + * rememberSlaveKeyWithExpire() is called. 
* * The dictionary has an SDS string representing the key as the hash table * key, while the value is a 64 bit unsigned integer with the bits corresponding @@ -300,11 +346,12 @@ void expireSlaveKeys(void) { while(dbids && dbid < server.dbnum) { if ((dbids & 1) != 0) { redisDb *db = server.db+dbid; - dictEntry *expire = dictFind(db->expires,keyname); + rkey *key = dictFetchValue(db->dict,keyname); + if (!(key->flags & KEY_FLAG_EXPIRE)) key = NULL; int expired = 0; - if (expire && - activeExpireCycleTryExpire(server.db+dbid,expire,start)) + if (key && + activeExpireCycleTryExpire(server.db+dbid,key,start)) { expired = 1; } @@ -313,7 +360,7 @@ void expireSlaveKeys(void) { * corresponding bit in the new bitmap we set as value. * At the end of the loop if the bitmap is zero, it means we * no longer need to keep track of this key. */ - if (expire && !expired) { + if (key && !expired) { noexpire++; new_dbids |= (uint64_t)1 << dbid; } @@ -341,13 +388,15 @@ void expireSlaveKeys(void) { /* Track keys that received an EXPIRE or similar command in the context * of a writable slave. */ -void rememberSlaveKeyWithExpire(redisDb *db, robj *key) { +void rememberSlaveKeyWithExpire(redisDb *db, rkey *key) { if (slaveKeysWithExpire == NULL) { static dictType dt = { - dictSdsHash, /* hash function */ + dictSdsHash, /* lookup hash function */ + dictSdsHash, /* stored hash function */ NULL, /* key dup */ NULL, /* val dup */ - dictSdsKeyCompare, /* key compare */ + dictSdsKeyCompare, /* loopkup key compare */ + dictSdsKeyCompare, /* stored key compare */ dictSdsDestructor, /* key destructor */ NULL /* val destructor */ }; @@ -355,13 +404,15 @@ void rememberSlaveKeyWithExpire(redisDb *db, robj *key) { } if (db->id > 63) return; - dictEntry *de = dictAddOrFind(slaveKeysWithExpire,key->ptr); - /* If the entry was just created, set it to a copy of the SDS string - * representing the key: we don't want to need to take those keys - * in sync with the main DB. 
The keys will be removed by expireSlaveKeys() - * as it scans to find keys to remove. */ - if (de->key == key->ptr) { - de->key = sdsdup(key->ptr); + sds skey = sdsnewlen(key->name,key->len); + dictEntry *de = dictAddOrFind(slaveKeysWithExpire,skey); + /* If the entry was already there, free the SDS string we used to lookup. + * Note that we don't care to take those keys in sync with the + * main DB. The keys will be removed by expireSlaveKeys() as it scans to + * find keys to remove. */ + if (de->key != skey) { + sdsfree(skey); + } else { dictSetUnsignedIntegerVal(de,0); } @@ -391,6 +442,137 @@ void flushSlaveKeysWithExpireList(void) { } } +/*----------------------------------------------------------------------------- + * Expires API + *----------------------------------------------------------------------------*/ + +/* Remove the expire from the key making it persistent. */ +int removeExpire(redisDb *db, rkey *key) { + if (!(key->flags & KEY_FLAG_EXPIRE)) return 0; + removeExpireFromTree(db,key); + key->flags &= ~KEY_FLAG_EXPIRE; + key->expire = 0; /* Not needed but better to leave the object clean. */ + return 1; +} + +/* Set an expire to the specified key. If the expire is set in the context + * of an user calling a command 'c' is the client, otherwise 'c' is set + * to NULL. The 'when' parameter is the absolute unix time in milliseconds + * after which the key will no longer be considered valid. */ +void setExpire(client *c, redisDb *db, rkey *key, long long when) { + /* Reuse the sds from the main dict in the expire dict */ + if (key->flags & KEY_FLAG_EXPIRE) removeExpireFromTree(db,key); + key->flags |= KEY_FLAG_EXPIRE; + key->expire = when; + addExpireToTree(db,key); + + int writable_slave = server.masterhost && server.repl_slave_ro == 0; + if (c && writable_slave && !(c->flags & CLIENT_MASTER)) + rememberSlaveKeyWithExpire(db,key); +} + +/* Return the expire time of the specified key, or -1 if no expire + * is associated with this key (i.e. 
the key is non volatile) */ +long long getExpire(rkey *key) { + return (key->flags & KEY_FLAG_EXPIRE) ? key->expire : -1; +} + +/* Propagate expires into slaves and the AOF file. + * When a key expires in the master, a DEL operation for this key is sent + * to all the slaves and the AOF file if enabled. + * + * This way the key expiry is centralized in one place, and since both + * AOF and the master->slave link guarantee operation ordering, everything + * will be consistent even if we allow write operations against expiring + * keys. */ +void propagateExpire(redisDb *db, robj *key, int lazy) { + robj *argv[2]; + + argv[0] = lazy ? shared.unlink : shared.del; + argv[1] = key; + incrRefCount(argv[0]); + incrRefCount(argv[1]); + + if (server.aof_state != AOF_OFF) + feedAppendOnlyFile(server.delCommand,db->id,argv,2); + replicationFeedSlaves(server.slaves,db->id,argv,2); + + decrRefCount(argv[0]); + decrRefCount(argv[1]); +} + +/* Check if the key is expired. */ +int keyIsExpired(rkey *key) { + mstime_t when = getExpire(key); + + if (when < 0) return 0; /* No expire for this key */ + + /* Don't expire anything while loading. It will be done later. */ + if (server.loading) return 0; + + /* If we are in the context of a Lua script, we pretend that time is + * blocked to when the Lua script started. This way a key can expire + * only the first time it is accessed and not in the middle of the + * script execution, making propagation to slaves / AOF consistent. + * See issue #1525 on Github for more information. */ + mstime_t now = server.lua_caller ? server.lua_time_start : mstime(); + + return now > when; +} + +/* This function is called when we are going to perform some operation + * in a given key, but such key may be already logically expired even if + * it still exists in the database. The main way this function is called + * is via lookupKey*() family of functions. 
+ * + * The behavior of the function depends on the replication role of the + * instance, because slave instances do not expire keys, they wait + * for DELs from the master for consistency matters. However even + * slaves will try to have a coherent return value for the function, + * so that read commands executed in the slave side will be able to + * behave like if the key is expired even if still present (because the + * master has yet to propagate the DEL). + * + * In masters as a side effect of finding a key which is expired, such + * key will be evicted from the database. Also this may trigger the + * propagation of a DEL/UNLINK command in AOF / replication stream. + * + * The return value of the function is 0 if the key is still valid, + * otherwise the function returns 1 if the key is expired. */ +int expireIfNeeded(redisDb *db, robj *keyname, rkey *key) { + if (!keyIsExpired(key)) return 0; + + /* If we are running in the context of a slave, instead of + * evicting the expired key from the database, we return ASAP: + * the slave key expiration is controlled by the master that will + * send us synthesized DEL operations for expired keys. + * + * Still we try to return the right information to the caller, + * that is, 0 if we think the key should be still valid, 1 if + * we think the key is expired at this time. */ + if (server.masterhost != NULL) return 1; + + /* Delete the key */ + server.stat_expiredkeys++; + propagateExpire(db,keyname,server.lazyfree_lazy_expire); + notifyKeyspaceEvent(NOTIFY_EXPIRED, + "expired",keyname,db->id); + return server.lazyfree_lazy_expire ? dbAsyncDelete(db,keyname) : + dbSyncDelete(db,keyname); +} + +/* Sometimes we have just the name of the key, because we have still to + * lookup it. In such cases this function is more handy compared to + * expireIfNeeded(): just a wrapper performing the lookup first. 
*/ +int expireIfNeededByName(redisDb *db, robj *keyname) { + rkey *key; + robj *val = lookupKey(db,keyname,&key,LOOKUP_NOTOUCH); + if (!val) return 0; + return expireIfNeeded(db,keyname,key); +} + + + /*----------------------------------------------------------------------------- * Expires Commands *----------------------------------------------------------------------------*/ @@ -403,7 +585,8 @@ void flushSlaveKeysWithExpireList(void) { * unit is either UNIT_SECONDS or UNIT_MILLISECONDS, and is only used for * the argv[2] parameter. The basetime is always specified in milliseconds. */ void expireGenericCommand(client *c, long long basetime, int unit) { - robj *key = c->argv[1], *param = c->argv[2]; + robj *keyname = c->argv[1], *param = c->argv[2]; + rkey *key; long long when; /* unix time in milliseconds when the key will expire. */ if (getLongLongFromObjectOrReply(c, param, &when, NULL) != C_OK) @@ -413,7 +596,7 @@ void expireGenericCommand(client *c, long long basetime, int unit) { when += basetime; /* No key, return zero. */ - if (lookupKeyWrite(c->db,key) == NULL) { + if (lookupKeyWrite(c->db,keyname,&key) == NULL) { addReply(c,shared.czero); return; } @@ -427,23 +610,24 @@ void expireGenericCommand(client *c, long long basetime, int unit) { if (when <= mstime() && !server.loading && !server.masterhost) { robj *aux; - int deleted = server.lazyfree_lazy_expire ? dbAsyncDelete(c->db,key) : - dbSyncDelete(c->db,key); - serverAssertWithInfo(c,key,deleted); + int deleted = server.lazyfree_lazy_expire ? + dbAsyncDelete(c->db,keyname) : + dbSyncDelete(c->db,keyname); + serverAssertWithInfo(c,keyname,deleted); server.dirty++; /* Replicate/AOF this as an explicit DEL or UNLINK. */ aux = server.lazyfree_lazy_expire ? 
shared.unlink : shared.del; - rewriteClientCommandVector(c,2,aux,key); - signalModifiedKey(c->db,key); - notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); + rewriteClientCommandVector(c,2,aux,keyname); + signalModifiedKey(c->db,keyname); + notifyKeyspaceEvent(NOTIFY_GENERIC,"del",keyname,c->db->id); addReply(c, shared.cone); return; } else { setExpire(c,c->db,key,when); addReply(c,shared.cone); - signalModifiedKey(c->db,key); - notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id); + signalModifiedKey(c->db,keyname); + notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",keyname,c->db->id); server.dirty++; return; } @@ -472,15 +656,16 @@ void pexpireatCommand(client *c) { /* Implements TTL and PTTL */ void ttlGenericCommand(client *c, int output_ms) { long long expire, ttl = -1; + rkey *key; /* If the key does not exist at all, return -2 */ - if (lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH) == NULL) { + if (lookupKeyReadWithFlags(c->db,c->argv[1],&key,LOOKUP_NOTOUCH) == NULL) { addReplyLongLong(c,-2); return; } /* The key exists. Return -1 if it has no expire, or the actual * TTL value otherwise. 
*/ - expire = getExpire(c->db,c->argv[1]); + expire = getExpire(key); if (expire != -1) { ttl = expire-mstime(); if (ttl < 0) ttl = 0; @@ -504,8 +689,9 @@ void pttlCommand(client *c) { /* PERSIST key */ void persistCommand(client *c) { - if (lookupKeyWrite(c->db,c->argv[1])) { - if (removeExpire(c->db,c->argv[1])) { + rkey *key; + if (lookupKeyWrite(c->db,c->argv[1],&key)) { + if (removeExpire(c->db,key)) { addReply(c,shared.cone); server.dirty++; } else { @@ -520,7 +706,7 @@ void persistCommand(client *c) { void touchCommand(client *c) { int touched = 0; for (int j = 1; j < c->argc; j++) - if (lookupKeyRead(c->db,c->argv[j]) != NULL) touched++; + if (lookupKeyRead(c->db,c->argv[j],NULL) != NULL) touched++; addReplyLongLong(c,touched); } diff --git a/src/geo.c b/src/geo.c index 826d11ff5..54d9c6d0f 100644 --- a/src/geo.c +++ b/src/geo.c @@ -466,8 +466,8 @@ void georadiusGeneric(client *c, int flags) { /* Look up the requested zset */ robj *zobj = NULL; - if ((zobj = lookupKeyReadOrReply(c, key, shared.null[c->resp])) == NULL || - checkType(c, zobj, OBJ_ZSET)) { + if ((zobj = lookupKeyReadOrReply(c, key, NULL, shared.null[c->resp])) + == NULL || checkType(c, zobj, OBJ_ZSET)) { return; } @@ -701,7 +701,7 @@ void geohashCommand(client *c) { int j; /* Look up the requested zset */ - robj *zobj = lookupKeyRead(c->db, c->argv[1]); + robj *zobj = lookupKeyRead(c->db, c->argv[1], NULL); if (zobj && checkType(c, zobj, OBJ_ZSET)) return; /* Geohash elements one after the other, using a null bulk reply for @@ -754,7 +754,7 @@ void geoposCommand(client *c) { int j; /* Look up the requested zset */ - robj *zobj = lookupKeyRead(c->db, c->argv[1]); + robj *zobj = lookupKeyRead(c->db, c->argv[1], NULL); if (zobj && checkType(c, zobj, OBJ_ZSET)) return; /* Report elements one after the other, using a null bulk reply for @@ -797,7 +797,7 @@ void geodistCommand(client *c) { /* Look up the requested zset */ robj *zobj = NULL; - if ((zobj = lookupKeyReadOrReply(c, c->argv[1], 
shared.null[c->resp])) + if ((zobj = lookupKeyReadOrReply(c, c->argv[1], NULL, shared.null[c->resp])) == NULL || checkType(c, zobj, OBJ_ZSET)) return; /* Get the scores. We need both otherwise NULL is returned. */ diff --git a/src/gopher.c b/src/gopher.c index 38e44f754..6e4485cb3 100644 --- a/src/gopher.c +++ b/src/gopher.c @@ -50,7 +50,7 @@ void addReplyGopherItem(client *c, const char *type, const char *descr, * protocol. */ void processGopherRequest(client *c) { robj *keyname = c->argc == 0 ? createStringObject("/",1) : c->argv[0]; - robj *o = lookupKeyRead(c->db,keyname); + robj *o = lookupKeyRead(c->db,keyname,NULL); /* If there is no such key, return with a Gopher error. */ if (o == NULL || o->type != OBJ_STRING) { diff --git a/src/hyperloglog.c b/src/hyperloglog.c index e01ea6042..161ae9457 100644 --- a/src/hyperloglog.c +++ b/src/hyperloglog.c @@ -1179,7 +1179,7 @@ invalid: /* PFADD var ele ele ele ... ele => :0 or :1 */ void pfaddCommand(client *c) { - robj *o = lookupKeyWrite(c->db,c->argv[1]); + robj *o = lookupKeyWrite(c->db,c->argv[1],NULL); struct hllhdr *hdr; int updated = 0, j; @@ -1238,7 +1238,7 @@ void pfcountCommand(client *c) { registers = max + HLL_HDR_SIZE; for (j = 1; j < c->argc; j++) { /* Check type and size. */ - robj *o = lookupKeyRead(c->db,c->argv[j]); + robj *o = lookupKeyRead(c->db,c->argv[j],NULL); if (o == NULL) continue; /* Assume empty HLL for non existing var.*/ if (isHLLObjectOrReply(c,o) != C_OK) return; @@ -1259,7 +1259,7 @@ void pfcountCommand(client *c) { * * The user specified a single key. Either return the cached value * or compute one and update the cache. */ - o = lookupKeyWrite(c->db,c->argv[1]); + o = lookupKeyWrite(c->db,c->argv[1],NULL); if (o == NULL) { /* No key? Cardinality is zero since no element was added, otherwise * we would have a key as HLLADD creates it as a side effect. 
*/ @@ -1320,7 +1320,7 @@ void pfmergeCommand(client *c) { memset(max,0,sizeof(max)); for (j = 1; j < c->argc; j++) { /* Check type and size. */ - robj *o = lookupKeyRead(c->db,c->argv[j]); + robj *o = lookupKeyRead(c->db,c->argv[j],NULL); if (o == NULL) continue; /* Assume empty HLL for non existing var. */ if (isHLLObjectOrReply(c,o) != C_OK) return; @@ -1338,7 +1338,7 @@ void pfmergeCommand(client *c) { } /* Create / unshare the destination key's value if needed. */ - robj *o = lookupKeyWrite(c->db,c->argv[1]); + robj *o = lookupKeyWrite(c->db,c->argv[1],NULL); if (o == NULL) { /* Create the key with a string value of the exact length to * hold our HLL data structure. sdsnewlen() when NULL is passed @@ -1497,7 +1497,7 @@ void pfdebugCommand(client *c) { robj *o; int j; - o = lookupKeyWrite(c->db,c->argv[2]); + o = lookupKeyWrite(c->db,c->argv[2],NULL); if (o == NULL) { addReplyError(c,"The specified key does not exist"); return; diff --git a/src/latency.c b/src/latency.c index 33aa1245b..072b24ca6 100644 --- a/src/latency.c +++ b/src/latency.c @@ -48,10 +48,12 @@ uint64_t dictStringHash(const void *key) { void dictVanillaFree(void *privdata, void *val); dictType latencyTimeSeriesDictType = { - dictStringHash, /* hash function */ + dictStringHash, /* lookup hash function */ + dictStringHash, /* stored hash function */ NULL, /* key dup */ NULL, /* val dup */ - dictStringKeyCompare, /* key compare */ + dictStringKeyCompare, /* lookup key compare */ + dictStringKeyCompare, /* stored key compare */ dictVanillaFree, /* key destructor */ dictVanillaFree /* val destructor */ }; diff --git a/src/lazyfree.c b/src/lazyfree.c index 3d3159c90..2e0bdfd42 100644 --- a/src/lazyfree.c +++ b/src/lazyfree.c @@ -51,19 +51,19 @@ size_t lazyfreeGetFreeEffort(robj *obj) { * a lazy free list instead of being freed synchronously. The lazy free list * will be reclaimed in a different bio.c thread. 
*/ #define LAZYFREE_THRESHOLD 64 -int dbAsyncDelete(redisDb *db, robj *key) { - /* Deleting an entry from the expires dict will not free the sds of - * the key, because it is shared with the main dictionary. */ - if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr); - +int dbAsyncDelete(redisDb *db, robj *keyname) { /* If the value is composed of a few allocations, to free in a lazy way * is actually just slower... So under a certain limit we just free * the object synchronously. */ - dictEntry *de = dictUnlink(db->dict,key->ptr); + dictEntry *de = dictUnlink(db->dict,keyname->ptr); if (de) { + rkey *key = dictGetKey(de); robj *val = dictGetVal(de); size_t free_effort = lazyfreeGetFreeEffort(val); + /* Remove the entry from the expire tree since this is fast anyway. */ + if (key->flags & KEY_FLAG_EXPIRE) removeExpireFromTree(db,key); + /* If releasing the object is too much work, do it in the background * by adding the object to the lazy free list. * Note that if the object is shared, to reclaim it now it is not @@ -83,7 +83,7 @@ int dbAsyncDelete(redisDb *db, robj *key) { * field to NULL in order to lazy free it later. */ if (de) { dictFreeUnlinkedEntry(db->dict,de); - if (server.cluster_enabled) slotToKeyDel(key); + if (server.cluster_enabled) slotToKeyDel(keyname); return 1; } else { return 0; @@ -105,11 +105,11 @@ void freeObjAsync(robj *o) { * create a new empty set of hash tables and scheduling the old ones for * lazy freeing. 
*/ void emptyDbAsync(redisDb *db) { - dict *oldht1 = db->dict, *oldht2 = db->expires; + void *hashtable = db->dict, *tree = db->expires; db->dict = dictCreate(&dbDictType,NULL); - db->expires = dictCreate(&keyptrDictType,NULL); - atomicIncr(lazyfree_objects,dictSize(oldht1)); - bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,oldht2); + db->expires = raxNew(); + atomicIncr(lazyfree_objects,dictSize((dict*)hashtable)); + bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,hashtable,tree); } /* Empty the slots-keys map of Redis CLuster by creating a new empty one @@ -136,10 +136,10 @@ void lazyfreeFreeObjectFromBioThread(robj *o) { * when the database was logically deleted. 'sl' is a skiplist used by * Redis Cluster in order to take the hash slots -> keys mapping. This * may be NULL if Redis Cluster is disabled. */ -void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2) { - size_t numkeys = dictSize(ht1); - dictRelease(ht1); - dictRelease(ht2); +void lazyfreeFreeDatabaseFromBioThread(dict *hashtable, rax *tree) { + size_t numkeys = dictSize(hashtable); + dictRelease(hashtable); + raxFree(tree); atomicDecr(lazyfree_objects,numkeys); } diff --git a/src/module.c b/src/module.c index 7dee7e776..b5040a67b 100644 --- a/src/module.c +++ b/src/module.c @@ -148,7 +148,8 @@ typedef struct RedisModuleCtx RedisModuleCtx; struct RedisModuleKey { RedisModuleCtx *ctx; redisDb *db; - robj *key; /* Key name object. */ + rkey *keyobj; /* Key object stored at the dictionary. */ + robj *keyname; /* Key name object. */ robj *value; /* Value object, or NULL if the key was not found. */ void *iter; /* Iterator. */ int mode; /* Opening mode. 
*/ @@ -436,7 +437,7 @@ int moduleCreateEmptyKey(RedisModuleKey *key, int type) { break; default: return REDISMODULE_ERR; } - dbAdd(key->db,key->key,obj); + key->keyobj = dbAdd(key->db,key->keyname,obj); key->value = obj; return REDISMODULE_OK; } @@ -465,7 +466,7 @@ int moduleDelKeyIfEmpty(RedisModuleKey *key) { } if (isempty) { - dbDelete(key->db,key->key); + dbDelete(key->db,key->keyname); key->value = NULL; return 1; } else { @@ -1519,11 +1520,12 @@ int RM_SelectDb(RedisModuleCtx *ctx, int newid) { void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) { RedisModuleKey *kp; robj *value; + rkey *keyobj; if (mode & REDISMODULE_WRITE) { - value = lookupKeyWrite(ctx->client->db,keyname); + value = lookupKeyWrite(ctx->client->db,keyname,&keyobj); } else { - value = lookupKeyRead(ctx->client->db,keyname); + value = lookupKeyRead(ctx->client->db,keyname,&keyobj); if (value == NULL) { return NULL; } @@ -1533,8 +1535,9 @@ void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) { kp = zmalloc(sizeof(*kp)); kp->ctx = ctx; kp->db = ctx->client->db; - kp->key = keyname; + kp->keyname = keyname; incrRefCount(keyname); + kp->keyobj = keyobj; kp->value = value; kp->iter = NULL; kp->mode = mode; @@ -1546,10 +1549,10 @@ void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) { /* Close a key handle. 
*/ void RM_CloseKey(RedisModuleKey *key) { if (key == NULL) return; - if (key->mode & REDISMODULE_WRITE) signalModifiedKey(key->db,key->key); + if (key->mode & REDISMODULE_WRITE) signalModifiedKey(key->db,key->keyname); /* TODO: if (key->iter) RM_KeyIteratorStop(kp); */ RM_ZsetRangeStop(key); - decrRefCount(key->key); + decrRefCount(key->keyname); autoMemoryFreed(key->ctx,REDISMODULE_AM_KEY,key); zfree(key); } @@ -1595,13 +1598,13 @@ size_t RM_ValueLength(RedisModuleKey *key) { int RM_DeleteKey(RedisModuleKey *key) { if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR; if (key->value) { - dbDelete(key->db,key->key); + dbDelete(key->db,key->keyname); key->value = NULL; } return REDISMODULE_OK; } -/* If the key is open for writing, unlink it (that is delete it in a +/* If the key is open for writing, unlink it (that is delete it in a * non-blocking way, not reclaiming memory immediately) and setup the key to * accept new writes as an empty key (that will be created on demand). * On success REDISMODULE_OK is returned. If the key is not open for @@ -1609,7 +1612,7 @@ int RM_DeleteKey(RedisModuleKey *key) { int RM_UnlinkKey(RedisModuleKey *key) { if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR; if (key->value) { - dbAsyncDelete(key->db,key->key); + dbAsyncDelete(key->db,key->keyname); key->value = NULL; } return REDISMODULE_OK; @@ -1619,7 +1622,7 @@ int RM_UnlinkKey(RedisModuleKey *key) { * If no TTL is associated with the key or if the key is empty, * REDISMODULE_NO_EXPIRE is returned. */ mstime_t RM_GetExpire(RedisModuleKey *key) { - mstime_t expire = getExpire(key->db,key->key); + mstime_t expire = getExpire(key->keyobj); if (expire == -1 || key->value == NULL) return -1; expire -= mstime(); return expire >= 0 ? 
expire : 0; @@ -1639,9 +1642,9 @@ int RM_SetExpire(RedisModuleKey *key, mstime_t expire) { return REDISMODULE_ERR; if (expire != REDISMODULE_NO_EXPIRE) { expire += mstime(); - setExpire(key->ctx->client,key->db,key->key,expire); + setExpire(key->ctx->client,key->db,key->keyobj,expire); } else { - removeExpire(key->db,key->key); + removeExpire(key->db,key->keyobj); } return REDISMODULE_OK; } @@ -1657,7 +1660,7 @@ int RM_SetExpire(RedisModuleKey *key, mstime_t expire) { int RM_StringSet(RedisModuleKey *key, RedisModuleString *str) { if (!(key->mode & REDISMODULE_WRITE) || key->iter) return REDISMODULE_ERR; RM_DeleteKey(key); - setKey(key->db,key->key,str); + setKey(key->db,key->keyname,str); key->value = str; return REDISMODULE_OK; } @@ -1707,7 +1710,7 @@ char *RM_StringDMA(RedisModuleKey *key, size_t *len, int mode) { /* For write access, and even for read access if the object is encoded, * we unshare the string (that has the side effect of decoding it). */ if ((mode & REDISMODULE_WRITE) || key->value->encoding != OBJ_ENCODING_RAW) - key->value = dbUnshareStringValue(key->db, key->key, key->value); + key->value = dbUnshareStringValue(key->db, key->keyname, key->value); *len = sdslen(key->value->ptr); return key->value->ptr; @@ -1737,12 +1740,12 @@ int RM_StringTruncate(RedisModuleKey *key, size_t newlen) { if (key->value == NULL) { /* Empty key: create it with the new size. */ robj *o = createObject(OBJ_STRING,sdsnewlen(NULL, newlen)); - setKey(key->db,key->key,o); + setKey(key->db,key->keyname,o); key->value = o; decrRefCount(o); } else { /* Unshare and resize. 
*/ - key->value = dbUnshareStringValue(key->db, key->key, key->value); + key->value = dbUnshareStringValue(key->db, key->keyname, key->value); size_t curlen = sdslen(key->value->ptr); if (newlen > curlen) { key->value->ptr = sdsgrowzero(key->value->ptr,newlen); @@ -3088,7 +3091,7 @@ int RM_ModuleTypeSetValue(RedisModuleKey *key, moduleType *mt, void *value) { if (!(key->mode & REDISMODULE_WRITE) || key->iter) return REDISMODULE_ERR; RM_DeleteKey(key); robj *o = createModuleObject(mt,value); - setKey(key->db,key->key,o); + setKey(key->db,key->keyname,o); decrRefCount(o); key->value = o; return REDISMODULE_OK; @@ -5031,7 +5034,8 @@ int dictCStringKeyCompare(void *privdata, const void *key1, const void *key2) { } dictType moduleAPIDictType = { - dictCStringKeyHash, /* hash function */ + dictCStringKeyHash, /* lookup hash function */ + dictCStringKeyHash, /* stored hash function */ NULL, /* key dup */ NULL, /* val dup */ dictCStringKeyCompare, /* key compare */ diff --git a/src/object.c b/src/object.c index 234e11f8a..4d0eeeca6 100644 --- a/src/object.c +++ b/src/object.c @@ -44,14 +44,6 @@ robj *createObject(int type, void *ptr) { o->encoding = OBJ_ENCODING_RAW; o->ptr = ptr; o->refcount = 1; - - /* Set the LRU to the current lruclock (minutes resolution), or - * alternatively the LFU counter. 
*/ - if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { - o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL; - } else { - o->lru = LRU_CLOCK(); - } return o; } @@ -89,11 +81,6 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) { o->encoding = OBJ_ENCODING_EMBSTR; o->ptr = sh+1; o->refcount = 1; - if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { - o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL; - } else { - o->lru = LRU_CLOCK(); - } sh->len = len; sh->alloc = len; @@ -1052,8 +1039,7 @@ struct redisMemOverhead *getMemoryOverheadData(void) { mh->db[mh->num_dbs].overhead_ht_main = mem; mem_total+=mem; - mem = dictSize(db->expires) * sizeof(dictEntry) + - dictSlots(db->expires) * sizeof(dictEntry*); + #warning "Fix the memory computation here with expires overhead" mh->db[mh->num_dbs].overhead_ht_expires = mem; mem_total+=mem; @@ -1202,12 +1188,12 @@ sds getMemoryDoctorReport(void) { * The lru_idle and lru_clock args are only relevant if policy * is MAXMEMORY_FLAG_LRU. * Either or both of them may be <0, in that case, nothing is set. */ -void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, +void objectSetLRUOrLFU(rkey *key, long long lfu_freq, long long lru_idle, long long lru_clock) { if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { if (lfu_freq >= 0) { serverAssert(lfu_freq <= 255); - val->lru = (LFUGetTimeInMinutes()<<8) | lfu_freq; + key->lru = (LFUGetTimeInMinutes()<<8) | lfu_freq; } } else if (lru_idle >= 0) { /* Provided LRU idle time is in seconds. Scale @@ -1223,7 +1209,7 @@ void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, * some time. */ if (lru_abs < 0) lru_abs = (lru_clock+(LRU_CLOCK_MAX/2)) % LRU_CLOCK_MAX; - val->lru = lru_abs; + key->lru = lru_abs; } } @@ -1231,15 +1217,16 @@ void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, /* This is a helper function for the OBJECT command. We need to lookup keys * without any modification of LRU or other parameters. 
*/ -robj *objectCommandLookup(client *c, robj *key) { +robj *objectCommandLookup(client *c, robj *keyname, rkey **key) { dictEntry *de; - if ((de = dictFind(c->db->dict,key->ptr)) == NULL) return NULL; + if ((de = dictFind(c->db->dict,keyname->ptr)) == NULL) return NULL; + if (key) *key = dictGetKey(de); return (robj*) dictGetVal(de); } -robj *objectCommandLookupOrReply(client *c, robj *key, robj *reply) { - robj *o = objectCommandLookup(c,key); +robj *objectCommandLookupOrReply(client *c, robj *keyname, rkey **key, robj *reply) { + robj *o = objectCommandLookup(c,keyname,key); if (!o) addReply(c, reply); return o; @@ -1260,24 +1247,26 @@ NULL }; addReplyHelp(c, help); } else if (!strcasecmp(c->argv[1]->ptr,"refcount") && c->argc == 3) { - if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp])) - == NULL) return; + if ((o = objectCommandLookupOrReply(c,c->argv[2],NULL, + shared.null[c->resp])) == NULL) return; addReplyLongLong(c,o->refcount); } else if (!strcasecmp(c->argv[1]->ptr,"encoding") && c->argc == 3) { - if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp])) - == NULL) return; + if ((o = objectCommandLookupOrReply(c,c->argv[2],NULL, + shared.null[c->resp])) == NULL) return; addReplyBulkCString(c,strEncoding(o->encoding)); } else if (!strcasecmp(c->argv[1]->ptr,"idletime") && c->argc == 3) { - if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp])) - == NULL) return; + rkey *key; + if ((o = objectCommandLookupOrReply(c,c->argv[2],&key, + shared.null[c->resp])) == NULL) return; if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { addReplyError(c,"An LFU maxmemory policy is selected, idle time not tracked. 
Please note that when switching between policies at runtime LRU and LFU data will take some time to adjust."); return; } - addReplyLongLong(c,estimateObjectIdleTime(o)/1000); + addReplyLongLong(c,estimateObjectIdleTime(key)/1000); } else if (!strcasecmp(c->argv[1]->ptr,"freq") && c->argc == 3) { - if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp])) - == NULL) return; + rkey *k; + if ((o = objectCommandLookupOrReply(c,c->argv[2],&k, + shared.null[c->resp])) == NULL) return; if (!(server.maxmemory_policy & MAXMEMORY_FLAG_LFU)) { addReplyError(c,"An LFU maxmemory policy is not selected, access frequency not tracked. Please note that when switching between policies at runtime LRU and LFU data will take some time to adjust."); return; @@ -1286,7 +1275,7 @@ NULL * in case of the key has not been accessed for a long time, * because we update the access time only * when the key is read or overwritten. */ - addReplyLongLong(c,LFUDecrAndReturn(o)); + addReplyLongLong(c,LFUDecrAndReturn(k)); } else { addReplySubcommandSyntaxError(c); } diff --git a/src/rdb.c b/src/rdb.c index 95e4766ea..39974a33e 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -475,6 +475,7 @@ ssize_t rdbSaveStringObject(rio *rdb, robj *obj) { * RDB_LOAD_PLAIN: Return a plain string allocated with zmalloc() * instead of a Redis object with an sds in it. * RDB_LOAD_SDS: Return an SDS string instead of a Redis object. + * RDB_LOAD_KEY: Return a key object instead of a Redis object. * * On I/O error NULL is returned. */ @@ -751,7 +752,7 @@ size_t rdbSaveStreamConsumers(rio *rdb, streamCG *cg) { /* Save a Redis object. * Returns -1 on error, number of bytes written on success. 
*/ -ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) { +ssize_t rdbSaveObject(rio *rdb, robj *o, rkey *key) { ssize_t n = 0, nwritten = 0; if (o->type == OBJ_STRING) { @@ -966,7 +967,9 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) { RedisModuleIO io; moduleValue *mv = o->ptr; moduleType *mt = mv->type; - moduleInitIOContext(io,mt,rdb,key); + robj *keyobj = createStringObject(key->name,key->len); + moduleInitIOContext(io,mt,rdb,keyobj); + decrRefCount(keyobj); /* Write the "module" identifier as prefix, so that we'll be able * to call the right module during loading. */ @@ -1005,7 +1008,7 @@ size_t rdbSavedObjectLen(robj *o) { * On error -1 is returned. * On success if the key was actually saved 1 is returned, otherwise 0 * is returned (the key was already expired). */ -int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) { +int rdbSaveKeyValuePair(rio *rdb, rkey *key, robj *val, long long expiretime) { int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU; int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU; @@ -1017,7 +1020,7 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) { /* Save the LRU info. */ if (savelru) { - uint64_t idletime = estimateObjectIdleTime(val); + uint64_t idletime = estimateObjectIdleTime(key); idletime /= 1000; /* Using seconds is enough and requires less space.*/ if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1; if (rdbSaveLen(rdb,idletime) == -1) return -1; @@ -1026,7 +1029,7 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) { /* Save the LFU info. */ if (savelfu) { uint8_t buf[1]; - buf[0] = LFUDecrAndReturn(val); + buf[0] = LFUDecrAndReturn(key); /* We can encode this in exactly two bytes: the opcode and an 8 * bit counter, since the frequency is logarithmic with a 0-255 range. 
* Note that we do not store the halving time because to reset it @@ -1037,7 +1040,8 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) { /* Save type, key, value */ if (rdbSaveObjectType(rdb,val) == -1) return -1; - if (rdbSaveStringObject(rdb,key) == -1) return -1; + if (rdbSaveRawString(rdb,(unsigned char*)key->name,key->len) == -1) + return -1; if (rdbSaveObject(rdb,val,key) == -1) return -1; return 1; } @@ -1129,20 +1133,18 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { * these sizes are just hints to resize the hash tables. */ uint64_t db_size, expires_size; db_size = dictSize(db->dict); - expires_size = dictSize(db->expires); + expires_size = raxSize(db->expires); if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr; if (rdbSaveLen(rdb,db_size) == -1) goto werr; if (rdbSaveLen(rdb,expires_size) == -1) goto werr; /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { - sds keystr = dictGetKey(de); - robj key, *o = dictGetVal(de); - long long expire; + rkey *key = dictGetKey(de); + robj *o = dictGetVal(de); - initStaticStringObject(key,keystr); - expire = getExpire(db,&key); - if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr; + if (rdbSaveKeyValuePair(rdb,key,o,getExpire(key)) == -1) + goto werr; /* When this RDB is produced as part of an AOF rewrite, move * accumulated diff from parent to child while rewriting in @@ -1928,7 +1930,6 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr; dictExpand(db->dict,db_size); - dictExpand(db->expires,expires_size); continue; /* Read next opcode. */ } else if (type == RDB_OPCODE_AUX) { /* AUX: generic string-string fields. 
Use to add state to RDB @@ -2034,17 +2035,13 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { decrRefCount(val); } else { /* Add the new object in the hash table */ - dbAdd(db,key,val); + rkey *k = dbAdd(db,key,val); /* Set the expire time if needed */ - if (expiretime != -1) setExpire(NULL,db,key,expiretime); + if (expiretime != -1) setExpire(NULL,db,k,expiretime); /* Set usage information (for eviction). */ - objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock); - - /* Decrement the key refcount since dbAdd() will take its - * own reference. */ - decrRefCount(key); + objectSetLRUOrLFU(k,lfu_freq,lru_idle,lru_clock); } /* Reset the state that is key-specified and is populated by diff --git a/src/rdb.h b/src/rdb.h index 0acddf9ab..5bd3e42b1 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -120,6 +120,7 @@ #define RDB_LOAD_ENC (1<<0) #define RDB_LOAD_PLAIN (1<<1) #define RDB_LOAD_SDS (1<<2) +#define RDB_LOAD_KEY (1<<3) #define RDB_SAVE_NONE 0 #define RDB_SAVE_AOF_PREAMBLE (1<<0) @@ -140,11 +141,11 @@ int rdbSaveBackground(char *filename, rdbSaveInfo *rsi); int rdbSaveToSlavesSockets(rdbSaveInfo *rsi); void rdbRemoveTempFile(pid_t childpid); int rdbSave(char *filename, rdbSaveInfo *rsi); -ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key); +ssize_t rdbSaveObject(rio *rdb, robj *o, rkey *key); size_t rdbSavedObjectLen(robj *o); robj *rdbLoadObject(int type, rio *rdb, robj *key); void backgroundSaveDoneHandler(int exitcode, int bysignal); -int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime); +int rdbSaveKeyValuePair(rio *rdb, rkey *key, robj *val, long long expiretime); robj *rdbLoadStringObject(rio *rdb); ssize_t rdbSaveStringObject(rio *rdb, robj *obj); ssize_t rdbSaveRawString(rio *rdb, unsigned char *s, size_t len); diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index 2785167a8..c8b3f41a6 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -1164,7 +1164,8 @@ static int fetchClusterSlotsConfiguration(client 
c) { printf("Cluster slots configuration changed, fetching new one...\n"); const char *errmsg = "Failed to update cluster slots configuration"; static dictType dtype = { - dictSdsHash, /* hash function */ + dictSdsHash, /* lookup hash function */ + dictSdsHash, /* stored hash function */ NULL, /* key dup */ NULL, /* val dup */ dictSdsKeyCompare, /* key compare */ diff --git a/src/redis-cli.c b/src/redis-cli.c index e363a2795..a1f3f4010 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -1987,7 +1987,8 @@ typedef struct clusterManagerLink { } clusterManagerLink; static dictType clusterManagerDictType = { - dictSdsHash, /* hash function */ + dictSdsHash, /* lookup hash function */ + dictSdsHash, /* stored hash function */ NULL, /* key dup */ NULL, /* val dup */ dictSdsKeyCompare, /* key compare */ @@ -1996,10 +1997,12 @@ static dictType clusterManagerDictType = { }; static dictType clusterManagerLinkDictType = { - dictSdsHash, /* hash function */ + dictSdsHash, /* lookup hash function */ + dictSdsHash, /* stored hash function */ NULL, /* key dup */ NULL, /* val dup */ - dictSdsKeyCompare, /* key compare */ + dictSdsKeyCompare, /* lookup key compare */ + dictSdsKeyCompare, /* stored key compare */ dictSdsDestructor, /* key destructor */ dictListDestructor /* val destructor */ }; @@ -6933,7 +6936,8 @@ void type_free(void* priv_data, void* val) { } static dictType typeinfoDictType = { - dictSdsHash, /* hash function */ + dictSdsHash, /* lookup hash function */ + dictSdsHash, /* stored hash function */ NULL, /* key dup */ NULL, /* val dup */ dictSdsKeyCompare, /* key compare */ diff --git a/src/sentinel.c b/src/sentinel.c index 92ea75436..95b4ff8e4 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -402,7 +402,8 @@ void dictInstancesValDestructor (void *privdata, void *obj) { * also used for: sentinelRedisInstance->sentinels dictionary that maps * sentinels ip:port to last seen time in Pub/Sub hello message. 
*/ dictType instancesDictType = { - dictSdsHash, /* hash function */ + dictSdsHash, /* lookup hash function */ + dictSdsHash, /* store hash function */ NULL, /* key dup */ NULL, /* val dup */ dictSdsKeyCompare, /* key compare */ @@ -415,7 +416,8 @@ dictType instancesDictType = { * This is useful into sentinelGetObjectiveLeader() function in order to * count the votes and understand who is the leader. */ dictType leaderVotesDictType = { - dictSdsHash, /* hash function */ + dictSdsHash, /* lookup hash function */ + dictSdsHash, /* stored hash function */ NULL, /* key dup */ NULL, /* val dup */ dictSdsKeyCompare, /* key compare */ @@ -425,10 +427,12 @@ dictType leaderVotesDictType = { /* Instance renamed commands table. */ dictType renamedCommandsDictType = { - dictSdsCaseHash, /* hash function */ + dictSdsCaseHash, /* lookup hash function */ + dictSdsCaseHash, /* stored hash function */ NULL, /* key dup */ NULL, /* val dup */ - dictSdsKeyCaseCompare, /* key compare */ + dictSdsKeyCaseCompare, /* lookup key compare */ + dictSdsKeyCaseCompare, /* stored key compare */ dictSdsDestructor, /* key destructor */ dictSdsDestructor /* val destructor */ }; diff --git a/src/server.c b/src/server.c index 4b87b6ac2..ee2985954 100644 --- a/src/server.c +++ b/src/server.c @@ -1142,16 +1142,33 @@ void dictListDestructor(void *privdata, void *val) listRelease((list*)val); } -int dictSdsKeyCompare(void *privdata, const void *key1, - const void *key2) -{ +static int dictGenericKeyCompare(const char *key1, const char *key2, size_t len1, size_t len2) { + if (len1 != len2) return 0; + return memcmp(key1, key2, len1) == 0; +} + +int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2) { int l1,l2; DICT_NOTUSED(privdata); l1 = sdslen((sds)key1); l2 = sdslen((sds)key2); - if (l1 != l2) return 0; - return memcmp(key1, key2, l1) == 0; + return dictGenericKeyCompare(key1,key2,l1,l2); +} + +int dictSdsRkeyKeyCompare(void *privdata, const void *key1, const void *key2){ + 
DICT_NOTUSED(privdata); + sds sdskey = (sds)key1; + rkey *keyobj = (rkey*)key2; + return dictGenericKeyCompare(sdskey,keyobj->name, + sdslen(sdskey),keyobj->len); +} + +int dictRkeyKeyCompare(void *privdata, const void *key1, const void *key2) { + DICT_NOTUSED(privdata); + rkey *k1 = (rkey*)key1; + rkey *k2 = (rkey*)key2; + return dictGenericKeyCompare(k1->name,k2->name,k1->len,k2->len); } /* A case insensitive version used for the command lookup table and other @@ -1179,6 +1196,13 @@ void dictSdsDestructor(void *privdata, void *val) sdsfree(val); } +void dictKeyDestructor(void *privdata, void *val) +{ + DICT_NOTUSED(privdata); + + freeKey(val); +} + int dictObjKeyCompare(void *privdata, const void *key1, const void *key2) { @@ -1195,6 +1219,11 @@ uint64_t dictSdsHash(const void *key) { return dictGenHashFunction((unsigned char*)key, sdslen((char*)key)); } +uint64_t dictKeyHash(const void *keyptr) { + const rkey *key = keyptr; + return dictGenHashFunction(key->name, key->len); +} + uint64_t dictSdsCaseHash(const void *key) { return dictGenCaseHashFunction((unsigned char*)key, sdslen((char*)key)); } @@ -1243,10 +1272,12 @@ uint64_t dictEncObjHash(const void *key) { /* Generic hash table type where keys are Redis Objects, Values * dummy pointers. */ dictType objectKeyPointerValueDictType = { - dictEncObjHash, /* hash function */ + dictEncObjHash, /* lookup hash function */ + dictEncObjHash, /* stored hash function */ NULL, /* key dup */ NULL, /* val dup */ - dictEncObjKeyCompare, /* key compare */ + dictEncObjKeyCompare, /* lookup key compare */ + dictEncObjKeyCompare, /* stored key compare */ dictObjectDestructor, /* key destructor */ NULL /* val destructor */ }; @@ -1254,80 +1285,100 @@ dictType objectKeyPointerValueDictType = { /* Like objectKeyPointerValueDictType(), but values can be destroyed, if * not NULL, calling zfree(). 
*/ dictType objectKeyHeapPointerValueDictType = { - dictEncObjHash, /* hash function */ + dictEncObjHash, /* lookup hash function */ + dictEncObjHash, /* stored hash function */ NULL, /* key dup */ NULL, /* val dup */ - dictEncObjKeyCompare, /* key compare */ + dictEncObjKeyCompare, /* lookup key compare */ + dictEncObjKeyCompare, /* stored key compare */ dictObjectDestructor, /* key destructor */ dictVanillaFree /* val destructor */ }; /* Set dictionary type. Keys are SDS strings, values are ot used. */ dictType setDictType = { - dictSdsHash, /* hash function */ + dictSdsHash, /* lookup hash function */ + dictSdsHash, /* stored hash function */ NULL, /* key dup */ NULL, /* val dup */ - dictSdsKeyCompare, /* key compare */ + dictSdsKeyCompare, /* lookup key compare */ + dictSdsKeyCompare, /* stored key compare */ dictSdsDestructor, /* key destructor */ NULL /* val destructor */ }; /* Sorted sets hash (note: a skiplist is used in addition to the hash table) */ dictType zsetDictType = { - dictSdsHash, /* hash function */ + dictSdsHash, /* lookup hash function */ + dictSdsHash, /* stored hash function */ NULL, /* key dup */ NULL, /* val dup */ - dictSdsKeyCompare, /* key compare */ + dictSdsKeyCompare, /* lookup key compare */ + dictSdsKeyCompare, /* stored key compare */ NULL, /* Note: SDS string shared & freed by skiplist */ NULL /* val destructor */ }; -/* Db->dict, keys are sds strings, vals are Redis objects. */ +/* Db->dict, keys are "rkey" key objects, vals are "robj" Redis objects. + * + * Note that this dictionary is designed to be looked up via SDS strings + * even if keys are stored as rkey structures. So there are two different + * hash functions and two different compare functions.
*/ dictType dbDictType = { - dictSdsHash, /* hash function */ + dictSdsHash, /* lookup hash function */ + dictKeyHash, /* stored hash function */ NULL, /* key dup */ NULL, /* val dup */ - dictSdsKeyCompare, /* key compare */ - dictSdsDestructor, /* key destructor */ + dictSdsRkeyKeyCompare, /* lookup key compare */ + dictRkeyKeyCompare, /* stored key compare */ + dictKeyDestructor, /* key destructor */ dictObjectDestructor /* val destructor */ }; /* server.lua_scripts sha (as sds string) -> scripts (as robj) cache. */ dictType shaScriptObjectDictType = { - dictSdsCaseHash, /* hash function */ + dictSdsCaseHash, /* lookup hash function */ + dictSdsCaseHash, /* stored hash function */ NULL, /* key dup */ NULL, /* val dup */ - dictSdsKeyCaseCompare, /* key compare */ + dictSdsKeyCaseCompare, /* lookup key compare */ + dictSdsKeyCaseCompare, /* stored key compare */ dictSdsDestructor, /* key destructor */ dictObjectDestructor /* val destructor */ }; /* Db->expires */ dictType keyptrDictType = { - dictSdsHash, /* hash function */ + dictSdsHash, /* lookup hash function */ + dictSdsHash, /* stored hash function */ NULL, /* key dup */ NULL, /* val dup */ - dictSdsKeyCompare, /* key compare */ + dictSdsKeyCompare, /* lookup compare */ + dictSdsKeyCompare, /* stored compare */ NULL, /* key destructor */ NULL /* val destructor */ }; /* Command table. sds string -> command struct pointer.
*/ dictType commandTableDictType = { - dictSdsCaseHash, /* hash function */ + dictSdsCaseHash, /* lookup hash function */ + dictSdsCaseHash, /* stored hash function */ NULL, /* key dup */ NULL, /* val dup */ - dictSdsKeyCaseCompare, /* key compare */ + dictSdsKeyCaseCompare, /* lookup key compare */ + dictSdsKeyCaseCompare, /* stored key compare */ dictSdsDestructor, /* key destructor */ NULL /* val destructor */ }; -/* Hash type hash table (note that small hashes are represented with ziplists) */ +/* Hash values type (note that small hashes are represented with ziplists) */ dictType hashDictType = { - dictSdsHash, /* hash function */ + dictSdsHash, /* lookup hash function */ + dictSdsHash, /* stored hash function */ NULL, /* key dup */ NULL, /* val dup */ - dictSdsKeyCompare, /* key compare */ + dictSdsKeyCompare, /* lookup key compare */ + dictSdsKeyCompare, /* stored key compare */ dictSdsDestructor, /* key destructor */ dictSdsDestructor /* val destructor */ }; @@ -1336,10 +1387,12 @@ dictType hashDictType = { * lists as values. It's used for blocking operations (BLPOP) and to * map swapped keys to a list of clients waiting for this keys to be loaded. */ dictType keylistDictType = { - dictObjHash, /* hash function */ + dictObjHash, /* lookup hash function */ + dictObjHash, /* stored hash function */ NULL, /* key dup */ NULL, /* val dup */ - dictObjKeyCompare, /* key compare */ + dictObjKeyCompare, /* lookup key compare */ + dictObjKeyCompare, /* stored key compare */ dictObjectDestructor, /* key destructor */ dictListDestructor /* val destructor */ }; @@ -1347,10 +1400,12 @@ dictType keylistDictType = { /* Cluster nodes hash table, mapping nodes addresses 1.2.3.4:6379 to * clusterNode structures. 
*/ dictType clusterNodesDictType = { - dictSdsHash, /* hash function */ + dictSdsHash, /* lookup hash function */ + dictSdsHash, /* stored hash function */ NULL, /* key dup */ NULL, /* val dup */ - dictSdsKeyCompare, /* key compare */ + dictSdsKeyCompare, /* lookup key compare */ + dictSdsKeyCompare, /* stored key compare */ dictSdsDestructor, /* key destructor */ NULL /* val destructor */ }; @@ -1359,10 +1414,12 @@ dictType clusterNodesDictType = { * we can re-add this node. The goal is to avoid readding a removed * node for some time. */ dictType clusterNodesBlackListDictType = { - dictSdsCaseHash, /* hash function */ + dictSdsCaseHash, /* lookup hash function */ + dictSdsCaseHash, /* stored hash function */ NULL, /* key dup */ NULL, /* val dup */ - dictSdsKeyCaseCompare, /* key compare */ + dictSdsKeyCaseCompare, /* lookup key compare */ + dictSdsKeyCaseCompare, /* stored key compare */ dictSdsDestructor, /* key destructor */ NULL /* val destructor */ }; @@ -1371,20 +1428,24 @@ dictType clusterNodesBlackListDictType = { * we can re-add this node. The goal is to avoid readding a removed * node for some time. */ dictType modulesDictType = { - dictSdsCaseHash, /* hash function */ + dictSdsCaseHash, /* lookup hash function */ + dictSdsCaseHash, /* stored hash function */ NULL, /* key dup */ NULL, /* val dup */ - dictSdsKeyCaseCompare, /* key compare */ + dictSdsKeyCaseCompare, /* lookup key compare */ + dictSdsKeyCaseCompare, /* stored key compare */ dictSdsDestructor, /* key destructor */ NULL /* val destructor */ }; /* Migrate cache dict type. 
*/ dictType migrateCacheDictType = { - dictSdsHash, /* hash function */ + dictSdsHash, /* lookup hash function */ + dictSdsHash, /* stored hash function */ NULL, /* key dup */ NULL, /* val dup */ - dictSdsKeyCompare, /* key compare */ + dictSdsKeyCompare, /* lookup key compare */ + dictSdsKeyCompare, /* stored key compare */ dictSdsDestructor, /* key destructor */ NULL /* val destructor */ }; @@ -1393,10 +1454,12 @@ dictType migrateCacheDictType = { * Keys are sds SHA1 strings, while values are not used at all in the current * implementation. */ dictType replScriptCacheDictType = { - dictSdsCaseHash, /* hash function */ + dictSdsCaseHash, /* lookup hash function */ + dictSdsCaseHash, /* stored hash function */ NULL, /* key dup */ NULL, /* val dup */ - dictSdsKeyCaseCompare, /* key compare */ + dictSdsKeyCaseCompare, /* lookup key compare */ + dictSdsKeyCaseCompare, /* stored key compare */ dictSdsDestructor, /* key destructor */ NULL /* val destructor */ }; @@ -1415,8 +1478,6 @@ int htNeedsResize(dict *dict) { void tryResizeHashTables(int dbid) { if (htNeedsResize(server.db[dbid].dict)) dictResize(server.db[dbid].dict); - if (htNeedsResize(server.db[dbid].expires)) - dictResize(server.db[dbid].expires); } /* Our hash table implementation performs rehashing incrementally while @@ -1432,11 +1493,6 @@ int incrementallyRehash(int dbid) { dictRehashMilliseconds(server.db[dbid].dict,1); return 1; /* already used our millisecond for this loop... */ } - /* Expires */ - if (dictIsRehashing(server.db[dbid].expires)) { - dictRehashMilliseconds(server.db[dbid].expires,1); - return 1; /* already used our millisecond for this loop... 
*/ - } return 0; } @@ -1859,7 +1915,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { size = dictSlots(server.db[j].dict); used = dictSize(server.db[j].dict); - vkeys = dictSize(server.db[j].expires); + vkeys = raxSize(server.db[j].expires); if (used || vkeys) { serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size); /* dictPrintStats(server.dict); */ @@ -2668,7 +2724,6 @@ void resetServerStats(void) { server.stat_numcommands = 0; server.stat_numconnections = 0; server.stat_expiredkeys = 0; - server.stat_expired_stale_perc = 0; server.stat_expired_time_cap_reached_count = 0; server.stat_evictedkeys = 0; server.stat_keyspace_misses = 0; @@ -2763,7 +2818,7 @@ void initServer(void) { /* Create the Redis databases, and initialize other internal state. */ for (j = 0; j < server.dbnum; j++) { server.db[j].dict = dictCreate(&dbDictType,NULL); - server.db[j].expires = dictCreate(&keyptrDictType,NULL); + server.db[j].expires = raxNew(); server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL); server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL); server.db[j].watched_keys = dictCreate(&keylistDictType,NULL); @@ -4109,7 +4164,6 @@ sds genRedisInfoString(char *section) { "sync_partial_ok:%lld\r\n" "sync_partial_err:%lld\r\n" "expired_keys:%lld\r\n" - "expired_stale_perc:%.2f\r\n" "expired_time_cap_reached_count:%lld\r\n" "evicted_keys:%lld\r\n" "keyspace_hits:%lld\r\n" @@ -4135,7 +4189,6 @@ sds genRedisInfoString(char *section) { server.stat_sync_partial_ok, server.stat_sync_partial_err, server.stat_expiredkeys, - server.stat_expired_stale_perc*100, server.stat_expired_time_cap_reached_count, server.stat_evictedkeys, server.stat_keyspace_hits, @@ -4331,7 +4384,7 @@ sds genRedisInfoString(char *section) { long long keys, vkeys; keys = dictSize(server.db[j].dict); - vkeys = dictSize(server.db[j].expires); + vkeys = raxSize(server.db[j].expires); if (keys || vkeys) { info = 
sdscatprintf(info, "db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n", diff --git a/src/server.h b/src/server.h index 0813f8bd1..f5262487f 100644 --- a/src/server.h +++ b/src/server.h @@ -63,8 +63,8 @@ typedef long long mstime_t; /* millisecond time type. */ #include "util.h" /* Misc functions useful in many places */ #include "latency.h" /* Latency monitor API */ #include "sparkline.h" /* ASCII graphs API */ -#include "quicklist.h" /* Lists are encoded as linked lists of - N-elements flat arrays */ +#include "quicklist.h" /* Lists are encoded as linked lists of + N-elements flat arrays */ #include "rax.h" /* Radix tree */ /* Following includes allow test functions to be called from Redis main() */ @@ -634,14 +634,13 @@ typedef struct RedisModuleDigest { #define LRU_BITS 24 #define LRU_CLOCK_MAX ((1<lru */ #define LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */ +#define KEY_FLAGS_BITS (32-LRU_BITS) #define OBJ_SHARED_REFCOUNT INT_MAX typedef struct redisObject { unsigned type:4; unsigned encoding:4; - unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or - * LFU data (least significant 8 bits frequency - * and most significant 16 bits access time). */ + unsigned flags:24; int refcount; void *ptr; } robj; @@ -660,18 +659,34 @@ typedef struct redisObject { struct evictionPoolEntry; /* Defined in evict.c */ /* This structure is used in order to represent the output buffer of a client, - * which is actually a linked list of blocks like that, that is: client->reply. */ + * which is actually a linked list of clientReplyBlock blocks. + * Such list is stored in the client->reply field. */ typedef struct clientReplyBlock { size_t size, used; char buf[]; } clientReplyBlock; +/* Representation of a Redis key. This representation is just used in order + * to store keys in the main dictionary. 
Note that keys with an expire are + * also stored in a different data structure as well, stored in db->expires, + * for the expiration algorithm to work more efficiently. */ +#define KEY_FLAG_EXPIRE (1<<0) +typedef struct redisKey { + uint32_t len; + unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or + * LFU data (least significant 8 bits frequency + * and most significant 16 bits access time). */ + unsigned flags:KEY_FLAGS_BITS; + uint64_t expire; + char name[]; +} rkey; + /* Redis database representation. There are multiple databases identified * by integers from 0 (the default database) up to the max configured * database. The database number is the 'id' field in the structure. */ typedef struct redisDb { dict *dict; /* The keyspace for this DB */ - dict *expires; /* Timeout of keys with a timeout set */ + rax *expires; /* Sorted tree of keys with an expire set. */ dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ dict *ready_keys; /* Blocked keys that received a PUSH */ dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ @@ -1089,7 +1104,6 @@ struct redisServer { long long stat_numcommands; /* Number of processed commands */ long long stat_numconnections; /* Number of connections received */ long long stat_expiredkeys; /* Number of expired keys */ - double stat_expired_stale_perc; /* Percentage of keys probably expired */ long long stat_expired_time_cap_reached_count; /* Early expire cylce stops.*/ long long stat_evictedkeys; /* Number of evicted keys (maxmemory) */ long long stat_keyspace_hits; /* Number of successful lookups of keys */ @@ -1674,7 +1688,7 @@ char *strEncoding(int encoding); int compareStringObjects(robj *a, robj *b); int collateStringObjects(robj *a, robj *b); int equalStringObjects(robj *a, robj *b); -unsigned long long estimateObjectIdleTime(robj *o); +unsigned long long estimateObjectIdleTime(rkey *k); void trimStringObjectIfNeeded(robj *o); #define sdsEncodedObject(objptr) (objptr->encoding == 
OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR) @@ -1941,31 +1955,34 @@ void rewriteConfigRewriteLine(struct rewriteConfigState *state, const char *opti int rewriteConfig(char *path); /* db.c -- Keyspace access API */ -int removeExpire(redisDb *db, robj *key); +int removeExpire(redisDb *db, rkey *key); void propagateExpire(redisDb *db, robj *key, int lazy); -int expireIfNeeded(redisDb *db, robj *key); -long long getExpire(redisDb *db, robj *key); -void setExpire(client *c, redisDb *db, robj *key, long long when); -robj *lookupKey(redisDb *db, robj *key, int flags); -robj *lookupKeyRead(redisDb *db, robj *key); -robj *lookupKeyWrite(redisDb *db, robj *key); -robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply); -robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply); -robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags); -robj *objectCommandLookup(client *c, robj *key); -robj *objectCommandLookupOrReply(client *c, robj *key, robj *reply); -void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, +int expireIfNeeded(redisDb *db, robj *keyname, rkey *key); +int expireIfNeededByName(redisDb *db, robj *keyname); +long long getExpire(rkey *key); +void setExpire(client *c, redisDb *db, rkey *key, long long when); +robj *lookupKey(redisDb *db, robj *keyname, rkey **keyptr, int flags); +robj *lookupKeyRead(redisDb *db, robj *keyname, rkey **keyptr); +robj *lookupKeyWrite(redisDb *db, robj *keyname, rkey **keyptr); +robj *lookupKeyReadOrReply(client *c, robj *keyname, rkey **keyptr, robj *reply); +robj *lookupKeyWriteOrReply(client *c, robj *keyname, rkey **keyptr, robj *reply); +robj *lookupKeyReadWithFlags(redisDb *db, robj *keyname, rkey **keyptr, int flags); +void objectSetLRUOrLFU(rkey *key, long long lfu_freq, long long lru_idle, long long lru_clock); +robj *objectCommandLookup(client *c, robj *keyname, rkey **key); +robj *objectCommandLookupOrReply(client *c, robj *keyname, rkey **key, robj *reply); #define 
LOOKUP_NONE 0 #define LOOKUP_NOTOUCH (1<<0) -void dbAdd(redisDb *db, robj *key, robj *val); -void dbOverwrite(redisDb *db, robj *key, robj *val); -void setKey(redisDb *db, robj *key, robj *val); +rkey *dbAdd(redisDb *db, robj *key, robj *val); +rkey *dbOverwrite(redisDb *db, robj *key, robj *val); +rkey *setKey(redisDb *db, robj *keyname, robj *val); int dbExists(redisDb *db, robj *key); robj *dbRandomKey(redisDb *db); int dbSyncDelete(redisDb *db, robj *key); int dbDelete(redisDb *db, robj *key); robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o); +void freeKey(rkey *key); +void removeExpireFromTree(redisDb *db, rkey *key); #define EMPTYDB_NO_FLAGS 0 /* No flags. */ #define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */ @@ -2043,7 +2060,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo /* expire.c -- Handling of expired keys */ void activeExpireCycle(int type); void expireSlaveKeys(void); -void rememberSlaveKeyWithExpire(redisDb *db, robj *key); +void rememberSlaveKeyWithExpire(redisDb *db, rkey *key); void flushSlaveKeysWithExpireList(void); size_t getSlaveKeyWithExpireCount(void); @@ -2052,7 +2069,7 @@ void evictionPoolAlloc(void); #define LFU_INIT_VAL 5 unsigned long LFUGetTimeInMinutes(void); uint8_t LFULogIncr(uint8_t value); -unsigned long LFUDecrAndReturn(robj *o); +unsigned long LFUDecrAndReturn(rkey *k); /* Keys hashing / comparison functions for dict.c hash tables. 
*/ uint64_t dictSdsHash(const void *key); diff --git a/src/sort.c b/src/sort.c index db26da158..b54a40355 100644 --- a/src/sort.c +++ b/src/sort.c @@ -107,9 +107,9 @@ robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst, int writeflag) /* Lookup substituted key */ if (!writeflag) - o = lookupKeyRead(db,keyobj); + o = lookupKeyRead(db,keyobj,NULL); else - o = lookupKeyWrite(db,keyobj); + o = lookupKeyWrite(db,keyobj,NULL); if (o == NULL) goto noobj; if (fieldobj) { @@ -271,9 +271,9 @@ void sortCommand(client *c) { /* Lookup the key to sort. It must be of the right types */ if (storekey) - sortval = lookupKeyRead(c->db,c->argv[1]); + sortval = lookupKeyRead(c->db,c->argv[1],NULL); else - sortval = lookupKeyWrite(c->db,c->argv[1]); + sortval = lookupKeyWrite(c->db,c->argv[1],NULL); if (sortval && sortval->type != OBJ_SET && sortval->type != OBJ_LIST && sortval->type != OBJ_ZSET) diff --git a/src/t_hash.c b/src/t_hash.c index bc70e4051..79c6053d8 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -449,7 +449,7 @@ sds hashTypeCurrentObjectNewSds(hashTypeIterator *hi, int what) { } robj *hashTypeLookupWriteOrCreate(client *c, robj *key) { - robj *o = lookupKeyWrite(c->db,key); + robj *o = lookupKeyWrite(c->db,key,NULL); if (o == NULL) { o = createHashObject(); dbAdd(c->db,key,o); @@ -679,8 +679,8 @@ static void addHashFieldToReply(client *c, robj *o, sds field) { void hgetCommand(client *c) { robj *o; - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == NULL || - checkType(c,o,OBJ_HASH)) return; + if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.null[c->resp])) + == NULL || checkType(c,o,OBJ_HASH)) return; addHashFieldToReply(c, o, c->argv[2]->ptr); } @@ -691,7 +691,7 @@ void hmgetCommand(client *c) { /* Don't abort when the key cannot be found. Non-existing keys are empty * hashes, where HMGET should respond with a series of null bulks. 
*/ - o = lookupKeyRead(c->db, c->argv[1]); + o = lookupKeyRead(c->db, c->argv[1], NULL); if (o != NULL && o->type != OBJ_HASH) { addReply(c, shared.wrongtypeerr); return; @@ -707,7 +707,7 @@ void hdelCommand(client *c) { robj *o; int j, deleted = 0, keyremoved = 0; - if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || + if ((o = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.czero)) == NULL || checkType(c,o,OBJ_HASH)) return; for (j = 2; j < c->argc; j++) { @@ -734,7 +734,7 @@ void hdelCommand(client *c) { void hlenCommand(client *c) { robj *o; - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || + if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.czero)) == NULL || checkType(c,o,OBJ_HASH)) return; addReplyLongLong(c,hashTypeLength(o)); @@ -743,7 +743,7 @@ void hlenCommand(client *c) { void hstrlenCommand(client *c) { robj *o; - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || + if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.czero)) == NULL || checkType(c,o,OBJ_HASH)) return; addReplyLongLong(c,hashTypeGetValueLength(o,c->argv[2]->ptr)); } @@ -772,8 +772,8 @@ void genericHgetallCommand(client *c, int flags) { hashTypeIterator *hi; int length, count = 0; - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == NULL - || checkType(c,o,OBJ_HASH)) return; + if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.null[c->resp])) + == NULL || checkType(c,o,OBJ_HASH)) return; /* We return a map if the user requested keys and values, like in the * HGETALL case. Otherwise to use a flat array makes more sense. */ @@ -817,7 +817,7 @@ void hgetallCommand(client *c) { void hexistsCommand(client *c) { robj *o; - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || + if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.czero)) == NULL || checkType(c,o,OBJ_HASH)) return; addReply(c, hashTypeExists(o,c->argv[2]->ptr) ? 
shared.cone : shared.czero); @@ -828,7 +828,7 @@ void hscanCommand(client *c) { unsigned long cursor; if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return; - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL || - checkType(c,o,OBJ_HASH)) return; + if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.emptyscan)) + == NULL || checkType(c,o,OBJ_HASH)) return; scanGenericCommand(c,o,cursor); } diff --git a/src/t_list.c b/src/t_list.c index 54e4959b9..b8ac88e46 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -196,7 +196,7 @@ void listTypeConvert(robj *subject, int enc) { void pushGenericCommand(client *c, int where) { int j, pushed = 0; - robj *lobj = lookupKeyWrite(c->db,c->argv[1]); + robj *lobj = lookupKeyWrite(c->db,c->argv[1],NULL); if (lobj && lobj->type != OBJ_LIST) { addReply(c,shared.wrongtypeerr); @@ -235,8 +235,8 @@ void pushxGenericCommand(client *c, int where) { int j, pushed = 0; robj *subject; - if ((subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || - checkType(c,subject,OBJ_LIST)) return; + if ((subject = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.czero)) + == NULL || checkType(c,subject,OBJ_LIST)) return; for (j = 2; j < c->argc; j++) { listTypePush(subject,c->argv[j],where); @@ -277,8 +277,8 @@ void linsertCommand(client *c) { return; } - if ((subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || - checkType(c,subject,OBJ_LIST)) return; + if ((subject = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.czero)) + == NULL || checkType(c,subject,OBJ_LIST)) return; /* Seek pivot from head to tail */ iter = listTypeInitIterator(subject,0,LIST_TAIL); @@ -306,13 +306,13 @@ void linsertCommand(client *c) { } void llenCommand(client *c) { - robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero); + robj *o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.czero); if (o == NULL || checkType(c,o,OBJ_LIST)) return; addReplyLongLong(c,listTypeLength(o)); } void lindexCommand(client *c) { - 
robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]); + robj *o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.null[c->resp]); if (o == NULL || checkType(c,o,OBJ_LIST)) return; long index; robj *value = NULL; @@ -339,7 +339,7 @@ void lindexCommand(client *c) { } void lsetCommand(client *c) { - robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr); + robj *o = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.nokeyerr); if (o == NULL || checkType(c,o,OBJ_LIST)) return; long index; robj *value = c->argv[3]; @@ -365,7 +365,7 @@ void lsetCommand(client *c) { } void popGenericCommand(client *c, int where) { - robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.null[c->resp]); + robj *o = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.null[c->resp]); if (o == NULL || checkType(c,o,OBJ_LIST)) return; robj *value = listTypePop(o,where); @@ -402,8 +402,8 @@ void lrangeCommand(client *c) { if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) || (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return; - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == NULL - || checkType(c,o,OBJ_LIST)) return; + if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.null[c->resp])) + == NULL || checkType(c,o,OBJ_LIST)) return; llen = listTypeLength(o); /* convert negative indexes */ @@ -448,7 +448,7 @@ void ltrimCommand(client *c) { if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) || (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return; - if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL || + if ((o = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.ok)) == NULL || checkType(c,o,OBJ_LIST)) return; llen = listTypeLength(o); @@ -496,7 +496,7 @@ void lremCommand(client *c) { if ((getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != C_OK)) return; - subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero); + subject = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.czero); 
if (subject == NULL || checkType(c,subject,OBJ_LIST)) return; listTypeIterator *li; @@ -564,7 +564,7 @@ void rpoplpushHandlePush(client *c, robj *dstkey, robj *dstobj, robj *value) { void rpoplpushCommand(client *c) { robj *sobj, *value; - if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.null[c->resp])) + if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.null[c->resp])) == NULL || checkType(c,sobj,OBJ_LIST)) return; if (listTypeLength(sobj) == 0) { @@ -572,7 +572,7 @@ void rpoplpushCommand(client *c) { * versions of Redis delete keys of empty lists. */ addReplyNull(c); } else { - robj *dobj = lookupKeyWrite(c->db,c->argv[2]); + robj *dobj = lookupKeyWrite(c->db,c->argv[2],NULL); robj *touchedkey = c->argv[1]; if (dobj && checkType(c,dobj,OBJ_LIST)) return; @@ -649,7 +649,7 @@ int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb } else { /* BRPOPLPUSH */ robj *dstobj = - lookupKeyWrite(receiver->db,dstkey); + lookupKeyWrite(receiver->db,dstkey,NULL); if (!(dstobj && checkType(receiver,dstobj,OBJ_LIST))) { @@ -692,7 +692,7 @@ void blockingPopGenericCommand(client *c, int where) { != C_OK) return; for (j = 1; j < c->argc-1; j++) { - o = lookupKeyWrite(c->db,c->argv[j]); + o = lookupKeyWrite(c->db,c->argv[j],NULL); if (o != NULL) { if (o->type != OBJ_LIST) { addReply(c,shared.wrongtypeerr); @@ -753,7 +753,7 @@ void brpoplpushCommand(client *c) { if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_SECONDS) != C_OK) return; - robj *key = lookupKeyWrite(c->db, c->argv[1]); + robj *key = lookupKeyWrite(c->db, c->argv[1], NULL); if (key == NULL) { if (c->flags & CLIENT_MULTI) { diff --git a/src/t_set.c b/src/t_set.c index 05d9ee243..b75918ffa 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -265,7 +265,7 @@ void saddCommand(client *c) { robj *set; int j, added = 0; - set = lookupKeyWrite(c->db,c->argv[1]); + set = lookupKeyWrite(c->db,c->argv[1],NULL); if (set == NULL) { set = setTypeCreate(c->argv[2]->ptr); 
dbAdd(c->db,c->argv[1],set); @@ -291,7 +291,7 @@ void sremCommand(client *c) { robj *set; int j, deleted = 0, keyremoved = 0; - if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || + if ((set = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.czero)) == NULL || checkType(c,set,OBJ_SET)) return; for (j = 2; j < c->argc; j++) { @@ -317,8 +317,8 @@ void sremCommand(client *c) { void smoveCommand(client *c) { robj *srcset, *dstset, *ele; - srcset = lookupKeyWrite(c->db,c->argv[1]); - dstset = lookupKeyWrite(c->db,c->argv[2]); + srcset = lookupKeyWrite(c->db,c->argv[1],NULL); + dstset = lookupKeyWrite(c->db,c->argv[2],NULL); ele = c->argv[3]; /* If the source key does not exist return 0 */ @@ -373,7 +373,7 @@ void smoveCommand(client *c) { void sismemberCommand(client *c) { robj *set; - if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || + if ((set = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.czero)) == NULL || checkType(c,set,OBJ_SET)) return; if (setTypeIsMember(set,c->argv[2]->ptr)) @@ -385,7 +385,7 @@ void sismemberCommand(client *c) { void scardCommand(client *c) { robj *o; - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || + if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.czero)) == NULL || checkType(c,o,OBJ_SET)) return; addReplyLongLong(c,setTypeSize(o)); @@ -415,7 +415,7 @@ void spopWithCountCommand(client *c) { /* Make sure a key with the name inputted exists, and that it's type is * indeed a set. 
Otherwise, return nil */ - if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.null[c->resp])) + if ((set = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.null[c->resp])) == NULL || checkType(c,set,OBJ_SET)) return; /* If count is zero, serve an empty multibulk ASAP to avoid special @@ -566,7 +566,7 @@ void spopCommand(client *c) { /* Make sure a key with the name inputted exists, and that it's type is * indeed a set */ - if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.null[c->resp])) + if ((set = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.null[c->resp])) == NULL || checkType(c,set,OBJ_SET)) return; /* Get a random element from the set */ @@ -632,7 +632,7 @@ void srandmemberWithCountCommand(client *c) { uniq = 0; } - if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) + if ((set = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.null[c->resp])) == NULL || checkType(c,set,OBJ_SET)) return; size = setTypeSize(set); @@ -760,7 +760,7 @@ void srandmemberCommand(client *c) { return; } - if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) + if ((set = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.null[c->resp])) == NULL || checkType(c,set,OBJ_SET)) return; encoding = setTypeRandomElement(set,&ele,&llele); @@ -802,8 +802,8 @@ void sinterGenericCommand(client *c, robj **setkeys, for (j = 0; j < setnum; j++) { robj *setobj = dstkey ? - lookupKeyWrite(c->db,setkeys[j]) : - lookupKeyRead(c->db,setkeys[j]); + lookupKeyWrite(c->db,setkeys[j],NULL) : + lookupKeyRead(c->db,setkeys[j],NULL); if (!setobj) { zfree(sets); if (dstkey) { @@ -939,8 +939,8 @@ void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum, for (j = 0; j < setnum; j++) { robj *setobj = dstkey ? 
- lookupKeyWrite(c->db,setkeys[j]) : - lookupKeyRead(c->db,setkeys[j]); + lookupKeyWrite(c->db,setkeys[j],NULL) : + lookupKeyRead(c->db,setkeys[j],NULL); if (!setobj) { sets[j] = NULL; continue; @@ -1110,7 +1110,7 @@ void sscanCommand(client *c) { unsigned long cursor; if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return; - if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL || - checkType(c,set,OBJ_SET)) return; + if ((set = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.emptyscan)) + == NULL || checkType(c,set,OBJ_SET)) return; scanGenericCommand(c,set,cursor); } diff --git a/src/t_stream.c b/src/t_stream.c index 9e7d3d126..2fa59fa21 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1057,7 +1057,7 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start /* Look the stream at 'key' and return the corresponding stream object. * The function creates a key setting it to an empty stream if needed. */ robj *streamTypeLookupWriteOrCreate(client *c, robj *key) { - robj *o = lookupKeyWrite(c->db,key); + robj *o = lookupKeyWrite(c->db,key,NULL); if (o == NULL) { o = createStreamObject(); dbAdd(c->db,key,o); @@ -1287,8 +1287,8 @@ void xrangeGenericCommand(client *c, int rev) { } /* Return the specified range to the user. */ - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL || - checkType(c,o,OBJ_STREAM)) return; + if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.emptyarray)) + == NULL || checkType(c,o,OBJ_STREAM)) return; s = o->ptr; @@ -1313,7 +1313,7 @@ void xrevrangeCommand(client *c) { /* XLEN */ void xlenCommand(client *c) { robj *o; - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL + if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.czero)) == NULL || checkType(c,o,OBJ_STREAM)) return; stream *s = o->ptr; addReplyLongLong(c,s->length); @@ -1411,7 +1411,7 @@ void xreadCommand(client *c) { * starting from now. 
*/ int id_idx = i - streams_arg - streams_count; robj *key = c->argv[i-streams_count]; - robj *o = lookupKeyRead(c->db,key); + robj *o = lookupKeyRead(c->db,key,NULL); if (o && checkType(c,o,OBJ_STREAM)) goto cleanup; streamCG *group = NULL; @@ -1469,7 +1469,7 @@ void xreadCommand(client *c) { size_t arraylen = 0; void *arraylen_ptr = NULL; for (int i = 0; i < streams_count; i++) { - robj *o = lookupKeyRead(c->db,c->argv[streams_arg+i]); + robj *o = lookupKeyRead(c->db,c->argv[streams_arg+i],NULL); if (o == NULL) continue; stream *s = o->ptr; streamID *gt = ids+i; /* ID must be greater than this. */ @@ -1736,7 +1736,7 @@ NULL /* Everything but the "HELP" option requires a key and group name. */ if (c->argc >= 4) { - o = lookupKeyWrite(c->db,c->argv[2]); + o = lookupKeyWrite(c->db,c->argv[2],NULL); if (o) { if (checkType(c,o,OBJ_STREAM)) return; s = o->ptr; @@ -1840,7 +1840,7 @@ NULL * * Set the internal "last ID" of a stream. */ void xsetidCommand(client *c) { - robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr); + robj *o = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.nokeyerr); if (o == NULL || checkType(c,o,OBJ_STREAM)) return; stream *s = o->ptr; @@ -1881,7 +1881,7 @@ void xsetidCommand(client *c) { */ void xackCommand(client *c) { streamCG *group = NULL; - robj *o = lookupKeyRead(c->db,c->argv[1]); + robj *o = lookupKeyRead(c->db,c->argv[1],NULL); if (o) { if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */ group = streamLookupCG(o->ptr,c->argv[2]->ptr); @@ -1952,7 +1952,7 @@ void xpendingCommand(client *c) { } /* Lookup the key and the group inside the stream. */ - robj *o = lookupKeyRead(c->db,c->argv[1]); + robj *o = lookupKeyRead(c->db,c->argv[1],NULL); streamCG *group; if (o && checkType(c,o,OBJ_STREAM)) return; @@ -2131,7 +2131,7 @@ void xpendingCommand(client *c) { * what messages it is now in charge of. 
*/ void xclaimCommand(client *c) { streamCG *group = NULL; - robj *o = lookupKeyRead(c->db,c->argv[1]); + robj *o = lookupKeyRead(c->db,c->argv[1],NULL); long long minidle; /* Minimum idle time argument. */ long long retrycount = -1; /* -1 means RETRYCOUNT option not given. */ mstime_t deliverytime = -1; /* -1 means IDLE/TIME options not given. */ @@ -2323,7 +2323,7 @@ void xclaimCommand(client *c) { void xdelCommand(client *c) { robj *o; - if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL + if ((o = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.czero)) == NULL || checkType(c,o,OBJ_STREAM)) return; stream *s = o->ptr; @@ -2368,7 +2368,7 @@ void xtrimCommand(client *c) { /* If the key does not exist, we are ok returning zero, that is, the * number of elements removed from the stream. */ - if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL + if ((o = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.czero)) == NULL || checkType(c,o,OBJ_STREAM)) return; stream *s = o->ptr; @@ -2460,7 +2460,7 @@ NULL key = c->argv[2]; /* Lookup the key now, this is common for all the subcommands but HELP. 
*/ - robj *o = lookupKeyWriteOrReply(c,key,shared.nokeyerr); + robj *o = lookupKeyWriteOrReply(c,key,NULL,shared.nokeyerr); if (o == NULL || checkType(c,o,OBJ_STREAM)) return; s = o->ptr; diff --git a/src/t_string.c b/src/t_string.c index 5800c5c0c..6c7c666ad 100644 --- a/src/t_string.c +++ b/src/t_string.c @@ -64,7 +64,7 @@ static int checkStringLength(client *c, long long size) { #define OBJ_SET_EX (1<<2) /* Set if time in seconds is given */ #define OBJ_SET_PX (1<<3) /* Set if time in ms in given */ -void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) { +void setGenericCommand(client *c, int flags, robj *keyname, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) { long long milliseconds = 0; /* initialized to avoid any harmness warning */ if (expire) { @@ -77,18 +77,18 @@ void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, if (unit == UNIT_SECONDS) milliseconds *= 1000; } - if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) || - (flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL)) + if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,keyname,NULL) != NULL) || + (flags & OBJ_SET_XX && lookupKeyWrite(c->db,keyname,NULL) == NULL)) { addReply(c, abort_reply ? abort_reply : shared.null[c->resp]); return; } - setKey(c->db,key,val); + rkey *key = setKey(c->db,keyname,val); server.dirty++; if (expire) setExpire(c,c->db,key,mstime()+milliseconds); - notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id); + notifyKeyspaceEvent(NOTIFY_STRING,"set",keyname,c->db->id); if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC, - "expire",key,c->db->id); + "expire",keyname,c->db->id); addReply(c, ok_reply ? 
ok_reply : shared.ok); } @@ -157,8 +157,8 @@ void psetexCommand(client *c) { int getGenericCommand(client *c) { robj *o; - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == NULL) - return C_OK; + if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.null[c->resp])) + == NULL) return C_OK; if (o->type != OBJ_STRING) { addReply(c,shared.wrongtypeerr); @@ -194,7 +194,7 @@ void setrangeCommand(client *c) { return; } - o = lookupKeyWrite(c->db,c->argv[1]); + o = lookupKeyWrite(c->db,c->argv[1],NULL); if (o == NULL) { /* Return 0 when setting nothing on a non-existing string */ if (sdslen(value) == 0) { @@ -251,8 +251,8 @@ void getrangeCommand(client *c) { return; if (getLongLongFromObjectOrReply(c,c->argv[3],&end,NULL) != C_OK) return; - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptybulk)) == NULL || - checkType(c,o,OBJ_STRING)) return; + if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.emptybulk)) == NULL + || checkType(c,o,OBJ_STRING)) return; if (o->encoding == OBJ_ENCODING_INT) { str = llbuf; @@ -287,7 +287,7 @@ void mgetCommand(client *c) { addReplyArrayLen(c,c->argc-1); for (j = 1; j < c->argc; j++) { - robj *o = lookupKeyRead(c->db,c->argv[j]); + robj *o = lookupKeyRead(c->db,c->argv[j],NULL); if (o == NULL) { addReplyNull(c); } else { @@ -312,7 +312,7 @@ void msetGenericCommand(client *c, int nx) { * set anything if at least one key alerady exists. 
*/ if (nx) { for (j = 1; j < c->argc; j += 2) { - if (lookupKeyWrite(c->db,c->argv[j]) != NULL) { + if (lookupKeyWrite(c->db,c->argv[j],NULL) != NULL) { addReply(c, shared.czero); return; } @@ -340,7 +340,7 @@ void incrDecrCommand(client *c, long long incr) { long long value, oldvalue; robj *o, *new; - o = lookupKeyWrite(c->db,c->argv[1]); + o = lookupKeyWrite(c->db,c->argv[1],NULL); if (o != NULL && checkType(c,o,OBJ_STRING)) return; if (getLongLongFromObjectOrReply(c,o,&value,NULL) != C_OK) return; @@ -400,7 +400,7 @@ void incrbyfloatCommand(client *c) { long double incr, value; robj *o, *new, *aux; - o = lookupKeyWrite(c->db,c->argv[1]); + o = lookupKeyWrite(c->db,c->argv[1],NULL); if (o != NULL && checkType(c,o,OBJ_STRING)) return; if (getLongDoubleFromObjectOrReply(c,o,&value,NULL) != C_OK || getLongDoubleFromObjectOrReply(c,c->argv[2],&incr,NULL) != C_OK) @@ -434,7 +434,7 @@ void appendCommand(client *c) { size_t totlen; robj *o, *append; - o = lookupKeyWrite(c->db,c->argv[1]); + o = lookupKeyWrite(c->db,c->argv[1],NULL); if (o == NULL) { /* Create the key */ c->argv[2] = tryObjectEncoding(c->argv[2]); @@ -465,7 +465,7 @@ void appendCommand(client *c) { void strlenCommand(client *c) { robj *o; - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || + if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.czero)) == NULL || checkType(c,o,OBJ_STRING)) return; addReplyLongLong(c,stringObjectLen(o)); } diff --git a/src/t_zset.c b/src/t_zset.c index fb7078abd..f6f2239e4 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -1597,7 +1597,7 @@ void zaddGenericCommand(client *c, int flags) { } /* Lookup the key and create the sorted set if does not exist. */ - zobj = lookupKeyWrite(c->db,key); + zobj = lookupKeyWrite(c->db,key,NULL); if (zobj == NULL) { if (xx) goto reply_to_client; /* No key + XX option: nothing to do. 
*/ if (server.zset_max_ziplist_entries == 0 || @@ -1665,7 +1665,7 @@ void zremCommand(client *c) { robj *zobj; int deleted = 0, keyremoved = 0, j; - if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL || + if ((zobj = lookupKeyWriteOrReply(c,key,NULL,shared.czero)) == NULL || checkType(c,zobj,OBJ_ZSET)) return; for (j = 2; j < c->argc; j++) { @@ -1718,7 +1718,7 @@ void zremrangeGenericCommand(client *c, int rangetype) { } /* Step 2: Lookup & range sanity checks if needed. */ - if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL || + if ((zobj = lookupKeyWriteOrReply(c,key,NULL,shared.czero)) == NULL || checkType(c,zobj,OBJ_ZSET)) goto cleanup; if (rangetype == ZRANGE_RANK) { @@ -2165,7 +2165,8 @@ uint64_t dictSdsHash(const void *key); int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2); dictType setAccumulatorDictType = { - dictSdsHash, /* hash function */ + dictSdsHash, /* lookup hash function */ + dictSdsHash, /* stored hash function */ NULL, /* key dup */ NULL, /* val dup */ dictSdsKeyCompare, /* key compare */ @@ -2205,7 +2206,7 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) { /* read keys to be used for input */ src = zcalloc(sizeof(zsetopsrc) * setnum); for (i = 0, j = 3; i < setnum; i++, j++) { - robj *obj = lookupKeyWrite(c->db,c->argv[j]); + robj *obj = lookupKeyWrite(c->db,c->argv[j],NULL); if (obj != NULL) { if (obj->type != OBJ_ZSET && obj->type != OBJ_SET) { zfree(src); @@ -2427,7 +2428,7 @@ void zrangeGenericCommand(client *c, int reverse) { return; } - if ((zobj = lookupKeyReadOrReply(c,key,shared.null[c->resp])) == NULL + if ((zobj = lookupKeyReadOrReply(c,key,NULL,shared.null[c->resp])) == NULL || checkType(c,zobj,OBJ_ZSET)) return; /* Sanitize indexes. 
*/ @@ -2575,8 +2576,8 @@ void genericZrangebyscoreCommand(client *c, int reverse) { } /* Ok, lookup the key and get the range */ - if ((zobj = lookupKeyReadOrReply(c,key,shared.null[c->resp])) == NULL || - checkType(c,zobj,OBJ_ZSET)) return; + if ((zobj = lookupKeyReadOrReply(c,key,NULL,shared.null[c->resp])) + == NULL || checkType(c,zobj,OBJ_ZSET)) return; if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { unsigned char *zl = zobj->ptr; @@ -2730,7 +2731,7 @@ void zcountCommand(client *c) { } /* Lookup the sorted set */ - if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL || + if ((zobj = lookupKeyReadOrReply(c, key, NULL, shared.czero)) == NULL || checkType(c, zobj, OBJ_ZSET)) return; if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { @@ -2807,7 +2808,7 @@ void zlexcountCommand(client *c) { } /* Lookup the sorted set */ - if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL || + if ((zobj = lookupKeyReadOrReply(c, key, NULL, shared.czero)) == NULL || checkType(c, zobj, OBJ_ZSET)) { zslFreeLexRange(&range); @@ -2920,8 +2921,8 @@ void genericZrangebylexCommand(client *c, int reverse) { } /* Ok, lookup the key and get the range */ - if ((zobj = lookupKeyReadOrReply(c,key,shared.null[c->resp])) == NULL || - checkType(c,zobj,OBJ_ZSET)) + if ((zobj = lookupKeyReadOrReply(c,key,NULL,shared.null[c->resp])) + == NULL || checkType(c,zobj,OBJ_ZSET)) { zslFreeLexRange(&range); return; @@ -3065,7 +3066,7 @@ void zcardCommand(client *c) { robj *key = c->argv[1]; robj *zobj; - if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL || + if ((zobj = lookupKeyReadOrReply(c,key,NULL,shared.czero)) == NULL || checkType(c,zobj,OBJ_ZSET)) return; addReplyLongLong(c,zsetLength(zobj)); @@ -3076,8 +3077,8 @@ void zscoreCommand(client *c) { robj *zobj; double score; - if ((zobj = lookupKeyReadOrReply(c,key,shared.null[c->resp])) == NULL || - checkType(c,zobj,OBJ_ZSET)) return; + if ((zobj = lookupKeyReadOrReply(c,key,NULL,shared.null[c->resp])) == NULL + || 
checkType(c,zobj,OBJ_ZSET)) return; if (zsetScore(zobj,c->argv[2]->ptr,&score) == C_ERR) { addReplyNull(c); @@ -3092,8 +3093,8 @@ void zrankGenericCommand(client *c, int reverse) { robj *zobj; long rank; - if ((zobj = lookupKeyReadOrReply(c,key,shared.null[c->resp])) == NULL || - checkType(c,zobj,OBJ_ZSET)) return; + if ((zobj = lookupKeyReadOrReply(c,key,NULL,shared.null[c->resp])) + == NULL || checkType(c,zobj,OBJ_ZSET)) return; serverAssertWithInfo(c,ele,sdsEncodedObject(ele)); rank = zsetRank(zobj,ele->ptr,reverse); @@ -3117,8 +3118,8 @@ void zscanCommand(client *c) { unsigned long cursor; if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return; - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL || - checkType(c,o,OBJ_ZSET)) return; + if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.emptyscan)) + == NULL || checkType(c,o,OBJ_ZSET)) return; scanGenericCommand(c,o,cursor); } @@ -3153,7 +3154,7 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey idx = 0; while (idx < keyc) { key = keyv[idx++]; - zobj = lookupKeyWrite(c->db,key); + zobj = lookupKeyWrite(c->db,key,NULL); if (!zobj) continue; if (checkType(c,zobj,OBJ_ZSET)) return; break; @@ -3265,7 +3266,7 @@ void blockingGenericZpopCommand(client *c, int where) { != C_OK) return; for (j = 1; j < c->argc-1; j++) { - o = lookupKeyWrite(c->db,c->argv[j]); + o = lookupKeyWrite(c->db,c->argv[j],NULL); if (o != NULL) { if (o->type != OBJ_ZSET) { addReply(c,shared.wrongtypeerr); -- cgit v1.2.1